Transactional Outbox и идемпотентность
Dual-write проблема и её решение через Outbox. Publishing relay. Idempotency keys. Как гарантировать доставку без exactly-once.
TL;DR
- Dual-write без Outbox — самый частый источник рассинхронизации в микросервисах.
- Outbox: событие пишется в ту же БД, в ту же транзакцию, публикуется relay-процессом.
- Consumer сохраняет идемпотентный ключ — гарантия «effectively-once» при at-least-once доставке.
- CDC (Debezium) — альтернатива polling relay: читаем WAL напрямую, минимум latency.
Проблема Dual Write
Классический use case:
async def execute(self, cmd):
async with self._uow:
order = Order.place(cmd)
await self._orders.save(order)
await self._broker.publish(OrderPlaced(order.id)) # !!
Две системы — БД и брокер. Возможных сценариев:
Всё ок. БД коммит, publish успех. Consumer’ы получили. Идеал.
БД упала перед save. UnitOfWork откатывается, publish не вызывается. Клиент видит ошибку. Согласованно.
Publish упал. save прошёл, publish бросил ConnectionError. Order в БД, событие consumer’ам не пришло. Рассинхронизация.
Сервис упал между commit и publish. save прошёл, процесс умер, publish не выполнен. Рассинхронизация.
Сценарии 3 и 4 — систематическая проблема, а не «редкий сбой». При росте нагрузки они происходят регулярно.
Ситуация, когда одна бизнес-операция должна атомарно изменить состояние в двух системах (например, БД и брокер), не участвующих в общей транзакции.
Ложные решения
«Публикуем перед save»
async def execute(self, cmd):
order = Order.place(cmd)
await self._broker.publish(OrderPlaced(order.id)) # !!
async with self._uow:
await self._orders.save(order)
Publish прошёл, save упал. Consumer’ы получили событие о заказе, которого нет в БД.
«Retry publish в loop»
for attempt in range(10):
try:
await self._broker.publish(event)
break
except BrokerError:
await asyncio.sleep(2 ** attempt)
Если сервис умер во время retry — событие потеряно. Retry не решает проблему падения процесса.
«2PC / XA-транзакции»
Классический ответ распределённых транзакций: two-phase commit между БД и брокером. Технически возможно, практически:
- Мало брокеров поддерживают XA (RabbitMQ — да, Kafka — нет).
- Значительный overhead (2× round trips).
- Не масштабируется — координатор — single point of failure.
- Проблема partial commit при network partition.
2PC избегают в современных системах.
Transactional Outbox
Идея простая: вместо публикации во внешний брокер, пишем событие в специальную таблицу в той же БД. Отдельный процесс (relay) читает таблицу и публикует.
┌────────────────────────────────────────────────┐
│ Use case (в транзакции) │
│ 1. INSERT INTO orders (...) │
│ 2. INSERT INTO outbox (event_type, payload) │
│ COMMIT — оба атомарно │
└─────────────────────┬──────────────────────────┘
│
┌────────▼─────────┐
│ БД (Postgres) │
└────────┬─────────┘
│ polling / CDC
┌────────▼─────────┐
│ Outbox Relay │
└────────┬─────────┘
│
┌───────▼───────┐
│ Broker │
└───────────────┘
Гарантии:
- Событие сохранено ровно тогда, когда сохранён aggregate. Не может быть «commit прошёл, event потерян».
- Relay при перезапуске читает outbox — прочитает даже старые события.
- Publish в брокер — идемпотентно (relay помечает published).
Как правильно: реализация
Схема таблицы
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL UNIQUE,
event_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished
ON outbox (id)
WHERE published_at IS NULL;
event_id— UUID для идемпотентности consumer’а.aggregate_id— для partitioning в Kafka или для ordering.published_at— маркер relay.- Partial index — быстрый select of unpublished.
Порт Outbox
class OutboxPort(Protocol):
async def append(self, event: DomainEvent) -> None: ...
Реализация
class PostgresOutbox:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def append(self, event: DomainEvent) -> None:
row = OutboxRow(
event_id=event.event_id,
event_type=type(event).__name__,
aggregate_id=str(event.aggregate_id),
payload=event.to_dict(),
)
self._session.add(row)
Use case
class PlaceOrderCommandHandler:
async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
async with self._uow:
order = Order.place(cmd)
await self._orders.save(order)
for event in order.pull_events():
await self._outbox.append(event)
# commit по выходу — atomically
return order.id
Order + events в одной транзакции. Гарантия: либо оба сохранились, либо ничего.
Relay-процесс
class OutboxRelay:
def __init__(
self,
session_factory: async_sessionmaker,
publisher: MessagePublisher,
batch_size: int = 100,
poll_interval: float = 0.5,
) -> None:
self._session_factory = session_factory
self._publisher = publisher
self._batch_size = batch_size
self._poll_interval = poll_interval
async def run(self) -> None:
while True:
try:
published = await self._process_batch()
if published == 0:
await asyncio.sleep(self._poll_interval)
except Exception as e:
logger.exception('outbox relay error')
await asyncio.sleep(self._poll_interval)
async def _process_batch(self) -> int:
async with self._session_factory() as session:
stmt = (
select(OutboxRow)
.where(OutboxRow.published_at.is_(None))
.order_by(OutboxRow.id)
.limit(self._batch_size)
.with_for_update(skip_locked=True)
)
rows = (await session.execute(stmt)).scalars().all()
for row in rows:
await self._publisher.publish(
topic=self._topic_for(row.event_type),
payload=row.payload,
key=row.aggregate_id,
headers={'event_id': str(row.event_id)},
)
row.published_at = datetime.utcnow()
await session.commit()
return len(rows)
Ключевое: FOR UPDATE SKIP LOCKED — несколько экземпляров relay могут работать параллельно, каждый берёт свою партию, никаких блокировок.
CDC как альтернатива polling
Polling — простой, но добавляет latency (poll_interval секунд) и нагрузку на БД.
Альтернатива — Change Data Capture. Читаем WAL Postgres напрямую и публикуем изменения таблицы outbox.
Механизм чтения журнала транзакций БД (Postgres WAL, MySQL binlog) и публикации всех изменений как поток событий.
Стандартный инструмент — Debezium. Настройка:
- Debezium подписывается на logical replication slot Postgres.
- При INSERT в outbox генерирует событие для Kafka.
- Публикация в брокер — секунды после commit.
Преимущества: low latency, нет polling нагрузки на БД. Недостатки: дополнительная инфраструктура (Debezium + Kafka Connect), сложнее debug.
Для большинства проектов polling relay достаточно. CDC — когда важна near-realtime доставка.
Идемпотентность consumer’а
Outbox гарантирует at-least-once доставку. Consumer может получить одно и то же событие несколько раз (retry, replay).
Правильный consumer идемпотентен:
class OnOrderPlaced:
async def handle(self, event: OrderPlacedEvent) -> None:
async with self._uow:
# проверка идемпотентности
if await self._processed.exists(event.event_id):
return
# обработка
reservation = Reservation.for_order(event.order_id, event.items)
await self._reservations.save(reservation)
# маркер обработки в той же транзакции
await self._processed.add(event.event_id)
Таблица processed_events:
CREATE TABLE processed_events (
consumer_name TEXT NOT NULL,
event_id UUID NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (consumer_name, event_id)
);
Проверка + запись — в одной транзакции с бизнес-логикой. При retry: проверка вернёт true, обработка пропускается.
Natural keys (когда возможно)
Если есть уникальный natural key — UNIQUE INDEX в БД сам обеспечит идемпотентность:
async def handle(self, event: PaymentCharged):
async with self._uow:
payment = Payment(
id=event.payment_id, # UNIQUE
order_id=event.order_id,
amount=event.amount,
)
try:
await self._payments.save(payment)
except UniqueViolation:
# уже сохранён — retry просто пропускается
return
Проще. Меньше кода. Работает, если у события есть natural unique id.
Как не надо
1. Publish после commit без Outbox
async def execute(self, cmd):
async with self._uow:
order = Order.place(cmd)
await self._orders.save(order)
# commit
await self._broker.publish(event) # !!
Сценарий: сервис упал между commit и publish. Событие потеряно навсегда.
2. Outbox с delete вместо published_at
# relay
async def process(self):
events = await session.execute(select(OutboxRow))
for event in events:
await publisher.publish(event)
await session.execute(delete(OutboxRow).where(OutboxRow.id == event.id))
await session.commit()
Если publish прошёл, delete упал (network glitch, deadlock) — при retry публикация повторится, но событие уже прочитано downstream. Дубль без гарантии идемпотентности.
Правильно: flag published_at + периодический cleanup через отдельный job.
3. Idempotency check без транзакции
async def handle(self, event):
if await self._processed.exists(event.event_id):
return
await self._reservations.save(...) # !! между этими двумя строчками — race
await self._processed.add(event.event_id)
Между exists и add — другой процесс может обработать то же событие. Оба пройдут проверку → дубль.
Правильно: всё в одной транзакции. Проверка через SELECT ... FOR UPDATE или через INSERT ... ON CONFLICT DO NOTHING.
4. Outbox без индекса на unpublished
-- relay каждый poll делает full scan таблицы outbox
SELECT * FROM outbox WHERE published_at IS NULL LIMIT 100;
К году в outbox миллионы published rows. Full scan становится медленным.
Правильно: partial index WHERE published_at IS NULL + периодическая архивация старых published rows.
Trade-offs
| Ситуация | Outbox нужен | Достаточно прямой публикации |
|---|---|---|
| Микросервисы с eventual consistency | ✓ | |
| События критичны для бизнеса (платежи, заказы) | ✓ | |
| События информационные (log, metric) | ✓ | |
| Единственный процесс, всё в одной БД | ✓ | |
| Нужна at-least-once с гарантией | ✓ |
В твоём же коде
Анонимизированный микросервис — классический candidate для Outbox:
# use_cases/standardize_position.py — текущее
async def execute(self, message):
normalized = await self._normalizer.normalize(...)
await self._items.update_normalized_data(...) # sync save
await self._events.publish_standardized_position(...) # !! dual-write
Сценарий сбоя: save прошёл, publish упал. Позиция в БД со standardized data, matcher-service никогда не узнает. Наблюдаемо как «характеристики стандартизированы, но кандидаты не подбираются».
Рефакторинг:
Таблица outbox в той же БД.
Записывать событие в outbox в той же транзакции, что и update.
Отдельный процесс OutboxRelay читает outbox и публикует в RabbitMQ.
matcher-service должен быть идемпотентным (уже есть position_id как natural key).
Инкремент к текущей архитектуре — не переписывание с нуля.
Дальнейшее чтение
- Chris Richardson. Microservices Patterns. Chapter 3, Transactional Outbox pattern.
- Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — доставка сообщений, идемпотентность.
- Debezium Documentation. Outbox Event Router.
- Gunnar Morling. Reliable Microservices Data Exchange With the Outbox Pattern. Разбор с Debezium.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Что решает Transactional Outbox?
Q2. Почему нельзя просто «сначала save, потом publish»?
Q3. CDC (Debezium) vs polling relay: что даёт CDC?
Q4. Идемпотентность consumer'а нужна потому что...
Q5. Как правильно проверять идемпотентность?