


Distributed Database Systems

This module covers the distributed systems concepts essential for engineers joining teams at Vitess, PlanetScale, CockroachDB, TiDB, Neon, or building distributed database features at any company.
Target Audience: Distributed database engineers
Prerequisites: Storage Engine, Performance Engineering modules
Key Comparisons: PostgreSQL, Vitess, CockroachDB, Spanner, TiDB
Interview Relevance: Staff+ distributed systems roles

Part 1: Replication Fundamentals

Replication is the foundation of high availability and read scalability in distributed databases. It involves keeping a copy of the same data on multiple machines that are connected via a network. Real-world analogy: Replication is like a chain of notaries who all maintain identical copies of an official ledger. When someone makes a change, the original notary (primary) writes it down and sends a copy to the other notaries (replicas). The fundamental tension is between speed and accuracy: do you tell the client “done!” as soon as the primary writes it (fast but risky — what if the primary’s building burns down?), or do you wait until at least one other notary confirms they have the copy (slower but safer)? That is the core trade-off between asynchronous and synchronous replication.

1.0 The CAP Theorem

The CAP theorem states that when a network partition occurs, a distributed data store must choose between Consistency (every read sees the most recent write, i.e., linearizability) and Availability (every request to a non-failed node receives a response). Since network partitions are inevitable in distributed systems, the practical choice is between CP and AP behavior during a partition. Real-world analogy: Imagine a company with offices in New York and London. The phone line between them is the network. CAP says: if the phone line goes down (partition), you must choose. Either at least one office stops accepting orders until the line is restored (Consistency: both offices agree on the state), or both offices keep accepting orders independently and reconcile the mess later (Availability: the system stays up but may have conflicts). You cannot have both during the partition. The nuance that CAP does not capture: in practice, partitions are rare, and the real engineering question during normal operation is the latency-consistency trade-off, which the PACELC formulation makes explicit (if Partitioned, choose Availability or Consistency; Else, choose Latency or Consistency).

1.1 Replication Topologies

How do we decide which node accepts writes and how data flows between nodes? This choice fundamentally impacts the system’s consistency and availability characteristics.
  • Single Leader: All writes go to one node (Leader). Simple to reason about but the leader is a bottleneck and single point of failure for writes. (e.g., PostgreSQL, MySQL).
  • Multi-Leader: Writes can be accepted by multiple nodes. Great for multi-region setups but requires complex conflict resolution. (e.g., DynamoDB Global Tables).
  • Leaderless: Each write is sent to several replicas and succeeds once a quorum acknowledges it. Highly available, but consistency is eventual by default and anti-entropy mechanisms such as “read repair” are needed. (e.g., Cassandra, Dynamo).
┌─────────────────────────────────────────────────────────────────────────────┐
│                      REPLICATION TOPOLOGIES                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Single-Leader (Primary-Replica)                                           │
│   ─────────────────────────────────────────────────────────────────────────  │
│   ┌──────────┐       ┌──────────┐                                           │
│   │  Primary │──────→│ Replica  │                                           │
│   │  (R/W)   │       │  (R)     │                                           │
│   └──────────┘       └──────────┘                                           │
│        │             ┌──────────┐                                           │
│        └────────────→│ Replica  │                                           │
│                      │  (R)     │                                           │
│                      └──────────┘                                           │
│   • PostgreSQL streaming replication                                        │
│   • MySQL replication                                                       │
│   • Simple, but primary is SPOF                                             │
│                                                                              │
│   Multi-Leader                                                               │
│   ─────────────────────────────────────────────────────────────────────────  │
│   ┌──────────┐←─────→┌──────────┐                                           │
│   │ Leader A │       │ Leader B │                                           │
│   │  (R/W)   │       │  (R/W)   │                                           │
│   └──────────┘       └──────────┘                                           │
│   • BDR (Bi-Directional Replication)                                        │
│   • Conflict resolution required                                            │
│   • Used for multi-datacenter                                               │
│                                                                              │
│   Leaderless (Dynamo-style)                                                  │
│   ─────────────────────────────────────────────────────────────────────────  │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐                                  │
│   │  Node A  │  │  Node B  │  │  Node C  │                                  │
│   │  (R/W)   │  │  (R/W)   │  │  (R/W)   │                                  │
│   └──────────┘  └──────────┘  └──────────┘                                  │
│   • Cassandra, Dynamo, Riak                                                  │
│   • Quorum reads/writes: R + W > N                                          │
│   • Conflict resolution via vector clocks / LWW                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
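To make R + W > N concrete, here is a minimal Go sketch of a Dynamo-style quorum write followed by a quorum read. It assumes each replica stores a single (version, value) pair and that versions are plain integers; real systems use vector clocks or timestamps, as the diagram notes. All type and function names are hypothetical.

```go
package main

import "fmt"

// Versioned is a value tagged with a version number.
// Assumption: integer versions stand in for vector clocks / timestamps.
type Versioned struct {
	Version int
	Value   string
}

// quorumWrite stores v on the first w replicas in targets, modelling the
// W replicas that acknowledged the write before the coordinator returned.
func quorumWrite(replicas []Versioned, targets []int, w int, v Versioned) {
	for _, i := range targets[:w] {
		replicas[i] = v
	}
}

// quorumRead contacts the first r replicas in targets and returns the
// value with the highest version among the responses.
func quorumRead(replicas []Versioned, targets []int, r int) Versioned {
	var newest Versioned
	for _, i := range targets[:r] {
		if replicas[i].Version > newest.Version {
			newest = replicas[i]
		}
	}
	return newest
}

func main() {
	const n, w, r = 3, 2, 2 // R + W = 4 > N = 3
	replicas := make([]Versioned, n)
	quorumWrite(replicas, []int{0, 1, 2}, w, Versioned{Version: 1, Value: "blue"})

	// The read contacts replicas 2 and 1. Replica 2 missed the write, but
	// because R + W > N the read set must overlap the write set (replica 1).
	got := quorumRead(replicas, []int{2, 1, 0}, r)
	fmt.Println(got.Value) // blue

	// With R = 1, R + W = N and the sets need not overlap: a stale read.
	stale := quorumRead(replicas, []int{2, 1, 0}, 1)
	fmt.Printf("%q\n", stale.Value) // ""
}
```

Picking the highest version is the simplest reconciliation rule; it is effectively last-writer-wins, which is why systems that must preserve concurrent updates use vector clocks instead.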

1.2 PostgreSQL Streaming Replication

PostgreSQL's built-in streaming replication is physical: the primary streams Write-Ahead Log (WAL) records to the standby, which replays them. This is efficient and keeps the standby an exact block-level copy of the primary (which is also why both must run the same PostgreSQL major version).
┌─────────────────────────────────────────────────────────────────────────────┐
│                      STREAMING REPLICATION                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Primary                         Standby                                    │
│   ┌─────────────────────────┐     ┌──────────────────────────────┐           │
│   │  Backend ──► WAL Buffer │     │                              │           │
│   │                ↓        │     │                              │           │
│   │           WAL Files     │     │                              │           │
│   │                ↓        │     │                              │           │
│   │           WAL Sender  ──┼─────┼─► WAL Receiver ──► WAL Files │           │
│   │                         │     │                        ↓     │           │
│   │                         │     │              Startup Process │           │
│   │                         │     │               (applies WAL)  │           │
│   │                         │     │                        ↓     │           │
│   │                         │     │               Shared Buffers │           │
│   └─────────────────────────┘     └──────────────────────────────┘           │
│                                                                              │
│   Synchronous Replication:                                                   │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   synchronous_commit = 'remote_apply'                                       │
│                                                                              │
│   Client ──► Primary: COMMIT                                                │
│              Primary ──► WAL write                                          │
│              Primary ──► Send to Standby                                    │
│              Standby ──► Receive WAL                                        │
│              Standby ──► Apply WAL (replay)                                 │
│              Standby ──► ACK to Primary                                     │
│              Primary ──► ACK to Client                                      │
│                                                                              │
│   Modes (standby waits require synchronous_standby_names):                   │
│   • off: Don't wait for WAL flush at all (async commit)                      │
│   • local: Wait for local WAL flush                                         │
│   • remote_write: Wait for standby OS write (no fsync)                       │
│   • on (default): Wait for standby WAL flush                                 │
│   • remote_apply: Wait for standby to apply (strongest)                     │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

1.3 Replication Lag

Replication lag is the delay between a write happening on the primary and it being visible on the standby. It is the enemy of read-your-writes consistency.
-- Measuring replication lag on primary
-- Why four separate LSN columns? They represent the replication pipeline stages:
-- sent_lsn: WAL sent over the network (bottleneck: network bandwidth)
-- write_lsn: WAL written by the standby to its OS page cache, not yet fsynced
-- flush_lsn: WAL fsynced to standby's disk (bottleneck: standby fsync latency)
-- replay_lsn: WAL applied to standby's database (bottleneck: replay speed)
-- The gap between pg_current_wal_lsn() and replay_lsn is the total lag in bytes;
-- the sent_lsn to replay_lsn gap is the part that accrues after WAL leaves the primary.
SELECT 
    client_addr,
    usename,
    application_name,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replay_lag_bytes,
    pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) AS send_lag_bytes
FROM pg_stat_replication;

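-- On standby (PG 10+)
-- Caveat: replay_lag keeps growing while the primary is idle, even if the
-- standby is fully caught up, because no new transactions are being replayed.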
SELECT 
    now() - pg_last_xact_replay_timestamp() AS replay_lag,
    pg_is_in_recovery() AS is_standby,
    pg_last_wal_receive_lsn() AS last_receive,
    pg_last_wal_replay_lsn() AS last_replay;

-- Lag breakdown
/*
 * Total lag = network delay + disk write time + replay time
 *
 * Network delay: Time to send WAL over network
 * Disk write time: Time to write WAL on standby
 * Replay time: Time to apply WAL changes
 *
 * Common causes of lag:
 * 1. Large transactions (big INSERTs/DELETEs) that generate bursts of WAL
 * 2. Long-running standby queries that conflict with replay (replay can be
 *    held back for up to max_standby_streaming_delay before they are canceled)
 * 3. Network congestion
 * 4. Slow standby disks
 */
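pg_wal_lsn_diff works because an LSN is simply a 64-bit byte position in the WAL stream, printed as two 32-bit hexadecimal halves separated by a slash. A monitoring agent that collects raw LSNs can compute the same byte lag itself. This Go sketch assumes that standard hi/lo text format; the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLSN converts PostgreSQL's textual LSN ("hi/lo", both hexadecimal)
// into a 64-bit WAL byte position: (hi << 32) | lo.
func parseLSN(s string) (uint64, error) {
	hi, lo, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid LSN %q", s)
	}
	h, err := strconv.ParseUint(hi, 16, 32)
	if err != nil {
		return 0, err
	}
	l, err := strconv.ParseUint(lo, 16, 32)
	if err != nil {
		return 0, err
	}
	return h<<32 | l, nil
}

// lsnDiff mirrors pg_wal_lsn_diff(a, b): the number of WAL bytes from b to a.
func lsnDiff(a, b string) (int64, error) {
	x, err := parseLSN(a)
	if err != nil {
		return 0, err
	}
	y, err := parseLSN(b)
	if err != nil {
		return 0, err
	}
	return int64(x) - int64(y), nil
}

func main() {
	// Example values as they might appear in sent_lsn and replay_lsn;
	// note the difference crosses a 4 GiB boundary in the hi part.
	lag, err := lsnDiff("1/00000100", "0/FFFFFF00")
	if err != nil {
		panic(err)
	}
	fmt.Println(lag) // 512
}
```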

1.4 Conflict Handling

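When running read-only queries on a standby, conflicts can occur with the incoming replication stream. For example, a query on the standby might still need an old row version that VACUUM on the primary has already removed; when the standby replays that cleanup, the row disappears from under the query's snapshot.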
-- Hot Standby conflicts

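-- On the standby, queries can conflict with WAL replay when:
-- 1. Snapshot: replay removes row versions (e.g., VACUUM cleanup on the primary) that the query's snapshot can still see
-- 2. Lock: replay needs an ACCESS EXCLUSIVE lock (e.g., DROP TABLE) that conflicts with a lock the query holds
-- 3. Buffer pin: replay must clean up a page that the query has pinned
-- 4. Tablespace/database: replay drops an object the query is using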

-- Configuration (postgresql.conf on the standby; .conf files use # comments)
hot_standby = on                       # Enable read queries on standby
hot_standby_feedback = on              # Send xmin to primary (prevents snapshot conflicts, can cause bloat on primary)
max_standby_streaming_delay = 30s      # Max time replay of streamed WAL may wait for conflicting queries
max_standby_archive_delay = 30s        # Same, for WAL read from the archive

-- When conflict occurs:
-- 1. Replay waits up to max_standby_*_delay
-- 2. If still conflicting, cancel the query
-- 3. Error: "canceling statement due to conflict with recovery"

-- Monitoring conflicts
SELECT 
    datname,
    confl_tablespace,
    confl_lock,
    confl_snapshot,
    confl_bufferpin,
    confl_deadlock
FROM pg_stat_database_conflicts;

Part 2: Consensus Algorithms

In distributed systems, we often need multiple nodes to agree on a value (e.g., “who is the leader?” or “what is the next log entry?”). This is the problem of consensus. Real-world analogy: Consensus is like a group of friends trying to agree on a restaurant over a flaky group chat. Messages can be delayed, duplicated, or lost. One person (the leader) proposes a restaurant. The others reply “yes” or “no.” If a majority says “yes,” the decision is final — even if some friends never got the message. If the proposer’s phone dies, someone else eventually steps up and proposes again. The key insight: you need a majority (quorum), not unanimity. Three out of five is enough because any two majorities must share at least one member, ensuring no contradictory decisions can both succeed.
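The claim that any two majorities share a member can be checked by brute force for small clusters. This Go sketch (the helper name is hypothetical) enumerates every quorum of a given size as a bitmask and tests all pairs for overlap:

```go
package main

import (
	"fmt"
	"math/bits"
)

// quorumsIntersect reports whether every pair of size-member quorums drawn
// from n nodes shares at least one node. Each quorum is a bitmask of nodes.
// Intended for small n (well below 32).
func quorumsIntersect(n, size int) bool {
	var quorums []uint32
	for mask := uint32(0); mask < 1<<n; mask++ {
		if bits.OnesCount32(mask) == size {
			quorums = append(quorums, mask)
		}
	}
	for _, a := range quorums {
		for _, b := range quorums {
			if a&b == 0 { // no common member
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(quorumsIntersect(5, 3)) // true: 3 + 3 > 5
	fmt.Println(quorumsIntersect(5, 2)) // false: {A,B} and {C,D} are disjoint
}
```

The same counting argument (two quorums of size q out of n must overlap when 2q > n) is why a five-node cluster tolerates two failures but not three.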

2.1 Raft Fundamentals

Raft is a consensus algorithm designed to be easy to understand. It decomposes the problem into leader election, log replication, and safety.

Raft Leader Election Process:
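  1. Follower timeout: If a follower hears nothing from a leader within its randomized election timeout, it becomes a candidate
  2. Request votes: The candidate increments its term, votes for itself, and requests votes from peers
  3. Vote granting: Each node grants at most one vote per term (first come, first served), and only to a candidate whose log is at least as up to date as its own
  4. Majority wins: A candidate that collects votes from a majority becomes leader
  5. Heartbeats: The leader sends periodic heartbeats (empty AppendEntries) to maintain authority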
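Step 3 is where Raft's safety comes from, so it is worth seeing as code. This Go sketch of a RequestVote handler uses hypothetical, simplified types (not any particular Raft library) and omits the fsync of CurrentTerm and VotedFor that must happen before replying:

```go
package main

import "fmt"

type RequestVote struct {
	Term         uint64
	CandidateID  int
	LastLogIndex uint64
	LastLogTerm  uint64
}

type Node struct {
	CurrentTerm  uint64
	VotedFor     int // -1 means no vote cast in CurrentTerm
	LastLogIndex uint64
	LastLogTerm  uint64
}

// handleRequestVote returns (currentTerm, voteGranted) per Raft's voting rules.
func (n *Node) handleRequestVote(req RequestVote) (uint64, bool) {
	if req.Term < n.CurrentTerm {
		return n.CurrentTerm, false // candidate is from an older term
	}
	if req.Term > n.CurrentTerm {
		// Newer term: adopt it and forget any vote from the old term.
		n.CurrentTerm = req.Term
		n.VotedFor = -1
	}
	// Election restriction: the candidate's log must be at least as up to
	// date (compare last term first, then last index).
	upToDate := req.LastLogTerm > n.LastLogTerm ||
		(req.LastLogTerm == n.LastLogTerm && req.LastLogIndex >= n.LastLogIndex)
	if (n.VotedFor == -1 || n.VotedFor == req.CandidateID) && upToDate {
		n.VotedFor = req.CandidateID
		return n.CurrentTerm, true
	}
	return n.CurrentTerm, false
}

func main() {
	n := &Node{CurrentTerm: 3, VotedFor: -1, LastLogIndex: 10, LastLogTerm: 3}
	_, ok := n.handleRequestVote(RequestVote{Term: 4, CandidateID: 1, LastLogIndex: 10, LastLogTerm: 3})
	fmt.Println(ok) // true
	_, ok = n.handleRequestVote(RequestVote{Term: 4, CandidateID: 2, LastLogIndex: 12, LastLogTerm: 3})
	fmt.Println(ok) // false (already voted for candidate 1 in term 4)
}
```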

2.2 Raft Implementation Details

Raft ensures that if any machine applies a log entry to its state machine, no other machine will ever apply a different command for the same log index.
#include <stdbool.h>
#include <stdint.h>

/* Raft node state
 * 
 * Why persistent vs volatile state matters:
 * Persistent state (currentTerm, votedFor, log) must survive crashes. If a node
 * restarts and forgets it already voted, it could vote twice in the same term,
 * allowing two leaders -- violating Raft's safety guarantee.
 * Volatile state (commitIndex, lastApplied) can be reconstructed after restart
 * by replaying the log and receiving updates from the leader.
 */
typedef struct RaftNode {
    /* Persistent state (on disk -- must be fsynced before responding) */
    uint64_t currentTerm;      /* Latest term seen -- monotonically increasing */
    NodeId votedFor;           /* Candidate voted for in current term (or none) */
    RaftLog *log;              /* Log entries -- the replicated state machine input */
    
    /* Volatile state (all nodes -- reconstructed after crash) */
    uint64_t commitIndex;      /* Highest entry known to be committed (majority) */
    uint64_t lastApplied;      /* Highest entry applied to state machine */
    
    /* Volatile state (leader only -- rebuilt after election) */
    uint64_t *nextIndex;       /* Next entry to send to each follower (optimistic) */
    uint64_t *matchIndex;      /* Highest confirmed replicated entry per follower */
    
    /* Timing -- randomized election timeout prevents split votes */
    Timestamp electionTimeout;
    Timestamp lastHeartbeat;
} RaftNode;

/* AppendEntries RPC (heart of Raft) */
typedef struct AppendEntriesRequest {
    uint64_t term;             /* Leader's term */
    NodeId leaderId;           /* For followers to redirect clients */
    uint64_t prevLogIndex;     /* Index of log entry before new ones */
    uint64_t prevLogTerm;      /* Term of prevLogIndex entry */
    LogEntry *entries;         /* Entries to append (empty for heartbeat) */
    int numEntries;
    uint64_t leaderCommit;     /* Leader's commit index */
} AppendEntriesRequest;

typedef struct AppendEntriesResponse {
    uint64_t term;             /* Follower's current term */
    bool success;              /* True if follower matched prevLogIndex/Term */
    uint64_t matchIndex;       /* Follower's last matching index (optimization) */
} AppendEntriesResponse;

/* Leader handles AppendEntries response */
void handleAppendEntriesResponse(RaftNode *node, NodeId follower,
                                 AppendEntriesResponse *resp) {
    if (resp->term > node->currentTerm) {
        /* A higher term means a new election happened while we thought we were leader.
         * Step down immediately -- there can only be one leader per term. */
        becomeFollower(node, resp->term);
        return;
    }
    if (resp->term < node->currentTerm) {
        /* Stale reply to a request sent in an earlier term: ignore it. */
        return;
    }

    if (resp->success) {
        /* Follower confirmed it matched our prevLogIndex/Term and appended entries.
         * Replies can arrive out of order, so only ever move matchIndex forward. */
        if (resp->matchIndex > node->matchIndex[follower]) {
            node->matchIndex[follower] = resp->matchIndex;
            node->nextIndex[follower] = resp->matchIndex + 1;
        }

        /* Check if a majority of matchIndex values have advanced past commitIndex.
         * If so, we can safely advance commitIndex -- the entry is committed. */
        maybeAdvanceCommitIndex(node);
    } else {
        /* Log mismatch: the follower's log diverges from ours at prevLogIndex.
         * Back off one entry and retry (never below index 1). Production
         * implementations such as etcd/raft (used by CockroachDB) return a rejection
         * hint so the leader can skip back many entries per round trip instead of one. */
        if (node->nextIndex[follower] > 1) {
            node->nextIndex[follower]--;
        }
    }
}
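handleAppendEntriesResponse calls maybeAdvanceCommitIndex without defining it. One common way to implement it, sketched here in Go with a hypothetical signature, is to sort the matchIndex values (including the leader's own last index), read off the position replicated on a majority, and commit it only if that entry is from the leader's current term, per Raft's rule that older-term entries are committed only indirectly:

```go
package main

import (
	"fmt"
	"sort"
)

// maybeAdvanceCommitIndex returns the leader's new commit index.
// matchIndex has one entry per voter, including the leader itself.
// termAt returns the term stored at a given log index.
func maybeAdvanceCommitIndex(commitIndex, currentTerm uint64, matchIndex []uint64,
	termAt func(uint64) uint64) uint64 {
	sorted := append([]uint64(nil), matchIndex...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	// In descending order, the value at position len/2 is held by at least
	// len/2+1 voters, i.e. a majority.
	candidate := sorted[len(sorted)/2]
	// Only entries from the current term are committed by counting replicas.
	if candidate > commitIndex && termAt(candidate) == currentTerm {
		return candidate
	}
	return commitIndex
}

func main() {
	terms := map[uint64]uint64{1: 1, 2: 1, 3: 2, 4: 2, 5: 2}
	termAt := func(i uint64) uint64 { return terms[i] }

	// Leader (match 5) plus four followers, leader in term 2.
	fmt.Println(maybeAdvanceCommitIndex(2, 2, []uint64{5, 5, 4, 3, 1}, termAt)) // 4
	// Same replication state, but the leader is now in term 3: entry 4 is
	// from term 2, so it must not be committed by counting replicas.
	fmt.Println(maybeAdvanceCommitIndex(2, 3, []uint64{5, 5, 4, 3, 1}, termAt)) // 2
}
```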

2.3 Multi-Paxos vs Raft

While Raft is popular for its clarity, Paxos (specifically Multi-Paxos) is the historical standard. Both achieve the same goal but differ in terminology and flexibility.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      PAXOS vs RAFT COMPARISON                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Aspect              Paxos                   Raft                           │
│   ─────────────────────────────────────────────────────────────────────────  │
│   Leadership          Per-decision            Stable leader                  │
│   Leader election     Dueling proposers       Explicit election              │
│   Log management      Gaps allowed            No gaps (contiguous)           │
│   Understandability   Complex                 Designed for clarity           │
│   Implementation      Many variations         Single specification           │
│                                                                              │
│   Multi-Paxos Phases:                                                        │
│   1. Prepare: Proposer gets promise from majority                           │
│   2. Accept: Proposer sends value, acceptors accept                         │
│   3. Learn: Value is learned when majority accept                           │
│                                                                              │
│   Multi-Paxos Optimization:                                                  │
│   • Skip Prepare phase when leader is stable                                │
│   • Effectively becomes like Raft                                           │
│                                                                              │
│   Used By:                                                                   │
│   • Paxos: Spanner (modified), Chubby                                       │
│   • Raft: etcd, CockroachDB, TiKV, Consul                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
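The Prepare and Accept phases are easiest to understand from the acceptor's side. Below is a minimal single-decree Paxos acceptor in Go. It is an in-memory sketch that assumes ballot numbers are unique per proposer and omits the durable write a real acceptor must make before replying; the type and method names are hypothetical.

```go
package main

import "fmt"

// Acceptor is the state of one single-decree Paxos acceptor.
type Acceptor struct {
	Promised      uint64 // highest ballot promised in Prepare
	AcceptedBal   uint64 // ballot of the accepted value (0 = none yet)
	AcceptedValue string
}

// Prepare (phase 1): promise to ignore lower ballots, and report any value
// already accepted so the proposer is forced to re-propose it.
func (a *Acceptor) Prepare(ballot uint64) (ok bool, accBal uint64, accVal string) {
	if ballot <= a.Promised {
		return false, 0, ""
	}
	a.Promised = ballot
	return true, a.AcceptedBal, a.AcceptedValue
}

// Accept (phase 2): accept unless a higher ballot has been promised since.
func (a *Acceptor) Accept(ballot uint64, value string) bool {
	if ballot < a.Promised {
		return false
	}
	a.Promised = ballot
	a.AcceptedBal = ballot
	a.AcceptedValue = value
	return true
}

func main() {
	a := &Acceptor{}
	ok, _, _ := a.Prepare(1)
	fmt.Println(ok, a.Accept(1, "x")) // true true

	// A competing proposer with a higher ballot learns the accepted value "x".
	ok, bal, val := a.Prepare(2)
	fmt.Println(ok, bal, val) // true 1 x

	// The first proposer's ballot is now stale.
	fmt.Println(a.Accept(1, "y")) // false
}
```

Multi-Paxos runs one such instance per log slot; a stable leader that has already completed Prepare for a range of slots can go straight to Accept, which is the optimization mentioned in the table.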

2.4 Raft in CockroachDB

CockroachDB does not run a single Raft group for the whole cluster; instead, each individual “Range” (shard) of data has its own Raft group. This allows replication, failover, and rebalancing to happen independently for each range.
// CockroachDB uses Raft per Range (shard)

// Each Range (~512MB of data) has its own Raft group
type Range struct {
    RangeID        RangeID
    StartKey       Key
    EndKey         Key
    RaftGroup      *raft.RawNode
    LeaseHolder    NodeID  // Usually the Raft leader, but not necessarily
}

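// Range Lease (optimization for reads)
//
// Without a lease, a consistent read would need a Raft round-trip to
// confirm the replica is still authoritative. The lease holder is
// guaranteed that no other replica serves reads or writes for the range
// until its lease expires, so it can serve reads from local state.
// Acquiring a lease goes through Raft once; afterwards reads stay local.
// The lease holder is usually (and preferably) the Raft leader but can differ.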

// Write path in CockroachDB:
// 1. Client sends write to any node
// 2. Node routes to Range lease holder
// 3. Lease holder proposes to Raft
// 4. Raft replicates to majority
// 5. Entry committed and applied
// 6. Response to client

// Read path (with lease):
// 1. Client sends read to any node
// 2. Node routes to lease holder
// 3. Lease holder checks lease validity
// 4. If valid: read directly from local state
// 5. If expired: acquire new lease first

Part 3: Sharding Strategies

When a dataset becomes too large for a single node, we must split it across multiple nodes. This process is called sharding (or partitioning).

Real-world analogy: Sharding is like organizing a massive library across multiple buildings. You need a rule for which building holds which books. Hash sharding is like assigning buildings by the hash of each book’s ISBN: distribution is perfectly even, but if someone asks for “all books by Author X,” you must visit every building. Range sharding is like putting authors A-M in Building 1 and N-Z in Building 2: great for browsing a range of authors, but if most authors’ last names start with ‘S’, Building 2 is overcrowded (a hot spot). Directory sharding keeps a master catalog that says exactly where each book is: flexible, but the catalog itself can become a bottleneck.

Performance pitfall (cross-shard queries): The moment a query touches more than one shard, performance degrades sharply. The system must scatter the query to every relevant shard, wait for the slowest one, and gather the results, so the query's latency is the slowest shard's latency plus coordination overhead. A query that takes 5ms on a single node can take 50ms when scattered across 10 shards, because the slowest of 10 shards is often far slower than a typical one. The golden rule of sharding: design your schema so that the most common queries hit a single shard. Co-locate related data by choosing a sharding key that matches your access patterns (e.g., tenant_id for multi-tenant SaaS).

3.1 Sharding Approaches

There are several ways to determine which shard a particular row belongs to.
  • Key-Based (Hash): shard_id = hash(key) % num_shards. Even distribution, but resharding is expensive and range queries are inefficient.
  • Range-Based: shard_id determined by key ranges (e.g., A-M, N-Z). Efficient range queries, but prone to “hot spots” if keys are sequential (e.g., timestamps).
  • Directory-Based: A lookup service maps keys to shards. Flexible placement, but the lookup service can become a bottleneck and must itself be highly available.
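Range-based routing is a binary search over sorted shard boundaries. A minimal Go sketch, assuming string keys and shards described only by their inclusive start key (the first shard starts at the empty string), mirroring the [A, M), [M, T), [T, Z] example; the names are hypothetical:

```go
package main

import (
	"fmt"
	"sort"
)

// rangeShard covers keys from Start (inclusive) up to the next shard's Start.
// Shards must be sorted by Start, and the first Start must be "".
type rangeShard struct {
	Start string
	Name  string
}

// findShard returns the last shard whose Start <= key.
func findShard(shards []rangeShard, key string) string {
	// i is the first shard whose Start is greater than key.
	i := sort.Search(len(shards), func(i int) bool { return shards[i].Start > key })
	return shards[i-1].Name
}

func main() {
	shards := []rangeShard{{"", "shard1"}, {"M", "shard2"}, {"T", "shard3"}}
	fmt.Println(findShard(shards, "Garcia")) // shard1
	fmt.Println(findShard(shards, "M"))      // shard2 (start key is inclusive)
	fmt.Println(findShard(shards, "Smith"))  // shard2
	fmt.Println(findShard(shards, "Zhang"))  // shard3
}
```

Splitting a hot shard only inserts a new boundary into this sorted list, which is why range sharding makes split and merge cheap.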
┌─────────────────────────────────────────────────────────────────────────────┐
│                      SHARDING STRATEGIES                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. Hash Sharding                                                           │
│   ─────────────────────────────────────────────────────────────────────────  │
│   shard = hash(sharding_key) % num_shards                                   │
│                                                                              │
│   Pros:                                                                      │
│   • Even data distribution                                                   │
│   • No hotspots for uniform access                                          │
│   Cons:                                                                      │
│   • Range scans hit all shards                                              │
│   • Resharding expensive (most keys move)                                   │
│                                                                              │
│   2. Range Sharding                                                          │
│   ─────────────────────────────────────────────────────────────────────────  │
│   shard = find_shard(sharding_key) where shard.start <= key < shard.end     │
│                                                                              │
│   Shard 1: [A, M)                                                           │
│   Shard 2: [M, T)                                                           │
│   Shard 3: [T, Z]                                                           │
│                                                                              │
│   Pros:                                                                      │
│   • Range scans efficient (within shard)                                    │
│   • Split/merge shards easily                                               │
│   Cons:                                                                      │
│   • Hotspots for sequential keys (auto-increment)                           │
│   • Uneven distribution possible                                            │
│                                                                              │
│   3. Consistent Hashing                                                      │
│   ─────────────────────────────────────────────────────────────────────────  │
│                ┌─────┐                                                       │
│           ┌────│ N1  │────┐                                                  │
│           │    └─────┘    │     Ring represents hash space                  │
│      ┌────┴────┐    ┌────┴────┐                                             │
│      │   N4    │    │   N2    │  Keys hash to position on ring              │
│      └────┬────┘    └────┬────┘  Assigned to next node clockwise            │
│           │    ┌─────┐    │                                                  │
│           └────│ N3  │────┘                                                  │
│                └─────┘                                                       │
│                                                                              │
│   Used by: Dynamo, Cassandra, Riak                                           │
│   Adding node: Only neighbors affected                                       │
│   Virtual nodes: Improve balance                                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

3.2 Vitess Architecture

Vitess is a database clustering system for horizontal scaling of MySQL. It abstracts the sharding complexity from the application.
  • VTGate: A stateless proxy that routes queries to the correct shard(s). It speaks the MySQL protocol.
  • VTTablet: A sidecar process that runs alongside each MySQL instance, managing it and handling replication.
  • Topology Service: Stores the cluster configuration (keyspaces, shards) in a consistent store like etcd or ZooKeeper.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      VITESS ARCHITECTURE                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Application                                                                │
│        │                                                                     │
│        ▼                                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                           VTGate                                     │   │
│   │   • Query routing and planning                                       │   │
│   │   • Scatter-gather for cross-shard queries                          │   │
│   │   • Connection pooling                                               │   │
│   │   • Result merging                                                   │   │
│   └───────────────────────────┬─────────────────────────────────────────┘   │
│                               │                                              │
│            ┌──────────────────┼──────────────────┐                          │
│            ▼                  ▼                  ▼                          │
│   ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐              │
│   │    VTTablet     │ │    VTTablet     │ │    VTTablet     │              │
│   │  (Shard -80)    │ │ (Shard 80-c0)   │ │ (Shard c0-)     │              │
│   │                 │ │                 │ │                 │              │
│   │   ┌─────────┐   │ │   ┌─────────┐   │ │   ┌─────────┐   │              │
│   │   │  MySQL  │   │ │   │  MySQL  │   │ │   │  MySQL  │   │              │
│   │   │ Primary │   │ │   │ Primary │   │ │   │ Primary │   │              │
│   │   └─────────┘   │ │   └─────────┘   │ │   └─────────┘   │              │
│   │   ┌─────────┐   │ │   ┌─────────┐   │ │   ┌─────────┐   │              │
│   │   │ Replica │   │ │   │ Replica │   │ │   │ Replica │   │              │
│   │   └─────────┘   │ │   └─────────┘   │ │   └─────────┘   │              │
│   └─────────────────┘ └─────────────────┘ └─────────────────┘              │
│                                                                              │
│   Topology Service (etcd/Consul/ZooKeeper):                                 │
│   • Shard mapping                                                           │
│   • VTTablet health                                                         │
│   • Schema tracking                                                         │
│                                                                              │
│   VSchema (Virtual Schema):                                                  │
│   • Defines sharding keys per table                                         │
│   • Vindexes for key-to-shard mapping                                       │
│   • Sequence tables for auto-increment                                      │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

3.3 Vindex (Vitess Index)

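A Vindex (Vitess index) maps a column value to a keyspace ID, and the keyspace ID determines which shard holds the row. A table's primary Vindex is essentially its sharding key definition; secondary (lookup) Vindexes let queries on other columns be routed without scattering.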
-- VSchema defining sharding

{
  "sharded": true,
  "vindexes": {
    "user_hash": {
      "type": "hash"
    },
    "order_user_idx": {
      "type": "lookup_hash",
      "params": {
        "table": "user_order_lookup",
        "from": "order_id",
        "to": "user_id"
      }
    }
  },
  "tables": {
    "users": {
      "column_vindexes": [
        {
          "column": "user_id",
          "name": "user_hash"
        }
      ]
    },
    "orders": {
      "column_vindexes": [
        {
          "column": "user_id",
          "name": "user_hash"
        },
        {
          "column": "order_id",
          "name": "order_user_idx"
        }
      ]
    }
  }
}

-- Query routing examples:

-- Single shard (efficient):
SELECT * FROM orders WHERE user_id = 12345;
-- VTGate: hash(12345) → shard 80-c0 → route to one tablet

-- Secondary-key lookup (no scatter, thanks to the lookup vindex):
SELECT * FROM orders WHERE order_id = 67890;
-- VTGate: look up order_id in user_order_lookup → get user_id → hash → single shard
-- Without the order_user_idx lookup vindex: scatter to ALL shards, gather results

-- Join across shards:
SELECT u.name, o.total 
FROM users u JOIN orders o ON u.user_id = o.user_id
WHERE u.user_id = 12345;
-- VTGate: Both tables sharded same way → single shard

SELECT u.name, p.name
FROM users u JOIN products p ON u.favorite_product = p.product_id;
-- VTGate: Different sharding keys → scatter-gather or nested loop
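VTGate's routing step can be sketched in two parts: a vindex turns the sharding-key value into a keyspace ID, and the target is the shard whose hex-named range (such as -80, 80-c0, c0-) contains that ID, with the start inclusive and the end exclusive. This Go sketch substitutes FNV-64a for the real hash vindex function and uses hypothetical helper names; only the range-matching rule is meant to mirror Vitess.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"hash/fnv"
	"strings"
)

// keyspaceID stands in for a hash vindex: it maps a user_id to an 8-byte
// keyspace ID. FNV-64a is an illustrative assumption, not Vitess's function.
func keyspaceID(userID uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], userID)
	h := fnv.New64a()
	h.Write(buf[:])
	return h.Sum(nil)
}

// shardFor returns the shard whose "start-end" hex range covers id, comparing
// byte-wise: start <= id < end, where an empty side means unbounded.
func shardFor(shards []string, id []byte) (string, error) {
	for _, s := range shards {
		startHex, endHex, _ := strings.Cut(s, "-")
		start, err := hex.DecodeString(startHex)
		if err != nil {
			return "", err
		}
		end, err := hex.DecodeString(endHex)
		if err != nil {
			return "", err
		}
		if bytes.Compare(id, start) >= 0 && (len(end) == 0 || bytes.Compare(id, end) < 0) {
			return s, nil
		}
	}
	return "", fmt.Errorf("no shard covers %x", id)
}

func main() {
	shards := []string{"-80", "80-c0", "c0-"}
	id := keyspaceID(12345)
	shard, err := shardFor(shards, id)
	if err != nil {
		panic(err)
	}
	// The same user_id always yields the same keyspace ID, so every
	// WHERE user_id = 12345 query is routed to this one shard.
	fmt.Printf("keyspace ID %x -> shard %s\n", id, shard)
}
```

Because routing depends only on the keyspace ID, splitting -80 into -40 and 40-80 during resharding changes which shard name matches, not how the keyspace ID is computed.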

3.4 Resharding

Resharding is the process of changing the number of shards, usually by splitting existing shards as data grows (for example, splitting each of 2 shards in two, for 4 in total). Vitess does this online using VReplication; writes pause only briefly during the final cutover.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      RESHARDING PROCESS (VITESS)                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Before: 2 shards                  After: 4 shards                         │
│   ┌─────────────────────────┐       ┌─────────────────┐ ┌─────────────────┐ │
│   │     Shard -80           │   ──► │   Shard -40     │ │   Shard 40-80   │ │
│   └─────────────────────────┘       └─────────────────┘ └─────────────────┘ │
│   ┌─────────────────────────┐       ┌─────────────────┐ ┌─────────────────┐ │
│   │     Shard 80-           │   ──► │   Shard 80-c0   │ │   Shard c0-     │ │
│   └─────────────────────────┘       └─────────────────┘ └─────────────────┘ │
│                                                                              │
│   Resharding Steps:                                                          │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   1. Create Target Shards                                                    │
│      • Provision new MySQL instances                                         │
│      • Initialize with schema                                                │
│      • Set up replication                                                    │
│                                                                              │
│   2. VReplication (copy phase)                                               │
│      • Stream data from source to target                                     │
│      • Filter rows by new keyspace ID                                        │
│      • Maintain consistent snapshot                                          │
│                                                                              │
│   3. VDiff (verification)                                                    │
│      • Compare source and target data                                        │
│      • Ensure consistency before cutover                                     │
│                                                                              │
│   4. Cutover (atomic switch)                                                 │
│      • Stop writes briefly                                                   │
│      • Switch serving to new shards                                          │
│      • Redirect VTGate routing                                               │
│      • Resume writes to new shards                                           │
│                                                                              │
│   5. Cleanup                                                                 │
│      • Remove old shard data                                                 │
│      • Decommission old tablets                                              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
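The copy and verification steps (2 and 3) boil down to filtering rows by keyspace ID and then comparing both sides. The Python sketch below models the `-80` → `-40` + `40-80` split with in-memory dicts. It assumes rows are already keyed by an 8-byte keyspace ID, and `in_range`, `split_rows`, and `vdiff` are hypothetical helpers, not VReplication or VDiff internals.

```python
from typing import Dict, List

Rows = Dict[bytes, str]  # keyspace_id -> row payload

def in_range(ksid: bytes, shard: str) -> bool:
    """True if ksid falls in the shard's key range [start, end)."""
    start_hex, end_hex = shard.split("-")
    start, end = bytes.fromhex(start_hex), bytes.fromhex(end_hex)
    return ksid >= start and (not end or ksid < end)

def split_rows(source: Rows, targets: List[str]) -> Dict[str, Rows]:
    """Copy phase: send each source row to the target shard owning its keyspace ID."""
    out: Dict[str, Rows] = {t: {} for t in targets}
    for ksid, row in source.items():
        owners = [t for t in targets if in_range(ksid, t)]
        assert len(owners) == 1, "target ranges must partition the source range"
        out[owners[0]][ksid] = row
    return out

def vdiff(source: Rows, split: Dict[str, Rows]) -> bool:
    """Verification: the union of the targets must equal the source exactly."""
    merged: Rows = {}
    for rows in split.values():
        merged.update(rows)
    no_duplicates = sum(len(r) for r in split.values()) == len(source)
    return merged == source and no_duplicates

source_shard = {
    bytes.fromhex("10" * 8): "alice",
    bytes.fromhex("5a" * 8): "bob",
    bytes.fromhex("7f" * 8): "carol",
}
result = split_rows(source_shard, ["-40", "40-80"])
print(sorted(result["-40"].values()), sorted(result["40-80"].values()))
# ['alice'] ['bob', 'carol']
print(vdiff(source_shard, result))  # True
```

In a real reshard the source keeps taking writes during the copy, which is why VReplication also streams ongoing changes and VDiff is run before cutover; this sketch covers only a static snapshot.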

Part 4: Distributed Transactions

When a transaction spans multiple shards, we need a protocol to ensure atomicity: either all shards commit, or none do. Real-world analogy: A distributed transaction is like coordinating a multi-party real estate closing. The buyer, seller, bank, and title company all need to sign simultaneously. If any party backs out, the whole deal is off. Two-Phase Commit (2PC) is the closing agent who first asks everyone “Are you ready to sign?” (Prepare phase). If everyone says yes, the agent says “Sign now” (Commit phase). The danger: if the closing agent has a heart attack after everyone said “ready” but before saying “sign,” all parties are stuck — they cannot proceed or back out until the agent recovers. That blocking problem is why 2PC is the “necessary evil” of distributed databases.

4.1 Two-Phase Commit (2PC)

Two-Phase Commit is the standard protocol for distributed atomic commit. It involves a coordinator and multiple participants.
  1. Prepare Phase: The coordinator asks every participant "Can you commit?" A participant that can commit locks the affected resources, durably records that it is prepared, and votes Yes. After voting Yes, it can no longer abort on its own.
  2. Commit Phase: If every participant votes Yes, the coordinator durably logs a Commit decision and tells everyone to commit. If any participant votes No (or times out), the coordinator tells everyone to abort.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      TWO-PHASE COMMIT                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Coordinator                 Participant A         Participant B            │
│       │                            │                      │                  │
│   ════╪════════════════════════════╪══════════════════════╪════════════════  │
│       │   PREPARE Phase            │                      │                  │
│       │──────────────────────────► │                      │                  │
│       │      PREPARE               │                      │                  │
│       │──────────────────────────────────────────────────►│                  │
│       │                            │      PREPARE         │                  │
│       │                            │                      │                  │
│       │ ◄──────────────────────────│                      │                  │
│       │      VOTE_COMMIT           │                      │                  │
│       │ ◄─────────────────────────────────────────────────│                  │
│       │                            │      VOTE_COMMIT     │                  │
│   ════╪════════════════════════════╪══════════════════════╪════════════════  │
│       │   COMMIT Phase             │                      │                  │
│       │──────────────────────────► │                      │                  │
│       │      COMMIT                │                      │                  │
│       │──────────────────────────────────────────────────►│                  │
│       │                            │      COMMIT          │                  │
│       │ ◄──────────────────────────│                      │                  │
│       │      ACK                   │                      │                  │
│       │ ◄─────────────────────────────────────────────────│                  │
│       │                            │      ACK             │                  │
│       │                                                                      │
│                                                                              │
│   Problems with 2PC:                                                         │
│   • Blocking: If coordinator fails after PREPARE, participants wait         │
│   • Latency: 2 round trips minimum                                          │
│   • Single point of failure (coordinator)                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
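The message flow above can be condensed into a small, single-process Python sketch. `Participant` and `two_phase_commit` are hypothetical names; there is no networking, each "durable log" is a Python list, and a timeout is modeled as a participant voting No. It shows the two invariants that matter: the coordinator logs its decision before phase 2, and it commits only if every vote is Yes.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Participant:
    name: str
    can_commit: bool = True
    state: str = "INIT"  # INIT -> PREPARED -> COMMITTED / ABORTED
    log: List[str] = field(default_factory=list)  # stand-in for a durable log

    def prepare(self) -> bool:
        # Phase 1: lock resources, durably record the vote, then reply.
        if not self.can_commit:
            self.state = "ABORTED"
            self.log.append("VOTE_ABORT")
            return False
        self.state = "PREPARED"
        self.log.append("VOTE_COMMIT")
        return True

    def finish(self, decision: str) -> None:
        # Phase 2: apply the coordinator's decision and release locks.
        self.state = decision
        self.log.append(decision)

def two_phase_commit(participants: List[Participant], coordinator_log: List[str]) -> str:
    # Phase 1: collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    decision = "COMMITTED" if all(votes) else "ABORTED"
    # The decision is logged BEFORE phase 2. If the coordinator crashed here,
    # PREPARED participants would block until recovery reads this entry.
    coordinator_log.append(decision)
    # Phase 2: broadcast the decision to everyone still waiting on it.
    for p in participants:
        if p.state != "ABORTED":
            p.finish(decision)
    return decision

log: List[str] = []
a, b = Participant("A"), Participant("B")
print(two_phase_commit([a, b], log), a.state, b.state)  # COMMITTED COMMITTED COMMITTED
c, d = Participant("C"), Participant("D", can_commit=False)
print(two_phase_commit([c, d], log), c.state, d.state)  # ABORTED ABORTED ABORTED
```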

4.2 Spanner’s TrueTime

Google Spanner achieves external consistency (equivalent to strict serializability: transactions appear to execute one at a time, in an order consistent with real time) at global scale using TrueTime, an API that exposes the current time as an interval of uncertainty.
  • TrueTime API: TT.now() returns an interval [earliest, latest], and the actual time is guaranteed to fall within it.
  • Commit Wait: A transaction Ti waits until TT.now().earliest > Ti.commit_timestamp before making its writes visible and reporting success. This ensures that if T2 starts after T1 finishes, T2 is assigned a later timestamp and therefore sees T1's effects.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      TRUETIME AND SPANNER                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   TrueTime API:                                                              │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   TT.now()     → TTinterval { earliest, latest }                            │
│   TT.after(t)  → True if t has definitely passed                            │
│   TT.before(t) → True if t has definitely not arrived                       │
│                                                                              │
│   Uncertainty bound (ε):                                                     │
│   • Typical: 1-7 ms                                                         │
│   • Maintained by GPS + atomic clocks                                       │
│                                                                              │
│   Commit Wait:                                                               │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   Transaction T1 commits:                                                    │
│   1. Acquire locks                                                           │
│   2. Get commit timestamp s = TT.now().latest                               │
│   3. Wait until TT.after(s) is true                                         │
│   4. Release locks                                                           │
│                                                                              │
│   Why wait?                                                                  │
│   • Ensures T1's timestamp is in the past at all nodes                      │
│   • Any later transaction T2 will have s2 > s1                              │
│   • External consistency: If T1 commits before T2 starts, s1 < s2           │
│                                                                              │
│   Timeline:                                                                  │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   T1: [─────acquire locks─────][wait ~2ε][commit]                           │
│   T2:                              [start]────────►                          │
│                                    │                                         │
│                            T2 sees T1's timestamp                            │
│                            is definitely in past                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
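The commit-wait rule can be simulated with a fake clock. The sketch below is a hypothetical model, not Spanner's API: `FakeTrueTime` wraps a simulated absolute time `t` (integer microseconds) and a fixed uncertainty `eps`, so `now()` returns `[t - eps, t + eps]`. Choosing `s = now().latest` and then waiting until `after(s)` holds takes just over `2 * eps`, so the commit wait is roughly twice the uncertainty bound, not one `eps`.

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass
class TTInterval:
    earliest: int  # microseconds
    latest: int

class FakeTrueTime:
    """Hypothetical simulated TrueTime: true time t is only known to +/- eps."""
    def __init__(self, eps_us: int):
        self.eps = eps_us
        self.t = 0  # simulated absolute time, microseconds

    def now(self) -> TTInterval:
        return TTInterval(self.t - self.eps, self.t + self.eps)

    def after(self, s: int) -> bool:
        return self.now().earliest > s  # s has definitely passed

    def before(self, s: int) -> bool:
        return self.now().latest < s    # s has definitely not arrived

    def advance(self, us: int) -> None:
        self.t += us

def commit_with_wait(tt: FakeTrueTime, step_us: int = 100) -> Tuple[int, int]:
    """Pick s = TT.now().latest, then wait until TT.after(s). Returns (s, wait)."""
    s = tt.now().latest
    start = tt.t
    while not tt.after(s):  # locks stay held during commit wait
        tt.advance(step_us)
    return s, tt.t - start

tt = FakeTrueTime(eps_us=4_000)  # eps = 4 ms
tt.advance(100_000)
s, waited = commit_with_wait(tt)
print(s, waited)  # 104000 8100  (just over 2 * eps, at 100 us resolution)
```

Once the wait ends, `now().earliest > s`, so any transaction that starts afterwards picks a timestamp `now().latest` strictly greater than `s`: that is the external-consistency argument in miniature.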

4.3 CockroachDB’s Hybrid-Logical Clocks

CockroachDB is designed to run on commodity hardware without atomic clocks or GPS receivers, so it cannot rely on TrueTime as Spanner does. Instead, it uses Hybrid Logical Clocks (HLC) to track causality while staying close to wall-clock time. Real-world analogy: HLC is like a wall clock with a sticky note. The wall clock (physical component) shows real time but might be slightly off across offices. The sticky note (logical counter) handles the rest: "I received a message stamped 3:00:05 but my clock says 3:00:03, so I bump my sticky note to say I'm logically after 3:00:05." This way, causal ordering is preserved even when physical clocks disagree. The trade-off vs. TrueTime: HLC does not require GPS/atomic hardware, but it creates "uncertainty windows": a reader may encounter a write whose timestamp is slightly above its own, yet which may have happened before the read began in real time, forcing the transaction to retry at a higher timestamp.
  • Physical Component: Wall-clock time (synchronized with NTP).
  • Logical Component: A counter that orders events when the physical clock has not advanced past the HLC's physical value, for example within the same physical tick, after receiving a timestamp from a node whose clock is ahead, or if the local clock steps backwards.
  • Result: Timestamps that respect causality (if event a happened before event b, then hlc(a) < hlc(b)) while staying close to real time, which CockroachDB uses to order MVCC versions.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      HYBRID-LOGICAL CLOCKS (HLC)                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   HLC = (physical_time, logical_counter)                                    │
│                                                                              │
│   Properties:                                                                │
│   • Tracks causality like Lamport clocks                                    │
│   • Stays close to real time (unlike Lamport)                               │
│   • No special hardware needed (unlike TrueTime)                            │
│                                                                              │
│   Algorithm:                                                                 │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   On local event or send:                                                    │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │ pt = physical_time()                                                │    │
│   │ if pt > hlc.physical:                                               │    │
│   │     hlc.physical = pt                                               │    │
│   │     hlc.logical = 0                                                 │    │
│   │ else:                                                               │    │
│   │     hlc.logical++                                                   │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│   On receive message with timestamp msg_hlc:                                │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │ pt = physical_time()                                                │    │
│   │ if pt > max(hlc.physical, msg_hlc.physical):                       │    │
│   │     hlc.physical = pt                                               │    │
│   │     hlc.logical = 0                                                 │    │
│   │ elif msg_hlc.physical > hlc.physical:                              │    │
│   │     hlc.physical = msg_hlc.physical                                │    │
│   │     hlc.logical = msg_hlc.logical + 1                              │    │
│   │ elif hlc.physical > msg_hlc.physical:                              │    │
│   │     hlc.logical++                                                   │    │
│   │ else: // equal physical                                            │    │
│   │     hlc.logical = max(hlc.logical, msg_hlc.logical) + 1            │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│   CockroachDB's Uncertainty Window:                                          │
│   • Instead of commit wait, readers handle uncertainty                      │
│   • If read encounters write in uncertainty window → restart               │
│   • Max clock offset is a configured bound (--max-offset)                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
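The two update rules in the diagram translate almost line for line into Python. This is a minimal model, not CockroachDB's implementation: `HLC` is a hypothetical class, `physical_time` is injected as a function so the example is deterministic, and timestamps compare as `(physical, logical)` tuples.

```python
from typing import Callable, Tuple

Timestamp = Tuple[int, int]  # (physical, logical); tuples compare lexicographically

class HLC:
    def __init__(self, physical_time: Callable[[], int]):
        self.physical_time = physical_time
        self.physical = 0
        self.logical = 0

    def now(self) -> Timestamp:
        """Local event or send."""
        pt = self.physical_time()
        if pt > self.physical:
            self.physical, self.logical = pt, 0
        else:
            self.logical += 1
        return (self.physical, self.logical)

    def update(self, msg: Timestamp) -> Timestamp:
        """Receive a message stamped msg."""
        msg_physical, msg_logical = msg
        pt = self.physical_time()
        if pt > max(self.physical, msg_physical):
            self.physical, self.logical = pt, 0
        elif msg_physical > self.physical:
            self.physical, self.logical = msg_physical, msg_logical + 1
        elif self.physical > msg_physical:
            self.logical += 1
        else:  # equal physical components
            self.logical = max(self.logical, msg_logical) + 1
        return (self.physical, self.logical)

# Node B's wall clock lags node A's by 2 units.
clock_a, clock_b = [5], [3]
a = HLC(lambda: clock_a[0])
b = HLC(lambda: clock_b[0])
sent = a.now()              # (5, 0)
received = b.update(sent)   # (5, 1): B moves past its own clock to keep causality
print(sent, received, received > sent)  # (5, 0) (5, 1) True
```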

4.4 Distributed Transactions in Vitess

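Vitess supports different transaction modes depending on the consistency requirements. The transaction_mode setting (per session, or as a VTGate default) chooses between SINGLE (reject transactions that span shards), MULTI (the default: allow them, with a best-effort, non-atomic commit), and TWOPC (atomic commit via two-phase commit).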
-- Vitess transaction modes

-- Single-shard transactions (efficient)
-- Why this matters: when all touched rows live on the same shard, Vitess delegates
-- to a plain MySQL transaction -- no distributed coordination, no 2PC overhead.
-- This is why co-locating related data on the same shard is the #1 performance rule.
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE users SET order_count = order_count + 1 WHERE user_id = 123;  -- order_count: hypothetical column
COMMIT;
-- Both tables are sharded by user_id (user_hash Vindex), so both rows map to the
-- same shard -> a single MySQL transaction

-- Cross-shard atomic transactions (2PC)
-- Performance pitfall: 2PC adds extra network round trips plus durable writes of
-- transaction state. A single-shard transaction might take 2ms; the same logic
-- crossing two shards can take 10-20ms. Design your schema to minimize
-- cross-shard transactions.
SET transaction_mode = 'twopc';
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE orders SET status = 'shipped' WHERE user_id = 456;  -- Assumed to be on a different shard
COMMIT;

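/*
 * Vitess 2PC implementation (simplified):
 *
 * 1. PREPARE phase:
 *    - VTGate records the transaction and its participants in a
 *      transaction record on one participating shard (the "metadata manager")
 *    - Each other participant prepares locally and persists its pending
 *      statements in a redo log, so a prepared transaction survives a restart
 *
 * 2. COMMIT phase:
 *    - The commit decision is made durable in the transaction record
 *    - VTGate sends COMMIT to all participants
 *    - The transaction record is deleted after all participants commit
 *
 * 3. Recovery (if VTGate crashes):
 *    - Scan transaction records for unresolved transactions
 *    - Resume COMMIT or ROLLBACK based on the recorded decision
 */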

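-- Best-effort multi-shard transactions (MULTI, the default mode)
SET transaction_mode = 'multi';
BEGIN;
UPDATE orders SET status = 'shipped' WHERE user_id = 123;
UPDATE orders SET status = 'shipped' WHERE user_id = 456;
COMMIT;
-- VTGate commits each shard's transaction separately: no atomicity guarantee!
-- If one shard's commit fails, other shards may already have committed.
-- Use only when the application can tolerate (or repair) partial commits.
-- By contrast, SET transaction_mode = 'single' rejects any transaction that
-- touches more than one shard.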
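Which commit path a transaction gets depends on how many shards it touches and on `transaction_mode`, whose values in Vitess are SINGLE, MULTI (the default), and TWOPC. The Python sketch below models only that decision. `plan_commit` is a hypothetical helper, `shard_of` stands in for whatever the table's Vindex computes, and the even/odd toy routing is an assumption chosen so the example is predictable.

```python
from typing import Callable, Iterable

def plan_commit(user_ids: Iterable[int], mode: str,
                shard_of: Callable[[int], str]) -> str:
    """Summarize how a transaction touching these sharding keys would commit."""
    shards = {shard_of(uid) for uid in user_ids}
    if len(shards) == 1:
        return "single MySQL transaction"  # no distributed coordination
    mode = mode.lower()
    if mode == "single":
        raise ValueError("multi-shard transaction rejected in SINGLE mode")
    if mode == "multi":
        return "best-effort commit per shard (not atomic)"
    if mode == "twopc":
        return "atomic commit via 2PC"
    raise ValueError(f"unknown transaction_mode: {mode}")

def toy_shard(uid: int) -> str:
    """Hypothetical routing: even user_ids on shard -80, odd ones on 80-."""
    return "-80" if uid % 2 == 0 else "80-"

print(plan_commit([123, 123], "multi", toy_shard))  # single MySQL transaction
print(plan_commit([123, 456], "twopc", toy_shard))  # atomic commit via 2PC
```

Note that the mode only matters once a transaction spans shards; a single-shard transaction takes the cheap path in every mode, which is the practical argument for co-locating related rows.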

Part 5: CockroachDB Deep Dive

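CockroachDB is a distributed SQL database built on a transactional key-value layer that is replicated with Raft for consensus. Each node stores its data in Pebble, an LSM-tree storage engine inspired by RocksDB (earlier versions used RocksDB itself).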

5.1 Range Architecture

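Data is divided into contiguous chunks of the sorted keyspace called Ranges (up to 512 MiB each by default). Each Range is replicated (three replicas by default) and forms its own Raft group; one replica, the leaseholder, serves reads and coordinates writes for that Range.
(Figure: CockroachDB Range Architecture)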
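Because Ranges are contiguous slices of one sorted keyspace, finding the Range that owns a key is a binary search over Range start keys, and splitting a Range just inserts a new boundary. The Python sketch below assumes byte-string keys and a hypothetical `RangeMap` class; the keys are human-readable stand-ins for CockroachDB's encoded keys. CockroachDB itself stores range descriptors in system "meta" ranges and caches them on each node.

```python
import bisect
from typing import List

class RangeMap:
    """Hypothetical model: Range i covers [starts[i], starts[i + 1])."""
    def __init__(self) -> None:
        self.starts: List[bytes] = [b""]  # one Range covering the whole keyspace

    def lookup(self, key: bytes) -> int:
        """Index of the Range containing key (binary search on start keys)."""
        return bisect.bisect_right(self.starts, key) - 1

    def split(self, split_key: bytes) -> None:
        """Split the Range containing split_key at split_key."""
        i = bisect.bisect_left(self.starts, split_key)
        if i < len(self.starts) and self.starts[i] == split_key:
            return  # already a Range boundary
        self.starts.insert(i, split_key)

rm = RangeMap()
rm.split(b"/Table/users/500")
rm.split(b"/Table/orders")
for k in [b"/Table/orders/7", b"/Table/users/123", b"/Table/users/900"]:
    print(k, "-> range", rm.lookup(k))
# b'/Table/orders/7' -> range 1
# b'/Table/users/123' -> range 1
# b'/Table/users/900' -> range 2
```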

5.2 CockroachDB Transaction Flow

CockroachDB uses a decentralized transaction model with “Write Intents”.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      COCKROACHDB TRANSACTION FLOW                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. BEGIN TRANSACTION                                                       │
│      • Assign provisional commit timestamp                                   │
│      • No special messages needed                                            │
│                                                                              │
│   2. WRITE (e.g., UPDATE)                                                    │
│      Gateway                                                                 │
│         │                                                                    │
│         ▼ Route to range leaseholder                                        │
│      Leaseholder                                                             │
│         │                                                                    │
│         ├─► Write intent: (key, value, txn_id, timestamp)                   │
│         │   Intent = uncommitted write, visible to owner txn only           │
│         │                                                                    │
│         └─► Replicate via Raft                                              │
│                                                                              │
│   3. READ (in same transaction)                                              │
│      • Reader checks for intents                                             │
│      • Own intents: read the value                                          │
│      • Other intents: wait or push the transaction                          │
│                                                                              │
│   4. COMMIT                                                                  │
│      Gateway                                                                 │
│         │                                                                    │
│         ▼ Write transaction record                                          │
│      Transaction Record Range                                               │
│         │                                                                    │
│         ├─► Status: COMMITTED                                               │
│         └─► Timestamp: final commit timestamp                               │
│                                                                              │
│      Then (async or sync):                                                   │
│         ├─► Resolve intents → convert to committed values                   │
│         └─► Clean up transaction record                                     │
│                                                                              │
│   Intent Example:                                                            │
│   ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│   Key: /Table/users/123/name                                                │
│   ┌────────────────────────────────────────────────────────────────┐        │
│   │ Timestamp │ Type      │ Value       │ TxnID                    │        │
│   ├────────────────────────────────────────────────────────────────┤        │
│   │ ts=50     │ Value     │ "Alice"     │ (none - committed)       │        │
│   │ ts=100    │ Intent    │ "Alicia"    │ txn-abc-123              │        │
│   └────────────────────────────────────────────────────────────────┘        │
│                                                                              │
│   • Readers at ts=75 see "Alice"                                            │
│   • Readers at ts=100+ wait for intent resolution                           │
│   • If txn commits: intent becomes value                                    │
│   • If txn aborts: intent deleted                                           │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
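The visibility rules from the intent example can be expressed as a small MVCC read function. This is a simplified Python model, not CockroachDB's storage code: each key holds a list of versions, an intent is a version tagged with a transaction ID, and a conflicting intent raises `IntentConflict` (a hypothetical exception) instead of waiting or pushing. Uncertainty windows are ignored here.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Version:
    ts: int
    value: str
    txn_id: Optional[str] = None  # None means committed; otherwise an intent

class IntentConflict(Exception):
    """Reader hit another transaction's intent at or below its read timestamp."""

def mvcc_read(versions: List[Version], read_ts: int,
              reader_txn: Optional[str] = None) -> Optional[str]:
    """Return the newest value visible at read_ts."""
    for v in sorted(versions, key=lambda ver: ver.ts, reverse=True):
        if v.ts > read_ts:
            continue  # written after the read timestamp: invisible
        if v.txn_id is None or v.txn_id == reader_txn:
            return v.value  # committed value, or the reader's own intent
        raise IntentConflict(f"intent from {v.txn_id} at ts={v.ts}")
    return None

name_versions = [Version(50, "Alice"), Version(100, "Alicia", txn_id="txn-abc-123")]
print(mvcc_read(name_versions, 75))                             # Alice
print(mvcc_read(name_versions, 120, reader_txn="txn-abc-123"))  # Alicia
try:
    mvcc_read(name_versions, 120)
except IntentConflict as e:
    print("must wait or push:", e)  # must wait or push: intent from txn-abc-123 at ts=100
```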

5.3 Read-Write Conflicts

Since transactions are distributed, conflicts are resolved using timestamps and priorities.
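// CockroachDB read-write conflict resolution: a simplified, runnable Go sketch
// (illustrative only, not CockroachDB's code).
// Scenario: reader T2 finds an intent written by T1 on the key it is reading.
package main

import "fmt"

type Status int

const (
	Pending Status = iota
	Committed
	Aborted
)

// Priorities mirror SET TRANSACTION PRIORITY LOW | NORMAL | HIGH.
// Higher priority wins conflicts, which helps important transactions avoid
// being pushed repeatedly.
const (
	Low = iota
	Normal
	High
)

type Txn struct {
	ID        string
	Timestamp int64 // HLC timestamp, simplified to an integer
	Status    Status
	Priority  int
}

// onIntent returns what reader t2 does when it encounters t1's intent.
func onIntent(t2, t1 *Txn) string {
	// The intent is above the read timestamp, so it is invisible to t2,
	// which reads the newest committed version below it. No conflict.
	if t1.Timestamp > t2.Timestamp {
		return "read below intent"
	}
	switch t1.Status {
	case Committed:
		return "resolve intent, read new value"
	case Aborted:
		return "remove intent, read previous value"
	}
	// t1 is PENDING. A higher-priority reader pushes t1's timestamp above its
	// own read timestamp, then reads the previous value. t1 must commit at the
	// higher timestamp (refreshing its reads, or restarting if they changed).
	// In CockroachDB a push also succeeds once t1's heartbeat has expired;
	// this sketch models only the priority rule.
	if t2.Priority > t1.Priority {
		t1.Timestamp = t2.Timestamp + 1
		return "push writer, read previous value"
	}
	// Otherwise t2 waits in the key's wait queue until t1 commits or aborts.
	// (Write-write deadlocks are broken by aborting one transaction.)
	return "wait for writer"
}

func main() {
	t1 := &Txn{ID: "T1", Timestamp: 100, Status: Pending, Priority: Normal}
	fmt.Println(onIntent(&Txn{ID: "T2", Timestamp: 75, Priority: Normal}, t1))  // read below intent
	fmt.Println(onIntent(&Txn{ID: "T3", Timestamp: 120, Priority: Normal}, t1)) // wait for writer
	fmt.Println(onIntent(&Txn{ID: "T4", Timestamp: 120, Priority: High}, t1))   // push writer, read previous value
	fmt.Println(t1.Timestamp)                                                  // 121
}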

Part 6: Interview Questions

Distributed Systems Deep Dive

Key Design Points:
  1. Data Placement
    • Shard data by primary key range
    • Each shard replicated across zones/regions
    • Paxos or Raft for consensus per shard
  2. Time Synchronization
    • GPS + atomic clocks for TrueTime (Spanner approach)
    • Or HLC with uncertainty intervals (CockroachDB approach)
    • Critical for external consistency
  3. Transactions
    • Lock-free reads at snapshot timestamp
    • Pessimistic locking for writes
    • 2PC for cross-shard transactions
    • Commit wait to ensure external consistency
  4. Read Optimization
    • Stale reads from local replica (bounded staleness)
    • Strong reads require leader/leaseholder
    • Follower reads with clock check
  5. Schema Management
    • Online schema changes (like PG’s concurrent operations)
    • Distributed DDL coordination
    • Schema version tracked per shard
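Point 4's "follower reads with clock check" can be illustrated with a small decision function. A follower can serve a read at timestamp `t` without contacting the leader only if it knows no new writes at or below `t` can still arrive; CockroachDB calls this bound a closed timestamp, and Spanner uses a per-replica safe time. The Python sketch below assumes a hypothetical `closed_ts` field per replica and a fixed staleness bound; it shows only the routing decision, not how the bound is propagated.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Replica:
    name: str
    is_leaseholder: bool
    closed_ts: int   # no new writes can commit at or below this timestamp
    latency_ms: int  # network distance from the client

def choose_replica(replicas: List[Replica], now: int, staleness: int,
                   strong: bool) -> Replica:
    """Pick the closest replica that can serve the read."""
    if strong:
        # Strong (fresh) reads go to the leaseholder/leader.
        return next(r for r in replicas if r.is_leaseholder)
    read_ts = now - staleness  # bounded-staleness read timestamp
    eligible = [r for r in replicas if r.is_leaseholder or r.closed_ts >= read_ts]
    return min(eligible, key=lambda r: r.latency_ms)

replicas = [
    Replica("us-east (leaseholder)", True, closed_ts=1000, latency_ms=80),
    Replica("eu-west", False, closed_ts=995, latency_ms=5),
    Replica("ap-south", False, closed_ts=900, latency_ms=150),
]
print(choose_replica(replicas, now=1000, staleness=10, strong=False).name)  # eu-west
print(choose_replica(replicas, now=1000, staleness=1, strong=False).name)   # us-east (leaseholder)
print(choose_replica(replicas, now=1000, staleness=10, strong=True).name)   # us-east (leaseholder)
```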
Vitess:
  • Middleware layer over MySQL
  • VTGate (a proxy layer) handles sharding and query routing, so the application sees one logical database
  • MySQL handles storage (InnoDB)
  • Pros: Leverage MySQL ecosystem, operational simplicity per shard
  • Cons: cross-shard atomicity requires opt-in 2PC; no distributed ACID by default
CockroachDB:
  • Ground-up distributed SQL
  • Raft consensus per range (~512MB)
  • Pebble (LSM, RocksDB-inspired; older versions used RocksDB) for storage
  • HLC for timestamp ordering
  • Pros: Strong consistency, automatic sharding
  • Cons: Write amplification from LSM, HLC complexity
TiDB:
  • Hybrid: SQL layer (TiDB) + storage layer (TiKV)
  • Raft per region (96MB default)
  • RocksDB for storage (TiKV)
  • Percolator-style transactions
  • Pros: PD for scheduling, analytics with TiFlash
  • Cons: Complexity of multiple components
When to choose:
  • Vitess: Existing MySQL, horizontal scaling of writes and storage via sharding
  • CockroachDB: Greenfield, strong consistency required
  • TiDB: Mixed OLTP/OLAP, MySQL compatibility
Scenario: Node fails mid-transaction
Transaction Record Approach:
  1. Each transaction has a “transaction record” stored in a range
  2. Record tracks: status (PENDING/COMMITTED/ABORTED), timestamp, intents
  3. If coordinator fails, transaction record remains
Recovery Process:
  1. Other transactions encountering intents check transaction record
  2. If record says COMMITTED: resolve intent as committed
  3. If record says ABORTED: delete intent
  4. If record says PENDING and heartbeat expired:
    • Push transaction timestamp or abort it
    • Garbage collect stale intents
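Leaseholder Failure:
  1. Raft detects the failure when heartbeats stop (roughly 3-10 seconds, depending on election and lease timeouts)
  2. A new Raft leader is elected, and a surviving replica acquires the range lease once the old lease expires
  3. Entries committed by the old leader are already on a majority of replicas, so no acknowledged write is lost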
  4. Transactions on failed node:
    • Can retry from any other node
    • Intents still visible, resolved based on txn record
Key Insight:
  • No transaction coordinator bottleneck
  • Any node can resolve intents using txn record
  • Heartbeats prevent zombie transactions
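The recovery rules above fit in one function. The Python sketch below is a simplified model: `TxnRecord`, `resolve_intent`, and the 5-second `HEARTBEAT_TIMEOUT` are hypothetical (CockroachDB's real expiry threshold and push protocol differ in detail), and the current time is passed in explicitly so the outcome is deterministic.

```python
from dataclasses import dataclass
from enum import Enum

class Status(Enum):
    PENDING = "PENDING"
    COMMITTED = "COMMITTED"
    ABORTED = "ABORTED"

HEARTBEAT_TIMEOUT = 5.0  # seconds; hypothetical value for this sketch

@dataclass
class TxnRecord:
    txn_id: str
    status: Status
    last_heartbeat: float  # seconds

def resolve_intent(record: TxnRecord, now: float) -> str:
    """What a transaction that encounters an intent does, based on the owner's record."""
    if record.status is Status.COMMITTED:
        return "resolve intent as committed value"
    if record.status is Status.ABORTED:
        return "remove intent"
    if now - record.last_heartbeat > HEARTBEAT_TIMEOUT:
        record.status = Status.ABORTED  # coordinator presumed dead
        return "abort owner, remove intent"
    return "wait or push (owner still alive)"

rec = TxnRecord("txn-abc-123", Status.PENDING, last_heartbeat=100.0)
print(resolve_intent(rec, now=102.0))  # wait or push (owner still alive)
print(resolve_intent(rec, now=110.0))  # abort owner, remove intent
print(rec.status)                      # Status.ABORTED
```

Because the decision lives in the record rather than in the coordinator's memory, any node that finds the intent can apply these rules, which is the "no coordinator bottleneck" point above.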
Requirements Analysis:
  • Tenant isolation
  • Tenant-level scaling
  • Cross-tenant queries (admin/analytics)
  • Even distribution
Strategy: Tenant-based sharding:
-- Primary key includes tenant_id
CREATE TABLE orders (
    tenant_id UUID,
    order_id UUID,
    ...
    PRIMARY KEY (tenant_id, order_id)
);

-- Shard by tenant_id
-- All tenant data co-located
Handling large tenants:
  • Sub-sharding by (tenant_id, secondary_key)
  • Example: (tenant_id, order_date) for time-based queries
  • Or dedicated shard per large tenant
Handling small tenants:
  • Hash tenant_id for even distribution
  • Many small tenants share shards
  • Fewer hot-shard issues (though one very busy tenant can still overload its shard)
Cross-tenant queries:
  • Scatter-gather (expensive but possible)
  • Materialized views in analytics system
  • Separate OLAP store (TiFlash, ClickHouse)
Migration strategy:
  • Start unsharded
  • Add tenant_id to all tables
  • Enable sharding when needed
  • Move large tenants to dedicated shards
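The placement rules (hash small tenants across shared shards, pin large tenants to dedicated shards) can be sketched as a routing function. `route_tenant`, the `DEDICATED` override map, the shard names, and SHA-256 modulo placement are hypothetical choices for illustration; in Vitess the equivalent would be a Vindex on `tenant_id` plus moving large tenants' data.

```python
import hashlib
import uuid
from typing import Dict

SHARED_SHARDS = ["shared-0", "shared-1", "shared-2", "shared-3"]
DEDICATED: Dict[str, str] = {}  # tenant_id -> dedicated shard (large tenants)

def route_tenant(tenant_id: str) -> str:
    """All rows of a tenant land on one shard, so per-tenant queries stay single-shard."""
    if tenant_id in DEDICATED:
        return DEDICATED[tenant_id]
    digest = hashlib.sha256(tenant_id.encode()).digest()
    # Hashing (rather than using raw tenant_id order) spreads small tenants evenly.
    return SHARED_SHARDS[int.from_bytes(digest[:8], "big") % len(SHARED_SHARDS)]

big_tenant = str(uuid.UUID(int=1))
print(route_tenant(big_tenant) in SHARED_SHARDS)  # True
DEDICATED[big_tenant] = "dedicated-big"           # migration step: move the large tenant
print(route_tenant(big_tenant))                   # dedicated-big
```

Modulo placement keeps the sketch short, but adding a shard would remap most tenants; that is one reason systems like Vitess route by key ranges of a hashed keyspace ID instead.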

Interview Deep-Dive

Strong Answer:
  • CAP states that during a network partition, a distributed system must choose between consistency (every read returns the latest write) and availability (every request gets a response). Examples: CockroachDB and Spanner are CP — during a partition, ranges without quorum reject writes to preserve consistency. Cassandra and DynamoDB are AP — they continue accepting writes during partitions and resolve conflicts later via last-write-wins or vector clocks.
  • Why it is an oversimplification: (1) CAP is binary, but real systems operate on a spectrum. Spanner is technically CP, yet it achieves effective five-nines availability because Google's private network makes partitions extremely rare; Eric Brewer has described it as "effectively CA". (2) CAP says nothing about latency, which matters more in practice than theoretical availability. A CP system that takes 5 seconds to respond during normal operation is worse than an AP system with 50ms latency and occasional stale reads. (3) The PACELC extension is more useful: during a Partition, choose Availability or Consistency; Else (normal operation), choose Latency or Consistency. CockroachDB is PC/EC (consistent always), DynamoDB is PA/EL (available during partition, low latency normally).
Follow-up: How does CockroachDB maintain consistency without TrueTime like Spanner has?
CockroachDB uses Hybrid Logical Clocks (HLC) instead of TrueTime. HLC tracks a physical timestamp component (from NTP) plus a logical counter for ordering events within the same physical tick. The tradeoff: CockroachDB does not guarantee external consistency in every case (the property that if T2 starts after T1 commits, T2's commit timestamp is higher than T1's), although it does guarantee serializable isolation. Instead of waiting out clock uncertainty, it handles uncertainty windows: if a read encounters a write whose timestamp lies above the read timestamp but within the maximum clock offset, the transaction retries at a higher timestamp. Higher clock skew therefore increases restart rates, but it does not compromise safety as long as clocks stay within the configured maximum offset. Spanner avoids these restarts by using commit wait (waiting out the uncertainty window), which is practical only because TrueTime's uncertainty bound is small (1-7ms, from GPS and atomic clocks).
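The uncertainty rule can be stated precisely: a reader at timestamp `read_ts` cannot safely ignore a value with `read_ts < write_ts <= read_ts + max_offset`, because the writer's clock may have been ahead and the write may have happened first in real time. The Python sketch below assumes a 500 ms maximum offset (CockroachDB's documented default for `--max-offset`), integer millisecond timestamps, and a read timestamp equal to the transaction's start; `classify` is a hypothetical helper. Real CockroachDB can often avoid a full restart by refreshing its reads at the higher timestamp.

```python
MAX_OFFSET_MS = 500  # assumed: CockroachDB's default --max-offset

def classify(read_ts: int, write_ts: int, max_offset: int = MAX_OFFSET_MS) -> str:
    """How a read at read_ts treats a committed value written at write_ts."""
    if write_ts <= read_ts:
        return "visible"  # definitely in the reader's past
    if write_ts <= read_ts + max_offset:
        # The writer's clock may have been ahead: the write might have happened
        # before this read began, so silently ignoring it could miss it.
        return "uncertain: retry at a timestamp above write_ts"
    return "invisible"  # definitely in the reader's future

for w in (900, 1200, 1600):
    print(w, classify(read_ts=1000, write_ts=w))
# 900 visible
# 1200 uncertain: retry at a timestamp above write_ts
# 1600 invisible
```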
Strong Answer:
  • BEGIN: CockroachDB assigns a provisional commit timestamp using HLC. No messages are sent — the gateway node just records the transaction locally.
  • WRITE (UPDATE): The gateway routes the write to the leaseholder of the relevant Range (a ~512MB chunk of the keyspace). The leaseholder writes an “intent” — a provisional value tagged with the transaction ID. This intent is replicated via Raft to a majority of the Range’s replicas. The intent is visible only to the owning transaction; other transactions encountering it must wait or push.
  • READ: If the read hits a Range with no conflicting intents, it proceeds locally from the leaseholder (if the lease is valid). If it encounters another transaction’s intent, it checks that transaction’s record to determine if it is committed, aborted, or pending. Pending intents cause the reader to wait or push the writer’s timestamp.
  • COMMIT: The gateway writes a transaction record (status: COMMITTED) to the Range holding the transaction's anchor key (typically the first key it wrote). This is a single Raft write. Once committed, the transaction is durable. Intent resolution (converting intents to committed values and cleaning up) happens asynchronously.
  • Key differences from PostgreSQL: a single PostgreSQL primary commits on one machine, with no network round trips and no consensus protocol (unless synchronous replication is enabled). CockroachDB's write path requires at least one Raft round trip (majority acknowledgment) per Range touched. A transaction touching 3 Ranges needs a Raft round on each Range plus the commit record write; CockroachDB pipelines these writes and, with parallel commits, overlaps the commit record with the final writes, but consensus latency still applies. This is why CockroachDB's write latency is inherently higher than PostgreSQL's (often 5-20ms vs sub-millisecond).
Follow-up: What happens if the gateway node crashes mid-transaction?
The transaction's intents remain on their respective Ranges, and the transaction record (if written) shows PENDING. Other transactions encountering these intents check the transaction record. If the heartbeat on the transaction has expired (the gateway is not renewing it), the intents can be cleaned up: either resolved as committed (if the record says COMMITTED) or aborted (if PENDING with expired heartbeat). No data is lost, and no coordinator recovery protocol is needed. This is a major advantage over traditional 2PC, where coordinator failure can leave participants stuck in the PREPARED state.
Strong Answer:
  • Question 1: Do you have an existing MySQL ecosystem? Vitess is a sharding middleware for MySQL — if you have years of MySQL operational expertise, tooling, and monitoring, Vitess preserves that investment. CockroachDB is a greenfield distributed SQL database with its own operational model.
  • Question 2: Do you need cross-shard ACID transactions? CockroachDB provides distributed ACID transactions natively — a transaction can span any number of Ranges with full serializability. Vitess supports cross-shard 2PC but it is opt-in, slower, and less battle-tested. If your application requires frequent cross-shard transactions, CockroachDB is the stronger choice.
  • Question 3: What is your consistency model requirement? CockroachDB enforces serializable isolation by default. Vitess inherits MySQL’s isolation levels (typically READ COMMITTED or REPEATABLE READ per shard). For financial or inventory systems requiring strong consistency, CockroachDB is safer.
  • Question 4: How important is resharding flexibility? Vitess handles resharding online via VReplication with well-documented procedures. CockroachDB auto-splits and rebalances Ranges without operator intervention. If your shard key distribution is unpredictable, CockroachDB’s automatic rebalancing is a significant operational advantage.
  • Question 5: What is your team’s database expertise? Vitess adds a layer on top of MySQL, meaning your DBAs manage MySQL instances plus Vitess infrastructure (VTGate, VTTablet, topology service). CockroachDB is a single distributed system, but debugging distributed query plans and Raft group issues requires new skills.
Follow-up: What about TiDB as a third option?
TiDB is compelling when you need MySQL wire-protocol compatibility (like Vitess) with CockroachDB-like distributed ACID semantics. TiDB separates compute (TiDB nodes) from storage (TiKV, using Raft per region). It also has TiFlash for real-time analytics, an HTAP capability that neither Vitess nor CockroachDB offers natively. The tradeoff is component complexity: you are operating TiDB, TiKV, PD (placement driver), and optionally TiFlash. Choose TiDB when you need mixed OLTP/OLAP on the same dataset with MySQL compatibility.