When data outgrows a single machine, you need to partition (shard) it across multiple nodes. This module covers the strategies, trade-offs, and production patterns for effective data distribution.

Think of partitioning like organizing a library: you could put all books on one giant shelf (single node), but eventually you run out of space and it takes too long to find anything. Instead, you split books across rooms: fiction in Room A, non-fiction in Room B, reference in Room C. The challenge is choosing the split so that most visitors only need to visit one room, and no single room is overwhelmed while others sit empty.
Must-Know Algorithm: Consistent hashing is foundational for distributed caching, databases, and load balancing. Expect to explain and implement it in interviews.
CONSISTENT HASHING

PROBLEM WITH hash(key) mod N:
─────────────────────────────
Adding/removing a node → most keys map to a different node!
(going from 3 to 4 nodes moves about 3 out of every 4 keys)

  Before: 3 nodes          After: 4 nodes
  hash(k) mod 3 = ?        hash(k) mod 4 = ?
  Key 1: 0                 Key 1: 1 (moved!)
  Key 2: 1                 Key 2: 2 (moved!)
  Key 3: 2                 Key 3: 3 (moved!)

CONSISTENT HASHING SOLUTION:
────────────────────────────
Map both nodes and keys onto a ring (0 to 2^32 - 1).
A key goes to the first node clockwise from its position.

              HASH RING

                 0°
               Node A
                 ▲
                 │
    270° ◄───────┼───────► 90°
    Node D       │       Node B
                 ▼
               Node C
                180°

Key at 45°  → goes to Node B (first node clockwise)
Key at 200° → goes to Node D (first node clockwise)

ADDING A NODE:
──────────────
Add Node E at 135° → only keys between B (90°) and E (135°) move (from C to E).
All other keys stay with their original nodes!

ON AVERAGE: only about K/N keys need to move (K = keys, N = nodes)
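To put numbers on the "most keys move" claim, here is a small self-contained experiment. It is not part of the ConsistentHash class. It assigns 10,000 synthetic keys to nodes in two ways: with hash(key) mod N, and with a ring of 100 virtual nodes per node. It then counts how many keys change owner when a fourth node joins. The key and node names are made up. MD5 is used only because, unlike Python's built-in hash(), it gives the same value in every process.

```python
import hashlib
from bisect import bisect_right


def stable_hash(s: str) -> int:
    # MD5 gives the same value in every process (unlike Python's built-in hash())
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2**32)


def owner_mod_n(key: str, n: int) -> int:
    return stable_hash(key) % n


def build_ring(nodes: list, vnodes: int = 100):
    # Each physical node gets `vnodes` positions; keep positions and owners sorted together
    entries = sorted((stable_hash(f"{node}:{i}"), node) for node in nodes for i in range(vnodes))
    return [pos for pos, _ in entries], [node for _, node in entries]


def owner_ring(key: str, ring) -> str:
    positions, owners = ring
    # First position clockwise from the key, wrapping past the end of the ring
    idx = bisect_right(positions, stable_hash(key)) % len(positions)
    return owners[idx]


keys = [f"key-{i}" for i in range(10_000)]

moved_mod = sum(owner_mod_n(k, 3) != owner_mod_n(k, 4) for k in keys)

ring_before = build_ring(["node-0", "node-1", "node-2"])
ring_after = build_ring(["node-0", "node-1", "node-2", "node-3"])
moved_ring = sum(owner_ring(k, ring_before) != owner_ring(k, ring_after) for k in keys)

print(f"hash mod N: {moved_mod / len(keys):.1%} of keys moved")
print(f"hash ring:  {moved_ring / len(keys):.1%} of keys moved")
```

With these settings you should see roughly three quarters of the keys move under mod N and roughly a quarter under the ring. On the ring, every key that moves lands on the new node.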
import hashlib
from bisect import bisect_right, insort
from typing import Optional


class ConsistentHash:
    """
    Consistent hashing with virtual nodes.

    Used by: Cassandra, DynamoDB, Memcached (with ketama), load balancers

    The key insight: instead of hash(key) % N (which reshuffles almost
    everything when N changes), we map both keys and nodes onto the same
    circular space. Adding a node only "steals" keys from its immediate
    neighbors on the ring, leaving all other assignments unchanged.

    Virtual nodes smooth out the distribution -- without them, 3 physical
    nodes might split the ring 60/30/10 instead of the desired 33/33/33.
    """

    def __init__(self, nodes: Optional[list] = None, virtual_nodes: int = 150):
        # 150 vnodes per physical node is a good default for clusters
        # up to ~100 nodes. More vnodes = better balance but more memory
        # for the ring data structure and slower lookups.
        self.virtual_nodes = virtual_nodes
        self.ring = {}              # position -> node
        self.sorted_positions = []  # sorted list of positions
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key: str) -> int:
        """Hash function that returns a position on the ring (0 to 2^32 - 1)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)

    def add_node(self, node: str):
        """Add a node with its virtual nodes to the ring."""
        for i in range(self.virtual_nodes):
            position = self._hash(f"{node}:{i}")
            if position in self.ring:
                continue  # rare hash collision: keep the existing owner
            self.ring[position] = node
            insort(self.sorted_positions, position)  # keeps the list sorted

    def remove_node(self, node: str):
        """Remove a node and all its virtual nodes from the ring."""
        for i in range(self.virtual_nodes):
            position = self._hash(f"{node}:{i}")
            # Only delete positions this node actually owns (skips collisions)
            if self.ring.get(position) == node:
                del self.ring[position]
                self.sorted_positions.remove(position)

    def get_node(self, key: str) -> Optional[str]:
        """Get the node responsible for a key."""
        if not self.ring:
            return None
        position = self._hash(key)
        # Find first node clockwise from position
        idx = bisect_right(self.sorted_positions, position)
        if idx == len(self.sorted_positions):
            idx = 0  # Wrap around
        return self.ring[self.sorted_positions[idx]]

    def get_nodes(self, key: str, n: int = 3) -> list:
        """Get N distinct nodes for replication (walking clockwise)."""
        if not self.ring:
            return []
        position = self._hash(key)
        idx = bisect_right(self.sorted_positions, position)
        distinct = len(set(self.ring.values()))  # compute once, not per step
        nodes = []
        seen = set()
        while len(nodes) < n and len(seen) < distinct:
            if idx >= len(self.sorted_positions):
                idx = 0
            node = self.ring[self.sorted_positions[idx]]
            if node not in seen:
                nodes.append(node)
                seen.add(node)
            idx += 1
        return nodes


# Usage example
ch = ConsistentHash(["node-1", "node-2", "node-3"], virtual_nodes=150)

# Get responsible node for keys (the exact node depends on the MD5 positions)
print(ch.get_node("user:123"))  # one of "node-1", "node-2", "node-3"
print(ch.get_node("user:456"))

# Get replica nodes (for replication factor 3): three distinct nodes
print(ch.get_nodes("user:123", n=3))

# Add a new node - minimal key movement
ch.add_node("node-4")
print(ch.get_node("user:123"))  # might change to "node-4"
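The docstring claims that virtual nodes even out the split. You can check this by measuring how much of the ring each physical node owns. This sketch recomputes ownership independently of the class above: a node owns the arc that ends at each of its positions. The node names and the vnode counts tried are arbitrary.

```python
import hashlib

RING_SIZE = 2**32


def stable_hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % RING_SIZE


def ring_shares(nodes: list, vnodes: int) -> dict:
    """Fraction of the ring owned by each physical node."""
    points = sorted((stable_hash(f"{n}:{i}"), n) for n in nodes for i in range(vnodes))
    owned = {n: 0 for n in nodes}
    for idx, (pos, node) in enumerate(points):
        prev = points[idx - 1][0]  # for idx == 0 this is the last point (wrap-around)
        # The node at `pos` owns the arc (prev, pos]; modulo handles the arc crossing 0
        owned[node] += (pos - prev) % RING_SIZE
    return {n: arc / RING_SIZE for n, arc in owned.items()}


nodes = ["node-1", "node-2", "node-3"]
for v in (1, 10, 150):
    shares = ring_shares(nodes, v)
    spread = max(shares.values()) - min(shares.values())
    print(v, {n: round(s, 3) for n, s in shares.items()}, f"spread={spread:.3f}")
```

With a single position per node the shares are usually lopsided. At 150 vnodes each node typically lands within a few percentage points of one third.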
At Staff/Principal level, partitioning isn’t just about performance; it’s also about compliance and locality. As laws like GDPR (Europe) and CCPA (California) become stricter, where data physically lives can become a legal requirement, not just an engineering choice.
In a global database (e.g., CockroachDB, Spanner, or YugabyteDB), you can pin specific partitions to specific geographic regions.
The Goal: Keep data close to the user to minimize speed-of-light latency and comply with data residency laws.
The Mechanism: Using “Placement Policies” or “Partitioning Tags.”
-- CockroachDB example: pinning data to regions
-- (partitioning by list requires region to be the first column of the users primary key)
ALTER TABLE users PARTITION BY LIST (region) (
    PARTITION europe VALUES IN ('eu-west-1', 'eu-central-1'),
    PARTITION us_east VALUES IN ('us-east-1', 'us-east-2'),
    PARTITION asia VALUES IN ('ap-northeast-1')
);

-- Pin the europe partition to nodes whose locality is region=europe
ALTER PARTITION europe OF INDEX users@users_pkey
    CONFIGURE ZONE USING constraints = '[+region=europe]';
Sophisticated systems can dynamically move partitions based on the time of day or access patterns.
Daylight Shifting: Move active user data to the “waking” hemisphere to ensure local low-latency access.
Access Locality: If a user from Germany suddenly starts accessing a “US East” partition frequently, the system can migrate that partition (or a replica) to “EU West” automatically.
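The access-locality rule could be expressed as a small decision function. It counts recent accesses per region for one partition and suggests moving a replica when a single foreign region dominates, unless residency rules forbid it. This is a hypothetical sketch. The 60% dominance threshold, the 1,000-access minimum and the region names are illustrative. The function only returns a recommendation; actually moving data would go through the database's own placement controls.

```python
from collections import Counter
from typing import Optional


def suggest_replica_move(
    home_region: str,
    access_regions: list,
    allowed_regions: Optional[set] = None,
    min_accesses: int = 1_000,
    dominance: float = 0.6,
) -> Optional[str]:
    """
    Return the region a partition's replica should move to, or None to stay put.

    access_regions: one entry per recent access (hypothetical input; a real
    system would derive it from request metrics).
    allowed_regions: regions this data may legally live in (None = unrestricted).
    """
    if len(access_regions) < min_accesses:
        return None  # too little traffic to justify moving data
    top_region, top_count = Counter(access_regions).most_common(1)[0]
    if top_region == home_region or top_count / len(access_regions) < dominance:
        return None  # no single foreign region dominates
    if allowed_regions is not None and top_region not in allowed_regions:
        return None  # data residency rules forbid placing the data there
    return top_region


# A partition homed in US East that is now read mostly from EU West
recent = ["eu-west-1"] * 800 + ["us-east-1"] * 200
print(suggest_replica_move("us-east-1", recent))  # eu-west-1
print(suggest_replica_move("us-east-1", recent, allowed_regions={"us-east-1", "us-east-2"}))  # None
```

The allowed_regions check is where the residency question ("does this record have a legal right to leave this continent?") becomes a hard veto over the latency optimization.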
When data is pinned to a region, cross-region operations become much more complex:
Global Joins: Joining a “pinned” German user with a “pinned” Japanese order requires cross-continental network hops (200ms+).
Global Secondary Indexes: If the index is global, writing to the German partition might require updating an index node in the US, breaking sovereignty if the index contains PII (Personally Identifiable Information).
Strategy          | Performance         | Compliance                       | Cost
------------------|---------------------|----------------------------------|-----------------------------
Global Sharding   | High (Average)      | Poor (Data can be anywhere)      | Low
Regional Sharding | Best (Local access) | Strict (Data stays in region)    | High (Redundancy per region)
Geo-Replication   | Good (Read local)   | Medium (Copies exist everywhere) | Medium
Staff Tip: When designing for Global Scale, “Data Residency” is often a harder constraint than “Queries Per Second.” Always ask: “Does this record have a legal right to leave this continent?”
import random
import time
from collections import defaultdict
from typing import Any


# Hot spot mitigation strategies
class HotKeySharding:
    """
    Spread hot keys across multiple sub-partitions.

    Real-world analogy: When a celebrity tweets, millions of users
    request that tweet simultaneously. If all requests hit one
    partition, it melts. This is like a bakery that normally serves
    50 customers/hour suddenly facing 50,000. The solution: open
    100 temporary counters (sub-shards), each serving from their own
    copy of the menu.

    Instagram uses a similar approach for celebrity profiles, and
    Twitter (now X) has described a hybrid: fan-out-on-write for most
    accounts, with the highest-follower accounts merged in at read time.
    """

    def __init__(self, hot_keys: set, shard_count: int = 100):
        # hot_keys is typically maintained by a background job
        # that monitors QPS per key
        self.hot_keys = hot_keys
        self.shard_count = shard_count

    def get_partition_key(self, key: str) -> str:
        """
        For hot keys, add a random shard suffix to spread writes.
        For normal keys, use as-is to keep single-partition lookups fast.

        Trade-off: each write lands on just one sub-shard, but reads now
        require scatter-gather across all shards. Only shard keys you
        KNOW are hot.
        """
        if key in self.hot_keys:
            shard = random.randrange(self.shard_count)
            return f"{key}:shard_{shard}"
        return key

    def get_all_shards(self, key: str) -> list:
        """For reading, must query all shards of a hot key."""
        if key in self.hot_keys:
            return [f"{key}:shard_{i}" for i in range(self.shard_count)]
        return [key]


class WriteBuffer:
    """
    Buffer writes and batch them to reduce hot spot pressure.

    `db` is a placeholder for any client exposing an async
    batch_write(key, values) method.
    """

    def __init__(self, db, flush_interval: float = 1.0, max_buffer_size: int = 1000):
        self.db = db
        self.buffer = defaultdict(list)
        self.flush_interval = flush_interval
        self.max_buffer_size = max_buffer_size
        # Track flush time per key, so flushing one key doesn't reset
        # the timer for every other key
        self.last_flush = defaultdict(time.monotonic)

    async def write(self, key: str, value: Any):
        """Buffer a write."""
        self.buffer[key].append(value)
        if self._should_flush(key):
            await self._flush_key(key)

    def _should_flush(self, key: str) -> bool:
        buffer_full = len(self.buffer[key]) >= self.max_buffer_size
        # This time check only runs on the next write; a background task
        # is still needed to flush keys that go quiet
        time_exceeded = time.monotonic() - self.last_flush[key] > self.flush_interval
        return buffer_full or time_exceeded

    async def _flush_key(self, key: str):
        """Batch write buffered values."""
        values = self.buffer.pop(key, [])
        self.last_flush[key] = time.monotonic()
        if values:
            # Single batch write instead of many individual writes
            await self.db.batch_write(key, values)
Question: How would you partition Twitter’s tweet storage?
Requirements Analysis:
Queries: Get user’s tweets, home timeline, search
Scale: 500M tweets/day, 200M users
Approach:
Option A: Partition by user_id
  ✓ "Get user's tweets" = single partition
  ✗ "Home timeline" = scatter-gather (one read per followed account)

Option B: Partition by tweet_id
  ✓ Good distribution (when tweet_id is hashed)
  ✗ "Get user's tweets" = scatter-gather

Recommended: Hybrid
  - Tweets table: partition by user_id
  - Timeline table: partition by viewer_id (pre-materialized, fan-out on write)
  - Search: separate inverted index
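The recommended hybrid can be sketched with in-memory dicts standing in for the two tables. Tweets are keyed by author (the user_id partition), and a pre-materialized timeline is keyed by viewer. Posting fans the tweet ID out to each follower's timeline, so reading a home timeline touches only the viewer's own partition. The follower graph, the 800-entry timeline cap and all names are hypothetical.

```python
from collections import defaultdict, deque


class TimelineStore:
    def __init__(self, timeline_cap: int = 800):
        self.tweets_by_user = defaultdict(list)  # partition key: author user_id
        # partition key: viewer_id; each timeline keeps only the newest entries
        self.timelines = defaultdict(lambda: deque(maxlen=timeline_cap))
        self.followers = defaultdict(set)        # author -> follower ids

    def follow(self, viewer: str, author: str) -> None:
        self.followers[author].add(viewer)

    def post(self, author: str, tweet_id: int, text: str) -> None:
        self.tweets_by_user[author].append((tweet_id, text))  # one write to the author's partition
        for viewer in self.followers[author]:
            self.timelines[viewer].appendleft(tweet_id)       # fan-out: one write per follower

    def home_timeline(self, viewer: str, limit: int = 20) -> list:
        # Single-partition read: the timeline was materialized at write time
        return list(self.timelines[viewer])[:limit]


store = TimelineStore()
store.follow("alice", "bob")
store.follow("alice", "carol")
store.post("bob", 1, "hello")
store.post("carol", 2, "hi")
print(store.home_timeline("alice"))  # [2, 1]
```

The cost sits on the write path. For an account with 100M followers, post() would perform 100M timeline writes, which is exactly the hot spot Q2 deals with.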
Q2: Handle celebrity hot spots
Question: A celebrity with 100M followers posts. How do you handle the hot spot?
Solution:
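import zlib

# Assumed helpers, not defined here: get_users_above_follower_threshold,
# parallel_query, merge_and_sort, and a TTL cache decorator `cache`.

NUM_SUB_SHARDS = 100

# 1. Identify hot users dynamically
hot_users = set(get_users_above_follower_threshold(1_000_000))

# 2. Shard their content
def get_post_partition(user_id: str, post_id: str) -> str:
    if user_id in hot_users:
        # Spread across NUM_SUB_SHARDS sub-partitions. Use a stable hash:
        # Python's built-in hash() of a str is randomized per process,
        # so two app servers would route the same post differently.
        shard = zlib.crc32(post_id.encode()) % NUM_SUB_SHARDS
        return f"{user_id}:shard_{shard}"
    return user_id

# 3. For reads, query all shards and merge
def get_celebrity_posts(user_id: str) -> list:
    shards = [f"{user_id}:shard_{i}" for i in range(NUM_SUB_SHARDS)]
    results = parallel_query(shards)
    return merge_and_sort(results)

# 4. Use caching aggressively for celebrities
@cache(ttl=60)
def get_celebrity_posts_cached(user_id: str):
    return get_celebrity_posts(user_id)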
Q3: Rebalancing without downtime
Question: How do you rebalance partitions without downtime?
Process:
1. PREPARE
   - Identify source and target nodes
   - Calculate data to move
   - Estimate time required
2. COPY (background)
   - Stream data from source to target
   - Source still serves reads/writes
   - Track "high watermark" of copied data
3. CATCH UP
   - Replay changes since copy started
   - Use change log/WAL
   - Repeat until caught up
4. SWITCH
   - Brief pause (or use dual-write)
   - Update routing table
   - New requests go to target
5. CLEANUP
   - Verify target is healthy
   - Then remove data from source

Key: Never stop serving requests!
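The five phases can be simulated in a few lines. Dicts stand in for the source and target nodes, and a list stands in for the change log (WAL). This is a single-threaded sketch under those assumptions. It shows the ordering of copy, catch-up, switch and cleanup, not the locking or network concerns of a real migration. The shard name and keys are made up.

```python
class Node:
    """In-memory stand-in for a storage node with a change log (WAL)."""

    def __init__(self):
        self.data = {}
        self.log = []  # (sequence number, key, value)

    def write(self, key, value):
        self.data[key] = value
        self.log.append((len(self.log), key, value))


def migrate(source: Node, target: Node, routing: dict, shard: str, concurrent_writes: list) -> None:
    # 1. PREPARE: note the log position where the copy starts
    start_seq = len(source.log)

    # 2. COPY: take a snapshot; the source keeps serving writes meanwhile
    snapshot = dict(source.data)
    for key, value in concurrent_writes:  # simulated client writes during the copy
        source.write(key, value)
    target.data.update(snapshot)

    # 3. CATCH UP: replay every change logged since the copy started
    # (a real system repeats this until the remaining lag is tiny)
    for _, key, value in source.log[start_seq:]:
        target.data[key] = value

    # 4. SWITCH: flip the routing entry; new requests go to the target
    routing[shard] = target

    # 5. CLEANUP: verify the target first, then free space on the source
    if target.data != source.data:
        raise RuntimeError("target diverged from source; keep source data")
    source.data.clear()


src, dst = Node(), Node()
for i in range(5):
    src.write(f"k{i}", i)
routing = {"shard-7": src}
migrate(src, dst, routing, "shard-7", concurrent_writes=[("k1", 100), ("k9", 9)])
print(routing["shard-7"] is dst, dst.data["k1"], dst.data["k9"])  # True 100 9
```

The writes made during the copy ("k1" updated, "k9" added) are missing from the snapshot. They reach the target only through the catch-up replay, which is why the log position is recorded before copying.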
How do you choose a partition key for a system that needs to support both point lookups and range scans efficiently?
Strong Answer:
This is a fundamental tension in partitioning design. Hash partitioning gives you excellent point lookups (hash the key, go directly to one partition) and even data distribution, but destroys range scan ability because adjacent keys are scattered across partitions. Range partitioning preserves key order (enabling efficient range scans) but creates hot spots when access patterns cluster around certain key ranges.
The solution is usually a compound partition key. For example, in a time-series system, partition by hash(device_id) and use timestamp as the clustering key within each partition. Point lookups by device_id go to one partition, and range scans for “device X, last 24 hours” are also single-partition because the timestamps are ordered within the partition.
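Here is a minimal sketch of the compound key idea. hash(device_id) picks the partition. Within the partition, each device's readings are kept sorted by timestamp (the clustering key), so a time-range query is a binary search on one partition. The 16 partitions, the CRC32 hash and the in-memory layout are assumptions for illustration.

```python
import zlib
from bisect import bisect_left, insort
from collections import defaultdict

NUM_PARTITIONS = 16


def partition_for(device_id: str) -> int:
    # Partition key: a stable hash of device_id spreads devices evenly
    return zlib.crc32(device_id.encode()) % NUM_PARTITIONS


# partition -> device_id -> list of (timestamp, value), kept sorted by timestamp
partitions = [defaultdict(list) for _ in range(NUM_PARTITIONS)]


def write(device_id: str, ts: int, value: float) -> None:
    insort(partitions[partition_for(device_id)][device_id], (ts, value))


def range_scan(device_id: str, start_ts: int, end_ts: int) -> list:
    """Readings with start_ts <= ts < end_ts, served from a single partition."""
    rows = partitions[partition_for(device_id)][device_id]
    lo = bisect_left(rows, (start_ts,))  # (t,) sorts before any (t, value)
    hi = bisect_left(rows, (end_ts,))
    return rows[lo:hi]


for ts in range(0, 100, 10):
    write("device-A", ts, ts / 10)
print(range_scan("device-A", 20, 50))  # [(20, 2.0), (30, 3.0), (40, 4.0)]
```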
DynamoDB models this explicitly with its partition key (hash distributed) and sort key (range ordered within the partition). Cassandra uses a similar compound primary key model.
The design rule: pick the partition key to match your most frequent query pattern, then use secondary indexes or materialized views for alternative access patterns.
Follow-up: What if your most frequent query requires a range scan across a dimension that cannot be the sort key?
Then you need a global secondary index (GSI) or a denormalized table. With a GSI (as in DynamoDB), the index has its own partition key and sort key. The attribute you need to range over can become the index’s sort key, and the scan becomes a query against the index instead of a full-table scan. The cost is on the write side. Every base-table write that touches indexed attributes must also be propagated to the index, which consumes extra write capacity, and the index is eventually consistent with the base table. Alternatively, you can create a separate denormalized table optimized for the range scan query, maintained via change data capture (CDC) or an event stream. This is the CQRS pattern: separate read models for separate query patterns. The trade-off is storage cost and the complexity of keeping multiple representations in sync.
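The denormalized-table option can be sketched as a consumer that applies change events from the base table to a second table keyed for the other query. Here the base table holds orders partitioned by customer_id. The read model groups orders by status, sorted by creation time, so "open orders created since T" becomes a range scan on one key. The event shape, field names and in-memory tables are hypothetical.

```python
from bisect import bisect_left, insort
from collections import defaultdict

# Base table, partitioned by customer_id: customer_id -> order_id -> order
orders_by_customer = defaultdict(dict)
# Read model for the other access pattern: status -> sorted [(created_at, order_id)]
orders_by_status = defaultdict(list)


def save_order(order: dict) -> dict:
    """Write to the base table and return the change event a CDC feed would emit."""
    before = orders_by_customer[order["customer_id"]].get(order["id"])
    orders_by_customer[order["customer_id"]][order["id"]] = order
    return {"before": before, "after": order}


def apply_change(event: dict) -> None:
    """CDC consumer: move the order between status lists in the read model."""
    before, after = event["before"], event["after"]
    if before is not None:
        orders_by_status[before["status"]].remove((before["created_at"], before["id"]))
    if after is not None:
        insort(orders_by_status[after["status"]], (after["created_at"], after["id"]))


def orders_in_status_since(status: str, since: int) -> list:
    """Range scan on the read model: one status key, ordered by created_at."""
    rows = orders_by_status[status]
    return [order_id for _, order_id in rows[bisect_left(rows, (since,)):]]


events = [
    save_order({"id": "o1", "customer_id": "c1", "status": "open", "created_at": 100}),
    save_order({"id": "o2", "customer_id": "c2", "status": "open", "created_at": 200}),
    save_order({"id": "o1", "customer_id": "c1", "status": "shipped", "created_at": 100}),
]
# In production these events would arrive asynchronously from a stream,
# so the read model lags the base table until they are applied.
for event in events:
    apply_change(event)
print(orders_in_status_since("open", 50))  # ['o2']
```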
A celebrity posts on your platform and generates a massive hot spot on one partition. How do you handle this in real time?
Strong Answer:
This is the “celebrity problem” or “thundering herd on a single key.” The partition holding the celebrity’s data receives 1000x normal traffic, exceeding its capacity while other partitions sit idle.
Immediate mitigation: add a random suffix to the celebrity’s partition key, splitting their data across N sub-partitions (e.g., “celebrity123:shard_0” through “celebrity123:shard_99”). Writes are distributed randomly across sub-shards. Reads require scatter-gather across all sub-shards, but this is parallelizable and much better than overloading a single partition.
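Applied to a write-heavy hot key such as a like counter, the random-suffix scheme looks like the sketch below. Each increment touches one randomly chosen sub-shard, and a read sums all of them. A plain dict stands in for the partitioned store. The key format follows the "celebrity123:shard_0" pattern above, and the shard count of 8 is arbitrary.

```python
import random
from collections import defaultdict


class ShardedCounter:
    """A hot counter split across sub-keys "<key>:shard_<i>"."""

    def __init__(self, shard_count: int = 8):
        self.shard_count = shard_count
        self.store = defaultdict(int)  # stand-in for the partitioned database

    def increment(self, key: str, amount: int = 1) -> None:
        # Write path: touch exactly one randomly chosen sub-shard
        shard = random.randrange(self.shard_count)
        self.store[f"{key}:shard_{shard}"] += amount

    def total(self, key: str) -> int:
        # Read path: scatter-gather over every sub-shard and sum
        return sum(self.store.get(f"{key}:shard_{i}", 0) for i in range(self.shard_count))


likes = ShardedCounter(shard_count=8)
for _ in range(10_000):
    likes.increment("post:42")
print(likes.total("post:42"))  # 10000
```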
Detection must be automated. Monitor per-partition QPS and latency in real time. When a partition’s QPS exceeds a threshold (e.g., 5x the average), automatically add it to a “hot key” list and enable sub-sharding for that key. This is a dynamic, not static, process — celebrities come and go.
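The detection step might look like the sketch below. It counts requests per key in a sliding window and flags any key whose count exceeds a multiple of the average per-key count (5x, matching the example threshold). It works per key rather than per partition because the key is what gets sub-sharded. The window length, the minimum-traffic floor and the in-process deque are illustrative. A real deployment would aggregate these counts from request metrics.

```python
import time
from collections import Counter, deque
from typing import Optional


class HotKeyDetector:
    def __init__(self, window_seconds: float = 10.0, factor: float = 5.0, min_requests: int = 100):
        self.window = window_seconds
        self.factor = factor
        self.min_requests = min_requests  # ignore keys with too little traffic to judge
        self.events = deque()             # (timestamp, key) inside the window
        self.counts = Counter()

    def record(self, key: str, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self.events.append((now, key))
        self.counts[key] += 1
        self._expire(now)

    def _expire(self, now: float) -> None:
        # Drop requests that have fallen out of the sliding window
        while self.events and self.events[0][0] <= now - self.window:
            _, old_key = self.events.popleft()
            self.counts[old_key] -= 1
            if self.counts[old_key] == 0:
                del self.counts[old_key]

    def hot_keys(self) -> set:
        if not self.counts:
            return set()
        average = sum(self.counts.values()) / len(self.counts)
        return {
            key for key, n in self.counts.items()
            if n >= self.min_requests and n > self.factor * average
        }


detector = HotKeyDetector()
for i in range(1_000):
    detector.record(f"user:{i % 50}", now=0.0)  # 50 ordinary keys, 20 requests each
for _ in range(2_000):
    detector.record("celebrity:123", now=0.0)   # one key far above the rest
print(detector.hot_keys())  # {'celebrity:123'}
```

The hot key itself raises the average. With only a handful of keys, a median or a per-partition baseline is therefore a more robust reference; the mean is used here only to mirror the 5x rule of thumb.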
Caching is the complementary strategy. For read-heavy hot spots, place the celebrity’s data in a distributed cache (Redis Cluster, Memcached) with a short TTL. This absorbs the read storm without hitting the database. For write-heavy hot spots (likes, view counts), buffer writes in memory and flush to the database in batches (write coalescing).
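Below is a sketch of a process-local stand-in for the short-TTL cache. The decorator memoizes a function's result per argument tuple for ttl seconds, so a sequential read storm within one TTL window reaches the database once per process. It is illustrative only, not a Redis or Memcached client. It also does not coalesce concurrent misses, so a real deployment would still see a small stampede each time an entry expires. The function and counter names are hypothetical.

```python
import functools
import time


def ttl_cache(ttl: float):
    """Memoize results per positional arguments for `ttl` seconds (process-local)."""
    def decorator(fn):
        entries = {}  # args -> (expires_at, value); never evicted in this sketch

        @functools.wraps(fn)
        def wrapper(*args):
            now = time.monotonic()
            cached = entries.get(args)
            if cached is not None and cached[0] > now:
                return cached[1]  # fresh hit: the database is not touched
            value = fn(*args)     # miss or expired: go to the database
            entries[args] = (now + ttl, value)
            return value

        return wrapper

    return decorator


db_reads = 0


@ttl_cache(ttl=60)
def load_celebrity_posts(user_id: str) -> list:
    global db_reads
    db_reads += 1  # stands in for an expensive database query
    return [f"{user_id}:post-1", f"{user_id}:post-2"]


for _ in range(1_000):
    load_celebrity_posts("celebrity123")
print(db_reads)  # 1
```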
Follow-up: How do you handle the transition when a hot key is detected and you need to add sub-sharding without downtime?
The transition must be atomic from the client’s perspective. I would use a routing layer that maintains a mapping of hot keys to their sub-shard configuration. When a key is detected as hot: (1) The routing layer starts writing to sub-shards in addition to the original key (dual-write phase). (2) After a convergence period, all recent data is in sub-shards. (3) Switch reads to scatter-gather across sub-shards. (4) Stop writing to the original key. The critical detail is that during the transition, reads must check both the original key and the sub-shards to avoid missing data. A simpler approach, used by some systems: always use sub-sharding for all keys (with a low shard count like 4), so no dynamic transition is needed. The overhead of reading 4 sub-shards per key is minimal, and you are always prepared for hot keys.
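The transition can be modelled as a per-key phase in the routing layer. In this sketch a dict stands in for storage, and each key holds a list of hashable items such as post IDs. The phase names, the shard count and the explicit backfill in finish_transition are assumptions of the sketch: the backfill replaces "wait for convergence" for items that exist only under the original key. The detail from the answer shows up in read(): mid-transition, reads merge the original key with all sub-shards.

```python
import zlib
from collections import defaultdict


class HotKeyRouter:
    NORMAL, DUAL_WRITE, SHARDED = "normal", "dual_write", "sharded"

    def __init__(self, shard_count: int = 4):
        self.shard_count = shard_count
        self.phase = {}                 # key -> phase (missing means NORMAL)
        self.store = defaultdict(list)  # stand-in for the partitioned store

    def _sub_keys(self, key: str) -> list:
        return [f"{key}:shard_{i}" for i in range(self.shard_count)]

    def _sub_key_for(self, key: str, item) -> str:
        # Stable hash so every router instance picks the same sub-shard for an item
        return f"{key}:shard_{zlib.crc32(str(item).encode()) % self.shard_count}"

    def write(self, key: str, item) -> None:
        phase = self.phase.get(key, self.NORMAL)
        if phase in (self.NORMAL, self.DUAL_WRITE):
            self.store[key].append(item)
        if phase in (self.DUAL_WRITE, self.SHARDED):
            self.store[self._sub_key_for(key, item)].append(item)

    def read(self, key: str) -> list:
        phase = self.phase.get(key, self.NORMAL)
        if phase == self.NORMAL:
            return list(self.store[key])
        items = [x for sk in self._sub_keys(key) for x in self.store[sk]]
        if phase == self.DUAL_WRITE:
            # Mid-transition: older items exist only under the original key,
            # so merge them in and skip ones already seen in sub-shards
            seen = set(items)
            items += [x for x in self.store[key] if x not in seen]
        return items

    def start_transition(self, key: str) -> None:
        self.phase[key] = self.DUAL_WRITE  # step 1: begin dual-writing

    def finish_transition(self, key: str) -> None:
        # Steps 2-4: copy items that exist only under the original key into
        # sub-shards, then route reads and writes to sub-shards only
        in_subs = {x for sk in self._sub_keys(key) for x in self.store[sk]}
        for item in self.store[key]:
            if item not in in_subs:
                self.store[self._sub_key_for(key, item)].append(item)
        self.phase[key] = self.SHARDED


router = HotKeyRouter()
router.write("celeb", 1)
router.write("celeb", 2)
router.start_transition("celeb")
router.write("celeb", 3)             # lands in the original key and one sub-shard
print(sorted(router.read("celeb")))  # [1, 2, 3]
router.finish_transition("celeb")
router.write("celeb", 4)             # sub-shards only
print(sorted(router.read("celeb")))  # [1, 2, 3, 4]
```

Setting every key to SHARDED from the start corresponds to the simpler always-sub-shard approach: no transition logic is ever exercised.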
Explain the trade-offs between local secondary indexes and global secondary indexes in a partitioned database.
Strong Answer:
A local secondary index is co-located with the data partition. Each partition maintains its own index over its local data only. Writes are fast because the index update is a local operation within the same partition. But reads by the indexed attribute require scatter-gather across all partitions because any partition might have matching data. Read latency equals the slowest partition.
A global secondary index is separately partitioned by the indexed attribute. A query like “find all users in NYC” goes to a single index partition, making reads fast. But writes are slow because inserting a record might require updating an index partition on a different node, which is either a distributed transaction (expensive) or eventually consistent (the index lags behind the data).
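Cassandra’s built-in secondary indexes are local: each node indexes only its own data, so queries on an indexed column fan out across nodes. DynamoDB offers both local secondary indexes (LSIs, which share the base table’s partition key) and global secondary indexes (GSIs, which are eventually consistent). Elasticsearch is document-partitioned: each shard keeps its own inverted index, searches are scatter-gathered across shards, and new documents become searchable after a refresh (near-real-time).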
The decision depends on read/write ratio. If the workload is write-heavy and reads by secondary key are rare, local indexes are better. If the workload is read-heavy with frequent secondary key lookups, global indexes are better despite the write overhead.
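A toy model makes the read/write asymmetry countable. Each record lives on the partition chosen by hash(user_id), and its local index entry lives beside it. The global index is partitioned by hash(city). The functions report how many partitions an operation touches. The four partitions, the CRC32 hash and the users/city schema are assumptions for illustration.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4


def part(value: str) -> int:
    # Stable hash -> partition number (CRC32 is an arbitrary choice here)
    return zlib.crc32(value.encode()) % NUM_PARTITIONS


data = [dict() for _ in range(NUM_PARTITIONS)]                    # user_id -> city
local_index = [defaultdict(set) for _ in range(NUM_PARTITIONS)]   # per data partition: city -> user_ids
global_index = [defaultdict(set) for _ in range(NUM_PARTITIONS)]  # partitioned by city: city -> user_ids


def insert(user_id: str, city: str) -> int:
    """Store a user; return how many partitions the record plus its global index entry touch."""
    p = part(user_id)
    data[p][user_id] = city
    local_index[p][city].add(user_id)   # always the record's own partition
    g = part(city)
    global_index[g][city].add(user_id)  # often a different partition (remote write)
    return len({p, g})


def query_local(city: str):
    """Local index: any partition may hold matches, so ask all of them."""
    hits = set()
    for idx in local_index:
        hits |= idx.get(city, set())
    return hits, NUM_PARTITIONS  # (result, partitions contacted)


def query_global(city: str):
    """Global index: one index partition owns every entry for this city."""
    return set(global_index[part(city)].get(city, set())), 1


for uid, city in [("u1", "NYC"), ("u2", "NYC"), ("u3", "Paris")]:
    insert(uid, city)
print(query_local("NYC"))   # matches via a 4-partition scatter-gather
print(query_global("NYC"))  # the same matches from 1 partition
```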
Follow-up: In DynamoDB, what consistency issues can arise from the eventually consistent nature of GSIs?
A write to the base table is immediately visible to strongly consistent reads of the table, but the GSI update is asynchronous. It typically lands within a fraction of a second, but it can lag further under high write throughput. This means a query on the GSI might not return a record that was just written to the base table. Worse, it can return stale attribute values if the record was recently updated. In practice, this means you cannot use a GSI query result as the source of truth for an immediate business decision. For example, if you use a GSI to check “is this username taken?” and the GSI lags, two users registering the same username at the same time can both be told “not taken”. The mitigation is to enforce invariants on the base table itself, ideally with a conditional write: for example, a PutItem whose condition expression is attribute_not_exists on an item keyed by the username. Even a strongly consistent read followed by a separate write leaves a race window. Use GSIs only for queries where eventual consistency is acceptable (search, analytics, listing pages).
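The username race can be reproduced with an in-memory model. The model has a base table that supports a conditional put (the role a condition expression such as attribute_not_exists plays in DynamoDB) and a GSI-like set that only sees writes when a propagation step runs. Everything here is a simulation, not the DynamoDB API; the class and function names are hypothetical.

```python
class ConditionFailed(Exception):
    pass


class UsernameTable:
    """Base table keyed by username plus a lagging, GSI-like set of usernames."""

    def __init__(self):
        self.items = {}     # base table: username -> owner
        self.pending = []   # writes not yet propagated to the index
        self.index = set()  # eventually consistent index

    def put(self, username: str, owner: str, if_absent: bool = False) -> None:
        # With if_absent=True the existence check and the write happen together
        # on the base table, which is what a condition expression provides
        if if_absent and username in self.items:
            raise ConditionFailed(username)
        self.items[username] = owner
        self.pending.append(username)

    def propagate_index(self) -> None:
        # Asynchronous index update, applied "later"
        self.index.update(self.pending)
        self.pending.clear()


def register_via_index(table: UsernameTable, username: str, owner: str) -> bool:
    """Unsafe: check the lagging index, then write unconditionally."""
    if username in table.index:
        return False
    table.put(username, owner)
    return True


def register_conditional(table: UsernameTable, username: str, owner: str) -> bool:
    """Safe: the base table enforces uniqueness."""
    try:
        table.put(username, owner, if_absent=True)
        return True
    except ConditionFailed:
        return False


racy = UsernameTable()
print(register_via_index(racy, "neo", "alice"), register_via_index(racy, "neo", "bob"))  # True True
print(racy.items["neo"])  # bob: alice's registration was silently overwritten

safe = UsernameTable()
print(register_conditional(safe, "neo", "alice"), register_conditional(safe, "neo", "bob"))  # True False
```

The index-based check would have caught the duplicate once propagate_index() ran. The bug comes entirely from the lag, which is why the invariant belongs on the base table.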