Уровень 6 · Продвинутые Глава 22 10 мин

Read models и projections

Read side в CQRS: денормализованные модели, проекции, eventual consistency. Materialized views как простой старт.

TL;DR

  • Read model не обязан соответствовать write model — оптимизирован под запрос.
  • Projections — подписчики на события, строят и обновляют собственный view.
  • Materialized views в БД — простой способ начать без полного CQRS.
  • Eventual consistency нужно явно проговаривать с бизнесом.

Что такое Read Model

Определение Read Model (модель чтения)

Структура данных, оптимизированная под конкретный запрос UI или API. Обычно денормализованная. Может жить в отдельной таблице, view, БД или индексе.

Обычная нормализованная схема:

orders          (id, customer_id, total, status, placed_at)
order_lines     (id, order_id, product_id, quantity, unit_price)
customers       (id, name, email)
products        (id, name, category)

Запрос «покажи мне все заказы с деталями» — 3 JOIN. Один заказ на 20 items — 20+ строк на выход. UI получает данные, склеивает в JavaScript. При росте нагрузки — падает.

Read model для того же:

order_summary   (
    order_id       PRIMARY KEY,
    customer_name  TEXT,        -- уже присоединено
    total          NUMERIC,
    status         TEXT,
    items_count    INTEGER,     -- уже посчитано
    first_product  TEXT,        -- для превью
    placed_at      TIMESTAMPTZ
)

Один SELECT без JOIN. Данные в готовом виде.

Проекции обновляют read models

Определение Projection (проекция)

Процесс, слушающий события write model и обновляющий read model. Однонаправленное преобразование.

class OrderSummaryProjection:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def on_order_placed(self, event: OrderPlacedEvent) -> None:
        # UPSERT — идемпотентно
        stmt = insert(OrderSummaryRow).values(
            order_id=event.order_id,
            customer_name=event.customer_name,
            total=event.total,
            status='placed',
            items_count=len(event.items),
            first_product=event.items[0].product_name if event.items else None,
            placed_at=event.occurred_at,
        ).on_conflict_do_nothing(index_elements=['order_id'])
        await self._session.execute(stmt)

    async def on_order_confirmed(self, event: OrderConfirmedEvent) -> None:
        await self._session.execute(
            update(OrderSummaryRow)
            .where(OrderSummaryRow.order_id == event.order_id)
            .values(status='confirmed')
        )

Ключевые свойства:

  • ИдемпотентнаяINSERT ON CONFLICT DO NOTHING или UPDATE.
  • Однонаправленная — только слушает и обновляет. Никакой back-pressure.
  • Инкапсулированная — одна проекция знает про одну read model.

Одна модель ≠ одна проекция

Одна read model — одна структура для одного вида запросов.

Одна проекция — один процесс, обновляющий read model.

Часто путают. Возможные вариации:

    Один read model + один projection. Стандартный случай.

    Один read model, несколько projection’ов. Разные события влияют — OrderProjection слушает OrderPlaced, PaymentProjection слушает PaymentReceived, обе пишут в order_summary.

    Один projection, несколько read models. OrderReceived обновляет и orders_by_customer, и orders_analytics, и orders_search_index.

Гибкость в дизайне — одна из выгод CQRS.

Materialized Views как простой старт

Не всегда нужно писать проекции. Postgres умеет materialized views — представление, которое можно refresh на запрос:

CREATE MATERIALIZED VIEW order_summary AS
SELECT
    o.id AS order_id,
    c.name AS customer_name,
    o.total,
    o.status,
    COUNT(l.id) AS items_count,
    o.placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_lines l ON l.order_id = o.id
GROUP BY o.id, c.name;

CREATE UNIQUE INDEX ON order_summary (order_id);

Query — быстрое чтение:

SELECT * FROM order_summary WHERE customer_name ILIKE 'John%';

Обновление:

REFRESH MATERIALIZED VIEW CONCURRENTLY order_summary;

CONCURRENTLY — без блокировки чтений. Кроном — раз в минуту / час / день, зависит от нужной свежести.

Плюсы materialized views:

  • Простой старт: SQL, без event handler’ов.
  • Легко понять и отладить.
  • Атомарность refresh.

Минусы:

  • Latency зависит от частоты refresh.
  • Refresh пересчитывает всё (без инкремента).
  • Не подходит для больших таблиц с частыми изменениями.

Для многих команд material views — правильный первый шаг к CQRS. Если приросли, разросся domain — тогда переход на проекции с events.

Хранение read models

Read model не обязан жить в основной БД. Варианты:

Та же БД

orders          (write)
order_summary   (read model, обновляется проекцией)

Плюсы: транзакции возможны, atomic updates. Минусы: read + write competing за I/O.

Отдельная OLAP-БД

orders (Postgres OLTP) → CDC → orders_analytics (ClickHouse OLAP).

Read heavy analytics не мешают write path.

Search-индекс

orders (Postgres) → проекция → orders (Elasticsearch).

Full-text search, faceted queries, аггрегации.

Кэш

orders (Postgres) → проекция → order:{id} (Redis).

Микросекунды на чтение единичной записи.

Комбо

orders (Postgres) → события → 4 разных read models в 4 разных хранилищах, каждый оптимизирован под свой use case.

Eventual Consistency: явный контракт

Async projections всегда работают с delay. Между OrderPlaced write и обновлением read model — миллисекунды или секунды.

Практическая проблема:

