Structured Streaming
Module Duration: 4-5 hours
Focus: Real-time data processing and stream analytics
Prerequisites: Spark SQL and DataFrames
Overview
Structured Streaming is Apache Spark's scalable, fault-tolerant stream processing engine built on the Spark SQL engine. It treats streaming data as an unbounded table to which new rows are continuously appended, so streaming computations can be written with the same DataFrame/Dataset API used for batch jobs.
Key Concepts
Unbounded Table Model: Streaming data is treated as an append-only table that grows continuously (see the sketch after this list).
Micro-Batch Processing: Data is processed in small batches with latencies as low as 100ms.
Event Time Processing: Process data based on timestamps embedded in the data, not arrival time.
Fault Tolerance: Exactly-once semantics through checkpointing and write-ahead logs.
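Before wiring up real sources, the built-in rate source is a convenient way to see the unbounded-table model in action: it generates rows continuously, and every micro-batch appends them to the logical input table. A minimal sketch (the rows-per-second value and console sink are illustrative choices):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnboundedTableDemo").getOrCreate()

# The rate source emits (timestamp, value) rows at a fixed rate
rate = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 5) \
    .load()

# Each micro-batch appends the new rows to the unbounded input table
query = rate.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()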
Structured Streaming Fundamentals
Basic Streaming Query
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("StructuredStreamingBasics") \
.getOrCreate()
# Read streaming data from a directory
input_stream = spark.readStream \
.format("json") \
.schema("timestamp TIMESTAMP, user_id INT, action STRING, item_id INT") \
.option("maxFilesPerTrigger", 1) \
.load("/data/streaming/input")
# Process the stream
processed = input_stream \
.filter(col("action") == "purchase") \
.groupBy("item_id") \
.count()
# Write results
query = processed.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", False) \
.start()
query.awaitTermination()
Scala Implementation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("StructuredStreamingBasics")
.getOrCreate()
import spark.implicits._
// Define schema
val schema = StructType(Seq(
StructField("timestamp", TimestampType, nullable = false),
StructField("user_id", IntegerType, nullable = false),
StructField("action", StringType, nullable = false),
StructField("item_id", IntegerType, nullable = false)
))
// Read streaming data
val inputStream = spark.readStream
.format("json")
.schema(schema)
.option("maxFilesPerTrigger", 1)
.load("/data/streaming/input")
// Process the stream
val processed = inputStream
.filter($"action" === "purchase")
.groupBy($"item_id")
.count()
// Write results
val query = processed.writeStream
.outputMode("complete")
.format("console")
.option("truncate", false)
.start()
query.awaitTermination()
Output Modes
Complete Mode
The entire updated result table is written to the sink after every trigger. Supported only for queries that contain aggregations.
# Complete mode - output entire result table
query = aggregated_df.writeStream \
.outputMode("complete") \
.format("console") \
.start()
Append Mode
Only rows appended to the result table since the last trigger are written. This is the default mode and assumes existing result rows never change.
# Append mode - only new rows
query = filtered_df.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/output") \
.option("checkpointLocation", "/data/checkpoint") \
.start()
Update Mode
Only rows that were updated in the result table since the last trigger are written.
# Update mode - only updated rows
query = windowed_counts.writeStream \
.outputMode("update") \
.format("memory") \
.queryName("updates") \
.start()
Event Time Processing
Event Time vs Processing Time
from pyspark.sql.functions import window, from_json, col, count, sum
# Read stream with event time
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.load()
# Parse JSON and extract event time
parsed = events.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Use event time for windowing
windowed = parsed \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes", "1 minute"),
"user_id"
) \
.agg(
count("*").alias("event_count"),
sum("amount").alias("total_amount")
)
Watermarking
Watermarks allow the system to track progress in event time and clean up old state.
# Watermarking for late data handling
watermarked = parsed \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes"),
"category"
) \
.count()
# Watermark allows dropping state for windows older than:
# max_event_time - watermark_delay
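The watermark a running query is currently tracking can be read from its progress report. A sketch, assuming the watermarked query above was started and bound to a hypothetical wm_query handle (the keys present under eventTime vary with the Spark version and the batch contents):
# Inspect the event-time statistics and current watermark
progress = wm_query.lastProgress
if progress and "eventTime" in progress:
    print(progress["eventTime"].get("watermark"))  # current watermark
    print(progress["eventTime"].get("max"))        # latest event time seen in the batch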
Scala Watermarking
val watermarked = parsed
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"category"
)
.count()
Stateful Operations
mapGroupsWithState
Track custom per-group state across micro-batches. mapGroupsWithState itself is part of the Scala/Java Dataset API (shown below); in PySpark, the equivalent arbitrary stateful operation is applyInPandasWithState (Spark 3.4+), sketched here with an illustrative session schema.
import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

# Output and state schemas as DDL strings
output_schema = "user_id INT, event_count INT, start_time TIMESTAMP, end_time TIMESTAMP"
state_schema = "event_count INT, start_time TIMESTAMP, end_time TIMESTAMP"

def update_session_state(key, pdf_iter, state):
    """Update session state for each user (key is a tuple of the grouping columns)."""
    if state.exists:
        (event_count, start_time, end_time) = state.get
    else:
        event_count, start_time, end_time = 0, None, None
    for pdf in pdf_iter:
        event_count += len(pdf)
        batch_min = pdf["timestamp"].min()
        batch_max = pdf["timestamp"].max()
        start_time = batch_min if start_time is None else min(start_time, batch_min)
        end_time = batch_max if end_time is None else max(end_time, batch_max)
    state.update((event_count, start_time, end_time))
    state.setTimeoutDuration(30 * 60 * 1000)  # time out idle sessions after 30 minutes
    # (a fuller version would check state.hasTimedOut and call state.remove())
    yield pd.DataFrame({
        "user_id": [key[0]],
        "event_count": [event_count],
        "start_time": [start_time],
        "end_time": [end_time],
    })

# Apply the stateful operation per user (Spark 3.4+)
sessions = events.groupBy("user_id").applyInPandasWithState(
    update_session_state,
    outputStructType=output_schema,
    stateStructType=state_schema,
    outputMode="update",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
)
Scala mapGroupsWithState
import org.apache.spark.sql.streaming._
case class Event(user_id: Int, timestamp: Long, action: String)
case class SessionInfo(
startTime: Long,
endTime: Long,
eventCount: Int,
events: List[Event]
)
case class SessionOutput(
  userId: Int,
  eventCount: Int,
  startTime: Long,
  endTime: Long
)
def updateSessionState(
key: Int,
values: Iterator[Event],
state: GroupState[SessionInfo]
): SessionOutput = {
val currentState = if (state.exists) {
state.get
} else {
SessionInfo(0L, 0L, 0, List.empty)
}
val newEvents = values.toList
// On a timeout invocation the iterator is empty, so keep the existing state
val updatedState = if (newEvents.isEmpty) currentState else SessionInfo(
  startTime = if (currentState.startTime == 0L) newEvents.head.timestamp
              else currentState.startTime,
  endTime = newEvents.last.timestamp,
  eventCount = currentState.eventCount + newEvents.length,
  events = currentState.events ++ newEvents
)
state.update(updatedState)
state.setTimeoutDuration("30 minutes")
SessionOutput(key, updatedState.eventCount,
updatedState.startTime, updatedState.endTime)
}
// events is assumed to be a Dataset[Event] (e.g. inputStream.as[Event])
val sessions = events
.groupByKey(_.user_id)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(
updateSessionState
)
flatMapGroupsWithState
For more complex stateful processing that emits zero or more output rows per group. In Scala/Java this is flatMapGroupsWithState; in PySpark the same applyInPandasWithState function shown above can yield multiple rows per group, sketched here with an illustrative running event count per user.
def process_user_events(key, pdf_iter, state):
    """Emit one output row per incoming event with a running per-user count."""
    if state.exists:
        (event_count,) = state.get
    else:
        event_count = 0
    users, actions, counts = [], [], []
    for pdf in pdf_iter:
        for action in pdf["action"]:
            event_count += 1
            users.append(key[0])
            actions.append(action)
            counts.append(event_count)
    state.update((event_count,))
    if counts:
        yield pd.DataFrame({
            "user_id": users,
            "event_type": actions,
            "session_count": counts,
        })

per_event = events.groupBy("user_id").applyInPandasWithState(
    process_user_events,
    outputStructType="user_id INT, event_type STRING, session_count INT",
    stateStructType="event_count INT",
    outputMode="append",
    timeoutConf=GroupStateTimeout.NoTimeout
)
Window Operations
Tumbling Windows
Fixed-size, non-overlapping windows.
# 10-minute tumbling windows
tumbling = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "10 minutes"),
"category"
) \
.agg(
count("*").alias("count"),
avg("price").alias("avg_price"),
sum("quantity").alias("total_quantity")
)
Sliding Windows
Fixed-size, overlapping windows.
# 10-minute windows sliding every 5 minutes
sliding = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "10 minutes", "5 minutes"),
"product_id"
) \
.agg(
count("*").alias("count"),
max("price").alias("max_price")
)
Session Windows
Dynamic windows based on activity gaps.
from pyspark.sql.functions import session_window
# Session windows with 30-minute gap
session_agg = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
session_window("event_time", "30 minutes"),
"user_id"
) \
.agg(
count("*").alias("session_events"),
sum("duration").alias("session_duration")
)
Scala Window Operations
// Tumbling windows
val tumbling = events
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "10 minutes"),
$"category"
)
.agg(
count("*").as("count"),
avg("price").as("avg_price")
)
// Sliding windows
val sliding = events
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "10 minutes", "5 minutes"),
$"product_id"
)
.count()
// Session windows
val sessions = events
.withWatermark("event_time", "10 minutes")
.groupBy(
session_window($"event_time", "30 minutes"),
$"user_id"
)
.count()
Stream-Stream Joins
Inner Join
# Stream 1: Impressions
impressions = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "impressions") \
.load() \
.select(from_json(col("value").cast("string"), impression_schema).alias("data")) \
.select("data.*") \
.withWatermark("impression_time", "10 minutes")
# Stream 2: Clicks
clicks = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "clicks") \
.load() \
.select(from_json(col("value").cast("string"), click_schema).alias("data")) \
.select("data.*") \
.withWatermark("click_time", "20 minutes")
# Join streams
joined = impressions.join(
clicks,
expr("""
impression_id = click_impression_id AND
click_time >= impression_time AND
click_time <= impression_time + interval 1 hour
""")
)
Outer Join
# Left outer join
left_join = impressions.join(
clicks,
expr("""
impression_id = click_impression_id AND
click_time >= impression_time AND
click_time <= impression_time + interval 1 hour
"""),
"left_outer"
)
Scala Stream-Stream Join
val impressions = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "impressions")
.load()
.select(from_json($"value".cast("string"), impressionSchema).as("data"))
.select("data.*")
.withWatermark("impression_time", "10 minutes")
val clicks = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "clicks")
.load()
.select(from_json($"value".cast("string"), clickSchema).as("data"))
.select("data.*")
.withWatermark("click_time", "20 minutes")
val joined = impressions.join(
clicks,
expr("""
impression_id = click_impression_id AND
click_time >= impression_time AND
click_time <= impression_time + interval 1 hour
""")
)
Stream-Static Joins
# Static dimension table
products = spark.read \
.format("parquet") \
.load("/data/products")
# Streaming fact data
orders = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "orders") \
.load() \
.select(from_json(col("value").cast("string"), order_schema).alias("data")) \
.select("data.*")
# Join stream with static data
enriched = orders.join(products, "product_id")
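When the static side is a small dimension table, a broadcast hint keeps the stream-static join map-side and avoids shuffling the stream on every micro-batch. A short sketch reusing the orders and products DataFrames above:
from pyspark.sql.functions import broadcast

# Broadcast the small dimension table to every executor
enriched_broadcast = orders.join(broadcast(products), "product_id")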
Fault Tolerance and Checkpointing
Checkpointing
# Configure checkpoint location
query = processed.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/output") \
.option("checkpointLocation", "/data/checkpoint") \
.start()
# Checkpoint contains:
# - Metadata (configuration, schema)
# - Offsets (source position)
# - State (for stateful operations)
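If most queries in an application should checkpoint under a common root, a default location can be set once; an explicit checkpointLocation option on a query still takes precedence. A minimal sketch (the path is illustrative):
# Default root for checkpoints of queries that don't set checkpointLocation explicitly
spark.conf.set("spark.sql.streaming.checkpointLocation", "/data/checkpoints")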
Recovery from Failures
# Spark automatically recovers from failures using checkpoints
# On restart, it:
# 1. Reads checkpoint metadata
# 2. Recovers last committed offset
# 3. Resumes from that offset
query = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.load() \
.writeStream \
.format("parquet") \
.option("path", "/data/output") \
.option("checkpointLocation", "/data/checkpoint") \
.start()
Exactly-Once Semantics
# Idempotent sinks ensure exactly-once
query = processed.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/data/checkpoint") \
.start("/data/delta-table")
# Delta Lake provides ACID transactions
# - Atomic writes
# - Idempotent retries
# - Exactly-once guarantees
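For sinks without transactional support, foreachBatch exposes each micro-batch together with its batch id, which stays the same across retries and can therefore be used to make writes idempotent. A sketch, assuming a hypothetical write_if_not_committed helper that records which batch ids have already been written:
def upsert_batch(batch_df, batch_id):
    # batch_id is stable across retries of the same micro-batch, so the sink
    # can skip batches it has already committed (hypothetical helper below)
    write_if_not_committed(batch_df, batch_id)

query = processed.writeStream \
    .foreachBatch(upsert_batch) \
    .option("checkpointLocation", "/data/checkpoint-foreachbatch") \
    .start()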
Data Sources and Sinks
Kafka Source
# Read from Kafka
kafka_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "topic1,topic2,topic3") \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 10000) \
.option("kafka.security.protocol", "SASL_SSL") \
.load()
# Access Kafka metadata
parsed = kafka_stream.selectExpr(
"CAST(key AS STRING)",
"CAST(value AS STRING)",
"topic",
"partition",
"offset",
"timestamp"
)
Kafka Sink
# Write to Kafka
query = processed.selectExpr(
"CAST(user_id AS STRING) AS key",
"to_json(struct(*)) AS value"
).writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "output-topic") \
.option("checkpointLocation", "/data/checkpoint") \
.start()
File Sources
# JSON files
json_stream = spark.readStream \
.format("json") \
.schema(schema) \
.option("maxFilesPerTrigger", 10) \
.load("/data/json-input")
# CSV files
csv_stream = spark.readStream \
.format("csv") \
.schema(schema) \
.option("header", True) \
.option("maxFilesPerTrigger", 5) \
.load("/data/csv-input")
# Parquet files
parquet_stream = spark.readStream \
.format("parquet") \
.schema(schema) \
.load("/data/parquet-input")
File Sinks
# Parquet output with partitioning
query = processed.writeStream \
.format("parquet") \
.option("path", "/data/output") \
.option("checkpointLocation", "/data/checkpoint") \
.partitionBy("date", "hour") \
.start()
# Delta output
delta_query = processed.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/data/checkpoint") \
.start("/data/delta-output")
Memory Sink (Testing)
# Write to memory for testing
memory_query = processed.writeStream \
.format("memory") \
.queryName("test_results") \
.outputMode("complete") \
.start()
# Query the in-memory table
spark.sql("SELECT * FROM test_results").show()
Trigger Modes
Processing Time Trigger
# Trigger every 10 seconds
query = processed.writeStream \
.trigger(processingTime='10 seconds') \
.format("console") \
.start()
Once Trigger
# Process all available data in a single batch, then stop
# (newer Spark versions prefer the availableNow trigger shown below)
query = processed.writeStream \
.trigger(once=True) \
.format("parquet") \
.option("path", "/data/output") \
.option("checkpointLocation", "/data/checkpoint") \
.start()
Continuous Processing
# Continuous processing (experimental): ~1 ms end-to-end latency with
# at-least-once guarantees; only map-like queries are supported, and the
# trigger argument is the checkpoint interval
query = processed.writeStream \
.trigger(continuous='1 second') \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "output") \
.option("checkpointLocation", "/data/checkpoint") \
.start()
Available Now Trigger
# Process all available data in multiple batches, then stop (Spark 3.3+)
query = processed.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("checkpointLocation", "/data/checkpoint") \
.start("/data/output")
Monitoring and Management
Query Management
# Start query
query = df.writeStream \
.format("console") \
.start()
# Get query information
print(f"Query ID: {query.id}")
print(f"Run ID: {query.runId}")
print(f"Name: {query.name}")
print(f"Status: {query.status}")
# Wait for termination
query.awaitTermination()
# Stop query
query.stop()
# Check if query is active
if query.isActive:
print("Query is running")
Multiple Queries
# Manage multiple queries
query1 = df1.writeStream.format("console").start()
query2 = df2.writeStream.format("memory").queryName("table2").start()
query3 = df3.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output") \
    .option("checkpointLocation", "/checkpoints/query3") \
    .start()
queries = [query1, query2, query3]
# Wait for all queries
for q in queries:
q.awaitTermination()
# Or wait for any to terminate
spark.streams.awaitAnyTermination()
Progress Monitoring
# Get last progress
progress = query.lastProgress
if progress:
print(f"Batch: {progress['batchId']}")
print(f"Input rows: {progress['numInputRows']}")
print(f"Processing rate: {progress['processedRowsPerSecond']}")
print(f"Duration: {progress['durationMs']}")
# Recent progress
for progress in query.recentProgress:
print(progress)
// Scala progress monitoring
val progress = query.lastProgress
println(s"Batch: ${progress.batchId}")
println(s"Input rows: ${progress.numInputRows}")
DStreams (Legacy API)
Basic DStream Operations
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
// Create DStream from socket
val lines = ssc.socketTextStream("localhost", 9999)
// Transformations
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Output operation
wordCounts.print()
// Start the streaming context
ssc.start()
ssc.awaitTermination()
Window Operations in DStreams
// Window operations (the inverse-reduce form requires checkpointing)
ssc.checkpoint("/data/dstream-checkpoint")
val windowedCounts = pairs
.reduceByKeyAndWindow(
_ + _, // Reduce function
_ - _, // Inverse reduce function
Seconds(60), // Window duration
Seconds(10) // Slide duration
)
windowedCounts.print()
Real-World Use Cases
Real-Time ETL Pipeline
from pyspark.sql.functions import *
# Read from multiple sources
raw_events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "raw-events") \
.load()
# Parse and validate
parsed = raw_events \
.select(from_json(col("value").cast("string"), event_schema).alias("data")) \
.select("data.*") \
.filter(col("user_id").isNotNull()) \
.filter(col("timestamp").isNotNull())
# Enrich with dimension data
users = spark.read.parquet("/data/dimensions/users")
products = spark.read.parquet("/data/dimensions/products")
enriched = parsed \
.join(users, "user_id") \
.join(products, "product_id")
# Aggregate metrics
metrics = enriched \
.withWatermark("timestamp", "15 minutes") \
.groupBy(
window("timestamp", "5 minutes"),
"product_category",
"user_segment"
) \
.agg(
count("*").alias("event_count"),
sum("amount").alias("total_revenue"),
countDistinct("user_id").alias("unique_users")
)
# Write to multiple sinks
# 1. Real-time dashboard (Kafka)
metrics.selectExpr("to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "dashboard-metrics") \
.option("checkpointLocation", "/checkpoints/dashboard") \
.start()
# 2. Historical storage (Delta)
metrics.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/delta") \
.start("/data/metrics")
Fraud Detection System
# Real-time fraud detection
transactions = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "transactions") \
.load() \
.select(from_json(col("value").cast("string"), txn_schema).alias("data")) \
.select("data.*")
# Calculate real-time features
features = transactions \
.withWatermark("transaction_time", "10 minutes") \
.groupBy(
window("transaction_time", "10 minutes", "1 minute"),
"user_id"
) \
.agg(
count("*").alias("txn_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
stddev("amount").alias("stddev_amount"),
countDistinct("merchant_id").alias("unique_merchants"),
countDistinct("location").alias("unique_locations")
)
# Apply fraud rules
suspicious = features.filter(
(col("txn_count") > 10) |
(col("total_amount") > 10000) |
(col("unique_locations") > 5)
)
# Alert on suspicious activity
suspicious.writeStream \
.foreachBatch(lambda df, epoch_id: alert_fraud(df)) \
.option("checkpointLocation", "/checkpoints/fraud") \
.start()
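alert_fraud above is a user-defined callback, not a Spark API. One hedged way to implement it is to publish the suspicious aggregates as JSON alerts from inside the batch function (the fraud-alerts topic name is illustrative):
def alert_fraud(df):
    """Illustrative alerting sink: publish suspicious aggregates to a Kafka topic."""
    df.selectExpr("to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "fraud-alerts") \
        .save()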
Click-Through Rate (CTR) Calculation
# Real-time CTR computation
# Alias each stream so the join condition and window can refer to
# impressions.timestamp and clicks.timestamp unambiguously
impressions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "impressions") \
    .load() \
    .select(from_json(col("value").cast("string"), imp_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "1 hour") \
    .alias("impressions")
clicks = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .select(from_json(col("value").cast("string"), click_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "1 hour") \
    .alias("clicks")
# Join and calculate CTR (an aggregation chained after a stream-stream join
# uses multiple stateful operators, which needs a recent Spark release)
ctr = impressions \
.join(
clicks,
expr("""
impression_id = click_impression_id AND
clicks.timestamp >= impressions.timestamp AND
clicks.timestamp <= impressions.timestamp + interval 1 hour
"""),
"left_outer"
) \
.groupBy(
window("impressions.timestamp", "5 minutes"),
"campaign_id"
) \
.agg(
count("impression_id").alias("impressions"),
count("click_impression_id").alias("clicks")
) \
.withColumn("ctr", col("clicks") / col("impressions") * 100)
# Write results
ctr.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/ctr") \
.start("/data/ctr")
Performance Optimization
Optimize Trigger Intervals
# Balance latency vs throughput
# High throughput: Longer intervals
query_batch = df.writeStream \
.trigger(processingTime='30 seconds') \
.format("delta") \
.start()
# Low latency: Shorter intervals or continuous
query_realtime = df.writeStream \
.trigger(continuous='1 second') \
.format("kafka") \
.start()
Optimize Watermarks
# Too short a delay: data arriving later than the watermark is dropped
# Too long a delay: more state is retained and final results are emitted later
# Balance based on how late your data actually arrives
# Tolerant watermark (accepts up to 1 hour of lateness, keeps more state)
df.withWatermark("event_time", "1 hour")
# Aggressive watermark (drops data more than 5 minutes late, keeps less state)
df.withWatermark("event_time", "5 minutes")
State Management
# Default state store: in-memory hash maps backed by checkpoint files
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
# RocksDB state store for large state (Spark 3.2+)
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# Bound deduplication state with a watermark; include the event-time column
# in the deduplication keys so old entries can be dropped
df.withWatermark("event_time", "1 hour") \
    .dropDuplicates(["event_id", "event_time"])
Partition Tuning
# Repartition for better parallelism
repartitioned = df.repartition(100, "user_id")
# Coalesce to reduce partitions
coalesced = df.coalesce(10)
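For stateful queries, the shuffle partition count also fixes the number of state store partitions, and it is baked into the checkpoint on the first run, so size it before the query starts. A sketch (64 is an illustrative value):
# Set before the query's first run; for an existing checkpoint the value
# cannot be changed without starting over from a new checkpoint location
spark.conf.set("spark.sql.shuffle.partitions", "64")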
Common Pitfalls and Solutions
Issue 1: Late Data Handling
Problem: Data that arrives later than the watermark allows is silently dropped from stateful aggregations.
Solution:
# Configure appropriate watermark
df.withWatermark("event_time", "1 hour") # Adjust based on data
# Monitor rows dropped because they arrived later than the watermark
query.lastProgress["stateOperators"][0]["numRowsDroppedByWatermark"]
Issue 2: State Explosion
Problem: Unbounded state growth causes OOM errors.
Solution:
# Use watermarks to bound state
df.withWatermark("event_time", "24 hours") \
.groupBy(window("event_time", "1 hour"), "key") \
.count()
# Use per-group state timeouts so idle state is eventually evicted
# (mapGroupsWithState/flatMapGroupsWithState in Scala, applyInPandasWithState in PySpark)
def stateful_func(key, pdf_iter, state):
    state.setTimeoutDuration(60 * 60 * 1000)  # 1 hour
    # ... process the group and clean up old state ...
Issue 3: Checkpoint Compatibility
Problem: Schema changes break checkpoint recovery.
Solution:
# Use a new checkpoint location for schema changes
# Or implement schema evolution in Delta Lake
query = processed.writeStream \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", "/new/checkpoint/location") \
    .start("/data/delta-table")
Issue 4: Memory Pressure
Problem: Large batches cause executor memory issues.
Solution:
# Limit how much data each micro-batch pulls from the source
stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()
# Increase executor memory at submit time (it cannot be changed at runtime):
# spark-submit --executor-memory 8g ...
Hands-On Exercises
Exercise 1: Real-Time Word Count
Create a streaming word count application that processes text data.
# TODO: Implement streaming word count
# 1. Read from socket source
# 2. Split into words
# 3. Count words in 1-minute windows
# 4. Write to console
# Your code here
Exercise 2: Session Analysis
Implement user session tracking with timeout.
# TODO: Track user sessions
# 1. Read user events from Kafka
# 2. Use mapGroupsWithState for session tracking
# 3. Timeout sessions after 30 minutes of inactivity
# 4. Calculate session metrics
# Your code here
Exercise 3: Join Streams
Implement stream-stream join for ad analytics.
# TODO: Join impressions and clicks
# 1. Read both streams from Kafka
# 2. Apply appropriate watermarks
# 3. Join on impression_id with time bounds
# 4. Calculate click-through rates
# Your code here
Summary
Structured Streaming provides a powerful, fault-tolerant framework for real-time data processing:
- Unified API: Use DataFrame/Dataset API for batch and streaming
- Event Time: Process data based on event timestamps
- Exactly-Once: Achieve exactly-once semantics with checkpointing
- Stateful Operations: Track complex state across events
- Stream Joins: Combine multiple streams with time bounds
- Scalability: Handle high-throughput workloads
Key Takeaways
- Use watermarks appropriately to balance latency and completeness
- Choose output modes based on your use case
- Monitor state size and use timeouts to prevent memory issues
- Configure checkpoints for fault tolerance
- Optimize trigger intervals for your latency requirements
Next Steps
- Practice with real streaming data sources
- Implement complex stateful operations
- Explore integration with Delta Lake
- Study production deployment patterns
- Learn advanced monitoring and troubleshooting
Continue to the next module to explore MLlib for distributed machine learning at scale.