Think of an LLM API like a restaurant kitchen during a rush. Sometimes orders get backed up (rate limiting), sometimes the chef takes too long (timeouts), sometimes the dish comes out wrong (invalid responses), and occasionally the entire kitchen shuts down (service outages). Just as a great restaurant has contingency plans for each scenario, your application needs a strategy for every failure mode.

Understanding how LLMs fail is crucial for building resilient systems:
Failure Type       Cause               Handling Strategy
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Rate Limiting      Too many requests   Exponential backoff
Timeout            Slow response       Timeout + retry
Invalid Response   Malformed output    Retry + validation
Content Filter     Policy violation    Modify prompt
Context Length     Input too long      Truncation
Service Outage     Provider down       Failover
Think of a circuit breaker like the fuse box in your house. When too much current flows through a circuit, the fuse trips to prevent a fire. Similarly, when an LLM provider starts failing repeatedly, you want to “trip” the breaker — stop sending requests that will inevitably fail, give the provider time to recover, and then cautiously test whether it is back. Without this, a single degraded provider can drag down your entire application as requests pile up waiting for timeouts.

Prevent cascading failures with circuit breakers:
import time
from enum import Enum
from dataclasses import dataclass, field
from functools import wraps
from typing import Callable, Optional


class CircuitState(str, Enum):
    """The three states a circuit breaker can be in."""

    # CLOSED = normal traffic flows through (confusing name -- think
    # of a closed electrical circuit where current flows freely)
    CLOSED = "closed"
    # OPEN = broken circuit, requests are rejected immediately
    OPEN = "open"
    # HALF_OPEN = cautiously letting a few requests through to test recovery
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreakerConfig:
    """Tunable thresholds for a CircuitBreaker."""

    failure_threshold: int = 5  # Failures before tripping open
    success_threshold: int = 3  # Successes in half-open before closing
    timeout: float = 60.0  # Seconds to wait in OPEN before testing
    # Max test requests in half-open state. Keep this >= success_threshold:
    # the probe counter is only reset when the breaker leaves OPEN, so with a
    # smaller value the breaker can never collect enough successes to close.
    half_open_max_calls: int = 3


class CircuitBreaker:
    """Circuit breaker for LLM providers.

    Use an instance as a decorator on an async function. The breaker counts
    failures, rejects calls while OPEN, and lets a limited number of probe
    calls through while HALF_OPEN.
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: float = 0
        self.half_open_calls = 0

    def can_execute(self) -> bool:
        """Return True if a request may proceed.

        Side effect: once ``config.timeout`` seconds have passed since the
        last recorded failure, an OPEN breaker moves to HALF_OPEN and its
        probe counter is reset.
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if time.time() - self.last_failure_time > self.config.timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.config.half_open_max_calls

        return False

    def record_success(self):
        """Record a successful call.

        In HALF_OPEN, enough successes close the breaker. Otherwise a
        success clears the failure streak.
        """
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call.

        Any failure during HALF_OPEN re-opens the breaker immediately.
        Otherwise the breaker opens once ``failure_threshold`` is reached.
        """
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
        elif self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN

    def __call__(self, func: Callable) -> Callable:
        """Decorate an async callable so every call goes through the breaker.

        The wrapper raises ``LLMError`` when the breaker rejects the call;
        exceptions from ``func`` are recorded as failures and re-raised.
        ``LLMError`` / ``LLMErrorType`` are not defined in this snippet and
        are expected to be provided elsewhere in the application.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.can_execute():
                raise LLMError(
                    f"Circuit breaker '{self.name}' is OPEN",
                    error_type=LLMErrorType.PROVIDER_ERROR,
                    retryable=True,
                    retry_after=self.config.timeout
                )

            # Count this call against the half-open probe budget.
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1

            try:
                result = await func(*args, **kwargs)
            except Exception:
                self.record_failure()
                raise
            self.record_success()
            return result

        return wrapper


# Usage
openai_breaker = CircuitBreaker("openai")
anthropic_breaker = CircuitBreaker("anthropic")


@openai_breaker
async def call_openai(prompt: str) -> str:
    # OpenAI API call
    pass


