DEV Community

Cover image for Python CQRS: Building distributed systems without the pain (Sagas, Outbox, Event-Driven)
Vadim Kozyrevskii
Vadim Kozyrevskii

Posted on

Python CQRS: Building distributed systems without the pain (Sagas, Outbox, Event-Driven)

Building distributed systems in Python? Here is how python-cqrs tackles consistency with orchestrated sagas, the mediator pattern, and a transactional outbox—without preaching theory for ten pages first.

TL;DR

  • One thread through the code: place an order (command), read it back (query), then run a payment saga: close the invoice, credit the balance—without leaving half a payment applied.
  • After money is in, a transactional outbox carries a notification to other services (for example, “provision this VPS”) with at-least-once delivery to the bus; in-process domain events stay a different, in-memory contract (see the small table in part 3).
  • Transports and frameworks stay at the edge: nothing in a handler hard-codes HTTP, Kafka, or a CLI. More patterns (streaming, chain of responsibility, and the rest) live in the docs.

Links: GitHub · Docs · PyPI python-cqrs

Cover


A story in three crashes

  1. A user places an order. Payment succeeds.
  2. A downstream service is supposed to do the next step—but the process dies (OOM, redeploy, node loss).
  3. The other side never heard about the state change, yet money or inventory already moved.

That is the boring side of distributed systems: not “microservices are trendy,” but consistency when a commit in one place does not magically align the rest of the world.

Hi, I am Vadim, tech ex-lead of the financial infrastructure team at Timeweb Cloud. This post is a short intro to the library we use under our Python microservices—no epic “how we got here.” If you are tired of reinventing transactional outbox, sagas, and mediator wiring, this may save you time.

python-cqrs is a framework for CQRS and event-driven architecture. It grew out of a fork of diator and has since become its own stack for reliable service design.

The same bootstrap and mediator ideas apply to other flows; below is one concrete path only.

The system we are pretending to build

We model a narrow slice of a cloud-style control plane: a customer orders a product by SKU (think VPS or add-on), the order sits in a pending state until the gateway says money arrived, we close the invoice and move the customer balance in one coordinated saga, then we must notify another team’s service to actually create the instance. The code is toy-sized; BillingPort, LedgerPort, and the outbox are placeholders for your real services and database.


Part 1 — Command and query: create an order, read it back

We start with a command to open the order and a query to read the dashboard row. Same RequestMediator and mediator.send for both; only the request/response types and the handler differ.

Idea: one bootstrap call maps all Request types to handlers. Callers (FastAPI, worker, or tests) only call mediator.send(...).

import di
import cqrs
from cqrs.requests import bootstrap

class PlaceOrderCommand(cqrs.Request):
    customer_id: str
    product_sku: str
    amount: float

class PlaceOrderResult(cqrs.Response):
    order_id: str
    status: str

class PlaceOrderHandler(cqrs.RequestHandler[PlaceOrderCommand, PlaceOrderResult]):
    async def handle(self, request: PlaceOrderCommand) -> PlaceOrderResult:
        return PlaceOrderResult(order_id="ord-42", status="PENDING_PAYMENT")

class GetOrderById(cqrs.Request):
    order_id: str

class OrderView(cqrs.Response):
    order_id: str
    status: str
    amount: float
    product_sku: str

class GetOrderHandler(cqrs.RequestHandler[GetOrderById, OrderView]):
    async def handle(self, request: GetOrderById) -> OrderView:
        return OrderView(
            order_id=request.order_id,
            status="PENDING_PAYMENT",
            amount=99.0,
            product_sku="vps.s",
        )

def commands_mapper(m: cqrs.RequestMap) -> None:
    m.bind(PlaceOrderCommand, PlaceOrderHandler)
    m.bind(GetOrderById, GetOrderHandler)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
)
Enter fullscreen mode Exit fullscreen mode

More wiring: Bootstrap.


Part 2 — Saga: payment in → close invoice → update balance

When payment clears, you may need two effects in order: mark the invoice paid, then credit the balance (or the ledger). If the second step fails, the first has to be undone in business terms. That is an orchestrated saga: act in order, compensate in reverse. Details and storage: Saga.

import dataclasses
import uuid
import di
import cqrs
from cqrs.saga import bootstrap
from cqrs.saga.saga import Saga
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.memory import MemorySagaStorage
from cqrs.saga.models import SagaContext
from cqrs.response import Response

