Уровень 5 · Асинхронность Глава 16 9 мин

Sync vs Async: где граница

Асинхронность в Python. Когда async оправдан, когда нет. Event loop, structured concurrency, blocking-код в async — типичные ошибки.

TL;DR

  • async ≠ параллелизм. asyncio — одна нить. GIL никуда не делся.
  • async оправдан для I/O-bound задач с большим числом ожиданий. Для CPU-bound — не подходит.
  • Structured concurrency (TaskGroup, anyio) сильно снижает сложность управления задачами.
  • CancelledError — не exception в обычном смысле. Игнорирование = потерянный cancel.

Что решает async

Классический сценарий: сервис делает 10 HTTP-запросов подряд, каждый по 200ms. Синхронно — 2 секунды суммарно. Async — 200ms суммарно (все запросы конкурентно).

Магии нет: пока один запрос ждёт ответа от сети, event loop переключается на другой. Всё в одной нити.

Определение Event loop (цикл событий)

Цикл, который управляет корутинами: пока одна ожидает I/O, запускает следующую готовую. Одна нить, множество задач в параллель по времени.

Важно: async — про concurrency (одновременное выполнение по времени), не про parallelism (одновременное выполнение по CPU). Настоящий параллелизм в Python — через multiprocessing, subprocess или C-расширения.

Когда async оправдан

    I/O-bound с высоким числом ожиданий. HTTP-клиент к внешним API, БД, брокер сообщений. Пока идёт сеть — событийный цикл делает полезную работу.

    Множество конкурентных подключений. WebSocket-сервер, чат, real-time API. 10k коннекций — норма на одном процессе.

    Задачи с fan-out. Один запрос порождает несколько зависимых. asyncio.gather — 5 запросов параллельно, ждём максимум одного.

Когда async НЕ оправдан

    CPU-bound. Обработка изображений, ML-инференс, шифрование, парсинг больших файлов. Async не даёт параллелизма — только замедляет из-за overhead event loop.

    Скрипт-однодневка. 100 строк процедурного кода. Sync проще пишется, отлаживается, поддерживается.

    Внутренний инструмент с одним пользователем. CLI, cron-задача. Async — оверхэд без выгоды.

    Legacy-кодовая база на sync с одной async-задачей. Смешение sync и async — источник багов. Либо всё sync, либо всё async.

Как правильно: базовый async

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch(client, url) for url in urls]
        return await asyncio.gather(*tasks)

# запуск:
asyncio.run(fetch_all(['https://api1', 'https://api2', ...]))

Ключевые моменты:

  • async def = корутина, возвращает Coroutine, не результат.
  • await — точка, где event loop может переключиться.
  • asyncio.gather(*tasks) — параллельный запуск, ждём всех.
  • asyncio.run(...) — точка входа программы.

Structured Concurrency

Определение Structured concurrency (структурная конкурентность)

Подход, при котором время жизни всех запущенных задач ограничено видимой областью кода. Задача, не завершившаяся к выходу из области — либо отменяется, либо блок ждёт её.

Python 3.11+ ввёл TaskGroup:

import asyncio

async def process_items(items):
    async with asyncio.TaskGroup() as tg:              
        for item in items:
            tg.create_task(handle_item(item))
    # к этой точке все задачи гарантированно завершились

Преимущества над asyncio.gather:

  • Отмена: если одна задача упала, остальные автоматически отменяются.
  • Exception grouping: несколько ошибок агрегируются в ExceptionGroup.
  • Ресурсы освобождаются при выходе — никаких «забытых» задач.
▸ anyio для более старого Python

До 3.11 (или для универсальности между asyncio и trio) — anyio даёт create_task_group() с той же семантикой:

import anyio

async def process_items(items):
    async with anyio.create_task_group() as tg:
        for item in items:
            tg.start_soon(handle_item, item)

Работает на 3.8+, портирует TaskGroup обратно.

Timeouts и cancellation

asyncio.timeout() — стандартный способ ограничить время выполнения:

async def call_with_deadline(url: str) -> dict:
    try:
        async with asyncio.timeout(3.0):                
            return await fetch(url)
    except asyncio.TimeoutError:
        logger.warning('timeout for %s', url)
        raise

При таймауте CancelledError пробрасывается в корутину. Если она обрабатывается try/except Exception — cancel может быть потерян.

Blocking-код в async — типичные ошибки

Одна time.sleep(1) в async-функции блокирует весь event loop на секунду. Все параллельные корутины стоят.

