DEV Community

Cover image for Tracing Async Python: How to Instrument FastAPI and Celery in the Same Trace
Temitope
Temitope

Posted on

Tracing Async Python: How to Instrument FastAPI and Celery in the Same Trace

Most observability guides stop at the HTTP layer.

You add OpenTelemetry to your FastAPI app, traces start showing up in your backend, and everything looks great — until you realize that the moment a request hands off work to a Celery worker, the trace disappears. The worker runs in a completely separate process. The span context doesn't travel with the task. What should be one unified trace becomes two unrelated fragments.

This is one of the most common observability gaps in Python backend systems, and it's surprisingly underserved in the documentation. In this article, we'll close it.

We'll instrument a FastAPI application and a Celery worker so that every task a request spawns appears as a child span in the same trace — giving you end-to-end visibility from HTTP request to background job completion.


What We're Building

A simple order processing system:

POST /orders
    │
    ├── FastAPI handler (validates, saves order)
    │
    └── Celery task (sends confirmation email, updates inventory)
Enter fullscreen mode Exit fullscreen mode

Without proper trace propagation, you'd see:

Trace A: POST /orders (FastAPI) — 45ms
Trace B: process_order (Celery) — 1.2s   ← orphaned, no parent
Enter fullscreen mode Exit fullscreen mode

With it, you'd see:

Trace A: POST /orders — 1.25s total
  ├── validate_order — 8ms
  ├── save_order — 35ms
  └── process_order (Celery worker) — 1.2s
        ├── send_confirmation_email — 800ms
        └── update_inventory — 400ms
Enter fullscreen mode Exit fullscreen mode

That second view is what we're building towards.


Prerequisites

  • Python 3.10+
  • A running Redis instance (for Celery's broker)
  • Basic familiarity with FastAPI and Celery

Installing Dependencies

pip install fastapi uvicorn celery redis
pip install opentelemetry-sdk
pip install opentelemetry-api
pip install opentelemetry-instrumentation-fastapi
pip install opentelemetry-instrumentation-celery
pip install opentelemetry-exporter-otlp-proto-grpc
Enter fullscreen mode Exit fullscreen mode

Project Structure

order-tracing/
├── tracing.py        # Shared tracer setup
├── main.py           # FastAPI application
├── worker.py         # Celery worker
└── tasks.py          # Celery tasks
Enter fullscreen mode Exit fullscreen mode

Step 1: Setting Up the Shared Tracer

The most important design decision in this setup is that both FastAPI and Celery must share the same tracer configuration. They're separate processes, but they need to use the same trace exporter and the same propagator so span context can be serialized and deserialized correctly.

tracing.py

import os
from opentelemetry import trace, propagate
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.propagators.b3 import B3MultiFormat
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


def init_tracer(service_name: str) -> trace.Tracer:
    resource = Resource.create({
        SERVICE_NAME: service_name,
    })

    exporter = OTLPSpanExporter(
        endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
        insecure=True,
    )

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(exporter))

    trace.set_tracer_provider(provider)

    # Set a composite propagator that supports both W3C TraceContext and B3
    # W3C is the modern standard; B3 is common in older infrastructure
    propagate.set_global_textmap(
        CompositePropagator([
            TraceContextTextMapPropagator(),
            B3MultiFormat(),
        ])
    )

    return trace.get_tracer(service_name)
Enter fullscreen mode Exit fullscreen mode

The propagator setup here is worth pausing on. The propagator is responsible for serializing the trace context into a format that can be passed between processes — in our case, into Celery task headers — and deserializing it on the other side. Without a matching propagator on both ends, the context arrives as bytes that the worker doesn't know how to read.


Step 2: The FastAPI Application

main.py

import json
from fastapi import FastAPI, HTTPException
from opentelemetry import trace, propagate
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from pydantic import BaseModel

from tracing import init_tracer
from tasks import process_order_task

# Initialize tracer for the API service
tracer = init_tracer("order-api")

app = FastAPI()

# Auto-instrument FastAPI — this creates spans for every request automatically
FastAPIInstrumentor.instrument_app(app)


class OrderRequest(BaseModel):
    user_id: str
    product_id: str
    quantity: int


