Staff-Level Topic: Data modeling decisions have long-lasting consequences. The wrong choices lead to expensive migrations and system redesigns.
Data Modeling Philosophy
Great data modeling answers three questions:

- How will data be queried? (Read patterns)
- How will data change? (Write patterns)
- What are the consistency requirements?
Normalization vs Denormalization
When to Normalize (3NF)
┌─────────────────────────────────────────────────────────────────┐
│ NORMALIZED DESIGN │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Users Table Orders Table Products Table │
│ ┌──────────┐ ┌───────────┐ ┌────────────┐ │
│ │ user_id │◄────────│ user_id │ ┌───►│ product_id │ │
│ │ name │ │ order_id │ │ │ name │ │
│ │ email │ │ total │ │ │ price │ │
│ └──────────┘ │ status │ │ └────────────┘ │
│ └───────────┘ │ │
│ │ │ │
│ ┌────▼────┐ │ │
│ │Order │──────┘ │
│ │Items │ │
│ └─────────┘ │
│ │
│ ✓ Use when: │
│ • Data integrity is critical (banking, inventory) │
│ • Write-heavy workload │
│ • Complex relationships with frequent updates │
│ • ACID compliance required │
│ │
└─────────────────────────────────────────────────────────────────┘
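To see what normalization costs at read time, consider roughly what fetching one order page involves. A minimal sketch, assuming an asyncpg-style client and the tables in the diagram (with an order_items junction table holding order_id, product_id, quantity):

# Sketch: reading one order from the normalized schema requires joining
# users, orders, order_items, and products.
async def get_order_page(db, order_id: str):
    return await db.fetch("""
        SELECT u.name, u.email, o.status, o.total,
               p.name AS product_name, p.price, oi.quantity
        FROM orders o
        JOIN users u        ON u.user_id = o.user_id
        JOIN order_items oi ON oi.order_id = o.order_id
        JOIN products p     ON p.product_id = oi.product_id
        WHERE o.order_id = $1
    """, order_id)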
When to Denormalize
┌─────────────────────────────────────────────────────────────────┐
│ DENORMALIZED DESIGN │
├─────────────────────────────────────────────────────────────────┤
│ │
│ User Feed Document (MongoDB/DynamoDB style) │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "user_id": "123", │ │
│ │ "name": "John Doe", │ │
│ │ "orders": [ │ │
│ │ { │ │
│ │ "order_id": "456", │ │
│ │ "product_name": "iPhone 15", // Duplicated! │ │
│ │ "product_price": 999, // Duplicated! │ │
│ │ "quantity": 1 │ │
│ │ } │ │
│ │ ] │ │
│ │ } │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ ✓ Use when: │
│ • Read-heavy workload (50:1 read/write ratio) │
│ • Low latency requirements │
│ • Data changes infrequently │
│ • Eventual consistency is acceptable │
│ • Single document/row access patterns │
│ │
└─────────────────────────────────────────────────────────────────┘
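The payoff of the denormalized layout is that the same page becomes a single key lookup. A sketch, assuming a Motor/MongoDB-style collection named user_feeds:

# Sketch: the feed document above is served by one single-document read.
async def get_user_feed(db, user_id: str) -> dict:
    return await db.user_feeds.find_one({"user_id": user_id})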
Denormalization Strategies
Python:

from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime


@dataclass
class DenormalizedOrder:
    """
    Denormalized order that includes user and product details
    for single-read access pattern.
    """
    order_id: str

    # User info (denormalized from users table)
    user_id: str
    user_name: str
    user_email: str
    shipping_address: str

    # Order details
    items: List[Dict]  # Contains product details
    total: float
    status: str
    created_at: datetime

    @classmethod
    def from_normalized(cls, order, user, products):
        """
        Build denormalized document from normalized sources.
        Call this during write operations.
        """
        items = []
        total = 0
        for item in order.items:
            product = products[item.product_id]
            denorm_item = {
                "product_id": product.id,
                "product_name": product.name,
                "product_image": product.thumbnail_url,
                "unit_price": product.price,
                "quantity": item.quantity,
                "subtotal": product.price * item.quantity
            }
            items.append(denorm_item)
            total += denorm_item["subtotal"]
        return cls(
            order_id=order.id,
            user_id=user.id,
            user_name=user.name,
            user_email=user.email,
            shipping_address=user.default_address,
            items=items,
            total=total,
            status=order.status,
            created_at=order.created_at
        )


class OrderService:
    """
    Service that maintains both normalized and denormalized views.
    Write to normalized, materialize denormalized.
    """
    def __init__(self, sql_db, nosql_db):
        self.sql = sql_db      # Source of truth
        self.nosql = nosql_db  # Denormalized view

    async def create_order(self, user_id: str, cart_items: List[Dict]) -> str:
        """Create order with dual-write pattern"""
        # 1. Write to normalized tables (source of truth)
        async with self.sql.transaction():
            order = await self.sql.orders.create(
                user_id=user_id,
                status="pending"
            )
            for item in cart_items:
                await self.sql.order_items.create(
                    order_id=order.id,
                    product_id=item["product_id"],
                    quantity=item["quantity"]
                )

        # 2. Build and write denormalized view
        user = await self.sql.users.get(user_id)
        products = await self.sql.products.get_many(
            [item["product_id"] for item in cart_items]
        )
        denorm_order = DenormalizedOrder.from_normalized(
            order, user, {p.id: p for p in products}
        )
        await self.nosql.orders.put(denorm_order)
        return order.id

    async def get_order(self, order_id: str) -> DenormalizedOrder:
        """Single read from denormalized store"""
        return await self.nosql.orders.get(order_id)

    async def update_product_price(self, product_id: str, new_price: float):
        """
        When a product price changes, update denormalized copies.
        This is the trade-off of denormalization!
        """
        # Update source of truth
        await self.sql.products.update(product_id, price=new_price)

        # Find and update affected denormalized orders:
        # Option 1: Batch job (eventual consistency)
        # Option 2: Real-time update (more complex)
        await self.schedule_denorm_update(product_id)
JavaScript:

class DenormalizedOrderService {
  constructor(sqlDb, mongoDb) {
    this.sql = sqlDb;
    this.mongo = mongoDb;
  }

  /**
   * Create order with dual-write to normalized and denormalized stores
   */
  async createOrder(userId, cartItems) {
    // 1. Write to normalized tables (source of truth)
    const order = await this.sql.transaction(async (trx) => {
      const [order] = await trx('orders').insert({
        user_id: userId,
        status: 'pending',
        created_at: new Date()
      }).returning('*');

      const orderItems = cartItems.map(item => ({
        order_id: order.id,
        product_id: item.productId,
        quantity: item.quantity
      }));
      await trx('order_items').insert(orderItems);
      return order;
    });

    // 2. Build denormalized document
    const [user, products] = await Promise.all([
      this.sql('users').where('id', userId).first(),
      this.sql('products').whereIn('id', cartItems.map(i => i.productId))
    ]);

    const denormalizedOrder = {
      orderId: order.id,
      userId: user.id,
      userName: user.name,
      userEmail: user.email,
      shippingAddress: user.default_address,
      items: cartItems.map(item => {
        const product = products.find(p => p.id === item.productId);
        return {
          productId: product.id,
          productName: product.name,
          productImage: product.thumbnail_url,
          unitPrice: product.price,
          quantity: item.quantity,
          subtotal: product.price * item.quantity
        };
      }),
      status: 'pending',
      createdAt: order.created_at
    };
    denormalizedOrder.total = denormalizedOrder.items.reduce(
      (sum, item) => sum + item.subtotal, 0
    );

    // 3. Write to denormalized store
    await this.mongo.collection('orders').insertOne(denormalizedOrder);
    return order.id;
  }

  /**
   * Single read from denormalized store - fast!
   */
  async getOrder(orderId) {
    return this.mongo.collection('orders').findOne({ orderId });
  }

  /**
   * When a user updates their name, update all denormalized copies
   */
  async updateUserName(userId, newName) {
    // Update source of truth
    await this.sql('users').where('id', userId).update({ name: newName });

    // Update denormalized copies (eventual consistency)
    await this.mongo.collection('orders').updateMany(
      { userId },
      { $set: { userName: newName } }
    );
  }
}
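The Python service above defers price propagation to a schedule_denorm_update method it never defines. A hypothetical sketch of Option 1, a background worker giving eventual consistency, assuming a Motor-style MongoDB collection whose update_many supports array filters:

import asyncio

class DenormUpdateWorker:
    """Hypothetical worker for Option 1: eventually-consistent price propagation."""

    def __init__(self, sql_db, nosql_db):
        self.sql = sql_db
        self.nosql = nosql_db
        self.queue: asyncio.Queue = asyncio.Queue()

    async def schedule_denorm_update(self, product_id: str) -> None:
        await self.queue.put(product_id)

    async def run(self) -> None:
        while True:
            product_id = await self.queue.get()
            product = await self.sql.products.get(product_id)
            # Rewrite the duplicated price in every order item embedding this product
            await self.nosql.orders.update_many(
                {"items.product_id": product_id},
                {"$set": {"items.$[i].unit_price": product.price}},
                array_filters=[{"i.product_id": product_id}],
            )
            self.queue.task_done()

Note that for orders specifically, many systems deliberately keep the price as charged at purchase time and skip this update entirely; the propagation problem bites hardest in catalog-style denormalized views.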
Event Sourcing
Store all changes as immutable events instead of only the current state.
┌─────────────────────────────────────────────────────────────────┐
│ EVENT SOURCING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Traditional (State-based) Event Sourcing │
│ ───────────────────────── ────────────── │
│ │
│ Account Balance: $500 Events: │
│ (Only current state) 1. AccountCreated(initial: $0) │
│ 2. Deposited(amount: $1000) │
│ 3. Withdrawn(amount: $300) │
│ 4. Withdrawn(amount: $200) │
│ │
│ Current State = Replay Events │
│ $0 + $1000 - $300 - $200 = $500 │
│ │
│ ✓ Benefits: │
│ • Complete audit trail │
│ • Temporal queries ("What was balance on Jan 1?") │
│ • Event replay for debugging │
│ • Easy to add new projections │
│ │
│ ✗ Challenges: │
│ • Schema evolution of events │
│ • Event store scalability │
│ • Eventual consistency complexity │
│ │
└─────────────────────────────────────────────────────────────────┘
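The temporal-query benefit follows directly from replay: rebuild state from only the events at or before a cut-off. A minimal sketch over time-ordered event dicts (field names are illustrative):

from datetime import datetime

def balance_as_of(events: list, cutoff: datetime) -> float:
    """Replay account events up to a point in time."""
    balance = 0.0
    for event in events:  # assumed ordered by timestamp
        if event["timestamp"] > cutoff:
            break
        if event["type"] == "AccountCreated":
            balance = event["initial_balance"]
        elif event["type"] == "MoneyDeposited":
            balance += event["amount"]
        elif event["type"] == "MoneyWithdrawn":
            balance -= event["amount"]
    return balance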
Event Sourcing Implementation
Python:

from abc import abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from uuid import UUID, uuid4
import json


class ConcurrencyError(Exception):
    """Raised when an aggregate was modified concurrently."""


# Base event class
@dataclass
class Event:
    event_id: UUID = field(default_factory=uuid4)
    aggregate_id: Optional[UUID] = None
    timestamp: datetime = field(default_factory=datetime.utcnow)
    version: int = 1

    def to_dict(self) -> Dict:
        return {
            "event_type": self.__class__.__name__,
            "event_id": str(self.event_id),
            "aggregate_id": str(self.aggregate_id),
            "timestamp": self.timestamp.isoformat(),
            "version": self.version,
            "data": self._event_data()
        }

    @abstractmethod
    def _event_data(self) -> Dict:
        pass


# Domain events for a bank account
@dataclass
class AccountCreated(Event):
    owner_name: str = ""
    initial_balance: float = 0.0

    def _event_data(self) -> Dict:
        return {
            "owner_name": self.owner_name,
            "initial_balance": self.initial_balance
        }


@dataclass
class MoneyDeposited(Event):
    amount: float = 0.0
    description: str = ""

    def _event_data(self) -> Dict:
        return {"amount": self.amount, "description": self.description}


@dataclass
class MoneyWithdrawn(Event):
    amount: float = 0.0
    description: str = ""

    def _event_data(self) -> Dict:
        return {"amount": self.amount, "description": self.description}


@dataclass
class AccountClosed(Event):
    reason: str = ""

    def _event_data(self) -> Dict:
        return {"reason": self.reason}


# Aggregate that rebuilds state from events
class BankAccount:
    def __init__(self, account_id: Optional[UUID] = None):
        self.id = account_id or uuid4()
        self.owner_name = ""
        self.balance = 0.0
        self.is_closed = False
        self.version = 0
        self._uncommitted_events: List[Event] = []

    # Command handlers (business logic)
    def create(self, owner_name: str, initial_balance: float = 0.0):
        if initial_balance < 0:
            raise ValueError("Initial balance cannot be negative")
        self._apply_event(AccountCreated(
            aggregate_id=self.id,
            owner_name=owner_name,
            initial_balance=initial_balance
        ))

    def deposit(self, amount: float, description: str = ""):
        if self.is_closed:
            raise ValueError("Cannot deposit to closed account")
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self._apply_event(MoneyDeposited(
            aggregate_id=self.id,
            amount=amount,
            description=description
        ))

    def withdraw(self, amount: float, description: str = ""):
        if self.is_closed:
            raise ValueError("Cannot withdraw from closed account")
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self._apply_event(MoneyWithdrawn(
            aggregate_id=self.id,
            amount=amount,
            description=description
        ))

    def close(self, reason: str = ""):
        if self.is_closed:
            raise ValueError("Account already closed")
        if self.balance != 0:
            raise ValueError("Cannot close account with non-zero balance")
        self._apply_event(AccountClosed(
            aggregate_id=self.id,
            reason=reason
        ))

    # Event handlers (state mutations)
    def _apply_event(self, event: Event, is_replay: bool = False):
        handler = getattr(self, f"_on_{event.__class__.__name__}", None)
        if handler:
            handler(event)
        self.version += 1
        event.version = self.version
        if not is_replay:
            self._uncommitted_events.append(event)

    def _on_AccountCreated(self, event: AccountCreated):
        self.owner_name = event.owner_name
        self.balance = event.initial_balance

    def _on_MoneyDeposited(self, event: MoneyDeposited):
        self.balance += event.amount

    def _on_MoneyWithdrawn(self, event: MoneyWithdrawn):
        self.balance -= event.amount

    def _on_AccountClosed(self, event: AccountClosed):
        self.is_closed = True

    # Rebuild from events
    @classmethod
    def from_events(cls, account_id: UUID, events: List[Event]) -> "BankAccount":
        account = cls(account_id)
        for event in events:
            account._apply_event(event, is_replay=True)
        return account

    # Snapshot support (used by the repository below)
    def to_snapshot(self) -> Dict:
        return {
            "account_id": str(self.id),
            "owner_name": self.owner_name,
            "balance": self.balance,
            "is_closed": self.is_closed,
            "version": self.version
        }

    @classmethod
    def from_snapshot(cls, snapshot: Dict) -> "BankAccount":
        account = cls(UUID(snapshot["account_id"]))
        account.owner_name = snapshot["owner_name"]
        account.balance = snapshot["balance"]
        account.is_closed = snapshot["is_closed"]
        account.version = snapshot["version"]
        return account

    def get_uncommitted_events(self) -> List[Event]:
        events = self._uncommitted_events.copy()
        self._uncommitted_events.clear()
        return events


# Event Store
class EventStore:
    EVENT_TYPES = {
        "AccountCreated": AccountCreated,
        "MoneyDeposited": MoneyDeposited,
        "MoneyWithdrawn": MoneyWithdrawn,
        "AccountClosed": AccountClosed
    }

    def __init__(self, db):
        self.db = db

    async def save_events(self, aggregate_id: UUID, events: List[Event], expected_version: int):
        """Save events with optimistic concurrency control"""
        async with self.db.transaction():
            # Check version for optimistic locking
            current_version = await self.db.fetchval(
                "SELECT MAX(version) FROM events WHERE aggregate_id = $1",
                str(aggregate_id)
            ) or 0
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, but found {current_version}"
                )
            # Save all events
            for event in events:
                await self.db.execute("""
                    INSERT INTO events (
                        event_id, aggregate_id, event_type,
                        data, version, timestamp
                    ) VALUES ($1, $2, $3, $4, $5, $6)
                """,
                    str(event.event_id),
                    str(event.aggregate_id),
                    event.__class__.__name__,
                    json.dumps(event._event_data()),
                    event.version,
                    event.timestamp
                )

    async def get_events(self, aggregate_id: UUID) -> List[Event]:
        """Get all events for an aggregate"""
        rows = await self.db.fetch(
            "SELECT * FROM events WHERE aggregate_id = $1 ORDER BY version",
            str(aggregate_id)
        )
        return [self._deserialize_event(row) for row in rows]

    async def get_events_since(self, aggregate_id: UUID, version: int) -> List[Event]:
        """Get events after a certain version (for snapshotting)"""
        rows = await self.db.fetch(
            "SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version",
            str(aggregate_id), version
        )
        return [self._deserialize_event(row) for row in rows]

    def _deserialize_event(self, row) -> Event:
        """Rebuild a typed event from a stored row"""
        event_cls = self.EVENT_TYPES[row["event_type"]]
        return event_cls(
            event_id=UUID(row["event_id"]),
            aggregate_id=UUID(row["aggregate_id"]),
            timestamp=row["timestamp"],
            version=row["version"],
            **json.loads(row["data"])
        )


# Repository with caching and snapshotting
class BankAccountRepository:
    def __init__(self, event_store: EventStore, snapshot_store=None):
        self.event_store = event_store
        self.snapshot_store = snapshot_store
        self.snapshot_threshold = 100  # Snapshot every 100 events

    async def get(self, account_id: UUID) -> BankAccount:
        """Load account from events (with optional snapshot optimization)"""
        # Try to load from snapshot first
        snapshot = None
        if self.snapshot_store:
            snapshot = await self.snapshot_store.get(account_id)
        if snapshot:
            # Load only the events since the snapshot
            events = await self.event_store.get_events_since(
                account_id, snapshot["version"]
            )
            account = BankAccount.from_snapshot(snapshot)
            for event in events:
                account._apply_event(event, is_replay=True)
        else:
            # Load all events
            events = await self.event_store.get_events(account_id)
            account = BankAccount.from_events(account_id, events)
        return account

    async def save(self, account: BankAccount):
        """Save uncommitted events"""
        events = account.get_uncommitted_events()
        if not events:
            return
        expected_version = account.version - len(events)
        await self.event_store.save_events(account.id, events, expected_version)
        # Create snapshot if needed
        if self.snapshot_store and account.version % self.snapshot_threshold == 0:
            await self.snapshot_store.save(account.to_snapshot())


# Usage example
async def transfer_money(from_id: UUID, to_id: UUID, amount: float, repo: BankAccountRepository):
    """Business operation using event sourcing"""
    # Load accounts from events
    from_account = await repo.get(from_id)
    to_account = await repo.get(to_id)
    # Execute business logic (creates events)
    from_account.withdraw(amount, f"Transfer to {to_id}")
    to_account.deposit(amount, f"Transfer from {from_id}")
    # Save events (not state!)
    await repo.save(from_account)
    await repo.save(to_account)
JavaScript:

const { v4: uuidv4 } = require('uuid');

// Base Event class
class Event {
  constructor(aggregateId) {
    this.eventId = uuidv4();
    this.aggregateId = aggregateId;
    this.timestamp = new Date();
    this.version = 1;
  }

  toJSON() {
    return {
      eventType: this.constructor.name,
      eventId: this.eventId,
      aggregateId: this.aggregateId,
      timestamp: this.timestamp.toISOString(),
      version: this.version,
      data: this.eventData()
    };
  }

  eventData() {
    throw new Error('eventData() must be implemented');
  }
}

// Domain Events
class AccountCreated extends Event {
  constructor(aggregateId, ownerName, initialBalance = 0) {
    super(aggregateId);
    this.ownerName = ownerName;
    this.initialBalance = initialBalance;
  }

  eventData() {
    return {
      ownerName: this.ownerName,
      initialBalance: this.initialBalance
    };
  }
}

class MoneyDeposited extends Event {
  constructor(aggregateId, amount, description = '') {
    super(aggregateId);
    this.amount = amount;
    this.description = description;
  }

  eventData() {
    return { amount: this.amount, description: this.description };
  }
}

class MoneyWithdrawn extends Event {
  constructor(aggregateId, amount, description = '') {
    super(aggregateId);
    this.amount = amount;
    this.description = description;
  }

  eventData() {
    return { amount: this.amount, description: this.description };
  }
}

// Aggregate
class BankAccount {
  constructor(accountId = null) {
    this.id = accountId || uuidv4();
    this.ownerName = '';
    this.balance = 0;
    this.isClosed = false;
    this.version = 0;
    this._uncommittedEvents = [];
  }

  // Commands
  create(ownerName, initialBalance = 0) {
    if (initialBalance < 0) {
      throw new Error('Initial balance cannot be negative');
    }
    this._applyEvent(new AccountCreated(this.id, ownerName, initialBalance));
  }

  deposit(amount, description = '') {
    if (this.isClosed) throw new Error('Account is closed');
    if (amount <= 0) throw new Error('Amount must be positive');
    this._applyEvent(new MoneyDeposited(this.id, amount, description));
  }

  withdraw(amount, description = '') {
    if (this.isClosed) throw new Error('Account is closed');
    if (amount <= 0) throw new Error('Amount must be positive');
    if (amount > this.balance) throw new Error('Insufficient funds');
    this._applyEvent(new MoneyWithdrawn(this.id, amount, description));
  }

  // Event handlers
  _applyEvent(event, isReplay = false) {
    const handler = this[`_on${event.constructor.name}`];
    if (handler) {
      handler.call(this, event);
    }
    this.version++;
    event.version = this.version;
    if (!isReplay) {
      this._uncommittedEvents.push(event);
    }
  }

  _onAccountCreated(event) {
    this.ownerName = event.ownerName;
    this.balance = event.initialBalance;
  }

  _onMoneyDeposited(event) {
    this.balance += event.amount;
  }

  _onMoneyWithdrawn(event) {
    this.balance -= event.amount;
  }

  // Reconstitution
  static fromEvents(accountId, events) {
    const account = new BankAccount(accountId);
    for (const event of events) {
      account._applyEvent(event, true);
    }
    return account;
  }

  getUncommittedEvents() {
    const events = [...this._uncommittedEvents];
    this._uncommittedEvents = [];
    return events;
  }
}

// Event Store
class EventStore {
  constructor(db) {
    this.db = db;
  }

  async saveEvents(aggregateId, events, expectedVersion) {
    const client = await this.db.connect();
    try {
      await client.query('BEGIN');

      // Optimistic concurrency check
      const result = await client.query(
        'SELECT MAX(version) as version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      const currentVersion = result.rows[0]?.version || 0;
      if (currentVersion !== expectedVersion) {
        throw new Error(`Concurrency error: expected ${expectedVersion}, got ${currentVersion}`);
      }

      // Insert events
      for (const event of events) {
        await client.query(`
          INSERT INTO events (event_id, aggregate_id, event_type, data, version, timestamp)
          VALUES ($1, $2, $3, $4, $5, $6)
        `, [
          event.eventId,
          event.aggregateId,
          event.constructor.name,
          JSON.stringify(event.eventData()),
          event.version,
          event.timestamp
        ]);
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId) {
    const result = await this.db.query(
      'SELECT * FROM events WHERE aggregate_id = $1 ORDER BY version',
      [aggregateId]
    );
    return result.rows.map(row => this._deserializeEvent(row));
  }

  _deserializeEvent(row) {
    const EventClass = { AccountCreated, MoneyDeposited, MoneyWithdrawn }[row.event_type];
    // row.data is assumed to be a jsonb column, auto-parsed by node-postgres
    const event = Object.assign(new EventClass(row.aggregate_id), row.data);
    event.eventId = row.event_id;
    event.version = row.version;
    event.timestamp = row.timestamp;
    return event;
  }
}
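Both implementations assume an events table that is never defined. One plausible shape (an assumption, sketched asyncpg-style), with a uniqueness constraint that backs the optimistic concurrency check at the database level:

async def create_events_table(db) -> None:
    await db.execute("""
        CREATE TABLE IF NOT EXISTS events (
            event_id     UUID PRIMARY KEY,
            aggregate_id UUID        NOT NULL,
            event_type   TEXT        NOT NULL,
            data         JSONB       NOT NULL,
            version      INT         NOT NULL,
            timestamp    TIMESTAMPTZ NOT NULL,
            UNIQUE (aggregate_id, version)  -- two writers cannot commit the same version
        )
    """)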
CQRS (Command Query Responsibility Segregation)
Separate the read and write models so each side can be optimized independently.
┌─────────────────────────────────────────────────────────────────┐
│ CQRS PATTERN │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ Client │ │
│ └────────┬────────┘ │
│ │ │
│ ┌──────────────┴──────────────┐ │
│ │ │ │
│ ┌────▼────┐ ┌────▼────┐ │
│ │ Command │ │ Query │ │
│ │ API │ │ API │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ┌────▼─────────┐ ┌────▼─────────┐ │
│ │ Write Model │ │ Read Model │ │
│ │ (Normalized) │ │(Denormalized)│ │
│ └────┬─────────┘ └──────▲───────┘ │
│ │ │ │
│ │ ┌───────────┐ │ │
│ └───────►│ Events │─────────┘ │
│ │ (Sync) │ │
│ └───────────┘ │
│ │
│ Write Side: Read Side: │
│ • Complex validation • Fast queries │
│ • Business rules • Denormalized views │
│ • Single source of truth • Multiple projections │
│ • Consistency • Eventually consistent │
│ │
└─────────────────────────────────────────────────────────────────┘
CQRS Implementation
Python:

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List

# `Order` is assumed to be an event-sourced aggregate (like BankAccount above),
# with OrderCreatedEvent / OrderCancelledEvent / OrderShippedEvent as its domain events.


# Commands (Write Side)
@dataclass
class CreateOrderCommand:
    user_id: str
    items: List[Dict]


@dataclass
class CancelOrderCommand:
    order_id: str
    reason: str


# Command Handlers
class OrderCommandHandler:
    def __init__(self, event_store, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus

    async def handle_create_order(self, command: CreateOrderCommand) -> str:
        # Business logic and validation
        order = Order.create(command.user_id, command.items)
        # Persist events
        events = order.get_uncommitted_events()
        await self.event_store.save(order.id, events)
        # Publish for read model update
        for event in events:
            await self.event_bus.publish(event)
        return order.id

    async def handle_cancel_order(self, command: CancelOrderCommand):
        order = await self.event_store.load(Order, command.order_id)
        order.cancel(command.reason)
        events = order.get_uncommitted_events()
        await self.event_store.save(order.id, events)
        for event in events:
            await self.event_bus.publish(event)


# Queries (Read Side)
@dataclass
class OrderSummary:
    """Denormalized read model optimized for display"""
    order_id: str
    user_name: str
    user_email: str
    items: List[Dict]
    total: float
    status: str
    created_at: str


class OrderQueryHandler:
    def __init__(self, read_db):
        self.db = read_db  # Optimized read database

    async def get_order(self, order_id: str) -> OrderSummary:
        """Single query - no joins needed"""
        return await self.db.orders.find_one({"order_id": order_id})

    async def get_user_orders(self, user_id: str, limit: int = 20) -> List[OrderSummary]:
        """Pre-indexed for fast lookup"""
        return await self.db.orders.find(
            {"user_id": user_id}
        ).sort("created_at", -1).limit(limit).to_list()

    async def get_orders_by_status(self, status: str) -> List[OrderSummary]:
        return await self.db.orders.find({"status": status}).to_list()


# Event handlers that update the read model
class OrderProjection:
    """Builds read model from events"""
    def __init__(self, read_db, user_service):
        self.db = read_db
        self.user_service = user_service

    async def handle_order_created(self, event: OrderCreatedEvent):
        # Denormalize user data
        user = await self.user_service.get(event.user_id)
        # Build read model document
        order_summary = {
            "order_id": event.order_id,
            "user_id": event.user_id,
            "user_name": user.name,
            "user_email": user.email,
            "items": event.items,
            "total": sum(item["price"] * item["quantity"] for item in event.items),
            "status": "pending",
            "created_at": event.timestamp.isoformat()
        }
        await self.db.orders.insert_one(order_summary)

    async def handle_order_cancelled(self, event: OrderCancelledEvent):
        await self.db.orders.update_one(
            {"order_id": event.order_id},
            {"$set": {"status": "cancelled", "cancellation_reason": event.reason}}
        )

    async def handle_order_shipped(self, event: OrderShippedEvent):
        await self.db.orders.update_one(
            {"order_id": event.order_id},
            {"$set": {
                "status": "shipped",
                "tracking_number": event.tracking_number,
                "shipped_at": event.timestamp.isoformat()
            }}
        )


# CQRS Facade
class OrderService:
    """Unified interface for both commands and queries"""
    def __init__(self, command_handler, query_handler):
        self.commands = command_handler
        self.queries = query_handler

    # Commands
    async def create_order(self, user_id: str, items: List[Dict]) -> str:
        return await self.commands.handle_create_order(
            CreateOrderCommand(user_id, items)
        )

    async def cancel_order(self, order_id: str, reason: str):
        await self.commands.handle_cancel_order(
            CancelOrderCommand(order_id, reason)
        )

    # Queries
    async def get_order(self, order_id: str) -> OrderSummary:
        return await self.queries.get_order(order_id)

    async def get_user_orders(self, user_id: str) -> List[OrderSummary]:
        return await self.queries.get_user_orders(user_id)
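How the pieces connect is implied rather than shown above; here is a minimal wiring sketch, assuming a hypothetical event bus exposing subscribe(event_type, handler):

def wire_order_service(event_store, event_bus, read_db, user_service) -> OrderService:
    # Events published by the command handlers drive the read-model projection
    projection = OrderProjection(read_db, user_service)
    event_bus.subscribe("OrderCreatedEvent", projection.handle_order_created)    # hypothetical API
    event_bus.subscribe("OrderCancelledEvent", projection.handle_order_cancelled)
    event_bus.subscribe("OrderShippedEvent", projection.handle_order_shipped)
    return OrderService(
        OrderCommandHandler(event_store, event_bus),
        OrderQueryHandler(read_db),
    )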
Time-Series Data Patterns
┌─────────────────────────────────────────────────────────────────┐
│ TIME-SERIES DATA PATTERNS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Use Cases: │
│ • Metrics and monitoring │
│ • IoT sensor data │
│ • Financial market data │
│ • User activity logs │
│ │
│ Characteristics: │
│ • Append-only (immutable) │
│ • Time-ordered │
│ • High write volume │
│ • Range queries common │
│ • Often needs aggregation │
│ │
│ Schema Design: │
│ ───────────── │
│ │
│ Wide-column approach (Cassandra/TimescaleDB): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Partition Key: (metric_name, time_bucket) │ │
│ │ Clustering: timestamp DESC │ │
│ │ │ │
│ │ cpu_usage:2024-01-15: │ │
│ │ 10:30:00 → 45.2 │ │
│ │ 10:29:00 → 43.1 │ │
│ │ 10:28:00 → 47.8 │ │
│ │ ... │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Downsampling Strategy: │
│ ───────────────────── │
│ Raw data → 1 min avg → 1 hour avg → 1 day avg │
│ Keep: 7 days 30 days 1 year forever │
│ │
└─────────────────────────────────────────────────────────────────┘
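In Cassandra terms, the partition/clustering layout sketched above might be declared like this (a CQL sketch, not from the original; table and column names are illustrative):

# CQL for the wide-column layout: one partition per (metric, day),
# rows clustered newest-first so "recent points" is a single-partition scan.
METRICS_CQL = """
CREATE TABLE metrics (
    metric      text,
    time_bucket date,
    ts          timestamp,
    value       double,
    PRIMARY KEY ((metric, time_bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC);
"""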
Time-Series Implementation
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json


class TimeSeriesStore:
    """
    Time-series storage with automatic downsampling and retention.
    Designed for metrics and monitoring data (TimescaleDB-style time_bucket()).
    """

    # Bucket interval used at each resolution
    BUCKET_INTERVALS = {
        "raw": "1 second",
        "1min": "1 minute",
        "1hour": "1 hour",
        "1day": "1 day"
    }
    ALLOWED_AGGREGATIONS = {"avg", "min", "max", "sum", "count"}

    def __init__(self, db):
        self.db = db
        self.retention_policies = {
            "raw": timedelta(days=7),
            "1min": timedelta(days=30),
            "1hour": timedelta(days=365),
            "1day": None  # Keep forever
        }

    async def write(self, metric: str, value: float, timestamp: Optional[datetime] = None, tags: Optional[Dict] = None):
        """Write a single metric point"""
        timestamp = timestamp or datetime.utcnow()
        tags = tags or {}
        # Calculate time bucket for partitioning
        time_bucket = timestamp.strftime("%Y-%m-%d")
        await self.db.execute("""
            INSERT INTO metrics_raw (metric, time_bucket, timestamp, value, tags)
            VALUES ($1, $2, $3, $4, $5)
        """, metric, time_bucket, timestamp, value, json.dumps(tags))

    async def write_batch(self, points: List[Dict]):
        """Batch write for high throughput"""
        async with self.db.transaction():
            for point in points:
                await self.write(**point)

    async def query(
        self,
        metric: str,
        start: datetime,
        end: datetime,
        resolution: str = "auto",
        aggregation: str = "avg"
    ) -> List[Dict]:
        """
        Query metrics with automatic resolution selection.
        Uses downsampled data for longer time ranges.
        """
        # Whitelist the aggregation since it is interpolated into SQL
        if aggregation not in self.ALLOWED_AGGREGATIONS:
            raise ValueError(f"Unsupported aggregation: {aggregation}")
        time_range = end - start
        # Auto-select resolution based on time range
        if resolution == "auto":
            if time_range <= timedelta(hours=6):
                resolution = "raw"
            elif time_range <= timedelta(days=7):
                resolution = "1min"
            elif time_range <= timedelta(days=90):
                resolution = "1hour"
            else:
                resolution = "1day"
        table = f"metrics_{resolution}"
        interval = self.BUCKET_INTERVALS[resolution]
        return await self.db.fetch(f"""
            SELECT
                time_bucket('{interval}', timestamp) as bucket,
                {aggregation}(value) as value
            FROM {table}
            WHERE metric = $1 AND timestamp BETWEEN $2 AND $3
            GROUP BY bucket
            ORDER BY bucket
        """, metric, start, end)

    async def downsample(self, resolution: str):
        """
        Aggregate raw data into lower resolution.
        Run periodically via cron job.
        """
        resolutions = {
            "1min": ("raw", "1 minute"),
            "1hour": ("1min", "1 hour"),
            "1day": ("1hour", "1 day")
        }
        source, interval = resolutions[resolution]
        source_table = f"metrics_{source}"
        target_table = f"metrics_{resolution}"
        await self.db.execute(f"""
            INSERT INTO {target_table} (metric, timestamp, value_avg, value_min, value_max, count)
            SELECT
                metric,
                time_bucket('{interval}', timestamp) as bucket,
                avg(value),
                min(value),
                max(value),
                count(*)
            FROM {source_table}
            WHERE timestamp < now() - interval '{interval}'
              AND NOT EXISTS (
                  SELECT 1 FROM {target_table} t
                  WHERE t.metric = {source_table}.metric
                    AND t.timestamp = time_bucket('{interval}', {source_table}.timestamp)
              )
            GROUP BY metric, bucket
        """)

    async def cleanup(self):
        """Delete data beyond retention period"""
        for resolution, retention in self.retention_policies.items():
            if retention is None:
                continue
            table = f"metrics_{resolution}"
            cutoff = datetime.utcnow() - retention
            await self.db.execute(f"""
                DELETE FROM {table} WHERE timestamp < $1
            """, cutoff)
Data Modeling Best Practices
1. Model for Queries
❌ Don't: Normalize everything
"I'll just do a 6-table JOIN"
✅ Do: Denormalize for common queries
"This query runs 10,000x/second, so I'll store it ready-to-read"
2. Understand Access Patterns
Before modeling, answer:
• What are the top 5 queries by frequency?
• What's the read/write ratio?
• What consistency is required?
• How will data grow over time?
3. Handle Growth
Today: 1M rows, 1 GB
Year 1: 100M rows, 100 GB
Year 3: 1B rows, 1 TB
Plan for:
• Partitioning strategy
• Archival policy
• Index management
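For example, a common first step toward the partitioning strategy is time-range partitions, so old data can be archived by dropping a partition rather than deleting billions of rows. A PostgreSQL sketch with illustrative table names:

async def create_partitioned_activity_table(db) -> None:
    await db.execute("""
        CREATE TABLE user_activity (
            user_id  BIGINT      NOT NULL,
            occurred TIMESTAMPTZ NOT NULL,
            payload  JSONB
        ) PARTITION BY RANGE (occurred);

        CREATE TABLE user_activity_2024_01 PARTITION OF user_activity
            FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
    """)
    # Archival = detach/drop old partitions instead of running mass DELETEs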
4. Consider Consistency
Strong Consistency (SQL):
• Financial transactions
• Inventory counts
• User authentication
Eventual Consistency (NoSQL):
• Social feeds
• Analytics
• Caching
Interview Tips
When asked about data modeling in interviews:
- Clarify access patterns first - “How will this data be queried?”
- Discuss trade-offs - “Denormalizing gives us speed but costs storage and complexity”
- Consider scale - “At 1M users this works, at 100M we’d need to shard”
- Mention consistency - “For payments, we need strong consistency”