@anthropic_breaker
async def call_anthropic(prompt: str) -> str:
    # Anthropic API call
    pass
Imagine you are booking a flight. Your first choice is a direct flight, but if that is sold out you check a connection through another hub, and if that fails you look at a completely different airline. Provider failover works the same way — you rank your LLM providers by preference (cost, quality, latency) and automatically cascade through them when one is unavailable. The key insight: a slightly worse response from a backup provider is almost always better than returning an error to the user.
from typing import List, Callable, Anyfrom dataclasses import dataclassimport logging@dataclassclass Provider: name: str call_fn: Callable priority: int = 0 # Lower number = tried first circuit_breaker: CircuitBreaker = None enabled: bool = True # Toggle without removing from the listclass ProviderFailover: """Failover between multiple LLM providers. Practical tip: keep your provider list in config, not code. That way ops can re-prioritize or disable a provider during an incident without a deploy. """ def __init__(self, providers: List[Provider]): # Sort by priority (lower = higher priority) self.providers = sorted(providers, key=lambda p: p.priority) async def call(self, *args, **kwargs) -> Any: """Call providers with failover""" last_error = None for provider in self.providers: if not provider.enabled: continue # Check circuit breaker if provider.circuit_breaker: if not provider.circuit_breaker.can_execute(): logging.warning(f"Provider {provider.name} circuit is open") continue try: logging.info(f"Trying provider: {provider.name}") result = await provider.call_fn(*args, **kwargs) if provider.circuit_breaker: provider.circuit_breaker.record_success() return result except LLMError as e: last_error = e logging.warning(f"Provider {provider.name} failed: {e}") if provider.circuit_breaker: provider.circuit_breaker.record_failure() # If not retryable, skip to next provider immediately if not e.retryable: continue except Exception as e: last_error = e logging.error(f"Unexpected error from {provider.name}: {e}") if provider.circuit_breaker: provider.circuit_breaker.record_failure() raise LLMError( f"All providers failed. 
Last error: {last_error}", error_type=LLMErrorType.PROVIDER_ERROR, retryable=False, original_error=last_error )# Usagefailover = ProviderFailover([ Provider( name="openai", call_fn=call_openai, priority=0, circuit_breaker=openai_breaker ), Provider( name="anthropic", call_fn=call_anthropic, priority=1, circuit_breaker=anthropic_breaker ), Provider( name="together", call_fn=call_together, priority=2 )])result = await failover.call(prompt="What is AI?")
LLMs are probabilistic — even when you ask for JSON, you might get markdown, a preamble, or a partially valid structure. Response validation is the seatbelt you never skip. The pattern here is “validate, then re-prompt”: if the response does not match your schema, feed the validation error back to the LLM and ask it to fix its output. Most models self-correct within one or two retries.
from pydantic import BaseModel, ValidationError
from typing import Callable, Type, Optional
import json


