Уровень 5 · Асинхронность Глава 18 11 мин

Transactional Outbox и идемпотентность

Dual-write проблема и её решение через Outbox. Publishing relay. Idempotency keys. Как гарантировать доставку без exactly-once.

TL;DR

  • Dual-write без Outbox — самый частый источник рассинхронизации в микросервисах.
  • Outbox: событие пишется в ту же БД, в ту же транзакцию, публикуется relay-процессом.
  • Consumer сохраняет идемпотентный ключ — гарантия «effectively-once» при at-least-once доставке.
  • CDC (Debezium) — альтернатива polling relay: читаем WAL напрямую, минимум latency.

Проблема Dual Write

Классический use case:

async def execute(self, cmd):
    async with self._uow:
        order = Order.place(cmd)
        await self._orders.save(order)

    await self._broker.publish(OrderPlaced(order.id))     # !!    

Две системы — БД и брокер. Возможных сценариев:

    Всё ок. БД коммит, publish успех. Consumer’ы получили. Идеал.

    БД упала перед save. UnitOfWork откатывается, publish не вызывается. Клиент видит ошибку. Согласованно.

    Publish упал. save прошёл, publish бросил ConnectionError. Order в БД, событие consumer’ам не пришло. Рассинхронизация.

    Сервис упал между commit и publish. save прошёл, процесс умер, publish не выполнен. Рассинхронизация.

Сценарии 3 и 4 — систематическая проблема, а не «редкий сбой». При росте нагрузки они происходят регулярно.

Определение Dual-write problem (проблема двойной записи)

Ситуация, когда одна бизнес-операция должна атомарно изменить состояние в двух системах (например, БД и брокер), не участвующих в общей транзакции.

Ложные решения

«Публикуем перед save»

async def execute(self, cmd):
    order = Order.place(cmd)
    await self._broker.publish(OrderPlaced(order.id))     # !!  
    async with self._uow:
        await self._orders.save(order)

Publish прошёл, save упал. Consumer’ы получили событие о заказе, которого нет в БД.

«Retry publish в loop»

for attempt in range(10):
    try:
        await self._broker.publish(event)
        break
    except BrokerError:
        await asyncio.sleep(2 ** attempt)

Если сервис умер во время retry — событие потеряно. Retry не решает проблему падения процесса.

«2PC / XA-транзакции»

Классический ответ распределённых транзакций: two-phase commit между БД и брокером. Технически возможно, практически:

  • Мало брокеров поддерживают XA (RabbitMQ — да, Kafka — нет).
  • Значительный overhead (2× round trips).
  • Не масштабируется — координатор — single point of failure.
  • Проблема partial commit при network partition.

2PC избегают в современных системах.

Transactional Outbox

Идея простая: вместо публикации во внешний брокер, пишем событие в специальную таблицу в той же БД. Отдельный процесс (relay) читает таблицу и публикует.

┌────────────────────────────────────────────────┐
│  Use case (в транзакции)                       │
│   1. INSERT INTO orders (...)                  │
│   2. INSERT INTO outbox (event_type, payload)  │
│   COMMIT — оба атомарно                        │
└─────────────────────┬──────────────────────────┘

             ┌────────▼─────────┐
             │   БД (Postgres)  │
             └────────┬─────────┘
                      │  polling / CDC
             ┌────────▼─────────┐
             │  Outbox Relay    │
             └────────┬─────────┘

              ┌───────▼───────┐
              │    Broker     │
              └───────────────┘

Гарантии:

  • Событие сохранено ровно тогда, когда сохранён aggregate. Не может быть «commit прошёл, event потерян».
  • Relay при перезапуске читает outbox — прочитает даже старые события.
  • Publish в брокер — идемпотентно (relay помечает published).

Как правильно: реализация

Схема таблицы

CREATE TABLE outbox (
    id           BIGSERIAL PRIMARY KEY,
    event_id     UUID NOT NULL UNIQUE,
    event_type   TEXT NOT NULL,
    aggregate_id TEXT NOT NULL,
    payload      JSONB NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ
);

