Que faire si une de Kafka consommateur gère un message trop long? Va Kafka reconduire cette partition à un autre consommateur, et le message sera doublement traitée?

0

La question

Supposons que Kafka, 1 partition, 2 consumers.(2ème consommateur est en veille)

Supposons que le 1er on a consommé un message, va à manipuler avec 3 autres services, et tout à coup la colle sur l'un d'entre eux et de manquer le Kafka du délai d'attente.

Va Kafka renommer la partition de la 2e à la consommation et le message sera doublement traitée (à supposer que le 1er on finit par réussir)?

1

La meilleure réponse

1

Que faire si une de Kafka consommateur gère un message trop long? Va Kafka reconduire cette partition à un autre consommateur, et le message sera doublement traitée?

Oui, c'est correct. Si Kafka consommateur prend trop de temps pour traiter un message et ultérieure poll() est retardée, Kafka re-nommer cette partition à un autre consommateur, et le message sera traité à nouveau (et encore).

Pour plus de clarté, nous avons d'abord besoin de décider et de définir Combien de temps est trop long?'.

Il est défini par la propriété max.poll.interval.ms. À partir de la docs,

Le délai maximum entre les appels de poll() lors de l'utilisation d'un groupe de consommateurs de gestion. Cette place la limite supérieure de la quantité de temps que le consommateur puisse être inactif avant de l'extraction de plusieurs enregistrements. Si poll() n'est pas appelé avant l'expiration de ce délai, le client est considéré comme échoué et que le groupe se rééquilibrer afin de réaffecter les partitions d'un autre membre.

Groupe de consommateurs est rééquilibré si il n'y a pas d'appels à la poll() à l'intérieur de ce temps.

Il y a une propriété auto.commit.interval.ms. La validation automatique des décalages de contrôle sera appelée qu'au cours de ce sondage, il vérifie si le temps écoulé est supérieur à la configuration de la validation automatique de l'intervalle de temps et si le résultat est oui, le décalage est engagé.

Si Kafka consommateur est de prendre trop de temps pour traiter les dossiers, puis par la suite poll() est également retardée et les décalages de retour sur le dernier sondage, les() ne sont pas validées. Si rééquilibrer qui se passe à ce moment, le nouveau consommateur, client affecté à cette partition commence à traiter les messages.

Le groupe des consommateurs de rééquilibrer et de la partition résultante de la réaffectation peut être évité en augmentant cette valeur. Cela permettra d'accroître le permis intervalle entre les sondages et donner plus de temps aux consommateurs pour gérer l'enregistrement(s) retourné à partir de poll(). Les consommateurs seulement de rejoindre le rééquilibrage à l'intérieur de l'appel d'interroger, de manière à augmenter max intervalle de sondage sera également retarder le groupe de rééquilibre.

Il y a un problème de plus en augmentant max intervalle d'interrogation à une grande valeur. Si le consommateur meurt pour une autre raison, il prend plus de temps que le configuré max.poll.interval.ms intervalle de détecter la panne.

session.timeout.ms et heartbeat.interval.ms sont disponibles dans ce cas de détecter l'échec total comme plus tôt possible.

Pour plus de détails sur ces paramètres:

Veuillez noter que les valeurs configurées pour session.timeout.ms doit être dans la plage admissible, tel que défini dans la configuration du broker par les propriétés

  • groupe.min.session.délai d'attente.ms
  • groupe.max.session.délai d'attente.ms

Sinon, à la suite d'exception sera levée lors du démarrage du client consommateur.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Mise à jour: Pour éviter de manipuler à nouveau les messages

Il existe une autre méthode dans la classe KafkaConsumer commitAsync() pour déclencher commettre des décalages de fonctionnement.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

Pour plus de détails sur commitSync() et commitAsync(), veuillez consulter ce fil de discussion

Commettre un décalage manuellement est une action de dire que le décalage a été traitée de sorte que le Kafka de ne pas envoyer les dossiers engagés pour la même partition. Lorsque les décalages sont engagés manuellement, il est important de noter que si le consommateur meurt avant le traitement des dossiers pour une raison quelconque, il y a une chance pour que ces dossiers ne seront pas traités à nouveau.

2021-11-25 07:04:25

Merci, c'est clair. Est-il possible d'éviter la deuxième manipulation?
J.J. Beam

@J. J. Faisceau de réponse mis à jour avec des liens et de l'échantillon
arunkvelu

Dans d'autres langues

Cette page est dans d'autres langues

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................