Гайды

Асинхронные эндпоинты и asyncpg

Пул asyncpg в lifespan, Depends и acquire, транзакции, параметризация $1, executemany и не блокировать event loop.

~10 мин чтения

Асинхронные эндпоинты и asyncpg

asyncpg — быстрый асинхронный драйвер PostgreSQL без ORM. В связке с FastAPI пул соединений создаётся в lifespan и передаётся через Depends. Основы цикла событий — asyncio в Python. База FastAPI — Быстрый старт; SQL-оптимизация на стороне БД — EXPLAIN ANALYZE в PostgreSQL.


1. Установка

bash
pip install asyncpg

2. Пул в lifespan

python
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI

pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost:5432/dbname",
        min_size=2,
        max_size=10,
        command_timeout=60,
    )
    app.state.db = pool
    yield
    await pool.close()

app = FastAPI(lifespan=lifespan)

3. Зависимость для запроса

python
from typing import Annotated
from fastapi import Depends, Request

def get_pool(request: Request) -> asyncpg.Pool:
    return request.app.state.db

PoolDep = Annotated[asyncpg.Pool, Depends(get_pool)]

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: PoolDep):
    async with db.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, email FROM users WHERE id = $1", user_id
        )
    if not row:
        from fastapi import HTTPException
        raise HTTPException(404)
    return dict(row)

acquire() возвращает соединение в пул после блока async with.


4. Транзакции

python
async with db.acquire() as conn:
    async with conn.transaction():
        await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, 1)
        await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, 2)

При исключении — rollback автоматически.


5. Параметризация

Всегда $1, $2, не конкатенация строк — защита от SQL injection.

python
await conn.fetch("SELECT * FROM items WHERE owner_id = $1 AND status = $2", owner, "active")

6. executemany и копирование

Для массовых вставок — copy_records_to_table или executemany; для очень больших объёмов — COPY через тот же драйвер по документации asyncpg.


7. Init соединения и PgBouncer

init вызывается при выдаче соединения из пула: SET (таймзона, application_name, statement_timeout), при необходимости — регистрация кодеков.

python
async def init_connection(conn: asyncpg.Connection) -> None:
    await conn.execute("SET TIME ZONE 'UTC'")

pool = await asyncpg.create_pool(
    dsn,
    init=init_connection,
)

При PgBouncer в режиме transaction отключайте или ограничивайте statement_cache_size (server-side prepared statements не дружат с отсоединением после каждой транзакции) — см. заметки в pgBouncer.


8. Не блокировать event loop

Долгий CPU-код или синхронные вызовы БД (psycopg2) внутри async def заблокируют все запросы. Либо весь стек async, либо run_in_executor для legacy.


9. Альтернативы

  • SQLAlchemy 2 async + asyncpg — ORM и миграции Alembic; см. документацию SQLAlchemy 2.0.
  • encode/databases — тонкая обёртка поверх драйверов.

10. Чек-лист

  • Размер пула под нагрузку и лимиты PostgreSQL (max_connections).
  • Таймауты command_timeout.
  • init / application_name для трассировки в pg_stat_activity.
  • Обработка UniqueViolationError → 409.
  • Healthcheck с SELECT 1 при старте readiness — см. деплой FastAPI.

Дальше: Dependency Injection · тег FastAPI