Skip to main content

Data Management Patterns

Managing data across microservices is one of the hardest challenges. This chapter covers patterns for data ownership, distributed transactions, and consistency.
Learning Objectives:
  • Implement database per service pattern
  • Handle distributed transactions with Saga pattern
  • Understand event sourcing and its trade-offs
  • Implement CQRS for complex queries
  • Design for eventual consistency

Database Per Service

Each microservice owns its data and exposes it only through APIs.
┌─────────────────────────────────────────────────────────────────────────────┐
│                      DATABASE PER SERVICE                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ❌ ANTI-PATTERN: Shared Database                                           │
│  ───────────────────────────────────────                                    │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐                               │
│  │  Service  │  │  Service  │  │  Service  │                               │
│  │     A     │  │     B     │  │     C     │                               │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘                               │
│        │              │              │                                       │
│        └──────────────┼──────────────┘                                      │
│                       ▼                                                      │
│              ┌─────────────────┐                                            │
│              │ SHARED DATABASE │  ← Coupling, schema conflicts              │
│              └─────────────────┘                                            │
│                                                                              │
│  ✅ CORRECT: Database Per Service                                           │
│  ───────────────────────────────────                                        │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐                               │
│  │  Service  │  │  Service  │  │  Service  │                               │
│  │     A     │  │     B     │  │     C     │                               │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘                               │
│        │              │              │                                       │
│        ▼              ▼              ▼                                       │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐                               │
│  │ MongoDB   │  │ PostgreSQL│  │   Redis   │  ← Best fit for each         │
│  └───────────┘  └───────────┘  └───────────┘                               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Implementation Example

// user-service/src/database.js
const mongoose = require('mongoose');

// User service uses MongoDB
const connectUserDB = async () => {
  await mongoose.connect(process.env.USER_DB_URI, {
    useNewUrlParser: true,
    useUnifiedTopology: true
  });
  console.log('User DB connected');
};

const UserSchema = new mongoose.Schema({
  email: { type: String, unique: true, required: true },
  name: { type: String, required: true },
  passwordHash: String,
  status: { type: String, enum: ['active', 'inactive', 'suspended'] },
  preferences: {
    language: String,
    timezone: String,
    notifications: Boolean
  },
  createdAt: { type: Date, default: Date.now }
});

const User = mongoose.model('User', UserSchema);

// order-service/src/database.js
const { Pool } = require('pg');
const { PrismaClient } = require('@prisma/client');

// Order service uses PostgreSQL with Prisma
const prisma = new PrismaClient();

// schema.prisma
/*
model Order {
  id          String      @id @default(uuid())
  customerId  String      // Reference to user service
  status      OrderStatus
  items       OrderItem[]
  totalAmount Decimal
  currency    String      @default("USD")
  createdAt   DateTime    @default(now())
  updatedAt   DateTime    @updatedAt
}

model OrderItem {
  id        String  @id @default(uuid())
  orderId   String
  order     Order   @relation(fields: [orderId], references: [id])
  productId String  // Reference to product service
  quantity  Int
  unitPrice Decimal
}

enum OrderStatus {
  PENDING
  CONFIRMED
  PAID
  SHIPPED
  DELIVERED
  CANCELLED
}
*/

// inventory-service/src/database.js
const Redis = require('ioredis');
const { Pool } = require('pg');

// Inventory uses Redis for fast reads, PostgreSQL for persistence
class InventoryDatabase {
  constructor() {
    this.redis = new Redis(process.env.REDIS_URL);
    this.pg = new Pool({ connectionString: process.env.INVENTORY_DB_URL });
  }

  async getStock(productId) {
    // Try cache first
    let stock = await this.redis.get(`stock:${productId}`);
    
    if (stock === null) {
      // Fallback to database
      const result = await this.pg.query(
        'SELECT quantity FROM inventory WHERE product_id = $1',
        [productId]
      );
      stock = result.rows[0]?.quantity || 0;
      
      // Cache for 1 minute
      await this.redis.setex(`stock:${productId}`, 60, stock);
    }
    
    return parseInt(stock);
  }

