
Track 5: Data Systems at Scale

Building and understanding data systems that handle millions of operations per second.
Track Duration: 44-54 hours
Modules: 5
Key Topics: Partitioning, Consistent Hashing, Spanner, Kafka, Stream Processing

Module 22: Partitioning Strategies

Why Partition?

┌─────────────────────────────────────────────────────────────────────────────┐
│                      WHY PARTITIONING?                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SINGLE NODE LIMITS:                                                        │
│  ───────────────────                                                        │
│  Data: Can't fit TB of data on one disk                                     │
│  CPU: Can't handle 1M QPS on one CPU                                        │
│  Memory: Can't cache everything in one RAM                                  │
│                                                                              │
│  SOLUTION: Split data across multiple nodes (partitions/shards)             │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                         FULL DATASET                                │    │
│  │                                                                     │    │
│  │  ┌─────────────┬─────────────┬─────────────┬─────────────┐         │    │
│  │  │ Partition 0 │ Partition 1 │ Partition 2 │ Partition 3 │         │    │
│  │  │  (A-F)      │  (G-L)      │  (M-R)      │  (S-Z)      │         │    │
│  │  └──────┬──────┴──────┬──────┴──────┬──────┴──────┬──────┘         │    │
│  │         │             │             │             │                 │    │
│  │         ▼             ▼             ▼             ▼                 │    │
│  │      Node 1        Node 2        Node 3        Node 4               │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│  GOAL: Even data distribution + even query load                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
Partitioning Strategies

Key-Range Partitioning

┌─────────────────────────────────────────────────────────────────────────────┐
│                    KEY-RANGE PARTITIONING                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Assign contiguous ranges of keys to each partition               │
│                                                                              │
│  Example: Partitioning by user_id (alphabetical)                            │
│                                                                              │
│  Partition 1: user_id A-C                                                   │
│  Partition 2: user_id D-F                                                   │
│  Partition 3: user_id G-I                                                   │
│  ...                                                                        │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Range queries efficient (scan contiguous partition)                      │
│  ✓ Sorted within partition                                                  │
│  ✓ Easy to understand                                                       │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ HOT SPOTS! Some ranges get more traffic                                  │
│  ✗ Timestamps cause sequential writes to one partition                      │
│  ✗ Manual rebalancing often needed                                          │
│                                                                              │
│  HOT SPOT EXAMPLE:                                                          │
│  ─────────────────                                                          │
│  Key = timestamp                                                            │
│  All writes go to "current time" partition                                  │
│  Result: One partition overloaded, others idle                              │
│                                                                              │
│  SOLUTION: Compound key                                                     │
│  Key = (sensor_id, timestamp)                                               │
│  Writes spread across partitions by sensor_id                               │
│                                                                              │
│  USED BY: HBase, BigTable, traditional RDBMS sharding                       │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Hash Partitioning

┌─────────────────────────────────────────────────────────────────────────────┐
│                      HASH PARTITIONING                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Hash the key, use hash to determine partition                    │
│                                                                              │
│  partition = hash(key) % num_partitions                                     │
│                                                                              │
│  Example:                                                                   │
│  hash("user123") = 847291                                                   │
│  847291 % 4 = 3                                                             │
│  → Goes to partition 3                                                      │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Even distribution (good hash = uniform spread)                           │
│  ✓ No hot spots from key patterns                                           │
│  ✓ Works with any key type                                                  │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ Range queries inefficient (must query all partitions)                    │
│  ✗ Changing num_partitions = rehash everything!                             │
│  ✗ No natural ordering                                                      │
│                                                                              │
│  REHASHING PROBLEM:                                                         │
│  ──────────────────                                                         │
│  Before: 4 partitions                                                       │
│  hash("user123") % 4 = 3                                                    │
│                                                                              │
│  After: 5 partitions                                                        │
│  hash("user123") % 5 = 1                                                    │
│                                                                              │
│  Almost all data moves! (See: Consistent Hashing)                           │
│                                                                              │
│  USED BY: MongoDB (with hashed shard key), Cassandra, Riak                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
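
To make the rehashing problem concrete, this short sketch (illustrative only) measures how many keys change partitions when the count grows from 4 to 5 under hash-mod placement:

```python
import hashlib

def partition(key, num_partitions):
    """Hash-mod placement: hash the key, take it modulo the partition count."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_partitions

keys = [f"user{i}" for i in range(100_000)]
moved = sum(1 for k in keys if partition(k, 4) != partition(k, 5))
print(f"{moved / len(keys):.0%} of keys moved")  # roughly 80% of keys move
```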

Secondary Indexes

┌─────────────────────────────────────────────────────────────────────────────┐
│                     SECONDARY INDEXES                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  PROBLEM: Data partitioned by primary key                                   │
│           But queries often filter by other columns                         │
│                                                                              │
│  Example:                                                                   │
│  Primary key: user_id (partitioned by this)                                 │
│  Query: SELECT * FROM users WHERE city = 'NYC'                              │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════    │
│                                                                              │
│  OPTION 1: LOCAL INDEX (document-partitioned)                               │
│  ─────────────────────────────────────────────                              │
│  Each partition maintains its own index                                     │
│                                                                              │
│  Partition 1          Partition 2          Partition 3                      │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐                  │
│  │ Data: A-F   │      │ Data: G-L   │      │ Data: M-Z   │                  │
│  │ Index: city │      │ Index: city │      │ Index: city │                  │
│  │ NYC: [a,c]  │      │ NYC: [h,j]  │      │ NYC: [n,p]  │                  │
│  └─────────────┘      └─────────────┘      └─────────────┘                  │
│                                                                              │
│  Query city=NYC: Must query ALL partitions (scatter-gather)                 │
│  Write: Fast (update one partition's index)                                 │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════    │
│                                                                              │
│  OPTION 2: GLOBAL INDEX (term-partitioned)                                  │
│  ─────────────────────────────────────────                                  │
│  Index partitioned separately from data                                     │
│                                                                              │
│  Index Partition: Cities A-M     Index Partition: Cities N-Z                │
│  ┌────────────────────────┐      ┌────────────────────────┐                 │
│  │ Boston: [partitions]   │      │ NYC: [partitions]      │                 │
│  │ Chicago: [partitions]  │      │ Seattle: [partitions]  │                 │
│  └────────────────────────┘      └────────────────────────┘                 │
│                                                                              │
│  Query city=NYC: Query one index partition (fast!)                          │
│  Write: May need to update multiple index partitions (slow)                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
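
A toy sketch contrasting the two options (partition layout and data are made up): a local-index query fans out to every partition, while a global-index query hits only the index partition that owns the term:

```python
# Hypothetical partitions, each with its own local city index: city -> user ids.
partitions = [
    {"users": {"alice": "NYC", "carol": "NYC"}, "city_index": {"NYC": ["alice", "carol"]}},
    {"users": {"harry": "NYC", "jane": "SF"},   "city_index": {"NYC": ["harry"], "SF": ["jane"]}},
]

def query_local_index(city):
    """Scatter-gather: ask every partition, then merge the results."""
    results = []
    for p in partitions:                      # one request per partition
        results.extend(p["city_index"].get(city, []))
    return results

# Global (term-partitioned) index: the whole posting list for a term lives together.
global_index = {"NYC": ["alice", "carol", "harry"], "SF": ["jane"]}

def query_global_index(city):
    """One lookup against the index partition that owns this term."""
    return global_index.get(city, [])

print(query_local_index("NYC"))   # ['alice', 'carol', 'harry'] -- touched 2 partitions
print(query_global_index("NYC"))  # ['alice', 'carol', 'harry'] -- touched 1 partition
```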

Advanced: Z-Order Curves & Multi-dimensional Partitioning

Standard partitioning (Key-Range or Hash) works well for one-dimensional keys. However, for multi-dimensional data (e.g., searching for users by (latitude, longitude) or (age, income)), a single-key partition leads to expensive “scatter-gather” queries. Z-Order Curves (Morton Order) map multi-dimensional data into a single dimension while preserving locality.

How it works: Bit Interleaving

To map a 2D point (x, y) to a 1D Z-value:
  1. Represent x and y as binary strings.
  2. Interleave the bits: Z = y_n x_n ... y_1 x_1 y_0 x_0.
Example: (x=2, y=3)
x = 2 (10 in binary)
y = 3 (11 in binary)
Interleave: y1 x1 y0 x0 = 1 1 1 0 = 14
Z-Value = 14
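
A minimal bit-interleaving sketch (16-bit coordinates assumed for illustration) that reproduces the worked example:

```python
def z_value(x, y, bits=16):
    """Interleave the bits of x and y (y takes the higher position of each pair)."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # x bit -> even position
        z |= ((y >> i) & 1) << (2 * i + 1)    # y bit -> odd position
    return z

print(z_value(2, 3))  # 14, matching the worked example above
```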

Why it matters for Distributed Systems:

  • Locality Preservation: Points that are close in 2D space are usually close on the 1D Z-curve.
  • Range Queries: A 2D box query becomes a set of 1D range scans on a standard Key-Range partitioned database (like HBase or Cassandra).
  • Used By: Amazon DynamoDB (for Geo-spatial), Uber (H3 is a hexagonal alternative), and many GIS systems.

Module 23: Consistent Hashing

The foundational algorithm for assigning keys to a dynamic set of nodes: keys and nodes are hashed onto a ring, each key is owned by the next node clockwise, and adding or removing a node only moves the keys adjacent to it.

Virtual Nodes

┌─────────────────────────────────────────────────────────────────────────────┐
│                       VIRTUAL NODES                                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  PROBLEM: Few nodes = uneven distribution                                   │
│                                                                              │
│  3 nodes might hash to:                                                     │
│  N1: 45°, N2: 90°, N3: 300°                                                │
│  N1 owns 105° of the ring, N2 owns 45°, N3 owns 210° (uneven!)             │
│                                                                              │
│  SOLUTION: Each physical node = many virtual nodes                          │
│                                                                              │
│  Physical Node A → Virtual: A1, A2, A3, A4, A5 (5 positions on ring)       │
│  Physical Node B → Virtual: B1, B2, B3, B4, B5 (5 positions on ring)       │
│  Physical Node C → Virtual: C1, C2, C3, C4, C5 (5 positions on ring)       │
│                                                                              │
│  With 15 points on ring, distribution is much more even!                    │
│                                                                              │
│  TYPICAL: 100-200 virtual nodes per physical node                           │
│                                                                              │
│  HETEROGENEOUS HARDWARE:                                                    │
│  ───────────────────────                                                    │
│  Powerful server → 200 virtual nodes                                        │
│  Weak server → 50 virtual nodes                                             │
│  Proportional load distribution!                                            │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Implementation

import hashlib
from bisect import bisect_right

class ConsistentHash:
    def __init__(self, nodes=None, virtual_nodes=100):
        self.virtual_nodes = virtual_nodes
        self.ring = {}  # hash -> node
        self.sorted_hashes = []
        
        if nodes:
            for node in nodes:
                self.add_node(node)
    
    def _hash(self, key):
        """Generate hash for a key"""
        return int(hashlib.md5(
            key.encode()
        ).hexdigest(), 16)
    
    def add_node(self, node):
        """Add a node with virtual nodes"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            h = self._hash(virtual_key)
            self.ring[h] = node
            self.sorted_hashes.append(h)
        
        self.sorted_hashes.sort()
    
    def remove_node(self, node):
        """Remove a node and its virtual nodes"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            h = self._hash(virtual_key)
            del self.ring[h]
            self.sorted_hashes.remove(h)
    
    def get_node(self, key):
        """Find the node responsible for this key"""
        if not self.ring:
            return None
        
        h = self._hash(key)
        
        # Find the first ring position with hash > key's hash (bisect_right)
        idx = bisect_right(self.sorted_hashes, h)
        
        # Wrap around if needed
        if idx >= len(self.sorted_hashes):
            idx = 0
        
        return self.ring[self.sorted_hashes[idx]]
    
    def get_nodes(self, key, n=3):
        """Get n nodes for replication"""
        if not self.ring:
            return []
        
        h = self._hash(key)
        idx = bisect_right(self.sorted_hashes, h)
        
        nodes = []
        seen = set()
        
        for i in range(len(self.sorted_hashes)):
            node_idx = (idx + i) % len(self.sorted_hashes)
            node = self.ring[self.sorted_hashes[node_idx]]
            
            if node not in seen:
                nodes.append(node)
                seen.add(node)
            
            if len(nodes) >= n:
                break
        
        return nodes


# Usage
ch = ConsistentHash(['node1', 'node2', 'node3'])

# Get node for a key
node = ch.get_node("user:12345")

# Get 3 nodes for replication
replicas = ch.get_nodes("user:12345", n=3)
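
Using the class above, a quick check (illustrative) shows why the ring matters: adding a fourth node moves only about 1/N of the keys, unlike the mod-based rehashing problem from Module 22:

```python
keys = [f"user:{i}" for i in range(10_000)]
before = {k: ch.get_node(k) for k in keys}

ch.add_node('node4')
moved = sum(1 for k in keys if ch.get_node(k) != before[k])
print(f"{moved / len(keys):.0%} of keys moved")  # roughly 25% with a 4th node (~1/N)
```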

Alternative: Rendezvous Hashing

┌─────────────────────────────────────────────────────────────────────────────┐
│                    RENDEZVOUS HASHING (HRW)                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  "Highest Random Weight" - alternative to consistent hashing                │
│                                                                              │
│  ALGORITHM:                                                                 │
│  ──────────                                                                 │
│  For each key:                                                              │
│  1. Compute weight(key, node) for each node                                 │
│  2. Pick node with highest weight                                           │
│                                                                              │
│  weight = hash(key + node_id)                                               │
│                                                                              │
│  EXAMPLE:                                                                   │
│  ────────                                                                   │
│  key = "user:123"                                                           │
│  hash("user:123" + "node1") = 847                                           │
│  hash("user:123" + "node2") = 234                                           │
│  hash("user:123" + "node3") = 912  ← Winner!                               │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Simpler than consistent hashing                                          │
│  ✓ No virtual nodes needed for balance                                      │
│  ✓ Easy to get k nodes (top k weights)                                      │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ O(n) lookup (must compute all weights)                                   │
│  ✗ Consistent hashing is O(log n)                                           │
│                                                                              │
│  USED BY: Uber's Ringpop, some CDNs                                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
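
A minimal rendezvous hashing sketch, reusing the MD5-based hash from the consistent-hashing implementation above (node names are illustrative):

```python
import hashlib

def weight(key, node):
    """Score a (key, node) pair; the highest score wins."""
    return int(hashlib.md5(f"{key}:{node}".encode()).hexdigest(), 16)

def get_node(key, nodes):
    """Pick the node with the highest random weight for this key."""
    return max(nodes, key=lambda node: weight(key, node))

def get_nodes(key, nodes, n=3):
    """Top-n nodes by weight, e.g. for replication."""
    return sorted(nodes, key=lambda node: weight(key, node), reverse=True)[:n]

nodes = ['node1', 'node2', 'node3', 'node4']
print(get_node("user:123", nodes))
print(get_nodes("user:123", nodes, n=3))
```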

Module 24: Distributed Databases Deep Dive

Google Spanner

┌─────────────────────────────────────────────────────────────────────────────┐
│                        GOOGLE SPANNER                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  "Globally distributed, strongly consistent, SQL database"                  │
│                                                                              │
│  KEY INNOVATIONS:                                                           │
│  ────────────────                                                           │
│                                                                              │
│  1. TRUETIME                                                                │
│     GPS + atomic clocks in every datacenter                                 │
│     TT.now() returns [earliest, latest] interval                            │
│     Typical uncertainty: 1-7ms                                              │
│                                                                              │
│  2. EXTERNAL CONSISTENCY                                                    │
│     If T1 commits before T2 starts, T1's commit timestamp < T2's           │
│     Real-time ordering preserved globally!                                  │
│                                                                              │
│  3. COMMIT WAIT                                                             │
│     After assigning timestamp, wait until TT.after(timestamp)               │
│     Ensures no other transaction can get earlier timestamp                  │
│                                                                              │
│  ARCHITECTURE:                                                              │
│  ─────────────                                                              │
│                                                                              │
│  ┌──────────────────────────────────────────────────────────────────┐       │
│  │                         UNIVERSE                                  │       │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐         │       │
│  │  │  Zone A  │  │  Zone B  │  │  Zone C  │  │  Zone D  │         │       │
│  │  │ (US-East)│  │ (US-West)│  │ (Europe) │  │  (Asia)  │         │       │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘         │       │
│  │        │             │             │             │               │       │
│  │        └─────────────┴─────────────┴─────────────┘               │       │
│  │                    Paxos Groups                                   │       │
│  └──────────────────────────────────────────────────────────────────┘       │
│                                                                              │
│  DATA MODEL:                                                                │
│  ───────────                                                                │
│  Directory (unit of data movement) → Paxos Group                           │
│  Directory contains contiguous key range                                    │
│  Tables can be interleaved (parent-child locality)                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
Spanner TrueTime & Commit Wait
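
To make the commit-wait rule concrete, here is a minimal sketch against an assumed TrueTime-like interface (TT.now() returning an [earliest, latest] interval; the 7 ms uncertainty is illustrative, taken from the range above, and this is not Spanner's actual implementation):

```python
import time

EPSILON = 0.007  # assumed clock uncertainty in seconds (illustrative)

class TrueTime:
    def now(self):
        """Return an (earliest, latest) interval around the local clock."""
        t = time.time()
        return (t - EPSILON, t + EPSILON)

    def after(self, ts):
        """True once ts is definitely in the past on every clock."""
        earliest, _ = self.now()
        return earliest > ts

tt = TrueTime()

def commit():
    """Assign a commit timestamp, then block until it is guaranteed to be in the past."""
    _, latest = tt.now()
    commit_ts = latest               # no clock can still consider this timestamp "future"
    while not tt.after(commit_ts):   # the commit wait
        time.sleep(0.001)
    return commit_ts                 # now safe to release locks / acknowledge the client

print(commit())
```

The wait is what buys external consistency: any transaction that starts after this commit returns will necessarily be assigned a strictly larger timestamp.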

CockroachDB

┌─────────────────────────────────────────────────────────────────────────────┐
│                       COCKROACHDB                                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  "Open-source Spanner" - Distributed SQL without TrueTime                   │
│                                                                              │
│  KEY DIFFERENCES FROM SPANNER:                                              │
│  ─────────────────────────────                                              │
│  • No TrueTime → uses HLC (Hybrid Logical Clocks)                          │
│  • May require transaction restarts for clock skew                          │
│  • Uses Raft instead of Paxos                                               │
│                                                                              │
│  ARCHITECTURE:                                                              │
│  ─────────────                                                              │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                           CLUSTER                                    │    │
│  │  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐                │    │
│  │  │    Node 1    │ │    Node 2    │ │    Node 3    │                │    │
│  │  │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │                │    │
│  │  │ │ Store 1  │ │ │ │ Store 2  │ │ │ │ Store 3  │ │                │    │
│  │  │ │ ┌──────┐ │ │ │ │ ┌──────┐ │ │ │ │ ┌──────┐ │ │                │    │
│  │  │ │ │Range1│ │ │ │ │ │Range1│ │ │ │ │ │Range1│ │ │  ← Raft group  │    │
│  │  │ │ │Range2│ │ │ │ │ │Range3│ │ │ │ │ │Range2│ │ │                │    │
│  │  │ │ │Range4│ │ │ │ │ │Range5│ │ │ │ │ │Range3│ │ │                │    │
│  │  │ │ └──────┘ │ │ │ │ └──────┘ │ │ │ │ └──────┘ │ │                │    │
│  │  │ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │                │    │
│  │  └──────────────┘ └──────────────┘ └──────────────┘                │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│  TRANSACTION PROTOCOL:                                                      │
│  ─────────────────────                                                      │
│  Uses parallel commits + write intents                                      │
│  1. Write intents (provisional values) to all keys                          │
│  2. Commit transaction record                                               │
│  3. Resolve intents asynchronously                                          │
│                                                                              │
│  SERIALIZABLE ISOLATION by default!                                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
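
Without TrueTime, CockroachDB derives timestamps from a Hybrid Logical Clock. A minimal sketch of the standard HLC update rules (physical nanoseconds plus a logical counter; not CockroachDB's actual implementation):

```python
import time

class HybridLogicalClock:
    def __init__(self):
        self.l = 0  # highest physical time seen so far (ns)
        self.c = 0  # logical counter to break ties within the same physical time

    def now(self):
        """Timestamp for a local event or an outgoing message."""
        pt = time.time_ns()
        if pt > self.l:
            self.l, self.c = pt, 0
        else:
            self.c += 1
        return (self.l, self.c)

    def update(self, remote_l, remote_c):
        """Merge a timestamp received from another node."""
        pt = time.time_ns()
        if pt > self.l and pt > remote_l:
            self.l, self.c = pt, 0
        elif remote_l > self.l:
            self.l, self.c = remote_l, remote_c + 1
        elif self.l > remote_l:
            self.c += 1
        else:  # equal physical components
            self.c = max(self.c, remote_c) + 1
        return (self.l, self.c)

clock = HybridLogicalClock()
print(clock.now())
print(clock.update(time.time_ns() + 5_000_000, 3))  # remote clock slightly ahead
```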

Cassandra

┌─────────────────────────────────────────────────────────────────────────────┐
│                         CASSANDRA                                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  "Distributed wide-column store with tunable consistency"                   │
│                                                                              │
│  ARCHITECTURE: Leaderless, peer-to-peer                                     │
│  ─────────────                                                              │
│                                                                              │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐                     │
│  │ Node 1  │◄──│ Node 2  │◄──│ Node 3  │◄──│ Node 4  │                     │
│  │         │──►│         │──►│         │──►│         │                     │
│  └────▲────┘   └────▲────┘   └────▲────┘   └────▲────┘                     │
│       │             │             │             │                           │
│       └─────────────┴─────────────┴─────────────┘                           │
│                     Gossip Protocol                                         │
│                                                                              │
│  DATA MODEL:                                                                │
│  ───────────                                                                │
│  Keyspace → Tables → Rows (identified by partition key)                    │
│  Wide rows: Many columns per row                                            │
│  Clustering columns for sorting within partition                            │
│                                                                              │
│  CONSISTENCY LEVELS:                                                        │
│  ───────────────────                                                        │
│  ONE:          Fastest, one replica responds                                │
│  QUORUM:       Majority of replicas                                         │
│  LOCAL_QUORUM: Majority in local datacenter                                 │
│  ALL:          All replicas (strongest, slowest)                            │
│                                                                              │
│  WRITE PATH:                                                                │
│  ───────────                                                                │
│  1. Commit log (durability)                                                 │
│  2. Memtable (in-memory)                                                    │
│  3. Flush to SSTable (immutable)                                            │
│  4. Compaction (merge SSTables)                                             │
│                                                                              │
│  ANTI-ENTROPY:                                                              │
│  ─────────────                                                              │
│  • Read repair during queries                                               │
│  • Merkle tree comparison                                                   │
│  • Hinted handoff for temporary failures                                    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
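
Tunable consistency comes down to quorum arithmetic: with replication factor N, a read is guaranteed to overlap the latest acknowledged write whenever R + W > N. A small helper to check a configuration (illustrative):

```python
def is_strongly_consistent(replication_factor, write_replicas, read_replicas):
    """Read and write sets must intersect in at least one replica: R + W > N."""
    return read_replicas + write_replicas > replication_factor

# RF=3: QUORUM writes (2) + QUORUM reads (2) -> 4 > 3, reads see the latest write
print(is_strongly_consistent(3, 2, 2))   # True
# RF=3: ONE write (1) + ONE read (1) -> 2 <= 3, reads may be stale
print(is_strongly_consistent(3, 1, 1))   # False
```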

When to Use What?

┌────────────────────┬────────────────────────────────────────────────────────┐
│ Database           │ Best For                                               │
├────────────────────┼────────────────────────────────────────────────────────┤
│ Spanner            │ Global consistency, financial, inventory               │
│                    │ Google Cloud, need TrueTime guarantees                 │
├────────────────────┼────────────────────────────────────────────────────────┤
│ CockroachDB        │ Strong consistency, SQL, open-source                   │
│                    │ Multi-region, serializable transactions                │
├────────────────────┼────────────────────────────────────────────────────────┤
│ Cassandra          │ High write throughput, time-series                     │
│                    │ Availability over consistency, IoT                     │
├────────────────────┼────────────────────────────────────────────────────────┤
│ DynamoDB           │ Serverless, predictable performance                    │
│                    │ AWS native, simple key-value                           │
├────────────────────┼────────────────────────────────────────────────────────┤
│ MongoDB            │ Document model, flexible schema                        │
│                    │ Rapid development, JSON-native                         │
├────────────────────┼────────────────────────────────────────────────────────┤
│ TiDB               │ MySQL compatible, HTAP                                 │
│                    │ Hybrid analytics + transactions                        │
└────────────────────┴────────────────────────────────────────────────────────┘

Module 25: Distributed Storage Systems

HDFS / GFS Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                      HDFS ARCHITECTURE                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  DESIGNED FOR: Large files, sequential reads, batch processing              │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                        NAMENODE (Master)                            │    │
│  │  • File system namespace (directory tree)                           │    │
│  │  • File → Block mapping                                             │    │
│  │  • Block → DataNode mapping                                         │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                │                                            │
│            ┌──────────────────┬┴─────────────────┐                          │
│            │                  │                  │                          │
│            ▼                  ▼                  ▼                          │
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐                │
│  │   DataNode 1    │ │   DataNode 2    │ │   DataNode 3    │                │
│  │                 │ │                 │ │                 │                │
│  │ [Block A copy1] │ │ [Block A copy2] │ │ [Block A copy3] │                │
│  │ [Block B copy1] │ │ [Block C copy1] │ │ [Block B copy2] │                │
│  │ [Block D copy1] │ │ [Block B copy3] │ │ [Block D copy2] │                │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘                │
│                                                                              │
│  BLOCK SIZE: 128MB (vs 4KB in traditional FS)                               │
│  REPLICATION: 3 copies by default                                           │
│  RACK AWARENESS: Copies on different racks                                  │
│                                                                              │
│  WRITE PATH:                                                                │
│  ───────────                                                                │
│  1. Client asks NameNode for DataNodes                                      │
│  2. Client writes to first DataNode                                         │
│  3. First DataNode pipelines to second                                      │
│  4. Second pipelines to third                                               │
│  5. ACK propagates back                                                     │
│                                                                              │
│  READ PATH:                                                                 │
│  ──────────                                                                 │
│  1. Client asks NameNode for block locations                                │
│  2. Client reads from nearest DataNode                                      │
│  3. Checksum verification                                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
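
A toy sketch of the pipelined write path described above (DataNode names and block handling are simplified; a real pipeline streams packets with checksums):

```python
def pipeline_write(block, datanodes):
    """Write to the first DataNode; each node stores the block and forwards downstream.
    The ack only returns once every replica in the chain has stored the block."""
    if not datanodes:
        return "ack"
    head, rest = datanodes[0], datanodes[1:]
    head["blocks"].append(block)                   # store locally
    downstream_ack = pipeline_write(block, rest)   # forward to the next DataNode
    return f"{head['name']} <- {downstream_ack}"   # ack propagates back up the chain

datanodes = [{"name": f"dn{i}", "blocks": []} for i in range(1, 4)]
print(pipeline_write("block-A", datanodes))        # dn1 <- dn2 <- dn3 <- ack
```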

Object Storage (S3 Architecture)

┌─────────────────────────────────────────────────────────────────────────────┐
│                      S3-STYLE OBJECT STORAGE                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  KEY CONCEPTS:                                                              │
│  ─────────────                                                              │
│  • Objects (not files): blob + metadata + key                               │
│  • Buckets: containers for objects                                          │
│  • Flat namespace (no real directories)                                     │
│  • Immutable objects (versioning for updates)                               │
│                                                                              │
│  ARCHITECTURE (Simplified):                                                 │
│  ─────────────────────────                                                  │
│                                                                              │
│    Client                                                                   │
│      │                                                                      │
│      ▼                                                                      │
│  ┌─────────────────────┐                                                    │
│  │    Load Balancer    │                                                    │
│  └──────────┬──────────┘                                                    │
│             │                                                               │
│      ┌──────┴──────┐                                                        │
│      ▼             ▼                                                        │
│  ┌────────┐   ┌────────┐                                                    │
│  │Frontend│   │Frontend│  (HTTP handling, auth, routing)                    │
│  └───┬────┘   └───┬────┘                                                    │
│      │            │                                                         │
│      ▼            ▼                                                         │
│  ┌─────────────────────┐                                                    │
│  │   Metadata Store    │  (key → data location mapping)                    │
│  │  (Distributed DB)   │                                                    │
│  └──────────┬──────────┘                                                    │
│             │                                                               │
│      ┌──────┴──────┬──────────┐                                             │
│      ▼             ▼          ▼                                             │
│  ┌────────┐   ┌────────┐  ┌────────┐                                        │
│  │ Storage│   │ Storage│  │ Storage│  (actual data blobs)                   │
│  │  Node  │   │  Node  │  │  Node  │                                        │
│  └────────┘   └────────┘  └────────┘                                        │
│                                                                              │
│  DURABILITY: 11 9's (99.999999999%)                                         │
│  HOW: Replicate across multiple datacenters + erasure coding                │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

### Advanced: Tiered Storage & S3-backed LSM Trees

Modern data systems like **Apache Kafka**, **Redpanda**, and **Rockset** use "Tiered Storage" to achieve infinite retention without scaling expensive SSDs.

#### The Architecture:
1.  **Hot Tier (Local NVMe)**: Stores the most recent data ($L_0$ SSTables or latest Kafka segments). Optimized for low-latency writes and tailing reads.
2.  **Cold Tier (Object Storage/S3)**: As data ages or SSTables are compacted, they are pushed to S3. 
3.  **Shadow Metadata**: The database maintains a local index of which blocks are local and which are in S3.

#### Advantages:
- **Decoupled Scaling**: Scale compute (CPU) independently of storage (S3 is effectively infinite).
- **Instant Cluster Rebalancing**: Since 99% of data is in S3, "moving a partition" only requires moving the metadata, not the TBs of actual data.
- **Cost**: S3 is $\sim 10x$ cheaper than provisioned EBS/SSD storage.

### Erasure Coding (Reed-Solomon)

For modern storage systems (S3, Azure Blob, HDFS 3.x), simple 3x replication is often too expensive (200% storage overhead). **Erasure Coding** provides higher durability with much lower overhead.

#### How it works: (k, m) notation
A file is split into **k** data chunks, and **m** parity chunks are calculated.
- Total chunks = $n = k + m$
- Can tolerate the loss of **any m chunks**.
- Storage overhead = $n / k$ (e.g., for (10, 4), overhead is $14/10 = 1.4x$, vs 3.0x for 3-way replication).
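
A quick helper makes the (k, m) arithmetic concrete (illustrative; the diagram below shows a (4, 2) layout):

```python
def erasure_coding_profile(k, m):
    """Storage overhead and fault tolerance for a (k, m) erasure-coding scheme."""
    return {"total_chunks": k + m, "tolerates_lost_chunks": m, "overhead": (k + m) / k}

print(erasure_coding_profile(10, 4))  # overhead 1.4x, survives any 4 lost chunks
print(erasure_coding_profile(4, 2))   # overhead 1.5x, survives any 2 lost chunks
```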

```text
┌───────────────────────────────────────────────────────────────┐
│                    REED-SOLOMON EXAMPLE (4, 2)                │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  DATA CHUNKS (k=4)              PARITY CHUNKS (m=2)           │
│  ┌────┐ ┌────┐ ┌────┐ ┌────┐    ┌────┐ ┌────┐                 │
│  │ D1 │ │ D2 │ │ D3 │ │ D4 │ ──► │ P1 │ │ P2 │                 │
│  └────┘ └────┘ └────┘ └────┘    └────┘ └────┘                 │
│                                                               │
│  DISTRIBUTION: Each chunk on a different disk/node/rack.      │
│                                                               │
│  RECOVERY: Any 4 of these 6 chunks can reconstruct the file.  │
│  If D1 and D3 are lost, P1 and P2 + D2 and D4 can fix it.     │
│                                                               │
└───────────────────────────────────────────────────────────────┘
```

Comparison: Replication vs. Erasure Coding

| Feature | 3-Way Replication | Erasure Coding (10, 4) |
|---|---|---|
| Storage Overhead | 200% (3x) | 40% (1.4x) |
| Fault Tolerance | 2 nodes | 4 nodes |
| CPU Cost | Low (simple copy) | High (matrix math) |
| Read Latency | Low (any replica) | High if reconstructing |
| Write Latency | Low | High (parity calculation) |
Staff Tip: Erasure coding is best for cold data (infrequently accessed) where storage cost dominates. For hot data, replication is preferred to avoid the CPU and latency penalties of “reconstruction reads.”

---

## Advanced: Cloud-Native Database Architectures

Standard distributed databases (Shared-Nothing) scale by partitioning data across nodes. However, cloud-native databases like **AWS Aurora** and **Alibaba PolarDB** use a **Shared-Storage** architecture that fundamentally changes how replication and recovery work.

### "The Log is the Database" (AWS Aurora)

In a traditional database (MySQL/Postgres), the engine must write data pages, redo logs, undo logs, and binlogs. In a distributed environment, this creates massive network I/O. 

Aurora’s breakthrough was to **decouple compute from storage** and only send the **Redo Log** over the network.

#### How Aurora Works:
1.  **Log-Only Replication**: The primary instance doesn't send full pages to replicas. It only sends redo log records to the storage tier.
2.  **Smart Storage Tier**: The storage nodes are "database-aware." They receive log records and apply them to data pages in the background.
3.  **Quorum Writes**: Data is striped across 3 Availability Zones (AZs) with 2 copies in each (Total = 6 replicas).
    - **Write Quorum**: 4 of 6 (Tolerates 1 AZ failure + 1 node).
    - **Read Quorum**: 3 of 6.
4.  **Instantaneous Failover**: Since replicas share the same storage tier, a replica can become primary almost instantly by just replaying the last few log records.

```text
┌─────────────────────────────────────────────────────────────────────────────┐
│                    SHARED-NOTHING vs. CLOUD-NATIVE                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SHARED-NOTHING (Cassandra/Spanner)          CLOUD-NATIVE (Aurora/PolarDB)   │
│  ┌────────┐  ┌────────┐  ┌────────┐          ┌──────────────┐ (Compute)     │
│  │ Node 1 │  │ Node 2 │  │ Node 3 │          │ Read/Write   │               │
│  │ (C+S)  │  │ (C+S)  │  │ (C+S)  │          └──────┬───────┘               │
│  └────────┘  └────────┘  └────────┘                 │ (Redo Log Only)       │
│       ▲           ▲           ▲              ┌──────▼───────┐ (Storage)     │
│       └───────────┴───────────┘              │ Log-Structured│               │
│             (Sharding)                       │ Shared Storage│               │
│                                              └───────────────┘               │
│                                                                              │
│  Scale: By adding nodes (Sharding)           Scale: By decoupling layers    │
│  Recovery: Slow (data movement)              Recovery: Instant (shared disk)│
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```

Staff Tip: Shared-storage architectures are generally better for RDBMS workloads (SQL) that are hard to shard but need high availability and read scalability. Shared-nothing is still king for massive horizontal scale (billions of rows) where a single storage tier would become a bottleneck.

Compute/Storage Separation: The Snowflake Model

While Aurora uses “Database-Aware” storage, analytical systems like Snowflake and Google BigQuery use a different pattern: Storage/Compute Separation via Object Storage.

1. Decoupled Scaling

  • Compute: Stateless “Virtual Warehouses” (clusters of VMs). Can be turned off or scaled to 100 nodes in seconds.
  • Storage: Highly durable, cheap Object Storage (S3/Azure Blob/GCS).

2. The Metadata Layer (The “Brain”)

The metadata layer tracks:
  • Which files (S3 objects) belong to which table.
  • Statistics for pruning (min/max values in each file).
  • Transactional state (using MVCC).
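
As a sketch of the pruning statistics above, the metadata layer can skip whole files whose min/max range cannot match a predicate (file names and stats below are made up):

```python
# Hypothetical per-file statistics as the metadata layer might track them.
file_stats = [
    {"file": "part-0001.parquet", "min_age": 18, "max_age": 35},
    {"file": "part-0002.parquet", "min_age": 30, "max_age": 55},
    {"file": "part-0003.parquet", "min_age": 60, "max_age": 80},
]

def prune(stats, lower_bound):
    """Keep only the files whose [min, max] range can satisfy age > lower_bound."""
    return [s["file"] for s in stats if s["max_age"] > lower_bound]

print(prune(file_stats, 40))  # ['part-0002.parquet', 'part-0003.parquet']
```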

3. Local Caching

To avoid the latency of S3 on every query, compute nodes use Ephemeral SSDs to cache frequently accessed blocks.
  • Result: Performance of local disk with the elasticity of the cloud.

| Feature | Aurora (DB-Aware Storage) | Snowflake (Object Storage) |
|---|---|---|
| Storage Unit | Data Pages / Redo Log | Immutable Files (Parquet/Proprietary) |
| Latency | Low (Optimized for OLTP) | High (Optimized for OLAP/Throughput) |
| Elasticity | High (Read Replicas) | Extreme (Compute clusters on-demand) |
| Consistency | Quorum-based (4-of-6) | Eventual/Strong via Metadata Layer |

Advanced: The Shuffle Problem & Distributed Query Execution

While databases like Spanner handle OLTP (Online Transaction Processing), large-scale analytics (OLAP) systems like Presto/Trino, Spark, and ClickHouse face a different challenge: The Shuffle.

1. What is a Shuffle?

A shuffle is the process of redistributing data across a cluster so that all records with the same key end up on the same physical node. This is required for:
  • Joins: To join Table A and Table B on user_id, all rows for user_id=123 from both tables must meet on the same node.
  • Aggregations: To GROUP BY city, all “NYC” rows must be on one node.
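
Concretely, the map side of a shuffle looks something like the sketch below: each node hashes every row's key to pick the downstream worker, so equal keys meet on the same node (worker count and rows are illustrative):

```python
import hashlib
from collections import defaultdict

NUM_WORKERS = 4  # illustrative

def target_worker(key):
    """All rows with the same key hash to the same downstream worker."""
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % NUM_WORKERS

rows = [{"user_id": 123, "city": "NYC"}, {"user_id": 456, "city": "SF"},
        {"user_id": 123, "city": "NYC"}]

outbound = defaultdict(list)          # worker id -> rows to send over the network
for row in rows:
    outbound[target_worker(row["user_id"])].append(row)

# Every row for user_id=123 lands in the same bucket, so the join / GROUP BY
# can be finished locally on that worker after the network exchange.
print(dict(outbound))
```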

2. The Shuffle Bottleneck

Shuffle is the most expensive operation in distributed systems because it involves:
  • Network I/O: Moving terabytes of data across the wire.
  • Disk I/O: Buffering data to disk if it doesn’t fit in RAM (spilling).
  • Serialization: Converting objects to bytes and back.

3. Execution Models: Exchange vs. Push-Based

  • Exchange Operators (Volcano Model): Each node pulls data from its predecessors. This is simple but can be slow due to “pull” overhead.
  • Push-Based (Vectorized): Modern engines like DuckDB or Photon (Databricks) push batches of data (vectors) through the pipeline, maximizing CPU cache efficiency.

4. Join Strategies

| Strategy | Mechanism | Best For |
|---|---|---|
| Broadcast Join | Send the small table to every node holding the large table. | One small table (< 100MB). Zero shuffle for the large table. |
| Shuffle Join | Hash both tables by the join key and redistribute. | Two large tables. Requires moving 100% of the data. |
| Colocated Join | Data is already partitioned by the join key at storage time. | Best performance. No network movement. |
Staff Tip: To optimize distributed queries, your goal is to minimize the shuffle. Use broadcast joins whenever possible, and try to partition your “fact” and “dimension” tables on the same key at the storage level.

Advanced Storage: Storage Engines (LSM-Trees vs. B-Trees)

In distributed databases like Cassandra, RocksDB, and Spanner, the Storage Engine—the part that actually writes to disk—is a critical design choice.

1. B-Trees (Read-Optimized)

Standard in RDBMS (Postgres, MySQL).
  • Structure: Sorted tree of fixed-size blocks (pages).
  • Write: Overwrites data in place.
  • Pros: Fast, deterministic reads.
  • Cons: Write amplification (updating a small row requires writing a full page) and random disk I/O.

2. LSM-Trees (Write-Optimized)

Log-Structured Merge-Trees are used in Cassandra, BigTable, and RocksDB.

The LSM Write Path (The “Log” part)

Writes never overwrite data. They are always appended.
  1. WAL (Write Ahead Log): Append-only log for durability.
  2. Memtable: In-memory sorted buffer (often a Skip List or Red-Black Tree).
  3. SSTable (Sorted String Table): When the memtable is full, it’s flushed to disk as an immutable sorted file.

The LSM Read Path

  1. Check the Memtable.
  2. Check the Bloom Filter (a probabilistic data structure that tells you if a key might exist or definitely doesn’t exist).
  3. If Bloom filter says “maybe,” check the SSTable Index and read from disk.
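
A toy sketch of the write and read paths above (the WAL is a plain list and the Bloom filter a plain set, purely for illustration):

```python
class ToyLSM:
    MEMTABLE_LIMIT = 2  # tiny, to force flushes in the demo

    def __init__(self):
        self.wal = []        # stand-in for the append-only write-ahead log
        self.memtable = {}   # in-memory buffer (sorted only at flush time here)
        self.sstables = []   # list of immutable (sorted_items, key_set) pairs

    def put(self, key, value):
        self.wal.append((key, value))            # 1. append to WAL for durability
        self.memtable[key] = value               # 2. update memtable
        if len(self.memtable) >= self.MEMTABLE_LIMIT:
            self._flush()                        # 3. flush to an immutable SSTable

    def _flush(self):
        items = sorted(self.memtable.items())
        self.sstables.append((items, set(self.memtable)))  # key set ~ Bloom filter
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:                 # 1. check the memtable
            return self.memtable[key]
        for items, keys in reversed(self.sstables):  # newest SSTable first
            if key not in keys:                  # 2. "Bloom filter": definitely absent
                continue
            for k, v in items:                   # 3. read from the SSTable
                if k == key:
                    return v
        return None

db = ToyLSM()
db.put("a", 1); db.put("b", 2); db.put("a", 3)
print(db.get("a"))  # 3 (newest version wins)
```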

Compaction (The “Merge” part)

Since SSTables are immutable, updates/deletes just add more records. Compaction merges these files in the background.
| Strategy | Behavior | Best For |
|---|---|---|
| Size-Tiered | Merges SSTables of similar size. | Write-heavy workloads (Cassandra). |
| Leveled | Organizes SSTables into levels (L0, L1, ...); each level is ~10x larger. | Read-heavy workloads; minimizes read amplification (RocksDB). |

Comparison: The Three Amplifications

In Staff-level interviews, you must discuss the RUM Conjecture (Read, Update, Memory):
| Feature | B-Tree | LSM-Tree |
|---|---|---|
| Write Amplification | High (random I/O, page splits) | Lower (sequential appends) |
| Read Amplification | Low (fixed tree depth) | Higher (checking multiple SSTables) |
| Space Amplification | Low (mostly compact) | High (duplicate versions, tombstones) |
Staff Tip: If your system has high write volume (e.g., logging, metrics, or a distributed ledger), choose an LSM-Tree. If you need low-latency range reads on a stable dataset, choose a B-Tree.

Advanced: The Bw-Tree & LLAMA (Lock-Free Storage)

For high-concurrency systems (like Microsoft SQL Server’s Hekaton or Azure Cosmos DB), traditional B-Trees suffer from latch contention (locking) and “in-place” update overhead. The Bw-Tree (Buzzword Tree) solves this using a log-structured, lock-free approach.

1. The Mapping Table

The core innovation is a Mapping Table that maps a logical Page ID to a physical memory pointer.
  • Benefit: Instead of updating a page in-place (which requires a lock), you create a new version of the page and update the pointer in the mapping table using a single Atomic CAS (Compare-and-Swap).

2. Delta Records

Instead of rewriting a 4KB page for a 10-byte change, Bw-Tree appends a Delta Record to the existing page.
  • Chain: A logical page becomes a chain: [Mapping Table] -> [Delta 2] -> [Delta 1] -> [Base Page].
  • Consolidation: When the chain grows too long (e.g., > 8 deltas), a thread “consolidates” them into a new Base Page.
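
A toy, single-threaded sketch of a mapping table with delta chains (the compare-and-swap is only simulated, and the consolidation threshold is illustrative):

```python
class ToyMappingTable:
    CONSOLIDATE_AFTER = 8  # illustrative delta-chain length limit

    def __init__(self):
        self.mapping = {}  # logical page id -> head of the chain (delta or base page)

    def install(self, page_id, expected_head, new_head):
        """Simulated CAS: swing the pointer only if nobody changed it underneath us."""
        if self.mapping.get(page_id) is expected_head:
            self.mapping[page_id] = new_head
            return True
        return False  # lost the race; the caller would retry

    def update(self, page_id, key, value):
        head = self.mapping.get(page_id) or {"base": {}, "deltas": []}
        # Prepend a delta record instead of rewriting the whole page.
        new_head = {"base": head["base"], "deltas": [(key, value)] + head["deltas"]}
        if len(new_head["deltas"]) > self.CONSOLIDATE_AFTER:
            base = dict(head["base"])             # consolidation: fold deltas into
            for k, v in reversed(new_head["deltas"]):  # a fresh base page
                base[k] = v
            new_head = {"base": base, "deltas": []}
        return self.install(page_id, self.mapping.get(page_id), new_head)

    def read(self, page_id, key):
        head = self.mapping.get(page_id) or {"base": {}, "deltas": []}
        for k, v in head["deltas"]:               # newest delta wins
            if k == key:
                return v
        return head["base"].get(key)

pages = ToyMappingTable()
pages.update(1, "x", 10)
pages.update(1, "x", 20)
print(pages.read(1, "x"))  # 20
```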

3. LLAMA (Latch-Free, Log-structured Access Method)

LLAMA is the storage subsystem that manages these pages. It ensures that both the in-memory state and the on-disk state are log-structured and latch-free.
| Feature | B-Tree | Bw-Tree |
|---|---|---|
| Concurrency | Latches/Locks (Blocking) | CAS/Mapping Table (Lock-free) |
| Updates | In-place (High I/O) | Delta-appends (Low I/O) |
| Cache Efficiency | Low (Pointer chasing) | High (Log-structured) |

Advanced: WAL Internals & Zero-Copy I/O

At the “Principal” level, performance is often gated by how fast you can write the Write-Ahead Log (WAL). High-performance logs (Kafka, for example) use OS-level optimizations to avoid copying data through user space.

1. Zero-Copy (sendfile)

In a traditional write, data moves: Disk → Kernel Buffer → App Buffer → Socket Buffer → NIC. With Zero-Copy (sendfile system call), the data moves: Disk → Kernel Buffer → NIC. This reduces context switches and memory bus contention, allowing a single node to saturate a 100Gbps link.
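
A minimal Linux example of the syscall via Python's os.sendfile (the file path and local socket pair are placeholders for a real log segment and client connection):

```python
import os
import socket

def serve_file(path, sock):
    """Stream a file to a connected socket without copying through user space."""
    with open(path, "rb") as f:
        size = os.fstat(f.fileno()).st_size
        offset = 0
        while offset < size:
            # The kernel copies page-cache pages directly toward the socket.
            offset += os.sendfile(sock.fileno(), f.fileno(), offset, size - offset)

# Demo with a local socket pair; a real log server would use a TCP connection.
a, b = socket.socketpair()
with open("/tmp/demo.log", "wb") as f:
    f.write(b"hello from the page cache\n")
serve_file("/tmp/demo.log", a)
print(b.recv(1024))
```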

2. Group Commit

Instead of calling fsync() for every request (which is slow due to disk head movement), the system buffers multiple writes and performs one large synchronous write.
  • Trade-off: Increases latency for individual requests but massively increases total throughput.
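
A toy group-commit sketch (illustrative): writers enqueue records and block, while a single flusher batches everything that has piled up into one write and one fsync:

```python
import os
import queue
import threading

log_fd = os.open("/tmp/group_commit.log", os.O_WRONLY | os.O_CREAT | os.O_APPEND)
pending = queue.Queue()   # (payload, done_event) pairs waiting for durability

def flusher():
    while True:
        batch = [pending.get()]                  # block for at least one record
        while not pending.empty():
            batch.append(pending.get_nowait())   # grab everything else that piled up
        os.write(log_fd, b"".join(p for p, _ in batch))
        os.fsync(log_fd)                         # ONE fsync covers the whole batch
        for _, done in batch:
            done.set()                           # now each request can be acknowledged

threading.Thread(target=flusher, daemon=True).start()

def durable_write(payload: bytes) -> None:
    done = threading.Event()
    pending.put((payload + b"\n", done))
    done.wait()  # per-request latency goes up; total throughput goes way up

durable_write(b"txn-1 committed")
```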

3. Direct I/O (O_DIRECT)

Databases like ScyllaDB or Oracle bypass the OS Page Cache entirely.
  • Why? The OS cache uses LRU (Least Recently Used), but databases often know better (e.g., “I’m doing a sequential scan, don’t cache this”). Direct I/O allows the database to manage its own “Buffer Pool” with domain-specific knowledge.

Module 26: Stream Processing

Kafka Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                        KAFKA ARCHITECTURE                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  CORE CONCEPTS:                                                             │
│  ──────────────                                                             │
│  • Topic: Category of messages                                              │
│  • Partition: Ordered, immutable sequence of messages                       │
│  • Broker: Kafka server                                                     │
│  • Consumer Group: Set of consumers sharing work                            │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                        TOPIC: orders                                │    │
│  │                                                                     │    │
│  │  Partition 0    Partition 1    Partition 2    Partition 3          │    │
│  │  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐          │    │
│  │  │0│1│2│3│4│    │0│1│2│3│ │    │0│1│2│   │    │0│1│2│3│4│5│        │    │
│  │  └─────────┘    └─────────┘    └─────────┘    └─────────┘          │    │
│  │      │              │              │              │                │    │
│  │  ┌───┴───┐      ┌───┴───┐      ┌───┴───┐      ┌───┴───┐           │    │
│  │  │Broker1│      │Broker2│      │Broker3│      │Broker1│           │    │
│  │  │Leader │      │Leader │      │Leader │      │Leader │           │    │
│  │  └───────┘      └───────┘      └───────┘      └───────┘           │    │
│  │                                                                     │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│  CONSUMER GROUPS:                                                           │
│  ────────────────                                                           │
│                                                                              │
│  Consumer Group A              Consumer Group B                             │
│  ┌────────────────────┐        ┌────────────────────┐                       │
│  │ C1: P0, P1         │        │ C1: P0             │                       │
│  │ C2: P2, P3         │        │ C2: P1             │                       │
│  │                    │        │ C3: P2             │                       │
│  │                    │        │ C4: P3             │                       │
│  └────────────────────┘        └────────────────────┘                       │
│                                                                              │
│  Each partition consumed by exactly ONE consumer per group                  │
│  Different groups can consume same partition independently                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
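
To make consumer groups concrete, here is a minimal sketch using the kafka-python client (broker address, topic, and group name are assumptions). Running two copies with the same `group_id` splits the partitions of `orders` between them; a different `group_id` independently receives every message:

```python
from kafka import KafkaConsumer

# All consumers sharing group_id="billing" divide the topic's partitions
# among themselves; each partition is read by exactly one of them.
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",  # assumed broker address
    group_id="billing",
    auto_offset_reset="earliest",
)

for msg in consumer:
    # msg.partition / msg.offset reflect the per-partition total order.
    print(f"partition={msg.partition} offset={msg.offset} value={msg.value!r}")
```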

Kafka Guarantees

┌─────────────────────────────────────────────────────────────────────────────┐
│                      KAFKA GUARANTEES                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ORDERING:                                                                  │
│  ─────────                                                                  │
│  ✓ Total order within partition                                             │
│  ✗ No order guarantee across partitions                                     │
│                                                                              │
│  DURABILITY:                                                                │
│  ───────────                                                                │
│  acks=0: No wait (fastest, least durable)                                   │
│  acks=1: Wait for leader (balanced)                                         │
│  acks=all: Wait for all replicas (slowest, most durable)                    │
│                                                                              │
│  EXACTLY-ONCE (Idempotent Producer + Transactions):                         │
│  ─────────────────────────────────────────────────                          │
│                                                                              │
│  producer = KafkaProducer(                                                  │
│      enable_idempotence=True,      # Prevents duplicates                    │
│      transactional_id="my-txn-id"  # For transactions                       │
│  )                                                                          │
│                                                                              │
│  producer.init_transactions()                                               │
│  producer.begin_transaction()                                               │
│  producer.send(topic1, message1)                                            │
│  producer.send(topic2, message2)                                            │
│  producer.commit_transaction()  # Atomic across topics!                     │
│                                                                              │
│  CONSUMER EXACTLY-ONCE:                                                     │
│  ──────────────────────                                                     │
│  1. Consume messages                                                        │
│  2. Process and produce results                                             │
│  3. Commit offsets + production atomically                                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
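
The durability levels map directly onto producer configuration. A small kafka-python sketch (broker address and topic are assumptions); for the strongest setting, `acks="all"` is usually paired with a `min.insync.replicas` setting on the topic:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumed broker address
    acks="all",   # 0 = fire-and-forget, 1 = leader only, "all" = every in-sync replica
    retries=5,    # note: retries alone can reintroduce duplicates without idempotence
)

future = producer.send("orders", b'{"order_id": 42}')
metadata = future.get(timeout=10)  # raises if the broker never acknowledges
print(f"written to partition {metadata.partition} at offset {metadata.offset}")
producer.flush()
```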

Stream Processing Concepts

┌─────────────────────────────────────────────────────────────────────────────┐
│                    STREAM PROCESSING                                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  WINDOWING:                                                                 │
│  ──────────                                                                 │
│                                                                              │
│  TUMBLING (Fixed, non-overlapping)                                          │
│  ├────────┤├────────┤├────────┤                                             │
│  │ 0-10   ││ 10-20  ││ 20-30  │                                             │
│  ├────────┤├────────┤├────────┤                                             │
│                                                                              │
│  SLIDING (Overlapping)                                                      │
│  ├──────────┤                                                               │
│       ├──────────┤                                                          │
│            ├──────────┤                                                     │
│  Window every 5 min, size 10 min                                            │
│                                                                              │
│  SESSION (Gap-based)                                                        │
│  ├──────┤    ├───────────┤     ├───┤                                        │
│  │ User │    │ User      │     │   │                                        │
│  │ A    │    │ A         │     │ A │                                        │
│  └──────┘    └───────────┘     └───┘                                        │
│         gap          gap                                                    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
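
To make the tumbling case concrete, here is a tiny framework-free sketch (the event data is invented) that buckets events into fixed 10-second windows keyed by `event_time // WINDOW_SIZE`:

```python
from collections import defaultdict

WINDOW_SIZE = 10  # seconds; tumbling windows are fixed-size and non-overlapping

# (event_time_seconds, value) pairs -- purely illustrative data
events = [(1, 5), (4, 3), (11, 7), (12, 1), (25, 9)]

sums = defaultdict(int)
for event_time, value in events:
    window_start = (event_time // WINDOW_SIZE) * WINDOW_SIZE
    sums[window_start] += value

for start in sorted(sums):
    print(f"window [{start}, {start + WINDOW_SIZE}) -> sum={sums[start]}")
# window [0, 10) -> sum=8, window [10, 20) -> sum=8, window [20, 30) -> sum=9
```
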
### Watermarks (Handling Late Data)

Watermarks are the bridge between **Event Time** and **Processing Time**. They tell a stream processor that it can safely "close" a time window because no more events from that period are expected.

#### Key Concepts:
- **Event time**: When the event actually happened (recorded by the sensor/mobile app).
- **Processing time**: When the event reached the Flink/Spark/Dataflow cluster.
- **Skew**: The difference between event time and processing time (caused by network delays, outages).

#### Watermark Heuristics
1. **Fixed Delay**: $W = T_{max\_seen} - delay$. (e.g., "I'll wait 5 seconds for late data").
2. **Heuristic/Adaptive**: Track the $99^{th}$ percentile of skew and adjust the watermark dynamically.

#### Watermark Propagation
In a complex DAG (Directed Acyclic Graph) of operators, a node with multiple inputs must take the **minimum** of its input watermarks:
$W_{out} = \min(W_{in1}, W_{in2}, ...)$

```text
┌───────────────────────────────────────────────────────────────┐
│                    WATERMARK PROPAGATION                      │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  Source 1 (W=10) ───┐                                         │
│                     │      ┌───────────────┐                  │
│                     ├─────►│ Join Operator │ ───► W_out=10    │
│  Source 2 (W=12) ───┘      │ (Min logic)   │                  │
│                            └───────────────┘                  │
│                                                               │
│  WHY MIN? If the Join operator used W=12, it might drop late  │
│  events from Source 1 (which are only up to 10).              │
│                                                               │
└───────────────────────────────────────────────────────────────┘
```

#### Handling Late Data (Post-Watermark)

When an event arrives with event time $t < W$, it is considered **late**. Options:
  • Drop: Discard the event (standard for real-time dashboards).
  • Side Output: Send to a separate “late-events” stream for manual correction.
  • Update: Re-calculate the window and emit an updated result (costly).
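
A toy sketch of the fixed-delay heuristic together with a side output for late events (class and variable names are invented; engines such as Flink expose the same idea as a bounded-out-of-orderness watermark strategy):

```python
class FixedDelayWatermark:
    """W = max(event time seen so far) - delay, the fixed-delay heuristic above."""

    def __init__(self, delay):
        self.delay = delay
        self.max_seen = float("-inf")

    def observe(self, event_time):
        self.max_seen = max(self.max_seen, event_time)

    @property
    def watermark(self):
        return self.max_seen - self.delay


def route(event_time, wm, on_time, late):
    """Events behind the watermark go to the side output instead of a window."""
    (late if event_time < wm.watermark else on_time).append(event_time)


wm = FixedDelayWatermark(delay=5)
on_time, late = [], []
for t in [10, 12, 18, 9, 20, 11]:  # 9 and 11 arrive out of order
    wm.observe(t)
    route(t, wm, on_time, late)

print("watermark:", wm.watermark)          # 20 - 5 = 15
print("on time:", on_time, "late:", late)  # late: [9, 11]
```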


---

## Advanced: Distributed SQL Internals (Raft-per-Range)

Modern NewSQL databases like **CockroachDB**, **TiDB**, and **YugabyteDB** combine the scalability of NoSQL with the transactional guarantees of traditional RDBMS. The key innovation is **Raft-per-Range**.

### 1. The Range Abstraction

Data is divided into **Ranges** (also called **Regions** in TiDB), each ~64-128MB. Each range is a contiguous slice of the keyspace.

```text
┌─────────────────────────────────────────────────────────────────────────────┐
│                    RAFT-PER-RANGE ARCHITECTURE                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  KEYSPACE:  [A──────────────────────────────────────────────────────Z]       │
│                                                                              │
│  Range 1: [A───────G]   Range 2: [G───────M]   Range 3: [M───────Z]         │
│                                                                              │
│  Each Range has its OWN Raft Group:                                         │
│                                                                              │
│  ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐            │
│  │ Raft Group 1    │   │ Raft Group 2    │   │ Raft Group 3    │            │
│  │ L=Node1         │   │ L=Node2         │   │ L=Node3         │            │
│  │ F=Node2, Node3  │   │ F=Node3, Node1  │   │ F=Node1, Node2  │            │
│  └─────────────────┘   └─────────────────┘   └─────────────────┘            │
│                                                                              │
│  BENEFIT: A transaction touching only Range 1 doesn't wait for Ranges 2/3.  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```
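
A toy illustration of key-to-range routing (split keys and node names are invented; real systems also cache range descriptors and handle splits, merges, and leaseholder changes):

```python
import bisect

# Ranges are defined by exclusive upper-bound split keys.
SPLIT_KEYS = ["g", "m"]                     # Range 1: [a, g)  Range 2: [g, m)  Range 3: [m, z]
RAFT_LEADERS = ["node1", "node2", "node3"]  # leader of each range's Raft group

def route(key):
    """Return (range_index, leader); only that Raft group participates in the write."""
    idx = bisect.bisect_right(SPLIT_KEYS, key)
    return idx, RAFT_LEADERS[idx]

print(route("apple"))  # (0, 'node1') -- touches only Raft Group 1
print(route("kiwi"))   # (1, 'node2')
print(route("zebra"))  # (2, 'node3')
```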

### 2. Multi-Range Transactions (Percolator Model)

When a transaction touches keys in multiple ranges (e.g., a bank transfer from User A to User B), the database must coordinate across Raft groups. CockroachDB and TiDB use a variant of Google’s Percolator protocol. The simplified two-phase commit works as follows (see the sketch after this list):
  1. Prewrite: Write “intent” records (locks) to all involved keys across ranges. The first key is the Primary Lock.
  2. Commit: If prewrite succeeds, write a commit record to the primary key’s row. Then, asynchronously resolve all other intents.
  3. Recovery: If a transaction crashes, other transactions can detect stale intents and either roll them forward (if primary is committed) or roll them back.
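
A heavily simplified, in-memory sketch of that flow (no real storage, timestamps, or failure handling; all names are invented). It only illustrates the primary-lock idea: every intent points at the primary key, and the primary's commit record decides the whole transaction's fate:

```python
# key -> {"value": ..., "intent": txn_id or None, "primary": primary key, "staged": pending value}
store = {}
commit_records = {}  # primary key -> True once committed (written via that range's Raft group)

def prewrite(txn_id, writes):
    """Phase 1: place intents (locks) on every key; the first key is the primary lock."""
    primary = writes[0][0]
    for key, value in writes:
        row = store.setdefault(key, {"value": None, "intent": None, "primary": None})
        if row["intent"] is not None:
            return None  # conflicting intent -> abort and retry
        row.update(intent=txn_id, primary=primary, staged=value)
    return primary

def commit(txn_id, writes, primary):
    """Phase 2: commit the primary, then resolve the other intents (async in real systems)."""
    commit_records[primary] = True
    for key, _ in writes:
        row = store[key]
        if row["intent"] == txn_id:
            row["value"] = row.pop("staged")
            row["intent"] = row["primary"] = None

writes = [("acct:A", 50), ("acct:B", 150)]  # e.g. a transfer touching two different ranges
primary = prewrite("txn-1", writes)
if primary:
    commit("txn-1", writes, primary)
print(store["acct:A"]["value"], store["acct:B"]["value"])  # 50 150
```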

### 3. Timestamp Oracle (TSO)

To achieve Serializable Snapshot Isolation (SSI), the database needs globally ordered timestamps. This is handled by a Timestamp Oracle (TSO):
  • In TiDB, the TSO is a centralized service (Placement Driver).
  • In CockroachDB, it’s decentralized using Hybrid Logical Clocks (HLC); a sketch follows below.
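
A compact sketch of a hybrid logical clock (simplified; it ignores drift bounds and error handling). The physical component tracks wall-clock time while the logical counter breaks ties, so causally related events always receive increasing timestamps:

```python
import time

class HybridLogicalClock:
    def __init__(self):
        self.wall = 0     # physical component (ms since epoch)
        self.logical = 0  # logical counter for events within the same millisecond

    def now(self):
        """Timestamp a local event or an outgoing message."""
        pt = int(time.time() * 1000)
        if pt > self.wall:
            self.wall, self.logical = pt, 0
        else:
            self.logical += 1
        return (self.wall, self.logical)

    def update(self, remote):
        """Merge a timestamp carried on an incoming message."""
        pt = int(time.time() * 1000)
        r_wall, r_logical = remote
        new_wall = max(self.wall, r_wall, pt)
        if new_wall == self.wall == r_wall:
            self.logical = max(self.logical, r_logical) + 1
        elif new_wall == self.wall:
            self.logical += 1
        elif new_wall == r_wall:
            self.logical = r_logical + 1
        else:
            self.logical = 0
        self.wall = new_wall
        return (self.wall, self.logical)

clock = HybridLogicalClock()
print(clock.now())                        # e.g. (1712345678901, 0)
print(clock.update((1712345678999, 7)))   # never goes backwards relative to the remote clock
```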

### 4. Multi-Region SQL

For global deployments, data placement becomes critical for latency.

| Strategy | Description | Use Case |
|----------|-------------|----------|
| Global Tables | Small, read-mostly tables replicated everywhere. | Config, feature flags. |
| Regional Tables | Data pinned to a specific region. | GDPR compliance. |
| Regional by Row | Rows are placed based on a column (e.g., `user_country`). | Multi-region user data. |

Staff Tip: For Staff+ interviews, be ready to discuss how these databases handle follower reads (reading from a non-leader replica for lower latency) and the consistency trade-offs involved (stale reads vs. waiting for leader).

Key Interview Questions

TrueTime + Commit Wait:
  1. TrueTime API returns time interval [earliest, latest]
    • GPS receivers + atomic clocks
    • Uncertainty typically 1-7ms
  2. Commit protocol:
    • Transaction gets timestamp T at commit
    • Wait until TrueTime.after(T) is true
    • This ensures T is definitely in the past
    • No other transaction can get earlier timestamp
  3. Result: If T1 commits before T2 starts:
    • T1’s timestamp < T2’s timestamp
    • Real-time ordering preserved globally
Trade-off: Commit latency includes wait time (average ~7ms)
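
A toy illustration of commit wait (the TrueTime interval here is faked with a fixed 7 ms uncertainty; Spanner's real API exposes `TT.now()`, `TT.after()`, and `TT.before()` backed by GPS and atomic clocks):

```python
import time

EPSILON = 0.007  # pretend clock uncertainty of 7 ms

def tt_now():
    """Fake TrueTime: an (earliest, latest) interval around local wall-clock time."""
    t = time.time()
    return (t - EPSILON, t + EPSILON)

def tt_after(t):
    """True once t is guaranteed to be in the past on every node."""
    earliest, _ = tt_now()
    return earliest > t

def commit(commit_timestamp):
    # Commit wait: block until the chosen timestamp is definitely in the past,
    # so no transaction that starts afterwards can receive an earlier timestamp.
    while not tt_after(commit_timestamp):
        time.sleep(0.001)
    print("commit visible; external consistency preserved")

_, latest = tt_now()
commit(latest)  # waits roughly 2 * EPSILON before returning
```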
Distributed cache design (consistent hashing):

Architecture:
─────────────
Client → Load Balancer → Cache Nodes

Components:
1. Hash Ring: Virtual nodes for balance
2. Client library: Consistent hashing logic
3. Cache nodes: Memcached/Redis instances

Write path:
1. Client hashes key
2. Finds responsible node on ring
3. Writes to that node + N-1 replicas

Read path:
1. Client hashes key
2. Reads from first available replica

Handling node failure:
1. Detect via heartbeat
2. Remove from ring
3. Keys automatically route to next node
4. Warm cache gradually

Adding node:
1. Add to ring with virtual nodes
2. Only 1/N keys need to move
3. Background migration

Key decisions:
- Virtual nodes: 100-200 per physical
- Replication factor: 3 typical
- Quorum reads: Optional for consistency
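
A condensed sketch of the hash ring with virtual nodes (MD5 is used only as a stable hash; node names and the vnode count are illustrative):

```python
import bisect
import hashlib

def _hash(key):
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class HashRing:
    def __init__(self, nodes, vnodes=100):
        # Each physical node appears `vnodes` times on the ring for better balance.
        self._ring = sorted((_hash(f"{node}#{i}"), node)
                            for node in nodes for i in range(vnodes))
        self._points = [h for h, _ in self._ring]

    def node_for(self, key):
        """Walk clockwise from the key's hash to the next virtual node."""
        idx = bisect.bisect(self._points, _hash(key)) % len(self._points)
        return self._ring[idx][1]

ring = HashRing(["cache-1", "cache-2", "cache-3"])
print(ring.node_for("user:42"))  # deterministic owner for this key
# Adding "cache-4" later remaps only ~1/4 of the keys instead of all of them.
```
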
Kafka exactly-once delivery:

Three mechanisms:
  1. Idempotent Producer
    producer = KafkaProducer(enable_idempotence=True)
    
    • Broker deduplicates by (producer_id, sequence_number)
    • Prevents duplicates from retries
  2. Transactional Producer
    producer.begin_transaction()
    producer.send(topic1, msg1)
    producer.send(topic2, msg2)
    producer.commit_transaction()
    
    • Atomic writes across partitions
    • Uses two-phase commit internally
  3. Consumer read_committed
    consumer = KafkaConsumer(
        isolation_level='read_committed'
    )
    
    • Only sees committed messages
End-to-end exactly-once:
  • Consume → Process → Produce atomically
  • Commit consumer offset in same transaction
  • If crash, replay from last committed offset
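
A sketch of that read-process-write loop using the confluent-kafka client's transactional API (topic names, group id, and broker address are assumptions; error handling and batching are omitted):

```python
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "order-enricher",
    "isolation.level": "read_committed",    # only see committed input
    "enable.auto.commit": False,            # offsets are committed via the transaction
})
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-enricher-1", # enables idempotence + transactions
})

consumer.subscribe(["orders"])
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    producer.produce("enriched-orders", msg.value())  # the processed result
    # The consumer offsets ride in the same transaction as the output records,
    # so a crash replays from the last committed offset without duplicates.
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata(),
    )
    producer.commit_transaction()
```
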
Cassandra vs. DynamoDB:

CASSANDRA:
✓ Open source, no vendor lock-in
✓ More control over deployment
✓ Better for very high write throughput
✓ CQL (SQL-like) query language
✗ Operational complexity
✗ Need to manage cluster yourself

DYNAMODB:
✓ Fully managed, serverless option
✓ Predictable latency (single-digit ms)
✓ Auto-scaling built in
✓ AWS integration (IAM, Lambda, etc.)
✗ Vendor lock-in
✗ Can be expensive at scale
✗ Less flexible query patterns

CHOOSE CASSANDRA:
- Need multi-cloud or on-premise
- Write-heavy workload (millions/sec)
- Complex queries within partition
- Budget constrained at large scale

CHOOSE DYNAMODB:
- AWS-native architecture
- Minimal ops overhead
- Predictable performance SLA
- Pay-per-request pricing works

Next Steps

Continue to Track 6: Production Excellence

Learn observability, chaos engineering, and SRE practices