Flink (sur le panneau) de consommer des données à partir de Kafka (sur le panneau)

0

La question

J'ai Flink (gestionnaire de tâches et d'emploi manager) et Kafka exécute en tant que docker images sur mon mac.
J'ai créé un Flink travail et déployé. Le travail utilise FlinkKafkaConsumer et FlinkKafkaProducer et devrait consommer de kafka et de produire de kafka.

Ressemble à la "bootstrap.servers" J'utilise (kafka:9092) n'a pas de sens pour Flink qui échoue avec:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

docker ps résultats

CONTAINER ID   IMAGE                    COMMAND                  CREATED        STATUS        PORTS                                                NAMES
b0cb56cb1941   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 8081/tcp                                   taskmanager
0c29ca57a5bb   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 0.0.0.0:8081->8081/tcp                     jobmanager
c0e2a3ffc8e0   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   29 hours ago   Up 29 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
b9ae03f8f026   wurstmeister/kafka       "start-kafka.sh"         29 hours ago   Up 29 hours   0.0.0.0:9092->9092/tcp                               kafka

docker réseau de ls résultats

NETWORK ID     NAME            DRIVER    SCOPE
1c09bfc0a2e9   bridge          bridge    local
268c4521f1de   flink-network   bridge    local
2479a63017ab   host            host      local
bb11245fba78   kafka_default   bridge    local
245dcc1e0d76   none            null      local

J'ai également exécuter:

docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka 

et utilisé l'IP, j'ai eu comme bootstrap.les serveurs (172.18.0.2:9092)

Mise à jour #1

J'utilise un sous-ensemble de docker-composer.yml (Par Martijn Visser)

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'

  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

docker ps

CONTAINER ID   IMAGE                                   COMMAND                  CREATED          STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1

Quelle est la bonne bootstrap.les serveurs de la valeur à être utilisé?
Comment puis-je faire Flink "voir" Kafka?

apache-flink apache-kafka docker
2021-11-23 17:16:10
1

La meilleure réponse

1

Très probablement, vous aurez à configurer KAFKA_ADVERTISED_LISTENERS et point Flink à la valeur configurée. Par exemple, dans mon Panneau de configuration à https://github.com/MartijnVisser/flink-only-sql j'ai la configuration suivante dans mon menu fixe composer fichier:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  jobmanager:
    image: flink:1.14.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

Je peux alors utiliser broker:29092 comme Kafka bootstrap serveurs.

2021-11-23 17:42:17

Martin. J'ai pris un sous-ensemble du panneau de composer et de s'en servir. (Voir Mise À Jour #1). Maintenant je me connecte sur le courtier conteneur (dans le but de créer des topics, etc) et je ne peux pas trouver le kafka*.sh scripts. Où puis-je les trouver?
balderman

Salut Martijn Visser. J'ai réussi à faire le travail de commencer (à l'aide de broker:29092). Le point est que le consommateur ne reçoit pas de messages. Je sais que j'ai envoyer des messages dans le sujet, car j'ai producteur/consommateur scripts qui fonctionne. De toute façon - j'accepte votre réponse.
balderman

Salut @balderman - Êtes-vous toujours un message d'erreur ou juste une réponse vide? Vous pouvez utiliser soit dans la liste de la rubrique ou de voir les données sur un sujet: docker-compose exec broker kafka-topics \ --list \ --bootstrap-server broker:29092 docker-compose exec broker kafka-console-consumer \ --bootstrap-server broker:29092 \ --topic notifications \ --from-beginning
Martijn Visser

J'ai créé un nouveau post qui décrivent la situation actuelle. Voir stackoverflow.com/questions/70100813/... - merci
balderman

Dans d'autres langues

Cette page est dans d'autres langues

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................