Introduction

Event-driven architecture (EDA) is the backbone of modern distributed systems. From real-time notifications to complex business workflows, understanding EDA is crucial for system design interviews.
Why This Matters: Companies like Netflix, Uber, and LinkedIn process billions of events daily. Understanding event-driven patterns shows you can design scalable, decoupled systems.

Core Concepts

Events vs Commands vs Queries

┌─────────────────────────────────────────────────────────────┐
│                    Message Types                             │
├─────────────────┬───────────────────┬───────────────────────┤
│     Event       │      Command      │       Query           │
├─────────────────┼───────────────────┼───────────────────────┤
│ "OrderPlaced"   │ "PlaceOrder"      │ "GetOrder"            │
│ Past tense      │ Imperative        │ Question              │
│ Immutable       │ Intent to act     │ Request for data      │
│ Broadcast       │ Point-to-point    │ Point-to-point        │
│ No response     │ May have response │ Expects response      │
│ Fire & forget   │ Expects handling  │ Synchronous           │
└─────────────────┴───────────────────┴───────────────────────┘
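
The delivery rules in the table can be sketched with a small in-memory dispatcher. This is only an illustration: `MessageBus` and its method names (`publish`, `send`, `ask`) are hypothetical and not taken from any library. An event may have any number of subscribers, a command must have exactly one handler, and a query returns a value.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List


class MessageBus:
    """Hypothetical in-memory bus illustrating the three message types."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._command_handlers: Dict[str, Callable[[dict], None]] = {}
        self._query_handlers: Dict[str, Callable[[dict], Any]] = {}

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        # Events are broadcast: any number of subscribers, including zero.
        self._subscribers[event_type].append(handler)

    def register_command(self, name: str, handler: Callable[[dict], None]) -> None:
        # Commands are point-to-point: exactly one handler owns each command.
        if name in self._command_handlers:
            raise ValueError(f"Command {name!r} already has a handler")
        self._command_handlers[name] = handler

    def register_query(self, name: str, handler: Callable[[dict], Any]) -> None:
        self._query_handlers[name] = handler

    def publish(self, event_type: str, payload: dict) -> int:
        """Notify every subscriber; returns how many were notified."""
        handlers = self._subscribers.get(event_type, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)

    def send(self, name: str, payload: dict) -> None:
        """Route a command to its single handler; no handler is an error."""
        if name not in self._command_handlers:
            raise LookupError(f"No handler for command {name!r}")
        self._command_handlers[name](payload)

    def ask(self, name: str, params: dict) -> Any:
        """Run a query synchronously and return its result."""
        return self._query_handlers[name](params)


orders: Dict[str, dict] = {}
emails_sent: List[str] = []
bus = MessageBus()


def place_order(cmd: dict) -> None:
    orders[cmd["order_id"]] = {"status": "placed"}
    bus.publish("OrderPlaced", {"order_id": cmd["order_id"]})  # past-tense fact


bus.register_command("PlaceOrder", place_order)
bus.register_query("GetOrder", lambda q: orders.get(q["order_id"]))
bus.subscribe("OrderPlaced", lambda e: emails_sent.append(e["order_id"]))

bus.send("PlaceOrder", {"order_id": "123"})
order = bus.ask("GetOrder", {"order_id": "123"})
```

Publishing an event nobody listens to is harmless (`publish` returns 0), while sending a command with no handler raises, which mirrors the "fire & forget" versus "expects handling" rows.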

Event Anatomy

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional
import uuid

@dataclass
class Event:
    """
    Base event structure, loosely modeled on the CloudEvents specification.
    The tracing and version fields are custom additions, not CloudEvents
    core attributes.
    """
    # Required fields
    event_type: str                    # e.g., "order.placed"
    source: str                        # e.g., "order-service"
    data: Dict[str, Any]               # Event payload
    
    # Auto-generated fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)
    
    # Optional metadata
    correlation_id: Optional[str] = None   # Shared across one workflow, for tracing
    causation_id: Optional[str] = None     # ID of the message that caused this event
    version: str = "1.0"               # Schema version
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "type": self.event_type,
            "source": self.source,
            "time": self.timestamp.isoformat(),
            "data": self.data,
            "correlation_id": self.correlation_id,
            "causation_id": self.causation_id,
            "dataversion": self.version
        }


# Example domain events
@dataclass
class OrderPlacedEvent(Event):
    def __init__(self, order_id: str, customer_id: str, 
                 items: list, total: float, **kwargs):
        super().__init__(
            event_type="order.placed",
            source="order-service",
            data={
                "order_id": order_id,
                "customer_id": customer_id,
                "items": items,
                "total": total,
                "currency": "USD"
            },
            **kwargs
        )


@dataclass  
class PaymentProcessedEvent(Event):
    def __init__(self, order_id: str, payment_id: str,
                 amount: float, status: str, **kwargs):
        super().__init__(
            event_type="payment.processed",
            source="payment-service",
            data={
                "order_id": order_id,
                "payment_id": payment_id,
                "amount": amount,
                "status": status
            },
            **kwargs
        )
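
The tracing fields are easier to follow with a concrete chain. The sketch below is self-contained and uses a hypothetical `Envelope` class rather than the `Event` class above. It assumes one common convention: the first event in a workflow uses its own ID as the correlation ID, and every follow-up event copies that correlation ID and records its parent's ID as the causation ID.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Envelope:
    """Hypothetical minimal envelope carrying only tracing metadata."""
    event_type: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None
    causation_id: Optional[str] = None


def start_workflow(event_type: str) -> Envelope:
    """First event in a workflow: it is its own correlation root."""
    event_id = str(uuid.uuid4())
    return Envelope(event_type, id=event_id, correlation_id=event_id)


def caused_by(parent: Envelope, event_type: str) -> Envelope:
    """Follow-up event: same correlation ID, causation points at the parent."""
    return Envelope(
        event_type,
        correlation_id=parent.correlation_id,
        causation_id=parent.id,
    )


placed = start_workflow("order.placed")
paid = caused_by(placed, "payment.processed")
shipped = caused_by(paid, "order.shipped")
```

Filtering logs by `correlation_id` then returns the whole workflow, while following `causation_id` links reconstructs the order in which one event triggered the next.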

Event-Driven Patterns

1. Event Notification

                    Event Published

      ┌───────────────────┼───────────────────┐
      │                   │                   │
      ▼                   ▼                   ▼
┌──────────┐       ┌──────────┐       ┌──────────┐
│Service A │       │Service B │       │Service C │
│          │       │          │       │          │
│ Reacts   │       │ Reacts   │       │ Ignores  │
└──────────┘       └──────────┘       └──────────┘

Use Case: Loose coupling, simple notifications
Example: "UserRegistered" → Email service sends welcome email

2. Event-Carried State Transfer

Event with full state:
{
  "type": "order.updated",
  "data": {
    "order_id": "123",
    "status": "shipped",
    "items": [...],          ← Full order data
    "shipping_address": {...},
    "customer": {...}
  }
}

Benefits:
• Consumers don't need to query source
• Reduces coupling
• Consumers keep working when the source service is unavailable

Drawbacks:
• Larger message size
• Data might be stale
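
A consumer using this pattern typically keeps its own copy of the data. The sketch below is one way that could look, with assumptions stated up front: `ShippingOrderCache` is a hypothetical class, and the `revision` field is an assumed counter that the producer increments on every update (it does not appear in the sample event above). The revision check addresses the staleness drawback by ignoring events that arrive late.

```python
from typing import Dict


class ShippingOrderCache:
    """Hypothetical consumer-side replica built only from order events."""

    def __init__(self) -> None:
        self._orders: Dict[str, dict] = {}

    def on_order_updated(self, event: dict) -> bool:
        """Apply an event; returns False if it is not newer than the cache."""
        data = event["data"]
        cached = self._orders.get(data["order_id"])
        # "revision" is an assumed, producer-assigned increasing counter.
        if cached is not None and cached["revision"] >= data["revision"]:
            return False
        self._orders[data["order_id"]] = data
        return True

    def get_address(self, order_id: str) -> dict:
        # Served from the local copy: no call back to the order service.
        return self._orders[order_id]["shipping_address"]


cache = ShippingOrderCache()

applied_new = cache.on_order_updated({
    "type": "order.updated",
    "data": {"order_id": "123", "revision": 2, "status": "shipped",
             "shipping_address": {"city": "Berlin"}},
})

# An older event delivered late must not overwrite newer state.
applied_old = cache.on_order_updated({
    "type": "order.updated",
    "data": {"order_id": "123", "revision": 1, "status": "paid",
             "shipping_address": {"city": "Paris"}},
})
```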

3. Event Sourcing

┌─────────────────────────────────────────────────────────────┐
│                     Event Store                              │
├─────────────────────────────────────────────────────────────┤
│ 1. AccountOpened(id=A1, owner="John", balance=0)            │
│ 2. MoneyDeposited(id=A1, amount=1000)                       │
│ 3. MoneyWithdrawn(id=A1, amount=200)                        │
│ 4. MoneyTransferred(from=A1, to=A2, amount=300)             │
│ 5. InterestApplied(id=A1, rate=0.05)                        │
├─────────────────────────────────────────────────────────────┤
│                         │                                    │
│                         ▼                                    │
│              Replay Events to Get State                      │
│                         │                                    │
│                         ▼                                    │
│              Current Balance: $525                           │
└─────────────────────────────────────────────────────────────┘
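
The replay in the diagram can be checked with a short fold over the five events. This is a sketch that models events as plain tuples rather than classes, uses `Decimal` to avoid float rounding on money, and assumes `InterestApplied` adds `rate × current balance`, which is the reading that matches the $525 figure.

```python
from decimal import Decimal
from typing import Iterable, List, Tuple

StoredEvent = Tuple[str, dict]

EVENTS: List[StoredEvent] = [
    ("AccountOpened", {"id": "A1", "owner": "John", "balance": Decimal("0")}),
    ("MoneyDeposited", {"id": "A1", "amount": Decimal("1000")}),
    ("MoneyWithdrawn", {"id": "A1", "amount": Decimal("200")}),
    ("MoneyTransferred", {"from": "A1", "to": "A2", "amount": Decimal("300")}),
    ("InterestApplied", {"id": "A1", "rate": Decimal("0.05")}),
]


def replay_balance(account_id: str, events: Iterable[StoredEvent]) -> Decimal:
    """Fold the event history into the current balance of one account."""
    balance = Decimal("0")
    for kind, data in events:
        if kind == "AccountOpened" and data["id"] == account_id:
            balance = data["balance"]
        elif kind == "MoneyDeposited" and data["id"] == account_id:
            balance += data["amount"]
        elif kind == "MoneyWithdrawn" and data["id"] == account_id:
            balance -= data["amount"]
        elif kind == "MoneyTransferred":
            if data["from"] == account_id:
                balance -= data["amount"]
            if data["to"] == account_id:
                balance += data["amount"]
        elif kind == "InterestApplied" and data["id"] == account_id:
            balance += balance * data["rate"]
    return balance


# 0 + 1000 - 200 - 300 = 500, then 5% interest on 500 adds 25.
balance_a1 = replay_balance("A1", EVENTS)
balance_a2 = replay_balance("A2", EVENTS)
```

The same event list yields the state of account A2 as well, since the transfer event mentions both accounts.
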
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional
import json

class DomainEvent(ABC):
    """Base class for domain events."""
    
    @abstractmethod
    def apply(self, aggregate):
        """Apply event to aggregate."""
        pass


@dataclass
class AccountOpened(DomainEvent):
    account_id: str
    owner: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    
    def apply(self, account):
        account.account_id = self.account_id
        account.owner = self.owner
        account.balance = 0
        account.is_open = True


@dataclass
class MoneyDeposited(DomainEvent):
    account_id: str
    amount: float
    timestamp: datetime = field(default_factory=datetime.utcnow)
    
    def apply(self, account):
        account.balance += self.amount


@dataclass
class MoneyWithdrawn(DomainEvent):
    account_id: str
    amount: float
    timestamp: datetime = field(default_factory=datetime.utcnow)
    
    def apply(self, account):
        account.balance -= self.amount


class BankAccount:
    """Event-sourced bank account aggregate."""
    
    def __init__(self):
        self.account_id: Optional[str] = None
        self.owner: Optional[str] = None
        self.balance: float = 0
        self.is_open: bool = False
        self._events: List[DomainEvent] = []
        self._version: int = 0
    
    def open_account(self, account_id: str, owner: str):
        """Command: Open a new account."""
        if self.is_open:
            raise ValueError("Account already open")
        
        event = AccountOpened(account_id, owner)
        self._apply_event(event)
        self._events.append(event)
    
    def deposit(self, amount: float):
        """Command: Deposit money."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if not self.is_open:
            raise ValueError("Account not open")
        
        event = MoneyDeposited(self.account_id, amount)
        self._apply_event(event)
        self._events.append(event)
    
    def withdraw(self, amount: float):
        """Command: Withdraw money."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if not self.is_open:
            raise ValueError("Account not open")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        
        event = MoneyWithdrawn(self.account_id, amount)
        self._apply_event(event)
        self._events.append(event)
    
    def _apply_event(self, event: DomainEvent):
        """Apply event to current state."""
        event.apply(self)
        self._version += 1
    
    def get_uncommitted_events(self) -> List[DomainEvent]:
        """Get events not yet persisted."""
        return self._events.copy()
    
    def clear_uncommitted_events(self):
        """Clear events after persistence."""
        self._events.clear()
    
    @classmethod
    def from_events(cls, events: List[DomainEvent]) -> 'BankAccount':
        """Reconstruct account from event history."""
        account = cls()
        for event in events:
            account._apply_event(event)
        return account


class EventStore:
    """Simple in-memory event store."""
    
    def __init__(self):
        self._events: Dict[str, List[Dict]] = {}
    
    def save_events(
        self, 
        aggregate_id: str, 
        events: List[DomainEvent],
        expected_version: int
    ):
        """Save events with optimistic concurrency."""
        stream = self._events.get(aggregate_id, [])
        
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, "
                f"but stream has {len(stream)} events"
            )
        
        for event in events:
            stream.append({
                "type": type(event).__name__,
                "data": event.__dict__,
                "version": len(stream) + 1
            })
        
        self._events[aggregate_id] = stream
    
    def get_events(self, aggregate_id: str) -> List[DomainEvent]:
        """Load all events for an aggregate."""
        stream = self._events.get(aggregate_id, [])
        
        events = []
        for record in stream:
            event_class = globals()[record["type"]]
            events.append(event_class(**record["data"]))
        
        return events


class ConcurrencyError(Exception):
    pass


# Usage example
def example():
    store = EventStore()
    
    # Create new account
    account = BankAccount()
    account.open_account("ACC-001", "John Doe")
    account.deposit(1000)
    account.withdraw(200)
    
    # Save to event store
    store.save_events(
        "ACC-001",
        account.get_uncommitted_events(),
        0
    )
    account.clear_uncommitted_events()
    
    # Reconstruct from events
    events = store.get_events("ACC-001")
    reconstructed = BankAccount.from_events(events)
    
    print(f"Balance: ${reconstructed.balance}")  # $800
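
The `expected_version` check is what protects a stream from lost updates when two writers load the same aggregate at the same time. The following sketch shows the conflict in isolation. `MiniEventStore` and `VersionConflict` are hypothetical, simplified stand-ins for the `EventStore` and `ConcurrencyError` above, and events are plain dictionaries.

```python
from typing import Dict, List


class VersionConflict(Exception):
    """Raised when a writer's expected version no longer matches the stream."""


class MiniEventStore:
    """Hypothetical stripped-down store: one list of events per aggregate."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[dict]] = {}

    def version(self, aggregate_id: str) -> int:
        return len(self._streams.get(aggregate_id, []))

    def append(self, aggregate_id: str, events: List[dict],
               expected_version: int) -> int:
        stream = self._streams.setdefault(aggregate_id, [])
        if len(stream) != expected_version:
            raise VersionConflict(
                f"expected {expected_version}, stream is at {len(stream)}"
            )
        stream.extend(events)
        return len(stream)


store = MiniEventStore()
store.append("ACC-001", [{"type": "AccountOpened"}], expected_version=0)

# Two writers load the aggregate while the stream is at version 1.
writer_a_version = store.version("ACC-001")
writer_b_version = store.version("ACC-001")

# Writer A commits first and moves the stream to version 2.
store.append("ACC-001", [{"type": "MoneyDeposited", "amount": 100}],
             writer_a_version)

# Writer B still expects version 1, so its append is rejected.
try:
    store.append("ACC-001", [{"type": "MoneyWithdrawn", "amount": 50}],
                 writer_b_version)
    conflict_detected = False
except VersionConflict:
    conflict_detected = True
    # Typical recovery: reload, re-validate the command, then retry.
    store.append("ACC-001", [{"type": "MoneyWithdrawn", "amount": 50}],
                 store.version("ACC-001"))
```

In a real retry, writer B would rebuild the aggregate from the updated stream and rerun its business checks (for example, sufficient funds) before appending again.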

4. CQRS (Command Query Responsibility Segregation)

┌─────────────────────────────────────────────────────────────┐
│                        CQRS Pattern                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│                      ┌──────────┐                           │
│                      │  Client  │                           │
│                      └────┬─────┘                           │
│                           │                                  │
│              ┌────────────┴────────────┐                    │
│              │                         │                     │
│         Commands                    Queries                  │
│              │                         │                     │
│        ┌─────▼─────┐            ┌─────▼─────┐               │
│        │  Command  │            │   Query   │               │
│        │  Handler  │            │  Handler  │               │
│        └─────┬─────┘            └─────┬─────┘               │
│              │                         │                     │
│        ┌─────▼─────┐            ┌─────▼─────┐               │
│        │   Write   │───Events──►│   Read    │               │
│        │   Model   │            │   Model   │               │
│        │(Normalized│            │(Optimized │               │
│        │  for      │            │ for       │               │
│        │  writes)  │            │ queries)  │               │
│        └───────────┘            └───────────┘               │
│                                                              │
└─────────────────────────────────────────────────────────────┘
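from dataclasses import dataclass
from typing import Dict, List, Any
from abc import ABC, abstractmethod
import uuid

# Assumptions for this sketch: OrderPlacedEvent is the class from the
# Event Anatomy section; Order (an event-sourced aggregate) and
# OrderCancelledEvent are assumed to be defined analogously elsewhere.
# The event_store here is assumed to expose save(event) and
# get_events(id), a simpler interface than the EventStore class shown
# under Event Sourcing.

def generate_order_id() -> str:
    """Illustrative helper: random unique order ID."""
    return f"ORD-{uuid.uuid4().hex[:12]}"


def calculate_total(items: List[Dict]) -> float:
    """Illustrative helper: assumes each item has 'price' and 'quantity'."""
    return sum(item["price"] * item["quantity"] for item in items)
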
# Commands
@dataclass
class PlaceOrderCommand:
    customer_id: str
    items: List[Dict]
    shipping_address: Dict


@dataclass
class CancelOrderCommand:
    order_id: str
    reason: str


# Command Handlers (Write Side)
class OrderCommandHandler:
    def __init__(self, event_store, event_publisher):
        self.event_store = event_store
        self.event_publisher = event_publisher
    
    def handle_place_order(self, cmd: PlaceOrderCommand):
        # Business logic
        order_id = generate_order_id()
        total = calculate_total(cmd.items)
        
        # Create event
        event = OrderPlacedEvent(
            order_id=order_id,
            customer_id=cmd.customer_id,
            items=cmd.items,
            total=total
        )
        
        # Persist and publish
        self.event_store.save(event)
        self.event_publisher.publish(event)
        
        return order_id
    
    def handle_cancel_order(self, cmd: CancelOrderCommand):
        # Load current state
        events = self.event_store.get_events(cmd.order_id)
        order = Order.from_events(events)
        
        # Validate
        if not order.can_cancel():
            raise ValueError("Order cannot be cancelled")
        
        # Create event
        event = OrderCancelledEvent(
            order_id=cmd.order_id,
            reason=cmd.reason
        )
        
        self.event_store.save(event)
        self.event_publisher.publish(event)


# Read Model (Query Side)
class OrderReadModel:
    """
    Denormalized view optimized for queries.
    Updated by consuming events.
    """
    
    def __init__(self, db):
        self.db = db
    
    def on_order_placed(self, event: OrderPlacedEvent):
        """Project event to read model."""
        self.db.insert("orders", {
            "order_id": event.data["order_id"],
            "customer_id": event.data["customer_id"],
            "items": event.data["items"],
            "total": event.data["total"],
            "status": "placed",
            "created_at": event.timestamp
        })
        
        # Update customer's order count
        self.db.increment(
            "customers",
            {"customer_id": event.data["customer_id"]},
            "order_count"
        )
    
    def on_order_cancelled(self, event):
        """Update read model on cancellation."""
        self.db.update(
            "orders",
            {"order_id": event.data["order_id"]},
            {"status": "cancelled", "cancelled_at": event.timestamp}
        )


# Query Handlers
class OrderQueryHandler:
    def __init__(self, db):
        self.db = db
    
    def get_order(self, order_id: str) -> Dict:
        """Simple read from denormalized store."""
        return self.db.find_one("orders", {"order_id": order_id})
    
    def get_customer_orders(
        self, 
        customer_id: str,
        status: str = None,
        limit: int = 10
    ) -> List[Dict]:
        """Optimized query - no joins needed."""
        query = {"customer_id": customer_id}
        if status:
            query["status"] = status
        
        return self.db.find(
            "orders",
            query,
            sort=[("created_at", -1)],
            limit=limit
        )
    
    def get_order_statistics(self, customer_id: str) -> Dict:
        """Pre-computed statistics."""
        return self.db.find_one(
            "customers",
            {"customer_id": customer_id}
        )
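
Because the read side is derived entirely from events, it can be discarded and rebuilt at any time. The sketch below illustrates that property with dictionaries in place of a database. `InMemoryOrderProjection` is a hypothetical class, and events are plain dictionaries with the same `type` and `data` keys used earlier.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


class InMemoryOrderProjection:
    """Hypothetical read model kept in dictionaries instead of a database."""

    def __init__(self) -> None:
        self.orders: Dict[str, dict] = {}
        self.order_count: Dict[str, int] = defaultdict(int)

    def apply(self, event: dict) -> None:
        """Project a single event onto the denormalized view."""
        data = event["data"]
        if event["type"] == "order.placed":
            self.orders[data["order_id"]] = {
                "customer_id": data["customer_id"],
                "total": data["total"],
                "status": "placed",
            }
            self.order_count[data["customer_id"]] += 1
        elif event["type"] == "order.cancelled":
            self.orders[data["order_id"]]["status"] = "cancelled"

    @classmethod
    def rebuild(cls, log: Iterable[dict]) -> "InMemoryOrderProjection":
        """Discard the view and recompute it from the full event log."""
        projection = cls()
        for event in log:
            projection.apply(event)
        return projection


LOG: List[dict] = [
    {"type": "order.placed",
     "data": {"order_id": "o1", "customer_id": "c1", "total": 40.0}},
    {"type": "order.placed",
     "data": {"order_id": "o2", "customer_id": "c1", "total": 15.5}},
    {"type": "order.cancelled",
     "data": {"order_id": "o1", "reason": "customer request"}},
]

view = InMemoryOrderProjection.rebuild(LOG)
```

Adding a new query shape then becomes a matter of writing another projection and replaying the log into it, without touching the write side.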

Message Broker Patterns

Topic Design

Event Topics (Fan-out):
┌───────────────────────────────────────────────────────────┐
│                    orders.events                           │
├───────────────────────────────────────────────────────────┤
│ • order.placed                                             │
│ • order.confirmed                                          │
│ • order.shipped                                            │
│ • order.delivered                                          │
│ • order.cancelled                                          │
└───────────────────────────────────────────────────────────┘

        ├──► Inventory Service (updates stock)
        ├──► Analytics Service (updates metrics)
        ├──► Email Service (sends notifications)
        └──► Shipping Service (triggers fulfillment)

Command Topics (Point-to-Point):
┌───────────────────────────────────────────────────────────┐
│                 inventory.commands                         │
├───────────────────────────────────────────────────────────┤
│ Consumer Group: inventory-service (single consumer)       │
│ • reserve.stock                                            │
│ • release.stock                                            │
└───────────────────────────────────────────────────────────┘
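
The difference between fan-out and point-to-point delivery comes down to consumer groups: every group receives each message, but only one member inside a group handles it. The sketch below models that rule in memory. `InMemoryTopic` is hypothetical, and it picks a member by round-robin for simplicity, whereas Kafka-style brokers assign work by partition.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class InMemoryTopic:
    """Hypothetical topic: each group sees every message once."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._groups: Dict[str, List[Handler]] = defaultdict(list)
        self._next_member: Dict[str, int] = defaultdict(int)

    def join(self, group: str, handler: Handler) -> None:
        """Add one consumer instance to a consumer group."""
        self._groups[group].append(handler)

    def publish(self, message: dict) -> None:
        for group, members in self._groups.items():
            # One member per group handles the message (round-robin here).
            index = self._next_member[group] % len(members)
            self._next_member[group] += 1
            members[index](message)


events = InMemoryTopic("orders.events")
seen: Dict[str, List[str]] = {"inventory-1": [], "inventory-2": [], "email": []}

# Two instances of the inventory service share one group.
events.join("inventory-service", lambda m: seen["inventory-1"].append(m["type"]))
events.join("inventory-service", lambda m: seen["inventory-2"].append(m["type"]))
# The email service is a separate group, so it also gets every event.
events.join("email-service", lambda m: seen["email"].append(m["type"]))

for event_type in ["order.placed", "order.confirmed",
                   "order.shipped", "order.delivered"]:
    events.publish({"type": event_type})
```

A command topic is the special case with a single group: each command is handled once, no matter how many instances of the owning service are running.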

Dead Letter Queues

┌─────────────────────────────────────────────────────────────┐
│                Dead Letter Queue Pattern                     │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Main Queue                                                 │
│  ┌──────────────────────────────────────────────┐          │
│  │ Message → Consumer                           │          │
│  │              │                               │          │
│  │              ├── Success → ACK               │          │
│  │              │                               │          │
│  │              └── Failure (retry 3x)          │          │
│  │                       │                      │          │
│  └───────────────────────┼──────────────────────┘          │
│                          │                                  │
│                          ▼                                  │
│  Dead Letter Queue                                          │
│  ┌──────────────────────────────────────────────┐          │
│  │ • Failed messages for investigation          │          │
│  │ • Retry with backoff                         │          │
│  │ • Alert operations team                      │          │
│  └──────────────────────────────────────────────┘          │
│                                                              │
└─────────────────────────────────────────────────────────────┘
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Callable, Dict, Any
import logging

@dataclass
class DeadLetterEntry:
    original_message: Dict[str, Any]
    error: str
    retry_count: int
    first_failure: datetime
    last_failure: datetime
    topic: str

class DeadLetterHandler:
    def __init__(
        self, 
        dlq_store, 
        original_producer,
        max_retries: int = 3,
        retry_delay: timedelta = timedelta(minutes=5)
    ):
        self.dlq_store = dlq_store
        self.producer = original_producer
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)
    
    async def handle_failure(
        self,
        message: Dict[str, Any],
        error: Exception,
        topic: str,
        retry_count: int = 0
    ):
        """Send failed message to DLQ."""
        entry = DeadLetterEntry(
            original_message=message,
            error=str(error),
            retry_count=retry_count + 1,
            first_failure=datetime.utcnow(),
            last_failure=datetime.utcnow(),
            topic=topic
        )
        
        await self.dlq_store.save(entry)
        
        self.logger.warning(
            f"Message sent to DLQ: {message.get('id')} "
            f"(retry {retry_count + 1})"
        )
        
        if retry_count + 1 >= self.max_retries:
            await self.alert_operations(entry)
    
    async def process_dlq(self):
        """Background job to retry DLQ messages."""
        while True:
            entries = await self.dlq_store.get_retryable(
                max_retries=self.max_retries,
                min_age=self.retry_delay
            )
            
            for entry in entries:
                try:
                    # Retry sending to original topic
                    await self.producer.send(
                        entry.topic,
                        entry.original_message
                    )
                    await self.dlq_store.mark_success(entry)
                    
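                except Exception as e:
                    # Update retry count
                    entry.retry_count += 1
                    entry.last_failure = datetime.utcnow()
                    entry.error = str(e)
                    await self.dlq_store.update(entry)
                    # Escalate once the retry budget is spent
                    if entry.retry_count >= self.max_retries:
                        await self.alert_operations(entry)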
            
            await asyncio.sleep(60)
    
    async def alert_operations(self, entry: DeadLetterEntry):
        """Alert when max retries exceeded."""
        self.logger.error(
            f"DLQ: Max retries exceeded for message "
            f"{entry.original_message.get('id')}"
        )
        # Integration with PagerDuty, Slack, etc.
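
The handler above retries on a fixed delay, while the diagram mentions backoff. A minimal sketch of the consumer-side flow (retry three times with exponential backoff, then park the message) could look like this. The names `backoff_delay` and `consume` are hypothetical, the dead letter queue is a plain list, and the `sleep` parameter is injectable so the example runs instantly. Production systems usually also add random jitter to the delay.

```python
from typing import Callable, Dict, List, Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff: base * 2^(attempt - 1), capped at `cap` seconds."""
    return min(cap, base * (2 ** (attempt - 1)))


def consume(
    message: dict,
    handler: Callable[[dict], None],
    dead_letters: List[dict],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = lambda seconds: None,
) -> bool:
    """Try the handler up to max_attempts times, then send to the DLQ."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            handler(message)
            return True  # success: the message would be ACKed here
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                sleep(backoff_delay(attempt))
    dead_letters.append({
        "message": message,
        "error": str(last_error),
        "attempts": max_attempts,
    })
    return False


delays: List[float] = []
dlq: List[dict] = []
calls: Dict[str, int] = {"flaky": 0}


def flaky(message: dict) -> None:
    """Fails twice, then succeeds (a transient outage)."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise RuntimeError("temporary outage")


def poison(message: dict) -> None:
    """Always fails (a malformed message)."""
    raise ValueError("malformed payload")


recovered = consume({"id": "m1"}, flaky, dlq, sleep=delays.append)
parked = consume({"id": "m2"}, poison, dlq, sleep=delays.append)
```

Transient failures recover within the retry budget, while a poison message ends up in the dead letter list instead of blocking the queue.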

Saga Pattern

Choreography vs Orchestration

Choreography (Event-Based):
┌───────────┐     ┌───────────┐     ┌───────────┐
│   Order   │     │  Payment  │     │ Inventory │
│  Service  │     │  Service  │     │  Service  │
└─────┬─────┘     └─────┬─────┘     └─────┬─────┘
      │                 │                 │
      │ OrderCreated    │                 │
      ├─────────────────►                 │
      │                 │                 │
      │                 │ PaymentProcessed│
      │                 ├─────────────────►
      │                 │                 │
      │                 │                 │ StockReserved
      ◄─────────────────┼─────────────────┤
      │                 │                 │

Orchestration (Command-Based):
                 ┌──────────────┐
                 │    Saga      │
                 │ Orchestrator │
                 └──────┬───────┘

        ┌───────────────┼───────────────┐
        │               │               │
   CreateOrder    ProcessPayment   ReserveStock
        │               │               │
   ┌────▼────┐    ┌────▼────┐    ┌────▼────┐
   │  Order  │    │ Payment │    │Inventory│
   │ Service │    │ Service │    │ Service │
   └─────────┘    └─────────┘    └─────────┘
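from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Callable
import asyncio
import uuid


def generate_saga_id() -> str:
    """Illustrative helper: random unique saga ID."""
    return str(uuid.uuid4())
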

class SagaState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable
    completed: bool = False

class OrderSagaOrchestrator:
    """
    Orchestration-based saga for order processing.
    """
    
    def __init__(
        self,
        order_service,
        payment_service,
        inventory_service,
        shipping_service
    ):
        self.order_service = order_service
        self.payment_service = payment_service
        self.inventory_service = inventory_service
        self.shipping_service = shipping_service
        
        self.state = SagaState.PENDING
        self.completed_steps: List[SagaStep] = []
        self.saga_id: Optional[str] = None
    
    async def execute(self, order_data: dict) -> str:
        """Execute the order saga."""
        self.state = SagaState.RUNNING
        self.saga_id = generate_saga_id()
        
        steps = [
            SagaStep(
                name="create_order",
                action=lambda: self.order_service.create(order_data),
                compensation=lambda: self.order_service.cancel(
                    order_data["order_id"]
                )
            ),
            SagaStep(
                name="reserve_inventory",
                action=lambda: self.inventory_service.reserve(
                    order_data["items"]
                ),
                compensation=lambda: self.inventory_service.release(
                    order_data["items"]
                )
            ),
            SagaStep(
                name="process_payment",
                action=lambda: self.payment_service.charge(
                    order_data["customer_id"],
                    order_data["total"]
                ),
                compensation=lambda: self.payment_service.refund(
                    order_data["payment_id"]
                )
            ),
            SagaStep(
                name="schedule_shipping",
                action=lambda: self.shipping_service.schedule(
                    order_data["order_id"],
                    order_data["shipping_address"]
                ),
                compensation=lambda: self.shipping_service.cancel(
                    order_data["order_id"]
                )
            )
        ]
        
        try:
            for step in steps:
                result = await step.action()
                step.completed = True
                self.completed_steps.append(step)
                
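                # Store results that later steps and compensations depend on.
                # Assumes create() returns the new order's ID when the caller
                # did not supply one, and charge() returns a payment ID.
                if step.name == "create_order" and "order_id" not in order_data:
                    order_data["order_id"] = result["order_id"]
                elif step.name == "process_payment":
                    order_data["payment_id"] = result["payment_id"]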
            
            self.state = SagaState.COMPLETED
            return self.saga_id
            
        except Exception as e:
            # Compensate in reverse order, then surface the original cause
            await self.compensate()
            raise SagaFailedException(f"Saga failed: {e}") from e
    
    async def compensate(self):
        """Execute compensation in reverse order."""
        self.state = SagaState.COMPENSATING
        
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
            except Exception as e:
                # Log and continue - compensation must be idempotent
                print(f"Compensation failed for {step.name}: {e}")
        
        self.state = SagaState.FAILED


class SagaFailedException(Exception):
    pass
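
The orchestrator above covers the command-based variant. For comparison, a choreography-based saga has no central coordinator: each service subscribes to the events it cares about and publishes its own. The sketch below follows the event order of the choreography diagram. `EventBus` and `wire_services` are hypothetical, the bus is synchronous and in-memory, and the failure events `StockReservationFailed` and `PaymentRefunded` are assumed names for the compensation path.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class EventBus:
    """Hypothetical synchronous bus that records every published event."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self.log: List[str] = []

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> None:
        self.log.append(event_type)
        for handler in list(self._subscribers[event_type]):
            handler(data)


def wire_services(bus: EventBus, stock: Dict[str, int],
                  orders: Dict[str, str]) -> None:
    """Each service only knows which events it reacts to."""

    def payment_on_order_created(data: dict) -> None:
        bus.publish("PaymentProcessed", data)

    def inventory_on_payment_processed(data: dict) -> None:
        if stock.get(data["sku"], 0) >= data["quantity"]:
            stock[data["sku"]] -= data["quantity"]
            bus.publish("StockReserved", data)
        else:
            bus.publish("StockReservationFailed", data)

    def payment_on_reservation_failed(data: dict) -> None:
        # Compensation: undo the charge made earlier in the saga.
        bus.publish("PaymentRefunded", data)

    def order_on_stock_reserved(data: dict) -> None:
        orders[data["order_id"]] = "confirmed"

    def order_on_payment_refunded(data: dict) -> None:
        orders[data["order_id"]] = "cancelled"

    bus.subscribe("OrderCreated", payment_on_order_created)
    bus.subscribe("PaymentProcessed", inventory_on_payment_processed)
    bus.subscribe("StockReservationFailed", payment_on_reservation_failed)
    bus.subscribe("StockReserved", order_on_stock_reserved)
    bus.subscribe("PaymentRefunded", order_on_payment_refunded)


def place_order(bus: EventBus, orders: Dict[str, str],
                order_id: str, sku: str, quantity: int) -> None:
    orders[order_id] = "pending"
    bus.publish("OrderCreated",
                {"order_id": order_id, "sku": sku, "quantity": quantity})


bus = EventBus()
stock = {"widget": 1}
orders: Dict[str, str] = {}
wire_services(bus, stock, orders)

place_order(bus, orders, "o1", "widget", 1)  # enough stock: confirmed
place_order(bus, orders, "o2", "widget", 1)  # out of stock: refunded, cancelled
```

The trade-off is visible in the code: no service depends on another directly, but the overall flow exists only implicitly in the subscriptions, which is why orchestration is often preferred when the workflow needs to be visible in one place.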

Best Practices

Event Design Principles

✓ Events are immutable - never modify past events
✓ Events are self-describing - include all needed context
✓ Use past tense - "OrderPlaced", not "PlaceOrder"
✓ Version your events - for schema evolution
✓ Include correlation IDs - for tracing
✓ Keep events small - but with enough context
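
Two of these principles, immutability and versioning, can be sketched together. The example below assumes a hypothetical schema change: version 1 of `OrderPlaced` had no `currency` field, and version 2 adds it. Old records are never rewritten. Instead they are upgraded ("upcast") when read. The names `UPCASTERS` and `load_order_placed` are illustrative only.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class OrderPlaced:
    """Current (v2) shape of the event; frozen so it cannot be modified."""
    order_id: str
    total: float
    currency: str
    schema_version: int = 2


def upcast_v1_to_v2(payload: dict) -> dict:
    """Assumed migration: v1 had no currency, so old records default to USD."""
    return {**payload, "currency": "USD", "schema_version": 2}


# Maps a stored schema version to the function that upgrades it by one step.
UPCASTERS: Dict[int, Callable[[dict], dict]] = {1: upcast_v1_to_v2}


def load_order_placed(payload: dict) -> OrderPlaced:
    """Upgrade a stored payload to the current schema, then build the event."""
    current = dict(payload)
    while current.get("schema_version", 1) in UPCASTERS:
        current = UPCASTERS[current.get("schema_version", 1)](current)
    return OrderPlaced(**current)


old_record = {"order_id": "123", "total": 59.9, "schema_version": 1}
event = load_order_placed(old_record)

try:
    event.total = 0  # type: ignore[misc]
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

The stored record is left untouched, and consumers only ever see the latest schema.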

Idempotency

Critical: Event handlers MUST be idempotent. Many brokers guarantee at-least-once delivery, so the same event may arrive more than once.
// ❌ Not idempotent
async function handleOrderPlaced(event) {
    await incrementOrderCount();  // Will double-count on retry
}

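// ✅ Idempotent
async function handleOrderPlaced(event) {
    // Skip events that were already handled
    const exists = await checkProcessed(event.id);
    if (exists) return;
    
    // Run both writes in one transaction where possible: a crash between
    // them would cause a double count on redelivery
    await incrementOrderCount();
    await markProcessed(event.id);
}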
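
The same deduplication idea in Python, as a runnable sketch. `OrderCounter` is a hypothetical handler that keeps processed event IDs in memory. A real service would persist them, ideally in the same transaction as the state change.

```python
from typing import Set


class OrderCounter:
    """Hypothetical handler that deduplicates on event ID."""

    def __init__(self) -> None:
        self.order_count = 0
        self._processed_ids: Set[str] = set()

    def handle_order_placed(self, event: dict) -> bool:
        """Returns True if the event was applied, False if it was a duplicate."""
        if event["id"] in self._processed_ids:
            return False
        # In a real system these two writes belong in one transaction;
        # otherwise a crash between them reintroduces the double count.
        self.order_count += 1
        self._processed_ids.add(event["id"])
        return True


counter = OrderCounter()
first = counter.handle_order_placed({"id": "evt-1", "type": "order.placed"})
redelivered = counter.handle_order_placed({"id": "evt-1", "type": "order.placed"})
second = counter.handle_order_placed({"id": "evt-2", "type": "order.placed"})
```

Three deliveries arrive, but only two distinct events are counted.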

Interview Tips

Key Points to Cover:
  1. When to use events vs direct calls
    • Events: Decoupling, audit trails, reactive systems
    • Direct calls: Strong consistency, simple workflows
  2. Event Sourcing trade-offs
    • Pros: Full history, temporal queries, debugging
    • Cons: Complexity, eventual consistency, storage
  3. CQRS considerations
    • Best for: Read-heavy workloads (high read-to-write ratio), complex domains
    • Avoid for: Simple CRUD applications
  4. Saga patterns
    • Choreography: Simple workflows, loose coupling
    • Orchestration: Complex flows, visibility

Practice Problem

Design an E-commerce Order System

Requirements:
  • Place orders with multiple items
  • Payment processing
  • Inventory management
  • Shipping scheduling
  • Order status tracking
Consider:
  1. What events would you define?
  2. How would you handle payment failures?
  3. How would you implement order status queries?
  4. What’s your strategy for out-of-stock items?