Гайды
Kafka Consumers и consumer groups
Rebalance, poll и max.poll.interval, auto vs manual commit, static membership, isolation.level и consumer lag.
~11 мин чтения
Kafka Consumers и consumer groups
Consumer читает записи партиция за партицией, смещая offset. Consumer group распределяет партиции между членами и хранит committed offsets в топике __consumer_offsets. Базовые термины — Apache Kafka: топики, партиции и оффсеты; DLQ и ошибки в приложениях — отдельный гайд; DLQ в Connect — Kafka Connect; стриминговая обработка — Kafka Streams.
1. Группа и rebalance
При join нового consumer'а или при heartbeat timeout / max.poll.interval срабатывает rebalance: партиции отбираются и назначаются заново.
session.timeout.ms — как долго считать consumer мёртвым без heartbeat.
heartbeat.interval.ms — как часто слать heartbeat (обычно ~1/3 от session timeout).
max.poll.interval.ms — максимум между вызовами poll(); если обработка одной пачки дольше — consumer вылетит из группы → rebalance.
Тяжёлую работу выносите из poll()-цикла в пул потоков, но не забывайте про max.poll.interval.
2. Semantics: auto vs manual commit
enable.auto.commit=true (по умолчанию в старых примерах) — коммит оффсетов периодически; риск: сообщение обработано, но оффсет ещё не закоммичен → при сбое повтор; или оффсет закоммичен до конца обработки → потеря.
Ручной коммит после успешной обработки батча:
- Синхронно
commitSync()— надёжнее, блокирует. - Асинхронно
commitAsync()— быстрее, обрабатывать колбэки ошибок.
Паттерн at-least-once: обработали → закоммитили. Идемпотентность обработчика обязательна.
3. poll() и батчи
poll(Duration) возвращает много записей сразу. Размер управляется max.partition.fetch.bytes, fetch.max.bytes, max.poll.records на клиенте и лимитами брокера.
Маленький max.poll.records — меньше памяти и времени на итерацию; больше — выше throughput при быстрой обработке.
4. Партиция и порядок
Внутри партиции порядок фиксирован. Один consumer в группе читает партицию целиком; при N consumer'ах и M партициях «лишние» consumer'ы простаивают (M < N).
5. Static membership (group.instance.id)
Чтобы при кратковременном рестарте pod'а не было полного rebalance шторма, задают group.instance.id — «тот же участник» после переподключения (версии и настройки брокера должны поддерживать).
6. Seek, assign, subscribe
subscribe(topics)— динамическое назначение партиций через группу.assign(partitions)— ручное закрепление; без балансировки группы; нужно для особых случаев.seek()— прыжок на оффсет (replay, backfill).
7. Изоляция чтения транзакционных записей
isolation.level=read_committed — consumer не увидит «незакоммиченные» транзакционные сообщения до commit producer'а.
7.1 Cooperative sticky assignor
partition.assignment.strategy с CooperativeSticky уменьшает «stop-the-world» rebalance: партиции отзываются постепенно. Полезно при частых деплоях; проверьте версию клиента и брокера на совместимость.
8. Частые проблемы
| Симптом | Возможная причина |
|---|---|
| Постоянные rebalance | Долгая обработка > max.poll.interval, GC паузы |
| Дубликаты | At-least-once без идемпотентного consumer |
| «Залипание» на партиции | Один consumer упал, сессия не истекла |
| Лаг растёт | Мало consumer'ов или медленная обработка |
Метрика consumer lag (разница high watermark и committed offset) — главный SLO для чтения. Подробнее — Мониторинг Kafka.
9. Чек-лист
- Размер группы и число партиций согласованы с нагрузкой.
-
max.poll.intervalпокрывает 99-й перцентиль обработки батча. - Стратегия commit осознанна (auto vs manual).
- Идемпотентность или дедупликация на стороне приложения.
- Мониторинг lag по группе/топику.
- Graceful shutdown:
wakeup()+ закрытие после обработки текущего батча.