Гайды

Celery: распределённые очереди задач

Брокер и backend результатов, минимальная настройка, идемпотентность, маршрутизация очередей и настройки воркера.

~11 мин чтения

Celery: распределённые очереди задач

Celery — распределённая очередь задач для Python: воркеры забирают сообщения из брокера (Redis, RabbitMQ и др.), выполняют функции (tasks) и опционально сохраняют результат в backend результатов. Периодика — Celery Beat; мониторинг — Flower. Для сравнения брокеров см. RabbitMQ vs Kafka и Redis: кеш и структуры данных.


1. Компоненты

КомпонентРоль
ApplicationЭкземпляр Celery с конфигом и регистрацией задач
BrokerОчередь сообщений (Redis/RabbitMQ)
WorkerПроцесс, выполняющий задачи
Result backendХранение return-значений (Redis, RPC, disabled)

2. Минимальная настройка

bash
pip install celery[redis]

celery_app.py:

python
from celery import Celery

app = Celery(
    "proj",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["proj.tasks"],
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)

tasks.py:

python
from .celery_app import app

@app.task
def add(x: int, y: int) -> int:
    return x + y

Запуск воркера:

bash
celery -A proj.celery_app worker --loglevel=info --concurrency=4

Вызов:

python
from proj.tasks import add
r = add.delay(2, 3)       # асинхронно
r.get(timeout=10)       # блокирующее ожидание результата
add.apply_async((2, 3), countdown=60)  # отложенный старт

3. Идемпотентность и ретраи

Задачи at-least-once: при таймауте брокер может доставить снова. Делайте обработку идемпотентной (уникальные ключи в БД, upsert).

python
@app.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
def fetch_url(self, url: str):
    ...

4. Маршрутизация и очереди

python
app.conf.task_routes = {
    "proj.tasks.heavy_*": {"queue": "heavy"},
    "proj.tasks.light_*": {"queue": "light"},
}

Запуск воркера только на очередь: celery -A proj worker -Q heavy.


5. Важные настройки

  • task_acks_late=True — ack после выполнения (при падении воркера задача вернётся в очередь).
  • worker_prefetch_multiplier — сколько задач «заранее» забирать с брокера (для длинных задач ставьте 1).
  • task_time_limit / task_soft_time_limit — защита от зависаний.

6. Django / FastAPI

  • Django: django-celery-results, shared settings, CELERY_BROKER_URL в env.
  • FastAPI: отдельный worker-процесс; не блокируйте event loop тяжёлым Celery внутри async def — вызывайте .delay() и сразу отвечайте клиенту. См. asyncio и asyncpg.

7. Canvas: цепочки и хор

chain(a.s(), b.s()) выполняет задачи последовательно; group(...) — параллельно; chord(header)(callback) ждёт завершения группы и запускает финальную задачу с результатами — удобно для map-reduce внутри проекта без отдельного оркестратора.

python
from celery import chain, group, chord

workflow = chain(fetch.s(url), parse.s(), store.s())
workflow.apply_async()

Не передавайте в аргументах ORM-объекты и открытые файлы — только сериализуемые данные (id, пути, S3 URI).


8. Чек-лист

  • Брокер с персистентностью и репликацией для продакшена.
  • Результаты отключены, если не нужны (result_backend=None).
  • Логирование task_id для трассировки.
  • Отдельные очереди под приоритет и изоляцию нагрузки.

Дальше: Beat · Flower · тег Celery