Most observability guides stop at the HTTP layer.
You add OpenTelemetry to your FastAPI app, traces start showing up in your backend, and everything looks great — until you realize that the moment a request hands off work to a Celery worker, the trace disappears. The worker runs in a completely separate process. The span context doesn't travel with the task. What should be one unified trace becomes two unrelated fragments.
This is one of the most common observability gaps in Python backend systems, and it's surprisingly underserved in the documentation. In this article, we'll close it.
We'll instrument a FastAPI application and a Celery worker so that every task a request spawns appears as a child span in the same trace — giving you end-to-end visibility from HTTP request to background job completion.
What We're Building
A simple order processing system:
POST /orders
│
├── FastAPI handler (validates, saves order)
│
└── Celery task (sends confirmation email, updates inventory)
Without proper trace propagation, you'd see:
Trace A: POST /orders (FastAPI) — 45ms
Trace B: process_order (Celery) — 1.2s ← orphaned, no parent
With it, you'd see:
Trace A: POST /orders — 1.25s total
├── validate_order — 8ms
├── save_order — 35ms
└── process_order (Celery worker) — 1.2s
├── send_confirmation_email — 800ms
└── update_inventory — 400ms
That second view is what we're building towards.
Prerequisites
- Python 3.10+
- A running Redis instance (for Celery's broker)
- Basic familiarity with FastAPI and Celery
Installing Dependencies
pip install fastapi uvicorn celery redis
pip install opentelemetry-sdk
pip install opentelemetry-api
pip install opentelemetry-instrumentation-fastapi
pip install opentelemetry-instrumentation-celery
pip install opentelemetry-exporter-otlp-proto-grpc
pip install opentelemetry-propagator-b3
Project Structure
order-tracing/
├── tracing.py # Shared tracer setup
├── main.py # FastAPI application
├── worker.py # Celery worker
└── tasks.py # Celery tasks
Step 1: Setting Up the Shared Tracer
The most important design decision in this setup is that both FastAPI and Celery must share the same tracer configuration. They're separate processes, but they need to use the same trace exporter and the same propagator so span context can be serialized and deserialized correctly.
tracing.py
import os
from opentelemetry import trace, propagate
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.propagators.b3 import B3MultiFormat
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
def init_tracer(service_name: str) -> trace.Tracer:
resource = Resource.create({
SERVICE_NAME: service_name,
})
exporter = OTLPSpanExporter(
endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
insecure=True,
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
# Set a composite propagator that supports both W3C TraceContext and B3
# W3C is the modern standard; B3 is common in older infrastructure
propagate.set_global_textmap(
CompositePropagator([
TraceContextTextMapPropagator(),
B3MultiFormat(),
])
)
return trace.get_tracer(service_name)
The propagator setup here is worth pausing on. The propagator is responsible for serializing the trace context into a format that can be passed between processes — in our case, into Celery task headers — and for deserializing it on the other side. Without a matching propagator on both ends, the worker can't recognize the header keys it receives, extract returns an empty context, and the task quietly starts a new root trace.
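To make that concrete, here is roughly what the carrier looks like right after an inject call made while a span is active (the IDs below are illustrative):
from opentelemetry import propagate

carrier: dict[str, str] = {}
propagate.inject(carrier)  # fills the dict using the globally configured propagators
# With the composite propagator above, the carrier holds plain strings, e.g. a W3C entry like
# {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}
# plus the B3 headers, all of which can travel safely inside Celery task headers.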
Step 2: The FastAPI Application
main.py
import json
from fastapi import FastAPI, HTTPException
from opentelemetry import trace, propagate
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from pydantic import BaseModel
from tracing import init_tracer
from tasks import process_order_task
# Initialize tracer for the API service
tracer = init_tracer("order-api")
app = FastAPI()
# Auto-instrument FastAPI — this creates spans for every request automatically
FastAPIInstrumentor.instrument_app(app)
class OrderRequest(BaseModel):
user_id: str
product_id: str
quantity: int
@app.post("/orders")
async def create_order(order: OrderRequest):
# FastAPIInstrumentor has already started a span for this request
# We can get the current span to add business context
current_span = trace.get_current_span()
current_span.set_attribute("order.user_id", order.user_id)
current_span.set_attribute("order.product_id", order.product_id)
current_span.set_attribute("order.quantity", order.quantity)
with tracer.start_as_current_span("validate_order") as span:
if order.quantity <= 0:
span.set_attribute("validation.error", "invalid_quantity")
raise HTTPException(status_code=400, detail="Quantity must be positive")
span.set_attribute("validation.status", "passed")
with tracer.start_as_current_span("save_order") as span:
# Simulate saving to database
order_id = f"order_{order.user_id}_{order.product_id}"
span.set_attribute("order.id", order_id)
span.set_attribute("db.operation", "insert")
# This is the critical step: inject the current trace context into
# the Celery task headers so the worker can continue the trace
trace_context = {}
propagate.inject(trace_context)
# Pass the serialized trace context as part of the task
process_order_task.apply_async(
        args=[order_id, order.model_dump()],  # use .dict() instead if you're on Pydantic v1
headers={"trace_context": json.dumps(trace_context)},
)
return {"order_id": order_id, "status": "processing"}
The propagate.inject(trace_context) call is doing the heavy lifting here. It takes the current span context — the trace ID, span ID, and sampling flags — and serializes it into the trace_context dictionary using whatever propagator format we configured. That dictionary then travels with the Celery task as a header.
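If several endpoints dispatch tasks, it can help to keep the injection boilerplate in one place. Here is a minimal sketch of such a helper (dispatch_traced is a hypothetical name, not part of the files above) that injects the current context into any task it sends:
def dispatch_traced(task, *args, **kwargs):
    # Capture the active trace context and attach it to the outgoing task.
    carrier = {}
    propagate.inject(carrier)
    return task.apply_async(
        args=args,
        kwargs=kwargs,
        headers={"trace_context": json.dumps(carrier)},
    )

# Usage inside the handler above:
# dispatch_traced(process_order_task, order_id, order.model_dump())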
Step 3: The Celery Worker
worker.py
from celery import Celery
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from tracing import init_tracer
# Initialize tracer for the worker service
# Note: same exporter endpoint, different service name
init_tracer("order-worker")
# Instrument Celery BEFORE creating the app instance
# Order matters here — instrumentation must happen before Celery initializes
CeleryInstrumentor().instrument()
celery_app = Celery(
"order_worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
)
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
task_track_started=True,
)
tasks.py
import json
import time
from opentelemetry import trace, propagate
from opentelemetry.trace import Status, StatusCode
from worker import celery_app
tracer = trace.get_tracer("order-worker")
@celery_app.task(bind=True, name="process_order")
def process_order_task(self, order_id: str, order_data: dict):
# Extract trace context from task headers
# This is how we reconnect to the parent trace from the API
raw_context = self.request.headers.get("trace_context", "{}")
carrier = json.loads(raw_context)
# Deserialize the trace context using the same propagator
parent_context = propagate.extract(carrier)
# Start a new span as a child of the API request span
with tracer.start_as_current_span(
"process_order",
context=parent_context,
kind=trace.SpanKind.CONSUMER,
) as span:
span.set_attribute("order.id", order_id)
span.set_attribute("messaging.system", "celery")
span.set_attribute("messaging.operation", "process")
try:
_send_confirmation_email(order_id, order_data)
_update_inventory(order_data)
span.set_attribute("order.status", "completed")
span.set_status(Status(StatusCode.OK))
except Exception as e:
span.record_exception(e)
span.set_attribute("order.status", "failed")
span.set_status(Status(StatusCode.ERROR, str(e)))
# Re-raise so Celery can handle retry logic
raise
def _send_confirmation_email(order_id: str, order_data: dict):
with tracer.start_as_current_span("send_confirmation_email") as span:
span.set_attribute("email.recipient", order_data.get("user_id"))
span.set_attribute("email.template", "order_confirmation")
# Simulate email sending latency
time.sleep(0.8)
span.set_attribute("email.status", "sent")
def _update_inventory(order_data: dict):
with tracer.start_as_current_span("update_inventory") as span:
span.set_attribute("product.id", order_data.get("product_id"))
span.set_attribute("inventory.delta", -order_data.get("quantity", 0))
span.set_attribute("db.operation", "update")
# Simulate database write
time.sleep(0.4)
span.set_attribute("inventory.status", "updated")
The key line is context=parent_context in tracer.start_as_current_span. This tells OpenTelemetry to create the new span as a child of the span that was active in the FastAPI handler — even though that span is in a completely different process. Without this, OpenTelemetry would create a new root span, giving you the orphaned trace problem we started with.
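If you want to confirm the linkage while developing, a quick check inside that with block is to compare trace IDs. These debug lines are hypothetical and can be deleted afterwards:
# Both should print the same 32-character hex value when propagation works.
parent_id = trace.get_current_span(parent_context).get_span_context().trace_id
print(f"parent trace_id: {parent_id:032x}")
print(f"worker trace_id: {span.get_span_context().trace_id:032x}")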
Step 4: Running Everything
Start the OpenTelemetry Collector (or point directly at your backend):
docker run -p 4317:4317 otel/opentelemetry-collector-contrib
Start the Celery worker:
celery -A worker worker --loglevel=info
Start the FastAPI app:
uvicorn main:app --reload
Send a test request:
curl -X POST http://localhost:8000/orders \
-H "Content-Type: application/json" \
-d '{"user_id": "u123", "product_id": "p456", "quantity": 2}'
Open your observability backend and you should see a single trace with the full span hierarchy:
POST /orders — 1.25s
├── validate_order — 8ms
├── save_order — 35ms
└── process_order (worker) — 1.2s
├── send_confirmation_email — 800ms
└── update_inventory — 400ms
Common Issues and How to Fix Them
Spans appear but Celery tasks are orphaned
Cause: The propagator on the API side doesn't match the propagator on the worker side.
Fix: Make sure tracing.py is imported by both main.py and worker.py, and that propagate.set_global_textmap is called before any spans are created. The easiest way to guarantee this is to call init_tracer at module import time, not inside a function.
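A quick way to confirm both processes agree is to print the header fields the configured propagator handles; the two outputs should match (debug lines only, run after init_tracer):
from opentelemetry import propagate

# Run this in both main.py and worker.py; the sets of header names should be identical.
print(propagate.get_global_textmap().fields)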
CeleryInstrumentor creates duplicate spans
Cause: CeleryInstrumentor().instrument() was called after the Celery app was initialized.
Fix: Instrument before creating the Celery instance. The instrumentation patches Celery's signal system, which only works if the patch is applied before Celery sets up its internal event hooks.
Trace context arrives empty in the worker
Cause: Celery's default serializer strips unknown header fields.
Fix: Make sure task_serializer is set to "json" and accept_content includes "json". Also verify you're reading from self.request.headers and not self.request.kwargs — the context travels in headers, not task arguments.
Worker spans show a different trace ID
Cause: The context parameter was passed to start_as_current_span but the extracted context was empty (e.g. the header key name was wrong).
Fix: Log the raw carrier dict before calling propagate.extract to verify the context is arriving correctly:
raw_context = self.request.headers.get("trace_context", "{}")
carrier = json.loads(raw_context)
print(f"Received trace context: {carrier}") # Should not be empty
parent_context = propagate.extract(carrier)
Adding Retry Visibility
Celery's retry mechanism is another common observability blind spot. When a task retries, it creates a new execution — but without instrumentation, you can't tell from the trace how many times it tried.
@celery_app.task(bind=True, name="process_order", max_retries=3)
def process_order_task(self, order_id: str, order_data: dict):
raw_context = self.request.headers.get("trace_context", "{}")
carrier = json.loads(raw_context)
parent_context = propagate.extract(carrier)
with tracer.start_as_current_span(
"process_order",
context=parent_context,
kind=trace.SpanKind.CONSUMER,
) as span:
# Track retry count on every execution
span.set_attribute("task.retry_count", self.request.retries)
span.set_attribute("task.max_retries", self.max_retries)
span.set_attribute("order.id", order_id)
try:
_send_confirmation_email(order_id, order_data)
_update_inventory(order_data)
        except ExternalServiceError as e:  # stand-in for whatever transient error your integrations raise
span.set_attribute("task.will_retry", self.request.retries < self.max_retries)
span.record_exception(e)
# Exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries)
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
With task.retry_count on every span, you can filter your observability backend for tasks with retry_count > 0 and immediately see which operations are flaky — and how many retries it typically takes before they succeed or fail permanently.
What to Monitor
With this instrumentation in place, here are the metrics and queries that become useful:
End-to-end latency: Filter traces by POST /orders and look at the total duration. Spikes here could be API-side or worker-side — the span breakdown tells you which.
Worker queue depth: Combine trace latency data with queue length from your broker (for Redis, the length of the Celery queue list) or a monitoring tool like Flower. If API spans complete quickly but total trace duration is high, tasks are sitting in the queue longer than expected.
Task retry rate: Query for spans where task.retry_count > 0. A rising retry rate is often the first signal of a degrading downstream dependency.
Failed tasks by error type: Filter spans by status=ERROR on process_order spans and group by the exception type. This tells you whether failures are clustering around a specific integration (email, inventory) or spread across everything.
Summary
Connecting FastAPI and Celery in a single trace requires three things:
- A shared tracer configuration imported by both processes
- propagate.inject on the API side to serialize the trace context into task headers
- propagate.extract on the worker side to deserialize it and pass it as the parent context
The rest is standard OpenTelemetry — spans, attributes, status codes. But without those three pieces in place, you're flying blind the moment work leaves your API process.
Async Python systems hand off work constantly — to queues, to background workers, to scheduled jobs. Trace propagation is what lets you follow that work all the way through, and debug the full picture when something goes wrong.