class ResponseValidator:
    """Validate LLM responses against a Pydantic schema, with auto-retry.

    Practical tip: always set a max_retries cap. Without it, a
    pathological prompt can burn through your budget in a retry loop.
    """

    def __init__(
        self,
        max_retries: int = 3,
        on_validation_error: Optional[Callable] = None
    ):
        self.max_retries = max_retries
        # NOTE(review): stored but never invoked by this class.
        self.on_validation_error = on_validation_error

    async def validate_json(
        self,
        llm_call: Callable,
        schema: Type[BaseModel],
        prompt: str
    ) -> BaseModel:
        """Call the LLM and return its reply parsed into ``schema``.

        On a JSON or schema failure the error is fed back to the model in a
        follow-up prompt, up to ``max_retries`` calls in total.

        Args:
            llm_call: Async callable taking a prompt and returning text.
            schema: Pydantic model class the JSON must validate against.
            prompt: The original request.

        Raises:
            LLMError: if no valid response is obtained within
                ``max_retries`` attempts (including ``max_retries <= 0``).
        """
        validation_prompt = prompt

        for attempt in range(self.max_retries):
            is_last_attempt = attempt >= self.max_retries - 1
            response = await llm_call(validation_prompt)

            try:
                # Try to parse JSON
                json_str = self._extract_json(response)
                data = json.loads(json_str)

                # Validate against schema
                return schema.model_validate(data)

            except json.JSONDecodeError as e:
                if is_last_attempt:
                    raise LLMError(
                        f"Failed to get valid JSON after {self.max_retries} attempts",
                        error_type=LLMErrorType.INVALID_RESPONSE
                    ) from e
                validation_prompt = (
                    "Your previous response was not valid JSON.\n"
                    f"Error: {e}\n\n"
                    f"Original request: {prompt}\n\n"
                    "Please respond with ONLY valid JSON, no markdown or explanation."
                )

            except ValidationError as e:
                if is_last_attempt:
                    raise LLMError(
                        f"Schema validation failed after {self.max_retries} attempts",
                        error_type=LLMErrorType.INVALID_RESPONSE
                    ) from e
                validation_prompt = (
                    "Your previous JSON response didn't match the required schema.\n"
                    f"Validation errors: {e.errors()}\n\n"
                    f"Original request: {prompt}\n\n"
                    "Please fix the JSON to match the required schema."
                )

        # Only reachable when max_retries <= 0: fail loudly instead of
        # silently returning None.
        raise LLMError(
            f"Failed to get valid JSON after {self.max_retries} attempts",
            error_type=LLMErrorType.INVALID_RESPONSE
        )

    def _extract_json(self, text: str) -> str:
        """Extract the first JSON object or array embedded in ``text``.

        Scans for the earliest ``{`` or ``[`` from which a complete JSON
        value can be decoded and returns exactly that substring. Using the
        real decoder handles brackets inside string values and top-level
        arrays of objects. If nothing decodes, the text is returned with any
        surrounding markdown code fence removed, so the caller's
        ``json.loads`` reports the error.
        """
        text = text.strip()

        decoder = json.JSONDecoder()
        for start, char in enumerate(text):
            if char not in "{[":
                continue
            try:
                _, end = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                continue
            return text[start:end]

        # Remove markdown code blocks
        if text.startswith("```"):
            lines = text.split("\n")
            lines = lines[1:-1] if lines[-1] == "```" else lines[1:]
            text = "\n".join(lines)

        return text
