Data Partitioning & Sharding
When data outgrows a single machine, you need to partition (shard) it across multiple nodes. This module covers the strategies, trade-offs, and production patterns for effective data distribution.
Module Duration: 10-14 hours
Key Topics: Hash Partitioning, Range Partitioning, Consistent Hashing, Rebalancing, Hot Spots
Interview Focus: Partition key selection, secondary indexes, cross-partition queries
Why Partition?
┌─────────────────────────────────────────────────────────────────────────────┐
│ WHY PARTITIONING IS NECESSARY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SINGLE NODE LIMITS: │
│ ─────────────────── │
│ Storage: 1 server = 16 TB SSD (maybe 100 TB HDD) │
│ Memory: 1 server = 512 GB - 4 TB RAM │
│ CPU: 1 server = 64-128 cores │
│ Network: 1 server = 10-100 Gbps │
│ │
│ REAL-WORLD SCALE: │
│ ───────────────── │
│ Facebook posts: ~100 PB │
│ Netflix catalog: ~10 PB (with all encodings) │
│ Google search: ~100 PB+ index │
│ Twitter timeline: ~100 TB of tweets/day │
│ │
│ PARTITIONING BENEFITS: │
│ ────────────────────── │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 1. STORAGE SCALING │ │
│ │ 100 TB / 4 nodes = 25 TB per node ✓ │ │
│ ├────────────────────────────────────────────────────────────────────┤ │
│ │ 2. QUERY THROUGHPUT │ │
│ │ 1M QPS / 10 partitions = 100K QPS per partition ✓ │ │
│ ├────────────────────────────────────────────────────────────────────┤ │
│ │ 3. WRITE THROUGHPUT │ │
│ │ Writes distributed across partitions (if no hot spots) │ │
│ ├────────────────────────────────────────────────────────────────────┤ │
│ │ 4. FAULT ISOLATION │ │
│ │ One partition failure doesn't affect others │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ PARTITIONING CHALLENGES: │
│ ──────────────────────── │
│ • Cross-partition queries (expensive) │
│ • Cross-partition transactions (very expensive) │
│ • Rebalancing when adding/removing nodes │
│ • Hot spots (uneven load distribution) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Partitioning Strategies
Hash Partitioning
┌─────────────────────────────────────────────────────────────────────────────┐
│ HASH PARTITIONING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: partition = hash(key) mod N │
│ │
│ EXAMPLE: │
│ ───────── │
│ Keys: user_123, user_456, user_789 │
│ Partitions: 4 │
│ │
│ hash("user_123") = 7829461 mod 4 = 1 → Partition 1 │
│ hash("user_456") = 2847193 mod 4 = 1 → Partition 1 │
│ hash("user_789") = 9182736 mod 4 = 0 → Partition 0 │
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │ Partition 3 │ │
│ │ │ │ │ │ │ │ │ │
│ │ user_789 │ │ user_123 │ │ ... │ │ ... │ │
│ │ user_012 │ │ user_456 │ │ │ │ │ │
│ │ ... │ │ ... │ │ │ │ │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ PROS: │
│ ───── │
│ ✓ Even distribution (if good hash function) │
│ ✓ No hot spots from sequential keys │
│ ✓ Simple to implement │
│ │
│ CONS: │
│ ───── │
│ ✗ Range queries impossible (data scattered) │
│ ✗ Adding nodes requires rehashing ALL data │
│ ✗ Rebalancing is expensive │
│ │
│ USED BY: Memcached (original), simple database sharding │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
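The mod-N scheme above is simple enough to sketch in a few lines. This is an illustrative snippet (not tied to any particular system); it uses MD5 so every client computes the same mapping, unlike Python's per-process salted hash():

```python
import hashlib

def get_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash (MD5), not Python's salted hash()."""
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_partitions

# Every client computes the same routing with no coordination.
for key in ["user_123", "user_456", "user_789"]:
    print(key, "->", get_partition(key, 4))
```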
Range Partitioning
┌─────────────────────────────────────────────────────────────────────────────┐
│ RANGE PARTITIONING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Assign contiguous key ranges to partitions │
│ │
│ EXAMPLE - Alphabetical: │
│ ───────────────────────── │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │ Partition 3 │ │
│ │ Keys: A-F │ │ Keys: G-L │ │ Keys: M-R │ │ Keys: S-Z │ │
│ │ │ │ │ │ │ │ │ │
│ │ alice │ │ george │ │ mary │ │ steve │ │
│ │ bob │ │ harry │ │ nancy │ │ tom │ │
│ │ carol │ │ iris │ │ oliver │ │ uma │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ PROS: │
│ ───── │
│ ✓ Range queries efficient (scan one partition) │
│ ✓ Sorted data within partition │
│ ✓ Rebalancing: just split/merge ranges │
│ │
│ CONS: │
│ ───── │
│ ✗ HOT SPOTS! Sequential keys go to same partition │
│ ✗ Uneven distribution if data isn't uniform │
│ ✗ Manual tuning often needed │
│ │
│ HOT SPOT EXAMPLE: │
│ ───────────────── │
│ Key = timestamp │
│ All current writes → one partition (current time range) │
│ That partition is overloaded while others idle │
│ │
│ USED BY: HBase, Bigtable, traditional RDBMS sharding │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
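Routing for range partitioning is typically a search over sorted partition boundaries. A minimal sketch, assuming single-letter upper bounds that match the alphabetical example above:

```python
from bisect import bisect_left

# Illustrative upper bounds: partition 0 owns A-F, 1 owns G-L, 2 owns M-R, 3 owns S-Z.
PARTITION_UPPER_BOUNDS = ["f", "l", "r", "z"]

def get_range_partition(key: str) -> int:
    """Return the partition whose upper bound is the first one >= the key's first letter."""
    return bisect_left(PARTITION_UPPER_BOUNDS, key[0].lower())

print(get_range_partition("alice"))  # 0
print(get_range_partition("mary"))   # 2
print(get_range_partition("steve"))  # 3
```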
Compound Key Partitioning
┌─────────────────────────────────────────────────────────────────────────────┐
│ COMPOUND KEY PARTITIONING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Partition by first part, sort by second part │
│ │
│ Primary Key = (partition_key, clustering_key) │
│ │
│ EXAMPLE - Social Media Posts: │
│ ────────────────────────────── │
│ Key = (user_id, timestamp) │
│ │
│ Partition by: user_id (hash) │
│ Sorted by: timestamp (within partition) │
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Partition: user_123 │ │
│ │ ┌────────────────────────────────────────────────────────────────┐ │ │
│ │ │ timestamp: 2024-01-01 | post: "Happy New Year!" │ │ │
│ │ │ timestamp: 2024-01-02 | post: "Back to work..." │ │ │
│ │ │ timestamp: 2024-01-03 | post: "Coffee time ☕" │ │ │
│ │ └────────────────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ BENEFITS: │
│ ───────── │
│ ✓ Partition key provides distribution │
│ ✓ Clustering key enables range queries within partition │
│ ✓ "Get all posts by user in time range" = single partition scan │
│ │
│ CASSANDRA EXAMPLE: │
│ ────────────────── │
│ ```cql │
│ CREATE TABLE posts ( │
│ user_id UUID, │
│ timestamp TIMESTAMP, │
│ content TEXT, │
│ PRIMARY KEY ((user_id), timestamp) │
│ ) WITH CLUSTERING ORDER BY (timestamp DESC); │
│ │
│ -- Efficient: single partition │
│ SELECT * FROM posts WHERE user_id = ? AND timestamp > ?; │
│ │
│ -- Inefficient: scans all partitions │
│ SELECT * FROM posts WHERE content CONTAINS 'coffee'; │
│ ``` │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Consistent Hashing
Must-Know Algorithm: Consistent hashing is foundational for distributed caching, databases, and load balancing. Expect to explain and implement it in interviews.
Basic Consistent Hashing
┌─────────────────────────────────────────────────────────────────────────────┐
│ CONSISTENT HASHING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM WITH hash(key) mod N: │
│ ───────────────────────────── │
│ Adding/removing node → almost ALL keys rehash to different nodes! │
│ │
│ Before: 3 nodes After: 4 nodes │
│ hash(k) mod 3 = ? hash(k) mod 4 = ? │
│ Key 1: 0 Key 1: 1 (moved!) │
│ Key 2: 1 Key 2: 2 (moved!) │
│ Key 3: 2 Key 3: 3 (moved!) │
│ │
│ CONSISTENT HASHING SOLUTION: │
│ ──────────────────────────── │
│ Map both nodes and keys to a ring (0 to 2^32 - 1) │
│ Key goes to first node clockwise from its position │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ HASH RING │ │
│ │ │ │
│ │ 0° │ │
│ │ ▲ │ │
│ │ Node A │ │ │
│ │ │ │ │
│ │ 270° ◄───┼───► 90° │ │
│ │ Node D │ Node B │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Node C │ │
│ │ 180° │ │
│ └─────────────────────────────────────┘ │
│ │
│ Key at 45° → goes to Node B (first node clockwise) │
│ Key at 200° → goes to Node D (first node clockwise) │
│ │
│ ADDING A NODE: │
│ ────────────── │
│      Add Node E at 135° → only keys between B (90°) and E (135°) move (to E)       │
│ Other keys stay with their original nodes! │
│ │
│ ON AVERAGE: Only K/N keys need to move (K = keys, N = nodes) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Virtual Nodes
┌─────────────────────────────────────────────────────────────────────────────┐
│ VIRTUAL NODES (VNODES) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM: With few physical nodes, distribution can be uneven │
│ │
│ Example with 3 nodes: │
│ Node A: covers 50% of ring │
│ Node B: covers 30% of ring │
│ Node C: covers 20% of ring │
│ → Unbalanced! │
│ │
│ SOLUTION: Each physical node gets multiple virtual nodes │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ RING WITH VNODES │ │
│ │ │ │
│ │ 0° │ │
│ │ A1 ─── B1 │ │
│ │ / \ │ │
│ │ C2 A2 │ │
│ │ / \ │ │
│ │ B3 C1 │ │
│ │ \ / │ │
│ │ A3 B2 │ │
│ │ \ / │ │
│ │ C3 ─── A4 │ │
│ │ 180° │ │
│ │ │ │
│ │ A1, A2, A3, A4 = Virtual nodes for physical Node A │ │
│ │ B1, B2, B3 = Virtual nodes for physical Node B │ │
│ │ C1, C2, C3 = Virtual nodes for physical Node C │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ BENEFITS: │
│ ───────── │
│ ✓ Better load distribution (law of large numbers) │
│ ✓ Heterogeneous nodes: powerful node → more vnodes │
│ ✓ Smoother rebalancing when node added/removed │
│ │
│ TYPICAL VNODE COUNT: │
│ ──────────────────── │
│ Cassandra default: 256 vnodes per node │
│ DynamoDB: variable based on partition │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Implementation
import hashlib
from bisect import bisect_right


class ConsistentHash:
    """
    Consistent hashing with virtual nodes.
    Used by: Cassandra, DynamoDB, Memcached (with ketama), load balancers
    """

    def __init__(self, nodes: list = None, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}               # position -> node
        self.sorted_positions = []   # sorted list of positions
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key: str) -> int:
        """Hash function that returns a position on the ring (0 to 2^32 - 1)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)

    def add_node(self, node: str):
        """Add a node with its virtual nodes to the ring."""
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:{i}"
            position = self._hash(vnode_key)
            self.ring[position] = node
            self.sorted_positions.append(position)
        self.sorted_positions.sort()

    def remove_node(self, node: str):
        """Remove a node and all its virtual nodes from the ring."""
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:{i}"
            position = self._hash(vnode_key)
            del self.ring[position]
            self.sorted_positions.remove(position)

    def get_node(self, key: str) -> str:
        """Get the node responsible for a key."""
        if not self.ring:
            return None
        position = self._hash(key)
        # Find the first vnode clockwise from this position
        idx = bisect_right(self.sorted_positions, position)
        if idx == len(self.sorted_positions):
            idx = 0  # Wrap around
        return self.ring[self.sorted_positions[idx]]

    def get_nodes(self, key: str, n: int = 3) -> list:
        """Get N distinct nodes for replication (walking clockwise)."""
        if not self.ring:
            return []
        position = self._hash(key)
        idx = bisect_right(self.sorted_positions, position)
        nodes = []
        seen = set()
        while len(nodes) < n and len(seen) < len(set(self.ring.values())):
            if idx >= len(self.sorted_positions):
                idx = 0
            node = self.ring[self.sorted_positions[idx]]
            if node not in seen:
                nodes.append(node)
                seen.add(node)
            idx += 1
        return nodes


# Usage example
ch = ConsistentHash(["node-1", "node-2", "node-3"], virtual_nodes=150)

# Get responsible node for keys
print(ch.get_node("user:123"))  # e.g. "node-2"
print(ch.get_node("user:456"))  # e.g. "node-1"

# Get replica nodes (for replication factor 3)
print(ch.get_nodes("user:123", n=3))  # e.g. ["node-2", "node-3", "node-1"]

# Add a new node - minimal key movement
ch.add_node("node-4")
print(ch.get_node("user:123"))  # might change to "node-4"
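To make the "only K/N keys move" claim concrete, you can compare each key's owner before and after a fourth node joins; this quick check uses the same sketch with synthetic key names:

```python
# Quick check of the "only K/N keys move" property (synthetic keys).
ch = ConsistentHash(["node-1", "node-2", "node-3"], virtual_nodes=150)
keys = [f"user:{i}" for i in range(10_000)]
before = {k: ch.get_node(k) for k in keys}

ch.add_node("node-4")
after = {k: ch.get_node(k) for k in keys}

moved = sum(1 for k in keys if before[k] != after[k])
print(f"moved {moved}/{len(keys)} keys (~{moved / len(keys):.0%}, expected ~1/4)")
```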
Secondary Indexes
Local Secondary Indexes
┌─────────────────────────────────────────────────────────────────────────────┐
│ LOCAL SECONDARY INDEXES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Each partition maintains its own index │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Partition 0 │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ Data: user_001 {name: "Alice", city: "NYC"} │ │ │
│ │ │ user_007 {name: "Bob", city: "LA"} │ │ │
│ │ │ user_042 {name: "Carol", city: "NYC"} │ │ │
│ │ ├───────────────────────────────────────────────────────────────┤ │ │
│ │ │ Local Index (city): │ │ │
│ │ │ "NYC" → [user_001, user_042] │ │ │
│ │ │ "LA" → [user_007] │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Partition 1 │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ Data: user_003 {name: "Dave", city: "NYC"} │ │ │
│ │ │ user_019 {name: "Eve", city: "SF"} │ │ │
│ │ ├───────────────────────────────────────────────────────────────┤ │ │
│ │ │ Local Index (city): │ │ │
│ │ │ "NYC" → [user_003] │ │ │
│ │ │ "SF" → [user_019] │ │ │
│ │ └───────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ QUERY: "Find all users in NYC" │
│ ───────────────────────────────── │
│ Must query ALL partitions! (scatter-gather) │
│ │
│ PROS: │
│ ───── │
│ ✓ Fast writes (index update is local) │
│ ✓ No distributed transaction for index update │
│ │
│ CONS: │
│ ───── │
│ ✗ Read queries by secondary key → scatter-gather │
│ ✗ Latency = slowest partition │
│ │
│ USED BY: Cassandra, MongoDB │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
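A minimal scatter-gather sketch for a query on a local secondary index; the per-partition lookup is a hypothetical stand-in for an RPC to each shard:

```python
import asyncio

async def query_partition_index(partition: int, city: str) -> list:
    """Hypothetical per-partition RPC: look up that partition's local index."""
    local_index = {
        0: {"NYC": ["user_001", "user_042"], "LA": ["user_007"]},
        1: {"NYC": ["user_003"], "SF": ["user_019"]},
    }
    return local_index.get(partition, {}).get(city, [])

async def find_users_by_city(city: str, num_partitions: int = 2) -> list:
    """Scatter the query to every partition, then gather and merge partial results."""
    partials = await asyncio.gather(
        *(query_partition_index(p, city) for p in range(num_partitions)))
    return [user for partial in partials for user in partial]

print(asyncio.run(find_users_by_city("NYC")))  # ['user_001', 'user_042', 'user_003']
```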
Global Secondary Indexes
┌─────────────────────────────────────────────────────────────────────────────┐
│ GLOBAL SECONDARY INDEXES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Index is partitioned by the indexed column │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DATA PARTITIONS (by user_id) │ │
│ │ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │ │
│ │ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │ │
│ │ │ user_001 (NYC) │ │ user_003 (NYC) │ │ user_005 (SF) │ │ │
│ │ │ user_007 (LA) │ │ user_019 (SF) │ │ user_023 (NYC) │ │ │
│ │ └───────────────────┘ └───────────────────┘ └───────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ GLOBAL INDEX PARTITIONS (by city) │ │
│ │ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │ │
│ │ │ Index Partition 0 │ │ Index Partition 1 │ │ Index Partition 2 │ │ │
│ │ │ Cities: A-G │ │ Cities: H-N │ │ Cities: O-Z │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ (none) │ │ LA → [user_007] │ │ NYC → [user_001, │ │ │
│ │ │ │ │ │ │ user_003, │ │ │
│ │ │ │ │ │ │ user_023] │ │ │
│ │ │ │ │ │ │ SF → [user_005, │ │ │
│ │ │ │ │ │ │ user_019] │ │ │
│ │ └───────────────────┘ └───────────────────┘ └───────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ QUERY: "Find all users in NYC" │
│ ───────────────────────────────── │
│ Go to index partition for "NYC" → single partition read! │
│ │
│ PROS: │
│ ───── │
│ ✓ Fast reads by secondary key (single partition) │
│ │
│ CONS: │
│ ───── │
│ ✗ Slow writes (must update index partition asynchronously) │
│ ✗ Index may be eventually consistent with data │
│ │
│ USED BY: DynamoDB (GSI), Elasticsearch │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
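The write path is what makes a global index eventually consistent: the data write commits locally, and the index update is applied later by a background worker. A toy sketch with in-memory stand-ins for the data partitions, index partitions, and replication queue:

```python
from collections import defaultdict, deque

data_partitions = defaultdict(dict)                        # partition -> {user_id: row}
index_partitions = defaultdict(lambda: defaultdict(list))  # index partition -> city -> users
index_update_queue = deque()                               # stand-in for an async replication log

def write_user(user_id: str, city: str, num_data_partitions: int = 3):
    """Commit the row on its data partition, then enqueue the index update."""
    partition = hash(user_id) % num_data_partitions
    data_partitions[partition][user_id] = {"city": city}
    index_update_queue.append((city, user_id))             # not yet visible via the index

def apply_index_updates(num_index_partitions: int = 3):
    """Background worker: drain the queue into the index, which is partitioned by city."""
    while index_update_queue:
        city, user_id = index_update_queue.popleft()
        index_partitions[hash(city) % num_index_partitions][city].append(user_id)

write_user("user_001", "NYC")
# A read through the index here can miss user_001 until the worker catches up:
apply_index_updates()
```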
Rebalancing Strategies
Fixed Number of Partitions
┌─────────────────────────────────────────────────────────────────────────────┐
│ FIXED PARTITION STRATEGY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Create more partitions than nodes, assign partitions to nodes │
│ │
│ INITIAL: 4 nodes, 16 partitions │
│ ───────────────────────────────── │
│ Node 1: P0, P1, P2, P3 │
│ Node 2: P4, P5, P6, P7 │
│ Node 3: P8, P9, P10, P11 │
│ Node 4: P12, P13, P14, P15 │
│ │
│ ADD NODE 5: │
│ ──────────── │
│ Move some partitions to new node │
│ Node 1: P0, P1, P2 (gave up P3) │
│ Node 2: P4, P5, P6 (gave up P7) │
│ Node 3: P8, P9, P10 (gave up P11) │
│ Node 4: P12, P13, P14 (gave up P15) │
│ Node 5: P3, P7, P11, P15 (received) │
│ │
│ KEY INSIGHT: │
│ ──────────── │
│ Partitions don't change, only their assignment to nodes │
│ Data within partitions doesn't move! │
│ │
│ CHOOSING PARTITION COUNT: │
│ ───────────────────────── │
│ Too few: Large partitions, hard to rebalance evenly │
│ Too many: Overhead for each partition (metadata, connections) │
│ │
│ Rule of thumb: 10-100 partitions per node │
│ Kafka default: 50 partitions │
│ Cassandra/Riak: ~256 vnodes per node │
│ │
│ USED BY: Riak, Elasticsearch, Couchbase, Voldemort │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
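Rebalancing under this scheme is just reassigning whole partitions until every node holds roughly total/nodes of them; a sketch (node names illustrative):

```python
def rebalance(assignment: dict, new_node: str) -> dict:
    """Reassign whole partitions to the new node until every node holds ~total/nodes."""
    assignment = {node: parts[:] for node, parts in assignment.items()}
    assignment[new_node] = []
    target = sum(len(p) for p in assignment.values()) // len(assignment)
    for node, parts in assignment.items():
        while node != new_node and len(parts) > target:
            assignment[new_node].append(parts.pop())  # partition changes owner; its data stays intact
    return assignment

initial = {"node-1": [0, 1, 2, 3], "node-2": [4, 5, 6, 7],
           "node-3": [8, 9, 10, 11], "node-4": [12, 13, 14, 15]}
print(rebalance(initial, "node-5"))
# node-5 receives one partition from each existing node; nothing else moves.
```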
Dynamic Partitioning
┌─────────────────────────────────────────────────────────────────────────────┐
│ DYNAMIC PARTITIONING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY: Split/merge partitions based on size │
│ │
│ INITIAL: 1 partition │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Partition 0 │ │
│ │ (all data) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ SPLIT when partition exceeds threshold (e.g., 10 GB): │
│ ┌────────────────────────────┐ ┌────────────────────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │
│ │ (keys A-M) │ │ (keys N-Z) │ │
│ └────────────────────────────┘ └────────────────────────────┘ │
│ │
│ CONTINUE SPLITTING as data grows: │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ P0 │ │ P1 │ │ P2 │ │ P3 │ │ P4 │ │
│ │ A-D │ │ E-G │ │ H-M │ │ N-R │ │ S-Z │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ MERGE when partition falls below threshold: │
│ If P0 and P1 both < 1 GB → merge into P0 (A-G) │
│ │
│ PRE-SPLITTING: │
│ ────────────── │
│ Problem: New database starts with 1 partition → hot spot │
│ Solution: Pre-create partitions based on expected key range │
│ │
│ USED BY: HBase, MongoDB (chunks), RethinkDB │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
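A sketch of the split rule, using row counts as a stand-in for partition size (real systems split on bytes, e.g. around 10 GB):

```python
SPLIT_THRESHOLD = 4   # illustrative row count; real systems split on size, e.g. ~10 GB

def maybe_split(partitions: list) -> list:
    """Split any range partition over the threshold at its median key."""
    result = []
    for part in partitions:
        keys = sorted(part["rows"])
        if len(keys) <= SPLIT_THRESHOLD:
            result.append(part)
            continue
        mid = len(keys) // 2
        result.append({"range": (part["range"][0], keys[mid - 1]), "rows": keys[:mid]})
        result.append({"range": (keys[mid], part["range"][1]), "rows": keys[mid:]})
    return result

parts = [{"range": ("a", "z"), "rows": ["alice", "bob", "carol", "dave", "eve", "frank"]}]
print(maybe_split(parts))   # splits into ("a".."carol") and ("dave".."z")
```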
Advanced: Geo-Partitioning & Data Sovereignty
At Staff/Principal level, partitioning isn't just about performance; it is also about compliance and locality. As laws like GDPR (Europe) and CCPA (California) become stricter, where data physically lives becomes a legal requirement.
1. Locality-Aware Partitioning
In a global database (e.g., CockroachDB, Spanner, or YugabyteDB), you can pin specific partitions to specific geographic regions.
- The Goal: Keep data close to the user to minimize speed-of-light latency and comply with data residency laws.
- The Mechanism: Using "Placement Policies" or "Partitioning Tags."
-- CockroachDB Example: Pinning data to regions
ALTER TABLE users
PARTITION BY LIST (region) (
PARTITION europe VALUES IN ('eu-west-1', 'eu-central-1'),
PARTITION us_east VALUES IN ('us-east-1', 'us-east-2'),
PARTITION asia VALUES IN ('ap-northeast-1')
);
-- Pin the EUROPE partition to nodes in the EU regions
ALTER PARTITION europe OF INDEX users_pkey
CONFIGURE ZONE USING constraints = '[+region=europe]';
2. Follow-the-Workload (Dynamic Migration)
Sophisticated systems can dynamically move partitions based on the time of day or access patterns.
- Daylight Shifting: Move active user data to the "waking" hemisphere to ensure local low-latency access.
- Access Locality: If a user from Germany suddenly starts accessing a “US East” partition frequently, the system can migrate that partition (or a replica) to “EU West” automatically.
3. Data Sovereignty Challenges
When data is pinned to a region, cross-region operations become extremely complex:
- Global Joins: Joining a "pinned" German user with a "pinned" Japanese order requires cross-continental network hops (200ms+).
- Global Secondary Indexes: If the index is global, writing to the German partition might require updating an index node in the US, breaking sovereignty if the index contains PII (Personally Identifiable Information).
| Strategy | Performance | Compliance | Cost |
|---|---|---|---|
| Global Sharding | High (Average) | Poor (Data can be anywhere) | Low |
| Regional Sharding | Best (Local access) | Strict (Data stays in region) | High (Redundancy per region) |
| Geo-Replication | Good (Read local) | Medium (Copies exist everywhere) | Medium |
Hot Spots and Skew
Identifying Hot Spots
┌─────────────────────────────────────────────────────────────────────────────┐
│ HOT SPOT SCENARIOS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. CELEBRITY PROBLEM │
│ ───────────────── │
│ User ID = partition key │
│ Justin Bieber has 100M followers │
│ All reads/writes to his partition! │
│ │
│ Solutions: │
│ - Add random suffix: bieber_1, bieber_2, ... bieber_100 │
│ - Spread load across 100 partitions │
│ - Application aggregates results │
│ │
│ 2. TIME-SERIES PROBLEM │
│ ────────────────── │
│ Partition key = date │
│ All writes go to "today" partition │
│ │
│ Solutions: │
│ - Compound key: (sensor_id, date) │
│ - Spread writes across sensors first │
│ │
│ 3. SEASONAL TRAFFIC │
│ ──────────────── │
│ Black Friday: one product partition gets 100x traffic │
│ │
│ Solutions: │
│ - Predictive scaling │
│ - Pre-warm caches │
│ - Temporary key salting │
│ │
│ MONITORING FOR HOT SPOTS: │
│ ───────────────────────── │
│ • QPS per partition │
│ • Latency percentiles per partition │
│ • Disk/memory usage per partition │
│ • Alerts when variance exceeds threshold │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Hot Spot Mitigation
# Hot spot mitigation strategies
import random
import time
from collections import defaultdict
from typing import Any


class HotKeySharding:
    """
    Spread hot keys across multiple sub-partitions.
    """

    def __init__(self, hot_keys: set, shard_count: int = 100):
        self.hot_keys = hot_keys
        self.shard_count = shard_count

    def get_partition_key(self, key: str) -> str:
        """
        For hot keys, add a random shard suffix (salting).
        For normal keys, use the key as-is.
        """
        if key in self.hot_keys:
            shard = random.randrange(self.shard_count)
            return f"{key}:shard_{shard}"
        return key

    def get_all_shards(self, key: str) -> list:
        """
        For reading, must query all shards of a hot key and merge.
        """
        if key in self.hot_keys:
            return [f"{key}:shard_{i}" for i in range(self.shard_count)]
        return [key]


class WriteBuffer:
    """
    Buffer writes and batch them to reduce hot spot pressure.
    `db` is assumed to be an injected async client exposing `batch_write`.
    """

    def __init__(self, db, flush_interval: float = 1.0, max_buffer_size: int = 1000):
        self.db = db
        self.buffer = defaultdict(list)
        self.flush_interval = flush_interval
        self.max_buffer_size = max_buffer_size
        self.last_flush = time.time()

    async def write(self, key: str, value: Any):
        """Buffer a write; flush when the buffer fills or the interval elapses."""
        self.buffer[key].append(value)
        if self._should_flush(key):
            await self._flush_key(key)

    def _should_flush(self, key: str) -> bool:
        buffer_full = len(self.buffer[key]) >= self.max_buffer_size
        time_exceeded = time.time() - self.last_flush > self.flush_interval
        return buffer_full or time_exceeded

    async def _flush_key(self, key: str):
        """Batch-write the buffered values in one call."""
        values = self.buffer.pop(key, [])
        if values:
            # Single batch write instead of many individual writes
            await self.db.batch_write(key, values)
        self.last_flush = time.time()
Cross-Partition Operations
Scatter-Gather Pattern
┌─────────────────────────────────────────────────────────────────────────────┐
│ SCATTER-GATHER PATTERN │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ For queries that can't be answered by a single partition │
│ │
│ QUERY: "Count all users with age > 30" │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ COORDINATOR │ │
│ └────────────────┬────────────────────────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ SCATTER SCATTER SCATTER │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Part. 0 │ │ Part. 1 │ │ Part. 2 │ │
│ │ │ │ │ │ │ │
│ │ count=42 │ │ count=37 │ │ count=55 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ GATHER │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ COORDINATOR: 42 + 37 + 55 = 134 │ │
│ └─────────────────────────────────────────┘ │
│ │
│ LATENCY: Max of all partition responses │
│ AVAILABILITY: Requires all partitions to be up │
│ │
│ OPTIMIZATIONS: │
│ ────────────── │
│ • Timeout and return partial results │
│ • Parallel execution with deadline │
│ • Result streaming (don't wait for all) │
│ • Predicate pushdown (filter at partition) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
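A sketch of scatter-gather with the deadline and partial-results optimization, using asyncio; count_on_partition is a hypothetical per-partition RPC:

```python
import asyncio

async def count_on_partition(partition: int) -> int:
    """Hypothetical per-partition RPC: apply the predicate locally and return a count."""
    await asyncio.sleep(0.01 * partition)              # simulate uneven partition latency
    return [42, 37, 55][partition]

async def scatter_gather_count(num_partitions: int = 3, deadline_s: float = 0.5) -> int:
    """Fan out to all partitions in parallel; sum whatever returns before the deadline."""
    tasks = [asyncio.create_task(count_on_partition(p)) for p in range(num_partitions)]
    done, pending = await asyncio.wait(tasks, timeout=deadline_s)
    for task in pending:                               # partial results: drop stragglers
        task.cancel()
    return sum(task.result() for task in done)

print(asyncio.run(scatter_gather_count()))             # 42 + 37 + 55 = 134 if none time out
```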
Cross-Partition Transactions
┌─────────────────────────────────────────────────────────────────────────────┐
│ CROSS-PARTITION TRANSACTIONS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SCENARIO: Transfer money between users on different partitions │
│ │
│ User A (Partition 0): balance = 100 │
│ User B (Partition 1): balance = 50 │
│ Transfer: A → B, amount = 30 │
│ │
│ APPROACH 1: Two-Phase Commit (2PC) │
│ ───────────────────────────────── │
│ Coordinator ──► Partition 0: PREPARE (A.balance -= 30) │
│ └─► Partition 1: PREPARE (B.balance += 30) │
│ │
│ Both respond OK → COMMIT │
│ Any fails → ABORT │
│ │
│ Problems: Blocking, coordinator failure │
│ │
│ APPROACH 2: Saga Pattern │
│ ──────────────────────── │
│ Step 1: Debit A (Partition 0) │
│ Step 2: Credit B (Partition 1) │
│ If Step 2 fails: Compensate by crediting A │
│ │
│ Problems: Temporary inconsistency, complex compensation │
│ │
│ APPROACH 3: Avoid Cross-Partition (Best!) │
│ ────────────────────────────────────────── │
│ - Design partition key to keep related data together │
│ - Denormalize to avoid joins │
│ - Use outbox pattern for eventual consistency │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
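A minimal saga sketch for the transfer example (an in-memory dict stands in for the two partitions); the compensating credit undoes step 1 if step 2 fails:

```python
# In-memory stand-ins for accounts living on two different partitions.
balances = {"partition_0/user_a": 100, "partition_1/user_b": 50}

def debit(account: str, amount: int):
    if balances[account] < amount:
        raise ValueError("insufficient funds")
    balances[account] -= amount

def credit(account: str, amount: int):
    balances[account] += amount

def transfer(src: str, dst: str, amount: int):
    debit(src, amount)                  # step 1: local write on the source partition
    try:
        credit(dst, amount)             # step 2: local write on the destination partition
    except Exception:
        credit(src, amount)             # compensation: undo step 1
        raise

transfer("partition_0/user_a", "partition_1/user_b", 30)
print(balances)  # {'partition_0/user_a': 70, 'partition_1/user_b': 80}
```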
Interview Practice
Q1: Design partition key for Twitter
Question: How would you partition Twitter’s tweet storage?
Requirements Analysis:
- Queries: Get user’s tweets, home timeline, search
- Scale: 500M tweets/day, 200M users
Option A: Partition by user_id
✓ "Get user's tweets" = single partition
✗ "Home timeline" = scatter-gather (follow list)
Option B: Partition by tweet_id
✓ Good distribution (random IDs)
✗ "Get user's tweets" = scatter-gather
Recommended: Hybrid
- Tweets table: partition by user_id
- Timeline table: partition by viewer_id
(pre-materialized, fan-out on write)
- Search: separate inverted index
Q2: Handle celebrity hot spots
Question: A celebrity with 100M followers posts. How do you handle the hot spot?
Solution:
# Interview sketch: helper functions (follower lookup, parallel_query, merge_and_sort,
# cache decorator) are placeholders for whatever infrastructure you have.

# 1. Identify hot users dynamically
hot_users = get_users_above_follower_threshold(1_000_000)

# 2. Shard their content
def get_post_partition(user_id: str, post_id: str) -> str:
    if user_id in hot_users:
        # Spread across 100 sub-partitions
        shard = hash(post_id) % 100
        return f"{user_id}:shard_{shard}"
    return user_id

# 3. For reads, query all shards and merge
def get_celebrity_posts(user_id: str) -> list:
    shards = [f"{user_id}:shard_{i}" for i in range(100)]
    results = parallel_query(shards)
    return merge_and_sort(results)

# 4. Use caching aggressively for celebrities
@cache(ttl=60)
def get_celebrity_posts_cached(user_id: str):
    return get_celebrity_posts(user_id)
Q3: Rebalancing without downtime
Question: How do you rebalance partitions without downtime?
Process:
1. PREPARE
- Identify source and target nodes
- Calculate data to move
- Estimate time required
2. COPY (background)
- Stream data from source to target
- Source still serves reads/writes
- Track "high watermark" of copied data
3. CATCH UP
- Replay changes since copy started
- Use change log/WAL
- Repeat until caught up
4. SWITCH
- Brief pause (or use dual-write)
- Update routing table
- New requests go to target
5. CLEANUP
- Remove data from source
- Verify target is healthy
Key: Never stop serving requests!
Advanced Design Scenarios
Scenario 1: Designing Partitioning for an E‑Commerce Platform
You are designing the data model and partitioning for a large e-commerce platform (catalog, carts, orders, inventory, users, payments). Key workloads:
- Product browsing and search
- User-specific operations (cart, wishlist, recommendations)
- Order placement and history
- Inventory management per warehouse/region
- Users, carts, wishlists: Partition by user_id (hash or consistent hash)
- "Get cart" and "get wishlist" → single partition
- Cross-user queries (analytics) use a separate OLAP store or stream to a warehouse
- Orders: Partition by user_id or order_id (with correlation to user)
- Materialize read models for "orders by user" and "orders by status" using CQRS/Event Sourcing
- Inventory: Partition by (warehouse_id, product_id)
- Allows local, warehouse-level operations and avoids cross-partition inventory transactions
- Search: Separate inverted index service (Elasticsearch / OpenSearch) with its own sharding
Outcome:
- Many OLTP queries become single-partition and fast
- Cross-entity queries ("top selling products globally") are handled via denormalized tables or separate systems
- Consistency for cross-partition operations is handled via Sagas or 2PC as needed
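A routing sketch for these keys (the helper names and 64-partition count are illustrative); each OLTP call resolves to a single partition:

```python
import hashlib

NUM_PARTITIONS = 64   # illustrative

def _p(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_PARTITIONS

def cart_partition(user_id: str) -> int:
    """Carts/wishlists: one partition per user, so reads and writes stay local."""
    return _p(user_id)

def order_partition(user_id: str, order_id: str) -> int:
    """Orders: co-locate with the user so 'orders by user' is a single-partition query."""
    return _p(user_id)

def inventory_partition(warehouse_id: str, product_id: str) -> int:
    """Inventory: (warehouse_id, product_id) keeps a stock reservation on one partition."""
    return _p(f"{warehouse_id}|{product_id}")

print(cart_partition("user_42"), inventory_partition("wh-eu-1", "sku-123"))
```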
Scenario 2: Multi-Tenant SaaS Partitioning
You are building a multi-tenant SaaS where each tenant has its own users, projects, and data. Goals:
- Strong data isolation between tenants
- Ability to move tenants between clusters or regions
- Ability to put large tenants on dedicated resources
- Database-per-tenant:
- Pros: Strong isolation, easy per-tenant backup/restore
- Cons: Operational overhead when tenants = 10k+
- Shared database, tenant_id in every row:
- Pros: Easier to operate at scale
- Cons: Harder to move tenants, noisy neighbor effects
- Hybrid:
- Small tenants: shared database with partitioning by tenant_id
- Small tenants: shared database with partitioning by
- Maintain a Tenant Catalog mapping tenant_id → {cluster, database, partition_key}
- For large tenants, use more partitions or dedicated clusters and update catalog accordingly
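A sketch of the tenant catalog and router described above; cluster and database names are invented placeholders:

```python
# Illustrative tenant catalog; cluster and database names are placeholders.
TENANT_CATALOG = {
    "acme-corp":  {"cluster": "dedicated-1", "database": "acme",   "partition_key": None},
    "small-shop": {"cluster": "shared-1",    "database": "shared", "partition_key": "tenant_id"},
}

def route(tenant_id: str) -> dict:
    """Resolve which cluster/database serves a tenant before any query runs."""
    entry = TENANT_CATALOG.get(tenant_id)
    if entry is None:
        raise KeyError(f"unknown tenant: {tenant_id}")
    return entry

def run_query(tenant_id: str, sql: str):
    target = route(tenant_id)
    # A real router would check out a connection to target["cluster"]/target["database"];
    # for shared tenants it would also enforce a `WHERE tenant_id = ?` predicate.
    print(f"[{target['cluster']}/{target['database']}] {sql}")

run_query("small-shop", "SELECT * FROM projects")
```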
Scenario 3: Time-Series Platform (Metrics/Logs)
You are designing a time-series platform that ingests telemetry, metrics, or logs from millions of devices. Characteristics:
- Very high write throughput (append-only)
- Queries are often time-bounded and per device / per stream
- Data retention and tiering (hot vs cold storage)
- Primary key: (tenant_id, stream_id, time_bucket)
- Create time buckets (e.g., daily or hourly) to allow range scans and efficient TTL/compaction
- If you partition by raw timestamp, “now” is a hot partition
- Instead, spread by
(tenant_id, stream_id)and keep time in the clustering key
- Recent buckets stored on fast nodes (SSD, high IOPS)
- Older buckets moved to cheaper storage (object store) with precomputed aggregates
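A sketch of building the compound key with a coarse time bucket so ingest spreads across streams rather than piling onto "now" (bucket granularity and partition count are illustrative):

```python
import hashlib
from datetime import datetime, timezone

def partition_key(tenant_id: str, stream_id: str, ts: datetime, bucket: str = "day") -> str:
    """Build the compound key: spread by (tenant, stream), bucket time coarsely."""
    time_bucket = ts.strftime("%Y-%m-%d" if bucket == "day" else "%Y-%m-%dT%H")
    return f"{tenant_id}|{stream_id}|{time_bucket}"

def partition_for(key: str, num_partitions: int = 64) -> int:
    """Hash the key so current writes spread across streams instead of one 'now' partition."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_partitions

now = datetime(2024, 1, 3, 12, 30, tzinfo=timezone.utc)
key = partition_key("tenant-1", "sensor-42", now)
print(key, "->", partition_for(key))   # tenant-1|sensor-42|2024-01-03 -> some partition id
```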
Key Takeaways
Partition Key is Critical
Choose partition key carefully—it determines data distribution, query efficiency, and hot spot risk.
Consistent Hashing Enables Scale
Virtual nodes provide even distribution and minimal data movement when nodes change.
Secondary Indexes Have Trade-offs
Local = fast writes, slow reads. Global = fast reads, eventually consistent writes.
Avoid Cross-Partition Operations
Design data model to keep related data together. Cross-partition = expensive.