Уровень 6 · Продвинутые Глава 21 11 мин

Event Sourcing

Хранить события, не состояние. Rebuild aggregate on demand. Versioning событий, snapshots. Когда оправдан, когда — overkill.

TL;DR

  • Event Sourcing — состояние выводится из последовательности событий, не хранится напрямую.
  • Плюсы: полная история, audit trail, time travel, replay для новых read models.
  • Минусы: сложность, versioning событий, projections lag.
  • Оправдан для доменов с ценной историей (finance, insurance). Не для CRUD.

В чём радикальная идея

В обычном CRUD-сервисе БД хранит текущее состояние:

orders
| id | customer_id | total | status    | placed_at |
| 42 | 7           | 5000  | confirmed | 12:34     |

Event Sourcing переворачивает это:

event_store
| id | aggregate_id | event_type      | payload                              | occurred_at |
| 1  | order:42     | OrderPlaced     | {customer_id:7, items:[...]}         | 12:00       |
| 2  | order:42     | OrderConfirmed  | {}                                   | 12:10       |
| 3  | order:42     | ShippingUpdated | {tracking:"XYZ"}                     | 14:20       |

Состояние Order не хранится. Оно выводится из истории событий: Order = apply(OrderPlaced, OrderConfirmed, ShippingUpdated).

Определение Event Sourcing (источник — события)

Персистентность через хранение всех изменений домена как последовательности иммутабельных событий. Текущее состояние — производное от истории.

Что это даёт

    Полная история. Никаких updated_at без указания «что именно изменилось». Каждое изменение — событие с причиной и данными.

    Audit trail из коробки. Ответ на вопросы «кто, когда, почему» — прямое чтение event log.

    Time travel. Что было в 15:00 вчера? — apply события до этой отметки → получим состояние.

    Rebuild проекций. Появилась новая аналитика — прочитали все события с начала, построили новую read model. Без миграций, без backfill.

    Estimating counterfactuals. «Что было бы, если бы клиент не отменил заказ?» — можно воспроизвести альтернативную историю.

Что это стоит

    Сложность разработки. Каждое изменение — событие. Каждое событие нужно версионировать. Aggregate реконструируется из истории.

    Query — не через SQL. «Найти заказы на 5000 ₽» — нельзя SELECT * FROM orders WHERE total > 5000. Нужны отдельные read models (проекции) — обычно через CQRS.

    Migration событий = кошмар. Изменили формат OrderPlaced v1 → v2. Что делать со старыми? Upcast, dual-read, миграция event log.

    Eventual consistency между write и read. Наследие CQRS.

    Дебаг новичкам сложнее. «Почему у клиента баланс 100, а не 200?» — надо читать историю событий, а не смотреть на строку в БД.

Как правильно: базовая реализация

Aggregate

@dataclass
class Order:
    id: OrderId
    customer_id: CustomerId | None = None
    items: list[OrderItem] = field(default_factory=list)
    status: OrderStatus = OrderStatus.INITIAL
    version: int = 0
    _pending_events: list[DomainEvent] = field(default_factory=list)

    @classmethod
    def place(cls, order_id, customer_id, items) -> 'Order':
        order = cls(id=order_id)
        order._apply(OrderPlaced(order_id, customer_id, items))
        return order

    def confirm(self) -> None:
        if self.status is not OrderStatus.PLACED:
            raise DomainError(f'cannot confirm order in state {self.status}')
        self._apply(OrderConfirmed(self.id))

    def _apply(self, event: DomainEvent) -> None:                     
        """Мутирует состояние + сохраняет событие."""
        self._mutate(event)
        self._pending_events.append(event)
        self.version += 1

    def _mutate(self, event: DomainEvent) -> None:                    
        """Функциональная логика: как event изменяет состояние."""
        match event:
            case OrderPlaced(order_id=oid, customer_id=cid, items=its):
                self.id = oid
                self.customer_id = cid
                self.items = list(its)
                self.status = OrderStatus.PLACED
            case OrderConfirmed():
                self.status = OrderStatus.CONFIRMED

    @classmethod
    def from_history(cls, events: list[DomainEvent]) -> 'Order':      
        """Реконструкция из истории."""
        order = cls(id=OrderId(0))  # placeholder
        for event in events:
            order._mutate(event)
            order.version += 1
        return order

    def pull_events(self) -> list[DomainEvent]:
        events, self._pending_events = self._pending_events, []
        return events

