December 2025 Update: Battle-tested patterns for handling LLM failures, implementing retries, and building resilient AI applications.
LLM Failure Modes
Understanding how LLMs fail is crucial for building resilient systems:
Failure Type       Cause                Handling Strategy
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Rate Limiting      Too many requests    Exponential backoff
Timeout            Slow response        Timeout + retry
Invalid Response   Malformed output     Retry + validation
Content Filter     Policy violation     Modify prompt
Context Length     Input too long       Truncation
Service Outage     Provider down        Failover
Comprehensive Error Handling
Custom Exception Hierarchy
from enum import Enum
from typing import Optional


class LLMErrorType(str, Enum):
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    INVALID_RESPONSE = "invalid_response"
    CONTENT_FILTERED = "content_filtered"
    CONTEXT_LENGTH = "context_length"
    AUTHENTICATION = "authentication"
    PROVIDER_ERROR = "provider_error"
    UNKNOWN = "unknown"


class LLMError(Exception):
    """Base exception for LLM operations"""

    def __init__(
        self,
        message: str,
        error_type: LLMErrorType = LLMErrorType.UNKNOWN,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        retryable: bool = False,
        retry_after: Optional[float] = None,
        original_error: Optional[Exception] = None
    ):
        super().__init__(message)
        self.error_type = error_type
        self.provider = provider
        self.model = model
        self.retryable = retryable
        self.retry_after = retry_after
        self.original_error = original_error

    def to_dict(self) -> dict:
        return {
            "message": str(self),
            "error_type": self.error_type.value,
            "provider": self.provider,
            "model": self.model,
            "retryable": self.retryable,
            "retry_after": self.retry_after
        }


class RateLimitError(LLMError):
    def __init__(self, message: str, retry_after: float = 60, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.RATE_LIMIT,
            retryable=True,
            retry_after=retry_after,
            **kwargs
        )


class TimeoutError(LLMError):
    # Note: intentionally shadows the builtin TimeoutError within this module
    def __init__(self, message: str, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.TIMEOUT,
            retryable=True,
            **kwargs
        )


class ContentFilterError(LLMError):
    def __init__(self, message: str, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.CONTENT_FILTERED,
            retryable=False,
            **kwargs
        )


class ContextLengthError(LLMError):
    def __init__(self, message: str, max_tokens: Optional[int] = None, **kwargs):
        super().__init__(
            message,
            error_type=LLMErrorType.CONTEXT_LENGTH,
            retryable=False,
            **kwargs
        )
        self.max_tokens = max_tokens
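If you wrap each provider SDK at the boundary, the rest of the application only ever sees these exception types. Below is a minimal sketch of that conversion, assuming an HTTP-style status code is available; classify_http_error and its status-code mapping are illustrative assumptions, not taken from any particular SDK.

# Illustrative sketch: map a generic HTTP-style failure onto the hierarchy above.
def classify_http_error(status_code: int, message: str, provider: str) -> LLMError:
    if status_code == 429:
        return RateLimitError(message, provider=provider)
    if status_code in (408, 504):
        return TimeoutError(message, provider=provider)
    if status_code == 400 and "context length" in message.lower():
        return ContextLengthError(message, provider=provider)
    return LLMError(
        message,
        error_type=LLMErrorType.PROVIDER_ERROR,
        provider=provider,
        retryable=status_code >= 500,
    )


# Callers branch on the shared fields instead of provider-specific types:
try:
    raise classify_http_error(429, "Too many requests", provider="example")
except LLMError as err:
    if err.retryable:
        print(f"Retry in {err.retry_after}s: {err.to_dict()}")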
Retry Strategies
Exponential Backoff with Jitter
import asyncio
import random
from typing import Callable, Optional, TypeVar
from functools import wraps
import logging

T = TypeVar('T')


class RetryConfig:
    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Calculate delay for attempt number"""
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)
        if self.jitter:
            # Add random jitter (0.5 to 1.5 of calculated delay)
            delay = delay * (0.5 + random.random())
        return delay


# RateLimitError and TimeoutError are the custom exceptions defined above
def retry_async(
    func: Callable[..., T],
    config: Optional[RetryConfig] = None,
    retryable_errors: tuple = (RateLimitError, TimeoutError),
    on_retry: Optional[Callable[[Exception, int], None]] = None
) -> Callable[..., T]:
    """Wrap an async callable with exponential-backoff retries"""
    config = config or RetryConfig()

    @wraps(func)
    async def wrapper(*args, **kwargs) -> T:
        last_exception = None
        for attempt in range(config.max_attempts):
            try:
                return await func(*args, **kwargs)
            except retryable_errors as e:
                last_exception = e
                # Honor retry_after if the error specifies it
                delay = getattr(e, 'retry_after', None)
                if delay is None:
                    delay = config.get_delay(attempt)
                if attempt < config.max_attempts - 1:
                    if on_retry:
                        on_retry(e, attempt + 1)
                    logging.warning(
                        f"Attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay:.2f}s..."
                    )
                    await asyncio.sleep(delay)
            except Exception:
                # Non-retryable error
                raise
        raise last_exception
    return wrapper


# Usage with decorator pattern
def with_retry(
    config: Optional[RetryConfig] = None,
    retryable_errors: tuple = (RateLimitError, TimeoutError)
):
    """Retry decorator factory"""
    config = config or RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(config.max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_errors as e:
                    last_exception = e
                    if attempt < config.max_attempts - 1:
                        delay = config.get_delay(attempt)
                        await asyncio.sleep(delay)
                except Exception:
                    raise
            raise last_exception
        return wrapper
    return decorator


# Example usage
@with_retry(RetryConfig(max_attempts=3, base_delay=2.0))
async def call_llm(prompt: str) -> str:
    # Your LLM call here
    pass
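The decorator form works when you own the function definition; retry_async can also wrap an existing coroutine function at the call site. A small sketch, reusing call_llm from above; record_retry_metric is a hypothetical hook, substitute your own logging or metrics.

def record_retry_metric(error: Exception, attempt: int) -> None:
    logging.info(f"retry #{attempt} after {type(error).__name__}: {error}")


resilient_call = retry_async(
    call_llm,
    config=RetryConfig(max_attempts=5, base_delay=1.0, max_delay=30.0),
    retryable_errors=(RateLimitError, TimeoutError),
    on_retry=record_retry_metric,
)

answer = await resilient_call("Summarize this ticket in one sentence.")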
Circuit Breaker Pattern
Prevent cascading failures with circuit breakers:
import time
from enum import Enum
from dataclasses import dataclass
from functools import wraps
from typing import Callable, Optional


class CircuitState(str, Enum):
    CLOSED = "closed"          # Normal operation
    OPEN = "open"              # Failing, reject requests
    HALF_OPEN = "half_open"    # Testing if recovered


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout: float = 60.0  # Time in OPEN state before testing
    half_open_max_calls: int = 3


class CircuitBreaker:
    """Circuit breaker for LLM providers"""

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: float = 0
        self.half_open_calls = 0

    def can_execute(self) -> bool:
        """Check if request can proceed"""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if time.time() - self.last_failure_time > self.config.timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.config.half_open_max_calls
        return False

    def record_success(self):
        """Record a successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
        elif self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN

    def __call__(self, func: Callable) -> Callable:
        """Decorator for circuit breaker"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.can_execute():
                # LLMError / LLMErrorType from the exception hierarchy above
                raise LLMError(
                    f"Circuit breaker '{self.name}' is OPEN",
                    error_type=LLMErrorType.PROVIDER_ERROR,
                    retryable=True,
                    retry_after=self.config.timeout
                )
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1
            try:
                result = await func(*args, **kwargs)
                self.record_success()
                return result
            except Exception:
                self.record_failure()
                raise
        return wrapper


# Usage
openai_breaker = CircuitBreaker("openai")
anthropic_breaker = CircuitBreaker("anthropic")


@openai_breaker
async def call_openai(prompt: str) -> str:
    # OpenAI API call
    pass


@anthropic_breaker
async def call_anthropic(prompt: str) -> str:
    # Anthropic API call
    pass
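Because breaker state is just plain attributes, it is easy to surface in a health or metrics endpoint. A minimal sketch; the health_snapshot helper is illustrative, the field names come from the class above.

def health_snapshot(*breakers: CircuitBreaker) -> dict:
    # Report current state per breaker without mutating it
    return {
        b.name: {
            "state": b.state.value,
            "failures": b.failure_count,
        }
        for b in breakers
    }


print(health_snapshot(openai_breaker, anthropic_breaker))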
Provider Failover
from dataclasses import dataclass
from typing import Any, Callable, List, Optional
import logging


@dataclass
class Provider:
    name: str
    call_fn: Callable
    priority: int = 0
    circuit_breaker: Optional[CircuitBreaker] = None
    enabled: bool = True


class ProviderFailover:
    """Failover between multiple LLM providers"""

    def __init__(self, providers: List[Provider]):
        # Sort by priority (lower = higher priority)
        self.providers = sorted(providers, key=lambda p: p.priority)

    async def call(self, *args, **kwargs) -> Any:
        """Call providers with failover"""
        last_error = None
        for provider in self.providers:
            if not provider.enabled:
                continue
            # Check circuit breaker
            if provider.circuit_breaker:
                if not provider.circuit_breaker.can_execute():
                    logging.warning(f"Provider {provider.name} circuit is open")
                    continue
            try:
                logging.info(f"Trying provider: {provider.name}")
                result = await provider.call_fn(*args, **kwargs)
                if provider.circuit_breaker:
                    provider.circuit_breaker.record_success()
                return result
            except LLMError as e:
                last_error = e
                logging.warning(f"Provider {provider.name} failed: {e}")
                if provider.circuit_breaker:
                    provider.circuit_breaker.record_failure()
                # This provider failed; fall through to the next one
                continue
            except Exception as e:
                last_error = e
                logging.error(f"Unexpected error from {provider.name}: {e}")
                if provider.circuit_breaker:
                    provider.circuit_breaker.record_failure()
        raise LLMError(
            f"All providers failed. Last error: {last_error}",
            error_type=LLMErrorType.PROVIDER_ERROR,
            retryable=False,
            original_error=last_error
        )


# Usage
failover = ProviderFailover([
    Provider(
        name="openai",
        call_fn=call_openai,
        priority=0,
        circuit_breaker=openai_breaker
    ),
    Provider(
        name="anthropic",
        call_fn=call_anthropic,
        priority=1,
        circuit_breaker=anthropic_breaker
    ),
    Provider(
        name="together",
        call_fn=call_together,  # defined like call_openai / call_anthropic above
        priority=2
    )
])

result = await failover.call(prompt="What is AI?")
Response Validation
from pydantic import BaseModel, ValidationError
from typing import Callable, Optional, Type
import json


class ResponseValidator:
    """Validate LLM responses"""

    def __init__(
        self,
        max_retries: int = 3,
        on_validation_error: Optional[Callable] = None
    ):
        self.max_retries = max_retries
        self.on_validation_error = on_validation_error

    async def validate_json(
        self,
        llm_call: Callable,
        schema: Type[BaseModel],
        prompt: str
    ) -> BaseModel:
        """Validate JSON response against Pydantic schema"""
        validation_prompt = prompt
        for attempt in range(self.max_retries):
            response = await llm_call(validation_prompt)
            try:
                # Try to parse JSON
                json_str = self._extract_json(response)
                data = json.loads(json_str)
                # Validate against schema
                return schema.model_validate(data)
            except json.JSONDecodeError as e:
                if attempt < self.max_retries - 1:
                    validation_prompt = f"""
                    Your previous response was not valid JSON.
                    Error: {e}
                    Original request: {prompt}
                    Please respond with ONLY valid JSON, no markdown or explanation.
                    """
                else:
                    raise LLMError(
                        f"Failed to get valid JSON after {self.max_retries} attempts",
                        error_type=LLMErrorType.INVALID_RESPONSE
                    )
            except ValidationError as e:
                if attempt < self.max_retries - 1:
                    validation_prompt = f"""
                    Your previous JSON response didn't match the required schema.
                    Validation errors: {e.errors()}
                    Original request: {prompt}
                    Please fix the JSON to match the required schema.
                    """
                else:
                    raise LLMError(
                        f"Schema validation failed after {self.max_retries} attempts",
                        error_type=LLMErrorType.INVALID_RESPONSE
                    )

    def _extract_json(self, text: str) -> str:
        """Extract JSON from text that might include markdown"""
        text = text.strip()
        # Remove markdown code blocks
        if text.startswith("```"):
            lines = text.split("\n")
            lines = lines[1:-1] if lines[-1] == "```" else lines[1:]
            text = "\n".join(lines)
        # Find JSON object or array
        start_chars = ['{', '[']
        for char in start_chars:
            if char in text:
                start = text.index(char)
                end_char = '}' if char == '{' else ']'
                # Find matching end
                depth = 0
                for i, c in enumerate(text[start:], start):
                    if c == char:
                        depth += 1
                    elif c == end_char:
                        depth -= 1
                        if depth == 0:
                            return text[start:i + 1]
        return text
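Here is a short usage sketch with a Pydantic schema. TicketTriage and the prompt are illustrative; call_llm is the retry-wrapped function from the earlier section.

from pydantic import BaseModel


class TicketTriage(BaseModel):
    category: str
    priority: int
    summary: str


validator = ResponseValidator(max_retries=3)

triage = await validator.validate_json(
    llm_call=call_llm,
    schema=TicketTriage,
    prompt=(
        "Triage this support ticket and respond as JSON with "
        "category, priority (1-5), and summary: 'App crashes on login.'"
    ),
)
print(triage.category, triage.priority)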
Timeout Management
import asyncio
from contextlib import asynccontextmanager
from typing import Optional


class TimeoutManager:
    """Manage timeouts for LLM calls"""

    def __init__(
        self,
        default_timeout: float = 30.0,
        streaming_timeout: float = 120.0
    ):
        self.default_timeout = default_timeout
        self.streaming_timeout = streaming_timeout

    @asynccontextmanager
    async def timeout(self, seconds: Optional[float] = None):
        """Context manager for timeout (asyncio.timeout requires Python 3.11+)"""
        timeout_seconds = seconds or self.default_timeout
        try:
            async with asyncio.timeout(timeout_seconds):
                yield
        except asyncio.TimeoutError:
            # TimeoutError here is the custom LLMError subclass defined earlier
            raise TimeoutError(
                f"Operation timed out after {timeout_seconds}s"
            )

    async def with_timeout(
        self,
        coro,
        timeout: Optional[float] = None
    ):
        """Execute coroutine with timeout"""
        timeout = timeout or self.default_timeout
        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Operation timed out after {timeout}s"
            )


# Usage
timeout_manager = TimeoutManager(default_timeout=30.0)


async def call_with_timeout(prompt: str) -> str:
    return await timeout_manager.with_timeout(
        call_llm(prompt),
        timeout=45.0
    )
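The context-manager form is useful when the timeout should cover more than a single awaited call, such as an entire streaming loop. A minimal sketch (requires Python 3.11+ for asyncio.timeout); stream_llm_tokens is a hypothetical streaming helper, shown only to illustrate why a longer streaming_timeout exists.

async def stream_with_timeout(prompt: str) -> list[str]:
    chunks: list[str] = []
    # The whole streaming loop must finish within streaming_timeout
    async with timeout_manager.timeout(timeout_manager.streaming_timeout):
        async for token in stream_llm_tokens(prompt):
            chunks.append(token)
    return chunks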
Graceful Degradation
import logging
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DegradationConfig:
    fallback_response: Optional[str] = None
    fallback_model: Optional[str] = None
    cache_fallback: bool = True
    simplified_prompt: Optional[str] = None


class GracefulDegrader:
    """Handle degradation gracefully"""

    def __init__(
        self,
        primary_fn: Callable,
        cache_client=None,
        config: Optional[DegradationConfig] = None
    ):
        self.primary_fn = primary_fn
        self.cache = cache_client
        self.config = config or DegradationConfig()

    async def call(
        self,
        prompt: str,
        cache_key: Optional[str] = None,
        **kwargs
    ) -> tuple[str, str]:
        """
        Call with graceful degradation.
        Returns: (response, source) where source is 'primary', 'cache',
        'simplified', or 'fallback'
        """
        try:
            result = await self.primary_fn(prompt, **kwargs)
            # Cache successful response
            if self.cache and cache_key:
                await self.cache.set(cache_key, result)
            return result, "primary"
        except LLMError as e:
            logging.warning(f"Primary call failed: {e}")
            # Try cache fallback
            if self.config.cache_fallback and self.cache and cache_key:
                cached = await self.cache.get(cache_key)
                if cached:
                    logging.info("Using cached response")
                    return cached, "cache"
            # Try simplified prompt
            if self.config.simplified_prompt:
                try:
                    result = await self.primary_fn(
                        self.config.simplified_prompt.format(
                            original_prompt=prompt
                        ),
                        **kwargs
                    )
                    return result, "simplified"
                except Exception:
                    pass
            # Use fallback response
            if self.config.fallback_response:
                return self.config.fallback_response, "fallback"
            raise


# Usage (redis_client is any cache client exposing async get/set)
degrader = GracefulDegrader(
    primary_fn=call_llm,
    cache_client=redis_client,
    config=DegradationConfig(
        fallback_response="I'm having trouble processing your request. Please try again.",
        cache_fallback=True
    )
)

response, source = await degrader.call(
    prompt="Explain quantum computing",
    cache_key="quantum_computing_explanation"
)
print(f"Response from: {source}")
Unified Error Handler
import logging

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()


@app.exception_handler(LLMError)
async def llm_error_handler(request: Request, exc: LLMError):
    """Handle LLM errors consistently"""
    # Log error details
    logging.error(
        f"LLM Error: {exc.error_type.value}",
        extra={
            "error_type": exc.error_type.value,
            "provider": exc.provider,
            "model": exc.model,
            "retryable": exc.retryable,
            "path": request.url.path
        }
    )

    # Map to HTTP status codes
    status_codes = {
        LLMErrorType.RATE_LIMIT: 429,
        LLMErrorType.TIMEOUT: 504,
        LLMErrorType.INVALID_RESPONSE: 502,
        LLMErrorType.CONTENT_FILTERED: 422,
        LLMErrorType.CONTEXT_LENGTH: 413,
        LLMErrorType.AUTHENTICATION: 401,
        LLMErrorType.PROVIDER_ERROR: 503,
        LLMErrorType.UNKNOWN: 500
    }
    status_code = status_codes.get(exc.error_type, 500)

    # Build response body
    response = {
        "error": {
            "type": exc.error_type.value,
            "message": str(exc),
            "retryable": exc.retryable
        }
    }
    if exc.retry_after:
        response["error"]["retry_after"] = exc.retry_after

    # Only send a Retry-After header when the error specifies one
    headers = {}
    if exc.retry_after:
        headers["Retry-After"] = str(int(exc.retry_after))

    return JSONResponse(
        status_code=status_code,
        content=response,
        headers=headers
    )
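With the handler registered, route code can simply let LLM exceptions propagate. A minimal sketch; the /ask route and AskRequest model are illustrative, and failover is the ProviderFailover instance from the earlier section.

from pydantic import BaseModel


class AskRequest(BaseModel):
    prompt: str


@app.post("/ask")
async def ask(body: AskRequest):
    # Any RateLimitError, TimeoutError, etc. raised here is turned into a
    # consistent JSON error response by llm_error_handler
    answer = await failover.call(prompt=body.prompt)
    return {"answer": answer}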
Key Takeaways
Expect Failures
Design for failure from the start with proper exception handling
Implement Retries
Use exponential backoff with jitter for transient failures
Circuit Breakers
Prevent cascading failures with circuit breakers
Graceful Degradation
Always have fallback options for critical paths
What’s Next
Batch Processing
Learn to process large volumes of LLM requests efficiently