Building distributed systems in Python? Here is how python-cqrs tackles consistency with orchestrated sagas, the mediator pattern, and a transactional outbox—without preaching theory for ten pages first.
TL;DR
- One thread through the code: place an order (command), read it back (query), then run a payment saga: close the invoice, credit the balance—without leaving half a payment applied.
- After money is in, a transactional outbox carries a notification to other services (for example, “provision this VPS”) with at-least-once delivery to the bus; in-process domain events stay a different, in-memory contract (see the small table in part 3).
- Transports and frameworks stay at the edge: nothing in a handler hard-codes HTTP, Kafka, or a CLI. More patterns (streaming, chain of responsibility, and the rest) live in the docs.
Links: GitHub · Docs · PyPI python-cqrs
A story in three crashes
- A user places an order. Payment succeeds.
- A downstream service is supposed to do the next step—but the process dies (OOM, redeploy, node loss).
- The other side never heard about the state change, yet money or inventory already moved.
That is the boring side of distributed systems: not “microservices are trendy,” but consistency when a commit in one place does not magically align the rest of the world.
Hi, I am Vadim, ex-tech lead of the financial infrastructure team at Timeweb Cloud. This post is a short intro to the library we use under our Python microservices—no epic “how we got here.” If you are tired of reinventing the transactional outbox, sagas, and mediator wiring, this may save you time.
python-cqrs is a framework for CQRS and event-driven architecture. It grew out of a fork of diator and has since become its own stack for reliable service design.
The same bootstrap and mediator ideas apply to other flows; below is one concrete path only.
The system we are pretending to build
We model a narrow slice of a cloud-style control plane: a customer orders a product by SKU (think VPS or add-on); the order sits in a pending state until the gateway says money arrived; we close the invoice and move the customer balance in one coordinated saga; then we notify another team’s service to actually create the instance. The code is toy-sized; BillingPort, LedgerPort, and the outbox are placeholders for your real services and database.
Part 1 — Command and query: create an order, read it back
We start with a command to open the order and a query to read the dashboard row. Same RequestMediator and mediator.send for both; only the request/response types and the handler differ.
Idea: one bootstrap call maps all Request types to handlers. Callers (FastAPI, worker, or tests) only call mediator.send(...).
```python
import di
import cqrs
from cqrs.requests import bootstrap


class PlaceOrderCommand(cqrs.Request):
    customer_id: str
    product_sku: str
    amount: float


class PlaceOrderResult(cqrs.Response):
    order_id: str
    status: str


class PlaceOrderHandler(cqrs.RequestHandler[PlaceOrderCommand, PlaceOrderResult]):
    async def handle(self, request: PlaceOrderCommand) -> PlaceOrderResult:
        return PlaceOrderResult(order_id="ord-42", status="PENDING_PAYMENT")


class GetOrderById(cqrs.Request):
    order_id: str


class OrderView(cqrs.Response):
    order_id: str
    status: str
    amount: float
    product_sku: str


class GetOrderHandler(cqrs.RequestHandler[GetOrderById, OrderView]):
    async def handle(self, request: GetOrderById) -> OrderView:
        return OrderView(
            order_id=request.order_id,
            status="PENDING_PAYMENT",
            amount=99.0,
            product_sku="vps.s",
        )


def commands_mapper(m: cqrs.RequestMap) -> None:
    m.bind(PlaceOrderCommand, PlaceOrderHandler)
    m.bind(GetOrderById, GetOrderHandler)


mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
)
```
More wiring: Bootstrap.
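If the mediator pattern itself is new to you, the core idea fits in a few lines of plain Python: a map from request type to handler, and one `send()` entry point. This is a toy sketch with hypothetical names, not the library's implementation:

```python
import asyncio


# Hypothetical stand-ins for the real Request/RequestHandler types
class Request: ...


class Handler:
    async def handle(self, request: Request): ...


class MiniMediator:
    """Toy mediator: dispatch on the concrete request type."""

    def __init__(self) -> None:
        self._handlers: dict[type, Handler] = {}

    def bind(self, request_type: type, handler: Handler) -> None:
        self._handlers[request_type] = handler

    async def send(self, request: Request):
        return await self._handlers[type(request)].handle(request)


class Ping(Request): ...


class PingHandler(Handler):
    async def handle(self, request: Request) -> str:
        return "pong"


mediator = MiniMediator()
mediator.bind(Ping, PingHandler())
result = asyncio.run(mediator.send(Ping()))  # "pong"
```

The payoff is the same as in the real library: callers depend only on the mediator, never on a concrete handler.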
Part 2 — Saga: payment in → close invoice → update balance
When payment clears, you may need two effects in order: mark the invoice paid, then credit the balance (or the ledger). If the second step fails, the first has to be undone in business terms. That is an orchestrated saga: act in order, compensate in reverse. Details and storage: Saga.
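Before the library version, here is the bare orchestration idea in plain Python (hypothetical names, sync for brevity): run steps in order, and on failure compensate the completed ones in reverse.

```python
class LedgerDown(Exception): ...


class Step:
    def act(self, ctx: dict) -> None: ...
    def compensate(self, ctx: dict) -> None: ...


def run_saga(steps: list[Step], ctx: dict) -> None:
    done: list[Step] = []
    try:
        for step in steps:
            step.act(ctx)
            done.append(step)
    except Exception:
        # Undo in reverse order, only the steps that actually ran
        for step in reversed(done):
            step.compensate(ctx)
        raise


class CloseInvoice(Step):
    def act(self, ctx): ctx["invoice"] = "closed"
    def compensate(self, ctx): ctx["invoice"] = "reopened"


class CreditBalance(Step):
    def act(self, ctx): raise LedgerDown("ledger unavailable")
    def compensate(self, ctx): pass


ctx: dict = {}
try:
    run_saga([CloseInvoice(), CreditBalance()], ctx)
except LedgerDown:
    pass
# ctx["invoice"] is now "reopened": no half-applied payment
```

The library adds what this sketch lacks: async steps, typed contexts, and persisted state so compensation survives a crash.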
```python
import dataclasses
import uuid

import di
import cqrs
from cqrs.saga import bootstrap
from cqrs.saga.saga import Saga
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.memory import MemorySagaStorage
from cqrs.saga.models import SagaContext
from cqrs.response import Response


@dataclasses.dataclass
class AfterPaymentContext(SagaContext):
    order_id: str
    customer_id: str
    invoice_id: str
    amount: float
    payment_id: str
    capture_ref: str | None = None
    balance_tx_id: str | None = None


class BillingPort:
    async def close_invoice(self, invoice_id: str, payment_id: str) -> str:
        return "inv-cap-1"

    async def reopen_invoice(self, invoice_id: str) -> None:
        pass


class LedgerPort:
    async def credit_balance(self, customer_id: str, amount: float) -> str:
        return "led-1"

    async def reverse_credit(self, tx_id: str) -> None:
        pass


# Step 1 — invoice paid in billing; compensate reopens if a later step fails
class CloseInvoiceStep(SagaStepHandler[AfterPaymentContext, Response]):
    def __init__(self, billing: BillingPort) -> None:
        self._billing = billing

    async def act(self, context: AfterPaymentContext) -> SagaStepResult:
        context.capture_ref = await self._billing.close_invoice(
            context.invoice_id, context.payment_id
        )
        return self._generate_step_result(Response())

    async def compensate(self, context: AfterPaymentContext) -> None:
        if context.capture_ref:
            await self._billing.reopen_invoice(context.invoice_id)


# Step 2 — credit customer balance; compensate reverses the ledger line
class CreditBalanceStep(SagaStepHandler[AfterPaymentContext, Response]):
    def __init__(self, ledger: LedgerPort) -> None:
        self._ledger = ledger

    async def act(self, context: AfterPaymentContext) -> SagaStepResult:
        context.balance_tx_id = await self._ledger.credit_balance(
            context.customer_id, context.amount
        )
        return self._generate_step_result(Response())

    async def compensate(self, context: AfterPaymentContext) -> None:
        if context.balance_tx_id:
            await self._ledger.reverse_credit(context.balance_tx_id)


class PaymentAfterSaga(Saga[AfterPaymentContext]):
    steps = [CloseInvoiceStep, CreditBalanceStep]


def saga_mapper(m: cqrs.SagaMap) -> None:
    m.bind(AfterPaymentContext, PaymentAfterSaga)


saga_mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    sagas_mapper=saga_mapper,
    saga_storage=MemorySagaStorage(),
)


async def run_payment_flow() -> None:
    ctx = AfterPaymentContext(
        order_id="ord-42",
        customer_id="c1",
        invoice_id="inv-7",
        amount=99.0,
        payment_id="pgw-100",
    )
    async for _ in saga_mediator.stream(context=ctx, saga_id=uuid.uuid4()):
        pass
```
In the same module, you can wrap the same sagas_mapper and storage in a functools.lru_cache-decorated saga_mediator_factory() and inject it with fastapi.Depends in part 4; the HTTP layer then only calls saga_mediator_factory() and never duplicates the bootstrap. For a JSON body, add a small Pydantic model with the same fields as AfterPaymentContext and map it to the dataclass in the route (or map fields manually).
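The memoized-factory part of that is plain functools; here is a stdlib-only sketch where the `Mediator` class is just a stand-in for whatever the real bootstrap returns:

```python
import functools


class Mediator:
    """Stand-in for the object the real saga bootstrap would return."""


@functools.lru_cache(maxsize=1)
def saga_mediator_factory() -> Mediator:
    # In the real app this body calls the library bootstrap exactly once,
    # e.g. bootstrap.bootstrap(di_container=..., sagas_mapper=..., saga_storage=...)
    return Mediator()


# Every caller (every request, via fastapi.Depends) gets the same instance
assert saga_mediator_factory() is saga_mediator_factory()
```

Because lru_cache memoizes the zero-argument call, the mediator and its DI container are built once per process, not once per request.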
If the process dies between steps, persisted saga state and recovery (see the docs) let you finish the flow or the compensation, instead of a silent half-state.
Part 3 — Outbox: after the invoice is paid, connect the product
A separate provisioning stack should not rely on a “publish to Kafka, then hope” after the database already says “paid.” The transactional outbox stores the integration event in the same database transaction as your business update; a publisher process reads the outbox and sends to the bus (at-least-once; consumers should be idempotent). Overview: Transactional Outbox.
Register the wire name and payload type once, then in the same UoW as the payment commit, insert a row for NotificationEvent[ServiceProvisioningPayload].
```python
import cqrs
from pydantic import BaseModel


class ServiceProvisioningPayload(BaseModel, frozen=True):
    order_id: str
    customer_id: str
    product_sku: str
    invoice_id: str


cqrs.OutboxedEventMap.register(
    "invoice_paid.provision_service",
    cqrs.NotificationEvent[ServiceProvisioningPayload],
)

# Persisted in the same DB transaction as “invoice paid” + outbox row in the command path.
```
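The property that makes this safe is the single transaction. A stdlib sqlite3 sketch of the write side (table and column names are illustrative, not the library’s schema):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY,
        topic TEXT,
        payload TEXT,
        sent INTEGER DEFAULT 0
    );
""")

payload = {
    "order_id": "ord-42",
    "customer_id": "c1",
    "product_sku": "vps.s",
    "invoice_id": "inv-7",
}

with conn:  # one transaction: both rows commit, or neither does
    conn.execute("INSERT INTO orders VALUES (?, ?)", ("ord-42", "PAID"))
    conn.execute(
        "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
        ("invoice_paid.provision_service", json.dumps(payload)),
    )

pending = conn.execute("SELECT COUNT(*) FROM outbox WHERE sent = 0").fetchone()[0]
# pending == 1: the event exists iff the status change does
```

A crash before the commit leaves neither the “paid” status nor the event; a crash after it leaves both, and the publisher below picks the row up later.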
Publisher loop (separate task or process):
```python
import asyncio

import cqrs
from cqrs.message_brokers import kafka
from cqrs.adapters import kafka as kafka_adapters

broker = kafka.KafkaMessageBroker(
    producer=kafka_adapters.kafka_producer_factory(dsn="localhost:9092"),
)
producer = cqrs.EventProducer(
    message_broker=broker,
    repository=outbox_repository,  # your outbox repository bound to the app DB
)


# Separate loop: batch read → send → commit outbox state
async def publish_loop() -> None:
    async for events in producer.event_batch_generator():
        for event in events:
            await producer.send_message(event)
        await producer.repository.commit()
        await asyncio.sleep(10)
```
Consumer with FastStream: Kafka delivers a NotificationEvent; you hand it to EventMediator.send and reuse the same handler style as inside the app.
```python
import functools

import di
import cqrs
import faststream
import pydantic
from cqrs.events import bootstrap
from faststream import kafka


class ServiceProvisioningPayload(pydantic.BaseModel):
    order_id: str
    customer_id: str
    product_sku: str
    invoice_id: str


class OnInvoicePaidStartProvisioning(
    cqrs.EventHandler[cqrs.NotificationEvent[ServiceProvisioningPayload]]
):
    async def handle(
        self,
        event: cqrs.NotificationEvent[ServiceProvisioningPayload],
    ) -> None:
        return


@functools.lru_cache(maxsize=1)
def integration_mediator() -> cqrs.EventMediator:
    def em(m: cqrs.EventMap) -> None:
        m.bind(
            cqrs.NotificationEvent[ServiceProvisioningPayload],
            OnInvoicePaidStartProvisioning,
        )

    return bootstrap.bootstrap(di_container=di.Container(), events_mapper=em)


br = kafka.KafkaBroker(bootstrap_servers=["localhost:9092"])
app = faststream.FastStream(br)


@br.subscriber(
    "invoice_paid.provision_service", group_id="provisioning", auto_commit=False
)
async def on_invoice_paid(
    body: cqrs.NotificationEvent[ServiceProvisioningPayload],
    msg: kafka.KafkaMessage,
    mediator: cqrs.EventMediator = faststream.Depends(integration_mediator),
):
    await mediator.send(body)
    await msg.ack()
```
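At-least-once delivery means this consumer can see the same event twice (say, the process dies after mediator.send but before msg.ack). The usual fix is to deduplicate by event id; a stdlib sketch with hypothetical names:

```python
provisioned: set[str] = set()  # in production: a unique key in the provisioning DB


def handle_invoice_paid(event_id: str, order_id: str) -> bool:
    """Return True if work was done, False if this was a duplicate delivery."""
    if event_id in provisioned:
        return False  # already handled: just ack and move on
    provisioned.add(event_id)
    # ... actually create the instance for order_id ...
    return True


first = handle_invoice_paid("evt-1", "ord-42")
again = handle_invoice_paid("evt-1", "ord-42")  # redelivery after a crash
# first is True, again is False: the side effect runs once
```

In a real service the dedup record and the side effect should land in the same transaction, for the same reason the outbox row does on the producer side.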
If the HTTP process uses bootstrap with a message_broker, see Event producing; the outbox remains the safe place for “this must be visible outside the monolith’s DB.”
In-process domain events vs outbox (same product, two channels)
| | Domain event (in-process) | NotificationEvent + outbox |
|---|---|---|
| Use | Side effects in the same service in the same request (projection, email) | Other services must see the fact reliably |
| Delivery | In-process, at-most-once if the process dies | At-least-once to the bus after a successful outbox publish |
Part 4 — FastAPI: presentation layer stays thin
The FastAPI integration guide recommends the same pattern: build mediators with bootstrap, inject them with fastapi.Depends, and keep route functions to HTTP only—no business rules in the module. Types, mediator_factory, saga_mediator_factory, and uuid live in your app module together with parts 1–3.
SagaMediator is the return type of cqrs.saga.bootstrap.bootstrap (see the Saga Mediator docs). AfterPaymentSagaRequest is the JSON body shape for the saga route; execution still uses AfterPaymentContext from part 2.
```python
app = fastapi.FastAPI()


@app.post("/orders", status_code=201)
async def place_order(
    command: PlaceOrderCommand,
    mediator: cqrs.RequestMediator = fastapi.Depends(mediator_factory),
) -> PlaceOrderResult:
    return await mediator.send(command)


@app.get("/orders/{order_id}")
async def get_order(
    order_id: str,
    mediator: cqrs.RequestMediator = fastapi.Depends(mediator_factory),
) -> OrderView:
    return await mediator.send(GetOrderById(order_id=order_id))


@app.post("/invoices/close-after-payment", status_code=200)
async def close_invoice_after_payment(
    body: AfterPaymentSagaRequest,
    saga: cqrs.SagaMediator = fastapi.Depends(saga_mediator_factory),
):
    ctx = AfterPaymentContext(**body.model_dump())
    async for _ in saga.stream(context=ctx, saga_id=uuid.uuid4()):
        pass
    return {"status": "ok"}
```
POST/GET for orders follow the FastAPI docs (command + query + mediator.send). The close-invoice route only builds an AfterPaymentContext and drives the same async stream as in part 2: no steps or compensation logic in the handler. The outbox publisher and consumer from part 3 still run in separate processes; they are not called from these routes.
v5 heads-up (optional)
The docs describe upcoming breaking changes: Pydantic optional, default models moving toward dataclasses. Discussion #57 has migration notes—confirm on the repo you track.
Links and calls to action
- Star the repo, open issues, or comment if this matches what you are building: python-cqrs on GitHub
- Migrations and v5: Discussion #57
- Deep dives: python-cqrs documentation
If you run Python microservices and live in the outbox / saga / CQRS space, what still hurts? I am curious—we might chip away at it together.
