Skip to main content

Documentation Index

Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt

Use this file to discover all available pages before exploring further.

December 2025 Update: Battle-tested patterns for handling LLM failures, implementing retries, and building resilient AI applications.

LLM Failure Modes

Think of an LLM API like a restaurant kitchen during a rush. Sometimes orders get backed up (rate limiting), sometimes the chef takes too long (timeouts), sometimes the dish comes out wrong (invalid responses), and occasionally the entire kitchen shuts down (service outages). Just as a great restaurant has contingency plans for each scenario, your application needs a strategy for every failure mode. Understanding how LLMs fail is crucial for building resilient systems:
Failure Type              Cause                    Handling Strategy
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Rate Limiting            Too many requests         Exponential backoff
Timeout                  Slow response             Timeout + retry
Invalid Response         Malformed output          Retry + validation
Content Filter           Policy violation          Modify prompt
Context Length           Input too long            Truncation
Service Outage           Provider down             Failover

Comprehensive Error Handling

Custom Exception Hierarchy

from enum import Enum
from typing import Optional, Any

class LLMErrorType(str, Enum):
    """Categories of LLM failure used to tag errors.

    Mixing in ``str`` makes every member a string equal to its value, so
    members can be compared with, or serialized as, plain strings.
    """
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    INVALID_RESPONSE = "invalid_response"
    CONTENT_FILTERED = "content_filtered"
    CONTEXT_LENGTH = "context_length"
    AUTHENTICATION = "authentication"
    PROVIDER_ERROR = "provider_error"
    # Catch-all used when no more specific category applies
    UNKNOWN = "unknown"

class LLMError(Exception):
    """Base exception for LLM operations.
    
    Why a custom hierarchy instead of catching generic exceptions?
    Because each LLM failure type demands a different response strategy.
    A rate limit error means "slow down and retry," while a content
    filter error means "change your prompt, retrying won't help."
    Generic try/except loses that critical distinction.

    Every constructor argument is stored on the instance under the same
    name (``error_type``, ``provider``, ``model``, ``retryable``,
    ``retry_after``, ``original_error``); ``message`` becomes ``str(self)``.
    """
    
    def __init__(
        self,
        message: str,
        error_type: LLMErrorType = LLMErrorType.UNKNOWN,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        # retryable flag lets calling code decide strategy without
        # inspecting the error type -- keeps retry logic generic
        retryable: bool = False,
        # retry_after comes from provider headers (e.g., OpenAI's
        # Retry-After) -- always prefer this over your own backoff
        retry_after: Optional[float] = None,
        original_error: Optional[Exception] = None
    ):
        super().__init__(message)
        self.error_type = error_type
        self.provider = provider
        self.model = model
        self.retryable = retryable
        self.retry_after = retry_after
        self.original_error = original_error
    
    def to_dict(self) -> dict:
        """Return the error as a plain dict.

        ``error_type`` is emitted as its string value. ``original_error``
        is not included in the result.
        """
        return {
            "message": str(self),
            "error_type": self.error_type.value,
            "provider": self.provider,
            "model": self.model,
            "retryable": self.retryable,
            "retry_after": self.retry_after
        }

class RateLimitError(LLMError):
    """Rate-limit failure: always tagged RATE_LIMIT and marked retryable.

    ``retry_after`` defaults to 60 when the caller does not supply one.
    Remaining keyword arguments (provider, model, original_error) are
    forwarded to ``LLMError``.
    """

    def __init__(self, message: str, retry_after: float = 60, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.RATE_LIMIT,
            retryable=True,
            retry_after=retry_after,
            **kwargs
        )

# NOTE(review): this name shadows the builtin ``TimeoutError`` for the rest
# of the module. Code that needs the builtin (or asyncio's timeout) must
# reference it explicitly, e.g. ``asyncio.TimeoutError``.
class TimeoutError(LLMError):
    """Timeout failure: always tagged TIMEOUT and marked retryable.

    Remaining keyword arguments are forwarded to ``LLMError``.
    """

    def __init__(self, message: str, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.TIMEOUT,
            retryable=True,
            **kwargs
        )

