Akka akış Girişi (`In`) Çıkış olarak ('Out`)

0

Soru

Aşağıdakileri yapan bir kod parçası yazmaya çalışıyorum:-

  1. S3 gibi uzak bir kaynaktan büyük bir csv dosyası okur.
  2. Dosya kaydını kayda göre işleyin.
  3. Kullanıcıya bildirim gönder
  4. Çıktıyı uzak bir konuma yazma

Giriş csv'sinde örnek kayıt:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Giriş csv'sinde bir kaydı temsil eden giriş durum sınıfım:

case class InputRecord(recordId: String, name: String, salary: Long)

Çıktı csv'sinde örnek kayıt (yazılması gereken):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Giriş csv'sinde bir kaydı temsil eden çıktı durum sınıfım:

case class OutputRecord(recordId: String, name: String, designation: String)

Akka stream csv kullanarak bir kaydı okuma (Alpakka reactive s3 kullanır https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Şimdi kayıtları işlemek için bir işlevim var:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Çıktı kaydını csv olarak yazma işlevi

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

E-posta bildirimi gönderme işlevi:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Hepsini birbirine dikmek

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

15. ve 16. Satırda bir hata alıyorum, ya 15. Satırı ya da 16. Satırı ekleyebiliyorum ama her ikisinden de değil notify & writeOutput ihtiyaçlar outputRecord. Haber verildikten sonra kaybederim. outputRecord.

İkisini de ekleyebileceğim bir yol var mı notify ve writeOutput aynı grafiğe mi?

İlk çağırmak istediğim gibi paralel yürütme aramıyorum notify ve sonra sadece writeOutput. Yani bu yardımcı olmuyor: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Kullanım durumu bana çok basit geliyor ama bazıları nasıl temiz bir çözüm bulamıyorum.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

En iyi cevabı

1

Çıktınotify is a PushResult ama giriş writeOutput oluyor ByteString. Bunu değiştirdikten sonra derlenecektir. İhtiyacın olabilir ByteString gelen aynı olsun OutputRecord.

BTW, sağladığınız örnek kodda benzer bir hata var readCSV ve process.

2021-11-24 03:36:16

Diğer dillerde

Bu sayfa diğer dillerde

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................