Data Partitioning & Sharding

When data outgrows a single machine, you need to partition (shard) it across multiple nodes. This module covers the strategies, trade-offs, and production patterns for effective data distribution.

Think of partitioning like organizing a library: you could put all books on one giant shelf (single node), but eventually you run out of space and it takes too long to find anything. Instead, you split books across rooms: fiction in Room A, non-fiction in Room B, reference in Room C. The challenge is choosing the split so that most visitors only need to visit one room, and no single room is overwhelmed while others sit empty.
Module Duration: 10-14 hours
Key Topics: Hash Partitioning, Range Partitioning, Consistent Hashing, Rebalancing, Hot Spots
Interview Focus: Partition key selection, secondary indexes, cross-partition queries

Why Partition?

┌─────────────────────────────────────────────────────────────────────────────┐
│                    WHY PARTITIONING IS NECESSARY                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SINGLE NODE LIMITS:                                                        │
│  ───────────────────                                                        │
│  Storage:   1 server = 16 TB SSD (maybe 100 TB HDD)                        │
│  Memory:    1 server = 512 GB - 4 TB RAM                                   │
│  CPU:       1 server = 64-128 cores                                        │
│  Network:   1 server = 10-100 Gbps                                         │
│                                                                              │
│  REAL-WORLD SCALE:                                                          │
│  ─────────────────                                                          │
│  Facebook posts:     ~100 PB                                               │
│  Netflix catalog:    ~10 PB (with all encodings)                           │
│  Google search:      ~100 PB+ index                                        │
│  Twitter timeline:   ~100 TB of tweets/day                                 │
│                                                                              │
│  PARTITIONING BENEFITS:                                                     │
│  ──────────────────────                                                     │
│  ┌────────────────────────────────────────────────────────────────────┐    │
│  │ 1. STORAGE SCALING                                                 │    │
│  │    100 TB / 4 nodes = 25 TB per node ✓                            │    │
│  ├────────────────────────────────────────────────────────────────────┤    │
│  │ 2. QUERY THROUGHPUT                                                │    │
│  │    1M QPS / 10 partitions = 100K QPS per partition ✓              │    │
│  ├────────────────────────────────────────────────────────────────────┤    │
│  │ 3. WRITE THROUGHPUT                                                │    │
│  │    Writes distributed across partitions (if no hot spots)         │    │
│  ├────────────────────────────────────────────────────────────────────┤    │
│  │ 4. FAULT ISOLATION                                                 │    │
│  │    One partition failure doesn't affect others                    │    │
│  └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│  PARTITIONING CHALLENGES:                                                   │
│  ────────────────────────                                                   │
│  • Cross-partition queries (expensive)                                     │
│  • Cross-partition transactions (very expensive)                           │
│  • Rebalancing when adding/removing nodes                                  │
│  • Hot spots (uneven load distribution)                                    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Partitioning Strategies

Hash Partitioning

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HASH PARTITIONING                                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: partition = hash(key) mod N                                      │
│                                                                              │
│  EXAMPLE:                                                                   │
│  ─────────                                                                  │
│  Keys: user_123, user_456, user_789                                        │
│  Partitions: 4                                                             │
│                                                                              │
│  hash("user_123") = 7829461 mod 4 = 1 → Partition 1                        │
│  hash("user_456") = 2847193 mod 4 = 1 → Partition 1                        │
│  hash("user_789") = 9182736 mod 4 = 0 → Partition 0                        │
│                                                                              │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐   │
│  │ Partition 0   │ │ Partition 1   │ │ Partition 2   │ │ Partition 3   │   │
│  │               │ │               │ │               │ │               │   │
│  │ user_789      │ │ user_123      │ │ ...           │ │ ...           │   │
│  │ user_012      │ │ user_456      │ │               │ │               │   │
│  │ ...           │ │ ...           │ │               │ │               │   │
│  └───────────────┘ └───────────────┘ └───────────────┘ └───────────────┘   │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Even distribution (if good hash function)                               │
│  ✓ No hot spots from sequential keys                                       │
│  ✓ Simple to implement                                                     │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ Range queries hit ALL partitions (data scattered)                       │
│  ✗ Adding a node remaps most keys (~N/(N+1))                               │
│  ✗ Rebalancing is expensive                                                │
│                                                                              │
│  USED BY: Memcached (original), simple database sharding                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
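The `hash(key) mod N` rule above can be sketched in a few lines of Python. This is a minimal illustration rather than a production router: `stable_hash`, `partition_for`, and `moved_fraction` are hypothetical helper names, and MD5 is assumed only because it gives the same value in every process (Python's built-in `hash()` is salted per run for strings).

```python
import hashlib

def stable_hash(key: str) -> int:
    """Process-independent hash of a string key."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

def partition_for(key: str, num_partitions: int) -> int:
    """partition = hash(key) mod N"""
    return stable_hash(key) % num_partitions

def moved_fraction(keys, old_n: int, new_n: int) -> float:
    """Fraction of keys whose partition changes when N changes."""
    moved = sum(
        1 for k in keys
        if partition_for(k, old_n) != partition_for(k, new_n)
    )
    return moved / len(keys)

keys = [f"user_{i}" for i in range(10_000)]

# Distribution across 4 partitions should be roughly even.
counts = [0] * 4
for k in keys:
    counts[partition_for(k, 4)] += 1
print(counts)

# Growing from 4 to 5 partitions remaps roughly 4/5 of the keys.
print(f"{moved_fraction(keys, 4, 5):.0%} of keys moved")
```

The second print is the reason plain modulo hashing is painful to rebalance: going from N to N+1 partitions leaves only about 1/(N+1) of the keys in place.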

Range Partitioning

┌─────────────────────────────────────────────────────────────────────────────┐
│                    RANGE PARTITIONING                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Assign contiguous key ranges to partitions                       │
│                                                                              │
│  EXAMPLE - Alphabetical:                                                    │
│  ─────────────────────────                                                  │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐   │
│  │ Partition 0   │ │ Partition 1   │ │ Partition 2   │ │ Partition 3   │   │
│  │ Keys: A-F     │ │ Keys: G-L     │ │ Keys: M-R     │ │ Keys: S-Z     │   │
│  │               │ │               │ │               │ │               │   │
│  │ alice         │ │ george        │ │ mary          │ │ steve         │   │
│  │ bob           │ │ harry         │ │ nancy         │ │ tom           │   │
│  │ carol         │ │ iris          │ │ oliver        │ │ uma           │   │
│  └───────────────┘ └───────────────┘ └───────────────┘ └───────────────┘   │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Range queries efficient (scan one partition)                            │
│  ✓ Sorted data within partition                                            │
│  ✓ Rebalancing: just split/merge ranges                                    │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ HOT SPOTS! Sequential keys go to same partition                        │
│  ✗ Uneven distribution if data isn't uniform                              │
│  ✗ Manual tuning often needed                                              │
│                                                                              │
│  HOT SPOT EXAMPLE:                                                          │
│  ─────────────────                                                          │
│  Key = timestamp                                                           │
│  All current writes → one partition (current time range)                   │
│  That partition is overloaded while others idle                            │
│                                                                              │
│  USED BY: HBase, Bigtable, traditional RDBMS sharding                      │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
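A minimal sketch of range routing, assuming lowercase string keys and the same A-F / G-L / M-R / S-Z split as the diagram. `SPLIT_POINTS`, `partition_for`, and `partitions_for_range` are hypothetical names chosen for illustration.

```python
from bisect import bisect_right

# Lower bounds of partitions 1, 2, 3: partition 0 holds keys below "g".
SPLIT_POINTS = ["g", "m", "s"]

def partition_for(key: str) -> int:
    """Index of the range partition that owns `key`."""
    return bisect_right(SPLIT_POINTS, key.lower())

def partitions_for_range(start: str, end: str) -> list:
    """Partitions a range scan over [start, end] has to touch."""
    return list(range(partition_for(start), partition_for(end) + 1))

for name in ["alice", "george", "mary", "steve"]:
    print(name, "->", partition_for(name))

# A range scan only touches the adjacent partitions that overlap it.
print(partitions_for_range("carol", "harry"))  # [0, 1]

# Hot spot: with timestamp keys, every new write lands in the newest range.
TS_SPLITS = ["2024-02", "2024-03"]
writes = ["2024-03-15T10:00", "2024-03-15T10:01", "2024-03-15T10:02"]
hot_partitions = {bisect_right(TS_SPLITS, ts) for ts in writes}
print(hot_partitions)  # {2}
```

Rebalancing in this model means editing `SPLIT_POINTS` (splitting or merging a range), which is why range-partitioned stores can rebalance without rehashing.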

Compound Key Partitioning

┌─────────────────────────────────────────────────────────────────────────────┐
│                    COMPOUND KEY PARTITIONING                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Partition by first part, sort by second part                    │
│                                                                              │
│  Primary Key = (partition_key, clustering_key)                              │
│                                                                              │
│  EXAMPLE - Social Media Posts:                                              │
│  ──────────────────────────────                                              │
│  Key = (user_id, timestamp)                                                │
│                                                                              │
│  Partition by: user_id (hash)                                              │
│  Sorted by: timestamp (within partition)                                   │
│                                                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │ Partition: user_123                                                   │ │
│  │ ┌────────────────────────────────────────────────────────────────┐   │ │
│  │ │ timestamp: 2024-01-01 | post: "Happy New Year!"               │   │ │
│  │ │ timestamp: 2024-01-02 | post: "Back to work..."              │   │ │
│  │ │ timestamp: 2024-01-03 | post: "Coffee time ☕"                │   │ │
│  │ └────────────────────────────────────────────────────────────────┘   │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│  BENEFITS:                                                                  │
│  ─────────                                                                  │
│  ✓ Partition key provides distribution                                     │
│  ✓ Clustering key enables range queries within partition                   │
│  ✓ "Get all posts by user in time range" = single partition scan          │
│                                                                              │
│  CASSANDRA EXAMPLE:                                                         │
│  ──────────────────                                                         │
│  ```cql                                                                     │
│  CREATE TABLE posts (                                                       │
│      user_id UUID,                                                          │
│      timestamp TIMESTAMP,                                                   │
│      content TEXT,                                                          │
│      PRIMARY KEY ((user_id), timestamp)                                    │
│  ) WITH CLUSTERING ORDER BY (timestamp DESC);                              │
│                                                                              │
│  -- Efficient: single partition                                            │
│  SELECT * FROM posts WHERE user_id = ? AND timestamp > ?;                  │
│                                                                              │
│  -- Inefficient: scans all partitions                                      │
│  SELECT * FROM posts WHERE content = ? ALLOW FILTERING;                    │
│  ```                                                                        │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
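The same idea can be modelled in plain Python: hash the partition key to pick a partition, and keep rows sorted by the clustering key inside it. This is only a sketch under assumptions (four partitions, ISO date strings as timestamps, in-memory lists); `insert_post` and `posts_in_range` are hypothetical helpers, not a Cassandra API.

```python
import hashlib
from bisect import bisect_left, bisect_right
from collections import defaultdict

NUM_PARTITIONS = 4  # assumed cluster size for the sketch

# Each partition maps user_id -> (sorted timestamps, contents in same order).
partitions = [defaultdict(lambda: ([], [])) for _ in range(NUM_PARTITIONS)]

def partition_for(user_id: str) -> int:
    """Partition key (user_id) is hashed to choose the partition."""
    digest = hashlib.md5(user_id.encode()).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS

def insert_post(user_id: str, timestamp: str, content: str) -> None:
    """Insert while keeping rows ordered by the clustering key."""
    timestamps, contents = partitions[partition_for(user_id)][user_id]
    idx = bisect_right(timestamps, timestamp)
    timestamps.insert(idx, timestamp)
    contents.insert(idx, content)

def posts_in_range(user_id: str, start: str, end: str) -> list:
    """Range scan over one user's posts: touches a single partition."""
    timestamps, contents = partitions[partition_for(user_id)][user_id]
    lo = bisect_left(timestamps, start)
    hi = bisect_right(timestamps, end)
    return list(zip(timestamps[lo:hi], contents[lo:hi]))

insert_post("user_123", "2024-01-02", "Back to work...")
insert_post("user_123", "2024-01-01", "Happy New Year!")
insert_post("user_123", "2024-01-03", "Coffee time")

print(posts_in_range("user_123", "2024-01-02", "2024-01-03"))
```

A query that filters on `content` instead has no partition key to hash, so it would have to visit every entry in `partitions`.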

Consistent Hashing

Must-Know Algorithm: Consistent hashing is foundational for distributed caching, databases, and load balancing. Expect to explain and implement it in interviews.

Basic Consistent Hashing

┌─────────────────────────────────────────────────────────────────────────────┐
│                    CONSISTENT HASHING                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  PROBLEM WITH hash(key) mod N:                                              │
│  ─────────────────────────────                                              │
│  Adding/removing node → almost ALL keys rehash to different nodes!         │
│                                                                              │
│  Before: 3 nodes                 After: 4 nodes                             │
│  hash(k) mod 3 = ?               hash(k) mod 4 = ?                          │
│  Key 1: 0                        Key 1: 1 (moved!)                          │
│  Key 2: 1                        Key 2: 2 (moved!)                          │
│  Key 3: 2                        Key 3: 3 (moved!)                          │
│                                                                              │
│  CONSISTENT HASHING SOLUTION:                                               │
│  ────────────────────────────                                               │
│  Map both nodes and keys to a ring (0 to 2^32 - 1)                         │
│  Key goes to first node clockwise from its position                        │
│                                                                              │
│                    ┌─────────────────────────────────────┐                  │
│                    │           HASH RING                 │                  │
│                    │                                     │                  │
│                    │              0°                     │                  │
│                    │              ▲                      │                  │
│                    │         Node A │                    │                  │
│                    │              │                      │                  │
│                    │     270° ◄───┼───► 90°             │                  │
│                    │    Node D    │    Node B           │                  │
│                    │              │                      │                  │
│                    │              ▼                      │                  │
│                    │         Node C                      │                  │
│                    │             180°                    │                  │
│                    └─────────────────────────────────────┘                  │
│                                                                              │
│  Key at 45° → goes to Node B (first node clockwise)                        │
│  Key at 200° → goes to Node D (first node clockwise)                       │
│                                                                              │
│  ADDING A NODE:                                                             │
│  ──────────────                                                             │
│  Add Node E at 135° → only keys between B (90°) and E (135°) move          │
│  Other keys stay with their original nodes!                                │
│                                                                              │
│  ON AVERAGE: Only K/N keys need to move (K = keys, N = nodes)              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
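The ring in the diagram is small enough to simulate directly. The sketch below uses degrees instead of a 2^32 hash space and places one hypothetical key every 5°; it assumes a key sitting exactly on a node's position belongs to that node. `owner` is an illustrative helper name.

```python
from bisect import bisect_left

def owner(ring: dict, key_pos: float) -> str:
    """First node at or clockwise after key_pos, wrapping past 360°."""
    positions = sorted(ring)
    idx = bisect_left(positions, key_pos % 360)
    return ring[positions[idx % len(positions)]]

ring = {0: "A", 90: "B", 180: "C", 270: "D"}
keys = range(0, 360, 5)  # one hypothetical key every 5°

before = {k: owner(ring, k) for k in keys}
ring[135] = "E"          # add Node E at 135°
after = {k: owner(ring, k) for k in keys}

moved = {k: (before[k], after[k]) for k in keys if before[k] != after[k]}

print(owner(ring, 45), owner(ring, 200))  # B D
print(sorted(moved))        # [95, 100, 105, 110, 115, 120, 125, 130, 135]
print(set(moved.values()))  # {('C', 'E')}
```

Only the keys in the arc between Node B and the new Node E change owner, and all of them move from Node C to Node E. Every other key keeps its original node.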

Virtual Nodes

┌─────────────────────────────────────────────────────────────────────────────┐
│                    VIRTUAL NODES (VNODES)                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  PROBLEM: With few physical nodes, distribution can be uneven              │
│                                                                              │
│  Example with 3 nodes:                                                      │
│  Node A: covers 50% of ring                                                │
│  Node B: covers 30% of ring                                                │
│  Node C: covers 20% of ring                                                │
│  → Unbalanced!                                                             │
│                                                                              │
│  SOLUTION: Each physical node gets multiple virtual nodes                  │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                    RING WITH VNODES                                 │   │
│  │                                                                     │   │
│  │                         0°                                          │   │
│  │                    A1 ─── B1                                        │   │
│  │                   /         \                                       │   │
│  │                 C2           A2                                     │   │
│  │                /               \                                    │   │
│  │              B3                 C1                                  │   │
│  │                \               /                                    │   │
│  │                 A3           B2                                     │   │
│  │                   \         /                                       │   │
│  │                    C3 ─── A4                                        │   │
│  │                        180°                                         │   │
│  │                                                                     │   │
│  │  A1, A2, A3, A4 = Virtual nodes for physical Node A                │   │
│  │  B1, B2, B3 = Virtual nodes for physical Node B                    │   │
│  │  C1, C2, C3 = Virtual nodes for physical Node C                    │   │
│  │                                                                     │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  BENEFITS:                                                                  │
│  ─────────                                                                  │
│  ✓ Better load distribution (law of large numbers)                         │
│  ✓ Heterogeneous nodes: powerful node → more vnodes                        │
│  ✓ Smoother rebalancing when node added/removed                            │
│                                                                              │
│  TYPICAL VNODE COUNT:                                                       │
│  ────────────────────                                                       │
│  Cassandra default: 256 vnodes (16 since 4.0)                              │
│  DynamoDB: variable based on partition                                     │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
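To see the effect of vnode count on balance, one can measure how much of the ring each physical node owns. The sketch below assumes MD5-derived positions on a 2^32 ring and the `"{node}:{i}"` vnode naming convention; `ring_position` and `ring_shares` are hypothetical helpers. The exact percentages depend on the hash values, so none are quoted here.

```python
import hashlib

RING_SIZE = 2**32

def ring_position(label: str) -> int:
    """Position of a vnode label on the ring (0 to 2^32 - 1)."""
    return int(hashlib.md5(label.encode()).hexdigest(), 16) % RING_SIZE

def ring_shares(nodes, vnodes_per_node: int) -> dict:
    """Fraction of the ring owned by each physical node."""
    points = sorted(
        (ring_position(f"{node}:{i}"), node)
        for node in nodes
        for i in range(vnodes_per_node)
    )
    arcs = {node: 0 for node in nodes}
    prev = points[-1][0] - RING_SIZE  # wrap-around arc before the first point
    for pos, node in points:
        arcs[node] += pos - prev      # a vnode owns the arc that ends at it
        prev = pos
    return {node: arc / RING_SIZE for node, arc in arcs.items()}

for vnodes in (1, 10, 200):
    shares = ring_shares(["A", "B", "C"], vnodes)
    print(vnodes, {node: f"{share:.1%}" for node, share in shares.items()})
```

With one position per node the split is whatever three hash values happen to give. As the vnode count grows, each node's share tends toward 1/3. Giving a more powerful node a larger `vnodes_per_node` would be one way to model heterogeneous hardware.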

Implementation

import hashlib
from bisect import bisect_right

class ConsistentHash:
    """
    Consistent hashing with virtual nodes.
    Used by: Cassandra, DynamoDB, Memcached (with ketama), load balancers
    
    The key insight: instead of hash(key) % N (which reshuffles everything
    when N changes), we map both keys and nodes onto the same circular
    space. Adding a node only "steals" keys from its immediate neighbors
    on the ring, leaving all other assignments unchanged. Virtual nodes
    smooth out the distribution -- without them, 3 physical nodes might
    split the ring 60/30/10 instead of the desired 33/33/33.
    """
    
    def __init__(self, nodes: list = None, virtual_nodes: int = 150):
        # 150 vnodes per physical node is a good default for clusters
        # up to ~100 nodes. More vnodes = better balance but more memory
        # for the ring data structure and slower lookups.
        self.virtual_nodes = virtual_nodes
        self.ring = {}  # position -> node
        self.sorted_positions = []  # sorted list of positions
        
        if nodes:
            for node in nodes:
                self.add_node(node)
    
    def _hash(self, key: str) -> int:
        """Hash function that returns position on ring (0 to 2^32 - 1)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)
    
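    def add_node(self, node: str):
        """Add a node with its virtual nodes to the ring."""
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:{i}"
            position = self._hash(vnode_key)
            if position in self.ring:
                # Already occupied (node re-added, or a rare hash collision):
                # keep the existing owner and avoid duplicate positions.
                continue
            self.ring[position] = node
            self.sorted_positions.append(position)

        self.sorted_positions.sort()

    def remove_node(self, node: str):
        """Remove a node and all its virtual nodes from the ring."""
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:{i}"
            position = self._hash(vnode_key)
            # Only delete positions this node owns, so removing an unknown
            # node is a no-op instead of a KeyError.
            if self.ring.get(position) == node:
                del self.ring[position]

        # Rebuild once instead of calling list.remove() for every vnode.
        self.sorted_positions = sorted(self.ring)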
    
    def get_node(self, key: str) -> str:
        """Get the node responsible for a key."""
        if not self.ring:
            return None
        
        position = self._hash(key)
        
        # Find first node clockwise from position
        idx = bisect_right(self.sorted_positions, position)
        if idx == len(self.sorted_positions):
            idx = 0  # Wrap around
        
        return self.ring[self.sorted_positions[idx]]
    
    def get_nodes(self, key: str, n: int = 3) -> list:
        """Get N nodes for replication (walking clockwise)."""
        if not self.ring:
            return []
        
        position = self._hash(key)
        idx = bisect_right(self.sorted_positions, position)
        
        nodes = []
        seen = set()
        # Count distinct physical nodes once, not on every loop iteration.
        total_nodes = len(set(self.ring.values()))

        while len(nodes) < n and len(seen) < total_nodes:
            if idx >= len(self.sorted_positions):
                idx = 0
            
            node = self.ring[self.sorted_positions[idx]]
            if node not in seen:
                nodes.append(node)
                seen.add(node)
            idx += 1
        
        return nodes


# Usage example
ch = ConsistentHash(["node-1", "node-2", "node-3"], virtual_nodes=150)

# Get responsible node for keys (the result depends on the MD5 positions)
print(ch.get_node("user:123"))  # one of "node-1", "node-2", "node-3"
print(ch.get_node("user:456"))

# Get replica nodes (for replication factor 3)
print(ch.get_nodes("user:123", n=3))  # all three nodes, primary first

# Add a new node - minimal key movement
ch.add_node("node-4")
print(ch.get_node("user:123"))  # unchanged, or "node-4" if it took over this key

Secondary Indexes

Local Secondary Indexes

┌─────────────────────────────────────────────────────────────────────────────┐
│                    LOCAL SECONDARY INDEXES                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Each partition maintains its own index                           │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Partition 0                                                         │   │
│  │ ┌───────────────────────────────────────────────────────────────┐  │   │
│  │ │ Data: user_001 {name: "Alice", city: "NYC"}                  │  │   │
│  │ │       user_007 {name: "Bob", city: "LA"}                     │  │   │
│  │ │       user_042 {name: "Carol", city: "NYC"}                  │  │   │
│  │ ├───────────────────────────────────────────────────────────────┤  │   │
│  │ │ Local Index (city):                                          │  │   │
│  │ │   "NYC" → [user_001, user_042]                               │  │   │
│  │ │   "LA"  → [user_007]                                         │  │   │
│  │ └───────────────────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ Partition 1                                                         │   │
│  │ ┌───────────────────────────────────────────────────────────────┐  │   │
│  │ │ Data: user_003 {name: "Dave", city: "NYC"}                   │  │   │
│  │ │       user_019 {name: "Eve", city: "SF"}                     │  │   │
│  │ ├───────────────────────────────────────────────────────────────┤  │   │
│  │ │ Local Index (city):                                          │  │   │
│  │ │   "NYC" → [user_003]                                         │  │   │
│  │ │   "SF"  → [user_019]                                         │  │   │
│  │ └───────────────────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  QUERY: "Find all users in NYC"                                            │
│  ─────────────────────────────────                                          │
│  Must query ALL partitions! (scatter-gather)                               │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Fast writes (index update is local)                                     │
│  ✓ No distributed transaction for index update                             │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ Read queries by secondary key → scatter-gather                          │
│  ✗ Latency = slowest partition                                             │
│                                                                              │
│  USED BY: Cassandra, MongoDB, Elasticsearch                                │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
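A small in-memory model of the diagram shows why reads by secondary key become scatter-gather. This is a sketch, not any database's API: the `Partition` class and `find_users_by_city` function are hypothetical, and the loop over partitions stands in for what a real system would issue as parallel requests.

```python
from collections import defaultdict

class Partition:
    """One data partition with its own (local) index on `city`."""

    def __init__(self):
        self.rows = {}                      # user_id -> row
        self.city_index = defaultdict(set)  # city -> user_ids in THIS partition

    def put(self, user_id: str, row: dict) -> None:
        old = self.rows.get(user_id)
        if old is not None:
            # Drop the stale index entry when a row is overwritten.
            self.city_index[old["city"]].discard(user_id)
        self.rows[user_id] = row
        self.city_index[row["city"]].add(user_id)  # index update stays local

    def find_by_city(self, city: str) -> list:
        return sorted(self.city_index.get(city, ()))

def find_users_by_city(partitions, city: str) -> list:
    """Scatter-gather: ask every partition, then merge the results."""
    results = []
    for partition in partitions:
        results.extend(partition.find_by_city(city))
    return sorted(results)

p0, p1 = Partition(), Partition()
p0.put("user_001", {"name": "Alice", "city": "NYC"})
p0.put("user_007", {"name": "Bob", "city": "LA"})
p0.put("user_042", {"name": "Carol", "city": "NYC"})
p1.put("user_003", {"name": "Dave", "city": "NYC"})
p1.put("user_019", {"name": "Eve", "city": "SF"})

print(find_users_by_city([p0, p1], "NYC"))
# ['user_001', 'user_003', 'user_042']
```

Each `put` touches exactly one partition, which is the write-side benefit. The read has to wait for every partition to answer, so its latency is set by the slowest one.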

Global Secondary Indexes

┌─────────────────────────────────────────────────────────────────────────────┐
│                    GLOBAL SECONDARY INDEXES                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Index is partitioned by the indexed column                       │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ DATA PARTITIONS (by user_id)                                        │   │
│  │ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐  │   │
│  │ │ Partition 0       │ │ Partition 1       │ │ Partition 2       │  │   │
│  │ │ user_001 (NYC)    │ │ user_003 (NYC)    │ │ user_005 (SF)     │  │   │
│  │ │ user_007 (LA)     │ │ user_019 (SF)     │ │ user_023 (NYC)    │  │   │
│  │ └───────────────────┘ └───────────────────┘ └───────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ GLOBAL INDEX PARTITIONS (by city)                                   │   │
│  │ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐  │   │
│  │ │ Index Partition 0 │ │ Index Partition 1 │ │ Index Partition 2 │  │   │
│  │ │ Cities: A-H       │ │ Cities: I-M       │ │ Cities: N-Z       │  │   │
│  │ │                   │ │                   │ │                   │  │   │
│  │ │ (none)            │ │ LA → [user_007]   │ │ NYC → [user_001,  │  │   │
│  │ │                   │ │                   │ │        user_003,  │  │   │
│  │ │                   │ │                   │ │        user_023]  │  │   │
│  │ │                   │ │                   │ │ SF → [user_005,   │  │   │
│  │ │                   │ │                   │ │       user_019]   │  │   │
│  │ └───────────────────┘ └───────────────────┘ └───────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  QUERY: "Find all users in NYC"                                            │
│  ─────────────────────────────────                                          │
│  Go to index partition for "NYC" → single partition read!                  │
│                                                                              │
│  PROS:                                                                      │
│  ─────                                                                      │
│  ✓ Fast reads by secondary key (single partition)                          │
│                                                                              │
│  CONS:                                                                      │
│  ─────                                                                      │
│  ✗ Slower writes (each write also updates a remote index partition)         │
│  ✗ Index may be eventually consistent with data                            │
│                                                                              │
│  USED BY: DynamoDB (GSI)                                                    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
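
The global-index idea above can be sketched in memory as follows. This is a minimal illustration, not any database's API: the names `put_user`, `users_in_city`, and `stable_hash` are hypothetical, the index is hash-partitioned by city (the diagram uses ranges), and index maintenance is synchronous and ignores city changes to keep the example short.

```python
import zlib
from collections import defaultdict

NUM_DATA_PARTITIONS = 3
NUM_INDEX_PARTITIONS = 3

def stable_hash(value: str) -> int:
    """Deterministic hash (Python's built-in hash() is salted per process)."""
    return zlib.crc32(value.encode("utf-8"))

# Data partitions: user_id -> record, partitioned by user_id.
data_partitions = [dict() for _ in range(NUM_DATA_PARTITIONS)]
# Index partitions: city -> set of user_ids, partitioned by city.
index_partitions = [defaultdict(set) for _ in range(NUM_INDEX_PARTITIONS)]

def put_user(user_id: str, city: str) -> None:
    """Write the record, then update the index partition that owns the city."""
    data_partitions[stable_hash(user_id) % NUM_DATA_PARTITIONS][user_id] = {"city": city}
    # In a real system this second write is usually asynchronous, which is
    # why the index can briefly lag behind the data.
    index_partitions[stable_hash(city) % NUM_INDEX_PARTITIONS][city].add(user_id)

def users_in_city(city: str) -> set:
    """Read exactly one index partition, wherever the matching users live."""
    partition = index_partitions[stable_hash(city) % NUM_INDEX_PARTITIONS]
    return set(partition.get(city, set()))

for uid, city in [("user_001", "NYC"), ("user_003", "NYC"), ("user_005", "SF"),
                  ("user_007", "LA"), ("user_019", "SF"), ("user_023", "NYC")]:
    put_user(uid, city)

print(sorted(users_in_city("NYC")))  # ['user_001', 'user_003', 'user_023']
```

Each `put_user` call touches two partitions (one data, one index), which is the write cost listed under CONS.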

Rebalancing Strategies

Fixed Number of Partitions

┌─────────────────────────────────────────────────────────────────────────────┐
│                    FIXED PARTITION STRATEGY                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Create more partitions than nodes, assign partitions to nodes   │
│                                                                              │
│  INITIAL: 4 nodes, 16 partitions                                           │
│  ─────────────────────────────────                                          │
│  Node 1: P0, P1, P2, P3                                                    │
│  Node 2: P4, P5, P6, P7                                                    │
│  Node 3: P8, P9, P10, P11                                                  │
│  Node 4: P12, P13, P14, P15                                                │
│                                                                              │
│  ADD NODE 5:                                                                │
│  ────────────                                                               │
│  Move some partitions to new node                                          │
│  Node 1: P0, P1, P2         (gave up P3)                                   │
│  Node 2: P4, P5, P6         (gave up P7)                                   │
│  Node 3: P8, P9, P10        (gave up P11)                                  │
│  Node 4: P12, P13, P14      (gave up P15)                                  │
│  Node 5: P3, P7, P11, P15   (received)                                     │
│                                                                              │
│  KEY INSIGHT:                                                               │
│  ────────────                                                               │
│  Partitions don't change, only their assignment to nodes                   │
│  Keys never change partition; whole partitions move between nodes          │
│                                                                              │
│  CHOOSING PARTITION COUNT:                                                  │
│  ─────────────────────────                                                  │
│  Too few:  Large partitions, hard to rebalance evenly                      │
│  Too many: Overhead for each partition (metadata, connections)             │
│                                                                              │
│  Rule of thumb: 10-100 partitions per node                                 │
│  Kafka: __consumer_offsets defaults to 50 partitions                        │
│  Cassandra/Riak: ~256 vnodes per node                                      │
│                                                                              │
│  USED BY: Riak, Elasticsearch, Couchbase, Voldemort                        │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
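
A minimal sketch of fixed-partition rebalancing, assuming a simple "steal from the busiest node" policy (the policy and the names `partition_for` and `add_node` are illustrative, not taken from any of the systems listed). Unlike the diagram, the new node takes only its fair share of 16 // 5 = 3 partitions.

```python
import zlib

NUM_PARTITIONS = 16

def partition_for(key: str) -> int:
    """Key -> partition never changes, because NUM_PARTITIONS is fixed."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

def add_node(assignment: dict, new_node: str) -> dict:
    """Return a new partition -> node map in which new_node has taken
    partitions from the most loaded nodes until it holds its fair share."""
    assignment = dict(assignment)
    nodes = set(assignment.values()) | {new_node}
    fair_share = len(assignment) // len(nodes)
    for _ in range(fair_share):
        # Group partitions by owner and take one from the busiest donor.
        load = {}
        for partition, node in assignment.items():
            load.setdefault(node, []).append(partition)
        donor = max((n for n in load if n != new_node), key=lambda n: len(load[n]))
        assignment[max(load[donor])] = new_node
    return assignment

before = {p: f"node{p // 4 + 1}" for p in range(NUM_PARTITIONS)}
after = add_node(before, "node5")
moved = sorted(p for p in before if before[p] != after[p])
print(moved)  # only these partitions are copied; all others stay put
```

Only the routing map changes for the moved partitions; `partition_for` gives the same answer before and after the new node joins.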

Dynamic Partitioning

┌─────────────────────────────────────────────────────────────────────────────┐
│                    DYNAMIC PARTITIONING                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STRATEGY: Split/merge partitions based on size                            │
│                                                                              │
│  INITIAL: 1 partition                                                       │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      Partition 0                                    │   │
│  │                      (all data)                                     │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  SPLIT when partition exceeds threshold (e.g., 10 GB):                     │
│  ┌────────────────────────────┐ ┌────────────────────────────┐            │
│  │       Partition 0          │ │       Partition 1          │            │
│  │      (keys A-M)            │ │      (keys N-Z)            │            │
│  └────────────────────────────┘ └────────────────────────────┘            │
│                                                                              │
│  CONTINUE SPLITTING as data grows:                                         │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐        │
│  │   P0     │ │   P1     │ │   P2     │ │   P3     │ │   P4     │        │
│  │  A-D    │ │  E-G    │ │  H-M    │ │  N-R    │ │  S-Z    │        │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘        │
│                                                                              │
│  MERGE when partition falls below threshold:                               │
│  If P0 and P1 both < 1 GB → merge into P0 (A-G)                           │
│                                                                              │
│  PRE-SPLITTING:                                                            │
│  ──────────────                                                            │
│  Problem: New database starts with 1 partition → hot spot                 │
│  Solution: Pre-create partitions based on expected key range              │
│                                                                              │
│  USED BY: HBase, MongoDB (chunks), RethinkDB                               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
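
A toy version of split-on-threshold, assuming a key-count limit stands in for the size threshold and that partitions split at their median key. `RangePartitionedStore` is a hypothetical name; merging is omitted for brevity.

```python
import bisect

class RangePartitionedStore:
    """Range-partitioned key set that splits a partition at its median key
    once it holds more than max_keys keys."""

    def __init__(self, max_keys: int = 4):
        self.max_keys = max_keys
        # Partition i covers keys in [lower_bounds[i], lower_bounds[i + 1]).
        self.lower_bounds = [""]
        self.partitions = [[]]  # each partition is a sorted list of keys

    def _index_for(self, key: str) -> int:
        return bisect.bisect_right(self.lower_bounds, key) - 1

    def insert(self, key: str) -> None:
        i = self._index_for(key)
        part = self.partitions[i]
        pos = bisect.bisect_left(part, key)
        if pos < len(part) and part[pos] == key:
            return  # already present
        part.insert(pos, key)
        if len(part) > self.max_keys:
            self._split(i)

    def _split(self, i: int) -> None:
        part = self.partitions[i]
        mid = len(part) // 2
        left, right = part[:mid], part[mid:]
        self.partitions[i:i + 1] = [left, right]
        self.lower_bounds.insert(i + 1, right[0])

store = RangePartitionedStore(max_keys=4)
for key in ["alice", "bob", "carol", "dave", "erin", "frank", "grace", "heidi"]:
    store.insert(key)

print(store.lower_bounds)  # ['', 'carol', 'erin']
print(store.partitions)
```

Because the keys arrive in sorted order, every insert lands in the last partition, which is the same hot-spot pattern that pre-splitting is meant to avoid.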

Advanced: Geo-Partitioning & Data Sovereignty

At Staff/Principal level, partitioning isn’t just about performance—it’s about compliance and locality. As laws like GDPR (Europe) and CCPA (California) become stricter, where data lives physically becomes a legal requirement.

1. Locality-Aware Partitioning

In a global database (e.g., CockroachDB, Spanner, or YugabyteDB), you can pin specific partitions to specific geographic regions.
  • The Goal: Keep data close to the user to minimize speed-of-light latency and comply with data residency laws.
  • The Mechanism: Using “Placement Policies” or “Partitioning Tags.”
-- CockroachDB example: pinning data to regions.
-- Assumes `region` is the first column of the primary key (required for
-- PARTITION BY LIST) and that nodes were started with a matching --locality.
ALTER TABLE users
  PARTITION BY LIST (region) (
    PARTITION europe VALUES IN ('eu-west-1', 'eu-central-1'),
    PARTITION us_east VALUES IN ('us-east-1', 'us-east-2'),
    PARTITION asia VALUES IN ('ap-northeast-1')
  );

-- Pin the europe partition to nodes whose locality is region=europe
ALTER PARTITION europe OF INDEX users@users_pkey
  CONFIGURE ZONE USING constraints = '[+region=europe]';

2. Follow-the-Workload (Dynamic Migration)

Sophisticated systems can dynamically move partitions based on the time of day or access patterns.
  • Daylight Shifting: Move active user data to the “waking” hemisphere to ensure local low-latency access.
  • Access Locality: If a user from Germany suddenly starts accessing a “US East” partition frequently, the system can migrate that partition (or a replica) to “EU West” automatically.

3. Data Sovereignty Challenges

When data is pinned to a region, Cross-Region Operations become extremely complex:
  • Global Joins: Joining a “pinned” German user with a “pinned” Japanese order requires cross-continental network hops (200ms+).
  • Global Secondary Indexes: If the index is global, writing to the German partition might require updating an index node in the US, breaking sovereignty if the index contains PII (Personally Identifiable Information).
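| Strategy          | Performance         | Compliance                      | Cost                         |
|-------------------|---------------------|---------------------------------|------------------------------|
| Global Sharding   | High (Average)      | Poor (Data can be anywhere)     | Low                          |
| Regional Sharding | Best (Local access) | Strict (Data stays in region)   | High (Redundancy per region) |
| Geo-Replication   | Good (Read local)   | Medium (Copies exist everywhere)| Medium                       |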
Staff Tip: When designing for Global Scale, “Data Residency” is often a harder constraint than “Queries Per Second.” Always ask: “Does this record have a legal right to leave this continent?”
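
One way to make the residency question enforceable is a placement check that every write, replica, and index update passes through. The sketch below is illustrative only: the policy table, `Record`, `check_placement`, and `choose_region` are hypothetical names, and the region identifiers are borrowed from the partitioning example above.

```python
from dataclasses import dataclass

# Hypothetical policy table: jurisdiction -> regions allowed to store its data.
ALLOWED_REGIONS = {
    "EU": {"eu-west-1", "eu-central-1"},
    "US": {"us-east-1", "us-east-2"},
    "JP": {"ap-northeast-1"},
}

class ResidencyViolation(Exception):
    """Raised when a placement would move data outside its allowed regions."""

@dataclass(frozen=True)
class Record:
    user_id: str
    jurisdiction: str

def check_placement(record: Record, target_region: str) -> None:
    """Reject any write, replica, or index entry placed outside the policy."""
    if target_region not in ALLOWED_REGIONS[record.jurisdiction]:
        raise ResidencyViolation(
            f"{record.user_id} ({record.jurisdiction}) may not be stored in {target_region}"
        )

def choose_region(record: Record, nearest_region: str) -> str:
    """Prefer the nearest region, but only if the policy allows it."""
    allowed = ALLOWED_REGIONS[record.jurisdiction]
    return nearest_region if nearest_region in allowed else min(allowed)

german_user = Record("user_42", "EU")
print(choose_region(german_user, "us-east-1"))  # eu-central-1
```

The same check applies to global secondary indexes: an index entry containing PII is a copy of the data and needs the same placement rule.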

Hot Spots and Skew

Identifying Hot Spots

┌─────────────────────────────────────────────────────────────────────────────┐
│                    HOT SPOT SCENARIOS                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  1. CELEBRITY PROBLEM                                                       │
│     ─────────────────                                                       │
│     User ID = partition key                                                │
│     Justin Bieber has 100M followers                                       │
│     All reads/writes to his partition!                                     │
│                                                                              │
│     Solutions:                                                             │
│     - Add random suffix: bieber_1, bieber_2, ... bieber_100               │
│     - Spread load across 100 partitions                                   │
│     - Application aggregates results                                      │
│                                                                              │
│  2. TIME-SERIES PROBLEM                                                    │
│     ──────────────────                                                     │
│     Partition key = date                                                   │
│     All writes go to "today" partition                                    │
│                                                                              │
│     Solutions:                                                             │
│     - Compound key: (sensor_id, date)                                     │
│     - Spread writes across sensors first                                  │
│                                                                              │
│  3. SEASONAL TRAFFIC                                                       │
│     ────────────────                                                        │
│     Black Friday: one product partition gets 100x traffic                 │
│                                                                              │
│     Solutions:                                                             │
│     - Predictive scaling                                                  │
│     - Pre-warm caches                                                     │
│     - Temporary key salting                                               │
│                                                                              │
│  MONITORING FOR HOT SPOTS:                                                 │
│  ─────────────────────────                                                  │
│  • QPS per partition                                                       │
│  • Latency percentiles per partition                                       │
│  • Disk/memory usage per partition                                         │
│  • Alerts when variance exceeds threshold                                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
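
The "alert when variance exceeds threshold" check can be as simple as comparing each partition's QPS to a baseline. This sketch assumes the median as the baseline (so a single hot partition does not inflate it) and a factor of 5; both are illustrative choices, and `find_hot_partitions` is a hypothetical helper.

```python
from statistics import median

def find_hot_partitions(qps_by_partition: dict, factor: float = 5.0) -> list:
    """Return partition ids whose QPS exceeds factor x the median QPS."""
    if not qps_by_partition:
        return []
    baseline = median(qps_by_partition.values())
    return sorted(p for p, qps in qps_by_partition.items() if qps > factor * baseline)

sample = {"p0": 120, "p1": 95, "p2": 110, "p3": 4200}
print(find_hot_partitions(sample))  # ['p3']
```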

Hot Spot Mitigation

# Hot spot mitigation strategies

import random
import time
from collections import defaultdict

class HotKeySharding:
    """
    Spread hot keys across multiple sub-partitions.
    
    Real-world analogy: When a celebrity tweets, millions of users request
    that tweet simultaneously. If all requests hit one partition, it melts.
    This is like a bakery that normally serves 50 customers/hour suddenly
    facing 50,000. The solution: open 100 temporary counters (sub-shards),
    each serving from their own copy of the menu.
    
    Large social platforms use similar techniques for celebrity accounts;
    Twitter (now X), for example, has described fanning out tweets on write
    for most accounts while merging celebrity tweets in at read time.
    """
    
    def __init__(self, hot_keys: set, shard_count: int = 100):
        self.hot_keys = hot_keys  # Typically maintained by a background
                                   # job that monitors QPS per key
        self.shard_count = shard_count
    
    def get_partition_key(self, key: str) -> str:
        """
        For hot keys, add a random shard suffix to spread writes.
        For normal keys, use as-is to keep single-partition lookups fast.
        
        Trade-off: writes spread across shard_count sub-partitions, but reads
        now require scatter-gather across all of them. Only shard keys you
        KNOW are hot.
        """
        if key in self.hot_keys:
            shard = random.randrange(self.shard_count)
            return f"{key}:shard_{shard}"
        return key
    
    def get_all_shards(self, key: str) -> list:
        """
        For reading, must query all shards of a hot key.
        """
        if key in self.hot_keys:
            return [f"{key}:shard_{i}" for i in range(self.shard_count)]
        return [key]


class WriteBuffer:
    """
    Buffer writes per key and flush them in batches to reduce hot spot pressure.

    `db` is any client exposing an async batch_write(key, values) method.
    Note: a key is only flushed when another write for it arrives; a
    production version would also run a periodic background flush.
    """

    def __init__(self, db, flush_interval: float = 1.0, max_buffer_size: int = 1000):
        self.db = db
        self.buffer = defaultdict(list)
        self.flush_interval = flush_interval
        self.max_buffer_size = max_buffer_size
        # Track the last flush per key so a busy key cannot reset the
        # timer for every other key.
        self.last_flush = {}

    async def write(self, key: str, value: object):
        """Buffer a write and flush the key if it is full or stale."""
        self.last_flush.setdefault(key, time.monotonic())
        self.buffer[key].append(value)

        if self._should_flush(key):
            await self._flush_key(key)

    def _should_flush(self, key: str) -> bool:
        buffer_full = len(self.buffer[key]) >= self.max_buffer_size
        time_exceeded = time.monotonic() - self.last_flush[key] > self.flush_interval
        return buffer_full or time_exceeded

    async def _flush_key(self, key: str):
        """Batch write buffered values."""
        values = self.buffer.pop(key, [])
        self.last_flush[key] = time.monotonic()
        if values:
            # Single batch write instead of many individual writes
            await self.db.batch_write(key, values)

Cross-Partition Operations

Scatter-Gather Pattern

┌─────────────────────────────────────────────────────────────────────────────┐
│                    SCATTER-GATHER PATTERN                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  For queries that can't be answered by a single partition                   │
│                                                                              │
│  QUERY: "Count all users with age > 30"                                    │
│                                                                              │
│              ┌─────────────────────────────────────────┐                    │
│              │           COORDINATOR                   │                    │
│              └────────────────┬────────────────────────┘                    │
│                               │                                             │
│              ┌────────────────┼────────────────┐                            │
│              │                │                │                            │
│        SCATTER               SCATTER         SCATTER                        │
│              │                │                │                            │
│              ▼                ▼                ▼                            │
│       ┌──────────┐     ┌──────────┐     ┌──────────┐                       │
│       │ Part. 0  │     │ Part. 1  │     │ Part. 2  │                       │
│       │          │     │          │     │          │                       │
│       │ count=42 │     │ count=37 │     │ count=55 │                       │
│       └────┬─────┘     └────┬─────┘     └────┬─────┘                       │
│            │                │                │                              │
│            └────────────────┼────────────────┘                              │
│                      GATHER │                                               │
│                             ▼                                               │
│              ┌─────────────────────────────────────────┐                    │
│              │  COORDINATOR: 42 + 37 + 55 = 134       │                    │
│              └─────────────────────────────────────────┘                    │
│                                                                              │
│  LATENCY: Max of all partition responses                                   │
│  AVAILABILITY: Requires all partitions to be up                            │
│                                                                              │
│  OPTIMIZATIONS:                                                            │
│  ──────────────                                                            │
│  • Timeout and return partial results                                      │
│  • Parallel execution with deadline                                        │
│  • Result streaming (don't wait for all)                                   │
│  • Predicate pushdown (filter at partition)                                │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
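
A small asyncio sketch of scatter-gather with a deadline and partial results. The partition call is simulated with `asyncio.sleep`; `count_on_partition` and `scatter_gather` are hypothetical names, and a real coordinator would issue network requests instead.

```python
import asyncio

async def count_on_partition(delay: float, count: int) -> int:
    """Stand-in for a remote call; the filter (age > 30) runs on the partition."""
    await asyncio.sleep(delay)
    return count

async def scatter_gather(calls: dict, deadline: float):
    """Query all partitions in parallel.

    Returns (total, ids of partitions that failed or missed the deadline).
    """
    tasks = {pid: asyncio.create_task(coro) for pid, coro in calls.items()}
    done, pending = await asyncio.wait(list(tasks.values()), timeout=deadline)
    for task in pending:
        task.cancel()
    # Let cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    ok = [t for t in done if t.exception() is None]
    total = sum(t.result() for t in ok)
    missing = sorted(pid for pid, t in tasks.items() if t not in ok)
    return total, missing

async def main():
    calls = {
        0: count_on_partition(0.01, 42),
        1: count_on_partition(0.02, 37),
        2: count_on_partition(5.0, 55),  # slow partition misses the deadline
    }
    return await scatter_gather(calls, deadline=0.5)

total, missing = asyncio.run(main())
print(total, missing)  # 79 [2]
```

Returning the list of missing partitions lets the caller decide whether a partial count is acceptable or the query should be retried.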

Cross-Partition Transactions

┌─────────────────────────────────────────────────────────────────────────────┐
│                    CROSS-PARTITION TRANSACTIONS                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SCENARIO: Transfer money between users on different partitions            │
│                                                                              │
│  User A (Partition 0): balance = 100                                       │
│  User B (Partition 1): balance = 50                                        │
│  Transfer: A → B, amount = 30                                              │
│                                                                              │
│  APPROACH 1: Two-Phase Commit (2PC)                                        │
│  ─────────────────────────────────                                          │
│  Coordinator ──► Partition 0: PREPARE (A.balance -= 30)                    │
│              └─► Partition 1: PREPARE (B.balance += 30)                    │
│                                                                              │
│  Both respond OK → COMMIT                                                  │
│  Any fails → ABORT                                                         │
│                                                                              │
│  Problems: Blocking, coordinator failure                                   │
│                                                                              │
│  APPROACH 2: Saga Pattern                                                  │
│  ────────────────────────                                                   │
│  Step 1: Debit A (Partition 0)                                            │
│  Step 2: Credit B (Partition 1)                                           │
│  If Step 2 fails: Compensate by crediting A                               │
│                                                                              │
│  Problems: Temporary inconsistency, complex compensation                   │
│                                                                              │
│  APPROACH 3: Avoid Cross-Partition (Best!)                                 │
│  ──────────────────────────────────────────                                 │
│  - Design partition key to keep related data together                      │
│  - Denormalize to avoid joins                                              │
│  - Use outbox pattern for eventual consistency                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
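
The saga in Approach 2 can be sketched with two in-memory partitions. This is a simplified illustration: `AccountPartition` and `transfer_saga` are hypothetical, each method stands in for a local transaction, and a real saga would persist its progress so the compensation can be retried if it fails too.

```python
class PartitionUnavailable(Exception):
    pass

class AccountPartition:
    """Toy partition holding balances; each method is one local transaction."""

    def __init__(self, balances: dict):
        self.balances = dict(balances)
        self.available = True

    def debit(self, user: str, amount: int) -> None:
        if not self.available:
            raise PartitionUnavailable(user)
        if self.balances[user] < amount:
            raise ValueError("insufficient funds")
        self.balances[user] -= amount

    def credit(self, user: str, amount: int) -> None:
        if not self.available:
            raise PartitionUnavailable(user)
        self.balances[user] += amount

def transfer_saga(src, src_user, dst, dst_user, amount) -> bool:
    src.debit(src_user, amount)        # Step 1: debit A
    try:
        dst.credit(dst_user, amount)   # Step 2: credit B
    except PartitionUnavailable:
        src.credit(src_user, amount)   # Compensation: credit A back
        return False
    return True

p0 = AccountPartition({"A": 100})
p1 = AccountPartition({"B": 50})
first = transfer_saga(p0, "A", p1, "B", 30)
p1.available = False                   # simulate Partition 1 going down
second = transfer_saga(p0, "A", p1, "B", 30)
print(first, second, p0.balances, p1.balances)  # True False {'A': 70} {'B': 80}
```

Between Step 1 and the compensation, A's balance is temporarily 40, which is the "temporary inconsistency" listed as a problem.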

Interview Practice

Question: How would you partition Twitter’s tweet storage?
Requirements Analysis:
  • Queries: Get user’s tweets, home timeline, search
  • Scale: 500M tweets/day, 200M users
Approach:
Option A: Partition by user_id
✓ "Get user's tweets" = single partition
✗ "Home timeline" = scatter-gather (follow list)

Option B: Partition by tweet_id
✓ Good distribution (random IDs)
✗ "Get user's tweets" = scatter-gather

Recommended: Hybrid
- Tweets table: partition by user_id
- Timeline table: partition by viewer_id
  (pre-materialized, fan-out on write)
- Search: separate inverted index
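
A toy version of the hybrid layout, assuming an in-memory follow graph and four partitions. All names (`post_tweet`, `user_tweets`, `home_timeline`) are illustrative; the point is that both common reads touch exactly one partition.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4

def partition_for(user_id: str) -> int:
    return zlib.crc32(user_id.encode("utf-8")) % NUM_PARTITIONS

# Tweets table: partitioned by author. Timeline table: partitioned by viewer.
tweets = [defaultdict(list) for _ in range(NUM_PARTITIONS)]
timelines = [defaultdict(list) for _ in range(NUM_PARTITIONS)]
followers = {"alice": ["bob", "carol"], "bob": ["carol"]}  # sample follow graph

def post_tweet(author: str, text: str) -> None:
    tweets[partition_for(author)][author].append(text)
    # Fan-out on write: one timeline write per follower.
    for follower in followers.get(author, []):
        timelines[partition_for(follower)][follower].append((author, text))

def user_tweets(author: str) -> list:
    """Single-partition read on the tweets table."""
    return list(tweets[partition_for(author)].get(author, []))

def home_timeline(viewer: str) -> list:
    """Single-partition read on the pre-materialized timeline."""
    return list(timelines[partition_for(viewer)].get(viewer, []))

post_tweet("alice", "hello")
post_tweet("bob", "hi alice")
print(home_timeline("carol"))  # [('alice', 'hello'), ('bob', 'hi alice')]
```

The write cost grows with follower count, which is why accounts with very large followings are usually handled separately.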
Question: A celebrity with 100M followers posts. How do you handle the hot spot?
Solution:
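# Helper names below (get_users_above_follower_threshold, parallel_query,
# merge_and_sort, cache) are placeholders for your own infrastructure.
import zlib

# 1. Identify hot users dynamically
hot_users = get_users_above_follower_threshold(1_000_000)

# 2. Shard their content
def get_post_partition(user_id: str, post_id: str) -> str:
    if user_id in hot_users:
        # Spread across 100 sub-partitions. Use a stable hash: Python's
        # built-in hash() is salted per process, so routers would disagree.
        shard = zlib.crc32(post_id.encode("utf-8")) % 100
        return f"{user_id}:shard_{shard}"
    return user_id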

# 3. For reads, query all shards and merge
def get_celebrity_posts(user_id: str) -> list:
    shards = [f"{user_id}:shard_{i}" for i in range(100)]
    results = parallel_query(shards)
    return merge_and_sort(results)

# 4. Use caching aggressively for celebrities
@cache(ttl=60)
def get_celebrity_posts_cached(user_id: str):
    return get_celebrity_posts(user_id)
Question: How do you rebalance partitions without downtime?
Process:
1. PREPARE
   - Identify source and target nodes
   - Calculate data to move
   - Estimate time required

2. COPY (background)
   - Stream data from source to target
   - Source still serves reads/writes
   - Track "high watermark" of copied data

3. CATCH UP
   - Replay changes since copy started
   - Use change log/WAL
   - Repeat until caught up

4. SWITCH
   - Brief pause (or use dual-write)
   - Update routing table
   - New requests go to target
   
5. CLEANUP
   - Remove data from source
   - Verify target is healthy

Key: Never stop serving requests!
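
The copy, catch-up, and switch steps can be simulated in a single process. This is a sketch under simplifying assumptions: writes that arrive during the copy are passed in as a list, deletes are ignored, and `Node` and `migrate_partition` are hypothetical names.

```python
class Node:
    def __init__(self, name: str):
        self.name = name
        self.data = {}

def migrate_partition(partition_id, source, target, routing, writes_during_copy):
    """Move one partition from source to target while source keeps serving."""
    # 2. COPY: take a snapshot; the source keeps accepting writes meanwhile.
    snapshot = dict(source.data)
    change_log = []
    for key, value in writes_during_copy:
        source.data[key] = value
        change_log.append((key, value))
    target.data.update(snapshot)

    # 3. CATCH UP: replay changes made after the snapshot was taken.
    for key, value in change_log:
        target.data[key] = value

    # 4. SWITCH: only flip routing once the target has everything.
    if target.data != source.data:
        raise RuntimeError("target is not caught up; keep replaying")
    routing[partition_id] = target.name

    # 5. CLEANUP: the source copy is no longer needed.
    source.data.clear()

node_a, node_b = Node("node-a"), Node("node-b")
node_a.data = {"k1": "v1", "k2": "v2"}
routing = {"P3": "node-a"}
migrate_partition("P3", node_a, node_b, routing,
                  writes_during_copy=[("k2", "v2b"), ("k3", "v3")])
print(routing, node_b.data)
```

The write to `k2` made during the copy survives because the change log is replayed on top of the snapshot before routing changes.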

Advanced Design Scenarios

Scenario 1: Designing Partitioning for an E‑Commerce Platform

You are designing the data model and partitioning for a large e-commerce platform (catalog, carts, orders, inventory, users, payments). Key workloads:
  • Product browsing and search
  • User-specific operations (cart, wishlist, recommendations)
  • Order placement and history
  • Inventory management per warehouse/region
Partitioning strategy:
  • Users, carts, wishlists: Partition by user_id (hash or consistent hash)
    • “Get cart” and “get wishlist” → single partition
    • Cross-user queries (analytics) use separate OLAP or stream to a warehouse
  • Orders: Partition by user_id or order_id (with correlation to user)
    • Materialize read models for “orders by user” and “orders by status” using CQRS/Event Sourcing
  • Inventory: Partition by (warehouse_id, product_id)
    • Allows local, warehouse-level operations and avoids cross-partition inventory transactions
  • Search: Separate inverted index service (Elasticsearch / OpenSearch) with its own sharding
Trade-offs:
  • Many OLTP queries become single-partition and fast
  • Cross-entity queries (“top selling products globally”) handled via denormalized tables or separate systems
  • Consistency for cross-partition operations handled via Sagas/2PC as needed

Scenario 2: Multi-Tenant SaaS Partitioning

You are building a multi-tenant SaaS where each tenant has its own users, projects, and data. Goals:
  • Strong data isolation between tenants
  • Ability to move tenants between clusters or regions
  • Ability to put large tenants on dedicated resources
Partitioning options:
  • Database-per-tenant:
    • Pros: Strong isolation, easy per-tenant backup/restore
    • Cons: Operational overhead becomes heavy at 10k+ tenants (schema migrations, connection pools)
  • Shared database, tenant_id in every row:
    • Pros: Easier to operate at scale
    • Cons: Harder to move tenants, noisy neighbor effects
  • Hybrid:
    • Small tenants: shared database with partitioning by tenant_id
    • Large tenants: own database or shard, with routing table
Implementation pattern:
  • Maintain a Tenant Catalog mapping tenant_id → {cluster, database, partition_key}
  • Route each request through a Tenant Router that picks the correct connection before query execution
  • For large tenants, use more partitions or dedicated clusters and update catalog accordingly
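
A minimal sketch of the catalog-plus-router pattern, assuming the catalog fits in memory. `Placement` and `TenantRouter` are hypothetical names; in practice the catalog would live in a small, highly available store and be cached by the router.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Placement:
    cluster: str
    database: str
    partition_key: str

class TenantRouter:
    """Looks up where a tenant lives before any query is executed."""

    def __init__(self, catalog: dict):
        self._catalog = dict(catalog)

    def route(self, tenant_id: str) -> Placement:
        try:
            return self._catalog[tenant_id]
        except KeyError:
            raise LookupError(f"unknown tenant: {tenant_id}") from None

    def move_tenant(self, tenant_id: str, new_placement: Placement) -> None:
        """Repoint a tenant after its data has been migrated."""
        self._catalog[tenant_id] = new_placement

shared = Placement("cluster-1", "shared_db", "tenant_id")
router = TenantRouter({"acme": shared, "tiny-co": shared})

# A large tenant graduates to dedicated resources: only the catalog changes.
router.move_tenant("acme", Placement("cluster-7", "acme_db", "project_id"))
print(router.route("acme").cluster, router.route("tiny-co").cluster)  # cluster-7 cluster-1
```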

Scenario 3: Time-Series Platform (Metrics/Logs)

You are designing a time-series platform that ingests telemetry, metrics, or logs from millions of devices. Characteristics:
  • Very high write throughput (append-only)
  • Queries are often time-bounded and per device / per stream
  • Data retention and tiering (hot vs cold storage)
Partitioning strategy:
  • Primary key: (tenant_id, stream_id, time_bucket)
  • Partition by hash(tenant_id, stream_id); time is part of clustering key
  • Create time buckets (e.g., daily or hourly) to allow range scans and efficient TTL/compaction
Hot partitions problem:
  • If you partition by raw timestamp, “now” is a hot partition
  • Instead, spread by (tenant_id, stream_id) and keep time in the clustering key
Storage tiering:
  • Recent buckets stored on fast nodes (SSD, high IOPS)
  • Older buckets moved to cheaper storage (object store) with precomputed aggregates
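
The key layout above can be sketched as follows, assuming daily buckets in UTC and eight partitions (both arbitrary choices for illustration; `partition_for`, `time_bucket`, and `row_key` are hypothetical helpers).

```python
import zlib
from datetime import datetime, timezone

NUM_PARTITIONS = 8

def partition_for(tenant_id: str, stream_id: str) -> int:
    """Partition by (tenant_id, stream_id) only, so "now" is never one hot partition."""
    return zlib.crc32(f"{tenant_id}/{stream_id}".encode("utf-8")) % NUM_PARTITIONS

def time_bucket(ts: datetime) -> str:
    """Daily bucket in UTC; whole buckets can be expired or moved to cold storage."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%d")

def row_key(tenant_id: str, stream_id: str, ts: datetime) -> tuple:
    """Return (partition, clustering key); time lives only in the clustering key."""
    clustering = (tenant_id, stream_id, time_bucket(ts), ts)
    return partition_for(tenant_id, stream_id), clustering

t1 = datetime(2024, 1, 15, 23, 30, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 16, 0, 5, tzinfo=timezone.utc)
p1, c1 = row_key("acme", "sensor-9", t1)
p2, c2 = row_key("acme", "sensor-9", t2)
print(p1 == p2, c1[2], c2[2])  # True 2024-01-15 2024-01-16
```

Both points land on the same partition, so "sensor-9, last 24 hours" is a single-partition range scan, while different streams spread across partitions.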

Key Takeaways

Partition Key is Critical

Choose partition key carefully—it determines data distribution, query efficiency, and hot spot risk.

Consistent Hashing Enables Scale

Virtual nodes provide even distribution and minimal data movement when nodes change.

Secondary Indexes Have Trade-offs

Local = fast writes, scatter-gather reads. Global = fast reads, slower writes and an eventually consistent index.

Avoid Cross-Partition Operations

Design data model to keep related data together. Cross-partition = expensive.

Next Steps

Data Systems

Deep dive into Spanner, Cassandra, DynamoDB internals

Transactions

Handle cross-partition transactions with 2PC and Saga

Interview Deep-Dive

Strong Answer:
  • This is a fundamental tension in partitioning design. Hash partitioning gives you excellent point lookups (hash the key, go directly to one partition) and even data distribution, but destroys range scan ability because adjacent keys are scattered across partitions. Range partitioning preserves key order (enabling efficient range scans) but creates hot spots when access patterns cluster around certain key ranges.
  • The solution is usually a compound partition key. For example, in a time-series system, partition by hash(device_id) and use timestamp as the clustering key within each partition. Point lookups by device_id go to one partition, and range scans for “device X, last 24 hours” are also single-partition because the timestamps are ordered within the partition.
  • DynamoDB models this explicitly with its partition key (hash distributed) and sort key (range ordered within the partition). Cassandra uses a similar compound primary key model.
  • The design rule: pick the partition key to match your most frequent query pattern, then use secondary indexes or materialized views for alternative access patterns.
Follow-up: What if your most frequent query requires a range scan across a dimension that cannot be the sort key?
Then you need a global secondary index (GSI) or a denormalized table. With a GSI (as in DynamoDB), the index is partitioned and sorted by its own key, so if the attribute is the index's sort key, a range scan on it becomes a single-partition read on the index. The cost: every base-table write also triggers an index write, and because that update is asynchronous, the index is eventually consistent with the base table. Alternatively, you can create a separate denormalized table optimized for the range scan query, maintained via change data capture (CDC) or an event stream. This is the CQRS pattern: separate read models for separate query patterns. The trade-off is storage cost and the complexity of keeping multiple representations in sync.
Strong Answer:
  • This is the “celebrity problem” or “thundering herd on a single key.” The partition holding the celebrity’s data receives 1000x normal traffic, exceeding its capacity while other partitions sit idle.
  • Immediate mitigation: add a random suffix to the celebrity’s partition key, splitting their data across N sub-partitions (e.g., “celebrity123:shard_0” through “celebrity123:shard_99”). Writes are distributed randomly across sub-shards. Reads require scatter-gather across all sub-shards, but this is parallelizable and much better than overloading a single partition.
  • Detection must be automated. Monitor per-partition QPS and latency in real time. When a partition’s QPS exceeds a threshold (e.g., 5x the average), automatically add it to a “hot key” list and enable sub-sharding for that key. This is a dynamic, not static, process — celebrities come and go.
  • Caching is the complementary strategy. For read-heavy hot spots, place the celebrity’s data in a distributed cache (Redis Cluster, Memcached) with a short TTL. This absorbs the read storm without hitting the database. For write-heavy hot spots (likes, view counts), buffer writes in memory and flush to the database in batches (write coalescing).
Follow-up: How do you handle the transition when a hot key is detected and you need to add sub-sharding without downtime?
The transition must be atomic from the client’s perspective. I would use a routing layer that maintains a mapping of hot keys to their sub-shard configuration. When a key is detected as hot: (1) The routing layer starts writing to sub-shards in addition to the original key (dual-write phase). (2) After a convergence period, all recent data is in sub-shards. (3) Switch reads to scatter-gather across sub-shards. (4) Stop writing to the original key. The critical detail is that during the transition, reads must check both the original key and the sub-shards to avoid missing data. A simpler approach, used by some systems: always use sub-sharding for all keys (with a low shard count like 4), so no dynamic transition is needed. The overhead of reading 4 sub-shards per key is minimal, and you are always prepared for hot keys.
Strong Answer:
  • A local secondary index is co-located with the data partition. Each partition maintains its own index over its local data only. Writes are fast because the index update is a local operation within the same partition. But reads by the indexed attribute require scatter-gather across all partitions because any partition might have matching data. Read latency equals the slowest partition.
  • A global secondary index is separately partitioned by the indexed attribute. A query like “find all users in NYC” goes to a single index partition, making reads fast. But writes are slow because inserting a record might require updating an index partition on a different node, which is either a distributed transaction (expensive) or eventually consistent (the index lags behind the data).
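  • Cassandra's built-in secondary indexes are local: each node indexes only its own data, so queries without the partition key fan out across nodes. DynamoDB GSIs are global and eventually consistent. Elasticsearch is also document-partitioned: each shard holds its own index, searches scatter-gather across shards, and new documents become searchable in near real time after a refresh.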
  • The decision depends on read/write ratio. If the workload is write-heavy and reads by secondary key are rare, local indexes are better. If the workload is read-heavy with frequent secondary key lookups, global indexes are better despite the write overhead.
Follow-up: In DynamoDB, what consistency issues can arise from the eventually consistent nature of GSIs?
A write to the base table is immediately visible, but the GSI update is asynchronous: typically within milliseconds, but potentially delayed under high write throughput. This means a query on the GSI might not return a record that was just written to the base table. Worse, it can return stale attribute values if the record was recently updated. In practice, this means you cannot use a GSI query result as the source of truth for an immediate business decision. For example, if you use a GSI to check “is this username taken?” and the GSI lags, two users might simultaneously register the same username, because the GSI returns “not taken” for both. The mitigation: enforce invariants against the base table (a strongly consistent read or, better, a conditional write), and use GSIs only for queries where eventual consistency is acceptable (search, analytics, listing pages).
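
The username race described above can be reproduced with a toy store whose index only catches up when `sync()` is called. This is a single-process simulation with hypothetical names (`UserStore`, `register_checking_index`, `register_conditionally`); it assumes the base table is keyed by username, and in a real database the "insert only if absent" check must be one atomic conditional write.

```python
class UserStore:
    """Toy base table plus a lagging username index (applied only on sync())."""

    def __init__(self):
        self.base = {}        # username -> user_id (authoritative)
        self.index = set()    # usernames visible through the index
        self._pending = []    # index updates not yet applied

    def sync(self) -> None:
        """Simulate the asynchronous index catching up."""
        self.index.update(self._pending)
        self._pending.clear()

    def register_checking_index(self, username: str, user_id: str) -> bool:
        """Unsafe: decides based on a possibly stale index."""
        if username in self.index:
            return False
        self.base[username] = user_id  # silently overwrites a racing user
        self._pending.append(username)
        return True

    def register_conditionally(self, username: str, user_id: str) -> bool:
        """Safe: "insert only if absent" evaluated against the base table."""
        if username in self.base:
            return False
        self.base[username] = user_id
        self._pending.append(username)
        return True

unsafe = UserStore()
print(unsafe.register_checking_index("neo", "u1"),
      unsafe.register_checking_index("neo", "u2"))   # True True (both "succeed")

safe = UserStore()
print(safe.register_conditionally("neo", "u1"),
      safe.register_conditionally("neo", "u2"))      # True False
```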