J'ai Flink (JobManager et TaskManager) et Kafka qui s'exécutent en tant que conteneurs Docker sur mon Mac.
J'ai créé un job Flink et je l'ai déployé.
Le job utilise FlinkKafkaConsumer
et FlinkKafkaProducer
et devrait consommer depuis Kafka et produire vers Kafka.
Il semble que la valeur de "bootstrap.servers"
que j'utilise (kafka:9092)
n'ait pas de sens pour Flink, qui échoue avec :
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Résultat de docker ps :
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b0cb56cb1941 flink:latest "/docker-entrypoint.…" 5 hours ago Up 5 hours 6123/tcp, 8081/tcp taskmanager
0c29ca57a5bb flink:latest "/docker-entrypoint.…" 5 hours ago Up 5 hours 6123/tcp, 0.0.0.0:8081->8081/tcp jobmanager
c0e2a3ffc8e0 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 29 hours ago Up 29 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp zookeeper
b9ae03f8f026 wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 29 hours 0.0.0.0:9092->9092/tcp kafka
Résultat de docker network ls :
NETWORK ID NAME DRIVER SCOPE
1c09bfc0a2e9 bridge bridge local
268c4521f1de flink-network bridge local
2479a63017ab host host local
bb11245fba78 kafka_default bridge local
245dcc1e0d76 none null local
J'ai également exécuté :
docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka
et utilisé l'adresse IP obtenue comme valeur de bootstrap.servers (172.18.0.2:9092).
Mise à jour #1
J'utilise un sous-ensemble du fichier docker-compose.yml (de Martijn Visser) :
# Docker Compose stack: ZooKeeper, one Kafka broker, Schema Registry,
# REST proxy, and a Flink session cluster (one JobManager, one TaskManager).
# No custom networks are declared here, so the services are expected to share
# the project's default Compose network and resolve each other by service name.
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Two advertised endpoints:
      #   PLAINTEXT      -> broker:29092   (used by the other services in this
      #                                     file, e.g. schema-registry, rest-proxy)
      #   PLAINTEXT_HOST -> localhost:9092 (matches the published 9092 port)
      # NOTE(review): a client running inside another container (such as a
      # Flink job) should presumably use broker:29092 as bootstrap.servers,
      # since "localhost" there resolves to the client container itself.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      # Quoted like every other port mapping in this file, so the value is
      # always read as a string.
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'

  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      # FLINK_PROPERTIES is passed as one multi-line environment entry; the
      # literal block scalar keeps each property on its own line.
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      # Same multi-line FLINK_PROPERTIES entry as the jobmanager, plus the
      # number of task slots for this TaskManager.
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2f465a0a4129 confluentinc/cp-kafka-rest:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 0.0.0.0:8082->8082/tcp rest-proxy
eb25992c47d0 confluentinc/cp-schema-registry:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 8081/tcp, 0.0.0.0:8091->8091/tcp schema-registry
1081319da296 confluentinc/cp-kafka:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp broker
de9056ee250c flink:1.13.2-scala_2.12 "/docker-entrypoint.…" 30 minutes ago Up 30 minutes 6123/tcp, 8081/tcp kafka_taskmanager_1
b38beefc35e3 confluentinc/cp-zookeeper:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
e6db23fa8842 flink:1.13.2-scala_2.12 "/docker-entrypoint.…" 30 minutes ago Up 30 minutes 6123/tcp, 0.0.0.0:8081->8081/tcp kafka_jobmanager_1
Quelle est la bonne valeur de bootstrap.servers à utiliser ?
Comment faire en sorte que Flink « voie » Kafka ?