CREATE INDEX idx_outbox_unpublished
    ON outbox (id)
    WHERE published_at IS NULL;
  • event_id — UUID для идемпотентности consumer’а.
  • aggregate_id — для partitioning в Kafka или для ordering.
  • published_at — маркер relay.
  • Partial index — быстрый select of unpublished.

Порт Outbox

class OutboxPort(Protocol):
    async def append(self, event: DomainEvent) -> None: ...

Реализация

class PostgresOutbox:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def append(self, event: DomainEvent) -> None:
        row = OutboxRow(
            event_id=event.event_id,
            event_type=type(event).__name__,
            aggregate_id=str(event.aggregate_id),
            payload=event.to_dict(),
        )
        self._session.add(row)

Use case

class PlaceOrderCommandHandler:
    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._uow:                            
            order = Order.place(cmd)
            await self._orders.save(order)
            for event in order.pull_events():
                await self._outbox.append(event)         
            # commit по выходу — atomically
        return order.id

Order + events в одной транзакции. Гарантия: либо оба сохранились, либо ничего.

Relay-процесс

class OutboxRelay:
    def __init__(
        self,
        session_factory: async_sessionmaker,
        publisher: MessagePublisher,
        batch_size: int = 100,
        poll_interval: float = 0.5,
    ) -> None:
        self._session_factory = session_factory
        self._publisher = publisher
        self._batch_size = batch_size
        self._poll_interval = poll_interval

    async def run(self) -> None:
        while True:
            try:
                published = await self._process_batch()
                if published == 0:
                    await asyncio.sleep(self._poll_interval)
            except Exception as e:
                logger.exception('outbox relay error')
                await asyncio.sleep(self._poll_interval)

    async def _process_batch(self) -> int:
        async with self._session_factory() as session:
            stmt = (
                select(OutboxRow)
                .where(OutboxRow.published_at.is_(None))
                .order_by(OutboxRow.id)
                .limit(self._batch_size)
                .with_for_update(skip_locked=True)         
            )
            rows = (await session.execute(stmt)).scalars().all()

            for row in rows:
                await self._publisher.publish(
                    topic=self._topic_for(row.event_type),
                    payload=row.payload,
                    key=row.aggregate_id,
                    headers={'event_id': str(row.event_id)},
                )
                row.published_at = datetime.utcnow()

            await session.commit()
            return len(rows)

Ключевое: FOR UPDATE SKIP LOCKED — несколько экземпляров relay могут работать параллельно, каждый берёт свою партию, никаких блокировок.

CDC как альтернатива polling

Polling — простой, но добавляет latency (poll_interval секунд) и нагрузку на БД.

Альтернатива — Change Data Capture. Читаем WAL Postgres напрямую и публикуем изменения таблицы outbox.

Определение Change Data Capture (CDC) (захват изменений данных)

Механизм чтения журнала транзакций БД (Postgres WAL, MySQL binlog) и публикации всех изменений как поток событий.

Стандартный инструмент — Debezium. Настройка:

  • Debezium подписывается на logical replication slot Postgres.
  • При INSERT в outbox генерирует событие для Kafka.
  • Публикация в брокер — секунды после commit.

Преимущества: low latency, нет polling нагрузки на БД. Недостатки: дополнительная инфраструктура (Debezium + Kafka Connect), сложнее debug.

Для большинства проектов polling relay достаточно. CDC — когда важна near-realtime доставка.

Идемпотентность consumer’а

Outbox гарантирует at-least-once доставку. Consumer может получить одно и то же событие несколько раз (retry, replay).

Правильный consumer идемпотентен:

class OnOrderPlaced:
    async def handle(self, event: OrderPlacedEvent) -> None:
        async with self._uow:
            # проверка идемпотентности
            if await self._processed.exists(event.event_id):        
                return

            # обработка
            reservation = Reservation.for_order(event.order_id, event.items)
            await self._reservations.save(reservation)

            # маркер обработки в той же транзакции
            await self._processed.add(event.event_id)               

