Distributed Messaging & Event Systems
Messaging systems are the nervous system of distributed architectures, enabling asynchronous communication, decoupling, and event-driven processing.
Module Duration: 14-18 hours
Key Topics: Kafka, RabbitMQ, Event Sourcing, CQRS, Exactly-Once Semantics, Stream Processing
Interview Focus: Kafka internals, delivery guarantees, event-driven architecture
Why Messaging Systems?
┌─────────────────────────────────────────────────────────────────────────────┐
│ SYNCHRONOUS vs ASYNCHRONOUS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SYNCHRONOUS (RPC): │
│ ────────────────── │
│ Client ─────► Service A ─────► Service B ─────► Service C │
│ │◄─────────────────────────────────────────────────────│ │
│ │ │
│ Problems: │
│ • Caller blocked until all complete │
│ • One service down → entire chain fails │
│ • Tight coupling between services │
│ • Hard to add/remove services │
│ │
│ ASYNCHRONOUS (Messaging): │
│ ───────────────────────── │
│ Client ─────► Message Queue │
│ │◄─────── │ │
│ (returns) ├──────► Service A │
│ ├──────► Service B │
│ └──────► Service C │
│ │
│ Benefits: │
│ • Caller returns immediately │
│ • Services process independently │
│ • Loose coupling, easy to scale │
│ • Built-in retry, replay capability │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Message Queue Patterns
Point-to-Point (Work Queue)
┌─────────────────────────────────────────────────────────────────────────────┐
│ POINT-TO-POINT (WORK QUEUE) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Each message consumed by exactly one consumer. │
│ │
│ ┌─────────────────┐ │
│ │ QUEUE │ │
│ │ │ │
│ Producer ─────────►│ [M1][M2][M3][M4]│ │
│ │ │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ Consumer A Consumer B Consumer C │
│ M1 M2 M3 │
│ │
│ USE CASES: │
│ ────────── │
│ • Task distribution (processing jobs) │
│ • Load balancing work across workers │
│ • Order processing, email sending │
│ │
│ ACKNOWLEDGMENT: │
│ ─────────────── │
│ Consumer receives message → processes → ACKs │
│ If consumer dies before ACK → message redelivered │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
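A minimal work-queue sketch using RabbitMQ's Python client (pika); the queue name task_queue and the handler body are illustrative. With prefetch_count=1 each idle worker pulls the next message, and an unacked message is redelivered if the worker dies.

```python
import pika

# Producer: publish a persistent task to a shared work queue
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
channel.queue_declare(queue='task_queue', durable=True)       # queue survives broker restart
channel.basic_publish(
    exchange='',                                              # default exchange -> routed by queue name
    routing_key='task_queue',
    body=b'process order 42',
    properties=pika.BasicProperties(delivery_mode=2),         # persistent message
)

# Consumer: run several copies of this to load-balance the work
def handle(ch, method, properties, body):
    print('working on', body)
    ch.basic_ack(delivery_tag=method.delivery_tag)            # ACK only after successful processing

channel.basic_qos(prefetch_count=1)                           # at most one unacked message per worker
channel.basic_consume(queue='task_queue', on_message_callback=handle)
channel.start_consuming()
```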
Publish-Subscribe (Fan-out)
┌─────────────────────────────────────────────────────────────────────────────┐
│ PUBLISH-SUBSCRIBE (FAN-OUT) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Each message delivered to ALL subscribers. │
│ │
│ ┌─────────────────┐ │
│ │ TOPIC │ │
│ │ │ │
│ Publisher ────────►│ Message │ │
│ │ │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │Subscriber │ │Subscriber │ │Subscriber │ │
│ │ (Email) │ │ (Analytics│ │ (Audit) │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
│ All three receive the same message │
│ │
│ USE CASES: │
│ ────────── │
│ • Event broadcasting │
│ • Real-time updates │
│ • Logging, monitoring, analytics │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
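A fan-out sketch with pika, the shape RabbitMQ uses for pub-sub: each subscriber binds its own (here broker-named, exclusive) queue to a fanout exchange, so every subscriber gets its own copy of each message. The exchange name notifications is illustrative.

```python
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
channel.exchange_declare(exchange='notifications', exchange_type='fanout')

# Publisher: every queue bound to the exchange receives a copy
channel.basic_publish(exchange='notifications', routing_key='', body=b'order 42 placed')

# Subscriber: bind a private, broker-named queue to the fanout exchange
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='notifications', queue=queue_name)

channel.basic_consume(
    queue=queue_name,
    on_message_callback=lambda ch, method, props, body: print('got', body),
    auto_ack=True,
)
channel.start_consuming()
```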
Consumer Groups (Kafka Style)
┌─────────────────────────────────────────────────────────────────────────────┐
│ CONSUMER GROUPS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Combines work queue (within group) and pub-sub (across groups). │
│ │
│ ┌─────────────────────────────┐ │
│ │ TOPIC (4 partitions) │ │
│ │ ┌────┬────┬────┬────┐ │ │
│ │ │ P0 │ P1 │ P2 │ P3 │ │ │
│ │ └────┴────┴────┴────┘ │ │
│ └─────────────┬───────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │
│ │ Consumer Group A │ │ Consumer Group B │ │ Consumer Group C │ │
│ │ ┌───┐ ┌───┐ │ │ ┌───┐ │ │ ┌───┐ │ │
│ │ │C1 │ │C2 │ │ │ │C1 │ │ │ │C1 │ │ │
│ │ │P0 │ │P1 │ │ │ │P0 │ │ │ │P0 │ │ │
│ │ │P2 │ │P3 │ │ │ │P1 │ │ │ │P1 │ │ │
│ │ └───┘ └───┘ │ │ │P2 │ │ │ │P2 │ │ │
│ │ │ │ │P3 │ │ │ │P3 │ │ │
│ └───────────────────┘ │ └───┘ │ │ └───┘ │ │
│ └───────────────────┘ └───────────────────┘ │
│ │
│ RULES: │
│ ────── │
│ • Each partition → one consumer in a group │
│ • Multiple groups → each gets all messages │
│ • Consumer count > partition count → some idle │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
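A sketch of those rules with kafka-python; the topic and group names are illustrative. Run several copies with the same group_id to split the partitions (work-queue behaviour), or a copy with a different group_id to get an independent full feed (pub-sub across groups).

```python
from kafka import KafkaConsumer

# Same group_id across processes  -> partitions are divided between them.
# Different group_id              -> that consumer independently reads every message.
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics',
)

for msg in consumer:
    print(f'partition={msg.partition} offset={msg.offset} value={msg.value!r}')
```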
Apache Kafka Deep Dive
Must-Know: Kafka is the de facto standard for event streaming. Expect detailed questions about partitions, replication, and exactly-once semantics.
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ PRODUCERS │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────────┐│
│ │ KAFKA CLUSTER ││
│ │ ││
│ │ ┌─────────────────────────────────────────────────────────────────┐ ││
│ │ │ TOPIC: orders │ ││
│ │ │ │ ││
│ │ │ Partition 0 Partition 1 Partition 2 Partition 3 │ ││
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ ││
│ │ │ │ Broker 1│ │ Broker 2│ │ Broker 3│ │ Broker 1│ │ ││
│ │ │ │ (Leader)│ │ (Leader)│ │ (Leader)│ │ (Leader)│ │ ││
│ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ ││
│ │ │ │ │ │ │ │ ││
│ │ │ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │ ││
│ │ │ │ Broker 2│ │ Broker 3│ │ Broker 1│ │ Broker 2│ │ ││
│ │ │ │(Replica)│ │(Replica)│ │(Replica)│ │(Replica)│ │ ││
│ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ ││
│ │ │ │ │ │ │ │ ││
│ │ │ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │ ││
│ │ │ │ Broker 3│ │ Broker 1│ │ Broker 2│ │ Broker 3│ │ ││
│ │ │ │(Replica)│ │(Replica)│ │(Replica)│ │(Replica)│ │ ││
│ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ ││
│ │ │ │ ││
│ │ │ Replication Factor = 3 │ ││
│ │ └─────────────────────────────────────────────────────────────────┘ ││
│ │ ││
│ │ ┌──────────────────────────────────────────────────────┐ ││
│ │ │ ZOOKEEPER / KRAFT │ ││
│ │ │ • Cluster membership │ ││
│ │ │ • Leader election │ ││
│ │ │ • Topic/partition metadata │ ││
│ │ └──────────────────────────────────────────────────────┘ ││
│ └────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ CONSUMERS │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
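To mirror the layout above (topic orders, 4 partitions, replication factor 3), the topic could be created with kafka-python's admin client; a sketch that assumes a reachable three-broker cluster.

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Assumes 3 brokers are available; with fewer, replication_factor=3 is rejected.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics(new_topics=[
    NewTopic(name='orders', num_partitions=4, replication_factor=3)
])
admin.close()
```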
Partition Internals
┌─────────────────────────────────────────────────────────────────────────────┐
│ PARTITION STRUCTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PARTITION = Ordered, immutable sequence of records │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PARTITION 0 │ │
│ │ │ │
│ │ Offset: 0 1 2 3 4 5 6 7 8 9 ... │ │
│ │ ┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┐ │ │
│ │ │ M0 │ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ M8 │ M9 │ │ │
│ │ └────┴────┴────┴────┴────┴────┴────┴────┴────┴────┘ │ │
│ │ │ │
│ │ ▲ ▲ ▲ │ │
│ │ │ │ │ │ │
│ │ Log Start Consumer High-Water │ │
│ │ (retention) Offset Mark │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ SEGMENTS: │
│ ───────── │
│ Partition split into segment files for efficient cleanup │
│ │
│ /kafka-logs/topic-partition-0/ │
│ ├── 00000000000000000000.log (segment 1: offsets 0-999) │
│ ├── 00000000000000000000.index (offset → position) │
│ ├── 00000000000000001000.log (segment 2: offsets 1000-1999) │
│ ├── 00000000000000001000.index │
│ └── ... │
│ │
│ RETENTION: │
│ ────────── │
│ • Time-based: Delete segments older than X days │
│ • Size-based: Delete when partition exceeds X GB │
│ • Compaction: Keep only latest value per key │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
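The log-start offset and high-water mark from the diagram are visible from any client; a small kafka-python sketch (topic and partition are illustrative), which also shows that replay is just a seek.

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
tp = TopicPartition('orders', 0)
consumer.assign([tp])

start = consumer.beginning_offsets([tp])[tp]   # log start (after retention cleanup)
end = consumer.end_offsets([tp])[tp]           # high-water mark (next offset to be written)
print(f'partition 0 currently holds offsets {start}..{end - 1}')

# Replay is just a seek: reposition the consumer anywhere in that range
consumer.seek(tp, start)
```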
Producer Delivery Guarantees
┌─────────────────────────────────────────────────────────────────────────────┐
│ PRODUCER ACKNOWLEDGMENTS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ACKS = 0 (Fire and forget) │
│ ───────────────────────── │
│ Producer ───► Broker │
│ (returns immediately, doesn't wait for ACK) │
│ │
│ • Fastest, lowest latency │
│ • No delivery guarantee │
│ • Use for: Metrics, logs where loss is acceptable │
│ │
│ ACKS = 1 (Leader acknowledgment) │
│ ──────────────────────────────── │
│ Producer ───► Leader ───► ACK │
│ (returns after leader writes) │
│ │
│ • Good balance of speed and safety │
│ • Leader crash before replication = data loss │
│ • Default setting for most use cases │
│ │
│ ACKS = -1/ALL (All ISR acknowledgment) │
│ ────────────────────────────────────── │
│ Producer ───► Leader ───► Replicas (all ISR) ───► ACK │
│ (returns after all in-sync replicas have it) │
│ │
│ • Strongest durability guarantee │
│ • Higher latency │
│ • Combined with min.insync.replicas for safety │
│ │
│ IDEMPOTENT PRODUCER: │
│ ──────────────────── │
│ enable.idempotence = true │
│ • Producer assigns sequence number to each message │
│ • Broker rejects duplicates │
│ • Exactly-once within a partition │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
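A hedged kafka-python sketch of the strongest setting above: acks='all' plus retries, intended to be paired with the topic-level min.insync.replicas mentioned in the diagram. Topic name and payload are illustrative.

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',   # wait for all in-sync replicas (use 0 or 1 for the weaker modes)
    retries=5,    # retry transient broker errors
)

# .get() blocks until the broker acknowledges, or raises on failure
metadata = producer.send('orders', value=b'{"order_id": 42}').get(timeout=10)
print(f'written to partition {metadata.partition} at offset {metadata.offset}')
producer.flush()
```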
Consumer Offset Management
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata
import logging

log = logging.getLogger(__name__)

# Manual offset management
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    enable_auto_commit=False,  # Manual commit
    auto_offset_reset='earliest'
)

for message in consumer:
    try:
        # Process message
        process_order(message.value)

        # Commit offset AFTER successful processing
        consumer.commit({
            TopicPartition(message.topic, message.partition):
                OffsetAndMetadata(message.offset + 1, None)
        })
    except Exception as e:
        # Don't commit - message will be redelivered
        log.error(f"Failed to process: {e}")
        # Optionally: seek back and retry
        consumer.seek(
            TopicPartition(message.topic, message.partition),
            message.offset
        )

# At-least-once pattern (process → commit)
#   Problem: if crash after process, before commit → redelivery
# At-most-once pattern (commit → process)
#   Problem: if crash after commit, before process → message lost
# Exactly-once pattern
#   Solution: transactional consumers with idempotent processing
Kafka Transactions (Exactly-Once)
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA TRANSACTIONS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TRANSACTIONAL PRODUCER: │
│ ──────────────────────── │
│ Atomically write to multiple partitions │
│ │
│ ```python │
│ producer = KafkaProducer( │
│ bootstrap_servers=['localhost:9092'], │
│ transactional_id='my-producer' │
│ ) │
│ │
│ producer.init_transactions() │
│ │
│ producer.begin_transaction() │
│ try: │
│ producer.send('topic-a', value=b'message1') │
│ producer.send('topic-b', value=b'message2') │
│ producer.commit_transaction() │
│ except: │
│ producer.abort_transaction() │
│ ``` │
│ │
│ READ_COMMITTED CONSUMERS: │
│ ────────────────────────── │
│ Only see committed transactional messages │
│ │
│ isolation.level = "read_committed" │
│ │
│ CONSUME-TRANSFORM-PRODUCE: │
│ ─────────────────────────── │
│ Exactly-once stream processing │
│ │
│ Input ───► Process ───► Output │
│ │ │
│ └──► Offset Commit │
│ │
│ All three operations in ONE transaction! │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
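A sketch of the consume-transform-produce loop using the confluent-kafka client, whose Python API exposes the transactional calls shown here; topic names, the group id, and the transform are illustrative.

```python
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'etl',
    'isolation.level': 'read_committed',   # only see committed transactional messages
    'enable.auto.commit': False,
})
consumer.subscribe(['input'])

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'etl-1',
})
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    try:
        producer.produce('output', value=msg.value().upper())   # the "transform"
        # Commit the consumer offsets inside the SAME transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
```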
Event Sourcing
Pattern Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ EVENT SOURCING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TRADITIONAL (State-based): │
│ ─────────────────────────── │
│ Store CURRENT state only │
│ │
│ Account { id: 123, balance: 500 } │
│ │
│ Problem: No history, hard to debug, can't replay │
│ │
│ EVENT SOURCING: │
│ ─────────────── │
│ Store sequence of EVENTS that led to current state │
│ │
│ Event 1: AccountCreated { id: 123, owner: "Alice" } │
│ Event 2: MoneyDeposited { id: 123, amount: 1000 } │
│ Event 3: MoneyWithdrawn { id: 123, amount: 300 } │
│ Event 4: MoneyWithdrawn { id: 123, amount: 200 } │
│ │
│ Current state = replay(events) → balance: 500 │
│ │
│ BENEFITS: │
│ ───────── │
│ ✓ Complete audit trail │
│ ✓ Can rebuild state at any point │
│ ✓ Debug by replaying events │
│ ✓ Temporal queries (what was state on Jan 1?) │
│ ✓ Easy to add new projections │
│ │
│ CHALLENGES: │
│ ─────────── │
│ ✗ Schema evolution for events │
│ ✗ Replay can be slow │
│ ✗ Eventual consistency │
│ ✗ More complex than CRUD │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Event Store Implementation
from dataclasses import dataclass
from datetime import datetime
from typing import List
import json


class ConcurrencyError(Exception):
    """Raised when an aggregate was modified concurrently."""


@dataclass
class Event:
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int


# Domain events (simplified; the store serializes them into Event rows)
@dataclass
class AccountCreated:
    id: str
    owner: str

@dataclass
class MoneyDeposited:
    id: str
    amount: float

@dataclass
class MoneyWithdrawn:
    id: str
    amount: float

@dataclass
class AccountClosed:
    id: str


class EventStore:
    """
    Simple event store backed by a database.
    Production: Use EventStoreDB, Kafka, or DynamoDB Streams.
    (_get_version, _publish, _row_to_event are persistence helpers elided here.)
    """
    def __init__(self, db):
        self.db = db

    def append(self, aggregate_id: str, events: List[Event], expected_version: int):
        """
        Append events with optimistic concurrency control.
        """
        # Check current version
        current_version = self._get_version(aggregate_id)
        if current_version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, got {current_version}"
            )

        # Append events
        for i, event in enumerate(events):
            event.version = expected_version + i + 1
            self.db.execute(
                """
                INSERT INTO events (aggregate_id, event_type, data, timestamp, version)
                VALUES (?, ?, ?, ?, ?)
                """,
                (aggregate_id, event.event_type, json.dumps(event.data),
                 event.timestamp, event.version)
            )

        # Publish to subscribers
        self._publish(events)

    def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
        """Get all events for an aggregate from a given version."""
        rows = self.db.execute(
            """
            SELECT * FROM events
            WHERE aggregate_id = ? AND version > ?
            ORDER BY version
            """,
            (aggregate_id, from_version)
        )
        return [self._row_to_event(row) for row in rows]


class BankAccount:
    """
    Event-sourced aggregate.
    """
    def __init__(self, account_id: str):
        self.id = account_id
        self.balance = 0
        self.is_closed = False
        self.version = 0
        self._pending_events = []

    # Command handlers
    def deposit(self, amount: float):
        if self.is_closed:
            raise ValueError("Account is closed")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._apply(MoneyDeposited(self.id, amount))

    def withdraw(self, amount: float):
        if self.is_closed:
            raise ValueError("Account is closed")
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self._apply(MoneyWithdrawn(self.id, amount))

    # Event handlers
    def _apply(self, event):
        self._pending_events.append(event)
        self._handle(event)

    def _handle(self, event):
        if isinstance(event, AccountCreated):
            self.balance = 0
        elif isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawn):
            self.balance -= event.amount
        elif isinstance(event, AccountClosed):
            self.is_closed = True
        self.version += 1

    # Rehydration
    @classmethod
    def from_events(cls, account_id: str, events: List[Event]) -> 'BankAccount':
        account = cls(account_id)
        for event in events:
            account._handle(event)
        return account


# Usage
def transfer_money(from_id: str, to_id: str, amount: float, event_store: EventStore):
    """
    Saga pattern for cross-aggregate operation.
    """
    from_events = event_store.get_events(from_id)
    from_account = BankAccount.from_events(from_id, from_events)

    to_events = event_store.get_events(to_id)
    to_account = BankAccount.from_events(to_id, to_events)

    try:
        from_account.withdraw(amount)
        to_account.deposit(amount)

        # Save both in a transaction (or use a saga with compensation).
        # expected_version = the version each aggregate had when loaded,
        # i.e. before the pending events were applied.
        event_store.append(from_id, from_account._pending_events,
                           from_account.version - len(from_account._pending_events))
        event_store.append(to_id, to_account._pending_events,
                           to_account.version - len(to_account._pending_events))
    except Exception:
        # Compensation logic if needed
        raise
CQRS (Command Query Responsibility Segregation)
┌─────────────────────────────────────────────────────────────────────────────┐
│ CQRS PATTERN │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TRADITIONAL: │
│ ──────────── │
│ Same model for reads and writes │
│ │
│ API ───► Service ───► Database │
│ │ ▲ │
│ └───────────────────────┘ │
│ │
│ CQRS: │
│ ───── │
│ Separate models for commands (writes) and queries (reads) │
│ │
│ WRITE PATH READ PATH │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Commands │ │ Queries │ │
│ │ (Create, Update) │ │ (List, Search) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Command Handler │ │ Query Handler │ │
│ │ (Domain Logic) │ │ (Simple reads) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Write Model │────────────►│ Read Model │ │
│ │ (Normalized) │ Events/ │ (Denormalized) │ │
│ │ │ Projections│ │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
│ BENEFITS: │
│ ───────── │
│ • Optimize read/write models independently │
│ • Scale reads and writes separately │
│ • Different storage for each (SQL writes, Elasticsearch reads) │
│ • Complex queries without affecting write performance │
│ │
│ PROJECTIONS (Event → Read Model): │
│ ────────────────────────────────── │
│ Events from write model → Update denormalized read model │
│ Example: OrderPlaced → Update UserOrderHistory, ProductSalesCount │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Projection Example
class OrderProjection:
    """
    Builds read model from order events.
    """
    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    def handle(self, event):
        """Route event to appropriate handler."""
        handler = getattr(self, f'on_{event.event_type}', None)
        if handler:
            handler(event)

    def on_OrderPlaced(self, event):
        # Insert into orders read model
        self.read_db.execute(
            """
            INSERT INTO order_summaries (id, user_id, total, status, created_at)
            VALUES (?, ?, ?, 'placed', ?)
            """,
            (event.data['order_id'], event.data['user_id'],
             event.data['total'], event.timestamp)
        )
        # Update user statistics
        self.read_db.execute(
            """
            UPDATE user_stats
            SET order_count = order_count + 1,
                total_spent = total_spent + ?
            WHERE user_id = ?
            """,
            (event.data['total'], event.data['user_id'])
        )

    def on_OrderShipped(self, event):
        self.read_db.execute(
            """
            UPDATE order_summaries
            SET status = 'shipped', shipped_at = ?
            WHERE id = ?
            """,
            (event.timestamp, event.data['order_id'])
        )

    def rebuild(self):
        """Rebuild entire read model from events."""
        self.read_db.execute("TRUNCATE order_summaries")
        self.read_db.execute("TRUNCATE user_stats")
        for event in self.event_store.get_all_events():
            self.handle(event)
Message Queue Comparison
┌─────────────────────────────────────────────────────────────────────────────┐
│ MESSAGE QUEUE COMPARISON │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────┬─────────────┬─────────────┬─────────────┬─────────────┐ │
│ │ Feature │ Kafka │ RabbitMQ │ AWS SQS │ Pulsar │ │
│ ├────────────┼─────────────┼─────────────┼─────────────┼─────────────┤ │
│ │ Model │ Log │ Queue │ Queue │ Log │ │
│ │ Ordering │ Per-partition│ Per-queue │ Best-effort │ Per-partition│ │
│ │ Retention │ Configurable│ Until ACK │ 14 days max │ Configurable│ │
│ │ Replay │ Yes │ No │ No │ Yes │ │
│ │ Throughput │ Very high │ Moderate │ High │ Very high │ │
│ │ Latency │ Low │ Very low │ Moderate │ Low │ │
│ │ Exactly-once│ Yes (EOS) │ No │ No │ Yes │ │
│ │ Multi-tenancy│ Topics │ vHosts │ Queues │ Tenants │ │
│ └────────────┴─────────────┴─────────────┴─────────────┴─────────────┘ │
│ │
│ WHEN TO USE: │
│ ───────────── │
│ │
│ KAFKA: │
│ • Event streaming, log aggregation │
│ • High throughput (millions/sec) │
│ • Need replay capability │
│ • Event sourcing, CQRS │
│ │
│ RABBITMQ: │
│ • Complex routing (exchanges, bindings) │
│ • Low latency messaging │
│ • RPC patterns │
│ • Traditional message queue patterns │
│ │
│ SQS: │
│ • AWS-native workloads │
│ • Fully managed, no operations │
│ • Simple queue patterns │
│ • Lambda triggers │
│ │
│ PULSAR: │
│ • Multi-tenancy requirements │
│ • Geo-replication │
│ • Unified queuing and streaming │
│ • Functions (serverless) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Dead Letter Queues
┌─────────────────────────────────────────────────────────────────────────────┐
│ DEAD LETTER QUEUES (DLQ) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM: What happens to messages that can't be processed? │
│ │
│ Without DLQ: │
│ • Poison message blocks queue │
│ • Infinite retry loop │
│ • Messages lost if discarded │
│ │
│ With DLQ: │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Main Queue ───► Consumer ──┬──► Success │ │
│ │ │ │ │
│ │ Retry │ Failure (after N retries) │ │
│ │ ↓ │ │ │
│ │ ┌──┴──────▼────────────┐ │ │
│ │ │ Dead Letter Queue │ │ │
│ │ │ │ │ │
│ │ │ Failed messages for │ │ │
│ │ │ inspection/replay │ │ │
│ │ └──────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
│ IMPLEMENTATION: │
│ ─────────────── │
│ │
│ ```python │
│ MAX_RETRIES = 3 │
│ │
│ def process_message(message): │
│ retry_count = message.headers.get('retry_count', 0) │
│ │
│ try: │
│ handle(message) │
│ ack(message) │
│ except TransientError: │
│ if retry_count < MAX_RETRIES: │
│ requeue_with_delay(message, retry_count + 1) │
│ else: │
│ send_to_dlq(message) │
│ except PermanentError: │
│ send_to_dlq(message) # Don't retry │
│ ``` │
│ │
│ DLQ HANDLING: │
│ ───────────── │
│ • Manual inspection and fix │
│ • Automated replay after fix deployed │
│ • Alerts for DLQ depth │
│ • Retention policy for old messages │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
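The retry/DLQ flow above, sketched for Kafka with kafka-python: the retry count travels as a message header, and exhausted or permanently failing messages are published to a dead-letter topic. The topic names and the is_transient classifier are assumptions for illustration.

```python
from kafka import KafkaProducer

MAX_RETRIES = 3
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def handle_failure(message, error):
    """Route a failed ConsumerRecord to a retry topic or the DLQ."""
    headers = dict(message.headers or [])
    retry_count = int(headers.get('retry_count', b'0'))

    if retry_count < MAX_RETRIES and is_transient(error):   # is_transient: your own error classifier
        # Requeue on a retry topic with an incremented counter
        producer.send('orders.retry', value=message.value,
                      headers=[('retry_count', str(retry_count + 1).encode())])
    else:
        # Park it on the dead-letter topic for inspection/replay
        producer.send('orders.dlq', value=message.value,
                      headers=[('error', str(error).encode())])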
Interview Practice
Q1: Design a notification system
Question: Design a system that sends notifications (push, email, SMS) for an e-commerce platform.

Design:
Architecture:
Event Sources ───► Kafka ───► Notification Service ───► Channels
(orders, user) (router, templates) (Push/Email/SMS)
Components:
1. Event Ingestion
- Kafka topic: notifications.events
- Events: OrderPlaced, OrderShipped, PriceDropped, etc.
2. Notification Router
- Reads events from Kafka
- Applies user preferences
- Routes to appropriate channel
3. Channel Workers
- Separate consumer groups per channel
- Push: Firebase/APNS
- Email: SES/SendGrid
- SMS: Twilio
4. Template Service
- Render message from event + template
- i18n support
5. Rate Limiting
- Don't spam users
- Per-user, per-channel limits
6. Dead Letter Queue
- Failed deliveries for retry
- Alerting on high DLQ depth
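A hedged sketch of component 2 (the notification router) with kafka-python; the topic names, event shape, and the get_user_preferences lookup are assumptions for illustration.

```python
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    'notifications.events',
    bootstrap_servers=['localhost:9092'],
    group_id='notification-router',
    value_deserializer=lambda b: json.loads(b),
)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda d: json.dumps(d).encode(),
)

for msg in consumer:
    event = msg.value
    channels = get_user_preferences(event['user_id'])   # hypothetical lookup, e.g. ['push', 'email']
    for channel in channels:
        # Key by user_id so one user's notifications stay ordered within each channel topic
        producer.send(f'notifications.{channel}',
                      key=str(event['user_id']).encode(),
                      value=event)
```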
Q2: Kafka vs RabbitMQ
Question: When would you choose Kafka over RabbitMQ?

Choose Kafka when:
- High throughput needed (100K+ msg/sec)
- Need to replay messages
- Event sourcing / audit log
- Stream processing (Kafka Streams, ksqlDB)
- Multiple consumers reading same messages
- Long-term message retention
Choose RabbitMQ when:
- Complex routing needed (fanout, topic, headers)
- Low latency is critical (< 1ms)
- Need RPC patterns (request/reply)
- Traditional work queues
- Smaller scale, simpler operations
- Need message priority
Q3: Exactly-once processing
Question: How do you achieve exactly-once message processing?

Answer: True exactly-once delivery is impossible (Two Generals Problem). We achieve "effectively exactly-once" processing in one of four ways:

Option 1: Idempotent Consumer
def process(message):
    if already_processed(message.id):
        return  # Deduplicate
    with transaction:
        do_work(message)
        mark_processed(message.id)
Option 2: Kafka Transactions (EOS)
- Producer: enable.idempotence = true
- Consumer: isolation.level = read_committed
- Consume-transform-produce in a single transaction
Option 3: Outbox Pattern
1. Write to DB + outbox table (same transaction)
2. Separate process reads outbox, publishes to Kafka
3. Delete from outbox after publish confirmed
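A hedged sketch of Option 3: the business row and the outbox row commit in one local database transaction (sqlite3 here for brevity), and a separate relay publishes each outbox row to Kafka and deletes it only after the broker acknowledges. Table and topic names are illustrative.

```python
import json
import sqlite3
from kafka import KafkaProducer

db = sqlite3.connect('app.db')
db.executescript("""
    CREATE TABLE IF NOT EXISTS orders (id TEXT PRIMARY KEY, total REAL);
    CREATE TABLE IF NOT EXISTS outbox (event_type TEXT, payload TEXT);
""")
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def place_order(order):
    # 1. Business row and outbox row are committed atomically
    with db:   # one local transaction
        db.execute("INSERT INTO orders (id, total) VALUES (?, ?)",
                   (order['id'], order['total']))
        db.execute("INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
                   ('OrderPlaced', json.dumps(order)))

def relay_outbox():
    # 2. Separate process: read the outbox and publish each row...
    rows = db.execute("SELECT rowid, event_type, payload FROM outbox ORDER BY rowid").fetchall()
    for rowid, event_type, payload in rows:
        producer.send('orders.events',
                      key=event_type.encode(),
                      value=payload.encode()).get(timeout=10)
        # 3. ...and delete it only after the publish is confirmed
        with db:
            db.execute("DELETE FROM outbox WHERE rowid = ?", (rowid,))
```

If the relay crashes between publish and delete, the row is published again on restart, so downstream consumers should still be idempotent.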
Option 4: Change Data Capture (CDC)
- Debezium reads the DB transaction log
- Publishes changes to Kafka
- Inherently ordered; pair with idempotent consumers for effectively exactly-once
Key Takeaways
Kafka is a Log, Not a Queue
Messages persist, can be replayed, and multiple consumers read independently.
Partitions Enable Parallelism
More partitions = more consumers = higher throughput. Choose the partition key wisely (see the keyed-producer sketch below).
Event Sourcing Provides Audit Trail
Store events, derive state. Enables debugging, replay, and temporal queries.
Exactly-Once = Idempotency
At-least-once delivery + idempotent processing = effectively exactly-once.
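A keyed-producer sketch (kafka-python) for the "choose the partition key wisely" takeaway: messages with the same key always hash to the same partition, which preserves per-key ordering, while a low-cardinality or skewed key would create hot partitions. Topic, key, and events are illustrative.

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# All events for customer 42 land on the same partition, so a consumer of
# that partition sees them in order. A skewed key (e.g. country code)
# would concentrate traffic on a few partitions instead.
for event in (b'cart_created', b'item_added', b'checkout'):
    producer.send('orders', key=b'customer-42', value=event)
producer.flush()
```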