CAP Theorem Explained

What are Distributed Systems?

A distributed system is a collection of independent computers that appear to users as a single coherent system.

Why Distributed?

  • Scalability: handle more load than a single machine
  • Reliability: survive individual machine failures
  • Latency: serve users from nearby locations
  • Compliance: satisfy data-locality requirements

The Eight Fallacies

Things developers wrongly assume about networks:
  1. The network is reliable → Packets get lost
  2. Latency is zero → Cross-region calls take 100+ ms
  3. Bandwidth is infinite → Large payloads slow things down
  4. The network is secure → Always encrypt
  5. Topology doesn’t change → Servers come and go
  6. There is one administrator → Multiple teams, policies
  7. Transport cost is zero → Data transfer costs money
  8. The network is homogeneous → Different hardware everywhere

Consistency Models

Strong Consistency

All nodes see the same data at the same time: once a write completes, every subsequent read returns that value.

Eventual Consistency

All nodes converge to the same data if no new writes arrive; until then, reads may return stale values.

Consistency Implementation Examples

from dataclasses import dataclass, field
from typing import Optional, List, Any, Dict
from datetime import datetime
import asyncio

@dataclass
class VectorClock:
    """Track causality in distributed systems"""
    clocks: Dict[str, int] = field(default_factory=dict)
    
    def increment(self, node_id: str) -> None:
        self.clocks[node_id] = self.clocks.get(node_id, 0) + 1
    
    def merge(self, other: 'VectorClock') -> 'VectorClock':
        """Merge two vector clocks (take max of each)"""
        merged = VectorClock()
        all_nodes = set(self.clocks.keys()) | set(other.clocks.keys())
        for node in all_nodes:
            merged.clocks[node] = max(
                self.clocks.get(node, 0),
                other.clocks.get(node, 0)
            )
        return merged
    
    def is_concurrent_with(self, other: 'VectorClock') -> bool:
        """Check if two events are concurrent (neither happened before the other)"""
        self_greater = any(
            self.clocks.get(k, 0) > other.clocks.get(k, 0)
            for k in self.clocks
        )
        other_greater = any(
            other.clocks.get(k, 0) > self.clocks.get(k, 0)
            for k in other.clocks
        )
        return self_greater and other_greater


@dataclass
class ReplicatedValue:
    """Last-Write-Wins Register for eventual consistency"""
    value: Any
    timestamp: float
    node_id: str
    vector_clock: VectorClock
    
    def merge(self, other: 'ReplicatedValue') -> 'ReplicatedValue':
        """Merge with conflict resolution"""
        # Check for concurrent updates
        if self.vector_clock.is_concurrent_with(other.vector_clock):
            # Conflict! Use timestamp + node_id as tiebreaker
            if (self.timestamp, self.node_id) > (other.timestamp, other.node_id):
                winner = self
            else:
                winner = other
        elif self.timestamp > other.timestamp:
            winner = self
        else:
            winner = other
            
        return ReplicatedValue(
            value=winner.value,
            timestamp=winner.timestamp,
            node_id=winner.node_id,
            vector_clock=self.vector_clock.merge(other.vector_clock)
        )


class EventuallyConsistentStore:
    """Distributed key-value store with eventual consistency"""
    
    def __init__(self, node_id: str, replicas: List['EventuallyConsistentStore'] = None):
        self.node_id = node_id
        self.data: Dict[str, ReplicatedValue] = {}
        self.vector_clock = VectorClock()
        self.replicas = replicas or []
        self.pending_sync: List[tuple] = []
    
    def write(self, key: str, value: Any) -> None:
        """Write with vector clock tracking"""
        self.vector_clock.increment(self.node_id)
        
        replicated = ReplicatedValue(
            value=value,
            timestamp=datetime.now().timestamp(),
            node_id=self.node_id,
            vector_clock=VectorClock(clocks=self.vector_clock.clocks.copy())
        )
        
        # Local write
        if key in self.data:
            self.data[key] = self.data[key].merge(replicated)
        else:
            self.data[key] = replicated
        
        # Queue for async replication
        self.pending_sync.append((key, replicated))
    
    def read(self, key: str) -> Optional[Any]:
        """Read (may return stale data)"""
        if key in self.data:
            return self.data[key].value
        return None
    
    async def sync_to_replicas(self) -> None:
        """Background sync to other replicas"""
        while self.pending_sync:
            key, value = self.pending_sync.pop(0)
            for replica in self.replicas:
                try:
                    await replica.receive_sync(key, value)
                except Exception as e:
                    # Re-queue failed sync
                    self.pending_sync.append((key, value))
                    print(f"Sync failed to {replica.node_id}: {e}")
    
    async def receive_sync(self, key: str, incoming: ReplicatedValue) -> None:
        """Receive and merge update from another replica"""
        if key in self.data:
            self.data[key] = self.data[key].merge(incoming)
        else:
            self.data[key] = incoming
        
        # Update our vector clock
        self.vector_clock = self.vector_clock.merge(incoming.vector_clock)


# Usage example
async def demo_eventual_consistency():
    # Three replicas
    replica_a = EventuallyConsistentStore("node-a")
    replica_b = EventuallyConsistentStore("node-b")
    replica_c = EventuallyConsistentStore("node-c")
    
    # Cross-reference replicas
    replica_a.replicas = [replica_b, replica_c]
    replica_b.replicas = [replica_a, replica_c]
    replica_c.replicas = [replica_a, replica_b]
    
    # Concurrent writes (simulating network partition)
    replica_a.write("user:1:name", "Alice")
    replica_b.write("user:1:name", "Alicia")  # Conflict!
    
    # Before sync: different values
    print(f"A sees: {replica_a.read('user:1:name')}")  # Alice
    print(f"B sees: {replica_b.read('user:1:name')}")  # Alicia
    
    # After sync: converges to same value
    await replica_a.sync_to_replicas()
    await replica_b.sync_to_replicas()
    
    print(f"After sync - A: {replica_a.read('user:1:name')}")
    print(f"After sync - B: {replica_b.read('user:1:name')}")
    # Both will show the same value (winner based on timestamp + node_id)

asyncio.run(demo_eventual_consistency())
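
The demo above shows eventual consistency. For contrast, here is a minimal sketch of the quorum technique often used to approximate strong read consistency: with N replicas, a write waits for W acknowledgements and a read consults R replicas, and choosing R + W > N guarantees every read set overlaps the latest write set. The class and names below are illustrative, not a real client API.

import random

class QuorumStore:
    """Quorum reads/writes: R + W > N guarantees read/write overlap"""

    def __init__(self, n: int = 3, w: int = 2, r: int = 2):
        assert r + w > n, "need R + W > N for overlap"
        self.replicas = [{} for _ in range(n)]
        self.w, self.r = w, r
        self.version = 0  # single-writer version counter (a simplification)

    def write(self, key, value):
        self.version += 1
        # "Wait for W acks": here, synchronously write W random replicas
        for replica in random.sample(self.replicas, self.w):
            replica[key] = (self.version, value)

    def read(self, key):
        # Ask R replicas and keep the highest-versioned answer
        answers = [rep.get(key) for rep in random.sample(self.replicas, self.r)]
        answers = [a for a in answers if a is not None]
        return max(answers)[1] if answers else None

store = QuorumStore()
store.write("x", "v1")
store.write("x", "v2")
print(store.read("x"))  # always "v2": any R replicas overlap the last W writes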

Consistency Levels

Level             Description                      Trade-off
Strong            All reads see the latest write   High latency
Eventual          Reads may be stale               Low latency
Causal            Respects cause and effect        Medium latency
Read-your-writes  You always see your own writes   Good UX
Session           Consistency within a session     Practical default
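
To make the read-your-writes row concrete: one common implementation pins a session's reads to the primary for a short window after that session writes, and serves everything else from (possibly lagging) replicas. A minimal sketch; the class, names, and the one-second pin window are illustrative.

import time

class SessionPinnedStore:
    """Read-your-writes: recent writers read from the primary"""

    def __init__(self, pin_seconds: float = 1.0):
        self.primary = {}
        self.replica = {}      # replicated asynchronously, may lag
        self.last_write = {}   # session_id -> time of that session's last write
        self.pin_seconds = pin_seconds

    def write(self, session_id: str, key: str, value):
        self.primary[key] = value
        self.last_write[session_id] = time.monotonic()
        # async replication to self.replica would happen in the background

    def read(self, session_id: str, key: str):
        pinned_until = self.last_write.get(session_id, 0.0) + self.pin_seconds
        if time.monotonic() < pinned_until:
            return self.primary.get(key)  # the writer sees its own write
        return self.replica.get(key)      # others may see stale data

store = SessionPinnedStore()
store.write("session-1", "user:1:name", "Alice")
print(store.read("session-1", "user:1:name"))  # "Alice"
print(store.read("session-2", "user:1:name"))  # None until replication catches up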

Consensus Algorithms

How do distributed nodes agree on a value?

Paxos (Simplified)

Paxos reaches agreement in two phases. A proposer first asks a majority of acceptors to promise to ignore lower-numbered proposals (prepare/promise), then asks the same majority to accept a value (accept/accepted). If any acceptor has already accepted a value, the proposer must adopt the highest-numbered one, which is what keeps a chosen value from changing.
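
Since this section has no code, here is a minimal single-decree Paxos sketch under heavy simplifying assumptions (in-memory acceptors, synchronous calls, no failures or competing proposers); all names are illustrative.

from dataclasses import dataclass
from typing import Any, Optional, Tuple

@dataclass
class Acceptor:
    """Single-decree Paxos acceptor"""
    promised: int = -1      # highest proposal number promised
    accepted_n: int = -1    # number of the accepted proposal, if any
    accepted_v: Any = None  # value of the accepted proposal, if any

    def prepare(self, n: int) -> Tuple[bool, int, Any]:
        """Phase 1b: promise to ignore proposals numbered below n"""
        if n > self.promised:
            self.promised = n
            return True, self.accepted_n, self.accepted_v
        return False, -1, None

    def accept(self, n: int, v: Any) -> bool:
        """Phase 2b: accept unless a higher-numbered prepare was promised"""
        if n >= self.promised:
            self.promised = n
            self.accepted_n, self.accepted_v = n, v
            return True
        return False

def propose(acceptors, n: int, value: Any) -> Optional[Any]:
    """Proposer: phase 1 (prepare) then phase 2 (accept) against a majority"""
    quorum = len(acceptors) // 2 + 1
    promises = [a.prepare(n) for a in acceptors]
    granted = [(an, av) for ok, an, av in promises if ok]
    if len(granted) < quorum:
        return None  # preempted by a higher-numbered proposal
    # If any acceptor already accepted a value, adopt the highest-numbered one
    already = [(an, av) for an, av in granted if an >= 0]
    if already:
        value = max(already)[1]
    accepted = sum(1 for a in acceptors if a.accept(n, value))
    return value if accepted >= quorum else None

acceptors = [Acceptor() for _ in range(3)]
print(propose(acceptors, n=1, value="blue"))   # "blue" is chosen
print(propose(acceptors, n=2, value="green"))  # still "blue": chosen values are stable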

Raft (Easier to Understand)

Raft is designed to be easier to understand than Paxos by splitting consensus into leader election and log replication. Followers become candidates after a randomized election timeout; a candidate that wins a majority of votes becomes leader, and all writes then flow through the leader, which commits an entry once a majority of nodes have stored it.

Raft Implementation

import asyncio
import random
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional

class NodeState(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"

@dataclass
class LogEntry:
    term: int
    index: int
    command: str
    data: dict

@dataclass
class RaftNode:
    """Simplified Raft consensus node"""
    node_id: str
    state: NodeState = NodeState.FOLLOWER
    current_term: int = 0
    voted_for: Optional[str] = None
    log: List[LogEntry] = field(default_factory=list)
    commit_index: int = 0
    last_applied: int = 0
    
    # Leader state
    next_index: Dict[str, int] = field(default_factory=dict)
    match_index: Dict[str, int] = field(default_factory=dict)
    
    # Cluster info
    peers: List[str] = field(default_factory=list)
    
    # Timers
    election_timeout: float = 0
    heartbeat_interval: float = 0.15  # 150ms
    
    def __post_init__(self):
        self.reset_election_timeout()
    
    def reset_election_timeout(self):
        """Randomized election timeout (150-300ms typically)"""
        self.election_timeout = random.uniform(0.15, 0.30)
    
    async def start_election(self) -> bool:
        """Candidate starts election"""
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        votes = 1  # Vote for self
        
        # Request votes from all peers
        vote_requests = []
        for peer_id in self.peers:
            vote_requests.append(
                self.request_vote(peer_id)
            )
        
        results = await asyncio.gather(*vote_requests, return_exceptions=True)
        votes += sum(1 for r in results if r is True)
        
        # Check if won election
        quorum = (len(self.peers) + 1) // 2 + 1
        if votes >= quorum:
            await self.become_leader()
            return True
        
        return False
    
    async def become_leader(self):
        """Transition to leader state"""
        self.state = NodeState.LEADER
        print(f"[{self.node_id}] Became leader for term {self.current_term}")
        
        # Initialize leader state
        for peer_id in self.peers:
            self.next_index[peer_id] = len(self.log) + 1
            self.match_index[peer_id] = 0
        
        # Start heartbeat
        asyncio.create_task(self.send_heartbeats())
    
    async def send_heartbeats(self):
        """Leader sends periodic heartbeats"""
        while self.state == NodeState.LEADER:
            for peer_id in self.peers:
                asyncio.create_task(self.append_entries(peer_id, []))
            await asyncio.sleep(self.heartbeat_interval)
    
    async def request_vote(self, peer_id: str) -> bool:
        """Request vote from a peer"""
        # In real implementation, this would be RPC
        last_log_index = len(self.log)
        last_log_term = self.log[-1].term if self.log else 0
        
        # Simulate RPC response
        # Peer grants vote if:
        # 1. Candidate's term >= peer's term
        # 2. Peer hasn't voted for someone else
        # 3. Candidate's log is at least as up-to-date
        return True  # Simplified
    
    async def append_entries(
        self, 
        peer_id: str, 
        entries: List[LogEntry]
    ) -> bool:
        """AppendEntries RPC (heartbeat + log replication)"""
        prev_log_index = self.next_index.get(peer_id, 1) - 1
        prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0
        
        # Simulate RPC
        success = True  # In real implementation, check response
        
        if success and entries:
            self.match_index[peer_id] = prev_log_index + len(entries)
            self.next_index[peer_id] = self.match_index[peer_id] + 1
            
            # Update commit index
            self.update_commit_index()
        
        return success
    
    def update_commit_index(self):
        """Leader updates commit index based on replication"""
        for n in range(self.commit_index + 1, len(self.log) + 1):
            replicated_count = 1  # Count self
            for peer_id in self.peers:
                if self.match_index.get(peer_id, 0) >= n:
                    replicated_count += 1
            
            quorum = (len(self.peers) + 1) // 2 + 1
            if replicated_count >= quorum:
                if self.log[n - 1].term == self.current_term:
                    self.commit_index = n
    
    def propose(self, command: str, data: dict) -> Optional[LogEntry]:
        """Client proposes a command (only leader can accept)"""
        if self.state != NodeState.LEADER:
            return None  # Redirect to leader
        
        entry = LogEntry(
            term=self.current_term,
            index=len(self.log) + 1,
            command=command,
            data=data
        )
        self.log.append(entry)
        
        # Replicate to followers (async)
        asyncio.create_task(self.replicate_log())
        
        return entry
    
    async def replicate_log(self):
        """Replicate latest log entries to all followers"""
        for peer_id in self.peers:
            next_idx = self.next_index.get(peer_id, 1)
            entries_to_send = self.log[next_idx - 1:]
            
            if entries_to_send:
                await self.append_entries(peer_id, entries_to_send)


# Leader election simulation
async def simulate_raft():
    nodes = [
        RaftNode(node_id="node-1", peers=["node-2", "node-3"]),
        RaftNode(node_id="node-2", peers=["node-1", "node-3"]),
        RaftNode(node_id="node-3", peers=["node-1", "node-2"]),
    ]
    
    # Simulate node-1 winning election
    leader = nodes[0]
    await leader.start_election()
    
    # Propose a command
    if leader.state == NodeState.LEADER:
        entry = leader.propose("SET", {"key": "name", "value": "Alice"})
        print(f"Proposed: {entry}")

asyncio.run(simulate_raft())

Distributed Transactions

Two-Phase Commit (2PC)

A coordinator drives the protocol in two phases: first it asks every participant to prepare and vote, then it commits only if all voted yes, aborting otherwise. The main weakness is blocking: if the coordinator crashes after participants have prepared, they must hold their locks until it recovers.

Two-Phase Commit Implementation

import asyncio
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import uuid

class TxState(Enum):
    PENDING = "pending"
    PREPARED = "prepared"
    COMMITTED = "committed"
    ABORTED = "aborted"

@dataclass
class Transaction:
    tx_id: str
    state: TxState = TxState.PENDING
    participants: List[str] = field(default_factory=list)
    votes: Dict[str, bool] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    timeout: timedelta = timedelta(seconds=30)

class TwoPhaseCommitCoordinator:
    """Coordinates distributed transactions using 2PC"""
    
    def __init__(self, coordinator_id: str):
        self.coordinator_id = coordinator_id
        self.transactions: Dict[str, Transaction] = {}
        self.participants: Dict[str, 'TransactionParticipant'] = {}
    
    def register_participant(self, participant: 'TransactionParticipant'):
        self.participants[participant.participant_id] = participant
    
    async def begin_transaction(
        self, 
        participant_ids: List[str],
        operations: Dict[str, Any]
    ) -> str:
        """Start a new distributed transaction"""
        tx_id = f"tx-{uuid.uuid4().hex[:8]}"
        tx = Transaction(tx_id=tx_id, participants=participant_ids)
        self.transactions[tx_id] = tx
        
        print(f"[Coordinator] Starting transaction {tx_id}")
        
        try:
            # Phase 1: Prepare
            prepared = await self.prepare_phase(tx_id, operations)
            
            if prepared:
                # Phase 2: Commit
                await self.commit_phase(tx_id)
                return tx_id
            else:
                # Phase 2: Abort
                await self.abort_phase(tx_id)
                raise Exception("Transaction aborted - not all participants prepared")
                
        except asyncio.TimeoutError:
            await self.abort_phase(tx_id)
            raise Exception("Transaction timed out")
    
    async def prepare_phase(
        self, 
        tx_id: str, 
        operations: Dict[str, Any]
    ) -> bool:
        """Phase 1: Ask all participants to prepare"""
        tx = self.transactions[tx_id]
        print(f"[Coordinator] Phase 1: PREPARE for {tx_id}")
        
        prepare_tasks = []
        for participant_id in tx.participants:
            participant = self.participants.get(participant_id)
            if participant:
                op = operations.get(participant_id, {})
                prepare_tasks.append(
                    participant.prepare(tx_id, op)
                )
        
        # Wait for all votes with timeout
        try:
            results = await asyncio.wait_for(
                asyncio.gather(*prepare_tasks, return_exceptions=True),
                timeout=tx.timeout.total_seconds()
            )
            
            # Check all votes
            for participant_id, result in zip(tx.participants, results):
                if isinstance(result, Exception):
                    tx.votes[participant_id] = False
                else:
                    tx.votes[participant_id] = result
            
            all_prepared = all(tx.votes.values())
            tx.state = TxState.PREPARED if all_prepared else TxState.PENDING
            
            return all_prepared
            
        except asyncio.TimeoutError:
            return False
    
    async def commit_phase(self, tx_id: str):
        """Phase 2: Tell all participants to commit"""
        tx = self.transactions[tx_id]
        print(f"[Coordinator] Phase 2: COMMIT for {tx_id}")
        
        commit_tasks = []
        for participant_id in tx.participants:
            participant = self.participants.get(participant_id)
            if participant:
                commit_tasks.append(participant.commit(tx_id))
        
        await asyncio.gather(*commit_tasks, return_exceptions=True)
        tx.state = TxState.COMMITTED
        print(f"[Coordinator] Transaction {tx_id} COMMITTED")
    
    async def abort_phase(self, tx_id: str):
        """Phase 2: Tell all participants to abort"""
        tx = self.transactions[tx_id]
        print(f"[Coordinator] Phase 2: ABORT for {tx_id}")
        
        abort_tasks = []
        for participant_id in tx.participants:
            participant = self.participants.get(participant_id)
            if participant:
                abort_tasks.append(participant.abort(tx_id))
        
        await asyncio.gather(*abort_tasks, return_exceptions=True)
        tx.state = TxState.ABORTED
        print(f"[Coordinator] Transaction {tx_id} ABORTED")


class TransactionParticipant:
    """Participant in a distributed transaction"""
    
    def __init__(self, participant_id: str):
        self.participant_id = participant_id
        self.prepared_data: Dict[str, Any] = {}
        self.committed_data: Dict[str, Any] = {}
        self.undo_log: Dict[str, Any] = {}
    
    async def prepare(self, tx_id: str, operation: Dict) -> bool:
        """Prepare to commit - acquire locks, validate, write to redo log"""
        print(f"[{self.participant_id}] Preparing {tx_id}: {operation}")
        
        try:
            # Simulate validation and lock acquisition
            await asyncio.sleep(0.1)  # Simulate I/O
            
            # Write to redo log (would be durable in real implementation)
            self.prepared_data[tx_id] = operation
            
            # Save undo information
            key = operation.get("key")
            if key:
                self.undo_log[tx_id] = {
                    "key": key,
                    "old_value": self.committed_data.get(key)
                }
            
            print(f"[{self.participant_id}] Voted YES for {tx_id}")
            return True
            
        except Exception as e:
            print(f"[{self.participant_id}] Voted NO for {tx_id}: {e}")
            return False
    
    async def commit(self, tx_id: str):
        """Commit the prepared transaction"""
        print(f"[{self.participant_id}] Committing {tx_id}")
        
        operation = self.prepared_data.pop(tx_id, {})
        
        # Apply the operation
        key = operation.get("key")
        value = operation.get("value")
        if key:
            self.committed_data[key] = value
        
        # Clear undo log
        self.undo_log.pop(tx_id, None)
    
    async def abort(self, tx_id: str):
        """Abort - rollback using undo log"""
        print(f"[{self.participant_id}] Aborting {tx_id}")
        
        # Rollback using undo log
        undo = self.undo_log.pop(tx_id, None)
        if undo:
            key = undo["key"]
            old_value = undo["old_value"]
            if old_value is not None:
                self.committed_data[key] = old_value
            elif key in self.committed_data:
                del self.committed_data[key]
        
        self.prepared_data.pop(tx_id, None)


# Usage example
async def demo_2pc():
    coordinator = TwoPhaseCommitCoordinator("coordinator-1")
    
    # Register participants (e.g., database shards)
    orders_db = TransactionParticipant("orders-db")
    inventory_db = TransactionParticipant("inventory-db")
    
    coordinator.register_participant(orders_db)
    coordinator.register_participant(inventory_db)
    
    # Execute distributed transaction
    try:
        tx_id = await coordinator.begin_transaction(
            participant_ids=["orders-db", "inventory-db"],
            operations={
                "orders-db": {"key": "order:123", "value": {"product": "laptop", "qty": 1}},
                "inventory-db": {"key": "product:laptop:stock", "value": 99}
            }
        )
        print(f"Transaction {tx_id} completed successfully!")
        
    except Exception as e:
        print(f"Transaction failed: {e}")

asyncio.run(demo_2pc())

Saga Pattern

A saga breaks one distributed transaction into a sequence of local transactions. If a step fails, the saga undoes the already-completed steps by running their compensating actions in reverse order.

Choreography vs Orchestration

Choreography (Event-driven)

Order ─► OrderCreated event

    ┌─────────┼─────────┐
    ▼         ▼         ▼
Inventory  Payment  Shipping
    │         │         │
    └─────────┴─────────┘
          Events

• No central controller
• Services react to events
• More decoupled
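
A minimal sketch of choreography with an in-process event bus standing in for a message broker; service and event names are illustrative.

class EventBus:
    """In-process pub/sub standing in for a real message broker"""

    def __init__(self):
        self.handlers = {}

    def subscribe(self, event_type: str, handler):
        self.handlers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, payload: dict):
        for handler in self.handlers.get(event_type, []):
            handler(payload)

bus = EventBus()

# Each service reacts to events and emits its own; nobody owns the whole flow
def reserve_inventory(event):
    print(f"[inventory] reserving stock for order {event['order_id']}")
    bus.publish("InventoryReserved", event)

def charge_payment(event):
    print(f"[payment] charging order {event['order_id']}")
    bus.publish("PaymentCharged", event)

def ship_order(event):
    print(f"[shipping] shipping order {event['order_id']}")

bus.subscribe("OrderCreated", reserve_inventory)
bus.subscribe("InventoryReserved", charge_payment)
bus.subscribe("PaymentCharged", ship_order)

bus.publish("OrderCreated", {"order_id": "123"})  # the chain runs end to end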


Orchestration (Command-driven)

    ┌───────────────────┐
    │   Saga Manager    │
    └─────────┬─────────┘

    ┌─────────┼─────────┐
    ▼         ▼         ▼
Inventory  Payment  Shipping
    
• Central controller
• Explicit flow control
• Easier to understand
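
For comparison, a minimal orchestration-style saga: the manager runs each step in order and, when a step fails, runs the compensations of the completed steps in reverse. Step names and the simulated card decline are illustrative.

import asyncio

class SagaManager:
    """Run steps in order; on failure, compensate completed steps in reverse"""

    def __init__(self):
        self.steps = []  # (name, action, compensation)

    def add_step(self, name, action, compensation):
        self.steps.append((name, action, compensation))

    async def run(self, ctx: dict) -> bool:
        completed = []
        for name, action, compensation in self.steps:
            try:
                await action(ctx)
                completed.append((name, compensation))
            except Exception as e:
                print(f"[saga] step '{name}' failed ({e}), compensating")
                for done_name, undo in reversed(completed):
                    await undo(ctx)
                    print(f"[saga] compensated '{done_name}'")
                return False
        return True

async def reserve(ctx): print("reserved inventory")
async def unreserve(ctx): print("released inventory")
async def charge(ctx): raise Exception("card declined")  # simulated failure
async def refund(ctx): print("refunded payment")

saga = SagaManager()
saga.add_step("inventory", reserve, unreserve)
saga.add_step("payment", charge, refund)
print(asyncio.run(saga.run({"order_id": "123"})))  # prints False after compensating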

Handling Failures

Circuit Breaker Pattern

A circuit breaker tracks failures of calls to a downstream service. After too many failures the circuit opens and requests fail fast instead of piling up; after a timeout it moves to half-open and lets a few trial requests through, closing again once enough of them succeed.

import asyncio
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Optional, Any
from functools import wraps
from datetime import datetime, timedelta
import logging

T = TypeVar('T')
logger = logging.getLogger(__name__)

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 2  # Successes needed to close from half-open
    timeout: timedelta = timedelta(seconds=60)
    half_open_max_calls: int = 3
    excluded_exceptions: tuple = ()  # Exceptions that don't count as failures

class CircuitBreakerOpenException(Exception):
    """Raised when circuit breaker is open"""
    def __init__(self, circuit_name: str, retry_after: float):
        self.circuit_name = circuit_name
        self.retry_after = retry_after
        super().__init__(f"Circuit {circuit_name} is OPEN. Retry after {retry_after:.1f}s")

@dataclass
class CircuitBreaker:
    """Production-ready circuit breaker with metrics"""
    name: str
    config: CircuitBreakerConfig = field(default_factory=CircuitBreakerConfig)
    
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    success_count: int = field(default=0, init=False)
    last_failure_time: Optional[datetime] = field(default=None, init=False)
    half_open_calls: int = field(default=0, init=False)
    
    # Metrics
    total_calls: int = field(default=0, init=False)
    total_failures: int = field(default=0, init=False)
    total_rejections: int = field(default=0, init=False)

    def __call__(self, func: Callable) -> Callable:
        """Use as decorator"""
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await self.call_async(lambda: func(*args, **kwargs))
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            return self.call_sync(lambda: func(*args, **kwargs))
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    def call_sync(self, func: Callable[[], T]) -> T:
        """Execute function with circuit breaker (sync)"""
        self._check_state()
        self.total_calls += 1
        
        try:
            result = func()
            self._on_success()
            return result
        except self.config.excluded_exceptions:
            raise  # Don't count as failure
        except Exception as e:
            self._on_failure()
            raise

    async def call_async(self, func: Callable) -> Any:
        """Execute function with circuit breaker (async)"""
        self._check_state()
        self.total_calls += 1
        
        try:
            result = func()
            if asyncio.iscoroutine(result):
                # The zero-arg wrapper returned a coroutine: await it
                result = await result
            self._on_success()
            return result
        except self.config.excluded_exceptions:
            raise
        except Exception:
            self._on_failure()
            raise

    def _check_state(self):
        """Check if request should be allowed"""
        if self.state == CircuitState.CLOSED:
            return
        
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self._transition_to_half_open()
            else:
                self.total_rejections += 1
                retry_after = (
                    self.last_failure_time + self.config.timeout - datetime.now()
                ).total_seconds()
                raise CircuitBreakerOpenException(self.name, max(0, retry_after))
        
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.config.half_open_max_calls:
                self.total_rejections += 1
                raise CircuitBreakerOpenException(self.name, 1.0)
            self.half_open_calls += 1

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to try again"""
        if self.last_failure_time is None:
            return True
        return datetime.now() - self.last_failure_time >= self.config.timeout

    def _transition_to_half_open(self):
        """Move to half-open state"""
        logger.info(f"Circuit {self.name}: OPEN -> HALF_OPEN")
        self.state = CircuitState.HALF_OPEN
        self.half_open_calls = 0
        self.success_count = 0

    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self._close()
        else:
            self.failure_count = 0  # Reset on success

    def _on_failure(self):
        """Handle failed call"""
        self.total_failures += 1
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.state == CircuitState.HALF_OPEN:
            self._open()
        elif self.failure_count >= self.config.failure_threshold:
            self._open()

    def _open(self):
        """Open the circuit"""
        logger.warning(f"Circuit {self.name}: -> OPEN (failures: {self.failure_count})")
        self.state = CircuitState.OPEN

    def _close(self):
        """Close the circuit"""
        logger.info(f"Circuit {self.name}: -> CLOSED")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0

    @property
    def metrics(self) -> dict:
        """Get circuit breaker metrics"""
        return {
            "name": self.name,
            "state": self.state.value,
            "total_calls": self.total_calls,
            "total_failures": self.total_failures,
            "total_rejections": self.total_rejections,
            "failure_rate": self.total_failures / max(1, self.total_calls),
            "current_failure_count": self.failure_count
        }


import aiohttp  # third-party HTTP client used in this example

# Usage as decorator
payment_circuit = CircuitBreaker(
    name="payment-service",
    config=CircuitBreakerConfig(
        failure_threshold=3,
        success_threshold=2,
        timeout=timedelta(seconds=30)
    )
)

@payment_circuit
async def call_payment_service(amount: float):
    # Simulate external API call
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.payment.com/charge",
            json={"amount": amount}
        ) as response:
            if response.status != 200:
                raise Exception(f"Payment failed: {response.status}")
            return await response.json()


# Usage with context manager
class CircuitBreakerContext:
    def __init__(self, circuit: CircuitBreaker):
        self.circuit = circuit
    
    def __enter__(self):
        self.circuit._check_state()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.circuit._on_success()
        elif exc_type not in self.circuit.config.excluded_exceptions:
            self.circuit._on_failure()
        return False  # Don't suppress exceptions


# Example with fallback (call_user_service and get_cached_user are placeholder
# helpers for a circuit-protected call and a local cache lookup)
async def get_user_with_fallback(user_id: str):
    try:
        return await call_user_service(user_id)
    except CircuitBreakerOpenException:
        # Fallback: return cached data
        return get_cached_user(user_id)

Retry with Exponential Backoff

import asyncio
import random
import time
from functools import wraps
from typing import Callable, TypeVar, Type, Tuple, Optional
from dataclasses import dataclass
import logging

T = TypeVar('T')
logger = logging.getLogger(__name__)

@dataclass
class RetryConfig:
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
    on_retry: Optional[Callable[[Exception, int], None]] = None

class RetryExhausted(Exception):
    """All retries failed"""
    def __init__(self, last_exception: Exception, attempts: int):
        self.last_exception = last_exception
        self.attempts = attempts
        super().__init__(f"All {attempts} retry attempts failed: {last_exception}")

def retry_with_backoff(config: Optional[RetryConfig] = None):
    """Decorator for retry with exponential backoff"""
    if config is None:
        config = RetryConfig()
    
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> T:
            last_exception = None
            
            for attempt in range(config.max_retries):
                try:
                    return await func(*args, **kwargs)
                    
                except config.retryable_exceptions as e:
                    last_exception = e
                    
                    if attempt == config.max_retries - 1:
                        break
                    
                    # Calculate delay with exponential backoff
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )
                    
                    # Add jitter to prevent thundering herd
                    if config.jitter:
                        delay = delay * (0.5 + random.random())
                    
                    logger.warning(
                        f"Attempt {attempt + 1}/{config.max_retries} failed: {e}. "
                        f"Retrying in {delay:.2f}s"
                    )
                    
                    if config.on_retry:
                        config.on_retry(e, attempt + 1)
                    
                    await asyncio.sleep(delay)
            
            raise RetryExhausted(last_exception, config.max_retries)
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs) -> T:
            last_exception = None
            
            for attempt in range(config.max_retries):
                try:
                    return func(*args, **kwargs)
                    
                except config.retryable_exceptions as e:
                    last_exception = e
                    
                    if attempt == config.max_retries - 1:
                        break
                    
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )
                    
                    if config.jitter:
                        delay = delay * (0.5 + random.random())
                    
                    logger.warning(
                        f"Attempt {attempt + 1}/{config.max_retries} failed: {e}. "
                        f"Retrying in {delay:.2f}s"
                    )
                    
                    if config.on_retry:
                        config.on_retry(e, attempt + 1)
                    
                    time.sleep(delay)
            
            raise RetryExhausted(last_exception, config.max_retries)
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    
    return decorator


import aiohttp   # third-party async HTTP client
import requests  # third-party sync HTTP client (used further below)

# Usage examples
@retry_with_backoff(RetryConfig(
    max_retries=3,
    base_delay=1.0,
    retryable_exceptions=(ConnectionError, TimeoutError)
))
async def fetch_user_data(user_id: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as resp:
            if resp.status == 503:
                raise ConnectionError("Service unavailable")
            return await resp.json()


# Retry with callback for metrics (`metrics` is a placeholder metrics client)
def on_retry_callback(exception: Exception, attempt: int):
    metrics.increment("api.retries", tags={"attempt": attempt})

@retry_with_backoff(RetryConfig(
    max_retries=5,
    on_retry=on_retry_callback
))
def call_external_api():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()


# Combine with circuit breaker
@payment_circuit
@retry_with_backoff(RetryConfig(max_retries=2))
async def charge_payment_resilient(amount: float):
    """Retries within circuit breaker protection"""
    return await call_payment_service(amount)

Bulkhead Pattern

Isolate failures to prevent cascades: like the watertight compartments the pattern is named after, each dependency gets its own bounded pool of resources, so one overloaded service cannot exhaust the capacity that the others need.

import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Callable, Any
from functools import wraps
from contextlib import asynccontextmanager

class BulkheadRejectedException(Exception):
    """Raised when bulkhead rejects request"""
    def __init__(self, bulkhead_name: str, reason: str):
        self.bulkhead_name = bulkhead_name
        self.reason = reason
        super().__init__(f"Bulkhead {bulkhead_name} rejected: {reason}")

@dataclass
class BulkheadConfig:
    max_concurrent: int = 10
    max_queue: int = 100
    queue_timeout: float = 5.0  # seconds

@dataclass
class BulkheadMetrics:
    active_count: int = 0
    queue_size: int = 0
    total_accepted: int = 0
    total_rejected: int = 0
    total_timeout: int = 0

class Bulkhead:
    """
    Bulkhead pattern - isolate components to prevent cascade failures.
    Limits concurrent execution and queues excess requests.
    """
    
    def __init__(self, name: str, config: BulkheadConfig = None):
        self.name = name
        self.config = config or BulkheadConfig()
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent)
        self.metrics = BulkheadMetrics()
        self._waiting = 0  # requests currently waiting for a free slot
    
    def __call__(self, func: Callable) -> Callable:
        """Use as decorator"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with self.acquire():
                return await func(*args, **kwargs)
        return wrapper
    
    @asynccontextmanager
    async def acquire(self):
        """Acquire a slot in the bulkhead"""
        # Reject immediately if all slots are busy and the wait queue is full
        if self.semaphore.locked() and self._waiting >= self.config.max_queue:
            self.metrics.total_rejected += 1
            raise BulkheadRejectedException(
                self.name, 
                f"Max concurrent ({self.config.max_concurrent}) and queue ({self.config.max_queue}) reached"
            )
        
        # Wait for a slot, with a bound on time spent queued
        self._waiting += 1
        self.metrics.queue_size = self._waiting
        try:
            await asyncio.wait_for(
                self.semaphore.acquire(),
                timeout=self.config.queue_timeout
            )
        except asyncio.TimeoutError:
            self.metrics.total_timeout += 1
            raise BulkheadRejectedException(
                self.name,
                f"Queue timeout after {self.config.queue_timeout}s"
            )
        finally:
            self._waiting -= 1
            self.metrics.queue_size = self._waiting
        
        self.metrics.active_count += 1
        self.metrics.total_accepted += 1
        
        try:
            yield
        finally:
            self.metrics.active_count -= 1
            self.semaphore.release()
    
    def get_metrics(self) -> dict:
        return {
            "name": self.name,
            "active": self.metrics.active_count,
            "max_concurrent": self.config.max_concurrent,
            "accepted": self.metrics.total_accepted,
            "rejected": self.metrics.total_rejected,
            "timeouts": self.metrics.total_timeout,
            "utilization": self.metrics.active_count / self.config.max_concurrent
        }


class ThreadPoolBulkhead:
    """
    Thread pool bulkhead for sync operations.
    Each pool isolates a group of operations.
    """
    
    def __init__(self, name: str, max_workers: int = 10):
        from concurrent.futures import ThreadPoolExecutor
        self.name = name
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"bulkhead-{name}"
        )
        self.max_workers = max_workers
    
    def submit(self, func: Callable, *args, **kwargs):
        """Submit work to the bulkhead thread pool"""
        return self.executor.submit(func, *args, **kwargs)
    
    async def run(self, func: Callable, *args, **kwargs) -> Any:
        """Run sync function in bulkhead pool"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: func(*args, **kwargs)
        )


# Bulkhead manager for multiple isolated pools
class BulkheadManager:
    """Manage multiple bulkheads for different services"""
    
    def __init__(self):
        self.bulkheads: Dict[str, Bulkhead] = {}
    
    def create(self, name: str, config: BulkheadConfig = None) -> Bulkhead:
        bulkhead = Bulkhead(name, config)
        self.bulkheads[name] = bulkhead
        return bulkhead
    
    def get(self, name: str) -> Optional[Bulkhead]:
        return self.bulkheads.get(name)
    
    def get_all_metrics(self) -> Dict[str, dict]:
        return {
            name: bh.get_metrics() 
            for name, bh in self.bulkheads.items()
        }


# Usage
bulkhead_manager = BulkheadManager()

# Separate pools for different services
payment_bulkhead = bulkhead_manager.create(
    "payment",
    BulkheadConfig(max_concurrent=5, max_queue=20)
)

inventory_bulkhead = bulkhead_manager.create(
    "inventory", 
    BulkheadConfig(max_concurrent=10, max_queue=50)
)

notification_bulkhead = bulkhead_manager.create(
    "notification",
    BulkheadConfig(max_concurrent=20, max_queue=100)
)

@payment_bulkhead
async def process_payment(order_id: str, amount: float):
    """Isolated payment processing"""
    await asyncio.sleep(0.5)  # Simulate API call
    return {"status": "success", "order_id": order_id}

@inventory_bulkhead  
async def check_inventory(product_id: str):
    """Isolated inventory check"""
    await asyncio.sleep(0.1)
    return {"available": True, "quantity": 100}

@notification_bulkhead
async def send_notification(user_id: str):
    """Isolated notification send (defined so handle_order below is runnable)"""
    await asyncio.sleep(0.05)
    return {"notified": user_id}

# Even if the payment service is slow or failing,
# inventory checks and notifications keep working
async def handle_order(order):
    try:
        # These run in isolated pools
        inventory = await check_inventory(order["product_id"])
        payment = await process_payment(order["id"], order["amount"])
        await send_notification(order["user_id"])
        return {"status": "completed"}
    except BulkheadRejectedException as e:
        return {"status": "retry_later", "reason": str(e)}

Key Takeaways

Concept        Remember
CAP Theorem    Under a network partition, choose consistency or availability
Consensus      Raft for leader election and state-machine replication
Transactions   2PC for strong consistency, sagas for microservices
Failures       Design for failure: retries, circuit breakers, bulkheads

Distributed systems are hard. Every network call can fail, be slow, or return stale data. Design for failure from day one.