Crypto asset platform client for Gemini with Akka
Being interested in distributed ledger technology and actively trading cryptocurrency, I'm always trying to make my life easier with tooling. There are several crypto trading platforms, and most of them provide an API.
Today I want to talk about how we can use the WebSocket API provided by Gemini to get market data in near real time, which can be used for a better overview of the market and potentially much more.
Gemini offers two different types of WebSocket APIs:
- private Order Events API
- public Market Data API
In this post, we cover the latter.
Problem
We need a program that allows us to:
- Subscribe to the event stream
- Lift the events into our internal data model
- Extract information by applying further transformations, filters, aggregations etc. on the stream
- Compose the stream with data streams of other exchanges for comparisons or decisions based on different sources
Sounds good so far, although the requirements bring a whole set of engineering problems if we want a stable piece of software. Special attention should be paid to IO and network aspects as well as to concurrency and flow control.
In detail, we have to consider:
- Failures on the producer side of the event stream, e.g. the event stream slows down or drops
- Failures on the network layer
- Processing data concurrently with respect to the available heap memory
- Ensuring IO does not block CPU-heavy computations and vice versa
Sounds like a lot, and it would be a lot, if most of it did not come for free :) with akka-streams. Akka-streams provides out-of-the-box support for concurrent stream processing, flow control mechanisms like buffering and backpressure, a DSL to express streams as graphs, and many more cool things you can check out on their homepage.
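For instance, adding an explicit bounded buffer with a backpressure overflow strategy is a one-liner (a minimal sketch; someSource stands in for any source):

someSource.buffer(256, OverflowStrategy.backpressure) // producer is slowed down once 256 elements are queued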
The libraries we will use: akka-streams, akka-http (for the WebSocket client) and circe (for JSON decoding).
Let’s get this party started!
Subscribing to the event stream
Before doing something with data - we have to get data!
Akka gives us built-in support for WebSockets via akka-http. Let's use it to construct a request flow.
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

// Source address (no trailing slash; we append "/$pair" below)
val uri = "wss://api.gemini.com/v1/marketdata"
val pair = "ethusd"
// Constructing the flow for ethusd
val requestFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
  Http().webSocketClientFlow(WebSocketRequest(s"$uri/$pair"))
The requestFlow produces an output of type Message, which represents a message in the context of the WebSocket protocol.
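To see the flow in action for a single pair, a minimal sketch could plug it between a trigger Source and a printing Sink (assuming an implicit ActorSystem and Materializer in scope; the value names are illustrative):

import akka.stream.scaladsl.{Keep, Sink, Source}

// Keep the connection open by never completing the promise materialized by Source.maybe
val (upgradeResponse, closed) = Source.maybe[Message]
  .viaMat(requestFlow)(Keep.right)         // keep the Future[WebSocketUpgradeResponse]
  .toMat(Sink.foreach(println))(Keep.both) // and the Future[Done] of the sink
  .run()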
An Akka stream usually consists of Source ~> Flow ~> Sink:
[Image: Source ~> Flow ~> Sink pipeline diagram, taken from opencredo.com]
In our case, we would like to have our event stream represented by a Source, for example as a Source[Event]. Based on that, we can compose flows that extract/filter/aggregate event information and send it to a custom Sink that finishes our stream, e.g. by dumping the data into a database or forwarding it to another service.
Another important point is that the Gemini API supports a market data stream for three different currency pairs.
From | To | Name |
---|---|---|
Ethereum | USD | ethusd |
Bitcoin | USD | btcusd |
Ethereum | Bitcoin | ethbtc |
We will combine them inside one Source to avoid boilerplate code as much as possible.
// Define our Source
val ourSource = Source.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // Create a request flow for every currency pair available at Gemini
  val requests = CurrencyPairs.values.map { currencyPair =>
    Http().webSocketClientFlow(WebSocketRequest(s"$uri/$currencyPair"))
  }
  // Source with an empty Promise[Message] that keeps the connection open
  val source = Source.maybe[Message]
  // Create a merge junction that combines the request flows
  val merge = b.add(Merge[Message](requests.size))
  // Merge the flows
  requests.zipWithIndex.foreach { case (flow, i) =>
    source ~> flow ~> merge.in(i)
  }
  // Return the merge outlet inside a SourceShape
  SourceShape(merge.out)
})
Ok, let's look at the above statement in detail:
- We are creating a Source from a Graph described with GraphDSL, a domain-specific language in Akka that lets you express your processing as a graph.
- We create a Flow[Message, Message, Future[WebSocketUpgradeResponse]] for each currency pair.
- We declare a Source.maybe[Message] as a kind of trigger for our WebSocket flows. It materializes a Promise[Option[Message]]; as long as we do not complete that promise, the stream will not complete and we keep listening to incoming messages. This technique provides us with an infinite incoming stream (see the sketch after this list).
- We combine the currency pair flows from step 2 in a so-called junction. The junction of type Merge[In] combines N inputs into 1 output.
- We define the output of the merge as the output of the Source.
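The Source.maybe trigger is easiest to see in isolation. A minimal sketch (element type and names are illustrative; it assumes the same implicits as above):

// Source.maybe materializes a Promise[Option[T]] that we can complete later
val (promise, done) = Source.maybe[Int]
  .toMat(Sink.foreach(println))(Keep.both)
  .run()
// The stream stays open until we complete the promise:
promise.success(None) // completes the stream without emitting an element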
Get the data model straight
To work on data - we have to interpret data!
Now that we have the event stream, we should lift the incoming Message into an ADT. Gemini sends us JSON, so next we define our ADT by simply writing types that mirror the responses. Let's have a brief look at how it might look:
// Represents one Message
case class GeminiEvent(eventId: Long, events: Seq[GeminiEventDelta],
                       currencyPair: Option[CurrencyPair] = None
                      ) extends MarketEvent

// Marker for the different types of delta events.
sealed trait GeminiEventDelta {
  // Gemini's event type marker
  val eventType: GeminiEventType
}

// An example of what a delta event looks like
case class ChangeEvent(
  delta: BigDecimal,
  price: BigDecimal,
  reason: GeminiEventReason,
  remaining: BigDecimal,
  side: GeminiSide
) extends GeminiEventDelta {
  override val eventType: GeminiEventType = GeminiEventTypes.change
}
// All type definitions can be found in the repo:
// https://github.com/allquantor/scemini/blob/master/src/main/scala/io.allquantor.scemini/adt/gemini/GeminiEvents.scala
Events also contain some constant expressions that in our implementation are modeled with case objects. An implementation with case objects instead of scala.Enumeration is generally more flexible and typesafe, since Enumeration values lose their type information to erasure after compilation.
object GeminiEventTypes extends Stringable {
  sealed trait GeminiEventType
  case object trade extends GeminiEventType
  case object change extends GeminiEventType
  case object auction_indicative extends GeminiEventType
  case object auction_open extends GeminiEventType
  case object auction_result extends GeminiEventType
  case object geminiTypeDefault extends GeminiEventType

  val values = Set(trade, change, auction_indicative, auction_open, auction_result)
  override type T = GeminiEventType
  override val defaultValue: GeminiEventType = geminiTypeDefault
}
// All type definitions can be found in the repo:
// https://github.com/allquantor/scemini/blob/master/src/main/scala/io.allquantor.scemini/adt/gemini/GeminiConstants.scala
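The Stringable trait referenced above lives in the repo; a minimal sketch of what such a helper could look like (this version is an assumption for illustration, not the repo's actual code):

trait Stringable {
  type T
  def values: Set[T]
  def defaultValue: T
  // Map a raw string onto its case object, falling back to the default
  def fromString(s: String): T =
    values.find(_.toString == s).getOrElse(defaultValue)
}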
Lift the Data
What have we done so far? We subscribed to the stream data and know what the model looks like. What is missing is the Message => GeminiEvent part: making a typed value out of the incoming message.
There are numerous libraries in Scala that work with JSON. Although akka has a built-in integration with spray-json, I will use circe, which is a pretty neat purely functional library for working with JSON.
Circe handles the decoding of JSON the same way as almost all modern JSON libraries: by using ad hoc polymorphism. We have to provide implicit typeclass instances in the scope of the JSON deserialization.
This is how it looks:
trait GeminiMarketReads {
  // Parsing recursion entry point.
  implicit val eventRead: Decoder[GeminiEvent] = (c: HCursor) => for {
    id <- c.downField("eventId").as[Long]
    events <- c.downField("events").as[Seq[GeminiEventDelta]]
  } yield GeminiEvent(id, events)

  // Identify the event type and run the appropriate reader.
  implicit val eventDeltaRead: Decoder[GeminiEventDelta] = (c: HCursor) => {
    // Picked up by helpers in the repo that run the chosen Decoder against this cursor.
    implicit val cursor = c
    // `.to` is a small helper from the repo (not circe API); fromString maps the raw string onto its case object.
    val eventType = c.downField("type").to(GeminiEventTypes.fromString)
    eventType.flatMap {
      case GeminiEventTypes.change => changeEventRead
      case GeminiEventTypes.trade => tradeEventRead
      case GeminiEventTypes.auction_open => auctionOpenRead
      case GeminiEventTypes.auction_indicative => auctionIndicativeRead
      case GeminiEventTypes.auction_result => auctionResultRead
      case _ => Decoder.failedWithMessage("Event type is unknown, JSON could not be decoded")(c)
    }
  }
  // All decoders can be found in the repo:
  // https://github.com/allquantor/scemini/blob/master/src/main/scala/io.allquantor.scemini/materialization/GeminiMarketReads.scala
}
We read the eventId and the event type, and forward the c: HCursor to the appropriate decoder instance. A Decoder yields an instance of Either[circe.Error, T], where T is the expected result type. The decoding is side-effect free; any parsing/decoding error is encoded as a circe.Error and contains the sequence of actions performed on your JSON before the failure, so you get a good understanding of where and why it failed.
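A small sketch of what such a failure looks like in practice (the input is illustrative; it assumes the decoders above are in scope):

import io.circe.parser.parse

val bad = """{"eventId": "not-a-number", "events": []}"""
parse(bad).flatMap(_.as[GeminiEvent]) match {
  case Left(err)    => println(err.getMessage) // includes the failing cursor path, e.g. DownField(eventId)
  case Right(event) => println(event)
}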
Integrating parsing
Let's define a Flow of type Flow[(Message, CurrencyPair), Either[circe.Error, GeminiEvent], NotUsed], where each input element is a tuple of a WebSocket message and the corresponding CurrencyPair, and transform it into the event type of our model.
private val lifting = {
  // Define a separate thread pool for CPU-intensive operations.
  val CPUec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
  // Helper to avoid code duplication
  implicit class Parser(s: String) {
    def transform(implicit c: CurrencyPair) = parse(s) // io.circe.parser.parse
      .flatMap(_.as[GeminiEvent])                      // decode into our ADT
      .map(_.copy(currencyPair = Some(c)))             // attach the currency pair
  }
  Flow[(Message, CurrencyPair)].collect {
    case (m, c) =>
      implicit val currencyPair: CurrencyPair = c
      (m: @unchecked) match {
        // Use the dedicated pool for potentially CPU-intensive parsing
        case TextMessage.Strict(msg) => Future(msg.transform)(CPUec)
        // Fold a streamed message into one string before parsing
        case TextMessage.Streamed(stream) => stream.limit(100)
          .completionTimeout(5000.millis)
          .runFold("")(_ + _).map(_.transform)(CPUec)
      }
  }.mapAsync(4)(identity)
}
A WebSocket TextMessage has two subtypes:
- Strict: the message data is already available as a whole.
- Streamed: the data is a Source that streams the data as it comes in.
Gemini uses both of them: Streamed for the potentially large initial JSON that mirrors the entire state of the order book, and Strict for the smaller update events.
As soon as we have the message, we parse it into circe's internal AST structure and then decode it into our model. Parsing JSON can be CPU intensive; to ensure that CPU and IO tasks do not stand in each other's way, we use a separate fixed thread pool exclusively for the decoding operations. Since our flow produces a Future per element, we use mapAsync to process the tasks concurrently and pass the results downstream.
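The same pattern in isolation, stripped of the Gemini specifics (a sketch; the flow and its work function are illustrative):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Dedicated pool so CPU-bound work does not starve the default dispatcher
val cpuPool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

// mapAsync(4): up to 4 futures in flight, results still emitted in upstream order
val parseFlow: Flow[String, Int, NotUsed] =
  Flow[String].mapAsync(4)(s => Future(s.length)(cpuPool))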
Put everything together
Let's consolidate our implementation inside a class.
class GeminiMarkets(currencyPairs: Seq[CurrencyPair],
                    uri: String)(implicit system: ActorSystem)
  extends ExchangePlatformClient with GeminiMarketReads {

  private implicit val mat = ActorMaterializer() // needed to fold streamed messages
  // Separate thread pool for the CPU-intensive decoding work
  private val CPUec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  private val requests = currencyPairs.map(pair =>
    (Http().webSocketClientFlow(WebSocketRequest(s"$uri/$pair")), pair))

  private val lifting = {
    implicit class Parser(s: String) {
      def transform(implicit c: CurrencyPair) = parse(s)
        .flatMap(_.as[GeminiEvent])
        .map(_.copy(currencyPair = Some(c)))
    }
    Flow[(Message, CurrencyPair)].collect {
      case (m, c) =>
        implicit val currencyPair: CurrencyPair = c
        (m: @unchecked) match {
          case TextMessage.Strict(msg) => Future(msg.transform)(CPUec)
          case TextMessage.Streamed(stream) => stream.limit(100)
            .completionTimeout(5000.millis)
            .runFold("")(_ + _).map(_.transform)(CPUec)
        }
    }.mapAsync(4)(identity)
  }

  lazy val source = Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val source = Source.maybe[Message]
    val merge = b.add(Merge[(Message, CurrencyPair)](requests.size))
    requests.zipWithIndex.foreach { case ((flow, c), i) =>
      source ~> flow.map((_, c)) ~> merge.in(i)
    }
    SourceShape(merge.out.via(lifting).outlet)
  })
}

And this is how we wire it up and run it:

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer.create(system)
implicit val ec = system.dispatcher
val logger = Logging.getLogger(system, this)

lazy val supervisionStrategy: Supervision.Decider = {
  case ex: Exception =>
    logger.error(s"Error in graph processing with ${ex.getStackTrace.mkString(",")}")
    Supervision.Resume
}

val currencyPairs = Seq(CurrencyPairs.ethusd, CurrencyPairs.btcusd)
// Use the abstract class to hide some instance creation logic
val client = ExchangePlatformClient.asGeminiClient(currencyPairs)
  .source
  // Define the recovery strategy for exceptions thrown inside the source
  .withAttributes(ActorAttributes.supervisionStrategy(supervisionStrategy))
client.to(Sink.foreach(println)).run()
Besides the consolidation, this code demonstrates an approach to the failure handling problem defined above.
We implemented a supervisionStrategy: Supervision.Decider that decides how to deal with exceptions thrown inside the stream. In our case, the decider logs the stack trace and resumes stream processing, i.e. emits the next element. Akka offers three ways to react to exceptions: Stop, Resume and Restart; please check the documentation to find a strategy that fits your needs.
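As a sketch, a decider can also dispatch on the exception type (the mapping below is illustrative, not a recommendation):

val decider: Supervision.Decider = {
  case _: java.util.concurrent.TimeoutException => Supervision.Restart // drop internal stage state, keep going
  case _: IllegalArgumentException              => Supervision.Resume  // skip the offending element
  case _                                        => Supervision.Stop    // anything else tears the stream down
}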
Summary
As a result, we have implemented a Source of market events that can now be used for advanced analytical tasks. We have shown that the usual set of problems that comes with streaming events from an external source can be successfully mitigated with the toolset akka-streams provides.
Now we could use akka's flow processing functionality to extract the data we are interested in. A simple example, if we are only interested in trade events:
val dumpIntoDataBaseSink = Sink.foreach(...)

source.filter(_.isRight) // drop elements that failed to decode
  .map(_.right.get)      // safe here, we kept only Rights
  .map(_.events.filter(_.eventType == GeminiEventTypes.trade))
  .to(dumpIntoDataBaseSink)
  .run()
We can combine it with streams from other exchanges, or apply advanced techniques like windowing to get time-based information.
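For instance, a simple time-based aggregation could use groupedWithin (a sketch; the counting logic is illustrative):

// Count trade deltas per 10-second window (at most 1000 elements per batch)
source.collect { case Right(event) => event }
  .mapConcat(_.events.filter(_.eventType == GeminiEventTypes.trade).toList)
  .groupedWithin(1000, 10.seconds)
  .map(batch => s"${batch.size} trades in the last window")
  .to(Sink.foreach(println))
  .run()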
The code is available on GitHub. Thanks for reading and happy hAkking!