class ContentFilterError(LLMError):
    """Content-filter failure: tagged CONTENT_FILTERED and never retryable.

    Remaining keyword arguments are forwarded to ``LLMError``.
    """

    def __init__(self, message: str, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.CONTENT_FILTERED,
            retryable=False,
            **kwargs
        )

class ContextLengthError(LLMError):
    """Context-length failure: tagged CONTEXT_LENGTH and never retryable.

    Remaining keyword arguments are forwarded to ``LLMError``.
    """

    def __init__(self, message: str, max_tokens: Optional[int] = None, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.CONTEXT_LENGTH,
            retryable=False,
            **kwargs
        )
        # presumably the model's token limit that was exceeded -- confirm
        # with callers; it is stored here but not included in to_dict()
        self.max_tokens = max_tokens

Retry Strategies

Exponential Backoff with Jitter

import asyncio
import random
from typing import Callable, TypeVar, Optional
from functools import wraps
import logging

# Result type of the callable being wrapped by the retry helpers below
T = TypeVar('T')

class RetryConfig:
    """Exponential-backoff policy shared by the retry helpers.

    Args:
        max_attempts: Total number of calls to make, including the first.
        base_delay: Delay in seconds before the first retry.
        max_delay: Hard upper bound, in seconds, on any computed delay.
        exponential_base: Growth factor applied per attempt.
        jitter: Randomize each delay so clients do not retry in lockstep.
    """

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Return the delay in seconds to wait after a failed attempt.

        Args:
            attempt: Zero-based index of the attempt that just failed.

        Returns:
            ``base_delay * exponential_base ** attempt``, optionally
            jittered, and never greater than ``max_delay``.
        """
        try:
            delay = self.base_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # Very large attempt numbers overflow the power; the result
            # would be capped anyway, so treat it as "at the cap".
            delay = self.max_delay
        delay = min(delay, self.max_delay)

        if self.jitter:
            # Spread the delay over 0.5x-1.5x of the computed value.
            jittered = delay * (0.5 + random.random())
            if jittered > self.max_delay:
                # Honour the cap without collapsing every capped client
                # onto exactly max_delay: re-draw inside the top half.
                jittered = random.uniform(self.max_delay / 2, self.max_delay)
            delay = jittered

        return delay

def retry_async(
    func: Callable[..., T],
    config: Optional[RetryConfig] = None,
    retryable_errors: tuple = (RateLimitError, TimeoutError),
    on_retry: Optional[Callable[[Exception, int], None]] = None
) -> Callable[..., T]:
    """Wrap an async callable so it is retried with exponential backoff.

    This is a plain (non-async) function so that it can be used directly
    as ``@retry_async`` or ``wrapped = retry_async(fn)``. It was previously
    declared ``async def``, which returned a coroutine instead of the
    wrapper and made decorator use impossible.

    Args:
        func: Coroutine function to call.
        config: Backoff policy; a default ``RetryConfig()`` when omitted.
        retryable_errors: Exception types that trigger a retry. Any other
            exception propagates immediately.
        on_retry: Optional hook called as ``on_retry(error, attempt_number)``
            (1-based) before each sleep.

    Returns:
        An async wrapper with the same call signature as ``func``.

    Raises:
        ValueError: At call time, if ``config.max_attempts`` is below 1.
        Exception: The last retryable error once attempts are exhausted.
    """

    config = config or RetryConfig()

    @wraps(func)
    async def wrapper(*args, **kwargs) -> T:
        attempts = config.max_attempts
        if attempts < 1:
            # Without this the loop body never runs and there is no
            # exception to re-raise.
            raise ValueError("RetryConfig.max_attempts must be at least 1")

        for attempt in range(attempts):
            try:
                return await func(*args, **kwargs)
            except retryable_errors as e:
                if attempt == attempts - 1:
                    # Out of attempts: surface the original error with
                    # its traceback intact.
                    raise

                # A provider-supplied retry_after beats our own backoff.
                delay = getattr(e, 'retry_after', None)
                if delay is None:
                    delay = config.get_delay(attempt)

                if on_retry:
                    on_retry(e, attempt + 1)

                logging.warning(
                    f"Attempt {attempt + 1} failed: {e}. "
                    f"Retrying in {delay:.2f}s..."
                )
                await asyncio.sleep(delay)

    return wrapper

# Usage with decorator pattern
def with_retry(
    config: Optional[RetryConfig] = None,
    retryable_errors: tuple = (RateLimitError, TimeoutError)
):
    """Build a decorator that retries an async function with backoff.

    Args:
        config: Backoff policy; a default ``RetryConfig()`` when omitted.
        retryable_errors: Exception types that trigger a retry. Any other
            exception propagates immediately.

    Returns:
        A decorator for coroutine functions.

    Raises:
        ValueError: When the decorated function is called and
            ``config.max_attempts`` is below 1.
        Exception: The last retryable error once attempts are exhausted.
    """
    config = config or RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            attempts = config.max_attempts
            if attempts < 1:
                # Without this the loop body never runs and there is no
                # exception to re-raise.
                raise ValueError("RetryConfig.max_attempts must be at least 1")

            for attempt in range(attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_errors as e:
                    if attempt == attempts - 1:
                        # Out of attempts: surface the original error.
                        raise

                    # Honour a provider-supplied retry_after when the
                    # error carries one; otherwise use our own backoff.
                    delay = getattr(e, 'retry_after', None)
                    if delay is None:
                        delay = config.get_delay(attempt)

                    logging.warning(
                        f"Attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay:.2f}s..."
                    )
                    await asyncio.sleep(delay)

        return wrapper
    return decorator

# Example usage
@with_retry(RetryConfig(max_attempts=3, base_delay=2.0))
async def call_llm(prompt: str) -> str:
    """Placeholder LLM call, retried up to 3 attempts with a 2s base delay."""
    # Your LLM call here
    pass

Circuit Breaker Pattern

Think of a circuit breaker like the breaker panel in your house. When too much current flows through a circuit, the breaker trips to prevent a fire. Similarly, when an LLM provider starts failing repeatedly, you want to “trip” the breaker — stop sending requests that will inevitably fail, give the provider time to recover, and then cautiously test whether it is back. Without this, a single degraded provider can drag down your entire application as requests pile up waiting for timeouts. Prevent cascading failures with circuit breakers:
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Optional

class CircuitState(str, Enum):
    """The three states a circuit breaker can be in."""
    # CLOSED = normal traffic flows through (confusing name -- think
    # of a closed electrical circuit where current flows freely)
    CLOSED = "closed"
    # OPEN = broken circuit, requests are rejected immediately
    OPEN = "open"
    # HALF_OPEN = cautiously letting a few requests through to test recovery
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreakerConfig:
    """Thresholds and timing that control a circuit breaker's transitions."""
    failure_threshold: int = 5      # Failures before tripping open
    success_threshold: int = 3      # Successes in half-open before closing
    timeout: float = 60.0           # Seconds to wait in OPEN before testing
    half_open_max_calls: int = 3    # Max test requests in half-open state

