December 2025 Update: Battle-tested patterns for handling LLM failures, implementing retries, and building resilient AI applications.

LLM Failure Modes

Understanding how LLMs fail is crucial for building resilient systems:
Failure Type         Cause                  Handling Strategy
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Rate Limiting        Too many requests      Exponential backoff
Timeout              Slow response          Timeout + retry
Invalid Response     Malformed output       Retry + validation
Content Filter       Policy violation       Modify prompt
Context Length       Input too long         Truncation
Service Outage       Provider down          Failover

Comprehensive Error Handling

Custom Exception Hierarchy

from enum import Enum
from typing import Optional, Any

class LLMErrorType(str, Enum):
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    INVALID_RESPONSE = "invalid_response"
    CONTENT_FILTERED = "content_filtered"
    CONTEXT_LENGTH = "context_length"
    AUTHENTICATION = "authentication"
    PROVIDER_ERROR = "provider_error"
    UNKNOWN = "unknown"

class LLMError(Exception):
    """Base exception for LLM operations"""
    
    def __init__(
        self,
        message: str,
        error_type: LLMErrorType = LLMErrorType.UNKNOWN,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        retryable: bool = False,
        retry_after: Optional[float] = None,
        original_error: Optional[Exception] = None
    ):
        super().__init__(message)
        self.error_type = error_type
        self.provider = provider
        self.model = model
        self.retryable = retryable
        self.retry_after = retry_after
        self.original_error = original_error
    
    def to_dict(self) -> dict:
        return {
            "message": str(self),
            "error_type": self.error_type.value,
            "provider": self.provider,
            "model": self.model,
            "retryable": self.retryable,
            "retry_after": self.retry_after
        }

class RateLimitError(LLMError):
    def __init__(self, message: str, retry_after: float = 60, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.RATE_LIMIT,
            retryable=True,
            retry_after=retry_after,
            **kwargs
        )

class TimeoutError(LLMError):  # intentionally shadows the builtin TimeoutError for LLM calls
    def __init__(self, message: str, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.TIMEOUT,
            retryable=True,
            **kwargs
        )

class ContentFilterError(LLMError):
    def __init__(self, message: str, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.CONTENT_FILTERED,
            retryable=False,
            **kwargs
        )

class ContextLengthError(LLMError):
    def __init__(self, message: str, max_tokens: int = None, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.CONTEXT_LENGTH,
            retryable=False,
            **kwargs
        )
        self.max_tokens = max_tokens
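
In practice, each provider client catches whatever its SDK raises and translates it into this hierarchy at the boundary, so the rest of the application only ever deals with LLMError subclasses. A minimal sketch of such a mapping, assuming a generic HTTP-style exception carrying a status_code attribute (the helper name and status mapping are illustrative, not tied to any particular SDK):

def classify_provider_error(exc: Exception, provider: str, model: str) -> LLMError:
    """Translate a raw provider/HTTP error into the LLMError hierarchy (illustrative)."""
    status = getattr(exc, "status_code", None)
    
    if status == 429:
        # Providers often send a Retry-After header; RateLimitError defaults to 60s
        return RateLimitError("Rate limited", provider=provider, model=model, original_error=exc)
    if status in (408, 504):
        return TimeoutError("Upstream timeout", provider=provider, model=model, original_error=exc)
    if status == 400 and "context" in str(exc).lower():
        return ContextLengthError("Prompt exceeds context window", provider=provider, model=model, original_error=exc)
    
    return LLMError(
        str(exc),
        error_type=LLMErrorType.PROVIDER_ERROR,
        provider=provider,
        model=model,
        retryable=(status or 500) >= 500,
        original_error=exc
    )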

Retry Strategies

Exponential Backoff with Jitter

import asyncio
import random
from typing import Callable, TypeVar, Optional
from functools import wraps
import logging

T = TypeVar('T')

class RetryConfig:
    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
    
    def get_delay(self, attempt: int) -> float:
        """Calculate delay for attempt number"""
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)
        
        if self.jitter:
            # Add random jitter (0.5 to 1.5 of calculated delay)
            delay = delay * (0.5 + random.random())
        
        return delay

def retry_async(
    func: Callable[..., T],
    config: Optional[RetryConfig] = None,
    retryable_errors: tuple = (RateLimitError, TimeoutError),
    on_retry: Optional[Callable[[Exception, int], None]] = None
) -> Callable[..., T]:
    """Wrap an async callable with retries and exponential backoff"""
    
    config = config or RetryConfig()
    
    @wraps(func)
    async def wrapper(*args, **kwargs) -> T:
        last_exception = None
        
        for attempt in range(config.max_attempts):
            try:
                return await func(*args, **kwargs)
            
            except retryable_errors as e:
                last_exception = e
                
                # Check if error specifies retry_after
                delay = getattr(e, 'retry_after', None)
                if delay is None:
                    delay = config.get_delay(attempt)
                
                if attempt < config.max_attempts - 1:
                    if on_retry:
                        on_retry(e, attempt + 1)
                    
                    logging.warning(
                        f"Attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay:.2f}s..."
                    )
                    await asyncio.sleep(delay)
            
            except Exception as e:
                # Non-retryable error
                raise
        
        raise last_exception
    
    return wrapper

# Usage with decorator pattern
def with_retry(
    config: Optional[RetryConfig] = None,
    retryable_errors: tuple = (RateLimitError, TimeoutError)
):
    """Retry decorator factory"""
    config = config or RetryConfig()
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(config.max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_errors as e:
                    last_exception = e
                    
                    if attempt < config.max_attempts - 1:
                        delay = config.get_delay(attempt)
                        await asyncio.sleep(delay)
                except Exception:
                    raise
            
            raise last_exception
        
        return wrapper
    return decorator

# Example usage
@with_retry(RetryConfig(max_attempts=3, base_delay=2.0))
async def call_llm(prompt: str) -> str:
    # Your LLM call here
    pass
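
Because retry_async wraps an existing coroutine function rather than decorating it at definition time, it also works for callables you don't own. A sketch of that form with an on_retry hook for observability (fetch_completion is a placeholder for a real provider call):

async def fetch_completion(prompt: str) -> str:
    # Placeholder: a provider call that may raise RateLimitError or TimeoutError
    ...

def log_retry(error: Exception, attempt: int) -> None:
    logging.warning(f"Retry #{attempt} scheduled after error: {error}")

retrying_fetch = retry_async(
    fetch_completion,
    config=RetryConfig(max_attempts=5, base_delay=0.5, max_delay=10.0),
    on_retry=log_retry
)

# result = await retrying_fetch("Summarize this document ...")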

Circuit Breaker Pattern

Prevent cascading failures with circuit breakers:
import time
from enum import Enum
from dataclasses import dataclass
from functools import wraps
from typing import Callable, Optional

class CircuitState(str, Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout: float = 60.0  # Time in OPEN state before testing
    half_open_max_calls: int = 3

class CircuitBreaker:
    """Circuit breaker for LLM providers"""
    
    def __init__(self, name: str, config: CircuitBreakerConfig = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: float = 0
        self.half_open_calls = 0
    
    def can_execute(self) -> bool:
        """Check if request can proceed"""
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if time.time() - self.last_failure_time > self.config.timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False
        
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.config.half_open_max_calls
        
        return False
    
    def record_success(self):
        """Record a successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0
    
    def record_failure(self):
        """Record a failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
        elif self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN
    
    def __call__(self, func: Callable) -> Callable:
        """Decorator for circuit breaker"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.can_execute():
                raise LLMError(
                    f"Circuit breaker '{self.name}' is OPEN",
                    error_type=LLMErrorType.PROVIDER_ERROR,
                    retryable=True,
                    retry_after=self.config.timeout
                )
            
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1
            
            try:
                result = await func(*args, **kwargs)
                self.record_success()
                return result
            except Exception as e:
                self.record_failure()
                raise
        
        return wrapper

# Usage
openai_breaker = CircuitBreaker("openai")
anthropic_breaker = CircuitBreaker("anthropic")

@openai_breaker
async def call_openai(prompt: str) -> str:
    # OpenAI API call
    pass

@anthropic_breaker
async def call_anthropic(prompt: str) -> str:
    # Anthropic API call
    pass
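
The state transitions are easiest to see when driven by hand. A small sketch that trips a breaker (thresholds chosen only for the demo):

demo = CircuitBreaker("demo", CircuitBreakerConfig(failure_threshold=2, timeout=5.0))

demo.record_failure()
demo.record_failure()                 # second failure trips the breaker
assert demo.state == CircuitState.OPEN
assert demo.can_execute() is False    # requests are rejected while OPEN

# Once `timeout` seconds have passed, can_execute() moves the breaker to
# HALF_OPEN and admits a few probe calls; success_threshold successes close
# it again, while any failure sends it straight back to OPEN.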

Provider Failover

from typing import List, Callable, Any, Optional
from dataclasses import dataclass
import logging

@dataclass
class Provider:
    name: str
    call_fn: Callable
    priority: int = 0
    circuit_breaker: Optional[CircuitBreaker] = None
    enabled: bool = True

class ProviderFailover:
    """Failover between multiple LLM providers"""
    
    def __init__(self, providers: List[Provider]):
        # Sort by priority (lower = higher priority)
        self.providers = sorted(providers, key=lambda p: p.priority)
    
    async def call(self, *args, **kwargs) -> Any:
        """Call providers with failover"""
        last_error = None
        
        for provider in self.providers:
            if not provider.enabled:
                continue
            
            # Check circuit breaker
            if provider.circuit_breaker:
                if not provider.circuit_breaker.can_execute():
                    logging.warning(f"Provider {provider.name} circuit is open")
                    continue
            
            try:
                logging.info(f"Trying provider: {provider.name}")
                result = await provider.call_fn(*args, **kwargs)
                
                if provider.circuit_breaker:
                    provider.circuit_breaker.record_success()
                
                return result
            
            except LLMError as e:
                last_error = e
                logging.warning(f"Provider {provider.name} failed: {e}")
                
                if provider.circuit_breaker:
                    provider.circuit_breaker.record_failure()
                
                # Move on to the next provider either way; a retryable
                # error could also be retried on this provider before failing over
                continue
            
            except Exception as e:
                last_error = e
                logging.error(f"Unexpected error from {provider.name}: {e}")
                
                if provider.circuit_breaker:
                    provider.circuit_breaker.record_failure()
        
        raise LLMError(
            f"All providers failed. Last error: {last_error}",
            error_type=LLMErrorType.PROVIDER_ERROR,
            retryable=False,
            original_error=last_error
        )

# Usage
failover = ProviderFailover([
    Provider(
        name="openai",
        call_fn=call_openai,
        priority=0,
        circuit_breaker=openai_breaker
    ),
    Provider(
        name="anthropic",
        call_fn=call_anthropic,
        priority=1,
        circuit_breaker=anthropic_breaker
    ),
    Provider(
        name="together",
        call_fn=call_together,
        priority=2
    )
])

result = await failover.call(prompt="What is AI?")

Response Validation

from pydantic import BaseModel, ValidationError
from typing import Type, Optional, Callable
import json

class ResponseValidator:
    """Validate LLM responses"""
    
    def __init__(
        self,
        max_retries: int = 3,
        on_validation_error: Optional[Callable] = None
    ):
        self.max_retries = max_retries
        self.on_validation_error = on_validation_error
    
    async def validate_json(
        self,
        llm_call: Callable,
        schema: Type[BaseModel],
        prompt: str
    ) -> BaseModel:
        """Validate JSON response against Pydantic schema"""
        
        validation_prompt = prompt
        
        for attempt in range(self.max_retries):
            response = await llm_call(validation_prompt)
            
            try:
                # Try to parse JSON
                json_str = self._extract_json(response)
                data = json.loads(json_str)
                
                # Validate against schema
                return schema.model_validate(data)
            
            except json.JSONDecodeError as e:
                if attempt < self.max_retries - 1:
                    validation_prompt = f"""
Your previous response was not valid JSON.
Error: {e}

Original request: {prompt}

Please respond with ONLY valid JSON, no markdown or explanation.
"""
                else:
                    raise LLMError(
                        f"Failed to get valid JSON after {self.max_retries} attempts",
                        error_type=LLMErrorType.INVALID_RESPONSE
                    )
            
            except ValidationError as e:
                if attempt < self.max_retries - 1:
                    validation_prompt = f"""
Your previous JSON response didn't match the required schema.
Validation errors: {e.errors()}

Original request: {prompt}

Please fix the JSON to match the required schema.
"""
                else:
                    raise LLMError(
                        f"Schema validation failed after {self.max_retries} attempts",
                        error_type=LLMErrorType.INVALID_RESPONSE
                    )
    
    def _extract_json(self, text: str) -> str:
        """Extract JSON from text that might include markdown"""
        text = text.strip()
        
        # Remove markdown code blocks
        if text.startswith("```"):
            lines = text.split("\n")
            lines = lines[1:-1] if lines[-1] == "```" else lines[1:]
            text = "\n".join(lines)
        
        # Find JSON object or array
        start_chars = ['{', '[']
        for char in start_chars:
            if char in text:
                start = text.index(char)
                end_char = '}' if char == '{' else ']'
                
                # Find matching end
                depth = 0
                for i, c in enumerate(text[start:], start):
                    if c == char:
                        depth += 1
                    elif c == end_char:
                        depth -= 1
                        if depth == 0:
                            return text[start:i+1]
        
        return text
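
A usage sketch, assuming the async call_llm coroutine from earlier and a small Pydantic model describing the expected output:

class Sentiment(BaseModel):
    label: str          # e.g. "positive", "negative", "neutral"
    confidence: float

validator = ResponseValidator(max_retries=3)

# result = await validator.validate_json(
#     llm_call=call_llm,
#     schema=Sentiment,
#     prompt="Classify the sentiment of 'Great product!' and respond as JSON "
#            "with keys 'label' and 'confidence'."
# )
# print(result.label, result.confidence)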

Timeout Management

import asyncio
from contextlib import asynccontextmanager

class TimeoutManager:
    """Manage timeouts for LLM calls"""
    
    def __init__(
        self,
        default_timeout: float = 30.0,
        streaming_timeout: float = 120.0
    ):
        self.default_timeout = default_timeout
        self.streaming_timeout = streaming_timeout
    
    @asynccontextmanager
    async def timeout(self, seconds: float = None):
        """Context manager for timeouts (uses asyncio.timeout, Python 3.11+)"""
        timeout_seconds = seconds or self.default_timeout
        
        try:
            async with asyncio.timeout(timeout_seconds):
                yield
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Operation timed out after {timeout_seconds}s"
            )
    
    async def with_timeout(
        self,
        coro,
        timeout: float = None
    ):
        """Execute coroutine with timeout"""
        timeout = timeout or self.default_timeout
        
        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Operation timed out after {timeout}s"
            )

# Usage
timeout_manager = TimeoutManager(default_timeout=30.0)

async def call_with_timeout(prompt: str) -> str:
    return await timeout_manager.with_timeout(
        call_llm(prompt),
        timeout=45.0
    )
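
The context-manager form is useful when several awaits should share one deadline, for example when consuming a streaming response. A sketch, assuming Python 3.11+ and a hypothetical stream_llm async generator:

async def consume_stream(prompt: str) -> str:
    chunks = []
    async with timeout_manager.timeout(timeout_manager.streaming_timeout):
        async for chunk in stream_llm(prompt):  # hypothetical streaming call
            chunks.append(chunk)
    return "".join(chunks)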

Graceful Degradation

from dataclasses import dataclass
from typing import Optional, Callable, Any
import logging

@dataclass
class DegradationConfig:
    fallback_response: Optional[str] = None
    fallback_model: Optional[str] = None
    cache_fallback: bool = True
    simplified_prompt: Optional[str] = None

class GracefulDegrader:
    """Handle degradation gracefully"""
    
    def __init__(
        self,
        primary_fn: Callable,
        cache_client = None,
        config: DegradationConfig = None
    ):
        self.primary_fn = primary_fn
        self.cache = cache_client
        self.config = config or DegradationConfig()
    
    async def call(
        self,
        prompt: str,
        cache_key: str = None,
        **kwargs
    ) -> tuple[str, str]:
        """
        Call with graceful degradation
        Returns: (response, source) where source is 'primary', 'cache', or 'fallback'
        """
        
        try:
            result = await self.primary_fn(prompt, **kwargs)
            
            # Cache successful response
            if self.cache and cache_key:
                await self.cache.set(cache_key, result)
            
            return result, "primary"
        
        except LLMError as e:
            logging.warning(f"Primary call failed: {e}")
            
            # Try cache fallback
            if self.config.cache_fallback and self.cache and cache_key:
                cached = await self.cache.get(cache_key)
                if cached:
                    logging.info("Using cached response")
                    return cached, "cache"
            
            # Try simplified prompt
            if self.config.simplified_prompt:
                try:
                    result = await self.primary_fn(
                        self.config.simplified_prompt.format(
                            original_prompt=prompt
                        ),
                        **kwargs
                    )
                    return result, "simplified"
                except Exception:
                    pass
            
            # Use fallback response
            if self.config.fallback_response:
                return self.config.fallback_response, "fallback"
            
            raise

# Usage
degrader = GracefulDegrader(
    primary_fn=call_llm,
    cache_client=redis_client,
    config=DegradationConfig(
        fallback_response="I'm having trouble processing your request. Please try again.",
        cache_fallback=True
    )
)

response, source = await degrader.call(
    prompt="Explain quantum computing",
    cache_key="quantum_computing_explanation"
)
print(f"Response from: {source}")

Unified Error Handler

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import logging

app = FastAPI()

@app.exception_handler(LLMError)
async def llm_error_handler(request: Request, exc: LLMError):
    """Handle LLM errors consistently"""
    
    # Log error details
    logging.error(
        f"LLM Error: {exc.error_type.value}",
        extra={
            "error_type": exc.error_type.value,
            "provider": exc.provider,
            "model": exc.model,
            "retryable": exc.retryable,
            "path": request.url.path
        }
    )
    
    # Map to HTTP status codes
    status_codes = {
        LLMErrorType.RATE_LIMIT: 429,
        LLMErrorType.TIMEOUT: 504,
        LLMErrorType.INVALID_RESPONSE: 502,
        LLMErrorType.CONTENT_FILTERED: 422,
        LLMErrorType.CONTEXT_LENGTH: 413,
        LLMErrorType.AUTHENTICATION: 401,
        LLMErrorType.PROVIDER_ERROR: 503,
        LLMErrorType.UNKNOWN: 500
    }
    
    status_code = status_codes.get(exc.error_type, 500)
    
    # Build response
    response = {
        "error": {
            "type": exc.error_type.value,
            "message": str(exc),
            "retryable": exc.retryable
        }
    }
    
    headers = {}
    if exc.retry_after:
        response["error"]["retry_after"] = exc.retry_after
        headers["Retry-After"] = str(int(exc.retry_after))
    
    return JSONResponse(
        status_code=status_code,
        content=response,
        headers=headers
    )
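
With the handler registered, endpoint code can simply let LLMError propagate instead of catching it in every route. A sketch of an endpoint wired to the failover client from earlier (the request model is illustrative):

from pydantic import BaseModel

class CompletionRequest(BaseModel):
    prompt: str

@app.post("/complete")
async def complete(req: CompletionRequest):
    # Any LLMError raised below is turned into a structured JSON
    # error response (with the right status code) by llm_error_handler
    text = await failover.call(prompt=req.prompt)
    return {"completion": text}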

Key Takeaways

Expect Failures

Design for failure from the start with proper exception handling

Implement Retries

Use exponential backoff with jitter for transient failures

Circuit Breakers

Isolate failing providers so one outage doesn't cascade through the rest of the system

Graceful Degradation

Always have fallback options for critical paths

What’s Next

Batch Processing

Learn to process large volumes of LLM requests efficiently