Senior Level: Data pipeline design is common in senior interviews, especially at data-intensive companies (LinkedIn, Netflix, Uber, Stripe). Expect questions about batch vs streaming, exactly-once processing, and handling late data. The key insight interviewers look for: the choice between batch and stream is not binary. Most production systems either run both side by side (the “Lambda” pattern) or unify them in a single streaming path (the “Kappa” pattern). Knowing when each is appropriate, and being able to articulate the trade-offs, is what sets strong senior candidates apart.

Batch vs Stream Processing

┌─────────────────────────────────────────────────────────────────┐
│              Processing Paradigms                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  BATCH PROCESSING                 STREAM PROCESSING            │
│  ────────────────                 ─────────────────             │
│  Process data in chunks           Process data as it arrives   │
│  Higher throughput                Lower latency                 │
│  Bounded datasets                 Unbounded streams             │
│  Daily/hourly jobs                Real-time/near-real-time      │
│                                                                 │
│  ┌──────────────────┐            ┌──────────────────┐          │
│  │   Data Lake      │            │  Message Queue   │          │
│  │   (S3/HDFS)      │            │  (Kafka/Kinesis) │          │
│  │   ┌─────────┐    │            │   │ │ │ │ │ │    │          │
│  │   │ Batch 1 │    │            │   ▼ ▼ ▼ ▼ ▼ ▼    │          │
│  │   ├─────────┤    │            │ Event by Event   │          │
│  │   │ Batch 2 │    │            └──────────────────┘          │
│  │   ├─────────┤    │                     │                    │
│  │   │ Batch 3 │    │                     ▼                    │
│  │   └─────────┘    │            ┌──────────────────┐          │
│  └──────────────────┘            │ Stream Processor │          │
│           │                      │ (Flink/Spark)    │          │
│           ▼                      └──────────────────┘          │
│  ┌──────────────────┐                     │                    │
│  │ Batch Processor  │                     ▼                    │
│  │ (Spark/MapReduce)│            ┌──────────────────┐          │
│  └──────────────────┘            │ Real-time Result │          │
│           │                      │ (Dashboard/Alert)│          │
│           ▼                      └──────────────────┘          │
│  ┌──────────────────┐                                          │
│  │ Results (Next AM)│            Latency: Seconds              │
│  └──────────────────┘                                          │
│                                                                 │
│  Latency: Hours                                                │
│                                                                 │
│  USE CASES:                       USE CASES:                   │
│  • ML model training             • Fraud detection             │
│  • Data warehouse ETL            • Real-time dashboards        │
│  • Historical analysis           • Live recommendations        │
│  • Report generation             • IoT processing              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
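
The contrast above can be made concrete with a small sketch. It computes the same per-user order count twice: once over a bounded list (batch) and once incrementally, emitting an updated result after every event (stream). The function names and the event shape are illustrative assumptions, not part of any framework.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, List

Event = Dict[str, str]

def batch_count(events: List[Event]) -> Dict[str, int]:
    """Batch: the full dataset is available, so one pass yields the final answer."""
    return dict(Counter(e["user_id"] for e in events))

def stream_count(events: Iterable[Event]) -> Iterator[Dict[str, int]]:
    """Stream: keep running state and emit an updated result after each event."""
    counts: Counter = Counter()
    for e in events:
        counts[e["user_id"]] += 1
        yield dict(counts)

events = [{"user_id": "a"}, {"user_id": "b"}, {"user_id": "a"}]

final_batch = batch_count(events)              # one result, available at the end
snapshots = list(stream_count(iter(events)))   # one result per event
```

The last streaming snapshot equals the batch result; the difference is that the streaming version had a usable (if partial) answer after the first event.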

Lambda Architecture

┌─────────────────────────────────────────────────────────────────┐
│              Lambda Architecture                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│               ┌─────────────────────────────────┐              │
│               │       Data Source               │              │
│               │    (Events/Logs/etc)            │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│         ┌──────────────────┴──────────────────┐                │
│         ▼                                     ▼                │
│  ┌──────────────┐                    ┌──────────────┐          │
│  │ BATCH LAYER  │                    │ SPEED LAYER  │          │
│  │──────────────│                    │──────────────│          │
│  │              │                    │              │          │
│  │  Data Lake   │                    │ Stream       │          │
│  │     ▼        │                    │ Processor    │          │
│  │  Batch       │                    │     ▼        │          │
│  │  Processing  │                    │ Real-time    │          │
│  │     ▼        │                    │ Views        │          │
│  │  Batch Views │                    │              │          │
│  │              │                    │              │          │
│  │ (Complete,   │                    │ (Recent,     │          │
│  │  Accurate)   │                    │  Approximate)│          │
│  └──────┬───────┘                    └──────┬───────┘          │
│         │                                   │                  │
│         └──────────────┬────────────────────┘                  │
│                        ▼                                       │
│               ┌─────────────────┐                              │
│               │  SERVING LAYER  │                              │
│               │─────────────────│                              │
│               │                 │                              │
│               │ Merge Results   │                              │
│               │ Batch + Speed   │                              │
│               │                 │                              │
│               │ Query Interface │                              │
│               └─────────────────┘                              │
│                                                                 │
│  PROS:                          CONS:                          │
│  • Best of both worlds          • Two systems to maintain      │
│  • Accurate + Fast              • Code duplication             │
│  • Recompute from scratch       • Complex operations           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
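
The "Merge Results" step in the serving layer can be sketched as follows, assuming both views are simple key-to-count maps and that the speed view only covers events not yet absorbed by the last batch run (otherwise counts would be added twice). `merge_views` and the sample data are hypothetical.

```python
from typing import Dict

def merge_views(batch_view: Dict[str, int], speed_view: Dict[str, int]) -> Dict[str, int]:
    """Serving layer: batch totals plus the increments seen since the last batch run."""
    merged = dict(batch_view)
    for key, delta in speed_view.items():
        merged[key] = merged.get(key, 0) + delta
    return merged

# Page views up to the last batch run (complete, accurate)
batch_view = {"/home": 1000, "/pricing": 250}
# Page views since the last batch run (recent, approximate)
speed_view = {"/home": 12, "/signup": 3}

merged = merge_views(batch_view, speed_view)
```

When the next batch run finishes, the speed view for the covered period is discarded, which is how approximate real-time numbers are eventually replaced by accurate ones.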

Kappa Architecture

┌─────────────────────────────────────────────────────────────────┐
│              Kappa Architecture                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│               ┌─────────────────────────────────┐              │
│               │       Data Source               │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│                            ▼                                   │
│               ┌─────────────────────────────────┐              │
│               │    Immutable Log (Kafka)        │              │
│               │    ─────────────────────        │              │
│               │    [E1][E2][E3][E4][E5]...     │              │
│               │    Retain all events            │              │
│               │    (or use compaction)          │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│                            ▼                                   │
│               ┌─────────────────────────────────┐              │
│               │    Stream Processing Layer      │              │
│               │    ────────────────────────     │              │
│               │                                 │              │
│               │    Single processing path       │              │
│               │    (Flink, Kafka Streams)       │              │
│               │                                 │              │
│               │    For reprocessing:            │              │
│               │    Start new consumer from      │              │
│               │    beginning of log             │              │
│               │                                 │              │
│               └────────────┬────────────────────┘              │
│                            │                                   │
│                            ▼                                   │
│               ┌─────────────────────────────────┐              │
│               │      Serving Layer              │              │
│               │      (DB/Cache/Index)           │              │
│               └─────────────────────────────────┘              │
│                                                                 │
│  PROS:                          CONS:                          │
│  • Simpler (one system)         • Need log retention          │
│  • No code duplication          • Reprocessing can be slow     │
│  • Single source of truth       • Not ideal for all workloads │
│                                                                 │
│  KEY INSIGHT:                                                  │
│  "Batch processing is just stream processing where the        │
│   stream happens to be bounded"                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
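
Reprocessing in a Kappa setup can be illustrated with an in-memory list standing in for the retained log. In this sketch, deploying new logic means starting a second job from offset 0 and building a fresh view, while the old view keeps serving until the new one catches up. `run_job`, `total_spend`, and `largest_order` are hypothetical names.

```python
from typing import Callable, Dict, List

Event = Dict[str, object]
State = Dict[str, float]

# Stand-in for a retained, immutable Kafka topic
log: List[Event] = [
    {"user": "a", "amount": 10.0},
    {"user": "b", "amount": 5.0},
    {"user": "a", "amount": 2.5},
]

def run_job(log: List[Event], from_offset: int, update: Callable[[State, Event], None]) -> State:
    """Replay the log from an offset through a single processing path."""
    state: State = {}
    for event in log[from_offset:]:
        update(state, event)
    return state

def total_spend(state: State, event: Event) -> None:
    user, amount = str(event["user"]), float(event["amount"])
    state[user] = state.get(user, 0.0) + amount

def largest_order(state: State, event: Event) -> None:
    user, amount = str(event["user"]), float(event["amount"])
    state[user] = max(state.get(user, 0.0), amount)

view_v1 = run_job(log, 0, total_spend)    # current logic
view_v2 = run_job(log, 0, largest_order)  # new logic, rebuilt by replaying from the start
```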

Data Pipeline Components

Message Queues

┌─────────────────────────────────────────────────────────────────┐
│              Message Queue Comparison                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  KAFKA                                                          │
│  ─────                                                          │
│  • Log-based, persistent                                       │
│  • Replay from any offset                                      │
│  • Partitioned for parallelism                                 │
│  • Consumer groups for fan-out                                 │
│  • Best for: Event sourcing, data pipelines                    │
│                                                                 │
│  ┌──────────────────────────────────────────────────────┐      │
│  │  Topic: orders                                       │      │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐       │      │
│  │  │ Partition 0│ │ Partition 1│ │ Partition 2│       │      │
│  │  │ [0][1][2]  │ │ [0][1]     │ │ [0][1][2][3]│      │      │
│  │  └────────────┘ └────────────┘ └────────────┘       │      │
│  │                                                      │      │
│  │  Consumer Group A:                                   │      │
│  │    C1 ← P0, P1    C2 ← P2                           │      │
│  │                                                      │      │
│  │  Consumer Group B (independent):                     │      │
│  │    C3 ← All partitions                              │      │
│  └──────────────────────────────────────────────────────┘      │
│                                                                 │
│  RABBITMQ                                                       │
│  ────────                                                       │
│  • Traditional message broker                                  │
│  • ACK/NACK, dead letter queues                               │
│  • Flexible routing (exchanges)                                │
│  • Best for: Task queues, RPC                                  │
│                                                                 │
│  SQS (AWS)                                                      │
│  ─────────                                                      │
│  • Fully managed, serverless                                   │
│  • Standard (at-least-once) or FIFO (exactly-once)             │
│  • Best for: Decoupling AWS services                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
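
The Kafka diagram above relies on two mappings: message key to partition, and partition to consumer within a group. The sketch below approximates both. It uses `zlib.crc32` as a stand-in hash (Kafka's default partitioner uses murmur2) and a simplified range-style assignment; `partition_for` and `assign_range` are hypothetical helpers, not client APIs.

```python
import zlib
from typing import Dict, List

def partition_for(key: str, num_partitions: int) -> int:
    """Same key -> same partition, which preserves per-key ordering."""
    # Stand-in hash for illustration only
    return zlib.crc32(key.encode()) % num_partitions

def assign_range(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Give each consumer a contiguous range; the first ones get one extra."""
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, consumer in enumerate(sorted(consumers)):
        size = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + size]
        start += size
    return assignment

# Each group receives every partition; members of a group split them
group_a = assign_range([0, 1, 2], ["C1", "C2"])
group_b = assign_range([0, 1, 2], ["C3"])
p = partition_for("user-42", 3)
```

With three partitions, a group can use at most three active consumers; a fourth member would sit idle until a rebalance hands it a partition.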

Stream Processing

# Kafka Streams-style topology (pseudocode: Kafka Streams is a Java library;
# StreamsBuilder, TimeWindows, Duration and convert_currency are stand-ins)
from datetime import datetime, timezone

class StreamProcessor:
    """
    Example: Real-time order analytics
    """
    
    def process(self):
        # Define topology
        builder = StreamsBuilder()
        
        # Source: Read from orders topic (values are dicts)
        orders = builder.stream("orders")
        
        # Transform: Re-key by user and enrich
        enriched_orders = orders \
            .map(lambda key, value: (
                value["user_id"],
                {
                    **value,
                    "processed_at": datetime.now(timezone.utc).isoformat(),
                    "amount_usd": convert_currency(value["amount"])
                }
            ))
        
        # Aggregate: Count orders per user per hour
        hourly_counts = enriched_orders \
            .group_by_key() \
            .windowed_by(TimeWindows.of(Duration.of_hours(1))) \
            .count()
        
        # Join: Enrich with user data (stream-table join on user_id)
        users = builder.table("users")
        enriched_with_user = enriched_orders.join(
            users,
            lambda order, user: {
                **order,
                "user_name": user["name"],
                "user_tier": user["tier"]
            }
        )
        
        # Filter: Detect high-value orders
        high_value = enriched_orders \
            .filter(lambda key, value: value["amount_usd"] > 1000)
        
        # Sink: Write to output topics
        high_value.to("high-value-orders")
        hourly_counts.to("order-counts-hourly")
        enriched_with_user.to("enriched-orders")

Handling Late Data (Windowing)

┌─────────────────────────────────────────────────────────────────┐
│              Time Windows & Late Data                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  EVENT TIME vs PROCESSING TIME                                  │
│  ─────────────────────────────                                  │
│                                                                 │
│  Event Time: When event actually occurred                      │
│  Processing Time: When event is processed                      │
│                                                                 │
│  Event: {type: "click", timestamp: "10:00:00"}                 │
│  Arrives at processor: 10:00:05 (5s late due to network)       │
│                                                                 │
│  WINDOW TYPES:                                                 │
│                                                                 │
│  Tumbling Window (Fixed, non-overlapping):                     │
│  ┌────────┐ ┌────────┐ ┌────────┐                             │
│  │ 10:00- │ │ 10:05- │ │ 10:10- │                             │
│  │ 10:05  │ │ 10:10  │ │ 10:15  │                             │
│  └────────┘ └────────┘ └────────┘                             │
│  |← 5 min →|                                                   │
│                                                                 │
│  Sliding Window (Overlapping):                                 │
│  ┌──────────────────┐                                          │
│  │      10:00-10:05 │                                          │
│  └──────────────────┘                                          │
│       ┌──────────────────┐                                     │
│       │  10:02-10:07     │                                     │
│       └──────────────────┘                                     │
│            ┌──────────────────┐                                │
│            │  10:04-10:09     │                                │
│            └──────────────────┘                                │
│                                                                 │
│  Session Window (Activity-based):                              │
│  ┌───────────────┐     ┌────────────────────────┐             │
│  │ Session 1     │     │ Session 2               │             │
│  │ [e1][e2][e3]  │     │ [e4][e5]...[eN]        │             │
│  └───────────────┘     └────────────────────────┘             │
│             ↑ gap ↑                                            │
│            (timeout)                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
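
The three window types can be expressed as small assignment functions. This is a minimal sketch, assuming naive `datetime` timestamps and windows aligned to the Unix epoch; `tumbling`, `sliding`, `sessions`, and `at` are hypothetical helpers rather than any framework's API.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Window = Tuple[datetime, datetime]   # half-open interval [start, end)
EPOCH = datetime(1970, 1, 1)         # origin that window boundaries align to

def tumbling(ts: datetime, size: timedelta) -> Window:
    """Each timestamp belongs to exactly one fixed-size window."""
    start = ts - (ts - EPOCH) % size
    return (start, start + size)

def sliding(ts: datetime, size: timedelta, slide: timedelta) -> List[Window]:
    """Each timestamp belongs to every overlapping window that contains it."""
    start = ts - (ts - EPOCH) % slide   # latest window start at or before ts
    windows: List[Window] = []
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def sessions(timestamps: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group events into sessions; a pause longer than `gap` starts a new one."""
    result: List[List[datetime]] = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][-1] <= gap:
            result[-1].append(ts)
        else:
            result.append([ts])
    return result

def at(hour: int, minute: int, second: int = 0) -> datetime:
    return datetime(2024, 1, 1, hour, minute, second)

tumbling_window = tumbling(at(10, 3, 20), timedelta(minutes=5))
sliding_windows = sliding(at(10, 4, 30), timedelta(minutes=5), timedelta(minutes=2))
clicks = [at(10, 0), at(10, 1), at(10, 2), at(10, 30), at(10, 31)]
session_groups = sessions(clicks, gap=timedelta(minutes=5))
```

An event at 10:04:30 lands in a single tumbling window but in three sliding windows (10:00-10:05, 10:02-10:07, 10:04-10:09), which is why sliding windows cost more state per event.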

Watermarks for Late Data

Late data is the single most underestimated problem in stream processing. In a batch world, you process a complete dataset: no data is “late” because you wait for all of it. In a streaming world, events arrive out of order due to network delays, client clock skew, mobile devices going offline then reconnecting, and retry logic in upstream producers. A robust pipeline must decide: how long do I wait for stragglers before closing a window and emitting results? Watermarks are the mechanism for making this decision systematically. They are essentially the pipeline’s best guess at “what time is it in the event world?”

┌─────────────────────────────────────────────────────────────────┐
│              Watermarks                                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Watermark = "I believe all events before this time have       │
│              arrived"                                          │
│                                                                 │
│  Event Time:    10:00  10:01  10:02  10:03  10:04             │
│                   │      │      │      │      │                │
│  Events:        [A]    [B]    [C]           [D]                │
│                                  ↑                              │
│                            Watermark = 10:02                   │
│                            "All events ≤10:02 have arrived"    │
│                                                                 │
│  Late Event:              [E at 10:01:30]                      │
│                               ↑                                 │
│                         Arrives after watermark!               │
│                                                                 │
│  HANDLING LATE DATA:                                           │
│  ──────────────────                                             │
│                                                                 │
│  1. DROP: Ignore late events (simple, may lose data)           │
│                                                                 │
│  2. ALLOWED LATENESS: Accept events within grace period        │
│     window.allowed_lateness(Duration.of_minutes(5))            │
│     Events up to 5 min late can still update window            │
│                                                                 │
│  3. SIDE OUTPUT: Route late events to separate stream          │
│     late_events → special processing or storage                │
│                                                                 │
│  4. REPROCESSING: Store in append-only log, recompute later    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
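
The watermark and the first three handling options can be combined in one small sketch. It assumes integer event times in seconds and a watermark that trails the largest event time seen so far by a fixed bound; an event is routed to a side output once the watermark has passed its window end plus the allowed lateness. `WatermarkedCounter` and its fields are hypothetical, and real engines differ in detail.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class WatermarkedCounter:
    window_size: int        # seconds per tumbling window
    max_out_of_order: int   # watermark trails the max event time by this much
    allowed_lateness: int   # grace period after the watermark passes a window end
    max_event_time: int = 0
    counts: Dict[int, int] = field(default_factory=dict)        # window start -> count
    late: List[Tuple[str, int]] = field(default_factory=list)   # side output

    @property
    def watermark(self) -> int:
        """Best guess: all events up to this time have arrived."""
        return self.max_event_time - self.max_out_of_order

    def on_event(self, name: str, event_time: int) -> None:
        self.max_event_time = max(self.max_event_time, event_time)
        start = event_time - event_time % self.window_size
        window_end = start + self.window_size
        if self.watermark >= window_end + self.allowed_lateness:
            # Too late to update the window: keep it for separate handling
            self.late.append((name, event_time))
        else:
            self.counts[start] = self.counts.get(start, 0) + 1

counter = WatermarkedCounter(window_size=60, max_out_of_order=10, allowed_lateness=30)
for name, t in [("A", 5), ("B", 65), ("C", 50), ("D", 130), ("E", 58), ("F", 110)]:
    counter.on_event(name, t)
```

Here C is merely out of order and is counted normally, F arrives after the watermark passed its window end but within the grace period, and E is beyond the grace period and goes to the side output.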

Exactly-Once Processing

┌─────────────────────────────────────────────────────────────────┐
│              Delivery Semantics                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  AT-MOST-ONCE                                                   │
│  ─────────────                                                  │
│  • Fire and forget                                             │
│  • May lose messages                                           │
│  • Simplest, fastest                                           │
│  • Use: Metrics, logs where some loss OK                       │
│                                                                 │
│  AT-LEAST-ONCE                                                  │
│  ─────────────                                                  │
│  • Retry until acknowledged                                    │
│  • May have duplicates                                         │
│  • Consumer must be idempotent                                 │
│  • Use: Most applications (with dedup)                         │
│                                                                 │
│  EXACTLY-ONCE                                                   │
│  ───────────                                                    │
│  • Each message processed exactly once                         │
│  • Complex to achieve                                          │
│  • Use: Financial, critical data                               │
│                                                                 │
│  HOW TO ACHIEVE EXACTLY-ONCE:                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                                                         │   │
│  │  1. IDEMPOTENT CONSUMER                                │   │
│  │     • Assign unique ID to each message                 │   │
│  │     • Check if already processed before processing     │   │
│  │                                                         │   │
│  │  2. TRANSACTIONAL PROCESSING                           │   │
│  │     • Read → Process → Write in single transaction     │   │
│  │     • Kafka supports transactional producers           │   │
│  │                                                         │   │
│  │  3. TWO-PHASE COMMIT (Distributed)                     │   │
│  │     • Prepare phase: Lock resources                    │   │
│  │     • Commit phase: Apply changes                      │   │
│  │     • Expensive, but guarantees atomic commit          │   │
│  │                                                         │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
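
Why at-least-once delivery produces duplicates, and why an idempotent consumer neutralizes them, can be shown with a deterministic simulation. The sketch assumes the handler runs before the acknowledgement and that the first ack for selected messages is lost; `deliver_at_least_once` and `idempotent_handler` are hypothetical names.

```python
from typing import Callable, List, Set

def deliver_at_least_once(
    messages: List[str],
    handler: Callable[[str], None],
    lose_first_ack: Set[str],
) -> None:
    """Redeliver each message until it is acked; a lost ack causes a duplicate."""
    for msg_id in messages:
        attempts = 0
        acked = False
        while not acked:
            attempts += 1
            handler(msg_id)  # processing happens before the ack
            acked = not (msg_id in lose_first_ack and attempts == 1)

# Naive consumer: every delivery has an effect
processed_naive: List[str] = []
deliver_at_least_once(["m1", "m2", "m3"], processed_naive.append, {"m2"})

# Idempotent consumer: remember IDs and skip repeats
seen: Set[str] = set()
processed_idempotent: List[str] = []

def idempotent_handler(msg_id: str) -> None:
    if msg_id in seen:
        return
    seen.add(msg_id)
    processed_idempotent.append(msg_id)

deliver_at_least_once(["m1", "m2", "m3"], idempotent_handler, {"m2"})
```

Moving the ack before the handler call would turn this into at-most-once: no duplicates, but a crash between ack and processing loses the message.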

Idempotent Consumer Implementation

import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

class IdempotentConsumer:
    """
    At-least-once delivery plus deduplication, which gives
    effectively-once processing
    """
    
    def __init__(self, redis_client, db_session):
        self.redis = redis_client
        self.db = db_session  # placeholder for a transactional DB handle
        self.dedup_ttl = 86400  # 24 hours
    
    def process_message(self, message):
        message_id = message.id
        
        # Check if already processed
        if self._is_duplicate(message_id):
            logger.info(f"Skipping duplicate: {message_id}")
            message.ack()  # Ack so the broker stops redelivering
            return
        
        try:
            with self.db.begin():
                # Process the message
                self._do_processing(message)
                
                # Record the ID in the same transaction, so the work and
                # the marker commit or roll back together
                self._record_processed(message_id)
            
            # Only after commit: cache the ID in Redis and acknowledge.
            # Writing Redis before the commit could mark a message as
            # processed even though the transaction rolled back.
            self.redis.setex(f"processed:{message_id}", self.dedup_ttl, "1")
            message.ack()
            
        except Exception as e:
            # Nothing was marked as processed - will retry
            logger.error(f"Processing failed: {e}")
            message.nack()
            raise
    
    def _is_duplicate(self, message_id: str) -> bool:
        """Check Redis for recently processed messages"""
        return bool(self.redis.exists(f"processed:{message_id}"))
    
    def _record_processed(self, message_id: str):
        """Persist the processed marker for durability"""
        self.db.execute(
            "INSERT INTO processed_messages (id, processed_at) VALUES (?, ?)",
            (message_id, datetime.now(timezone.utc))
        )

Stream Processing Implementation

Stream processing pipeline: a more complete sketch with Kafka (aiokafka) and asyncio:
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Any, List, Callable, Optional
from datetime import datetime, timedelta, timezone
from collections import defaultdict
import json
from abc import ABC, abstractmethod

# ============== Kafka Consumer with Backpressure ==============
class StreamProcessor:
    """Stream processor that bounds in-flight work (simple backpressure)"""
    
    def __init__(
        self,
        kafka_config: Dict[str, Any],
        topic: str,
        group_id: str,
        max_concurrent: int = 100,
        batch_size: int = 100,
        batch_timeout_ms: int = 1000
    ):
        self.kafka_config = kafka_config
        self.topic = topic
        self.group_id = group_id
        self.max_concurrent = max_concurrent
        self.batch_size = batch_size
        self.batch_timeout_ms = batch_timeout_ms
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.handlers: List[Callable] = []
        self.running = False
    
    def add_handler(self, handler: Callable) -> None:
        """Add processing handler"""
        self.handlers.append(handler)
    
    async def start(self) -> None:
        """Consume in batches, process each batch concurrently, then commit"""
        from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
        
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            group_id=self.group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            max_poll_records=self.batch_size
        )
        # One long-lived producer for the dead letter queue
        self.dlq_producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_config['bootstrap_servers']
        )
        
        await self.consumer.start()
        await self.dlq_producer.start()
        self.running = True
        
        try:
            while self.running:
                # The next batch is not fetched until this one is done,
                # so slow handlers throttle consumption
                batches = await self.consumer.getmany(
                    timeout_ms=self.batch_timeout_ms
                )
                messages = [m for records in batches.values() for m in records]
                if not messages:
                    continue
                
                # Note: concurrent handling gives up per-partition ordering
                await asyncio.gather(
                    *(self._process_with_backpressure(m) for m in messages)
                )
                
                # Every message was either handled or routed to the DLQ
                await self.consumer.commit()
        finally:
            await self.consumer.stop()
            await self.dlq_producer.stop()
    
    def stop(self) -> None:
        """Ask the consume loop to exit after the current batch"""
        self.running = False
    
    async def _process_with_backpressure(self, message) -> None:
        """Process one message while holding a concurrency slot"""
        async with self.semaphore:
            try:
                event = json.loads(message.value.decode())
                
                # Run all handlers
                for handler in self.handlers:
                    await handler(event)
                
            except Exception as e:
                await self._handle_error(message, e)
    
    async def _handle_error(self, message, error: Exception) -> None:
        """Send to dead letter queue on failure"""
        dlq_message = {
            "original_topic": self.topic,
            # errors="replace" keeps undecodable payloads from failing here too
            "original_message": message.value.decode(errors="replace"),
            "error": str(error),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "partition": message.partition,
            "offset": message.offset
        }
        
        await self.dlq_producer.send_and_wait(
            f"{self.topic}.dlq",
            json.dumps(dlq_message).encode()
        )

# ============== Windowed Aggregations ==============
@dataclass
class WindowedAggregator:
    """Tumbling window aggregation"""
    
    window_size: timedelta
    aggregations: Dict[datetime, Dict[str, Any]] = field(default_factory=dict)  # keyed by window start
    watermark_delay: timedelta = field(default_factory=lambda: timedelta(seconds=30))
    
    def add_event(self, event: Dict[str, Any], timestamp: datetime) -> None:
        """Add event to appropriate window"""
        window_key = self._get_window_key(timestamp)
        
        if window_key not in self.aggregations:
            self.aggregations[window_key] = {
                "count": 0,
                "sum": 0.0,
                "min": float('inf'),
                "max": float('-inf'),
                "window_start": window_key,
                "window_end": window_key + self.window_size
            }
        
        agg = self.aggregations[window_key]
        value = event.get("value", 0)
        
        agg["count"] += 1
        agg["sum"] += value
        agg["min"] = min(agg["min"], value)
        agg["max"] = max(agg["max"], value)
    
    def _get_window_key(self, timestamp: datetime) -> datetime:
        """Get window start time for timestamp"""
        window_seconds = self.window_size.total_seconds()
        ts_seconds = timestamp.timestamp()
        window_start = (ts_seconds // window_seconds) * window_seconds
        # Keep the input's tzinfo so window bounds stay comparable with it
        return datetime.fromtimestamp(window_start, tz=timestamp.tzinfo)
    
    def get_closed_windows(self, watermark: datetime) -> List[Dict[str, Any]]:
        """Get windows that are past the watermark"""
        closed = []
        cutoff = watermark - self.watermark_delay
        
        for window_key, agg in list(self.aggregations.items()):
            if agg["window_end"] < cutoff:
                agg["avg"] = agg["sum"] / agg["count"] if agg["count"] > 0 else 0
                closed.append(agg)
                del self.aggregations[window_key]
        
        return closed

# ============== Exactly-Once Processing ==============
class ExactlyOnceProcessor:
    """
    Exactly-once semantics with transactional processing
    """
    
    def __init__(self, db_pool, redis_client):
        self.db_pool = db_pool
        self.redis = redis_client
        self.dedup_ttl = 86400 * 7  # 7 days
    
    async def process(
        self, 
        event: Dict[str, Any], 
        processor: Callable
    ) -> Optional[Any]:
        """Process event with exactly-once guarantee"""
        event_id = event.get("event_id")
        
        if not event_id:
            raise ValueError("Event must have event_id for exactly-once")
        
        # Check if already processed
        if await self._is_duplicate(event_id):
            return None
        
        # Acquire lock for this event
        lock_key = f"lock:event:{event_id}"
        lock_acquired = await self.redis.set(
            lock_key, "1", 
            ex=60,  # Lock timeout
            nx=True
        )
        
        if not lock_acquired:
            return None  # Another instance is processing
        
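        try:
            async with self.db_pool.acquire() as conn:
                async with conn.transaction():
                    # Claim the event first: the unique event_id makes the
                    # database the final arbiter of duplicates
                    status = await conn.execute("""
                        INSERT INTO processed_events (event_id, processed_at)
                        VALUES ($1, NOW())
                        ON CONFLICT (event_id) DO NOTHING
                    """, event_id)
                    
                    if status == "INSERT 0 0":
                        return None  # Already processed elsewhere
                    
                    # Side effects must go through conn so they commit
                    # or roll back together with the marker
                    result = await processor(event, conn)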
            
            # Also cache in Redis for fast lookups
            await self.redis.setex(
                f"processed:{event_id}", 
                self.dedup_ttl, 
                "1"
            )
            
            return result
            
        finally:
            await self.redis.delete(lock_key)
    
    async def _is_duplicate(self, event_id: str) -> bool:
        """Check Redis first, then DB"""
        # Fast path: check Redis
        if await self.redis.exists(f"processed:{event_id}"):
            return True
        
        # Slow path: check database
        async with self.db_pool.acquire() as conn:
            result = await conn.fetchval("""
                SELECT 1 FROM processed_events 
                WHERE event_id = $1
            """, event_id)
            
            if result:
                # Backfill Redis cache
                await self.redis.setex(
                    f"processed:{event_id}", 
                    self.dedup_ttl, 
                    "1"
                )
                return True
        
        return False

# ============== Stream Join Implementation ==============
class StreamJoiner:
    """Join two streams within a time window"""
    
    def __init__(
        self, 
        window_size: timedelta,
        left_key: str,
        right_key: str
    ):
        self.window_size = window_size
        self.left_key = left_key
        self.right_key = right_key
        self.left_buffer: Dict[str, List[Dict]] = defaultdict(list)
        self.right_buffer: Dict[str, List[Dict]] = defaultdict(list)
    
    def add_left(self, event: Dict[str, Any]) -> List[Dict]:
        """Add event from left stream, return any matches"""
        key = event.get(self.left_key)
        timestamp = datetime.fromisoformat(event["timestamp"])
        
        # Clean old events
        self._cleanup_buffer(self.left_buffer, timestamp)
        self._cleanup_buffer(self.right_buffer, timestamp)
        
        # Store in buffer
        self.left_buffer[key].append({
            "event": event,
            "timestamp": timestamp
        })
        
        # Find matches in right buffer
        matches = []
        for right_event in self.right_buffer.get(key, []):
            if abs((right_event["timestamp"] - timestamp).total_seconds()) < \
               self.window_size.total_seconds():
                matches.append({
                    "left": event,
                    "right": right_event["event"],
                    "join_timestamp": timestamp.isoformat()
                })
        
        return matches
    
    def add_right(self, event: Dict[str, Any]) -> List[Dict]:
        """Add event from right stream, return any matches"""
        key = event.get(self.right_key)
        timestamp = datetime.fromisoformat(event["timestamp"])
        
        # Clean old events
        self._cleanup_buffer(self.left_buffer, timestamp)
        self._cleanup_buffer(self.right_buffer, timestamp)
        
        # Store in buffer
        self.right_buffer[key].append({
            "event": event,
            "timestamp": timestamp
        })
        
        # Find matches in left buffer
        matches = []
        for left_event in self.left_buffer.get(key, []):
            if abs((left_event["timestamp"] - timestamp).total_seconds()) < \
               self.window_size.total_seconds():
                matches.append({
                    "left": left_event["event"],
                    "right": event,
                    "join_timestamp": timestamp.isoformat()
                })
        
        return matches
    
    def _cleanup_buffer(
        self, 
        buffer: Dict[str, List[Dict]], 
        current_time: datetime
    ) -> None:
        """Remove events outside the window"""
        cutoff = current_time - self.window_size
        
        for key in list(buffer.keys()):
            buffer[key] = [
                e for e in buffer[key] 
                if e["timestamp"] > cutoff
            ]
            if not buffer[key]:
                del buffer[key]
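
The exactly-once processor in the listing above depends on one idea: the event's side effect and the "processed" marker are written in the same database transaction, so a redelivered event is rejected rather than applied twice. The following is a minimal sketch of that idea, assuming an in-memory SQLite database as a stand-in for the real connection pool. The `accounts` table, the `process_once` function, and the event fields are hypothetical names used only for illustration.

```python
import sqlite3

# In-memory SQLite stands in for the real database (assumption for this sketch)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('acct-1', 100)")
conn.commit()


def process_once(conn, event):
    """Apply the event and record its ID atomically; return False for duplicates."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            # The primary key rejects an event_id that was already recorded
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)",
                (event["event_id"],),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (event["amount"], event["account_id"]),
            )
        return True
    except sqlite3.IntegrityError:
        return False


event = {"event_id": "evt-42", "account_id": "acct-1", "amount": 25}
first = process_once(conn, event)    # applied
second = process_once(conn, event)   # redelivery, skipped
balance = conn.execute(
    "SELECT balance FROM accounts WHERE id = 'acct-1'"
).fetchone()[0]
```

Here the uniqueness constraint, not the cache, is what makes the operation idempotent. A Redis cache in front of it only saves a database round trip on the common path.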

ETL vs ELT

┌─────────────────────────────────────────────────────────────────┐
│              ETL vs ELT                                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ETL (Extract-Transform-Load)                                  │
│  ────────────────────────────                                   │
│                                                                 │
│  Source → Extract → Transform → Load → Warehouse               │
│                         ↑                                       │
│                   Processing server                            │
│                                                                 │
│  • Transform BEFORE loading                                    │
│  • Need powerful ETL server                                    │
│  • Less storage in warehouse                                   │
│  • Harder to re-transform                                      │
│  • Tools: Informatica, Talend, SSIS                           │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  ELT (Extract-Load-Transform)                                  │
│  ────────────────────────────                                   │
│                                                                 │
│  Source → Extract → Load → Transform → Results                 │
│                       ↑         ↑                              │
│                 Raw storage   Warehouse does transform         │
│                                                                 │
│  • Load raw data first                                         │
│  • Transform using warehouse compute                           │
│  • Keep raw data (can re-transform)                           │
│  • Modern approach (cloud warehouses)                         │
│  • Tools: dbt, Fivetran + Snowflake/BigQuery                  │
│                                                                 │
│  MODERN DATA STACK:                                            │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐        │
│  │ Sources │ → │ Fivetran│ → │Snowflake│ → │   dbt   │        │
│  │ (APIs,  │   │ (Extract│   │ (Load)  │   │(Transform│       │
│  │  DBs)   │   │  + Load)│   │         │   │  in SQL) │       │
│  └─────────┘   └─────────┘   └─────────┘   └─────────┘        │
│                                    │                           │
│                                    ▼                           │
│                             ┌─────────────┐                    │
│                             │  Looker/    │                    │
│                             │  Tableau    │                    │
│                             └─────────────┘                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
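
To make the ELT flow in the diagram concrete, here is a small sketch under stated assumptions: SQLite stands in for a cloud warehouse, and the `raw_orders` and `orders_clean` tables are hypothetical. Raw rows are loaded untouched, and the transformation runs afterwards as SQL inside the warehouse.

```python
import sqlite3

# SQLite stands in for a cloud warehouse (assumption for this sketch)
warehouse = sqlite3.connect(":memory:")

# Extract + Load: land source rows untouched, everything as text
raw_rows = [
    ("o-1", "19.50", "completed"),
    ("o-2", "5.00", "cancelled"),
    ("o-3", "42.50", "completed"),
]
warehouse.execute(
    "CREATE TABLE raw_orders (order_id TEXT, amount TEXT, status TEXT)"
)
warehouse.executemany("INSERT INTO raw_orders VALUES (?, ?, ?)", raw_rows)

# Transform: done later, in SQL, using the warehouse's own compute
warehouse.execute("""
    CREATE TABLE orders_clean AS
    SELECT order_id, CAST(amount AS REAL) AS amount
    FROM raw_orders
    WHERE status = 'completed'
""")

clean = warehouse.execute(
    "SELECT order_id, amount FROM orders_clean ORDER BY order_id"
).fetchall()
raw_count = warehouse.execute("SELECT COUNT(*) FROM raw_orders").fetchone()[0]
```

Because `raw_orders` is kept, a changed business rule (for example, also counting refunded orders) only requires re-running the transform, not re-extracting from the source.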

Data Quality

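from datetime import datetime
from typing import Dict, List


class DataQualityError(Exception):
    """Raised when a check with severity "error" fails"""


class DataQualityChecker: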
    """
    Data quality checks for pipeline
    """
    
    def __init__(self):
        self.checks = []
    
    def add_check(self, check_func, name: str, severity: str = "error"):
        self.checks.append({
            "name": name,
            "func": check_func,
            "severity": severity
        })
    
    def validate(self, df) -> List[Dict]:
        results = []
        for check in self.checks:
            passed = check["func"](df)
            results.append({
                "check": check["name"],
                "passed": passed,
                "severity": check["severity"]
            })
            
            if not passed and check["severity"] == "error":
                raise DataQualityError(f"Check failed: {check['name']}")
        
        return results

# Example usage
checker = DataQualityChecker()

# Completeness: No null values in required columns
checker.add_check(
    lambda df: df[['user_id', 'order_id', 'amount']].notna().all().all(),
    "required_fields_not_null",
    severity="error"
)

# Freshness: Data is recent
checker.add_check(
    lambda df: (datetime.now() - df['created_at'].max()).total_seconds() < 24 * 3600,
    "data_freshness_24h",
    severity="warning"
)

# Uniqueness: No duplicate primary keys
checker.add_check(
    lambda df: df['order_id'].is_unique,
    "order_id_unique",
    severity="error"
)

# Range: Values within expected bounds
checker.add_check(
    lambda df: (df['amount'] > 0).all() and (df['amount'] < 1000000).all(),
    "amount_reasonable_range",
    severity="warning"
)

# Volume: Expected number of records
checker.add_check(
    lambda df: len(df) >= 1000,  # Expect at least 1000 orders per day
    "minimum_record_count",
    severity="warning"
)

Senior Interview Questions

Requirements clarification:
  • What metrics? (pageviews, conversions, revenue)
  • How real-time? (seconds vs minutes)
  • Scale? (events per second)
Architecture:
Events → Kafka → Flink/Spark Streaming → 
  ├─→ Pre-aggregated tables (ClickHouse/Druid)
  └─→ Real-time materialized views

Dashboard ← Query Layer ← Time-series DB
Key decisions:
  1. Pre-aggregate: Don’t query raw events for dashboards
  2. Materialized views: Update incrementally, not full recompute
  3. Time-series DB: Optimized for time-based queries
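
The first two decisions above (pre-aggregate, and update incrementally) can be sketched in a few lines. This is an illustration only: an in-process dictionary stands in for a pre-aggregated table in ClickHouse or Druid, and the one-minute bucket size and the names `apply_event` and `query` are assumptions.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 60  # one-minute buckets (assumption for this sketch)

# (metric, window_start_epoch) -> running count; stands in for a pre-aggregated table
aggregates = defaultdict(int)


def window_start(ts: datetime) -> int:
    """Floor a timestamp to the start of its window, as epoch seconds."""
    epoch = int(ts.timestamp())
    return epoch - (epoch % WINDOW_SECONDS)


def apply_event(event: dict) -> None:
    """Update only the bucket the event falls in; raw events are never rescanned."""
    ts = datetime.fromisoformat(event["timestamp"])
    aggregates[(event["metric"], window_start(ts))] += 1


def query(metric: str, start: datetime, end: datetime) -> int:
    """Dashboard query: sum pre-aggregated buckets whose start is in [start, end)."""
    lo, hi = int(start.timestamp()), int(end.timestamp())
    return sum(
        count for (m, w), count in aggregates.items()
        if m == metric and lo <= w < hi
    )


for e in [
    {"metric": "pageview", "timestamp": "2024-01-01T12:00:05+00:00"},
    {"metric": "pageview", "timestamp": "2024-01-01T12:00:40+00:00"},
    {"metric": "pageview", "timestamp": "2024-01-01T12:01:10+00:00"},
    {"metric": "conversion", "timestamp": "2024-01-01T12:01:30+00:00"},
]:
    apply_event(e)

noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
first_minute = query("pageview", noon, noon + timedelta(minutes=1))
two_minutes = query("pageview", noon, noon + timedelta(minutes=2))
```

The dashboard query touches one row per bucket rather than one row per event, which is why query cost stays flat as event volume grows.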
Schema evolution strategies:
  1. Backward compatible: New code reads old data
    • Add optional fields only
    • Don’t remove/rename fields
  2. Forward compatible: Old code reads new data
    • Ignore unknown fields
  3. Schema registry: Centralized schema versioning
    • Avro/Protobuf with Confluent Schema Registry
    • Validate compatibility on publish
Migration strategy:
  1. Deploy new schema (backward compatible)
  2. Backfill if needed
  3. Deploy new producers
  4. Old data still works
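
A reader that tolerates both directions of change might look like the sketch below. It is not tied to Avro or Protobuf. The `OrderV2` dataclass, the `decode` helper, and the field names are hypothetical, chosen to show an optional field with a default (backward compatibility) and unknown fields being ignored (forward compatibility).

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class OrderV2:
    # Fields present since v1
    order_id: str
    amount: float
    # Added in v2 as optional with a default, so v1 records still parse
    currency: Optional[str] = "USD"


def decode(record: Dict[str, Any]) -> OrderV2:
    """Keep known fields, drop unknown ones, and let defaults fill the gaps."""
    known = {f.name for f in fields(OrderV2)}
    return OrderV2(**{k: v for k, v in record.items() if k in known})


# Written by a v1 producer: no currency field
old_record = {"order_id": "o-1", "amount": 10.0}
# Written by a hypothetical newer producer: carries a field this reader does not know
new_record = {"order_id": "o-2", "amount": 5.0, "currency": "EUR", "coupon": "X1"}

old_order = decode(old_record)
new_order = decode(new_record)
```

Removing or renaming `order_id` would break this reader, which is the reason the list above restricts changes to adding optional fields.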
Layers:
  1. Bronze (Raw): Exact copy of source, append-only
  2. Silver (Cleaned): Deduplicated, validated, typed
  3. Gold (Business): Aggregated, ready for consumption
Technologies:
  • Storage: S3/ADLS with Delta Lake/Iceberg format
  • Compute: Spark/Databricks
  • Catalog: AWS Glue, Hive Metastore
  • Query: Athena, Presto, Trino
Best practices:
  • Partition by date for time-series data
  • Use columnar formats (Parquet)
  • Implement data quality checks at each layer
  • Track data lineage
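
The three layers can be illustrated with plain Python lists standing in for Delta Lake or Iceberg tables. This is a toy sketch: the record shape and the `to_silver` and `to_gold` functions are assumptions, and a real pipeline would run the same steps in Spark.

```python
from collections import defaultdict

# Bronze: raw records exactly as received, including a duplicate and a bad row
bronze = [
    {"order_id": "o-1", "user_id": "u-1", "amount": "20.0"},
    {"order_id": "o-1", "user_id": "u-1", "amount": "20.0"},  # duplicate delivery
    {"order_id": "o-2", "user_id": "u-2", "amount": "oops"},  # fails validation
    {"order_id": "o-3", "user_id": "u-1", "amount": "5.5"},
]


def to_silver(rows):
    """Deduplicate on order_id, validate, and cast amount to float."""
    seen, silver = set(), []
    for row in rows:
        if row["order_id"] in seen:
            continue
        try:
            amount = float(row["amount"])
        except ValueError:
            continue  # a real pipeline would route this row to a quarantine table
        seen.add(row["order_id"])
        silver.append({**row, "amount": amount})
    return silver


def to_gold(rows):
    """Aggregate revenue per user, ready for dashboards."""
    revenue = defaultdict(float)
    for row in rows:
        revenue[row["user_id"]] += row["amount"]
    return dict(revenue)


silver = to_silver(bronze)
gold = to_gold(silver)
```

Bronze is never modified, so if the validation rule in `to_silver` turns out to be wrong, silver and gold can be rebuilt from it.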
Backpressure: When the consumer can’t keep up with the producer.
Strategies:
  1. Buffering: Kafka naturally buffers in log
  2. Rate limiting: Limit producer rate
  3. Sampling: Process subset of events
  4. Auto-scaling: Add more consumers
  5. Load shedding: Drop low-priority events
Implementation:
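# Reactive Streams style backpressure
async def process_with_backpressure(stream, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = set()
    
    async def process_one(event):
        try:
            await process_event(event)
        finally:
            semaphore.release()
    
    async for event in stream:
        # Wait for a free slot BEFORE pulling more work from the stream
        await semaphore.acquire()
        task = asyncio.create_task(process_one(event))
        tasks.add(task)  # keep a reference until the task finishes
        task.add_done_callback(tasks.discard)
    
    await asyncio.gather(*tasks)  # drain in-flight events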
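
A bounded queue is another common way to get backpressure in asyncio: the producer is suspended whenever the queue is full, so it can never run more than `maxsize` events ahead of the consumer. The sketch below is illustrative. `run_pipeline`, the `None` sentinel, and the `sleep(0)` stand-in for real work are assumptions.

```python
import asyncio


async def run_pipeline(events, maxsize=5):
    queue = asyncio.Queue(maxsize=maxsize)
    processed = []
    max_depth = 0

    async def producer():
        nonlocal max_depth
        for event in events:
            await queue.put(event)  # suspends while the queue is full
            max_depth = max(max_depth, queue.qsize())
        await queue.put(None)  # sentinel: no more events

    async def consumer():
        while True:
            event = await queue.get()
            if event is None:
                break
            await asyncio.sleep(0)  # stand-in for real processing work
            processed.append(event)

    await asyncio.gather(producer(), consumer())
    return processed, max_depth


processed, max_depth = asyncio.run(run_pipeline(list(range(50)), maxsize=5))
```

This corresponds to the buffering strategy above with an explicit cap. Once the cap is reached, the choice is between slowing the producer (as here) or shedding load.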

Interview Deep-Dive

Strong Answer:
Fraud detection has a hard latency constraint: you must return an approve/deny decision before the payment gateway times out, typically within 100-500ms. Batch processing runs on bounded datasets with latency measured in minutes to hours, so by the time a batch job detects a fraudulent transaction, the money is already gone.
Architecture:
  • Ingestion: Kafka topic with transactions partitioned by merchant_id (keeps related transactions together for pattern detection). At 50K TPS with ~1KB per transaction event, that is 50 MB/sec = 4.32 TB/day of raw event data.
  • Stream processor: Apache Flink with event-time windowing. Each transaction triggers three parallel checks:
    1. Rule engine (sub-10ms): Hard-coded rules like “decline if amount exceeds 3x the user’s average transaction in the last 30 days.” This requires a pre-computed user profile in Redis (average transaction amount, transaction count, last transaction time). Redis lookup: ~1ms.
    2. Velocity check (sub-20ms): Count transactions per card in sliding windows (1 minute, 1 hour, 24 hours). Flink maintains this state in-memory with RocksDB state backend. If a card has 10+ transactions in 1 minute, flag it.
    3. ML model scoring (sub-50ms): Feature vector (transaction amount, merchant category, time of day, device fingerprint, geo-distance from last transaction) fed into a pre-trained model served via a low-latency inference service (TensorFlow Serving or ONNX Runtime). The model returns a fraud probability score.
  • Decision aggregation: Combine all three scores. If any hard rule triggers, decline. If ML score exceeds threshold (0.8), decline. If velocity check + ML score is borderline, route to human review queue.
  • Total pipeline latency budget: Kafka consume (5ms) + parallel checks (50ms worst case) + decision logic (2ms) + Kafka produce for downstream (5ms) = ~62ms end-to-end. Well within the 500ms gateway timeout.
Back-of-envelope for infrastructure: Flink needs to maintain windowed state for velocity checks. 100M active cards * 3 windows * ~200 bytes per window = 60 GB of state. This fits in a 10-node Flink cluster with 8 GB of state per node. Kafka: 50K TPS with 7 days retention = 50K * 86,400 * 7 * 1KB = 30 TB. That is a 30-broker Kafka cluster with 1 TB each, before replication.
Follow-up: Your ML model is retrained weekly on batch data, but fraud patterns shift within hours. How do you close this gap?
Use a lambda-style approach for the model. The weekly batch-trained model is the “base model” that captures long-term patterns. Layer an “online model” on top that retrains incrementally on confirmed fraud/not-fraud labels as they arrive (typically 24-48 hours after the transaction, when chargebacks come in). Flink can do online learning with frameworks like Apache Flink ML. The online model adjusts feature weights based on recent patterns. Combined score = 0.7 * base_model + 0.3 * online_model. This gives you the stability of batch training with the adaptiveness of online learning.
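
The velocity check described in this answer (flag a card with 10+ transactions in 1 minute) can be sketched without Flink as a per-card sliding window. This is a single-process illustration, not the Flink implementation. The `VelocityChecker` class is hypothetical, and it assumes timestamps arrive in order for each card.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class VelocityChecker:
    """Counts transactions per card inside a sliding time window."""

    def __init__(self, window: timedelta, max_count: int):
        self.window = window
        self.max_count = max_count
        self.seen = defaultdict(deque)  # card_id -> timestamps, oldest first

    def record(self, card_id: str, ts: datetime) -> bool:
        """Record a transaction; return True if the card should be flagged."""
        timestamps = self.seen[card_id]
        timestamps.append(ts)
        # Evict timestamps that have fallen out of the window
        while timestamps and timestamps[0] <= ts - self.window:
            timestamps.popleft()
        return len(timestamps) >= self.max_count


checker = VelocityChecker(window=timedelta(minutes=1), max_count=10)
start = datetime(2024, 1, 1, 12, 0, 0)

# Ten transactions, five seconds apart, on one card
flags = [
    checker.record("card-1", start + timedelta(seconds=5 * i))
    for i in range(10)
]
# A transaction two minutes later lands in an otherwise empty window
later = checker.record("card-1", start + timedelta(minutes=2))
```

Only the tenth transaction is flagged, and the count resets once earlier transactions age out. In Flink the same per-key deque would live in keyed state so that it survives restarts.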
Strong Answer:
The core trade-off is operational complexity vs. reprocessing capability.
Lambda architecture’s pain: You are maintaining two codepaths that must produce the same results: one in Spark (batch) and one in Flink or Kafka Streams (speed). Every business logic change requires updating both, testing both, and verifying they agree. In my experience, the batch and speed layers inevitably diverge, and debugging discrepancies is the single biggest time sink for the data engineering team.
Kappa architecture’s promise: One codebase, one processing engine (Flink), one source of truth (Kafka log). When you need to reprocess, replay the Kafka topic from the beginning through an updated version of your Flink job.
The critical question: can you replay fast enough?
  • Your nightly Spark batch takes 4 hours to process a full day of data. Assume 1 day of data = 10 TB.
  • In Kappa, reprocessing means replaying Kafka. Flink can typically process faster than real-time when reading from Kafka (no external I/O latency). If real-time throughput is 50 MB/sec, replay throughput might be 500 MB/sec (10x, limited by state checkpointing and sink writes). 10 TB / 500 MB/sec = 20,000 seconds = ~5.5 hours. Comparable to the 4-hour batch.
  • But what if you need to reprocess 30 days of data (a bug was found in the business logic 30 days ago)? That is 300 TB. Replay: 300 TB / 500 MB/sec = 600,000 seconds = ~7 days. With Lambda, you run the corrected Spark job against the data lake in 4 hours (it processes 30 days in parallel, not sequentially).
My recommendation: Migrate to Kappa if your reprocessing window is typically under 7 days and your data volume is manageable. Keep the data lake (S3/HDFS) as a backup: not a processing layer, just storage. If you need to reprocess more than 7 days, spin up a temporary Spark job against the data lake as an escape hatch. This gives you the simplicity of Kappa for 95% of cases with a safety net for the rare full-history reprocessing.
Follow-up: You move to Kappa. Your Flink job has a bug that corrupted 3 days of output data in your downstream analytics database. How do you recover?
This is the Kappa architecture’s Achilles heel, and why you keep the raw event log. Step 1: fix the bug and deploy the corrected Flink job. Step 2: reset the Flink consumer offset to 3 days ago and replay into a PARALLEL output (a new table or a staging database), not the production output. Step 3: validate the replayed output against known-good reference data. Step 4: atomically swap the production table to point to the corrected output (rename tables or update a view). The key: never replay directly into production until you have validated the output. Kafka’s retention policy must be set to keep at least 7-14 days of raw events to make this recovery possible. At 50 MB/sec * 86,400 sec * 14 days = ~60 TB of Kafka retention, this is expensive but essential for disaster recovery.
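
The replay arithmetic in this answer is easy to re-run with different assumptions. The helper below is a sketch using decimal units (1 TB = 1,000,000 MB). `replay_hours` is a hypothetical name, and the 500 MB/sec replay throughput is the answer's own estimate, not a measured figure.

```python
def replay_hours(data_tb: float, throughput_mb_per_sec: float) -> float:
    """Hours needed to replay data_tb at a sustained throughput (decimal units)."""
    seconds = (data_tb * 1_000_000) / throughput_mb_per_sec  # 1 TB = 1,000,000 MB
    return seconds / 3600


one_day = replay_hours(10, 500)        # 10 TB at 500 MB/sec
thirty_days = replay_hours(300, 500)   # 300 TB at 500 MB/sec
retention_tb = 50 * 86_400 * 14 / 1_000_000  # 50 MB/sec retained for 14 days

print(f"1 day of data:   {one_day:.1f} hours")
print(f"30 days of data: {thirty_days / 24:.1f} days")
print(f"14-day retention: {retention_tb:.1f} TB")
```

Replay time scales linearly with the amount of history, which is why the recommendation keeps a batch escape hatch for anything beyond roughly a week.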
Strong Answer:Ingestion layer (Kafka):
  • 1B events/day = 11,574 events/sec average, ~35K peak.
  • Average event size: 500 bytes. Throughput: 11,574 * 500 = 5.8 MB/sec average, 17.4 MB/sec peak.
  • Kafka retention: 7 days for hot replay. 7 * 86,400 * 11,574 * 500 bytes = 3.5 TB.
  • Kafka cluster: 3 brokers with replication factor 3. Each broker stores 3.5 TB / 3 * 3 (replication) = 3.5 TB. Use i3.xlarge instances (1 TB NVMe). Need 4 i3.xlarge per broker = 12 instances total. At ~$0.31/hr: $2,700/month.
Stream processing (Flink):
  • 5-minute tumbling windows for aggregations. State size: depends on aggregation dimensions. If aggregating by 1M unique keys with 200 bytes of state each = 200 MB of state. This is modest: a 3-node Flink cluster handles it easily. 3 m5.2xlarge at ~$0.38/hr: $820/month.
Long-term storage (S3):
  • 1B events * 500 bytes = 500 GB/day raw. Compressed with Parquet: ~100 GB/day.
  • 1 year: 100 GB * 365 = 36.5 TB.
  • S3 Standard: $0.023/GB/month. For 36.5 TB: $839/month.
  • After 90 days, move to S3 Glacier Instant Retrieval: $0.004/GB/month. Average cost drops to roughly $400/month.
Aggregation output (time-series database):
  • 5-minute aggregations for 1M keys = 288 aggregation points per key per day. 1M * 288 * 100 bytes = 28.8 GB/day. 1 year = 10.5 TB.
  • TimescaleDB on a db.r6g.4xlarge: ~$2,500/month. With compression (10x on time-series data): 1 TB actual storage.
Data transfer:
  • Kafka to Flink: same VPC, free.
  • Flink to S3: $0.00/GB within same region.
  • API reads from TimescaleDB: negligible.
Total monthly cost: Kafka ($2,700) + Flink ($820) + S3 ($400) + TimescaleDB ($2,500) + monitoring/overhead ($500) = ~$6,920/month or ~$83,000/year.
Follow-up: The business wants to cut costs by 50%. Where do you start?
The biggest cost is Kafka at $2,700/month, driven by the 7-day retention with replication. Reduce retention to 3 days (you have S3 for replay beyond that): saves ~$1,000. Switch from i3.xlarge to Graviton-based instances for 20% savings on Kafka and Flink: saves ~$700. Use spot instances for Flink (with checkpointing, the job can recover from interruptions): saves ~$400. Move from TimescaleDB managed to self-hosted on reserved instances: saves ~$800. Total savings: ~$2,900, bringing the cost to ~$4,000/month, a 42% reduction. To hit 50%, also evaluate whether 5-minute aggregation granularity is truly needed. If 15-minute windows are acceptable, Flink state and TimescaleDB storage drop by 3x, saving another ~$500.
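
Keeping an estimate like this in a small script makes it easy to re-run when an assumption changes. The sketch below only re-adds the figures quoted in this answer. The dictionary keys are illustrative labels, and none of the prices are looked up from a provider.

```python
# Ingestion sizing from the answer: 1B events/day at 500 bytes each
events_per_day = 1_000_000_000
event_bytes = 500
events_per_sec = events_per_day / 86_400
avg_mb_per_sec = events_per_sec * event_bytes / 1_000_000
retention_tb = 7 * events_per_day * event_bytes / 1e12  # 7 days, before replication

# Monthly figures (USD) as quoted in the estimate
monthly_costs = {
    "kafka": 2_700,
    "flink": 820,
    "s3": 400,
    "timescaledb": 2_500,
    "monitoring_overhead": 500,
}
# Savings from the follow-up answer
savings = {
    "kafka_retention_3_days": 1_000,
    "graviton_instances": 700,
    "flink_spot_instances": 400,
    "self_hosted_timescaledb": 800,
}

total = sum(monthly_costs.values())
total_savings = sum(savings.values())
reduced = total - total_savings
reduction_pct = 100 * total_savings / total

print(f"ingest: {events_per_sec:,.0f} events/sec, {avg_mb_per_sec:.1f} MB/sec")
print(f"baseline: ${total:,}/month (${total * 12:,}/year)")
print(f"after savings: ${reduced:,}/month ({reduction_pct:.0f}% reduction)")
```

Writing the line items as data also makes the gap to the 50% target explicit: the listed savings reach about 42%, so roughly $560/month more is needed.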