Track 5: Data Systems at Scale
Building and understanding data systems that handle millions of operations per second.
Track Duration: 44-54 hours
Modules: 5
Key Topics: Partitioning, Consistent Hashing, Spanner, Kafka, Stream Processing
Module 22: Partitioning Strategies
Why Partition?
┌─────────────────────────────────────────────────────────────────────────────┐
│ WHY PARTITIONING? │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SINGLE NODE LIMITS: │
│ ─────────────────── │
│ Data: Can't fit TB of data on one disk │
│ CPU: Can't handle 1M QPS on one CPU │
│ Memory: Can't cache everything in one RAM │
│ │
│ SOLUTION: Split data across multiple nodes (partitions/shards) │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ FULL DATASET │ │
│ │ │ │
│ │ ┌─────────────┬─────────────┬─────────────┬─────────────┐ │ │
│ │ │ Partition 0 │ Partition 1 │ Partition 2 │ Partition 3 │ │ │
│ │ │ (A-F) │ (G-L) │ (M-R) │ (S-Z) │ │ │
│ │ └──────┬──────┴──────┬──────┴──────┬──────┴──────┬──────┘ │ │
│ │ │ │ │ │ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ Node 1 Node 2 Node 3 Node 4 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ GOAL: Even data distribution + even query load │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Key-Range Partitioning
┌─────────────────────────────────────────────────────────────────────────────┐
│ KEY-RANGE PARTITIONING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Assign contiguous ranges of keys to each partition │
│ │
│ Example: Partitioning by user_id (alphabetical) │
│ │
│ Partition 1: user_id A-C │
│ Partition 2: user_id D-F │
│ Partition 3: user_id G-I │
│ ... │
│ │
│ PROS: │
│ ───── │
│ ✓ Range queries efficient (scan contiguous partition) │
│ ✓ Sorted within partition │
│ ✓ Easy to understand │
│ │
│ CONS: │
│ ───── │
│ ✗ HOT SPOTS! Some ranges get more traffic │
│ ✗ Timestamps cause sequential writes to one partition │
│ ✗ Manual rebalancing often needed │
│ │
│ HOT SPOT EXAMPLE: │
│ ───────────────── │
│ Key = timestamp │
│ All writes go to "current time" partition │
│ Result: One partition overloaded, others idle │
│ │
│ SOLUTION: Compound key │
│ Key = (sensor_id, timestamp) │
│ Writes spread across partitions by sensor_id │
│ │
│ USED BY: HBase, BigTable, traditional RDBMS sharding │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
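To make the routing concrete, here is a minimal Python sketch of key-range routing; the boundary letters and the compound-key helper are made up for illustration.

from bisect import bisect_right

# Upper bounds of each range: partition 0 gets keys < "g", 1 gets ["g", "m"),
# 2 gets ["m", "s"), 3 gets everything >= "s".
BOUNDARIES = ["g", "m", "s"]

def range_partition(key: str) -> int:
    """Return the index of the partition whose range contains the key."""
    return bisect_right(BOUNDARIES, key.lower())

# A timestamp-only key would send every write to the "current time" partition.
# Prefixing with sensor_id spreads writes across ranges instead:
def compound_key(sensor_id: str, timestamp: int) -> str:
    return f"{sensor_id}:{timestamp}"

print(range_partition(compound_key("humidity-7", 1700000000)))  # routed by the sensor_id prefix
print(range_partition(compound_key("temp-42", 1700000000)))     # likely a different partition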
Hash Partitioning
┌─────────────────────────────────────────────────────────────────────────────┐
│ HASH PARTITIONING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Hash the key, use hash to determine partition │
│ │
│ partition = hash(key) % num_partitions │
│ │
│ Example: │
│ hash("user123") = 847291 │
│ 847291 % 4 = 3 │
│ → Goes to partition 3 │
│ │
│ PROS: │
│ ───── │
│ ✓ Even distribution (good hash = uniform spread) │
│ ✓ No hot spots from key patterns │
│ ✓ Works with any key type │
│ │
│ CONS: │
│ ───── │
│ ✗ Range queries inefficient (must query all partitions) │
│ ✗ Changing num_partitions = rehash everything! │
│ ✗ No natural ordering │
│ │
│ REHASHING PROBLEM: │
│ ────────────────── │
│ Before: 4 partitions │
│ hash("user123") % 4 = 3 │
│ │
│ After: 5 partitions │
│ hash("user123") % 5 = 1 │
│ │
│ Almost all data moves! (See: Consistent Hashing) │
│ │
│ USED BY: MongoDB (with hashed shard key), Cassandra, Riak │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
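The rehashing problem is easy to measure. A small sketch (the key set is made up; MD5 stands in for whatever hash the store uses):

import hashlib

def bucket(key: str, num_partitions: int) -> int:
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_partitions

keys = [f"user{i}" for i in range(10_000)]
moved = sum(1 for k in keys if bucket(k, 4) != bucket(k, 5))
print(f"{moved / len(keys):.0%} of keys move going from 4 to 5 partitions")
# Roughly 80% of keys land in a different partition; consistent hashing
# (next module) cuts this to roughly 1/N.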
Secondary Indexes
┌─────────────────────────────────────────────────────────────────────────────┐
│ SECONDARY INDEXES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM: Data partitioned by primary key │
│ But queries often filter by other columns │
│ │
│ Example: │
│ Primary key: user_id (partitioned by this) │
│ Query: SELECT * FROM users WHERE city = 'NYC' │
│ │
│ ═══════════════════════════════════════════════════════════════════════ │
│ │
│ OPTION 1: LOCAL INDEX (document-partitioned) │
│ ───────────────────────────────────────────── │
│ Each partition maintains its own index │
│ │
│ Partition 1 Partition 2 Partition 3 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Data: A-F │ │ Data: G-L │ │ Data: M-Z │ │
│ │ Index: city │ │ Index: city │ │ Index: city │ │
│ │ NYC: [a,c] │ │ NYC: [h,j] │ │ NYC: [n,p] │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Query city=NYC: Must query ALL partitions (scatter-gather) │
│ Write: Fast (update one partition's index) │
│ │
│ ═══════════════════════════════════════════════════════════════════════ │
│ │
│ OPTION 2: GLOBAL INDEX (term-partitioned) │
│ ───────────────────────────────────────── │
│ Index partitioned separately from data │
│ │
│ Index Partition: Cities A-M Index Partition: Cities N-Z │
│ ┌────────────────────────┐ ┌────────────────────────┐ │
│ │ Boston: [partitions] │ │ NYC: [partitions] │ │
│ │ Chicago: [partitions] │ │ Seattle: [partitions] │ │
│ └────────────────────────┘ └────────────────────────┘ │
│ │
│ Query city=NYC: Query one index partition (fast!) │
│ Write: May need to update multiple index partitions (slow) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
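A toy sketch of the two layouts, using in-memory dicts as stand-in partitions (names and cities are invented):

# Local (document-partitioned) index: each data partition indexes only its own rows.
partitions = [
    {"data": {"alice": "NYC", "bob": "Boston"},
     "by_city": {"NYC": ["alice"], "Boston": ["bob"]}},
    {"data": {"henry": "NYC", "iris": "Chicago"},
     "by_city": {"NYC": ["henry"], "Chicago": ["iris"]}},
]

def local_index_query(city):
    # Scatter-gather: every partition must be asked.
    return [user for p in partitions for user in p["by_city"].get(city, [])]

# Global (term-partitioned) index: the index itself is partitioned by the indexed value.
global_index = {"NYC": ["alice", "henry"], "Boston": ["bob"], "Chicago": ["iris"]}

def global_index_query(city):
    # One lookup, but a write may have to update several index partitions.
    return global_index.get(city, [])

print(local_index_query("NYC"))    # ['alice', 'henry']
print(global_index_query("NYC"))   # ['alice', 'henry']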
Module 23: Consistent Hashing
The foundational algorithm for distributing keys across nodes: both keys and nodes are hashed onto a ring, each key belongs to the first node clockwise from it, and adding or removing a node moves only about 1/N of the keys.
Virtual Nodes
┌─────────────────────────────────────────────────────────────────────────────┐
│ VIRTUAL NODES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM: Few nodes = uneven distribution │
│ │
│ 3 nodes might hash to: │
│ N1: 45°, N2: 90°, N3: 300° │
│ N1 handles 45°, N2 handles 45°, N3 handles 270° (uneven!) │
│ │
│ SOLUTION: Each physical node = many virtual nodes │
│ │
│ Physical Node A → Virtual: A1, A2, A3, A4, A5 (5 positions on ring) │
│ Physical Node B → Virtual: B1, B2, B3, B4, B5 (5 positions on ring) │
│ Physical Node C → Virtual: C1, C2, C3, C4, C5 (5 positions on ring) │
│ │
│ With 15 points on ring, distribution is much more even! │
│ │
│ TYPICAL: 100-200 virtual nodes per physical node │
│ │
│ HETEROGENEOUS HARDWARE: │
│ ─────────────────────── │
│ Powerful server → 200 virtual nodes │
│ Weak server → 50 virtual nodes │
│ Proportional load distribution! │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Implementation
import hashlib
from bisect import bisect_right


class ConsistentHash:
    def __init__(self, nodes=None, virtual_nodes=100):
        self.virtual_nodes = virtual_nodes
        self.ring = {}            # hash -> node
        self.sorted_hashes = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Generate an integer hash for a key."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a node as many virtual nodes on the ring."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            h = self._hash(virtual_key)
            self.ring[h] = node
            self.sorted_hashes.append(h)
        self.sorted_hashes.sort()

    def remove_node(self, node):
        """Remove a node and all of its virtual nodes."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            h = self._hash(virtual_key)
            del self.ring[h]
            self.sorted_hashes.remove(h)

    def get_node(self, key):
        """Find the node responsible for this key."""
        if not self.ring:
            return None
        h = self._hash(key)
        # First virtual node clockwise from the key's position
        idx = bisect_right(self.sorted_hashes, h)
        # Wrap around the ring if needed
        if idx >= len(self.sorted_hashes):
            idx = 0
        return self.ring[self.sorted_hashes[idx]]

    def get_nodes(self, key, n=3):
        """Get n distinct nodes for replication."""
        if not self.ring:
            return []
        h = self._hash(key)
        idx = bisect_right(self.sorted_hashes, h)
        nodes = []
        seen = set()
        for i in range(len(self.sorted_hashes)):
            node_idx = (idx + i) % len(self.sorted_hashes)
            node = self.ring[self.sorted_hashes[node_idx]]
            if node not in seen:
                nodes.append(node)
                seen.add(node)
            if len(nodes) >= n:
                break
        return nodes


# Usage
ch = ConsistentHash(['node1', 'node2', 'node3'])

# Get the node for a key
node = ch.get_node("user:12345")

# Get 3 nodes for replication
replicas = ch.get_nodes("user:12345", n=3)
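Continuing the example above, a quick (illustrative) check that adding a node only remaps a small fraction of keys:

keys = [f"user:{i}" for i in range(10_000)]
before = {k: ch.get_node(k) for k in keys}

ch.add_node('node4')
moved = sum(1 for k in keys if ch.get_node(k) != before[k])
print(f"{moved / len(keys):.0%} of keys moved")   # roughly 1/4 with 4 nodes, not ~100%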
Alternative: Rendezvous Hashing
┌─────────────────────────────────────────────────────────────────────────────┐
│ RENDEZVOUS HASHING (HRW) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ "Highest Random Weight" - alternative to consistent hashing │
│ │
│ ALGORITHM: │
│ ────────── │
│ For each key: │
│ 1. Compute weight(key, node) for each node │
│ 2. Pick node with highest weight │
│ │
│ weight = hash(key + node_id) │
│ │
│ EXAMPLE: │
│ ──────── │
│ key = "user:123" │
│ hash("user:123" + "node1") = 847 │
│ hash("user:123" + "node2") = 234 │
│ hash("user:123" + "node3") = 912 ← Winner! │
│ │
│ PROS: │
│ ───── │
│ ✓ Simpler than consistent hashing │
│ ✓ No virtual nodes needed for balance │
│ ✓ Easy to get k nodes (top k weights) │
│ │
│ CONS: │
│ ───── │
│ ✗ O(n) lookup (must compute all weights) │
│ ✗ Consistent hashing is O(log n) │
│ │
│ USED BY: Uber's Ringpop, some CDNs │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
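A minimal sketch of rendezvous hashing (node names are placeholders; MD5 is used just to get a deterministic weight):

import hashlib

def _weight(key: str, node: str) -> int:
    return int(hashlib.md5(f"{key}:{node}".encode()).hexdigest(), 16)

def hrw_node(key: str, nodes: list) -> str:
    """Pick the node with the highest weight for this key (O(n) per lookup)."""
    return max(nodes, key=lambda node: _weight(key, node))

def hrw_nodes(key: str, nodes: list, n: int = 3) -> list:
    """Top-n nodes by weight, e.g. for picking replicas."""
    return sorted(nodes, key=lambda node: _weight(key, node), reverse=True)[:n]

nodes = ["node1", "node2", "node3", "node4"]
print(hrw_node("user:123", nodes))
print(hrw_nodes("user:123", nodes, n=3))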
Module 24: Distributed Databases Deep Dive
Google Spanner
┌─────────────────────────────────────────────────────────────────────────────┐
│ GOOGLE SPANNER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ "Globally distributed, strongly consistent, SQL database" │
│ │
│ KEY INNOVATIONS: │
│ ──────────────── │
│ │
│ 1. TRUETIME │
│ GPS + atomic clocks in every datacenter │
│ TT.now() returns [earliest, latest] interval │
│ Typical uncertainty: 1-7ms │
│ │
│ 2. EXTERNAL CONSISTENCY │
│ If T1 commits before T2 starts, T1's commit timestamp < T2's │
│ Real-time ordering preserved globally! │
│ │
│ 3. COMMIT WAIT │
│ After assigning timestamp, wait until TT.after(timestamp) │
│ Ensures no other transaction can get earlier timestamp │
│ │
│ ARCHITECTURE: │
│ ───────────── │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ UNIVERSE │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Zone A │ │ Zone B │ │ Zone C │ │ Zone D │ │ │
│ │ │ (US-East)│ │ (US-West)│ │ (Europe) │ │ (Asia) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │ │ │ │
│ │ └─────────────┴─────────────┴─────────────┘ │ │
│ │ Paxos Groups │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ DATA MODEL: │
│ ─────────── │
│ Directory (unit of data movement) → Paxos Group │
│ Directory contains contiguous key range │
│ Tables can be interleaved (parent-child locality) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
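A conceptual sketch of commit wait. TrueTime itself needs GPS and atomic-clock hardware, so the class below is only a stand-in that fakes the [earliest, latest] interval; the 7ms bound matches the uncertainty figure above.

import time

class TrueTimeStub:
    """Illustrative stand-in for the TrueTime API."""
    UNCERTAINTY = 0.007  # seconds; upper end of the typical 1-7ms range

    def now(self):
        t = time.time()
        return (t - self.UNCERTAINTY, t + self.UNCERTAINTY)   # [earliest, latest]

    def after(self, ts):
        earliest, _ = self.now()
        return earliest > ts           # ts is definitely in the past

TT = TrueTimeStub()

def commit(txn: dict) -> dict:
    _, latest = TT.now()
    commit_ts = latest                 # timestamp no earlier than any current clock
    while not TT.after(commit_ts):     # commit wait: block until commit_ts has passed
        time.sleep(0.001)
    txn["commit_ts"] = commit_ts       # any later transaction gets a larger timestamp
    return txn

print(commit({"id": "txn-1"}))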
CockroachDB
┌─────────────────────────────────────────────────────────────────────────────┐
│ COCKROACHDB │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ "Open-source Spanner" - Distributed SQL without TrueTime │
│ │
│ KEY DIFFERENCES FROM SPANNER: │
│ ───────────────────────────── │
│ • No TrueTime → uses HLC (Hybrid Logical Clocks) │
│ • May require transaction restarts for clock skew │
│ • Uses Raft instead of Paxos │
│ │
│ ARCHITECTURE: │
│ ───────────── │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CLUSTER │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │
│ │ │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │ │ │
│ │ │ │ Store 1 │ │ │ │ Store 2 │ │ │ │ Store 3 │ │ │ │
│ │ │ │ ┌──────┐ │ │ │ │ ┌──────┐ │ │ │ │ ┌──────┐ │ │ │ │
│ │ │ │ │Range1│ │ │ │ │ │Range1│ │ │ │ │ │Range1│ │ │ ← Raft group │ │
│ │ │ │ │Range2│ │ │ │ │ │Range3│ │ │ │ │ │Range2│ │ │ │ │
│ │ │ │ │Range4│ │ │ │ │ │Range5│ │ │ │ │ │Range3│ │ │ │ │
│ │ │ │ └──────┘ │ │ │ │ └──────┘ │ │ │ │ └──────┘ │ │ │ │
│ │ │ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ TRANSACTION PROTOCOL: │
│ ───────────────────── │
│ Uses parallel commits + write intents │
│ 1. Write intents (provisional values) to all keys │
│ 2. Commit transaction record │
│ 3. Resolve intents asynchronously │
│ │
│ SERIALIZABLE ISOLATION by default! │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
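A minimal hybrid logical clock sketch, the clock family CockroachDB uses in place of TrueTime (this follows the textbook HLC rules, not CockroachDB's actual implementation):

import time

class HLC:
    def __init__(self):
        self.wall = 0       # highest physical time observed (ms)
        self.logical = 0    # tie-breaker when wall times collide

    def now(self):
        pt = int(time.time() * 1000)
        if pt > self.wall:
            self.wall, self.logical = pt, 0
        else:
            self.logical += 1
        return (self.wall, self.logical)

    def update(self, remote):
        """Merge a timestamp received from another node so causality is preserved."""
        remote_wall, remote_logical = remote
        pt = int(time.time() * 1000)
        old_wall = self.wall
        self.wall = max(old_wall, remote_wall, pt)
        if self.wall == old_wall and self.wall == remote_wall:
            self.logical = max(self.logical, remote_logical) + 1
        elif self.wall == old_wall:
            self.logical += 1
        elif self.wall == remote_wall:
            self.logical = remote_logical + 1
        else:
            self.logical = 0
        return (self.wall, self.logical)

clock = HLC()
print(clock.now())
print(clock.update((int(time.time() * 1000) + 5, 2)))   # remote node's clock slightly ahead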
Cassandra
┌─────────────────────────────────────────────────────────────────────────────┐
│ CASSANDRA │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ "Distributed wide-column store with tunable consistency" │
│ │
│ ARCHITECTURE: Leaderless, peer-to-peer │
│ ───────────── │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │◄──│ Node 2 │◄──│ Node 3 │◄──│ Node 4 │ │
│ │ │──►│ │──►│ │──►│ │ │
│ └────▲────┘ └────▲────┘ └────▲────┘ └────▲────┘ │
│ │ │ │ │ │
│ └─────────────┴─────────────┴─────────────┘ │
│ Gossip Protocol │
│ │
│ DATA MODEL: │
│ ─────────── │
│ Keyspace → Tables → Rows (identified by partition key) │
│ Wide rows: Many columns per row │
│ Clustering columns for sorting within partition │
│ │
│ CONSISTENCY LEVELS: │
│ ─────────────────── │
│ ONE: Fastest, one replica responds │
│ QUORUM: Majority of replicas │
│ LOCAL_QUORUM: Majority in local datacenter │
│ ALL: All replicas (strongest, slowest) │
│ │
│ WRITE PATH: │
│ ─────────── │
│ 1. Commit log (durability) │
│ 2. Memtable (in-memory) │
│ 3. Flush to SSTable (immutable) │
│ 4. Compaction (merge SSTables) │
│ │
│ ANTI-ENTROPY: │
│ ───────────── │
│ • Read repair during queries │
│ • Merkle tree comparison │
│ • Hinted handoff for temporary failures │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
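The consistency levels follow standard Dynamo-style quorum math; a tiny helper to reason about them (just arithmetic, not Cassandra's API):

def quorum(replication_factor: int) -> int:
    return replication_factor // 2 + 1

def read_sees_latest_write(read_replicas: int, write_replicas: int, rf: int) -> bool:
    """True when the read and write replica sets must overlap (R + W > N)."""
    return read_replicas + write_replicas > rf

rf = 3
print(quorum(rf))                                          # 2
print(read_sees_latest_write(quorum(rf), quorum(rf), rf))  # QUORUM reads + QUORUM writes -> True
print(read_sees_latest_write(1, 1, rf))                    # ONE + ONE -> False (stale reads possible)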
When to Use What?
┌────────────────────┬────────────────────────────────────────────────────────┐
│ Database │ Best For │
├────────────────────┼────────────────────────────────────────────────────────┤
│ Spanner │ Global consistency, financial, inventory │
│ │ Google Cloud, need TrueTime guarantees │
├────────────────────┼────────────────────────────────────────────────────────┤
│ CockroachDB │ Strong consistency, SQL, open-source │
│ │ Multi-region, serializable transactions │
├────────────────────┼────────────────────────────────────────────────────────┤
│ Cassandra │ High write throughput, time-series │
│ │ Availability over consistency, IoT │
├────────────────────┼────────────────────────────────────────────────────────┤
│ DynamoDB │ Serverless, predictable performance │
│ │ AWS native, simple key-value │
├────────────────────┼────────────────────────────────────────────────────────┤
│ MongoDB │ Document model, flexible schema │
│ │ Rapid development, JSON-native │
├────────────────────┼────────────────────────────────────────────────────────┤
│ TiDB │ MySQL compatible, HTAP │
│ │ Hybrid analytics + transactions │
└────────────────────┴────────────────────────────────────────────────────────┘
Module 25: Distributed Storage Systems
HDFS / GFS Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ HDFS ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DESIGNED FOR: Large files, sequential reads, batch processing │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ NAMENODE (Master) │ │
│ │ • File system namespace (directory tree) │ │
│ │ • File → Block mapping │ │
│ │ • Block → DataNode mapping │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┬┴─────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ DataNode 1 │ │ DataNode 2 │ │ DataNode 3 │ │
│ │ │ │ │ │ │ │
│ │ [Block A copy1] │ │ [Block A copy2] │ │ [Block A copy3] │ │
│ │ [Block B copy1] │ │ [Block C copy1] │ │ [Block B copy2] │ │
│ │ [Block D copy1] │ │ [Block B copy3] │ │ [Block D copy2] │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ BLOCK SIZE: 128MB (vs 4KB in traditional FS) │
│ REPLICATION: 3 copies by default │
│ RACK AWARENESS: Copies on different racks │
│ │
│ WRITE PATH: │
│ ─────────── │
│ 1. Client asks NameNode for DataNodes │
│ 2. Client writes to first DataNode │
│ 3. First DataNode pipelines to second │
│ 4. Second pipelines to third │
│ 5. ACK propagates back │
│ │
│ READ PATH: │
│ ────────── │
│ 1. Client asks NameNode for block locations │
│ 2. Client reads from nearest DataNode │
│ 3. Checksum verification │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
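A simplified sketch of rack-aware placement. HDFS's default policy puts the first replica on the writer's node, the second on a different rack, and the third on another node in that second rack; the node and rack names below are invented, and each rack is assumed to have at least two nodes.

import random

def place_replicas(writer_node: str, topology: dict) -> list:
    """topology maps rack name -> list of DataNodes; assumes >= 2 nodes per rack."""
    local_rack = next(rack for rack, nodes in topology.items() if writer_node in nodes)
    first = writer_node                                        # replica 1: writer's node
    remote_rack = random.choice([r for r in topology if r != local_rack])
    second = random.choice(topology[remote_rack])              # replica 2: different rack
    third = random.choice([n for n in topology[remote_rack] if n != second])  # replica 3: same remote rack
    return [first, second, third]

topology = {
    "rack1": ["dn1", "dn2", "dn3"],
    "rack2": ["dn4", "dn5", "dn6"],
}
print(place_replicas("dn1", topology))   # e.g. ['dn1', 'dn5', 'dn6']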
Object Storage (S3 Architecture)
┌─────────────────────────────────────────────────────────────────────────────┐
│ S3-STYLE OBJECT STORAGE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ KEY CONCEPTS: │
│ ───────────── │
│ • Objects (not files): blob + metadata + key │
│ • Buckets: containers for objects │
│ • Flat namespace (no real directories) │
│ • Immutable objects (versioning for updates) │
│ │
│ ARCHITECTURE (Simplified): │
│ ───────────────────────── │
│ │
│ Client │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Load Balancer │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ ▼ ▼ │
│ ┌────────┐ ┌────────┐ │
│ │Frontend│ │Frontend│ (HTTP handling, auth, routing) │
│ └───┬────┘ └───┬────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────┐ │
│ │ Metadata Store │ (key → data location mapping) │
│ │ (Distributed DB) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────┴──────┬──────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Storage│ │ Storage│ │ Storage│ (actual data blobs) │
│ │ Node │ │ Node │ │ Node │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
│ DURABILITY: 11 9's (99.999999999%) │
│ HOW: Replicate across multiple datacenters + erasure coding │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
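The "flat namespace" point is worth seeing: keys only look like paths, and listing a "directory" is really a prefix scan over keys. A toy sketch with made-up object keys:

objects = {
    "photos/2024/cat.jpg": b"...",
    "photos/2024/dog.jpg": b"...",
    "photos/2023/bird.jpg": b"...",
    "docs/readme.txt": b"...",
}

def list_prefix(prefix: str) -> list:
    """There are no real directories; a 'folder listing' is just a key-prefix filter."""
    return [key for key in objects if key.startswith(prefix)]

print(list_prefix("photos/2024/"))   # ['photos/2024/cat.jpg', 'photos/2024/dog.jpg']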
Module 26: Stream Processing
Kafka Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CORE CONCEPTS: │
│ ────────────── │
│ • Topic: Category of messages │
│ • Partition: Ordered, immutable sequence of messages │
│ • Broker: Kafka server │
│ • Consumer Group: Set of consumers sharing work │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TOPIC: orders │ │
│ │ │ │
│ │ Partition 0 Partition 1 Partition 2 Partition 3 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │0│1│2│3│4│ │0│1│2│3│ │ │0│1│2│ │ │0│1│2│3│4│5│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ │ │
│ │ │Broker1│ │Broker2│ │Broker3│ │Broker1│ │ │
│ │ │Leader │ │Leader │ │Leader │ │Leader │ │ │
│ │ └───────┘ └───────┘ └───────┘ └───────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ CONSUMER GROUPS: │
│ ──────────────── │
│ │
│ Consumer Group A Consumer Group B │
│ ┌────────────────────┐ ┌────────────────────┐ │
│ │ C1: P0, P1 │ │ C1: P0 │ │
│ │ C2: P2, P3 │ │ C2: P1 │ │
│ │ │ │ C3: P2 │ │
│ │ │ │ C4: P3 │ │
│ └────────────────────┘ └────────────────────┘ │
│ │
│ Each partition consumed by exactly ONE consumer per group │
│ Different groups can consume same partition independently │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
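Messages with the same key always land in the same partition, which is what gives per-key ordering. A minimal sketch of the idea (Kafka's default partitioner uses murmur2 on the key bytes; MD5 here is only for illustration):

import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    return int(hashlib.md5(key).hexdigest(), 16) % num_partitions

print(partition_for(b"order-42", 4))
print(partition_for(b"order-42", 4))   # same key -> same partition, every time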
Kafka Guarantees
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA GUARANTEES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ORDERING: │
│ ───────── │
│ ✓ Total order within partition │
│ ✗ No order guarantee across partitions │
│ │
│ DURABILITY: │
│ ─────────── │
│ acks=0: No wait (fastest, least durable) │
│ acks=1: Wait for leader (balanced) │
│ acks=all: Wait for all replicas (slowest, most durable) │
│ │
│ EXACTLY-ONCE (Idempotent Producer + Transactions): │
│ ───────────────────────────────────────────────── │
│ │
│ producer = KafkaProducer( │
│ enable_idempotence=True, # Prevents duplicates │
│ transactional_id="my-txn-id" # For transactions │
│ ) │
│ │
│ producer.init_transactions() │
│ producer.begin_transaction() │
│ producer.send(topic1, message1) │
│ producer.send(topic2, message2) │
│ producer.commit_transaction() # Atomic across topics! │
│ │
│ CONSUMER EXACTLY-ONCE: │
│ ────────────────────── │
│ 1. Consume messages │
│ 2. Process and produce results │
│ 3. Commit offsets + production atomically │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Stream Processing Concepts
┌─────────────────────────────────────────────────────────────────────────────┐
│ STREAM PROCESSING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ WINDOWING: │
│ ────────── │
│ │
│ TUMBLING (Fixed, non-overlapping) │
│ ├────────┤├────────┤├────────┤ │
│ │ 0-10 ││ 10-20 ││ 20-30 │ │
│ ├────────┤├────────┤├────────┤ │
│ │
│ SLIDING (Overlapping) │
│ ├──────────┤ │
│ ├──────────┤ │
│ ├──────────┤ │
│ Window every 5 min, size 10 min │
│ │
│ SESSION (Gap-based) │
│ ├──────┤ ├───────────┤ ├───┤ │
│ │ User │ │ User │ │ │ │
│ │ A │ │ A │ │ A │ │
│ └──────┘ └───────────┘ └───┘ │
│ gap gap │
│ │
│ WATERMARKS (handling late data): │
│ ──────────────────────────────── │
│ │
│ Event time: When event occurred (in data) │
│ Processing time: When event is processed (wall clock) │
│ │
│ Watermark = "No events with timestamp < W expected" │
│ │
│ ─────────────────► │
│ Events: [t=5] [t=3] [t=8] [t=2] [t=10] │
│ │ │
│ Watermark at t=7 │ │
│ t=2 is LATE! (2 < 7) ◄─┘ │
│ │
│ OPTIONS FOR LATE DATA: │
│ ────────────────────── │
│ • Drop (simplest) │
│ • Allow updates to old windows (within limit) │
│ • Side output for late data │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
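A minimal sketch tying tumbling windows and watermarks together (event times, window size, and allowed lateness are invented):

WINDOW = 10            # seconds per tumbling window
ALLOWED_LATENESS = 5   # watermark trails the max event time seen so far by this much

def window_start(event_time: int) -> int:
    return (event_time // WINDOW) * WINDOW

counts = {}            # window start -> event count
max_event_time = 0

for event_time in [5, 3, 8, 2, 15, 12, 1]:
    max_event_time = max(max_event_time, event_time)
    watermark = max_event_time - ALLOWED_LATENESS
    if event_time < watermark:
        # Late event: drop it, update the old window, or route it to a side output.
        print(f"late: t={event_time} (watermark={watermark})")
        continue
    w = window_start(event_time)
    counts[w] = counts.get(w, 0) + 1

print(counts)   # {0: 3, 10: 2}; t=2 and t=1 arrived behind the watermark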
Key Interview Questions
Q: How does Google Spanner achieve external consistency?
TrueTime + Commit Wait:
- TrueTime API returns a time interval [earliest, latest]
- Backed by GPS receivers + atomic clocks in every datacenter
- Uncertainty typically 1-7ms
- Commit protocol:
  - Transaction gets timestamp T at commit
  - Wait until TrueTime.after(T) is true
  - This ensures T is definitely in the past
  - No other transaction can get an earlier timestamp
- Result: if T1 commits before T2 starts:
  - T1's timestamp < T2's timestamp
  - Real-time ordering is preserved globally
Q: Design a distributed cache with consistent hashing
Architecture:
─────────────
Client → Load Balancer → Cache Nodes
Components:
1. Hash Ring: Virtual nodes for balance
2. Client library: Consistent hashing logic
3. Cache nodes: Memcached/Redis instances
Write path:
1. Client hashes key
2. Finds responsible node on ring
3. Writes to that node + N-1 replicas
Read path:
1. Client hashes key
2. Reads from first available replica
Handling node failure:
1. Detect via heartbeat
2. Remove from ring
3. Keys automatically route to next node
4. Warm cache gradually
Adding node:
1. Add to ring with virtual nodes
2. Only 1/N keys need to move
3. Background migration
Key decisions:
- Virtual nodes: 100-200 per physical
- Replication factor: 3 typical
- Quorum reads: Optional for consistency
Q: How does Kafka provide exactly-once semantics?
Three mechanisms:

1. Idempotent Producer

   producer = KafkaProducer(enable_idempotence=True)

   - Broker deduplicates by (producer_id, sequence_number)
   - Prevents duplicates from retries

2. Transactional Producer

   producer.begin_transaction()
   producer.send(topic1, msg1)
   producer.send(topic2, msg2)
   producer.commit_transaction()

   - Atomic writes across partitions
   - Uses two-phase commit internally

3. Consumer read_committed

   consumer = KafkaConsumer(isolation_level='read_committed')

   - Only sees committed messages

End-to-end pattern:
- Consume → Process → Produce atomically
- Commit the consumer offset in the same transaction
- If the app crashes, replay from the last committed offset
Q: Compare Cassandra vs DynamoDB for a new project
CASSANDRA:
✓ Open source, no vendor lock-in
✓ More control over deployment
✓ Better for very high write throughput
✓ CQL (SQL-like) query language
✗ Operational complexity
✗ Need to manage cluster yourself
DYNAMODB:
✓ Fully managed, serverless option
✓ Predictable latency (single-digit ms)
✓ Auto-scaling built in
✓ AWS integration (IAM, Lambda, etc.)
✗ Vendor lock-in
✗ Can be expensive at scale
✗ Less flexible query patterns
CHOOSE CASSANDRA:
- Need multi-cloud or on-premise
- Write-heavy workload (millions/sec)
- Complex queries within partition
- Budget constrained at large scale
CHOOSE DYNAMODB:
- AWS-native architecture
- Minimal ops overhead
- Predictable performance SLA
- Pay-per-request pricing works
Next Steps
Continue to Track 6: Production Excellence
Learn observability, chaos engineering, and SRE practices