Temitope

Error Handling Patterns for Python AI Pipelines: What to Catch, What to Retry, and What to Alert On

AI pipelines fail differently from regular software.

A web API fails predictably — the database is down, the network is unreachable, the input is invalid. You catch the exception, return an error response, and move on. The failure modes are finite and well-understood.

An AI pipeline fails in ways that standard exception handling wasn't designed for. The model returns a response that is structurally valid but semantically wrong. The output JSON is malformed because the model decided to add a comment inside it. The pipeline succeeds on 98% of inputs and silently produces garbage on the other 2%. The same input produces different outputs on different runs, making failures intermittent and hard to reproduce.

Worse, some failures aren't exceptions at all. They're successful API calls that returned something unusable.

This article builds a systematic approach to error handling in Python AI pipelines — categorizing failure modes, deciding what to catch versus retry versus alert on, and implementing the patterns that make pipelines debuggable in production.


The Four Categories of AI Pipeline Failures

Before writing any error handling code, it helps to categorize what can go wrong. AI pipeline failures fall into four distinct categories, each requiring a different response.

1. Infrastructure Failures

These are failures you'd see in any distributed system — network timeouts, rate limiting, service unavailability. They're transient by nature and almost always safe to retry.

# Infrastructure failures — retry these
openai.APITimeoutError
openai.APIConnectionError
openai.RateLimitError
anthropic.APIConnectionError
httpx.TimeoutException

2. Input Failures

These failures happen because the input to the pipeline is invalid — too long, wrong format, contains content that triggers a safety filter. Retrying won't help because the same input will produce the same failure.

# Input failures — don't retry, fix the input
openai.BadRequestError      # Often: context too long
anthropic.BadRequestError
ContentFilterError          # Safety system triggered (placeholder name, define it in your own code)
TokenLimitExceededError     # Input exceeds model context window (placeholder name, define it in your own code)

3. Output Failures

These are the most subtle. The API call succeeds, but the output isn't what you expected — malformed JSON, missing required fields, truncated response, wrong format. Standard exception handling misses these entirely because no exception was raised.

# Output failures — no exception raised, but response is unusable
# finish_reason == "length"    → response was cut off
# finish_reason == "content_filter" → output was filtered
# JSON parsing fails on response content
# Required fields missing from structured output
# Response is in wrong language

4. Logic Failures

These are failures in how the pipeline processes the model's output — transformation errors, validation failures, downstream system errors triggered by bad model output. They live outside the LLM call itself.

# Logic failures — pipeline code failed processing model output
json.JSONDecodeError
pydantic.ValidationError
KeyError  # Expected field missing from model output
ValueError  # Model output failed business rule validation
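
The exception-based categories above can be turned into a lookup. The following is a minimal sketch, not a definitive implementation: the `categorize` helper and the `NAME_TO_CATEGORY` table are hypothetical, and matching on class names is an assumption made only to keep the example free of SDK imports. Output failures are left out because they raise no exception.

```python
from enum import Enum


class Category(Enum):
    INFRASTRUCTURE = "infrastructure"
    INPUT = "input"
    LOGIC = "logic"
    UNKNOWN = "unknown"


# Assumption: matching on class names keeps this sketch free of SDK imports.
# In real code you would match on the SDK's exception classes directly.
NAME_TO_CATEGORY = {
    "APITimeoutError": Category.INFRASTRUCTURE,
    "APIConnectionError": Category.INFRASTRUCTURE,
    "RateLimitError": Category.INFRASTRUCTURE,
    "TimeoutException": Category.INFRASTRUCTURE,
    "BadRequestError": Category.INPUT,
    "JSONDecodeError": Category.LOGIC,
    "ValidationError": Category.LOGIC,
    "KeyError": Category.LOGIC,
    "ValueError": Category.LOGIC,
}


def categorize(exc: BaseException) -> Category:
    """Return the first category matched while walking the exception's MRO."""
    for cls in type(exc).__mro__:
        category = NAME_TO_CATEGORY.get(cls.__name__)
        if category is not None:
            return category
    return Category.UNKNOWN
```

Walking the MRO means a subclass of a listed exception lands in its parent's category, and anything unrecognized comes back as `UNKNOWN` rather than being silently treated as retryable.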

Setting Up the Error Handling Infrastructure

errors.py

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Any


class FailureCategory(Enum):
    INFRASTRUCTURE = "infrastructure"
    INPUT = "input"
    OUTPUT = "output"
    LOGIC = "logic"


class RetryStrategy(Enum):
    RETRY = "retry"           # Safe to retry immediately or with backoff
    NO_RETRY = "no_retry"     # Retrying won't help — fix the input
    ALERT = "alert"           # Needs human attention
    FALLBACK = "fallback"     # Use a fallback response


@dataclass
class PipelineError:
    """
    Structured representation of an AI pipeline failure.
    Carries enough context to make retry and alerting decisions.
    """
    category: FailureCategory
    retry_strategy: RetryStrategy
    message: str
    original_error: Optional[Exception] = None
    context: dict = field(default_factory=dict)
    recoverable: bool = True

    def should_retry(self) -> bool:
        return self.retry_strategy == RetryStrategy.RETRY

    def should_alert(self) -> bool:
        return self.retry_strategy == RetryStrategy.ALERT

    def should_fallback(self) -> bool:
        return self.retry_strategy == RetryStrategy.FALLBACK
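
To show how the retry strategy might drive control flow, here is a small sketch. The `next_action` helper is hypothetical and the two classes are trimmed stand-ins for the ones in errors.py, repeated only so the example runs on its own. Downgrading an exhausted retry to a fallback is one possible policy, assumed here for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class RetryStrategy(Enum):  # mirrors errors.py
    RETRY = "retry"
    NO_RETRY = "no_retry"
    ALERT = "alert"
    FALLBACK = "fallback"


@dataclass
class PipelineError:  # trimmed stand-in for the dataclass in errors.py
    retry_strategy: RetryStrategy
    message: str


def next_action(error: PipelineError, attempts_left: int) -> str:
    """Map a PipelineError to one action name (hypothetical helper)."""
    if error.retry_strategy is RetryStrategy.RETRY:
        # A retryable error with no budget left degrades to a fallback.
        return "retry" if attempts_left > 0 else "fallback"
    if error.retry_strategy is RetryStrategy.ALERT:
        return "alert"
    if error.retry_strategy is RetryStrategy.FALLBACK:
        return "fallback"
    return "fail"  # NO_RETRY: surface the error to the caller
```

Keeping this decision in one function means the pipeline code never has to inspect exception types after an error has been wrapped.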

Step 1: Catching Infrastructure Failures

Infrastructure failures are the easiest to handle — they're transient, well-documented, and safe to retry with exponential backoff.

retry.py

import asyncio
import time
import structlog
from functools import wraps
from typing import Callable, TypeVar, Awaitable
from opentelemetry import trace

from errors import PipelineError, FailureCategory, RetryStrategy

logger = structlog.get_logger()
tracer = trace.get_tracer("pipeline-retry")

T = TypeVar("T")


def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retryable_errors: tuple = (),
):
    """
    Decorator that retries async functions with exponential backoff.
    Only retries on explicitly listed error types.
    """
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            current_span = trace.get_current_span()
            last_error = None

            for attempt in range(max_attempts):
                try:
                    if attempt > 0:
                        delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
                        logger.info(
                            "retrying_after_delay",
                            attempt=attempt,
                            delay_seconds=delay,
                            function=func.__name__,
                        )
                        current_span.set_attribute("retry.attempt", attempt)
                        await asyncio.sleep(delay)

                    return await func(*args, **kwargs)

                except retryable_errors as e:
                    last_error = e
                    logger.warning(
                        "retryable_error",
                        attempt=attempt + 1,
                        max_attempts=max_attempts,
                        error_type=type(e).__name__,
                        function=func.__name__,
                    )
                    current_span.set_attribute("retry.count", attempt + 1)
                    continue

                except Exception:
                    # Non-retryable errors propagate immediately
                    raise

            # All retries exhausted
            logger.error(
                "all_retries_exhausted",
                max_attempts=max_attempts,
                function=func.__name__,
                error_type=type(last_error).__name__,
            )
            raise last_error

        return wrapper
    return decorator
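
It helps to see the delays this formula produces. The sketch below pulls the delay calculation into a standalone function, `backoff_delay`, which is a hypothetical name. The optional jitter is an addition for illustration and is not part of `with_retry` as written above.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay before retry number `attempt` (1 = first retry).

    Uses the same formula as with_retry. If `rng` is given, applies
    "full jitter": a uniform draw between 0 and the capped delay.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    capped = min(base_delay * (2 ** (attempt - 1)), max_delay)
    if rng is None:
        return capped
    return rng.uniform(0.0, capped)


schedule = [backoff_delay(n) for n in range(1, 8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With the default `max_attempts=3` the decorator only ever sleeps for the first two values, 1 and 2 seconds. Jitter is worth considering when many workers hit the same rate limit at once, since it spreads their retries out instead of having them all return at the same moment.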

Step 2: Handling Output Failures

Output failures require a different approach because they don't raise exceptions — you have to check the response explicitly.

output_validator.py

import json
from typing import Optional, Type, TypeVar
from pydantic import BaseModel, ValidationError
import structlog
from opentelemetry import trace

from errors import PipelineError, FailureCategory, RetryStrategy

logger = structlog.get_logger()
T = TypeVar("T", bound=BaseModel)


def check_finish_reason(finish_reason: str, model: str) -> Optional[PipelineError]:
    """
    Evaluate the model's finish reason and return a PipelineError if
    the response should not be used.
    """
    if finish_reason == "stop":
        return None  # Normal completion — no error

    if finish_reason == "length":
        return PipelineError(
            category=FailureCategory.OUTPUT,
            retry_strategy=RetryStrategy.FALLBACK,
            message="Response truncated by token limit",
            context={"finish_reason": finish_reason, "model": model},
            recoverable=False,
        )

    if finish_reason == "content_filter":
        return PipelineError(
            category=FailureCategory.INPUT,
            retry_strategy=RetryStrategy.ALERT,
            message="Response blocked by content filter",
            context={"finish_reason": finish_reason, "model": model},
            recoverable=False,
        )

    # tool_calls and function_call are valid non-stop reasons
    if finish_reason in ("tool_calls", "function_call"):
        return None

    # Unknown finish reason — log but don't fail
    logger.warning("unknown_finish_reason", finish_reason=finish_reason, model=model)
    return None


def parse_structured_output(
    content: str,
    output_model: Type[T],
    operation: str,
) -> tuple[Optional[T], Optional[PipelineError]]:
    """
    Parse and validate structured JSON output from an LLM.
    Returns (result, None) on success or (None, PipelineError) on failure.
    """
    span = trace.get_current_span()

    # Step 1: Extract JSON if wrapped in markdown code blocks
    # Models frequently wrap JSON in ```json ... ``` even when told not to
    content = content.strip()
    if content.startswith("```"):
        lines = content.split("\n")
        # Remove first and last lines (the code fence markers)
        content = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])

    # Step 2: Parse JSON
    try:
        raw_data = json.loads(content)
    except json.JSONDecodeError as e:
        span.set_attribute("output.parse_error", "json_decode")
        logger.warning(
            "json_parse_failed",
            operation=operation,
            error=str(e),
            content_preview=content[:200],
        )
        return None, PipelineError(
            category=FailureCategory.OUTPUT,
            retry_strategy=RetryStrategy.RETRY,
            message=f"Model returned invalid JSON: {str(e)}",
            original_error=e,
            context={"operation": operation},
            recoverable=True,
        )

    # Step 3: Validate against expected schema
    try:
        result = output_model.model_validate(raw_data)
        span.set_attribute("output.validation", "passed")
        return result, None

    except ValidationError as e:
        span.set_attribute("output.parse_error", "schema_validation")
        logger.warning(
            "schema_validation_failed",
            operation=operation,
            errors=e.errors(),
        )
        return None, PipelineError(
            category=FailureCategory.OUTPUT,
            retry_strategy=RetryStrategy.RETRY,
            message=f"Model output failed schema validation: {str(e)}",
            original_error=e,
            context={"operation": operation, "validation_errors": e.errors()},
            recoverable=True,
        )
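
The fence-stripping step is easy to get subtly wrong, so it is worth isolating and testing on its own. This is a minimal standard-library sketch; `strip_code_fence` is a hypothetical helper name, and it assumes the fence markers sit on their own lines.

```python
import json

FENCE = "`" * 3  # built indirectly so this example can itself sit in a fenced block


def strip_code_fence(text: str) -> str:
    """Remove one surrounding Markdown code fence, if present."""
    text = text.strip()
    if not text.startswith(FENCE):
        return text
    lines = text.split("\n")
    body = lines[1:]  # drop the opening fence line (may carry a language tag)
    if body and body[-1].strip() == FENCE:
        body = body[:-1]  # drop the closing fence line
    return "\n".join(body).strip()


wrapped = FENCE + 'json\n{"urgency": "high"}\n' + FENCE
print(json.loads(strip_code_fence(wrapped)))  # {'urgency': 'high'}
```

Unfenced input passes through unchanged, and a response that was cut off before the closing fence still loses its opening marker. A fence written on a single line with the JSON is not handled by this sketch.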

Step 3: A Complete Pipeline With Error Handling

Now let's put it together in a realistic pipeline that classifies support tickets and returns structured output.

pipeline.py

import os
from openai import AsyncOpenAI, RateLimitError, APITimeoutError, APIConnectionError, BadRequestError
from pydantic import BaseModel
from typing import Optional
import structlog
from opentelemetry import trace

from retry import with_retry
from output_validator import check_finish_reason, parse_structured_output
from errors import PipelineError, FailureCategory, RetryStrategy
# Note: llm_tracer is a tracing helper module that is not defined in this article
from llm_tracer import llm_span, record_llm_response, record_llm_error

logger = structlog.get_logger()
tracer = trace.get_tracer("support-pipeline")
client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])


class TicketClassification(BaseModel):
    category: str
    urgency: str  # "low" | "medium" | "high" | "critical"
    sentiment: str  # "positive" | "neutral" | "negative" | "angry"
    requires_human: bool
    suggested_response: Optional[str] = None


class ClassificationResult(BaseModel):
    success: bool
    classification: Optional[TicketClassification] = None
    error: Optional[str] = None
    fallback_used: bool = False


RETRYABLE_OPENAI_ERRORS = (RateLimitError, APITimeoutError, APIConnectionError)


@with_retry(
    max_attempts=3,
    base_delay=1.0,
    retryable_errors=RETRYABLE_OPENAI_ERRORS,
)
async def _call_openai(messages: list, model: str):  # returns the SDK's ChatCompletion object, not a dict
    """
    Raw OpenAI call with retry decoration.
    Separated from business logic so retries are clean.
    """
    response = await client.chat.completions.create(
        model=model,
        temperature=0.0,
        max_tokens=500,
        response_format={"type": "json_object"},
        messages=messages,
    )
    return response


async def classify_ticket(
    ticket_text: str,
    ticket_id: str,
) -> ClassificationResult:
    """
    Classify a support ticket with full error handling.
    Returns a ClassificationResult regardless of what goes wrong.
    """
    model = "gpt-4o-mini"
    log = logger.bind(ticket_id=ticket_id, model=model)

    with llm_span(
        model=model,
        operation="classify",
        feature="support_triage",
    ) as span:
        span.set_attribute("ticket.id", ticket_id)

        # Step 1: Check input length before calling the API
        # Avoids paying for a call that will fail with a context length error
        estimated_tokens = len(ticket_text.split()) * 1.3
        if estimated_tokens > 3000:
            log.warning("ticket_too_long", estimated_tokens=estimated_tokens)
            return ClassificationResult(
                success=False,
                error="Ticket too long for classification",
                fallback_used=True,
            )

        messages = [
            {
                "role": "system",
                "content": """Classify the support ticket. Return JSON with these exact fields:
{
  "category": "billing|technical|account|general",
  "urgency": "low|medium|high|critical",
  "sentiment": "positive|neutral|negative|angry",
  "requires_human": true|false,
  "suggested_response": "optional brief response suggestion or null"
}""",
            },
            {
                "role": "user",
                "content": ticket_text,
            },
        ]

        # Step 2: Call the API (with retry on infrastructure failures)
        try:
            response = await _call_openai(messages, model)

        except RETRYABLE_OPENAI_ERRORS as e:
            # All retries exhausted
            record_llm_error(span, e, error_type="infrastructure_exhausted")
            log.error("classification_failed_infrastructure", exc_info=True)
            return ClassificationResult(
                success=False,
                error=f"Service temporarily unavailable: {type(e).__name__}",
                fallback_used=True,
            )

        except BadRequestError as e:
            # Input error — don't retry
            record_llm_error(span, e, error_type="bad_request")
            log.warning("classification_bad_request", error=str(e))
            return ClassificationResult(
                success=False,
                error="Invalid request",
                fallback_used=False,
            )

        # Step 3: Check finish reason before trusting the output
        choice = response.choices[0]
        usage = response.usage

        record_llm_response(
            span=span,
            model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            finish_reason=choice.finish_reason,
        )

        finish_error = check_finish_reason(choice.finish_reason, model)
        if finish_error:
            log.warning(
                "bad_finish_reason",
                finish_reason=choice.finish_reason,
                category=finish_error.category.value,
            )
            return ClassificationResult(
                success=False,
                error=finish_error.message,
                fallback_used=finish_error.should_fallback(),
            )

        # Step 4: Parse and validate the structured output
        classification, parse_error = parse_structured_output(
            content=choice.message.content,
            output_model=TicketClassification,
            operation="classify_ticket",
        )

        if parse_error:
            log.warning(
                "output_parse_failed",
                category=parse_error.category.value,
                message=parse_error.message,
            )
            # Output parsing failed. This is recoverable on retry, but
            # with_retry only covers exceptions raised by the API call,
            # so return a graceful failure here
            return ClassificationResult(
                success=False,
                error=parse_error.message,
                fallback_used=True,
            )

        log.info(
            "ticket_classified",
            category=classification.category,
            urgency=classification.urgency,
            requires_human=classification.requires_human,
        )

        span.set_attributes({
            "ticket.category": classification.category,
            "ticket.urgency": classification.urgency,
            "ticket.requires_human": classification.requires_human,
        })

        return ClassificationResult(
            success=True,
            classification=classification,
        )
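
The pipeline above returns a graceful failure as soon as parsing fails. If you would rather spend a small retry budget on malformed output, one option is a loop around the call and the parse together. The following is a sketch under stated assumptions: `call_until_valid_json` and `fake_model` are hypothetical names, and the fake model stands in for the real API call so the example runs without network access.

```python
import asyncio
import json
from typing import Awaitable, Callable, Optional


async def call_until_valid_json(
    call_model: Callable[[], Awaitable[str]],
    max_attempts: int = 2,
) -> Optional[dict]:
    """Call the model again when its output is not a JSON object.

    Returns the parsed dict, or None once the attempt budget is spent
    so the caller can serve a fallback.
    """
    for _ in range(max_attempts):
        content = await call_model()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            continue  # malformed output: spend another attempt
        if isinstance(data, dict):
            return data
    return None


async def demo() -> Optional[dict]:
    # Hypothetical stand-in for the LLM call: bad output first, then good.
    replies = iter(['{"category": ', '{"category": "billing"}'])

    async def fake_model() -> str:
        return next(replies)

    return await call_until_valid_json(fake_model)


print(asyncio.run(demo()))  # {'category': 'billing'}
```

Keep this budget small and separate from the infrastructure retry budget. Each extra attempt is a paid model call, and at `temperature=0.0` a repeated call may well produce the same malformed output again.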

Step 4: What to Alert On

Not every failure needs a human. Here's how to think about alerting thresholds:

Alert immediately:

  • content_filter finish reason — a prompt in your system may be triggering safety systems
  • infrastructure_exhausted errors exceeding 1% of requests — your retry budget is being consumed
  • Parse failures on structured output exceeding 5% — the model is producing malformed output consistently

Alert on trend, not on individual events:

  • Rising length finish reasons — prompts are growing and approaching token limits
  • Increasing retry counts — a dependency is degrading
  • Validation failure rate increasing — the model's output format may have drifted

Log but don't alert:

  • Single rate limit errors (retries handle these)
  • Individual JSON parse failures (occasional, expected)
  • Unknown finish reasons (rare edge cases)
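
To make "alert on a rate" concrete, here is a minimal sliding-window sketch. The `FailureRateMonitor` class, the window size, and the `min_samples` guard are all assumptions for illustration. In production this calculation would more likely live in your metrics backend than in application code.

```python
from collections import deque


class FailureRateMonitor:
    """Track the failure rate over the most recent `window` requests."""

    def __init__(self, window: int, threshold: float, min_samples: int = 20):
        self.outcomes = deque(maxlen=window)  # True = failed request
        self.threshold = threshold
        self.min_samples = min_samples

    def record(self, failed: bool) -> None:
        self.outcomes.append(failed)

    def rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

    def should_alert(self) -> bool:
        # Require a minimum sample size so one early failure cannot page anyone.
        return len(self.outcomes) >= self.min_samples and self.rate() > self.threshold


parse_failures = FailureRateMonitor(window=100, threshold=0.05)
for i in range(100):
    parse_failures.record(failed=(i % 10 == 0))  # 10 failures in 100 requests
print(parse_failures.rate(), parse_failures.should_alert())  # 0.1 True
```

A single parse failure never crosses the 5% threshold on its own, which matches the "log but don't alert" guidance for individual events.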

monitoring.py

from opentelemetry import metrics

from errors import PipelineError

meter = metrics.get_meter("pipeline-metrics")

# Counters for tracking failure rates
pipeline_errors = meter.create_counter(
    "pipeline.errors",
    description="Count of pipeline errors by category and type",
)

pipeline_retries = meter.create_counter(
    "pipeline.retries",
    description="Count of retry attempts",
)

pipeline_fallbacks = meter.create_counter(
    "pipeline.fallbacks",
    description="Count of fallback responses served",
)


def record_pipeline_error(
    error: PipelineError,
    feature: str,
    model: str,
) -> None:
    pipeline_errors.add(1, {
        "error.category": error.category.value,
        "error.retry_strategy": error.retry_strategy.value,
        "feature": feature,
        "model": model,
    })

    if error.should_fallback():
        pipeline_fallbacks.add(1, {"feature": feature, "model": model})

The Decision Tree

When an AI pipeline fails, this is how to decide what to do:

Exception raised?
├── Yes → Is it a known infrastructure error? (timeout, rate limit, connection)
│         ├── Yes → Retry with exponential backoff
│         └── No → Is it an input error? (too long, bad request)
│                  ├── Yes → Return error immediately, don't retry
│                  └── No → Log, alert, return graceful failure
│
└── No → Check finish_reason
          ├── "stop" → Parse and validate output
          │            ├── Valid → Return result
          │            └── Invalid → Retry if budget remains, else fallback
          ├── "length" → Response truncated → Fallback
          ├── "content_filter" → Alert, return graceful failure
          └── Other → Log, continue
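
The tree can also be written as a single function, which makes it easy to unit test. This is a sketch: the `decide` function, its parameters, and the action names it returns are hypothetical, and exceptions are identified by class name only to avoid importing an SDK.

```python
from typing import Optional

INFRASTRUCTURE = {"APITimeoutError", "APIConnectionError", "RateLimitError"}
INPUT = {"BadRequestError"}


def decide(
    error_name: Optional[str] = None,
    finish_reason: Optional[str] = None,
    output_valid: bool = True,
    retries_left: int = 0,
) -> str:
    """Walk the decision tree and return the name of the action to take."""
    if error_name is not None:  # an exception was raised
        if error_name in INFRASTRUCTURE:
            return "retry_with_backoff"
        if error_name in INPUT:
            return "return_error"
        return "alert_and_fail_gracefully"
    if finish_reason == "length":
        return "fallback"
    if finish_reason == "content_filter":
        return "alert_and_fail_gracefully"
    # "stop" and any other finish reason (logged elsewhere) fall through
    # to output validation.
    if output_valid:
        return "return_result"
    return "retry" if retries_left > 0 else "fallback"
```

For example, `decide(finish_reason="stop", output_valid=False, retries_left=1)` returns `"retry"`, while the same call with no retries left returns `"fallback"`.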

Summary

AI pipeline error handling requires explicit categorization before writing any catch blocks. The four categories — infrastructure, input, output, and logic — each have different retry strategies and alerting thresholds.

The patterns that matter most in production:

  • Retry infrastructure failures with exponential backoff, but only infrastructure failures
  • Check finish reason before trusting model output — a 200 response doesn't mean usable output
  • Validate structured output against a schema before using it downstream
  • Return graceful failures rather than propagating exceptions to users
  • Alert on rates, not individuals — single failures are noise, trends are signal

An AI pipeline that handles errors well degrades gracefully. Users get a fallback instead of a 500. Engineers get structured telemetry instead of a stack trace. And the system stays observable when something goes wrong at 2am.


Find me on GitHub or LinkedIn.