What are Distributed Systems?
A distributed system is a collection of independent computers that appears to its users as a single coherent system.
Why Distributed?
- Scalability: handle more load than a single machine can
- Reliability: survive individual machine failures
- Latency: serve users from nearby locations
- Compliance: meet data-locality requirements
The Eight Fallacies
Things developers wrongly assume about networks (see the sketch after this list):
- The network is reliable → Packets get lost
- Latency is zero → Cross-region calls take 100+ ms
- Bandwidth is infinite → Large payloads slow things down
- The network is secure → Always encrypt
- Topology doesn’t change → Servers come and go
- There is one administrator → Multiple teams, policies
- Transport cost is zero → Data transfer costs money
- The network is homogeneous → Different hardware everywhere
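The practical consequence: treat every remote call as something that can fail or stall. Below is a minimal Python sketch of coding against the first three fallacies; the `requests` library and the api.example.com endpoint are illustrative assumptions, not part of any specific system described here.

```python
from typing import Optional

import requests  # illustrative HTTP client; any client with explicit timeouts works


def fetch_profile(user_id: str) -> Optional[dict]:
    """Defensive remote call: bounded latency, failure treated as normal."""
    try:
        resp = requests.get(
            f"https://api.example.com/users/{user_id}",  # hypothetical endpoint
            timeout=(0.5, 2.0),  # (connect, read) timeouts in seconds -- latency is not zero
        )
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException:
        # The network is not reliable: packets get lost and servers disappear.
        return None
```

The caller then decides whether to retry, fall back to a cache, or degrade gracefully.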
Consistency Models
Strong Consistency
All nodes see the same data at the same time: every read reflects the most recent write, no matter which node serves it.
Eventual Consistency
All nodes will eventually hold the same data: if no new writes arrive, every replica converges to the same value.
Consistency Implementation Examples
Python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, List, Any, Dict
from datetime import datetime
import hashlib
import json
import asyncio
@dataclass
class VectorClock:
"""Track causality in distributed systems"""
clocks: Dict[str, int] = field(default_factory=dict)
def increment(self, node_id: str) -> None:
self.clocks[node_id] = self.clocks.get(node_id, 0) + 1
def merge(self, other: 'VectorClock') -> 'VectorClock':
"""Merge two vector clocks (take max of each)"""
merged = VectorClock()
all_nodes = set(self.clocks.keys()) | set(other.clocks.keys())
for node in all_nodes:
merged.clocks[node] = max(
self.clocks.get(node, 0),
other.clocks.get(node, 0)
)
return merged
def is_concurrent_with(self, other: 'VectorClock') -> bool:
"""Check if two events are concurrent (neither happened before the other)"""
self_greater = any(
self.clocks.get(k, 0) > other.clocks.get(k, 0)
for k in self.clocks
)
other_greater = any(
other.clocks.get(k, 0) > self.clocks.get(k, 0)
for k in other.clocks
)
return self_greater and other_greater
@dataclass
class ReplicatedValue:
"""Last-Write-Wins Register for eventual consistency"""
value: Any
timestamp: float
node_id: str
vector_clock: VectorClock
def merge(self, other: 'ReplicatedValue') -> 'ReplicatedValue':
"""Merge with conflict resolution"""
# Check for concurrent updates
if self.vector_clock.is_concurrent_with(other.vector_clock):
# Conflict! Use timestamp + node_id as tiebreaker
if (self.timestamp, self.node_id) > (other.timestamp, other.node_id):
winner = self
else:
winner = other
elif self.timestamp > other.timestamp:
winner = self
else:
winner = other
return ReplicatedValue(
value=winner.value,
timestamp=winner.timestamp,
node_id=winner.node_id,
vector_clock=self.vector_clock.merge(other.vector_clock)
)
class EventuallyConsistentStore:
"""Distributed key-value store with eventual consistency"""
def __init__(self, node_id: str, replicas: List['EventuallyConsistentStore'] = None):
self.node_id = node_id
self.data: Dict[str, ReplicatedValue] = {}
self.vector_clock = VectorClock()
self.replicas = replicas or []
self.pending_sync: List[tuple] = []
def write(self, key: str, value: Any) -> None:
"""Write with vector clock tracking"""
self.vector_clock.increment(self.node_id)
replicated = ReplicatedValue(
value=value,
timestamp=datetime.now().timestamp(),
node_id=self.node_id,
vector_clock=VectorClock(clocks=self.vector_clock.clocks.copy())
)
# Local write
if key in self.data:
self.data[key] = self.data[key].merge(replicated)
else:
self.data[key] = replicated
# Queue for async replication
self.pending_sync.append((key, replicated))
def read(self, key: str) -> Optional[Any]:
"""Read (may return stale data)"""
if key in self.data:
return self.data[key].value
return None
async def sync_to_replicas(self) -> None:
"""Background sync to other replicas"""
while self.pending_sync:
key, value = self.pending_sync.pop(0)
for replica in self.replicas:
try:
await replica.receive_sync(key, value)
except Exception as e:
# Re-queue failed sync
self.pending_sync.append((key, value))
print(f"Sync failed to {replica.node_id}: {e}")
async def receive_sync(self, key: str, incoming: ReplicatedValue) -> None:
"""Receive and merge update from another replica"""
if key in self.data:
self.data[key] = self.data[key].merge(incoming)
else:
self.data[key] = incoming
# Update our vector clock
self.vector_clock = self.vector_clock.merge(incoming.vector_clock)
# Usage example
async def demo_eventual_consistency():
# Three replicas
replica_a = EventuallyConsistentStore("node-a")
replica_b = EventuallyConsistentStore("node-b")
replica_c = EventuallyConsistentStore("node-c")
# Cross-reference replicas
replica_a.replicas = [replica_b, replica_c]
replica_b.replicas = [replica_a, replica_c]
replica_c.replicas = [replica_a, replica_b]
# Concurrent writes (simulating network partition)
replica_a.write("user:1:name", "Alice")
replica_b.write("user:1:name", "Alicia") # Conflict!
# Before sync: different values
print(f"A sees: {replica_a.read('user:1:name')}") # Alice
print(f"B sees: {replica_b.read('user:1:name')}") # Alicia
# After sync: converges to same value
await replica_a.sync_to_replicas()
await replica_b.sync_to_replicas()
print(f"After sync - A: {replica_a.read('user:1:name')}")
print(f"After sync - B: {replica_b.read('user:1:name')}")
# Both will show the same value (winner based on timestamp + node_id)
JavaScript
// Vector Clock for causality tracking
class VectorClock {
constructor(clocks = {}) {
this.clocks = { ...clocks };
}
increment(nodeId) {
this.clocks[nodeId] = (this.clocks[nodeId] || 0) + 1;
return this;
}
merge(other) {
const merged = new VectorClock();
const allNodes = new Set([
...Object.keys(this.clocks),
...Object.keys(other.clocks)
]);
for (const node of allNodes) {
merged.clocks[node] = Math.max(
this.clocks[node] || 0,
other.clocks[node] || 0
);
}
return merged;
}
isConcurrentWith(other) {
const selfGreater = Object.keys(this.clocks).some(
k => (this.clocks[k] || 0) > (other.clocks[k] || 0)
);
const otherGreater = Object.keys(other.clocks).some(
k => (other.clocks[k] || 0) > (this.clocks[k] || 0)
);
return selfGreater && otherGreater;
}
clone() {
return new VectorClock({ ...this.clocks });
}
}
// CRDT: Last-Write-Wins Register
class LWWRegister {
constructor(value, timestamp, nodeId, vectorClock) {
this.value = value;
this.timestamp = timestamp;
this.nodeId = nodeId;
this.vectorClock = vectorClock;
}
merge(other) {
let winner;
if (this.vectorClock.isConcurrentWith(other.vectorClock)) {
      // Conflict resolution: compare timestamp first, then nodeId as a tiebreaker
      // (comparing arrays with > coerces them to strings, which mis-orders numeric timestamps)
      if (
        this.timestamp > other.timestamp ||
        (this.timestamp === other.timestamp && this.nodeId > other.nodeId)
      ) {
        winner = this;
      } else {
        winner = other;
      }
} else {
winner = this.timestamp > other.timestamp ? this : other;
}
return new LWWRegister(
winner.value,
winner.timestamp,
winner.nodeId,
this.vectorClock.merge(other.vectorClock)
);
}
}
// Eventually Consistent Distributed Store
class EventuallyConsistentStore {
constructor(nodeId) {
this.nodeId = nodeId;
this.data = new Map();
this.vectorClock = new VectorClock();
this.replicas = [];
this.pendingSync = [];
this.antiEntropyInterval = null;
}
write(key, value) {
this.vectorClock.increment(this.nodeId);
const register = new LWWRegister(
value,
Date.now(),
this.nodeId,
this.vectorClock.clone()
);
// Local write with merge
if (this.data.has(key)) {
this.data.set(key, this.data.get(key).merge(register));
} else {
this.data.set(key, register);
}
// Queue for replication
this.pendingSync.push({ key, register });
return this.data.get(key).value;
}
read(key) {
const register = this.data.get(key);
return register ? register.value : null;
}
// Read from multiple replicas for higher consistency
async readWithQuorum(key, quorum = 2) {
const responses = await Promise.all([
this.read(key),
...this.replicas.slice(0, quorum - 1).map(
replica => replica.read(key)
)
]);
// Return most common value (simple quorum)
const counts = {};
responses.forEach(val => {
const str = JSON.stringify(val);
counts[str] = (counts[str] || 0) + 1;
});
let maxCount = 0;
let result = null;
for (const [val, count] of Object.entries(counts)) {
if (count > maxCount) {
maxCount = count;
result = JSON.parse(val);
}
}
return result;
}
async receiveSync(key, incoming) {
if (this.data.has(key)) {
this.data.set(key, this.data.get(key).merge(incoming));
} else {
this.data.set(key, incoming);
}
this.vectorClock = this.vectorClock.merge(incoming.vectorClock);
}
// Anti-entropy: periodic full sync
startAntiEntropy(intervalMs = 5000) {
this.antiEntropyInterval = setInterval(async () => {
for (const replica of this.replicas) {
try {
await this.syncWith(replica);
} catch (err) {
console.error(`Anti-entropy failed with ${replica.nodeId}:`, err);
}
}
}, intervalMs);
}
async syncWith(replica) {
// Exchange all keys
for (const [key, register] of this.data.entries()) {
await replica.receiveSync(key, register);
}
for (const [key, register] of replica.data.entries()) {
await this.receiveSync(key, register);
}
}
stopAntiEntropy() {
if (this.antiEntropyInterval) {
clearInterval(this.antiEntropyInterval);
}
}
}
// Demo
async function demo() {
const nodeA = new EventuallyConsistentStore('node-a');
const nodeB = new EventuallyConsistentStore('node-b');
nodeA.replicas = [nodeB];
nodeB.replicas = [nodeA];
// Concurrent writes
nodeA.write('counter', 10);
nodeB.write('counter', 20); // Conflict!
console.log('Before sync:');
console.log('Node A:', nodeA.read('counter')); // 10
console.log('Node B:', nodeB.read('counter')); // 20
// Sync resolves conflict
await nodeA.syncWith(nodeB);
console.log('After sync:');
console.log('Node A:', nodeA.read('counter')); // Same value
console.log('Node B:', nodeB.read('counter')); // Same value
}
Consistency Levels
| Level | Description | Trade-off |
|---|---|---|
| Strong | All reads see latest write | High latency |
| Eventual | Reads may be stale | Low latency |
| Causal | Respects cause-effect | Medium |
| Read-your-writes | See your own writes | Good UX |
| Session | Consistency within session | Practical |
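Read-your-writes is often the cheapest upgrade over plain eventual consistency. Here is a minimal sketch of a session client that never reads data older than its own last write; the `put`/`get` replica API below is a hypothetical stand-in for your storage client, which is assumed to return a monotonically increasing version number.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class SessionClient:
    """Tracks the version of this session's own writes, per key."""
    replicas: List[Any]  # assumed API: put(key, value) -> version, get(key) -> (value, version)
    last_written: Dict[str, int] = field(default_factory=dict)

    def write(self, key: str, value: Any) -> None:
        version = self.replicas[0].put(key, value)  # assumed: primary returns the new version
        self.last_written[key] = version

    def read(self, key: str) -> Optional[Any]:
        min_version = self.last_written.get(key, 0)
        for replica in self.replicas:
            value, version = replica.get(key)
            if version >= min_version:  # this replica has caught up to our own write
                return value
        return None  # every replica is stale; the caller may retry or wait
```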
Consensus Algorithms
How do distributed nodes agree on a value?
Paxos (Simplified)
Proposers and acceptors exchange numbered proposals until a majority accepts one value; Paxos is correct but notoriously hard to implement.
Raft (Easier to Understand)
Raft elects a single leader that appends commands to an ordered log and replicates it to followers; it was designed to be easier to understand and build.
Raft Implementation
Python
import asyncio
import random
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
from datetime import datetime
class NodeState(Enum):
FOLLOWER = "follower"
CANDIDATE = "candidate"
LEADER = "leader"
@dataclass
class LogEntry:
term: int
index: int
command: str
data: dict
@dataclass
class RaftNode:
"""Simplified Raft consensus node"""
node_id: str
state: NodeState = NodeState.FOLLOWER
current_term: int = 0
voted_for: Optional[str] = None
log: List[LogEntry] = field(default_factory=list)
commit_index: int = 0
last_applied: int = 0
# Leader state
next_index: Dict[str, int] = field(default_factory=dict)
match_index: Dict[str, int] = field(default_factory=dict)
# Cluster info
peers: List[str] = field(default_factory=list)
# Timers
election_timeout: float = 0
heartbeat_interval: float = 0.15 # 150ms
def __post_init__(self):
self.reset_election_timeout()
def reset_election_timeout(self):
"""Randomized election timeout (150-300ms typically)"""
self.election_timeout = random.uniform(0.15, 0.30)
async def start_election(self) -> bool:
"""Candidate starts election"""
self.state = NodeState.CANDIDATE
self.current_term += 1
self.voted_for = self.node_id
votes = 1 # Vote for self
# Request votes from all peers
vote_requests = []
for peer_id in self.peers:
vote_requests.append(
self.request_vote(peer_id)
)
results = await asyncio.gather(*vote_requests, return_exceptions=True)
votes += sum(1 for r in results if r is True)
# Check if won election
quorum = (len(self.peers) + 1) // 2 + 1
if votes >= quorum:
await self.become_leader()
return True
return False
async def become_leader(self):
"""Transition to leader state"""
self.state = NodeState.LEADER
print(f"[{self.node_id}] Became leader for term {self.current_term}")
# Initialize leader state
for peer_id in self.peers:
self.next_index[peer_id] = len(self.log) + 1
self.match_index[peer_id] = 0
# Start heartbeat
asyncio.create_task(self.send_heartbeats())
async def send_heartbeats(self):
"""Leader sends periodic heartbeats"""
while self.state == NodeState.LEADER:
for peer_id in self.peers:
asyncio.create_task(self.append_entries(peer_id, []))
await asyncio.sleep(self.heartbeat_interval)
async def request_vote(self, peer_id: str) -> bool:
"""Request vote from a peer"""
# In real implementation, this would be RPC
last_log_index = len(self.log)
last_log_term = self.log[-1].term if self.log else 0
# Simulate RPC response
# Peer grants vote if:
# 1. Candidate's term >= peer's term
# 2. Peer hasn't voted for someone else
# 3. Candidate's log is at least as up-to-date
return True # Simplified
async def append_entries(
self,
peer_id: str,
entries: List[LogEntry]
) -> bool:
"""AppendEntries RPC (heartbeat + log replication)"""
prev_log_index = self.next_index.get(peer_id, 1) - 1
prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0
# Simulate RPC
success = True # In real implementation, check response
if success and entries:
self.match_index[peer_id] = prev_log_index + len(entries)
self.next_index[peer_id] = self.match_index[peer_id] + 1
# Update commit index
self.update_commit_index()
return success
def update_commit_index(self):
"""Leader updates commit index based on replication"""
for n in range(self.commit_index + 1, len(self.log) + 1):
replicated_count = 1 # Count self
for peer_id in self.peers:
if self.match_index.get(peer_id, 0) >= n:
replicated_count += 1
quorum = (len(self.peers) + 1) // 2 + 1
if replicated_count >= quorum:
if self.log[n - 1].term == self.current_term:
self.commit_index = n
def propose(self, command: str, data: dict) -> Optional[LogEntry]:
"""Client proposes a command (only leader can accept)"""
if self.state != NodeState.LEADER:
return None # Redirect to leader
entry = LogEntry(
term=self.current_term,
index=len(self.log) + 1,
command=command,
data=data
)
self.log.append(entry)
# Replicate to followers (async)
asyncio.create_task(self.replicate_log())
return entry
async def replicate_log(self):
"""Replicate latest log entries to all followers"""
for peer_id in self.peers:
next_idx = self.next_index.get(peer_id, 1)
entries_to_send = self.log[next_idx - 1:]
if entries_to_send:
await self.append_entries(peer_id, entries_to_send)
# Leader election simulation
async def simulate_raft():
nodes = [
RaftNode(node_id="node-1", peers=["node-2", "node-3"]),
RaftNode(node_id="node-2", peers=["node-1", "node-3"]),
RaftNode(node_id="node-3", peers=["node-1", "node-2"]),
]
# Simulate node-1 winning election
leader = nodes[0]
await leader.start_election()
# Propose a command
if leader.state == NodeState.LEADER:
entry = leader.propose("SET", {"key": "name", "value": "Alice"})
print(f"Proposed: {entry}")
JavaScript
// Raft Node States
const NodeState = {
FOLLOWER: 'follower',
CANDIDATE: 'candidate',
LEADER: 'leader'
};
class LogEntry {
constructor(term, index, command, data) {
this.term = term;
this.index = index;
this.command = command;
this.data = data;
this.timestamp = Date.now();
}
}
class RaftNode {
constructor(nodeId, peers = []) {
this.nodeId = nodeId;
this.state = NodeState.FOLLOWER;
this.currentTerm = 0;
this.votedFor = null;
this.log = [];
// Volatile state
this.commitIndex = 0;
this.lastApplied = 0;
// Leader state
this.nextIndex = new Map();
this.matchIndex = new Map();
// Cluster
this.peers = peers;
this.leaderId = null;
// Timers
this.electionTimer = null;
this.heartbeatTimer = null;
this.heartbeatInterval = 150; // ms
this.resetElectionTimeout();
}
resetElectionTimeout() {
if (this.electionTimer) {
clearTimeout(this.electionTimer);
}
// Randomized timeout (150-300ms)
const timeout = 150 + Math.random() * 150;
this.electionTimer = setTimeout(() => {
if (this.state !== NodeState.LEADER) {
this.startElection();
}
}, timeout);
}
async startElection() {
console.log(`[${this.nodeId}] Starting election for term ${this.currentTerm + 1}`);
this.state = NodeState.CANDIDATE;
this.currentTerm++;
this.votedFor = this.nodeId;
let votes = 1; // Vote for self
// Request votes in parallel
const votePromises = this.peers.map(peerId =>
this.requestVote(peerId)
);
const results = await Promise.allSettled(votePromises);
for (const result of results) {
if (result.status === 'fulfilled' && result.value.granted) {
votes++;
// Update term if needed
if (result.value.term > this.currentTerm) {
this.stepDown(result.value.term);
return;
}
}
}
// Check quorum
const quorum = Math.floor((this.peers.length + 1) / 2) + 1;
if (votes >= quorum && this.state === NodeState.CANDIDATE) {
this.becomeLeader();
} else {
this.state = NodeState.FOLLOWER;
this.resetElectionTimeout();
}
}
async requestVote(peerId) {
const lastLogIndex = this.log.length;
const lastLogTerm = lastLogIndex > 0 ? this.log[lastLogIndex - 1].term : 0;
// Simulate RPC call
const request = {
term: this.currentTerm,
candidateId: this.nodeId,
lastLogIndex,
lastLogTerm
};
// In real implementation, this would be network call
return { granted: true, term: this.currentTerm };
}
becomeLeader() {
console.log(`[${this.nodeId}] Became LEADER for term ${this.currentTerm}`);
this.state = NodeState.LEADER;
this.leaderId = this.nodeId;
// Initialize leader state
for (const peerId of this.peers) {
this.nextIndex.set(peerId, this.log.length + 1);
this.matchIndex.set(peerId, 0);
}
// Clear election timer
if (this.electionTimer) {
clearTimeout(this.electionTimer);
}
// Start heartbeats
this.sendHeartbeats();
}
sendHeartbeats() {
if (this.state !== NodeState.LEADER) return;
// Send AppendEntries to all peers
for (const peerId of this.peers) {
this.appendEntries(peerId, []);
}
this.heartbeatTimer = setTimeout(
() => this.sendHeartbeats(),
this.heartbeatInterval
);
}
async appendEntries(peerId, entries) {
const nextIdx = this.nextIndex.get(peerId) || 1;
const prevLogIndex = nextIdx - 1;
const prevLogTerm = prevLogIndex > 0 ? this.log[prevLogIndex - 1].term : 0;
const request = {
term: this.currentTerm,
leaderId: this.nodeId,
prevLogIndex,
prevLogTerm,
entries,
leaderCommit: this.commitIndex
};
// Simulate RPC - in real implementation, handle response
const success = true;
if (success && entries.length > 0) {
this.matchIndex.set(peerId, prevLogIndex + entries.length);
this.nextIndex.set(peerId, this.matchIndex.get(peerId) + 1);
this.updateCommitIndex();
}
return success;
}
updateCommitIndex() {
// Find highest N where majority has replicated entry N
for (let n = this.log.length; n > this.commitIndex; n--) {
let replicationCount = 1; // Count self
for (const peerId of this.peers) {
if ((this.matchIndex.get(peerId) || 0) >= n) {
replicationCount++;
}
}
const quorum = Math.floor((this.peers.length + 1) / 2) + 1;
if (replicationCount >= quorum && this.log[n - 1].term === this.currentTerm) {
this.commitIndex = n;
this.applyCommittedEntries();
break;
}
}
}
applyCommittedEntries() {
while (this.lastApplied < this.commitIndex) {
this.lastApplied++;
const entry = this.log[this.lastApplied - 1];
console.log(`[${this.nodeId}] Applied: ${entry.command} ${JSON.stringify(entry.data)}`);
// Apply to state machine here
}
}
// Client interface
propose(command, data) {
if (this.state !== NodeState.LEADER) {
return {
success: false,
error: 'Not leader',
leaderId: this.leaderId
};
}
const entry = new LogEntry(
this.currentTerm,
this.log.length + 1,
command,
data
);
this.log.push(entry);
// Replicate to followers
this.replicateLog();
return {
success: true,
entry,
message: 'Proposed - will be committed after replication'
};
}
async replicateLog() {
if (this.state !== NodeState.LEADER) return;
const promises = this.peers.map(peerId => {
const nextIdx = this.nextIndex.get(peerId) || 1;
const entries = this.log.slice(nextIdx - 1);
if (entries.length > 0) {
return this.appendEntries(peerId, entries);
}
return Promise.resolve(true);
});
await Promise.allSettled(promises);
this.updateCommitIndex();
}
stepDown(newTerm) {
console.log(`[${this.nodeId}] Stepping down, new term: ${newTerm}`);
this.currentTerm = newTerm;
this.state = NodeState.FOLLOWER;
this.votedFor = null;
if (this.heartbeatTimer) {
clearTimeout(this.heartbeatTimer);
}
this.resetElectionTimeout();
}
}
// Usage
const node1 = new RaftNode('node-1', ['node-2', 'node-3']);
const node2 = new RaftNode('node-2', ['node-1', 'node-3']);
const node3 = new RaftNode('node-3', ['node-1', 'node-2']);
// Simulate leader election
node1.becomeLeader();
// Propose a command
const result = node1.propose('SET', { key: 'name', value: 'Alice' });
console.log('Proposal result:', result);
Distributed Transactions
Two-Phase Commit (2PC)
Two-Phase Commit Implementation
Python
import asyncio
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import uuid
class TxState(Enum):
PENDING = "pending"
PREPARED = "prepared"
COMMITTED = "committed"
ABORTED = "aborted"
@dataclass
class Transaction:
tx_id: str
state: TxState = TxState.PENDING
participants: List[str] = field(default_factory=list)
votes: Dict[str, bool] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
timeout: timedelta = timedelta(seconds=30)
class TwoPhaseCommitCoordinator:
"""Coordinates distributed transactions using 2PC"""
def __init__(self, coordinator_id: str):
self.coordinator_id = coordinator_id
self.transactions: Dict[str, Transaction] = {}
self.participants: Dict[str, 'TransactionParticipant'] = {}
def register_participant(self, participant: 'TransactionParticipant'):
self.participants[participant.participant_id] = participant
async def begin_transaction(
self,
participant_ids: List[str],
operations: Dict[str, Any]
) -> str:
"""Start a new distributed transaction"""
tx_id = f"tx-{uuid.uuid4().hex[:8]}"
tx = Transaction(tx_id=tx_id, participants=participant_ids)
self.transactions[tx_id] = tx
print(f"[Coordinator] Starting transaction {tx_id}")
try:
# Phase 1: Prepare
prepared = await self.prepare_phase(tx_id, operations)
if prepared:
# Phase 2: Commit
await self.commit_phase(tx_id)
return tx_id
else:
# Phase 2: Abort
await self.abort_phase(tx_id)
raise Exception("Transaction aborted - not all participants prepared")
except asyncio.TimeoutError:
await self.abort_phase(tx_id)
raise Exception("Transaction timed out")
async def prepare_phase(
self,
tx_id: str,
operations: Dict[str, Any]
) -> bool:
"""Phase 1: Ask all participants to prepare"""
tx = self.transactions[tx_id]
print(f"[Coordinator] Phase 1: PREPARE for {tx_id}")
prepare_tasks = []
for participant_id in tx.participants:
participant = self.participants.get(participant_id)
if participant:
op = operations.get(participant_id, {})
prepare_tasks.append(
participant.prepare(tx_id, op)
)
# Wait for all votes with timeout
try:
results = await asyncio.wait_for(
asyncio.gather(*prepare_tasks, return_exceptions=True),
timeout=tx.timeout.total_seconds()
)
# Check all votes
for participant_id, result in zip(tx.participants, results):
if isinstance(result, Exception):
tx.votes[participant_id] = False
else:
tx.votes[participant_id] = result
all_prepared = all(tx.votes.values())
tx.state = TxState.PREPARED if all_prepared else TxState.PENDING
return all_prepared
except asyncio.TimeoutError:
return False
async def commit_phase(self, tx_id: str):
"""Phase 2: Tell all participants to commit"""
tx = self.transactions[tx_id]
print(f"[Coordinator] Phase 2: COMMIT for {tx_id}")
commit_tasks = []
for participant_id in tx.participants:
participant = self.participants.get(participant_id)
if participant:
commit_tasks.append(participant.commit(tx_id))
await asyncio.gather(*commit_tasks, return_exceptions=True)
tx.state = TxState.COMMITTED
print(f"[Coordinator] Transaction {tx_id} COMMITTED")
async def abort_phase(self, tx_id: str):
"""Phase 2: Tell all participants to abort"""
tx = self.transactions[tx_id]
print(f"[Coordinator] Phase 2: ABORT for {tx_id}")
abort_tasks = []
for participant_id in tx.participants:
participant = self.participants.get(participant_id)
if participant:
abort_tasks.append(participant.abort(tx_id))
await asyncio.gather(*abort_tasks, return_exceptions=True)
tx.state = TxState.ABORTED
print(f"[Coordinator] Transaction {tx_id} ABORTED")
class TransactionParticipant:
"""Participant in a distributed transaction"""
def __init__(self, participant_id: str):
self.participant_id = participant_id
self.prepared_data: Dict[str, Any] = {}
self.committed_data: Dict[str, Any] = {}
self.undo_log: Dict[str, Any] = {}
async def prepare(self, tx_id: str, operation: Dict) -> bool:
"""Prepare to commit - acquire locks, validate, write to redo log"""
print(f"[{self.participant_id}] Preparing {tx_id}: {operation}")
try:
# Simulate validation and lock acquisition
await asyncio.sleep(0.1) # Simulate I/O
# Write to redo log (would be durable in real implementation)
self.prepared_data[tx_id] = operation
# Save undo information
key = operation.get("key")
if key:
self.undo_log[tx_id] = {
"key": key,
"old_value": self.committed_data.get(key)
}
print(f"[{self.participant_id}] Voted YES for {tx_id}")
return True
except Exception as e:
print(f"[{self.participant_id}] Voted NO for {tx_id}: {e}")
return False
async def commit(self, tx_id: str):
"""Commit the prepared transaction"""
print(f"[{self.participant_id}] Committing {tx_id}")
operation = self.prepared_data.pop(tx_id, {})
# Apply the operation
key = operation.get("key")
value = operation.get("value")
if key:
self.committed_data[key] = value
# Clear undo log
self.undo_log.pop(tx_id, None)
async def abort(self, tx_id: str):
"""Abort - rollback using undo log"""
print(f"[{self.participant_id}] Aborting {tx_id}")
# Rollback using undo log
undo = self.undo_log.pop(tx_id, None)
if undo:
key = undo["key"]
old_value = undo["old_value"]
if old_value is not None:
self.committed_data[key] = old_value
elif key in self.committed_data:
del self.committed_data[key]
self.prepared_data.pop(tx_id, None)
# Usage example
async def demo_2pc():
coordinator = TwoPhaseCommitCoordinator("coordinator-1")
# Register participants (e.g., database shards)
orders_db = TransactionParticipant("orders-db")
inventory_db = TransactionParticipant("inventory-db")
coordinator.register_participant(orders_db)
coordinator.register_participant(inventory_db)
# Execute distributed transaction
try:
tx_id = await coordinator.begin_transaction(
participant_ids=["orders-db", "inventory-db"],
operations={
"orders-db": {"key": "order:123", "value": {"product": "laptop", "qty": 1}},
"inventory-db": {"key": "product:laptop:stock", "value": 99}
}
)
print(f"Transaction {tx_id} completed successfully!")
except Exception as e:
print(f"Transaction failed: {e}")
asyncio.run(demo_2pc())
JavaScript
const { EventEmitter } = require('events');
const { v4: uuidv4 } = require('uuid');
// Transaction states
const TxState = {
PENDING: 'pending',
PREPARED: 'prepared',
COMMITTED: 'committed',
ABORTED: 'aborted'
};
class Transaction {
constructor(txId, participants) {
this.txId = txId;
this.state = TxState.PENDING;
this.participants = participants;
this.votes = new Map();
this.createdAt = Date.now();
this.timeout = 30000; // 30 seconds
}
}
class TwoPhaseCommitCoordinator extends EventEmitter {
constructor(coordinatorId) {
super();
this.coordinatorId = coordinatorId;
this.transactions = new Map();
this.participants = new Map();
}
registerParticipant(participant) {
this.participants.set(participant.participantId, participant);
}
async beginTransaction(participantIds, operations) {
const txId = `tx-${uuidv4().slice(0, 8)}`;
const tx = new Transaction(txId, participantIds);
this.transactions.set(txId, tx);
console.log(`[Coordinator] Starting transaction ${txId}`);
try {
// Phase 1: Prepare
const prepared = await this.preparePhase(txId, operations);
if (prepared) {
// Phase 2: Commit
await this.commitPhase(txId);
return { success: true, txId };
} else {
// Phase 2: Abort
await this.abortPhase(txId);
throw new Error('Transaction aborted - not all participants prepared');
}
} catch (error) {
if (error.message !== 'Transaction aborted - not all participants prepared') {
await this.abortPhase(txId);
}
throw error;
}
}
async preparePhase(txId, operations) {
const tx = this.transactions.get(txId);
console.log(`[Coordinator] Phase 1: PREPARE for ${txId}`);
const preparePromises = tx.participants.map(async (participantId) => {
const participant = this.participants.get(participantId);
if (!participant) return false;
const operation = operations[participantId] || {};
try {
const vote = await this.withTimeout(
participant.prepare(txId, operation),
tx.timeout
);
return { participantId, vote };
} catch (error) {
console.error(`[Coordinator] ${participantId} failed to prepare:`, error.message);
return { participantId, vote: false };
}
});
const results = await Promise.all(preparePromises);
// Collect votes
let allPrepared = true;
for (const { participantId, vote } of results) {
tx.votes.set(participantId, vote);
if (!vote) allPrepared = false;
}
tx.state = allPrepared ? TxState.PREPARED : TxState.PENDING;
return allPrepared;
}
async commitPhase(txId) {
const tx = this.transactions.get(txId);
console.log(`[Coordinator] Phase 2: COMMIT for ${txId}`);
const commitPromises = tx.participants.map(async (participantId) => {
const participant = this.participants.get(participantId);
if (participant) {
await participant.commit(txId);
}
});
await Promise.allSettled(commitPromises);
tx.state = TxState.COMMITTED;
console.log(`[Coordinator] Transaction ${txId} COMMITTED`);
this.emit('committed', txId);
}
async abortPhase(txId) {
const tx = this.transactions.get(txId);
console.log(`[Coordinator] Phase 2: ABORT for ${txId}`);
const abortPromises = tx.participants.map(async (participantId) => {
const participant = this.participants.get(participantId);
if (participant) {
await participant.abort(txId);
}
});
await Promise.allSettled(abortPromises);
tx.state = TxState.ABORTED;
console.log(`[Coordinator] Transaction ${txId} ABORTED`);
this.emit('aborted', txId);
}
withTimeout(promise, ms) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), ms)
)
]);
}
}
class TransactionParticipant extends EventEmitter {
constructor(participantId) {
super();
this.participantId = participantId;
this.preparedData = new Map();
this.committedData = new Map();
this.undoLog = new Map();
}
async prepare(txId, operation) {
console.log(`[${this.participantId}] Preparing ${txId}:`, operation);
try {
// Simulate validation, lock acquisition, redo log
await this.simulateIO(100);
// Write to redo log
this.preparedData.set(txId, operation);
// Save undo information
const { key } = operation;
if (key) {
this.undoLog.set(txId, {
key,
oldValue: this.committedData.get(key)
});
}
console.log(`[${this.participantId}] Voted YES for ${txId}`);
return true;
} catch (error) {
console.log(`[${this.participantId}] Voted NO for ${txId}:`, error.message);
return false;
}
}
async commit(txId) {
console.log(`[${this.participantId}] Committing ${txId}`);
const operation = this.preparedData.get(txId);
this.preparedData.delete(txId);
if (operation && operation.key) {
this.committedData.set(operation.key, operation.value);
}
// Clear undo log
this.undoLog.delete(txId);
this.emit('committed', txId);
}
async abort(txId) {
console.log(`[${this.participantId}] Aborting ${txId}`);
// Rollback using undo log
const undo = this.undoLog.get(txId);
if (undo) {
if (undo.oldValue !== undefined) {
this.committedData.set(undo.key, undo.oldValue);
} else {
this.committedData.delete(undo.key);
}
}
this.preparedData.delete(txId);
this.undoLog.delete(txId);
this.emit('aborted', txId);
}
simulateIO(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Utility to get committed data
get(key) {
return this.committedData.get(key);
}
}
// Usage
async function demo2PC() {
const coordinator = new TwoPhaseCommitCoordinator('coordinator-1');
const ordersDb = new TransactionParticipant('orders-db');
const inventoryDb = new TransactionParticipant('inventory-db');
coordinator.registerParticipant(ordersDb);
coordinator.registerParticipant(inventoryDb);
try {
const result = await coordinator.beginTransaction(
['orders-db', 'inventory-db'],
{
'orders-db': { key: 'order:123', value: { product: 'laptop', qty: 1 } },
'inventory-db': { key: 'product:laptop:stock', value: 99 }
}
);
console.log('Transaction completed:', result);
console.log('Orders DB:', ordersDb.get('order:123'));
console.log('Inventory DB:', inventoryDb.get('product:laptop:stock'));
} catch (error) {
console.error('Transaction failed:', error.message);
}
}
demo2PC();
Saga Pattern
A saga splits a distributed transaction into a sequence of local transactions, each paired with a compensating action that undoes it if a later step fails.
Choreography vs Orchestration
Choreography (Event-driven)
Order ─► OrderCreated event
│
┌─────────┼─────────┐
▼ ▼ ▼
Inventory Payment Shipping
│ │ │
└─────────┴─────────┘
Events
• No central controller
• Services react to events
• More decoupled
Orchestration (Command-driven)
┌───────────────────┐
│ Saga Manager │
└─────────┬─────────┘
│
┌─────────┼─────────┐
▼ ▼ ▼
Inventory Payment Shipping
• Central controller
• Explicit flow control
• Easier to understand
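In orchestration form, the saga manager runs each local transaction in order and, on failure, invokes the compensations for the steps that already succeeded. A minimal sketch follows; the step and compensation functions are hypothetical placeholders.

```python
import asyncio


async def run_order_saga(order: dict) -> dict:
    # Each step is paired with the compensation that undoes it.
    steps = [
        (reserve_inventory, release_inventory),  # hypothetical async functions
        (charge_payment, refund_payment),
        (create_shipment, cancel_shipment),
    ]
    completed = []
    try:
        for action, compensate in steps:
            await action(order)
            completed.append(compensate)
        return {"status": "completed"}
    except Exception:
        # Undo the already-completed steps in reverse order.
        for compensate in reversed(completed):
            await compensate(order)
        return {"status": "rolled_back"}
```

Choreography achieves the same effect without the central function: each service listens for the previous service's event and emits its own success or failure event.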
Handling Failures
Circuit Breaker Pattern
Python
import time
import asyncio
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Generic, Optional, Any
from functools import wraps
from datetime import datetime, timedelta
import logging
T = TypeVar('T')
logger = logging.getLogger(__name__)
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5
success_threshold: int = 2 # Successes needed to close from half-open
timeout: timedelta = timedelta(seconds=60)
half_open_max_calls: int = 3
excluded_exceptions: tuple = () # Exceptions that don't count as failures
class CircuitBreakerOpenException(Exception):
"""Raised when circuit breaker is open"""
def __init__(self, circuit_name: str, retry_after: float):
self.circuit_name = circuit_name
self.retry_after = retry_after
super().__init__(f"Circuit {circuit_name} is OPEN. Retry after {retry_after:.1f}s")
@dataclass
class CircuitBreaker:
"""Production-ready circuit breaker with metrics"""
name: str
config: CircuitBreakerConfig = field(default_factory=CircuitBreakerConfig)
state: CircuitState = field(default=CircuitState.CLOSED, init=False)
failure_count: int = field(default=0, init=False)
success_count: int = field(default=0, init=False)
last_failure_time: Optional[datetime] = field(default=None, init=False)
half_open_calls: int = field(default=0, init=False)
# Metrics
total_calls: int = field(default=0, init=False)
total_failures: int = field(default=0, init=False)
total_rejections: int = field(default=0, init=False)
def __call__(self, func: Callable) -> Callable:
"""Use as decorator"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
return await self.call_async(lambda: func(*args, **kwargs))
@wraps(func)
def sync_wrapper(*args, **kwargs):
return self.call_sync(lambda: func(*args, **kwargs))
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
def call_sync(self, func: Callable[[], T]) -> T:
"""Execute function with circuit breaker (sync)"""
self._check_state()
self.total_calls += 1
try:
result = func()
self._on_success()
return result
except self.config.excluded_exceptions:
raise # Don't count as failure
except Exception as e:
self._on_failure()
raise
async def call_async(self, func: Callable) -> Any:
"""Execute function with circuit breaker (async)"""
self._check_state()
self.total_calls += 1
try:
            result = func()
            if asyncio.iscoroutine(result):
                result = await result  # the decorator passes a plain lambda, so await the returned coroutine here
self._on_success()
return result
except self.config.excluded_exceptions:
raise
except Exception as e:
self._on_failure()
raise
def _check_state(self):
"""Check if request should be allowed"""
if self.state == CircuitState.CLOSED:
return
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self._transition_to_half_open()
else:
self.total_rejections += 1
retry_after = (
self.last_failure_time + self.config.timeout - datetime.now()
).total_seconds()
raise CircuitBreakerOpenException(self.name, max(0, retry_after))
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.config.half_open_max_calls:
self.total_rejections += 1
raise CircuitBreakerOpenException(self.name, 1.0)
self.half_open_calls += 1
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to try again"""
if self.last_failure_time is None:
return True
return datetime.now() - self.last_failure_time >= self.config.timeout
def _transition_to_half_open(self):
"""Move to half-open state"""
logger.info(f"Circuit {self.name}: OPEN -> HALF_OPEN")
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
self.success_count = 0
def _on_success(self):
"""Handle successful call"""
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
self._close()
else:
self.failure_count = 0 # Reset on success
def _on_failure(self):
"""Handle failed call"""
self.total_failures += 1
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self._open()
elif self.failure_count >= self.config.failure_threshold:
self._open()
def _open(self):
"""Open the circuit"""
logger.warning(f"Circuit {self.name}: -> OPEN (failures: {self.failure_count})")
self.state = CircuitState.OPEN
def _close(self):
"""Close the circuit"""
logger.info(f"Circuit {self.name}: -> CLOSED")
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.half_open_calls = 0
@property
def metrics(self) -> dict:
"""Get circuit breaker metrics"""
return {
"name": self.name,
"state": self.state.value,
"total_calls": self.total_calls,
"total_failures": self.total_failures,
"total_rejections": self.total_rejections,
"failure_rate": self.total_failures / max(1, self.total_calls),
"current_failure_count": self.failure_count
}
# Usage as decorator
payment_circuit = CircuitBreaker(
name="payment-service",
config=CircuitBreakerConfig(
failure_threshold=3,
success_threshold=2,
timeout=timedelta(seconds=30)
)
)
@payment_circuit
async def call_payment_service(amount: float):
    # Simulate an external API call (requires `import aiohttp` in addition to the imports above)
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.payment.com/charge",
json={"amount": amount}
) as response:
if response.status != 200:
raise Exception(f"Payment failed: {response.status}")
return await response.json()
# Usage with context manager
class CircuitBreakerContext:
def __init__(self, circuit: CircuitBreaker):
self.circuit = circuit
def __enter__(self):
self.circuit._check_state()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.circuit._on_success()
elif exc_type not in self.circuit.config.excluded_exceptions:
self.circuit._on_failure()
return False # Don't suppress exceptions
# Example with fallback
async def get_user_with_fallback(user_id: str):
try:
return await call_user_service(user_id)
except CircuitBreakerOpenException:
# Fallback: return cached data
return get_cached_user(user_id)
JavaScript
const EventEmitter = require('events');
// Circuit States
const CircuitState = {
CLOSED: 'closed',
OPEN: 'open',
HALF_OPEN: 'half_open'
};
class CircuitBreakerOpenError extends Error {
constructor(circuitName, retryAfter) {
super(`Circuit ${circuitName} is OPEN. Retry after ${retryAfter.toFixed(1)}s`);
this.name = 'CircuitBreakerOpenError';
this.circuitName = circuitName;
this.retryAfter = retryAfter;
}
}
class CircuitBreaker extends EventEmitter {
constructor(name, options = {}) {
super();
this.name = name;
this.config = {
failureThreshold: options.failureThreshold || 5,
successThreshold: options.successThreshold || 2,
timeout: options.timeout || 60000, // 60 seconds
halfOpenMaxCalls: options.halfOpenMaxCalls || 3,
excludedErrors: options.excludedErrors || []
};
this.state = CircuitState.CLOSED;
this.failureCount = 0;
this.successCount = 0;
this.lastFailureTime = null;
this.halfOpenCalls = 0;
// Metrics
this.metrics = {
totalCalls: 0,
totalFailures: 0,
totalRejections: 0,
totalSuccesses: 0
};
}
async execute(fn) {
this.checkState();
this.metrics.totalCalls++;
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
// Check if error should be excluded
const isExcluded = this.config.excludedErrors.some(
ErrorClass => error instanceof ErrorClass
);
if (!isExcluded) {
this.onFailure();
}
throw error;
}
}
// Wrap a function with circuit breaker
wrap(fn) {
return async (...args) => {
return this.execute(() => fn(...args));
};
}
checkState() {
if (this.state === CircuitState.CLOSED) {
return;
}
if (this.state === CircuitState.OPEN) {
if (this.shouldAttemptReset()) {
this.transitionToHalfOpen();
} else {
this.metrics.totalRejections++;
const retryAfter = (
this.lastFailureTime + this.config.timeout - Date.now()
) / 1000;
throw new CircuitBreakerOpenError(this.name, Math.max(0, retryAfter));
}
}
if (this.state === CircuitState.HALF_OPEN) {
if (this.halfOpenCalls >= this.config.halfOpenMaxCalls) {
this.metrics.totalRejections++;
throw new CircuitBreakerOpenError(this.name, 1.0);
}
this.halfOpenCalls++;
}
}
shouldAttemptReset() {
if (!this.lastFailureTime) return true;
return Date.now() - this.lastFailureTime >= this.config.timeout;
}
transitionToHalfOpen() {
console.log(`Circuit ${this.name}: OPEN -> HALF_OPEN`);
this.state = CircuitState.HALF_OPEN;
this.halfOpenCalls = 0;
this.successCount = 0;
this.emit('half-open');
}
onSuccess() {
this.metrics.totalSuccesses++;
if (this.state === CircuitState.HALF_OPEN) {
this.successCount++;
if (this.successCount >= this.config.successThreshold) {
this.close();
}
} else {
this.failureCount = 0;
}
}
onFailure() {
this.metrics.totalFailures++;
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.state === CircuitState.HALF_OPEN) {
this.open();
} else if (this.failureCount >= this.config.failureThreshold) {
this.open();
}
}
open() {
console.log(`Circuit ${this.name}: -> OPEN (failures: ${this.failureCount})`);
this.state = CircuitState.OPEN;
this.emit('open');
}
close() {
console.log(`Circuit ${this.name}: -> CLOSED`);
this.state = CircuitState.CLOSED;
this.failureCount = 0;
this.successCount = 0;
this.halfOpenCalls = 0;
this.emit('close');
}
getMetrics() {
return {
name: this.name,
state: this.state,
...this.metrics,
failureRate: this.metrics.totalFailures / Math.max(1, this.metrics.totalCalls),
currentFailureCount: this.failureCount
};
}
// Manual control
forceOpen() {
this.open();
}
forceClose() {
this.close();
}
}
// Usage
const paymentCircuit = new CircuitBreaker('payment-service', {
failureThreshold: 3,
successThreshold: 2,
timeout: 30000
});
// Event listeners
paymentCircuit.on('open', () => {
console.log('🔴 Payment circuit opened - switching to fallback');
// Alert, switch to backup, etc.
});
paymentCircuit.on('close', () => {
console.log('🟢 Payment circuit closed - back to normal');
});
// Wrap existing function
async function chargePayment(amount) {
const response = await fetch('https://api.payment.com/charge', {
method: 'POST',
body: JSON.stringify({ amount })
});
if (!response.ok) {
throw new Error(`Payment failed: ${response.status}`);
}
return response.json();
}
const safeChargePayment = paymentCircuit.wrap(chargePayment);
// With fallback
async function chargeWithFallback(amount) {
try {
return await safeChargePayment(amount);
} catch (error) {
if (error instanceof CircuitBreakerOpenError) {
// Use fallback payment processor
return await fallbackPaymentProcessor(amount);
}
throw error;
}
}
// Express middleware example
function circuitBreakerMiddleware(circuit) {
return (req, res, next) => {
req.circuit = circuit;
// Override res.json to track success/failure
const originalJson = res.json.bind(res);
res.json = function(data) {
if (res.statusCode >= 500) {
circuit.onFailure();
} else {
circuit.onSuccess();
}
return originalJson(data);
};
next();
};
}
Retry with Exponential Backoff
Python
import asyncio
import random
import time
from functools import wraps
from typing import Callable, TypeVar, Type, Tuple, Optional
from dataclasses import dataclass
import logging
T = TypeVar('T')
logger = logging.getLogger(__name__)
@dataclass
class RetryConfig:
max_retries: int = 5
base_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
on_retry: Optional[Callable[[Exception, int], None]] = None
class RetryExhausted(Exception):
"""All retries failed"""
def __init__(self, last_exception: Exception, attempts: int):
self.last_exception = last_exception
self.attempts = attempts
super().__init__(f"All {attempts} retry attempts failed: {last_exception}")
def retry_with_backoff(config: Optional[RetryConfig] = None):
"""Decorator for retry with exponential backoff"""
if config is None:
config = RetryConfig()
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def async_wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(config.max_retries):
try:
return await func(*args, **kwargs)
except config.retryable_exceptions as e:
last_exception = e
if attempt == config.max_retries - 1:
break
# Calculate delay with exponential backoff
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
# Add jitter to prevent thundering herd
if config.jitter:
delay = delay * (0.5 + random.random())
logger.warning(
f"Attempt {attempt + 1}/{config.max_retries} failed: {e}. "
f"Retrying in {delay:.2f}s"
)
if config.on_retry:
config.on_retry(e, attempt + 1)
await asyncio.sleep(delay)
raise RetryExhausted(last_exception, config.max_retries)
@wraps(func)
def sync_wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(config.max_retries):
try:
return func(*args, **kwargs)
except config.retryable_exceptions as e:
last_exception = e
if attempt == config.max_retries - 1:
break
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
if config.jitter:
delay = delay * (0.5 + random.random())
logger.warning(
f"Attempt {attempt + 1}/{config.max_retries} failed: {e}. "
f"Retrying in {delay:.2f}s"
)
if config.on_retry:
config.on_retry(e, attempt + 1)
time.sleep(delay)
raise RetryExhausted(last_exception, config.max_retries)
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
# Usage examples
@retry_with_backoff(RetryConfig(
max_retries=3,
base_delay=1.0,
retryable_exceptions=(ConnectionError, TimeoutError)
))
async def fetch_user_data(user_id: str):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/users/{user_id}") as resp:
if resp.status == 503:
raise ConnectionError("Service unavailable")
return await resp.json()
# Retry with a callback for metrics (`metrics` and `requests` below stand in for your monitoring client and HTTP library)
def on_retry_callback(exception: Exception, attempt: int):
metrics.increment("api.retries", tags={"attempt": attempt})
@retry_with_backoff(RetryConfig(
max_retries=5,
on_retry=on_retry_callback
))
def call_external_api():
response = requests.get("https://api.example.com/data")
response.raise_for_status()
return response.json()
# Combine with circuit breaker
@payment_circuit
@retry_with_backoff(RetryConfig(max_retries=2))
async def charge_payment_resilient(amount: float):
"""Retries within circuit breaker protection"""
return await call_payment_service(amount)
JavaScript
class RetryConfig {
constructor(options = {}) {
this.maxRetries = options.maxRetries || 5;
this.baseDelay = options.baseDelay || 1000; // ms
this.maxDelay = options.maxDelay || 60000;
this.exponentialBase = options.exponentialBase || 2;
this.jitter = options.jitter !== false;
this.retryableErrors = options.retryableErrors || [Error];
this.onRetry = options.onRetry || null;
this.shouldRetry = options.shouldRetry || null; // Custom retry predicate
}
}
class RetryExhaustedError extends Error {
constructor(lastError, attempts) {
super(`All ${attempts} retry attempts failed: ${lastError.message}`);
this.name = 'RetryExhaustedError';
this.lastError = lastError;
this.attempts = attempts;
}
}
async function retryWithBackoff(fn, config = new RetryConfig()) {
let lastError;
for (let attempt = 0; attempt < config.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
// Check if we should retry this error
const isRetryable = config.shouldRetry
? config.shouldRetry(error)
: config.retryableErrors.some(ErrorClass => error instanceof ErrorClass);
if (!isRetryable || attempt === config.maxRetries - 1) {
break;
}
// Calculate delay with exponential backoff
let delay = Math.min(
config.baseDelay * Math.pow(config.exponentialBase, attempt),
config.maxDelay
);
// Add jitter
if (config.jitter) {
delay = delay * (0.5 + Math.random());
}
console.warn(
`Attempt ${attempt + 1}/${config.maxRetries} failed: ${error.message}. ` +
`Retrying in ${(delay / 1000).toFixed(2)}s`
);
if (config.onRetry) {
await config.onRetry(error, attempt + 1);
}
await sleep(delay);
}
}
throw new RetryExhaustedError(lastError, config.maxRetries);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Decorator-style wrapper
function withRetry(config = new RetryConfig()) {
return function(fn) {
return async function(...args) {
return retryWithBackoff(() => fn.apply(this, args), config);
};
};
}
// Usage examples
const fetchUserData = withRetry(new RetryConfig({
maxRetries: 3,
baseDelay: 1000,
shouldRetry: (error) => {
// Retry on network errors or 5xx
return error.code === 'ECONNREFUSED' ||
error.code === 'ETIMEDOUT' ||
(error.status && error.status >= 500);
}
}))(async function(userId) {
const response = await fetch(`https://api.example.com/users/${userId}`);
if (!response.ok) {
const error = new Error(`HTTP ${response.status}`);
error.status = response.status;
throw error;
}
return response.json();
});
// With metrics callback
async function callApiWithMetrics() {
return retryWithBackoff(
() => fetch('https://api.example.com/data'),
new RetryConfig({
maxRetries: 5,
onRetry: async (error, attempt) => {
// Send metrics
metrics.increment('api.retries', { attempt });
// Maybe alert if approaching max retries
if (attempt >= 4) {
await alertTeam(`API retry attempt ${attempt}`);
}
}
})
);
}
// Combine with circuit breaker
const paymentService = paymentCircuit.wrap(
withRetry(new RetryConfig({ maxRetries: 2 }))(
async function chargePayment(amount) {
const response = await fetch('https://api.payment.com/charge', {
method: 'POST',
body: JSON.stringify({ amount })
});
if (!response.ok) throw new Error('Payment failed');
return response.json();
}
)
);
// Express error handling middleware with retry
function retryMiddleware(config) {
return (req, res, next) => {
// Attach retry helper to request
req.retryable = async (fn) => {
return retryWithBackoff(fn, config);
};
next();
};
}
// Usage in route
app.get('/users/:id', retryMiddleware(new RetryConfig()), async (req, res) => {
try {
const user = await req.retryable(() => fetchUserFromDb(req.params.id));
res.json(user);
} catch (error) {
if (error instanceof RetryExhaustedError) {
res.status(503).json({ error: 'Service temporarily unavailable' });
} else {
res.status(500).json({ error: error.message });
}
}
});
Bulkhead Pattern
Isolate failures so that one overloaded dependency cannot cascade through the rest of the system.
Python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Callable, TypeVar, Any
from functools import wraps
from contextlib import asynccontextmanager
import time
from enum import Enum
T = TypeVar('T')
class BulkheadRejectedException(Exception):
"""Raised when bulkhead rejects request"""
def __init__(self, bulkhead_name: str, reason: str):
self.bulkhead_name = bulkhead_name
self.reason = reason
super().__init__(f"Bulkhead {bulkhead_name} rejected: {reason}")
@dataclass
class BulkheadConfig:
max_concurrent: int = 10
max_queue: int = 100
queue_timeout: float = 5.0 # seconds
@dataclass
class BulkheadMetrics:
active_count: int = 0
queue_size: int = 0
total_accepted: int = 0
total_rejected: int = 0
total_timeout: int = 0
class Bulkhead:
"""
Bulkhead pattern - isolate components to prevent cascade failures.
Limits concurrent execution and queues excess requests.
"""
def __init__(self, name: str, config: BulkheadConfig = None):
self.name = name
self.config = config or BulkheadConfig()
self.semaphore = asyncio.Semaphore(self.config.max_concurrent)
self.metrics = BulkheadMetrics()
        self._waiting = 0  # callers currently queued waiting for a semaphore slot
def __call__(self, func: Callable) -> Callable:
"""Use as decorator"""
@wraps(func)
async def wrapper(*args, **kwargs):
async with self.acquire():
return await func(*args, **kwargs)
return wrapper
@asynccontextmanager
async def acquire(self):
"""Acquire a slot in the bulkhead"""
        # Reject immediately if every slot is busy and the wait queue is full
        if self.semaphore.locked() and self._waiting >= self.config.max_queue:
            self.metrics.total_rejected += 1
            raise BulkheadRejectedException(
                self.name,
                f"Max concurrent ({self.config.max_concurrent}) and queue ({self.config.max_queue}) reached"
            )
        # Otherwise wait for a slot, giving up after the queue timeout
        self._waiting += 1
        try:
            await asyncio.wait_for(
                self.semaphore.acquire(),
                timeout=self.config.queue_timeout
            )
        except asyncio.TimeoutError:
            self.metrics.total_timeout += 1
            raise BulkheadRejectedException(
                self.name,
                f"Queue timeout after {self.config.queue_timeout}s"
            )
        finally:
            self._waiting -= 1
self.metrics.active_count += 1
self.metrics.total_accepted += 1
try:
yield
finally:
self.metrics.active_count -= 1
self.semaphore.release()
def get_metrics(self) -> dict:
return {
"name": self.name,
"active": self.metrics.active_count,
"max_concurrent": self.config.max_concurrent,
"accepted": self.metrics.total_accepted,
"rejected": self.metrics.total_rejected,
"timeouts": self.metrics.total_timeout,
"utilization": self.metrics.active_count / self.config.max_concurrent
}
class ThreadPoolBulkhead:
"""
Thread pool bulkhead for sync operations.
Each pool isolates a group of operations.
"""
def __init__(self, name: str, max_workers: int = 10):
from concurrent.futures import ThreadPoolExecutor
self.name = name
self.executor = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix=f"bulkhead-{name}"
)
self.max_workers = max_workers
def submit(self, func: Callable, *args, **kwargs):
"""Submit work to the bulkhead thread pool"""
return self.executor.submit(func, *args, **kwargs)
async def run(self, func: Callable, *args, **kwargs) -> Any:
"""Run sync function in bulkhead pool"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
lambda: func(*args, **kwargs)
)
# Bulkhead manager for multiple isolated pools
class BulkheadManager:
"""Manage multiple bulkheads for different services"""
def __init__(self):
self.bulkheads: Dict[str, Bulkhead] = {}
def create(self, name: str, config: BulkheadConfig = None) -> Bulkhead:
bulkhead = Bulkhead(name, config)
self.bulkheads[name] = bulkhead
return bulkhead
def get(self, name: str) -> Optional[Bulkhead]:
return self.bulkheads.get(name)
def get_all_metrics(self) -> Dict[str, dict]:
return {
name: bh.get_metrics()
for name, bh in self.bulkheads.items()
}
# Usage
bulkhead_manager = BulkheadManager()
# Separate pools for different services
payment_bulkhead = bulkhead_manager.create(
"payment",
BulkheadConfig(max_concurrent=5, max_queue=20)
)
inventory_bulkhead = bulkhead_manager.create(
"inventory",
BulkheadConfig(max_concurrent=10, max_queue=50)
)
notification_bulkhead = bulkhead_manager.create(
"notification",
BulkheadConfig(max_concurrent=20, max_queue=100)
)
@payment_bulkhead
async def process_payment(order_id: str, amount: float):
"""Isolated payment processing"""
await asyncio.sleep(0.5) # Simulate API call
return {"status": "success", "order_id": order_id}
@inventory_bulkhead
async def check_inventory(product_id: str):
"""Isolated inventory check"""
await asyncio.sleep(0.1)
return {"available": True, "quantity": 100}
# Even if payment service is slow/failing,
# inventory and notifications continue working
async def handle_order(order):
try:
# These run in isolated pools
inventory = await check_inventory(order["product_id"])
payment = await process_payment(order["id"], order["amount"])
await send_notification(order["user_id"])
return {"status": "completed"}
except BulkheadRejectedException as e:
return {"status": "retry_later", "reason": str(e)}
JavaScript
class BulkheadRejectedException extends Error {
constructor(bulkheadName, reason) {
super(`Bulkhead ${bulkheadName} rejected: ${reason}`);
this.name = 'BulkheadRejectedException';
this.bulkheadName = bulkheadName;
this.reason = reason;
}
}
class Bulkhead {
constructor(name, options = {}) {
this.name = name;
this.maxConcurrent = options.maxConcurrent || 10;
this.maxQueue = options.maxQueue || 100;
this.queueTimeout = options.queueTimeout || 5000;
this.activeCount = 0;
this.queue = [];
// Metrics
this.metrics = {
totalAccepted: 0,
totalRejected: 0,
totalTimeout: 0
};
}
async execute(fn) {
// Check if we can accept the request
if (this.activeCount >= this.maxConcurrent) {
if (this.queue.length >= this.maxQueue) {
this.metrics.totalRejected++;
throw new BulkheadRejectedException(
this.name,
`Max concurrent (${this.maxConcurrent}) and queue (${this.maxQueue}) reached`
);
}
// Queue the request
try {
await this.waitForSlot();
} catch (error) {
this.metrics.totalTimeout++;
throw new BulkheadRejectedException(
this.name,
`Queue timeout after ${this.queueTimeout}ms`
);
}
}
this.activeCount++;
this.metrics.totalAccepted++;
try {
return await fn();
} finally {
this.activeCount--;
this.releaseNext();
}
}
waitForSlot() {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
// Remove from queue
const index = this.queue.findIndex(item => item.resolve === resolve);
if (index !== -1) {
this.queue.splice(index, 1);
}
reject(new Error('Queue timeout'));
}, this.queueTimeout);
this.queue.push({
resolve: () => {
clearTimeout(timeout);
resolve();
},
reject,
enqueuedAt: Date.now()
});
});
}
releaseNext() {
if (this.queue.length > 0 && this.activeCount < this.maxConcurrent) {
const next = this.queue.shift();
next.resolve();
}
}
// Wrap a function with bulkhead
wrap(fn) {
return async (...args) => {
return this.execute(() => fn(...args));
};
}
getMetrics() {
return {
name: this.name,
active: this.activeCount,
queueSize: this.queue.length,
maxConcurrent: this.maxConcurrent,
maxQueue: this.maxQueue,
...this.metrics,
utilization: this.activeCount / this.maxConcurrent
};
}
}
// Bulkhead manager
class BulkheadManager {
constructor() {
this.bulkheads = new Map();
}
create(name, options) {
const bulkhead = new Bulkhead(name, options);
this.bulkheads.set(name, bulkhead);
return bulkhead;
}
get(name) {
return this.bulkheads.get(name);
}
getAllMetrics() {
const metrics = {};
for (const [name, bulkhead] of this.bulkheads) {
metrics[name] = bulkhead.getMetrics();
}
return metrics;
}
}
// Usage
const manager = new BulkheadManager();
// Create isolated pools for different services
const paymentBulkhead = manager.create('payment', {
maxConcurrent: 5,
maxQueue: 20,
queueTimeout: 3000
});
const inventoryBulkhead = manager.create('inventory', {
maxConcurrent: 10,
maxQueue: 50
});
const notificationBulkhead = manager.create('notification', {
maxConcurrent: 20,
maxQueue: 100
});
// Wrap service functions
const processPayment = paymentBulkhead.wrap(async (orderId, amount) => {
const response = await fetch('https://payment.api/charge', {
method: 'POST',
body: JSON.stringify({ orderId, amount })
});
return response.json();
});
const checkInventory = inventoryBulkhead.wrap(async (productId) => {
const response = await fetch(`https://inventory.api/check/${productId}`);
return response.json();
});
const sendNotification = notificationBulkhead.wrap(async (userId, message) => {
await fetch('https://notification.api/send', {
method: 'POST',
body: JSON.stringify({ userId, message })
});
});
// Handle order with isolated failures
async function handleOrder(order) {
const results = {
inventory: null,
payment: null,
notification: null
};
try {
results.inventory = await checkInventory(order.productId);
} catch (error) {
if (error instanceof BulkheadRejectedException) {
return { status: 'retry_later', service: 'inventory' };
}
throw error;
}
try {
results.payment = await processPayment(order.id, order.amount);
} catch (error) {
if (error instanceof BulkheadRejectedException) {
// Payment bulkhead full, but inventory still works
return { status: 'retry_later', service: 'payment' };
}
throw error;
}
// Fire and forget notification
sendNotification(order.userId, 'Order confirmed').catch(console.error);
return { status: 'completed', ...results };
}
// Express middleware for bulkhead metrics
function bulkheadMetricsEndpoint(manager) {
return (req, res) => {
res.json(manager.getAllMetrics());
};
}
// app.get('/metrics/bulkheads', bulkheadMetricsEndpoint(manager));
Key Takeaways
| Concept | Remember |
|---|---|
| CAP Theorem | Under a network partition you must choose between consistency and availability |
| Consensus | Use Raft for leader election, state machine replication |
| Transactions | 2PC for strong consistency, Sagas for microservices |
| Failures | Design for failure: retries, circuit breakers, bulkheads |
Distributed systems are hard. Every network call can fail, be slow, or return stale data. Design for failure from day one.