Designing robust APIs for LLM applications requires careful consideration of latency, error handling, streaming, and scalability. Unlike traditional REST APIs, where response times are measured in low milliseconds, LLM API calls routinely take 1-30 seconds depending on model size and output length. This single fact reshapes nearly every design decision, from timeout configuration to client-side UX patterns.
Think of an LLM API like a restaurant kitchen: a regular API is the microwave (fast, predictable), while an LLM API is the slow-roasted brisket (worth the wait, but your customers need updates). Your API design needs to account for that fundamental difference.
The synchronous pattern is the simplest to reason about: the client sends a request and waits for the full response. Use this when response times are acceptable (under ~5 seconds) and the client can tolerate blocking. For anything longer, consider streaming or async jobs below.
import logging
from typing import List, Optional

from fastapi import FastAPI, HTTPException
from openai import AsyncOpenAI
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

app = FastAPI()

# Initialize the client at module level so it's reused across requests.
# Creating a new client per request wastes connection pool resources
# and adds ~50-100ms of overhead from TLS handshake.
client = AsyncOpenAI()


class Message(BaseModel):
    # Constrain to valid OpenAI roles -- reject garbage early rather than
    # burning tokens on an API call that will fail downstream
    role: str = Field(..., pattern="^(system|user|assistant)$")
    # 100K chars is roughly ~25K tokens -- a safety net against
    # accidentally sending entire databases as context
    content: str = Field(..., min_length=1, max_length=100000)


class ChatRequest(BaseModel):
    messages: List[Message]
    model: str = "gpt-4o"
    temperature: float = Field(0.7, ge=0, le=2)
    max_tokens: int = Field(1000, ge=1, le=4096)
    # user_id enables per-user rate limiting and abuse tracking.
    # OpenAI also recommends passing this for their own abuse detection.
    user_id: Optional[str] = None


class ChatResponse(BaseModel):
    content: str
    model: str
    usage: dict
    finish_reason: str


@app.post("/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Synchronous chat completion endpoint.

    Production tip: Set a reasonable client-side timeout (30-60s).
    If your gateway (nginx, ALB) has a shorter timeout than your LLM call,
    users get 504s while you still pay for the completion.
    """
    try:
        response = await client.chat.completions.create(
            model=request.model,
            messages=[m.model_dump() for m in request.messages],
            temperature=request.temperature,
            max_tokens=request.max_tokens,
        )
        choice = response.choices[0]
        return ChatResponse(
            # content can be None (for example, on tool calls), so default to ""
            content=choice.message.content or "",
            model=response.model,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            },
            finish_reason=choice.finish_reason,
        )
    except Exception:
        # Never leak raw exception messages to clients.
        # Log the full error server-side and return a sanitized message.
        logger.exception("chat completion failed")
        raise HTTPException(status_code=500, detail="Upstream model call failed")
Streaming is the most important pattern in LLM API design. Without it, users stare at a blank screen for 5-20 seconds wondering if anything is happening. With streaming, the first token typically arrives in 200-500ms, and the user sees the response "typing" in real time, the same pattern that makes ChatGPT feel responsive.
Under the hood, this uses Server-Sent Events (SSE): a simple HTTP-based protocol where the server holds the connection open and pushes data chunks. Think of it like a news ticker: the server keeps broadcasting updates until it's done.
import json
import logging
from typing import AsyncGenerator, List

from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# Reuses `app`, `client`, and `Message` from the synchronous example.
logger = logging.getLogger(__name__)


class StreamRequest(BaseModel):
    messages: List[Message]
    model: str = "gpt-4o"


async def stream_response(request: StreamRequest) -> AsyncGenerator[str, None]:
    """Generate SSE stream.

    Each chunk follows the SSE format "data: {json}" followed by a blank line.
    The blank line (double newline) is required by the SSE spec to delimit events.
    """
    try:
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[m.model_dump() for m in request.messages],
            stream=True,
        )
        async for chunk in stream:
            # Some chunks carry no choices, so guard before indexing
            if not chunk.choices:
                continue
            choice = chunk.choices[0]
            if choice.delta.content:
                data = {
                    "content": choice.delta.content,
                    "finish_reason": choice.finish_reason,
                }
                # SSE format: each event is "data: ...\n\n"
                yield f"data: {json.dumps(data)}\n\n"
        # Signal stream completion -- clients watch for this sentinel
        yield "data: [DONE]\n\n"
    except Exception:
        # Stream errors inline so the client can handle them gracefully
        # rather than getting a broken connection with no context.
        # Log the details server-side; send only a sanitized message.
        logger.exception("streaming completion failed")
        yield f"data: {json.dumps({'error': 'stream failed'})}\n\n"


@app.post("/v1/chat/stream")
async def chat_stream(request: StreamRequest):
    """Server-Sent Events streaming endpoint.

    Production pitfall: If you use nginx or a CDN in front of this,
    you MUST disable response buffering. Otherwise nginx will collect
    the entire response before forwarding it, defeating the purpose
    of streaming entirely.
    """
    # For AWS ALB, set idle timeout > expected stream duration.
    # For CloudFront, enable chunked transfer encoding.
    return StreamingResponse(
        stream_response(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",   # Don't cache streaming responses
            "Connection": "keep-alive",    # Keep the connection open
            "X-Accel-Buffering": "no",     # Disable nginx buffering
        },
    )
Common streaming pitfall: Many reverse proxies and load balancers buffer responses by default. If your stream “works locally but not in production,” check your proxy configuration first. For nginx, add proxy_buffering off; in the location block. For AWS ALB, increase the idle timeout to at least 60 seconds.
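On the client side, a stream in this shape can be consumed by reading lines and decoding each `data:` payload. The following is a minimal sketch, assuming the event format used by the endpoint above (JSON payloads and a `[DONE]` sentinel). The function names `iter_sse_events` and `collect_text` are hypothetical, and a complete SSE client would also handle the `event:`, `id:`, and `retry:` fields.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from an iterable of SSE lines.

    Stops at the "[DONE]" sentinel. Lines that are not "data:" fields
    (comments, blank separators) are skipped in this simplified sketch.
    """
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate the "content" fields of a stream into the full reply."""
    parts = []
    for event in iter_sse_events(lines):
        if "error" in event:
            # The server streams errors inline, so surface them to the caller
            raise RuntimeError(event["error"])
        parts.append(event.get("content", ""))
    return "".join(parts)
```

With httpx, the lines could come from `response.iter_lines()` inside a `client.stream("POST", ...)` block; a UI would typically render each event as it arrives rather than collecting the full text.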
For long-running LLM tasks (document summarization, multi-step agents, RAG pipelines over large corpora), synchronous endpoints fall apart. The client's HTTP connection times out, the load balancer kills the request, and you've wasted compute with nothing to show for it.
The async job pattern solves this by decoupling submission from completion. It works like a dry cleaner: you drop off clothes (submit a job), get a ticket number (job ID), and come back later to pick them up (poll or receive a webhook). This pattern is essential when processing takes more than ~30 seconds.
import uuid
from datetime import datetime, timezone
from typing import Optional

from fastapi import BackgroundTasks, HTTPException
from pydantic import BaseModel

# Reuses `app` and `client` from the synchronous example.

# In-memory store (use Redis in production).
# Why Redis? Because in-memory dicts vanish on restart and don't
# work across multiple server instances behind a load balancer.
jobs = {}


def utc_now() -> str:
    """Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)."""
    return datetime.now(timezone.utc).isoformat()


class JobStatus(BaseModel):
    job_id: str
    status: str  # "pending", "processing", "completed", "failed", "cancelled"
    created_at: str
    completed_at: Optional[str] = None
    result: Optional[dict] = None
    error: Optional[str] = None
    progress: float = 0.0


class AsyncJobRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o"
    webhook_url: Optional[str] = None


async def process_job(job_id: str, request: AsyncJobRequest):
    """Background job processor.

    In production, consider using Celery, Dramatiq, or a message queue
    (SQS, Redis Streams) instead of FastAPI BackgroundTasks. Background
    tasks die if the server restarts mid-processing, and they compete
    for the same event loop as your API handlers.
    """
    # Respect a cancellation that arrived before processing started
    if jobs[job_id]["status"] == "cancelled":
        return
    jobs[job_id]["status"] = "processing"
    try:
        response = await client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.prompt}],
        )
        jobs[job_id]["status"] = "completed"
        jobs[job_id]["result"] = {
            "content": response.choices[0].message.content,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
            },
        }
        jobs[job_id]["progress"] = 1.0
    except Exception as e:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(e)
    jobs[job_id]["completed_at"] = utc_now()

    # Send the webhook outside the try block so a delivery failure cannot
    # flip a completed job to "failed". send_webhook is defined in the
    # webhook section below.
    if request.webhook_url:
        await send_webhook(request.webhook_url, jobs[job_id])


@app.post("/v1/jobs", response_model=JobStatus)
async def create_job(request: AsyncJobRequest, background_tasks: BackgroundTasks):
    """Create async job"""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "job_id": job_id,
        "status": "pending",
        "created_at": utc_now(),
        "completed_at": None,
        "result": None,
        "error": None,
        "progress": 0.0,
    }
    background_tasks.add_task(process_job, job_id, request)
    return JobStatus(**jobs[job_id])


@app.get("/v1/jobs/{job_id}", response_model=JobStatus)
async def get_job(job_id: str):
    """Get job status"""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobStatus(**jobs[job_id])


@app.delete("/v1/jobs/{job_id}")
async def cancel_job(job_id: str):
    """Cancel job (if still pending)"""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    if jobs[job_id]["status"] == "pending":
        jobs[job_id]["status"] = "cancelled"
        return {"message": "Job cancelled"}
    return {"message": "Job cannot be cancelled"}
Webhooks are the "don't call us, we'll call you" pattern. Instead of your client polling every few seconds asking "is my job done yet?" (which wastes bandwidth and adds latency), the server proactively notifies the client when something interesting happens.
This is the preferred pattern for production LLM systems because it eliminates polling overhead and reduces time-to-notification from your poll interval down to near-zero. The tradeoff: the receiver must expose a public HTTP endpoint, which adds complexity for clients behind firewalls.
import asyncio
import hashlib
import hmac
import json
from datetime import datetime, timezone
from typing import List, Optional

import httpx
from pydantic import BaseModel, HttpUrl


class WebhookConfig(BaseModel):
    url: HttpUrl
    secret: Optional[str] = None  # HMAC secret for payload signing
    events: List[str] = ["job.completed", "job.failed"]


class WebhookSender:
    """Send webhook notifications with retry logic.

    Production considerations:
    - Always sign payloads so receivers can verify authenticity
    - Use exponential backoff to avoid hammering a struggling endpoint
    - Log every delivery attempt for debugging failed webhooks
    - Consider a webhook delivery table/queue for persistence
    """

    def __init__(self, timeout: float = 30.0, max_retries: int = 3):
        self.timeout = timeout
        self.max_retries = max_retries

    def _sign_payload(self, payload: str, secret: str) -> str:
        """Create HMAC-SHA256 signature for webhook payload.

        Receivers verify this to ensure the webhook came from us,
        not from an attacker spoofing our webhook URL.
        """
        return hmac.new(
            secret.encode(), payload.encode(), hashlib.sha256
        ).hexdigest()

    async def send(
        self, url: str, event: str, data: dict, secret: Optional[str] = None
    ) -> bool:
        """Send webhook with retries"""
        payload = {
            "event": event,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data": data,
        }
        # Serialize once and send these exact bytes, so the signature the
        # receiver computes over the raw body matches ours.
        payload_str = json.dumps(payload)
        headers = {
            "Content-Type": "application/json",
            "X-Webhook-Event": event,
        }
        if secret:
            signature = self._sign_payload(payload_str, secret)
            headers["X-Webhook-Signature"] = f"sha256={signature}"

        async with httpx.AsyncClient() as http:
            for attempt in range(self.max_retries):
                try:
                    response = await http.post(
                        url,
                        content=payload_str,
                        headers=headers,
                        timeout=self.timeout,
                    )
                    if 200 <= response.status_code < 300:
                        return True
                except httpx.RequestError:
                    pass
                # Exponential backoff, skipped after the final attempt
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
        return False


webhook_sender = WebhookSender()


async def send_webhook(url: str, job_data: dict, secret: Optional[str] = None):
    """Send job completion webhook"""
    event = "job.completed" if job_data["status"] == "completed" else "job.failed"
    await webhook_sender.send(url=url, event=event, data=job_data, secret=secret)
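Signing only helps if the receiver checks the signature. The sketch below shows one way a receiver might do that, assuming the `X-Webhook-Signature: sha256=<hex>` header format used by the sender above and that the HMAC is computed over the raw request body. The function name `verify_webhook_signature` is hypothetical.

```python
import hashlib
import hmac


def verify_webhook_signature(body: bytes, header_value: str, secret: str) -> bool:
    """Return True if header_value matches the HMAC-SHA256 of the raw body.

    `body` must be the exact bytes received, before any JSON parsing,
    because re-serializing the payload can change whitespace or key order.
    """
    scheme, _, received = header_value.partition("=")
    if scheme != "sha256" or not received:
        return False
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of the signature were correct.
    return hmac.compare_digest(expected.encode(), received.encode())
```

A receiver would call this before acting on the payload and respond with 401 or 403 when it returns False.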
Rate limiting for LLM APIs is doubly important: not only do you need to protect your service from abuse, but every excess request costs you real money in inference charges. A single runaway script can burn through hundreds of dollars in minutes. Unlike traditional APIs where excess requests are just CPU cycles, LLM requests have direct cost implications.
LLM APIs need two dimensions of rate limiting: request count (how many calls) and token count (how much compute). A user making 10 requests with 100K tokens each is very different from 10 requests with 100 tokens each, even though the request count is identical.
import time
from collections import defaultdict
from dataclasses import dataclass

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware


@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60     # Protects against request floods
    requests_per_hour: int = 1000     # Limits sustained heavy usage
    tokens_per_minute: int = 100000   # Protects against cost spikes (not enforced in this example)


class RateLimiter:
    """Sliding-window-log rate limiter.

    Each user's request timestamps are kept for the longest window
    (one hour) and counted per window. This is simple and exact, but
    it is not a token bucket: it does not smooth bursts, and memory
    grows with request volume.
    """

    def __init__(self):
        self.requests = defaultdict(list)  # user_id -> list of timestamps
        self.tokens = defaultdict(int)     # user_id -> tokens used

    def _count_since(self, user_id: str, window_seconds: int, now: float) -> int:
        """Count requests inside the window ending at `now`"""
        cutoff = now - window_seconds
        return sum(1 for t in self.requests[user_id] if t > cutoff)

    def check_rate_limit(
        self, user_id: str, config: RateLimitConfig
    ) -> tuple[bool, dict]:
        """Check if request is within rate limits"""
        now = time.time()
        # Drop timestamps older than the longest window. Pruning to 60 seconds
        # here would make the hourly check count only the last minute.
        self.requests[user_id] = [
            t for t in self.requests[user_id] if t > now - 3600
        ]

        # Check per-minute limit
        if self._count_since(user_id, 60, now) >= config.requests_per_minute:
            return False, {
                "error": "rate_limit_exceeded",
                "limit": config.requests_per_minute,
                "window": "minute",
                "retry_after": 60,
            }

        # Check per-hour limit
        if len(self.requests[user_id]) >= config.requests_per_hour:
            return False, {
                "error": "rate_limit_exceeded",
                "limit": config.requests_per_hour,
                "window": "hour",
                "retry_after": 3600,
            }
        return True, {}

    def record_request(self, user_id: str):
        """Record a request"""
        self.requests[user_id].append(time.time())

    def record_tokens(self, user_id: str, tokens: int):
        """Record token usage"""
        self.tokens[user_id] += tokens


rate_limiter = RateLimiter()
rate_limit_config = RateLimitConfig()


class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Extract user ID from auth token or API key
        user_id = request.headers.get("X-API-Key", "anonymous")

        # Check rate limit
        allowed, error_info = rate_limiter.check_rate_limit(
            user_id, rate_limit_config
        )
        if not allowed:
            return JSONResponse(
                status_code=429,
                content=error_info,
                headers={"Retry-After": str(error_info["retry_after"])},
            )

        # Record request
        rate_limiter.record_request(user_id)
        response = await call_next(request)
        return response


app.add_middleware(RateLimitMiddleware)
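The limiter above counts requests only; `tokens_per_minute` is declared but never checked. A token bucket is one common way to enforce the token dimension while still allowing short bursts. The sketch below is a minimal single-process version under stated assumptions: the class name `TokenBucket`, its parameters, and the injectable `clock` are all hypothetical, and a multi-instance deployment would keep this state in a shared store such as Redis.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket: refills continuously and allows bursts up to capacity."""

    def __init__(
        self,
        capacity: float,
        refill_per_second: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._clock = clock          # injectable so tests can control time
        self._level = capacity       # start full
        self._updated = clock()

    def _refill(self) -> None:
        """Add the allowance earned since the last update, capped at capacity."""
        now = self._clock()
        elapsed = now - self._updated
        self._level = min(self.capacity, self._level + elapsed * self.refill_per_second)
        self._updated = now

    def try_consume(self, amount: float) -> bool:
        """Deduct `amount` if available; return False without deducting otherwise."""
        self._refill()
        if amount > self._level:
            return False
        self._level -= amount
        return True

    def seconds_until(self, amount: float) -> float:
        """Seconds until `amount` is available; a candidate Retry-After value."""
        self._refill()
        missing = max(0.0, amount - self._level)
        return missing / self.refill_per_second
```

For a budget of 100,000 tokens per minute, one bucket per user could be created with `TokenBucket(capacity=100_000, refill_per_second=100_000 / 60)` and charged with the token count of each request.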
API versioning is inevitable for LLM applications. Models improve, response formats evolve, new capabilities (tool use, structured outputs, vision) get added. If you don't version from day one, you'll either break every existing client integration or freeze your API forever. Neither is acceptable.
Use URL-path versioning (/v1/, /v2/) for LLM APIs. It's the most visible, most cacheable, and easiest to route at the load balancer level. Header-based versioning is theoretically cleaner but makes debugging, logging, and documentation harder in practice.
from enum import Enum
from typing import List, Optional

from fastapi import APIRouter, Response
from pydantic import BaseModel

# Reuses `app`, `Message`, and `ChatRequest` from the earlier examples.


class APIVersion(str, Enum):
    V1 = "v1"
    V2 = "v2"


# Version 1 router -- this is your stable, battle-tested API
v1_router = APIRouter(prefix="/v1", tags=["v1"])


# Deprecation headers -- notify clients that V1 is going away.
# The "Sunset" header tells clients the exact date V1 will be removed;
# RFC 8594 (The Sunset HTTP Header Field) requires an HTTP-date value.
# The "Link" header tells clients where to go instead.
# Register the route once: a second handler on the same path would never run.
@v1_router.post("/chat", deprecated=True)
async def chat_v1(request: ChatRequest, response: Response):
    response.headers["Deprecation"] = "true"
    response.headers["Sunset"] = "Sun, 01 Jun 2025 00:00:00 GMT"
    response.headers["Link"] = '</v2/chat>; rel="successor-version"'
    # V1 implementation
    pass


# Version 2 router with new features
v2_router = APIRouter(prefix="/v2", tags=["v2"])


class ChatRequestV2(BaseModel):
    messages: List[Message]
    model: str = "gpt-4o"
    temperature: float = 0.7
    max_tokens: int = 1000
    # New V2 fields -- additive changes that would break V1 clients
    response_format: Optional[dict] = None  # JSON mode, structured output
    tools: Optional[List[dict]] = None      # Function calling support
    stream: bool = False                    # Unified streaming toggle


@v2_router.post("/chat")
async def chat_v2(request: ChatRequestV2):
    # V2 implementation with new features
    pass


# Mount routers after all routes are defined; routes added to a router
# after include_router() are not picked up by the app.
app.include_router(v1_router)
app.include_router(v2_router)
Version migration tip: When deprecating a version, track which API keys are still hitting the old version. Send targeted emails 90, 60, 30, and 7 days before sunset. The teams that don’t migrate are usually the ones that will file the loudest support tickets when you cut them off.
Request validation in LLM APIs serves a different purpose than in traditional APIs. Yes, you’re catching malformed input — but more importantly, you’re preventing expensive mistakes. A validation miss that lets through a 500K character message means you just burned $5-10 on a single API call that was probably a mistake. Validate aggressively at the boundary; it’s the cheapest place to catch errors.
from typing import List, Optional

from fastapi import Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, field_validator

# Uses Pydantic v2 validators, matching the `pattern=` and `model_dump()`
# usage elsewhere on this page. Reuses `app` from the earlier examples.


class Message(BaseModel):
    role: str
    content: str

    @field_validator("role")
    @classmethod
    def validate_role(cls, v: str) -> str:
        # Only allow valid OpenAI/Anthropic message roles.
        # "tool" was added for function calling; "function" is deprecated.
        allowed = ["system", "user", "assistant", "tool"]
        if v not in allowed:
            raise ValueError(f"role must be one of {allowed}")
        return v

    @field_validator("content")
    @classmethod
    def validate_content(cls, v: str) -> str:
        # These limits protect against accidental data dumps.
        # 100K chars is roughly 25K tokens -- enough for almost
        # any legitimate use case.
        if len(v) > 100000:
            raise ValueError("content exceeds maximum length of 100000")
        if len(v.strip()) == 0:
            raise ValueError("content cannot be empty")
        return v


class ChatRequest(BaseModel):
    messages: List[Message] = Field(..., min_length=1, max_length=100)
    # Regex prevents injection of special characters in model names
    # (dots and underscores are allowed because some model names use them)
    model: str = Field("gpt-4o", pattern="^[a-zA-Z0-9._-]+$")
    temperature: float = Field(0.7, ge=0, le=2)
    # Generous upper bound; tighten it to the output limit of the models you serve
    max_tokens: int = Field(1000, ge=1, le=128000)
    # OpenAI supports max 4 stop sequences
    stop: Optional[List[str]] = Field(None, max_length=4)

    @field_validator("messages")
    @classmethod
    def validate_messages(cls, v: List[Message]) -> List[Message]:
        # A conversation without a user message is meaningless --
        # catch this before burning API credits
        if not any(m.role == "user" for m in v):
            raise ValueError("must include at least one user message")
        # System message must be first. Putting it elsewhere produces
        # unpredictable behavior across different model providers.
        system_indices = [i for i, m in enumerate(v) if m.role == "system"]
        if system_indices and system_indices[0] != 0:
            raise ValueError("system message must be first")
        return v

    @field_validator("stop")
    @classmethod
    def validate_stop(cls, v: Optional[List[str]]) -> Optional[List[str]]:
        if v:
            for s in v:
                if len(s) > 100:
                    raise ValueError("stop sequence too long")
        return v


# Error responses
class APIError(BaseModel):
    error: str
    code: str
    message: str
    param: Optional[str] = None


# FastAPI reports request-body validation failures as RequestValidationError,
# so that is the exception to handle (a bare ValueError handler never sees them).
@app.exception_handler(RequestValidationError)
async def validation_error_handler(request: Request, exc: RequestValidationError):
    first = exc.errors()[0]
    return JSONResponse(
        status_code=400,
        content=APIError(
            error="invalid_request",
            code="validation_error",
            message=first["msg"],
            param=".".join(str(part) for part in first["loc"]),
        ).model_dump(),
    )
Health endpoints are not optional for production LLM APIs — they’re how your load balancer, Kubernetes, and monitoring systems know whether your service is alive and ready to serve traffic. The distinction between “alive” (liveness) and “ready” (readiness) matters: a service can be alive but not ready if it’s still loading model weights, warming caches, or waiting for a downstream provider to come back online.
from datetime import datetime, timezone
from typing import Dict

import psutil
from pydantic import BaseModel

# Reuses `app`, `client`, and the `jobs` store from the earlier examples.


class HealthStatus(BaseModel):
    status: str
    timestamp: str
    version: str
    uptime_seconds: float


class DetailedStatus(BaseModel):
    status: str
    providers: Dict[str, str]
    metrics: Dict[str, float]


start_time = datetime.now(timezone.utc)


@app.get("/health", response_model=HealthStatus)
async def health():
    """Basic health check"""
    now = datetime.now(timezone.utc)
    return HealthStatus(
        status="healthy",
        timestamp=now.isoformat(),
        version="1.0.0",
        uptime_seconds=(now - start_time).total_seconds(),
    )


@app.get("/status", response_model=DetailedStatus)
async def status():
    """Detailed status with provider checks.

    This endpoint is heavier than /health -- it actually tests
    downstream dependencies. Don't hit this every second from your
    load balancer; use /health for that. Reserve /status for
    dashboards and on-call debugging.
    """
    providers = {}

    # Check OpenAI -- a lightweight API call that validates credentials
    # and connectivity without burning completion tokens
    try:
        await client.models.list()
        providers["openai"] = "healthy"
    except Exception:
        providers["openai"] = "unhealthy"

    # System metrics
    metrics = {
        "cpu_percent": psutil.cpu_percent(),
        "memory_percent": psutil.virtual_memory().percent,
        # Count only jobs still in flight, not every job ever stored
        "active_requests": sum(
            1 for job in jobs.values()
            if job["status"] in ("pending", "processing")
        ),
    }

    overall = "healthy" if all(
        s == "healthy" for s in providers.values()
    ) else "degraded"

    return DetailedStatus(status=overall, providers=providers, metrics=metrics)


@app.get("/ready")
async def ready():
    """Kubernetes readiness probe"""
    # Check if service is ready to receive traffic
    return {"ready": True}


@app.get("/live")
async def live():
    """Kubernetes liveness probe"""
    # Check if service is alive
    return {"live": True}
A consistent response envelope makes life dramatically easier for API consumers. Every response should follow the same shape: a success flag, data, an optional error, and metadata. This means clients don’t need to guess whether a 200 response contains result, data, output, or response — it’s always in the same place.
from typing import Generic, List, Optional, TypeVar

from pydantic import BaseModel

# Reuses `app` and the `APIError` model from the validation example.

T = TypeVar("T")


class APIResponse(BaseModel, Generic[T]):
    """Standard API response wrapper.

    Every endpoint returns this shape. Clients can write one generic
    handler rather than per-endpoint parsing logic. The 'meta' field
    carries pagination, rate limit info, or any other context that
    isn't the primary data.
    """
    success: bool
    data: Optional[T] = None
    error: Optional[APIError] = None
    meta: Optional[dict] = None


class PaginatedResponse(BaseModel, Generic[T]):
    """Paginated response"""
    items: List[T]
    total: int
    page: int
    page_size: int
    has_more: bool


# Usage
@app.get("/v1/models", response_model=APIResponse[List[str]])
async def list_models():
    models = ["gpt-4o", "gpt-4o-mini", "claude-3-5-sonnet"]
    return APIResponse(
        success=True,
        data=models,
        meta={"count": len(models)},
    )
Why would you choose streaming SSE over WebSockets for an LLM API, and when might WebSockets be the better choice?
What interviewers are testing: Whether you understand the tradeoffs between unidirectional and bidirectional real-time protocols in the specific context of LLM inference.
Strong answer: SSE is the right default for LLM APIs because the communication pattern is inherently unidirectional during generation: the server streams tokens to the client. SSE works over standard HTTP, plays well with existing load balancers and CDNs, and needs no special client library, since the browser's EventSource API handles parsing and automatic reconnection natively. One caveat: EventSource only issues GET requests, so a POST streaming endpoint is usually consumed with fetch and a streamed response body instead. WebSockets add bidirectional capability you rarely need during token streaming, and they require sticky sessions or special load balancer configuration.
Choose WebSockets when you need the client to send signals mid-stream, for example a "stop generating" button, real-time editing where the user is typing while the model is responding, or multi-turn conversational interfaces where the latency of establishing new connections matters. The tradeoff is operational complexity: WebSocket connections are stateful, harder to load-balance, and don't work through all corporate proxies.
The key insight is that SSE delivers most of the UX benefit at a fraction of the operational cost. Most production LLM APIs (OpenAI, Anthropic, Google) chose SSE for exactly this reason.
How would you design rate limiting for a multi-tenant LLM API where different customers have different pricing tiers?
What interviewers are testing: Systems design thinking around fairness, cost management, and graceful degradation under load.
Strong answer: You need a multi-dimensional rate limiting strategy. First, define limits per tier across three axes: requests per minute (RPM), tokens per minute (TPM), and concurrent requests. A free tier might get 20 RPM / 40K TPM / 3 concurrent; an enterprise tier might get 500 RPM / 2M TPM / 50 concurrent.
Implementation-wise, use a distributed rate limiter backed by Redis (not in-memory, since you're running multiple API server instances). The token bucket algorithm is ideal here because it allows natural bursting while enforcing average rates. You need two token buckets per customer: one for request count, one for token count.
The subtle part is token estimation: you don't know the output token count until after generation. Solution: estimate input tokens before the call (using tiktoken or a character heuristic), reserve a buffer for expected output, then reconcile after completion. If the actual usage exceeds the reservation, deduct from the next window rather than retroactively rejecting a completed request.
Return X-RateLimit-Limit, X-RateLimit-Remaining, and X-RateLimit-Reset headers on every response so clients can implement client-side throttling proactively. Return 429 with a Retry-After header when limits are exceeded.
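The reserve-then-reconcile step can be sketched as follows. This is an illustrative single-process version, assuming a character heuristic of roughly four characters per token (consistent with the 100K chars to 25K tokens estimate used earlier). The class `TokenBudget` and its method names are hypothetical, and a real deployment would hold this state in Redis and update it atomically.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class TokenBudget:
    """Per-customer token budget for the current window (hypothetical helper)."""
    limit: int
    used: int = 0
    carry_over: int = 0  # overage to charge against the next window
    reservations: Dict[str, int] = field(default_factory=dict)

    def reserve(self, request_id: str, prompt_chars: int, max_output_tokens: int) -> bool:
        """Reserve estimated input tokens plus the full output allowance."""
        estimate = prompt_chars // 4 + max_output_tokens  # ~4 chars per token (assumption)
        if self.used + estimate > self.limit:
            return False
        self.used += estimate
        self.reservations[request_id] = estimate
        return True

    def reconcile(self, request_id: str, actual_tokens: int) -> None:
        """Replace the reservation with actual usage once the call completes."""
        reserved = self.reservations.pop(request_id, 0)
        self.used = max(0, self.used + actual_tokens - reserved)
        if self.used > self.limit:
            # The request already completed, so push the excess to the
            # next window instead of rejecting it retroactively.
            self.carry_over += self.used - self.limit
            self.used = self.limit

    def start_new_window(self) -> None:
        """Reset usage, pre-charging any overage carried from the last window."""
        self.used = min(self.carry_over, self.limit)
        self.carry_over -= self.used
```

Reserving the full `max_output_tokens` is deliberately conservative; reconciling against the provider's reported usage returns the unused part of the reservation to the customer.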
Your LLM API is experiencing 5-second cold starts when a new model version is deployed. How do you handle this without impacting users?
What interviewers are testing: Understanding of deployment strategies, graceful degradation, and the specific challenges of deploying ML model serving infrastructure.
Strong answer: This is a blue-green or canary deployment problem with the added wrinkle that "warming up" an LLM serving instance takes meaningful time. The approach depends on your infrastructure:
If you're calling an external provider (OpenAI, Anthropic), you don't control cold starts, but you control fallback. Implement a provider fallback chain: if the primary returns errors or latency exceeds a threshold, route to a secondary provider. This requires abstracting your provider interface so the caller doesn't know which model is responding.
If you're self-hosting models, use rolling deployments with health checks that include a "warmup" phase. The readiness probe shouldn't return healthy until the model has completed at least one inference request. Keep the old version's pods running until the new version is fully warm. Use Kubernetes' maxUnavailable: 0 and maxSurge to ensure you always have capacity.
For the API layer itself, implement a circuit breaker pattern: if error rates on the new version exceed a threshold within the first few minutes, automatically roll back traffic to the previous version. Track latency percentiles (p50, p99), not just averages, because cold starts affect tail latency disproportionately.
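A circuit breaker can be sketched in a few lines. The version below is a simplification of what the answer describes: it trips on consecutive failures rather than on an error rate, and it only decides whether a request may go to the guarded backend; the routing or rollback itself is left to the caller. The class name, thresholds, and injectable `clock` are hypothetical.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after N consecutive failures, then allows a trial request after a cool-down."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock                      # injectable so tests can control time
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        """True if the guarded backend may be called right now."""
        if self._opened_at is None:
            return True
        # Half-open: once the cool-down has elapsed, let trial requests through
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        """A success closes the circuit and clears the failure count."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure; open (or re-open) the circuit at the threshold."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

When `allow_request()` returns False, the caller would route to the previous version or a secondary provider instead of failing the request.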
How do you handle idempotency in an async LLM job API where network failures can cause duplicate submissions?
What interviewers are testing: Distributed systems fundamentals applied to an AI-specific context, especially around cost and correctness.
Strong answer: Idempotency in LLM APIs is critical because duplicate processing means duplicate cost, and with expensive models a retry storm can burn thousands of dollars in minutes.
The standard pattern is client-supplied idempotency keys. The client generates a unique key (UUID) per logical operation and includes it in the request header (e.g., Idempotency-Key: abc-123). On the server side, before processing a job, check if that key already exists in your store. If it does, return the existing result without reprocessing.
Implementation details that matter: store the idempotency key with its result in Redis or a database with a TTL (typically 24-48 hours). The key must be stored before processing begins, not after; otherwise, two concurrent identical requests will both start processing. Use an atomic "set if not exists" operation. If the job is still processing when a duplicate arrives, return the job status (pending/processing) rather than starting a new one.
The edge case people miss: what if the initial request fails halfway through? You need to distinguish between "request received but processing failed" (should retry) and "request completed" (return cached result). Store the idempotency record with a status field, and only return cached results for completed successes.
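The claim-before-processing flow can be sketched with an in-memory store. This is a stand-in for illustration only: a lock-protected dict plays the role of an atomic set-if-not-exists operation (in Redis, `SET key value NX EX ttl`), the TTL is omitted, and the class and method names are hypothetical.

```python
import threading
from typing import Any, Dict, Tuple


class IdempotencyStore:
    """In-memory stand-in for an atomic set-if-not-exists store (no TTL here)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: Dict[str, dict] = {}

    def begin(self, key: str) -> Tuple[bool, dict]:
        """Claim the key before processing starts.

        Returns (True, record) if the caller should process the request,
        or (False, record) if a duplicate should get the existing record.
        A previously failed attempt may be claimed again, so retries work.
        """
        with self._lock:
            record = self._records.get(key)
            if record is None or record["status"] == "failed":
                record = {"status": "processing", "result": None}
                self._records[key] = record
                return True, dict(record)
            return False, dict(record)

    def complete(self, key: str, result: Any) -> None:
        """Store the successful result so duplicates get the cached response."""
        with self._lock:
            self._records[key] = {"status": "completed", "result": result}

    def fail(self, key: str) -> None:
        """Mark the attempt as failed so a later retry is processed again."""
        with self._lock:
            self._records[key] = {"status": "failed", "result": None}
```

A job endpoint would call `begin()` with the `Idempotency-Key` header value, start the job only when the first element is True, and otherwise return the existing record's status or cached result.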
Walk through how you would design the error handling strategy for a production LLM API that wraps multiple model providers.
What interviewers are testing: Production engineering maturity, specifically around failure modes unique to LLM inference.
Strong answer: LLM APIs have failure modes that traditional APIs don't: content filtering rejections, context length exceeded, model overloaded (429s from the provider), malformed model output, and timeouts due to variable generation time. Each requires different handling.
Layer 1 (input validation): Catch malformed requests, estimate token counts (reject prompts that would exceed the context window), and pre-screen for content policy violations before burning any API credits. Return 400 with specific error codes.
Layer 2 (provider errors): Wrap each provider call in retry-with-backoff for transient errors (429, 500, 503, network timeouts). Use exponential backoff with jitter to avoid a thundering herd. After max retries, attempt fallback to a secondary provider. Return 502 or 503 with the provider-specific error sanitized (never leak provider details to end users).
Layer 3 (output validation): The model might return empty content, malformed JSON (when you expected structured output), or content that violates your policies. Validate model output before returning it. If output is invalid, retry with the same prompt up to 2 times (LLM outputs are non-deterministic, so a retry often works), then return a graceful error.
Layer 4 (consistency): Use a standard error envelope across all error types. Every error response should include an error code (machine-readable), a message (human-readable), and a request ID (for support debugging). Map all internal errors to a finite set of external error codes so clients can handle them programmatically.
The key principle: fail fast on client errors (4xx), retry on server errors (5xx), and always prefer returning a degraded response over returning nothing.
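The retry step in Layer 2 can be sketched as a small async helper. This is a minimal illustration using "full jitter" (a random delay between zero and the exponential cap), which is one of several jitter strategies. The `TransientError` marker class, the function name, and the injectable `sleep` parameter are hypothetical; real code would map provider status codes such as 429 or 503 onto the retryable exception.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures (429, 500, 503, timeouts)."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry `call` on TransientError with exponential backoff and full jitter.

    Any other exception propagates immediately, which matches the
    fail-fast rule for client errors.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await call()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of retries; the caller can try a fallback provider
            # Exponential cap, then a random delay below it so that many
            # clients retrying at once do not synchronize.
            cap = min(max_delay, base_delay * 2 ** attempt)
            await sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")
```

The caller would wrap a single provider call, for example `await retry_with_backoff(lambda: call_provider(prompt))` where `call_provider` is a placeholder, and attempt the secondary provider when `TransientError` finally propagates.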