LLM calls are unpredictable in latency. A simple summarization might return in 2 seconds one day and 45 seconds the next. Without explicit timeouts, a single slow request can hold an async worker hostage, and if you have a fixed worker pool, a few slow calls can starve every other user. Think of timeouts as a contract with your users: “I promise an answer within N seconds, or I’ll tell you I couldn’t get one.”
import asynciofrom contextlib import asynccontextmanagerclass TimeoutManager: """Manage timeouts for LLM calls. Practical tip: use different timeouts for streaming vs non-streaming. Streaming calls should get a longer total timeout because they produce tokens gradually, but you should also set a *first-token* timeout to catch connections that never start producing. """ def __init__( self, default_timeout: float = 30.0, streaming_timeout: float = 120.0 ): self.default_timeout = default_timeout self.streaming_timeout = streaming_timeout @asynccontextmanager async def timeout(self, seconds: float = None): """Context manager for timeout""" timeout_seconds = seconds or self.default_timeout try: yield asyncio.timeout(timeout_seconds) except asyncio.TimeoutError: raise TimeoutError( f"Operation timed out after {timeout_seconds}s" ) async def with_timeout( self, coro, timeout: float = None ): """Execute coroutine with timeout""" timeout = timeout or self.default_timeout try: return await asyncio.wait_for(coro, timeout=timeout) except asyncio.TimeoutError: raise TimeoutError( f"Operation timed out after {timeout}s" )# Usagetimeout_manager = TimeoutManager(default_timeout=30.0)async def call_with_timeout(prompt: str) -> str: return await timeout_manager.with_timeout( call_llm(prompt), timeout=45.0 )
Graceful degradation is the difference between Netflix showing you a slightly stale recommendation versus a blank screen. When your primary LLM path fails, you want to cascade through options: try the cache for a recent answer, try a simpler prompt that is more likely to succeed, and as a last resort, return a pre-written fallback. The user should always get something useful. The tuple (response, source) pattern below makes it easy to track how often you are degrading, which is a key operational health metric.
from dataclasses import dataclassfrom typing import Optional, Callable, Any@dataclassclass DegradationConfig: fallback_response: str = None # Pre-written static response fallback_model: str = None # Cheaper/faster backup model cache_fallback: bool = True # Try cache before giving up simplified_prompt: str = None # Stripped-down prompt that is easier to answerclass GracefulDegrader: """Handle degradation gracefully. Returns (response, source) so you can log degradation events and alert if the cache or fallback ratio spikes. """ def __init__( self, primary_fn: Callable, cache_client = None, config: DegradationConfig = None ): self.primary_fn = primary_fn self.cache = cache_client self.config = config or DegradationConfig() async def call( self, prompt: str, cache_key: str = None, **kwargs ) -> tuple[str, str]: """ Call with graceful degradation Returns: (response, source) where source is 'primary', 'cache', or 'fallback' """ try: result = await self.primary_fn(prompt, **kwargs) # Cache successful response if self.cache and cache_key: await self.cache.set(cache_key, result) return result, "primary" except LLMError as e: logging.warning(f"Primary call failed: {e}") # Try cache fallback if self.config.cache_fallback and self.cache and cache_key: cached = await self.cache.get(cache_key) if cached: logging.info("Using cached response") return cached, "cache" # Try simplified prompt if self.config.simplified_prompt: try: result = await self.primary_fn( self.config.simplified_prompt.format( original_prompt=prompt ), **kwargs ) return result, "simplified" except Exception: pass # Use fallback response if self.config.fallback_response: return self.config.fallback_response, "fallback" raise# Usagedegrader = GracefulDegrader( primary_fn=call_llm, cache_client=redis_client, config=DegradationConfig( fallback_response="I'm having trouble processing your request. 
Please try again.", cache_fallback=True ))response, source = await degrader.call( prompt="Explain quantum computing", cache_key="quantum_computing_explanation")print(f"Response from: {source}")
Your API consumers should never see raw OpenAI or Anthropic error messages. This handler translates internal LLM errors into clean, predictable HTTP responses with appropriate status codes. The mapping is deliberate: a rate limit from the provider becomes a 429 to the client so their retry logic kicks in, while a content filter violation becomes a 422 so they know to fix the input rather than retry.
import logging
import math

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()


@app.exception_handler(LLMError)
async def llm_error_handler(request: Request, exc: LLMError):
    """Translate internal LLM errors into standard HTTP responses.

    Practical tip: include the retryable flag and retry_after header
    in your error response. Well-behaved clients will use these
    to back off automatically, saving you from thundering-herd retries.

    The ``Retry-After`` header is only sent when ``exc.retry_after`` is set,
    and is rounded up to whole seconds so clients never retry early.
    """
    # Log error details
    logging.error(
        f"LLM Error: {exc.error_type.value}",
        extra={
            "error_type": exc.error_type.value,
            "provider": exc.provider,
            "model": exc.model,
            "retryable": exc.retryable,
            "path": request.url.path
        }
    )

    # Map to HTTP status codes
    status_codes = {
        LLMErrorType.RATE_LIMIT: 429,
        LLMErrorType.TIMEOUT: 504,
        LLMErrorType.INVALID_RESPONSE: 502,
        LLMErrorType.CONTENT_FILTERED: 422,
        LLMErrorType.CONTEXT_LENGTH: 413,
        LLMErrorType.AUTHENTICATION: 401,
        LLMErrorType.PROVIDER_ERROR: 503,
        LLMErrorType.UNKNOWN: 500
    }
    status_code = status_codes.get(exc.error_type, 500)

    # Build response
    response = {
        "error": {
            "type": exc.error_type.value,
            "message": str(exc),
            "retryable": exc.retryable
        }
    }

    # Header values must be strings. Passing None makes the response
    # constructor fail, so the header is added only when there is a value.
    headers = {}
    if exc.retry_after:
        response["error"]["retry_after"] = exc.retry_after
        headers["Retry-After"] = str(math.ceil(exc.retry_after))

    return JSONResponse(
        status_code=status_code,
        content=response,
        headers=headers
    )