Гайды

Data pipelines с Apache Airflow

DAG, Scheduler и Executor, TaskFlow API, идемпотентность backfill и секреты.

~10 мин чтения

Data pipelines с Apache Airflow

Apache Airflow — оркестрация DAG (направленных ациклических графов) задач в Python: Operators, Sensors, Hooks к БД/облаку. Планирование, ретраи, backfill, SLA. Трансформации в warehouse — часто dbtОсновы dbt.


1. DAG

Файл Python в dags/; Scheduler парсит DAG, Executor (Celery/Kubernetes) запускает задачи, Metadata DB хранит состояние.


2. Идиомы

@dag(schedule=..., start_date=...), PythonOperator, TaskGroup, XCom для передачи небольших данных между задачами (осторожно с объёмом).

Минимальный DAG на TaskFlow API (Airflow 2.x):

python
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example"],
)
def etl_demo():
    @task
    def extract():
        return {"rows": 42}

    @task
    def transform(data: dict) -> dict:
        return {"count": data["rows"] * 2}

    @task
    def load(data: dict):
        print("load", data)

    load(transform(extract()))

etl_demo_dag = etl_demo()

Динамически генерируемые DAG из списка источников удобны, но усложняют парсинг планировщиком — держите число DAG-файлов и сложность генерации под контролем, иначе scheduler будет долго обходить dags/.


3. Идемпотентность

Повторный запуск дня (backfill) не должен портить данные; merge вместо слепого append.


4. Airflow 2.x

TaskFlow API@task декораторы, типизированные XCom.


5. Зависимости между DAG и внешние события

ExternalTaskSensor ждёт успех задачи в другом DAG — аккуратно с таймаутами и poke_interval, чтобы не забивать слоты. Для событий извне (файл в GCS, сообщение) — Sensors с reschedule mode вместо длительного оккупирования worker-слота.


6. Пулы, приоритеты и SLA

Pools ограничивают параллелизм по типу ресурса (например, тяжёлые запросы к одному API). priority_weight и queue влияют на порядок при конкуренции. SLA misses — алерты на просроченные задачи.


7. Развёртывание и версии

Один репозиторий с DAG'ами или образ с dags/; синхронизация на scheduler/webserver. Фиксируйте версию Airflow и зависимостей провайдеров; мажорные апгрейды читайте по migration guide (изменения в DB schema metadata).


8. Чек-лист

  • Тесты DAG (dag.test()), CI на импорт ошибок.
  • Ресурсы executor'а под тяжёлые задачи.
  • Секреты через Connections + backend vault.
  • Мониторинг failed tasks и длительности пула.
  • catchup=False там, где исторический прогон DAG нежелателен.
  • Лимиты на размер XCom и тяжёлые объекты вместо передачи через XCom.

Дальше: тег «Data engineering»