
Capstone Project: Real-Time Analytics Pipeline

Module Duration: 6-8 hours
Focus: End-to-end production system
Prerequisites: All previous modules

Project Overview

Build a complete real-time analytics pipeline that processes streaming e-commerce data, performs real-time analytics, generates recommendations, and provides insights through dashboards.

System Architecture

Data Sources
  ├── User Events (Kafka)
  ├── Product Catalog (PostgreSQL)
  └── User Profiles (S3)

Streaming Ingestion (Structured Streaming)

Real-Time Processing
  ├── Event Enrichment
  ├── Sessionization
  ├── Feature Engineering
  └── Anomaly Detection

Machine Learning
  ├── Real-Time Recommendations
  ├── Churn Prediction
  └── Customer Segmentation

Storage & Analytics
  ├── Delta Lake (Bronze/Silver/Gold)
  ├── Aggregated Metrics
  └── ML Model Registry

Serving Layer
  ├── REST API
  ├── Real-Time Dashboard
  └── Alert System

Part 1: Data Ingestion

Kafka Producer (Event Generator)

# event_generator.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime

class EventGenerator:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def generate_user_event(self):
        """Generate realistic user events"""
        event_types = ['page_view', 'add_to_cart', 'purchase', 'search', 'remove_from_cart']
        users = range(1, 10001)  # 10k users
        products = range(1, 1001)  # 1k products

        return {
            'event_id': f"evt_{int(time.time() * 1000)}_{random.randint(0, 9999)}",
            'event_type': random.choice(event_types),
            'user_id': random.choice(users),
            'product_id': random.choice(products),
            'timestamp': datetime.utcnow().isoformat(),
            'session_id': f"session_{random.randint(1, 5000)}",
            'page_url': f"/product/{random.choice(products)}",
            'device': random.choice(['mobile', 'desktop', 'tablet']),
            'price': round(random.uniform(10, 1000), 2),
            'quantity': random.randint(1, 5),
            'referrer': random.choice(['google', 'facebook', 'direct', 'email']),
            'location': {
                'country': random.choice(['US', 'UK', 'CA', 'DE', 'FR']),
                'city': random.choice(['New York', 'London', 'Toronto', 'Berlin', 'Paris'])
            }
        }

    def run(self, events_per_second=100, duration_seconds=None):
        """Generate events continuously"""
        start_time = time.time()
        events_generated = 0

        try:
            while True:
                if duration_seconds and (time.time() - start_time) > duration_seconds:
                    break

                event = self.generate_user_event()
                self.producer.send('user-events', value=event)
                events_generated += 1

                # Rate limiting
                time.sleep(1.0 / events_per_second)

                if events_generated % 1000 == 0:
                    print(f"Generated {events_generated} events")

        except KeyboardInterrupt:
            print("Stopping event generator...")
        finally:
            self.producer.close()
            print(f"Total events generated: {events_generated}")

if __name__ == "__main__":
    generator = EventGenerator(['localhost:9092'])
    generator.run(events_per_second=100)
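
If the broker does not auto-create topics, the topic can be created up front. A small sketch using kafka-python's admin client (the partition count is an arbitrary choice):

# create_topic.py (optional; most local setups auto-create topics)
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
admin.create_topics([NewTopic(name='user-events', num_partitions=3, replication_factor=1)])
admin.close()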

Structured Streaming Ingestion

# streaming_ingestion.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize Spark with Delta Lake
spark = SparkSession.builder \
    .appName("RealTimeAnalytics") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
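
# Note: the Kafka source and Delta Lake sink need their connector packages on
# the classpath (e.g. via spark.jars.packages or spark-submit --packages);
# the exact coordinates depend on your Spark and Delta Lake versions.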

# Define event schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("product_id", IntegerType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("session_id", StringType(), False),
    StructField("page_url", StringType(), True),
    StructField("device", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("referrer", StringType(), True),
    StructField("location", StructType([
        StructField("country", StringType(), True),
        StructField("city", StringType(), True)
    ]), True)
])

# Read from Kafka
raw_events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# Parse JSON events
parsed_events = raw_events.select(
    from_json(col("value").cast("string"), event_schema).alias("data"),
    col("timestamp").alias("kafka_timestamp"),
    col("offset"),
    col("partition")
).select("data.*", "kafka_timestamp", "offset", "partition")

# Write to Bronze layer (raw data)
bronze_query = parsed_events.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/checkpoints/bronze") \
    .trigger(processingTime="10 seconds") \
    .start("/delta/bronze/events")

Part 2: Real-Time Processing

Event Enrichment

# enrichment.py
from pyspark.sql.functions import broadcast, col

# Load dimension tables
products = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/ecommerce") \
    .option("dbtable", "products") \
    .option("user", "postgres") \
    .option("password", "password") \
    .load()

users = spark.read \
    .format("parquet") \
    .load("/data/users")

# Broadcast small tables
broadcast_products = broadcast(products)
broadcast_users = broadcast(users)

# Read from Bronze
bronze_events = spark.readStream \
    .format("delta") \
    .load("/delta/bronze/events")

# Enrich events
enriched = bronze_events \
    .join(broadcast_products, "product_id") \
    .join(broadcast_users, "user_id") \
    .select(
        "event_id",
        "event_type",
        "user_id",
        "user_name",
        "user_segment",
        "user_lifetime_value",
        "product_id",
        "product_name",
        "product_category",
        "product_price",
        "timestamp",
        "session_id",
        "device",
        "price",
        "quantity",
        (col("price") * col("quantity")).alias("revenue"),
        "referrer",
        "location"
    )

# Write to Silver layer (enriched data)
silver_query = enriched.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/checkpoints/silver") \
    .trigger(processingTime="10 seconds") \
    .start("/delta/silver/enriched_events")

Session Analysis

# sessionization.py
# Note: this listing is a conceptual sketch in the style of the Scala/Java
# mapGroupsWithState API; the PySpark equivalent (Spark 3.4+) is
# applyInPandasWithState, sketched after this listing.
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

class SessionState:
    def __init__(self):
        self.session_start = None
        self.session_end = None
        self.events = []
        self.page_views = 0
        self.add_to_cart = 0
        self.purchases = 0
        self.total_revenue = 0.0

def update_session(session_id, events, state):
    """Update session state with new events"""
    if state.exists:
        session = state.get()
    else:
        session = SessionState()

    for event in events:
        # Update session metrics
        if session.session_start is None:
            session.session_start = event.timestamp
        session.session_end = event.timestamp
        session.events.append(event)

        # Count event types
        if event.event_type == 'page_view':
            session.page_views += 1
        elif event.event_type == 'add_to_cart':
            session.add_to_cart += 1
        elif event.event_type == 'purchase':
            session.purchases += 1
            session.total_revenue += event.revenue

    # Set timeout for inactive sessions (30 minutes)
    state.update(session)
    state.setTimeoutDuration("30 minutes")

    # Calculate session duration
    if session.session_start and session.session_end:
        duration = (session.session_end - session.session_start).total_seconds()
    else:
        duration = 0

    return {
        'session_id': session_id,
        'user_id': events[0].user_id if events else None,
        'start_time': session.session_start,
        'end_time': session.session_end,
        'duration_seconds': duration,
        'event_count': len(session.events),
        'page_views': session.page_views,
        'add_to_cart': session.add_to_cart,
        'purchases': session.purchases,
        'total_revenue': session.total_revenue,
        'converted': session.purchases > 0
    }

# Apply sessionization (conceptual: mirrors the Scala/Java mapGroupsWithState
# API; see the PySpark applyInPandasWithState sketch below)
sessions = enriched \
    .groupByKey(lambda x: x.session_id) \
    .mapGroupsWithState(
        update_session,
        GroupStateTimeout.ProcessingTimeTimeout
    )
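
Because mapGroupsWithState is a Scala/Java API, a PySpark implementation would use applyInPandasWithState (Spark 3.4+). Below is a minimal sketch that keeps only a few of the session metrics above and assumes the column names of the enriched stream; the output and state schemas are illustrative.

# sessionization_pandas.py (PySpark sketch using applyInPandasWithState)
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

output_schema = "session_id STRING, event_count LONG, purchases LONG, total_revenue DOUBLE"
state_schema = "event_count LONG, purchases LONG, total_revenue DOUBLE"

def update_session_pandas(
        key: Tuple[str],
        batches: Iterator[pd.DataFrame],
        state: GroupState) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # Session expired without new events: drop its state and emit nothing
        state.remove()
        return

    # Restore previous state for this session, or start fresh
    if state.exists:
        event_count, purchases, total_revenue = state.get
    else:
        event_count, purchases, total_revenue = 0, 0, 0.0

    for pdf in batches:
        event_count += len(pdf)
        is_purchase = pdf["event_type"] == "purchase"
        purchases += int(is_purchase.sum())
        total_revenue += float(pdf.loc[is_purchase, "revenue"].sum())

    state.update((event_count, purchases, total_revenue))
    state.setTimeoutDuration("30 minutes")  # expire inactive sessions

    yield pd.DataFrame([{
        "session_id": key[0],
        "event_count": event_count,
        "purchases": purchases,
        "total_revenue": total_revenue,
    }])

sessions = enriched.groupBy("session_id").applyInPandasWithState(
    update_session_pandas,
    outputStructType=output_schema,
    stateStructType=state_schema,
    outputMode="update",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
)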

Real-Time Metrics

# real_time_metrics.py
from pyspark.sql.functions import *

# Calculate real-time KPIs
metrics = enriched \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "product_category",
        "user_segment",
        "device"
    ) \
    .agg(
        count("*").alias("event_count"),
        countDistinct("user_id").alias("unique_users"),
        countDistinct("session_id").alias("unique_sessions"),
        sum(when(col("event_type") == "page_view", 1).otherwise(0)).alias("page_views"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        sum(when(col("event_type") == "purchase", col("revenue")).otherwise(0)).alias("total_revenue"),
        avg(when(col("event_type") == "purchase", col("revenue"))).alias("avg_order_value")
    ) \
    .withColumn("conversion_rate",
        (col("purchases") / col("unique_sessions") * 100))

# Write to Gold layer (aggregated metrics)
gold_query = metrics.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/checkpoints/gold") \
    .trigger(processingTime="1 minute") \
    .start("/delta/gold/metrics")

Part 3: Machine Learning

Feature Engineering

# feature_engineering.py
from pyspark.sql.functions import *

# User features (last 7 days)
user_features = enriched \
    .withWatermark("timestamp", "7 days") \
    .groupBy(
        window("timestamp", "7 days", "1 day"),
        "user_id"
    ) \
    .agg(
        count("*").alias("total_events"),
        countDistinct("session_id").alias("session_count"),
        sum(when(col("event_type") == "page_view", 1).otherwise(0)).alias("page_views"),
        sum(when(col("event_type") == "add_to_cart", 1).otherwise(0)).alias("cart_additions"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        sum("revenue").alias("total_spent"),
        avg("revenue").alias("avg_order_value"),
        countDistinct("product_category").alias("categories_viewed"),
        countDistinct("product_id").alias("products_viewed"),
        max("timestamp").alias("last_activity")
    ) \
    .withColumn("conversion_rate",
        col("purchases") / col("session_count")) \
    .withColumn("cart_abandonment_rate",
        (col("cart_additions") - col("purchases")) / col("cart_additions")) \
    .withColumn("days_since_last_activity",
        datediff(current_timestamp(), col("last_activity")))
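
The churn model below is trained with pipeline.fit(), which requires a batch DataFrame, so a batch variant of these features is also useful. A sketch computed from the Silver table (batch_user_features is a name introduced here):

# Batch variant of the user features, computed from the Silver Delta table
batch_user_features = spark.read \
    .format("delta") \
    .load("/delta/silver/enriched_events") \
    .filter(col("timestamp") >= date_sub(current_date(), 7)) \
    .groupBy("user_id") \
    .agg(
        count("*").alias("total_events"),
        countDistinct("session_id").alias("session_count"),
        sum(when(col("event_type") == "page_view", 1).otherwise(0)).alias("page_views"),
        sum(when(col("event_type") == "add_to_cart", 1).otherwise(0)).alias("cart_additions"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        sum("revenue").alias("total_spent"),
        max("timestamp").alias("last_activity")
    ) \
    .withColumn("conversion_rate", col("purchases") / col("session_count")) \
    .withColumn("cart_abandonment_rate",
        (col("cart_additions") - col("purchases")) / col("cart_additions")) \
    .withColumn("days_since_last_activity",
        datediff(current_timestamp(), col("last_activity")))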

Real-Time Recommendations

# recommendations.py
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *

# Prepare training data (batch)
interactions = spark.read \
    .format("delta") \
    .load("/delta/silver/enriched_events") \
    .filter(col("event_type").isin(["page_view", "add_to_cart", "purchase"])) \
    .withColumn("rating",
        when(col("event_type") == "purchase", 3.0)
        .when(col("event_type") == "add_to_cart", 2.0)
        .otherwise(1.0)
    ) \
    .groupBy("user_id", "product_id") \
    .agg(sum("rating").alias("rating"))

# Train ALS model
als = ALS(
    userCol="user_id",
    itemCol="product_id",
    ratingCol="rating",
    rank=10,
    maxIter=10,
    regParam=0.1,
    coldStartStrategy="drop"
)

model = als.fit(interactions)

# Generate recommendations
user_recs = model.recommendForAllUsers(10)
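
# Optional sanity check (illustrative): hold out a split and evaluate RMSE
# on the implicit ratings to compare hyperparameters
train, test = interactions.randomSplit([0.8, 0.2], seed=42)
predictions = als.fit(train).transform(test)
rmse = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
).evaluate(predictions)
print(f"ALS validation RMSE: {rmse:.3f}")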

# Note: Spark ML models cannot be broadcast and called from inside a UDF
# (recommendForUserSubset needs the driver's SparkSession), so score on the
# driver for ad-hoc lookups and join precomputed recommendations onto the stream.
def get_recommendations(user_id):
    """Get recommendations for a single user (driver-side, e.g. behind the API)"""
    recs = model.recommendForUserSubset(
        spark.createDataFrame([(user_id,)], ["user_id"]),
        10
    )
    return recs.collect()[0].recommendations

# Apply to stream via a stream-static join with the precomputed recommendations
recommendations_stream = enriched \
    .filter(col("event_type") == "page_view") \
    .select("user_id", "timestamp") \
    .dropDuplicates(["user_id"]) \
    .join(user_recs, "user_id")
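
Because the REST API in Part 4 serves recommendations from Delta, the precomputed table can simply be persisted (the path matches the one the API reads):

# Persist precomputed recommendations for the serving layer
user_recs.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/ml/recommendations")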

Churn Prediction

# churn_prediction.py
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import *

# Prepare features for churn prediction
churn_features = user_features.withColumn(
    "is_churned",
    when(col("days_since_last_activity") > 30, 1).otherwise(0)
)

# Feature vector
feature_cols = [
    "session_count",
    "page_views",
    "cart_additions",
    "purchases",
    "total_spent",
    "conversion_rate",
    "cart_abandonment_rate",
    "days_since_last_activity"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# Train model
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="is_churned",
    maxIter=20,
    maxDepth=5
)

pipeline = Pipeline(stages=[assembler, gbt])
model = pipeline.fit(churn_features)

# Real-time scoring
# Real-time scoring (the fitted pipeline can also transform the streaming features)
churn_scores = model.transform(user_features) \
    .withColumn("churn_risk",
        vector_to_array(col("probability")).getItem(1) * 100) \
    .select("user_id", "last_activity", "prediction", "churn_risk")

Part 4: Analytics and Serving

Batch Analytics

# batch_analytics.py
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Daily user activity report
daily_users = spark.read \
    .format("delta") \
    .load("/delta/gold/metrics") \
    .filter(col("window.start").cast("date") == current_date()) \
    .groupBy("product_category", "user_segment") \
    .agg(
        sum("unique_users").alias("total_users"),
        sum("purchases").alias("total_purchases"),
        sum("total_revenue").alias("total_revenue"),
        avg("conversion_rate").alias("avg_conversion_rate")
    ) \
    .orderBy(desc("total_revenue"))

# Product performance
product_performance = spark.read \
    .format("delta") \
    .load("/delta/silver/enriched_events") \
    .filter(col("event_type") == "purchase") \
    .groupBy("product_id", "product_name", "product_category") \
    .agg(
        count("*").alias("units_sold"),
        sum("revenue").alias("total_revenue"),
        countDistinct("user_id").alias("unique_buyers")
    ) \
    .withColumn("rank",
        dense_rank().over(
            Window.partitionBy("product_category")
            .orderBy(desc("total_revenue"))
        )
    ) \
    .filter(col("rank") <= 10)

# Save reports
daily_users.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/reports/daily_users")

product_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/reports/top_products")

REST API

# api.py
from flask import Flask, jsonify, request
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

app = Flask(__name__)

# Initialize Spark
spark = SparkSession.builder \
    .appName("AnalyticsAPI") \
    .getOrCreate()

@app.route('/metrics/realtime', methods=['GET'])
def get_realtime_metrics():
    """Get latest real-time metrics"""
    metrics = spark.read \
        .format("delta") \
        .load("/delta/gold/metrics") \
        .orderBy(desc("window.end")) \
        .limit(100) \
        .toPandas()

    return jsonify(metrics.to_dict('records'))

@app.route('/user/<int:user_id>/recommendations', methods=['GET'])
def get_user_recommendations(user_id):
    """Get personalized recommendations"""
    recs = spark.read \
        .format("delta") \
        .load("/delta/ml/recommendations") \
        .filter(col("user_id") == user_id) \
        .toPandas()

    return jsonify(recs.to_dict('records'))

@app.route('/user/<int:user_id>/churn-risk', methods=['GET'])
def get_churn_risk(user_id):
    """Get churn prediction"""
    risk = spark.read \
        .format("delta") \
        .load("/delta/ml/churn_scores") \
        .filter(col("user_id") == user_id) \
        .select("churn_risk", "last_activity") \
        .toPandas()

    return jsonify(risk.to_dict('records'))

@app.route('/dashboard/kpis', methods=['GET'])
def get_dashboard_kpis():
    """Get dashboard KPIs"""
    kpis = spark.sql("""
        SELECT
            SUM(unique_users) as total_users,
            SUM(purchases) as total_purchases,
            SUM(total_revenue) as total_revenue,
            AVG(conversion_rate) as avg_conversion_rate
        FROM delta.`/delta/gold/metrics`
        WHERE window.start >= CURRENT_DATE - INTERVAL 1 DAY
    """).toPandas()

    return jsonify(kpis.to_dict('records')[0])

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
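
With the API running, the endpoints can be exercised from Python; the user ID here is arbitrary:

# api_client_example.py (assumes the API above is running on localhost:5000)
import requests

print(requests.get("http://localhost:5000/dashboard/kpis").json())
print(requests.get("http://localhost:5000/user/42/recommendations").json())
print(requests.get("http://localhost:5000/user/42/churn-risk").json())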

Monitoring Dashboard

# dashboard.py
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_sub, current_date

spark = SparkSession.builder.appName("AnalyticsDashboard").getOrCreate()

def create_dashboard():
    """Create real-time analytics dashboard"""

    # Read the last 7 days of metrics
    metrics = spark.read \
        .format("delta") \
        .load("/delta/gold/metrics") \
        .filter(col("window.start") >= date_sub(current_date(), 7)) \
        .withColumn("window_start", col("window.start")) \
        .toPandas()

    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'Revenue Trend',
            'Conversion Rate',
            'User Engagement',
            'Product Categories'
        )
    )

    # Revenue trend
    fig.add_trace(
        go.Scatter(
            x=metrics['window_start'],
            y=metrics['total_revenue'],
            mode='lines+markers',
            name='Revenue'
        ),
        row=1, col=1
    )

    # Conversion rate
    fig.add_trace(
        go.Scatter(
            x=metrics['window_start'],
            y=metrics['conversion_rate'],
            mode='lines+markers',
            name='Conversion %'
        ),
        row=1, col=2
    )

    # User engagement
    fig.add_trace(
        go.Bar(
            x=metrics['window_start'],
            y=metrics['unique_users'],
            name='Active Users'
        ),
        row=2, col=1
    )

    # Category distribution
    category_sales = metrics.groupby('product_category')['total_revenue'].sum()
    fig.add_trace(
        go.Pie(
            labels=category_sales.index,
            values=category_sales.values,
            name='Categories'
        ),
        row=2, col=2
    )

    fig.update_layout(height=800, showlegend=True, title_text="Real-Time Analytics Dashboard")
    return fig

# Update dashboard every minute
if __name__ == "__main__":
    from dash import Dash, dcc, html
    from dash.dependencies import Input, Output

    app = Dash(__name__)

    app.layout = html.Div([
        html.H1("E-Commerce Real-Time Analytics"),
        dcc.Graph(id='dashboard'),
        dcc.Interval(
            id='interval-component',
            interval=60*1000,  # Update every minute
            n_intervals=0
        )
    ])

    @app.callback(
        Output('dashboard', 'figure'),
        Input('interval-component', 'n_intervals')
    )
    def update_dashboard(n):
        return create_dashboard()

    app.run_server(debug=True)

Part 5: Deployment and Testing

Docker Compose Setup

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # localhost works for clients on the host machine; add a second listener
      # if other containers need to reach Kafka
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: ecommerce
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  spark-master:
    image: bitnami/spark:3.5.0
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_HOST=spark-master
    ports:
      - "8080:8080"
      - "7077:7077"

  spark-worker:
    image: bitnami/spark:3.5.0
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=2
    deploy:
      replicas: 3

volumes:
  postgres_data:

Testing Suite

# test_pipeline.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder \
        .appName("PipelineTests") \
        .master("local[*]") \
        .getOrCreate()

def test_event_ingestion(spark):
    """Test event parsing and validation"""
    test_events = [
        {
            "event_id": "test_001",
            "event_type": "purchase",
            "user_id": 1,
            "product_id": 100,
            "timestamp": "2024-01-01T12:00:00",
            "revenue": 50.0
        }
    ]

    df = spark.createDataFrame(test_events)
    assert df.count() == 1
    assert df.filter(col("event_type") == "purchase").count() == 1

def test_enrichment(spark):
    """Test event enrichment logic"""
    events = spark.createDataFrame([
        (1, 100, 50.0)
    ], ["user_id", "product_id", "revenue"])

    users = spark.createDataFrame([
        (1, "John", "premium")
    ], ["user_id", "user_name", "segment"])

    enriched = events.join(users, "user_id")
    assert enriched.count() == 1
    assert enriched.select("user_name").collect()[0][0] == "John"

def test_metrics_calculation(spark):
    """Test real-time metrics aggregation"""
    events = spark.createDataFrame([
        ("2024-01-01 12:00:00", "purchase", 1, 100, 50.0),
        ("2024-01-01 12:01:00", "purchase", 2, 101, 75.0),
        ("2024-01-01 12:02:00", "page_view", 3, 102, 0.0)
    ], ["timestamp", "event_type", "user_id", "product_id", "revenue"])

    metrics = events \
        .groupBy("event_type") \
        .agg(
            count("*").alias("count"),
            sum("revenue").alias("total_revenue")
        )

    purchases = metrics.filter(col("event_type") == "purchase").collect()[0]
    assert purchases["count"] == 2
    assert purchases["total_revenue"] == 125.0

def test_recommendations(spark):
    """Test recommendation generation"""
    # Implementation of recommendation tests
    pass

if __name__ == "__main__":
    pytest.main([__file__])

Summary and Next Steps

What You’ve Built

A production-grade real-time analytics pipeline including:
  1. Data Ingestion: Kafka streaming with fault tolerance
  2. Processing: Multi-layer Delta Lake architecture (Bronze/Silver/Gold)
  3. Analytics: Real-time metrics and aggregations
  4. Machine Learning: Recommendations and churn prediction
  5. Serving: REST API and interactive dashboard
  6. Operations: Monitoring, logging, and testing

Production Readiness Checklist

  • Configure proper resource allocation
  • Set up monitoring and alerting
  • Implement data quality checks
  • Add comprehensive error handling
  • Configure checkpointing and recovery
  • Set up CI/CD pipeline
  • Implement security (authentication/authorization)
  • Add data retention policies
  • Configure auto-scaling
  • Set up disaster recovery

Performance Optimizations

  1. Partitioning: Optimize Delta table partitioning
  2. Caching: Cache frequently accessed dimension tables
  3. Shuffles: Minimize shuffle operations
  4. File Size: Compact small files regularly
  5. Z-Ordering: Optimize for common query patterns (see the maintenance sketch below)
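
A sketch of items 4 and 5 using Delta Lake's OPTIMIZE/ZORDER and VACUUM SQL commands; the table path and retention period are examples:

# Compact small files and co-locate data for common query patterns
spark.sql("OPTIMIZE delta.`/delta/silver/enriched_events` ZORDER BY (user_id, `timestamp`)")

# Remove files no longer referenced by the table (example: 7-day retention)
spark.sql("VACUUM delta.`/delta/silver/enriched_events` RETAIN 168 HOURS")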

Extensions and Improvements

  1. Advanced ML: Implement more sophisticated models
  2. A/B Testing: Add experimentation framework
  3. Anomaly Detection: Real-time fraud detection
  4. Personalization: Enhanced recommendation algorithms
  5. Multi-Region: Deploy across multiple regions

Congratulations! You’ve completed the Spark mastery course and built a production-ready real-time analytics system.
This capstone project demonstrates all concepts learned throughout the course. Use it as a template for building your own data engineering solutions.