CQRS
Command Query Responsibility Segregation. Разные модели для чтения и записи. Когда оправдан, когда — overengineering. Соотношение с Event Sourcing.
TL;DR
- CQRS = разделение моделей чтения и записи. Не обязательно два хранилища.
- Исток: Command-Query Separation Мейера (1988), формализовал Greg Young в 2010.
- Оправдан при разной нагрузке или разных требованиях к формату read/write.
- CQRS ≠ Event Sourcing. Часто вместе, но независимы.
От CQS к CQRS
Идея восходит к Бертрану Мейеру и Command-Query Separation (1988):
Every method should either be a command that performs an action, or a query that returns data to the caller, but not both.
Метод либо меняет состояние (command, возвращает void), либо возвращает данные без побочных эффектов (query). Не одновременно.
CQRS — тот же принцип, но на уровне всей архитектуры:
Разделение модели записи (write model) и модели чтения (read model). Разные объекты, разные API, потенциально разные хранилища.
Greg Young формализовал в 2010: CQRS Documents.
Что даёт разделение
Классический CRUD: одна модель User используется и для API POST /users, и для GET /users, и для GET /users?filter=active. Одна форма данных, один Repository, одна БД.
Проблемы:
- Write model оптимизирована для инвариантов (aggregate, normalized).
- Read model нужны денормализованные view для UI (join двух-трёх таблиц каждый раз).
- Одна модель = компромисс, никому не удобно.
CQRS:
┌──────────────────────┐
POST /order ───▶│ Command Handler │
│ (Order aggregate) │
│ → writes to DB │
└──────────┬───────────┘
│ event
▼
┌──────────────────────┐
│ Projection updater │
│ denormalizes to │
│ read model │
└──────────┬───────────┘
│
GET /orders ◀──────────────┘
(read denormalized)
Write и read — разные модели, разные API, часто разные хранилища.
Как правильно: базовый CQRS
Команды и запросы — разные объекты:
@dataclass(frozen=True)
class PlaceOrderCommand:
customer_id: CustomerId
items: list[OrderItemInput]
class PlaceOrderHandler:
def __init__(self, orders: OrderRepositoryPort, uow: UnitOfWork) -> None:
self._orders = orders
self._uow = uow
async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
async with self._uow:
order = Order.place(cmd)
await self._orders.save(order)
return order.id
@dataclass(frozen=True)
class ListOrdersForCustomerQuery:
customer_id: CustomerId
limit: int = 20
@dataclass(frozen=True)
class OrderSummary:
"""Read model: денормализованная view для UI."""
id: str
total: str
status: str
items_count: int
customer_name: str
placed_at: str
class ListOrdersForCustomerHandler:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def execute(self, query: ListOrdersForCustomerQuery) -> list[OrderSummary]:
# Прямой SQL, без aggregate — read model
result = await self._session.execute(
text("""
SELECT
o.id, o.total, o.status,
COUNT(l.id) AS items_count,
c.name AS customer_name,
o.placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_lines l ON l.order_id = o.id
WHERE o.customer_id = :customer_id
GROUP BY o.id, c.name
ORDER BY o.placed_at DESC
LIMIT :limit
"""),
{'customer_id': query.customer_id, 'limit': query.limit},
)
return [OrderSummary(**row._asdict()) for row in result]
Ключевые различия:
- Command handler работает с aggregate, репозиторием, доменными правилами.
- Query handler работает с raw SQL / view / read model — никакого aggregate.
Это уже CQRS. Одна БД, разные модели.
Sync vs Async CQRS
Два варианта уровнем выше:
Sync CQRS (одна БД)
- Write и read используют одну БД.
- Read — через SQL views, materialized views, direct queries.
- Consistency — сильная (write commit → read виден сразу).
- Overhead — минимальный.
Sync CQRS — самая частая реализация. Достаточная для большинства проектов.
Async CQRS (разные хранилища)
- Write — в OLTP БД (Postgres).
- Read — в оптимизированном хранилище (Elasticsearch, Redis, отдельная denormalized таблица).
- Синхронизация через события: write publishes
OrderPlaced→ projection updater обновляет read model. - Consistency — eventual.
┌────────────────┐ ┌────────────────┐
│ Postgres │ │ Elasticsearch │
│ (write) │ │ (read) │
└────────┬───────┘ └────────▲───────┘
│ │
│ event │ projection
└──────────▶ Broker ───┘
Async CQRS — для сценариев с сильной асимметрией нагрузки (много reads на 1 write) или разными форматами.
Проекция (Projection)
Компонент, который слушает события и обновляет read model. Отвечает за преобразование write-модели в read-модель.
class OrderSummaryProjection:
"""Обновляет order_summary при изменениях Order."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def on_order_placed(self, event: OrderPlacedEvent) -> None:
summary = OrderSummaryRow(
id=event.order_id,
customer_id=event.customer_id,
total=event.total,
status='placed',
items_count=len(event.items),
placed_at=event.occurred_at,
)
await self._session.merge(summary)
async def on_order_confirmed(self, event: OrderConfirmedEvent) -> None:
await self._session.execute(
update(OrderSummaryRow)
.where(OrderSummaryRow.id == event.order_id)
.values(status='confirmed', confirmed_at=event.occurred_at)
)
async def on_order_cancelled(self, event: OrderCancelledEvent) -> None:
await self._session.execute(
update(OrderSummaryRow)
.where(OrderSummaryRow.id == event.order_id)
.values(status='cancelled', cancel_reason=event.reason)
)
Одна проекция — одна read model. При добавлении новой view — новая проекция. Read models дешёвые: можно строить много, любого формата.
Идемпотентность проекций
Проекция обрабатывает event OrderPlaced дважды (retry, replay). Что произойдёт?
Если проекция использует INSERT — второй раз бросит duplicate key. Если UPSERT (merge, ON CONFLICT DO UPDATE) — просто перезапишет теми же данными.
Правило: проекции всегда идемпотентны. Обычно через UPSERT или позиционный маркер (последний обработанный event ID).
Rebuild projections
Одна из мощнейших фич CQRS: можно перестроить read model с нуля.
Сценарий: новая колонка в read model, нужно заполнить историческими данными.
class RebuildOrderSummaryProjection:
async def run(self) -> None:
# 1. Очищаем read model
await self._session.execute(delete(OrderSummaryRow))
# 2. Читаем ВСЕ события с начала (event log из Kafka или outbox)
async for event in self._events.read_from(topic='orders', from_beginning=True):
handler = self._get_handler(type(event))
if handler:
await handler(event)
await self._session.commit()
Работает, если события — единственный источник истины (event log). Даёт возможность полностью пересобирать read models — исправлять баги, добавлять новые фичи ретроспективно.
Здесь CQRS начинает пересекаться с Event Sourcing — про это следующая глава.
Как не надо
1. CQRS для CRUD
# API из 5 endpoint'ов, простой CRUD
class CreateUserCommand: ...
class UpdateUserCommand: ...
class DeleteUserCommand: ...
class ListUsersQuery: ...
class GetUserByIdQuery: ...
class CreateUserHandler: ...
class UpdateUserHandler: ...
# 5 handler'ов, 5 команд, 5 query — вместо простой CRUD-реализации
CQRS overengineered для базового CRUD. Разделение не приносит выгоды, а стоимость boilerplate — реальная.
Правильно: обычный REST + Repository для простых случаев. CQRS — когда есть реальная асимметрия между read и write.
2. Async CQRS без явного контракта consistency
async def place_order(cmd):
await command_bus.send(PlaceOrderCommand(...))
# клиент получает 200 OK
# но проекция ещё не обновилась
orders = await query_bus.send(ListOrdersQuery(...)) # заказа нет!
Клиент видит «заказ создан», сразу читает список — заказа нет. Замешательство.
Правильно: либо синхронное обновление проекции внутри той же транзакции, либо явное «на клиенте показываем оптимистично».
3. Command handler возвращает read data
class PlaceOrderCommand:
...
class PlaceOrderHandler:
async def execute(self, cmd) -> OrderDetailsDTO: # !!
order = Order.place(cmd)
await self._orders.save(order)
return OrderDetailsDTO.from_order(order) # !! command возвращает read data
Command handler возвращает view. Смешение — за command’ой должна идти отдельная query.
Правильно: command возвращает только OrderId (или void). Read — отдельным вызовом.
Компромисс: если UI требует данные сразу — вернуть минимум (id, чтобы клиент мог сделать read). Или использовать Task-based UI: команды пишут, показываем «выполнено», отдельно refresh view.
4. Одна БД, но overengineering в разделении
class UserCommandHandler:
def __init__(self, write_db: WriteDB): ...
class UserQueryHandler:
def __init__(self, read_db: ReadDB): ... # !! одна БД, но два wrapper'а
Абстракция «write DB / read DB», хотя фактически один Postgres. Overengineering.
Правильно: разделяйте когда БД реально разные. Одна БД → один AsyncSession, разные handler’ы.
5. Проекция без идемпотентности
async def on_order_placed(event):
self._session.add(OrderSummaryRow(id=event.order_id, ...)) # !! duplicate key на retry
await self._session.commit()
Retry → IntegrityError → проекция остановилась.
Правильно: merge или ON CONFLICT DO UPDATE.
Trade-offs
| Ситуация | CQRS оправдан | Обычный CRUD |
|---|---|---|
| Разная нагрузка read vs write (100:1) | ✓ | |
| Разные форматы (write nested, read flat) | ✓ | |
| Много разных view над одними данными | ✓ | |
| Простой CRUD с одним view | ✓ | |
| Прототип | ✓ | |
| Небольшая команда, простой домен | ✓ (Layered) |
Между «полный CRUD» и «полный async CQRS» — множество промежуточных решений. Начинайте с sync CQRS (одна БД, разные handler’ы) — этого хватает 90% случаев.
В твоём же коде
Анонимизированный микросервис не использует CQRS явно, но фактически близок:
- Write path: RabbitMQ → use case → PostgreSQL.
- Read path — нет. Сервис ничего не отдаёт наружу, только пишет.
Если бы появился REST-эндпоинт GET /items/{id}/standardization, разумно было бы:
- Command side: оставить как есть.
- Query side: отдельный handler с прямым SQL, без вовлечения
Itemaggregate.
Полноценный async CQRS с отдельным read store здесь избыточен — нет ассиметрии нагрузки.
Дальнейшее чтение
- Greg Young. CQRS Documents. 2010. Первоисточник.
- Bertrand Meyer. Object-Oriented Software Construction (2nd ed). 1997. Command-Query Separation.
- Martin Fowler. CQRS. Компактное определение с трезвыми оговорками.
- Vaughn Vernon. Implementing Domain-Driven Design. Chapter 4. CQRS в связке с DDD.
- cqrs.wordpress.com/documents/task-based-ui/. Task-based UI как естественный consumer CQRS.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. CQRS — это в первую очередь про...
Q2. Основной корень идеи CQRS — это...
Q3. Когда async CQRS с отдельным read store оправдан?
Q4. Свойства проекции (projection):
Q5. Command handler правильно возвращает...