Ключевые концепции:

  • _apply — вызывается при бизнес-операции: мутация + сохранение события.
  • _mutate — чистая логика применения события. Используется и при новых операциях, и при реконструкции.
  • from_history — восстанавливает aggregate из последовательности событий.

Event Store

class EventStorePort(Protocol):
    async def load(self, aggregate_id: str) -> list[DomainEvent]: ...

    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None: ...

expected_versionoptimistic concurrency. Пишем: «я думал, что версия была 5». Если сейчас в store уже 6 — конфликт, повторяем операцию.

Repository

class EventSourcedOrderRepository:
    def __init__(self, store: EventStorePort) -> None:
        self._store = store

    async def find_by_id(self, order_id: OrderId) -> Order | None:
        events = await self._store.load(f'order:{order_id}')
        return Order.from_history(events) if events else None

    async def save(self, order: Order) -> None:
        events = order.pull_events()
        if not events:
            return
        await self._store.append(
            aggregate_id=f'order:{order.id}',
            events=events,
            expected_version=order.version - len(events),
        )

Repository не хранит текущее состояние. Только читает события и сохраняет новые.

Snapshots

Aggregate с историей в 10k событий реконструируется медленно. Ограничение snapshot’ами.

Определение Snapshot (снапшот)

Периодическое сохранение полного состояния aggregate вместе с версией. Реконструкция: загрузить snapshot + применить события с этой версии.

async def find_by_id(self, order_id):
    snapshot = await self._store.load_snapshot(f'order:{order_id}')
    events = await self._store.load(
        f'order:{order_id}',
        from_version=snapshot.version if snapshot else 0,
    )

    order = Order.from_snapshot(snapshot) if snapshot else Order(id=order_id)
    for event in events:
        order._mutate(event)
        order.version += 1
    return order

Стратегии:

  • Каждые N событий.
  • При росте latency чтения выше порога.
  • По времени (раз в день).

Snapshot — оптимизация, а не архитектура. Всё равно события — единственный источник истины.

Versioning событий

Схема OrderPlaced v1 = {order_id, customer_id, items}. Через полгода нужно добавить promo_code. Что делать со старыми событиями?

Три подхода:

1. Upcasting

При загрузке из store — конвертируем старые версии в новую в памяти.

class OrderPlacedUpcaster:
    def upcast(self, raw: dict) -> OrderPlaced:
        if raw.get('_version', 1) == 1:
            raw['promo_code'] = None  # default для v1
        return OrderPlaced(**raw)

Плюс: event log не переписывается. Минус: код накапливает upcasters (v1 → v2 → v3 → v4).

2. Weak schema

События — dict’ы. Отсутствующие поля читаются с default.

Плюс: гибко. Минус: type safety теряется.

3. Migration event log

Один раз пробегаемся по всем событиям, переписываем в новый формат.

Плюс: чистый event log. Минус: миграция сложная, риск ошибок, не для больших объёмов.

Vernon и Young рекомендуют upcasting — единственный подход, который работает для больших live-систем.

CQRS + Event Sourcing

Event Sourcing даёт события. CQRS даёт разделение read/write. Часто вместе:

Command ──▶ Aggregate ──▶ Events ──▶ Event Store


                        Projections


                        Read Models ──▶ Queries
  • Write: команда → aggregate реконструируется → бизнес-операция → новые события.
  • Events → event store.
  • Отдельные проекции слушают события и строят read models.
  • Queries работают с read models.

Мощная связка. И сложная — не начинайте с неё без веских причин.

