Уровень 3 · DDD Глава 10 11 мин

Domain Events

События как first-class citizens в domain. Как aggregate собирает события. Разница domain events и integration events. Публикация через Outbox.

TL;DR

  • Domain Event — факт, произошедший в домене. Именуется в прошедшем времени: OrderPlaced, PaymentReceived.
  • События собираются внутри aggregate во время бизнес-операции. Публикуются после commit.
  • Domain events живут внутри контекста; integration events — контракт между контекстами.
  • Публикация до commit — риск ложного события. После commit без Outbox — риск потери.

Что такое Domain Event

Определение Domain Event (доменное событие)

Иммутабельный факт, что-то произошедшее в домене. Интересен нескольким сторонам (стейкхолдерам).

— Evans, DDD, 2003

Три свойства:

  1. Прошедшее время. OrderPlaced, не PlaceOrder. PaymentReceived, не ReceivePayment.
  2. Иммутабельность. Событие произошло. Изменить нельзя. Хранится как есть.
  3. Полнота контекста. Содержит всё, что нужно потребителю для реакции. Не заставляет consumer’а лезть за деталями обратно.

Зачем нужны

Раньше: use case напрямую вызывал всех, кому что-то нужно от него.

class PlaceOrderUseCase:
    async def execute(self, cmd):
        order = Order.place(cmd)
        await self._orders.save(order)

        # прямые вызовы всех заинтересованных
        await self._email_sender.send_order_confirmation(order)     
        await self._inventory.reserve(order.items)                  
        await self._analytics.track_order(order)                    
        await self._crm.update_customer_ltv(order.customer_id)      

Проблемы:

  • PlaceOrderUseCase знает про email, inventory, analytics, CRM. Он теперь про всё.
  • Появилось шестое желание — правим use case. OCP нарушен.
  • Тесты use case тянут за собой все зависимости.
  • Если один шаг падает — весь use case падает.

С событиями: use case знает только про Order. Все реакции — обработчики событий.

class PlaceOrderUseCase:
    async def execute(self, cmd):
        order = Order.place(cmd)
        await self._orders.save(order)
        await self._events.publish(order.pull_events())    

# отдельные обработчики (в других модулях):
class SendOrderConfirmationEmail:
    async def on_order_placed(self, event: OrderPlaced): ...

class ReserveInventory:
    async def on_order_placed(self, event: OrderPlaced): ...

Use case ничего не знает про email и inventory. Список подписчиков расширяется без изменения PlaceOrderUseCase.

Как собирать события внутри aggregate

Стандартный паттерн — aggregate накапливает события, чтобы их забрал application service.

@dataclass
class Order:
    id: OrderId
    customer_id: CustomerId
    status: OrderStatus
    _events: list[DomainEvent] = field(default_factory=list)     

    def confirm(self) -> None:
        if self.status is not OrderStatus.PENDING:
            raise DomainError(f'cannot confirm order in state {self.status}')
        self.status = OrderStatus.CONFIRMED
        self._events.append(OrderConfirmed(                        
            order_id=self.id,
            customer_id=self.customer_id,
            confirmed_at=datetime.utcnow(),
        ))

    def cancel(self, reason: str) -> None:
        if self.status is OrderStatus.CANCELLED:
            raise DomainError('already cancelled')
        self.status = OrderStatus.CANCELLED
        self._events.append(OrderCancelled(order_id=self.id, reason=reason))

    def pull_events(self) -> list[DomainEvent]:                    
        events, self._events = self._events, []
        return events

Как это работает:

  • Бизнес-метод (confirm, cancel) записывает событие в _events.
  • pull_events() возвращает накопленные и очищает список.
  • Application service вызывает pull_events() после save() и передаёт EventPublisherPort.

События определяются как Value Objects

@dataclass(frozen=True)
class DomainEvent:
    """База: все события иммутабельные."""
    occurred_at: datetime = field(default_factory=datetime.utcnow)


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: OrderId
    customer_id: CustomerId
    total: Money
    items_count: int


