Гайды
Data pipelines с Apache Airflow
DAG, Scheduler и Executor, TaskFlow API, идемпотентность backfill и секреты.
~10 мин чтения
Data pipelines с Apache Airflow
Apache Airflow — оркестрация DAG (направленных ациклических графов) задач в Python: Operators, Sensors, Hooks к БД/облаку. Планирование, ретраи, backfill, SLA. Трансформации в warehouse — часто dbt — Основы dbt.
1. DAG
Файл Python в dags/; Scheduler парсит DAG, Executor (Celery/Kubernetes) запускает задачи, Metadata DB хранит состояние.
2. Идиомы
@dag(schedule=..., start_date=...), PythonOperator, TaskGroup, XCom для передачи небольших данных между задачами (осторожно с объёмом).
Минимальный DAG на TaskFlow API (Airflow 2.x):
from datetime import datetime
from airflow.decorators import dag, task
@dag(
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example"],
)
def etl_demo():
@task
def extract():
return {"rows": 42}
@task
def transform(data: dict) -> dict:
return {"count": data["rows"] * 2}
@task
def load(data: dict):
print("load", data)
load(transform(extract()))
etl_demo_dag = etl_demo()
Динамически генерируемые DAG из списка источников удобны, но усложняют парсинг планировщиком — держите число DAG-файлов и сложность генерации под контролем, иначе scheduler будет долго обходить dags/.
3. Идемпотентность
Повторный запуск дня (backfill) не должен портить данные; merge вместо слепого append.
4. Airflow 2.x
TaskFlow API — @task декораторы, типизированные XCom.
5. Зависимости между DAG и внешние события
ExternalTaskSensor ждёт успех задачи в другом DAG — аккуратно с таймаутами и poke_interval, чтобы не забивать слоты. Для событий извне (файл в GCS, сообщение) — Sensors с reschedule mode вместо длительного оккупирования worker-слота.
6. Пулы, приоритеты и SLA
Pools ограничивают параллелизм по типу ресурса (например, тяжёлые запросы к одному API). priority_weight и queue влияют на порядок при конкуренции. SLA misses — алерты на просроченные задачи.
7. Развёртывание и версии
Один репозиторий с DAG'ами или образ с dags/; синхронизация на scheduler/webserver. Фиксируйте версию Airflow и зависимостей провайдеров; мажорные апгрейды читайте по migration guide (изменения в DB schema metadata).
8. Чек-лист
- Тесты DAG (
dag.test()), CI на импорт ошибок. - Ресурсы executor'а под тяжёлые задачи.
- Секреты через Connections + backend vault.
- Мониторинг failed tasks и длительности пула.
-
catchup=Falseтам, где исторический прогон DAG нежелателен. - Лимиты на размер XCom и тяжёлые объекты вместо передачи через XCom.
Дальше: тег «Data engineering»