Je suis en train d'écrire un morceau de code qui n'suivantes:-
- Lit un gros fichier csv à partir de la source à distance comme le s3.
- Traiter le fichier à l'enregistrement par enregistrement.
- Envoyer une notification à l'utilisateur
- Écrire la sortie dans un emplacement distant
Enregistrement des échantillons en entrée csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Mon entrée classe de cas de ce qui constitue un record dans l'entrée csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Enregistrement des échantillons en sortie csv (qui doit être écrit):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Ma sortie de la classe de cas de ce qui constitue un record dans l'entrée csv:
case class OutputRecord(recordId: String, name: String, designation: String)
La lecture d'un enregistrement à l'aide d'akka flux csv (utilise Alpakka réactif s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Maintenant, j'ai une fonction pour traiter les dossiers:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Fonction pour écrire le OutputRecord csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Fonction pour envoyer un email de notification:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Couture tous ensemble
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
Sur la Ligne 15 et 16, je reçois une erreur, je suis en mesure d'ajouter la Ligne 15 ou 16, mais pas les deux depuis deux notify
& writeOutput
besoins outputRecord
. Une fois notifier est appelé, je lâche mon outputRecord
.
Est il possible que je peux ajouter à la fois notify
et writeOutput
de même graphique?
Je ne suis pas à la recherche pour une exécution parallèle que je veux pour le premier appel notify
et alors seulement writeOutput
. Ce n'est donc pas utile: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Le cas d'utilisation semble très simple pour moi, mais je ne suis pas en mesure de trouver une solution propre.