Уровень 4 · Персистентность Глава 13 11 мин

Unit of Work

Управление транзакциями явно. AsyncSession как UoW. Nested transactions. Где живёт транзакция — use case, не repository.

TL;DR

  • Unit of Work — объект, отслеживающий изменения aggregate за одну бизнес-операцию и координирующий commit.
  • В SQLAlchemy AsyncSession = UoW. Отдельная обёртка обычно не нужна.
  • Одна транзакция = один use case. Границей владеет application service, не repository.
  • Savepoints — редкий инструмент, но важно понимать, что это.

Зачем эта глава

В главе Repository мы упомянули: транзакция живёт в use case, не в repository. Здесь — почему это так, как оформить, где границы, что делать с nested-случаями и как это ложится на SQLAlchemy async.

Unit of Work — не «ещё один паттерн, чтобы усложнить». Это ответ на конкретную боль: одна бизнес-операция изменяет несколько aggregate. Как их сохранить атомарно?

Что такое Unit of Work

Определение Unit of Work (единица работы)

Объект, который отслеживает изменения в набор бизнес-объектов за одну логическую операцию и координирует запись изменений и разрешение конкурентных проблем.

— Fowler, PoEAA, 2002

Три задачи:

    Change tracking. Знать, какие aggregate загружены, какие изменены, какие добавлены, какие удалены.

    Coordinated commit. Вписать все изменения в одну транзакцию БД. Либо все, либо ничего.

    Concurrency control. Optimistic locking, версии, обнаружение конфликтов.

В большинстве современных ORM UoW реализован в самом фреймворке.

SQLAlchemy AsyncSession как UoW

The Session begins in a mostly stateless form. Once queries are issued or other objects are persisted with it, it requests a connection resource from an Engine that is associated with the Session, and then establishes a transaction on that connection.

SQLAlchemy Documentation Session Basics

AsyncSession уже является Unit of Work:

  • Change tracking через identity map.
  • Coordinated commit через session.commit().
  • Concurrency control — через version_id_col и optimistic locking (опционально).

Дополнительный слой UoW нужен, если:

  • Хотите инкапсулировать транзакцию от use case (чтобы use case не знал про AsyncSession).
  • Работаете сразу с несколькими источниками (SQLAlchemy + Redis + Kafka).
  • Строите Domain Events + Outbox, где нужен явный контроль момента commit.

Для 90% случаев достаточно AsyncSession напрямую или тонкой обёртки UnitOfWork.

Как правильно: UoW обёртка

class UnitOfWork(Protocol):
    """Явная граница транзакции."""

    async def __aenter__(self) -> 'UnitOfWork': ...
    async def __aexit__(self, exc_type, exc, tb) -> None: ...
    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...
class SqlAlchemyUnitOfWork:
    def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
        self._session_factory = session_factory
        self._session: AsyncSession | None = None

    @property
    def session(self) -> AsyncSession:
        if self._session is None:
            raise RuntimeError('UoW not active — use async with')
        return self._session

    async def __aenter__(self) -> 'SqlAlchemyUnitOfWork':
        self._session = self._session_factory()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        try:
            if exc_type is None:
                await self._session.commit()      
            else:
                await self._session.rollback()    
        finally:
            await self._session.close()
            self._session = None

Use case:

class PlaceOrderCommandHandler:
    def __init__(
        self,
        uow: UnitOfWork,
        orders: OrderRepositoryPort,
        events: OutboxPort,
    ) -> None:
        self._uow = uow
        self._orders = orders
        self._events = events

    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._uow:                              
            order = Order.place(cmd)
            await self._orders.save(order)
            for event in order.pull_events():
                await self._events.append(event)
            return order.id
        # commit по выходу — атомарно

Repository не создаёт транзакции — использует session из UoW.

Как правильно: без обёртки

Часто UoW-обёртка избыточна. AsyncSession инжектится напрямую, использование async with session.begin() даёт всё то же самое:

