Staff+ Level: Consensus is one of the hardest problems in distributed systems. Understanding these algorithms deeply sets you apart in senior interviews.
Why Consensus Matters
In distributed systems, multiple nodes must agree on:
- Who is the leader?
- What is the correct order of operations?
- What is the current state?
┌─────────────────────────────────────────────────────────────────┐
│ THE CONSENSUS PROBLEM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Scenario: 3 database replicas, one fails │
│ │
│ Client writes "x = 5" to Node A │
│ Client writes "x = 7" to Node B │
│ Node C is partitioned │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node A │ │ Node B │ ✗ │ Node C │ │
│ │ x = 5 │ │ x = 7 │ │ x = ? │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Questions: │
│ • Which value is correct? │
│ • Can clients still read/write? │
│ • What happens when C comes back? │
│ │
│ Consensus algorithms answer these questions! │
│ │
└─────────────────────────────────────────────────────────────────┘
FLP Impossibility Theorem
“In an asynchronous network, no deterministic algorithm can guarantee that consensus terminates if even one node can fail.” (Fischer, Lynch, and Paterson, 1985)
This means we must make trade-offs (a short sketch of the first trade-off follows this list):
- Paxos/Raft: Sacrifice liveness for safety (may get stuck, but never wrong)
- Eventual Consistency: Sacrifice strong consistency for availability
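As a minimal sketch of the Paxos/Raft side of that trade-off: a quorum-based write path refuses to answer when it cannot reach a majority, rather than risk returning a wrong answer. The function name and return values below are illustrative, not taken from any specific system.
def handle_write(acks_received: int, cluster_size: int) -> str:
    """Commit only if a majority of replicas acknowledged the write."""
    quorum = cluster_size // 2 + 1
    if acks_received >= quorum:
        return "committed"    # safe: a majority durably holds the entry
    # Safety over liveness: stall or refuse instead of risking divergent replicas.
    return "unavailable"

print(handle_write(acks_received=2, cluster_size=3))  # committed
print(handle_write(acks_received=1, cluster_size=3))  # unavailable (minority partition)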
Raft Consensus Algorithm
Raft is designed to be understandable (unlike Paxos). It’s used in:
- etcd (Kubernetes)
- CockroachDB
- TiKV
- Consul
Raft Basics
┌─────────────────────────────────────────────────────────────────┐
│ RAFT OVERVIEW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Node States: │
│ ┌──────────┐ timeout ┌───────────┐ wins ┌───────┐│
│ │ Follower │──────────────►│ Candidate │──────────►│ Leader││
│ └──────────┘ └───────────┘ └───────┘│
│ ▲ │ │ │
│ │ │ loses │ │
│ │ ▼ │ │
│ │ ┌───────────┐ │ │
│ └─────────────────────│ Follower │◄──────────────┘ │
│ discovers leader └───────────┘ steps down │
│ │
│ Key Properties: │
│ • Leader-based: All writes go through leader │
│ • Log replication: Leader replicates to followers │
│ • Terms: Logical clock for leader epochs │
│ • Majority: Need n/2 + 1 nodes to agree │
│ │
└─────────────────────────────────────────────────────────────────┘
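The majority rule in the box above is worth being able to derive quickly: a cluster of n nodes needs ⌊n/2⌋ + 1 votes, so it can tolerate ⌊(n-1)/2⌋ failures. A quick sketch of the arithmetic (helper names are illustrative):
def majority(n: int) -> int:
    """Votes needed for a quorum in an n-node cluster."""
    return n // 2 + 1

def faults_tolerated(n: int) -> int:
    """Failures the cluster can survive while still forming a quorum."""
    return (n - 1) // 2

for n in (3, 5, 7):
    print(n, "nodes -> quorum:", majority(n), "| failures tolerated:", faults_tolerated(n))
# 3 nodes -> quorum: 2 | failures tolerated: 1
# 5 nodes -> quorum: 3 | failures tolerated: 2
# 7 nodes -> quorum: 4 | failures tolerated: 3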
Leader Election
┌─────────────────────────────────────────────────────────────────┐
│ LEADER ELECTION │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Follower doesn't hear from leader (timeout) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Follower │ │Follower │ │Follower │ │
│ │ Node A │ │ Node B │ │ Node C │ │
│ └────┬────┘ └─────────┘ └─────────┘ │
│ │ │
│ ▼ Timeout! Becomes Candidate │
│ │
│ 2. Candidate increments term, votes for self, requests votes │
│ │
│ ┌───────────┐ RequestVote ┌─────────┐ RequestVote ┌──────┐│
│ │ Candidate │───────────────►│Follower │◄─────────────│Follow││
│ │ Node A │ │ Node B │ │Node C││
│ │ Term: 2 │◄───────────────│ Term: 1 │──────────────│Term:1││
│ └───────────┘ VoteGranted └─────────┘ VoteGranted └──────┘│
│ │
│ 3. Receives majority → becomes Leader │
│ │
│ ┌─────────┐ Heartbeat ┌─────────┐ Heartbeat ┌─────────┐ │
│ │ Leader │────────────►│Follower │◄────────────│Follower │ │
│ │ Node A │◄────────────│ Node B │────────────►│ Node C │ │
│ │ Term: 2 │ │ Term: 2 │ │ Term: 2 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Log Replication
┌─────────────────────────────────────────────────────────────────┐
│ LOG REPLICATION │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Client sends command to Leader │
│ │
│ Client: SET x = 5 │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Leader Log: │ │
│ │ [1: SET x=3] [2: SET y=7] [3: SET x=5] │ │
│ │ ▲ uncommitted │ │
│ └─────────────────────────────────────────┘ │
│ │
│ 2. Leader sends AppendEntries to all followers │
│ │
│ Leader ─────AppendEntries(term=2, entry=[3: SET x=5])────► │
│ ◄────────────────Success────────────────────────── │
│ │
│ 3. Majority acknowledges → Entry is committed │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ Leader Log: │ │
│ │ [1: SET x=3] [2: SET y=7] [3: SET x=5] │ │
│ │ ▲ committed! │ │
│ └─────────────────────────────────────────┘ │
│ │
│ 4. Leader applies to state machine, responds to client │
│ │
│ Commit Index: The highest log entry known to be committed │
│ Entries before commit index are safe to apply │
│ │
└─────────────────────────────────────────────────────────────────┘
Raft Implementation
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
import asyncio
import random
import time
import logging
logger = logging.getLogger(__name__)
class NodeState(Enum):
FOLLOWER = "follower"
CANDIDATE = "candidate"
LEADER = "leader"
@dataclass
class LogEntry:
term: int
index: int
command: str
@dataclass
class RequestVoteArgs:
term: int
candidate_id: str
last_log_index: int
last_log_term: int
@dataclass
class RequestVoteReply:
term: int
vote_granted: bool
@dataclass
class AppendEntriesArgs:
term: int
leader_id: str
prev_log_index: int
prev_log_term: int
entries: List[LogEntry]
leader_commit: int
@dataclass
class AppendEntriesReply:
term: int
success: bool
match_index: int = 0
class RaftNode:
"""
Simplified Raft implementation for educational purposes.
Production implementations need more error handling.
"""
def __init__(self, node_id: str, peers: List[str], rpc_client):
self.node_id = node_id
self.peers = peers
self.rpc = rpc_client
# Persistent state (saved to disk)
self.current_term = 0
self.voted_for: Optional[str] = None
self.log: List[LogEntry] = []
# Volatile state
self.state = NodeState.FOLLOWER
self.commit_index = 0
self.last_applied = 0
# Leader state (reinitialized after election)
self.next_index: Dict[str, int] = {}
self.match_index: Dict[str, int] = {}
# Timing
self.election_timeout = self._random_timeout()
self.last_heartbeat = time.time()
# State machine
self.state_machine: Dict[str, str] = {}
# Callbacks
self.on_become_leader: Optional[Callable] = None
self.on_commit: Optional[Callable] = None
def _random_timeout(self) -> float:
"""Random election timeout to prevent split votes"""
return random.uniform(0.15, 0.3) # 150-300ms
def _last_log_index(self) -> int:
return len(self.log)
def _last_log_term(self) -> int:
if not self.log:
return 0
return self.log[-1].term
def _get_log_term(self, index: int) -> int:
if index <= 0 or index > len(self.log):
return 0
return self.log[index - 1].term
# ==================== Leader Election ====================
async def run(self):
"""Main loop - runs forever"""
while True:
if self.state == NodeState.FOLLOWER:
await self._run_follower()
elif self.state == NodeState.CANDIDATE:
await self._run_candidate()
elif self.state == NodeState.LEADER:
await self._run_leader()
async def _run_follower(self):
"""Wait for heartbeat or start election"""
while self.state == NodeState.FOLLOWER:
if time.time() - self.last_heartbeat > self.election_timeout:
logger.info(f"{self.node_id}: Election timeout, becoming candidate")
self.state = NodeState.CANDIDATE
return
await asyncio.sleep(0.01)
async def _run_candidate(self):
"""Request votes from all peers"""
self.current_term += 1
self.voted_for = self.node_id
votes_received = 1 # Vote for self
logger.info(f"{self.node_id}: Starting election for term {self.current_term}")
# Reset election timeout
self.election_timeout = self._random_timeout()
election_start = time.time()
# Request votes in parallel
vote_requests = [
self._request_vote(peer) for peer in self.peers
]
responses = await asyncio.gather(*vote_requests, return_exceptions=True)
for response in responses:
if isinstance(response, Exception):
continue
if response.term > self.current_term:
# Discovered higher term, step down
self.current_term = response.term
self.state = NodeState.FOLLOWER
self.voted_for = None
return
if response.vote_granted:
votes_received += 1
# Check if we won
majority = (len(self.peers) + 1) // 2 + 1
if votes_received >= majority:
logger.info(f"{self.node_id}: Won election with {votes_received} votes")
self._become_leader()
else:
# Election failed, go back to follower
logger.info(f"{self.node_id}: Lost election, got {votes_received}/{majority} votes")
self.state = NodeState.FOLLOWER
async def _request_vote(self, peer: str) -> RequestVoteReply:
"""Send RequestVote RPC to a peer"""
args = RequestVoteArgs(
term=self.current_term,
candidate_id=self.node_id,
last_log_index=self._last_log_index(),
last_log_term=self._last_log_term()
)
return await self.rpc.request_vote(peer, args)
def handle_request_vote(self, args: RequestVoteArgs) -> RequestVoteReply:
"""Handle incoming RequestVote RPC"""
# If request term is old, reject
if args.term < self.current_term:
return RequestVoteReply(term=self.current_term, vote_granted=False)
# If we see a higher term, update and become follower
if args.term > self.current_term:
self.current_term = args.term
self.state = NodeState.FOLLOWER
self.voted_for = None
# Check if we can vote for this candidate
can_vote = (
(self.voted_for is None or self.voted_for == args.candidate_id) and
self._is_log_up_to_date(args.last_log_index, args.last_log_term)
)
if can_vote:
self.voted_for = args.candidate_id
self.last_heartbeat = time.time() # Reset timeout
return RequestVoteReply(term=self.current_term, vote_granted=True)
return RequestVoteReply(term=self.current_term, vote_granted=False)
def _is_log_up_to_date(self, last_index: int, last_term: int) -> bool:
"""Check if candidate's log is at least as up-to-date as ours"""
our_last_term = self._last_log_term()
our_last_index = self._last_log_index()
if last_term != our_last_term:
return last_term > our_last_term
return last_index >= our_last_index
# ==================== Leader Operations ====================
def _become_leader(self):
"""Initialize leader state"""
self.state = NodeState.LEADER
# Initialize next_index and match_index
next_idx = self._last_log_index() + 1
for peer in self.peers:
self.next_index[peer] = next_idx
self.match_index[peer] = 0
if self.on_become_leader:
self.on_become_leader()
async def _run_leader(self):
"""Send heartbeats and replicate log"""
while self.state == NodeState.LEADER:
await self._send_heartbeats()
await asyncio.sleep(0.05) # Heartbeat interval: 50ms
async def _send_heartbeats(self):
"""Send AppendEntries to all peers"""
tasks = [
self._send_append_entries(peer) for peer in self.peers
]
await asyncio.gather(*tasks, return_exceptions=True)
async def _send_append_entries(self, peer: str):
"""Send AppendEntries RPC to a specific peer"""
next_idx = self.next_index[peer]
prev_log_index = next_idx - 1
prev_log_term = self._get_log_term(prev_log_index)
# Get entries to send
entries = self.log[next_idx - 1:] if next_idx <= len(self.log) else []
args = AppendEntriesArgs(
term=self.current_term,
leader_id=self.node_id,
prev_log_index=prev_log_index,
prev_log_term=prev_log_term,
entries=entries,
leader_commit=self.commit_index
)
try:
reply = await self.rpc.append_entries(peer, args)
if reply.term > self.current_term:
# Step down
self.current_term = reply.term
self.state = NodeState.FOLLOWER
self.voted_for = None
return
if reply.success:
# Update next_index and match_index
self.next_index[peer] = next_idx + len(entries)
self.match_index[peer] = prev_log_index + len(entries)
# Check if we can advance commit index
self._advance_commit_index()
else:
# Log inconsistency, decrement next_index and retry
self.next_index[peer] = max(1, self.next_index[peer] - 1)
except Exception as e:
logger.warning(f"Failed to send AppendEntries to {peer}: {e}")
def _advance_commit_index(self):
"""Advance commit index if majority have replicated"""
for n in range(self.commit_index + 1, self._last_log_index() + 1):
if self._get_log_term(n) != self.current_term:
continue
# Count replicas
count = 1 # Self
for peer in self.peers:
if self.match_index.get(peer, 0) >= n:
count += 1
majority = (len(self.peers) + 1) // 2 + 1
if count >= majority:
self.commit_index = n
self._apply_committed()
def handle_append_entries(self, args: AppendEntriesArgs) -> AppendEntriesReply:
"""Handle incoming AppendEntries RPC"""
# Old term, reject
if args.term < self.current_term:
return AppendEntriesReply(term=self.current_term, success=False)
# Update term and become follower
if args.term > self.current_term:
self.current_term = args.term
self.voted_for = None
self.state = NodeState.FOLLOWER
self.last_heartbeat = time.time()
# Check log consistency
if args.prev_log_index > 0:
if args.prev_log_index > len(self.log):
return AppendEntriesReply(term=self.current_term, success=False)
if self._get_log_term(args.prev_log_index) != args.prev_log_term:
# Conflict, delete this and all following entries
self.log = self.log[:args.prev_log_index - 1]
return AppendEntriesReply(term=self.current_term, success=False)
# Append new entries
for i, entry in enumerate(args.entries):
index = args.prev_log_index + 1 + i
if index <= len(self.log):
if self.log[index - 1].term != entry.term:
self.log = self.log[:index - 1]
self.log.append(entry)
else:
self.log.append(entry)
# Update commit index
if args.leader_commit > self.commit_index:
self.commit_index = min(args.leader_commit, self._last_log_index())
self._apply_committed()
return AppendEntriesReply(
term=self.current_term,
success=True,
match_index=self._last_log_index()
)
# ==================== Client Operations ====================
async def propose(self, command: str) -> bool:
"""Propose a new command (only works on leader)"""
if self.state != NodeState.LEADER:
return False
entry = LogEntry(
term=self.current_term,
index=self._last_log_index() + 1,
command=command
)
self.log.append(entry)
# Replicate to followers
await self._send_heartbeats()
return True
def _apply_committed(self):
"""Apply committed entries to state machine"""
while self.last_applied < self.commit_index:
self.last_applied += 1
entry = self.log[self.last_applied - 1]
# Apply to state machine
self._apply_command(entry.command)
if self.on_commit:
self.on_commit(entry)
def _apply_command(self, command: str):
"""Apply command to state machine (simple key-value store)"""
parts = command.split()
if parts[0] == "SET" and len(parts) == 3:
self.state_machine[parts[1]] = parts[2]
elif parts[0] == "DELETE" and len(parts) == 2:
self.state_machine.pop(parts[1], None)
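A quick way to exercise the class above is an in-memory harness that delivers RPCs by calling peer handlers directly. The InMemoryRpc class and the wiring below are assumptions for illustration only (no persistence, no real network, no failure injection), not part of the implementation itself:
import asyncio

class InMemoryRpc:
    """Delivers RPCs by invoking peer handlers in-process (illustrative only)."""
    def __init__(self):
        self.nodes = {}  # node_id -> RaftNode

    def register(self, node):
        self.nodes[node.node_id] = node

    async def request_vote(self, peer, args):
        return self.nodes[peer].handle_request_vote(args)

    async def append_entries(self, peer, args):
        return self.nodes[peer].handle_append_entries(args)

async def demo():
    rpc = InMemoryRpc()
    ids = ["A", "B", "C"]
    nodes = [RaftNode(i, [p for p in ids if p != i], rpc) for i in ids]
    for n in nodes:
        rpc.register(n)
    tasks = [asyncio.create_task(n.run()) for n in nodes]

    leader = None
    while leader is None:                      # wait for an election to settle
        await asyncio.sleep(0.1)
        leader = next((n for n in nodes if n.state == NodeState.LEADER), None)

    await leader.propose("SET x 5")            # goes through the leader's log
    await asyncio.sleep(0.5)                   # allow heartbeats to commit and apply
    print({n.node_id: n.state_machine for n in nodes})  # all three should show {'x': '5'}

    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(demo())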
Paxos
Paxos is the original consensus algorithm, but it is notoriously difficult to understand; Raft was designed as a more understandable alternative.
┌─────────────────────────────────────────────────────────────────┐
│ BASIC PAXOS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Roles: │
│ • Proposer: Proposes values │
│ • Acceptor: Votes on proposals │
│ • Learner: Learns the chosen value │
│ │
│ Phase 1: Prepare │
│ ──────────────── │
│ Proposer ──────Prepare(n)──────► Acceptors │
│ ◄─────Promise(n, v)───── │
│ │
│ "I want to use proposal number n" │
│ "OK, I promise not to accept anything lower" │
│ │
│ Phase 2: Accept │
│ ─────────────── │
│ Proposer ──────Accept(n, v)────► Acceptors │
│ ◄─────Accepted(n, v)──── │
│ │
│ "Accept this value with proposal n" │
│ "OK, I accepted it" │
│ │
│ Value is chosen when majority of acceptors accept │
│ │
└─────────────────────────────────────────────────────────────────┘
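To make the two phases concrete, here is a minimal single-decree acceptor sketch. The class and field names are illustrative; a real acceptor must also persist its promised/accepted state to disk before replying.
from typing import Optional, Tuple

class PaxosAcceptor:
    """Single-decree acceptor: remembers the highest proposal number promised
    and the highest-numbered proposal it has accepted."""
    def __init__(self):
        self.promised_n = 0                               # highest n promised so far
        self.accepted: Optional[Tuple[int, str]] = None   # (n, value) accepted so far

    def prepare(self, n: int):
        """Phase 1: promise not to accept any proposal numbered below n."""
        if n > self.promised_n:
            self.promised_n = n
            # Return any previously accepted proposal; the proposer must
            # re-propose that value instead of its own.
            return ("promise", self.accepted)
        return ("reject", self.promised_n)

    def accept(self, n: int, value: str):
        """Phase 2: accept the value unless a higher-numbered prepare was promised."""
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted = (n, value)
            return ("accepted", n)
        return ("reject", self.promised_n)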
Multi-Paxos
Single Paxos = Choose ONE value
Multi-Paxos = Choose a SEQUENCE of values (log)
Optimization: Elect a stable leader to skip Phase 1
This is essentially what Raft does with its leader election!
ZAB (Zookeeper Atomic Broadcast)
Used by Apache ZooKeeper for coordination services.
┌─────────────────────────────────────────────────────────────────┐
│ ZAB │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Similar to Raft but with key differences: │
│ │
│ 1. Transaction IDs (zxid) │
│ • 64-bit: high 32 = epoch, low 32 = counter │
│ • Epoch changes on leader election │
│ │
│ 2. Recovery Phase │
│ • New leader syncs state before accepting writes │
│ • Ensures all committed transactions are recovered │
│ │
│ 3. Ordering Guarantees │
│ • FIFO client order │
│ • Linearizable writes │
│ • Local reads (may be stale) │
│ │
│ Used for: │
│ • Configuration management │
│ • Distributed locks │
│ • Leader election │
│ • Service discovery │
│ │
└─────────────────────────────────────────────────────────────────┘
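The zxid layout is easy to demonstrate: the high 32 bits carry the leader epoch and the low 32 bits a per-epoch counter, so zxids remain totally ordered across leader changes. A small sketch of the bit arithmetic (helper names are illustrative):
from typing import Tuple

def make_zxid(epoch: int, counter: int) -> int:
    """Pack epoch (high 32 bits) and counter (low 32 bits) into a 64-bit zxid."""
    return (epoch << 32) | (counter & 0xFFFFFFFF)

def split_zxid(zxid: int) -> Tuple[int, int]:
    """Recover (epoch, counter) from a zxid."""
    return zxid >> 32, zxid & 0xFFFFFFFF

z1 = make_zxid(epoch=3, counter=42)
z2 = make_zxid(epoch=4, counter=1)   # first transaction of a new leader epoch
assert z2 > z1                       # ordering is preserved across leader changes
print(split_zxid(z1))                # (3, 42)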
Practical Leader Election
For many systems, you don’t need full consensus, just leader election:
import asyncio
from typing import Optional, Callable
import redis.asyncio as redis  # async Redis client (redis-py >= 4.2), so the calls below can be awaited
class RedisLeaderElection:
"""
Leader election using Redis with TTL-based leases.
Simpler than Raft, good for many use cases.
"""
def __init__(
self,
redis_client: redis.Redis,
node_id: str,
key: str = "leader",
ttl: int = 10
):
self.redis = redis_client
self.node_id = node_id
self.key = key
self.ttl = ttl
self.is_leader = False
self.on_become_leader: Optional[Callable] = None
self.on_lose_leadership: Optional[Callable] = None
async def run(self):
"""Main election loop"""
while True:
try:
if self.is_leader:
# Try to renew lease
renewed = await self._renew_lease()
if not renewed:
self._step_down()
else:
# Try to become leader
acquired = await self._acquire_lease()
if acquired:
self._become_leader()
await asyncio.sleep(self.ttl / 3) # Renew at 1/3 TTL
except Exception as e:
if self.is_leader:
self._step_down()
await asyncio.sleep(1)
async def _acquire_lease(self) -> bool:
"""Try to acquire leadership using SET NX"""
result = await self.redis.set(
self.key,
self.node_id,
nx=True, # Only set if not exists
ex=self.ttl
)
return result is True
async def _renew_lease(self) -> bool:
"""Renew lease only if we're still leader"""
# Lua script for atomic check-and-renew
script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
redis.call('EXPIRE', KEYS[1], ARGV[2])
return 1
else
return 0
end
"""
result = await self.redis.eval(script, 1, self.key, self.node_id, self.ttl)
return result == 1
def _become_leader(self):
self.is_leader = True
if self.on_become_leader:
self.on_become_leader()
def _step_down(self):
self.is_leader = False
if self.on_lose_leadership:
self.on_lose_leadership()
    async def get_leader(self) -> Optional[str]:
        """Get the current leader's node_id (lets followers find the leader)."""
        # Returns bytes unless the client was created with decode_responses=True.
        return await self.redis.get(self.key)
# Fencing tokens for preventing split-brain
class FencedLeaderElection(RedisLeaderElection):
"""
Leader election with fencing tokens.
Prevents split-brain when old leader doesn't know it's no longer leader.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fencing_token = 0
async def _acquire_lease(self) -> bool:
# Lua script for atomic acquire with incrementing token
script = """
if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
local token = redis.call('INCR', KEYS[2])
return token
else
return 0
end
"""
result = await self.redis.eval(
script, 2,
self.key, f"{self.key}:token",
self.node_id, self.ttl
)
if result > 0:
self.fencing_token = result
return True
return False
def get_fencing_token(self) -> int:
"""
Return current fencing token.
Use this token when writing to shared resources.
Resource should reject writes with older tokens.
"""
return self.fencing_token
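Fencing only works if the shared resource also checks the token. Below is a minimal sketch of a store that rejects writes carrying a token older than the newest one it has seen (the class and method names are illustrative):
class FencedStorage:
    """Shared resource that accepts writes only with a monotonically
    increasing fencing token, so a deposed leader's late writes are rejected."""
    def __init__(self):
        self.highest_token_seen = 0
        self.data = {}

    def write(self, key: str, value: str, fencing_token: int) -> bool:
        if fencing_token < self.highest_token_seen:
            # Stale token: the writer was deposed, drop the write.
            return False
        self.highest_token_seen = fencing_token
        self.data[key] = value
        return True

storage = FencedStorage()
storage.write("config", "v1", fencing_token=2)   # current leader's write succeeds
storage.write("config", "v0", fencing_token=1)   # old leader's late write is rejected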
Comparison of Consensus Algorithms
| Algorithm | Used By | Complexity | Leader-based | Notes |
|---|---|---|---|---|
| Raft | etcd, CockroachDB | Medium | Yes | Easy to understand |
| Paxos | Chubby, Spanner | High | No (Multi-Paxos uses a stable leader) | Original, complex |
| ZAB | Zookeeper | Medium | Yes | Recovery-focused |
| PBFT | Hyperledger | Very High | Rotating | Byzantine fault tolerant |
| EPaxos | Research | Very High | No | Leaderless Paxos |
Interview Tips
What interviewers want to hear:
- Why consensus is hard: FLP impossibility, network partitions
- Safety vs Liveness: Raft prioritizes safety (never returns wrong answer)
- Leader-based simplicity: Single leader makes ordering easy
- Quorum math: Why n/2 + 1 is needed
- Real-world usage: “etcd uses Raft for Kubernetes configuration”
Common Questions
Q: Why do we need consensus at all?
Without it, distributed nodes can disagree on state, leading to data loss or corruption. Consensus ensures all nodes agree on the order of operations.
Q: What happens during a network partition?
The partition with a majority continues operating. The minority partition cannot make progress (availability is sacrificed for consistency).
Q: Why is Raft easier than Paxos?
Raft separates concerns clearly (leader election, log replication, safety) and uses a strong-leader model that makes the algorithm more intuitive.
Q: When would you NOT use consensus?
When eventual consistency is acceptable (social feeds, analytics), or when you can tolerate some data loss. Consensus has latency and availability costs.