Cross-cutting: logging, tracing, retries
Где живёт логирование. Structured logs. Distributed tracing. Timeout, retry, circuit breaker. Как не загрязнить domain.
TL;DR
- Логирование, метрики, трейсинг — cross-cutting concerns. Не должны загрязнять domain.
- Structured logs (JSON) с context propagation — must для production.
- Distributed tracing (OpenTelemetry) — единственный способ понять микросервисный поток.
- Retry без backoff и jitter создаёт thundering herd. Circuit breaker защищает downstream.
Что такое cross-cutting concerns
Функциональность, которая нужна во многих местах системы, но не является частью бизнес-логики: логирование, метрики, трейсинг, авторизация, ретраи, кеширование.
Проблема: если разбросать их по коду в лоб — каждый метод обрастает logger.info, try/except, metrics.inc. Бизнес-логика теряется среди infrastructure-details.
Правильный подход — изолировать cross-cutting от domain. Разными способами.
Logging
Structured logs
Обычный лог: "Order 42 confirmed at 12:34". Красиво читается человеку, ужасно парсится машиной.
Structured: {"order_id": 42, "action": "confirmed", "at": "12:34"}. Читается инструментами (Loki, ELK, Datadog).
Правило: в проде всегда JSON-логи. structlog или loguru для Python.
import structlog
logger = structlog.get_logger()
logger.info('order_confirmed', order_id=42, customer_id=7, total=5000)
Context propagation
Один запрос проходит через 5 сервисов. Хотим видеть все логи запроса вместе — по request_id.
Ручная передача — плохо: каждый log-call принимает request_id. contextvars — правильно:
from contextvars import ContextVar
request_id_var: ContextVar[str] = ContextVar('request_id')
# в middleware/handler:
request_id_var.set(new_uuid())
# в любом коде — прозрачно:
logger.info('order_confirmed', order_id=42)
# → {"order_id": 42, "request_id": "abc-123", ...}
structlog.contextvars.merge_contextvars — процессор, автоматически подмешивает contextvars во все логи.
Injected logger, не global
from app.logger import logger # global instance
class OrderService:
def confirm(self, order_id):
logger.info(...)
Проблемы:
- Нельзя подменить в тесте.
- Все зависят от глобального модуля.
- Порядок инициализации имеет значение.
class OrderService:
def __init__(self, logger: BoundLogger): # inject
self._logger = logger
Не логировать в domain
class Order:
def confirm(self):
if self.status != 'pending':
logger.warning('cannot confirm', order_id=self.id) # !!
raise DomainError('cannot confirm')
Domain знает про logger. Правило зависимостей нарушено. Тестирование Order требует настроить logger.
Правильно: domain бросает DomainError. Application service (или infrastructure) ловит и логирует. Domain — чист.
Metrics
Prometheus + собственная библиотека. Три типа:
- Counter — только растёт (
orders_placed_total). - Gauge — текущее значение (
queue_depth). - Histogram — распределение (
request_duration_seconds).
Куда ставить:
- HTTP middleware —
request_duration,requests_total. - Use case decorator —
use_case_duration. - Repository decorator —
repository_operations_total,query_duration.
Не в domain. Aggregate не знает про Prometheus.
Distributed Tracing
Отслеживание одного запроса через несколько сервисов. Каждая операция — span, все span одного запроса связаны trace_id.
Один запрос:
[POST /orders — 250ms trace_id=abc]
├── [validate — 5ms]
├── [orders.save — 30ms]
├── [inventory.reserve (RPC) — 100ms trace_id=abc]
│ ├── [inventory.check — 20ms]
│ └── [stock.decrement — 60ms]
├── [outbox.append — 15ms]
└── [response — 5ms]
Стандарт — OpenTelemetry. Auto-instrumentation для FastAPI, SQLAlchemy, httpx, RabbitMQ.
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
async def confirm_order(order_id):
with tracer.start_as_current_span('confirm_order') as span:
span.set_attribute('order.id', order_id)
# ... логика
Backend: Jaeger, Tempo, Datadog.
Правило: без distributed tracing в микросервисах невозможно отладить edge cases. Внедрять сразу.
Timeouts
async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
resp = await client.get(url)
Правило: timeout меньше SLA родителя. Если у нас SLA 1s, вызов вниз — max 500ms. Иначе не успеем ответить + сделать retry.
Retry
Классическая ошибка: retry в цикле сразу.
for attempt in range(10):
try:
return await client.get(url)
except HTTPError:
continue # 10 запросов подряд, никакой паузы
При падении downstream все ретраи прилетают одновременно — thundering herd. Downstream ложится от нагрузки самих retries.
Правильно: exponential backoff + jitter.
import random
import asyncio
async def call_with_retry(url, max_attempts=3):
for attempt in range(max_attempts):
try:
return await client.get(url)
except HTTPError as e:
if attempt == max_attempts - 1:
raise
# exponential + jitter
delay = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(delay)
Библиотеки: stamina, tenacity. Не пишите руками, используйте библиотеку.
Что retry-ить:
- Transient errors: network glitch, 503, 429.
- НЕ retry: 4xx (кроме 429), validation errors, business errors.
Retry idempotent-операций (GET, PUT). Retry POST — только с idempotency key.
Circuit Breaker
Механизм, который «выключает» вызовы к сбойному сервису на время. При росте ошибок автоматически fail-fast, чтобы не забить пул соединений и дать downstream восстановиться.
Состояния:
- Closed — normal, запросы идут.
- Open — сервис признан упавшим, все запросы fail immediately.
- Half-open — пробный запрос через N секунд. Успех → closed, ошибка → open.
Библиотеки: purgatory, pybreaker.
Когда нужен: любой критичный внешний вызов. Особенно микросервисы.
Bulkhead
Изоляция ресурсов: один медленный downstream не должен положить пул соединений всей системы.
Пример: у сервиса 100 HTTP-connections. Одна интеграция с внешним медленным API занимает 90 из них. Другие RPC тоже страдают.
Решение: отдельные пулы для разных downstream. httpx.AsyncClient(limits=...) — по клиенту на downstream.
Как правильно: декораторы для cross-cutting
Наиболее чистый способ — декораторы вокруг use case:
class LoggingUseCase(UseCase):
"""Декоратор: логирует start/end/error use case."""
def __init__(self, inner: UseCase, logger: BoundLogger):
self._inner = inner
self._logger = logger
async def execute(self, cmd):
self._logger.info('use_case_started', use_case=type(self._inner).__name__)
try:
result = await self._inner.execute(cmd)
self._logger.info('use_case_completed')
return result
except DomainError as e:
self._logger.warning('use_case_failed', reason=str(e))
raise
except Exception as e:
self._logger.error('use_case_error', exc_info=True)
raise
class MetricsUseCase(UseCase):
"""Декоратор: метрики duration."""
...
В composition root:
handler = LoggingUseCase(
MetricsUseCase(
TracedUseCase(
RealPlaceOrderHandler(repo, ...)
)
),
logger=...,
)
Каждый концерн — отдельный слой. Domain не знает про них.
Как не надо
1. Логирование внутри domain
Уже разбирали. Domain — чист.
2. except Exception со свалом в info
try:
await complex_operation()
except Exception: # !!
logger.info('something happened') # !!
Exception съест CancelledError (в async), MemoryError, всё. info вместо error теряет проблему. Никакого stack trace.
Правильно: ловить конкретные ожидаемые исключения. Для непредвиденных — logger.exception('...') с полным stack trace, уровень error или critical.
3. Retry без idempotency
for attempt in range(3):
try:
return await payment_api.charge(amount) # !! списание при каждом успехе
except HTTPError:
pass
Если retry прошёл, но ответ потерян — клиент списан дважды.
Правильно: idempotency key. payment_api.charge(amount, idempotency_key=uuid).
4. Логирование PII
logger.info('user_registered', email='john@example.com', password='hunter2')
Персональные данные + пароль в логах. GDPR, безопасность.
Правильно: маскирование чувствительных полей. email='j***@e***.com'. Пароль — никогда.
5. Retry «на всё»
@retry(attempts=5)
async def create_user(cmd): ... # !! бизнес-ошибки тоже попадут в retry
Валидация упала → retry → снова валидация → снова упала. 5 бессмысленных попыток.
Правильно: retry только transient errors. @retry(on=(HTTPError, ConnectionError)).
Trade-offs
| Concern | Где | Как |
|---|---|---|
| Logging | Application + Infrastructure | Structured, contextvars, decorator |
| Metrics | HTTP middleware, use case decorator | Prometheus |
| Tracing | Middleware + auto-instrumentation | OpenTelemetry |
| Retry | Infrastructure adapter | Библиотека (stamina, tenacity), с backoff+jitter |
| Circuit breaker | Adapter к downstream | Библиотека (purgatory) |
| Timeout | Каждый external call | httpx.Timeout, asyncio.timeout |
Ни один concern не должен жить в domain.
В твоём же коде
Анонимизированный микросервис имеет базовую infrastructure для cross-cutting:
Хорошо:
structlogс context propagation — есть.stamina.retryнаCatalogAPIAdapter— правильно.- Логи внутри use case не разбросаны — есть central logging в handler.
Что стоит улучшить:
- Distributed tracing — нет. Добавить OpenTelemetry auto-instrumentation.
- Circuit breaker к marketplace-api. Сейчас retry, но при длительном сбое downstream — thundering herd.
- Timeout меньше SLA родителя. Проверить, что
HTTPX_CLIENT_TIMEOUT=60.0— не больше, чем время обработки одного сообщения RabbitMQ. - Bulkhead между marketplace-api client и другими connections.
Дальнейшее чтение
- Google. Site Reliability Engineering. Бесплатно. Chapter про timeouts, retries, deadlines.
- Google. The SRE Workbook. Практика через реальные примеры.
- OpenTelemetry documentation. Стандарт.
- structlog documentation. Structured logs для Python.
- stamina. Retry для Python сделан правильно.
- Michael T. Nygard. Release It! Design and Deploy Production-Ready Software. Классика про resiliency patterns.
Проверьте себя
Мини-quiz · закрепить
Проверьте себя
Q1. Где НЕ должен жить logger?
Q2. Retry без jitter — чем плох?
Q3. Circuit breaker в состоянии Open делает что?
Q4. Distributed tracing нужен для...
Q5. Правильный подход к cross-cutting concerns: