Skip to main content

Event Sourcing Deep Dive

Event sourcing stores state as a sequence of events, providing a complete audit trail and enabling powerful temporal queries.
Learning Objectives:
  • Understand when to use event sourcing
  • Implement event stores and projections
  • Master snapshot strategies
  • Combine event sourcing with CQRS
  • Handle event versioning and schema evolution

Event Sourcing Fundamentals

┌─────────────────────────────────────────────────────────────────────────────┐
│                    TRADITIONAL vs EVENT SOURCING                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  TRADITIONAL (STATE-BASED)                                                  │
│  ─────────────────────────────                                              │
│                                                                              │
│  Bank Account Record:                                                       │
│  ┌────────────────────────────────────────┐                                │
│  │  id: 123                                │                                │
│  │  balance: $1,000     ← Only current    │                                │
│  │  updated_at: 2024-01-15                │                                │
│  └────────────────────────────────────────┘                                │
│                                                                              │
│  ❌ Lost: How did we get here? Who? When?                                   │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════   │
│                                                                              │
│  EVENT SOURCING                                                             │
│  ─────────────────                                                          │
│                                                                              │
│  Bank Account Events:                                                       │
│  ┌────────────────────────────────────────┐                                │
│  │  1. AccountOpened { balance: $0 }      │  2024-01-01                    │
│  │  2. MoneyDeposited { amount: $500 }    │  2024-01-05                    │
│  │  3. MoneyDeposited { amount: $1000 }   │  2024-01-10                    │
│  │  4. MoneyWithdrawn { amount: $500 }    │  2024-01-15                    │
│  │  ─────────────────────────────────────                                  │
│  │  Current Balance: $1,000                                                │
│  └────────────────────────────────────────┘                                │
│                                                                              │
│  ✅ Complete history: who, what, when, why                                  │
│  ✅ Can replay to any point in time                                        │
│  ✅ Natural audit trail                                                     │
│  ✅ Debug by replaying events                                               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

When to Use Event Sourcing

✅ Good Use Cases

  • Financial systems (audit required)
  • Order management (complex workflows)
  • User activity tracking
  • Collaborative editing
  • Undo/redo functionality
  • Temporal queries (“balance last month?”)

❌ Poor Use Cases

  • Simple CRUD applications
  • High-volume metrics/logs
  • Frequently updated data
  • No audit requirements
  • Small teams (complexity overhead)

Event Store Implementation

// event-store.js - Core event sourcing infrastructure

const { Pool } = require('pg');
const { EventEmitter } = require('events');

class EventStore extends EventEmitter {
  constructor(db) {
    super();
    this.db = db;
  }

