Гайды
Асинхронные эндпоинты и asyncpg
Пул asyncpg в lifespan, Depends и acquire, транзакции, параметризация $1, executemany и не блокировать event loop.
~10 мин чтения
Асинхронные эндпоинты и asyncpg
asyncpg — быстрый асинхронный драйвер PostgreSQL без ORM. В связке с FastAPI пул соединений создаётся в lifespan и передаётся через Depends. Основы цикла событий — asyncio в Python. База FastAPI — Быстрый старт; SQL-оптимизация на стороне БД — EXPLAIN ANALYZE в PostgreSQL.
1. Установка
pip install asyncpg
2. Пул в lifespan
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost:5432/dbname",
min_size=2,
max_size=10,
command_timeout=60,
)
app.state.db = pool
yield
await pool.close()
app = FastAPI(lifespan=lifespan)
3. Зависимость для запроса
from typing import Annotated
from fastapi import Depends, Request
def get_pool(request: Request) -> asyncpg.Pool:
return request.app.state.db
PoolDep = Annotated[asyncpg.Pool, Depends(get_pool)]
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: PoolDep):
async with db.acquire() as conn:
row = await conn.fetchrow(
"SELECT id, email FROM users WHERE id = $1", user_id
)
if not row:
from fastapi import HTTPException
raise HTTPException(404)
return dict(row)
acquire() возвращает соединение в пул после блока async with.
4. Транзакции
async with db.acquire() as conn:
async with conn.transaction():
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, 1)
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, 2)
При исключении — rollback автоматически.
5. Параметризация
Всегда $1, $2, не конкатенация строк — защита от SQL injection.
await conn.fetch("SELECT * FROM items WHERE owner_id = $1 AND status = $2", owner, "active")
6. executemany и копирование
Для массовых вставок — copy_records_to_table или executemany; для очень больших объёмов — COPY через тот же драйвер по документации asyncpg.
7. Init соединения и PgBouncer
init вызывается при выдаче соединения из пула: SET (таймзона, application_name, statement_timeout), при необходимости — регистрация кодеков.
async def init_connection(conn: asyncpg.Connection) -> None:
await conn.execute("SET TIME ZONE 'UTC'")
pool = await asyncpg.create_pool(
dsn,
init=init_connection,
)
При PgBouncer в режиме transaction отключайте или ограничивайте statement_cache_size (server-side prepared statements не дружат с отсоединением после каждой транзакции) — см. заметки в pgBouncer.
8. Не блокировать event loop
Долгий CPU-код или синхронные вызовы БД (psycopg2) внутри async def заблокируют все запросы. Либо весь стек async, либо run_in_executor для legacy.
9. Альтернативы
- SQLAlchemy 2 async +
asyncpg— ORM и миграции Alembic; см. документацию SQLAlchemy 2.0. - encode/databases — тонкая обёртка поверх драйверов.
10. Чек-лист
- Размер пула под нагрузку и лимиты PostgreSQL (
max_connections). - Таймауты
command_timeout. -
init/application_nameдля трассировки вpg_stat_activity. - Обработка
UniqueViolationError→ 409. - Healthcheck с
SELECT 1при старте readiness — см. деплой FastAPI.
Дальше: Dependency Injection · тег FastAPI