
Structured Streaming

Module Duration: 4-5 hours
Focus: Real-time data processing and stream analytics
Prerequisites: Spark SQL and DataFrames

Overview

Structured Streaming is Apache Spark’s scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It treats streaming data as an unbounded table that is continuously appended, allowing you to write streaming computations using the same DataFrame/Dataset API.

Key Concepts

  • Unbounded Table Model: Streaming data is treated as an append-only table that grows continuously.
  • Micro-Batch Processing: Data is processed in small batches, with latencies as low as 100 ms.
  • Event Time Processing: Data is processed based on timestamps embedded in the records, not on arrival time.
  • Fault Tolerance: Exactly-once semantics are achieved through checkpointing and write-ahead logs.
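
To make the unbounded-table model concrete, here is a minimal sketch using the built-in rate source, which generates timestamp/value rows for experimentation; the app name and bucket logic are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnboundedTableDemo").getOrCreate()

# The rate source appends new (timestamp, value) rows to the logical input table
rate = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Each trigger recomputes the running count over the ever-growing input table
counts = rate.groupBy((rate.value % 10).alias("bucket")).count()

query = counts.writeStream.outputMode("complete").format("console").start()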

Structured Streaming Fundamentals

Basic Streaming Query

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("StructuredStreamingBasics") \
    .getOrCreate()

# Read streaming data from a directory
input_stream = spark.readStream \
    .format("json") \
    .schema("timestamp TIMESTAMP, user_id INT, action STRING, item_id INT") \
    .option("maxFilesPerTrigger", 1) \
    .load("/data/streaming/input")

# Process the stream
processed = input_stream \
    .filter(col("action") == "purchase") \
    .groupBy("item_id") \
    .count()

# Write results
query = processed.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

query.awaitTermination()

Scala Implementation

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("StructuredStreamingBasics")
  .getOrCreate()

import spark.implicits._

// Define schema
val schema = StructType(Seq(
  StructField("timestamp", TimestampType, nullable = false),
  StructField("user_id", IntegerType, nullable = false),
  StructField("action", StringType, nullable = false),
  StructField("item_id", IntegerType, nullable = false)
))

// Read streaming data
val inputStream = spark.readStream
  .format("json")
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .load("/data/streaming/input")

// Process the stream
val processed = inputStream
  .filter($"action" === "purchase")
  .groupBy($"item_id")
  .count()

// Write results
val query = processed.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()

query.awaitTermination()

Output Modes

Complete Mode

The entire updated result table is written to the sink on every trigger. Supported only for queries with aggregations.
# Complete mode - output entire result table
query = aggregated_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Append Mode

Only rows newly appended to the result table since the last trigger are written. This is the default mode.
# Append mode - only new rows
query = filtered_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start()

Update Mode

Only rows that were updated in the result table since the last trigger are written.
# Update mode - only updated rows
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("updates") \
    .start()

Event Time Processing

Event Time vs Processing Time

from pyspark.sql.functions import window, current_timestamp, from_json, col, count, sum

# Read stream with event time
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse the JSON payload and extract the event-time fields
# (schema is the StructType of the event payload, defined elsewhere)
parsed = events.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Use event time for windowing
windowed = parsed \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes", "1 minute"),
        "user_id"
    ) \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_amount")
    )

Watermarking

Watermarks allow the system to track progress in event time and clean up old state.
# Watermarking for late data handling
watermarked = parsed \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes"),
        "category"
    ) \
    .count()

# Watermark allows dropping state for windows older than:
# max_event_time - watermark_delay
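
To see where the watermark currently stands, the query's progress metrics can be inspected. A small sketch, assuming the watermarked query above and an illustrative checkpoint path:
query = watermarked.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/data/checkpoint-watermark") \
    .start()

progress = query.lastProgress
if progress and "eventTime" in progress:
    # The reported watermark lags the max observed event time by the configured delay
    print("Current watermark:", progress["eventTime"].get("watermark"))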

Scala Watermarking

val watermarked = parsed
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"category"
  )
  .count()

Stateful Operations

mapGroupsWithState

Track custom state for each group. Note that mapGroupsWithState is part of the typed Dataset API (Scala/Java); the Python code below is a conceptual sketch of the pattern, and PySpark 3.4+ exposes equivalent functionality through applyInPandasWithState (see the sketch after this example).
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

# Define state schema
class SessionInfo:
    def __init__(self):
        self.start_time = None
        self.end_time = None
        self.event_count = 0
        self.events = []

def update_session_state(key, values, state):
    """Update session state for each user"""
    if state.exists:
        session = state.get()
    else:
        session = SessionInfo()

    for event in values:
        session.event_count += 1
        session.events.append(event)
        if session.start_time is None:
            session.start_time = event.timestamp
        session.end_time = event.timestamp

    # Set timeout
    state.update(session)
    state.setTimeoutDuration("30 minutes")

    return (key, session.event_count, session.start_time, session.end_time)

# Apply stateful operation
sessions = events.groupByKey(lambda x: x.user_id) \
    .mapGroupsWithState(
        update_session_state,
        GroupStateTimeout.ProcessingTimeTimeout
    )
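
For PySpark specifically, the same session tracking can be expressed with applyInPandasWithState (Spark 3.4+). The sketch below assumes an events streaming DataFrame with a user_id column and an epoch-millisecond timestamp column (LONG); schemas and values are illustrative.
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

output_schema = "user_id INT, event_count INT, start_time LONG, end_time LONG"
state_schema = "event_count INT, start_time LONG, end_time LONG"

def update_session(key, pdf_iter, state: GroupState):
    # Restore previous state (a tuple matching state_schema) if it exists
    if state.exists:
        event_count, start_time, end_time = state.get
    else:
        event_count, start_time, end_time = 0, None, None

    for pdf in pdf_iter:
        event_count += len(pdf)
        batch_min = int(pdf["timestamp"].min())
        batch_max = int(pdf["timestamp"].max())
        start_time = batch_min if start_time is None else min(start_time, batch_min)
        end_time = batch_max if end_time is None else max(end_time, batch_max)

    state.update((event_count, start_time, end_time))
    state.setTimeoutDuration(30 * 60 * 1000)  # 30-minute inactivity timeout

    yield pd.DataFrame(
        [(key[0], event_count, start_time, end_time)],
        columns=["user_id", "event_count", "start_time", "end_time"],
    )

sessions = events.groupBy("user_id").applyInPandasWithState(
    update_session,
    output_schema,
    state_schema,
    "update",
    GroupStateTimeout.ProcessingTimeTimeout,
)

As in the Scala version, state is kept per user_id and expired after 30 minutes of inactivity.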

Scala mapGroupsWithState

import org.apache.spark.sql.streaming._

case class Event(user_id: Int, timestamp: Long, action: String)
case class SessionInfo(
  startTime: Long,
  endTime: Long,
  eventCount: Int,
  events: List[Event]
)

case class SessionOutput(
  userId: Int,
  eventCount: Int,
  startTime: Long,
  endTime: Long
)

def updateSessionState(
  key: Int,
  values: Iterator[Event],
  state: GroupState[SessionInfo]
): SessionOutput = {

  val currentState = if (state.exists) {
    state.get
  } else {
    SessionInfo(0L, 0L, 0, List.empty)
  }

  // Note: when a timeout fires, the values iterator is empty;
  // production code should check state.hasTimedOut before using newEvents
  val newEvents = values.toList
  val updatedState = SessionInfo(
    startTime = if (currentState.startTime == 0)
      newEvents.head.timestamp else currentState.startTime,
    endTime = newEvents.last.timestamp,
    eventCount = currentState.eventCount + newEvents.length,
    events = currentState.events ++ newEvents
  )

  state.update(updatedState)
  state.setTimeoutDuration("30 minutes")

  SessionOutput(key, updatedState.eventCount,
    updatedState.startTime, updatedState.endTime)
}

val sessions = events
  .groupByKey(_.user_id)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(
    updateSessionState
  )

flatMapGroupsWithState

For more complex stateful processing that can emit zero or more output rows per group. As with mapGroupsWithState, this is a Scala/Java Dataset API; the Python code below is a conceptual sketch.
def process_user_session(key, values, state):
    """Process user sessions and emit multiple events"""
    results = []

    if state.exists:
        user_state = state.get()
    else:
        user_state = {"session_count": 0, "last_activity": None}

    for event in values:
        user_state["session_count"] += 1
        user_state["last_activity"] = event.timestamp

        # Emit intermediate results
        results.append({
            "user_id": key,
            "event_type": event.action,
            "session_count": user_state["session_count"]
        })

    state.update(user_state)
    return results

sessions = events.groupByKey(lambda x: x.user_id) \
    .flatMapGroupsWithState(
        process_user_session,
        GroupStateTimeout.ProcessingTimeTimeout
    )

Window Operations

Tumbling Windows

Fixed-size, non-overlapping windows.
# 10-minute tumbling windows
tumbling = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "10 minutes"),
        "category"
    ) \
    .agg(
        count("*").alias("count"),
        avg("price").alias("avg_price"),
        sum("quantity").alias("total_quantity")
    )

Sliding Windows

Fixed-size, overlapping windows.
# 10-minute windows sliding every 5 minutes
sliding = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "10 minutes", "5 minutes"),
        "product_id"
    ) \
    .agg(
        count("*").alias("count"),
        max("price").alias("max_price")
    )

Session Windows

Dynamic windows based on activity gaps.
from pyspark.sql.functions import session_window

# Session windows with 30-minute gap
session_agg = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        session_window("event_time", "30 minutes"),
        "user_id"
    ) \
    .agg(
        count("*").alias("session_events"),
        sum("duration").alias("session_duration")
    )

Scala Window Operations

// Tumbling windows
val tumbling = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "10 minutes"),
    $"category"
  )
  .agg(
    count("*").as("count"),
    avg("price").as("avg_price")
  )

// Sliding windows
val sliding = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "10 minutes", "5 minutes"),
    $"product_id"
  )
  .count()

// Session windows
val sessions = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    session_window($"event_time", "30 minutes"),
    $"user_id"
  )
  .count()

Stream-Stream Joins

Inner Join

# Stream 1: Impressions
impressions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "impressions") \
    .load() \
    .select(from_json(col("value").cast("string"), impression_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("impression_time", "10 minutes")

# Stream 2: Clicks
clicks = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .select(from_json(col("value").cast("string"), click_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("click_time", "20 minutes")

# Join streams
joined = impressions.join(
    clicks,
    expr("""
        impression_id = click_impression_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """)
)

Outer Join

# Left outer join
left_join = impressions.join(
    clicks,
    expr("""
        impression_id = click_impression_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """),
    "left_outer"
)

Scala Stream-Stream Join

val impressions = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .load()
  .select(from_json($"value".cast("string"), impressionSchema).as("data"))
  .select("data.*")
  .withWatermark("impression_time", "10 minutes")

val clicks = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks")
  .load()
  .select(from_json($"value".cast("string"), clickSchema).as("data"))
  .select("data.*")
  .withWatermark("click_time", "20 minutes")

val joined = impressions.join(
  clicks,
  expr("""
    impression_id = click_impression_id AND
    click_time >= impression_time AND
    click_time <= impression_time + interval 1 hour
  """)
)

Stream-Static Joins

# Static dimension table
products = spark.read \
    .format("parquet") \
    .load("/data/products")

# Streaming fact data
orders = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load() \
    .select(from_json(col("value").cast("string"), order_schema).alias("data")) \
    .select("data.*")

# Join stream with static data
enriched = orders.join(products, "product_id")

Fault Tolerance and Checkpointing

Checkpointing

# Configure checkpoint location
query = processed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start()

# Checkpoint contains:
# - Metadata (configuration, schema)
# - Offsets (source position)
# - State (for stateful operations)
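
For orientation, a rough sketch of what a checkpoint directory typically contains (the exact layout varies by Spark version and the operators used):
# /data/checkpoint/metadata    - query id and configuration
# /data/checkpoint/offsets/    - one file per batch recording source offsets
# /data/checkpoint/commits/    - markers for batches that completed successfully
# /data/checkpoint/state/      - state store data for stateful operators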

Recovery from Failures

# Spark automatically recovers from failures using checkpoints
# On restart, it:
# 1. Reads checkpoint metadata
# 2. Recovers last committed offset
# 3. Resumes from that offset

query = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load() \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start()

Exactly-Once Semantics

# Idempotent sinks ensure exactly-once
query = processed.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start("/data/delta-table")

# Delta Lake provides ACID transactions
# - Atomic writes
# - Idempotent retries
# - Exactly-once guarantees
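
Another common pattern for exactly-once output is foreachBatch, where the deterministic batch_id can be used to make downstream writes idempotent. A sketch; the output path and added column are illustrative:
from pyspark.sql.functions import lit

def write_batch(batch_df, batch_id):
    # Re-delivered micro-batches carry the same batch_id, so downstream
    # consumers can deduplicate (or skip) on it after a restart
    batch_df.withColumn("batch_id", lit(batch_id)) \
        .write.mode("append").parquet("/data/output-batches")

query = processed.writeStream \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "/data/checkpoint-batches") \
    .start()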

Data Sources and Sinks

Kafka Source

# Read from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic1,topic2,topic3") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .load()

# Access Kafka metadata
parsed = kafka_stream.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "topic",
    "partition",
    "offset",
    "timestamp"
)

Kafka Sink

# Write to Kafka
query = processed.selectExpr(
    "CAST(user_id AS STRING) AS key",
    "to_json(struct(*)) AS value"
).writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output-topic") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start()

File Sources

# JSON files
json_stream = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 10) \
    .load("/data/json-input")

# CSV files
csv_stream = spark.readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", True) \
    .option("maxFilesPerTrigger", 5) \
    .load("/data/csv-input")

# Parquet files
parquet_stream = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .load("/data/parquet-input")

File Sinks

# Parquet output with partitioning
query = processed.writeStream \
    .format("parquet") \
    .option("path", "/data/output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .partitionBy("date", "hour") \
    .start()

# Delta output
delta_query = processed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start("/data/delta-output")

Memory Sink (Testing)

# Write to memory for testing
memory_query = processed.writeStream \
    .format("memory") \
    .queryName("test_results") \
    .outputMode("complete") \
    .start()

# Query the in-memory table
spark.sql("SELECT * FROM test_results").show()

Trigger Modes

Processing Time Trigger

# Trigger every 10 seconds
query = processed.writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .start()

Once Trigger

# Process all available data once and stop
query = processed.writeStream \
    .trigger(once=True) \
    .format("parquet") \
    .option("path", "/data/output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start()

Continuous Processing

# Continuous processing with low end-to-end latency (~1 ms).
# Note: only map-like operations (select, filter, projections) are supported;
# aggregations cannot be used with the continuous trigger, and the interval
# below is the checkpoint interval, not a batch interval.
query = processed.writeStream \
    .trigger(continuous='1 second') \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start()

Available Now Trigger

# Process all available data in multiple batches, then stop (Spark 3.3+)
query = processed.writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("checkpointLocation", "/data/checkpoint") \
    .start("/data/output")

Monitoring and Management

Query Management

# Start query
query = df.writeStream \
    .format("console") \
    .start()

# Get query information
print(f"Query ID: {query.id}")
print(f"Run ID: {query.runId}")
print(f"Name: {query.name}")
print(f"Status: {query.status}")

# Wait for termination
query.awaitTermination()

# Stop query
query.stop()

# Check if query is active
if query.isActive:
    print("Query is running")

Multiple Queries

# Manage multiple queries
query1 = df1.writeStream.format("console").start()
query2 = df2.writeStream.format("memory").queryName("table2").start()
query3 = df3.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output") \
    .option("checkpointLocation", "/checkpoints/query3") \
    .start()

queries = [query1, query2, query3]

# Wait for all queries
for q in queries:
    q.awaitTermination()

# Or wait for any to terminate
spark.streams.awaitAnyTermination()
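
The session-level StreamingQueryManager can also be used to enumerate and control all running queries. A short sketch:
# List every active query on this SparkSession
for q in spark.streams.active:
    print(q.id, q.name, q.isActive)

# Stop a specific query by name
for q in spark.streams.active:
    if q.name == "table2":
        q.stop()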

Progress Monitoring

# Get last progress
progress = query.lastProgress
if progress:
    print(f"Batch: {progress['batchId']}")
    print(f"Input rows: {progress['numInputRows']}")
    print(f"Processing rate: {progress['processedRowsPerSecond']}")
    print(f"Duration: {progress['durationMs']}")

# Recent progress
for progress in query.recentProgress:
    print(progress)

// Scala equivalent
val progress = query.lastProgress
println(s"Batch: ${progress.batchId}")
println(s"Input rows: ${progress.numInputRows}")

DStreams (Legacy API)

Basic DStream Operations

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

// Create DStream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transformations
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Output operation
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()

Window Operations in DStreams

// Window operations: the inverse-reduce form below requires checkpointing
// to be enabled first, e.g. ssc.checkpoint("/checkpoints/dstream")
val windowedCounts = pairs
  .reduceByKeyAndWindow(
    _ + _,           // Reduce function
    _ - _,           // Inverse reduce function
    Seconds(60),     // Window duration
    Seconds(10)      // Slide duration
  )

windowedCounts.print()

Real-World Use Cases

Real-Time ETL Pipeline

from pyspark.sql.functions import *

# Read from multiple sources
raw_events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "raw-events") \
    .load()

# Parse and validate
parsed = raw_events \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .filter(col("user_id").isNotNull()) \
    .filter(col("timestamp").isNotNull())

# Enrich with dimension data
users = spark.read.parquet("/data/dimensions/users")
products = spark.read.parquet("/data/dimensions/products")

enriched = parsed \
    .join(users, "user_id") \
    .join(products, "product_id")

# Aggregate metrics
metrics = enriched \
    .withWatermark("timestamp", "15 minutes") \
    .groupBy(
        window("timestamp", "5 minutes"),
        "product_category",
        "user_segment"
    ) \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_revenue"),
        # Exact distinct counts are not supported in streaming aggregations
        approx_count_distinct("user_id").alias("unique_users")
    )

# Write to multiple sinks
# 1. Real-time dashboard (Kafka)
metrics.selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "dashboard-metrics") \
    .option("checkpointLocation", "/checkpoints/dashboard") \
    .start()

# 2. Historical storage (Delta)
metrics.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/delta") \
    .start("/data/metrics")

Fraud Detection System

# Real-time fraud detection
transactions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load() \
    .select(from_json(col("value").cast("string"), txn_schema).alias("data")) \
    .select("data.*")

# Calculate real-time features
features = transactions \
    .withWatermark("transaction_time", "10 minutes") \
    .groupBy(
        window("transaction_time", "10 minutes", "1 minute"),
        "user_id"
    ) \
    .agg(
        count("*").alias("txn_count"),
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount"),
        stddev("amount").alias("stddev_amount"),
        # Exact distinct counts are not supported in streaming aggregations
        approx_count_distinct("merchant_id").alias("unique_merchants"),
        approx_count_distinct("location").alias("unique_locations")
    )

# Apply fraud rules
suspicious = features.filter(
    (col("txn_count") > 10) |
    (col("total_amount") > 10000) |
    (col("unique_locations") > 5)
)

# Alert on suspicious activity (alert_fraud: a user-defined alerting helper)
suspicious.writeStream \
    .foreachBatch(lambda df, epoch_id: alert_fraud(df)) \
    .option("checkpointLocation", "/checkpoints/fraud") \
    .start()

Click-Through Rate (CTR) Calculation

# Real-time CTR computation
impressions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "impressions") \
    .load() \
    .select(from_json(col("value").cast("string"), imp_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "1 hour") \
    .alias("impressions")

clicks = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .select(from_json(col("value").cast("string"), click_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "1 hour") \
    .alias("clicks")

# Join and calculate CTR
ctr = impressions \
    .join(
        clicks,
        expr("""
            impression_id = click_impression_id AND
            clicks.timestamp >= impressions.timestamp AND
            clicks.timestamp <= impressions.timestamp + interval 1 hour
        """),
        "left_outer"
    ) \
    .groupBy(
        window("impressions.timestamp", "5 minutes"),
        "campaign_id"
    ) \
    .agg(
        count("impression_id").alias("impressions"),
        count("click_impression_id").alias("clicks")
    ) \
    .withColumn("ctr", col("clicks") / col("impressions") * 100)

# Write results
ctr.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/ctr") \
    .start("/data/ctr")

Performance Optimization

Optimize Trigger Intervals

# Balance latency vs throughput
# High throughput: Longer intervals
query_batch = df.writeStream \
    .trigger(processingTime='30 seconds') \
    .format("delta") \
    .start()

# Low latency: Shorter intervals or continuous
query_realtime = df.writeStream \
    .trigger(continuous='1 second') \
    .format("kafka") \
    .start()

Optimize Watermarks

# Too short: High memory usage, many state entries
# Too long: High latency for results
# Balance based on data arrival patterns

# Conservative watermark (more late data)
df.withWatermark("event_time", "1 hour")

# Aggressive watermark (less late data)
df.withWatermark("event_time", "5 minutes")

State Management

# Configure state store
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
              "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")

# RocksDB-backed state store for large state (Spark 3.2+)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
              "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# State cleanup: include the event-time column in the deduplication key so
# Spark can drop duplicate-tracking state older than the watermark
df.withWatermark("event_time", "1 hour") \
  .dropDuplicates(["event_id", "event_time"])

# Spark 3.5+ also offers dropDuplicatesWithinWatermark("event_id") for this pattern

Partition Tuning

# Repartition for better parallelism
repartitioned = df.repartition(100, "user_id")

# Coalesce to reduce partitions
coalesced = df.coalesce(10)
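
For stateful queries, the number of shuffle partitions also determines the number of state store partitions and is effectively fixed once a checkpoint exists, so it is worth setting deliberately before the first run. A sketch with an illustrative value:
# Set before starting the query; pick a value matched to cluster cores and state size
spark.conf.set("spark.sql.shuffle.partitions", "64")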

Common Pitfalls and Solutions

Issue 1: Late Data Handling

Problem: Data arriving after watermark causes dropped events. Solution:
# Configure appropriate watermark
df.withWatermark("event_time", "1 hour")  # Adjust based on data

# Monitor rows dropped for arriving too late (state operator metrics)
query.lastProgress["stateOperators"][0]["numRowsDroppedByWatermark"]

Issue 2: State Explosion

Problem: Unbounded state growth causes OOM errors. Solution:
# Use watermarks to bound state
df.withWatermark("event_time", "24 hours") \
  .groupBy(window("event_time", "1 hour"), "key") \
  .count()

# Use state timeouts in mapGroupsWithState / flatMapGroupsWithState
# (applyInPandasWithState in PySpark) to expire idle groups
def stateful_func(key, values, state):
    state.setTimeoutDuration("1 hour")
    # Process events and remove or expire old state here

Issue 3: Checkpoint Compatibility

Problem: Schema changes break checkpoint recovery. Solution:
# Use a new checkpoint location after an incompatible schema change,
# or implement schema evolution in Delta Lake
processed.writeStream \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", "/new/checkpoint/location") \
    .start("/data/delta-output")

Issue 4: Memory Pressure

Problem: Large batches cause executor memory issues. Solution:
# Limit how much data is read per trigger
spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# Increase executor memory: this is a cluster-level setting supplied when the
# application is launched (e.g. spark-submit --conf spark.executor.memory=8g);
# it cannot be changed with spark.conf.set on a running session

Hands-On Exercises

Exercise 1: Real-Time Word Count

Create a streaming word count application that processes text data.
# TODO: Implement streaming word count
# 1. Read from socket source
# 2. Split into words
# 3. Count words in 1-minute windows
# 4. Write to console

# Your code here

Exercise 2: Session Analysis

Implement user session tracking with timeout.
# TODO: Track user sessions
# 1. Read user events from Kafka
# 2. Use mapGroupsWithState for session tracking
# 3. Timeout sessions after 30 minutes of inactivity
# 4. Calculate session metrics

# Your code here

Exercise 3: Join Streams

Implement stream-stream join for ad analytics.
# TODO: Join impressions and clicks
# 1. Read both streams from Kafka
# 2. Apply appropriate watermarks
# 3. Join on impression_id with time bounds
# 4. Calculate click-through rates

# Your code here

Summary

Structured Streaming provides a powerful, fault-tolerant framework for real-time data processing:
  • Unified API: Use DataFrame/Dataset API for batch and streaming
  • Event Time: Process data based on event timestamps
  • Exactly-Once: Achieve exactly-once semantics with checkpointing
  • Stateful Operations: Track complex state across events
  • Stream Joins: Combine multiple streams with time bounds
  • Scalability: Handle high-throughput workloads

Key Takeaways

  1. Use watermarks appropriately to balance latency and completeness
  2. Choose output modes based on your use case
  3. Monitor state size and use timeouts to prevent memory issues
  4. Configure checkpoints for fault tolerance
  5. Optimize trigger intervals for your latency requirements

Next Steps

  • Practice with real streaming data sources
  • Implement complex stateful operations
  • Explore integration with Delta Lake
  • Study production deployment patterns
  • Learn advanced monitoring and troubleshooting

Continue to the next module to explore MLlib for distributed machine learning at scale.