Akka flux d'Entrée ("Dans") comme en Sortie (`Out`)

0

La question

Je suis en train d'écrire un morceau de code qui n'suivantes:-

  1. Lit un gros fichier csv à partir de la source à distance comme le s3.
  2. Traiter le fichier à l'enregistrement par enregistrement.
  3. Envoyer une notification à l'utilisateur
  4. Écrire la sortie dans un emplacement distant

Enregistrement des échantillons en entrée csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Mon entrée classe de cas de ce qui constitue un record dans l'entrée csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Enregistrement des échantillons en sortie csv (qui doit être écrit):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Ma sortie de la classe de cas de ce qui constitue un record dans l'entrée csv:

case class OutputRecord(recordId: String, name: String, designation: String)

La lecture d'un enregistrement à l'aide d'akka flux csv (utilise Alpakka réactif s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Maintenant, j'ai une fonction pour traiter les dossiers:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Fonction pour écrire le OutputRecord csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Fonction pour envoyer un email de notification:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Couture tous ensemble

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

Sur la Ligne 15 et 16, je reçois une erreur, je suis en mesure d'ajouter la Ligne 15 ou 16, mais pas les deux depuis deux notify & writeOutput besoins outputRecord. Une fois notifier est appelé, je lâche mon outputRecord.

Est il possible que je peux ajouter à la fois notify et writeOutput de même graphique?

Je ne suis pas à la recherche pour une exécution parallèle que je veux pour le premier appel notify et alors seulement writeOutput. Ce n'est donc pas utile: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Le cas d'utilisation semble très simple pour moi, mais je ne suis pas en mesure de trouver une solution propre.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

La meilleure réponse

1

La sortie de notify est un PushResultmais l'entrée de writeOutput est ByteString. Une fois que vous changez que de compiler. Dans le cas où vous avez besoin ByteString, obtenir la même chose de OutputRecord.

BTW, dans l'exemple de code que vous avez fournis, une erreur similaire existe dans readCSV et process.

2021-11-24 03:36:16

Dans d'autres langues

Cette page est dans d'autres langues

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................