Aşağıdakileri yapan bir kod parçası yazmaya çalışıyorum:-
- S3 gibi uzak bir kaynaktan büyük bir csv dosyası okur.
- Dosya kaydını kayda göre işleyin.
- Kullanıcıya bildirim gönder
- Çıktıyı uzak bir konuma yazma
Giriş csv'sinde örnek kayıt:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Giriş csv'sinde bir kaydı temsil eden giriş durum sınıfım:
case class InputRecord(recordId: String, name: String, salary: Long)
Çıktı csv'sinde örnek kayıt (yazılması gereken):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Giriş csv'sinde bir kaydı temsil eden çıktı durum sınıfım:
case class OutputRecord(recordId: String, name: String, designation: String)
Akka stream csv kullanarak bir kaydı okuma (Alpakka reactive s3 kullanır https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Şimdi kayıtları işlemek için bir işlevim var:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Çıktı kaydını csv olarak yazma işlevi
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
E-posta bildirimi gönderme işlevi:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Hepsini birbirine dikmek
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
15. ve 16. Satırda bir hata alıyorum, ya 15. Satırı ya da 16. Satırı ekleyebiliyorum ama her ikisinden de değil notify
& writeOutput
ihtiyaçlar outputRecord
. Haber verildikten sonra kaybederim. outputRecord
.
İkisini de ekleyebileceğim bir yol var mı notify
ve writeOutput
aynı grafiğe mi?
İlk çağırmak istediğim gibi paralel yürütme aramıyorum notify
ve sonra sadece writeOutput
. Yani bu yardımcı olmuyor: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Kullanım durumu bana çok basit geliyor ama bazıları nasıl temiz bir çözüm bulamıyorum.