@app.post("/orders")
async def create_order(order: OrderRequest):
    # FastAPIInstrumentor has already started a span for this request
    # We can get the current span to add business context
    current_span = trace.get_current_span()
    current_span.set_attribute("order.user_id", order.user_id)
    current_span.set_attribute("order.product_id", order.product_id)
    current_span.set_attribute("order.quantity", order.quantity)

    with tracer.start_as_current_span("validate_order") as span:
        if order.quantity <= 0:
            span.set_attribute("validation.error", "invalid_quantity")
            raise HTTPException(status_code=400, detail="Quantity must be positive")
        span.set_attribute("validation.status", "passed")

    with tracer.start_as_current_span("save_order") as span:
        # Simulate saving to database
        order_id = f"order_{order.user_id}_{order.product_id}"
        span.set_attribute("order.id", order_id)
        span.set_attribute("db.operation", "insert")

    # This is the critical step: inject the current trace context into
    # the Celery task headers so the worker can continue the trace
    trace_context = {}
    propagate.inject(trace_context)

    # Pass the serialized trace context as part of the task
    process_order_task.apply_async(
        args=[order_id, order.dict()],
        headers={"trace_context": json.dumps(trace_context)},
    )

    return {"order_id": order_id, "status": "processing"}
Enter fullscreen mode Exit fullscreen mode

The propagate.inject(trace_context) call is doing the heavy lifting here. It takes the current span context — the trace ID, span ID, and sampling flags — and serializes it into the trace_context dictionary using whatever propagator format we configured. That dictionary then travels with the Celery task as a header.


Step 3: The Celery Worker

worker.py

from celery import Celery
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from tracing import init_tracer

# Initialize tracer for the worker service
# Note: same exporter endpoint, different service name
init_tracer("order-worker")

# Instrument Celery BEFORE creating the app instance
# Order matters here — instrumentation must happen before Celery initializes
CeleryInstrumentor().instrument()

celery_app = Celery(
    "order_worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,
)
Enter fullscreen mode Exit fullscreen mode

tasks.py

import json
import time
from opentelemetry import trace, propagate
from opentelemetry.trace import Status, StatusCode

from worker import celery_app

tracer = trace.get_tracer("order-worker")


@celery_app.task(bind=True, name="process_order")
def process_order_task(self, order_id: str, order_data: dict):
    # Extract trace context from task headers
    # This is how we reconnect to the parent trace from the API
    raw_context = self.request.headers.get("trace_context", "{}")
    carrier = json.loads(raw_context)

    # Deserialize the trace context using the same propagator
    parent_context = propagate.extract(carrier)

    # Start a new span as a child of the API request span
    with tracer.start_as_current_span(
        "process_order",
        context=parent_context,
        kind=trace.SpanKind.CONSUMER,
    ) as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("messaging.system", "celery")
        span.set_attribute("messaging.operation", "process")

        try:
            _send_confirmation_email(order_id, order_data)
            _update_inventory(order_data)

            span.set_attribute("order.status", "completed")
            span.set_status(Status(StatusCode.OK))

        except Exception as e:
            span.record_exception(e)
            span.set_attribute("order.status", "failed")
            span.set_status(Status(StatusCode.ERROR, str(e)))
            # Re-raise so Celery can handle retry logic
            raise


def _send_confirmation_email(order_id: str, order_data: dict):
    with tracer.start_as_current_span("send_confirmation_email") as span:
        span.set_attribute("email.recipient", order_data.get("user_id"))
        span.set_attribute("email.template", "order_confirmation")

        # Simulate email sending latency
        time.sleep(0.8)

        span.set_attribute("email.status", "sent")


def _update_inventory(order_data: dict):
    with tracer.start_as_current_span("update_inventory") as span:
        span.set_attribute("product.id", order_data.get("product_id"))
        span.set_attribute("inventory.delta", -order_data.get("quantity", 0))
        span.set_attribute("db.operation", "update")

        # Simulate database write
        time.sleep(0.4)

        span.set_attribute("inventory.status", "updated")
Enter fullscreen mode Exit fullscreen mode

The key line is context=parent_context in tracer.start_as_current_span. This tells OpenTelemetry to create the new span as a child of the span that was active in the FastAPI handler — even though that span is in a completely different process. Without this, OpenTelemetry would create a new root span, giving you the orphaned trace problem we started with.


Step 4: Running Everything

Start the OpenTelemetry Collector (or point directly at your backend):

docker run -p 4317:4317 otel/opentelemetry-collector-contrib
Enter fullscreen mode Exit fullscreen mode

Start the Celery worker:

celery -A worker worker --loglevel=info
Enter fullscreen mode Exit fullscreen mode

Start the FastAPI app:

uvicorn main:app --reload
Enter fullscreen mode Exit fullscreen mode

Send a test request:

curl -X POST http://localhost:8000/orders \
  -H "Content-Type: application/json" \
  -d '{"user_id": "u123", "product_id": "p456", "quantity": 2}'
