Уровень 5 · Асинхронность Глава 17 10 мин

Messaging patterns

Enterprise Integration Patterns (Hohpe & Woolf). Pub/sub vs request-reply vs RPC. RabbitMQ vs Kafka. Схема сообщения как контракт.

TL;DR

  • Sync RPC (HTTP, gRPC) — простой, но связывает время жизни двух сервисов.
  • Async pub/sub — свобода в обмен на eventual consistency.
  • "Exactly once" — миф. Есть "at-least-once + идемпотентный consumer".
  • RabbitMQ и Kafka — разные модели. Первый — работа-как-очередь, второй — журнал событий.

Три способа общаться

Сервисы могут общаться тремя способами:

    Sync RPC (HTTP/gRPC). Клиент вызывает, ждёт ответа. Время жизни клиента и сервера пересекается.

    Async request-reply. Клиент отправляет запрос, продолжает работу. Ответ приходит позже — через callback, poll, или отдельное сообщение.

    Pub/Sub (event-driven). Один сервис публикует событие, кто-то (или никто) на него реагирует. Публикующий не знает про consumer’ов.

Каждый способ решает разные задачи. Выбор — не про технологию, а про семантику взаимодействия.

Sync RPC: простой, но связывает

client                     server
  │─── POST /orders ───────▶│
  │                          │ ... 200ms работы
  │◀── 201 Created ─────────│

Плюсы:

  • Простая ментальная модель. Как локальный вызов функции.
  • Прямая обратная связь: успех/ошибка немедленно.
  • Легко отлаживать: request ↔ response.

Минусы:

  • Оба сервиса должны быть живы. Если server упал — client получает 5xx.
  • Latency каскадируется. A → B → C → D: сумма всех задержек.
  • Backpressure не работает. Если server медленный, клиенты копятся, потом ложится server.
  • Tight coupling во времени. Client ждёт response — блокировка ресурса.

Когда применять: внутренние вызовы, где нужен немедленный ответ и системы контролируются одной командой.

Async pub/sub: свобода за eventual consistency

publisher              broker                subscriber
   │─── OrderPlaced ────▶│
   │                      │─── OrderPlaced ────▶│
   │                      │                      │ обработка когда угодно
   │                      │                      │

Плюсы:

  • Publisher не знает про consumer’ов. Один event → 0/1/N подписчиков.
  • Асинхронно. Publisher не ждёт обработки.
  • Отказоустойчиво. Consumer лежит — сообщения ждут в очереди.
  • Легко добавлять функциональность. Новый обработчик = новый subscriber, publisher не трогаем.

Минусы:

  • Eventual consistency. Между публикацией и обработкой — окно неопределённости.
  • Сложнее отлаживать. Trace через несколько сервисов.
  • Схема event = контракт. Изменение ломает consumer’ов.

Когда применять: между bounded contexts, при eventual consistency между aggregate, для событий с множеством подписчиков.

Request-reply async

Компромисс: async в транспорте, но семантически запрос-ответ.

client                     queue                     server
  │─ Request(id=1, replyTo=q_c) ─▶│
  │                                  │──────────▶│ обработка
  │                                  │            │
  │◀── Response(corr_id=1) ─── q_c ◀│◀───────────│

Используется, когда:

  • Клиент не готов ждать в течение всей обработки.
  • Нужен ответ, но не сразу.
  • Инфраструктура — очереди (RabbitMQ), а не HTTP.

Реализуется через correlation_id и reply-to очередь. Клиент подписан на свою очередь, ищет ответ с нужным correlation_id.

Delivery guarantees

Три уровня доставки — по возрастанию сложности:

Определение At-most-once

Сообщение доставляется 0 или 1 раз. Возможна потеря, но не дубли.

Определение At-least-once

Сообщение доставляется 1 или больше раз. Возможны дубли, но не потери.

Определение Exactly-once

Сообщение доставляется ровно 1 раз. Практически недостижимо в распределённой системе.

Effectively-once is achievable through combining at-least-once delivery with idempotent processing.

Martin Kleppmann Designing Data-Intensive Applications, 2017

Т.е. exactly-once — миф. То, что называют «exactly-once» в брокерах (Kafka EOS) — это на самом деле «at-least-once + идемпотентная обработка на нашей стороне».

Идемпотентность consumer’а

При at-least-once consumer иногда получает одно и то же сообщение дважды (retry после timeout, network partition, брокер перезапустился). Если обработчик не идемпотентен — двойная запись, двойной email, двойное списание.

Идемпотентность: обработка одного и того же сообщения несколько раз даёт тот же результат, что одна.

async def on_payment_charged(event: PaymentCharged) -> None:
    # проверяем: обрабатывали ли уже?
    if await self._processed.exists(event.event_id):        
        return

    async with self._uow:
        order = await self._orders.find_by_id(event.order_id)
        order.mark_paid(event.payment_id)
        await self._orders.save(order)

        # атомарно записываем идемпотентный ключ
        await self._processed.add(event.event_id)           

Проверка + запись должны быть в одной транзакции — иначе гонка между проверкой и записью пропустит дубль.

Альтернатива: natural keys. Если payment_id — уникальный ID платежа, UNIQUE INDEX в БД сам обеспечит идемпотентность на INSERT.