class PlaceOrderCommandHandler:
    def __init__(
        self,
        session: AsyncSession,
        orders: OrderRepositoryPort,
        events: OutboxPort,
    ) -> None:
        self._session = session
        self._orders = orders
        self._events = events

    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._session.begin():                  
            order = Order.place(cmd)
            await self._orders.save(order)
            for event in order.pull_events():
                await self._events.append(event)
            return order.id

Плюсы: меньше кода, прямее. Минусы: application service знает про SQLAlchemy. Тестируемость через in-memory repository теряется (нужен in-memory session).

Выбор:

  • Нужна абстракция от ORM для тестов → отдельный UnitOfWork Port.
  • Работаем на одном стеке, тесты через SQLite in-memory → AsyncSession напрямую.

Границы транзакции: одна на use case

Правило: одна бизнес-операция = одна транзакция. Не короче, не длиннее.

Практический принцип:

    Всё, что должно быть атомарно — в одну транзакцию.

    Всё, что может быть eventual (события, side effects) — отдельно через Outbox или отдельные транзакции.

    Никаких внешних вызовов внутри транзакции. HTTP, RPC, ожидание другого сервиса — до или после.

Nested transactions и savepoints

Иногда нужно выполнить блок операций так, что при ошибке откатится только этот блок, а не вся транзакция.

Пример: в use case есть best-effort операция — попытаться обновить кэш, если не получилось — не страшно.

Savepoint — точка внутри транзакции, к которой можно откатиться, не откатывая всё.

async with session.begin():
    order = Order.place(cmd)
    await orders.save(order)

    async with session.begin_nested():             
        try:
            await cache.update(order)
        except CacheError:
            # savepoint откатится, но основная транзакция продолжается
            logger.warning('cache update failed, continuing')

Использование savepoint нужно редко. Если понадобилось — обычно это признак, что операция должна была быть отдельной транзакцией.

▸ Distributed transactions — не для этого курса

Два-phase commit (2PC), XA-транзакции — механизмы для атомарных операций через несколько ресурсов (например, БД + очередь сообщений).

В современных микросервисах их избегают: сложные, дорогие, плохо масштабируются. Замена — Outbox pattern и Sagas.

Не используйте 2PC без веских причин.

Optimistic Locking

Одна транзакция изменяет Order. Пока она работает, другой пользователь читает того же Order, изменяет и сохраняет. Кто выиграет? В типичной transaction — второй. Первый молча теряется.

Optimistic Locking: каждый aggregate имеет версию. При save проверяем, не изменилась ли версия. Если да — конфликт, бросаем ошибку.

from sqlalchemy.orm import mapped_column

class OrderRow(Base):
    __tablename__ = 'orders'
    __mapper_args__ = {'version_id_col': 'version'}   

    id: Mapped[int] = mapped_column(primary_key=True)
    version: Mapped[int]                              
    total: Mapped[Decimal]
    ...

SQLAlchemy автоматически:

  • инкрементирует version при save.
  • добавляет WHERE version = ? в UPDATE.
  • бросает StaleDataError, если WHERE не нашёл строку (значит версия была изменена другим процессом).

Use case ловит и повторяет:

async def execute(self, cmd):
    for attempt in range(3):
        try:
            async with self._uow:
                order = await self._orders.find_by_id(cmd.order_id)
                order.apply(cmd)
                await self._orders.save(order)
                return
        except StaleDataError:
            if attempt == 2:
                raise
            continue

Optimistic ≠ pessimistic (row-level lock). Pessimistic — блокирует строку при чтении. Работает, но плохо масштабируется — очередь ожиданий.

Как не надо

1. Транзакция внутри Repository

class PostgresOrderRepository:
    def __init__(self, session_factory: async_sessionmaker) -> None:
        self._session_factory = session_factory

    async def save(self, order: Order) -> None:
        async with self._session_factory() as session:    
            async with session.begin():                   
                # ... save