class CircuitBreaker:
    """Circuit breaker for LLM providers.

    Use an instance as a decorator on an async function, or drive it
    manually with ``can_execute`` / ``record_success`` / ``record_failure``.

    State machine:
        CLOSED    -> OPEN       after ``failure_threshold`` failures with
                                no success in between
        OPEN      -> HALF_OPEN  once ``timeout`` seconds have passed since
                                the last failure
        HALF_OPEN -> CLOSED     after ``success_threshold`` successes
        HALF_OPEN -> OPEN       on any failure

    ``last_failure_time`` is a ``time.monotonic()`` reading, so the open
    timeout is unaffected by wall-clock adjustments. It is not a Unix
    timestamp.
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: float = 0
        # Number of probe calls currently in flight while HALF_OPEN
        self.half_open_calls = 0

    def can_execute(self) -> bool:
        """Check if a request can proceed.

        Not a pure query: when the breaker is OPEN and the timeout has
        elapsed, this call moves it to HALF_OPEN.
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if time.monotonic() - self.last_failure_time > self.config.timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self.success_count = 0
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.config.half_open_max_calls

        return False

    def record_success(self):
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            # Any success while closed clears the failure streak
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call and trip the breaker if warranted."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        if self.state == CircuitState.HALF_OPEN:
            # A single failed probe means the provider is still unhealthy
            self.state = CircuitState.OPEN
            self.success_count = 0
        elif self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN

    def __call__(self, func: Callable) -> Callable:
        """Decorate an async function so every call goes through the breaker.

        Raises:
            LLMError: (PROVIDER_ERROR, retryable) when the breaker rejects
                the call without invoking ``func``.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.can_execute():
                raise LLMError(
                    f"Circuit breaker '{self.name}' is OPEN",
                    error_type=LLMErrorType.PROVIDER_ERROR,
                    retryable=True,
                    retry_after=self.config.timeout
                )

            probing = self.state == CircuitState.HALF_OPEN
            if probing:
                self.half_open_calls += 1

            try:
                result = await func(*args, **kwargs)
            except Exception:
                self.record_failure()
                raise
            else:
                self.record_success()
                return result
            finally:
                # Always release the probe slot. Without this, a cancelled
                # probe (e.g. an outer timeout) or a success_threshold
                # larger than half_open_max_calls used up every slot and
                # left the breaker stuck in HALF_OPEN forever.
                if probing:
                    self.half_open_calls = max(0, self.half_open_calls - 1)

        return wrapper

# Usage
# Separate breaker instances, so each provider's state is tracked on its own
openai_breaker = CircuitBreaker("openai")
anthropic_breaker = CircuitBreaker("anthropic")

@openai_breaker
async def call_openai(prompt: str) -> str:
    """Placeholder for the OpenAI request, guarded by ``openai_breaker``."""
    # OpenAI API call
    pass

@anthropic_breaker
async def call_anthropic(prompt: str) -> str:
    """Placeholder for the Anthropic request, guarded by ``anthropic_breaker``."""
    # Anthropic API call
    pass

Provider Failover

Imagine you are booking a flight. Your first choice is a direct flight, but if that is sold out you check a connection through another hub, and if that fails you look at a completely different airline. Provider failover works the same way — you rank your LLM providers by preference (cost, quality, latency) and automatically cascade through them when one is unavailable. The key insight: a slightly worse response from a backup provider is almost always better than returning an error to the user.
from typing import List, Callable, Any
from dataclasses import dataclass
import logging

@dataclass
class Provider:
    """One LLM backend that the failover logic can route a call to."""
    name: str
    call_fn: Callable           # Awaited as call_fn(*args, **kwargs)
    priority: int = 0           # Lower number = tried first
    circuit_breaker: Optional[CircuitBreaker] = None
    enabled: bool = True        # Toggle without removing from the list

class ProviderFailover:
    """Failover between multiple LLM providers.
    
    Practical tip: keep your provider list in config, not code. That way
    ops can re-prioritize or disable a provider during an incident without
    a deploy.
    """

    def __init__(self, providers: List[Provider]):
        # Sort by priority (lower = higher priority)
        self.providers = sorted(providers, key=lambda p: p.priority)

    async def call(self, *args, **kwargs) -> Any:
        """Try each enabled provider in priority order until one succeeds.

        Providers whose circuit breaker refuses the call are skipped. Every
        failure, retryable or not, moves on to the next provider; outcomes
        are reported to the provider's breaker when it has one.

        Args:
            *args, **kwargs: Forwarded unchanged to each ``call_fn``.

        Returns:
            The first successful provider result.

        Raises:
            LLMError: (PROVIDER_ERROR, not retryable) when no provider
                produced a result. The last underlying failure, if any, is
                available as ``original_error`` and as ``__cause__``.
        """
        last_error = None

        for provider in self.providers:
            if not provider.enabled:
                continue

            breaker = provider.circuit_breaker

            # Check circuit breaker
            if breaker and not breaker.can_execute():
                logging.warning(f"Provider {provider.name} circuit is open")
                continue

            try:
                logging.info(f"Trying provider: {provider.name}")
                result = await provider.call_fn(*args, **kwargs)

            except LLMError as e:
                last_error = e
                logging.warning(f"Provider {provider.name} failed: {e}")

            except Exception as e:
                last_error = e
                logging.error(f"Unexpected error from {provider.name}: {e}")

            else:
                if breaker:
                    breaker.record_success()
                return result

            # Only reached after a failure; fall through to the next provider.
            if breaker:
                breaker.record_failure()

        if last_error is None:
            # Nothing was attempted, so "Last error: None" would mislead.
            raise LLMError(
                "No providers available: all are disabled or have open circuits",
                error_type=LLMErrorType.PROVIDER_ERROR,
                retryable=False
            )

        raise LLMError(
            f"All providers failed. Last error: {last_error}",
            error_type=LLMErrorType.PROVIDER_ERROR,
            retryable=False,
            original_error=last_error
        ) from last_error

# Usage
# NOTE(review): call_openai / call_anthropic are already decorated with the
# same breakers passed as circuit_breaker= below. The decorator records each
# outcome and ProviderFailover.call records it again, so every success or
# failure is counted twice. Pass either the undecorated functions or no
# circuit_breaker here.
# NOTE(review): call_together is not defined in this snippet.
failover = ProviderFailover([
    Provider(
        name="openai",
        call_fn=call_openai,
        priority=0,
        circuit_breaker=openai_breaker
    ),
    Provider(
        name="anthropic",
        call_fn=call_anthropic,
        priority=1,
        circuit_breaker=anthropic_breaker
    ),
    Provider(
        name="together",
        call_fn=call_together,
        priority=2
    )
])

# `await` must run inside an async function (or an async-aware REPL)
result = await failover.call(prompt="What is AI?")

Response Validation

LLMs are probabilistic — even when you ask for JSON, you might get markdown, a preamble, or a partially valid structure. Response validation is the seatbelt you never skip. The pattern here is “validate, then re-prompt”: if the response does not match your schema, feed the validation error back to the LLM and ask it to fix its output. Most models self-correct within one or two retries.
from pydantic import BaseModel, ValidationError
from typing import Type, Optional
import json

class ResponseValidator:
    """Validate LLM responses against a Pydantic schema, with auto-retry.
    
    Practical tip: always set a max_retries cap. Without it, a pathological
    prompt can burn through your budget in a retry loop.

    Args:
        max_retries: Total number of LLM calls to make, including the first.
        on_validation_error: Optional hook called as
            ``on_validation_error(error, attempt_number)`` (1-based) each
            time a response fails JSON parsing or schema validation.
    """

    def __init__(
        self,
        max_retries: int = 3,
        on_validation_error: Optional[Callable] = None
    ):
        self.max_retries = max_retries
        self.on_validation_error = on_validation_error

    async def validate_json(
        self,
        llm_call: Callable,
        schema: Type["BaseModel"],
        prompt: str
    ) -> "BaseModel":
        """Call the LLM until its reply parses and validates against ``schema``.

        On failure the error is fed back to the model in a corrective
        prompt and the call is repeated, up to ``max_retries`` calls.

        Args:
            llm_call: Async callable taking a prompt and returning text.
            schema: Pydantic model class used to validate the parsed JSON.
            prompt: The original request.

        Returns:
            The validated ``schema`` instance.

        Raises:
            ValueError: If ``max_retries`` is below 1.
            LLMError: (INVALID_RESPONSE) when every attempt fails; the last
                parse/validation error is attached as ``original_error``.
        """
        if self.max_retries < 1:
            # Previously the loop was skipped and None was returned silently.
            raise ValueError("max_retries must be at least 1")

        validation_prompt = prompt

        for attempt in range(self.max_retries):
            response = await llm_call(validation_prompt)

            try:
                data = json.loads(self._extract_json(response))
                return schema.model_validate(data)

            except json.JSONDecodeError as e:
                error = e
                failure = f"Failed to get valid JSON after {self.max_retries} attempts"
                retry_prompt = f"""
