Unit of Work
Управление транзакциями явно. AsyncSession как UoW. Nested transactions. Где живёт транзакция — use case, не repository.
TL;DR
- Unit of Work — объект, отслеживающий изменения aggregate за одну бизнес-операцию и координирующий commit.
- В SQLAlchemy AsyncSession = UoW. Отдельная обёртка обычно не нужна.
- Одна транзакция = один use case. Границей владеет application service, не repository.
- Savepoints — редкий инструмент, но важно понимать, что это.
Зачем эта глава
В главе Repository мы упомянули: транзакция живёт в use case, не в repository. Здесь — почему это так, как оформить, где границы, что делать с nested-случаями и как это ложится на SQLAlchemy async.
Unit of Work — не «ещё один паттерн, чтобы усложнить». Это ответ на конкретную боль: одна бизнес-операция изменяет несколько aggregate. Как их сохранить атомарно?
Что такое Unit of Work
Объект, который отслеживает изменения в набор бизнес-объектов за одну логическую операцию и координирует запись изменений и разрешение конкурентных проблем.
Три задачи:
Change tracking. Знать, какие aggregate загружены, какие изменены, какие добавлены, какие удалены.
Coordinated commit. Вписать все изменения в одну транзакцию БД. Либо все, либо ничего.
Concurrency control. Optimistic locking, версии, обнаружение конфликтов.
В большинстве современных ORM UoW реализован в самом фреймворке.
SQLAlchemy AsyncSession как UoW
The Session begins in a mostly stateless form. Once queries are issued or other objects are persisted with it, it requests a connection resource from an Engine that is associated with the Session, and then establishes a transaction on that connection.
AsyncSession уже является Unit of Work:
- Change tracking через identity map.
- Coordinated commit через
session.commit(). - Concurrency control — через
version_id_colи optimistic locking (опционально).
Дополнительный слой UoW нужен, если:
- Хотите инкапсулировать транзакцию от use case (чтобы use case не знал про
AsyncSession). - Работаете сразу с несколькими источниками (SQLAlchemy + Redis + Kafka).
- Строите Domain Events + Outbox, где нужен явный контроль момента commit.
Для 90% случаев достаточно AsyncSession напрямую или тонкой обёртки UnitOfWork.
Как правильно: UoW обёртка
class UnitOfWork(Protocol):
"""Явная граница транзакции."""
async def __aenter__(self) -> 'UnitOfWork': ...
async def __aexit__(self, exc_type, exc, tb) -> None: ...
async def commit(self) -> None: ...
async def rollback(self) -> None: ...
class SqlAlchemyUnitOfWork:
def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
self._session_factory = session_factory
self._session: AsyncSession | None = None
@property
def session(self) -> AsyncSession:
if self._session is None:
raise RuntimeError('UoW not active — use async with')
return self._session
async def __aenter__(self) -> 'SqlAlchemyUnitOfWork':
self._session = self._session_factory()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
try:
if exc_type is None:
await self._session.commit()
else:
await self._session.rollback()
finally:
await self._session.close()
self._session = None
Use case:
class PlaceOrderCommandHandler:
def __init__(
self,
uow: UnitOfWork,
orders: OrderRepositoryPort,
events: OutboxPort,
) -> None:
self._uow = uow
self._orders = orders
self._events = events
async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
async with self._uow:
order = Order.place(cmd)
await self._orders.save(order)
for event in order.pull_events():
await self._events.append(event)
return order.id
# commit по выходу — атомарно
Repository не создаёт транзакции — использует session из UoW.
Как правильно: без обёртки
Часто UoW-обёртка избыточна. AsyncSession инжектится напрямую, использование async with session.begin() даёт всё то же самое:
class PlaceOrderCommandHandler:
def __init__(
self,
session: AsyncSession,
orders: OrderRepositoryPort,
events: OutboxPort,
) -> None:
self._session = session
self._orders = orders
self._events = events
async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
async with self._session.begin():
order = Order.place(cmd)
await self._orders.save(order)
for event in order.pull_events():
await self._events.append(event)
return order.id
Плюсы: меньше кода, прямее. Минусы: application service знает про SQLAlchemy. Тестируемость через in-memory repository теряется (нужен in-memory session).
Выбор:
- Нужна абстракция от ORM для тестов → отдельный
UnitOfWorkPort. - Работаем на одном стеке, тесты через SQLite in-memory →
AsyncSessionнапрямую.
Границы транзакции: одна на use case
Правило: одна бизнес-операция = одна транзакция. Не короче, не длиннее.
Практический принцип:
Всё, что должно быть атомарно — в одну транзакцию.
Всё, что может быть eventual (события, side effects) — отдельно через Outbox или отдельные транзакции.
Никаких внешних вызовов внутри транзакции. HTTP, RPC, ожидание другого сервиса — до или после.
Nested transactions и savepoints
Иногда нужно выполнить блок операций так, что при ошибке откатится только этот блок, а не вся транзакция.
Пример: в use case есть best-effort операция — попытаться обновить кэш, если не получилось — не страшно.
Savepoint — точка внутри транзакции, к которой можно откатиться, не откатывая всё.
async with session.begin():
order = Order.place(cmd)
await orders.save(order)
async with session.begin_nested():
try:
await cache.update(order)
except CacheError:
# savepoint откатится, но основная транзакция продолжается
logger.warning('cache update failed, continuing')
Использование savepoint нужно редко. Если понадобилось — обычно это признак, что операция должна была быть отдельной транзакцией.
▸ Distributed transactions — не для этого курса
Два-phase commit (2PC), XA-транзакции — механизмы для атомарных операций через несколько ресурсов (например, БД + очередь сообщений).
В современных микросервисах их избегают: сложные, дорогие, плохо масштабируются. Замена — Outbox pattern и Sagas.
Не используйте 2PC без веских причин.
Optimistic Locking
Одна транзакция изменяет Order. Пока она работает, другой пользователь читает того же Order, изменяет и сохраняет. Кто выиграет? В типичной transaction — второй. Первый молча теряется.
Optimistic Locking: каждый aggregate имеет версию. При save проверяем, не изменилась ли версия. Если да — конфликт, бросаем ошибку.
from sqlalchemy.orm import mapped_column
class OrderRow(Base):
__tablename__ = 'orders'
__mapper_args__ = {'version_id_col': 'version'}
id: Mapped[int] = mapped_column(primary_key=True)
version: Mapped[int]
total: Mapped[Decimal]
...
SQLAlchemy автоматически:
- инкрементирует
versionпри save. - добавляет
WHERE version = ?в UPDATE. - бросает
StaleDataError, если WHERE не нашёл строку (значит версия была изменена другим процессом).
Use case ловит и повторяет:
async def execute(self, cmd):
for attempt in range(3):
try:
async with self._uow:
order = await self._orders.find_by_id(cmd.order_id)
order.apply(cmd)
await self._orders.save(order)
return
except StaleDataError:
if attempt == 2:
raise
continue
Optimistic ≠ pessimistic (row-level lock). Pessimistic — блокирует строку при чтении. Работает, но плохо масштабируется — очередь ожиданий.
Как не надо
1. Транзакция внутри Repository
class PostgresOrderRepository:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._session_factory = session_factory
async def save(self, order: Order) -> None:
async with self._session_factory() as session:
async with session.begin():
# ... save
Repository открывает свою транзакцию. Нельзя объединить orders.save + outbox.append в одну транзакцию — они в разных session’ах.
Правильно: session инжектится в repository, транзакцию открывает use case через UoW или session.begin().
2. Транзакция включает внешний вызов
async def execute(self, cmd):
async with self._session.begin():
order = await self._orders.find_by_id(cmd.id)
confirmation = await self._payment_api.charge(order) # !!
order.mark_paid(confirmation.id)
await self._orders.save(order)
HTTP-вызов к платёжной системе — секунды. Всё это время БД держит lock на orders. При росте нагрузки — контенция и timeouts.
Правильно:
- До транзакции — вызов API, получить confirmation.
- В транзакции — обновление aggregate.
Или (лучше для платежей): двухфазный подход — резервируем в БД, вызываем API, подтверждаем в БД (два отдельных транзакции + saga).
3. Транзакция на несколько use case’ов
async def bulk_endpoint(commands: list[Command], session: AsyncSession):
async with session.begin(): # !!
for cmd in commands:
await handler.execute(cmd, session)
Внешний код открыл транзакцию, каждый use case работает в её контексте. Проблемы:
- Один use case падает — все откатываются.
- Внутренние use case’ы теряют независимость.
- Границы становятся расплывчатыми.
Правильно: каждый use case — своя транзакция. При bulk-обработке — по одному, с явной обработкой ошибок.
4. Собственный UoW-класс над ORM без надобности
class UnitOfWork:
def __init__(self, session):
self.session = session
self._changes = []
def register_new(self, obj):
self._changes.append(('new', obj))
def register_dirty(self, obj):
self._changes.append(('dirty', obj))
async def commit(self):
for action, obj in self._changes:
...
Дублирование identity map SQLAlchemy. Overhead без пользы.
Правильно: тонкая обёртка (только __aenter__/__aexit__/commit/rollback) или прямое использование session.begin().
Trade-offs
| Ситуация | UoW-обёртка | AsyncSession напрямую |
|---|---|---|
| Один стек, тесты с SQLite in-memory | ✓ | |
| Абстракция для in-memory тестов | ✓ | |
| Несколько источников (SQLAlchemy + Redis) | ✓ | |
| Простой CRUD | ✓ | |
| DDD + Domain Events + Outbox | ✓ | (можно и так) |
В твоём же коде
Анонимизированный микросервис хранит транзакцию внутри репозитория:
class PostgresItemRepository:
def __init__(self, session_factory):
self._session_factory = session_factory
async def update_normalized_data(self, ...):
async with self._session_factory() as session:
async with session.begin():
...
Проблемы:
- Нельзя объединить
update_normalized_dataсoutbox.append(event)в одну транзакцию. - Каждый вызов метода = новая session. Оверхэд соединения. При высокой нагрузке — плохо.
- Use case не контролирует границу транзакции.
Рефакторинг: AsyncSession инжектится (через Dishka Scope.REQUEST в handler’е), use case открывает async with session.begin(), repository использует уже открытую session.
Дальнейшее чтение
- Martin Fowler. Patterns of Enterprise Application Architecture. Chapter 11 — Unit of Work.
- Vaughn Vernon. Implementing Domain-Driven Design. Chapter 10 — Aggregates + transaction boundaries.
- Harry Percival, Bob Gregory. Architecture Patterns with Python. Chapter 6 — Unit of Work Pattern. Бесплатно на cosmicpython.com.
- SQLAlchemy Docs. Session Basics. Официальная документация.
- Chris Richardson. Microservices Patterns. Обсуждение distributed transactions, saga vs 2PC.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Кто должен управлять границами транзакции?
Q2. Правило «одна бизнес-операция = одна транзакция» означает...
Q3. AsyncSession в SQLAlchemy — это...
Q4. Optimistic locking по сравнению с pessimistic:
Q5. Почему транзакция не должна включать внешний HTTP-вызов?