This module covers the distributed systems concepts essential for engineers joining teams working on Vitess, PlanetScale, CockroachDB, TiDB, or Neon, or building distributed database features at any company.
Replication is the foundation of high availability and read scalability in distributed databases. It involves keeping a copy of the same data on multiple machines that are connected via a network.
The CAP theorem states that a distributed data store cannot simultaneously guarantee Consistency, Availability, and Partition tolerance. Since network partitions are inevitable in distributed systems, the real choice is what to sacrifice when a partition happens: Consistency (CP) or Availability (AP).
How do we decide which node accepts writes and how data flows between nodes? This choice fundamentally impacts the system’s consistency and availability characteristics.
Single Leader: All writes go to one node (Leader). Simple to reason about but the leader is a bottleneck and single point of failure for writes. (e.g., PostgreSQL, MySQL).
Multi-Leader: Writes can be accepted by multiple nodes. Great for multi-region setups but requires complex conflict resolution. (e.g., DynamoDB Global Tables).
Leaderless: Writes are sent to multiple nodes and succeed once a quorum acknowledges them. Highly available, but consistency is eventual and mechanisms like read repair are needed (e.g., Cassandra, Dynamo). See the quorum sketch below.
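To make the quorum idea concrete, here is a minimal sketch in Go of the standard Dynamo-style condition (it is not any particular database's code; all names are invented for the example): with N replicas per key, a write acknowledged by W replicas, and a read that consults R replicas, choosing R + W > N guarantees that every read set overlaps the replicas holding the most recent acknowledged write.

package main

import "fmt"

// QuorumConfig captures the replica counts used by Dynamo-style systems.
// N = replicas per key, W = acks required for a write, R = replicas consulted per read.
type QuorumConfig struct {
    N, W, R int
}

// ReadsSeeLatestWrite reports whether read and write quorums must overlap,
// i.e. every read intersects the set of replicas that acknowledged the last write.
func (q QuorumConfig) ReadsSeeLatestWrite() bool {
    return q.R+q.W > q.N
}

// ToleratesWriteFailures is how many replicas can be down while writes still succeed.
func (q QuorumConfig) ToleratesWriteFailures() int {
    return q.N - q.W
}

func main() {
    configs := []QuorumConfig{
        {N: 3, W: 2, R: 2}, // typical "quorum" setting
        {N: 3, W: 1, R: 1}, // fast, but reads may miss the latest write
    }
    for _, c := range configs {
        fmt.Printf("N=%d W=%d R=%d  overlap=%v  write failures tolerated=%d\n",
            c.N, c.W, c.R, c.ReadsSeeLatestWrite(), c.ToleratesWriteFailures())
    }
}

Even with overlapping quorums, a read can return a stale replica's value alongside the newest one, which is exactly the situation read repair resolves.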
PostgreSQL uses physical replication, streaming the Write-Ahead Log (WAL) records from the primary to the standby. This is efficient and ensures the standby is an exact physical copy of the primary.
Replication lag is the delay between a write happening on the primary and it being visible on the standby. It is the enemy of read-your-writes consistency.
-- Measuring replication lag on primary
SELECT client_addr, usename, application_name, state,
       sent_lsn, write_lsn, flush_lsn, replay_lsn,
       pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replay_lag_bytes,
       pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) AS send_lag_bytes
FROM pg_stat_replication;

-- On standby (PG 10+)
SELECT now() - pg_last_xact_replay_timestamp() AS replay_lag,
       pg_is_in_recovery() AS is_standby,
       pg_last_wal_receive_lsn() AS last_receive,
       pg_last_wal_replay_lsn() AS last_replay;

-- Lag breakdown
/*
 * Total lag = network delay + disk write time + replay time
 *
 * Network delay:   Time to send WAL over network
 * Disk write time: Time to write WAL on standby
 * Replay time:     Time to apply WAL changes
 *
 * Common causes of lag:
 * 1. Large transactions (big INSERTs/DELETEs)
 * 2. Long-running queries on standby (conflict)
 * 3. Network congestion
 * 4. Slow standby disks
 * 5. Hot standby conflicts (max_standby_streaming_delay)
 */
When running read-only queries on a standby, conflicts can occur with the incoming replication stream. For example, a query might be reading a row that the primary has just deleted.
-- Hot Standby conflicts
-- On standby, queries can conflict with replay:
-- 1. Query needs row that replay is deleting
-- 2. Query uses snapshot that replay is advancing
-- 3. Replay needs exclusive lock query is holding

-- Configuration (postgresql.conf on the standby)
hot_standby = on                    -- Enable read queries on standby
hot_standby_feedback = on           -- Send xmin to primary (prevents some conflicts)
max_standby_streaming_delay = 30s   -- Max time to delay replay for queries
max_standby_archive_delay = 30s     -- Same for archive recovery

-- When conflict occurs:
-- 1. Replay waits up to max_standby_*_delay
-- 2. If still conflicting, cancel the query
-- 3. Error: "canceling statement due to conflict with recovery"

-- Monitoring conflicts
SELECT datname, confl_tablespace, confl_lock, confl_snapshot,
       confl_bufferpin, confl_deadlock
FROM pg_stat_database_conflicts;
In distributed systems, we often need multiple nodes to agree on a value (e.g., “who is the leader?” or “what is the next log entry?”). This is the problem of consensus.
Raft is a consensus algorithm designed to be easy to understand. It decomposes the problem into leader election, log replication, and safety.

Raft Leader Election Process:
Follower timeout: If follower doesn’t hear from leader, becomes candidate
Request votes: Candidate increments term and requests votes from peers
Vote granting: Each follower grants at most one vote per term, to the first candidate whose log is at least as up to date as its own
Majority wins: Candidate with majority becomes leader
Heartbeats: Leader sends periodic heartbeats to maintain authority
Raft ensures that if any machine applies a log entry to its state machine, no other machine will ever apply a different command for the same log index.
/* Raft node state */
typedef struct RaftNode {
    /* Persistent state (on disk) */
    uint64_t currentTerm;       /* Latest term seen */
    NodeId votedFor;            /* Candidate voted for in current term */
    RaftLog *log;               /* Log entries */

    /* Volatile state (all nodes) */
    uint64_t commitIndex;       /* Highest committed entry */
    uint64_t lastApplied;       /* Highest applied to state machine */

    /* Volatile state (leader only) */
    uint64_t *nextIndex;        /* Next entry to send to each follower */
    uint64_t *matchIndex;       /* Highest replicated entry per follower */

    /* Timing */
    Timestamp electionTimeout;
    Timestamp lastHeartbeat;
} RaftNode;

/* AppendEntries RPC (heart of Raft) */
typedef struct AppendEntriesRequest {
    uint64_t term;              /* Leader's term */
    NodeId leaderId;            /* For followers to redirect clients */
    uint64_t prevLogIndex;      /* Index of log entry before new ones */
    uint64_t prevLogTerm;       /* Term of prevLogIndex entry */
    LogEntry *entries;          /* Entries to append (empty for heartbeat) */
    int numEntries;
    uint64_t leaderCommit;      /* Leader's commit index */
} AppendEntriesRequest;

typedef struct AppendEntriesResponse {
    uint64_t term;              /* Follower's current term */
    bool success;               /* True if follower matched prevLogIndex/Term */
    uint64_t matchIndex;        /* Follower's last matching index (optimization) */
} AppendEntriesResponse;

/* Leader handles AppendEntries response */
void handleAppendEntriesResponse(RaftNode *node, NodeId follower,
                                 AppendEntriesResponse *resp) {
    if (resp->term > node->currentTerm) {
        /* Stale leader - step down */
        becomeFollower(node, resp->term);
        return;
    }

    if (resp->success) {
        /* Update follower's progress */
        node->matchIndex[follower] = resp->matchIndex;
        node->nextIndex[follower] = resp->matchIndex + 1;

        /* Check if we can advance commit index */
        maybeAdvanceCommitIndex(node);
    } else {
        /* Log mismatch - back off and retry */
        node->nextIndex[follower] = max(1, node->nextIndex[follower] - 1);
        /* Could use matchIndex hint for faster catch-up */
    }
}
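The AppendEntries handler above covers log replication; the election half of the protocol can be sketched the same way. The snippet below is a simplified illustration in Go (types and helper names are invented for this example, not taken from any real Raft library) of how a follower decides whether to grant a vote: one vote per term, and only to candidates whose log is at least as up to date as its own.

package raftsketch

// VoteRequest and VoteResponse mirror the RequestVote RPC; names are illustrative only.
type VoteRequest struct {
    Term         uint64 // candidate's term
    CandidateID  int
    LastLogIndex uint64 // index of candidate's last log entry
    LastLogTerm  uint64 // term of candidate's last log entry
}

type VoteResponse struct {
    Term        uint64
    VoteGranted bool
}

type FollowerState struct {
    CurrentTerm  uint64
    VotedFor     int // -1 means no vote cast in the current term
    LastLogIndex uint64
    LastLogTerm  uint64
}

// HandleRequestVote applies the two Raft voting rules:
// grant at most one vote per term, and only if the candidate's log
// is at least as up to date as the follower's.
func (f *FollowerState) HandleRequestVote(req VoteRequest) VoteResponse {
    // A higher term always resets our vote for the new term.
    if req.Term > f.CurrentTerm {
        f.CurrentTerm = req.Term
        f.VotedFor = -1
    }
    // Reject stale candidates outright.
    if req.Term < f.CurrentTerm {
        return VoteResponse{Term: f.CurrentTerm, VoteGranted: false}
    }
    // "At least as up to date": higher last term wins; equal terms compare index.
    logOK := req.LastLogTerm > f.LastLogTerm ||
        (req.LastLogTerm == f.LastLogTerm && req.LastLogIndex >= f.LastLogIndex)

    if logOK && (f.VotedFor == -1 || f.VotedFor == req.CandidateID) {
        f.VotedFor = req.CandidateID
        return VoteResponse{Term: f.CurrentTerm, VoteGranted: true}
    }
    return VoteResponse{Term: f.CurrentTerm, VoteGranted: false}
}

A candidate that collects votes from a majority becomes leader for that term and starts sending the heartbeat AppendEntries messages shown above.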
While Raft is popular for its clarity, Paxos (specifically Multi-Paxos) is the historical standard. Both achieve the same goal but differ in terminology and flexibility.
PAXOS vs RAFT COMPARISON
─────────────────────────────────────────────────────────────────────────────

Aspect              Paxos                  Raft
─────────────────────────────────────────────────────────────────────────────
Leadership          Per-decision           Stable leader
Leader election     Dueling proposers      Explicit election
Log management      Gaps allowed           No gaps (contiguous)
Understandability   Complex                Designed for clarity
Implementation      Many variations        Single specification

Multi-Paxos Phases:
1. Prepare: Proposer gets promise from majority
2. Accept:  Proposer sends value, acceptors accept
3. Learn:   Value is learned when majority accept

Multi-Paxos Optimization:
• Skip Prepare phase when leader is stable
• Effectively becomes like Raft

Used By:
• Paxos: Spanner (modified), Chubby
• Raft:  etcd, CockroachDB, TiKV, Consul
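To ground the Prepare/Accept/Learn phases listed above, here is a minimal single-decree Paxos acceptor sketch in Go (names are illustrative, not taken from any real implementation). The acceptor's entire job is to remember the highest proposal number it has promised and the last value it accepted.

package paxossketch

// Acceptor holds the durable state of one Paxos acceptor; names are illustrative only.
type Acceptor struct {
    promisedN uint64 // highest proposal number we promised not to undercut
    acceptedN uint64 // proposal number of the last accepted value (0 = none)
    acceptedV string // last accepted value
}

// Prepare (phase 1): promise to ignore proposals numbered below n.
// Returns whether the promise was made, plus any previously accepted value,
// which the proposer must reuse instead of its own.
func (a *Acceptor) Prepare(n uint64) (ok bool, prevN uint64, prevV string) {
    if n > a.promisedN {
        a.promisedN = n
        return true, a.acceptedN, a.acceptedV
    }
    return false, 0, ""
}

// Accept (phase 2): accept the value unless we have promised a higher number.
func (a *Acceptor) Accept(n uint64, v string) bool {
    if n >= a.promisedN {
        a.promisedN = n
        a.acceptedN = n
        a.acceptedV = v
        return true
    }
    return false
}

// A value is chosen (learned) once a majority of acceptors have accepted the
// same proposal. Multi-Paxos repeats this per log slot and skips the Prepare
// phase while the same proposer (the stable leader) keeps winning.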
CockroachDB uses Raft not just for the whole cluster, but for each individual “Range” (shard) of data. This allows for fine-grained high availability and consistency.
// CockroachDB uses Raft per Range (shard)
// Each Range (~512MB of data) has its own Raft group
type Range struct {
    RangeID     RangeID
    StartKey    Key
    EndKey      Key
    RaftGroup   *raft.RawNode
    LeaseHolder NodeID // Not same as Raft leader
}

// Range Lease (optimization for reads)
//
// Raft leader election: ~1-2 seconds
// Lease acquisition:    Sub-millisecond
//
// Lease allows fast local reads without Raft consensus
// Lease holder is usually the Raft leader but can differ

// Write path in CockroachDB:
// 1. Client sends write to any node
// 2. Node routes to Range lease holder
// 3. Lease holder proposes to Raft
// 4. Raft replicates to majority
// 5. Entry committed and applied
// 6. Response to client

// Read path (with lease):
// 1. Client sends read to any node
// 2. Node routes to lease holder
// 3. Lease holder checks lease validity
// 4. If valid: read directly from local state
// 5. If expired: acquire new lease first
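The read path above hinges on the lease check in step 3. Below is a tiny illustrative sketch, not CockroachDB code (the types and names are invented for this example), of a replica deciding whether it may serve a read from local state or must first go back through Raft.

package main

import (
    "errors"
    "fmt"
    "time"
)

// Lease is a time-bounded grant letting one replica serve reads locally.
type Lease struct {
    Holder     int       // node ID of the current leaseholder
    Expiration time.Time // after this, a new lease must be acquired via Raft
}

type Replica struct {
    NodeID int
    Lease  Lease
}

var errNotLeaseholder = errors.New("redirect: this replica is not the leaseholder")

// ServeRead mirrors the read path above: serve from local state while the
// lease is valid, otherwise require a lease acquisition through Raft first.
func (r *Replica) ServeRead(key string, now time.Time) (string, error) {
    if r.Lease.Holder != r.NodeID {
        return "", errNotLeaseholder
    }
    if now.After(r.Lease.Expiration) {
        return "", errors.New("lease expired: reacquire via Raft before reading")
    }
    return fmt.Sprintf("local read of %q without a Raft round-trip", key), nil
}

func main() {
    r := &Replica{NodeID: 1, Lease: Lease{Holder: 1, Expiration: time.Now().Add(9 * time.Second)}}
    v, err := r.ServeRead("user/42", time.Now())
    fmt.Println(v, err)
}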
There are several ways to determine which shard a particular row belongs to.
Key-Based (Hash): shard_id = hash(key) % num_shards. Even distribution, but resharding is expensive and range queries are inefficient.
Range-Based: shard_id determined by key ranges (e.g., A-M, N-Z). Efficient range queries, but prone to “hot spots” if keys are sequential (e.g., timestamps).
Directory-Based: A lookup service maps keys to shards. Flexible placement, but the lookup service can become a bottleneck and a single point of failure.
SHARDING STRATEGIES
─────────────────────────────────────────────────────────────────────────────

1. Hash Sharding
   shard = hash(sharding_key) % num_shards

   Pros:
   • Even data distribution
   • No hotspots for uniform access
   Cons:
   • Range scans hit all shards
   • Resharding expensive (all data moves)

2. Range Sharding
   shard = find_shard(sharding_key) where shard.start <= key < shard.end

   Shard 1: [A, M)
   Shard 2: [M, T)
   Shard 3: [T, Z]

   Pros:
   • Range scans efficient (within shard)
   • Split/merge shards easily
   Cons:
   • Hotspots for sequential keys (auto-increment)
   • Uneven distribution possible

3. Consistent Hashing

                ┌─────┐
           ┌────│ N1  │────┐
           │    └─────┘    │           Ring represents hash space
      ┌────┴────┐     ┌────┴────┐
      │   N4    │     │   N2    │      Keys hash to position on ring
      └────┬────┘     └────┬────┘      Assigned to next node clockwise
           │    ┌─────┐    │
           └────│ N3  │────┘
                └─────┘

   Used by: DynamoDB, Cassandra, Riak
   Adding node: Only neighbors affected
   Virtual nodes: Improve balance
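The ring above can be made concrete in a few lines. This is a hedged sketch of consistent hashing with virtual nodes (the FNV hash and the node names are arbitrary choices for the example, not any library's implementation), showing why adding a node only takes keys from its clockwise neighbor.

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// Ring is a minimal consistent-hash ring with virtual nodes.
type Ring struct {
    points []uint32          // sorted hash positions on the ring
    owner  map[uint32]string // hash position -> node name
    vnodes int               // virtual nodes per physical node
}

func hashKey(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func NewRing(vnodes int) *Ring {
    return &Ring{owner: make(map[uint32]string), vnodes: vnodes}
}

// AddNode places vnodes points on the ring for one physical node.
func (r *Ring) AddNode(name string) {
    for i := 0; i < r.vnodes; i++ {
        p := hashKey(fmt.Sprintf("%s#%d", name, i))
        r.points = append(r.points, p)
        r.owner[p] = name
    }
    sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
}

// Lookup returns the first node clockwise from the key's hash position.
func (r *Ring) Lookup(key string) string {
    h := hashKey(key)
    i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
    if i == len(r.points) { // wrap around the ring
        i = 0
    }
    return r.owner[r.points[i]]
}

func main() {
    ring := NewRing(16)
    for _, n := range []string{"N1", "N2", "N3", "N4"} {
        ring.AddNode(n)
    }
    for _, k := range []string{"user:123", "user:456", "order:789"} {
        fmt.Println(k, "->", ring.Lookup(k))
    }
}

Removing N2 reassigns only the keys that hashed to N2's points; every other key keeps its owner, which is the property that plain hash-modulo sharding lacks.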
Resharding is the process of changing the number of shards, usually splitting one shard into two as data grows. Vitess handles this online with minimal downtime using VReplication.
RESHARDING PROCESS (VITESS)
─────────────────────────────────────────────────────────────────────────────

Before: 2 shards                After: 4 shards

┌─────────────┐          ┌─────────────┐  ┌─────────────┐
│  Shard -80  │   ──►    │  Shard -40  │  │ Shard 40-80 │
└─────────────┘          └─────────────┘  └─────────────┘
┌─────────────┐          ┌─────────────┐  ┌─────────────┐
│  Shard 80-  │   ──►    │ Shard 80-c0 │  │  Shard c0-  │
└─────────────┘          └─────────────┘  └─────────────┘

Resharding Steps:

1. Create Target Shards
   • Provision new MySQL instances
   • Initialize with schema
   • Set up replication

2. VReplication (copy phase)
   • Stream data from source to target
   • Filter rows by new keyspace ID
   • Maintain consistent snapshot

3. VDiff (verification)
   • Compare source and target data
   • Ensure consistency before cutover

4. Cutover (atomic switch)
   • Stop writes briefly
   • Switch serving to new shards
   • Redirect VTGate routing
   • Resume writes to new shards

5. Cleanup
   • Remove old shard data
   • Decommission old tablets
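Routing during and after such a split is driven by the keyspace ID: the sharding key is hashed, and each shard owns a contiguous range of the hash space (shard "-40" owns everything below 0x40, "c0-" owns 0xc0 and above). The sketch below illustrates that range lookup only; it is not Vitess code, and the hash used here (FNV, keeping just the most significant byte) merely stands in for Vitess's hash vindex.

package main

import (
    "fmt"
    "hash/fnv"
)

// Shard describes a contiguous range of the keyspace-ID space, named the way
// Vitess names shards: "-40", "40-80", "80-c0", "c0-". Start/End are the first
// byte of the range; End = 0x100 means "to the end of the space".
type Shard struct {
    Name       string
    Start, End int
}

// Shards after the 2 -> 4 split shown above.
var shards = []Shard{
    {"-40", 0x00, 0x40},
    {"40-80", 0x40, 0x80},
    {"80-c0", 0x80, 0xc0},
    {"c0-", 0xc0, 0x100},
}

// keyspaceIDFirstByte stands in for a hash vindex; the real vindex produces a
// 64-bit keyspace ID with a different hash function.
func keyspaceIDFirstByte(shardingKey string) int {
    h := fnv.New64a()
    h.Write([]byte(shardingKey))
    return int(h.Sum64() >> 56) // keep only the most significant byte
}

// routeToShard finds the shard whose range contains the keyspace ID.
func routeToShard(shardingKey string) Shard {
    b := keyspaceIDFirstByte(shardingKey)
    for _, s := range shards {
        if b >= s.Start && b < s.End {
            return s
        }
    }
    return shards[len(shards)-1] // unreachable with the ranges above
}

func main() {
    for _, key := range []string{"user:123", "user:456", "user:789"} {
        s := routeToShard(key)
        fmt.Printf("%s -> keyspace-ID byte 0x%02x -> shard %s\n",
            key, keyspaceIDFirstByte(key), s.Name)
    }
}

Because a row's keyspace ID never changes, resharding only changes the boundaries the ID falls between; VReplication copies each row to whichever new shard now owns its ID.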
Google Spanner achieves external consistency (linearizability) at a global scale using TrueTime, which exposes time as an interval of uncertainty.
TrueTime API: Returns [earliest, latest]. The actual time is guaranteed to be within this interval.
Commit Wait: A transaction Ti waits until TT.now().earliest > Ti.commit_timestamp before reporting success. This ensures that if T2 starts after T1 finishes, T2 will definitely see T1’s effects.
TRUETIME AND SPANNER
─────────────────────────────────────────────────────────────────────────────

TrueTime API:

  TT.now()     → TTinterval { earliest, latest }
  TT.after(t)  → True if t has definitely passed
  TT.before(t) → True if t has definitely not arrived

  Uncertainty bound (ε):
  • Typical: 1-7 ms
  • Maintained by GPS + atomic clocks

Commit Wait:

  Transaction T1 commits:
  1. Acquire locks
  2. Get commit timestamp s = TT.now().latest
  3. Wait until TT.after(s) is true
  4. Release locks

  Why wait?
  • Ensures T1's timestamp is in the past at all nodes
  • Any later transaction T2 will have s2 > s1
  • External consistency: If T1 commits before T2 starts, s1 < s2

Timeline:

  T1: [─────acquire locks─────][wait ε][commit]
  T2:                                          [start]────────►
                                                  │
                                                  T2 sees T1's timestamp
                                                  is definitely in past
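The commit-wait rule can be written down in a few lines. The sketch below is an illustration only, not Spanner's API: real TrueTime is backed by GPS and atomic clock references, whereas here ε is just an assumed constant uncertainty around the local clock. It assigns a commit timestamp and then blocks until that timestamp is guaranteed to be in the past.

package main

import (
    "fmt"
    "time"
)

// epsilon is the assumed clock uncertainty bound; Spanner's is typically 1-7 ms.
const epsilon = 5 * time.Millisecond

// TTInterval is a TrueTime-style interval: real time lies within [Earliest, Latest].
type TTInterval struct {
    Earliest, Latest time.Time
}

// ttNow models TT.now() using the local clock plus/minus epsilon.
func ttNow() TTInterval {
    t := time.Now()
    return TTInterval{Earliest: t.Add(-epsilon), Latest: t.Add(epsilon)}
}

// ttAfter models TT.after(s): true once s has definitely passed on every clock.
func ttAfter(s time.Time) bool {
    return ttNow().Earliest.After(s)
}

// commitWait assigns a commit timestamp and blocks until that timestamp is
// guaranteed to be in the past everywhere, which is what gives external consistency.
func commitWait() time.Time {
    s := ttNow().Latest // commit timestamp
    for !ttAfter(s) {
        time.Sleep(time.Millisecond)
    }
    return s
}

func main() {
    start := time.Now()
    s := commitWait()
    fmt.Printf("commit timestamp %v acknowledged after %v of commit wait\n",
        s.Format(time.RFC3339Nano), time.Since(start))
}

The wait is proportional to ε, which is why Spanner invests so heavily in keeping the uncertainty bound small.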
CockroachDB cannot rely on atomic clocks like Spanner. Instead, it uses Hybrid Logical Clocks (HLC) to provide causality tracking and loose time synchronization.
Physical Component: Wall clock time (NTP).
Logical Component: A counter to order events that happen within the same physical tick or when clocks move backwards.
Result: Timestamps that respect causality (if event A can influence event B, then hlc(A) < hlc(B)), which lets CockroachDB take consistent MVCC snapshots without atomic clocks. A sketch of the update rules follows below.
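Here is a hedged sketch of the HLC update rules in Go, following the logic of the Hybrid Logical Clocks paper rather than CockroachDB's actual implementation (all names are illustrative): take the maximum of the local wall clock, the clock's last timestamp, and any timestamp carried on an incoming message, and bump the logical counter whenever the physical part cannot advance.

package main

import (
    "fmt"
    "time"
)

// HLCTime is a hybrid timestamp: physical wall-clock component plus a logical counter.
type HLCTime struct {
    Wall    int64 // nanoseconds since epoch
    Logical int32
}

type HLCClock struct {
    last HLCTime
}

func physicalNow() int64 { return time.Now().UnixNano() }

// Now is used for local events and sends: advance to the wall clock if it moved
// forward, otherwise reuse the previous wall time and bump the logical counter.
func (c *HLCClock) Now() HLCTime {
    pt := physicalNow()
    if pt > c.last.Wall {
        c.last = HLCTime{Wall: pt}
    } else {
        c.last.Logical++
    }
    return c.last
}

// Update is called on message receipt: take the maximum of the local wall clock,
// our last timestamp, and the sender's timestamp, so causally related events
// always receive increasing HLC timestamps even if wall clocks disagree.
func (c *HLCClock) Update(remote HLCTime) HLCTime {
    pt := physicalNow()
    switch {
    case pt > c.last.Wall && pt > remote.Wall:
        c.last = HLCTime{Wall: pt}
    case remote.Wall > c.last.Wall:
        c.last = HLCTime{Wall: remote.Wall, Logical: remote.Logical + 1}
    case c.last.Wall > remote.Wall:
        c.last.Logical++
    default: // equal wall components: advance past both logical counters
        if remote.Logical > c.last.Logical {
            c.last.Logical = remote.Logical
        }
        c.last.Logical++
    }
    return c.last
}

func main() {
    var a, b HLCClock
    send := a.Now()        // event on node A
    recv := b.Update(send) // node B receives A's message
    fmt.Printf("A sent %+v, B received at %+v\n", send, recv)
}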
Vitess supports different transaction modes depending on the consistency requirements.
-- Vitess transaction modes

-- Single-shard transactions (efficient)
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 456;
COMMIT;
-- If both tables are sharded by user_id and land on the same shard → single MySQL transaction

-- Cross-shard transactions (atomic 2PC)
SET transaction_mode = 'twopc';
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE orders SET status = 'shipped' WHERE user_id = 456;  -- Different shard!
COMMIT;

/*
 * Vitess 2PC implementation:
 *
 * 1. PREPARE phase:
 *    - VTGate sends PREPARE to all participating shards
 *    - Each shard prepares locally
 *
 * 2. COMMIT phase:
 *    - VTGate logs commit decision to "redo log" shard
 *    - Sends COMMIT to all shards
 *    - Redo log entry deleted after all ACKs
 *
 * 3. Recovery (if VTGate crashes):
 *    - Scan redo log for incomplete transactions
 *    - Resume COMMIT or ROLLBACK based on logged state
 */

-- Best-effort transactions (performance over atomicity)
SET transaction_mode = 'multi';
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE orders SET status = 'shipped' WHERE user_id = 456;
COMMIT;
-- Each shard commits independently - no atomicity guarantee!
-- Use when application can tolerate partial failures
Because CockroachDB transactions are distributed across nodes, conflicts between them are resolved by comparing transaction timestamps and priorities.
// CockroachDB conflict resolution
// Scenario: T2 tries to read a key that T1 has written an intent for

// Option 1: Wait for T1
if t1.status == PENDING {
    // T2 waits for T1 to commit or abort
    // Uses distributed wait queue
}

// Option 2: Push T1's timestamp
if t2.timestamp < t1.timestamp {
    // T2 can read the previous value
    // No conflict if T2 reads at ts before T1's intent
}
if t2.timestamp >= t1.timestamp {
    // T2 "pushes" T1 to a higher timestamp
    // T1 will need to restart at new timestamp
    // Unless T1 has already committed
}

// Option 3: T2 aborts
// If T1 has higher priority and T2 can't wait

/*
 * Transaction priorities (for conflict resolution):
 *
 * - Default: Normal
 * - SET TRANSACTION PRIORITY HIGH
 * - SET TRANSACTION PRIORITY LOW
 *
 * Higher priority transactions win conflicts
 * Prevents starvation of long transactions
 */