Уровень 6 · Продвинутые Глава 20 10 мин

CQRS

Command Query Responsibility Segregation. Разные модели для чтения и записи. Когда оправдан, когда — overengineering. Соотношение с Event Sourcing.

TL;DR

  • CQRS = разделение моделей чтения и записи. Не обязательно два хранилища.
  • Исток: Command-Query Separation Мейера (1988), формализовал Greg Young в 2010.
  • Оправдан при разной нагрузке или разных требованиях к формату read/write.
  • CQRS ≠ Event Sourcing. Часто вместе, но независимы.

От CQS к CQRS

Идея восходит к Бертрану Мейеру и Command-Query Separation (1988):

Every method should either be a command that performs an action, or a query that returns data to the caller, but not both.

Bertrand Meyer Object-Oriented Software Construction, 1988

Метод либо меняет состояние (command, возвращает void), либо возвращает данные без побочных эффектов (query). Не одновременно.

CQRS — тот же принцип, но на уровне всей архитектуры:

Определение CQRS (Command Query Responsibility Segregation) (разделение ответственностей команд и запросов)

Разделение модели записи (write model) и модели чтения (read model). Разные объекты, разные API, потенциально разные хранилища.

Greg Young формализовал в 2010: CQRS Documents.

Что даёт разделение

Классический CRUD: одна модель User используется и для API POST /users, и для GET /users, и для GET /users?filter=active. Одна форма данных, один Repository, одна БД.

Проблемы:

  • Write model оптимизирована для инвариантов (aggregate, normalized).
  • Read model нужны денормализованные view для UI (join двух-трёх таблиц каждый раз).
  • Одна модель = компромисс, никому не удобно.

CQRS:

                   ┌──────────────────────┐
   POST /order ───▶│  Command Handler     │
                   │  (Order aggregate)   │
                   │  → writes to DB      │
                   └──────────┬───────────┘
                              │ event

                   ┌──────────────────────┐
                   │  Projection updater  │
                   │  denormalizes to     │
                   │  read model          │
                   └──────────┬───────────┘

   GET /orders ◀──────────────┘
   (read denormalized)

Write и read — разные модели, разные API, часто разные хранилища.

Как правильно: базовый CQRS

Команды и запросы — разные объекты:

@dataclass(frozen=True)
class PlaceOrderCommand:
    customer_id: CustomerId
    items: list[OrderItemInput]

class PlaceOrderHandler:
    def __init__(self, orders: OrderRepositoryPort, uow: UnitOfWork) -> None:
        self._orders = orders
        self._uow = uow

    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._uow:
            order = Order.place(cmd)
            await self._orders.save(order)
            return order.id
@dataclass(frozen=True)
class ListOrdersForCustomerQuery:
    customer_id: CustomerId
    limit: int = 20

@dataclass(frozen=True)
class OrderSummary:
    """Read model: денормализованная view для UI."""
    id: str
    total: str
    status: str
    items_count: int
    customer_name: str
    placed_at: str

class ListOrdersForCustomerHandler:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def execute(self, query: ListOrdersForCustomerQuery) -> list[OrderSummary]:
        # Прямой SQL, без aggregate — read model
        result = await self._session.execute(
            text("""
                SELECT
                    o.id, o.total, o.status,
                    COUNT(l.id) AS items_count,
                    c.name AS customer_name,
                    o.placed_at
                FROM orders o
                JOIN customers c ON c.id = o.customer_id
                LEFT JOIN order_lines l ON l.order_id = o.id
                WHERE o.customer_id = :customer_id
                GROUP BY o.id, c.name
                ORDER BY o.placed_at DESC
                LIMIT :limit
            """),
            {'customer_id': query.customer_id, 'limit': query.limit},
        )
        return [OrderSummary(**row._asdict()) for row in result]

Ключевые различия:

  • Command handler работает с aggregate, репозиторием, доменными правилами.
  • Query handler работает с raw SQL / view / read model — никакого aggregate.

