Senior Level: Data pipeline design is common in senior interviews, especially at data-intensive companies. Expect questions about batch vs streaming, exactly-once processing, and handling late data.

Batch vs Stream Processing

┌─────────────────────────────────────────────────────────────────┐
│              Processing Paradigms                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  BATCH PROCESSING                 STREAM PROCESSING            │
│  ────────────────                 ─────────────────             │
│  Process data in chunks           Process data as it arrives   │
│  Higher throughput                Lower latency                 │
│  Bounded datasets                 Unbounded streams             │
│  Daily/hourly jobs                Real-time/near-real-time      │
│                                                                 │
│  ┌──────────────────┐            ┌──────────────────┐          │
│  │   Data Lake      │            │  Message Queue   │          │
│  │   (S3/HDFS)      │            │  (Kafka/Kinesis) │          │
│  │   ┌─────────┐    │            │   │ │ │ │ │ │    │          │
│  │   │ Batch 1 │    │            │   ▼ ▼ ▼ ▼ ▼ ▼    │          │
│  │   ├─────────┤    │            │ Event by Event   │          │
│  │   │ Batch 2 │    │            └──────────────────┘          │
│  │   ├─────────┤    │                     │                    │
│  │   │ Batch 3 │    │                     ▼                    │
│  │   └─────────┘    │            ┌──────────────────┐          │
│  └──────────────────┘            │ Stream Processor │          │
│           │                      │ (Flink/Spark)    │          │
│           ▼                      └──────────────────┘          │
│  ┌──────────────────┐                     │                    │
│  │ Batch Processor  │                     ▼                    │
│  │ (Spark/MapReduce)│            ┌──────────────────┐          │
│  └──────────────────┘            │ Real-time Result │          │
│           │                      │ (Dashboard/Alert)│          │
│           ▼                      └──────────────────┘          │
│  ┌──────────────────┐                                          │
│  │ Results (Next AM)│            Latency: Seconds              │
│  └──────────────────┘                                          │
│                                                                 │
│  Latency: Hours                                                │
│                                                                 │
│  USE CASES:                       USE CASES:                   │
│  • ML model training             • Fraud detection             │
│  • Data warehouse ETL            • Real-time dashboards        │
│  • Historical analysis           • Live recommendations        │
│  • Report generation             • IoT processing              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
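
To make the contrast concrete, here is a minimal sketch (file name and event fields are illustrative): the batch path reads a bounded dataset and produces a complete result at the end, while the streaming path keeps running state that updates as each event arrives.

from collections import defaultdict

# --- Batch: one pass over a bounded dataset (e.g., last night's orders) ---
def batch_revenue_by_user(path: str) -> dict:
    totals = defaultdict(float)
    with open(path) as f:                      # hypothetical "user_id,amount" lines
        for line in f:
            user_id, amount = line.strip().split(",")
            totals[user_id] += float(amount)
    return dict(totals)                        # complete result after the job finishes

# --- Stream: incremental state, updated per event ---
class StreamingRevenue:
    def __init__(self):
        self.totals = defaultdict(float)

    def on_event(self, event: dict) -> float:
        """Called for each arriving event; returns the running total immediately."""
        self.totals[event["user_id"]] += event["amount"]
        return self.totals[event["user_id"]]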

Lambda Architecture

┌─────────────────────────────────────────────────────────────────┐
│              Lambda Architecture                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│               ┌─────────────────────────────────┐              │
│               │       Data Source               │              │
│               │    (Events/Logs/etc)            │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│         ┌──────────────────┴──────────────────┐                │
│         ▼                                     ▼                │
│  ┌──────────────┐                    ┌──────────────┐          │
│  │ BATCH LAYER  │                    │ SPEED LAYER  │          │
│  │──────────────│                    │──────────────│          │
│  │              │                    │              │          │
│  │  Data Lake   │                    │ Stream       │          │
│  │     ▼        │                    │ Processor    │          │
│  │  Batch       │                    │     ▼        │          │
│  │  Processing  │                    │ Real-time    │          │
│  │     ▼        │                    │ Views        │          │
│  │  Batch Views │                    │              │          │
│  │              │                    │              │          │
│  │ (Complete,   │                    │ (Recent,     │          │
│  │  Accurate)   │                    │  Approximate)│          │
│  └──────┬───────┘                    └──────┬───────┘          │
│         │                                   │                  │
│         └──────────────┬────────────────────┘                  │
│                        ▼                                       │
│               ┌─────────────────┐                              │
│               │  SERVING LAYER  │                              │
│               │─────────────────│                              │
│               │                 │                              │
│               │ Merge Results   │                              │
│               │ Batch + Speed   │                              │
│               │                 │                              │
│               │ Query Interface │                              │
│               └─────────────────┘                              │
│                                                                 │
│  PROS:                          CONS:                          │
│  • Best of both worlds          • Two systems to maintain      │
│  • Accurate + Fast              • Code duplication             │
│  • Recompute from scratch       • Complex operations           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
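
A minimal sketch of the serving-layer merge, assuming both layers emit per-key counts: the batch view is authoritative up to the last batch run, and the speed layer fills in the window since then.

def merge_views(batch_view: dict, realtime_view: dict) -> dict:
    """batch_view: accurate counts up to the last batch run.
    realtime_view: approximate counts for events since that run."""
    merged = dict(batch_view)
    for key, value in realtime_view.items():
        merged[key] = merged.get(key, 0) + value
    return merged

# e.g. merge_views({"user_1": 120}, {"user_1": 3, "user_2": 1})
# -> {"user_1": 123, "user_2": 1}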

Kappa Architecture

┌─────────────────────────────────────────────────────────────────┐
│              Kappa Architecture                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│               ┌─────────────────────────────────┐              │
│               │       Data Source               │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│                            ▼                                   │
│               ┌─────────────────────────────────┐              │
│               │    Immutable Log (Kafka)        │              │
│               │    ─────────────────────        │              │
│               │    [E1][E2][E3][E4][E5]...     │              │
│               │    Retain all events            │              │
│               │    (or use compaction)          │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│                            ▼                                   │
│               ┌─────────────────────────────────┐              │
│               │    Stream Processing Layer      │              │
│               │    ────────────────────────     │              │
│               │                                 │              │
│               │    Single processing path       │              │
│               │    (Flink, Kafka Streams)       │              │
│               │                                 │              │
│               │    For reprocessing:            │              │
│               │    Start new consumer from      │              │
│               │    beginning of log             │              │
│               │                                 │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│                            ▼                                   │
│               ┌─────────────────────────────────┐              │
│               │      Serving Layer              │              │
│               │      (DB/Cache/Index)           │              │
│               └─────────────────────────────────┘              │
│                                                                 │
│  PROS:                          CONS:                          │
│  • Simpler (one system)         • Need log retention          │
│  • No code duplication          • Reprocessing can be slow     │
│  • Single source of truth       • Not ideal for all workloads │
│                                                                 │
│  KEY INSIGHT:                                                  │
│  "Batch processing is just stream processing where the        │
│   stream happens to be bounded"                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
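
Reprocessing in Kappa means pointing a fresh consumer (a new consumer group) at the start of the retained log and rebuilding the downstream view with the same processing code. A hedged sketch with kafka-python; topic, group name, and the apply_to_new_view helper are placeholders:

from kafka import KafkaConsumer   # pip install kafka-python
import json

# A new group_id has no committed offsets, and auto_offset_reset="earliest"
# makes it start from the beginning of the log.
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="orders-reprocess-v2",            # new group => full replay
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode()),
)

for message in consumer:
    apply_to_new_view(message.value)           # hypothetical: write into the v2 serving table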

Data Pipeline Components

Message Queues

┌─────────────────────────────────────────────────────────────────┐
│              Message Queue Comparison                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  KAFKA                                                          │
│  ─────                                                          │
│  • Log-based, persistent                                       │
│  • Replay from any offset                                      │
│  • Partitioned for parallelism                                 │
│  • Consumer groups for fan-out                                 │
│  • Best for: Event sourcing, data pipelines                    │
│                                                                 │
│  ┌──────────────────────────────────────────────────────┐      │
│  │  Topic: orders                                       │      │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐       │      │
│  │  │ Partition 0│ │ Partition 1│ │ Partition 2│       │      │
│  │  │ [0][1][2]  │ │ [0][1]     │ │ [0][1][2][3]│      │      │
│  │  └────────────┘ └────────────┘ └────────────┘       │      │
│  │                                                      │      │
│  │  Consumer Group A:                                   │      │
│  │    C1 ← P0, P1    C2 ← P2                           │      │
│  │                                                      │      │
│  │  Consumer Group B (independent):                     │      │
│  │    C3 ← All partitions                              │      │
│  └──────────────────────────────────────────────────────┘      │
│                                                                 │
│  RABBITMQ                                                       │
│  ────────                                                       │
│  • Traditional message broker                                  │
│  • ACK/NACK, dead letter queues                               │
│  • Flexible routing (exchanges)                                │
│  • Best for: Task queues, RPC                                  │
│                                                                 │
│  SQS (AWS)                                                      │
│  ─────────                                                      │
│  • Fully managed, serverless                                   │
│  • Standard (at-least-once) or FIFO (exactly-once)             │
│  • Best for: Decoupling AWS services                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
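
Partitioning is normally driven by the message key: Kafka hashes the key to choose a partition, so all events for one user land on the same partition and keep their order. A short sketch with kafka-python (broker address and topic are placeholders):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode(),
    value_serializer=lambda v: json.dumps(v).encode(),
)

# Same key => same partition => per-user ordering preserved
producer.send("orders", key="user-42", value={"order_id": 1, "amount": 99.5})
producer.send("orders", key="user-42", value={"order_id": 2, "amount": 12.0})
producer.flush()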

Stream Processing

# Apache Kafka Streams style pipeline (conceptual pseudocode mirroring the Java DSL)
class StreamProcessor:
    """
    Example: real-time order analytics.
    Message values are treated as dicts throughout.
    """
    
    def process(self):
        # Define topology
        builder = StreamsBuilder()
        
        # Source: read from the orders topic
        orders = builder.stream("orders")
        
        # Transform: parse and enrich; re-key by user_id
        enriched_orders = orders \
            .map(lambda key, value: (
                value["user_id"],
                {
                    **value,
                    "processed_at": datetime.now(),
                    "amount_usd": convert_currency(value["amount"])
                }
            ))
        
        # Aggregate: count orders per user per hour (tumbling window)
        hourly_counts = enriched_orders \
            .group_by_key() \
            .windowed_by(TimeWindows.of(Duration.of_hours(1))) \
            .count()
        
        # Join: enrich with user data from a changelog-backed table
        users = builder.table("users")
        enriched_with_user = enriched_orders.join(
            users,
            lambda order, user: {
                **order,
                "user_name": user["name"],
                "user_tier": user["tier"]
            }
        )
        
        # Filter: route high-value orders to their own topic
        enriched_orders \
            .filter(lambda key, value: value["amount_usd"] > 1000) \
            .to("high-value-orders")
        
        # Sink: write results to output topics
        hourly_counts.to("order-counts-hourly")
        enriched_with_user.to("enriched-orders")

Handling Late Data (Windowing)

┌─────────────────────────────────────────────────────────────────┐
│              Time Windows & Late Data                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  EVENT TIME vs PROCESSING TIME                                  │
│  ─────────────────────────────                                  │
│                                                                 │
│  Event Time: When event actually occurred                      │
│  Processing Time: When event is processed                      │
│                                                                 │
│  Event: {type: "click", timestamp: "10:00:00"}                 │
│  Arrives at processor: 10:00:05 (5s late due to network)       │
│                                                                 │
│  WINDOW TYPES:                                                 │
│                                                                 │
│  Tumbling Window (Fixed, non-overlapping):                     │
│  ┌────────┐ ┌────────┐ ┌────────┐                             │
│  │ 10:00- │ │ 10:05- │ │ 10:10- │                             │
│  │ 10:05  │ │ 10:10  │ │ 10:15  │                             │
│  └────────┘ └────────┘ └────────┘                             │
│  |← 5 min →|                                                   │
│                                                                 │
│  Sliding Window (Overlapping):                                 │
│  ┌──────────────────┐                                          │
│  │      10:00-10:05 │                                          │
│  └──────────────────┘                                          │
│       ┌──────────────────┐                                     │
│       │  10:02-10:07     │                                     │
│       └──────────────────┘                                     │
│            ┌──────────────────┐                                │
│            │  10:04-10:09     │                                │
│            └──────────────────┘                                │
│                                                                 │
│  Session Window (Activity-based):                              │
│  ┌───────────────┐     ┌────────────────────────┐             │
│  │ Session 1     │     │ Session 2               │             │
│  │ [e1][e2][e3]  │     │ [e4][e5]...[eN]        │             │
│  └───────────────┘     └────────────────────────┘             │
│             ↑ gap ↑                                            │
│            (timeout)                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
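
To make window assignment concrete, here is a small sketch (window and slide sizes are illustrative): a tumbling window maps each event-time timestamp to exactly one bucket, while a sliding window maps it to every bucket it overlaps.

from datetime import datetime, timedelta
from typing import List

def tumbling_window_start(ts: datetime, size: timedelta) -> datetime:
    """Each event belongs to exactly one fixed, non-overlapping bucket."""
    seconds = size.total_seconds()
    return datetime.fromtimestamp((ts.timestamp() // seconds) * seconds)

def sliding_window_starts(ts: datetime, size: timedelta, slide: timedelta) -> List[datetime]:
    """An event can fall into several overlapping buckets."""
    starts = []
    start = tumbling_window_start(ts, slide)   # latest slide-aligned start <= ts
    while start > ts - size:                   # window [start, start+size) still contains ts
        starts.append(start)
        start -= slide
    return starts

# e.g. a 10:04:30 event with size=5 min, slide=2 min falls into the
# 10:00, 10:02, and 10:04 windows, matching the diagram above.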

Watermarks for Late Data

┌─────────────────────────────────────────────────────────────────┐
│              Watermarks                                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Watermark = "I believe all events before this time have       │
│              arrived"                                          │
│                                                                 │
│  Event Time:    10:00  10:01  10:02  10:03  10:04             │
│                   │      │      │      │      │                │
│  Events:        [A]    [B]    [C]           [D]                │
│                                  ↑                              │
│                            Watermark = 10:02                   │
│                            "All events ≤10:02 have arrived"    │
│                                                                 │
│  Late Event:              [E at 10:01.5]                       │
│                               ↑                                 │
│                         Arrives after watermark!               │
│                                                                 │
│  HANDLING LATE DATA:                                           │
│  ──────────────────                                             │
│                                                                 │
│  1. DROP: Ignore late events (simple, may lose data)           │
│                                                                 │
│  2. ALLOWED LATENESS: Accept events within grace period        │
│     window.allowed_lateness(Duration.of_minutes(5))            │
│     Events up to 5 min late can still update window            │
│                                                                 │
│  3. SIDE OUTPUT: Route late events to separate stream          │
│     late_events → special processing or storage                │
│                                                                 │
│  4. REPROCESSING: Store in append-only log, recompute later    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
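
A hedged sketch of watermark tracking with a grace period and a side output for events that are too late (class and return labels are illustrative, not a specific framework's API):

from datetime import datetime, timedelta
from typing import Optional

class WatermarkTracker:
    """Watermark = max event time seen so far, minus an out-of-orderness allowance."""

    def __init__(self, max_out_of_orderness: timedelta, allowed_lateness: timedelta):
        self.max_out_of_orderness = max_out_of_orderness
        self.allowed_lateness = allowed_lateness
        self.max_event_time: Optional[datetime] = None

    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_out_of_orderness

    def classify(self, event_time: datetime) -> str:
        """Decide how to treat an event relative to the current watermark."""
        wm = self.watermark()
        self.max_event_time = max(self.max_event_time or event_time, event_time)
        if wm is None or event_time >= wm:
            return "on_time"                       # assigned to its window normally
        if event_time >= wm - self.allowed_lateness:
            return "late_but_allowed"              # window is updated retroactively
        return "side_output"                       # route to a late-events stream/storage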

Exactly-Once Processing

┌─────────────────────────────────────────────────────────────────┐
│              Delivery Semantics                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  AT-MOST-ONCE                                                   │
│  ─────────────                                                  │
│  • Fire and forget                                             │
│  • May lose messages                                           │
│  • Simplest, fastest                                           │
│  • Use: Metrics, logs where some loss OK                       │
│                                                                 │
│  AT-LEAST-ONCE                                                  │
│  ─────────────                                                  │
│  • Retry until acknowledged                                    │
│  • May have duplicates                                         │
│  • Consumer must be idempotent                                 │
│  • Use: Most applications (with dedup)                         │
│                                                                 │
│  EXACTLY-ONCE                                                   │
│  ───────────                                                    │
│  • Each message processed exactly once                         │
│  • Complex to achieve                                          │
│  • Use: Financial, critical data                               │
│                                                                 │
│  HOW TO ACHIEVE EXACTLY-ONCE:                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                                                         │   │
│  │  1. IDEMPOTENT CONSUMER                                │   │
│  │     • Assign unique ID to each message                 │   │
│  │     • Check if already processed before processing     │   │
│  │                                                         │   │
│  │  2. TRANSACTIONAL PROCESSING                           │   │
│  │     • Read → Process → Write in single transaction     │   │
│  │     • Kafka supports transactional producers           │   │
│  │                                                         │   │
│  │  3. TWO-PHASE COMMIT (Distributed)                     │   │
│  │     • Prepare phase: Lock resources                    │   │
│  │     • Commit phase: Apply changes                      │   │
│  │     • Expensive, but guarantees                        │   │
│  │                                                         │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Idempotent Consumer Implementation

import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class IdempotentConsumer:
    """
    Effectively-once processing on top of at-least-once delivery,
    using deduplication keyed on the message ID
    """
    
    def __init__(self, redis_client, db_session):
        self.redis = redis_client
        self.db = db_session
        self.dedup_ttl = 86400  # 24 hours
    
    def process_message(self, message):
        message_id = message.id
        
        # Check if already processed
        if self._is_duplicate(message_id):
            logger.info(f"Skipping duplicate: {message_id}")
            message.ack()  # safe to ack: the work was already done
            return
        
        # Start transaction
        try:
            with self.db.begin():
                # Process the message
                result = self._do_processing(message)
                
                # Record the dedup row in the SAME transaction, so the
                # side effects and the "processed" marker commit atomically
                self._mark_processed(message_id)
            
            # Only after commit: cache in Redis for fast duplicate checks
            self._cache_processed(message_id)
            
            # Acknowledge message (after commit)
            message.ack()
            
        except Exception as e:
            # Don't mark as processed - the broker will redeliver
            logger.error(f"Processing failed: {e}")
            message.nack()
            raise
    
    def _is_duplicate(self, message_id: str) -> bool:
        """Check Redis for recently processed messages"""
        return bool(self.redis.exists(f"processed:{message_id}"))
    
    def _mark_processed(self, message_id: str):
        """Durable dedup record, written inside the processing transaction"""
        self.db.execute(
            "INSERT INTO processed_messages (id, processed_at) VALUES (?, ?)",
            (message_id, datetime.now())
        )
    
    def _cache_processed(self, message_id: str):
        """Redis cache for fast lookups, written after the transaction commits"""
        self.redis.setex(f"processed:{message_id}", self.dedup_ttl, "1")

Stream Processing Implementation

Stream Processing Pipeline: real-world stream processing with Kafka and async processing:
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Any, List, Callable, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import json
from abc import ABC, abstractmethod

# ============== Kafka Consumer with Backpressure ==============
class StreamProcessor:
    """Production-ready stream processor with backpressure handling"""
    
    def __init__(
        self,
        kafka_config: Dict[str, Any],
        topic: str,
        group_id: str,
        max_concurrent: int = 100,
        batch_size: int = 100,
        batch_timeout_ms: int = 1000
    ):
        self.kafka_config = kafka_config
        self.topic = topic
        self.group_id = group_id
        self.max_concurrent = max_concurrent
        self.batch_size = batch_size
        self.batch_timeout_ms = batch_timeout_ms
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.handlers: List[Callable] = []
        self.running = False
    
    def add_handler(self, handler: Callable) -> None:
        """Add processing handler"""
        self.handlers.append(handler)
    
    async def start(self) -> None:
        """Start consuming and processing messages"""
        from aiokafka import AIOKafkaConsumer
        
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            group_id=self.group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False
        )
        
        await self.consumer.start()
        self.running = True
        
        try:
            while self.running:
                # Pull up to batch_size records, waiting at most batch_timeout_ms
                batch = await self.consumer.getmany(
                    timeout_ms=self.batch_timeout_ms,
                    max_records=self.batch_size
                )
                
                tasks = [
                    asyncio.create_task(self._process_with_backpressure(message))
                    for messages in batch.values()
                    for message in messages
                ]
                if not tasks:
                    continue
                
                # Wait for the whole batch (each message either succeeds or
                # lands in the DLQ), then commit offsets once per batch
                await asyncio.gather(*tasks)
                await self.consumer.commit()
        finally:
            await self.consumer.stop()
    
    async def _process_with_backpressure(self, message) -> None:
        """Process one message; the semaphore caps in-flight work"""
        async with self.semaphore:
            try:
                event = json.loads(message.value.decode())
                
                # Run all handlers
                for handler in self.handlers:
                    await handler(event)
                
            except Exception as e:
                # Route the failure to the DLQ so the batch can still commit
                await self._handle_error(message, e)
    
    async def _handle_error(self, message, error: Exception) -> None:
        """Send to dead letter queue on failure"""
        from aiokafka import AIOKafkaProducer
        
        dlq_message = {
            "original_topic": self.topic,
            "original_message": message.value.decode(),
            "error": str(error),
            "timestamp": datetime.utcnow().isoformat(),
            "partition": message.partition,
            "offset": message.offset
        }
        
        producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_config['bootstrap_servers']
        )
        await producer.start()
        try:
            await producer.send_and_wait(
                f"{self.topic}.dlq",
                json.dumps(dlq_message).encode()
            )
        finally:
            await producer.stop()
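
Usage is a matter of registering handlers and running the consumer loop; broker address, topic, and the handler below are placeholders:

async def count_clicks(event):
    # hypothetical handler: forward the event to a metrics sink
    print("click from", event.get("user_id"))

processor = StreamProcessor(
    kafka_config={"bootstrap_servers": "localhost:9092"},
    topic="clicks",
    group_id="click-analytics",
    max_concurrent=50,
)
processor.add_handler(count_clicks)
# asyncio.run(processor.start())   # runs until the process is stopped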

# ============== Windowed Aggregations ==============
@dataclass
class WindowedAggregator:
    """Tumbling window aggregation"""
    
    window_size: timedelta
    aggregations: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    watermark_delay: timedelta = field(default_factory=lambda: timedelta(seconds=30))
    
    def add_event(self, event: Dict[str, Any], timestamp: datetime) -> None:
        """Add event to appropriate window"""
        window_key = self._get_window_key(timestamp)
        
        if window_key not in self.aggregations:
            self.aggregations[window_key] = {
                "count": 0,
                "sum": 0.0,
                "min": float('inf'),
                "max": float('-inf'),
                "window_start": window_key,
                "window_end": window_key + self.window_size
            }
        
        agg = self.aggregations[window_key]
        value = event.get("value", 0)
        
        agg["count"] += 1
        agg["sum"] += value
        agg["min"] = min(agg["min"], value)
        agg["max"] = max(agg["max"], value)
    
    def _get_window_key(self, timestamp: datetime) -> datetime:
        """Get window start time for timestamp"""
        window_seconds = self.window_size.total_seconds()
        ts_seconds = timestamp.timestamp()
        window_start = (ts_seconds // window_seconds) * window_seconds
        return datetime.fromtimestamp(window_start)
    
    def get_closed_windows(self, watermark: datetime) -> List[Dict[str, Any]]:
        """Get windows that are past the watermark"""
        closed = []
        cutoff = watermark - self.watermark_delay
        
        for window_key, agg in list(self.aggregations.items()):
            if agg["window_end"] < cutoff:
                agg["avg"] = agg["sum"] / agg["count"] if agg["count"] > 0 else 0
                closed.append(agg)
                del self.aggregations[window_key]
        
        return closed
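
For example, feeding per-event values and periodically flushing the windows the watermark has passed (timestamps are illustrative):

agg = WindowedAggregator(window_size=timedelta(minutes=5))

agg.add_event({"value": 10.0}, datetime(2024, 1, 1, 10, 1))
agg.add_event({"value": 4.0},  datetime(2024, 1, 1, 10, 3))
agg.add_event({"value": 7.5},  datetime(2024, 1, 1, 10, 8))   # lands in the 10:05-10:10 window

# With the watermark at 10:06 (minus the 30 s delay), only the 10:00-10:05
# window is closed and emitted; the 10:05-10:10 window stays open.
for window in agg.get_closed_windows(watermark=datetime(2024, 1, 1, 10, 6)):
    print(window["window_start"], window["count"], window["avg"])   # 10:00, 2, 7.0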

# ============== Exactly-Once Processing ==============
class ExactlyOnceProcessor:
    """
    Exactly-once semantics with transactional processing
    """
    
    def __init__(self, db_pool, redis_client):
        self.db_pool = db_pool
        self.redis = redis_client
        self.dedup_ttl = 86400 * 7  # 7 days
    
    async def process(
        self, 
        event: Dict[str, Any], 
        processor: Callable
    ) -> Optional[Any]:
        """Process event with exactly-once guarantee"""
        event_id = event.get("event_id")
        
        if not event_id:
            raise ValueError("Event must have event_id for exactly-once")
        
        # Check if already processed
        if await self._is_duplicate(event_id):
            return None
        
        # Acquire lock for this event
        lock_key = f"lock:event:{event_id}"
        lock_acquired = await self.redis.set(
            lock_key, "1", 
            ex=60,  # Lock timeout
            nx=True
        )
        
        if not lock_acquired:
            return None  # Another instance is processing
        
        try:
            async with self.db_pool.acquire() as conn:
                async with conn.transaction():
                    # Process the event
                    result = await processor(event, conn)
                    
                    # Mark as processed in same transaction
                    await conn.execute("""
                        INSERT INTO processed_events (event_id, processed_at)
                        VALUES ($1, $2)
                        ON CONFLICT (event_id) DO NOTHING
                    """, event_id, datetime.utcnow())
            
            # Also cache in Redis for fast lookups
            await self.redis.setex(
                f"processed:{event_id}", 
                self.dedup_ttl, 
                "1"
            )
            
            return result
            
        finally:
            await self.redis.delete(lock_key)
    
    async def _is_duplicate(self, event_id: str) -> bool:
        """Check Redis first, then DB"""
        # Fast path: check Redis
        if await self.redis.exists(f"processed:{event_id}"):
            return True
        
        # Slow path: check database
        async with self.db_pool.acquire() as conn:
            result = await conn.fetchval("""
                SELECT 1 FROM processed_events 
                WHERE event_id = $1
            """, event_id)
            
            if result:
                # Backfill Redis cache
                await self.redis.setex(
                    f"processed:{event_id}", 
                    self.dedup_ttl, 
                    "1"
                )
                return True
        
        return False

# ============== Stream Join Implementation ==============
class StreamJoiner:
    """Join two streams within a time window"""
    
    def __init__(
        self, 
        window_size: timedelta,
        left_key: str,
        right_key: str
    ):
        self.window_size = window_size
        self.left_key = left_key
        self.right_key = right_key
        self.left_buffer: Dict[str, List[Dict]] = defaultdict(list)
        self.right_buffer: Dict[str, List[Dict]] = defaultdict(list)
    
    def add_left(self, event: Dict[str, Any]) -> List[Dict]:
        """Add event from left stream, return any matches"""
        key = event.get(self.left_key)
        timestamp = datetime.fromisoformat(event["timestamp"])
        
        # Clean old events
        self._cleanup_buffer(self.left_buffer, timestamp)
        self._cleanup_buffer(self.right_buffer, timestamp)
        
        # Store in buffer
        self.left_buffer[key].append({
            "event": event,
            "timestamp": timestamp
        })
        
        # Find matches in right buffer
        matches = []
        for right_event in self.right_buffer.get(key, []):
            if abs((right_event["timestamp"] - timestamp).total_seconds()) < \
               self.window_size.total_seconds():
                matches.append({
                    "left": event,
                    "right": right_event["event"],
                    "join_timestamp": timestamp.isoformat()
                })
        
        return matches
    
    def add_right(self, event: Dict[str, Any]) -> List[Dict]:
        """Add event from right stream, return any matches"""
        key = event.get(self.right_key)
        timestamp = datetime.fromisoformat(event["timestamp"])
        
        # Clean old events
        self._cleanup_buffer(self.left_buffer, timestamp)
        self._cleanup_buffer(self.right_buffer, timestamp)
        
        # Store in buffer
        self.right_buffer[key].append({
            "event": event,
            "timestamp": timestamp
        })
        
        # Find matches in left buffer
        matches = []
        for left_event in self.left_buffer.get(key, []):
            if abs((left_event["timestamp"] - timestamp).total_seconds()) < \
               self.window_size.total_seconds():
                matches.append({
                    "left": left_event["event"],
                    "right": event,
                    "join_timestamp": timestamp.isoformat()
                })
        
        return matches
    
    def _cleanup_buffer(
        self, 
        buffer: Dict[str, List[Dict]], 
        current_time: datetime
    ) -> None:
        """Remove events outside the window"""
        cutoff = current_time - self.window_size
        
        for key in list(buffer.keys()):
            buffer[key] = [
                e for e in buffer[key] 
                if e["timestamp"] > cutoff
            ]
            if not buffer[key]:
                del buffer[key]
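
For example, joining click and purchase events on user_id within a 10-minute window:

joiner = StreamJoiner(
    window_size=timedelta(minutes=10),
    left_key="user_id",
    right_key="user_id",
)

joiner.add_left({"user_id": "u1", "page": "/pricing",
                 "timestamp": "2024-01-01T10:00:00"})
matches = joiner.add_right({"user_id": "u1", "amount": 49.0,
                            "timestamp": "2024-01-01T10:04:00"})
# matches -> [{"left": <click>, "right": <purchase>,
#              "join_timestamp": "2024-01-01T10:04:00"}]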

ETL vs ELT

┌─────────────────────────────────────────────────────────────────┐
│              ETL vs ELT                                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ETL (Extract-Transform-Load)                                  │
│  ────────────────────────────                                   │
│                                                                 │
│  Source → Extract → Transform → Load → Warehouse               │
│                         ↑                                       │
│                   Processing server                            │
│                                                                 │
│  • Transform BEFORE loading                                    │
│  • Need powerful ETL server                                    │
│  • Less storage in warehouse                                   │
│  • Harder to re-transform                                      │
│  • Tools: Informatica, Talend, SSIS                           │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  ELT (Extract-Load-Transform)                                  │
│  ────────────────────────────                                   │
│                                                                 │
│  Source → Extract → Load → Transform → Results                 │
│                       ↑         ↑                              │
│                 Raw storage   Warehouse does transform         │
│                                                                 │
│  • Load raw data first                                         │
│  • Transform using warehouse compute                           │
│  • Keep raw data (can re-transform)                           │
│  • Modern approach (cloud warehouses)                         │
│  • Tools: dbt, Fivetran + Snowflake/BigQuery                  │
│                                                                 │
│  MODERN DATA STACK:                                            │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐        │
│  │ Sources │ → │ Fivetran│ → │Snowflake│ → │   dbt   │        │
│  │ (APIs,  │   │ (Extract│   │ (Load)  │   │(Transform│       │
│  │  DBs)   │   │  + Load)│   │         │   │  in SQL) │       │
│  └─────────┘   └─────────┘   └─────────┘   └─────────┘        │
│                                    │                           │
│                                    ▼                           │
│                             ┌─────────────┐                    │
│                             │  Looker/    │                    │
│                             │  Tableau    │                    │
│                             └─────────────┘                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
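
A minimal sketch of the difference in code, assuming a warehouse client with hypothetical insert/execute helpers: ETL shapes the rows in the pipeline before loading, while ELT loads the raw rows and pushes the transformation down to the warehouse as SQL (the kind of SQL dbt would manage).

# --- ETL: transform in the pipeline, load the already-shaped rows ---
def etl(rows, warehouse):
    cleaned = [
        {"user_id": r["user_id"], "amount_usd": round(r["amount"] * r["fx_rate"], 2)}
        for r in rows
        if r.get("amount") is not None
    ]
    warehouse.insert("orders_clean", cleaned)     # hypothetical load helper

# --- ELT: load raw rows as-is, transform inside the warehouse ---
def elt(rows, warehouse):
    warehouse.insert("orders_raw", rows)          # keep the raw copy for re-transforms
    warehouse.execute("""
        CREATE OR REPLACE TABLE orders_clean AS
        SELECT user_id, ROUND(amount * fx_rate, 2) AS amount_usd
        FROM orders_raw
        WHERE amount IS NOT NULL
    """)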

Data Quality

from datetime import datetime, timedelta
from typing import Dict, List

class DataQualityError(Exception):
    """Raised when a check with severity='error' fails"""
    pass

class DataQualityChecker:
    """
    Data quality checks for a pipeline stage (df is a pandas DataFrame)
    """
    
    def __init__(self):
        self.checks = []
    
    def add_check(self, check_func, name: str, severity: str = "error"):
        self.checks.append({
            "name": name,
            "func": check_func,
            "severity": severity
        })
    
    def validate(self, df) -> List[Dict]:
        results = []
        for check in self.checks:
            passed = bool(check["func"](df))
            results.append({
                "check": check["name"],
                "passed": passed,
                "severity": check["severity"]
            })
            
            if not passed and check["severity"] == "error":
                raise DataQualityError(f"Check failed: {check['name']}")
        
        return results

# Example usage
checker = DataQualityChecker()

# Completeness: No null values in required columns
checker.add_check(
    lambda df: df[['user_id', 'order_id', 'amount']].notna().all().all(),
    "required_fields_not_null",
    severity="error"
)

# Freshness: Newest record is less than 24 hours old
checker.add_check(
    lambda df: (datetime.now() - df['created_at'].max()) < timedelta(hours=24),
    "data_freshness_24h",
    severity="warning"
)

# Uniqueness: No duplicate primary keys
checker.add_check(
    lambda df: df['order_id'].is_unique,
    "order_id_unique",
    severity="error"
)

# Range: Values within expected bounds
checker.add_check(
    lambda df: (df['amount'] > 0).all() and (df['amount'] < 1000000).all(),
    "amount_reasonable_range",
    severity="warning"
)

# Volume: Expected number of records
checker.add_check(
    lambda df: len(df) >= 1000,  # Expect at least 1000 orders per day
    "minimum_record_count",
    severity="warning"
)

Senior Interview Questions

Designing a real-time analytics pipeline (live dashboards)

Requirements clarification:
  • Which metrics? (pageviews, conversions, revenue)
  • How real-time? (seconds vs minutes)
  • What scale? (events per second)

Architecture:
Events → Kafka → Flink/Spark Streaming →
  ├─→ Pre-aggregated tables (ClickHouse/Druid)
  └─→ Real-time materialized views

Dashboard ← Query Layer ← Time-series DB

Key decisions:
  1. Pre-aggregate: don't query raw events for dashboards
  2. Materialized views: update incrementally, not full recompute
  3. Time-series DB: optimized for time-based queries

Handling schema evolution

Schema evolution strategies:
  1. Backward compatible: new code reads old data
     • Add optional fields only
     • Don't remove or rename fields
  2. Forward compatible: old code reads new data
     • Ignore unknown fields
  3. Schema registry: centralized schema versioning
     • Avro/Protobuf with Confluent Schema Registry
     • Validate compatibility on publish

Migration strategy:
  1. Deploy the new schema (backward compatible)
  2. Backfill if needed
  3. Deploy new producers
  4. Old data still works

Designing a data lake (Bronze/Silver/Gold layers)

Layers:
  1. Bronze (Raw): exact copy of source, append-only
  2. Silver (Cleaned): deduplicated, validated, typed
  3. Gold (Business): aggregated, ready for consumption

Technologies:
  • Storage: S3/ADLS with Delta Lake/Iceberg table formats
  • Compute: Spark/Databricks
  • Catalog: AWS Glue, Hive Metastore
  • Query: Athena, Presto, Trino

Best practices (see the partitioning sketch below):
  • Partition by date for time-series data
  • Use columnar formats (Parquet)
  • Implement data quality checks at each layer
  • Track data lineage
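
A small sketch of date-partitioned Parquet writes with pandas/pyarrow; the bucket path and column names are placeholders:

import pandas as pd

events = pd.DataFrame({
    "event_date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "user_id": ["u1", "u2", "u1"],
    "amount": [10.0, 25.5, 7.0],
})

# Writes one directory per event_date (e.g., .../event_date=2024-01-01/part-*.parquet),
# so date-filtered queries only scan the relevant partitions
events.to_parquet(
    "s3://my-lake/bronze/orders/",   # placeholder bucket/prefix; a local path also works
    partition_cols=["event_date"],
    engine="pyarrow",
)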

Handling backpressure

Backpressure: what to do when the consumer can't keep up with the producer.

Strategies:
  1. Buffering: Kafka naturally buffers in the log
  2. Rate limiting: limit the producer rate
  3. Sampling: process a subset of events
  4. Auto-scaling: add more consumers
  5. Load shedding: drop low-priority events

Implementation:
# Reactive Streams style backpressure: the consume loop stalls once
# max_concurrent events are in flight
async def process_with_backpressure(stream, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_one(event):
        try:
            await process_event(event)   # application-specific handler
        finally:
            semaphore.release()
    
    async for event in stream:
        await semaphore.acquire()        # blocks when the limit is reached
        asyncio.create_task(process_one(event))