
Capstone Project: Building a Real-Time Analytics Platform

Project Duration: 20-30 hours
Learning Style: Hands-On Implementation + Architecture Design + Production Deployment
Outcome: A complete, production-ready Cassandra application demonstrating mastery of all concepts

Project Overview

You will design and implement SensorMetrics, a real-time IoT analytics platform that:
  • Ingests millions of sensor readings per second from 100,000+ devices
  • Stores time-series data with automatic TTL-based expiration
  • Provides real-time dashboards with low-latency queries (< 10ms p95)
  • Supports multi-datacenter deployment for geographic distribution
  • Handles device failures gracefully (late data, out-of-order events)
  • Operates 24/7 with 99.9% uptime and automated recovery
This capstone synthesizes concepts from all previous modules:
  • Architecture (consistent hashing, replication, vnodes)
  • Data Modeling (partition keys, clustering columns, time-series patterns)
  • Read/Write Internals (CommitLog, MemTable, SSTable, compaction)
  • Cluster Operations (gossip, failure detection, repair, multi-DC)
  • Performance (JVM tuning, monitoring, capacity planning)

Part 1: Requirements Analysis

Functional Requirements

1. Data Ingestion:
  • 100,000 sensors sending metrics every 10 seconds
  • Throughput: 10,000 writes/second sustained, 50,000 writes/second peak
  • Metrics: temperature, humidity, pressure, battery level
  • Late arrivals: Up to 5 minutes delayed data must be accepted
  • Deduplication: Same reading shouldn’t be stored twice
2. Data Retention:
  • Hot data: Last 7 days (low-latency queries)
  • Warm data: 8-30 days (moderate latency acceptable)
  • Cold data: 31-365 days (high latency acceptable, compressed)
  • Ancient data: > 365 days deleted automatically (TTL)
3. Query Patterns:
Pattern 1: Recent metrics for a single sensor
SELECT temperature, humidity, timestamp
FROM sensor_metrics
WHERE sensor_id = 'sensor-12345'
  AND timestamp > now() - 1h;
  • Frequency: Very high (1000s/sec)
  • Latency requirement: < 10ms p95
Pattern 2: Aggregate metrics for a sensor
SELECT sensor_id, date, avg_temperature, max_temperature, min_temperature
FROM sensor_daily_aggregates
WHERE sensor_id = 'sensor-12345'
  AND date >= '2023-11-01'
  AND date <= '2023-11-15';
  • Frequency: Moderate (100s/sec)
  • Latency requirement: < 50ms p95
Pattern 3: All sensors in a building
SELECT sensor_id, latest_temperature, latest_timestamp
FROM sensors_by_building
WHERE building_id = 'building-789';
  • Frequency: Low (10s/sec)
  • Latency requirement: < 100ms p95
Pattern 4: Anomaly alerts
SELECT sensor_id, temperature, timestamp
FROM sensor_metrics
WHERE sensor_id = 'sensor-12345'
  AND timestamp > now() - 10m
  AND temperature > 80  -- Filtering (requires ALLOW FILTERING or index)
  • Frequency: Low (monitoring system)
  • Latency requirement: < 1s

Non-Functional Requirements

Availability:
  • 99.9% uptime (< 9 hours downtime/year)
  • No single point of failure
  • Graceful degradation during node failures
Scalability:
  • Horizontal scaling to 1M+ sensors
  • Linear performance scaling with nodes
  • Automatic rebalancing on node addition
Performance:
  • Write latency: < 5ms p95
  • Read latency: < 10ms p95 (hot data)
  • Throughput: 50,000 writes/sec peak
Durability:
  • No data loss for acknowledged writes
  • Automatic replication (RF=3)
  • Regular backups with 7-day retention
Geographic Distribution:
  • Multi-datacenter deployment (US-East, EU-West)
  • Local reads/writes (< 50ms latency)
  • Eventual consistency across DCs (< 10 seconds)

Part 2: Architecture Design

High-Level Architecture

┌─────────────────────────────────────────────────────────────┐
│                     IoT Devices (100K+)                      │
└──────────────┬──────────────────────────────────────────────┘
               │ HTTP POST /metrics

┌─────────────────────────────────────────────────────────────┐
│              Load Balancer (HAProxy / AWS ALB)               │
└──────────────┬──────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│           Ingestion API (Python Flask / FastAPI)             │
│  • Validation                                                 │
│  • Deduplication (bloom filter)                              │
│  • Batching (100 writes → 1 batch)                           │
└──────────────┬──────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                    Cassandra Cluster                         │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Datacenter: us-east (3 nodes)                       │   │
│  │  Datacenter: eu-west (3 nodes)                       │   │
│  │  RF=3 per DC, CL=LOCAL_QUORUM                        │   │
│  └─────────────────────────────────────────────────────┘   │
└──────────────┬──────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│         Background Processing (Python / Spark)               │
│  • Compute daily aggregates                                  │
│  • Anomaly detection                                         │
│  • Data quality monitoring                                   │
└──────────────┬──────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│           Dashboard API (Python FastAPI)                     │
│  • Real-time queries                                         │
│  • Aggregated queries                                        │
│  • Caching (Redis)                                           │
└──────────────┬──────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│              Frontend (React / Grafana)                      │
│  • Real-time charts                                          │
│  • Alert dashboard                                           │
└─────────────────────────────────────────────────────────────┘

Cassandra Cluster Design

Cluster Configuration:
Datacenter: us-east
├── Node 1: 10.0.1.10 (64GB RAM, 8 cores, 4TB SSD)
├── Node 2: 10.0.1.11 (64GB RAM, 8 cores, 4TB SSD)
└── Node 3: 10.0.1.12 (64GB RAM, 8 cores, 4TB SSD)

Datacenter: eu-west
├── Node 4: 10.1.1.10 (64GB RAM, 8 cores, 4TB SSD)
├── Node 5: 10.1.1.11 (64GB RAM, 8 cores, 4TB SSD)
└── Node 6: 10.1.1.12 (64GB RAM, 8 cores, 4TB SSD)

Replication Factor: 3 per DC
Total Capacity: 6 nodes × 4TB = 24TB raw (8TB effective with RF=3)
Capacity Planning:
Data volume calculation:
• 100,000 sensors × 1 reading/10s = 10,000 writes/sec
• Each reading: ~200 bytes (sensor_id, timestamp, 4 metrics, metadata)
• Daily: 10,000 writes/s × 86,400s × 200 bytes = 172.8 GB/day
• 365 days: 172.8 GB × 365 = 63 TB/year

With RF=3: 63 TB × 3 = 189 TB/year total cluster data

Compression (TWCS): ~50% compression ratio
Actual: 189 TB × 0.5 = 94.5 TB/year

With 6 nodes: 94.5 TB / 6 ≈ 16 TB/node at steady state (the 365-day TTL caps further growth)
Current storage: 4 TB/node → disks fill in roughly 3 months!

Recommendation: 4 TB/node cannot hold a full year of raw data. Either provision larger
disks (e.g., 12 nodes × 16 TB ≈ 192 TB raw, keeping steady-state utilization near 50%
for compaction headroom) or shorten the raw-data TTL and rely on the daily-aggregates
table for older history.
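The arithmetic above is easy to re-run when assumptions change. A minimal Python sketch; the per-reading size, compression ratio, and retention are the assumptions stated in this plan:

# capacity_estimate.py -- re-derive the storage figures above; all inputs are assumptions from this plan
SENSORS = 100_000
INTERVAL_S = 10                 # one reading per sensor every 10 seconds
BYTES_PER_READING = 200         # assumed on-disk size before compression
RF = 3
COMPRESSION_RATIO = 0.5         # assumed TWCS + LZ4 compression
RETENTION_DAYS = 365

writes_per_sec = SENSORS / INTERVAL_S
daily_gb = writes_per_sec * 86_400 * BYTES_PER_READING / 1e9
steady_state_tb = daily_gb * RETENTION_DAYS / 1e3 * RF * COMPRESSION_RATIO

print(f"{writes_per_sec:,.0f} writes/s, {daily_gb:.1f} GB/day, "
      f"{steady_state_tb:.1f} TB on disk at steady state (RF={RF}, compressed)")
# -> 10,000 writes/s, 172.8 GB/day, 94.6 TB on disk at steady state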

Part 3: Data Model Design

Schema Design Process

Step 1: Identify Queries (done in Part 1)
Step 2: Design Tables (one table per query pattern)

Table 1: Raw Sensor Metrics (Query Pattern 1)

Query:
SELECT temperature, humidity, pressure, battery, timestamp
FROM sensor_metrics
WHERE sensor_id = ?
  AND timestamp > ?
  AND timestamp < ?;
Schema:
CREATE TABLE sensor_metrics (
    sensor_id text,           -- Partition key
    timestamp timestamp,      -- Clustering key (time-series)
    temperature decimal,
    humidity decimal,
    pressure decimal,
    battery_level int,        -- 0-100
    metadata map<text, text>, -- Extensible metadata
    PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)  -- Newest first
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_size': '1',
    'compaction_window_unit': 'DAYS'
  }
  AND default_time_to_live = 31536000;  -- 365 days
Design Decisions:
  1. Partition Key = sensor_id:
    • Queries are always for a specific sensor
    • Ensures even distribution (100K sensors)
    • Partition size: ~200 bytes/reading × 8,640 readings/day = 1.7 MB/day (acceptable)
  2. Clustering Key = timestamp DESC:
    • Time-series data sorted newest-first
    • Efficient range queries (WHERE timestamp > ?)
    • Descending order for “latest N readings” queries
  3. TWCS Compaction:
    • Time-bucketed data (1-day windows)
    • Entire SSTable dropped when TTL expires (ultra-fast)
    • Minimal compaction overhead
  4. TTL = 365 days:
    • Automatic deletion (no manual cleanup)
    • Aligns with retention policy
Partition Size Validation:
Max partition size = 200 bytes × (365 days × 8,640 readings/day)
                   = 200 bytes × 3,153,600 readings
                   ≈ 630 MB per sensor over the full retention period

630 MB is well above the commonly cited ~100 MB partition warning threshold. TWCS keeps
individual SSTables time-bounded, which softens the impact, but to stay under the
threshold add a time bucket (for example, a month) to the partition key.
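A hypothetical bucketed variant is sketched below; the table and column names (sensor_metrics_bucketed, month_bucket) are introduced here for illustration and are not part of the schema above. It caps each partition at roughly 630 MB / 12 ≈ 53 MB:

CREATE TABLE sensor_metrics_bucketed (
    sensor_id text,
    month_bucket text,        -- e.g., '2023-11', computed by the ingestion API
    timestamp timestamp,
    temperature decimal,
    humidity decimal,
    pressure decimal,
    battery_level int,
    PRIMARY KEY ((sensor_id, month_bucket), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_size': '1',
    'compaction_window_unit': 'DAYS'
  }
  AND default_time_to_live = 31536000;  -- 365 days

Reads then include month_bucket in the WHERE clause, and a time range that spans months issues one query per bucket.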

Table 2: Daily Aggregates (Query Pattern 2)

Query:
SELECT date, avg_temperature, max_temperature, min_temperature, sample_count
FROM sensor_daily_aggregates
WHERE sensor_id = ?
  AND date >= ?
  AND date <= ?;
Schema:
CREATE TABLE sensor_daily_aggregates (
    sensor_id text,
    date date,               -- Clustering key (YYYY-MM-DD)
    avg_temperature decimal,
    max_temperature decimal,
    min_temperature decimal,
    avg_humidity decimal,
    max_humidity decimal,
    min_humidity decimal,
    avg_pressure decimal,
    max_pressure decimal,
    min_pressure decimal,
    sample_count bigint,     -- Number of readings aggregated
    PRIMARY KEY (sensor_id, date)
) WITH CLUSTERING ORDER BY (date DESC)
  AND compaction = {
    'class': 'LeveledCompactionStrategy',
    'sstable_size_in_mb': 160
  }
  AND default_time_to_live = 94608000;  -- 3 years (longer retention for aggregates)
Design Decisions:
  1. Partition Key = sensor_id:
    • Matches query pattern
    • Small partitions (1 row/day = 365 rows/year)
  2. Clustering Key = date:
    • Enables efficient range queries
    • Sorted descending (recent aggregates first)
  3. LCS Compaction:
    • Aggregates updated daily (read-modify-write)
    • LCS handles updates efficiently
  4. Longer TTL (3 years):
    • Aggregates more valuable than raw data
    • Smaller storage footprint (365 rows/sensor vs 3M+ raw readings)

Table 3: Sensors by Building (Query Pattern 3)

Query:
SELECT sensor_id, latest_temperature, latest_humidity, latest_timestamp
FROM sensors_by_building
WHERE building_id = ?;
Schema:
CREATE TABLE sensors_by_building (
    building_id text,
    sensor_id text,          -- Clustering key (allows filtering by sensor)
    latest_temperature decimal,
    latest_humidity decimal,
    latest_pressure decimal,
    latest_battery_level int,
    latest_timestamp timestamp,
    sensor_location text,    -- e.g., "Floor 3, Room 301"
    PRIMARY KEY (building_id, sensor_id)
) WITH compaction = {
    'class': 'LeveledCompactionStrategy',
    'sstable_size_in_mb': 160
  };
Design Decisions:
  1. Partition Key = building_id:
    • Query retrieves all sensors in a building
    • Partition size: ~100 sensors/building × 200 bytes = 20 KB (tiny!)
  2. Clustering Key = sensor_id:
    • Sorted by sensor_id for easy lookup
    • Enables WHERE building_id = ? AND sensor_id = ? queries
  3. Denormalization:
    • Stores latest values (redundant with sensor_metrics)
    • Avoids scatter-gather query across all sensors
    • Updated on every sensor reading (write amplification accepted)
  4. No TTL:
    • Metadata table (doesn’t expire)
    • Updated in-place as new readings arrive

Table 4: Anomaly Events (Query Pattern 4)

Query:
SELECT sensor_id, event_time, event_type, metric_name, metric_value, threshold
FROM sensor_anomalies
WHERE sensor_id = ?
  AND event_time > ?;
Schema:
CREATE TABLE sensor_anomalies (
    sensor_id text,
    event_time timestamp,    -- When anomaly detected
    event_type text,         -- 'HIGH_TEMP', 'LOW_BATTERY', etc.
    metric_name text,
    metric_value decimal,
    threshold decimal,
    severity text,           -- 'WARNING', 'CRITICAL'
    PRIMARY KEY (sensor_id, event_time, event_type)
) WITH CLUSTERING ORDER BY (event_time DESC, event_type ASC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_size': '7',
    'compaction_window_unit': 'DAYS'
  }
  AND default_time_to_live = 7776000;  -- 90 days
Design Decisions:
  1. Composite Clustering Key:
    • event_time for time-series ordering
    • event_type to allow multiple anomalies at same timestamp
  2. TWCS with 7-day windows:
    • Anomalies are time-series data
    • Fast TTL-based expiration
  3. Shorter TTL (90 days):
    • Anomalies less valuable after resolution
    • Reduces storage

Complete Keyspace Definition

CREATE KEYSPACE sensor_platform WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'us-east': 3,
  'eu-west': 3
};

USE sensor_platform;

-- Tables defined above
CREATE TABLE sensor_metrics (...);
CREATE TABLE sensor_daily_aggregates (...);
CREATE TABLE sensors_by_building (...);
CREATE TABLE sensor_anomalies (...);

Part 4: Implementation

Phase 1: Ingestion API

Technology: Python FastAPI (async I/O for high throughput)
File: ingestion_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra import ConsistencyLevel
from cassandra.query import BatchStatement
from datetime import datetime, timedelta
from typing import List, Dict
import hashlib

app = FastAPI()

# Cassandra connection
cluster = Cluster(
    contact_points=['10.0.1.10', '10.0.1.11', '10.0.1.12'],
    port=9042,
    load_balancing_policy=TokenAwarePolicy(
        DCAwareRoundRobinPolicy(local_dc='us-east')
    ),
    protocol_version=4,
)
session = cluster.connect('sensor_platform')

# Prepared statements (performance optimization)
insert_metric_stmt = session.prepare("""
    INSERT INTO sensor_metrics
    (sensor_id, timestamp, temperature, humidity, pressure, battery_level)
    VALUES (?, ?, ?, ?, ?, ?)
    USING TTL 31536000
""")

update_building_stmt = session.prepare("""
    UPDATE sensors_by_building
    SET latest_temperature = ?,
        latest_humidity = ?,
        latest_pressure = ?,
        latest_battery_level = ?,
        latest_timestamp = ?
    WHERE building_id = ? AND sensor_id = ?
""")

# Deduplication bloom filter (in-memory, simple implementation)
# Production: Use Redis or Cassandra-based deduplication
recent_hashes = set()

class SensorReading(BaseModel):
    sensor_id: str
    building_id: str
    timestamp: datetime
    temperature: float
    humidity: float
    pressure: float
    battery_level: int

    @validator('timestamp')
    def timestamp_not_future(cls, v):
        if v > datetime.utcnow() + timedelta(minutes=5):
            raise ValueError('Timestamp cannot be > 5 minutes in future')
        return v

    @validator('timestamp')
    def timestamp_not_too_old(cls, v):
        if v < datetime.utcnow() - timedelta(days=7):
            raise ValueError('Timestamp too old (> 7 days)')
        return v

    @validator('battery_level')
    def battery_in_range(cls, v):
        if not 0 <= v <= 100:
            raise ValueError('Battery level must be 0-100')
        return v

@app.post("/metrics")
async def ingest_metrics(readings: List[SensorReading]):
    """
    Ingest batch of sensor readings.
    Expected batch size: 100-1000 readings per request.
    """
    if len(readings) > 1000:
        raise HTTPException(status_code=400, detail="Batch too large (max 1000)")

    # Deduplication check
    unique_readings = []
    for reading in readings:
        # Hash: sensor_id + timestamp (dedup key)
        hash_key = hashlib.md5(
            f"{reading.sensor_id}_{reading.timestamp.isoformat()}".encode()
        ).hexdigest()

        if hash_key not in recent_hashes:
            unique_readings.append(reading)
            recent_hashes.add(hash_key)

            # Bound the in-memory set size (set.pop() removes an arbitrary
            # element, not the oldest; production: use a TTL cache or Redis)
            if len(recent_hashes) > 100000:
                recent_hashes.pop()

    if not unique_readings:
        return {"status": "ok", "inserted": 0, "duplicates": len(readings)}

    # Batch insert at LOCAL_QUORUM (see design decisions below)
    batch = BatchStatement(consistency_level=ConsistencyLevel.LOCAL_QUORUM)

    for reading in unique_readings:
        # Insert raw metric
        batch.add(insert_metric_stmt, (
            reading.sensor_id,
            reading.timestamp,
            reading.temperature,
            reading.humidity,
            reading.pressure,
            reading.battery_level
        ))

        # Update building latest values (denormalization)
        batch.add(update_building_stmt, (
            reading.temperature,
            reading.humidity,
            reading.pressure,
            reading.battery_level,
            reading.timestamp,
            reading.building_id,
            reading.sensor_id
        ))

    # Execute batch with LOCAL_QUORUM (fast, consistent)
    session.execute(batch, timeout=10.0)

    # Async anomaly detection (fire-and-forget)
    # Production: Use message queue (Kafka/RabbitMQ)
    for reading in unique_readings:
        if reading.temperature > 80:
            _trigger_anomaly_alert(reading, 'HIGH_TEMP', 'temperature', 80)
        if reading.battery_level < 10:
            _trigger_anomaly_alert(reading, 'LOW_BATTERY', 'battery_level', 10)

    return {
        "status": "ok",
        "inserted": len(unique_readings),
        "duplicates": len(readings) - len(unique_readings)
    }

def _trigger_anomaly_alert(reading: SensorReading, event_type: str, metric_name: str, threshold: float):
    """Insert anomaly event (async background task in production)"""
    insert_anomaly = session.prepare("""
        INSERT INTO sensor_anomalies
        (sensor_id, event_time, event_type, metric_name, metric_value, threshold, severity)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """)

    severity = 'CRITICAL' if event_type == 'LOW_BATTERY' else 'WARNING'
    metric_value = getattr(reading, metric_name)  # e.g. reading.temperature or reading.battery_level

    session.execute(insert_anomaly, (
        reading.sensor_id,
        reading.timestamp,
        event_type,
        metric_name,
        metric_value,
        threshold,
        severity
    ))

@app.get("/health")
async def health_check():
    """Health check for load balancer"""
    try:
        session.execute("SELECT now() FROM system.local", timeout=2.0)
        return {"status": "healthy"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Cassandra unavailable: {e}")

if __name__ == "__main__":
    import uvicorn
    # Multiple workers require an import string, not an app object
    uvicorn.run("ingestion_api:app", host="0.0.0.0", port=8000, workers=4)
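For reference, a minimal client-side sketch of the batching this API expects; the endpoint URL and sensor values are illustrative:

# client_example.py -- illustrative batched POST to the ingestion API (URL and values are made up)
from datetime import datetime

import requests

readings = [
    {
        "sensor_id": f"sensor-{i:05d}",
        "building_id": "building-789",
        "timestamp": datetime.utcnow().isoformat(),  # naive UTC, matching the API's validators
        "temperature": 21.5,
        "humidity": 40.2,
        "pressure": 1013.2,
        "battery_level": 87,
    }
    for i in range(100)  # one request carries 100-1000 readings
]

resp = requests.post("http://localhost:8000/metrics", json=readings, timeout=10)
print(resp.json())  # e.g. {"status": "ok", "inserted": 100, "duplicates": 0}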
Key Design Decisions:
  1. Batching: Client sends 100-1000 readings per request (reduces network overhead)
  2. Prepared Statements: Parsed once on the server and reused, cutting per-request parsing overhead and latency
  3. Deduplication: Bloom filter prevents duplicate writes
  4. Validation: Pydantic models ensure data quality
  5. LOCAL_QUORUM: Fast writes to local DC, async replication to remote DC
  6. Denormalization: Update sensors_by_building in same batch (eventual consistency)
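The in-memory set above is per-process and loses state on restart. A sketch of the Redis-based deduplication the code comment in ingestion_api.py alludes to; the key prefix and TTL are assumptions:

# redis_dedup.py -- sketch of shared deduplication via Redis SET NX (key layout and TTL are assumptions)
import hashlib

import redis

r = redis.Redis(host="localhost", port=6379, db=1)
DEDUP_TTL_S = 600  # a bit longer than the 5-minute late-arrival window

def is_duplicate(sensor_id: str, timestamp_iso: str) -> bool:
    """True if this (sensor_id, timestamp) pair was already accepted by any API worker."""
    digest = hashlib.md5(f"{sensor_id}_{timestamp_iso}".encode()).hexdigest()
    # SET NX EX is atomic: only the first caller gets a truthy result back
    first_time = r.set(f"dedup:{digest}", 1, nx=True, ex=DEDUP_TTL_S)
    return not first_time

Because the check is atomic in Redis, multiple API workers (and restarts) agree on which request inserted a reading first.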

Phase 2: Background Aggregation

Technology: Python (scheduled job or Spark for larger scale)
File: daily_aggregator.py
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from datetime import datetime, timedelta
from statistics import mean
import schedule
import time

cluster = Cluster(['10.0.1.10'])
session = cluster.connect('sensor_platform')

def compute_daily_aggregates(date: datetime.date):
    """
    Compute daily aggregates for all sensors.
    Runs once per day at 00:05 (after day completes).
    """
    print(f"Computing aggregates for {date}...")

    # Get all unique sensors (production: maintain sensor registry)
    sensors_query = "SELECT DISTINCT sensor_id FROM sensor_metrics LIMIT 100000"
    sensors = session.execute(sensors_query)

    insert_aggregate = session.prepare("""
        INSERT INTO sensor_daily_aggregates
        (sensor_id, date, avg_temperature, max_temperature, min_temperature,
         avg_humidity, max_humidity, min_humidity,
         avg_pressure, max_pressure, min_pressure, sample_count)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """)

    # Prepare once, outside the per-sensor loop (re-preparing per sensor is wasted work)
    readings_query = session.prepare("""
        SELECT temperature, humidity, pressure
        FROM sensor_metrics
        WHERE sensor_id = ? AND timestamp >= ? AND timestamp <= ?
    """)

    sensor_count = 0
    for sensor_row in sensors:
        sensor_id = sensor_row.sensor_id
        sensor_count += 1

        # Query all readings for this sensor on this date
        start_ts = datetime.combine(date, datetime.min.time())
        end_ts = datetime.combine(date, datetime.max.time())

        readings = list(session.execute(readings_query, (sensor_id, start_ts, end_ts)))

        if not readings:
            continue  # No data for this sensor today

        # Compute aggregates
        temperatures = [r.temperature for r in readings]
        humidities = [r.humidity for r in readings]
        pressures = [r.pressure for r in readings]

        session.execute(insert_aggregate, (
            sensor_id,
            date,
            mean(temperatures), max(temperatures), min(temperatures),
            mean(humidities), max(humidities), min(humidities),
            mean(pressures), max(pressures), min(pressures),
            len(readings)
        ))

    print(f"Aggregates computed for {date}: {len(list(sensors))} sensors")

def daily_job():
    """Run daily at 00:05"""
    yesterday = (datetime.utcnow() - timedelta(days=1)).date()
    compute_daily_aggregates(yesterday)

# Schedule job
schedule.every().day.at("00:05").do(daily_job)

if __name__ == "__main__":
    print("Daily aggregator started...")
    while True:
        schedule.run_pending()
        time.sleep(60)
Production Optimization: Use Apache Spark for parallel aggregation:
# spark_aggregator.py (production version)
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, max, min, count, lit

spark = SparkSession.builder \
    .appName("SensorAggregator") \
    .config("spark.cassandra.connection.host", "10.0.1.10") \
    .getOrCreate()

date = "2023-11-15"

# Read from Cassandra
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="sensor_metrics", keyspace="sensor_platform") \
    .load() \
    .filter(f"timestamp >= '{date} 00:00:00' AND timestamp < '{date} 23:59:59'")

# Compute aggregates
aggregates = df.groupBy("sensor_id").agg(
    mean("temperature").alias("avg_temperature"),
    max("temperature").alias("max_temperature"),
    min("temperature").alias("min_temperature"),
    mean("humidity").alias("avg_humidity"),
    max("humidity").alias("max_humidity"),
    min("humidity").alias("min_humidity"),
    mean("pressure").alias("avg_pressure"),
    max("pressure").alias("max_pressure"),
    min("pressure").alias("min_pressure"),
    count("*").alias("sample_count")
).withColumn("date", lit(date))

# Write back to Cassandra
aggregates.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="sensor_daily_aggregates", keyspace="sensor_platform") \
    .mode("append") \
    .save()

Phase 3: Dashboard API

File: dashboard_api.py
from fastapi import FastAPI, HTTPException
from cassandra.cluster import Cluster
from datetime import datetime, timedelta
from typing import List, Optional
import json
import redis

app = FastAPI()

# Cassandra
cluster = Cluster(['10.0.1.10'])
session = cluster.connect('sensor_platform')

# Redis cache (for expensive queries)
cache = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
CACHE_TTL = 300  # 5 minutes

@app.get("/sensors/{sensor_id}/recent")
async def get_recent_metrics(sensor_id: str, hours: int = 1):
    """Get recent metrics for a sensor (last N hours)"""
    if hours > 24:
        raise HTTPException(status_code=400, detail="Max 24 hours")

    # Check cache
    cache_key = f"recent:{sensor_id}:{hours}"
    cached = cache.get(cache_key)
    if cached:
        return {"source": "cache", "data": json.loads(cached)}

    # Query Cassandra
    query = session.prepare("""
        SELECT timestamp, temperature, humidity, pressure, battery_level
        FROM sensor_metrics
        WHERE sensor_id = ?
          AND timestamp >= ?
        LIMIT 1000
    """)

    start_time = datetime.utcnow() - timedelta(hours=hours)
    rows = session.execute(query, (sensor_id, start_time))

    results = [{
        "timestamp": row.timestamp.isoformat(),
        "temperature": float(row.temperature),
        "humidity": float(row.humidity),
        "pressure": float(row.pressure),
        "battery_level": row.battery_level
    } for row in rows]

    # Cache result as JSON (never eval() cached strings)
    cache.setex(cache_key, CACHE_TTL, json.dumps(results))

    return {"source": "cassandra", "data": results}

@app.get("/sensors/{sensor_id}/aggregates")
async def get_daily_aggregates(
    sensor_id: str,
    start_date: str,  # YYYY-MM-DD
    end_date: str
):
    """Get daily aggregates for a sensor (date range)"""
    query = session.prepare("""
        SELECT date, avg_temperature, max_temperature, min_temperature,
               avg_humidity, max_humidity, min_humidity,
               avg_pressure, max_pressure, min_pressure, sample_count
        FROM sensor_daily_aggregates
        WHERE sensor_id = ?
          AND date >= ?
          AND date <= ?
    """)

    start = datetime.strptime(start_date, "%Y-%m-%d").date()
    end = datetime.strptime(end_date, "%Y-%m-%d").date()
    rows = session.execute(query, (sensor_id, start, end))

    return [
        {
            "date": row.date.isoformat(),
            "temperature": {
                "avg": float(row.avg_temperature),
                "max": float(row.max_temperature),
                "min": float(row.min_temperature)
            },
            "humidity": {
                "avg": float(row.avg_humidity),
                "max": float(row.max_humidity),
                "min": float(row.min_humidity)
            },
            "pressure": {
                "avg": float(row.avg_pressure),
                "max": float(row.max_pressure),
                "min": float(row.min_pressure)
            },
            "sample_count": row.sample_count
        }
        for row in rows
    ]

@app.get("/buildings/{building_id}/sensors")
async def get_building_sensors(building_id: str):
    """Get all sensors in a building with latest values"""
    query = session.prepare("""
        SELECT sensor_id, sensor_location,
               latest_temperature, latest_humidity, latest_pressure,
               latest_battery_level, latest_timestamp
        FROM sensors_by_building
        WHERE building_id = ?
    """)

    rows = session.execute(query, (building_id,))

    return [
        {
            "sensor_id": row.sensor_id,
            "location": row.sensor_location,
            "latest": {
                "temperature": float(row.latest_temperature),
                "humidity": float(row.latest_humidity),
                "pressure": float(row.latest_pressure),
                "battery_level": row.latest_battery_level,
                "timestamp": row.latest_timestamp.isoformat()
            }
        }
        for row in rows
    ]

@app.get("/sensors/{sensor_id}/anomalies")
async def get_sensor_anomalies(sensor_id: str, hours: int = 24):
    """Get recent anomalies for a sensor"""
    query = session.prepare("""
        SELECT event_time, event_type, metric_name, metric_value, threshold, severity
        FROM sensor_anomalies
        WHERE sensor_id = ?
          AND event_time >= ?
    """)

    start_time = datetime.utcnow() - timedelta(hours=hours)
    rows = session.execute(query, (sensor_id, start_time))

    return [
        {
            "event_time": row.event_time.isoformat(),
            "event_type": row.event_type,
            "metric_name": row.metric_name,
            "metric_value": float(row.metric_value),
            "threshold": float(row.threshold),
            "severity": row.severity
        }
        for row in rows
    ]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)
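The endpoints above cap results with LIMIT; for larger ranges, the driver's paging state can be surfaced to clients as an opaque cursor. A minimal sketch assuming the same session object; the function and parameter names are illustrative, not part of the API above:

# paging_sketch.py -- cursor-style pagination built on the driver's paging_state (names are illustrative)
from base64 import b64decode, b64encode
from typing import Optional

from cassandra.query import SimpleStatement

def fetch_page(session, sensor_id: str, start_time, page_size: int = 500,
               cursor: Optional[str] = None):
    """Return one page of readings plus an opaque cursor for the next page (None when exhausted)."""
    stmt = SimpleStatement(
        "SELECT timestamp, temperature, humidity FROM sensor_metrics "
        "WHERE sensor_id = %s AND timestamp >= %s",
        fetch_size=page_size,
    )
    paging_state = b64decode(cursor) if cursor else None
    result = session.execute(stmt, (sensor_id, start_time), paging_state=paging_state)
    rows = result.current_rows  # one driver page, not the full result set
    next_cursor = b64encode(result.paging_state).decode() if result.has_more_pages else None
    return rows, next_cursor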

Part 5: Deployment and Operations

Cassandra Configuration

cassandra.yaml (production settings):
cluster_name: 'SensorPlatform'

# Seeds (2 per DC)
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "10.0.1.10,10.0.1.11,10.1.1.10,10.1.1.11"

# Listen addresses
listen_address: 10.0.1.10  # Node-specific
rpc_address: 10.0.1.10

# Data directories (separate disks)
data_file_directories:
  - /mnt/data1/cassandra/data
  - /mnt/data2/cassandra/data
commitlog_directory: /mnt/ssd1/cassandra/commitlog
saved_caches_directory: /var/lib/cassandra/saved_caches
hints_directory: /var/lib/cassandra/hints

# Snitch
endpoint_snitch: GossipingPropertyFileSnitch

# Num tokens
num_tokens: 256

# Compaction
compaction_throughput_mb_per_sec: 64
concurrent_compactors: 4

# MemTable
memtable_heap_space_in_mb: 2048
memtable_flush_writers: 4

# CommitLog
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_total_space_in_mb: 8192

# Caching
key_cache_size_in_mb: 100
row_cache_size_in_mb: 0  # Disabled

# Thread pools
concurrent_reads: 32
concurrent_writes: 32
concurrent_counter_writes: 32

# Timeouts
read_request_timeout_in_ms: 5000
write_request_timeout_in_ms: 2000
range_request_timeout_in_ms: 10000

# Failure detection
phi_convict_threshold: 8

# Repair
repair_session_max_tree_depth: 15
cassandra-rackdc.properties (us-east nodes):
dc=us-east
rack=rack1
JVM Configuration (jvm11-server.options):
# Heap size
-Xms8G
-Xmx8G

# Young gen: leave unset with G1GC (an explicit -Xmn overrides G1's pause-time goal)

# GC
-XX:+UseG1GC
-XX:G1RSetUpdatingPauseTimePercent=5
-XX:MaxGCPauseMillis=200
-XX:InitiatingHeapOccupancyPercent=70
# Match the 8 physical cores per node
-XX:ParallelGCThreads=8

# GC logging
-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/cassandra/gc.log:time,uptime:filecount=10,filesize=10m

# Heap dump on OOM
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/cassandra/heap_dump.hprof

# JMX
-Dcom.sun.management.jmxremote.port=7199
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=true

Monitoring Setup

Prometheus Configuration (prometheus.yml):
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'cassandra'
    static_configs:
      - targets:
        - '10.0.1.10:7070'
        - '10.0.1.11:7070'
        - '10.0.1.12:7070'
        - '10.1.1.10:7070'
        - '10.1.1.11:7070'
        - '10.1.1.12:7070'
        labels:
          cluster: 'sensor-platform'

  - job_name: 'ingestion-api'
    static_configs:
      - targets: ['10.0.1.20:8000', '10.1.1.20:8000']

  - job_name: 'dashboard-api'
    static_configs:
      - targets: ['10.0.1.21:8001', '10.1.1.21:8001']
Alert Rules (alerts.yml):
groups:
  - name: cassandra_alerts
    interval: 30s
    rules:
      # Critical: Dropped mutations (data loss!)
      - alert: DroppedMutations
        expr: increase(cassandra_dropped_message_total{message_type="MUTATION"}[1m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Cassandra node {{ $labels.instance }} dropping writes"
          description: "{{ $value }} mutations dropped in last minute"

      # Critical: GC pause too long
      - alert: HighGCPause
        expr: cassandra_jvm_gc_pause_seconds{quantile="0.99"} > 1.0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Cassandra node {{ $labels.instance }} has high GC pauses"
          description: "p99 GC pause: {{ $value }}s"

      # Warning: Pending compactions
      - alert: PendingCompactions
        expr: cassandra_table_pending_compactions > 30
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Node {{ $labels.instance }} has {{ $value }} pending compactions"

      # Warning: High read latency
      - alert: HighReadLatency
        expr: cassandra_table_read_latency_seconds{quantile="0.95"} > 0.05
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High read latency on {{ $labels.keyspace }}.{{ $labels.table }}"
          description: "p95 latency: {{ $value }}s"

      # Critical: Node down
      - alert: NodeDown
        expr: up{job="cassandra"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Cassandra node {{ $labels.instance }} is down"

Backup Strategy

Automated Snapshot Script (backup.sh):
#!/bin/bash
# Daily snapshots uploaded to S3

KEYSPACE="sensor_platform"
SNAPSHOT_NAME="daily_$(date +%Y%m%d)"
S3_BUCKET="s3://sensor-platform-backups"
HOSTNAME=$(hostname)

# Create snapshot
nodetool snapshot -t $SNAPSHOT_NAME $KEYSPACE

# Find snapshot directories
SNAPSHOT_DIRS=$(find /var/lib/cassandra/data/$KEYSPACE -type d -name $SNAPSHOT_NAME)

# Upload each table's snapshot to S3 (snapshot-name-first layout so the
# retention loop below can list snapshot prefixes directly under $HOSTNAME/)
for DIR in $SNAPSHOT_DIRS; do
  TABLE=$(echo $DIR | awk -F'/' '{print $(NF-2)}')
  aws s3 sync $DIR $S3_BUCKET/$HOSTNAME/$SNAPSHOT_NAME/$KEYSPACE/$TABLE/
done

# Clean up local snapshot
nodetool clearsnapshot -t $SNAPSHOT_NAME

# Delete snapshots older than 7 days from S3
aws s3 ls $S3_BUCKET/$HOSTNAME/ | awk '{print $2}' | while read PREFIX; do
  SNAPSHOT_DATE=$(echo $PREFIX | grep -oP '\d{8}')
  if [ ! -z "$SNAPSHOT_DATE" ]; then
    DAYS_OLD=$(( ($(date +%s) - $(date -d $SNAPSHOT_DATE +%s)) / 86400 ))
    if [ $DAYS_OLD -gt 7 ]; then
      aws s3 rm --recursive $S3_BUCKET/$HOSTNAME/$PREFIX
    fi
  fi
done

echo "Backup complete: $S3_BUCKET/$HOSTNAME/$KEYSPACE/$SNAPSHOT_NAME/"
Cron Schedule:
# Daily backup at 3 AM
0 3 * * * /usr/local/bin/backup.sh >> /var/log/cassandra/backup.log 2>&1
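The runbook deliverable also calls for a tested restore procedure. A hedged sketch of restoring one table from these snapshots with sstableloader; the bucket, host, and snapshot names are examples, paths follow the S3 layout used by backup.sh, and the schema must already exist on the target cluster:

#!/bin/bash
# restore.sh - sketch: restore one table's snapshot from S3 via sstableloader
KEYSPACE="sensor_platform"
TABLE="sensor_metrics"
SNAPSHOT_NAME="daily_20231115"              # example snapshot to restore
S3_BUCKET="s3://sensor-platform-backups"
SOURCE_HOST="node1"                         # hostname recorded by backup.sh
RESTORE_DIR="/tmp/restore/$KEYSPACE/$TABLE" # sstableloader expects .../<keyspace>/<table>

mkdir -p "$RESTORE_DIR"
aws s3 sync "$S3_BUCKET/$SOURCE_HOST/$SNAPSHOT_NAME/$KEYSPACE/$TABLE/" "$RESTORE_DIR/"

# Stream SSTables into the live cluster; the loader routes data to the right replicas
sstableloader -d 10.0.1.10,10.0.1.11 "$RESTORE_DIR"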

Repair Schedule

Automated Repair with Cassandra Reaper:
# Install Reaper
docker run -d \
  --name reaper \
  -p 8080:8080 \
  -e REAPER_STORAGE_TYPE=cassandra \
  -e REAPER_CASS_CONTACT_POINTS=["10.0.1.10"] \
  thelastpickle/cassandra-reaper:latest

# Create repair schedule (via API)
curl -X POST http://localhost:8080/repair_schedule \
  -d "clusterName=SensorPlatform" \
  -d "keyspace=sensor_platform" \
  -d "owner=admin" \
  -d "intensity=0.9" \
  -d "scheduleDaysBetween=7" \
  -d "segmentCountPerNode=64"
Alternative: Manual Repair Script:
#!/bin/bash
# repair.sh - Weekly incremental repair

# Incremental repair is the default since Cassandra 2.2 (use -full for a full repair)
nodetool repair sensor_platform sensor_metrics
nodetool repair sensor_platform sensor_daily_aggregates
nodetool repair sensor_platform sensors_by_building
nodetool repair sensor_platform sensor_anomalies

echo "Repair complete: $(date)"
# Weekly repair on Sunday at 2 AM
0 2 * * 0 /usr/local/bin/repair.sh >> /var/log/cassandra/repair.log 2>&1

Part 6: Testing and Validation

Load Testing

cassandra-stress Configuration:
# sensor_metrics_stress.yaml
keyspace: sensor_platform

table: sensor_metrics

columnspec:
  - name: sensor_id
    size: fixed(20)
    population: uniform(1..100000)
  - name: timestamp
    cluster: fixed(1000)
  - name: temperature
    size: fixed(8)
  - name: humidity
    size: fixed(8)
  - name: pressure
    size: fixed(8)
  - name: battery_level
    size: fixed(4)

insert:
  partitions: fixed(1)
  batchtype: UNLOGGED

queries:
  recent_metrics:
    cql: SELECT * FROM sensor_metrics WHERE sensor_id = ? AND timestamp > ?
    fields: samerow
Run Load Test:
# Write test (10K writes/sec for 1 hour)
cassandra-stress user profile=sensor_metrics_stress.yaml \
  ops\(insert=1\) \
  n=36000000 \
  -rate threads=100 throttle=10000/s \
  -node 10.0.1.10,10.0.1.11,10.0.1.12 \
  -log file=write_test.log

# Read test (5K reads/sec for 1 hour)
cassandra-stress user profile=sensor_metrics_stress.yaml \
  ops\(recent_metrics=1\) \
  n=18000000 \
  -rate threads=50 throttle=5000/s \
  -node 10.0.1.10,10.0.1.11,10.0.1.12 \
  -log file=read_test.log

# Mixed workload (70% read, 30% write)
cassandra-stress user profile=sensor_metrics_stress.yaml \
  ops\(insert=3,recent_metrics=7\) \
  n=36000000 \
  -rate threads=100 \
  -node 10.0.1.10,10.0.1.11,10.0.1.12 \
  -log file=mixed_test.log
Expected Results:
Metric            | Target         | Actual (to be filled)
------------------|----------------|----------------------
Write throughput  | 10,000 ops/sec | _____
Write latency p95 | < 5ms          | _____
Read throughput   | 5,000 ops/sec  | _____
Read latency p95  | < 10ms         | _____
Error rate        | 0%             | _____

Failure Testing

Test 1: Node Failure
# Simulate node failure
systemctl stop cassandra  # On node 10.0.1.12

# Verify writes still succeed (RF=3, CL=LOCAL_QUORUM)
# Expected: No errors, hints accumulate

# Bring node back
systemctl start cassandra

# Verify hint replay
nodetool statushandoff

# Expected: Hints delivered, data consistent
Test 2: Network Partition
# Simulate network partition (us-east isolated from eu-west)
iptables -A INPUT -s 10.1.0.0/16 -j DROP
iptables -A OUTPUT -d 10.1.0.0/16 -j DROP

# Verify LOCAL_QUORUM writes succeed
# Verify EACH_QUORUM writes fail (expected)

# Heal partition
iptables -F

# Verify cross-DC replication resumes
# Run repair to ensure consistency
Test 3: Overload (DDoS Simulation)
# Send 100K writes/sec (10x normal load)
cassandra-stress user profile=sensor_metrics_stress.yaml \
  ops\(insert=1\) \
  n=360000000 \
  -rate threads=500 throttle=100000/s \
  -node 10.0.1.10

# Monitor for:
# - Dropped mutations (should be 0)
# - GC pauses (should stay < 500ms)
# - Pending compactions (monitor, should recover)

# Expected: Some increased latency, but no data loss

Part 7: Advanced Challenges (Optional)

Challenge 1: Hot Partition Mitigation

Problem: One sensor (sensor-99999) sends 10x more data than others, creating a hot partition.
Task:
  1. Detect hot partition (use nodetool toppartitions)
  2. Redesign data model to shard hot partition:
    CREATE TABLE sensor_metrics_sharded (
        sensor_id text,
        shard int,        -- 0-9 (distribute across 10 partitions)
        timestamp timestamp,
        ...,
        PRIMARY KEY ((sensor_id, shard), timestamp)
    );
    
  3. Modify ingestion API to compute shard: shard = hash(timestamp) % 10
  4. Modify dashboard API to query all shards and merge results (see the sketch after this list)
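A hedged sketch of steps 3 and 4; the function names are hypothetical, and it derives the shard from epoch seconds rather than Python's built-in hash(), which is randomized per process for strings:

# shard_sketch.py -- illustrative shard routing for the hot-partition redesign (names are hypothetical)
from datetime import datetime

NUM_SHARDS = 10

def shard_for(timestamp: datetime) -> int:
    """Deterministic shard in [0, NUM_SHARDS): epoch seconds modulo the shard count."""
    return int(timestamp.timestamp()) % NUM_SHARDS

# Ingestion side: bind (sensor_id, shard_for(ts), ts, ...) into sensor_metrics_sharded.

def read_recent(session, prepared_stmt, sensor_id: str, since: datetime):
    """Dashboard side: fan out to every shard and merge, newest first."""
    rows = []
    for shard in range(NUM_SHARDS):
        rows.extend(session.execute(prepared_stmt, (sensor_id, shard, since)))
    return sorted(rows, key=lambda r: r.timestamp, reverse=True)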

Challenge 2: Cross-DC Latency Optimization

Problem: Reads from eu-west to us-east data take 150ms (cross-Atlantic latency).
Task:
  1. Implement read-from-local-DC-first logic (in the Python driver, consistency is set on the statement, not passed to execute()):
    from cassandra import ConsistencyLevel
    from cassandra.query import SimpleStatement

    # Try the local DC first
    local_stmt = SimpleStatement(query_string, consistency_level=ConsistencyLevel.LOCAL_ONE)
    rows = list(session.execute(local_stmt, params))
    if rows:
        return rows
    # Fall back to any replica, including the remote DC
    remote_stmt = SimpleStatement(query_string, consistency_level=ConsistencyLevel.ONE)
    return list(session.execute(remote_stmt, params))
    
  2. Measure latency improvement
  3. Consider trade-offs (stale data vs latency)

Challenge 3: Materialized Views (Advanced)

Task: Create a materialized view for “sensors by latest temperature”:
CREATE MATERIALIZED VIEW sensors_by_temperature AS
  SELECT building_id, sensor_id, latest_temperature, latest_timestamp
  FROM sensors_by_building
  WHERE building_id IS NOT NULL
    AND sensor_id IS NOT NULL
    AND latest_temperature IS NOT NULL
  PRIMARY KEY (building_id, latest_temperature, sensor_id)
  WITH CLUSTERING ORDER BY (latest_temperature DESC);
Use Case: Quickly find the hottest/coldest sensors within a building (latest_temperature is a clustering column here, so each building partition is sorted by temperature; CLUSTERING ORDER BY cannot apply to a partition key column)
Warning: Materialized views add write amplification and are flagged as experimental in recent Cassandra releases; benchmark before relying on them in production

Part 8: Deliverables and Evaluation

Project Deliverables

  1. Architecture Diagram (draw.io or similar)
    • Cassandra cluster topology
    • Application components
    • Data flow
  2. Data Model Documentation
    • CQL schema definitions
    • Query pattern → table mapping
    • Design decision rationale
  3. Implementation Code
    • Ingestion API (ingestion_api.py)
    • Dashboard API (dashboard_api.py)
    • Background jobs (daily_aggregator.py)
  4. Configuration Files
    • cassandra.yaml
    • jvm11-server.options
    • prometheus.yml, alerts.yml
  5. Operational Runbook
    • Deployment procedure
    • Backup/restore steps
    • Common troubleshooting scenarios
  6. Load Test Results
    • cassandra-stress logs
    • Latency percentiles
    • Throughput measurements
  7. Presentation (10-15 slides)
    • Problem statement
    • Architecture overview
    • Data model design
    • Performance results
    • Lessons learned

Evaluation Criteria

Category       | Weight | Criteria
---------------|--------|-----------------------------------------------------------------------------
Architecture   | 20%    | Appropriate use of Cassandra features (replication, consistency, compaction)
Data Model     | 25%    | Partitioning strategy, denormalization, query alignment
Implementation | 20%    | Code quality, error handling, batching, prepared statements
Performance    | 20%    | Meets latency/throughput targets, efficient queries
Operations     | 10%    | Monitoring, backups, repair schedules, runbook
Documentation  | 5%     | Clarity, completeness, diagrams

Mastery Checklist

  • Data Modeling
    • Query-driven design (1 query = 1 table)
    • Appropriate partition keys (even distribution)
    • Time-series clustering keys
    • Denormalization for performance
    • TTL for automatic expiration
  • Write Path
    • Batch inserts (100+ per batch)
    • Prepared statements
    • Appropriate consistency level (LOCAL_QUORUM)
    • Deduplication logic
  • Read Path
    • Efficient partition key queries (no ALLOW FILTERING)
    • Caching (Redis or application-level)
    • Pagination for large results
  • Compaction
    • TWCS for time-series data
    • LCS for frequently updated data
    • Appropriate window sizes
  • Multi-DC
    • NetworkTopologyStrategy (RF per DC)
    • LOCAL consistency levels
    • Per-DC repair
  • JVM Tuning
    • Heap ≤ 8GB
    • G1GC with 200ms pause target
    • GC logging enabled
  • Monitoring
    • Prometheus + Grafana setup
    • Critical alerts (dropped mutations, GC, node down)
    • Dashboard for key metrics
  • Backup/Repair
    • Automated daily snapshots
    • Weekly repair schedule
    • Tested restore procedure

Part 9: Real-World Extensions

Extending the Project

1. Stream Processing Integration:
  • Add Kafka for event streaming
  • Use Kafka Connect Cassandra Sink for ingestion
  • Implement Kafka Streams for real-time anomaly detection
2. Machine Learning:
  • Train ML model to predict sensor failures (battery level, read patterns)
  • Use Spark MLlib or TensorFlow
  • Store predictions in Cassandra for dashboard
3. Multi-Tenancy:
  • Support multiple customers (tenant isolation)
  • Redesign schema with tenant_id in partition key (see the sketch after this list)
  • Implement tenant-level quotas
4. Advanced Analytics:
  • Time-series forecasting (predict future temperature)
  • Correlation analysis (temperature vs humidity patterns)
  • Spatial queries (sensors within 1km radius)
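A minimal sketch of the tenant-scoped redesign mentioned under Multi-Tenancy; the table name is hypothetical, and tenant_id joins the partition key so tenants never share partitions:

-- Hypothetical multi-tenant variant of sensor_metrics
CREATE TABLE sensor_metrics_mt (
    tenant_id text,
    sensor_id text,
    timestamp timestamp,
    temperature decimal,
    humidity decimal,
    pressure decimal,
    battery_level int,
    PRIMARY KEY ((tenant_id, sensor_id), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);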

Production Considerations

Security:
# cassandra.yaml
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer

# Enable SSL/TLS
client_encryption_options:
  enabled: true
  keystore: /etc/cassandra/keystore.jks
  keystore_password: changeit
High Availability:
  • Deploy across 3+ availability zones per DC
  • Use Kubernetes for API layer (auto-scaling, self-healing)
  • Implement circuit breakers and retries in clients
Cost Optimization:
  • Use tiered storage (hot=SSD, cold=HDD)
  • Implement data lifecycle (move old data to S3 for archival)
  • Right-size nodes (monitor CPU, memory, disk usage)

Summary

You’ve designed and implemented a production-ready, scalable IoT analytics platform using Apache Cassandra. This capstone demonstrated:
  ✅ Data Modeling: Query-driven design with proper partitioning
  ✅ Write Optimization: Batching, prepared statements, TWCS compaction
  ✅ Read Optimization: Caching, denormalization, efficient queries
  ✅ Multi-DC: Geographic distribution with LOCAL consistency
  ✅ Performance Tuning: JVM, OS, and Cassandra configuration
  ✅ Operations: Monitoring, backups, repair, troubleshooting
  ✅ Production Deployment: Complete stack from ingestion to visualization
Congratulations on completing the Cassandra mastery course! 🎉 You now have the skills to:
  • Design schemas for any query pattern
  • Operate Cassandra clusters at scale
  • Troubleshoot performance issues
  • Deploy production-ready systems

What’s Next?