@dataclasses.dataclass
class AfterPaymentContext(SagaContext):
    order_id: str
    customer_id: str
    invoice_id: str
    amount: float
    payment_id: str
    capture_ref: str | None = None
    balance_tx_id: str | None = None

class BillingPort:
    async def close_invoice(self, invoice_id: str, payment_id: str) -> str:
        return "inv-cap-1"

    async def reopen_invoice(self, invoice_id: str) -> None:
        pass

class LedgerPort:
    async def credit_balance(self, customer_id: str, amount: float) -> str:
        return "led-1"

    async def reverse_credit(self, tx_id: str) -> None:
        pass

# Step 1 — invoice paid in billing; compensate reopens if a later step fails

class CloseInvoiceStep(SagaStepHandler[AfterPaymentContext, Response]):
    def __init__(self, billing: BillingPort) -> None:
        self._billing = billing

    async def act(self, context: AfterPaymentContext) -> SagaStepResult:
        context.capture_ref = await self._billing.close_invoice(
            context.invoice_id, context.payment_id
        )
        return self._generate_step_result(Response())

    async def compensate(self, context: AfterPaymentContext) -> None:
        if context.capture_ref:
            await self._billing.reopen_invoice(context.invoice_id)

# Step 2 — credit customer balance; compensate reverses the ledger line

class CreditBalanceStep(SagaStepHandler[AfterPaymentContext, Response]):
    def __init__(self, ledger: LedgerPort) -> None:
        self._ledger = ledger

    async def act(self, context: AfterPaymentContext) -> SagaStepResult:
        context.balance_tx_id = await self._ledger.credit_balance(
            context.customer_id, context.amount
        )
        return self._generate_step_result(Response())

    async def compensate(self, context: AfterPaymentContext) -> None:
        if context.balance_tx_id:
            await self._ledger.reverse_credit(context.balance_tx_id)

class PaymentAfterSaga(Saga[AfterPaymentContext]):
    steps = [CloseInvoiceStep, CreditBalanceStep]

def saga_mapper(m: cqrs.SagaMap) -> None:
    m.bind(AfterPaymentContext, PaymentAfterSaga)

saga_mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    sagas_mapper=saga_mapper,
    saga_storage=MemorySagaStorage(),
)

async def run_payment_flow() -> None:
    ctx = AfterPaymentContext(
        order_id="ord-42",
        customer_id="c1",
        invoice_id="inv-7",
        amount=99.0,
        payment_id="pgw-100",
    )
    async for _ in saga_mediator.stream(context=ctx, saga_id=uuid.uuid4()):
        pass
Enter fullscreen mode Exit fullscreen mode

In the same module you can wrap the same sagas_mapper and storage in functools.lru_cache and return it from a saga_mediator_factory() used with fastapi.Depends in part 4, so the HTTP layer only calls saga_mediator_factory() and does not duplicate bootstrap. For a JSON body, add a small Pydantic model with the same fields as AfterPaymentContext and map it to the dataclass in the route (or map fields manually).

If the process dies between steps, persisted saga state and recovery (see the docs) let you finish the flow or the compensation, instead of a silent half-state.


Part 3 — Outbox: after the invoice is paid, connect the product

A separate provisioning stack should not rely on a “publish to Kafka, then hope” after the database already says “paid.” The transactional outbox stores the integration event in the same database transaction as your business update; a publisher process reads the outbox and sends to the bus (at-least-once; consumers should be idempotent). Overview: Transactional Outbox.

Register the wire name and payload type once, then in the same UoW as the payment commit, insert a row for NotificationEvent[ServiceProvisioningPayload].

import cqrs
from pydantic import BaseModel

class ServiceProvisioningPayload(BaseModel, frozen=True):
    order_id: str
    customer_id: str
    product_sku: str
    invoice_id: str

cqrs.OutboxedEventMap.register(
    "invoice_paid.provision_service",
    cqrs.NotificationEvent[ServiceProvisioningPayload],
)
# Persisted in the same DB transaction as “invoice paid” + outbox row in the command path.
Enter fullscreen mode Exit fullscreen mode

Publisher loop (separate task or process):

import asyncio
import cqrs
from cqrs.message_brokers import kafka
from cqrs.adapters import kafka as kafka_adapters

broker = kafka.KafkaMessageBroker(
    producer=kafka_adapters.kafka_producer_factory(dsn="localhost:9092"),
)
producer = cqrs.EventProducer(
    message_broker=broker,
    repository=outbox_repository,
)

# Separate loop: batch read → send → commit outbox state

