Scala ve Akka Akışında yeniyim ve bir websocket'ten JSON Dize mesajları almaya ve bunları bir Kafka konusuna itmeye çalışıyorum.
Şimdilik sadece "ws'den mesaj al" kısmı üzerinde çalışıyorum.
Websocket'ten gelen mesajlar şöyle görünür :
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
Bu JSON mesajını birden çok mesaja bölmek istiyorum :
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
Ve sonra bu iletilerin her birini bir kafka konusuna itin.
İşte şimdiye kadar başardıklarım :
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
Beklenen çıktı Json mesajını alıyorum ama bu yapımcıyı GraphDSL kullanmak gibi daha "Akka-ish" tarzında yazıp yazamayacağımı merak ediyordum. Bu yüzden birkaç sorum olacak :
- Bir GraphDSL kullanarak bir Websocket'i sürekli olarak tüketmek mümkün mü ? Eğer evet ise, bana bir örnek gösterebilir misiniz lütfen ?
- Ws'yi bir GraphDSL kullanarak tüketmek iyi bir fikir midir ?
- Alınan Json Mesajını kafka'ya göndermeden önce yaptığım gibi ayrıştırmalı mıyım ? Yoksa daha düşük gecikme süresi için olduğu gibi göndermek daha mı iyi ?
- Mesajı Kafka'ya gönderdikten sonra Apache Storm kullanarak tüketmeyi planlıyorum, bu iyi bir fikir mi ? Yoksa Akka ile mi kalmalıyım ?
Beni okuduğunuz için teşekkürler, Saygılar, Arès