Kafka'dan veri tüketmek için (docker'da) yanıp sönme (docker'da)

0

Soru

Mac'imde docker görüntüleri olarak çalışan Flink (görev yöneticisi ve iş yöneticisi) ve Kafka var.
Bir Flink işi oluşturdum ve dağıttım. İş kullanır FlinkKafkaConsumer ve FlinkKafkaProducer ve kafka'dan tüketmeli ve kafka'ya geri üretmeli.

Görünüşe göre "bootstrap.servers" Kullandığım (kafka:9092) ile başarısız olan Flink için hiçbir anlamı yoktur:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

docker ps sonuçları

CONTAINER ID   IMAGE                    COMMAND                  CREATED        STATUS        PORTS                                                NAMES
b0cb56cb1941   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 8081/tcp                                   taskmanager
0c29ca57a5bb   flink:latest             "/docker-entrypoint.…"   5 hours ago    Up 5 hours    6123/tcp, 0.0.0.0:8081->8081/tcp                     jobmanager
c0e2a3ffc8e0   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   29 hours ago   Up 29 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
b9ae03f8f026   wurstmeister/kafka       "start-kafka.sh"         29 hours ago   Up 29 hours   0.0.0.0:9092->9092/tcp                               kafka

docker network ls sonuçları

NETWORK ID     NAME            DRIVER    SCOPE
1c09bfc0a2e9   bridge          bridge    local
268c4521f1de   flink-network   bridge    local
2479a63017ab   host            host      local
bb11245fba78   kafka_default   bridge    local
245dcc1e0d76   none            null      local

Ayrıca işletiyorum :

docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka 

ve aldığım IP'yi bootstrap olarak kullandım.hizmetçiler (172.18.0.2:9092)

Güncelleme #1

Docker-compose'un bir alt kümesini kullanıyorum.yml (Martijn Visser tarafından)

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'

  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

docker ps

CONTAINER ID   IMAGE                                   COMMAND                  CREATED          STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   30 minutes ago   Up 30 minutes   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   30 minutes ago   Up 30 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1

Doğru önyükleme nedir.kullanılacak sunucu değeri?
Flink'in Kafka'yı "görmesini" nasıl sağlayabilirim?

apache-flink apache-kafka docker
2021-11-23 17:16:10
1

En iyi cevabı

1

Büyük olasılıkla yapılandırmanız gerekecek KAFKA_ADVERTISED_LISTENERS ve Flink'i yapılandırılmış değere getirin. Örneğin, Docker kurulumumda https://github.com/MartijnVisser/flink-only-sql Docker oluşturma dosyamda aşağıdaki yapılandırmaya sahibim:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  jobmanager:
    image: flink:1.14.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

O zaman kullanabilirim broker:29092 kafka önyükleme sunucuları olarak.

2021-11-23 17:42:17

Kırlangıç. Docker kompozisyonunun bir alt kümesini aldım ve kullandım. Güncelleme #1). Şimdi broker kapsayıcısına giriş yapıyorum (konular oluşturmak için vb.) Ve kafka*. sh komut dosyalarını bulamıyorum. Onları nerede bulabilirim?
balderman

Hi Martijn Visser. İşi başlatmayı başardım (kullanarak broker:29092). Mesele şu ki, tüketici herhangi bir mesaj almıyor. Çalışan üretici / tüketici komut dosyalarım olduğu için konuya mesaj gönderdiğimi biliyorum. Her neyse, cevabını kabul ediyorum.
balderman

Merhaba @ balderman-Hala bir hata mesajı mı alıyorsunuz yoksa sadece boş bir yanıt mı alıyorsunuz? Konuyu listelemek veya bir konuyla ilgili verileri görmek için aşağıdakileri kullanabilirsiniz: docker-compose exec broker kafka-topics \ --list \ --bootstrap-server broker:29092 docker-compose exec broker kafka-console-consumer \ --bootstrap-server broker:29092 \ --topic notifications \ --from-beginning
Martijn Visser

Mevcut durumu açıklayan yeni bir yazı oluşturdum. Bakın stackoverflow.com/questions/70100813 / ... - teşekkürler
balderman

Diğer dillerde

Bu sayfa diğer dillerde

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................