Симптомы:

  • Задачи, которые должны параллельно, идут последовательно.
  • Один клиент замораживает всех.
  • P99 latency скачет непредсказуемо.

Что блокирует event loop:

  • time.sleep() вместо await asyncio.sleep().
  • requests вместо httpx/aiohttp.
  • Синхронные драйверы БД (psycopg2 вместо asyncpg).
  • CPU-bound код (парсинг, шифрование, ML).
  • open()/read() больших файлов.

Как обходить:

  • Синхронный I/O → asyncio.to_thread(func, *args). Запускает в отдельной нити.
  • CPU-bound → ProcessPoolExecutor через loop.run_in_executor.
  • Мониторинг: asyncio debug mode — логирует блокирующие корутины.
async def call_legacy_sync_api(payload):
    # sync-функция → offload в thread pool
    result = await asyncio.to_thread(legacy_sync_call, payload)     
    return result

Sync внутри async: контекстные утечки

async def handle_request(request):
    user = await get_user(request.user_id)
    log_metric(user)                    # sync, но быстро — вроде ок?  

log_metric открывает файл, синхронно пишет — 1-2ms. На запрос не критично. Но если под нагрузкой 10к RPS — эти 1-2ms суммируются, event loop замирает, throughput падает.

Правило: любой блокирующий вызов в async-функции — источник латентности. Даже быстрый.

Как не надо

1. Sync-запрос в async-функции

async def handler(request):
    data = requests.get('https://api').json()     # !!    
    return process(data)

requests блокирует. Все параллельные обработчики ждут.

Правильно: httpx.AsyncClient + await client.get(...).

2. Забытый await

async def bad():
    task = asyncio.create_task(work())    # запущено, но не ждём  
    return 'done'                          # task может не завершиться

Fire-and-forget задача переживает функцию. Ошибка внутри неё не пробросится.

Правильно: TaskGroup или явное ожидание await task.

3. Слишком много одновременных задач

async def download_many(urls):
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*[client.get(u) for u in urls])   # 10k urls

10k одновременных соединений съест ресурсы. Сервер отдающий тоже.

Правильно: семафор для ограничения concurrency:

async def download_many(urls, max_concurrent=20):
    sem = asyncio.Semaphore(max_concurrent)                 

    async def bounded(url):
        async with sem:
            return await client.get(url)

    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*[bounded(u) for u in urls])

4. Смешение asyncio.run и уже работающего event loop

async def outer():
    asyncio.run(inner())               # !! RuntimeError: cannot be called   

asyncio.run() создаёт новый event loop. Внутри уже работающего — ошибка.

Правильно: просто await inner().

5. Обработка CancelledError без re-raise

Уже разбирали. except Exception может съесть отмену.

Trade-offs

СитуацияAsync оправданSync
HTTP-сервер с 1000+ RPS
Много одновременных внешних API
WebSocket / SSE / real-time
CPU-heavy обработка✓ (или multiprocessing)
Cron-задача 1 раз в сутки
CLI-инструмент
Легаси-код на sync, нужно 1 async-вызов✓ (не смешивать)

В твоём же коде

Анонимизированный микросервис — правильный кандидат на async:

  • Получает сообщение из RabbitMQ (I/O).
  • Вызывает marketplace-api (I/O).
  • Пишет в PostgreSQL через asyncpg (I/O).
  • Публикует событие (I/O).

Всё I/O-bound, async полностью оправдан. Стек: FastStream + httpx + SQLAlchemy async + asyncpg.

Одна вещь, которая заслуживает внимания:

  • Использование NullPool для PgBouncer означает создание нового соединения на каждый запрос. Экономит memory на серверах, но увеличивает latency. При росте нагрузки может потребоваться пересмотр.

Дальнейшее чтение

  • Raymond Hettinger. Fear and Awaiting in Async. 2022. Обязательный доклад для понимания асинхронности в Python.
  • Nathaniel J. Smith. Notes on structured concurrency. Оригинал идеи structured concurrency, автор trio.
  • Guido van Rossum. PEP 3156: asyncio. Мотивация и дизайн.
  • Python Docs. asyncio. Официальная документация.
  • anyio. Documentation. Универсальный API поверх asyncio и trio.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. asyncio позволяет...

  2. Q2. Что не так с except Exception внутри async-функции с точки зрения отмены?

  3. Q3. Что делает asyncio.TaskGroup лучше asyncio.gather?

  4. Q4. В async-функции вызов time.sleep(1) — это...

  5. Q5. CPU-bound задача (например, парсинг большого XML) в async-сервисе — как правильно?