RabbitMQ vs Kafka

Два самых популярных брокера. Разные модели.

RabbitMQ — очереди

  • Push-модель. Broker сам доставляет сообщение подписчику.
  • Ack/nack. Consumer подтверждает обработку, брокер удаляет сообщение.
  • Multiple consumers одной очереди — round-robin (work queue).
  • Complex routing. Exchanges, bindings, routing keys.
  • Message TTL, dead-letter. Богатая работа с ошибками.

Когда: command-driven, work distribution, task queues, request-reply.

Kafka — журнал событий

  • Pull-модель. Consumer сам читает из log.
  • Persistent log. Сообщения хранятся долго (retention), можно прочитать заново.
  • Partitioning. Consumer group распределяет partition’ы между instances.
  • Ordering по partition. Внутри partition — строгий порядок.
  • Event sourcing / replay. Полная история изменений.

Когда: event-driven, event sourcing, аналитика, stream processing, replay.

Схема сообщения — контракт

Схема event — не «наш DTO», это публичный контракт между сервисами. Изменение = potentially breaking change.

Правила:

    Явное версионирование. OrderPlacedV1, OrderPlacedV2 — разные типы. Не «просто обновили поле».

    Обратная совместимость. Новый producer должен работать со старым consumer. Добавление опционального поля — ок, удаление — ломает.

    Форматы: JSON Schema, Protobuf, Avro. Что-то формальное, что можно validate. Не «мы знаем формат в голове».

    Contract testing. Consumer публикует свои ожидания. Producer проверяет, что не ломает их. Инструменты: Pact.

Anti-patterns

1. Sync RPC вместо events для eventual операций

async def on_order_placed(event):
    await email_service.send(event.customer_id, template='order_placed')   # !!    

email_service.send() — sync HTTP-вызов. Если email-сервис лёг, order-обработчик тоже упадёт. Каскадное падение.

Правильно: публикация event OrderPlaced → email-service подписан отдельно → пришёл сбой, retry на своей стороне.

2. Publisher знает про consumer’ов

async def place_order(cmd):
    order = Order.place(cmd)
    await orders.save(order)

    # publisher явно знает, кого позвать
    await email_service.notify(order)              # !!  
    await inventory_service.reserve(order.items)   # !!  
    await analytics.track(order)                   # !!  

Смысл events пропадает — вместо publish/subscribe получается набор прямых вызовов.

Правильно: await events.publish(OrderPlaced(...)). Consumer’ы подписываются независимо.

3. Chatty microservices

client → A: 1 запрос
    A → B: 5 запросов (по одному per item)
        B → C: 3 запроса каждый

15+ сетевых вызовов на один пользовательский запрос. Latency + failures + сложность отладки.

Правильно:

  • Batch API (B.getMany(ids)).
  • CQRS с денормализованным read model.
  • События вместо polling.

4. Игнорирование ordering

Consumer’ы обрабатывают события параллельно. Если события OrderCreated и OrderConfirmed для одного заказа обрабатываются в неверном порядке — попытка подтвердить несуществующий заказ.

Правильно:

  • Kafka: партиционирование по order_id — события одного aggregate идут в одну партицию строго по порядку.
  • RabbitMQ: consistent hashing exchange или single consumer per aggregate.

5. Отсутствие DLQ (dead-letter queue)

Сообщение отработать нельзя (bad payload, permanent failure). Retry в цикле → блокировка очереди → задержка обработки других сообщений.

Правильно: N попыток → DLQ. Сообщение переходит в отдельную очередь, обычная работа продолжается. DLQ мониторится и разбирается вручную.

Trade-offs

СитуацияSync RPCAsync pub/sub
Немедленный ответ клиенту
Между bounded contexts
Один запрос → множество реакций
Простая ментальная модель
Отказоустойчивость к сбоям consumer
Полная история для аудита / replay✓ (event log)
Строгий порядок обработки(сложнее)

В твоём же коде

Анонимизированный микросервис использует RabbitMQ + FastStream — правильный выбор для задачи. Сервис-обработчик получает OrderItemMessage, обрабатывает, публикует StandardizedItem.

Слабые места:

  • Нет DLQ. Failed сообщения не выделяются, застревают в retry.
  • Нет идемпотентности. Один и тот же item_id может обработаться дважды (RabbitMQ дал redelivery) → дубль в БД + дублирующее событие downstream.
  • Dual-write между БД и брокером без Outbox — см. следующую главу.

Дальнейшее чтение

  • Gregor Hohpe, Bobby Woolf. Enterprise Integration Patterns. Каталог messaging-паттернов. Классика.
  • Chris Richardson. Microservices Patterns. Chapters 3, 4, 6 — messaging patterns в микросервисах.
  • Sam Newman. Building Microservices (2nd Edition). Chapters 4-5 — inter-service communication.
  • Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — потоки, доставка, идемпотентность.
  • microservices.io. Каталог паттернов от Richardson.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Exactly-once delivery в распределённой системе — это...

  2. Q2. Основное отличие RabbitMQ от Kafka:

  3. Q3. Что не так с sync HTTP-вызовом внутри обработчика события?

  4. Q4. Идемпотентность consumer'а — это...

  5. Q5. Dead-letter queue нужна для...