Senior/Staff Level Content: This section covers the advanced topics that separate senior engineers from mid-level ones. Master these for L5+ (Senior) and Staff interviews at FAANG.
Consistency Deep Dive
Linearizability vs Serializability
Most candidates confuse these. Know the difference!
┌─────────────────────────────────────────────────────────────────┐
│ Linearizability vs Serializability │
├─────────────────────────────────────────────────────────────────┤
│ │
│ LINEARIZABILITY (Single-object, real-time) │
│ ───────────────────────────────────────── │
│ • Operations appear to happen atomically at some point │
│ • Real-time ordering: if op A completes before op B starts, │
│ then A is ordered before B │
│ • Think: "single copy of data that everyone sees" │
│ │
│ Time ────────────────────────────────────────────► │
│ │
│ Client A: ──[write x=1]──────────── │
│ Client B: ──────[read x]─── │
│ │ │
│ └─► Must return 1 (write completed) │
│ │
│ SERIALIZABILITY (Multi-object, transactions) │
│ ─────────────────────────────────────────── │
│ • Transactions appear to execute in SOME serial order │
│ • Doesn't guarantee real-time ordering │
│ • Think: "transactions don't see partial state" │
│ │
│ T1: read(A), write(B) │
│ T2: read(B), write(A) │
│ Serial order: T1→T2 or T2→T1, but not interleaved │
│ │
│ STRICT SERIALIZABILITY │
│ ─────────────────────── │
│ • Both! Serial order + real-time ordering │
│ • Most expensive, what Spanner provides │
│ │
└─────────────────────────────────────────────────────────────────┘
Isolation Levels (Know All Four!)
┌─────────────────────────────────────────────────────────────────┐
│ Isolation Levels │
├─────────────────────────────────────────────────────────────────┤
│ │
│ READ UNCOMMITTED (Weakest) │
│ ───────────────────────── │
│ • Can see uncommitted changes (dirty reads) │
│ • Almost never used │
│ │
│ T1: write(x=1) ──────────── rollback │
│ T2: read(x) ─► 1 (dirty read!) │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ READ COMMITTED (Default in PostgreSQL) │
│ ───────────────────────────────────── │
│ • Only see committed data │
│ • Allows non-repeatable reads │
│ │
│ T1: read(x) ─► 1 ─────────── read(x) ─► 2 (changed!) │
│ T2: write(x=2), commit │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ REPEATABLE READ (Default in MySQL InnoDB) │
│ ───────────────────────────────────────── │
│ • Same query returns same results in a transaction │
│ • Allows phantom reads (new rows appear) │
│ │
│ T1: SELECT COUNT(*) WHERE age > 30 ─► 5 │
│ T2: INSERT INTO users (age=35), commit │
│ T1: SELECT COUNT(*) WHERE age > 30 ─► 6 (phantom!) │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ SERIALIZABLE (Strongest) │
│ ──────────────────────── │
│ • Full isolation, as if transactions ran serially │
│ • Highest safety, lowest performance │
│ │
└─────────────────────────────────────────────────────────────────┘
| Level | Dirty Read | Non-Repeatable Read | Phantom Read |
|---|---|---|---|
| Read Uncommitted | ✓ | ✓ | ✓ |
| Read Committed | ✗ | ✓ | ✓ |
| Repeatable Read | ✗ | ✗ | ✓ |
| Serializable | ✗ | ✗ | ✗ |
Interview Answer: “I’d use Read Committed for most cases. Serializable only for critical financial transactions where consistency matters more than throughput.”
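To make that answer concrete, here is a minimal sketch of setting the isolation level per transaction in PostgreSQL, using psycopg2 and a retry loop for the serialization failures that SERIALIZABLE can raise. The DSN, table, and column names are placeholders, not from any particular schema:

import psycopg2
from psycopg2 import errors

def transfer(dsn, src, dst, amount, max_retries=3):
    conn = psycopg2.connect(dsn)  # dsn and the accounts table are hypothetical
    try:
        for _ in range(max_retries):
            try:
                with conn:  # one transaction per attempt; commits or rolls back
                    with conn.cursor() as cur:
                        # Must be the first statement in the transaction
                        cur.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
                        cur.execute(
                            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                            (amount, src))
                        cur.execute(
                            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                            (amount, dst))
                return  # committed
            except errors.SerializationFailure:
                continue  # a concurrent transaction won; retry from scratch
        raise RuntimeError("transfer failed serialization checks repeatedly")
    finally:
        conn.close()

The retry loop is the part candidates forget: SERIALIZABLE does not block conflicting transactions, it aborts one of them, so the application must be prepared to retry.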
Distributed Consensus Deep Dive
Leader Election: Why It’s Hard
┌─────────────────────────────────────────────────────────────────┐
│ Split Brain Problem │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Initial State: Node A is leader │
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Node A │──────│ Node B │──────│ Node C │ │
│ │ LEADER │ │FOLLOWER│ │FOLLOWER│ │
│ └────────┘ └────────┘ └────────┘ │
│ │
│ Network partition occurs: │
│ │
│ Partition 1 ║ Partition 2 │
│ ┌────────┐ ║ ┌────────┐ ┌────────┐ │
│ │ Node A │ ║ │ Node B │─│ Node C │ │
│ │ LEADER │ ║ │FOLLOWER│ │FOLLOWER│ │
│ └────────┘ ║ └────────┘ └────────┘ │
│ ║ │ │
│ A thinks it's ║ B or C becomes leader! │
│ still leader! ║ (timeout, no heartbeat) │
│ ║ │
│ DANGER: Two leaders! (Split brain) │
│ │
│ Solution: Quorum-based voting │
│ • Need majority (N/2 + 1) to elect leader │
│ • Partition 1 has 1/3 (minority) → can't write │
│ • Partition 2 has 2/3 (majority) → can elect, can write │
│ │
└─────────────────────────────────────────────────────────────────┘
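The quorum arithmetic is worth internalizing; a tiny sketch (cluster sizes are illustrative):

def majority(cluster_size: int) -> int:
    """Smallest number of votes that constitutes a quorum."""
    return cluster_size // 2 + 1

def can_elect_leader(partition_size: int, cluster_size: int) -> bool:
    return partition_size >= majority(cluster_size)

# 3-node cluster split 1 / 2: only the 2-node side can elect
assert not can_elect_leader(1, 3)
assert can_elect_leader(2, 3)
# This is also why even-sized clusters buy nothing: 4 nodes still
# need 3 votes, so a 2/2 split freezes BOTH sides
assert not can_elect_leader(2, 4)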
Raft: The Algorithm You Should Know
┌─────────────────────────────────────────────────────────────────┐
│ Raft Consensus │
├─────────────────────────────────────────────────────────────────┤
│ │
│ KEY CONCEPTS: │
│ │
│ 1. TERMS (Logical Clock) │
│ • Time divided into terms │
│ • Each term has at most one leader │
│ • Term number monotonically increases │
│ │
│ Term 1 Term 2 Term 3 │
│ ├──Leader A──┼──Election──┼──Leader B──────► │
│ │ (failed) │ │
│ │
│ 2. LOG REPLICATION │
│ │
│ Leader Log: [1:x=1] [1:y=2] [2:x=3] [2:z=4] │
│ ↓ ↓ ↓ ↓ │
│ Follower 1: [1:x=1] [1:y=2] [2:x=3] [2:z=4] ✓ │
│ Follower 2: [1:x=1] [1:y=2] [2:x=3] (catching up) │
│ │
│ Entry committed when replicated to majority │
│ │
│ 3. LEADER ELECTION │
│ │
│ Follower timeout → Candidate → RequestVote RPC │
│ • Vote granted if: │
│ - Candidate's term >= voter's term │
│ - Candidate's log is at least as up-to-date │
│ - Voter hasn't voted for someone else this term │
│ • Majority votes → become Leader │
│ │
│ 4. SAFETY GUARANTEE │
│ • Elected leader has all committed entries │
│ • Leaders never overwrite their log │
│ │
└─────────────────────────────────────────────────────────────────┘
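The vote-granting rule in the box translates almost line for line into code. A minimal sketch of a RequestVote handler, with field names following the Raft paper rather than any particular library:

from dataclasses import dataclass
from typing import Optional

@dataclass
class VoteRequest:
    term: int
    candidate_id: str
    last_log_index: int
    last_log_term: int

class RaftNode:
    def __init__(self):
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.log = []  # list of (term, command) entries

    def handle_request_vote(self, req: VoteRequest) -> bool:
        # Rule 1: reject stale terms; adopt newer ones
        if req.term < self.current_term:
            return False
        if req.term > self.current_term:
            self.current_term = req.term
            self.voted_for = None  # new term, vote resets

        # Rule 2: at most one vote per term
        if self.voted_for not in (None, req.candidate_id):
            return False

        # Rule 3: candidate's log must be at least as up-to-date:
        # higher last term wins; equal terms compare by log length
        my_last_term = self.log[-1][0] if self.log else 0
        my_last_index = len(self.log)
        if (req.last_log_term, req.last_log_index) < (my_last_term, my_last_index):
            return False

        self.voted_for = req.candidate_id
        return True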
Clock Synchronization
Why Wall Clocks Fail
┌─────────────────────────────────────────────────────────────────┐
│ Clock Synchronization Problems │
├─────────────────────────────────────────────────────────────────┤
│ │
│  Physical clocks drift (typically seconds per day for quartz)  │
│ NTP sync has error bounds (1-10ms typically) │
│ │
│ Problem scenario: │
│ │
│ Machine A (clock: 10:00:00.000): write(x=1) @ 10:00:00.000 │
│ Machine B (clock: 10:00:00.050): write(x=2) @ 10:00:00.050 │
│ │
│ But B's clock is 100ms ahead! Real order: │
│ • B actually wrote at real time 09:59:59.950 │
│ • A wrote at real time 10:00:00.000 │
│ • A happened AFTER B, but timestamps say B is newer! │
│ │
│ Solution 1: Logical clocks (Lamport) │
│ Solution 2: Vector clocks │
│ Solution 3: Hybrid clocks (HLC) │
│ Solution 4: GPS/atomic clocks (Spanner's TrueTime) │
│ │
└─────────────────────────────────────────────────────────────────┘
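Solution 1 is simple enough to sketch in full. A Lamport clock gives an ordering consistent with causality, at the cost of not detecting concurrent writes (vector clocks, next, fix that):

class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Local event: advance the clock."""
        self.time += 1
        return self.time

    def send(self) -> int:
        """Timestamp to attach to an outgoing message."""
        return self.tick()

    def receive(self, msg_time: int) -> int:
        """On receive, jump past the sender's clock, then tick."""
        self.time = max(self.time, msg_time) + 1
        return self.time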
Vector Clocks (Conflict Detection)
┌─────────────────────────────────────────────────────────────────┐
│ Vector Clocks │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Each node maintains vector: {A: count, B: count, C: count} │
│ │
│ Node A Node B Node C │
│ {A:0,B:0,C:0} {A:0,B:0,C:0} {A:0,B:0,C:0} │
│ │ │ │ │
│ write(x=1) │ │ │
│ {A:1,B:0,C:0} │ │ │
│ │─────────────►│ │ │
│ │ {A:1,B:0,C:0} │ │
│ │ │ │ │
│ │ write(y=2) │ │
│ │ {A:1,B:1,C:0} │ │
│ │ │─────────────►│ │
│ │ │ {A:1,B:1,C:0} │
│ │ │ │ │
│ │
│ Comparison: │
│ {A:1,B:2,C:0} vs {A:2,B:1,C:0} │
│ Neither dominates → CONFLICT! Must resolve │
│ │
│ {A:1,B:2,C:0} vs {A:1,B:3,C:0} │
│ Second dominates → Second is newer, no conflict │
│ │
└─────────────────────────────────────────────────────────────────┘
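The comparison rule in the box ("neither dominates → conflict") is just an element-wise check. A compact sketch:

def compare(vc_a: dict, vc_b: dict) -> str:
    """Return 'a<b', 'b<a', 'equal', or 'concurrent'."""
    nodes = set(vc_a) | set(vc_b)
    a_le_b = all(vc_a.get(n, 0) <= vc_b.get(n, 0) for n in nodes)
    b_le_a = all(vc_b.get(n, 0) <= vc_a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "a<b"         # b dominates: b is newer
    if b_le_a:
        return "b<a"         # a dominates: a is newer
    return "concurrent"      # neither dominates: conflict, must resolve

# The two examples from the diagram above:
assert compare({"A": 1, "B": 2, "C": 0}, {"A": 2, "B": 1, "C": 0}) == "concurrent"
assert compare({"A": 1, "B": 2, "C": 0}, {"A": 1, "B": 3, "C": 0}) == "a<b"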
Data Partitioning Strategies
Partition Key Selection
┌─────────────────────────────────────────────────────────────────┐
│ Choosing Partition Keys │
├─────────────────────────────────────────────────────────────────┤
│ │
│ GOOD Partition Key Properties: │
│ ✓ High cardinality (many unique values) │
│ ✓ Evenly distributed access │
│ ✓ Matches query patterns │
│ │
│ Example: E-commerce Orders │
│ │
│ ❌ BAD: partition by date │
│ • Today's partition gets all traffic (hot partition) │
│ • Old partitions sit idle │
│ │
│ ❌ BAD: partition by country │
│ • US partition has 60% of traffic │
│ • Small countries underutilized │
│ │
│ ✅ GOOD: partition by order_id │
│ • Random distribution │
│ • But: can't query "all orders for user X" easily │
│ │
│ ✅ BETTER: partition by user_id │
│ • User's orders on same partition (locality) │
│ • Query patterns match │
│ • Watch for celebrity users (hot partition) │
│ │
│ ✅ BEST: compound key (user_id, order_date) │
│ • Partition by user_id │
│ • Sort by order_date within partition │
│ • Efficient for "user X's orders in last month" │
│ │
└─────────────────────────────────────────────────────────────────┘
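To make the compound-key idea concrete, here is a toy in-memory sketch: hash the user_id to pick a partition, keep rows sorted by order_date within it. Real stores (DynamoDB, Cassandra) make exactly this split between partition key and sort key; everything here is illustrative:

import bisect
import itertools
from collections import defaultdict

NUM_PARTITIONS = 8
_seq = itertools.count()        # tie-breaker so equal dates never compare dicts
partitions = defaultdict(list)  # pid -> [(order_date, seq, row)], kept sorted

def put_order(user_id: str, order_date: str, order: dict) -> None:
    pid = hash(user_id) % NUM_PARTITIONS            # partition by user_id
    row = {**order, "user_id": user_id}
    bisect.insort(partitions[pid], (order_date, next(_seq), row))

def orders_in_range(user_id: str, start: str, end: str) -> list:
    """One partition, one contiguous range scan - no scatter-gather."""
    pid = hash(user_id) % NUM_PARTITIONS
    rows = partitions[pid]
    lo = bisect.bisect_left(rows, (start,))
    hi = bisect.bisect_left(rows, (end + "\x7f",))  # include all rows dated `end`
    return [r for _, _, r in rows[lo:hi] if r["user_id"] == user_id]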
Handling Hot Partitions
# Strategy 1: Add a random suffix to spread a hot key
import random
from datetime import datetime, timezone

NUM_SHARDS = 10  # how many sub-partitions a hot key is split across

def get_partition_key(celebrity_id):
    if is_celebrity(celebrity_id):  # is_celebrity: application-specific lookup
        # Split the celebrity's data across NUM_SHARDS partitions
        suffix = random.randint(0, NUM_SHARDS - 1)
        return f"{celebrity_id}_{suffix}"
    return celebrity_id

# Reading back requires scatter-gather across all sub-partitions
def get_celebrity_data(celebrity_id):
    results = []
    for suffix in range(NUM_SHARDS):
        key = f"{celebrity_id}_{suffix}"
        results.extend(query_partition(key))  # query_partition: storage-layer call
    return results

# Strategy 2: Time-based suffix (cycles writes through 4 partitions by hour)
def get_partition_key_time(user_id):
    hour = datetime.now(timezone.utc).hour
    return f"{user_id}_{hour % 4}"
Exactly-Once Semantics
The Three Delivery Guarantees
┌─────────────────────────────────────────────────────────────────┐
│ Message Delivery Guarantees │
├─────────────────────────────────────────────────────────────────┤
│ │
│ AT-MOST-ONCE │
│ ───────────── │
│ Send and forget. May lose messages. │
│ Use case: Metrics, logs (some loss OK) │
│ │
│ Producer ──[msg]──► Broker (might fail silently) │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ AT-LEAST-ONCE │
│ ───────────── │
│ Retry until ACK. May duplicate messages. │
│ Use case: Most applications (with idempotent consumers) │
│ │
│ Producer ──[msg]──► Broker ──[ACK]──► Producer │
│ │ │ │
│ └──[retry]─────────┘ (if no ACK, retry → duplicate) │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ EXACTLY-ONCE │
│ ──────────── │
│ Each message processed exactly once. Hard to achieve! │
│ Use case: Financial transactions, critical data │
│ │
│ Achieved via: Idempotency + Deduplication + Transactions │
│ │
└─────────────────────────────────────────────────────────────────┘
Implementing Exactly-Once
class ExactlyOnceProcessor:
    """
    Exactly-once *processing* built from at-least-once delivery
    plus idempotency keys and transactional bookkeeping.
    """
    def process_message(self, message):
        idempotency_key = message.id

        # Step 1: If we've seen this key, return the cached result
        if self.is_processed(idempotency_key):
            return self.get_cached_result(idempotency_key)

        # Step 2: Do the work and record it atomically
        with self.db.transaction():
            result = self.do_business_logic(message)
            # Mark processed in the SAME transaction as the work,
            # so a crash can't leave one without the other
            self.mark_processed(idempotency_key, result)
            # Commit the message offset (Kafka-style) as well
            self.commit_offset(message.offset)
        return result

    def is_processed(self, key):
        return self.db.exists(f"processed:{key}")

    def get_cached_result(self, key):
        return self.db.get(f"processed:{key}")

    def mark_processed(self, key, result):
        # TTL bounds the deduplication window (here: 24 hours)
        self.db.set(f"processed:{key}", result, ttl=86400)
Distributed Caching Patterns
Cache Stampede Prevention
┌─────────────────────────────────────────────────────────────────┐
│ Cache Stampede Problem │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Popular key expires → 1000 servers hit DB simultaneously │
│ │
│ Server 1 ─┐ │
│ Server 2 ─┤ │
│ Server 3 ─┼───► Cache MISS ───► Database ← 💥 OVERWHELMED │
│ ... │ │
│ Server N ─┘ │
│ │
│ SOLUTIONS: │
│ │
│ 1. LOCKING (Mutex) │
│ First request acquires lock, others wait or get stale │
│ │
│ 2. PROBABILISTIC EARLY EXPIRATION │
│ Refresh before expiry with some probability │
│ │
│ 3. BACKGROUND REFRESH │
│ Never expire, refresh asynchronously │
│ │
└─────────────────────────────────────────────────────────────────┘
import math
import random
import time

class StampedePreventingCache:
    def get(self, key, fetch_func, ttl=3600, beta=1.0):
        cached = self.cache.get(key)
        if cached:
            value, expiry, delta = cached  # delta = how long the last fetch took
            now = time.time()
            gap = expiry - now
            if gap > 0:
                # XFetch: probabilistically refresh early. The closer to
                # expiry (and the slower the fetch), the more likely we refresh.
                random_early = delta * beta * math.log(random.random())  # <= 0
                if gap + random_early > 0:
                    return value  # use cached value

        # Cache miss or early refresh triggered:
        # use a distributed lock so only one caller hits the backend
        lock_key = f"lock:{key}"
        if self.acquire_lock(lock_key, timeout=5):
            try:
                start = time.time()
                value = fetch_func()
                delta = time.time() - start
                self.cache.set(key, (value, time.time() + ttl, delta))
                return value
            finally:
                self.release_lock(lock_key)
        else:
            # Someone else is refreshing: serve stale if we have it, else wait
            if cached:
                return cached[0]  # return stale value
            time.sleep(0.1)
            return self.get(key, fetch_func, ttl, beta)  # retry
Rate Limiting at Scale
Distributed Rate Limiting
┌─────────────────────────────────────────────────────────────────┐
│ Distributed Rate Limiting Strategies │
├─────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY 1: Centralized (Redis) │
│ ───────────────────────────────── │
│ All servers check same Redis │
│ ✓ Accurate ✗ Single point of failure │
│ │
│ Server 1 ─┐ │
│ Server 2 ─┼───► Redis ───► Accurate count │
│ Server 3 ─┘ │
│ │
│ STRATEGY 2: Local + Sync │
│ ─────────────────────── │
│ Local counters, periodically sync │
│ ✓ Fast ✗ Approximate │
│ │
│ Server 1: local_count=50 ──┐ │
│ Server 2: local_count=40 ──┼──► Sync every 1s │
│ Server 3: local_count=30 ──┘ │
│ │
│ STRATEGY 3: Token Bucket with Redis │
│ ───────────────────────────────── │
│ Each request: DECR if tokens > 0 │
│ Background: Refill tokens at fixed rate │
│ │
│ Lua script for atomic check-and-decrement: │
│      local tokens = tonumber(redis.call('GET', KEYS[1])) or 0   │
│      if tokens > 0 then                                         │
│        redis.call('DECR', KEYS[1])                              │
│ return 1 -- allowed │
│ end │
│ return 0 -- rejected │
│ │
└─────────────────────────────────────────────────────────────────┘
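A runnable version of the Redis approach, here as a simple fixed-window counter with redis-py, made atomic via a Lua script. Key naming and limits are illustrative:

import time
import redis

LUA_FIXED_WINDOW = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
  redis.call('EXPIRE', KEYS[1], ARGV[2])
end
if current > tonumber(ARGV[1]) then
  return 0
end
return 1
"""

r = redis.Redis()
check = r.register_script(LUA_FIXED_WINDOW)

def allowed(user_id: str, limit: int = 100, window_s: int = 60) -> bool:
    window = int(time.time()) // window_s        # current window number
    key = f"rl:{user_id}:{window}"
    return check(keys=[key], args=[limit, window_s]) == 1

Fixed windows allow up to 2x the limit across a window boundary; if that matters, mention sliding-window or token-bucket variants in the interview.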
CQRS (Command Query Responsibility Segregation)
CQRS separates read and write operations into different models, optimizing each for its specific use case.
Python implementation:
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from datetime import datetime
from abc import ABC, abstractmethod
from enum import Enum
import uuid

# ============== Commands (Write Side) ==============
class CommandType(Enum):
    CREATE_ORDER = "create_order"
    UPDATE_ORDER = "update_order"
    CANCEL_ORDER = "cancel_order"

@dataclass
class Command:
    command_type: CommandType
    aggregate_id: str
    payload: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.utcnow)
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> None:
        ...

class OrderCommandHandler(CommandHandler):
    def __init__(self, event_store: 'EventStore', event_bus: 'EventBus'):
        self.event_store = event_store
        self.event_bus = event_bus

    async def handle(self, command: Command) -> str:
        if command.command_type == CommandType.CREATE_ORDER:
            return await self._handle_create_order(command)
        elif command.command_type == CommandType.CANCEL_ORDER:
            return await self._handle_cancel_order(command)
        raise ValueError(f"Unknown command type: {command.command_type}")

    async def _handle_create_order(self, command: Command) -> str:
        # Business validation would happen here
        order_id = str(uuid.uuid4())

        # Create event (Event / EventType are defined in the
        # Event Sourcing section below)
        event = Event(
            event_type=EventType.ORDER_CREATED,
            aggregate_id=order_id,
            payload={
                "user_id": command.payload["user_id"],
                "items": command.payload["items"],
                "total": command.payload["total"],
            },
            version=1
        )

        # Persist event, then publish for read-model updates
        await self.event_store.append(event)
        await self.event_bus.publish(event)
        return order_id

    async def _handle_cancel_order(self, command: Command) -> None:
        # Load current state from events
        events = await self.event_store.get_events(command.aggregate_id)
        order = OrderAggregate.from_events(events)

        # Business validation
        if order.status == OrderStatus.CANCELLED:
            raise ValueError("Order already cancelled")
        if order.status == OrderStatus.SHIPPED:
            raise ValueError("Cannot cancel shipped order")

        # Create cancellation event
        event = Event(
            event_type=EventType.ORDER_CANCELLED,
            aggregate_id=command.aggregate_id,
            payload={"reason": command.payload.get("reason", "User requested")},
            version=order.version + 1
        )
        await self.event_store.append(event)
        await self.event_bus.publish(event)

# ============== Queries (Read Side) ==============
@dataclass
class OrderReadModel:
    """Denormalized read model optimized for queries"""
    id: str
    user_id: str
    status: str
    items: List[Dict]
    total: float
    created_at: datetime
    updated_at: datetime

class OrderQueryService:
    def __init__(self, read_db: 'ReadDatabase'):
        self.read_db = read_db

    async def get_order(self, order_id: str) -> Optional[OrderReadModel]:
        """Fast read from denormalized model"""
        return await self.read_db.find_one("orders", {"id": order_id})

    async def get_user_orders(
        self,
        user_id: str,
        status: Optional[str] = None,
        limit: int = 10,
        offset: int = 0
    ) -> List[OrderReadModel]:
        """Query with filters - optimized for read patterns"""
        query = {"user_id": user_id}
        if status:
            query["status"] = status
        return await self.read_db.find(
            "orders",
            query,
            sort=[("created_at", -1)],
            limit=limit,
            skip=offset
        )

    async def get_orders_by_status(self, status: str) -> List[OrderReadModel]:
        """Admin query - different index"""
        return await self.read_db.find(
            "orders",
            {"status": status},
            sort=[("updated_at", -1)]
        )

# ============== Read Model Projector ==============
class OrderProjector:
    """Updates read model based on events"""
    def __init__(self, read_db: 'ReadDatabase'):
        self.read_db = read_db

    async def project(self, event: 'Event') -> None:
        handler = getattr(self, f"_handle_{event.event_type.value}", None)
        if handler:
            await handler(event)

    async def _handle_order_created(self, event: 'Event') -> None:
        order = OrderReadModel(
            id=event.aggregate_id,
            user_id=event.payload["user_id"],
            status="pending",
            items=event.payload["items"],
            total=event.payload["total"],
            created_at=event.timestamp,
            updated_at=event.timestamp
        )
        await self.read_db.insert("orders", order.__dict__)

    async def _handle_order_cancelled(self, event: 'Event') -> None:
        await self.read_db.update(
            "orders",
            {"id": event.aggregate_id},
            {
                "status": "cancelled",
                "cancellation_reason": event.payload["reason"],
                "updated_at": event.timestamp
            }
        )
JavaScript implementation:
const crypto = require('crypto');   // for crypto.randomUUID()
const express = require('express');

// ============== Commands (Write Side) ==============
const CommandType = {
  CREATE_ORDER: 'create_order',
  UPDATE_ORDER: 'update_order',
  CANCEL_ORDER: 'cancel_order'
};

class Command {
  constructor(commandType, aggregateId, payload) {
    this.commandType = commandType;
    this.aggregateId = aggregateId;
    this.payload = payload;
    this.timestamp = new Date();
    this.correlationId = crypto.randomUUID();
  }
}

class OrderCommandHandler {
  constructor(eventStore, eventBus) {
    this.eventStore = eventStore;
    this.eventBus = eventBus;
  }

  async handle(command) {
    switch (command.commandType) {
      case CommandType.CREATE_ORDER:
        return this.handleCreateOrder(command);
      case CommandType.CANCEL_ORDER:
        return this.handleCancelOrder(command);
      default:
        throw new Error(`Unknown command type: ${command.commandType}`);
    }
  }

  async handleCreateOrder(command) {
    const orderId = crypto.randomUUID();
    const event = new Event(
      EventType.ORDER_CREATED,
      orderId,
      {
        userId: command.payload.userId,
        items: command.payload.items,
        total: command.payload.total
      },
      1
    );
    await this.eventStore.append(event);
    await this.eventBus.publish(event);
    return orderId;
  }

  async handleCancelOrder(command) {
    // Load current state from events
    const events = await this.eventStore.getEvents(command.aggregateId);
    const order = OrderAggregate.fromEvents(events);

    // Business validation
    if (order.status === 'cancelled') {
      throw new Error('Order already cancelled');
    }
    if (order.status === 'shipped') {
      throw new Error('Cannot cancel shipped order');
    }

    const event = new Event(
      EventType.ORDER_CANCELLED,
      command.aggregateId,
      { reason: command.payload.reason || 'User requested' },
      order.version + 1
    );
    await this.eventStore.append(event);
    await this.eventBus.publish(event);
  }
}

// ============== Queries (Read Side) ==============
class OrderQueryService {
  constructor(readDb) {
    this.readDb = readDb;
  }

  async getOrder(orderId) {
    return this.readDb.findOne('orders', { id: orderId });
  }

  async getUserOrders(userId, { status, limit = 10, offset = 0 } = {}) {
    const query = { userId };
    if (status) query.status = status;
    return this.readDb.find('orders', query, {
      sort: { createdAt: -1 },
      limit,
      skip: offset
    });
  }

  async getOrdersByStatus(status) {
    return this.readDb.find('orders', { status }, {
      sort: { updatedAt: -1 }
    });
  }
}

// ============== Read Model Projector ==============
class OrderProjector {
  constructor(readDb) {
    this.readDb = readDb;
  }

  async project(event) {
    const handlerName = `handle${this.toPascalCase(event.eventType)}`;
    if (this[handlerName]) {
      await this[handlerName](event);
    }
  }

  async handleOrderCreated(event) {
    const order = {
      id: event.aggregateId,
      userId: event.payload.userId,
      status: 'pending',
      items: event.payload.items,
      total: event.payload.total,
      createdAt: event.timestamp,
      updatedAt: event.timestamp
    };
    await this.readDb.insert('orders', order);
  }

  async handleOrderCancelled(event) {
    await this.readDb.update(
      'orders',
      { id: event.aggregateId },
      {
        status: 'cancelled',
        cancellationReason: event.payload.reason,
        updatedAt: event.timestamp
      }
    );
  }

  toPascalCase(str) {
    return str.replace(/_([a-z])/g, (g) => g[1].toUpperCase())
              .replace(/^[a-z]/, (c) => c.toUpperCase());
  }
}

// ============== Usage Example ==============
// (commandHandler and queryService are assumed wired up elsewhere)
const app = express();

// Command endpoint (write)
app.post('/orders', async (req, res) => {
  const command = new Command(
    CommandType.CREATE_ORDER,
    null,
    req.body
  );
  const orderId = await commandHandler.handle(command);
  res.status(201).json({ orderId });
});

// Query endpoint (read)
app.get('/orders/:id', async (req, res) => {
  const order = await queryService.getOrder(req.params.id);
  if (!order) return res.status(404).json({ error: 'Not found' });
  res.json(order);
});
Event Sourcing
Event sourcing stores all changes as a sequence of events, providing a complete audit trail and enabling time-travel debugging.
Python implementation:
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import json
import uuid

# asyncpg raises this on duplicate (aggregate_id, version) rows,
# assuming a unique constraint on those columns
from asyncpg.exceptions import UniqueViolationError

class ConcurrencyError(Exception):
    """Another writer appended the same version first."""

class EventType(Enum):
    ORDER_CREATED = "order_created"
    ORDER_ITEM_ADDED = "order_item_added"
    ORDER_ITEM_REMOVED = "order_item_removed"
    ORDER_SUBMITTED = "order_submitted"
    ORDER_PAID = "order_paid"
    ORDER_SHIPPED = "order_shipped"
    ORDER_DELIVERED = "order_delivered"
    ORDER_CANCELLED = "order_cancelled"

@dataclass
class Event:
    event_type: EventType
    aggregate_id: str
    payload: Dict[str, Any]
    version: int
    timestamp: datetime = field(default_factory=datetime.utcnow)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    metadata: Dict[str, Any] = field(default_factory=dict)

class EventStore:
    """Append-only event store backed by PostgreSQL (asyncpg pool)"""
    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def append(self, event: Event) -> None:
        """Append event with optimistic concurrency control"""
        async with self.db_pool.acquire() as conn:
            try:
                await conn.execute("""
                    INSERT INTO events (
                        event_id, aggregate_id, event_type,
                        payload, version, timestamp, metadata
                    )
                    VALUES ($1, $2, $3, $4, $5, $6, $7)
                """,
                    event.event_id,
                    event.aggregate_id,
                    event.event_type.value,
                    json.dumps(event.payload),
                    event.version,
                    event.timestamp,
                    json.dumps(event.metadata)
                )
            except UniqueViolationError:
                raise ConcurrencyError(
                    f"Version {event.version} already exists for {event.aggregate_id}"
                )

    async def get_events(
        self,
        aggregate_id: str,
        from_version: int = 0
    ) -> List[Event]:
        """Load all events for an aggregate"""
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT event_id, aggregate_id, event_type,
                       payload, version, timestamp, metadata
                FROM events
                WHERE aggregate_id = $1 AND version > $2
                ORDER BY version ASC
            """, aggregate_id, from_version)
            return [self._row_to_event(row) for row in rows]

    async def get_all_events(
        self,
        from_position: int = 0,
        batch_size: int = 1000
    ) -> List[Event]:
        """Stream all events in batches, for rebuilding projections"""
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM events
                WHERE global_position > $1
                ORDER BY global_position ASC
                LIMIT $2
            """, from_position, batch_size)
            return [self._row_to_event(row) for row in rows]

    @staticmethod
    def _row_to_event(row) -> Event:
        return Event(
            event_id=row['event_id'],
            aggregate_id=row['aggregate_id'],
            event_type=EventType(row['event_type']),
            payload=json.loads(row['payload']),
            version=row['version'],
            timestamp=row['timestamp'],
            metadata=json.loads(row['metadata'])
        )

# ============== Aggregate with Event Sourcing ==============
class OrderStatus(Enum):
    DRAFT = "draft"
    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

@dataclass
class OrderAggregate:
    """Order aggregate rebuilt from events"""
    id: str
    user_id: Optional[str] = None
    status: OrderStatus = OrderStatus.DRAFT
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0
    created_at: Optional[datetime] = None

    @classmethod
    def from_events(cls, events: List[Event]) -> 'OrderAggregate':
        """Reconstruct aggregate from event history"""
        aggregate = cls(id=events[0].aggregate_id if events else None)
        for event in events:
            aggregate._apply(event)
        return aggregate

    def _apply(self, event: Event) -> None:
        """Apply event to update state"""
        handler = getattr(self, f"_apply_{event.event_type.value}", None)
        if handler:
            handler(event)
        self.version = event.version

    def _apply_order_created(self, event: Event) -> None:
        self.user_id = event.payload["user_id"]
        self.status = OrderStatus.DRAFT
        self.created_at = event.timestamp

    def _apply_order_item_added(self, event: Event) -> None:
        self.items.append(event.payload["item"])
        self._recalculate_total()

    def _apply_order_item_removed(self, event: Event) -> None:
        item_id = event.payload["item_id"]
        self.items = [i for i in self.items if i["id"] != item_id]
        self._recalculate_total()

    def _apply_order_submitted(self, event: Event) -> None:
        self.status = OrderStatus.PENDING

    def _apply_order_paid(self, event: Event) -> None:
        self.status = OrderStatus.PAID

    def _apply_order_shipped(self, event: Event) -> None:
        self.status = OrderStatus.SHIPPED

    def _apply_order_cancelled(self, event: Event) -> None:
        self.status = OrderStatus.CANCELLED

    def _recalculate_total(self) -> None:
        self.total = sum(
            item["price"] * item["quantity"]
            for item in self.items
        )

# ============== Snapshots for Performance ==============
class SnapshotStore:
    """Store periodic snapshots to speed up replay"""
    def __init__(self, db_pool, snapshot_interval: int = 100):
        self.db_pool = db_pool
        self.snapshot_interval = snapshot_interval

    async def save_snapshot(
        self,
        aggregate_id: str,
        aggregate: OrderAggregate
    ) -> None:
        """Save aggregate snapshot (upsert keeps only the latest)"""
        async with self.db_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO snapshots (aggregate_id, version, state, created_at)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (aggregate_id)
                DO UPDATE SET version = $2, state = $3, created_at = $4
            """,
                aggregate_id,
                aggregate.version,
                json.dumps(aggregate.__dict__, default=str),
                datetime.utcnow()
            )

    async def get_snapshot(
        self,
        aggregate_id: str
    ) -> Optional[OrderAggregate]:
        """Load latest snapshot"""
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT state, version FROM snapshots
                WHERE aggregate_id = $1
            """, aggregate_id)
            if row:
                state = json.loads(row['state'])
                # Note: enum/datetime fields were stringified by default=str;
                # production code needs proper (de)serialization here
                return OrderAggregate(**state)
            return None

class OrderRepository:
    """Repository combining snapshots + event replay"""
    def __init__(
        self,
        event_store: EventStore,
        snapshot_store: SnapshotStore
    ):
        self.event_store = event_store
        self.snapshot_store = snapshot_store

    async def get(self, order_id: str) -> Optional[OrderAggregate]:
        # Try to load from snapshot first
        aggregate = await self.snapshot_store.get_snapshot(order_id)
        if aggregate:
            # Only replay events after the snapshot's version
            events = await self.event_store.get_events(
                order_id,
                from_version=aggregate.version
            )
        else:
            # No snapshot: replay the full history
            events = await self.event_store.get_events(order_id)
            if not events:
                return None
            aggregate = OrderAggregate(id=order_id)

        # Apply remaining events
        for event in events:
            aggregate._apply(event)

        # Snapshot every N versions to bound future replay cost
        if aggregate.version % self.snapshot_store.snapshot_interval == 0:
            await self.snapshot_store.save_snapshot(order_id, aggregate)
        return aggregate
JavaScript implementation:
const { v4: uuidv4 } = require('uuid');

// Thrown when an event with the same (aggregateId, version) already exists
class ConcurrencyError extends Error {}

// ============== Event Types ==============
const EventType = {
  ORDER_CREATED: 'order_created',
  ORDER_ITEM_ADDED: 'order_item_added',
  ORDER_ITEM_REMOVED: 'order_item_removed',
  ORDER_SUBMITTED: 'order_submitted',
  ORDER_PAID: 'order_paid',
  ORDER_SHIPPED: 'order_shipped',
  ORDER_CANCELLED: 'order_cancelled'
};

class Event {
  constructor(eventType, aggregateId, payload, version) {
    this.eventId = uuidv4();
    this.eventType = eventType;
    this.aggregateId = aggregateId;
    this.payload = payload;
    this.version = version;
    this.timestamp = new Date();
    this.metadata = {};
  }
}

// ============== Event Store ==============
class EventStore {
  constructor(pool) {
    this.pool = pool;
  }

  async append(event) {
    const client = await this.pool.connect();
    try {
      await client.query(`
        INSERT INTO events (
          event_id, aggregate_id, event_type,
          payload, version, timestamp, metadata
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7)
      `, [
        event.eventId,
        event.aggregateId,
        event.eventType,
        JSON.stringify(event.payload),
        event.version,
        event.timestamp,
        JSON.stringify(event.metadata)
      ]);
    } catch (error) {
      if (error.code === '23505') { // unique violation
        throw new ConcurrencyError(
          `Version ${event.version} already exists for ${event.aggregateId}`
        );
      }
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId, fromVersion = 0) {
    const client = await this.pool.connect();
    try {
      const result = await client.query(`
        SELECT * FROM events
        WHERE aggregate_id = $1 AND version > $2
        ORDER BY version ASC
      `, [aggregateId, fromVersion]);
      return result.rows.map(row => ({
        eventId: row.event_id,
        eventType: row.event_type,
        aggregateId: row.aggregate_id,
        payload: row.payload,
        version: row.version,
        timestamp: row.timestamp,
        metadata: row.metadata
      }));
    } finally {
      client.release();
    }
  }

  async getAllEvents(fromPosition = 0, batchSize = 1000) {
    const client = await this.pool.connect();
    try {
      const result = await client.query(`
        SELECT * FROM events
        WHERE global_position > $1
        ORDER BY global_position ASC
        LIMIT $2
      `, [fromPosition, batchSize]);
      return result.rows;
    } finally {
      client.release();
    }
  }
}

// ============== Order Aggregate ==============
const OrderStatus = {
  DRAFT: 'draft',
  PENDING: 'pending',
  PAID: 'paid',
  SHIPPED: 'shipped',
  DELIVERED: 'delivered',
  CANCELLED: 'cancelled'
};

class OrderAggregate {
  constructor(id) {
    this.id = id;
    this.userId = null;
    this.status = OrderStatus.DRAFT;
    this.items = [];
    this.total = 0;
    this.version = 0;
    this.createdAt = null;
  }

  static fromEvents(events) {
    if (!events.length) return null;
    const aggregate = new OrderAggregate(events[0].aggregateId);
    for (const event of events) {
      aggregate.apply(event);
    }
    return aggregate;
  }

  apply(event) {
    const handler = this[`apply${this.toPascalCase(event.eventType)}`];
    if (handler) {
      handler.call(this, event);
    }
    this.version = event.version;
  }

  applyOrderCreated(event) {
    this.userId = event.payload.userId;
    this.status = OrderStatus.DRAFT;
    this.createdAt = event.timestamp;
  }

  applyOrderItemAdded(event) {
    this.items.push(event.payload.item);
    this.recalculateTotal();
  }

  applyOrderItemRemoved(event) {
    const itemId = event.payload.itemId;
    this.items = this.items.filter(i => i.id !== itemId);
    this.recalculateTotal();
  }

  applyOrderSubmitted(event) {
    this.status = OrderStatus.PENDING;
  }

  applyOrderPaid(event) {
    this.status = OrderStatus.PAID;
  }

  applyOrderShipped(event) {
    this.status = OrderStatus.SHIPPED;
  }

  applyOrderCancelled(event) {
    this.status = OrderStatus.CANCELLED;
  }

  recalculateTotal() {
    this.total = this.items.reduce(
      (sum, item) => sum + (item.price * item.quantity),
      0
    );
  }

  toPascalCase(str) {
    return str.replace(/_([a-z])/g, (g) => g[1].toUpperCase())
              .replace(/^[a-z]/, (c) => c.toUpperCase());
  }
}

// ============== Snapshot Store ==============
class SnapshotStore {
  constructor(pool, snapshotInterval = 100) {
    this.pool = pool;
    this.snapshotInterval = snapshotInterval;
  }

  async saveSnapshot(aggregateId, aggregate) {
    const client = await this.pool.connect();
    try {
      await client.query(`
        INSERT INTO snapshots (aggregate_id, version, state, created_at)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (aggregate_id)
        DO UPDATE SET version = $2, state = $3, created_at = $4
      `, [
        aggregateId,
        aggregate.version,
        JSON.stringify(aggregate),
        new Date()
      ]);
    } finally {
      client.release();
    }
  }

  async getSnapshot(aggregateId) {
    const client = await this.pool.connect();
    try {
      const result = await client.query(`
        SELECT state, version FROM snapshots
        WHERE aggregate_id = $1
      `, [aggregateId]);
      if (result.rows.length > 0) {
        const state = result.rows[0].state;
        const aggregate = new OrderAggregate(aggregateId);
        Object.assign(aggregate, state);
        return aggregate;
      }
      return null;
    } finally {
      client.release();
    }
  }
}

// ============== Order Repository ==============
class OrderRepository {
  constructor(eventStore, snapshotStore) {
    this.eventStore = eventStore;
    this.snapshotStore = snapshotStore;
  }

  async get(orderId) {
    // Try to load from snapshot first
    let aggregate = await this.snapshotStore.getSnapshot(orderId);
    let events;
    if (aggregate) {
      // Only replay events after the snapshot's version
      events = await this.eventStore.getEvents(orderId, aggregate.version);
    } else {
      // No snapshot: replay the full history
      events = await this.eventStore.getEvents(orderId);
      if (!events.length) return null;
      aggregate = new OrderAggregate(orderId);
    }

    // Apply remaining events
    for (const event of events) {
      aggregate.apply(event);
    }

    // Snapshot every N versions to bound future replay cost
    if (aggregate.version % this.snapshotStore.snapshotInterval === 0) {
      await this.snapshotStore.saveSnapshot(orderId, aggregate);
    }
    return aggregate;
  }
}

// ============== Time Travel / Replay ==============
class EventReplayer {
  constructor(eventStore) {
    this.eventStore = eventStore;
  }

  async getStateAtTime(aggregateId, targetTime) {
    const events = await this.eventStore.getEvents(aggregateId);
    const filteredEvents = events.filter(e =>
      new Date(e.timestamp) <= targetTime
    );
    return OrderAggregate.fromEvents(filteredEvents);
  }

  async replayAllEvents(projector, fromPosition = 0) {
    let position = fromPosition;
    const batchSize = 1000;
    while (true) {
      const events = await this.eventStore.getAllEvents(position, batchSize);
      if (events.length === 0) break;
      for (const event of events) {
        await projector.project(event);
        position = event.global_position;
      }
      console.log(`Replayed up to position ${position}`);
    }
    return position;
  }
}

module.exports = {
  Event,
  EventType,
  EventStore,
  OrderAggregate,
  SnapshotStore,
  OrderRepository,
  EventReplayer,
  ConcurrencyError
};
When to use Event Sourcing:
- Audit requirements (financial systems, healthcare)
- Need to replay/debug past states
- Complex business logic with temporal queries
- Event-driven microservices architecture
Interview Questions: Senior Level
How would you design for multi-region active-active?
Key Points:
- Data replication: Async replication between regions (eventual consistency)
- Conflict resolution: Last-write-wins, or vector clocks with a custom merge
- Routing: GeoDNS to route users to the nearest region
- Failover: Health checks + automatic DNS failover
- Consistency: Accept that cross-region writes may conflict
Trade-offs to call out:
- Latency vs consistency
- Cost of running in multiple regions
- Complexity of conflict resolution
How do you handle a database that can't keep up with writes?
Solutions in order of complexity:
- Batch writes: Accumulate and write in batches
- Write-behind cache: Write to Redis, async persist to DB
- Message queue: Queue writes, process at sustainable rate
- Sharding: Distribute writes across multiple DB nodes
- Different DB: Switch to write-optimized DB (Cassandra, ScyllaDB)
Explain how you'd implement distributed transactions
Answer structure:
- First ask: “Do we really need distributed transactions?” Often can redesign.
- 2PC: Strong consistency, but blocking and slow
- Saga: Eventual consistency, compensating transactions
- Outbox pattern: Reliable event publishing with local transaction
# Saga sketch: compensating actions undo earlier steps on failure
# (service clients and PaymentFailed are application-specific)
async def create_order_saga(order):
    order_id = await order_service.create(order)
    try:
        await inventory_service.reserve(order.items)
        await payment_service.charge(order.payment)
        await order_service.confirm(order_id)
    except PaymentFailed:
        # Compensating transactions, in reverse order
        await inventory_service.release(order.items)
        await order_service.cancel(order_id)
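The outbox pattern from the list above is also worth sketching: the business write and the "event to publish" land in the same local transaction, and a relay publishes asynchronously. A minimal sketch with sqlite3; the tables and the publish callback are placeholders:

import json
import sqlite3

def create_order_with_outbox(conn: sqlite3.Connection, order: dict) -> None:
    # One LOCAL transaction covers both inserts: either the order and
    # its event are durable together, or neither is
    with conn:
        conn.execute(
            "INSERT INTO orders (id, payload) VALUES (?, ?)",
            (order["id"], json.dumps(order)),
        )
        conn.execute(
            "INSERT INTO outbox (topic, payload, published) VALUES (?, ?, 0)",
            ("order_created", json.dumps(order)),
        )

def relay_outbox(conn: sqlite3.Connection, publish) -> None:
    """Poll unpublished rows and push them to the broker (at-least-once)."""
    rows = conn.execute(
        "SELECT rowid, topic, payload FROM outbox WHERE published = 0"
    ).fetchall()
    for rowid, topic, payload in rows:
        publish(topic, payload)  # broker client is a placeholder
        with conn:
            conn.execute(
                "UPDATE outbox SET published = 1 WHERE rowid = ?", (rowid,))

Because the relay can crash between publish and the UPDATE, consumers still need to be idempotent; the outbox gives you at-least-once, not exactly-once.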
How do you debug a latency spike in a distributed system?
Systematic approach:
- Observe: Check metrics dashboards (p99 latency by service)
- Trace: Use distributed tracing (Jaeger/Zipkin) to find slow span
- Correlate: Check if spike correlates with deployments, traffic, or GC
- Drill down: Once you find the service, check:
- CPU/memory usage
- DB query times (slow query log)
- Network latency between services
- Thread pool saturation
- Lock contention
How would you design a system that handles 1M requests per second?
Approach:
- Back of envelope: at ~10K RPS per app server, 1M RPS needs ~100 servers (plan extra for headroom and failures)
- Stateless compute: Horizontal scaling with load balancer
- Caching: Cache everything possible (aim for 99%+ cache hit)
- CDN: Serve static content from edge
- Database: Shard aggressively, read replicas
- Async: Queue non-critical work
- Network: 1M × 10KB = 10GB/s = 80Gbps (need multiple LBs)
- Compute: 1M / 10K (RPS per server) = 100 servers minimum
- Database: Can’t hit DB for every request, need 99%+ cache hit
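Those back-of-envelope numbers as a quick calculation you can redo live with your own assumptions:

rps = 1_000_000
resp_kb = 10           # average response size
rps_per_server = 10_000
cache_hit = 0.99

bandwidth_gbps = rps * resp_kb * 8 / 1_000_000     # KB/s -> Gbps
servers = rps / rps_per_server
db_qps = rps * (1 - cache_hit)

print(f"{bandwidth_gbps:.0f} Gbps egress")         # 80 Gbps
print(f"{servers:.0f} app servers (no headroom)")  # 100
print(f"{db_qps:.0f} QPS reaching the database")   # 10,000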