Akka çayı'nda iki Akışın Birleştirilmesi

0

Soru

İki Akışı birleştirmeye çalışıyorum ve uygulamamın çıktısını açıklayamıyorum.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

Bu koddan aşağıdaki çıktıyı bekliyorum.

2
3
4
.
.
.
11
10
20
.
.
.
100

Bunun yerine, yalnızca "2" nin yazdırıldığını görüyorum. Lütfen implantasyonumda neyin yanlış olduğunu ve istenen çıktıyı elde etmek için programı nasıl değiştirmem gerektiğini açıklayabilir misiniz?

akka akka-stream scala
2021-10-21 17:29:00
2

En iyi cevabı

3

Akka Stream'in API dokümanlarından:

Concat:

Geçerli akış kullanılabilir bir öğeye sahip olduğunda yayar; Geçerli giriş tamamlanırsa, bir sonrakini dener

Broadcast:

Tüm çıkışlar geri basıncı durdurduğunda ve bir giriş elemanı mevcut olduğunda yayar

İki operatör, nasıl çalıştıkları konusunda bir çatışma olduğu için birlikte çalışmayacaktır -- Concat tüm öğeleri birinden çekmeye çalışır Broadcast'nin çıkışları diğerine geçmeden önce Broadcast tüm çıktıları için talep olmadıkça yaymaz.

İhtiyacınız olan şey için aşağıdakileri kullanarak birleştirebilirsinizconcat yorumcular tarafından önerildiği gibi:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

veya eşdeğer olarak kullanınSource.combine aşağıdaki gibi:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

Kullanım GraphDSL, Source uygulamasının basitleştirilmiş bir sürümüdür.birleştirmek:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

Diğer dillerde

Bu sayfa diğer dillerde

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................