Уровень 4 · Персистентность Глава 12 14 мин

Repository Pattern (deep dive)

Evans, Fowler, Khorikov, Vernon — четыре точки зрения на Repository. Collection- vs persistence-oriented. Generic Repository и IQueryable-leaking как антипаттерны. Outbox. Критика Seemann. Python + SQLAlchemy 2.x async.

TL;DR

  • Repository — самый переопределяемый паттерн в отрасли. Мы разберём 4 определения и покажем, какое когда.
  • Generic Repository и IQueryable-leaking — почти всегда плохо. Разберёмся почему.
  • Collection-oriented vs persistence-oriented — важное различие Vernon для async-стеков.
  • Seemann считает Repository антипаттерном поверх ORM. Разберём его аргументы.

Почему это самый спорный паттерн

Открываете статью «Repository Pattern in Python» — видите обёртку над SQLAlchemy с методами get, get_all, save, delete. Открываете Fowler — видите mediation между domain и mapping. Открываете Evans — видите коллекцию агрегатов. Открываете Khorikov — узнаёте, что 90% Repository в open source — антипаттерн. Открываете Seemann — узнаёте, что Repository вообще не нужен, если у вас есть ORM.

Так что же такое Repository?

Правильный ответ: у Repository нет одного определения, потому что его переопределяли четыре раза, и все четыре трактовки продолжают жить. В этой главе — карта версий, и когда какая уместна.

Evans (2003) — оригинал

For each type of object that needs global access, create an object that can provide the illusion of an in-memory collection of all objects of that type.

Eric Evans Domain-Driven Design, 2003

Ключевое слово — illusion. Repository прикидывается in-memory коллекцией.

  • Работает только с Aggregate Root, не с любой сущностью.
  • Возвращает полностью загруженный агрегат — со всеми child entities.
  • Скрывает persistence полностью. Клиент не знает, есть ли БД, кэш, файл, SOAP.

Пример по Evans:

class OrderRepository(Protocol):
    def order_of_id(self, order_id: OrderId) -> Order | None: ...
    def orders_placed_after(self, when: datetime) -> list[Order]: ...
    def add(self, order: Order) -> None: ...
    def remove(self, order: Order) -> None: ...

Обратите внимание: add и remove — не save и delete. Это коллекционная семантика. Order изменённый после find сохраняется автоматически (identity map / change tracking у ORM).

Что важно у Evans:

  • 1 Repository = 1 Aggregate Root. OrderRepository работает с Order, включая его OrderLine. Нет отдельного OrderLineRepository.
  • Queries — часть Repository. orders_placed_after — метод Repository, а не свободная функция.
  • Domain-язык в именах методов. Не find_by_created_at, а orders_placed_after.

Fowler (2002) — PoEAA

Fowler в Patterns of Enterprise Application Architecture формулирует Repository почти так же, но акцентирует roleли посредника между domain и data mapping.

Mediates between the domain and data mapping layers using a collection-like interface for accessing domain objects.

Martin Fowler Patterns of Enterprise Application Architecture, 2002

Разница с Evans:

  • Fowler допускает Query Object как аргумент — Repository принимает объект-запрос и возвращает удовлетворяющие агрегаты. Гибкость ценой сложности.
  • Более практичен, менее догматичен по коллекционной семантике.

Khorikov — критика и переформулировка

Vladimir Khorikov в блоге Enterprise Craftsmanship и книге Unit Testing (2020) — самый резкий критик того, во что Repository превратился в open source.

Три правила Khorikov:

  1. Один Repository на один Aggregate Root. Не «на entity», не «на table». На aggregate.
  2. Не возвращает IQueryable / Query / Select. Никаких query builders наружу.
  3. Не возвращает DTO. Возвращает Aggregate. Маппинг в DTO — задача читающего сайта (query side, если есть CQRS).

Правильный по Khorikov:

class UserRepository(Protocol):
    async def find_active_users_registered_after(self, dt: datetime) -> list[User]: ...
    async def find_by_email(self, email: Email) -> User | None: ...
    async def save(self, user: User) -> None: ...

