December 2025 Update: Covers reliability patterns, caching strategies, rate limiting, model routing, and deployment options from serverless to Kubernetes.
The Production Gap
Building an AI demo takes hours. Making it production-ready takes weeks. This module covers:
- Reliability (error handling, retries, fallbacks)
- Performance (caching, batching, async)
- Cost (model routing, token optimization)
- Scaling (rate limits, queues, load balancing)
Reality Check: 90% of AI projects fail to reach production. The difference is infrastructure, not models.
Production Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ LOAD BALANCER │
│ (CloudFlare, AWS ALB, nginx) │
└─────────────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ API Pod 1 │ │ API Pod 2 │ │ API Pod N │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────┼───────────────┘
▼
┌─────────────────────────────────────────────────┐
│ RATE LIMITER / QUEUE │
│ (Redis) │
└─────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Cache │ │ Model │ │ Fallback │
│ (Redis) │ │ Router │ │ Queue │
└───────────┘ └─────┬─────┘ └───────────┘
│
┌─────────────┼─────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ OpenAI │ │ Anthropic│ │ Local │
│ API │ │ API │ │ (Ollama)│
└──────────┘ └──────────┘ └──────────┘
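Tying the layers together, the request path can be sketched as one thin orchestration function. The cache, limiter, router, and llm_client objects below are the components built in the rest of this module, so treat this as a preview of how they compose rather than standalone runnable code.

# Sketch of the request path implied by the diagram above.
# cache, limiter, router, and llm_client are the components
# defined in the sections that follow (assumed interfaces).
async def handle_request(user_id: str, prompt: str) -> str:
    # 1. Rate limit per user before spending any tokens
    result = limiter.check(user_id)
    if not result.allowed:
        raise RuntimeError(f"Rate limited, retry in {result.retry_after:.1f}s")

    # 2. Serve from cache when a semantically similar prompt was seen
    cached = await cache.get(prompt)
    if cached:
        return cached

    # 3. Pick a model based on task complexity and priority
    model = router.select_model(prompt, priority="balanced")

    # 4. Call the provider with retries/fallbacks, then cache the result
    response = await llm_client.complete(
        [{"role": "user", "content": prompt}], model=model
    )
    await cache.set(prompt, response)
    return response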
Error Handling & Retries
Robust API Wrapper
import asyncio
from openai import AsyncOpenAI, APIError, RateLimitError, APIConnectionError
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import logging
logger = logging.getLogger(__name__)
class RobustLLMClient:
"""Production-grade LLM client with retries and fallbacks"""
def __init__(self):
        self.openai = AsyncOpenAI()  # async client so timeouts and retries don't block the event loop
self.fallback_models = [
"gpt-4o",
"gpt-4o-mini",
"gpt-3.5-turbo",
]
@retry(
retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=60),
before_sleep=lambda retry_state: logger.warning(
f"Retry {retry_state.attempt_number} after {retry_state.outcome.exception()}"
)
)
async def _call_with_retry(self, model: str, messages: list, **kwargs):
"""Single model call with retries"""
        return await self.openai.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
async def complete(
self,
messages: list,
model: str = "gpt-4o",
timeout: float = 30.0,
**kwargs
) -> str:
"""Complete with automatic fallback"""
models_to_try = [model] + [m for m in self.fallback_models if m != model]
last_error = None
for try_model in models_to_try:
try:
response = await asyncio.wait_for(
self._call_with_retry(try_model, messages, **kwargs),
timeout=timeout
)
if try_model != model:
logger.info(f"Succeeded with fallback model: {try_model}")
return response.choices[0].message.content
except asyncio.TimeoutError:
logger.warning(f"Timeout with {try_model}")
last_error = TimeoutError(f"Timeout after {timeout}s")
except RateLimitError as e:
logger.warning(f"Rate limited on {try_model}: {e}")
last_error = e
except APIError as e:
logger.error(f"API error on {try_model}: {e}")
last_error = e
raise last_error or RuntimeError("All models failed")
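Usage is a one-liner once the client is instantiated; the later snippets in this module assume an llm_client instance like this one (OPENAI_API_KEY must be set in the environment).

# Minimal usage of RobustLLMClient
llm_client = RobustLLMClient()

async def summarize(text: str) -> str:
    return await llm_client.complete(
        [{"role": "user", "content": f"Summarize in one sentence:\n{text}"}],
        model="gpt-4o-mini",
        timeout=20.0,
    )

# asyncio.run(summarize("..."))  # run from a script or test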
Circuit Breaker Pattern
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: int = 60 # seconds
def __post_init__(self):
self.failures = 0
        self.last_failure_time: datetime | None = None
self.state = CircuitState.CLOSED
def record_success(self):
self.failures = 0
self.state = CircuitState.CLOSED
def record_failure(self):
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = CircuitState.OPEN
def can_execute(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
return True
return False
# HALF_OPEN: allow one request to test
return True
# Usage with LLM
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
async def protected_llm_call(prompt: str):
if not circuit.can_execute():
raise RuntimeError("Circuit breaker is OPEN - service unavailable")
try:
        result = await llm_client.complete([{"role": "user", "content": prompt}])
circuit.record_success()
return result
except Exception as e:
circuit.record_failure()
raise
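In practice you usually want one breaker per upstream provider, so a single failing API does not block traffic to the healthy ones. A small sketch of that pattern (the provider names are illustrative):

# One circuit breaker per provider so a single failing API
# does not block traffic to the healthy ones.
breakers = {
    "openai": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
    "anthropic": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
}

def available_providers() -> list[str]:
    """Providers whose breaker currently allows traffic."""
    return [name for name, cb in breakers.items() if cb.can_execute()]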
Caching Strategies
Semantic Cache
import hashlib
import json
from datetime import datetime, timedelta
import redis
import numpy as np
from openai import OpenAI
class SemanticCache:
"""Cache LLM responses with semantic similarity matching"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600
):
self.redis = redis.from_url(redis_url)
self.similarity_threshold = similarity_threshold
self.ttl = ttl_seconds
self.openai = OpenAI()
def _get_embedding(self, text: str) -> list[float]:
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
a, b = np.array(a), np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def _hash_key(self, text: str) -> str:
return f"llm_cache:{hashlib.sha256(text.encode()).hexdigest()[:16]}"
async def get(self, prompt: str) -> str | None:
"""Try to get cached response"""
# First try exact match
exact_key = self._hash_key(prompt)
cached = self.redis.get(exact_key)
if cached:
return json.loads(cached)["response"]
# Try semantic match
prompt_embedding = self._get_embedding(prompt)
# Scan recent cache entries (in production, use vector DB)
for key in self.redis.scan_iter("llm_cache:*"):
cached = json.loads(self.redis.get(key))
if "embedding" in cached:
similarity = self._cosine_similarity(
prompt_embedding,
cached["embedding"]
)
if similarity >= self.similarity_threshold:
return cached["response"]
return None
async def set(self, prompt: str, response: str):
"""Cache a response"""
key = self._hash_key(prompt)
embedding = self._get_embedding(prompt)
self.redis.setex(
key,
self.ttl,
json.dumps({
"prompt": prompt,
"response": response,
"embedding": embedding,
"cached_at": datetime.now().isoformat()
})
)
# Usage
cache = SemanticCache()
async def cached_llm_call(prompt: str) -> str:
# Try cache first
cached = await cache.get(prompt)
if cached:
logger.info("Cache hit!")
return cached
# Call LLM
    response = await llm_client.complete([{"role": "user", "content": prompt}])
# Cache the response
await cache.set(prompt, response)
return response
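A quick way to sanity-check the cache is to issue two near-duplicate prompts and confirm that only one LLM call goes out (the prompts are illustrative):

# Sanity check: the second, paraphrased prompt should be a cache hit
async def demo_cache():
    a = await cached_llm_call("What is the capital of France?")
    b = await cached_llm_call("What's the capital city of France?")
    # With similarity_threshold=0.95 the second call is usually served
    # from cache; lower the threshold if paraphrases miss too often.
    return a, b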
Response Streaming with Cache
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import hashlib
app = FastAPI()
async def stream_with_cache(prompt: str, cache: SemanticCache):
"""Stream response while building cache"""
# Check cache first
cached = await cache.get(prompt)
if cached:
# Stream from cache
for chunk in cached.split():
yield f"data: {chunk} \n\n"
await asyncio.sleep(0.02) # Simulate streaming
return
# Stream from LLM and collect
full_response = []
async for chunk in llm_client.stream(prompt):
full_response.append(chunk)
yield f"data: {chunk}\n\n"
# Cache complete response
await cache.set(prompt, "".join(full_response))
@app.post("/chat/stream")
async def chat_stream(request: dict):
return StreamingResponse(
stream_with_cache(request["prompt"], cache),
media_type="text/event-stream"
)
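On the client side, any HTTP library that supports streaming can consume these events. A sketch with httpx, assuming the service runs on localhost:8000:

import httpx

def consume_stream(prompt: str, url: str = "http://localhost:8000/chat/stream"):
    """Read the server-sent events emitted by /chat/stream."""
    with httpx.stream("POST", url, json={"prompt": prompt}, timeout=60.0) as resp:
        for line in resp.iter_lines():
            if line.startswith("data: "):
                print(line[len("data: "):], end="", flush=True)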
Rate Limiting
Token Bucket Rate Limiter
import time
from dataclasses import dataclass
import redis
@dataclass
class RateLimitResult:
allowed: bool
remaining: int
reset_at: float
retry_after: float = 0
class TokenBucketLimiter:
"""Token bucket rate limiter with Redis backend"""
def __init__(
self,
redis_url: str,
tokens_per_minute: int = 60,
bucket_size: int = 100
):
self.redis = redis.from_url(redis_url)
self.rate = tokens_per_minute / 60 # tokens per second
self.bucket_size = bucket_size
def check(self, key: str, tokens: int = 1) -> RateLimitResult:
"""Check if request is allowed and consume tokens"""
now = time.time()
bucket_key = f"ratelimit:{key}"
# Get current bucket state
pipe = self.redis.pipeline()
pipe.hgetall(bucket_key)
result = pipe.execute()[0]
if result:
tokens_available = float(result.get(b"tokens", self.bucket_size))
last_update = float(result.get(b"last_update", now))
else:
tokens_available = self.bucket_size
last_update = now
# Refill tokens based on time passed
time_passed = now - last_update
tokens_available = min(
self.bucket_size,
tokens_available + (time_passed * self.rate)
)
# Check if we have enough tokens
if tokens_available >= tokens:
# Consume tokens
tokens_available -= tokens
self.redis.hset(bucket_key, mapping={
"tokens": tokens_available,
"last_update": now
})
self.redis.expire(bucket_key, 120) # Cleanup old keys
return RateLimitResult(
allowed=True,
remaining=int(tokens_available),
reset_at=now + (self.bucket_size - tokens_available) / self.rate
)
else:
# Calculate wait time
tokens_needed = tokens - tokens_available
wait_time = tokens_needed / self.rate
return RateLimitResult(
allowed=False,
remaining=0,
reset_at=now + wait_time,
retry_after=wait_time
)
# FastAPI middleware
from fastapi import Request
from fastapi.responses import JSONResponse
limiter = TokenBucketLimiter(
redis_url="redis://localhost:6379",
tokens_per_minute=100
)
async def rate_limit_middleware(request: Request, call_next):
# Get user identifier
user_id = request.headers.get("X-API-Key", request.client.host)
result = limiter.check(user_id)
    if not result.allowed:
        # Return the 429 directly: an HTTPException raised inside middleware
        # is not routed through FastAPI's exception handlers.
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"},
            headers={
                "Retry-After": str(int(result.retry_after)),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(result.reset_at))
            }
        )
response = await call_next(request)
response.headers["X-RateLimit-Remaining"] = str(result.remaining)
return response
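The same limiter also works outside HTTP middleware, for example in a background worker that waits for capacity instead of dropping work. A sketch reusing the limiter and the llm_client instance from earlier:

import asyncio

async def limited_call(user_id: str, prompt: str) -> str:
    """Wait for a token instead of failing when the bucket is empty."""
    while True:
        result = limiter.check(user_id)
        if result.allowed:
            return await llm_client.complete(
                [{"role": "user", "content": prompt}]
            )
        await asyncio.sleep(result.retry_after)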
Model Router
Cost-Optimized Routing
from dataclasses import dataclass
from enum import Enum
class TaskComplexity(Enum):
SIMPLE = "simple" # Classification, extraction
MODERATE = "moderate" # Summarization, Q&A
COMPLEX = "complex" # Reasoning, coding
CRITICAL = "critical" # High-stakes decisions
@dataclass
class ModelConfig:
name: str
cost_per_1k_input: float
cost_per_1k_output: float
max_context: int
speed: float # tokens/second
MODELS = {
"gpt-4o": ModelConfig("gpt-4o", 0.0025, 0.01, 128000, 100),
"gpt-4o-mini": ModelConfig("gpt-4o-mini", 0.00015, 0.0006, 128000, 200),
"claude-3-5-sonnet": ModelConfig("claude-3-5-sonnet", 0.003, 0.015, 200000, 80),
"claude-3-5-haiku": ModelConfig("claude-3-5-haiku", 0.0008, 0.004, 200000, 150),
}
class ModelRouter:
"""Route requests to optimal model based on task"""
def __init__(self, default_model: str = "gpt-4o-mini"):
self.default = default_model
self.classifier = OpenAI()
def classify_task(self, prompt: str) -> TaskComplexity:
"""Use a cheap model to classify task complexity"""
response = self.classifier.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"""Classify this task's complexity:
Task: {prompt[:500]}
Options:
- simple: Basic extraction, classification, formatting
- moderate: Summarization, Q&A, translation
- complex: Multi-step reasoning, coding, analysis
- critical: High-stakes decisions, medical/legal advice
Reply with just the complexity level."""
}],
max_tokens=10
)
level = response.choices[0].message.content.strip().lower()
return TaskComplexity(level) if level in [c.value for c in TaskComplexity] else TaskComplexity.MODERATE
def select_model(
self,
prompt: str,
priority: str = "balanced" # cost, speed, quality
) -> str:
"""Select best model for the task"""
complexity = self.classify_task(prompt)
routing = {
TaskComplexity.SIMPLE: {
"cost": "gpt-4o-mini",
"speed": "gpt-4o-mini",
"quality": "gpt-4o-mini",
"balanced": "gpt-4o-mini"
},
TaskComplexity.MODERATE: {
"cost": "gpt-4o-mini",
"speed": "claude-3-5-haiku",
"quality": "gpt-4o",
"balanced": "gpt-4o-mini"
},
TaskComplexity.COMPLEX: {
"cost": "gpt-4o-mini",
"speed": "gpt-4o",
"quality": "gpt-4o",
"balanced": "gpt-4o"
},
TaskComplexity.CRITICAL: {
"cost": "gpt-4o",
"speed": "gpt-4o",
"quality": "gpt-4o",
"balanced": "gpt-4o"
}
}
return routing[complexity][priority]
# Usage
router = ModelRouter()
async def smart_complete(prompt: str, priority: str = "balanced"):
model = router.select_model(prompt, priority)
logger.info(f"Routing to {model}")
    return await llm_client.complete([{"role": "user", "content": prompt}], model=model)
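The MODELS table also makes rough cost estimates easy, which is useful for logging spend per request. A sketch that assumes roughly four characters per token:

def estimate_cost(model: str, prompt: str, expected_output_tokens: int = 500) -> float:
    """Rough USD cost estimate using the MODELS table above.
    Uses ~4 characters per token as a crude approximation."""
    config = MODELS[model]
    input_tokens = len(prompt) / 4
    return (
        (input_tokens / 1000) * config.cost_per_1k_input
        + (expected_output_tokens / 1000) * config.cost_per_1k_output
    )

# estimate_cost("gpt-4o-mini", "Summarize this article...", expected_output_tokens=300)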
Deployment Options
Serverless (Recommended Start)
# Vercel / AWS Lambda with FastAPI
from mangum import Mangum
from fastapi import FastAPI
app = FastAPI()
@app.post("/chat")
async def chat(request: dict):
return await llm_client.complete(request["prompt"])
# Handler for AWS Lambda
handler = Mangum(app)
Docker Deployment
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
deploy:
replicas: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
redis_data:
Kubernetes
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-api
spec:
replicas: 3
selector:
matchLabels:
app: ai-api
template:
metadata:
labels:
app: ai-api
spec:
containers:
- name: ai-api
image: your-registry/ai-api:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-secrets
key: openai-key
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: ai-api
spec:
selector:
app: ai-api
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
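The liveness probe above expects a /health route that none of the earlier FastAPI snippets define; a minimal one:

# Minimal health endpoint for the liveness probe above
@app.get("/health")
async def health():
    return {"status": "ok"}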
Key Takeaways
Retry Everything
LLM APIs fail. Build retries, fallbacks, and circuit breakers.
Cache Aggressively
Semantic caching can reduce costs by 50%+ for common queries.
Route Smart
Use cheap models for simple tasks and reserve expensive models for complex ones.
Monitor Everything
Track latency, costs, errors. You can’t optimize what you don’t measure.
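A starting point is request-level logging in the API layer; a minimal sketch (the logged fields are illustrative):

import time
from fastapi import Request

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log per-request latency so slow LLM calls are visible."""
    start = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - start) * 1000
    logger.info(f"{request.method} {request.url.path} "
                f"status={response.status_code} latency_ms={elapsed_ms:.0f}")
    return response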
What’s Next
Capstone Project
Build a complete production AI application from scratch