async def publish_loop() -> None:
    async for events in producer.event_batch_generator():
        for event in events:
            await producer.send_message(event)
        await producer.repository.commit()
        await asyncio.sleep(10)
Enter fullscreen mode Exit fullscreen mode

Consumer with FastStream: Kafka delivers a NotificationEvent, you hand it to EventMediator.send and reuse the same handler style as inside the app.

import functools
import di
import cqrs
import faststream
import pydantic
from cqrs.events import bootstrap
from faststream import kafka

class ServiceProvisioningPayload(pydantic.BaseModel):
    order_id: str
    customer_id: str
    product_sku: str
    invoice_id: str

class OnInvoicePaidStartProvisioning(
    cqrs.EventHandler[cqrs.NotificationEvent[ServiceProvisioningPayload]]
):
    async def handle(
        self, event: cqrs.NotificationEvent[ServiceProvisioningPayload],
    ) -> None:
        return

@functools.lru_cache(maxsize=1)
def integration_mediator() -> cqrs.EventMediator:
    def em(m: cqrs.EventMap) -> None:
        m.bind(
            cqrs.NotificationEvent[ServiceProvisioningPayload],
            OnInvoicePaidStartProvisioning,
        )

    return bootstrap.bootstrap(di_container=di.Container(), events_mapper=em)

br = kafka.KafkaBroker(bootstrap_servers=["localhost:9092"])
app = faststream.FastStream(br)

@br.subscriber("invoice_paid.provision_service", group_id="provisioning", auto_commit=False)
async def on_invoice_paid(
    body: cqrs.NotificationEvent[ServiceProvisioningPayload],
    msg: kafka.KafkaMessage,
    mediator: cqrs.EventMediator = faststream.Depends(integration_mediator),
):
    await mediator.send(body)
    await msg.ack()
Enter fullscreen mode Exit fullscreen mode

If the HTTP process uses bootstrap with a message_broker, see Event producing; the outbox remains the safe place for “this must be visible outside the monolith’s DB.”

In-process domain events vs outbox (same product, two channels)

Domain event (in-process) NotificationEvent + outbox
Use Side effects in the same service in the same request (projection, email) Other services must see the fact reliably
Delivery In-process, at-most-once if the process dies At-least-once to the bus after a successful outbox publish

Part 4 — FastAPI: presentation layer stays thin

The FastAPI integration guide recommends the same pattern: build mediators with bootstrap, inject them with fastapi.Depends, and keep route functions to HTTP only—no business rules in the module. Types, mediator_factory, saga_mediator_factory, and uuid live in your app module together with parts 1–3.

SagaMediator is the return type of cqrs.saga.bootstrap.bootstrap (see the Saga Mediator docs). AfterPaymentSagaRequest is the JSON body shape for the saga route; execution still uses AfterPaymentContext from part 2.

app = fastapi.FastAPI()

@app.post("/orders", status_code=201)
async def place_order(
    command: PlaceOrderCommand,
    mediator: cqrs.RequestMediator = fastapi.Depends(mediator_factory),
) -> PlaceOrderResult:
    return await mediator.send(command)

@app.get("/orders/{order_id}")
async def get_order(
    order_id: str,
    mediator: cqrs.RequestMediator = fastapi.Depends(mediator_factory),
) -> OrderView:
    return await mediator.send(GetOrderById(order_id=order_id))

@app.post("/invoices/close-after-payment", status_code=200)
async def close_invoice_after_payment(
    body: AfterPaymentSagaRequest,
    saga: cqrs.SagaMediator = fastapi.Depends(saga_mediator_factory),
):
    ctx = AfterPaymentContext(**body.model_dump())
    async for _ in saga.stream(context=ctx, saga_id=uuid.uuid4()):
        pass
    return {"status": "ok"}
Enter fullscreen mode Exit fullscreen mode

POST/GET for orders follow the FastAPI docs (command + query + mediator.send). The close-invoice route only builds an AfterPaymentContext and drives the same async stream as in part 2: no steps or compensation logic in the handler. The outbox publisher and consumer from part 3 still run in separate processes; they are not called from these routes.


v5 heads-up (optional)

The docs describe upcoming breaking changes: Pydantic optional, default models moving toward dataclasses. Discussion #57 has migration notes—confirm on the repo you track.


Links and calls to action

If you run Python microservices and live in the outbox / saga / CQRS space, what still hurts? I am curious—we might chip away at it together.

Top comments (1)

Some comments may only be visible to logged-in visitors. Sign in to view all comments.