  async updateStock(productId, delta) {
    const client = await this.pg.connect();
    
    try {
      await client.query('BEGIN');
      
      await client.query(
        `UPDATE inventory 
         SET quantity = quantity + $1, updated_at = NOW()
         WHERE product_id = $2`,
        [delta, productId]
      );
      
      await client.query('COMMIT');
      
      // Invalidate cache
      await this.redis.del(`stock:${productId}`);
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}

Cross-Service Data Access

// ❌ WRONG: Direct database access
class OrderService {
  async createOrder(orderData) {
    // DON'T DO THIS - accessing user database directly
    const user = await userDatabase.users.findById(orderData.userId);
    // ...
  }
}

// ✅ CORRECT: API-based access
class OrderService {
  constructor(userServiceClient, inventoryServiceClient) {
    this.userClient = userServiceClient;
    this.inventoryClient = inventoryServiceClient;
  }

  async createOrder(orderData) {
    // Validate user via API
    const user = await this.userClient.getUser(orderData.userId);
    if (!user) {
      throw new Error('User not found');
    }

    // Check inventory via API
    for (const item of orderData.items) {
      const available = await this.inventoryClient.checkStock(
        item.productId,
        item.quantity
      );
      if (!available) {
        throw new Error(`Insufficient stock for ${item.productId}`);
      }
    }

    // Create order in our database
    const order = await this.orderRepository.create({
      customerId: user.id,
      items: orderData.items,
      totalAmount: this.calculateTotal(orderData.items),
      status: 'PENDING'
    });

    return order;
  }
}

Saga Pattern

Manage distributed transactions across multiple services.

Choreography-Based Saga

Services communicate through events without a central coordinator.
┌─────────────────────────────────────────────────────────────────────────────┐
│                     CHOREOGRAPHY SAGA: ORDER FLOW                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  HAPPY PATH:                                                                 │
│  ───────────                                                                 │
│                                                                              │
│  Order       ──(1) OrderCreated──▶  Inventory   ──(2) StockReserved──▶      │
│  Service                            Service                                  │
│                                                                              │
│  Payment     ◀──(3) StockReserved──  Inventory  ──(4) PaymentSuccess──▶     │
│  Service                             Service                                 │
│                                                                              │
│  Order       ◀──(5) PaymentSuccess── Payment    ──(6) OrderConfirmed──▶     │
│  Service                             Service                                 │
│                                                                              │
│                                                                              │
│  COMPENSATION (Payment Failed):                                              │
│  ──────────────────────────────                                             │
│                                                                              │
│  Payment     ──(1) PaymentFailed──▶  Inventory  ──(2) StockReleased──▶      │
│  Service                             Service                                 │
│                                                                              │
│  Order       ◀──(3) StockReleased──  Inventory  ──(4) OrderCancelled──▶     │
│  Service                             Service                                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
// order-service/src/sagas/orderSaga.js
class OrderChoreographySaga {
  constructor(eventBus, orderRepository) {
    this.eventBus = eventBus;
    this.orderRepository = orderRepository;
    this.setupHandlers();
  }

  setupHandlers() {
    // Listen for saga-related events
    this.eventBus.subscribe('inventory.stock_reserved', this.onStockReserved.bind(this));
    this.eventBus.subscribe('inventory.stock_reservation_failed', this.onStockReservationFailed.bind(this));
    this.eventBus.subscribe('payment.success', this.onPaymentSuccess.bind(this));
    this.eventBus.subscribe('payment.failed', this.onPaymentFailed.bind(this));
    this.eventBus.subscribe('inventory.stock_released', this.onStockReleased.bind(this));
  }

  // Step 1: Create order and publish event
  async createOrder(orderData) {
    const order = await this.orderRepository.create({
      ...orderData,
      status: 'PENDING',
      sagaState: 'STARTED'
    });

    await this.eventBus.publish('order.created', {
      orderId: order.id,
      customerId: order.customerId,
      items: order.items,
      totalAmount: order.totalAmount
    });

    return order;
  }

  // Step 2: Stock reserved → Proceed to payment
  async onStockReserved(event) {
    const { orderId, reservationId } = event.data;
    
    await this.orderRepository.update(orderId, {
      sagaState: 'STOCK_RESERVED',
      inventoryReservationId: reservationId
    });

    // Trigger payment
    const order = await this.orderRepository.findById(orderId);
    await this.eventBus.publish('order.ready_for_payment', {
      orderId,
      customerId: order.customerId,
      amount: order.totalAmount
    });
  }

  // Stock reservation failed → Cancel order
  async onStockReservationFailed(event) {
    const { orderId, reason } = event.data;
    
    await this.orderRepository.update(orderId, {
      status: 'CANCELLED',
      sagaState: 'FAILED',
      failureReason: reason
    });

    await this.eventBus.publish('order.cancelled', {
      orderId,
      reason: 'Insufficient stock'
    });
  }

  // Step 3: Payment success → Confirm order
  async onPaymentSuccess(event) {
    const { orderId, paymentId, amount } = event.data;
    
    await this.orderRepository.update(orderId, {
      status: 'CONFIRMED',
      sagaState: 'COMPLETED',
      paymentId
    });

    await this.eventBus.publish('order.confirmed', {
      orderId,
      paymentId
    });
  }

  // Payment failed → Trigger compensation
  async onPaymentFailed(event) {
    const { orderId, reason } = event.data;
    
    await this.orderRepository.update(orderId, {
      sagaState: 'COMPENSATING',
      paymentFailureReason: reason
    });

    // Trigger inventory release (compensation)
    const order = await this.orderRepository.findById(orderId);
    await this.eventBus.publish('order.payment_failed', {
      orderId,
      reservationId: order.inventoryReservationId
    });
  }

  // Compensation complete → Mark order as cancelled
  async onStockReleased(event) {
    const { orderId } = event.data;
    
    const order = await this.orderRepository.findById(orderId);
    if (order.sagaState === 'COMPENSATING') {
      await this.orderRepository.update(orderId, {
        status: 'CANCELLED',
        sagaState: 'COMPENSATED'
      });

      await this.eventBus.publish('order.cancelled', {
        orderId,
        reason: 'Payment failed'
      });
    }
  }
}

// inventory-service/src/handlers/sagaHandler.js
class InventorySagaHandler {
  constructor(eventBus, inventoryService) {
    this.eventBus = eventBus;
    this.inventoryService = inventoryService;
    this.setupHandlers();
  }

  setupHandlers() {
    this.eventBus.subscribe('order.created', this.onOrderCreated.bind(this));
    this.eventBus.subscribe('order.payment_failed', this.onPaymentFailed.bind(this));
  }

  async onOrderCreated(event) {
    const { orderId, items } = event.data;

    try {
      const reservationId = await this.inventoryService.reserveStock(orderId, items);
      
      await this.eventBus.publish('inventory.stock_reserved', {
        orderId,
        reservationId
      });
    } catch (error) {
      await this.eventBus.publish('inventory.stock_reservation_failed', {
        orderId,
        reason: error.message
      });
    }
  }

  async onPaymentFailed(event) {
    const { orderId, reservationId } = event.data;

    await this.inventoryService.releaseReservation(reservationId);
    
    await this.eventBus.publish('inventory.stock_released', {
      orderId,
      reservationId
    });
  }
}

Orchestration-Based Saga

A central orchestrator controls the saga flow.
// saga-orchestrator/src/sagas/orderSaga.js
class OrderSagaOrchestrator {
  constructor(eventBus, stateStore, serviceClients) {
    this.eventBus = eventBus;
    this.stateStore = stateStore;
    this.services = serviceClients;
  }

  async startOrderSaga(orderData) {
    const sagaId = generateId();
    
    // Initialize saga state
    await this.stateStore.create(sagaId, {
      type: 'ORDER_SAGA',
      status: 'STARTED',
      step: 'CREATE_ORDER',
      data: orderData,
      compensations: []
    });

    try {
      // Step 1: Create Order
      const order = await this.executeStep(sagaId, 'CREATE_ORDER', async () => {
        return this.services.order.createOrder(orderData);
      }, async (order) => {
        await this.services.order.cancelOrder(order.id);
      });

      // Step 2: Reserve Inventory
      await this.executeStep(sagaId, 'RESERVE_INVENTORY', async () => {
        return this.services.inventory.reserve(order.id, order.items);
      }, async (reservationId) => {
        await this.services.inventory.release(reservationId);
      });

      // Step 3: Process Payment
      await this.executeStep(sagaId, 'PROCESS_PAYMENT', async () => {
        return this.services.payment.charge(order.id, order.totalAmount);
      }, async (paymentId) => {
        await this.services.payment.refund(paymentId);
      });

      // Step 4: Confirm Order
      await this.executeStep(sagaId, 'CONFIRM_ORDER', async () => {
        return this.services.order.confirm(order.id);
      }, null); // No compensation needed

      await this.stateStore.update(sagaId, {
        status: 'COMPLETED',
        completedAt: new Date()
      });

      return order;
    } catch (error) {
      await this.compensate(sagaId, error);
      throw error;
    }
  }

  async executeStep(sagaId, stepName, action, compensation) {
    await this.stateStore.update(sagaId, { step: stepName });

    const result = await action();

    if (compensation) {
      // Store compensation for potential rollback
      const saga = await this.stateStore.get(sagaId);
      saga.compensations.unshift({ step: stepName, compensation, result });
      await this.stateStore.update(sagaId, { compensations: saga.compensations });
    }

    return result;
  }

  async compensate(sagaId, error) {
    await this.stateStore.update(sagaId, {
      status: 'COMPENSATING',
      error: error.message
    });

    const saga = await this.stateStore.get(sagaId);

    for (const { step, compensation, result } of saga.compensations) {
      try {
        console.log(`Compensating step: ${step}`);
        await compensation(result);
      } catch (compError) {
        console.error(`Compensation failed for ${step}:`, compError);
        // Store for manual intervention
        await this.stateStore.update(sagaId, {
          status: 'COMPENSATION_FAILED',
          failedCompensation: step
        });
        return;
      }
    }

    await this.stateStore.update(sagaId, {
      status: 'COMPENSATED',
      compensatedAt: new Date()
    });
  }
}

Event Sourcing

Store state as a sequence of events instead of current state.
┌─────────────────────────────────────────────────────────────────────────────┐
│                          EVENT SOURCING                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  TRADITIONAL APPROACH:                                                       │
│  ─────────────────────                                                       │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │  Order Table                                                         │    │
│  │  id: 123, status: SHIPPED, total: $99.99, shipped_at: 2024-01-15    │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│  ▲ We only see current state, history is lost                               │
│                                                                              │
│  EVENT SOURCING APPROACH:                                                    │
│  ────────────────────────                                                    │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │  Event Store for Order 123                                           │    │
│  │  ────────────────────────────────────────────────────────────────   │    │
│  │  [1] OrderCreated { items: [...], total: $99.99 }      2024-01-10   │    │
│  │  [2] PaymentReceived { paymentId: "pay_abc" }          2024-01-10   │    │
│  │  [3] OrderConfirmed { }                                 2024-01-10   │    │
│  │  [4] ItemsShipped { trackingNo: "1Z999" }              2024-01-15   │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│  ▲ Full history, can rebuild any past state                                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Implementation

// event-store/src/eventStore.js
class EventStore {
  constructor(database) {
    this.db = database;
  }

  async appendEvents(aggregateId, events, expectedVersion) {
    const client = await this.db.connect();
    
    try {
      await client.query('BEGIN');
      
      // Optimistic concurrency check
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      
      const currentVersion = result.rows[0]?.current_version || 0;
      
      if (expectedVersion !== currentVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current is ${currentVersion}`
        );
      }
      
      // Append events
      let version = currentVersion;
      for (const event of events) {
        version++;
        await client.query(
          `INSERT INTO events (aggregate_id, aggregate_type, event_type, event_data, version, timestamp)
           VALUES ($1, $2, $3, $4, $5, $6)`,
          [
            aggregateId,
            event.aggregateType,
            event.eventType,
            JSON.stringify(event.data),
            version,
            new Date()
          ]
        );
      }
      
      await client.query('COMMIT');
      return version;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId, fromVersion = 0) {
    const result = await this.db.query(
      `SELECT * FROM events 
       WHERE aggregate_id = $1 AND version > $2 
       ORDER BY version`,
      [aggregateId, fromVersion]
    );
    
    return result.rows.map(row => ({
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      data: row.event_data,
      version: row.version,
      timestamp: row.timestamp
    }));
  }

  async getEventsByType(eventType, fromTimestamp, limit = 1000) {
    const result = await this.db.query(
      `SELECT * FROM events 
       WHERE event_type = $1 AND timestamp > $2 
       ORDER BY timestamp LIMIT $3`,
      [eventType, fromTimestamp, limit]
    );
    return result.rows;
  }
}

// order-service/src/aggregates/Order.js
class Order {
  constructor() {
    this.id = null;
    this.customerId = null;
    this.items = [];
    this.status = null;
    this.totalAmount = 0;
    this.version = 0;
    this.pendingEvents = [];
  }

  // Factory method to create new order
  static create(orderId, customerId, items) {
    const order = new Order();
    order.apply(new OrderCreatedEvent(orderId, customerId, items));
    return order;
  }

  // Reconstitute from events
  static fromEvents(events) {
    const order = new Order();
    for (const event of events) {
      order.applyEvent(event);
      order.version = event.version;
    }
    return order;
  }

  // Business methods that generate events
  addItem(productId, quantity, price) {
    if (this.status !== 'DRAFT') {
      throw new Error('Cannot add items to non-draft order');
    }
    this.apply(new ItemAddedEvent(this.id, productId, quantity, price));
  }

  submit() {
    if (this.items.length === 0) {
      throw new Error('Cannot submit empty order');
    }
    this.apply(new OrderSubmittedEvent(this.id));
  }

  confirmPayment(paymentId) {
    if (this.status !== 'PENDING_PAYMENT') {
      throw new Error('Order not pending payment');
    }
    this.apply(new PaymentConfirmedEvent(this.id, paymentId));
  }

  ship(trackingNumber) {
    if (this.status !== 'CONFIRMED') {
      throw new Error('Order not confirmed');
    }
    this.apply(new OrderShippedEvent(this.id, trackingNumber));
  }

  // Apply new event (adds to pending)
  apply(event) {
    this.applyEvent(event);
    this.pendingEvents.push(event);
  }

  // Apply event to state
  applyEvent(event) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.id = event.data.orderId;
        this.customerId = event.data.customerId;
        this.items = event.data.items;
        this.totalAmount = this.calculateTotal();
        this.status = 'DRAFT';
        break;
        
      case 'ItemAdded':
        this.items.push({
          productId: event.data.productId,
          quantity: event.data.quantity,
          price: event.data.price
        });
        this.totalAmount = this.calculateTotal();
        break;
        
      case 'OrderSubmitted':
        this.status = 'PENDING_PAYMENT';
        break;
        
      case 'PaymentConfirmed':
        this.status = 'CONFIRMED';
        this.paymentId = event.data.paymentId;
        break;
        
      case 'OrderShipped':
        this.status = 'SHIPPED';
        this.trackingNumber = event.data.trackingNumber;
        break;
    }
  }

  calculateTotal() {
    return this.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }

  getPendingEvents() {
    return this.pendingEvents;
  }

  clearPendingEvents() {
    this.pendingEvents = [];
  }
}

// order-service/src/repositories/OrderRepository.js
class OrderRepository {
  constructor(eventStore) {
    this.eventStore = eventStore;
  }

  async getById(orderId) {
    const events = await this.eventStore.getEvents(orderId);
    
    if (events.length === 0) {
      return null;
    }
    
    return Order.fromEvents(events);
  }

  async save(order) {
    const pendingEvents = order.getPendingEvents();
    
    if (pendingEvents.length === 0) {
      return;
    }
    
    const newVersion = await this.eventStore.appendEvents(
      order.id,
      pendingEvents,
      order.version
    );
    
    order.version = newVersion;
    order.clearPendingEvents();
    
    // Publish events for other services
    for (const event of pendingEvents) {
      await this.eventBus.publish(event.eventType, event);
    }
  }
}

Snapshotting for Performance

// Snapshot to avoid replaying all events
class SnapshotStore {
  constructor(database) {
    this.db = database;
    this.snapshotFrequency = 100; // Snapshot every 100 events
  }

  async saveSnapshot(aggregateId, snapshot, version) {
    await this.db.query(
      `INSERT INTO snapshots (aggregate_id, snapshot_data, version, created_at)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (aggregate_id) DO UPDATE SET snapshot_data = $2, version = $3`,
      [aggregateId, JSON.stringify(snapshot), version, new Date()]
    );
  }

  async getSnapshot(aggregateId) {
    const result = await this.db.query(
      'SELECT * FROM snapshots WHERE aggregate_id = $1',
      [aggregateId]
    );
    return result.rows[0];
  }
}

class OrderRepositoryWithSnapshots {
  async getById(orderId) {
    // Try to load from snapshot
    const snapshot = await this.snapshotStore.getSnapshot(orderId);
    
    let order;
    let fromVersion = 0;
    
    if (snapshot) {
      order = Order.fromSnapshot(snapshot.snapshot_data);
      order.version = snapshot.version;
      fromVersion = snapshot.version;
    } else {
      order = new Order();
    }
    
    // Load events since snapshot
    const events = await this.eventStore.getEvents(orderId, fromVersion);
    
    for (const event of events) {
      order.applyEvent(event);
      order.version = event.version;
    }
    
    return order;
  }

  async save(order) {
    await super.save(order);
    
    // Create snapshot if needed
    if (order.version % this.snapshotStore.snapshotFrequency === 0) {
      await this.snapshotStore.saveSnapshot(
        order.id,
        order.toSnapshot(),
        order.version
      );
    }
  }
}

CQRS Pattern

Command Query Responsibility Segregation - separate read and write models.
┌─────────────────────────────────────────────────────────────────────────────┐
│                             CQRS PATTERN                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                            ┌─────────────┐                                  │
│                            │    CLIENT   │                                  │
│                            └──────┬──────┘                                  │
│                                   │                                          │
│                    ┌──────────────┴──────────────┐                          │
│                    │                             │                          │
│            Commands (Write)               Queries (Read)                    │
│                    │                             │                          │
│                    ▼                             ▼                          │
│            ┌─────────────┐               ┌─────────────┐                    │
│            │   COMMAND   │               │    QUERY    │                    │
│            │   HANDLER   │               │   HANDLER   │                    │
│            └──────┬──────┘               └──────┬──────┘                    │
│                   │                             │                           │
│                   ▼                             ▼                           │
│            ┌─────────────┐               ┌─────────────┐                    │
│            │    WRITE    │──── Events ──▶│    READ     │                    │
│            │    MODEL    │               │    MODEL    │                    │
│            │ (Normalized)│               │(Denormalized)│                   │
│            └─────────────┘               └─────────────┘                    │
│                   │                             │                           │
│                   ▼                             ▼                           │
│            ┌─────────────┐               ┌─────────────┐                    │
│            │ PostgreSQL  │               │ Elasticsearch│                   │
│            │  (Source)   │               │   (Cache)    │                   │
│            └─────────────┘               └─────────────┘                    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Implementation

// Write side - Commands
class OrderCommandHandler {
  constructor(orderRepository, eventBus) {
    this.orderRepository = orderRepository;
    this.eventBus = eventBus;
  }

  async handleCreateOrder(command) {
    const order = Order.create(
      command.orderId,
      command.customerId,
      command.items
    );
    
    await this.orderRepository.save(order);
    
    // Events are published to update read models
    return order.id;
  }

  async handleSubmitOrder(command) {
    const order = await this.orderRepository.getById(command.orderId);
    
    if (!order) {
      throw new Error('Order not found');
    }
    
    order.submit();
    await this.orderRepository.save(order);
  }
}

// Read side - Query models
class OrderReadModel {
  constructor(elasticsearch) {
    this.es = elasticsearch;
  }

  // Update handlers - called when events occur
  async onOrderCreated(event) {
    await this.es.index({
      index: 'orders',
      id: event.data.orderId,
      body: {
        orderId: event.data.orderId,
        customerId: event.data.customerId,
        customerName: await this.getCustomerName(event.data.customerId),
        items: event.data.items,
        itemCount: event.data.items.length,
        totalAmount: this.calculateTotal(event.data.items),
        status: 'DRAFT',
        createdAt: event.timestamp,
        updatedAt: event.timestamp
      }
    });
  }

  async onOrderSubmitted(event) {
    await this.es.update({
      index: 'orders',
      id: event.data.orderId,
      body: {
        doc: {
          status: 'PENDING_PAYMENT',
          submittedAt: event.timestamp,
          updatedAt: event.timestamp
        }
      }
    });
  }

  async onOrderShipped(event) {
    await this.es.update({
      index: 'orders',
      id: event.data.orderId,
      body: {
        doc: {
          status: 'SHIPPED',
          trackingNumber: event.data.trackingNumber,
          shippedAt: event.timestamp,
          updatedAt: event.timestamp
        }
      }
    });
  }
}

// Query handlers
class OrderQueryHandler {
  constructor(elasticsearch) {
    this.es = elasticsearch;
  }

  async getOrderById(orderId) {
    const result = await this.es.get({
      index: 'orders',
      id: orderId
    });
    return result._source;
  }

  async searchOrders(criteria) {
    const query = {
      bool: {
        must: []
      }
    };

    if (criteria.customerId) {
      query.bool.must.push({ term: { customerId: criteria.customerId } });
    }
    
    if (criteria.status) {
      query.bool.must.push({ term: { status: criteria.status } });
    }
    
    if (criteria.dateRange) {
      query.bool.must.push({
        range: {
          createdAt: {
            gte: criteria.dateRange.from,
            lte: criteria.dateRange.to
          }
        }
      });
    }

    const result = await this.es.search({
      index: 'orders',
      body: {
        query,
        sort: [{ createdAt: 'desc' }],
        from: criteria.offset || 0,
        size: criteria.limit || 20
      }
    });

    return {
      orders: result.hits.hits.map(h => h._source),
      total: result.hits.total.value
    };
  }

  async getOrderStatistics(customerId) {
    const result = await this.es.search({
      index: 'orders',
      body: {
        query: {
          term: { customerId }
        },
        aggs: {
          total_spent: { sum: { field: 'totalAmount' } },
          order_count: { value_count: { field: 'orderId' } },
          avg_order_value: { avg: { field: 'totalAmount' } },
          by_status: {
            terms: { field: 'status' }
          }
        },
        size: 0
      }
    });

    return {
      totalSpent: result.aggregations.total_spent.value,
      orderCount: result.aggregations.order_count.value,
      avgOrderValue: result.aggregations.avg_order_value.value,
      byStatus: result.aggregations.by_status.buckets
    };
  }
}

Interview Questions

Answer:Options:
  1. Saga Pattern: Series of local transactions with compensating actions
  2. Two-Phase Commit (2PC): Coordinator-based, but slow and blocking
  3. Outbox Pattern: Store events in same transaction as data changes
Saga is preferred because:
  • Non-blocking
  • Each service has autonomy
  • Scales better
  • Handles failures gracefully
Trade-off: Eventual consistency, need to handle compensation
Answer:Use when:
  • Need full audit trail
  • Business requires “what happened” history
  • Complex domain with many state transitions
  • Need to rebuild state at any point in time
  • Event-driven architecture already in place
Avoid when:
  • Simple CRUD operations
  • Low complexity domain
  • Team unfamiliar with pattern
  • Query performance is critical (use with CQRS)
Challenges: Event versioning, storage growth, eventual consistency
Answer:Benefits:
  • Independent Scaling: Scale reads (often 90%+) separately from writes
  • Optimized Models: Read model denormalized for query performance
  • Different Databases: Use best database for each (PostgreSQL for writes, Elasticsearch for reads)
  • Caching: Read model can be cached aggressively
Trade-offs:
  • Eventual consistency between read/write
  • More complex architecture
  • Need to handle stale reads
Answer:Problem: Publishing events after DB commit can fail, causing inconsistency.Solution:
  1. In same transaction: save data AND write event to outbox table
  2. Separate process reads outbox and publishes to message broker
  3. Mark event as published after successful publish
BEGIN TRANSACTION;
  INSERT INTO orders (...);
  INSERT INTO outbox (event_type, payload) VALUES (...);
COMMIT;
Benefits: Guaranteed at-least-once delivery, atomic with business logic

Summary

Key Takeaways

  • Each service owns its data
  • Use Saga for distributed transactions
  • Event Sourcing for audit/history needs
  • CQRS separates read/write concerns
  • Accept eventual consistency

Next Steps

In the next chapter, we’ll cover Resilience Patterns - circuit breakers, retries, and bulkheads.