API Design Patterns

Why API Design Matters

APIs are contracts between services. Good API design leads to:
  • Developer Experience - Easy to use and understand
  • Maintainability - Evolve without breaking clients
  • Performance - Efficient data transfer
  • Security - Protected resources

REST API Design

Resource-Based URLs

┌─────────────────────────────────────────────────────────────────┐
│                    RESTful URL Design                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  [Good] (Resource-based):                                      │
│  ─────────────────────────                                      │
│  GET    /users              → List all users                   │
│  POST   /users              → Create a user                    │
│  GET    /users/123          → Get user 123                     │
│  PUT    /users/123          → Replace user 123                 │
│  PATCH  /users/123          → Update user 123                  │
│  DELETE /users/123          → Delete user 123                  │
│                                                                 │
│  GET    /users/123/orders   → Get orders for user 123          │
│  POST   /users/123/orders   → Create order for user 123        │
│                                                                 │
│  [Bad] (Action-based):                                         │
│  ─────────────────────────                                      │
│  GET    /getUsers                                               │
│  POST   /createUser                                             │
│  POST   /deleteUser/123                                         │
│  GET    /getUserOrders?userId=123                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Query Parameters

┌─────────────────────────────────────────────────────────────────┐
│                    Query Parameters                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Filtering:                                                     │
│  GET /products?category=electronics&price_min=100&in_stock=true │
│                                                                 │
│  Pagination:                                                    │
│  GET /users?page=2&per_page=20                                  │
│  GET /users?offset=40&limit=20                                  │
│  GET /users?cursor=eyJpZCI6MTIzfQ==                             │
│                                                                 │
│  Sorting:                                                       │
│  GET /products?sort=price          (ascending)                  │
│  GET /products?sort=-price         (descending)                 │
│  GET /products?sort=category,-price (multiple)                  │
│                                                                 │
│  Field Selection:                                               │
│  GET /users/123?fields=id,name,email                            │
│                                                                 │
│  Search:                                                        │
│  GET /products?q=laptop                                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
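The sort convention above (comma-separated fields, a leading "-" for descending) can be parsed in a few lines. This is a minimal sketch rather than a reference implementation: the function name parse_sort and the ALLOWED_SORT_FIELDS whitelist are hypothetical and not part of any framework.

```python
from typing import List, Tuple

# Hypothetical whitelist: only fields the API is willing to sort by.
ALLOWED_SORT_FIELDS = {"price", "category", "name"}


def parse_sort(sort: str) -> List[Tuple[str, str]]:
    """Turn 'category,-price' into [('category', 'asc'), ('price', 'desc')]."""
    parsed: List[Tuple[str, str]] = []
    for term in sort.split(","):
        term = term.strip()
        if not term:
            continue  # tolerate stray commas such as "price,"
        descending = term.startswith("-")
        field = term[1:] if descending else term
        if field not in ALLOWED_SORT_FIELDS:
            # Rejecting unknown fields keeps user input out of ORDER BY clauses.
            raise ValueError(f"Unsupported sort field: {field}")
        parsed.append((field, "desc" if descending else "asc"))
    return parsed
```

Validating against a whitelist matters more than the parsing itself, since the field names typically end up in a database query.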

Response Structure

from fastapi import FastAPI, HTTPException, Query, Path, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List, Generic, TypeVar
from datetime import datetime
from uuid import uuid4

T = TypeVar('T')

# ============================================
# Response Models
# ============================================

class MetaInfo(BaseModel):
    request_id: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)

class PaginationInfo(BaseModel):
    page: int
    per_page: int
    total: int
    total_pages: int
    next_cursor: Optional[str] = None
    prev_cursor: Optional[str] = None

class ApiResponse(BaseModel, Generic[T]):
    success: bool = True
    data: T
    meta: MetaInfo

class PaginatedResponse(BaseModel, Generic[T]):
    success: bool = True
    data: List[T]
    pagination: PaginationInfo
    meta: MetaInfo

class ErrorDetail(BaseModel):
    field: Optional[str] = None
    message: str
    code: str

class ErrorResponse(BaseModel):
    success: bool = False
    error: dict
    meta: MetaInfo

# ============================================
# Domain Models
# ============================================

class UserBase(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr

class UserCreate(UserBase):
    password: str = Field(..., min_length=8)

class UserUpdate(BaseModel):
    name: Optional[str] = Field(None, min_length=1, max_length=100)
    email: Optional[EmailStr] = None

class User(UserBase):
    id: int
    created_at: datetime
    updated_at: datetime
    
    class Config:
        from_attributes = True

# ============================================
# API Implementation
# ============================================

app = FastAPI(title="User API", version="1.0.0")

# Request ID middleware
@app.middleware("http")
async def add_request_id(request, call_next):
    request_id = str(uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

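# Custom exception handler
from fastapi.encoders import jsonable_encoder

@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    # exc.detail is a plain string unless the caller passed a dict
    detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
    body = ErrorResponse(
        error={
            "code": detail.get("code", "ERROR"),
            "message": detail.get("message", "An error occurred"),
            "details": detail.get("details", [])
        },
        meta=MetaInfo(request_id=request.state.request_id)
    )
    return JSONResponse(
        status_code=exc.status_code,
        # jsonable_encoder turns the datetime in meta into an ISO 8601 string
        content=jsonable_encoder(body)
    )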

# List users with pagination
@app.get("/users", response_model=PaginatedResponse[User])
async def list_users(
    request: Request,
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    sort: str = Query("created_at", regex="^-?(name|email|created_at)$"),
    search: Optional[str] = Query(None, min_length=1)
):
    # Parse sort direction
    sort_desc = sort.startswith("-")
    sort_field = sort.lstrip("-")
    
    # Build query (pseudo-code)
    query = db.query(UserModel)
    if search:
        query = query.filter(UserModel.name.ilike(f"%{search}%"))
    
    total = query.count()
    users = query.order_by(
        getattr(UserModel, sort_field).desc() if sort_desc else getattr(UserModel, sort_field)
    ).offset((page - 1) * per_page).limit(per_page).all()
    
    return PaginatedResponse(
        data=users,
        pagination=PaginationInfo(
            page=page,
            per_page=per_page,
            total=total,
            total_pages=(total + per_page - 1) // per_page
        ),
        meta=MetaInfo(request_id=request.state.request_id)
    )

# Get single user
@app.get("/users/{user_id}", response_model=ApiResponse[User])
async def get_user(
    request: Request,
    user_id: int = Path(..., ge=1)
):
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    
    if not user:
        raise HTTPException(
            status_code=404,
            detail={
                "code": "USER_NOT_FOUND",
                "message": f"User with id {user_id} not found"
            }
        )
    
    return ApiResponse(
        data=user,
        meta=MetaInfo(request_id=request.state.request_id)
    )

# Create user
@app.post("/users", response_model=ApiResponse[User], status_code=201)
async def create_user(
    request: Request,
    user_data: UserCreate
):
    # Check for existing email
    existing = db.query(UserModel).filter(UserModel.email == user_data.email).first()
    if existing:
        raise HTTPException(
            status_code=400,
            detail={
                "code": "VALIDATION_ERROR",
                "message": "Email already registered",
                "details": [{"field": "email", "message": "This email is already in use"}]
            }
        )
    
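    # Hash the password before persisting; never store it in plain text.
    # hash_password is an assumed helper defined elsewhere in the application.
    fields = user_data.dict()
    fields["password"] = hash_password(fields["password"])
    user = UserModel(**fields)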
    db.add(user)
    db.commit()
    db.refresh(user)
    
    return ApiResponse(
        data=user,
        meta=MetaInfo(request_id=request.state.request_id)
    )

# Update user (partial)
@app.patch("/users/{user_id}", response_model=ApiResponse[User])
async def update_user(
    request: Request,
    user_data: UserUpdate,  # required request body (a None default would crash below)
    user_id: int = Path(..., ge=1)
):
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    
    if not user:
        raise HTTPException(status_code=404, detail={"code": "USER_NOT_FOUND", "message": "User not found"})
    
    update_data = user_data.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(user, field, value)
    
    user.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(user)
    
    return ApiResponse(
        data=user,
        meta=MetaInfo(request_id=request.state.request_id)
    )

# Delete user
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int = Path(..., ge=1)):
    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    
    if not user:
        raise HTTPException(status_code=404, detail={"code": "USER_NOT_FOUND", "message": "User not found"})
    
    db.delete(user)
    db.commit()
    
    return None  # 204 No Content

Response JSON Examples

// Success Response
{
  "success": true,
  "data": {
    "id": 123,
    "name": "John Doe",
    "email": "john.doe@example.com",
    "created_at": "2024-01-15T10:30:00Z"
  },
  "meta": {
    "request_id": "req_abc123"
  }
}

// List Response with Pagination
{
  "success": true,
  "data": [
    { "id": 1, "name": "User 1" },
    { "id": 2, "name": "User 2" }
  ],
  "pagination": {
    "page": 1,
    "per_page": 20,
    "total": 150,
    "total_pages": 8,
    "next_cursor": "eyJpZCI6MjB9"
  },
  "meta": {
    "request_id": "req_ghi789"
  }
}

// Error Response
{
  "success": false,
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Invalid email format",
    "details": [
      {
        "field": "email",
        "message": "Must be a valid email address"
      }
    ]
  },
  "meta": {
    "request_id": "req_def456"
  }
}

Pagination Strategies

┌─────────────────────────────────────────────────────────────────┐
│                    Pagination Comparison                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Offset Pagination                                           │
│     GET /users?offset=40&limit=20                               │
│     SQL: SELECT * FROM users LIMIT 20 OFFSET 40                │
│     + Simple, supports jumping to any page                     │
│     - Slow for large offsets (scans skipped rows)              │
│     - Inconsistent with concurrent writes                      │
│                                                                 │
│  2. Cursor Pagination                                           │
│     GET /users?cursor=eyJpZCI6MTIzfQ==&limit=20                 │
│     SQL: SELECT * FROM users WHERE id > 123 ORDER BY id LIMIT 20│
│     + Consistent, fast for any position                        │
│     + Handles concurrent writes well                           │
│     - Can't jump to arbitrary pages                            │
│     - Cursor can be complex for multi-column sorts             │
│                                                                 │
│  3. Keyset Pagination (cursor variant)                          │
│     GET /users?after_id=123&limit=20                            │
│     + Simple cursor format                                     │
│     + Works with indexes                                       │
│                                                                 │
│  Recommendation: Use cursor for large datasets                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
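To make the cursor strategy concrete, here is a minimal sketch that runs against an in-memory list instead of a database. The helper names (encode_cursor, decode_cursor, paginate) are hypothetical, and the cursor format (base64-encoded JSON holding the last seen id) is an assumption based on the example cursor eyJpZCI6MTIzfQ== shown above, which decodes to {"id":123}.

```python
import base64
import json
from typing import Dict, List, Optional, Tuple


def encode_cursor(last_id: int) -> str:
    """Wrap the last seen id in compact JSON and base64-encode it."""
    raw = json.dumps({"id": last_id}, separators=(",", ":"))
    return base64.urlsafe_b64encode(raw.encode()).decode()


def decode_cursor(cursor: str) -> int:
    """Recover the last seen id from an opaque cursor string."""
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["id"]


def paginate(
    rows: List[Dict], cursor: Optional[str], limit: int
) -> Tuple[List[Dict], Optional[str]]:
    """Return one page plus the cursor for the next page (None on the last page)."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    after_id = decode_cursor(cursor) if cursor else 0
    # Stand-in for: SELECT ... WHERE id > :after_id ORDER BY id LIMIT :limit + 1
    matching = sorted((r for r in rows if r["id"] > after_id), key=lambda r: r["id"])
    page = matching[:limit]
    has_more = len(matching) > limit
    next_cursor = encode_cursor(page[-1]["id"]) if has_more else None
    return page, next_cursor
```

Fetching one row more than the limit is a common way to learn whether a next page exists without running a separate COUNT query.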

GraphQL

GraphQL vs REST

REST                                GraphQL
───────────────────────            ───────────────────────

GET /users/123                     query {
→ All user fields                    user(id: 123) {
                                       id
GET /users/123/posts                   name
→ All posts                            posts {
                                         title
GET /users/123/followers               }
→ All followers                        followersCount
                                     }
3 requests                          }
Over-fetching data                  
                                    1 request
                                    Exact data needed

GraphQL Schema Example

type User {
  id: ID!
  name: String!
  email: String!
  posts: [Post!]!
  followers: [User!]!
  followersCount: Int!
  createdAt: DateTime!
}

type Post {
  id: ID!
  title: String!
  content: String!
  author: User!
  comments: [Comment!]!
  likes: Int!
}

type Query {
  user(id: ID!): User
  users(limit: Int, offset: Int): [User!]!
  post(id: ID!): Post
  feed(userId: ID!, limit: Int): [Post!]!
}

type Mutation {
  createUser(input: CreateUserInput!): User!
  updateUser(id: ID!, input: UpdateUserInput!): User!
  deleteUser(id: ID!): Boolean!
  createPost(input: CreatePostInput!): Post!
}

type Subscription {
  postCreated(userId: ID!): Post!
  commentAdded(postId: ID!): Comment!
}

GraphQL Resolver Implementation

import strawberry
from strawberry.types import Info
from strawberry.dataloader import DataLoader
from typing import List, Optional
from datetime import datetime
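import asyncio
import base64  # needed by the cursor decoding in Query.feed

# Assumed to come from the application's own modules (not shown here):
# the ORM models User, Post, Follow, Comment, the Count aggregate,
# and the hash_password helper.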

# ============================================
# DataLoaders (Solve N+1 Problem)
# ============================================

async def load_users(keys: List[int]) -> List["UserType"]:
    """Batch load users by IDs"""
    users = await User.filter(id__in=keys)
    user_map = {u.id: u for u in users}
    return [user_map.get(key) for key in keys]

async def load_posts_by_author(keys: List[int]) -> List[List["PostType"]]:
    """Batch load posts for multiple authors"""
    posts = await Post.filter(author_id__in=keys)
    posts_by_author = {}
    for post in posts:
        if post.author_id not in posts_by_author:
            posts_by_author[post.author_id] = []
        posts_by_author[post.author_id].append(post)
    return [posts_by_author.get(key, []) for key in keys]

async def load_followers_count(keys: List[int]) -> List[int]:
    """Batch load follower counts"""
    counts = await Follow.filter(following_id__in=keys).values('following_id').annotate(count=Count('id'))
    count_map = {c['following_id']: c['count'] for c in counts}
    return [count_map.get(key, 0) for key in keys]


def get_dataloaders() -> dict:
    return {
        "user_loader": DataLoader(load_fn=load_users),
        "posts_loader": DataLoader(load_fn=load_posts_by_author),
        "followers_count_loader": DataLoader(load_fn=load_followers_count)
    }


# ============================================
# Types
# ============================================

@strawberry.type
class UserType:
    id: strawberry.ID
    name: str
    email: str
    created_at: datetime
    
    @strawberry.field
    async def posts(self, info: Info) -> List["PostType"]:
        """Lazily loaded posts using DataLoader"""
        loader = info.context["posts_loader"]
        return await loader.load(self.id)
    
    @strawberry.field
    async def followers_count(self, info: Info) -> int:
        """Efficient batch-loaded follower count"""
        loader = info.context["followers_count_loader"]
        return await loader.load(self.id)
    
    @strawberry.field
    async def followers(
        self, 
        info: Info,
        limit: int = 10,
        offset: int = 0
    ) -> List["UserType"]:
        """Paginated followers list"""
        follows = await Follow.filter(following_id=self.id).offset(offset).limit(limit)
        follower_ids = [f.follower_id for f in follows]
        loader = info.context["user_loader"]
        # load_many batches all keys in one call and avoids shadowing the builtin id()
        return await loader.load_many(follower_ids)


@strawberry.type
class PostType:
    id: strawberry.ID
    title: str
    content: str
    author_id: strawberry.Private[int]  # Hidden from schema
    likes: int
    created_at: datetime
    
    @strawberry.field
    async def author(self, info: Info) -> UserType:
        loader = info.context["user_loader"]
        return await loader.load(self.author_id)
    
    @strawberry.field
    async def comments(
        self,
        info: Info,
        limit: int = 20
    ) -> List["CommentType"]:
        return await Comment.filter(post_id=self.id).limit(limit)


# ============================================
# Inputs
# ============================================

@strawberry.input
class CreateUserInput:
    name: str
    email: str
    password: str

@strawberry.input
class UpdateUserInput:
    name: Optional[str] = None
    email: Optional[str] = None

@strawberry.input
class CreatePostInput:
    title: str
    content: str


# ============================================
# Query & Mutation
# ============================================

@strawberry.type
class Query:
    @strawberry.field
    async def user(self, info: Info, id: strawberry.ID) -> Optional[UserType]:
        loader = info.context["user_loader"]
        return await loader.load(int(id))
    
    @strawberry.field
    async def users(
        self,
        limit: int = 20,
        offset: int = 0,
        search: Optional[str] = None
    ) -> List[UserType]:
        query = User.all()
        if search:
            query = query.filter(name__icontains=search)
        return await query.offset(offset).limit(limit)
    
    @strawberry.field
    async def feed(
        self,
        user_id: strawberry.ID,
        limit: int = 20,
        cursor: Optional[str] = None
    ) -> List[PostType]:
        """User's feed with cursor pagination"""
        query = Post.filter(author_id=int(user_id))
        
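        if cursor:
            # Decode cursor (base64 encoded post ID)
            last_id = int(base64.b64decode(cursor).decode())
            query = query.filter(id__lt=last_id)
        
        # Order by the cursor column so "id < last_id" matches the page order
        # (assumes post IDs increase with creation time)
        return await query.order_by("-id").limit(limit)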


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def create_user(self, input: CreateUserInput) -> UserType:
        # Validate email uniqueness
        existing = await User.filter(email=input.email).first()
        if existing:
            raise ValueError("Email already registered")
        
        user = await User.create(
            name=input.name,
            email=input.email,
            password=hash_password(input.password)
        )
        return user
    
    @strawberry.mutation
    async def update_user(
        self,
        id: strawberry.ID,
        input: UpdateUserInput,
        info: Info
    ) -> UserType:
        # Authorization check
        current_user = info.context["current_user"]
        if current_user.id != int(id):
            raise PermissionError("Not authorized")
        
        update_data = {k: v for k, v in input.__dict__.items() if v is not None}
        await User.filter(id=int(id)).update(**update_data)
        
        return await User.get(id=int(id))
    
    @strawberry.mutation
    async def create_post(
        self,
        input: CreatePostInput,
        info: Info
    ) -> PostType:
        current_user = info.context["current_user"]
        
        post = await Post.create(
            title=input.title,
            content=input.content,
            author_id=current_user.id
        )
        
        # Publish to subscribers
        await info.context["pubsub"].publish(
            f"post_created:{current_user.id}",
            post
        )
        
        return post


# ============================================
# Subscription
# ============================================

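from typing import AsyncGenerator

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def post_created(
        self,
        info: Info,
        user_id: strawberry.ID
    ) -> AsyncGenerator[PostType, None]:
        # A subscription is an async generator, so the annotation is the
        # generator type rather than the type of each yielded item
        async for post in info.context["pubsub"].subscribe(f"post_created:{user_id}"):
            yield post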


# ============================================
# Schema Setup
# ============================================

schema = strawberry.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription
)

# FastAPI integration
from strawberry.fastapi import GraphQLRouter

async def get_context():
    return {
        **get_dataloaders(),
        "current_user": get_current_user(),
        "pubsub": pubsub
    }

graphql_app = GraphQLRouter(schema, context_getter=get_context)
app.include_router(graphql_app, prefix="/graphql")

GraphQL Trade-offs

GraphQL Pros

  • Fetch exactly what you need
  • Single endpoint
  • Strong typing
  • Self-documenting (introspection)
  • Great for complex, nested data
  • Reduces over/under-fetching

GraphQL Cons

  • Caching is harder (POST requests to a single endpoint bypass standard HTTP caching)
  • N+1 query problem (unless resolvers batch, e.g. with DataLoader)
  • Rate limiting complexity
  • File uploads are awkward
  • Learning curve
  • Performance monitoring harder

N+1 Problem & DataLoader

┌─────────────────────────────────────────────────────────────────┐
│                    N+1 Problem in GraphQL                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Query:                                                         │
│  query {                                                        │
│    posts(limit: 10) {                                          │
│      title                                                      │
│      author { name }   ← Fetches author for EACH post          │
│    }                                                            │
│  }                                                              │
│                                                                 │
│  [Bad] Without DataLoader (11 queries):                            │
│  1. SELECT * FROM posts LIMIT 10                               │
│  2. SELECT * FROM users WHERE id = 1                           │
│  3. SELECT * FROM users WHERE id = 2                           │
│  ... (N more queries)                                          │
│                                                                 │
│  [Good] With DataLoader (2 queries):                                │
│  1. SELECT * FROM posts LIMIT 10                               │
│  2. SELECT * FROM users WHERE id IN (1, 2, 3, ...)            │
│                                                                 │
│  DataLoader batches requests within the same tick              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
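The "same tick" batching idea can be illustrated with a toy loader built on asyncio alone. This is a simplified sketch, not how Strawberry's DataLoader is implemented: TinyLoader and fetch_authors are hypothetical names, and the queries list only records what a real database would have been asked.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class TinyLoader:
    """Collects keys requested in one event-loop tick and loads them in one batch."""

    def __init__(self, batch_fn: Callable[[List[int]], Awaitable[List[str]]]):
        self.batch_fn = batch_fn
        self.pending: Dict[int, asyncio.Future] = {}
        self.scheduled = False

    def load(self, key: int) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        if key not in self.pending:
            self.pending[key] = loop.create_future()  # duplicate keys share a future
        if not self.scheduled:
            self.scheduled = True
            # The task starts only after the current synchronous code yields,
            # so every key requested before that point joins the same batch.
            loop.create_task(self._dispatch())
        return self.pending[key]

    async def _dispatch(self) -> None:
        batch, self.pending, self.scheduled = self.pending, {}, False
        keys = list(batch)
        values = await self.batch_fn(keys)
        for key, value in zip(keys, values):
            batch[key].set_result(value)


queries: List[str] = []  # records each simulated SQL statement


async def fetch_authors(ids: List[int]) -> List[str]:
    queries.append(f"SELECT * FROM users WHERE id IN {tuple(ids)}")
    return [f"author-{i}" for i in ids]


async def main() -> List[str]:
    loader = TinyLoader(fetch_authors)
    author_ids = [1, 2, 3, 2, 1]  # author_id of each of five posts
    return await asyncio.gather(*(loader.load(a) for a in author_ids))


names = asyncio.run(main())
```

Five author lookups result in a single simulated query because all load calls happen before the event loop gets a chance to run the dispatch task. Error handling (a failing batch function) is left out of the sketch.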

API Versioning

Versioning Strategies

┌─────────────────────────────────────────────────────────────────┐
│                    API Versioning Strategies                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. URL Path Versioning (Most Common)                           │
│     GET /v1/users/123                                           │
│     GET /v2/users/123                                           │
│     + Clear, easy to implement                                 │
│     - URL pollution, hard to deprecate                         │
│                                                                 │
│  2. Query Parameter                                             │
│     GET /users/123?version=2                                    │
│     + Optional, backwards compatible                           │
│     - Can be missed, caching issues                            │
│                                                                 │
│  3. Header Versioning                                           │
│     GET /users/123                                              │
│     Header: Accept: application/vnd.api+json;version=2         │
│     + Clean URLs, semantic                                     │
│     - Hidden, harder to test                                   │
│                                                                 │
│  4. Content Negotiation                                         │
│     Header: Accept: application/vnd.company.api.v2+json        │
│     + RESTful, flexible                                        │
│     - Complex, client overhead                                 │
│                                                                 │
│  Recommendation: URL path for public APIs, headers for internal│
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
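A server that supports more than one of these strategies needs a single place to decide which version a request targets. The sketch below shows one possible precedence order (path, then query parameter, then Accept header). The function name resolve_version, the DEFAULT_VERSION constant, and the precedence itself are assumptions for illustration, and the headers mapping is assumed to use the exact key "Accept".

```python
import re
from typing import Mapping

DEFAULT_VERSION = 1  # hypothetical fallback when the client specifies nothing


def resolve_version(
    path: str, query: Mapping[str, str], headers: Mapping[str, str]
) -> int:
    """Pick the API version from the path, query string, or Accept header."""
    # 1. URL path versioning: /v2/users/123
    match = re.match(r"^/v(\d+)(/|$)", path)
    if match:
        return int(match.group(1))

    # 2. Query parameter: /users/123?version=2
    version = query.get("version", "")
    if version.isdigit():
        return int(version)

    accept = headers.get("Accept", "")
    # 3. Media type parameter: application/vnd.api+json;version=2
    match = re.search(r";\s*version=(\d+)", accept)
    if match:
        return int(match.group(1))

    # 4. Vendor media type: application/vnd.company.api.v2+json
    match = re.search(r"\.v(\d+)\+json", accept)
    if match:
        return int(match.group(1))

    return DEFAULT_VERSION
```

Whatever order is chosen, documenting it avoids surprises when a client sends conflicting signals, such as a /v1 path together with a v2 media type.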

Version Migration Strategy

Timeline for API Version Deprecation
─────────────────────────────────────────────────────────

v1 Launch    v2 Launch    v1 Deprecated    v1 Sunset
    │            │              │              │
    │◄──────────►│◄────────────►│◄────────────►│
    │   Active   │   Migration  │   Warning    │
    │            │    Period    │   Period     │
    │            │   (6 months) │  (3 months)  │
    
Communication:
- Announce v2 with migration guide
- Add deprecation headers to v1
- Email users with timelines
- Provide breaking change logs
- Offer migration support
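The "add deprecation headers to v1" step can be sketched as a small helper. The header names follow the Sunset header (RFC 8594) and the Deprecation header (RFC 9745) as commonly described; treat the exact value formats as an assumption to verify against those specifications. The function name and the migration guide URL are hypothetical.

```python
from datetime import datetime, timezone
from email.utils import format_datetime
from typing import Dict


def deprecation_headers(
    deprecated_at: datetime, sunset_at: datetime, migration_guide_url: str
) -> Dict[str, str]:
    """Build response headers announcing that an API version is going away.

    Both datetimes must be timezone-aware and in UTC.
    """
    return {
        # Assumed format: "@" followed by a Unix timestamp
        "Deprecation": f"@{int(deprecated_at.timestamp())}",
        # HTTP-date, e.g. "Tue, 01 Apr 2025 00:00:00 GMT"
        "Sunset": format_datetime(sunset_at, usegmt=True),
        # Points clients at the migration guide
        "Link": f'<{migration_guide_url}>; rel="deprecation"',
    }


headers = deprecation_headers(
    deprecated_at=datetime(2025, 1, 1, tzinfo=timezone.utc),
    sunset_at=datetime(2025, 4, 1, tzinfo=timezone.utc),  # 3-month warning period
    migration_guide_url="https://api.example.com/docs/migrate-v1-to-v2",
)
```

Attaching these headers to every v1 response lets client tooling detect the deprecation automatically instead of relying on email announcements alone.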

Rate Limiting

Common Algorithms

┌─────────────────────────────────────────────────────────────────┐
│                    Rate Limiting Algorithms                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Fixed Window                                                │
│     ┌─────────────────┬─────────────────┐                      │
│     │   Window 1      │   Window 2      │                      │
│     │   100 requests  │   100 requests  │                      │
│     │   00:00-01:00   │   01:00-02:00   │                      │
│     └─────────────────┴─────────────────┘                      │
│     ⚠️ Burst at window boundary (200 req in 2 seconds)         │
│                                                                 │
│  2. Sliding Window Log                                          │
│     Track timestamp of each request                             │
│     Count requests in last N seconds                            │
│     + Accurate, - Memory intensive                            │
│                                                                 │
│  3. Sliding Window Counter                                      │
│     Weighted average of current + previous window              │
│     + Memory efficient, + Smooth                              │
│                                                                 │
│  4. Token Bucket                                                │
│     ┌─────────────────────┐                                    │
│     │  Bucket (capacity)  │ ← Tokens added at fixed rate       │
│     │  ████████░░░░░░░░   │                                    │
│     │  (8/15 tokens)      │                                    │
│     └──────────┬──────────┘                                    │
│                │                                                │
│     Request takes 1 token                                      │
│     + Allows bursts up to capacity                            │
│                                                                 │
│  5. Leaky Bucket                                                │
│     Requests queue and process at fixed rate                   │
│     + Smooth output rate                                       │
│     - No burst handling                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
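Two of the algorithms above are easiest to understand in code. The following single-process sketch shows a token bucket and the weighted estimate used by a sliding window counter. The class and function names are hypothetical, and the clock is injectable so the behaviour can be checked without sleeping.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity`, refilled at `refill_rate` tokens per second."""

    def __init__(
        self,
        capacity: int,
        refill_rate: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.last_refill = clock()

    def allow(self) -> bool:
        now = self.clock()
        elapsed = now - self.last_refill
        # Add the tokens earned since the last call, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1  # each request consumes one token
            return True
        return False


def sliding_window_estimate(
    prev_count: int, curr_count: int, window_seconds: float, elapsed_in_window: float
) -> float:
    """Weight the previous window by how much of it still overlaps the sliding window."""
    overlap = (window_seconds - elapsed_in_window) / window_seconds
    return prev_count * overlap + curr_count
```

For example, 15 seconds into a 60-second window with 100 requests in the previous window and 20 in the current one, the estimate is 100 × 0.75 + 20 = 95 requests.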

Rate Limit Headers

HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 998
X-RateLimit-Reset: 1640000000
X-RateLimit-Reset-After: 3600

# When rate limited:
HTTP/1.1 429 Too Many Requests
Retry-After: 60
Content-Type: application/json

{
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Rate limit exceeded. Try again in 60 seconds.",
    "retry_after": 60
  }
}
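The headers shown above can be derived from a limiter's state with a small helper. This is a sketch under assumptions: the function name and parameters are hypothetical, and reset_epoch is taken to be the Unix time at which the current window ends.

```python
from typing import Dict


def rate_limit_headers(
    limit: int, remaining: int, reset_epoch: int, now: int, allowed: bool = True
) -> Dict[str, str]:
    """Build X-RateLimit-* headers, plus Retry-After when the request was rejected."""
    reset_after = max(reset_epoch - now, 0)  # seconds until the window resets
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(max(remaining, 0)),
        "X-RateLimit-Reset": str(reset_epoch),
        "X-RateLimit-Reset-After": str(reset_after),
    }
    if not allowed:
        # Only 429 responses tell the client how long to back off.
        headers["Retry-After"] = str(reset_after)
    return headers
```

Sending the headers on successful responses as well lets well-behaved clients slow down before they hit the limit.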

Distributed Rate Limiting

┌─────────────────────────────────────────────────────────────────┐
│               Distributed Rate Limiting with Redis              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│                        ┌─────────┐                             │
│                        │  Redis  │  Central counter            │
│                        └────┬────┘                             │
│                             │                                   │
│         ┌───────────────────┼───────────────────┐              │
│         │                   │                   │               │
│    ┌────▼────┐         ┌────▼────┐         ┌────▼────┐        │
│    │ API 1   │         │ API 2   │         │ API 3   │        │
│    └─────────┘         └─────────┘         └─────────┘        │
│                                                                 │
│  Lua Script (atomic):                                          │
│  local current = redis.call('INCR', key)                       │
│  if current == 1 then                                          │
│      redis.call('EXPIRE', key, window_seconds)                 │
│  end                                                            │
│  return current <= limit                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
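The logic of the Lua script can be followed in plain Python by replacing Redis with a dictionary. This sketch is only a single-process stand-in to show the INCR and EXPIRE steps; it provides none of the cross-server atomicity that makes the Redis version useful. The class name and the injectable clock are hypothetical.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowCounter:
    """In-memory imitation of the INCR + EXPIRE fixed-window script."""

    def __init__(
        self,
        limit: int,
        window_seconds: float,
        clock: Callable[[], float] = time.time,
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.store: Dict[str, Tuple[int, float]] = {}  # key -> (count, expires_at)

    def allow(self, key: str) -> bool:
        now = self.clock()
        count, expires_at = self.store.get(key, (0, 0.0))
        if now >= expires_at:
            # Key is missing or expired: the next INCR starts a new window,
            # and EXPIRE is set only on that first increment.
            count, expires_at = 0, now + self.window_seconds
        count += 1  # INCR
        self.store[key] = (count, expires_at)
        return count <= self.limit  # same comparison as the script's return value
```

In the real script the increment and the expiry run as one atomic unit inside Redis, which is what prevents two API servers from both seeing a count of 1 and racing on the expiry.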

Rate Limiting Implementation

import redis.asyncio as redis
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from functools import wraps
from typing import Optional, Callable
from dataclasses import dataclass
from enum import Enum
import time
import hashlib

class RateLimitAlgorithm(Enum):
    FIXED_WINDOW = "fixed_window"
    SLIDING_WINDOW = "sliding_window"
    TOKEN_BUCKET = "token_bucket"

@dataclass
class RateLimitConfig:
    requests: int
    window_seconds: int
    algorithm: RateLimitAlgorithm = RateLimitAlgorithm.SLIDING_WINDOW
    key_prefix: str = "ratelimit"

class RateLimiter:
    """Production-ready distributed rate limiter using Redis"""
    
    # Lua script for atomic sliding window
    SLIDING_WINDOW_SCRIPT = """
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local limit = tonumber(ARGV[3])
    
    -- Remove old entries
    redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
    
    -- Count current entries
    local current = redis.call('ZCARD', key)
    
    if current < limit then
        -- Add new request
        redis.call('ZADD', key, now, now .. '-' .. math.random())
        redis.call('EXPIRE', key, window)
        return {1, limit - current - 1, window}
    else
        -- Get time until oldest entry expires
        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
        -- Round up: Redis truncates Lua floats to integers in replies
        local retry_after = math.ceil(oldest[2] + window - now)
        return {0, 0, retry_after}
    end
    """
    
    # Lua script for token bucket
    TOKEN_BUCKET_SCRIPT = """
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local capacity = tonumber(ARGV[3])
    
    local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
    local tokens = tonumber(bucket[1]) or capacity
    local last_update = tonumber(bucket[2]) or now
    
    -- Calculate tokens to add
    local elapsed = now - last_update
    tokens = math.min(capacity, tokens + elapsed * rate)
    
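    if tokens >= 1 then
        tokens = tokens - 1
        -- HSET accepts multiple field/value pairs; HMSET is deprecated
        redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
        redis.call('EXPIRE', key, math.ceil(capacity / rate) * 2)
        return {1, math.floor(tokens), 0}
    else
        -- Round up: Redis truncates Lua numbers to integers in replies,
        -- which would otherwise turn a 0.4s wait into Retry-After: 0
        local retry_after = math.ceil((1 - tokens) / rate)
        return {0, 0, retry_after}
    end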
    """
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self._sliding_window_sha = None
        self._token_bucket_sha = None
    
    async def init_scripts(self):
        """Load Lua scripts into Redis"""
        self._sliding_window_sha = await self.redis.script_load(
            self.SLIDING_WINDOW_SCRIPT
        )
        self._token_bucket_sha = await self.redis.script_load(
            self.TOKEN_BUCKET_SCRIPT
        )
    
    async def is_allowed(
        self,
        identifier: str,
        config: RateLimitConfig
    ) -> tuple[bool, dict]:
        """Check if request is allowed under rate limit"""
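        # Include the limit and window in the key so that two limits applied
        # to the same identifier (e.g. per-minute and per-second) keep
        # separate counters instead of sharing one Redis key
        key = f"{config.key_prefix}:{config.requests}:{config.window_seconds}:{identifier}"
        now = time.time()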
        
        if config.algorithm == RateLimitAlgorithm.SLIDING_WINDOW:
            result = await self.redis.evalsha(
                self._sliding_window_sha,
                1, key,
                now, config.window_seconds, config.requests
            )
        elif config.algorithm == RateLimitAlgorithm.TOKEN_BUCKET:
            rate = config.requests / config.window_seconds
            result = await self.redis.evalsha(
                self._token_bucket_sha,
                1, key,
                now, rate, config.requests
            )
        else:  # Fixed window
            result = await self._fixed_window(key, config)
        
        allowed, remaining, retry_after = result
        
        return bool(allowed), {
            "limit": config.requests,
            "remaining": int(remaining),
            "reset": int(now + config.window_seconds),
            "retry_after": int(retry_after) if not allowed else None
        }
    
    async def _fixed_window(
        self,
        key: str,
        config: RateLimitConfig
    ) -> tuple:
        """Simple fixed window rate limiting"""
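        # NOTE: INCR and EXPIRE are separate round trips here. If the process
        # dies between them, the key never expires. The Lua script shown
        # earlier performs both steps atomically.
        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, config.window_seconds)
        
        if current <= config.requests:
            return (1, config.requests - current, 0)
        else:
            ttl = await self.redis.ttl(key)
            # TTL returns a negative value if the key has no expiry or is gone
            return (0, 0, max(ttl, 0))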


# FastAPI dependency
def rate_limit(
    requests: int = 100,
    window_seconds: int = 60,
    key_func: Optional[Callable[[Request], str]] = None
):
    """Rate limiting dependency with customizable key extraction"""
    
    config = RateLimitConfig(
        requests=requests,
        window_seconds=window_seconds
    )
    
    async def dependency(request: Request):
        rate_limiter: RateLimiter = request.app.state.rate_limiter
        
        # Extract identifier (IP, user ID, API key, etc.)
        if key_func:
            identifier = key_func(request)
        else:
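            # Default: use IP + endpoint (request.client can be None)
            ip = request.client.host if request.client else "unknown"
            path = request.url.path
            identifier = hashlib.md5(f"{ip}:{path}".encode()).hexdigest()
        
        allowed, info = await rate_limiter.is_allowed(identifier, config)
        
        # Stash rate limit headers on request.state. They are attached to the
        # 429 response below; copying them onto successful responses needs a
        # response middleware, which is not shown here.
        request.state.rate_limit_headers = {
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": str(info["remaining"]),
            "X-RateLimit-Reset": str(info["reset"])
        }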
        
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail={
                    "code": "RATE_LIMIT_EXCEEDED",
                    "message": f"Rate limit exceeded. Retry after {info['retry_after']} seconds.",
                    "retry_after": info["retry_after"]
                },
                headers={
                    "Retry-After": str(info["retry_after"]),
                    **request.state.rate_limit_headers
                }
            )
        
        return info
    
    return Depends(dependency)


# Usage
app = FastAPI()

@app.on_event("startup")
async def startup():
    app.state.redis = redis.from_url("redis://localhost:6379")
    app.state.rate_limiter = RateLimiter(app.state.redis)
    await app.state.rate_limiter.init_scripts()

# Default key: limit is applied per client IP and endpoint
@app.get("/api/data", dependencies=[rate_limit(requests=100, window_seconds=60)])
async def get_data():
    return {"data": "value"}

# Custom key (per-user rate limit)
def user_key(request: Request) -> str:
    return f"user:{request.state.user_id}"

@app.get("/api/premium", dependencies=[rate_limit(requests=1000, window_seconds=60, key_func=user_key)])
async def premium_endpoint():
    return {"data": "premium"}

# Tiered rate limits
@app.get("/api/search")
async def search(
    q: str,
    request: Request,
    _free: dict = rate_limit(requests=10, window_seconds=60),
    _burst: dict = rate_limit(requests=5, window_seconds=1)  # Also limit bursts
):
    return {"results": []}
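
The refill arithmetic in TOKEN_BUCKET_SCRIPT is easier to check in isolation. The sketch below mirrors it in plain Python under stated assumptions: `TokenBucket` and its injectable `clock` are hypothetical names used only for illustration, and the state lives in process memory rather than in a Redis hash.

```python
import time
from typing import Callable


class TokenBucket:
    """Single-process token bucket mirroring the Lua refill logic."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.clock = clock
        self.tokens = capacity      # a new bucket starts full
        self.last_update = clock()

    def allow(self) -> tuple[bool, float]:
        """Return (allowed, seconds to wait before a token is available)."""
        now = self.clock()
        elapsed = now - self.last_update
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True, 0.0
        return False, (1 - self.tokens) / self.rate
```

With `rate = requests / window_seconds` and `capacity = requests`, as in `is_allowed`, a client may burst up to the full limit at once and then continues at the steady refill rate.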

Authentication & Authorization

Auth Patterns

┌─────────────────────────────────────────────────────────────────┐
│                    Authentication Methods                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. API Keys                                                    │
│     Header: X-API-Key: abc123                                   │
│     + Simple                                                    │
│     - Often long-lived, no user context                         │
│     Use: Server-to-server, public APIs                         │
│                                                                 │
│  2. Bearer Tokens (JWT)                                         │
│     Header: Authorization: Bearer eyJhbGciOiJIUzI1...          │
│     + Stateless, contains claims                               │
│     - Can't revoke until expiry                                │
│     Use: User authentication                                    │
│                                                                 │
│  3. OAuth 2.0                                                   │
│     ┌────────┐    ┌────────┐    ┌────────┐                     │
│     │  User  │───►│  Auth  │───►│  App   │                     │
│     │        │◄───│ Server │◄───│        │                     │
│     └────────┘    └────────┘    └────────┘                     │
│     Use: Third-party access, SSO                               │
│                                                                 │
│  4. Session Cookies                                             │
│     Set-Cookie: session_id=abc123; HttpOnly; Secure            │
│     + Simple, can revoke server-side                            │
│     - Needs CSRF protection; awkward for non-browser clients    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
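
For the API key method, one common way to check a presented key is to store only a digest of each key and compare digests in constant time. This is a minimal sketch under assumptions: `API_KEY_HASHES`, `hash_key`, `verify_api_key`, and the client name are hypothetical, and a real service would load keys from a database or secrets store.

```python
import hashlib
import hmac
from typing import Optional


def hash_key(api_key: str) -> str:
    """Digest stored in place of the raw key."""
    return hashlib.sha256(api_key.encode()).hexdigest()


# Hypothetical key store: SHA-256 digest of the key -> client name
API_KEY_HASHES = {hash_key("abc123"): "billing-service"}


def verify_api_key(presented: Optional[str]) -> Optional[str]:
    """Return the client name for a valid X-API-Key value, else None."""
    if not presented:
        return None
    digest = hash_key(presented)
    for stored, client in API_KEY_HASHES.items():
        # compare_digest avoids leaking match position through timing
        if hmac.compare_digest(stored, digest):
            return client
    return None
```

Storing digests means a leaked key table does not directly reveal usable keys, and removing an entry revokes that key immediately.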

JWT Structure

Header.Payload.Signature

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4iLCJpYXQiOjE1MTYyMzkwMjJ9.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

┌─────────────────────────────────────────────────────────────────┐
│  Header (Base64URL) │  Payload (Base64URL)│  Signature          │
├─────────────────────┼─────────────────────┼─────────────────────┤
│  {                  │  {                  │  HMACSHA256(        │
│    "alg": "HS256",  │    "sub": "123",    │    base64(header) + │
│    "typ": "JWT"     │    "name": "John",  │    "." +            │
│  }                  │    "role": "admin", │    base64(payload), │
│                     │    "exp": 16500..., │    secret           │
│                     │    "iat": 16490...  │  )                  │
│                     │  }                  │                     │
└─────────────────────┴─────────────────────┴─────────────────────┘
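
The three-part structure above can be reproduced with the standard library alone. The sketch below signs and verifies an HS256 token; `sign_jwt`, `verify_jwt`, and the `now` parameter are illustrative names, and the segments use unpadded base64url encoding as JWT requires. It is meant to show the mechanics only; production code would normally rely on a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_jwt(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_jwt(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Return the payload if the signature and expiry check out."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    # Pin the algorithm instead of trusting whatever the header claims
    if json.loads(b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload
```

Note that the payload is only encoded, not encrypted: anyone holding the token can read the claims, so secrets do not belong in it.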

Idempotency

Why Idempotency Matters

┌─────────────────────────────────────────────────────────────────┐
│                    The Problem                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Client                              Server                     │
│     │                                   │                       │
│     │──── POST /charge (card) ─────────►│                      │
│     │                                   │ Process payment       │
│     │         ❌ Network timeout ❌      │ $100 charged         │
│     │◄─── [Connection lost] ────────────│                      │
│     │                                   │                       │
│     │ "Did it work? Let me retry..."   │                       │
│     │                                   │                       │
│     │──── POST /charge (card) ─────────►│                      │
│     │                                   │ Process payment       │
│     │◄─── 200 OK ───────────────────────│ $100 charged AGAIN!  │
│                                                                 │
│  Customer charged $200 instead of $100!                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Idempotency Key Pattern

┌─────────────────────────────────────────────────────────────────┐
│                    Idempotency Key Solution                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Request:                                                       │
│  POST /v1/charges                                               │
│  Idempotency-Key: unique-request-id-123                         │
│  Content-Type: application/json                                 │
│                                                                 │
│  { "amount": 100, "currency": "USD", ... }                     │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                   Server Logic                          │   │
│  │                                                         │   │
│  │  1. Check if idempotency_key exists in Redis/DB        │   │
│  │     → If exists: return cached response                │   │
│  │     → If not: continue                                  │   │
│  │                                                         │   │
│  │  2. Store idempotency_key with status "processing"     │   │
│  │                                                         │   │
│  │  3. Process the request                                 │   │
│  │                                                         │   │
│  │  4. Store response with idempotency_key               │   │
│  │     (TTL: 24 hours)                                    │   │
│  │                                                         │   │
│  │  5. Return response                                     │   │
│  │                                                         │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
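
The five server steps above can be sketched without Redis or a web framework. In this minimal illustration, `InMemoryIdempotency`, `handle`, and the `process` callable are hypothetical names, a dict replaces the Redis/DB store, and the 24-hour TTL is omitted.

```python
import json
from typing import Callable, Dict, Tuple

Response = Tuple[int, str]  # (status code, JSON body)


class InMemoryIdempotency:
    """Dict-backed walk-through of the idempotency key steps."""

    def __init__(self) -> None:
        self._records: Dict[str, dict] = {}

    def handle(self, key: str,
               process: Callable[[], Response]) -> Tuple[Response, bool]:
        """Return (response, replayed)."""
        record = self._records.get(key)
        if record is not None:  # step 1: key already seen
            if record["status"] == "processing":
                body = json.dumps({"code": "REQUEST_IN_PROGRESS"})
                return (409, body), False
            return record["response"], True  # cached response
        self._records[key] = {"status": "processing"}  # step 2
        try:
            response = process()  # step 3
        except Exception:
            del self._records[key]  # clear the key so a retry can run
            raise
        # step 4: a real store would also set a TTL here
        self._records[key] = {"status": "completed", "response": response}
        return response, False  # step 5
```

A second call with the same key returns the stored response and never invokes `process` again, which is what prevents the double charge shown earlier.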

Idempotency Implementation

import redis.asyncio as redis
from fastapi import FastAPI, Request, HTTPException, Depends, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, Any, Callable
from dataclasses import dataclass
from datetime import timedelta
import json
import hashlib
from enum import Enum

class IdempotencyStatus(Enum):
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class IdempotencyRecord:
    status: IdempotencyStatus
    status_code: Optional[int] = None
    response_body: Optional[str] = None
    request_hash: Optional[str] = None

class IdempotencyStore:
    """Redis-based idempotency key storage"""
    
    def __init__(self, redis_client: redis.Redis, ttl: timedelta = timedelta(hours=24)):
        self.redis = redis_client
        self.ttl = ttl
        self.key_prefix = "idempotency"
    
    def _key(self, idempotency_key: str) -> str:
        return f"{self.key_prefix}:{idempotency_key}"
    
    async def get(self, idempotency_key: str) -> Optional[IdempotencyRecord]:
        """Get existing idempotency record"""
        data = await self.redis.hgetall(self._key(idempotency_key))
        if not data:
            return None
        
        return IdempotencyRecord(
            status=IdempotencyStatus(data.get(b"status", b"").decode()),
            status_code=int(data[b"status_code"]) if b"status_code" in data else None,
            response_body=data.get(b"response_body", b"").decode() or None,
            request_hash=data.get(b"request_hash", b"").decode() or None
        )
    
    async def start_processing(
        self, 
        idempotency_key: str, 
        request_hash: str
    ) -> bool:
        """
        Mark key as processing (atomic).
        Returns True if we acquired the lock, False if already exists.
        """
        key = self._key(idempotency_key)
        
        # Use SETNX-like behavior with HSETNX
        acquired = await self.redis.hsetnx(key, "status", IdempotencyStatus.PROCESSING.value)
        
        if acquired:
            await self.redis.hset(key, "request_hash", request_hash)
            await self.redis.expire(key, int(self.ttl.total_seconds()))
            return True
        
        return False
    
    async def complete(
        self,
        idempotency_key: str,
        status_code: int,
        response_body: str
    ):
        """Mark request as completed with response"""
        key = self._key(idempotency_key)
        
        await self.redis.hset(key, mapping={
            "status": IdempotencyStatus.COMPLETED.value,
            "status_code": str(status_code),
            "response_body": response_body
        })
        await self.redis.expire(key, int(self.ttl.total_seconds()))
    
    async def fail(self, idempotency_key: str):
        """Mark request as failed (allows retry)"""
        await self.redis.delete(self._key(idempotency_key))


def compute_request_hash(body: bytes, path: str, method: str) -> str:
    """Create hash of request for conflict detection"""
    # Hash the raw bytes so that non-UTF-8 bodies don't raise on decode
    digest = hashlib.sha256(f"{method}:{path}:".encode())
    digest.update(body)
    return digest.hexdigest()


class IdempotencyError(HTTPException):
    pass


def require_idempotency(
    # Tuple default avoids sharing one mutable list across calls
    methods: tuple[str, ...] = ("POST", "PUT", "PATCH"),
    header_name: str = "Idempotency-Key"
):
    """
    Dependency that enforces idempotency for mutating operations.
    """
    
    async def dependency(
        request: Request,
        # Read the configured header rather than a hard-coded alias
        idempotency_key: Optional[str] = Header(None, alias=header_name)
    ):
        if request.method not in methods:
            return None
        
        if not idempotency_key:
            raise HTTPException(
                status_code=400,
                detail={
                    "code": "MISSING_IDEMPOTENCY_KEY",
                    "message": f"Header '{header_name}' is required for {request.method} requests"
                }
            )
        
        store: IdempotencyStore = request.app.state.idempotency_store
        body = await request.body()
        request_hash = compute_request_hash(body, request.url.path, request.method)
        
        # Check for existing record
        existing = await store.get(idempotency_key)
        
        if existing:
            # Check if same request
            if existing.request_hash != request_hash:
                raise HTTPException(
                    status_code=422,
                    detail={
                        "code": "IDEMPOTENCY_KEY_REUSED",
                        "message": "Idempotency key was used with different request parameters"
                    }
                )
            
            if existing.status == IdempotencyStatus.PROCESSING:
                raise HTTPException(
                    status_code=409,
                    detail={
                        "code": "REQUEST_IN_PROGRESS",
                        "message": "A request with this idempotency key is currently being processed"
                    }
                )
            
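            if existing.status == IdempotencyStatus.COMPLETED:
                # Replay the cached response. A value returned from a FastAPI
                # dependency does not short-circuit the route (the handler
                # would run again), so raise instead.
                # Caveat: the default handler wraps the body under "detail";
                # replaying it verbatim needs a custom exception and handler.
                raise HTTPException(
                    status_code=existing.status_code,
                    detail=json.loads(existing.response_body),
                    headers={"Idempotency-Replay": "true"}
                )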
        
        # Try to start processing
        acquired = await store.start_processing(idempotency_key, request_hash)
        
        if not acquired:
            # Race condition - another request got there first
            raise HTTPException(
                status_code=409,
                detail={
                    "code": "REQUEST_IN_PROGRESS",
                    "message": "A request with this idempotency key is currently being processed"
                }
            )
        
        # Store key in request state for response capture
        request.state.idempotency_key = idempotency_key
        
        return None
    
    return Depends(dependency)


# Middleware to capture response
class IdempotencyMiddleware:
    def __init__(self, app, store: IdempotencyStore):
        self.app = app
        self.store = store
    
    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)
        
        # Capture response
        response_body = []
        response_status = [200]
        
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                response_status[0] = message["status"]
            elif message["type"] == "http.response.body":
                response_body.append(message.get("body", b""))
            await send(message)
        
        def current_key():
            # The dependency sets request.state.idempotency_key while the
            # app runs, so the key is only visible after the call below
            return getattr(Request(scope).state, "idempotency_key", None)
        
        try:
            await self.app(scope, receive, send_wrapper)
        except Exception:
            # Clear on failure to allow retry
            idempotency_key = current_key()
            if idempotency_key:
                await self.store.fail(idempotency_key)
            raise
        
        idempotency_key = current_key()
        if idempotency_key:
            # Store the response (whatever its status) so a retry replays it
            body = b"".join(response_body).decode()
            await self.store.complete(
                idempotency_key,
                response_status[0],
                body
            )


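# Usage
app = FastAPI()

# Create the store at import time: add_middleware() below runs before the
# startup event fires, so a store created in a startup hook would not exist yet
redis_client = redis.from_url("redis://localhost:6379")
idempotency_store = IdempotencyStore(redis_client)
app.state.idempotency_store = idempotency_store

app.add_middleware(IdempotencyMiddleware, store=idempotency_store)

class ChargeRequest(BaseModel):
    # Illustrative request model; fields match the handler below
    amount: int
    currency: str
    source: str

@app.post("/v1/charges", dependencies=[require_idempotency()])
async def create_charge(request: Request, charge: ChargeRequest):
    """
    Create a payment charge.
    Requires Idempotency-Key header for safe retries.
    """
    # payment_service is assumed to be provided elsewhere in the application
    result = await payment_service.charge(
        amount=charge.amount,
        currency=charge.currency,
        source=charge.source
    )
    
    return {"success": True, "data": result}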
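
On the client side, the pattern only works if every retry of the same logical operation carries the same key. A minimal sketch, assuming a hypothetical `send` callable that performs the HTTP POST with the given headers and raises `ConnectionError` or `TimeoutError` when the outcome is unknown:

```python
import uuid
from typing import Callable, Dict


def post_with_retries(
    send: Callable[[Dict[str, str]], int],
    max_attempts: int = 3,
) -> int:
    """Call send(headers) until it returns, reusing one idempotency key."""
    # Generate the key once per logical operation, not once per attempt
    headers = {"Idempotency-Key": str(uuid.uuid4())}
    last_error: Exception = RuntimeError("max_attempts must be at least 1")
    for _ in range(max_attempts):
        try:
            return send(headers)
        except (ConnectionError, TimeoutError) as exc:
            # Outcome unknown: retrying with the same key is safe
            last_error = exc
    raise last_error
```

Generating a fresh key inside the loop would defeat the mechanism, because the server would treat each attempt as a new request.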

API Documentation

OpenAPI/Swagger Example

openapi: 3.0.0
info:
  title: User API
  version: 1.0.0
  description: API for managing users

servers:
  - url: https://api.example.com/v1

paths:
  /users:
    get:
      summary: List all users
      parameters:
        - name: page
          in: query
          schema:
            type: integer
            default: 1
        - name: limit
          in: query
          schema:
            type: integer
            default: 20
            maximum: 100
      responses:
        '200':
          description: Successful response
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/UserList'
    post:
      summary: Create a user
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/CreateUser'
      responses:
        '201':
          description: User created
        '400':
          description: Validation error

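components:
  schemas:
    User:
      type: object
      properties:
        id:
          type: integer
        name:
          type: string
        email:
          type: string
          format: email
    # Illustrative definitions for the schemas referenced under paths
    CreateUser:
      type: object
      required: [name, email]
      properties:
        name: { type: string }
        email: { type: string, format: email }
    UserList:
      type: array
      items:
        $ref: '#/components/schemas/User'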

Best Practices Summary

Category        Best Practice
URLs            Use nouns, plural resources, kebab-case
Methods         Use correct HTTP methods (GET=read, POST=create, etc.)
Status Codes    Return appropriate codes (201 for create, 204 for delete)
Errors          Consistent error format with codes and messages
Versioning      Plan for it from day 1, use URL path for public APIs
Pagination      Use cursor-based for large datasets
Rate Limiting   Implement early, return helpful headers
Idempotency     Require for all non-read operations
Documentation   Auto-generate from code when possible

Interview Tip: When designing an API in an interview, start by identifying resources, then define endpoints, discuss authentication, mention rate limiting, and cover error handling. Working through these in order shows comprehensive API design thinking.