Sync vs Async: где граница
Асинхронность в Python. Когда async оправдан, когда нет. Event loop, structured concurrency, blocking-код в async — типичные ошибки.
TL;DR
- async ≠ параллелизм. asyncio — одна нить. GIL никуда не делся.
- async оправдан для I/O-bound задач с большим числом ожиданий. Для CPU-bound — не подходит.
- Structured concurrency (TaskGroup, anyio) сильно снижает сложность управления задачами.
- CancelledError — не exception в обычном смысле. Игнорирование = потерянный cancel.
Что решает async
Классический сценарий: сервис делает 10 HTTP-запросов подряд, каждый по 200ms. Синхронно — 2 секунды суммарно. Async — 200ms суммарно (все запросы конкурентно).
Магии нет: пока один запрос ждёт ответа от сети, event loop переключается на другой. Всё в одной нити.
Цикл, который управляет корутинами: пока одна ожидает I/O, запускает следующую готовую. Одна нить, множество задач в параллель по времени.
Важно: async — про concurrency (одновременное выполнение по времени), не про parallelism (одновременное выполнение по CPU). Настоящий параллелизм в Python — через multiprocessing, subprocess или C-расширения.
Когда async оправдан
I/O-bound с высоким числом ожиданий. HTTP-клиент к внешним API, БД, брокер сообщений. Пока идёт сеть — событийный цикл делает полезную работу.
Множество конкурентных подключений. WebSocket-сервер, чат, real-time API. 10k коннекций — норма на одном процессе.
Задачи с fan-out. Один запрос порождает несколько зависимых. asyncio.gather — 5 запросов параллельно, ждём максимум одного.
Когда async НЕ оправдан
CPU-bound. Обработка изображений, ML-инференс, шифрование, парсинг больших файлов. Async не даёт параллелизма — только замедляет из-за overhead event loop.
Скрипт-однодневка. 100 строк процедурного кода. Sync проще пишется, отлаживается, поддерживается.
Внутренний инструмент с одним пользователем. CLI, cron-задача. Async — оверхэд без выгоды.
Legacy-кодовая база на sync с одной async-задачей. Смешение sync и async — источник багов. Либо всё sync, либо всё async.
Как правильно: базовый async
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> dict:
response = await client.get(url)
response.raise_for_status()
return response.json()
async def fetch_all(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient(timeout=10.0) as client:
tasks = [fetch(client, url) for url in urls]
return await asyncio.gather(*tasks)
# запуск:
asyncio.run(fetch_all(['https://api1', 'https://api2', ...]))
Ключевые моменты:
async def= корутина, возвращаетCoroutine, не результат.await— точка, где event loop может переключиться.asyncio.gather(*tasks)— параллельный запуск, ждём всех.asyncio.run(...)— точка входа программы.
Structured Concurrency
Подход, при котором время жизни всех запущенных задач ограничено видимой областью кода. Задача, не завершившаяся к выходу из области — либо отменяется, либо блок ждёт её.
Python 3.11+ ввёл TaskGroup:
import asyncio
async def process_items(items):
async with asyncio.TaskGroup() as tg:
for item in items:
tg.create_task(handle_item(item))
# к этой точке все задачи гарантированно завершились
Преимущества над asyncio.gather:
- Отмена: если одна задача упала, остальные автоматически отменяются.
- Exception grouping: несколько ошибок агрегируются в
ExceptionGroup. - Ресурсы освобождаются при выходе — никаких «забытых» задач.
▸ anyio для более старого Python
До 3.11 (или для универсальности между asyncio и trio) — anyio даёт create_task_group() с той же семантикой:
import anyio
async def process_items(items):
async with anyio.create_task_group() as tg:
for item in items:
tg.start_soon(handle_item, item)Работает на 3.8+, портирует TaskGroup обратно.
Timeouts и cancellation
asyncio.timeout() — стандартный способ ограничить время выполнения:
async def call_with_deadline(url: str) -> dict:
try:
async with asyncio.timeout(3.0):
return await fetch(url)
except asyncio.TimeoutError:
logger.warning('timeout for %s', url)
raise
При таймауте CancelledError пробрасывается в корутину. Если она обрабатывается try/except Exception — cancel может быть потерян.
Blocking-код в async — типичные ошибки
Одна time.sleep(1) в async-функции блокирует весь event loop на секунду. Все параллельные корутины стоят.
Симптомы:
- Задачи, которые должны параллельно, идут последовательно.
- Один клиент замораживает всех.
- P99 latency скачет непредсказуемо.
Что блокирует event loop:
time.sleep()вместоawait asyncio.sleep().requestsвместоhttpx/aiohttp.- Синхронные драйверы БД (
psycopg2вместоasyncpg). - CPU-bound код (парсинг, шифрование, ML).
open()/read()больших файлов.
Как обходить:
- Синхронный I/O →
asyncio.to_thread(func, *args). Запускает в отдельной нити. - CPU-bound →
ProcessPoolExecutorчерезloop.run_in_executor. - Мониторинг:
asynciodebug mode — логирует блокирующие корутины.
async def call_legacy_sync_api(payload):
# sync-функция → offload в thread pool
result = await asyncio.to_thread(legacy_sync_call, payload)
return result
Sync внутри async: контекстные утечки
async def handle_request(request):
user = await get_user(request.user_id)
log_metric(user) # sync, но быстро — вроде ок?
log_metric открывает файл, синхронно пишет — 1-2ms. На запрос не критично. Но если под нагрузкой 10к RPS — эти 1-2ms суммируются, event loop замирает, throughput падает.
Правило: любой блокирующий вызов в async-функции — источник латентности. Даже быстрый.
Как не надо
1. Sync-запрос в async-функции
async def handler(request):
data = requests.get('https://api').json() # !!
return process(data)
requests блокирует. Все параллельные обработчики ждут.
Правильно: httpx.AsyncClient + await client.get(...).
2. Забытый await
async def bad():
task = asyncio.create_task(work()) # запущено, но не ждём
return 'done' # task может не завершиться
Fire-and-forget задача переживает функцию. Ошибка внутри неё не пробросится.
Правильно: TaskGroup или явное ожидание await task.
3. Слишком много одновременных задач
async def download_many(urls):
async with httpx.AsyncClient() as client:
return await asyncio.gather(*[client.get(u) for u in urls]) # 10k urls
10k одновременных соединений съест ресурсы. Сервер отдающий тоже.
Правильно: семафор для ограничения concurrency:
async def download_many(urls, max_concurrent=20):
sem = asyncio.Semaphore(max_concurrent)
async def bounded(url):
async with sem:
return await client.get(url)
async with httpx.AsyncClient() as client:
return await asyncio.gather(*[bounded(u) for u in urls])
4. Смешение asyncio.run и уже работающего event loop
async def outer():
asyncio.run(inner()) # !! RuntimeError: cannot be called
asyncio.run() создаёт новый event loop. Внутри уже работающего — ошибка.
Правильно: просто await inner().
5. Обработка CancelledError без re-raise
Уже разбирали. except Exception может съесть отмену.
Trade-offs
| Ситуация | Async оправдан | Sync |
|---|---|---|
| HTTP-сервер с 1000+ RPS | ✓ | |
| Много одновременных внешних API | ✓ | |
| WebSocket / SSE / real-time | ✓ | |
| CPU-heavy обработка | ✓ (или multiprocessing) | |
| Cron-задача 1 раз в сутки | ✓ | |
| CLI-инструмент | ✓ | |
| Легаси-код на sync, нужно 1 async-вызов | ✓ (не смешивать) |
В твоём же коде
Анонимизированный микросервис — правильный кандидат на async:
- Получает сообщение из RabbitMQ (I/O).
- Вызывает marketplace-api (I/O).
- Пишет в PostgreSQL через asyncpg (I/O).
- Публикует событие (I/O).
Всё I/O-bound, async полностью оправдан. Стек: FastStream + httpx + SQLAlchemy async + asyncpg.
Одна вещь, которая заслуживает внимания:
- Использование
NullPoolдля PgBouncer означает создание нового соединения на каждый запрос. Экономит memory на серверах, но увеличивает latency. При росте нагрузки может потребоваться пересмотр.
Дальнейшее чтение
- Raymond Hettinger. Fear and Awaiting in Async. 2022. Обязательный доклад для понимания асинхронности в Python.
- Nathaniel J. Smith. Notes on structured concurrency. Оригинал идеи structured concurrency, автор trio.
- Guido van Rossum. PEP 3156: asyncio. Мотивация и дизайн.
- Python Docs. asyncio. Официальная документация.
- anyio. Documentation. Универсальный API поверх asyncio и trio.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. asyncio позволяет...
Q2. Что не так с except Exception внутри async-функции с точки зрения отмены?
Q3. Что делает asyncio.TaskGroup лучше asyncio.gather?
Q4. В async-функции вызов time.sleep(1) — это...
Q5. CPU-bound задача (например, парсинг большого XML) в async-сервисе — как правильно?