# 25. Event Sourcing Deep Dive

> Master event sourcing architecture, event stores, projections, snapshots, and CQRS patterns for microservices

Event sourcing stores state as a sequence of events, providing a complete audit trail and enabling powerful temporal queries. The analogy that clicks for most people: traditional databases are like a bank balance (you only see the current number), while event sourcing is like a bank statement (you see every transaction that led to that number). This difference is profound -- with event sourcing, you can answer questions like "what was this account's balance three months ago?" or "which transactions happened between Tuesday and Thursday?" without maintaining separate audit tables. The trade-off is real complexity: event versioning, projection management, and eventual consistency are not trivial to get right.

In a microservices architecture, event sourcing becomes especially powerful because events are already the backbone of inter-service communication. If the `Orders` service emits `OrderPlaced` as a Kafka event for the `Inventory` service to consume, why not persist that same event as the source of truth instead of deriving it from a mutated `orders` table? This alignment collapses two concerns (state persistence and inter-service messaging) into one: the event log. When you combine event sourcing with CQRS and a message bus, you get a system where services can be rebuilt from scratch, new projections can be added without migrations, and consumers can replay history to catch up after an outage -- capabilities that are extremely difficult to retrofit into a traditional CRUD system.

<Info>
  **Learning Objectives:**

  * Understand when to use event sourcing
  * Implement event stores and projections
  * Master snapshot strategies
  * Combine event sourcing with CQRS
  * Handle event versioning and schema evolution
</Info>

***

## Event Sourcing Fundamentals

Before diving into code, it helps to internalize the philosophical shift event sourcing demands. In a CRUD world, the database row is the truth, and history is a derived artifact (at best, a change-log table maintained via triggers or application code). In event sourcing, the event log is the truth, and the current state is the derived artifact -- a projection you compute by folding events together. This inversion is why event sourcing naturally answers temporal questions: the "past" isn't lost information that requires special infrastructure to preserve, it's the default representation.

The business impact shows up in compliance-heavy domains. A bank that stores only current balances cannot produce a defensible answer to "what was this customer's balance on March 15th at 3:47 PM?" without parallel audit tables that might drift out of sync with the primary data. An event-sourced bank can replay events up to that timestamp and produce a bit-for-bit exact reconstruction. Regulators love this. Auditors love this. Incident responders love this -- "what was the state of the system when this bug triggered?" becomes a tractable question instead of an archaeological dig through log files.

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    TRADITIONAL vs EVENT SOURCING                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  TRADITIONAL (STATE-BASED)                                                  │
│  ─────────────────────────────                                              │
│                                                                              │
│  Bank Account Record:                                                       │
│  ┌────────────────────────────────────────┐                                │
│  │  id: 123                                │                                │
│  │  balance: $1,000     ← Only current    │                                │
│  │  updated_at: 2024-01-15                │                                │
│  └────────────────────────────────────────┘                                │
│                                                                              │
│  ❌ Lost: How did we get here? Who? When?                                   │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════   │
│                                                                              │
│  EVENT SOURCING                                                             │
│  ─────────────────                                                          │
│                                                                              │
│  Bank Account Events:                                                       │
│  ┌────────────────────────────────────────┐                                │
│  │  1. AccountOpened { balance: $0 }      │  2024-01-01                    │
│  │  2. MoneyDeposited { amount: $500 }    │  2024-01-05                    │
│  │  3. MoneyDeposited { amount: $1000 }   │  2024-01-10                    │
│  │  4. MoneyWithdrawn { amount: $500 }    │  2024-01-15                    │
│  │  ─────────────────────────────────────                                  │
│  │  Current Balance: $1,000                                                │
│  └────────────────────────────────────────┘                                │
│                                                                              │
│  ✅ Complete history: who, what, when, why                                  │
│  ✅ Can replay to any point in time                                        │
│  ✅ Natural audit trail                                                     │
│  ✅ Debug by replaying events                                               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```
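
To make "current state is a fold over events" concrete, here is a minimal Python sketch that replays the four events from the diagram. The `Event` class, the `apply` function, and `balance_as_of` are illustrative names, not part of any library, and a real system would read the events from the store rather than a hard-coded list.

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal


@dataclass(frozen=True)
class Event:
    type: str
    amount: Decimal
    occurred_on: date


# The four events from the diagram above.
EVENTS = [
    Event("AccountOpened", Decimal("0"), date(2024, 1, 1)),
    Event("MoneyDeposited", Decimal("500"), date(2024, 1, 5)),
    Event("MoneyDeposited", Decimal("1000"), date(2024, 1, 10)),
    Event("MoneyWithdrawn", Decimal("500"), date(2024, 1, 15)),
]


def apply(balance: Decimal, event: Event) -> Decimal:
    """Pure transition function: old state + one event -> new state."""
    if event.type == "AccountOpened":
        return event.amount
    if event.type == "MoneyDeposited":
        return balance + event.amount
    if event.type == "MoneyWithdrawn":
        return balance - event.amount
    return balance  # unknown event types leave the state untouched


def balance_as_of(events: list[Event], as_of: date) -> Decimal:
    """Fold every event dated on or before `as_of` into a balance."""
    balance = Decimal("0")
    for event in events:
        if event.occurred_on <= as_of:
            balance = apply(balance, event)
    return balance


current = balance_as_of(EVENTS, date.max)
mid_january = balance_as_of(EVENTS, date(2024, 1, 7))
print(current, mid_january)  # 1000 500
```

The temporal query costs nothing extra: it is the same fold, stopped early.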

### When to Use Event Sourcing

The decision to adopt event sourcing should be driven by concrete requirements, not architectural fashion. If your domain has genuine temporal queries, legal audit requirements, or complex workflows with many state transitions, the up-front complexity pays for itself many times over. If your domain is essentially "create user, update profile, delete account," event sourcing will feel like you're wearing a spacesuit to take out the trash -- technically impressive, practically ridiculous.

<CardGroup cols={2}>
  <Card title="✅ Good Use Cases" icon="check">
    * Financial systems (audit required)
    * Order management (complex workflows)
    * User activity tracking
    * Collaborative editing
    * Undo/redo functionality
    * Temporal queries ("balance last month?")
  </Card>

  <Card title="❌ Poor Use Cases" icon="xmark">
    * Simple CRUD applications
    * High-volume metrics/logs
    * Frequently updated data
    * No audit requirements
    * Small teams (complexity overhead)
  </Card>
</CardGroup>

***

## Event Store Implementation

An event store is fundamentally simpler than a relational database: it's an append-only log keyed by stream identifier, where each entry has a monotonically increasing version number. The reason we build this on PostgreSQL (instead of a dedicated product like EventStoreDB) is pragmatic -- most teams already operate PostgreSQL, its transactional guarantees are battle-tested, and JSONB gives us schema flexibility for event payloads. If you outgrow PostgreSQL's throughput, you swap the implementation behind the event store interface for a specialized store and keep the same semantics. Designing your repository layer around that interface, rather than direct SQL, is what makes the migration feasible years later.

The non-negotiable property an event store must provide is **optimistic concurrency control** via the `(stream_id, version)` uniqueness constraint. Without it, two concurrent requests that load the same aggregate at version `N` and each add an event could both append at version `N+1`, leaving the stream with two conflicting events and no defined order between them. With a plain append and no version check, you'd get lost updates under any real load, and the bugs would be nearly impossible to reproduce. With optimistic concurrency, one of the two writes fails with a `ConcurrencyError`, and the caller retries from a fresh load -- which is exactly the semantics you want.
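
The conflict-then-retry flow is easier to see without a database in the way. The sketch below is a hypothetical in-memory stand-in (`InMemoryStream` and `append_with_retry` are illustrative names); the length of a list plays the role of the stream version, and the version check mirrors what the uniqueness constraint enforces in PostgreSQL.

```python
class ConcurrencyError(Exception):
    """The stream moved past the version the writer loaded."""


class InMemoryStream:
    """Hypothetical single-stream store; list length stands in for the version."""

    def __init__(self) -> None:
        self.events: list[dict] = []

    @property
    def version(self) -> int:
        return len(self.events)

    def append(self, event: dict, expected_version: int) -> int:
        # Only one writer may create version expected_version + 1.
        if self.version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, but found {self.version}"
            )
        self.events.append(event)
        return self.version


def append_with_retry(stream, make_event, max_attempts=3):
    """Reload, re-decide, and retry when another writer got there first."""
    for _ in range(max_attempts):
        loaded_version = stream.version      # fresh load
        event = make_event(stream.events)    # business decision on fresh state
        try:
            return stream.append(event, loaded_version)
        except ConcurrencyError:
            continue
    raise ConcurrencyError(f"gave up after {max_attempts} attempts")


stream = InMemoryStream()
stream.append({"type": "AccountOpened"}, expected_version=0)

# Two writers both loaded the aggregate at version 1.
writer_a_version = writer_b_version = stream.version
stream.append({"type": "MoneyDeposited", "amount": 500}, writer_a_version)

try:
    stream.append({"type": "MoneyWithdrawn", "amount": 200}, writer_b_version)
    conflict_detected = False
except ConcurrencyError:
    conflict_detected = True

# Writer B retries from a fresh load and lands at version 3.
final_version = append_with_retry(
    stream, lambda history: {"type": "MoneyWithdrawn", "amount": 200}
)
print(conflict_detected, final_version)  # True 3
```

Note that the retry re-runs the business decision against the fresh history. Blindly re-sending the same event with a bumped version would defeat the point of the check.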

A second implementation detail worth dwelling on: the event store emits events after commit so that projections and downstream consumers can react. You might be tempted to emit before commit for "lower latency," but that's a correctness trap -- if the transaction rolls back, subscribers will have acted on events that never happened. Always emit post-commit, and if your subscribers live in another process, use a transactional outbox pattern (covered in Chapter 17) instead of in-process event emitters.
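
As a rough illustration of the outbox idea, the sketch below uses Python's built-in `sqlite3` in place of PostgreSQL so it runs anywhere. The `outbox` table layout, the `relay` function, and the `simulate_crash` flag are assumptions made for the demo, not the schema from Chapter 17. What it shows: the event row and its outbox row commit or roll back together, and the relay only ever sees committed rows.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        stream_id TEXT NOT NULL,
        version INTEGER NOT NULL,
        event_type TEXT NOT NULL,
        event_data TEXT NOT NULL,
        UNIQUE (stream_id, version)
    );
    CREATE TABLE outbox (
        event_id INTEGER PRIMARY KEY REFERENCES events(id),
        published INTEGER NOT NULL DEFAULT 0
    );
""")


def append(stream_id, version, event_type, data, simulate_crash=False):
    # `with conn` commits on success and rolls back if the block raises,
    # so the event row and its outbox row succeed or fail together.
    with conn:
        cur = conn.execute(
            "INSERT INTO events (stream_id, version, event_type, event_data) "
            "VALUES (?, ?, ?, ?)",
            (stream_id, version, event_type, json.dumps(data)),
        )
        conn.execute("INSERT INTO outbox (event_id) VALUES (?)", (cur.lastrowid,))
        if simulate_crash:  # demo only: fail after both inserts, before commit
            raise RuntimeError("crashed before commit")


def relay(publish):
    """Publish committed-but-unsent events, then mark each one as sent."""
    rows = conn.execute(
        "SELECT e.id, e.event_type, e.event_data FROM outbox o "
        "JOIN events e ON e.id = o.event_id "
        "WHERE o.published = 0 ORDER BY e.id"
    ).fetchall()
    for event_id, event_type, event_data in rows:
        publish(event_type, json.loads(event_data))
        with conn:
            conn.execute(
                "UPDATE outbox SET published = 1 WHERE event_id = ?", (event_id,)
            )


append("BankAccount-123", 1, "AccountOpened", {"balance": 0})
try:
    append("BankAccount-123", 2, "MoneyDeposited", {"amount": 500}, simulate_crash=True)
except RuntimeError:
    pass  # rolled back: neither the event nor its outbox row exists

published = []
relay(lambda event_type, data: published.append(event_type))
print(published)  # ['AccountOpened']
```

Because the relay publishes before it marks a row as sent, a crash between those two steps causes a re-publish on restart. Delivery is at-least-once, so downstream consumers should be idempotent.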

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // event-store.js - Core event sourcing infrastructure

    const { Pool } = require('pg');
    const { EventEmitter } = require('events');

    class EventStore extends EventEmitter {
      constructor(db) {
        super();
        this.db = db;
      }

      async initialize() {
        // Create event store table
        await this.db.query(`
          CREATE TABLE IF NOT EXISTS events (
            id BIGSERIAL PRIMARY KEY,
            stream_id VARCHAR(255) NOT NULL,
            stream_type VARCHAR(100) NOT NULL,
            event_type VARCHAR(100) NOT NULL,
            event_data JSONB NOT NULL,
            metadata JSONB DEFAULT '{}',
            version INTEGER NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            
            UNIQUE(stream_id, version)
          );
          
          CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events(stream_id);
          CREATE INDEX IF NOT EXISTS idx_events_stream_type ON events(stream_type);
          CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);
        `);

        // Snapshots table
        await this.db.query(`
          CREATE TABLE IF NOT EXISTS snapshots (
            id BIGSERIAL PRIMARY KEY,
            stream_id VARCHAR(255) NOT NULL UNIQUE,
            stream_type VARCHAR(100) NOT NULL,
            version INTEGER NOT NULL,
            state JSONB NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
          );
        `);
      }

      // Append events to a stream
      async appendToStream(streamId, streamType, events, expectedVersion = null) {
        const client = await this.db.connect();
        
        try {
          await client.query('BEGIN');
          
          // Optimistic concurrency check -- this is how event sourcing prevents lost updates.
          // If two requests try to modify the same aggregate simultaneously, one will fail
          // because the version number won't match. This is far better than pessimistic locking
          // because it doesn't hold database locks during business logic execution.
          if (expectedVersion !== null) {
            const current = await client.query(
              `SELECT COALESCE(MAX(version), 0) as version 
               FROM events WHERE stream_id = $1`,
              [streamId]
            );
            
            if (current.rows[0].version !== expectedVersion) {
              throw new ConcurrencyError(
                `Expected version ${expectedVersion}, ` +
                `but found ${current.rows[0].version}`
              );
            }
          }

          // Get next version
          const versionResult = await client.query(
            `SELECT COALESCE(MAX(version), 0) + 1 as next_version 
             FROM events WHERE stream_id = $1`,
            [streamId]
          );
          let version = versionResult.rows[0].next_version;

          // Insert events
          const insertedEvents = [];
          for (const event of events) {
            const result = await client.query(
              `INSERT INTO events 
               (stream_id, stream_type, event_type, event_data, metadata, version)
               VALUES ($1, $2, $3, $4, $5, $6)
               RETURNING *`,
              [
                streamId,
                streamType,
                event.type,
                event.data,
                event.metadata || {},
                version++
              ]
            );
            insertedEvents.push(result.rows[0]);
          }

          await client.query('COMMIT');

          // Emit events for subscribers
          for (const event of insertedEvents) {
            this.emit('event', event);
            this.emit(event.event_type, event);
          }

          return insertedEvents;
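        } catch (error) {
          await client.query('ROLLBACK');
          // 23505 = unique_violation: another writer inserted the same
          // (stream_id, version) between our version check and our INSERT.
          if (error.code === '23505') {
            throw new ConcurrencyError(
              `Concurrent append detected on stream ${streamId}`
            );
          }
          throw error;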
        } finally {
          client.release();
        }
      }

      // Read events from a stream
      async readStream(streamId, fromVersion = 0) {
        const result = await this.db.query(
          `SELECT * FROM events 
           WHERE stream_id = $1 AND version >= $2 
           ORDER BY version ASC`,
          [streamId, fromVersion]
        );
        return result.rows;
      }

      // Read all events (for projections)
      async readAllEvents(fromPosition = 0, batchSize = 1000) {
        const result = await this.db.query(
          `SELECT * FROM events 
           WHERE id > $1 
           ORDER BY id ASC 
           LIMIT $2`,
          [fromPosition, batchSize]
        );
        return result.rows;
      }

      // Save snapshot
      async saveSnapshot(streamId, streamType, version, state) {
        await this.db.query(
          `INSERT INTO snapshots (stream_id, stream_type, version, state)
           VALUES ($1, $2, $3, $4)
           ON CONFLICT (stream_id) 
           DO UPDATE SET version = $3, state = $4, created_at = NOW()`,
          [streamId, streamType, version, state]
        );
      }

      // Load snapshot
      async loadSnapshot(streamId) {
        const result = await this.db.query(
          `SELECT * FROM snapshots WHERE stream_id = $1`,
          [streamId]
        );
        return result.rows[0] || null;
      }
    }

    class ConcurrencyError extends Error {
      constructor(message) {
        super(message);
        this.name = 'ConcurrencyError';
      }
    }

    module.exports = { EventStore, ConcurrencyError };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # event_store.py - Core event sourcing infrastructure
    # Built on SQLAlchemy 2.0 async engine + asyncpg driver.
    # We use a dataclass-based event envelope and Pydantic for payload
    # validation at the edges; the store itself only cares about JSON.

    from __future__ import annotations

    import asyncio
    import json
    from dataclasses import dataclass, field
    from datetime import datetime
    from typing import Any, AsyncIterator

    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncEngine


    class ConcurrencyError(Exception):
        """Raised when an optimistic concurrency check fails on append."""


    @dataclass
    class StoredEvent:
        """What the store returns -- the persisted envelope."""
        id: int
        stream_id: str
        stream_type: str
        event_type: str
        event_data: dict[str, Any]
        metadata: dict[str, Any]
        version: int
        created_at: datetime


    @dataclass
    class NewEvent:
        """What callers pass in -- not yet assigned an id/version."""
        type: str
        data: dict[str, Any]
        metadata: dict[str, Any] = field(default_factory=dict)


    class EventStore:
        """Append-only event log backed by PostgreSQL.

        The store exposes coarse subscription via an asyncio.Queue fan-out so
        in-process projections can tail the stream. For cross-process consumers
        prefer a transactional outbox + Kafka (see Chapter 17).
        """

        def __init__(self, engine: AsyncEngine) -> None:
            self._engine = engine
            self._subscribers: list[asyncio.Queue[StoredEvent]] = []

        async def initialize(self) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS events (
                        id BIGSERIAL PRIMARY KEY,
                        stream_id VARCHAR(255) NOT NULL,
                        stream_type VARCHAR(100) NOT NULL,
                        event_type VARCHAR(100) NOT NULL,
                        event_data JSONB NOT NULL,
                        metadata JSONB DEFAULT '{}',
                        version INTEGER NOT NULL,
                        created_at TIMESTAMPTZ DEFAULT NOW(),
                        UNIQUE(stream_id, version)
                    );
                    CREATE INDEX IF NOT EXISTS idx_events_stream_id
                        ON events(stream_id);
                    CREATE INDEX IF NOT EXISTS idx_events_stream_type
                        ON events(stream_type);
                    CREATE INDEX IF NOT EXISTS idx_events_created_at
                        ON events(created_at);
                """))
                await conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS snapshots (
                        id BIGSERIAL PRIMARY KEY,
                        stream_id VARCHAR(255) NOT NULL UNIQUE,
                        stream_type VARCHAR(100) NOT NULL,
                        version INTEGER NOT NULL,
                        state JSONB NOT NULL,
                        created_at TIMESTAMPTZ DEFAULT NOW()
                    );
                """))

        async def append_to_stream(
            self,
            stream_id: str,
            stream_type: str,
            events: list[NewEvent],
            expected_version: int | None = None,
        ) -> list[StoredEvent]:
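            """Atomically append events with optimistic concurrency.

            expected_version=None means 'don't check' (use sparingly -- only for
            idempotent writers). For all aggregate writes, pass the version the
            aggregate was loaded at so simultaneous writers get a ConcurrencyError.

            If two writers pass the version check at the same moment, the
            UNIQUE(stream_id, version) constraint rejects the slower INSERT and
            SQLAlchemy raises IntegrityError; callers should treat that as a
            concurrency conflict too.
            """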
            async with self._engine.begin() as conn:
                if expected_version is not None:
                    result = await conn.execute(
                        text("""
                            SELECT COALESCE(MAX(version), 0) AS version
                            FROM events
                            WHERE stream_id = :sid
                        """),
                        {"sid": stream_id},
                    )
                    current_version = result.scalar_one()
                    if current_version != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, "
                            f"but found {current_version}"
                        )

                next_version_row = await conn.execute(
                    text("""
                        SELECT COALESCE(MAX(version), 0) + 1 AS next_version
                        FROM events
                        WHERE stream_id = :sid
                    """),
                    {"sid": stream_id},
                )
                version = next_version_row.scalar_one()

                inserted: list[StoredEvent] = []
                for event in events:
                    row = await conn.execute(
                        text("""
                            INSERT INTO events
                                (stream_id, stream_type, event_type,
                                 event_data, metadata, version)
                            VALUES (:sid, :stype, :etype,
                                    CAST(:edata AS JSONB),
                                    CAST(:meta AS JSONB), :ver)
                            RETURNING id, stream_id, stream_type, event_type,
                                      event_data, metadata, version, created_at
                        """),
                        {
                            "sid": stream_id,
                            "stype": stream_type,
                            "etype": event.type,
                            "edata": json.dumps(event.data),
                            "meta": json.dumps(event.metadata),
                            "ver": version,
                        },
                    )
                    r = row.one()
                    inserted.append(StoredEvent(
                        id=r.id,
                        stream_id=r.stream_id,
                        stream_type=r.stream_type,
                        event_type=r.event_type,
                        event_data=r.event_data,
                        metadata=r.metadata,
                        version=r.version,
                        created_at=r.created_at,
                    ))
                    version += 1

            # Emit AFTER commit so subscribers never see events that rolled back.
            for stored in inserted:
                for queue in self._subscribers:
                    queue.put_nowait(stored)
            return inserted

        async def read_stream(
            self, stream_id: str, from_version: int = 0
        ) -> list[StoredEvent]:
            async with self._engine.connect() as conn:
                result = await conn.execute(
                    text("""
                        SELECT id, stream_id, stream_type, event_type,
                               event_data, metadata, version, created_at
                        FROM events
                        WHERE stream_id = :sid AND version >= :v
                        ORDER BY version ASC
                    """),
                    {"sid": stream_id, "v": from_version},
                )
                return [StoredEvent(**dict(row._mapping)) for row in result]

        async def read_all_events(
            self, from_position: int = 0, batch_size: int = 1000
        ) -> list[StoredEvent]:
            async with self._engine.connect() as conn:
                result = await conn.execute(
                    text("""
                        SELECT id, stream_id, stream_type, event_type,
                               event_data, metadata, version, created_at
                        FROM events
                        WHERE id > :pos
                        ORDER BY id ASC
                        LIMIT :lim
                    """),
                    {"pos": from_position, "lim": batch_size},
                )
                return [StoredEvent(**dict(row._mapping)) for row in result]

        async def save_snapshot(
            self, stream_id: str, stream_type: str,
            version: int, state: dict[str, Any],
        ) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(
                    text("""
                        INSERT INTO snapshots (stream_id, stream_type, version, state)
                        VALUES (:sid, :stype, :ver, CAST(:state AS JSONB))
                        ON CONFLICT (stream_id)
                        DO UPDATE SET version = :ver,
                                      state = CAST(:state AS JSONB),
                                      created_at = NOW()
                    """),
                    {
                        "sid": stream_id,
                        "stype": stream_type,
                        "ver": version,
                        "state": json.dumps(state),
                    },
                )

        async def load_snapshot(self, stream_id: str) -> dict[str, Any] | None:
            async with self._engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT version, state FROM snapshots WHERE stream_id = :sid"),
                    {"sid": stream_id},
                )
                row = result.first()
                if row is None:
                    return None
                return {"version": row.version, "state": row.state}

        async def subscribe(self) -> AsyncIterator[StoredEvent]:
            """In-process subscription -- one queue per subscriber."""
            queue: asyncio.Queue[StoredEvent] = asyncio.Queue(maxsize=10_000)
            self._subscribers.append(queue)
            try:
                while True:
                    yield await queue.get()
            finally:
                self._subscribers.remove(queue)
    ```
  </Tab>
</Tabs>
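
The `readAllEvents` / `read_all_events` method is what a projection uses to tail the log. Here is a minimal sketch of that catch-up loop, with an in-memory list standing in for the `events` table. `LOG`, `BalanceProjection`, and the tiny batch size are assumptions for illustration only.

```python
from typing import Any

# Hypothetical stand-in for the `events` table, ordered by the global `id`.
LOG: list[dict[str, Any]] = [
    {"id": 1, "stream_id": "acct-1", "event_type": "AccountOpened", "event_data": {}},
    {"id": 2, "stream_id": "acct-1", "event_type": "MoneyDeposited", "event_data": {"amount": 500}},
    {"id": 3, "stream_id": "acct-2", "event_type": "MoneyDeposited", "event_data": {"amount": 70}},
    {"id": 4, "stream_id": "acct-1", "event_type": "MoneyWithdrawn", "event_data": {"amount": 200}},
]


def read_all_events(from_position: int = 0, batch_size: int = 1000):
    """Same contract as the store method: rows with id > from_position."""
    return [e for e in LOG if e["id"] > from_position][:batch_size]


class BalanceProjection:
    def __init__(self) -> None:
        self.balances: dict[str, int] = {}
        self.checkpoint = 0  # id of the last event applied

    def handle(self, event: dict[str, Any]) -> None:
        amount = event["event_data"].get("amount", 0)
        if event["event_type"] == "MoneyDeposited":
            delta = amount
        elif event["event_type"] == "MoneyWithdrawn":
            delta = -amount
        else:
            delta = 0
        stream_id = event["stream_id"]
        self.balances[stream_id] = self.balances.get(stream_id, 0) + delta
        self.checkpoint = event["id"]

    def catch_up(self, batch_size: int = 2) -> int:
        """Drain the log in batches; returns how many events were applied."""
        applied = 0
        while True:
            batch = read_all_events(self.checkpoint, batch_size)
            if not batch:
                return applied
            for event in batch:
                self.handle(event)
                applied += 1


projection = BalanceProjection()
first_run = projection.catch_up()

LOG.append({"id": 5, "stream_id": "acct-2", "event_type": "MoneyDeposited",
            "event_data": {"amount": 30}})
second_run = projection.catch_up()
print(first_run, second_run, projection.balances)
```

In a persistent projection, the checkpoint and the read-model rows would normally be written in the same transaction, so a crash cannot leave them disagreeing about which events have been applied.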

<Warning>
  **Caveats and Common Pitfalls for Event Stores**

  1. **The event store becomes unbounded because snapshotting was never implemented.** Everyone agrees snapshots are important, then ships the MVP without them, then five years later the events table has 2 billion rows and a single aggregate takes 40 seconds to rehydrate. The insidious part: the system feels fast for the first 18 months when most aggregates have under 100 events, so nobody prioritizes snapshot work -- until a hot aggregate (a long-lived corporate account, a popular product) starts timing out under production load.
  2. **Emitting events before the transaction commits.** Tempting for "lower latency," but if the transaction rolls back, subscribers have already acted on events that never happened. You will discover this the first time a projection shows data your event store does not have.
  3. **Skipping the `(stream_id, version)` uniqueness constraint** because "we never have concurrent writes." You do -- retries, duplicate HTTP requests, and multi-instance deployments all create concurrent write attempts. Without the constraint, two writers both append at version N+1 and one silently loses.
  4. **Storing mutable data in events** (references to current config, user-agent strings that get "normalized" later). Events are supposed to be immutable facts; if they reference values that change, replay produces different results on different days. Bake the values into the event payload at write time.
</Warning>
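
To show what the first pitfall is guarding against, here is a small in-memory sketch of snapshot-assisted rehydration: load the latest snapshot, then replay only the events after it. `InMemoryStore`, `load`, and the `SNAPSHOT_EVERY` value are hypothetical; the `read_stream(from_version)` contract matches the store shown earlier.

```python
SNAPSHOT_EVERY = 3  # assumption: a config value in a real service, tiny for the demo


class InMemoryStore:
    """Hypothetical single-stream stand-in for the events and snapshots tables."""

    def __init__(self):
        self.events = []
        self.snapshot = None  # {"version": int, "state": dict} once one exists

    def append(self, event_type, amount):
        version = len(self.events) + 1
        self.events.append({"type": event_type, "amount": amount, "version": version})
        return version

    def read_stream(self, from_version=0):
        return [e for e in self.events if e["version"] >= from_version]


def apply(state, event):
    """Pure transition: returns a new state, never mutates the old one."""
    sign = 1 if event["type"] == "MoneyDeposited" else -1
    return {"balance": state["balance"] + sign * event["amount"],
            "version": event["version"]}


def load(store):
    """Start from the snapshot if there is one, then replay only newer events."""
    if store.snapshot is not None:
        state = dict(store.snapshot["state"])
        from_version = store.snapshot["version"] + 1
    else:
        state = {"balance": 0, "version": 0}
        from_version = 0
    tail = store.read_stream(from_version)
    for event in tail:
        state = apply(state, event)
    return state, len(tail)


def append_and_maybe_snapshot(store, event_type, amount):
    version = store.append(event_type, amount)
    if version % SNAPSHOT_EVERY == 0:
        state, _ = load(store)
        store.snapshot = {"version": version, "state": state}
    return version


store = InMemoryStore()
for _ in range(7):
    append_and_maybe_snapshot(store, "MoneyDeposited", 100)

state, replayed = load(store)
print(state["balance"], replayed)  # 700 1
```

With seven events and a snapshot at version 6, rehydration replays one event instead of seven. The same ratio is what keeps a long-lived aggregate from timing out.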

<Tip>
  **Solutions and Patterns**

  * **Snapshot from day one, even if frequency is wrong.** A snapshot every 1,000 events that you tune down later is infinitely better than adding snapshot support to a system with 500M events already in the store. Make the frequency a config flag, not a constant.
  * **Use a transactional outbox for cross-process subscribers.** In-process EventEmitter is fine for projections colocated with the store. For anything else (Kafka publishers, external webhooks), write the event and the outbox row in the same transaction, then have a relay process tail the outbox. Chapter 17 covers this pattern in depth.
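  * **Read projections by a global position, not by timestamp.** The `id BIGSERIAL` column in the schema above already serves as a log position across all streams (add a dedicated `global_position` or `log_position` column if you prefer an explicit name). Timestamps can collide or go backwards under clock skew; sequence values are always unique. One caveat: PostgreSQL assigns sequence values at insert time, not commit time, so concurrent transactions can become visible out of `id` order, and a projection polling `id > checkpoint` can skip a late-committing event unless appends are serialized or the reader tolerates gaps.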
  * **Version every event payload from day one.** Add `schema_version: 1` in metadata on the very first event. Adding versioning later means the upcaster has to guess which old events are "v1 implied" versus "v1 explicit," and that guess is always wrong somewhere.
</Tip>
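
One way to use that `schema_version` field is an upcaster chain applied on read. The sketch below is illustrative only: the `CustomerRegistered` event, the two schema changes, and the `UPCASTERS` registry are made up for the example. It assumes every stored event carries an explicit `schema_version` in its metadata, as the tip recommends.

```python
def v1_to_v2(event):
    # Hypothetical change: v2 split `name` into first_name / last_name.
    data = dict(event["event_data"])
    first, _, last = data.pop("name", "").partition(" ")
    data["first_name"], data["last_name"] = first, last
    return {**event, "event_data": data,
            "metadata": {**event["metadata"], "schema_version": 2}}


def v2_to_v3(event):
    # Hypothetical change: v3 added `currency`, defaulting old events to USD.
    data = dict(event["event_data"])
    data.setdefault("currency", "USD")
    return {**event, "event_data": data,
            "metadata": {**event["metadata"], "schema_version": 3}}


UPCASTERS = {1: v1_to_v2, 2: v2_to_v3}
CURRENT_VERSION = 3


def upcast(event):
    """Run upcasters until the event has the current shape.

    Each step builds a new dict, so the stored event is never modified.
    """
    version = event["metadata"]["schema_version"]
    while version < CURRENT_VERSION:
        event = UPCASTERS[version](event)
        version = event["metadata"]["schema_version"]
    return event


stored_v1 = {
    "event_type": "CustomerRegistered",
    "event_data": {"name": "Ada Lovelace"},
    "metadata": {"schema_version": 1},
}

upgraded = upcast(stored_v1)
print(upgraded["event_data"])
```

Projection and aggregate code then only ever handles the current shape, and running `upcast` on an already-current event is a no-op.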

<AccordionGroup>
  <Accordion title="Scenario: Your event store table has 2 billion events and rebuilding a projection takes 6 hours during a full deploy. How do you reduce the rebuild time without dropping data?">
    **Strong Answer Framework**

    1. **Measure first.** Profile one rebuild to find the actual bottleneck: is it event read throughput (IO-bound), projection logic CPU (compute-bound), or write amplification on the projection side (index contention)? Guessing wastes weeks.
    2. **Parallelize by stream partition, not by row.** Events in one stream must apply in order; events across streams can process concurrently. Shard the rebuild by `hash(stream_id) % N_workers` so each worker owns a disjoint set of streams and order within a stream is preserved.
    3. **Add a checkpointed resume.** Long rebuilds that fail at hour 5 and restart from zero are the worst failure mode. Persist the last processed `global_position` per partition every few thousand events; on restart, resume from there.
    4. **Pre-aggregate with snapshots.** If the projection depends mostly on recent state (current balance) rather than full history (every transaction), build a periodic snapshot projection that other projections can start from. This is a "derived event stream" -- snapshot events every million raw events.
    5. **Use bulk inserts and disable indexes during replay.** Drop non-primary indexes on the projection table before rebuild, bulk-copy events in batches of 10k-50k, then recreate indexes at the end. This alone often takes a rebuild from 6 hours to 40 minutes on PostgreSQL.
    6. **Split hot from cold.** If the projection only needs recent data (last 90 days), introduce a "cold" archive of older events and a warm partition the rebuild actually scans. Store older events compressed in object storage with a pointer kept in the primary table.
    7. **Blue-green the projection.** Build the new projection into a new table while the old one serves traffic, then atomically swap names. This means you never stop serving reads during rebuild.

    **Real-World Example**

    In 2018, EventStoreDB's Alexey Zimarev documented a case at a European fintech where a balances projection over \~1.8 billion events rebuilt in 7 hours. Partitioning the rebuild across 16 workers keyed on account-id hash, disabling non-primary indexes during load, and swapping to blue/green projection tables brought it down to 34 minutes. The team also introduced an "aggregated snapshot stream" emitted nightly, so ad-hoc projections could initialize from the snapshot instead of replaying from zero.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: What if two projections disagree on a value during the rebuild -- for example, the old projection shows balance $500 and the new one shows $480?**

      **Strong Answer:** That is a signal the projection logic changed, or an event was historically replayed incorrectly. Do not switch traffic until you have reconciled. Run the rebuild in shadow mode: write to the new table but keep reading from the old one, then run a diff job that compares every row and emits mismatches. Categorize mismatches: if all mismatches are explained by the intentional logic change (say, a new rounding rule), ship it with a changelog note. If mismatches are unexplained, the old projection has silent bugs and you need to investigate before swapping.
    </Note>

    <Note>
      **Follow-up 2: How do you handle projection rebuilds for a projection that feeds an external system (say, a Kafka topic)?**

      **Strong Answer:** Never re-emit to the external system during rebuild. External consumers will see duplicates or reprocess events that already triggered side effects (emails, charges). Split the projection into two concerns: a deterministic local table (safe to rebuild) and an external-emission ledger that records "we already sent event X downstream." Rebuilds repopulate the local table only; the emission ledger is append-only and never replayed. If the external system genuinely needs a replay, coordinate it as an explicit "full resync" operation with downstream consent.
    </Note>

    <Note>
      **Follow-up 3: What if the event schema evolved during the 6-hour rebuild window and new events are being written in v4 while the rebuild is processing v1 events?**

      **Strong Answer:** This is exactly why upcasting on read exists. The rebuild pipeline runs the upcaster chain on every event it reads, so v1 events become v4-shaped before they hit the projection logic. The projection code only knows v4. Verify the upcaster is idempotent (running it twice produces the same result) and unit-test it against sample v1, v2, v3 events before the rebuild starts. If the upcaster is buggy, you will rebuild into a broken projection that looks plausible, which is the worst outcome.
    </Note>
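
    A minimal upcaster chain might look like the sketch below. The `schema_version` field and the three schema changes (default currency, `description` renamed to `memo`, amounts stored in minor units) are invented for illustration and are not the schema history of any real system.

    ```python
    from decimal import Decimal

    LATEST_VERSION = 4


    def v1_to_v2(data: dict) -> dict:
        # Hypothetical change: v2 added an explicit currency.
        return {**data, "currency": "USD"}


    def v2_to_v3(data: dict) -> dict:
        # Hypothetical change: v3 renamed `description` to `memo`.
        data = dict(data)
        data["memo"] = data.pop("description", "")
        return data


    def v3_to_v4(data: dict) -> dict:
        # Hypothetical change: v4 stores integer minor units instead of a float.
        data = dict(data)
        data["amount_minor"] = int(Decimal(str(data.pop("amount"))) * 100)
        return data


    UPCASTERS = {1: v1_to_v2, 2: v2_to_v3, 3: v3_to_v4}


    def upcast(event: dict) -> dict:
        """Run every upcaster from the event's version up to LATEST_VERSION."""
        version = event.get("schema_version", 1)
        data = dict(event["data"])  # never mutate the stored event
        while version < LATEST_VERSION:
            data = UPCASTERS[version](data)
            version += 1
        return {**event, "schema_version": version, "data": data}


    stored_v1 = {
        "type": "MoneyDeposited",
        "schema_version": 1,
        "data": {"amount": 100.5, "description": "salary"},
    }
    as_v4 = upcast(stored_v1)
    ```

    Because `upcast` is driven by the version number, running it on an event that is already v4 is a no-op, which is the idempotency property worth asserting in a unit test before the rebuild starts.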

    **Common Wrong Answers**

    * *"Just buy a bigger database."* Scaling up hides the problem for 6 months until you hit the next wall. The issue is architectural (no partitioning, no snapshots, indexed writes during bulk load), not hardware.
    * *"Stop the world and rebuild overnight."* A 6-hour maintenance window is unacceptable for anything customer-facing, and the rebuild will eventually grow beyond the window. Blue-green swap solves this without downtime.

    **Further Reading**

    * Greg Young, "Building an Event Storage" -- the original rationale for stream-partitioned event stores.
    * Martin Kleppmann, *Designing Data-Intensive Applications*, Chapter 11 (Stream Processing) -- covers projection rebuild strategies generically.
    * Apache Kafka documentation on "log compaction" -- the same idea applied to message logs.
  </Accordion>

  <Accordion title="Scenario: A user reports their account balance is wrong by $47.23. You have 8 years of events for this account. Walk through how you'd debug it in an event-sourced system.">
    **Strong Answer Framework**

    1. **Start with the projection, not the events.** Query the balance projection for this account to see what the system currently shows. Compare with what the user says it should be. Confirm the delta is \$47.23 and note whether it's positive (system says too much) or negative (system says too little).
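    2. **Sum the raw events as a sanity check.** Run something like `SELECT SUM(CASE event_type WHEN 'MoneyDeposited' THEN (event_data->>'amount')::numeric WHEN 'MoneyWithdrawn' THEN -(event_data->>'amount')::numeric ELSE 0 END) FROM events WHERE stream_id = 'BankAccount-X'` (column names are assumed here; adapt them to your schema). The explicit `ELSE 0` keeps other event types from being counted as withdrawals. If this matches the projection, the bug is in how events were recorded (the write path). If it differs from the projection, the bug is in the projection logic or a missed event.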
    3. **Scan for suspicious events.** Look for events with unusually large amounts, duplicate timestamps, or events just before/after a known system incident. Temporal correlation is often the smoking gun -- "this balance went wrong at 02:14 AM on March 3rd, what was happening then?"
    4. **Replay the aggregate to the suspected bad point.** Load events in order up to just before the suspected event and inspect the computed state. Then apply one more event at a time. This is where event sourcing shines: you can reconstruct state at any point in history without log grepping.
    5. **Check for compensating events that never fired.** If there was a failed external transaction that should have been compensated (refund, reversal) but the compensating event was never emitted, the balance drifts. Common root cause when the issue involves integrations with payment processors.
    6. **Check projection checkpoint and duplicate application.** If the projection crashed mid-batch and the checkpoint save was not transactional with the row update, some events may have been applied twice. Query for telltale signs (repeated exact amounts within the same second).
    7. **Fix with a compensating event, not a direct write.** Never `UPDATE account_balances SET balance = correct_value`. Append a `BalanceAdjusted` event with reason and operator, rebuild the projection, verify. The audit trail stays intact.
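
    Steps 4 and 6 can be sketched as a small replay helper. This is an illustration only, assuming events are dictionaries with `type`, `amount` (as a decimal string), and `timestamp` keys; the function names are hypothetical.

    ```python
    from decimal import Decimal

    SIGNS = {"MoneyDeposited": 1, "MoneyWithdrawn": -1}


    def replay_with_trace(events):
        """Fold events in order, recording the balance after each one."""
        balance = Decimal("0")
        trace = []
        for version, event in enumerate(events, start=1):
            sign = SIGNS.get(event["type"], 0)  # non-monetary events change nothing
            balance += sign * Decimal(event.get("amount", "0"))
            trace.append((version, event["type"], balance))
        return trace


    def suspected_duplicates(events):
        """Pairs of versions sharing type, amount, and timestamp (step 6)."""
        first_seen = {}
        pairs = []
        for version, event in enumerate(events, start=1):
            key = (event["type"], event.get("amount"), event.get("timestamp"))
            if key in first_seen:
                pairs.append((first_seen[key], version))
            else:
                first_seen[key] = version
        return pairs


    events = [
        {"type": "AccountOpened", "timestamp": "2024-03-01T09:00:00+00:00"},
        {"type": "MoneyDeposited", "amount": "100.00", "timestamp": "2024-03-02T10:00:00+00:00"},
        {"type": "MoneyWithdrawn", "amount": "47.23", "timestamp": "2024-03-03T02:14:00+00:00"},
        {"type": "MoneyWithdrawn", "amount": "47.23", "timestamp": "2024-03-03T02:14:00+00:00"},
    ]
    trace = replay_with_trace(events)
    duplicates = suspected_duplicates(events)
    ```

    Reading `trace` row by row shows the exact version at which the balance stops matching the user's statement, and `duplicates` flags the pair of withdrawals worth inspecting first.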

    **Real-World Example**

    In 2020, a well-known US-regional bank (publicly discussed at QCon San Francisco by an engineering manager from their core banking team) had a balance discrepancy case where \$23,000 was missing from a corporate account. The team used event replay to reconstruct the state at 15-minute intervals over the disputed 3-day window. They located a `WithdrawalRequested` event for which no corresponding `WithdrawalSettled` or `WithdrawalCancelled` ever arrived due to a message bus partition during a network incident. The fix was a `WithdrawalCancelled` compensating event plus a monitor for unresolved pending transactions older than 24 hours. The whole investigation took under two hours; in a CRUD system, the equivalent forensic work would have required database backups and audit-log correlation across services.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: What if the events table itself has been tampered with -- say, a rogue operator ran an UPDATE?**

      **Strong Answer:** This is why production event stores should be append-only at the permission level (`GRANT INSERT, SELECT ON events` with no UPDATE/DELETE for application roles) and should include a hash chain: every event's metadata contains `hash(prev_event_hash + this_event_payload)`. A scan that recomputes hashes and compares them to stored ones reveals tampering. Some teams go further and anchor the hash chain to an external signed timestamp service so tampering is legally detectable. For the case where tampering is discovered, you replay from a known-good backup and coordinate compensating events for any real activity that occurred after the tampering.
    </Note>
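
    The hash chain described above can be sketched with the standard library. The genesis value, the canonical JSON encoding, and the function names are assumptions for this example; a production store would also hash metadata and persist the chain alongside each row.

    ```python
    import hashlib
    import json

    GENESIS = "0" * 64  # assumed starting value for the first event


    def chain_hash(prev_hash: str, payload: dict) -> str:
        """hash(prev_event_hash + this_event_payload) with a canonical encoding."""
        body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


    def append(log: list, payload: dict) -> None:
        prev_hash = log[-1]["hash"] if log else GENESIS
        log.append({"payload": payload, "hash": chain_hash(prev_hash, payload)})


    def first_tampered_index(log: list):
        """Recompute every hash; return the first index that fails, else None."""
        prev_hash = GENESIS
        for index, entry in enumerate(log):
            if chain_hash(prev_hash, entry["payload"]) != entry["hash"]:
                return index
            prev_hash = entry["hash"]
        return None


    log = []
    append(log, {"type": "MoneyDeposited", "amount": "100.00"})
    append(log, {"type": "MoneyWithdrawn", "amount": "20.00"})
    append(log, {"type": "MoneyDeposited", "amount": "5.00"})
    clean_result = first_tampered_index(log)

    log[1]["payload"]["amount"] = "999.00"  # simulate a rogue UPDATE
    tampered_result = first_tampered_index(log)
    ```

    An attacker who also rewrites the stored hash of the edited row only moves the failure to the next row, which is why anchoring the latest hash somewhere they cannot write is the stronger guarantee.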

    <Note>
      **Follow-up 2: The user insists the balance was wrong 6 months ago too, but your projection was rebuilt 2 months ago. How do you answer "what was the balance on October 15th at 3 PM?"**

      **Strong Answer:** This is event sourcing's superpower -- temporal queries. Replay the aggregate's events filtered by `created_at <= '2024-10-15 15:00:00'` and compute the balance. That is the definitive answer, independent of whatever the projection said at the time. If the projection was wrong historically, you can now prove the user right (or wrong) from the canonical event log. In a CRUD system this question is unanswerable without a separate audit table that may itself have drifted.
    </Note>
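
    As a rough sketch, the temporal query is a filtered fold over the stream. The event shape (`type`, `amount`, `created_at` as an ISO 8601 string with offset) and the name `balance_as_of` are assumptions for illustration.

    ```python
    from datetime import datetime, timezone
    from decimal import Decimal

    SIGNS = {"MoneyDeposited": 1, "MoneyWithdrawn": -1}


    def balance_as_of(events, as_of: datetime) -> Decimal:
        """Replay only the events recorded at or before `as_of`."""
        balance = Decimal("0")
        for event in events:
            if datetime.fromisoformat(event["created_at"]) > as_of:
                continue  # happened after the moment we are asking about
            sign = SIGNS.get(event["type"], 0)
            balance += sign * Decimal(event.get("amount", "0"))
        return balance


    events = [
        {"type": "MoneyDeposited", "amount": "500.00", "created_at": "2024-09-01T10:00:00+00:00"},
        {"type": "MoneyWithdrawn", "amount": "20.00", "created_at": "2024-10-15T14:59:00+00:00"},
        {"type": "MoneyDeposited", "amount": "100.00", "created_at": "2024-10-15T15:01:00+00:00"},
    ]
    october_15_3pm = datetime(2024, 10, 15, 15, 0, tzinfo=timezone.utc)
    historical_balance = balance_as_of(events, october_15_3pm)
    ```

    The deposit at 15:01 is excluded, so the answer reflects only what had been recorded by 3 PM, regardless of when the projection was last rebuilt.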

    <Note>
      **Follow-up 3: Event sourcing debugging works for this bank account, but our ops team keeps saying "eventual consistency is confusing product managers -- they deposit money and it doesn't show up immediately." How do you reconcile the benefits for engineers with the friction for PMs?**

      **Strong Answer:** Three mitigations. First, make the write-side response authoritative: when a user deposits \$100, return the updated balance from the aggregate's in-memory state (which is strongly consistent by definition), not from the projection. The UI shows the correct balance immediately because it came from the command response, not a follow-up query. Second, for internal tools and dashboards, expose the projection staleness explicitly -- show "last updated 350ms ago" next to the balance so PMs see the lag is measured in hundreds of milliseconds, not hours. Third, educate the product org once: explain that "eventual" in practice is sub-second for healthy systems, and only becomes visible during incidents -- at which point you have bigger problems than stale projections.
    </Note>

    **Common Wrong Answers**

    * *"I'd check the database directly and fix the balance."* This is the CRUD mindset. In event sourcing, the database (projection) is derived -- fixing it without fixing the underlying events means the projection will drift again on the next rebuild. The root cause is always in the event stream.
    * *"I'd add a log statement and wait for it to happen again."* In an event-sourced system, you already have the log -- it's the event store. You don't need to reproduce the bug because you have the full history of what happened.

    **Further Reading**

    * Martin Fowler, "Event Sourcing" -- original articulation of temporal queries as a first-class capability.
    * Greg Young, "CQRS Documents" -- detailed treatment of debugging via event replay.
    * Adam Dymitruk, "Event Modeling" -- workshops teach the exact debugging workflow above.
  </Accordion>

  <Accordion title="Scenario: Your team is about to launch an event-sourced trading system and the CTO is nervous about the event store becoming a single point of failure. How do you design for availability?">
    **Strong Answer Framework**

    1. **Separate write availability from read availability.** The event store write path must be strongly consistent (you cannot lose a trade), but read models can be stale or locally replicated. Design them differently: the write side is synchronously replicated (PostgreSQL with `synchronous_commit = on` and at least one standby listed in `synchronous_standby_names`, or a managed multi-AZ setup). The read side runs regionally with async replication of the event stream.
    2. **Use a proven log-structured store if scale demands it.** For anything past 50k events/sec sustained, move the event log to Kafka (with `acks=all` and `min.insync.replicas=2`) and keep PostgreSQL only for aggregate-level streams. Kafka's replication protocol is designed for exactly this.
    3. **Snapshot aggressively so you can survive store unavailability for reads.** If every aggregate has a snapshot within the last 100 events, you can serve reads from snapshot + in-memory cache if the event store is degraded.
    4. **Adopt a transactional outbox** if you're using PostgreSQL and need downstream propagation. Outbox + relay to Kafka gives you "write durably to Postgres, propagate reliably to anywhere."
    5. **Design aggregates to be idempotent on retry.** Every command should include an idempotency key stored on the event. If a client retries a deposit after a network failure and the command is submitted twice, the second attempt detects the duplicate key and returns success without double-charging.
    6. **Regional failover is driven by the event log, not the projections.** In a DR scenario, the secondary region replays the replicated event log into fresh projections. The projections in the dead region are not part of the recovery -- they are regenerated.
    7. **Write runbooks for projection corruption separately from write-side failure.** The two failure modes require different responses and the team must practice both.

    **Real-World Example**

    The LMAX exchange, an early high-performance event-sourced trading platform, had its design documented in 2011 in Martin Fowler's "The LMAX Architecture" article. Their event log (the "input disruptor") was triple-replicated to disk with strict ordering; reads came from in-memory projections replayed on startup from the log. Failure modes were categorized as "log unavailable" (halt new trades, serve stale reads) and "projection corrupt" (rebuild from log, trades continue). This separation is part of why LMAX's business logic processor could handle 6 million orders per second on a single thread -- the write path's rigor was not compromised by read-side convenience.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: How do you handle the case where an event write succeeds but the response to the client is lost (network blip)?**

      **Strong Answer:** Idempotency keys. Every incoming command carries a client-generated UUID. The first time the server sees the UUID, it writes the event with that UUID embedded in metadata. If the client retries with the same UUID, the server queries the event store for any event with that UUID and, if found, returns the same response as before without re-applying. This is the standard pattern for "exactly-once semantics" at the API boundary.
    </Note>
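
    The lookup-before-write flow can be sketched like this. `InMemoryEventStore` is a hypothetical stand-in for a real event store, and `find_by_idempotency_key` is an assumed method, not an API of any particular product.

    ```python
    import uuid


    class InMemoryEventStore:
        """Hypothetical stand-in; a real store would back this with a unique index."""

        def __init__(self):
            self.events = []
            self._by_key = {}

        def find_by_idempotency_key(self, key):
            return self._by_key.get(key)

        def append(self, event):
            self.events.append(event)
            self._by_key[event["metadata"]["idempotency_key"]] = event


    def handle_deposit(store, account_id, amount, idempotency_key):
        existing = store.find_by_idempotency_key(idempotency_key)
        if existing is not None:
            # Retry of a command we already applied: same response, no new event.
            return {"status": "ok", "event_id": existing["id"]}
        event = {
            "id": str(uuid.uuid4()),
            "type": "MoneyDeposited",
            "data": {"account_id": account_id, "amount": amount},
            "metadata": {"idempotency_key": idempotency_key},
        }
        store.append(event)
        return {"status": "ok", "event_id": event["id"]}


    store = InMemoryEventStore()
    key = str(uuid.uuid4())  # generated once by the client, reused on retry
    first_response = handle_deposit(store, "acc-123", 100, key)
    retry_response = handle_deposit(store, "acc-123", 100, key)
    ```

    The check-then-append shown here is not atomic; with concurrent retries you would rely on a uniqueness constraint on the key so the second insert fails and falls back to the lookup.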

    <Note>
      **Follow-up 2: You described synchronous replication for durability. What's the latency cost and when would you relax it?**

      **Strong Answer:** Synchronous replication to a replica in the same AZ adds roughly 1-3ms; across AZs typically 5-15ms; across regions often 50-100ms+. For a trading system, across-AZ sync is mandatory (AZ-level failure is a real risk), cross-region sync is usually too slow and you fall back to async with RPO measured in seconds. The trade-off is explicit: a cross-region async replication means you may lose the last few seconds of trades in a regional disaster. That's an acceptable risk for most systems; if it isn't, you need a true globally-synchronous store like Spanner or CockroachDB and you budget for the latency.
    </Note>

    <Note>
      **Follow-up 3: If projections are regenerable from the log, why should I monitor projection correctness at all?**

      **Strong Answer:** Because you want to know *before* a user notices. A projection can silently diverge from the event log due to a projection handler bug, a schema migration that misapplied defaults, or an accidental direct write from a debugging session. Nightly audits compute a checksum on the projection table and compare against a fresh replay of the log for a small sample of streams. Mismatches trigger alerts. Without this, you eventually get a support ticket for a wrong balance and your only clue is "we don't know when this started going wrong."
    </Note>

    **Common Wrong Answers**

    * *"We'll use an active-active multi-master event store."* Active-active writes to an event log destroy ordering guarantees and create merge conflicts that are unresolvable at the event level. Event stores are single-leader by design. Availability comes from fast failover, not concurrent writes.
    * *"We'll cache aggregates in Redis so we never hit the event store for reads."* Cache coherence with an event-sourced aggregate is subtle. If you rehydrate from cache and miss events that landed after the cache fill, you will handle commands against stale state and hit ConcurrencyErrors on save. Caches must be invalidated by the event stream, which is more work than it's worth for most aggregates.

    **Further Reading**

    * Martin Fowler, "The LMAX Architecture" (2011) -- the canonical write-up on high-availability event-sourced trading.
    * Confluent, "Exactly-Once Semantics in Kafka" -- details the idempotent producer pattern used in step 5 above.
    * Google's Spanner paper (2012) -- for context on the cost of true global consistency when you do need it.
  </Accordion>
</AccordionGroup>

***

## Aggregate Pattern

An aggregate is a cluster of domain objects treated as a single unit for data changes, with one root entity that enforces invariants. In event sourcing, the aggregate is the piece of code that decides "given these past events and this incoming command, what new events should happen?" Commands are intent (`deposit $100`), events are facts (`MoneyDeposited $100`). This distinction matters because commands can be rejected (insufficient funds, account closed) while events, once stored, are immutable truth. You cannot "undo" an event -- you can only append a compensating one.

The pattern of `applyChange` (which both mutates state and records an uncommitted event) versus `_apply` (which only mutates state from historical events) is what lets the same code handle both fresh command processing and rehydration from the event store. When you load an aggregate, you construct an empty instance and fold the stored events through `_apply`. When you execute a command, you call a business method that validates invariants and calls `applyChange` to produce new events. If you separated these into totally different code paths, you'd inevitably have drift -- state computed by the command path would diverge from state computed during replay, and your bugs would be of the "cannot reproduce" variety.

A common beginner mistake: putting validation inside event handlers (`onMoneyWithdrawn`). Don't. Events are facts that already happened; they cannot be rejected during replay. Validation belongs in the command method (`withdraw()`), which decides whether to emit the event in the first place. If you add validation to the handler, replaying old events can throw exceptions mid-load, and you'll discover this the first time business rules tighten (e.g., a new minimum balance rule applied retroactively to historical withdrawals that were legal at the time).
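
To make the replay failure concrete, here is a deliberately tiny sketch, separate from the full aggregate in the tabs that follow. It assumes a hypothetical history in which an overdraft was once legal and the rule has since been tightened to `MIN_BALANCE = 0`; the class and function names are illustrative only.

```python
class Account:
    MIN_BALANCE = 0  # today's rule; older history was allowed to overdraw

    def __init__(self):
        self.balance = 0

    def withdraw(self, amount):
        """Command: may reject, because nothing has happened yet."""
        if self.balance - amount < self.MIN_BALANCE:
            raise ValueError("Insufficient funds")
        event = {"type": "MoneyWithdrawn", "amount": amount}
        self.apply(event)
        return event

    def apply(self, event):
        """Handler: pure state transition, never rejects a stored fact."""
        if event["type"] == "MoneyDeposited":
            self.balance += event["amount"]
        elif event["type"] == "MoneyWithdrawn":
            self.balance -= event["amount"]


class ValidatingHandlerAccount(Account):
    """Anti-pattern: the handler re-checks today's rule during replay."""

    def apply(self, event):
        overdraws = self.balance - event.get("amount", 0) < self.MIN_BALANCE
        if event["type"] == "MoneyWithdrawn" and overdraws:
            raise ValueError("Insufficient funds")
        super().apply(event)


def rehydrate(cls, events):
    account = cls()
    for event in events:
        account.apply(event)
    return account


history = [
    {"type": "MoneyDeposited", "amount": 100},
    {"type": "MoneyWithdrawn", "amount": 130},  # legal under the old overdraft rule
]

good = rehydrate(Account, history)  # loads fine, balance is -30
try:
    rehydrate(ValidatingHandlerAccount, history)
    replay_failed = False
except ValueError:
    replay_failed = True  # the aggregate can no longer be loaded at all
```

The first class still enforces the tightened rule for new commands (`good.withdraw(1)` raises), while remaining loadable from history that predates the rule.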

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // aggregate.js - Event-sourced aggregate root

    class AggregateRoot {
      constructor() {
        this._uncommittedEvents = [];
        this._version = 0;
      }

      get version() {
        return this._version;
      }

      get uncommittedEvents() {
        return this._uncommittedEvents;
      }

      clearUncommittedEvents() {
        this._uncommittedEvents = [];
      }

      // Apply event and track as uncommitted
      applyChange(event) {
        this._apply(event);
        this._uncommittedEvents.push(event);
      }

      // Apply event from history (committed)
      _apply(event) {
        const handler = `on${event.type}`;
        if (typeof this[handler] === 'function') {
          this[handler](event.data);
        }
        this._version++;
      }

      // Load from event history
      loadFromHistory(events) {
        for (const event of events) {
          this._apply({
            type: event.event_type,
            data: event.event_data
          });
        }
      }
    }

    // Bank Account aggregate
    class BankAccount extends AggregateRoot {
      constructor() {
        super();
        this.id = null;
        this.balance = 0;
        this.status = 'pending';
        this.transactions = [];
      }

      // Commands (business logic + emit events)
      static open(accountId, ownerId, initialDeposit = 0) {
        const account = new BankAccount();
        
        account.applyChange({
          type: 'AccountOpened',
          data: {
            accountId,
            ownerId,
            openedAt: new Date().toISOString()
          }
        });

        if (initialDeposit > 0) {
          account.deposit(initialDeposit, 'Initial deposit');
        }

        return account;
      }

      deposit(amount, description = '') {
        if (amount <= 0) {
          throw new Error('Deposit amount must be positive');
        }
        if (this.status !== 'active') {
          throw new Error('Account is not active');
        }

        this.applyChange({
          type: 'MoneyDeposited',
          data: {
            amount,
            description,
            timestamp: new Date().toISOString()
          }
        });
      }

      withdraw(amount, description = '') {
        if (amount <= 0) {
          throw new Error('Withdrawal amount must be positive');
        }
        if (this.status !== 'active') {
          throw new Error('Account is not active');
        }
        if (amount > this.balance) {
          throw new Error('Insufficient funds');
        }

        this.applyChange({
          type: 'MoneyWithdrawn',
          data: {
            amount,
            description,
            timestamp: new Date().toISOString()
          }
        });
      }

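      close(reason = '') {
        // Only an active account can be closed (prevents a second AccountClosed)
        if (this.status !== 'active') {
          throw new Error('Account is not active');
        }
        if (this.balance !== 0) {
          throw new Error('Cannot close account with non-zero balance');
        }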

        this.applyChange({
          type: 'AccountClosed',
          data: {
            reason,
            closedAt: new Date().toISOString()
          }
        });
      }

      // Event handlers (state changes)
      onAccountOpened(data) {
        this.id = data.accountId;
        this.ownerId = data.ownerId;
        this.status = 'active';
        this.openedAt = data.openedAt;
      }

      onMoneyDeposited(data) {
        this.balance += data.amount;
        this.transactions.push({
          type: 'deposit',
          amount: data.amount,
          timestamp: data.timestamp
        });
      }

      onMoneyWithdrawn(data) {
        this.balance -= data.amount;
        this.transactions.push({
          type: 'withdrawal',
          amount: data.amount,
          timestamp: data.timestamp
        });
      }

      onAccountClosed(data) {
        this.status = 'closed';
        this.closedAt = data.closedAt;
      }

      // Snapshot for performance
      toSnapshot() {
        return {
          id: this.id,
          ownerId: this.ownerId,
          balance: this.balance,
          status: this.status,
          openedAt: this.openedAt,
          closedAt: this.closedAt,
          transactionCount: this.transactions.length
        };
      }

      static fromSnapshot(snapshot) {
        const account = new BankAccount();
        Object.assign(account, snapshot);
        account.transactions = []; // Don't store all transactions in snapshot
        return account;
      }
    }

    module.exports = { AggregateRoot, BankAccount };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # aggregate.py - Event-sourced aggregate root
    # We model events as Pydantic BaseModel so payloads are validated on
    # construction and automatically JSON-serializable for the store.

    from __future__ import annotations

    from datetime import datetime, timezone
    from typing import Any, Literal

    from pydantic import BaseModel


    # ----- Event payloads (immutable, validated) -------------------------------

    class DomainEvent(BaseModel):
        """Base class so we get a common `event_type` discriminator."""
        model_config = {"frozen": True}

        @property
        def event_type(self) -> str:
            return self.__class__.__name__


    class AccountOpened(DomainEvent):
        account_id: str
        owner_id: str
        opened_at: str


    class MoneyDeposited(DomainEvent):
        amount: float
        description: str = ""
        timestamp: str


    class MoneyWithdrawn(DomainEvent):
        amount: float
        description: str = ""
        timestamp: str


    class AccountClosed(DomainEvent):
        reason: str = ""
        closed_at: str


    # ----- Aggregate root ------------------------------------------------------

    class AggregateRoot:
        """Minimal aggregate base. Subclasses implement `_apply_*` methods
        named after each event class, e.g. `_apply_MoneyDeposited(event)`.
        """

        def __init__(self) -> None:
            self._uncommitted: list[DomainEvent] = []
            self._version: int = 0

        @property
        def version(self) -> int:
            return self._version

        @property
        def uncommitted_events(self) -> list[DomainEvent]:
            return list(self._uncommitted)

        def clear_uncommitted(self) -> None:
            self._uncommitted.clear()

        def _apply(self, event: DomainEvent) -> None:
            handler_name = f"_apply_{event.event_type}"
            handler = getattr(self, handler_name, None)
            if callable(handler):
                handler(event)
            self._version += 1

        def apply_change(self, event: DomainEvent) -> None:
            """Called from commands -- mutates state AND records the event."""
            self._apply(event)
            self._uncommitted.append(event)

        def load_from_history(self, events: list[DomainEvent]) -> None:
            """Called from the repository during rehydration -- state only."""
            for event in events:
                self._apply(event)


    # ----- BankAccount aggregate ----------------------------------------------

    class BankAccount(AggregateRoot):
        def __init__(self) -> None:
            super().__init__()
            self.id: str | None = None
            self.owner_id: str | None = None
            self.balance: float = 0.0
            self.status: Literal["pending", "active", "closed"] = "pending"
            self.transactions: list[dict[str, Any]] = []
            self.opened_at: str | None = None
            self.closed_at: str | None = None

        # -- Commands (validate invariants, emit events) ------------------------

        @classmethod
        def open(
            cls, account_id: str, owner_id: str, initial_deposit: float = 0.0
        ) -> "BankAccount":
            account = cls()
            account.apply_change(AccountOpened(
                account_id=account_id,
                owner_id=owner_id,
                opened_at=datetime.now(timezone.utc).isoformat(),
            ))
            if initial_deposit > 0:
                account.deposit(initial_deposit, "Initial deposit")
            return account

        def deposit(self, amount: float, description: str = "") -> None:
            if amount <= 0:
                raise ValueError("Deposit amount must be positive")
            if self.status != "active":
                raise ValueError("Account is not active")
            self.apply_change(MoneyDeposited(
                amount=amount,
                description=description,
                timestamp=datetime.now(timezone.utc).isoformat(),
            ))

        def withdraw(self, amount: float, description: str = "") -> None:
            if amount <= 0:
                raise ValueError("Withdrawal amount must be positive")
            if self.status != "active":
                raise ValueError("Account is not active")
            if amount > self.balance:
                raise ValueError("Insufficient funds")
            self.apply_change(MoneyWithdrawn(
                amount=amount,
                description=description,
                timestamp=datetime.now(timezone.utc).isoformat(),
            ))

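        def close(self, reason: str = "") -> None:
            # Only an active account can be closed (prevents a second AccountClosed)
            if self.status != "active":
                raise ValueError("Account is not active")
            if self.balance != 0:
                raise ValueError("Cannot close account with non-zero balance")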
            self.apply_change(AccountClosed(
                reason=reason,
                closed_at=datetime.now(timezone.utc).isoformat(),
            ))

        # -- Event handlers (pure state transitions) ----------------------------

        def _apply_AccountOpened(self, e: AccountOpened) -> None:
            self.id = e.account_id
            self.owner_id = e.owner_id
            self.status = "active"
            self.opened_at = e.opened_at

        def _apply_MoneyDeposited(self, e: MoneyDeposited) -> None:
            self.balance += e.amount
            self.transactions.append({
                "type": "deposit",
                "amount": e.amount,
                "timestamp": e.timestamp,
            })

        def _apply_MoneyWithdrawn(self, e: MoneyWithdrawn) -> None:
            self.balance -= e.amount
            self.transactions.append({
                "type": "withdrawal",
                "amount": e.amount,
                "timestamp": e.timestamp,
            })

        def _apply_AccountClosed(self, e: AccountClosed) -> None:
            self.status = "closed"
            self.closed_at = e.closed_at

        # -- Snapshot support ---------------------------------------------------

        def to_snapshot(self) -> dict[str, Any]:
            return {
                "id": self.id,
                "owner_id": self.owner_id,
                "balance": self.balance,
                "status": self.status,
                "opened_at": self.opened_at,
                "closed_at": self.closed_at,
                "transaction_count": len(self.transactions),
            }

        @classmethod
        def from_snapshot(cls, snapshot: dict[str, Any]) -> "BankAccount":
            account = cls()
            for key, value in snapshot.items():
                if key != "transaction_count":
                    setattr(account, key, value)
            account.transactions = []  # snapshots store count only
            return account
    ```
  </Tab>
</Tabs>

***

## Repository Pattern

The repository is the boundary between domain code (aggregates) and infrastructure (the event store). Its job is simple but crucial: hide the fact that the aggregate is backed by an event log, not a row in a table. A domain developer writing `account.deposit(100)` and `repository.save(account)` should not need to think about versions, streams, snapshots, or concurrency. The repository translates the aggregate's `uncommittedEvents` into a stream append, and translates a stream read back into a rehydrated aggregate.

Why bother with the abstraction? Because two years from now, when you swap PostgreSQL for EventStoreDB, or when you want to add a write-through cache for frequently-accessed aggregates, or when you realize snapshots need to live in Redis instead of the main database -- those changes should be invisible to domain code. If your command handlers call `eventStore.appendToStream(...)` directly, you have to rewrite them all. If they call `repository.save(account)`, you rewrite one class.

Snapshots deserve special attention here. The snapshot frequency (every 100 events in our example) is a tuning knob that trades write overhead for read performance. Too frequent: you're writing state snapshots that nobody will ever read (for short-lived aggregates) and burning storage. Too infrequent: loading an active aggregate requires replaying thousands of events, killing response time. The sweet spot is domain-specific -- for a bank account that lives 30 years with a few transactions per month, snapshot every 50 events. For a short-lived shopping cart that gets abandoned after 20 events, snapshots may not be worth it at all. Measure `p99 load time` and adjust.
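
The trade-off can be put into rough numbers. The sketch below is back-of-the-envelope arithmetic, not a benchmark: it assumes "a few transactions per month" means four, and `snapshot_cost` is a hypothetical helper.

```python
def snapshot_cost(total_events: int, frequency: int) -> dict[str, int]:
    """Estimate write overhead (snapshots) versus read overhead (replay)."""
    return {
        # One snapshot each time the stream crosses a multiple of `frequency`.
        "snapshots_written": total_events // frequency,
        # Events after the latest snapshot that must be replayed on load now.
        "replay_on_load": total_events % frequency,
        # Worst case: the load happens just before the next snapshot is due.
        "worst_case_replay": min(frequency - 1, total_events),
    }


# Bank account: 30 years * 12 months * 4 transactions (assumed) = 1440 events.
bank_account = snapshot_cost(total_events=30 * 12 * 4, frequency=50)

# Shopping cart abandoned after 20 events, with the default frequency of 100.
shopping_cart = snapshot_cost(total_events=20, frequency=100)
```

For the bank account, 28 snapshots cap every load at 49 replayed events instead of up to 1440. For the cart, no snapshot is ever written and the full replay is only 20 events, which is why snapshots add little there.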

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // repository.js - Event-sourced repository

    class EventSourcedRepository {
      constructor(eventStore, aggregateType, AggregateClass) {
        this.eventStore = eventStore;
        this.aggregateType = aggregateType;
        this.AggregateClass = AggregateClass;
        this.snapshotFrequency = 100; // Snapshot every 100 events
      }

      async getById(id) {
        const streamId = `${this.aggregateType}-${id}`;
        
        // Try to load from snapshot first
        let aggregate;
        let fromVersion = 0;
        
        const snapshot = await this.eventStore.loadSnapshot(streamId);
        if (snapshot) {
          aggregate = this.AggregateClass.fromSnapshot(snapshot.state);
          aggregate._version = snapshot.version;
          fromVersion = snapshot.version + 1;
        } else {
          aggregate = new this.AggregateClass();
        }

        // Load events since snapshot (or all events if no snapshot)
        const events = await this.eventStore.readStream(streamId, fromVersion);
        
        if (events.length === 0 && !snapshot) {
          return null; // Aggregate doesn't exist
        }

        aggregate.loadFromHistory(events);
        
        return aggregate;
      }

      async save(aggregate) {
        const streamId = `${this.aggregateType}-${aggregate.id}`;
        const events = aggregate.uncommittedEvents;
        
        if (events.length === 0) {
          return; // Nothing to save
        }

        // Format events for storage
        const eventsToStore = events.map(e => ({
          type: e.type,
          data: e.data,
          metadata: {
            timestamp: new Date().toISOString(),
            ...e.metadata
          }
        }));

        // Append to event store with optimistic concurrency
        const expectedVersion = aggregate.version - events.length;
        await this.eventStore.appendToStream(
          streamId,
          this.aggregateType,
          eventsToStore,
          expectedVersion
        );

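        // Create a snapshot whenever this save crossed a snapshotFrequency boundary.
        // A plain `version % frequency === 0` check misses the boundary when one
        // save appends several events (e.g. version 99 -> 101).
        const crossedBoundary =
          Math.floor(aggregate.version / this.snapshotFrequency) >
          Math.floor(expectedVersion / this.snapshotFrequency);
        if (crossedBoundary) {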
          await this.eventStore.saveSnapshot(
            streamId,
            this.aggregateType,
            aggregate.version,
            aggregate.toSnapshot()
          );
        }

        aggregate.clearUncommittedEvents();
      }
    }

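    module.exports = { EventSourcedRepository };

    // Usage (typically in a separate script). `eventStore` is assumed to be an
    // instance exposing readStream, appendToStream, loadSnapshot and saveSnapshot.
    // The calls are wrapped in an async function because CommonJS modules do not
    // support top-level await.
    const { BankAccount } = require('./aggregate');

    async function example(eventStore) {
      const bankAccountRepo = new EventSourcedRepository(
        eventStore,
        'BankAccount',
        BankAccount
      );

      // Create new account
      const account = BankAccount.open('acc-123', 'user-456', 1000);
      await bankAccountRepo.save(account);

      // Load and modify
      const loadedAccount = await bankAccountRepo.getById('acc-123');
      loadedAccount.deposit(500, 'Salary');
      loadedAccount.withdraw(200, 'Groceries');
      await bankAccountRepo.save(loadedAccount);
    }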
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # repository.py - Event-sourced repository
    # Uses a Generic[AggregateT] so you get type hints end-to-end:
    # repo: EventSourcedRepository[BankAccount] => repo.get_by_id(...) -> BankAccount | None

    from __future__ import annotations

    from datetime import datetime, timezone
    from typing import Generic, Protocol, Type, TypeVar

    from aggregate import AggregateRoot, DomainEvent
    from event_store import EventStore, NewEvent


    class EventFactory(Protocol):
        """Type-safe way to reconstruct DomainEvent subclasses from stored JSON."""
        def __call__(self, event_type: str, data: dict) -> DomainEvent: ...


    AggregateT = TypeVar("AggregateT", bound=AggregateRoot)


    class EventSourcedRepository(Generic[AggregateT]):
        def __init__(
            self,
            event_store: EventStore,
            aggregate_type: str,
            aggregate_class: Type[AggregateT],
            event_factory: EventFactory,
            snapshot_frequency: int = 100,
        ) -> None:
            self._store = event_store
            self._aggregate_type = aggregate_type
            self._aggregate_class = aggregate_class
            self._event_factory = event_factory
            self._snapshot_frequency = snapshot_frequency

        def _stream_id(self, aggregate_id: str) -> str:
            return f"{self._aggregate_type}-{aggregate_id}"

        async def get_by_id(self, aggregate_id: str) -> AggregateT | None:
            stream_id = self._stream_id(aggregate_id)

            snapshot = await self._store.load_snapshot(stream_id)
            if snapshot is not None:
                aggregate = self._aggregate_class.from_snapshot(snapshot["state"])
                aggregate._version = snapshot["version"]
                from_version = snapshot["version"] + 1
            else:
                aggregate = self._aggregate_class()
                from_version = 0

            stored = await self._store.read_stream(stream_id, from_version)
            if not stored and snapshot is None:
                return None

            domain_events = [
                self._event_factory(s.event_type, s.event_data) for s in stored
            ]
            aggregate.load_from_history(domain_events)
            return aggregate

        async def save(self, aggregate: AggregateT) -> None:
            events = aggregate.uncommitted_events
            if not events:
                return

            stream_id = self._stream_id(aggregate.id)  # type: ignore[attr-defined]
            to_store = [
                NewEvent(
                    type=event.event_type,
                    data=event.model_dump(),
                    metadata={"timestamp": datetime.now(timezone.utc).isoformat()},
                )
                for event in events
            ]
            expected_version = aggregate.version - len(events)
            await self._store.append_to_stream(
                stream_id, self._aggregate_type, to_store, expected_version
            )

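            # Snapshot whenever this save crossed a snapshot boundary; a plain
            # `version % frequency == 0` check is skipped when one save appends
            # several events and steps over the multiple.
            frequency = self._snapshot_frequency
            if aggregate.version // frequency > expected_version // frequency:
                await self._store.save_snapshot(
                    stream_id,
                    self._aggregate_type,
                    aggregate.version,
                    aggregate.to_snapshot(),  # type: ignore[attr-defined]
                )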

            aggregate.clear_uncommitted()


    # -- Example factory + usage -----------------------------------------------

    from aggregate import (
        AccountOpened, MoneyDeposited, MoneyWithdrawn, AccountClosed, BankAccount,
    )

    _EVENT_TYPES: dict[str, type[DomainEvent]] = {
        "AccountOpened": AccountOpened,
        "MoneyDeposited": MoneyDeposited,
        "MoneyWithdrawn": MoneyWithdrawn,
        "AccountClosed": AccountClosed,
    }


    def bank_account_event_factory(event_type: str, data: dict) -> DomainEvent:
        return _EVENT_TYPES[event_type](**data)


    # async def main():
    #     repo: EventSourcedRepository[BankAccount] = EventSourcedRepository(
    #         event_store, "BankAccount", BankAccount, bank_account_event_factory,
    #     )
    #     account = BankAccount.open("acc-123", "user-456", 1000)
    #     await repo.save(account)
    #
    #     loaded = await repo.get_by_id("acc-123")
    #     loaded.deposit(500, "Salary")
    #     loaded.withdraw(200, "Groceries")
    #     await repo.save(loaded)
    ```
  </Tab>
</Tabs>
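
One subtlety in snapshot scheduling is worth spelling out. A check of the form `version % frequency == 0` fires only when a save lands exactly on a multiple of the frequency, so a save that appends several events at once can step over the boundary and skip the snapshot. The sketch below compares that check with a boundary-crossing check. Both helper functions are hypothetical names introduced for illustration, and the sketch assumes versions increase by one per event.

```python
def modulo_check(new_version: int, frequency: int) -> bool:
    """Fires only if the save ends exactly on a multiple of `frequency`."""
    return new_version % frequency == 0


def crossed_snapshot_boundary(
    expected_version: int, new_version: int, frequency: int
) -> bool:
    """Fires if a multiple of `frequency` lies in (expected_version, new_version]."""
    return new_version // frequency > expected_version // frequency


# One save appends three events, taking the stream from version 98 to 101.
print(modulo_check(101, 100))                   # False
print(crossed_snapshot_boundary(98, 101, 100))  # True
```

The boundary-crossing form costs nothing extra, because the repository already computes the expected version for the optimistic concurrency check.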

***

## Projections

Projections are where event sourcing's read-side gets its scalability. The core idea: the event log is optimized for appends and per-stream reads (loading an aggregate), but it's terrible for ad-hoc queries like "show me all accounts with balance > \$10,000 sorted by owner name." Rather than trying to make the event log good at everything, we derive purpose-built read models from it. Each projection is a materialized view, subscribed to the event stream, that maintains a schema optimized for specific queries.

This is where CQRS (Command Query Responsibility Segregation) naturally enters the picture. Your write model (the event-sourced aggregates) and your read models (projections) evolve independently. Need a new query? Build a new projection. Don't like how your balance report looks? Drop the projection table, reset the checkpoint, and replay events. This is vastly easier than reshaping a traditional database schema, which requires migration scripts, downtime planning, and prayer.

The trade-off you're buying into: **eventual consistency**. When a user deposits \$100, the event is committed instantly, but the `account_balances` projection may be a few hundred milliseconds behind. For most domains this is fine -- the UI can display the command's response directly rather than re-querying. For domains where it's not fine (strong read-your-writes requirements), you can route specific reads back to the write side via aggregate replay, or use single-writer projections with synchronous updates. Don't make the whole system synchronous just because one query needs it; that defeats the point.
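
As a rough illustration of routing one read back to the write side, the sketch below serves a balance from the projection only when the projection has caught up to the version the caller needs, and otherwise replays the stream. Everything here is hypothetical scaffolding: `ProjectedBalance`, its `version` field, and the list of signed amounts standing in for an event stream are assumptions, not part of the code in this chapter.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ProjectedBalance:
    balance: int
    version: int  # last aggregate version this projection row reflects


def replay_balance(stream: list[int]) -> int:
    """Write-side read: fold the stream's signed amounts."""
    return sum(stream)


def read_balance(
    projected: ProjectedBalance, stream: list[int], min_version: int
) -> tuple[int, str]:
    """Use the projection unless the caller needs a newer version than it has."""
    if projected.version >= min_version:
        return projected.balance, "projection"
    return replay_balance(stream), "aggregate replay"


stream = [1000, -200, 100]                         # three events, version 3
projected = ProjectedBalance(balance=800, version=2)  # one event behind

print(read_balance(projected, stream, min_version=2))  # (800, 'projection')
print(read_balance(projected, stream, min_version=3))  # (900, 'aggregate replay')
```

Only the caller that just wrote version 3 pays for the replay; every other reader keeps using the cheap projection.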

A non-obvious operational win: projections are **disposable**. If the balance projection has a bug, you don't need to frantically patch production data -- you deploy a fix, truncate the projection table, reset its checkpoint, and let it rebuild from the event stream. The source of truth is never damaged. I have watched teams rebuild six-month-old projection tables over a weekend to add new columns with exactly zero data loss. That is very hard to match in a CRUD architecture, where the history needed to recompute those rows has already been overwritten.

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    PROJECTIONS (READ MODELS)                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Event Store (Write Side)                                                   │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  1. AccountOpened { accountId: 'A1', ownerId: 'U1' }                │   │
│  │  2. MoneyDeposited { accountId: 'A1', amount: 1000 }                │   │
│  │  3. AccountOpened { accountId: 'A2', ownerId: 'U2' }                │   │
│  │  4. MoneyDeposited { accountId: 'A2', amount: 500 }                 │   │
│  │  5. MoneyWithdrawn { accountId: 'A1', amount: 200 }                 │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                │                                            │
│                                ▼                                            │
│                       ┌─────────────────┐                                  │
│                       │   Projector     │                                  │
│                       │   (Event        │                                  │
│                       │   Handler)      │                                  │
│                       └────────┬────────┘                                  │
│                                │                                            │
│            ┌───────────────────┼───────────────────┐                       │
│            ▼                   ▼                   ▼                       │
│  ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐        │
│  │ Account Balances  │ │ Transaction List  │ │ Account Summary   │        │
│  │ (Read Model)      │ │ (Read Model)      │ │ (Read Model)      │        │
│  ├───────────────────┤ ├───────────────────┤ ├───────────────────┤        │
│  │ A1: $800          │ │ A1: +1000, -200   │ │ Total: 2 accounts │        │
│  │ A2: $500          │ │ A2: +500          │ │ Total: $1300      │        │
│  └───────────────────┘ └───────────────────┘ └───────────────────┘        │
│                                                                              │
│  Each projection can be:                                                    │
│  • Different database (PostgreSQL, MongoDB, Redis, Elasticsearch)          │
│  • Optimized for specific queries                                          │
│  • Rebuilt by replaying all events                                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```
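
To make the diagram concrete, here is a minimal in-memory sketch that folds the same five events into the three read models. It uses plain dictionaries instead of database tables, and the event shape (a `type` key plus snake_case fields) is an assumption made for the example rather than the stored-event format used elsewhere in this chapter.

```python
from collections import defaultdict

# The five events from the diagram, in log order.
EVENTS = [
    {"type": "AccountOpened", "account_id": "A1", "owner_id": "U1"},
    {"type": "MoneyDeposited", "account_id": "A1", "amount": 1000},
    {"type": "AccountOpened", "account_id": "A2", "owner_id": "U2"},
    {"type": "MoneyDeposited", "account_id": "A2", "amount": 500},
    {"type": "MoneyWithdrawn", "account_id": "A1", "amount": 200},
]


def project(events):
    """Fold the log into three independent read models."""
    balances = {}
    transactions = defaultdict(list)
    for event in events:
        account_id = event["account_id"]
        if event["type"] == "AccountOpened":
            balances[account_id] = 0
        elif event["type"] == "MoneyDeposited":
            balances[account_id] += event["amount"]
            transactions[account_id].append(event["amount"])
        elif event["type"] == "MoneyWithdrawn":
            balances[account_id] -= event["amount"]
            transactions[account_id].append(-event["amount"])
    summary = {"accounts": len(balances), "total": sum(balances.values())}
    return balances, dict(transactions), summary


balances, transactions, summary = project(EVENTS)
print(balances)      # {'A1': 800, 'A2': 500}
print(transactions)  # {'A1': [1000, -200], 'A2': [500]}
print(summary)       # {'accounts': 2, 'total': 1300}
```

Because `project` is a pure function of the log, calling it again on the same events always yields the same read models, which is what makes a rebuild safe.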

### Projection Implementation

Every projection needs a **checkpoint** -- the position in the event stream up to which it has already processed. Without checkpoints, a projection restarting after a crash would either double-apply events (corrupting sums) or miss events entirely. The checkpoint table is the projection's memory of "where was I?"

The processing loop is deliberately simple: read a batch of events from the last checkpoint, apply them to the projection, save the new checkpoint, sleep briefly, repeat. Simple is good here because projections are frequently the thing that's wrong -- simple code is easy to reason about when you're in an incident at 2am wondering why balances look off.

A common mistake is making the checkpoint update and the projection update separate transactions. Don't. If the projection updates the balance but crashes before saving the checkpoint, the next startup will re-apply the same event and count the deposit twice. Wrap them in a single transaction (or make the projection operation idempotent by including the event ID in a uniqueness constraint -- the "inbox pattern"). The Node.js and Python listings in the tabs below keep the two writes in separate transactions to stay short, so treat them as a starting point rather than a crash-safe implementation.
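
A minimal sketch of the single-transaction approach, using in-memory SQLite from the standard library as a stand-in for the projection database. The helper names (`make_db`, `apply_batch`, `state`), the tuple event shape, and the `crash_before_commit` flag are assumptions made for the demonstration.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    with conn:
        conn.execute(
            "CREATE TABLE account_balances ("
            "account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL)"
        )
        conn.execute(
            "CREATE TABLE projection_checkpoints ("
            "name TEXT PRIMARY KEY, position INTEGER NOT NULL)"
        )
        conn.execute("INSERT INTO account_balances VALUES ('A1', 0)")
    return conn


def apply_batch(conn, name, events, crash_before_commit=False):
    """Apply (event_id, account_id, delta) tuples and advance the checkpoint atomically."""
    with conn:  # commits on success, rolls back if anything raises
        for _event_id, account_id, delta in events:
            conn.execute(
                "UPDATE account_balances SET balance = balance + ? "
                "WHERE account_id = ?",
                (delta, account_id),
            )
        conn.execute(
            "INSERT OR REPLACE INTO projection_checkpoints (name, position) "
            "VALUES (?, ?)",
            (name, events[-1][0]),
        )
        if crash_before_commit:
            raise RuntimeError("simulated crash before commit")


def state(conn, name):
    """Return (balance of A1, checkpoint position) for inspection."""
    balance = conn.execute(
        "SELECT balance FROM account_balances WHERE account_id = 'A1'"
    ).fetchone()[0]
    row = conn.execute(
        "SELECT position FROM projection_checkpoints WHERE name = ?", (name,)
    ).fetchone()
    return balance, (row[0] if row else 0)


conn = make_db()
batch = [(1, "A1", 1000), (2, "A1", -200)]

try:
    apply_batch(conn, "account-balances", batch, crash_before_commit=True)
except RuntimeError:
    pass
print(state(conn, "account-balances"))  # (0, 0)

apply_batch(conn, "account-balances", batch)  # retry after the "crash"
print(state(conn, "account-balances"))  # (800, 2)
```

The crashed attempt leaves neither the balance nor the checkpoint changed, so the retry starts from a clean position and nothing is applied twice.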

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // projections.js - Read model projections

    class ProjectionManager {
      constructor(eventStore, db) {
        this.eventStore = eventStore;
        this.db = db;
        this.projections = new Map();
        this.checkpoints = new Map();
      }

      register(name, projection) {
        this.projections.set(name, projection);
      }

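      async start() {
        // Load checkpoints
        for (const name of this.projections.keys()) {
          const checkpoint = await this.loadCheckpoint(name);
          this.checkpoints.set(name, checkpoint);
        }

        // Start processing in the background; surface failures instead of
        // leaving an unhandled promise rejection
        this.running = true;
        this.process().catch(err => {
          this.running = false;
          console.error('Projection loop stopped:', err);
        });
      }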

      async process() {
        while (this.running) {
          for (const [name, projection] of this.projections) {
            const checkpoint = this.checkpoints.get(name);
            const events = await this.eventStore.readAllEvents(checkpoint, 100);

            if (events.length > 0) {
              for (const event of events) {
                await projection.apply(event);
              }
              
              const newCheckpoint = events[events.length - 1].id;
              await this.saveCheckpoint(name, newCheckpoint);
              this.checkpoints.set(name, newCheckpoint);
            }
          }

          await this.sleep(100);
        }
      }

      async loadCheckpoint(projectionName) {
        const result = await this.db.query(
          `SELECT position FROM projection_checkpoints WHERE name = $1`,
          [projectionName]
        );
        return result.rows[0]?.position || 0;
      }

      async saveCheckpoint(projectionName, position) {
        await this.db.query(
          `INSERT INTO projection_checkpoints (name, position)
           VALUES ($1, $2)
           ON CONFLICT (name) DO UPDATE SET position = $2`,
          [projectionName, position]
        );
      }

      sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
      }
    }

    // Account Balance Projection
    class AccountBalanceProjection {
      constructor(db) {
        this.db = db;
      }

      async initialize() {
        await this.db.query(`
          CREATE TABLE IF NOT EXISTS account_balances (
            account_id VARCHAR(255) PRIMARY KEY,
            owner_id VARCHAR(255) NOT NULL,
            balance DECIMAL(15, 2) DEFAULT 0,
            status VARCHAR(50) DEFAULT 'active',
            updated_at TIMESTAMPTZ DEFAULT NOW()
          )
        `);
      }

      async apply(event) {
        // Stream IDs look like `BankAccount-acc-123`; strip only the type prefix.
        // (`split('-')[1]` would return `acc` for an ID that contains hyphens.)
        const accountId = event.stream_id.slice(event.stream_id.indexOf('-') + 1);

        switch (event.event_type) {
          case 'AccountOpened':
            await this.db.query(
              `INSERT INTO account_balances (account_id, owner_id, balance)
               VALUES ($1, $2, 0)
               ON CONFLICT (account_id) DO NOTHING`,
              [event.event_data.accountId, event.event_data.ownerId]
            );
            break;

          case 'MoneyDeposited':
            await this.db.query(
              `UPDATE account_balances
               SET balance = balance + $1, updated_at = NOW()
               WHERE account_id = $2`,
              [event.event_data.amount, accountId]
            );
            break;

          case 'MoneyWithdrawn':
            await this.db.query(
              `UPDATE account_balances
               SET balance = balance - $1, updated_at = NOW()
               WHERE account_id = $2`,
              [event.event_data.amount, accountId]
            );
            break;

          case 'AccountClosed':
            await this.db.query(
              `UPDATE account_balances
               SET status = 'closed', updated_at = NOW()
               WHERE account_id = $1`,
              [accountId]
            );
            break;
        }
      }

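      // Rebuild from scratch
      async rebuild() {
        await this.db.query('TRUNCATE account_balances');
        // The caller must also reset this projection's checkpoint to 0 (both the
        // projection_checkpoints row and ProjectionManager's in-memory copy);
        // otherwise no events are replayed and the table stays empty.
      }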
    }

    // Transaction History Projection
    class TransactionHistoryProjection {
      constructor(db) {
        this.db = db;
      }

      async initialize() {
        await this.db.query(`
          CREATE TABLE IF NOT EXISTS transaction_history (
            id BIGSERIAL PRIMARY KEY,
            account_id VARCHAR(255) NOT NULL,
            transaction_type VARCHAR(50) NOT NULL,
            amount DECIMAL(15, 2) NOT NULL,
            description TEXT,
            balance_after DECIMAL(15, 2),
            created_at TIMESTAMPTZ NOT NULL
          );
          
          CREATE INDEX IF NOT EXISTS idx_transactions_account 
            ON transaction_history(account_id, created_at DESC);
        `);
      }

      async apply(event) {
        // Strip only the aggregate-type prefix; account IDs such as `acc-123`
        // contain hyphens, so `split('-')[1]` would truncate them.
        const accountId = event.stream_id.slice(event.stream_id.indexOf('-') + 1);

        if (event.event_type === 'MoneyDeposited') {
          await this.db.query(
            `INSERT INTO transaction_history
             (account_id, transaction_type, amount, description, created_at)
             VALUES ($1, 'deposit', $2, $3, $4)`,
            [
              accountId,
              event.event_data.amount,
              event.event_data.description,
              event.event_data.timestamp
            ]
          );
        }

        if (event.event_type === 'MoneyWithdrawn') {
          await this.db.query(
            `INSERT INTO transaction_history
             (account_id, transaction_type, amount, description, created_at)
             VALUES ($1, 'withdrawal', $2, $3, $4)`,
            [
              accountId,
              event.event_data.amount,
              event.event_data.description,
              event.event_data.timestamp
            ]
          );
        }
      }
    }

    // Daily Summary Projection (Aggregated view)
    class DailySummaryProjection {
      constructor(db) {
        this.db = db;
      }

      async initialize() {
        await this.db.query(`
          CREATE TABLE IF NOT EXISTS daily_summaries (
            date DATE NOT NULL,
            account_id VARCHAR(255) NOT NULL,
            deposits_count INTEGER DEFAULT 0,
            deposits_total DECIMAL(15, 2) DEFAULT 0,
            withdrawals_count INTEGER DEFAULT 0,
            withdrawals_total DECIMAL(15, 2) DEFAULT 0,
            PRIMARY KEY (date, account_id)
          )
        `);
      }

      async apply(event) {
        const date = new Date(event.created_at).toISOString().split('T')[0];
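        // Strip only the aggregate-type prefix; account IDs may contain hyphens.
        const accountId = event.stream_id.slice(event.stream_id.indexOf('-') + 1);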

        if (event.event_type === 'MoneyDeposited') {
          await this.db.query(
            `INSERT INTO daily_summaries (date, account_id, deposits_count, deposits_total)
             VALUES ($1, $2, 1, $3)
             ON CONFLICT (date, account_id) DO UPDATE SET
               deposits_count = daily_summaries.deposits_count + 1,
               deposits_total = daily_summaries.deposits_total + $3`,
            [date, accountId, event.event_data.amount]
          );
        }

        if (event.event_type === 'MoneyWithdrawn') {
          await this.db.query(
            `INSERT INTO daily_summaries (date, account_id, withdrawals_count, withdrawals_total)
             VALUES ($1, $2, 1, $3)
             ON CONFLICT (date, account_id) DO UPDATE SET
               withdrawals_count = daily_summaries.withdrawals_count + 1,
               withdrawals_total = daily_summaries.withdrawals_total + $3`,
            [date, accountId, event.event_data.amount]
          );
        }
      }
    }

    module.exports = {
      ProjectionManager,
      AccountBalanceProjection,
      TransactionHistoryProjection,
      DailySummaryProjection
    };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # projections.py - asyncio-based projection manager + read models
    # Each projection owns its tables (raw SQL over SQLAlchemy's async engine);
    # the manager drives the event pump and persists checkpoints. To keep the
    # listing short, each apply() and each checkpoint save runs in its own
    # transaction, so the two are NOT atomic. Share one transaction (or make the
    # handlers idempotent) before relying on this in production.

    from __future__ import annotations

    import asyncio
    from typing import Protocol

    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncEngine

    from event_store import EventStore, StoredEvent


    class Projection(Protocol):
        name: str
        async def initialize(self) -> None: ...
        async def apply(self, event: StoredEvent) -> None: ...


    class ProjectionManager:
        def __init__(
            self, event_store: EventStore, engine: AsyncEngine,
            poll_interval_s: float = 0.1, batch_size: int = 100,
        ) -> None:
            self._store = event_store
            self._engine = engine
            self._projections: list[Projection] = []
            self._running = False
            self._poll_interval = poll_interval_s
            self._batch_size = batch_size

        def register(self, projection: Projection) -> None:
            self._projections.append(projection)

        async def _ensure_checkpoint_table(self) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS projection_checkpoints (
                        name VARCHAR(255) PRIMARY KEY,
                        position BIGINT NOT NULL DEFAULT 0
                    );
                """))

        async def _load_checkpoint(self, name: str) -> int:
            async with self._engine.connect() as conn:
                r = await conn.execute(
                    text("SELECT position FROM projection_checkpoints WHERE name = :n"),
                    {"n": name},
                )
                row = r.first()
                return row.position if row else 0

        async def _save_checkpoint(self, name: str, position: int) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(
                    text("""
                        INSERT INTO projection_checkpoints (name, position)
                        VALUES (:n, :p)
                        ON CONFLICT (name) DO UPDATE SET position = :p
                    """),
                    {"n": name, "p": position},
                )

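        async def start(self) -> None:
            await self._ensure_checkpoint_table()
            for p in self._projections:
                await p.initialize()
            self._running = True
            # Keep a reference: the event loop holds only weak references to
            # tasks, so an unreferenced task can be garbage-collected mid-run.
            self._task = asyncio.create_task(self._run())

        async def stop(self) -> None:
            self._running = False
            task = getattr(self, "_task", None)
            if task is not None:
                await task  # let the in-flight batch finish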

        async def _run(self) -> None:
            checkpoints: dict[str, int] = {
                p.name: await self._load_checkpoint(p.name)
                for p in self._projections
            }
            while self._running:
                for projection in self._projections:
                    pos = checkpoints[projection.name]
                    events = await self._store.read_all_events(pos, self._batch_size)
                    if not events:
                        continue
                    for event in events:
                        await projection.apply(event)
                    new_pos = events[-1].id
                    await self._save_checkpoint(projection.name, new_pos)
                    checkpoints[projection.name] = new_pos
                await asyncio.sleep(self._poll_interval)


    class AccountBalanceProjection:
        name = "account-balances"

        def __init__(self, engine: AsyncEngine) -> None:
            self._engine = engine

        async def initialize(self) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS account_balances (
                        account_id VARCHAR(255) PRIMARY KEY,
                        owner_id VARCHAR(255) NOT NULL,
                        balance NUMERIC(15, 2) DEFAULT 0,
                        status VARCHAR(50) DEFAULT 'active',
                        updated_at TIMESTAMPTZ DEFAULT NOW()
                    );
                """))

        async def apply(self, event: StoredEvent) -> None:
            account_id = event.stream_id.split("-", 1)[1]
            async with self._engine.begin() as conn:
                if event.event_type == "AccountOpened":
                    await conn.execute(text("""
                        INSERT INTO account_balances (account_id, owner_id, balance)
                        VALUES (:aid, :oid, 0)
                        ON CONFLICT (account_id) DO NOTHING
                    """), {"aid": event.event_data["account_id"],
                           "oid": event.event_data["owner_id"]})
                elif event.event_type == "MoneyDeposited":
                    await conn.execute(text("""
                        UPDATE account_balances
                        SET balance = balance + :amt, updated_at = NOW()
                        WHERE account_id = :aid
                    """), {"amt": event.event_data["amount"], "aid": account_id})
                elif event.event_type == "MoneyWithdrawn":
                    await conn.execute(text("""
                        UPDATE account_balances
                        SET balance = balance - :amt, updated_at = NOW()
                        WHERE account_id = :aid
                    """), {"amt": event.event_data["amount"], "aid": account_id})
                elif event.event_type == "AccountClosed":
                    await conn.execute(text("""
                        UPDATE account_balances
                        SET status = 'closed', updated_at = NOW()
                        WHERE account_id = :aid
                    """), {"aid": account_id})

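        async def rebuild(self) -> None:
            """Truncate the table and reset the stored checkpoint for a full replay.

            ProjectionManager caches positions in memory, so the replay starts
            the next time checkpoints are loaded (i.e. after a manager restart).
            """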
            async with self._engine.begin() as conn:
                await conn.execute(text("TRUNCATE account_balances"))
                await conn.execute(text("""
                    UPDATE projection_checkpoints
                    SET position = 0
                    WHERE name = :n
                """), {"n": self.name})


    class TransactionHistoryProjection:
        name = "transaction-history"

        def __init__(self, engine: AsyncEngine) -> None:
            self._engine = engine

        async def initialize(self) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS transaction_history (
                        id BIGSERIAL PRIMARY KEY,
                        account_id VARCHAR(255) NOT NULL,
                        transaction_type VARCHAR(50) NOT NULL,
                        amount NUMERIC(15, 2) NOT NULL,
                        description TEXT,
                        balance_after NUMERIC(15, 2),
                        created_at TIMESTAMPTZ NOT NULL
                    );
                    CREATE INDEX IF NOT EXISTS idx_tx_account
                        ON transaction_history(account_id, created_at DESC);
                """))

        async def apply(self, event: StoredEvent) -> None:
            if event.event_type not in ("MoneyDeposited", "MoneyWithdrawn"):
                return
            account_id = event.stream_id.split("-", 1)[1]
            tx_type = "deposit" if event.event_type == "MoneyDeposited" else "withdrawal"
            async with self._engine.begin() as conn:
                await conn.execute(text("""
                    INSERT INTO transaction_history
                        (account_id, transaction_type, amount, description, created_at)
                    VALUES (:aid, :ttype, :amt, :desc, :ts)
                """), {
                    "aid": account_id,
                    "ttype": tx_type,
                    "amt": event.event_data["amount"],
                    "desc": event.event_data.get("description", ""),
                    "ts": event.event_data["timestamp"],
                })


    class DailySummaryProjection:
        name = "daily-summary"

        def __init__(self, engine: AsyncEngine) -> None:
            self._engine = engine

        async def initialize(self) -> None:
            async with self._engine.begin() as conn:
                await conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS daily_summaries (
                        date DATE NOT NULL,
                        account_id VARCHAR(255) NOT NULL,
                        deposits_count INTEGER DEFAULT 0,
                        deposits_total NUMERIC(15, 2) DEFAULT 0,
                        withdrawals_count INTEGER DEFAULT 0,
                        withdrawals_total NUMERIC(15, 2) DEFAULT 0,
                        PRIMARY KEY (date, account_id)
                    );
                """))

        async def apply(self, event: StoredEvent) -> None:
            date = event.created_at.date().isoformat()
            account_id = event.stream_id.split("-", 1)[1]
            async with self._engine.begin() as conn:
                if event.event_type == "MoneyDeposited":
                    await conn.execute(text("""
                        INSERT INTO daily_summaries
                            (date, account_id, deposits_count, deposits_total)
                        VALUES (:d, :aid, 1, :amt)
                        ON CONFLICT (date, account_id) DO UPDATE SET
                            deposits_count = daily_summaries.deposits_count + 1,
                            deposits_total = daily_summaries.deposits_total + :amt
                    """), {"d": date, "aid": account_id,
                           "amt": event.event_data["amount"]})
                elif event.event_type == "MoneyWithdrawn":
                    await conn.execute(text("""
                        INSERT INTO daily_summaries
                            (date, account_id, withdrawals_count, withdrawals_total)
                        VALUES (:d, :aid, 1, :amt)
                        ON CONFLICT (date, account_id) DO UPDATE SET
                            withdrawals_count = daily_summaries.withdrawals_count + 1,
                            withdrawals_total = daily_summaries.withdrawals_total + :amt
                    """), {"d": date, "aid": account_id,
                           "amt": event.event_data["amount"]})
    ```
  </Tab>
</Tabs>

<Warning>
  **Caveats and Common Pitfalls for Projections**

  1. **Projection rebuild time explodes at scale.** A projection that rebuilds in 20 minutes at launch takes 6 hours three years later. Teams discover this during an emergency projection fix and realize they cannot afford to rebuild during a critical incident. Rebuild time is not linear -- index maintenance and write amplification make it superlinear in many databases.
  2. **Non-idempotent projection handlers.** A handler that does `UPDATE balances SET balance = balance + amount` will double-apply if the checkpoint isn't saved transactionally with the update. Result: balances drift silently, and the drift is proportional to crash frequency.
  3. **Projection checkpoints saved outside the projection transaction.** This is the specific mistake that produces the non-idempotent symptom above. Checkpoint and write must be one atomic unit, or the write must be idempotent on event ID.
  4. **Treating projections as the source of truth** for business logic. A developer writes `if account_balances.balance < amount then reject` instead of loading the aggregate. The projection is stale by hundreds of milliseconds, so under high contention the check passes even though a concurrent withdrawal has already drained the account. Reads for decisions must come from the aggregate; the projection is only for display.
</Warning>
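
Pitfalls 2 and 3 can be defused by recording each applied event ID under a uniqueness constraint, in the same transaction as the update. The sketch below is one way to do that, using in-memory SQLite from the standard library as a stand-in for a PostgreSQL projection database. The `apply_deposit` function is an illustrative name (an assumption), and SQLite's `INSERT OR IGNORE` plays the role of PostgreSQL's `ON CONFLICT DO NOTHING`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
with conn:
    conn.execute(
        "CREATE TABLE account_balances ("
        "account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE projection_applied_events (event_id INTEGER PRIMARY KEY)"
    )
    conn.execute("INSERT INTO account_balances VALUES ('A1', 0)")


def apply_deposit(conn, event_id: int, account_id: str, amount: int) -> bool:
    """Return True if the event was applied, False if it was a duplicate."""
    with conn:  # the dedup row and the balance update commit together
        cur = conn.execute(
            "INSERT OR IGNORE INTO projection_applied_events (event_id) VALUES (?)",
            (event_id,),
        )
        if cur.rowcount == 0:
            return False  # event ID already recorded: skip the update
        conn.execute(
            "UPDATE account_balances SET balance = balance + ? WHERE account_id = ?",
            (amount, account_id),
        )
        return True


print(apply_deposit(conn, 7, "A1", 100))  # True
print(apply_deposit(conn, 7, "A1", 100))  # False
print(conn.execute("SELECT balance FROM account_balances").fetchone()[0])  # 100
```

The second delivery of event 7 is a no-op, so a crash-and-replay of the same batch cannot inflate the balance.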

<Tip>
  **Solutions and Patterns**

  * **Make every handler idempotent** by including the event ID in an upsert-with-unique-constraint. `INSERT INTO projection_applied_events(event_id) ... ON CONFLICT DO NOTHING` before the update, or embed the event ID in a uniqueness constraint on the projection row. Double applications become no-ops.
  * **Put checkpoint and write in the same transaction.** For PostgreSQL projections, a single transaction that updates the projection row and bumps the checkpoint row makes crash-safety trivial.
  * **Partition projections by stream hash** during rebuild so you can parallelize. A single-threaded rebuild of a billion-event projection can take 24+ hours; with 16 workers and near-linear scaling, that drops to roughly 90 minutes.
  * **Keep separate "hot" and "cold" projections.** The hot projection serves live reads and only tracks the last 90 days; the cold projection is for historical reports and is rebuilt weekly. This bounds rebuild time for the critical path.
  * **Run shadow rebuilds regularly** (weekly or monthly) to validate that projections still match the event log. A silent projection bug that drifts by a cent per thousand events is invisible until it's not.
</Tip>
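
For the stream-hash partitioning idea, a minimal sketch of the assignment step is shown below. The function names and the `(position, stream_id)` tuple shape are assumptions for illustration. It uses `hashlib` rather than Python's built-in `hash()`, because the built-in is salted per process and would send the same stream to different workers across runs.

```python
import hashlib
from collections import defaultdict


def worker_for(stream_id: str, workers: int) -> int:
    """Stable partition: the same stream always maps to the same worker."""
    digest = hashlib.sha256(stream_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % workers


def partition(events, workers: int):
    """Group (position, stream_id) pairs by worker, preserving log order."""
    buckets = defaultdict(list)
    for position, stream_id in events:
        buckets[worker_for(stream_id, workers)].append((position, stream_id))
    return buckets


log = [
    (1, "BankAccount-A1"),
    (2, "BankAccount-A2"),
    (3, "BankAccount-A1"),
    (4, "BankAccount-A3"),
]
for worker, assigned in sorted(partition(log, 4).items()):
    print(worker, assigned)
```

Because every event of a stream lands on one worker in log order, per-account projections such as `account_balances` stay correct. Projections that aggregate across streams would still need a merge step after the parallel pass.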

<AccordionGroup>
  <Accordion title="Scenario: A product manager deposits $500 via the UI, watches the balance stay at its old value for 2 seconds, and files a Sev-2 bug about 'the money disappeared'. How do you design around this complaint without losing the benefits of eventual consistency?">
    **Strong Answer Framework**

    1. **Return write-side state from command responses.** The deposit endpoint should return the updated balance from the aggregate's in-memory state directly, not query the projection afterward. This makes the user-visible behavior strongly consistent for the user's own actions.
    2. **Distinguish "read my own writes" from "read what others have done."** The PM's complaint is specifically about their own write. That is the easy case. Other users seeing your deposit can be eventually consistent; it's invisible to them.
    3. **Use versioned responses for polled updates.** If the UI polls the projection afterward (say, showing a transaction list), include the aggregate version in the command response and have the UI query `WHERE version >= X` so it knows when the projection has caught up.
    4. **Add read-your-writes caching with explicit invalidation.** The command response populates a short-lived per-user cache (30 seconds) with the new state. Reads check the cache first. This layer is local to the user's session and does not require distributed consistency.
    5. **Measure projection lag and alert on anomalies.** Healthy lag is under 500ms. If the 99th-percentile lag crosses 2 seconds, you have an infrastructure problem, not an eventual-consistency problem, and you should page on it.
    6. **Educate the product org with a 5-minute demo.** Show the PM: the balance updates immediately in the UI because the command response is authoritative, the projection catches up in 200ms for others, and the audit trail is bit-exact. The answer to the Sev-2 is a UX bug (UI should refresh from command response), not an architecture change.

    **Real-World Example**

    Shopify's engineering team published their approach to this in a 2021 engineering blog post about their CQRS usage on the checkout path. They returned the computed order total in the checkout command response, not from the projection, specifically because the 200-800ms projection lag was producing "I paid and it says I haven't" complaints at high-velocity flash sales. The fix added roughly 40 lines of code across the checkout service and eliminated the support ticket category entirely.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: What if the PM then asks 'why can't we just make projections synchronous so this never happens?'**

      **Strong Answer:** Synchronous projections defeat the point of CQRS. The write path now blocks on every downstream read model being updated -- adding a new projection means the write path gets slower. Failure of any one projection means writes fail. A single hot read model becomes a chokepoint for every write. The "eventually consistent, but consistent for my own writes" model gives 99% of the benefit at 10% of the cost. Synchronous would be a regression to the 2005 architecture you moved away from.
    </Note>

    <Note>
      **Follow-up 2: How does this interact with a mobile client that might be offline when the deposit happens?**

      **Strong Answer:** Offline-first design. The client queues the command locally and applies an optimistic event to its local projection immediately. When connectivity returns, the client replays the command, reconciles with the server response (which may reject, succeed, or conflict), and rolls forward or back. This is harder than a purely online UI, but it's a pure client-side concern -- the server's event sourcing architecture is agnostic to it. Libraries like Replicache or TinyBase implement the pattern.
    </Note>

    <Note>
      **Follow-up 3: If the projection takes 2 seconds to catch up, isn't that a bug regardless of UX?**

      **Strong Answer:** It might be, depending on load. A single projection worker processing a peak of 50k events/sec with complex handlers will lag during bursts. Solutions: parallelize the projection by stream hash, simplify handler logic (push complex derivations to a downstream projection), or scale the worker pool. Measure the 99th percentile lag under realistic load; if it's over 1 second sustained, you have a scalability problem and the UX fix is a workaround, not a solution. Fix the root cause too.
    </Note>

    **Common Wrong Answers**

    * *"Just read from the aggregate every time."* You've now reinvented a CRUD system -- the projection exists because querying the aggregate for read-heavy operations is expensive and doesn't support ad-hoc queries.
    * *"Add a polling loop in the UI that refreshes every 500ms."* Works until you have 100k concurrent users, at which point you've created a self-DDoS of your projection endpoint.

    **Further Reading**

    * Shopify Engineering, "Building Resilient GraphQL APIs" (2021) -- read-your-writes patterns used at scale.
    * Pat Helland, "Data on the Outside vs Data on the Inside" -- foundational paper on this exact tension.
    * Martin Kleppmann, *Designing Data-Intensive Applications*, Chapter 5 (Replication) -- covers read-your-writes and monotonic read consistency in detail.
  </Accordion>

  <Accordion title="Scenario: You want to introduce a new 'user_activity_by_region' projection that requires joining events from 4 aggregates. How do you design it to avoid coupling the projector to internal aggregate shapes?">
    **Strong Answer Framework**

    1. **Subscribe to events, not aggregates.** The projector registers for the specific event types it cares about (`UserLoggedIn`, `UserPurchased`, `UserReferralSubmitted`, `UserProfileUpdated`). It never loads the aggregate; it only reacts to events.
    2. **Define a public event contract.** Every event type has a stable schema, versioned and documented. The projector depends on the contract, not the aggregate's internal representation. If the aggregate refactors its private fields, the projector is unaffected as long as the events stay compatible.
    3. **Use correlation IDs in metadata.** When the projector joins events across aggregates (this purchase is from this login session), the events share a correlation ID in metadata set by the command path. The projector correlates on metadata, not by rebuilding aggregate state.
    4. **Keep the projection schema denormalized.** Don't store foreign keys back to aggregates. Store whatever denormalized data the queries need. If the underlying aggregate changes, the denormalized snapshot in the projection is a point-in-time record, not a live reference.
    5. **Make the projection rebuildable.** A new projection starts from global position zero and processes the entire log. This must be feasible -- hence the rebuild performance work covered earlier. If the log is 2 billion events, partitioning the rebuild across workers is mandatory.
    6. **Version the projection schema.** The projection table has a `schema_version` column. When the schema changes, the rebuild writes new rows with the new version and an atomic swap updates readers.

    **Real-World Example**

    Walmart's e-commerce team, in their 2019 QCon talk on CQRS at scale, described adding a "search-ready product projection" that joined `ProductCreated`, `PriceAdjusted`, `InventoryUpdated`, and `ReviewPosted` events across four different aggregates. They consumed only the public events (documented in an internal schema registry) and populated an Elasticsearch-backed projection. Crucially, the projector did not call back to the aggregates' services; it only ran from the event log, which meant search performance was independent of the write-side's load.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: What if one of the four event types was designed before you had a correlation ID standard and doesn't carry one?**

      **Strong Answer:** Upcast. Add a metadata-only upcaster that infers the correlation ID from other fields (timestamp proximity, user ID, session ID from a lookup table) and injects it at read time. The stored event is unchanged; the projector sees events that appear to have always had the correlation ID. This is a legitimate use of the upcaster pattern -- evolving not just the payload but the metadata surface over time.
    </Note>

    <Note>
      **Follow-up 2: The projection table is in PostgreSQL but a new use case wants to run complex text search across the same data. Do you build a second projection or extend the first?**

      **Strong Answer:** Second projection, always. Different read models need different storage engines (Elasticsearch for search, Redis for hot lookups, ClickHouse for analytics). A single PostgreSQL projection trying to serve all three will fail at one of them. The cost of an additional projection is a projector process and some storage; the benefit is each projection uses the right tool for its queries. CQRS at its best.
    </Note>

    <Note>
      **Follow-up 3: How do you handle the case where two events in different aggregates have the same `correlation_id` but arrive in the projector out of order (event B from aggregate 2 arrives before event A from aggregate 1)?**

      **Strong Answer:** Projections must be tolerant of out-of-order arrival across streams -- you only get ordering guarantees within a single stream. Design the projection handlers so that late-arriving events can update rows already present, or use a "waiting" table for partial correlations that complete later. For strict cross-stream ordering requirements, serialize the events you care about into a single "causal stream" via an upstream orchestrator; this is rare and expensive, so avoid it when possible.
    </Note>

    **Common Wrong Answers**

    * *"The projector should call the aggregates' query APIs to enrich events."* This couples the projector to the aggregates' runtime availability and re-introduces the N+1 problem you were trying to avoid with event sourcing.
    * *"Store the full aggregate snapshot in each event."* Events balloon in size, storage costs explode, and the aggregate's internal shape becomes baked into the event -- the opposite of decoupling.

    **Further Reading**

    * Greg Young, "CQRS and Event Sourcing" (Code on the Beach 2014) -- projections as first-class citizens.
    * Walmart Labs, "CQRS on a 100-Service Platform" QCon 2019 talk -- real-world multi-aggregate projections.
    * Confluent, "Streaming Joins with Kafka Streams" -- the exact problem solved in a Kafka-native style.
  </Accordion>
</AccordionGroup>
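
The out-of-order tolerance described in the last follow-up can be sketched in memory. This is an illustration under stated assumptions, not a production projector: the `region` and `total_cents` payload fields and the `ActivityRow` shape are hypothetical, and a real projection would keep the rows in a database table. Each handler fills in only its own columns on a row keyed by correlation ID, so arrival order across streams does not change the result:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ActivityRow:
    correlation_id: str
    region: Optional[str] = None   # filled in by UserLoggedIn
    purchase_cents: int = 0        # accumulated by UserPurchased
    applied: set = field(default_factory=set)  # event IDs already applied

    @property
    def complete(self) -> bool:
        # A row is "waiting" until both sides of the join have arrived.
        return self.region is not None and self.purchase_cents > 0


class ActivityProjector:
    def __init__(self) -> None:
        self.rows = {}

    def handle(self, event: dict) -> None:
        cid = event["metadata"]["correlation_id"]
        # Create the row on first sight, whichever event type arrives first.
        row = self.rows.setdefault(cid, ActivityRow(cid))
        if event["id"] in row.applied:
            return  # duplicate delivery
        row.applied.add(event["id"])
        if event["event_type"] == "UserLoggedIn":
            row.region = event["event_data"]["region"]
        elif event["event_type"] == "UserPurchased":
            row.purchase_cents += event["event_data"]["total_cents"]
```

Queries that need the full join filter on `complete`; partial rows play the role of the "waiting" table until the missing event shows up.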

***

## Event Versioning & Schema Evolution

Schema evolution is the Achilles heel of event sourcing, and the reason many teams abandon it. The core tension: events are immutable facts, but your understanding of those facts evolves over time. You added a `currency` field because the business went international. You split `name` into `firstName` and `lastName` because GDPR required granular data handling. Now you have 200 million events in production with the old shape, and your domain code wants to work with the new shape.

The wrong solutions: (1) migrate old events in place -- this breaks immutability, destroys audit trails, and is operationally terrifying; (2) branch your domain code with `if event.version === 1 ... else ...` -- this turns your aggregate into a museum of historical schemas and becomes unmaintainable within six months; (3) ignore the problem and let old events fail validation -- congrats, you just broke rehydration for every aggregate that existed before the change.

The right solution is **upcasting**: store events exactly as they were written, but apply a chain of transformations when reading them so that domain code only ever sees the latest shape. The transformation chain (v1 → v2 → v3) lives in one place, is unit-testable in isolation, and can be reviewed by anyone asking "how did we evolve this event?" This preserves the sacred invariant (stored events are immutable) while still letting the domain evolve freely. The cost is a small runtime overhead per read, and the discipline of always versioning new events so the chain knows where to start.

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    EVENT SCHEMA EVOLUTION                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  PROBLEM: Events are immutable, but requirements change                     │
│                                                                              │
│  V1: MoneyDeposited { amount: 100 }                                         │
│  V2: MoneyDeposited { amount: 100, currency: 'USD' }  ← New field           │
│  V3: MoneyDeposited { amount: { value: 100, currency: 'USD' } }  ← Changed  │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════   │
│                                                                              │
│  SOLUTION: UPCASTING                                                        │
│  ─────────────────────                                                      │
│                                                                              │
│  Store original event → Transform when reading                              │
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  Event Store                     Upcaster                 App       │   │
│  │  ┌──────────────┐               ┌──────────────┐       ┌────────┐  │   │
│  │  │ V1 Event     │──────────────▶│ v1 → v2 → v3 │──────▶│ V3     │  │   │
│  │  └──────────────┘               └──────────────┘       └────────┘  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
│  Events stay immutable, transformation happens at read time                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```

### Upcasting Implementation

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // event-upcaster.js - Event schema evolution

    class EventUpcaster {
      constructor() {
        this.upcasters = new Map();
      }

      // Register an upcaster for an event type and version
      register(eventType, fromVersion, toVersion, upcasterFn) {
        const key = `${eventType}:${fromVersion}`;
        this.upcasters.set(key, {
          toVersion,
          transform: upcasterFn
        });
      }

      // Upcast event to latest version
      upcast(event) {
        let currentEvent = { ...event };
        let currentVersion = event.metadata?.version || 1;
        
        while (true) {
          const key = `${event.event_type}:${currentVersion}`;
          const upcaster = this.upcasters.get(key);
          
          if (!upcaster) break; // No more upcasts needed
          
          currentEvent = {
            ...currentEvent,
            event_data: upcaster.transform(currentEvent.event_data),
            metadata: {
              ...currentEvent.metadata,
              version: upcaster.toVersion,
              originalVersion: event.metadata?.version || 1
            }
          };
          currentVersion = upcaster.toVersion;
        }
        
        return currentEvent;
      }
    }

    // Register upcasters
    const upcaster = new EventUpcaster();

    // MoneyDeposited v1 → v2 (add currency)
    upcaster.register('MoneyDeposited', 1, 2, (data) => ({
      ...data,
      currency: 'USD' // Default to USD for old events
    }));

    // MoneyDeposited v2 → v3 (restructure amount)
    upcaster.register('MoneyDeposited', 2, 3, (data) => ({
      amount: {
        value: data.amount,
        currency: data.currency
      },
      description: data.description,
      timestamp: data.timestamp
    }));

    // UserRegistered v1 → v2 (split name into firstName/lastName)
    upcaster.register('UserRegistered', 1, 2, (data) => {
      // Destructure `name` out so the old field is actually removed,
      // rather than left behind as a key set to undefined.
      const { name, ...rest } = data;
      const [firstName, ...lastParts] = (name || '').split(' ');
      return {
        ...rest,
        firstName: firstName || '',
        lastName: lastParts.join(' ')
      };
    });

    // Event store with upcasting
    class UpcastingEventStore {
      constructor(eventStore, upcaster) {
        this.eventStore = eventStore;
        this.upcaster = upcaster;
      }

      async readStream(streamId, fromVersion = 0) {
        const events = await this.eventStore.readStream(streamId, fromVersion);
        return events.map(e => this.upcaster.upcast(e));
      }

      async readAllEvents(fromPosition = 0, batchSize = 1000) {
        const events = await this.eventStore.readAllEvents(fromPosition, batchSize);
        return events.map(e => this.upcaster.upcast(e));
      }

      // Write methods don't upcast (always write latest version)
      async appendToStream(streamId, streamType, events, expectedVersion) {
        // Add version metadata to new events
        const versionedEvents = events.map(e => ({
          ...e,
          metadata: {
            ...e.metadata,
            version: this.getLatestVersion(e.type)
          }
        }));
        
        return this.eventStore.appendToStream(
          streamId, streamType, versionedEvents, expectedVersion
        );
      }

      getLatestVersion(eventType) {
        // Return the latest version for each event type
        const versions = {
          'MoneyDeposited': 3,
          'MoneyWithdrawn': 2,
          'UserRegistered': 2
        };
        return versions[eventType] || 1;
      }
    }

    module.exports = { EventUpcaster, UpcastingEventStore };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # event_upcaster.py - Event schema evolution via upcasting chain.

    from __future__ import annotations

    from copy import deepcopy
    from dataclasses import dataclass
    from typing import Any, Callable

    from event_store import EventStore, NewEvent, StoredEvent

    UpcasterFn = Callable[[dict[str, Any]], dict[str, Any]]


    @dataclass(frozen=True)
    class UpcasterEntry:
        to_version: int
        transform: UpcasterFn


    class EventUpcaster:
        def __init__(self) -> None:
            self._upcasters: dict[tuple[str, int], UpcasterEntry] = {}
            self._latest_versions: dict[str, int] = {}

        def register(
            self, event_type: str, from_version: int, to_version: int,
            fn: UpcasterFn,
        ) -> None:
            self._upcasters[(event_type, from_version)] = UpcasterEntry(to_version, fn)
            # Track the highest known version per type.
            current_latest = self._latest_versions.get(event_type, 1)
            self._latest_versions[event_type] = max(current_latest, to_version)

        def latest_version(self, event_type: str) -> int:
            return self._latest_versions.get(event_type, 1)

        def upcast(self, event: StoredEvent) -> StoredEvent:
            data = deepcopy(event.event_data)
            metadata = deepcopy(event.metadata)
            version = metadata.get("version", 1)

            while True:
                entry = self._upcasters.get((event.event_type, version))
                if entry is None:
                    break
                data = entry.transform(data)
                metadata = {
                    **metadata,
                    "version": entry.to_version,
                    "original_version": event.metadata.get("version", 1),
                }
                version = entry.to_version

            return StoredEvent(
                id=event.id,
                stream_id=event.stream_id,
                stream_type=event.stream_type,
                event_type=event.event_type,
                event_data=data,
                metadata=metadata,
                version=event.version,
                created_at=event.created_at,
            )


    # -- Register upcasters -----------------------------------------------------

    upcaster = EventUpcaster()


    def _money_deposited_v1_to_v2(data: dict[str, Any]) -> dict[str, Any]:
        return {**data, "currency": "USD"}  # Default for legacy events.


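    def _money_deposited_v2_to_v3(data: dict[str, Any]) -> dict[str, Any]:
        # Use .get() for optional fields so a legacy payload that never
        # carried them upcasts cleanly instead of raising KeyError.
        return {
            "amount": {"value": data["amount"], "currency": data["currency"]},
            "description": data.get("description", ""),
            "timestamp": data.get("timestamp"),
        }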


    def _user_registered_v1_to_v2(data: dict[str, Any]) -> dict[str, Any]:
        first, *rest = (data.get("name") or "").split(" ", 1)
        out = {**data, "first_name": first, "last_name": rest[0] if rest else ""}
        out.pop("name", None)
        return out


    upcaster.register("MoneyDeposited", 1, 2, _money_deposited_v1_to_v2)
    upcaster.register("MoneyDeposited", 2, 3, _money_deposited_v2_to_v3)
    upcaster.register("UserRegistered", 1, 2, _user_registered_v1_to_v2)


    # -- Decorator store: upcasts on read, stamps version on write -------------

    class UpcastingEventStore:
        """Wraps an EventStore, upcasting on every read and stamping version
        metadata on every write so future versions can find their starting point.
        """

        def __init__(self, inner: EventStore, upcaster: EventUpcaster) -> None:
            self._inner = inner
            self._upcaster = upcaster

        async def read_stream(self, stream_id: str, from_version: int = 0):
            events = await self._inner.read_stream(stream_id, from_version)
            return [self._upcaster.upcast(e) for e in events]

        async def read_all_events(self, from_position: int = 0, batch_size: int = 1000):
            events = await self._inner.read_all_events(from_position, batch_size)
            return [self._upcaster.upcast(e) for e in events]

        async def append_to_stream(
            self, stream_id: str, stream_type: str,
            events: list[NewEvent], expected_version: int | None = None,
        ):
            versioned = [
                NewEvent(
                    type=e.type,
                    data=e.data,
                    metadata={**e.metadata,
                              "version": self._upcaster.latest_version(e.type)},
                )
                for e in events
            ]
            return await self._inner.append_to_stream(
                stream_id, stream_type, versioned, expected_version
            )
    ```
  </Tab>
</Tabs>
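
To see the chain run end to end without the `event_store` module, here is a self-contained sketch on plain dicts. It mirrors the two `MoneyDeposited` steps above. The module-level `CHAIN` registry and the registration guard are illustrative additions, not part of the implementation in the tabs:

```python
from typing import Any, Callable

Payload = dict[str, Any]
Step = tuple[int, Callable[[Payload], Payload]]

# (event_type, from_version) -> (to_version, transform)
CHAIN: dict[tuple[str, int], Step] = {}


def register(event_type: str, from_version: int, to_version: int,
             fn: Callable[[Payload], Payload]) -> None:
    # A step that does not move the version forward would make the
    # read-time loop spin forever, so reject it at registration.
    if to_version <= from_version:
        raise ValueError(
            f"{event_type}: {from_version} -> {to_version} does not advance"
        )
    CHAIN[(event_type, from_version)] = (to_version, fn)


def upcast(event_type: str, version: int, data: Payload) -> tuple[int, Payload]:
    data = dict(data)  # shallow copy; the stored payload is left untouched
    while (step := CHAIN.get((event_type, version))) is not None:
        to_version, fn = step
        data = fn(data)
        version = to_version
    return version, data


register("MoneyDeposited", 1, 2, lambda d: {**d, "currency": "USD"})
register("MoneyDeposited", 2, 3, lambda d: {
    "amount": {"value": d["amount"], "currency": d["currency"]},
    "description": d.get("description", ""),
    "timestamp": d.get("timestamp"),
})

stored_v1 = {"amount": 100, "timestamp": "2019-03-01T12:00:00Z"}
version, payload = upcast("MoneyDeposited", 1, stored_v1)
print(version, payload)
```

The v1 payload comes out at version 3 with the nested `amount` object, while `stored_v1` itself is unchanged. Rejecting non-advancing steps at registration is a cheap way to keep a typo from turning the `while` loop into an infinite one.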

<Warning>
  **Caveats and Common Pitfalls for Schema Evolution**

  1. **Schema changes break old consumers silently.** A subscriber or projection was deployed three years ago and nobody remembers it exists. You rename a field and the upcaster shields every consumer that reads through it, but that forgotten consumer reads events directly and now breaks in a way that only appears at 3 AM on a quarterly batch run.
  2. **The upcaster chain becomes a graveyard of obsolete transformations.** After five years, you have a 17-step upcaster chain where steps 3, 7, 9, and 14 handle events nobody has produced in years. Applied to every event read, this is CPU waste and a debugging nightmare.
  3. **Branching domain logic on event version.** Instead of upcasting once on read, you litter the aggregate with `if event.version === 1 do X else do Y`. Within a year, the aggregate is 60% historical version handling.
  4. **Forgetting to unit-test upcasters against real old events.** The upcaster works on synthetic v1 events in tests. In production, a v1 event from 2019 has a field nobody remembered (set by a code path that was deleted), and the upcaster crashes on it. You discover this during projection rebuild.
  5. **Treating a semantic change as a cosmetic one.** Renaming `amount` to `amount_cents` and multiplying by 100 in the upcaster is a behavior change disguised as a schema change. If any downstream consumer was interpreting the old field differently, you have silently corrupted data.
</Warning>

<Tip>
  **Solutions and Patterns**

  * **Prefer additive changes.** Adding a new nullable field is backward-compatible -- old consumers ignore it, new consumers handle it with defaults. Removing or renaming is breaking. Design your events to minimize removals.
  * **Upcast once, at the edge.** The repository/read layer upcasts events before handing them to domain code. The domain code only sees the latest version. No conditionals in the aggregate.
  * **Retire upcasters by squashing.** Every few years, run a batch job that reads all events through the current upcaster chain and writes them back as v-latest to a new event store. Delete the old upcasters. This is operationally expensive; plan it for a scheduled maintenance window.
  * **Test upcasters against production event samples.** Pull 10k random events from production, run them through the upcaster, verify the output matches the latest schema. This catches the "forgotten field" class of bugs.
  * **Version the aggregate's expected schema, not just the event.** Record in metadata "this event was produced by aggregate-schema-v5" so the upcaster can find the right starting point after refactors.
</Tip>
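
Pitfall 5 and the sample-testing tip can be combined in one sketch. It assumes a hypothetical v1 payload whose `amount` is in dollars and a v2 payload with integer `amount_cents`; the function names and the half-up default are illustrative choices, and the actual rounding rule is a business decision. The unit change uses `Decimal` with an explicit rounding mode, and a small checker runs the upcaster over sampled payloads and collects failures instead of crashing:

```python
from decimal import ROUND_HALF_EVEN, ROUND_HALF_UP, Decimal


def dollars_to_cents(amount, rounding=ROUND_HALF_UP) -> int:
    # Go through str() so a float such as 10.005 is read as the decimal
    # literal "10.005" rather than its binary approximation.
    cents = (Decimal(str(amount)) * 100).quantize(Decimal("1"), rounding=rounding)
    return int(cents)


def amount_v1_to_v2(data: dict) -> dict:
    # Hypothetical upcaster: drop `amount` (dollars), add `amount_cents` (integer).
    out = {k: v for k, v in data.items() if k != "amount"}
    out["amount_cents"] = dollars_to_cents(data["amount"])
    return out


def check_samples(samples: list) -> list:
    """Run the upcaster over sampled v1 payloads and collect (sample, reason) failures."""
    failures = []
    for sample in samples:
        try:
            result = amount_v1_to_v2(sample)
        except Exception as exc:  # a missing or malformed field lands here
            failures.append((sample, repr(exc)))
            continue
        if result["amount_cents"] < 0:
            failures.append((sample, "negative amount_cents"))
    return failures
```

With these definitions, `dollars_to_cents("10.005")` gives 1001 under half-up and 1000 under `ROUND_HALF_EVEN`, which is why the rounding mode belongs in the signature rather than buried in the arithmetic. Feeding `check_samples` a batch pulled from production is one way to surface the forgotten-field cases before a projection rebuild does.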

<AccordionGroup>
  <Accordion title="Scenario: You need to rename `OrderPlaced.total` to `OrderPlaced.total_cents` and change its unit from dollars to cents (so $10.00 becomes 1000). 180 million old events exist. How do you ship this safely?">
    **Strong Answer Framework**

    1. **Stop and confirm this is the right design.** Renaming and changing semantics in one step is asking for trouble. Two separate changes are safer: first rename the field, then later change the unit. Ship them weeks apart so any wrong assumptions surface in the first rollout rather than compounding in the second.
    2. **Freeze the v1 schema in writing.** Document that all old events are "v1: total in dollars as decimal." This is the ground truth the upcaster will transform from.
    3. **Introduce `OrderPlaced_v2` with `total_cents: integer`.** New events are written in v2. The upcaster converts v1 `total` (dollars, decimal) to v2 `total_cents` (integer, cents) by multiplying by 100 and rounding. Rounding choice is explicit and documented.
    4. **Stage the rollout.** Deploy the upcaster to read paths first; verify rebuild of one small projection produces correct values (spot-check 100 events manually). Then deploy the write-path change so new events are v2.
    5. **Run both in parallel for a week.** Two projections -- one reading v1-shaped events (legacy), one reading v2-shaped events (new). Diff their outputs on the same stream. Any discrepancy indicates an upcaster bug.
    6. **Notify downstream consumers.** If Kafka or a webhook publishes these events, consumers need to know about the schema change. Preferably dual-publish for a transition period: send both v1 and v2 on the bus, consumers migrate at their own pace, v1 is retired when usage drops to zero.
    7. **Only after verification, flip read-side defaults.** New code reads v2-shape only. The upcaster is doing its job. Old code paths that read v1 directly are found via grep and fixed.

    **Real-World Example**

    In 2017, Stripe documented their approach to an amount/currency refactor in a blog post about API versioning. They moved from "amount in a currency-implicit unit" to "amount in smallest currency unit" (cents for USD, yen for JPY) across their webhook event history. The rollout was API-versioned: consumers subscribing as `v1` saw events in the old shape, `v2` subscribers saw the new shape, and both shapes were served from the same underlying events via transformation at the edge. The transformation logic was \~150 lines, had 98% unit test coverage, and was left in place for 3 years until the last v1 subscriber migrated.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: What if during the dual-publishing period, a consumer processes both v1 and v2 and double-charges a customer?**

      **Strong Answer:** The dual-publish must include an event identity -- same UUID in v1 and v2 representations. Consumers that might process both must dedupe on event UUID before applying. Alternatively, route v1 and v2 to different topics so consumers subscribe to exactly one. This is a coordination problem with downstreams; plan the communication as carefully as the code.
    </Note>

    <Note>
      **Follow-up 2: Your rounding (multiply by 100) introduces floating-point error for some legacy values. How do you handle a \$10.005 that becomes 1001 instead of 1000?**

      **Strong Answer:** This is exactly why the separate-rename-before-unit-change advice matters -- it surfaces the issue earlier. For the rounding choice, document the rule (banker's rounding, floor, ceiling) and apply it consistently. For any ambiguous values, emit a one-time migration event (`TotalConverted {from_dollars: 10.005, to_cents: 1001, reason: "half-up"}`) as an audit trail. Rounding is a business decision, not a technical one -- get finance sign-off before shipping.
    </Note>

    <Note>
      **Follow-up 3: The upcaster is now 50ms per event on cold reads. Rebuilding a projection is 10x slower than it used to be. What do you do?**

      **Strong Answer:** Two options. First, squash old events into their v-latest form via a one-time migration: rewrite the physical event store with the upcasted payloads stored directly, then retire the upcaster. This is expensive and mildly weakens the "events are immutable" property (the original bytes are gone), but it's operationally clean. Second, cache upcaster outputs in a side table keyed by event ID so repeat reads skip the transformation. This preserves immutability but doubles storage for hot events.
    </Note>

    **Common Wrong Answers**

    * *"We'll update all the old events in place with an UPDATE statement."* This destroys the immutability guarantee. Every subscriber that checked checksums, every hash chain, every offline backup now disagrees with production.
    * *"We'll write a new service that owns the new schema and deprecate the old one."* Creates two sources of truth, doubles operational cost, and migration becomes a multi-year project. Upcasting in-place is the pattern for a reason.

    **Further Reading**

    * Stripe Engineering, "API Upgrades" (2017) -- the exact pattern applied to webhook schemas.
    * Greg Young, "Versioning in an Event Sourced System" (2017 PDF book) -- book-length treatment of this problem.
    * Martin Fowler, "Event Interception" -- taxonomy of where to place transformations in the event pipeline.
  </Accordion>

  <Accordion title="Scenario: You inherited an event-sourced system that has 9 event types, each with between 2 and 5 schema versions, and no documentation. The upcaster is a 400-line function. How do you get it under control?">
    **Strong Answer Framework**

    1. **Reverse-engineer the schemas from production samples.** For each event type, pull 1000 events spanning the lifetime of the system. Extract the distinct JSON schemas (via tools like `quicktype` or a simple field-presence analyzer). You now have a map of what "v1" actually looks like in production versus what the code assumes.
    2. **Document the upcaster chain.** For each upcaster step, write a paragraph: what changed, why, when. If the reason is "the product team renamed the field," note the PR that did it. If the reason is lost, mark it "unknown origin -- do not remove without investigation." This is 80% of the value, generated in a few afternoons.
    3. **Add contract tests.** For each event type at each version, add a test that produces a canonical sample event, runs it through the upcaster, and asserts the output matches the current domain type. Fail the build on any upcaster that does not pass -- you now have a regression safety net.
    4. **Identify "dead" upcasters.** An upcaster handling v1 → v2 of an event type that hasn't been produced since 2019 -- check whether any v1 events still exist in the store. If not, the upcaster is dead code and can be removed (with a note in the commit that says so).
    5. **Plan a compaction window.** Schedule a migration to rewrite ancient events in their v-latest shape so upcasters for the oldest versions can retire. Do this only after contract tests are in place and only during low-traffic windows.
    6. **Institutionalize the discipline.** Every new event version gets its own migration function with a clear name (`money_deposited_v2_to_v3_add_currency`), a documented reason, and a test. The 400-line function becomes a directory of small, testable functions.
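
    Step 6 might look like the following minimal Python sketch. The registry, the decorator, and the v1 → v2 migration are hypothetical names invented for illustration (only `money_deposited_v2_to_v3_add_currency` comes from the text above), and the default currency is an assumption, not a recommendation.

    ```python
    from __future__ import annotations

    from typing import Callable

    Upcaster = Callable[[dict], dict]

    # (event_type, from_version) -> function that lifts the payload one version up
    UPCASTERS: dict[tuple[str, int], Upcaster] = {}


    def upcaster(event_type: str, from_version: int):
        """Register a single-step migration for one event type and version."""
        def register(fn: Upcaster) -> Upcaster:
            UPCASTERS[(event_type, from_version)] = fn
            return fn
        return register


    @upcaster("MoneyDeposited", 1)
    def money_deposited_v1_to_v2_rename_amount(data: dict) -> dict:
        # Hypothetical reason: v1 stored the amount under "value".
        out = dict(data)
        out["amount"] = out.pop("value")
        return out


    @upcaster("MoneyDeposited", 2)
    def money_deposited_v2_to_v3_add_currency(data: dict) -> dict:
        # Hypothetical reason: v3 made currency explicit; old events are assumed USD.
        return {**data, "currency": data.get("currency", "USD")}


    def upcast(event: dict) -> dict:
        """Apply registered steps in order until no further migration exists."""
        version, data = event["version"], event["data"]
        while (event["type"], version) in UPCASTERS:
            data = UPCASTERS[(event["type"], version)](data)
            version += 1
        return {**event, "version": version, "data": data}


    # Contract test: a canonical v1 sample must come out in the current shape.
    v1_sample = {"type": "MoneyDeposited", "version": 1, "data": {"value": 50}}
    upgraded = upcast(v1_sample)
    assert upgraded["version"] == 3
    assert upgraded["data"] == {"amount": 50, "currency": "USD"}
    ```

    Each migration is a pure function with its reason recorded next to it, so retiring a dead upcaster becomes a one-function deletion rather than surgery on a 400-line function.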

    **Real-World Example**

    An engineering manager at Klarna gave a talk at DomainDrivenDesignEurope 2022 describing inheriting exactly this scenario: an event-sourced payment system with 12 event types, 37 schema versions, and no docs. Over 6 months, her team extracted 400 lines of upcasting into 37 named functions, added a contract test suite that ran against a production event sample, and retired 9 upcasters by compacting old events. Key insight from the talk: the hardest part was not the code but documenting why each transformation existed -- that required interviewing former team members and reconstructing intent from commit messages.

    **Senior Follow-up Questions**

    <Note>
      **Follow-up 1: You find two upcaster steps for the same version transition, both active, doing subtly different things. Which one is right?**

      **Strong Answer:** Neither, probably. Load a sample event, run both, compare outputs on a representative set. If they agree, pick the one that's clearer and delete the other. If they disagree, a production projection is already using one of them -- trace which and reason about whether the behavior is business-correct. This is exactly why contract tests are step 3 of the framework: they surface these conflicts before you touch the code.
    </Note>

    <Note>
      **Follow-up 2: A team member wants to delete an "obviously dead" upcaster. How do you decide whether it's safe?**

      **Strong Answer:** Query the event store: `SELECT COUNT(*) FROM events WHERE event_type = 'OrderPlaced' AND metadata->>'schema_version' = '1'`. If the count is zero, the upcaster is dead. If it's nonzero, the upcaster is live and removal would break reads of those events. For borderline cases (100 events still v1), you either migrate those 100 events to v-latest explicitly or keep the upcaster. Don't ever delete an upcaster without that query.
    </Note>

    <Note>
      **Follow-up 3: The upcaster has a bug that's been shipping wrong values for a year. Fixing it changes every historical projection rebuild. How do you deploy the fix?**

      **Strong Answer:** Staged. First, fix the upcaster on a feature branch. Second, build a new projection in staging using the fixed upcaster and diff it against production. Quantify the drift: how many rows change, by how much, for which accounts. Third, decide if the drift is acceptable to deploy silently (usually not, because finance teams notice) or requires a formal disclosure. Fourth, if disclosure is needed, coordinate with legal/finance/comms before deployment. Fifth, ship the fix and rebuild all dependent projections. Sixth, issue any required user-facing corrections. Rollback plan: keep the old upcaster in a feature flag for 30 days so you can revert if a worse bug surfaces.
    </Note>

    **Common Wrong Answers**

    * *"Rewrite the whole upcaster from scratch."* Dangerous without contract tests; you will lose subtle business logic encoded in the current one.
    * *"Leave it alone, it works."* Technical debt compounds. The next schema change will be twice as hard.

    **Further Reading**

    * Vaughn Vernon, *Implementing Domain-Driven Design*, Chapter 8 (Domain Events) -- event evolution principles.
    * Klarna Engineering Blog, "Taming the Event Store" (2022) -- the real inherited-codebase war story.
    * Eric Evans, "Domain-Driven Design Reference" -- vocabulary for discussing event schemas with business stakeholders.
  </Accordion>
</AccordionGroup>

***

## CQRS with Event Sourcing

CQRS stands for Command Query Responsibility Segregation -- a pattern that says "the model you use to change state should be different from the model you use to read state." Event sourcing and CQRS are often discussed together because they reinforce each other: event sourcing gives you a natural write model (aggregates emitting events) and a natural way to build read models (projections subscribing to events). You can do CQRS without event sourcing (split writes and reads in a traditional relational DB), and you can do event sourcing without CQRS (queries that load and replay aggregates). But together they're a coherent architectural style.

The "why" to internalize: in a traditional CRUD service, your ORM entity is simultaneously the write model (the thing the business logic mutates) and the read model (the thing you return from GET endpoints). This tension leads to ugly compromises -- you add a field for reads that has no business meaning, you denormalize for query performance at the cost of write complexity, you add joins that slow mutations. CQRS says: stop contorting one model. Use aggregates with rich domain logic for writes, and flat, query-optimized tables for reads, connected asynchronously via events.

For microservices specifically, CQRS gets more interesting because each side can scale independently. The write side (where business invariants must be enforced) might be a small, carefully guarded service with strong consistency. The read side (where projections serve dashboards, search, and reports) can be sharded, replicated, or spun up in multiple regions for latency, without affecting write-side correctness. And since read models are disposable, you can add regional read replicas without migrating any production data -- just replay events into new projections.

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    CQRS + EVENT SOURCING                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                              ┌─────────────────────┐                        │
│                              │      Client         │                        │
│                              └──────────┬──────────┘                        │
│                                         │                                   │
│                    ┌────────────────────┴────────────────────┐              │
│                    │                                         │              │
│                    ▼                                         ▼              │
│           ┌────────────────┐                      ┌────────────────┐       │
│           │ Command Side   │                      │  Query Side    │       │
│           │ (Write)        │                      │  (Read)        │       │
│           └───────┬────────┘                      └────────┬───────┘       │
│                   │                                        │                │
│           ┌───────▼────────┐                      ┌────────▼───────┐       │
│           │ Command Handler│                      │ Query Handler  │       │
│           └───────┬────────┘                      └────────┬───────┘       │
│                   │                                        │                │
│           ┌───────▼────────┐                      ┌────────▼───────┐       │
│           │  Aggregates    │                      │  Read Models   │       │
│           │  (Domain Logic)│                      │  (Projections) │       │
│           └───────┬────────┘                      └────────▲───────┘       │
│                   │                                        │                │
│           ┌───────▼────────┐         events        ┌───────┴───────┐       │
│           │  Event Store   │──────────────────────▶│  Projector    │       │
│           │  (Append-only) │                       │               │       │
│           └────────────────┘                       └───────────────┘       │
│                                                                              │
│  WRITE PATH: Command → Aggregate → Events → Event Store                    │
│  READ PATH: Query → Read Model (optimized for queries)                     │
│  SYNC: Event Store → Projector → Read Models (eventually consistent)       │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```
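
To make the three paths in the diagram concrete, here is a minimal in-memory sketch in Python. It is an illustration under simplifying assumptions, not this chapter's implementation: the command handler and aggregate are collapsed into one function, the projector is invoked by hand instead of subscribing to the store, and the names `handle_deposit`, `run_projector`, and `query_balance` are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict

event_store: list[dict] = []                  # append-only log (write side)
balances: dict[str, int] = defaultdict(int)   # read model (projection)
checkpoint = 0                                # position of the last projected event


def handle_deposit(account_id: str, amount: int) -> dict:
    """WRITE PATH: validate the command, append an event, return an acknowledgement."""
    if amount <= 0:
        raise ValueError("Deposit must be positive")
    event_store.append(
        {"type": "MoneyDeposited", "account_id": account_id, "amount": amount}
    )
    return {"account_id": account_id, "accepted": True}


def run_projector() -> None:
    """SYNC: fold every event after the checkpoint into the read model."""
    global checkpoint
    for event in event_store[checkpoint:]:
        if event["type"] == "MoneyDeposited":
            balances[event["account_id"]] += event["amount"]
    checkpoint = len(event_store)


def query_balance(account_id: str) -> int:
    """READ PATH: answer from the read model only; never touch the event log."""
    return balances.get(account_id, 0)


handle_deposit("acc-1", 100)
stale = query_balance("acc-1")   # 0: the projector has not run yet
run_projector()
fresh = query_balance("acc-1")   # 100: the read model has caught up
```

The query issued between the write and the projector run returns the old value. That gap is the eventual-consistency window the rest of this section is concerned with.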

### Complete CQRS Implementation

A critical design choice here: commands return "acknowledgement that the command succeeded" and **not** the resulting read-side state. If your `deposit` endpoint tries to return the updated balance by reading from the projection, you've introduced a race condition -- the projection may not have processed the event yet. Either return the write-side balance (which is the aggregate's in-memory state, guaranteed current) or return just the acknowledgement and let the client refresh. Mixing the two creates subtle bugs that appear only under load.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // cqrs-service.js - CQRS + Event Sourcing service

    const express = require('express');
    const { EventStore } = require('./event-store');
    const { EventSourcedRepository } = require('./repository');
    const { BankAccount } = require('./aggregate');
    const {
      ProjectionManager,
      AccountBalanceProjection,
      TransactionHistoryProjection
    } = require('./projections');

    class BankAccountService {
      constructor(db) {
        this.eventStore = new EventStore(db);
        this.repository = new EventSourcedRepository(
          this.eventStore,
          'BankAccount',
          BankAccount
        );
        
        // Read models
        this.balanceProjection = new AccountBalanceProjection(db);
        this.transactionProjection = new TransactionHistoryProjection(db);
        
        this.db = db;
      }

      async initialize() {
        await this.eventStore.initialize();
        await this.balanceProjection.initialize();
        await this.transactionProjection.initialize();
        
        // Start projection processing
        const projectionManager = new ProjectionManager(this.eventStore, this.db);
        projectionManager.register('account-balances', this.balanceProjection);
        projectionManager.register('transactions', this.transactionProjection);
        await projectionManager.start();
      }

      // COMMAND HANDLERS (Write Side)
      
      async openAccount(command) {
        const { accountId, ownerId, initialDeposit } = command;
        
        // Check if account already exists
        const existing = await this.repository.getById(accountId);
        if (existing) {
          throw new Error('Account already exists');
        }
        
        // Create new account
        const account = BankAccount.open(accountId, ownerId, initialDeposit);
        await this.repository.save(account);
        
        return { accountId: account.id, balance: account.balance };
      }

      async deposit(command) {
        const { accountId, amount, description } = command;
        
        const account = await this.repository.getById(accountId);
        if (!account) {
          throw new Error('Account not found');
        }
        
        account.deposit(amount, description);
        await this.repository.save(account);
        
        return { accountId, newBalance: account.balance };
      }

      async withdraw(command) {
        const { accountId, amount, description } = command;
        
        const account = await this.repository.getById(accountId);
        if (!account) {
          throw new Error('Account not found');
        }
        
        account.withdraw(amount, description);
        await this.repository.save(account);
        
        return { accountId, newBalance: account.balance };
      }

      async closeAccount(command) {
        const { accountId, reason } = command;
        
        const account = await this.repository.getById(accountId);
        if (!account) {
          throw new Error('Account not found');
        }
        
        account.close(reason);
        await this.repository.save(account);
        
        return { accountId, status: 'closed' };
      }

      // QUERY HANDLERS (Read Side) - Use projections
      
      async getAccountBalance(accountId) {
        // Query from read model (fast, optimized)
        const result = await this.db.query(
          `SELECT * FROM account_balances WHERE account_id = $1`,
          [accountId]
        );
        
        if (result.rows.length === 0) {
          throw new Error('Account not found');
        }
        
        return result.rows[0];
      }

      async getTransactionHistory(accountId, limit = 50) {
        const result = await this.db.query(
          `SELECT * FROM transaction_history 
           WHERE account_id = $1 
           ORDER BY created_at DESC 
           LIMIT $2`,
          [accountId, limit]
        );
        
        return result.rows;
      }

      async getAllAccounts(ownerId) {
        const result = await this.db.query(
          `SELECT * FROM account_balances WHERE owner_id = $1`,
          [ownerId]
        );
        
        return result.rows;
      }

      async getAccountSummary() {
        const result = await this.db.query(`
          SELECT 
            COUNT(*) as total_accounts,
            SUM(balance) as total_balance,
            AVG(balance) as average_balance,
            COUNT(*) FILTER (WHERE status = 'active') as active_accounts,
            COUNT(*) FILTER (WHERE status = 'closed') as closed_accounts
          FROM account_balances
        `);
        
        return result.rows[0];
      }
    }

    // Express API
    const createApp = (service) => {
      const app = express();
      app.use(express.json());

      // Commands
      app.post('/accounts', async (req, res, next) => {
        try {
          const result = await service.openAccount(req.body);
          res.status(201).json(result);
        } catch (error) {
          next(error);
        }
      });

      app.post('/accounts/:id/deposit', async (req, res, next) => {
        try {
          const result = await service.deposit({
            ...req.body,
            // Path parameter wins over any accountId sent in the body
            accountId: req.params.id
          });
          res.json(result);
        } catch (error) {
          next(error);
        }
      });

      app.post('/accounts/:id/withdraw', async (req, res, next) => {
        try {
          const result = await service.withdraw({
            ...req.body,
            // Path parameter wins over any accountId sent in the body
            accountId: req.params.id
          });
          res.json(result);
        } catch (error) {
          next(error);
        }
      });

      app.post('/accounts/:id/close', async (req, res, next) => {
        try {
          const result = await service.closeAccount({
            ...req.body,
            accountId: req.params.id
          });
          res.json(result);
        } catch (error) {
          next(error);
        }
      });

      // Queries
      app.get('/accounts/:id', async (req, res, next) => {
        try {
          const balance = await service.getAccountBalance(req.params.id);
          res.json(balance);
        } catch (error) {
          next(error);
        }
      });

      app.get('/accounts/:id/transactions', async (req, res, next) => {
        try {
          const transactions = await service.getTransactionHistory(
            req.params.id,
            parseInt(req.query.limit, 10) || 50
          );
          res.json(transactions);
        } catch (error) {
          next(error);
        }
      });

      app.get('/summary', async (req, res, next) => {
        try {
          const summary = await service.getAccountSummary();
          res.json(summary);
        } catch (error) {
          next(error);
        }
      });

      return app;
    };

    module.exports = { BankAccountService, createApp };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # cqrs_service.py - CQRS + Event Sourcing with FastAPI
    # Commands go through aggregates (write model), queries hit projections
    # (read model). Notice how the command endpoints return in-memory aggregate
    # state, NOT projection state -- avoids the eventual-consistency race.

    from __future__ import annotations

    from contextlib import asynccontextmanager
    from typing import Any

    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel, Field
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

    from aggregate import BankAccount
    from event_store import EventStore
    from projections import (
        AccountBalanceProjection,
        DailySummaryProjection,
        ProjectionManager,
        TransactionHistoryProjection,
    )
    from repository import EventSourcedRepository, bank_account_event_factory


    # ----- Command / query DTOs ------------------------------------------------

    class OpenAccountCommand(BaseModel):
        account_id: str
        owner_id: str
        initial_deposit: float = Field(default=0, ge=0)


    class DepositCommand(BaseModel):
        amount: float = Field(gt=0)
        description: str = ""


    class WithdrawCommand(BaseModel):
        amount: float = Field(gt=0)
        description: str = ""


    class CloseCommand(BaseModel):
        reason: str = ""


    # ----- Service layer -------------------------------------------------------

    class BankAccountService:
        def __init__(self, engine: AsyncEngine) -> None:
            self._engine = engine
            self._store = EventStore(engine)
            self._repo: EventSourcedRepository[BankAccount] = EventSourcedRepository(
                self._store, "BankAccount", BankAccount, bank_account_event_factory,
            )
            self._balance_proj = AccountBalanceProjection(engine)
            self._tx_proj = TransactionHistoryProjection(engine)
            self._daily_proj = DailySummaryProjection(engine)
            self._manager: ProjectionManager | None = None

        async def initialize(self) -> None:
            await self._store.initialize()
            self._manager = ProjectionManager(self._store, self._engine)
            self._manager.register(self._balance_proj)
            self._manager.register(self._tx_proj)
            self._manager.register(self._daily_proj)
            await self._manager.start()

        # -- Command handlers ---------------------------------------------------

        async def open_account(self, cmd: OpenAccountCommand) -> dict[str, Any]:
            existing = await self._repo.get_by_id(cmd.account_id)
            if existing is not None:
                raise HTTPException(409, "Account already exists")
            account = BankAccount.open(cmd.account_id, cmd.owner_id, cmd.initial_deposit)
            await self._repo.save(account)
            return {"account_id": account.id, "balance": account.balance}

        async def deposit(self, account_id: str, cmd: DepositCommand) -> dict[str, Any]:
            account = await self._repo.get_by_id(account_id)
            if account is None:
                raise HTTPException(404, "Account not found")
            account.deposit(cmd.amount, cmd.description)
            await self._repo.save(account)
            return {"account_id": account_id, "new_balance": account.balance}

        async def withdraw(self, account_id: str, cmd: WithdrawCommand) -> dict[str, Any]:
            account = await self._repo.get_by_id(account_id)
            if account is None:
                raise HTTPException(404, "Account not found")
            try:
                account.withdraw(cmd.amount, cmd.description)
            except ValueError as exc:
                raise HTTPException(400, str(exc)) from exc
            await self._repo.save(account)
            return {"account_id": account_id, "new_balance": account.balance}

        async def close_account(self, account_id: str, cmd: CloseCommand) -> dict[str, Any]:
            account = await self._repo.get_by_id(account_id)
            if account is None:
                raise HTTPException(404, "Account not found")
            try:
                account.close(cmd.reason)
            except ValueError as exc:
                raise HTTPException(400, str(exc)) from exc
            await self._repo.save(account)
            return {"account_id": account_id, "status": "closed"}

        # -- Query handlers (projections) --------------------------------------

        async def get_account_balance(self, account_id: str) -> dict[str, Any]:
            async with self._engine.connect() as conn:
                row = (await conn.execute(
                    text("SELECT * FROM account_balances WHERE account_id = :aid"),
                    {"aid": account_id},
                )).first()
            if row is None:
                raise HTTPException(404, "Account not found")
            return dict(row._mapping)

        async def get_transaction_history(
            self, account_id: str, limit: int = 50,
        ) -> list[dict[str, Any]]:
            async with self._engine.connect() as conn:
                result = await conn.execute(
                    text("""
                        SELECT * FROM transaction_history
                        WHERE account_id = :aid
                        ORDER BY created_at DESC
                        LIMIT :lim
                    """),
                    {"aid": account_id, "lim": limit},
                )
                return [dict(row._mapping) for row in result]

        async def get_account_summary(self) -> dict[str, Any]:
            async with self._engine.connect() as conn:
                row = (await conn.execute(text("""
                    SELECT
                        COUNT(*) AS total_accounts,
                        SUM(balance) AS total_balance,
                        AVG(balance) AS average_balance,
                        COUNT(*) FILTER (WHERE status = 'active') AS active_accounts,
                        COUNT(*) FILTER (WHERE status = 'closed') AS closed_accounts
                    FROM account_balances
                """))).first()
            return dict(row._mapping) if row else {}


    # ----- FastAPI wiring ------------------------------------------------------

    def create_app(engine: AsyncEngine) -> FastAPI:
        service = BankAccountService(engine)

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            await service.initialize()
            yield

        app = FastAPI(lifespan=lifespan)

        @app.post("/accounts", status_code=201)
        async def open_account(cmd: OpenAccountCommand):
            return await service.open_account(cmd)

        @app.post("/accounts/{account_id}/deposit")
        async def deposit(account_id: str, cmd: DepositCommand):
            return await service.deposit(account_id, cmd)

        @app.post("/accounts/{account_id}/withdraw")
        async def withdraw(account_id: str, cmd: WithdrawCommand):
            return await service.withdraw(account_id, cmd)

        @app.post("/accounts/{account_id}/close")
        async def close(account_id: str, cmd: CloseCommand):
            return await service.close_account(account_id, cmd)

        @app.get("/accounts/{account_id}")
        async def get_balance(account_id: str):
            return await service.get_account_balance(account_id)

        @app.get("/accounts/{account_id}/transactions")
        async def get_transactions(account_id: str, limit: int = 50):
            return await service.get_transaction_history(account_id, limit)

        @app.get("/summary")
        async def summary():
            return await service.get_account_summary()

        return app


    # if __name__ == "__main__":
    #     import uvicorn
    #     engine = create_async_engine(
    #         "postgresql+asyncpg://user:pass@localhost/bank", echo=False,
    #     )
    #     uvicorn.run(create_app(engine), host="0.0.0.0", port=8000)
    ```
  </Tab>
</Tabs>

<Warning>
  **Caveats and Common Pitfalls for CQRS + Event Sourcing**

  1. **Eventual consistency surprises product managers** who learned databases in a CRUD world. They write data, immediately query it, see the old value, and declare the system broken. This is the single most common reason teams abandon CQRS.
  2. **Temporal query complexity is often underestimated.** "What was the balance on March 15 at 3 PM?" sounds easy. In practice, you need the right snapshot (or replay from scratch), you need to handle events in-flight at that instant, and you need to explain to legal which wall-clock definition you used. The tooling rarely exists out of the box.
  3. **Commands returning projection state.** The deposit endpoint reads the balance from the projection after the write; under load the projection hasn't caught up, so the response shows the old balance. This creates a "read your writes" bug that only surfaces in production.
  4. **Not designing for projection rebuild during incidents.** A bug in the projection logic is discovered in production. The fix requires a rebuild. The rebuild takes 4 hours. Your ops team has no pre-built runbook for this and the rebuild runs in a side project, not production, because nobody knew how to do it safely.
</Warning>

<Tip>
  **Solutions and Patterns**

  * **Command responses return aggregate state**, not projection state. The aggregate's in-memory state after applying the command is strongly consistent with the write; returning it avoids the read-your-writes race.
  * **Expose projection staleness in internal tools.** Show "last synced 350ms ago" on dashboards so ops and PMs internalize that staleness exists and is small.
  * **Build a temporal query helper** that takes an aggregate ID and timestamp, replays events up to that time, and returns the reconstructed state. Ship it behind an admin API and use it regularly -- if it only comes out during incidents, it will be broken when you need it.
  * **Runbook every projection rebuild.** Document the command, the expected duration, how to verify correctness, and how to roll back. Run the runbook quarterly as a fire drill.
  * **Keep the write side small and the read side wide.** One aggregate class, many projections. Resist the urge to put aggregate-level query logic in the write side; it's a sign you need a new projection.
</Tip>
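
The temporal query helper from the list above could start as small as the sketch below. It assumes every stored event carries a UTC `recorded_at` timestamp assigned by the store and that events are iterated in append order. `StoredEvent`, `balance_at`, and the `MoneyWithdrawn` event name are hypothetical, and snapshots are ignored for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class StoredEvent:
    aggregate_id: str
    type: str
    amount: int
    recorded_at: datetime  # assumed: UTC timestamp assigned by the event store


def balance_at(events: list[StoredEvent], aggregate_id: str, as_of: datetime) -> int:
    """Replay one aggregate's events recorded at or before `as_of`."""
    balance = 0
    for event in events:  # assumed to be in append order
        if event.aggregate_id != aggregate_id or event.recorded_at > as_of:
            continue
        if event.type == "MoneyDeposited":
            balance += event.amount
        elif event.type == "MoneyWithdrawn":
            balance -= event.amount
    return balance


def utc(month: int, day: int, hour: int) -> datetime:
    """Shorthand for building sample timestamps in 2024."""
    return datetime(2024, month, day, hour, tzinfo=timezone.utc)


log = [
    StoredEvent("acc-1", "MoneyDeposited", 500, utc(3, 1, 9)),
    StoredEvent("acc-1", "MoneyWithdrawn", 200, utc(3, 15, 14)),
    StoredEvent("acc-1", "MoneyDeposited", 50, utc(3, 15, 16)),
    StoredEvent("acc-2", "MoneyDeposited", 999, utc(3, 10, 12)),
]

# "What was the balance on March 15 at 3 PM?"
march_15_3pm = balance_at(log, "acc-1", utc(3, 15, 15))  # 500 - 200 = 300
```

Whether `as_of` should mean store-assigned time or business-effective time is the wall-clock question pitfall 2 warns about; this sketch picks store-assigned time.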

***

## Interview Questions

<AccordionGroup>
  <Accordion title="Q1: What is Event Sourcing and when should you use it?">
    **Answer:**

    **Event Sourcing:** Store state as a sequence of events rather than current state.

    **Use when:**

    * Need complete audit trail (finance, healthcare)
    * Complex business workflows
    * Need temporal queries ("balance 3 months ago?")
    * Debugging requires understanding "how we got here"
    * Undo/redo functionality needed

    **Avoid when:**

    * Simple CRUD apps
    * High-volume updates (too many events)
    * No audit requirements
    * Team lacks experience (steep learning curve)
  </Accordion>

  <Accordion title="Q2: How do you handle schema evolution in events?">
    **Answer:**

    **Upcasting pattern:**

    1. Events are immutable (never change stored events)
    2. Transform events when reading
    3. Chain transformations: v1 → v2 → v3

    **Example:**

    ```javascript theme={null}
    // v1: { name: "John Doe" }
    // v2: { firstName: "John", lastName: "Doe" }

    function upcast(v1Event) {
      const [first, ...rest] = v1Event.name.split(' ');
      return { firstName: first, lastName: rest.join(' ') };
    }
    ```

    **Best practices:**

    * Add version metadata to events
    * Make new fields optional with defaults
    * Test upcasters with old event data
  </Accordion>

  <Accordion title="Q3: What are projections and why do you need them?">
    **Answer:**

    **Projections:** Read-optimized views built from events.

    **Why needed:**

    * Replaying events for every query is slow
    * Different queries need different data shapes
    * Can use different databases per projection

    **Key concepts:**

    * **Checkpoint:** Last processed event position
    * **Rebuild:** Delete projection, replay all events
    * **Eventually consistent:** Slight delay from write

    **Example projections:**

    * Account balances (fast balance lookup)
    * Transaction history (ordered list)
    * Daily summaries (aggregated stats)
  </Accordion>

  <Accordion title="Q4: Explain snapshots in Event Sourcing">
    **Answer:**

    **Problem:** Loading an aggregate with 10,000 events is slow.

    **Solution:** Periodically save aggregate state (snapshot).

    **How it works:**

    1. Save snapshot every N events (e.g., 100)
    2. On load: Get snapshot + events since snapshot
    3. Reduces events to replay

    **Example:**

    ```
    Snapshot (at event 1000): { balance: $5000 }
    Events 1001-1050: [deposit, withdraw, ...]

    Load: Apply 50 events to snapshot, not 1050
    ```

    **When to snapshot:**

    * After N events (e.g., 100)
    * On aggregate save (if changed significantly)
    * Background process during low load
  </Accordion>
</AccordionGroup>

***

## Chapter Summary

<Info>
  **Key Takeaways:**

  * Event sourcing stores state as an immutable sequence of events
  * Use aggregates to encapsulate business logic and emit events
  * Projections build read-optimized views from events
  * Snapshots improve loading performance for long-lived aggregates
  * Upcasting handles event schema evolution
  * CQRS separates read and write models for scalability
</Info>

**Next Chapter:** GraphQL Federation - Building unified APIs across microservices.

***

## Interview Deep-Dive

<AccordionGroup>
  <Accordion title="'Your event store has 500 million events for an order aggregate. Loading the current state requires replaying all events, which takes 30 seconds. How do you fix this?'">
    **Strong Answer:**

    The solution is snapshots. Instead of replaying all 500 million events, I periodically save a snapshot of the aggregate's current state. To load the aggregate, I read the latest snapshot and only replay events that occurred after the snapshot was taken.

    Implementation: every N events (say, every 100 events) or when the event count since the last snapshot exceeds a threshold, I serialize the aggregate's current state and store it as a snapshot record. The snapshot includes the aggregate ID, the event sequence number it was built from, and the serialized state. Loading becomes: `SELECT * FROM snapshots WHERE aggregate_id = X ORDER BY sequence_number DESC LIMIT 1`, then `SELECT * FROM events WHERE aggregate_id = X AND sequence_number > snapshot_sequence`.

    For the 500 million events case, most of those events are probably across many order aggregates, not a single one. But even if a single order has 10,000 events (very active order with many status changes, partial refunds, modifications), snapshots every 100 events mean loading requires 1 snapshot read plus replaying at most 100 events. That takes milliseconds, not 30 seconds.
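
    A minimal sketch of that load path, assuming events carry a per-aggregate `sequence_number`. The `Snapshot` class, the toy `apply` function, and the in-memory list stand in for the real aggregate and the two SQL queries above; all names are hypothetical.

    ```python
    from __future__ import annotations

    from dataclasses import dataclass

    SNAPSHOT_EVERY = 100  # threshold used in the text; tune per workload


    @dataclass
    class Snapshot:
        aggregate_id: str
        sequence_number: int  # last event already folded into `state`
        state: dict


    def apply(state: dict, event: dict) -> dict:
        """Toy order aggregate: track the latest status and count changes."""
        return {"status": event["status"], "changes": state.get("changes", 0) + 1}


    def load(events: list[dict], snapshot: Snapshot | None) -> tuple[dict, int, int]:
        """Return (state, last_sequence, events_replayed)."""
        state = dict(snapshot.state) if snapshot else {}
        last_sequence = snapshot.sequence_number if snapshot else 0
        replayed = 0
        # A real store would filter with `sequence_number > snapshot_sequence` in SQL.
        for event in events:
            if event["sequence_number"] > last_sequence:
                state = apply(state, event)
                last_sequence = event["sequence_number"]
                replayed += 1
        return state, last_sequence, replayed


    def should_snapshot(last_sequence: int, snapshot_sequence: int) -> bool:
        """True once enough events have accumulated since the last snapshot."""
        return last_sequence - snapshot_sequence >= SNAPSHOT_EVERY


    events = [{"sequence_number": n, "status": f"s{n}"} for n in range(1, 10_051)]

    full_state, _, full_replayed = load(events, None)             # replays all 10,050

    state_10k, seq_10k, _ = load(events[:10_000], None)
    snapshot = Snapshot("order-1", seq_10k, state_10k)
    fast_state, last_seq, fast_replayed = load(events, snapshot)  # replays only 50

    assert fast_state == full_state
    ```

    Both loads end in the same state; the snapshot only changes how many events have to be folded to get there.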

    The trade-off: snapshots add complexity. You need to handle snapshot versioning (what happens when the aggregate's state structure changes?), snapshot storage (how long to keep old snapshots?), and snapshot creation timing (synchronous during event append, or asynchronous in a background process?). I recommend asynchronous snapshots via a background process to avoid adding latency to the write path.

    **Follow-up: "What happens when you change the aggregate's state structure? Old snapshots have the old structure and cannot be deserialized."**

    Snapshot migration. I version the snapshot format (a `snapshot_version` field) and maintain deserializers for the current version and the previous version. When loading, the deserializer checks the version and applies transformations if needed. Alternatively, I invalidate old snapshots by deleting them when the state structure changes, forcing the system to rebuild snapshots from events. This is safe because events are immutable and the event replay logic always produces the current state structure. The cost is a temporary performance degradation while snapshots are rebuilt.
  </Accordion>

  <Accordion title="'A developer on your team wants to delete an event from the event store because it contains a bug (incorrect price). Why is this dangerous, and what is the correct approach?'">
    **Strong Answer:**

    Deleting an event violates the fundamental guarantee of event sourcing: the event log is an immutable, append-only record of everything that happened. If you delete an event, every projection built from that event stream becomes inconsistent. Downstream services that consumed the event have already acted on it (sent a confirmation email with the wrong price, updated their own state). Deleting the event from the store does not undo those downstream effects.

    The correct approach is a compensating event. If order-123 had a PriceCalculated event with the wrong price (\$100 instead of \$80), you append a PriceCorrected event with the correct price (\$80). Replaying the stream applies both events in order: the price is set to \$100, then corrected to \$80. The audit trail shows exactly what happened: an incorrect price was calculated, and it was corrected. This is how accounting works -- you do not erase entries from a ledger, you add correcting entries.
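
As a small illustration of the replay behavior, assuming simple dictionary-shaped events (the `reason` field and the `corrections` counter are additions made for this example):

```python
def replay(events):
    """Fold the stream into current state; later events override earlier ones."""
    state = {"price": None, "corrections": 0}
    for event in events:
        if event["type"] == "PriceCalculated":
            state["price"] = event["price"]
        elif event["type"] == "PriceCorrected":
            state["price"] = event["price"]
            state["corrections"] += 1
    return state


stream = [{"seq": 1, "type": "PriceCalculated", "price": 100}]

# The fix is an append, never an edit or delete of the event at seq 1.
stream.append({"seq": 2, "type": "PriceCorrected", "price": 80, "reason": "pricing bug"})

current = replay(stream)         # state after the correction
as_recorded = replay(stream[:1])  # state as it stood before the correction
print(current["price"], as_recorded["price"])
```

Because the original event is still in the stream, replaying a prefix still answers "what did the system believe at the time?", which is exactly what deleting the event would destroy.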

    For downstream consumers, the PriceCorrected event triggers the appropriate compensating actions: the order total is recalculated, the customer is notified of the price correction, and if the payment was already processed at the wrong amount, a partial refund is issued.

    The one exception where data must actually become unrecoverable: GDPR right-to-erasure requests. If a customer requests deletion of their personal data, you need to erase the PII that their events contain without breaking the stream. The usual approach is "crypto-shredding": encrypt personal data fields in events with a per-user encryption key, and when the user requests deletion, destroy the key. The events remain in the store (maintaining structural integrity), but the personal data is now unreadable.
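
The mechanics can be sketched as follows. Note the loud assumption: `_keystream_xor` is a toy cipher written only so the example runs on the standard library. It is not secure, and a real system would use an authenticated cipher such as AES-GCM plus a managed key store. `KeyVault` and the event layout are likewise hypothetical.

```python
import hashlib
import secrets


def _keystream_xor(key: bytes, nonce: bytes, data: bytes) -> bytes:
    # TOY cipher for illustration only (NOT secure): XOR with a SHA-256-derived keystream.
    stream = bytearray()
    counter = 0
    while len(stream) < len(data):
        stream.extend(hashlib.sha256(key + nonce + counter.to_bytes(8, "big")).digest())
        counter += 1
    return bytes(a ^ b for a, b in zip(data, stream))


class KeyVault:
    """Hypothetical per-user key store, kept separate from the event store."""

    def __init__(self):
        self._keys = {}

    def key_for(self, user_id):
        if user_id not in self._keys:
            self._keys[user_id] = secrets.token_bytes(32)
        return self._keys[user_id]

    def get(self, user_id):
        return self._keys.get(user_id)

    def shred(self, user_id):
        self._keys.pop(user_id, None)  # destroying the key is the "deletion"


def encrypt_pii(vault, user_id, text):
    nonce = secrets.token_bytes(16)
    ciphertext = _keystream_xor(vault.key_for(user_id), nonce, text.encode())
    return {"nonce": nonce.hex(), "ciphertext": ciphertext.hex()}


def decrypt_pii(vault, user_id, blob):
    key = vault.get(user_id)
    if key is None:
        return None  # key destroyed: the field is permanently unreadable
    raw = _keystream_xor(key, bytes.fromhex(blob["nonce"]), bytes.fromhex(blob["ciphertext"]))
    return raw.decode()


vault = KeyVault()
event = {
    "seq": 1,
    "type": "OrderPlaced",
    "user_id": "user-42",
    "email": encrypt_pii(vault, "user-42", "jane@example.com"),
}

before = decrypt_pii(vault, "user-42", event["email"])
vault.shred("user-42")
after = decrypt_pii(vault, "user-42", event["email"])
print(before, after)
```

The event itself is never modified or removed, so sequence numbers and projections stay intact; only the ability to read the encrypted field disappears.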

    **Follow-up: "How do you handle event versioning when the event schema evolves over time?"**

    I use the upcasting pattern. Each event has a version number (OrderPlaced\_v1, OrderPlaced\_v2). When the schema changes (adding a new field, renaming a field), I create a new version. The event store may contain events of all versions. When reading, an upcaster converts old events to the latest version before they are passed to the aggregate. The upcaster is a series of transformation functions: v1 -> v2 adds a default value for the new field, v2 -> v3 renames the field. This means the aggregate logic only needs to handle the latest version. The old event data in the store remains untouched.
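
A minimal sketch of such an upcaster chain, following the two transformations named above. The field names (`amount`, `total`, `currency`) and the `"USD"` default are assumptions made for the example.

```python
def _v1_to_v2(data: dict) -> dict:
    # v2 added a currency field; old events get an assumed default.
    return {**data, "currency": "USD"}


def _v2_to_v3(data: dict) -> dict:
    # v3 renamed "amount" to "total".
    upgraded = {k: v for k, v in data.items() if k != "amount"}
    upgraded["total"] = data["amount"]
    return upgraded


# (event type, from-version) -> function producing the next version's payload
UPCASTERS = {("OrderPlaced", 1): _v1_to_v2, ("OrderPlaced", 2): _v2_to_v3}
LATEST = {"OrderPlaced": 3}


def upcast(event: dict) -> dict:
    """Return a copy of the event at the latest schema version; the stored event is untouched."""
    version, data = event["version"], dict(event["data"])
    while version < LATEST[event["type"]]:
        data = UPCASTERS[(event["type"], version)](data)
        version += 1
    return {**event, "version": version, "data": data}


stored = {"type": "OrderPlaced", "version": 1, "data": {"order_id": "order-123", "amount": 100}}
upgraded = upcast(stored)
print(upgraded)
```

Each function only knows how to move one version forward, so adding a v4 means writing one new function rather than touching the existing ones.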
  </Accordion>

  <Accordion title="'When would you choose event sourcing over a traditional CRUD approach, and what is the most common reason teams abandon event sourcing after adopting it?'">
    **Strong Answer:**

    I choose event sourcing when the business genuinely needs a complete audit trail and temporal queries. Financial systems (every transaction must be traceable), complex workflow engines (order fulfillment with dozens of state transitions), and collaborative systems (Google Docs-style concurrent editing) are strong use cases. The "why" matters: event sourcing is a means to an end, not a goal in itself.

    I do NOT choose event sourcing for: simple CRUD applications, high-volume append-mostly data (metrics, logs -- time-series databases are better), or teams that are new to DDD. Event sourcing requires deep understanding of domain modeling, and applying it to a poorly modeled domain creates a mess that is harder to fix than a poorly modeled relational schema.

    The most common reason teams abandon event sourcing: they underestimate the complexity of projections and eventual consistency. In a CRUD system, the read model is the write model -- when you update a row, the next read sees the updated data. In event sourcing, writes append events, and projections (read models) are typically updated asynchronously, so reads lag behind writes. When a product manager says "I just updated this record, why doesn't it show up?" and the answer is "the projection hasn't caught up yet," the team starts losing faith in the approach.

    The second common abandonment reason: event versioning. After 6 months, the team has 30 event types, half of which have evolved through 2-3 schema versions. The upcasting logic becomes a maintenance burden. Every new developer has to understand not just the current event schemas but all historical versions.

    The third reason: complexity of correcting errors. In CRUD, fixing a bug is "UPDATE table SET value = correct\_value." In event sourcing, it is "append a compensating event, rebuild all affected projections, verify downstream consumers received and processed the correction." The operational burden is significant.

    My advice: start with CRUD and introduce event sourcing for specific aggregates that genuinely benefit from it (the Order aggregate, the Account aggregate), not for the entire system. Hybrid approaches are perfectly valid and dramatically reduce complexity.

    **Follow-up: "How do you handle reporting and analytics with event sourcing? Business stakeholders want SQL queries, not event replays."**

    You build dedicated read models for reporting. The event stream feeds a CQRS read model in a relational database (PostgreSQL) or a data warehouse (BigQuery, Snowflake). The read model is denormalized, optimized for the specific queries stakeholders need, and updated asynchronously from the event stream. Business stakeholders query the read model with standard SQL. They never interact with the event store directly. The read model is disposable -- if the schema needs to change, you rebuild it from the event stream. This separation keeps the event store clean (optimized for writes) while giving analytics users the SQL access they need.
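
To make the idea concrete, here is a small sketch that projects a stream into a denormalized table and queries it with plain SQL. It uses an in-memory SQLite database as a stand-in for PostgreSQL or a warehouse, and the `order_report` table and event fields are invented for the example.

```python
import sqlite3


def build_order_report(conn, events):
    """Rebuild the reporting table from scratch by replaying the event stream."""
    conn.execute("DROP TABLE IF EXISTS order_report")
    conn.execute(
        "CREATE TABLE order_report ("
        "order_id TEXT PRIMARY KEY, customer TEXT, status TEXT, total_cents INTEGER)"
    )
    for event in events:
        if event["type"] == "OrderPlaced":
            conn.execute(
                "INSERT INTO order_report VALUES (?, ?, 'placed', ?)",
                (event["order_id"], event["customer"], event["total_cents"]),
            )
        elif event["type"] == "OrderShipped":
            conn.execute(
                "UPDATE order_report SET status = 'shipped' WHERE order_id = ?",
                (event["order_id"],),
            )
    conn.commit()


events = [
    {"type": "OrderPlaced", "order_id": "o-1", "customer": "alice", "total_cents": 1000},
    {"type": "OrderPlaced", "order_id": "o-2", "customer": "bob", "total_cents": 500},
    {"type": "OrderPlaced", "order_id": "o-3", "customer": "alice", "total_cents": 2000},
    {"type": "OrderShipped", "order_id": "o-1"},
]

conn = sqlite3.connect(":memory:")
build_order_report(conn, events)

# Stakeholders see an ordinary table and never touch the event store.
revenue = conn.execute(
    "SELECT customer, SUM(total_cents) FROM order_report GROUP BY customer ORDER BY customer"
).fetchall()
print(revenue)
```

Dropping and recreating the table at the start of `build_order_report` is what "disposable" means in practice: changing the reporting schema is a rebuild from the stream, not a migration.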
  </Accordion>
</AccordionGroup>
