Ya bir Kafka'nın tüketicisi bir mesajı çok uzun süre idare ederse? Kafka bu bölümü başka bir tüketiciye yeniden atayacak ve mesaj iki katına çıkacak mı?

0

Soru

Varsaymak Kafka, 1 partition, 2 consumers.(2. tüketici boşta)

1.mesajın bir mesaj tükettiğini, diğer 3 servisle başa çıktığını ve aniden bunlardan birine yapıştığını ve Kafka'nın zaman aşımını kaçırdığını varsayalım.

Kafka, bölümü 2. tüketiciye yeniden atayacak ve mesaj iki kez işlenecek mi (1. bölümün sonunda başarılı olacağını varsayalım)?

1

En iyi cevabı

1

Ya bir Kafka'nın tüketicisi bir mesajı çok uzun süre idare ederse? Kafka bu bölümü başka bir tüketiciye yeniden atayacak ve mesaj iki katına çıkacak mı?

Evet, bu doğru. Kafka consumer'ın bir iletiyi işlemesi çok uzun sürerse ve sonraki poll() gecikirse, Kafka bu bölümü başka bir tüketiciye yeniden atayacak ve ileti tekrar (ve tekrar) işlenecektir.

Daha anlaşılır olması için, ilk karar ve uzun uzun da Nasıl tanımlamak lazım?'.

Bu özellik tarafından tanımlanır max.poll.interval.ms. Dokümanlardan,

Tüketici grubu yönetimi kullanılırken anket () çağrıları arasındaki maksimum gecikme. Bu, tüketicinin daha fazla kayıt almadan önce boşta kalabileceği süreye bir üst sınır koyar. Poll() bu zaman aşımının sona ermesinden önce çağrılmazsa, tüketici başarısız sayılır ve grup bölümleri başka bir üyeye yeniden atamak için yeniden dengelenir.

Bu süre içinde yoklama() çağrısı yoksa tüketici grubu yeniden dengelenir.

Bir mülk daha var. auto.commit.interval.ms. Otomatik tamamlama uzaklıkları denetimi yalnızca anket sırasında çağrılır-geçen sürenin yapılandırılmış otomatik tamamlama aralığı süresinden büyük olup olmadığını kontrol eder ve sonuç evet ise, uzaklığın kaydedilip kaydedilmediğini kontrol eder.

Kafka consumer'ın kayıtları işlemesi çok uzun sürüyorsa, sonraki poll() çağrısı da ertelenir ve son poll () ' da döndürülen uzaklıklar kaydedilmez. Yeniden dengeleme şu anda gerçekleşirse, bu bölüme atanan yeni tüketici istemcisi iletileri yeniden işlemeye başlayacaktır.

Tüketici grubu yeniden dengelenmesi ve sonuçta ortaya çıkan bölüm yeniden atanması bu değer artırılarak önlenebilir. Bu, anketler arasındaki izin verilen aralığı artıracak ve tüketicilere poll () öğesinden döndürülen kayıt (lar) ı işlemeleri için daha fazla zaman tanıyacaktır. Tüketicilerin sadece arama içinde dengelemesinin ankete katıl, max yoklama aralığı da grubu yeniden dengeler geciktirecek artırır.

Maksimum anket aralığını büyük bir değere yükseltmede bir sorun daha var. Tüketici başka bir nedenle ölürse, yapılandırılandan daha uzun sürer max.poll.interval.ms arızayı tespit etmek için aralık.

session.timeout.ms ve heartbeat.interval.ms bu durumda, toplam arızayı mümkün olduğunca erken tespit etmek için kullanılabilir.

Bu parametreler hakkında daha fazla bilgi için:

Lütfen değerlerin aşağıdakiler için yapılandırıldığını unutmayın session.timeout.ms broker yapılandırmasında özelliklere göre yapılandırıldığı gibi izin verilen aralıkta olmalıdır

  • group.min.session.timeout.ms
  • group.max.session.timeout.ms

Aksi takdirde, tüketici istemcisini başlatırken aşağıdaki istisna atılır.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Güncelleştirme: İletileri yeniden işlemekten kaçınmak için

KafkaConsumer sınıfında başka bir yöntem var commitAsync() commit ofset işlemini tetiklemek için.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

commitSync() ve commitAsync () hakkında daha fazla bilgi için lütfen bu konuya bakın

Bir ofseti manuel olarak işlemek, ofsetin işlendiğini, böylece Kafka'nın aynı bölüm için kaydedilmiş kayıtları tekrar göndermeyeceğini söyleme eylemidir. Ofsetler manuel olarak işlendiğinde, tüketici herhangi bir nedenle kayıtları işlemeden önce ölürse, bu kayıtların tekrar işlenmeme ihtimalinin bulunduğunu unutmamak önemlidir.

2021-11-25 07:04:25

Teşekkür ederim, bu açık. İkinci işlemden kaçınmanın herhangi bir yolu var mı?
J.J. Beam

@ JJ Beam, bağlantılar ve örneklerle cevabı güncelledi
arunkvelu

Diğer dillerde

Bu sayfa diğer dillerde

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................