Introduction: The Challenge of Distributed Coordination
In a distributed database like Cassandra with potentially hundreds of nodes spread across multiple datacenters, how do nodes:
Discover each other and know who’s in the cluster?
Detect when a node fails or becomes unreachable?
Ensure data remains consistent across replicas?
Handle network partitions gracefully?
Replicate data across geographic regions?
The Cassandra Answer: A combination of gossip protocol (inspired by epidemiology), Phi Accrual Failure Detection, hinted handoff, read repair, anti-entropy repair, and sophisticated multi-DC replication strategies. This module explores each of these mechanisms in depth.
Gossip is a peer-to-peer communication protocol inspired by how rumors spread in social networks or diseases spread through populations. In Cassandra, nodes exchange information about themselves and other nodes they know about.
Key Characteristics:
Decentralized: No master node orchestrating communication
Eventually Consistent: Information propagates gradually
Fault Tolerant: Works even when some nodes are down
Scalable: Overhead doesn’t increase linearly with cluster size
// Core gossip state for a single endpoint
class EndpointState {
    // When this node was started (prevents confusion with restarts)
    int generation;

    // Version number incremented on every gossip round
    int heartbeatVersion;

    // Application states (node metadata)
    Map<ApplicationState, VersionedValue> applicationStates;
}

enum ApplicationState {
    STATUS,          // NORMAL, LEAVING, JOINING, REMOVING
    DC,              // Datacenter name
    RACK,            // Rack name
    SCHEMA,          // Schema version UUID
    LOAD,            // Disk usage
    SEVERITY,        // Node health (0.0-1.0)
    TOKENS,          // Token ranges owned
    INTERNAL_IP,     // Internal network address
    RPC_ADDRESS,     // Client-facing address
    RELEASE_VERSION  // Cassandra version
}

class VersionedValue {
    String value;
    int version;  // Incremented when value changes
}
A gossip round is a three-message exchange (Syn, Ack, Ack2) between two nodes, initiated roughly once per second.
Step 1: Node A increments its heartbeat and picks B to gossip with
A → B: GossipDigestSyn {
    A: gen=1, maxVersion=101,  // A's own heartbeat
    B: gen=1, maxVersion=95,   // What A knows about B
    C: gen=1, maxVersion=90    // What A knows about C
}
Step 2: B compares with its state and responds
B → A: GossipDigestAck {
    // B has newer info about itself
    B: gen=1, heartbeat=100, STATUS=NORMAL
    // B has newer info about C
    C: gen=1, heartbeat=92, STATUS=NORMAL
}
Step 3: A updates its state
Node A now knows:
    A: gen=1, heartbeat=101, STATUS=NORMAL
    B: gen=1, heartbeat=100, STATUS=NORMAL  // UPDATED
    C: gen=1, heartbeat=92,  STATUS=NORMAL  // UPDATED
Step 4: A sends GossipDigestAck2 with its newer info
A → B: GossipDigestAck2 {
    A: gen=1, heartbeat=101, STATUS=NORMAL
}
Step 5: B updates its state
Node B now knows:
    A: gen=1, heartbeat=101, STATUS=NORMAL  // UPDATED
    B: gen=1, heartbeat=100, STATUS=NORMAL
    C: gen=1, heartbeat=92,  STATUS=NORMAL
Result: After just one round, nodes A and B have converged on the latest information about A and B, and A learned newer info about C.
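The merge rule behind steps 2 through 5 is simple: for any endpoint, the state with the higher (generation, heartbeat version) pair wins. Here is a minimal Java sketch of that comparison, using simplified, hypothetical types rather than Cassandra's actual classes:

// Simplified digest comparison: generation breaks ties first (a restarted
// node has a higher generation), then the heartbeat/version counter.
record Digest(int generation, int maxVersion) {}

class GossipMergeSketch {
    // True if the remote view of an endpoint is newer than our local view
    static boolean remoteIsNewer(Digest local, Digest remote) {
        if (remote.generation() != local.generation()) {
            return remote.generation() > local.generation();
        }
        return remote.maxVersion() > local.maxVersion();
    }

    public static void main(String[] args) {
        Digest localViewOfC = new Digest(1, 90);   // what A knows about C
        Digest remoteViewOfC = new Digest(1, 92);  // what B reports about C
        System.out.println(remoteIsNewer(localViewOfC, remoteViewOfC)); // true -> A updates C
    }
}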
How do you determine if a node is down vs. just slow or experiencing network delays?
Naive Approach: Use a fixed timeout (e.g., "no response in 5 seconds = dead").
Problems:
Too short: false positives during network hiccups
Too long: slow to detect real failures
Network latency varies over time
Different nodes have different performance characteristics
Cassandra uses the Phi (Φ) Accrual Failure Detector, which provides a suspicion level rather than a binary up/down decision.
Key Idea: Instead of "is this node down?", ask "how confident am I that this node is down?"
The Algorithm:
Track Arrival Times: Record when gossip heartbeats arrive from each node
Build a Statistical Model: Use a sliding window (default: last 1000 heartbeats) to model the expected inter-arrival time
Calculate Phi (Φ): When a heartbeat is late, calculate how suspicious this is:
Φ(t) = -log₁₀(P(T > t))
Where:
t = time since last heartbeat
P(T > t) = probability that inter-arrival time exceeds t
Φ = suspicion level
Make Decision: If Φ exceeds threshold (default: 8), mark node as down
t=0s: Heartbeat received
      Phi = 0 (just heard from node)

t=1s: No heartbeat yet
      Expected: ~1.04s
      Actual: 1.0s (slightly early)
      Phi ≈ 0.5 (not suspicious)

t=2s: Still no heartbeat
      Expected: ~1.04s
      Actual: 2.0s (very late)
      Phi ≈ 3.2 (getting suspicious)

t=3s: Still no heartbeat
      Actual: 3.0s (extremely late)
      Phi ≈ 6.8 (very suspicious)

t=4s: Still no heartbeat
      Actual: 4.0s
      Phi ≈ 9.2 (exceeds threshold of 8)

ACTION: Mark Node B as DOWN
class FailureDetector {
    // Sliding window of arrival intervals (milliseconds)
    private final BoundedStatsDeque arrivalIntervals;

    // Default phi threshold
    private static final double PHI_THRESHOLD = 8.0;

    // Calculate current phi value for an endpoint
    public double phi(InetAddress endpoint) {
        long now = System.currentTimeMillis();
        long lastHeartbeat = getLastHeartbeatTime(endpoint);
        long timeSinceHeartbeat = now - lastHeartbeat;

        // Get statistics from arrival interval history
        double mean = arrivalIntervals.mean();
        double stddev = arrivalIntervals.stddev();

        // P(T > t): probability that the interval exceeds the observed gap,
        // modeled with a normal distribution
        double prob = 1.0 - probabilityOfInterval(timeSinceHeartbeat, mean, stddev);

        // Phi = -log10(probability)
        return -Math.log10(Math.max(prob, 1e-10));
    }

    public boolean isAlive(InetAddress endpoint) {
        return phi(endpoint) < PHI_THRESHOLD;
    }
}
# cassandra.yaml

# Phi threshold (default: 8)
# Lower = more sensitive (faster detection, more false positives)
# Higher = less sensitive (slower detection, fewer false positives)
phi_convict_threshold: 8

# How long to wait before removing a dead node from gossip
failure_detector_timeout_in_ms: 60000  # 60 seconds
Tuning Advice (a worked sketch follows this list):
High-latency networks (cross-region): Increase to 10-12
Low-latency networks (single DC): Can decrease to 6-7
Flaky networks: Increase to avoid false positives
Mission-critical availability: Decrease for faster failover
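To see what these thresholds mean in wall-clock time, here is a small sketch. It assumes the common simplification that inter-arrival times follow an exponential distribution (the detector code above models a normal distribution instead); under that assumption Φ(t) = t / (mean · ln 10), so the threshold maps directly to a detection delay. Class and method names are illustrative:

// Sketch: how phi_convict_threshold maps to detection latency under an
// exponential inter-arrival model (an assumption of this sketch).
public class PhiThresholdSketch {
    // Phi(t) = -log10(e^(-t/mean)) = t / (mean * ln 10)
    static double phi(double timeSinceHeartbeatMs, double meanIntervalMs) {
        return timeSinceHeartbeatMs / (meanIntervalMs * Math.log(10));
    }

    // Inverse: how late must a heartbeat be before we convict?
    static double detectionTimeMs(double threshold, double meanIntervalMs) {
        return threshold * meanIntervalMs * Math.log(10);
    }

    public static void main(String[] args) {
        double mean = 1000; // ~1s gossip interval
        for (double threshold : new double[]{6, 8, 10, 12}) {
            System.out.printf("threshold=%.0f -> convict after ~%.1fs%n",
                    threshold, detectionTimeMs(threshold, mean) / 1000);
        }
    }
}

With a 1-second mean interval this prints roughly 13.8s, 18.4s, 23.0s, and 27.6s, which is why lowering the threshold gives faster failover at the cost of more false positives.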
When a replica is down or fails to acknowledge a write, the coordinator stores a hint so the write can be replayed later. Hints are stored locally on the coordinator node in a special system table:
CREATE TABLE system.hints (
    target_id uuid,         -- UUID of down node
    hint_id timeuuid,       -- When hint was created
    message_version int,
    mutation blob,          -- The actual write mutation
    PRIMARY KEY ((target_id), hint_id)
)
Example Hint:
target_id: 550e8400-e29b-41d4-a716-446655440000  (Node C's UUID)
hint_id:   d2177dd0-eaa2-11de-a572-001b779c76e3  (2023-11-15 10:23:45)
mutation:  [binary data representing: INSERT INTO users (id, name) VALUES (42, 'Alice')]
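Before replay comes storage: the coordinator only records a hint if the target has been unreachable for less than the hint window. A minimal sketch of that decision (the helper names here are hypothetical, not Cassandra's API):

import java.util.UUID;

// Sketch of the coordinator's "should I store a hint?" check.
class HintWriterSketch {
    // Mirrors max_hint_window_in_ms (default: 3 hours)
    private static final long MAX_HINT_WINDOW_MS = 3L * 60 * 60 * 1000;

    void maybeStoreHint(UUID targetHostId, long targetDownSinceMs, byte[] mutation) {
        long downFor = System.currentTimeMillis() - targetDownSinceMs;
        if (downFor > MAX_HINT_WINDOW_MS) {
            // Past the window: no hint is written; only repair can fix this replica now
            return;
        }
        // Persist (target_id, hint_id, mutation) to local hint storage
        writeHintLocally(targetHostId, UUID.randomUUID(), mutation);
    }

    // Hypothetical local persistence call (real hints use a time-based UUID)
    void writeHintLocally(UUID target, UUID hintId, byte[] mutation) { /* ... */ }
}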
The coordinator continuously tries to replay hints:
class HintedHandoffManager {
    // Check for hints every 10 minutes (default)
    private static final int HINT_SCAN_INTERVAL_MS = 600_000;

    public void scheduleHintDelivery() {
        while (true) {
            Thread.sleep(HINT_SCAN_INTERVAL_MS);
            for (UUID targetNode : getAllTargetNodes()) {
                if (isNodeAlive(targetNode)) {
                    deliverHints(targetNode);
                }
            }
        }
    }

    private void deliverHints(UUID targetNode) {
        // Read hints for this target from local storage
        List<Hint> hints = readHintsFor(targetNode);

        // Replay them in order
        for (Hint hint : hints) {
            try {
                sendMutationToNode(targetNode, hint.mutation);
                deleteHint(hint);  // Success! Remove hint
            } catch (Exception e) {
                // Node went down again, try later
                break;
            }
        }
    }
}
t=0s:   Node C goes down
        - Coordinator starts storing hints for C

t=10s:  Hint #1 stored (INSERT INTO users...)
t=15s:  Hint #2 stored (UPDATE users...)
t=20s:  Hint #3 stored (DELETE FROM users...)

t=300s (5 min): Node C comes back online
        - Gossip detects C is up
        - C marked as ALIVE in failure detector

t=600s (10 min): Next hint delivery scan
        - Coordinator sees C is alive
        - Begins replaying hints #1, #2, #3
        - Each mutation sent to C
        - Hints deleted after successful delivery

t=610s: All hints delivered, C is fully caught up
# cassandra.yaml

# How long to store hints (default: 3 hours)
max_hint_window_in_ms: 10800000

# Hint storage directory
hints_directory: /var/lib/cassandra/hints

# Max size of a single hints file on disk
max_hints_file_size_in_mb: 128

# Max total size of hints stored per down host
max_hints_size_per_host_in_mb: 1024

# How often buffered hints are flushed to disk
hints_flush_period_in_ms: 10000
Hints are best-effort only. They fail in these scenarios:
Scenario 1: Max Hint Window Exceeded
Node C is down for 4 hours (> 3 hour default max_hint_window)
→ Hints are dropped after 3 hours
→ Node C will have missing data
→ Must run repair to recover
Scenario 2: Coordinator Crashes
Coordinator stores hints locally, then crashes
→ Hints are lost (not replicated)
→ Node C will have missing data
→ Must run repair to recover
Scenario 3: Hint Storage Full
Node C is down for extended period
→ Hints fill up disk space (hit max_hints_size_per_host_in_mb)
→ Newer hints are dropped
→ Node C will have missing data
→ Must run repair to recover
Key Insight: Hints are a temporary bridge, not a replacement for repair!
# Check hint statistics
nodetool statushandoff

# View hints stored on disk
ls -lh /var/lib/cassandra/hints/

# Force hint replay for a specific node
nodetool handoff <node_ip>

# Disable hinted handoff (dangerous!)
nodetool disablehandoff

# Enable hinted handoff
nodetool enablehandoff
Read repair happens automatically when you read with a consistency level that queries multiple replicas.
Example: Read with CL=QUORUM (RF=3)
Client: SELECT * FROM users WHERE id = 42 WITH CL=QUORUM
↓
Coordinator queries replicas A, B, C:
  • A: {id: 42, name: 'Alice', email: 'alice@example.com', updated: t=100}
  • B: {id: 42, name: 'Alice', email: 'alice@example.com', updated: t=100}
  • C: {id: 42, name: 'Bob', email: 'bob@example.com', updated: t=50}
↓
Coordinator compares:
  • A and B agree (both at t=100)
  • C is outdated (t=50 < t=100)
↓
Coordinator sends repair mutation to C:
  UPDATE users SET name='Alice', email='alice@example.com' WHERE id=42
↓
Respond to client with newest data: 'Alice'
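The coordinator-side logic in that flow boils down to: pick the response with the highest write timestamp, return it to the client, and push it to any replica that answered with something older. A minimal sketch with illustrative types (not Cassandra's internal API):

import java.net.InetAddress;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class ReadRepairSketch {
    // One replica's answer: the row it returned and that row's write timestamp
    record ReplicaResponse(InetAddress replica, Map<String, Object> row, long writeTimestampMicros) {}

    Map<String, Object> resolveAndRepair(List<ReplicaResponse> responses) {
        // Newest timestamp wins
        ReplicaResponse newest = responses.stream()
                .max(Comparator.comparingLong(ReplicaResponse::writeTimestampMicros))
                .orElseThrow();

        for (ReplicaResponse r : responses) {
            if (r.writeTimestampMicros() < newest.writeTimestampMicros()) {
                sendRepairMutation(r.replica(), newest.row()); // bring the stale replica up to date
            }
        }
        return newest.row(); // the newest version is what the client sees
    }

    void sendRepairMutation(InetAddress replica, Map<String, Object> row) { /* hypothetical */ }
}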
Not every read triggers repair! There’s a configurable probability:
-- Set read repair chance (0.0 to 1.0)
ALTER TABLE users WITH read_repair_chance = 0.1;  -- 10% of reads

-- Set datacenter-local read repair chance
ALTER TABLE users WITH dclocal_read_repair_chance = 0.1;
Why probabilistic?
Performance: Full comparison on every read is expensive
Traffic: Increases network and CPU overhead
Tunable: Higher for critical data, lower for less important data
Default Values (Cassandra 3.0+):
read_repair_chance = 0.0 (disabled)
dclocal_read_repair_chance = 0.1 (10%)
Modern Best Practice: Rely on repair, not read repair, for consistency. Use read_repair_chance = 0.0.
Background read repair serves a different purpose: repairing data that wasn’t queried in the read request.
Example:
-- Table has 10 columns
CREATE TABLE users (
    id int PRIMARY KEY,
    name text,
    email text,
    phone text,
    address text
    -- ... 5 more columns
);

-- Client queries only 2 columns
SELECT name, email FROM users WHERE id = 42;
Blocking read repair only compares and repairs the name and email columns.
Background read repair compares and repairs all columns, running asynchronously:
Client query returns immediately with (name, email)
↓
Background thread compares full rows across replicas:
  • A: {id: 42, name: 'Alice', email: 'alice@...', phone: '555-1234', ...}
  • B: {id: 42, name: 'Alice', email: 'alice@...', phone: '555-1234', ...}
  • C: {id: 42, name: 'Alice', email: 'alice@...', phone: '555-5678', ...}  ← Different!
↓
Repair mutation sent to C to fix phone column
Configuration:
# cassandra.yaml
# Background read repair runs asynchronously
# No user-facing config - always enabled for columns not in SELECT
Anti-entropy repair (run with nodetool repair) synchronizes replicas across entire token ranges:
1. Choose a token range to repair
2. Build Merkle trees on each replica for that range
3. Compare Merkle trees to find differences (see the sketch below)
4. Stream missing/outdated data between replicas
5. Repeat for all token ranges
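Step 3 is where Merkle trees pay off: replicas exchange only hashes, and only the sub-ranges whose hashes disagree get streamed in step 4. A minimal sketch of that comparison, with illustrative types rather than Cassandra's internal classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MerkleDiffSketch {
    record TokenRange(long start, long end) {}

    static class MerkleNode {
        byte[] hash;
        MerkleNode left, right;   // null for leaves
        TokenRange range;         // token sub-range covered by this node
    }

    // Collect the token sub-ranges whose contents differ between two replicas
    static void collectMismatches(MerkleNode a, MerkleNode b, List<TokenRange> out) {
        if (Arrays.equals(a.hash, b.hash)) {
            return;                               // identical subtree: nothing to stream
        }
        if (a.left == null || b.left == null) {
            out.add(a.range);                     // mismatched leaf: stream this sub-range
            return;
        }
        collectMismatches(a.left, b.left, out);   // otherwise narrow down the difference
        collectMismatches(a.right, b.right, out);
    }
}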
$ nodetool repair my_keyspace
[2023-11-15 10:00:00,123] Starting repair command #1
[2023-11-15 10:00:00,456] Repair session 12345678 for range (-3074457345618258603,-3074457345618258602]
[2023-11-15 10:00:01,789] Building Merkle trees for users
[2023-11-15 10:00:15,234] Merkle tree for users is ready
[2023-11-15 10:00:15,567] Differences detected for users
[2023-11-15 10:00:15,890] Streaming data to /10.0.1.11
[2023-11-15 10:00:45,123] Streaming completed for users
[2023-11-15 10:00:45,456] Repair session 12345678 completed
[2023-11-15 10:00:45,789] Repair command #1 finished in 45.67 seconds
Best Practice: Run repair within gc_grace_seconds (default: 10 days).
Why? Cassandra uses tombstones to mark deleted data. If a replica is down longer than gc_grace_seconds, tombstones can be garbage collected, and deleted data might "resurrect."
Timeline:
Day 0:  DELETE FROM users WHERE id = 42
        → Tombstone written to A and B
        → Node C is down (doesn't get tombstone)

Day 5:  Garbage collection runs
        → Tombstone still present (< 10 days old)

Day 8:  Repair runs
        → Tombstone streamed to Node C
        → Data properly deleted everywhere

Day 12: Garbage collection runs
        → Tombstone removed (> 10 days old)
Bad Timeline:
Day 0:  DELETE FROM users WHERE id = 42
        → Tombstone written to A and B
        → Node C is down

Day 12: Garbage collection runs
        → Tombstone removed (> 10 days old)

Day 14: Node C comes back
        → Still has old data for id=42

Day 15: Repair runs
        → Compares A (no data) vs C (old data)
        → Old data wins! (has timestamp, no tombstone to override)
        → DELETED DATA RESURRECTED! ⚠️
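The resurrection hazard comes down to one check: a tombstone is only eligible for purge once it is older than gc_grace_seconds, so every replica must have received it (via hints, read repair, or repair) before that deadline. A minimal sketch of the rule, with illustrative names:

class TombstoneGcSketch {
    // Mirrors the table's gc_grace_seconds (default: 864000 = 10 days)
    static final long GC_GRACE_SECONDS = 864_000;

    // A tombstone may be dropped during compaction only after the grace period
    static boolean canPurge(long tombstoneDeletionTimeSeconds, long nowSeconds) {
        return nowSeconds - tombstoneDeletionTimeSeconds > GC_GRACE_SECONDS;
    }

    public static void main(String[] args) {
        long deletedAt = 0;
        System.out.println(canPurge(deletedAt, 8 * 86_400));  // day 8: false, still protected
        System.out.println(canPurge(deletedAt, 12 * 86_400)); // day 12: true, may be purged
    }
}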
With NetworkTopologyStrategy:
Replicas are spread across different racks within a DC
Each DC gets the specified number of replicas
Example: Token range [0, 100) with RF={us-east:3, eu-west:2}
Token range [0, 100) owned by Node A (us-east, rack1)

Replica placement:
1. Node A (us-east, rack1)  -- Primary
2. Node C (us-east, rack2)  -- Different rack
3. Node B (us-east, rack1)  -- Back to rack1
4. Node E (eu-west, rack1)  -- EU primary
5. Node G (eu-west, rack2)  -- Different rack
Visual:
us-east:
  rack1: [A*, B*]
  rack2: [C*]

eu-west:
  rack1: [E*]
  rack2: [G*]

* = Replica for token range [0, 100)
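The placement rule in that example can be sketched as a ring walk: within each DC, prefer nodes on racks that have not been used yet, and fall back to already-used racks once the DC runs out of distinct racks. A simplified Java sketch (illustrative only, not NetworkTopologyStrategy's actual code):

import java.util.*;

class PlacementSketch {
    record Node(String name, String dc, String rack) {}

    // ringOrder: nodes in clockwise order starting from the token owner
    static List<Node> pickReplicas(List<Node> ringOrder, Map<String, Integer> rfPerDc) {
        List<Node> replicas = new ArrayList<>();
        Map<String, Set<String>> racksUsed = new HashMap<>();
        Map<String, Integer> placed = new HashMap<>();

        // First pass: per DC, take nodes on racks not used yet
        for (Node n : ringOrder) {
            int want = rfPerDc.getOrDefault(n.dc(), 0);
            int have = placed.getOrDefault(n.dc(), 0);
            Set<String> used = racksUsed.computeIfAbsent(n.dc(), k -> new HashSet<>());
            if (have < want && !used.contains(n.rack())) {
                replicas.add(n);
                used.add(n.rack());
                placed.merge(n.dc(), 1, Integer::sum);
            }
        }
        // Second pass: if a DC has fewer racks than its RF, reuse racks
        for (Node n : ringOrder) {
            int want = rfPerDc.getOrDefault(n.dc(), 0);
            if (placed.getOrDefault(n.dc(), 0) < want && !replicas.contains(n)) {
                replicas.add(n);
                placed.merge(n.dc(), 1, Integer::sum);
            }
        }
        return replicas;
    }
}

With RF={us-east:3, eu-west:2} and only two racks per DC, the second pass is what places the third us-east replica back on rack1, matching the example above.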
-- Write from us-east with LOCAL_QUORUM (in cqlsh, set the consistency level first)
CONSISTENCY LOCAL_QUORUM;
INSERT INTO users (id, name) VALUES (1, 'Alice');

Required acknowledgments:
✓ 2 of 3 replicas in us-east (quorum)
✗ 0 of 3 replicas in eu-west (not required)

Result: Write succeeds if 2 us-east nodes respond
-- Write with EACH_QUORUM (in cqlsh, set the consistency level first)
CONSISTENCY EACH_QUORUM;
INSERT INTO users (id, name) VALUES (1, 'Alice');

Required acknowledgments:
✓ 2 of 3 replicas in us-east (quorum)
✓ 2 of 3 replicas in eu-west (quorum)

Result: Write succeeds only if 2 nodes in BOTH DCs respond
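The acknowledgment counts in these two examples come from a simple rule: a quorum of a replication factor is ⌊RF/2⌋ + 1, LOCAL_QUORUM applies it to the coordinator's DC only, and EACH_QUORUM applies it to every DC. A small illustrative sketch (not Cassandra's internal API):

import java.util.Map;

class ConsistencySketch {
    static int quorum(int rf) {
        return rf / 2 + 1;                    // RF=3 -> 2
    }

    // LOCAL_QUORUM: quorum of replicas in the coordinator's own DC
    static int localQuorumAcks(Map<String, Integer> rfPerDc, String localDc) {
        return quorum(rfPerDc.get(localDc));
    }

    // EACH_QUORUM: quorum of replicas in every DC
    static int eachQuorumAcks(Map<String, Integer> rfPerDc) {
        return rfPerDc.values().stream().mapToInt(ConsistencySketch::quorum).sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> rf = Map.of("us-east", 3, "eu-west", 3);
        System.out.println(localQuorumAcks(rf, "us-east")); // 2
        System.out.println(eachQuorumAcks(rf));             // 4 (2 per DC)
    }
}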
# Repair only the local DC
nodetool repair -dc us-east

# Repair only a specific datacenter from any node
nodetool repair -dc eu-west

# Full repair across all DCs (expensive!)
nodetool repair -full
Best Practice: Run per-DC repairs regularly, cross-DC repairs less frequently.
New Node Joins
      ↓
Gossip with seeds
      ↓
Receive cluster topology
      ↓
Calculate token ownership
      ↓
Stream data from existing nodes
      ↓
Mark status as NORMAL
      ↓
Start serving traffic
Timeline:
t=0s:    Start Cassandra
         [2023-11-15 10:00:00] Joining cluster

t=5s:    Gossip establishes
         [2023-11-15 10:00:05] Cluster state received

t=10s:   Token calculation
         [2023-11-15 10:00:10] Calculating tokens (num_tokens=256)

t=15s:   Streaming begins
         [2023-11-15 10:00:15] Streaming from /10.0.1.10
         [2023-11-15 10:00:15] Streaming from /10.0.1.11
         [2023-11-15 10:00:15] Streaming from /10.0.2.10

t=3600s (1 hour): Streaming completes (varies by data size)
         [2023-11-15 11:00:00] Streaming completed

t=3605s: Node ready
         [2023-11-15 11:00:05] Node state: NORMAL
         [2023-11-15 11:00:05] Accepting client connections
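The "stream data from existing nodes" step can be sketched as a planning problem: for every token the new node picks, the range between the previous existing token and the new token moves to the new node, and the current owner of that range (the new token's successor on the old ring) is the streaming source. A simplified single-replica sketch (illustrative only; real bootstrap accounts for RF and vnodes):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BootstrapPlanSketch {
    // One streaming task: pull (startExclusive, endInclusive] from sourceNode
    record StreamTask(long startExclusive, long endInclusive, String sourceNode) {}

    static List<StreamTask> plan(TreeMap<Long, String> oldRing, List<Long> newTokens) {
        List<StreamTask> tasks = new ArrayList<>();
        for (long token : newTokens) {
            // Range taken over: (previous existing token, new token]
            Long prev = oldRing.lowerKey(token);
            if (prev == null) prev = oldRing.lastKey();              // wrap around the ring

            // Current owner of that range: the new token's successor on the old ring
            Map.Entry<Long, String> owner = oldRing.ceilingEntry(token);
            if (owner == null) owner = oldRing.firstEntry();         // wrap around the ring

            tasks.add(new StreamTask(prev, token, owner.getValue()));
        }
        return tasks;
    }
}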
Exercise: Observe hinted handoff in action.

# 1. Stop Node C
nodetool drain
systemctl stop cassandra

# 2. Write data with CL=ONE (will succeed via hints)
cqlsh -e "CONSISTENCY ONE; INSERT INTO test.users (id, name) VALUES (999, 'Hinted');"

# 3. Check hints on coordinator
ls -lh /var/lib/cassandra/hints/

# 4. Restart Node C
systemctl start cassandra

# 5. Wait for hint replay (10 min by default)
#    Or force: nodetool handoff <nodeC_ip>

# 6. Verify data on Node C
cqlsh <nodeC_ip> -e "SELECT * FROM test.users WHERE id = 999;"
Exercise: Trigger read repair. Create a table with read repair always on:

CREATE TABLE test.users (
    id int PRIMARY KEY,
    name text,
    email text
) WITH read_repair_chance = 1.0;  -- Always repair
Create Inconsistency:
# Write to Node A only
cqlsh <nodeA_ip> -e "CONSISTENCY ONE; INSERT INTO test.users (id, name, email)
VALUES (500, 'Alice', 'alice@example.com');"

# Write different value to Node B only
cqlsh <nodeB_ip> -e "CONSISTENCY ONE; INSERT INTO test.users (id, name, email)
VALUES (500, 'Bob', 'bob@example.com');"
Trigger Read Repair:
# Read with QUORUM (triggers repair)
cqlsh -e "CONSISTENCY QUORUM; SELECT * FROM test.users WHERE id = 500;"

# Check both nodes now agree
cqlsh <nodeA_ip> -e "SELECT * FROM test.users WHERE id = 500;"
cqlsh <nodeB_ip> -e "SELECT * FROM test.users WHERE id = 500;"
Expected: Both nodes now have the same data (newest timestamp wins)
Exercise: Test multi-DC consistency levels.

-- Write from DC1 (in cqlsh: CONSISTENCY LOCAL_QUORUM;)
INSERT INTO multidc.test (id, val) VALUES (1, 'DC1 write');

-- Immediately read from DC2 (in cqlsh: CONSISTENCY LOCAL_QUORUM;)
SELECT * FROM multidc.test WHERE id = 1;
Question: Is the data visible in DC2 immediately? Why or why not?
Test EACH_QUORUM: repeat the write with CONSISTENCY EACH_QUORUM and compare the latency, and observe what happens when the remote DC is unreachable.
Problem: Hints are piling up on the coordinator. Diagnosis:

# Check hint statistics
nodetool statushandoff

# Check hint directory size
du -sh /var/lib/cassandra/hints/

# Find which nodes have hints
ls -lh /var/lib/cassandra/hints/
Solutions:
# 1. Force immediate hint delivery
nodetool handoff <target_node_ip>

# 2. If the node is permanently gone, remove it from the cluster (its hints are dropped)
nodetool removenode <host_id>

# 3. Reduce the max hint window
#    cassandra.yaml:
max_hint_window_in_ms: 3600000  # 1 hour instead of 3