Introduction
Event-driven architecture (EDA) is the backbone of modern distributed systems. From real-time notifications to complex business workflows, understanding EDA is crucial for system design interviews.
Why This Matters: Companies like Netflix, Uber, and LinkedIn process billions of events daily. Understanding event-driven patterns shows you can design scalable, decoupled systems.
Core Concepts
Events vs Commands vs Queries
┌─────────────────────────────────────────────────────────────┐
│                        Message Types                        │
├─────────────────┬───────────────────┬───────────────────────┤
│ Event           │ Command           │ Query                 │
├─────────────────┼───────────────────┼───────────────────────┤
│ "OrderPlaced"   │ "PlaceOrder"      │ "GetOrder"            │
│ Past tense      │ Imperative        │ Question              │
│ Immutable       │ Intent to act     │ Request for data      │
│ Broadcast       │ Point-to-point    │ Point-to-point        │
│ No response     │ May have response │ Expects response      │
│ Fire & forget   │ Expects handling  │ Synchronous           │
└─────────────────┴───────────────────┴───────────────────────┘
Event Anatomy
Python:
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional
import uuid


@dataclass
class Event:
    """
    Base event structure loosely modeled on the CloudEvents specification.
    """
    # Required fields
    event_type: str        # e.g., "order.placed"
    source: str            # e.g., "order-service"
    data: Dict[str, Any]   # Event payload
    # Auto-generated fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Optional metadata
    correlation_id: Optional[str] = None   # For tracing
    causation_id: Optional[str] = None     # What caused this event
    version: str = "1.0"                   # Schema version

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "type": self.event_type,
            "source": self.source,
            "time": self.timestamp.isoformat(),
            "data": self.data,
            "correlation_id": self.correlation_id,
            "causation_id": self.causation_id,
            "dataversion": self.version
        }


# Example domain events
@dataclass
class OrderPlacedEvent(Event):
    # The explicit __init__ below is kept; @dataclass does not overwrite it.
    def __init__(self, order_id: str, customer_id: str,
                 items: list, total: float, **kwargs):
        super().__init__(
            event_type="order.placed",
            source="order-service",
            data={
                "order_id": order_id,
                "customer_id": customer_id,
                "items": items,
                "total": total,
                "currency": "USD"
            },
            **kwargs
        )


@dataclass
class PaymentProcessedEvent(Event):
    def __init__(self, order_id: str, payment_id: str,
                 amount: float, status: str, **kwargs):
        super().__init__(
            event_type="payment.processed",
            source="payment-service",
            data={
                "order_id": order_id,
                "payment_id": payment_id,
                "amount": amount,
                "status": status
            },
            **kwargs
        )
JavaScript:
const { v4: uuidv4 } = require('uuid');

class Event {
  /**
   * Base event structure loosely modeled on the CloudEvents specification.
   */
  constructor({
    eventType,
    source,
    data,
    correlationId = null,
    causationId = null,
    version = '1.0'
  }) {
    // Required and auto-generated fields
    this.id = uuidv4();
    this.type = eventType;
    this.source = source;
    this.time = new Date().toISOString();
    this.data = data;
    // Optional metadata
    this.correlationId = correlationId;
    this.causationId = causationId;
    this.dataversion = version;
  }

  toJSON() {
    return {
      id: this.id,
      type: this.type,
      source: this.source,
      time: this.time,
      data: this.data,
      correlationId: this.correlationId,
      causationId: this.causationId,
      dataversion: this.dataversion
    };
  }
}

// Domain events
class OrderPlacedEvent extends Event {
  constructor({ orderId, customerId, items, total, ...options }) {
    super({
      eventType: 'order.placed',
      source: 'order-service',
      data: {
        orderId,
        customerId,
        items,
        total,
        currency: 'USD'
      },
      ...options
    });
  }
}

class PaymentProcessedEvent extends Event {
  constructor({ orderId, paymentId, amount, status, ...options }) {
    super({
      eventType: 'payment.processed',
      source: 'payment-service',
      data: {
        orderId,
        paymentId,
        amount,
        status
      },
      ...options
    });
  }
}

module.exports = { Event, OrderPlacedEvent, PaymentProcessedEvent };
Event-Driven Patterns
1. Event Notification
                  Event Published
                         │
     ┌───────────────────┼───────────────────┐
     │                   │                   │
     ▼                   ▼                   ▼
┌──────────┐        ┌──────────┐        ┌──────────┐
│Service A │        │Service B │        │Service C │
│          │        │          │        │          │
│ Reacts   │        │ Reacts   │        │ Ignores  │
└──────────┘        └──────────┘        └──────────┘
Use Case: Loose coupling, simple notifications
Example: "UserRegistered" → Email service sends welcome email
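The notification flow above can be sketched with a tiny in-process bus. This is only an illustration under stated assumptions: `InMemoryEventBus`, the `user.registered` type string, and the payload fields are hypothetical names, and a real system would publish through a message broker rather than call handlers directly.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class InMemoryEventBus:
    """Hypothetical in-process bus; stands in for a real broker."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Dict[str, Any]) -> int:
        """Deliver the event to every subscriber; return how many were notified."""
        handlers = self._handlers.get(event["type"], [])
        for handler in handlers:
            handler(event)
        return len(handlers)


sent_emails: List[str] = []
audit_log: List[str] = []

bus = InMemoryEventBus()
# Two independent services react; the publisher knows about neither.
bus.subscribe("user.registered", lambda e: sent_emails.append(e["data"]["email"]))
bus.subscribe("user.registered", lambda e: audit_log.append(e["id"]))

notified = bus.publish({
    "id": "evt-1",
    "type": "user.registered",
    "data": {"user_id": "u-42", "email": "ada@example.com"},
})
```

The publisher only sees a count of subscribers notified, which keeps it unaware of who reacts and how.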
2. Event-Carried State Transfer
Event with full state:
{
  "type": "order.updated",
  "data": {
    "order_id": "123",
    "status": "shipped",
    "items": [...],            ← Full order data
    "shipping_address": {...},
    "customer": {...}
  }
}
Benefits:
• Consumers don't need to query the source service
• Reduces runtime coupling between services
• Consumers keep working when the source service is unavailable
Drawbacks:
• Larger message size
• A consumer's local copy might be stale
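One way a consumer might use such events is to keep its own local copy of each order. The sketch below is a minimal illustration, not a prescribed design: `OrderReplica` is a hypothetical name, and the per-order integer `version` field is an assumption added here so that late or duplicate events cannot overwrite newer state.

```python
from typing import Any, Dict, Optional


class OrderReplica:
    """Hypothetical consumer-side cache built only from event payloads."""

    def __init__(self) -> None:
        self._orders: Dict[str, Dict[str, Any]] = {}
        self._versions: Dict[str, int] = {}

    def on_order_updated(self, event: Dict[str, Any]) -> bool:
        """Apply the event unless an equal or newer version is already stored."""
        data = event["data"]
        order_id = data["order_id"]
        version = event["version"]  # assumed per-order sequence number
        if version <= self._versions.get(order_id, 0):
            return False  # stale or duplicate event, ignore it
        self._orders[order_id] = data
        self._versions[order_id] = version
        return True

    def get(self, order_id: str) -> Optional[Dict[str, Any]]:
        """Answer reads locally, without calling the order service."""
        return self._orders.get(order_id)


replica = OrderReplica()
replica.on_order_updated({"type": "order.updated", "version": 2,
                          "data": {"order_id": "123", "status": "shipped"}})
# An older event arriving late must not overwrite the newer state.
applied = replica.on_order_updated({"type": "order.updated", "version": 1,
                                    "data": {"order_id": "123", "status": "placed"}})
```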
3. Event Sourcing
┌─────────────────────────────────────────────────────────────┐
│                         Event Store                         │
├─────────────────────────────────────────────────────────────┤
│ 1. AccountOpened(id=A1, owner="John", balance=0)            │
│ 2. MoneyDeposited(id=A1, amount=1000)                       │
│ 3. MoneyWithdrawn(id=A1, amount=200)                        │
│ 4. MoneyTransferred(from=A1, to=A2, amount=300)             │
│ 5. InterestApplied(id=A1, rate=0.05)                        │
├─────────────────────────────────────────────────────────────┤
│                              │                              │
│                              ▼                              │
│                 Replay Events to Get State                  │
│                              │                              │
│                              ▼                              │
│                    Current Balance: $525                    │
└─────────────────────────────────────────────────────────────┘
Python:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional


def utc_now() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


class DomainEvent(ABC):
    """Base class for domain events."""

    @abstractmethod
    def apply(self, aggregate):
        """Apply event to aggregate."""
        pass


@dataclass
class AccountOpened(DomainEvent):
    account_id: str
    owner: str
    timestamp: datetime = field(default_factory=utc_now)

    def apply(self, account):
        account.account_id = self.account_id
        account.owner = self.owner
        account.balance = 0
        account.is_open = True


@dataclass
class MoneyDeposited(DomainEvent):
    account_id: str
    amount: float
    timestamp: datetime = field(default_factory=utc_now)

    def apply(self, account):
        account.balance += self.amount


@dataclass
class MoneyWithdrawn(DomainEvent):
    account_id: str
    amount: float
    timestamp: datetime = field(default_factory=utc_now)

    def apply(self, account):
        account.balance -= self.amount


class BankAccount:
    """Event-sourced bank account aggregate."""

    def __init__(self):
        self.account_id: Optional[str] = None
        self.owner: Optional[str] = None
        self.balance: float = 0
        self.is_open: bool = False
        self._events: List[DomainEvent] = []   # uncommitted events
        self._version: int = 0                 # number of events applied

    def open_account(self, account_id: str, owner: str):
        """Command: Open a new account."""
        if self.is_open:
            raise ValueError("Account already open")
        event = AccountOpened(account_id, owner)
        self._apply_event(event)
        self._events.append(event)

    def deposit(self, amount: float):
        """Command: Deposit money."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if not self.is_open:
            raise ValueError("Account not open")
        event = MoneyDeposited(self.account_id, amount)
        self._apply_event(event)
        self._events.append(event)

    def withdraw(self, amount: float):
        """Command: Withdraw money."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if not self.is_open:
            raise ValueError("Account not open")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        event = MoneyWithdrawn(self.account_id, amount)
        self._apply_event(event)
        self._events.append(event)

    def _apply_event(self, event: DomainEvent):
        """Apply event to current state."""
        event.apply(self)
        self._version += 1

    def get_uncommitted_events(self) -> List[DomainEvent]:
        """Get events not yet persisted."""
        return self._events.copy()

    def clear_uncommitted_events(self):
        """Clear events after persistence."""
        self._events.clear()

    @classmethod
    def from_events(cls, events: List[DomainEvent]) -> 'BankAccount':
        """Reconstruct account from event history."""
        account = cls()
        for event in events:
            account._apply_event(event)
        return account


class ConcurrencyError(Exception):
    pass


class EventStore:
    """Simple in-memory event store."""

    def __init__(self):
        self._events: Dict[str, List[Dict]] = {}

    def save_events(
        self,
        aggregate_id: str,
        events: List[DomainEvent],
        expected_version: int
    ):
        """Save events with optimistic concurrency."""
        stream = self._events.get(aggregate_id, [])
        # Reject the write if another writer appended since we loaded.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, "
                f"but stream has {len(stream)} events"
            )
        for event in events:
            stream.append({
                "type": type(event).__name__,
                "data": dict(event.__dict__),   # copy, so stored data is not aliased
                "version": len(stream) + 1
            })
        self._events[aggregate_id] = stream

    def get_events(self, aggregate_id: str) -> List[DomainEvent]:
        """Load all events for an aggregate."""
        stream = self._events.get(aggregate_id, [])
        events = []
        for record in stream:
            # Look up the event class by name in this module.
            event_class = globals()[record["type"]]
            events.append(event_class(**record["data"]))
        return events


# Usage example
def example():
    store = EventStore()
    # Create new account
    account = BankAccount()
    account.open_account("ACC-001", "John Doe")
    account.deposit(1000)
    account.withdraw(200)
    # Save to event store (expected version 0: the stream is new)
    store.save_events(
        "ACC-001",
        account.get_uncommitted_events(),
        0
    )
    account.clear_uncommitted_events()
    # Reconstruct from events
    events = store.get_events("ACC-001")
    reconstructed = BankAccount.from_events(events)
    print(f"Balance: ${reconstructed.balance}")  # $800


if __name__ == "__main__":
    example()
JavaScript:
class DomainEvent {
  constructor() {
    this.timestamp = new Date().toISOString();
  }

  apply(aggregate) {
    throw new Error('Must implement apply method');
  }
}

class AccountOpened extends DomainEvent {
  constructor(accountId, owner) {
    super();
    this.accountId = accountId;
    this.owner = owner;
  }

  apply(account) {
    account.accountId = this.accountId;
    account.owner = this.owner;
    account.balance = 0;
    account.isOpen = true;
  }
}

class MoneyDeposited extends DomainEvent {
  constructor(accountId, amount) {
    super();
    this.accountId = accountId;
    this.amount = amount;
  }

  apply(account) {
    account.balance += this.amount;
  }
}

class MoneyWithdrawn extends DomainEvent {
  constructor(accountId, amount) {
    super();
    this.accountId = accountId;
    this.amount = amount;
  }

  apply(account) {
    account.balance -= this.amount;
  }
}

class BankAccount {
  constructor() {
    this.accountId = null;
    this.owner = null;
    this.balance = 0;
    this.isOpen = false;
    this._events = [];
    this._version = 0;
  }

  openAccount(accountId, owner) {
    if (this.isOpen) {
      throw new Error('Account already open');
    }
    const event = new AccountOpened(accountId, owner);
    this._applyEvent(event);
    this._events.push(event);
  }

  deposit(amount) {
    if (amount <= 0) throw new Error('Amount must be positive');
    if (!this.isOpen) throw new Error('Account not open');
    const event = new MoneyDeposited(this.accountId, amount);
    this._applyEvent(event);
    this._events.push(event);
  }

  withdraw(amount) {
    if (amount <= 0) throw new Error('Amount must be positive');
    if (!this.isOpen) throw new Error('Account not open');
    if (this.balance < amount) throw new Error('Insufficient funds');
    const event = new MoneyWithdrawn(this.accountId, amount);
    this._applyEvent(event);
    this._events.push(event);
  }

  _applyEvent(event) {
    event.apply(this);
    this._version++;
  }

  getUncommittedEvents() {
    return [...this._events];
  }

  clearUncommittedEvents() {
    this._events = [];
  }

  static fromEvents(events) {
    const account = new BankAccount();
    events.forEach(event => account._applyEvent(event));
    return account;
  }
}

class ConcurrencyError extends Error {
  constructor(message) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class EventStore {
  constructor() {
    this._streams = new Map();
  }

  saveEvents(aggregateId, events, expectedVersion) {
    const stream = this._streams.get(aggregateId) || [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, ` +
        `but stream has ${stream.length} events`
      );
    }
    events.forEach(event => {
      stream.push({
        type: event.constructor.name,
        data: { ...event },
        version: stream.length + 1
      });
    });
    this._streams.set(aggregateId, stream);
  }

  getEvents(aggregateId) {
    const stream = this._streams.get(aggregateId) || [];
    const eventClasses = {
      AccountOpened,
      MoneyDeposited,
      MoneyWithdrawn
    };
    return stream.map(record => {
      const EventClass = eventClasses[record.type];
      return Object.assign(new EventClass(), record.data);
    });
  }
}

module.exports = {
  BankAccount,
  EventStore,
  AccountOpened,
  MoneyDeposited,
  MoneyWithdrawn
};
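The Event Store diagram lists five events and a final balance of $525, while the classes above cover only three event types. As a hedged, standalone sketch, the replay can be written as a left fold over plain dicts. The dict layout, `apply_event`, and `replay` are illustrative assumptions, and interest is assumed to multiply the current balance by (1 + rate).

```python
from decimal import Decimal
from functools import reduce
from typing import Any, Dict, List

ACCOUNT_ID = "A1"

# The five events from the Event Store diagram, as plain dicts.
events: List[Dict[str, Any]] = [
    {"type": "AccountOpened", "id": "A1", "owner": "John", "balance": 0},
    {"type": "MoneyDeposited", "id": "A1", "amount": 1000},
    {"type": "MoneyWithdrawn", "id": "A1", "amount": 200},
    {"type": "MoneyTransferred", "from": "A1", "to": "A2", "amount": 300},
    {"type": "InterestApplied", "id": "A1", "rate": Decimal("0.05")},
]


def apply_event(balance: Decimal, event: Dict[str, Any]) -> Decimal:
    """Pure function: previous balance + one event -> next balance."""
    kind = event["type"]
    if kind == "AccountOpened":
        return Decimal(event["balance"])
    if kind == "MoneyDeposited":
        return balance + event["amount"]
    if kind == "MoneyWithdrawn":
        return balance - event["amount"]
    if kind == "MoneyTransferred":
        # Outgoing transfers debit the account, incoming ones credit it.
        if event["from"] == ACCOUNT_ID:
            return balance - event["amount"]
        if event["to"] == ACCOUNT_ID:
            return balance + event["amount"]
        return balance
    if kind == "InterestApplied":
        return balance * (1 + event["rate"])
    return balance  # unknown event types are ignored


def replay(history: List[Dict[str, Any]]) -> Decimal:
    return reduce(apply_event, history, Decimal(0))


current_balance = replay(events)          # 0 -> 1000 -> 800 -> 500 -> 525
balance_after_three = replay(events[:3])  # temporal query: state after event 3
```

Replaying a prefix of the history is what makes temporal queries cheap to express: the balance after the third event is 800, the full replay gives 525.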
4. CQRS (Command Query Responsibility Segregation)
┌─────────────────────────────────────────────────────────────┐
│                        CQRS Pattern                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│                         ┌──────────┐                        │
│                         │  Client  │                        │
│                         └────┬─────┘                        │
│                              │                              │
│                 ┌────────────┴────────────┐                 │
│                 │                         │                 │
│             Commands                   Queries              │
│                 │                         │                 │
│           ┌─────▼─────┐             ┌─────▼─────┐           │
│           │  Command  │             │   Query   │           │
│           │  Handler  │             │  Handler  │           │
│           └─────┬─────┘             └─────┬─────┘           │
│                 │                         │                 │
│           ┌─────▼─────┐             ┌─────▼─────┐           │
│           │   Write   │───Events───►│   Read    │           │
│           │   Model   │             │   Model   │           │
│           │(Normalized│             │(Optimized │           │
│           │    for    │             │    for    │           │
│           │  writes)  │             │ queries)  │           │
│           └───────────┘             └───────────┘           │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Python:
from dataclasses import dataclass
from typing import Dict, List, Optional

# Note: generate_order_id, calculate_total, Order, OrderPlacedEvent,
# OrderCancelledEvent, and the event_store / event_publisher / db
# objects are assumed to be defined elsewhere.


# Commands
@dataclass
class PlaceOrderCommand:
    customer_id: str
    items: List[Dict]
    shipping_address: Dict


@dataclass
class CancelOrderCommand:
    order_id: str
    reason: str


# Command Handlers (Write Side)
class OrderCommandHandler:
    def __init__(self, event_store, event_publisher):
        self.event_store = event_store
        self.event_publisher = event_publisher

    def handle_place_order(self, cmd: PlaceOrderCommand):
        # Business logic
        order_id = generate_order_id()
        total = calculate_total(cmd.items)
        # Create event
        event = OrderPlacedEvent(
            order_id=order_id,
            customer_id=cmd.customer_id,
            items=cmd.items,
            total=total
        )
        # Persist and publish
        self.event_store.save(event)
        self.event_publisher.publish(event)
        return order_id

    def handle_cancel_order(self, cmd: CancelOrderCommand):
        # Load current state
        events = self.event_store.get_events(cmd.order_id)
        order = Order.from_events(events)
        # Validate
        if not order.can_cancel():
            raise ValueError("Order cannot be cancelled")
        # Create event
        event = OrderCancelledEvent(
            order_id=cmd.order_id,
            reason=cmd.reason
        )
        self.event_store.save(event)
        self.event_publisher.publish(event)


# Read Model (Query Side)
class OrderReadModel:
    """
    Denormalized view optimized for queries.
    Updated by consuming events.
    """

    def __init__(self, db):
        self.db = db

    def on_order_placed(self, event: OrderPlacedEvent):
        """Project event to read model."""
        self.db.insert("orders", {
            "order_id": event.data["order_id"],
            "customer_id": event.data["customer_id"],
            "items": event.data["items"],
            "total": event.data["total"],
            "status": "placed",
            "created_at": event.timestamp
        })
        # Update customer's order count
        self.db.increment(
            "customers",
            {"customer_id": event.data["customer_id"]},
            "order_count"
        )

    def on_order_cancelled(self, event):
        """Update read model on cancellation."""
        self.db.update(
            "orders",
            {"order_id": event.data["order_id"]},
            {"status": "cancelled", "cancelled_at": event.timestamp}
        )


# Query Handlers
class OrderQueryHandler:
    def __init__(self, db):
        self.db = db

    def get_order(self, order_id: str) -> Dict:
        """Simple read from denormalized store."""
        return self.db.find_one("orders", {"order_id": order_id})

    def get_customer_orders(
        self,
        customer_id: str,
        status: Optional[str] = None,
        limit: int = 10
    ) -> List[Dict]:
        """Optimized query - no joins needed."""
        query = {"customer_id": customer_id}
        if status:
            query["status"] = status
        return self.db.find(
            "orders",
            query,
            sort=[("created_at", -1)],   # newest first
            limit=limit
        )

    def get_order_statistics(self, customer_id: str) -> Dict:
        """Pre-computed statistics."""
        return self.db.find_one(
            "customers",
            {"customer_id": customer_id}
        )
JavaScript:
// Commands
class PlaceOrderCommand {
  constructor({ customerId, items, shippingAddress }) {
    this.customerId = customerId;
    this.items = items;
    this.shippingAddress = shippingAddress;
  }
}

class CancelOrderCommand {
  constructor({ orderId, reason }) {
    this.orderId = orderId;
    this.reason = reason;
  }
}

// Command Handlers (Write Side)
class OrderCommandHandler {
  constructor(eventStore, eventPublisher) {
    this.eventStore = eventStore;
    this.eventPublisher = eventPublisher;
  }

  async handlePlaceOrder(cmd) {
    // Business logic
    const orderId = generateOrderId();
    const total = calculateTotal(cmd.items);
    // Create event
    const event = new OrderPlacedEvent({
      orderId,
      customerId: cmd.customerId,
      items: cmd.items,
      total
    });
    // Persist and publish
    await this.eventStore.save(event);
    await this.eventPublisher.publish(event);
    return orderId;
  }

  async handleCancelOrder(cmd) {
    // Load current state
    const events = await this.eventStore.getEvents(cmd.orderId);
    const order = Order.fromEvents(events);
    // Validate
    if (!order.canCancel()) {
      throw new Error('Order cannot be cancelled');
    }
    // Create event
    const event = new OrderCancelledEvent({
      orderId: cmd.orderId,
      reason: cmd.reason
    });
    await this.eventStore.save(event);
    await this.eventPublisher.publish(event);
  }
}

// Read Model (Query Side)
class OrderReadModel {
  constructor(db) {
    this.db = db;
  }

  async onOrderPlaced(event) {
    await this.db.insert('orders', {
      orderId: event.data.orderId,
      customerId: event.data.customerId,
      items: event.data.items,
      total: event.data.total,
      status: 'placed',
      createdAt: event.time
    });
    // Update customer's order count
    await this.db.increment(
      'customers',
      { customerId: event.data.customerId },
      'orderCount'
    );
  }

  async onOrderCancelled(event) {
    await this.db.update(
      'orders',
      { orderId: event.data.orderId },
      { status: 'cancelled', cancelledAt: event.time }
    );
  }
}

// Query Handlers
class OrderQueryHandler {
  constructor(db) {
    this.db = db;
  }

  async getOrder(orderId) {
    return this.db.findOne('orders', { orderId });
  }

  async getCustomerOrders(customerId, { status, limit = 10 } = {}) {
    const query = { customerId };
    if (status) query.status = status;
    return this.db.find('orders', query, {
      sort: { createdAt: -1 },
      limit
    });
  }

  async getOrderStatistics(customerId) {
    return this.db.findOne('customers', { customerId });
  }
}

module.exports = {
  PlaceOrderCommand,
  CancelOrderCommand,
  OrderCommandHandler,
  OrderReadModel,
  OrderQueryHandler
};
Message Broker Patterns
Topic Design
Event Topics (Fan-out):
┌─────────────────────────────────────────────────────────────┐
│                        orders.events                        │
├─────────────────────────────────────────────────────────────┤
│  • order.placed                                             │
│  • order.confirmed                                          │
│  • order.shipped                                            │
│  • order.delivered                                          │
│  • order.cancelled                                          │
└─────────────────────────────────────────────────────────────┘
        │
        ├──► Inventory Service (updates stock)
        ├──► Analytics Service (updates metrics)
        ├──► Email Service (sends notifications)
        └──► Shipping Service (triggers fulfillment)
Command Topics (Point-to-Point):
┌─────────────────────────────────────────────────────────────┐
│                     inventory.commands                      │
├─────────────────────────────────────────────────────────────┤
│  Consumer Group: inventory-service (single consumer)        │
│  • reserve.stock                                            │
│  • release.stock                                            │
└─────────────────────────────────────────────────────────────┘
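The difference between fan-out and point-to-point delivery comes down to consumer groups: every group receives each message, but only one member inside a group handles it. The sketch below models that in memory. `InMemoryTopic`, `join`, and the round-robin choice are assumptions for illustration; real brokers typically assign work by partition rather than round-robin.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Consumer = Callable[[Dict[str, Any]], None]


class InMemoryTopic:
    """Hypothetical topic: each group sees every message, one member handles it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._groups: Dict[str, List[Consumer]] = defaultdict(list)
        self._next: Dict[str, int] = defaultdict(int)

    def join(self, group: str, consumer: Consumer) -> None:
        self._groups[group].append(consumer)

    def publish(self, message: Dict[str, Any]) -> None:
        for group, members in self._groups.items():
            # Round-robin inside the group (an assumption of this sketch).
            index = self._next[group] % len(members)
            self._next[group] += 1
            members[index](message)


received: Dict[str, List[str]] = defaultdict(list)


def worker(name: str) -> Consumer:
    return lambda msg: received[name].append(msg["type"])


# Fan-out: two different groups each get their own copy of the event.
orders = InMemoryTopic("orders.events")
orders.join("inventory-service", worker("inventory"))
orders.join("email-service", worker("email"))
orders.publish({"type": "order.placed"})

# Point-to-point: one group, so each command is handled exactly once.
commands = InMemoryTopic("inventory.commands")
commands.join("inventory-service", worker("worker-1"))
commands.join("inventory-service", worker("worker-2"))
commands.publish({"type": "reserve.stock"})
commands.publish({"type": "release.stock"})
```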
Dead Letter Queues
┌─────────────────────────────────────────────────────────────┐
│                  Dead Letter Queue Pattern                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Main Queue                                                 │
│  ┌──────────────────────────────────────────────┐           │
│  │  Message → Consumer                          │           │
│  │               │                              │           │
│  │               ├── Success → ACK              │           │
│  │               │                              │           │
│  │               └── Failure (retry 3x)         │           │
│  │                       │                      │           │
│  └───────────────────────┼──────────────────────┘           │
│                          │                                  │
│                          ▼                                  │
│  Dead Letter Queue                                          │
│  ┌──────────────────────────────────────────────┐           │
│  │  • Failed messages for investigation         │           │
│  │  • Retry with backoff                        │           │
│  │  • Alert operations team                     │           │
│  └──────────────────────────────────────────────┘           │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Python:
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

# Note: dlq_store and original_producer are assumed to expose the async
# methods used below (save, get_retryable, mark_success, update, send).


@dataclass
class DeadLetterEntry:
    original_message: Dict[str, Any]
    error: str
    retry_count: int
    first_failure: datetime
    last_failure: datetime
    topic: str


class DeadLetterHandler:
    def __init__(
        self,
        dlq_store,
        original_producer,
        max_retries: int = 3,
        retry_delay: timedelta = timedelta(minutes=5)
    ):
        self.dlq_store = dlq_store
        self.producer = original_producer
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

    async def handle_failure(
        self,
        message: Dict[str, Any],
        error: Exception,
        topic: str,
        retry_count: int = 0
    ):
        """Send failed message to DLQ."""
        now = datetime.now(timezone.utc)
        entry = DeadLetterEntry(
            original_message=message,
            error=str(error),
            retry_count=retry_count + 1,
            first_failure=now,
            last_failure=now,
            topic=topic
        )
        await self.dlq_store.save(entry)
        self.logger.warning(
            f"Message sent to DLQ: {message.get('id')} "
            f"(retry {retry_count + 1})"
        )
        if retry_count + 1 >= self.max_retries:
            await self.alert_operations(entry)

    async def process_dlq(self):
        """Background job to retry DLQ messages."""
        while True:
            entries = await self.dlq_store.get_retryable(
                max_retries=self.max_retries,
                min_age=self.retry_delay
            )
            for entry in entries:
                try:
                    # Retry sending to original topic
                    await self.producer.send(
                        entry.topic,
                        entry.original_message
                    )
                    await self.dlq_store.mark_success(entry)
                except Exception as e:
                    # Update retry count
                    entry.retry_count += 1
                    entry.last_failure = datetime.now(timezone.utc)
                    entry.error = str(e)
                    await self.dlq_store.update(entry)
            # Poll the DLQ once a minute
            await asyncio.sleep(60)

    async def alert_operations(self, entry: DeadLetterEntry):
        """Alert when max retries exceeded."""
        self.logger.error(
            f"DLQ: Max retries exceeded for message "
            f"{entry.original_message.get('id')}"
        )
        # Integration with PagerDuty, Slack, etc.
JavaScript:
class DeadLetterHandler {
  constructor({
    dlqStore,
    originalProducer,
    maxRetries = 3,
    retryDelayMs = 5 * 60 * 1000
  }) {
    this.dlqStore = dlqStore;
    this.producer = originalProducer;
    this.maxRetries = maxRetries;
    this.retryDelayMs = retryDelayMs;
  }

  async handleFailure(message, error, topic, retryCount = 0) {
    const entry = {
      originalMessage: message,
      error: error.message,
      retryCount: retryCount + 1,
      firstFailure: new Date().toISOString(),
      lastFailure: new Date().toISOString(),
      topic
    };
    await this.dlqStore.save(entry);
    console.warn(
      `Message sent to DLQ: ${message.id} ` +
      `(retry ${retryCount + 1})`
    );
    if (retryCount + 1 >= this.maxRetries) {
      await this.alertOperations(entry);
    }
  }

  async processDLQ() {
    while (true) {
      const entries = await this.dlqStore.getRetryable({
        maxRetries: this.maxRetries,
        minAgeMs: this.retryDelayMs
      });
      for (const entry of entries) {
        try {
          await this.producer.send(
            entry.topic,
            entry.originalMessage
          );
          await this.dlqStore.markSuccess(entry);
        } catch (error) {
          entry.retryCount++;
          entry.lastFailure = new Date().toISOString();
          entry.error = error.message;
          await this.dlqStore.update(entry);
        }
      }
      await new Promise(r => setTimeout(r, 60000));
    }
  }

  async alertOperations(entry) {
    console.error(
      `DLQ: Max retries exceeded for message ` +
      `${entry.originalMessage.id}`
    );
    // Integration with PagerDuty, Slack, etc.
  }
}

module.exports = { DeadLetterHandler };
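The diagram mentions "Retry with backoff", while the handlers above wait a fixed delay between attempts. One possible way to compute a growing delay is exponential backoff with a cap and optional jitter. The function below is a sketch under assumptions: `retry_delay`, its defaults (30 seconds base, 1 hour cap), and the jitter scheme are illustrative choices, not values taken from the handlers above.

```python
import random
from datetime import timedelta


def retry_delay(
    retry_count: int,
    base: timedelta = timedelta(seconds=30),
    cap: timedelta = timedelta(hours=1),
    jitter: float = 0.0,
) -> timedelta:
    """Delay before retry number `retry_count` (1-based).

    Doubles the base delay on every retry, never exceeding `cap`.
    `jitter=0.1` randomizes the result by up to +/-10% so that many
    failed messages do not all retry at the same instant.
    """
    if retry_count < 1:
        raise ValueError("retry_count must be >= 1")
    exponent = min(retry_count - 1, 20)  # keep the multiplier bounded
    delay = min(base * (2 ** exponent), cap)
    if jitter:
        delay = delay * (1 + random.uniform(-jitter, jitter))
    return delay


# Delays for the first four retries: 30s, 60s, 120s, 240s.
schedule = [retry_delay(n) for n in range(1, 5)]
```

A DLQ processor could compare `now - entry.last_failure` against `retry_delay(entry.retry_count)` to decide whether an entry is due for another attempt.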
Saga Pattern
Choreography vs Orchestration
Choreography (Event-Based):
┌───────────┐     ┌───────────┐     ┌───────────┐
│   Order   │     │  Payment  │     │ Inventory │
│  Service  │     │  Service  │     │  Service  │
└─────┬─────┘     └─────┬─────┘     └─────┬─────┘
      │                 │                 │
      │  OrderCreated   │                 │
      ├─────────────────►                 │
      │                 │                 │
      │                 │ PaymentProcessed│
      │                 ├─────────────────►
      │                 │                 │
      │                 │                 │ StockReserved
      ◄─────────────────┼─────────────────┤
      │                 │                 │
Orchestration (Command-Based):
                ┌──────────────┐
                │     Saga     │
                │ Orchestrator │
                └──────┬───────┘
                       │
       ┌───────────────┼───────────────┐
       │               │               │
  CreateOrder    ProcessPayment   ReserveStock
       │               │               │
  ┌────▼────┐     ┌────▼────┐     ┌────▼────┐
  │  Order  │     │ Payment │     │Inventory│
  │ Service │     │ Service │     │ Service │
  └─────────┘     └─────────┘     └─────────┘
Python:
from enum import Enum
from dataclasses import dataclass
from typing import Callable, List, Optional

# Note: generate_saga_id and the four service objects are assumed to be
# defined elsewhere, with async methods. order_data is assumed to already
# contain "order_id", "items", "customer_id", "total", "shipping_address".


class SagaState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable
    completed: bool = False


class SagaFailedException(Exception):
    pass


class OrderSagaOrchestrator:
    """
    Orchestration-based saga for order processing.
    """

    def __init__(
        self,
        order_service,
        payment_service,
        inventory_service,
        shipping_service
    ):
        self.order_service = order_service
        self.payment_service = payment_service
        self.inventory_service = inventory_service
        self.shipping_service = shipping_service
        self.state = SagaState.PENDING
        self.completed_steps: List[SagaStep] = []
        self.saga_id: Optional[str] = None

    async def execute(self, order_data: dict) -> str:
        """Execute the order saga."""
        self.state = SagaState.RUNNING
        self.saga_id = generate_saga_id()
        steps = [
            SagaStep(
                name="create_order",
                action=lambda: self.order_service.create(order_data),
                compensation=lambda: self.order_service.cancel(
                    order_data["order_id"]
                )
            ),
            SagaStep(
                name="reserve_inventory",
                action=lambda: self.inventory_service.reserve(
                    order_data["items"]
                ),
                compensation=lambda: self.inventory_service.release(
                    order_data["items"]
                )
            ),
            SagaStep(
                name="process_payment",
                action=lambda: self.payment_service.charge(
                    order_data["customer_id"],
                    order_data["total"]
                ),
                compensation=lambda: self.payment_service.refund(
                    order_data["payment_id"]
                )
            ),
            SagaStep(
                name="schedule_shipping",
                action=lambda: self.shipping_service.schedule(
                    order_data["order_id"],
                    order_data["shipping_address"]
                ),
                compensation=lambda: self.shipping_service.cancel(
                    order_data["order_id"]
                )
            )
        ]
        try:
            for step in steps:
                result = await step.action()
                step.completed = True
                self.completed_steps.append(step)
                # Store result for later steps
                if step.name == "process_payment":
                    order_data["payment_id"] = result["payment_id"]
            self.state = SagaState.COMPLETED
            return self.saga_id
        except Exception as e:
            # Compensate in reverse order
            await self.compensate()
            raise SagaFailedException(f"Saga failed: {e}") from e

    async def compensate(self):
        """Execute compensation in reverse order."""
        self.state = SagaState.COMPENSATING
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
            except Exception as e:
                # Log and continue - compensation must be idempotent
                print(f"Compensation failed for {step.name}: {e}")
        self.state = SagaState.FAILED
JavaScript:
const SagaState = {
  PENDING: 'pending',
  RUNNING: 'running',
  COMPLETED: 'completed',
  COMPENSATING: 'compensating',
  FAILED: 'failed'
};

class SagaStep {
  constructor({ name, action, compensation }) {
    this.name = name;
    this.action = action;
    this.compensation = compensation;
    this.completed = false;
  }
}

class SagaFailedError extends Error {
  constructor(message) {
    super(message);
    this.name = 'SagaFailedError';
  }
}

class OrderSagaOrchestrator {
  constructor({
    orderService,
    paymentService,
    inventoryService,
    shippingService
  }) {
    this.orderService = orderService;
    this.paymentService = paymentService;
    this.inventoryService = inventoryService;
    this.shippingService = shippingService;
    this.state = SagaState.PENDING;
    this.completedSteps = [];
    this.sagaId = null;
  }

  async execute(orderData) {
    this.state = SagaState.RUNNING;
    this.sagaId = generateSagaId();
    const steps = [
      new SagaStep({
        name: 'create_order',
        action: () => this.orderService.create(orderData),
        compensation: () => this.orderService.cancel(
          orderData.orderId
        )
      }),
      new SagaStep({
        name: 'reserve_inventory',
        action: () => this.inventoryService.reserve(
          orderData.items
        ),
        compensation: () => this.inventoryService.release(
          orderData.items
        )
      }),
      new SagaStep({
        name: 'process_payment',
        action: () => this.paymentService.charge(
          orderData.customerId,
          orderData.total
        ),
        compensation: () => this.paymentService.refund(
          orderData.paymentId
        )
      }),
      new SagaStep({
        name: 'schedule_shipping',
        action: () => this.shippingService.schedule(
          orderData.orderId,
          orderData.shippingAddress
        ),
        compensation: () => this.shippingService.cancel(
          orderData.orderId
        )
      })
    ];
    try {
      for (const step of steps) {
        const result = await step.action();
        step.completed = true;
        this.completedSteps.push(step);
        // Store result for later steps
        if (step.name === 'process_payment') {
          orderData.paymentId = result.paymentId;
        }
      }
      this.state = SagaState.COMPLETED;
      return this.sagaId;
    } catch (error) {
      // Compensate in reverse order
      await this.compensate();
      throw new SagaFailedError(`Saga failed: ${error.message}`);
    }
  }

  async compensate() {
    this.state = SagaState.COMPENSATING;
    for (const step of [...this.completedSteps].reverse()) {
      try {
        await step.compensation();
      } catch (error) {
        console.error(
          `Compensation failed for ${step.name}: ${error}`
        );
      }
    }
    this.state = SagaState.FAILED;
  }
}

module.exports = { OrderSagaOrchestrator, SagaState };
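The code above covers orchestration only. For comparison, a choreography-based version of the flow in the diagram might look like the sketch below, where each service subscribes to events and no central coordinator exists. Everything here is an illustrative assumption: the `Bus` class, the handler names, and the `StockReservationFailed` and `PaymentRefunded` events (added to show a compensation path) do not appear in the original diagram.

```python
from collections import defaultdict, deque
from typing import Any, Callable, Deque, Dict, List, Tuple

Event = Dict[str, Any]


class Bus:
    """Hypothetical in-process bus that delivers events in FIFO order."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[Event], None]]] = defaultdict(list)
        self._queue: Deque[Event] = deque()
        self.log: List[str] = []  # event types in delivery order

    def subscribe(self, event_type: str, handler: Callable[[Event], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Event) -> None:
        self._queue.append(event)

    def run(self) -> None:
        while self._queue:
            event = self._queue.popleft()
            self.log.append(event["type"])
            for handler in self._handlers.get(event["type"], []):
                handler(event)


def wire_services(bus: Bus, orders: Dict[str, str], in_stock: bool) -> None:
    def payment_on_order_created(e: Event) -> None:
        bus.publish({"type": "PaymentProcessed", "order_id": e["order_id"]})

    def inventory_on_payment_processed(e: Event) -> None:
        kind = "StockReserved" if in_stock else "StockReservationFailed"
        bus.publish({"type": kind, "order_id": e["order_id"]})

    def payment_on_reservation_failed(e: Event) -> None:
        # Compensation: the payment service undoes its own step.
        bus.publish({"type": "PaymentRefunded", "order_id": e["order_id"]})

    def order_on_stock_reserved(e: Event) -> None:
        orders[e["order_id"]] = "confirmed"

    def order_on_payment_refunded(e: Event) -> None:
        orders[e["order_id"]] = "cancelled"

    bus.subscribe("OrderCreated", payment_on_order_created)
    bus.subscribe("PaymentProcessed", inventory_on_payment_processed)
    bus.subscribe("StockReservationFailed", payment_on_reservation_failed)
    bus.subscribe("StockReserved", order_on_stock_reserved)
    bus.subscribe("PaymentRefunded", order_on_payment_refunded)


def place_order(in_stock: bool) -> Tuple[str, List[str]]:
    bus, orders = Bus(), {"o-1": "pending"}
    wire_services(bus, orders, in_stock)
    bus.publish({"type": "OrderCreated", "order_id": "o-1"})
    bus.run()
    return orders["o-1"], bus.log
```

Note how the failure path is spread across services: there is no single place that describes the whole workflow, which is the visibility cost usually attributed to choreography.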
Best Practices
Event Design Principles
✓ Events are immutable - never modify past events
✓ Events are self-describing - include all needed context
✓ Use past tense - "OrderPlaced", not "PlaceOrder"
✓ Version your events - for schema evolution
✓ Include correlation IDs - for tracing
✓ Keep events small - but with enough context
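To make "version your events" concrete, one common approach is to upcast old events to the newest schema when they are read, so handlers only deal with the latest shape. The sketch below is a minimal illustration under assumptions: integer versions, the `UPCASTERS` registry, and the idea that version 2 of `order.placed` added a `currency` field are all hypothetical.

```python
from typing import Any, Callable, Dict, Tuple

Event = Dict[str, Any]
Upcaster = Callable[[Event], Event]


def order_placed_v1_to_v2(event: Event) -> Event:
    """Hypothetical schema change: v2 adds an explicit currency field."""
    data = {**event["data"], "currency": "USD"}
    # Build a new dict; the stored event itself is never modified.
    return {**event, "version": 2, "data": data}


# Maps (event type, version) to the function that lifts it one version up.
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("order.placed", 1): order_placed_v1_to_v2,
}


def upcast(event: Event) -> Event:
    """Apply upcasters until no newer version is registered."""
    while (event["type"], event["version"]) in UPCASTERS:
        event = UPCASTERS[(event["type"], event["version"])](event)
    return event


old_event = {
    "type": "order.placed",
    "version": 1,
    "data": {"order_id": "123", "total": 50},
}
new_event = upcast(old_event)
```

Because the upcaster returns a new dict, the original stored event stays untouched, which keeps the immutability principle intact.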
Idempotency
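Critical: Event handlers MUST be idempotent. Many brokers provide at-least-once delivery, so the same event may be delivered more than once (for example, after a consumer crash or a retry).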
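// ❌ Not idempotent
async function handleOrderPlaced(event) {
  await incrementOrderCount(); // Will double-count on retry
}

// ✅ Idempotent
async function handleOrderPlaced(event) {
  // Skip events that were already handled
  const exists = await checkProcessed(event.id);
  if (exists) return;
  // Ideally the update and the marker are written in one transaction,
  // so a crash between them cannot cause a double count on redelivery
  await incrementOrderCount();
  await markProcessed(event.id);
}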
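A check-then-act handler can still double-count if two deliveries race or the process crashes between the update and the marker. One way to close that gap is to record the event ID and apply the side effect in the same database transaction, relying on a uniqueness constraint to reject duplicates. The sketch below uses Python's built-in `sqlite3` module; the table names and `handle_order_placed` signature are assumptions for illustration.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE stats (name TEXT PRIMARY KEY, value INTEGER NOT NULL)")
    conn.execute("INSERT INTO stats VALUES ('order_count', 0)")
    conn.commit()
    return conn


def handle_order_placed(conn: sqlite3.Connection, event: dict) -> bool:
    """Return True if the event was applied, False if it was a duplicate."""
    try:
        with conn:  # one transaction: commit on success, roll back on error
            # The primary key makes a second insert of the same ID fail.
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)",
                (event["id"],),
            )
            conn.execute(
                "UPDATE stats SET value = value + 1 WHERE name = 'order_count'"
            )
        return True
    except sqlite3.IntegrityError:
        return False  # already processed, nothing was changed


conn = make_db()
event = {"id": "evt-1", "type": "order.placed"}
first = handle_order_placed(conn, event)
second = handle_order_placed(conn, event)  # simulated redelivery
count = conn.execute(
    "SELECT value FROM stats WHERE name = 'order_count'"
).fetchone()[0]
```

The same idea carries over to other stores that support transactions or conditional writes; when the side effect lives in a different system than the marker, this approach alone is not sufficient.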
Interview Tips
Key Points to Cover:
- When to use events vs direct calls
  - Events: Decoupling, audit trails, reactive systems
  - Direct calls: Strong consistency, simple workflows
- Event Sourcing trade-offs
  - Pros: Full history, temporal queries, debugging
  - Cons: Complexity, eventual consistency, storage
- CQRS considerations
  - Best for: High read/write ratio, complex domains
  - Avoid for: Simple CRUD applications
- Saga patterns
  - Choreography: Simple workflows, loose coupling
  - Orchestration: Complex flows, visibility
Practice Problem
Design an E-commerce Order System
Requirements:
- Place orders with multiple items
- Payment processing
- Inventory management
- Shipping scheduling
- Order status tracking
Questions to consider:
- What events would you define?
- How would you handle payment failures?
- How would you implement order status queries?
- What’s your strategy for out-of-stock items?