Why FastAPI for AI?

FastAPI is the go-to framework for AI APIs because:
  • Async native: Handle thousands of concurrent LLM calls
  • Auto documentation: Swagger UI out of the box
  • Type safety: Pydantic validation catches errors early
  • Fast: One of the fastest Python frameworks

Quick Start

# Install
pip install "fastapi[standard]" uvicorn

# main.py
from fastapi import FastAPI

app = FastAPI(title="AI API", version="1.0.0")

@app.get("/")
async def root():
    return {"message": "AI API is running"}

@app.get("/health")
async def health():
    return {"status": "healthy"}
Run the server with uvicorn main:app --reload, then visit http://localhost:8000/docs for interactive API docs.

Request & Response Models

Use Pydantic for validation:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum

class ModelName(str, Enum):
    GPT4 = "gpt-4o"
    GPT35 = "gpt-3.5-turbo"
    CLAUDE = "claude-3-opus"

class ChatRequest(BaseModel):
    """Request model for chat endpoint"""
    message: str = Field(..., min_length=1, max_length=10000)
    model: ModelName = ModelName.GPT4
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1000, ge=1, le=4000)
    system_prompt: Optional[str] = None
    
    class Config:
        json_schema_extra = {
            "example": {
                "message": "What is machine learning?",
                "model": "gpt-4o",
                "temperature": 0.7
            }
        }

class ChatResponse(BaseModel):
    """Response model for chat endpoint"""
    response: str
    model: str
    tokens_used: int
    
class ErrorResponse(BaseModel):
    detail: str
    error_code: str

app = FastAPI()

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # FastAPI automatically validates the request
    return ChatResponse(
        response="This is the AI response",
        model=request.model,
        tokens_used=150
    )

Path & Query Parameters

from fastapi import FastAPI, Query, Path
from typing import Optional

app = FastAPI()

# Path parameters
@app.get("/conversations/{conversation_id}")
async def get_conversation(
    conversation_id: str = Path(..., description="The conversation UUID")
):
    return {"conversation_id": conversation_id}

# Query parameters
@app.get("/search")
async def search_documents(
    query: str = Query(..., min_length=1, description="Search query"),
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0),
    include_archived: bool = Query(default=False)
):
    return {
        "query": query,
        "limit": limit,
        "offset": offset,
        "results": []
    }

# Combine path and query
@app.get("/users/{user_id}/documents")
async def get_user_documents(
    user_id: str = Path(...),
    status: Optional[str] = Query(default=None, pattern="^(active|archived|all)$"),
    sort_by: str = Query(default="created_at")
):
    return {"user_id": user_id, "status": status, "documents": []}

Dependency Injection

Handle auth, database connections, and shared logic:
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Annotated

app = FastAPI()

# Simple dependency
async def get_api_key(x_api_key: str = Header(...)):
    if not x_api_key.startswith("sk-"):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

# Database dependency
class Database:
    def __init__(self):
        self.connected = False
    
    async def connect(self):
        self.connected = True
        # Connect to actual database
    
    async def disconnect(self):
        self.connected = False

db = Database()

async def get_db():
    await db.connect()
    try:
        yield db
    finally:
        await db.disconnect()

# User dependency (from API key)
async def get_current_user(
    api_key: str = Depends(get_api_key),
    db: Database = Depends(get_db)
):
    # Look up user from API key
    user = {"id": "user_123", "plan": "pro"}
    return user

# Use dependencies in endpoints
@app.post("/chat")
async def chat(
    request: ChatRequest,
    user: dict = Depends(get_current_user),
    db: Database = Depends(get_db)
):
    # user and db are injected automatically
    return {"response": "Hello", "user_id": user["id"]}

Async Operations

Handle concurrent LLM calls efficiently:
import asyncio
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat")
async def chat(request: ChatRequest):
    """Single async completion"""
    response = await client.chat.completions.create(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        temperature=request.temperature
    )
    return {"response": response.choices[0].message.content}

@app.post("/batch")
async def batch_chat(requests: list[ChatRequest]):
    """Process multiple requests concurrently"""
    async def process_one(req: ChatRequest):
        response = await client.chat.completions.create(
            model=req.model,
            messages=[{"role": "user", "content": req.message}]
        )
        return response.choices[0].message.content
    
    # Run all concurrently
    tasks = [process_one(req) for req in requests]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    return {"results": results}

Streaming Responses

Essential for LLM applications:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    """Stream LLM response tokens"""
    
    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.message}],
            stream=True
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                # Send as Server-Sent Events
                data = {"token": chunk.choices[0].delta.content}
                yield f"data: {json.dumps(data)}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
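
On the client side, the SSE stream can be consumed with httpx. A sketch, assuming the API above is running locally on port 8000:

import asyncio
import json
import httpx

async def consume_stream():
    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": "Explain embeddings"},
        ) as resp:
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line.removeprefix("data: ")
                if payload == "[DONE]":
                    break
                print(json.loads(payload)["token"], end="", flush=True)

asyncio.run(consume_stream())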

Error Handling

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

# Custom exception
class RateLimitExceeded(Exception):
    def __init__(self, retry_after: int = 60):
        self.retry_after = retry_after

class TokenLimitExceeded(Exception):
    def __init__(self, tokens: int, limit: int):
        self.tokens = tokens
        self.limit = limit

# Exception handlers
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={
            "error": "rate_limit_exceeded",
            "message": f"Too many requests. Retry after {exc.retry_after}s",
            "retry_after": exc.retry_after
        },
        headers={"Retry-After": str(exc.retry_after)}
    )

@app.exception_handler(TokenLimitExceeded)
async def token_limit_handler(request: Request, exc: TokenLimitExceeded):
    return JSONResponse(
        status_code=400,
        content={
            "error": "token_limit_exceeded",
            "message": f"Request has {exc.tokens} tokens, limit is {exc.limit}",
            "tokens": exc.tokens,
            "limit": exc.limit
        }
    )

# Usage in endpoint
@app.post("/chat")
async def chat(request: ChatRequest):
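    # is_rate_limited and count_tokens are application-specific helpers, not defined here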
    # Check rate limit
    if is_rate_limited(request):
        raise RateLimitExceeded(retry_after=30)
    
    # Check token limit
    token_count = count_tokens(request.message)
    if token_count > 4000:
        raise TokenLimitExceeded(tokens=token_count, limit=4000)
    
    return {"response": "..."}

Middleware

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
import logging

app = FastAPI()

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "https://myapp.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Custom logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.perf_counter()
    
    # Process request
    response = await call_next(request)
    
    # Log details
    duration = time.perf_counter() - start_time
    logging.info(
        f"{request.method} {request.url.path} "
        f"- Status: {response.status_code} "
        f"- Duration: {duration:.3f}s"
    )
    
    # Add timing header
    response.headers["X-Response-Time"] = f"{duration:.3f}s"
    
    return response

# Request ID middleware
import uuid

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    
    return response
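
Middleware is also a natural place for basic rate limiting. A minimal sketch keyed by client IP; the window, limit, and _request_log names are illustrative, and an in-memory dict only works for a single process (use Redis or similar in production):

from collections import defaultdict, deque
from fastapi.responses import JSONResponse

WINDOW_SECONDS = 60
MAX_REQUESTS = 60
_request_log: dict[str, deque] = defaultdict(deque)

@app.middleware("http")
async def rate_limit(request: Request, call_next):
    client_ip = request.client.host if request.client else "unknown"
    now = time.monotonic()
    log = _request_log[client_ip]

    # Drop timestamps that have fallen out of the window
    while log and now - log[0] > WINDOW_SECONDS:
        log.popleft()

    if len(log) >= MAX_REQUESTS:
        return JSONResponse(
            status_code=429,
            content={"error": "rate_limit_exceeded"},
            headers={"Retry-After": str(WINDOW_SECONDS)},
        )

    log.append(now)
    return await call_next(request)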

File Uploads

For document processing in RAG applications:
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
from pathlib import Path
import aiofiles

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

ALLOWED_TYPES = {"application/pdf", "text/plain", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}
MAX_SIZE = 10 * 1024 * 1024  # 10MB

@app.post("/documents/upload")
async def upload_document(
    file: UploadFile = File(...),
    user: dict = Depends(get_current_user)  # from the dependency-injection section above
):
    # Validate file type
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"File type {file.content_type} not allowed")
    
    # Read and check size
    content = await file.read()
    if len(content) > MAX_SIZE:
        raise HTTPException(400, f"File too large. Max size is {MAX_SIZE // 1024 // 1024}MB")
    
    # Save file
    file_path = UPLOAD_DIR / f"{user_id}_{file.filename}"
    async with aiofiles.open(file_path, "wb") as f:
        await f.write(content)
    
    # Process document (extract text, chunk, embed)
    # ... processing logic ...
    
    return {
        "filename": file.filename,
        "size": len(content),
        "status": "processing"
    }

@app.post("/documents/upload-multiple")
async def upload_multiple(
    files: list[UploadFile] = File(...)
):
    results = []
    for file in files:
        # Process each file
        results.append({"filename": file.filename, "status": "uploaded"})
    return {"files": results}

Background Tasks

Process documents without blocking:
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

class DocumentUpload(BaseModel):
    document_id: str
    filename: str
    content: str

async def process_document(document_id: str, content: str):
    """Background task to process document"""
    # Chunk text
    chunks = chunk_text(content)
    
    # Generate embeddings
    embeddings = await generate_embeddings(chunks)
    
    # Store in vector database
    await store_embeddings(document_id, chunks, embeddings)
    
    # Update status
    await update_document_status(document_id, "ready")

@app.post("/documents")
async def create_document(
    doc: DocumentUpload,
    background_tasks: BackgroundTasks
):
    # Save document metadata immediately
    await save_document_metadata(doc.document_id, doc.filename)
    
    # Process in background
    background_tasks.add_task(
        process_document,
        doc.document_id,
        doc.content
    )
    
    return {
        "document_id": doc.document_id,
        "status": "processing"
    }

Routers for Organization

Structure larger applications:
# app/routers/chat.py
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/chat", tags=["Chat"])

@router.post("/")
async def create_chat():
    return {"message": "Chat created"}

@router.get("/{chat_id}")
async def get_chat(chat_id: str):
    return {"chat_id": chat_id}

@router.post("/{chat_id}/messages")
async def send_message(chat_id: str, message: str):
    return {"chat_id": chat_id, "message": message}

# app/routers/documents.py
from fastapi import APIRouter

router = APIRouter(prefix="/documents", tags=["Documents"])

@router.post("/upload")
async def upload():
    return {"status": "uploaded"}

@router.get("/")
async def list_documents():
    return {"documents": []}

# app/main.py
from fastapi import FastAPI
from app.routers import chat, documents

app = FastAPI(title="AI API")

app.include_router(chat.router)
app.include_router(documents.router)

Application Structure

ai-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── config.py            # Settings
│   ├── dependencies.py      # Shared dependencies
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── chat.py
│   │   ├── documents.py
│   │   └── search.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── requests.py      # Pydantic request models
│   │   └── responses.py     # Pydantic response models
│   ├── services/
│   │   ├── __init__.py
│   │   ├── llm.py           # LLM service
│   │   ├── embeddings.py    # Embedding service
│   │   └── rag.py           # RAG service
│   └── db/
│       ├── __init__.py
│       ├── database.py      # Database connection
│       └── repositories.py  # Data access
├── tests/
│   ├── test_chat.py
│   └── test_documents.py
├── .env
├── requirements.txt
└── Dockerfile

Configuration with Pydantic Settings

# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    # API
    app_name: str = "AI API"
    debug: bool = False
    
    # OpenAI
    openai_api_key: str
    default_model: str = "gpt-4o"
    
    # Database
    database_url: str
    
    # Redis
    redis_url: str = "redis://localhost:6379"
    
    # Limits
    max_tokens: int = 4000
    rate_limit_per_minute: int = 60
    
    class Config:
        env_file = ".env"

@lru_cache()
def get_settings() -> Settings:
    return Settings()

# Usage
from app.config import get_settings

settings = get_settings()
print(settings.openai_api_key)
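
Inside endpoints, it is common to inject settings as a dependency instead of importing a module-level instance; thanks to lru_cache the same Settings object is reused. A sketch (the /info route is illustrative):

from typing import Annotated
from fastapi import Depends, FastAPI

from app.config import Settings, get_settings

app = FastAPI()

@app.get("/info")
async def info(settings: Annotated[Settings, Depends(get_settings)]):
    # get_settings is cached, so this does not re-read .env on every request
    return {"app_name": settings.app_name, "default_model": settings.default_model}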

Testing

# tests/test_chat.py
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock
from app.main import app

client = TestClient(app)

def test_health():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_chat_validation():
    # Missing required field
    response = client.post("/chat", json={})
    assert response.status_code == 422

def test_chat_success():
    with patch("app.services.llm.get_completion") as mock:
        mock.return_value = "Mocked response"
        
        response = client.post("/chat", json={
            "message": "Hello",
            "model": "gpt-4o"
        })
        
        assert response.status_code == 200
        assert "response" in response.json()

# Async test (requires the pytest-asyncio plugin)
import pytest

@pytest.mark.asyncio
async def test_async_chat():
    from httpx import AsyncClient, ASGITransport
    from app.main import app
    
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
        response = await ac.post("/chat", json={"message": "Hi"})
        assert response.status_code == 200
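
FastAPI's dependency_overrides hook is useful for bypassing real auth or databases in tests. A sketch, assuming the get_api_key dependency lives in app.dependencies:

from app.main import app
from app.dependencies import get_api_key  # assumed module path

def test_chat_with_fake_key():
    # Replace the real API-key check with a stub for this test
    app.dependency_overrides[get_api_key] = lambda: "sk-test"
    try:
        response = client.post("/chat", json={"message": "Hello"})
        assert response.status_code == 200
    finally:
        app.dependency_overrides.clear()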

Production Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY app/ app/

# Run with uvicorn
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:pass@db:5432/app
    depends_on:
      - db
      - redis
  
  db:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=app
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  redis:
    image: redis:7-alpine

volumes:
  postgres_data:
# Run in production
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

Building MCP Servers with FastMCP

Model Context Protocol (MCP) allows AI assistants like Claude to interact with external tools. FastMCP makes it easy to create MCP servers using FastAPI-like patterns.

Install FastMCP

pip install fastmcp

Basic MCP Server

# mcp_server.py
from fastmcp import FastMCP

# Create MCP server
mcp = FastMCP("My AI Tools")

@mcp.tool()
def add_numbers(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

@mcp.tool()
def search_database(query: str, limit: int = 10) -> list[dict]:
    """Search the database for matching records."""
    # Your database search logic
    return [{"id": 1, "name": "Result 1", "match": query}]

@mcp.tool()
def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    # Your weather API logic
    return {
        "city": city,
        "temperature": 72,
        "condition": "Sunny"
    }

# Run the server
if __name__ == "__main__":
    mcp.run()

MCP Resources (Read-Only Data)

from fastmcp import FastMCP

mcp = FastMCP("Data Server")

@mcp.resource("config://app")
def get_app_config() -> str:
    """Application configuration."""
    return """
    {
        "version": "1.0.0",
        "environment": "production",
        "features": ["ai", "search", "analytics"]
    }
    """

@mcp.resource("docs://api/{endpoint}")
def get_api_docs(endpoint: str) -> str:
    """Get API documentation for an endpoint."""
    docs = {
        "users": "GET /users - List all users\nPOST /users - Create user",
        "products": "GET /products - List products\nGET /products/{id} - Get product"
    }
    return docs.get(endpoint, f"No docs for {endpoint}")

MCP Prompts (Reusable Templates)

from fastmcp import FastMCP

mcp = FastMCP("Prompt Server")

@mcp.prompt()
def code_review_prompt(code: str, language: str = "python") -> str:
    """Generate a code review prompt."""
    return f"""Please review this {language} code:

```{language}
{code}
```

Focus on:
1. Bugs and errors
2. Performance issues
3. Security vulnerabilities
4. Code style and readability

Provide specific suggestions for improvement."""

@mcp.prompt()
def sql_expert_prompt(schema: str) -> str:
    """Generate a SQL expert prompt with schema context."""
    return f"""You are a SQL expert. Here is the database schema:

{schema}

Answer questions about this database with optimized SQL queries.
Explain your queries and suggest indexes if needed."""

Combining with FastAPI

from fastapi import FastAPI
from fastmcp import FastMCP

# Create both servers
app = FastAPI(title="AI API")
mcp = FastMCP("AI Tools")

# Regular FastAPI endpoints
@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.post("/chat")
async def chat(message: str):
    return {"response": f"You said: {message}"}

# MCP tools
@mcp.tool()
def query_api(endpoint: str) -> dict:
    """Query the FastAPI endpoints."""
    # Internal API calls
    return {"result": f"Called {endpoint}"}

# Run both
if __name__ == "__main__":
    import asyncio
    import uvicorn
    
    async def main():
        # Run MCP server in background
        mcp_task = asyncio.create_task(mcp.run_async())
        
        # Run FastAPI
        config = uvicorn.Config(app, host="0.0.0.0", port=8000)
        server = uvicorn.Server(config)
        await server.serve()
    
    asyncio.run(main())

Configure Claude Desktop

Add to your Claude Desktop config (claude_desktop_config.json):
{
  "mcpServers": {
    "my-tools": {
      "command": "python",
      "args": ["path/to/mcp_server.py"]
    }
  }
}
Now Claude can use your tools directly!

Quick Reference

| Feature         | Code                        |
|-----------------|-----------------------------|
| Create app      | app = FastAPI()             |
| GET endpoint    | @app.get("/path")           |
| POST endpoint   | @app.post("/path")          |
| Path param      | @app.get("/items/{id}")     |
| Query param     | def f(q: str = Query(...))  |
| Request body    | def f(body: Model)          |
| Dependency      | Depends(function)           |
| Background task | BackgroundTasks             |
| Streaming       | StreamingResponse           |
| File upload     | UploadFile                  |
| MCP tool        | @mcp.tool()                 |
| MCP resource    | @mcp.resource("uri")        |
Next Step: Now learn database operations with SQLAlchemy & Databases Crash Course.