Event Sourcing Deep Dive
Event sourcing stores state as a sequence of events, providing a complete audit trail and enabling powerful temporal queries.Learning Objectives:
- Understand when to use event sourcing
- Implement event stores and projections
- Master snapshot strategies
- Combine event sourcing with CQRS
- Handle event versioning and schema evolution
Event Sourcing Fundamentals
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ TRADITIONAL vs EVENT SOURCING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TRADITIONAL (STATE-BASED) │
│ ───────────────────────────── │
│ │
│ Bank Account Record: │
│ ┌────────────────────────────────────────┐ │
│ │ id: 123 │ │
│ │ balance: $1,000 ← Only current │ │
│ │ updated_at: 2024-01-15 │ │
│ └────────────────────────────────────────┘ │
│ │
│ ❌ Lost: How did we get here? Who? When? │
│ │
│ ═══════════════════════════════════════════════════════════════════════ │
│ │
│ EVENT SOURCING │
│ ───────────────── │
│ │
│ Bank Account Events: │
│ ┌────────────────────────────────────────┐ │
│ │ 1. AccountOpened { balance: $0 } │ 2024-01-01 │
│ │ 2. MoneyDeposited { amount: $500 } │ 2024-01-05 │
│ │ 3. MoneyDeposited { amount: $1000 } │ 2024-01-10 │
│ │ 4. MoneyWithdrawn { amount: $500 } │ 2024-01-15 │
│ │ ───────────────────────────────────── │
│ │ Current Balance: $1,000 │
│ └────────────────────────────────────────┘ │
│ │
│ ✅ Complete history: who, what, when, why │
│ ✅ Can replay to any point in time │
│ ✅ Natural audit trail │
│ ✅ Debug by replaying events │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
When to Use Event Sourcing
✅ Good Use Cases
- Financial systems (audit required)
- Order management (complex workflows)
- User activity tracking
- Collaborative editing
- Undo/redo functionality
- Temporal queries (“balance last month?”)
❌ Poor Use Cases
- Simple CRUD applications
- High-volume metrics/logs
- Frequently updated data
- No audit requirements
- Small teams (complexity overhead)
Event Store Implementation
Copy
// event-store.js - Core event sourcing infrastructure
const { Pool } = require('pg');
const { EventEmitter } = require('events');
class EventStore extends EventEmitter {
constructor(db) {
super();
this.db = db;
}
async initialize() {
// Create event store table
await this.db.query(`
CREATE TABLE IF NOT EXISTS events (
id BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version INTEGER NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(stream_id, version)
);
CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events(stream_id);
CREATE INDEX IF NOT EXISTS idx_events_stream_type ON events(stream_type);
CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);
`);
// Snapshots table
await this.db.query(`
CREATE TABLE IF NOT EXISTS snapshots (
id BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(255) NOT NULL UNIQUE,
stream_type VARCHAR(100) NOT NULL,
version INTEGER NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
`);
}
// Append events to a stream
async appendToStream(streamId, streamType, events, expectedVersion = null) {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Optimistic concurrency check
if (expectedVersion !== null) {
const current = await client.query(
`SELECT COALESCE(MAX(version), 0) as version
FROM events WHERE stream_id = $1`,
[streamId]
);
if (current.rows[0].version !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, ` +
`but found ${current.rows[0].version}`
);
}
}
// Get next version
const versionResult = await client.query(
`SELECT COALESCE(MAX(version), 0) + 1 as next_version
FROM events WHERE stream_id = $1`,
[streamId]
);
let version = versionResult.rows[0].next_version;
// Insert events
const insertedEvents = [];
for (const event of events) {
const result = await client.query(
`INSERT INTO events
(stream_id, stream_type, event_type, event_data, metadata, version)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *`,
[
streamId,
streamType,
event.type,
event.data,
event.metadata || {},
version++
]
);
insertedEvents.push(result.rows[0]);
}
await client.query('COMMIT');
// Emit events for subscribers
for (const event of insertedEvents) {
this.emit('event', event);
this.emit(event.event_type, event);
}
return insertedEvents;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
// Read events from a stream
async readStream(streamId, fromVersion = 0) {
const result = await this.db.query(
`SELECT * FROM events
WHERE stream_id = $1 AND version >= $2
ORDER BY version ASC`,
[streamId, fromVersion]
);
return result.rows;
}
// Read all events (for projections)
async readAllEvents(fromPosition = 0, batchSize = 1000) {
const result = await this.db.query(
`SELECT * FROM events
WHERE id > $1
ORDER BY id ASC
LIMIT $2`,
[fromPosition, batchSize]
);
return result.rows;
}
// Save snapshot
async saveSnapshot(streamId, streamType, version, state) {
await this.db.query(
`INSERT INTO snapshots (stream_id, stream_type, version, state)
VALUES ($1, $2, $3, $4)
ON CONFLICT (stream_id)
DO UPDATE SET version = $3, state = $4, created_at = NOW()`,
[streamId, streamType, version, state]
);
}
// Load snapshot
async loadSnapshot(streamId) {
const result = await this.db.query(
`SELECT * FROM snapshots WHERE stream_id = $1`,
[streamId]
);
return result.rows[0] || null;
}
}
class ConcurrencyError extends Error {
constructor(message) {
super(message);
this.name = 'ConcurrencyError';
}
}
module.exports = { EventStore, ConcurrencyError };
Aggregate Pattern
Copy
// aggregate.js - Event-sourced aggregate root
class AggregateRoot {
constructor() {
this._uncommittedEvents = [];
this._version = 0;
}
get version() {
return this._version;
}
get uncommittedEvents() {
return this._uncommittedEvents;
}
clearUncommittedEvents() {
this._uncommittedEvents = [];
}
// Apply event and track as uncommitted
applyChange(event) {
this._apply(event);
this._uncommittedEvents.push(event);
}
// Apply event from history (committed)
_apply(event) {
const handler = `on${event.type}`;
if (typeof this[handler] === 'function') {
this[handler](event.data);
}
this._version++;
}
// Load from event history
loadFromHistory(events) {
for (const event of events) {
this._apply({
type: event.event_type,
data: event.event_data
});
}
}
}
// Bank Account aggregate
class BankAccount extends AggregateRoot {
constructor() {
super();
this.id = null;
this.balance = 0;
this.status = 'pending';
this.transactions = [];
}
// Commands (business logic + emit events)
static open(accountId, ownerId, initialDeposit = 0) {
const account = new BankAccount();
account.applyChange({
type: 'AccountOpened',
data: {
accountId,
ownerId,
openedAt: new Date().toISOString()
}
});
if (initialDeposit > 0) {
account.deposit(initialDeposit, 'Initial deposit');
}
return account;
}
deposit(amount, description = '') {
if (amount <= 0) {
throw new Error('Deposit amount must be positive');
}
if (this.status !== 'active') {
throw new Error('Account is not active');
}
this.applyChange({
type: 'MoneyDeposited',
data: {
amount,
description,
timestamp: new Date().toISOString()
}
});
}
withdraw(amount, description = '') {
if (amount <= 0) {
throw new Error('Withdrawal amount must be positive');
}
if (this.status !== 'active') {
throw new Error('Account is not active');
}
if (amount > this.balance) {
throw new Error('Insufficient funds');
}
this.applyChange({
type: 'MoneyWithdrawn',
data: {
amount,
description,
timestamp: new Date().toISOString()
}
});
}
close(reason = '') {
if (this.balance !== 0) {
throw new Error('Cannot close account with non-zero balance');
}
this.applyChange({
type: 'AccountClosed',
data: {
reason,
closedAt: new Date().toISOString()
}
});
}
// Event handlers (state changes)
onAccountOpened(data) {
this.id = data.accountId;
this.ownerId = data.ownerId;
this.status = 'active';
this.openedAt = data.openedAt;
}
onMoneyDeposited(data) {
this.balance += data.amount;
this.transactions.push({
type: 'deposit',
amount: data.amount,
timestamp: data.timestamp
});
}
onMoneyWithdrawn(data) {
this.balance -= data.amount;
this.transactions.push({
type: 'withdrawal',
amount: data.amount,
timestamp: data.timestamp
});
}
onAccountClosed(data) {
this.status = 'closed';
this.closedAt = data.closedAt;
}
// Snapshot for performance
toSnapshot() {
return {
id: this.id,
ownerId: this.ownerId,
balance: this.balance,
status: this.status,
openedAt: this.openedAt,
closedAt: this.closedAt,
transactionCount: this.transactions.length
};
}
static fromSnapshot(snapshot) {
const account = new BankAccount();
Object.assign(account, snapshot);
account.transactions = []; // Don't store all transactions in snapshot
return account;
}
}
module.exports = { AggregateRoot, BankAccount };
Repository Pattern
Copy
// repository.js - Event-sourced repository
class EventSourcedRepository {
constructor(eventStore, aggregateType, AggregateClass) {
this.eventStore = eventStore;
this.aggregateType = aggregateType;
this.AggregateClass = AggregateClass;
this.snapshotFrequency = 100; // Snapshot every 100 events
}
async getById(id) {
const streamId = `${this.aggregateType}-${id}`;
// Try to load from snapshot first
let aggregate;
let fromVersion = 0;
const snapshot = await this.eventStore.loadSnapshot(streamId);
if (snapshot) {
aggregate = this.AggregateClass.fromSnapshot(snapshot.state);
aggregate._version = snapshot.version;
fromVersion = snapshot.version + 1;
} else {
aggregate = new this.AggregateClass();
}
// Load events since snapshot (or all events if no snapshot)
const events = await this.eventStore.readStream(streamId, fromVersion);
if (events.length === 0 && !snapshot) {
return null; // Aggregate doesn't exist
}
aggregate.loadFromHistory(events);
return aggregate;
}
async save(aggregate) {
const streamId = `${this.aggregateType}-${aggregate.id}`;
const events = aggregate.uncommittedEvents;
if (events.length === 0) {
return; // Nothing to save
}
// Format events for storage
const eventsToStore = events.map(e => ({
type: e.type,
data: e.data,
metadata: {
timestamp: new Date().toISOString(),
...e.metadata
}
}));
// Append to event store with optimistic concurrency
const expectedVersion = aggregate.version - events.length;
await this.eventStore.appendToStream(
streamId,
this.aggregateType,
eventsToStore,
expectedVersion
);
// Create snapshot if needed
if (aggregate.version % this.snapshotFrequency === 0) {
await this.eventStore.saveSnapshot(
streamId,
this.aggregateType,
aggregate.version,
aggregate.toSnapshot()
);
}
aggregate.clearUncommittedEvents();
}
}
// Usage
const bankAccountRepo = new EventSourcedRepository(
eventStore,
'BankAccount',
BankAccount
);
// Create new account
const account = BankAccount.open('acc-123', 'user-456', 1000);
await bankAccountRepo.save(account);
// Load and modify
const loadedAccount = await bankAccountRepo.getById('acc-123');
loadedAccount.deposit(500, 'Salary');
loadedAccount.withdraw(200, 'Groceries');
await bankAccountRepo.save(loadedAccount);
module.exports = { EventSourcedRepository };
Projections
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ PROJECTIONS (READ MODELS) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Event Store (Write Side) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 1. AccountOpened { accountId: 'A1', ownerId: 'U1' } │ │
│ │ 2. MoneyDeposited { accountId: 'A1', amount: 1000 } │ │
│ │ 3. AccountOpened { accountId: 'A2', ownerId: 'U2' } │ │
│ │ 4. MoneyDeposited { accountId: 'A2', amount: 500 } │ │
│ │ 5. MoneyWithdrawn { accountId: 'A1', amount: 200 } │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Projector │ │
│ │ (Event │ │
│ │ Handler) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │
│ │ Account Balances │ │ Transaction List │ │ Account Summary │ │
│ │ (Read Model) │ │ (Read Model) │ │ (Read Model) │ │
│ ├───────────────────┤ ├───────────────────┤ ├───────────────────┤ │
│ │ A1: $800 │ │ A1: +1000, -200 │ │ Total: 2 accounts │ │
│ │ A2: $500 │ │ A2: +500 │ │ Total: $1300 │ │
│ └───────────────────┘ └───────────────────┘ └───────────────────┘ │
│ │
│ Each projection can be: │
│ • Different database (PostgreSQL, MongoDB, Redis, Elasticsearch) │
│ • Optimized for specific queries │
│ • Rebuilt by replaying all events │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Projection Implementation
Copy
// projections.js - Read model projections
class ProjectionManager {
constructor(eventStore, db) {
this.eventStore = eventStore;
this.db = db;
this.projections = new Map();
this.checkpoints = new Map();
}
register(name, projection) {
this.projections.set(name, projection);
}
async start() {
// Load checkpoints
for (const [name, projection] of this.projections) {
const checkpoint = await this.loadCheckpoint(name);
this.checkpoints.set(name, checkpoint);
}
// Start processing
this.running = true;
this.process();
}
async process() {
while (this.running) {
for (const [name, projection] of this.projections) {
const checkpoint = this.checkpoints.get(name);
const events = await this.eventStore.readAllEvents(checkpoint, 100);
if (events.length > 0) {
for (const event of events) {
await projection.apply(event);
}
const newCheckpoint = events[events.length - 1].id;
await this.saveCheckpoint(name, newCheckpoint);
this.checkpoints.set(name, newCheckpoint);
}
}
await this.sleep(100);
}
}
async loadCheckpoint(projectionName) {
const result = await this.db.query(
`SELECT position FROM projection_checkpoints WHERE name = $1`,
[projectionName]
);
return result.rows[0]?.position || 0;
}
async saveCheckpoint(projectionName, position) {
await this.db.query(
`INSERT INTO projection_checkpoints (name, position)
VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE SET position = $2`,
[projectionName, position]
);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Account Balance Projection
class AccountBalanceProjection {
constructor(db) {
this.db = db;
}
async initialize() {
await this.db.query(`
CREATE TABLE IF NOT EXISTS account_balances (
account_id VARCHAR(255) PRIMARY KEY,
owner_id VARCHAR(255) NOT NULL,
balance DECIMAL(15, 2) DEFAULT 0,
status VARCHAR(50) DEFAULT 'active',
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
}
async apply(event) {
switch (event.event_type) {
case 'AccountOpened':
await this.db.query(
`INSERT INTO account_balances (account_id, owner_id, balance)
VALUES ($1, $2, 0)
ON CONFLICT (account_id) DO NOTHING`,
[event.event_data.accountId, event.event_data.ownerId]
);
break;
case 'MoneyDeposited':
await this.db.query(
`UPDATE account_balances
SET balance = balance + $1, updated_at = NOW()
WHERE account_id = $2`,
[event.event_data.amount, event.stream_id.split('-')[1]]
);
break;
case 'MoneyWithdrawn':
await this.db.query(
`UPDATE account_balances
SET balance = balance - $1, updated_at = NOW()
WHERE account_id = $2`,
[event.event_data.amount, event.stream_id.split('-')[1]]
);
break;
case 'AccountClosed':
await this.db.query(
`UPDATE account_balances
SET status = 'closed', updated_at = NOW()
WHERE account_id = $1`,
[event.stream_id.split('-')[1]]
);
break;
}
}
// Rebuild from scratch
async rebuild() {
await this.db.query('TRUNCATE account_balances');
// Checkpoint will be reset, and events will be replayed
}
}
// Transaction History Projection
class TransactionHistoryProjection {
constructor(db) {
this.db = db;
}
async initialize() {
await this.db.query(`
CREATE TABLE IF NOT EXISTS transaction_history (
id BIGSERIAL PRIMARY KEY,
account_id VARCHAR(255) NOT NULL,
transaction_type VARCHAR(50) NOT NULL,
amount DECIMAL(15, 2) NOT NULL,
description TEXT,
balance_after DECIMAL(15, 2),
created_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_transactions_account
ON transaction_history(account_id, created_at DESC);
`);
}
async apply(event) {
if (event.event_type === 'MoneyDeposited') {
await this.db.query(
`INSERT INTO transaction_history
(account_id, transaction_type, amount, description, created_at)
VALUES ($1, 'deposit', $2, $3, $4)`,
[
event.stream_id.split('-')[1],
event.event_data.amount,
event.event_data.description,
event.event_data.timestamp
]
);
}
if (event.event_type === 'MoneyWithdrawn') {
await this.db.query(
`INSERT INTO transaction_history
(account_id, transaction_type, amount, description, created_at)
VALUES ($1, 'withdrawal', $2, $3, $4)`,
[
event.stream_id.split('-')[1],
event.event_data.amount,
event.event_data.description,
event.event_data.timestamp
]
);
}
}
}
// Daily Summary Projection (Aggregated view)
class DailySummaryProjection {
constructor(db) {
this.db = db;
}
async initialize() {
await this.db.query(`
CREATE TABLE IF NOT EXISTS daily_summaries (
date DATE NOT NULL,
account_id VARCHAR(255) NOT NULL,
deposits_count INTEGER DEFAULT 0,
deposits_total DECIMAL(15, 2) DEFAULT 0,
withdrawals_count INTEGER DEFAULT 0,
withdrawals_total DECIMAL(15, 2) DEFAULT 0,
PRIMARY KEY (date, account_id)
)
`);
}
async apply(event) {
const date = new Date(event.created_at).toISOString().split('T')[0];
const accountId = event.stream_id.split('-')[1];
if (event.event_type === 'MoneyDeposited') {
await this.db.query(
`INSERT INTO daily_summaries (date, account_id, deposits_count, deposits_total)
VALUES ($1, $2, 1, $3)
ON CONFLICT (date, account_id) DO UPDATE SET
deposits_count = daily_summaries.deposits_count + 1,
deposits_total = daily_summaries.deposits_total + $3`,
[date, accountId, event.event_data.amount]
);
}
if (event.event_type === 'MoneyWithdrawn') {
await this.db.query(
`INSERT INTO daily_summaries (date, account_id, withdrawals_count, withdrawals_total)
VALUES ($1, $2, 1, $3)
ON CONFLICT (date, account_id) DO UPDATE SET
withdrawals_count = daily_summaries.withdrawals_count + 1,
withdrawals_total = daily_summaries.withdrawals_total + $3`,
[date, accountId, event.event_data.amount]
);
}
}
}
module.exports = {
ProjectionManager,
AccountBalanceProjection,
TransactionHistoryProjection,
DailySummaryProjection
};
Event Versioning & Schema Evolution
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ EVENT SCHEMA EVOLUTION │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM: Events are immutable, but requirements change │
│ │
│ V1: MoneyDeposited { amount: 100 } │
│ V2: MoneyDeposited { amount: 100, currency: 'USD' } ← New field │
│ V3: MoneyDeposited { amount: { value: 100, currency: 'USD' } } ← Changed │
│ │
│ ═══════════════════════════════════════════════════════════════════════ │
│ │
│ SOLUTION: UPCASTING │
│ ───────────────────── │
│ │
│ Store original event → Transform when reading │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Event Store Upcaster App │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌────────┐ │ │
│ │ │ V1 Event │──────────────▶│ v1 → v2 → v3 │──────▶│ V3 │ │ │
│ │ └──────────────┘ └──────────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Events stay immutable, transformation happens at read time │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Upcasting Implementation
Copy
// event-upcaster.js - Event schema evolution
class EventUpcaster {
constructor() {
this.upcasters = new Map();
}
// Register an upcaster for an event type and version
register(eventType, fromVersion, toVersion, upcasterFn) {
const key = `${eventType}:${fromVersion}`;
this.upcasters.set(key, {
toVersion,
transform: upcasterFn
});
}
// Upcast event to latest version
upcast(event) {
let currentEvent = { ...event };
let currentVersion = event.metadata?.version || 1;
while (true) {
const key = `${event.event_type}:${currentVersion}`;
const upcaster = this.upcasters.get(key);
if (!upcaster) break; // No more upcasts needed
currentEvent = {
...currentEvent,
event_data: upcaster.transform(currentEvent.event_data),
metadata: {
...currentEvent.metadata,
version: upcaster.toVersion,
originalVersion: event.metadata?.version || 1
}
};
currentVersion = upcaster.toVersion;
}
return currentEvent;
}
}
// Register upcasters
const upcaster = new EventUpcaster();
// MoneyDeposited v1 → v2 (add currency)
upcaster.register('MoneyDeposited', 1, 2, (data) => ({
...data,
currency: 'USD' // Default to USD for old events
}));
// MoneyDeposited v2 → v3 (restructure amount)
upcaster.register('MoneyDeposited', 2, 3, (data) => ({
amount: {
value: data.amount,
currency: data.currency
},
description: data.description,
timestamp: data.timestamp
}));
// UserRegistered v1 → v2 (split name into firstName/lastName)
upcaster.register('UserRegistered', 1, 2, (data) => {
const [firstName, ...rest] = (data.name || '').split(' ');
return {
...data,
firstName: firstName || '',
lastName: rest.join(' ') || '',
name: undefined // Remove old field
};
});
// Event store with upcasting
class UpcastingEventStore {
constructor(eventStore, upcaster) {
this.eventStore = eventStore;
this.upcaster = upcaster;
}
async readStream(streamId, fromVersion = 0) {
const events = await this.eventStore.readStream(streamId, fromVersion);
return events.map(e => this.upcaster.upcast(e));
}
async readAllEvents(fromPosition = 0, batchSize = 1000) {
const events = await this.eventStore.readAllEvents(fromPosition, batchSize);
return events.map(e => this.upcaster.upcast(e));
}
// Write methods don't upcast (always write latest version)
async appendToStream(streamId, streamType, events, expectedVersion) {
// Add version metadata to new events
const versionedEvents = events.map(e => ({
...e,
metadata: {
...e.metadata,
version: this.getLatestVersion(e.type)
}
}));
return this.eventStore.appendToStream(
streamId, streamType, versionedEvents, expectedVersion
);
}
getLatestVersion(eventType) {
// Return the latest version for each event type
const versions = {
'MoneyDeposited': 3,
'MoneyWithdrawn': 2,
'UserRegistered': 2
};
return versions[eventType] || 1;
}
}
module.exports = { EventUpcaster, UpcastingEventStore };
CQRS with Event Sourcing
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ CQRS + EVENT SOURCING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ │
│ │ Client │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌────────────────────┴────────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ Command Side │ │ Query Side │ │
│ │ (Write) │ │ (Read) │ │
│ └───────┬────────┘ └────────┬───────┘ │
│ │ │ │
│ ┌───────▼────────┐ ┌────────▼───────┐ │
│ │ Command Handler│ │ Query Handler │ │
│ └───────┬────────┘ └────────┬───────┘ │
│ │ │ │
│ ┌───────▼────────┐ ┌────────▼───────┐ │
│ │ Aggregates │ │ Read Models │ │
│ │ (Domain Logic)│ │ (Projections) │ │
│ └───────┬────────┘ └────────▲───────┘ │
│ │ │ │
│ ┌───────▼────────┐ events ┌───────┴───────┐ │
│ │ Event Store │──────────────────────▶│ Projector │ │
│ │ (Append-only) │ │ │ │
│ └────────────────┘ └───────────────┘ │
│ │
│ WRITE PATH: Command → Aggregate → Events → Event Store │
│ READ PATH: Query → Read Model (optimized for queries) │
│ SYNC: Event Store → Projector → Read Models (eventually consistent) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Complete CQRS Implementation
Copy
// cqrs-service.js - CQRS + Event Sourcing service
const express = require('express');
const { EventStore } = require('./event-store');
const { EventSourcedRepository } = require('./repository');
const { BankAccount } = require('./aggregate');
const {
ProjectionManager,
AccountBalanceProjection,
TransactionHistoryProjection
} = require('./projections');
class BankAccountService {
constructor(db) {
this.eventStore = new EventStore(db);
this.repository = new EventSourcedRepository(
this.eventStore,
'BankAccount',
BankAccount
);
// Read models
this.balanceProjection = new AccountBalanceProjection(db);
this.transactionProjection = new TransactionHistoryProjection(db);
this.db = db;
}
async initialize() {
await this.eventStore.initialize();
await this.balanceProjection.initialize();
await this.transactionProjection.initialize();
// Start projection processing
const projectionManager = new ProjectionManager(this.eventStore, this.db);
projectionManager.register('account-balances', this.balanceProjection);
projectionManager.register('transactions', this.transactionProjection);
await projectionManager.start();
}
// COMMAND HANDLERS (Write Side)
async openAccount(command) {
const { accountId, ownerId, initialDeposit } = command;
// Check if account already exists
const existing = await this.repository.getById(accountId);
if (existing) {
throw new Error('Account already exists');
}
// Create new account
const account = BankAccount.open(accountId, ownerId, initialDeposit);
await this.repository.save(account);
return { accountId: account.id, balance: account.balance };
}
async deposit(command) {
const { accountId, amount, description } = command;
const account = await this.repository.getById(accountId);
if (!account) {
throw new Error('Account not found');
}
account.deposit(amount, description);
await this.repository.save(account);
return { accountId, newBalance: account.balance };
}
async withdraw(command) {
const { accountId, amount, description } = command;
const account = await this.repository.getById(accountId);
if (!account) {
throw new Error('Account not found');
}
account.withdraw(amount, description);
await this.repository.save(account);
return { accountId, newBalance: account.balance };
}
async closeAccount(command) {
const { accountId, reason } = command;
const account = await this.repository.getById(accountId);
if (!account) {
throw new Error('Account not found');
}
account.close(reason);
await this.repository.save(account);
return { accountId, status: 'closed' };
}
// QUERY HANDLERS (Read Side) - Use projections
async getAccountBalance(accountId) {
// Query from read model (fast, optimized)
const result = await this.db.query(
`SELECT * FROM account_balances WHERE account_id = $1`,
[accountId]
);
if (result.rows.length === 0) {
throw new Error('Account not found');
}
return result.rows[0];
}
async getTransactionHistory(accountId, limit = 50) {
const result = await this.db.query(
`SELECT * FROM transaction_history
WHERE account_id = $1
ORDER BY created_at DESC
LIMIT $2`,
[accountId, limit]
);
return result.rows;
}
async getAllAccounts(ownerId) {
const result = await this.db.query(
`SELECT * FROM account_balances WHERE owner_id = $1`,
[ownerId]
);
return result.rows;
}
async getAccountSummary() {
const result = await this.db.query(`
SELECT
COUNT(*) as total_accounts,
SUM(balance) as total_balance,
AVG(balance) as average_balance,
COUNT(*) FILTER (WHERE status = 'active') as active_accounts,
COUNT(*) FILTER (WHERE status = 'closed') as closed_accounts
FROM account_balances
`);
return result.rows[0];
}
}
// Express API
const createApp = (service) => {
const app = express();
app.use(express.json());
// Commands
app.post('/accounts', async (req, res, next) => {
try {
const result = await service.openAccount(req.body);
res.status(201).json(result);
} catch (error) {
next(error);
}
});
app.post('/accounts/:id/deposit', async (req, res, next) => {
try {
const result = await service.deposit({
accountId: req.params.id,
...req.body
});
res.json(result);
} catch (error) {
next(error);
}
});
app.post('/accounts/:id/withdraw', async (req, res, next) => {
try {
const result = await service.withdraw({
accountId: req.params.id,
...req.body
});
res.json(result);
} catch (error) {
next(error);
}
});
// Queries
app.get('/accounts/:id', async (req, res, next) => {
try {
const balance = await service.getAccountBalance(req.params.id);
res.json(balance);
} catch (error) {
next(error);
}
});
app.get('/accounts/:id/transactions', async (req, res, next) => {
try {
const transactions = await service.getTransactionHistory(
req.params.id,
parseInt(req.query.limit) || 50
);
res.json(transactions);
} catch (error) {
next(error);
}
});
app.get('/summary', async (req, res, next) => {
try {
const summary = await service.getAccountSummary();
res.json(summary);
} catch (error) {
next(error);
}
});
return app;
};
module.exports = { BankAccountService, createApp };
Interview Questions
Q1: What is Event Sourcing and when should you use it?
Q1: What is Event Sourcing and when should you use it?
Answer:Event Sourcing: Store state as a sequence of events rather than current state.Use when:
- Need complete audit trail (finance, healthcare)
- Complex business workflows
- Need temporal queries (“balance 3 months ago?”)
- Debugging requires understanding “how we got here”
- Undo/redo functionality needed
- Simple CRUD apps
- High-volume updates (too many events)
- No audit requirements
- Team lacks experience (steep learning curve)
Q2: How do you handle schema evolution in events?
Q2: How do you handle schema evolution in events?
Answer:Upcasting pattern:Best practices:
- Events are immutable (never change stored events)
- Transform events when reading
- Chain transformations: v1 → v2 → v3
Copy
// v1: { name: "John Doe" }
// v2: { firstName: "John", lastName: "Doe" }
upcast(v1Event) {
const [first, ...rest] = v1Event.name.split(' ');
return { firstName: first, lastName: rest.join(' ') };
}
- Add version metadata to events
- Make new fields optional with defaults
- Test upcasters with old event data
Q3: What are projections and why do you need them?
Q3: What are projections and why do you need them?
Answer:Projections: Read-optimized views built from events.Why needed:
- Replaying events for every query is slow
- Different queries need different data shapes
- Can use different databases per projection
- Checkpoint: Last processed event position
- Rebuild: Delete projection, replay all events
- Eventually consistent: Slight delay from write
- Account balances (fast balance lookup)
- Transaction history (ordered list)
- Daily summaries (aggregated stats)
Q4: Explain snapshots in Event Sourcing
Q4: Explain snapshots in Event Sourcing
Answer:Problem: Loading aggregate with 10,000 events is slow.Solution: Periodically save aggregate state (snapshot).How it works:When to snapshot:
- Save snapshot every N events (e.g., 100)
- On load: Get snapshot + events since snapshot
- Reduces events to replay
Copy
Snapshot (at event 1000): { balance: $5000 }
Events 1001-1050: [deposit, withdraw, ...]
Load: Apply 50 events to snapshot, not 1050
- After N events (e.g., 100)
- On aggregate save (if changed significantly)
- Background process during low load
Chapter Summary
Key Takeaways:
- Event sourcing stores state as immutable event sequence
- Use aggregates to encapsulate business logic and emit events
- Projections build read-optimized views from events
- Snapshots improve loading performance for long-lived aggregates
- Upcasting handles event schema evolution
- CQRS separates read and write models for scalability