Distributed Database Systems

This module covers the distributed systems concepts essential for engineers joining teams that build Vitess, PlanetScale, CockroachDB, TiDB, or Neon, or for anyone building distributed database features at their own company.
Target Audience: Distributed database engineers
Prerequisites: Storage Engine, Performance Engineering modules
Key Comparisons: PostgreSQL, Vitess, CockroachDB, Spanner, TiDB
Interview Relevance: Staff+ distributed systems roles

Part 1: Replication Fundamentals

Replication is the foundation of high availability and read scalability in distributed databases. It involves keeping a copy of the same data on multiple machines that are connected via a network.

1.0 The CAP Theorem

The CAP theorem states that a distributed data store can provide at most two of three guarantees simultaneously: Consistency, Availability, and Partition tolerance. Because network partitions are inevitable in a distributed system, the practical choice during a partition is between consistency (CP) and availability (AP).

1.1 Replication Topologies

How do we decide which node accepts writes, and how does data flow between nodes? This choice fundamentally shapes the system’s consistency and availability characteristics.
  • Single Leader: All writes go to one node (the leader). Simple to reason about, but the leader is a write bottleneck and a single point of failure. (e.g., PostgreSQL, MySQL).
  • Multi-Leader: Writes can be accepted by multiple nodes. Well suited to multi-region deployments, but requires conflict resolution. (e.g., DynamoDB Global Tables).
  • Leaderless: Writes are sent to multiple nodes and succeed once a quorum acknowledges. Highly available, but consistency is eventual and mechanisms such as read repair are needed. (e.g., Cassandra, Dynamo). A quorum sketch follows the diagram below.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      REPLICATION TOPOLOGIES                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Single-Leader (Primary-Replica)                                           │
│   ─────────────────────────────────────────────────────────────────────────  │
│   ┌──────────┐       ┌──────────┐                                           │
│   │  Primary │──────→│ Replica  │                                           │
│   │  (R/W)   │       │  (R)     │                                           │
│   └──────────┘       └──────────┘                                           │
│        │             ┌──────────┐                                           │
│        └────────────→│ Replica  │                                           │
│                      │  (R)     │                                           │
│                      └──────────┘                                           │
│   • PostgreSQL streaming replication                                        │
│   • MySQL replication                                                       │
│   • Simple, but primary is SPOF                                             │
│                                                                              │
│   Multi-Leader                                                               │
│   ─────────────────────────────────────────────────────────────────────────  │
│   ┌──────────┐←─────→┌──────────┐                                           │
│   │ Leader A │       │ Leader B │                                           │
│   │  (R/W)   │       │  (R/W)   │                                           │
│   └──────────┘       └──────────┘                                           │
│   • BDR (Bi-Directional Replication)                                        │
│   • Conflict resolution required                                            │
│   • Used for multi-datacenter                                               │
│                                                                              │
│   Leaderless (Dynamo-style)                                                  │
│   ─────────────────────────────────────────────────────────────────────────  │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐                                  │
│   │  Node A  │  │  Node B  │  │  Node C  │                                  │
│   │  (R/W)   │  │  (R/W)   │  │  (R/W)   │                                  │
│   └──────────┘  └──────────┘  └──────────┘                                  │
│   • Cassandra, DynamoDB, Riak                                               │
│   • Quorum reads/writes: R + W > N                                          │
│   • Conflict resolution via vector clocks / LWW                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
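
To make the leaderless quorum condition concrete, here is a minimal Go sketch, not tied to any particular database, that checks whether a chosen replica count N, read quorum R, and write quorum W guarantee overlap, and whether a write gathered enough acknowledgements.

// Minimal quorum arithmetic sketch; names and structure are illustrative.
package main

import "fmt"

// QuorumConfig captures the classic Dynamo-style tuning knobs:
// N replicas per key, R replicas consulted per read, W acks required per write.
type QuorumConfig struct {
	N, R, W int
}

// Overlapping reports whether every read quorum intersects every write
// quorum (R + W > N), which is what lets a read observe the latest write.
func (q QuorumConfig) Overlapping() bool {
	return q.R+q.W > q.N
}

// WriteSucceeds reports whether a write that received `acks` acknowledgements
// meets the configured write quorum.
func (q QuorumConfig) WriteSucceeds(acks int) bool {
	return acks >= q.W
}

func main() {
	cfg := QuorumConfig{N: 3, R: 2, W: 2}
	fmt.Println("read/write quorums overlap:", cfg.Overlapping())   // true: 2+2 > 3
	fmt.Println("write with 1 ack succeeds:", cfg.WriteSucceeds(1)) // false
}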

1.2 PostgreSQL Streaming Replication

PostgreSQL uses physical replication, streaming Write-Ahead Log (WAL) records from the primary to the standby. This is efficient and keeps the standby an exact physical copy of the primary.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      STREAMING REPLICATION                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Primary                          Standby                                   │
│   ┌─────────────────────────┐     ┌─────────────────────────────┐          │
│   │                         │     │                              │          │
│   │  Backend ──► WAL Buffer │────►│  WAL Receiver ──► WAL Files │          │
│   │                ↓        │     │                      ↓       │          │
│   │           WAL Files     │     │               Startup Process│          │
│   │                │        │     │                 (applies WAL)│          │
│   │                ↓        │     │                      ↓       │          │
│   │           WAL Sender ────────►│              Shared Buffers  │          │
│   │                         │     │                              │          │
│   └─────────────────────────┘     └─────────────────────────────┘          │
│                                                                              │
│   Synchronous Replication:                                                   │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   synchronous_commit = 'remote_apply'                                       │
│                                                                              │
│   Client ──► Primary: COMMIT                                                │
│              Primary ──► WAL write                                          │
│              Primary ──► Send to Standby                                    │
│              Standby ──► Receive WAL                                        │
│              Standby ──► Apply WAL (replay)                                 │
│              Standby ──► ACK to Primary                                     │
│              Primary ──► ACK to Client                                      │
│                                                                              │
│   Modes:                                                                     │
│   • off: Don't wait (async)                                                 │
│   • local: Wait for local WAL flush                                         │
│   • remote_write: Wait for standby to receive (not flush)                   │
│   • on/remote_flush: Wait for standby WAL flush                             │
│   • remote_apply: Wait for standby to apply (strongest)                     │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

1.3 Replication Lag

Replication lag is the delay between a write happening on the primary and it being visible on the standby. It is the enemy of read-your-writes consistency.
-- Measuring replication lag on primary
SELECT 
    client_addr,
    usename,
    application_name,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replay_lag_bytes,
    pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) AS send_lag_bytes
FROM pg_stat_replication;

-- On standby (PG 10+)
SELECT 
    now() - pg_last_xact_replay_timestamp() AS replay_lag,
    pg_is_in_recovery() AS is_standby,
    pg_last_wal_receive_lsn() AS last_receive,
    pg_last_wal_replay_lsn() AS last_replay;

-- Lag breakdown
/*
 * Total lag = network delay + disk write time + replay time
 *
 * Network delay: Time to send WAL over network
 * Disk write time: Time to write WAL on standby
 * Replay time: Time to apply WAL changes
 *
 * Common causes of lag:
 * 1. Large transactions (big INSERTs/DELETEs)
 * 2. Long-running queries on standby (conflict)
 * 3. Network congestion
 * 4. Slow standby disks
 * 5. Hot standby conflicts (max_standby_streaming_delay)
 */

1.4 Conflict Handling

When running read-only queries on a standby, conflicts can occur with the incoming replication stream. For example, a query might be reading a row that the primary has just deleted.
-- Hot Standby conflicts

-- On standby, queries can conflict with replay:
-- 1. Query needs row that replay is deleting
-- 2. Query uses snapshot that replay is advancing
-- 3. Replay needs exclusive lock query is holding

-- Configuration
hot_standby = on                       -- Enable read queries on standby
hot_standby_feedback = on              -- Send xmin to primary (prevents some conflicts)
max_standby_streaming_delay = 30s      -- Max time to delay replay for queries
max_standby_archive_delay = 30s        -- Same for archive recovery

-- When conflict occurs:
-- 1. Replay waits up to max_standby_*_delay
-- 2. If still conflicting, cancel the query
-- 3. Error: "canceling statement due to conflict with recovery"

-- Monitoring conflicts
SELECT 
    datname,
    confl_tablespace,
    confl_lock,
    confl_snapshot,
    confl_bufferpin,
    confl_deadlock
FROM pg_stat_database_conflicts;

Part 2: Consensus Algorithms

In distributed systems, we often need multiple nodes to agree on a value (e.g., “who is the leader?” or “what is the next log entry?”). This is the problem of consensus.

2.1 Raft Fundamentals

Raft is a consensus algorithm designed to be easy to understand. It decomposes the consensus problem into leader election, log replication, and safety. Leader election proceeds as follows (a Go sketch of the election logic follows the list):
  1. Follower timeout: If follower doesn’t hear from leader, becomes candidate
  2. Request votes: Candidate increments term and requests votes from peers
  3. Vote granting: Followers vote for first candidate in each term
  4. Majority wins: Candidate with majority becomes leader
  5. Heartbeats: Leader sends periodic heartbeats to maintain authority
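
The following Go sketch shows the core of steps 1–4: a node whose election timer fires becomes a candidate, increments its term, votes for itself, and wins once it collects a majority. The types, timings, and stubbed transport are illustrative, not taken from any particular implementation.

// Illustrative Raft election sketch; the RequestVote transport is stubbed out.
package main

import (
	"math/rand"
	"time"
)

type State int

const (
	Follower State = iota
	Candidate
	Leader
)

type Node struct {
	id          int
	peers       []int // ids of the other cluster members
	state       State
	currentTerm uint64
	votedFor    int // -1 means "no vote cast this term"
	lastHeard   time.Time
	timeout     time.Duration // randomized to avoid split votes
}

func newTimeout() time.Duration {
	// Randomized 150-300ms election timeout, as suggested in the Raft paper.
	return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

// startElection runs steps 2-4: become candidate, request votes, count a majority.
func (n *Node) startElection(requestVote func(peer int, term uint64) bool) {
	n.state = Candidate
	n.currentTerm++
	n.votedFor = n.id
	votes := 1 // own vote
	for _, p := range n.peers {
		if requestVote(p, n.currentTerm) {
			votes++
		}
	}
	if votes > (len(n.peers)+1)/2 {
		n.state = Leader // step 5 would now send heartbeats
	} else {
		n.state = Follower
		n.timeout = newTimeout()
	}
}

func main() {
	n := &Node{id: 1, peers: []int{2, 3}, votedFor: -1, timeout: newTimeout()}
	// Stub transport: every peer grants its vote.
	n.startElection(func(peer int, term uint64) bool { return true })
}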

2.2 Raft Implementation Details

Raft ensures that if any machine applies a log entry to its state machine, no other machine will ever apply a different command for the same log index.
/* Raft node state */
typedef struct RaftNode {
    /* Persistent state (on disk) */
    uint64_t currentTerm;      /* Latest term seen */
    NodeId votedFor;           /* Candidate voted for in current term */
    RaftLog *log;              /* Log entries */
    
    /* Volatile state (all nodes) */
    uint64_t commitIndex;      /* Highest committed entry */
    uint64_t lastApplied;      /* Highest applied to state machine */
    
    /* Volatile state (leader only) */
    uint64_t *nextIndex;       /* Next entry to send to each follower */
    uint64_t *matchIndex;      /* Highest replicated entry per follower */
    
    /* Timing */
    Timestamp electionTimeout;
    Timestamp lastHeartbeat;
} RaftNode;

/* AppendEntries RPC (heart of Raft) */
typedef struct AppendEntriesRequest {
    uint64_t term;             /* Leader's term */
    NodeId leaderId;           /* For followers to redirect clients */
    uint64_t prevLogIndex;     /* Index of log entry before new ones */
    uint64_t prevLogTerm;      /* Term of prevLogIndex entry */
    LogEntry *entries;         /* Entries to append (empty for heartbeat) */
    int numEntries;
    uint64_t leaderCommit;     /* Leader's commit index */
} AppendEntriesRequest;

typedef struct AppendEntriesResponse {
    uint64_t term;             /* Follower's current term */
    bool success;              /* True if follower matched prevLogIndex/Term */
    uint64_t matchIndex;       /* Follower's last matching index (optimization) */
} AppendEntriesResponse;

/* Leader handles AppendEntries response */
void handleAppendEntriesResponse(RaftNode *node, NodeId follower, 
                                  AppendEntriesResponse *resp) {
    if (resp->term > node->currentTerm) {
        /* Stale leader - step down */
        becomeFollower(node, resp->term);
        return;
    }
    
    if (resp->success) {
        /* Update follower's progress */
        node->matchIndex[follower] = resp->matchIndex;
        node->nextIndex[follower] = resp->matchIndex + 1;
        
        /* Check if we can advance commit index */
        maybeAdvanceCommitIndex(node);
    } else {
        /* Log mismatch - back off and retry */
        node->nextIndex[follower] = max(1, node->nextIndex[follower] - 1);
        /* Could use matchIndex hint for faster catch-up */
    }
}

2.3 Multi-Paxos vs Raft

While Raft is popular for its clarity, Paxos (specifically Multi-Paxos) is the historical standard. Both achieve the same goal but differ in terminology and flexibility.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      PAXOS vs RAFT COMPARISON                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Aspect              Paxos                   Raft                           │
│   ─────────────────────────────────────────────────────────────────────────  │
│   Leadership          Per-decision            Stable leader                  │
│   Leader election     Dueling proposers       Explicit election              │
│   Log management      Gaps allowed            No gaps (contiguous)           │
│   Understandability   Complex                 Designed for clarity           │
│   Implementation      Many variations         Single specification           │
│                                                                              │
│   Multi-Paxos Phases:                                                        │
│   1. Prepare: Proposer gets promise from majority                           │
│   2. Accept: Proposer sends value, acceptors accept                         │
│   3. Learn: Value is learned when majority accept                           │
│                                                                              │
│   Multi-Paxos Optimization:                                                  │
│   • Skip Prepare phase when leader is stable                                │
│   • Effectively becomes like Raft                                           │
│                                                                              │
│   Used By:                                                                   │
│   • Paxos: Spanner (modified), Chubby                                       │
│   • Raft: etcd, CockroachDB, TiKV, Consul                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

2.4 Raft in CockroachDB

CockroachDB runs a separate Raft group for each individual “Range” (shard) of data rather than a single cluster-wide consensus group. This allows fine-grained high availability and consistency; a lease-check sketch follows the annotated code below.
// CockroachDB uses Raft per Range (shard)

// Each Range (~512MB of data) has its own Raft group
type Range struct {
    RangeID        RangeID
    StartKey       Key
    EndKey         Key
    RaftGroup      *raft.RawNode
    LeaseHolder    NodeID  // Not same as Raft leader
}

// Range Lease (optimization for reads)
// 
// Raft leader election: ~1-2 seconds
// Lease acquisition: Sub-millisecond
//
// Lease allows fast local reads without Raft consensus
// Lease holder is usually the Raft leader but can differ

// Write path in CockroachDB:
// 1. Client sends write to any node
// 2. Node routes to Range lease holder
// 3. Lease holder proposes to Raft
// 4. Raft replicates to majority
// 5. Entry committed and applied
// 6. Response to client

// Read path (with lease):
// 1. Client sends read to any node
// 2. Node routes to lease holder
// 3. Lease holder checks lease validity
// 4. If valid: read directly from local state
// 5. If expired: acquire new lease first
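
As a rough illustration of the lease-based read path, a leaseholder might gate local reads as below. This is simplified (real CockroachDB also handles closed timestamps, follower reads, and lease transfers), and all names are invented for this sketch.

// Simplified lease check before serving a local read; names are illustrative.
package main

import (
	"errors"
	"fmt"
	"time"
)

type Lease struct {
	Holder     int       // node id holding the lease
	Expiration time.Time // lease is only valid until this instant
}

type Replica struct {
	nodeID int
	lease  Lease
}

var errNotLeaseHolder = errors.New("not lease holder: redirect to current holder")

// serveRead returns the locally stored value only if this replica holds a
// valid lease; otherwise the caller must re-route or renew the lease first.
func (r *Replica) serveRead(key string, now time.Time, localGet func(string) string) (string, error) {
	if r.lease.Holder != r.nodeID || !now.Before(r.lease.Expiration) {
		return "", errNotLeaseHolder
	}
	// Lease valid: read directly from local state, no Raft round trip needed.
	return localGet(key), nil
}

func main() {
	r := &Replica{nodeID: 1, lease: Lease{Holder: 1, Expiration: time.Now().Add(5 * time.Second)}}
	val, err := r.serveRead("/Table/users/123", time.Now(), func(_ string) string { return "Alice" })
	fmt.Println(val, err)
}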

Part 3: Sharding Strategies

When a dataset becomes too large for a single node, we must split it across multiple nodes. This process is called sharding (or partitioning).

3.1 Sharding Approaches

There are several ways to determine which shard a given row belongs to; a consistent-hashing sketch follows the diagram below.
  • Key-Based (Hash): shard_id = hash(key) % num_shards. Even distribution, but resharding is expensive and range queries are inefficient.
  • Range-Based: shard_id determined by key ranges (e.g., A-M, N-Z). Efficient range queries, but prone to “hot spots” if keys are sequential (e.g., timestamps).
  • Directory-Based: A lookup service maps keys to shards. Flexible placement, but the lookup service is a bottleneck.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      SHARDING STRATEGIES                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. Hash Sharding                                                           │
│   ─────────────────────────────────────────────────────────────────────────  │
│   shard = hash(sharding_key) % num_shards                                   │
│                                                                              │
│   Pros:                                                                      │
│   • Even data distribution                                                   │
│   • No hotspots for uniform access                                          │
│   Cons:                                                                      │
│   • Range scans hit all shards                                              │
│   • Resharding expensive (all data moves)                                   │
│                                                                              │
│   2. Range Sharding                                                          │
│   ─────────────────────────────────────────────────────────────────────────  │
│   shard = find_shard(sharding_key) where shard.start <= key < shard.end     │
│                                                                              │
│   Shard 1: [A, M)                                                           │
│   Shard 2: [M, T)                                                           │
│   Shard 3: [T, Z]                                                           │
│                                                                              │
│   Pros:                                                                      │
│   • Range scans efficient (within shard)                                    │
│   • Split/merge shards easily                                               │
│   Cons:                                                                      │
│   • Hotspots for sequential keys (auto-increment)                           │
│   • Uneven distribution possible                                            │
│                                                                              │
│   3. Consistent Hashing                                                      │
│   ─────────────────────────────────────────────────────────────────────────  │
│                ┌─────┐                                                       │
│           ┌────│ N1  │────┐                                                  │
│           │    └─────┘    │     Ring represents hash space                  │
│      ┌────┴────┐    ┌────┴────┐                                             │
│      │   N4    │    │   N2    │  Keys hash to position on ring              │
│      └────┬────┘    └────┬────┘  Assigned to next node clockwise            │
│           │    ┌─────┐    │                                                  │
│           └────│ N3  │────┘                                                  │
│                └─────┘                                                       │
│                                                                              │
│   Used by: DynamoDB, Cassandra, Riak                                        │
│   Adding node: Only neighbors affected                                       │
│   Virtual nodes: Improve balance                                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
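
Here is a minimal consistent-hashing ring in Go. It is a sketch using a generic FNV hash and an arbitrary virtual-node count, not the exact scheme used by Cassandra or DynamoDB, but it shows how keys map to the next node clockwise and how virtual nodes improve balance.

// Minimal consistent-hashing ring; hash function and vnode count are illustrative.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type Ring struct {
	points []uint32          // sorted positions on the ring
	owner  map[uint32]string // position -> node name
}

func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// AddNode places `vnodes` virtual points for the node on the ring.
func (r *Ring) AddNode(name string, vnodes int) {
	if r.owner == nil {
		r.owner = map[uint32]string{}
	}
	for i := 0; i < vnodes; i++ {
		p := hashOf(fmt.Sprintf("%s#%d", name, i))
		r.points = append(r.points, p)
		r.owner[p] = name
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
}

// NodeFor returns the first node clockwise from the key's hash position.
func (r *Ring) NodeFor(key string) string {
	h := hashOf(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around the ring
	}
	return r.owner[r.points[i]]
}

func main() {
	var ring Ring
	for _, n := range []string{"N1", "N2", "N3", "N4"} {
		ring.AddNode(n, 16)
	}
	fmt.Println("user:12345 ->", ring.NodeFor("user:12345"))
}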

3.2 Vitess Architecture

Vitess is a database clustering system for horizontal scaling of MySQL. It hides the sharding complexity from the application.
  • VTGate: A stateless proxy that routes queries to the correct shard(s). It speaks the MySQL protocol.
  • VTTablet: A sidecar process that runs alongside each MySQL instance, managing it and handling replication.
  • Topology Service: Stores the cluster configuration (keyspaces, shards) in a consistent store like etcd or ZooKeeper.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      VITESS ARCHITECTURE                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Application                                                                │
│        │                                                                     │
│        ▼                                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                           VTGate                                     │   │
│   │   • Query routing and planning                                       │   │
│   │   • Scatter-gather for cross-shard queries                          │   │
│   │   • Connection pooling                                               │   │
│   │   • Result merging                                                   │   │
│   └───────────────────────────┬─────────────────────────────────────────┘   │
│                               │                                              │
│            ┌──────────────────┼──────────────────┐                          │
│            ▼                  ▼                  ▼                          │
│   ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐              │
│   │    VTTablet     │ │    VTTablet     │ │    VTTablet     │              │
│   │  (Shard -80)    │ │ (Shard 80-c0)   │ │ (Shard c0-)     │              │
│   │                 │ │                 │ │                 │              │
│   │   ┌─────────┐   │ │   ┌─────────┐   │ │   ┌─────────┐   │              │
│   │   │  MySQL  │   │ │   │  MySQL  │   │ │   │  MySQL  │   │              │
│   │   │ Primary │   │ │   │ Primary │   │ │   │ Primary │   │              │
│   │   └─────────┘   │ │   └─────────┘   │ │   └─────────┘   │              │
│   │   ┌─────────┐   │ │   ┌─────────┐   │ │   ┌─────────┐   │              │
│   │   │ Replica │   │ │   │ Replica │   │ │   │ Replica │   │              │
│   │   └─────────┘   │ │   └─────────┘   │ │   └─────────┘   │              │
│   └─────────────────┘ └─────────────────┘ └─────────────────┘              │
│                                                                              │
│   Topology Service (etcd/Consul/ZooKeeper):                                 │
│   • Shard mapping                                                           │
│   • VTTablet health                                                         │
│   • Schema tracking                                                         │
│                                                                              │
│   VSchema (Virtual Schema):                                                  │
│   • Defines sharding keys per table                                         │
│   • Vindexes for key-to-shard mapping                                       │
│   • Sequence tables for auto-increment                                      │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

3.3 Vindex (Vitess Index)

A Vindex is a mapping that tells Vitess how to route a query based on a column value; it is effectively the sharding-key definition. An illustrative routing sketch follows the examples below.
-- VSchema defining sharding

{
  "sharded": true,
  "vindexes": {
    "user_hash": {
      "type": "hash"
    },
    "order_user_idx": {
      "type": "lookup_hash",
      "params": {
        "table": "user_order_lookup",
        "from": "order_id",
        "to": "user_id"
      }
    }
  },
  "tables": {
    "users": {
      "column_vindexes": [
        {
          "column": "user_id",
          "name": "user_hash"
        }
      ]
    },
    "orders": {
      "column_vindexes": [
        {
          "column": "user_id",
          "name": "user_hash"
        },
        {
          "column": "order_id",
          "name": "order_user_idx"
        }
      ]
    }
  }
}

-- Query routing examples:

-- Single shard (efficient):
SELECT * FROM orders WHERE user_id = 12345;
-- VTGate: hash(12345) → shard 80-c0 → route to one tablet

-- Cross-shard scatter-gather:
SELECT * FROM orders WHERE order_id = 67890;
-- VTGate: lookup order_id → get user_id → then single shard
-- OR if no lookup: scatter to ALL shards, gather results

-- Join across shards:
SELECT u.name, o.total 
FROM users u JOIN orders o ON u.user_id = o.user_id
WHERE u.user_id = 12345;
-- VTGate: Both tables sharded same way → single shard

SELECT u.name, p.name
FROM users u JOIN products p ON u.favorite_product = p.product_id;
-- VTGate: Different sharding keys → scatter-gather or nested loop
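
To make the routing concrete, here is an illustrative sketch of how a hash-style vindex turns a sharding-key value into a keyspace ID and then into a shard range. The real Vitess hash vindex uses its own algorithm; the FNV hash and shard boundaries here are stand-ins.

// Illustrative keyspace-ID routing; Vitess's real hash vindex differs.
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

type Shard struct {
	Name       string
	Start, End uint64 // keyspace-ID range [Start, End); End == 0 means "to the top"
}

// keyspaceID derives a 64-bit keyspace ID from the sharding-key value.
func keyspaceID(userID uint64) uint64 {
	h := fnv.New64a()
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], userID)
	h.Write(buf[:])
	return h.Sum64()
}

// shardFor picks the shard whose range contains the keyspace ID.
func shardFor(shards []Shard, ksid uint64) Shard {
	for _, s := range shards {
		if ksid >= s.Start && (s.End == 0 || ksid < s.End) {
			return s
		}
	}
	return shards[len(shards)-1]
}

func main() {
	shards := []Shard{
		{Name: "-80", Start: 0, End: 0x8000000000000000},
		{Name: "80-c0", Start: 0x8000000000000000, End: 0xc000000000000000},
		{Name: "c0-", Start: 0xc000000000000000, End: 0},
	}
	ksid := keyspaceID(12345)
	fmt.Printf("user 12345 -> keyspace id %016x -> shard %s\n", ksid, shardFor(shards, ksid).Name)
}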

3.4 Resharding

Resharding is the process of changing the number of shards, usually splitting one shard into two as data grows. Vitess handles this online with minimal downtime using VReplication; the steps are shown below, followed by a sketch of the copy-phase row filter.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      RESHARDING PROCESS (VITESS)                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Before: 2 shards                  After: 4 shards                         │
│   ┌─────────────────────────┐       ┌─────────────────┐ ┌─────────────────┐ │
│   │     Shard -80           │   ──► │   Shard -40     │ │   Shard 40-80   │ │
│   └─────────────────────────┘       └─────────────────┘ └─────────────────┘ │
│   ┌─────────────────────────┐       ┌─────────────────┐ ┌─────────────────┐ │
│   │     Shard 80-           │   ──► │   Shard 80-c0   │ │   Shard c0-     │ │
│   └─────────────────────────┘       └─────────────────┘ └─────────────────┘ │
│                                                                              │
│   Resharding Steps:                                                          │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   1. Create Target Shards                                                    │
│      • Provision new MySQL instances                                         │
│      • Initialize with schema                                                │
│      • Set up replication                                                    │
│                                                                              │
│   2. VReplication (copy phase)                                               │
│      • Stream data from source to target                                     │
│      • Filter rows by new keyspace ID                                        │
│      • Maintain consistent snapshot                                          │
│                                                                              │
│   3. VDiff (verification)                                                    │
│      • Compare source and target data                                        │
│      • Ensure consistency before cutover                                     │
│                                                                              │
│   4. Cutover (atomic switch)                                                 │
│      • Stop writes briefly                                                   │
│      • Switch serving to new shards                                          │
│      • Redirect VTGate routing                                               │
│      • Resume writes to new shards                                           │
│                                                                              │
│   5. Cleanup                                                                 │
│      • Remove old shard data                                                 │
│      • Decommission old tablets                                              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
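
During the copy phase, each target shard keeps only the rows whose keyspace ID falls inside its new range. A rough sketch of that filter, reusing the illustrative keyspace-ID idea from above and not Vitess's actual VReplication code:

// Rough sketch of a copy-phase row filter during resharding; not Vitess code.
package main

import "fmt"

type Row struct {
	UserID     uint64
	KeyspaceID uint64 // precomputed from the sharding key
}

type TargetShard struct {
	Name       string
	Start, End uint64 // End == 0 means the top of the keyspace-ID space
}

// keep reports whether a source row belongs on this target shard.
func (t TargetShard) keep(r Row) bool {
	return r.KeyspaceID >= t.Start && (t.End == 0 || r.KeyspaceID < t.End)
}

func main() {
	// Splitting source shard -80 into -40 and 40-80.
	targets := []TargetShard{
		{Name: "-40", Start: 0, End: 0x4000000000000000},
		{Name: "40-80", Start: 0x4000000000000000, End: 0x8000000000000000},
	}
	rows := []Row{{UserID: 1, KeyspaceID: 0x10}, {UserID: 2, KeyspaceID: 0x5000000000000000}}
	for _, t := range targets {
		for _, r := range rows {
			if t.keep(r) {
				fmt.Printf("row user=%d -> shard %s\n", r.UserID, t.Name)
			}
		}
	}
}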

Part 4: Distributed Transactions

When a transaction spans multiple shards, we need a protocol to ensure atomicity: either all shards commit, or none do.

4.1 Two-Phase Commit (2PC)

The standard protocol for distributed atomic commit. It involves a coordinator and multiple participants (a coordinator sketch follows the diagram below). Two-Phase Commit
  1. Prepare Phase: Coordinator asks all participants “Can you commit?”. Participants lock resources and persist their vote.
  2. Commit Phase: If all say “Yes”, Coordinator tells everyone to “Commit”. If any say “No” (or timeout), Coordinator tells everyone to “Abort”.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      TWO-PHASE COMMIT                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Coordinator                 Participant A         Participant B            │
│       │                            │                      │                  │
│   ════╪════════════════════════════╪══════════════════════╪════════════════  │
│       │   PREPARE Phase            │                      │                  │
│       │──────────────────────────► │                      │                  │
│       │      PREPARE               │                      │                  │
│       │──────────────────────────────────────────────────►│                  │
│       │                            │      PREPARE         │                  │
│       │                            │                      │                  │
│       │ ◄──────────────────────────│                      │                  │
│       │      VOTE_COMMIT           │                      │                  │
│       │ ◄─────────────────────────────────────────────────│                  │
│       │                            │      VOTE_COMMIT     │                  │
│   ════╪════════════════════════════╪══════════════════════╪════════════════  │
│       │   COMMIT Phase             │                      │                  │
│       │──────────────────────────► │                      │                  │
│       │      COMMIT                │                      │                  │
│       │──────────────────────────────────────────────────►│                  │
│       │                            │      COMMIT          │                  │
│       │ ◄──────────────────────────│                      │                  │
│       │      ACK                   │                      │                  │
│       │ ◄─────────────────────────────────────────────────│                  │
│       │                            │      ACK             │                  │
│       │                                                                      │
│                                                                              │
│   Problems with 2PC:                                                         │
│   • Blocking: If coordinator fails after PREPARE, participants wait         │
│   • Latency: 2 round trips minimum                                          │
│   • Single point of failure (coordinator)                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
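
A minimal coordinator sketch follows. It is in-memory, with no durable decision log, so it illustrates the protocol but not crash recovery; the interface and names are invented for this example.

// Minimal in-memory two-phase commit coordinator; omits the durable
// decision log a real implementation needs for crash recovery.
package main

import "fmt"

// Participant is whatever can prepare and then commit or abort its branch
// of the distributed transaction (e.g., one shard).
type Participant interface {
	Prepare(txnID string) bool // true = VOTE_COMMIT
	Commit(txnID string)
	Abort(txnID string)
}

func runTwoPhaseCommit(txnID string, parts []Participant) bool {
	// Phase 1: PREPARE - everyone must vote commit.
	for _, p := range parts {
		if !p.Prepare(txnID) {
			for _, q := range parts {
				q.Abort(txnID)
			}
			return false
		}
	}
	// Phase 2: COMMIT - the decision is now final.
	for _, p := range parts {
		p.Commit(txnID)
	}
	return true
}

type fakeShard struct {
	name string
	vote bool
}

func (s fakeShard) Prepare(string) bool { return s.vote }
func (s fakeShard) Commit(txn string)   { fmt.Println(s.name, "commit", txn) }
func (s fakeShard) Abort(txn string)    { fmt.Println(s.name, "abort", txn) }

func main() {
	ok := runTwoPhaseCommit("txn-1", []Participant{fakeShard{"A", true}, fakeShard{"B", true}})
	fmt.Println("committed:", ok)
}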

4.2 Spanner’s TrueTime

Google Spanner achieves external consistency (linearizability) at a global scale using TrueTime, which exposes time as an interval of uncertainty (a commit-wait sketch follows the diagram below).
  • TrueTime API: Returns [earliest, latest]. The actual time is guaranteed to be within this interval.
  • Commit Wait: A transaction Ti waits until TT.now().earliest > Ti.commit_timestamp before reporting success. This ensures that if T2 starts after T1 finishes, T2 will definitely see T1’s effects.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      TRUETIME AND SPANNER                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   TrueTime API:                                                              │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   TT.now()     → TTinterval { earliest, latest }                            │
│   TT.after(t)  → True if t has definitely passed                            │
│   TT.before(t) → True if t has definitely not arrived                       │
│                                                                              │
│   Uncertainty bound (ε):                                                     │
│   • Typical: 1-7 ms                                                         │
│   • Maintained by GPS + atomic clocks                                       │
│                                                                              │
│   Commit Wait:                                                               │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   Transaction T1 commits:                                                    │
│   1. Acquire locks                                                           │
│   2. Get commit timestamp s = TT.now().latest                               │
│   3. Wait until TT.after(s) is true                                         │
│   4. Release locks                                                           │
│                                                                              │
│   Why wait?                                                                  │
│   • Ensures T1's timestamp is in the past at all nodes                      │
│   • Any later transaction T2 will have s2 > s1                              │
│   • External consistency: If T1 commits before T2 starts, s1 < s2           │
│                                                                              │
│   Timeline:                                                                  │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   T1: [─────acquire locks─────][wait ε][commit]                             │
│   T2:                              [start]────────►                          │
│                                    │                                         │
│                            T2 sees T1's timestamp                            │
│                            is definitely in past                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
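
A sketch of commit wait follows. The TrueTime source here is a stand-in with a fixed uncertainty bound; only Spanner's GPS and atomic-clock infrastructure provides real bounded-uncertainty clocks.

// Illustrative commit wait; the TrueTime source is a stand-in with a fixed
// uncertainty bound, not Google's GPS/atomic-clock infrastructure.
package main

import (
	"fmt"
	"time"
)

type TTInterval struct {
	Earliest, Latest time.Time
}

// trueTimeNow models TT.now() with a fixed uncertainty epsilon.
func trueTimeNow(epsilon time.Duration) TTInterval {
	now := time.Now()
	return TTInterval{Earliest: now.Add(-epsilon), Latest: now.Add(epsilon)}
}

// commitWait picks s = TT.now().latest as the commit timestamp, then blocks
// until TT.after(s), i.e. until s is guaranteed to be in the past everywhere.
func commitWait(epsilon time.Duration) time.Time {
	s := trueTimeNow(epsilon).Latest
	for !trueTimeNow(epsilon).Earliest.After(s) {
		time.Sleep(time.Millisecond)
	}
	return s
}

func main() {
	start := time.Now()
	s := commitWait(4 * time.Millisecond) // epsilon of a few ms, as in the paper
	fmt.Printf("commit ts %v, waited ~%v (about 2*epsilon)\n", s.UnixNano(), time.Since(start))
}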

4.3 CockroachDB’s Hybrid-Logical Clocks

CockroachDB cannot rely on atomic clocks like Spanner. Instead, it uses Hybrid Logical Clocks (HLC) to provide causality tracking and loose time synchronization (a Go transcription of the update rules follows the diagram).
  • Physical Component: Wall clock time (NTP).
  • Logical Component: A counter to order events that happen within the same physical tick or when clocks move backwards.
  • Result: Provides “causal consistency” and allows for efficient snapshot isolation.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      HYBRID-LOGICAL CLOCKS (HLC)                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   HLC = (physical_time, logical_counter)                                    │
│                                                                              │
│   Properties:                                                                │
│   • Tracks causality like Lamport clocks                                    │
│   • Stays close to real time (unlike Lamport)                               │
│   • No special hardware needed (unlike TrueTime)                            │
│                                                                              │
│   Algorithm:                                                                 │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   On local event or send:                                                    │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │ pt = physical_time()                                                │    │
│   │ if pt > hlc.physical:                                               │    │
│   │     hlc.physical = pt                                               │    │
│   │     hlc.logical = 0                                                 │    │
│   │ else:                                                               │    │
│   │     hlc.logical++                                                   │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│   On receive message with timestamp msg_hlc:                                │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │ pt = physical_time()                                                │    │
│   │ if pt > max(hlc.physical, msg_hlc.physical):                       │    │
│   │     hlc.physical = pt                                               │    │
│   │     hlc.logical = 0                                                 │    │
│   │ elif msg_hlc.physical > hlc.physical:                              │    │
│   │     hlc.physical = msg_hlc.physical                                │    │
│   │     hlc.logical = msg_hlc.logical + 1                              │    │
│   │ elif hlc.physical > msg_hlc.physical:                              │    │
│   │     hlc.logical++                                                   │    │
│   │ else: // equal physical                                            │    │
│   │     hlc.logical = max(hlc.logical, msg_hlc.logical) + 1            │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│   CockroachDB's Uncertainty Window:                                          │
│   • Instead of commit wait, readers handle uncertainty                      │
│   • If read encounters write in uncertainty window → restart               │
│   • Max clock offset tracked cluster-wide                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
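
The update rules above translate directly into Go. This is a straightforward transcription of the pseudocode in the diagram, not CockroachDB's actual hlc package.

// Direct transcription of the HLC update rules above; not CockroachDB's hlc package.
package main

import (
	"fmt"
	"time"
)

type HLC struct {
	Physical int64 // wall-clock component (unix nanos here)
	Logical  int64 // tie-breaking counter
}

func physicalTime() int64 { return time.Now().UnixNano() }

// Tick advances the clock for a local event or an outgoing message.
func (c *HLC) Tick() HLC {
	pt := physicalTime()
	if pt > c.Physical {
		c.Physical, c.Logical = pt, 0
	} else {
		c.Logical++
	}
	return *c
}

// Update merges a timestamp received on an incoming message.
func (c *HLC) Update(msg HLC) HLC {
	pt := physicalTime()
	switch {
	case pt > c.Physical && pt > msg.Physical:
		c.Physical, c.Logical = pt, 0
	case msg.Physical > c.Physical:
		c.Physical, c.Logical = msg.Physical, msg.Logical+1
	case c.Physical > msg.Physical:
		c.Logical++
	default: // equal physical components
		if msg.Logical > c.Logical {
			c.Logical = msg.Logical
		}
		c.Logical++
	}
	return *c
}

func main() {
	var c HLC
	c.Tick()
	c.Update(HLC{Physical: physicalTime() + 1_000_000, Logical: 3})
	fmt.Printf("%+v\n", c)
}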

4.4 Distributed Transactions in Vitess

Vitess supports different transaction modes depending on the consistency requirements.
-- Vitess transaction modes

-- Single-shard transactions (efficient)
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 456;
COMMIT;
-- If both tables sharded by user_id and same shard → single MySQL transaction

-- Cross-shard atomic transactions (2PC)
SET transaction_mode = 'twopc';
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE orders SET status = 'shipped' WHERE user_id = 456;  -- Different shard!
COMMIT;

/*
 * Vitess 2PC implementation:
 *
 * 1. PREPARE phase:
 *    - VTGate sends PREPARE to all participating shards
 *    - Each shard prepares locally
 *
 * 2. COMMIT phase:
 *    - VTGate logs commit decision to "redo log" shard
 *    - Sends COMMIT to all shards
 *    - Redo log entry deleted after all ACKs
 *
 * 3. Recovery (if VTGate crashes):
 *    - Scan redo log for incomplete transactions
 *    - Resume COMMIT or ROLLBACK based on logged state
 */

-- Best-effort multi-shard transactions (performance over atomicity)
SET transaction_mode = 'multi';
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE orders SET status = 'shipped' WHERE user_id = 456;
COMMIT;
-- Each shard commits independently - no atomicity guarantee!
-- Use when application can tolerate partial failures

Part 5: CockroachDB Deep Dive

CockroachDB is a distributed SQL database built on top of a key-value store (RocksDB) using Raft for consensus.

5.1 Range Architecture

Data is divided into contiguous chunks of the key space called Ranges, and each Range is its own Raft group. A sketch of key-to-Range routing follows.
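
A rough sketch of how a key is routed to its Range; the descriptor fields are simplified from CockroachDB's actual RangeDescriptor.

// Simplified range lookup: find the Range whose [StartKey, EndKey) span
// contains a key. Field names loosely follow CockroachDB but are simplified.
package main

import (
	"fmt"
	"sort"
)

type RangeDescriptor struct {
	RangeID  int
	StartKey string // inclusive
	EndKey   string // exclusive; "" on the last range means +infinity
	Replicas []int  // node ids holding a replica (one Raft group)
}

// rangeFor assumes descriptors are sorted by StartKey and non-overlapping.
func rangeFor(ranges []RangeDescriptor, key string) RangeDescriptor {
	i := sort.Search(len(ranges), func(i int) bool { return ranges[i].StartKey > key })
	return ranges[i-1] // the last range whose StartKey <= key
}

func main() {
	ranges := []RangeDescriptor{
		{RangeID: 1, StartKey: "", EndKey: "m", Replicas: []int{1, 2, 3}},
		{RangeID: 2, StartKey: "m", EndKey: "", Replicas: []int{2, 3, 4}},
	}
	fmt.Println("key 'users/123' lives in range", rangeFor(ranges, "users/123").RangeID)
}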

5.2 CockroachDB Transaction Flow

CockroachDB uses a decentralized transaction model with “Write Intents”.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      COCKROACHDB TRANSACTION FLOW                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. BEGIN TRANSACTION                                                       │
│      • Assign provisional commit timestamp                                   │
│      • No special messages needed                                            │
│                                                                              │
│   2. WRITE (e.g., UPDATE)                                                    │
│      Gateway                                                                 │
│         │                                                                    │
│         ▼ Route to range leaseholder                                        │
│      Leaseholder                                                             │
│         │                                                                    │
│         ├─► Write intent: (key, value, txn_id, timestamp)                   │
│         │   Intent = uncommitted write, visible to owner txn only           │
│         │                                                                    │
│         └─► Replicate via Raft                                              │
│                                                                              │
│   3. READ (in same transaction)                                              │
│      • Reader checks for intents                                             │
│      • Own intents: read the value                                          │
│      • Other intents: wait or push the transaction                          │
│                                                                              │
│   4. COMMIT                                                                  │
│      Gateway                                                                 │
│         │                                                                    │
│         ▼ Write transaction record                                          │
│      Transaction Record Range                                               │
│         │                                                                    │
│         ├─► Status: COMMITTED                                               │
│         └─► Timestamp: final commit timestamp                               │
│                                                                              │
│      Then (async or sync):                                                   │
│         ├─► Resolve intents → convert to committed values                   │
│         └─► Clean up transaction record                                     │
│                                                                              │
│   Intent Example:                                                            │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   Key: /Table/users/123/name                                                │
│   ┌────────────────────────────────────────────────────────────────┐        │
│   │ Timestamp │ Type      │ Value       │ TxnID                    │        │
│   ├────────────────────────────────────────────────────────────────┤        │
│   │ ts=50     │ Value     │ "Alice"     │ (none - committed)       │        │
│   │ ts=100    │ Intent    │ "Alicia"    │ txn-abc-123              │        │
│   └────────────────────────────────────────────────────────────────┘        │
│                                                                              │
│   • Readers at ts=75 see "Alice"                                            │
│   • Readers at ts=100+ wait for intent resolution                           │
│   • If txn commits: intent becomes value                                    │
│   • If txn aborts: intent deleted                                           │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

5.3 Read-Write Conflicts

Since transactions are distributed, conflicts are resolved using timestamps and priorities.
// CockroachDB conflict resolution

// Scenario: T2 tries to read key that T1 has written intent for

// Option 1: Wait for T1
if t1.status == PENDING {
    // T2 waits for T1 to commit or abort
    // Uses distributed wait queue
}

// Option 2: Push T1's timestamp
if t2.timestamp < t1.timestamp {
    // T2 can read the previous value
    // No conflict if T2 reads at ts before T1's intent
}

if t2.timestamp >= t1.timestamp {
    // T2 "pushes" T1 to a higher timestamp
    // T1 will need to restart at new timestamp
    // Unless T1 has already committed
}

// Option 3: T2 aborts
// If T1 has higher priority and T2 can't wait

/*
 * Transaction priorities (for conflict resolution):
 * 
 * - Default: Normal
 * - SET TRANSACTION PRIORITY HIGH
 * - SET TRANSACTION PRIORITY LOW
 * 
 * Higher priority transactions win conflicts
 * Prevents starvation of long transactions
 */

Part 6: Interview Questions

Distributed Systems Deep Dive

Scenario: Design a globally distributed SQL database (Spanner-style).
Key Design Points:
  1. Data Placement
    • Shard data by primary key range
    • Each shard replicated across zones/regions
    • Paxos or Raft for consensus per shard
  2. Time Synchronization
    • GPS + atomic clocks for TrueTime (Spanner approach)
    • Or HLC with uncertainty intervals (CockroachDB approach)
    • Critical for external consistency
  3. Transactions
    • Lock-free reads at snapshot timestamp
    • Pessimistic locking for writes
    • 2PC for cross-shard transactions
    • Commit wait to ensure external consistency
  4. Read Optimization
    • Stale reads from local replica (bounded staleness)
    • Strong reads require leader/leaseholder
    • Follower reads with clock check
  5. Schema Management
    • Online schema changes (like PG’s concurrent operations)
    • Distributed DDL coordination
    • Schema version tracked per shard
Scenario: Compare Vitess, CockroachDB, and TiDB as horizontal-scaling options.
Vitess:
  • Middleware layer over MySQL
  • Sharding and routing logic live in the middleware (VTGate), not the application
  • MySQL handles storage (InnoDB)
  • Pros: Leverage MySQL ecosystem, operational simplicity per shard
  • Cons: 2PC for cross-shard, no distributed ACID by default
CockroachDB:
  • Ground-up distributed SQL
  • Raft consensus per range (~512MB)
  • RocksDB (LSM) for storage
  • HLC for timestamp ordering
  • Pros: Strong consistency, automatic sharding
  • Cons: Write amplification from LSM, HLC complexity
TiDB:
  • Hybrid: SQL layer (TiDB) + storage layer (TiKV)
  • Raft per region (96MB default)
  • RocksDB for storage (TiKV)
  • Percolator-style transactions
  • Pros: PD for scheduling, analytics with TiFlash
  • Cons: Complexity of multiple components
When to choose:
  • Vitess: Existing MySQL, horizontal read scaling
  • CockroachDB: Greenfield, strong consistency required
  • TiDB: Mixed OLTP/OLAP, MySQL compatibility
Scenario: Node fails mid-transaction.
Transaction Record Approach:
  1. Each transaction has a “transaction record” stored in a range
  2. Record tracks: status (PENDING/COMMITTED/ABORTED), timestamp, intents
  3. If coordinator fails, transaction record remains
Recovery Process:
  1. Other transactions encountering intents check transaction record
  2. If record says COMMITTED: resolve intent as committed
  3. If record says ABORTED: delete intent
  4. If record says PENDING and heartbeat expired:
    • Push transaction timestamp or abort it
    • Garbage collect stale intents
Lease Holder Failure:
  1. Raft detects leader failure (~3-10 seconds)
  2. New leader elected
  3. Uncommitted Raft entries replayed
  4. Transactions on failed node:
    • Can retry from any other node
    • Intents still visible, resolved based on txn record
Key Insight:
  • No transaction coordinator bottleneck
  • Any node can resolve intents using txn record
  • Heartbeats prevent zombie transactions
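
A rough sketch of the intent-resolution logic described above; the statuses and types are illustrative, not CockroachDB's internal API.

// Illustrative intent resolution using a transaction record; not CockroachDB's API.
package main

import (
	"fmt"
	"time"
)

type TxnStatus int

const (
	Pending TxnStatus = iota
	Committed
	Aborted
)

type TxnRecord struct {
	Status        TxnStatus
	LastHeartbeat time.Time
}

// resolveIntent decides what a reader does when it hits another transaction's intent.
func resolveIntent(rec TxnRecord, heartbeatTimeout time.Duration, now time.Time) string {
	switch rec.Status {
	case Committed:
		return "promote intent to a committed value"
	case Aborted:
		return "delete the intent"
	default: // Pending
		if now.Sub(rec.LastHeartbeat) > heartbeatTimeout {
			return "coordinator presumed dead: push or abort the transaction"
		}
		return "wait or push the transaction's timestamp"
	}
}

func main() {
	rec := TxnRecord{Status: Pending, LastHeartbeat: time.Now().Add(-10 * time.Second)}
	fmt.Println(resolveIntent(rec, 5*time.Second, time.Now()))
}
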
Scenario: Design sharding for a multi-tenant SaaS application.
Requirements Analysis:
  • Tenant isolation
  • Tenant-level scaling
  • Cross-tenant queries (admin/analytics)
  • Even distribution
Strategy: Tenant-based sharding:
-- Primary key includes tenant_id
CREATE TABLE orders (
    tenant_id UUID,
    order_id UUID,
    ...
    PRIMARY KEY (tenant_id, order_id)
);

-- Shard by tenant_id
-- All tenant data co-located
Handling large tenants:
  • Sub-sharding by (tenant_id, secondary_key)
  • Example: (tenant_id, order_date) for time-based queries
  • Or dedicated shard per large tenant
Handling small tenants:
  • Hash tenant_id for even distribution
  • Many small tenants share shards
  • No hot shard issues
Cross-tenant queries:
  • Scatter-gather (expensive but possible)
  • Materialized views in analytics system
  • Separate OLAP store (TiFlash, ClickHouse)
Migration strategy:
  • Start unsharded
  • Add tenant_id to all tables
  • Enable sharding when needed
  • Move large tenants to dedicated shards
