Modular Monolith Backend Systems
Section 1: Concept Definition & Architectural Paradigms
What Is a Modular Monolith?
A modular monolith is a single deployable unit — one process, one codebase, one container — whose internal structure is divided into explicitly bounded, independently reasoned modules with enforced boundaries between them.
The key distinction from a traditional monolith is not deployment — it's discipline. The code is still deployed as one artifact, but modules are designed to be ignorant of each other's internals. They communicate through contracts (interfaces + DTOs), not by reaching directly into each other's layers, database tables, or service implementations.
Think of it as applying the architectural rigor of microservices inside a single process.
The Three Paradigms Side by Side
Traditional Monolith
├── models.py ← every model, everything mixed
├── views.py ← all routes
├── services.py ← all business logic
└── database.py ← shared session, shared schema
There are no enforced module boundaries. Any file can import anything. User is referenced in Order, Payment, Notification directly. The database is a single schema. Over time, the codebase becomes a ball of mud — not because engineers are careless, but because nothing structurally prevents coupling from accumulating.
Failure mode: You cannot change the User model without auditing the entire codebase. You cannot test Order logic without standing up Auth and Notification. Deployment is safe only if everything works simultaneously.
Microservices
user-service/ ← separate repo, separate DB, separate process
order-service/ ← separate repo, separate DB, separate process
payment-service/ ← separate repo, separate DB, separate process
Each service owns its data completely. Communication happens over the network (REST, gRPC, message queues). Each service deploys independently.
What this actually costs:
- Every cross-service operation is a distributed transaction problem
- You now have network latency, partial failure, eventual consistency, and service discovery to reason about from day one
- Operational overhead: Kubernetes, service mesh, distributed tracing, independent CI pipelines, separate on-call runbooks — all before you've validated that your product idea is correct
- A team of 3 people managing 8 microservices is a support burden, not an architecture win
Failure mode: Teams adopt microservices for organizational reasons (autonomy, team ownership) and pay the full operational cost without the team scale that justifies it.
Modular Monolith
app/
├── modules/
│ ├── auth/ ← bounded module, owns its models, services, DB schema
│ ├── orders/ ← bounded module, communicates via contracts only
│ └── payments/ ← bounded module
├── shared/ ← truly shared utilities only (logging, config, base classes)
└── main.py
One process. Strict internal boundaries. Modules interact through defined interfaces — never by importing each other's models.py or calling private functions.
Comparative Analysis
| Dimension | Traditional Monolith | Modular Monolith | Microservices |
|---|---|---|---|
| Deployment unit | Single | Single | Multiple, independent |
| Codebase structure | Flat / layered globally | Bounded modules per domain | Separate repos per service |
| Inter-component calls | Direct imports, unrestricted | Contracts (interfaces + DTOs) | Network calls (REST/gRPC/events) |
| Database | Single schema, fully shared | Single DB, schema-per-module | Database-per-service |
| Transaction support | Single transaction across all code | Single transaction within a module; controlled cross-module | Distributed transactions (Saga, outbox) |
| Operational complexity | Low | Low | High |
| Local development | Simple | Simple | Complex (Docker Compose, service discovery) |
| Testability | Difficult (tight coupling) | Good (modules are isolated units) | Good per-service, but integration tests are hard |
| Team scaling | Poor — everyone touches everything | Good — modules can be owned by sub-teams | Excellent — full team autonomy, high coordination overhead |
| Deployment speed | Fast | Fast | Potentially fast per service, but coordination cost |
| Failure isolation | None — one bug can crash everything | None at process level, but domain logic is isolated | Strong — service crashes don't directly propagate |
| Observability | Trivial | Trivial | Requires investment (tracing, correlation IDs, centralized logging) |
When Each Architecture Is Appropriate
Traditional monolith: When you are in proof-of-concept stage and speed of iteration matters more than anything else. This is defensible for 0-to-1 product work with a solo developer or very small team. It becomes a liability the moment the codebase survives long enough to have multiple contributors.
Modular monolith: The correct default for most production systems with teams of 2–15 engineers. It gives you the operational simplicity of a monolith with architectural discipline that doesn't punish growth. It's also the correct starting point even if microservices are the long-term target — because a well-bounded modular monolith has seams that map directly to future service extractions.
Microservices: Appropriate when you have genuinely independent scaling requirements (the payment service needs 50x the compute of the notification service), multiple teams who need full deployment autonomy, and the organisational maturity to operate distributed infrastructure. Shopify, Uber, and Netflix adopted microservices at team sizes and traffic levels that made the operational investment rational. Most systems never reach that inflection point.
Why Teams Migrate Between Paradigms
Monolith → Modular monolith: Triggered when a traditional monolith becomes unmaintainable. Engineers can't change one area without breaking another. Onboarding new developers requires understanding the entire codebase. The migration is an internal refactor — no deployment changes required.
Modular monolith → Microservices: Triggered by team scale (multiple teams needing independent deployment), genuinely divergent scaling requirements per module, or technology heterogeneity needs (one module needs a graph database, another needs a time-series store). A well-structured modular monolith makes this migration significantly cheaper — each bounded module already has clean interfaces and a private schema, and the extraction becomes a mechanical process rather than an archaeological dig.
Microservices → Modular monolith (the underreported migration): This happens more than the industry acknowledges. Teams that adopted microservices too early — chasing hype rather than solving real problems — often consolidate back into fewer, larger services or modular monoliths because the operational overhead exceeds their engineering capacity. The complexity of distributed transactions, service discovery, and network failure modes proves to be a net negative when the team is small and the traffic doesn't justify the infrastructure.
The Structural Principle That Separates It from a Traditional Monolith
The defining property of a modular monolith is not file organization — it's enforced ignorance. The orders module must be genuinely incapable (not just discouraged) of reaching into the auth module's internal service implementations or database tables. This enforcement can happen through:
- Python import linting (enforcing what can import what)
- PostgreSQL schema-level RBAC (the orders DB role literally cannot SELECT from the auth schema)
- Architecture tests that fail CI if boundary violations are detected
Without enforcement, you have a labelled monolith, not a modular one. The boundary degrades under deadline pressure unless the architecture makes violation structurally harder than compliance.
Section 2: Core Design Principles (Enforced, Not Theoretical)
2.1 Domain-Driven Design (DDD)
DDD is not a folder structure. It's a discipline for aligning code boundaries with business reality. The core insight is that software complexity is not primarily technical — it's conceptual. The hardest bugs in production systems are not algorithmic; they're ambiguity about what a concept means in different contexts.
Bounded Contexts
A bounded context is a region of the system where a specific model applies, terms have precise meanings, and rules are enforced consistently — and outside that boundary, the same word may mean something entirely different.
The canonical example: User.
In the auth module, User means: credentials, password hash, login history, MFA status.
In the billing module, User means: payment method, subscription tier, invoice address.
In the notifications module, User means: email address, push token, preference flags.
These are not the same entity wearing different hats. They are different models that happen to reference the same human being. If you create one User model that serves all three contexts, you get a god object that no single module owns and every module is afraid to change.
The rule: Each module defines its own representation of external concepts it cares about. The billing module doesn't import auth.models.User. It has billing.models.BillingAccount with a user_id: UUID — a soft reference to something it doesn't own.
This is the foundational constraint from which everything else follows.
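As a minimal sketch of what that looks like in code (the field names and dataclass shape here are illustrative assumptions, not taken from any particular codebase), billing defines its own model and holds only a soft reference:

# modules/billing/models.py — hypothetical sketch of billing's own representation
from dataclasses import dataclass
from uuid import UUID

@dataclass
class BillingAccount:
    id: UUID
    user_id: UUID            # soft reference to a user billing does not own — no import of auth.models
    subscription_tier: str
    invoice_address: str
    payment_method_token: str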
Entity Ownership Across Modules
Every entity in the system has exactly one owning module. That module is responsible for:
- The canonical schema for that entity
- The lifecycle (create, update, delete)
- The authoritative source of truth
All other modules hold references, not copies of the model. They may maintain a read model — a local, denormalized projection of data they need — but they do not reach into the owning module's tables.
auth module → owns: users, sessions, mfa_tokens
billing module → owns: billing_accounts, subscriptions, invoices
orders module → owns: orders, order_items, fulfillment_status
inventory module → owns: products, stock_levels, reservations
The orders module needs to know a product's name and price at the time of order. It does not query inventory.products directly. It either:
- Calls the inventory module's public contract at order creation time and stores the snapshot
- Maintains a local read model (orders.product_snapshots) populated by events the inventory module emits
Both are valid. Direct cross-module table queries are not.
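A sketch of the second option — the event type ProductUpdatedEvent and the repository method upsert_product_snapshot are illustrative names, not part of the codebase — shows how the orders module keeps its read model current without ever touching inventory's tables:

# modules/orders/event_handlers.py — hypothetical sketch of the read-model option
async def handle_product_updated(event: ProductUpdatedEvent, repo: OrderRepository) -> None:
    # Upsert into the snapshot table this module owns; inventory.products is never queried directly
    await repo.upsert_product_snapshot(
        product_id=event.product_id,
        name=event.name,
        price=event.price,
    )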
Consequences of Shared Entities
When a single entity is shared across modules without a clear owner, you accumulate these failure modes:
Schema coupling: A migration in the auth module that renames a column breaks billing, orders, and notifications simultaneously. You cannot deploy independently. You cannot reason locally.
Transaction entanglement: A billing operation now needs to lock a row that the auth module's login flow also needs. You have created a deadlock surface across unrelated domains.
Test pollution: You cannot write an isolated unit test for billing logic without constructing a full User object with auth-specific fields that billing has no business knowing about.
Semantic drift: user.status means email_verified in auth context and subscription_active in billing context. Both teams add fields to the same column over time. Neither team can safely remove anything.
The shared entity problem is the most common source of hidden coupling in systems that look modular but aren't. The symptom is that a "small" change to a central model requires touching 12 files across 6 modules.
2.2 Separation of Concerns
There are two axes of separation in a modular monolith. Both are required.
Vertical Separation — Modules
This is the domain axis. Business capability is divided into modules:
modules/
├── auth/
├── billing/
├── orders/
└── inventory/
Each module is a vertical slice of the system. It contains everything needed to serve its domain: routes, business logic, data access, schemas, migrations. Nothing leaks out except through its public contract.
Horizontal Separation — Layers Inside a Module
Within each module, responsibilities are separated into horizontal layers:
modules/orders/
├── router.py ← HTTP: request parsing, response formatting, no business logic
├── service.py ← Business logic: rules, orchestration, decisions
├── repository.py ← Data access: all SQL lives here, nothing else
├── schemas.py ← Pydantic: request/response contracts, NOT ORM models
├── models.py ← DB models: SQLAlchemy or raw table definitions
└── events.py ← Domain events this module emits
The discipline here is strict unidirectional flow:
router → service → repository → database
What this means in practice:
- The router never writes SQL. It doesn't know what a database is.
- The service never constructs HTTP responses. It returns domain objects or raises domain exceptions.
- The repository never applies business rules. It executes queries and returns data.
- The router never calls another module's repository directly.
Why this matters: When the router layer only handles HTTP concerns, you can swap your transport layer (HTTP → gRPC → CLI) without touching business logic. When the repository layer only handles data access, you can swap PostgreSQL for a different store without rewriting services. Each layer has exactly one reason to change.
What breaks if you ignore it: You get services that construct JSONResponse objects, routers that contain if user.subscription_tier == 'pro': checks, and repositories that send emails. These are not hypothetical — they are the exact failure modes that appear under deadline pressure when there are no enforced boundaries.
2.3 Module Contracts — The Critical Section
A contract is the only thing one module is allowed to know about another. It defines what a module exposes to the outside world without revealing how it works internally.
A contract consists of two components:
- An interface — what operations the module supports
- DTOs (Data Transfer Objects) — what data crosses the boundary, expressed as Pydantic models
Why Interfaces, Not Concrete Services
If billing imports auth.services.AuthService directly:
# billing/service.py — WRONG
from app.modules.auth.services import AuthService
class BillingService:
def __init__(self):
self.auth = AuthService() # direct dependency
You have now hard-coupled billing to auth's implementation. You cannot test billing without instantiating auth. You cannot replace the auth implementation. You cannot swap a real auth service for a mock in tests. The auth module's internal refactors now break billing.
The correct pattern uses an interface:
# modules/auth/contracts.py — the PUBLIC surface of the auth module
from abc import ABC, abstractmethod
from uuid import UUID
from pydantic import BaseModel
# DTO — what crosses the boundary
class UserIdentity(BaseModel):
user_id: UUID
email: str
is_active: bool
# Interface — what operations are available
class IUserService(ABC):
@abstractmethod
async def get_user_identity(self, user_id: UUID) -> UserIdentity | None:
...
@abstractmethod
async def assert_user_exists(self, user_id: UUID) -> None:
"""Raises UserNotFoundError if the user does not exist."""
...
# modules/auth/services.py — the PRIVATE implementation
from uuid import UUID
from .contracts import IUserService, UserIdentity
class UserService(IUserService):
def __init__(self, repository: IUserRepository):
self._repo = repository
async def get_user_identity(self, user_id: UUID) -> UserIdentity | None:
user = await self._repo.find_by_id(user_id)
if not user:
return None
return UserIdentity(
user_id=user.id,
email=user.email,
is_active=user.is_active
)
# modules/billing/service.py — depends on the INTERFACE, not the implementation
from uuid import UUID
from app.modules.auth.contracts import IUserService
class BillingService:
def __init__(self, user_service: IUserService, repository: IBillingRepository):
self._users = user_service
self._repo = repository
async def create_billing_account(self, user_id: UUID) -> BillingAccount:
await self._users.assert_user_exists(user_id) # uses the contract
# ... billing logic
Now billing is completely ignorant of how auth works internally. In tests, you inject a mock that implements IUserService. In production, you inject the real UserService. The contract is the stable surface; the implementation can change freely.
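A minimal sketch of that test-side fake (the file location and the UserNotFoundError constructor signature are assumptions; the interface and DTO come from the contract above):

# tests/billing/test_billing_service.py — hypothetical fake of the auth contract for isolated billing tests
from uuid import UUID
from app.modules.auth import IUserService, UserIdentity, UserNotFoundError

class FakeUserService(IUserService):
    def __init__(self, known_user_ids: set[UUID]):
        self._known = known_user_ids

    async def get_user_identity(self, user_id: UUID) -> UserIdentity | None:
        if user_id in self._known:
            return UserIdentity(user_id=user_id, email="test@example.com", is_active=True)
        return None

    async def assert_user_exists(self, user_id: UUID) -> None:
        if user_id not in self._known:
            raise UserNotFoundError(user_id)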
Exposing Contracts via __init__.py
The __init__.py of each module defines its public API. Everything not exported here is private:
# modules/auth/__init__.py — the module's public surface
from .contracts import IUserService, UserIdentity
from .exceptions import UserNotFoundError, AuthenticationError
from .router import router # FastAPI router, mounted by main.py
# NOT exported:
# - UserService (implementation detail)
# - UserRepository (implementation detail)
# - User ORM model (internal to this module)
# - Any internal utility functions
__all__ = [
"IUserService",
"UserIdentity",
"UserNotFoundError",
"AuthenticationError",
"router",
]
This is not convention — it's enforcement. If billing tries to do:
from app.modules.auth.services import UserService # private
An import linter (import-linter or a custom pylint rule) will fail CI. The __init__.py is the gate. If it's not in __all__, it doesn't exist as far as other modules are concerned.
DTOs at the Boundary
The UserIdentity DTO in the example above is critical. Notice what it contains:
class UserIdentity(BaseModel):
user_id: UUID
email: str
is_active: bool
It does not contain: password_hash, mfa_secret, last_login_ip, failed_login_count. Those are auth module internals. The DTO exposes only what other modules legitimately need.
This is the anti-corruption layer at the data level. Even if auth's internal User ORM model has 30 fields, no other module ever sees more than what the contract DTO exposes. If auth adds internal fields, no other module is affected. If auth renames internal fields, the DTO is the adapter — it translates internally before crossing the boundary.
2.4 Dependency Direction
The Rule
Dependencies flow inward. Outer layers depend on inner layers. Inner layers never depend on outer layers.
Infrastructure (DB, HTTP, external APIs)
↓
Adapters (repositories, controllers, event handlers)
↓
Application (services, use cases)
↓
Domain (entities, value objects, interfaces)
The domain layer — your core business rules — depends on nothing external. It doesn't know what FastAPI is. It doesn't know what PostgreSQL is. It doesn't import sqlalchemy. It defines interfaces that the outer layers implement.
This is dependency inversion: high-level policy does not depend on low-level detail. Low-level detail depends on high-level policy. The IUserRepository interface is defined in the domain layer. The PostgresUserRepository that actually executes SQL is in the infrastructure layer. The domain depends on the interface. The infrastructure implements it.
In Python, this looks like:
# domain layer — no external imports beyond the standard library
from abc import ABC, abstractmethod
from uuid import UUID
class IUserRepository(ABC):
@abstractmethod
async def find_by_id(self, user_id: UUID) -> User | None: ...
@abstractmethod
async def save(self, user: User) -> None: ...
# infrastructure layer — implements the interface
import asyncpg
class PostgresUserRepository(IUserRepository):
def __init__(self, connection: asyncpg.Connection):
self._conn = connection
async def find_by_id(self, user_id: UUID) -> User | None:
row = await self._conn.fetchrow(
"SELECT id, email, password_hash, is_active FROM auth.users WHERE id = $1",
user_id
)
if not row:
return None
return User(id=row["id"], email=row["email"], ...)
The service receives IUserRepository — it never knows whether it's talking to Postgres, Redis, or an in-memory dict used in tests.
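For fast unit tests, that can be as small as an in-memory fake of the interface defined above (a sketch — the exact file and the User entity's fields are whatever the domain layer declares):

# tests — hypothetical in-memory implementation of IUserRepository
from uuid import UUID

class InMemoryUserRepository(IUserRepository):
    def __init__(self) -> None:
        self._users: dict[UUID, User] = {}

    async def find_by_id(self, user_id: UUID) -> User | None:
        return self._users.get(user_id)

    async def save(self, user: User) -> None:
        self._users[user.id] = user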
Avoiding Circular Dependencies
Circular dependencies are the architectural equivalent of a deadlock. They manifest when module A imports from module B, and module B imports from module A — directly or transitively.
# orders/service.py
from app.modules.inventory.contracts import IInventoryService # orders → inventory
# inventory/service.py
from app.modules.orders.contracts import IOrderService # inventory → orders — CIRCULAR
This fails at import time in Python. But more subtly, it reveals a domain modeling problem: if orders and inventory depend on each other, their boundaries are wrong. One of them is doing the other's job.
The resolution strategies:
Strategy 1 — Extract a shared concept. If both modules need ProductSnapshot, move that concept to a shared/ package that neither module owns but both can import from. Shared must contain only pure data structures and utilities — never business logic.
Strategy 2 — Invert the dependency with an event. Instead of inventory calling orders, orders emits an event (OrderPlaced). inventory subscribes to it. Neither module knows the other exists.
Strategy 3 — Merge the modules. If two modules are so tightly coupled that they keep creating circular dependencies, they may be one domain that was incorrectly split. Merge them and redraw the boundary.
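Strategy 2 relies on an in-process event bus. One possible minimal shape for it — a sketch, not the project's actual core/events implementation — makes the inversion concrete: orders publishes, inventory's handler runs, and neither module imports the other.

# core/events.py — hypothetical minimal in-process event bus
from collections import defaultdict
from typing import Awaitable, Callable

class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[..., Awaitable[None]]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[..., Awaitable[None]]) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: object) -> None:
        # OrderPlaced is published by orders; inventory's subscribed handler runs here
        for handler in self._handlers[type(event)]:
            await handler(event)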
Why Discipline Alone Fails
Every principle in this section requires structural enforcement — not team agreement, not code review culture, not documentation.
The reason is simple: under deadline pressure, in the middle of an incident, at 2am before a launch — engineers take the path of least resistance. If importing auth.models.User directly into billing saves 20 minutes right now, it will happen. Repeatedly. Over time, those shortcuts are the accumulated technical debt that makes a codebase unmaintainable.
Enforcement mechanisms that actually work:
- import-linter configured in CI — defines allowed import chains, fails the pipeline on violations
- PostgreSQL schema-level RBAC — the billing database role literally cannot SELECT from the auth schema, regardless of what the application code tries to do
- Architecture tests with pytest — tests that assert billing.services does not import anything from auth.services
A well-designed architecture makes violations harder than compliance. The developer who tries to reach into another module's internals should hit friction before they merge, not during a post-mortem six months later.
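A sketch of the third mechanism — the module paths, forbidden-import list, and test layout are assumptions, but the approach (walking each file's imports and failing on boundary crossings) works with plain pytest and the standard library:

# tests/architecture/test_module_boundaries.py — hypothetical boundary test
import ast
from pathlib import Path

FORBIDDEN = {
    "app/modules/billing": ("app.modules.auth.services", "app.modules.auth.models"),
}

def iter_imports(path: Path):
    tree = ast.parse(path.read_text())
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom) and node.module:
            yield node.module
        elif isinstance(node, ast.Import):
            yield from (alias.name for alias in node.names)

def test_billing_does_not_import_auth_internals():
    for module_dir, banned_prefixes in FORBIDDEN.items():
        for py_file in Path(module_dir).rglob("*.py"):
            for imported in iter_imports(py_file):
                for banned in banned_prefixes:
                    assert not imported.startswith(banned), (
                        f"{py_file} imports {imported}, which crosses the auth boundary"
                    )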
Section 3: Module Structuring in FastAPI
The Three Approaches Compared
Before defining the ideal structure, you need to understand what you're choosing between and why the alternatives fail at scale.
Approach 1 — Router-Based (Flat)
app/
├── main.py
├── database.py
├── models.py ← all ORM models, every domain
├── schemas.py ← all Pydantic models, every domain
└── routers/
├── auth.py
├── orders.py
└── billing.py
This is what most FastAPI tutorials produce. It looks organized until it isn't.
What breaks:
models.py becomes a 600-line file where User, Order, Invoice, Product, and Notification all coexist. A change to any model requires understanding the entire file. schemas.py follows the same trajectory. The "separation" is by file type, not by domain — which is the wrong axis entirely.
You end up with horizontal layers that span the entire system instead of vertical boundaries per domain. routers/auth.py imports from models.py which is shared with routers/orders.py. The coupling is total. You just can't see it because it's hidden behind filenames.
Appropriate for: Prototypes. Single-developer projects under ~5 endpoints. Nothing that will be maintained by more than one person for more than a month.
Approach 2 — Service Layer (Horizontal)
app/
├── main.py
├── models/
│ ├── user.py
│ ├── order.py
│ └── invoice.py
├── services/
│ ├── auth_service.py
│ ├── order_service.py
│ └── billing_service.py
├── repositories/
│ ├── user_repository.py
│ └── order_repository.py
└── routers/
├── auth.py
└── orders.py
This introduces the service layer correctly and separates concerns horizontally. It is a genuine improvement over the flat approach.
What breaks:
The structure organizes by layer, not by domain. To understand the full order flow, you must navigate routers/orders.py → services/order_service.py → repositories/order_repository.py → models/order.py. These files live in four separate directories. A new engineer reading the codebase cannot read one folder and understand one domain — they must hold the entire cross-directory structure in their head simultaneously.
More critically: there is no structural barrier preventing order_service.py from importing user_repository.py directly, bypassing auth's service layer entirely. The coupling is still possible; it just looks more organized while it accumulates.
Appropriate for: Small teams who want some structure but aren't yet building for multiple domain owners. Works acceptably up to ~8 endpoints per domain before cognitive overhead degrades.
Approach 3 — Package-Per-Domain (Modular Standard)
app/
├── main.py
├── core/ ← system-wide infrastructure
│ ├── config.py
│ ├── database.py
│ ├── exceptions.py
│ └── logging.py
├── shared/ ← truly shared data structures only
│ ├── base_repository.py
│ ├── base_schema.py
│ └── types.py
└── modules/
├── auth/
├── orders/
├── billing/
└── inventory/
Each module is a self-contained package. To understand the auth domain, you read the auth/ folder. Everything that domain owns is there. Nothing it doesn't own is there.
Why this works at scale:
A new engineer assigned to fix a billing bug opens modules/billing/. They don't need to understand auth, orders, or inventory. The module is a unit of reasoning, a unit of testing, a unit of potential future extraction.
Scalability limits: The modular structure remains valid until individual modules become large enough to warrant sub-modules. An orders module that handles ordering, fulfillment, returns, and disputes may eventually need modules/orders/fulfillment/, modules/orders/returns/ as nested sub-domains. This is the correct evolution — not a sign the approach is failing.
Production Folder Structure
This is the full structure you build to. Every decision is justified below.
app/
├── main.py ← app factory, router registration, lifespan
├── core/
│ ├── config.py ← pydantic-settings, env vars, never hardcoded values
│ ├── database.py ← asyncpg pool, connection lifecycle
│ ├── exceptions.py ← base exception classes for the entire system
│ ├── logging.py ← structured JSON logging config
│ └── middleware.py ← correlation ID injection, request timing
├── shared/
│ ├── base_repository.py ← abstract base with common query helpers
│ ├── base_schema.py ← common Pydantic config (camelCase, etc.)
│ └── types.py ← UUID aliases, KES money type, etc.
└── modules/
└── orders/ ← one module, fully self-contained
├── __init__.py ← public contract surface (exports only)
├── contracts.py ← IOrderService interface + DTOs for other modules
├── exceptions.py ← OrderNotFoundError, InvalidOrderStateError, etc.
├── router.py ← FastAPI router, HTTP only
├── service.py ← business logic, orchestration
├── repository.py ← all SQL for this module
├── schemas.py ← Pydantic request/response models
├── models.py ← DB table definitions (SQLAlchemy or raw)
├── events.py ← domain events this module emits
├── dependencies.py ← FastAPI Depends() factories for this module
└── migrations/ ← Alembic versions scoped to this module
└── versions/
The Request Lifecycle — Traced in Full
HTTP Request
↓
Router ← parse, validate input shape, extract path/query params
↓
Service ← business rules, decisions, orchestration
↓
Unit of Work ← transaction boundary opens here
↓
Repository ← executes SQL, returns domain objects
↓
PostgreSQL ← query execution
↓
Repository ← maps rows to domain objects
↓
Unit of Work ← commit or rollback
↓
Service ← maps domain result to response DTO
↓
Router ← returns HTTP response
Each layer has one job. Let's trace a real request through the entire stack.
The Router — HTTP Boundary Only
# modules/orders/router.py
from uuid import UUID
from fastapi import APIRouter, Depends, status
from .schemas import CreateOrderRequest, OrderResponse
from .service import OrderService
from .dependencies import get_order_service
from .exceptions import OrderNotFoundError
router = APIRouter(prefix="/orders", tags=["orders"])
@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(
payload: CreateOrderRequest, # FastAPI validates shape here
service: OrderService = Depends(get_order_service),
):
order = await service.create_order(
user_id=payload.user_id,
items=payload.items,
)
return OrderResponse.model_validate(order)
@router.get("/{order_id}", response_model=OrderResponse)
async def get_order(
order_id: UUID,
service: OrderService = Depends(get_order_service),
):
order = await service.get_order(order_id)
if order is None:
raise OrderNotFoundError(order_id)
return OrderResponse.model_validate(order)
Notice what the router does not do:
- No SQL
- No business logic (if order.status == "pending" and user.tier == "pro")
- No calls to other module repositories
- No exception catching beyond what produces a specific HTTP response
The router knows about HTTP. It knows about Pydantic schemas. That is all.
Schemas — Request/Response Contracts
# modules/orders/schemas.py
from uuid import UUID
from datetime import datetime
from decimal import Decimal
from pydantic import field_validator
from app.shared.base_schema import AppBaseModel
class OrderItemRequest(AppBaseModel):
product_id: UUID
quantity: int
@field_validator("quantity")
@classmethod
def quantity_must_be_positive(cls, v: int) -> int:
if v <= 0:
raise ValueError("Quantity must be at least 1")
return v
class CreateOrderRequest(AppBaseModel):
user_id: UUID
items: list[OrderItemRequest]
@field_validator("items")
@classmethod
def must_have_items(cls, v: list) -> list:
if not v:
raise ValueError("Order must contain at least one item")
return v
class OrderItemResponse(AppBaseModel):
product_id: UUID
product_name: str
quantity: int
unit_price: Decimal
line_total: Decimal
class OrderResponse(AppBaseModel):
order_id: UUID
user_id: UUID
status: str
items: list[OrderItemResponse]
total_amount: Decimal
created_at: datetime
Critical distinction: schemas.py contains Pydantic models for HTTP input/output. models.py contains database table definitions. These are never the same class. Conflating them couples your API contract to your database schema — a change to your table structure breaks your API response, and vice versa.
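For contrast, here is a sketch of what models.py might hold. The use of dataclasses (rather than SQLAlchemy) and the from_row mapping are assumptions for illustration; the point is that Order is a separate class from OrderResponse, and the repository shown below builds it from rows:

# modules/orders/models.py — hypothetical DB-side models, deliberately separate from schemas.py
import json
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from uuid import UUID
import asyncpg

@dataclass
class OrderItem:
    product_id: UUID
    product_name: str
    quantity: int
    unit_price: Decimal

@dataclass
class Order:
    id: UUID
    user_id: UUID
    status: str
    total_amount: Decimal
    created_at: datetime
    items: list[OrderItem]

    @classmethod
    def from_row(cls, row: asyncpg.Record) -> "Order":
        # Rebuilds the aggregate from the joined row produced by repository.find_by_id
        raw_items = row["items"] if not isinstance(row["items"], str) else json.loads(row["items"])
        items = [
            OrderItem(
                product_id=UUID(item["product_id"]),
                product_name=item["product_name"],
                quantity=item["quantity"],
                unit_price=Decimal(str(item["unit_price"])),
            )
            for item in raw_items
        ]
        return cls(
            id=row["id"],
            user_id=row["user_id"],
            status=row["status"],
            total_amount=row["total_amount"],
            created_at=row["created_at"],
            items=items,
        )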
The Service — Business Logic Only
# modules/orders/service.py
from uuid import UUID
from .repository import OrderRepository
from .schemas import OrderItemRequest
from .models import Order, OrderItem
from .events import OrderPlacedEvent
from .exceptions import ProductNotFoundError, InsufficientStockError
from app.modules.inventory.contracts import IInventoryService
from app.core.events import EventBus
class OrderService:
def __init__(
self,
repository: OrderRepository,
inventory: IInventoryService, # interface, not concrete class
event_bus: EventBus,
):
self._repo = repository
self._inventory = inventory
self._events = event_bus
async def create_order(
self,
user_id: UUID,
items: list[OrderItemRequest],
) -> Order:
# 1. Validate stock availability (calls inventory contract)
product_ids = [item.product_id for item in items]
snapshots = await self._inventory.get_product_snapshots(product_ids)
# 2. Apply business rules
order_items = []
for item in items:
snapshot = snapshots.get(item.product_id)
if snapshot is None:
raise ProductNotFoundError(item.product_id)
if snapshot.stock_quantity < item.quantity:
raise InsufficientStockError(item.product_id, item.quantity)
order_items.append(OrderItem(
product_id=item.product_id,
product_name=snapshot.name, # snapshot stored here
quantity=item.quantity,
unit_price=snapshot.price,
))
# 3. Persist — repository handles the transaction
order = await self._repo.create_order(user_id=user_id, items=order_items)
# 4. Emit domain event
await self._events.publish(OrderPlacedEvent(
order_id=order.id,
user_id=user_id,
total_amount=order.total_amount,
))
return order
async def get_order(self, order_id: UUID) -> Order | None:
return await self._repo.find_by_id(order_id)
The service makes decisions. It enforces business rules. It orchestrates calls across repositories and external contracts. It does not know what SQL looks like. It does not know what an HTTP status code is.
The Repository — Data Access Only
# modules/orders/repository.py
import asyncpg
from uuid import UUID, uuid4
from decimal import Decimal
from .models import Order, OrderItem
class OrderRepository:
def __init__(self, connection: asyncpg.Connection):
self._conn = connection
async def create_order(
self,
user_id: UUID,
items: list[OrderItem],
) -> Order:
order_id = uuid4()
total = sum(item.unit_price * item.quantity for item in items)
# Transaction managed by Unit of Work — connection is already in transaction
await self._conn.execute(
"""
INSERT INTO orders.orders (id, user_id, status, total_amount, created_at)
VALUES ($1, $2, 'pending', $3, NOW())
""",
order_id, user_id, total
)
for item in items:
await self._conn.execute(
"""
INSERT INTO orders.order_items
(id, order_id, product_id, product_name, quantity, unit_price)
VALUES ($1, $2, $3, $4, $5, $6)
""",
uuid4(), order_id, item.product_id,
item.product_name, item.quantity, item.unit_price
)
return await self.find_by_id(order_id)
async def find_by_id(self, order_id: UUID) -> Order | None:
row = await self._conn.fetchrow(
"""
SELECT
o.id, o.user_id, o.status, o.total_amount, o.created_at,
json_agg(json_build_object(
'product_id', oi.product_id,
'product_name', oi.product_name,
'quantity', oi.quantity,
'unit_price', oi.unit_price
)) AS items
FROM orders.orders o
JOIN orders.order_items oi ON oi.order_id = o.id
WHERE o.id = $1
GROUP BY o.id
""",
order_id
)
if not row:
return None
return Order.from_row(row)
All SQL lives here. The service never sees a raw query. The router never sees a database row. If you need to optimize a query, you open repository.py. The change is contained.
The Unit of Work — Transaction Boundary
# core/database.py
import asyncpg
from contextlib import asynccontextmanager
class UnitOfWork:
def __init__(self, pool: asyncpg.Pool):
self._pool = pool
self.connection: asyncpg.Connection | None = None
async def __aenter__(self) -> "UnitOfWork":
self.connection = await self._pool.acquire()
self._transaction = self.connection.transaction()
await self._transaction.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type:
await self._transaction.rollback()
else:
await self._transaction.commit()
await self._pool.release(self.connection)
The UnitOfWork acquires a connection from the pool, opens a transaction, and hands the connection to any repository that needs it. On success, it commits. On any exception, it rolls back. The service never calls .commit() or .rollback() directly — those are infrastructure concerns.
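A usage sketch (in the application this wiring happens in the dependency layer shown next, not in handlers; the helper function here exists only to show the transaction scope):

# Hypothetical usage — both INSERTs in create_order commit or roll back together
import asyncpg
from uuid import UUID
from app.core.database import UnitOfWork
from app.modules.orders.repository import OrderRepository
from app.modules.orders.models import Order, OrderItem

async def place_order(pool: asyncpg.Pool, user_id: UUID, items: list[OrderItem]) -> Order:
    async with UnitOfWork(pool) as uow:
        repo = OrderRepository(uow.connection)
        # Leaving the block cleanly commits; any exception raised inside rolls back
        return await repo.create_order(user_id=user_id, items=items)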
Dependency Injection — Wiring It Together
# modules/orders/dependencies.py
from fastapi import Depends
import asyncpg
from app.core.database import get_connection
from app.modules.inventory.dependencies import get_inventory_service
from app.modules.inventory.contracts import IInventoryService
from app.core.events import EventBus, get_event_bus
from .repository import OrderRepository
from .service import OrderService
async def get_order_service(
conn: asyncpg.Connection = Depends(get_connection),
inventory: IInventoryService = Depends(get_inventory_service),
event_bus: EventBus = Depends(get_event_bus),
) -> OrderService:
repo = OrderRepository(conn)
return OrderService(
repository=repo,
inventory=inventory,
event_bus=event_bus,
)
# core/database.py
from collections.abc import AsyncIterator
from fastapi import Request

async def get_connection(request: Request) -> AsyncIterator[asyncpg.Connection]:
    async with request.app.state.pool.acquire() as conn:
        async with conn.transaction():  # Unit of Work boundary: commit on success, rollback on exception
            yield conn
FastAPI's Depends() system is your dependency injection container. Each request gets its own connection, its own repository instances, its own service instance. There is no shared mutable state between requests. This is what makes horizontal scaling safe — each request is completely self-contained.
Application Assembly — main.py
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg
from app.core.config import settings
from app.core.middleware import CorrelationIDMiddleware
from app.modules.auth import router as auth_router
from app.modules.orders import router as orders_router
from app.modules.billing import router as billing_router
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup
app.state.pool = await asyncpg.create_pool(
dsn=settings.database_url,
min_size=5,
max_size=20,
)
yield
# shutdown
await app.state.pool.close()
app = FastAPI(
title="KukuFiti API",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(CorrelationIDMiddleware)
app.include_router(auth_router, prefix="/api/v1")
app.include_router(orders_router, prefix="/api/v1")
app.include_router(billing_router, prefix="/api/v1")
main.py is the composition root. It knows about all modules (because it mounts their routers), but modules do not know about main.py. The connection pool lives on app.state — created once on startup, shared across all requests via the pool's internal connection management, destroyed on shutdown.
Where Validation Happens
| Concern | Layer | Mechanism |
|---|---|---|
| Request shape (field types, required fields) | Router/Pydantic | Automatic — FastAPI raises 422 before handler runs |
| Field-level constraints (positive integer, valid enum) | Schema validators | @field_validator in Pydantic model |
| Business rule validation (stock available, user eligible) | Service | Domain exceptions raised by service, caught by exception handler |
| Database constraints (unique, foreign key) | Repository/DB | asyncpg.UniqueViolationError caught in repository, re-raised as domain exception |
Pydantic handles structure. The service handles semantics. The database handles integrity. Each layer catches what it understands and translates it upward.
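A sketch of the last row in the table — the billing table, repository file, and the BillingAccountAlreadyExistsError domain exception are hypothetical names used for illustration:

# modules/billing/repository.py — hypothetical translation of a DB constraint into a domain error
import asyncpg
from uuid import UUID, uuid4
from .exceptions import BillingAccountAlreadyExistsError  # hypothetical domain exception

class BillingRepository:
    def __init__(self, connection: asyncpg.Connection):
        self._conn = connection

    async def create_billing_account(self, user_id: UUID) -> UUID:
        account_id = uuid4()
        try:
            await self._conn.execute(
                "INSERT INTO billing.billing_accounts (id, user_id) VALUES ($1, $2)",
                account_id, user_id,
            )
        except asyncpg.UniqueViolationError as exc:
            # The database enforces uniqueness; the repository re-raises it in domain terms
            raise BillingAccountAlreadyExistsError(user_id) from exc
        return account_id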
Where Transactions Start and End
Transactions start and end at the Unit of Work boundary, which is opened by the dependency injection layer — not by the service, not by the repository.
Depends(get_connection) ← connection acquired, transaction starts
↓
service.create_order() ← business logic runs inside the transaction
↓
repo.create_order() ← SQL executes inside the transaction
repo.find_by_id() ← SQL executes inside the same transaction
↓
return from handler ← transaction commits, connection released
exception raised ← transaction rolls back, connection released
This guarantees that if create_order inserts into orders.orders and then fails inserting into orders.order_items, both inserts are rolled back atomically. You never get a partial order in the database.
Section 4: Database Design (PostgreSQL — Primary Focus)
4.1 Architectural Options
Three strategies exist for mapping modules to database structure. The choice is irreversible without significant migration effort — get it right at the start.
Option A — Database-Per-Module
Each module gets its own PostgreSQL database instance.
auth_db ← separate Postgres instance
orders_db ← separate Postgres instance
billing_db ← separate Postgres instance
What this buys you: Maximum isolation. The auth database going down cannot affect the orders database at the connection level. Each database can be tuned independently.
What this costs you:
Cross-module joins are impossible. Any query that needs data from two domains requires an application-level join — fetch from one database, fetch from another, merge in Python. This is slower, more complex, and inconsistent under concurrent writes.
Transactions do not span database boundaries without a distributed transaction coordinator (2PC), which introduces a whole class of failure modes that are genuinely difficult to handle correctly.
Connection pool overhead multiplies by the number of databases.
Verdict: This is the microservices database pattern. It does not belong in a modular monolith. You pay the full operational cost of microservices data isolation without being in microservices.
Option B — Shared Database, Single Schema
All modules share one schema. Tables are distinguished by naming convention.
CREATE TABLE users (...); -- auth
CREATE TABLE auth_sessions (...); -- auth
CREATE TABLE orders (...); -- orders
CREATE TABLE order_items (...); -- orders
CREATE TABLE billing_accounts (...); -- billing
What this buys you: Simplicity. Cross-module joins work. Transactions span all tables. One connection pool.
What this costs you:
There is no database-level enforcement of module boundaries. The orders application role can query users directly. There is nothing stopping it. Your architecture exists only in documentation and team discipline — and Section 1 already established why discipline alone fails.
All modules share the same migration history. A schema change in billing blocks a deploy for orders. You cannot reason about the database per domain.
Table name collisions become a real problem at scale. status, created_at, metadata — these column names appear in every domain. When everything is in one schema, naming becomes increasingly tortured to avoid ambiguity.
Verdict: Appropriate for small prototypes. Becomes a liability once you have more than one developer touching the database regularly.
Option C — Schema-Per-Module (Recommended)
One PostgreSQL database. Multiple schemas — one per module. Each module's tables live exclusively in its schema. Database roles enforce that modules cannot cross schema boundaries.
CREATE SCHEMA auth;
CREATE SCHEMA orders;
CREATE SCHEMA billing;
CREATE SCHEMA inventory;
auth.users
auth.sessions
auth.mfa_tokens
orders.orders
orders.order_items
orders.product_snapshots
billing.billing_accounts
billing.subscriptions
billing.invoices
inventory.products
inventory.stock_levels
What this buys you:
- Database-level enforcement of module boundaries via RBAC (covered in 4.2)
- Cross-module joins remain possible when legitimately needed — but controlled
- Single connection pool, single transaction scope
- Migrations can be scoped per schema
- Table names are unambiguous — orders.status vs billing.status are different columns with no naming collision
What this costs you:
Slightly more setup. Schema-aware Alembic configuration (covered in 4.7). That is the full cost.
Verdict: This is the correct pattern for a production modular monolith. Everything that follows assumes schema-per-module.
4.2 Schema Isolation & RBAC
PostgreSQL's role system is your architectural enforcement mechanism at the database level. The application code respects module boundaries because it's structurally unable to violate them — not because of team discipline.
Schema and Role Setup
-- Create schemas
CREATE SCHEMA auth;
CREATE SCHEMA orders;
CREATE SCHEMA billing;
CREATE SCHEMA inventory;
-- Create application roles — one per module
CREATE ROLE auth_app WITH LOGIN PASSWORD 'auth_secret';
CREATE ROLE orders_app WITH LOGIN PASSWORD 'orders_secret';
CREATE ROLE billing_app WITH LOGIN PASSWORD 'billing_secret';
CREATE ROLE inventory_app WITH LOGIN PASSWORD 'inventory_secret';
-- Create a read-only reporting role (cross-schema, SELECT only)
CREATE ROLE reporting_reader WITH LOGIN PASSWORD 'reporting_secret';
-- Grant schema usage — each role can only USE its own schema
GRANT USAGE ON SCHEMA auth TO auth_app;
GRANT USAGE ON SCHEMA orders TO orders_app;
GRANT USAGE ON SCHEMA billing TO billing_app;
GRANT USAGE ON SCHEMA inventory TO inventory_app;
-- Grant table-level privileges within each schema
GRANT SELECT, INSERT, UPDATE, DELETE
ON ALL TABLES IN SCHEMA auth TO auth_app;
GRANT SELECT, INSERT, UPDATE, DELETE
ON ALL TABLES IN SCHEMA orders TO orders_app;
GRANT SELECT, INSERT, UPDATE, DELETE
ON ALL TABLES IN SCHEMA billing TO billing_app;
GRANT SELECT, INSERT, UPDATE, DELETE
ON ALL TABLES IN SCHEMA inventory TO inventory_app;
-- Ensure future tables in each schema are also covered
ALTER DEFAULT PRIVILEGES IN SCHEMA auth
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO auth_app;
ALTER DEFAULT PRIVILEGES IN SCHEMA orders
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO orders_app;
-- Reporting role gets SELECT across all schemas
GRANT USAGE ON SCHEMA auth, orders, billing, inventory TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA auth TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA orders TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA billing TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA inventory TO reporting_reader;
What This Enforces
The orders_app role has no USAGE on the auth schema. If the orders service's database connection attempts:
SELECT * FROM auth.users WHERE id = $1;
PostgreSQL returns:
ERROR: permission denied for schema auth
This fires regardless of what the application code says. The architecture is enforced at the infrastructure layer where it cannot be bypassed by a shortcut import or a rushed fix.
Configuration in the Application
Each module gets its own connection string, using its own role:
# core/config.py
class Settings(BaseSettings):
auth_db_url: str # postgres://auth_app:auth_secret@localhost/appdb
orders_db_url: str # postgres://orders_app:orders_secret@localhost/appdb
billing_db_url: str
inventory_db_url: str
model_config = SettingsConfigDict(env_file=".env")
Each module's dependency injection uses its module-specific connection, not a global shared connection. The connection pool is still to the same database — the isolation is at the role level, not the network level.
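One way to wire this — a hypothetical variation of the lifespan shown in Section 3, with pool sizes chosen arbitrarily for illustration — is one pool per module role, all pointing at the same physical database:

# main.py — hypothetical per-module pools using role-scoped credentials
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
from app.core.config import settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.auth_pool = await asyncpg.create_pool(dsn=settings.auth_db_url, min_size=2, max_size=10)
    app.state.orders_pool = await asyncpg.create_pool(dsn=settings.orders_db_url, min_size=2, max_size=10)
    app.state.billing_pool = await asyncpg.create_pool(dsn=settings.billing_db_url, min_size=2, max_size=10)
    app.state.inventory_pool = await asyncpg.create_pool(dsn=settings.inventory_db_url, min_size=2, max_size=10)
    yield
    for pool in (app.state.auth_pool, app.state.orders_pool,
                 app.state.billing_pool, app.state.inventory_pool):
        await pool.close()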
4.3 Foreign Keys vs Soft References
This is where most systems make a consequential mistake early, and pay for it when the system grows.
Why Cross-Schema Foreign Keys Are Forbidden
PostgreSQL supports foreign keys across schemas within the same database:
-- orders schema referencing auth schema
ALTER TABLE orders.orders
ADD CONSTRAINT fk_user
FOREIGN KEY (user_id) REFERENCES auth.users(id);
This is technically valid. It is architecturally wrong. Here is why:
It couples the schemas at the database level. The orders schema now cannot be migrated, restored, or extracted without the auth schema being present and consistent. If you ever extract orders into a separate service, this constraint must be dropped and handled at the application layer — a migration under production load, which is high-risk.
It undermines RBAC. Creating the constraint requires the REFERENCES privilege on auth.users, and every insert into orders.orders is now validated against a table the orders module is not supposed to touch. The role isolation you set up is weakened the moment referential integrity is enforced across the schema boundary.
It creates deployment coupling. You cannot drop a column from auth.users without first modifying orders.orders. Two schemas that should be independently deployable are now locked together at the schema level.
The Soft Reference Strategy
Instead of a foreign key, the orders module stores the user_id as a bare UUID with no database-level constraint pointing to auth.users:
-- orders schema
CREATE TABLE orders.orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL, -- soft reference: no FK to auth.users
status TEXT NOT NULL DEFAULT 'pending',
total_amount NUMERIC(12,2) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
The integrity guarantee that would have been handled by the foreign key is now enforced at the application layer, in the service:
# orders/service.py
async def create_order(self, user_id: UUID, items: list[OrderItemRequest]) -> Order:
# Application-layer integrity check — replaces the FK constraint
identity = await self._user_service.get_user_identity(user_id)
if identity is None:
raise UserNotFoundError(user_id)
if not identity.is_active:
raise UserInactiveError(user_id)
# ... proceed with order creation
The trade-off is explicit: you give up automatic database-level referential integrity and take on the responsibility of enforcing it in code. This is the correct trade-off for a modular architecture because it keeps the enforcement at the layer that understands the domain rules — not at the infrastructure layer that has no business knowledge.
Lifecycle Problems and Solutions
The soft reference strategy introduces problems that foreign keys solve automatically. You must address them explicitly.
Problem 1 — Orphaned Records
A user is deleted from auth.users. The orders schema has 47 orders with that user_id. No foreign key cascade exists. Those orders now reference a non-existent user — orphaned records that cause application errors when fetched.
Solution A — Soft Deletes (Preferred)
Never hard-delete records that are referenced by other modules. Mark them as deleted:
ALTER TABLE auth.users
ADD COLUMN deleted_at TIMESTAMPTZ DEFAULT NULL;
-- "Delete" a user
UPDATE auth.users
SET deleted_at = NOW()
WHERE id = $1;
-- Query active users only
SELECT * FROM auth.users WHERE deleted_at IS NULL;
The orders module can still fetch the user's display name from its own product_snapshots table (where it stored it at order creation time). The historical record is intact. No orphans created.
Solution B — Event-Driven Cleanup
When auth emits a UserDeletedEvent, the orders module's event handler processes it:
# orders/event_handlers.py
async def handle_user_deleted(event: UserDeletedEvent, repo: OrderRepository):
# Cancel all pending orders for the deleted user
await repo.cancel_orders_for_user(event.user_id)
# Anonymize completed orders (GDPR compliance)
await repo.anonymize_user_data(event.user_id)
This keeps the orders module in control of its own data cleanup, triggered by the auth module's event — without coupling the modules directly.
Solution C — Validation at Read Time
For non-critical reads, validate existence lazily:
async def get_order_with_user(self, order_id: UUID) -> OrderDetail:
order = await self._repo.find_by_id(order_id)
user = await self._user_service.get_user_identity(order.user_id)
return OrderDetail(
order=order,
user=user, # None if user deleted — handled gracefully in response
)
Problem 2 — Deletion Handling at the Domain Level
Before deleting any entity that other modules may reference, the owning module should check whether the deletion is safe. This is a domain rule, not a database rule:
# auth/service.py
async def delete_user(self, user_id: UUID) -> None:
# Check with other modules via their contracts
has_active_orders = await self._order_service.has_active_orders(user_id)
has_outstanding_balance = await self._billing_service.has_outstanding_balance(user_id)
if has_active_orders:
raise CannotDeleteUserError("User has active orders")
if has_outstanding_balance:
raise CannotDeleteUserError("User has outstanding balance")
# Safe to soft-delete
await self._repo.soft_delete(user_id)
await self._events.publish(UserDeletedEvent(user_id=user_id))
The auth module orchestrates the deletion check by querying other modules' public contracts. No cross-schema SQL. No direct table access.
4.4 Indexing Strategy
A schema with correct structure but poor indexing will perform well in development (small data sets) and fail in production (millions of rows). Index decisions must be deliberate from the start — retrofitting indexes onto a live production table with millions of rows requires CREATE INDEX CONCURRENTLY, which is a production incident waiting to happen if done carelessly.
Indexing Soft Reference Columns
Every soft reference column will be queried in WHERE and JOIN clauses. Index them all:
-- orders.orders — soft reference to user
CREATE INDEX idx_orders_user_id ON orders.orders(user_id);
-- orders.orders — status filtering (very common query pattern)
CREATE INDEX idx_orders_status ON orders.orders(status);
-- orders.order_items — joining back to parent order
CREATE INDEX idx_order_items_order_id ON orders.order_items(order_id);
-- Composite — common query: "all pending orders for a user"
CREATE INDEX idx_orders_user_status ON orders.orders(user_id, status);
The composite index on (user_id, status) serves two query patterns:
-- Uses the composite index (leftmost prefix rule)
SELECT * FROM orders.orders WHERE user_id = $1;
-- Also uses the composite index
SELECT * FROM orders.orders WHERE user_id = $1 AND status = 'pending';
-- Does NOT use the composite index efficiently
SELECT * FROM orders.orders WHERE status = 'pending';
-- This needs its own index: idx_orders_status
Understand the leftmost prefix rule: a composite index on (a, b, c) serves queries on a, (a, b), and (a, b, c). It does not serve queries on b alone or c alone.
Partial Indexes
A partial index covers only a subset of rows that match a condition. For soft deletes, most queries filter WHERE deleted_at IS NULL. A partial index on that subset is significantly smaller and faster than a full index:
-- Partial index — only indexes rows where deleted_at IS NULL
CREATE INDEX idx_users_active ON auth.users(email)
WHERE deleted_at IS NULL;
-- Partial index for pending orders only
-- If 95% of orders are completed, this index is small and fast
CREATE INDEX idx_orders_pending ON orders.orders(user_id, created_at)
WHERE status = 'pending';
The pending orders index only covers rows where status = 'pending'. If your system processes orders quickly, this index stays small even with millions of completed orders in the table.
Covering Indexes
A covering index includes all columns a query needs, so PostgreSQL can answer the query entirely from the index without touching the heap (the actual table rows). This is called an index-only scan:
-- Query: fetch order summary for a user (id, status, total, created_at)
-- Covering index includes all projected columns
CREATE INDEX idx_orders_user_summary
ON orders.orders(user_id, created_at DESC)
INCLUDE (id, status, total_amount);
For this query:
SELECT id, status, total_amount, created_at
FROM orders.orders
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT 20;
PostgreSQL reads the index only — never touches the table. On high-read endpoints (order history lists), this is a meaningful performance difference.
Validating Index Usage
Always verify your indexes are being used in production queries:
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT id, status, total_amount
FROM orders.orders
WHERE user_id = '550e8400-e29b-41d4-a716-446655440000'
AND status = 'pending';
Look for Index Scan or Index Only Scan in the output. Seq Scan on a large table means your index is missing, wrong, or PostgreSQL's planner determined it's cheaper to scan the full table (which can happen if the table is small or statistics are stale — run ANALYZE if you suspect this).
4.5 ORM vs Raw SQL
Neither approach is universally correct. The decision depends on the operation type.
| Operation | Approach | Reason |
|---|---|---|
| Single record insert | ORM or raw SQL | Comparable complexity |
| Single record fetch by PK | ORM | Readable, adequate performance |
| Complex filtering with pagination | Raw SQL | ORM-generated SQL is often suboptimal |
| Aggregations (SUM, COUNT, AVG, GROUP BY) | Raw SQL | ORMs produce verbose, slow queries for aggregations |
| Reporting queries | Materialized views + raw SQL | ORM cannot express window functions cleanly |
| Bulk inserts | Raw SQL with COPY or executemany | ORM inserts one row per query by default |
| Joins across tables in the same module | Raw SQL | ORM joins are opaque and hard to optimise |
The kukufiti-api uses raw SQL via asyncpg. This is the correct choice for a system targeting production quality from the start. Here is why:
Query plan visibility. When you write raw SQL, you know exactly what query hits the database. When an ORM generates SQL, you must intercept and log the generated query to understand what's happening. In production debugging, this extra step costs time.
Aggregation correctness. SQLAlchemy's ORM layer struggles with complex aggregations. Developers work around it with hybrid approaches that are harder to read than plain SQL. For a fintech/agritech system where financial aggregations are core business logic, you want complete control.
asyncpg performance. asyncpg is significantly faster than psycopg2 and the async SQLAlchemy layer because it speaks PostgreSQL's binary protocol directly, with no intermediate ORM mapping overhead.
Raw SQL Patterns with asyncpg
# Single record fetch
async def find_batch_by_id(self, batch_id: UUID) -> Batch | None:
row = await self._conn.fetchrow(
"SELECT id, farm_id, breed, chick_count, placement_date, status "
"FROM batches.batches WHERE id = $1 AND deleted_at IS NULL",
batch_id
)
return Batch.from_row(row) if row else None
# List with filtering and pagination
async def list_batches(
self,
farm_id: UUID,
status: str | None,
limit: int,
offset: int,
) -> list[Batch]:
query = """
SELECT id, farm_id, breed, chick_count, placement_date, status
FROM batches.batches
WHERE farm_id = $1
AND deleted_at IS NULL
AND ($2::text IS NULL OR status = $2)
ORDER BY placement_date DESC
LIMIT $3 OFFSET $4
"""
rows = await self._conn.fetch(query, farm_id, status, limit, offset)
return [Batch.from_row(row) for row in rows]
# Financial aggregation — never use ORM for this
async def get_batch_financial_summary(self, farm_id: UUID) -> FinancialSummary:
row = await self._conn.fetchrow(
"""
SELECT
COUNT(*) FILTER (WHERE status = 'active') AS active_batches,
COUNT(*) FILTER (WHERE status = 'completed') AS completed_batches,
SUM(total_revenue) FILTER (WHERE status = 'completed') AS total_revenue,
SUM(total_cost) AS total_cost,
AVG(fcr) FILTER (WHERE fcr IS NOT NULL) AS avg_fcr
FROM batches.batches
WHERE farm_id = $1
AND deleted_at IS NULL
""",
farm_id
)
return FinancialSummary.from_row(row)
# Bulk insert — executemany for efficiency
async def record_mortality_events(
self,
events: list[MortalityEvent],
) -> None:
await self._conn.executemany(
"""
INSERT INTO health.mortality_events
(id, batch_id, event_date, count, cause, notes)
VALUES ($1, $2, $3, $4, $5, $6)
""",
[
(e.id, e.batch_id, e.event_date, e.count, e.cause, e.notes)
for e in events
]
)
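The COPY route from the table above maps to asyncpg's copy_records_to_table, which streams rows over the COPY protocol instead of issuing individual INSERTs. A minimal sketch against the same mortality_events table (the method name is illustrative):
# Bulk load via COPY (sketch; bulk_load_mortality_events is an illustrative name)
async def bulk_load_mortality_events(self, events: list[MortalityEvent]) -> None:
    await self._conn.copy_records_to_table(
        "mortality_events",
        schema_name="health",
        columns=["id", "batch_id", "event_date", "count", "cause", "notes"],
        records=[
            (e.id, e.batch_id, e.event_date, e.count, e.cause, e.notes)
            for e in events
        ],
    )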
Materialized Views for Reporting
A materialized view pre-computes and stores a complex query result. Reads are instant — no aggregation at query time. The view is refreshed on a schedule or on demand:
-- Create materialized view: farm performance dashboard
CREATE MATERIALIZED VIEW batches.farm_performance_summary AS
SELECT
b.farm_id,
DATE_TRUNC('month', b.placement_date) AS month,
COUNT(*) AS total_batches,
SUM(b.chick_count) AS total_chicks,
SUM(f.total_revenue) AS revenue,
SUM(f.total_cost) AS cost,
SUM(f.total_revenue) - SUM(f.total_cost) AS profit,
AVG(b.fcr) AS avg_fcr,
AVG(b.mortality_rate) AS avg_mortality_rate
FROM batches.batches b
JOIN finance.batch_financials f ON f.batch_id = b.id
WHERE b.deleted_at IS NULL
GROUP BY b.farm_id, DATE_TRUNC('month', b.placement_date)
WITH DATA;
-- Index the materialized view (must be UNIQUE for REFRESH ... CONCURRENTLY)
CREATE UNIQUE INDEX idx_farm_perf_farm_month
ON batches.farm_performance_summary(farm_id, month DESC);
-- Refresh — run via cron or background worker
REFRESH MATERIALIZED VIEW CONCURRENTLY batches.farm_performance_summary;
CONCURRENTLY allows reads during the refresh — critical for production. It requires at least one unique index on the view, which is why the index above is created UNIQUE. Without CONCURRENTLY, REFRESH takes an exclusive lock and blocks all queries against the view while it runs.
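The refresh itself usually runs from a small background task rather than from a request handler. A minimal sketch, assuming a shared asyncpg pool; the interval and module path are illustrative:
# workers/matview_refresher.py (illustrative scheduled refresh)
import asyncio
import asyncpg

REFRESH_INTERVAL_SECONDS = 600  # assumption: a 10-minute staleness window is acceptable

async def refresh_farm_performance(pool: asyncpg.Pool) -> None:
    while True:
        await pool.execute(
            "REFRESH MATERIALIZED VIEW CONCURRENTLY batches.farm_performance_summary"
        )
        await asyncio.sleep(REFRESH_INTERVAL_SECONDS)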
4.6 The N+1 Query Problem
This is the most common performance failure in systems that move from development to production load. It is critical to understand because it is non-obvious, easy to create, and catastrophic at scale.
How It Occurs
You need to display a list of orders with user names. The naive implementation:
# service.py — N+1 PROBLEM
async def list_orders_with_users(self, farm_id: UUID) -> list[OrderDetail]:
orders = await self._order_repo.list_orders(farm_id) # 1 query → returns 50 orders
result = []
for order in orders:
# 1 query PER ORDER — 50 orders = 50 additional queries
user = await self._user_service.get_user_identity(order.user_id)
result.append(OrderDetail(order=order, user=user))
return result
For a page of 50 orders, this executes 51 queries: 1 to fetch the list, then 1 per order to fetch the user. At 100 requests per minute, that's 5,100 queries per minute against your database — for a single endpoint.
At scale: 500 users each loading their order list once a minute = 25,500 queries per minute. Your database connection pool is exhausted. Latency spikes. The system degrades.
Why It's Dangerous Across Modules
In a monolith, N+1 usually manifests within a single database. In a modular monolith, it can manifest as N+1 cross-module API calls — where each call is not just a SQL query but a full service invocation, potentially with its own database query. This is significantly worse.
Fix 1 — Batch API
Design the user service contract to accept a list of IDs and return all identities in a single query:
# auth/contracts.py
class IUserService(ABC):
@abstractmethod
async def get_user_identities(
self,
user_ids: list[UUID]
) -> dict[UUID, UserIdentity]:
"""Returns a dict mapping user_id → UserIdentity for all found users."""
...
# auth/service.py
async def get_user_identities(
self,
user_ids: list[UUID]
) -> dict[UUID, UserIdentity]:
rows = await self._conn.fetch(
"SELECT id, email, is_active FROM auth.users WHERE id = ANY($1::uuid[])",
user_ids
)
return {
row["id"]: UserIdentity(
user_id=row["id"],
email=row["email"],
is_active=row["is_active"]
)
for row in rows
}
# orders/service.py — FIXED: 2 queries total regardless of list size
async def list_orders_with_users(self, farm_id: UUID) -> list[OrderDetail]:
orders = await self._order_repo.list_orders(farm_id) # 1 query
user_ids = list({order.user_id for order in orders})
user_map = await self._user_service.get_user_identities(user_ids) # 1 query
return [
OrderDetail(order=order, user=user_map.get(order.user_id))
for order in orders
]
2 queries regardless of page size. This is the correct pattern for any cross-module data enrichment.
Fix 2 — Read Model Duplication
Store the data you need from another module locally, populated at write time. This is the most performant approach — zero cross-module queries at read time:
-- orders module stores user display name at order creation time
-- No need to query auth module when rendering order history
CREATE TABLE orders.orders (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
user_email TEXT NOT NULL, -- denormalized from auth at creation time
status TEXT NOT NULL,
total_amount NUMERIC(12,2) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
The tradeoff: if the user's email changes, orders.user_email is stale. For most business contexts (order history display), showing the email at the time of order is actually more correct than showing the current email. For display data that must always be current, use the batch API approach instead.
Fix 3 — Caching with Invalidation
For data that changes infrequently (user identity, product names, farm details), cache at the service level:
# orders/service.py
async def list_orders_with_users(self, farm_id: UUID) -> list[OrderDetail]:
orders = await self._order_repo.list_orders(farm_id)
result = []
for order in orders:
# Check cache first — Redis with 5-minute TTL
cache_key = f"user_identity:{order.user_id}"
cached = await self._cache.get(cache_key)
if cached:
user = UserIdentity.model_validate_json(cached)
else:
user = await self._user_service.get_user_identity(order.user_id)
await self._cache.set(cache_key, user.model_dump_json(), ttl=300)
result.append(OrderDetail(order=order, user=user))
return result
This trades cache consistency for performance. For user identity data, a 5-minute staleness window is usually acceptable. For financial balances, it is not.
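The invalidation half of this fix is simply deleting the cached entry whenever the underlying data changes, so the next read repopulates it. A minimal sketch, assuming the key format is shared through a small helper and the cache client exposes get/set/delete (all names here are illustrative):
# core/cache_keys.py (illustrative): single place that owns the key format
from uuid import UUID

def user_identity_key(user_id: UUID) -> str:
    return f"user_identity:{user_id}"

# auth/service.py (sketch): invalidate when the cached data changes
async def update_email(self, user_id: UUID, new_email: str) -> None:
    async with self._uow:
        await self._repo.update_email(user_id, new_email)
    # Delete after commit; the next cross-module read repopulates the cache
    await self._cache.delete(user_identity_key(user_id))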
4.7 Migrations (Alembic + Schemas)
The default Alembic configuration assumes a single schema and a single migration history. In a schema-per-module system, you need schema-aware migrations with isolated version tables per module.
The Problem with Default Alembic
Default Alembic stores migration version history in a single alembic_version table in the public schema:
-- default — one table for entire system
public.alembic_version
With schema-per-module, you need:
auth.alembic_version -- migration history for auth schema only
orders.alembic_version -- migration history for orders schema only
billing.alembic_version -- migration history for billing schema only
This allows each module to migrate independently. An orders migration does not block a billing migration.
Schema-Aware Alembic Configuration
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool, text
from alembic import context
import os
config = context.config
fileConfig(config.config_file_name)
# Per-module metadata can be imported here for autogenerate support;
# None is acceptable when migrations are written by hand.
target_metadata = None
# Determine which module we're migrating
MODULE = os.environ.get("MIGRATION_MODULE", "orders")
SCHEMA = MODULE
# Module-specific connection string
DB_URLS = {
    "auth": os.environ["AUTH_DB_URL"],
    "orders": os.environ["ORDERS_DB_URL"],
    "billing": os.environ["BILLING_DB_URL"],
    "inventory": os.environ["INVENTORY_DB_URL"],
    "batches": os.environ["BATCHES_DB_URL"],  # used by the batches migration examples below
}
def run_migrations_online():
connectable = engine_from_config(
{"sqlalchemy.url": DB_URLS[MODULE]},
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
# Ensure schema exists
connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}"))
context.configure(
connection=connection,
target_metadata=target_metadata,
version_table="alembic_version",
version_table_schema=SCHEMA, # version table lives in module's schema
include_schemas=True,
)
with context.begin_transaction():
context.run_migrations()
Running Module-Specific Migrations
# Migrate only the orders module
MIGRATION_MODULE=orders alembic upgrade head
# Create a new migration for the batches module
MIGRATION_MODULE=batches alembic revision --autogenerate -m "add_mortality_threshold_column"
# Rollback one step for billing only
MIGRATION_MODULE=billing alembic downgrade -1
A Production Migration for kukufiti-api
# modules/batches/migrations/versions/001_create_batches_schema.py
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
def upgrade():
# Ensure schema exists
op.execute("CREATE SCHEMA IF NOT EXISTS batches")
op.create_table(
"batches",
sa.Column("id", UUID(as_uuid=True), primary_key=True,
server_default=sa.text("gen_random_uuid()")),
sa.Column("farm_id", UUID(as_uuid=True), nullable=False),
sa.Column("breed", sa.Text(), nullable=False),
sa.Column("chick_count", sa.Integer(), nullable=False),
sa.Column("placement_date", sa.Date(), nullable=False),
sa.Column("status", sa.Text(), nullable=False,
server_default=sa.text("'active'")),
sa.Column("mortality_rate", sa.Numeric(5, 2), nullable=True),
sa.Column("fcr", sa.Numeric(5, 3), nullable=True),
sa.Column("deleted_at", sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False,
server_default=sa.text("NOW()")),
sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False,
server_default=sa.text("NOW()")),
schema="batches",
)
op.create_index(
"idx_batches_farm_id",
"batches",
["farm_id"],
schema="batches",
)
op.create_index(
"idx_batches_farm_status",
"batches",
["farm_id", "status"],
schema="batches",
postgresql_where=sa.text("deleted_at IS NULL"),
)
def downgrade():
op.drop_table("batches", schema="batches")
op.execute("DROP SCHEMA IF EXISTS batches CASCADE")
Critical Migration Pitfalls
Pitfall 1 — Alembic autogenerate with multiple schemas. By default, --autogenerate only inspects the public schema. You must configure include_schemas=True and target your specific schema, or autogenerate produces empty migrations and misses your actual tables.
Pitfall 2 — Running migrations as a superuser. If your migration runs as a PostgreSQL superuser but your application runs as orders_app, the tables created during migration may be owned by the superuser role. The application role then cannot insert into them. Always run migrations using the same role the application uses, or explicitly set ownership:
ALTER TABLE orders.orders OWNER TO orders_app;
Pitfall 3 — Not testing downgrade migrations. Every migration that goes up must be able to come down. A failed production deployment requires a rollback. If downgrade() is not implemented and tested, you cannot roll back. Test both directions in CI.
Pitfall 4 — Dropping columns without a deprecation period. Dropping a column in production requires: deploy application code that no longer reads that column → verify no queries reference it → then drop. Dropping first causes an immediate 500 in production if any running instance still queries that column.
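As a sketch of that sequence, the drop ships as its own migration only after a release in which no code path reads the column. Module, column, and revision names below are hypothetical:
# modules/orders/migrations/versions/012_drop_legacy_discount_column.py
# Hypothetical example: ships one release AFTER the application stopped reading orders.legacy_discount
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.drop_column("orders", "legacy_discount", schema="orders")

def downgrade():
    # Restores the column so rollback is possible; the dropped data is not recoverable
    op.add_column(
        "orders",
        sa.Column("legacy_discount", sa.Numeric(5, 2), nullable=True),
        schema="orders",
    )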
Section 5: Inter-Module Communication
The Core Problem
Modules need to communicate. An order is placed — inventory must reserve stock, billing must record the transaction, notifications must email the user. The question is not whether modules communicate, but how — and the answer determines whether your system is resilient or brittle.
There are three strategies. Each makes a different trade-off between simplicity, coupling, and durability. Understanding when each fails is as important as understanding how each works.
5.1 Direct Function Calls
The simplest approach. Module A calls module B's public contract method directly, in the same thread, in the same transaction.
# orders/service.py
class OrderService:
def __init__(
self,
repository: OrderRepository,
inventory: IInventoryService, # contract — not concrete class
billing: IBillingService, # contract — not concrete class
notification: INotificationService,
):
self._repo = repository
self._inventory = inventory
self._billing = billing
self._notification = notification
async def create_order(self, user_id: UUID, items: list[OrderItemRequest]) -> Order:
# Step 1 — validate and reserve stock (inventory module)
reservation = await self._inventory.reserve_stock(
items=[StockReservationItem(product_id=i.product_id, quantity=i.quantity)
for i in items]
)
# Step 2 — persist the order
order = await self._repo.create_order(user_id=user_id, items=items,
reservation_id=reservation.id)
# Step 3 — record billing transaction
await self._billing.record_transaction(
user_id=user_id,
order_id=order.id,
amount=order.total_amount,
)
# Step 4 — send confirmation notification
await self._notification.send_order_confirmation(
user_id=user_id,
order_id=order.id,
)
return order
When Direct Calls Are Correct
For reads: Fetching data from another module before making a decision is almost always a direct call. It's synchronous, the result is needed immediately, and no side effects need durability guarantees.
# Correct use of direct call — reading before deciding
identity = await self._user_service.get_user_identity(user_id)
if not identity or not identity.is_active:
raise UserInactiveError(user_id)
For critical, transactional side effects: When the downstream action must succeed for the originating operation to be valid, a direct call inside the same transaction is appropriate. Stock reservation is a good example — you should not create an order if stock reservation fails. The failure should roll everything back.
The Coupling Problem
Direct calls create temporal coupling: module A can only operate if module B is available at the exact moment A runs. In a monolith, this is usually acceptable because B is always in-process. But consider:
async def create_order(self, ...):
order = await self._repo.create_order(...) # DB write — succeeds
await self._billing.record_transaction(...) # DB write — succeeds
await self._notification.send_order_confirmation( # calls external email API
user_id=user_id,
order_id=order.id,
) # external API is down — raises
# What happens? The exception propagates upward.
# If the Unit of Work catches it, the entire order is rolled back.
# The user gets a 500. Their order does not exist.
# The email was the problem. The order should have succeeded.
This is the critical failure mode of direct calls for side effects: a non-critical downstream failure rolls back a critical upstream operation.
The notification failure should not undo the order. The notification should be retried asynchronously. The order is real regardless of whether the email was sent.
What Direct Calls Cannot Handle
| Scenario | Problem |
|---|---|
| Non-critical side effects (email, push notification) | Failure rolls back unrelated critical operation |
| Fan-out (one event, many subscribers) | Caller must know about all consumers — coupling grows with every new subscriber |
| Retry on transient failure | Caller must implement retry logic inline — blocking the request |
| Ordering guarantees | No persistence — if the process crashes mid-call sequence, completed steps are not retried |
| New module subscribing to existing events | Must modify the originating service every time a new consumer appears |
5.2 In-Memory Event Bus
Instead of calling downstream modules directly, the originating module publishes an event. Subscribers handle it in the same process, asynchronously.
# core/events.py
from dataclasses import dataclass, field
from uuid import UUID, uuid4
from datetime import datetime, UTC
from decimal import Decimal
from typing import Callable, Awaitable
from collections import defaultdict

# kw_only lets subclasses declare required fields after the base class defaults (Python 3.10+)
@dataclass(kw_only=True)
class DomainEvent:
    event_id: UUID = field(default_factory=uuid4)
    occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC))

@dataclass(kw_only=True)
class OrderPlacedEvent(DomainEvent):
    order_id: UUID
    user_id: UUID
    total_amount: Decimal
    items: list[dict]
class InMemoryEventBus:
def __init__(self):
self._handlers: dict[type, list[Callable]] = defaultdict(list)
def subscribe(self, event_type: type, handler: Callable[..., Awaitable[None]]):
self._handlers[event_type].append(handler)
async def publish(self, event: DomainEvent) -> None:
handlers = self._handlers.get(type(event), [])
for handler in handlers:
await handler(event)
# main.py — wire up subscriptions at startup
from app.modules.billing.event_handlers import handle_order_placed_billing
from app.modules.notifications.event_handlers import handle_order_placed_notification
from app.modules.inventory.event_handlers import handle_order_placed_inventory
event_bus = InMemoryEventBus()
event_bus.subscribe(OrderPlacedEvent, handle_order_placed_billing)
event_bus.subscribe(OrderPlacedEvent, handle_order_placed_notification)
event_bus.subscribe(OrderPlacedEvent, handle_order_placed_inventory)
# orders/service.py — no direct knowledge of billing or notifications
async def create_order(self, user_id: UUID, items: list[OrderItemRequest]) -> Order:
order = await self._repo.create_order(user_id=user_id, items=items)
await self._event_bus.publish(OrderPlacedEvent(
order_id=order.id,
user_id=user_id,
total_amount=order.total_amount,
items=[{"product_id": str(i.product_id), "quantity": i.quantity}
for i in items],
))
return order
Now adding a new module that cares about orders — say, a fraud detection module — requires zero changes to OrderService. Subscribe the new handler at startup. The orders module never knows fraud detection exists.
The Volatility Problem
The in-memory event bus solves coupling. It does not solve durability.
1. Order created in database ✓
2. OrderPlacedEvent published ✓
3. billing handler starts ✓
4. Process crashes (OOM, deploy) ✗
5. billing handler never completes ✗
6. Order exists, billing not recorded ✗
Any of these failure scenarios produce inconsistent state:
- Event published, handler throws an exception — was it a transient error? Is billing in an indeterminate state?
- Event published, handler starts, process killed mid-execution — partial side effect, no way to replay
- Event published synchronously after the DB commit — what if the commit succeeds but the process dies before publish() runs?
The in-memory bus has no persistence. If the event is not handled successfully in the same process invocation in which it was published, it is gone. There is no retry. There is no replay. There is no audit trail.
The consequence: In-memory events are only safe for truly non-critical, best-effort side effects where failure is acceptable. Sending a push notification is fine — if it fails, the user doesn't get a ping. Recording a financial transaction is not fine — if it fails silently, your books are wrong.
5.3 The Outbox Pattern
The outbox pattern solves the fundamental problem: guaranteeing that an event is delivered at least once and eventually processed, even if the process crashes at any point. Paired with idempotent handlers (see Section 6.2), at-least-once delivery gives you effectively-once processing.
The core insight is that publishing an event and persisting data should be part of the same database transaction. The event is written to an outbox table atomically with the domain operation. A separate worker reads the outbox and delivers events to handlers. If delivery fails, the worker retries. The event is never lost because it was committed to the database.
The Dual-Write Problem (What the Outbox Solves)
Without the outbox, you have two separate writes:
# Dangerous — two separate writes, no atomicity
async def create_order(self, ...):
await self._repo.create_order(...) # Write 1: DB commit
await self._event_bus.publish(OrderPlaced) # Write 2: in-memory / message broker
# Crash here: order exists, event never delivered
This is called the dual-write problem. Either write can fail independently. You cannot guarantee both happen or neither happens without a two-phase commit — which introduces its own complexity.
The outbox makes both writes part of a single transaction:
# Safe — single atomic transaction
async def create_order(self, ...):
async with self._uow:
order = await self._repo.create_order(...) # Write 1
await self._outbox.enqueue(OrderPlacedEvent(...)) # Write 2 — same transaction
# Both committed atomically, or both rolled back
# Worker will deliver the event from the outbox
Outbox Table Schema
CREATE TABLE core.outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
-- pending → processing → delivered | dead
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ,
last_attempt_at TIMESTAMPTZ,
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 5,
last_error TEXT,
idempotency_key TEXT UNIQUE -- prevents duplicate event delivery
);
-- Worker queries this index constantly — must be fast
CREATE INDEX idx_outbox_pending
ON core.outbox(scheduled_at ASC)
WHERE status = 'pending';
-- Monitor dead events
CREATE INDEX idx_outbox_dead
ON core.outbox(created_at DESC)
WHERE status = 'dead';
Writing to the Outbox (Application Side)
# core/outbox.py
import json
from uuid import UUID, uuid4
from datetime import datetime, UTC
import asyncpg
from app.core.events import DomainEvent
class OutboxWriter:
def __init__(self, connection: asyncpg.Connection):
self._conn = connection
async def enqueue(
self,
event: DomainEvent,
idempotency_key: str | None = None,
delay_seconds: int = 0,
) -> None:
scheduled_at = datetime.now(UTC)
if delay_seconds:
from datetime import timedelta
scheduled_at += timedelta(seconds=delay_seconds)
await self._conn.execute(
"""
INSERT INTO core.outbox
(id, event_type, payload, status, scheduled_at, idempotency_key)
VALUES ($1, $2, $3, 'pending', $4, $5)
ON CONFLICT (idempotency_key) DO NOTHING
""",
event.event_id,
type(event).__name__,
json.dumps(event.__dict__, default=str),
scheduled_at,
idempotency_key or str(event.event_id),
)
The ON CONFLICT (idempotency_key) DO NOTHING clause means duplicate enqueue attempts are silently ignored. This is important for retry scenarios where a request is retried after a partial failure — the event is not enqueued twice.
The Outbox Worker (Separate Process or Background Task)
# workers/outbox_worker.py
import asyncio
import asyncpg
import json
import logging
from uuid import UUID
from datetime import datetime, UTC, timedelta
from app.core.config import settings
from app.core.event_registry import EVENT_REGISTRY  # maps event_type → handler
logger = logging.getLogger(__name__)
BATCH_SIZE = 50
POLL_INTERVAL_SECONDS = 2
LOCK_TIMEOUT_SECONDS = 30
async def process_outbox(pool: asyncpg.Pool) -> None:
"""Main worker loop — polls indefinitely."""
while True:
try:
processed = await process_batch(pool)
if processed == 0:
await asyncio.sleep(POLL_INTERVAL_SECONDS)
except Exception as e:
logger.error("Outbox worker error", exc_info=e)
await asyncio.sleep(POLL_INTERVAL_SECONDS)
async def process_batch(pool: asyncpg.Pool) -> int:
    async with pool.acquire() as conn, conn.transaction():
        # SELECT FOR UPDATE SKIP LOCKED — critical for multiple worker instances.
        # The explicit transaction keeps the row locks held while the batch is processed,
        # so no two workers can claim the same row.
        rows = await conn.fetch(
"""
            SELECT id, event_type, payload, attempt_count, max_attempts
FROM core.outbox
WHERE status = 'pending'
AND scheduled_at <= NOW()
ORDER BY scheduled_at ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
""",
BATCH_SIZE,
)
if not rows:
return 0
for row in rows:
await process_single_event(conn, row)
return len(rows)
async def process_single_event(conn: asyncpg.Connection, row: asyncpg.Record) -> None:
event_id = row["id"]
event_type = row["event_type"]
payload = json.loads(row["payload"])
attempt_count = row["attempt_count"]
# Mark as processing immediately — prevents other workers from claiming it
await conn.execute(
"UPDATE core.outbox SET status = 'processing', last_attempt_at = NOW(), "
"attempt_count = attempt_count + 1 WHERE id = $1",
event_id,
)
try:
handler = EVENT_REGISTRY.get(event_type)
if handler is None:
logger.warning(f"No handler registered for event type: {event_type}")
await mark_delivered(conn, event_id)
return
await handler(payload)
await mark_delivered(conn, event_id)
logger.info(f"Delivered event {event_type} [{event_id}]")
except Exception as e:
logger.error(f"Failed to deliver {event_type} [{event_id}]: {e}")
next_attempt = attempt_count + 1
max_attempts = row["max_attempts"] if "max_attempts" in row.keys() else 5
if next_attempt >= max_attempts:
# Move to dead letter — requires human intervention
await conn.execute(
"UPDATE core.outbox SET status = 'dead', last_error = $2 WHERE id = $1",
event_id, str(e),
)
logger.critical(f"Event {event_type} [{event_id}] moved to dead letter "
f"after {next_attempt} attempts")
else:
            # Exponential backoff — 2^next_attempt * 10 seconds, capped at one hour
backoff = min(2 ** next_attempt * 10, 3600)
await conn.execute(
"""
UPDATE core.outbox
SET status = 'pending',
scheduled_at = NOW() + ($2 || ' seconds')::interval,
last_error = $3
WHERE id = $1
""",
event_id, str(backoff), str(e),
)
async def mark_delivered(conn: asyncpg.Connection, event_id: UUID) -> None:
await conn.execute(
"UPDATE core.outbox SET status = 'delivered', processed_at = NOW() WHERE id = $1",
event_id,
)
Event Registry — Wiring Events to Handlers
# core/event_registry.py
from typing import Callable, Awaitable
from app.modules.billing.event_handlers import handle_order_placed_billing
from app.modules.notifications.event_handlers import handle_order_placed_notification
from app.modules.inventory.event_handlers import handle_order_placed_inventory
EVENT_REGISTRY: dict[str, Callable[[dict], Awaitable[None]]] = {
    # Fan-out: each handler gets its own suffixed event type,
    # matching the rows written by enqueue_fan_out below
    "OrderPlacedEvent_billing": handle_order_placed_billing,
    "OrderPlacedEvent_notifications": handle_order_placed_notification,
    "OrderPlacedEvent_inventory": handle_order_placed_inventory,
}
For true fan-out (one event, many independent handlers), each handler gets its own outbox row, enqueued at publish time:
# core/outbox.py
async def enqueue_fan_out(
self,
event: DomainEvent,
handler_names: list[str],
) -> None:
"""Enqueue one outbox row per handler — each processes independently."""
for handler_name in handler_names:
await self._conn.execute(
"""
INSERT INTO core.outbox
(id, event_type, payload, status, idempotency_key)
VALUES ($1, $2, $3, 'pending', $4)
ON CONFLICT (idempotency_key) DO NOTHING
""",
uuid4(),
f"{type(event).__name__}_{handler_name}",
json.dumps(event.__dict__, default=str),
f"{event.event_id}_{handler_name}",
)
This ensures that if the billing handler fails and retries, it does not re-trigger the notification handler. Each handler's retry lifecycle is independent.
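On the publishing side, fan-out is a single call whose handler names must match the registry suffixes above. A minimal sketch, assuming the orders service holds an OutboxWriter:
# orders/service.py (sketch): fan-out publish inside the Unit of Work
await self._outbox.enqueue_fan_out(
    OrderPlacedEvent(
        order_id=order.id,
        user_id=user_id,
        total_amount=order.total_amount,
        items=[{"product_id": str(i.product_id), "quantity": i.quantity}
               for i in items],
    ),
    handler_names=["billing", "notifications", "inventory"],
)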
The Complete Event Lifecycle
1. create_order() begins
├── Transaction opens
├── INSERT INTO orders.orders ← domain write
├── INSERT INTO core.outbox ← event write (same transaction)
└── Transaction commits
2. Worker polls core.outbox
├── SELECT ... FOR UPDATE SKIP LOCKED ← claims the row
├── UPDATE status = 'processing'
└── Calls handler
3a. Handler succeeds
└── UPDATE status = 'delivered'
3b. Handler fails (transient — network down)
└── UPDATE status = 'pending', scheduled_at = NOW() + backoff
└── Worker retries after backoff
3c. Handler fails repeatedly (max_attempts reached)
└── UPDATE status = 'dead'
└── Alert fires → human investigation
At no point is the event lost. Either it is delivered, or it is sitting in the outbox waiting to be delivered, or it is dead and visible in the database for investigation.
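Dead events are only useful if someone notices them. A minimal monitoring sketch that runs on a schedule alongside the worker and leans on the idx_outbox_dead partial index (the module path is illustrative):
# workers/outbox_monitor.py (illustrative dead-letter check, run on a schedule)
import asyncpg
import logging

logger = logging.getLogger(__name__)

async def check_dead_events(pool: asyncpg.Pool) -> None:
    count = await pool.fetchval(
        "SELECT COUNT(*) FROM core.outbox WHERE status = 'dead'"
    )
    if count:
        # Wire this to your alerting channel; a dead event means a lost business effect
        logger.critical(f"{count} dead outbox events awaiting investigation")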
5.4 Anti-Corruption Layer (ACL)
Even with contracts and DTOs, modules that communicate still risk a subtle form of coupling: semantic leakage. The billing module starts making decisions based on fields that are internal to the auth module's concept of a user. Over time, billing's logic is shaped by auth's domain model rather than its own.
The Anti-Corruption Layer is a translation boundary. It converts the external module's representation into the local module's own domain language — keeping the local domain pure.
The Problem Without ACL
# billing/service.py — WITHOUT ACL
from app.modules.auth.contracts import UserIdentity
async def calculate_discount(self, user: UserIdentity) -> Decimal:
# Billing is now making decisions based on auth's data structures
# auth's "is_active" and "email" are auth concepts, not billing concepts
if user.is_active and "@enterprise.com" in user.email:
return Decimal("0.20") # 20% enterprise discount
return Decimal("0.00")
Billing's discount logic now depends on what UserIdentity.is_active means in the auth domain and on the structure of the email address — an auth concern. If auth changes its user model or the meaning of is_active, billing breaks in ways that are invisible until runtime.
ACL Implementation
The ACL translates the external DTO into a local value object that carries only what billing needs to know — expressed in billing's own language:
# billing/acl.py — the Anti-Corruption Layer
from dataclasses import dataclass
from uuid import UUID
from datetime import datetime, UTC
from app.modules.auth.contracts import UserIdentity
@dataclass(frozen=True)
class BillingCustomer:
"""
Billing's local representation of a customer.
Expressed entirely in billing domain language.
This is NOT auth.UserIdentity — it is a billing concept.
"""
customer_id: UUID
    account_tier: str  # "standard" | "enterprise" | "trial" | "suspended"
is_billing_eligible: bool
class UserIdentityACL:
"""
Translates auth.UserIdentity into billing.BillingCustomer.
All auth→billing translation lives here and only here.
"""
@staticmethod
def translate(identity: UserIdentity) -> BillingCustomer:
# Translation logic — auth concepts converted to billing concepts
account_tier = UserIdentityACL._determine_tier(identity)
is_eligible = identity.is_active and account_tier != "suspended"
return BillingCustomer(
customer_id=identity.user_id,
account_tier=account_tier,
is_billing_eligible=is_eligible,
)
@staticmethod
def _determine_tier(identity: UserIdentity) -> str:
# Tier determination lives in billing domain, not auth domain
if identity.email.endswith("@enterprise.com"):
return "enterprise"
if identity.created_at and (datetime.now(UTC) - identity.created_at).days < 30:
return "trial"
return "standard"
# billing/service.py — WITH ACL
from .acl import UserIdentityACL, BillingCustomer
class BillingService:
async def calculate_discount(self, user_id: UUID) -> Decimal:
# Fetch from auth via contract
identity = await self._user_service.get_user_identity(user_id)
if identity is None:
raise CustomerNotFoundError(user_id)
# Translate through ACL — now working with billing domain concepts
customer = UserIdentityACL.translate(identity)
if not customer.is_billing_eligible:
raise CustomerNotEligibleError(user_id)
return self._discount_policy.apply(customer.account_tier)
Now if auth renames is_active to account_status, only UserIdentityACL.translate() needs to change. The rest of billing is insulated. Billing's domain language is stable even as auth evolves.
ACL for Inbound Events
The same principle applies to domain events received from other modules. The event arrives as a dict (from the outbox). The ACL translates it into a local domain object before the handler processes it:
# billing/event_handlers.py
from .acl import OrderPlacedACL
async def handle_order_placed_billing(payload: dict) -> None:
# Translate external event payload into billing's own representation
billing_event = OrderPlacedACL.from_payload(payload)
# Handler works entirely in billing domain language
await billing_service.record_transaction(
customer_id=billing_event.customer_id,
reference=billing_event.billing_reference,
amount=billing_event.billable_amount,
currency=billing_event.currency,
)
# billing/acl.py
@dataclass(frozen=True)
class BillingOrderEvent:
"""Billing's local interpretation of an order event."""
customer_id: UUID
billing_reference: str # billing's name for this concept
billable_amount: Decimal
currency: str
class OrderPlacedACL:
@staticmethod
def from_payload(payload: dict) -> BillingOrderEvent:
return BillingOrderEvent(
customer_id=UUID(payload["user_id"]),
billing_reference=f"ORD-{payload['order_id'][:8].upper()}",
billable_amount=Decimal(str(payload["total_amount"])),
currency=payload.get("currency", "KES"),
)
Choosing the Right Strategy
| Scenario | Strategy | Why |
|---|---|---|
| Read data from another module before deciding | Direct call | Synchronous result needed; failure should abort the operation |
| Critical transactional side effect (stock reservation) | Direct call inside UoW | Must succeed or roll back together |
| Non-critical side effect (notification, analytics) | Outbox | Failure should not affect the originating operation |
| Financial transaction recording | Outbox | Cannot lose this event; must be durable |
| Fan-out to multiple independent consumers | Outbox with per-handler rows | Each consumer retries independently |
| Translating external module data into local domain | ACL | Prevents semantic leakage regardless of communication method |
| Proof of concept or non-critical tooling | In-memory event bus | Acceptable where data loss on crash is tolerable |
The general rule: if losing the event would leave the system in an incorrect or inconsistent state, use the outbox. If the operation is read-only or the side effect genuinely does not matter on failure, a direct call or in-memory event is acceptable.
Section 6: Transaction Management
Why This Section Is Non-Negotiable
Transaction management is where most backend systems make their most expensive mistakes. The errors are not loud — they don't cause immediate 500s or visible crashes. They produce silent data corruption: an order that exists without a payment record, a stock reservation that was never released, a billing event that was processed twice. These bugs surface days or weeks later in financial reconciliation or customer complaints, and they are extremely difficult to trace back to their origin.
The patterns in this section exist specifically to prevent silent corruption. Every decision here is a trade-off between consistency guarantees and operational complexity. Understanding both sides is required to make the right choice per scenario.
6.1 Unit of Work Pattern
The Problem It Solves
Without a Unit of Work, transaction management leaks into the service layer:
# BAD — transaction management scattered across service methods
class OrderService:
async def create_order(self, user_id: UUID, items: list) -> Order:
conn = await self._pool.acquire()
try:
await conn.execute("BEGIN")
order = await self._repo.create_order(conn, user_id, items)
await self._outbox.enqueue(conn, OrderPlacedEvent(...))
await conn.execute("COMMIT")
return order
except Exception:
await conn.execute("ROLLBACK")
raise
finally:
await self._pool.release(conn)
This is wrong for several reasons. The service now knows what a database transaction is. The connection lifecycle is managed manually in every method. If you forget the try/except in one service method, you have a connection that is never released and a transaction that is never committed or rolled back — a connection leak and a hanging transaction simultaneously.
The Unit of Work encapsulates all of this:
# core/unit_of_work.py
from __future__ import annotations
import asyncpg
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class UnitOfWork:
"""
Manages a single database connection and transaction.
One UoW per request. All repositories share the same connection.
Commit/rollback handled automatically by context manager.
"""
def __init__(self, pool: asyncpg.Pool):
self._pool = pool
self._connection: asyncpg.Connection | None = None
self._transaction: asyncpg.transaction.Transaction | None = None
@property
def connection(self) -> asyncpg.Connection:
if self._connection is None:
raise RuntimeError(
"UnitOfWork not entered. Use 'async with UnitOfWork(pool) as uow'"
)
return self._connection
async def __aenter__(self) -> UnitOfWork:
self._connection = await self._pool.acquire()
self._transaction = self._connection.transaction()
await self._transaction.start()
return self
async def __aexit__(
self,
exc_type: type | None,
exc_val: Exception | None,
exc_tb,
) -> None:
try:
if exc_type is not None:
# Any exception — rollback unconditionally
await self._transaction.rollback()
else:
await self._transaction.commit()
finally:
# Connection always released — even if commit/rollback fails
await self._pool.release(self._connection)
self._connection = None
self._transaction = None
Repository Integration
Every repository in the module receives the connection from the Unit of Work — not from the pool directly:
# modules/orders/repository.py
import asyncpg
from uuid import UUID, uuid4
from .models import Order, OrderItem
class OrderRepository:
"""
All SQL for the orders domain.
Receives a connection — never acquires one itself.
The transaction is managed by the UnitOfWork, not here.
"""
def __init__(self, connection: asyncpg.Connection):
self._conn = connection
async def create_order(
self,
user_id: UUID,
items: list[OrderItem],
reservation_id: UUID,
) -> Order:
order_id = uuid4()
total = sum(i.unit_price * i.quantity for i in items)
await self._conn.execute(
"""
INSERT INTO orders.orders
(id, user_id, status, total_amount, reservation_id, created_at)
VALUES ($1, $2, 'pending', $3, $4, NOW())
""",
order_id, user_id, total, reservation_id,
)
await self._conn.executemany(
"""
INSERT INTO orders.order_items
(id, order_id, product_id, product_name, quantity, unit_price)
VALUES ($1, $2, $3, $4, $5, $6)
""",
[
(uuid4(), order_id, i.product_id, i.product_name,
i.quantity, i.unit_price)
for i in items
],
)
return await self.find_by_id(order_id)
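The find_by_id used in the final line follows the same shape as the batch repository shown earlier. A minimal sketch, assuming the Order model exposes a from_row constructor:
async def find_by_id(self, order_id: UUID) -> Order | None:
    row = await self._conn.fetchrow(
        """
        SELECT id, user_id, status, total_amount, reservation_id, created_at
        FROM orders.orders
        WHERE id = $1
        """,
        order_id,
    )
    return Order.from_row(row) if row else None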
Outbox Writer Shares the Same Connection
This is the critical point. The outbox write and the domain write must be part of the same transaction. They share the same connection:
# core/outbox.py
class OutboxWriter:
def __init__(self, connection: asyncpg.Connection):
self._conn = connection # same connection as the repository
async def enqueue(self, event: DomainEvent) -> None:
await self._conn.execute(
"""
INSERT INTO core.outbox (id, event_type, payload, idempotency_key)
VALUES ($1, $2, $3, $4)
ON CONFLICT (idempotency_key) DO NOTHING
""",
event.event_id,
type(event).__name__,
json.dumps(event.__dict__, default=str),
str(event.event_id),
)
When the transaction commits, both the order row and the outbox row are committed atomically. When the transaction rolls back, both are rolled back. There is no scenario where one succeeds and the other doesn't.
Dependency Injection — Wiring the UoW
# modules/orders/dependencies.py
from fastapi import Depends, Request
from app.core.unit_of_work import UnitOfWork
from app.core.outbox import OutboxWriter
from .repository import OrderRepository
from .service import OrderService
from app.modules.inventory.dependencies import get_inventory_service
async def get_unit_of_work(request: Request) -> UnitOfWork:
return UnitOfWork(pool=request.app.state.pool)
async def get_order_service(
uow: UnitOfWork = Depends(get_unit_of_work),
inventory: IInventoryService = Depends(get_inventory_service),
) -> OrderService:
# All components share the same UoW — same connection, same transaction
repo = OrderRepository(uow.connection)
outbox = OutboxWriter(uow.connection)
return OrderService(
uow=uow,
repository=repo,
outbox=outbox,
inventory=inventory,
)
# modules/orders/service.py
class OrderService:
def __init__(
self,
uow: UnitOfWork,
repository: OrderRepository,
outbox: OutboxWriter,
inventory: IInventoryService,
):
self._uow = uow
self._repo = repository
self._outbox = outbox
self._inventory = inventory
async def create_order(
self,
user_id: UUID,
items: list[OrderItemRequest],
) -> Order:
# Validate outside the transaction — no DB lock held during network call
reservation = await self._inventory.reserve_stock(items)
async with self._uow:
# Transaction open — acquire connection
order = await self._repo.create_order(
user_id=user_id,
items=items,
reservation_id=reservation.id,
)
await self._outbox.enqueue(OrderPlacedEvent(
order_id=order.id,
user_id=user_id,
total_amount=order.total_amount,
))
# Transaction commits here — both writes atomic
return order
Savepoints — Nested Operations Within a Transaction
Sometimes you need to attempt a sub-operation within a transaction and roll it back on failure without rolling back the entire transaction. PostgreSQL supports this via savepoints:
# Attempting a non-critical sub-operation within a larger transaction
async def create_order_with_optional_loyalty(
self,
user_id: UUID,
items: list[OrderItemRequest],
) -> Order:
async with self._uow:
order = await self._repo.create_order(user_id=user_id, items=items)
# Attempt loyalty points award — non-critical, should not abort the order
try:
            async with self._uow.connection.transaction():
# This creates a savepoint — if it fails, only this block rolls back
await self._loyalty_repo.award_points(
user_id=user_id,
order_amount=order.total_amount,
)
except LoyaltyServiceError as e:
# Savepoint rolled back automatically — outer transaction intact
logger.warning(f"Loyalty points award failed for order {order.id}: {e}")
await self._outbox.enqueue(OrderPlacedEvent(order_id=order.id, ...))
# Outer transaction commits — order and outbox committed, loyalty may not be
return order
asyncpg implements nested connection.transaction() calls as savepoints automatically when an outer transaction is already active. This is the correct pattern for operations that are best-effort within a larger critical transaction.
6.2 Strong vs Eventual Consistency
This is the most consequential architectural decision you make per use case. There is no universal answer — the correct choice depends entirely on the business requirement.
Strong Consistency — Single Transaction
Strong consistency means that when an operation completes, all effects are immediately visible and in a consistent state. No partial states. No windows where the data is incomplete.
# Strong consistency — everything in one transaction
async def transfer_funds(
self,
from_account_id: UUID,
to_account_id: UUID,
amount: Decimal,
) -> Transfer:
async with self._uow:
# Both debit and credit in the same transaction
# Either both happen or neither happens — no partial transfer
from_account = await self._repo.get_account_for_update(from_account_id)
if from_account.balance < amount:
raise InsufficientFundsError(from_account_id, amount)
await self._repo.debit_account(from_account_id, amount)
await self._repo.credit_account(to_account_id, amount)
transfer = await self._repo.record_transfer(
from_account_id, to_account_id, amount
)
await self._outbox.enqueue(FundsTransferredEvent(
transfer_id=transfer.id,
amount=amount,
))
return transfer
SELECT ... FOR UPDATE (inside get_account_for_update) acquires a row-level lock on from_account. No other transaction can modify that account until this transaction commits or rolls back. The debit and credit are atomic.
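The repository side of that lock is a plain SELECT ... FOR UPDATE. A minimal sketch, assuming an Account model with a from_row constructor and a hypothetical AccountNotFoundError:
# billing/repository.py (sketch): the row-locking read used above
async def get_account_for_update(self, account_id: UUID) -> Account:
    row = await self._conn.fetchrow(
        """
        SELECT id, user_id, balance
        FROM billing.accounts
        WHERE id = $1
        FOR UPDATE
        """,
        account_id,
    )
    if row is None:
        raise AccountNotFoundError(account_id)  # hypothetical domain error
    return Account.from_row(row)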
When strong consistency is required:
- Financial operations (debit/credit, fund transfers)
- Stock reservation (reserve and create order together)
- Any operation where a partial state would be invalid by business definition
- Operations involving exactly one database within one schema boundary
The limits of strong consistency:
A single database transaction cannot span multiple services, multiple database instances, or external API calls. Attempting to include an HTTP call inside a transaction is a serious mistake covered in 6.3.
Eventual Consistency — Event-Driven
Eventual consistency means the system will reach a correct state, but not immediately. There is a window after the primary operation commits where downstream effects have not yet propagated. During that window, the system is in a transitionally inconsistent state — but it is guaranteed to resolve.
# Eventual consistency — primary write commits, downstream effects via outbox
async def place_order(self, user_id: UUID, items: list) -> Order:
async with self._uow:
order = await self._repo.create_order(user_id=user_id, items=items)
# These events are written to the outbox — same transaction as the order
# Their handlers run asynchronously, after this transaction commits
await self._outbox.enqueue(OrderPlacedEvent(
order_id=order.id, user_id=user_id, total_amount=order.total_amount
))
# After commit: order exists in DB, events pending in outbox
# Billing handler hasn't run yet — billing record doesn't exist yet
# Notification handler hasn't run yet — email not sent yet
# Inventory update hasn't run yet — stock not adjusted yet
# These will all resolve within seconds — eventually consistent
return order
The outbox worker processes the events after the transaction commits. Within seconds to minutes (depending on worker polling interval and load), all downstream effects complete. If a handler fails, it retries with backoff. The system converges to a consistent state.
When eventual consistency is appropriate:
- Sending notifications (email, SMS, push) — a slight delay is invisible to users
- Updating analytics or reporting tables — staleness of seconds is acceptable
- Propagating read model updates across modules
- Any effect where the business can tolerate a brief window of inconsistency
- Operations that span module boundaries where a single transaction is not possible
Consistency Failure Scenarios
Understanding what happens when each approach fails is as important as knowing when to use each.
Strong consistency failure:
1. Transaction begins
2. Debit account A ✓
3. Credit account B ✗ — account B does not exist
4. Exception raised
5. Entire transaction rolls back ✓ — account A balance unchanged
6. Client receives 400 Bad Request
This is the correct behavior. The failure is immediate, visible, and leaves no partial state.
Eventual consistency failure scenarios:
Scenario A — Outbox write succeeds, handler fails repeatedly:
1. Order created ✓
2. OrderPlacedEvent in outbox ✓
3. Billing handler called ✗ — billing service bug
4. Event retried 5 times ✗ — still failing
5. Event marked dead ⚠️
6. Billing record never created ⚠️
Resolution: Human investigates dead letter, fixes bug, replays event manually
Scenario B — Handler is not idempotent, event processed twice:
1. OrderPlacedEvent delivered to billing handler
2. Billing record created ✓
3. Handler crashes before marking outbox 'delivered'
4. Worker retries — handler called again
5. Billing record created again ✗ — duplicate charge
Resolution: Idempotency key on billing records — ON CONFLICT DO NOTHING
Scenario B is the most insidious. Every event handler must be idempotent — processing the same event twice must produce the same result as processing it once:
# billing/event_handlers.py — IDEMPOTENT handler
async def handle_order_placed_billing(payload: dict) -> None:
order_id = UUID(payload["order_id"])
await billing_repo.create_transaction_if_not_exists(
reference_id=order_id, # idempotency key
amount=Decimal(payload["total_amount"]),
user_id=UUID(payload["user_id"]),
)
-- billing/repository — idempotent insert
INSERT INTO billing.transactions (id, reference_id, amount, user_id, created_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (reference_id) DO NOTHING;
-- If the transaction already exists for this order_id, do nothing — safe to retry
The Consistency Decision Matrix
| Operation | Consistency Model | Reason |
|---|---|---|
| Fund transfer (debit + credit) | Strong | Partial transfer is never acceptable |
| Order creation + stock reservation | Strong | Order without reservation is invalid |
| Order creation + email notification | Eventual | Email delay is acceptable |
| Order creation + billing record | Eventual (with dead-letter alert) | Billing must eventually happen; delay is acceptable |
| User registration + welcome email | Eventual | Email is a side effect, not part of registration |
| Updating search index after product change | Eventual | Index staleness of seconds is acceptable |
| Inventory adjustment after order placed | Eventual | Brief over-selling risk vs. transactional complexity tradeoff |
6.3 Performance Constraints
Transaction management has direct performance implications that are frequently misunderstood until they cause production incidents.
Never Include External I/O Inside a Transaction
This is the single most important rule in this section.
# DANGEROUS — external HTTP call inside a transaction
async def create_order(self, user_id: UUID, items: list) -> Order:
async with self._uow: # transaction opens here
order = await self._repo.create_order(user_id=user_id, items=items)
# WRONG: HTTP call to M-Pesa Daraja API inside the transaction
# Transaction holds DB locks while waiting for external response
payment = await self._mpesa_client.initiate_stk_push(
phone=user.phone,
amount=order.total_amount,
reference=str(order.id),
)
await self._repo.record_payment_initiation(order.id, payment.checkout_id)
# transaction commits — DB locks held for the entire M-Pesa round trip
M-Pesa Daraja API response time: 200ms to 3 seconds. During that entire window, the database transaction is open and holding row-level locks. Every other query that needs those rows must wait.
At 50 concurrent requests, you have 50 open transactions, each waiting for an external HTTP response. Your database connection pool — typically 20–50 connections — is exhausted. New requests queue waiting for a connection. Latency spikes. The system degrades under load that should be entirely manageable.
The correct pattern — validate outside, write inside:
# CORRECT — external call before transaction, write result inside
async def initiate_payment(self, order_id: UUID, user_phone: str) -> PaymentInitiation:
order = await self._repo.find_by_id(order_id)
if order is None:
raise OrderNotFoundError(order_id)
# External call OUTSIDE the transaction — no locks held during network wait
try:
payment = await self._mpesa_client.initiate_stk_push(
phone=user_phone,
amount=order.total_amount,
reference=str(order_id),
)
except MpesaAPIError as e:
raise PaymentInitiationError(order_id, reason=str(e)) from e
# Transaction opens AFTER external call completes — locks held for milliseconds
async with self._uow:
initiation = await self._repo.record_payment_initiation(
order_id=order_id,
checkout_request_id=payment.checkout_request_id,
status="pending",
)
await self._outbox.enqueue(PaymentInitiatedEvent(
order_id=order_id,
checkout_id=payment.checkout_request_id,
))
return initiation
The external call completes in whatever time it takes. The transaction opens after, executes a handful of fast SQL statements, and commits within milliseconds. Locks are held for the minimum possible duration.
Locking Strategies
PostgreSQL provides multiple locking granularities. Using the wrong lock level kills concurrency:
-- Row-level lock — locks only this specific row
-- Other transactions can read and write OTHER rows in the same table
SELECT id, balance
FROM billing.accounts
WHERE id = $1
FOR UPDATE;
-- SKIP LOCKED — used by queue workers
-- Returns only unlocked rows — workers don't block each other
SELECT id, event_type, payload
FROM core.outbox
WHERE status = 'pending'
ORDER BY scheduled_at
LIMIT 50
FOR UPDATE SKIP LOCKED;
-- NOWAIT — fails immediately if locked, rather than waiting
-- Use when you cannot afford to wait
SELECT id, balance
FROM billing.accounts
WHERE id = $1
FOR UPDATE NOWAIT;
-- Raises LockNotAvailable immediately if another transaction holds the lock
FOR UPDATE NOWAIT is appropriate for financial operations where you'd rather fail fast and ask the client to retry than queue behind another transaction of unknown duration.
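In application code, the NOWAIT failure surfaces as a lock error that you catch and translate into a retryable response. A minimal sketch, assuming asyncpg's LockNotAvailableError exception class and hypothetical get_account_for_update_nowait / AccountBusyError names:
# billing/service.py (sketch): fail fast on a contended account lock
import asyncpg
from uuid import UUID
from decimal import Decimal

class BillingService:
    async def debit_account_fail_fast(self, account_id: UUID, amount: Decimal) -> None:
        try:
            async with self._uow:
                account = await self._repo.get_account_for_update_nowait(account_id)
                if account.balance < amount:
                    raise InsufficientFundsError(account_id, amount)
                await self._repo.debit_account(account_id, amount)
        except asyncpg.exceptions.LockNotAvailableError as e:
            # NOWAIT refused to wait for the row lock; tell the client to retry shortly
            raise AccountBusyError(account_id) from e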
Deadlocks — How They Occur and How to Prevent Them
A deadlock occurs when two transactions each hold a lock that the other needs:
Transaction A:
1. Lock account 111
2. Waiting for lock on account 222 (held by B)
Transaction B:
1. Lock account 222
2. Waiting for lock on account 111 (held by A)
Neither can proceed. PostgreSQL detects this and kills one transaction.
Prevention strategy — consistent lock ordering:
Always acquire locks in the same order, everywhere in your codebase. If you always lock the lower UUID first, two concurrent transfers between the same pair of accounts cannot deadlock:
async def transfer_funds(
self,
from_account_id: UUID,
to_account_id: UUID,
amount: Decimal,
) -> Transfer:
# Always lock in consistent order — prevents deadlock between concurrent transfers
first_id, second_id = sorted([from_account_id, to_account_id])
async with self._uow:
first = await self._repo.get_account_for_update(first_id)
second = await self._repo.get_account_for_update(second_id)
# Now determine which is source and which is destination
from_account = first if first.id == from_account_id else second
to_account = second if second.id == to_account_id else first
if from_account.balance < amount:
raise InsufficientFundsError(from_account_id, amount)
await self._repo.debit_account(from_account.id, amount)
await self._repo.credit_account(to_account.id, amount)
Transaction Isolation Levels
PostgreSQL defaults to READ COMMITTED — each statement within a transaction sees data committed before that statement began. This is appropriate for most operations.
-- READ COMMITTED (default) — appropriate for most writes
BEGIN;
-- Each statement sees freshly committed data
SELECT balance FROM billing.accounts WHERE id = $1; -- sees committed data
UPDATE billing.accounts SET balance = balance - 100 WHERE id = $1;
COMMIT;
For financial operations where you need to reason about a consistent snapshot across multiple reads:
-- REPEATABLE READ — snapshot taken at transaction start
-- All reads within the transaction see the same snapshot
-- Protects against non-repeatable reads
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT SUM(balance) FROM billing.accounts WHERE user_id = $1;
-- ... complex logic ...
SELECT SUM(balance) FROM billing.accounts WHERE user_id = $1;
-- Both SELECTs return the same result, even if another transaction committed between them
COMMIT;
-- SERIALIZABLE — strongest isolation
-- Transactions execute as if they ran serially, one after another
-- Prevents phantom reads and write skew
-- Performance cost: serialization failures require retry logic
BEGIN ISOLATION LEVEL SERIALIZABLE;
-- ...
COMMIT;
-- May raise: ERROR: could not serialize access due to concurrent update
-- Application must detect this and retry
In asyncpg:
# Isolation can only be chosen on the outermost transaction; do not set it on a nested (savepoint) transaction
async with self._uow.connection.transaction(isolation="repeatable_read"):
balance_before = await self._repo.get_balance(account_id)
# ... validation logic ...
await self._repo.update_balance(account_id, new_balance)
For a production financial system, REPEATABLE READ for multi-step financial calculations and READ COMMITTED for standard CRUD operations is the correct default combination.
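If you do opt into SERIALIZABLE for a specific operation, the retry loop mentioned above is straightforward. A minimal sketch, assuming asyncpg's SerializationError exception class; the retry bound is illustrative:
# core/serializable.py (sketch): retry wrapper for SERIALIZABLE transactions
import asyncpg

MAX_RETRIES = 3

async def run_serializable(pool: asyncpg.Pool, operation) -> None:
    """operation is an async callable taking a connection; it must be safe to re-run."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            async with pool.acquire() as conn:
                async with conn.transaction(isolation="serializable"):
                    await operation(conn)
            return
        except asyncpg.exceptions.SerializationError:
            # Another transaction committed a conflicting change; retry from scratch
            if attempt == MAX_RETRIES:
                raise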
Connection Pool Sizing
The connection pool is a finite resource. Every open transaction holds one connection. If your average transaction duration is 50ms and your pool has 20 connections, your theoretical maximum throughput is 400 transactions per second before requests begin queuing for a connection.
# main.py — pool configuration
app.state.pool = await asyncpg.create_pool(
dsn=settings.database_url,
min_size=5, # connections maintained at idle
max_size=20, # maximum concurrent connections
max_inactive_connection_lifetime=300, # recycle idle connections after 5 min
command_timeout=30, # kill queries running longer than 30 seconds
statement_cache_size=100, # cache prepared statements
)
PostgreSQL's default max_connections is 100. With PgBouncer in transaction pooling mode (covered in Section 7), you can support many more application connections sharing a smaller pool of actual PostgreSQL connections. Without PgBouncer, sizing the pool conservatively — leaving headroom for migrations, monitoring queries, and admin connections — is essential.
The rule: Keep transactions short. Do not hold locks while waiting for anything external. Your connection pool is a shared resource, and one slow transaction that holds a connection for 5 seconds under load can cascade into full pool exhaustion across the entire system.
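You can also enforce this rule at the server, so a forgotten or idle-in-transaction connection is terminated instead of quietly holding locks and a pool slot. A sketch using standard PostgreSQL settings passed through asyncpg's server_settings; the values are illustrative:
# main.py (sketch): server-side guards in addition to the pool limits above
app.state.pool = await asyncpg.create_pool(
    dsn=settings.database_url,
    min_size=5,
    max_size=20,
    server_settings={
        "idle_in_transaction_session_timeout": "30000",  # kill transactions idle > 30s
        "lock_timeout": "5000",                          # stop waiting for a lock after 5s
    },
)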
Section 7: Infrastructure & Deployment (Docker)
7.1 Why a Modular Monolith Fits Single Container Deployment
The modular monolith's deployment story is its most underappreciated advantage. One codebase produces one container image. That image is the unit of deployment, scaling, and rollback.
Compare this to microservices: 8 services means 8 container images, 8 CI pipelines, 8 deployment sequences that must be coordinated when a cross-service feature ships, and 8 sets of infrastructure to monitor. A bug fix to a shared library requires rebuilding and redeploying all 8. A rollback requires coordinating the rollback of whichever subset of services was involved.
With a modular monolith:
git push → CI builds one image → deploy one container → done
rollback → redeploy previous image tag → done
This is not a limitation. For a team of 2–10 engineers building toward a 2027 employment target, operational simplicity is a competitive advantage. Every hour not spent debugging a service mesh is an hour spent building product.
The single container model holds until one of these conditions is met:
- A specific module has dramatically different resource requirements than the rest (CPU-bound video processing alongside lightweight CRUD endpoints)
- Independent deployment velocity is required by genuinely separate teams
- Traffic to one module requires scaling that would waste resources on other modules
None of these conditions apply in your current phase. Build for a single container. Design the module boundaries such that extraction is possible when the time comes.
7.2 Production Dockerfile
The Dockerfile is not a development artifact. It is a specification of your production runtime. Every decision in it has consequences.
# ============================================================
# Stage 1: Builder — installs dependencies; its build tools never reach the final image
# ============================================================
FROM python:3.12.2-slim-bookworm AS builder
# Prevents Python from writing .pyc files — saves space in image
ENV PYTHONDONTWRITEBYTECODE=1
# Prevents Python from buffering stdout/stderr — logs appear immediately
ENV PYTHONUNBUFFERED=1
WORKDIR /build
# Install build dependencies — only needed at build time, not runtime
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first — Docker layer cache
# If requirements.txt doesn't change, pip install is skipped on rebuild
COPY requirements.txt .
# Install into a prefix directory — easy to copy to final stage
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# ============================================================
# Stage 2: Runtime — minimal image, no build tools
# ============================================================
FROM python:3.12.2-slim-bookworm AS runtime
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
# Runtime system dependencies only (libpq for asyncpg)
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create non-root user — never run production containers as root
RUN groupadd --gid 1001 appgroup && \
useradd --uid 1001 --gid appgroup --shell /bin/bash --create-home appuser
WORKDIR /app
# Copy installed packages from builder stage
COPY --from=builder /install /usr/local
# Copy application code
COPY --chown=appuser:appgroup . .
# Switch to non-root user
USER appuser
# Health check — Docker and orchestrators use this to determine container health
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Expose the application port
EXPOSE 8000
# Default entrypoint — overridden per service in docker-compose / K8s
CMD ["gunicorn", "app.main:app", \
"--worker-class", "uvicorn.workers.UvicornWorker", \
"--workers", "4", \
"--bind", "0.0.0.0:8000", \
"--timeout", "30", \
"--keep-alive", "5", \
"--access-logfile", "-", \
"--error-logfile", "-", \
"--log-level", "info"]
Why Multi-Stage Build
The builder stage installs gcc and libpq-dev — compilation tools needed to build Python packages that include C extensions. These tools are not needed at runtime.
Without multi-stage build, your production image contains:
- gcc compiler
- build headers
- intermediate build artifacts
- pip cache
This adds ~200MB to the image and — more critically — expands the attack surface. A container with a compiler is a significantly more useful target for an attacker who achieves code execution than a container without one.
The final runtime image contains only what is needed to run the application. Smaller, faster to pull, more secure.
7.3 Gunicorn + Uvicorn Worker Architecture
FastAPI is an ASGI application. To serve it in production, you need two components:
- Uvicorn: an ASGI server that handles async Python correctly, understands WebSockets, and runs the event loop
- Gunicorn: a mature process manager that manages multiple worker processes, handles graceful shutdown, restarts crashed workers, and integrates with operating system signals
Gunicorn cannot run ASGI applications natively. Uvicorn alone cannot manage multiple processes. Together, via UvicornWorker, they form the standard production configuration:
┌─────────────────────────────────┐
│ Gunicorn Master │
│ Process manager, signal handler│
└──────────────┬──────────────────┘
│ forks
┌────────────────────┼────────────────────┐
↓ ↓ ↓
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ UvicornWorker 1 │ │ UvicornWorker 2 │ │ UvicornWorker 3 │
│ (own event │ │ (own event │ │ (own event │
│ loop) │ │ loop) │ │ loop) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Each Uvicorn worker is an independent process with its own Python interpreter, its own event loop, and its own connection pool. There is no shared memory between workers. This is what makes horizontal scaling safe — adding more workers (or more container instances) does not require any shared state coordination.
Worker Count Formula
workers = (CPU cores × 2) + 1
On a 2-core server: 5 workers. On a 4-core server: 9 workers. The +1 provides a spare worker during graceful restarts so traffic is never dropped while a worker is being recycled.
This formula assumes your workers are I/O bound — which they are. FastAPI with asyncpg spends most of its time waiting for database responses, not burning CPU. Async I/O means a single worker can handle many concurrent requests while waiting for the database. The worker count provides true parallelism across CPU cores, while asyncio provides concurrency within each worker.
Gunicorn Configuration File
Rather than passing every flag on the command line, use a configuration file:
# gunicorn.conf.py
import multiprocessing
import os
# Worker configuration
worker_class = "uvicorn.workers.UvicornWorker"
workers = int(os.environ.get("GUNICORN_WORKERS", (multiprocessing.cpu_count() * 2) + 1))
worker_connections = 1000 # max concurrent connections per worker
# Binding
bind = f"0.0.0.0:{os.environ.get('PORT', '8000')}"
# Timeouts
timeout = 30 # kill worker if request takes longer than 30s
graceful_timeout = 30 # time to finish in-flight requests on SIGTERM
keepalive = 5 # keep-alive connections
# Logging
accesslog = "-" # stdout
errorlog = "-" # stdout
loglevel = os.environ.get("LOG_LEVEL", "info")
access_log_format = (
'{"time":"%(t)s","method":"%(m)s","path":"%(U)s","status":%(s)s,'
'"duration":%(D)s,"bytes":%(b)s,"remote":"%({X-Forwarded-For}i)s"}'
)
# Process naming
proc_name = "kukufiti-api"
# Restart workers after N requests — prevents memory leaks from accumulating
max_requests = 1000
max_requests_jitter = 100 # randomizes restart timing to avoid thundering herd
# In the Dockerfile, the CMD then becomes:
CMD ["gunicorn", "app.main:app", "--config", "gunicorn.conf.py"]
max_requests is important. Python processes accumulate memory over time — especially if any dependency has a memory leak. Recycling workers after a set number of requests keeps memory usage bounded without a full container restart. max_requests_jitter staggers the restarts so all workers don't restart simultaneously.
7.4 docker-compose for Local Development
Development needs the application, the database, and PgBouncer (covered in 7.6). A docker-compose.yml provides the full local environment with a single command:
# docker-compose.yml
version: "3.9"
services:
# ─── PostgreSQL ──────────────────────────────────────────────────────────────
postgres:
image: postgres:16-alpine
container_name: kukufiti_postgres
environment:
POSTGRES_DB: kukufiti_dev
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres_local
volumes:
- postgres_data:/var/lib/postgresql/data
- ./scripts/init_db.sql:/docker-entrypoint-initdb.d/01_init.sql
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d kukufiti_dev"]
interval: 10s
timeout: 5s
retries: 5
networks:
- kukufiti_network
# ─── PgBouncer ───────────────────────────────────────────────────────────────
pgbouncer:
image: pgbouncer/pgbouncer:1.22.0
container_name: kukufiti_pgbouncer
environment:
DATABASES_HOST: postgres
DATABASES_PORT: 5432
DATABASES_DBNAME: kukufiti_dev
PGBOUNCER_POOL_MODE: transaction
PGBOUNCER_MAX_CLIENT_CONN: 200
PGBOUNCER_DEFAULT_POOL_SIZE: 20
PGBOUNCER_AUTH_TYPE: scram-sha-256
PGBOUNCER_AUTH_FILE: /etc/pgbouncer/userlist.txt
volumes:
- ./config/pgbouncer/userlist.txt:/etc/pgbouncer/userlist.txt:ro
- ./config/pgbouncer/pgbouncer.ini:/etc/pgbouncer/pgbouncer.ini:ro
ports:
- "5433:5432" # app connects to 5433 → PgBouncer → Postgres 5432
depends_on:
postgres:
condition: service_healthy
networks:
- kukufiti_network
# ─── API Server ──────────────────────────────────────────────────────────────
api:
build:
context: .
dockerfile: Dockerfile
target: runtime
container_name: kukufiti_api
env_file:
- .env.local
ports:
- "8000:8000"
depends_on:
pgbouncer:
condition: service_started
postgres:
condition: service_healthy
volumes:
# Hot reload in development — mount source code
- ./app:/app/app
command: >
uvicorn app.main:app
--host 0.0.0.0
--port 8000
--reload
--reload-dir /app/app
networks:
- kukufiti_network
# ─── Outbox Worker ───────────────────────────────────────────────────────────
outbox_worker:
build:
context: .
dockerfile: Dockerfile
target: runtime
container_name: kukufiti_outbox_worker
env_file:
- .env.local
depends_on:
postgres:
condition: service_healthy
# Same image, different entrypoint
command: ["python", "-m", "app.workers.outbox_worker"]
networks:
- kukufiti_network
# ─── Migration Runner ─────────────────────────────────────────────────────────
migrate:
build:
context: .
dockerfile: Dockerfile
target: runtime
container_name: kukufiti_migrate
env_file:
- .env.local
depends_on:
postgres:
condition: service_healthy
command: ["python", "-m", "app.scripts.run_migrations"]
# Run once and exit — not a long-lived service
restart: "no"
networks:
- kukufiti_network
volumes:
postgres_data:
networks:
kukufiti_network:
driver: bridge
Database Initialization Script
-- scripts/init_db.sql
-- Runs once on first postgres container start
-- Create module schemas
CREATE SCHEMA IF NOT EXISTS auth;
CREATE SCHEMA IF NOT EXISTS orders;
CREATE SCHEMA IF NOT EXISTS billing;
CREATE SCHEMA IF NOT EXISTS inventory;
CREATE SCHEMA IF NOT EXISTS core;
-- Create application roles
CREATE ROLE auth_app WITH LOGIN PASSWORD 'auth_dev_password';
CREATE ROLE orders_app WITH LOGIN PASSWORD 'orders_dev_password';
CREATE ROLE billing_app WITH LOGIN PASSWORD 'billing_dev_password';
CREATE ROLE inventory_app WITH LOGIN PASSWORD 'inventory_dev_password';
CREATE ROLE core_app WITH LOGIN PASSWORD 'core_dev_password';
-- Grant schema permissions
GRANT USAGE ON SCHEMA auth TO auth_app;
GRANT ALL ON SCHEMA auth TO auth_app;
GRANT USAGE ON SCHEMA orders TO orders_app;
GRANT ALL ON SCHEMA orders TO orders_app;
GRANT USAGE ON SCHEMA core TO core_app, auth_app, orders_app, billing_app;
-- Allow all module roles to write to the outbox (in core schema)
GRANT INSERT ON ALL TABLES IN SCHEMA core TO auth_app, orders_app, billing_app, inventory_app;
ALTER DEFAULT PRIVILEGES IN SCHEMA core
GRANT INSERT ON TABLES TO auth_app, orders_app, billing_app, inventory_app;
7.5 Background Workers — Same Image, Different Entrypoint
This is the deployment pattern that makes the modular monolith operationally elegant. The outbox worker, scheduled tasks, and report generators all use the same Docker image as the API server. No separate codebase to maintain, no separate deployment pipeline, no version skew between the API and the workers.
kukufiti:v1.4.2 (same image)
├── running as API server → CMD gunicorn app.main:app
├── running as outbox worker → CMD python -m app.workers.outbox_worker
└── running as scheduled tasks → CMD python -m app.workers.scheduler
# app/workers/outbox_worker.py
"""
Outbox worker entrypoint.
Run as: python -m app.workers.outbox_worker
Deployed as the same Docker image as the API server, different CMD.
"""
import asyncio
import logging
from app.core.config import settings
from app.core.logging import configure_logging
from app.core.database import create_pool
from app.workers.outbox_processor import process_outbox
logger = logging.getLogger(__name__)
async def main() -> None:
configure_logging()
logger.info("Outbox worker starting")
pool = await create_pool(settings.database_url)
try:
logger.info("Outbox worker ready — polling for events")
await process_outbox(pool)
except KeyboardInterrupt:
logger.info("Outbox worker shutting down")
finally:
await pool.close()
logger.info("Outbox worker stopped")
if __name__ == "__main__":
asyncio.run(main())
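One caveat for containers: docker stop and orchestrators deliver SIGTERM, not KeyboardInterrupt, so the handler above never fires on a normal shutdown. Below is a sketch of a variation that translates SIGTERM into task cancellation; it assumes process_outbox tolerates being cancelled between polling iterations and reuses the imports from the module above.
# Variation of main() that also handles SIGTERM (sketch, not the project's actual code)
import signal

async def main() -> None:
    configure_logging()
    pool = await create_pool(settings.database_url)
    worker_task = asyncio.create_task(process_outbox(pool))
    # docker stop / orchestrators send SIGTERM: cancel the polling loop so cleanup still runs
    asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, worker_task.cancel)
    try:
        await worker_task
    except asyncio.CancelledError:
        logger.info("Outbox worker received SIGTERM, shutting down")
    finally:
        await pool.close()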
# app/workers/scheduler.py
"""
Scheduled task runner — cron-style jobs without cron.
Runs as same image, different CMD.
"""
import asyncio
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.core.config import settings
from app.core.database import create_pool
from app.modules.batches.tasks import (
    refresh_farm_performance_summary,
    send_daily_mortality_alerts,
)
logger = logging.getLogger(__name__)
async def main() -> None:
pool = await create_pool(settings.database_url)
scheduler = AsyncIOScheduler()
# Refresh materialized view every 15 minutes
scheduler.add_job(
refresh_farm_performance_summary,
trigger="interval",
minutes=15,
args=[pool],
id="refresh_farm_performance",
replace_existing=True,
)
# Daily mortality alerts at 8am Nairobi time
scheduler.add_job(
send_daily_mortality_alerts,
trigger="cron",
hour=8,
minute=0,
timezone="Africa/Nairobi",
args=[pool],
id="daily_mortality_alerts",
replace_existing=True,
)
scheduler.start()
logger.info("Scheduler started")
try:
await asyncio.Event().wait() # run forever
finally:
scheduler.shutdown()
await pool.close()

if __name__ == "__main__":
    asyncio.run(main())
In production (Railway, Render, or a VPS), you run separate services from the same image:
# railway.toml
[build]
dockerfilePath = "Dockerfile"
[[services]]
name = "api"
startCommand = "gunicorn app.main:app --config gunicorn.conf.py"
[[services]]
name = "outbox-worker"
startCommand = "python -m app.workers.outbox_worker"
[[services]]
name = "scheduler"
startCommand = "python -m app.workers.scheduler"
All three use the same image, the same environment variables, and deploy atomically on the same release. There is no version mismatch possible between the API and the workers.
7.6 Connection Pooling — The Critical Section
This is the most operationally important topic in this section. Connection pooling failures are responsible for a disproportionate share of production incidents in Python backend systems.
The Async Connection Explosion Problem
PostgreSQL maintains a process per connection. Each connection has a memory overhead of ~5–10MB on the database server. The default max_connections in PostgreSQL is 100.
An asyncio-based application like FastAPI can handle thousands of concurrent requests in a single process. Without pooling constraints, each request that needs a database connection requests one directly from PostgreSQL:
1000 concurrent requests
× 4 Uvicorn workers
= potentially 4000 simultaneous connection attempts to PostgreSQL
PostgreSQL max_connections = 100
Result: 3900 connections refused → cascade of 500 errors
The asyncpg connection pool inside each worker limits this — but only per worker. With 4 workers each configured for max_size=20, you have 80 potential simultaneous PostgreSQL connections — within safe limits. But add horizontal scaling (multiple container instances), or background workers, and the total connection count grows rapidly.
3 API containers × 4 workers × 20 pool connections = 240 connections
+ 2 outbox workers × 5 pool connections = 10 connections
+ scheduler × 5 pool connections = 5 connections
Total: 255 connections
PostgreSQL max_connections = 100 → EXHAUSTED
PgBouncer Architecture
PgBouncer is a lightweight PostgreSQL connection pooler. It sits between your application and PostgreSQL, maintaining a small pool of actual PostgreSQL connections and multiplexing many application connections onto them:
Application Layer
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ 20 conns │ │ 20 conns │ │ 20 conns │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└──────────────┼──────────────┘
│
┌─────▼──────┐
│ PgBouncer │ ← accepts up to 200 client connections
│ │ ← maintains 20 server connections
└─────┬───────┘
│
┌─────▼───────┐
│ PostgreSQL │ ← sees only 20 connections regardless
│ │ of how many application clients exist
└─────────────┘
200 application connections share 20 PostgreSQL connections. PostgreSQL sees a constant, predictable load regardless of how many workers or container instances you add.
Pooling Modes — This Decision Matters
PgBouncer has three pooling modes. The differences between them are critical to understand:
Session pooling: A PostgreSQL connection is assigned to a client for the entire duration of the client's session (connection lifecycle). When the client disconnects, the PostgreSQL connection is returned to the pool. This is the most compatible mode — all PostgreSQL features work. It is also the least efficient for async applications, because the application holds the PostgreSQL connection even when it is not actively executing a query.
Transaction pooling (recommended for asyncpg): A PostgreSQL connection is assigned to a client only while a transaction is active. When the transaction commits or rolls back, the connection is returned to the pool immediately. A single PostgreSQL connection can serve many clients in sequence. This is the correct mode for asyncpg applications — connections are held only during actual database work.
Statement pooling: A connection is returned to the pool after each individual statement. This breaks transactions (you cannot execute BEGIN; UPDATE ...; COMMIT; across multiple connections). Not usable for transactional applications.
The compatibility constraint with transaction pooling: Because the PostgreSQL connection is reassigned between transactions, some features that assume a persistent connection do not work in transaction pooling mode:
- SET statements (session-level configuration)
- Advisory locks (held at session level)
- LISTEN/NOTIFY (requires persistent session)
- Prepared statements (in some configurations)
For asyncpg, prepared statements are managed per-connection. With PgBouncer in transaction mode, you must disable prepared statement caching in asyncpg:
# core/database.py
pool = await asyncpg.create_pool(
dsn=settings.database_url, # points to PgBouncer, not Postgres directly
min_size=5,
max_size=20,
statement_cache_size=0, # REQUIRED with PgBouncer transaction mode
command_timeout=30,
)
Without statement_cache_size=0, asyncpg caches prepared statement handles per connection. When PgBouncer reassigns the connection to a different backend PostgreSQL connection, the cached statement handle is invalid. Queries fail with cryptic errors.
PgBouncer Configuration
; config/pgbouncer/pgbouncer.ini
[databases]
; Routing rule: any connection to 'kukufiti_dev' is forwarded to postgres:5432
kukufiti_dev = host=postgres port=5432 dbname=kukufiti_dev
[pgbouncer]
; Network
listen_addr = 0.0.0.0
listen_port = 5432
unix_socket_dir =
; Pooling
pool_mode = transaction
max_client_conn = 200 ; total application connections PgBouncer accepts
default_pool_size = 20 ; PostgreSQL connections per database/user pair
reserve_pool_size = 5 ; extra connections for sudden spikes
reserve_pool_timeout = 3 ; seconds before using reserve pool
; Authentication
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt
; Timeouts
server_idle_timeout = 600 ; return idle server connection to pool after 10 min
client_idle_timeout = 0 ; no timeout for idle clients (application manages this)
query_timeout = 30 ; kill queries running longer than 30s
query_wait_timeout = 15 ; fail if waiting for connection longer than 15s
; Logging
log_connections = 0 ; don't log every connection — high noise at scale
log_disconnections = 0
log_pooler_errors = 1 ; always log pooling errors
stats_period = 60 ; emit stats every 60 seconds
; Admin
admin_users = pgbouncer_admin
; config/pgbouncer/userlist.txt
; Format: "username" "scram-sha-256 hash of password"
; Generate hash: psql -c "SELECT concat('\"', usename, '\" \"', passwd, '\"') FROM pg_shadow WHERE usename='auth_app';"
"auth_app" "SCRAM-SHA-256$4096:..."
"orders_app" "SCRAM-SHA-256$4096:..."
"billing_app" "SCRAM-SHA-256$4096:..."
Monitoring PgBouncer
PgBouncer exposes statistics via a virtual database called pgbouncer. Connect to it with psql -p 5433 -U pgbouncer_admin pgbouncer and run:
-- Current pool state
SHOW POOLS;
-- Returns: database, user, cl_active, cl_waiting, sv_active, sv_idle, sv_used
-- Overall statistics
SHOW STATS;
-- Returns: requests/sec, bytes in/out, avg query time
-- Client connections
SHOW CLIENTS;
-- Server connections (actual PostgreSQL connections)
SHOW SERVERS;
Key metrics to watch:
- cl_waiting: clients waiting for a server connection — if this is consistently above 0, your pool is undersized
- sv_active: server connections currently executing a query
- sv_idle: server connections in the pool, not currently used — if always 0, you are at capacity
- avg_query_time: average query duration from PgBouncer's perspective — includes wait time for a connection
Environment Configuration
# .env.local — development
DATABASE_URL=postgresql://orders_app:orders_dev_password@pgbouncer:5432/kukufiti_dev
AUTH_DB_URL=postgresql://auth_app:auth_dev_password@pgbouncer:5432/kukufiti_dev
ORDERS_DB_URL=postgresql://orders_app:orders_dev_password@pgbouncer:5432/kukufiti_dev
BILLING_DB_URL=postgresql://billing_app:billing_dev_password@pgbouncer:5432/kukufiti_dev
# Note: all URLs point to PgBouncer (port 5433 on host, 5432 inside Docker network)
# PgBouncer routes to PostgreSQL internally
GUNICORN_WORKERS=4
LOG_LEVEL=info
SECRET_KEY=dev_secret_key_never_use_in_production
# .env.example — committed to git, no actual values
DATABASE_URL=postgresql://app_user:password@pgbouncer_host:5432/dbname
AUTH_DB_URL=postgresql://auth_app:password@pgbouncer_host:5432/dbname
GUNICORN_WORKERS=4
LOG_LEVEL=info
SECRET_KEY=
.env is in .gitignore. .env.example is committed. This is the production-standard approach — the structure of environment variables is documented without any secrets being exposed.
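For completeness, here is one way the application side can load these variables. This is a sketch assuming pydantic-settings; the real core/config.py is not shown in this document, and only a few of the fields are listed.
# app/core/config.py (sketch using pydantic-settings; field names mirror the variables above)
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env.local", extra="ignore")

    database_url: str                  # DATABASE_URL
    gunicorn_workers: int = 4          # GUNICORN_WORKERS
    log_level: str = "info"            # LOG_LEVEL
    secret_key: str                    # SECRET_KEY: no default, so a missing value fails at startup

settings = Settings()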
Health Check Endpoint
The HEALTHCHECK in the Dockerfile calls this endpoint:
# app/modules/health/router.py
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from app.core.database import get_connection
import asyncpg
router = APIRouter(tags=["health"])
@router.get("/health")
async def health_check(conn: asyncpg.Connection = Depends(get_connection)):
try:
# Verify database connectivity — not just process liveness
await conn.fetchval("SELECT 1")
return {
"status": "healthy",
"database": "connected",
}
except Exception as e:
    # Return 503 — orchestrator will restart the container
    return JSONResponse(
        status_code=503,
        content={"status": "unhealthy", "error": str(e)},
    )
A health check that only verifies the process is running is insufficient. If the database connection pool is exhausted or PgBouncer is unreachable, the container is unhealthy even though the process is alive. The health check must verify the entire critical path is functional.
Complete Deployment Architecture Diagram
Internet
│
▼
[Nginx / Railway Load Balancer]
│
├──────────────────────────────────────┐
▼ ▼
[API Container 1] [API Container 2]
[Gunicorn + 4 UvicornWorkers] [Gunicorn + 4 UvicornWorkers]
│ │
└──────────────┬───────────────────────┘
│
▼
[PgBouncer]
[transaction mode]
[max_client_conn=200]
[pool_size=20]
│
▼
[PostgreSQL 16]
[schemas: auth, orders, billing, inventory, core]
[max_connections=100]
Separately running (same image):
[Outbox Worker Container] ──────► [PgBouncer] ──► [PostgreSQL]
[Scheduler Container] ──────► [PgBouncer] ──► [PostgreSQL]
Every component in this architecture is horizontally scalable by adding more instances. PgBouncer ensures PostgreSQL connection count remains bounded regardless of how many application instances you add. The same image across all containers ensures no version drift.
Section 8: Testing Strategy
Why Testing Is an Architectural Decision
Testing is not something you add after the system is built. The structure of your tests reflects and enforces the structure of your architecture. A test suite that requires standing up the entire application to test a single business rule is telling you that your business logic is not isolated. A test suite that uses SQLite to test PostgreSQL-specific behaviour is lying to you about whether your system works.
The modular monolith's bounded module design makes it naturally testable — if the boundaries are real. Each module has a public contract. Each service has injected dependencies. Each repository is the only layer that touches the database. These properties make it possible to test each concern in isolation, at the appropriate level of fidelity.
The testing strategy has three levels. Each level answers a different question. All three are required.
The Three Levels
| Level | What It Tests | Database | Speed | Confidence |
|---|---|---|---|---|
| Unit | Business logic in isolation | None — dependencies mocked | Milliseconds | High on logic, zero on integration |
| Integration | A module's full stack against a real database | Real PostgreSQL via Testcontainers | Seconds | High on data layer, module-scoped |
| Slice (E2E) | Full request lifecycle through the application | Real PostgreSQL via Testcontainers | Seconds | Highest — tests the real system |
The distribution should be weighted toward slice tests and integration tests, with unit tests reserved for complex, pure business logic. The classic testing pyramid — many unit tests, few integration tests — made sense when integration tests were slow and expensive. With Testcontainers and async pytest, integration tests run in seconds. The argument for heavy unit test coverage at the expense of integration coverage is significantly weaker in a database-heavy backend system.
A test suite with 200 unit tests and 5 slice tests gives you high confidence that your business logic works in isolation and no confidence that your SQL is correct, your migrations are valid, or your endpoints return the right status codes. Invert that ratio.
Why SQLite Is Invalid for Testing PostgreSQL Applications
This point requires its own section because the mistake is extremely common.
SQLite and PostgreSQL are not interchangeable. They differ in:
Type system: PostgreSQL has UUID, JSONB, ARRAY, NUMERIC, TIMESTAMPTZ, ENUM, and dozens of other types. SQLite has TEXT, INTEGER, REAL, BLOB, NULL. A column defined as UUID in PostgreSQL maps to TEXT in SQLite. A column defined as JSONB with jsonb_array_elements() query functions does not exist in SQLite. Tests that pass on SQLite and fail in production on PostgreSQL-specific features give you false confidence.
Constraint enforcement: SQLite enforces FOREIGN KEY constraints only when explicitly enabled (PRAGMA foreign_keys = ON). PostgreSQL enforces them always. Tests that pass on SQLite with disabled foreign keys would fail on PostgreSQL with constraint violations.
Transaction behaviour: SQLite's locking is database-level. PostgreSQL's locking is row-level. Concurrent transaction tests behave differently.
SQL dialect: gen_random_uuid(), NOW(), FOR UPDATE SKIP LOCKED, ARRAY_AGG, json_build_object — all PostgreSQL-specific; SQLite rejects them outright. RETURNING and ON CONFLICT exist in recent SQLite versions, but with semantics and limitations that differ from PostgreSQL's.
Schema-per-module: SQLite has no schema concept. Your entire schema-per-module architecture cannot be tested with SQLite.
The conclusion is simple: test against PostgreSQL. Testcontainers makes this trivially easy.
Test Infrastructure Setup
Testcontainers
Testcontainers is a library that programmatically starts a real PostgreSQL Docker container before your tests run and tears it down after. No manual database setup, no shared test database state, no "works on my machine" problems.
pip install pytest pytest-asyncio testcontainers[postgres] asyncpg
# tests/conftest.py
"""
Root conftest — shared fixtures for all tests.
Testcontainers starts a real PostgreSQL instance for the test session.
"""
import asyncio
import pytest
import asyncpg
from testcontainers.postgres import PostgresContainer
from app.scripts.run_migrations import run_all_migrations
# ─── Event Loop ─────────────────────────────────────────────────────────────
@pytest.fixture(scope="session")
def event_loop():
"""Single event loop for the entire test session."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
# ─── PostgreSQL Container ────────────────────────────────────────────────────
@pytest.fixture(scope="session")
def postgres_container():
"""
Start a real PostgreSQL 16 container for the test session.
Container starts once, is reused across all tests, torn down at the end.
"""
with PostgresContainer(
image="postgres:16-alpine",
username="test_user",
password="test_password",
dbname="test_db",
) as container:
yield container
@pytest.fixture(scope="session")
async def db_pool(postgres_container, event_loop):
"""
Create the database schema and connection pool.
Runs migrations once per test session against the Testcontainer.
"""
dsn = postgres_container.get_connection_url().replace(
"postgresql+psycopg2", "postgresql"
)
# Create schemas and roles
admin_conn = await asyncpg.connect(dsn)
try:
await admin_conn.execute("""
CREATE SCHEMA IF NOT EXISTS auth;
CREATE SCHEMA IF NOT EXISTS orders;
CREATE SCHEMA IF NOT EXISTS billing;
CREATE SCHEMA IF NOT EXISTS inventory;
CREATE SCHEMA IF NOT EXISTS core;
""")
finally:
await admin_conn.close()
# Run all module migrations
await run_all_migrations(dsn)
# Create the connection pool
pool = await asyncpg.create_pool(
dsn=dsn,
min_size=2,
max_size=10,
statement_cache_size=0,
)
yield pool
await pool.close()
# ─── Transaction Isolation Between Tests ─────────────────────────────────────
@pytest.fixture
async def db_conn(db_pool):
"""
Provides a database connection wrapped in a transaction that is
rolled back after each test. This is the critical fixture — it
guarantees test isolation without resetting the database between tests.
"""
async with db_pool.acquire() as conn:
transaction = conn.transaction()
await transaction.start()
yield conn
# Always roll back — test state never persists to the next test
await transaction.rollback()
The transaction rollback pattern is the key insight. Rather than truncating tables or recreating the schema between each test (which is slow), each test runs inside a transaction that is rolled back on completion. The database is in a clean state for every test with zero overhead.
Unit Tests
Unit tests verify pure business logic. They mock all dependencies — repositories, external service contracts, event buses. They run without a database.
The correct targets for unit tests are service methods that contain non-trivial decision logic: discount calculations, eligibility checks, state machine transitions, validation rules.
# tests/unit/test_order_service.py
import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
from app.modules.orders.service import OrderService
from app.modules.orders.exceptions import InsufficientStockError, UserInactiveError, UserNotFoundError
from app.modules.auth.contracts import UserIdentity
from app.modules.inventory.contracts import ProductSnapshot
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def mock_order_repo():
repo = AsyncMock()
repo.create_order.return_value = MagicMock(
id=uuid4(),
total_amount=Decimal("2500.00"),
status="pending",
)
return repo
@pytest.fixture
def mock_user_service():
service = AsyncMock()
service.get_user_identity.return_value = UserIdentity(
user_id=uuid4(),
email="farmer@example.co.ke",
is_active=True,
)
return service
@pytest.fixture
def mock_inventory_service():
service = AsyncMock()
service.get_product_snapshots.return_value = {
uuid4(): ProductSnapshot(
product_id=uuid4(),
name="Day-Old Chick",
price=Decimal("250.00"),
stock_quantity=500,
)
}
return service
@pytest.fixture
def mock_outbox():
return AsyncMock()
@pytest.fixture
def mock_uow():
uow = AsyncMock()
uow.__aenter__ = AsyncMock(return_value=uow)
uow.__aexit__ = AsyncMock(return_value=None)
return uow
@pytest.fixture
def order_service(mock_order_repo, mock_user_service, mock_inventory_service,
mock_outbox, mock_uow):
return OrderService(
uow=mock_uow,
repository=mock_order_repo,
inventory=mock_inventory_service,
user_service=mock_user_service,
outbox=mock_outbox,
)
# ─── Tests ───────────────────────────────────────────────────────────────────
class TestCreateOrder:
async def test_rejects_inactive_user(self, order_service, mock_user_service, mock_order_repo):
mock_user_service.get_user_identity.return_value = UserIdentity(
user_id=uuid4(),
email="inactive@example.co.ke",
is_active=False, # inactive user
)
with pytest.raises(UserInactiveError):
await order_service.create_order(
user_id=uuid4(),
items=[MagicMock(product_id=uuid4(), quantity=10)],
)
# Verify no order was attempted
mock_order_repo.create_order.assert_not_called()
async def test_rejects_when_insufficient_stock(
self, order_service, mock_inventory_service
):
product_id = uuid4()
mock_inventory_service.get_product_snapshots.return_value = {
product_id: ProductSnapshot(
product_id=product_id,
name="Day-Old Chick",
price=Decimal("250.00"),
stock_quantity=5, # only 5 available
)
}
with pytest.raises(InsufficientStockError) as exc_info:
await order_service.create_order(
user_id=uuid4(),
items=[MagicMock(product_id=product_id, quantity=100)], # wants 100
)
assert exc_info.value.product_id == product_id
assert exc_info.value.requested == 100
assert exc_info.value.available == 5
async def test_publishes_order_placed_event_on_success(
self, order_service, mock_outbox
):
await order_service.create_order(
user_id=uuid4(),
items=[MagicMock(product_id=uuid4(), quantity=10)],
)
mock_outbox.enqueue.assert_called_once()
event = mock_outbox.enqueue.call_args[0][0]
assert type(event).__name__ == "OrderPlacedEvent"
async def test_does_not_create_order_if_user_not_found(
self, order_service, mock_user_service, mock_order_repo
):
mock_user_service.get_user_identity.return_value = None
with pytest.raises(UserNotFoundError):
await order_service.create_order(user_id=uuid4(), items=[...])
mock_order_repo.create_order.assert_not_called()
class TestOrderTotalCalculation:
"""
Pure arithmetic logic — no dependencies needed.
Directly test the calculation method.
"""
def test_calculates_total_correctly(self):
items = [
MagicMock(unit_price=Decimal("250.00"), quantity=10),
MagicMock(unit_price=Decimal("180.00"), quantity=5),
]
# 250 × 10 + 180 × 5 = 2500 + 900 = 3400
total = OrderService.calculate_total(items)
assert total == Decimal("3400.00")
def test_handles_single_item(self):
items = [MagicMock(unit_price=Decimal("250.00"), quantity=1)]
assert OrderService.calculate_total(items) == Decimal("250.00")
Integration Tests
Integration tests verify that a module's repository layer — the SQL — works correctly against a real PostgreSQL database. They do not test business logic. They test data persistence, query correctness, constraint enforcement, and index effectiveness.
# tests/integration/test_order_repository.py
import pytest
import asyncpg
from decimal import Decimal
from uuid import uuid4
from datetime import date
from app.modules.orders.repository import OrderRepository
from app.modules.orders.models import OrderItem
# db_conn fixture provides a real connection inside a rolled-back transaction
class TestOrderRepository:
async def test_creates_order_with_items(self, db_conn):
repo = OrderRepository(db_conn)
user_id = uuid4()
product_id = uuid4()
items = [
OrderItem(
product_id=product_id,
product_name="Day-Old Chick",
quantity=100,
unit_price=Decimal("250.00"),
)
]
order = await repo.create_order(
user_id=user_id,
items=items,
reservation_id=uuid4(),
)
assert order.id is not None
assert order.user_id == user_id
assert order.status == "pending"
assert order.total_amount == Decimal("25000.00") # 100 × 250
assert len(order.items) == 1
assert order.items[0].product_name == "Day-Old Chick"
async def test_find_by_id_returns_none_for_missing_order(self, db_conn):
repo = OrderRepository(db_conn)
result = await repo.find_by_id(uuid4())
assert result is None
async def test_list_by_user_returns_only_that_users_orders(self, db_conn):
repo = OrderRepository(db_conn)
user_a = uuid4()
user_b = uuid4()
# Create orders for both users
await repo.create_order(user_id=user_a, items=[...], reservation_id=uuid4())
await repo.create_order(user_id=user_a, items=[...], reservation_id=uuid4())
await repo.create_order(user_id=user_b, items=[...], reservation_id=uuid4())
user_a_orders = await repo.list_by_user(user_a, limit=10, offset=0)
assert len(user_a_orders) == 2
assert all(o.user_id == user_a for o in user_a_orders)
async def test_soft_delete_excludes_from_active_queries(self, db_conn):
repo = OrderRepository(db_conn)
user_id = uuid4()
order = await repo.create_order(user_id=user_id, items=[...], reservation_id=uuid4())
await repo.soft_delete(order.id)
# find_by_id for active orders should not find deleted order
result = await repo.find_by_id(order.id)
assert result is None
# But a raw query should confirm it exists with deleted_at set
row = await db_conn.fetchrow(
"SELECT deleted_at FROM orders.orders WHERE id = $1",
order.id
)
assert row["deleted_at"] is not None
async def test_respects_schema_isolation(self, db_conn):
"""
The orders connection role cannot read from auth schema.
This test verifies RBAC is enforced at the DB level.
"""
with pytest.raises(asyncpg.InsufficientPrivilegeError):
await db_conn.fetch("SELECT * FROM auth.users")
async def test_pagination_returns_correct_page(self, db_conn):
repo = OrderRepository(db_conn)
user_id = uuid4()
# Create 5 orders
for _ in range(5):
await repo.create_order(user_id=user_id, items=[...], reservation_id=uuid4())
page_1 = await repo.list_by_user(user_id, limit=2, offset=0)
page_2 = await repo.list_by_user(user_id, limit=2, offset=2)
page_3 = await repo.list_by_user(user_id, limit=2, offset=4)
assert len(page_1) == 2
assert len(page_2) == 2
assert len(page_3) == 1
# No overlap between pages
page_1_ids = {o.id for o in page_1}
page_2_ids = {o.id for o in page_2}
assert page_1_ids.isdisjoint(page_2_ids)
Slice Tests — The Most Valuable Tests
A slice test sends a real HTTP request through the full application stack: router → service → repository → database → response. No mocking. No stubbing. The entire vertical slice is tested as a unit.
This is where you catch the bugs that unit tests miss: incorrect HTTP status codes, malformed response shapes, missing database constraints, wrong error messages returned to the client, authentication failures on protected routes.
Slice tests are what you should have the most of. They are the closest approximation to what a real client experiences.
# tests/slices/test_order_slices.py
import pytest
from decimal import Decimal
from httpx import AsyncClient, ASGITransport
from uuid import uuid4
from app.main import app
# ─── Application Client Fixture ──────────────────────────────────────────────
@pytest.fixture
async def client(db_pool):
"""
Create an AsyncClient that talks to the real FastAPI application.
The application uses the test database pool — real PostgreSQL, real SQL.
"""
# Override the pool in app state with the test pool
app.state.pool = db_pool
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
@pytest.fixture
async def auth_headers(client, db_conn):
"""Register and log in a test user, return auth headers."""
user_id = uuid4()
email = f"test_{user_id}@kukufiti.co.ke"
# Register
resp = await client.post("/api/v1/auth/register", json={
"email": email,
"password": "SecurePass123!",
"full_name": "Test Farmer",
})
assert resp.status_code == 201
# Login
resp = await client.post("/api/v1/auth/login", json={
"email": email,
"password": "SecurePass123!",
})
assert resp.status_code == 200
token = resp.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
# ─── Slice Tests ──────────────────────────────────────────────────────────────
class TestCreateOrderSlice:
async def test_creates_order_successfully(self, client, auth_headers, db_conn):
# Seed a product in inventory
product_id = uuid4()
await db_conn.execute(
"""
INSERT INTO inventory.products (id, name, price, stock_quantity)
VALUES ($1, 'Day-Old Chick', 250.00, 1000)
""",
product_id,
)
response = await client.post(
"/api/v1/orders/",
json={
"items": [{"product_id": str(product_id), "quantity": 10}]
},
headers=auth_headers,
)
assert response.status_code == 201
body = response.json()
assert body["status"] == "pending"
assert Decimal(body["total_amount"]) == Decimal("2500.00")
assert len(body["items"]) == 1
assert body["items"][0]["product_name"] == "Day-Old Chick"
# Verify it actually exists in the database — not just in the response
row = await db_conn.fetchrow(
"SELECT id, status FROM orders.orders WHERE id = $1",
body["order_id"],
)
assert row is not None
assert row["status"] == "pending"
async def test_returns_422_for_empty_items_list(self, client, auth_headers):
response = await client.post(
"/api/v1/orders/",
json={"items": []}, # Pydantic validator should reject this
headers=auth_headers,
)
assert response.status_code == 422
errors = response.json()["detail"]
assert any("items" in str(e["loc"]) for e in errors)
async def test_returns_401_without_auth_header(self, client):
response = await client.post(
"/api/v1/orders/",
json={"items": [{"product_id": str(uuid4()), "quantity": 1}]},
# No Authorization header
)
assert response.status_code == 401
async def test_returns_400_when_stock_insufficient(
self, client, auth_headers, db_conn
):
product_id = uuid4()
await db_conn.execute(
"INSERT INTO inventory.products (id, name, price, stock_quantity) "
"VALUES ($1, 'Scarce Chick', 250.00, 5)",
product_id,
)
response = await client.post(
"/api/v1/orders/",
json={"items": [{"product_id": str(product_id), "quantity": 100}]},
headers=auth_headers,
)
assert response.status_code == 400
assert "insufficient" in response.json()["detail"].lower()
async def test_order_creation_writes_outbox_event(
self, client, auth_headers, db_conn
):
"""
Verify the outbox pattern — order creation must enqueue an event.
The event being in the outbox is the guarantee of eventual consistency.
"""
product_id = uuid4()
await db_conn.execute(
"INSERT INTO inventory.products (id, name, price, stock_quantity) "
"VALUES ($1, 'Test Chick', 250.00, 1000)",
product_id,
)
response = await client.post(
"/api/v1/orders/",
json={"items": [{"product_id": str(product_id), "quantity": 1}]},
headers=auth_headers,
)
assert response.status_code == 201
# The outbox row must exist — same transaction guaranteed this
outbox_row = await db_conn.fetchrow(
"SELECT event_type, status FROM core.outbox "
"WHERE payload->>'order_id' = $1",
response.json()["order_id"],
)
assert outbox_row is not None
assert outbox_row["event_type"] == "OrderPlacedEvent"
assert outbox_row["status"] == "pending"
class TestGetOrderSlice:
async def test_returns_order_for_owner(self, client, auth_headers, db_conn):
# Create an order via the API
product_id = uuid4()
await db_conn.execute(
"INSERT INTO inventory.products (id, name, price, stock_quantity) "
"VALUES ($1, 'Chick', 250.00, 1000)",
product_id,
)
create_resp = await client.post(
"/api/v1/orders/",
json={"items": [{"product_id": str(product_id), "quantity": 1}]},
headers=auth_headers,
)
order_id = create_resp.json()["order_id"]
# Fetch it
get_resp = await client.get(
f"/api/v1/orders/{order_id}",
headers=auth_headers,
)
assert get_resp.status_code == 200
assert get_resp.json()["order_id"] == order_id
async def test_returns_404_for_nonexistent_order(self, client, auth_headers):
response = await client.get(
f"/api/v1/orders/{uuid4()}",
headers=auth_headers,
)
assert response.status_code == 404
async def test_returns_403_when_accessing_another_users_order(
self, client, db_conn
):
"""Verify that User A cannot read User B's orders."""
# Create User A and their order
user_a_headers = await create_test_user_headers(client, "user_a@test.co.ke")
product_id = await seed_product(db_conn)
create_resp = await client.post(
"/api/v1/orders/",
json={"items": [{"product_id": str(product_id), "quantity": 1}]},
headers=user_a_headers,
)
order_id = create_resp.json()["order_id"]
# User B attempts to access User A's order
user_b_headers = await create_test_user_headers(client, "user_b@test.co.ke")
response = await client.get(
f"/api/v1/orders/{order_id}",
headers=user_b_headers,
)
assert response.status_code == 403
Testing the Outbox Worker
The outbox worker is a critical component that runs separately from the API. It must be tested independently:
# tests/integration/test_outbox_worker.py
import pytest
import json
from uuid import uuid4
from decimal import Decimal
from datetime import datetime, UTC
from unittest.mock import AsyncMock
from app.workers.outbox_processor import process_batch
from app.core.event_registry import EVENT_REGISTRY
class TestOutboxProcessor:
async def test_processes_pending_event_and_marks_delivered(
self, db_pool, db_conn
):
event_id = uuid4()
# Seed a pending outbox event directly
await db_conn.execute(
"""
INSERT INTO core.outbox (id, event_type, payload, status, idempotency_key)
VALUES ($1, 'OrderPlacedEvent', $2, 'pending', $3)
""",
event_id,
json.dumps({"order_id": str(uuid4()), "user_id": str(uuid4()),
"total_amount": "2500.00"}),
str(event_id),
)
# Register a mock handler
mock_handler = AsyncMock()
EVENT_REGISTRY["OrderPlacedEvent"] = mock_handler
processed = await process_batch(db_pool)
assert processed == 1
mock_handler.assert_called_once()
# Verify status updated
row = await db_conn.fetchrow(
"SELECT status FROM core.outbox WHERE id = $1", event_id
)
assert row["status"] == "delivered"
async def test_retries_failed_event_with_backoff(self, db_pool, db_conn):
event_id = uuid4()
await db_conn.execute(
"""
INSERT INTO core.outbox (id, event_type, payload, status,
idempotency_key, max_attempts)
VALUES ($1, 'OrderPlacedEvent', $2, 'pending', $3, 5)
""",
event_id,
json.dumps({"order_id": str(uuid4())}),
str(event_id),
)
# Handler that always fails
EVENT_REGISTRY["OrderPlacedEvent"] = AsyncMock(
side_effect=Exception("Downstream service unavailable")
)
await process_batch(db_pool)
row = await db_conn.fetchrow(
"SELECT status, attempt_count, scheduled_at FROM core.outbox WHERE id = $1",
event_id,
)
assert row["status"] == "pending" # retryable — not dead yet
assert row["attempt_count"] == 1
assert row["scheduled_at"] > datetime.now(UTC) # scheduled in the future
async def test_moves_to_dead_after_max_attempts(self, db_pool, db_conn):
event_id = uuid4()
# Already at max_attempts - 1
await db_conn.execute(
"""
INSERT INTO core.outbox
(id, event_type, payload, status, idempotency_key,
attempt_count, max_attempts)
VALUES ($1, 'OrderPlacedEvent', $2, 'pending', $3, 4, 5)
""",
event_id,
json.dumps({"order_id": str(uuid4())}),
str(event_id),
)
EVENT_REGISTRY["OrderPlacedEvent"] = AsyncMock(
side_effect=Exception("Permanent failure")
)
await process_batch(db_pool)
row = await db_conn.fetchrow(
"SELECT status FROM core.outbox WHERE id = $1", event_id
)
assert row["status"] == "dead"
async def test_skip_locked_allows_parallel_workers(self, db_pool, db_conn):
"""
Two concurrent workers must not process the same event.
FOR UPDATE SKIP LOCKED guarantees this.
"""
import asyncio
# Seed 10 events
for _ in range(10):
eid = uuid4()
await db_conn.execute(
"INSERT INTO core.outbox (id, event_type, payload, status, "
"idempotency_key) VALUES ($1, 'TestEvent', '{}', 'pending', $2)",
eid, str(eid),
)
handler_call_count = 0
async def counting_handler(payload):
nonlocal handler_call_count
handler_call_count += 1
EVENT_REGISTRY["TestEvent"] = counting_handler
# Run two workers concurrently
results = await asyncio.gather(
process_batch(db_pool),
process_batch(db_pool),
)
# Total processed = sum of both workers, no duplicates
assert sum(results) == 10
assert handler_call_count == 10
pytest Configuration
# pytest.ini
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# Markers — used to run subsets of the test suite
markers =
unit: Pure unit tests — no database, fast
integration: Integration tests — real PostgreSQL, module-scoped
slice: Slice tests — full HTTP request through real application
# Show test durations — identifies slow tests
addopts = -v --durations=10 --tb=short
# Run only unit tests (fast — run on every file save)
pytest -m unit
# Run integration tests (slower — run before committing)
pytest -m integration
# Run slice tests (slowest but most valuable — run in CI)
pytest -m slice
# Run everything
pytest
# Run with coverage
pytest --cov=app --cov-report=term-missing --cov-fail-under=80
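The -m filters above only select tests that actually carry the corresponding marker. One low-friction way to apply them, sketched here, is a module-level pytestmark in each test file (a per-directory conftest.py works just as well):
# tests/unit/test_order_service.py  (top of the module)
import pytest
pytestmark = pytest.mark.unit         # everything in this file is selected by `pytest -m unit`

# tests/integration/test_order_repository.py
pytestmark = pytest.mark.integration

# tests/slices/test_order_slices.py
pytestmark = pytest.mark.slice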
Coverage Targets
Coverage is a diagnostic tool, not a goal. 100% coverage with tests that only verify happy paths is worse than 70% coverage with tests that verify failure modes, edge cases, and boundary conditions.
The targets that matter:
| Component | Target | What To Focus On |
|---|---|---|
| Service layer | 90%+ | All decision branches — every if, every exception path |
| Repository layer | 85%+ | All queries, including empty result cases |
| Router layer | 80%+ | Status codes, auth enforcement, validation rejection |
| Outbox worker | 85%+ | Retry logic, dead letter transition, skip locked |
| Event handlers | 90%+ | Idempotency — verify double-processing produces correct result |
The most important tests to write, in order:
- The happy path slice test for every endpoint
- The authentication/authorisation rejection test for every protected endpoint
- The validation rejection tests for every request schema
- The failure path unit tests for every business rule that can raise an exception
- The idempotency tests for every event handler
Write these before writing any coverage-chasing tests. A test suite with these five categories across all modules is a production-grade test suite.
Section 9: Scaling Strategies
The Correct Mental Model for Scaling
Scaling is not a feature you add when traffic arrives. It is a set of architectural decisions made upfront that determine how much headroom you have before traffic becomes a crisis. The modular monolith's scaling story is frequently misunderstood — engineers assume it cannot scale because it is not microservices. This is wrong. The bottlenecks in a modular monolith are well-understood, predictable, and solvable with standard techniques. The bottlenecks in a premature microservices architecture are distributed, non-deterministic, and significantly harder to debug.
The goal of this section is not to promise infinite scalability. It is to define exactly how far the modular monolith scales, what the ceiling looks like, what hits the ceiling first, and what the migration path looks like when you genuinely outgrow it.
9.1 Vertical Scaling
Vertical scaling means increasing the resources available to the single instance: more CPU cores, more RAM, faster disk I/O. It is the first scaling lever and the cheapest to pull — no code changes, no architectural changes, no operational complexity increase.
CPU Scaling
FastAPI with asyncpg is an I/O-bound system. The application spends most of its time waiting — for database responses, for network I/O, for external API calls. It is not burning CPU during that wait. This means:
A single CPU core running an async event loop can handle a very large number of concurrent requests as long as those requests are I/O-bound. The event loop multiplexes them — while request A is waiting for a database response, the event loop serves requests B, C, and D.
Adding CPU cores does not directly improve throughput for I/O-bound work within a single async process. It does allow you to run more Gunicorn workers — each worker is an independent process with its own event loop, and each process can use one CPU core. This is why the worker count formula is (cores × 2) + 1 — you want enough workers to keep all cores occupied during I/O wait time.
2-core server → 5 workers → each worker handles ~200 concurrent connections
4-core server → 9 workers → each worker handles ~200 concurrent connections
8-core server → 17 workers → each worker handles ~200 concurrent connections
The total concurrent request capacity scales linearly with core count — but only up to the point where the database becomes the bottleneck, which it will before CPU does.
RAM Scaling
RAM consumption in a Python asyncio application is dominated by:
- The Python interpreter and loaded modules (~50–100MB per worker process)
- In-flight request state (request objects, response buffers, query results held in memory)
- Connection pool state (asyncpg maintains prepared statement caches, connection metadata)
- Background data structures (outbox worker state, scheduler job metadata)
A rough working figure: each Gunicorn worker consumes 100–200MB at idle, growing to 300–500MB under load depending on the size of query results being held in memory. On a 4-core server with 9 workers, budget 2–4GB RAM for the application alone, leaving headroom for the OS, PostgreSQL (if co-located), and PgBouncer.
The max_requests and max_requests_jitter settings in gunicorn.conf.py (covered in Section 7) limit memory growth over time by periodically recycling workers. Without this, Python processes accumulate memory due to fragmentation and unreachable cyclic references that the garbage collector does not reclaim promptly. Worker recycling is the practical solution.
Disk I/O
For the application server, disk I/O is rarely the bottleneck — the application is stateless, and reads/writes go to PostgreSQL. For PostgreSQL itself, disk I/O is frequently the bottleneck under write-heavy loads. The relevant settings:
-- postgresql.conf — adjust based on available RAM
shared_buffers = 4GB -- 25% of total RAM — PostgreSQL's own buffer pool
effective_cache_size = 12GB -- estimate of OS page cache available
work_mem = 64MB -- memory per sort/hash operation per query
maintenance_work_mem = 1GB -- memory for VACUUM, CREATE INDEX, etc.
wal_buffers = 64MB -- write-ahead log buffer
checkpoint_completion_target = 0.9
random_page_cost = 1.1 -- for SSD storage — lowered from the default of 4.0
random_page_cost = 1.1 tells the query planner that random disk reads are nearly as fast as sequential reads — true for SSDs, false for spinning disks. Setting this correctly causes the planner to prefer index scans over sequential scans more aggressively, which is the correct behaviour for SSD-backed cloud databases.
9.2 Horizontal Scaling
Horizontal scaling means adding more instances of the application server. Multiple containers running the same image, all connected to the same PostgreSQL database via PgBouncer, behind a load balancer.
Stateless by Design
FastAPI with asyncpg is stateless by design — there is no shared mutable state between request handlers within a process, and no state that persists between processes. Each request acquires a database connection, executes its logic, returns a response, and releases the connection. Nothing is left over.
This means horizontal scaling requires zero application code changes. You add a container, it registers with the load balancer, and it begins serving traffic immediately. No session affinity required. No distributed cache coordination required at the application layer.
[Load Balancer]
Round Robin / Least Connections
/ | \
[API-1] [API-2] [API-3]
4 workers 4 workers 4 workers
\ | /
[PgBouncer]
max_client_conn=500
pool_size=25
|
[PostgreSQL]
Adding API-4 requires: pulling the image, starting the container, registering with the load balancer. The database sees the connection count increase by at most workers × pool_size_per_worker — which PgBouncer absorbs, presenting a constant pool of 25 connections to PostgreSQL regardless.
Load Balancer Configuration
# nginx.conf — upstream configuration for horizontal scaling
upstream kukufiti_api {
least_conn; # route to worker with fewest active connections
server api_1:8000 max_fails=3 fail_timeout=30s;
server api_2:8000 max_fails=3 fail_timeout=30s;
server api_3:8000 max_fails=3 fail_timeout=30s;
keepalive 32; # maintain persistent connections to backends
}
server {
listen 80;
server_name api.kukufiti.co.ke;
location / {
proxy_pass http://kukufiti_api;
proxy_http_version 1.1;
proxy_set_header Connection ""; # required for keepalive upstream
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Correlation-ID $request_id;
proxy_connect_timeout 5s;
proxy_read_timeout 30s;
proxy_send_timeout 30s;
}
location /health {
proxy_pass http://kukufiti_api/health;
access_log off; # don't log health check noise
}
}
least_conn distributes requests to the backend with the fewest active connections. This is superior to round-robin for a backend where request processing times vary — a slow database query on one worker doesn't cause subsequent requests to pile up behind it.
Graceful Shutdown and Rolling Deploys
Horizontal scaling enables zero-downtime deploys. The process:
1. Pull new image on API-1
2. Signal API-1 to stop accepting new connections (SIGTERM)
3. API-1 completes in-flight requests (graceful_timeout = 30s)
4. API-1 stops — load balancer routes all traffic to API-2 and API-3
5. Start new API-1 with new image
6. API-1 passes health check — load balancer re-adds it
7. Repeat for API-2, then API-3
Gunicorn handles SIGTERM correctly — it stops accepting new connections and waits for in-flight requests to complete before shutting down. The graceful_timeout = 30s in gunicorn.conf.py is the window for in-flight requests to finish. Any request still running after 30 seconds is forcibly terminated.
# main.py — lifespan handles graceful shutdown at the application level
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
app.state.pool = await asyncpg.create_pool(dsn=settings.database_url, ...)
logger.info("Application started — pool ready")
yield
# Shutdown — runs when SIGTERM received
logger.info("Shutdown signal received — closing connection pool")
await app.state.pool.close()
logger.info("Connection pool closed — shutdown complete")
The pool close on shutdown waits for all active connections to be returned before closing. In-flight database operations complete normally. Connections are not dropped mid-query.
9.3 Bottlenecks — What Hits the Ceiling First
Horizontal scaling of the application layer is nearly unlimited. The bottleneck is always the database. Understanding the database bottleneck in detail is what separates engineers who can scale systems from engineers who can add containers.
Bottleneck 1 — Write Throughput
PostgreSQL is a single-master database. All writes go to one server. Write throughput is bounded by:
- WAL (Write-Ahead Log) write speed — sequential writes to disk
- Lock contention on hot rows
- Checkpoint frequency and I/O impact
A well-configured PostgreSQL 16 instance on modern cloud storage (NVMe SSD) handles approximately 5,000–15,000 simple write transactions per second. This ceiling is significantly higher than most applications reach before other concerns dominate.
For a modular monolith at early-to-mid scale, write throughput is not the binding constraint. The binding constraint is usually query complexity or lock contention on specific tables.
Lock contention on hot rows: If every order creation updates a single inventory.stock_levels row for a popular product, that row becomes a hot spot. Every concurrent order creation blocks on the same lock:
-- Every concurrent order update blocks here
UPDATE inventory.stock_levels
SET reserved_quantity = reserved_quantity + $2
WHERE product_id = $1;
The solution is not to switch to microservices. The solution is to design around the contention:
-- Append-only reservation log — no hot row, no contention
INSERT INTO inventory.reservations
    (id, product_id, quantity, order_id, status, created_at)
VALUES ($1, $2, $3, $4, 'active', NOW());
-- Current stock level computed from log — read at query time
SELECT
p.total_quantity
- COALESCE(SUM(r.quantity) FILTER (WHERE r.status = 'active'), 0) AS available
FROM inventory.products p
LEFT JOIN inventory.reservations r ON r.product_id = p.id
WHERE p.id = $1
GROUP BY p.total_quantity;
Append-only writes never contend with each other — each write targets a new row with its own row lock. The available stock calculation is a read that does not block writes.
Bottleneck 2 — Read Throughput and Query Performance
Reads scale more easily than writes. PostgreSQL supports read replicas — secondary servers that replicate the primary's WAL and serve read-only queries. Read-heavy workloads distribute across replicas while writes continue to the primary.
# core/database.py — separate pools for read and write
class DatabaseManager:
def __init__(self, write_pool: asyncpg.Pool, read_pool: asyncpg.Pool):
self.write = write_pool # primary — all writes, transactional reads
self.read = read_pool # replica — non-transactional reads
# core/dependencies.py
async def get_write_conn(request: Request) -> asyncpg.Connection:
async with request.app.state.write_pool.acquire() as conn:
yield conn
async def get_read_conn(request: Request) -> asyncpg.Connection:
async with request.app.state.read_pool.acquire() as conn:
yield conn
# modules/orders/repository.py
class OrderRepository:
def __init__(
self,
write_conn: asyncpg.Connection,
read_conn: asyncpg.Connection,
):
self._write = write_conn
self._read = read_conn
async def create_order(self, ...) -> Order:
# Writes always go to primary
await self._write.execute("INSERT INTO orders.orders ...")
async def list_orders_for_dashboard(self, ...) -> list[Order]:
# Dashboard reads can tolerate replica lag — goes to replica
rows = await self._read.fetch("SELECT ... FROM orders.orders ...")
return [Order.from_row(row) for row in rows]
    async def find_by_id_for_update(self, order_id: UUID) -> Order:
        # Reads inside a transaction must go to primary — replica lag
        # would cause stale reads that invalidate business logic
        row = await self._write.fetchrow(
            "SELECT ... FROM orders.orders WHERE id = $1 FOR UPDATE", order_id
        )
        return Order.from_row(row)
Replica lag awareness: Replica replication is asynchronous. There is a window — typically milliseconds to seconds — where the replica has not yet applied the latest writes from the primary. A read immediately after a write may not see the write if it goes to the replica. The rule: reads that must see the most recent writes (post-write reads in the same request, reads inside transactions) go to the primary. Reads that tolerate slight staleness (reports, dashboard aggregations, list views) go to the replica.
Bottleneck 3 — Connection Count
Covered extensively in Section 7.6. The summary:
Without PgBouncer:
N API instances × W workers × P pool connections = total PostgreSQL connections
At scale, this exceeds PostgreSQL's max_connections
With PgBouncer (transaction mode):
PostgreSQL sees pool_size connections regardless of N, W, or P
Total PostgreSQL connections = constant and predictable
PgBouncer is not optional at scale. It is the mechanism that makes horizontal application scaling compatible with a single PostgreSQL instance.
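To put illustrative numbers on the formula: 3 instances × 4 workers × 10 connections per worker pool is 120 client connections, which already exceeds PostgreSQL's default max_connections of 100 before replication and maintenance connections are counted. With PgBouncer in transaction mode and pool_size = 25, those same 120 client connections share 25 server connections, and scaling to a fourth instance raises the client side to 160 while PostgreSQL still sees 25.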
Bottleneck 4 — Slow Queries
A single slow query affects the entire system more than its apparent share of traffic suggests. A query that takes 500ms holds its connection for 500ms. At high concurrency, slow queries exhaust the connection pool and cause queuing for all subsequent requests — including fast ones.
The diagnostic tools:
-- pg_stat_statements: requires the extension to be enabled
-- Shows query statistics across all executions since last reset
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
SELECT
LEFT(query, 80) AS query_preview,
calls,
ROUND(mean_exec_time::numeric, 2) AS avg_ms,
ROUND(total_exec_time::numeric, 2) AS total_ms,
ROUND(stddev_exec_time::numeric, 2) AS stddev_ms,
rows / NULLIF(calls, 0) AS avg_rows
FROM pg_stat_statements
WHERE mean_exec_time > 100 -- queries averaging over 100ms
ORDER BY mean_exec_time DESC
LIMIT 20;
-- pg_stat_activity: what is running right now
SELECT
pid,
now() - pg_stat_activity.query_start AS duration,
query,
state,
wait_event_type,
wait_event
FROM pg_stat_activity
WHERE state != 'idle'
AND query_start < now() - interval '5 seconds' -- running longer than 5s
ORDER BY duration DESC;
-- Unused indexes: storage cost with no query benefit
SELECT
    schemaname,
    relname AS table_name,
    indexrelname AS index_name,
    idx_scan,                      -- 0 = never used
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
  AND schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY pg_relation_size(indexrelid) DESC;
9.4 Application-Layer Caching
Before reaching for additional database infrastructure, apply caching at the appropriate layers. Caching is a horizontal scaling multiplier — it reduces database load, which extends the ceiling without infrastructure cost.
# core/cache.py
import json
from typing import Any
import redis.asyncio as redis
from app.core.config import settings
class Cache:
def __init__(self, client: redis.Redis):
self._client = client
async def get(self, key: str) -> Any | None:
value = await self._client.get(key)
if value is None:
return None
return json.loads(value)
async def set(self, key: str, value: Any, ttl_seconds: int) -> None:
await self._client.set(
key,
json.dumps(value, default=str),
ex=ttl_seconds,
)
async def delete(self, key: str) -> None:
await self._client.delete(key)
async def delete_pattern(self, pattern: str) -> None:
"""Invalidate all keys matching a pattern."""
keys = await self._client.keys(pattern)
if keys:
await self._client.delete(*keys)
# modules/inventory/service.py — caching product snapshots
class InventoryService:
async def get_product_snapshots(
self,
product_ids: list[UUID],
) -> dict[UUID, ProductSnapshot]:
result = {}
uncached_ids = []
# Check cache first — batch get
pipe = self._cache._client.pipeline()
for pid in product_ids:
pipe.get(f"product_snapshot:{pid}")
cached_values = await pipe.execute()
for pid, cached in zip(product_ids, cached_values):
if cached:
result[pid] = ProductSnapshot.model_validate_json(cached)
else:
uncached_ids.append(pid)
# Fetch uncached from database
if uncached_ids:
snapshots = await self._repo.get_product_snapshots(uncached_ids)
# Write to cache
pipe = self._cache._client.pipeline()
for snapshot in snapshots.values():
pipe.set(
f"product_snapshot:{snapshot.product_id}",
snapshot.model_dump_json(),
ex=300, # 5-minute TTL
)
await pipe.execute()
result.update(snapshots)
return result
async def update_product(self, product_id: UUID, ...) -> Product:
product = await self._repo.update_product(product_id, ...)
# Invalidate cache on write
await self._cache.delete(f"product_snapshot:{product_id}")
return product
Cache what changes infrequently and is read frequently:
- Product names and prices (invalidate on product update)
- User identity data (invalidate on user update)
- Farm details (invalidate on farm update)
- Aggregated dashboard figures (invalidate on a schedule — TTL based)
Do not cache:
- Financial balances (must always be current — no staleness acceptable)
- Order status (changes frequently — cache hit rate too low to justify complexity)
- Any data where stale reads produce incorrect business decisions
9.5 When the Modular Monolith Breaks
This is the most important subsection. Knowing when your architecture has reached its ceiling is as important as knowing how to build it. The signals are specific and observable. Do not migrate prematurely — the operational cost is real. Do not migrate too late — the coupling makes extraction painful.
Signal 1 — Team Size and Deployment Contention
The modular monolith works excellently with a single team of up to approximately 8–12 engineers. As the team grows beyond this, deployment contention emerges: multiple sub-teams want to deploy their module independently, but a single artifact means a deployment of any module is a deployment of all modules.
The symptom: a team's deployment is blocked because another team's code introduced a bug that is discovered in staging. Both teams' releases are held until the bug is fixed, even though their changes are entirely independent.
When this pattern becomes the dominant scheduling problem — not occasionally but structurally — you have a team topology argument for service extraction. The right question is not "can we make the modular monolith work?" but "is the coordination overhead of shared deployment costing more than the operational overhead of separate deployment?"
At 8–12 engineers on a single codebase with disciplined module boundaries and automated testing, this point is typically not reached. At 20+ engineers across multiple product streams, it frequently is.
Signal 2 — Uneven Module Scaling Requirements
The modular monolith scales as a unit. All modules receive the same resources. If one module requires dramatically more compute than the others, you either over-provision all modules to accommodate the demanding one, or the demanding module starves its co-located siblings.
Concrete examples:
- A video processing module that encodes footage is CPU-bound. Running it alongside lightweight CRUD modules wastes CPU allocation on the CRUD modules and throttles encoding throughput.
- A real-time WebSocket module that maintains thousands of persistent connections needs more memory and different concurrency characteristics than batch processing modules.
- A reporting module that runs expensive aggregation queries over millions of rows competes for database connections with latency-sensitive transactional modules.
The test: if you find yourself setting Gunicorn worker counts based on the requirements of one module that are inappropriate for all other modules, that module is a candidate for extraction.
Signal 3 — Database Write Bottleneck on a Specific Module
If one module generates the majority of writes and its write throughput is approaching PostgreSQL's ceiling, that module's database needs to scale independently — which means extracting it with its own database.
The diagnostic query:
-- Track write volume per schema over time
SELECT
schemaname,
SUM(n_tup_ins + n_tup_upd + n_tup_del) AS total_writes,
SUM(n_tup_ins) AS inserts,
SUM(n_tup_upd) AS updates,
SUM(n_tup_del) AS deletes
FROM pg_stat_user_tables
WHERE schemaname IN ('auth', 'orders', 'billing', 'inventory')
GROUP BY schemaname
ORDER BY total_writes DESC;
If one schema accounts for 80%+ of all writes and those writes are approaching throughput limits, that schema is a candidate for extraction to a service with its own database.
Signal 4 — Technology Heterogeneity Requirements
All modules in a modular monolith use the same technology stack. If a specific module's requirements are fundamentally better served by a different technology — a graph database for a social feature, a time-series database for IoT sensor data, a vector database for ML similarity search — that module cannot be served by the shared PostgreSQL instance.
This is not a performance bottleneck. It is a capability requirement. When a module needs a capability that the shared infrastructure cannot provide, extraction is the only path.
Signal 5 — Security Isolation Requirements
For regulated systems (fintech, healthcare), certain modules may require stronger isolation guarantees than a shared process provides. A module that handles payment card data subject to PCI-DSS compliance may need to run in a network-isolated environment with strict audit logging, different secret management, and independent penetration testing scope. These requirements are difficult to satisfy within a shared process.
9.6 The Scaling Ceiling in Numbers
To be concrete about what "the modular monolith scales well" actually means in production numbers:
| Metric | Realistic Ceiling (Single Server) | Realistic Ceiling (Horizontal, 3 Instances) |
|---|---|---|
| Concurrent API connections | 800–1,200 | 2,400–3,600 |
| Requests per second (simple CRUD) | 500–1,500 | 1,500–4,500 |
| Requests per second (complex queries) | 100–300 | 300–900 |
| PostgreSQL writes per second | 5,000–15,000 | Same — DB is shared |
| Database connections | 20–80 (via PgBouncer) | Same pool size |
| Engineers supported | 4–10 | 4–10 (same codebase) |
These are conservative estimates. A well-optimised system with read replicas, caching, and query optimisation can significantly exceed these figures before hitting a hard ceiling.
For context: most Kenyan fintech and agritech startups that reach Series A are operating at well under 10% of these limits. The modular monolith is not the bottleneck. Product-market fit, user acquisition, and operational execution are the bottlenecks. Premature architectural complexity is an expensive distraction from these actual challenges.
Build the modular monolith. Build it with clean boundaries. When the signals above emerge — and you will know them when they appear because they are structural and observable, not vague — extract the module that is causing the constraint. The clean boundaries you built from day one make that extraction a scheduled engineering project rather than an emergency rewrite.
Section 10: Reliability Patterns
Why Reliability Is an Architecture Concern, Not an Operations Concern
Reliability is not something you bolt onto a system after it is built. It is a set of properties that either exist in the design or don't exist at all. A system without idempotency guarantees cannot be made idempotent by adding a load balancer. A system without structured logging cannot be made observable by adding a monitoring dashboard. These properties must be designed in from the first request handler.
The patterns in this section address three distinct failure classes:
- Duplicate execution — the same operation runs more than once due to client retries, network timeouts, or message redelivery
- Transient failures — operations that fail temporarily and would succeed if retried correctly
- Invisible failures — operations that fail silently, leaving the system in an inconsistent state that is only discovered later during reconciliation
Each pattern addresses one of these classes. Together they define the difference between a system that works in a demo and a system that works at 3am when nobody is watching.
10.1 Idempotency
An operation is idempotent if executing it multiple times produces the same result as executing it once. This property is not optional for any operation that involves money, inventory, or user data — because in distributed systems, any operation may be executed more than once.
Why Duplicate Execution Happens
The client sends a payment request. The server processes it, commits the database transaction, and is about to send the response. The network connection drops. The client never receives the response. From the client's perspective, the request failed. The client retries. The server now receives the same payment request a second time. If the payment handler is not idempotent, the customer is charged twice.
This is not a theoretical scenario. It happens on every mobile network under poor signal conditions, during every rolling deploy where a connection is dropped during the Gunicorn worker restart, and every time a client implements a naive timeout-and-retry strategy.
The same problem applies to outbox event handlers. The worker delivers an event, the handler processes it, but the worker crashes before marking the outbox row as delivered. The worker restarts and delivers the event again. The handler runs again. If the handler is not idempotent, the side effect is doubled.
Idempotency Keys — Client-Supplied
For endpoints where the client must guarantee at-most-once execution, require the client to supply an idempotency key. The key is a client-generated UUID that uniquely identifies the intent to perform the operation. If the same key is seen twice, the second request returns the result of the first without re-executing the operation.
# modules/payments/router.py
from fastapi import APIRouter, Header, Depends, status
from typing import Annotated
from .schemas import InitiatePaymentRequest, PaymentResponse
from .service import PaymentService
from .dependencies import get_payment_service
router = APIRouter(prefix="/payments", tags=["payments"])
@router.post(
"/initiate",
response_model=PaymentResponse,
status_code=status.HTTP_200_OK,
)
async def initiate_payment(
payload: InitiatePaymentRequest,
idempotency_key: Annotated[str, Header(alias="Idempotency-Key")],
service: PaymentService = Depends(get_payment_service),
):
"""
Idempotency-Key header is REQUIRED for this endpoint.
Client generates a UUID, sends it with the request.
Same key → same response, no duplicate processing.
"""
return await service.initiate_payment(
order_id=payload.order_id,
amount=payload.amount,
idempotency_key=idempotency_key,
)
# modules/payments/service.py
import json
from uuid import UUID
from decimal import Decimal
from .repository import PaymentRepository
from .schemas import PaymentResponse
class PaymentService:
async def initiate_payment(
self,
order_id: UUID,
amount: Decimal,
idempotency_key: str,
) -> PaymentResponse:
# Check if this key has been seen before
existing = await self._repo.find_by_idempotency_key(idempotency_key)
if existing is not None:
# Return the original result — no re-execution
return PaymentResponse.model_validate(existing)
# Not seen before — process and store
async with self._uow:
# Double-checked locking — race condition protection
# Another concurrent request with the same key may have just committed
existing = await self._repo.find_by_idempotency_key_for_update(
idempotency_key
)
if existing is not None:
return PaymentResponse.model_validate(existing)
            # External I/O inside the unit of work is a deliberate tradeoff here:
            # the idempotency lock is held while M-Pesa is called, but if the
            # call fails nothing has been written yet, so a retry is safe
mpesa_response = await self._mpesa_client.initiate_stk_push(
amount=amount,
reference=str(order_id),
)
payment = await self._repo.create_payment(
order_id=order_id,
amount=amount,
checkout_request_id=mpesa_response.checkout_request_id,
idempotency_key=idempotency_key,
status="pending",
)
await self._outbox.enqueue(
PaymentInitiatedEvent(
payment_id=payment.id,
order_id=order_id,
amount=amount,
)
)
return PaymentResponse.model_validate(payment)
-- payments schema — idempotency key stored with the record
CREATE TABLE billing.payments (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
order_id UUID NOT NULL,
amount NUMERIC(12,2) NOT NULL,
checkout_request_id TEXT,
status TEXT NOT NULL DEFAULT 'pending',
idempotency_key TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Unique constraint enforces idempotency at the database level
-- Even if two concurrent requests slip past the application check,
-- only one INSERT succeeds — the other gets a UniqueViolationError
CONSTRAINT uq_payments_idempotency_key UNIQUE (idempotency_key)
);
-- The UNIQUE constraint already creates a backing index on idempotency_key;
-- a separate CREATE INDEX on the same column would be redundant.
The database-level unique constraint is the final safety net. Application-level checks can be bypassed by race conditions under high concurrency. The unique constraint cannot be bypassed — the database enforces it atomically.
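For completeness, a simplified sketch of how the service can absorb that race explicitly, assuming asyncpg surfaces the violation as UniqueViolationError and using the same repository methods as above (the M-Pesa call that produces mpesa_response and the outbox enqueue are unchanged from the service method shown earlier):
# modules/payments/service.py (sketch) — the constraint race handled explicitly
import asyncpg

try:
    async with self._uow:
        payment = await self._repo.create_payment(
            order_id=order_id,
            amount=amount,
            checkout_request_id=mpesa_response.checkout_request_id,
            idempotency_key=idempotency_key,
            status="pending",
        )
except asyncpg.UniqueViolationError:
    # A concurrent request with the same key won the race and committed first.
    # The unit of work has rolled back; treat the duplicate as success and
    # return the record the other request created.
    existing = await self._repo.find_by_idempotency_key(idempotency_key)
    return PaymentResponse.model_validate(existing)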
Server-Generated Idempotency — Internal Operations
For operations that do not involve a client-supplied key — outbox event handlers, background jobs, scheduled tasks — idempotency is enforced by using the event ID or job ID as the natural idempotency key:
# modules/billing/event_handlers.py
import asyncpg
from uuid import UUID
from decimal import Decimal
from app.modules.billing.repository import BillingRepository
async def handle_order_placed_billing(
payload: dict,
repo: BillingRepository,
) -> None:
order_id = UUID(payload["order_id"])
amount = Decimal(str(payload["total_amount"]))
user_id = UUID(payload["user_id"])
# order_id is the natural idempotency key for this billing record
# ON CONFLICT DO NOTHING: if a billing record for this order already exists,
# the handler silently succeeds — safe to call any number of times
await repo.create_billing_record_if_not_exists(
order_id=order_id,
user_id=user_id,
amount=amount,
)
# modules/billing/repository.py
from uuid import UUID, uuid4
async def create_billing_record_if_not_exists(
self,
order_id: UUID,
user_id: UUID,
amount: Decimal,
) -> None:
await self._conn.execute(
"""
INSERT INTO billing.transactions
(id, order_id, user_id, amount, status, created_at)
VALUES ($1, $2, $3, $4, 'pending', NOW())
ON CONFLICT (order_id) DO NOTHING
""",
uuid4(), order_id, user_id, amount,
)
-- Unique constraint on order_id makes the handler idempotent
ALTER TABLE billing.transactions
ADD CONSTRAINT uq_transactions_order_id UNIQUE (order_id);
Every event handler must have this pattern. The question to ask for every handler: "If this handler runs twice with the same payload, does the second execution change the state of the system?" If the answer is yes, the handler is not idempotent and must be fixed.
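That question can be encoded directly as a regression test. A sketch, assuming pytest-asyncio and hypothetical billing_repo and db_conn fixtures wired against a test database:
# tests/modules/billing/test_event_handlers.py (sketch; fixtures are hypothetical)
from uuid import UUID
import pytest

from app.modules.billing.event_handlers import handle_order_placed_billing

@pytest.mark.asyncio
async def test_order_placed_handler_is_idempotent(billing_repo, db_conn):
    payload = {
        "order_id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
        "user_id": "550e8400-e29b-41d4-a716-446655440000",
        "total_amount": "2500.00",
    }

    # Deliver the same event twice: exactly what a crashed-and-restarted
    # outbox worker would do
    await handle_order_placed_billing(payload, billing_repo)
    await handle_order_placed_billing(payload, billing_repo)

    count = await db_conn.fetchval(
        "SELECT COUNT(*) FROM billing.transactions WHERE order_id = $1",
        UUID(payload["order_id"]),
    )
    assert count == 1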
Idempotency for State Transitions
State machines require additional care. A transition that has already occurred must not be re-applied:
# modules/orders/service.py
async def confirm_order(self, order_id: UUID) -> Order:
async with self._uow:
order = await self._repo.find_by_id_for_update(order_id)
if order is None:
raise OrderNotFoundError(order_id)
# Idempotent state transition — already confirmed is success, not error
if order.status == "confirmed":
return order # Already in target state — return without side effects
# Invalid transitions are errors, not idempotent
if order.status not in ("pending",):
raise InvalidOrderTransitionError(
order_id=order_id,
current_status=order.status,
attempted_status="confirmed",
)
confirmed_order = await self._repo.update_status(order_id, "confirmed")
await self._outbox.enqueue(OrderConfirmedEvent(order_id=order_id))
return confirmed_order
The rule: reaching the target state is always success. Attempting a transition from an incompatible state is always an error. The distinction matters — pending → confirmed twice is idempotent. cancelled → confirmed is a domain logic error.
10.2 Retry Handling
Retrying is what transforms a transient failure into a non-event. Without retry logic, a network hiccup to an external API, a brief database connection interruption, or a momentary resource exhaustion causes a user-visible error that should never have reached the user.
Retry logic must be:
- Bounded — maximum attempts with a hard ceiling
- Backed off — increasing delay between attempts
- Jittered — randomized delay to prevent thundering herd
- Targeted — only retry errors that are actually transient
Retry Decorator
# core/retry.py
import asyncio
import logging
import random
from functools import wraps
from typing import Callable, Awaitable, TypeVar, Type
logger = logging.getLogger(__name__)
T = TypeVar("T")
def with_retry(
max_attempts: int = 3,
base_delay_seconds: float = 1.0,
max_delay_seconds: float = 30.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: tuple[Type[Exception], ...] = (Exception,),
non_retryable_exceptions: tuple[Type[Exception], ...] = (),
):
"""
Decorator for async functions that should be retried on transient failure.
Implements exponential backoff with jitter — the standard retry pattern
for distributed systems. Jitter prevents thundering herd: without it,
all callers retry at the same instant after a shared failure, creating
a new spike that causes another failure.
"""
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
last_exception: Exception | None = None
for attempt in range(1, max_attempts + 1):
try:
return await func(*args, **kwargs)
except non_retryable_exceptions as e:
# Domain errors, validation errors — retrying won't help
raise
except retryable_exceptions as e:
last_exception = e
if attempt == max_attempts:
logger.error(
f"Function {func.__name__} failed after {max_attempts} "
f"attempts. Final error: {e}"
)
raise
# Exponential backoff with optional jitter
delay = min(
base_delay_seconds * (exponential_base ** (attempt - 1)),
max_delay_seconds,
)
if jitter:
# Add random jitter: ±25% of the delay
delay *= (0.75 + random.random() * 0.5)
logger.warning(
f"Function {func.__name__} failed on attempt {attempt}/{max_attempts}. "
f"Retrying in {delay:.2f}s. Error: {e}"
)
await asyncio.sleep(delay)
raise last_exception # unreachable but satisfies type checker
return wrapper
return decorator
# Usage — M-Pesa API calls are flaky under load
# Retry transient network errors, not business logic errors
import asyncio
import aiohttp
from decimal import Decimal
from app.core.retry import with_retry
from app.modules.payments.exceptions import (
    PaymentDeclinedError,        # business error — do not retry
    InvalidPhoneNumberError,     # validation error — do not retry
)
class MpesaClient:
@with_retry(
max_attempts=3,
base_delay_seconds=1.0,
max_delay_seconds=10.0,
retryable_exceptions=(
aiohttp.ClientConnectionError,
aiohttp.ServerTimeoutError,
asyncio.TimeoutError,
),
non_retryable_exceptions=(
PaymentDeclinedError,
InvalidPhoneNumberError,
),
)
async def initiate_stk_push(
self,
phone: str,
amount: Decimal,
reference: str,
) -> MpesaResponse:
async with self._session.post(
url=self._stk_push_url,
json={
"BusinessShortCode": self._shortcode,
"Password": self._generate_password(),
"Timestamp": self._timestamp(),
"TransactionType": "CustomerPayBillOnline",
"Amount": int(amount),
"PartyA": phone,
"PartyB": self._shortcode,
"PhoneNumber": phone,
"CallBackURL": self._callback_url,
"AccountReference": reference,
"TransactionDesc": f"Payment for order {reference}",
},
timeout=aiohttp.ClientTimeout(total=15),
) as response:
if response.status == 200:
data = await response.json()
return MpesaResponse(
checkout_request_id=data["CheckoutRequestID"],
merchant_request_id=data["MerchantRequestID"],
)
elif response.status == 400:
data = await response.json()
raise PaymentDeclinedError(reason=data.get("errorMessage"))
else:
# 5xx — transient server error, retryable
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=(),
status=response.status,
)
Database-Level Retry — Serialization Failures
With SERIALIZABLE isolation level, PostgreSQL may reject a transaction with a serialization error if it detects a conflict. These must be retried at the application level — they are not bugs, they are the correct behaviour of the isolation level:
# core/database.py
import asyncio
import random
from typing import Any, Callable
import asyncpg
async def execute_with_serializable_retry(
conn: asyncpg.Connection,
operation: Callable,
max_attempts: int = 3,
) -> Any:
"""
Execute an operation under SERIALIZABLE isolation with automatic retry
on serialization failures. Use only for operations where SERIALIZABLE
is genuinely required — the overhead is real.
"""
for attempt in range(1, max_attempts + 1):
try:
async with conn.transaction(isolation="serializable"):
return await operation(conn)
except asyncpg.SerializationError:
if attempt == max_attempts:
raise
# Small random delay before retry — break synchronization between
# concurrent transactions that are conflicting with each other
await asyncio.sleep(random.uniform(0.01, 0.1))
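A usage sketch follows. The wallet table, the InsufficientFundsError exception, and the user_id / amount variables taken from the enclosing method are hypothetical; the point is only that the whole callable runs inside the SERIALIZABLE transaction the helper opens:
# Usage sketch (hypothetical table and exception)
async def _debit_wallet(conn: asyncpg.Connection) -> None:
    # Read-then-write invariant: the balance checked here must still hold
    # when the UPDATE commits, which is what SERIALIZABLE guarantees
    balance = await conn.fetchval(
        "SELECT balance FROM billing.wallets WHERE user_id = $1", user_id
    )
    if balance < amount:
        raise InsufficientFundsError(user_id)
    await conn.execute(
        "UPDATE billing.wallets SET balance = balance - $2 WHERE user_id = $1",
        user_id, amount,
    )

result = await execute_with_serializable_retry(conn, _debit_wallet)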
Circuit Breaker — Stopping Retries When a Service Is Down
Retrying indefinitely against a service that is down amplifies the problem — each retry holds a connection and consumes resources. A circuit breaker stops retrying after a threshold of failures is reached, allows the downstream service time to recover, and then tests with a small number of probe requests before restoring normal traffic.
# core/circuit_breaker.py
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

class CircuitOpenError(Exception):
    """Raised when the circuit is OPEN and calls are rejected without being attempted."""

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation — requests pass through
    OPEN = "open"            # failing — requests rejected immediately
    HALF_OPEN = "half_open"  # testing — limited requests pass through
@dataclass
class CircuitBreaker:
name: str
failure_threshold: int = 5 # failures before opening
recovery_timeout_seconds: float = 30.0 # time in OPEN before trying HALF_OPEN
half_open_max_calls: int = 3 # probe requests in HALF_OPEN
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
_half_open_calls: int = field(default=0, init=False)
def _should_attempt(self) -> bool:
if self._state == CircuitState.CLOSED:
return True
if self._state == CircuitState.OPEN:
elapsed = time.monotonic() - self._last_failure_time
if elapsed >= self.recovery_timeout_seconds:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return True
return False
if self._state == CircuitState.HALF_OPEN:
return self._half_open_calls < self.half_open_max_calls
return False
def _record_success(self) -> None:
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.CLOSED
self._failure_count = 0
def _record_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.monotonic()
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.OPEN
elif self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN
async def call(self, operation: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
if not self._should_attempt():
raise CircuitOpenError(
f"Circuit breaker '{self.name}' is OPEN — "
f"downstream service unavailable"
)
if self._state == CircuitState.HALF_OPEN:
self._half_open_calls += 1
try:
result = await operation(*args, **kwargs)
self._record_success()
return result
except Exception as e:
self._record_failure()
raise
# modules/payments/service.py — circuit breaker wrapping M-Pesa calls
class PaymentService:
def __init__(self, ...):
self._mpesa_breaker = CircuitBreaker(
name="mpesa_daraja_api",
failure_threshold=5,
recovery_timeout_seconds=60.0,
)
async def initiate_payment(self, ...) -> PaymentResponse:
try:
mpesa_response = await self._mpesa_breaker.call(
self._mpesa_client.initiate_stk_push,
phone=phone,
amount=amount,
reference=str(order_id),
)
except CircuitOpenError:
# M-Pesa is known to be down — fail fast with a clear message
raise PaymentServiceUnavailableError(
"Payment service temporarily unavailable. Please try again in a few minutes."
)
10.3 Observability
Observability is the property that allows you to understand the internal state of a system from its external outputs — without modifying the code. The three pillars are logs, metrics, and traces. In a modular monolith, all three can be implemented without distributed infrastructure — they are structurally simpler than in microservices, but require the same discipline.
Structured Logging
Logs that are human-readable prose are useful for debugging a single request when you already know approximately what went wrong. They are useless for answering questions like: "How many failed payment initiations occurred in the last hour for users in Mombasa?"
Structured logs are JSON. Every log line is a machine-parseable event with explicit fields. A log aggregator (Loki, Elasticsearch, CloudWatch) can query them, aggregate them, and alert on them.
# core/logging.py
import logging
import json
import sys
from datetime import datetime, UTC
from typing import Any
class StructuredFormatter(logging.Formatter):
"""
Emits every log record as a JSON object on a single line.
Fields are consistent and queryable by log aggregators.
"""
RESERVED_ATTRS = frozenset({
"args", "asctime", "created", "exc_info", "exc_text", "filename",
"funcName", "levelname", "levelno", "lineno", "message", "module",
"msecs", "msg", "name", "pathname", "process", "processName",
"relativeCreated", "stack_info", "thread", "threadName",
})
def format(self, record: logging.LogRecord) -> str:
log_entry: dict[str, Any] = {
"timestamp": datetime.fromtimestamp(record.created, UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Include any extra fields passed to the logger
for key, value in record.__dict__.items():
if key not in self.RESERVED_ATTRS and not key.startswith("_"):
log_entry[key] = value
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, default=str)
def configure_logging(level: str = "INFO") -> None:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(StructuredFormatter())
root_logger = logging.getLogger()
root_logger.setLevel(level.upper())
root_logger.handlers = [handler]
# Suppress noisy library loggers
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logging.getLogger("asyncpg").setLevel(logging.WARNING)
# Usage — always pass structured context as extra fields
import logging
logger = logging.getLogger(__name__)
async def create_order(self, user_id: UUID, items: list) -> Order:
logger.info(
"Order creation started",
extra={
"user_id": str(user_id),
"item_count": len(items),
"module": "orders",
}
)
try:
order = await self._repo.create_order(user_id=user_id, items=items)
logger.info(
"Order created successfully",
extra={
"order_id": str(order.id),
"user_id": str(user_id),
"total_amount": str(order.total_amount),
"module": "orders",
}
)
return order
except InsufficientStockError as e:
logger.warning(
"Order creation failed — insufficient stock",
extra={
"user_id": str(user_id),
"product_id": str(e.product_id),
"requested": e.requested,
"available": e.available,
"module": "orders",
}
)
raise
Every log line now contains queryable fields. Find all failed orders for a specific user: level=WARNING AND app_module=orders AND user_id=<uuid>. Find all insufficient stock failures by product: message="Order creation failed" AND product_id=<uuid>.
Correlation IDs — Tracing a Request Across Logs
A single user request may generate dozens of log lines across multiple functions, modules, and background workers. Without a correlation ID, these log lines are unconnected — you cannot reconstruct what happened for a specific request.
A correlation ID is a UUID generated at the start of each request and attached to every log line that request produces. All log lines with the same correlation ID belong to the same request.
# core/middleware.py
import uuid
import logging
from contextvars import ContextVar
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
# Context variable — per-async-task storage, safe for asyncio
correlation_id_ctx: ContextVar[str] = ContextVar(
"correlation_id",
default="no-correlation-id",
)
class CorrelationIDMiddleware(BaseHTTPMiddleware):
"""
Injects a correlation ID into every request.
Sources it from the incoming header if present (allows tracing
across services when the client or upstream proxy sets the ID).
Falls back to a generated UUID.
"""
HEADER_NAME = "X-Correlation-ID"
async def dispatch(self, request: Request, call_next) -> Response:
correlation_id = (
request.headers.get(self.HEADER_NAME)
or str(uuid.uuid4())
)
# Store in context var — accessible anywhere in this request's async chain
token = correlation_id_ctx.set(correlation_id)
# Propagate to response headers — client can correlate with server logs
response = await call_next(request)
response.headers[self.HEADER_NAME] = correlation_id
correlation_id_ctx.reset(token)
return response
# core/logging.py — inject correlation ID into every log record
class CorrelationIDFilter(logging.Filter):
"""
Adds the current request's correlation ID to every log record.
Because it reads from a ContextVar, it correctly resolves
the correlation ID for the current async task without any
explicit passing of the ID through function arguments.
"""
def filter(self, record: logging.LogRecord) -> bool:
record.correlation_id = correlation_id_ctx.get("no-correlation-id")
return True
def configure_logging(level: str = "INFO") -> None:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(StructuredFormatter())
handler.addFilter(CorrelationIDFilter()) # attach to handler
root_logger = logging.getLogger()
root_logger.setLevel(level.upper())
root_logger.handlers = [handler]
Now every log line automatically includes "correlation_id": "550e8400-e29b-41d4-a716-446655440000" without any explicit passing. A support ticket about a failed order includes the correlation ID from the response header. You query your log aggregator for that ID and see every log line from every function in that request's execution, in order.
Request Timing Middleware
# core/middleware.py
import time
import logging
logger = logging.getLogger("app.request")
class RequestTimingMiddleware(BaseHTTPMiddleware):
"""
Logs every request with method, path, status code, and duration.
Structured format enables latency queries by endpoint.
"""
async def dispatch(self, request: Request, call_next) -> Response:
start_time = time.monotonic()
response = await call_next(request)
duration_ms = (time.monotonic() - start_time) * 1000
logger.info(
"HTTP request completed",
extra={
"http_method": request.method,
"http_path": request.url.path,
"http_status": response.status_code,
"duration_ms": round(duration_ms, 2),
"correlation_id": correlation_id_ctx.get(),
"client_ip": request.client.host if request.client else None,
"user_agent": request.headers.get("User-Agent"),
}
)
# Warn on slow requests
if duration_ms > 1000:
logger.warning(
"Slow request detected",
extra={
"http_method": request.method,
"http_path": request.url.path,
"duration_ms": round(duration_ms, 2),
"correlation_id": correlation_id_ctx.get(),
}
)
return response
Health and Metrics Endpoint
# modules/health/router.py
import json
import time
from fastapi import APIRouter, Request, Response
from app.core.config import settings
router = APIRouter(tags=["observability"])
@router.get("/health")
async def health_check(request: Request):
"""
Liveness + readiness check in one endpoint.
Returns 200 if the system is healthy and ready to serve traffic.
Returns 503 if any critical dependency is unavailable.
"""
checks = {}
healthy = True
# Database check
try:
start = time.monotonic()
async with request.app.state.pool.acquire() as conn:
await conn.fetchval("SELECT 1")
checks["database"] = {
"status": "healthy",
"latency_ms": round((time.monotonic() - start) * 1000, 2),
}
except Exception as e:
healthy = False
checks["database"] = {"status": "unhealthy", "error": str(e)}
    # Application connection pool stats (the local asyncpg pool, not PgBouncer itself)
try:
pool_stats = request.app.state.pool.get_size()
checks["connection_pool"] = {
"status": "healthy",
"pool_size": pool_stats,
}
except Exception as e:
checks["connection_pool"] = {"status": "unknown", "error": str(e)}
status_code = 200 if healthy else 503
return Response(
content=json.dumps({
"status": "healthy" if healthy else "unhealthy",
"checks": checks,
"version": settings.app_version,
}),
status_code=status_code,
media_type="application/json",
)
@router.get("/metrics")
async def metrics(request: Request):
"""
Lightweight metrics endpoint for monitoring systems.
For production, replace with Prometheus client library.
"""
pool = request.app.state.pool
return {
"pool_size": pool.get_size(),
"pool_min_size": pool.get_min_size(),
"pool_max_size": pool.get_max_size(),
"pool_idle_connections": pool.get_idle_size(),
}
Tracing Across Module Calls
In a modular monolith, all modules run in the same process. Tracing across module calls is simply a matter of propagating the correlation ID through the ContextVar — which happens automatically since all code in a single request shares the same async task context.
For outbox event handlers that run in a separate worker process, the correlation ID from the originating request must be stored in the outbox payload and restored in the worker:
# core/outbox.py
async def enqueue(self, event: DomainEvent) -> None:
payload = event.__dict__.copy()
# Store the originating request's correlation ID in the event payload
# The worker restores it when processing, so all handler log lines
# are traceable back to the original request
payload["_correlation_id"] = correlation_id_ctx.get("no-correlation-id")
await self._conn.execute(
"""
INSERT INTO core.outbox (id, event_type, payload, idempotency_key)
VALUES ($1, $2, $3, $4)
ON CONFLICT (idempotency_key) DO NOTHING
""",
event.event_id,
type(event).__name__,
json.dumps(payload, default=str),
str(event.event_id),
)
# workers/outbox_processor.py
async def process_single_event(conn, row) -> None:
payload = json.loads(row["payload"])
# Restore correlation ID from the event — links worker logs to API logs
correlation_id = payload.pop("_correlation_id", str(uuid4()))
token = correlation_id_ctx.set(correlation_id)
try:
handler = EVENT_REGISTRY.get(row["event_type"])
if handler:
await handler(payload)
await mark_delivered(conn, row["id"])
finally:
correlation_id_ctx.reset(token)
Now log lines from the outbox worker carry the same correlation ID as the API request that created the event. A single query — correlation_id=<uuid> — shows the full lifecycle: the API request that created the order, and the worker that processed its billing event, all in chronological order.
Global Exception Handler
Unhandled exceptions must be caught at the application boundary, logged with full context, and translated into consistent error responses. A raw 500 with a Python traceback in the response body is a security leak and a usability failure:
# core/exception_handlers.py
import logging
import traceback
from fastapi import Request, Response
from fastapi.responses import JSONResponse
import json
logger = logging.getLogger("app.exceptions")
async def unhandled_exception_handler(
request: Request,
exc: Exception,
) -> JSONResponse:
"""
Catches any exception that was not handled by a more specific handler.
Logs the full traceback internally. Returns a safe, consistent error response.
"""
correlation_id = correlation_id_ctx.get("no-correlation-id")
logger.error(
"Unhandled exception",
extra={
"correlation_id": correlation_id,
"http_method": request.method,
"http_path": request.url.path,
"exception_type": type(exc).__name__,
"exception_message": str(exc),
"traceback": traceback.format_exc(),
}
)
return JSONResponse(
status_code=500,
content={
"error": "internal_server_error",
"message": "An unexpected error occurred. Please try again.",
"correlation_id": correlation_id, # client uses this in support tickets
}
)
async def domain_exception_handler(
request: Request,
exc: DomainException,
) -> JSONResponse:
"""Handles known domain exceptions with appropriate HTTP status codes."""
correlation_id = correlation_id_ctx.get("no-correlation-id")
logger.warning(
f"Domain exception: {type(exc).__name__}",
extra={
"correlation_id": correlation_id,
"exception_type": type(exc).__name__,
"http_path": request.url.path,
**exc.context, # domain-specific context fields
}
)
return JSONResponse(
status_code=exc.http_status_code,
content={
"error": exc.error_code,
"message": exc.message,
"correlation_id": correlation_id,
}
)
# main.py — register exception handlers
from app.core.exception_handlers import (
unhandled_exception_handler,
domain_exception_handler,
)
from app.core.exceptions import DomainException
app.add_exception_handler(Exception, unhandled_exception_handler)
app.add_exception_handler(DomainException, domain_exception_handler)
Putting It Together — What a Production Log Stream Looks Like
{"timestamp":"2026-05-11T08:14:22.341Z","level":"INFO","logger":"app.request","message":"HTTP request completed","http_method":"POST","http_path":"/api/v1/orders/","http_status":201,"duration_ms":47.3,"correlation_id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890"}
{"timestamp":"2026-05-11T08:14:22.289Z","level":"INFO","logger":"app.modules.orders.service","message":"Order creation started","user_id":"550e8400-e29b-41d4-a716-446655440000","item_count":3,"module":"orders","correlation_id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890"}
{"timestamp":"2026-05-11T08:14:22.331Z","level":"INFO","logger":"app.modules.orders.service","message":"Order created successfully","order_id":"f47ac10b-58cc-4372-a567-0e02b2c3d479","user_id":"550e8400-e29b-41d4-a716-446655440000","total_amount":"2500.00","module":"orders","correlation_id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890"}
{"timestamp":"2026-05-11T08:14:24.103Z","level":"INFO","logger":"app.workers.outbox","message":"Delivered event OrderPlacedEvent","event_id":"f47ac10b-58cc-4372-a567-0e02b2c3d479","correlation_id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890"}
Every line has correlation_id. The last line is from the outbox worker — a separate process — but carries the same correlation ID as the API request. The entire lifecycle of the request, including asynchronous side effects, is reconstructible from a single log query.
The Reliability Stack — All Layers Together
Request arrives
│
▼
[CorrelationIDMiddleware] ← assign/propagate correlation ID
│
▼
[RequestTimingMiddleware] ← start timer, log on completion
│
▼
[Router] ← Pydantic validation, 422 on schema error
│
▼
[Service] ← idempotency check, business rules
│
├── [Retry decorator] ← wraps external API calls
│
├── [Circuit breaker] ← stops retrying downed services
│
└── [Unit of Work] ← atomic write + outbox enqueue
│
▼
[PostgreSQL] ← unique constraints enforce idempotency at DB level
│
▼
[Outbox Worker] ← eventual consistency, retry with backoff
│
▼
[Event Handlers] ← idempotent, ON CONFLICT DO NOTHING
Throughout:
[StructuredLogger] ← every layer logs with correlation_id + context
[ExceptionHandler] ← catches unhandled, logs traceback, safe response
Every failure class is addressed at the layer best positioned to handle it. Idempotency is enforced at the service layer and the database layer simultaneously. Transient failures are retried by the retry decorator for synchronous calls and by the outbox worker for async events. Silent failures are made visible by structured logs with correlation IDs that link the full request lifecycle.
Section 11: Migration to Microservices
The Fundamental Premise
Migration to microservices is not an upgrade. It is a trade. You trade operational simplicity for deployment independence and the ability to scale specific components in isolation. That trade is only rational when the cost of the operational complexity you are acquiring is lower than the cost of the problem you are solving.
The mistake most engineering teams make is treating microservices as an architectural destination — a more evolved state that all serious systems eventually reach. This is wrong. Microservices are a solution to specific, concrete problems: team deployment contention, uneven module scaling requirements, technology heterogeneity needs. If those problems do not exist at the scale you are operating, microservices add cost without adding value.
The correct relationship to microservices is: design as if you might need them, build as if you don't, extract precisely when you must.
This section covers what "design as if you might need them" looks like in practice, how to recognise the signals that extraction has become necessary, and the engineering mechanics of performing the extraction without disrupting a running production system.
11.1 Design for Extraction From Day One
A modular monolith designed for potential extraction looks identical to one not designed for it — the difference is entirely in the constraints you enforce on module boundaries. These constraints cost nothing at build time and are worth everything at extraction time.
The Properties That Make a Module Extractable
Property 1 — The module communicates only through its public contract.
If module A has never directly accessed module B's repository, database schema, or internal service implementations, then extracting B means: replace B's in-process implementation with an HTTP client that calls B's new service endpoint. The contract interface — IInventoryService — stays identical. The InventoryService class that used to call the database now calls an HTTP endpoint. Module A's code does not change.
If module A has been calling B's repository directly, extraction requires auditing every place that coupling exists, extracting the data those calls retrieve into B's new service API, rewriting all callers, and then testing everything. This is the expensive migration. The cheap migration is only possible if the boundaries were respected from the start.
Property 2 — The module's data is owned exclusively within its schema.
If the inventory module is the only module that reads and writes inventory.* tables, extraction means: point the inventory service at a new database seeded with a copy of the inventory.* schema. All other modules already use soft references — no cross-schema foreign keys to drop, no data dependencies to untangle.
If the billing module has been joining directly to inventory.products in reporting queries, extraction means those queries break. You must either add a reporting API to the inventory service, denormalize the data billing needs into billing's own tables, or run a migration to populate billing's read model from inventory's data before the cutover. Expensive, risky, requires a production migration window.
Property 3 — Cross-module side effects are event-driven.
If the orders module notifies inventory by publishing an OrderPlacedEvent to the outbox, extraction means: replace the in-process outbox worker with a message broker consumer that calls the inventory service's endpoint. The ordering module's event publication code does not change.
If the orders module calls inventory_service.adjust_stock() directly in-process, extraction means adding an HTTP call where there was a function call. This is manageable but requires wrapping the call in proper resilience patterns — timeout, retry, circuit breaker — that were unnecessary in-process.
Property 4 — The module has no shared mutable state with other modules.
No global variables, no class-level state, no shared caches that span module boundaries. In-process caches must be module-scoped. If two modules share a Redis key namespace by convention, extraction requires either splitting the Redis instance or establishing ownership of each key prefix.
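One way to make that ownership explicit in code is a per-module wrapper around the Cache class from Section 9.4 that prefixes every key with the owning module's name. A minimal sketch; ModuleCache and the prefix convention are illustrative assumptions, not part of the codebase above:
# core/cache.py (sketch): module-scoped key ownership on a shared Redis
class ModuleCache:
    """A view of the shared Cache where every key is namespaced to one module."""

    def __init__(self, cache: Cache, module_name: str):
        self._cache = cache
        self._prefix = f"{module_name}:"   # e.g. "inventory:" or "orders:"

    async def get(self, key: str):
        return await self._cache.get(self._prefix + key)

    async def set(self, key: str, value, ttl_seconds: int) -> None:
        await self._cache.set(self._prefix + key, value, ttl_seconds)

    async def invalidate_all(self) -> None:
        # Touches only this module's keys; at extraction time the whole
        # prefix moves with the module and nothing else is affected
        await self._cache.delete_pattern(self._prefix + "*")
Each module is handed its own ModuleCache instance at wiring time, so a key like inventory:product_snapshot:<id> can never be written or invalidated by another module.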
What to Build Into Every Module That Ensures Extractability
# modules/inventory/__init__.py
# The public contract is the only surface another module touches.
# This is also the surface the extracted service will expose via HTTP.
from .contracts import (
IInventoryService, # becomes the HTTP client interface after extraction
ProductSnapshot, # becomes the response DTO of the inventory API
StockReservationRequest, # becomes the request DTO
StockReservationResult, # becomes the response DTO
)
from .exceptions import (
InsufficientStockError, # becomes a 4xx response with error code
ProductNotFoundError,
)
from .router import router
__all__ = [
"IInventoryService",
"ProductSnapshot",
"StockReservationRequest",
"StockReservationResult",
"InsufficientStockError",
"ProductNotFoundError",
"router",
]
The contract defined here is the API specification of the future microservice. Every method on IInventoryService becomes an endpoint. Every DTO becomes a request or response schema. The work of defining the service API is already done — it was done when the module contract was written.
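To make that concrete, here is a sketch of what the extracted inventory service's internal API could look like. Each contract method maps to one endpoint, and the paths match the ones the dormant HTTP adapter in the next subsection calls. The router module, wiring, and paths are illustrative assumptions rather than an existing part of the codebase:
# inventory-service/app/internal_router.py (sketch): the contract exposed over HTTP
from uuid import UUID
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from .contracts import IInventoryService, StockReservationRequest
from .dependencies import get_inventory_service
from .exceptions import InsufficientStockError

router = APIRouter(prefix="/internal", tags=["internal"])

class SnapshotBatchRequest(BaseModel):
    product_ids: list[UUID]

@router.post("/products/snapshots")
async def get_product_snapshots(
    payload: SnapshotBatchRequest,
    service: IInventoryService = Depends(get_inventory_service),
):
    # One contract method becomes one endpoint; the DTO becomes the response schema
    snapshots = await service.get_product_snapshots(payload.product_ids)
    return {
        "snapshots": {
            str(pid): snap.model_dump(mode="json") for pid, snap in snapshots.items()
        }
    }

@router.post("/reservations")
async def reserve_stock(
    payload: StockReservationRequest,
    service: IInventoryService = Depends(get_inventory_service),
):
    # The domain exception becomes a 409 carrying the same fields callers read today
    try:
        result = await service.reserve_stock(payload)
        return result.model_dump(mode="json")
    except InsufficientStockError as e:
        return JSONResponse(
            status_code=409,
            content={
                "product_id": str(e.product_id),
                "requested": e.requested,
                "available": e.available,
            },
        )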
The Adapter Pattern — Pre-Wiring for Extraction
Write the HTTP adapter for the contract before you need it. Keep it dormant. When extraction happens, swap the implementation via dependency injection with zero changes to calling modules:
# modules/inventory/adapters/http_adapter.py
"""
HTTP adapter for IInventoryService.
Used when inventory is extracted to its own service.
Dormant until extraction — the in-process implementation is used until then.
Keeps the same interface as the in-process service.
"""
import httpx
from uuid import UUID
from .contracts import (
    IInventoryService,
    ProductSnapshot,
    StockReservationRequest,
    StockReservationResult,
)
from .exceptions import InsufficientStockError
class InventoryHTTPAdapter(IInventoryService):
"""
Calls the inventory microservice over HTTP.
Drop-in replacement for the in-process InventoryService.
"""
def __init__(self, base_url: str, timeout: float = 5.0):
self._client = httpx.AsyncClient(
base_url=base_url,
timeout=timeout,
headers={"Content-Type": "application/json"},
)
async def get_product_snapshots(
self,
product_ids: list[UUID],
) -> dict[UUID, ProductSnapshot]:
response = await self._client.post(
"/internal/products/snapshots",
json={"product_ids": [str(pid) for pid in product_ids]},
)
response.raise_for_status()
data = response.json()
return {
UUID(k): ProductSnapshot.model_validate(v)
for k, v in data["snapshots"].items()
}
async def reserve_stock(
self,
reservation: StockReservationRequest,
) -> StockReservationResult:
response = await self._client.post(
"/internal/reservations",
json=reservation.model_dump(mode="json"),
)
if response.status_code == 409:
raise InsufficientStockError(
product_id=UUID(response.json()["product_id"]),
requested=response.json()["requested"],
available=response.json()["available"],
)
response.raise_for_status()
return StockReservationResult.model_validate(response.json())
async def aclose(self):
await self._client.aclose()
# core/config.py
class Settings(BaseSettings):
use_inventory_service: bool = False # False = in-process, True = HTTP
inventory_service_url: str = ""
# modules/inventory/dependencies.py
from app.core.config import settings
from .adapters.http_adapter import InventoryHTTPAdapter
from .contracts import IInventoryService
from .service import InventoryService
def get_inventory_service() -> IInventoryService:
if settings.use_inventory_service:
# Use the HTTP adapter — inventory has been extracted
return InventoryHTTPAdapter(base_url=settings.inventory_service_url)
else:
# Use the in-process implementation — still in the monolith
return InventoryService(...)
Extraction becomes a configuration change: USE_INVENTORY_SERVICE=true, INVENTORY_SERVICE_URL=https://inventory.internal.kukufiti.co.ke. No code changes in any calling module.
11.2 Signals That Extraction Is Necessary
Section 9.5 covered the signals from a scaling perspective. Here they are examined from the extraction decision perspective — what you observe, how you verify it is the cause, and what extraction specifically solves.
Signal 1 — Deployment Contention Is the Primary Engineering Scheduling Problem
Observation: Sprint planning regularly involves conversations about whose code deploys first. QA cycles for one team's feature are blocked by another team's failing tests. A hotfix for a billing bug requires a full regression pass of the order management features.
Verification: Track deployment frequency and reasons for deployment blocks over 4 weeks. If more than 20% of deployment delays are caused by code from a different module than the one being deployed, you have structural contention.
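This does not need tooling; a list of delay records and a ratio is enough. The sketch below assumes a hand-collected log of deployment delays, each tagged with the module being shipped and the module that caused the block; the record shape is an illustration, not something defined elsewhere in this series:
# scripts/deploy_contention_report.py (illustrative sketch; the record fields are assumptions)
from dataclasses import dataclass

@dataclass
class DeploymentDelay:
    deployed_module: str   # the module the team was trying to ship
    blocking_module: str   # the module whose code caused the delay

def cross_module_block_ratio(delays: list[DeploymentDelay]) -> float:
    """Fraction of delays caused by a module other than the one being deployed."""
    if not delays:
        return 0.0
    cross = sum(1 for d in delays if d.blocking_module != d.deployed_module)
    return cross / len(delays)

# Four weeks of delay records collected from sprint notes or CI logs
delays = [
    DeploymentDelay("billing", "orders"),
    DeploymentDelay("billing", "billing"),
    DeploymentDelay("orders", "auth"),
    DeploymentDelay("inventory", "inventory"),
    DeploymentDelay("orders", "billing"),
]
print(f"Cross-module deployment blocks: {cross_module_block_ratio(delays):.0%}")  # 60% here, well above the 20% threshold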
What extraction solves: The billing team deploys the billing service independently. The orders team deploys the orders service independently. Neither can block the other.
What extraction does not solve: Technical debt within a module, poor test coverage, slow CI pipelines. These problems follow the code into the microservice.
Signal 2 — A Specific Module Has Resource Requirements an Order of Magnitude Different From Others
Observation: The video processing module (farm surveillance footage analysis) runs at 90% CPU for extended periods. The orders API runs at 8% CPU. Adding CPU capacity for video processing means allocating that CPU to all modules — the orders API gets 10× the resources it needs.
Verification: Profile resource consumption per module over a production load period. If one module accounts for more than 60% of total CPU or memory consumption while handling less than 20% of request volume, its resource profile is incompatible with co-location.
What extraction solves: The video processing service runs on CPU-optimised instances. The rest of the application runs on general-purpose instances. Resource allocation matches actual requirements.
Signal 3 — A Module Needs a Different Technology Stack
Observation: The recommendation engine module needs to run Python with PyTorch for ML inference. The rest of the application runs FastAPI + asyncpg. Introducing a large ML framework and GPU dependencies into the application image increases build time, image size, and deployment complexity for all modules.
Verification: The dependency being considered cannot be isolated within the module without affecting the entire application container.
What extraction solves: The recommendation service runs a separate Python environment with ML dependencies. The main application image stays lean.
Signal 4 — A Module Has Compliance Requirements That Require Isolation
Observation: The payments module handles cardholder data subject to PCI-DSS. Compliance requires the cardholder data environment (CDE) to be isolated from systems that do not need to access it. Running payments in the same process as the orders API means the orders API is in scope for PCI-DSS — requiring the same compliance controls regardless of whether it touches payment data.
Verification: Compliance scope analysis from a QSA (Qualified Security Assessor). If co-location expands the compliance scope beyond what is necessary, isolation is required.
What extraction solves: The payments service runs in a network-isolated CDE. Other services call it via a defined API but are out of PCI scope.
11.3 The Strangler Fig Pattern
The Strangler Fig is the only safe way to migrate a running production system to microservices. It is named after a tree species that grows around a host tree, gradually replacing it over time while the host continues to function normally until the replacement is complete.
The alternative — the "big bang rewrite" — involves stopping development of the monolith, writing the new services in parallel, and switching all traffic over at once. This approach has a failure rate high enough that it is treated as an anti-pattern in the industry. You either never finish, or you finish with a system that has not been tested under real production load and discover all the failures simultaneously at cutover.
The Strangler Fig makes each extraction step independently deployable, independently rollbackable, and independently verifiable.
Phase 0 — Audit and Baseline (Before Any Code Change)
Before beginning extraction of any module, document exactly what you are extracting:
# Identify all cross-module calls TO the module being extracted
# This defines the API surface the new service must expose
grep -r "IInventoryService\|from app.modules.inventory" app/modules/ \
--include="*.py" \
| grep -v "app/modules/inventory/"
-- Check access statistics on the inventory schema's tables.
-- Unexpectedly heavy reads can indicate direct cross-schema queries
-- from other modules (violations that must be resolved first).
SELECT
    schemaname,
    relname AS table_name,
    seq_scan + COALESCE(idx_scan, 0) AS total_accesses
FROM pg_stat_user_tables
WHERE schemaname = 'inventory'
ORDER BY total_accesses DESC;
# Write a test that verifies the module contract is complete —
# all behaviour consumed by other modules is covered by the contract
# This becomes the acceptance test for the extracted service
class TestInventoryContractCompliance:
"""
Every method on IInventoryService must be exercised here.
This test suite runs against both the in-process implementation
and the HTTP adapter — same tests, different implementation.
"""
@pytest.fixture(params=["in_process", "http"])
def inventory_service(self, request, ...):
if request.param == "in_process":
return InventoryService(...)
else:
return InventoryHTTPAdapter(base_url=test_service_url)
async def test_get_product_snapshots_returns_all_requested(
self, inventory_service
):
...
async def test_reserve_stock_reduces_available_quantity(
self, inventory_service
):
...
These contract compliance tests run against both implementations from day one. When the extracted service is ready, the tests prove it is a valid replacement.
Phase 1 — Introduce the API Gateway / Routing Layer
Before extracting any module, introduce an API gateway or a routing proxy in front of the application. This is the mechanism that will redirect traffic from the monolith to extracted services without the client changing any URLs.
# nginx.conf — pre-extraction (all traffic to monolith)
upstream backend {
server kukufiti_api:8000;
}
server {
location /api/v1/ {
proxy_pass http://backend;
}
}
# nginx.conf — during inventory extraction (split traffic)
upstream monolith {
server kukufiti_api:8000;
}
upstream inventory_service {
server kukufiti_inventory:8001;
}
server {
# Inventory endpoints → new service
location /api/v1/products {
proxy_pass http://inventory_service;
}
location /api/v1/reservations {
proxy_pass http://inventory_service;
}
# Everything else → monolith
location /api/v1/ {
proxy_pass http://monolith;
}
}
The client — mobile app, web frontend, partner API — calls the same URLs. The gateway decides where to route each request. The monolith and the new service run simultaneously. Traffic is migrated progressively.
Phase 2 — Extract the Database First
The database extraction must happen before the service extraction. This is the most operationally risky step — it requires a live migration of data that is actively being written. Do it carefully and in stages.
Step 1 — Run dual writes. Before cutting over any reads, configure the monolith to write to both the existing inventory schema and the new inventory_service database simultaneously. The new database accumulates data in parallel with the old one:
# modules/inventory/repository.py — dual write adapter
class DualWriteInventoryRepository:
"""
Writes to both the existing schema and the new standalone database.
Reads still come from the existing schema — new DB is write-only shadow.
Used for data migration validation before full cutover.
"""
def __init__(
self,
primary: InventoryRepository, # existing schema
shadow: ExternalInventoryRepository, # new standalone database
enable_shadow: bool = False,
):
self._primary = primary
self._shadow = shadow
self._enable_shadow = enable_shadow
async def update_stock_level(self, product_id: UUID, quantity: int) -> None:
# Always write to primary
await self._primary.update_stock_level(product_id, quantity)
# Write to shadow if enabled — failures logged but not raised
# Shadow is for validation, not correctness
if self._enable_shadow:
try:
await self._shadow.update_stock_level(product_id, quantity)
except Exception as e:
logger.error(
"Shadow write failed — primary write succeeded",
extra={"product_id": str(product_id), "error": str(e)},
)
Step 2 — Validate shadow consistency. Run a reconciliation job that compares the primary and shadow databases for a period — typically 24–48 hours under real production load. Any divergence indicates a write path you missed:
# scripts/reconcile_inventory.py
async def reconcile_inventory(primary_conn, shadow_conn):
"""
Compares stock levels between primary (inventory schema in monolith DB)
and shadow (standalone inventory service DB).
Logs all discrepancies. Expected to be zero before cutover is approved.
"""
primary_levels = await primary_conn.fetch(
"SELECT product_id, quantity FROM inventory.stock_levels ORDER BY product_id"
)
shadow_levels = await shadow_conn.fetch(
"SELECT product_id, quantity FROM stock_levels ORDER BY product_id"
)
primary_map = {row["product_id"]: row["quantity"] for row in primary_levels}
shadow_map = {row["product_id"]: row["quantity"] for row in shadow_levels}
discrepancies = []
for product_id, primary_qty in primary_map.items():
shadow_qty = shadow_map.get(product_id)
if shadow_qty != primary_qty:
discrepancies.append({
"product_id": str(product_id),
"primary_quantity": primary_qty,
"shadow_quantity": shadow_qty,
})
if discrepancies:
logger.warning(
f"Inventory reconciliation: {len(discrepancies)} discrepancies found",
extra={"discrepancies": discrepancies},
)
else:
logger.info("Inventory reconciliation: zero discrepancies — ready for cutover")
return discrepancies
Step 3 — Migrate reads to the shadow. Once reconciliation runs clean for 48 hours, configure reads to come from the shadow database. Primary writes continue during this phase: the shadow becomes the source of truth for reads, while the primary still receives every write as a safety net.
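One way to keep the read cutover reversible is a read-side flag on the dual-write repository from Step 1. This is a sketch under that assumption; the read_from_shadow flag and the get_stock_level method are illustrative names, not part of the contract defined earlier:
# modules/inventory/repository.py (read cutover sketch; flag and method names are assumptions)
from uuid import UUID

class DualWriteInventoryRepository:
    def __init__(
        self,
        primary: InventoryRepository,            # existing schema
        shadow: ExternalInventoryRepository,     # new standalone database
        enable_shadow: bool = False,
        read_from_shadow: bool = False,          # Step 3: flip to True once reconciliation is clean
    ):
        self._primary = primary
        self._shadow = shadow
        self._enable_shadow = enable_shadow
        self._read_from_shadow = read_from_shadow

    async def get_stock_level(self, product_id: UUID) -> int:
        # Reads route to the shadow once the flag is on; flipping it back is the rollback path
        if self._read_from_shadow:
            return await self._shadow.get_stock_level(product_id)
        return await self._primary.get_stock_level(product_id)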
Step 4 — Stop writing to primary, complete cutover. When reads are proven stable from the shadow, disable writes to the primary schema. The inventory module now exclusively uses the standalone database. The inventory schema in the monolith database is now a historical artifact that can be archived and dropped on the next maintenance window.
Phase 3 — Deploy the Extracted Service
With the database already migrated, the service extraction itself is anti-climactic:
1. Deploy inventory service container (same codebase, different entry point)
2. Run contract compliance tests against the live service
3. Set USE_INVENTORY_SERVICE=true on one API server instance
4. Monitor for 1 hour — compare error rates and latencies
5. Roll out to remaining API server instances if metrics are clean
6. Remove inventory module code from monolith in the next sprint
# main.py of the extracted inventory service
# This is nearly identical to the inventory module's router registration
# in the monolith — just without all the other modules
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncpg
from app.core.config import settings
from app.modules.inventory import router as inventory_router
from app.modules.inventory.internal_router import internal_router as internal_inventory_router
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.pool = await asyncpg.create_pool(dsn=settings.inventory_db_url)
yield
await app.state.pool.close()
app = FastAPI(title="Kukufiti Inventory Service", lifespan=lifespan)
app.include_router(inventory_router, prefix="/api/v1")
# Internal endpoint — called by the monolith's HTTP adapter
# Not exposed to external clients
app.include_router(internal_inventory_router)  # this router already carries the /internal prefix
The /internal prefix convention is important. The extracted service exposes two sets of endpoints:
- /api/v1/... — external-facing endpoints, same URLs as in the monolith, served through the API gateway
- /internal/... — service-to-service endpoints, only reachable within the internal network, not proxied externally
Other services call /internal/products/snapshots. External clients call /api/v1/products through the gateway. The distinction prevents internal APIs from being accidentally exposed.
Phase 4 — Service-to-Service Authentication
Once a module is extracted, inter-service calls are no longer in-process. They cross a network boundary and must be authenticated. A compromised service should not be able to impersonate another service:
# core/service_auth.py
"""
Service-to-service authentication.
Asymmetrically signed (RS256) JWT service tokens: short-lived and scoped to
specific service pairs. mTLS at the network layer is a complementary option.
"""
import jwt
from datetime import datetime, UTC, timedelta
class ServiceTokenIssuer:
def __init__(self, private_key: str, service_name: str):
self._private_key = private_key
self._service_name = service_name
def issue_token(self, target_service: str) -> str:
payload = {
"iss": self._service_name, # issuing service
"aud": target_service, # intended recipient
"iat": datetime.now(UTC),
"exp": datetime.now(UTC) + timedelta(minutes=5),
}
return jwt.encode(payload, self._private_key, algorithm="RS256")
class ServiceTokenValidator:
def __init__(self, public_keys: dict[str, str], service_name: str):
self._public_keys = public_keys # service_name → public key
self._service_name = service_name
def validate(self, token: str) -> str:
"""Returns the issuing service name if valid."""
        # Read the issuer claim from the still-unverified payload so the right
        # public key can be selected; the signature itself is verified below.
        unverified = jwt.decode(token, options={"verify_signature": False})
        issuer = unverified.get("iss")
public_key = self._public_keys.get(issuer)
if not public_key:
raise UnknownServiceError(issuer)
payload = jwt.decode(
token,
public_key,
algorithms=["RS256"],
audience=self._service_name,
)
return payload["iss"]
# modules/inventory/internal_router.py
from fastapi import APIRouter, Depends, Header
from typing import Annotated
internal_router = APIRouter(prefix="/internal")
async def verify_service_token(
x_service_token: Annotated[str, Header()],
) -> str:
return service_token_validator.validate(x_service_token)
@internal_router.post("/products/snapshots")
async def get_product_snapshots(
payload: ProductSnapshotRequest,
calling_service: str = Depends(verify_service_token),
):
# Audit log — which service is calling this
logger.info(
"Internal endpoint called",
extra={"endpoint": "product_snapshots", "caller": calling_service}
)
snapshots = await inventory_service.get_product_snapshots(payload.product_ids)
return {"snapshots": {str(k): v.model_dump() for k, v in snapshots.items()}}
Phase 5 — Handling Distributed Transactions After Extraction
Before extraction, the orders module and inventory module could participate in the same database transaction. After extraction, they cannot. Operations that previously committed atomically now require a distributed coordination strategy.
The two approaches:
Saga Pattern — Choreography:
Each service performs its local transaction and emits an event. Downstream services react to the event and perform their own transactions. If a downstream service fails, it emits a compensating event that triggers rollback of upstream operations.
OrderService:
1. Create order (status=pending)
2. Emit OrderCreatedEvent
InventoryService receives OrderCreatedEvent:
1. Reserve stock
2. Emit StockReservedEvent OR StockReservationFailedEvent
OrderService receives StockReservedEvent:
1. Update order status to confirmed
OrderService receives StockReservationFailedEvent:
1. Update order status to cancelled (compensation)
2. Emit OrderCancelledEvent
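There is no orchestrator in the choreography variant, so the interesting code is each service's event handler. A sketch of the inventory side follows; the event payload shape, the publisher, and the DTO field names are assumptions layered on top of the contract, not code defined elsewhere in this document:
# inventory-service event handler (sketch; event dict, publisher, and reservation_id are assumptions)
from uuid import UUID

from app.modules.inventory.contracts import StockReservationRequest
from app.modules.inventory.exceptions import InsufficientStockError

async def handle_order_created(event: dict, inventory_service, publisher) -> None:
    """Local transaction plus an event: reserve stock, then announce the outcome."""
    order_id = UUID(event["order_id"])
    try:
        result = await inventory_service.reserve_stock(
            StockReservationRequest(order_id=order_id, items=event["items"])
        )
        await publisher.publish(
            "StockReservedEvent",
            {"order_id": str(order_id), "reservation_id": str(result.reservation_id)},
        )
    except InsufficientStockError as exc:
        # Failure is also an event; the orders service reacts by cancelling the order
        await publisher.publish(
            "StockReservationFailedEvent",
            {"order_id": str(order_id), "product_id": str(exc.product_id)},
        )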
Saga Pattern — Orchestration:
A dedicated saga orchestrator service manages the sequence and compensation. It calls each service in turn, tracks state, and issues compensating calls on failure:
# sagas/order_placement_saga.py
class OrderPlacementSaga:
"""
Orchestrates the distributed order placement transaction across
orders service and inventory service.
Manages compensation on failure.
"""
async def execute(
self,
order_id: UUID,
user_id: UUID,
items: list,
) -> SagaResult:
completed_steps: list[str] = []
try:
# Step 1 — create order
order = await self._orders_client.create_order(
order_id=order_id, user_id=user_id, items=items
)
completed_steps.append("order_created")
# Step 2 — reserve stock
reservation = await self._inventory_client.reserve_stock(
order_id=order_id, items=items
)
completed_steps.append("stock_reserved")
# Step 3 — confirm order
confirmed = await self._orders_client.confirm_order(order_id=order_id)
completed_steps.append("order_confirmed")
return SagaResult(success=True, order_id=order_id)
except Exception as e:
logger.error(
f"Saga failed at step after: {completed_steps}",
extra={"order_id": str(order_id), "error": str(e)}
)
# Compensate completed steps in reverse order
await self._compensate(order_id, completed_steps)
raise OrderPlacementFailedError(order_id=order_id) from e
async def _compensate(self, order_id: UUID, completed_steps: list[str]) -> None:
for step in reversed(completed_steps):
try:
if step == "stock_reserved":
await self._inventory_client.release_reservation(order_id=order_id)
elif step == "order_created":
await self._orders_client.cancel_order(order_id=order_id)
except Exception as e:
# Compensation failure — requires manual intervention
logger.critical(
f"Saga compensation failed for step: {step}",
extra={"order_id": str(order_id), "error": str(e)}
)
# Alert operations team — this is a stuck saga
This is why the advice to build microservices from the start is often misguided. The saga pattern is significantly more complex than a single database transaction. It requires idempotency in every step, compensation logic for every step, a mechanism to track saga state, dead-letter handling for stuck sagas, and operational tooling to diagnose and manually resolve saga failures. All of this complexity is absent in a modular monolith where the entire operation fits in a single database transaction.
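Of the requirements listed above, idempotency is the one most often skipped, so it is worth seeing what an explicit guard looks like. The sketch below assumes a processed_requests table in the inventory schema and a _do_reserve_stock helper; both names are illustrative:
# modules/inventory/service.py (idempotency guard sketch; table and helper names are assumptions)
import json
from uuid import UUID

async def reserve_stock_idempotent(conn, idempotency_key: str, order_id: UUID, items: list) -> dict:
    """A replay of the same request returns the stored result instead of reserving stock twice."""
    async with conn.transaction():
        existing = await conn.fetchrow(
            "SELECT result FROM inventory.processed_requests WHERE idempotency_key = $1",
            idempotency_key,
        )
        if existing is not None:
            return json.loads(existing["result"])  # already handled: return the recorded outcome
        result = await _do_reserve_stock(conn, order_id, items)  # the real local transaction
        # Record the outcome under the key, in the same transaction as the reservation itself
        await conn.execute(
            "INSERT INTO inventory.processed_requests (idempotency_key, result) VALUES ($1, $2)",
            idempotency_key,
            json.dumps(result),
        )
        return result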
11.4 The Complete Extraction Checklist
For each module being extracted, verify all of the following before production cutover:
Boundary verification:
- [ ] No other module imports from this module's internal packages
- [ ] No other module's SQL queries reference this module's database schema directly
- [ ] All cross-module dependencies use the module's public contract interface
Contract completeness:
- [ ] All operations consumed by other modules are present on the contract interface
- [ ] All DTOs are fully defined with validation
- [ ] All domain exceptions are documented with HTTP status code mappings
- [ ] Contract compliance test suite passes against the extracted service
Database migration:
- [ ] Dual writes running and reconciliation clean for 48+ hours
- [ ] Read migration tested and validated
- [ ] Rollback procedure tested in staging
- [ ] Old schema retained (not dropped) for 30 days post-cutover
Service deployment:
- [ ] Service runs in isolation without the monolith
- [ ] Health check endpoint functional
- [ ] Internal endpoints protected by service token authentication
- [ ] External endpoints routed correctly through API gateway
- [ ] Structured logging with correlation ID propagation functional
Resilience:
- [ ] HTTP adapter has timeout, retry, and circuit breaker configured (see the sketch after this checklist)
- [ ] Distributed transaction strategy defined (saga or eventual consistency)
- [ ] Idempotency keys required on all mutating internal endpoints
- [ ] Load testing under expected peak traffic completed
Observability:
- [ ] Correlation IDs propagated across service boundary via headers
- [ ] Centralized log aggregation receiving logs from extracted service
- [ ] Alerting configured for service health, error rate, and latency
- [ ] Runbook written for the one failure mode most likely to require manual intervention
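The resilience items are the easiest to under-specify, so here is a sketch of the timeout and retry part referenced in the checklist, using httpx. The retry setting only re-attempts connection failures (that is what AsyncHTTPTransport(retries=...) covers); retrying failed responses and circuit breaking still need a separate wrapper or library, which is not shown:
# modules/inventory/adapters/http_adapter.py (resilience sketch; the numbers are illustrative defaults)
import httpx

def build_inventory_client(base_url: str) -> httpx.AsyncClient:
    """Client with explicit per-phase timeouts and connection retries for the HTTP adapter."""
    timeout = httpx.Timeout(
        connect=2.0,   # fail fast if the inventory service is unreachable
        read=5.0,      # per-request read budget
        write=5.0,
        pool=2.0,      # how long to wait for a free connection from the pool
    )
    transport = httpx.AsyncHTTPTransport(retries=2)  # retries connection errors only, not HTTP 5xx
    return httpx.AsyncClient(base_url=base_url, timeout=timeout, transport=transport)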
The Extraction Timeline — What Is Realistic
A single module extraction, done correctly, takes 4–8 weeks for a team of 2–3 engineers:
| Week | Work |
|---|---|
| 1 | Audit boundary violations, fix contract gaps, write compliance test suite |
| 2 | Implement and deploy the extracted service in staging, validate against compliance tests |
| 3 | Set up dual writes in production, monitor reconciliation |
| 4 | Migrate reads to new database in production, monitor for 1 week |
| 5 | Enable HTTP adapter for 5% of traffic via feature flag, monitor |
| 6 | Roll out HTTP adapter to 100% of traffic, monitor for 1 week |
| 7–8 | Remove module from monolith, clean up dead code, archive old schema |
Teams that rush this process — attempting cutover in days rather than weeks — typically discover their contract was incomplete, their compensation logic was wrong, or their replica lag was causing stale reads in the new service. The 8-week timeline is not conservatism — it is the time required to verify, under real production load, that the extracted service is a valid replacement for the in-process module.
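The week-5 step, sending only a slice of traffic through the HTTP adapter, can be as small as a percentage gate in the dependency provider. A sketch follows; the inventory_http_rollout_percent setting and the random gate are assumptions, and hashing a stable key such as the user ID would give stickier routing:
# modules/inventory/dependencies.py (gradual rollout sketch; the percentage setting is an assumption)
import random

from app.core.config import settings
from app.modules.inventory.adapters.http_adapter import InventoryHTTPAdapter
from app.modules.inventory.contracts import IInventoryService

def get_inventory_service_with_rollout() -> IInventoryService:
    # 0 = all in-process, 100 = all HTTP; week 5 of the timeline would set this to 5
    if random.uniform(0, 100) < settings.inventory_http_rollout_percent:
        return InventoryHTTPAdapter(base_url=settings.inventory_service_url)
    return get_inventory_service()  # the existing in-process wiring shown earlier in this section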
Section 12: Enforcement Strategies
The Central Argument
Every architectural principle described in this research series is violated daily in production codebases maintained by competent engineers who understood the principles when they wrote the code. This is not a character failure. It is a systems failure.
Architectural boundaries degrade under three conditions that are universal in software teams: deadline pressure, incomplete context, and accumulated exceptions. Deadline pressure causes a developer to take the direct path — importing a concrete implementation instead of defining a new contract. Incomplete context causes a new team member to add a cross-schema query because they did not know the boundary existed. Accumulated exceptions cause a principle to erode gradually — each individual violation seems minor, but the aggregate destroys the architecture.
The solution is not better documentation, stronger code review culture, or more thorough onboarding. These are inputs that help but cannot provide guarantees. The solution is structural: make violations harder to commit than the compliant path. Ideally, make violations impossible to commit without explicitly defeating the enforcement mechanism.
This section covers three enforcement layers. Each layer catches a different class of violation. All three are required in a production system.
12.1 Import Linting — Python Boundary Enforcement
Python's import system is unrestricted by default. Any file can import from any other file in the project. The module structure you define with directories and __init__.py exports is a convention — the interpreter will happily ignore it and resolve any import that resolves to a valid path.
import-linter is a tool that enforces import rules as part of your CI pipeline. It reads a configuration that defines which imports are allowed between which packages, and fails with a descriptive error if any import violates the rules.
Installation and Configuration
pip install import-linter
# .importlinter — project root
[importlinter]
root_package = app
include_external_packages = True
# ─── Contract 1: Module isolation ─────────────────────────────────────────────
# Modules may not import from each other at all, with one sanctioned exception:
# another module's contracts package (see ignore_imports below).
[importlinter:contract:module_isolation]
name = Modules must not import from each other's internal packages
type = independence
modules =
    app.modules.auth
    app.modules.orders
    app.modules.billing
    app.modules.inventory
    app.modules.notifications
ignore_imports =
    # Cross-module imports of a public contracts package are allowed
    app.modules.** -> app.modules.*.contracts
# ─── Contract 2: Dependency direction ──────────────────────────────────────────
# Services must not import from routers.
# Repositories must not import from services.
# The same layering applies inside every module, so the modules are listed
# as containers and the layers are the subpackage names.
[importlinter:contract:dependency_direction]
name = Dependencies flow inward — outer layers depend on inner layers only
type = layers
layers =
    router
    service
    repository
containers =
    app.modules.auth
    app.modules.orders
    app.modules.billing
    app.modules.inventory
    app.modules.notifications
exhaustive = False
# ─── Contract 3: Shared package purity ─────────────────────────────────────────
# The shared package must contain only data structures and utilities.
# It must not import from any module.
[importlinter:contract:shared_purity]
name = Shared package must not import from any module
type = forbidden
source_modules =
app.shared
forbidden_modules =
app.modules
# ─── Contract 4: Core package restrictions ──────────────────────────────────────
# Core infrastructure must not import from modules.
# Modules import from core, not the other way around.
[importlinter:contract:core_independence]
name = Core package must not import from modules
type = forbidden
source_modules =
app.core
forbidden_modules =
app.modules
Running the Linter
# Check all contracts
lint-imports
# Also report how long each contract check took
lint-imports --show-timings
# Example output on violation:
# ✗ Modules must not import from each other's internal packages
#
# app.modules.billing.service imports app.modules.auth.repository
# ↓
# app/modules/billing/service.py:14: from app.modules.auth.repository import UserRepository
#
# Hint: billing should use app.modules.auth.contracts.IUserService instead
CI Integration
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
lint-imports:
name: Enforce module boundaries
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- run: pip install import-linter
- run: lint-imports
# Fails the pipeline if any import contract is violated
# Developer sees the exact import that caused the failure
# and which contract was violated
test:
name: Tests
needs: lint-imports # Tests only run if import contracts pass
runs-on: ubuntu-latest
steps:
- ...
The ordering matters. Import linting runs before tests. A boundary violation is caught and reported before any test execution — the developer gets a precise, actionable error immediately rather than a cryptic test failure caused by an unexpected coupling.
Custom Import Rules for Specific Patterns
import-linter supports multiple contract types. For more complex rules, write a custom contract:
# importlinter_contracts/no_cross_module_contracts_import.py
"""
Custom contract: modules may import another module's contracts package,
but only the contracts package — not any other subpackage.
Allows: from app.modules.auth.contracts import IUserService
Forbids: from app.modules.auth import UserService (concrete implementation)
Forbids: from app.modules.auth.service import UserService (internal)
"""
from importlinter import Contract, ContractCheck, output

class ContractsOnlyImportContract(Contract):
    """
    Enforces that cross-module imports only touch the .contracts subpackage.
    """
    type_name = "contracts_only"

    def check(self, graph, verbose) -> ContractCheck:
        violations = []
        for module in graph.modules:  # the graph exposes module names as plain strings
            if not module.startswith("app.modules."):
                continue
            module_root = ".".join(module.split(".")[:3])  # app.modules.X
            for imported in graph.find_modules_directly_imported_by(module):
                if not imported.startswith("app.modules."):
                    continue
                imported_root = ".".join(imported.split(".")[:3])  # app.modules.Y
                if imported_root == module_root:
                    continue  # same module: allowed
                # Cross-module import: must target the .contracts subpackage
                if not (imported.endswith(".contracts") or ".contracts." in imported):
                    violations.append(
                        f"{module} imports {imported} "
                        f"(only the .contracts subpackage may be imported across modules)"
                    )
        return ContractCheck(
            kept=not violations,
            metadata={"violations": violations},
        )

    def render_broken_contract(self, check: ContractCheck) -> None:
        for violation in check.metadata["violations"]:
            output.print_error(violation)
12.2 Architecture Tests — pytest as Enforcement
Architecture tests are pytest tests that assert structural properties of the codebase. They do not test application behaviour — they test the shape of the code. They run in CI alongside unit and integration tests and fail with descriptive messages when structure rules are violated.
Architecture tests catch violations that import linting cannot express — class hierarchy rules, naming conventions, interface implementation requirements, and dependency injection consistency.
# tests/architecture/test_module_structure.py
"""
Architecture tests — verify structural properties of the codebase.
These tests fail CI if module boundaries, naming conventions,
or structural rules are violated.
"""
import ast
import importlib
import inspect
import pkgutil
from pathlib import Path
import pytest
from app.modules.auth.contracts import IUserService
from app.modules.inventory.contracts import IInventoryService
# ─── Module Structure Tests ───────────────────────────────────────────────────
class TestModuleStructure:
"""Every module must have the required files and expose the required names."""
REQUIRED_FILES = {
"__init__.py",
"contracts.py",
"exceptions.py",
"router.py",
"service.py",
"repository.py",
"schemas.py",
"models.py",
}
MODULES_PATH = Path("app/modules")
def get_module_directories(self) -> list[Path]:
return [
d for d in self.MODULES_PATH.iterdir()
if d.is_dir() and not d.name.startswith("_")
]
@pytest.mark.parametrize(
"module_dir",
[d for d in Path("app/modules").iterdir()
if d.is_dir() and not d.name.startswith("_")],
ids=lambda d: d.name,
)
def test_module_has_required_files(self, module_dir: Path):
existing_files = {f.name for f in module_dir.iterdir() if f.is_file()}
missing = self.REQUIRED_FILES - existing_files
assert not missing, (
f"Module '{module_dir.name}' is missing required files: {missing}\n"
f"Every module must have: {self.REQUIRED_FILES}"
)
def test_every_module_exposes_router_in_init(self):
"""main.py can mount module routers — they must be exported."""
for module_dir in self.get_module_directories():
module_name = f"app.modules.{module_dir.name}"
module = importlib.import_module(module_name)
assert hasattr(module, "router"), (
f"Module '{module_dir.name}' does not export 'router' from __init__.py\n"
f"Every module must expose its FastAPI router via __all__ in __init__.py"
)
def test_module_init_does_not_export_concrete_services(self):
"""
Concrete service implementations must not appear in __init__.py exports.
Only contracts (interfaces + DTOs) should be exported.
"""
for module_dir in self.get_module_directories():
module_name = f"app.modules.{module_dir.name}"
module = importlib.import_module(module_name)
all_exports = getattr(module, "__all__", [])
for export_name in all_exports:
exported = getattr(module, export_name, None)
if exported is None:
continue
                if inspect.isclass(exported):
                    # Abstract contracts, Pydantic DTOs, exceptions, and router objects
                    # are allowed exports; anything else is a concrete implementation
                    is_abstract = inspect.isabstract(exported)
                    is_pydantic = hasattr(exported, "model_fields")  # Pydantic BaseModel
                    is_exception = issubclass(exported, Exception)
                    is_router = hasattr(exported, "routes")  # FastAPI router
                    if not (is_abstract or is_pydantic or is_exception or is_router):
                        pytest.fail(
                            f"Module '{module_dir.name}' exports concrete class "
                            f"'{export_name}' from __init__.py\n"
                            f"Only abstract contracts, Pydantic DTOs, and exceptions "
                            f"should be exported. Concrete implementations are private."
                        )
# ─── Contract Implementation Tests ────────────────────────────────────────────
class TestContractImplementations:
"""
Every concrete service class that implements a contract interface
must implement ALL methods defined on the interface.
Catches partial implementations that would cause AttributeError at runtime.
"""
def test_user_service_implements_full_contract(self):
from app.modules.auth.service import UserService
from app.modules.auth.contracts import IUserService
contract_methods = {
name for name, _ in inspect.getmembers(IUserService, predicate=inspect.isfunction)
if not name.startswith("_")
}
implementation_methods = {
name for name, _ in inspect.getmembers(UserService, predicate=inspect.isfunction)
if not name.startswith("_")
}
missing = contract_methods - implementation_methods
assert not missing, (
f"UserService does not implement all IUserService methods.\n"
f"Missing: {missing}\n"
f"Every method on the interface must have a concrete implementation."
)
def test_inventory_service_implements_full_contract(self):
from app.modules.inventory.service import InventoryService
contract_methods = {
name for name, _ in inspect.getmembers(IInventoryService, predicate=inspect.isfunction)
if not name.startswith("_")
}
implementation_methods = {
name for name, _ in inspect.getmembers(InventoryService, predicate=inspect.isfunction)
if not name.startswith("_")
}
missing = contract_methods - implementation_methods
assert not missing, (
f"InventoryService missing methods: {missing}"
)
# ─── Naming Convention Tests ──────────────────────────────────────────────────
class TestNamingConventions:
def test_all_repository_classes_end_with_repository(self):
"""Repositories are identifiable by name — tooling and tracing depend on this."""
for module_dir in Path("app/modules").iterdir():
if not module_dir.is_dir():
continue
repo_file = module_dir / "repository.py"
if not repo_file.exists():
continue
tree = ast.parse(repo_file.read_text())
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
assert node.name.endswith("Repository"), (
f"Class '{node.name}' in {repo_file} does not end with 'Repository'.\n"
f"All repository classes must follow the naming convention: "
f"<Domain>Repository"
)
def test_all_contract_interfaces_start_with_i(self):
"""Interface naming convention: I<Name>. Distinguishes from concrete classes."""
for module_dir in Path("app/modules").iterdir():
if not module_dir.is_dir():
continue
contracts_file = module_dir / "contracts.py"
if not contracts_file.exists():
continue
tree = ast.parse(contracts_file.read_text())
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
# Check if it's an ABC subclass
base_names = [
b.id if isinstance(b, ast.Name) else
b.attr if isinstance(b, ast.Attribute) else ""
for b in node.bases
]
if "ABC" in base_names:
assert node.name.startswith("I"), (
f"Interface class '{node.name}' in {contracts_file} "
f"does not follow the 'I<Name>' naming convention.\n"
f"Abstract contract classes must be named IServiceName."
)
def test_router_files_only_contain_route_handlers(self):
"""
Routers must not contain business logic.
Detects direct database calls or complex conditionals in router functions.
        Heuristic: router functions should be short — flag any over 25 lines.
"""
for module_dir in Path("app/modules").iterdir():
if not module_dir.is_dir():
continue
router_file = module_dir / "router.py"
if not router_file.exists():
continue
tree = ast.parse(router_file.read_text())
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
func_lines = node.end_lineno - node.lineno
assert func_lines <= 25, (
f"Route handler '{node.name}' in {router_file} is {func_lines} lines long.\n"
f"Route handlers should be ≤25 lines. Business logic belongs in services.\n"
f"Long route handlers typically contain misplaced business logic."
)
# ─── Outbox Compliance Tests ──────────────────────────────────────────────────
class TestOutboxCompliance:
"""
Every service method that writes to the database and produces
side effects must use the outbox, not direct calls.
Heuristic: check that service files import OutboxWriter,
not external HTTP clients directly in write paths.
"""
def test_services_do_not_import_external_http_clients_directly(self):
"""
Services should communicate via contracts and outbox events,
not by importing HTTP clients directly.
HTTP clients belong in adapters, not services.
"""
forbidden_imports = {
"httpx",
"aiohttp",
"requests",
}
for module_dir in Path("app/modules").iterdir():
if not module_dir.is_dir():
continue
service_file = module_dir / "service.py"
if not service_file.exists():
continue
tree = ast.parse(service_file.read_text())
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
assert alias.name not in forbidden_imports, (
f"Service '{service_file}' directly imports '{alias.name}'.\n"
f"Services must not import HTTP clients directly.\n"
f"Use a contract interface and inject an adapter."
)
elif isinstance(node, ast.ImportFrom):
if node.module and node.module.split(".")[0] in forbidden_imports:
pytest.fail(
f"Service '{service_file}' imports from '{node.module}'.\n"
f"Services must not import HTTP clients directly."
)
Architecture Test for Circular Dependency Detection
# tests/architecture/test_no_circular_imports.py
import importlib
import sys
from pathlib import Path
import pytest
def get_all_module_paths() -> list[str]:
"""Collect all Python module paths in the app package."""
modules = []
for path in Path("app").rglob("*.py"):
if "__pycache__" in str(path):
continue
module_path = str(path).replace("/", ".").replace(".py", "")
modules.append(module_path)
return modules
class TestNoCircularImports:
def test_all_modules_importable_without_circular_dependency(self):
"""
Attempt to import every module in the application.
Circular imports raise ImportError with a descriptive message.
Catches circular dependencies before they cause confusing runtime errors.
"""
# Reset sys.modules to ensure clean import state
original_modules = dict(sys.modules)
failed = []
for module_path in get_all_module_paths():
try:
importlib.import_module(module_path)
except ImportError as e:
if "circular" in str(e).lower() or "cannot import" in str(e).lower():
failed.append((module_path, str(e)))
except Exception:
pass # Other import errors are caught by other tests
finally:
# Restore sys.modules between attempts
sys.modules.clear()
sys.modules.update(original_modules)
assert not failed, (
f"Circular import detected in {len(failed)} module(s):\n" +
"\n".join(f" {path}: {err}" for path, err in failed)
)
12.3 Database RBAC — Infrastructure-Level Enforcement
Import linting and architecture tests enforce boundaries at the code level. They can be defeated by disabling the linting configuration or skipping the tests. Database RBAC enforces boundaries at the infrastructure level — the database engine itself rejects unauthorized access regardless of what the application code does.
This is the enforcement layer that survives deadline pressure, personnel changes, and CI pipeline failures. It is the layer you want to exist when someone says "I'll just query auth.users directly to fix this bug" — the database says no, not a reviewer.
Schema Access Matrix
Define the full access matrix before writing any SQL. Every module role, every schema, every permission level:
-- scripts/enforce_rbac.sql
-- Run this as the database superuser (postgres role)
-- Re-run whenever a new module or schema is added
-- ─── Revoke defaults ───────────────────────────────────────────────────────────
-- By default, PostgreSQL grants PUBLIC access to the public schema
-- and CREATE to all users in their own schema. Revoke these.
REVOKE ALL ON SCHEMA public FROM PUBLIC;
REVOKE CREATE ON SCHEMA public FROM PUBLIC;
-- ─── Schema ownership ──────────────────────────────────────────────────────────
-- Every schema is owned by the migration role, the only role with DDL privileges.
-- Application roles get data access through the grants below and cannot alter
-- schema structure. (The migrator role is created in the migration-role section
-- further down; create the roles before running this script on a fresh database.)
ALTER SCHEMA auth OWNER TO migrator;
ALTER SCHEMA orders OWNER TO migrator;
ALTER SCHEMA billing OWNER TO migrator;
ALTER SCHEMA inventory OWNER TO migrator;
ALTER SCHEMA core OWNER TO migrator;
-- ─── Module role access ─────────────────────────────────────────────────────────
-- Each module role: full access to own schema, no access to others
-- auth module
GRANT USAGE ON SCHEMA auth TO auth_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA auth TO auth_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA auth TO auth_app;
ALTER DEFAULT PRIVILEGES FOR ROLE auth_app IN SCHEMA auth
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO auth_app;
ALTER DEFAULT PRIVILEGES FOR ROLE auth_app IN SCHEMA auth
GRANT USAGE, SELECT ON SEQUENCES TO auth_app;
-- auth module can write to core.outbox (to enqueue events)
GRANT USAGE ON SCHEMA core TO auth_app;
GRANT INSERT ON core.outbox TO auth_app;
-- orders module
GRANT USAGE ON SCHEMA orders TO orders_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA orders TO orders_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA orders TO orders_app;
ALTER DEFAULT PRIVILEGES FOR ROLE orders_app IN SCHEMA orders
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO orders_app;
GRANT USAGE ON SCHEMA core TO orders_app;
GRANT INSERT ON core.outbox TO orders_app;
-- billing module
GRANT USAGE ON SCHEMA billing TO billing_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA billing TO billing_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA billing TO billing_app;
ALTER DEFAULT PRIVILEGES FOR ROLE billing_app IN SCHEMA billing
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO billing_app;
GRANT USAGE ON SCHEMA core TO billing_app;
GRANT INSERT ON core.outbox TO billing_app;
-- inventory module
GRANT USAGE ON SCHEMA inventory TO inventory_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA inventory TO inventory_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA inventory TO inventory_app;
ALTER DEFAULT PRIVILEGES FOR ROLE inventory_app IN SCHEMA inventory
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO inventory_app;
GRANT USAGE ON SCHEMA core TO inventory_app;
GRANT INSERT ON core.outbox TO inventory_app;
-- core module (outbox worker, scheduler)
GRANT USAGE ON SCHEMA core TO core_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA core TO core_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA core TO core_app;
-- Outbox worker needs to READ from all schemas for event hydration if needed
-- (only if the worker queries module data — typically it does not)
-- ─── Reporting role ─────────────────────────────────────────────────────────────
-- Cross-schema SELECT only — used by analytics, dashboards, data exports
-- Never given INSERT, UPDATE, or DELETE
-- (PostgreSQL has no CREATE ROLE IF NOT EXISTS; the DO block keeps the script re-runnable)
DO $$ BEGIN
    CREATE ROLE reporting_reader WITH LOGIN PASSWORD 'reporting_password';
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
GRANT USAGE ON SCHEMA auth, orders, billing, inventory, core TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA auth TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA orders TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA billing TO reporting_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA inventory TO reporting_reader;
ALTER DEFAULT PRIVILEGES IN SCHEMA auth GRANT SELECT ON TABLES TO reporting_reader;
ALTER DEFAULT PRIVILEGES IN SCHEMA orders GRANT SELECT ON TABLES TO reporting_reader;
ALTER DEFAULT PRIVILEGES IN SCHEMA billing GRANT SELECT ON TABLES TO reporting_reader;
ALTER DEFAULT PRIVILEGES IN SCHEMA inventory GRANT SELECT ON TABLES TO reporting_reader;
-- ─── Migration role ─────────────────────────────────────────────────────────────
-- Used exclusively by Alembic migration runner
-- Has DDL privileges (CREATE TABLE, ALTER TABLE, etc.) within its schema
-- Application roles must NOT have DDL privileges — separation of concerns
DO $$ BEGIN
    CREATE ROLE migrator WITH LOGIN PASSWORD 'migrator_password';
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
GRANT CREATE ON SCHEMA auth TO migrator;
GRANT CREATE ON SCHEMA orders TO migrator;
GRANT CREATE ON SCHEMA billing TO migrator;
GRANT CREATE ON SCHEMA inventory TO migrator;
GRANT CREATE ON SCHEMA core TO migrator;
-- Migrator can also write data (needed for data migrations)
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA auth TO migrator;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA orders TO migrator;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA billing TO migrator;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA inventory TO migrator;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA core TO migrator;
Verifying RBAC Is Correctly Configured
-- Verify what each role can access
-- Run as postgres superuser
-- Show all privileges for a specific role
SELECT
table_schema,
table_name,
privilege_type
FROM information_schema.role_table_grants
WHERE grantee = 'orders_app'
ORDER BY table_schema, table_name, privilege_type;
-- Verify a role CANNOT access a specific schema
-- Should return no rows if RBAC is correct
SELECT
table_schema,
table_name,
privilege_type
FROM information_schema.role_table_grants
WHERE grantee = 'orders_app'
AND table_schema = 'auth'; -- orders_app should have zero rows here
# tests/architecture/test_database_rbac.py
"""
Tests that verify PostgreSQL RBAC is correctly configured.
Run against the test database — same schema as production.
These tests verify that the database enforces module boundaries.
"""
import pytest
import asyncpg
class TestDatabaseRBAC:
@pytest.fixture
async def orders_conn(self, postgres_container):
"""Connection using the orders_app role."""
conn = await asyncpg.connect(
host=postgres_container.get_container_host_ip(),
port=postgres_container.get_exposed_port(5432),
database="test_db",
user="orders_app",
password="orders_dev_password",
)
yield conn
await conn.close()
async def test_orders_role_can_read_own_schema(self, orders_conn):
"""orders_app must be able to SELECT from orders schema."""
result = await orders_conn.fetchval(
"SELECT COUNT(*) FROM orders.orders"
)
assert result is not None # query succeeded
async def test_orders_role_cannot_read_auth_schema(self, orders_conn):
"""orders_app must be denied access to auth schema."""
with pytest.raises(asyncpg.InsufficientPrivilegeError) as exc_info:
await orders_conn.fetch("SELECT id FROM auth.users LIMIT 1")
assert "permission denied" in str(exc_info.value).lower()
async def test_orders_role_cannot_read_billing_schema(self, orders_conn):
with pytest.raises(asyncpg.InsufficientPrivilegeError):
await orders_conn.fetch(
"SELECT id FROM billing.billing_accounts LIMIT 1"
)
async def test_orders_role_can_write_to_outbox(self, orders_conn):
"""orders_app must be able to INSERT into core.outbox."""
import json
from uuid import uuid4
event_id = uuid4()
        status = await orders_conn.execute(
            """
            INSERT INTO core.outbox (id, event_type, payload, idempotency_key)
            VALUES ($1, 'TestEvent', $2, $3)
            """,
            event_id,
            json.dumps({"test": True}),
            str(event_id),
        )
        # orders_app has INSERT but not SELECT on core.outbox, so verify via the
        # command status rather than reading the row back
        assert status == "INSERT 0 1"
async def test_orders_role_cannot_delete_from_outbox(self, orders_conn):
"""orders_app should only INSERT to outbox, not DELETE."""
with pytest.raises(asyncpg.InsufficientPrivilegeError):
await orders_conn.execute(
"DELETE FROM core.outbox WHERE id = gen_random_uuid()"
)
async def test_orders_role_has_no_ddl_privileges(self, orders_conn):
"""Application roles must not be able to alter schema structure."""
with pytest.raises(asyncpg.InsufficientPrivilegeError):
await orders_conn.execute(
"CREATE TABLE orders.unauthorized_table (id UUID PRIMARY KEY)"
)
async def test_reporting_reader_can_read_all_schemas(self, postgres_container):
conn = await asyncpg.connect(
host=postgres_container.get_container_host_ip(),
port=postgres_container.get_exposed_port(5432),
database="test_db",
user="reporting_reader",
password="reporting_password",
)
try:
await conn.fetchval("SELECT COUNT(*) FROM auth.users")
await conn.fetchval("SELECT COUNT(*) FROM orders.orders")
await conn.fetchval("SELECT COUNT(*) FROM billing.billing_accounts")
finally:
await conn.close()
async def test_reporting_reader_cannot_write(self, postgres_container):
conn = await asyncpg.connect(
host=postgres_container.get_container_host_ip(),
port=postgres_container.get_exposed_port(5432),
database="test_db",
user="reporting_reader",
password="reporting_password",
)
try:
with pytest.raises(asyncpg.InsufficientPrivilegeError):
await conn.execute(
"DELETE FROM orders.orders WHERE 1=0" # even a no-op delete
)
finally:
await conn.close()
12.4 Why Discipline Alone Fails — The Systems Argument
The preceding enforcement mechanisms are sometimes dismissed as over-engineering — the argument being that a good team with clear guidelines does not need automated enforcement. This argument fails for reasons that are structural, not motivational.
The Deadline Forcing Function
Every software project experiences deadline pressure. The intensity varies, but the pattern is universal: at some point, a developer must choose between doing something correctly and doing it in time to meet a commitment. In that moment, with the correct path requiring 4 hours and the shortcut requiring 20 minutes, the shortcut is taken. This is not a failure of discipline — it is a rational response to incentives.
If the correct path is "define a new contract, add it to the interface, implement it, inject it through the dependency chain" and the shortcut is "import the concrete class directly", the shortcut is 20 minutes. If import linting makes the shortcut fail CI, the shortcut is not available. The developer implements it correctly because there is no shortcut to take.
The Context Problem
Architecture rules exist as mental models in the heads of the engineers who defined them. A developer joining the team 8 months into the project does not have those mental models. They have documentation — which may be outdated — and code reviews — which are performed by people who are also busy. They can violate a boundary in good faith because they genuinely did not know it existed.
Architecture tests and import linting provide the context inline with the code change. The CI failure message tells the developer exactly what rule they violated and why it exists. This is better than any documentation because it fires at the precise moment of violation, with the precise information needed to correct it.
The Accumulation Problem
Boundaries degrade through accumulation of exceptions, each of which seems reasonable in isolation. "This is just a one-time reporting query — I'll clean it up later." "This module is being merged with that one anyway — the boundary doesn't matter here." "The deadline is tomorrow — we'll refactor next sprint."
Each individual exception is defensible. The aggregate of 40 such exceptions over 18 months is an unmaintainable codebase that no individual engineer intended to create. Automated enforcement prevents accumulation because it treats every violation identically — the 40th exception fails CI the same way the 1st did. There is no gradation, no context that makes it acceptable, no reviewer who can be persuaded that this one is different.
The Three-Layer Enforcement Stack
Layer 1 — Import Linting (import-linter)
└── What it catches: direct cross-module imports of internal packages
└── When it fires: CI pipeline, pre-commit hook
└── Can be bypassed: by disabling the linter configuration (requires explicit decision)
└── Failure mode if absent: boundary violations accumulate silently in PRs
Layer 2 — Architecture Tests (pytest)
└── What it catches: structural rule violations (missing files, wrong naming,
incomplete interface implementations, misplaced logic)
└── When it fires: CI pipeline, same run as unit tests
└── Can be bypassed: by deleting or skipping the test (requires explicit decision)
└── Failure mode if absent: structural degradation not detected until it causes bugs
Layer 3 — Database RBAC (PostgreSQL)
└── What it catches: cross-schema SQL access at the database engine level
└── When it fires: at query execution time — runtime, not CI
└── Can be bypassed: by using the postgres superuser role (requires credential access)
└── Failure mode if absent: cross-schema queries possible whenever application code allows it
The three layers are complementary. Import linting catches the problem before the code is committed. Architecture tests catch structural degradation before deployment. Database RBAC catches access violations that were never expressed as code-level imports — direct SQL strings, ORM configurations, reporting scripts.
A violation that slips past all three layers requires: defeating the import linter, defeating the architecture tests, and having the correct database role with cross-schema privileges. This is not impossible, but it requires a level of deliberate effort that makes accidental violations essentially impossible.
Section 13: Real-World Pitfalls
The Anatomy of a Failed Modular Monolith
Most modular monolith failures are not dramatic. There is no single catastrophic decision. The system degrades through a sequence of individually justifiable choices that accumulate into structural failure. By the time the problem is visible — in the form of a deployment that takes three days to coordinate, or a production incident that requires five engineers to understand — the root cause is months old and deeply embedded.
This section catalogues the failure modes in the order they typically appear. The early pitfalls create the conditions for the later ones. A codebase that has fake modularity inevitably develops god services. A codebase with a shared database inevitably has connection exhaustion problems. Understanding the failure chain is as important as understanding each pitfall in isolation.
Pitfall 1 — Fake Modularity
Fake modularity is the most common failure mode and the hardest to detect early because the codebase looks modular. The directory structure exists. The module names are meaningful. The team believes they have a modular system. The boundaries are not enforced.
What It Looks Like
app/modules/
├── auth/
│ ├── service.py
│ └── models.py
├── orders/
│ ├── service.py ← imports from auth.models directly
│ └── models.py
└── billing/
├── service.py ← imports from auth.models and orders.models directly
└── models.py
# modules/orders/service.py — FAKE MODULARITY
# This looks like it's inside the orders module.
# The imports tell the real story.
from app.modules.auth.models import User # ← direct model import
from app.modules.auth.repository import UserRepository # ← internal implementation
from app.modules.inventory.models import Product # ← direct model import
from app.modules.inventory.service import InventoryService # ← concrete class
class OrderService:
def __init__(self, db):
self.user_repo = UserRepository(db) # ← owns auth's repository
self.inventory = InventoryService(db) # ← owns inventory's concrete service
async def create_order(self, user_id, items):
# Directly queries auth's internal model structure
user = await self.user_repo.find_by_id(user_id)
if not user.email_verified_at: # ← auth internal field
raise ValueError("Email not verified")
# Directly manipulates inventory's internal model
for item in items:
product = await self.inventory.get_product(item.product_id)
product.reserved_count += item.quantity # ← mutating another module's model
await self.inventory.save(product)
This code is in a directory called orders. It is not modular. It owns auth's repository, instantiates inventory's concrete service, reads auth's internal field email_verified_at, and mutates inventory's model state directly.
The directory structure is a label. The code is a traditional monolith with extra folders.
How It Develops
Fake modularity develops through incremental steps that each seem reasonable:
Week 1: Developer needs user data in the orders service. The correct path — define a contract, add IUserService, inject it — takes an hour. The shortcut — import UserRepository directly — takes 5 minutes. No import linter. No architecture tests. The shortcut is taken.
Week 4: A bug requires checking email_verified_at before allowing orders. The User model is already imported. Adding the field check takes 2 minutes.
Week 12: A new developer joins. They model their code after what exists. The pattern of direct imports is now "how we do things". Three more modules have been added with the same coupling pattern.
Month 8: The team attempts to extract inventory into a separate service. They discover that billing, orders, notifications, and reporting all import from inventory.models directly. The extraction would require simultaneously changing all five modules. The project is shelved.
Detection
# tests/architecture/test_no_fake_modularity.py
import ast
from pathlib import Path
import pytest
def get_imports_from_file(filepath: Path) -> list[str]:
tree = ast.parse(filepath.read_text())
imports = []
for node in ast.walk(tree):
if isinstance(node, ast.ImportFrom) and node.module:
imports.append(node.module)
elif isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name)
return imports
class TestNoFakeModularity:
def test_modules_do_not_import_sibling_module_internals(self):
"""
Detect fake modularity: a module importing another module's
internal packages (models, repository, service, dependencies)
rather than its public contracts.
"""
violations = []
modules_path = Path("app/modules")
for source_module_dir in modules_path.iterdir():
if not source_module_dir.is_dir():
continue
source_module = source_module_dir.name
for py_file in source_module_dir.rglob("*.py"):
imports = get_imports_from_file(py_file)
for imp in imports:
parts = imp.split(".")
# Check if it's a cross-module import into an internal package
if (len(parts) >= 4 and
parts[0] == "app" and
parts[1] == "modules" and
parts[2] != source_module and
parts[3] in {"models", "repository", "service",
"dependencies", "schemas"}):
violations.append(
f"{py_file.relative_to('.')} imports "
f"from internal package: {imp}\n"
f" → Use app.modules.{parts[2]}.contracts instead"
)
assert not violations, (
f"Fake modularity detected — {len(violations)} internal cross-module import(s):\n\n"
+ "\n\n".join(violations)
)
Pitfall 2 — Shared Database Abuse
Even with schema-per-module correctly configured, the shared database is routinely abused in ways that recreate the tight coupling it was designed to prevent.
Pattern A — The Convenience Join
The reporting endpoint needs to show orders with user details. The orders and auth schemas are in the same database. A developer writes:
-- modules/orders/repository.py — SHARED DATABASE ABUSE
SELECT
o.id,
o.total_amount,
o.status,
u.email, -- ← crossing schema boundary in SQL
u.full_name -- ← auth schema internal data
FROM orders.orders o
JOIN auth.users u ON u.id = o.user_id -- ← cross-schema JOIN
WHERE o.status = 'pending'
This query works perfectly when the orders_app role has access to auth.users. If RBAC is correctly configured, it fails at runtime. If RBAC is not configured, it silently creates a coupling that makes schema extraction impossible.
The correct approach:
# orders/service.py — correct cross-module data assembly
async def list_pending_orders_with_users(self) -> list[OrderWithUser]:
# Fetch from orders module — one query, in orders schema only
orders = await self._order_repo.list_pending()
# Batch fetch user identities via contract — one query via auth module
user_ids = list({o.user_id for o in orders})
user_map = await self._user_service.get_user_identities(user_ids)
# Assemble in application layer — no cross-schema SQL
return [
OrderWithUser(order=o, user=user_map.get(o.user_id))
for o in orders
]
Two queries instead of one join. The performance difference is negligible at any scale that matters for a modular monolith. The architectural difference is absolute.
Pattern B — Shared Tables as Module Communication
Two modules write to the same table as a form of communication. The notifications module reads from orders.orders to know when to send emails, instead of subscribing to OrderPlacedEvent:
# modules/notifications/service.py — WRONG
# Directly polling another module's table as a communication mechanism
async def send_pending_order_notifications(self):
# Reading orders.orders directly from the notifications module
pending_orders = await self._conn.fetch(
"""
SELECT id, user_id, created_at
FROM orders.orders -- ← wrong schema access
WHERE notification_sent = FALSE
AND created_at < NOW() - INTERVAL '5 minutes'
"""
)
for order in pending_orders:
await self._send_notification(order)
await self._conn.execute(
"UPDATE orders.orders SET notification_sent = TRUE WHERE id = $1",
order["id"],
)
This is wrong on multiple levels. The notifications module owns a column (notification_sent) in the orders schema. It writes to orders' tables directly. The orders module cannot change its schema without breaking notifications. The dependency is invisible until it breaks.
The correct approach is the outbox pattern. When an order is created, OrderPlacedEvent is enqueued. The notifications worker subscribes to it. The orders module never knows notifications exist.
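A sketch of what the subscription side can look like under that approach. The handler shape mirrors the version-aware handlers shown later in this section; notification_repo and send_order_confirmation are assumed helpers owned by the notifications module:
# modules/notifications/handlers.py — subscribe instead of polling (illustrative sketch)
from uuid import UUID

async def handle_order_placed(payload: dict) -> None:
    """Invoked by the outbox worker when an OrderPlacedEvent is delivered."""
    order_id = UUID(payload["order_id"])

    # Idempotency guard: outbox delivery is at-least-once, so replays are expected
    if await notification_repo.already_sent(order_id):
        return

    await send_order_confirmation(order_id=order_id, user_id=UUID(payload["user_id"]))
    await notification_repo.mark_sent(order_id)

# Registration: the notifications module declares interest in the event type.
# The orders module never learns that this handler exists.
EVENT_HANDLERS = {"OrderPlacedEvent": [handle_order_placed]}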
Pattern C — Sequence and Identity Dependency
A module uses another module's sequences or identity values in a way that creates an undeclared dependency:
-- WRONG: billing module references auth's sequence directly
INSERT INTO billing.billing_accounts (user_id)
SELECT nextval('auth.user_id_seq'); -- ← depending on auth's internal sequence
This creates a hidden dependency between billing's data generation and auth's internal sequence state. Any change to auth's identity strategy breaks billing.
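One way to remove the dependency, sketched with illustrative table and column types: billing generates its own identity and stores user_id only as an opaque value it received through a contract or event, never by touching auth's sequence (gen_random_uuid requires PostgreSQL 13+ or the pgcrypto extension):
-- billing owns its own identity; user_id is a soft reference, not auth's sequence
CREATE TABLE IF NOT EXISTS billing.billing_accounts (
    id         uuid        PRIMARY KEY DEFAULT gen_random_uuid(),  -- billing's own key
    user_id    uuid        NOT NULL,   -- value received via contract or UserRegisteredEvent; no FK into auth
    created_at timestamptz NOT NULL DEFAULT now()
);

-- The insert uses the user_id handed to billing by the event payload
INSERT INTO billing.billing_accounts (user_id)
VALUES ('5d1f7f2e-0000-4000-8000-000000000001');  -- illustrative UUID taken from the event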
Pitfall 3 — God Services
A god service is a service class that has accumulated responsibilities across multiple domains because it was convenient to add each new responsibility to the existing class rather than create a new one.
How a God Service Develops
# modules/orders/service.py — AFTER 18 MONTHS OF GROWTH
class OrderService:
"""
Originally: create and manage orders.
Month 3: added payment processing ("just for now")
Month 6: added notification sending ("it's order-related")
Month 9: added inventory management ("tightly coupled anyway")
Month 12: added user validation ("needed for orders")
Month 15: added fraud detection ("checks happen at order time")
Month 18: added reporting ("finance needed it fast")
"""
def __init__(self, db, mpesa_client, email_client, sms_client,
redis_cache, fraud_model, report_generator):
# 7 dependencies — first sign of a god service
...
# Order management (original purpose)
async def create_order(self, ...): ...
async def cancel_order(self, ...): ...
async def get_order(self, ...): ...
# Payment processing (should be PaymentService)
async def initiate_mpesa_payment(self, ...): ...
async def handle_mpesa_callback(self, ...): ...
async def process_refund(self, ...): ...
# Notifications (should be NotificationService)
async def send_order_confirmation_email(self, ...): ...
async def send_shipping_notification(self, ...): ...
async def send_cancellation_sms(self, ...): ...
# Inventory management (should be InventoryService)
async def check_and_reserve_stock(self, ...): ...
async def release_stock_reservation(self, ...): ...
# User validation (should use IUserService contract)
async def validate_user_can_order(self, ...): ...
async def get_user_order_history(self, ...): ...
# Fraud detection (should be FraudService)
async def assess_order_fraud_risk(self, ...): ...
async def flag_suspicious_order(self, ...): ...
# Reporting (should be ReportingService or query layer)
async def generate_daily_order_report(self, ...): ...
async def get_revenue_by_date_range(self, ...): ...
This class has 18 methods across 6 distinct responsibilities. It has 7 injected dependencies. A unit test that needs to test create_order must construct mocks for mpesa_client, email_client, sms_client, redis_cache, fraud_model, and report_generator — regardless of whether create_order uses any of them.
Detection
The metrics that identify a god service:
# tests/architecture/test_no_god_services.py
import ast
from pathlib import Path
import pytest
class TestNoGodServices:
MAX_METHODS_PER_SERVICE = 12
MAX_INIT_PARAMETERS = 5
@pytest.mark.parametrize(
"service_file",
list(Path("app/modules").rglob("service.py")),
ids=lambda p: str(p.relative_to("app/modules")),
)
def test_service_does_not_exceed_method_limit(self, service_file: Path):
tree = ast.parse(service_file.read_text())
for node in ast.walk(tree):
if not isinstance(node, ast.ClassDef):
continue
if not node.name.endswith("Service"):
continue
public_methods = [
n for n in ast.walk(node)
if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
and not n.name.startswith("_")
and n.name != "__init__"
]
assert len(public_methods) <= self.MAX_METHODS_PER_SERVICE, (
f"'{node.name}' in {service_file} has {len(public_methods)} public methods "
f"(maximum: {self.MAX_METHODS_PER_SERVICE}).\n"
f"A service with this many methods likely has multiple responsibilities.\n"
f"Split it into focused services, each responsible for one domain concept."
)
@pytest.mark.parametrize(
"service_file",
list(Path("app/modules").rglob("service.py")),
ids=lambda p: str(p.relative_to("app/modules")),
)
def test_service_init_does_not_have_too_many_dependencies(
self, service_file: Path
):
tree = ast.parse(service_file.read_text())
for node in ast.walk(tree):
if not isinstance(node, ast.ClassDef):
continue
for method in ast.walk(node):
if not isinstance(method, ast.FunctionDef):
continue
if method.name != "__init__":
continue
# Count parameters excluding 'self'
param_count = len(method.args.args) - 1
assert param_count <= self.MAX_INIT_PARAMETERS, (
f"'{node.name}.__init__' in {service_file} "
f"has {param_count} parameters (maximum: {self.MAX_INIT_PARAMETERS}).\n"
f"Too many dependencies is a sign the class has too many responsibilities.\n"
f"Consider splitting '{node.name}' into smaller, focused services."
)
Remediation
When you find a god service, the split is guided by the Single Responsibility Principle restated as a question: "What is the one business concept this service exists to serve?"
OrderService exists to manage the lifecycle of orders: create, confirm, cancel, fulfil. Payment processing is a payment concern. Notifications are a notification concern. Inventory management is an inventory concern.
Each responsibility becomes a service in its own module with its own contract. OrderService calls them via their interfaces. The test for create_order now only needs to mock IInventoryService and IPaymentService — not the email client, SMS client, fraud model, or report generator.
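A sketch of the shape after the split, using the contract names referenced above (IInventoryService, IPaymentService); the import paths, method signatures, and event fields are illustrative:
# modules/orders/service.py — after the split (illustrative sketch)
from app.modules.inventory.contracts import IInventoryService
from app.modules.payments.contracts import IPaymentService
from app.modules.orders.events import OrderPlacedEvent   # assumed location

class OrderService:
    """Manages the lifecycle of orders: create, confirm, cancel, fulfil."""

    def __init__(self, order_repo, inventory: IInventoryService,
                 payments: IPaymentService, outbox):
        # Four dependencies, each either owned by orders or expressed as a contract
        self._repo = order_repo
        self._inventory = inventory
        self._payments = payments
        self._outbox = outbox

    async def create_order(self, user_id, items):
        await self._inventory.reserve_stock(items)        # inventory's concern, via contract
        order = await self._repo.create(user_id=user_id, items=items)
        await self._outbox.enqueue(OrderPlacedEvent(      # side effects handled by subscribers
            order_id=order.id, user_id=user_id, total_amount=order.total_amount,
        ))
        return order

    async def cancel_order(self, order_id): ...
    async def get_order(self, order_id): ...

# Payment callbacks move to PaymentService, emails and SMS to notification handlers,
# fraud scoring to FraudService, reports to a reporting query layer; each lives in its own module.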
Pitfall 4 — Poor Boundary Definition
Poor boundary definition is distinct from fake modularity. The module boundaries exist and are enforced, but they are drawn incorrectly — either too large (modules that contain multiple distinct domains) or too small (modules that are so granular they cannot stand alone).
Too Large — Modules That Contain Multiple Domains
app/modules/
└── business_logic/ ← one module for everything
├── order_service.py
├── payment_service.py
├── inventory_service.py
└── notification_service.py
This is a naming problem masquerading as a modularity problem. "Business logic" is not a domain — it is a description of what all services do. The module contains every domain in the system. It is a traditional monolith with one directory wrapper.
Too Small — Anemic Modules That Cannot Stand Alone
app/modules/
├── order_creation/ ← only handles creating orders
├── order_cancellation/ ← only handles cancelling orders
├── order_status/ ← only handles status updates
├── order_items/ ← only handles line items
└── order_history/ ← only handles history queries
These are not bounded contexts — they are function groups. order_creation must know about order_items to create an order with items. order_cancellation must know about order_status. Every operation requires coordination across multiple "modules". The boundary lines follow implementation actions, not domain concepts.
The correct boundary is orders — a single module that owns the complete lifecycle of an order from creation to completion. The internal implementation details (repositories for each table, schemas for each operation type) live within the module, not as separate modules.
The Correct Boundary Test
A module is correctly bounded if you can answer these three questions affirmatively:
Can you describe what this module does in one sentence without using the word "and"? "The orders module manages the lifecycle of customer orders" — valid. "The orders module manages order creation and order status and order payments and order notifications" — invalid.
Does the module own its data completely? Every table the module reads and writes must be in its own schema with no other module owning any of those tables.
Could this module theoretically be extracted to a separate service without requiring simultaneous changes to other modules? If extraction requires changing three other modules, the boundary is wrong.
Pitfall 5 — Event Inconsistency
Event-driven communication via the outbox pattern is the correct approach for side effects. It is also a source of failure modes that, without careful discipline, accumulate inconsistencies silently.
Anti-Pattern A — Events That Contain Too Much Data
An event that contains the full state of an entity at the time of emission looks convenient — subscribers have everything they need. It creates a schema coupling problem:
# WRONG — fat event containing full entity state
@dataclass
class OrderPlacedEvent(DomainEvent):
order_id: UUID
user_id: UUID
user_email: str # ← auth module internal data embedded in orders event
user_phone: str # ← auth module internal data
user_subscription_tier: str # ← billing module internal data
product_names: list[str] # ← inventory module internal data
product_prices: list[Decimal]
total_amount: Decimal
discount_applied: Decimal
payment_method: str # ← billing module internal data
shipping_address: dict # ← logistics module internal data
# ... 20 more fields
This event has metastasized into a snapshot of the entire system state at order time. Every time any of these fields changes meaning in their owning module, the event schema must change, and every subscriber must be updated simultaneously.
# CORRECT — thin event with only what orders module owns
@dataclass
class OrderPlacedEvent(DomainEvent):
order_id: UUID # primary key — subscribers fetch what they need
user_id: UUID # soft reference — billing uses this to look up billing account
total_amount: Decimal # billing needs this directly — justified inclusion
occurred_at: datetime
Subscribers that need user data call the user service contract. Subscribers that need product data call the inventory service contract. The event contains only what the orders module owns.
Anti-Pattern B — Events That Are Not Versioned
An event schema that is not versioned cannot evolve without breaking subscribers. Suppose the orders module adds a discount_code field to OrderPlacedEvent. The billing handler, which uses json.loads(payload) and reads only payload["total_amount"], continues to work. The reporting handler, updated to read payload["discount_code"], starts failing the moment old events are replayed, because those events predate the field.
# core/events.py — versioned event base
@dataclass
class DomainEvent:
event_id: UUID = field(default_factory=uuid4)
occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC))
schema_version: int = 1 # increment when event structure changes
# Handler with version-aware processing
async def handle_order_placed_billing(payload: dict) -> None:
version = payload.get("schema_version", 1)
if version == 1:
# Original schema — total_amount at top level
amount = Decimal(str(payload["total_amount"]))
elif version == 2:
# V2 schema — total_amount nested under financials
amount = Decimal(str(payload["financials"]["total_amount"]))
else:
logger.error(f"Unknown event schema version: {version}")
raise UnknownEventVersionError(version)
await repo.create_billing_record_if_not_exists(
order_id=UUID(payload["order_id"]),
amount=amount,
)
Anti-Pattern C — Events Emitted Before Transaction Commits
The outbox pattern guarantees durability by writing the event in the same transaction as the domain data. Emitting an event before the transaction commits — or outside the transaction entirely — defeats this guarantee:
# WRONG — event emitted before transaction commits
async def create_order(self, user_id, items):
order_id = uuid4()
# Emit event BEFORE writing to database
await self._event_bus.publish(OrderPlacedEvent(order_id=order_id, ...))
# Transaction might fail after this point
async with self._uow:
await self._repo.create_order(order_id=order_id, ...)
# If the transaction fails, the event was already emitted.
# Subscribers have processed an order that does not exist in the database.
# ALSO WRONG — event emitted after transaction commits but not in the outbox
async def create_order(self, user_id, items):
async with self._uow:
order = await self._repo.create_order(...)
# Transaction commits here
# Process crashes here — event never emitted
# Order exists in DB, event never delivered
await self._event_bus.publish(OrderPlacedEvent(order_id=order.id, ...))
The only correct pattern is writing to the outbox inside the same transaction as the domain write. Any other sequence creates a window where the data and the event are inconsistent.
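A minimal sketch of the correct sequence, assuming the unit-of-work and outbox repository shapes used elsewhere in this document; enqueue is an illustrative method name:
# CORRECT — domain write and outbox write committed in the same transaction
async def create_order(self, user_id, items):
    async with self._uow:                        # one transaction, one connection
        order = await self._repo.create_order(user_id=user_id, items=items)
        await self._outbox.enqueue(              # outbox row written in the same transaction
            OrderPlacedEvent(order_id=order.id, user_id=user_id,
                             total_amount=order.total_amount)
        )
    # Commit makes the order and the pending event durable together.
    # If the transaction rolls back, neither exists; the outbox worker
    # publishes the event only after the commit has succeeded.
    return order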
Anti-Pattern D — Dead Events With No Alerting
The outbox worker moves events to status='dead' after max attempts. If no alerting exists on the dead letter queue, these failures are invisible. The system appears to be running normally. The billing records are not being created. Financial reconciliation discovers the discrepancy three weeks later.
-- Monitoring query — run by alerting system every 5 minutes
SELECT
event_type,
COUNT(*) AS dead_count,
MIN(created_at) AS oldest_dead_event,
MAX(created_at) AS newest_dead_event,
AVG(attempt_count) AS avg_attempts
FROM core.outbox
WHERE status = 'dead'
AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY event_type
HAVING COUNT(*) > 0;
-- Any rows returned → PagerDuty alert → immediate investigation
# workers/outbox_processor.py — alert on dead event creation
async def mark_dead(conn, event_id, event_type, error):
await conn.execute(
"UPDATE core.outbox SET status='dead', last_error=$2 WHERE id=$1",
event_id, error,
)
# Structured log that alerting system picks up
logger.critical(
"Event moved to dead letter queue — requires manual investigation",
extra={
"event_id": str(event_id),
"event_type": event_type,
"error": error,
"action_required": "investigate_dead_event",
}
)
Pitfall 6 — Connection Exhaustion
Connection exhaustion is the failure mode where the system runs out of database connections under load, causing all new requests to queue waiting for a connection and the system to degrade to zero throughput. It is entirely preventable and frequently discovered only in production under real load.
How Exhaustion Happens
The most common cause is transactions held open for too long, multiplied by concurrent requests:
# WRONG — transaction held open during external HTTP call
async def process_payment(self, order_id, amount):
async with self._uow: # connection acquired, transaction opens
order = await self._repo.find_by_id(order_id)
# External API call — 200ms to 3 seconds
# Connection held idle for this entire duration
mpesa_result = await self._mpesa_client.initiate_stk_push(
amount=amount,
phone=order.user_phone,
)
await self._repo.record_payment(order_id, mpesa_result.checkout_id)
# connection released only here
With 20 connection pool slots and 200ms average M-Pesa response time, the maximum sustained throughput before pool exhaustion is 100 payment requests per second. Under a load spike of 150 requests per second, the pool exhausts. Every subsequent request queues waiting for a connection. Queue depth grows. Response latency grows. The load balancer starts timing out requests. The system appears to be down.
The failure is not in the M-Pesa API response time. The failure is holding a database connection during a network wait.
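One way to restructure it, sketched under the assumption that the work can be split into two short transactions around the external call; mark_payment_initiating is an illustrative method added to record intent before calling the provider:
# CORRECT — no connection is held while waiting on M-Pesa
async def process_payment(self, order_id, amount):
    # Transaction 1 (short): read what the call needs and record the intent
    async with self._uow:
        order = await self._repo.find_by_id(order_id)
        await self._repo.mark_payment_initiating(order_id)

    # The external call runs with no database connection held
    mpesa_result = await self._mpesa_client.initiate_stk_push(
        amount=amount,
        phone=order.user_phone,
    )

    # Transaction 2 (short): persist the provider's response
    async with self._uow:
        await self._repo.record_payment(order_id, mpesa_result.checkout_id)
The trade-off is a possible crash between the two transactions, which leaves the payment in an initiating state with no recorded checkout id; recording the intent first makes that state visible and reconcilable instead of silent.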
The Second Cause — Leaked Connections
A connection that is acquired but never released is a connection leak. With async code, this typically happens when an exception is raised before the cleanup path runs:
# WRONG — connection leaked on exception
async def get_order(self, order_id):
    conn = await self._pool.acquire()        # acquired manually
    row = await conn.fetchrow(...)           # if this raises, the line below never runs
    await self._pool.release(conn)           # manual cleanup; skipped on any exception, so the connection leaks
    return row
# CORRECT — async context manager guarantees release
async def get_order(self, order_id):
async with self._pool.acquire() as conn: # released on any exit path
return await conn.fetchrow(...)
Every connection acquisition must be through a context manager. Never acquire and manually release — the manual release path is always a bug waiting for the right exception to trigger it.
The Third Cause — Worker Count × Pool Size Arithmetic
Production setup:
3 API containers
× 4 Gunicorn workers each
× 20 asyncpg pool connections each
= 240 potential PostgreSQL connections
+ 2 outbox worker containers
× 5 pool connections each
= 10 additional connections
+ scheduler container
= 5 additional connections
Total: 255 connections
PostgreSQL max_connections: 100 (default)
Result: 155 connections refused under peak load
The fix is PgBouncer (covered in Section 7). But the arithmetic must be done explicitly — not assumed to be fine. Every environment change that adds containers or increases worker counts must include a recalculation of total potential connections against the database's configured limit.
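The arithmetic is simple enough to automate. A sketch of a check that could run in CI or at deploy time; every number is an input you supply from your environment configuration, and the 70% threshold matches the alarm level suggested below:
# tools/check_connection_budget.py — illustrative deploy-time sanity check
def total_potential_connections(api_containers: int, workers_per_container: int,
                                pool_size: int, extra_connections: int) -> int:
    """Worst-case number of PostgreSQL connections this deployment can open."""
    return api_containers * workers_per_container * pool_size + extra_connections

def check_budget(total: int, max_connections: int, threshold: float = 0.70) -> None:
    limit = int(max_connections * threshold)
    if total > limit:
        raise SystemExit(
            f"Connection budget exceeded: {total} potential connections against "
            f"max_connections={max_connections} (alert threshold {limit}). "
            f"Add PgBouncer or reduce worker/pool counts before deploying."
        )

if __name__ == "__main__":
    # Numbers from the example above: 3 containers × 4 workers × 20 pool slots,
    # plus 10 connections for outbox workers and 5 for the scheduler
    total = total_potential_connections(3, 4, 20, extra_connections=15)
    check_budget(total, max_connections=100)   # fails loudly: 255 > 70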
Monitoring for Exhaustion Before It Becomes Incident
-- Current connection utilization
SELECT
COUNT(*) AS total_connections,
COUNT(*) FILTER (WHERE state = 'active') AS active,
COUNT(*) FILTER (WHERE state = 'idle') AS idle,
COUNT(*) FILTER (WHERE state = 'idle in transaction') AS idle_in_transaction,
current_setting('max_connections')::int AS max_connections,
ROUND(
COUNT(*) * 100.0 / current_setting('max_connections')::int,
1
) AS utilization_pct
FROM pg_stat_activity
WHERE datname = 'kukufiti_prod';
-- Long-running idle in transaction — these are connection leaks or slow transactions
SELECT
pid,
usename,
now() - xact_start AS transaction_age,
state,
LEFT(query, 80) AS last_query
FROM pg_stat_activity
WHERE state = 'idle in transaction'
AND xact_start < now() - interval '30 seconds'
ORDER BY transaction_age DESC;
Connections in the idle in transaction state are the most dangerous: they stay allocated and keep holding any locks acquired during the transaction. A connection that has been idle in transaction for 60 seconds is a sign of a transaction that was opened and never committed or rolled back, which is typically a bug in the connection management code.
Set an alarm at 70% connection utilization. Investigate at 80%. At 90%, you are minutes from exhaustion under any load spike.
The Pitfall Cascade
The pitfalls are not independent. They compound:
Fake modularity
│
▼
Cross-module model sharing
│
▼
Shared database queries (joins across schemas)
│
▼
God service emerges (all logic in one place is "convenient")
│
▼
Poor boundaries become entrenched (modules grow to encompass their god service's dependencies)
│
▼
Events contain full entity state (easier than defining proper contracts)
│
▼
Event schema changes break multiple consumers simultaneously
│
▼
Developers avoid changing event schemas — accrue technical debt instead
│
▼
God service accumulates more responsibilities to avoid event schema changes
│
▼
God service requires many connections simultaneously for complex operations
│
▼
Connection exhaustion under load
│
▼
"We need to rewrite in microservices"
▼
The microservices are designed with the same poorly defined boundaries
as the monolith — the failure cascade begins again
Every pitfall in this section is preventable by the enforcement strategies in Section 12 and the architectural principles in Sections 2–5. The cascade described above is the story of a team that understood the theory and lacked the enforcement. The enforcement is what breaks the cascade before it begins.
Diagnostic Checklist — Assess Your System's Health
Run this against any existing codebase to assess its architectural integrity:
Boundary integrity:
- [ ] lint-imports passes with zero violations
- [ ] Architecture tests pass with zero violations
- [ ] RBAC test suite passes — all unauthorized schema accesses are rejected
Data ownership:
- [ ] Every table has exactly one owning module
- [ ] No cross-schema foreign keys exist
- [ ] No module's SQL queries reference another module's schema
Event health:
- [ ] Dead letter queue has zero entries (or all entries are explained and being resolved)
- [ ] Every event handler has an idempotency mechanism
- [ ] Event schemas are versioned
- [ ] Outbox writes are always inside the same transaction as domain writes
Service complexity:
- [ ] No service class has more than 12 public methods
- [ ] No service __init__ has more than 5 parameters
- [ ] Every service can be described in one sentence without "and"
Connection health:
- [ ] Total potential connections (containers × workers × pool size) < 70% of PostgreSQL max_connections
- [ ] No idle in transaction connections older than 30 seconds under normal load
- [ ] PgBouncer is in place for any deployment with multiple application instances
- [ ] No external I/O occurs inside database transactions
Each "no" answer is a known risk with a specific remediation path described in this research.