User: [click "Place Order"]
       ↓ POST /orders → 201 Created
       ↓ redirect to /my-orders
       ↓ GET /my-orders → order NOT FOUND

Пользователь только что оформил заказ — не видит его. Технически корректно (eventual). Практически — жалоба в support.

Решения:

    Sync проекция для критичных данных. Обновлять order_summary в той же транзакции, что и orders. Простой sync CQRS.

    Read-your-writes через session. Клиент помнит order_id, читает его напрямую. Fallback на read model.

    Optimistic UI. На клиенте показываем «заказ #новый» сразу после 201. Проверяем через N секунд, что действительно есть.

    Долго ждать. Показать «обработка… 1-2 секунды». Явно контракт «немного подождите».

Выбор — бизнес-решение. Технически возможно всё.

Как правильно: полный поток

Write path:

class PlaceOrderHandler:
    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._uow:
            order = Order.place(cmd)
            await self._orders.save(order)
            for event in order.pull_events():
                await self._outbox.append(event)
            return order.id

Проекция:

class OrderSummaryProjection:
    async def on_order_placed(self, event: OrderPlacedEvent) -> None:
        summary = OrderSummaryRow(
            order_id=event.order_id,
            total=event.total,
            status='placed',
            placed_at=event.occurred_at,
        )
        await self._session.merge(summary)  # UPSERT

Query:

class ListOrdersHandler:
    async def execute(self, query: ListOrdersQuery) -> list[OrderSummary]:
        result = await self._session.execute(
            select(OrderSummaryRow)
            .where(OrderSummaryRow.customer_id == query.customer_id)
            .order_by(OrderSummaryRow.placed_at.desc())
            .limit(query.limit)
        )
        return [_to_dto(row) for row in result.scalars()]

Query — прямая работа с read model. Никакого aggregate, никакого repository для orders — только read table.

Как не надо

1. Query через reconstruction aggregate

async def list_orders_over_5k():
    all_ids = await orders.list_all_ids()
    result = []
    for order_id in all_ids:
        order = await orders.find_by_id(order_id)     # !!  
        if order.total > 5000:
            result.append(order.to_dto())
    return result

Собираем aggregate ради простой проверки суммы. Медленно, дорого, нелепо.

Правильно: query — SELECT ... WHERE total > 5000 из read model.

2. Write через read model

async def confirm_order(order_id):
    await session.execute(
        update(OrderSummaryRow).values(status='confirmed').where(...)   # !!    
    )

Обновление минуя aggregate и его инварианты. Order.confirm() не вызван, событие не сгенерировано, side effects не запущены.

Правильно: команда → aggregate → save → проекция обновит read model.

3. Read model = зеркало write model

orders (write) → orders_summary с ТЕМ ЖЕ набором полей   # !!

Проекция не даёт выгоды. Просто дублируем данные.

Правильно: read model денормализован, оптимизирован под запрос. Если совпадает с write — вопрос: а нужен ли отдельный read?

4. Не идемпотентная проекция

async def on_order_placed(event):
    self._session.add(OrderSummaryRow(order_id=event.order_id, ...))   # duplicate key при retry  

Retry события → IntegrityError → projection остановлена.

Правильно: merge / INSERT ... ON CONFLICT DO NOTHING/UPDATE.

5. Проекция с бизнес-логикой

async def on_order_placed(event):
    # !! логика подсчёта премии — не задача проекции
    if event.customer.is_premium and event.total > 10000:
        loyalty_points = event.total * 0.05
    else:
        loyalty_points = event.total * 0.01
    await self._session.merge(OrderSummaryRow(..., loyalty_points=loyalty_points))

Проекция копирует данные, не решает бизнес-задачи. loyalty_points — это домен, живёт в Customer или отдельном use case.

Правильно: проекция читает готовые данные из события. Бизнес-логика происходит в write path.

Trade-offs

СитуацияПроекция + read modelMaterialized viewПрямой JOIN на write model
Read-heavy, редкие writes
Строгая consistency нужна(sync проекция)
Разные read shapes
Prototype
Отдельная OLAP-нагрузка✓ (другая БД)
Fulltext search / faceted queries✓ (Elasticsearch)

В твоём же коде

В анонимизированном микросервисе — read models не нужны, сервис ничего не отдаёт наружу через API. Но принцип виден:

  • Write: RabbitMQ → OrderStandardizeUseCase → PostgreSQL.
  • Read (гипотетически): REST-эндпоинт GET /items/{id}/standardization мог бы работать через прямой SQL из tenders_position_attributes. Никакого aggregate.

Уже сейчас имеет смысл разделение по типам handler’ов: use case для обработки сообщений (write path), отдельный handler для чтения (query path).

Дальнейшее чтение

  • Greg Young. CQRS Documents. Секции про read models и projections.
  • Martin Fowler. CommandQuerySeparation. Основа идеи.
  • Vaughn Vernon. Implementing Domain-Driven Design. Chapter 4 — Architecture. CQRS + Read models.
  • Martin Kleppmann. Designing Data-Intensive Applications. Chapter 11 — materialized views и stream processing.
  • Postgres Docs. Materialized Views. Референс для простой реализации.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Read model должен быть...

  2. Q2. Materialized view в Postgres — это...

  3. Q3. Свойства правильной проекции:

  4. Q4. Проблема "оформил заказ → не вижу в списке" — это...

  5. Q5. Что не должна делать проекция?