I have Flink (task manager and job manager) and Kafka running as Docker images on my Mac.
I created a Flink job and deployed it.
The job uses FlinkKafkaConsumer and FlinkKafkaProducer; it should consume from Kafka and produce back to Kafka.
It looks like the "bootstrap.servers" value I use (kafka:9092) means nothing to Flink, which fails with:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Results of docker ps:
CONTAINER ID   IMAGE                    COMMAND                  CREATED        STATUS        PORTS                                                NAMES
b0cb56cb1941   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 8081/tcp                                   taskmanager
0c29ca57a5bb   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 0.0.0.0:8081->8081/tcp                     jobmanager
c0e2a3ffc8e0   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   29 hours ago   Up 29 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
b9ae03f8f026   wurstmeister/kafka       "start-kafka.sh"         29 hours ago   Up 29 hours   0.0.0.0:9092->9092/tcp                               kafka
Results of docker network ls:
NETWORK ID     NAME            DRIVER    SCOPE
1c09bfc0a2e9   bridge          bridge    local
268c4521f1de   flink-network   bridge    local
2479a63017ab   host            host      local
bb11245fba78   kafka_default   bridge    local
245dcc1e0d76   none            null      local
I also ran:
docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka
and used the IP I got as bootstrap.servers (172.18.0.2:9092).
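One way to narrow this down, offered as a sketch rather than a confirmed diagnosis: check from inside the Flink container whether the bootstrap host name resolves at all and whether the port accepts a TCP connection. The helper name `check_broker` is made up for illustration, and the script assumes Python 3 is available wherever it runs (the stock Flink image may not ship it).

```python
import socket


def check_broker(host, port, timeout=3.0):
    """Classify a bootstrap address as seen from the machine running this script.

    Returns "unresolvable" if the host name cannot be resolved here,
    "unreachable" if it resolves but the TCP connection fails,
    and "reachable" if a TCP connection can be opened.
    """
    try:
        # Name resolution only; no connection is attempted yet.
        socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return "unresolvable"
    try:
        # Plain TCP connect; this does not speak the Kafka protocol.
        with socket.create_connection((host, port), timeout=timeout):
            return "reachable"
    except OSError:
        return "unreachable"
```

Calling `check_broker("kafka", 9092)` and `check_broker("172.18.0.2", 9092)` from the task manager container would show whether the name or the IP is visible from there. Even a "reachable" result only proves the port is open: the broker still answers the metadata request with its advertised address, and the client must be able to reach that address too.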
Update #1
I am using a subset of the docker-compose.yml (by Martijn Visser):
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091
  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'
  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
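To make the two advertised listeners in this file easier to reason about, here is a small sketch that splits the KAFKA_ADVERTISED_LISTENERS string and filters out loopback hosts. The function names `parse_advertised_listeners` and `container_candidates` are invented for illustration. The filter rests on one assumption: inside another container, `localhost` points at that container itself, so a listener advertised as `localhost:9092` would only be useful to clients running on the Mac host.

```python
LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}


def parse_advertised_listeners(value):
    """Map listener name -> (host, port) from a KAFKA_ADVERTISED_LISTENERS string."""
    listeners = {}
    for entry in value.split(","):
        # Each entry looks like NAME://host:port
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def container_candidates(value):
    """Return host:port strings that are not advertised on a loopback host."""
    return [
        f"{host}:{port}"
        for host, port in parse_advertised_listeners(value).values()
        if host not in LOOPBACK_HOSTS
    ]


# Value copied from the broker service in the compose file above.
ADVERTISED = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
print(container_candidates(ADVERTISED))  # ['broker:29092']
```

The remaining address, broker:29092, is the same one that schema-registry and rest-proxy are configured with in this file, which suggests (but does not prove) that it is also the value to try for the Flink job.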
docker ps
CONTAINER ID   IMAGE                                   COMMAND                  CREATED          STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1
What is the correct bootstrap.servers value to use?
How can I make Flink "see" Kafka?