Гайды

Kafka Connect: интеграция с БД и S3

Distributed workers, JDBC и Debezium source, S3 sink, конвертеры, SMT, DLQ и эксплуатация коннекторов.

~11 мин чтения

Kafka Connect: интеграция с БД и S3

Kafka Connect — фреймворк для потоковой интеграции Kafka с внешними системами через готовые или кастомные коннекторы. Режимы: Source (наружу → Kafka) и Sink (Kafka → наружу). Термины Kafka — Apache Kafka: топики, партиции и оффсеты; схемы — Avro и Schema Registry.


1. Архитектура Connect

  • Worker — JVM-процесс, запускающий коннекторы.
  • Connector — конфигурация (JDBC Source, Debezium, S3 Sink…).
  • Task — параллельные задачи внутри connector'а (масштаб по партициям/таблицам).

Distributed mode — несколько worker'ов с общим топиком конфигурации и rebalance задач (аналог consumer group).

Скелет REST-конфига (упрощённо; имена топиков и классы зависят от дистрибутива):

json
{
  "name": "jdbc-source-users",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/app",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db.users.",
    "table.whitelist": "users",
    "tasks.max": "1"
  }
}

POST к /connectors на Connect REST API создаёт коннектор; GET /connectors/{name}/status — первый экран при инцидентах.

Имя connector.class и набор полей зависят от дистрибутива (Confluent Platform, Strimzi Kafka Connect, Debezium-only образ) — копируйте из доки конкретного коннектора, а не из «примерного» JSON.


2. Source: JDBC и CDC

JdbcSourceConnector — периодический poll таблицы или query (incrementing column / timestamp). Просто, но не даёт true CDC без дополнений.

Debezium (часто как отдельный Kafka Connect connector) — логическая репликация БД (binlog, WAL) → события INSERT/UPDATE/DELETE в Kafka. Низкая задержка, полная модель изменений.


3. Sink в S3 / облако

S3 Sink (Confluent или OSS аналоги) — батчирует сообщения в Parquet/Avro/JSON файлы по времени или размеру, коммитит offset'ы после успешной записи.

Важно: ключи объектов, partitioner, flush.size / rotate.interval.ms, идемпотентность при повторной доставке.


4. Конвертеры и схемы

key.converter / value.converter: StringConverter, JsonConverter, AvroConverter (с Schema Registry). Согласуйте с топиками и downstream.

Single Message Transform (SMT) — лёгкие преобразования полей, маскирование, маршрутизация без отдельного сервиса. Не складывайте тяжёлую бизнес-логику в SMT — отладка и версионирование усложняются.


5. Offset storage и ребаланс

В distributed mode конфиг и offset'ы коннекторов живут во внутренних топиках Kafka; при добавлении worker'а идёт rebalance задач — краткий pause ingestion. Планируйте достаточно партиций внутренних топиков Connect под размер кластера.


6. Ошибки и DLQ в Connect

errors.tolerance, errors.deadletterqueue.topic.name — отправка битых записей в DLQ-топик с заголовками ошибки. На стороне приложений — см. DLQ и обработка ошибок в Kafka; consumer semantics — Kafka Consumers.


7. Эксплуатация

  • Версии connector'ов и драйверов JDBC отдельно от версии Kafka.
  • Мониторинг task FAILED, restarts, lag source от БД — см. Мониторинг Kafka.
  • Секреты — через ConfigProvider / внешние vault'ы, не plaintext в Git.

8. Когда Connect, а когда свой код

ConnectСвой consumer/producer
Стандартные интеграции, мало логикиСложная бизнес-логика, нестандартные протоколы
Быстрый старт CDC → KafkaТонкий контроль backpressure и ретраев

Exactly-once для связки sink ↔ внешняя система почти всегда означает идемпотентные записи + транзакции Kafka (enable.idempotence) там, где поддерживается коннектором; читайте матрицу для S3/JDBC и не путайте EOS Kafka с EOS вашей БД.


9. Чек-лист

  • Режим distributed + N workers для отказоустойчивости.
  • Конвертеры и Registry согласованы.
  • DLQ и логирование для poison messages.
  • Оценена нагрузка на БД от JDBC poll (интервалы, индексы).
  • Лимиты памяти JVM worker'ов под размер батчей sink.

Дальше: Consumers · Мониторинг · Тег Kafka