Имя метода описывает бизнесовый вопрос. find_active_users_registered_after — намного лучше «get_by_status_and_created_at».

Vernon — collection vs persistence

Vaughn Vernon в Implementing Domain-Driven Design (2013) вводит важное различие.

Collection-oriented Repository (близко к Evans):

class OrderRepository:
    def add(self, order: Order) -> None: ...
    def remove(self, order: Order) -> None: ...
    def order_of_id(self, id: OrderId) -> Order: ...

Изменённый после order_of_id Order сохраняется автоматически — как in-memory коллекция. Работает поверх ORM с identity map (SQLAlchemy, Hibernate).

Persistence-oriented Repository:

class OrderRepository:
    async def save(self, order: Order) -> None: ...
    async def delete(self, order_id: OrderId) -> None: ...
    async def find_by_id(self, id: OrderId) -> Order | None: ...

save вызывается явно после изменений. Не полагается на change tracking. Проще, предсказуемее в async-контекстах, где lifetime сессии контролируется вручную.

Vernon рекомендует persistence-oriented для большинства современных проектов. Меньше магии, явнее контроль транзакций.

Как правильно: пример на SQLAlchemy 2.x async

Отсутствие бизнес-логики и явный маппинг domain ↔ ORM:

@dataclass
class Order:
    id: OrderId
    customer_id: CustomerId
    lines: list[OrderLine]
    status: OrderStatus
    total: Money

    def confirm(self) -> None:
        if self.status is not OrderStatus.PENDING:
            raise CannotConfirmError(self.status)
        self.status = OrderStatus.CONFIRMED
class OrderRepositoryPort(Protocol):
    async def find_by_id(self, order_id: OrderId) -> Order | None: ...
    async def save(self, order: Order) -> None: ...
    async def find_pending_older_than(self, dt: datetime) -> list[Order]: ...
class PostgresOrderRepository(OrderRepositoryPort):
    def __init__(self, session: AsyncSession):     
        self._session = session

    async def find_by_id(self, order_id: OrderId) -> Order | None:
        stmt = (
            select(OrderRow)
            .where(OrderRow.id == order_id.value)
            .options(selectinload(OrderRow.lines))
        )
        row = (await self._session.execute(stmt)).scalar_one_or_none()
        return _to_domain(row) if row else None

    async def save(self, order: Order) -> None:
        row = _to_orm(order)
        await self._session.merge(row)

Ключевые решения:

  • Session injected, не создаётся внутри репозитория. Транзакцию контролирует use case, а не Repository.
  • Возвращает Order, а не OrderRow. Маппер _to_domain — часть infrastructure.
  • Bounded методыfind_pending_older_than, а не find_by_status_and_created_at_lt.
  • selectinload — репозиторий обеспечивает полную загрузку агрегата (child entities). Клиент не знает про lazy loading.

Unit of Work — где живёт транзакция

Repository не должен открывать транзакцию сам. Это функция Unit of Work — по Fowler, «объект, отслеживающий изменения и координирующий запись».

В SQLAlchemy async AsyncSession = Unit of Work.

Плохо:

class PostgresOrderRepository:
    async def save(self, order):
        async with self._session_factory() as session:  
            async with session.begin():                 
                # ... запись

Транзакция инкапсулирована в репозитории. Нельзя объединить с сохранением другого агрегата в одну транзакцию.

Правильно:

class PlaceOrderUseCase:
    def __init__(self, uow: UnitOfWork, repo: OrderRepositoryPort, ...):
        self._uow = uow
        self._repo = repo

    async def execute(self, cmd: PlaceOrderCommand) -> OrderId:
        async with self._uow:                    
            order = Order.place(cmd)
            await self._repo.save(order)
            await self._events.publish(order.pull_events())
            return order.id

Use case открывает транзакцию, репозитории оперируют в её контексте, коммит происходит на выходе из async with. Один сценарий — одна транзакция, всё атомарно.

Outbox Pattern — dual write

Проблема: use case сохраняет Order в БД и публикует OrderPlaced в RabbitMQ. Что если БД коммит прошёл, а RabbitMQ упал? Событие потеряно. Или наоборот — событие ушло, БД откатилась? Consumer получил событие, но данных нет.