Как не надо

1. Event Sourcing для CRUD

# У нас 3 сущности, 5 endpoint'ов, простой домен
class UserCreated(DomainEvent): ...
class UserEmailUpdated(DomainEvent): ...
class UserPhoneUpdated(DomainEvent): ...
class UserDeleted(DomainEvent): ...

Для CRUD без бизнес-логики — event sourcing даёт complexity без выгоды. Обычный updated_at покроет audit нужды.

2. События с состоянием, а не изменением

class OrderState(DomainEvent):                              # !!
    order_id: OrderId
    customer_id: CustomerId
    items: list
    total: Decimal
    status: str

Каждый event — полный снимок текущего состояния. Это не event sourcing, это snapshot-oriented с overhead.

Правильно: события описывают что произошло. OrderPlaced, OrderConfirmed, PriceReduced — с минимальными delta-полями.

3. Read из event store

async def find_orders_over_5k():
    all_events = await event_store.load_all()               # !!  
    orders = {}
    for event in all_events:
        # реконструируем каждый Order
        ...
    return [o for o in orders.values() if o.total > 5000]

Читать все события для запроса — dead-slow. Event store не для queries.

Правильно: CQRS. Проекция строит read model orders_summary, query SELECT * FROM orders_summary WHERE total > 5000.

4. Хранение снапшотов вместо событий

async def save(order):
    await event_store.replace(f'order:{order.id}', order.to_dict())    # !!  

Перезаписываем состояние — но это уже не event sourcing. Просто key-value store.

5. Забыли про versioning с самого начала

# v1 (год назад)
class OrderPlaced:
    order_id: str
    customer_id: str

# v2 (сейчас) — как читать старые?
class OrderPlaced:
    order_id: str
    customer_id: str
    tenant_id: str        # !! новое обязательное поле  

Старые события не парсятся, aggregate не восстанавливается.

Правильно: никогда не удалять поля, все новые поля — optional с default. Upcasters для существенных изменений.

Trade-offs

СитуацияEvent Sourcing оправданОбычная персистентность
Ценная история изменений (finance, insurance, audit)
Нужен time travel / rebuild проекций
Домен с многими сложными состояниями
CRUD с 5 полями
Команда без опыта в ES
Прототип / MVP
Read-heavy без сложных инвариантов✓ (обычный CRUD + CDC для audit)

Правило большого пальца: если бизнес действительно спрашивает «покажи мне историю изменений» / «когда именно это случилось» / «что было бы, если бы» — задумайтесь про Event Sourcing. Если нет — обычный CRUD с audit-таблицей достаточен.

В твоём же коде

Event Sourcing для анонимизированного микросервиса — избыточен. Домен простой, история изменений не самоценна, aggregate практически нет.

Если бы сервис вырос до Order Management с богатой историей (approve → hold → release → priceReview → …), ES стал бы вариантом. Пока — обычный persistence достаточен.

Что стоит взять из ES без принятия его целиком:

  • Domain events как объекты первого класса (уже частично есть).
  • Outbox для доставки — по сути маленький event log.
  • Идея «событие описывает что произошло» вместо «UPDATE this field».

Дальнейшее чтение

  • Greg Young. Event Sourcing. Доклад «8 lines of code». Ясное объяснение.
  • Martin Fowler. Event Sourcing. 2005. Ранняя формулировка.
  • Vaughn Vernon. Implementing Domain-Driven Design. Chapter 8-A — Event Sourcing.
  • Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — event logs, streams, replay.
  • Greg Young. Versioning in an Event Sourced System. Compact e-book на leanpub — как жить с versioning.
  • EventStoreDB. Специализированная БД для event sourcing.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Event Sourcing — это...

  2. Q2. Что даёт Event Sourcing?

  3. Q3. Snapshot в Event Sourcing — это...

  4. Q4. Как правильно обращаться с versioning событий?

  5. Q5. Когда Event Sourcing НЕ стоит применять?