@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
    order_id: OrderId
    customer_id: CustomerId
    confirmed_at: datetime


@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
    order_id: OrderId
    reason: str

Свойства:

  • frozen=True — иммутабельность на уровне рантайма.
  • Полные данные. OrderPlaced содержит total и items_count — consumer не должен идти обратно к Order за деталями.
  • Прошедшее время в именах — обязательно.

Domain Events vs Integration Events

Различие критично, но часто игнорируется.

Определение Domain Event

Живёт внутри Bounded Context. Формат может меняться вместе с моделью. Consumer — код внутри того же сервиса.

Определение Integration Event

Контракт между Bounded Contexts (обычно микросервисами). Формат стабилен, версионируется. Consumer — другой сервис.

АспектDomain EventIntegration Event
Область действияВнутри контекстаМежду контекстами
ТранспортВ памяти или local queueKafka, RabbitMQ, event bus
СхемаМожет меняться свободноСтабильна, версионируется
ConsumerСвой же сервисВнешние сервисы
ПолнотаМожет отсылать к aggregateДолжен быть self-contained
ПримерOrderConfirmed в памятиOrderConfirmedV1 в RabbitMQ

Часто одно превращается в другое. OrderConfirmed (domain) в рамках обработчика конвертируется в OrderConfirmedV1 (integration) и публикуется во внешнюю очередь. Разделение позволяет менять domain-модель, не ломая контракт с внешними системами.

Пример трансформации

class PublishOrderConfirmedIntegrationEvent:
    """Слушает domain event, публикует integration event."""

    def __init__(self, publisher: IntegrationEventPublisherPort):
        self._publisher = publisher

    async def on_order_confirmed(self, event: OrderConfirmed) -> None:
        integration_event = OrderConfirmedV1(
            order_id=str(event.order_id),
            customer_id=str(event.customer_id),
            confirmed_at=event.confirmed_at.isoformat(),
            schema_version=1,
        )
        await self._publisher.publish(
            topic='orders.confirmed',
            payload=integration_event,
        )

Domain event живёт секунды, integration event — навсегда в event log.

Публикация: когда именно

Три момента публикации, только один правильный:

    До save (❌). Событие ушло, но save упал — consumer’ы получили событие о несуществующем состоянии. Реагируют на данные, которых нет в БД.

    После save, до commit (❌). Save прошёл, event опубликован. Commit падает — БД откатывается. То же самое: event про несуществующие данные.

    После commit транзакции (✓). События публикуются только когда данные точно сохранены. Но с оговоркой: между commit и publish может упасть publisher или сервис. Тогда события потеряны. Решение — Outbox pattern.

Outbox Pattern

Схема:

    Внутри транзакции: запись в outbox таблицу того же соединения. Данные и событие атомарно сохраняются в БД.

    После commit: отдельный процесс (relay) читает outbox, публикует в брокер, помечает как отправленное.

    Если publisher упал — relay попробует снова. При корректной идемпотентности consumer’а — no data loss.

Подробный разбор — в главе Transactional Outbox. Здесь достаточно понимать, что domain events → integration events → Outbox → broker — стандартный поток надёжной доставки.

Как правильно: полный поток

class PlaceOrderCommandHandler:
    def __init__(
        self,
        orders: OrderRepositoryPort,
        outbox: OutboxPort,
        uow: UnitOfWork,
    ) -> None:
        self._orders = orders
        self._outbox = outbox
        self._uow = uow

    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._uow:                     
            order = Order.place(cmd)              # aggregate накапливает событие
            await self._orders.save(order)        # запись в orders table

            events = order.pull_events()          
            for event in events:
                await self._outbox.append(event)  # запись в outbox table

            # commit по выходу из with — обе таблицы атомарно
        return order.id

Отдельный процесс:

class OutboxRelay:
    async def run(self):
        while True:
            batch = await self._outbox.fetch_pending(limit=100)
            for entry in batch:
                integration_event = self._translate(entry.event)
                await self._broker.publish(integration_event)
                await self._outbox.mark_published(entry.id)
            await asyncio.sleep(0.5)