Таблица processed_events:

CREATE TABLE processed_events (
    consumer_name TEXT NOT NULL,
    event_id      UUID NOT NULL,
    processed_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (consumer_name, event_id)
);

Проверка + запись — в одной транзакции с бизнес-логикой. При retry: проверка вернёт true, обработка пропускается.

Natural keys (когда возможно)

Если есть уникальный natural key — UNIQUE INDEX в БД сам обеспечит идемпотентность:

async def handle(self, event: PaymentCharged):
    async with self._uow:
        payment = Payment(
            id=event.payment_id,       # UNIQUE     
            order_id=event.order_id,
            amount=event.amount,
        )
        try:
            await self._payments.save(payment)
        except UniqueViolation:
            # уже сохранён — retry просто пропускается
            return

Проще. Меньше кода. Работает, если у события есть natural unique id.

Как не надо

1. Publish после commit без Outbox

async def execute(self, cmd):
    async with self._uow:
        order = Order.place(cmd)
        await self._orders.save(order)
    # commit

    await self._broker.publish(event)             # !!    

Сценарий: сервис упал между commit и publish. Событие потеряно навсегда.

2. Outbox с delete вместо published_at

# relay
async def process(self):
    events = await session.execute(select(OutboxRow))
    for event in events:
        await publisher.publish(event)
        await session.execute(delete(OutboxRow).where(OutboxRow.id == event.id))   
    await session.commit()

Если publish прошёл, delete упал (network glitch, deadlock) — при retry публикация повторится, но событие уже прочитано downstream. Дубль без гарантии идемпотентности.

Правильно: flag published_at + периодический cleanup через отдельный job.

3. Idempotency check без транзакции

async def handle(self, event):
    if await self._processed.exists(event.event_id):
        return
    await self._reservations.save(...)              # !! между этими двумя строчками — race    
    await self._processed.add(event.event_id)

Между exists и add — другой процесс может обработать то же событие. Оба пройдут проверку → дубль.

Правильно: всё в одной транзакции. Проверка через SELECT ... FOR UPDATE или через INSERT ... ON CONFLICT DO NOTHING.

4. Outbox без индекса на unpublished

-- relay каждый poll делает full scan таблицы outbox
SELECT * FROM outbox WHERE published_at IS NULL LIMIT 100;

К году в outbox миллионы published rows. Full scan становится медленным.

Правильно: partial index WHERE published_at IS NULL + периодическая архивация старых published rows.

Trade-offs

СитуацияOutbox нуженДостаточно прямой публикации
Микросервисы с eventual consistency
События критичны для бизнеса (платежи, заказы)
События информационные (log, metric)
Единственный процесс, всё в одной БД
Нужна at-least-once с гарантией

В твоём же коде

Анонимизированный микросервис — классический candidate для Outbox:

# use_cases/standardize_position.py — текущее
async def execute(self, message):
    normalized = await self._normalizer.normalize(...)
    await self._items.update_normalized_data(...)        # sync save
    await self._events.publish_standardized_position(...)  # !! dual-write   

Сценарий сбоя: save прошёл, publish упал. Позиция в БД со standardized data, matcher-service никогда не узнает. Наблюдаемо как «характеристики стандартизированы, но кандидаты не подбираются».

Рефакторинг:

    Таблица outbox в той же БД.

    Записывать событие в outbox в той же транзакции, что и update.

    Отдельный процесс OutboxRelay читает outbox и публикует в RabbitMQ.

    matcher-service должен быть идемпотентным (уже есть position_id как natural key).

Инкремент к текущей архитектуре — не переписывание с нуля.

Дальнейшее чтение

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Что решает Transactional Outbox?

  2. Q2. Почему нельзя просто «сначала save, потом publish»?

  3. Q3. CDC (Debezium) vs polling relay: что даёт CDC?

  4. Q4. Идемпотентность consumer'а нужна потому что...

  5. Q5. Как правильно проверять идемпотентность?