# 19. Database Patterns

> Master database-per-service pattern, data consistency strategies, and migration techniques for microservices

Data management is one of the most challenging aspects of microservices -- and the place where most migrations go wrong. The fundamental tension: in a monolith, you get ACID transactions and JOINs for free. In microservices, every cross-service data operation becomes a distributed systems problem. The patterns in this chapter (database-per-service, sagas, outbox, CQRS) exist because you are trading database-level consistency for deployment independence. That trade-off is worth it at scale, but only if you understand what you are giving up and how to compensate.

<Info>
  **Learning Objectives:**

  * Implement database-per-service pattern
  * Handle data consistency across services
  * Design shared data strategies
  * Execute zero-downtime database migrations
  * Choose the right database for each service
</Info>

***

## Database Per Service Pattern

### Why Separate Databases?

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    SHARED DATABASE (Anti-Pattern)                           │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                       │
│  │  Service A   │  │  Service B   │  │  Service C   │                       │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘                       │
│         │                 │                 │                                │
│         └─────────────────┼─────────────────┘                               │
│                           ▼                                                  │
│                 ┌───────────────────┐                                       │
│                 │  Shared Database  │                                       │
│                 │  ┌─────────────┐  │                                       │
│                 │  │   users     │  │                                       │
│                 │  │   orders    │  │                                       │
│                 │  │   products  │  │                                       │
│                 │  │   payments  │  │                                       │
│                 │  └─────────────┘  │                                       │
│                 └───────────────────┘                                       │
│                                                                              │
│  ⚠️ Problems:                                                                │
│  • Tight coupling through schema                                            │
│  • Can't deploy independently                                               │
│  • Schema changes affect all services                                       │
│  • Single point of failure                                                  │
│  • Can't scale databases independently                                      │
│  • Technology lock-in                                                       │
│                                                                              │
│  ═══════════════════════════════════════════════════════════════════════════│
│                                                                              │
│                   DATABASE PER SERVICE (Recommended)                         │
│  ─────────────────────────────────────────────────────────────────────────  │
│                                                                              │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                       │
│  │  User Svc    │  │  Order Svc   │  │ Product Svc  │                       │
│  │  ┌────────┐  │  │  ┌────────┐  │  │  ┌────────┐  │                       │
│  │  │  API   │  │  │  │  API   │  │  │  │  API   │  │                       │
│  │  └───┬────┘  │  │  └───┬────┘  │  │  └───┬────┘  │                       │
│  │      │       │  │      │       │  │      │       │                       │
│  │  ┌───▼────┐  │  │  ┌───▼────┐  │  │  ┌───▼────┐  │                       │
│  │  │MongoDB │  │  │  │Postgres│  │  │  │  Redis │  │                       │
│  │  │(users) │  │  │  │(orders)│  │  │  │(cache) │  │                       │
│  │  └────────┘  │  │  └────────┘  │  │  └────────┘  │                       │
│  └──────────────┘  └──────────────┘  └──────────────┘                       │
│                                                                              │
│  ✅ Benefits:                                                                │
│  • Loose coupling                                                           │
│  • Independent deployment                                                   │
│  • Right database for the job                                               │
│  • Independent scaling                                                      │
│  • Fault isolation                                                          │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```

### Database Selection Guide for Microservices

Polyglot persistence -- using different databases for different services -- is a key advantage of microservices. But "use the right tool for the job" is only useful advice if you know which tool fits which job.

| Data Characteristic                      | Best Database Type | Examples                  | Why                                                  |
| ---------------------------------------- | ------------------ | ------------------------- | ---------------------------------------------------- |
| Transactional with complex relationships | Relational (SQL)   | PostgreSQL, MySQL         | ACID guarantees, JOINs, mature tooling               |
| Flexible schema, document-shaped         | Document store     | MongoDB, Couchbase        | Schema evolves without DDL migrations, nested objects, fast iteration |
| High-write throughput, wide-column       | Column-family      | Cassandra, ScyllaDB       | Linear write scaling, time-series friendly           |
| Key-value lookups, caching, sessions     | Key-value store    | Redis, DynamoDB           | Sub-millisecond (Redis) to single-digit-millisecond (DynamoDB) reads, simple access patterns |
| Full-text search, log analytics          | Search engine      | Elasticsearch, OpenSearch | Inverted index, fuzzy matching, aggregations         |
| Graph relationships (social, fraud)      | Graph database     | Neo4j, Amazon Neptune     | Multi-hop traversals that would need recursive or chained self-JOINs in SQL |
| Event log, audit trail                   | Append-only log    | Kafka (log), EventStoreDB | Immutable history, replay capability                 |

**Common microservices database pairings:**

| Service         | Primary DB             | Why                                                         | Cache Layer                             |
| --------------- | ---------------------- | ----------------------------------------------------------- | --------------------------------------- |
| User/Auth       | PostgreSQL             | User data is relational; login needs ACID                   | Redis (sessions)                        |
| Product Catalog | MongoDB                | Varied attributes per category; read-heavy                  | Redis or CDN                            |
| Orders          | PostgreSQL             | Order lifecycle is transactional; financial data needs ACID | None (consistency critical)             |
| Inventory       | PostgreSQL             | Decrement operations need transactions                      | Redis (read cache, not source of truth) |
| Cart            | Redis                  | Ephemeral, session-scoped, fast expiry                      | N/A (Redis IS the cache)                |
| Search          | Elasticsearch          | Full-text search, faceted filtering                         | N/A (Elasticsearch IS the index)        |
| Notifications   | MongoDB or DynamoDB    | Simple writes, no complex queries, high volume              | None                                    |
| Analytics       | ClickHouse or BigQuery | Column-oriented, aggregation-heavy                          | Pre-computed materialized views         |

### Implementation Patterns

Why does each service own its own connection pool and schema? Because in a distributed system, the database is an implementation detail of the service -- not a shared resource. When you let two services talk to the same tables, you have created an invisible coupling that bypasses your API contracts. Schema changes require coordinated deploys, query performance problems in one service cause latency spikes in another, and a runaway connection pool from one service can exhaust slots for everyone else.

The CAP theorem says that when a network partition occurs, a distributed system must give up either consistency or availability; it cannot keep both. By giving each service its own database, you lean toward availability at the system level: each service can evolve, fail, and recover independently, and one service's outage does not block the others. The cost is that you can no longer use a database transaction to keep two services' data in sync. That is what the rest of this chapter is about: the patterns you use to compensate.

The trade-off worth being explicit about: operational complexity goes up (more databases to back up, monitor, patch), but blast radius goes down (one database failure no longer takes down the whole platform). For small systems, this is a bad trade. For systems with more than a handful of services or teams, it is the only way to stay sane.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // services/user-service/database.js
    // Each service manages its own database connection

    const mongoose = require('mongoose');

    class UserDatabase {
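      constructor() {
        this.connection = null;

        // Register listeners once, in the constructor. Registering them inside
        // connect() would stack a duplicate handler on every call.
        mongoose.connection.on('error', (err) => {
          console.error('MongoDB connection error:', err);
        });

        // The driver reconnects on its own after a dropped connection, so this
        // handler only logs. Calling connect() here would also fire after an
        // intentional disconnect().
        mongoose.connection.on('disconnected', () => {
          console.warn('MongoDB disconnected');
        });
      }

      async connect() {
        // User service uses MongoDB.
        // useNewUrlParser / useUnifiedTopology are omitted: Mongoose 6+ always
        // behaves as if they were enabled and no longer accepts them.
        this.connection = await mongoose.connect(process.env.USER_DB_URL, {
          maxPoolSize: 10,
          serverSelectionTimeoutMS: 5000,
          socketTimeoutMS: 45000
        });

        console.log('User service connected to MongoDB');
      }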

      async disconnect() {
        await mongoose.connection.close();
      }
    }

    // User schema - owned by user service only
    const userSchema = new mongoose.Schema({
      id: { type: String, required: true, unique: true },
      email: { type: String, required: true, unique: true },
      name: { type: String, required: true },
      hashedPassword: { type: String, required: true },
      profile: {
        avatar: String,
        bio: String,
        preferences: mongoose.Schema.Types.Mixed
      },
      status: {
        type: String,
        enum: ['active', 'inactive', 'suspended'],
        default: 'active'
      },
    }, {
      // Let Mongoose maintain createdAt and updatedAt. A plain `default`
      // sets updatedAt once at creation and never refreshes it.
      timestamps: true
    });

    const User = mongoose.model('User', userSchema);

    module.exports = { UserDatabase, User };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # services/user_service/database.py
    # Each service manages its own database connection

    import os
    import logging
    from datetime import datetime
    from typing import Optional, Any

    from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
    from pymongo.errors import ConnectionFailure

    logger = logging.getLogger(__name__)


    class UserDatabase:
        """MongoDB connection owned by the User Service only."""

        def __init__(self) -> None:
            self.client: Optional[AsyncIOMotorClient] = None
            self.db: Optional[AsyncIOMotorDatabase] = None

        async def connect(self) -> None:
            # User service uses MongoDB
            self.client = AsyncIOMotorClient(
                os.environ["USER_DB_URL"],
                maxPoolSize=10,
                serverSelectionTimeoutMS=5000,
                socketTimeoutMS=45000,
            )
            try:
                # Eagerly verify connectivity so we fail fast on boot
                await self.client.admin.command("ping")
            except ConnectionFailure as exc:
                logger.error("MongoDB connection error: %s", exc)
                raise

            self.db = self.client.get_default_database()
            logger.info("User service connected to MongoDB")

        async def disconnect(self) -> None:
            if self.client is not None:
                self.client.close()


    # User "schema" - owned by user service only.
    # motor/pymongo do not enforce schemas, so we use a dataclass-like
    # helper for structure plus a JSON Schema validator at the collection level.
    from dataclasses import dataclass, field, asdict


    @dataclass
    class UserProfile:
        avatar: Optional[str] = None
        bio: Optional[str] = None
        preferences: dict[str, Any] = field(default_factory=dict)


    @dataclass
    class User:
        id: str
        email: str
        name: str
        hashed_password: str
        profile: UserProfile = field(default_factory=UserProfile)
        status: str = "active"  # active | inactive | suspended
        created_at: datetime = field(default_factory=datetime.utcnow)
        updated_at: datetime = field(default_factory=datetime.utcnow)

        def to_doc(self) -> dict[str, Any]:
            doc = asdict(self)
            return doc


    USER_JSON_SCHEMA = {
        "bsonType": "object",
        "required": ["id", "email", "name", "hashed_password", "status"],
        "properties": {
            "id": {"bsonType": "string"},
            "email": {"bsonType": "string"},
            "name": {"bsonType": "string"},
            "hashed_password": {"bsonType": "string"},
            "status": {"enum": ["active", "inactive", "suspended"]},
        },
    }


    async def ensure_user_indexes(db: AsyncIOMotorDatabase) -> None:
        await db.users.create_index("id", unique=True)
        await db.users.create_index("email", unique=True)
        await db.command(
            "collMod", "users", validator={"$jsonSchema": USER_JSON_SCHEMA}
        )
    ```
  </Tab>
</Tabs>

Why does the Order Service use PostgreSQL when the User Service uses MongoDB? This is polyglot persistence in practice. Orders are inherently transactional -- an order lifecycle involves money, inventory, and regulatory requirements. You need ACID guarantees: a charge must not succeed while the order fails, and you need to be able to express "decrement stock by N if and only if available stock is at least N" atomically. That is exactly what relational databases were built for.
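The "decrement stock by N if and only if available stock is at least N" requirement can be sketched as a single conditional `UPDATE`. This is a minimal illustration, not the chapter's implementation: it uses an in-memory SQLite database so it runs anywhere, and the `inventory` table, its columns, and the `reserve_stock` helper are hypothetical names. The same statement shape works in PostgreSQL.

```python
import sqlite3


def reserve_stock(conn: sqlite3.Connection, product_id: str, qty: int) -> bool:
    """Decrement stock by qty only if at least qty units are available."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE inventory SET stock = stock - ? "
            "WHERE product_id = ? AND stock >= ?",
            (qty, product_id, qty),
        )
        # The check and the write happen in one statement, so there is no
        # window between "read stock" and "write stock" for another writer.
        return cur.rowcount == 1


# Hypothetical schema, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory ("
    "product_id TEXT PRIMARY KEY, "
    "stock INTEGER NOT NULL CHECK (stock >= 0))"
)
conn.execute("INSERT INTO inventory VALUES ('sku-1', 5)")
conn.commit()

first = reserve_stock(conn, "sku-1", 3)   # succeeds, 2 units remain
second = reserve_stock(conn, "sku-1", 3)  # refused, only 2 units remain
remaining = conn.execute(
    "SELECT stock FROM inventory WHERE product_id = 'sku-1'"
).fetchone()[0]
print(first, second, remaining)  # True False 2
```

The `CHECK (stock >= 0)` constraint is a second line of defense: even if some other code path forgets the `stock >= ?` guard, the database refuses to store a negative quantity.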

Notice what is missing from the Order schema: any foreign key constraint to the `users` table. That table lives in a different database -- physically on a different server, possibly a different engine. You have given up referential integrity at the database layer and must now enforce it at the application layer (or tolerate brief inconsistency). This is the part that makes experienced DBAs nervous about microservices, and rightly so: you have replaced an invariant the database enforced on every write with a contract that holds only as long as every code path honors it.

The trade-off: database-per-service buys you deployment independence and the ability to pick the right storage engine per workload, at the cost of losing cross-entity constraints. You compensate by making the `user_id` an opaque string (no FK), validating existence at the API boundary, and using events to keep denormalized copies fresh.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // services/order-service/database.js
    // Order service uses PostgreSQL for ACID transactions

    // Sequelize loads the `pg` driver itself; it only needs to be installed
    const { Sequelize } = require('sequelize');

    class OrderDatabase {
      constructor() {
        this.sequelize = null;
      }

      async connect() {
        this.sequelize = new Sequelize(process.env.ORDER_DB_URL, {
          dialect: 'postgres',
          pool: {
            max: 20,
            min: 5,
            acquire: 30000,
            idle: 10000
          },
          logging: process.env.NODE_ENV === 'development' ? console.log : false
        });

        await this.sequelize.authenticate();
        console.log('Order service connected to PostgreSQL');
      }

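      async runMigrations() {
        // sync() is a development convenience: it creates missing tables, and
        // `alter: true` rewrites existing ones to match the models. For
        // production schema changes, prefer versioned migration scripts.
        await this.sequelize.sync({ alter: process.env.NODE_ENV !== 'production' });
      }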
    }

    // Order model - owned by order service only
    const { DataTypes } = require('sequelize');

    module.exports = (sequelize) => {
      const Order = sequelize.define('Order', {
        id: {
          type: DataTypes.UUID,
          defaultValue: DataTypes.UUIDV4,
          primaryKey: true
        },
        userId: {
          type: DataTypes.STRING,
          allowNull: false
          // CRITICAL: No foreign key to users table -- that table lives in a different database!
          // This is the fundamental trade-off of database-per-service: you lose referential integrity
          // at the database level and must enforce it at the application level instead.
          // Lookups by userId use the composite (userId, createdAt) index declared below.
        },
        status: {
          type: DataTypes.ENUM('pending', 'confirmed', 'processing', 'shipped', 'delivered', 'cancelled'),
          defaultValue: 'pending'
        },
        items: {
          type: DataTypes.JSONB,
          allowNull: false
        },
        totalAmount: {
          type: DataTypes.DECIMAL(10, 2),
          allowNull: false
        },
        shippingAddress: {
          type: DataTypes.JSONB
        },
        metadata: {
          type: DataTypes.JSONB,
          defaultValue: {}
        }
      }, {
        timestamps: true,
        indexes: [
          { fields: ['userId', 'createdAt'] },
          { fields: ['status'] }
        ]
      });

      return Order;
    };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # services/order_service/database.py
    # Order service uses PostgreSQL for ACID transactions

    import os
    import uuid
    import enum
    import logging
    from datetime import datetime
    from decimal import Decimal
    from typing import Any, Optional

    from sqlalchemy import String, Enum, Numeric, DateTime, Index, func
    from sqlalchemy.dialects.postgresql import UUID, JSONB
    from sqlalchemy.ext.asyncio import (
        AsyncSession,
        async_sessionmaker,
        create_async_engine,
    )
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    logger = logging.getLogger(__name__)


    class Base(DeclarativeBase):
        pass


    class OrderStatus(str, enum.Enum):
        pending = "pending"
        confirmed = "confirmed"
        processing = "processing"
        shipped = "shipped"
        delivered = "delivered"
        cancelled = "cancelled"


    class Order(Base):
        """Order aggregate - owned by the Order Service only."""

        __tablename__ = "orders"

        id: Mapped[uuid.UUID] = mapped_column(
            UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
        )
        # CRITICAL: No foreign key to users table -- that table lives in a
        # different database! Referential integrity must be enforced at the
        # application layer instead.
        user_id: Mapped[str] = mapped_column(String, nullable=False, index=True)
        status: Mapped[OrderStatus] = mapped_column(
            Enum(OrderStatus, name="order_status"),
            default=OrderStatus.pending,
            nullable=False,
        )
        items: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False)
        total_amount: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)
        shipping_address: Mapped[Optional[dict[str, Any]]] = mapped_column(
            JSONB, nullable=True
        )
        meta: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict)
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False
        )
        updated_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True),
            server_default=func.now(),
            onupdate=func.now(),
            nullable=False,
        )

        __table_args__ = (
            Index("ix_orders_user_created", "user_id", "created_at"),
            Index("ix_orders_status", "status"),
        )


    class OrderDatabase:
        """Async PostgreSQL engine/session factory for the Order Service."""

        def __init__(self) -> None:
            self.engine = None
            self.session_factory: Optional[async_sessionmaker[AsyncSession]] = None

        async def connect(self) -> None:
            self.engine = create_async_engine(
                os.environ["ORDER_DB_URL"],
                pool_size=20,
                max_overflow=0,
                pool_pre_ping=True,
                pool_timeout=30,
                pool_recycle=1800,
                echo=os.getenv("APP_ENV") == "development",
            )
            self.session_factory = async_sessionmaker(
                self.engine, expire_on_commit=False
            )
            # create_async_engine() is lazy: no connection is opened until first use
            logger.info("Order service PostgreSQL engine created")

        async def disconnect(self) -> None:
            if self.engine is not None:
                await self.engine.dispose()
    ```
  </Tab>
</Tabs>

### Handling Data Ownership

Once each service owns its own database, the next question is: how does the Order Service work with user data it does not own? The guiding principle is that each piece of data has exactly one service that is the source of truth. Everyone else either asks that service at read time or keeps a local, read-only projection that is kept fresh via events.

The pattern below uses synchronous API calls for validation (confirm the user and products exist) and denormalization for historical accuracy (capture the product name and price into the order). Denormalization is critical here: if the product's price changes tomorrow, the order record must still reflect the price the customer actually paid. This is an example of where consistency requirements differ between the current-state view and the historical record -- the product service owns "current price," but the order owns "price at time of purchase."

The trade-off: synchronous calls create runtime coupling (User Service outage blocks order creation) and latency (network hop on every order). For high-throughput systems, you combine this with a read-through cache or a local projection updated via events. For correctness-critical fields (shipping address, payment method), you accept the coupling because stale data would be worse than a brief outage.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // Order service needs user data but doesn't own it
    // Pattern: Store only the user ID, fetch details when needed

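    // services/order-service/services/order-service.js
    // Assumed to be in scope: `Order` (the Sequelize model defined earlier)
    // and `NotFoundError` (an application error class).
    class OrderService {
      constructor(userClient, productClient, eventBus) {
        this.userClient = userClient;  // HTTP client to user service
        this.productClient = productClient;  // HTTP client to product service
        this.eventBus = eventBus;  // Message bus client, used by createOrder to publish events
      }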

      async createOrder(userId, items) {
        // Validate user exists (call user service)
        const user = await this.userClient.getUser(userId);
        if (!user) {
          throw new NotFoundError('User not found');
        }

        // Validate products and get prices (call product service)
        const products = await this.productClient.getProducts(
          items.map(i => i.productId)
        );

        // Calculate total
        const orderItems = items.map(item => {
          const product = products.find(p => p.id === item.productId);
          if (!product) {
            throw new NotFoundError(`Product ${item.productId} not found`);
          }
          return {
            productId: product.id,
            name: product.name,  // Denormalize for historical record
            price: product.price,  // Price at time of order
            quantity: item.quantity,
            subtotal: product.price * item.quantity
          };
        });

        const totalAmount = orderItems.reduce((sum, item) => sum + item.subtotal, 0);

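        // Create order: the user is stored by reference (ID only), while each
        // item carries the denormalized name and price snapshot built above
        const order = await Order.create({
          userId,  // Just the ID, not full user data
          items: orderItems,
          totalAmount,
          status: 'pending'
        });

        // Publish event for other services. The row is already committed at this
        // point, so a crash or broker outage here loses the event; the outbox
        // pattern from this chapter closes that gap.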
        await this.eventBus.publish('order.created', {
          orderId: order.id,
          userId,
          items: orderItems,
          totalAmount
        });

        return order;
      }

      async getOrderWithDetails(orderId) {
        const order = await Order.findByPk(orderId);
        if (!order) return null;

        // Enrich with current user data (for display purposes)
        const user = await this.userClient.getUser(order.userId);

        return {
          ...order.toJSON(),
          user: user ? {
            id: user.id,
            name: user.name,
            email: user.email
          } : null
        };
      }
    }
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # services/order_service/services/order_service.py
    # Order service needs user data but doesn't own it.
    # Pattern: Store only the user ID, fetch details when needed.

    from decimal import Decimal
    from typing import Any
    from uuid import UUID

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from order_service.clients import UserClient, ProductClient, EventBus
    from order_service.database import Order, OrderStatus
    from order_service.errors import NotFoundError


    class OrderService:
        def __init__(
            self,
            session_factory,
            user_client: UserClient,
            product_client: ProductClient,
            event_bus: EventBus,
        ) -> None:
            self._session_factory = session_factory
            self._user_client = user_client
            self._product_client = product_client
            self._event_bus = event_bus

        async def create_order(
            self, user_id: str, items: list[dict[str, Any]]
        ) -> Order:
            # Validate user exists (call user service)
            user = await self._user_client.get_user(user_id)
            if user is None:
                raise NotFoundError("User not found")

            # Validate products and get prices (call product service)
            product_ids = [item["product_id"] for item in items]
            products = await self._product_client.get_products(product_ids)
            products_by_id = {p["id"]: p for p in products}

            order_items: list[dict[str, Any]] = []
            for item in items:
                product = products_by_id.get(item["product_id"])
                if product is None:
                    raise NotFoundError(f"Product {item['product_id']} not found")
                price = Decimal(str(product["price"]))
                qty = int(item["quantity"])
                order_items.append(
                    {
                        "product_id": product["id"],
                        # Denormalize for historical record -- the order must
                        # remember the name/price at purchase time.
                        "name": product["name"],
                        "price": str(price),
                        "quantity": qty,
                        "subtotal": str(price * qty),
                    }
                )

            total_amount = sum(
                (Decimal(i["price"]) * i["quantity"] for i in order_items),
                start=Decimal("0"),
            )

            async with self._session_factory() as session:  # type: AsyncSession
                async with session.begin():
                    order = Order(
                        user_id=user_id,  # Just the ID, not full user data
                        items=order_items,
                        total_amount=total_amount,
                        status=OrderStatus.pending,
                    )
                    session.add(order)
                    await session.flush()  # populate order.id

                await self._event_bus.publish(
                    "order.created",
                    {
                        "order_id": str(order.id),
                        "user_id": user_id,
                        "items": order_items,
                        "total_amount": str(total_amount),
                    },
                )
                return order

        async def get_order_with_details(
            self, order_id: UUID
        ) -> dict[str, Any] | None:
            async with self._session_factory() as session:
                order = await session.get(Order, order_id)
                if order is None:
                    return None

            # Enrich with current user data (for display purposes)
            user = await self._user_client.get_user(order.user_id)
            return {
                "id": str(order.id),
                "user_id": order.user_id,
                "status": order.status.value,
                "items": order.items,
                "total_amount": str(order.total_amount),
                "user": (
                    {"id": user["id"], "name": user["name"], "email": user["email"]}
                    if user
                    else None
                ),
            }
    ```
  </Tab>
</Tabs>
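The alternative to calling the User Service on every read is the local, read-only projection described above. The sketch below is one possible shape for it, under stated assumptions: the event names (`user.created`, `user.updated`, `user.deleted`), the envelope layout, and the per-user `version` counter are all hypothetical, and an in-memory dict stands in for a table in the Order Service's own database.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class UserSummary:
    user_id: str
    name: str
    email: str
    version: int          # assumed: increases by one on every change to the user
    deleted: bool = False  # tombstone, so a late event cannot resurrect the user


class UserProjection:
    """Read-only copy of user data kept inside the Order Service."""

    def __init__(self) -> None:
        self._users: dict[str, UserSummary] = {}

    def apply(self, event: dict[str, Any]) -> None:
        """Apply one user event; duplicates and out-of-order deliveries are ignored."""
        payload = event["data"]
        current = self._users.get(payload["id"])
        if current is not None and payload["version"] <= current.version:
            return  # stale or redelivered event
        self._users[payload["id"]] = UserSummary(
            user_id=payload["id"],
            name=payload.get("name", ""),
            email=payload.get("email", ""),
            version=payload["version"],
            deleted=event["type"] == "user.deleted",
        )

    def get(self, user_id: str) -> Optional[UserSummary]:
        user = self._users.get(user_id)
        return None if user is None or user.deleted else user


projection = UserProjection()
created = {"type": "user.created",
           "data": {"id": "u1", "name": "Ada", "email": "ada@example.com", "version": 1}}
updated = {"type": "user.updated",
           "data": {"id": "u1", "name": "Ada L.", "email": "ada@example.com", "version": 2}}
projection.apply(created)
projection.apply(updated)
projection.apply(created)  # redelivery of an older event changes nothing
name_after_update = projection.get("u1").name

projection.apply({"type": "user.deleted", "data": {"id": "u1", "version": 3}})
projection.apply(updated)  # a late update must not bring the user back
after_delete = projection.get("u1")
```

The version check is what makes the handler idempotent. Most message brokers deliver at least once, so a projection that blindly overwrites on every event will eventually regress to stale data.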

***

## Data Consistency Strategies

<Warning>
  **Caveats and Common Pitfalls with Cross-Service Data Access**

  The moment you have database-per-service, two innocent-looking questions appear: "How do I join data that lives in different services?" and "How do I build the report that used to be a single SQL query?" The wrong answers create more damage than most teams realize.

  * **Cross-service JOINs via API calls create N+1 complexity that scales catastrophically.** A page that shows "orders with the customer name and the top product category" now calls Order Service, then for each order calls User Service, then for each order calls Product Service. One page load becomes 1 + 2N calls. At 50 orders on the page, that is 101 HTTP calls fanning out. Each adds network latency, error surface area, and tail-latency risk. Under load, this pattern is the single most common cause of "mysterious" platform-wide slowdowns.
  * **Reporting becomes nearly impossible without a plan.** Finance wants a monthly reconciliation joining orders, payments, inventory, users, and refunds. Each lives in a different service with a different database engine. Running five separate queries and JOINing in a Python script is slow, error-prone, and loses transactional consistency. Teams who did not plan for this end up building a quarterly "please stop asking me" ritual instead of a real reporting platform.
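  * **Fan-out calls amplify partial failures.** If the calls run sequentially, their latencies add up. If they run in parallel, the request is only as fast as its slowest dependency, and slow responses become common: with five dependencies and independent latencies, roughly 5% of requests (1 - 0.99^5 ≈ 4.9%) wait on at least one call slower than that service's p99. If one returns an error, your entire query fails (or worse, silently returns partial data). A single flaky service pollutes every aggregated view.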
  * **Developers reach for shared read replicas as a "temporary" shortcut.** "I will just query the User Service database directly for this one report" is the opening line of a tragedy that ends with two services tightly coupled through a schema that was never meant to be a public API. Nine months later, a harmless column rename in User Service breaks a report nobody remembered was joining across the boundary.
</Warning>
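
The arithmetic behind the first and third pitfalls can be checked with a few lines of Python. This is a rough sketch, not a capacity model: the function names are made up for illustration, and the tail estimate assumes call latencies are independent, which real services rarely are.

```python
def n_plus_one_calls(orders_on_page: int, lookups_per_order: int = 2) -> int:
    """HTTP calls for one page: 1 list call plus per-order lookups (user + product)."""
    return 1 + lookups_per_order * orders_on_page


def p_any_call_in_tail(n_calls: int, quantile: float = 0.99) -> float:
    """Chance that at least one of n independent calls is slower than its own quantile."""
    return 1.0 - quantile ** n_calls


print(n_plus_one_calls(50))               # calls for a 50-order page
print(round(p_any_call_in_tail(5), 3))    # five-service composition query
print(round(p_any_call_in_tail(101), 3))  # the full N+1 page load
```

Under these assumptions, a five-way fan-out touches some service's p99 tail on roughly one request in twenty, and the 101-call page does so on most requests.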

<Tip>
  **Solutions and Patterns for Cross-Service Data Access**

  * **API Composition for small, bounded queries.** When you need data from 2-3 services and the result set is small (under a hundred items, typically one user's view), fan out in parallel, set strict timeouts, and degrade gracefully when a dependency is slow. This is appropriate for a user-facing page. It is not appropriate for reporting.
  * **CQRS with a dedicated query service.** Build a read-only service that subscribes to domain events from every service whose data it needs, maintains a denormalized materialized view in its own store (often Elasticsearch or PostgreSQL with aggressive indexing), and serves queries directly. The query service is eventually consistent but fast. This is the correct answer for "show me all orders from New York users with category X." The source services never feel the query load.
  * **Analytics pipeline for reporting.** Stream every domain event into a data warehouse (BigQuery, Snowflake, Redshift) via a tool like Debezium or Kafka Connect. Reports run on the warehouse, not on the operational stores. This cleanly separates OLTP concerns (low latency, high consistency) from OLAP concerns (complex joins across historical data).
  * **Data mesh for large organizations.** Each service team publishes a curated "data product" -- a versioned, schema-controlled, SLA-backed dataset -- that other teams can consume. This treats cross-service data access as a first-class product with ownership rather than a backdoor.
  * **Never query another service's database directly.** If you can trace a read path from your service to another service's database, you have recreated the shared-database anti-pattern with extra steps. The API or event stream is the only legitimate boundary.
</Tip>
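
As a rough illustration of the API Composition advice above (fan out in parallel, set strict timeouts, degrade gracefully), here is a minimal `asyncio` sketch. The fetcher callables, the 200 ms default timeout, and the `degraded` flag are assumptions made for the example, not part of any particular client library.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# Hypothetical fetcher type: takes an ID, returns a JSON-like dict.
Fetch = Callable[[str], Awaitable[dict[str, Any]]]


async def fetch_or_none(
    fetch: Fetch, key: str, timeout_s: float
) -> Optional[dict[str, Any]]:
    """Call an optional dependency; return None on timeout or error."""
    try:
        return await asyncio.wait_for(fetch(key), timeout=timeout_s)
    except Exception:  # includes asyncio.TimeoutError
        return None


async def compose_order_view(
    order_id: str,
    fetch_order: Fetch,
    fetch_user: Fetch,
    fetch_products: Fetch,
    timeout_s: float = 0.2,
) -> dict[str, Any]:
    # The order itself is required: a failure or timeout here propagates.
    order = await asyncio.wait_for(fetch_order(order_id), timeout=timeout_s)

    # Enrichment calls run in parallel and are allowed to fail.
    user, products = await asyncio.gather(
        fetch_or_none(fetch_user, order["user_id"], timeout_s),
        fetch_or_none(fetch_products, order_id, timeout_s),
    )
    return {
        "order": order,
        "user": user,
        "products": products,
        "degraded": user is None or products is None,
    }
```

The caller can render the page with a placeholder when `degraded` is true instead of failing the whole request.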

### Eventual Consistency

Eventual consistency means that after a write, there is a window of time where different services may see different versions of the data. This is not a bug -- it is a design choice. The key question is: how long is that window, and can your business tolerate it?

For most use cases (user profile updates, catalog changes, notification preferences), a few seconds of inconsistency is invisible to users. For others (inventory counts during a flash sale, account balances), even brief inconsistency can cause real problems. The pattern below shows how services propagate changes through events.

In CAP terms, eventual consistency is what you get when you choose AP (availability + partition tolerance). Both sides of the network partition can accept writes; when the partition heals, the writes are reconciled. The cost is that readers may see stale data during the partition -- or even outside a partition, during the natural replication lag between services.

The key design lever is the "inconsistency window" -- the time between a write on the source of truth and the read model catching up. For in-process event handlers, this can be milliseconds. For events routed through Kafka consumers with batching, it can be hundreds of milliseconds. For cross-region replication, it can be seconds. Always know what your window is and whether your business can tolerate it -- "eventual" without a number is just wishful thinking.
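
One way to put a number on the window is to have each consumer record the lag between the event's source timestamp and the moment it applies the change. The sketch below assumes events carry an ISO-8601 `updated_at` with a UTC offset (the format `datetime.isoformat()` produces); the helper names and the nearest-rank percentile are illustrative choices.

```python
import math
from datetime import datetime, timezone
from typing import Optional


def replication_lag_seconds(
    event_updated_at: str, applied_at: Optional[datetime] = None
) -> float:
    """Seconds between the source write and the local read model applying it."""
    written = datetime.fromisoformat(event_updated_at)
    applied = applied_at or datetime.now(timezone.utc)
    return (applied - written).total_seconds()


def percentile(samples: list[float], q: float) -> float:
    """Nearest-rank percentile of the observed lags, q in (0, 100]."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(q / 100 * len(ordered)))
    return ordered[rank - 1]
```

Feeding the recorded lags into `percentile(lags, 99)` gives a concrete figure to compare against what the business can tolerate.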

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // Most microservices use eventual consistency
    // Example: User updates email, order service eventually learns about it

    // user-service/handlers/user-updated.js
    class UserEventHandler {
      constructor(eventBus) {
        this.eventBus = eventBus;
      }

      async handleUserUpdated(event) {
        const { userId, changes } = event;
        
        // Publish event
        await this.eventBus.publish('user.updated', {
          userId,
          email: changes.email,
          name: changes.name,
          updatedAt: new Date().toISOString()
        });
      }
    }

    // order-service/handlers/user-events.js
    class UserEventConsumer {
      constructor() {
        // Local cache/read model of user data needed by order service
        this.userCache = new Map();
      }

      async handleUserUpdated(event) {
        const { userId, email, name } = event;
        
        // Update local cache
        this.userCache.set(userId, {
          id: userId,
          email,
          name,
          lastUpdated: new Date()
        });
        
        // Optionally update denormalized data in orders
        // (depends on business requirements)
      }
    }
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Most microservices use eventual consistency.
    # Example: User updates email -> Order Service eventually learns about it.

    from datetime import datetime, timezone
    from typing import Any

    from order_service.clients import EventBus


    # user_service/handlers/user_updated.py
    class UserEventHandler:
        def __init__(self, event_bus: EventBus) -> None:
            self._event_bus = event_bus

        async def handle_user_updated(self, event: dict[str, Any]) -> None:
            user_id = event["user_id"]
            changes = event["changes"]

            await self._event_bus.publish(
                "user.updated",
                {
                    "user_id": user_id,
                    "email": changes.get("email"),
                    "name": changes.get("name"),
                    "updated_at": datetime.now(timezone.utc).isoformat(),
                },
            )


    # order_service/handlers/user_events.py
    class UserEventConsumer:
        """Maintains a local projection of user data the order service needs."""

        def __init__(self) -> None:
            self._user_cache: dict[str, dict[str, Any]] = {}

        async def handle_user_updated(self, event: dict[str, Any]) -> None:
            user_id = event["user_id"]
            self._user_cache[user_id] = {
                "id": user_id,
                "email": event.get("email"),
                "name": event.get("name"),
                "last_updated": datetime.now(timezone.utc),
            }
            # Optionally update denormalized fields in open orders here,
            # depending on business rules.
    ```
  </Tab>
</Tabs>

### Saga Pattern for Distributed Transactions

Sagas exist because two-phase commit (2PC) does not scale across services. 2PC requires every participant to hold locks while the coordinator waits for votes -- any slow service blocks everyone else, and any coordinator failure leaves participants in an in-doubt state. A saga replaces "all-or-nothing atomicity" with "all-or-eventually-compensated." The system passes through intermediate states where some steps are done and others aren't, and if a later step fails, earlier steps are undone by explicit compensation.

The trade-off is important: a saga is **not** a transaction. It has no isolation. Other readers will see the intermediate states, and two concurrent sagas can interleave in ways that a real transaction would prevent. You compensate with idempotency (every step and compensation can be retried safely), semantic locks (mark the order as "reserving" so no one else touches it), and careful ordering (do the reversible steps first, commit the irreversible steps last).
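
Two of those compensating techniques, idempotency and semantic locks, can be sketched in a few lines. Both classes below are hypothetical and keep their state in memory for illustration; in a real service that state would live in the service's own database (for example behind a unique constraint) so it survives restarts and is safe across concurrent workers.

```python
from typing import Any, Awaitable, Callable


class IdempotentSteps:
    """Remembers step results by idempotency key so a retry skips the side effect."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}

    async def run_once(
        self, key: str, action: Callable[[], Awaitable[Any]]
    ) -> Any:
        if key in self._results:
            return self._results[key]  # retry: reuse the recorded result
        result = await action()
        self._results[key] = result
        return result


class SemanticLocks:
    """Marks an order as owned by one saga so concurrent sagas back off."""

    def __init__(self) -> None:
        self._owner: dict[str, str] = {}

    def try_acquire(self, order_id: str, saga_id: str) -> bool:
        # setdefault returns the existing owner if there already is one.
        return self._owner.setdefault(order_id, saga_id) == saga_id

    def release(self, order_id: str, saga_id: str) -> None:
        # Only the owning saga may release the lock.
        if self._owner.get(order_id) == saga_id:
            del self._owner[order_id]
```

A saga step would then run as `await steps.run_once(f"{order_id}:charge", charge)`, so that retrying the step after a timeout does not charge the customer twice.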

There are two flavors worth knowing: orchestrated sagas use a central coordinator (easy to reason about, but the coordinator can become a bottleneck and a single point of failure), and choreographed sagas use events between peers (more scalable, harder to debug because the "flow" is spread across many services). The example below uses orchestration because it is clearer to read -- but in production systems beyond 3-4 steps, I usually prefer choreography with a saga log for observability.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // Example: Order creation saga with compensation

    class OrderSaga {
      constructor(orderService, inventoryClient, paymentClient, eventBus) {
        this.orderService = orderService;
        this.inventoryClient = inventoryClient;
        this.paymentClient = paymentClient;
        this.eventBus = eventBus;
      }

      async execute(orderData) {
        const saga = new SagaExecution();
        let order = null;
        let reservation = null;
        let payment = null;

        try {
          // Step 1: Create order
          saga.addStep({
            name: 'createOrder',
            execute: async () => {
              order = await this.orderService.createOrder(orderData);
              return order;
            },
            compensate: async () => {
              if (order) {
                await this.orderService.cancelOrder(order.id);
              }
            }
          });

          // Step 2: Reserve inventory
          saga.addStep({
            name: 'reserveInventory',
            execute: async () => {
              reservation = await this.inventoryClient.reserve({
                orderId: order.id,
                items: order.items
              });
              return reservation;
            },
            compensate: async () => {
              if (reservation) {
                await this.inventoryClient.release(reservation.id);
              }
            }
          });

          // Step 3: Process payment
          saga.addStep({
            name: 'processPayment',
            execute: async () => {
              payment = await this.paymentClient.charge({
                orderId: order.id,
                userId: order.userId,
                amount: order.totalAmount
              });
              return payment;
            },
            compensate: async () => {
              if (payment) {
                await this.paymentClient.refund(payment.id);
              }
            }
          });

          // Step 4: Confirm order
          saga.addStep({
            name: 'confirmOrder',
            execute: async () => {
              await this.orderService.confirmOrder(order.id);
              await this.inventoryClient.commit(reservation.id);
              return order;
            },
            compensate: async () => {
              // Nothing to compensate - previous steps will handle it
            }
          });

          // Execute saga
          await saga.run();

        } catch (error) {
          // saga.run() has already compensated the completed steps
          console.error('Order saga failed:', error);

          await this.eventBus.publish('order.failed', {
            orderId: order?.id,
            userId: orderData.userId,
            reason: error.message
          });

          throw error;
        }

        // Publish success outside the try block so that a broker failure here
        // is not reported as order.failed for an order that was confirmed
        await this.eventBus.publish('order.completed', {
          orderId: order.id,
          userId: order.userId
        });

        return order;
      }
    }

    // Saga execution engine
    class SagaExecution {
      constructor() {
        this.steps = [];
        this.completedSteps = [];
      }

      addStep(step) {
        this.steps.push(step);
      }

      async run() {
        for (const step of this.steps) {
          try {
            console.log(`Executing step: ${step.name}`);
            const result = await step.execute();
            this.completedSteps.push(step);
          } catch (error) {
            console.error(`Step ${step.name} failed:`, error);
            await this.compensate();
            throw error;
          }
        }
      }

      async compensate() {
        // Execute compensations in reverse order
        for (const step of [...this.completedSteps].reverse()) {
          try {
            console.log(`Compensating step: ${step.name}`);
            await step.compensate();
          } catch (error) {
            console.error(`Compensation for ${step.name} failed:`, error);
            // Log for manual intervention
          }
        }
      }
    }
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Example: Order creation saga with compensation.

    import logging
    from dataclasses import dataclass, field
    from typing import Any, Awaitable, Callable

    logger = logging.getLogger(__name__)


    StepFn = Callable[[], Awaitable[Any]]


    @dataclass
    class SagaStep:
        name: str
        execute: StepFn
        compensate: StepFn


    @dataclass
    class SagaExecution:
        steps: list[SagaStep] = field(default_factory=list)
        completed: list[SagaStep] = field(default_factory=list)

        def add_step(self, step: SagaStep) -> None:
            self.steps.append(step)

        async def run(self) -> None:
            for step in self.steps:
                try:
                    logger.info("Executing step: %s", step.name)
                    await step.execute()
                    self.completed.append(step)
                except Exception:
                    logger.exception("Step %s failed", step.name)
                    await self._compensate()
                    raise

        async def _compensate(self) -> None:
            # Execute compensations in reverse order.
            for step in reversed(self.completed):
                try:
                    logger.info("Compensating step: %s", step.name)
                    await step.compensate()
                except Exception:
                    logger.exception(
                        "Compensation for %s failed -- requires manual intervention",
                        step.name,
                    )


    class OrderSaga:
        def __init__(
            self,
            order_service,
            inventory_client,
            payment_client,
            event_bus,
        ) -> None:
            self._order_service = order_service
            self._inventory_client = inventory_client
            self._payment_client = payment_client
            self._event_bus = event_bus

        async def execute(self, order_data: dict[str, Any]) -> Any:
            saga = SagaExecution()
            state: dict[str, Any] = {
                "order": None,
                "reservation": None,
                "payment": None,
            }

            async def create_order() -> Any:
                state["order"] = await self._order_service.create_order(order_data)
                return state["order"]

            async def cancel_order() -> None:
                if state["order"] is not None:
                    await self._order_service.cancel_order(state["order"].id)

            async def reserve_inventory() -> Any:
                state["reservation"] = await self._inventory_client.reserve(
                    order_id=state["order"].id, items=state["order"].items
                )
                return state["reservation"]

            async def release_inventory() -> None:
                if state["reservation"] is not None:
                    await self._inventory_client.release(state["reservation"]["id"])

            async def process_payment() -> Any:
                state["payment"] = await self._payment_client.charge(
                    order_id=state["order"].id,
                    user_id=state["order"].user_id,
                    amount=state["order"].total_amount,
                )
                return state["payment"]

            async def refund_payment() -> None:
                if state["payment"] is not None:
                    await self._payment_client.refund(state["payment"]["id"])

            async def confirm_order() -> Any:
                await self._order_service.confirm_order(state["order"].id)
                await self._inventory_client.commit(state["reservation"]["id"])
                return state["order"]

            async def noop() -> None:
                # Nothing to compensate -- earlier steps handle it.
                return None

            saga.add_step(SagaStep("create_order", create_order, cancel_order))
            saga.add_step(
                SagaStep("reserve_inventory", reserve_inventory, release_inventory)
            )
            saga.add_step(SagaStep("process_payment", process_payment, refund_payment))
            saga.add_step(SagaStep("confirm_order", confirm_order, noop))

            try:
                await saga.run()
            except Exception as exc:
                # saga.run() has already compensated the completed steps.
                logger.exception("Order saga failed")
                await self._event_bus.publish(
                    "order.failed",
                    {
                        "order_id": (
                            str(state["order"].id) if state["order"] else None
                        ),
                        "user_id": order_data.get("user_id"),
                        "reason": str(exc),
                    },
                )
                raise

            # Publish success outside the try block so that a broker failure
            # here is not reported as order.failed for a confirmed order.
            await self._event_bus.publish(
                "order.completed",
                {
                    "order_id": str(state["order"].id),
                    "user_id": state["order"].user_id,
                },
            )
            return state["order"]
    ```
  </Tab>
</Tabs>

### Outbox Pattern

The Outbox pattern solves one of the trickiest problems in distributed systems: how do you atomically update your database AND publish an event? If you write to the database and then publish to Kafka, what happens when your process crashes between those two operations? The database has the update but the event was never sent -- now downstream services are permanently out of sync.

The solution is elegant: write the event to an "outbox" table **in the same database transaction** as your business data. A separate background process reads the outbox and publishes events to the message broker. Since the business data and the outbox entry are in the same transaction, they either both succeed or both fail. Guaranteed consistency.

One subtle but important point: the outbox gives you **at-least-once** delivery, not exactly-once. If the publisher crashes after sending the event to Kafka but before marking the outbox row as published, the event will be re-sent. Consumers must be idempotent -- typically by including a message ID that consumers track in their own "inbox" table and ignoring duplicates. This is the distributed systems version of "measure twice, cut once": plan for replays from day one, because you will have them.
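
On the consumer side, the "inbox" idea can be sketched as follows. This example uses the standard-library `sqlite3` module only so that it is self-contained; the `inbox` table name, the `handle_once` helper, and the assumption that each event carries `_metadata.message_id` are illustrative. The key property is that the inbox row and the business effect commit in one local transaction.

```python
import sqlite3
from typing import Any, Callable

ApplyFn = Callable[[sqlite3.Connection, dict[str, Any]], None]


def ensure_inbox(conn: sqlite3.Connection) -> None:
    # The primary key on message_id is what rejects duplicates.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS inbox (message_id TEXT PRIMARY KEY)"
    )


def handle_once(
    conn: sqlite3.Connection, event: dict[str, Any], apply: ApplyFn
) -> bool:
    """Apply the event at most once. Returns False for an already-seen message."""
    message_id = event["_metadata"]["message_id"]
    # `with conn` commits on success and rolls back if apply() raises, so the
    # inbox row and the business change are persisted together or not at all.
    with conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO inbox (message_id) VALUES (?)",
            (message_id,),
        )
        if cur.rowcount == 0:
            return False  # duplicate delivery: skip the side effect
        apply(conn, event)
    return True
```

If `apply` fails, the inbox row is rolled back too, so the broker's redelivery gets another chance to process the message.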

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // database/models/outbox.js
    const OutboxMessage = sequelize.define('OutboxMessage', {
      id: {
        type: DataTypes.UUID,
        defaultValue: DataTypes.UUIDV4,
        primaryKey: true
      },
      aggregateType: {
        type: DataTypes.STRING,
        allowNull: false
      },
      aggregateId: {
        type: DataTypes.STRING,
        allowNull: false
      },
      eventType: {
        type: DataTypes.STRING,
        allowNull: false
      },
      payload: {
        type: DataTypes.JSONB,
        allowNull: false
      },
      published: {
        type: DataTypes.BOOLEAN,
        defaultValue: false
      },
      publishedAt: {
        type: DataTypes.DATE
      },
      retryCount: {
        type: DataTypes.INTEGER,
        defaultValue: 0
      }
    }, {
      indexes: [
        { fields: ['published', 'createdAt'] }
      ]
    });

    // services/order-service.js
    class OrderService {
      async createOrder(orderData, transaction) {
        // Create order and outbox message in same transaction.
        // Only commit or roll back a transaction this method opened itself;
        // a caller-supplied transaction stays under the caller's control.
        const ownsTransaction = !transaction;
        const t = transaction || await sequelize.transaction();

        try {
          const order = await Order.create(orderData, { transaction: t });

          // Write to outbox in same transaction
          await OutboxMessage.create({
            aggregateType: 'Order',
            aggregateId: order.id,
            eventType: 'order.created',
            payload: {
              orderId: order.id,
              userId: order.userId,
              items: order.items,
              totalAmount: order.totalAmount,
              createdAt: order.createdAt
            }
          }, { transaction: t });

          if (ownsTransaction) await t.commit();
          return order;

        } catch (error) {
          if (ownsTransaction) await t.rollback();
          throw error;
        }
      }
    }

    // workers/outbox-publisher.js
    const { Op } = require('sequelize');

    class OutboxPublisher {
      constructor(eventBus) {
        this.eventBus = eventBus;
        this.batchSize = 100;
        this.pollInterval = 1000;  // 1 second
      }

      async start() {
        console.log('Outbox publisher started');
        
        while (true) {
          try {
            await this.publishPendingMessages();
          } catch (error) {
            console.error('Outbox publisher error:', error);
          }
          
          await new Promise(r => setTimeout(r, this.pollInterval));
        }
      }

      async publishPendingMessages() {
        const messages = await OutboxMessage.findAll({
          where: {
            published: false,
            retryCount: { [Op.lt]: 5 }
          },
          order: [['createdAt', 'ASC']],
          limit: this.batchSize
        });

        for (const message of messages) {
          try {
            await this.eventBus.publish(message.eventType, {
              ...message.payload,
              _metadata: {
                messageId: message.id,
                aggregateType: message.aggregateType,
                aggregateId: message.aggregateId,
                timestamp: message.createdAt
              }
            });

            await message.update({
              published: true,
              publishedAt: new Date()
            });

          } catch (error) {
            console.error(`Failed to publish message ${message.id}:`, error);
            await message.increment('retryCount');
          }
        }
      }
    }
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # order_service/models/outbox.py
    import asyncio
    import logging
    import uuid
    from datetime import datetime
    from typing import Any, Optional

    from sqlalchemy import String, Boolean, Integer, DateTime, Index, func, select
    from sqlalchemy.dialects.postgresql import UUID, JSONB
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.orm import Mapped, mapped_column

    from order_service.database import Base

    logger = logging.getLogger(__name__)


    class OutboxMessage(Base):
        __tablename__ = "outbox_messages"

        id: Mapped[uuid.UUID] = mapped_column(
            UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
        )
        aggregate_type: Mapped[str] = mapped_column(String, nullable=False)
        aggregate_id: Mapped[str] = mapped_column(String, nullable=False)
        event_type: Mapped[str] = mapped_column(String, nullable=False)
        payload: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
        published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
        published_at: Mapped[Optional[datetime]] = mapped_column(
            DateTime(timezone=True), nullable=True
        )
        retry_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), nullable=False
        )

        __table_args__ = (
            Index("ix_outbox_unpublished", "published", "created_at"),
        )


    # order_service/services/order_service.py
    from order_service.database import Order, OrderStatus


    class OrderService:
        def __init__(self, session_factory) -> None:
            self._session_factory = session_factory

        async def create_order(self, order_data: dict[str, Any]) -> Order:
            # Business data AND outbox row are written in the same transaction.
            async with self._session_factory() as session:  # type: AsyncSession
                async with session.begin():
                    order = Order(
                        user_id=order_data["user_id"],
                        items=order_data["items"],
                        total_amount=order_data["total_amount"],
                        status=OrderStatus.pending,
                    )
                    session.add(order)
                    await session.flush()

                    session.add(
                        OutboxMessage(
                            aggregate_type="Order",
                            aggregate_id=str(order.id),
                            event_type="order.created",
                            payload={
                                "order_id": str(order.id),
                                "user_id": order.user_id,
                                "items": order.items,
                                "total_amount": str(order.total_amount),
                                "created_at": order.created_at.isoformat(),
                            },
                        )
                    )
                # Commit happens at session.begin() exit -- either both rows
                # are persisted or neither is.
                return order


    # order_service/workers/outbox_publisher.py
    class OutboxPublisher:
        def __init__(
            self,
            session_factory,
            event_bus,
            batch_size: int = 100,
            poll_interval: float = 1.0,
            max_retries: int = 5,
        ) -> None:
            self._session_factory = session_factory
            self._event_bus = event_bus
            self._batch_size = batch_size
            self._poll_interval = poll_interval
            self._max_retries = max_retries
            self._stopped = asyncio.Event()

        async def start(self) -> None:
            logger.info("Outbox publisher started")
            while not self._stopped.is_set():
                try:
                    await self._publish_pending()
                except Exception:
                    logger.exception("Outbox publisher error")
                try:
                    await asyncio.wait_for(
                        self._stopped.wait(), timeout=self._poll_interval
                    )
                except asyncio.TimeoutError:
                    pass

        def stop(self) -> None:
            self._stopped.set()

        async def _publish_pending(self) -> None:
            async with self._session_factory() as session:  # type: AsyncSession
                # SKIP LOCKED lets multiple publisher replicas run without
                # double-sending the same row.
                stmt = (
                    select(OutboxMessage)
                    .where(OutboxMessage.published.is_(False))
                    .where(OutboxMessage.retry_count < self._max_retries)
                    .order_by(OutboxMessage.created_at.asc())
                    .limit(self._batch_size)
                    .with_for_update(skip_locked=True)
                )
                async with session.begin():
                    result = await session.execute(stmt)
                    messages: list[OutboxMessage] = list(result.scalars())

                    for msg in messages:
                        try:
                            await self._event_bus.publish(
                                msg.event_type,
                                {
                                    **msg.payload,
                                    "_metadata": {
                                        "message_id": str(msg.id),
                                        "aggregate_type": msg.aggregate_type,
                                        "aggregate_id": msg.aggregate_id,
                                        "timestamp": msg.created_at.isoformat(),
                                    },
                                },
                            )
                            msg.published = True
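                            # Timezone-aware timestamp: utcnow() returns a naive
                            # datetime, which does not match a timestamptz column.
                            msg.published_at = datetime.now().astimezone()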
                        except Exception:
                            logger.exception(
                                "Failed to publish message %s", msg.id
                            )
                            msg.retry_count += 1
    ```
  </Tab>
</Tabs>

The corresponding Alembic migration for creating the outbox table:

```python theme={null}
# alembic/versions/2026_04_01_add_outbox_table.py
"""add outbox table

Revision ID: 2026_04_01_outbox
Revises: 2026_03_15_orders
Create Date: 2026-04-01 10:00:00
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

revision = "2026_04_01_outbox"
down_revision = "2026_03_15_orders"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "outbox_messages",
        sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column("aggregate_type", sa.String(), nullable=False),
        sa.Column("aggregate_id", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(), nullable=False),
        sa.Column("payload", postgresql.JSONB(), nullable=False),
        sa.Column(
            "published", sa.Boolean(), nullable=False, server_default=sa.false()
        ),
        sa.Column("published_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "retry_count", sa.Integer(), nullable=False, server_default="0"
        ),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.func.now(),
        ),
    )
    op.create_index(
        "ix_outbox_unpublished",
        "outbox_messages",
        ["published", "created_at"],
    )


def downgrade() -> None:
    op.drop_index("ix_outbox_unpublished", table_name="outbox_messages")
    op.drop_table("outbox_messages")
```

***

## Data Replication Strategies

### Change Data Capture (CDC)

CDC is the outbox pattern's more ambitious cousin. Instead of writing events to an application-managed outbox table, CDC reads the database's own transaction log (Postgres WAL, MySQL binlog, MongoDB oplog) and turns every row change into an event. Tools like Debezium do this transparently -- your application code just writes to the database as normal, and Debezium streams every INSERT/UPDATE/DELETE into Kafka.

The big advantage over outbox: zero application code. You do not need to maintain an outbox table, write publishers, or remember to add events for new tables. The big trade-off: you are now coupled to the database's internal representation. Column renames, schema changes, and database engine migrations all ripple out to downstream consumers. You also get **every** change, including ones that should not have been events (internal columns, admin corrections, migration backfills) -- so consumers need to filter.

The pattern shines when you are extracting from a system you do not own (legacy monolith, vendor database), or when you want to build read models from an existing transactional database without touching application code. For new systems, I usually prefer the outbox pattern because it makes event contracts explicit rather than leaking schema details.
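
Because CDC consumers see raw rows, a common first step is a small translation layer that filters the change event down to the fields the consumer is allowed to depend on. The sketch below handles the Debezium envelope fields `op`, `before`, and `after`; the `PUBLIC_PRODUCT_FIELDS` allow-list and the output shape are assumptions for illustration, and the `payload` wrapper is only present when the JSON converter is configured to include schemas.

```python
import json
from typing import Any, Optional

# Hypothetical allow-list: the only columns this consumer depends on.
PUBLIC_PRODUCT_FIELDS = ("id", "name", "price", "stock")


def to_product_change(raw_value: Optional[bytes]) -> Optional[dict[str, Any]]:
    """Translate one Debezium message value into a narrow, consumer-owned shape."""
    if raw_value is None:
        return None  # tombstone that follows a delete: nothing to apply

    envelope = json.loads(raw_value)
    # With schemas enabled the change sits under "payload"; otherwise it is top level.
    change = envelope.get("payload", envelope)
    op = change["op"]

    if op in ("c", "r", "u"):  # create, snapshot read, update
        row = change["after"]
        return {
            "action": "upsert",
            "product": {k: row.get(k) for k in PUBLIC_PRODUCT_FIELDS},
        }
    if op == "d":
        return {"action": "delete", "product_id": change["before"]["id"]}
    return None  # other operation types (for example truncate) are ignored here
```

Internal columns never leave this function, so a rename of a column outside the allow-list does not ripple into the read model.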

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // Using Debezium for CDC (conceptual Node.js consumer)

    const { Kafka } = require('kafkajs');

    class CDCConsumer {
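      constructor(kafkaConfig, productCache, eventBus) {
        this.kafka = new Kafka(kafkaConfig);
        this.consumer = this.kafka.consumer({ groupId: 'cdc-consumer' });
        // Collaborators used by the handlers below, injected by the caller
        this.productCache = productCache;
        this.eventBus = eventBus;
      }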

      async start() {
        await this.consumer.connect();
        
        // Subscribe to CDC topics (Debezium format)
        await this.consumer.subscribe({
          topics: [
            'dbserver1.inventory.products',  // CDC topic for products table
            'dbserver1.users.users'          // CDC topic for users table
          ]
        });

        await this.consumer.run({
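          eachMessage: async ({ topic, partition, message }) => {
            // Debezium follows a delete with a tombstone (null value); skip it
            if (message.value === null) return;
            const change = JSON.parse(message.value.toString());
            await this.handleChange(topic, change);
          }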
        });
      }

      async handleChange(topic, change) {
        // Debezium message format
        const { before, after, op, source } = change.payload;
        
        /*
          op values:
          'c' = create
          'u' = update
          'd' = delete
          'r' = read (snapshot)
        */
        
        switch (op) {
          case 'c':
          case 'r':  // Snapshot rows populate the read model like creates
            await this.handleCreate(topic, after);
            break;
          case 'u':
            await this.handleUpdate(topic, before, after);
            break;
          case 'd':
            await this.handleDelete(topic, before);
            break;
        }
      }

      async handleCreate(topic, data) {
        if (topic.includes('products')) {
          // Update local product cache/read model
          await this.productCache.set(data.id, {
            id: data.id,
            name: data.name,
            price: data.price / 100,  // Convert cents to dollars
            stock: data.stock
          });
        }
      }

      async handleUpdate(topic, before, after) {
        if (topic.includes('products')) {
          // Update local cache
          await this.productCache.set(after.id, {
            id: after.id,
            name: after.name,
            price: after.price / 100,
            stock: after.stock
          });

          // Check for significant changes
          if (before.price !== after.price) {
            await this.eventBus.publish('product.price_changed', {
              productId: after.id,
              oldPrice: before.price / 100,
              newPrice: after.price / 100
            });
          }
        }
      }

      async handleDelete(topic, before) {
        if (topic.includes('products')) {
          await this.productCache.delete(before.id);
        }
      }
    }
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Using Debezium for CDC (conceptual aiokafka consumer).

    import json
    import logging
    from typing import Any, Optional

    from aiokafka import AIOKafkaConsumer
    import redis.asyncio as redis

    logger = logging.getLogger(__name__)


    class CDCConsumer:
        def __init__(
            self,
            bootstrap_servers: str,
            cache: redis.Redis,
            event_bus,
            group_id: str = "cdc-consumer",
        ) -> None:
            self._bootstrap = bootstrap_servers
            self._group_id = group_id
            self._cache = cache
            self._event_bus = event_bus
            self._consumer: Optional[AIOKafkaConsumer] = None

        async def start(self) -> None:
            self._consumer = AIOKafkaConsumer(
                "dbserver1.inventory.products",  # CDC topic for products table
                "dbserver1.users.users",  # CDC topic for users table
                bootstrap_servers=self._bootstrap,
                group_id=self._group_id,
                enable_auto_commit=False,
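                # Debezium tombstones carry a null value; pass None through.
                value_deserializer=lambda v: (
                    json.loads(v.decode("utf-8")) if v is not None else None
                ),
            )
            await self._consumer.start()
            try:
                async for msg in self._consumer:
                    if msg.value is not None:  # skip tombstones
                        await self._handle_change(msg.topic, msg.value)
                    await self._consumer.commit()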
            finally:
                await self._consumer.stop()

        async def _handle_change(self, topic: str, change: dict[str, Any]) -> None:
            # Debezium envelope: {schema, payload: {before, after, op, source, ts_ms}}
            payload = change.get("payload", {})
            before = payload.get("before")
            after = payload.get("after")
            op = payload.get("op")
            # op values: c=create, u=update, d=delete, r=read/snapshot
            if op in ("c", "r"):  # snapshot rows populate the cache like creates
                await self._handle_create(topic, after)
            elif op == "u":
                await self._handle_update(topic, before, after)
            elif op == "d":
                await self._handle_delete(topic, before)

        async def _handle_create(self, topic: str, data: dict[str, Any]) -> None:
            if "products" in topic:
                await self._cache.set(
                    f"product:{data['id']}",
                    json.dumps(
                        {
                            "id": data["id"],
                            "name": data["name"],
                            "price": data["price"] / 100,  # cents -> dollars
                            "stock": data["stock"],
                        }
                    ),
                )

        async def _handle_update(
            self,
            topic: str,
            before: dict[str, Any],
            after: dict[str, Any],
        ) -> None:
            if "products" in topic:
                await self._cache.set(
                    f"product:{after['id']}",
                    json.dumps(
                        {
                            "id": after["id"],
                            "name": after["name"],
                            "price": after["price"] / 100,
                            "stock": after["stock"],
                        }
                    ),
                )
                if before["price"] != after["price"]:
                    await self._event_bus.publish(
                        "product.price_changed",
                        {
                            "product_id": after["id"],
                            "old_price": before["price"] / 100,
                            "new_price": after["price"] / 100,
                        },
                    )

        async def _handle_delete(self, topic: str, before: dict[str, Any]) -> None:
            if "products" in topic:
                await self._cache.delete(f"product:{before['id']}")
    ```
  </Tab>
</Tabs>

### CQRS (Command Query Responsibility Segregation)

CQRS is the observation that reads and writes want fundamentally different shapes. Writes want normalized data (one source of truth per fact, minimal update anomalies) and transactional invariants (decrement stock only if available >= requested). Reads want denormalized data (everything for one screen in one query, no joins) and aggregations (dashboards, rollups). A single data model that is optimal for both is rare -- usually it is a compromise that is good at neither.

CQRS separates them explicitly. The write model (command side) owns the business rules and enforces invariants. The read model (query side) is a projection -- often in a different store entirely, like Elasticsearch for search or Redis for counters -- that is updated asynchronously from events. You get to scale them independently, optimize them independently, and even use different databases.

The consistency trade-off is explicit here: the read model is eventually consistent with the write model. A user who places an order may not see it immediately in their order history because the read model has not caught up yet. Most users tolerate sub-second lag. Where they cannot, you either read from the write model for that specific query (bypassing CQRS) or accept the extra complexity of tracking how far the read model has caught up. CQRS is not the default everywhere because, for simple CRUD services, it is massive over-engineering. Apply it where read patterns diverge significantly from write patterns -- search, dashboards, complex reporting.
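
One way to handle the "user must see their own write" case is to have the command side hand back a version number and let the query side fall back to the write model when the projection is behind. This is a minimal sketch with in-memory dictionaries standing in for both stores. `ReadYourWritesQuery`, `min_version`, and the `version` field are assumptions made for illustration and are not defined by the services in this chapter.

```python
from typing import Any, Optional

Store = dict[str, dict[str, Any]]


class ReadYourWritesQuery:
    """Serve from the read model unless it lags a version the caller has seen."""

    def __init__(self, read_model: Store, write_model: Store) -> None:
        self._read = read_model    # eventually consistent projection
        self._write = write_model  # source of truth

    def get_order(self, order_id: str, min_version: int = 0) -> Optional[dict[str, Any]]:
        doc = self._read.get(order_id)
        if doc is not None and doc["version"] >= min_version:
            return doc
        # Projection is missing or stale for this caller: bypass it once.
        return self._write.get(order_id)


write_model: Store = {"o-1": {"id": "o-1", "status": "cancelled", "version": 2}}
read_model: Store = {"o-1": {"id": "o-1", "status": "pending", "version": 1}}  # lagging

query = ReadYourWritesQuery(read_model, write_model)
any_version = query.get_order("o-1")                    # lag is acceptable here
after_write = query.get_order("o-1", min_version=2)     # caller just wrote version 2
```

Only callers that pass `min_version` pay the cost of hitting the write model, so ordinary list and dashboard queries keep the benefits of the projection.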

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // Separate read and write models

    // Write side - commands
    class OrderCommandService {
      constructor(eventStore, eventBus) {
        this.eventStore = eventStore;
        this.eventBus = eventBus;  // Used below to publish uncommitted events
      }

      async createOrder(command) {
        // Validate command
        this.validateCreateOrder(command);

        // Create order aggregate
        const order = new OrderAggregate();
        order.create(command);

        // Save to event store
        await this.eventStore.save(order.id, order.uncommittedEvents);

        // Publish events
        for (const event of order.uncommittedEvents) {
          await this.eventBus.publish(event.type, event);
        }

        return order.id;
      }

      async cancelOrder(orderId, reason) {
        // Load aggregate from event store
        const events = await this.eventStore.getEvents(orderId);
        const order = OrderAggregate.fromEvents(events);

        // Apply command
        order.cancel(reason);

        // Save new events
        await this.eventStore.save(orderId, order.uncommittedEvents);

        // Publish events
        for (const event of order.uncommittedEvents) {
          await this.eventBus.publish(event.type, event);
        }
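      }

      validateCreateOrder(command) {
        if (!command.userId) throw new Error('userId is required');
        if (!Array.isArray(command.items) || command.items.length === 0) {
          throw new Error('items is required');
        }
      }
    }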

    // Read side - queries (optimized for reads)
    class OrderQueryService {
      constructor(readDatabase) {
        this.db = readDatabase;  // Optimized read database (could be Elasticsearch, Redis, etc.)
      }

      async getOrderById(orderId) {
        return this.db.orders.findOne({ id: orderId });
      }

      async getOrdersByUser(userId, options = {}) {
        const { page = 1, limit = 20, status } = options;
        
        const query = { userId };
        if (status) query.status = status;

        return this.db.orders.find(query)
          .sort({ createdAt: -1 })
          .skip((page - 1) * limit)
          .limit(limit);
      }

      async getOrderStats(userId) {
        // Aggregate query on read model
        return this.db.orders.aggregate([
          { $match: { userId } },
          {
            $group: {
              _id: '$status',
              count: { $sum: 1 },
              totalAmount: { $sum: '$totalAmount' }
            }
          }
        ]);
      }
    }

    // Projection - updates read model from events
    class OrderProjection {
      constructor(readDatabase) {
        this.db = readDatabase;
      }

      async handleEvent(event) {
        switch (event.type) {
          case 'order.created':
            await this.onOrderCreated(event);
            break;
          case 'order.confirmed':
            await this.onOrderConfirmed(event);
            break;
          case 'order.shipped':
            await this.onOrderShipped(event);
            break;
          case 'order.cancelled':
            await this.onOrderCancelled(event);
            break;
        }
      }

      async onOrderCreated(event) {
        await this.db.orders.insertOne({
          id: event.orderId,
          userId: event.userId,
          items: event.items,
          totalAmount: event.totalAmount,
          status: 'pending',
          createdAt: event.timestamp,
          updatedAt: event.timestamp
        });
      }

      async onOrderShipped(event) {
        await this.db.orders.updateOne(
          { id: event.orderId },
          {
            $set: {
              status: 'shipped',
              shippedAt: event.timestamp,
              trackingNumber: event.trackingNumber,
              updatedAt: event.timestamp
            }
          }
        );
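      }

      async onOrderConfirmed(event) {
        await this.db.orders.updateOne(
          { id: event.orderId },
          { $set: { status: 'confirmed', updatedAt: event.timestamp } }
        );
      }

      async onOrderCancelled(event) {
        await this.db.orders.updateOne(
          { id: event.orderId },
          {
            $set: {
              status: 'cancelled',
              cancelledAt: event.timestamp,
              cancelReason: event.reason,
              updatedAt: event.timestamp
            }
          }
        );
      }
    }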
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Separate read and write models.

    from datetime import datetime
    from typing import Any, Optional

    from motor.motor_asyncio import AsyncIOMotorDatabase


    # -------- Write side (commands) --------
    class OrderCommandService:
        def __init__(self, event_store, event_bus) -> None:
            self._event_store = event_store
            self._event_bus = event_bus

        async def create_order(self, command: dict[str, Any]) -> str:
            self._validate_create(command)

            order = OrderAggregate()
            order.create(command)

            await self._event_store.save(order.id, order.uncommitted_events)
            for event in order.uncommitted_events:
                await self._event_bus.publish(event["type"], event)
            return order.id

        async def cancel_order(self, order_id: str, reason: str) -> None:
            events = await self._event_store.get_events(order_id)
            order = OrderAggregate.from_events(events)
            order.cancel(reason)
            await self._event_store.save(order_id, order.uncommitted_events)
            for event in order.uncommitted_events:
                await self._event_bus.publish(event["type"], event)

        def _validate_create(self, command: dict[str, Any]) -> None:
            if not command.get("user_id"):
                raise ValueError("user_id is required")
            if not command.get("items"):
                raise ValueError("items is required")


    # -------- Read side (queries) --------
    class OrderQueryService:
        """Queries hit a denormalized read model (MongoDB here, could be
        Elasticsearch, Redis, Postgres materialized view, etc.)."""

        def __init__(self, read_db: AsyncIOMotorDatabase) -> None:
            self._db = read_db

        async def get_order_by_id(self, order_id: str) -> Optional[dict[str, Any]]:
            return await self._db.orders.find_one({"id": order_id})

        async def get_orders_by_user(
            self,
            user_id: str,
            page: int = 1,
            limit: int = 20,
            status: Optional[str] = None,
        ) -> list[dict[str, Any]]:
            query: dict[str, Any] = {"user_id": user_id}
            if status is not None:
                query["status"] = status
            cursor = (
                self._db.orders.find(query)
                .sort("created_at", -1)
                .skip((page - 1) * limit)
                .limit(limit)
            )
            return [doc async for doc in cursor]

        async def get_order_stats(self, user_id: str) -> list[dict[str, Any]]:
            pipeline = [
                {"$match": {"user_id": user_id}},
                {
                    "$group": {
                        "_id": "$status",
                        "count": {"$sum": 1},
                        "total_amount": {"$sum": "$total_amount"},
                    }
                },
            ]
            return [doc async for doc in self._db.orders.aggregate(pipeline)]


    # -------- Projection (event -> read model) --------
    class OrderProjection:
        def __init__(self, read_db: AsyncIOMotorDatabase) -> None:
            self._db = read_db

        async def handle_event(self, event: dict[str, Any]) -> None:
            handlers = {
                "order.created": self._on_order_created,
                "order.confirmed": self._on_order_confirmed,
                "order.shipped": self._on_order_shipped,
                "order.cancelled": self._on_order_cancelled,
            }
            handler = handlers.get(event["type"])
            if handler is not None:
                await handler(event)

        async def _on_order_created(self, event: dict[str, Any]) -> None:
            await self._db.orders.insert_one(
                {
                    "id": event["order_id"],
                    "user_id": event["user_id"],
                    "items": event["items"],
                    "total_amount": event["total_amount"],
                    "status": "pending",
                    "created_at": event["timestamp"],
                    "updated_at": event["timestamp"],
                }
            )

        async def _on_order_confirmed(self, event: dict[str, Any]) -> None:
            await self._db.orders.update_one(
                {"id": event["order_id"]},
                {"$set": {"status": "confirmed", "updated_at": event["timestamp"]}},
            )

        async def _on_order_shipped(self, event: dict[str, Any]) -> None:
            await self._db.orders.update_one(
                {"id": event["order_id"]},
                {
                    "$set": {
                        "status": "shipped",
                        "shipped_at": event["timestamp"],
                        "tracking_number": event["tracking_number"],
                        "updated_at": event["timestamp"],
                    }
                },
            )

        async def _on_order_cancelled(self, event: dict[str, Any]) -> None:
            await self._db.orders.update_one(
                {"id": event["order_id"]},
                {
                    "$set": {
                        "status": "cancelled",
                        "cancelled_at": event["timestamp"],
                        "cancel_reason": event.get("reason"),
                        "updated_at": event["timestamp"],
                    }
                },
            )
    ```
  </Tab>
</Tabs>
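
Because projections consume events asynchronously, the same event can be delivered twice, or an older event can arrive after a newer one, depending on the broker's delivery guarantees. The sketch below shows one hedged way to make a projection tolerate both. It assumes each event carries a per-order `version` counter, which the examples above do not define, and it uses an in-memory dictionary in place of the read database.

```python
from typing import Any

# Maps event types to the status stored in the read model.
STATUS_BY_EVENT = {
    "order.created": "pending",
    "order.confirmed": "confirmed",
    "order.shipped": "shipped",
    "order.cancelled": "cancelled",
}


class IdempotentOrderProjection:
    """In-memory stand-in for a read model that skips duplicate and stale events."""

    def __init__(self) -> None:
        self.orders: dict[str, dict[str, Any]] = {}

    def apply(self, event: dict[str, Any]) -> bool:
        """Apply the event; return False if it was skipped."""
        status = STATUS_BY_EVENT.get(event["type"])
        if status is None:
            return False  # unknown event type
        order_id = event["order_id"]
        current = self.orders.get(order_id)
        if current is not None and event["version"] <= current["version"]:
            return False  # duplicate or out-of-order delivery
        doc = dict(current) if current is not None else {"id": order_id}
        doc.update(status=status, version=event["version"])
        self.orders[order_id] = doc
        return True


projection = IdempotentOrderProjection()
results = [
    projection.apply({"type": "order.created", "order_id": "o-1", "version": 1}),
    projection.apply({"type": "order.created", "order_id": "o-1", "version": 1}),
    projection.apply({"type": "order.shipped", "order_id": "o-1", "version": 3}),
    projection.apply({"type": "order.confirmed", "order_id": "o-1", "version": 2}),
]
```

Skipping the late `order.confirmed` event is a simplification: a projection that needs every intermediate state would buffer out-of-order events instead of discarding them.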

***

## Zero-Downtime Database Migrations

<Warning>
  **Caveats and Common Pitfalls with Cross-Service Schema Migrations**

  Expand-contract is well understood for a single service. The real migration pain in microservices comes from coordinating schema changes that ripple across services.

  * **"Big bang" coordinated releases fail predictably.** The temptation when a change touches 4 services is to ship them all in one release train. In practice, one team's change has a bug, the release is blocked, and the other three teams are stuck with half-migrated code in main. Your "coordinated" release becomes an indefinite freeze.
  * **Contract-breaking changes disguised as additive ones.** A team adds a new required field to an event payload and "expands" the producer. But every consumer that was not updated drops the message or crashes on the new field. Additive-looking schema changes can be contract breaks if consumers do unexpected validation (strict mode JSON schema, Avro without default values).
  * **Expand phase never contracts.** The team adds the new column, deploys producers and consumers to use it, then never comes back to remove the old one. Two years later, the system is still maintaining both columns, and new engineers do not know which one is real. The "temporary" dual-write becomes permanent technical debt.
  * **Migration tooling does not version with application code.** The Flyway/Alembic script runs at deploy time, but the deploy rollback path does not automatically reverse the migration. A bad rollback leaves the schema migrated but the code reverted, creating a state nobody tested.
</Warning>

<Tip>
  **Solutions and Patterns for Coordinated Schema Migrations**

  * **Enforce backward-and-forward compatibility at the contract layer.** Use a schema registry (Confluent Schema Registry for Avro, Buf for Protobuf) that mechanically rejects breaking changes at CI time. If a producer tries to remove a field or change a type, the build fails. This converts coordination problems into individual-team problems.
  * **Deploy consumers first, producers second.** When rolling out an additive field, deploy every consumer with the ability to handle the new field *before* any producer emits it. This lets producers roll out independently without coordinating on timing. When removing a field, reverse: deprecate in producers first, wait until no traffic uses the field, then remove from consumers.
  * **Track contract versions as first-class artifacts.** Every service exposes its input and output schemas at `/schema` or similar. A CI job collects all of these schemas and produces a compatibility graph. You should be able to ask "what breaks if Service X removes field Y?" and get a mechanical answer.
  * **Always schedule the contract phase.** When you start an expand, create the JIRA ticket for the contract phase right then, with a date (typically 30 days later). Assign it to a real human. Track the ratio of open expand-phase migrations to completed contract-phase migrations in your quarterly health dashboard.
  * **Couple migrations to feature flags, not deploys.** The migration enables a capability; the feature flag enables the use of that capability. Rolling back the flag is instant; rolling back the schema is never instant. This separation keeps you recoverable.
</Tip>
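
To make the "mechanically rejects breaking changes" idea concrete, here is a toy compatibility check of the kind a CI job could run. It is not how Confluent Schema Registry or Buf work internally. The schema shape (`{field: {"type", "required", "default"}}`) and the `breaking_changes` function are invented for this sketch.

```python
from typing import Any

Schema = dict[str, dict[str, Any]]


def breaking_changes(old: Schema, new: Schema) -> list[str]:
    """List the changes in `new` that could break readers or writers of `old`."""
    problems: list[str] = []
    for name, spec in old.items():
        if name not in new:
            problems.append(f"removed field: {name}")
        elif new[name]["type"] != spec["type"]:
            problems.append(
                f"type changed: {name} {spec['type']} -> {new[name]['type']}"
            )
    for name, spec in new.items():
        if name not in old and spec.get("required") and "default" not in spec:
            problems.append(f"new required field without default: {name}")
    return problems


v1: Schema = {
    "order_id": {"type": "string", "required": True},
    "total": {"type": "int", "required": True},
}
v2: Schema = {
    "order_id": {"type": "string", "required": True},
    "total": {"type": "float", "required": True},
    "currency": {"type": "string", "required": True},
}
v2_safe: Schema = {
    **v1,
    "currency": {"type": "string", "required": True, "default": "USD"},
}

problems = breaking_changes(v1, v2)
safe_problems = breaking_changes(v1, v2_safe)
```

A CI step would fail the build whenever the returned list is non-empty, which is the point at which a cross-team coordination problem becomes a single team's problem.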

### Expand-Contract Pattern

Why is a one-shot `ALTER TABLE ADD COLUMN ... NOT NULL` dangerous in production? Because for the moment between the migration running and the new code deploying, your old application code is still running against a schema it does not know about. If the column is `NOT NULL` with no default, old inserts break. If the migration takes a table lock (which many legacy databases do for certain ALTER operations), every query blocks until it completes. At scale, this is an outage.

The expand-contract pattern (sometimes called "parallel change") inverts the dependency by splitting a schema change across multiple deploys. Each deploy is backward compatible with the previous one, so at no point do you need to coordinate "database and application update together." You add the new thing (expand), move traffic to it gradually, then remove the old thing (contract). Each phase is safe to roll back independently.
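
The phases can be walked through end to end against an in-memory SQLite database. This is only a sketch of the sequence: SQLite stands in for a production database, the table and batch size are made up, and it stops at the verification step because tightening the constraint is database-specific.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL, verified_at TEXT)"
)
conn.executemany(
    "INSERT INTO users (email, verified_at) VALUES (?, ?)",
    [("a@example.com", "2026-01-01"), ("b@example.com", None), ("c@example.com", "2026-02-01")],
)

# Expand: add the column as nullable so existing writers keep working.
conn.execute("ALTER TABLE users ADD COLUMN email_verified INTEGER")

# Old application code, unaware of the new column, still inserts successfully.
conn.execute(
    "INSERT INTO users (email, verified_at) VALUES (?, ?)", ("d@example.com", None)
)

# Migrate: backfill in small batches, committing after each one.
BATCH_SIZE = 2
batches = 0
while True:
    cur = conn.execute(
        """
        UPDATE users SET email_verified = (verified_at IS NOT NULL)
        WHERE id IN (
            SELECT id FROM users WHERE email_verified IS NULL ORDER BY id LIMIT ?
        )
        """,
        (BATCH_SIZE,),
    )
    conn.commit()
    if cur.rowcount == 0:
        break
    batches += 1

# Contract precondition: no NULLs may remain before tightening the constraint.
remaining_nulls = conn.execute(
    "SELECT COUNT(*) FROM users WHERE email_verified IS NULL"
).fetchone()[0]
verified = [row[0] for row in conn.execute("SELECT email_verified FROM users ORDER BY id")]
```

Each step leaves the table usable by both old and new code, which is what makes every phase independently reversible.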

This is one of those patterns that feels like overkill when you are three engineers at a startup but becomes non-negotiable at scale. The first time you take down production with a careless ALTER, you will understand why. The trade-off is calendar time -- a rename that would take 5 minutes in a monolith takes 2-4 weeks in a mature microservices system -- in exchange for zero downtime and full reversibility.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // migrations/expand-contract/add-email-verified.js

    /*
     * Goal: Add email_verified column to users table
     *
     * Phase 1: EXPAND   (add column, nullable)
     * Phase 2: DEPLOY   (ship code that writes the new column)
     * Phase 3: BACKFILL (fill existing rows in batches)
     * Phase 4: CONTRACT (make non-nullable, remove old logic)
     */

    // Phase 1: Expand - Add nullable column
    const phase1_expand = {
      async up(queryInterface, Sequelize) {
        await queryInterface.addColumn('users', 'email_verified', {
          type: Sequelize.BOOLEAN,
          allowNull: true,  // Nullable initially
          defaultValue: null
        });
      },
      
      async down(queryInterface) {
        await queryInterface.removeColumn('users', 'email_verified');
      }
    };

    // Phase 2: Update application to write to new column
    // Deploy code that writes to both old and new columns

    class UserService {
      async updateEmailVerification(userId, verified) {
        // Write to new column
        await User.update(
          { email_verified: verified },
          { where: { id: userId } }
        );
        
        // Also update any legacy fields if they exist
        // ...
      }
    }

    // Phase 3: Backfill - Migrate existing data
    const phase3_backfill = {
      async up(queryInterface) {
        // Backfill in batches to avoid long-held locks.
        // Note: UPDATE ... LIMIT is MySQL syntax; PostgreSQL needs a subquery.
        let offset = 0;
        const batchSize = 1000;
        
        while (true) {
          const [results] = await queryInterface.sequelize.query(`
            UPDATE users 
            SET email_verified = CASE 
              WHEN verified_at IS NOT NULL THEN true 
              ELSE false 
            END
            WHERE email_verified IS NULL
            LIMIT ${batchSize}
          `);
          
          if (results.affectedRows === 0) break;
          
          console.log(`Backfilled ${offset + results.affectedRows} rows`);
          offset += batchSize;
          
          // Small delay to reduce database load
          await new Promise(r => setTimeout(r, 100));
        }
      }
    };

    // Phase 4: Contract - Make non-nullable after backfill complete
    const phase4_contract = {
      async up(queryInterface, Sequelize) {
        // Verify all rows have values
        const [nullCount] = await queryInterface.sequelize.query(`
          SELECT COUNT(*) as count FROM users WHERE email_verified IS NULL
        `);
        
        if (nullCount[0].count > 0) {
          throw new Error('Cannot contract: null values still exist');
        }
        
        // Make column non-nullable
        await queryInterface.changeColumn('users', 'email_verified', {
          type: Sequelize.BOOLEAN,
          allowNull: false,
          defaultValue: false
        });
      }
    };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Alembic migrations for the expand-contract pattern.
    # Goal: Add email_verified column to users table without downtime.
    #
    # Phase 1: EXPAND   - add column, nullable
    # Phase 2: DEPLOY   - ship application code that writes both columns
    # Phase 3: BACKFILL - fill existing rows in batches
    # Phase 4: CONTRACT - set NOT NULL after verification

    # alembic/versions/2026_04_10_expand_add_email_verified.py
    """phase 1: add email_verified (nullable)

    Revision ID: 2026_04_10_expand
    Revises: 2026_04_01_outbox
    """
    import sqlalchemy as sa
    from alembic import op

    revision = "2026_04_10_expand"
    down_revision = "2026_04_01_outbox"


    def upgrade() -> None:
        op.add_column(
            "users",
            sa.Column("email_verified", sa.Boolean(), nullable=True),
        )


    def downgrade() -> None:
        op.drop_column("users", "email_verified")


    # -----------------------------------------------------------------------
    # Application code deployed between phase 1 and phase 3:
    # writes the new column in addition to existing legacy fields.
    # -----------------------------------------------------------------------
    from sqlalchemy import update
    from sqlalchemy.ext.asyncio import AsyncSession

    from user_service.database import User


    class UserService:
        def __init__(self, session_factory) -> None:
            self._session_factory = session_factory

        async def update_email_verification(
            self, user_id: str, verified: bool
        ) -> None:
            async with self._session_factory() as session:  # type: AsyncSession
                async with session.begin():
                    await session.execute(
                        update(User)
                        .where(User.id == user_id)
                        .values(email_verified=verified)
                    )


    # alembic/versions/2026_04_17_backfill_email_verified.py
    """phase 3: backfill email_verified in batches

    Revision ID: 2026_04_17_backfill
    Revises: 2026_04_10_expand
    """
    import time
    import sqlalchemy as sa
    from alembic import op

    revision = "2026_04_17_backfill"
    down_revision = "2026_04_10_expand"

    BATCH_SIZE = 1000


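    def upgrade() -> None:
        # By default Alembic runs the whole migration in one transaction, so
        # every batch would keep its row locks until the very end. Running in
        # an autocommit block (Alembic 1.2+) lets each batch commit on its own.
        with op.get_context().autocommit_block():
            conn = op.get_bind()
            while True:
                result = conn.execute(
                    sa.text(
                        """
                        WITH batch AS (
                            SELECT id FROM users
                            WHERE email_verified IS NULL
                            ORDER BY id
                            LIMIT :batch_size
                            FOR UPDATE SKIP LOCKED
                        )
                        UPDATE users SET email_verified = (verified_at IS NOT NULL)
                        WHERE id IN (SELECT id FROM batch)
                        """
                    ),
                    {"batch_size": BATCH_SIZE},
                )
                if result.rowcount == 0:
                    break
                # Small delay to reduce database load.
                time.sleep(0.1)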


    def downgrade() -> None:
        # Backfill is not reversible -- downgrade is a no-op on purpose.
        pass


    # alembic/versions/2026_04_24_contract_email_verified.py
    """phase 4: make email_verified NOT NULL

    Revision ID: 2026_04_24_contract
    Revises: 2026_04_17_backfill
    """
    import sqlalchemy as sa
    from alembic import op

    revision = "2026_04_24_contract"
    down_revision = "2026_04_17_backfill"


    def upgrade() -> None:
        conn = op.get_bind()
        null_count = conn.execute(
            sa.text("SELECT COUNT(*) FROM users WHERE email_verified IS NULL")
        ).scalar_one()
        if null_count > 0:
            raise RuntimeError(
                f"Cannot contract: {null_count} rows still have NULL email_verified"
            )
        op.alter_column(
            "users",
            "email_verified",
            existing_type=sa.Boolean(),
            nullable=False,
            server_default=sa.false(),
        )


    def downgrade() -> None:
        op.alter_column(
            "users",
            "email_verified",
            existing_type=sa.Boolean(),
            nullable=True,
            server_default=None,
        )
    ```
  </Tab>
</Tabs>

### Renaming a Column

Renaming a column is the canonical example of why you need expand-contract thinking. Renames are "free" in a monolith -- one commit, one deploy -- but in a system with multiple services or replicas running the same code during a rolling deploy, a rename is actively dangerous. At the moment the migration runs, half your instances know the column as `name`, the other half know it as `full_name`, and every query from the wrong half errors.

The dual-write phase (writing to both columns) may feel wasteful, but it buys you independence: you can roll back the code change without touching the database, or roll back the database change without touching the code. At scale, that optionality is worth the extra writes. The critical discipline is the soak period between phases -- "deploy the dual-write, wait a week, verify all instances are on the new code, then backfill." Rushing the phases is how you end up with rows that have `full_name` but not `name`, breaking the old readers you forgot still existed.
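
Before moving reads to the new column and ending the dual-write, it helps to verify mechanically that the two columns agree. The sketch below uses SQLite as a stand-in. `count_drift` is a hypothetical helper, and `IS NOT` is SQLite's NULL-safe inequality (PostgreSQL spells it `IS DISTINCT FROM`).

```python
import sqlite3


def count_drift(conn: sqlite3.Connection) -> int:
    """Count rows where the old and new columns disagree (NULL-safe comparison)."""
    return conn.execute(
        "SELECT COUNT(*) FROM users WHERE name IS NOT full_name"
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, full_name TEXT)")
conn.executemany(
    "INSERT INTO users (name, full_name) VALUES (?, ?)",
    [("Ada", "Ada"), ("Grace", None), ("Linus", "Linus T.")],
)

# One row has not been backfilled yet and one row has diverged.
drift_before = count_drift(conn)

conn.execute("UPDATE users SET full_name = name WHERE full_name IS NULL")

# The backfill fixes the missing value; the diverged row still needs investigation.
drift_after = count_drift(conn)
```

A non-zero count after the backfill usually points at a writer that is still updating only one of the two columns, which is exactly the forgotten code path the soak period is meant to surface.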

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
    // Renaming user.name to user.full_name (zero-downtime)

    // Step 1: Add new column
    const step1 = {
      async up(queryInterface, Sequelize) {
        await queryInterface.addColumn('users', 'full_name', {
          type: Sequelize.STRING,
          allowNull: true
        });
      }
    };

    // Step 2: Deploy code that writes to BOTH columns
    class UserService {
      async updateName(userId, name) {
        await User.update(
          {
            name: name,       // Old column
            full_name: name   // New column
          },
          { where: { id: userId } }
        );
      }
    }

    // Step 3: Backfill new column (single statement for brevity; on large
    // tables, batch this as in the expand-contract backfill)
    const step3 = {
      async up(queryInterface) {
        await queryInterface.sequelize.query(`
          UPDATE users SET full_name = name WHERE full_name IS NULL
        `);
      }
    };

    // Step 4: Deploy code that reads from new column
    class UserService {
      async getUser(userId) {
        const user = await User.findByPk(userId);
        return {
          ...user.toJSON(),
          // Use new column; fall back to old only when it is null/undefined
          fullName: user.full_name ?? user.name
        };
      }
    }

    // Step 5: Deploy code that ONLY writes to new column
    class UserService {
      async updateName(userId, name) {
        await User.update(
          { full_name: name },
          { where: { id: userId } }
        );
      }
    }

    // Step 6: Drop old column (after verifying all reads use new column)
    const step6 = {
      async up(queryInterface) {
        await queryInterface.removeColumn('users', 'name');
      }
    };
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Renaming users.name -> users.full_name (zero-downtime).
    #
    # Step 1: Alembic migration adds the new column (nullable).
    # Step 2: Deploy code that writes BOTH columns.
    # Step 3: Alembic migration backfills.
    # Step 4: Deploy code that reads new column, falls back to old.
    # Step 5: Deploy code that writes ONLY the new column.
    # Step 6: Alembic migration drops the old column.

    # alembic/versions/step1_add_full_name.py
    import sqlalchemy as sa
    from alembic import op

    revision = "step1_add_full_name"
    down_revision = "2026_04_24_contract"


    def upgrade() -> None:
        op.add_column("users", sa.Column("full_name", sa.String(), nullable=True))


    def downgrade() -> None:
        op.drop_column("users", "full_name")


    # Step 2: application writes both columns during the transition.
    from sqlalchemy import update
    from sqlalchemy.ext.asyncio import AsyncSession

    from user_service.database import User


    class UserServiceDualWrite:
        def __init__(self, session_factory) -> None:
            self._session_factory = session_factory

        async def update_name(self, user_id: str, name: str) -> None:
            async with self._session_factory() as session:  # type: AsyncSession
                async with session.begin():
                    await session.execute(
                        update(User)
                        .where(User.id == user_id)
                        .values(name=name, full_name=name)
                    )


    # alembic/versions/step3_backfill_full_name.py
    import sqlalchemy as sa
    from alembic import op

    revision = "step3_backfill_full_name"
    down_revision = "step1_add_full_name"


    def upgrade() -> None:
        op.execute(
            sa.text("UPDATE users SET full_name = name WHERE full_name IS NULL")
        )


    def downgrade() -> None:
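        # Nothing to undo here: the backfilled values disappear when the
        # step 1 downgrade drops the full_name column.
        pass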


    # Step 4: read new column, fall back to old.
    class UserServiceDualRead:
        def __init__(self, session_factory) -> None:
            self._session_factory = session_factory

        async def get_user(self, user_id: str) -> dict | None:
            async with self._session_factory() as session:
                user = await session.get(User, user_id)
                if user is None:
                    return None
                return {
                    "id": user.id,
                    "email": user.email,
                    # Prefer new column, fall back for rows written pre-step-2.
                    "full_name": user.full_name or user.name,
                }


    # Step 5: write only the new column.
    class UserServiceNewOnly:
        def __init__(self, session_factory) -> None:
            self._session_factory = session_factory

        async def update_name(self, user_id: str, name: str) -> None:
            async with self._session_factory() as session:
                async with session.begin():
                    await session.execute(
                        update(User)
                        .where(User.id == user_id)
                        .values(full_name=name)
                    )


    # alembic/versions/step6_drop_name.py
    import sqlalchemy as sa
    from alembic import op

    revision = "step6_drop_name"
    down_revision = "step3_backfill_full_name"


    def upgrade() -> None:
        op.drop_column("users", "name")


    def downgrade() -> None:
        # Bringing the column back is cheap, but existing data is gone.
        op.add_column("users", sa.Column("name", sa.String(), nullable=True))
    ```
  </Tab>
</Tabs>
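
The step 3 backfill above runs as a single `UPDATE`, which can hold locks for a long time on a large table. Below is a minimal sketch of a batched variant. It uses the standard-library `sqlite3` module as a stand-in for the real database, and the function name `backfill_full_name` and the default batch size are illustrative assumptions, not part of the migration above.

```python
import sqlite3


def backfill_full_name(conn: sqlite3.Connection, batch_size: int = 1000) -> int:
    """Copy users.name into users.full_name in small batches.

    Each batch commits on its own, so locks are held only briefly.
    Returns the total number of rows updated.
    """
    total = 0
    while True:
        with conn:  # commits on success, rolls back on error
            cur = conn.execute(
                """
                UPDATE users
                SET full_name = name
                WHERE id IN (
                    SELECT id FROM users
                    -- Skip rows with no name, otherwise the loop never ends.
                    WHERE full_name IS NULL AND name IS NOT NULL
                    LIMIT ?
                )
                """,
                (batch_size,),
            )
        if cur.rowcount == 0:
            return total
        total += cur.rowcount
```

Because the dual-write code from step 2 is already deployed, rows touched by the application during the backfill are simply skipped by the `full_name IS NULL` filter.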

***

## Polyglot Persistence

<Warning>
  **Caveats and Common Pitfalls with Polyglot Persistence**

  Polyglot persistence sounds great in a design doc and feels like a nightmare in year two. Here are the traps that turn "the right tool for the job" into a governance crisis:

  * **Operational sprawl without the team to support it.** Every database you add is another thing to back up, patch, monitor, tune, and page someone about at 3 AM. A 50-engineer company running PostgreSQL, MongoDB, Cassandra, Elasticsearch, Redis, Neo4j, and ClickHouse is not practicing polyglot persistence -- it is hoarding dependencies. Each engine needs a subject-matter expert who understands its failure modes, backup strategy, and upgrade path.
  * **Cross-cutting concerns get forgotten.** GDPR erasure requests, SOC 2 audit trails, PII classification, and encryption-at-rest policies all have to be implemented seven different ways. Teams almost universally underestimate this cost until a compliance audit arrives and they discover their MongoDB cluster stores unencrypted PII because "we thought that was handled at the application layer."
  * **No source-of-truth discipline.** When a product exists in MongoDB, Elasticsearch, and Redis, which one is authoritative? If you do not answer this on day one and write it down, engineers will start treating whichever store is "closest to their code" as the truth, and the three stores will drift apart silently. Drift is invisible until it causes a customer-facing bug.
  * **Skills silos become deployment bottlenecks.** The one engineer who understands Cassandra is on vacation. The Neo4j license renewal fell between two teams' JIRA boards. The ClickHouse cluster has not been patched in 18 months because nobody owns it. Polyglot persistence concentrates knowledge risk in ways that look invisible on an org chart.
</Warning>

<Tip>
  **Solutions and Patterns for Sustainable Polyglot Persistence**

  Treat polyglot persistence as a governance problem, not an architecture problem. The patterns that actually work:

  * **Maintain a data store registry.** A single source of truth (a wiki page, a YAML file in an infra repo, a Backstage catalog) that lists every datastore, its owning team, its classification (PII, financial, analytics), its SLA, its backup policy, and its on-call. If a store is not in the registry, it does not exist. Prune quarterly.
  * **Codify the "justification bar" for new stores.** A new database technology requires a written ADR that answers: "What access pattern do our existing stores not satisfy? What is the TCO over three years including staffing? Who will own this on-call?" Raising the bar keeps polyglot from becoming pathological.
  * **Declare source-of-truth precedence explicitly.** For every piece of data that lives in multiple stores, the architecture doc must name the authoritative store and the rebuild path for the others. If MongoDB is truth and Elasticsearch is derived, you must be able to rebuild the Elasticsearch index from MongoDB with a single command.
  * **Centralize cross-cutting concerns.** Build one team that owns the GDPR deletion pipeline, the backup verification tooling, the encryption-at-rest policy enforcement, and the audit log aggregation across all stores. Forcing each service team to reimplement these is both wasteful and unreliable.
  * **Default to fewer engines.** Most companies can run well on PostgreSQL plus one search engine plus one cache -- about three storage technologies total. Reach for a fourth only when you have concrete evidence the three cannot meet your requirements.
</Tip>
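
One way to make the registry and the "justification bar" enforceable is to validate the registry in CI. The sketch below is only an illustration: the `DataStore` fields mirror the attributes listed above, while the class name, the `validate_registry` function, and the three-engine limit are assumptions chosen for the example.

```python
from dataclasses import dataclass

ALLOWED_CLASSIFICATIONS = {"pii", "financial", "analytics"}


@dataclass(frozen=True)
class DataStore:
    name: str
    engine: str
    owning_team: str
    classification: str
    sla: str
    backup_policy: str
    on_call: str
    source_of_truth: bool  # False means the store is derived and rebuildable


def validate_registry(stores: list[DataStore], max_engines: int = 3) -> list[str]:
    """Return human-readable problems; an empty list means the registry passes."""
    problems: list[str] = []
    for store in stores:
        if store.classification not in ALLOWED_CLASSIFICATIONS:
            problems.append(
                f"{store.name}: unknown classification {store.classification!r}"
            )
        for field_name in ("owning_team", "sla", "backup_policy", "on_call"):
            if not getattr(store, field_name).strip():
                problems.append(f"{store.name}: missing {field_name}")
    engines = {store.engine for store in stores}
    if len(engines) > max_engines:
        problems.append(
            f"{len(engines)} engines in use ({', '.join(sorted(engines))}); "
            f"anything beyond {max_engines} needs an ADR"
        )
    return problems
```

Failing the build on a non-empty result keeps unowned or unclassified stores from slipping in unnoticed.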

### Choosing the Right Database

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    POLYGLOT PERSISTENCE                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Service          │ Database        │ Why                                   │
│  ─────────────────┼─────────────────┼─────────────────────────────────────  │
│  User Service     │ PostgreSQL      │ ACID transactions, complex queries    │
│  Product Catalog  │ MongoDB         │ Flexible schema, document structure   │
│  Shopping Cart    │ Redis           │ Fast, ephemeral, TTL support          │
│  Search Service   │ Elasticsearch   │ Full-text search, faceting            │
│  Order Service    │ PostgreSQL      │ Transactions, consistency required    │
│  Analytics        │ ClickHouse      │ Columnar, fast aggregations           │
│  Session Store    │ Redis           │ Fast reads, TTL, distributed          │
│  Notifications    │ MongoDB         │ Flexible schema, time-series          │
│  Audit Logs       │ Cassandra       │ Write-heavy, time-series, scalable    │
│  Graph Relations  │ Neo4j           │ Relationship queries, recommendations │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```

Why combine MongoDB, Elasticsearch, and Redis for a single Product Service? Because a product has three distinct access patterns that each want a different storage engine. Reads by ID dominate (show me product X) and want a key-value lookup -- Redis. Search and faceted filtering (blue running shoes under \$100) want an inverted index -- Elasticsearch. The source of truth for the product catalog wants flexible, nested documents -- MongoDB. Forcing any one engine to handle all three makes it bad at two of them.

The price you pay is consistency complexity. When a product is updated, you have to propagate that change to MongoDB (source of truth), Elasticsearch (search index), and Redis (cache). If one of those updates fails, the three stores drift. The pattern below uses synchronous dual-writes for simplicity, but in production this is exactly where you want the outbox pattern -- commit to MongoDB, write an outbox event, and let projections update Elasticsearch and Redis asynchronously. That way a Redis outage does not block a product update.

CAP again: by using MongoDB as the source of truth and treating the others as derived, you have declared the consistency hierarchy. If Elasticsearch is ever out of sync with MongoDB, MongoDB wins. If Redis has stale data, a miss falls back to MongoDB. This clarity is what makes polyglot persistence work -- every store has a defined role and a defined precedence.

<Tabs>
  <Tab title="Node.js">
    ```javascript theme={null}
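    // Multi-database service example

    // services/product-service/databases/index.js
    const mongoose = require('mongoose');
    const { Client } = require('@elastic/elasticsearch');
    const { createClient } = require('redis');
    // Assumed location of the Mongoose Product model; adjust to your layout.
    const Product = require('../models/product');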
    class ProductDatabases {
      constructor() {
        this.mongo = null;      // Primary storage
        this.elastic = null;    // Search index
        this.redis = null;      // Cache
      }

      async connect() {
        // MongoDB for product data
        this.mongo = await mongoose.connect(process.env.MONGO_URL);
        
        // Elasticsearch for search
        this.elastic = new Client({
          node: process.env.ELASTICSEARCH_URL
        });
        
        // Redis for caching
        this.redis = createClient({
          url: process.env.REDIS_URL
        });
        await this.redis.connect();
      }
    }

    // Product service using multiple databases
    class ProductService {
      constructor(databases) {
        this.db = databases;
      }

      async createProduct(productData) {
        // Write to MongoDB (source of truth)
        const product = await Product.create(productData);
        
        // Index in Elasticsearch for search
        await this.db.elastic.index({
          index: 'products',
          id: product.id,
          body: {
            name: product.name,
            description: product.description,
            category: product.category,
            price: product.price,
            tags: product.tags
          }
        });
        
        // Invalidate cache
        await this.db.redis.del(`product:${product.id}`);
        
        return product;
      }

      async getProduct(productId) {
        // Try cache first
        const cached = await this.db.redis.get(`product:${productId}`);
        if (cached) {
          return JSON.parse(cached);
        }
        
        // Fetch from MongoDB
        const product = await Product.findById(productId);
        if (!product) return null;
        
        // Cache for 5 minutes
        await this.db.redis.setEx(
          `product:${productId}`,
          300,
          JSON.stringify(product)
        );
        
        return product;
      }

      async searchProducts(query, filters = {}) {
        // Use Elasticsearch for search
        const result = await this.db.elastic.search({
          index: 'products',
          body: {
            query: {
              bool: {
                must: [
                  {
                    multi_match: {
                      query,
                      fields: ['name^3', 'description', 'tags']
                    }
                  }
                ],
                filter: this.buildFilters(filters)
              }
            },
            aggs: {
              categories: { terms: { field: 'category.keyword' } },
              price_ranges: {
                range: {
                  field: 'price',
                  ranges: [
                    { to: 25 },
                    { from: 25, to: 50 },
                    { from: 50, to: 100 },
                    { from: 100 }
                  ]
                }
              }
            }
          }
        });
        
        return {
          hits: result.hits.hits.map(h => ({ id: h._id, ...h._source })),
          total: result.hits.total.value,
          facets: result.aggregations
        };
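      }

      // Translate optional filters into Elasticsearch filter clauses.
      // The filter keys (category, priceMin, priceMax) are illustrative.
      buildFilters(filters) {
        const clauses = [];
        if (filters.category) {
          clauses.push({ term: { 'category.keyword': filters.category } });
        }
        const range = {};
        if (filters.priceMin != null) range.gte = filters.priceMin;
        if (filters.priceMax != null) range.lte = filters.priceMax;
        if (Object.keys(range).length > 0) {
          clauses.push({ range: { price: range } });
        }
        return clauses;
      }
    }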
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={null}
    # Multi-database Product Service.

    import json
    import os
    from typing import Any, Optional

    from elasticsearch import AsyncElasticsearch
    from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
    import redis.asyncio as redis


    class ProductDatabases:
        def __init__(self) -> None:
            self.mongo: Optional[AsyncIOMotorDatabase] = None  # Primary storage
            self.elastic: Optional[AsyncElasticsearch] = None  # Search index
            self.redis: Optional[redis.Redis] = None  # Cache

        async def connect(self) -> None:
            mongo_client = AsyncIOMotorClient(os.environ["MONGO_URL"])
            self.mongo = mongo_client.get_default_database()

            self.elastic = AsyncElasticsearch(
                os.environ["ELASTICSEARCH_URL"]
            )

            self.redis = redis.from_url(
                os.environ["REDIS_URL"], decode_responses=True
            )
            await self.redis.ping()


    class ProductService:
        def __init__(self, dbs: ProductDatabases) -> None:
            self._dbs = dbs

        async def create_product(self, product_data: dict[str, Any]) -> dict[str, Any]:
            # Write to MongoDB (source of truth).
            result = await self._dbs.mongo.products.insert_one(product_data)
            product = {**product_data, "_id": str(result.inserted_id)}
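            # get_product() looks products up by the application-level "id" field,
            # so callers are expected to supply one; "_id" is only a fallback key
            # for the search index and cache.
            product_id = product.get("id") or product["_id"]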

            # Index in Elasticsearch for search.
            await self._dbs.elastic.index(
                index="products",
                id=product_id,
                document={
                    "name": product["name"],
                    "description": product.get("description"),
                    "category": product.get("category"),
                    "price": product["price"],
                    "tags": product.get("tags", []),
                },
            )

            # Invalidate cache.
            await self._dbs.redis.delete(f"product:{product_id}")
            return product

        async def get_product(self, product_id: str) -> Optional[dict[str, Any]]:
            # Try cache first.
            cached = await self._dbs.redis.get(f"product:{product_id}")
            if cached is not None:
                return json.loads(cached)

            # Fall back to MongoDB.
            product = await self._dbs.mongo.products.find_one({"id": product_id})
            if product is None:
                return None
            product["_id"] = str(product["_id"])

            # Cache for 5 minutes.
            await self._dbs.redis.set(
                f"product:{product_id}", json.dumps(product, default=str), ex=300
            )
            return product

        async def search_products(
            self, query: str, filters: Optional[dict[str, Any]] = None
        ) -> dict[str, Any]:
            filters = filters or {}
            body = {
                "query": {
                    "bool": {
                        "must": [
                            {
                                "multi_match": {
                                    "query": query,
                                    "fields": ["name^3", "description", "tags"],
                                }
                            }
                        ],
                        "filter": self._build_filters(filters),
                    }
                },
                "aggs": {
                    "categories": {"terms": {"field": "category.keyword"}},
                    "price_ranges": {
                        "range": {
                            "field": "price",
                            "ranges": [
                                {"to": 25},
                                {"from": 25, "to": 50},
                                {"from": 50, "to": 100},
                                {"from": 100},
                            ],
                        }
                    },
                },
            }
            result = await self._dbs.elastic.search(index="products", body=body)
            return {
                "hits": [
                    {"id": h["_id"], **h["_source"]}
                    for h in result["hits"]["hits"]
                ],
                "total": result["hits"]["total"]["value"],
                "facets": result.get("aggregations", {}),
            }

        def _build_filters(self, filters: dict[str, Any]) -> list[dict[str, Any]]:
            clauses: list[dict[str, Any]] = []
            if category := filters.get("category"):
                clauses.append({"term": {"category.keyword": category}})
            price_min = filters.get("price_min")
            price_max = filters.get("price_max")
            if price_min is not None or price_max is not None:
                rng: dict[str, Any] = {}
                if price_min is not None:
                    rng["gte"] = price_min
                if price_max is not None:
                    rng["lte"] = price_max
                clauses.append({"range": {"price": rng}})
            return clauses
    ```
  </Tab>
</Tabs>
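
The synchronous dual-writes above can be reworked around an outbox, as the text suggests for production. The following is a minimal sketch under stated assumptions: `sqlite3` stands in for the source-of-truth store so the example can show a real transaction (with MongoDB you would need a multi-document transaction on a replica set to get the same atomicity), and plain dictionaries stand in for Elasticsearch and Redis. The table layout and the names `save_product` and `relay_outbox` are hypothetical.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE products (id TEXT PRIMARY KEY, doc TEXT NOT NULL);
CREATE TABLE outbox (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    event_type TEXT NOT NULL,
    payload TEXT NOT NULL,
    published INTEGER NOT NULL DEFAULT 0
);
"""


def save_product(conn: sqlite3.Connection, product: dict) -> None:
    """Write the product and its outbox event in ONE transaction."""
    payload = json.dumps(product)
    with conn:  # both statements commit together or not at all
        conn.execute(
            "INSERT OR REPLACE INTO products (id, doc) VALUES (?, ?)",
            (product["id"], payload),
        )
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("ProductSaved", payload),
        )


def relay_outbox(conn: sqlite3.Connection, search_index: dict, cache: dict) -> int:
    """Apply unpublished events to the derived stores, oldest first."""
    rows = conn.execute(
        "SELECT seq, payload FROM outbox WHERE published = 0 ORDER BY seq"
    ).fetchall()
    for seq, payload in rows:
        product = json.loads(payload)
        # Both projections are idempotent, so replaying an event is safe.
        search_index[product["id"]] = {
            "name": product["name"],
            "price": product["price"],
        }
        cache.pop(f"product:{product['id']}", None)
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE seq = ?", (seq,))
    return len(rows)
```

If a projection raises partway through, the event stays unpublished and the next relay run retries it, so an outage in a derived store delays propagation without blocking the product write.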

***

## Interview Questions

<AccordionGroup>
  <Accordion title="Q1: Why database-per-service? What are the challenges?">
    **Answer:**

    **Benefits:**

    * Loose coupling between services
    * Independent scaling
    * Right database for each use case
    * Isolated failures
    * Independent deployments

    **Challenges:**

    * Data consistency (no ACID across services)
    * Query complexity (no JOINs across databases)
    * Duplicate data management
    * Operational overhead

    **Solutions:**

    * Saga pattern for transactions
    * Event sourcing for consistency
    * CQRS for complex queries
    * API composition for cross-service data
  </Accordion>

  <Accordion title="Q2: How do you handle transactions across services?">
    **Answer:**

    **Avoid 2PC** (two-phase commit): participants hold locks until the coordinator decides, so throughput drops and a coordinator failure can leave resources blocked.

    **Use Saga pattern:**

    1. **Choreography**: Services react to events, publish compensating events on failure
    2. **Orchestration**: Central coordinator manages the flow

    **Example (Order saga):**

    ```
    Create Order → Reserve Inventory → Charge Payment → Confirm
         ↓ failure       ↓ failure          ↓ failure
    Cancel Order ← Release Inventory ← Refund Payment
    ```

    **Key practices:**

    * Design idempotent operations
    * Store saga state for recovery
    * Implement timeouts
    * Log everything for debugging
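
    A minimal orchestration sketch of the order saga above, assuming each step is a pair of plain callables (action and compensation). The `run_saga` helper and the step names are hypothetical, and saga state is kept in memory here, whereas a real orchestrator would persist it for recovery.

    ```python
    from typing import Callable

    # (step name, action, compensation)
    Step = tuple[str, Callable[[dict], None], Callable[[dict], None]]


    def run_saga(steps: list[Step], ctx: dict) -> bool:
        """Run actions in order; on failure, compensate completed steps in reverse."""
        completed: list[Step] = []
        for step in steps:
            name, action, _compensate = step
            try:
                action(ctx)
            except Exception as exc:
                ctx["failed_step"] = name
                ctx["error"] = str(exc)
                for _name, _action, compensate in reversed(completed):
                    compensate(ctx)  # compensations should be idempotent
                return False
            completed.append(step)
        return True
    ```

    The failing step itself is not compensated, only the steps that finished before it, which matches the diagram: a payment failure releases inventory and then cancels the order.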
  </Accordion>

  <Accordion title="Q3: What is the Outbox pattern?">
    **Answer:**

    The **Outbox pattern** ensures reliable event publishing by writing events to an outbox table in the same transaction as the business data.

    **Flow:**

    1. Begin transaction
    2. Update business data
    3. Insert event to outbox table
    4. Commit transaction
    5. Background worker publishes events from outbox
    6. Mark events as published

    **Benefits:**

    * Atomic: Data and event saved together
    * Reliable: Won't lose events if message broker is down
    * Replayable: unpublished events can be re-sent after a crash; delivery is at-least-once, so consumers must be idempotent

    **Alternative:** Change Data Capture (Debezium)
  </Accordion>

  <Accordion title="Q4: How do you perform zero-downtime database migrations?">
    **Answer:**

    **Expand-Contract Pattern:**

    1. **Expand**: Add new column (nullable)
    2. **Deploy**: Code writes to both old and new
    3. **Migrate**: Backfill existing data
    4. **Switch**: Code reads from new column
    5. **Contract**: Stop writing the old column, add NOT NULL to the new column if needed, then drop the old column

    **Never do in one deploy:**

    * Rename columns
    * Change column types
    * Drop columns
    * Add NOT NULL without default

    **Always:**

    * Backward compatible migrations
    * Small batches for data migration
    * Test rollback procedures
  </Accordion>

  <Accordion title="Q5: When would you use CQRS?">
    **Answer:**

    **CQRS** = Command Query Responsibility Segregation

    Separate read (Query) and write (Command) models.

    **Use when:**

    * Read and write patterns differ significantly
    * Complex read queries (aggregations, joins)
    * Need to scale reads independently
    * Event sourcing is used

    **Benefits:**

    * Optimized read models (denormalized)
    * Better read performance
    * Simpler write logic

    **Avoid when:**

    * Simple CRUD operations
    * Strong consistency required
    * Small team (adds complexity)
  </Accordion>
</AccordionGroup>

***

## Chapter Summary

<Info>
  **Key Takeaways:**

  * Each service should own its database (no shared schemas)
  * Use Saga pattern for distributed transactions
  * Outbox pattern ensures reliable event publishing
  * CQRS separates read and write concerns
  * Zero-downtime migrations use expand-contract pattern
  * Choose the right database for each use case (polyglot persistence)
</Info>

**Next Chapter:** Caching Strategies - Distributed caching patterns for microservices.

***

## Interview Deep-Dive

<AccordionGroup>
  <Accordion title="'You need to perform a database migration that renames a column used by 3 other services through APIs. How do you execute this with zero downtime across all services?'">
    **Strong Answer:**

    This is the expand-and-contract pattern applied to a cross-service migration. It takes three phases spread across multiple deployments.

    Phase one (expand): add the new column to the database alongside the old column. Deploy a version of the owning service that writes to both columns and reads from the new column (falling back to old if new is null). The API response includes both the old field name and the new field name. No other service needs to change yet. Run a backfill script to populate the new column from existing old column values.

    Phase two (migrate consumers): update the 3 consuming services one at a time to read from the new API field name. Since both old and new fields are present in the API response, you can update consumers independently at their own pace. Each consumer team updates when they are ready, no coordinated deployment needed.

    Phase three (contract): once all consumers are confirmed to be using the new field name (verified through API access logs or feature flags), deploy a version of the owning service that stops populating the old field. After a soak period (one week), run a migration to drop the old column. Update the API to remove the deprecated field.

    The entire process takes 2-4 weeks but has zero downtime at any stage. Each phase is independently reversible. The temptation is to do it in one deployment with a coordinated release -- this is the fastest path to a production outage.

    **Follow-up: "What if the column rename also changes the data type -- for example, from a string status to an integer enum?"**

    The expand phase becomes more complex. I add the new integer column and deploy a service version that writes both representations (the old string and the new integer). The backfill script converts existing strings to integers. The API returns both formats during the migration window. Consumers migrate to the new format at their own pace. The key constraint: the conversion must be lossless. If any existing string value does not map cleanly to an integer enum value, you need a default mapping and a manual review process for ambiguous records before dropping the old column.
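
    The lossless-conversion constraint can be sketched as a backfill helper that refuses to guess. The enum values, the legacy spellings, and the `convert_statuses` name below are assumptions made for illustration; the point is that unmapped values go to a review list instead of receiving a silent default.

    ```python
    from enum import IntEnum


    class OrderStatus(IntEnum):
        PENDING = 1
        PAID = 2
        SHIPPED = 3
        CANCELLED = 4


    # Known legacy spellings, compared after trimming and lower-casing.
    LEGACY_TO_ENUM = {
        "pending": OrderStatus.PENDING,
        "paid": OrderStatus.PAID,
        "shipped": OrderStatus.SHIPPED,
        "cancelled": OrderStatus.CANCELLED,
        "canceled": OrderStatus.CANCELLED,
    }


    def convert_statuses(
        rows: list[tuple[int, str]],
    ) -> tuple[dict[int, int], list[tuple[int, str]]]:
        """Split rows into converted values and rows that need manual review."""
        converted: dict[int, int] = {}
        needs_review: list[tuple[int, str]] = []
        for row_id, raw in rows:
            status = LEGACY_TO_ENUM.get(raw.strip().lower())
            if status is None:
                needs_review.append((row_id, raw))
            else:
                converted[row_id] = int(status)
        return converted, needs_review
    ```

    The old column should only be dropped once the review list is empty.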
  </Accordion>

  <Accordion title="'Your Order Service needs data from the User Service to create an order. Should it call the User Service API at order creation time, or should it maintain a local copy of user data? What are the trade-offs?'">
    **Strong Answer:**

    Both approaches are valid, and the choice depends on the data freshness requirement and the call volume.

    Synchronous API call: the Order Service calls User Service's GET /users/:id during order creation. Simple, always fresh data, no data duplication. But it creates a runtime dependency -- if User Service is down, orders cannot be created. At 1,000 orders per minute, that is 1,000 additional calls per minute to User Service, adding latency and load.

    Local data copy: the Order Service subscribes to UserUpdated events and maintains a read-only copy of relevant user fields (name, email, shipping address) in its own database. Order creation uses the local copy -- no external call needed. The trade-off: the local copy can be stale (if a user updates their email and immediately places an order, the order might have the old email). But for most fields, seconds of staleness is acceptable.

    My recommendation for order creation specifically: use the local copy for non-critical fields (name for display, email for notifications) and a synchronous call for critical fields (shipping address if it affects pricing, payment method tokens). This hybrid approach minimizes the runtime dependency while maintaining correctness where it matters.

    The implementation: Order Service consumes UserCreated, UserUpdated, and UserDeleted events from Kafka. It stores a slim projection: user\_id, name, email, default\_shipping\_address. At order creation, it reads from this local table. If the local copy is missing (new user, event not yet consumed), it falls back to an API call with a circuit breaker.
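
    A minimal in-memory sketch of that projection follows. The `UserProjection` class and the event dictionary shape are assumptions; `fetch_user` stands in for the User Service API call, and the circuit breaker is assumed to live inside it.

    ```python
    from typing import Callable, Optional

    PROJECTED_FIELDS = ("name", "email", "default_shipping_address")


    class UserProjection:
        """Slim, read-only copy of user data kept by the Order Service."""

        def __init__(self, fetch_user: Callable[[str], Optional[dict]]) -> None:
            self._users: dict[str, dict] = {}
            self._fetch_user = fetch_user  # guarded call to the User Service

        def apply(self, event: dict) -> None:
            """Handle UserCreated, UserUpdated, and UserDeleted events."""
            user_id = event["user_id"]
            if event["type"] == "UserDeleted":
                self._users.pop(user_id, None)
            elif event["type"] in ("UserCreated", "UserUpdated"):
                fields = {k: event[k] for k in PROJECTED_FIELDS if k in event}
                self._users.setdefault(user_id, {}).update(fields)

        def get(self, user_id: str) -> Optional[dict]:
            """Read locally; fall back to the API only when the copy is missing."""
            user = self._users.get(user_id)
            if user is None:
                user = self._fetch_user(user_id)
                if user is not None:
                    self._users[user_id] = user
            return user
    ```

    In a real service the dictionary would be a table in the Order Service's own database, updated by the Kafka consumer.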

    **Follow-up: "How do you handle the case where the user's data in the local copy is outdated and causes a wrong shipping address on the order?"**

    At checkout time (not at add-to-cart time), I fetch the shipping address synchronously from the User Service. The local copy is used for the shopping experience (displaying the user's name, showing saved addresses), but the final order record is confirmed against the live data. This is the same pattern Amazon uses -- your cart shows "deliver to John's home address," but at the moment of order placement, the system verifies the address with the source of truth. If there is a mismatch, the user is prompted to confirm.
  </Accordion>

  <Accordion title="'How do you choose the right database for each microservice? Give me a concrete example where using the wrong database caused real problems.'">
    **Strong Answer:**

    The decision framework I use evaluates four dimensions: data model (relational, document, key-value, graph), query patterns (simple lookups, complex joins, full-text search, time series), consistency requirements (strong, eventual, causal), and operational characteristics (scaling model, backup/restore, managed options).

    For an e-commerce platform: User Service uses PostgreSQL (relational data, strong consistency for auth), Product Catalog uses MongoDB (flexible schema for varying product attributes, nested documents for specifications), Cart Service uses Redis (fast reads/writes, TTL for abandoned carts, data is ephemeral), Search Service uses Elasticsearch (full-text search, faceted filtering, relevance scoring), and Analytics uses ClickHouse or TimescaleDB (time-series queries, column-oriented for aggregations).

    A concrete example of the wrong choice: I worked on a system where the team used MongoDB for the Order Service because "we use MongoDB everywhere." Orders are inherently relational: an order has items, items reference products, orders reference customers, payments reference orders. The team ended up embedding everything in one massive document -- order, items, payment, shipping -- which worked for reads but was a nightmare for writes. Updating the payment status required reading the entire 50KB document, modifying one field, and writing it back. With concurrent updates (payment status change + shipping status change), they hit MongoDB's document-level locking and got write conflicts. After 6 months of fighting this, they migrated to PostgreSQL with proper normalized tables. The migration took 3 months and cost significant engineering time.

    The lesson: polyglot persistence is a feature of microservices, but it only works if each team makes an informed database choice based on their access patterns, not on familiarity or organizational defaults.

    **Follow-up: "How do you handle cross-service queries when each service has a different database? For example, 'show me all orders from users in New York.'"**

    You do not query across services. You build a dedicated query service with its own denormalized read model. The query service subscribes to events from both Order Service and User Service, maintains a materialized view that joins user location with order data, and serves the query from its own optimized store (Elasticsearch for complex filtering, PostgreSQL with appropriate indexes for simpler queries). This is CQRS applied at the system level. The alternative -- having the API gateway call User Service to get New York users, then call Order Service for each user's orders -- creates an N+1 query problem that scales terribly.
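
    The read model for the New York example can be sketched in memory as shown below. The `OrdersByCityView` class and the `city` field on user events are assumptions; a production version would keep the same two mappings in the query service's own store.

    ```python
    from collections import defaultdict


    class OrdersByCityView:
        """Denormalized read model built from User and Order events."""

        def __init__(self) -> None:
            self._city_by_user: dict[str, str] = {}
            self._orders_by_user: dict[str, list[str]] = defaultdict(list)

        def apply(self, event: dict) -> None:
            if event["type"] in ("UserCreated", "UserUpdated") and "city" in event:
                self._city_by_user[event["user_id"]] = event["city"]
            elif event["type"] == "OrderPlaced":
                orders = self._orders_by_user[event["user_id"]]
                if event["order_id"] not in orders:  # tolerate redelivered events
                    orders.append(event["order_id"])

        def orders_in_city(self, city: str) -> list[str]:
            """Answer the cross-service question from local data only."""
            return [
                order_id
                for user_id, user_city in self._city_by_user.items()
                if user_city == city
                for order_id in self._orders_by_user.get(user_id, [])
            ]
    ```

    Because the join happens at query time over two local mappings, it does not matter whether the order event or the user event arrives first.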
  </Accordion>
</AccordionGroup>

***

## Interview Questions with Structured Answers

<AccordionGroup>
  <Accordion title="Finance needs a monthly reconciliation report that joins data from five microservices' databases (Orders, Payments, Inventory, Users, Refunds). Each uses a different engine. Walk me through your options and their trade-offs.">
    **Strong Answer Framework**

    1. **Reject the obvious wrong answers first.** Explicitly state that you will not query the operational databases directly across service boundaries (creates coupling, OLTP load from OLAP queries) and you will not do runtime API composition for a report that joins millions of rows (N+1, fan-out timeouts, cost).
    2. **Identify the workload class.** Monthly reconciliation is OLAP: large joins, historical data, tolerance for minutes of staleness, high query complexity, low query frequency. This is the wrong workload for any of the five operational stores.
    3. **Propose a data warehouse approach with streaming ingestion.** Stream every domain event (OrderPlaced, PaymentCaptured, InventoryDeducted, UserCreated, RefundIssued) from each service into a warehouse (BigQuery, Snowflake, or Redshift) via Kafka plus a sink connector (Kafka Connect, Debezium for CDC where event streams are not yet available). The warehouse stores denormalized, queryable, historical data.
    4. **Address the consistency question.** Reconciliation tolerates eventual consistency on the order of minutes because the report runs monthly. Acknowledge that the warehouse will lag real-time by seconds to minutes, and design a "freeze boundary" (e.g., month-end plus 24 hours) after which data is considered final for the report.
    5. **Call out operational costs honestly.** A warehouse plus streaming pipeline is expensive. For a team under 20 engineers, a simpler approach is a nightly ETL job that reads from each service's read replica (never the primary) and populates a dedicated reporting PostgreSQL with denormalized tables. This is less real-time but dramatically cheaper to operate.
    6. **Explain the governance layer.** Whichever path you pick, the reports live in a place with its own access controls, audit logs, and PII classification. Finance queries the warehouse, not the operational services.
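
    The freeze boundary from step 4 and the denormalized join behind the report can be sketched in a few lines. This is a minimal illustration, not a production pipeline: the function names, the event field names (`order_id`, `total_cents`, `amount_cents`), and the choice of UTC are all assumptions made for the example.

    ```python
    from datetime import datetime, timedelta, timezone

    def freeze_boundary(year, month, grace_hours=24):
        """Instant after which the given month's data is treated as final."""
        # Month-end is midnight UTC on the first day of the following month.
        if month == 12:
            month_end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
        else:
            month_end = datetime(year, month + 1, 1, tzinfo=timezone.utc)
        return month_end + timedelta(hours=grace_hours)

    def reconcile(orders, payments, refunds):
        """Join three event feeds on order_id and flag orders whose captured
        amount does not match the order total."""
        captured, refunded = {}, {}
        for p in payments:
            captured[p["order_id"]] = captured.get(p["order_id"], 0) + p["amount_cents"]
        for r in refunds:
            refunded[r["order_id"]] = refunded.get(r["order_id"], 0) + r["amount_cents"]
        rows = []
        for o in orders:
            oid = o["order_id"]
            rows.append({
                "order_id": oid,
                "total_cents": o["total_cents"],
                "captured_cents": captured.get(oid, 0),
                "refunded_cents": refunded.get(oid, 0),
                "mismatch": captured.get(oid, 0) != o["total_cents"],
            })
        return rows
    ```

    In a real warehouse the join would be a SQL query over partitioned tables; the point here is only that the report reads from copied, denormalized data and runs after the freeze boundary has passed.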

    **Real-World Example**

    Around 2017-2019, Airbnb built Minerva, a metrics platform sitting on top of a data warehouse (initially Hive, later Presto/Druid). Before Minerva, every team built their own reports by querying production replicas, and numbers never agreed across dashboards because everyone defined "booking" slightly differently. Minerva imposed a single semantic layer with versioned metric definitions on top of the warehouse, and it consumed events from every operational service. The pattern -- stream-to-warehouse plus semantic layer -- is now the default at most large tech companies. Shopify, Uber, and Netflix have all published similar architectures.

    **Senior Follow-up Questions**

    <Note>
      **Q: "What if Finance needs the data within 5 minutes of the event, not monthly?"**

      Stream processing changes the answer. Instead of batching into a warehouse, you run a stream processor (Flink, Kafka Streams, Materialize) that maintains a continuously updated materialized view joining the five event streams. The view lives in a serving store (a PostgreSQL table kept up to date by the stream processor, or a streaming database such as Materialize or RisingWave that serves the view directly) and answers queries within seconds. The trade-off: joins over event streams are significantly more complex than batch joins -- you have to reason about windowing, late-arriving events, and out-of-order events. For most reconciliation use cases, 5-minute freshness is overkill; hourly or daily batch is simpler and sufficient.
    </Note>
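
    To make the "late-arriving and out-of-order events" problem concrete, here is a minimal in-memory sketch of a two-stream join. It is not how Flink or Kafka Streams implement joins; the class name, the event fields, and the single `max_wait_seconds` window are assumptions chosen to show the reasoning in a few lines.

    ```python
    class OrderPaymentJoin:
        """Joins order and payment events on order_id, whichever arrives first."""

        def __init__(self, max_wait_seconds):
            self.max_wait = max_wait_seconds
            self.orders = {}    # order_id -> (event, arrival_time)
            self.payments = {}  # order_id -> (event, arrival_time)

        def on_event(self, kind, event, now):
            """kind is "order" or "payment". Returns a joined row or None."""
            mine, other = (
                (self.orders, self.payments) if kind == "order"
                else (self.payments, self.orders)
            )
            key = event["order_id"]
            if key in other:
                # The partner arrived earlier (possibly out of order): emit the join.
                partner, _ = other.pop(key)
                order, payment = (event, partner) if kind == "order" else (partner, event)
                return {
                    "order_id": key,
                    "total_cents": order["total_cents"],
                    "captured_cents": payment["amount_cents"],
                }
            mine[key] = (event, now)  # buffer until the partner shows up
            return None

        def expire(self, now):
            """Remove events that waited longer than the window and return them,
            so the caller can route them to a late-data path."""
            late = []
            for buf in (self.orders, self.payments):
                for key in [k for k, (_, t) in buf.items() if now - t > self.max_wait]:
                    late.append(buf.pop(key)[0])
            return late
    ```

    Even this toy version has to decide how long to buffer and what to do with events whose partner never arrives, which is the complexity the batch approach avoids.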

    <Note>
      **Q: "How do you handle a case where the Orders Service changes its schema in a way that breaks downstream reports?"**

      This is the classic "data contract" problem. The solution is to decouple the internal service schema from the published event schema. The service can refactor its internal tables freely, but the event it emits conforms to a versioned contract registered in a schema registry. Breaking changes require a new event version; old consumers keep working with the old version until they migrate. Companies like Convoy and GoCardless have written publicly about this pattern under the name "Data Contracts." The registry enforces compatibility at CI time so broken schemas never make it to production.
    </Note>
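
    A CI-time compatibility check can be sketched as a pure function over two schema versions. The rules below are a deliberately simplified assumption (fields may be added only if optional; nothing may be removed or retyped) and the schema representation is hypothetical; a real schema registry applies its own, richer compatibility modes.

    ```python
    def breaking_changes(old, new):
        """Compare two event schema versions and list the changes that would
        break existing consumers. Schemas are dicts of
        field name -> {"type": str, "required": bool}."""
        problems = []
        for name, spec in old.items():
            if name not in new:
                problems.append(f"removed field: {name}")
            elif new[name]["type"] != spec["type"]:
                problems.append(
                    f"type changed: {name} {spec['type']} -> {new[name]['type']}"
                )
        for name, spec in new.items():
            if name not in old and spec.get("required", False):
                problems.append(f"new required field: {name}")
        return problems

    order_placed_v1 = {
        "order_id": {"type": "string", "required": True},
        "total_cents": {"type": "int", "required": True},
    }
    # Adding an optional field is allowed under these rules.
    order_placed_v1_1 = {**order_placed_v1, "coupon": {"type": "string", "required": False}}
    # Retyping a field is not: it needs a new event version instead.
    order_placed_bad = {**order_placed_v1, "total_cents": {"type": "string", "required": True}}
    ```

    A CI step would fail the build when `breaking_changes` returns a non-empty list for a change that keeps the same event version.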

    <Note>
      **Q: "Your warehouse bill is 40k per month and the CFO is asking why. Where do you look first?"**

      Three places. First, partition and cluster keys: most warehouse cost comes from scanning too much data per query. An unpartitioned fact table scanned by every dashboard every hour is death. Partition by event date and cluster by high-cardinality filter columns. Second, materialized summary tables: daily or hourly rollups precomputed once, queried many times, cost 1 percent of the raw-data cost. Third, retention policy: most teams keep raw events forever by default. Age out raw events older than 90 days to cheaper storage (S3 Glacier, BigQuery long-term storage) while keeping aggregates hot. These three together routinely cut warehouse costs by 60-80 percent.
    </Note>

    **Common Wrong Answers**

    * **"I would just have each service expose a reporting API and the report service fans out to all five."** This fails because the report joins potentially millions of rows across five services. Fan-out APIs are for serving a user's view (tens of rows), not reconciliation. It also puts OLAP load on OLTP databases, risking production impact.
    * **"Give the reporting team direct read access to each service's database."** This recreates the shared-database anti-pattern. A harmless column rename in Orders now breaks Finance's report. Reporting gets coupled to internal schemas that were never meant to be public, and service teams lose the ability to evolve their storage without breaking downstream consumers.

    **Further Reading**

    * Data Mesh principles (Zhamak Dehghani, martinfowler.com) -- the foundational articulation of treating data as a product.
    * "Data Contracts" on Chad Sanderson's Substack -- practical patterns for making service-to-warehouse handoffs reliable.
    * Airbnb Minerva blog posts (medium.com/airbnb-engineering) -- a worked example of a semantic-layer-on-warehouse architecture at scale.
  </Accordion>

  <Accordion title="Your team is migrating from a shared database to database-per-service. Three services need to coordinate on a column rename that will touch all their schemas. How do you sequence this without a platform-wide freeze?">
    **Strong Answer Framework**

    1. **Clarify what "column rename" means across service boundaries.** If three services read this column from the same database today, all three reference the old name in their query layer and would have to change together. Database-per-service migration means each service will own its copy going forward. The "rename" is actually two migrations: the shared-to-owned split, plus the rename itself.
    2. **Sequence the split before the rename.** Do one thing at a time. First, give each service its own copy of the data via CDC replication (Debezium) or an application-layer dual-write. Let services read from their own copy under a feature flag. Keep writes going to the shared database during this phase -- services own reads first, writes last.
    3. **Apply expand-contract independently per service.** Once each service owns its copy, a column rename is an expand-contract within each service, and the services no longer need to coordinate on timing. Service A can rename next week; Service B can rename next month. The dependency is broken.
    4. **Handle write ownership last.** Once reads are served from owned copies, pick one service as the write authority for the data, route all writes through its API (or events), and retire the shared write path. At this point the shared database is truly retired.
    5. **Instrument everything.** Use feature flags to control read paths, monitor both old and new paths in parallel, and have a verification job that checks consistency between the shared database and the owned copies. Only retire the shared path when the verification job has been clean for a week.
    6. **Write down the rollback path at each step.** If the new path fails verification, flip the flag back. If the flag flip does not recover within 5 minutes, disable the new path entirely and investigate offline. Never let a migration become a one-way door.
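
    Steps 5 and 6 can be sketched as a flag-controlled read path that compares both sources and falls back to the shared database on any mismatch. The function, the flag name `read_from_owned_copy`, and the dict-based stand-ins for the two databases are hypothetical; a real implementation would use a feature-flag client and real queries.

    ```python
    def read_customer(customer_id, *, flags, shared_db, owned_db, mismatches):
        """Serve from the owned copy only when the flag is on and it agrees
        with the shared database; otherwise serve the shared row."""
        shared_row = shared_db.get(customer_id)
        if not flags.get("read_from_owned_copy", False):
            return shared_row  # rollback path: flag off means old behavior
        owned_row = owned_db.get(customer_id)
        if owned_row != shared_row:
            # Record the disagreement for the verification job; do not guess.
            mismatches.append(customer_id)
            return shared_row  # shared stays authoritative until writes move
        return owned_row
    ```

    Because the old path is always one flag flip away, each step of the migration stays reversible.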

    **The Key Insight**

    The point of database-per-service is that no two services coordinate on a schema change again. If your migration plan requires a coordinated release of three services, you are skipping the "own your data" step and going straight to the rename -- which means you still have a shared database, just with more copies of it. Do the ownership split first; the rename becomes trivial.

    **Real-World Example**

    Shopify's extraction of the merchant-facing shop data from the monolith's shared MySQL (around 2019-2022) followed almost exactly this sequence. They did not rename columns across the monolith; they first built a shop-specific service, dual-wrote shop data to both the monolith and the new service, migrated reads one call site at a time behind feature flags, and only then started normalizing the new service's schema on its own. The public blog posts about this (shopify.engineering) describe how they used component-level shard isolation plus feature flags to make the migration resumable at any step.

    **Senior Follow-up Questions**

    <Note>
      **Q: "What do you do when verification reveals drift between the shared database and the owned copy?"**

      First, never silently correct the drift in production without understanding why. Drift means one of three things: the CDC pipeline is broken (missed events), there is a write path you did not know about (some cron job writing directly to the old database), or there is a bug in your dual-write logic. Log the drift with full row-level diff, pause the migration (keep flags reading from the shared database), investigate the root cause, and only resume after a fix is deployed and the pipeline has been clean for 24 hours. Silently auto-correcting drift is how you lose a week investigating phantom data integrity bugs later.
    </Note>
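
    A row-level diff that reports drift without correcting it might look like the sketch below. The function name and the representation of each table as a dict of primary key to row are assumptions for illustration.

    ```python
    def row_level_diff(shared_rows, owned_rows):
        """Return a list of drift records. Never mutates either input:
        the output is for investigation, not auto-repair."""
        report = []
        for key in sorted(set(shared_rows) | set(owned_rows)):
            s, o = shared_rows.get(key), owned_rows.get(key)
            if o is None:
                report.append({"key": key, "kind": "missing_in_owned"})
            elif s is None:
                report.append({"key": key, "kind": "extra_in_owned"})
            else:
                # Map each differing field to its (shared, owned) values.
                fields = {
                    f: (s.get(f), o.get(f))
                    for f in sorted(set(s) | set(o))
                    if s.get(f) != o.get(f)
                }
                if fields:
                    report.append({"key": key, "kind": "field_mismatch", "fields": fields})
        return report
    ```

    The shape of the report often hints at the root cause: rows missing from the owned copy suggest missed CDC events, while field mismatches on recently written rows suggest an unknown write path or a dual-write bug.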

    <Note>
      **Q: "How long should the dual-write period last?"**

      Long enough that you trust the new path. Plan for 2-4 weeks, but the key metric is verification coverage rather than calendar time: have you observed the new path handling every failure mode you care about (deploy rollback, database failover, regional outage)? Ending dual-write too early means you lose your rollback path. Ending too late means you keep paying the cost of two writes. In practice, most teams end up keeping dual-write for 6-8 weeks, because they find real bugs during weeks 3-4 and want to watch the fixes hold before cutting over.
    </Note>

    <Note>
      **Q: "Who owns the CDC pipeline operationally?"**

      This is the question that kills migrations. The pipeline falls between two teams: it is not purely a service-team concern, even though the service team built it to enable their migration, and it is not purely a platform concern, because it encodes domain-specific schema knowledge. Best practice: the platform team owns the CDC infrastructure (Debezium, Kafka, serialization), and the service team owns the pipeline configuration (which tables, which transformations). Set this boundary explicitly before starting the migration, or both teams will assume the other is on-call when the pipeline breaks.
    </Note>

    **Common Wrong Answers**

    * **"We freeze all deploys on all three services for the migration weekend, run the rename script, redeploy everyone atomically."** This is the monolith playbook. It fails because "atomic" across three services does not exist -- one will have a CI failure, one will have a flaky test, the third will roll out first, and you end up with services reading from a schema the others have not migrated to. The freeze turns into a three-day outage.
    * **"We add the new column name as an alias in the shared database schema, and services update at their own pace."** Aliases (views, computed columns) can work but they defeat the database-per-service goal. You still have a shared database, just with more columns. You solve the immediate rename but not the underlying coupling. Future migrations will hit the same coordination problem.

    **Further Reading**

    * "Under Deconstruction: The State of Shopify's Monolith" (shopify.engineering) -- real-world description of component-ification and data extraction at scale.
    * "Online migrations at scale" (stripe.com/blog) -- Stripe's canonical four-step dual-write playbook.
    * "Evolutionary Database Design" (Pramod Sadalage and Martin Fowler, martinfowler.com) -- the foundational catalog of migration patterns.
  </Accordion>

  <Accordion title="You are designing the Orders Service. It needs current user info for every order. Do you call the User Service synchronously, maintain a local projection via events, or store a full copy via CDC? Defend your choice.">
    **Strong Answer Framework**

    1. **Clarify what "needs" means.** Identify the specific fields needed (name, email, shipping address) and the specific moments they are needed (cart, checkout, post-order email, dispute resolution). Different moments have different freshness requirements.
    2. **Reject the extremes.** Full synchronous coupling makes Orders depend on User availability and adds latency to every order. Full local copy via CDC replicates User's entire schema into Orders, recreating coupling at the database level. The answer is usually in the middle.
    3. **Propose a "slim projection" via events.** Orders subscribes to UserCreated, UserUpdated, and UserDeleted events and maintains a small, well-defined projection: user\_id, display\_name, contact\_email, default\_shipping\_address. The projection is read-only, owned by Orders, and decoupled from User's internal schema.
    4. **Define freshness boundaries by field.** Display\_name and contact\_email from the projection are fine (seconds of staleness is invisible). Shipping address at checkout is verified synchronously against User because address correctness has financial and operational consequences. Payment method tokens are never copied; they are fetched synchronously at payment time.
    5. **Handle the bootstrap case.** What if Orders sees an order for a user it has never heard of (new user, event not yet consumed)? Fall back to a synchronous API call with a circuit breaker. This handles the race between user signup and first order gracefully.
    6. **Call out the governance implications.** The projection is now a derived dataset that Orders owns. If GDPR deletion hits User, Orders must handle it (UserDeleted event triggers projection cleanup). If the projection schema evolves, it evolves independently from User's internal schema.
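
    The slim projection, the bootstrap fallback, and the deletion handling can be sketched together. Everything named here is hypothetical: the `UserProjection` class, the event shape (`{"type": ..., "user": {...}}`), and the injected `fetch_user` callable standing in for a User Service client. The circuit breaker is reduced to a consecutive-failure counter with no reset timer, which a real breaker would need.

    ```python
    class UserProjection:
        # Only these fields are ever copied out of User events or API responses.
        FIELDS = ("user_id", "display_name", "contact_email", "default_shipping_address")

        def __init__(self, fetch_user, failure_threshold=3):
            self.rows = {}
            self.fetch_user = fetch_user  # hypothetical synchronous client call
            self.failures = 0
            self.failure_threshold = failure_threshold

        def apply(self, event):
            """Event handler for UserCreated / UserUpdated / UserDeleted."""
            kind, user = event["type"], event["user"]
            if kind == "UserDeleted":
                self.rows.pop(user["user_id"], None)  # GDPR cleanup
            elif kind in ("UserCreated", "UserUpdated"):
                self.rows[user["user_id"]] = {f: user.get(f) for f in self.FIELDS}

        def get(self, user_id):
            """Read from the projection; fall back to User Service for unknown users."""
            if user_id in self.rows:
                return self.rows[user_id]
            if self.failures >= self.failure_threshold:
                raise RuntimeError("circuit open: User Service fallback disabled")
            try:
                user = self.fetch_user(user_id)
            except Exception:
                self.failures += 1
                raise
            self.failures = 0
            self.rows[user_id] = {f: user.get(f) for f in self.FIELDS}
            return self.rows[user_id]
    ```

    Filtering through `FIELDS` on both paths is what keeps the projection decoupled: extra fields that User adds later never reach Orders' storage.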

    **Real-World Example**

    Amazon's order system uses exactly this pattern. The cart shows the user's saved shipping addresses from a local cache (fast, eventual consistency acceptable). At the moment of order placement, the address is re-verified against the authoritative User Service. Payment method tokens are never cached -- they are fetched fresh at authorization time. This hybrid approach is documented (indirectly) in Amazon's published architectural patterns and has been echoed by Werner Vogels in various re:Invent keynotes circa 2016-2019.

    **Senior Follow-up Questions**

    <Note>
      **Q: "What if the projection gets out of sync with User -- say, because a Kafka event was lost?"**

      Defense in depth. First, a periodic reconciliation job scans a random sample of the projection and cross-checks against User Service, flagging drift. Second, UserUpdated events include a monotonic version number; if Orders sees events out of order or with gaps, it requests a fresh snapshot of that user from User Service. Third, every projection row carries a "last updated" timestamp, and any row older than, say, 30 days triggers a background refresh. No individual mechanism is perfect; the combination keeps drift bounded.
    </Note>
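
    The version-gap check can be sketched as follows. The assumptions are stated plainly: versions start at 1 and increase by exactly 1 per user, each event carries the full projected state in `fields`, and `request_snapshot` is a hypothetical callable that fetches the current row (including its version) from User Service.

    ```python
    def apply_versioned(projection, event, request_snapshot):
        """Apply a UserUpdated event only if it is the next version in sequence.
        Returns "applied", "ignored", or "snapshot" to describe what happened."""
        user_id, version = event["user_id"], event["version"]
        current = projection.get(user_id, {}).get("version", 0)
        if version <= current:
            return "ignored"  # duplicate, or an older event arriving late
        if version > current + 1:
            # Gap: at least one event was missed, so resync from the source.
            projection[user_id] = request_snapshot(user_id)
            return "snapshot"
        projection[user_id] = {"version": version, **event["fields"]}
        return "applied"
    ```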

    <Note>
      **Q: "How do you handle PII in the projection for compliance?"**

      Treat the projection as subject to the same PII classification as User. If User classifies email as PII with encryption-at-rest and access logging, Orders' projection inherits those requirements. This is why you want the projection to be slim: the less PII you copy, the less compliance surface you own. For GDPR erasure, UserDeleted events are processed by every service that holds a projection, and each service is responsible for purging its copy. A company-wide erasure tracking system (often a dedicated service) monitors that every projection has confirmed deletion before the erasure request is closed.
    </Note>
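
    The erasure tracking described above reduces to a small piece of bookkeeping: a request stays open until every projection holder has confirmed deletion. The class and the service names below are hypothetical, and a real tracker would persist this state and handle timeouts.

    ```python
    class ErasureRequest:
        """Tracks which services still hold a copy of a user's data."""

        def __init__(self, user_id, holders):
            self.user_id = user_id
            self.pending = set(holders)  # services that have not confirmed yet

        def confirm(self, service):
            """Called when a service reports it has purged its projection row."""
            self.pending.discard(service)

        @property
        def closed(self):
            # The request may only be closed once nobody is left pending.
            return not self.pending

    request = ErasureRequest("user-123", holders=["orders", "shipping", "notifications"])
    request.confirm("orders")
    request.confirm("shipping")
    still_open = not request.closed  # notifications has not confirmed yet
    ```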

    <Note>
      **Q: "At what scale does this break down?"**

      Two places. First, projection storage: a slim projection of 100 million users at maybe 500 bytes each is 50 GB per service that subscribes. With 20 services each maintaining a projection, that is 1 TB of duplicated user data across the fleet. That is fine. Full-fat replicas of every User field across 20 services are not fine. Second, event volume: if User receives a billion updates per day, every subscriber has to keep up with a billion events per day. At that scale you partition the event stream by user\_id, have consumers scale out horizontally, and probably rethink whether every service actually needs its own projection or whether a shared read-model service would serve them all.
    </Note>
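
    The storage arithmetic above, plus a minimal illustration of partitioning by user\_id, written out as code. The CRC32 hash is a stand-in chosen because it is in the Python standard library and stable across runs; it is not the partitioner any particular broker uses, and `partition_for` is a hypothetical helper.

    ```python
    import zlib

    users = 100_000_000
    bytes_per_row = 500
    subscribing_services = 20

    per_service_gb = users * bytes_per_row / 1e9              # 50.0 GB per service
    fleet_tb = per_service_gb * subscribing_services / 1e3    # 1.0 TB across the fleet

    def partition_for(user_id, partitions):
        """Map a user_id to a partition so all events for one user stay ordered
        within a single partition while consumers scale out across partitions."""
        return zlib.crc32(user_id.encode("utf-8")) % partitions
    ```

    Keying by user\_id preserves per-user ordering, which is what the projection needs; ordering across different users is not required and is given up in exchange for horizontal scale.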

    **Common Wrong Answers**

    * **"Call the User Service API synchronously for every order. It is simpler and always fresh."** This couples Orders availability to User availability. If User is down for 10 minutes, no orders can be placed, even though most of the order creation flow does not actually need user data. Synchronous coupling also adds a network round trip, typically tens of milliseconds, to every order, which at scale is a non-trivial throughput hit.
    * **"Use CDC to replicate the entire users table into Orders database."** This replicates User's internal schema into Orders, which means any User schema change breaks Orders. It also copies every field, including ones Orders does not need, violating least-privilege for PII. The right pattern is a domain event contract, not raw table replication.

    **Further Reading**

    * "Designing Data-Intensive Applications" by Martin Kleppmann, Chapter 11 (Stream Processing) -- the canonical treatment of event-driven projections and their trade-offs.
    * "Event-driven architecture" patterns on microservices.io by Chris Richardson -- pragmatic coverage of CQRS and read-model patterns.
    * "The Outbox Pattern and CDC for Reliable Event Propagation" (debezium.io blog) -- concrete implementation guidance for the event pipeline that feeds projections.
  </Accordion>
</AccordionGroup>
