Senior Level: Data pipeline design is common in senior interviews, especially at data-intensive companies. Expect questions about batch vs streaming, exactly-once processing, and handling late data.
Batch vs Stream Processing
┌─────────────────────────────────────────────────────────────────┐
│ Processing Paradigms │
├─────────────────────────────────────────────────────────────────┤
│ │
│ BATCH PROCESSING STREAM PROCESSING │
│ ──────────────── ───────────────── │
│ Process data in chunks Process data as it arrives │
│ Higher throughput Lower latency │
│ Bounded datasets Unbounded streams │
│ Daily/hourly jobs Real-time/near-real-time │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Data Lake │ │ Message Queue │ │
│ │ (S3/HDFS) │ │ (Kafka/Kinesis) │ │
│ │ ┌─────────┐ │ │ │ │ │ │ │ │ │ │
│ │ │ Batch 1 │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ │ │
│ │ ├─────────┤ │ │ Event by Event │ │
│ │ │ Batch 2 │ │ └──────────────────┘ │
│ │ ├─────────┤ │ │ │
│ │ │ Batch 3 │ │ ▼ │
│ │ └─────────┘ │ ┌──────────────────┐ │
│ └──────────────────┘ │ Stream Processor │ │
│ │ │ (Flink/Spark) │ │
│ ▼ └──────────────────┘ │
│ ┌──────────────────┐ │ │
│ │ Batch Processor │ ▼ │
│ │ (Spark/MapReduce)│ ┌──────────────────┐ │
│ └──────────────────┘ │ Real-time Result │ │
│ │ │ (Dashboard/Alert)│ │
│ ▼ └──────────────────┘ │
│ ┌──────────────────┐ │
│ │ Results (Next AM)│ Latency: Seconds │
│ └──────────────────┘ │
│ │
│ Latency: Hours │
│ │
│ USE CASES: USE CASES: │
│ • ML model training • Fraud detection │
│ • Data warehouse ETL • Real-time dashboards │
│ • Historical analysis • Live recommendations │
│ • Report generation • IoT processing │
│ │
└─────────────────────────────────────────────────────────────────┘
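The same aggregation written both ways makes the trade-off concrete. Below is a dependency-free sketch (the sample orders data is made up): the batch version scans a bounded dataset and emits a complete result at the end, while the streaming version emits an updated result per event and never really "finishes".

from collections import defaultdict
from datetime import datetime

orders = [
    {"ts": "2024-01-01T10:05:00", "amount": 20.0},
    {"ts": "2024-01-01T10:40:00", "amount": 35.0},
    {"ts": "2024-01-01T11:10:00", "amount": 15.0},
]

# BATCH: process the whole bounded dataset, emit results once at the end
def batch_revenue_per_hour(orders):
    totals = defaultdict(float)
    for o in orders:                      # full scan of the dataset
        hour = datetime.fromisoformat(o["ts"]).replace(minute=0, second=0)
        totals[hour] += o["amount"]
    return dict(totals)                   # complete, but only available after the run

# STREAM: process events one by one; the result is always current, never "final"
def stream_revenue_per_hour(order_stream):
    totals = defaultdict(float)
    for o in order_stream:                # unbounded in principle
        hour = datetime.fromisoformat(o["ts"]).replace(minute=0, second=0)
        totals[hour] += o["amount"]
        yield hour, totals[hour]          # emit an updated result per event

print(batch_revenue_per_hour(orders))
for hour, running_total in stream_revenue_per_hour(iter(orders)):
    print(hour, running_total)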
Lambda Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Lambda Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────┐ │
│ │ Data Source │ │
│ │ (Events/Logs/etc) │ │
│ └────────────┬────────────────────┘ │
│ │ │
│ ┌──────────────────┴──────────────────┐ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ BATCH LAYER │ │ SPEED LAYER │ │
│ │──────────────│ │──────────────│ │
│ │ │ │ │ │
│ │ Data Lake │ │ Stream │ │
│ │ ▼ │ │ Processor │ │
│ │ Batch │ │ ▼ │ │
│ │ Processing │ │ Real-time │ │
│ │ ▼ │ │ Views │ │
│ │ Batch Views │ │ │ │
│ │ │ │ │ │
│ │ (Complete, │ │ (Recent, │ │
│ │ Accurate) │ │ Approximate)│ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ └──────────────┬────────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ SERVING LAYER │ │
│ │─────────────────│ │
│ │ │ │
│ │ Merge Results │ │
│ │ Batch + Speed │ │
│ │ │ │
│ │ Query Interface │ │
│ └─────────────────┘ │
│ │
│ PROS: CONS: │
│ • Best of both worlds • Two systems to maintain │
│ • Accurate + Fast • Code duplication │
│ • Recompute from scratch • Complex operations │
│ │
└─────────────────────────────────────────────────────────────────┘
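In practice the serving layer is often just a merge at query time. The sketch below is a minimal illustration with plain dicts standing in for the batch and real-time views; the function name and data are hypothetical.

def total_pageviews(page, batch_view, speed_view):
    """Merge the batch layer's complete-but-stale count with the speed
    layer's recent increments for a single page."""
    return batch_view.get(page, 0) + speed_view.get(page, 0)

# Batch view: recomputed from the full history (accurate, hours old)
batch_view = {"/home": 10_000, "/pricing": 2_500}
# Speed view: incremental counts for events since the last batch run
speed_view = {"/home": 42, "/signup": 7}

print(total_pageviews("/home", batch_view, speed_view))    # 10042
print(total_pageviews("/signup", batch_view, speed_view))  # 7 (only the speed layer has it yet)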
Kappa Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Kappa Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────┐ │
│ │ Data Source │ │
│ └────────────┬────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Immutable Log (Kafka) │ │
│ │ ───────────────────── │ │
│ │ [E1][E2][E3][E4][E5]... │ │
│ │ Retain all events │ │
│ │ (or use compaction) │ │
│ └────────────┬────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Stream Processing Layer │ │
│ │ ──────────────────────── │ │
│ │ │ │
│ │ Single processing path │ │
│ │ (Flink, Kafka Streams) │ │
│ │ │ │
│ │ For reprocessing: │ │
│ │ Start new consumer from │ │
│ │ beginning of log │ │
│ │ │ │
│ └────────────┬────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Serving Layer │ │
│ │ (DB/Cache/Index) │ │
│ └─────────────────────────────────┘ │
│ │
│ PROS: CONS: │
│ • Simpler (one system) • Need log retention │
│ • No code duplication • Reprocessing can be slow │
│ • Single source of truth • Not ideal for all workloads │
│ │
│ KEY INSIGHT: │
│ "Batch processing is just stream processing where the │
│ stream happens to be bounded" │
│ │
└─────────────────────────────────────────────────────────────────┘
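The sketch below illustrates the Kappa idea with an in-memory list standing in for a fully retained Kafka topic: one processing path serves both live traffic and reprocessing, and a new job version simply replays the log from offset 0 into a fresh serving table before swapping it in. All names and data are illustrative.

log = [
    {"offset": 0, "user": "a", "amount": 10},
    {"offset": 1, "user": "b", "amount": 5},
    {"offset": 2, "user": "a", "amount": 7},
]

def run_stream_job(events, transform):
    """One processing path, used for both live traffic and reprocessing."""
    table = {}
    for event in events:
        key, value = transform(event)
        table[key] = table.get(key, 0) + value
    return table

# v1 of the job counted orders; v2 sums revenue. To "reprocess", start a new
# consumer at the beginning of the log and rebuild the serving table.
serving_v1 = run_stream_job(log, lambda e: (e["user"], 1))
serving_v2 = run_stream_job(log, lambda e: (e["user"], e["amount"]))

print(serving_v1)   # {'a': 2, 'b': 1}
print(serving_v2)   # {'a': 17, 'b': 5}  <- swap this in once it catches up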
Data Pipeline Components
Message Queues
┌─────────────────────────────────────────────────────────────────┐
│ Message Queue Comparison │
├─────────────────────────────────────────────────────────────────┤
│ │
│ KAFKA │
│ ───── │
│ • Log-based, persistent │
│ • Replay from any offset │
│ • Partitioned for parallelism │
│ • Consumer groups for fan-out │
│ • Best for: Event sourcing, data pipelines │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Topic: orders │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ Partition 0│ │ Partition 1│ │ Partition 2│ │ │
│ │ │ [0][1][2] │ │ [0][1] │ │ [0][1][2][3]│ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ │ │ │
│ │ Consumer Group A: │ │
│ │ C1 ← P0, P1 C2 ← P2 │ │
│ │ │ │
│ │ Consumer Group B (independent): │ │
│ │ C3 ← All partitions │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ RABBITMQ │
│ ──────── │
│ • Traditional message broker │
│ • ACK/NACK, dead letter queues │
│ • Flexible routing (exchanges) │
│ • Best for: Task queues, RPC │
│ │
│ SQS (AWS) │
│ ───────── │
│ • Fully managed, serverless │
│ • Standard (at-least-once) or FIFO (exactly-once) │
│ • Best for: Decoupling AWS services │
│ │
└─────────────────────────────────────────────────────────────────┘
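Partitioning is what makes Kafka's ordering guarantee per-key rather than global. A minimal sketch of the idea (Kafka's default partitioner actually uses murmur2 hashing; Python's hash() here is only for illustration):

NUM_PARTITIONS = 3
partitions = {p: [] for p in range(NUM_PARTITIONS)}

def partition_for(key: str) -> int:
    # same key -> same partition -> events for that key stay in order
    return hash(key) % NUM_PARTITIONS

for event in [
    {"key": "user-1", "type": "cart_add"},
    {"key": "user-2", "type": "cart_add"},
    {"key": "user-1", "type": "checkout"},   # lands after user-1's cart_add
]:
    partitions[partition_for(event["key"])].append(event)

for p, msgs in partitions.items():
    print(p, [m["type"] for m in msgs])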
Stream Processing
# Apache Kafka Streams (conceptual; the real Streams DSL is Java/Scala,
# this Python-flavored pseudocode just illustrates the topology)
class StreamProcessor:
    """
    Example: Real-time order analytics
    """
    def process(self):
        # Define topology
        builder = StreamsBuilder()

        # Source: Read from orders topic
        orders = builder.stream("orders")

        # Transform: Parse and enrich
        enriched_orders = orders \
            .map(lambda key, value: (
                value.user_id,
                {
                    **value,
                    "processed_at": datetime.now(),
                    "amount_usd": convert_currency(value.amount)
                }
            ))

        # Aggregate: Count orders per user per hour
        hourly_counts = enriched_orders \
            .group_by_key() \
            .windowed_by(TimeWindows.of(Duration.of_hours(1))) \
            .count()

        # Join: Enrich with user data
        users = builder.table("users")
        enriched_with_user = enriched_orders.join(
            users,
            lambda order, user: {
                **order,
                "user_name": user.name,
                "user_tier": user.tier
            }
        )

        # Filter: Detect high-value orders and route them to their own topic
        enriched_orders \
            .filter(lambda key, value: value.amount_usd > 1000) \
            .to("high-value-orders")

        # Sink: Write to output topics
        hourly_counts.to("order-counts-hourly")
        enriched_with_user.to("enriched-orders")
Handling Late Data (Windowing)
┌─────────────────────────────────────────────────────────────────┐
│ Time Windows & Late Data │
├─────────────────────────────────────────────────────────────────┤
│ │
│ EVENT TIME vs PROCESSING TIME │
│ ───────────────────────────── │
│ │
│ Event Time: When event actually occurred │
│ Processing Time: When event is processed │
│ │
│ Event: {type: "click", timestamp: "10:00:00"} │
│ Arrives at processor: 10:00:05 (5s late due to network) │
│ │
│ WINDOW TYPES: │
│ │
│ Tumbling Window (Fixed, non-overlapping): │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ 10:00- │ │ 10:05- │ │ 10:10- │ │
│ │ 10:05 │ │ 10:10 │ │ 10:15 │ │
│ └────────┘ └────────┘ └────────┘ │
│ |← 5 min →| │
│ │
│ Sliding Window (Overlapping): │
│ ┌──────────────────┐ │
│ │ 10:00-10:05 │ │
│ └──────────────────┘ │
│ ┌──────────────────┐ │
│ │ 10:02-10:07 │ │
│ └──────────────────┘ │
│ ┌──────────────────┐ │
│ │ 10:04-10:09 │ │
│ └──────────────────┘ │
│ │
│ Session Window (Activity-based): │
│ ┌───────────────┐ ┌────────────────────────┐ │
│ │ Session 1 │ │ Session 2 │ │
│ │ [e1][e2][e3] │ │ [e4][e5]...[eN] │ │
│ └───────────────┘ └────────────────────────┘ │
│ ↑ gap ↑ │
│ (timeout) │
│ │
└─────────────────────────────────────────────────────────────────┘
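Window assignment is just arithmetic on the event timestamp. A small sketch, using epoch seconds to keep the math obvious (window sizes are arbitrary):

def tumbling_window(ts: int, size: int) -> tuple:
    # fixed, non-overlapping: every timestamp falls in exactly one window
    start = (ts // size) * size
    return (start, start + size)

def sliding_windows(ts: int, size: int, slide: int) -> list:
    # overlapping: a timestamp can fall in several windows
    windows = []
    first_start = ((ts - size) // slide + 1) * slide   # earliest window that can contain ts
    for start in range(max(first_start, 0), ts + 1, slide):
        if start <= ts < start + size:
            windows.append((start, start + size))
    return windows

event_ts = 125  # e.g. 00:02:05 expressed in seconds
print(tumbling_window(event_ts, size=300))             # one 5-minute window
print(sliding_windows(event_ts, size=300, slide=120))  # several overlapping windows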
Watermarks for Late Data
┌─────────────────────────────────────────────────────────────────┐
│ Watermarks │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Watermark = "I believe all events before this time have │
│ arrived" │
│ │
│ Event Time: 10:00 10:01 10:02 10:03 10:04 │
│ │ │ │ │ │ │
│ Events: [A] [B] [C] [D] │
│ ↑ │
│ Watermark = 10:02 │
│ "All events ≤10:02 have arrived" │
│ │
│ Late Event: [E at 10:01.5] │
│ ↑ │
│ Arrives after watermark! │
│ │
│ HANDLING LATE DATA: │
│ ────────────────── │
│ │
│ 1. DROP: Ignore late events (simple, may lose data) │
│ │
│ 2. ALLOWED LATENESS: Accept events within grace period │
│ window.allowed_lateness(Duration.of_minutes(5)) │
│ Events up to 5 min late can still update window │
│ │
│ 3. SIDE OUTPUT: Route late events to separate stream │
│ late_events → special processing or storage │
│ │
│ 4. REPROCESSING: Store in append-only log, recompute later │
│ │
└─────────────────────────────────────────────────────────────────┘
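A common watermark strategy is "bounded out-of-orderness": the watermark trails the maximum event time seen so far by a fixed delay. A minimal sketch that combines this with allowed lateness and a side output (class name and thresholds are illustrative):

from datetime import datetime, timedelta

class WatermarkTracker:
    """Watermark = max event time seen so far minus a fixed delay."""

    def __init__(self, max_out_of_orderness: timedelta, allowed_lateness: timedelta):
        self.max_event_time = None
        self.max_out_of_orderness = max_out_of_orderness
        self.allowed_lateness = allowed_lateness

    def observe(self, event_time: datetime) -> str:
        if self.max_event_time is None:
            self.max_event_time = event_time
            return "on_time"
        watermark = self.max_event_time - self.max_out_of_orderness
        self.max_event_time = max(self.max_event_time, event_time)
        if event_time >= watermark:
            return "on_time"
        if event_time >= watermark - self.allowed_lateness:
            return "late_but_accepted"   # within allowed lateness: window still updates
        return "side_output"             # too late: route to a late-events stream

tracker = WatermarkTracker(max_out_of_orderness=timedelta(seconds=5),
                           allowed_lateness=timedelta(minutes=5))
base = datetime(2024, 1, 1, 10, 0, 0)
for offset_s in [0, 10, 20, 8, -400]:    # event times relative to 10:00:00
    print(offset_s, tracker.observe(base + timedelta(seconds=offset_s)))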
Exactly-Once Processing
┌─────────────────────────────────────────────────────────────────┐
│ Delivery Semantics │
├─────────────────────────────────────────────────────────────────┤
│ │
│ AT-MOST-ONCE │
│ ───────────── │
│ • Fire and forget │
│ • May lose messages │
│ • Simplest, fastest │
│ • Use: Metrics, logs where some loss OK │
│ │
│ AT-LEAST-ONCE │
│ ───────────── │
│ • Retry until acknowledged │
│ • May have duplicates │
│ • Consumer must be idempotent │
│ • Use: Most applications (with dedup) │
│ │
│ EXACTLY-ONCE │
│ ─────────── │
│ • Each message processed exactly once │
│ • Complex to achieve │
│ • Use: Financial, critical data │
│ │
│ HOW TO ACHIEVE EXACTLY-ONCE: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. IDEMPOTENT CONSUMER │ │
│ │ • Assign unique ID to each message │ │
│ │ • Check if already processed before processing │ │
│ │ │ │
│ │ 2. TRANSACTIONAL PROCESSING │ │
│ │ • Read → Process → Write in single transaction │ │
│ │ • Kafka supports transactional producers │ │
│ │ │ │
│ │ 3. TWO-PHASE COMMIT (Distributed) │ │
│ │ • Prepare phase: Lock resources │ │
│ │ • Commit phase: Apply changes │ │
│ │ • Expensive, but guarantees │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
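For the Kafka-to-Kafka case, the transactional read-process-write pattern commits the output records and the consumed offsets atomically. A sketch assuming the confluent-kafka Python client (topic names, group id, and the process() stand-in are illustrative):

from confluent_kafka import Consumer, Producer, TopicPartition

def process(raw: bytes) -> bytes:
    return raw  # stand-in for real enrichment logic

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-enricher",
    "enable.auto.commit": False,          # offsets commit via the transaction instead
    "isolation.level": "read_committed",  # skip records from aborted transactions
})
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-enricher-1",  # stable id per processor instance
})

consumer.subscribe(["orders"])
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    try:
        producer.produce("enriched-orders", process(msg.value()))
        # Commit the consumed offset as part of the same transaction
        producer.send_offsets_to_transaction(
            [TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)],
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()  # neither the output nor the offset becomes visible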
Idempotent Consumer Implementation
class IdempotentConsumer:
    """
    Ensures exactly-once processing using deduplication
    """
    def __init__(self, redis_client, db_session):
        self.redis = redis_client
        self.db = db_session
        self.dedup_ttl = 86400  # 24 hours

    def process_message(self, message):
        message_id = message.id

        # Check if already processed
        if self._is_duplicate(message_id):
            logger.info(f"Skipping duplicate: {message_id}")
            message.ack()  # acknowledge so the duplicate is not redelivered
            return

        try:
            # Process and mark as processed in a single DB transaction
            with self.db.begin():
                result = self._do_processing(message)
                self._mark_processed(message_id)
            # Acknowledge message (only after the commit)
            message.ack()
        except Exception as e:
            # Don't mark as processed - the message will be retried
            logger.error(f"Processing failed: {e}")
            message.nack()
            raise

    def _is_duplicate(self, message_id: str) -> bool:
        """Check Redis for recently processed messages"""
        return self.redis.exists(f"processed:{message_id}")

    def _mark_processed(self, message_id: str):
        """Mark message as processed"""
        # Redis for fast lookups (best-effort; not covered by the DB transaction)
        self.redis.setex(f"processed:{message_id}", self.dedup_ttl, "1")
        # Also persist to DB for durability (inside the caller's transaction)
        self.db.execute(
            "INSERT INTO processed_messages (id, processed_at) VALUES (?, ?)",
            (message_id, datetime.now())
        )
Stream Processing Implementation
Python implementation:
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Any, List, Callable, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import json

# ============== Kafka Consumer with Backpressure ==============
class StreamProcessor:
    """Production-oriented stream processor with backpressure handling"""

    def __init__(
        self,
        kafka_config: Dict[str, Any],
        topic: str,
        group_id: str,
        max_concurrent: int = 100,
        batch_size: int = 100,
        batch_timeout_ms: int = 1000
    ):
        self.kafka_config = kafka_config
        self.topic = topic
        self.group_id = group_id
        self.max_concurrent = max_concurrent
        self.batch_size = batch_size
        self.batch_timeout_ms = batch_timeout_ms
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.handlers: List[Callable] = []
        self.running = False

    def add_handler(self, handler: Callable) -> None:
        """Add processing handler"""
        self.handlers.append(handler)

    async def start(self) -> None:
        """Start consuming and processing messages"""
        from aiokafka import AIOKafkaConsumer

        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            group_id=self.group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            max_poll_records=self.batch_size
        )
        await self.consumer.start()
        self.running = True
        try:
            async for message in self.consumer:
                # Backpressure: block the consume loop while max_concurrent
                # messages are already in flight
                await self.semaphore.acquire()
                asyncio.create_task(self._process_with_backpressure(message))
        finally:
            await self.consumer.stop()

    async def _process_with_backpressure(self, message) -> None:
        """Process one message, releasing its concurrency slot when done"""
        try:
            event = json.loads(message.value.decode())
            # Run all handlers
            for handler in self.handlers:
                await handler(event)
            # Commit offset after successful processing
            # (with concurrent tasks commits can happen out of order; a
            # production system would track per-partition offsets first)
            await self.consumer.commit()
        except Exception as e:
            await self._handle_error(message, e)
        finally:
            self.semaphore.release()

    async def _handle_error(self, message, error: Exception) -> None:
        """Send to dead letter queue on failure"""
        from aiokafka import AIOKafkaProducer

        dlq_message = {
            "original_topic": self.topic,
            "original_message": message.value.decode(),
            "error": str(error),
            "timestamp": datetime.utcnow().isoformat(),
            "partition": message.partition,
            "offset": message.offset
        }
        producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_config['bootstrap_servers']
        )
        await producer.start()
        try:
            await producer.send_and_wait(
                f"{self.topic}.dlq",
                json.dumps(dlq_message).encode()
            )
        finally:
            await producer.stop()
# ============== Windowed Aggregations ==============
@dataclass
class WindowedAggregator:
    """Tumbling window aggregation"""
    window_size: timedelta
    aggregations: Dict[datetime, Dict[str, Any]] = field(default_factory=dict)
    watermark_delay: timedelta = field(default_factory=lambda: timedelta(seconds=30))

    def add_event(self, event: Dict[str, Any], timestamp: datetime) -> None:
        """Add event to appropriate window"""
        window_key = self._get_window_key(timestamp)
        if window_key not in self.aggregations:
            self.aggregations[window_key] = {
                "count": 0,
                "sum": 0.0,
                "min": float('inf'),
                "max": float('-inf'),
                "window_start": window_key,
                "window_end": window_key + self.window_size
            }
        agg = self.aggregations[window_key]
        value = event.get("value", 0)
        agg["count"] += 1
        agg["sum"] += value
        agg["min"] = min(agg["min"], value)
        agg["max"] = max(agg["max"], value)

    def _get_window_key(self, timestamp: datetime) -> datetime:
        """Get window start time for timestamp"""
        window_seconds = self.window_size.total_seconds()
        ts_seconds = timestamp.timestamp()
        window_start = (ts_seconds // window_seconds) * window_seconds
        return datetime.fromtimestamp(window_start)

    def get_closed_windows(self, watermark: datetime) -> List[Dict[str, Any]]:
        """Get windows that are past the watermark"""
        closed = []
        cutoff = watermark - self.watermark_delay
        for window_key, agg in list(self.aggregations.items()):
            if agg["window_end"] < cutoff:
                agg["avg"] = agg["sum"] / agg["count"] if agg["count"] > 0 else 0
                closed.append(agg)
                del self.aggregations[window_key]
        return closed
# ============== Exactly-Once Processing ==============
class ExactlyOnceProcessor:
    """
    Exactly-once semantics with transactional processing
    """
    def __init__(self, db_pool, redis_client):
        self.db_pool = db_pool
        self.redis = redis_client
        self.dedup_ttl = 86400 * 7  # 7 days

    async def process(
        self,
        event: Dict[str, Any],
        processor: Callable
    ) -> Optional[Any]:
        """Process event with exactly-once guarantee"""
        event_id = event.get("event_id")
        if not event_id:
            raise ValueError("Event must have event_id for exactly-once")

        # Check if already processed
        if await self._is_duplicate(event_id):
            return None

        # Acquire lock for this event
        lock_key = f"lock:event:{event_id}"
        lock_acquired = await self.redis.set(
            lock_key, "1",
            ex=60,  # Lock timeout
            nx=True
        )
        if not lock_acquired:
            return None  # Another instance is processing

        try:
            async with self.db_pool.acquire() as conn:
                async with conn.transaction():
                    # Process the event
                    result = await processor(event, conn)
                    # Mark as processed in same transaction
                    await conn.execute("""
                        INSERT INTO processed_events (event_id, processed_at)
                        VALUES ($1, $2)
                        ON CONFLICT (event_id) DO NOTHING
                    """, event_id, datetime.utcnow())
            # Also cache in Redis for fast lookups (after the commit)
            await self.redis.setex(
                f"processed:{event_id}",
                self.dedup_ttl,
                "1"
            )
            return result
        finally:
            await self.redis.delete(lock_key)

    async def _is_duplicate(self, event_id: str) -> bool:
        """Check Redis first, then DB"""
        # Fast path: check Redis
        if await self.redis.exists(f"processed:{event_id}"):
            return True
        # Slow path: check database
        async with self.db_pool.acquire() as conn:
            result = await conn.fetchval("""
                SELECT 1 FROM processed_events
                WHERE event_id = $1
            """, event_id)
            if result:
                # Backfill Redis cache
                await self.redis.setex(
                    f"processed:{event_id}",
                    self.dedup_ttl,
                    "1"
                )
                return True
        return False
# ============== Stream Join Implementation ==============
class StreamJoiner:
    """Join two streams within a time window"""

    def __init__(
        self,
        window_size: timedelta,
        left_key: str,
        right_key: str
    ):
        self.window_size = window_size
        self.left_key = left_key
        self.right_key = right_key
        self.left_buffer: Dict[str, List[Dict]] = defaultdict(list)
        self.right_buffer: Dict[str, List[Dict]] = defaultdict(list)

    def add_left(self, event: Dict[str, Any]) -> List[Dict]:
        """Add event from left stream, return any matches"""
        key = event.get(self.left_key)
        timestamp = datetime.fromisoformat(event["timestamp"])
        # Clean old events
        self._cleanup_buffer(self.left_buffer, timestamp)
        self._cleanup_buffer(self.right_buffer, timestamp)
        # Store in buffer
        self.left_buffer[key].append({
            "event": event,
            "timestamp": timestamp
        })
        # Find matches in right buffer
        matches = []
        for right_event in self.right_buffer.get(key, []):
            if abs((right_event["timestamp"] - timestamp).total_seconds()) < \
                    self.window_size.total_seconds():
                matches.append({
                    "left": event,
                    "right": right_event["event"],
                    "join_timestamp": timestamp.isoformat()
                })
        return matches

    def add_right(self, event: Dict[str, Any]) -> List[Dict]:
        """Add event from right stream, return any matches"""
        key = event.get(self.right_key)
        timestamp = datetime.fromisoformat(event["timestamp"])
        # Clean old events
        self._cleanup_buffer(self.left_buffer, timestamp)
        self._cleanup_buffer(self.right_buffer, timestamp)
        # Store in buffer
        self.right_buffer[key].append({
            "event": event,
            "timestamp": timestamp
        })
        # Find matches in left buffer
        matches = []
        for left_event in self.left_buffer.get(key, []):
            if abs((left_event["timestamp"] - timestamp).total_seconds()) < \
                    self.window_size.total_seconds():
                matches.append({
                    "left": left_event["event"],
                    "right": event,
                    "join_timestamp": timestamp.isoformat()
                })
        return matches

    def _cleanup_buffer(
        self,
        buffer: Dict[str, List[Dict]],
        current_time: datetime
    ) -> None:
        """Remove events outside the window"""
        cutoff = current_time - self.window_size
        for key in list(buffer.keys()):
            buffer[key] = [
                e for e in buffer[key]
                if e["timestamp"] > cutoff
            ]
            if not buffer[key]:
                del buffer[key]
JavaScript implementation:
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');
// ============== Stream Processor with Backpressure ==============
class StreamProcessor {
constructor(config) {
this.kafka = new Kafka(config.kafka);
this.topic = config.topic;
this.groupId = config.groupId;
this.maxConcurrent = config.maxConcurrent || 100;
this.handlers = [];
this.activeProcessing = 0;
this.running = false;
}
addHandler(handler) {
this.handlers.push(handler);
}
async start() {
this.consumer = this.kafka.consumer({ groupId: this.groupId });
await this.consumer.connect();
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true });
this.running = true;
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// Backpressure: wait if too many concurrent
while (this.activeProcessing >= this.maxConcurrent) {
await this.sleep(10);
}
this.activeProcessing++;
try {
const event = JSON.parse(message.value.toString());
for (const handler of this.handlers) {
await handler(event);
}
} catch (error) {
await this.handleError(message, error);
} finally {
this.activeProcessing--;
}
}
});
}
async handleError(message, error) {
const producer = this.kafka.producer();
await producer.connect();
const dlqMessage = {
originalTopic: this.topic,
originalMessage: message.value.toString(),
error: error.message,
timestamp: new Date().toISOString(),
partition: message.partition,
offset: message.offset
};
await producer.send({
topic: `${this.topic}.dlq`,
messages: [{ value: JSON.stringify(dlqMessage) }]
});
await producer.disconnect();
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async stop() {
this.running = false;
await this.consumer.disconnect();
}
}
// ============== Windowed Aggregator ==============
class WindowedAggregator {
constructor(windowSizeMs, watermarkDelayMs = 30000) {
this.windowSizeMs = windowSizeMs;
this.watermarkDelayMs = watermarkDelayMs;
this.windows = new Map();
}
addEvent(event, timestamp = new Date()) {
const windowKey = this.getWindowKey(timestamp);
if (!this.windows.has(windowKey)) {
this.windows.set(windowKey, {
count: 0,
sum: 0,
min: Infinity,
max: -Infinity,
windowStart: new Date(windowKey),
windowEnd: new Date(windowKey + this.windowSizeMs)
});
}
const agg = this.windows.get(windowKey);
const value = event.value || 0;
agg.count++;
agg.sum += value;
agg.min = Math.min(agg.min, value);
agg.max = Math.max(agg.max, value);
}
getWindowKey(timestamp) {
const ts = timestamp.getTime();
return Math.floor(ts / this.windowSizeMs) * this.windowSizeMs;
}
getClosedWindows(watermark = new Date()) {
const closed = [];
const cutoff = watermark.getTime() - this.watermarkDelayMs;
for (const [windowKey, agg] of this.windows) {
if (agg.windowEnd.getTime() < cutoff) {
agg.avg = agg.count > 0 ? agg.sum / agg.count : 0;
closed.push(agg);
this.windows.delete(windowKey);
}
}
return closed;
}
}
// ============== Exactly-Once Processor ==============
class ExactlyOnceProcessor {
constructor(pool, redis) {
this.pool = pool;
this.redis = redis;
this.dedupTtl = 86400 * 7; // 7 days
}
async process(event, processor) {
const eventId = event.eventId;
if (!eventId) {
throw new Error('Event must have eventId for exactly-once');
}
// Check if already processed
if (await this.isDuplicate(eventId)) {
return null;
}
// Acquire distributed lock
const lockKey = `lock:event:${eventId}`;
const lockAcquired = await this.redis.set(
lockKey, '1',
'EX', 60,
'NX'
);
if (!lockAcquired) {
return null; // Another instance is processing
}
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Process the event
const result = await processor(event, client);
// Mark as processed in same transaction
await client.query(`
INSERT INTO processed_events (event_id, processed_at)
VALUES ($1, $2)
ON CONFLICT (event_id) DO NOTHING
`, [eventId, new Date()]);
await client.query('COMMIT');
// Cache in Redis for fast lookups
await this.redis.setex(
`processed:${eventId}`,
this.dedupTtl,
'1'
);
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
await this.redis.del(lockKey);
}
}
async isDuplicate(eventId) {
// Fast path: check Redis
if (await this.redis.exists(`processed:${eventId}`)) {
return true;
}
// Slow path: check database
const client = await this.pool.connect();
try {
const result = await client.query(`
SELECT 1 FROM processed_events WHERE event_id = $1
`, [eventId]);
if (result.rows.length > 0) {
// Backfill Redis cache
await this.redis.setex(
`processed:${eventId}`,
this.dedupTtl,
'1'
);
return true;
}
} finally {
client.release();
}
return false;
}
}
// ============== Stream Join ==============
class StreamJoiner {
constructor(windowSizeMs, leftKey, rightKey) {
this.windowSizeMs = windowSizeMs;
this.leftKey = leftKey;
this.rightKey = rightKey;
this.leftBuffer = new Map();
this.rightBuffer = new Map();
}
addLeft(event) {
const key = event[this.leftKey];
const timestamp = new Date(event.timestamp);
this.cleanup(timestamp);
if (!this.leftBuffer.has(key)) {
this.leftBuffer.set(key, []);
}
this.leftBuffer.get(key).push({ event, timestamp });
// Find matches in right buffer
const matches = [];
const rightEvents = this.rightBuffer.get(key) || [];
for (const rightEntry of rightEvents) {
if (Math.abs(rightEntry.timestamp - timestamp) < this.windowSizeMs) {
matches.push({
left: event,
right: rightEntry.event,
joinTimestamp: timestamp.toISOString()
});
}
}
return matches;
}
addRight(event) {
const key = event[this.rightKey];
const timestamp = new Date(event.timestamp);
this.cleanup(timestamp);
if (!this.rightBuffer.has(key)) {
this.rightBuffer.set(key, []);
}
this.rightBuffer.get(key).push({ event, timestamp });
// Find matches in left buffer
const matches = [];
const leftEvents = this.leftBuffer.get(key) || [];
for (const leftEntry of leftEvents) {
if (Math.abs(leftEntry.timestamp - timestamp) < this.windowSizeMs) {
matches.push({
left: leftEntry.event,
right: event,
joinTimestamp: timestamp.toISOString()
});
}
}
return matches;
}
cleanup(currentTime) {
const cutoff = currentTime.getTime() - this.windowSizeMs;
for (const [key, events] of this.leftBuffer) {
const filtered = events.filter(e => e.timestamp.getTime() > cutoff);
if (filtered.length === 0) {
this.leftBuffer.delete(key);
} else {
this.leftBuffer.set(key, filtered);
}
}
for (const [key, events] of this.rightBuffer) {
const filtered = events.filter(e => e.timestamp.getTime() > cutoff);
if (filtered.length === 0) {
this.rightBuffer.delete(key);
} else {
this.rightBuffer.set(key, filtered);
}
}
}
}
module.exports = {
StreamProcessor,
WindowedAggregator,
ExactlyOnceProcessor,
StreamJoiner
};
ETL vs ELT
┌─────────────────────────────────────────────────────────────────┐
│ ETL vs ELT │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ETL (Extract-Transform-Load) │
│ ──────────────────────────── │
│ │
│ Source → Extract → Transform → Load → Warehouse │
│ ↑ │
│ Processing server │
│ │
│ • Transform BEFORE loading │
│ • Need powerful ETL server │
│ • Less storage in warehouse │
│ • Harder to re-transform │
│ • Tools: Informatica, Talend, SSIS │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ ELT (Extract-Load-Transform) │
│ ──────────────────────────── │
│ │
│ Source → Extract → Load → Transform → Results │
│ ↑ ↑ │
│ Raw storage Warehouse does transform │
│ │
│ • Load raw data first │
│ • Transform using warehouse compute │
│ • Keep raw data (can re-transform) │
│ • Modern approach (cloud warehouses) │
│ • Tools: dbt, Fivetran + Snowflake/BigQuery │
│ │
│ MODERN DATA STACK: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Sources │ → │ Fivetran│ → │Snowflake│ → │ dbt │ │
│ │ (APIs, │ │ (Extract│ │ (Load) │ │(Transform│ │
│ │ DBs) │ │ + Load)│ │ │ │ in SQL) │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Looker/ │ │
│ │ Tableau │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
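A scaled-down ELT sketch, with sqlite3 standing in for the warehouse (it assumes a SQLite build with the JSON1 functions): raw payloads are landed untouched, and the transform is plain SQL over the raw layer, so it can be re-run at any time.

import json
import sqlite3

raw_events = [
    {"order_id": 1, "user": "a", "amount": "19.99", "ts": "2024-01-01T10:00:00"},
    {"order_id": 2, "user": "b", "amount": "5.00",  "ts": "2024-01-01T11:30:00"},
]

db = sqlite3.connect(":memory:")

# EXTRACT + LOAD: land raw payloads as-is (re-transformable later)
db.execute("CREATE TABLE raw_orders (payload TEXT)")
db.executemany("INSERT INTO raw_orders VALUES (?)",
               [(json.dumps(e),) for e in raw_events])

# TRANSFORM: done in SQL, inside the "warehouse", reading from the raw layer
db.execute("""
    CREATE TABLE orders_clean AS
    SELECT json_extract(payload, '$.order_id')             AS order_id,
           json_extract(payload, '$.user')                 AS user_id,
           CAST(json_extract(payload, '$.amount') AS REAL) AS amount_usd,
           substr(json_extract(payload, '$.ts'), 1, 10)    AS order_date
    FROM raw_orders
""")

print(db.execute(
    "SELECT order_date, SUM(amount_usd) FROM orders_clean GROUP BY order_date"
).fetchall())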
Data Quality
from datetime import datetime, timedelta
from typing import List, Dict


class DataQualityError(Exception):
    """Raised when an error-severity check fails"""


class DataQualityChecker:
    """
    Data quality checks for pipeline
    """
    def __init__(self):
        self.checks = []

    def add_check(self, check_func, name: str, severity: str = "error"):
        self.checks.append({
            "name": name,
            "func": check_func,
            "severity": severity
        })

    def validate(self, df) -> List[Dict]:
        results = []
        for check in self.checks:
            passed = check["func"](df)
            results.append({
                "check": check["name"],
                "passed": passed,
                "severity": check["severity"]
            })
            if not passed and check["severity"] == "error":
                raise DataQualityError(f"Check failed: {check['name']}")
        return results


# Example usage (df is assumed to be a pandas DataFrame of orders)
checker = DataQualityChecker()

# Completeness: No null values in required columns
checker.add_check(
    lambda df: df[['user_id', 'order_id', 'amount']].notna().all().all(),
    "required_fields_not_null",
    severity="error"
)

# Freshness: Data is recent
checker.add_check(
    lambda df: (datetime.now() - df['created_at'].max()) < timedelta(hours=24),
    "data_freshness_24h",
    severity="warning"
)

# Uniqueness: No duplicate primary keys
checker.add_check(
    lambda df: df['order_id'].is_unique,
    "order_id_unique",
    severity="error"
)

# Range: Values within expected bounds
checker.add_check(
    lambda df: (df['amount'] > 0).all() and (df['amount'] < 1000000).all(),
    "amount_reasonable_range",
    severity="warning"
)

# Volume: Expected number of records
checker.add_check(
    lambda df: len(df) >= 1000,  # Expect at least 1000 orders per day
    "minimum_record_count",
    severity="warning"
)
Senior Interview Questions
How would you design a real-time analytics dashboard?
Requirements clarification:
- What metrics? (pageviews, conversions, revenue)
- How real-time? (seconds vs minutes)
- Scale? (events per second)

High-level flow:
Events → Kafka → Flink/Spark Streaming →
├─→ Pre-aggregated tables (ClickHouse/Druid)
└─→ Real-time materialized views
Dashboard ← Query Layer ← Time-series DB

Key decisions:
- Pre-aggregate: Don't query raw events for dashboards (see the sketch below)
- Materialized views: Update incrementally, not full recompute
- Time-series DB: Optimized for time-based queries
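A minimal sketch of the pre-aggregation idea (pure Python; a real system would keep these rollups in ClickHouse/Druid or a time-series DB, and the event fields are illustrative):

from collections import defaultdict

# (minute, page) -> rollup row
rollup = defaultdict(lambda: {"pageviews": 0, "revenue": 0.0})

def ingest(event):
    minute = event["ts"][:16]                     # "2024-01-01T10:02" bucket
    bucket = rollup[(minute, event["page"])]
    bucket["pageviews"] += 1
    bucket["revenue"] += event.get("revenue", 0.0)

def dashboard_query(page, minutes):
    # the dashboard reads a handful of tiny rollup rows, never raw events
    return {m: rollup[(m, page)] for m in minutes if (m, page) in rollup}

ingest({"ts": "2024-01-01T10:02:13", "page": "/pricing", "revenue": 49.0})
ingest({"ts": "2024-01-01T10:02:40", "page": "/pricing"})
print(dashboard_query("/pricing", ["2024-01-01T10:02"]))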
How do you handle schema changes in a data pipeline?
Schema evolution strategies:
- Backward compatible: New code reads old data
  - Add optional fields only
  - Don't remove/rename fields
- Forward compatible: Old code reads new data (see the sketch below)
  - Ignore unknown fields
- Schema registry: Centralized schema versioning
  - Avro/Protobuf with Confluent Schema Registry
  - Validate compatibility on publish

Typical rollout:
- Deploy new schema (backward compatible)
- Backfill if needed
- Deploy new producers
- Old data still works
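A minimal "tolerant reader" sketch of those compatibility rules (field names and defaults are illustrative):

CURRENT_FIELDS = {
    "order_id": None,          # required, no default
    "amount": 0.0,             # added in v2 with a default -> old events still parse
    "currency": "USD",         # added in v3 with a default
}

def read_order(raw: dict) -> dict:
    order = {}
    for field, default in CURRENT_FIELDS.items():
        if field in raw:
            order[field] = raw[field]
        elif default is not None:
            order[field] = default        # backward compatible: old data, new code
        else:
            raise ValueError(f"missing required field: {field}")
    # forward compatible: new producers may add fields this reader simply ignores
    return order

print(read_order({"order_id": 1}))                                  # old v1 event
print(read_order({"order_id": 2, "amount": 9.5, "coupon": "X10"}))  # newer event, unknown field ignored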
How would you design a data lake?
Layers:
- Bronze (Raw): Exact copy of source, append-only
- Silver (Cleaned): Deduplicated, validated, typed
- Gold (Business): Aggregated, ready for consumption

Typical stack:
- Storage: S3/ADLS with Delta Lake/Iceberg format
- Compute: Spark/Databricks
- Catalog: AWS Glue, Hive Metastore
- Query: Athena, Presto, Trino

Best practices (see the PySpark sketch below):
- Partition by date for time-series data
- Use columnar formats (Parquet)
- Implement data quality checks at each layer
- Track data lineage
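A minimal PySpark sketch of the three layers (paths and column names are illustrative; a real lake would usually write Delta or Iceberg tables rather than plain Parquet):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("lake-demo").getOrCreate()

# BRONZE: raw, append-only copy of the source
bronze = spark.read.json("s3://lake/bronze/orders/")

# SILVER: deduplicated, validated, typed; partitioned by date for pruning
silver = (bronze
          .dropDuplicates(["order_id"])
          .filter(F.col("amount") > 0)
          .withColumn("order_date", F.to_date("ts")))
silver.write.mode("overwrite").partitionBy("order_date").parquet("s3://lake/silver/orders/")

# GOLD: business-level aggregate, ready for BI tools
gold = silver.groupBy("order_date").agg(F.sum("amount").alias("revenue"))
gold.write.mode("overwrite").parquet("s3://lake/gold/daily_revenue/")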
How do you handle backpressure in streaming systems?
Backpressure: when the consumer can't keep up with the producer.

Strategies:
- Buffering: Kafka naturally buffers in log
- Rate limiting: Limit producer rate
- Sampling: Process subset of events
- Auto-scaling: Add more consumers
- Load shedding: Drop low-priority events
# Reactive Streams style backpressure
async def process_with_backpressure(stream, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(event):
        async with semaphore:
            await process_event(event)

    async for event in stream:
        asyncio.create_task(process_one(event))