Repository открывает свою транзакцию. Нельзя объединить orders.save + outbox.append в одну транзакцию — они в разных session’ах.

Правильно: session инжектится в repository, транзакцию открывает use case через UoW или session.begin().

2. Транзакция включает внешний вызов

async def execute(self, cmd):
    async with self._session.begin():
        order = await self._orders.find_by_id(cmd.id)
        confirmation = await self._payment_api.charge(order)   # !!    
        order.mark_paid(confirmation.id)
        await self._orders.save(order)

HTTP-вызов к платёжной системе — секунды. Всё это время БД держит lock на orders. При росте нагрузки — контенция и timeouts.

Правильно:

  1. До транзакции — вызов API, получить confirmation.
  2. В транзакции — обновление aggregate.

Или (лучше для платежей): двухфазный подход — резервируем в БД, вызываем API, подтверждаем в БД (два отдельных транзакции + saga).

3. Транзакция на несколько use case’ов

async def bulk_endpoint(commands: list[Command], session: AsyncSession):
    async with session.begin():                             # !!
        for cmd in commands:
            await handler.execute(cmd, session)             

Внешний код открыл транзакцию, каждый use case работает в её контексте. Проблемы:

  • Один use case падает — все откатываются.
  • Внутренние use case’ы теряют независимость.
  • Границы становятся расплывчатыми.

Правильно: каждый use case — своя транзакция. При bulk-обработке — по одному, с явной обработкой ошибок.

4. Собственный UoW-класс над ORM без надобности

class UnitOfWork:
    def __init__(self, session):
        self.session = session
        self._changes = []                          

    def register_new(self, obj):
        self._changes.append(('new', obj))

    def register_dirty(self, obj):
        self._changes.append(('dirty', obj))

    async def commit(self):
        for action, obj in self._changes:
            ...

Дублирование identity map SQLAlchemy. Overhead без пользы.

Правильно: тонкая обёртка (только __aenter__/__aexit__/commit/rollback) или прямое использование session.begin().

Trade-offs

СитуацияUoW-обёрткаAsyncSession напрямую
Один стек, тесты с SQLite in-memory
Абстракция для in-memory тестов
Несколько источников (SQLAlchemy + Redis)
Простой CRUD
DDD + Domain Events + Outbox(можно и так)

В твоём же коде

Анонимизированный микросервис хранит транзакцию внутри репозитория:

class PostgresItemRepository:
    def __init__(self, session_factory):
        self._session_factory = session_factory

    async def update_normalized_data(self, ...):
        async with self._session_factory() as session:
            async with session.begin():
                ...

Проблемы:

  1. Нельзя объединить update_normalized_data с outbox.append(event) в одну транзакцию.
  2. Каждый вызов метода = новая session. Оверхэд соединения. При высокой нагрузке — плохо.
  3. Use case не контролирует границу транзакции.

Рефакторинг: AsyncSession инжектится (через Dishka Scope.REQUEST в handler’е), use case открывает async with session.begin(), repository использует уже открытую session.

Дальнейшее чтение

  • Martin Fowler. Patterns of Enterprise Application Architecture. Chapter 11 — Unit of Work.
  • Vaughn Vernon. Implementing Domain-Driven Design. Chapter 10 — Aggregates + transaction boundaries.
  • Harry Percival, Bob Gregory. Architecture Patterns with Python. Chapter 6 — Unit of Work Pattern. Бесплатно на cosmicpython.com.
  • SQLAlchemy Docs. Session Basics. Официальная документация.
  • Chris Richardson. Microservices Patterns. Обсуждение distributed transactions, saga vs 2PC.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Кто должен управлять границами транзакции?

  2. Q2. Правило «одна бизнес-операция = одна транзакция» означает...

  3. Q3. AsyncSession в SQLAlchemy — это...

  4. Q4. Optimistic locking по сравнению с pessimistic:

  5. Q5. Почему транзакция не должна включать внешний HTTP-вызов?