Messaging patterns
Enterprise Integration Patterns (Hohpe & Woolf). Pub/sub vs request-reply vs RPC. RabbitMQ vs Kafka. Схема сообщения как контракт.
TL;DR
- Sync RPC (HTTP, gRPC) — простой, но связывает время жизни двух сервисов.
- Async pub/sub — свобода в обмен на eventual consistency.
- "Exactly once" — миф. Есть "at-least-once + идемпотентный consumer".
- RabbitMQ и Kafka — разные модели. Первый — работа-как-очередь, второй — журнал событий.
Три способа общаться
Сервисы могут общаться тремя способами:
Sync RPC (HTTP/gRPC). Клиент вызывает, ждёт ответа. Время жизни клиента и сервера пересекается.
Async request-reply. Клиент отправляет запрос, продолжает работу. Ответ приходит позже — через callback, poll, или отдельное сообщение.
Pub/Sub (event-driven). Один сервис публикует событие, кто-то (или никто) на него реагирует. Публикующий не знает про consumer’ов.
Каждый способ решает разные задачи. Выбор — не про технологию, а про семантику взаимодействия.
Sync RPC: простой, но связывает
client server
│─── POST /orders ───────▶│
│ │ ... 200ms работы
│◀── 201 Created ─────────│
│
Плюсы:
- Простая ментальная модель. Как локальный вызов функции.
- Прямая обратная связь: успех/ошибка немедленно.
- Легко отлаживать: request ↔ response.
Минусы:
- Оба сервиса должны быть живы. Если server упал — client получает 5xx.
- Latency каскадируется. A → B → C → D: сумма всех задержек.
- Backpressure не работает. Если server медленный, клиенты копятся, потом ложится server.
- Tight coupling во времени. Client ждёт response — блокировка ресурса.
Когда применять: внутренние вызовы, где нужен немедленный ответ и системы контролируются одной командой.
Async pub/sub: свобода за eventual consistency
publisher broker subscriber
│─── OrderPlaced ────▶│
│ │─── OrderPlaced ────▶│
│ │ │ обработка когда угодно
│ │ │
Плюсы:
- Publisher не знает про consumer’ов. Один event → 0/1/N подписчиков.
- Асинхронно. Publisher не ждёт обработки.
- Отказоустойчиво. Consumer лежит — сообщения ждут в очереди.
- Легко добавлять функциональность. Новый обработчик = новый subscriber, publisher не трогаем.
Минусы:
- Eventual consistency. Между публикацией и обработкой — окно неопределённости.
- Сложнее отлаживать. Trace через несколько сервисов.
- Схема event = контракт. Изменение ломает consumer’ов.
Когда применять: между bounded contexts, при eventual consistency между aggregate, для событий с множеством подписчиков.
Request-reply async
Компромисс: async в транспорте, но семантически запрос-ответ.
client queue server
│─ Request(id=1, replyTo=q_c) ─▶│
│ │──────────▶│ обработка
│ │ │
│◀── Response(corr_id=1) ─── q_c ◀│◀───────────│
Используется, когда:
- Клиент не готов ждать в течение всей обработки.
- Нужен ответ, но не сразу.
- Инфраструктура — очереди (RabbitMQ), а не HTTP.
Реализуется через correlation_id и reply-to очередь. Клиент подписан на свою очередь, ищет ответ с нужным correlation_id.
Delivery guarantees
Три уровня доставки — по возрастанию сложности:
Сообщение доставляется 0 или 1 раз. Возможна потеря, но не дубли.
Сообщение доставляется 1 или больше раз. Возможны дубли, но не потери.
Сообщение доставляется ровно 1 раз. Практически недостижимо в распределённой системе.
Effectively-once is achievable through combining at-least-once delivery with idempotent processing.
Т.е. exactly-once — миф. То, что называют «exactly-once» в брокерах (Kafka EOS) — это на самом деле «at-least-once + идемпотентная обработка на нашей стороне».
Идемпотентность consumer’а
При at-least-once consumer иногда получает одно и то же сообщение дважды (retry после timeout, network partition, брокер перезапустился). Если обработчик не идемпотентен — двойная запись, двойной email, двойное списание.
Идемпотентность: обработка одного и того же сообщения несколько раз даёт тот же результат, что одна.
async def on_payment_charged(event: PaymentCharged) -> None:
# проверяем: обрабатывали ли уже?
if await self._processed.exists(event.event_id):
return
async with self._uow:
order = await self._orders.find_by_id(event.order_id)
order.mark_paid(event.payment_id)
await self._orders.save(order)
# атомарно записываем идемпотентный ключ
await self._processed.add(event.event_id)
Проверка + запись должны быть в одной транзакции — иначе гонка между проверкой и записью пропустит дубль.
Альтернатива: natural keys. Если payment_id — уникальный ID платежа, UNIQUE INDEX в БД сам обеспечит идемпотентность на INSERT.
RabbitMQ vs Kafka
Два самых популярных брокера. Разные модели.
RabbitMQ — очереди
- Push-модель. Broker сам доставляет сообщение подписчику.
- Ack/nack. Consumer подтверждает обработку, брокер удаляет сообщение.
- Multiple consumers одной очереди — round-robin (work queue).
- Complex routing. Exchanges, bindings, routing keys.
- Message TTL, dead-letter. Богатая работа с ошибками.
Когда: command-driven, work distribution, task queues, request-reply.
Kafka — журнал событий
- Pull-модель. Consumer сам читает из log.
- Persistent log. Сообщения хранятся долго (retention), можно прочитать заново.
- Partitioning. Consumer group распределяет partition’ы между instances.
- Ordering по partition. Внутри partition — строгий порядок.
- Event sourcing / replay. Полная история изменений.
Когда: event-driven, event sourcing, аналитика, stream processing, replay.
Схема сообщения — контракт
Схема event — не «наш DTO», это публичный контракт между сервисами. Изменение = potentially breaking change.
Правила:
Явное версионирование. OrderPlacedV1, OrderPlacedV2 — разные типы. Не «просто обновили поле».
Обратная совместимость. Новый producer должен работать со старым consumer. Добавление опционального поля — ок, удаление — ломает.
Форматы: JSON Schema, Protobuf, Avro. Что-то формальное, что можно validate. Не «мы знаем формат в голове».
Contract testing. Consumer публикует свои ожидания. Producer проверяет, что не ломает их. Инструменты: Pact.
Anti-patterns
1. Sync RPC вместо events для eventual операций
async def on_order_placed(event):
await email_service.send(event.customer_id, template='order_placed') # !!
email_service.send() — sync HTTP-вызов. Если email-сервис лёг, order-обработчик тоже упадёт. Каскадное падение.
Правильно: публикация event OrderPlaced → email-service подписан отдельно → пришёл сбой, retry на своей стороне.
2. Publisher знает про consumer’ов
async def place_order(cmd):
order = Order.place(cmd)
await orders.save(order)
# publisher явно знает, кого позвать
await email_service.notify(order) # !!
await inventory_service.reserve(order.items) # !!
await analytics.track(order) # !!
Смысл events пропадает — вместо publish/subscribe получается набор прямых вызовов.
Правильно: await events.publish(OrderPlaced(...)). Consumer’ы подписываются независимо.
3. Chatty microservices
client → A: 1 запрос
A → B: 5 запросов (по одному per item)
B → C: 3 запроса каждый
15+ сетевых вызовов на один пользовательский запрос. Latency + failures + сложность отладки.
Правильно:
- Batch API (
B.getMany(ids)). - CQRS с денормализованным read model.
- События вместо polling.
4. Игнорирование ordering
Consumer’ы обрабатывают события параллельно. Если события OrderCreated и OrderConfirmed для одного заказа обрабатываются в неверном порядке — попытка подтвердить несуществующий заказ.
Правильно:
- Kafka: партиционирование по
order_id— события одного aggregate идут в одну партицию строго по порядку. - RabbitMQ: consistent hashing exchange или single consumer per aggregate.
5. Отсутствие DLQ (dead-letter queue)
Сообщение отработать нельзя (bad payload, permanent failure). Retry в цикле → блокировка очереди → задержка обработки других сообщений.
Правильно: N попыток → DLQ. Сообщение переходит в отдельную очередь, обычная работа продолжается. DLQ мониторится и разбирается вручную.
Trade-offs
| Ситуация | Sync RPC | Async pub/sub |
|---|---|---|
| Немедленный ответ клиенту | ✓ | |
| Между bounded contexts | ✓ | |
| Один запрос → множество реакций | ✓ | |
| Простая ментальная модель | ✓ | |
| Отказоустойчивость к сбоям consumer | ✓ | |
| Полная история для аудита / replay | ✓ (event log) | |
| Строгий порядок обработки | ✓ | (сложнее) |
В твоём же коде
Анонимизированный микросервис использует RabbitMQ + FastStream — правильный выбор для задачи. Сервис-обработчик получает OrderItemMessage, обрабатывает, публикует StandardizedItem.
Слабые места:
- Нет DLQ. Failed сообщения не выделяются, застревают в retry.
- Нет идемпотентности. Один и тот же
item_idможет обработаться дважды (RabbitMQ дал redelivery) → дубль в БД + дублирующее событие downstream. - Dual-write между БД и брокером без Outbox — см. следующую главу.
Дальнейшее чтение
- Gregor Hohpe, Bobby Woolf. Enterprise Integration Patterns. Каталог messaging-паттернов. Классика.
- Chris Richardson. Microservices Patterns. Chapters 3, 4, 6 — messaging patterns в микросервисах.
- Sam Newman. Building Microservices (2nd Edition). Chapters 4-5 — inter-service communication.
- Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — потоки, доставка, идемпотентность.
- microservices.io. Каталог паттернов от Richardson.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Exactly-once delivery в распределённой системе — это...
Q2. Основное отличие RabbitMQ от Kafka:
Q3. Что не так с sync HTTP-вызовом внутри обработчика события?
Q4. Идемпотентность consumer'а — это...
Q5. Dead-letter queue нужна для...