Cluster Operations & Multi-DC Replication

Module Duration: 8-10 hours Learning Style: Deep Technical + Hands-On Operations + Production Scenarios Outcome: Understand how Cassandra maintains cluster health, detects failures, and operates across datacenters

Introduction: The Challenge of Distributed Coordination

In a distributed database like Cassandra with potentially hundreds of nodes spread across multiple datacenters, how do nodes:
  • Discover each other and know who’s in the cluster?
  • Detect when a node fails or becomes unreachable?
  • Ensure data remains consistent across replicas?
  • Handle network partitions gracefully?
  • Replicate data across geographic regions?
The Cassandra Answer: A combination of gossip protocol (inspired by epidemiology), Phi Accrual Failure Detection, hinted handoff, read repair, anti-entropy repair, and sophisticated multi-DC replication strategies. This module explores each of these mechanisms in depth.

Part 1: The Gossip Protocol

What is Gossip?

Gossip is a peer-to-peer communication protocol inspired by how rumors spread in social networks or diseases spread through populations. In Cassandra, nodes exchange information about themselves and other nodes they know about. Key Characteristics:
  • Decentralized: No master node orchestrating communication
  • Eventually Consistent: Information propagates gradually
  • Fault Tolerant: Works even when some nodes are down
  • Scalable: Overhead doesn’t increase linearly with cluster size

The Gossip Mechanism

Every 1 second, each node:
  1. Increments its heartbeat counter (local version number)
  2. Selects 1-3 random nodes to gossip with
  3. Sends a GossipDigestSyn message containing:
    • Endpoint states for all known nodes
    • Generation numbers (when node was started)
    • Heartbeat versions (how recently we heard from each node)
  4. Receives GossipDigestAck with updates
  5. Applies newer information to its local state

Gossip Data Structures

// Core gossip state for a single endpoint
class EndpointState {
    // When this node was started (prevents confusion with restarts)
    int generation;

    // Version number incremented on every gossip round
    int heartbeatVersion;

    // Application states (node metadata)
    Map<ApplicationState, VersionedValue> applicationStates;
}

enum ApplicationState {
    STATUS,           // NORMAL, LEAVING, JOINING, REMOVING
    DC,              // Datacenter name
    RACK,            // Rack name
    SCHEMA,          // Schema version UUID
    LOAD,            // Disk usage
    SEVERITY,        // I/O severity indicator used by the dynamic snitch
    TOKENS,          // Token ranges owned
    INTERNAL_IP,     // Internal network address
    RPC_ADDRESS,     // Client-facing address
    RELEASE_VERSION  // Cassandra version
}

class VersionedValue {
    String value;
    int version;  // Incremented when value changes
}

Gossip Round Example

Let’s trace a gossip round between three nodes: Initial State:
Node A knows:
  A: gen=1, heartbeat=100, STATUS=NORMAL
  B: gen=1, heartbeat=95, STATUS=NORMAL
  C: gen=1, heartbeat=90, STATUS=NORMAL

Node B knows:
  A: gen=1, heartbeat=98, STATUS=NORMAL
  B: gen=1, heartbeat=100, STATUS=NORMAL
  C: gen=1, heartbeat=92, STATUS=NORMAL

Node C knows:
  A: gen=1, heartbeat=97, STATUS=NORMAL
  B: gen=1, heartbeat=93, STATUS=NORMAL
  C: gen=1, heartbeat=100, STATUS=NORMAL
Step 1: Node A increments its heartbeat and picks B to gossip with
A → B: GossipDigestSyn
{
  A: gen=1, maxVersion=101,  // A's own heartbeat
  B: gen=1, maxVersion=95,   // What A knows about B
  C: gen=1, maxVersion=90    // What A knows about C
}
Step 2: B compares with its state and responds
B → A: GossipDigestAck
{
  // B has newer info about itself
  B: gen=1, heartbeat=100, STATUS=NORMAL

  // B has newer info about C
  C: gen=1, heartbeat=92, STATUS=NORMAL
}
Step 3: A updates its state
Node A now knows:
  A: gen=1, heartbeat=101, STATUS=NORMAL
  B: gen=1, heartbeat=100, STATUS=NORMAL  // UPDATED
  C: gen=1, heartbeat=92, STATUS=NORMAL   // UPDATED
Step 4: A sends GossipDigestAck2 with its newer info
A → B: GossipDigestAck2
{
  A: gen=1, heartbeat=101, STATUS=NORMAL
}
Step 5: B updates its state
Node B now knows:
  A: gen=1, heartbeat=101, STATUS=NORMAL  // UPDATED
  B: gen=1, heartbeat=100, STATUS=NORMAL
  C: gen=1, heartbeat=92, STATUS=NORMAL
Result: After just one round, nodes A and B have converged on the latest information about A and B, and A learned newer info about C.
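When two states for the same endpoint are compared during this exchange, the rule is simple: a higher generation always wins (the node restarted), and within the same generation the higher heartbeat or application-state version wins. A minimal sketch of that comparison, reusing the EndpointState fields shown above (the helper name is hypothetical, not Cassandra's actual code):
// Decide whether a remote copy of an endpoint's state is newer than ours.
static boolean remoteIsNewer(EndpointState local, EndpointState remote) {
    if (remote.generation != local.generation) {
        // A different generation means a restart; the higher one always wins
        return remote.generation > local.generation;
    }
    // Same generation: the higher heartbeat version is the fresher state
    return remote.heartbeatVersion > local.heartbeatVersion;
}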

Gossip Propagation Speed

How quickly does information spread through gossip? Mathematical Model:
  • Cluster size: N nodes
  • Gossip fanout: f nodes per round (typically 3)
  • Gossip interval: 1 second
The number of nodes that know about a new piece of information grows exponentially, because each informed node tells roughly f new nodes per round, multiplying the informed set by about (1 + f):
Round 0: 1 node knows
Round 1: (1+f) nodes know (≈4)
Round 2: (1+f)² nodes know (≈16)
Round 3: (1+f)³ nodes know (≈64)
General Formula: After r rounds, approximately min((1+f)^r, N) nodes know the information. Example: In a 1000-node cluster with fanout=3:
  • Round 1: ~4 nodes know
  • Round 2: ~16 nodes know
  • Round 3: ~64 nodes know
  • Round 4: ~256 nodes know
  • Round 5: ~1024 ≥ 1000 nodes know (full propagation)
So information propagates to all 1000 nodes in about 5 seconds (log₄(1000) ≈ 5 rounds).
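To sanity-check these numbers, the idealized no-overlap model can be simulated directly; a small sketch (not Cassandra code, just the arithmetic above):
// Estimate gossip rounds needed for full propagation in the idealized model:
// every informed node informs `fanout` new nodes each round.
static int roundsToFullPropagation(int clusterSize, int fanout) {
    long informed = 1;
    int rounds = 0;
    while (informed < clusterSize) {
        informed *= (1 + fanout);  // informed set grows by a factor of (1 + fanout)
        rounds++;
    }
    return rounds;
}

// roundsToFullPropagation(1000, 3) == 5, i.e. roughly 5 seconds at one round per second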

Seed Nodes

Problem: When a new node joins, how does it know who to gossip with initially? Solution: Seed nodes - well-known nodes that new nodes contact first.
# cassandra.yaml
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "10.0.1.10,10.0.1.11,10.0.1.12"
Important Characteristics:
  • Seeds are not special after bootstrap - just initial contact points
  • Every datacenter should have at least one seed
  • A node should not list itself as a seed
  • Seeds don’t form a special “seed cluster” - they’re regular nodes
  • Seeds just need to be reliable and long-lived
Common Misconception: Seeds are masters or leaders. False! They’re just stable contact points for gossip initialization.

Monitoring Gossip

You can observe gossip in action:
# View current gossip state
nodetool gossipinfo

# Example output:
/10.0.1.11
  generation:1638360000
  heartbeat:123456
  STATUS:NORMAL,-9223372036854775808
  DC:datacenter1
  RACK:rack1
  SCHEMA:e84b6a60-1f3e-11ec-9621-0242ac130002
  LOAD:2.5e+11
  TOKENS:<token1>,<token2>,...

/10.0.1.12
  generation:1638359000
  heartbeat:123400
  STATUS:NORMAL,0
  DC:datacenter1
  RACK:rack1
  # ... more states
# Check if gossip is running
nodetool statusgossip

# Disable gossip (dangerous!)
nodetool disablegossip

# Re-enable gossip
nodetool enablegossip

Part 2: Failure Detection

The Challenge

How do you determine if a node is down vs. just slow or experiencing network delays? Naive Approach: Use a fixed timeout (e.g., “no response in 5 seconds = dead”) Problems:
  • Too short: false positives during network hiccups
  • Too long: slow to detect real failures
  • Network latency varies over time
  • Different nodes have different performance characteristics

Phi Accrual Failure Detector

Cassandra uses the Phi (Φ) Accrual Failure Detector, which provides a suspicion level rather than a binary up/down decision. Key Idea: Instead of “is this node down?”, ask “how confident am I that this node is down?” The Algorithm:
  1. Track Arrival Times: Record when gossip heartbeats arrive from each node
  2. Build a Statistical Model: Use a sliding window (default: last 1000 heartbeats) to model the expected inter-arrival time
  3. Calculate Phi (Φ): When a heartbeat is late, calculate how suspicious this is:
Φ(t) = -log₁₀(P(T > t))
Where:
  • t = time since last heartbeat
  • P(T > t) = probability that inter-arrival time exceeds t
  • Φ = suspicion level
  4. Make Decision: If Φ exceeds threshold (default: 8), mark node as down

Understanding Phi Values

Phi (Φ) | Probability of Mistake | Meaning
1       | 10%                    | Slightly suspicious
2       | 1%                     | Moderately suspicious
3       | 0.1%                   | Very suspicious
5       | 0.001%                 | Almost certainly down
8       | 0.000001%              | Definitely down (default threshold)
10      | 0.00000001%            | Absolutely dead
Phi = 8 means: “There’s a 0.000001% (one in 10⁸) chance I’m wrong about this node being down”

Phi Calculation Example

Let’s trace a failure detection scenario: Setup:
  • Node A monitoring Node B
  • Historical heartbeat intervals: 1.0s, 1.1s, 0.9s, 1.2s, 1.0s (mean ≈ 1.04s, σ ≈ 0.11s)
Timeline:
t=0s: Heartbeat received
  Phi = 0 (just heard from node)

t=1s: No heartbeat yet
  Expected: ~1.04s
  Actual: 1.0s (slightly early)
  Phi ≈ 0.5 (not suspicious)

t=2s: Still no heartbeat
  Expected: ~1.04s
  Actual: 2.0s (very late)
  Phi ≈ 3.2 (getting suspicious)

t=3s: Still no heartbeat
  Actual: 3.0s (extremely late)
  Phi ≈ 6.8 (very suspicious)

t=4s: Still no heartbeat
  Actual: 4.0s
  Phi ≈ 9.2 (exceeds threshold of 8)

ACTION: Mark Node B as DOWN

Implementation Details

class FailureDetector {
    // Sliding window of arrival intervals (milliseconds)
    private final BoundedStatsDeque arrivalIntervals;

    // Default phi threshold
    private static final double PHI_THRESHOLD = 8.0;

    // Calculate current phi value for an endpoint
    public double phi(InetAddress endpoint) {
        long now = System.currentTimeMillis();
        long lastHeartbeat = getLastHeartbeatTime(endpoint);
        long timeSinceHeartbeat = now - lastHeartbeat;

        // Get statistics from arrival interval history
        double mean = arrivalIntervals.mean();
        double stddev = arrivalIntervals.stddev();

        // Calculate probability using normal distribution
        double prob = 1.0 - probabilityOfInterval(timeSinceHeartbeat, mean, stddev);

        // Phi = -log10(probability)
        return -Math.log10(Math.max(prob, 1e-10));
    }

    public boolean isAlive(InetAddress endpoint) {
        return phi(endpoint) < PHI_THRESHOLD;
    }
}

Configuring Failure Detection

# cassandra.yaml

# Phi threshold (default: 8)
# Lower = more sensitive (faster detection, more false positives)
# Higher = less sensitive (slower detection, fewer false positives)
phi_convict_threshold: 8

# How long to wait before removing a dead node from gossip
failure_detector_timeout_in_ms: 60000  # 60 seconds
Tuning Advice:
  • High-latency networks (cross-region): Increase to 10-12
  • Low-latency networks (single DC): Can decrease to 6-7
  • Flaky networks: Increase to avoid false positives
  • Mission-critical availability: Decrease for faster failover

Observing Failure Detection

# Check failure detector thresholds
nodetool failuredetector

# Example output:
Endpoint          Phi
/10.0.1.11        0.23
/10.0.1.12        0.18
/10.0.1.13        12.45  # DOWN!
/10.0.1.14        0.31

Part 3: Hinted Handoff (Detailed)

We introduced hinted handoff in the write path module. Let’s dive deeper into its implementation and edge cases.

The Detailed Write Flow with Hints

Client writes with CL=QUORUM (RF=3)

Coordinator determines replicas: [A, B, C]

Coordinator sends write to A, B, C

Results:
  • Node A: Success (50ms)
  • Node B: Success (45ms)
  • Node C: Timeout (down)

QUORUM achieved (2/3 responses)

Coordinator stores HINT for Node C

Respond SUCCESS to client
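The hint itself is only stored if the target has not been down longer than the configured hint window. A sketch of that coordinator-side check (shouldStoreHint and storeHintLocally are hypothetical names that only approximate what the real write path does):
// Store a hint only while the replica has been down for less than
// max_hint_window_in_ms (default: 3 hours).
static boolean shouldStoreHint(long replicaDownSinceMillis, long maxHintWindowMillis) {
    long downFor = System.currentTimeMillis() - replicaDownSinceMillis;
    return downFor < maxHintWindowMillis;
}

// On a write timeout for replica C:
//   if (shouldStoreHint(downSince, 3 * 60 * 60 * 1000L))
//       storeHintLocally(nodeC, mutation);
//   // The client response depends only on the consistency level, not on the hint.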

Hint Storage Format

Hints are stored locally on the coordinator node. In Cassandra 3.0 and later they are written as flat files under hints_directory; earlier versions used a system.hints table, whose schema is still a useful way to see what each hint contains:
CREATE TABLE system.hints (
    target_id uuid,           -- UUID of down node
    hint_id timeuuid,         -- When hint was created
    message_version int,
    mutation blob,            -- The actual write mutation
    PRIMARY KEY ((target_id), hint_id)
)
Example Hint:
target_id: 550e8400-e29b-41d4-a716-446655440000  (Node C's UUID)
hint_id: d2177dd0-eaa2-11de-a572-001b779c76e3    (2023-11-15 10:23:45)
mutation: [binary data representing: INSERT INTO users (id, name) VALUES (42, 'Alice')]

Hint Replay

The coordinator continuously tries to replay hints:
class HintedHandoffManager {
    // Check for hints every 10 minutes (default)
    private static final int HINT_SCAN_INTERVAL_MS = 600_000;

    public void scheduleHintDelivery() throws InterruptedException {
        while (true) {
            Thread.sleep(HINT_SCAN_INTERVAL_MS);

            for (UUID targetNode : getAllTargetNodes()) {
                if (isNodeAlive(targetNode)) {
                    deliverHints(targetNode);
                }
            }
        }
    }

    private void deliverHints(UUID targetNode) {
        // Read hints for this target from local storage
        List<Hint> hints = readHintsFor(targetNode);

        // Replay them in order
        for (Hint hint : hints) {
            try {
                sendMutationToNode(targetNode, hint.mutation);
                deleteHint(hint);  // Success! Remove hint
            } catch (Exception e) {
                // Node went down again, try later
                break;
            }
        }
    }
}

Hint Lifecycle Timeline

t=0s: Node C goes down
  - Coordinator starts storing hints for C

t=10s: Hint #1 stored (INSERT INTO users...)
t=15s: Hint #2 stored (UPDATE users...)
t=20s: Hint #3 stored (DELETE FROM users...)

t=300s (5 min): Node C comes back online
  - Gossip detects C is up
  - C marked as ALIVE in failure detector

t=600s (10 min): Next hint delivery scan
  - Coordinator sees C is alive
  - Begins replaying hints #1, #2, #3
  - Each mutation sent to C
  - Hints deleted after successful delivery

t=610s: All hints delivered, C is fully caught up

Hint Configuration

# cassandra.yaml

# How long to store hints (default: 3 hours)
max_hint_window_in_ms: 10800000

# Hint storage directory
hints_directory: /var/lib/cassandra/hints

# Max size of hints on disk (per node)
max_hints_file_size_in_mb: 128

# Max total size of all hints
max_hints_size_per_host_in_mb: 1024

# How often hints are flushed from internal buffers to disk
hints_flush_period_in_ms: 10000

When Hints Are NOT Enough

Hints are best-effort only. They fail in these scenarios: Scenario 1: Max Hint Window Exceeded
Node C is down for 4 hours (> 3 hour default max_hint_window)
→ New hints stop being stored after 3 hours
→ Node C will have missing data
→ Must run repair to recover
Scenario 2: Coordinator Crashes
Coordinator stores hints locally, then crashes
→ Hints are lost (not replicated)
→ Node C will have missing data
→ Must run repair to recover
Scenario 3: Hint Storage Full
Node C is down for extended period
→ Hints fill up disk space (hit max_hints_size_per_host_in_mb)
→ Newer hints are dropped
→ Node C will have missing data
→ Must run repair to recover
Key Insight: Hints are a temporary bridge, not a replacement for repair!

Monitoring Hints

# Check hint statistics
nodetool statushandoff

# Inspect stored hint files on disk
ls -lh /var/lib/cassandra/hints/

# Pause / resume hint delivery
nodetool pausehandoff
nodetool resumehandoff

# Disable hinted handoff (dangerous!)
nodetool disablehandoff

# Enable hinted handoff
nodetool enablehandoff

Part 4: Read Repair

The Problem

Even with hinted handoff, replicas can diverge:
  • Hints were dropped (too old)
  • Coordinator crashed before replaying hints
  • Network partitions prevented writes from reaching some replicas
Read repair fixes inconsistencies by comparing replicas during reads.

Read Repair Types

Cassandra has two types of read repair:
  1. Blocking Read Repair (Foreground)
  2. Background Read Repair

1. Blocking Read Repair (Foreground)

Happens automatically when you read with a consistency level that queries multiple replicas. Example: Read with CL=QUORUM (RF=3)
Client: SELECT * FROM users WHERE id = 42 WITH CL=QUORUM

Coordinator queries replicas A, B, C:
  • A: {id: 42, name: 'Alice', email: 'alice@example.com', updated: t=100}
  • B: {id: 42, name: 'Alice', email: 'alice@example.com', updated: t=100}
  • C: {id: 42, name: 'Bob',   email: 'bob@example.com',     updated: t=50}

Coordinator compares:
  • A and B agree (both at t=100)
  • C is outdated (t=50 < t=100)

Coordinator sends repair mutation to C:
  UPDATE users SET name='Alice', email='alice@example.com' WHERE id=42

Respond to client with newest data: 'Alice'
Key Points:
  • Repair happens synchronously (blocks the read)
  • Client gets the newest data
  • Divergent replicas are fixed immediately
  • Only repairs data actually requested in the query

Read Repair Probability

Not every read triggers repair! There’s a configurable probability:
-- Set read repair chance (0.0 to 1.0)
ALTER TABLE users WITH read_repair_chance = 0.1;  -- 10% of reads

-- Set datacenter-local read repair chance
ALTER TABLE users WITH dclocal_read_repair_chance = 0.1;
Why probabilistic?
  • Performance: Full comparison on every read is expensive
  • Traffic: Increases network and CPU overhead
  • Tunable: Higher for critical data, lower for less important data
Default Values (Cassandra 3.0+):
  • read_repair_chance = 0.0 (disabled)
  • dclocal_read_repair_chance = 0.1 (10%)
Modern Best Practice: Rely on repair, not read repair, for consistency. Use read_repair_chance = 0.0.

Read Repair Algorithm

class ReadRepairHandler {
    public Row handleRead(ReadCommand cmd, ConsistencyLevel cl) {
        // 1. Determine which replicas to query
        List<Replica> replicas = getReplicas(cmd.key);

        // 2. Send read to required replicas
        List<Row> responses = queryReplicas(replicas, cl);

        // 3. Find the newest version (highest timestamp)
        Row newestRow = findNewest(responses);

        // 4. Check if read repair triggered (probabilistic)
        if (shouldDoReadRepair()) {
            // 5. Compare all responses to newest
            for (int i = 0; i < responses.size(); i++) {
                if (!responses.get(i).equals(newestRow)) {
                    // 6. Send repair mutation to outdated replica
                    Mutation repair = computeDiff(newestRow, responses.get(i));
                    sendRepair(replicas.get(i), repair);
                }
            }
        }

        // 7. Return newest data to client
        return newestRow;
    }
}

2. Background Read Repair

Purpose: Repair data that wasn’t queried in the read request. Example:
-- Table has 10 columns
CREATE TABLE users (
    id int PRIMARY KEY,
    name text,
    email text,
    phone text,
    address text,
    -- ... 6 more columns
);

-- Client queries only 2 columns
SELECT name, email FROM users WHERE id = 42;
Blocking read repair only compares and repairs name and email columns. Background read repair compares and repairs all columns, running asynchronously:
Client query returns immediately with (name, email)

Background thread compares full rows across replicas:
  • A: {id: 42, name: 'Alice', email: 'alice@...', phone: '555-1234', ...}
  • B: {id: 42, name: 'Alice', email: 'alice@...', phone: '555-1234', ...}
  • C: {id: 42, name: 'Alice', email: 'alice@...', phone: '555-5678', ...}  ← Different!

Repair mutation sent to C to fix phone column
Configuration:
# cassandra.yaml
# Background read repair runs asynchronously
# No user-facing config - always enabled for columns not in SELECT

Read Repair Monitoring

# Check read repair statistics
nodetool tpstats

# Look for "ReadRepairStage" section:
# Pool Name                    Active   Pending      Completed   Blocked
# ReadRepairStage                   0         0         123456         0

# View table-specific read repair settings
nodetool tablestats keyspace.table

Part 5: Anti-Entropy Repair (The Big One)

Why We Need Repair

Neither hinted handoff nor read repair is sufficient:
  • Hints: Expire after max_hint_window, lost if coordinator crashes
  • Read repair: Only fixes data that’s queried
Problem: Unread data can drift indefinitely! Solution: Anti-entropy repair - actively compare all data across replicas.

The Repair Process Overview

1. Choose a token range to repair
2. Build Merkle trees on each replica for that range
3. Compare Merkle trees to find differences
4. Stream missing/outdated data between replicas
5. Repeat for all token ranges

Merkle Trees Explained

A Merkle tree (hash tree) allows efficient comparison of large datasets. Structure:
                    Root Hash
                   /          \
            Hash(AB)           Hash(CD)
            /     \            /      \
        Hash(A) Hash(B)   Hash(C)  Hash(D)
         /        |         |         \
     [Rows]   [Rows]    [Rows]     [Rows]
     1-1000  1001-2000 2001-3000  3001-4000
How It Works:
  1. Divide token range into segments (default: 2^15 = 32,768 segments)
  2. Hash each segment’s data (all rows in that token range)
  3. Build tree by hashing pairs of hashes upward
  4. Compare trees from root down
Example Comparison:
Node A Merkle Tree:
                Root: 0xABCD1234
               /              \
        0x1234ABCD           0x5678EFGH
        /        \            /        \
    0xAAAA    0xBBBB     0xCCCC    0xDDDD

Node B Merkle Tree:
                Root: 0xABCD1234
               /              \
        0x1234ABCD           0x9999ZZZZ  ← DIFFERENT!
        /        \            /        \
    0xAAAA    0xBBBB     0xXXXX    0xYYYY
Analysis:
  • Root hashes differ → trees are different
  • Left subtree hashes match → left half is identical (skip!)
  • Right subtree hashes differ → need to check further
  • Leaf hashes 0xCCCC vs 0xXXXX differ → stream segment 3
  • Leaf hashes 0xDDDD vs 0xYYYY differ → stream segment 4
Efficiency: Instead of comparing millions of rows, we compare ~30K hashes!
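Comparing two sealed trees is a recursive walk from the root: identical hashes let us skip an entire subtree, and only leaf ranges whose hashes differ need to be streamed. A minimal sketch of that walk, using hypothetical TreeNode and Range types rather than Cassandra's actual MerkleTree API:
// Collect the leaf token ranges whose hashes differ between two trees.
static void collectDifferences(TreeNode a, TreeNode b, List<Range> outOfSync) {
    if (java.util.Arrays.equals(a.hash, b.hash))
        return;                          // identical subtree: nothing to stream
    if (a.isLeaf() && b.isLeaf()) {
        outOfSync.add(a.range);          // differing leaf: stream this range
        return;
    }
    collectDifferences(a.left, b.left, outOfSync);
    collectDifferences(a.right, b.right, outOfSync);
}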

Merkle Tree Building

class MerkleTreeBuilder {
    private static final int TREE_DEPTH = 15;  // 2^15 = 32768 leaves

    public MerkleTree buildTree(ColumnFamilyStore cfs, Range<Token> range) {
        MerkleTree tree = new MerkleTree(TREE_DEPTH);

        // Read all data in the token range
        Iterator<Row> rows = cfs.rangeIterator(range);

        while (rows.hasNext()) {
            Row row = rows.next();
            Token token = getToken(row.key);

            // Find which leaf this row belongs to
            Leaf leaf = tree.getLeafForToken(token);

            // Update leaf hash with this row's hash
            leaf.updateHash(hash(row));
        }

        // Compute internal node hashes bottom-up
        tree.seal();

        return tree;
    }

    private byte[] hash(Row row) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Hash all columns and their timestamps
            for (Cell cell : row.cells()) {
                md.update(cell.name().bytes);
                md.update(cell.value().bytes);
                md.update(longToBytes(cell.timestamp()));
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e);  // MD5 is always available in the JDK
        }
    }
}

Running Repair

Full Repair (all data):
nodetool repair
Repair Specific Keyspace:
nodetool repair my_keyspace
Repair Specific Table:
nodetool repair my_keyspace my_table
Incremental Repair (only unrepaired data):
nodetool repair -inc
Repair Specific Token Range:
nodetool repair -st <start_token> -et <end_token>

Repair Output Example

$ nodetool repair my_keyspace

[2023-11-15 10:00:00,123] Starting repair command #1
[2023-11-15 10:00:00,456] Repair session 12345678 for range (-3074457345618258603,-3074457345618258602]
[2023-11-15 10:00:01,789] Building Merkle trees for users
[2023-11-15 10:00:15,234] Merkle tree for users is ready
[2023-11-15 10:00:15,567] Differences detected for users
[2023-11-15 10:00:15,890] Streaming data to /10.0.1.11
[2023-11-15 10:00:45,123] Streaming completed for users
[2023-11-15 10:00:45,456] Repair session 12345678 completed

[2023-11-15 10:00:45,789] Repair command #1 finished in 45.67 seconds

Full vs. Incremental Repair

Full Repair:
  • Compares all data every time
  • Expensive for large datasets
  • Use when: major inconsistency suspected
Incremental Repair (Cassandra 2.2+):
  • Marks SSTables as “repaired” after successful repair
  • Only repairs unrepaired SSTables on subsequent runs
  • Much faster for ongoing maintenance
  • Uses anticompaction to separate repaired from unrepaired SSTables
Example table compaction settings (LCS is a common companion, though incremental repair does not strictly require it):
ALTER TABLE users WITH compaction = {
    'class': 'LeveledCompactionStrategy',
    'sstable_size_in_mb': 160
};
# First incremental repair - repairs all data
nodetool repair -inc

# Subsequent repairs - only new data
nodetool repair -inc  # Much faster!

Repair Scheduling

Best Practice: Run repair within gc_grace_seconds (default: 10 days) Why? Cassandra uses tombstones to mark deleted data. If a replica is down longer than gc_grace_seconds, tombstones can be garbage collected, and deleted data might “resurrect.” Timeline:
Day 0: DELETE FROM users WHERE id = 42
  → Tombstone written to A and B
  → Node C is down (doesn't get tombstone)

Day 5: Garbage collection runs
  → Tombstone still present (< 10 days old)

Day 8: Repair runs
  → Tombstone streamed to Node C
  → Data properly deleted everywhere

Day 12: Garbage collection runs
  → Tombstone removed (> 10 days old)
Bad Timeline:
Day 0: DELETE FROM users WHERE id = 42
  → Tombstone written to A and B
  → Node C is down

Day 12: Garbage collection runs
  → Tombstone removed (> 10 days old)

Day 14: Node C comes back
  → Still has old data for id=42

Day 15: Repair runs
  → Compares A (no data) vs C (old data)
  → Old data wins! (has timestamp, no tombstone to override)
  → DELETED DATA RESURRECTED! ⚠️
Automated Scheduling: Use cassandra-reaper (open source tool):
# Install Reaper
docker run -p 8080:8080 thelastpickle/cassandra-reaper

# Schedule automatic repairs
reaper-cli schedule add \
  --cluster my_cluster \
  --keyspace my_keyspace \
  --interval 7 \
  --segments 64
Or simple cron:
# Run repair weekly
0 2 * * 0 nodetool repair -pr my_keyspace

Repair Performance Impact

Repair is resource-intensive:
Resource | Impact    | Reason
CPU      | High      | Building Merkle trees, hashing data
Disk I/O | Very High | Reading all SSTables
Network  | High      | Streaming differing data
Memory   | Medium    | Storing Merkle trees
Mitigation Strategies:
  1. Repair During Off-Peak Hours
# Run at 2 AM
0 2 * * * nodetool repair
  2. Limit Repair Parallelism
# Repair one token range at a time
nodetool repair -seq
  3. Throttle Streaming
# Limit streaming bandwidth to 16 MB/s
nodetool setstreamthroughput 16
  4. Use Sub-Range Repair
# Repair in smaller chunks (calculate_ranges is a placeholder that emits start:end pairs)
for range in $(calculate_ranges); do
  start=${range%%:*}; end=${range##*:}
  nodetool repair -st $start -et $end
  sleep 300  # 5-minute break between ranges
done

Part 6: Multi-Datacenter Replication

Why Multiple Datacenters?

  1. Geographic Distribution: Serve users with low latency worldwide
  2. Disaster Recovery: Survive datacenter failures
  3. Compliance: Keep data in specific regions (GDPR, etc.)
  4. Read Scalability: Read from local DC, reduce cross-DC traffic

Network Topology

Cassandra models datacenters and racks:
Cluster
├── Datacenter: us-east
│   ├── Rack: rack1
│   │   ├── Node A (10.0.1.10)
│   │   └── Node B (10.0.1.11)
│   └── Rack: rack2
│       ├── Node C (10.0.2.10)
│       └── Node D (10.0.2.11)
└── Datacenter: eu-west
    ├── Rack: rack1
    │   ├── Node E (10.1.1.10)
    │   └── Node F (10.1.1.11)
    └── Rack: rack2
        ├── Node G (10.1.2.10)
        └── Node H (10.1.2.11)

Configuring Datacenters

1. Define Topology (cassandra-rackdc.properties):
# Node A configuration
dc=us-east
rack=rack1

# Node E configuration
dc=eu-west
rack=rack1
2. Use NetworkTopologyStrategy:
CREATE KEYSPACE my_keyspace WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'us-east': 3,  -- 3 replicas in us-east
  'eu-west': 2   -- 2 replicas in eu-west
};

Multi-DC Replica Placement

With NetworkTopologyStrategy, Cassandra ensures:
  • Replicas are spread across different racks within a DC
  • Each DC gets the specified number of replicas
Example: Token range [0, 100) with RF={us-east:3, eu-west:2}
Token range [0, 100) owned by Node A (us-east, rack1)

Replica placement:
1. Node A (us-east, rack1)  -- Primary
2. Node C (us-east, rack2)  -- Different rack
3. Node B (us-east, rack1)  -- Back to rack1
4. Node E (eu-west, rack1)  -- EU primary
5. Node G (eu-west, rack2)  -- Different rack
Visual:
us-east:
  rack1: [A*, B]
  rack2: [C*]

eu-west:
  rack1: [E*]
  rack2: [G*]

* = Replica for token range [0, 100)
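A simplified way to picture how NetworkTopologyStrategy arrives at this layout: walk the ring clockwise from the token's owner and, for each datacenter, take nodes until that DC's replication factor is met, preferring racks that have not been used yet. The sketch below (hypothetical Node type with dc and rack fields; java.util collections) captures that idea while omitting details such as vnodes and rack-exhaustion handling:
// Pick replicas for one datacenter: walk the ring in clockwise order and
// prefer nodes on racks that have not contributed a replica yet.
static List<Node> pickReplicasForDc(List<Node> ringOrder, String dc, int rf) {
    List<Node> replicas = new ArrayList<>();
    Set<String> usedRacks = new HashSet<>();
    for (Node n : ringOrder) {                       // first pass: distinct racks
        if (replicas.size() == rf) break;
        if (n.dc.equals(dc) && !usedRacks.contains(n.rack)) {
            replicas.add(n);
            usedRacks.add(n.rack);
        }
    }
    for (Node n : ringOrder) {                       // second pass: reuse racks if RF > racks
        if (replicas.size() == rf) break;
        if (n.dc.equals(dc) && !replicas.contains(n)) replicas.add(n);
    }
    return replicas;
}
For the example above, ring order [A, B, C, ...] yields [A, C, B] for us-east: A and C first (distinct racks), then B once rack1 must be reused.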

Multi-DC Consistency Levels

New consistency levels for multi-DC:
Consistency Level | Meaning
LOCAL_ONE         | At least 1 replica in the local DC
LOCAL_QUORUM      | Quorum of replicas in the local DC
EACH_QUORUM       | Quorum in every DC
Example: Keyspace with RF={us-east:3, eu-west:3}
-- Write from us-east with LOCAL_QUORUM (in cqlsh, set the level first)
CONSISTENCY LOCAL_QUORUM;
INSERT INTO users (id, name) VALUES (1, 'Alice');

Required acknowledgments:
2 of 3 replicas in us-east (quorum)
0 of 3 replicas in eu-west (not required)

Result: Write succeeds if 2 us-east nodes respond
-- Write with EACH_QUORUM
CONSISTENCY EACH_QUORUM;
INSERT INTO users (id, name) VALUES (1, 'Alice');

Required acknowledgments:
2 of 3 replicas in us-east (quorum)
2 of 3 replicas in eu-west (quorum)

Result: Write succeeds only if 2 nodes in BOTH DCs respond
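The required acknowledgment counts above follow directly from each DC's replication factor; a tiny helper makes the arithmetic explicit (a sketch, not a driver API):
// quorum(RF) = floor(RF / 2) + 1
static int quorum(int rf) {
    return rf / 2 + 1;
}

// With RF = {us-east: 3, eu-west: 3}:
//   LOCAL_QUORUM -> quorum(3) = 2 acks in the local DC only
//   EACH_QUORUM  -> quorum(3) = 2 acks in us-east AND 2 in eu-west
//   QUORUM       -> quorum(3 + 3) = 4 acks from any combination of DCs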

Multi-DC Write Flow

Scenario: Client in us-east writes with CL=LOCAL_QUORUM, RF={us-east:3, eu-west:2}
Client (us-east) → Coordinator (us-east Node A)

Coordinator determines replicas:
  us-east: [A, B, C]
  eu-west: [E, F]

Coordinator sends writes:
  • A, B, C: Synchronously (wait for response)
  • E, F: Asynchronously (don't wait)

Wait for LOCAL_QUORUM (2/3 from us-east):
  • A: ACK (20ms)
  • B: ACK (25ms)
  • C: ... (waiting)

LOCAL_QUORUM satisfied → Respond to client

Background: Write propagates to eu-west
  • E: ACK (150ms) - cross-DC latency
  • F: ACK (160ms)
Key Points:
  • Local writes are fast (no cross-DC latency in critical path)
  • Remote writes happen asynchronously
  • Eventual consistency across DCs

Multi-DC Read Flow

Scenario: Client in eu-west reads with CL=LOCAL_QUORUM, RF={us-east:3, eu-west:2}
Client (eu-west) → Coordinator (eu-west Node E)

Coordinator determines LOCAL replicas:
  eu-west: [E, F]

Coordinator sends reads to E, F

E returns: {id: 1, name: 'Alice', timestamp: 100}
F returns: {id: 1, name: 'Alice', timestamp: 100}

Compare timestamps → Both match

Respond to client: 'Alice'
No cross-DC communication for reads with LOCAL_* consistency!

Multi-DC Repair

Repair operates per-DC by default:
# Repair only the local DC (run from a node in that DC)
nodetool repair -local

# Repair only a specific datacenter from any node
nodetool repair -dc eu-west

# Repair across all DCs (the default, expensive!)
nodetool repair
Best Practice: Run per-DC repairs regularly, cross-DC repairs less frequently.

Monitoring Multi-DC Clusters

# View cluster status across DCs
nodetool status

# Example output:
Datacenter: us-east
===================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns   Host ID   Rack
UN  10.0.1.10     2.5 TB     256     50.1%  aaa-bbb   rack1
UN  10.0.1.11     2.3 TB     256     49.9%  ccc-ddd   rack1
UN  10.0.2.10     2.4 TB     256     50.0%  eee-fff   rack2

Datacenter: eu-west
===================
--  Address       Load       Tokens  Owns   Host ID   Rack
UN  10.1.1.10     1.8 TB     256     50.2%  ggg-hhh   rack1
UN  10.1.1.11     1.7 TB     256     49.8%  iii-jjj   rack2

Part 7: Cluster Management Operations

Adding a Node (Bootstrap)

Process:
  1. Install Cassandra on new node
  2. Configure cassandra.yaml:
    cluster_name: 'MyCluster'
    seeds: "10.0.1.10,10.0.1.11"  # Existing seed nodes
    listen_address: 10.0.3.10     # New node IP
    rpc_address: 10.0.3.10
    
  3. Start Cassandra:
    cassandra -f
    
  4. Bootstrap Process:
    New Node Joins
    
    Gossip with seeds
    
    Receive cluster topology
    
    Calculate token ownership
    
    Stream data from existing nodes
    
    Mark status as NORMAL
    
    Start serving traffic
    
Timeline:
t=0s: Start Cassandra
  [2023-11-15 10:00:00] Joining cluster

t=5s: Gossip establishes
  [2023-11-15 10:00:05] Cluster state received

t=10s: Token calculation
  [2023-11-15 10:00:10] Calculating tokens (num_tokens=256)

t=15s: Streaming begins
  [2023-11-15 10:00:15] Streaming from /10.0.1.10
  [2023-11-15 10:00:15] Streaming from /10.0.1.11
  [2023-11-15 10:00:15] Streaming from /10.0.2.10

t=3600s (1 hour): Streaming completes (varies by data size)
  [2023-11-15 11:00:00] Streaming completed

t=3605s: Node ready
  [2023-11-15 11:00:05] Node state: NORMAL
  [2023-11-15 11:00:05] Accepting client connections
Monitor Bootstrap:
# Check bootstrap status
nodetool netstats

# Example output:
Bootstrap 45.2%
Sending 123 files, 567.8 GB total
 /10.0.1.10: 45/100 files, 234.5 GB / 500 GB
 /10.0.1.11: 34/90 files, 189.2 GB / 450 GB

Decommissioning a Node

Purpose: Safely remove a node from the cluster Process:
# On the node to be removed
nodetool decommission
What Happens:
1. Node streams its data to other nodes
2. Token ranges reassigned
3. Node marks itself as DECOMMISSIONED
4. Gossip informs cluster
5. Node shuts down
Timeline:
t=0s: Run decommission
  [2023-11-15 10:00:00] Starting decommission

t=5s: Streaming begins
  [2023-11-15 10:00:05] Streaming to /10.0.1.11
  [2023-11-15 10:00:05] Streaming to /10.0.2.10

t=3600s: Streaming completes
  [2023-11-15 11:00:00] Streaming completed

t=3605s: Decommission complete
  [2023-11-15 11:00:05] Node decommissioned
  [2023-11-15 11:00:05] Safe to shutdown
Never Just Kill the Node! Decommission ensures data isn’t lost.

Replacing a Dead Node

Scenario: Node crashed and can’t be recovered (hardware failure) Process:
  1. Prepare new node with same IP (or use replace_address)
  2. Configure the replacement flag (a JVM system property, e.g. in cassandra-env.sh, not cassandra.yaml):
    # Replace the dead node, even if the new node has a different IP
    JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.0.1.10"  # IP of dead node
    
    
  3. Start new node:
    cassandra -f
    
  4. Streaming:
    New node streams data from other replicas
    Same token ownership as dead node
    Takes over dead node's responsibilities
    
  5. Remove the replace_address option before any later restart:
    # cassandra-env.sh
    # JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.0.1.10"  # REMOVE after bootstrap completes
    

Monitoring Cluster Health

Real-time Status:
# Cluster overview
nodetool status

# Detailed node info
nodetool info

# Performance stats
nodetool tpstats

# Table statistics
nodetool tablestats keyspace.table
Key Metrics to Monitor:
Metric              | Command                  | Healthy Range
Heap usage          | nodetool info            | < 75%
Pending compactions | nodetool compactionstats | < 20
Dropped messages    | nodetool tpstats         | 0
Read/write latency  | nodetool tablehistograms | < 10ms p95
Disk usage          | df -h                    | < 80%

Part 8: Hands-On Exercises

Exercise 1: Observe Gossip Propagation

Setup: 3-node cluster Task:
  1. Monitor gossip on Node A: watch -n 1 nodetool gossipinfo
  2. On Node B, change schema: ALTER TABLE users ADD phone text;
  3. Observe SCHEMA state propagate through gossip
  4. Measure propagation time
Expected: Schema UUID updates across all nodes within 1-3 seconds

Exercise 2: Test Failure Detection

Task:
  1. Monitor failure detector: watch -n 1 nodetool failuredetector
  2. On Node B, temporarily disable gossip to simulate a silent node: nodetool disablegossip (re-enable later with nodetool enablegossip)
  3. Simulate network delay: sudo tc qdisc add dev eth0 root netem delay 500ms
  4. Watch Phi values rise
  5. Remove delay: sudo tc qdisc del dev eth0 root
  6. Watch Phi values drop
Question: At what Phi value did the node get marked as DOWN?

Exercise 3: Hinted Handoff

Scenario: Test hint storage and replay Steps:
# 1. Stop Node C
nodetool drain
systemctl stop cassandra

# 2. Write data with CL=ONE (succeeds on the live replicas; a hint is stored for Node C)
cqlsh -e "CONSISTENCY ONE; INSERT INTO test.users (id, name) VALUES (999, 'Hinted');"

# 3. Check hints on coordinator
ls -lh /var/lib/cassandra/hints/

# 4. Restart Node C
systemctl start cassandra

# 5. Wait for hint replay (10 min by default)
# (delivery is retried automatically once gossip sees Node C as UP)

# 6. Verify data on Node C
cqlsh <nodeC_ip> -e "SELECT * FROM test.users WHERE id = 999;"

Exercise 4: Read Repair in Action

Setup:
CREATE TABLE test.users (
    id int PRIMARY KEY,
    name text,
    email text
) WITH read_repair_chance = 1.0;  -- Always repair
Create Inconsistency:
# Write via Node A with CL=ONE
cqlsh <nodeA_ip> -e "CONSISTENCY ONE; INSERT INTO test.users (id, name, email)
VALUES (500, 'Alice', 'alice@example.com');"

# Write a different value via Node B with CL=ONE
cqlsh <nodeB_ip> -e "CONSISTENCY ONE; INSERT INTO test.users (id, name, email)
VALUES (500, 'Bob', 'bob@example.com');"
Trigger Read Repair:
# Read with QUORUM (triggers repair)
cqlsh -e "SELECT * FROM test.users WHERE id = 500 USING CONSISTENCY QUORUM;"

# Check both nodes now agree
cqlsh <nodeA_ip> -e "SELECT * FROM test.users WHERE id = 500;"
cqlsh <nodeB_ip> -e "SELECT * FROM test.users WHERE id = 500;"
Expected: Both nodes now have the same data (newest timestamp wins)

Exercise 5: Run Repair and Analyze

Task:
# Start repair with verbose output
nodetool repair -full test users 2>&1 | tee repair.log

# Analyze repair output
grep "Merkle tree" repair.log
grep "Differences detected" repair.log
grep "Streaming" repair.log

# Check repair statistics
nodetool compactionstats
Questions:
  • How long did Merkle tree building take?
  • How much data was streamed?
  • Which token ranges had differences?

Exercise 6: Multi-DC Consistency

Setup: 2-DC cluster Create Keyspace:
CREATE KEYSPACE multidc WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'dc1': 2,
  'dc2': 2
};
Test Consistency Levels:
-- In cqlsh, set the consistency level before running the statements
CONSISTENCY LOCAL_QUORUM;

-- Write from DC1
INSERT INTO multidc.test (id, val) VALUES (1, 'DC1 write');

-- Immediately read from DC2 (with CONSISTENCY LOCAL_QUORUM set in that cqlsh session too)
SELECT * FROM multidc.test WHERE id = 1;
Question: Is the data visible in DC2 immediately? Why or why not? Test EACH_QUORUM:
CONSISTENCY EACH_QUORUM;
INSERT INTO multidc.test (id, val) VALUES (2, 'Global write');
Question: How does write latency compare to LOCAL_QUORUM?

Exercise 7: Simulate Node Failure and Recovery

Scenario: Test complete failure recovery workflow Steps:
  1. Create baseline data:
    for i in {1..1000}; do
      cqlsh -e "INSERT INTO test.users (id, name) VALUES ($i, 'User$i');"
    done
    
  2. Stop Node C:
    systemctl stop cassandra
    
  3. Continue writes (hints accumulate):
    for i in {1001..2000}; do
      cqlsh -e "INSERT INTO test.users (id, name) VALUES ($i, 'User$i') USING CONSISTENCY QUORUM;"
    done
    
  4. Restart Node C after 5 hours (exceeds hint window):
    # Simulate time passing...
    systemctl start cassandra
    
  5. Check for missing data:
    # Query Node C for recently written data
    cqlsh <nodeC_ip> -e "SELECT COUNT(*) FROM test.users WHERE id > 1000;"
    
  6. Run repair:
    nodetool repair test users
    
  7. Verify consistency:
    cqlsh <nodeC_ip> -e "SELECT COUNT(*) FROM test.users;"
    # Should match other nodes now
    

Part 9: Production Best Practices

Repair Schedules

Recommendation: Repair every 7 days (within gc_grace_seconds) Strategy 1: Full Cluster Repair (Small Clusters)
# Weekly full repair
0 2 * * 0 nodetool repair -full
Strategy 2: Per-Node Repair (Large Clusters)
# Each node repairs only its primary ranges
# Stagger across nodes throughout the week

# Node A: Sunday
0 2 * * 0 nodetool repair -pr

# Node B: Monday
0 2 * * 1 nodetool repair -pr

# Node C: Tuesday
0 2 * * 2 nodetool repair -pr
Strategy 3: Incremental Repair
# Run daily incremental repair (faster)
0 2 * * * nodetool repair -inc

Gossip Tuning

High Latency Networks (cross-region):
# cassandra.yaml

# Increase failure detection threshold
phi_convict_threshold: 12

# Increase gossip interval
gossip_interval_ms: 2000  # Default: 1000

# Increase shadow round interval
shadow_round_ms: 120000   # Default: 60000
Very Large Clusters (500+ nodes):
# Reduce gossip overhead
gossip_interval_ms: 2000

Multi-DC Best Practices

1. Use LOCAL Consistency Levels
-- Set the level on the driver, or in cqlsh:
CONSISTENCY LOCAL_QUORUM;

-- Write locally, async replicate to remote DCs
INSERT ...;

-- Read locally
SELECT ...;
2. Configure Snitches Correctly
# cassandra.yaml
endpoint_snitch: GossipingPropertyFileSnitch

# cassandra-rackdc.properties
dc=us-east-1
rack=rack1
3. Set DC-Aware Load Balancing (driver config):
# Python driver
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy

cluster = Cluster(
    contact_points=['10.0.1.10'],
    load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='us-east-1')
)
4. Separate Seeds Per DC
# In us-east nodes
seeds: "10.0.1.10,10.0.1.11"

# In eu-west nodes
seeds: "10.1.1.10,10.0.1.10"  # Local seed + one remote seed

Monitoring Checklist

Metric                  | Tool                     | Alert Threshold
Pending gossip messages | nodetool tpstats         | > 100
Failure detector phi    | nodetool failuredetector | > 8 = DOWN
Hints stored            | nodetool statushandoff   | > 10 GB per node
Pending compactions     | nodetool compactionstats | > 20
Last repair time        | Log scraping             | > 10 days
Inter-DC latency        | nodetool proxyhistograms | p99 > 200ms

Part 10: Common Issues and Debugging

Issue 1: Gossip Not Propagating

Symptoms:
  • Nodes don’t see schema changes
  • nodetool describecluster shows different schema versions
Diagnosis:
# Check gossip status
nodetool statusgossip

# View schema versions
nodetool describecluster

# Example bad output:
Schema versions:
  e84b6a60-1f3e-11ec-9621-0242ac130002: [10.0.1.10, 10.0.1.11]
  f95c7b71-2g4f-22fd-a732-1353bd241113: [10.0.1.12]  # DIFFERENT!
Solutions:
# Force schema agreement
nodetool resetlocalschema

# Restart gossip
nodetool disablegossip
nodetool enablegossip

# Last resort: rolling restart

Issue 2: False Failure Detection

Symptoms:
  • Nodes marked as DOWN when actually healthy
  • Logs show: FatClient ... has been silent for 30000ms, removing from gossip
Diagnosis:
# Check phi values
nodetool failuredetector

# Check GC pauses (common cause)
grep "GC" /var/log/cassandra/system.log | grep "application stopped"
Solutions:
# Increase phi threshold (cassandra.yaml)
phi_convict_threshold: 12  # Default: 8

# Tune JVM GC (jvm.options)
-XX:MaxGCPauseMillis=500

Issue 3: Hints Accumulating

Symptoms:
  • Disk space filling up
  • /var/lib/cassandra/hints/ directory growing
Diagnosis:
# Check hint statistics
nodetool statushandoff

# Check hint directory size
du -sh /var/lib/cassandra/hints/

# Find which nodes have hints
ls -lh /var/lib/cassandra/hints/
Solutions:
# 1. Force immediate hint delivery
nodetool handoff <target_node_ip>

# 2. If node is permanently gone, remove hints
nodetool removenode <host_id>

# 3. Reduce max hint window
# cassandra.yaml:
max_hint_window_in_ms: 3600000  # 1 hour instead of 3

Issue 4: Repair Taking Too Long

Symptoms:
  • Repair runs for days
  • High CPU and disk I/O
Diagnosis:
# Check repair progress
nodetool compactionstats

# Check active repair sessions
nodetool repair_admin list
Solutions:
# 1. Use sub-range repair
nodetool repair -st <token> -et <token>

# 2. Use incremental repair
nodetool repair -inc

# 3. Limit parallelism
nodetool repair -seq

# 4. Throttle streaming
nodetool setstreamthroughput 16  # MB/s

Summary & Key Takeaways

Gossip Protocol:
  • Peer-to-peer state propagation
  • Exponential spread: log(N) rounds to full propagation
  • Seeds are just initial contact points
  • Runs every 1 second
Failure Detection:
  • Phi Accrual: suspicion level, not binary
  • Phi = 8 (default) ≈ 99.999999% confidence the node is down
  • Adapts to network conditions
  • Configurable via phi_convict_threshold
Hinted Handoff:
  • Temporary solution for node failures
  • Stored locally on coordinator
  • Max window: 3 hours (default)
  • Not a replacement for repair!
Read Repair:
  • Fixes data queried in reads
  • Foreground (blocking) + Background (async)
  • Probabilistic (10% by default)
  • Limited scope
Anti-Entropy Repair:
  • Compares all data using Merkle trees
  • Essential for consistency
  • Must run within gc_grace_seconds
  • Resource-intensive
Multi-DC:
  • NetworkTopologyStrategy for replication
  • LOCAL_* consistency for low latency
  • Asynchronous cross-DC replication
  • Per-DC repair recommended
Cluster Operations:
  • Bootstrap: Automated, streams data
  • Decommission: Safe node removal
  • Replace: For dead node recovery
  • Never just kill nodes!

What’s Next?

Module 6: Performance Tuning & Production Operations

JVM tuning, monitoring, troubleshooting, and running Cassandra in production at scale