Core API Patterns
Synchronous Chat Endpoint
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import List, Optional
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class Message(BaseModel):
role: str = Field(..., pattern="^(system|user|assistant)$")
content: str = Field(..., min_length=1, max_length=100000)
class ChatRequest(BaseModel):
messages: List[Message]
model: str = "gpt-4o"
temperature: float = Field(0.7, ge=0, le=2)
max_tokens: int = Field(1000, ge=1, le=4096)
user_id: Optional[str] = None
class ChatResponse(BaseModel):
content: str
model: str
usage: dict
finish_reason: str
@app.post("/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""Synchronous chat completion endpoint"""
try:
response = await client.chat.completions.create(
model=request.model,
messages=[m.model_dump() for m in request.messages],
temperature=request.temperature,
max_tokens=request.max_tokens
)
return ChatResponse(
content=response.choices[0].message.content,
model=response.model,
usage={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
finish_reason=response.choices[0].finish_reason
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
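For reference, a minimal client call against this endpoint might look like the following sketch. It assumes the service is running locally on port 8000; the payload fields mirror `ChatRequest` above.

```python
import httpx

payload = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Summarize SSE in one sentence."}
    ],
    "temperature": 0.2
}

# Call the /v1/chat endpoint defined above
resp = httpx.post("http://localhost:8000/v1/chat", json=payload, timeout=60)
resp.raise_for_status()
print(resp.json()["content"])
```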
Streaming Endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import json
class StreamRequest(BaseModel):
messages: List[Message]
model: str = "gpt-4o"
async def stream_response(
request: StreamRequest
) -> AsyncGenerator[str, None]:
"""Generate SSE stream"""
try:
stream = await client.chat.completions.create(
model=request.model,
messages=[m.model_dump() for m in request.messages],
stream=True
)
        async for chunk in stream:
            # Some chunks carry no choices or an empty content delta; skip them
            if chunk.choices and chunk.choices[0].delta.content:
data = {
"content": chunk.choices[0].delta.content,
"finish_reason": chunk.choices[0].finish_reason
}
yield f"data: {json.dumps(data)}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
@app.post("/v1/chat/stream")
async def chat_stream(request: StreamRequest):
"""Server-Sent Events streaming endpoint"""
return StreamingResponse(
stream_response(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
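On the client side, the SSE stream can be consumed line by line. The following is a minimal sketch using httpx, assuming the service runs locally; the `data:` framing matches what `stream_response` above emits.

```python
import json
import httpx

def consume_stream(url: str, payload: dict) -> str:
    """Read the SSE stream from /v1/chat/stream and return the full text."""
    chunks = []
    with httpx.stream("POST", url, json=payload, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data == "[DONE]":
                break
            event = json.loads(data)
            if "error" in event:
                raise RuntimeError(event["error"])
            chunks.append(event["content"])
    return "".join(chunks)

# Example:
# text = consume_stream("http://localhost:8000/v1/chat/stream",
#                       {"messages": [{"role": "user", "content": "Hello"}]})
```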
Async Job Pattern
For long-running LLM tasks, accept the request, return a job ID immediately, and let the client poll for the result (or receive a webhook):
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import uuid
import asyncio
from datetime import datetime
# In-memory store (use Redis in production; see the sketch after this section)
jobs = {}
class JobStatus(BaseModel):
job_id: str
    status: str  # "pending", "processing", "completed", "failed", or "cancelled"
created_at: str
completed_at: Optional[str] = None
result: Optional[dict] = None
error: Optional[str] = None
progress: float = 0.0
class AsyncJobRequest(BaseModel):
prompt: str
model: str = "gpt-4o"
webhook_url: Optional[str] = None
async def process_job(job_id: str, request: AsyncJobRequest):
    """Background job processor"""
    # Honour cancellations made between job creation and task start
    if jobs[job_id]["status"] == "cancelled":
        return
    jobs[job_id]["status"] = "processing"
try:
response = await client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.prompt}]
)
jobs[job_id]["status"] = "completed"
jobs[job_id]["completed_at"] = datetime.utcnow().isoformat()
jobs[job_id]["result"] = {
"content": response.choices[0].message.content,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens
}
}
jobs[job_id]["progress"] = 1.0
# Send webhook if configured
if request.webhook_url:
await send_webhook(request.webhook_url, jobs[job_id])
except Exception as e:
jobs[job_id]["status"] = "failed"
jobs[job_id]["error"] = str(e)
jobs[job_id]["completed_at"] = datetime.utcnow().isoformat()
@app.post("/v1/jobs", response_model=JobStatus)
async def create_job(
request: AsyncJobRequest,
background_tasks: BackgroundTasks
):
"""Create async job"""
job_id = str(uuid.uuid4())
jobs[job_id] = {
"job_id": job_id,
"status": "pending",
"created_at": datetime.utcnow().isoformat(),
"completed_at": None,
"result": None,
"error": None,
"progress": 0.0
}
background_tasks.add_task(process_job, job_id, request)
return JobStatus(**jobs[job_id])
@app.get("/v1/jobs/{job_id}", response_model=JobStatus)
async def get_job(job_id: str):
"""Get job status"""
if job_id not in jobs:
raise HTTPException(status_code=404, detail="Job not found")
return JobStatus(**jobs[job_id])
@app.delete("/v1/jobs/{job_id}")
async def cancel_job(job_id: str):
"""Cancel job (if still pending)"""
if job_id not in jobs:
raise HTTPException(status_code=404, detail="Job not found")
if jobs[job_id]["status"] == "pending":
jobs[job_id]["status"] = "cancelled"
return {"message": "Job cancelled"}
return {"message": "Job cannot be cancelled"}
Webhook Integration
import httpx
from typing import List, Optional
from pydantic import BaseModel, HttpUrl
import hmac
import hashlib
class WebhookConfig(BaseModel):
url: HttpUrl
secret: Optional[str] = None
events: List[str] = ["job.completed", "job.failed"]
class WebhookSender:
"""Send webhook notifications"""
def __init__(self, timeout: float = 30.0, max_retries: int = 3):
self.timeout = timeout
self.max_retries = max_retries
def _sign_payload(self, payload: str, secret: str) -> str:
"""Create HMAC signature for webhook"""
return hmac.new(
secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
async def send(
self,
url: str,
event: str,
data: dict,
        secret: Optional[str] = None
) -> bool:
"""Send webhook with retries"""
payload = {
"event": event,
"timestamp": datetime.utcnow().isoformat(),
"data": data
}
headers = {
"Content-Type": "application/json",
"X-Webhook-Event": event
}
        # Serialize once so the signed bytes match the bytes actually sent
        payload_str = json.dumps(payload)
        if secret:
            signature = self._sign_payload(payload_str, secret)
            headers["X-Webhook-Signature"] = f"sha256={signature}"
        async with httpx.AsyncClient() as http_client:
            for attempt in range(self.max_retries):
                try:
                    response = await http_client.post(
                        url,
                        content=payload_str,
                        headers=headers,
                        timeout=self.timeout
                    )
                    if response.status_code < 300:
                        return True
                except httpx.RequestError:
                    pass
                # Exponential backoff before the next attempt
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
        return False
webhook_sender = WebhookSender()
async def send_webhook(url: str, job_data: dict, secret: str = None):
"""Send job completion webhook"""
event = "job.completed" if job_data["status"] == "completed" else "job.failed"
await webhook_sender.send(
url=url,
event=event,
data=job_data,
secret=secret
)
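On the receiving side, the consumer should recompute the HMAC over the raw request body and compare it with the `X-Webhook-Signature` header. Below is a minimal FastAPI receiver sketch; the endpoint path and shared secret are illustrative.

```python
import hashlib
import hmac

from fastapi import FastAPI, Header, HTTPException, Request

receiver = FastAPI()
WEBHOOK_SECRET = "change-me"  # illustrative; load from configuration in practice

@receiver.post("/webhooks/llm-jobs")
async def receive_webhook(
    request: Request,
    x_webhook_signature: str = Header(default="")
):
    # Sign the raw body bytes, exactly as the sender did
    body = await request.body()
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    # Constant-time comparison to avoid timing attacks
    if not hmac.compare_digest(expected, x_webhook_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    return {"received": True}
```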
Rate Limiting
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
from collections import defaultdict
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
requests_per_minute: int = 60
requests_per_hour: int = 1000
tokens_per_minute: int = 100000
class RateLimiter:
"""Token bucket rate limiter"""
def __init__(self):
self.requests = defaultdict(list) # user_id -> list of timestamps
self.tokens = defaultdict(int) # user_id -> tokens used
def _clean_old_requests(self, user_id: str, window_seconds: int):
"""Remove requests outside the window"""
cutoff = time.time() - window_seconds
self.requests[user_id] = [
t for t in self.requests[user_id] if t > cutoff
]
def check_rate_limit(
self,
user_id: str,
config: RateLimitConfig
) -> tuple[bool, dict]:
"""Check if request is within rate limits"""
now = time.time()
# Check per-minute limit
self._clean_old_requests(user_id, 60)
minute_requests = len(self.requests[user_id])
if minute_requests >= config.requests_per_minute:
return False, {
"error": "rate_limit_exceeded",
"limit": config.requests_per_minute,
"window": "minute",
"retry_after": 60
}
# Check per-hour limit
self._clean_old_requests(user_id, 3600)
hour_requests = len(self.requests[user_id])
if hour_requests >= config.requests_per_hour:
return False, {
"error": "rate_limit_exceeded",
"limit": config.requests_per_hour,
"window": "hour",
"retry_after": 3600
}
return True, {}
def record_request(self, user_id: str):
"""Record a request"""
self.requests[user_id].append(time.time())
def record_tokens(self, user_id: str, tokens: int):
"""Record token usage"""
self.tokens[user_id] += tokens
rate_limiter = RateLimiter()
class RateLimitMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Extract user ID from auth token or API key
user_id = request.headers.get("X-API-Key", "anonymous")
# Check rate limit
allowed, error_info = rate_limiter.check_rate_limit(
user_id,
RateLimitConfig()
)
if not allowed:
return JSONResponse(
status_code=429,
content=error_info,
headers={"Retry-After": str(error_info["retry_after"])}
)
# Record request
rate_limiter.record_request(user_id)
response = await call_next(request)
return response
app.add_middleware(RateLimitMiddleware)
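The config above includes `tokens_per_minute`, but the middleware only counts requests. One way to feed token usage back into the limiter is to record actual usage in the endpoint after each completion. The sketch below reuses `rate_limiter`, `ChatRequest`, and `ChatResponse` from earlier sections; the `/v1/chat/limited` path is illustrative, and enforcing a token budget would additionally need a windowed check similar to the request counters.

```python
@app.post("/v1/chat/limited", response_model=ChatResponse)
async def chat_with_token_accounting(request: ChatRequest):
    """Same as /v1/chat, but records token usage for per-user accounting."""
    user_id = request.user_id or "anonymous"
    response = await client.chat.completions.create(
        model=request.model,
        messages=[m.model_dump() for m in request.messages],
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    # Feed actual usage back into the limiter; a tokens_per_minute check
    # would compare this running total against a time window
    rate_limiter.record_tokens(user_id, response.usage.total_tokens)
    return ChatResponse(
        content=response.choices[0].message.content,
        model=response.model,
        usage={
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens
        },
        finish_reason=response.choices[0].finish_reason
    )
```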
API Versioning
from fastapi import FastAPI, APIRouter, Response
from enum import Enum
class APIVersion(str, Enum):
V1 = "v1"
V2 = "v2"
# Version 1 router
v1_router = APIRouter(prefix="/v1", tags=["v1"])
@v1_router.post("/chat", deprecated=True)
async def chat_v1(request: ChatRequest, response: Response):
    # V1 implementation; mark the route deprecated and point clients at V2
    response.headers["Deprecation"] = "true"
    response.headers["Sunset"] = "2025-06-01"
    response.headers["Link"] = '</v2/chat>; rel="successor-version"'
    ...
# Version 2 router with new features
v2_router = APIRouter(prefix="/v2", tags=["v2"])
class ChatRequestV2(BaseModel):
messages: List[Message]
model: str = "gpt-4o"
temperature: float = 0.7
max_tokens: int = 1000
# New V2 fields
response_format: Optional[dict] = None
tools: Optional[List[dict]] = None
stream: bool = False
@v2_router.post("/chat")
async def chat_v2(request: ChatRequestV2):
# V2 implementation with new features
pass
# Mount routers
app.include_router(v1_router)
app.include_router(v2_router)
Deprecation is handled on the v1 route itself (see `chat_v1` above): `deprecated=True` flags the route in the OpenAPI schema, and the `Deprecation`, `Sunset`, and `Link` response headers point clients at `/v2/chat`. Avoid registering the same path a second time for a separate "deprecated" handler; routes match in registration order, so the duplicate would never be reached.
Request Validation
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional

class Message(BaseModel):
    role: str
    content: str

    @field_validator("role")
    @classmethod
    def validate_role(cls, v):
        allowed = ["system", "user", "assistant", "tool"]
        if v not in allowed:
            raise ValueError(f"role must be one of {allowed}")
        return v

    @field_validator("content")
    @classmethod
    def validate_content(cls, v):
        if len(v) > 100000:
            raise ValueError("content exceeds maximum length of 100000")
        if len(v.strip()) == 0:
            raise ValueError("content cannot be empty")
        return v

class ChatRequest(BaseModel):
    messages: List[Message] = Field(..., min_length=1, max_length=100)
    model: str = Field("gpt-4o", pattern="^[a-zA-Z0-9-]+$")
    temperature: float = Field(0.7, ge=0, le=2)
    max_tokens: int = Field(1000, ge=1, le=128000)
    stop: Optional[List[str]] = Field(None, max_length=4)

    @field_validator("messages")
    @classmethod
    def validate_messages(cls, v):
        # Must have at least one user message
        if not any(m.role == "user" for m in v):
            raise ValueError("must include at least one user message")
        # System message must be first if present
        system_indices = [i for i, m in enumerate(v) if m.role == "system"]
        if system_indices and system_indices[0] != 0:
            raise ValueError("system message must be first")
        return v

    @field_validator("stop")
    @classmethod
    def validate_stop(cls, v):
        if v:
            for s in v:
                if len(s) > 100:
                    raise ValueError("stop sequence too long")
        return v
# Error responses
class APIError(BaseModel):
error: str
code: str
message: str
param: Optional[str] = None
# FastAPI already turns Pydantic validation failures into 422 responses;
# this handler covers ValueErrors raised inside endpoint code.
@app.exception_handler(ValueError)
async def validation_error_handler(request: Request, exc: ValueError):
    return JSONResponse(
        status_code=400,
        content=APIError(
            error="invalid_request",
            code="validation_error",
            message=str(exc)
        ).model_dump()
    )
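To see the validators in action, constructing a `ChatRequest` directly raises a `ValidationError` that names the rule that failed. A small sketch:

```python
from pydantic import ValidationError

try:
    ChatRequest(
        messages=[{"role": "assistant", "content": "hi"}]  # no user message
    )
except ValidationError as exc:
    # Each error names the offending field and the validator's message
    for error in exc.errors():
        print(error["loc"], error["msg"])
```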
Health and Status Endpoints
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict
from datetime import datetime
import psutil
class HealthStatus(BaseModel):
status: str
timestamp: str
version: str
uptime_seconds: float
class DetailedStatus(BaseModel):
status: str
providers: Dict[str, str]
metrics: Dict[str, float]
start_time = datetime.utcnow()
@app.get("/health", response_model=HealthStatus)
async def health():
"""Basic health check"""
return HealthStatus(
status="healthy",
timestamp=datetime.utcnow().isoformat(),
version="1.0.0",
uptime_seconds=(datetime.utcnow() - start_time).total_seconds()
)
@app.get("/status", response_model=DetailedStatus)
async def status():
"""Detailed status with provider checks"""
providers = {}
# Check OpenAI
try:
await client.models.list()
providers["openai"] = "healthy"
except Exception:
providers["openai"] = "unhealthy"
# System metrics
    metrics = {
        "cpu_percent": psutil.cpu_percent(),
        "memory_percent": psutil.virtual_memory().percent,
        # Count only jobs currently being processed, not every job ever created
        "active_jobs": sum(1 for j in jobs.values() if j["status"] == "processing")
    }
overall = "healthy" if all(
s == "healthy" for s in providers.values()
) else "degraded"
return DetailedStatus(
status=overall,
providers=providers,
metrics=metrics
)
@app.get("/ready")
async def ready():
"""Kubernetes readiness probe"""
# Check if service is ready to receive traffic
return {"ready": True}
@app.get("/live")
async def live():
"""Kubernetes liveness probe"""
# Check if service is alive
return {"live": True}
API Response Standards
from pydantic import BaseModel
from typing import TypeVar, Generic, Optional
T = TypeVar("T")
class APIResponse(BaseModel, Generic[T]):
"""Standard API response wrapper"""
success: bool
data: Optional[T] = None
error: Optional[APIError] = None
meta: Optional[dict] = None
class PaginatedResponse(BaseModel, Generic[T]):
"""Paginated response"""
items: List[T]
total: int
page: int
page_size: int
has_more: bool
# Usage
@app.get("/v1/models", response_model=APIResponse[List[str]])
async def list_models():
models = ["gpt-4o", "gpt-4o-mini", "claude-3-5-sonnet"]
return APIResponse(
success=True,
data=models,
meta={"count": len(models)}
)
API Design Checklist
| Aspect | Implementation |
|---|---|
| Versioning | URL path (/v1/, /v2/) |
| Authentication | API keys, OAuth2 |
| Rate Limiting | Per-user sliding window or token bucket |
| Pagination | Cursor-based (see the sketch below) |
| Errors | Consistent JSON format |
| Streaming | SSE for real-time |
| Async Jobs | Webhooks + polling |
| Health | /health, /ready, /live |
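The checklist calls for cursor-based pagination, which the sections above do not show. A minimal sketch of a cursor-paginated listing endpoint follows; the `GET /v1/jobs` listing and the opaque offset-based cursor are illustrative, and it reuses the `jobs` store from the async job section.

```python
import base64
from typing import Optional

def encode_cursor(offset: int) -> str:
    """Opaque cursor: a base64-encoded offset (real APIs often encode a key instead)."""
    return base64.urlsafe_b64encode(str(offset).encode()).decode()

def decode_cursor(cursor: Optional[str]) -> int:
    return int(base64.urlsafe_b64decode(cursor.encode()).decode()) if cursor else 0

@app.get("/v1/jobs")
async def list_jobs(cursor: Optional[str] = None, limit: int = 20):
    """List jobs one page at a time, returning a cursor for the next page."""
    offset = decode_cursor(cursor)
    all_jobs = list(jobs.values())
    page = all_jobs[offset:offset + limit]
    next_cursor = (
        encode_cursor(offset + limit) if offset + limit < len(all_jobs) else None
    )
    return {"items": page, "next_cursor": next_cursor}
```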
What's Next
Evaluation and Testing: learn comprehensive evaluation strategies for LLM applications.