Read models и projections
Read side в CQRS: денормализованные модели, проекции, eventual consistency. Materialized views как простой старт.
TL;DR
- Read model не обязан соответствовать write model — оптимизирован под запрос.
- Projections — подписчики на события, строят и обновляют собственный view.
- Materialized views в БД — простой способ начать без полного CQRS.
- Eventual consistency нужно явно проговаривать с бизнесом.
Что такое Read Model
Структура данных, оптимизированная под конкретный запрос UI или API. Обычно денормализованная. Может жить в отдельной таблице, view, БД или индексе.
Обычная нормализованная схема:
orders (id, customer_id, total, status, placed_at)
order_lines (id, order_id, product_id, quantity, unit_price)
customers (id, name, email)
products (id, name, category)
Запрос «покажи мне все заказы с деталями» — 3 JOIN. Один заказ на 20 items — 20+ строк на выход. UI получает данные, склеивает в JavaScript. При росте нагрузки — падает.
Read model для того же:
order_summary (
order_id PRIMARY KEY,
customer_name TEXT, -- уже присоединено
total NUMERIC,
status TEXT,
items_count INTEGER, -- уже посчитано
first_product TEXT, -- для превью
placed_at TIMESTAMPTZ
)
Один SELECT без JOIN. Данные в готовом виде.
Проекции обновляют read models
Процесс, слушающий события write model и обновляющий read model. Однонаправленное преобразование.
class OrderSummaryProjection:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def on_order_placed(self, event: OrderPlacedEvent) -> None:
# UPSERT — идемпотентно
stmt = insert(OrderSummaryRow).values(
order_id=event.order_id,
customer_name=event.customer_name,
total=event.total,
status='placed',
items_count=len(event.items),
first_product=event.items[0].product_name if event.items else None,
placed_at=event.occurred_at,
).on_conflict_do_nothing(index_elements=['order_id'])
await self._session.execute(stmt)
async def on_order_confirmed(self, event: OrderConfirmedEvent) -> None:
await self._session.execute(
update(OrderSummaryRow)
.where(OrderSummaryRow.order_id == event.order_id)
.values(status='confirmed')
)
Ключевые свойства:
- Идемпотентная —
INSERT ON CONFLICT DO NOTHINGилиUPDATE. - Однонаправленная — только слушает и обновляет. Никакой back-pressure.
- Инкапсулированная — одна проекция знает про одну read model.
Одна модель ≠ одна проекция
Одна read model — одна структура для одного вида запросов.
Одна проекция — один процесс, обновляющий read model.
Часто путают. Возможные вариации:
Один read model + один projection. Стандартный случай.
Один read model, несколько projection’ов. Разные события влияют — OrderProjection слушает OrderPlaced, PaymentProjection слушает PaymentReceived, обе пишут в order_summary.
Один projection, несколько read models. OrderReceived обновляет и orders_by_customer, и orders_analytics, и orders_search_index.
Гибкость в дизайне — одна из выгод CQRS.
Materialized Views как простой старт
Не всегда нужно писать проекции. Postgres умеет materialized views — представление, которое можно refresh на запрос:
CREATE MATERIALIZED VIEW order_summary AS
SELECT
o.id AS order_id,
c.name AS customer_name,
o.total,
o.status,
COUNT(l.id) AS items_count,
o.placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_lines l ON l.order_id = o.id
GROUP BY o.id, c.name;
CREATE UNIQUE INDEX ON order_summary (order_id);
Query — быстрое чтение:
SELECT * FROM order_summary WHERE customer_name ILIKE 'John%';
Обновление:
REFRESH MATERIALIZED VIEW CONCURRENTLY order_summary;
CONCURRENTLY — без блокировки чтений. Кроном — раз в минуту / час / день, зависит от нужной свежести.
Плюсы materialized views:
- Простой старт: SQL, без event handler’ов.
- Легко понять и отладить.
- Атомарность refresh.
Минусы:
- Latency зависит от частоты refresh.
- Refresh пересчитывает всё (без инкремента).
- Не подходит для больших таблиц с частыми изменениями.
Для многих команд material views — правильный первый шаг к CQRS. Если приросли, разросся domain — тогда переход на проекции с events.
Хранение read models
Read model не обязан жить в основной БД. Варианты:
Та же БД
orders (write)
order_summary (read model, обновляется проекцией)
Плюсы: транзакции возможны, atomic updates. Минусы: read + write competing за I/O.
Отдельная OLAP-БД
orders (Postgres OLTP) → CDC → orders_analytics (ClickHouse OLAP).
Read heavy analytics не мешают write path.
Search-индекс
orders (Postgres) → проекция → orders (Elasticsearch).
Full-text search, faceted queries, аггрегации.
Кэш
orders (Postgres) → проекция → order:{id} (Redis).
Микросекунды на чтение единичной записи.
Комбо
orders (Postgres) → события → 4 разных read models в 4 разных хранилищах, каждый оптимизирован под свой use case.
Eventual Consistency: явный контракт
Async projections всегда работают с delay. Между OrderPlaced write и обновлением read model — миллисекунды или секунды.
Практическая проблема:
User: [click "Place Order"]
↓ POST /orders → 201 Created
↓ redirect to /my-orders
↓ GET /my-orders → order NOT FOUND
Пользователь только что оформил заказ — не видит его. Технически корректно (eventual). Практически — жалоба в support.
Решения:
Sync проекция для критичных данных. Обновлять order_summary в той же транзакции, что и orders. Простой sync CQRS.
Read-your-writes через session. Клиент помнит order_id, читает его напрямую. Fallback на read model.
Optimistic UI. На клиенте показываем «заказ #новый» сразу после 201. Проверяем через N секунд, что действительно есть.
Долго ждать. Показать «обработка… 1-2 секунды». Явно контракт «немного подождите».
Выбор — бизнес-решение. Технически возможно всё.
Как правильно: полный поток
Write path:
class PlaceOrderHandler:
async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
async with self._uow:
order = Order.place(cmd)
await self._orders.save(order)
for event in order.pull_events():
await self._outbox.append(event)
return order.id
Проекция:
class OrderSummaryProjection:
async def on_order_placed(self, event: OrderPlacedEvent) -> None:
summary = OrderSummaryRow(
order_id=event.order_id,
total=event.total,
status='placed',
placed_at=event.occurred_at,
)
await self._session.merge(summary) # UPSERT
Query:
class ListOrdersHandler:
async def execute(self, query: ListOrdersQuery) -> list[OrderSummary]:
result = await self._session.execute(
select(OrderSummaryRow)
.where(OrderSummaryRow.customer_id == query.customer_id)
.order_by(OrderSummaryRow.placed_at.desc())
.limit(query.limit)
)
return [_to_dto(row) for row in result.scalars()]
Query — прямая работа с read model. Никакого aggregate, никакого repository для orders — только read table.
Как не надо
1. Query через reconstruction aggregate
async def list_orders_over_5k():
all_ids = await orders.list_all_ids()
result = []
for order_id in all_ids:
order = await orders.find_by_id(order_id) # !!
if order.total > 5000:
result.append(order.to_dto())
return result
Собираем aggregate ради простой проверки суммы. Медленно, дорого, нелепо.
Правильно: query — SELECT ... WHERE total > 5000 из read model.
2. Write через read model
async def confirm_order(order_id):
await session.execute(
update(OrderSummaryRow).values(status='confirmed').where(...) # !!
)
Обновление минуя aggregate и его инварианты. Order.confirm() не вызван, событие не сгенерировано, side effects не запущены.
Правильно: команда → aggregate → save → проекция обновит read model.
3. Read model = зеркало write model
orders (write) → orders_summary с ТЕМ ЖЕ набором полей # !!
Проекция не даёт выгоды. Просто дублируем данные.
Правильно: read model денормализован, оптимизирован под запрос. Если совпадает с write — вопрос: а нужен ли отдельный read?
4. Не идемпотентная проекция
async def on_order_placed(event):
self._session.add(OrderSummaryRow(order_id=event.order_id, ...)) # duplicate key при retry
Retry события → IntegrityError → projection остановлена.
Правильно: merge / INSERT ... ON CONFLICT DO NOTHING/UPDATE.
5. Проекция с бизнес-логикой
async def on_order_placed(event):
# !! логика подсчёта премии — не задача проекции
if event.customer.is_premium and event.total > 10000:
loyalty_points = event.total * 0.05
else:
loyalty_points = event.total * 0.01
await self._session.merge(OrderSummaryRow(..., loyalty_points=loyalty_points))
Проекция копирует данные, не решает бизнес-задачи. loyalty_points — это домен, живёт в Customer или отдельном use case.
Правильно: проекция читает готовые данные из события. Бизнес-логика происходит в write path.
Trade-offs
| Ситуация | Проекция + read model | Materialized view | Прямой JOIN на write model |
|---|---|---|---|
| Read-heavy, редкие writes | ✓ | ✓ | |
| Строгая consistency нужна | (sync проекция) | ✓ | |
| Разные read shapes | ✓ | ||
| Prototype | ✓ | ✓ | |
| Отдельная OLAP-нагрузка | ✓ (другая БД) | ||
| Fulltext search / faceted queries | ✓ (Elasticsearch) |
В твоём же коде
В анонимизированном микросервисе — read models не нужны, сервис ничего не отдаёт наружу через API. Но принцип виден:
- Write: RabbitMQ →
OrderStandardizeUseCase→ PostgreSQL. - Read (гипотетически): REST-эндпоинт
GET /items/{id}/standardizationмог бы работать через прямой SQL изtenders_position_attributes. Никакого aggregate.
Уже сейчас имеет смысл разделение по типам handler’ов: use case для обработки сообщений (write path), отдельный handler для чтения (query path).
Дальнейшее чтение
- Greg Young. CQRS Documents. Секции про read models и projections.
- Martin Fowler. CommandQuerySeparation. Основа идеи.
- Vaughn Vernon. Implementing Domain-Driven Design. Chapter 4 — Architecture. CQRS + Read models.
- Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — materialized views и stream processing.
- Postgres Docs. Materialized Views. Референс для простой реализации.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Read model должен быть...
Q2. Materialized view в Postgres — это...
Q3. Свойства правильной проекции:
Q4. Проблема "оформил заказ → не вижу в списке" — это...
Q5. Что не должна делать проекция?