December 2025 Update: Covers serverless AI, GPU inference, caching strategies, rate limiting, and production infrastructure patterns.

The Production Gap

Building an AI demo takes hours. Making it production-ready takes weeks. This module covers:
  • Reliability (error handling, retries, fallbacks)
  • Performance (caching, batching, async)
  • Cost (model routing, token optimization)
  • Scaling (rate limits, queues, load balancing)
Reality Check: By most industry estimates, the large majority of AI projects never reach production. The difference is usually infrastructure, not models.

Production Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                          LOAD BALANCER                               │
│                    (CloudFlare, AWS ALB, nginx)                     │
└─────────────────────────────────────────────────────────────────────┘

                    ┌───────────────┼───────────────┐
                    ▼               ▼               ▼
            ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
            │  API Pod 1  │ │  API Pod 2  │ │  API Pod N  │
            └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
                   │               │               │
                   └───────────────┼───────────────┘

            ┌─────────────────────────────────────────────────┐
            │              RATE LIMITER / QUEUE               │
            │                    (Redis)                      │
            └─────────────────────────────────────────────────┘

                   ┌───────────────┼───────────────┐
                   ▼               ▼               ▼
            ┌───────────┐   ┌───────────┐   ┌───────────┐
            │   Cache   │   │  Model    │   │ Fallback  │
            │  (Redis)  │   │  Router   │   │  Queue    │
            └───────────┘   └─────┬─────┘   └───────────┘

                    ┌─────────────┼─────────────┐
                    ▼             ▼             ▼
             ┌──────────┐  ┌──────────┐  ┌──────────┐
             │  OpenAI  │  │ Anthropic│  │  Local   │
             │   API    │  │   API    │  │  (Ollama)│
             └──────────┘  └──────────┘  └──────────┘
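
A minimal sketch of how a single request flows through these layers, assuming the limiter, cache, router, and llm_client objects that the rest of this module builds (the fallback queue is omitted for brevity):

async def handle_request(user_id: str, prompt: str) -> str:
    # 1. Rate limit per user before doing any expensive work
    result = limiter.check(user_id)
    if not result.allowed:
        raise RuntimeError(f"Rate limited, retry in {result.retry_after:.1f}s")

    # 2. Serve from cache when a semantically similar prompt was seen recently
    cached = await cache.get(prompt)
    if cached:
        return cached

    # 3. Route to the cheapest model that can handle the task
    model = router.select_model(prompt)

    # 4. Call the provider with retries and fallbacks, then cache the result
    response = await llm_client.complete(
        [{"role": "user", "content": prompt}], model=model
    )
    await cache.set(prompt, response)
    return response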

Error Handling & Retries

Robust API Wrapper

import asyncio
from openai import AsyncOpenAI, APIError, RateLimitError, APIConnectionError
from tenacity import (
    retry, 
    stop_after_attempt, 
    wait_exponential,
    retry_if_exception_type
)
import logging

logger = logging.getLogger(__name__)

class RobustLLMClient:
    """Production-grade LLM client with retries and fallbacks"""
    
    def __init__(self):
        self.openai = AsyncOpenAI()
        self.fallback_models = [
            "gpt-4o",
            "gpt-4o-mini",
            "gpt-3.5-turbo",
        ]
    
    @retry(
        retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        reraise=True,  # re-raise the original exception so the fallback loop can handle it
        before_sleep=lambda retry_state: logger.warning(
            f"Retry {retry_state.attempt_number} after {retry_state.outcome.exception()}"
        )
    )
    async def _call_with_retry(self, model: str, messages: list, **kwargs):
        """Single model call with retries"""
        return await self.openai.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
    
    async def complete(
        self, 
        messages: list,
        model: str = "gpt-4o",
        timeout: float = 30.0,
        **kwargs
    ) -> str:
        """Complete with automatic fallback"""
        
        models_to_try = [model] + [m for m in self.fallback_models if m != model]
        last_error = None
        
        for try_model in models_to_try:
            try:
                response = await asyncio.wait_for(
                    self._call_with_retry(try_model, messages, **kwargs),
                    timeout=timeout
                )
                
                if try_model != model:
                    logger.info(f"Succeeded with fallback model: {try_model}")
                
                return response.choices[0].message.content
                
            except asyncio.TimeoutError:
                logger.warning(f"Timeout with {try_model}")
                last_error = TimeoutError(f"Timeout after {timeout}s")
            except RateLimitError as e:
                logger.warning(f"Rate limited on {try_model}: {e}")
                last_error = e
            except APIError as e:
                logger.error(f"API error on {try_model}: {e}")
                last_error = e
        
        raise last_error or RuntimeError("All models failed")
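
Calling the client then looks like any other awaitable. A short usage example (the summarize helper is just an illustration; messages follow the OpenAI chat format):

llm_client = RobustLLMClient()

async def summarize(text: str) -> str:
    return await llm_client.complete(
        [{"role": "user", "content": f"Summarize:\n\n{text}"}],
        model="gpt-4o-mini",
        timeout=20.0,
    )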

Circuit Breaker Pattern

from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: int = 60  # seconds
    
    def __post_init__(self):
        self.failures = 0
        self.last_failure_time: datetime | None = None
        self.state = CircuitState.CLOSED
    
    def record_success(self):
        self.failures = 0
        self.state = CircuitState.CLOSED
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.now()
        
        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
    
    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        
        # HALF_OPEN: allow one request to test
        return True

# Usage with LLM
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

async def protected_llm_call(prompt: str):
    if not circuit.can_execute():
        raise RuntimeError("Circuit breaker is OPEN - service unavailable")
    
    try:
        result = await llm_client.complete([{"role": "user", "content": prompt}])
        circuit.record_success()
        return result
    except Exception as e:
        circuit.record_failure()
        raise

Caching Strategies

Semantic Cache

import hashlib
import json
from datetime import datetime, timedelta
import redis
import numpy as np
from openai import OpenAI

class SemanticCache:
    """Cache LLM responses with semantic similarity matching"""
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        similarity_threshold: float = 0.95,
        ttl_seconds: int = 3600
    ):
        self.redis = redis.from_url(redis_url)
        self.similarity_threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.openai = OpenAI()
    
    def _get_embedding(self, text: str) -> list[float]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        a, b = np.array(a), np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _hash_key(self, text: str) -> str:
        return f"llm_cache:{hashlib.sha256(text.encode()).hexdigest()[:16]}"
    
    async def get(self, prompt: str) -> str | None:
        """Try to get cached response"""
        
        # First try exact match
        exact_key = self._hash_key(prompt)
        cached = self.redis.get(exact_key)
        if cached:
            return json.loads(cached)["response"]
        
        # Try semantic match
        prompt_embedding = self._get_embedding(prompt)
        
        # Scan recent cache entries (in production, use vector DB)
        for key in self.redis.scan_iter("llm_cache:*"):
            raw = self.redis.get(key)
            if raw is None:  # entry may have expired between scan and get
                continue
            cached = json.loads(raw)
            if "embedding" in cached:
                similarity = self._cosine_similarity(
                    prompt_embedding, 
                    cached["embedding"]
                )
                if similarity >= self.similarity_threshold:
                    return cached["response"]
        
        return None
    
    async def set(self, prompt: str, response: str):
        """Cache a response"""
        key = self._hash_key(prompt)
        embedding = self._get_embedding(prompt)
        
        self.redis.setex(
            key,
            self.ttl,
            json.dumps({
                "prompt": prompt,
                "response": response,
                "embedding": embedding,
                "cached_at": datetime.now().isoformat()
            })
        )

# Usage
cache = SemanticCache()

async def cached_llm_call(prompt: str) -> str:
    # Try cache first
    cached = await cache.get(prompt)
    if cached:
        logger.info("Cache hit!")
        return cached
    
    # Call LLM
    response = await llm_client.complete([{"role": "user", "content": prompt}])
    
    # Cache the response
    await cache.set(prompt, response)
    
    return response

Response Streaming with Cache

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def stream_with_cache(prompt: str, cache: SemanticCache):
    """Stream response while building cache"""
    
    # Check cache first
    cached = await cache.get(prompt)
    if cached:
        # Stream from cache
        for chunk in cached.split():
            yield f"data: {chunk} \n\n"
            await asyncio.sleep(0.02)  # Simulate streaming
        return
    
    # Stream from LLM and collect the chunks (assumes llm_client also exposes
    # an async streaming generator, e.g. wrapping the provider's stream=True)
    full_response = []
    
    async for chunk in llm_client.stream(prompt):
        full_response.append(chunk)
        yield f"data: {chunk}\n\n"
    
    # Cache complete response
    await cache.set(prompt, "".join(full_response))

@app.post("/chat/stream")
async def chat_stream(request: dict):
    return StreamingResponse(
        stream_with_cache(request["prompt"], cache),
        media_type="text/event-stream"
    )

Rate Limiting

Token Bucket Rate Limiter

import time
from dataclasses import dataclass
import redis

@dataclass
class RateLimitResult:
    allowed: bool
    remaining: int
    reset_at: float
    retry_after: float = 0

class TokenBucketLimiter:
    """Token bucket rate limiter with Redis backend"""
    
    def __init__(
        self,
        redis_url: str,
        tokens_per_minute: int = 60,
        bucket_size: int = 100
    ):
        self.redis = redis.from_url(redis_url)
        self.rate = tokens_per_minute / 60  # tokens per second
        self.bucket_size = bucket_size
    
    def check(self, key: str, tokens: int = 1) -> RateLimitResult:
        """Check if request is allowed and consume tokens"""
        
        now = time.time()
        bucket_key = f"ratelimit:{key}"
        
        # Get current bucket state (read-modify-write; for strict atomicity
        # under concurrent requests, move this logic into a Lua script)
        result = self.redis.hgetall(bucket_key)
        
        if result:
            tokens_available = float(result.get(b"tokens", self.bucket_size))
            last_update = float(result.get(b"last_update", now))
        else:
            tokens_available = self.bucket_size
            last_update = now
        
        # Refill tokens based on time passed
        time_passed = now - last_update
        tokens_available = min(
            self.bucket_size,
            tokens_available + (time_passed * self.rate)
        )
        
        # Check if we have enough tokens
        if tokens_available >= tokens:
            # Consume tokens
            tokens_available -= tokens
            self.redis.hset(bucket_key, mapping={
                "tokens": tokens_available,
                "last_update": now
            })
            self.redis.expire(bucket_key, 120)  # Cleanup old keys
            
            return RateLimitResult(
                allowed=True,
                remaining=int(tokens_available),
                reset_at=now + (self.bucket_size - tokens_available) / self.rate
            )
        else:
            # Calculate wait time
            tokens_needed = tokens - tokens_available
            wait_time = tokens_needed / self.rate
            
            return RateLimitResult(
                allowed=False,
                remaining=0,
                reset_at=now + wait_time,
                retry_after=wait_time
            )

# FastAPI middleware (attached to the app defined above)
from fastapi import Request
from fastapi.responses import JSONResponse

limiter = TokenBucketLimiter(
    redis_url="redis://localhost:6379",
    tokens_per_minute=100
)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Get user identifier (API key if present, otherwise client IP)
    user_id = request.headers.get("X-API-Key", request.client.host)
    
    result = limiter.check(user_id)
    
    if not result.allowed:
        # Return the 429 directly: exceptions raised inside middleware bypass
        # FastAPI's HTTPException handlers
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"},
            headers={
                "Retry-After": str(int(result.retry_after)),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(result.reset_at))
            }
        )
    
    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(result.remaining)
    return response

Model Router

Cost-Optimized Routing

from dataclasses import dataclass
from enum import Enum

from openai import OpenAI

class TaskComplexity(Enum):
    SIMPLE = "simple"       # Classification, extraction
    MODERATE = "moderate"   # Summarization, Q&A
    COMPLEX = "complex"     # Reasoning, coding
    CRITICAL = "critical"   # High-stakes decisions

@dataclass
class ModelConfig:
    name: str
    cost_per_1k_input: float
    cost_per_1k_output: float
    max_context: int
    speed: float  # tokens/second

MODELS = {
    "gpt-4o": ModelConfig("gpt-4o", 0.0025, 0.01, 128000, 100),
    "gpt-4o-mini": ModelConfig("gpt-4o-mini", 0.00015, 0.0006, 128000, 200),
    "claude-3-5-sonnet": ModelConfig("claude-3-5-sonnet", 0.003, 0.015, 200000, 80),
    "claude-3-5-haiku": ModelConfig("claude-3-5-haiku", 0.0008, 0.004, 200000, 150),
}

class ModelRouter:
    """Route requests to optimal model based on task"""
    
    def __init__(self, default_model: str = "gpt-4o-mini"):
        self.default = default_model
        self.classifier = OpenAI()
    
    def classify_task(self, prompt: str) -> TaskComplexity:
        """Use a cheap model to classify task complexity"""
        response = self.classifier.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""Classify this task's complexity:
                
Task: {prompt[:500]}

Options:
- simple: Basic extraction, classification, formatting
- moderate: Summarization, Q&A, translation
- complex: Multi-step reasoning, coding, analysis
- critical: High-stakes decisions, medical/legal advice

Reply with just the complexity level."""
            }],
            max_tokens=10
        )
        
        level = response.choices[0].message.content.strip().lower()
        return TaskComplexity(level) if level in [c.value for c in TaskComplexity] else TaskComplexity.MODERATE
    
    def select_model(
        self,
        prompt: str,
        priority: str = "balanced"  # cost, speed, quality
    ) -> str:
        """Select best model for the task"""
        
        complexity = self.classify_task(prompt)
        
        routing = {
            TaskComplexity.SIMPLE: {
                "cost": "gpt-4o-mini",
                "speed": "gpt-4o-mini",
                "quality": "gpt-4o-mini",
                "balanced": "gpt-4o-mini"
            },
            TaskComplexity.MODERATE: {
                "cost": "gpt-4o-mini",
                "speed": "claude-3-5-haiku",
                "quality": "gpt-4o",
                "balanced": "gpt-4o-mini"
            },
            TaskComplexity.COMPLEX: {
                "cost": "gpt-4o-mini",
                "speed": "gpt-4o",
                "quality": "gpt-4o",
                "balanced": "gpt-4o"
            },
            TaskComplexity.CRITICAL: {
                "cost": "gpt-4o",
                "speed": "gpt-4o",
                "quality": "gpt-4o",
                "balanced": "gpt-4o"
            }
        }
        
        return routing[complexity][priority]

# Usage
router = ModelRouter()

async def smart_complete(prompt: str, priority: str = "balanced"):
    model = router.select_model(prompt, priority)
    logger.info(f"Routing to {model}")
    return await llm_client.complete(
        [{"role": "user", "content": prompt}], model=model
    )
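
The pricing fields in ModelConfig also make per-request cost estimates easy to reason about. A small illustrative helper (the token counts below are made-up examples; in practice they come from the API response's usage field):

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate request cost in USD from the MODELS pricing table."""
    config = MODELS[model]
    return (
        input_tokens / 1000 * config.cost_per_1k_input
        + output_tokens / 1000 * config.cost_per_1k_output
    )

# Example: a 2,000-token prompt with a 500-token answer
print(estimate_cost("gpt-4o", 2000, 500))       # 0.0100
print(estimate_cost("gpt-4o-mini", 2000, 500))  # 0.0006 (~17x cheaper)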

Deployment Options

Serverless (AWS Lambda)

# AWS Lambda with FastAPI (via the Mangum ASGI adapter)
from mangum import Mangum
from fastapi import FastAPI

app = FastAPI()

@app.post("/chat")
async def chat(request: dict):
    return await llm_client.complete(request["prompt"])

# Handler for AWS Lambda
handler = Mangum(app)

Docker Deployment

# Dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    deploy:
      replicas: 3
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  redis_data:

Kubernetes

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-api
  template:
    metadata:
      labels:
        app: ai-api
    spec:
      containers:
      - name: ai-api
        image: your-registry/ai-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-secrets
              key: openai-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ai-api
spec:
  selector:
    app: ai-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

Key Takeaways

Retry Everything

LLM APIs fail. Build retries, fallbacks, and circuit breakers.

Cache Aggressively

Semantic caching can substantially reduce costs when many queries are repeated or similar.

Route Smart

Use cheap models for simple tasks and reserve expensive models for complex, high-stakes work.

Monitor Everything

Track latency, costs, errors. You can’t optimize what you don’t measure.
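
A lightweight decorator is enough to get started. A minimal sketch, assuming the wrapped coroutine returns a raw chat-completion response with a usage field and that the model appears in the MODELS pricing table above (swap the logger for your metrics backend):

import time
from functools import wraps

def track_llm_metrics(model: str):
    """Log latency, token usage, and estimated cost for an LLM call."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                response = await func(*args, **kwargs)
                usage = getattr(response, "usage", None)
                if usage:
                    cfg = MODELS[model]
                    cost = (
                        usage.prompt_tokens / 1000 * cfg.cost_per_1k_input
                        + usage.completion_tokens / 1000 * cfg.cost_per_1k_output
                    )
                    logger.info(
                        f"{model}: {time.perf_counter() - start:.2f}s, "
                        f"{usage.total_tokens} tokens, ~${cost:.4f}"
                    )
                return response
            except Exception as e:
                logger.error(
                    f"{model} failed after {time.perf_counter() - start:.2f}s: {e}"
                )
                raise
        return wrapper
    return decorator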

What’s Next

Capstone Project

Build a complete production AI application from scratch