Определение Dual write (двойная запись)

Ситуация, когда бизнес-операция должна атомарно изменить состояние в двух разных системах (например, БД и брокер сообщений), но эти системы не участвуют в общей транзакции.

Решение — Outbox. Событие записывается в ту же БД, в отдельную таблицу outbox, в той же транзакции что и агрегат. Отдельный процесс-relay читает outbox и публикует. При падении relay — просто перечитает.

class PlaceOrderUseCase:
    async def execute(self, cmd):
        async with self._uow:
            order = Order.place(cmd)
            await self._orders.save(order)

            # то же соединение, та же транзакция
            for event in order.pull_events():
                await self._outbox.append(event)   

Отдельный worker:

class OutboxRelay:
    async def run(self):
        while True:
            batch = await self._outbox.fetch_unpublished(limit=100)
            for event in batch:
                await self._publisher.publish(event)
                await self._outbox.mark_published(event.id)
            await asyncio.sleep(1)

Ссылки: Kleppmann Designing Data-Intensive Applications, глава 11 (streams); Richardson Microservices Patterns, Chapter 3 (Transactional Outbox); microservices.io/patterns/data/transactional-outbox.

Как не надо

Generic Repository

class Repository(Generic[T]):                     
    async def get(self, id: int) -> T | None: ...
    async def get_all(self) -> list[T]: ...
    async def save(self, entity: T) -> None: ...
    async def delete(self, id: int) -> None: ...

# использование:
order_repo = Repository[Order](session)
user_repo = Repository[User](session)

Проблемы:

  1. Одинаковый интерфейс для разных агрегатов. Order и User имеют разные бизнесовые запросы. Generic Repository стирает эту разницу.
  2. Возвращает всё через get_all. В проде это SELECT * FROM orders без LIMIT. Однажды таблица вырастет — сервер упадёт.
  3. CRUD-именование. get, save, delete — не бизнесовые операции. Order.confirm() не имеет соответствия в Repository.

Khorikov разбирает это подробно: «Generic Repository — антипаттерн, потому что применяет один интерфейс к сущностям, которые ничего общего не имеют».

Repository возвращает Query Object / IQueryable

class OrderRepository:
    def query(self) -> Select[Order]:                     
        return select(OrderRow)

# в use case:
orders = await session.execute(
    order_repo.query()
    .where(OrderRow.status == "pending")
    .order_by(OrderRow.created_at.desc())
    .limit(10)
)

Что здесь плохо:

  • Use case знает про SQLAlchemy. Заменить БД нельзя.
  • Query-логика («pending», order by created_at, limit 10) размазана по use case’ам.
  • Тест use case требует настраивать SQLAlchemy.

Это .NET-специфичная критика (IQueryable), но применима к любому language. Правильно: find_pending_orders_sorted_by_recency(limit=10) — репозиторий отвечает за конкретный запрос.

Repository с бизнес-логикой

class OrderRepository:
    async def confirm_order(self, order_id):          
        order = await self.find_by_id(order_id)
        if order.status != "pending":
            raise ...
        order.status = "confirmed"
        await self.save(order)

confirm_orderбизнес-операция, её место в Order.confirm(). Repository должен только персистить, не решать. Иначе бизнес-правила размажутся между domain и infrastructure.

Repository возвращает DTO

class OrderRepository:
    async def find_by_id(self, id) -> OrderDTO: ...

Repository теперь принял на себя ответственность presentation layer. Как только появится второй потребитель с другим форматом — нужен второй метод, второй DTO. Ответственности сливаются.

Правильно: Repository возвращает Order (Aggregate). Маппинг в DTO — там, где формируется response (query side, controller, handler).

Критика Seemann — Repository антипаттерн?

В посте «IsRepository an Anti-Pattern?» (и в развитии темы) Mark Seemann утверждает: если у вас уже есть ORM, дополнительный Repository — избыточная абстракция.

Аргументы:

  1. DbContext (EF) / AsyncSession (SQLAlchemy) = Repository + Unit of Work. Обе роли уже сыграны ORM.
  2. Абстракция ради абстракции. Оборачивание ORM ещё одним слоем не даёт новой мощности.
  3. Тесты через in-memory ORM возможны. SQLite in-memory для SQLAlchemy покрывает большинство сценариев.

