Designing robust APIs for LLM applications requires careful consideration of latency, error handling, streaming, and scalability.

Core API Patterns

Synchronous Chat Endpoint

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import List, Optional
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class Message(BaseModel):
    role: str = Field(..., pattern="^(system|user|assistant)$")
    content: str = Field(..., min_length=1, max_length=100000)

class ChatRequest(BaseModel):
    messages: List[Message]
    model: str = "gpt-4o"
    temperature: float = Field(0.7, ge=0, le=2)
    max_tokens: int = Field(1000, ge=1, le=4096)
    user_id: Optional[str] = None

class ChatResponse(BaseModel):
    content: str
    model: str
    usage: dict
    finish_reason: str

@app.post("/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Synchronous chat completion endpoint"""
    
    try:
        response = await client.chat.completions.create(
            model=request.model,
            messages=[m.model_dump() for m in request.messages],
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        
        return ChatResponse(
            content=response.choices[0].message.content,
            model=response.model,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            finish_reason=response.choices[0].finish_reason
        )
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
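
A minimal client-side call against this endpoint might look like the sketch below. The base URL is an assumption for illustration; adjust it to wherever the service is deployed.

import asyncio
import httpx

async def call_chat() -> None:
    # Hypothetical local deployment of the service above
    async with httpx.AsyncClient(base_url="http://localhost:8000") as http:
        resp = await http.post(
            "/v1/chat",
            json={
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "Summarize SSE in one sentence."}
                ],
                "temperature": 0.2
            },
            timeout=60.0
        )
        resp.raise_for_status()
        body = resp.json()
        print(body["content"], body["usage"]["total_tokens"])

if __name__ == "__main__":
    asyncio.run(call_chat())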

Streaming Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import json

class StreamRequest(BaseModel):
    messages: List[Message]
    model: str = "gpt-4o"

async def stream_response(
    request: StreamRequest
) -> AsyncGenerator[str, None]:
    """Generate SSE stream"""
    
    try:
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[m.model_dump() for m in request.messages],
            stream=True
        )
        
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                data = {
                    "content": chunk.choices[0].delta.content,
                    "finish_reason": chunk.choices[0].finish_reason
                }
                yield f"data: {json.dumps(data)}\n\n"
        
        yield "data: [DONE]\n\n"
    
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/v1/chat/stream")
async def chat_stream(request: StreamRequest):
    """Server-Sent Events streaming endpoint"""
    
    return StreamingResponse(
        stream_response(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
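
On the client side, the SSE stream can be consumed line by line. This is a sketch using httpx; the base URL is again an assumption.

import asyncio
import json
import httpx

async def consume_stream() -> None:
    payload = {"messages": [{"role": "user", "content": "Tell me a short joke."}]}
    
    async with httpx.AsyncClient(base_url="http://localhost:8000", timeout=None) as http:
        async with http.stream("POST", "/v1/chat/stream", json=payload) as resp:
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                chunk = json.loads(data)
                if "error" in chunk:
                    raise RuntimeError(chunk["error"])
                print(chunk.get("content", ""), end="", flush=True)

if __name__ == "__main__":
    asyncio.run(consume_stream())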

Async Job Pattern

For long-running LLM tasks, return a job ID immediately and let the client poll for the result or receive a webhook:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import uuid
import asyncio
from datetime import datetime

# In-memory store (use Redis in production)
jobs = {}

class JobStatus(BaseModel):
    job_id: str
    status: str  # "pending", "processing", "completed", "failed", or "cancelled"
    created_at: str
    completed_at: Optional[str] = None
    result: Optional[dict] = None
    error: Optional[str] = None
    progress: float = 0.0

class AsyncJobRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o"
    webhook_url: Optional[str] = None

async def process_job(job_id: str, request: AsyncJobRequest):
    """Background job processor"""
    
    # Skip jobs that were cancelled before processing started
    if jobs[job_id]["status"] == "cancelled":
        return
    
    jobs[job_id]["status"] = "processing"
    
    try:
        response = await client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.prompt}]
        )
        
        jobs[job_id]["status"] = "completed"
        jobs[job_id]["completed_at"] = datetime.utcnow().isoformat()
        jobs[job_id]["result"] = {
            "content": response.choices[0].message.content,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        }
        jobs[job_id]["progress"] = 1.0
        
        # Send webhook if configured
        if request.webhook_url:
            await send_webhook(request.webhook_url, jobs[job_id])
    
    except Exception as e:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(e)
        jobs[job_id]["completed_at"] = datetime.utcnow().isoformat()

@app.post("/v1/jobs", response_model=JobStatus)
async def create_job(
    request: AsyncJobRequest,
    background_tasks: BackgroundTasks
):
    """Create async job"""
    
    job_id = str(uuid.uuid4())
    
    jobs[job_id] = {
        "job_id": job_id,
        "status": "pending",
        "created_at": datetime.utcnow().isoformat(),
        "completed_at": None,
        "result": None,
        "error": None,
        "progress": 0.0
    }
    
    background_tasks.add_task(process_job, job_id, request)
    
    return JobStatus(**jobs[job_id])

@app.get("/v1/jobs/{job_id}", response_model=JobStatus)
async def get_job(job_id: str):
    """Get job status"""
    
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    
    return JobStatus(**jobs[job_id])

@app.delete("/v1/jobs/{job_id}")
async def cancel_job(job_id: str):
    """Cancel job (if still pending)"""
    
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    
    if jobs[job_id]["status"] == "pending":
        jobs[job_id]["status"] = "cancelled"
        return {"message": "Job cancelled"}
    
    return {"message": "Job cannot be cancelled"}

Webhook Integration

import httpx
from typing import Optional
from pydantic import BaseModel, HttpUrl
import hmac
import hashlib

class WebhookConfig(BaseModel):
    url: HttpUrl
    secret: Optional[str] = None
    events: List[str] = ["job.completed", "job.failed"]

class WebhookSender:
    """Send webhook notifications"""
    
    def __init__(self, timeout: float = 30.0, max_retries: int = 3):
        self.timeout = timeout
        self.max_retries = max_retries
    
    def _sign_payload(self, payload: str, secret: str) -> str:
        """Create HMAC signature for webhook"""
        return hmac.new(
            secret.encode(),
            payload.encode(),
            hashlib.sha256
        ).hexdigest()
    
    async def send(
        self,
        url: str,
        event: str,
        data: dict,
        secret: Optional[str] = None
    ) -> bool:
        """Send webhook with retries"""
        
        payload = {
            "event": event,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        }
        
        headers = {
            "Content-Type": "application/json",
            "X-Webhook-Event": event
        }
        
        # Serialize once so the signature covers the exact bytes that are sent
        payload_str = json.dumps(payload)
        
        if secret:
            signature = self._sign_payload(payload_str, secret)
            headers["X-Webhook-Signature"] = f"sha256={signature}"
        
        async with httpx.AsyncClient() as http_client:
            for attempt in range(self.max_retries):
                try:
                    response = await http_client.post(
                        url,
                        content=payload_str,
                        headers=headers,
                        timeout=self.timeout
                    )
                    
                    if response.status_code < 300:
                        return True
                    
                except httpx.RequestError:
                    pass
                
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
        
        return False

webhook_sender = WebhookSender()

async def send_webhook(url: str, job_data: dict, secret: Optional[str] = None):
    """Send job completion webhook"""
    
    event = "job.completed" if job_data["status"] == "completed" else "job.failed"
    
    await webhook_sender.send(
        url=url,
        event=event,
        data=job_data,
        secret=secret
    )
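
On the receiving side, the consumer should verify the signature against the raw request body before trusting the payload. A minimal FastAPI receiver might look like this; the route path and shared secret are placeholders.

from fastapi import FastAPI, HTTPException, Request
import hashlib
import hmac
import json

receiver = FastAPI()
WEBHOOK_SECRET = "shared-secret"  # Must match the secret used by WebhookSender

@receiver.post("/webhooks/llm-jobs")
async def receive_webhook(request: Request):
    raw_body = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")
    
    # Recompute the HMAC over the exact bytes that were sent
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        raw_body,
        hashlib.sha256
    ).hexdigest()
    
    if not hmac.compare_digest(signature, expected):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    payload = json.loads(raw_body)
    # Handle job.completed / job.failed events here
    return {"received": payload["event"]}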

Rate Limiting

from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60
    requests_per_hour: int = 1000
    tokens_per_minute: int = 100000

class RateLimiter:
    """Token bucket rate limiter"""
    
    def __init__(self):
        self.requests = defaultdict(list)  # user_id -> list of timestamps
        self.tokens = defaultdict(int)  # user_id -> tokens used
    
    def _clean_old_requests(self, user_id: str, window_seconds: int):
        """Remove requests outside the window"""
        cutoff = time.time() - window_seconds
        self.requests[user_id] = [
            t for t in self.requests[user_id] if t > cutoff
        ]
    
    def check_rate_limit(
        self,
        user_id: str,
        config: RateLimitConfig
    ) -> tuple[bool, dict]:
        """Check if request is within rate limits"""
        
        now = time.time()
        
        # Keep only requests inside the largest window (1 hour); cleaning with
        # the 60s window first would discard data needed for the hourly check
        self._clean_old_requests(user_id, 3600)
        timestamps = self.requests[user_id]
        
        # Check per-minute limit
        minute_requests = sum(1 for t in timestamps if t > now - 60)
        if minute_requests >= config.requests_per_minute:
            return False, {
                "error": "rate_limit_exceeded",
                "limit": config.requests_per_minute,
                "window": "minute",
                "retry_after": 60
            }
        
        # Check per-hour limit
        if len(timestamps) >= config.requests_per_hour:
            return False, {
                "error": "rate_limit_exceeded",
                "limit": config.requests_per_hour,
                "window": "hour",
                "retry_after": 3600
            }
        
        return True, {}
    
    def record_request(self, user_id: str):
        """Record a request"""
        self.requests[user_id].append(time.time())
    
    def record_tokens(self, user_id: str, tokens: int):
        """Record token usage"""
        self.tokens[user_id] += tokens

rate_limiter = RateLimiter()

class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Extract user ID from auth token or API key
        user_id = request.headers.get("X-API-Key", "anonymous")
        
        # Check rate limit
        allowed, error_info = rate_limiter.check_rate_limit(
            user_id,
            RateLimitConfig()
        )
        
        if not allowed:
            return JSONResponse(
                status_code=429,
                content=error_info,
                headers={"Retry-After": str(error_info["retry_after"])}
            )
        
        # Record request
        rate_limiter.record_request(user_id)
        
        response = await call_next(request)
        
        return response

app.add_middleware(RateLimitMiddleware)
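
The config above also declares tokens_per_minute, which the middleware does not enforce. One way to add a per-user token budget is to track usage with timestamps, as in this sketch; the estimate passed in would come from your tokenizer or from provider usage data.

class TokenBudget:
    """Per-user sliding-window token budget (illustrative sketch)"""
    
    def __init__(self, tokens_per_minute: int = 100000):
        self.tokens_per_minute = tokens_per_minute
        self.usage = defaultdict(list)  # user_id -> list of (timestamp, tokens)
    
    def check_and_record(self, user_id: str, tokens: int) -> bool:
        """Return True if the request fits in the current one-minute budget"""
        cutoff = time.time() - 60
        self.usage[user_id] = [
            (t, n) for t, n in self.usage[user_id] if t > cutoff
        ]
        used = sum(n for _, n in self.usage[user_id])
        
        if used + tokens > self.tokens_per_minute:
            return False
        
        self.usage[user_id].append((time.time(), tokens))
        return True

token_budget = TokenBudget()

# Inside an endpoint, after estimating the request's token count:
#     if not token_budget.check_and_record(user_id, estimated_tokens):
#         raise HTTPException(status_code=429, detail="Token budget exceeded")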

API Versioning

from fastapi import FastAPI, APIRouter, Response
from enum import Enum

class APIVersion(str, Enum):
    V1 = "v1"
    V2 = "v2"

# Version 1 router
v1_router = APIRouter(prefix="/v1", tags=["v1"])

@v1_router.post("/chat", deprecated=True)
async def chat_v1(request: ChatRequest, response: Response):
    # Advertise deprecation and point clients at the successor version
    response.headers["Deprecation"] = "true"
    response.headers["Sunset"] = "2025-06-01"
    response.headers["Link"] = '</v2/chat>; rel="successor-version"'
    # V1 implementation
    pass

# Version 2 router with new features
v2_router = APIRouter(prefix="/v2", tags=["v2"])

class ChatRequestV2(BaseModel):
    messages: List[Message]
    model: str = "gpt-4o"
    temperature: float = 0.7
    max_tokens: int = 1000
    # New V2 fields
    response_format: Optional[dict] = None
    tools: Optional[List[dict]] = None
    stream: bool = False

@v2_router.post("/chat")
async def chat_v2(request: ChatRequestV2):
    # V2 implementation with new features
    pass

# Deprecation is declared on the V1 route above: deprecated=True flags it in the
# OpenAPI docs, and the Deprecation/Sunset/Link headers point clients at /v2/chat.

# Mount routers (register all routes before this call)
app.include_router(v1_router)
app.include_router(v2_router)

Request Validation

from pydantic import BaseModel, Field, field_validator
from typing import List, Optional

class Message(BaseModel):
    role: str
    content: str
    
    @field_validator("role")
    @classmethod
    def validate_role(cls, v):
        allowed = ["system", "user", "assistant", "tool"]
        if v not in allowed:
            raise ValueError(f"role must be one of {allowed}")
        return v
    
    @field_validator("content")
    @classmethod
    def validate_content(cls, v):
        if len(v) > 100000:
            raise ValueError("content exceeds maximum length of 100000")
        if len(v.strip()) == 0:
            raise ValueError("content cannot be empty")
        return v

class ChatRequest(BaseModel):
    messages: List[Message] = Field(..., min_length=1, max_length=100)
    model: str = Field("gpt-4o", pattern="^[a-zA-Z0-9-]+$")
    temperature: float = Field(0.7, ge=0, le=2)
    max_tokens: int = Field(1000, ge=1, le=128000)
    stop: Optional[List[str]] = Field(None, max_length=4)
    
    @field_validator("messages")
    @classmethod
    def validate_messages(cls, v):
        # Must have at least one user message
        if not any(m.role == "user" for m in v):
            raise ValueError("must include at least one user message")
        
        # System message must be first if present
        system_indices = [i for i, m in enumerate(v) if m.role == "system"]
        if system_indices and system_indices[0] != 0:
            raise ValueError("system message must be first")
        
        return v
    
    @field_validator("stop")
    @classmethod
    def validate_stop(cls, v):
        if v:
            for s in v:
                if len(s) > 100:
                    raise ValueError("stop sequence too long")
        return v

# Error responses
class APIError(BaseModel):
    error: str
    code: str
    message: str
    param: Optional[str] = None

@app.exception_handler(ValueError)
async def validation_error_handler(request: Request, exc: ValueError):
    return JSONResponse(
        status_code=400,
        content=APIError(
            error="invalid_request",
            code="validation_error",
            message=str(exc)
        ).model_dump()
    )
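
Note that FastAPI reports Pydantic validation failures on request bodies through its own RequestValidationError (HTTP 422) rather than a bare ValueError, so mapping that exception to the same APIError shape keeps error responses consistent. A sketch:

from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

@app.exception_handler(RequestValidationError)
async def request_validation_handler(request: Request, exc: RequestValidationError):
    first_error = exc.errors()[0] if exc.errors() else {}
    return JSONResponse(
        status_code=422,
        content=APIError(
            error="invalid_request",
            code="validation_error",
            message=first_error.get("msg", "invalid request"),
            param=".".join(str(p) for p in first_error.get("loc", []))
        ).model_dump()
    )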

Health and Status Endpoints

from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict
from datetime import datetime
import psutil

class HealthStatus(BaseModel):
    status: str
    timestamp: str
    version: str
    uptime_seconds: float

class DetailedStatus(BaseModel):
    status: str
    providers: Dict[str, str]
    metrics: Dict[str, float]

start_time = datetime.utcnow()

@app.get("/health", response_model=HealthStatus)
async def health():
    """Basic health check"""
    return HealthStatus(
        status="healthy",
        timestamp=datetime.utcnow().isoformat(),
        version="1.0.0",
        uptime_seconds=(datetime.utcnow() - start_time).total_seconds()
    )

@app.get("/status", response_model=DetailedStatus)
async def status():
    """Detailed status with provider checks"""
    
    providers = {}
    
    # Check OpenAI
    try:
        await client.models.list()
        providers["openai"] = "healthy"
    except Exception:
        providers["openai"] = "unhealthy"
    
    # System metrics
    metrics = {
        "cpu_percent": psutil.cpu_percent(),
        "memory_percent": psutil.virtual_memory().percent,
        "active_requests": len(jobs)  # From job store
    }
    
    overall = "healthy" if all(
        s == "healthy" for s in providers.values()
    ) else "degraded"
    
    return DetailedStatus(
        status=overall,
        providers=providers,
        metrics=metrics
    )

@app.get("/ready")
async def ready():
    """Kubernetes readiness probe"""
    # Check if service is ready to receive traffic
    return {"ready": True}

@app.get("/live")
async def live():
    """Kubernetes liveness probe"""
    # Check if service is alive
    return {"live": True}

API Response Standards

from pydantic import BaseModel
from typing import TypeVar, Generic, Optional, List

T = TypeVar("T")

class APIResponse(BaseModel, Generic[T]):
    """Standard API response wrapper"""
    success: bool
    data: Optional[T] = None
    error: Optional[APIError] = None
    meta: Optional[dict] = None

class PaginatedResponse(BaseModel, Generic[T]):
    """Paginated response"""
    items: List[T]
    total: int
    page: int
    page_size: int
    has_more: bool

# Usage
@app.get("/v1/models", response_model=APIResponse[List[str]])
async def list_models():
    models = ["gpt-4o", "gpt-4o-mini", "claude-3-5-sonnet"]
    
    return APIResponse(
        success=True,
        data=models,
        meta={"count": len(models)}
    )
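
PaginatedResponse works the same way; the page-based example below uses made-up conversation records to show the shape.

@app.get("/v1/conversations", response_model=PaginatedResponse[dict])
async def list_conversations(page: int = 1, page_size: int = 20):
    # Placeholder data; a real implementation would query a datastore
    all_items = [
        {"id": f"conv_{i}", "title": f"Conversation {i}"} for i in range(1, 101)
    ]
    
    start = (page - 1) * page_size
    items = all_items[start:start + page_size]
    
    return PaginatedResponse(
        items=items,
        total=len(all_items),
        page=page,
        page_size=page_size,
        has_more=start + page_size < len(all_items)
    )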

API Design Checklist

Aspect           Implementation
Versioning       URL path (/v1/, /v2/)
Authentication   API keys, OAuth2
Rate Limiting    Token bucket, per-user
Pagination       Cursor-based
Errors           Consistent JSON format
Streaming        SSE for real-time
Async Jobs       Webhooks + polling
Health           /health, /ready, /live

What's Next

Evaluation and Testing

Learn comprehensive evaluation strategies for LLM applications.