Why FastAPI for AI?
FastAPI is the go-to framework for AI APIs because:
- Async native: Handle thousands of concurrent LLM calls
- Auto documentation: Swagger UI out of the box
- Type safety: Pydantic validation catches errors early
- Fast: One of the fastest Python frameworks
Quick Start
# Install
pip install "fastapi[standard]" uvicorn
# Create main.py and run
uvicorn main:app --reload
# main.py
from fastapi import FastAPI
app = FastAPI(title="AI API", version="1.0.0")
@app.get("/")
async def root():
return {"message": "AI API is running"}
@app.get("/health")
async def health():
return {"status": "healthy"}
Open http://localhost:8000/docs for interactive API docs.
Request & Response Models
Use Pydantic for validation:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
class ModelName(str, Enum):
GPT4 = "gpt-4o"
GPT35 = "gpt-3.5-turbo"
CLAUDE = "claude-3-opus"
class ChatRequest(BaseModel):
"""Request model for chat endpoint"""
message: str = Field(..., min_length=1, max_length=10000)
model: ModelName = ModelName.GPT4
temperature: float = Field(default=0.7, ge=0.0, le=2.0)
max_tokens: int = Field(default=1000, ge=1, le=4000)
system_prompt: Optional[str] = None
class Config:
json_schema_extra = {
"example": {
"message": "What is machine learning?",
"model": "gpt-4o",
"temperature": 0.7
}
}
class ChatResponse(BaseModel):
"""Response model for chat endpoint"""
response: str
model: str
tokens_used: int
class ErrorResponse(BaseModel):
detail: str
error_code: str
app = FastAPI()
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
# FastAPI automatically validates the request
return ChatResponse(
response="This is the AI response",
model=request.model,
tokens_used=150
)
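When `Field()` constraints aren't expressive enough, Pydantic v2 field validators can hold extra rules. The sketch below is illustrative only — the `StrictChatRequest` model and `no_blank_messages` validator are not part of the models above:

```python
from pydantic import BaseModel, Field, field_validator

class StrictChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)

    @field_validator("message")
    @classmethod
    def no_blank_messages(cls, v: str) -> str:
        # min_length alone still allows a message of pure whitespace
        if not v.strip():
            raise ValueError("message must not be blank")
        return v
```

Validation failures surface as a 422 response with a detailed error body, just like the built-in constraints.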
Path & Query Parameters
from fastapi import FastAPI, Query, Path
from typing import Optional
app = FastAPI()
# Path parameters
@app.get("/conversations/{conversation_id}")
async def get_conversation(
conversation_id: str = Path(..., description="The conversation UUID")
):
return {"conversation_id": conversation_id}
# Query parameters
@app.get("/search")
async def search_documents(
query: str = Query(..., min_length=1, description="Search query"),
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0),
include_archived: bool = Query(default=False)
):
return {
"query": query,
"limit": limit,
"offset": offset,
"results": []
}
# Combine path and query
@app.get("/users/{user_id}/documents")
async def get_user_documents(
user_id: str = Path(...),
    status: Optional[str] = Query(default=None, pattern="^(active|archived|all)$"),
sort_by: str = Query(default="created_at")
):
return {"user_id": user_id, "status": status, "documents": []}
Dependency Injection
Handle auth, database connections, and shared logic:
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Annotated
app = FastAPI()
# Simple dependency
async def get_api_key(x_api_key: str = Header(...)):
if not x_api_key.startswith("sk-"):
raise HTTPException(status_code=401, detail="Invalid API key")
return x_api_key
# Database dependency
class Database:
def __init__(self):
self.connected = False
async def connect(self):
self.connected = True
# Connect to actual database
async def disconnect(self):
self.connected = False
db = Database()
async def get_db():
await db.connect()
try:
yield db
finally:
await db.disconnect()
# User dependency (from API key)
async def get_current_user(
api_key: str = Depends(get_api_key),
db: Database = Depends(get_db)
):
# Look up user from API key
user = {"id": "user_123", "plan": "pro"}
return user
# Use dependencies in endpoints
@app.post("/chat")
async def chat(
request: ChatRequest,
user: dict = Depends(get_current_user),
db: Database = Depends(get_db)
):
# user and db are injected automatically
return {"response": "Hello", "user_id": user["id"]}
Async Operations
Handle concurrent LLM calls efficiently:
import asyncio
from fastapi import FastAPI
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
@app.post("/chat")
async def chat(request: ChatRequest):
"""Single async completion"""
response = await client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
temperature=request.temperature
)
return {"response": response.choices[0].message.content}
@app.post("/batch")
async def batch_chat(requests: list[ChatRequest]):
"""Process multiple requests concurrently"""
async def process_one(req: ChatRequest):
response = await client.chat.completions.create(
model=req.model,
messages=[{"role": "user", "content": req.message}]
)
return response.choices[0].message.content
# Run all concurrently
tasks = [process_one(req) for req in requests]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {"results": results}
Streaming Responses
Essential for LLM applications:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json
app = FastAPI()
client = AsyncOpenAI()
@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
"""Stream LLM response tokens"""
async def generate():
stream = await client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
# Send as Server-Sent Events
data = {"token": chunk.choices[0].delta.content}
yield f"data: {json.dumps(data)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
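On the client side, the SSE stream can be consumed line by line. A hedged sketch using `httpx` (the URL and payload are assumed to match the endpoint above):

```python
import asyncio
import json

import httpx

async def consume_stream():
    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": "Explain embeddings"},
        ) as resp:
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line.removeprefix("data: ")
                if payload == "[DONE]":
                    break
                print(json.loads(payload)["token"], end="", flush=True)

asyncio.run(consume_stream())
```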
Error Handling
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
app = FastAPI()
# Custom exception
class RateLimitExceeded(Exception):
def __init__(self, retry_after: int = 60):
self.retry_after = retry_after
class TokenLimitExceeded(Exception):
def __init__(self, tokens: int, limit: int):
self.tokens = tokens
self.limit = limit
# Exception handlers
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={
"error": "rate_limit_exceeded",
"message": f"Too many requests. Retry after {exc.retry_after}s",
"retry_after": exc.retry_after
},
headers={"Retry-After": str(exc.retry_after)}
)
@app.exception_handler(TokenLimitExceeded)
async def token_limit_handler(request: Request, exc: TokenLimitExceeded):
return JSONResponse(
status_code=400,
content={
"error": "token_limit_exceeded",
"message": f"Request has {exc.tokens} tokens, limit is {exc.limit}",
"tokens": exc.tokens,
"limit": exc.limit
}
)
# Usage in endpoint
@app.post("/chat")
async def chat(request: ChatRequest):
# Check rate limit
if is_rate_limited(request):
raise RateLimitExceeded(retry_after=30)
# Check token limit
token_count = count_tokens(request.message)
if token_count > 4000:
raise TokenLimitExceeded(tokens=token_count, limit=4000)
return {"response": "..."}
Middleware
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
import logging
app = FastAPI()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "https://myapp.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Custom logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.perf_counter()
# Process request
response = await call_next(request)
# Log details
duration = time.perf_counter() - start_time
logging.info(
f"{request.method} {request.url.path} "
f"- Status: {response.status_code} "
f"- Duration: {duration:.3f}s"
)
# Add timing header
response.headers["X-Response-Time"] = f"{duration:.3f}s"
return response
# Request ID middleware
import uuid
@app.middleware("http")
async def add_request_id(request: Request, call_next):
request_id = str(uuid.uuid4())
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
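The request ID stored on `request.state` by the middleware above is then available anywhere downstream, for example inside an endpoint or a dependency:

```python
from fastapi import Request

@app.get("/whoami")
async def whoami(request: Request):
    # Set by the add_request_id middleware before the endpoint runs
    return {"request_id": request.state.request_id}
```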
File Uploads
For document processing in RAG applications:
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
from pathlib import Path
import aiofiles
app = FastAPI()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
ALLOWED_TYPES = {"application/pdf", "text/plain", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}
MAX_SIZE = 10 * 1024 * 1024 # 10MB
@app.post("/documents/upload")
async def upload_document(
file: UploadFile = File(...),
    user: dict = Depends(get_current_user)  # from the Dependency Injection section
):
# Validate file type
if file.content_type not in ALLOWED_TYPES:
raise HTTPException(400, f"File type {file.content_type} not allowed")
# Read and check size
content = await file.read()
if len(content) > MAX_SIZE:
raise HTTPException(400, f"File too large. Max size is {MAX_SIZE // 1024 // 1024}MB")
# Save file
    file_path = UPLOAD_DIR / f"{user['id']}_{file.filename}"
async with aiofiles.open(file_path, "wb") as f:
await f.write(content)
# Process document (extract text, chunk, embed)
# ... processing logic ...
return {
"filename": file.filename,
"size": len(content),
"status": "processing"
}
@app.post("/documents/upload-multiple")
async def upload_multiple(
files: list[UploadFile] = File(...)
):
results = []
for file in files:
# Process each file
results.append({"filename": file.filename, "status": "uploaded"})
return {"files": results}
Background Tasks
Process documents without blocking:
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
app = FastAPI()
class DocumentUpload(BaseModel):
document_id: str
filename: str
content: str
async def process_document(document_id: str, content: str):
"""Background task to process document"""
# Chunk text
chunks = chunk_text(content)
# Generate embeddings
embeddings = await generate_embeddings(chunks)
# Store in vector database
await store_embeddings(document_id, chunks, embeddings)
# Update status
await update_document_status(document_id, "ready")
@app.post("/documents")
async def create_document(
doc: DocumentUpload,
background_tasks: BackgroundTasks
):
# Save document metadata immediately
await save_document_metadata(doc.document_id, doc.filename)
# Process in background
background_tasks.add_task(
process_document,
doc.document_id,
doc.content
)
return {
"document_id": doc.document_id,
"status": "processing"
}
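`chunk_text`, `generate_embeddings`, and the storage helpers above stand in for your own pipeline. As an illustration, a toy `chunk_text` that splits on fixed-size character windows with overlap:

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping character chunks."""
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap
    return chunks
```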
Routers for Organization
Structure larger applications:
# app/routers/chat.py
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/chat", tags=["Chat"])
@router.post("/")
async def create_chat():
return {"message": "Chat created"}
@router.get("/{chat_id}")
async def get_chat(chat_id: str):
return {"chat_id": chat_id}
@router.post("/{chat_id}/messages")
async def send_message(chat_id: str, message: str):
return {"chat_id": chat_id, "message": message}
# app/routers/documents.py
from fastapi import APIRouter
router = APIRouter(prefix="/documents", tags=["Documents"])
@router.post("/upload")
async def upload():
return {"status": "uploaded"}
@router.get("/")
async def list_documents():
return {"documents": []}
# app/main.py
from fastapi import FastAPI
from app.routers import chat, documents
app = FastAPI(title="AI API")
app.include_router(chat.router)
app.include_router(documents.router)
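Routers can also declare router-level dependencies, so every endpoint in the group is protected without repeating `Depends` in each signature. A sketch that reuses the `get_api_key` dependency from the Dependency Injection section (the `/admin` router is hypothetical, and the import assumes it lives in `app/dependencies.py` as in the layout below):

```python
from fastapi import APIRouter, Depends

from app.dependencies import get_api_key

# Every route registered on this router requires a valid API key
router = APIRouter(
    prefix="/admin",
    tags=["Admin"],
    dependencies=[Depends(get_api_key)],
)

@router.get("/stats")
async def stats():
    return {"active_users": 0}
```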
Application Structure
ai-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── config.py # Settings
│ ├── dependencies.py # Shared dependencies
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── chat.py
│ │ ├── documents.py
│ │ └── search.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── requests.py # Pydantic request models
│ │ └── responses.py # Pydantic response models
│ ├── services/
│ │ ├── __init__.py
│ │ ├── llm.py # LLM service
│ │ ├── embeddings.py # Embedding service
│ │ └── rag.py # RAG service
│ └── db/
│ ├── __init__.py
│ ├── database.py # Database connection
│ └── repositories.py # Data access
├── tests/
│ ├── test_chat.py
│ └── test_documents.py
├── .env
├── requirements.txt
└── Dockerfile
Configuration with Pydantic Settings
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# API
app_name: str = "AI API"
debug: bool = False
# OpenAI
openai_api_key: str
default_model: str = "gpt-4o"
# Database
database_url: str
# Redis
redis_url: str = "redis://localhost:6379"
# Limits
max_tokens: int = 4000
rate_limit_per_minute: int = 60
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
return Settings()
# Usage
from app.config import get_settings
settings = get_settings()
print(settings.openai_api_key)
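Because `get_settings()` is cached with `lru_cache`, it also works well as a FastAPI dependency, which keeps configuration injectable and easy to override in tests. A sketch:

```python
from typing import Annotated

from fastapi import Depends, FastAPI

from app.config import Settings, get_settings

app = FastAPI()

@app.get("/info")
async def info(settings: Annotated[Settings, Depends(get_settings)]):
    # The same cached Settings instance is reused on every request
    return {"app_name": settings.app_name, "debug": settings.debug}
```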
Testing
# tests/test_chat.py
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock
from app.main import app
client = TestClient(app)
def test_health():
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_chat_validation():
# Missing required field
response = client.post("/chat", json={})
assert response.status_code == 422
def test_chat_success():
with patch("app.services.llm.get_completion") as mock:
mock.return_value = "Mocked response"
response = client.post("/chat", json={
"message": "Hello",
"model": "gpt-4o"
})
assert response.status_code == 200
assert "response" in response.json()
# Async test (requires the pytest-asyncio plugin)
import pytest

@pytest.mark.asyncio
async def test_async_chat():
    from httpx import ASGITransport, AsyncClient
    from app.main import app

    # Newer httpx versions removed the app= shortcut; wrap the app in ASGITransport
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
response = await ac.post("/chat", json={"message": "Hi"})
assert response.status_code == 200
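For endpoints that require auth or a database, `app.dependency_overrides` swaps real dependencies for fakes during tests. A sketch, assuming `get_current_user` lives in `app/dependencies.py` as in the project layout above:

```python
from app.dependencies import get_current_user
from app.main import app

def fake_user():
    return {"id": "test_user", "plan": "free"}

def test_chat_as_fake_user():
    app.dependency_overrides[get_current_user] = fake_user
    try:
        response = client.post("/chat", json={"message": "Hello"})
        assert response.status_code == 200
    finally:
        app.dependency_overrides.clear()
```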
Production Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY app/ app/
# Run with uvicorn
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_URL=postgresql://user:pass@db:5432/app
depends_on:
- db
- redis
db:
image: pgvector/pgvector:pg16
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=app
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
postgres_data:
# Run in production
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
Building MCP Servers with FastMCP
Model Context Protocol (MCP) allows AI assistants like Claude to interact with external tools. FastMCP makes it easy to create MCP servers using FastAPI-like patterns.
Install FastMCP
pip install fastmcp
Basic MCP Server
# mcp_server.py
from fastmcp import FastMCP
# Create MCP server
mcp = FastMCP("My AI Tools")
@mcp.tool()
def add_numbers(a: int, b: int) -> int:
"""Add two numbers together."""
return a + b
@mcp.tool()
def search_database(query: str, limit: int = 10) -> list[dict]:
"""Search the database for matching records."""
# Your database search logic
return [{"id": 1, "name": "Result 1", "match": query}]
@mcp.tool()
def get_weather(city: str) -> dict:
"""Get current weather for a city."""
# Your weather API logic
return {
"city": city,
"temperature": 72,
"condition": "Sunny"
}
# Run the server
if __name__ == "__main__":
mcp.run()
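Tools can also be declared `async def`, which matters when they call external services. A sketch (the `fetch_url` tool and the 2000-character truncation are arbitrary choices, not part of the server above):

```python
import httpx
from fastmcp import FastMCP

mcp = FastMCP("Async Tools")

@mcp.tool()
async def fetch_url(url: str) -> str:
    """Fetch a URL and return the (truncated) response body."""
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(url)
        return resp.text[:2000]

if __name__ == "__main__":
    mcp.run()
```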
MCP Resources (Read-Only Data)
from fastmcp import FastMCP
mcp = FastMCP("Data Server")
@mcp.resource("config://app")
def get_app_config() -> str:
"""Application configuration."""
return """
{
"version": "1.0.0",
"environment": "production",
"features": ["ai", "search", "analytics"]
}
"""
@mcp.resource("docs://api/{endpoint}")
def get_api_docs(endpoint: str) -> str:
"""Get API documentation for an endpoint."""
docs = {
"users": "GET /users - List all users\nPOST /users - Create user",
"products": "GET /products - List products\nGET /products/{id} - Get product"
}
return docs.get(endpoint, f"No docs for {endpoint}")
MCP Prompts (Reusable Templates)
from fastmcp import FastMCP
mcp = FastMCP("Prompt Server")
@mcp.prompt()
def code_review_prompt(code: str, language: str = "python") -> str:
"""Generate a code review prompt."""
return f"""Please review this {language} code:
```{language}
{code}
- Bugs and errors
- Performance issues
- Security vulnerabilities
- Code style and readability
Copy
Combining with FastAPI
from fastapi import FastAPI
from fastmcp import FastMCP
# Create both servers
app = FastAPI(title="AI API")
mcp = FastMCP("AI Tools")
# Regular FastAPI endpoints
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.post("/chat")
async def chat(message: str):
return {"response": f"You said: {message}"}
# MCP tools
@mcp.tool()
def query_api(endpoint: str) -> dict:
"""Query the FastAPI endpoints."""
# Internal API calls
return {"result": f"Called {endpoint}"}
# Run both
if __name__ == "__main__":
import asyncio
import uvicorn
async def main():
# Run MCP server in background
mcp_task = asyncio.create_task(mcp.run_async())
# Run FastAPI
config = uvicorn.Config(app, host="0.0.0.0", port=8000)
server = uvicorn.Server(config)
await server.serve()
asyncio.run(main())
Configure Claude Desktop
Add to your Claude Desktop config (claude_desktop_config.json):
{
"mcpServers": {
"my-tools": {
"command": "python",
"args": ["path/to/mcp_server.py"]
}
}
}
Quick Reference
| Feature | Code |
|---|---|
| Create app | app = FastAPI() |
| GET endpoint | @app.get("/path") |
| POST endpoint | @app.post("/path") |
| Path param | @app.get("/items/{id}") |
| Query param | def f(q: str = Query(...)) |
| Request body | def f(body: Model) |
| Dependency | Depends(function) |
| Background task | BackgroundTasks |
| Streaming | StreamingResponse |
| File upload | UploadFile |
| MCP tool | @mcp.tool() |
| MCP resource | @mcp.resource("uri") |
Next Step: Learn database operations with the SQLAlchemy & Databases Crash Course.