  async initialize() {
    // Create event store table
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS events (
        id BIGSERIAL PRIMARY KEY,
        stream_id VARCHAR(255) NOT NULL,
        stream_type VARCHAR(100) NOT NULL,
        event_type VARCHAR(100) NOT NULL,
        event_data JSONB NOT NULL,
        metadata JSONB DEFAULT '{}',
        version INTEGER NOT NULL,
        created_at TIMESTAMPTZ DEFAULT NOW(),
        
        UNIQUE(stream_id, version)
      );
      
      CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events(stream_id);
      CREATE INDEX IF NOT EXISTS idx_events_stream_type ON events(stream_type);
      CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);
    `);

    // Snapshots table
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS snapshots (
        id BIGSERIAL PRIMARY KEY,
        stream_id VARCHAR(255) NOT NULL UNIQUE,
        stream_type VARCHAR(100) NOT NULL,
        version INTEGER NOT NULL,
        state JSONB NOT NULL,
        created_at TIMESTAMPTZ DEFAULT NOW()
      );
    `);
  }

  // Append events to a stream
  async appendToStream(streamId, streamType, events, expectedVersion = null) {
    const client = await this.db.connect();
    
    try {
      await client.query('BEGIN');
      
      // Optimistic concurrency check
      if (expectedVersion !== null) {
        const current = await client.query(
          `SELECT COALESCE(MAX(version), 0) as version 
           FROM events WHERE stream_id = $1`,
          [streamId]
        );
        
        if (current.rows[0].version !== expectedVersion) {
          throw new ConcurrencyError(
            `Expected version ${expectedVersion}, ` +
            `but found ${current.rows[0].version}`
          );
        }
      }

      // Get next version
      const versionResult = await client.query(
        `SELECT COALESCE(MAX(version), 0) + 1 as next_version 
         FROM events WHERE stream_id = $1`,
        [streamId]
      );
      let version = versionResult.rows[0].next_version;

      // Insert events
      const insertedEvents = [];
      for (const event of events) {
        const result = await client.query(
          `INSERT INTO events 
           (stream_id, stream_type, event_type, event_data, metadata, version)
           VALUES ($1, $2, $3, $4, $5, $6)
           RETURNING *`,
          [
            streamId,
            streamType,
            event.type,
            event.data,
            event.metadata || {},
            version++
          ]
        );
        insertedEvents.push(result.rows[0]);
      }

      await client.query('COMMIT');

      // Emit events for subscribers
      for (const event of insertedEvents) {
        this.emit('event', event);
        this.emit(event.event_type, event);
      }

      return insertedEvents;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  // Read events from a stream
  async readStream(streamId, fromVersion = 0) {
    const result = await this.db.query(
      `SELECT * FROM events 
       WHERE stream_id = $1 AND version >= $2 
       ORDER BY version ASC`,
      [streamId, fromVersion]
    );
    return result.rows;
  }

  // Read all events (for projections)
  async readAllEvents(fromPosition = 0, batchSize = 1000) {
    const result = await this.db.query(
      `SELECT * FROM events 
       WHERE id > $1 
       ORDER BY id ASC 
       LIMIT $2`,
      [fromPosition, batchSize]
    );
    return result.rows;
  }

  // Save snapshot
  async saveSnapshot(streamId, streamType, version, state) {
    await this.db.query(
      `INSERT INTO snapshots (stream_id, stream_type, version, state)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (stream_id) 
       DO UPDATE SET version = $3, state = $4, created_at = NOW()`,
      [streamId, streamType, version, state]
    );
  }

  // Load snapshot
  async loadSnapshot(streamId) {
    const result = await this.db.query(
      `SELECT * FROM snapshots WHERE stream_id = $1`,
      [streamId]
    );
    return result.rows[0] || null;
  }
}

class ConcurrencyError extends Error {
  constructor(message) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

module.exports = { EventStore, ConcurrencyError };

Aggregate Pattern

// aggregate.js - Event-sourced aggregate root

class AggregateRoot {
  constructor() {
    this._uncommittedEvents = [];
    this._version = 0;
  }

  get version() {
    return this._version;
  }

  get uncommittedEvents() {
    return this._uncommittedEvents;
  }

  clearUncommittedEvents() {
    this._uncommittedEvents = [];
  }

  // Apply event and track as uncommitted
  applyChange(event) {
    this._apply(event);
    this._uncommittedEvents.push(event);
  }

  // Apply event from history (committed)
  _apply(event) {
    const handler = `on${event.type}`;
    if (typeof this[handler] === 'function') {
      this[handler](event.data);
    }
    this._version++;
  }

  // Load from event history
  loadFromHistory(events) {
    for (const event of events) {
      this._apply({
        type: event.event_type,
        data: event.event_data
      });
    }
  }
}

// Bank Account aggregate
class BankAccount extends AggregateRoot {
  constructor() {
    super();
    this.id = null;
    this.balance = 0;
    this.status = 'pending';
    this.transactions = [];
  }

  // Commands (business logic + emit events)
  static open(accountId, ownerId, initialDeposit = 0) {
    const account = new BankAccount();
    
    account.applyChange({
      type: 'AccountOpened',
      data: {
        accountId,
        ownerId,
        openedAt: new Date().toISOString()
      }
    });

    if (initialDeposit > 0) {
      account.deposit(initialDeposit, 'Initial deposit');
    }

    return account;
  }

  deposit(amount, description = '') {
    if (amount <= 0) {
      throw new Error('Deposit amount must be positive');
    }
    if (this.status !== 'active') {
      throw new Error('Account is not active');
    }

    this.applyChange({
      type: 'MoneyDeposited',
      data: {
        amount,
        description,
        timestamp: new Date().toISOString()
      }
    });
  }

  withdraw(amount, description = '') {
    if (amount <= 0) {
      throw new Error('Withdrawal amount must be positive');
    }
    if (this.status !== 'active') {
      throw new Error('Account is not active');
    }
    if (amount > this.balance) {
      throw new Error('Insufficient funds');
    }

    this.applyChange({
      type: 'MoneyWithdrawn',
      data: {
        amount,
        description,
        timestamp: new Date().toISOString()
      }
    });
  }

  close(reason = '') {
    if (this.balance !== 0) {
      throw new Error('Cannot close account with non-zero balance');
    }

    this.applyChange({
      type: 'AccountClosed',
      data: {
        reason,
        closedAt: new Date().toISOString()
      }
    });
  }

  // Event handlers (state changes)
  onAccountOpened(data) {
    this.id = data.accountId;
    this.ownerId = data.ownerId;
    this.status = 'active';
    this.openedAt = data.openedAt;
  }

  onMoneyDeposited(data) {
    this.balance += data.amount;
    this.transactions.push({
      type: 'deposit',
      amount: data.amount,
      timestamp: data.timestamp
    });
  }

  onMoneyWithdrawn(data) {
    this.balance -= data.amount;
    this.transactions.push({
      type: 'withdrawal',
      amount: data.amount,
      timestamp: data.timestamp
    });
  }

  onAccountClosed(data) {
    this.status = 'closed';
    this.closedAt = data.closedAt;
  }

  // Snapshot for performance
  toSnapshot() {
    return {
      id: this.id,
      ownerId: this.ownerId,
      balance: this.balance,
      status: this.status,
      openedAt: this.openedAt,
      closedAt: this.closedAt,
      transactionCount: this.transactions.length
    };
  }

  static fromSnapshot(snapshot) {
    const account = new BankAccount();
    Object.assign(account, snapshot);
    account.transactions = []; // Don't store all transactions in snapshot
    return account;
  }
}

module.exports = { AggregateRoot, BankAccount };

Repository Pattern

// repository.js - Event-sourced repository

class EventSourcedRepository {
  constructor(eventStore, aggregateType, AggregateClass) {
    this.eventStore = eventStore;
    this.aggregateType = aggregateType;
    this.AggregateClass = AggregateClass;
    this.snapshotFrequency = 100; // Snapshot every 100 events
  }

  async getById(id) {
    const streamId = `${this.aggregateType}-${id}`;
    
    // Try to load from snapshot first
    let aggregate;
    let fromVersion = 0;
    
    const snapshot = await this.eventStore.loadSnapshot(streamId);
    if (snapshot) {
      aggregate = this.AggregateClass.fromSnapshot(snapshot.state);
      aggregate._version = snapshot.version;
      fromVersion = snapshot.version + 1;
    } else {
      aggregate = new this.AggregateClass();
    }

    // Load events since snapshot (or all events if no snapshot)
    const events = await this.eventStore.readStream(streamId, fromVersion);
    
    if (events.length === 0 && !snapshot) {
      return null; // Aggregate doesn't exist
    }

    aggregate.loadFromHistory(events);
    
    return aggregate;
  }

  async save(aggregate) {
    const streamId = `${this.aggregateType}-${aggregate.id}`;
    const events = aggregate.uncommittedEvents;
    
    if (events.length === 0) {
      return; // Nothing to save
    }

    // Format events for storage
    const eventsToStore = events.map(e => ({
      type: e.type,
      data: e.data,
      metadata: {
        timestamp: new Date().toISOString(),
        ...e.metadata
      }
    }));

    // Append to event store with optimistic concurrency
    const expectedVersion = aggregate.version - events.length;
    await this.eventStore.appendToStream(
      streamId,
      this.aggregateType,
      eventsToStore,
      expectedVersion
    );

    // Create snapshot if needed
    if (aggregate.version % this.snapshotFrequency === 0) {
      await this.eventStore.saveSnapshot(
        streamId,
        this.aggregateType,
        aggregate.version,
        aggregate.toSnapshot()
      );
    }

    aggregate.clearUncommittedEvents();
  }
}

// Usage
const bankAccountRepo = new EventSourcedRepository(
  eventStore,
  'BankAccount',
  BankAccount
);

// Create new account
const account = BankAccount.open('acc-123', 'user-456', 1000);
await bankAccountRepo.save(account);

// Load and modify
const loadedAccount = await bankAccountRepo.getById('acc-123');
loadedAccount.deposit(500, 'Salary');
loadedAccount.withdraw(200, 'Groceries');
await bankAccountRepo.save(loadedAccount);

module.exports = { EventSourcedRepository };

Projections

┌─────────────────────────────────────────────────────────────────────────────┐
│                    PROJECTIONS (READ MODELS)                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Event Store (Write Side)                                                   │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  1. AccountOpened { accountId: 'A1', ownerId: 'U1' }                │   │
│  │  2. MoneyDeposited { accountId: 'A1', amount: 1000 }                │   │
│  │  3. AccountOpened { accountId: 'A2', ownerId: 'U2' }                │   │
│  │  4. MoneyDeposited { accountId: 'A2', amount: 500 }                 │   │
│  │  5. MoneyWithdrawn { accountId: 'A1', amount: 200 }                 │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                │                                            │
│                                ▼                                            │
│                       ┌─────────────────┐                                  │
│                       │   Projector     │                                  │
│                       │   (Event        │                                  │
│                       │   Handler)      │                                  │
│                       └────────┬────────┘                                  │
│                                │                                            │
│            ┌───────────────────┼───────────────────┐                       │
│            ▼                   ▼                   ▼                       │
│  ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐        │
│  │ Account Balances  │ │ Transaction List  │ │ Account Summary   │        │
│  │ (Read Model)      │ │ (Read Model)      │ │ (Read Model)      │        │
│  ├───────────────────┤ ├───────────────────┤ ├───────────────────┤        │
│  │ A1: $800          │ │ A1: +1000, -200   │ │ Total: 2 accounts │        │
│  │ A2: $500          │ │ A2: +500          │ │ Total: $1300      │        │
│  └───────────────────┘ └───────────────────┘ └───────────────────┘        │
│                                                                              │
│  Each projection can be:                                                    │
│  • Different database (PostgreSQL, MongoDB, Redis, Elasticsearch)          │
│  • Optimized for specific queries                                          │
│  • Rebuilt by replaying all events                                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Projection Implementation

// projections.js - Read model projections

class ProjectionManager {
  constructor(eventStore, db) {
    this.eventStore = eventStore;
    this.db = db;
    this.projections = new Map();
    this.checkpoints = new Map();
  }

  register(name, projection) {
    this.projections.set(name, projection);
  }

  async start() {
    // Load checkpoints
    for (const [name, projection] of this.projections) {
      const checkpoint = await this.loadCheckpoint(name);
      this.checkpoints.set(name, checkpoint);
    }

    // Start processing
    this.running = true;
    this.process();
  }

  async process() {
    while (this.running) {
      for (const [name, projection] of this.projections) {
        const checkpoint = this.checkpoints.get(name);
        const events = await this.eventStore.readAllEvents(checkpoint, 100);

        if (events.length > 0) {
          for (const event of events) {
            await projection.apply(event);
          }
          
          const newCheckpoint = events[events.length - 1].id;
          await this.saveCheckpoint(name, newCheckpoint);
          this.checkpoints.set(name, newCheckpoint);
        }
      }

      await this.sleep(100);
    }
  }

  async loadCheckpoint(projectionName) {
    const result = await this.db.query(
      `SELECT position FROM projection_checkpoints WHERE name = $1`,
      [projectionName]
    );
    return result.rows[0]?.position || 0;
  }

  async saveCheckpoint(projectionName, position) {
    await this.db.query(
      `INSERT INTO projection_checkpoints (name, position)
       VALUES ($1, $2)
       ON CONFLICT (name) DO UPDATE SET position = $2`,
      [projectionName, position]
    );
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Account Balance Projection
class AccountBalanceProjection {
  constructor(db) {
    this.db = db;
  }

  async initialize() {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS account_balances (
        account_id VARCHAR(255) PRIMARY KEY,
        owner_id VARCHAR(255) NOT NULL,
        balance DECIMAL(15, 2) DEFAULT 0,
        status VARCHAR(50) DEFAULT 'active',
        updated_at TIMESTAMPTZ DEFAULT NOW()
      )
    `);
  }

  async apply(event) {
    switch (event.event_type) {
      case 'AccountOpened':
        await this.db.query(
          `INSERT INTO account_balances (account_id, owner_id, balance)
           VALUES ($1, $2, 0)
           ON CONFLICT (account_id) DO NOTHING`,
          [event.event_data.accountId, event.event_data.ownerId]
        );
        break;

      case 'MoneyDeposited':
        await this.db.query(
          `UPDATE account_balances 
           SET balance = balance + $1, updated_at = NOW()
           WHERE account_id = $2`,
          [event.event_data.amount, event.stream_id.split('-')[1]]
        );
        break;

      case 'MoneyWithdrawn':
        await this.db.query(
          `UPDATE account_balances 
           SET balance = balance - $1, updated_at = NOW()
           WHERE account_id = $2`,
          [event.event_data.amount, event.stream_id.split('-')[1]]
        );
        break;

      case 'AccountClosed':
        await this.db.query(
          `UPDATE account_balances 
           SET status = 'closed', updated_at = NOW()
           WHERE account_id = $1`,
          [event.stream_id.split('-')[1]]
        );
        break;
    }
  }

  // Rebuild from scratch
  async rebuild() {
    await this.db.query('TRUNCATE account_balances');
    // Checkpoint will be reset, and events will be replayed
  }
}

// Transaction History Projection
class TransactionHistoryProjection {
  constructor(db) {
    this.db = db;
  }

  async initialize() {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS transaction_history (
        id BIGSERIAL PRIMARY KEY,
        account_id VARCHAR(255) NOT NULL,
        transaction_type VARCHAR(50) NOT NULL,
        amount DECIMAL(15, 2) NOT NULL,
        description TEXT,
        balance_after DECIMAL(15, 2),
        created_at TIMESTAMPTZ NOT NULL
      );
      
      CREATE INDEX IF NOT EXISTS idx_transactions_account 
        ON transaction_history(account_id, created_at DESC);
    `);
  }

  async apply(event) {
    if (event.event_type === 'MoneyDeposited') {
      await this.db.query(
        `INSERT INTO transaction_history 
         (account_id, transaction_type, amount, description, created_at)
         VALUES ($1, 'deposit', $2, $3, $4)`,
        [
          event.stream_id.split('-')[1],
          event.event_data.amount,
          event.event_data.description,
          event.event_data.timestamp
        ]
      );
    }

    if (event.event_type === 'MoneyWithdrawn') {
      await this.db.query(
        `INSERT INTO transaction_history 
         (account_id, transaction_type, amount, description, created_at)
         VALUES ($1, 'withdrawal', $2, $3, $4)`,
        [
          event.stream_id.split('-')[1],
          event.event_data.amount,
          event.event_data.description,
          event.event_data.timestamp
        ]
      );
    }
  }
}

// Daily Summary Projection (Aggregated view)
class DailySummaryProjection {
  constructor(db) {
    this.db = db;
  }

  async initialize() {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS daily_summaries (
        date DATE NOT NULL,
        account_id VARCHAR(255) NOT NULL,
        deposits_count INTEGER DEFAULT 0,
        deposits_total DECIMAL(15, 2) DEFAULT 0,
        withdrawals_count INTEGER DEFAULT 0,
        withdrawals_total DECIMAL(15, 2) DEFAULT 0,
        PRIMARY KEY (date, account_id)
      )
    `);
  }

  async apply(event) {
    const date = new Date(event.created_at).toISOString().split('T')[0];
    const accountId = event.stream_id.split('-')[1];

    if (event.event_type === 'MoneyDeposited') {
      await this.db.query(
        `INSERT INTO daily_summaries (date, account_id, deposits_count, deposits_total)
         VALUES ($1, $2, 1, $3)
         ON CONFLICT (date, account_id) DO UPDATE SET
           deposits_count = daily_summaries.deposits_count + 1,
           deposits_total = daily_summaries.deposits_total + $3`,
        [date, accountId, event.event_data.amount]
      );
    }

    if (event.event_type === 'MoneyWithdrawn') {
      await this.db.query(
        `INSERT INTO daily_summaries (date, account_id, withdrawals_count, withdrawals_total)
         VALUES ($1, $2, 1, $3)
         ON CONFLICT (date, account_id) DO UPDATE SET
           withdrawals_count = daily_summaries.withdrawals_count + 1,
           withdrawals_total = daily_summaries.withdrawals_total + $3`,
        [date, accountId, event.event_data.amount]
      );
    }
  }
}

module.exports = {
  ProjectionManager,
  AccountBalanceProjection,
  TransactionHistoryProjection,
  DailySummaryProjection
};

Event Versioning & Schema Evolution

┌─────────────────────────────────────────────────────────────────────────────┐
│                    EVENT SCHEMA EVOLUTION                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  PROBLEM: Events are immutable, but requirements change                     │
│                                                                              │
│  V1: MoneyDeposited { amount: 100 }                                         │
│  V2: MoneyDeposited { amount: 100, currency: 'USD' }  ← New field           │
│  V3: MoneyDeposited { amount: { value: 100, currency: 'USD' } }  ← Changed  │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════   │
│                                                                              │
│  SOLUTION: UPCASTING                                                        │
│  ─────────────────────                                                      │
│                                                                              │
│  Store original event → Transform when reading                              │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  Event Store                     Upcaster                 App       │   │
│  │  ┌──────────────┐               ┌──────────────┐       ┌────────┐  │   │
│  │  │ V1 Event     │──────────────▶│ v1 → v2 → v3 │──────▶│ V3     │  │   │
│  │  └──────────────┘               └──────────────┘       └────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  Events stay immutable, transformation happens at read time                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Upcasting Implementation

// event-upcaster.js - Event schema evolution

class EventUpcaster {
  constructor() {
    this.upcasters = new Map();
  }

  // Register an upcaster for an event type and version
  register(eventType, fromVersion, toVersion, upcasterFn) {
    const key = `${eventType}:${fromVersion}`;
    this.upcasters.set(key, {
      toVersion,
      transform: upcasterFn
    });
  }

  // Upcast event to latest version
  upcast(event) {
    let currentEvent = { ...event };
    let currentVersion = event.metadata?.version || 1;
    
    while (true) {
      const key = `${event.event_type}:${currentVersion}`;
      const upcaster = this.upcasters.get(key);
      
      if (!upcaster) break; // No more upcasts needed
      
      currentEvent = {
        ...currentEvent,
        event_data: upcaster.transform(currentEvent.event_data),
        metadata: {
          ...currentEvent.metadata,
          version: upcaster.toVersion,
          originalVersion: event.metadata?.version || 1
        }
      };
      currentVersion = upcaster.toVersion;
    }
    
    return currentEvent;
  }
}

// Register upcasters
const upcaster = new EventUpcaster();

// MoneyDeposited v1 → v2 (add currency)
upcaster.register('MoneyDeposited', 1, 2, (data) => ({
  ...data,
  currency: 'USD' // Default to USD for old events
}));

// MoneyDeposited v2 → v3 (restructure amount)
upcaster.register('MoneyDeposited', 2, 3, (data) => ({
  amount: {
    value: data.amount,
    currency: data.currency
  },
  description: data.description,
  timestamp: data.timestamp
}));

// UserRegistered v1 → v2 (split name into firstName/lastName)
upcaster.register('UserRegistered', 1, 2, (data) => {
  const [firstName, ...rest] = (data.name || '').split(' ');
  return {
    ...data,
    firstName: firstName || '',
    lastName: rest.join(' ') || '',
    name: undefined // Remove old field
  };
});

// Event store with upcasting
class UpcastingEventStore {
  constructor(eventStore, upcaster) {
    this.eventStore = eventStore;
    this.upcaster = upcaster;
  }

  async readStream(streamId, fromVersion = 0) {
    const events = await this.eventStore.readStream(streamId, fromVersion);
    return events.map(e => this.upcaster.upcast(e));
  }

  async readAllEvents(fromPosition = 0, batchSize = 1000) {
    const events = await this.eventStore.readAllEvents(fromPosition, batchSize);
    return events.map(e => this.upcaster.upcast(e));
  }

  // Write methods don't upcast (always write latest version)
  async appendToStream(streamId, streamType, events, expectedVersion) {
    // Add version metadata to new events
    const versionedEvents = events.map(e => ({
      ...e,
      metadata: {
        ...e.metadata,
        version: this.getLatestVersion(e.type)
      }
    }));
    
    return this.eventStore.appendToStream(
      streamId, streamType, versionedEvents, expectedVersion
    );
  }

  getLatestVersion(eventType) {
    // Return the latest version for each event type
    const versions = {
      'MoneyDeposited': 3,
      'MoneyWithdrawn': 2,
      'UserRegistered': 2
    };
    return versions[eventType] || 1;
  }
}

module.exports = { EventUpcaster, UpcastingEventStore };

CQRS with Event Sourcing

┌─────────────────────────────────────────────────────────────────────────────┐
│                    CQRS + EVENT SOURCING                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                              ┌─────────────────────┐                        │
│                              │      Client         │                        │
│                              └──────────┬──────────┘                        │
│                                         │                                   │
│                    ┌────────────────────┴────────────────────┐              │
│                    │                                         │              │
│                    ▼                                         ▼              │
│           ┌────────────────┐                      ┌────────────────┐       │
│           │ Command Side   │                      │  Query Side    │       │
│           │ (Write)        │                      │  (Read)        │       │
│           └───────┬────────┘                      └────────┬───────┘       │
│                   │                                        │                │
│           ┌───────▼────────┐                      ┌────────▼───────┐       │
│           │ Command Handler│                      │ Query Handler  │       │
│           └───────┬────────┘                      └────────┬───────┘       │
│                   │                                        │                │
│           ┌───────▼────────┐                      ┌────────▼───────┐       │
│           │  Aggregates    │                      │  Read Models   │       │
│           │  (Domain Logic)│                      │  (Projections) │       │
│           └───────┬────────┘                      └────────▲───────┘       │
│                   │                                        │                │
│           ┌───────▼────────┐         events        ┌───────┴───────┐       │
│           │  Event Store   │──────────────────────▶│  Projector    │       │
│           │  (Append-only) │                       │               │       │
│           └────────────────┘                       └───────────────┘       │
│                                                                              │
│  WRITE PATH: Command → Aggregate → Events → Event Store                    │
│  READ PATH: Query → Read Model (optimized for queries)                     │
│  SYNC: Event Store → Projector → Read Models (eventually consistent)       │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Complete CQRS Implementation

// cqrs-service.js - CQRS + Event Sourcing service

const express = require('express');
const { EventStore } = require('./event-store');
const { EventSourcedRepository } = require('./repository');
const { BankAccount } = require('./aggregate');
const {
  ProjectionManager,
  AccountBalanceProjection,
  TransactionHistoryProjection
} = require('./projections');

class BankAccountService {
  constructor(db) {
    this.eventStore = new EventStore(db);
    this.repository = new EventSourcedRepository(
      this.eventStore,
      'BankAccount',
      BankAccount
    );
    
    // Read models
    this.balanceProjection = new AccountBalanceProjection(db);
    this.transactionProjection = new TransactionHistoryProjection(db);
    
    this.db = db;
  }

  async initialize() {
    await this.eventStore.initialize();
    await this.balanceProjection.initialize();
    await this.transactionProjection.initialize();
    
    // Start projection processing
    const projectionManager = new ProjectionManager(this.eventStore, this.db);
    projectionManager.register('account-balances', this.balanceProjection);
    projectionManager.register('transactions', this.transactionProjection);
    await projectionManager.start();
  }

  // COMMAND HANDLERS (Write Side)
  
  async openAccount(command) {
    const { accountId, ownerId, initialDeposit } = command;
    
    // Check if account already exists
    const existing = await this.repository.getById(accountId);
    if (existing) {
      throw new Error('Account already exists');
    }
    
    // Create new account
    const account = BankAccount.open(accountId, ownerId, initialDeposit);
    await this.repository.save(account);
    
    return { accountId: account.id, balance: account.balance };
  }

  async deposit(command) {
    const { accountId, amount, description } = command;
    
    const account = await this.repository.getById(accountId);
    if (!account) {
      throw new Error('Account not found');
    }
    
    account.deposit(amount, description);
    await this.repository.save(account);
    
    return { accountId, newBalance: account.balance };
  }

  async withdraw(command) {
    const { accountId, amount, description } = command;
    
    const account = await this.repository.getById(accountId);
    if (!account) {
      throw new Error('Account not found');
    }
    
    account.withdraw(amount, description);
    await this.repository.save(account);
    
    return { accountId, newBalance: account.balance };
  }

  async closeAccount(command) {
    const { accountId, reason } = command;
    
    const account = await this.repository.getById(accountId);
    if (!account) {
      throw new Error('Account not found');
    }
    
    account.close(reason);
    await this.repository.save(account);
    
    return { accountId, status: 'closed' };
  }

  // QUERY HANDLERS (Read Side) - Use projections
  
  async getAccountBalance(accountId) {
    // Query from read model (fast, optimized)
    const result = await this.db.query(
      `SELECT * FROM account_balances WHERE account_id = $1`,
      [accountId]
    );
    
    if (result.rows.length === 0) {
      throw new Error('Account not found');
    }
    
    return result.rows[0];
  }

  async getTransactionHistory(accountId, limit = 50) {
    const result = await this.db.query(
      `SELECT * FROM transaction_history 
       WHERE account_id = $1 
       ORDER BY created_at DESC 
       LIMIT $2`,
      [accountId, limit]
    );
    
    return result.rows;
  }

  async getAllAccounts(ownerId) {
    const result = await this.db.query(
      `SELECT * FROM account_balances WHERE owner_id = $1`,
      [ownerId]
    );
    
    return result.rows;
  }

  async getAccountSummary() {
    const result = await this.db.query(`
      SELECT 
        COUNT(*) as total_accounts,
        SUM(balance) as total_balance,
        AVG(balance) as average_balance,
        COUNT(*) FILTER (WHERE status = 'active') as active_accounts,
        COUNT(*) FILTER (WHERE status = 'closed') as closed_accounts
      FROM account_balances
    `);
    
    return result.rows[0];
  }
}

// Express API
const createApp = (service) => {
  const app = express();
  app.use(express.json());

  // Commands
  app.post('/accounts', async (req, res, next) => {
    try {
      const result = await service.openAccount(req.body);
      res.status(201).json(result);
    } catch (error) {
      next(error);
    }
  });

  app.post('/accounts/:id/deposit', async (req, res, next) => {
    try {
      const result = await service.deposit({
        accountId: req.params.id,
        ...req.body
      });
      res.json(result);
    } catch (error) {
      next(error);
    }
  });

  app.post('/accounts/:id/withdraw', async (req, res, next) => {
    try {
      const result = await service.withdraw({
        accountId: req.params.id,
        ...req.body
      });
      res.json(result);
    } catch (error) {
      next(error);
    }
  });

  // Queries
  app.get('/accounts/:id', async (req, res, next) => {
    try {
      const balance = await service.getAccountBalance(req.params.id);
      res.json(balance);
    } catch (error) {
      next(error);
    }
  });

  app.get('/accounts/:id/transactions', async (req, res, next) => {
    try {
      const transactions = await service.getTransactionHistory(
        req.params.id,
        parseInt(req.query.limit) || 50
      );
      res.json(transactions);
    } catch (error) {
      next(error);
    }
  });

  app.get('/summary', async (req, res, next) => {
    try {
      const summary = await service.getAccountSummary();
      res.json(summary);
    } catch (error) {
      next(error);
    }
  });

  return app;
};

module.exports = { BankAccountService, createApp };

Interview Questions

Answer:Event Sourcing: Store state as a sequence of events rather than current state.Use when:
  • Need complete audit trail (finance, healthcare)
  • Complex business workflows
  • Need temporal queries (“balance 3 months ago?”)
  • Debugging requires understanding “how we got here”
  • Undo/redo functionality needed
Avoid when:
  • Simple CRUD apps
  • High-volume updates (too many events)
  • No audit requirements
  • Team lacks experience (steep learning curve)
Answer:Upcasting pattern:
  1. Events are immutable (never change stored events)
  2. Transform events when reading
  3. Chain transformations: v1 → v2 → v3
Example:
// v1: { name: "John Doe" }
// v2: { firstName: "John", lastName: "Doe" }

upcast(v1Event) {
  const [first, ...rest] = v1Event.name.split(' ');
  return { firstName: first, lastName: rest.join(' ') };
}
Best practices:
  • Add version metadata to events
  • Make new fields optional with defaults
  • Test upcasters with old event data
Answer:Projections: Read-optimized views built from events.Why needed:
  • Replaying events for every query is slow
  • Different queries need different data shapes
  • Can use different databases per projection
Key concepts:
  • Checkpoint: Last processed event position
  • Rebuild: Delete projection, replay all events
  • Eventually consistent: Slight delay from write
Example projections:
  • Account balances (fast balance lookup)
  • Transaction history (ordered list)
  • Daily summaries (aggregated stats)
Answer:Problem: Loading aggregate with 10,000 events is slow.Solution: Periodically save aggregate state (snapshot).How it works:
  1. Save snapshot every N events (e.g., 100)
  2. On load: Get snapshot + events since snapshot
  3. Reduces events to replay
Example:
Snapshot (at event 1000): { balance: $5000 }
Events 1001-1050: [deposit, withdraw, ...]

Load: Apply 50 events to snapshot, not 1050
When to snapshot:
  • After N events (e.g., 100)
  • On aggregate save (if changed significantly)
  • Background process during low load

Chapter Summary

Key Takeaways:
  • Event sourcing stores state as immutable event sequence
  • Use aggregates to encapsulate business logic and emit events
  • Projections build read-optimized views from events
  • Snapshots improve loading performance for long-lived aggregates
  • Upcasting handles event schema evolution
  • CQRS separates read and write models for scalability
Next Chapter: GraphQL Federation - Building unified APIs across microservices.