Why FastAPI for AI?

If Flask is a Swiss Army knife, FastAPI is a purpose-built power tool for APIs. It was designed from the ground up around async I/O and type safety — two things that matter enormously when you are wrapping LLM calls. Every LLM request is I/O-bound (you are waiting for the provider’s servers), so async support is not a nice-to-have; it is the difference between handling 10 concurrent users and 10,000. FastAPI is the go-to framework for AI APIs because:
  • Async native: Handle thousands of concurrent LLM calls without blocking worker threads
  • Auto documentation: Swagger UI out of the box — your frontend team can explore the API without reading code
  • Type safety: Pydantic validation catches malformed requests before they hit your LLM (saving you tokens and money)
  • Fast: One of the fastest Python frameworks, on par with Node.js and Go for I/O-bound workloads
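To make the concurrency point concrete, here is a minimal sketch that simulates I/O-bound LLM calls with `asyncio.sleep`. The `fake_llm_call` helper and its 0.1-second latency are stand-ins chosen for illustration, not a real provider API.

```python
import asyncio
import time


async def fake_llm_call(prompt: str, latency: float = 0.1) -> str:
    """Stand-in for a network-bound LLM request (hypothetical helper)."""
    await asyncio.sleep(latency)  # yields control to the event loop while "waiting"
    return f"echo: {prompt}"


async def run_concurrently(n: int) -> tuple[list[str], float]:
    """Start n simulated calls at once and measure total wall-clock time."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_llm_call(f"q{i}") for i in range(n)))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_concurrently(100))
    # All calls wait at the same time, so the total is close to one call's latency
    print(f"{len(results)} calls in {elapsed:.2f}s")
```

Because every coroutine spends its time waiting rather than computing, a single event loop can overlap all of them. A synchronous loop over the same calls would take roughly `n` times the latency.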

Quick Start

# Install
pip install "fastapi[standard]" uvicorn

# Create main.py and run
uvicorn main:app --reload
# main.py
from fastapi import FastAPI

app = FastAPI(title="AI API", version="1.0.0")

@app.get("/")
async def root():
    return {"message": "AI API is running"}

@app.get("/health")
async def health():
    return {"status": "healthy"}
Visit http://localhost:8000/docs for interactive API docs.

Request & Response Models

Pydantic models are your contract with the outside world. Think of them like a bouncer at a club — they check IDs at the door so your endpoint code never has to worry about invalid data. If a client sends temperature: "hot" instead of temperature: 0.7, Pydantic rejects it with a clear error before your code ever runs. This saves LLM tokens (you never send garbage to the API) and simplifies debugging. Use Pydantic for validation:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum

class ModelName(str, Enum):
    GPT4 = "gpt-4o"
    GPT35 = "gpt-3.5-turbo"
    CLAUDE = "claude-3-opus"

class ChatRequest(BaseModel):
    """Request model for chat endpoint"""
    message: str = Field(..., min_length=1, max_length=10000)
    model: ModelName = ModelName.GPT4
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1000, ge=1, le=4000)
    system_prompt: Optional[str] = None
    
    # Pydantic v2 style: model_config replaces the nested Config class
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "message": "What is machine learning?",
                    "model": "gpt-4o",
                    "temperature": 0.7
                }
            ]
        }
    }

class ChatResponse(BaseModel):
    """Response model for chat endpoint"""
    response: str
    model: str
    tokens_used: int
    
class ErrorResponse(BaseModel):
    detail: str
    error_code: str

app = FastAPI()

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # FastAPI automatically validates the request
    return ChatResponse(
        response="This is the AI response",
        model=request.model.value,  # pass the plain string, not the enum member
        tokens_used=150
    )

Path & Query Parameters

from fastapi import FastAPI, Query, Path
from typing import Optional

app = FastAPI()

# Path parameters
@app.get("/conversations/{conversation_id}")
async def get_conversation(
    conversation_id: str = Path(..., description="The conversation UUID")
):
    return {"conversation_id": conversation_id}

# Query parameters
@app.get("/search")
async def search_documents(
    query: str = Query(..., min_length=1, description="Search query"),
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0),
    include_archived: bool = Query(default=False)
):
    return {
        "query": query,
        "limit": limit,
        "offset": offset,
        "results": []
    }

# Combine path and query
@app.get("/users/{user_id}/documents")
async def get_user_documents(
    user_id: str = Path(...),
    # `pattern` replaces the deprecated `regex` argument in current FastAPI releases
    status: Optional[str] = Query(default=None, pattern="^(active|archived|all)$"),
    sort_by: str = Query(default="created_at")
):
    return {"user_id": user_id, "status": status, "documents": []}

Dependency Injection

Dependency injection in FastAPI is like a restaurant prep kitchen — ingredients (database connections, API keys, authenticated users) are prepared before the chef (your endpoint function) starts cooking. Instead of every endpoint manually checking API keys and opening database connections, you declare what you need in the function signature and FastAPI wires it up automatically. This keeps your endpoint code focused on business logic. Handle auth, database connections, and shared logic:
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Annotated

app = FastAPI()

# Simple dependency
async def get_api_key(x_api_key: str = Header(...)):
    if not x_api_key.startswith("sk-"):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

# Database dependency
class Database:
    def __init__(self):
        self.connected = False
    
    async def connect(self):
        self.connected = True
        # Connect to actual database
    
    async def disconnect(self):
        self.connected = False

db = Database()

async def get_db():
    await db.connect()
    try:
        yield db
    finally:
        await db.disconnect()

# User dependency (from API key)
async def get_current_user(
    api_key: str = Depends(get_api_key),
    db: Database = Depends(get_db)
):
    # Look up user from API key
    user = {"id": "user_123", "plan": "pro"}
    return user

# Use dependencies in endpoints
@app.post("/chat")
async def chat(
    request: ChatRequest,
    user: dict = Depends(get_current_user),
    db: Database = Depends(get_db)
):
    # user and db are injected automatically
    return {"response": "Hello", "user_id": user["id"]}

Async Operations

This is where FastAPI earns its keep for AI applications. A synchronous server handles one LLM call at a time per worker — if a call takes 3 seconds, you need 100 workers to serve 100 concurrent users. An async server can handle hundreds of concurrent calls on a single worker because it releases the thread while waiting for the LLM provider to respond. Practical tip: always use AsyncOpenAI (not OpenAI) inside async endpoints, or you will block the event loop and lose all the concurrency benefits. Handle concurrent LLM calls efficiently:
import asyncio
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat")
async def chat(request: ChatRequest):
    """Single async completion"""
    response = await client.chat.completions.create(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        temperature=request.temperature
    )
    return {"response": response.choices[0].message.content}

@app.post("/batch")
async def batch_chat(requests: list[ChatRequest]):
    """Process multiple requests concurrently"""
    async def process_one(req: ChatRequest):
        response = await client.chat.completions.create(
            model=req.model,
            messages=[{"role": "user", "content": req.message}]
        )
        return response.choices[0].message.content
    
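    # Run all concurrently; return_exceptions=True keeps one failure from failing the whole batch
    tasks = [process_one(req) for req in requests]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Exception objects are not JSON-serializable, so convert them to error entries
    return {
        "results": [
            {"error": str(r)} if isinstance(r, Exception) else {"response": r}
            for r in results
        ]
    }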
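An unbounded `gather` over a large batch can hit provider rate limits. One common mitigation is to cap in-flight calls with a semaphore. The sketch below shows the idea with only the standard library. `fake_completion` is a hypothetical stand-in for the real client call, and the limit of 5 is an arbitrary assumption.

```python
import asyncio


async def fake_completion(prompt: str) -> str:
    """Hypothetical stand-in for an awaitable LLM call; fails on an empty prompt."""
    await asyncio.sleep(0.01)
    if not prompt:
        raise ValueError("empty prompt")
    return prompt.upper()


async def run_batch(prompts: list[str], max_concurrency: int = 5) -> list[dict]:
    """Run prompts concurrently, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(prompt: str) -> str:
        async with semaphore:  # wait for a free slot before calling the provider
            return await fake_completion(prompt)

    raw = await asyncio.gather(*(guarded(p) for p in prompts), return_exceptions=True)
    # Convert exceptions into JSON-friendly error entries, preserving input order
    return [
        {"error": str(r)} if isinstance(r, Exception) else {"response": r}
        for r in raw
    ]
```

Results come back in input order, so a client can match each entry to the request that produced it.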

Streaming Responses

Streaming is essential for LLM applications because users perceive a streamed response as much faster than one delivered all at once, even when the total generation time is identical: the first tokens appear almost immediately instead of after the full completion. It is the same reason a progress bar feels faster than a blank screen. Server-Sent Events (SSE) are a common transport for LLM streaming: the client opens one HTTP connection and the server pushes tokens as they are generated.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    """Stream LLM response tokens"""
    
    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.message}],
            stream=True
        )
        
        async for chunk in stream:
            # Some chunks may arrive with an empty choices list, so guard before indexing
            if chunk.choices and chunk.choices[0].delta.content:
                # Send as Server-Sent Events
                data = {"token": chunk.choices[0].delta.content}
                yield f"data: {json.dumps(data)}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
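To show what actually travels over the wire, here is a small sketch of the SSE framing used above, together with a simplified parser for the client side. Both helper names are hypothetical. The parser assumes the whole body is already in memory and that each event has a single `data:` line, which is narrower than the full SSE format.

```python
import json
from typing import Iterator


def sse_event(payload: dict) -> str:
    """Frame one JSON payload as a Server-Sent Events message."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(raw: str) -> Iterator[dict]:
    """Yield decoded payloads from an SSE body until the [DONE] sentinel."""
    for block in raw.split("\n\n"):
        line = block.strip()
        if not line.startswith("data: "):
            continue  # skip blank blocks, comments, and other SSE fields
        data = line[len("data: "):]
        if data == "[DONE]":
            return
        yield json.loads(data)


if __name__ == "__main__":
    body = sse_event({"token": "Hel"}) + sse_event({"token": "lo"}) + "data: [DONE]\n\n"
    print("".join(event["token"] for event in parse_sse(body)))
```

Encoding each token with `json.dumps` matters because it escapes newlines inside the token, so a token can never be mistaken for the blank line that separates events.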

Error Handling

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

# Custom exception
class RateLimitExceeded(Exception):
    def __init__(self, retry_after: int = 60):
        self.retry_after = retry_after

class TokenLimitExceeded(Exception):
    def __init__(self, tokens: int, limit: int):
        self.tokens = tokens
        self.limit = limit

# Exception handlers
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={
            "error": "rate_limit_exceeded",
            "message": f"Too many requests. Retry after {exc.retry_after}s",
            "retry_after": exc.retry_after
        },
        headers={"Retry-After": str(exc.retry_after)}
    )

@app.exception_handler(TokenLimitExceeded)
async def token_limit_handler(request: Request, exc: TokenLimitExceeded):
    return JSONResponse(
        status_code=400,
        content={
            "error": "token_limit_exceeded",
            "message": f"Request has {exc.tokens} tokens, limit is {exc.limit}",
            "tokens": exc.tokens,
            "limit": exc.limit
        }
    )

# Usage in endpoint
@app.post("/chat")
async def chat(request: ChatRequest):
    # Check rate limit
    if is_rate_limited(request):
        raise RateLimitExceeded(retry_after=30)
    
    # Check token limit
    token_count = count_tokens(request.message)
    if token_count > 4000:
        raise TokenLimitExceeded(tokens=token_count, limit=4000)
    
    return {"response": "..."}
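The endpoint above calls `is_rate_limited` without defining it. One possible shape for that check is an in-memory sliding window, sketched below. This is an assumption for illustration only: the class name and its API are made up, and because it keeps state in one process it would not work across multiple workers, where a shared store such as Redis is the usual choice.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """In-memory sliding-window limiter (single process only; illustrative)."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits: dict[str, deque] = defaultdict(deque)

    def is_rate_limited(self, key: str, now: Optional[float] = None) -> bool:
        """Record a request for key and report whether it exceeds the limit."""
        now = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return True
        hits.append(now)
        return False
```

In an endpoint, the key would typically be the caller's API key or user ID, and a `True` result would lead to `raise RateLimitExceeded(...)`.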

Middleware

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
import logging

app = FastAPI()

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "https://myapp.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Custom logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.perf_counter()
    
    # Process request
    response = await call_next(request)
    
    # Log details
    duration = time.perf_counter() - start_time
    logging.info(
        f"{request.method} {request.url.path} "
        f"- Status: {response.status_code} "
        f"- Duration: {duration:.3f}s"
    )
    
    # Add timing header
    response.headers["X-Response-Time"] = f"{duration:.3f}s"
    
    return response

# Request ID middleware
import uuid

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    
    return response

File Uploads

Document upload is the front door to any RAG pipeline. Users submit PDFs, Word docs, or plain text, and your API needs to validate the file type, enforce size limits, and kick off the processing pipeline. Practical tip: always validate on the server side even if the frontend checks too — clients can be spoofed, and a 500MB PDF will happily crash your worker if you do not enforce limits.
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
from pathlib import Path
import aiofiles

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

ALLOWED_TYPES = {"application/pdf", "text/plain", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}
MAX_SIZE = 10 * 1024 * 1024  # 10MB

@app.post("/documents/upload")
async def upload_document(
    file: UploadFile = File(...),
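    user: dict = Depends(get_current_user)  # defined in the Dependency Injection section
):
    # Validate file type (the client-supplied content type is only a first check)
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"File type {file.content_type} not allowed")
    
    # Read at most MAX_SIZE + 1 bytes so an oversized upload is never fully loaded into memory
    content = await file.read(MAX_SIZE + 1)
    if len(content) > MAX_SIZE:
        raise HTTPException(413, f"File too large. Max size is {MAX_SIZE // 1024 // 1024}MB")
    
    # Save file; keep only the base name so a crafted filename cannot escape UPLOAD_DIR
    safe_name = Path(file.filename or "upload").name
    file_path = UPLOAD_DIR / f"{user['id']}_{safe_name}"
    async with aiofiles.open(file_path, "wb") as f:
        await f.write(content)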
    
    # Process document (extract text, chunk, embed)
    # ... processing logic ...
    
    return {
        "filename": file.filename,
        "size": len(content),
        "status": "processing"
    }

@app.post("/documents/upload-multiple")
async def upload_multiple(
    files: list[UploadFile] = File(...)
):
    results = []
    for file in files:
        # Process each file
        results.append({"filename": file.filename, "status": "uploaded"})
    return {"files": results}
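The two server-side checks that matter most here, filename sanitization and a hard size cap, can be tested without FastAPI. The helpers below are a hedged sketch with hypothetical names. `read_limited` works on any binary stream, so it is a stand-in for reading an upload in chunks rather than a drop-in for `UploadFile`.

```python
import io
from pathlib import PureWindowsPath
from typing import BinaryIO


def safe_filename(raw_name: str, default: str = "upload") -> str:
    """Strip any directory components from a client-supplied filename."""
    # PureWindowsPath treats both "/" and "\" as separators, so both styles are reduced
    name = PureWindowsPath(raw_name).name
    return name if name not in ("", ".", "..") else default


def read_limited(stream: BinaryIO, max_size: int, chunk_size: int = 64 * 1024) -> bytes:
    """Read a stream in chunks, raising ValueError once max_size is exceeded."""
    buffer = bytearray()
    while chunk := stream.read(chunk_size):
        buffer.extend(chunk)
        if len(buffer) > max_size:
            raise ValueError(f"file exceeds {max_size} bytes")
    return bytes(buffer)
```

Reading in chunks means an oversized upload is rejected after at most `max_size + chunk_size` bytes are buffered, instead of after the whole file has been loaded.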

Background Tasks

When a user uploads a document, you do not want them staring at a spinner while you chunk, embed, and index it. Background tasks let you return a “processing” status immediately and do the heavy lifting after the response is sent. Think of it like dropping off dry cleaning — you get a ticket instantly and pick up the finished result later. For larger workloads, consider Celery or a dedicated task queue, but FastAPI’s built-in BackgroundTasks is perfect for lightweight jobs.
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

class DocumentUpload(BaseModel):
    document_id: str
    filename: str
    content: str

async def process_document(document_id: str, content: str):
    """Background task to process document"""
    # Chunk text
    chunks = chunk_text(content)
    
    # Generate embeddings
    embeddings = await generate_embeddings(chunks)
    
    # Store in vector database
    await store_embeddings(document_id, chunks, embeddings)
    
    # Update status
    await update_document_status(document_id, "ready")

@app.post("/documents")
async def create_document(
    doc: DocumentUpload,
    background_tasks: BackgroundTasks
):
    # Save document metadata immediately
    await save_document_metadata(doc.document_id, doc.filename)
    
    # Process in background
    background_tasks.add_task(
        process_document,
        doc.document_id,
        doc.content
    )
    
    return {
        "document_id": doc.document_id,
        "status": "processing"
    }
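The background task above relies on a `chunk_text` helper that is not shown. A minimal version might split on fixed-size character windows with some overlap, as sketched below. The default sizes are arbitrary assumptions, and real pipelines often split on sentence or token boundaries instead.

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character windows that overlap by `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks
```

The overlap keeps a sentence that straddles a boundary partly visible in both neighboring chunks, which tends to help retrieval quality at the cost of some duplicated embedding work.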

Routers for Organization

As your AI API grows beyond 5-6 endpoints, a single main.py becomes unwieldy. Routers let you split your API into logical modules — chat, documents, search — each in its own file with its own prefix and tags. This is the same idea as Blueprints in Flask, but with better typing support.
# app/routers/chat.py
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/chat", tags=["Chat"])

@router.post("/")
async def create_chat():
    return {"message": "Chat created"}

@router.get("/{chat_id}")
async def get_chat(chat_id: str):
    return {"chat_id": chat_id}

@router.post("/{chat_id}/messages")
async def send_message(chat_id: str, message: str):
    return {"chat_id": chat_id, "message": message}

# app/routers/documents.py
from fastapi import APIRouter

router = APIRouter(prefix="/documents", tags=["Documents"])

@router.post("/upload")
async def upload():
    return {"status": "uploaded"}

@router.get("/")
async def list_documents():
    return {"documents": []}

# app/main.py
from fastapi import FastAPI
from app.routers import chat, documents

app = FastAPI(title="AI API")

app.include_router(chat.router)
app.include_router(documents.router)

Application Structure

ai-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── config.py            # Settings
│   ├── dependencies.py      # Shared dependencies
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── chat.py
│   │   ├── documents.py
│   │   └── search.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── requests.py      # Pydantic request models
│   │   └── responses.py     # Pydantic response models
│   ├── services/
│   │   ├── __init__.py
│   │   ├── llm.py           # LLM service
│   │   ├── embeddings.py    # Embedding service
│   │   └── rag.py           # RAG service
│   └── db/
│       ├── __init__.py
│       ├── database.py      # Database connection
│       └── repositories.py  # Data access
├── tests/
│   ├── test_chat.py
│   └── test_documents.py
├── .env
├── requirements.txt
└── Dockerfile

Configuration with Pydantic Settings

Never hardcode API keys, model names, or rate limits in your code. Pydantic Settings reads from environment variables (and .env files) and validates them at startup. If OPENAI_API_KEY is missing, your app fails immediately with a clear error instead of crashing on the first request. The @lru_cache decorator ensures the settings are parsed once and reused, not re-read on every request.
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    # API
    app_name: str = "AI API"
    debug: bool = False
    
    # OpenAI -- validated at startup, not on first request
    openai_api_key: str
    default_model: str = "gpt-4o"
    
    # Database
    database_url: str
    
    # Redis -- default to localhost for dev, override in production
    redis_url: str = "redis://localhost:6379"
    
    # Limits -- tune these per environment
    max_tokens: int = 4000
    rate_limit_per_minute: int = 60
    
    # Pydantic v2 style: model_config replaces the nested Config class
    model_config = {"env_file": ".env"}

@lru_cache()  # Parse once, reuse everywhere
def get_settings() -> Settings:
    return Settings()

# Usage
from app.config import get_settings

settings = get_settings()
print(settings.openai_api_key)
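The caching behavior is easy to misread, so here is a standard-library analogue of the pattern. It is a sketch, not pydantic-settings itself: `AppSettings` and `load_settings` are hypothetical names, and the validation is reduced to a single required variable.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class AppSettings:
    """Stdlib stand-in for the pydantic Settings class (hypothetical name)."""
    openai_api_key: str
    default_model: str = "gpt-4o"


@lru_cache()
def load_settings() -> AppSettings:
    """Read the environment once; raise immediately if a required value is missing."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return AppSettings(
        openai_api_key=api_key,
        default_model=os.environ.get("DEFAULT_MODEL", "gpt-4o"),
    )
```

Two consequences follow. The failure only happens at startup if something calls the function at startup, for example at import time or in a lifespan handler. And once cached, later changes to the environment are ignored until `load_settings.cache_clear()` is called, which is worth remembering in tests that modify environment variables.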

Testing

# tests/test_chat.py
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock
from app.main import app

client = TestClient(app)

def test_health():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_chat_validation():
    # Missing required field
    response = client.post("/chat", json={})
    assert response.status_code == 422

def test_chat_success():
    with patch("app.services.llm.get_completion") as mock:
        mock.return_value = "Mocked response"
        
        response = client.post("/chat", json={
            "message": "Hello",
            "model": "gpt-4o"
        })
        
        assert response.status_code == 200
        assert "response" in response.json()

# Async test
import pytest

@pytest.mark.asyncio
async def test_async_chat():
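    from httpx import ASGITransport, AsyncClient
    from app.main import app
    
    # Recent httpx releases removed the app= shortcut; pass an ASGITransport instead
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: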
        response = await ac.post("/chat", json={"message": "Hi"})
        assert response.status_code == 200

Production Deployment

Practical tip: in production, always run uvicorn with --workers 4 (or 2x your CPU cores for I/O-bound AI workloads). Each worker is a separate process, so a crash in one does not take down the others. For Kubernetes deployments, use a single worker per container and scale horizontally instead — it is easier to manage and gives you cleaner resource limits.
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY app/ app/

# Run with uvicorn
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:pass@db:5432/app
    depends_on:
      - db
      - redis
  
  db:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=app
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  redis:
    image: redis:7-alpine

volumes:
  postgres_data:
# Run in production
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

Building MCP Servers with FastMCP

Model Context Protocol (MCP) allows AI assistants like Claude to interact with external tools. FastMCP makes it easy to create MCP servers using FastAPI-like patterns.

Install FastMCP

pip install fastmcp

Basic MCP Server

# mcp_server.py
from fastmcp import FastMCP

# Create MCP server
mcp = FastMCP("My AI Tools")

@mcp.tool()
def add_numbers(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

@mcp.tool()
def search_database(query: str, limit: int = 10) -> list[dict]:
    """Search the database for matching records."""
    # Your database search logic
    return [{"id": 1, "name": "Result 1", "match": query}]

@mcp.tool()
def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    # Your weather API logic
    return {
        "city": city,
        "temperature": 72,
        "condition": "Sunny"
    }

# Run the server
if __name__ == "__main__":
    mcp.run()

MCP Resources (Read-Only Data)

from fastmcp import FastMCP

mcp = FastMCP("Data Server")

@mcp.resource("config://app")
def get_app_config() -> str:
    """Application configuration."""
    return """
    {
        "version": "1.0.0",
        "environment": "production",
        "features": ["ai", "search", "analytics"]
    }
    """

@mcp.resource("docs://api/{endpoint}")
def get_api_docs(endpoint: str) -> str:
    """Get API documentation for an endpoint."""
    docs = {
        "users": "GET /users - List all users\nPOST /users - Create user",
        "products": "GET /products - List products\nGET /products/{id} - Get product"
    }
    return docs.get(endpoint, f"No docs for {endpoint}")

MCP Prompts (Reusable Templates)

from fastmcp import FastMCP

mcp = FastMCP("Prompt Server")

@mcp.prompt()
def code_review_prompt(code: str, language: str = "python") -> str:
    """Generate a code review prompt."""
    return f"""Please review this {language} code:

```{language}
{code}
```

Focus on:
1. Bugs and errors
2. Performance issues
3. Security vulnerabilities
4. Code style and readability

Provide specific suggestions for improvement."""

@mcp.prompt()
def sql_expert_prompt(schema: str) -> str:
    """Generate a SQL expert prompt with schema context."""
    return f"""You are a SQL expert. Here is the database schema:

{schema}

Answer questions about this database with optimized SQL queries.
Explain your queries and suggest indexes if needed."""

Combining with FastAPI

from fastapi import FastAPI
from fastmcp import FastMCP

# Create both servers
app = FastAPI(title="AI API")
mcp = FastMCP("AI Tools")

# Regular FastAPI endpoints
@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.post("/chat")
async def chat(message: str):
    return {"response": f"You said: {message}"}

# MCP tools
@mcp.tool()
def query_api(endpoint: str) -> dict:
    """Query the FastAPI endpoints."""
    # Internal API calls
    return {"result": f"Called {endpoint}"}

# Run both
if __name__ == "__main__":
    import asyncio
    import uvicorn
    
    async def main():
        # Run MCP server in background
        mcp_task = asyncio.create_task(mcp.run_async())
        
        # Run FastAPI
        config = uvicorn.Config(app, host="0.0.0.0", port=8000)
        server = uvicorn.Server(config)
        await server.serve()
    
    asyncio.run(main())

Configure Claude Desktop

Add to your Claude Desktop config (claude_desktop_config.json):
{
  "mcpServers": {
    "my-tools": {
      "command": "python",
      "args": ["path/to/mcp_server.py"]
    }
  }
}
Now Claude can use your tools directly!

Quick Reference

| Feature | Code |
| --- | --- |
| Create app | app = FastAPI() |
| GET endpoint | @app.get("/path") |
| POST endpoint | @app.post("/path") |
| Path param | @app.get("/items/{id}") |
| Query param | def f(q: str = Query(...)) |
| Request body | def f(body: Model) |
| Dependency | Depends(function) |
| Background task | BackgroundTasks |
| Streaming | StreamingResponse |
| File upload | UploadFile |
| MCP tool | @mcp.tool() |
| MCP resource | @mcp.resource("uri") |

Next Step: Now learn database operations with SQLAlchemy & Databases Crash Course.

Interview Deep-Dive

Strong Answer:
  • The core problem is that LLM calls are I/O-bound and highly variable in latency. I would build the API with three tiers of response patterns. For fast requests under 5 seconds, a standard async endpoint with streaming via Server-Sent Events so the user sees tokens as they arrive rather than staring at a spinner. For medium requests 5-30 seconds, I would still stream but add a timeout with a clear error message: if the upstream provider has not started streaming within 10 seconds, I return a 504 with a retry-after header. For long-running requests like batch processing or document analysis, I would use a job-based pattern: the POST endpoint returns a 202 Accepted with a job ID immediately, a background task processes the request, and the client polls a GET endpoint or receives a webhook callback.
  • The critical design choice in FastAPI specifically is using AsyncOpenAI instead of the sync client. If you use the sync OpenAI() client inside an async endpoint, you block the entire event loop — one slow LLM call will freeze every other request. I have seen this bug in production at a startup where 40 concurrent users brought the API to its knees because someone used the sync client. The fix was a one-line change to AsyncOpenAI and throughput went from 5 to 200 concurrent requests.
  • I would also add middleware for request ID tracking, structured logging of latency per provider, and a circuit breaker that stops sending requests to a provider after 3 consecutive timeouts. FastAPI’s dependency injection makes this clean: the circuit breaker state lives in a dependency that gets injected into every endpoint.
Red Flags: Candidate does not mention async/sync distinction, suggests using threads instead of async for I/O-bound work, or does not consider the streaming use case for LLM APIs.
Follow-up: How would you implement rate limiting per user and per provider in this FastAPI service?
I would implement two separate rate limiters. For per-user rate limiting, I use a Redis-backed sliding window counter keyed by API key. The dependency injection pattern in FastAPI makes this elegant: I create a RateLimiter dependency that checks Redis before the endpoint logic runs and raises an HTTPException(429) with a Retry-After header if the limit is exceeded. For per-provider rate limiting, I use a token bucket algorithm because LLM providers have both requests-per-minute and tokens-per-minute limits. The token bucket lives in a shared dependency and is checked before every outbound LLM call, not at the API endpoint level. The key nuance is that these are two different concerns: user rate limiting protects my service, provider rate limiting protects my account from getting throttled. I have seen teams conflate these and end up either over-restricting users or burning through provider rate limits.
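The token bucket mentioned in the follow-up answer can be sketched in a few lines. This is an in-process illustration with a hypothetical class name. A production version shared across workers would need its state in something like Redis, and a provider with separate request and token limits would need one bucket per limit.

```python
import time
from typing import Optional


class TokenBucket:
    """Token bucket: refills at `rate` tokens per second up to `capacity`."""

    def __init__(self, rate: float, capacity: float, now: Optional[float] = None):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full
        self.updated = time.monotonic() if now is None else now

    def try_consume(self, amount: float = 1.0, now: Optional[float] = None) -> bool:
        """Take `amount` tokens if available; return False without blocking otherwise."""
        now = time.monotonic() if now is None else now
        elapsed = max(0.0, now - self.updated)
        # Refill for the time that has passed, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if amount > self.tokens:
            return False
        self.tokens -= amount
        return True
```

Unlike a sliding window, the bucket allows short bursts up to `capacity` while holding the long-run average at `rate`, which matches how per-minute provider limits are usually described.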
Strong Answer:
  • FastAPI’s dependency injection uses Python’s type hints and the Depends() function to automatically resolve, instantiate, and inject shared resources into endpoint handlers. Dependencies can be chained — a get_current_user dependency can itself depend on get_api_key which depends on get_db. FastAPI resolves the full dependency graph per request and handles cleanup via generator dependencies (using yield).
  • For AI applications specifically, DI solves three hard problems. First, expensive client initialization: you do not want to create a new AsyncOpenAI() client on every request. A dependency with @lru_cache or a lifespan event creates the client once and shares it. Second, context propagation: every LLM call needs the user’s API key, their rate limit tier, and a trace ID for observability. Instead of passing these through every function, I inject them as dependencies. Third, testability: I can swap the real LLM client with a mock in tests by overriding the dependency, which means I can test my API logic without making actual LLM calls that cost money and are non-deterministic.
  • The pattern I use in production is a LLMService dependency that encapsulates the client, handles retries, tracks costs, and logs to the observability platform. Every endpoint gets this service injected rather than talking to OpenAI directly. When we added Anthropic as a fallback provider, we only changed the LLMService dependency — zero endpoint code changed.
Red Flags: Candidate describes DI as just “passing arguments to functions,” cannot explain the difference between function dependencies and generator dependencies (yield), or does not see why DI matters more for AI apps than CRUD apps.
Follow-up: What is the difference between using Depends() with a function versus a class, and when would you choose each?
Function dependencies are great for simple resolution logic like extracting and validating an API key from a header. Class dependencies shine when you need stateful resources with lifecycle management, for example a database connection pool or an LLM client with built-in retry state. With a class, I implement __init__ for configuration and __call__ for the per-request logic, then pass a configured instance to Depends(). Passing the class itself, as in Depends(MyService), also works: FastAPI resolves the __init__ parameters and builds a new instance. The class approach also makes testing cleaner: I can subclass for a mock implementation. In practice, my AI APIs use function dependencies for auth and validation, and class dependencies for services like the LLM client, vector store connection, and caching layer. The gotcha is that when you pass the class itself, FastAPI creates a new instance per request by default; if you want a singleton (which you usually do for expensive clients), you need @lru_cache on the factory function or use the lifespan context manager.
Strong Answer:
  • FastAPI’s BackgroundTasks runs tasks in the same process after the response is sent. It is perfect for lightweight fire-and-forget work: logging an analytics event, sending a notification, updating a cache entry. The task shares the same event loop and process memory, so it has zero serialization overhead and can access in-process state directly.
  • Celery (or alternatives like Dramatiq, Huey, or even Redis Streams with a custom worker) runs tasks in separate worker processes with a message broker in between. This is what you need for AI workloads that are heavy: embedding a 500-page PDF, running a batch of 1000 LLM evaluations, fine-tuning a model, or generating 50 images.
  • The decision criteria are: if the task takes under 5 seconds and you can tolerate losing it on a process crash, use BackgroundTasks. If the task takes longer, needs retry logic, must survive a server restart, or needs to scale horizontally across multiple workers, use a task queue. In my experience, most AI workloads fall into the second category because LLM calls are inherently slow and expensive enough that you want guaranteed delivery.
  • A common mistake I see is teams starting with BackgroundTasks for document processing because it is simpler, then hitting problems when the FastAPI process runs out of memory or restarts mid-task and the user’s document is silently lost. The migration to Celery later is painful because the code was not designed for serializable task arguments.
Red Flags: Candidate does not know the difference, suggests BackgroundTasks for all async work regardless of duration, or does not mention task persistence and failure recovery.
Follow-up: How would you design a document ingestion pipeline for a RAG system using FastAPI as the API layer?
The API layer accepts the upload via a POST endpoint with UploadFile, validates the file type and size, stores the raw file to S3 or equivalent object storage, and enqueues a processing job with the file reference and metadata. The endpoint returns a 202 Accepted with a job ID immediately. The processing pipeline runs in Celery workers: step one chunks the document using a text splitter, step two generates embeddings via the embedding API (batched for efficiency), and step three upserts the vectors into the vector database with metadata. Each step is a separate Celery task chained together, so a failure at step two retries just the embedding step, not the chunking. The API exposes a GET /documents/{id}/status endpoint that reads the job state from Redis or the database. I also add a webhook callback option so the client does not need to poll. The key design insight is keeping the FastAPI process thin: it only handles HTTP and enqueuing. All heavy compute lives in the workers.
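The job lifecycle in this answer can be illustrated without a broker. The sketch below is a plain-Python stand-in, not Celery: `JOBS` plays the role of Redis or a database table, `enqueue` is what the POST handler would call before returning 202, and `run_pipeline` imitates a worker that retries only the step that failed. All names and the toy step functions are assumptions made for illustration.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Job:
    job_id: str
    file_ref: str
    status: str = "queued"  # queued -> processing -> done | failed:<step>
    attempts: dict = field(default_factory=dict)  # step name -> attempts used


JOBS: dict = {}  # stand-in for the job-state store read by the status endpoint


def enqueue(file_ref: str) -> str:
    """Record a job for an already-stored file and return its ID."""
    job = Job(job_id=uuid.uuid4().hex, file_ref=file_ref)
    JOBS[job.job_id] = job
    return job.job_id


def chunk(file_ref: str) -> list:
    # Toy splitter; a real step would download the file and split its text.
    return [f"{file_ref}#chunk{i}" for i in range(3)]


def embed(chunks: list) -> list:
    # Toy embedding; a real step would call the embedding API in batches.
    return [(c, [float(len(c))]) for c in chunks]


def upsert(vectors: list) -> int:
    # Toy upsert; a real step would write to the vector database.
    return len(vectors)


def run_pipeline(job_id: str, steps: list, max_retries: int = 3) -> None:
    """Run steps in order; a failure retries that step only, not earlier ones."""
    job = JOBS[job_id]
    job.status = "processing"
    payload = job.file_ref
    for step in steps:
        for attempt in range(1, max_retries + 1):
            job.attempts[step.__name__] = attempt
            try:
                payload = step(payload)  # output of one step feeds the next
                break
            except Exception:
                if attempt == max_retries:
                    job.status = f"failed:{step.__name__}"
                    return
    job.status = "done"
```

In a real deployment each step would be a Celery task and the loop would be replaced by a task chain with per-task retry settings; the point here is only that step outputs are passed forward, so a retry never repeats completed work.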
Strong Answer:
  • I use a three-layer testing strategy. Layer one is unit tests with fully mocked LLM responses. I override the LLM service dependency using FastAPI’s app.dependency_overrides so every endpoint gets a mock service that returns deterministic responses. This covers request validation, error handling, auth logic, and response formatting — everything except the actual LLM interaction. These tests run in CI in under 10 seconds.
  • Layer two is integration tests with recorded responses. I use a library like vcrpy or a custom fixture that records real LLM API responses once and replays them in subsequent test runs. This catches serialization bugs, response parsing edge cases, and content-type mismatches that mocks would miss. I re-record these fixtures monthly or when I change models.
  • Layer three is contract tests against the live API, run on a schedule (nightly) rather than on every commit. These verify that the LLM providers have not changed their response format, that our API keys are valid, and that the end-to-end flow works. These tests have a cost budget — I cap them at $2 per run.
  • The biggest testing mistake specific to AI APIs is making assertions too tight. If you assert response.json()["answer"] == "Machine learning is...", the test will break every time the LLM gives a slightly different phrasing. Instead, I assert on structure (valid JSON, required fields present, status code), on constraints (response under token limit, no banned words), and use LLM-as-Judge for quality assertions only in the nightly suite.
Red Flags: Candidate only mentions manual testing, mocks everything including the HTTP layer, or writes brittle tests that assert on exact LLM output strings.
Follow-up: How do you handle the non-determinism of LLM responses in your test assertions?
I set temperature=0 for all test calls to minimize variance, but even at temperature zero, LLM responses are not perfectly deterministic, and they can shift when the provider updates a model or API version. So my assertions focus on invariants rather than exact content. I check structural invariants (is it valid JSON, does it have the required keys), content invariants (does it mention the required topics, is it within length bounds), and safety invariants (does it avoid banned phrases). For quality, I use a lightweight LLM judge in the nightly suite that scores the response on a 1-5 scale and asserts the score is above 3. The judge prompt is versioned alongside the test code. The key principle is: test the contract, not the content.
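The structural, content, and safety invariants can be collected into one helper, sketched below. The function name, the `answer` field, the banned-phrase list, and the length limit are all assumptions chosen for illustration.

```python
import json

BANNED_PHRASES = ("as an ai language model",)  # hypothetical safety list


def check_answer_contract(raw_body: str, required_topics: list, max_chars: int = 2000) -> list:
    """Return a list of contract violations; an empty list means the response passes."""
    # Structural invariants: valid JSON object with a string "answer" field.
    try:
        data = json.loads(raw_body)
    except json.JSONDecodeError:
        return ["body is not valid JSON"]
    if not isinstance(data, dict) or not isinstance(data.get("answer"), str):
        return ["missing string field 'answer'"]

    answer = data["answer"]
    lowered = answer.lower()
    problems = []

    # Content invariants: length bound and required topics, not exact wording.
    if len(answer) > max_chars:
        problems.append(f"answer longer than {max_chars} characters")
    for topic in required_topics:
        if topic.lower() not in lowered:
            problems.append(f"missing topic: {topic}")

    # Safety invariants: no banned phrases.
    for phrase in BANNED_PHRASES:
        if phrase in lowered:
            problems.append(f"banned phrase: {phrase}")
    return problems
```

Two differently worded answers that both mention the required topics pass this check, which is the behavior an exact-string assertion cannot provide.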