Why API Design Matters
APIs are contracts between services. Good API design leads to:
- Developer Experience - Easy to use and understand
- Maintainability - Evolve without breaking clients
- Performance - Efficient data transfer
- Security - Protected resources
REST API Design
Resource-Based URLs
┌─────────────────────────────────────────────────────────────────┐
│ RESTful URL Design │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Good] (Resource-based): │
│ ───────────────────────── │
│ GET /users → List all users │
│ POST /users → Create a user │
│ GET /users/123 → Get user 123 │
│ PUT /users/123 → Replace user 123 │
│ PATCH /users/123 → Update user 123 │
│ DELETE /users/123 → Delete user 123 │
│ │
│ GET /users/123/orders → Get orders for user 123 │
│ POST /users/123/orders → Create order for user 123 │
│ │
│ [Bad] (Action-based): │
│ ───────────────────────── │
│ GET /getUsers │
│ POST /createUser │
│ POST /deleteUser/123 │
│ GET /getUserOrders?userId=123 │
│ │
└─────────────────────────────────────────────────────────────────┘
Query Parameters
┌─────────────────────────────────────────────────────────────────┐
│ Query Parameters │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Filtering: │
│ GET /products?category=electronics&price_min=100&in_stock=true │
│ │
│ Pagination: │
│ GET /users?page=2&per_page=20 │
│ GET /users?offset=40&limit=20 │
│ GET /users?cursor=eyJpZCI6MTIzfQ== │
│ │
│ Sorting: │
│ GET /products?sort=price (ascending) │
│ GET /products?sort=-price (descending) │
│ GET /products?sort=category,-price (multiple) │
│ │
│ Field Selection: │
│ GET /users/123?fields=id,name,email │
│ │
│ Search: │
│ GET /products?q=laptop │
│ │
└─────────────────────────────────────────────────────────────────┘
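Field selection and sort direction are conventions rather than framework features. The sketch below shows one way they might be parsed and applied, assuming the parameter formats above; the helper names are illustrative, not from any particular library.

# Minimal sketch of parsing `fields` and `sort` query parameters.
def parse_fields(fields_param, allowed):
    """Turn 'id,name,email' into a validated set of field names (None = all fields)."""
    if not fields_param:
        return None
    requested = {f.strip() for f in fields_param.split(",") if f.strip()}
    unknown = requested - allowed
    if unknown:
        raise ValueError(f"Unknown fields: {', '.join(sorted(unknown))}")
    return requested

def parse_sort(sort_param, allowed):
    """Turn 'price' / '-price' into (field, descending) and reject unknown fields."""
    descending = sort_param.startswith("-")
    field = sort_param.lstrip("-")
    if field not in allowed:
        raise ValueError(f"Cannot sort by '{field}'")
    return field, descending

def select_fields(resource, fields):
    """Project a resource dict down to the requested fields."""
    return resource if fields is None else {k: v for k, v in resource.items() if k in fields}

# GET /users/123?fields=id,name,email&sort=-created_at
user = {"id": 123, "name": "John Doe", "email": "john@example.com", "role": "admin"}
fields = parse_fields("id,name,email", allowed={"id", "name", "email", "role"})
print(select_fields(user, fields))
print(parse_sort("-created_at", allowed={"name", "created_at"}))  # ('created_at', True)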
Response Structure
Python (FastAPI):
from fastapi import FastAPI, HTTPException, Query, Path, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List, Generic, TypeVar
from datetime import datetime
from uuid import uuid4
T = TypeVar('T')
# ============================================
# Response Models
# ============================================
class MetaInfo(BaseModel):
request_id: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
class PaginationInfo(BaseModel):
page: int
per_page: int
total: int
total_pages: int
next_cursor: Optional[str] = None
prev_cursor: Optional[str] = None
class ApiResponse(BaseModel, Generic[T]):
success: bool = True
data: T
meta: MetaInfo
class PaginatedResponse(BaseModel, Generic[T]):
success: bool = True
data: List[T]
pagination: PaginationInfo
meta: MetaInfo
class ErrorDetail(BaseModel):
field: Optional[str] = None
message: str
code: str
class ErrorResponse(BaseModel):
success: bool = False
error: dict
meta: MetaInfo
# ============================================
# Domain Models
# ============================================
class UserBase(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
class UserUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
email: Optional[EmailStr] = None
class User(UserBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# ============================================
# API Implementation
# ============================================
app = FastAPI(title="User API", version="1.0.0")
# Request ID middleware
@app.middleware("http")
async def add_request_id(request, call_next):
request_id = str(uuid4())
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
# Custom exception handler (handles both dict and plain-string detail)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
    return JSONResponse(
        status_code=exc.status_code,
        content=ErrorResponse(
            error={
                "code": detail.get("code", "ERROR"),
                "message": detail.get("message", "An error occurred"),
                "details": detail.get("details", [])
            },
            meta=MetaInfo(request_id=request.state.request_id)
        ).dict()
    )
# List users with pagination
@app.get("/users", response_model=PaginatedResponse[User])
async def list_users(
request: Request,
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
sort: str = Query("created_at", regex="^-?(name|email|created_at)$"),
search: Optional[str] = Query(None, min_length=1)
):
# Parse sort direction
sort_desc = sort.startswith("-")
sort_field = sort.lstrip("-")
# Build query (pseudo-code)
query = db.query(UserModel)
if search:
query = query.filter(UserModel.name.ilike(f"%{search}%"))
total = query.count()
users = query.order_by(
getattr(UserModel, sort_field).desc() if sort_desc else getattr(UserModel, sort_field)
).offset((page - 1) * per_page).limit(per_page).all()
return PaginatedResponse(
data=users,
pagination=PaginationInfo(
page=page,
per_page=per_page,
total=total,
total_pages=(total + per_page - 1) // per_page
),
meta=MetaInfo(request_id=request.state.request_id)
)
# Get single user
@app.get("/users/{user_id}", response_model=ApiResponse[User])
async def get_user(
request: Request,
user_id: int = Path(..., ge=1)
):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(
status_code=404,
detail={
"code": "USER_NOT_FOUND",
"message": f"User with id {user_id} not found"
}
)
return ApiResponse(
data=user,
meta=MetaInfo(request_id=request.state.request_id)
)
# Create user
@app.post("/users", response_model=ApiResponse[User], status_code=201)
async def create_user(
request: Request,
user_data: UserCreate
):
# Check for existing email
existing = db.query(UserModel).filter(UserModel.email == user_data.email).first()
if existing:
raise HTTPException(
status_code=400,
detail={
"code": "VALIDATION_ERROR",
"message": "Email already registered",
"details": [{"field": "email", "message": "This email is already in use"}]
}
)
user = UserModel(**user_data.dict())
db.add(user)
db.commit()
db.refresh(user)
return ApiResponse(
data=user,
meta=MetaInfo(request_id=request.state.request_id)
)
# Update user (partial)
@app.patch("/users/{user_id}", response_model=ApiResponse[User])
async def update_user(
request: Request,
user_id: int = Path(..., ge=1),
user_data: UserUpdate = None
):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail={"code": "USER_NOT_FOUND", "message": "User not found"})
update_data = user_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
user.updated_at = datetime.utcnow()
db.commit()
db.refresh(user)
return ApiResponse(
data=user,
meta=MetaInfo(request_id=request.state.request_id)
)
# Delete user
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int = Path(..., ge=1)):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail={"code": "USER_NOT_FOUND", "message": "User not found"})
db.delete(user)
db.commit()
return None # 204 No Content
JavaScript (Express):
const express = require('express');
const { body, query, param, validationResult } = require('express-validator');
const { v4: uuidv4 } = require('uuid');
const bcrypt = require('bcrypt'); // used when hashing passwords below
const app = express();
app.use(express.json());
// ============================================
// Response Helpers
// ============================================
class ApiResponse {
static success(data, meta = {}) {
return {
success: true,
data,
meta: {
request_id: meta.requestId,
timestamp: new Date().toISOString()
}
};
}
static paginated(data, pagination, meta = {}) {
return {
success: true,
data,
pagination,
meta: {
request_id: meta.requestId,
timestamp: new Date().toISOString()
}
};
}
static error(code, message, details = [], meta = {}) {
return {
success: false,
error: { code, message, details },
meta: {
request_id: meta.requestId,
timestamp: new Date().toISOString()
}
};
}
}
// ============================================
// Middleware
// ============================================
// Request ID
function requestIdMiddleware(req, res, next) {
req.requestId = uuidv4();
res.setHeader('X-Request-ID', req.requestId);
next();
}
// Validation error handler
function validate(validations) {
return async (req, res, next) => {
await Promise.all(validations.map(v => v.run(req)));
const errors = validationResult(req);
if (errors.isEmpty()) {
return next();
}
const details = errors.array().map(err => ({
field: err.path,
message: err.msg
}));
res.status(400).json(
ApiResponse.error('VALIDATION_ERROR', 'Validation failed', details, { requestId: req.requestId })
);
};
}
// Async error wrapper
const asyncHandler = (fn) => (req, res, next) => {
Promise.resolve(fn(req, res, next)).catch(next);
};
app.use(requestIdMiddleware);
// ============================================
// Routes
// ============================================
// List users with pagination
app.get('/users',
validate([
query('page').optional().isInt({ min: 1 }).toInt(),
query('per_page').optional().isInt({ min: 1, max: 100 }).toInt(),
query('sort').optional().matches(/^-?(name|email|created_at)$/)
]),
asyncHandler(async (req, res) => {
const page = req.query.page || 1;
const perPage = req.query.per_page || 20;
const sort = req.query.sort || 'created_at';
const search = req.query.search;
// Parse sort
const sortDesc = sort.startsWith('-');
const sortField = sort.replace('-', '');
// Build query (pseudo-code with Prisma/Sequelize)
const where = search ? { name: { contains: search } } : {};
const [users, total] = await Promise.all([
User.findMany({
where,
orderBy: { [sortField]: sortDesc ? 'desc' : 'asc' },
skip: (page - 1) * perPage,
take: perPage
}),
User.count({ where })
]);
res.json(ApiResponse.paginated(
users,
{
page,
per_page: perPage,
total,
total_pages: Math.ceil(total / perPage)
},
{ requestId: req.requestId }
));
})
);
// Get single user
app.get('/users/:id',
validate([
param('id').isInt({ min: 1 }).toInt()
]),
asyncHandler(async (req, res) => {
const user = await User.findUnique({ where: { id: req.params.id } });
if (!user) {
return res.status(404).json(
ApiResponse.error('USER_NOT_FOUND', `User with id ${req.params.id} not found`, [], { requestId: req.requestId })
);
}
res.json(ApiResponse.success(user, { requestId: req.requestId }));
})
);
// Create user
app.post('/users',
validate([
body('name').isString().trim().isLength({ min: 1, max: 100 }),
body('email').isEmail().normalizeEmail(),
body('password').isString().isLength({ min: 8 })
]),
asyncHandler(async (req, res) => {
const { name, email, password } = req.body;
// Check existing email
const existing = await User.findUnique({ where: { email } });
if (existing) {
return res.status(400).json(
ApiResponse.error(
'VALIDATION_ERROR',
'Email already registered',
[{ field: 'email', message: 'This email is already in use' }],
{ requestId: req.requestId }
)
);
}
const hashedPassword = await bcrypt.hash(password, 10);
const user = await User.create({
data: { name, email, password: hashedPassword }
});
res.status(201).json(ApiResponse.success(user, { requestId: req.requestId }));
})
);
// Update user (partial)
app.patch('/users/:id',
validate([
param('id').isInt({ min: 1 }).toInt(),
body('name').optional().isString().trim().isLength({ min: 1, max: 100 }),
body('email').optional().isEmail().normalizeEmail()
]),
asyncHandler(async (req, res) => {
const user = await User.findUnique({ where: { id: req.params.id } });
if (!user) {
return res.status(404).json(
ApiResponse.error('USER_NOT_FOUND', 'User not found', [], { requestId: req.requestId })
);
}
const updatedUser = await User.update({
where: { id: req.params.id },
data: { ...req.body, updated_at: new Date() }
});
res.json(ApiResponse.success(updatedUser, { requestId: req.requestId }));
})
);
// Delete user
app.delete('/users/:id',
validate([param('id').isInt({ min: 1 }).toInt()]),
asyncHandler(async (req, res) => {
const user = await User.findUnique({ where: { id: req.params.id } });
if (!user) {
return res.status(404).json(
ApiResponse.error('USER_NOT_FOUND', 'User not found', [], { requestId: req.requestId })
);
}
await User.delete({ where: { id: req.params.id } });
res.status(204).send();
})
);
// Global error handler
app.use((err, req, res, next) => {
console.error(err);
res.status(500).json(
ApiResponse.error('INTERNAL_ERROR', 'An unexpected error occurred', [], { requestId: req.requestId })
);
});
app.listen(3000, () => console.log('API running on port 3000'));
Response JSON Examples
// Success Response
{
"success": true,
"data": {
"id": 123,
"name": "John Doe",
"email": "[email protected]",
"created_at": "2024-01-15T10:30:00Z"
},
"meta": {
"request_id": "req_abc123"
}
}
// List Response with Pagination
{
"success": true,
"data": [
{ "id": 1, "name": "User 1" },
{ "id": 2, "name": "User 2" }
],
"pagination": {
"page": 1,
"per_page": 20,
"total": 150,
"total_pages": 8,
"next_cursor": "eyJpZCI6MjB9"
}
}
// Error Response
{
"success": false,
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid email format",
"details": [
{
"field": "email",
"message": "Must be a valid email address"
}
]
},
"meta": {
"request_id": "req_def456"
}
}
Pagination Strategies
┌─────────────────────────────────────────────────────────────────┐
│ Pagination Comparison │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Offset Pagination │
│ GET /users?offset=40&limit=20 │
│ SQL: SELECT * FROM users LIMIT 20 OFFSET 40 │
│ + Simple, supports jumping to any page │
│ - Slow for large offsets (scans skipped rows) │
│ - Inconsistent with concurrent writes │
│ │
│ 2. Cursor Pagination │
│ GET /users?cursor=eyJpZCI6MTIzfQ==&limit=20 │
│ SQL: SELECT * FROM users WHERE id > 123 LIMIT 20 │
│ + Consistent, fast for any position │
│ + Handles concurrent writes well │
│ - Can't jump to arbitrary pages │
│ - Cursor can be complex for multi-column sorts │
│ │
│ 3. Keyset Pagination (cursor variant) │
│ GET /users?after_id=123&limit=20 │
│ + Simple cursor format │
│ + Works with indexes │
│ │
│ Recommendation: Use cursor for large datasets │
│ │
└─────────────────────────────────────────────────────────────────┘
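To make the cursor option concrete, here is a minimal sketch of an opaque keyset cursor: base64-encoded JSON carrying the last seen id, in the same style as the `eyJpZCI6MTIzfQ==` example above. The function names and the SQL are illustrative.

import base64
import json

def encode_cursor(last_id):
    """{'id': 123} -> opaque, URL-safe cursor string."""
    return base64.urlsafe_b64encode(json.dumps({"id": last_id}).encode()).decode()

def decode_cursor(cursor):
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["id"]

def keyset_query(cursor, limit):
    """Keyset pagination: no OFFSET, so cost stays flat no matter how deep the page."""
    if cursor is None:
        return "SELECT id, name FROM users ORDER BY id LIMIT %s", (limit,)
    return ("SELECT id, name FROM users WHERE id > %s ORDER BY id LIMIT %s",
            (decode_cursor(cursor), limit))

# The response's next_cursor is built from the last row of the current page:
page = [{"id": 121}, {"id": 122}, {"id": 123}]   # pretend query result
next_cursor = encode_cursor(page[-1]["id"]) if page else None
print(next_cursor, "->", decode_cursor(next_cursor))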
GraphQL
GraphQL vs REST
REST (3 requests, over-fetches data):
  GET /users/123            → all user fields
  GET /users/123/posts      → all posts
  GET /users/123/followers  → all followers

GraphQL (1 request, exactly the data needed):
  query {
    user(id: 123) {
      id
      name
      posts {
        title
      }
      followersCount
    }
  }
GraphQL Schema Example
type User {
id: ID!
name: String!
email: String!
posts: [Post!]!
followers: [User!]!
followersCount: Int!
createdAt: DateTime!
}
type Post {
id: ID!
title: String!
content: String!
author: User!
comments: [Comment!]!
likes: Int!
}
type Query {
user(id: ID!): User
users(limit: Int, offset: Int): [User!]!
post(id: ID!): Post
feed(userId: ID!, limit: Int): [Post!]!
}
type Mutation {
createUser(input: CreateUserInput!): User!
updateUser(id: ID!, input: UpdateUserInput!): User!
deleteUser(id: ID!): Boolean!
createPost(input: CreatePostInput!): Post!
}
type Subscription {
postCreated(userId: ID!): Post!
commentAdded(postId: ID!): Comment!
}
GraphQL Resolver Implementation
Python (Strawberry):
import strawberry
from strawberry.types import Info
from strawberry.dataloader import DataLoader
from typing import List, Optional
from datetime import datetime
import asyncio
import base64  # used to decode feed cursors below
# Note: the ORM models (User, Post, Follow, Comment), Count, hash_password,
# get_current_user and pubsub are assumed to be provided by the application.
# ============================================
# DataLoaders (Solve N+1 Problem)
# ============================================
async def load_users(keys: List[int]) -> List["UserType"]:
"""Batch load users by IDs"""
users = await User.filter(id__in=keys)
user_map = {u.id: u for u in users}
return [user_map.get(key) for key in keys]
async def load_posts_by_author(keys: List[int]) -> List[List["PostType"]]:
"""Batch load posts for multiple authors"""
posts = await Post.filter(author_id__in=keys)
posts_by_author = {}
for post in posts:
if post.author_id not in posts_by_author:
posts_by_author[post.author_id] = []
posts_by_author[post.author_id].append(post)
return [posts_by_author.get(key, []) for key in keys]
async def load_followers_count(keys: List[int]) -> List[int]:
"""Batch load follower counts"""
counts = await Follow.filter(following_id__in=keys).values('following_id').annotate(count=Count('id'))
count_map = {c['following_id']: c['count'] for c in counts}
return [count_map.get(key, 0) for key in keys]
def get_dataloaders() -> dict:
return {
"user_loader": DataLoader(load_fn=load_users),
"posts_loader": DataLoader(load_fn=load_posts_by_author),
"followers_count_loader": DataLoader(load_fn=load_followers_count)
}
# ============================================
# Types
# ============================================
@strawberry.type
class UserType:
id: strawberry.ID
name: str
email: str
created_at: datetime
@strawberry.field
async def posts(self, info: Info) -> List["PostType"]:
"""Lazily loaded posts using DataLoader"""
loader = info.context["posts_loader"]
return await loader.load(self.id)
@strawberry.field
async def followers_count(self, info: Info) -> int:
"""Efficient batch-loaded follower count"""
loader = info.context["followers_count_loader"]
return await loader.load(self.id)
@strawberry.field
async def followers(
self,
info: Info,
limit: int = 10,
offset: int = 0
) -> List["UserType"]:
"""Paginated followers list"""
follows = await Follow.filter(following_id=self.id).offset(offset).limit(limit)
follower_ids = [f.follower_id for f in follows]
loader = info.context["user_loader"]
return await asyncio.gather(*[loader.load(id) for id in follower_ids])
@strawberry.type
class PostType:
id: strawberry.ID
title: str
content: str
author_id: strawberry.Private[int] # Hidden from schema
likes: int
created_at: datetime
@strawberry.field
async def author(self, info: Info) -> UserType:
loader = info.context["user_loader"]
return await loader.load(self.author_id)
@strawberry.field
async def comments(
self,
info: Info,
limit: int = 20
) -> List["CommentType"]:
return await Comment.filter(post_id=self.id).limit(limit)
# ============================================
# Inputs
# ============================================
@strawberry.input
class CreateUserInput:
name: str
email: str
password: str
@strawberry.input
class UpdateUserInput:
name: Optional[str] = None
email: Optional[str] = None
@strawberry.input
class CreatePostInput:
title: str
content: str
# ============================================
# Query & Mutation
# ============================================
@strawberry.type
class Query:
@strawberry.field
async def user(self, info: Info, id: strawberry.ID) -> Optional[UserType]:
loader = info.context["user_loader"]
return await loader.load(int(id))
@strawberry.field
async def users(
self,
limit: int = 20,
offset: int = 0,
search: Optional[str] = None
) -> List[UserType]:
query = User.all()
if search:
query = query.filter(name__icontains=search)
return await query.offset(offset).limit(limit)
@strawberry.field
async def feed(
self,
user_id: strawberry.ID,
limit: int = 20,
cursor: Optional[str] = None
) -> List[PostType]:
"""User's feed with cursor pagination"""
query = Post.filter(author_id=int(user_id))
if cursor:
# Decode cursor (base64 encoded post ID)
last_id = int(base64.b64decode(cursor).decode())
query = query.filter(id__lt=last_id)
return await query.order_by("-created_at").limit(limit)
@strawberry.type
class Mutation:
@strawberry.mutation
async def create_user(self, input: CreateUserInput) -> UserType:
# Validate email uniqueness
existing = await User.filter(email=input.email).first()
if existing:
raise ValueError("Email already registered")
user = await User.create(
name=input.name,
email=input.email,
password=hash_password(input.password)
)
return user
@strawberry.mutation
async def update_user(
self,
id: strawberry.ID,
input: UpdateUserInput,
info: Info
) -> UserType:
# Authorization check
current_user = info.context["current_user"]
if current_user.id != int(id):
raise PermissionError("Not authorized")
update_data = {k: v for k, v in input.__dict__.items() if v is not None}
await User.filter(id=int(id)).update(**update_data)
return await User.get(id=int(id))
@strawberry.mutation
async def create_post(
self,
input: CreatePostInput,
info: Info
) -> PostType:
current_user = info.context["current_user"]
post = await Post.create(
title=input.title,
content=input.content,
author_id=current_user.id
)
# Publish to subscribers
await info.context["pubsub"].publish(
f"post_created:{current_user.id}",
post
)
return post
# ============================================
# Subscription
# ============================================
@strawberry.type
class Subscription:
@strawberry.subscription
async def post_created(
self,
info: Info,
user_id: strawberry.ID
) -> PostType:
async for post in info.context["pubsub"].subscribe(f"post_created:{user_id}"):
yield post
# ============================================
# Schema Setup
# ============================================
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription
)
# FastAPI integration
from strawberry.fastapi import GraphQLRouter
async def get_context():
return {
**get_dataloaders(),
"current_user": get_current_user(),
"pubsub": pubsub
}
graphql_app = GraphQLRouter(schema, context_getter=get_context)
app.include_router(graphql_app, prefix="/graphql")
JavaScript (Apollo Server):
const { ApolloServer, gql } = require('apollo-server-express');
const DataLoader = require('dataloader');
const bcrypt = require('bcrypt');
const { createServer } = require('http');
const {
  getComplexity,
  simpleEstimator,
  fieldExtensionsEstimator
} = require('graphql-query-complexity');
// Note: `app` (the Express app) and the ORM models (User, Post, Follow, Comment)
// are assumed to be defined elsewhere in the application.
// ============================================
// Type Definitions
// ============================================
const typeDefs = gql`
type User {
id: ID!
name: String!
email: String!
posts: [Post!]!
followers(limit: Int, offset: Int): [User!]!
followersCount: Int!
createdAt: DateTime!
}
type Post {
id: ID!
title: String!
content: String!
author: User!
comments(limit: Int): [Comment!]!
likes: Int!
createdAt: DateTime!
}
type Comment {
id: ID!
content: String!
author: User!
post: Post!
}
input CreateUserInput {
name: String!
email: String!
password: String!
}
input UpdateUserInput {
name: String
email: String
}
input CreatePostInput {
title: String!
content: String!
}
type Query {
user(id: ID!): User
users(limit: Int, offset: Int, search: String): [User!]!
post(id: ID!): Post
feed(userId: ID!, limit: Int, cursor: String): [Post!]!
}
type Mutation {
createUser(input: CreateUserInput!): User!
updateUser(id: ID!, input: UpdateUserInput!): User!
deleteUser(id: ID!): Boolean!
createPost(input: CreatePostInput!): Post!
}
type Subscription {
postCreated(userId: ID!): Post!
commentAdded(postId: ID!): Comment!
}
`;
// ============================================
// DataLoaders (Batch & Cache)
// ============================================
function createLoaders() {
return {
// Batch load users by IDs
userLoader: new DataLoader(async (ids) => {
const users = await User.findMany({
where: { id: { in: ids.map(id => parseInt(id)) } }
});
const userMap = new Map(users.map(u => [u.id.toString(), u]));
return ids.map(id => userMap.get(id) || null);
}),
// Batch load posts by author IDs
postsByAuthorLoader: new DataLoader(async (authorIds) => {
const posts = await Post.findMany({
where: { authorId: { in: authorIds.map(id => parseInt(id)) } }
});
const postsByAuthor = new Map();
posts.forEach(post => {
const key = post.authorId.toString();
if (!postsByAuthor.has(key)) {
postsByAuthor.set(key, []);
}
postsByAuthor.get(key).push(post);
});
return authorIds.map(id => postsByAuthor.get(id) || []);
}),
// Batch load follower counts
followersCountLoader: new DataLoader(async (userIds) => {
const counts = await Follow.groupBy({
by: ['followingId'],
where: { followingId: { in: userIds.map(id => parseInt(id)) } },
_count: { id: true }
});
const countMap = new Map(
counts.map(c => [c.followingId.toString(), c._count.id])
);
return userIds.map(id => countMap.get(id) || 0);
})
};
}
// ============================================
// Resolvers
// ============================================
const resolvers = {
Query: {
user: async (_, { id }, { loaders }) => {
return loaders.userLoader.load(id);
},
users: async (_, { limit = 20, offset = 0, search }) => {
const where = search
? { name: { contains: search, mode: 'insensitive' } }
: {};
return User.findMany({
where,
skip: offset,
take: limit,
orderBy: { createdAt: 'desc' }
});
},
feed: async (_, { userId, limit = 20, cursor }) => {
const where = { authorId: parseInt(userId) };
if (cursor) {
// Decode cursor (base64 encoded post ID)
const lastId = parseInt(Buffer.from(cursor, 'base64').toString());
where.id = { lt: lastId };
}
return Post.findMany({
where,
take: limit,
orderBy: { createdAt: 'desc' }
});
}
},
User: {
posts: async (user, _, { loaders }) => {
return loaders.postsByAuthorLoader.load(user.id.toString());
},
followersCount: async (user, _, { loaders }) => {
return loaders.followersCountLoader.load(user.id.toString());
},
followers: async (user, { limit = 10, offset = 0 }, { loaders }) => {
const follows = await Follow.findMany({
where: { followingId: user.id },
skip: offset,
take: limit
});
return Promise.all(
follows.map(f => loaders.userLoader.load(f.followerId.toString()))
);
}
},
Post: {
author: async (post, _, { loaders }) => {
return loaders.userLoader.load(post.authorId.toString());
},
comments: async (post, { limit = 20 }) => {
return Comment.findMany({
where: { postId: post.id },
take: limit,
orderBy: { createdAt: 'desc' }
});
}
},
Mutation: {
createUser: async (_, { input }) => {
// Check email uniqueness
const existing = await User.findUnique({ where: { email: input.email } });
if (existing) {
throw new Error('Email already registered');
}
const hashedPassword = await bcrypt.hash(input.password, 10);
return User.create({
data: {
name: input.name,
email: input.email,
password: hashedPassword
}
});
},
updateUser: async (_, { id, input }, { currentUser }) => {
// Authorization
if (currentUser.id !== parseInt(id)) {
throw new Error('Not authorized');
}
const data = {};
if (input.name) data.name = input.name;
if (input.email) data.email = input.email;
data.updatedAt = new Date();
return User.update({
where: { id: parseInt(id) },
data
});
},
createPost: async (_, { input }, { currentUser, pubsub }) => {
const post = await Post.create({
data: {
title: input.title,
content: input.content,
authorId: currentUser.id
}
});
// Publish to subscribers
pubsub.publish(`POST_CREATED_${currentUser.id}`, {
postCreated: post
});
return post;
}
},
Subscription: {
postCreated: {
subscribe: (_, { userId }, { pubsub }) => {
return pubsub.asyncIterator(`POST_CREATED_${userId}`);
}
},
commentAdded: {
subscribe: (_, { postId }, { pubsub }) => {
return pubsub.asyncIterator(`COMMENT_ADDED_${postId}`);
}
}
}
};
// ============================================
// Server Setup
// ============================================
const { PubSub } = require('graphql-subscriptions');
const pubsub = new PubSub();
const server = new ApolloServer({
typeDefs,
resolvers,
context: ({ req }) => ({
loaders: createLoaders(),
currentUser: req.user, // From auth middleware
pubsub
}),
plugins: [
// Query complexity analysis
{
requestDidStart: () => ({
didResolveOperation({ request, document }) {
const complexity = getComplexity({
schema: server.schema,
query: document,
variables: request.variables,
estimators: [
fieldExtensionsEstimator(),
simpleEstimator({ defaultComplexity: 1 })
]
});
if (complexity > 1000) {
throw new Error('Query too complex');
}
}
})
}
]
});
async function startServer() {
await server.start();
server.applyMiddleware({ app });
// WebSocket for subscriptions
const httpServer = createServer(app);
server.installSubscriptionHandlers(httpServer);
httpServer.listen(4000, () => {
console.log(`🚀 Server ready at http://localhost:4000${server.graphqlPath}`);
console.log(`🚀 Subscriptions at ws://localhost:4000${server.subscriptionsPath}`);
});
}
startServer();
GraphQL Trade-offs
GraphQL Pros
- Fetch exactly what you need
- Single endpoint
- Strong typing
- Self-documenting (introspection)
- Great for complex, nested data
- Reduces over/under-fetching
GraphQL Cons
- Caching is harder (no HTTP cache)
- N+1 query problem
- Rate limiting complexity
- File uploads are awkward
- Learning curve
- Performance monitoring harder
N+1 Problem & DataLoader
┌─────────────────────────────────────────────────────────────────┐
│ N+1 Problem in GraphQL │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Query: │
│ query { │
│ posts(limit: 10) { │
│ title │
│ author { name } ← Fetches author for EACH post │
│ } │
│ } │
│ │
│ [Bad] Without DataLoader (11 queries): │
│ 1. SELECT * FROM posts LIMIT 10 │
│ 2. SELECT * FROM users WHERE id = 1 │
│ 3. SELECT * FROM users WHERE id = 2 │
│ ... (N more queries) │
│ │
│ [Good] With DataLoader (2 queries): │
│ 1. SELECT * FROM posts LIMIT 10 │
│ 2. SELECT * FROM users WHERE id IN (1, 2, 3, ...) │
│ │
│ DataLoader batches requests within the same tick │
│ │
└─────────────────────────────────────────────────────────────────┘
API Versioning
Versioning Strategies
┌─────────────────────────────────────────────────────────────────┐
│ API Versioning Strategies │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. URL Path Versioning (Most Common) │
│ GET /v1/users/123 │
│ GET /v2/users/123 │
│ + Clear, easy to implement │
│ - URL pollution, hard to deprecate │
│ │
│ 2. Query Parameter │
│ GET /users/123?version=2 │
│ + Optional, backwards compatible │
│ - Can be missed, caching issues │
│ │
│ 3. Header Versioning │
│ GET /users/123 │
│ Header: Accept: application/vnd.api+json;version=2 │
│ + Clean URLs, semantic │
│ - Hidden, harder to test │
│ │
│ 4. Content Negotiation │
│ Header: Accept: application/vnd.company.api.v2+json │
│ + RESTful, flexible │
│ - Complex, client overhead │
│ │
│ Recommendation: URL path for public APIs, headers for internal│
│ │
└─────────────────────────────────────────────────────────────────┘
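A minimal sketch of the first and third strategies in FastAPI, matching the Python examples used elsewhere in this section; the routers, response shapes, and the v1-to-v2 field change are hypothetical.

from fastapi import FastAPI, APIRouter, Header

app = FastAPI()

# 1. URL path versioning: one router per major version.
v1 = APIRouter(prefix="/v1")
v2 = APIRouter(prefix="/v2")

@v1.get("/users/{user_id}")
async def get_user_v1(user_id: int):
    return {"id": user_id, "name": "John Doe"}                        # v1 shape

@v2.get("/users/{user_id}")
async def get_user_v2(user_id: int):
    return {"id": user_id, "first_name": "John", "last_name": "Doe"}  # v2 breaking change

app.include_router(v1)
app.include_router(v2)

# 3. Header versioning: one route, version read from the Accept header.
@app.get("/users/{user_id}")
async def get_user(user_id: int, accept: str = Header(default="application/json")):
    if "version=2" in accept:
        return {"id": user_id, "first_name": "John", "last_name": "Doe"}
    return {"id": user_id, "name": "John Doe"}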
Version Migration Strategy
Timeline for API Version Deprecation
─────────────────────────────────────────────────────────
v1 Launch v2 Launch v1 Deprecated v1 Sunset
│ │ │ │
│◄──────────►│◄────────────►│◄────────────►│
│ Active │ Migration │ Warning │
│ │ Period │ Period │
│ │ (6 months) │ (3 months) │
Communication:
- Announce v2 with migration guide
- Add deprecation headers to v1 (see the sketch after this list)
- Email users with timelines
- Provide breaking change logs
- Offer migration support
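One possible implementation of the deprecation-header step, sketched as FastAPI middleware; the sunset date and migration-guide URL are placeholders. The Sunset header is standardized (RFC 8594), and a Deprecation header plus a Link to the migration guide gives clients machine-readable warning.

from fastapi import FastAPI, Request

app = FastAPI()

DEPRECATED_PREFIXES = ("/v1/",)                             # versions being retired
SUNSET_DATE = "Wed, 01 Jan 2025 00:00:00 GMT"               # placeholder sunset date
MIGRATION_GUIDE = "https://example.com/docs/migrate-v1-v2"  # placeholder URL

@app.middleware("http")
async def add_deprecation_headers(request: Request, call_next):
    response = await call_next(request)
    if request.url.path.startswith(DEPRECATED_PREFIXES):
        response.headers["Deprecation"] = "true"
        response.headers["Sunset"] = SUNSET_DATE
        response.headers["Link"] = f'<{MIGRATION_GUIDE}>; rel="deprecation"'
    return response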
Rate Limiting
Common Algorithms
┌─────────────────────────────────────────────────────────────────┐
│ Rate Limiting Algorithms │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Fixed Window │
│ ┌─────────────────┬─────────────────┐ │
│ │ Window 1 │ Window 2 │ │
│ │ 100 requests │ 100 requests │ │
│ │ 00:00-01:00 │ 01:00-02:00 │ │
│ └─────────────────┴─────────────────┘ │
│ ⚠️ Burst at window boundary (200 req in 2 seconds) │
│ │
│ 2. Sliding Window Log │
│ Track timestamp of each request │
│ Count requests in last N seconds │
│ + Accurate, - Memory intensive │
│ │
│ 3. Sliding Window Counter │
│ Weighted average of current + previous window │
│ + Memory efficient, + Smooth │
│ │
│ 4. Token Bucket │
│ ┌─────────────────────┐ │
│ │ Bucket (capacity) │ ← Tokens added at fixed rate │
│ │ ████████░░░░░░░░ │ │
│ │ (8/15 tokens) │ │
│ └──────────┬──────────┘ │
│ │ │
│ Request takes 1 token │
│ + Allows bursts up to capacity │
│ │
│ 5. Leaky Bucket │
│ Requests queue and process at fixed rate │
│ + Smooth output rate │
│ - No burst handling │
│ │
└─────────────────────────────────────────────────────────────────┘
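The Redis-based implementation further below is what a multi-instance deployment needs; to make the token-bucket arithmetic itself easier to follow, here is a minimal single-process sketch (class and method names are ours).

import time

class TokenBucket:
    """Refill `rate` tokens per second, allow bursts up to `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# 5 requests/second sustained, bursts of up to 10
bucket = TokenBucket(rate=5, capacity=10)
print([bucket.allow() for _ in range(12)])   # first 10 True, then the bucket is empty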
Rate Limit Headers
HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 998
X-RateLimit-Reset: 1640000000
X-RateLimit-Reset-After: 3600
# When rate limited:
HTTP/1.1 429 Too Many Requests
Retry-After: 60
Content-Type: application/json
{
"error": {
"code": "RATE_LIMIT_EXCEEDED",
"message": "Rate limit exceeded. Try again in 60 seconds.",
"retry_after": 60
}
}
Distributed Rate Limiting
┌─────────────────────────────────────────────────────────────────┐
│ Distributed Rate Limiting with Redis │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ Redis │ Central counter │
│ └────┬────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ API 1 │ │ API 2 │ │ API 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Lua Script (atomic): │
│ local current = redis.call('INCR', key) │
│ if current == 1 then │
│ redis.call('EXPIRE', key, window_seconds) │
│ end │
│ return current <= limit │
│ │
└─────────────────────────────────────────────────────────────────┘
Rate Limiting Implementation
Python (FastAPI):
import redis.asyncio as redis
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from functools import wraps
from typing import Optional, Callable
from dataclasses import dataclass
from enum import Enum
import time
import hashlib
class RateLimitAlgorithm(Enum):
FIXED_WINDOW = "fixed_window"
SLIDING_WINDOW = "sliding_window"
TOKEN_BUCKET = "token_bucket"
@dataclass
class RateLimitConfig:
requests: int
window_seconds: int
algorithm: RateLimitAlgorithm = RateLimitAlgorithm.SLIDING_WINDOW
key_prefix: str = "ratelimit"
class RateLimiter:
"""Production-ready distributed rate limiter using Redis"""
# Lua script for atomic sliding window
SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- Count current entries
local current = redis.call('ZCARD', key)
if current < limit then
-- Add new request
redis.call('ZADD', key, now, now .. '-' .. math.random())
redis.call('EXPIRE', key, window)
return {1, limit - current - 1, window}
else
-- Get time until oldest entry expires
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local retry_after = oldest[2] + window - now
return {0, 0, retry_after}
end
"""
# Lua script for token bucket
TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(bucket[1]) or capacity
local last_update = tonumber(bucket[2]) or now
-- Calculate tokens to add
local elapsed = now - last_update
tokens = math.min(capacity, tokens + elapsed * rate)
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate) * 2)
return {1, math.floor(tokens), 0}
else
local retry_after = (1 - tokens) / rate
return {0, 0, retry_after}
end
"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self._sliding_window_sha = None
self._token_bucket_sha = None
async def init_scripts(self):
"""Load Lua scripts into Redis"""
self._sliding_window_sha = await self.redis.script_load(
self.SLIDING_WINDOW_SCRIPT
)
self._token_bucket_sha = await self.redis.script_load(
self.TOKEN_BUCKET_SCRIPT
)
async def is_allowed(
self,
identifier: str,
config: RateLimitConfig
) -> tuple[bool, dict]:
"""Check if request is allowed under rate limit"""
key = f"{config.key_prefix}:{identifier}"
now = time.time()
if config.algorithm == RateLimitAlgorithm.SLIDING_WINDOW:
result = await self.redis.evalsha(
self._sliding_window_sha,
1, key,
now, config.window_seconds, config.requests
)
elif config.algorithm == RateLimitAlgorithm.TOKEN_BUCKET:
rate = config.requests / config.window_seconds
result = await self.redis.evalsha(
self._token_bucket_sha,
1, key,
now, rate, config.requests
)
else: # Fixed window
result = await self._fixed_window(key, config)
allowed, remaining, retry_after = result
return bool(allowed), {
"limit": config.requests,
"remaining": int(remaining),
"reset": int(now + config.window_seconds),
"retry_after": int(retry_after) if not allowed else None
}
async def _fixed_window(
self,
key: str,
config: RateLimitConfig
) -> tuple:
"""Simple fixed window rate limiting"""
current = await self.redis.incr(key)
if current == 1:
await self.redis.expire(key, config.window_seconds)
if current <= config.requests:
return (1, config.requests - current, 0)
else:
ttl = await self.redis.ttl(key)
return (0, 0, ttl)
# FastAPI dependency
def rate_limit(
requests: int = 100,
window_seconds: int = 60,
key_func: Optional[Callable[[Request], str]] = None
):
"""Rate limiting dependency with customizable key extraction"""
config = RateLimitConfig(
requests=requests,
window_seconds=window_seconds
)
async def dependency(request: Request):
rate_limiter: RateLimiter = request.app.state.rate_limiter
# Extract identifier (IP, user ID, API key, etc.)
if key_func:
identifier = key_func(request)
else:
# Default: use IP + endpoint
ip = request.client.host
path = request.url.path
identifier = hashlib.md5(f"{ip}:{path}".encode()).hexdigest()
allowed, info = await rate_limiter.is_allowed(identifier, config)
# Add rate limit headers to response
request.state.rate_limit_headers = {
"X-RateLimit-Limit": str(info["limit"]),
"X-RateLimit-Remaining": str(info["remaining"]),
"X-RateLimit-Reset": str(info["reset"])
}
if not allowed:
raise HTTPException(
status_code=429,
detail={
"code": "RATE_LIMIT_EXCEEDED",
"message": f"Rate limit exceeded. Retry after {info['retry_after']} seconds.",
"retry_after": info["retry_after"]
},
headers={
"Retry-After": str(info["retry_after"]),
**request.state.rate_limit_headers
}
)
return info
return Depends(dependency)
# Usage
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url("redis://localhost:6379")
app.state.rate_limiter = RateLimiter(app.state.redis)
await app.state.rate_limiter.init_scripts()
# Global rate limit
@app.get("/api/data", dependencies=[rate_limit(requests=100, window_seconds=60)])
async def get_data():
return {"data": "value"}
# Custom key (per-user rate limit)
def user_key(request: Request) -> str:
return f"user:{request.state.user_id}"
@app.get("/api/premium", dependencies=[rate_limit(requests=1000, window_seconds=60, key_func=user_key)])
async def premium_endpoint():
return {"data": "premium"}
# Tiered rate limits
@app.get("/api/search")
async def search(
q: str,
request: Request,
_free: dict = rate_limit(requests=10, window_seconds=60),
_burst: dict = rate_limit(requests=5, window_seconds=1) # Also limit bursts
):
return {"results": []}
JavaScript (Express):
const Redis = require('ioredis');
const crypto = require('crypto');
class RateLimiter {
constructor(redis) {
this.redis = redis;
this.scripts = {};
}
async init() {
// Sliding window Lua script
this.scripts.slidingWindow = await this.redis.script('LOAD', `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local current = redis.call('ZCARD', key)
if current < limit then
redis.call('ZADD', key, now, now .. '-' .. math.random())
redis.call('EXPIRE', key, window)
return {1, limit - current - 1, window}
else
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local retry_after = oldest[2] + window - now
return {0, 0, retry_after}
end
`);
// Token bucket Lua script
this.scripts.tokenBucket = await this.redis.script('LOAD', `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(bucket[1]) or capacity
local last_update = tonumber(bucket[2]) or now
local elapsed = now - last_update
tokens = math.min(capacity, tokens + elapsed * rate)
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate) * 2)
return {1, math.floor(tokens), 0}
else
local retry_after = (1 - tokens) / rate
return {0, 0, retry_after}
end
`);
}
async isAllowed(identifier, options = {}) {
const {
requests = 100,
windowSeconds = 60,
algorithm = 'sliding_window',
keyPrefix = 'ratelimit'
} = options;
const key = `${keyPrefix}:${identifier}`;
const now = Date.now() / 1000;
let result;
if (algorithm === 'sliding_window') {
result = await this.redis.evalsha(
this.scripts.slidingWindow,
1, key,
now, windowSeconds, requests
);
} else if (algorithm === 'token_bucket') {
const rate = requests / windowSeconds;
result = await this.redis.evalsha(
this.scripts.tokenBucket,
1, key,
now, rate, requests
);
} else {
// Fixed window fallback
result = await this.fixedWindow(key, requests, windowSeconds);
}
const [allowed, remaining, retryAfter] = result;
return {
allowed: !!allowed,
limit: requests,
remaining: parseInt(remaining),
reset: Math.floor(now + windowSeconds),
retryAfter: allowed ? null : Math.ceil(retryAfter)
};
}
async fixedWindow(key, limit, windowSeconds) {
const current = await this.redis.incr(key);
if (current === 1) {
await this.redis.expire(key, windowSeconds);
}
if (current <= limit) {
return [1, limit - current, 0];
} else {
const ttl = await this.redis.ttl(key);
return [0, 0, ttl];
}
}
}
// Express middleware factory
function rateLimit(options = {}) {
const {
requests = 100,
windowSeconds = 60,
keyGenerator = (req) => {
const ip = req.ip || req.connection.remoteAddress;
const path = req.path;
return crypto.createHash('md5').update(`${ip}:${path}`).digest('hex');
},
skip = () => false,
handler = null
} = options;
return async (req, res, next) => {
// Skip rate limiting if needed
if (skip(req)) {
return next();
}
const rateLimiter = req.app.get('rateLimiter');
const identifier = keyGenerator(req);
try {
const result = await rateLimiter.isAllowed(identifier, {
requests,
windowSeconds
});
// Set rate limit headers
res.set({
'X-RateLimit-Limit': result.limit,
'X-RateLimit-Remaining': result.remaining,
'X-RateLimit-Reset': result.reset
});
if (!result.allowed) {
res.set('Retry-After', result.retryAfter);
if (handler) {
return handler(req, res, next, result);
}
return res.status(429).json({
success: false,
error: {
code: 'RATE_LIMIT_EXCEEDED',
message: `Rate limit exceeded. Retry after ${result.retryAfter} seconds.`,
retry_after: result.retryAfter
}
});
}
next();
} catch (error) {
// On Redis failure, allow the request (fail open)
console.error('Rate limiter error:', error);
next();
}
};
}
// Tiered rate limiting based on user plan
function tieredRateLimit() {
return async (req, res, next) => {
const user = req.user;
let config;
switch (user?.plan) {
case 'enterprise':
config = { requests: 10000, windowSeconds: 60 };
break;
case 'pro':
config = { requests: 1000, windowSeconds: 60 };
break;
default:
config = { requests: 100, windowSeconds: 60 };
}
return rateLimit({
...config,
keyGenerator: (req) => `user:${req.user?.id || req.ip}`
})(req, res, next);
};
}
// Setup
const app = require('express')();
const redis = new Redis('redis://localhost:6379');
const rateLimiter = new RateLimiter(redis);
async function start() {
await rateLimiter.init();
app.set('rateLimiter', rateLimiter);
// Global rate limit
app.use(rateLimit({ requests: 1000, windowSeconds: 60 }));
// Endpoint-specific limits
app.get('/api/search',
rateLimit({ requests: 30, windowSeconds: 60 }),
(req, res) => {
res.json({ results: [] });
}
);
// User-specific with tier
app.get('/api/data',
tieredRateLimit(),
(req, res) => {
res.json({ data: 'value' });
}
);
// Skip rate limiting for certain requests
app.get('/health',
rateLimit({ skip: () => true }),
(req, res) => res.json({ status: 'ok' })
);
app.listen(3000, () => console.log('Server running on port 3000'));
}
start();
Authentication & Authorization
Auth Patterns
┌─────────────────────────────────────────────────────────────────┐
│ Authentication Methods │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. API Keys │
│ Header: X-API-Key: abc123 │
│ + Simple │
│ - Can't expire, no user context │
│ Use: Server-to-server, public APIs │
│ │
│ 2. Bearer Tokens (JWT) │
│ Header: Authorization: Bearer eyJhbGciOiJIUzI1... │
│ + Stateless, contains claims │
│ - Can't revoke until expiry │
│ Use: User authentication │
│ │
│ 3. OAuth 2.0 │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ User │───►│ Auth │───►│ App │ │
│ │ │◄───│ Server │◄───│ │ │
│ └────────┘ └────────┘ └────────┘ │
│ Use: Third-party access, SSO │
│ │
│ 4. Session Cookies │
│ Set-Cookie: session_id=abc123; HttpOnly; Secure │
│ + Simple, can revoke server-side │
│ - CSRF vulnerability, not for APIs │
│ │
└─────────────────────────────────────────────────────────────────┘
JWT Structure
Header.Payload.Signature
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4iLCJpYXQiOjE1MTYyMzkwMjJ9.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
┌─────────────────────────────────────────────────────────────────┐
│ Header (Base64) │ Payload (Base64) │ Signature │
├─────────────────────┼─────────────────────┼─────────────────────┤
│ { │ { │ HMACSHA256( │
│ "alg": "HS256", │ "sub": "123", │ base64(header) + │
│ "typ": "JWT" │ "name": "John", │ "." + │
│ } │ "role": "admin", │ base64(payload), │
│ │ "exp": 16500..., │ secret │
│ │ "iat": 16490... │ ) │
│ │ } │ │
└─────────────────────┴─────────────────────┴─────────────────────┘
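A minimal sketch of issuing and verifying such a token with the PyJWT library (`pip install pyjwt`); the secret and claim values are placeholders. For tokens verified by multiple services, an asymmetric algorithm such as RS256 is usually preferred so the signing key never leaves the auth server.

import time
import jwt  # PyJWT

SECRET = "change-me"   # placeholder; load from a secret manager in real deployments

def issue_token(user_id, role, ttl_seconds=900):
    now = int(time.time())
    claims = {"sub": user_id, "role": role, "iat": now, "exp": now + ttl_seconds}
    return jwt.encode(claims, SECRET, algorithm="HS256")

def verify_token(token):
    # Raises jwt.ExpiredSignatureError / jwt.InvalidTokenError for bad or expired tokens.
    return jwt.decode(token, SECRET, algorithms=["HS256"])

token = issue_token("123", "admin")
print(verify_token(token)["sub"])   # "123"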
Idempotency
Why Idempotency Matters
┌─────────────────────────────────────────────────────────────────┐
│ The Problem │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client Server │
│ │ │ │
│ │──── POST /charge (card) ─────────►│ │
│ │ │ Process payment │
│ │ ❌ Network timeout ❌ │ $100 charged │
│ │◄─── [Connection lost] ────────────│ │
│ │ │ │
│ │ "Did it work? Let me retry..." │ │
│ │ │ │
│ │──── POST /charge (card) ─────────►│ │
│ │ │ Process payment │
│ │◄─── 200 OK ───────────────────────│ $100 charged AGAIN! │
│ │
│ Customer charged $200 instead of $100! │
│ │
└─────────────────────────────────────────────────────────────────┘
Idempotency Key Pattern
┌─────────────────────────────────────────────────────────────────┐
│ Idempotency Key Solution │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Request: │
│ POST /v1/charges │
│ Idempotency-Key: unique-request-id-123 │
│ Content-Type: application/json │
│ │
│ { "amount": 100, "currency": "USD", ... } │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Server Logic │ │
│ │ │ │
│ │ 1. Check if idempotency_key exists in Redis/DB │ │
│ │ → If exists: return cached response │ │
│ │ → If not: continue │ │
│ │ │ │
│ │ 2. Store idempotency_key with status "processing" │ │
│ │ │ │
│ │ 3. Process the request │ │
│ │ │ │
│ │ 4. Store response with idempotency_key │ │
│ │ (TTL: 24 hours) │ │
│ │ │ │
│ │ 5. Return response │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Idempotency Implementation
Python (FastAPI):
import redis.asyncio as redis
from fastapi import FastAPI, Request, HTTPException, Depends, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, Any, Callable
from dataclasses import dataclass
from datetime import timedelta
import json
import hashlib
from enum import Enum
class IdempotencyStatus(Enum):
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class IdempotencyRecord:
status: IdempotencyStatus
status_code: Optional[int] = None
response_body: Optional[str] = None
request_hash: Optional[str] = None
class IdempotencyStore:
"""Redis-based idempotency key storage"""
def __init__(self, redis_client: redis.Redis, ttl: timedelta = timedelta(hours=24)):
self.redis = redis_client
self.ttl = ttl
self.key_prefix = "idempotency"
def _key(self, idempotency_key: str) -> str:
return f"{self.key_prefix}:{idempotency_key}"
async def get(self, idempotency_key: str) -> Optional[IdempotencyRecord]:
"""Get existing idempotency record"""
data = await self.redis.hgetall(self._key(idempotency_key))
if not data:
return None
return IdempotencyRecord(
status=IdempotencyStatus(data.get(b"status", b"").decode()),
status_code=int(data[b"status_code"]) if b"status_code" in data else None,
response_body=data.get(b"response_body", b"").decode() or None,
request_hash=data.get(b"request_hash", b"").decode() or None
)
async def start_processing(
self,
idempotency_key: str,
request_hash: str
) -> bool:
"""
Mark key as processing (atomic).
Returns True if we acquired the lock, False if already exists.
"""
key = self._key(idempotency_key)
# Use SETNX-like behavior with HSETNX
acquired = await self.redis.hsetnx(key, "status", IdempotencyStatus.PROCESSING.value)
if acquired:
await self.redis.hset(key, "request_hash", request_hash)
await self.redis.expire(key, int(self.ttl.total_seconds()))
return True
return False
async def complete(
self,
idempotency_key: str,
status_code: int,
response_body: str
):
"""Mark request as completed with response"""
key = self._key(idempotency_key)
await self.redis.hset(key, mapping={
"status": IdempotencyStatus.COMPLETED.value,
"status_code": str(status_code),
"response_body": response_body
})
await self.redis.expire(key, int(self.ttl.total_seconds()))
async def fail(self, idempotency_key: str):
"""Mark request as failed (allows retry)"""
await self.redis.delete(self._key(idempotency_key))
def compute_request_hash(body: bytes, path: str, method: str) -> str:
"""Create hash of request for conflict detection"""
content = f"{method}:{path}:{body.decode()}"
return hashlib.sha256(content.encode()).hexdigest()
class IdempotencyError(HTTPException):
pass
def require_idempotency(
methods: list[str] = ["POST", "PUT", "PATCH"],
header_name: str = "Idempotency-Key"
):
"""
Dependency that enforces idempotency for mutating operations.
"""
async def dependency(
request: Request,
idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key")
):
if request.method not in methods:
return None
if not idempotency_key:
raise HTTPException(
status_code=400,
detail={
"code": "MISSING_IDEMPOTENCY_KEY",
"message": f"Header '{header_name}' is required for {request.method} requests"
}
)
store: IdempotencyStore = request.app.state.idempotency_store
body = await request.body()
request_hash = compute_request_hash(body, request.url.path, request.method)
# Check for existing record
existing = await store.get(idempotency_key)
if existing:
# Check if same request
if existing.request_hash != request_hash:
raise HTTPException(
status_code=422,
detail={
"code": "IDEMPOTENCY_KEY_REUSED",
"message": "Idempotency key was used with different request parameters"
}
)
if existing.status == IdempotencyStatus.PROCESSING:
raise HTTPException(
status_code=409,
detail={
"code": "REQUEST_IN_PROGRESS",
"message": "A request with this idempotency key is currently being processed"
}
)
if existing.status == IdempotencyStatus.COMPLETED:
# Return cached response
return JSONResponse(
status_code=existing.status_code,
content=json.loads(existing.response_body),
headers={"Idempotency-Replay": "true"}
)
# Try to start processing
acquired = await store.start_processing(idempotency_key, request_hash)
if not acquired:
# Race condition - another request got there first
raise HTTPException(
status_code=409,
detail={
"code": "REQUEST_IN_PROGRESS",
"message": "A request with this idempotency key is currently being processed"
}
)
# Store key in request state for response capture
request.state.idempotency_key = idempotency_key
return None
return Depends(dependency)
# Middleware to capture response
class IdempotencyMiddleware:
def __init__(self, app, store: IdempotencyStore):
self.app = app
self.store = store
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return await self.app(scope, receive, send)
request = Request(scope, receive, send)
idempotency_key = getattr(request.state, "idempotency_key", None)
if not idempotency_key:
return await self.app(scope, receive, send)
# Capture response
response_body = []
response_status = [200]
async def send_wrapper(message):
if message["type"] == "http.response.start":
response_status[0] = message["status"]
elif message["type"] == "http.response.body":
response_body.append(message.get("body", b""))
await send(message)
try:
await self.app(scope, receive, send_wrapper)
# Store successful response
body = b"".join(response_body).decode()
await self.store.complete(
idempotency_key,
response_status[0],
body
)
except Exception as e:
# Clear on failure to allow retry
await self.store.fail(idempotency_key)
raise
# Usage
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url("redis://localhost:6379")
app.state.idempotency_store = IdempotencyStore(app.state.redis)
app.add_middleware(IdempotencyMiddleware, store=app.state.idempotency_store)
@app.post("/v1/charges", dependencies=[require_idempotency()])
async def create_charge(request: Request, charge: ChargeRequest):
"""
Create a payment charge.
Requires Idempotency-Key header for safe retries.
"""
result = await payment_service.charge(
amount=charge.amount,
currency=charge.currency,
source=charge.source
)
return {"success": True, "data": result}
JavaScript (Express):
const Redis = require('ioredis');
const crypto = require('crypto');
class IdempotencyStore {
constructor(redis, options = {}) {
this.redis = redis;
this.ttl = options.ttl || 86400; // 24 hours
this.prefix = options.prefix || 'idempotency';
}
key(idempotencyKey) {
return `${this.prefix}:${idempotencyKey}`;
}
async get(idempotencyKey) {
const data = await this.redis.hgetall(this.key(idempotencyKey));
if (!data || Object.keys(data).length === 0) {
return null;
}
return {
status: data.status,
statusCode: data.status_code ? parseInt(data.status_code) : null,
responseBody: data.response_body || null,
requestHash: data.request_hash || null
};
}
async startProcessing(idempotencyKey, requestHash) {
const key = this.key(idempotencyKey);
// Atomic: only set if not exists
const acquired = await this.redis.hsetnx(key, 'status', 'processing');
if (acquired) {
await this.redis.hset(key, 'request_hash', requestHash);
await this.redis.expire(key, this.ttl);
return true;
}
return false;
}
async complete(idempotencyKey, statusCode, responseBody) {
const key = this.key(idempotencyKey);
await this.redis.hmset(key, {
status: 'completed',
status_code: statusCode.toString(),
response_body: responseBody
});
await this.redis.expire(key, this.ttl);
}
async fail(idempotencyKey) {
await this.redis.del(this.key(idempotencyKey));
}
}
function computeRequestHash(body, path, method) {
const content = `${method}:${path}:${JSON.stringify(body)}`;
return crypto.createHash('sha256').update(content).digest('hex');
}
function idempotencyMiddleware(store, options = {}) {
const {
headerName = 'Idempotency-Key',
methods = ['POST', 'PUT', 'PATCH'],
required = true
} = options;
return async (req, res, next) => {
// Skip for non-mutating methods
if (!methods.includes(req.method)) {
return next();
}
const idempotencyKey = req.get(headerName);
// Check if key is required
if (!idempotencyKey) {
if (required) {
return res.status(400).json({
success: false,
error: {
code: 'MISSING_IDEMPOTENCY_KEY',
message: `Header '${headerName}' is required for ${req.method} requests`
}
});
}
return next();
}
const requestHash = computeRequestHash(req.body, req.path, req.method);
// Check for existing record
const existing = await store.get(idempotencyKey);
if (existing) {
// Verify same request
if (existing.requestHash !== requestHash) {
return res.status(422).json({
success: false,
error: {
code: 'IDEMPOTENCY_KEY_REUSED',
message: 'Idempotency key was used with different request parameters'
}
});
}
if (existing.status === 'processing') {
return res.status(409).json({
success: false,
error: {
code: 'REQUEST_IN_PROGRESS',
message: 'A request with this idempotency key is currently being processed'
}
});
}
if (existing.status === 'completed') {
// Return cached response
res.set('Idempotency-Replay', 'true');
return res
.status(existing.statusCode)
.json(JSON.parse(existing.responseBody));
}
}
// Try to acquire processing lock
const acquired = await store.startProcessing(idempotencyKey, requestHash);
if (!acquired) {
return res.status(409).json({
success: false,
error: {
code: 'REQUEST_IN_PROGRESS',
message: 'A request with this idempotency key is currently being processed'
}
});
}
// Store key for response capture
req.idempotencyKey = idempotencyKey;
// Intercept response
const originalJson = res.json.bind(res);
res.json = async function(body) {
try {
const bodyStr = JSON.stringify(body);
await store.complete(idempotencyKey, res.statusCode, bodyStr);
return originalJson(body);
} catch (error) {
console.error('Failed to store idempotency response:', error);
return originalJson(body);
}
};
// Handle errors
const originalEnd = res.end.bind(res);
res.end = async function(...args) {
if (res.statusCode >= 500) {
// Allow retry on server errors
await store.fail(idempotencyKey);
}
return originalEnd(...args);
};
next();
};
}
// Error handler for idempotency failures
function idempotencyErrorHandler(store) {
return async (err, req, res, next) => {
if (req.idempotencyKey) {
// Clear processing status on error
await store.fail(req.idempotencyKey);
}
next(err);
};
}
// Setup
const express = require('express');
const app = express();
const redis = new Redis('redis://localhost:6379');
const store = new IdempotencyStore(redis);
app.use(express.json());
app.use(idempotencyMiddleware(store));
// Charges endpoint
app.post('/v1/charges', async (req, res) => {
const { amount, currency, source } = req.body;
try {
const result = await paymentService.charge({
amount,
currency,
source
});
res.status(201).json({
success: true,
data: result
});
} catch (error) {
res.status(400).json({
success: false,
error: {
code: 'CHARGE_FAILED',
message: error.message
}
});
}
});
// Error handling
app.use(idempotencyErrorHandler(store));
app.listen(3000, () => console.log('Server running on port 3000'));
API Documentation
OpenAPI/Swagger Example
openapi: 3.0.0
info:
title: User API
version: 1.0.0
description: API for managing users
servers:
- url: https://api.example.com/v1
paths:
/users:
get:
summary: List all users
parameters:
- name: page
in: query
schema:
type: integer
default: 1
- name: limit
in: query
schema:
type: integer
default: 20
maximum: 100
responses:
'200':
description: Successful response
content:
application/json:
schema:
$ref: '#/components/schemas/UserList'
post:
summary: Create a user
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateUser'
responses:
'201':
description: User created
'400':
description: Validation error
components:
schemas:
User:
type: object
properties:
id:
type: integer
name:
type: string
email:
type: string
format: email
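Hand-written specs drift out of date; frameworks can generate the same document from code. As a sketch, a FastAPI app like the ones earlier in this section already exposes an equivalent spec automatically (route and model names here are illustrative):

from fastapi import FastAPI, Query
from pydantic import BaseModel

app = FastAPI(title="User API", version="1.0.0", description="API for managing users")

class User(BaseModel):
    id: int
    name: str
    email: str

@app.get("/users", response_model=list[User], summary="List all users")
async def list_users(page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100)):
    return []

# FastAPI derives the spec from the routes and models:
#   GET /openapi.json  -> OpenAPI document equivalent to the YAML above
#   GET /docs          -> interactive Swagger UI
#   GET /redoc         -> ReDoc documentation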
Best Practices Summary
| Category | Best Practice |
|---|---|
| URLs | Use nouns, plural resources, kebab-case |
| Methods | Use correct HTTP methods (GET=read, POST=create, etc.) |
| Status Codes | Return appropriate codes (201 for create, 204 for delete) |
| Errors | Consistent error format with codes and messages |
| Versioning | Plan for it from day 1, use URL path for public APIs |
| Pagination | Use cursor-based for large datasets |
| Rate Limiting | Implement early, return helpful headers |
| Idempotency | Require for all non-read operations |
| Documentation | Auto-generate from code when possible |
Interview Tip: When designing an API in an interview, start by identifying resources, define endpoints, discuss authentication, mention rate limiting, and talk about error handling. This shows comprehensive API design thinking.