Domain Events
События как first-class citizens в domain. Как aggregate собирает события. Разница domain events и integration events. Публикация через Outbox.
TL;DR
- Domain Event — факт, произошедший в домене. Именуется в прошедшем времени: OrderPlaced, PaymentReceived.
- События собираются внутри aggregate во время бизнес-операции. Публикуются после commit.
- Domain events живут внутри контекста; integration events — контракт между контекстами.
- Публикация до commit — риск ложного события. После commit без Outbox — риск потери.
Что такое Domain Event
Иммутабельный факт, что-то произошедшее в домене. Интересен нескольким сторонам (стейкхолдерам).
Три свойства:
- Прошедшее время.
OrderPlaced, неPlaceOrder.PaymentReceived, неReceivePayment. - Иммутабельность. Событие произошло. Изменить нельзя. Хранится как есть.
- Полнота контекста. Содержит всё, что нужно потребителю для реакции. Не заставляет consumer’а лезть за деталями обратно.
Зачем нужны
Раньше: use case напрямую вызывал всех, кому что-то нужно от него.
class PlaceOrderUseCase:
async def execute(self, cmd):
order = Order.place(cmd)
await self._orders.save(order)
# прямые вызовы всех заинтересованных
await self._email_sender.send_order_confirmation(order)
await self._inventory.reserve(order.items)
await self._analytics.track_order(order)
await self._crm.update_customer_ltv(order.customer_id)
Проблемы:
PlaceOrderUseCaseзнает про email, inventory, analytics, CRM. Он теперь про всё.- Появилось шестое желание — правим use case. OCP нарушен.
- Тесты use case тянут за собой все зависимости.
- Если один шаг падает — весь use case падает.
С событиями: use case знает только про Order. Все реакции — обработчики событий.
class PlaceOrderUseCase:
async def execute(self, cmd):
order = Order.place(cmd)
await self._orders.save(order)
await self._events.publish(order.pull_events())
# отдельные обработчики (в других модулях):
class SendOrderConfirmationEmail:
async def on_order_placed(self, event: OrderPlaced): ...
class ReserveInventory:
async def on_order_placed(self, event: OrderPlaced): ...
Use case ничего не знает про email и inventory. Список подписчиков расширяется без изменения PlaceOrderUseCase.
Как собирать события внутри aggregate
Стандартный паттерн — aggregate накапливает события, чтобы их забрал application service.
@dataclass
class Order:
id: OrderId
customer_id: CustomerId
status: OrderStatus
_events: list[DomainEvent] = field(default_factory=list)
def confirm(self) -> None:
if self.status is not OrderStatus.PENDING:
raise DomainError(f'cannot confirm order in state {self.status}')
self.status = OrderStatus.CONFIRMED
self._events.append(OrderConfirmed(
order_id=self.id,
customer_id=self.customer_id,
confirmed_at=datetime.utcnow(),
))
def cancel(self, reason: str) -> None:
if self.status is OrderStatus.CANCELLED:
raise DomainError('already cancelled')
self.status = OrderStatus.CANCELLED
self._events.append(OrderCancelled(order_id=self.id, reason=reason))
def pull_events(self) -> list[DomainEvent]:
events, self._events = self._events, []
return events
Как это работает:
- Бизнес-метод (
confirm,cancel) записывает событие в_events. pull_events()возвращает накопленные и очищает список.- Application service вызывает
pull_events()послеsave()и передаётEventPublisherPort.
События определяются как Value Objects
@dataclass(frozen=True)
class DomainEvent:
"""База: все события иммутабельные."""
occurred_at: datetime = field(default_factory=datetime.utcnow)
@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
order_id: OrderId
customer_id: CustomerId
total: Money
items_count: int
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
order_id: OrderId
customer_id: CustomerId
confirmed_at: datetime
@dataclass(frozen=True)
class OrderCancelled(DomainEvent):
order_id: OrderId
reason: str
Свойства:
frozen=True— иммутабельность на уровне рантайма.- Полные данные.
OrderPlacedсодержитtotalиitems_count— consumer не должен идти обратно кOrderза деталями. - Прошедшее время в именах — обязательно.
Domain Events vs Integration Events
Различие критично, но часто игнорируется.
Живёт внутри Bounded Context. Формат может меняться вместе с моделью. Consumer — код внутри того же сервиса.
Контракт между Bounded Contexts (обычно микросервисами). Формат стабилен, версионируется. Consumer — другой сервис.
| Аспект | Domain Event | Integration Event |
|---|---|---|
| Область действия | Внутри контекста | Между контекстами |
| Транспорт | В памяти или local queue | Kafka, RabbitMQ, event bus |
| Схема | Может меняться свободно | Стабильна, версионируется |
| Consumer | Свой же сервис | Внешние сервисы |
| Полнота | Может отсылать к aggregate | Должен быть self-contained |
| Пример | OrderConfirmed в памяти | OrderConfirmedV1 в RabbitMQ |
Часто одно превращается в другое. OrderConfirmed (domain) в рамках обработчика конвертируется в OrderConfirmedV1 (integration) и публикуется во внешнюю очередь. Разделение позволяет менять domain-модель, не ломая контракт с внешними системами.
Пример трансформации
class PublishOrderConfirmedIntegrationEvent:
"""Слушает domain event, публикует integration event."""
def __init__(self, publisher: IntegrationEventPublisherPort):
self._publisher = publisher
async def on_order_confirmed(self, event: OrderConfirmed) -> None:
integration_event = OrderConfirmedV1(
order_id=str(event.order_id),
customer_id=str(event.customer_id),
confirmed_at=event.confirmed_at.isoformat(),
schema_version=1,
)
await self._publisher.publish(
topic='orders.confirmed',
payload=integration_event,
)
Domain event живёт секунды, integration event — навсегда в event log.
Публикация: когда именно
Три момента публикации, только один правильный:
До save (❌). Событие ушло, но save упал — consumer’ы получили событие о несуществующем состоянии. Реагируют на данные, которых нет в БД.
После save, до commit (❌). Save прошёл, event опубликован. Commit падает — БД откатывается. То же самое: event про несуществующие данные.
После commit транзакции (✓). События публикуются только когда данные точно сохранены. Но с оговоркой: между commit и publish может упасть publisher или сервис. Тогда события потеряны. Решение — Outbox pattern.
Outbox Pattern
Схема:
Внутри транзакции: запись в outbox таблицу того же соединения. Данные и событие атомарно сохраняются в БД.
После commit: отдельный процесс (relay) читает outbox, публикует в брокер, помечает как отправленное.
Если publisher упал — relay попробует снова. При корректной идемпотентности consumer’а — no data loss.
Подробный разбор — в главе Transactional Outbox. Здесь достаточно понимать, что domain events → integration events → Outbox → broker — стандартный поток надёжной доставки.
Как правильно: полный поток
class PlaceOrderCommandHandler:
def __init__(
self,
orders: OrderRepositoryPort,
outbox: OutboxPort,
uow: UnitOfWork,
) -> None:
self._orders = orders
self._outbox = outbox
self._uow = uow
async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
async with self._uow:
order = Order.place(cmd) # aggregate накапливает событие
await self._orders.save(order) # запись в orders table
events = order.pull_events()
for event in events:
await self._outbox.append(event) # запись в outbox table
# commit по выходу из with — обе таблицы атомарно
return order.id
Отдельный процесс:
class OutboxRelay:
async def run(self):
while True:
batch = await self._outbox.fetch_pending(limit=100)
for entry in batch:
integration_event = self._translate(entry.event)
await self._broker.publish(integration_event)
await self._outbox.mark_published(entry.id)
await asyncio.sleep(0.5)
Как не надо
1. События с настоящим временем
@dataclass
class PlaceOrder(DomainEvent): # !!
order_id: OrderId
customer_id: CustomerId
PlaceOrder — императив, приказ. Domain event должен описывать факт: OrderPlaced.
Правильно: OrderPlaced, OrderConfirmed, OrderCancelled.
2. Публикация из aggregate
class Order:
def __init__(self, ..., publisher):
self._publisher = publisher
def confirm(self):
self.status = 'confirmed'
self._publisher.publish(OrderConfirmed(...)) # !!
Aggregate знает про publisher. Правило зависимостей нарушено — domain импортирует infrastructure. Тесты требуют мокать publisher.
Правильно: aggregate накапливает через _events, application service забирает через pull_events().
3. Событие без полных данных
@dataclass
class OrderPlaced:
order_id: OrderId
Consumer получил OrderPlaced — что делать? Идёт в orders за деталями. Дополнительный запрос. При eventual consistency — данные могут ещё не докатиться, consumer получит None.
Правильно: событие содержит всё нужное. OrderPlaced(order_id, customer_id, total, items_count, placed_at).
4. Публикация до commit
async def execute(self, cmd):
order = Order.place(cmd)
await self._events.publish(order.pull_events()) # !!
await self._orders.save(order)
# если save упадёт — event ушёл впустую
Event ушёл, save упал. Consumer получил OrderPlaced, идёт в БД — заказа нет.
Правильно: сначала save, потом (в идеале через Outbox) publish.
5. Domain event напрямую в интеграционный брокер
class Order:
def confirm(self):
self.status = 'confirmed'
self._events.append(OrderConfirmed(...))
# elsewhere:
for event in order.pull_events():
await rabbitmq.publish(event, topic='orders') # !!
Domain event напрямую летит в брокер как integration event. Проблемы: меняете OrderConfirmed в domain — ломаете всех внешних consumer’ов. Схема события domain не должна быть контрактом между сервисами.
Правильно: отдельный OrderConfirmedV1 для integration, отдельный OrderConfirmed для domain. Трансляция через handler.
Trade-offs
| Ситуация | Domain events оправданы | Прямые вызовы |
|---|---|---|
| Одно бизнес-действие вызывает несколько побочных эффектов | ✓ | |
| Обработчики могут добавляться независимо | ✓ | |
| Нужен audit trail всех значимых изменений | ✓ | |
| Один use case, одна реакция, никогда не изменится | ✓ | |
| Простой CRUD без побочных эффектов | ✓ | |
| Prototype/MVP на 2 недели | ✓ |
В твоём же коде
Анонимизированный микросервис публикует StandardizedItem после сохранения — по сути, это integration event. Domain events как отдельная категория отсутствуют.
Что стоит понять:
StandardizedItem— integration event: летит в matcher-service.- Domain events (
ItemStandardized,StandardizationFailed) могли бы существовать в промежутке. Application service публиковал бы их сначала как domain events, затем handler’ы транслировали бы в integration events.
Пока сервис простой — прямая публикация в брокер приемлема. Но dual-write без Outbox — реальная проблема. См. Case study → dual-write-without-outbox.
Дальнейшее чтение
- Eric Evans. Domain-Driven Design. События появились в поздних дополнениях к книге и в DDD Reference.
- Vaughn Vernon. Implementing Domain-Driven Design. Chapter 8 — Domain Events.
- Martin Fowler. Domain Event. Классическое короткое определение.
- Greg Young. CQRS Documents. Про роль событий в CQRS + Event Sourcing.
- Chris Richardson. Microservices Patterns. Chapter 5 — Domain events + Outbox.
- Alberto Brandolini. EventStorming. Метод коллективного обнаружения domain events.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Как правильно называть domain event?
Q2. Кто должен публиковать domain event?
Q3. В чём разница Domain Event и Integration Event?
Q4. Что не так с публикацией event до save?
Q5. Что должно быть в OrderPlaced для integration event?