Standard partitioning (Key-Range or Hash) works well for one-dimensional keys. However, for multi-dimensional data (e.g., searching for users by (latitude, longitude) or (age, income)), partitioning on a single key leads to expensive "scatter-gather" queries that must fan out to every partition. **Z-Order Curves (Morton Order)** map multi-dimensional data onto a single dimension while preserving locality, so points that are close in the original space tend to land in the same partition.
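As a rough sketch (assuming 2-D non-negative integer coordinates, e.g. quantized latitude/longitude), Morton encoding is just bit interleaving:

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Compute the Morton (Z-order) code of a 2-D point by interleaving
    the bits of x and y: x fills the even bit positions, y the odd ones."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # bit i of x -> position 2i
        z |= ((y >> i) & 1) << (2 * i + 1)   # bit i of y -> position 2i+1
    return z

# Nearby points get nearby Z-values, so a range scan over the single
# Z-order key covers a 2-D rectangle while touching only a few partitions.
print(interleave_bits(3, 5))   # x=0b011, y=0b101 -> 0b100111 = 39
```

The consistent hashing implementation below returns to plain hash partitioning, using virtual nodes to spread keys evenly and to minimize data movement when nodes join or leave.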
```python
import hashlib
from bisect import bisect_right


class ConsistentHash:
    def __init__(self, nodes=None, virtual_nodes=100):
        self.virtual_nodes = virtual_nodes
        self.ring = {}            # hash -> node
        self.sorted_hashes = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Generate hash for a key"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a node with virtual nodes"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            h = self._hash(virtual_key)
            self.ring[h] = node
            self.sorted_hashes.append(h)
        self.sorted_hashes.sort()

    def remove_node(self, node):
        """Remove a node and its virtual nodes"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            h = self._hash(virtual_key)
            del self.ring[h]
            self.sorted_hashes.remove(h)

    def get_node(self, key):
        """Find the node responsible for this key"""
        if not self.ring:
            return None
        h = self._hash(key)
        # Find first node with hash >= key's hash
        idx = bisect_right(self.sorted_hashes, h)
        # Wrap around if needed
        if idx >= len(self.sorted_hashes):
            idx = 0
        return self.ring[self.sorted_hashes[idx]]

    def get_nodes(self, key, n=3):
        """Get n nodes for replication"""
        if not self.ring:
            return []
        h = self._hash(key)
        idx = bisect_right(self.sorted_hashes, h)
        nodes = []
        seen = set()
        for i in range(len(self.sorted_hashes)):
            node_idx = (idx + i) % len(self.sorted_hashes)
            node = self.ring[self.sorted_hashes[node_idx]]
            if node not in seen:
                nodes.append(node)
                seen.add(node)
            if len(nodes) >= n:
                break
        return nodes


# Usage
ch = ConsistentHash(['node1', 'node2', 'node3'])

# Get node for a key
node = ch.get_node("user:12345")

# Get 3 nodes for replication
replicas = ch.get_nodes("user:12345", n=3)
```
```text
S3-STYLE OBJECT STORAGE
───────────────────────

KEY CONCEPTS:
• Objects (not files): blob + metadata + key
• Buckets: containers for objects
• Flat namespace (no real directories)
• Immutable objects (versioning for updates)

ARCHITECTURE (Simplified):

        Client
          │
          ▼
  ┌───────────────┐
  │ Load Balancer │
  └───────┬───────┘
      ┌───┴───┐
      ▼       ▼
  Frontend  Frontend         (HTTP handling, auth, routing)
      └───┬───┘
          ▼
  ┌──────────────────┐
  │  Metadata Store  │       (key → data location mapping)
  │ (Distributed DB) │
  └────────┬─────────┘
  ┌────────┼────────┐
  ▼        ▼        ▼
Storage Storage  Storage     (actual data blobs)
 Node    Node     Node

DURABILITY: 11 9's (99.999999999%)
HOW: Replicate across multiple datacenters + erasure coding
```

### Advanced: Tiered Storage & S3-backed LSM Trees

Modern data systems like **Apache Kafka**, **Redpanda**, and **Rockset** use "Tiered Storage" to achieve infinite retention without scaling expensive SSDs.

#### The Architecture:

1. **Hot Tier (Local NVMe)**: Stores the most recent data ($L_0$ SSTables or latest Kafka segments). Optimized for low-latency writes and tailing reads.
2. **Cold Tier (Object Storage/S3)**: As data ages or SSTables are compacted, they are pushed to S3.
3. **Shadow Metadata**: The database maintains a local index of which blocks are local and which are in S3.

#### Advantages:

- **Decoupled Scaling**: Scale compute (CPU) independently of storage (S3 is effectively infinite).
- **Instant Cluster Rebalancing**: Since 99% of data is in S3, "moving a partition" only requires moving the metadata, not the TBs of actual data.
- **Cost**: S3 is $\sim 10x$ cheaper than provisioned EBS/SSD storage.

### Erasure Coding (Reed-Solomon)

For modern storage systems (S3, Azure Blob, HDFS 3.x), simple 3x replication is often too expensive (200% storage overhead). **Erasure Coding** provides higher durability with much lower overhead.

#### How it works: (k, m) notation

A file is split into **k** data chunks, and **m** parity chunks are calculated.

- Total chunks: $n = k + m$
- Can tolerate the loss of **any m chunks**.
- Storage overhead: $n / k$ (e.g., for (10, 4), overhead is $14/10 = 1.4x$, vs. 3.0x for 3-way replication).

```text
REED-SOLOMON EXAMPLE (4, 2)
───────────────────────────

DATA CHUNKS (k=4)                PARITY CHUNKS (m=2)
┌────┐ ┌────┐ ┌────┐ ┌────┐      ┌────┐ ┌────┐
│ D1 │ │ D2 │ │ D3 │ │ D4 │ ──►  │ P1 │ │ P2 │
└────┘ └────┘ └────┘ └────┘      └────┘ └────┘

DISTRIBUTION: Each chunk on a different disk/node/rack.

RECOVERY: Any 4 of these 6 chunks can reconstruct the file.
          If D1 and D3 are lost, P1 and P2 + D2 and D4 can fix it.
```
Staff Tip: Erasure coding is best for cold data (infrequently accessed) where storage cost dominates. For hot data, replication is preferred to avoid the CPU and latency penalties of “reconstruction reads.”
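To make the overhead math concrete, here is a simplified illustration (single XOR parity, i.e. (k, m=1), not real Reed-Solomon; the chunk contents are made up):

```python
def xor_parity(chunks):
    """Parity chunk = byte-wise XOR of all data chunks (all same length)."""
    parity = bytearray(len(chunks[0]))
    for chunk in chunks:
        for i, b in enumerate(chunk):
            parity[i] ^= b
    return bytes(parity)

data = [b"AAAA", b"BBBB", b"CCCC", b"DDDD"]           # k = 4 data chunks
parity = xor_parity(data)                             # m = 1 parity chunk

# Recover a single lost chunk by XOR-ing the parity with the survivors.
lost = data[2]
recovered = xor_parity([data[0], data[1], data[3], parity])
assert recovered == lost

# Storage overhead from the (k, m) notation above: n / k
k, m = 10, 4
print(f"(k={k}, m={m}) overhead = {(k + m) / k:.1f}x vs 3.0x for 3-way replication")
```

Real Reed-Solomon generalizes this idea with m independent parity equations over a Galois field, which is what lets it survive the loss of any m chunks rather than just one.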
---

## Advanced: Cloud-Native Database Architectures

Standard distributed databases (Shared-Nothing) scale by partitioning data across nodes. However, cloud-native databases like **AWS Aurora** and **Alibaba PolarDB** use a **Shared-Storage** architecture that fundamentally changes how replication and recovery work.

### "The Log is the Database" (AWS Aurora)

In a traditional database (MySQL/Postgres), the engine must write data pages, redo logs, undo logs, and binlogs. In a distributed environment, this creates massive network I/O. Aurora's breakthrough was to **decouple compute from storage** and only send the **Redo Log** over the network.

#### How Aurora Works:

1. **Log-Only Replication**: The primary instance doesn't send full pages to replicas. It only sends redo log records to the storage tier.
2. **Smart Storage Tier**: The storage nodes are "database-aware." They receive log records and apply them to data pages in the background.
3. **Quorum Writes**: Data is striped across 3 Availability Zones (AZs) with 2 copies in each (Total = 6 replicas).
   - **Write Quorum**: 4 of 6 (tolerates 1 AZ failure + 1 node).
   - **Read Quorum**: 3 of 6.
4. **Instantaneous Failover**: Since replicas share the same storage tier, a replica can become primary almost instantly by just replaying the last few log records.

```text
SHARED-NOTHING vs. CLOUD-NATIVE
───────────────────────────────

SHARED-NOTHING (Cassandra/Spanner)        CLOUD-NATIVE (Aurora/PolarDB)

┌────────┐ ┌────────┐ ┌────────┐          ┌───────────────┐  (Compute)
│ Node 1 │ │ Node 2 │ │ Node 3 │          │  Read/Write   │
│ (C+S)  │ │ (C+S)  │ │ (C+S)  │          └───────┬───────┘
└────────┘ └────────┘ └────────┘                  │ (Redo Log Only)
    ▲          ▲          ▲               ┌───────▼───────┐  (Storage)
    └──────────┴──────────┘               │ Log-Structured│
          (Sharding)                      │ Shared Storage│
                                          └───────────────┘

Scale:    By adding nodes (Sharding)      Scale:    By decoupling layers
Recovery: Slow (data movement)            Recovery: Instant (shared disk)
```
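A quick sanity check of why those quorum sizes work (a minimal sketch; the replica counts come from the description above):

```python
# Aurora-style segment quorums: 6 replicas, 2 per AZ across 3 AZs.
N, W, R = 6, 4, 3

# Any read quorum overlaps any write quorum, so a read always sees the latest write.
assert W + R > N            # 4 + 3 > 6

# Any two write quorums overlap, so two conflicting writes cannot both succeed blindly.
assert W + W > N            # 4 + 4 > 6

# Losing a whole AZ (2 replicas) plus one more node still leaves 3 copies:
# enough for the read quorum, so the data survives and can be repaired,
# although new writes stall until the quorum is restored.
replicas_left = N - 2 - 1
assert replicas_left >= R
```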
Staff Tip: Shared-storage architectures are generally better for RDBMS workloads (SQL) that are hard to shard but need high availability and read scalability. Shared-nothing is still king for massive horizontal scale (billions of rows) where a single storage tier would become a bottleneck.
While Aurora uses “Database-Aware” storage, analytical systems like Snowflake and Google BigQuery use a different pattern: Storage/Compute Separation via Object Storage.
## Advanced: The Shuffle Problem & Distributed Query Execution
While databases like Spanner handle OLTP (Online Transaction Processing), large-scale analytics (OLAP) systems like Presto/Trino, Spark, and ClickHouse face a different challenge: The Shuffle.
A shuffle is the process of redistributing data across a cluster so that all records with the same key end up on the same physical node. This is required for:
- **Joins**: To join Table A and Table B on `user_id`, all rows for `user_id=123` from both tables must meet on the same node.
- **Aggregations**: To `GROUP BY city`, all "NYC" rows must be on one node.
Engines differ in how they move rows between operators (a toy sketch of the difference follows this list):

- **Exchange Operators (Volcano Model)**: Each node pulls data from its predecessors. This is simple but can be slow due to "pull" overhead (roughly one virtual call per row).
- **Push-Based (Vectorized)**: Modern engines like DuckDB or Photon (Databricks) push batches of data (vectors) through the pipeline, maximizing CPU cache efficiency.
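A toy sketch of the two models on a single filter operator (not a real engine; the row format and predicate are made up):

```python
# Pull-based (Volcano): each operator exposes an iterator, one row at a time.
def scan_volcano(rows):
    for row in rows:
        yield row                       # one call per row

def filter_volcano(child, predicate):
    for row in child:
        if predicate(row):
            yield row

# Push-based / vectorized: operators receive whole batches ("vectors") of rows,
# amortizing call overhead and keeping data hot in CPU caches.
def filter_vectorized(batches, predicate):
    for batch in batches:
        yield [row for row in batch if predicate(row)]

rows = [{"city": "NYC", "amount": i} for i in range(8)]
print(list(filter_volcano(scan_volcano(rows), lambda r: r["amount"] % 2 == 0)))
print(list(filter_vectorized([rows], lambda r: r["amount"] % 2 == 0)))
```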
How the join itself is executed determines how much data must cross the network:

| Strategy | Description | Use Case |
| --- | --- | --- |
| Broadcast Join | Send the small table to every node holding the large table. | One small table (< 100MB). Zero shuffle for the large table. |
| Shuffle Join | Hash both tables by the join key and redistribute. | Two large tables. Requires moving 100% of the data. |
| Colocated Join | Data is already partitioned by the join key at storage time. | Best performance. No network movement. |
Staff Tip: To optimize distributed queries, your goal is to minimize the shuffle. Use broadcast joins whenever possible, and try to partition your “fact” and “dimension” tables on the same key at the storage level.
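In Spark, for example, the broadcast hint is explicit (a sketch assuming PySpark; the paths, table names, and the `user_id`/`city` columns are placeholders):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-join-demo").getOrCreate()

events = spark.read.parquet("s3://bucket/events/")   # large "fact" table
users = spark.read.parquet("s3://bucket/users/")     # small "dimension" table (< ~100MB)

# broadcast() asks the optimizer to ship the small table to every executor,
# so the large table is joined in place and never shuffles across the network.
joined = events.join(broadcast(users), on="user_id", how="inner")
joined.groupBy("city").count().show()
```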
In Staff-level interviews, you must discuss the **RUM Conjecture** (Read, Update, Memory): a storage structure can be optimized for at most two of read overhead, update (write) overhead, and memory/space overhead, at the expense of the third. B-Trees and LSM-Trees make opposite trade-offs:
| Feature | B-Tree | LSM-Tree |
| --- | --- | --- |
| Write Amplification | High (random I/O, page splits) | Lower (sequential appends) |
| Read Amplification | Low (fixed tree depth) | Higher (checking multiple SSTables) |
| Space Amplification | Low (mostly compact) | High (duplicate versions, tombstones) |
Staff Tip: If your system has high write volume (e.g., logging, metrics, or a distributed ledger), choose an LSM-Tree. If you need low-latency range reads on a stable dataset, choose a B-Tree.
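A minimal sketch of the LSM write path (an illustrative toy, not a real engine; the WAL, bloom filters, and compaction are omitted):

```python
class TinyLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}           # mutable, absorbs all writes
        self.sstables = []           # immutable sorted runs, newest first
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            # Flush: sort once, write sequentially, start a fresh memtable.
            self.sstables.insert(0, dict(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key):
        # Read amplification: check the memtable, then every SSTable newest-first.
        if key in self.memtable:
            return self.memtable[key]
        for sstable in self.sstables:
            if key in sstable:
                return sstable[key]
        return None

db = TinyLSM()
for i in range(10):
    db.put(f"user:{i}", i)
print(db.get("user:3"), len(db.sstables))   # 3 2
```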
For high-concurrency systems (like Microsoft SQL Server’s Hekaton or Azure Cosmos DB), traditional B-Trees suffer from latch contention (locking) and “in-place” update overhead. The Bw-Tree (Buzzword Tree) solves this using a log-structured, lock-free approach.
The core innovation is a Mapping Table that maps a logical Page ID to a physical memory pointer.
Benefit: Instead of updating a page in-place (which requires a lock), you create a new version of the page and update the pointer in the mapping table using a single Atomic CAS (Compare-and-Swap).
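A toy sketch of that idea (single-process Python, so the atomic CAS is simulated with a lock; a real Bw-Tree uses a hardware compare-and-swap on pointers and prepends delta records rather than copying whole pages):

```python
import threading

class MappingTable:
    def __init__(self):
        self.table = {}              # logical page id -> immutable page (tuple of records)
        self._lock = threading.Lock()

    def compare_and_swap(self, page_id, expected, new):
        """Install `new` only if the current page is still `expected`."""
        with self._lock:             # stands in for a single atomic CAS instruction
            if self.table.get(page_id) is expected:
                self.table[page_id] = new
                return True
            return False

mt = MappingTable()
mt.table[1] = (("k1", "v1"),)

old = mt.table[1]
new = old + (("k2", "v2"),)          # build a NEW version; never mutate in place
while not mt.compare_and_swap(1, old, new):
    old = mt.table[1]                # another writer won the race: re-read and retry
    new = old + (("k2", "v2"),)
print(mt.table[1])
```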
LLAMA is the storage subsystem that manages these pages. It ensures that both the in-memory state and the on-disk state are log-structured and latch-free.
At the "Principal" level, performance is often gated by how fast you can write the Write-Ahead Log (WAL). High-performance logs (like Kafka) use OS-level optimizations to keep the CPU out of the data path.
In a traditional write, data moves: Disk → Kernel Buffer → App Buffer → Socket Buffer → NIC.
With Zero-Copy (sendfile system call), the data moves: Disk → Kernel Buffer → NIC.
This reduces context switches and memory bus contention, allowing a single node to saturate a 100Gbps link.
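In Python, the zero-copy path is exposed through `socket.sendfile()`, which uses the `sendfile` system call where available (a minimal sketch assuming Linux/macOS and an already-connected TCP socket; the file path is a placeholder):

```python
import socket

def serve_file(conn: socket.socket, path: str) -> int:
    """Stream a file to a connected socket using the kernel's zero-copy path."""
    with open(path, "rb") as f:
        # Data flows disk -> kernel buffer -> NIC; it is never copied into this
        # process's user-space buffers or a separate socket buffer.
        return conn.sendfile(f)
```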
**Group Commit (fsync batching)**: Instead of calling `fsync()` for every request (slow, since each call waits for disk head movement or a device flush), the system buffers multiple writes and performs one large synchronous write.
Trade-off: Increases latency for individual requests but massively increases total throughput.
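A toy group-commit sketch (illustrative only; the 5 ms batching window and the `wal.log` path are arbitrary choices):

```python
import os
import threading
import time

class GroupCommitLog:
    """Toy group-commit WAL: many writers, one fsync per batch."""

    def __init__(self, path):
        self.fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
        self.lock = threading.Lock()
        self.pending = []
        threading.Thread(target=self._flush_loop, daemon=True).start()

    def append(self, record: bytes):
        done = threading.Event()
        with self.lock:
            self.pending.append((record, done))
        done.wait()                          # blocks until this record is durable

    def _flush_loop(self, window=0.005):
        while True:
            time.sleep(window)               # batching window (~5 ms of extra latency)
            with self.lock:
                batch, self.pending = self.pending, []
            if batch:
                os.write(self.fd, b"".join(rec for rec, _ in batch))
                os.fsync(self.fd)            # ONE fsync amortized over the whole batch
                for _, done in batch:
                    done.set()

log = GroupCommitLog("wal.log")
log.append(b"SET user:1 -> alice\n")         # returns once its batch is fsync'd
```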
**Direct I/O**: Databases like ScyllaDB or Oracle bypass the OS Page Cache entirely (opening files with `O_DIRECT`).
Why? The OS cache uses LRU (Least Recently Used), but databases often know better (e.g., “I’m doing a sequential scan, don’t cache this”). Direct I/O allows the database to manage its own “Buffer Pool” with domain-specific knowledge.
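A minimal Linux-only sketch (`O_DIRECT` is not portable, and `datafile.bin` is a placeholder); note the alignment requirement, satisfied here by a page-aligned anonymous mmap:

```python
import mmap
import os

fd = os.open("datafile.bin", os.O_RDONLY | os.O_DIRECT)   # bypass the OS page cache
buf = mmap.mmap(-1, 4096)                                  # page-aligned 4 KiB buffer
nread = os.readv(fd, [buf])                                # data lands directly in our buffer
os.close(fd)
print(f"read {nread} bytes without polluting the page cache")
```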
```text
STREAM PROCESSING: WINDOWING
────────────────────────────

TUMBLING (fixed, non-overlapping)
├────────┤├────────┤├────────┤
│  0-10  ││ 10-20  ││ 20-30  │
├────────┤├────────┤├────────┤

SLIDING (overlapping)
├──────────┤
     ├──────────┤
          ├──────────┤
Window every 5 min, size 10 min

SESSION (gap-based)
├──────┤     ├───────────┤     ├───┤
│ User │     │    User   │     │   │
│  A   │     │     A     │     │ A │
└──────┘     └───────────┘     └───┘
         gap               gap
```

### Watermarks (Handling Late Data)

Watermarks are the bridge between **Event Time** and **Processing Time**. They tell a stream processor that it can safely "close" a time window because no more events from that period are expected.

#### Key Concepts:

- **Event time**: When the event actually happened (recorded by the sensor/mobile app).
- **Processing time**: When the event reached the Flink/Spark/Dataflow cluster.
- **Skew**: The difference between event time and processing time (caused by network delays, outages).

#### Watermark Heuristics

1. **Fixed Delay**: $W = T_{max\_seen} - delay$ (e.g., "I'll wait 5 seconds for late data").
2. **Heuristic/Adaptive**: Track the $99^{th}$ percentile of skew and adjust the watermark dynamically.

#### Watermark Propagation

In a complex DAG (Directed Acyclic Graph) of operators, a node with multiple inputs must take the **minimum** of its input watermarks:

$W_{out} = \min(W_{in1}, W_{in2}, ...)$

```text
WATERMARK PROPAGATION
─────────────────────

Source 1 (W=10) ───┐
                   │      ┌───────────────┐
                   ├─────►│ Join Operator │ ───► W_out = 10
Source 2 (W=12) ───┘      │  (Min logic)  │
                          └───────────────┘

WHY MIN? If the Join operator used W=12, it might drop late
events from Source 1 (which are only guaranteed up to 10).
```
When an event arrives with $t < W$ (its event time is already behind the watermark), it is considered **late**. Options:

- **Drop**: Discard the event (standard for real-time dashboards).
- **Side Output**: Send it to a separate "late-events" stream for manual correction.
- **Update**: Re-calculate the window and emit an updated result (costly).
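A toy sketch of a fixed-delay watermark over 10-second tumbling windows (the field names and the 5-second delay are illustrative; real engines like Flink track watermarks per partition and checkpoint this state):

```python
class TumblingWindows:
    def __init__(self, size=10, allowed_delay=5):
        self.size = size
        self.allowed_delay = allowed_delay
        self.max_event_time = 0
        self.open_windows = {}            # window start -> list of values
        self.late = []                    # "side output" for late events

    def on_event(self, event_time, value):
        watermark = self.max_event_time - self.allowed_delay   # W = T_max_seen - delay
        if event_time < watermark:
            self.late.append((event_time, value))              # late: route to side output
            return
        start = (event_time // self.size) * self.size
        self.open_windows.setdefault(start, []).append(value)
        self.max_event_time = max(self.max_event_time, event_time)
        # Close (emit) any window whose end is now behind the watermark.
        watermark = self.max_event_time - self.allowed_delay
        for s in [s for s in self.open_windows if s + self.size <= watermark]:
            print(f"window [{s},{s + self.size}) -> {sum(self.open_windows.pop(s))}")

w = TumblingWindows()
for t, v in [(1, 1), (8, 1), (12, 1), (26, 1), (3, 1)]:   # the (3, 1) event arrives late
    w.on_event(t, v)
print("late events:", w.late)
```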
---

## Advanced: Distributed SQL Internals (Raft-per-Range)

Modern NewSQL databases like **CockroachDB**, **TiDB**, and **YugabyteDB** combine the scalability of NoSQL with the transactional guarantees of traditional RDBMS. The key innovation is **Raft-per-Range**.

### 1. The Range Abstraction

Data is divided into **Ranges** (also called **Regions** in TiDB), each ~64-128MB. Each range is a contiguous slice of the keyspace.

```text
RAFT-PER-RANGE ARCHITECTURE
───────────────────────────

KEYSPACE: [A──────────────────────────────────────────────────Z]

Range 1: [A───────G]    Range 2: [G───────M]    Range 3: [M───────Z]

Each Range has its OWN Raft Group:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Raft Group 1   │    │  Raft Group 2   │    │  Raft Group 3   │
│  L=Node1        │    │  L=Node2        │    │  L=Node3        │
│  F=Node2, Node3 │    │  F=Node3, Node1 │    │  F=Node1, Node2 │
└─────────────────┘    └─────────────────┘    └─────────────────┘

BENEFIT: A transaction touching only Range 1 doesn't wait for Ranges 2/3.
```
### 2. Distributed Transactions (Percolator)

When a transaction touches keys in multiple ranges (e.g., a bank transfer from User A to User B), the database must coordinate across Raft groups. CockroachDB and TiDB use a variant of Google's Percolator protocol.

#### The Two-Phase Commit (Simplified):
1. **Prewrite**: Write "intent" records (locks) to all involved keys across ranges. The first key is the **Primary Lock**.
2. **Commit**: If prewrite succeeds, write a commit record to the primary key's row. Then, asynchronously resolve all other intents.
3. **Recovery**: If a transaction crashes, other transactions can detect stale intents and either roll them forward (if the primary is committed) or roll them back.
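A toy sketch of the prewrite/commit flow over an in-memory dict (timestamps from a TSO and durable intents, which real systems require, are omitted; the key names are made up):

```python
class PercolatorTxn:
    def __init__(self, store):
        self.store = store                 # key -> {"value": ..., "lock": primary or None}

    def commit(self, writes):
        keys = list(writes)
        primary = keys[0]                  # the first key acts as the Primary Lock
        # Phase 1 - Prewrite: place an intent (lock) on every key, or abort.
        for key in keys:
            row = self.store.setdefault(key, {"value": None, "lock": None})
            if row["lock"] is not None:
                self._rollback(keys, primary)
                raise RuntimeError(f"write conflict on {key}")
            row["lock"] = primary
            row["intent"] = writes[key]
        # Phase 2 - Commit: committing the primary is the atomic commit point; the
        # remaining intents are then resolved (real systems do this lazily/async).
        for key in [primary] + [k for k in keys if k != primary]:
            row = self.store[key]
            row["value"], row["lock"] = row.pop("intent"), None

    def _rollback(self, keys, primary):
        for key in keys:
            row = self.store.get(key)
            if row and row.get("lock") == primary:
                row.pop("intent", None)
                row["lock"] = None

store = {}
PercolatorTxn(store).commit({"acct:A": 50, "acct:B": 150})   # a cross-range transfer
print(store)
```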
For global deployments, data placement becomes critical for latency.
| Strategy | Description | Use Case |
| --- | --- | --- |
| Global Tables | Small, read-mostly tables replicated everywhere. | Config, feature flags. |
| Regional Tables | Data pinned to a specific region. | GDPR compliance. |
| Regional by Row | Rows are placed based on a column (e.g., `user_country`). | Multi-region user data. |
Staff Tip: For Staff+ interviews, be ready to discuss how these databases handle follower reads (reading from a non-leader replica for lower latency) and the consistency trade-offs involved (stale reads vs. waiting for leader).
Q: How does Google Spanner achieve external consistency?
**TrueTime + Commit Wait:**

- The TrueTime API returns a time interval `[earliest, latest]`
  - Backed by GPS receivers + atomic clocks
  - Uncertainty typically 1-7 ms
- Commit protocol:
  1. Transaction gets timestamp T at commit
  2. Wait until `TrueTime.after(T)` is true
  3. This ensures T is definitely in the past
  4. No other transaction can get an earlier timestamp
- Result: if T1 commits before T2 starts:
  - T1's timestamp < T2's timestamp
  - Real-time ordering is preserved globally
- Trade-off: commit latency includes the wait time (average ~7 ms)
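A toy sketch of commit wait (the TrueTime-like API and the 7 ms uncertainty are simulated here; real TrueTime derives its bounds from GPS and atomic clocks):

```python
import time

EPSILON = 0.007   # ~7 ms of clock uncertainty (illustrative)

def tt_now():
    """Return an uncertainty interval (earliest, latest) around real time."""
    now = time.time()
    return now - EPSILON, now + EPSILON

def commit(txn_writes):
    _, latest = tt_now()
    commit_ts = latest                    # a timestamp guaranteed not to be in the past
    # Commit wait: block until commit_ts is definitely in the past on every clock,
    # i.e. until TrueTime.after(commit_ts) would return true.
    while tt_now()[0] <= commit_ts:
        time.sleep(0.001)
    # Only now is the commit made visible; any transaction that starts afterwards
    # is guaranteed to receive a strictly larger timestamp.
    return commit_ts

print(commit({"k": "v"}))
```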
Q: Design a distributed cache with consistent hashing
```text
Architecture:
─────────────
Client → Load Balancer → Cache Nodes

Components:
1. Hash Ring: Virtual nodes for balance
2. Client library: Consistent hashing logic
3. Cache nodes: Memcached/Redis instances

Write path:
1. Client hashes key
2. Finds responsible node on ring
3. Writes to that node + N-1 replicas

Read path:
1. Client hashes key
2. Reads from first available replica

Handling node failure:
1. Detect via heartbeat
2. Remove from ring
3. Keys automatically route to next node
4. Warm cache gradually

Adding node:
1. Add to ring with virtual nodes
2. Only 1/N keys need to move
3. Background migration

Key decisions:
- Virtual nodes: 100-200 per physical
- Replication factor: 3 typical
- Quorum reads: Optional for consistency
```
Q: How does Kafka provide exactly-once semantics?
Three mechanisms:

1. **Idempotent Producer**: the broker deduplicates retried sends by (producer_id, sequence_number).

   ```python
   producer = KafkaProducer(enable_idempotence=True)
   ```

2. **Transactions**: the producer writes to multiple partitions, and commits consumer offsets, atomically; transaction markers make the whole batch commit or abort together.

3. **Read-committed consumers**: consumers set `isolation.level=read_committed` so they only see messages from committed transactions.
Q: Compare Cassandra vs DynamoDB for a new project
```text
CASSANDRA:
✓ Open source, no vendor lock-in
✓ More control over deployment
✓ Better for very high write throughput
✓ CQL (SQL-like) query language
✗ Operational complexity
✗ Need to manage cluster yourself

DYNAMODB:
✓ Fully managed, serverless option
✓ Predictable latency (single-digit ms)
✓ Auto-scaling built in
✓ AWS integration (IAM, Lambda, etc.)
✗ Vendor lock-in
✗ Can be expensive at scale
✗ Less flexible query patterns

CHOOSE CASSANDRA:
- Need multi-cloud or on-premise
- Write-heavy workload (millions/sec)
- Complex queries within partition
- Budget constrained at large scale

CHOOSE DYNAMODB:
- AWS-native architecture
- Minimal ops overhead
- Predictable performance SLA
- Pay-per-request pricing works
```