Data Management Patterns
Managing data across microservices is one of the hardest challenges. This chapter covers patterns for data ownership, distributed transactions, and consistency.Learning Objectives:
- Implement database per service pattern
- Handle distributed transactions with Saga pattern
- Understand event sourcing and its trade-offs
- Implement CQRS for complex queries
- Design for eventual consistency
Database Per Service
Each microservice owns its data and exposes it only through APIs.Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ DATABASE PER SERVICE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ❌ ANTI-PATTERN: Shared Database │
│ ─────────────────────────────────────── │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Service │ │ Service │ │ Service │ │
│ │ A │ │ B │ │ C │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ SHARED DATABASE │ ← Coupling, schema conflicts │
│ └─────────────────┘ │
│ │
│ ✅ CORRECT: Database Per Service │
│ ─────────────────────────────────── │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Service │ │ Service │ │ Service │ │
│ │ A │ │ B │ │ C │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ MongoDB │ │ PostgreSQL│ │ Redis │ ← Best fit for each │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Implementation Example
Copy
// user-service/src/database.js
const mongoose = require('mongoose');
// User service uses MongoDB
const connectUserDB = async () => {
await mongoose.connect(process.env.USER_DB_URI, {
useNewUrlParser: true,
useUnifiedTopology: true
});
console.log('User DB connected');
};
const UserSchema = new mongoose.Schema({
email: { type: String, unique: true, required: true },
name: { type: String, required: true },
passwordHash: String,
status: { type: String, enum: ['active', 'inactive', 'suspended'] },
preferences: {
language: String,
timezone: String,
notifications: Boolean
},
createdAt: { type: Date, default: Date.now }
});
const User = mongoose.model('User', UserSchema);
// order-service/src/database.js
const { Pool } = require('pg');
const { PrismaClient } = require('@prisma/client');
// Order service uses PostgreSQL with Prisma
const prisma = new PrismaClient();
// schema.prisma
/*
model Order {
id String @id @default(uuid())
customerId String // Reference to user service
status OrderStatus
items OrderItem[]
totalAmount Decimal
currency String @default("USD")
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
model OrderItem {
id String @id @default(uuid())
orderId String
order Order @relation(fields: [orderId], references: [id])
productId String // Reference to product service
quantity Int
unitPrice Decimal
}
enum OrderStatus {
PENDING
CONFIRMED
PAID
SHIPPED
DELIVERED
CANCELLED
}
*/
// inventory-service/src/database.js
const Redis = require('ioredis');
const { Pool } = require('pg');
// Inventory uses Redis for fast reads, PostgreSQL for persistence
class InventoryDatabase {
constructor() {
this.redis = new Redis(process.env.REDIS_URL);
this.pg = new Pool({ connectionString: process.env.INVENTORY_DB_URL });
}
async getStock(productId) {
// Try cache first
let stock = await this.redis.get(`stock:${productId}`);
if (stock === null) {
// Fallback to database
const result = await this.pg.query(
'SELECT quantity FROM inventory WHERE product_id = $1',
[productId]
);
stock = result.rows[0]?.quantity || 0;
// Cache for 1 minute
await this.redis.setex(`stock:${productId}`, 60, stock);
}
return parseInt(stock);
}
async updateStock(productId, delta) {
const client = await this.pg.connect();
try {
await client.query('BEGIN');
await client.query(
`UPDATE inventory
SET quantity = quantity + $1, updated_at = NOW()
WHERE product_id = $2`,
[delta, productId]
);
await client.query('COMMIT');
// Invalidate cache
await this.redis.del(`stock:${productId}`);
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
Cross-Service Data Access
Copy
// ❌ WRONG: Direct database access
class OrderService {
async createOrder(orderData) {
// DON'T DO THIS - accessing user database directly
const user = await userDatabase.users.findById(orderData.userId);
// ...
}
}
// ✅ CORRECT: API-based access
class OrderService {
constructor(userServiceClient, inventoryServiceClient) {
this.userClient = userServiceClient;
this.inventoryClient = inventoryServiceClient;
}
async createOrder(orderData) {
// Validate user via API
const user = await this.userClient.getUser(orderData.userId);
if (!user) {
throw new Error('User not found');
}
// Check inventory via API
for (const item of orderData.items) {
const available = await this.inventoryClient.checkStock(
item.productId,
item.quantity
);
if (!available) {
throw new Error(`Insufficient stock for ${item.productId}`);
}
}
// Create order in our database
const order = await this.orderRepository.create({
customerId: user.id,
items: orderData.items,
totalAmount: this.calculateTotal(orderData.items),
status: 'PENDING'
});
return order;
}
}
Saga Pattern
Manage distributed transactions across multiple services.Choreography-Based Saga
Services communicate through events without a central coordinator.Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ CHOREOGRAPHY SAGA: ORDER FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ HAPPY PATH: │
│ ─────────── │
│ │
│ Order ──(1) OrderCreated──▶ Inventory ──(2) StockReserved──▶ │
│ Service Service │
│ │
│ Payment ◀──(3) StockReserved── Inventory ──(4) PaymentSuccess──▶ │
│ Service Service │
│ │
│ Order ◀──(5) PaymentSuccess── Payment ──(6) OrderConfirmed──▶ │
│ Service Service │
│ │
│ │
│ COMPENSATION (Payment Failed): │
│ ────────────────────────────── │
│ │
│ Payment ──(1) PaymentFailed──▶ Inventory ──(2) StockReleased──▶ │
│ Service Service │
│ │
│ Order ◀──(3) StockReleased── Inventory ──(4) OrderCancelled──▶ │
│ Service Service │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Copy
// order-service/src/sagas/orderSaga.js
class OrderChoreographySaga {
constructor(eventBus, orderRepository) {
this.eventBus = eventBus;
this.orderRepository = orderRepository;
this.setupHandlers();
}
setupHandlers() {
// Listen for saga-related events
this.eventBus.subscribe('inventory.stock_reserved', this.onStockReserved.bind(this));
this.eventBus.subscribe('inventory.stock_reservation_failed', this.onStockReservationFailed.bind(this));
this.eventBus.subscribe('payment.success', this.onPaymentSuccess.bind(this));
this.eventBus.subscribe('payment.failed', this.onPaymentFailed.bind(this));
this.eventBus.subscribe('inventory.stock_released', this.onStockReleased.bind(this));
}
// Step 1: Create order and publish event
async createOrder(orderData) {
const order = await this.orderRepository.create({
...orderData,
status: 'PENDING',
sagaState: 'STARTED'
});
await this.eventBus.publish('order.created', {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount
});
return order;
}
// Step 2: Stock reserved → Proceed to payment
async onStockReserved(event) {
const { orderId, reservationId } = event.data;
await this.orderRepository.update(orderId, {
sagaState: 'STOCK_RESERVED',
inventoryReservationId: reservationId
});
// Trigger payment
const order = await this.orderRepository.findById(orderId);
await this.eventBus.publish('order.ready_for_payment', {
orderId,
customerId: order.customerId,
amount: order.totalAmount
});
}
// Stock reservation failed → Cancel order
async onStockReservationFailed(event) {
const { orderId, reason } = event.data;
await this.orderRepository.update(orderId, {
status: 'CANCELLED',
sagaState: 'FAILED',
failureReason: reason
});
await this.eventBus.publish('order.cancelled', {
orderId,
reason: 'Insufficient stock'
});
}
// Step 3: Payment success → Confirm order
async onPaymentSuccess(event) {
const { orderId, paymentId, amount } = event.data;
await this.orderRepository.update(orderId, {
status: 'CONFIRMED',
sagaState: 'COMPLETED',
paymentId
});
await this.eventBus.publish('order.confirmed', {
orderId,
paymentId
});
}
// Payment failed → Trigger compensation
async onPaymentFailed(event) {
const { orderId, reason } = event.data;
await this.orderRepository.update(orderId, {
sagaState: 'COMPENSATING',
paymentFailureReason: reason
});
// Trigger inventory release (compensation)
const order = await this.orderRepository.findById(orderId);
await this.eventBus.publish('order.payment_failed', {
orderId,
reservationId: order.inventoryReservationId
});
}
// Compensation complete → Mark order as cancelled
async onStockReleased(event) {
const { orderId } = event.data;
const order = await this.orderRepository.findById(orderId);
if (order.sagaState === 'COMPENSATING') {
await this.orderRepository.update(orderId, {
status: 'CANCELLED',
sagaState: 'COMPENSATED'
});
await this.eventBus.publish('order.cancelled', {
orderId,
reason: 'Payment failed'
});
}
}
}
// inventory-service/src/handlers/sagaHandler.js
class InventorySagaHandler {
constructor(eventBus, inventoryService) {
this.eventBus = eventBus;
this.inventoryService = inventoryService;
this.setupHandlers();
}
setupHandlers() {
this.eventBus.subscribe('order.created', this.onOrderCreated.bind(this));
this.eventBus.subscribe('order.payment_failed', this.onPaymentFailed.bind(this));
}
async onOrderCreated(event) {
const { orderId, items } = event.data;
try {
const reservationId = await this.inventoryService.reserveStock(orderId, items);
await this.eventBus.publish('inventory.stock_reserved', {
orderId,
reservationId
});
} catch (error) {
await this.eventBus.publish('inventory.stock_reservation_failed', {
orderId,
reason: error.message
});
}
}
async onPaymentFailed(event) {
const { orderId, reservationId } = event.data;
await this.inventoryService.releaseReservation(reservationId);
await this.eventBus.publish('inventory.stock_released', {
orderId,
reservationId
});
}
}
Orchestration-Based Saga
A central orchestrator controls the saga flow.Copy
// saga-orchestrator/src/sagas/orderSaga.js
class OrderSagaOrchestrator {
constructor(eventBus, stateStore, serviceClients) {
this.eventBus = eventBus;
this.stateStore = stateStore;
this.services = serviceClients;
}
async startOrderSaga(orderData) {
const sagaId = generateId();
// Initialize saga state
await this.stateStore.create(sagaId, {
type: 'ORDER_SAGA',
status: 'STARTED',
step: 'CREATE_ORDER',
data: orderData,
compensations: []
});
try {
// Step 1: Create Order
const order = await this.executeStep(sagaId, 'CREATE_ORDER', async () => {
return this.services.order.createOrder(orderData);
}, async (order) => {
await this.services.order.cancelOrder(order.id);
});
// Step 2: Reserve Inventory
await this.executeStep(sagaId, 'RESERVE_INVENTORY', async () => {
return this.services.inventory.reserve(order.id, order.items);
}, async (reservationId) => {
await this.services.inventory.release(reservationId);
});
// Step 3: Process Payment
await this.executeStep(sagaId, 'PROCESS_PAYMENT', async () => {
return this.services.payment.charge(order.id, order.totalAmount);
}, async (paymentId) => {
await this.services.payment.refund(paymentId);
});
// Step 4: Confirm Order
await this.executeStep(sagaId, 'CONFIRM_ORDER', async () => {
return this.services.order.confirm(order.id);
}, null); // No compensation needed
await this.stateStore.update(sagaId, {
status: 'COMPLETED',
completedAt: new Date()
});
return order;
} catch (error) {
await this.compensate(sagaId, error);
throw error;
}
}
async executeStep(sagaId, stepName, action, compensation) {
await this.stateStore.update(sagaId, { step: stepName });
const result = await action();
if (compensation) {
// Store compensation for potential rollback
const saga = await this.stateStore.get(sagaId);
saga.compensations.unshift({ step: stepName, compensation, result });
await this.stateStore.update(sagaId, { compensations: saga.compensations });
}
return result;
}
async compensate(sagaId, error) {
await this.stateStore.update(sagaId, {
status: 'COMPENSATING',
error: error.message
});
const saga = await this.stateStore.get(sagaId);
for (const { step, compensation, result } of saga.compensations) {
try {
console.log(`Compensating step: ${step}`);
await compensation(result);
} catch (compError) {
console.error(`Compensation failed for ${step}:`, compError);
// Store for manual intervention
await this.stateStore.update(sagaId, {
status: 'COMPENSATION_FAILED',
failedCompensation: step
});
return;
}
}
await this.stateStore.update(sagaId, {
status: 'COMPENSATED',
compensatedAt: new Date()
});
}
}
Event Sourcing
Store state as a sequence of events instead of current state.Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ EVENT SOURCING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TRADITIONAL APPROACH: │
│ ───────────────────── │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Order Table │ │
│ │ id: 123, status: SHIPPED, total: $99.99, shipped_at: 2024-01-15 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ ▲ We only see current state, history is lost │
│ │
│ EVENT SOURCING APPROACH: │
│ ──────────────────────── │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Event Store for Order 123 │ │
│ │ ──────────────────────────────────────────────────────────────── │ │
│ │ [1] OrderCreated { items: [...], total: $99.99 } 2024-01-10 │ │
│ │ [2] PaymentReceived { paymentId: "pay_abc" } 2024-01-10 │ │
│ │ [3] OrderConfirmed { } 2024-01-10 │ │
│ │ [4] ItemsShipped { trackingNo: "1Z999" } 2024-01-15 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ ▲ Full history, can rebuild any past state │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Implementation
Copy
// event-store/src/eventStore.js
class EventStore {
constructor(database) {
this.db = database;
}
async appendEvents(aggregateId, events, expectedVersion) {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Optimistic concurrency check
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
[aggregateId]
);
const currentVersion = result.rows[0]?.current_version || 0;
if (expectedVersion !== currentVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current is ${currentVersion}`
);
}
// Append events
let version = currentVersion;
for (const event of events) {
version++;
await client.query(
`INSERT INTO events (aggregate_id, aggregate_type, event_type, event_data, version, timestamp)
VALUES ($1, $2, $3, $4, $5, $6)`,
[
aggregateId,
event.aggregateType,
event.eventType,
JSON.stringify(event.data),
version,
new Date()
]
);
}
await client.query('COMMIT');
return version;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(aggregateId, fromVersion = 0) {
const result = await this.db.query(
`SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version`,
[aggregateId, fromVersion]
);
return result.rows.map(row => ({
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
eventType: row.event_type,
data: row.event_data,
version: row.version,
timestamp: row.timestamp
}));
}
async getEventsByType(eventType, fromTimestamp, limit = 1000) {
const result = await this.db.query(
`SELECT * FROM events
WHERE event_type = $1 AND timestamp > $2
ORDER BY timestamp LIMIT $3`,
[eventType, fromTimestamp, limit]
);
return result.rows;
}
}
// order-service/src/aggregates/Order.js
class Order {
constructor() {
this.id = null;
this.customerId = null;
this.items = [];
this.status = null;
this.totalAmount = 0;
this.version = 0;
this.pendingEvents = [];
}
// Factory method to create new order
static create(orderId, customerId, items) {
const order = new Order();
order.apply(new OrderCreatedEvent(orderId, customerId, items));
return order;
}
// Reconstitute from events
static fromEvents(events) {
const order = new Order();
for (const event of events) {
order.applyEvent(event);
order.version = event.version;
}
return order;
}
// Business methods that generate events
addItem(productId, quantity, price) {
if (this.status !== 'DRAFT') {
throw new Error('Cannot add items to non-draft order');
}
this.apply(new ItemAddedEvent(this.id, productId, quantity, price));
}
submit() {
if (this.items.length === 0) {
throw new Error('Cannot submit empty order');
}
this.apply(new OrderSubmittedEvent(this.id));
}
confirmPayment(paymentId) {
if (this.status !== 'PENDING_PAYMENT') {
throw new Error('Order not pending payment');
}
this.apply(new PaymentConfirmedEvent(this.id, paymentId));
}
ship(trackingNumber) {
if (this.status !== 'CONFIRMED') {
throw new Error('Order not confirmed');
}
this.apply(new OrderShippedEvent(this.id, trackingNumber));
}
// Apply new event (adds to pending)
apply(event) {
this.applyEvent(event);
this.pendingEvents.push(event);
}
// Apply event to state
applyEvent(event) {
switch (event.eventType) {
case 'OrderCreated':
this.id = event.data.orderId;
this.customerId = event.data.customerId;
this.items = event.data.items;
this.totalAmount = this.calculateTotal();
this.status = 'DRAFT';
break;
case 'ItemAdded':
this.items.push({
productId: event.data.productId,
quantity: event.data.quantity,
price: event.data.price
});
this.totalAmount = this.calculateTotal();
break;
case 'OrderSubmitted':
this.status = 'PENDING_PAYMENT';
break;
case 'PaymentConfirmed':
this.status = 'CONFIRMED';
this.paymentId = event.data.paymentId;
break;
case 'OrderShipped':
this.status = 'SHIPPED';
this.trackingNumber = event.data.trackingNumber;
break;
}
}
calculateTotal() {
return this.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
}
getPendingEvents() {
return this.pendingEvents;
}
clearPendingEvents() {
this.pendingEvents = [];
}
}
// order-service/src/repositories/OrderRepository.js
class OrderRepository {
constructor(eventStore) {
this.eventStore = eventStore;
}
async getById(orderId) {
const events = await this.eventStore.getEvents(orderId);
if (events.length === 0) {
return null;
}
return Order.fromEvents(events);
}
async save(order) {
const pendingEvents = order.getPendingEvents();
if (pendingEvents.length === 0) {
return;
}
const newVersion = await this.eventStore.appendEvents(
order.id,
pendingEvents,
order.version
);
order.version = newVersion;
order.clearPendingEvents();
// Publish events for other services
for (const event of pendingEvents) {
await this.eventBus.publish(event.eventType, event);
}
}
}
Snapshotting for Performance
Copy
// Snapshot to avoid replaying all events
class SnapshotStore {
constructor(database) {
this.db = database;
this.snapshotFrequency = 100; // Snapshot every 100 events
}
async saveSnapshot(aggregateId, snapshot, version) {
await this.db.query(
`INSERT INTO snapshots (aggregate_id, snapshot_data, version, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id) DO UPDATE SET snapshot_data = $2, version = $3`,
[aggregateId, JSON.stringify(snapshot), version, new Date()]
);
}
async getSnapshot(aggregateId) {
const result = await this.db.query(
'SELECT * FROM snapshots WHERE aggregate_id = $1',
[aggregateId]
);
return result.rows[0];
}
}
class OrderRepositoryWithSnapshots {
async getById(orderId) {
// Try to load from snapshot
const snapshot = await this.snapshotStore.getSnapshot(orderId);
let order;
let fromVersion = 0;
if (snapshot) {
order = Order.fromSnapshot(snapshot.snapshot_data);
order.version = snapshot.version;
fromVersion = snapshot.version;
} else {
order = new Order();
}
// Load events since snapshot
const events = await this.eventStore.getEvents(orderId, fromVersion);
for (const event of events) {
order.applyEvent(event);
order.version = event.version;
}
return order;
}
async save(order) {
await super.save(order);
// Create snapshot if needed
if (order.version % this.snapshotStore.snapshotFrequency === 0) {
await this.snapshotStore.saveSnapshot(
order.id,
order.toSnapshot(),
order.version
);
}
}
}
CQRS Pattern
Command Query Responsibility Segregation - separate read and write models.Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ CQRS PATTERN │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ CLIENT │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────────────┴──────────────┐ │
│ │ │ │
│ Commands (Write) Queries (Read) │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ COMMAND │ │ QUERY │ │
│ │ HANDLER │ │ HANDLER │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ WRITE │──── Events ──▶│ READ │ │
│ │ MODEL │ │ MODEL │ │
│ │ (Normalized)│ │(Denormalized)│ │
│ └─────────────┘ └─────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ PostgreSQL │ │ Elasticsearch│ │
│ │ (Source) │ │ (Cache) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Implementation
Copy
// Write side - Commands
class OrderCommandHandler {
constructor(orderRepository, eventBus) {
this.orderRepository = orderRepository;
this.eventBus = eventBus;
}
async handleCreateOrder(command) {
const order = Order.create(
command.orderId,
command.customerId,
command.items
);
await this.orderRepository.save(order);
// Events are published to update read models
return order.id;
}
async handleSubmitOrder(command) {
const order = await this.orderRepository.getById(command.orderId);
if (!order) {
throw new Error('Order not found');
}
order.submit();
await this.orderRepository.save(order);
}
}
// Read side - Query models
class OrderReadModel {
constructor(elasticsearch) {
this.es = elasticsearch;
}
// Update handlers - called when events occur
async onOrderCreated(event) {
await this.es.index({
index: 'orders',
id: event.data.orderId,
body: {
orderId: event.data.orderId,
customerId: event.data.customerId,
customerName: await this.getCustomerName(event.data.customerId),
items: event.data.items,
itemCount: event.data.items.length,
totalAmount: this.calculateTotal(event.data.items),
status: 'DRAFT',
createdAt: event.timestamp,
updatedAt: event.timestamp
}
});
}
async onOrderSubmitted(event) {
await this.es.update({
index: 'orders',
id: event.data.orderId,
body: {
doc: {
status: 'PENDING_PAYMENT',
submittedAt: event.timestamp,
updatedAt: event.timestamp
}
}
});
}
async onOrderShipped(event) {
await this.es.update({
index: 'orders',
id: event.data.orderId,
body: {
doc: {
status: 'SHIPPED',
trackingNumber: event.data.trackingNumber,
shippedAt: event.timestamp,
updatedAt: event.timestamp
}
}
});
}
}
// Query handlers
class OrderQueryHandler {
constructor(elasticsearch) {
this.es = elasticsearch;
}
async getOrderById(orderId) {
const result = await this.es.get({
index: 'orders',
id: orderId
});
return result._source;
}
async searchOrders(criteria) {
const query = {
bool: {
must: []
}
};
if (criteria.customerId) {
query.bool.must.push({ term: { customerId: criteria.customerId } });
}
if (criteria.status) {
query.bool.must.push({ term: { status: criteria.status } });
}
if (criteria.dateRange) {
query.bool.must.push({
range: {
createdAt: {
gte: criteria.dateRange.from,
lte: criteria.dateRange.to
}
}
});
}
const result = await this.es.search({
index: 'orders',
body: {
query,
sort: [{ createdAt: 'desc' }],
from: criteria.offset || 0,
size: criteria.limit || 20
}
});
return {
orders: result.hits.hits.map(h => h._source),
total: result.hits.total.value
};
}
async getOrderStatistics(customerId) {
const result = await this.es.search({
index: 'orders',
body: {
query: {
term: { customerId }
},
aggs: {
total_spent: { sum: { field: 'totalAmount' } },
order_count: { value_count: { field: 'orderId' } },
avg_order_value: { avg: { field: 'totalAmount' } },
by_status: {
terms: { field: 'status' }
}
},
size: 0
}
});
return {
totalSpent: result.aggregations.total_spent.value,
orderCount: result.aggregations.order_count.value,
avgOrderValue: result.aggregations.avg_order_value.value,
byStatus: result.aggregations.by_status.buckets
};
}
}
Interview Questions
Q1: How do you handle distributed transactions?
Q1: How do you handle distributed transactions?
Answer:Options:
- Saga Pattern: Series of local transactions with compensating actions
- Two-Phase Commit (2PC): Coordinator-based, but slow and blocking
- Outbox Pattern: Store events in same transaction as data changes
- Non-blocking
- Each service has autonomy
- Scales better
- Handles failures gracefully
Q2: When would you use Event Sourcing?
Q2: When would you use Event Sourcing?
Answer:Use when:
- Need full audit trail
- Business requires “what happened” history
- Complex domain with many state transitions
- Need to rebuild state at any point in time
- Event-driven architecture already in place
- Simple CRUD operations
- Low complexity domain
- Team unfamiliar with pattern
- Query performance is critical (use with CQRS)
Q3: How does CQRS help with scalability?
Q3: How does CQRS help with scalability?
Answer:Benefits:
- Independent Scaling: Scale reads (often 90%+) separately from writes
- Optimized Models: Read model denormalized for query performance
- Different Databases: Use best database for each (PostgreSQL for writes, Elasticsearch for reads)
- Caching: Read model can be cached aggressively
- Eventual consistency between read/write
- More complex architecture
- Need to handle stale reads
Q4: What is the Outbox Pattern?
Q4: What is the Outbox Pattern?
Answer:Problem: Publishing events after DB commit can fail, causing inconsistency.Solution:Benefits: Guaranteed at-least-once delivery, atomic with business logic
- In same transaction: save data AND write event to outbox table
- Separate process reads outbox and publishes to message broker
- Mark event as published after successful publish
Copy
BEGIN TRANSACTION;
INSERT INTO orders (...);
INSERT INTO outbox (event_type, payload) VALUES (...);
COMMIT;
Summary
Key Takeaways
- Each service owns its data
- Use Saga for distributed transactions
- Event Sourcing for audit/history needs
- CQRS separates read/write concerns
- Accept eventual consistency
Next Steps
In the next chapter, we’ll cover Resilience Patterns - circuit breakers, retries, and bulkheads.