Your previous response was not valid JSON.
Error: {e}

Original request: {prompt}

Please respond with ONLY valid JSON, no markdown or explanation.
"""

            except ValidationError as e:
                error = e
                failure = f"Schema validation failed after {self.max_retries} attempts"
                retry_prompt = f"""
Your previous JSON response didn't match the required schema.
Validation errors: {e.errors()}

Original request: {prompt}

Please fix the JSON to match the required schema.
"""

            if self.on_validation_error:
                self.on_validation_error(error, attempt + 1)

            if attempt == self.max_retries - 1:
                raise LLMError(
                    failure,
                    error_type=LLMErrorType.INVALID_RESPONSE,
                    original_error=error
                ) from error

            validation_prompt = retry_prompt

    def _extract_json(self, text: str) -> str:
        """Extract the first JSON object or array embedded in ``text``.

        Handles replies wrapped in a markdown code fence or surrounded by
        prose. Candidates are tried in order of appearance and parsed with
        the real JSON decoder, so a top-level array is not mistaken for its
        first element and brackets inside strings do not confuse matching.

        Returns:
            The JSON substring, or the fence-stripped text unchanged when
            no valid JSON value is found (so the caller's ``json.loads``
            reports the error).
        """
        text = text.strip()

        # Remove markdown code blocks
        if text.startswith("```"):
            lines = text.split("\n")
            lines = lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
            text = "\n".join(lines)

        decoder = json.JSONDecoder()
        for start, char in enumerate(text):
            if char not in "{[":
                continue
            try:
                _, end = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                # Not valid JSON from this position; try the next opener.
                continue
            return text[start:end]

        return text

Timeout Management

LLM calls are unpredictable in latency. A simple summarization might return in 2 seconds one day and 45 seconds the next. Without explicit timeouts, a single slow request can hold an async worker hostage, and if you have a fixed worker pool, a few slow calls can starve every other user. Think of timeouts as a contract with your users: “I promise an answer within N seconds, or I’ll tell you I couldn’t get one.”
import asyncio
from contextlib import asynccontextmanager

class TimeoutManager:
    """Manage timeouts for LLM calls.
    
    Practical tip: use different timeouts for streaming vs non-streaming.
    Streaming calls should get a longer total timeout because they
    produce tokens gradually, but you should also set a *first-token*
    timeout to catch connections that never start producing.

    Args:
        default_timeout: Seconds used when a call does not pass its own.
        streaming_timeout: Stored for callers to use with streaming
            requests; not applied automatically by the methods here.
    """

    def __init__(
        self,
        default_timeout: float = 30.0,
        streaming_timeout: float = 120.0
    ):
        self.default_timeout = default_timeout
        self.streaming_timeout = streaming_timeout

    @asynccontextmanager
    async def timeout(self, seconds: Optional[float] = None):
        """Async context manager that bounds the enclosed block's runtime.

        Usage: ``async with manager.timeout(5): await slow_call()``.
        Requires ``asyncio.timeout`` (Python 3.11+).

        Args:
            seconds: Time limit; ``default_timeout`` when ``None``.

        Yields:
            The active ``asyncio.Timeout`` object, already entered.

        Raises:
            TimeoutError: When the block exceeds the limit.
        """
        timeout_seconds = self.default_timeout if seconds is None else seconds

        try:
            # The timeout must be *entered* around the yield. Yielding the
            # bare object, as before, never armed the deadline.
            async with asyncio.timeout(timeout_seconds) as deadline:
                yield deadline
        except asyncio.TimeoutError as exc:
            raise TimeoutError(
                f"Operation timed out after {timeout_seconds}s"
            ) from exc

    async def with_timeout(
        self,
        coro,
        timeout: Optional[float] = None
    ):
        """Await ``coro`` with a time limit and return its result.

        Args:
            coro: Awaitable to run; it is cancelled if the limit is hit.
            timeout: Time limit in seconds; ``default_timeout`` when ``None``.

        Raises:
            TimeoutError: When ``coro`` does not finish in time.
        """
        # `is None` rather than `or`, so an explicit 0 is not replaced by
        # the default.
        timeout = self.default_timeout if timeout is None else timeout

        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(
                f"Operation timed out after {timeout}s"
            ) from exc

# Usage
# Shared manager; 30s applies whenever a call does not pass its own timeout
timeout_manager = TimeoutManager(default_timeout=30.0)

async def call_with_timeout(prompt: str) -> str:
    """Run ``call_llm(prompt)`` under a 45s limit, overriding the 30s default."""
    return await timeout_manager.with_timeout(
        call_llm(prompt),
        timeout=45.0
    )

Graceful Degradation

Graceful degradation is the difference between Netflix showing you a slightly stale recommendation versus a blank screen. When your primary LLM path fails, you want to cascade through options: try the cache for a recent answer, try a simpler prompt that is more likely to succeed, and as a last resort, return a pre-written fallback. The user should always get something useful. The tuple (response, source) pattern below makes it easy to track how often you are degrading, which is a key operational health metric.
from dataclasses import dataclass
from typing import Optional, Callable, Any

@dataclass
class DegradationConfig:
    """Options controlling which fallbacks are tried when the primary call fails."""
    fallback_response: Optional[str] = None       # Pre-written static response
    fallback_model: Optional[str] = None          # Cheaper/faster backup model
    cache_fallback: bool = True         # Try cache before giving up
    simplified_prompt: Optional[str] = None       # Stripped-down prompt that is easier to answer

class GracefulDegrader:
    """Handle degradation gracefully.
    
    Returns (response, source) so you can log degradation events
    and alert if the cache or fallback ratio spikes.

    Args:
        primary_fn: Async callable invoked as ``primary_fn(prompt, **kwargs)``.
        cache_client: Optional object with async ``get(key)`` and
            ``set(key, value)`` methods.
        config: Degradation options; defaults to ``DegradationConfig()``.
    """

    def __init__(
        self,
        primary_fn: Callable,
        cache_client = None,
        config: Optional["DegradationConfig"] = None
    ):
        self.primary_fn = primary_fn
        self.cache = cache_client
        self.config = config or DegradationConfig()

    async def call(
        self,
        prompt: str,
        cache_key: str = None,
        **kwargs
    ) -> tuple[str, str]:
        """Call the primary function, degrading step by step on LLMError.

        Fallback order: cached response, simplified prompt, static
        fallback response.

        Returns:
            (response, source) where source is 'primary', 'cache',
            'simplified', or 'fallback'.

        Raises:
            LLMError: The primary error, when no fallback produced a response.
        """
        try:
            result = await self.primary_fn(prompt, **kwargs)

        except LLMError as e:
            logging.warning(f"Primary call failed: {e}")

            cached = await self._read_cache(cache_key)
            if cached:
                logging.info("Using cached response")
                return cached, "cache"

            simplified = await self._try_simplified(prompt, kwargs)
            if simplified is not None:
                return simplified

            # Use fallback response
            if self.config.fallback_response:
                return self.config.fallback_response, "fallback"

            raise

        await self._write_cache(cache_key, result)
        return result, "primary"

    async def _write_cache(self, cache_key, result) -> None:
        """Best-effort cache write; never lets a cache outage fail the request."""
        if not (self.cache and cache_key):
            return
        try:
            await self.cache.set(cache_key, result)
        except Exception as exc:
            logging.warning(f"Cache write failed: {exc}")

    async def _read_cache(self, cache_key):
        """Best-effort cache read; returns None on miss, error, or when disabled."""
        if not (self.config.cache_fallback and self.cache and cache_key):
            return None
        try:
            return await self.cache.get(cache_key)
        except Exception as exc:
            # A broken cache must not block the remaining fallbacks.
            logging.warning(f"Cache read failed: {exc}")
            return None

    async def _try_simplified(self, prompt, kwargs):
        """Retry with the simplified prompt; returns (result, 'simplified') or None."""
        if not self.config.simplified_prompt:
            return None
        try:
            result = await self.primary_fn(
                self.config.simplified_prompt.format(
                    original_prompt=prompt
                ),
                **kwargs
            )
        except Exception as exc:
            # Still best-effort, but leave a trace instead of swallowing it.
            logging.warning(f"Simplified prompt failed: {exc}")
            return None
        return result, "simplified"

# Usage
# NOTE(review): redis_client is not defined in this snippet; any object with
# async get(key) / set(key, value) methods works as cache_client.
degrader = GracefulDegrader(
    primary_fn=call_llm,
    cache_client=redis_client,
    config=DegradationConfig(
        fallback_response="I'm having trouble processing your request. Please try again.",
        cache_fallback=True
    )
)

# `await` must run inside an async function (or an async-aware REPL)
response, source = await degrader.call(
    prompt="Explain quantum computing",
    cache_key="quantum_computing_explanation"
)
print(f"Response from: {source}")

Unified Error Handler

Your API consumers should never see raw OpenAI or Anthropic error messages. This handler translates internal LLM errors into clean, predictable HTTP responses with appropriate status codes. The mapping is deliberate: a rate limit from the provider becomes a 429 to the client so their retry logic kicks in, while a content filter violation becomes a 422 so they know to fix the input rather than retry.
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import logging

# Application instance the exception handler below is registered on
app = FastAPI()

@app.exception_handler(LLMError)
async def llm_error_handler(request: Request, exc: LLMError):
    """Translate internal LLM errors into standard HTTP responses.
    
    Practical tip: include the retryable flag and retry_after header
    in your error response. Well-behaved clients will use these to
    back off automatically, saving you from thundering-herd retries.

    Args:
        request: The request being handled; only its path is logged.
        exc: The LLM error to translate.

    Returns:
        A JSONResponse whose status code is derived from ``exc.error_type``
        (500 for unmapped types). A ``Retry-After`` header and a
        ``retry_after`` body field are added only when the error carries one.
    """
    import math

    # Log error details
    logging.error(
        f"LLM Error: {exc.error_type.value}",
        extra={
            "error_type": exc.error_type.value,
            "provider": exc.provider,
            "model": exc.model,
            "retryable": exc.retryable,
            "path": request.url.path
        }
    )

    # Map to HTTP status codes
    status_codes = {
        LLMErrorType.RATE_LIMIT: 429,
        LLMErrorType.TIMEOUT: 504,
        LLMErrorType.INVALID_RESPONSE: 502,
        LLMErrorType.CONTENT_FILTERED: 422,
        LLMErrorType.CONTEXT_LENGTH: 413,
        LLMErrorType.AUTHENTICATION: 401,
        LLMErrorType.PROVIDER_ERROR: 503,
        LLMErrorType.UNKNOWN: 500
    }

    status_code = status_codes.get(exc.error_type, 500)

    # Build response
    response = {
        "error": {
            "type": exc.error_type.value,
            "message": str(exc),
            "retryable": exc.retryable
        }
    }

    # Header values must be strings. Passing None (as before, whenever the
    # error had no retry_after) makes the response fail to build, so the
    # header is added only when there is a value.
    headers = {}
    if exc.retry_after is not None:
        response["error"]["retry_after"] = exc.retry_after
        # Retry-After takes whole seconds; round up so clients never
        # come back earlier than the provider asked.
        headers["Retry-After"] = str(max(0, math.ceil(exc.retry_after)))

    return JSONResponse(
        status_code=status_code,
        content=response,
        headers=headers or None
    )

Key Takeaways

Expect Failures

Design for failure from the start with proper exception handling

Implement Retries

Use exponential backoff with jitter for transient failures

Circuit Breakers

Prevent cascading failures with circuit breakers

Graceful Degradation

Always have fallback options for critical paths

What’s Next

Batch Processing

Learn to process large volumes of LLM requests efficiently