Гайды
Celery: распределённые очереди задач
Брокер и backend результатов, минимальная настройка, идемпотентность, маршрутизация очередей и настройки воркера.
~11 мин чтения
Celery: распределённые очереди задач
Celery — распределённая очередь задач для Python: воркеры забирают сообщения из брокера (Redis, RabbitMQ и др.), выполняют функции (tasks) и опционально сохраняют результат в backend результатов. Периодика — Celery Beat; мониторинг — Flower. Для сравнения брокеров см. RabbitMQ vs Kafka и Redis: кеш и структуры данных.
1. Компоненты
| Компонент | Роль |
|---|---|
| Application | Экземпляр Celery с конфигом и регистрацией задач |
| Broker | Очередь сообщений (Redis/RabbitMQ) |
| Worker | Процесс, выполняющий задачи |
| Result backend | Хранение return-значений (Redis, RPC, disabled) |
2. Минимальная настройка
pip install celery[redis]
celery_app.py:
from celery import Celery
app = Celery(
"proj",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
include=["proj.tasks"],
)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
)
tasks.py:
from .celery_app import app
@app.task
def add(x: int, y: int) -> int:
return x + y
Запуск воркера:
celery -A proj.celery_app worker --loglevel=info --concurrency=4
Вызов:
from proj.tasks import add
r = add.delay(2, 3) # асинхронно
r.get(timeout=10) # блокирующее ожидание результата
add.apply_async((2, 3), countdown=60) # отложенный старт
3. Идемпотентность и ретраи
Задачи at-least-once: при таймауте брокер может доставить снова. Делайте обработку идемпотентной (уникальные ключи в БД, upsert).
@app.task(bind=True, autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
def fetch_url(self, url: str):
...
4. Маршрутизация и очереди
app.conf.task_routes = {
"proj.tasks.heavy_*": {"queue": "heavy"},
"proj.tasks.light_*": {"queue": "light"},
}
Запуск воркера только на очередь: celery -A proj worker -Q heavy.
5. Важные настройки
task_acks_late=True— ack после выполнения (при падении воркера задача вернётся в очередь).worker_prefetch_multiplier— сколько задач «заранее» забирать с брокера (для длинных задач ставьте 1).task_time_limit/task_soft_time_limit— защита от зависаний.
6. Django / FastAPI
- Django:
django-celery-results, shared settings,CELERY_BROKER_URLв env. - FastAPI: отдельный worker-процесс; не блокируйте event loop тяжёлым Celery внутри
async def— вызывайте.delay()и сразу отвечайте клиенту. См. asyncio и asyncpg.
7. Canvas: цепочки и хор
chain(a.s(), b.s()) выполняет задачи последовательно; group(...) — параллельно; chord(header)(callback) ждёт завершения группы и запускает финальную задачу с результатами — удобно для map-reduce внутри проекта без отдельного оркестратора.
from celery import chain, group, chord
workflow = chain(fetch.s(url), parse.s(), store.s())
workflow.apply_async()
Не передавайте в аргументах ORM-объекты и открытые файлы — только сериализуемые данные (id, пути, S3 URI).
8. Чек-лист
- Брокер с персистентностью и репликацией для продакшена.
- Результаты отключены, если не нужны (
result_backend=None). - Логирование
task_idдля трассировки. - Отдельные очереди под приоритет и изоляцию нагрузки.
Дальше: Beat · Flower · тег Celery