Monolith vs Microservices
Architecture Comparison
┌─────────────────────────────────────────────────────────────────┐
│ Monolith │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Single Application │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ User │ │ Order │ │Payment │ │ Ship │ │ │
│ │ │ Module │ │ Module │ │ Module │ │ Module │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ │ │ │ │
│ │ ┌─────────▼─────────┐ │ │
│ │ │ Shared Database │ │ │
│ │ └───────────────────┘ │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ + Simple to develop, test, deploy │
│ - Scales as a unit, hard to modify │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Microservices │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ User │ │ Order │ │ Payment │ │Shipping │ │
│ │ Service │ │ Service │ │ Service │ │ Service │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ User DB │ │Order DB │ │ Stripe │ │ Ship DB │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ + Independent scaling, tech diversity, team autonomy │
│ - Distributed system complexity, operational overhead │
│ │
└─────────────────────────────────────────────────────────────────┘
When to Use Microservices
Use Microservices
- Large, complex applications
- Multiple teams working independently
- Parts need different scaling
- Different technology requirements
- Frequent, independent deployments
Avoid Microservices
- Small team (< 10 engineers)
- Simple domain
- Early-stage startup
- Unclear domain boundaries
- Team lacks distributed systems experience
Start with a Monolith: Many successful microservice architectures evolved from monoliths, because the domain boundaries only became clear over time. Don’t start with microservices unless you have a clear reason, such as one of the criteria listed under Use Microservices above.
Service Communication
Synchronous vs Asynchronous
┌─────────────────────────────────────────────────────────────────┐
│ Communication Patterns │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Synchronous (Request-Response) │
│ ───────────────────────────── │
│ │
│ Order ─────► User ─────► Payment │
│ Service Service Service │
│ │ │ │ │
│ │◄──────────│◄──────────│ │
│ │
│ + Simple, immediate response │
│ - Cascading failures, tight coupling │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ Asynchronous (Event-Driven) │
│ ────────────────────────── │
│ │
│ Order ─────► Message Queue ─────► Payment │
│ Service │ Service │
│ │ │
│ └───────────────► Notification │
│ Service │
│ │
│ + Decoupled, resilient, scalable │
│ - Complex, eventual consistency │
│ │
└─────────────────────────────────────────────────────────────────┘
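The trade-off in the diagram can be sketched in a few lines of Python. This is a minimal in-process illustration, not a real broker: `EventBus`, `place_order_sync`, `place_order_async`, and the handler functions are hypothetical names introduced here, and the payment handler is made to fail on purpose so the difference is visible.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


class EventBus:
    """Toy stand-in for a message queue (hypothetical, in-process only)."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self.dead_letters: List[Tuple[str, dict, str]] = []

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> None:
        # A failing consumer is recorded and skipped; the publisher is unaffected.
        for handler in self.subscribers[topic]:
            try:
                handler(event)
            except Exception as exc:
                self.dead_letters.append((topic, event, str(exc)))


def charge_payment(event: dict) -> None:
    # Simulated outage of the payment service.
    raise RuntimeError("payment service down")


notifications: List[str] = []


def send_notification(event: dict) -> None:
    notifications.append(f"order {event['order_id']} received")


def place_order_sync(order_id: str) -> str:
    # Synchronous chain: the caller fails as soon as a downstream call fails.
    charge_payment({"order_id": order_id})
    send_notification({"order_id": order_id})
    return "ok"


def place_order_async(bus: EventBus, order_id: str) -> str:
    # Asynchronous: publish the event and return; consumers run independently.
    bus.publish("order.created", {"order_id": order_id})
    return "accepted"
```

In the synchronous version the payment outage propagates to the caller and the notification is never sent. In the asynchronous version the order is accepted and the notification still goes out, but the failed payment now has to be handled later, which is the eventual-consistency cost noted in the diagram.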
API Gateway Pattern
│ ┌─────────────────────┼─────────────────────┐ │
│ │ │ │ │
│ ┌────▼────┐ ┌─────▼─────┐ ┌────▼────┐ │
│ │ Users │ │ Orders │ │ Products │ │
│ │ Service │ │ Service │ │ Service │ │
│ └─────────┘ └───────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Popular Options: Kong, AWS API Gateway, Nginx, Traefik
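The core of a gateway is a route table plus cross-cutting checks applied once at the edge. The sketch below shows only that idea, under stated assumptions: the upstream URLs, the `ROUTES` table, and the `resolve_upstream` and `handle` functions are hypothetical, and the API-key check stands in for whatever authentication a real gateway would perform.

```python
from typing import Dict, Optional, Tuple

# Hypothetical route table: path prefix -> upstream base URL.
ROUTES: Dict[str, str] = {
    "/users": "http://users-service:8080",
    "/orders": "http://orders-service:8080",
    "/products": "http://products-service:8080",
}


def resolve_upstream(path: str, routes: Dict[str, str] = ROUTES) -> Optional[str]:
    """Return the upstream URL for a request path, or None if no route matches."""
    # Try the longest prefix first so that more specific routes win.
    for prefix in sorted(routes, key=len, reverse=True):
        if path == prefix or path.startswith(prefix + "/"):
            return routes[prefix] + path
    return None


def handle(path: str, api_key: Optional[str]) -> Tuple[int, str]:
    """Apply a cross-cutting check (authentication) once, then route."""
    if api_key is None:
        return 401, "missing API key"
    upstream = resolve_upstream(path)
    if upstream is None:
        return 404, "no route"
    # A real gateway would proxy the request to the upstream here.
    return 200, upstream
```

Clients only ever see the gateway's address; services can move or be split without changing client code, as long as the route table is updated.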
Backend for Frontend (BFF)
┌─────────────────────────────────────────────────────────────────┐
│ BFF Pattern │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Web │ │ Mobile │ │ IoT │ │
│ │ Client │ │ App │ │ Device │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ Web │ │ Mobile │ │ IoT │ │
│ │ BFF │ │ BFF │ │ BFF │ │
│ │ (Rich) │ │ (Lean) │ │(Minimal)│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ Services │ │
│ └─────────────────┘ │
│ │
│ Each BFF tailored for its client: │
│ - Web: Rich data, complex UIs │
│ - Mobile: Optimized payloads, less bandwidth │
│ - IoT: Minimal data, low power │
│ │
└─────────────────────────────────────────────────────────────────┘
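As a rough illustration of "tailored for its client", the sketch below shapes one downstream record into three payloads. The `PRODUCT` record, its field names, and the three functions are hypothetical; a real BFF would also aggregate several service calls and handle authentication per client type.

```python
from typing import Any, Dict

# Hypothetical record as returned by a downstream service.
PRODUCT: Dict[str, Any] = {
    "id": "p-1",
    "name": "Desk lamp",
    "price_cents": 2499,
    "description": "Adjustable LED desk lamp with USB charging port.",
    "images": ["front.jpg", "side.jpg", "detail.jpg"],
    "in_stock": True,
}


def web_bff(product: Dict[str, Any]) -> Dict[str, Any]:
    # Rich: the web client gets the full record.
    return dict(product)


def mobile_bff(product: Dict[str, Any]) -> Dict[str, Any]:
    # Lean: drop the long description and send a single thumbnail.
    images = product["images"]
    return {
        "id": product["id"],
        "name": product["name"],
        "price_cents": product["price_cents"],
        "thumbnail": images[0] if images else None,
        "in_stock": product["in_stock"],
    }


def iot_bff(product: Dict[str, Any]) -> Dict[str, Any]:
    # Minimal: only what a low-power device needs.
    return {"id": product["id"], "in_stock": product["in_stock"]}
```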
Service Discovery
Client-Side vs Server-Side
┌─────────────────────────────────────────────────────────────────┐
│ Service Discovery │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client-Side Discovery │
│ ──────────────────────── │
│ │
│ Client ───► Service Registry ───► "Order service at 1.2.3.4"│
│ │ │ │
│ └──────────────┴──────────► Order Service │
│ │
│ Client decides which instance to call │
│ Examples: Netflix Eureka, Consul client │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ Server-Side Discovery │
│ ───────────────────── │
│ │
│ Client ───► Load Balancer ───► Service Registry │
│ │ │ │
│ └────────────────────┘ │
│ │ │
│ ▼ │
│ Order Service │
│ │
│ Load balancer handles discovery │
│ Examples: AWS ALB, Kubernetes Services │
│ │
└─────────────────────────────────────────────────────────────────┘
Service Discovery Implementation
Python
import asyncio
import random
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from abc import ABC, abstractmethod
import aiohttp
import logging
logger = logging.getLogger(__name__)
class ServiceStatus(Enum):
HEALTHY = "healthy"
UNHEALTHY = "unhealthy"
DRAINING = "draining"
@dataclass
class ServiceInstance:
service_name: str
instance_id: str
host: str
port: int
metadata: Dict[str, str] = field(default_factory=dict)
status: ServiceStatus = ServiceStatus.HEALTHY
last_heartbeat: float = field(default_factory=time.time)
weight: int = 100
@property
def address(self) -> str:
return f"{self.host}:{self.port}"
class ServiceRegistry:
"""In-memory service registry with health checking"""
def __init__(self, heartbeat_timeout: float = 30.0):
self.services: Dict[str, Dict[str, ServiceInstance]] = {}
self.heartbeat_timeout = heartbeat_timeout
self._cleanup_task = None
def register(self, instance: ServiceInstance) -> None:
"""Register a service instance"""
if instance.service_name not in self.services:
self.services[instance.service_name] = {}
self.services[instance.service_name][instance.instance_id] = instance
logger.info(f"Registered {instance.service_name}/{instance.instance_id} at {instance.address}")
def deregister(self, service_name: str, instance_id: str) -> None:
"""Deregister a service instance"""
if service_name in self.services:
if instance_id in self.services[service_name]:
del self.services[service_name][instance_id]
logger.info(f"Deregistered {service_name}/{instance_id}")
def heartbeat(self, service_name: str, instance_id: str) -> bool:
"""Update heartbeat for an instance"""
if service_name in self.services:
if instance_id in self.services[service_name]:
instance = self.services[service_name][instance_id]
instance.last_heartbeat = time.time()
instance.status = ServiceStatus.HEALTHY
return True
return False
def get_instances(
self,
service_name: str,
healthy_only: bool = True
) -> List[ServiceInstance]:
"""Get all instances of a service"""
if service_name not in self.services:
return []
instances = list(self.services[service_name].values())
if healthy_only:
instances = [i for i in instances if i.status == ServiceStatus.HEALTHY]
return instances
    async def start_cleanup(self) -> None:
        """Periodically mark instances with expired heartbeats as unhealthy"""
        while True:
            await asyncio.sleep(10)
            self._cleanup_expired()

    def _cleanup_expired(self) -> None:
        """Mark instances that missed the heartbeat timeout as unhealthy.

        Instances are not removed, so a later heartbeat can mark them healthy again.
        """
        now = time.time()
        for service_name, instances in self.services.items():
            for instance_id, instance in instances.items():
                expired = now - instance.last_heartbeat > self.heartbeat_timeout
                # Skip instances already marked, to avoid logging the same warning every pass
                if expired and instance.status != ServiceStatus.UNHEALTHY:
                    instance.status = ServiceStatus.UNHEALTHY
                    logger.warning(f"Instance {service_name}/{instance_id} marked unhealthy")
class LoadBalancer(ABC):
"""Abstract load balancer"""
@abstractmethod
def select(self, instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
pass
class RoundRobinLoadBalancer(LoadBalancer):
def __init__(self):
self.counters: Dict[str, int] = {}
def select(self, instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
if not instances:
return None
service_name = instances[0].service_name
if service_name not in self.counters:
self.counters[service_name] = 0
index = self.counters[service_name] % len(instances)
self.counters[service_name] += 1
return instances[index]
class WeightedRoundRobinLoadBalancer(LoadBalancer):
def __init__(self):
self.current_weights: Dict[str, Dict[str, int]] = {}
def select(self, instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
if not instances:
return None
service_name = instances[0].service_name
if service_name not in self.current_weights:
self.current_weights[service_name] = {}
weights = self.current_weights[service_name]
# Initialize weights
for instance in instances:
if instance.instance_id not in weights:
weights[instance.instance_id] = 0
# Add original weights
for instance in instances:
weights[instance.instance_id] += instance.weight
# Select instance with highest current weight
max_weight = -1
selected = None
for instance in instances:
if weights[instance.instance_id] > max_weight:
max_weight = weights[instance.instance_id]
selected = instance
# Decrease selected instance's weight
if selected:
total_weight = sum(i.weight for i in instances)
weights[selected.instance_id] -= total_weight
return selected
class ConsistentHashLoadBalancer(LoadBalancer):
"""Consistent hashing for sticky sessions"""
def __init__(self, replicas: int = 100):
self.replicas = replicas
self.ring: Dict[int, ServiceInstance] = {}
self.sorted_keys: List[int] = []
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def _build_ring(self, instances: List[ServiceInstance]) -> None:
self.ring.clear()
for instance in instances:
for i in range(self.replicas):
key = f"{instance.instance_id}:{i}"
hash_val = self._hash(key)
self.ring[hash_val] = instance
self.sorted_keys = sorted(self.ring.keys())
def select(
self,
instances: List[ServiceInstance],
key: str = None
) -> Optional[ServiceInstance]:
if not instances:
return None
self._build_ring(instances)
if not key:
key = str(random.random())
hash_val = self._hash(key)
# Find first node >= hash
for ring_key in self.sorted_keys:
if ring_key >= hash_val:
return self.ring[ring_key]
return self.ring[self.sorted_keys[0]]
class ServiceDiscoveryClient:
"""Client-side service discovery with load balancing"""
def __init__(
self,
registry: ServiceRegistry,
load_balancer: LoadBalancer = None,
cache_ttl: float = 30.0
):
self.registry = registry
self.load_balancer = load_balancer or RoundRobinLoadBalancer()
self.cache_ttl = cache_ttl
self.cache: Dict[str, tuple] = {} # (instances, timestamp)
def discover(self, service_name: str) -> Optional[ServiceInstance]:
"""Discover and select a service instance"""
instances = self._get_instances(service_name)
if not instances:
raise ServiceNotFoundException(f"No instances found for {service_name}")
return self.load_balancer.select(instances)
def _get_instances(self, service_name: str) -> List[ServiceInstance]:
"""Get instances with caching"""
if service_name in self.cache:
instances, timestamp = self.cache[service_name]
if time.time() - timestamp < self.cache_ttl:
return instances
instances = self.registry.get_instances(service_name)
self.cache[service_name] = (instances, time.time())
return instances
async def call(
self,
service_name: str,
path: str,
method: str = "GET",
**kwargs
) -> Any:
"""Make HTTP call to discovered service"""
instance = self.discover(service_name)
url = f"http://{instance.address}{path}"
async with aiohttp.ClientSession() as session:
async with session.request(method, url, **kwargs) as response:
return await response.json()
class ServiceNotFoundException(Exception):
pass
# Self-registering service
class SelfRegisteringService:
"""Base class for services that self-register"""
def __init__(
self,
service_name: str,
host: str,
port: int,
registry: ServiceRegistry,
heartbeat_interval: float = 10.0
):
self.instance = ServiceInstance(
service_name=service_name,
instance_id=f"{host}:{port}:{random.randint(1000, 9999)}",
host=host,
port=port
)
self.registry = registry
self.heartbeat_interval = heartbeat_interval
self._heartbeat_task = None
async def start(self) -> None:
"""Register and start heartbeat"""
self.registry.register(self.instance)
self._heartbeat_task = asyncio.create_task(self._send_heartbeats())
async def stop(self) -> None:
"""Deregister and stop heartbeat"""
if self._heartbeat_task:
self._heartbeat_task.cancel()
self.registry.deregister(
self.instance.service_name,
self.instance.instance_id
)
async def _send_heartbeats(self) -> None:
while True:
await asyncio.sleep(self.heartbeat_interval)
self.registry.heartbeat(
self.instance.service_name,
self.instance.instance_id
)
# Usage example
async def main():
    # Create registry
    registry = ServiceRegistry()
    # Register two instances of the same service
    registry.register(ServiceInstance(
        service_name="order-service",
        instance_id="order-1",
        host="10.0.0.1",
        port=8080,
        weight=100
    ))
    registry.register(ServiceInstance(
        service_name="order-service",
        instance_id="order-2",
        host="10.0.0.2",
        port=8080,
        weight=50  # Less capacity
    ))
    # Create discovery client
    client = ServiceDiscoveryClient(
        registry,
        load_balancer=WeightedRoundRobinLoadBalancer()
    )
    # Discover an instance ten times; order-1 (weight 100) is selected
    # about twice as often as order-2 (weight 50)
    for _ in range(10):
        instance = client.discover("order-service")
        print("Selected:", instance.address)

if __name__ == "__main__":
    asyncio.run(main())
JavaScript
const crypto = require('crypto');
const EventEmitter = require('events');
// Service status enum
const ServiceStatus = {
HEALTHY: 'healthy',
UNHEALTHY: 'unhealthy',
DRAINING: 'draining'
};
class ServiceInstance {
constructor({
serviceName,
instanceId,
host,
port,
metadata = {},
weight = 100
}) {
this.serviceName = serviceName;
this.instanceId = instanceId;
this.host = host;
this.port = port;
this.metadata = metadata;
this.status = ServiceStatus.HEALTHY;
this.lastHeartbeat = Date.now();
this.weight = weight;
}
get address() {
return `${this.host}:${this.port}`;
}
}
class ServiceRegistry extends EventEmitter {
constructor(heartbeatTimeout = 30000) {
super();
this.services = new Map();
this.heartbeatTimeout = heartbeatTimeout;
this.cleanupInterval = null;
}
register(instance) {
if (!this.services.has(instance.serviceName)) {
this.services.set(instance.serviceName, new Map());
}
this.services.get(instance.serviceName).set(instance.instanceId, instance);
console.log(`Registered ${instance.serviceName}/${instance.instanceId} at ${instance.address}`);
this.emit('registered', instance);
}
deregister(serviceName, instanceId) {
if (this.services.has(serviceName)) {
const instances = this.services.get(serviceName);
const instance = instances.get(instanceId);
if (instance) {
instances.delete(instanceId);
console.log(`Deregistered ${serviceName}/${instanceId}`);
this.emit('deregistered', instance);
}
}
}
heartbeat(serviceName, instanceId) {
if (this.services.has(serviceName)) {
const instance = this.services.get(serviceName).get(instanceId);
if (instance) {
instance.lastHeartbeat = Date.now();
instance.status = ServiceStatus.HEALTHY;
return true;
}
}
return false;
}
getInstances(serviceName, healthyOnly = true) {
if (!this.services.has(serviceName)) {
return [];
}
let instances = Array.from(this.services.get(serviceName).values());
if (healthyOnly) {
instances = instances.filter(i => i.status === ServiceStatus.HEALTHY);
}
return instances;
}
startCleanup() {
this.cleanupInterval = setInterval(() => {
this.cleanupExpired();
}, 10000);
}
stopCleanup() {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
}
}
cleanupExpired() {
const now = Date.now();
for (const [serviceName, instances] of this.services) {
for (const [instanceId, instance] of instances) {
if (now - instance.lastHeartbeat > this.heartbeatTimeout) {
instance.status = ServiceStatus.UNHEALTHY;
console.warn(`Instance ${serviceName}/${instanceId} marked unhealthy`);
this.emit('unhealthy', instance);
}
}
}
}
}
// Load balancer implementations
class RoundRobinLoadBalancer {
constructor() {
this.counters = new Map();
}
select(instances) {
if (instances.length === 0) return null;
const serviceName = instances[0].serviceName;
const counter = (this.counters.get(serviceName) || 0);
const index = counter % instances.length;
this.counters.set(serviceName, counter + 1);
return instances[index];
}
}
class WeightedRoundRobinLoadBalancer {
constructor() {
this.currentWeights = new Map();
}
select(instances) {
if (instances.length === 0) return null;
const serviceName = instances[0].serviceName;
if (!this.currentWeights.has(serviceName)) {
this.currentWeights.set(serviceName, new Map());
}
const weights = this.currentWeights.get(serviceName);
// Initialize and add weights
for (const instance of instances) {
const current = weights.get(instance.instanceId) || 0;
weights.set(instance.instanceId, current + instance.weight);
}
// Select instance with highest weight
let selected = null;
let maxWeight = -1;
for (const instance of instances) {
const weight = weights.get(instance.instanceId);
if (weight > maxWeight) {
maxWeight = weight;
selected = instance;
}
}
// Decrease selected weight
if (selected) {
const totalWeight = instances.reduce((sum, i) => sum + i.weight, 0);
weights.set(
selected.instanceId,
weights.get(selected.instanceId) - totalWeight
);
}
return selected;
}
}
class ConsistentHashLoadBalancer {
constructor(replicas = 100) {
this.replicas = replicas;
this.ring = new Map();
this.sortedKeys = [];
}
hash(key) {
return parseInt(
crypto.createHash('md5').update(key).digest('hex').slice(0, 8),
16
);
}
buildRing(instances) {
this.ring.clear();
for (const instance of instances) {
for (let i = 0; i < this.replicas; i++) {
const key = `${instance.instanceId}:${i}`;
const hash = this.hash(key);
this.ring.set(hash, instance);
}
}
this.sortedKeys = Array.from(this.ring.keys()).sort((a, b) => a - b);
}
select(instances, key = null) {
if (instances.length === 0) return null;
this.buildRing(instances);
if (!key) {
key = String(Math.random());
}
const hash = this.hash(key);
// Find first node >= hash
for (const ringKey of this.sortedKeys) {
if (ringKey >= hash) {
return this.ring.get(ringKey);
}
}
return this.ring.get(this.sortedKeys[0]);
}
}
class ServiceDiscoveryClient {
constructor({ registry, loadBalancer = null, cacheTtl = 30000 }) {
this.registry = registry;
this.loadBalancer = loadBalancer || new RoundRobinLoadBalancer();
this.cacheTtl = cacheTtl;
this.cache = new Map();
}
discover(serviceName) {
const instances = this.getInstances(serviceName);
if (instances.length === 0) {
throw new Error(`No instances found for ${serviceName}`);
}
return this.loadBalancer.select(instances);
}
getInstances(serviceName) {
const cached = this.cache.get(serviceName);
if (cached && Date.now() - cached.timestamp < this.cacheTtl) {
return cached.instances;
}
const instances = this.registry.getInstances(serviceName);
this.cache.set(serviceName, { instances, timestamp: Date.now() });
return instances;
}
async call(serviceName, path, options = {}) {
const instance = this.discover(serviceName);
const url = `http://${instance.address}${path}`;
const response = await fetch(url, options);
return response.json();
}
}
// Self-registering service
class SelfRegisteringService {
constructor({
serviceName,
host,
port,
registry,
heartbeatInterval = 10000
}) {
this.instance = new ServiceInstance({
serviceName,
instanceId: `${host}:${port}:${Math.floor(Math.random() * 9000) + 1000}`,
host,
port
});
this.registry = registry;
this.heartbeatInterval = heartbeatInterval;
this.heartbeatTimer = null;
}
start() {
this.registry.register(this.instance);
this.heartbeatTimer = setInterval(() => {
this.registry.heartbeat(
this.instance.serviceName,
this.instance.instanceId
);
}, this.heartbeatInterval);
}
stop() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
this.registry.deregister(
this.instance.serviceName,
this.instance.instanceId
);
}
}
// Express integration
const express = require('express');
function createServiceDiscoveryMiddleware(client) {
return (serviceName) => async (req, res, next) => {
try {
req.serviceInstance = client.discover(serviceName);
next();
} catch (error) {
res.status(503).json({ error: `Service ${serviceName} unavailable` });
}
};
}
// Usage
const registry = new ServiceRegistry();
registry.startCleanup();
// Register services
registry.register(new ServiceInstance({
serviceName: 'order-service',
instanceId: 'order-1',
host: '10.0.0.1',
port: 8080,
weight: 100
}));
registry.register(new ServiceInstance({
serviceName: 'order-service',
instanceId: 'order-2',
host: '10.0.0.2',
port: 8080,
weight: 50
}));
// Create client
const client = new ServiceDiscoveryClient({
registry,
loadBalancer: new WeightedRoundRobinLoadBalancer()
});
// Use in Express
const app = express();
const discoverService = createServiceDiscoveryMiddleware(client);
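app.get('/orders/:id', discoverService('order-service'), async (req, res) => {
  const instance = req.serviceInstance;
  try {
    // Forward request to discovered instance
    const response = await fetch(`http://${instance.address}/orders/${req.params.id}`);
    res.status(response.status).json(await response.json());
  } catch (error) {
    // The instance was discovered but could not be reached
    res.status(502).json({ error: 'Upstream request failed' });
  }
});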
Service Registration
┌─────────────────────────────────────────────────────────────────┐
│ Service Registration │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Self-Registration: │
│ ┌─────────────┐ │
│ │ Service │──► Register on startup │
│ │ │──► Send heartbeats │
│ │ │──► Deregister on shutdown │
│ └─────────────┘ │
│ │
│ Third-Party Registration: │
│ ┌─────────────┐ ┌───────────────┐ │
│ │ Service │◄───│ Registrar │ (Sidecar/Agent) │
│ │ │ │ (monitors) │ │
│ └─────────────┘ └───────────────┘ │
│ │ │
│ ▼ │
│ Service Registry │
│ │
│ Popular Registries: Consul, etcd, ZooKeeper, Kubernetes DNS │
│ │
└─────────────────────────────────────────────────────────────────┘
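Third-party registration can be sketched as a reconcile loop run by the registrar rather than by the service itself. The `Registrar` class, its `health_check` callback, and the use of a plain set as the registry are assumptions made for illustration; real registrars talk to a registry such as those listed above.

```python
from typing import Callable, Iterable, Set


class Registrar:
    """Hypothetical sidecar/agent that registers services on their behalf."""

    def __init__(self, health_check: Callable[[str], bool]) -> None:
        self.health_check = health_check
        self.registered: Set[str] = set()  # stand-in for the service registry

    def reconcile(self, watched: Iterable[str]) -> Set[str]:
        """Make the registry match the observed health of the watched addresses."""
        watched = list(watched)
        for address in watched:
            if self.health_check(address):
                self.registered.add(address)
            else:
                self.registered.discard(address)
        # Drop entries for instances that are no longer being watched.
        self.registered.intersection_update(watched)
        return set(self.registered)
```

The service code contains no registry logic at all; the cost is one more component to deploy and monitor.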
Data Management
Database per Service
┌─────────────────────────────────────────────────────────────────┐
│ Database per Service Pattern │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ User │ │ Order │ │ Product │ │
│ │ Service │ │ Service │ │ Service │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ │
│ │ PostgreSQL│ │ MongoDB │ │Elasticsearch│ │
│ │ (Users) │ │ (Orders) │ │ (Products) │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
│ + Independent scaling │
│ + Tech flexibility (right DB for the job) │
│ + Loose coupling │
│ │
│ - Cross-service queries are hard │
│ - Data consistency challenges │
│ - More infrastructure to manage │
│ │
└─────────────────────────────────────────────────────────────────┘
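One common way around the "cross-service queries are hard" problem is to join in application code through each service's API instead of in SQL. The sketch below assumes two hypothetical service functions, `list_orders` and `get_users_by_ids`, backed by in-memory data; in practice these would be network calls.

```python
from typing import Dict, List

# Hypothetical in-memory stand-ins for two services' private data stores.
_USERS: Dict[str, dict] = {
    "u1": {"id": "u1", "name": "Ada"},
    "u2": {"id": "u2", "name": "Grace"},
}
_ORDERS: List[dict] = [
    {"id": "o1", "user_id": "u1", "total": 30.0},
    {"id": "o2", "user_id": "u2", "total": 120.0},
    {"id": "o3", "user_id": "u1", "total": 250.0},
]


def get_users_by_ids(user_ids: List[str]) -> Dict[str, dict]:
    """User service API (assumed): batch lookup by id."""
    return {uid: _USERS[uid] for uid in user_ids if uid in _USERS}


def list_orders(min_total: float) -> List[dict]:
    """Order service API (assumed): filter orders by total."""
    return [o for o in _ORDERS if o["total"] >= min_total]


def orders_with_customer_names(min_total: float) -> List[dict]:
    """Join orders and users in the caller, since no shared database exists."""
    orders = list_orders(min_total)
    user_ids = sorted({o["user_id"] for o in orders})
    users = get_users_by_ids(user_ids)  # one batched call, not one per order
    return [
        {
            "order_id": o["id"],
            "total": o["total"],
            "customer": users.get(o["user_id"], {}).get("name", "unknown"),
        }
        for o in orders
    ]
```

The two reads are not part of one transaction, so the result can briefly combine data from slightly different points in time.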
Saga Pattern for Distributed Transactions
┌─────────────────────────────────────────────────────────────────┐
│ Saga Pattern │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Example: E-commerce Order Flow │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Create │──►│ Reserve │──►│ Charge │──►│ Ship │ │
│ │ Order │ │Inventory│ │ Payment │ │ Order │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └─────────┘ │
│ │ │ │ │
│ Compensating Actions (on failure): │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ Cancel │◄──│ Release │◄──│ Refund │ ← If shipping fails │
│ │ Order │ │Inventory│ │ Payment │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Choreography: Services publish/consume events │
│ Orchestration: Central saga coordinator │
│ │
└─────────────────────────────────────────────────────────────────┘
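The choreography variant has no coordinator: each service reacts to events and emits new ones, including compensating events on failure. The sketch below is a simplified in-process illustration. The `Broker` class, the event names, and the `card_limit` field used to force a payment failure are all hypothetical.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List, Tuple

Event = Tuple[str, dict]
Handler = Callable[[dict], List[Event]]


class Broker:
    """Toy event broker: delivers events in order and records what was published."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = defaultdict(list)
        self.log: List[str] = []

    def on(self, topic: str, handler: Handler) -> None:
        self.handlers[topic].append(handler)

    def run(self, topic: str, payload: dict) -> List[str]:
        queue: Deque[Event] = deque([(topic, payload)])
        while queue:
            name, data = queue.popleft()
            self.log.append(name)
            for handler in self.handlers[name]:
                queue.extend(handler(data))  # handlers return follow-up events
        return self.log


def reserve_inventory(order: dict) -> List[Event]:
    return [("inventory.reserved", order)]


def charge_payment(order: dict) -> List[Event]:
    if order["amount"] > order["card_limit"]:
        return [("payment.failed", order)]
    return [("payment.charged", order)]


def release_inventory(order: dict) -> List[Event]:
    # Compensating action for reserve_inventory.
    return [("inventory.released", order)]


def cancel_order(order: dict) -> List[Event]:
    # Compensating action for the original order creation.
    return [("order.cancelled", order)]


def build_broker() -> Broker:
    broker = Broker()
    broker.on("order.created", reserve_inventory)
    broker.on("inventory.reserved", charge_payment)
    broker.on("payment.failed", release_inventory)
    broker.on("inventory.released", cancel_order)
    return broker
```

Calling `build_broker().run("order.created", {"amount": 500, "card_limit": 100})` walks forward to the payment failure and then back through the compensating events. The flow is implicit in the subscriptions, which is why choreographed sagas are harder to trace than orchestrated ones.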
Saga Pattern Implementation
Python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
from datetime import datetime
import uuid
import logging
logger = logging.getLogger(__name__)
class SagaStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
COMPENSATING = "compensating"
COMPENSATED = "compensated"
FAILED = "failed"
@dataclass
class SagaStep:
name: str
action: Callable
compensation: Callable
timeout: float = 30.0
retries: int = 3
@dataclass
class StepResult:
step_name: str
success: bool
data: Any = None
error: Optional[str] = None
@dataclass
class Saga:
saga_id: str
name: str
steps: List[SagaStep]
status: SagaStatus = SagaStatus.PENDING
current_step: int = 0
context: Dict[str, Any] = field(default_factory=dict)
results: List[StepResult] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
class SagaOrchestrator:
"""
Central saga orchestrator for distributed transactions.
Handles step execution, compensation, and state management.
"""
def __init__(self, store: 'SagaStore' = None):
self.store = store or InMemorySagaStore()
async def execute(
self,
name: str,
steps: List[SagaStep],
context: Dict[str, Any] = None
) -> Saga:
"""Execute a saga"""
saga = Saga(
saga_id=str(uuid.uuid4()),
name=name,
steps=steps,
context=context or {}
)
await self.store.save(saga)
try:
saga.status = SagaStatus.RUNNING
await self._execute_steps(saga)
saga.status = SagaStatus.COMPLETED
except SagaStepFailed as e:
logger.error(f"Saga {saga.saga_id} step failed: {e}")
saga.status = SagaStatus.COMPENSATING
await self._compensate(saga)
saga.status = SagaStatus.COMPENSATED
except Exception as e:
logger.error(f"Saga {saga.saga_id} failed unexpectedly: {e}")
saga.status = SagaStatus.FAILED
finally:
saga.completed_at = datetime.now()
await self.store.save(saga)
return saga
async def _execute_steps(self, saga: Saga) -> None:
"""Execute saga steps in order"""
for i, step in enumerate(saga.steps):
saga.current_step = i
await self.store.save(saga)
result = await self._execute_step_with_retry(saga, step)
saga.results.append(result)
if not result.success:
raise SagaStepFailed(step.name, result.error)
# Store step result in context for next steps
saga.context[f"{step.name}_result"] = result.data
    async def _execute_step_with_retry(
        self,
        saga: Saga,
        step: SagaStep
    ) -> StepResult:
        """Execute a step, making up to step.retries attempts in total"""
        last_error = None
        for attempt in range(step.retries):
            try:
                # asyncio.timeout() requires Python 3.11+
                async with asyncio.timeout(step.timeout):
                    result = await step.action(saga.context)
                return StepResult(
                    step_name=step.name,
                    success=True,
                    data=result
                )
            except asyncio.TimeoutError:
                last_error = f"Step timed out after {step.timeout}s"
            except Exception as e:
                last_error = str(e)
            if attempt < step.retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
        # All attempts failed; report the last error instead of raising
        return StepResult(
            step_name=step.name,
            success=False,
            error=last_error
        )
async def _compensate(self, saga: Saga) -> None:
"""Execute compensating actions in reverse order"""
# Only compensate completed steps
completed_steps = saga.results[:saga.current_step]
for i in range(len(completed_steps) - 1, -1, -1):
step = saga.steps[i]
try:
logger.info(f"Compensating step: {step.name}")
async with asyncio.timeout(step.timeout):
await step.compensation(saga.context)
except Exception as e:
logger.error(f"Compensation failed for {step.name}: {e}")
# Continue compensating other steps
class SagaStepFailed(Exception):
def __init__(self, step_name: str, error: str):
self.step_name = step_name
self.error = error
super().__init__(f"Step '{step_name}' failed: {error}")
class SagaStore(ABC):
@abstractmethod
async def save(self, saga: Saga) -> None:
pass
@abstractmethod
async def get(self, saga_id: str) -> Optional[Saga]:
pass
class InMemorySagaStore(SagaStore):
def __init__(self):
self.sagas: Dict[str, Saga] = {}
async def save(self, saga: Saga) -> None:
self.sagas[saga.saga_id] = saga
async def get(self, saga_id: str) -> Optional[Saga]:
return self.sagas.get(saga_id)
# Example: Order processing saga
async def create_order_saga(order_data: dict) -> Saga:
orchestrator = SagaOrchestrator()
# Define saga steps
steps = [
SagaStep(
name="create_order",
action=lambda ctx: create_order(ctx["order_data"]),
compensation=lambda ctx: cancel_order(ctx["create_order_result"]["order_id"]),
timeout=10.0
),
SagaStep(
name="reserve_inventory",
action=lambda ctx: reserve_inventory(
ctx["create_order_result"]["order_id"],
ctx["order_data"]["items"]
),
compensation=lambda ctx: release_inventory(
ctx["create_order_result"]["order_id"]
),
timeout=15.0
),
SagaStep(
name="charge_payment",
action=lambda ctx: charge_payment(
ctx["create_order_result"]["order_id"],
ctx["order_data"]["amount"]
),
compensation=lambda ctx: refund_payment(
ctx["charge_payment_result"]["transaction_id"]
),
timeout=30.0,
retries=3
),
SagaStep(
name="ship_order",
action=lambda ctx: initiate_shipping(
ctx["create_order_result"]["order_id"]
),
compensation=lambda ctx: cancel_shipping(
ctx["ship_order_result"]["shipment_id"]
),
timeout=20.0
)
]
return await orchestrator.execute(
name="order_processing",
steps=steps,
context={"order_data": order_data}
)
# Service functions (would be actual service calls)
async def create_order(order_data: dict) -> dict:
return {"order_id": str(uuid.uuid4()), "status": "created"}
async def cancel_order(order_id: str) -> None:
logger.info(f"Cancelling order {order_id}")
async def reserve_inventory(order_id: str, items: list) -> dict:
return {"reservation_id": str(uuid.uuid4())}
async def release_inventory(order_id: str) -> None:
logger.info(f"Releasing inventory for order {order_id}")
async def charge_payment(order_id: str, amount: float) -> dict:
return {"transaction_id": str(uuid.uuid4()), "amount": amount}
async def refund_payment(transaction_id: str) -> None:
logger.info(f"Refunding transaction {transaction_id}")
async def initiate_shipping(order_id: str) -> dict:
return {"shipment_id": str(uuid.uuid4())}
async def cancel_shipping(shipment_id: str) -> None:
logger.info(f"Cancelling shipment {shipment_id}")
# Usage
async def main():
    result = await create_order_saga({
        "items": [{"product_id": "123", "quantity": 2}],
        "amount": 99.99,
        "customer_id": "cust_123"
    })
    print(f"Saga status: {result.status}")
    print(f"Results: {result.results}")

if __name__ == "__main__":
    asyncio.run(main())
JavaScript
const { v4: uuidv4 } = require('uuid');
const EventEmitter = require('events');
// Saga status
const SagaStatus = {
PENDING: 'pending',
RUNNING: 'running',
COMPLETED: 'completed',
COMPENSATING: 'compensating',
COMPENSATED: 'compensated',
FAILED: 'failed'
};
class SagaStep {
constructor({ name, action, compensation, timeout = 30000, retries = 3 }) {
this.name = name;
this.action = action;
this.compensation = compensation;
this.timeout = timeout;
this.retries = retries;
}
}
class Saga {
constructor(name, steps) {
this.sagaId = uuidv4();
this.name = name;
this.steps = steps;
this.status = SagaStatus.PENDING;
this.currentStep = 0;
this.context = {};
this.results = [];
this.createdAt = new Date();
this.completedAt = null;
}
}

class SagaOrchestrator extends EventEmitter {
  constructor(store = null) {
    super();
    this.store = store || new InMemorySagaStore();
  }

  async execute(name, steps, initialContext = {}) {
    const saga = new Saga(name, steps);
    saga.context = { ...initialContext };
    await this.store.save(saga);
    this.emit('saga:started', saga);

    try {
      saga.status = SagaStatus.RUNNING;
      await this.executeSteps(saga);
      saga.status = SagaStatus.COMPLETED;
      this.emit('saga:completed', saga);
    } catch (error) {
      console.error(`Saga ${saga.sagaId} step failed:`, error);
      saga.status = SagaStatus.COMPENSATING;
      this.emit('saga:compensating', saga);
      const fullyCompensated = await this.compensate(saga);
      if (fullyCompensated) {
        saga.status = SagaStatus.COMPENSATED;
        this.emit('saga:compensated', saga);
      } else {
        // At least one compensation failed: the saga needs manual attention
        saga.status = SagaStatus.FAILED;
        this.emit('saga:failed', saga);
      }
    } finally {
      saga.completedAt = new Date();
      await this.store.save(saga);
    }
    return saga;
  }

  async executeSteps(saga) {
    for (let i = 0; i < saga.steps.length; i++) {
      const step = saga.steps[i];
      saga.currentStep = i;
      await this.store.save(saga);
      this.emit('step:started', { saga, step });

      const result = await this.executeStepWithRetry(saga, step);
      saga.results.push(result);
      if (!result.success) {
        this.emit('step:failed', { saga, step, result });
        throw new SagaStepError(step.name, result.error);
      }

      // Store result for next steps
      saga.context[`${step.name}_result`] = result.data;
      this.emit('step:completed', { saga, step, result });
    }
  }

  async executeStepWithRetry(saga, step) {
    let lastError = null;
    for (let attempt = 0; attempt < step.retries; attempt++) {
      try {
        const result = await this.withTimeout(
          step.action(saga.context),
          step.timeout
        );
        return {
          stepName: step.name,
          success: true,
          data: result
        };
      } catch (error) {
        lastError = error.message;
        if (attempt < step.retries - 1) {
          // Exponential backoff between attempts: 1s, 2s, 4s, ...
          await this.sleep(Math.pow(2, attempt) * 1000);
        }
      }
    }
    return {
      stepName: step.name,
      success: false,
      error: lastError
    };
  }

  // Returns true only if every compensation succeeded
  async compensate(saga) {
    // Only compensate completed steps (in reverse order)
    const completedSteps = saga.results.slice(0, saga.currentStep);
    let allSucceeded = true;
    for (let i = completedSteps.length - 1; i >= 0; i--) {
      const step = saga.steps[i];
      try {
        console.log(`Compensating step: ${step.name}`);
        this.emit('compensation:started', { saga, step });
        await this.withTimeout(
          step.compensation(saga.context),
          step.timeout
        );
        this.emit('compensation:completed', { saga, step });
      } catch (error) {
        allSucceeded = false;
        console.error(`Compensation failed for ${step.name}:`, error);
        this.emit('compensation:failed', { saga, step, error });
        // Continue compensating other steps
      }
    }
    return allSucceeded;
  }

  // Note: a timeout rejects the race but does not cancel the underlying call
  withTimeout(promise, ms) {
    let timer;
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Timeout after ${ms}ms`)), ms);
    });
    // Clear the timer either way so it does not keep the process alive
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

class SagaStepError extends Error {
  constructor(stepName, error) {
    super(`Step '${stepName}' failed: ${error}`);
    this.name = 'SagaStepError';
    this.stepName = stepName;
    this.originalError = error;
  }
}

class InMemorySagaStore {
  constructor() {
    this.sagas = new Map();
  }

  async save(saga) {
    this.sagas.set(saga.sagaId, saga);
  }

  async get(sagaId) {
    return this.sagas.get(sagaId);
  }

  async getByStatus(status) {
    return Array.from(this.sagas.values()).filter(s => s.status === status);
  }
}

// Example: Order processing saga
async function createOrderSaga(orderData) {
  const orchestrator = new SagaOrchestrator();

  // Add event listeners for monitoring
  orchestrator.on('step:completed', ({ step }) => {
    console.log(`✅ Step completed: ${step.name}`);
  });
  orchestrator.on('saga:compensated', (saga) => {
    console.log(`⏪ Saga compensated: ${saga.sagaId}`);
  });

  const steps = [
    new SagaStep({
      name: 'create_order',
      action: async (ctx) => {
        return await orderService.create(ctx.orderData);
      },
      compensation: async (ctx) => {
        await orderService.cancel(ctx.create_order_result.orderId);
      },
      timeout: 10000
    }),
    new SagaStep({
      name: 'reserve_inventory',
      action: async (ctx) => {
        return await inventoryService.reserve(
          ctx.create_order_result.orderId,
          ctx.orderData.items
        );
      },
      compensation: async (ctx) => {
        await inventoryService.release(ctx.create_order_result.orderId);
      },
      timeout: 15000
    }),
    new SagaStep({
      name: 'charge_payment',
      action: async (ctx) => {
        return await paymentService.charge(
          ctx.create_order_result.orderId,
          ctx.orderData.amount
        );
      },
      compensation: async (ctx) => {
        await paymentService.refund(ctx.charge_payment_result.transactionId);
      },
      timeout: 30000,
      retries: 3
    }),
    new SagaStep({
      name: 'ship_order',
      action: async (ctx) => {
        return await shippingService.initiate(ctx.create_order_result.orderId);
      },
      compensation: async (ctx) => {
        await shippingService.cancel(ctx.ship_order_result.shipmentId);
      },
      timeout: 20000
    })
  ];

  return orchestrator.execute('order_processing', steps, { orderData });
}

// Mock services
const orderService = {
  create: async (data) => ({ orderId: uuidv4(), status: 'created' }),
  cancel: async (orderId) => console.log(`Cancelling order ${orderId}`)
};
const inventoryService = {
  reserve: async (orderId, items) => ({ reservationId: uuidv4() }),
  release: async (orderId) => console.log(`Releasing inventory for ${orderId}`)
};
const paymentService = {
  charge: async (orderId, amount) => ({ transactionId: uuidv4(), amount }),
  refund: async (txId) => console.log(`Refunding transaction ${txId}`)
};
const shippingService = {
  initiate: async (orderId) => ({ shipmentId: uuidv4() }),
  cancel: async (shipmentId) => console.log(`Cancelling shipment ${shipmentId}`)
};

// Express endpoint
const express = require('express');
const app = express();
app.use(express.json());

app.post('/orders', async (req, res) => {
  try {
    const saga = await createOrderSaga(req.body);
    if (saga.status === SagaStatus.COMPLETED) {
      res.status(201).json({
        orderId: saga.context.create_order_result.orderId,
        status: 'confirmed'
      });
    } else {
      res.status(500).json({
        error: 'Order processing failed',
        sagaStatus: saga.status,
        failedStep: saga.results.find(r => !r.success)?.stepName
      });
    }
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
CQRS (Command Query Responsibility Segregation)
Event Sourcing
Store all changes to application state as a sequence of events.
Resilience Patterns
Circuit Breaker
┌─────────────────────────────────────────────────────────────────┐
│                     Circuit Breaker States                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│      ┌─────────────────────────────────────────────────┐        │
│      │                                                 │        │
│      ▼                                                 │        │
│  ┌────────┐   Failure     ┌────────┐   Timeout    ┌─────────┐   │
│  │ CLOSED │───threshold──►│  OPEN  │─────────────►│HALF-OPEN│   │
│  │        │               │        │              │         │   │
│  └────────┘               └────────┘              └────┬────┘   │
│      ▲                                                 │        │
│      │                     Success                     │        │
│      └─────────────────────────────────────────────────┘        │
│                                                                 │
│  CLOSED: Requests pass through, count failures                  │
│  OPEN: Requests fail immediately, don't call service            │
│  HALF-OPEN: Allow limited requests to test if service is back   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
# Python implementation using the circuitbreaker library
import requests
from circuitbreaker import CircuitBreakerError, circuit


@circuit(failure_threshold=5, recovery_timeout=30)
def call_payment_service(order_id):
    response = requests.post(
        "https://payment-service/charge",
        json={"order_id": order_id},
        timeout=5,  # without a timeout, a hung call never counts as a failure
    )
    response.raise_for_status()
    return response.json()


# Usage with fallback
def charge_order(order_id):
    try:
        return call_payment_service(order_id)
    except CircuitBreakerError:
        # Circuit is open. Fallback: queue for later processing.
        # queue_for_retry is a placeholder for your own retry queue.
        queue_for_retry(order_id)
        return {"status": "pending", "message": "Payment will be processed shortly"}
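To make the state transitions in the diagram concrete, here is a minimal hand-rolled sketch in plain Python. It is not how the circuitbreaker library is implemented; the names `CircuitBreaker`, `CircuitOpenError`, and the injectable `clock` parameter are assumptions made for illustration. It also assumes the common convention that a failed trial call in HALF-OPEN sends the breaker straight back to OPEN.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # injectable so tests do not need to sleep
        self.state = "closed"
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                # OPEN: fail immediately without calling the service
                raise CircuitOpenError("circuit is open, failing fast")
            # Recovery timeout elapsed: let one trial call through
            self.state = "half_open"
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        # Any success (including a half-open trial) closes the circuit
        self.failures = 0
        self.state = "closed"

    def _on_failure(self):
        self.failures += 1
        # A failed trial call reopens at once; otherwise wait for the threshold
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()
```

One instance would typically guard one downstream dependency. The sketch is not thread-safe; a production breaker would need a lock around the state changes.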
Bulkhead Pattern
┌─────────────────────────────────────────────────────────────────┐
│                        Bulkhead Pattern                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Without Bulkhead:                                              │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                Shared Thread Pool (100)                 │   │
│   │                                                         │   │
│   │   Product (fast)    Order (slow)    Payment (medium)    │   │
│   │                                                         │   │
│   │   Slow Order service exhausts all threads               │   │
│   │   → Product & Payment also fail                         │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  With Bulkhead:                                                 │
│   ┌─────────────┐       ┌─────────────┐       ┌─────────────┐   │
│   │   Product   │       │    Order    │       │   Payment   │   │
│   │  Pool (30)  │       │  Pool (40)  │       │  Pool (30)  │   │
│   │             │       │ ██████████  │       │             │   │
│   │    Still    │       │ (exhausted) │       │    Still    │   │
│   │   working   │       │             │       │   working   │   │
│   └─────────────┘       └─────────────┘       └─────────────┘   │
│                                                                 │
│  Failure is isolated to Order service only                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
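The isolation shown above can be sketched in a few lines of Python. Note the swap: instead of separate thread pools, this version caps concurrency per dependency with a semaphore, which gives the same isolation with less machinery. The `Bulkhead` and `BulkheadFullError` names are hypothetical, and the limits simply mirror the numbers in the diagram.

```python
import threading


class BulkheadFullError(Exception):
    """Raised when a dependency's compartment has no free slots."""


class Bulkhead:
    def __init__(self, name, max_concurrent):
        self.name = name
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def run(self, func, *args, **kwargs):
        # Fail fast instead of queueing when the compartment is full,
        # so a slow dependency cannot tie up callers of other services
        if not self._slots.acquire(blocking=False):
            raise BulkheadFullError(f"{self.name} bulkhead is full")
        try:
            return func(*args, **kwargs)
        finally:
            self._slots.release()


# One compartment per downstream dependency (limits from the diagram)
bulkheads = {
    "product": Bulkhead("product", 30),
    "order": Bulkhead("order", 40),
    "payment": Bulkhead("payment", 30),
}
```

A caller would wrap each outbound call, for example `bulkheads["order"].run(fetch_order, order_id)` where `fetch_order` is your own client function. When the Order compartment is full, only Order calls are rejected; Product and Payment keep their own slots.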
Retry with Backoff
import random
import time
from functools import wraps

import requests


def retry_with_exponential_backoff(
    max_retries=3,
    base_delay=1,
    max_delay=60,
    exponential_base=2,
    jitter=True,
):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    retries += 1
                    if retries > max_retries:
                        raise
                    # Delay grows exponentially, capped at max_delay
                    delay = min(
                        base_delay * (exponential_base ** (retries - 1)),
                        max_delay,
                    )
                    if jitter:
                        # Random factor in [0.5, 1.5) so clients do not retry in lockstep
                        delay = delay * (0.5 + random.random())
                    print(f"Retry {retries}/{max_retries} in {delay:.2f}s")
                    time.sleep(delay)
        return wrapper
    return decorator


@retry_with_exponential_backoff(max_retries=5)
def call_external_service():
    response = requests.get("https://api.example.com/data", timeout=5)
    response.raise_for_status()
    return response.json()
Service Mesh
What is a Service Mesh?
┌─────────────────────────────────────────────────────────────────┐
│                    Service Mesh Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│                     ┌─────────────────────┐                     │
│                     │    Control Plane    │                     │
│                     │  (Istio, Linkerd)   │                     │
│                     │  - Config           │                     │
│                     │  - Policies         │                     │
│                     │  - Certificates     │                     │
│                     └──────────┬──────────┘                     │
│                                │                                │
│          ┌─────────────────────┼─────────────────────┐          │
│          │                     │                     │          │
│    ┌─────▼─────┐         ┌─────▼─────┐         ┌─────▼─────┐    │
│    │┌─────────┐│         │┌─────────┐│         │┌─────────┐│    │
│    ││ Sidecar ││◄───────►││ Sidecar ││◄───────►││ Sidecar ││    │
│    ││ (Envoy) ││         ││ (Envoy) ││         ││ (Envoy) ││    │
│    │└────┬────┘│         │└────┬────┘│         │└────┬────┘│    │
│    │     │     │         │     │     │         │     │     │    │
│    │┌────▼────┐│         │┌────▼────┐│         │┌────▼────┐│    │
│    ││ Service ││         ││ Service ││         ││ Service ││    │
│    ││    A    ││         ││    B    ││         ││    C    ││    │
│    │└─────────┘│         │└─────────┘│         │└─────────┘│    │
│    └───────────┘         └───────────┘         └───────────┘    │
│         Pod                   Pod                   Pod         │
│                                                                 │
│  Data Plane: Sidecar proxies handle all traffic                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Service Mesh Features
Traffic Management
- Load balancing
- A/B testing
- Canary deployments
- Traffic splitting
- Retries & timeouts
Security
- mTLS encryption
- Service-to-service auth
- Access policies
- Certificate management
Observability
- Distributed tracing
- Metrics collection
- Access logs
- Service topology
Resilience
- Circuit breaking
- Rate limiting
- Fault injection
- Health checks
Observability
The Three Pillars
┌─────────────────────────────────────────────────────────────────┐
│                      Observability Pillars                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐       ┌─────────────┐       ┌─────────────┐   │
│   │    LOGS     │       │   METRICS   │       │   TRACES    │   │
│   │             │       │             │       │             │   │
│   │ What        │       │ How much    │       │ Where       │   │
│   │ happened?   │       │ & how fast? │       │ did it go?  │   │
│   │             │       │             │       │             │   │
│   │ • Events    │       │ • Counters  │       │ • Spans     │   │
│   │ • Errors    │       │ • Gauges    │       │ • Context   │   │
│   │ • Debug     │       │ • Histograms│       │ • Latency   │   │
│   │             │       │             │       │             │   │
│   │ ELK Stack   │       │ Prometheus  │       │ Jaeger      │   │
│   │ Splunk      │       │ Grafana     │       │ Zipkin      │   │
│   │ Datadog     │       │ Datadog     │       │ Datadog     │   │
│   └─────────────┘       └─────────────┘       └─────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
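As a rough illustration of how the first two pillars look in application code, the sketch below emits one structured log line and records a counter and a latency sample for the same request. Everything here is a stand-in built on the standard library: the `Metrics` class, `log_event`, `handle_request`, and the metric names are assumptions, and a real service would hand these to one of the tools listed above instead of keeping them in memory.

```python
import json
import time
from collections import defaultdict


class Metrics:
    """Toy in-process registry; real systems export to a metrics backend."""

    def __init__(self):
        self.counters = defaultdict(int)   # "how much"
        self.timings = defaultdict(list)   # "how fast" (raw samples)

    def inc(self, name, value=1):
        self.counters[name] += value

    def observe(self, name, seconds):
        self.timings[name].append(seconds)


def log_event(level, message, **fields):
    """Emit one structured (JSON) log line; extra fields make it searchable."""
    record = {"ts": time.time(), "level": level, "message": message, **fields}
    line = json.dumps(record)
    print(line)
    return line


metrics = Metrics()


def handle_request(order_id, trace_id):
    start = time.perf_counter()
    metrics.inc("orders_requests_total")
    # Including the trace ID lets you jump from a log line to its trace
    log_event("info", "order fetched", order_id=order_id, trace_id=trace_id)
    metrics.observe("orders_request_seconds", time.perf_counter() - start)
```

Carrying the same `trace_id` in logs and traces is what ties the pillars together when you debug a single request.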
Distributed Tracing
┌─────────────────────────────────────────────────────────────────┐
│ Distributed Trace Example │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Request: GET /orders/123 │
│ │
│ Trace ID: abc-123 │
│ ├── API Gateway (50ms) │
│ │ └── Order Service (200ms) │
│ │ ├── User Service (30ms) │
│ │ │ └── User DB (15ms) │
│ │ ├── Product Service (80ms) │
│ │ │ └── Product DB (40ms) │
│ │ └── Order DB (50ms) │
│ │ │
│ Total: 250ms │
│ │
│ Waterfall View: │
│ ───────────────────────────────────────────────────────────── │
│ API Gateway [██████] │
│ Order Service [████████████████████████████████████████] │
│ User Service [████] │
│ User DB [██] │
│ Product Service [████████████] │
│ Product DB [████████] │
│ Order DB [██████████] │
│ │
│ 0ms 50ms 100ms 150ms 200ms 250ms │
│ │
└─────────────────────────────────────────────────────────────────┘
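The tree in the example above falls out of two IDs carried on every span: a shared trace ID and a pointer to the parent span. The sketch below shows that data model only; the `Span` class and `render` helper are hypothetical, and in a real system a tracing library would create the spans and pass the IDs between services in request headers.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Span:
    trace_id: str
    name: str
    duration_ms: int
    parent_id: Optional[str] = None
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])

    def child(self, name, duration_ms):
        # A child keeps the trace ID and points back at its parent span
        return Span(self.trace_id, name, duration_ms, parent_id=self.span_id)


def render(spans):
    """Rebuild the call tree from parent pointers, one line per span."""
    children = {}
    for span in spans:
        children.setdefault(span.parent_id, []).append(span)
    lines = []

    def walk(parent_id, depth):
        for span in children.get(parent_id, []):
            lines.append(f"{'  ' * depth}{span.name} ({span.duration_ms}ms)")
            walk(span.span_id, depth + 1)

    walk(None, 0)  # the root span has no parent
    return lines


# Spans from the example request, in the order they were started
gateway = Span("abc-123", "API Gateway", 50)
order = gateway.child("Order Service", 200)
user = order.child("User Service", 30)
user_db = user.child("User DB", 15)
product = order.child("Product Service", 80)
product_db = product.child("Product DB", 40)
order_db = order.child("Order DB", 50)
spans = [gateway, order, user, user_db, product, product_db, order_db]

print("\n".join(render(spans)))
```

Because each span only knows its parent, the backend can reassemble the tree even when spans arrive out of order from different services.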
Deployment Strategies
Common Strategies
┌─────────────────────────────────────────────────────────────────┐
│                      Deployment Strategies                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Blue-Green Deployment                                       │
│     ┌────────────┐     ┌────────────┐                           │
│     │    Blue    │     │   Green    │                           │
│     │    (v1)    │     │    (v2)    │                           │
│     │  Current   │ ──► │    New     │                           │
│     └────────────┘     └────────────┘                           │
│     Switch traffic instantly, easy rollback                     │
│                                                                 │
│  2. Canary Deployment                                           │
│     ┌────────────┐     ┌────────────┐                           │
│     │     v1     │ ─── │     v2     │                           │
│     │    95%     │     │     5%     │                           │
│     └────────────┘     └────────────┘                           │
│     Gradually increase v2 traffic, monitor for issues           │
│                                                                 │
│  3. Rolling Update                                              │
│     [v1] [v1] [v1] [v1]                                         │
│     [v2] [v1] [v1] [v1]  → Update one at a time                 │
│     [v2] [v2] [v1] [v1]                                         │
│     [v2] [v2] [v2] [v1]                                         │
│     [v2] [v2] [v2] [v2]                                         │
│                                                                 │
│  4. A/B Testing                                                 │
│     Users A-M → v1 (control)                                    │
│     Users N-Z → v2 (experiment)                                 │
│     Measure business metrics, not just technical                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
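Canary and A/B rollouts both come down to deciding, per request, which version a user should see. One common approach, sketched here under assumptions, is to hash the user ID into a stable bucket so the same user always lands on the same version. The `bucket` and `route_canary` functions are hypothetical; in practice this decision usually lives in a load balancer, service mesh, or feature-flag system rather than in application code.

```python
import hashlib


def bucket(user_id: str, salt: str = "") -> int:
    """Map a user to a stable bucket in the range 0..99."""
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).hexdigest()
    return int(digest, 16) % 100


def route_canary(user_id: str, canary_percent: int) -> str:
    """Send roughly canary_percent of users to v2 and the rest to v1."""
    return "v2" if bucket(user_id, "canary") < canary_percent else "v1"


# Start at 5% and widen the rollout as monitoring stays healthy
for percent in (5, 25, 100):
    share = sum(route_canary(f"user-{i}", percent) == "v2" for i in range(10_000))
    print(f"{percent}% canary -> {share / 100:.1f}% of users on v2")
```

Because the bucket depends only on the user ID and salt, raising the percentage only moves users from v1 to v2, never back, which keeps each user's experience consistent during the rollout.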
Key Takeaways
| Pattern | When to Use |
|---|---|
| API Gateway | Single entry point, cross-cutting concerns |
| Service Discovery | Dynamic service locations |
| Circuit Breaker | Prevent cascading failures |
| Saga | Distributed transactions |
| CQRS | Different read/write patterns |
| Event Sourcing | Audit trail, temporal queries |
| Service Mesh | Complex microservices, need observability |
Interview Tip: When discussing microservices, always mention trade-offs. “Microservices add operational complexity, distributed system challenges, and require mature DevOps practices. The benefits (independent scaling, team autonomy) should outweigh these costs.”