Akka Stream websocket'i sürekli tüketiyor

0

Soru

Scala ve Akka Akışında yeniyim ve bir websocket'ten JSON Dize mesajları almaya ve bunları bir Kafka konusuna itmeye çalışıyorum.

Şimdilik sadece "ws'den mesaj al" kısmı üzerinde çalışıyorum.

Websocket'ten gelen mesajlar şöyle görünür :

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

Bu JSON mesajını birden çok mesaja bölmek istiyorum :

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

Ve sonra bu iletilerin her birini bir kafka konusuna itin.

İşte şimdiye kadar başardıklarım :

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

Beklenen çıktı Json mesajını alıyorum ama bu yapımcıyı GraphDSL kullanmak gibi daha "Akka-ish" tarzında yazıp yazamayacağımı merak ediyordum. Bu yüzden birkaç sorum olacak :

  • Bir GraphDSL kullanarak bir Websocket'i sürekli olarak tüketmek mümkün mü ? Eğer evet ise, bana bir örnek gösterebilir misiniz lütfen ?
  • Ws'yi bir GraphDSL kullanarak tüketmek iyi bir fikir midir ?
  • Alınan Json Mesajını kafka'ya göndermeden önce yaptığım gibi ayrıştırmalı mıyım ? Yoksa daha düşük gecikme süresi için olduğu gibi göndermek daha mı iyi ?
  • Mesajı Kafka'ya gönderdikten sonra Apache Storm kullanarak tüketmeyi planlıyorum, bu iyi bir fikir mi ? Yoksa Akka ile mi kalmalıyım ?

Beni okuduğunuz için teşekkürler, Saygılar, Arès

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

En iyi cevabı

1

Bu kod çok Akka-ish: scaladsl tıpkı Akka gibi GraphDSL veya bir özel uygulama GraphStage. Tek nedeni, IMO/E, gitmek için GraphDSL grafiğin gerçek şekli kolayca ifade edilemezse scaladsl.

Ben şahsen bir tanımlama yoluna giderdim CoinPrice modeli açık hale getirmek için sınıf

case class CoinPrice(coin: String, price: BigDecimal)

Ve sonra bir Flow[Message, CoinPrice, NotUsed] 1 gelen mesajı sıfıra veya daha fazlasına ayrıştırır CoinPrices. (Burada Play JSON kullanarak) gibi bir şey:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

Mesajdaki json'ların boyutunun ne olduğuna bağlı olarak, JSON ayrıştırması ile ayıklama arasında zaman uyumsuz bir sınıra izin vermek için bunu farklı akış aşamalarına ayırmak isteyebilirsiniz CoinPrices. Örneğin,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

Yukarıdakilerde, her iki taraftaki aşamalar async sınır, ayrı aktörlerde ve dolayısıyla muhtemelen eşzamanlı olarak yürütülür (eğer yeterli CPU çekirdeği varsa vb.).), aktörlerin mesajları koordine etmesi ve değiş tokuş etmesi için ekstra ek yük pahasına. Bu ekstra koordinasyon / iletişim yükü (krş . Gunther'in Evrensel Ölçeklenebilirlik Yasası), yalnızca JSON nesneleri yeterince büyükse ve yeterince hızlı bir şekilde gelirse (bir öncekinin işlemeyi bitirmesinden önce sürekli olarak gelirse) buna değecektir.

Amacınız, program duruncaya kadar websocket'i tüketmekse, yalnızca kullanmayı daha net bulabilirsiniz Source.never[Message].

2021-11-21 12:42:30

Teşekkürler cevap için, bunu çok net bir şekilde, ben sadece bir soru tho. Cevabımı farklı akış aşamalarına nasıl ayırabilirim ? Bana küçük bir örnek gösterebilir misin lütfen ? Yoksa beni dokümantasyonun uygun kısmına mı yönlendireceksin ?
Arès

Diğer dillerde

Bu sayfa diğer dillerde

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................