Event Sourcing
Хранить события, не состояние. Rebuild aggregate on demand. Versioning событий, snapshots. Когда оправдан, когда — overkill.
TL;DR
- Event Sourcing — состояние выводится из последовательности событий, не хранится напрямую.
- Плюсы: полная история, audit trail, time travel, replay для новых read models.
- Минусы: сложность, versioning событий, projections lag.
- Оправдан для доменов с ценной историей (finance, insurance). Не для CRUD.
В чём радикальная идея
В обычном CRUD-сервисе БД хранит текущее состояние:
orders
| id | customer_id | total | status | placed_at |
| 42 | 7 | 5000 | confirmed | 12:34 |
Event Sourcing переворачивает это:
event_store
| id | aggregate_id | event_type | payload | occurred_at |
| 1 | order:42 | OrderPlaced | {customer_id:7, items:[...]} | 12:00 |
| 2 | order:42 | OrderConfirmed | {} | 12:10 |
| 3 | order:42 | ShippingUpdated | {tracking:"XYZ"} | 14:20 |
Состояние Order не хранится. Оно выводится из истории событий: Order = apply(OrderPlaced, OrderConfirmed, ShippingUpdated).
Персистентность через хранение всех изменений домена как последовательности иммутабельных событий. Текущее состояние — производное от истории.
Что это даёт
Полная история. Никаких updated_at без указания «что именно изменилось». Каждое изменение — событие с причиной и данными.
Audit trail из коробки. Ответ на вопросы «кто, когда, почему» — прямое чтение event log.
Time travel. Что было в 15:00 вчера? — apply события до этой отметки → получим состояние.
Rebuild проекций. Появилась новая аналитика — прочитали все события с начала, построили новую read model. Без миграций, без backfill.
Estimating counterfactuals. «Что было бы, если бы клиент не отменил заказ?» — можно воспроизвести альтернативную историю.
Что это стоит
Сложность разработки. Каждое изменение — событие. Каждое событие нужно версионировать. Aggregate реконструируется из истории.
Query — не через SQL. «Найти заказы на 5000 ₽» — нельзя SELECT * FROM orders WHERE total > 5000. Нужны отдельные read models (проекции) — обычно через CQRS.
Migration событий = кошмар. Изменили формат OrderPlaced v1 → v2. Что делать со старыми? Upcast, dual-read, миграция event log.
Eventual consistency между write и read. Наследие CQRS.
Дебаг новичкам сложнее. «Почему у клиента баланс 100, а не 200?» — надо читать историю событий, а не смотреть на строку в БД.
Как правильно: базовая реализация
Aggregate
@dataclass
class Order:
id: OrderId
customer_id: CustomerId | None = None
items: list[OrderItem] = field(default_factory=list)
status: OrderStatus = OrderStatus.INITIAL
version: int = 0
_pending_events: list[DomainEvent] = field(default_factory=list)
@classmethod
def place(cls, order_id, customer_id, items) -> 'Order':
order = cls(id=order_id)
order._apply(OrderPlaced(order_id, customer_id, items))
return order
def confirm(self) -> None:
if self.status is not OrderStatus.PLACED:
raise DomainError(f'cannot confirm order in state {self.status}')
self._apply(OrderConfirmed(self.id))
def _apply(self, event: DomainEvent) -> None:
"""Мутирует состояние + сохраняет событие."""
self._mutate(event)
self._pending_events.append(event)
self.version += 1
def _mutate(self, event: DomainEvent) -> None:
"""Функциональная логика: как event изменяет состояние."""
match event:
case OrderPlaced(order_id=oid, customer_id=cid, items=its):
self.id = oid
self.customer_id = cid
self.items = list(its)
self.status = OrderStatus.PLACED
case OrderConfirmed():
self.status = OrderStatus.CONFIRMED
@classmethod
def from_history(cls, events: list[DomainEvent]) -> 'Order':
"""Реконструкция из истории."""
order = cls(id=OrderId(0)) # placeholder
for event in events:
order._mutate(event)
order.version += 1
return order
def pull_events(self) -> list[DomainEvent]:
events, self._pending_events = self._pending_events, []
return events
Ключевые концепции:
_apply— вызывается при бизнес-операции: мутация + сохранение события._mutate— чистая логика применения события. Используется и при новых операциях, и при реконструкции.from_history— восстанавливает aggregate из последовательности событий.
Event Store
class EventStorePort(Protocol):
async def load(self, aggregate_id: str) -> list[DomainEvent]: ...
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None: ...
expected_version — optimistic concurrency. Пишем: «я думал, что версия была 5». Если сейчас в store уже 6 — конфликт, повторяем операцию.
Repository
class EventSourcedOrderRepository:
def __init__(self, store: EventStorePort) -> None:
self._store = store
async def find_by_id(self, order_id: OrderId) -> Order | None:
events = await self._store.load(f'order:{order_id}')
return Order.from_history(events) if events else None
async def save(self, order: Order) -> None:
events = order.pull_events()
if not events:
return
await self._store.append(
aggregate_id=f'order:{order.id}',
events=events,
expected_version=order.version - len(events),
)
Repository не хранит текущее состояние. Только читает события и сохраняет новые.
Snapshots
Aggregate с историей в 10k событий реконструируется медленно. Ограничение snapshot’ами.
Периодическое сохранение полного состояния aggregate вместе с версией. Реконструкция: загрузить snapshot + применить события с этой версии.
async def find_by_id(self, order_id):
snapshot = await self._store.load_snapshot(f'order:{order_id}')
events = await self._store.load(
f'order:{order_id}',
from_version=snapshot.version if snapshot else 0,
)
order = Order.from_snapshot(snapshot) if snapshot else Order(id=order_id)
for event in events:
order._mutate(event)
order.version += 1
return order
Стратегии:
- Каждые N событий.
- При росте latency чтения выше порога.
- По времени (раз в день).
Snapshot — оптимизация, а не архитектура. Всё равно события — единственный источник истины.
Versioning событий
Схема OrderPlaced v1 = {order_id, customer_id, items}. Через полгода нужно добавить promo_code. Что делать со старыми событиями?
Три подхода:
1. Upcasting
При загрузке из store — конвертируем старые версии в новую в памяти.
class OrderPlacedUpcaster:
def upcast(self, raw: dict) -> OrderPlaced:
if raw.get('_version', 1) == 1:
raw['promo_code'] = None # default для v1
return OrderPlaced(**raw)
Плюс: event log не переписывается. Минус: код накапливает upcasters (v1 → v2 → v3 → v4).
2. Weak schema
События — dict’ы. Отсутствующие поля читаются с default.
Плюс: гибко. Минус: type safety теряется.
3. Migration event log
Один раз пробегаемся по всем событиям, переписываем в новый формат.
Плюс: чистый event log. Минус: миграция сложная, риск ошибок, не для больших объёмов.
Vernon и Young рекомендуют upcasting — единственный подход, который работает для больших live-систем.
CQRS + Event Sourcing
Event Sourcing даёт события. CQRS даёт разделение read/write. Часто вместе:
Command ──▶ Aggregate ──▶ Events ──▶ Event Store
│
▼
Projections
│
▼
Read Models ──▶ Queries
- Write: команда → aggregate реконструируется → бизнес-операция → новые события.
- Events → event store.
- Отдельные проекции слушают события и строят read models.
- Queries работают с read models.
Мощная связка. И сложная — не начинайте с неё без веских причин.
Как не надо
1. Event Sourcing для CRUD
# У нас 3 сущности, 5 endpoint'ов, простой домен
class UserCreated(DomainEvent): ...
class UserEmailUpdated(DomainEvent): ...
class UserPhoneUpdated(DomainEvent): ...
class UserDeleted(DomainEvent): ...
Для CRUD без бизнес-логики — event sourcing даёт complexity без выгоды. Обычный updated_at покроет audit нужды.
2. События с состоянием, а не изменением
class OrderState(DomainEvent): # !!
order_id: OrderId
customer_id: CustomerId
items: list
total: Decimal
status: str
Каждый event — полный снимок текущего состояния. Это не event sourcing, это snapshot-oriented с overhead.
Правильно: события описывают что произошло. OrderPlaced, OrderConfirmed, PriceReduced — с минимальными delta-полями.
3. Read из event store
async def find_orders_over_5k():
all_events = await event_store.load_all() # !!
orders = {}
for event in all_events:
# реконструируем каждый Order
...
return [o for o in orders.values() if o.total > 5000]
Читать все события для запроса — dead-slow. Event store не для queries.
Правильно: CQRS. Проекция строит read model orders_summary, query SELECT * FROM orders_summary WHERE total > 5000.
4. Хранение снапшотов вместо событий
async def save(order):
await event_store.replace(f'order:{order.id}', order.to_dict()) # !!
Перезаписываем состояние — но это уже не event sourcing. Просто key-value store.
5. Забыли про versioning с самого начала
# v1 (год назад)
class OrderPlaced:
order_id: str
customer_id: str
# v2 (сейчас) — как читать старые?
class OrderPlaced:
order_id: str
customer_id: str
tenant_id: str # !! новое обязательное поле
Старые события не парсятся, aggregate не восстанавливается.
Правильно: никогда не удалять поля, все новые поля — optional с default. Upcasters для существенных изменений.
Trade-offs
| Ситуация | Event Sourcing оправдан | Обычная персистентность |
|---|---|---|
| Ценная история изменений (finance, insurance, audit) | ✓ | |
| Нужен time travel / rebuild проекций | ✓ | |
| Домен с многими сложными состояниями | ✓ | |
| CRUD с 5 полями | ✓ | |
| Команда без опыта в ES | ✓ | |
| Прототип / MVP | ✓ | |
| Read-heavy без сложных инвариантов | ✓ (обычный CRUD + CDC для audit) |
Правило большого пальца: если бизнес действительно спрашивает «покажи мне историю изменений» / «когда именно это случилось» / «что было бы, если бы» — задумайтесь про Event Sourcing. Если нет — обычный CRUD с audit-таблицей достаточен.
В твоём же коде
Event Sourcing для анонимизированного микросервиса — избыточен. Домен простой, история изменений не самоценна, aggregate практически нет.
Если бы сервис вырос до Order Management с богатой историей (approve → hold → release → priceReview → …), ES стал бы вариантом. Пока — обычный persistence достаточен.
Что стоит взять из ES без принятия его целиком:
- Domain events как объекты первого класса (уже частично есть).
- Outbox для доставки — по сути маленький event log.
- Идея «событие описывает что произошло» вместо «UPDATE this field».
Дальнейшее чтение
- Greg Young. Event Sourcing. Доклад «8 lines of code». Ясное объяснение.
- Martin Fowler. Event Sourcing. 2005. Ранняя формулировка.
- Vaughn Vernon. Implementing Domain-Driven Design. Chapter 8-A — Event Sourcing.
- Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — event logs, streams, replay.
- Greg Young. Versioning in an Event Sourced System. Compact e-book на leanpub — как жить с versioning.
- EventStoreDB. Специализированная БД для event sourcing.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Event Sourcing — это...
Q2. Что даёт Event Sourcing?
Q3. Snapshot в Event Sourcing — это...
Q4. Как правильно обращаться с versioning событий?
Q5. Когда Event Sourcing НЕ стоит применять?