Это уже CQRS. Одна БД, разные модели.

Sync vs Async CQRS

Два варианта уровнем выше:

Sync CQRS (одна БД)

  • Write и read используют одну БД.
  • Read — через SQL views, materialized views, direct queries.
  • Consistency — сильная (write commit → read виден сразу).
  • Overhead — минимальный.

Sync CQRS — самая частая реализация. Достаточная для большинства проектов.

Async CQRS (разные хранилища)

  • Write — в OLTP БД (Postgres).
  • Read — в оптимизированном хранилище (Elasticsearch, Redis, отдельная denormalized таблица).
  • Синхронизация через события: write publishes OrderPlaced → projection updater обновляет read model.
  • Consistency — eventual.
┌────────────────┐     ┌────────────────┐
│  Postgres      │     │  Elasticsearch │
│  (write)       │     │  (read)        │
└────────┬───────┘     └────────▲───────┘
         │                      │
         │ event                │ projection
         └──────────▶ Broker ───┘

Async CQRS — для сценариев с сильной асимметрией нагрузки (много reads на 1 write) или разными форматами.

Проекция (Projection)

Определение Projection (проекция)

Компонент, который слушает события и обновляет read model. Отвечает за преобразование write-модели в read-модель.

class OrderSummaryProjection:
    """Обновляет order_summary при изменениях Order."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def on_order_placed(self, event: OrderPlacedEvent) -> None:
        summary = OrderSummaryRow(
            id=event.order_id,
            customer_id=event.customer_id,
            total=event.total,
            status='placed',
            items_count=len(event.items),
            placed_at=event.occurred_at,
        )
        await self._session.merge(summary)

    async def on_order_confirmed(self, event: OrderConfirmedEvent) -> None:
        await self._session.execute(
            update(OrderSummaryRow)
            .where(OrderSummaryRow.id == event.order_id)
            .values(status='confirmed', confirmed_at=event.occurred_at)
        )

    async def on_order_cancelled(self, event: OrderCancelledEvent) -> None:
        await self._session.execute(
            update(OrderSummaryRow)
            .where(OrderSummaryRow.id == event.order_id)
            .values(status='cancelled', cancel_reason=event.reason)
        )

Одна проекция — одна read model. При добавлении новой view — новая проекция. Read models дешёвые: можно строить много, любого формата.

Идемпотентность проекций

Проекция обрабатывает event OrderPlaced дважды (retry, replay). Что произойдёт?

Если проекция использует INSERT — второй раз бросит duplicate key. Если UPSERT (merge, ON CONFLICT DO UPDATE) — просто перезапишет теми же данными.

Правило: проекции всегда идемпотентны. Обычно через UPSERT или позиционный маркер (последний обработанный event ID).

Rebuild projections

Одна из мощнейших фич CQRS: можно перестроить read model с нуля.

Сценарий: новая колонка в read model, нужно заполнить историческими данными.

class RebuildOrderSummaryProjection:
    async def run(self) -> None:
        # 1. Очищаем read model
        await self._session.execute(delete(OrderSummaryRow))

        # 2. Читаем ВСЕ события с начала (event log из Kafka или outbox)
        async for event in self._events.read_from(topic='orders', from_beginning=True):
            handler = self._get_handler(type(event))
            if handler:
                await handler(event)

        await self._session.commit()

Работает, если события — единственный источник истины (event log). Даёт возможность полностью пересобирать read models — исправлять баги, добавлять новые фичи ретроспективно.

Здесь CQRS начинает пересекаться с Event Sourcing — про это следующая глава.

Как не надо

1. CQRS для CRUD

# API из 5 endpoint'ов, простой CRUD
class CreateUserCommand: ...
class UpdateUserCommand: ...
class DeleteUserCommand: ...
class ListUsersQuery: ...
class GetUserByIdQuery: ...

class CreateUserHandler: ...
class UpdateUserHandler: ...
# 5 handler'ов, 5 команд, 5 query — вместо простой CRUD-реализации

CQRS overengineered для базового CRUD. Разделение не приносит выгоды, а стоимость boilerplate — реальная.

Правильно: обычный REST + Repository для простых случаев. CQRS — когда есть реальная асимметрия между read и write.

2. Async CQRS без явного контракта consistency

async def place_order(cmd):
    await command_bus.send(PlaceOrderCommand(...))
    # клиент получает 200 OK
    # но проекция ещё не обновилась
    orders = await query_bus.send(ListOrdersQuery(...))  # заказа нет!   

Клиент видит «заказ создан», сразу читает список — заказа нет. Замешательство.

Правильно: либо синхронное обновление проекции внутри той же транзакции, либо явное «на клиенте показываем оптимистично».

3. Command handler возвращает read data

class PlaceOrderCommand:
    ...

class PlaceOrderHandler:
    async def execute(self, cmd) -> OrderDetailsDTO:                 # !!  
        order = Order.place(cmd)
        await self._orders.save(order)
        return OrderDetailsDTO.from_order(order)  # !! command возвращает read data   

Command handler возвращает view. Смешение — за command’ой должна идти отдельная query.

Правильно: command возвращает только OrderId (или void). Read — отдельным вызовом.

Компромисс: если UI требует данные сразу — вернуть минимум (id, чтобы клиент мог сделать read). Или использовать Task-based UI: команды пишут, показываем «выполнено», отдельно refresh view.

4. Одна БД, но overengineering в разделении

class UserCommandHandler:
    def __init__(self, write_db: WriteDB): ...

class UserQueryHandler:
    def __init__(self, read_db: ReadDB): ...    # !! одна БД, но два wrapper'а  

Абстракция «write DB / read DB», хотя фактически один Postgres. Overengineering.

Правильно: разделяйте когда БД реально разные. Одна БД → один AsyncSession, разные handler’ы.

5. Проекция без идемпотентности

async def on_order_placed(event):
    self._session.add(OrderSummaryRow(id=event.order_id, ...))    # !! duplicate key на retry  
    await self._session.commit()

Retry → IntegrityError → проекция остановилась.

Правильно: merge или ON CONFLICT DO UPDATE.

Trade-offs

СитуацияCQRS оправданОбычный CRUD
Разная нагрузка read vs write (100:1)
Разные форматы (write nested, read flat)
Много разных view над одними данными
Простой CRUD с одним view
Прототип
Небольшая команда, простой домен✓ (Layered)

Между «полный CRUD» и «полный async CQRS» — множество промежуточных решений. Начинайте с sync CQRS (одна БД, разные handler’ы) — этого хватает 90% случаев.

В твоём же коде

Анонимизированный микросервис не использует CQRS явно, но фактически близок:

  • Write path: RabbitMQ → use case → PostgreSQL.
  • Read path — нет. Сервис ничего не отдаёт наружу, только пишет.

Если бы появился REST-эндпоинт GET /items/{id}/standardization, разумно было бы:

  • Command side: оставить как есть.
  • Query side: отдельный handler с прямым SQL, без вовлечения Item aggregate.

Полноценный async CQRS с отдельным read store здесь избыточен — нет ассиметрии нагрузки.

Дальнейшее чтение

  • Greg Young. CQRS Documents. 2010. Первоисточник.
  • Bertrand Meyer. Object-Oriented Software Construction (2nd ed). 1997. Command-Query Separation.
  • Martin Fowler. CQRS. Компактное определение с трезвыми оговорками.
  • Vaughn Vernon. Implementing Domain-Driven Design. Chapter 4. CQRS в связке с DDD.
  • cqrs.wordpress.com/documents/task-based-ui/. Task-based UI как естественный consumer CQRS.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. CQRS — это в первую очередь про...

  2. Q2. Основной корень идеи CQRS — это...

  3. Q3. Когда async CQRS с отдельным read store оправдан?

  4. Q4. Свойства проекции (projection):

  5. Q5. Command handler правильно возвращает...