Как не надо

1. События с настоящим временем

@dataclass
class PlaceOrder(DomainEvent):        # !!  
    order_id: OrderId
    customer_id: CustomerId

PlaceOrder — императив, приказ. Domain event должен описывать факт: OrderPlaced.

Правильно: OrderPlaced, OrderConfirmed, OrderCancelled.

2. Публикация из aggregate

class Order:
    def __init__(self, ..., publisher):        
        self._publisher = publisher

    def confirm(self):
        self.status = 'confirmed'
        self._publisher.publish(OrderConfirmed(...))   # !!    

Aggregate знает про publisher. Правило зависимостей нарушено — domain импортирует infrastructure. Тесты требуют мокать publisher.

Правильно: aggregate накапливает через _events, application service забирает через pull_events().

3. Событие без полных данных

@dataclass
class OrderPlaced:
    order_id: OrderId                    

Consumer получил OrderPlaced — что делать? Идёт в orders за деталями. Дополнительный запрос. При eventual consistency — данные могут ещё не докатиться, consumer получит None.

Правильно: событие содержит всё нужное. OrderPlaced(order_id, customer_id, total, items_count, placed_at).

4. Публикация до commit

async def execute(self, cmd):
    order = Order.place(cmd)
    await self._events.publish(order.pull_events())     # !!    
    await self._orders.save(order)
    # если save упадёт — event ушёл впустую

Event ушёл, save упал. Consumer получил OrderPlaced, идёт в БД — заказа нет.

Правильно: сначала save, потом (в идеале через Outbox) publish.

5. Domain event напрямую в интеграционный брокер

class Order:
    def confirm(self):
        self.status = 'confirmed'
        self._events.append(OrderConfirmed(...))

# elsewhere:
for event in order.pull_events():
    await rabbitmq.publish(event, topic='orders')      # !!    

Domain event напрямую летит в брокер как integration event. Проблемы: меняете OrderConfirmed в domain — ломаете всех внешних consumer’ов. Схема события domain не должна быть контрактом между сервисами.

Правильно: отдельный OrderConfirmedV1 для integration, отдельный OrderConfirmed для domain. Трансляция через handler.

Trade-offs

СитуацияDomain events оправданыПрямые вызовы
Одно бизнес-действие вызывает несколько побочных эффектов
Обработчики могут добавляться независимо
Нужен audit trail всех значимых изменений
Один use case, одна реакция, никогда не изменится
Простой CRUD без побочных эффектов
Prototype/MVP на 2 недели

В твоём же коде

Анонимизированный микросервис публикует StandardizedItem после сохранения — по сути, это integration event. Domain events как отдельная категория отсутствуют.

Что стоит понять:

  • StandardizedItem — integration event: летит в matcher-service.
  • Domain events (ItemStandardized, StandardizationFailed) могли бы существовать в промежутке. Application service публиковал бы их сначала как domain events, затем handler’ы транслировали бы в integration events.

Пока сервис простой — прямая публикация в брокер приемлема. Но dual-write без Outbox — реальная проблема. См. Case study → dual-write-without-outbox.

Дальнейшее чтение

  • Eric Evans. Domain-Driven Design. События появились в поздних дополнениях к книге и в DDD Reference.
  • Vaughn Vernon. Implementing Domain-Driven Design. Chapter 8 — Domain Events.
  • Martin Fowler. Domain Event. Классическое короткое определение.
  • Greg Young. CQRS Documents. Про роль событий в CQRS + Event Sourcing.
  • Chris Richardson. Microservices Patterns. Chapter 5 — Domain events + Outbox.
  • Alberto Brandolini. EventStorming. Метод коллективного обнаружения domain events.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Как правильно называть domain event?

  2. Q2. Кто должен публиковать domain event?

  3. Q3. В чём разница Domain Event и Integration Event?

  4. Q4. Что не так с публикацией event до save?

  5. Q5. Что должно быть в OrderPlaced для integration event?