Capstone Project: Real-Time Analytics Pipeline
Module Duration: 6-8 hours
Focus: End-to-end production system
Prerequisites: All previous modules
Project Overview
Build a complete real-time analytics pipeline that ingests streaming e-commerce events, computes real-time metrics, generates recommendations, and surfaces insights through dashboards.
System Architecture
Data Sources
├── User Events (Kafka)
├── Product Catalog (PostgreSQL)
└── User Profiles (S3)
↓
Streaming Ingestion (Structured Streaming)
↓
Real-Time Processing
├── Event Enrichment
├── Sessionization
├── Feature Engineering
└── Anomaly Detection
↓
Machine Learning
├── Real-Time Recommendations
├── Churn Prediction
└── Customer Segmentation
↓
Storage & Analytics
├── Delta Lake (Bronze/Silver/Gold)
├── Aggregated Metrics
└── ML Model Registry
↓
Serving Layer
├── REST API
├── Real-Time Dashboard
└── Alert System
Part 1: Data Ingestion
Kafka Producer (Event Generator)
# event_generator.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime
class EventGenerator:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def generate_user_event(self):
"""Generate realistic user events"""
event_types = ['page_view', 'add_to_cart', 'purchase', 'search', 'remove_from_cart']
users = range(1, 10001) # 10k users
products = range(1, 1001) # 1k products
return {
'event_id': f"evt_{int(time.time() * 1000)}_{random.randint(0, 9999)}",
'event_type': random.choice(event_types),
'user_id': random.choice(users),
'product_id': random.choice(products),
'timestamp': datetime.utcnow().isoformat(),
'session_id': f"session_{random.randint(1, 5000)}",
'page_url': f"/product/{random.choice(products)}",
'device': random.choice(['mobile', 'desktop', 'tablet']),
'price': round(random.uniform(10, 1000), 2),
'quantity': random.randint(1, 5),
'referrer': random.choice(['google', 'facebook', 'direct', 'email']),
'location': {
'country': random.choice(['US', 'UK', 'CA', 'DE', 'FR']),
'city': random.choice(['New York', 'London', 'Toronto', 'Berlin', 'Paris'])
}
}
def run(self, events_per_second=100, duration_seconds=None):
"""Generate events continuously"""
start_time = time.time()
events_generated = 0
try:
while True:
if duration_seconds and (time.time() - start_time) > duration_seconds:
break
event = self.generate_user_event()
self.producer.send('user-events', value=event)
events_generated += 1
# Rate limiting
time.sleep(1.0 / events_per_second)
if events_generated % 1000 == 0:
print(f"Generated {events_generated} events")
except KeyboardInterrupt:
print("Stopping event generator...")
finally:
self.producer.close()
print(f"Total events generated: {events_generated}")
if __name__ == "__main__":
generator = EventGenerator(['localhost:9092'])
generator.run(events_per_second=100)
Structured Streaming Ingestion
# streaming_ingestion.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Initialize Spark with Delta Lake
spark = SparkSession.builder \
.appName("RealTimeAnalytics") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Define event schema
event_schema = StructType([
StructField("event_id", StringType(), False),
StructField("event_type", StringType(), False),
StructField("user_id", IntegerType(), False),
StructField("product_id", IntegerType(), False),
StructField("timestamp", TimestampType(), False),
StructField("session_id", StringType(), False),
StructField("page_url", StringType(), True),
StructField("device", StringType(), True),
StructField("price", DoubleType(), True),
StructField("quantity", IntegerType(), True),
StructField("referrer", StringType(), True),
StructField("location", StructType([
StructField("country", StringType(), True),
StructField("city", StringType(), True)
]), True)
])
# Read from Kafka
raw_events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 10000) \
.load()
# Parse JSON events
parsed_events = raw_events.select(
from_json(col("value").cast("string"), event_schema).alias("data"),
col("timestamp").alias("kafka_timestamp"),
col("offset"),
col("partition")
).select("data.*", "kafka_timestamp", "offset", "partition")
# Write to Bronze layer (raw data)
bronze_query = parsed_events.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/delta/checkpoints/bronze") \
.trigger(processingTime="10 seconds") \
.start("/delta/bronze/events")
Part 2: Real-Time Processing
Event Enrichment
# enrichment.py
from pyspark.sql.functions import broadcast, col
# Load dimension tables
products = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/ecommerce") \
.option("dbtable", "products") \
.option("user", "postgres") \
.option("password", "password") \
.load()
users = spark.read \
.format("parquet") \
.load("/data/users")
# Broadcast small tables
broadcast_products = broadcast(products)
broadcast_users = broadcast(users)
# Read from Bronze
bronze_events = spark.readStream \
.format("delta") \
.load("/delta/bronze/events")
# Enrich events
enriched = bronze_events \
.join(broadcast_products, "product_id") \
.join(broadcast_users, "user_id") \
.select(
"event_id",
"event_type",
"user_id",
"user_name",
"user_segment",
"user_lifetime_value",
"product_id",
"product_name",
"product_category",
"product_price",
"timestamp",
"session_id",
"device",
"price",
"quantity",
(col("price") * col("quantity")).alias("revenue"),
"referrer",
"location"
)
# Write to Silver layer (enriched data)
silver_query = enriched.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/delta/checkpoints/silver") \
.trigger(processingTime="10 seconds") \
.start("/delta/silver/enriched_events")
Session Analysis
# sessionization.py
# PySpark has no mapGroupsWithState; per-group session state is maintained
# with applyInPandasWithState (Spark 3.4+) instead.
from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType,
                               DoubleType, TimestampType, BooleanType)

# Schema of the emitted session summaries
session_schema = StructType([
    StructField("session_id", StringType()),
    StructField("user_id", IntegerType()),
    StructField("start_time", TimestampType()),
    StructField("end_time", TimestampType()),
    StructField("duration_seconds", DoubleType()),
    StructField("event_count", IntegerType()),
    StructField("page_views", IntegerType()),
    StructField("add_to_cart", IntegerType()),
    StructField("purchases", IntegerType()),
    StructField("total_revenue", DoubleType()),
    StructField("converted", BooleanType()),
])

# State kept per session_id between micro-batches
state_schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("start_time", TimestampType()),
    StructField("end_time", TimestampType()),
    StructField("event_count", IntegerType()),
    StructField("page_views", IntegerType()),
    StructField("add_to_cart", IntegerType()),
    StructField("purchases", IntegerType()),
    StructField("total_revenue", DoubleType()),
])

def update_session(key: Tuple[str], pdfs: Iterator[pd.DataFrame],
                   state: GroupState) -> Iterator[pd.DataFrame]:
    """Update session state with new events and emit a session summary"""
    (session_id,) = key

    if state.hasTimedOut:
        # Session inactive for 30 minutes: drop its state and emit nothing
        state.remove()
        return

    if state.exists:
        (user_id, start, end, events, page_views,
         add_to_cart, purchases, revenue) = state.get
    else:
        user_id, start, end = None, None, None
        events = page_views = add_to_cart = purchases = 0
        revenue = 0.0

    for pdf in pdfs:
        for row in pdf.itertuples(index=False):
            user_id = user_id if user_id is not None else int(row.user_id)
            start = row.timestamp if start is None else min(start, row.timestamp)
            end = row.timestamp if end is None else max(end, row.timestamp)
            events += 1
            # Count event types
            if row.event_type == "page_view":
                page_views += 1
            elif row.event_type == "add_to_cart":
                add_to_cart += 1
            elif row.event_type == "purchase":
                purchases += 1
                revenue += float(row.revenue)

    state.update((user_id, start, end, events, page_views,
                  add_to_cart, purchases, revenue))
    # Expire sessions after 30 minutes of inactivity
    state.setTimeoutDuration(30 * 60 * 1000)

    # Calculate session duration
    duration = (end - start).total_seconds() if start is not None else 0.0
    yield pd.DataFrame([{
        "session_id": session_id,
        "user_id": user_id,
        "start_time": start,
        "end_time": end,
        "duration_seconds": duration,
        "event_count": events,
        "page_views": page_views,
        "add_to_cart": add_to_cart,
        "purchases": purchases,
        "total_revenue": revenue,
        "converted": purchases > 0,
    }])

# Apply sessionization
sessions = enriched \
    .groupBy("session_id") \
    .applyInPandasWithState(
        update_session,
        outputStructType=session_schema,
        stateStructType=state_schema,
        outputMode="update",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
    )
Real-Time Metrics
# real_time_metrics.py
# Calculate real-time KPIs
metrics = enriched \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window("timestamp", "5 minutes", "1 minute"),
"product_category",
"user_segment",
"device"
) \
.agg(
count("*").alias("event_count"),
# Exact distinct counts are not supported in streaming aggregations
approx_count_distinct("user_id").alias("unique_users"),
approx_count_distinct("session_id").alias("unique_sessions"),
sum(when(col("event_type") == "page_view", 1).otherwise(0)).alias("page_views"),
sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
sum(when(col("event_type") == "purchase", col("revenue")).otherwise(0)).alias("total_revenue"),
avg(when(col("event_type") == "purchase", col("revenue"))).alias("avg_order_value")
) \
.withColumn("conversion_rate",
(col("purchases") / col("unique_sessions") * 100))
# Write to Gold layer (aggregated metrics)
gold_query = metrics.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/delta/checkpoints/gold") \
.trigger(processingTime="1 minute") \
.start("/delta/gold/metrics")
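Alerting Hook (Sketch)
The alert system in the serving layer can hang directly off this Gold stream. Below is a minimal sketch, assuming a hypothetical send_alert helper and an illustrative 1% conversion-rate threshold; it uses foreachBatch, so the same metrics stream feeds both the Gold table and the alerting query.
# alerting.py (sketch): send_alert and the 1% threshold are illustrative
from pyspark.sql.functions import col

def send_alert(message):
    # Placeholder: wire this up to Slack, PagerDuty, email, etc.
    print(f"[ALERT] {message}")

def check_alerts(batch_df, batch_id):
    """Inspect each micro-batch of Gold metrics for alert conditions"""
    low_conversion = batch_df.filter(col("conversion_rate") < 1.0).count()
    if low_conversion > 0:
        send_alert(f"{low_conversion} window(s) below 1% conversion (batch {batch_id})")

alert_query = metrics.writeStream \
    .foreachBatch(check_alerts) \
    .option("checkpointLocation", "/delta/checkpoints/alerts") \
    .trigger(processingTime="1 minute") \
    .start()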
Part 3: Machine Learning
Feature Engineering
# feature_engineering.py
# User features (last 7 days)
user_features = enriched \
.withWatermark("timestamp", "7 days") \
.groupBy(
window("timestamp", "7 days", "1 day"),
"user_id"
) \
.agg(
count("*").alias("total_events"),
approx_count_distinct("session_id").alias("session_count"),
sum(when(col("event_type") == "page_view", 1).otherwise(0)).alias("page_views"),
sum(when(col("event_type") == "add_to_cart", 1).otherwise(0)).alias("cart_additions"),
sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
sum("revenue").alias("total_spent"),
avg("revenue").alias("avg_order_value"),
approx_count_distinct("product_category").alias("categories_viewed"),
approx_count_distinct("product_id").alias("products_viewed"),
max("timestamp").alias("last_activity")
) \
.withColumn("conversion_rate",
col("purchases") / col("session_count")) \
.withColumn("cart_abandonment_rate",
(col("cart_additions") - col("purchases")) / col("cart_additions")) \
.withColumn("days_since_last_activity",
datediff(current_timestamp(), col("last_activity")))
Real-Time Recommendations
# recommendations.py
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, sum, when
# Prepare training data (batch)
interactions = spark.read \
.format("delta") \
.load("/delta/silver/enriched_events") \
.filter(col("event_type").isin(["page_view", "add_to_cart", "purchase"])) \
.withColumn("rating",
when(col("event_type") == "purchase", 3.0)
.when(col("event_type") == "add_to_cart", 2.0)
.otherwise(1.0)
) \
.groupBy("user_id", "product_id") \
.agg(sum("rating").alias("rating"))
# Train ALS model
als = ALS(
userCol="user_id",
itemCol="product_id",
ratingCol="rating",
rank=10,
maxIter=10,
regParam=0.1,
coldStartStrategy="drop"
)
model = als.fit(interactions)
# Generate top-10 recommendations per user and persist them to the Delta
# path that the serving API reads from
user_recs = model.recommendForAllUsers(10)
user_recs.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/delta/ml/recommendations")

# Real-time serving: Spark ML models cannot be invoked from inside a UDF or
# broadcast for per-row scoring, so the precomputed recommendations are
# attached to the stream with a stream-static join instead
recommendation_table = spark.read \
    .format("delta") \
    .load("/delta/ml/recommendations")

recommendations_stream = enriched \
    .filter(col("event_type") == "page_view") \
    .select("user_id", "timestamp") \
    .join(recommendation_table, "user_id", "left")
Churn Prediction
# churn_prediction.py
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, when
# Prepare features for churn prediction.
# Note: models cannot be fit on a streaming DataFrame, so train on a batch
# snapshot of these user features (e.g., recomputed from the Silver table).
churn_features = user_features.withColumn(
"is_churned",
when(col("days_since_last_activity") > 30, 1).otherwise(0)
)
# Feature vector
feature_cols = [
"session_count",
"page_views",
"cart_additions",
"purchases",
"total_spent",
"conversion_rate",
"cart_abandonment_rate",
"days_since_last_activity"
]
assembler = VectorAssembler(
inputCols=feature_cols,
outputCol="features"
)
# Train model
gbt = GBTClassifier(
featuresCol="features",
labelCol="is_churned",
maxIter=20,
maxDepth=5
)
pipeline = Pipeline(stages=[assembler, gbt])
model = pipeline.fit(churn_features)
# Real-time scoring: the fitted pipeline transforms the streaming feature frame
churn_scores = model.transform(user_features) \
    .select("user_id", "prediction", "probability") \
    .withColumn("churn_risk",
        vector_to_array(col("probability")).getItem(1) * 100)
Part 4: Analytics and Serving
Batch Analytics
# batch_analytics.py
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Daily user activity report
daily_users = spark.read \
.format("delta") \
.load("/delta/gold/metrics") \
.filter(col("window.start").cast("date") == current_date()) \
.groupBy("product_category", "user_segment") \
.agg(
sum("unique_users").alias("total_users"),
sum("purchases").alias("total_purchases"),
sum("total_revenue").alias("total_revenue"),
avg("conversion_rate").alias("avg_conversion_rate")
) \
.orderBy(desc("total_revenue"))
# Product performance
product_performance = spark.read \
.format("delta") \
.load("/delta/silver/enriched_events") \
.filter(col("event_type") == "purchase") \
.groupBy("product_id", "product_name", "product_category") \
.agg(
count("*").alias("units_sold"),
sum("revenue").alias("total_revenue"),
countDistinct("user_id").alias("unique_buyers")
) \
.withColumn("rank",
dense_rank().over(
Window.partitionBy("product_category")
.orderBy(desc("total_revenue"))
)
) \
.filter(col("rank") <= 10)
# Save reports
daily_users.write \
.format("delta") \
.mode("overwrite") \
.save("/delta/reports/daily_users")
product_performance.write \
.format("delta") \
.mode("overwrite") \
.save("/delta/reports/top_products")
REST API
# api.py
from flask import Flask, jsonify, request
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
app = Flask(__name__)
# Initialize Spark
spark = SparkSession.builder \
.appName("AnalyticsAPI") \
.getOrCreate()
@app.route('/metrics/realtime', methods=['GET'])
def get_realtime_metrics():
"""Get latest real-time metrics"""
metrics = spark.read \
.format("delta") \
.load("/delta/gold/metrics") \
.orderBy(desc("window.end")) \
.limit(100) \
.toPandas()
return jsonify(metrics.to_dict('records'))
@app.route('/user/<int:user_id>/recommendations', methods=['GET'])
def get_user_recommendations(user_id):
"""Get personalized recommendations"""
recs = spark.read \
.format("delta") \
.load("/delta/ml/recommendations") \
.filter(col("user_id") == user_id) \
.toPandas()
return jsonify(recs.to_dict('records'))
@app.route('/user/<int:user_id>/churn-risk', methods=['GET'])
def get_churn_risk(user_id):
"""Get churn prediction"""
risk = spark.read \
.format("delta") \
.load("/delta/ml/churn_scores") \
.filter(col("user_id") == user_id) \
.select("churn_risk", "last_activity") \
.toPandas()
return jsonify(risk.to_dict('records'))
@app.route('/dashboard/kpis', methods=['GET'])
def get_dashboard_kpis():
"""Get dashboard KPIs"""
kpis = spark.sql("""
SELECT
SUM(unique_users) as total_users,
SUM(purchases) as total_purchases,
SUM(total_revenue) as total_revenue,
AVG(conversion_rate) as avg_conversion_rate
FROM delta.`/delta/gold/metrics`
WHERE window.start >= CURRENT_DATE - INTERVAL 1 DAY
""").toPandas()
return jsonify(kpis.to_dict('records')[0])
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
Monitoring Dashboard
# dashboard.py
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, date_sub

spark = SparkSession.builder \
    .appName("AnalyticsDashboard") \
    .getOrCreate()

def create_dashboard():
    """Create real-time analytics dashboard"""
    # Read the last 7 days of metrics and flatten the window struct for plotting
    metrics = spark.read \
        .format("delta") \
        .load("/delta/gold/metrics") \
        .filter(col("window.start") >= date_sub(current_date(), 7)) \
        .select(
            col("window.start").alias("window_start"),
            "product_category",
            "unique_users",
            "total_revenue",
            "conversion_rate"
        ) \
        .toPandas()
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=(
'Revenue Trend',
'Conversion Rate',
'User Engagement',
'Product Categories'
)
)
# Revenue trend
fig.add_trace(
go.Scatter(
x=metrics['window_start'],
y=metrics['total_revenue'],
mode='lines+markers',
name='Revenue'
),
row=1, col=1
)
# Conversion rate
fig.add_trace(
go.Scatter(
x=metrics['window_start'],
y=metrics['conversion_rate'],
mode='lines+markers',
name='Conversion %'
),
row=1, col=2
)
# User engagement
fig.add_trace(
go.Bar(
x=metrics['window_start'],
y=metrics['unique_users'],
name='Active Users'
),
row=2, col=1
)
# Category distribution
category_sales = metrics.groupby('product_category')['total_revenue'].sum()
fig.add_trace(
go.Pie(
labels=category_sales.index,
values=category_sales.values,
name='Categories'
),
row=2, col=2
)
fig.update_layout(height=800, showlegend=True, title_text="Real-Time Analytics Dashboard")
return fig
# Update dashboard every minute
if __name__ == "__main__":
from dash import Dash, dcc, html
from dash.dependencies import Input, Output
app = Dash(__name__)
app.layout = html.Div([
html.H1("E-Commerce Real-Time Analytics"),
dcc.Graph(id='dashboard'),
dcc.Interval(
id='interval-component',
interval=60*1000, # Update every minute
n_intervals=0
)
])
@app.callback(
Output('dashboard', 'figure'),
Input('interval-component', 'n_intervals')
)
def update_dashboard(n):
return create_dashboard()
app.run_server(debug=True)
Part 5: Deployment and Testing
Docker Compose Setup
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single-broker setup: internal topics can only be replicated once
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
postgres:
image: postgres:14
environment:
POSTGRES_DB: ecommerce
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
spark-master:
image: bitnami/spark:3.5.0
environment:
- SPARK_MODE=master
- SPARK_MASTER_HOST=spark-master
ports:
- "8080:8080"
- "7077:7077"
spark-worker:
image: bitnami/spark:3.5.0
depends_on:
- spark-master
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=4G
- SPARK_WORKER_CORES=2
deploy:
replicas: 3
volumes:
postgres_data:
Testing Suite
# test_pipeline.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
@pytest.fixture(scope="session")
def spark():
return SparkSession.builder \
.appName("PipelineTests") \
.master("local[*]") \
.getOrCreate()
def test_event_ingestion(spark):
"""Test event parsing and validation"""
test_events = [
{
"event_id": "test_001",
"event_type": "purchase",
"user_id": 1,
"product_id": 100,
"timestamp": "2024-01-01T12:00:00",
"revenue": 50.0
}
]
df = spark.createDataFrame(test_events)
assert df.count() == 1
assert df.filter(col("event_type") == "purchase").count() == 1
def test_enrichment(spark):
"""Test event enrichment logic"""
events = spark.createDataFrame([
(1, 100, 50.0)
], ["user_id", "product_id", "revenue"])
users = spark.createDataFrame([
(1, "John", "premium")
], ["user_id", "user_name", "segment"])
enriched = events.join(users, "user_id")
assert enriched.count() == 1
assert enriched.select("user_name").collect()[0][0] == "John"
def test_metrics_calculation(spark):
"""Test real-time metrics aggregation"""
events = spark.createDataFrame([
("2024-01-01 12:00:00", "purchase", 1, 100, 50.0),
("2024-01-01 12:01:00", "purchase", 2, 101, 75.0),
("2024-01-01 12:02:00", "page_view", 3, 102, 0.0)
], ["timestamp", "event_type", "user_id", "product_id", "revenue"])
metrics = events \
.groupBy("event_type") \
.agg(
count("*").alias("count"),
sum("revenue").alias("total_revenue")
)
purchases = metrics.filter(col("event_type") == "purchase").collect()[0]
assert purchases["count"] == 2
assert purchases["total_revenue"] == 125.0
def test_recommendations(spark):
"""Test recommendation generation"""
# Implementation of recommendation tests
pass
if __name__ == "__main__":
pytest.main([__file__])
Summary and Next Steps
What You’ve Built
A production-grade real-time analytics pipeline including:
- Data Ingestion: Kafka streaming with fault tolerance
- Processing: Multi-layer Delta Lake architecture (Bronze/Silver/Gold)
- Analytics: Real-time metrics and aggregations
- Machine Learning: Recommendations and churn prediction
- Serving: REST API and interactive dashboard
- Operations: Monitoring, logging, and testing
Production Readiness Checklist
- Configure proper resource allocation
- Set up monitoring and alerting
- Implement data quality checks (see the sketch after this checklist)
- Add comprehensive error handling
- Configure checkpointing and recovery
- Set up CI/CD pipeline
- Implement security (authentication/authorization)
- Add data retention policies
- Configure auto-scaling
- Set up disaster recovery
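For the data-quality item above, here is a minimal sketch of a check that fits between the Bronze and Silver layers: validate required keys and value ranges against the event schema defined earlier, and quarantine rejected records instead of silently dropping them. The quarantine paths and validation rules are illustrative.
# data_quality.py (sketch): quarantine paths and validation rules are illustrative
from pyspark.sql.functions import col

bronze_events = spark.readStream \
    .format("delta") \
    .load("/delta/bronze/events")

# Records that pass basic validation continue into enrichment
valid_events = bronze_events.filter(
    col("event_id").isNotNull() &
    col("user_id").isNotNull() &
    col("product_id").isNotNull() &
    (col("price") >= 0) &
    (col("quantity") > 0)
)

# Rejected records are preserved for inspection
invalid_events = bronze_events.filter(
    col("event_id").isNull() |
    col("user_id").isNull() |
    col("product_id").isNull() |
    (col("price") < 0) |
    (col("quantity") <= 0)
)

quarantine_query = invalid_events.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/checkpoints/quarantine") \
    .start("/delta/quarantine/events")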
Performance Optimizations
- Partitioning: Optimize Delta table partitioning
- Caching: Cache frequently accessed dimension tables
- Shuffles: Minimize shuffle operations
- File Size: Compact small files regularly
- Z-Ordering: Optimize for common query patterns (see the maintenance sketch below)
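For the compaction and Z-ordering items, a sketch of a periodic maintenance job, assuming a Delta Lake release that supports OPTIMIZE, ZORDER BY, and VACUUM on path-based tables; the choice of Z-order columns follows the filters used by the reports and API in this project.
# delta_maintenance.py (sketch): run on a schedule, not inside the streaming jobs
# Compact the small files produced by frequent streaming writes
spark.sql("OPTIMIZE delta.`/delta/silver/enriched_events`")

# Co-locate data on the columns most dashboards and reports filter by
spark.sql("OPTIMIZE delta.`/delta/gold/metrics` ZORDER BY (product_category, user_segment)")

# Remove files no longer referenced by the table (default retention: 7 days)
spark.sql("VACUUM delta.`/delta/silver/enriched_events`")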
Extensions and Improvements
- Advanced ML: Implement more sophisticated models
- A/B Testing: Add experimentation framework
- Anomaly Detection: Real-time fraud detection (a minimal sketch follows this list)
- Personalization: Enhanced recommendation algorithms
- Multi-Region: Deploy across multiple regions
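For the anomaly-detection extension (also listed in the architecture's processing layer), a minimal statistical sketch against the Gold metrics table; the three-sigma rule is illustrative, and a production system would use a more robust model.
# anomaly_detection.py (sketch): the 3-sigma threshold is illustrative
from pyspark.sql import functions as F

gold = spark.read.format("delta").load("/delta/gold/metrics")

# Baseline revenue statistics per category from historical windows
baseline = gold.groupBy("product_category").agg(
    F.avg("total_revenue").alias("mean_revenue"),
    F.stddev("total_revenue").alias("std_revenue")
)

# Flag windows whose revenue deviates more than three standard deviations
anomalies = gold.join(baseline, "product_category") \
    .withColumn("z_score",
        (F.col("total_revenue") - F.col("mean_revenue")) / F.col("std_revenue")) \
    .filter(F.abs(F.col("z_score")) > 3)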
This capstone project demonstrates all concepts learned throughout the course. Use it as a template for building your own data engineering solutions.