What are Distributed Systems?
A distributed system is a collection of independent computers that appears to its users as a single coherent system. Leslie Lamport put it best: “A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” That quote captures the essential challenge: once your system spans multiple machines, you inherit a whole category of problems (network partitions, clock skew, partial failures) that simply do not exist on a single machine. The analogy that works best: imagine a team of chefs in separate kitchens, communicating only by passing notes through a mail slot. They need to coordinate on what dishes to prepare, in what order, using shared ingredients, but notes can be delayed, lost, or arrive out of order. That is distributed systems in a nutshell. Everything that follows in this chapter is about managing that coordination challenge.
Why Distributed?
- Scalability: Handle more load than a single machine can
- Reliability: Survive individual machine failures
- Latency: Serve users from nearby locations
- Compliance: Meet data locality requirements
The Eight Fallacies
These fallacies, originally articulated by Peter Deutsch and James Gosling at Sun Microsystems, are the assumptions that bite engineers hardest when they move from single-machine to distributed systems. In interviews, casually referencing one or two of these when discussing failure modes signals deep understanding.
- The network is reliable: Packets get lost. AWS reports measurable packet loss even within a single availability zone. Design for retries and idempotency from day one.
- Latency is zero: Cross-region calls take 100-300ms. A design that makes 10 sequential cross-region calls at 100ms each adds a full second of latency. Batch and parallelize.
- Bandwidth is infinite: A chatty microservice that transfers 1MB payloads at 10K QPS needs 10 GB/s of bandwidth and costs real money in cloud egress fees.
- The network is secure: Always encrypt in transit (mTLS between services). Internal networks are not safe; lateral movement is how most breaches escalate.
- Topology doesn’t change: Containers and VMs spin up and down constantly. Hard-coded IP addresses will break. Use service discovery.
- There is one administrator: In a cloud/microservices world, dozens of teams own different parts of the infrastructure with different policies.
- Transport cost is zero: AWS charges roughly $0.01-0.09/GB for cross-AZ and cross-region data transfer. At scale, this becomes a major line item.
- The network is homogeneous: Your services run on different hardware, different OS versions, and different network cards with different MTU sizes.
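The bandwidth and transport-cost fallacies are easy to check with back-of-the-envelope arithmetic. This sketch reuses the 1MB × 10K QPS example; the helper names are hypothetical, and the $0.01/GB price is an assumption taken from the low end of the range above, not a current price list.

```python
def required_bandwidth_gb_per_s(payload_mb: float, qps: float) -> float:
    """Sustained bandwidth in GB/s (decimal units: 1 GB = 1000 MB)."""
    return payload_mb * qps / 1000


def monthly_transfer_cost_usd(gb_per_s: float, price_per_gb: float, days: int = 30) -> float:
    """Cost of moving data at a constant rate for `days` days."""
    seconds = days * 24 * 60 * 60
    return gb_per_s * seconds * price_per_gb


bw = required_bandwidth_gb_per_s(payload_mb=1, qps=10_000)
print(f"{bw:.0f} GB/s")  # 10 GB/s
# Assumed price: $0.01/GB, the cheapest end of the range quoted above.
print(f"${monthly_transfer_cost_usd(bw, 0.01):,.0f} per month")  # $259,200 per month
```

Even at the cheapest rate in that range, a sustained 10 GB/s stream costs on the order of a quarter-million dollars a month, which is why batching, compression, and keeping chatty traffic inside one zone matter.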
Consistency Models
Strong Consistency
All nodes see the same data at the same time: every read returns the most recent completed write. This is the most intuitive model, because it behaves as if there is only one copy of the data. The price you pay is latency (every write must be acknowledged by replicas before returning) and availability (if replicas are unreachable, writes block). Strong consistency is non-negotiable for use cases where “stale reads” cause real harm: bank balances, inventory counts, seat reservations.
Eventual Consistency
If no new writes arrive, all replicas eventually converge to the same data. Until then, reads from different replicas may return different, stale values.
Consistency Implementation Examples
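A minimal single-process sketch contrasting the two models: a strong write is applied to every replica before returning, while an eventual write lands on one replica and is shipped to the others later. `ReplicatedStore`, `write_strong`, `write_eventual`, and `propagate` are hypothetical names; this is a toy model, not a replication protocol.

```python
from __future__ import annotations

from collections import deque
from typing import Optional


class ReplicatedStore:
    """Toy single-process model of a key-value store with N replicas."""

    def __init__(self, n: int = 3) -> None:
        self.replicas: list[dict[str, str]] = [{} for _ in range(n)]
        # Updates that replica 0 has applied but not yet shipped to the others.
        self.pending: deque[tuple[int, str, str]] = deque()

    def write_strong(self, key: str, value: str) -> None:
        # Synchronous replication: return only after every replica applied it.
        for replica in self.replicas:
            replica[key] = value

    def write_eventual(self, key: str, value: str) -> None:
        # Asynchronous replication: apply on replica 0 now, ship to others later.
        self.replicas[0][key] = value
        for i in range(1, len(self.replicas)):
            self.pending.append((i, key, value))

    def propagate(self) -> None:
        # Background replication step: deliver every queued update.
        while self.pending:
            i, key, value = self.pending.popleft()
            self.replicas[i][key] = value

    def read(self, key: str, replica: int) -> Optional[str]:
        return self.replicas[replica].get(key)


store = ReplicatedStore()
store.write_eventual("seat-12A", "alice")
print(store.read("seat-12A", replica=2))  # None: replica 2 has not seen the write yet
store.propagate()
print(store.read("seat-12A", replica=2))  # alice: replicas have converged
store.write_strong("seat-12B", "bob")
print(store.read("seat-12B", replica=1))  # bob: visible on every replica immediately
```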
Consistency Levels
| Level | Description | Trade-off |
|---|---|---|
| Strong | All reads see latest write | High latency |
| Eventual | Reads may be stale | Low latency |
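| Causal | Causally related writes are seen in the same order by every node | Medium latency; must track dependencies |
| Read-your-writes | A client always sees its own earlier writes | Good UX; other clients may still read stale data |
| Session | Guarantees such as read-your-writes hold within one client session | Practical; no guarantees across sessions |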
Consensus Algorithms
How do distributed nodes agree on a value?
Paxos (Simplified)
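To make the two phases concrete, here is a single-decree Paxos sketch with in-memory acceptors and no networking, failures, or retries. `Acceptor` and `propose` are hypothetical names; practical systems (Multi-Paxos) add a stable leader and a replicated log on top of this core.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Acceptor:
    promised: int = -1          # highest proposal number promised so far
    accepted_n: int = -1        # number of the last accepted proposal
    accepted_value: Any = None

    def prepare(self, n: int) -> Optional[tuple[int, Any]]:
        """Phase 1b: promise to ignore proposals numbered below n."""
        if n > self.promised:
            self.promised = n
            return (self.accepted_n, self.accepted_value)
        return None  # reject: already promised a higher number

    def accept(self, n: int, value: Any) -> bool:
        """Phase 2b: accept unless a higher-numbered prepare was promised."""
        if n >= self.promised:
            self.promised = n
            self.accepted_n = n
            self.accepted_value = value
            return True
        return False


def propose(acceptors: list[Acceptor], n: int, value: Any) -> Optional[Any]:
    """Run one proposal round; return the chosen value, or None on failure."""
    majority = len(acceptors) // 2 + 1
    # Phase 1a: send prepare(n) and collect promises.
    promises = [p for a in acceptors if (p := a.prepare(n)) is not None]
    if len(promises) < majority:
        return None
    # If any acceptor already accepted a value, adopt the one with the
    # highest proposal number instead of our own.
    prior_n, prior_value = max(promises, key=lambda p: p[0])
    if prior_n >= 0:
        value = prior_value
    # Phase 2a: ask acceptors to accept (n, value).
    acks = sum(a.accept(n, value) for a in acceptors)
    return value if acks >= majority else None


acceptors = [Acceptor() for _ in range(5)]
print(propose(acceptors, n=1, value="A"))  # A
print(propose(acceptors, n=2, value="B"))  # A: a chosen value is never replaced
print(propose(acceptors, n=1, value="C"))  # None: stale proposal number
```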
Raft (Easier to Understand)
Raft Implementation
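A minimal sketch of Raft’s vote-granting rule and vote counting, covering leader election only (no timers, heartbeats, networking, or log replication). `Node`, `handle_request_vote`, and `run_election` are hypothetical names; the fields loosely mirror the Raft paper’s `currentTerm`, `votedFor`, and log, with each log entry reduced to its term.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    node_id: int
    current_term: int = 0
    voted_for: Optional[int] = None
    log_terms: list[int] = field(default_factory=list)  # term of each log entry

    def last_log(self) -> tuple[int, int]:
        """(last log term, last log index); (0, 0) for an empty log."""
        if not self.log_terms:
            return (0, 0)
        return (self.log_terms[-1], len(self.log_terms))

    def handle_request_vote(self, term: int, candidate_id: int,
                            last_log_term: int, last_log_index: int) -> bool:
        if term < self.current_term:
            return False  # candidate is from a stale term
        if term > self.current_term:
            self.current_term = term  # newer term: forget any earlier vote
            self.voted_for = None
        # Election restriction: candidate's log must be at least as up-to-date.
        up_to_date = (last_log_term, last_log_index) >= self.last_log()
        if up_to_date and self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id  # at most one vote per term
            return True
        return False


def run_election(candidate: Node, peers: list[Node]) -> bool:
    """Candidate starts a new term, votes for itself, and requests votes."""
    candidate.current_term += 1
    candidate.voted_for = candidate.node_id
    votes = 1
    last_term, last_index = candidate.last_log()
    for peer in peers:
        if peer.handle_request_vote(candidate.current_term, candidate.node_id,
                                    last_term, last_index):
            votes += 1
    return votes > (len(peers) + 1) // 2  # strict majority of the cluster
```

Comparing `(last_log_term, last_log_index)` tuples encodes the “at least as up-to-date” check: a higher last term wins, and with equal last terms the longer log wins.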
Distributed Transactions
Two-Phase Commit (2PC)
Two-Phase Commit Implementation
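A minimal in-process sketch of the 2PC coordinator logic. `Participant` and `two_phase_commit` are hypothetical; a real implementation needs durable logs, timeouts, and crash recovery, and the window marked in the comments is where the blocking problem discussed in Q4 arises.

```python
from __future__ import annotations

from enum import Enum


class Vote(Enum):
    YES = "yes"
    NO = "no"


class Participant:
    """Hypothetical resource manager (e.g. one database) taking part in 2PC."""

    def __init__(self, name: str, can_commit: bool = True) -> None:
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self) -> Vote:
        # Phase 1: a real participant locks resources and durably logs
        # its intent before voting YES.
        if self.can_commit:
            self.state = "prepared"
            return Vote.YES
        self.state = "aborted"
        return Vote.NO

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"


def two_phase_commit(participants: list[Participant]) -> bool:
    # Phase 1 (prepare): collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    decision = all(v is Vote.YES for v in votes)
    # A real coordinator must durably log `decision` here. If it crashes
    # now, prepared participants are stuck holding their locks.
    # Phase 2 (commit/abort): broadcast the decision.
    for p in participants:
        if decision:
            p.commit()
        else:
            p.abort()
    return decision
```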
Saga Pattern
Choreography vs Orchestration
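The orchestration style can be sketched as one central function that runs each local transaction in order and, on failure, runs the compensations of the completed steps in reverse. This is a single-process illustration with hypothetical names (`SagaStep`, `run_saga`); a production orchestrator persists saga state so it can resume after a crash, and choreography would replace the loop with services reacting to each other’s events.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]      # local transaction in one service
    compensate: Callable[[], None]  # semantic undo of that local transaction


def run_saga(steps: list[SagaStep]) -> list[str]:
    """Run steps in order; if one fails, compensate completed steps in reverse."""
    log: list[str] = []
    completed: list[SagaStep] = []
    for step in steps:
        try:
            step.action()
        except Exception as exc:
            log.append(f"{step.name} failed: {exc}")
            for done in reversed(completed):
                done.compensate()
                log.append(f"compensated {done.name}")
            return log
        completed.append(step)
        log.append(f"{step.name} ok")
    return log


def reserve_stock() -> None:
    raise RuntimeError("out of stock")  # simulate the inventory step failing


steps = [
    SagaStep("create_order", lambda: None, lambda: None),
    SagaStep("charge_card", lambda: None, lambda: None),
    SagaStep("reserve_stock", reserve_stock, lambda: None),
]
print(run_saga(steps))
# ['create_order ok', 'charge_card ok', 'reserve_stock failed: out of stock',
#  'compensated charge_card', 'compensated create_order']
```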
Handling Failures
Circuit Breaker Pattern
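A minimal single-threaded sketch of the three-state breaker. `CircuitBreaker`, `CircuitOpenError`, and their parameters are hypothetical names, not any library’s API. This version trips on consecutive failures and treats every exception as a failure, whereas production libraries such as Resilience4j typically use a sliding window of failure rates and let you exclude errors like 4xx responses.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised when the breaker is open and the call is rejected immediately."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout  # seconds to stay open
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn: Callable[[], T]) -> T:
        if self.state == "open":
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("failing fast")
            self.state = "half_open"  # let a trial request through
        try:
            result = fn()
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def _on_failure(self) -> None:
        self.failures += 1
        # A failed trial in half-open, or too many failures, opens the circuit.
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()
```

Injecting `clock` keeps the sketch testable without sleeping. Under concurrency you would also need locking around state changes, since several callers could slip through during the half-open state.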
Retry with Exponential Backoff
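A minimal sketch of retries with capped exponential backoff and “full jitter” (a random delay between zero and the exponential bound), which spreads retries from many clients out in time instead of synchronizing them. The function and its parameters are hypothetical; `sleep` and `rng` are injectable only to make the behavior testable. Only retry errors you know to be transient, and only for idempotent operations.

```python
from __future__ import annotations

import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.1,
    max_delay: float = 10.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
    rng: Callable[[], float] = random.random,
) -> T:
    """Call fn, retrying transient errors with capped exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential bound: base, 2*base, 4*base, ... capped at max_delay.
            bound = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: sleep a random fraction of the bound.
            sleep(rng() * bound)
    raise AssertionError("unreachable")
```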
Bulkhead Pattern
Isolate failures to prevent them from cascading.
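A minimal bulkhead sketch using one bounded semaphore per downstream dependency (a semaphore bulkhead rather than a dedicated thread pool). `Bulkhead` and `BulkheadFullError` are hypothetical names; `acquire_timeout` plays the role of the queue timeout mentioned in Q8, and a timeout of 0 rejects immediately when all slots are busy.

```python
from __future__ import annotations

import threading
from typing import Callable, TypeVar

T = TypeVar("T")


class BulkheadFullError(Exception):
    """Raised when a dependency's bulkhead has no free slots."""


class Bulkhead:
    """Caps concurrent calls to one downstream dependency."""

    def __init__(self, name: str, max_concurrent: int, acquire_timeout: float = 0.0) -> None:
        self.name = name
        self._slots = threading.BoundedSemaphore(max_concurrent)
        self._acquire_timeout = acquire_timeout

    def call(self, fn: Callable[[], T]) -> T:
        # Wait at most acquire_timeout for a slot, then reject instead of piling up.
        if not self._slots.acquire(timeout=self._acquire_timeout):
            raise BulkheadFullError(f"{self.name}: all slots busy")
        try:
            return fn()
        finally:
            self._slots.release()  # always free the slot, even if fn raised


# One bulkhead per dependency, sized as in the Q8 example.
payments = Bulkhead("payments", max_concurrent=5)
inventory = Bulkhead("inventory", max_concurrent=10)
notifications = Bulkhead("notifications", max_concurrent=20)
```

If the payment service hangs, at most five callers block inside `payments.call`; calls through `inventory` and `notifications` draw on their own slots and are unaffected.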
Key Takeaways
| Concept | Remember |
|---|---|
| CAP Theorem | During a network partition, choose Consistency or Availability; Partition tolerance is not optional |
| Consensus | Use Raft for leader election, state machine replication |
| Transactions | 2PC for strong consistency, Sagas for microservices |
| Failures | Design for failure: retries, circuit breakers, bulkheads |
Interview Questions
Q1: Explain the CAP theorem. Then tell me why most real-world system design decisions aren’t actually “pick two out of three.”
Strong Answer
The CAP theorem states that a distributed system can provide at most two of three guarantees simultaneously: Consistency (every read sees the most recent write), Availability (every request gets a non-error response), and Partition tolerance (the system continues to operate despite network partitions between nodes).
- Why “pick two” is misleading: Network partitions aren’t optional — they will happen in any distributed system. So the real choice is between CP (consistency + partition tolerance) and AP (availability + partition tolerance). You’re never truly choosing to give up partition tolerance, because the network will partition whether you designed for it or not.
- In practice, it’s a spectrum, not a binary: Systems like DynamoDB let you tune consistency per-request. A shopping cart read can be eventually consistent (AP), while a checkout that decrements inventory uses strongly consistent reads (CP). You’re making the CAP trade-off at the operation level, not the system level.
- The PACELC extension is more useful: It says “if there’s a Partition, choose A or C; Else, choose Latency or Consistency.” This captures what engineers actually deal with day-to-day — even when the network is fine, strong consistency adds latency (because you’re waiting for replica acknowledgment). DynamoDB is PA/EL (available during partitions, low latency otherwise). Spanner is PC/EC (consistent always, pays the latency cost via TrueTime).
- Real-world example: At a company running a global e-commerce platform, product catalog reads are AP (showing a slightly stale price for 2 seconds is fine), but the payment ledger is CP (an incorrect balance means real money loss). Same infrastructure, different CAP choices per use case.
Follow-ups:
- You’re designing a global inventory system for a flash sale with 100K concurrent users. Walk me through specifically where you’d choose CP vs AP within that single system.
- Your team just adopted CockroachDB because “it’s CP and still highly available.” How is that possible, and what are you actually giving up compared to a system like Cassandra?
Q2: What is a vector clock, and why can’t you just use wall-clock timestamps to order events in a distributed system?
Strong Answer
A vector clock is a data structure that tracks causal ordering of events across multiple nodes. Each node maintains a vector of counters (one per node in the system) and increments its own counter on every local event, including sending a message. When a node receives a message, it merges the incoming vector into its own by taking the element-wise maximum, then increments its own counter.
- Why wall clocks fail: Physical clocks on different machines are never perfectly synchronized. Even with NTP, you get clock skew of 1-10ms between machines in the same datacenter, and much worse across regions. If Node A writes at `T=100` and Node B writes at `T=101`, you can’t be sure A actually happened first; Node B’s clock might just be 2ms ahead. Google’s Spanner solves this with TrueTime (atomic clocks + GPS), but that’s a $100M+ infrastructure investment.
- What vector clocks actually give you: They establish a partial order. If event A’s vector clock is less than or equal to B’s on every component, and strictly less on at least one, A happened before B. If neither vector dominates the other, the events are concurrent, meaning there’s a genuine conflict that needs resolution. This is fundamentally different from timestamps, which impose a total order that might be wrong.
- The practical trade-off: Vector clocks grow linearly with the number of nodes. In a system with 1,000 nodes, every piece of data carries a 1,000-element vector — that’s real overhead on every message and storage operation. This is why systems like Dynamo and Riak used them but with pruning strategies, and why many systems (Cassandra, for example) opted for Last-Write-Wins with timestamps instead, accepting the rare data loss for simplicity.
- Real-world example: Amazon’s original Dynamo paper used vector clocks for their shopping cart. When concurrent updates happened (say, two users on the same account adding items simultaneously), the system could detect the conflict and merge the carts instead of silently dropping one write.
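The vector clock rules (increment on local events, element-wise max on receive, partial-order comparison) fit in a few functions. This is a sketch with hypothetical helper names using plain dicts keyed by node ID, where a missing entry counts as 0; it omits the pruning that production stores apply.

```python
from __future__ import annotations


def increment(clock: dict[str, int], node: str) -> dict[str, int]:
    """Local event (including a send) on `node`: bump its own counter."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def merge(local: dict[str, int], received: dict[str, int], node: str) -> dict[str, int]:
    """Receive on `node`: element-wise max, then count the receive as an event."""
    merged = {n: max(local.get(n, 0), received.get(n, 0))
              for n in local.keys() | received.keys()}
    return increment(merged, node)


def compare(a: dict[str, int], b: dict[str, int]) -> str:
    """Partial-order comparison of two vector clocks."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a happened before b
    if b_le_a:
        return "after"       # b happened before a
    return "concurrent"      # neither dominates: a genuine conflict


# Two sessions on the same account update a shared cart.
base = increment({}, "A")      # {'A': 1}
on_a = increment(base, "A")    # node A applies another update: {'A': 2}
on_b = merge({}, base, "B")    # node B merges base into its own update: {'A': 1, 'B': 1}
print(compare(base, on_a))     # before
print(compare(on_a, on_b))     # concurrent: merge the carts, don't drop one
```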
Follow-ups:
- If vector clocks grow too large for a 500-node cluster, what alternatives exist? How do systems like Cassandra handle this differently?
- Walk me through a concrete scenario where using Last-Write-Wins timestamps would silently lose data but vector clocks would catch the conflict.
Q3: Your system uses Raft for consensus across 5 nodes. The leader crashes. Walk me through exactly what happens next, step by step.
Strong Answer
When the Raft leader crashes, the cluster goes through a leader election process. Here’s the step-by-step sequence:
- Step 1 (detection via heartbeat timeout): Each follower has a randomized election timeout (typically 150-300ms). The leader sends heartbeats at a shorter interval (e.g., every 50-100ms). When a follower doesn’t receive a heartbeat within its timeout, it suspects the leader is dead.
- Step 2 (a follower becomes a candidate): The first follower whose timeout expires increments its `currentTerm`, transitions to candidate state, votes for itself, and sends `RequestVote` RPCs to all other nodes. The randomized timeout is critical: it makes split votes unlikely because one node almost always times out first.
- Step 3 (voting): Each node votes for at most one candidate per term, which guarantees at most one leader per term (Raft’s “election safety” property). A node also grants its vote only if the candidate’s log is at least as up-to-date as its own (compared by last log term, then by log length). This election restriction guarantees the new leader already holds every committed entry.
- Step 4 (winning the election): The candidate needs a majority (3 out of 5 nodes, counting its own vote). Once it has 3 votes, it becomes the new leader and immediately sends heartbeats to assert authority and prevent other elections.
- Step 5 (uncommitted entries): Any log entries the old leader had replicated to fewer than 3 nodes are not committed and may be overwritten. An entry from the old leader’s term that reached 3+ nodes is committed and will survive, because the election restriction ensures any new leader has it. This is why clients should only consider a write successful after the leader confirms it’s committed (replicated to a majority).
- During the election, the system is unavailable for writes — typically 150-500ms of downtime. Reads can be served from followers if your system allows stale reads; otherwise reads also block.
Follow-ups:
- What happens if the network partitions into a group of 2 and a group of 3, and the old leader is in the group of 2? Does the old leader know it’s no longer the leader?
- You’re running Raft in production and noticing frequent leader elections even though no nodes are actually crashing. What’s causing this and how do you fix it?
Q4: Explain Two-Phase Commit. What is the fundamental flaw, and what alternatives exist for distributed transactions in a microservices architecture?
Strong Answer
Two-Phase Commit (2PC) is a protocol that ensures atomicity across multiple participants in a distributed transaction. Phase 1 (Prepare): the coordinator asks each participant “can you commit?” and each participant acquires locks, writes to a redo log, and votes yes or no. Phase 2 (Commit/Abort): if all voted yes, the coordinator tells everyone to commit; if any voted no, everyone aborts.
- The fundamental flaw is the blocking problem: If the coordinator crashes after sending “prepare” but before sending the commit/abort decision, all participants are stuck holding locks indefinitely. They’ve voted yes, so they can’t unilaterally abort (another participant might have already committed). They can’t commit either (the coordinator might have decided to abort). This makes 2PC a blocking protocol — a single coordinator failure can freeze the entire system.
- In a microservices world, 2PC is usually the wrong choice because: (1) it requires all services to be available simultaneously, defeating the independence that microservices are supposed to provide; (2) holding distributed locks across services creates latency and contention; (3) the coordinator is a single point of failure unless you add complexity like 3PC or Paxos-based commit.
- The Saga pattern is the practical alternative: Instead of one big transaction, you decompose it into a sequence of local transactions. Each service does its work and publishes an event. If a step fails, you execute compensating transactions to undo previous steps. For example: Order Service creates order -> Payment Service charges card -> Inventory Service reserves stock. If inventory fails, you refund the payment and cancel the order.
- Saga trade-offs: You lose isolation (intermediate states are visible) and compensating transactions can be complex (how do you “un-send” an email?). But you gain availability, loose coupling, and independent scalability — which matters more in microservices. Uber processes millions of trips daily using sagas, not 2PC.
Follow-ups:
- You’re implementing a saga for an e-commerce checkout. The payment succeeds but the inventory reservation fails. Your compensating transaction to refund the payment also fails (the payment provider is down). What now?
- How would you ensure exactly-once semantics in a saga when messages can be delivered more than once?
Q5: You’re running a distributed key-value store with 3 replicas. A client writes a value and immediately reads it back but gets stale data. Explain why this happens and how you’d fix it.
Strong Answer
This is the classic read-after-write consistency problem in eventually consistent systems. Here’s exactly what’s happening:
- The write goes to one replica (or the leader) and returns success before all replicas are updated. If the subsequent read hits a different replica that hasn’t received the update yet, it returns stale data. With 3 replicas using async replication, the write might succeed on 1 node in 5ms while the other 2 nodes receive the update 50-200ms later.
- Fix 1 (quorum reads and writes, W + R > N): With N=3 replicas, set W=2 (a write must succeed on 2 nodes) and R=2 (a read must query 2 nodes). Since W+R=4 > N=3, the read set and the write set overlap in at least one node, so at least one response carries the latest write. The client picks the value with the highest version/timestamp. This is how Cassandra’s `QUORUM` consistency level works. DynamoDB’s strongly consistent reads take a different route: they are served by the partition’s leader replica, which is closer to Fix 3.
- Fix 2 (read-your-writes consistency, session-based): Route all of a user’s reads to the same replica that accepted their writes, typically using session affinity (sticky sessions by user ID). This is simpler than quorums and good enough for many use cases: the user sees their own writes even if other users see stale data briefly.
- Fix 3 (write to leader, read from leader): In a leader-follower setup, route both reads and writes to the leader for operations that need consistency. The downside: the leader becomes a bottleneck. This is why most systems only do this for specific critical paths, not all reads.
- What I’d actually recommend depends on the use case: For a user profile update, read-your-writes (Fix 2) is sufficient and cheap. For an inventory decrement during checkout, quorum (Fix 1) is necessary. For a social media feed, you might just accept eventual consistency because the cost of stale reads is low.
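A toy model of the W + R > N rule from Fix 1, with N=3, W=2, R=2. The `reachable` argument simulates which replicas respond. Assumptions, all simplifications: a single global version counter stands in for real per-key versions or timestamps, a write lands only on the first W reachable replicas (a real system sends to all N and returns after W acknowledge), and read repair is omitted. Names are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class Versioned:
    version: int
    value: str


class QuorumStore:
    """Toy N-replica store; `reachable` simulates which replicas answer."""

    def __init__(self, n: int = 3, w: int = 2, r: int = 2) -> None:
        if w + r <= n:
            raise ValueError("W + R must exceed N for read and write sets to overlap")
        self.n, self.w, self.r = n, w, r
        self.replicas: list[dict[str, Versioned]] = [{} for _ in range(n)]
        self.version = 0  # assumption: one global counter instead of per-key versions

    def write(self, key: str, value: str, reachable: list[int]) -> bool:
        if len(reachable) < self.w:
            return False  # write quorum unavailable
        self.version += 1
        for i in reachable[: self.w]:  # only W replicas get it, to expose staleness
            self.replicas[i][key] = Versioned(self.version, value)
        return True

    def read(self, key: str, reachable: list[int]) -> Optional[str]:
        if len(reachable) < self.r:
            raise RuntimeError("read quorum unavailable")
        answers = [self.replicas[i].get(key) for i in reachable[: self.r]]
        found = [a for a in answers if a is not None]
        # Overlap guarantees at least one answer carries the newest version.
        return max(found, key=lambda a: a.version).value if found else None


store = QuorumStore()
store.write("cart", "v1", reachable=[0, 1, 2])  # lands on replicas 0 and 1
store.write("cart", "v2", reachable=[1, 2])     # lands on 1 and 2; replica 0 is stale
print(store.read("cart", reachable=[0, 2]))     # v2: replica 2 is in both sets
```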
The W + R > N rule is bread-and-butter distributed systems knowledge.
Follow-ups:
- You’re using quorum reads/writes (W=2, R=2, N=3) and one replica goes down permanently. What happens to your read and write availability?
- A customer reports that even with quorum reads, they’re occasionally seeing stale data. What could cause this, and how would you debug it?
Q6: What is the difference between the Saga pattern’s choreography and orchestration approaches? When would you choose one over the other?
Strong Answer
Both choreography and orchestration are ways to coordinate the steps of a saga (a sequence of local transactions with compensating actions). The difference is in who controls the flow.
- Choreography (event-driven): Each service publishes domain events after completing its local transaction, and other services subscribe to those events. There’s no central controller; the flow emerges from event subscriptions. Order Service publishes `OrderCreated`, Payment Service hears it and charges the card, then publishes `PaymentCompleted`, and Inventory Service hears that and reserves stock. For compensation, services publish failure events: `PaymentFailed` triggers `OrderCancelled`.
- Orchestration (command-driven): A central Saga Orchestrator sends explicit commands to each service in sequence. The orchestrator knows the full workflow and handles branching/compensation logic: “Step 1: tell Payment to charge. Step 2: if success, tell Inventory to reserve. If Payment failed, tell Order to cancel.”
- Choose choreography when: You have 3-4 simple services, the flow is mostly linear, teams are autonomous and don’t want a central coordinator, and you already have an event bus (Kafka, SNS/SQS). It scales well organizationally because no single team owns the flow.
- Choose orchestration when: The workflow has more than 5 steps, complex branching logic (if payment is partial, do X; if international, add customs step), you need clear visibility into saga state (which step are we on? where did it fail?), or you need to add/reorder steps without touching every service. Netflix uses orchestrators for their complex content workflows.
- The hidden cost of choreography: At scale, the implicit flow becomes very hard to reason about. When something goes wrong, you’re tracing events across 6 services’ logs to reconstruct what happened. With an orchestrator, you look at one place. I’ve seen teams start with choreography for its elegance and migrate to orchestration after the third production incident where no one could figure out the sequence of events.
Follow-ups:
- You chose choreography and now have 8 services in the saga. A step in the middle fails and the compensating events create a cascade that takes 45 seconds to fully resolve. How do you improve this?
- How do you handle the case where the orchestrator itself crashes mid-saga? How does the system recover?
Q7: What is the Circuit Breaker pattern, and how does it differ from simple retry logic? Describe a production scenario where retries would make the problem worse but a circuit breaker would help.
Strong Answer
A circuit breaker monitors calls to a downstream service and “opens” (stops sending traffic) when failures exceed a threshold. It has three states: Closed (normal — requests flow through), Open (tripped — requests fail immediately without calling the downstream), and Half-Open (testing — a limited number of requests are let through to check if the service has recovered).
- How it differs from retries: Retries address transient failures — a single request that might succeed on the second attempt. A circuit breaker addresses sustained failures — when the downstream service is down or degraded and retrying will only make things worse. They’re complementary: retries inside a circuit breaker means “try a few times, but if the service is consistently failing, stop trying and fail fast.”
- The production scenario where retries are destructive: Imagine your payment service is overloaded and responding with 503s at 2-second latency. You have 10 upstream servers each sending 100 requests/second, all with retry logic (3 retries). Without a circuit breaker, each failed request generates up to 3 more attempts, so your 1,000 req/s becomes up to 4,000 req/s (the original attempt plus 3 retries) against an already overloaded service. This is a retry storm (sometimes loosely called a “thundering herd”): you’re pouring gasoline on the fire. A circuit breaker detects the failure rate, opens the circuit, and returns errors immediately to callers. The payment service gets breathing room to recover.
- Key configuration decisions: The failure threshold (too low and you trip on transient errors; too high and you’re slow to react), the timeout duration (how long to stay open before trying again), and what counts as a failure (5xx responses yes, 4xx responses no: a `400 Bad Request` means the client sent garbage, not that the service is broken).
- What most people miss is the fallback strategy: The circuit breaker is only half the pattern. The other half is what you do when it’s open. For a payment service: queue the charge for later processing. For a recommendation engine: show popular items instead. For a search service: return cached results. The fallback is where the real engineering judgment lives.
Follow-ups:
- You have circuit breakers on 5 downstream services. All 5 trip simultaneously at 3 AM. What’s your incident response playbook, and what does this correlated failure tell you about the root cause?
- Your circuit breaker is flapping, rapidly alternating between open and closed. What’s causing this and how do you tune it?
Q8: Explain the Bulkhead pattern. How does it complement the Circuit Breaker pattern, and when would you use both together?
Strong Answer
The Bulkhead pattern isolates different parts of your system into separate resource pools so that a failure in one doesn’t cascade to others. The name comes from ship design — bulkheads divide a ship’s hull into watertight compartments so a breach in one section doesn’t sink the entire vessel.
- Concrete implementation: You create separate thread pools (or connection pools, or semaphore pools) for each downstream dependency. Your payment service gets 5 concurrent slots, inventory gets 10, notifications get 20. If the payment service hangs, only those 5 threads are blocked — the other 30 threads continue serving inventory and notification requests normally. Without a bulkhead, all 35 threads could pile up waiting for the slow payment service, and your entire application becomes unresponsive.
- How it complements Circuit Breaker: A circuit breaker stops calling a failing service. A bulkhead limits the damage while the failure is being detected. They operate on different timescales: the bulkhead provides immediate isolation (it prevents thread pool exhaustion from the very first slow request), while the circuit breaker kicks in after enough failures accumulate (maybe 10-30 seconds). Together, the bulkhead contains the blast radius while the circuit breaker stops the bleeding.
- When to use both together: Almost always, in production systems with multiple downstream dependencies. Example: your API gateway talks to 4 microservices. Each gets its own bulkhead (connection pool with a max size and a queue timeout). Each also gets its own circuit breaker. If the recommendation service goes down, its bulkhead prevents the 50 pending requests from consuming all your gateway threads, and its circuit breaker prevents new requests from even trying after the failure threshold is hit. Meanwhile, search, auth, and cart services continue serving traffic without any impact.
- The configuration challenge: Setting bulkhead sizes requires understanding your traffic patterns. Too small and you reject legitimate requests during peak load. Too large and the isolation is ineffective. In practice, you start with each dependency’s peak request rate multiplied by its p99 latency (by Little’s Law, roughly the number of requests in flight at once), add 20% headroom, and tune based on production metrics. Netflix’s Hystrix library popularized this pattern, and Resilience4j carries it forward in modern Java.
Follow-ups:
- You’ve set up bulkheads per-service, but a bug in your API gateway layer means all services share the same outbound HTTP connection pool, bypassing the bulkheads. How would you detect this in production?
- Your payment bulkhead is set to 5 concurrent slots. During a Black Friday traffic spike, legitimate payment requests are being rejected because the pool is full. How do you handle this without removing the isolation benefit?
Q9: A distributed system you operate is experiencing a “split brain” scenario. What does this mean, what damage can it cause, and how do you resolve it?
Strong Answer
Split brain occurs when a network partition causes a distributed system to divide into two or more groups, each believing it’s the “real” cluster. Both sides continue operating independently — accepting writes, serving reads, potentially electing their own leaders — without knowing the other side is doing the same thing.
- The damage is real and specific: In a database cluster, split brain means two primary nodes both accepting writes. Client A writes `balance = 100` to Primary-Left while Client B writes `balance = 200` to Primary-Right. When the partition heals, you have conflicting data with no way to automatically determine which is correct. In the worst case, both sides committed financial transactions with different balances, and reconciliation is a manual nightmare. This happened to GitHub in 2018 when a brief network partition caused their MySQL primary to split, leading to data inconsistency that took hours to resolve.
- Prevention mechanisms: (1) Quorum-based systems (Raft, ZooKeeper) prevent split brain by design: a leader needs a majority to commit, so only one side of a partition can have a quorum. With 5 nodes split 2-3, only the group of 3 can elect a leader. (2) Fencing tokens (also called “epoch numbers”): when a new leader is elected, it gets a monotonically increasing token. Storage systems reject writes from tokens lower than the latest they’ve seen, so a stale leader’s writes are rejected. (3) STONITH (Shoot The Other Node In The Head): in traditional HA systems, when you suspect split brain, you forcefully power off the other node via out-of-band management (IPMI/iLO) before promoting yourself. Brutal, but effective.
- Resolution after the fact: If split brain already occurred, you need to: identify the divergence point (last common write), choose a winning side (usually the one with more confirmed transactions), replay the losing side’s writes as conflicts for manual resolution, and alert the operations team. This is why many systems prefer to become unavailable (CP) rather than risk split brain (AP with no conflict resolution).
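Fencing tokens, prevention mechanism (2) above, are simple to sketch. `LockService` and `FencedStorage` are hypothetical stand-ins for a consensus-backed lock service and a storage layer that checks tokens. The key property is that the check happens at the storage layer, so a stale leader that still believes it is in charge cannot overwrite newer data.

```python
from __future__ import annotations

import itertools


class StaleTokenError(Exception):
    """Raised when a write carries a token older than one already seen."""


class LockService:
    """Hands out a monotonically increasing fencing token per leader election."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)

    def elect_leader(self) -> int:
        return next(self._counter)


class FencedStorage:
    """Rejects writes whose token is lower than the highest token seen."""

    def __init__(self) -> None:
        self.highest_token = 0
        self.data: dict[str, int] = {}

    def write(self, token: int, key: str, value: int) -> None:
        if token < self.highest_token:
            raise StaleTokenError(f"token {token} < {self.highest_token}")
        self.highest_token = token
        self.data[key] = value


locks, storage = LockService(), FencedStorage()
old_leader = locks.elect_leader()          # token 1
storage.write(old_leader, "balance", 100)
new_leader = locks.elect_leader()          # token 2, after old leader is partitioned away
storage.write(new_leader, "balance", 200)
try:
    storage.write(old_leader, "balance", 150)  # the stale leader wakes up and writes
except StaleTokenError as exc:
    print("rejected:", exc)                # rejected: token 1 < 2
```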
Follow-ups:
- You’re running a 2-node database cluster for cost reasons (no quorum possible). How do you prevent split brain without adding a third node?
- Your ZooKeeper ensemble of 5 nodes is split 2-2-1 across three network segments. What happens? Can any side make progress?
Q10: You’re designing a system that needs to process each message exactly once, but your message broker provides at-least-once delivery. How do you achieve exactly-once semantics?
Strong Answer
True exactly-once delivery is impossible in a distributed system (proven by the Two Generals Problem). What we actually implement is “effectively exactly-once” processing by combining at-least-once delivery with idempotent processing on the consumer side.
- The core pattern is idempotency keys: Every message gets a unique identifier (UUID, or a natural key like `order_id + action`). The consumer maintains an idempotency store (a table or cache of processed message IDs). Before processing, check: “Have I seen this ID before?” If yes, skip or return the cached result. If no, process and record the ID atomically.
- The atomic part is critical: The message processing and the idempotency record must be in the same transaction. If you process the message, then crash before recording the ID, the message will be redelivered and reprocessed. The pattern is `BEGIN; INSERT INTO idempotency_log (message_id) VALUES (:message_id); <do the work>; COMMIT;`, with a unique constraint on `message_id` so that a duplicate insert fails and the whole transaction rolls back. If the work involves a different datastore than the idempotency log, you need the Outbox Pattern: write the idempotency record and a “pending work” record to the same DB in one transaction, then a separate worker picks up and forwards the pending work.
- Idempotency store considerations: Use a fast lookup store (Redis with TTL, or a database table with an index on `message_id`). Set a TTL on entries: you don’t need to remember every message forever, just long enough to cover the broker’s redelivery window (SQS retains messages for 4 days by default and up to 14 days; Kafka’s default log retention is 7 days and is configurable). At high throughput (100K+ messages/sec), the idempotency lookup itself becomes a bottleneck; use bloom filters as a first pass to avoid hitting the database for messages you’ve definitely never seen.
- Kafka’s approach: Kafka 0.11+ added idempotent producers (deduplication using sequence numbers per producer-partition pair) and transactional producers (atomic writes across multiple partitions). This gets you exactly-once within Kafka, but the moment data leaves Kafka and hits your database, you’re back to needing application-level idempotency.
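The insert-then-work-in-one-transaction pattern can be sketched with Python’s built-in `sqlite3`, using the primary key on `message_id` as the duplicate check. The table names and the account-credit “work” are hypothetical; the point is that the idempotency record and the business write commit or roll back together, so a crash between them cannot leave one without the other.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE idempotency_log (message_id TEXT PRIMARY KEY)")
    db.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
    db.execute("INSERT INTO accounts VALUES ('acct-1', 0)")
    db.commit()
    return db


def handle(db: sqlite3.Connection, message_id: str, account: str, amount: int) -> bool:
    """Apply a credit at most once per message_id; False means a duplicate."""
    try:
        with db:  # one transaction: commit on success, roll back on any error
            db.execute("INSERT INTO idempotency_log (message_id) VALUES (?)", (message_id,))
            db.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
                       (amount, account))
        return True
    except sqlite3.IntegrityError:
        # Primary-key violation: this message was already processed.
        return False


db = make_db()
print(handle(db, "msg-1", "acct-1", 50))  # True: first delivery is applied
print(handle(db, "msg-1", "acct-1", 50))  # False: redelivery is skipped
print(db.execute("SELECT balance FROM accounts").fetchone()[0])  # 50
```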
Follow-ups:
- Your idempotency store is in Redis and the Redis node crashes, losing all keys. Messages are redelivered and reprocessed. How do you make the system resilient to this?
- You’re processing 200K messages/second. Your idempotency table has 500 million rows and lookups are getting slow. How do you optimize this without losing the exactly-once guarantee?