Senior/Staff Level Content: This section covers advanced topics that differentiate senior engineers from mid-level engineers. Master these for L5+ (Senior) and Staff interviews at FAANG.

Consistency Deep Dive

Linearizability vs Serializability

Most candidates confuse these. Know the difference!
┌─────────────────────────────────────────────────────────────────┐
│           Linearizability vs Serializability                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  LINEARIZABILITY (Single-object, real-time)                    │
│  ─────────────────────────────────────────                      │
│  • Operations appear to happen atomically at some point         │
│  • Real-time ordering: if op A completes before op B starts,   │
│    then A is ordered before B                                   │
│  • Think: "single copy of data that everyone sees"             │
│                                                                 │
│  Time ────────────────────────────────────────────►            │
│                                                                 │
│  Client A:  ──[write x=1]────────────                          │
│  Client B:        ──────[read x]───                            │
│                          │                                      │
│                          └─► Must return 1 (write completed)   │
│                                                                 │
│  SERIALIZABILITY (Multi-object, transactions)                  │
│  ───────────────────────────────────────────                    │
│  • Transactions appear to execute in SOME serial order          │
│  • Doesn't guarantee real-time ordering                         │
│  • Think: "transactions don't see partial state"               │
│                                                                 │
│  T1: read(A), write(B)                                         │
│  T2: read(B), write(A)                                         │
│  Serial order: T1→T2 or T2→T1, but not interleaved            │
│                                                                 │
│  STRICT SERIALIZABILITY                                         │
│  ───────────────────────                                        │
│  • Both! Serial order + real-time ordering                      │
│  • Most expensive, what Spanner provides                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Isolation Levels (Know All Four!)

┌─────────────────────────────────────────────────────────────────┐
│                    Isolation Levels                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  READ UNCOMMITTED (Weakest)                                     │
│  ─────────────────────────                                      │
│  • Can see uncommitted changes (dirty reads)                   │
│  • Almost never used                                            │
│                                                                 │
│  T1: write(x=1) ──────────── rollback                          │
│  T2:     read(x) ─► 1 (dirty read!)                            │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  READ COMMITTED (Default in PostgreSQL)                         │
│  ─────────────────────────────────────                          │
│  • Only see committed data                                     │
│  • Allows non-repeatable reads                                  │
│                                                                 │
│  T1: read(x) ─► 1 ─────────── read(x) ─► 2 (changed!)         │
│  T2:      write(x=2), commit                                   │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  REPEATABLE READ (Default in MySQL InnoDB)                      │
│  ─────────────────────────────────────────                      │
│  • Same query returns same results in a transaction             │
│  • Allows phantom reads (new rows appear)                       │
│                                                                 │
│  T1: SELECT COUNT(*) WHERE age > 30 ─► 5                       │
│  T2:      INSERT INTO users (age=35), commit                   │
│  T1: SELECT COUNT(*) WHERE age > 30 ─► 6 (phantom!)            │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  SERIALIZABLE (Strongest)                                       │
│  ────────────────────────                                       │
│  • Full isolation, as if transactions ran serially              │
│  • Highest safety, lowest performance                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Level             | Dirty Read | Non-Repeatable Read | Phantom Read
------------------|------------|---------------------|-------------
Read Uncommitted  | Possible   | Possible            | Possible
Read Committed    | Prevented  | Possible            | Possible
Repeatable Read   | Prevented  | Prevented           | Possible
Serializable      | Prevented  | Prevented           | Prevented
Interview Answer: “I’d use Read Committed for most cases. Serializable only for critical financial transactions where consistency matters more than throughput.”
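
For concreteness, here is a minimal sketch of running a transaction under Serializable in PostgreSQL with psycopg2, retrying on serialization failures (the accounts table, connection setup, and retry count are assumptions):

import psycopg2

def transfer(conn, from_acct, to_acct, amount):
    """Run a transfer under SERIALIZABLE, retrying on serialization failures."""
    for _ in range(3):
        try:
            with conn:  # commits on success, rolls back on exception
                with conn.cursor() as cur:
                    cur.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
                    cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                                (amount, from_acct))
                    cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                                (amount, to_acct))
            return
        except psycopg2.Error as e:
            if e.pgcode != "40001":  # 40001 = serialization_failure -> safe to retry
                raise
    raise RuntimeError("transfer failed after retries")

Under Read Committed the same code runs without the retry loop, which is why it is the pragmatic default.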

Distributed Consensus Deep Dive

Leader Election: Why It’s Hard

┌─────────────────────────────────────────────────────────────────┐
│                 Split Brain Problem                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Initial State: Node A is leader                               │
│                                                                 │
│  ┌────────┐      ┌────────┐      ┌────────┐                   │
│  │ Node A │──────│ Node B │──────│ Node C │                   │
│  │ LEADER │      │FOLLOWER│      │FOLLOWER│                   │
│  └────────┘      └────────┘      └────────┘                   │
│                                                                 │
│  Network partition occurs:                                     │
│                                                                 │
│  Partition 1         ║         Partition 2                     │
│  ┌────────┐          ║          ┌────────┐ ┌────────┐         │
│  │ Node A │          ║          │ Node B │─│ Node C │         │
│  │ LEADER │          ║          │FOLLOWER│ │FOLLOWER│         │
│  └────────┘          ║          └────────┘ └────────┘         │
│                      ║               │                         │
│  A thinks it's       ║          B or C becomes leader!        │
│  still leader!       ║          (timeout, no heartbeat)        │
│                      ║                                         │
│  DANGER: Two leaders! (Split brain)                            │
│                                                                 │
│  Solution: Quorum-based voting                                 │
│  • Need majority (N/2 + 1) to elect leader                     │
│  • Partition 1 has 1/3 (minority) → can't write               │
│  • Partition 2 has 2/3 (majority) → can elect, can write      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
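
The quorum rule itself is tiny; a sketch (function names are illustrative):

def majority(cluster_size: int) -> int:
    """Votes needed to elect a leader (or commit a write)."""
    return cluster_size // 2 + 1

def can_elect_leader(reachable_nodes: int, cluster_size: int) -> bool:
    # Only the side of a partition that still reaches a majority can elect,
    # so at most one side of any partition ever has a leader.
    return reachable_nodes >= majority(cluster_size)

assert can_elect_leader(2, 3)        # Partition 2 (B, C) -> can elect
assert not can_elect_leader(1, 3)    # Partition 1 (A alone) -> cannot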

Raft: The Algorithm You Should Know

┌─────────────────────────────────────────────────────────────────┐
│                    Raft Consensus                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  KEY CONCEPTS:                                                  │
│                                                                 │
│  1. TERMS (Logical Clock)                                       │
│     • Time divided into terms                                   │
│     • Each term has at most one leader                         │
│     • Term number monotonically increases                       │
│                                                                 │
│     Term 1       Term 2       Term 3                           │
│     ├──Leader A──┼──Election──┼──Leader B──────►               │
│                  │  (failed)  │                                 │
│                                                                 │
│  2. LOG REPLICATION                                             │
│                                                                 │
│     Leader Log:  [1:x=1] [1:y=2] [2:x=3] [2:z=4]               │
│                    ↓       ↓       ↓       ↓                   │
│     Follower 1:  [1:x=1] [1:y=2] [2:x=3] [2:z=4] ✓             │
│     Follower 2:  [1:x=1] [1:y=2] [2:x=3]         (catching up) │
│                                                                 │
│     Entry committed when replicated to majority                 │
│                                                                 │
│  3. LEADER ELECTION                                             │
│                                                                 │
│     Follower timeout → Candidate → RequestVote RPC             │
│     • Vote granted if:                                         │
│       - Candidate's term >= voter's term                       │
│       - Candidate's log is at least as up-to-date              │
│       - Voter hasn't voted for someone else this term          │
│     • Majority votes → become Leader                           │
│                                                                 │
│  4. SAFETY GUARANTEE                                            │
│     • Elected leader has all committed entries                  │
│     • Leaders never overwrite their log                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
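
A simplified sketch of the vote-granting rule (concept 3), assuming each node tracks its current term, who it voted for, and the index/term of its last log entry; this is illustrative, not a complete Raft implementation:

from dataclasses import dataclass
from typing import Optional

@dataclass
class RaftNode:
    current_term: int = 0
    voted_for: Optional[str] = None
    last_log_index: int = 0
    last_log_term: int = 0

    def handle_request_vote(self, term: int, candidate_id: str,
                            last_log_index: int, last_log_term: int) -> bool:
        if term < self.current_term:
            return False                      # reject stale candidates
        if term > self.current_term:
            self.current_term = term          # newer term: forget any previous vote
            self.voted_for = None
        # Candidate's log must be at least as up-to-date as ours
        log_ok = (last_log_term, last_log_index) >= (self.last_log_term, self.last_log_index)
        # Grant at most one vote per term
        if log_ok and self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return True
        return False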

Clock Synchronization

Why Wall Clocks Fail

┌─────────────────────────────────────────────────────────────────┐
│                 Clock Synchronization Problems                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Physical clocks drift (100 ms/day typical)                    │
│  NTP sync has error bounds (1-10ms typically)                  │
│                                                                 │
│  Problem scenario:                                              │
│                                                                 │
│  Machine A (clock: 10:00:00.000):  write(x=1) @ 10:00:00.000  │
│  Machine B (clock: 10:00:00.050):  write(x=2) @ 10:00:00.050  │
│                                                                 │
│  But B's clock is 100ms ahead! Real order:                     │
│  • B actually wrote at real time 09:59:59.950                  │
│  • A wrote at real time 10:00:00.000                           │
│  • A happened AFTER B, but timestamps say B is newer!          │
│                                                                 │
│  Solution 1: Logical clocks (Lamport)                          │
│  Solution 2: Vector clocks                                      │
│  Solution 3: Hybrid clocks (HLC)                               │
│  Solution 4: GPS/atomic clocks (Spanner's TrueTime)            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
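
A minimal sketch of Solution 1, a Lamport logical clock: increment on every local event or send, and take max(local, received) + 1 on receive, so causally related events always get increasing timestamps even when wall clocks disagree:

class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Local event or message send: advance the counter."""
        self.time += 1
        return self.time

    def receive(self, remote_time: int) -> int:
        """Message receipt: jump past the sender's timestamp."""
        self.time = max(self.time, remote_time) + 1
        return self.time

# Machine B's next timestamp is guaranteed to exceed A's send timestamp,
# no matter how far their wall clocks disagree.
a, b = LamportClock(), LamportClock()
t_send = a.tick()          # A: write(x=1) tagged with 1
print(b.receive(t_send))   # B is now at 2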

Vector Clocks (Conflict Detection)

┌─────────────────────────────────────────────────────────────────┐
│                    Vector Clocks                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Each node maintains vector: {A: count, B: count, C: count}    │
│                                                                 │
│  Node A         Node B         Node C                          │
│  {A:0,B:0,C:0}  {A:0,B:0,C:0}  {A:0,B:0,C:0}                   │
│      │              │              │                            │
│  write(x=1)         │              │                            │
│  {A:1,B:0,C:0}      │              │                            │
│      │─────────────►│              │                            │
│      │         {A:1,B:0,C:0}       │                            │
│      │              │              │                            │
│      │         write(y=2)          │                            │
│      │         {A:1,B:1,C:0}       │                            │
│      │              │─────────────►│                            │
│      │              │         {A:1,B:1,C:0}                     │
│      │              │              │                            │
│                                                                 │
│  Comparison:                                                    │
│  {A:1,B:2,C:0} vs {A:2,B:1,C:0}                                │
│  Neither dominates → CONFLICT! Must resolve                    │
│                                                                 │
│  {A:1,B:2,C:0} vs {A:1,B:3,C:0}                                │
│  Second dominates → Second is newer, no conflict               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
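
The comparison rule as a small sketch: one clock dominates when every component is greater than or equal to the other's and at least one is strictly greater; if neither dominates, the writes are concurrent and the application must merge them.

def compare(vc1: dict, vc2: dict) -> str:
    keys = set(vc1) | set(vc2)
    ge = all(vc1.get(k, 0) >= vc2.get(k, 0) for k in keys)
    le = all(vc1.get(k, 0) <= vc2.get(k, 0) for k in keys)
    if ge and le:
        return "equal"
    if ge:
        return "first_newer"
    if le:
        return "second_newer"
    return "conflict"  # concurrent writes -> must be resolved (merge, LWW, ask the user)

print(compare({"A": 1, "B": 2, "C": 0}, {"A": 2, "B": 1, "C": 0}))  # conflict
print(compare({"A": 1, "B": 2, "C": 0}, {"A": 1, "B": 3, "C": 0}))  # second_newer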

Data Partitioning Strategies

Partition Key Selection

┌─────────────────────────────────────────────────────────────────┐
│              Choosing Partition Keys                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  GOOD Partition Key Properties:                                 │
│  ✓ High cardinality (many unique values)                       │
│  ✓ Evenly distributed access                                   │
│  ✓ Matches query patterns                                      │
│                                                                 │
│  Example: E-commerce Orders                                     │
│                                                                 │
│  ❌ BAD: partition by date                                      │
│     • Today's partition gets all traffic (hot partition)       │
│     • Old partitions sit idle                                  │
│                                                                 │
│  ❌ BAD: partition by country                                   │
│     • US partition has 60% of traffic                          │
│     • Small countries underutilized                            │
│                                                                 │
│  ✅ GOOD: partition by order_id                                │
│     • Random distribution                                       │
│     • But: can't query "all orders for user X" easily          │
│                                                                 │
│  ✅ BETTER: partition by user_id                                │
│     • User's orders on same partition (locality)               │
│     • Query patterns match                                     │
│     • Watch for celebrity users (hot partition)                │
│                                                                 │
│  ✅ BEST: compound key (user_id, order_date)                   │
│     • Partition by user_id                                     │
│     • Sort by order_date within partition                      │
│     • Efficient for "user X's orders in last month"           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
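
A small sketch of the compound-key idea, Cassandra/DynamoDB-style: hash user_id to pick the partition and keep order_date as the sort key within it (the partition count and function names are illustrative):

import hashlib

NUM_PARTITIONS = 64

def route(user_id: str, order_date: str) -> tuple:
    """Compound key: hash(user_id) selects the partition, order_date sorts within it."""
    digest = hashlib.md5(user_id.encode()).hexdigest()
    partition = int(digest, 16) % NUM_PARTITIONS
    return partition, order_date

# All of one user's orders land on the same partition, ordered by date, so
# "user X's orders in the last month" is a single-partition range scan.
print(route("user_42", "2024-05-01"))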

Handling Hot Partitions

import random
from datetime import datetime

# Strategy 1: Add a random suffix so a hot key is spread across partitions
def get_partition_key(celebrity_id):
    if is_celebrity(celebrity_id):          # is_celebrity: application-specific check
        # Split the celebrity's data across 10 partitions
        suffix = random.randint(0, 9)
        return f"{celebrity_id}_{suffix}"
    return celebrity_id

# Reading back requires scatter-gather across all 10 partitions
def get_celebrity_data(celebrity_id):
    results = []
    for suffix in range(10):
        key = f"{celebrity_id}_{suffix}"
        results.extend(query_partition(key))   # query_partition: storage-layer helper
    return results

# Strategy 2: Time-based suffix
def get_partition_key_time(user_id):
    # Rotate the user across 4 partitions, switching partition every hour
    hour = datetime.now().hour
    return f"{user_id}_{hour % 4}"

Exactly-Once Semantics

The Three Delivery Guarantees

┌─────────────────────────────────────────────────────────────────┐
│              Message Delivery Guarantees                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  AT-MOST-ONCE                                                   │
│  ─────────────                                                  │
│  Send and forget. May lose messages.                           │
│  Use case: Metrics, logs (some loss OK)                        │
│                                                                 │
│  Producer ──[msg]──► Broker    (might fail silently)           │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  AT-LEAST-ONCE                                                  │
│  ─────────────                                                  │
│  Retry until ACK. May duplicate messages.                      │
│  Use case: Most applications (with idempotent consumers)       │
│                                                                 │
│  Producer ──[msg]──► Broker ──[ACK]──► Producer                │
│      │                  │                                       │
│      └──[retry]─────────┘ (if no ACK, retry → duplicate)       │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  EXACTLY-ONCE                                                   │
│  ────────────                                                   │
│  Each message processed exactly once. Hard to achieve!         │
│  Use case: Financial transactions, critical data               │
│                                                                 │
│  Achieved via: Idempotency + Deduplication + Transactions      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Implementing Exactly-Once

class ExactlyOnceProcessor:
    """
    Exactly-once processing with idempotency keys
    """
    
    def process_message(self, message):
        idempotency_key = message.id
        
        # Step 1: Check if already processed
        if self.is_processed(idempotency_key):
            return self.get_cached_result(idempotency_key)
        
        # Step 2: Process with transaction
        with self.db.transaction():
            # Do the work
            result = self.do_business_logic(message)
            
            # Record as processed (same transaction!)
            self.mark_processed(idempotency_key, result)
            
            # Commit message offset (Kafka-style)
            self.commit_offset(message.offset)
        
        return result
    
    def is_processed(self, key):
        return self.db.exists(f"processed:{key}")
    
    def mark_processed(self, key, result):
        self.db.set(f"processed:{key}", result, ttl=86400)

Distributed Caching Patterns

Cache Stampede Prevention

┌─────────────────────────────────────────────────────────────────┐
│                   Cache Stampede Problem                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Popular key expires → 1000 servers hit DB simultaneously      │
│                                                                 │
│  Server 1 ─┐                                                    │
│  Server 2 ─┤                                                    │
│  Server 3 ─┼───► Cache MISS ───► Database ← 💥 OVERWHELMED     │
│  ...       │                                                    │
│  Server N ─┘                                                    │
│                                                                 │
│  SOLUTIONS:                                                     │
│                                                                 │
│  1. LOCKING (Mutex)                                             │
│     First request acquires lock, others wait or get stale      │
│                                                                 │
│  2. PROBABILISTIC EARLY EXPIRATION                              │
│     Refresh before expiry with some probability                │
│                                                                 │
│  3. BACKGROUND REFRESH                                          │
│     Never expire, refresh asynchronously                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
import math
import random
import time

class StampedePreventingCache:
    def get(self, key, fetch_func, ttl=3600, beta=1.0):
        cached = self.cache.get(key)
        
        if cached:
            value, expiry, delta = cached
            now = time.time()
            
            # Probabilistic early refresh
            # As we approach expiry, probability increases
            gap = expiry - now
            if gap > 0:
                # XFetch algorithm
                random_early = delta * beta * math.log(random.random())
                if gap + random_early > 0:
                    return value  # Use cached value
        
        # Cache miss or early refresh triggered
        # Use distributed lock to prevent stampede
        lock_key = f"lock:{key}"
        if self.acquire_lock(lock_key, timeout=5):
            try:
                start = time.time()
                value = fetch_func()
                delta = time.time() - start
                
                self.cache.set(key, (value, time.time() + ttl, delta))
                return value
            finally:
                self.release_lock(lock_key)
        else:
            # Someone else is refreshing, return stale or wait
            if cached:
                return cached[0]  # Return stale
            time.sleep(0.1)
            return self.get(key, fetch_func, ttl, beta)  # Retry

Rate Limiting at Scale

Distributed Rate Limiting

┌─────────────────────────────────────────────────────────────────┐
│           Distributed Rate Limiting Strategies                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  STRATEGY 1: Centralized (Redis)                               │
│  ─────────────────────────────────                              │
│  All servers check same Redis                                  │
│  ✓ Accurate   ✗ Single point of failure                       │
│                                                                 │
│  Server 1 ─┐                                                    │
│  Server 2 ─┼───► Redis ───► Accurate count                     │
│  Server 3 ─┘                                                    │
│                                                                 │
│  STRATEGY 2: Local + Sync                                       │
│  ───────────────────────                                        │
│  Local counters, periodically sync                             │
│  ✓ Fast   ✗ Approximate                                        │
│                                                                 │
│  Server 1: local_count=50 ──┐                                  │
│  Server 2: local_count=40 ──┼──► Sync every 1s                 │
│  Server 3: local_count=30 ──┘                                  │
│                                                                 │
│  STRATEGY 3: Token Bucket with Redis                           │
│  ─────────────────────────────────                              │
│  Each request: DECR if tokens > 0                              │
│  Background: Refill tokens at fixed rate                       │
│                                                                 │
│  Lua script for atomic check-and-decrement:                    │
│  local tokens = tonumber(redis.call('GET', key) or '0')        │
│  if tokens > 0 then                                            │
│      redis.call('DECR', key)                                   │
│      return 1  -- allowed                                      │
│  end                                                           │
│  return 0  -- rejected                                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
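
A sketch of Strategy 3 using redis-py; note that GET returns a string (or nil), so the Lua script must coerce it with tonumber before comparing. The key naming and the background refill job are assumptions:

import redis

CHECK_AND_DECR = """
local tokens = tonumber(redis.call('GET', KEYS[1]) or '0')
if tokens > 0 then
    redis.call('DECR', KEYS[1])
    return 1
end
return 0
"""

class RedisRateLimiter:
    def __init__(self, client: redis.Redis, capacity: int = 100):
        self.client = client
        self.capacity = capacity
        self.script = client.register_script(CHECK_AND_DECR)

    def allow(self, user_id: str) -> bool:
        """Atomic check-and-decrement; one round trip per request."""
        return self.script(keys=[f"ratelimit:{user_id}"]) == 1

    def refill(self, user_id: str) -> None:
        """Called by a background job at the fixed refill rate."""
        self.client.set(f"ratelimit:{user_id}", self.capacity)

register_script loads the Lua once, and Redis executes scripts atomically, so the check and the decrement cannot interleave across servers.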

CQRS (Command Query Responsibility Segregation)

CQRS separates read and write operations into different models, optimizing each for its specific use case. The sketch below shows a command handler on the write side, a query service on the read side, and a projector that keeps the denormalized read model in sync.
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from datetime import datetime
from abc import ABC, abstractmethod
from enum import Enum
import uuid

# Event, EventType, EventStore, OrderAggregate and OrderStatus are defined in the
# Event Sourcing section below; EventBus and ReadDatabase are infrastructure
# interfaces assumed to exist.

# ============== Commands (Write Side) ==============
class CommandType(Enum):
    CREATE_ORDER = "create_order"
    UPDATE_ORDER = "update_order"
    CANCEL_ORDER = "cancel_order"

@dataclass
class Command:
    command_type: CommandType
    aggregate_id: str
    payload: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.utcnow)
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> None:
        pass

class OrderCommandHandler(CommandHandler):
    def __init__(self, event_store: 'EventStore', event_bus: 'EventBus'):
        self.event_store = event_store
        self.event_bus = event_bus
    
    async def handle(self, command: Command) -> str:
        if command.command_type == CommandType.CREATE_ORDER:
            return await self._handle_create_order(command)
        elif command.command_type == CommandType.CANCEL_ORDER:
            return await self._handle_cancel_order(command)
        raise ValueError(f"Unknown command type: {command.command_type}")
    
    async def _handle_create_order(self, command: Command) -> str:
        # Business validation
        order_id = str(uuid.uuid4())
        
        # Create event
        event = Event(
            event_type=EventType.ORDER_CREATED,
            aggregate_id=order_id,
            payload={
                "user_id": command.payload["user_id"],
                "items": command.payload["items"],
                "total": command.payload["total"],
            },
            version=1
        )
        
        # Persist event
        await self.event_store.append(event)
        
        # Publish for read model updates
        await self.event_bus.publish(event)
        
        return order_id
    
    async def _handle_cancel_order(self, command: Command) -> None:
        # Load current state from events
        events = await self.event_store.get_events(command.aggregate_id)
        order = OrderAggregate.from_events(events)
        
        # Business validation
        if order.status == OrderStatus.CANCELLED:
            raise ValueError("Order already cancelled")
        if order.status == OrderStatus.SHIPPED:
            raise ValueError("Cannot cancel shipped order")
        
        # Create cancellation event
        event = Event(
            event_type=EventType.ORDER_CANCELLED,
            aggregate_id=command.aggregate_id,
            payload={"reason": command.payload.get("reason", "User requested")},
            version=order.version + 1
        )
        
        await self.event_store.append(event)
        await self.event_bus.publish(event)

# ============== Queries (Read Side) ==============
@dataclass
class OrderReadModel:
    """Denormalized read model optimized for queries"""
    id: str
    user_id: str
    status: str
    items: List[Dict]
    total: float
    created_at: datetime
    updated_at: datetime

class OrderQueryService:
    def __init__(self, read_db: 'ReadDatabase'):
        self.read_db = read_db
    
    async def get_order(self, order_id: str) -> Optional[OrderReadModel]:
        """Fast read from denormalized model"""
        return await self.read_db.find_one("orders", {"id": order_id})
    
    async def get_user_orders(
        self, 
        user_id: str, 
        status: Optional[str] = None,
        limit: int = 10,
        offset: int = 0
    ) -> List[OrderReadModel]:
        """Query with filters - optimized for read patterns"""
        query = {"user_id": user_id}
        if status:
            query["status"] = status
        
        return await self.read_db.find(
            "orders",
            query,
            sort=[("created_at", -1)],
            limit=limit,
            skip=offset
        )
    
    async def get_orders_by_status(self, status: str) -> List[OrderReadModel]:
        """Admin query - different index"""
        return await self.read_db.find(
            "orders",
            {"status": status},
            sort=[("updated_at", -1)]
        )

# ============== Read Model Projector ==============
class OrderProjector:
    """Updates read model based on events"""
    
    def __init__(self, read_db: 'ReadDatabase'):
        self.read_db = read_db
    
    async def project(self, event: 'Event') -> None:
        handler = getattr(self, f"_handle_{event.event_type.value}", None)
        if handler:
            await handler(event)
    
    async def _handle_order_created(self, event: 'Event') -> None:
        order = OrderReadModel(
            id=event.aggregate_id,
            user_id=event.payload["user_id"],
            status="pending",
            items=event.payload["items"],
            total=event.payload["total"],
            created_at=event.timestamp,
            updated_at=event.timestamp
        )
        await self.read_db.insert("orders", order.__dict__)
    
    async def _handle_order_cancelled(self, event: 'Event') -> None:
        await self.read_db.update(
            "orders",
            {"id": event.aggregate_id},
            {
                "status": "cancelled",
                "cancellation_reason": event.payload["reason"],
                "updated_at": event.timestamp
            }
        )

Event Sourcing

Event sourcing stores all changes as a sequence of events, providing a complete audit trail and enabling time-travel debugging. The sketch below covers an append-only event store, an aggregate rebuilt from its events, and snapshots to speed up replay.
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import json
import uuid

# The event store below assumes an asyncpg connection pool ($1-style placeholders)
from asyncpg.exceptions import UniqueViolationError

class ConcurrencyError(Exception):
    """Raised when an event with the same (aggregate_id, version) already exists."""

class EventType(Enum):
    ORDER_CREATED = "order_created"
    ORDER_ITEM_ADDED = "order_item_added"
    ORDER_ITEM_REMOVED = "order_item_removed"
    ORDER_SUBMITTED = "order_submitted"
    ORDER_PAID = "order_paid"
    ORDER_SHIPPED = "order_shipped"
    ORDER_DELIVERED = "order_delivered"
    ORDER_CANCELLED = "order_cancelled"

@dataclass
class Event:
    event_type: EventType
    aggregate_id: str
    payload: Dict[str, Any]
    version: int
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    metadata: Dict[str, Any] = field(default_factory=dict)

class EventStore:
    """Append-only event store with PostgreSQL"""
    
    def __init__(self, db_pool):
        self.db_pool = db_pool
    
    async def append(self, event: Event) -> None:
        """Append event with optimistic concurrency control"""
        async with self.db_pool.acquire() as conn:
            try:
                await conn.execute("""
                    INSERT INTO events (
                        event_id, aggregate_id, event_type, 
                        payload, version, timestamp, metadata
                    )
                    VALUES ($1, $2, $3, $4, $5, $6, $7)
                """,
                    event.event_id,
                    event.aggregate_id,
                    event.event_type.value,
                    json.dumps(event.payload),
                    event.version,
                    event.timestamp,
                    json.dumps(event.metadata)
                )
            except UniqueViolationError:
                raise ConcurrencyError(
                    f"Version {event.version} already exists for {event.aggregate_id}"
                )
    
    async def get_events(
        self, 
        aggregate_id: str, 
        from_version: int = 0
    ) -> List[Event]:
        """Load all events for an aggregate"""
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT event_id, aggregate_id, event_type, 
                       payload, version, timestamp, metadata
                FROM events
                WHERE aggregate_id = $1 AND version > $2
                ORDER BY version ASC
            """, aggregate_id, from_version)
            
            return [
                Event(
                    event_id=row['event_id'],
                    aggregate_id=row['aggregate_id'],
                    event_type=EventType(row['event_type']),
                    payload=json.loads(row['payload']),
                    version=row['version'],
                    timestamp=row['timestamp'],
                    metadata=json.loads(row['metadata'])
                )
                for row in rows
            ]
    
    async def get_all_events(
        self, 
        from_position: int = 0,
        batch_size: int = 1000
    ) -> List[Event]:
        """Stream all events for projections"""
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM events
                WHERE global_position > $1
                ORDER BY global_position ASC
                LIMIT $2
            """, from_position, batch_size)
            
            # _row_to_event mirrors the row-to-Event mapping in get_events (omitted here)
            return [self._row_to_event(row) for row in rows]

# ============== Aggregate with Event Sourcing ==============
class OrderStatus(Enum):
    DRAFT = "draft"
    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

@dataclass
class OrderAggregate:
    """Order aggregate rebuilt from events"""
    id: str
    user_id: Optional[str] = None
    status: OrderStatus = OrderStatus.DRAFT
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0
    created_at: Optional[datetime] = None
    
    @classmethod
    def from_events(cls, events: List[Event]) -> 'OrderAggregate':
        """Reconstruct aggregate from event history"""
        aggregate = cls(id=events[0].aggregate_id if events else None)
        
        for event in events:
            aggregate._apply(event)
        
        return aggregate
    
    def _apply(self, event: Event) -> None:
        """Apply event to update state"""
        handler = getattr(self, f"_apply_{event.event_type.value}", None)
        if handler:
            handler(event)
        self.version = event.version
    
    def _apply_order_created(self, event: Event) -> None:
        self.user_id = event.payload["user_id"]
        self.status = OrderStatus.DRAFT
        self.created_at = event.timestamp
    
    def _apply_order_item_added(self, event: Event) -> None:
        self.items.append(event.payload["item"])
        self._recalculate_total()
    
    def _apply_order_item_removed(self, event: Event) -> None:
        item_id = event.payload["item_id"]
        self.items = [i for i in self.items if i["id"] != item_id]
        self._recalculate_total()
    
    def _apply_order_submitted(self, event: Event) -> None:
        self.status = OrderStatus.PENDING
    
    def _apply_order_paid(self, event: Event) -> None:
        self.status = OrderStatus.PAID
    
    def _apply_order_shipped(self, event: Event) -> None:
        self.status = OrderStatus.SHIPPED
    
    def _apply_order_cancelled(self, event: Event) -> None:
        self.status = OrderStatus.CANCELLED
    
    def _recalculate_total(self) -> None:
        self.total = sum(
            item["price"] * item["quantity"] 
            for item in self.items
        )

# ============== Snapshots for Performance ==============
class SnapshotStore:
    """Store periodic snapshots to speed up replay"""
    
    def __init__(self, db_pool, snapshot_interval: int = 100):
        self.db_pool = db_pool
        self.snapshot_interval = snapshot_interval
    
    async def save_snapshot(
        self, 
        aggregate_id: str, 
        aggregate: OrderAggregate
    ) -> None:
        """Save aggregate snapshot"""
        async with self.db_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO snapshots (aggregate_id, version, state, created_at)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (aggregate_id) 
                DO UPDATE SET version = $2, state = $3, created_at = $4
            """,
                aggregate_id,
                aggregate.version,
                json.dumps(aggregate.__dict__, default=str),
                datetime.utcnow()
            )
    
    async def get_snapshot(
        self, 
        aggregate_id: str
    ) -> Optional[OrderAggregate]:
        """Load latest snapshot"""
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT state, version FROM snapshots
                WHERE aggregate_id = $1
            """, aggregate_id)
            
            if row:
                state = json.loads(row['state'])
                # Note: enum and datetime fields were serialized with default=str,
                # so a production version must convert them back when loading.
                return OrderAggregate(**state)
            return None

class OrderRepository:
    """Repository using snapshots + events"""
    
    def __init__(
        self, 
        event_store: EventStore, 
        snapshot_store: SnapshotStore
    ):
        self.event_store = event_store
        self.snapshot_store = snapshot_store
    
    async def get(self, order_id: str) -> Optional[OrderAggregate]:
        # Try to load from snapshot first
        aggregate = await self.snapshot_store.get_snapshot(order_id)
        
        if aggregate:
            # Only replay events after snapshot
            events = await self.event_store.get_events(
                order_id, 
                from_version=aggregate.version
            )
        else:
            # Replay all events
            events = await self.event_store.get_events(order_id)
            if not events:
                return None
            aggregate = OrderAggregate(id=order_id)
        
        # Apply remaining events
        for event in events:
            aggregate._apply(event)
        
        # Create snapshot if needed
        if aggregate.version % self.snapshot_store.snapshot_interval == 0:
            await self.snapshot_store.save_snapshot(order_id, aggregate)
        
        return aggregate

When to use Event Sourcing:
  • Audit requirements (financial systems, healthcare)
  • Need to replay/debug past states
  • Complex business logic with temporal queries
  • Event-driven microservices architecture

Interview Questions: Senior Level

Q1: How would you design a system that serves users across multiple geographic regions (active-active)?
Key Points:
  1. Data replication: Async replication between regions (eventual consistency)
  2. Conflict resolution: Last-write-wins (with vector clocks) or custom merge
  3. Routing: GeoDNS to route users to nearest region
  4. Failover: Health checks + automatic DNS failover
  5. Consistency: Accept that cross-region writes may conflict
Trade-offs to mention:
  • Latency vs consistency
  • Cost of running in multiple regions
  • Complexity of conflict resolution
Q2: Your database can't keep up with the write volume. How do you fix it?
Solutions in order of complexity:
  1. Batch writes: Accumulate and write in batches (see the sketch below)
  2. Write-behind cache: Write to Redis, async persist to DB
  3. Message queue: Queue writes, process at sustainable rate
  4. Sharding: Distribute writes across multiple DB nodes
  5. Different DB: Switch to write-optimized DB (Cassandra, ScyllaDB)
Always ask: “What’s the consistency requirement? Can we lose some writes?”
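A minimal sketch of option 1 (batching), which also underlies option 3; the db.insert_many call and thresholds are illustrative:

import threading
import time

class BatchWriter:
    """Accumulate writes and flush them to the DB in batches."""

    def __init__(self, db, max_batch=500, max_wait_s=0.2):
        self.db = db
        self.max_batch = max_batch
        self.max_wait_s = max_wait_s
        self.buffer = []
        self.lock = threading.Lock()
        self.last_flush = time.time()

    def write(self, row: dict) -> None:
        with self.lock:
            self.buffer.append(row)
            should_flush = (
                len(self.buffer) >= self.max_batch
                or time.time() - self.last_flush >= self.max_wait_s
            )
        if should_flush:
            self.flush()

    def flush(self) -> None:
        with self.lock:
            batch, self.buffer = self.buffer, []
            self.last_flush = time.time()
        if batch:
            self.db.insert_many(batch)  # one bulk insert instead of N round trips

The trade-off maps directly to the question above: anything still buffered when the process dies is lost, so this only works when some write loss (or an upstream queue) is acceptable.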
Q3: How do you handle transactions that span multiple services?
Answer structure:
  1. First ask: “Do we really need distributed transactions?” Often can redesign.
  2. 2PC: Strong consistency, but blocking and slow
  3. Saga: Eventual consistency, compensating transactions
  4. Outbox pattern: Reliable event publishing via a local transaction (see the sketch after the Saga example)
Code example for Saga:
async def create_order_saga(order):
    try:
        order_id = await order_service.create(order)
        await inventory_service.reserve(order.items)
        await payment_service.charge(order.payment)
        await order_service.confirm(order_id)
    except PaymentFailed:
        await inventory_service.release(order.items)
        await order_service.cancel(order_id)
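A minimal outbox sketch for point 4: the business row and the outgoing event are committed in one local transaction, and a separate relay publishes pending outbox rows to the broker. The db/broker interfaces and table names are illustrative:

import json
import uuid

def create_order_with_outbox(db, order):
    """Write the order and its event atomically; no dual-write to the broker."""
    with db.transaction():
        order_id = str(uuid.uuid4())
        db.execute("INSERT INTO orders (id, user_id, total) VALUES (%s, %s, %s)",
                   (order_id, order.user_id, order.total))
        db.execute("INSERT INTO outbox (id, topic, payload) VALUES (%s, %s, %s)",
                   (str(uuid.uuid4()), "order_created", json.dumps({"order_id": order_id})))
    return order_id

def relay_outbox(db, broker):
    """Background loop: publish pending outbox rows, then mark them sent."""
    rows = db.query("SELECT id, topic, payload FROM outbox WHERE sent = false LIMIT 100")
    for row in rows:
        broker.publish(row["topic"], row["payload"])          # at-least-once
        db.execute("UPDATE outbox SET sent = true WHERE id = %s", (row["id"],))

The relay gives at-least-once delivery, so consumers still need to be idempotent.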
Q4: p99 latency suddenly spiked in production. How do you debug it?
Systematic approach:
  1. Observe: Check metrics dashboards (p99 latency by service)
  2. Trace: Use distributed tracing (Jaeger/Zipkin) to find slow span
  3. Correlate: Check if spike correlates with deployments, traffic, or GC
  4. Drill down: Once you find the service, check:
    • CPU/memory usage
    • DB query times (slow query log)
    • Network latency between services
    • Thread pool saturation
    • Lock contention
Common causes: DB slow queries, GC pauses, connection pool exhaustion, lock contention, network issues
Q5: How would you scale a service to handle 1M requests per second?
Approach:
  1. Back of envelope: 1M RPS = ~60K servers at 16 RPS each (conservative)
  2. Stateless compute: Horizontal scaling with load balancer
  3. Caching: Cache everything possible (aim for 99%+ cache hit)
  4. CDN: Serve static content from edge
  5. Database: Shard aggressively, read replicas
  6. Async: Queue non-critical work
Bottleneck analysis:
  • Network: 1M × 10KB = 10GB/s = 80Gbps (need multiple LBs)
  • Compute: 1M / 10K (RPS per server) = 100 servers minimum
  • Database: Can’t hit DB for every request, need 99%+ cache hit