Контраргументы:

  1. Тестируемость. Полностью in-memory Fake Repository не требует SQLite, WAL, миграций. Быстрее.
  2. Явные бизнесовые запросы. find_pending_older_than(dt) — самодокументирующийся код. session.execute(select(...).where(...)) — нет.
  3. Защита от протечки ORM. selectinload, AsyncSession.expunge, Mapped[...] — детали SQLAlchemy. Repository удерживает их за границей.
  4. Возможность сменить хранилище. Redkin случай, но встречается — миграция части сущностей в document-store, кэширование в Redis.

Trade-offs

СитуацияRepository оправданБез Repository
Много бизнесовых запросов, каждый со своим именем
Нужны тесты use case без БД
Работа с одной таблицей, CRUD
Планируется смена хранилища
Read-heavy, много ad-hoc запросов✓ (CQRS read side)
Строгая изоляция domain от ORM

В твоём же коде

Разбор из анонимизированного микросервиса:

class PostgresItemRepository(ItemRepositoryPort):
    def __init__(self, session_factory: async_sessionmaker[AsyncSession]):
        self._session_factory = session_factory

    async def update_normalized_data(
        self, item_id: int, normalized_map: Dict[int, dict]
    ) -> None:
        async with self._session_factory() as session:  
            async with session.begin():                 
                for cid, data in normalized_map.items():
                    stmt = update(ItemAttributes).where(...).values(...)
                    await session.execute(stmt)

Что видим:

Плюс. Port + Adapter разделены. ItemRepositoryPort в application, PostgresItemRepository в infrastructure. Правило зависимостей соблюдено.

Минус 1: транзакция внутри репозитория. session.begin() внутри метода. Нельзя объединить с публикацией события в Outbox — нужна отдельная транзакция или расщепление. Правильно: AsyncSession инжектится (без session_factory), UoW-контроль — на уровне use case.

Минус 2: CRUD-имя метода. update_normalized_data(item_id, map) — технологическое имя. Не говорит, зачем это делается. Бизнесовая формулировка была бы mark_items_as_normalized(item_id, results) или подобная — привязка к сценарию.

Минус 3: dual write. В use case после update_normalized_data вызывается event_publisher.publish_standardized_position(...) — публикация в RabbitMQ. Обе операции не атомарны: падение RabbitMQ между шагами теряет событие. Решение — Outbox: писать событие в ту же БД, в ту же транзакцию, а публиковать отдельным relay.

Полный рефакторинг — на странице Разбор реального проекта, «dual-write-without-outbox».

Дальнейшее чтение

  • Eric Evans. Domain-Driven Design: Tackling Complexity in the Heart of Software. Addison-Wesley, 2003. Chapter 6 — Repository.
  • Martin Fowler. Patterns of Enterprise Application Architecture. Addison-Wesley, 2002. Repository, Unit of Work, Query Object, Data Mapper.
  • Vaughn Vernon. Implementing Domain-Driven Design. Addison-Wesley, 2013. Chapter 12 — Repositories: collection- vs persistence-oriented.
  • Vladimir Khorikov. Unit Testing: Principles, Practices, and Patterns. Manning, 2020. Разбор Repository для тестируемости.
  • Vladimir Khorikov. Repository Pattern Done Right — критика Generic Repository.
  • Mark Seemann. IsRepository an Anti-Pattern? — критика Repository поверх ORM.
  • Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly, 2017. Chapter 11 — про Outbox и надёжную доставку.
  • Chris Richardson. Microservices Patterns. Manning, 2018. Transactional Outbox, Saga.

Проверьте себя

Мини-quiz · закрепить

Проверьте себя

  1. Q1. Что должен возвращать Repository по Evans / Khorikov?

  2. Q2. Что не так с Generic Repository (Repository[T] с get/get_all/save)?

  3. Q3. Где живёт транзакция?

  4. Q4. Что решает Transactional Outbox?

  5. Q5. Аргумент Seemann против Repository