Гайды
Kafka Connect: интеграция с БД и S3
Distributed workers, JDBC и Debezium source, S3 sink, конвертеры, SMT, DLQ и эксплуатация коннекторов.
~11 мин чтения
Kafka Connect: интеграция с БД и S3
Kafka Connect — фреймворк для потоковой интеграции Kafka с внешними системами через готовые или кастомные коннекторы. Режимы: Source (наружу → Kafka) и Sink (Kafka → наружу). Термины Kafka — Apache Kafka: топики, партиции и оффсеты; схемы — Avro и Schema Registry.
1. Архитектура Connect
- Worker — JVM-процесс, запускающий коннекторы.
- Connector — конфигурация (JDBC Source, Debezium, S3 Sink…).
- Task — параллельные задачи внутри connector'а (масштаб по партициям/таблицам).
Distributed mode — несколько worker'ов с общим топиком конфигурации и rebalance задач (аналог consumer group).
Скелет REST-конфига (упрощённо; имена топиков и классы зависят от дистрибутива):
{
"name": "jdbc-source-users",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/app",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db.users.",
"table.whitelist": "users",
"tasks.max": "1"
}
}
POST к /connectors на Connect REST API создаёт коннектор; GET /connectors/{name}/status — первый экран при инцидентах.
Имя connector.class и набор полей зависят от дистрибутива (Confluent Platform, Strimzi Kafka Connect, Debezium-only образ) — копируйте из доки конкретного коннектора, а не из «примерного» JSON.
2. Source: JDBC и CDC
JdbcSourceConnector — периодический poll таблицы или query (incrementing column / timestamp). Просто, но не даёт true CDC без дополнений.
Debezium (часто как отдельный Kafka Connect connector) — логическая репликация БД (binlog, WAL) → события INSERT/UPDATE/DELETE в Kafka. Низкая задержка, полная модель изменений.
3. Sink в S3 / облако
S3 Sink (Confluent или OSS аналоги) — батчирует сообщения в Parquet/Avro/JSON файлы по времени или размеру, коммитит offset'ы после успешной записи.
Важно: ключи объектов, partitioner, flush.size / rotate.interval.ms, идемпотентность при повторной доставке.
4. Конвертеры и схемы
key.converter / value.converter: StringConverter, JsonConverter, AvroConverter (с Schema Registry). Согласуйте с топиками и downstream.
Single Message Transform (SMT) — лёгкие преобразования полей, маскирование, маршрутизация без отдельного сервиса. Не складывайте тяжёлую бизнес-логику в SMT — отладка и версионирование усложняются.
5. Offset storage и ребаланс
В distributed mode конфиг и offset'ы коннекторов живут во внутренних топиках Kafka; при добавлении worker'а идёт rebalance задач — краткий pause ingestion. Планируйте достаточно партиций внутренних топиков Connect под размер кластера.
6. Ошибки и DLQ в Connect
errors.tolerance, errors.deadletterqueue.topic.name — отправка битых записей в DLQ-топик с заголовками ошибки. На стороне приложений — см. DLQ и обработка ошибок в Kafka; consumer semantics — Kafka Consumers.
7. Эксплуатация
- Версии connector'ов и драйверов JDBC отдельно от версии Kafka.
- Мониторинг task FAILED, restarts, lag source от БД — см. Мониторинг Kafka.
- Секреты — через ConfigProvider / внешние vault'ы, не plaintext в Git.
8. Когда Connect, а когда свой код
| Connect | Свой consumer/producer |
|---|---|
| Стандартные интеграции, мало логики | Сложная бизнес-логика, нестандартные протоколы |
| Быстрый старт CDC → Kafka | Тонкий контроль backpressure и ретраев |
Exactly-once для связки sink ↔ внешняя система почти всегда означает идемпотентные записи + транзакции Kafka (enable.idempotence) там, где поддерживается коннектором; читайте матрицу для S3/JDBC и не путайте EOS Kafka с EOS вашей БД.
9. Чек-лист
- Режим distributed + N workers для отказоустойчивости.
- Конвертеры и Registry согласованы.
- DLQ и логирование для poison messages.
- Оценена нагрузка на БД от JDBC poll (интервалы, индексы).
- Лимиты памяти JVM worker'ов под размер батчей sink.
Дальше: Consumers · Мониторинг · Тег Kafka