Enter fullscreen mode Exit fullscreen mode

Open your observability backend and you should see a single trace with the full span hierarchy:

POST /orders — 1.25s
  ├── validate_order — 8ms
  ├── save_order — 35ms
  └── process_order (worker) — 1.2s
        ├── send_confirmation_email — 800ms
        └── update_inventory — 400ms
Enter fullscreen mode Exit fullscreen mode

Common Issues and How to Fix Them

Spans appear but Celery tasks are orphaned

Cause: The propagator on the API side doesn't match the propagator on the worker side.

Fix: Make sure tracing.py is imported by both main.py and worker.py, and that propagate.set_global_textmap is called before any spans are created. The easiest way to guarantee this is to call init_tracer at module import time, not inside a function.

CeleryInstrumentor creates duplicate spans

Cause: CeleryInstrumentor().instrument() was called after the Celery app was initialized.

Fix: Instrument before creating the Celery instance. The instrumentation patches Celery's signal system, which only works if the patch is applied before Celery sets up its internal event hooks.

Trace context arrives empty in the worker

Cause: Celery's default serializer strips unknown header fields.

Fix: Make sure task_serializer is set to "json" and accept_content includes "json". Also verify you're reading from self.request.headers and not self.request.kwargs — the context travels in headers, not task arguments.

Worker spans show a different trace ID

Cause: The context parameter was passed to start_as_current_span but the extracted context was empty (e.g. the header key name was wrong).

Fix: Log the raw carrier dict before calling propagate.extract to verify the context is arriving correctly:

raw_context = self.request.headers.get("trace_context", "{}")
carrier = json.loads(raw_context)
print(f"Received trace context: {carrier}")  # Should not be empty
parent_context = propagate.extract(carrier)
Enter fullscreen mode Exit fullscreen mode

Adding Retry Visibility

Celery's retry mechanism is another common observability blind spot. When a task retries, it creates a new execution — but without instrumentation, you can't tell from the trace how many times it tried.

@celery_app.task(bind=True, name="process_order", max_retries=3)
def process_order_task(self, order_id: str, order_data: dict):
    raw_context = self.request.headers.get("trace_context", "{}")
    carrier = json.loads(raw_context)
    parent_context = propagate.extract(carrier)

    with tracer.start_as_current_span(
        "process_order",
        context=parent_context,
        kind=trace.SpanKind.CONSUMER,
    ) as span:
        # Track retry count on every execution
        span.set_attribute("task.retry_count", self.request.retries)
        span.set_attribute("task.max_retries", self.max_retries)
        span.set_attribute("order.id", order_id)

        try:
            _send_confirmation_email(order_id, order_data)
            _update_inventory(order_data)

        except ExternalServiceError as e:
            span.set_attribute("task.will_retry", self.request.retries < self.max_retries)
            span.record_exception(e)

            # Exponential backoff
            raise self.retry(exc=e, countdown=2 ** self.request.retries)

        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise
Enter fullscreen mode Exit fullscreen mode

With task.retry_count on every span, you can filter your observability backend for tasks with retry_count > 0 and immediately see which operations are flaky — and how many retries it typically takes before they succeed or fail permanently.


What to Monitor

With this instrumentation in place, here are the metrics and queries that become useful:

End-to-end latency: Filter traces by POST /orders and look at the total duration. Spikes here could be API-side or worker-side — the span breakdown tells you which.

Worker queue depth: Combine trace latency data with Celery's built-in queue metrics. If API spans complete quickly but total trace duration is high, tasks are sitting in the queue longer than expected.

Task retry rate: Query for spans where task.retry_count > 0. A rising retry rate is often the first signal of a degrading downstream dependency.

Failed tasks by error type: Filter spans by status=ERROR on process_order spans and group by the exception type. This tells you whether failures are clustering around a specific integration (email, inventory) or spread across everything.


Summary

Connecting FastAPI and Celery in a single trace requires three things:

  • A shared tracer configuration imported by both processes
  • propagate.inject on the API side to serialize the trace context into task headers
  • propagate.extract on the worker side to deserialize it and pass it as the parent context

The rest is standard OpenTelemetry — spans, attributes, status codes. But without those three pieces in place, you're flying blind the moment work leaves your API process.

Async Python systems hand off work constantly — to queues, to background workers, to scheduled jobs. Trace propagation is what lets you follow that work all the way through, and debug the full picture when something goes wrong.


Find me on GitHub or LinkedIn.

Top comments (0)