Гайды

Kafka Consumers и consumer groups

Rebalance, poll и max.poll.interval, auto vs manual commit, static membership, isolation.level и consumer lag.

~11 мин чтения

Kafka Consumers и consumer groups

Consumer читает записи партиция за партицией, смещая offset. Consumer group распределяет партиции между членами и хранит committed offsets в топике __consumer_offsets. Базовые термины — Apache Kafka: топики, партиции и оффсеты; DLQ и ошибки в приложениях — отдельный гайд; DLQ в Connect — Kafka Connect; стриминговая обработка — Kafka Streams.


1. Группа и rebalance

При join нового consumer'а или при heartbeat timeout / max.poll.interval срабатывает rebalance: партиции отбираются и назначаются заново.

session.timeout.ms — как долго считать consumer мёртвым без heartbeat.
heartbeat.interval.ms — как часто слать heartbeat (обычно ~1/3 от session timeout).
max.poll.interval.ms — максимум между вызовами poll(); если обработка одной пачки дольше — consumer вылетит из группы → rebalance.

Тяжёлую работу выносите из poll()-цикла в пул потоков, но не забывайте про max.poll.interval.


2. Semantics: auto vs manual commit

enable.auto.commit=true (по умолчанию в старых примерах) — коммит оффсетов периодически; риск: сообщение обработано, но оффсет ещё не закоммичен → при сбое повтор; или оффсет закоммичен до конца обработки → потеря.

Ручной коммит после успешной обработки батча:

  • Синхронно commitSync() — надёжнее, блокирует.
  • Асинхронно commitAsync() — быстрее, обрабатывать колбэки ошибок.

Паттерн at-least-once: обработали → закоммитили. Идемпотентность обработчика обязательна.


3. poll() и батчи

poll(Duration) возвращает много записей сразу. Размер управляется max.partition.fetch.bytes, fetch.max.bytes, max.poll.records на клиенте и лимитами брокера.

Маленький max.poll.records — меньше памяти и времени на итерацию; больше — выше throughput при быстрой обработке.


4. Партиция и порядок

Внутри партиции порядок фиксирован. Один consumer в группе читает партицию целиком; при N consumer'ах и M партициях «лишние» consumer'ы простаивают (M < N).


5. Static membership (group.instance.id)

Чтобы при кратковременном рестарте pod'а не было полного rebalance шторма, задают group.instance.id — «тот же участник» после переподключения (версии и настройки брокера должны поддерживать).


6. Seek, assign, subscribe

  • subscribe(topics) — динамическое назначение партиций через группу.
  • assign(partitions) — ручное закрепление; без балансировки группы; нужно для особых случаев.
  • seek() — прыжок на оффсет (replay, backfill).

7. Изоляция чтения транзакционных записей

isolation.level=read_committed — consumer не увидит «незакоммиченные» транзакционные сообщения до commit producer'а.


7.1 Cooperative sticky assignor

partition.assignment.strategy с CooperativeSticky уменьшает «stop-the-world» rebalance: партиции отзываются постепенно. Полезно при частых деплоях; проверьте версию клиента и брокера на совместимость.


8. Частые проблемы

СимптомВозможная причина
Постоянные rebalanceДолгая обработка > max.poll.interval, GC паузы
ДубликатыAt-least-once без идемпотентного consumer
«Залипание» на партицииОдин consumer упал, сессия не истекла
Лаг растётМало consumer'ов или медленная обработка

Метрика consumer lag (разница high watermark и committed offset) — главный SLO для чтения. Подробнее — Мониторинг Kafka.


9. Чек-лист

  • Размер группы и число партиций согласованы с нагрузкой.
  • max.poll.interval покрывает 99-й перцентиль обработки батча.
  • Стратегия commit осознанна (auto vs manual).
  • Идемпотентность или дедупликация на стороне приложения.
  • Мониторинг lag по группе/топику.
  • Graceful shutdown: wakeup() + закрытие после обработки текущего батча.

Дальше: Producers · Streams · Тег Kafka