Performance Tuning
Module Duration: 4-5 hours
Focus: Memory management, configuration, and optimization
Prerequisites: Understanding of Spark architecture and RDDs/DataFrames
Overview
Performance tuning is critical for running Spark applications efficiently in production. This module covers memory management, executor configuration, shuffle optimization, and advanced tuning techniques to maximize throughput and minimize costs.
- Memory Management: Properly configure executor and driver memory.
- Parallelism: Optimize the number of partitions and tasks.
- Data Serialization: Choose efficient serialization formats.
- Shuffle Operations: Minimize and optimize data shuffling.
- Caching Strategy: Cache data intelligently to improve performance.
Memory Management
Understanding Spark Memory Model
Spark divides executor memory into several regions:
- Execution Memory: For shuffles, joins, sorts, and aggregations
- Storage Memory: For caching and broadcasting
- User Memory: For user data structures and internal metadata
- Reserved Memory: Fixed overhead for Spark internals (300MB)
# Memory calculation formula
# Unified (execution + storage) memory = (Executor Memory - Reserved Memory) * spark.memory.fraction
# Reserved = 300MB
# spark.memory.fraction = 0.6 (default)
# spark.memory.storageFraction = 0.5 (default)
# Example: 4GB executor
# Unified = (4096MB - 300MB) * 0.6 ≈ 2278MB
# Storage = 2278MB * 0.5 ≈ 1139MB
# Execution ≈ 1139MB
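The split can also be computed with a small helper that mirrors the formula above (a minimal sketch, not a Spark API):
def unified_memory_split(executor_memory_mb, memory_fraction=0.6,
                         storage_fraction=0.5, reserved_mb=300):
    """Return (execution_mb, storage_mb) for the unified memory region."""
    usable = (executor_memory_mb - reserved_mb) * memory_fraction
    storage = usable * storage_fraction
    return usable - storage, storage
print(unified_memory_split(4096))  # roughly (1139, 1139) for a 4GB executor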
Executor Memory Configuration
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MemoryTuning") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.memoryOverhead", "2g") \
.config("spark.driver.memory", "4g") \
.config("spark.driver.memoryOverhead", "1g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.getOrCreate()
# Total executor memory = executor.memory + executor.memoryOverhead
# 8GB + 2GB = 10GB total per executor
Memory Configuration Best Practices
# Submit with optimized memory settings
spark-submit \
  --executor-memory 8g \
  --driver-memory 4g \
  --conf spark.executor.memoryOverhead=2g \
  --conf spark.driver.memoryOverhead=1g \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.storageFraction=0.5 \
  application.py
Monitoring Memory Usage
# Check executor memory metrics
from pyspark import SparkContext
sc = spark.sparkContext
# Get memory status (uses Spark's internal JVM API via _jsc; subject to change between versions)
status = sc._jsc.sc().getExecutorMemoryStatus()
print(f"Executor Memory Status: {status}")
# Monitor storage memory
storage_status = sc._jsc.sc().getRDDStorageInfo()
for rdd in storage_status:
print(f"RDD {rdd.id()}: {rdd.memSize()} bytes")
Handling Memory Issues
# Issue: OutOfMemoryError during execution
# Solution 1: Increase executor memory (must be set at submit time or when the
# SparkSession is created; it cannot be changed on a running application)
#   spark-submit --conf spark.executor.memory=16g --conf spark.executor.memoryOverhead=4g ...
# Solution 2: Increase partitions to reduce per-task memory
df = df.repartition(1000)
# Solution 3: Disable broadcast joins for large tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# Solution 4: Use the external shuffle service (requires the service to be running on
# each node and must also be enabled at application start)
#   --conf spark.shuffle.service.enabled=true
# Note: spark.shuffle.spill is obsolete; since Spark 1.6 shuffle data always spills
# to disk when execution memory is exhausted.
Executor Configuration
Optimal Executor Sizing
# Formula for executor configuration:
# Total Cores = num_executors * executor_cores
# Total Memory = num_executors * (executor_memory + memoryOverhead)
# Example cluster: 10 nodes, 16 cores, 64GB RAM each
# Total: 160 cores, 640GB RAM
# Conservative approach:
# - Leave 1 core and ~7GB per node for OS/Hadoop
# - executor_cores = 5 (sweet spot for HDFS throughput)
# - executor_memory = ~11GB
# Calculation:
# Cores per node after overhead: 16 - 1 = 15
# Executors per node: 15 / 5 = 3
# Total executors: 10 * 3 = 30
# Memory per executor: (64GB - 7GB) / 3 = 19GB
# Split: 15GB executor.memory + 4GB overhead
spark = SparkSession.builder \
.appName("OptimalConfig") \
.config("spark.executor.instances", "30") \
.config("spark.executor.cores", "5") \
.config("spark.executor.memory", "15g") \
.config("spark.executor.memoryOverhead", "4g") \
.getOrCreate()
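The sizing arithmetic above can be captured in a small helper as well. This is an illustrative sketch; the OS reservations and the roughly 20% overhead split are assumptions carried over from the example, not fixed rules:
def size_executors(nodes, cores_per_node, ram_gb_per_node,
                   cores_for_os=1, ram_gb_for_os=7,
                   executor_cores=5, overhead_fraction=0.20):
    usable_cores = cores_per_node - cores_for_os
    executors_per_node = usable_cores // executor_cores
    mem_per_executor_gb = (ram_gb_per_node - ram_gb_for_os) / executors_per_node
    overhead_gb = round(mem_per_executor_gb * overhead_fraction)
    return {"spark.executor.instances": nodes * executors_per_node,
            "spark.executor.cores": executor_cores,
            "spark.executor.memory": f"{int(mem_per_executor_gb - overhead_gb)}g",
            "spark.executor.memoryOverhead": f"{overhead_gb}g"}
print(size_executors(nodes=10, cores_per_node=16, ram_gb_per_node=64))
# Matches the example above: 30 executors, 5 cores, 15g heap + 4g overhead each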
Dynamic Allocation
# Enable dynamic allocation for variable workloads
spark = SparkSession.builder \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "2") \
.config("spark.dynamicAllocation.maxExecutors", "50") \
.config("spark.dynamicAllocation.initialExecutors", "10") \
.config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
.config("spark.shuffle.service.enabled", "true") \
.getOrCreate()
Core Configuration
# Configure cores per executor
spark.conf.set("spark.executor.cores", "5")
# Set default parallelism
spark.conf.set("spark.default.parallelism", "200")
# SQL shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")
Partitioning Strategies
Understanding Partitions
# Check current partitions
df = spark.read.parquet("/data/large_dataset")
print(f"Number of partitions: {df.rdd.getNumPartitions()}")
# Optimal partition size: 128MB - 1GB per partition
# Calculate optimal partitions
data_size_gb = 100 # GB
partition_size_gb = 0.5 # 500MB
optimal_partitions = int(data_size_gb / partition_size_gb)
print(f"Optimal partitions: {optimal_partitions}")
Repartition vs Coalesce
# Repartition: Full shuffle, can increase or decrease partitions
df_repartitioned = df.repartition(200)
# Coalesce: No shuffle, only decreases partitions
df_coalesced = df.coalesce(50)
# Repartition by column for better data locality
df_partitioned = df.repartition(100, "user_id")
# Multiple columns
df_partitioned = df.repartition(100, "date", "region")
Partition Tuning Examples
# Example 1: Wide transformations need more partitions
# Initial data: 1000 partitions
from pyspark.sql.functions import col, count, sum
transactions = spark.read.parquet("/data/transactions")
# After filter: still 1000 partitions but less data
filtered = transactions.filter(col("amount") > 1000)
# Coalesce to reduce partitions and improve performance
optimized = filtered.coalesce(200)
# Example 2: Partition before expensive operations
df = df.repartition(500, "customer_id") \
.groupBy("customer_id") \
.agg(
sum("amount").alias("total"),
count("*").alias("count")
)
Custom Partitioning
from pyspark.sql.functions import spark_partition_id
# Check partition distribution
df.withColumn("partition_id", spark_partition_id()) \
.groupBy("partition_id") \
.count() \
.orderBy("partition_id") \
.show()
# Repartition by an explicit hash expression (repartition(n, "col") already
# hash-partitions, so this is mainly useful for custom bucketing logic)
from pyspark.sql.functions import hash
df_hash = df.repartition(100, hash("user_id"))
# Range partitioning
df_range = df.repartitionByRange(100, "timestamp")
Shuffle Optimization
Understanding Shuffle Operations
# Operations that cause shuffles:
# - groupBy, reduceByKey, aggregateByKey
# - join (except broadcast joins)
# - repartition (coalesce normally avoids a full shuffle)
# - distinct, intersection
# - sort, orderBy
# Monitor shuffles
spark.conf.set("spark.sql.shuffle.partitions", "200")
Reducing Shuffle
# Problem: Multiple shuffles
df1 = spark.read.parquet("/data/users")
df2 = spark.read.parquet("/data/transactions")
# Bad: Multiple shuffles
result = df1.groupBy("region").count() \
.join(df2.groupBy("region").sum("amount"), "region")
# Better: Combine operations
from pyspark.sql.functions import countDistinct, sum
result = df1.join(df2, df1.user_id == df2.user_id) \
.groupBy("region") \
.agg(
countDistinct(df1.user_id).alias("user_count"),
sum("amount").alias("total_amount")
)
Shuffle Configuration
# Optimize shuffle behavior
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")
spark.conf.set("spark.shuffle.file.buffer", "64k")
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
# Use external shuffle service
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set("spark.shuffle.service.port", "7337")
Broadcast Joins
from pyspark.sql.functions import broadcast
# Small table broadcasting
small_df = spark.read.parquet("/data/dimension") # < 10MB
large_df = spark.read.parquet("/data/facts") # > 10GB
# Broadcast small table to avoid shuffle
result = large_df.join(broadcast(small_df), "key")
# Configure broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") # 10MB
# Disable auto broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Sort Merge Join Optimization
# Prefer sort merge join for large-table joins (enabled by default)
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
# Pre-partition data by join key
df1 = df1.repartition(200, "join_key").sortWithinPartitions("join_key")
df2 = df2.repartition(200, "join_key").sortWithinPartitions("join_key")
result = df1.join(df2, "join_key")
Caching Strategies
When to Cache
# Cache when:
# 1. Data is accessed multiple times
# 2. Dataset is small enough to fit in memory
# 3. Computing the dataset is expensive
# Example: Iterative algorithm
df = spark.read.parquet("/data/large_dataset")
# Cache before multiple actions
df_cached = df.filter(col("active") == True).cache()
# Multiple operations on cached data
count1 = df_cached.count()
df_cached.describe().show()
df_cached.sample(0.1).show()
# Unpersist when done
df_cached.unpersist()
Storage Levels
from pyspark import StorageLevel
# Storage levels available in PySpark (Python data is always stored serialized,
# so the Scala/Java-only *_SER levels are not exposed in the Python API)
df.persist(StorageLevel.MEMORY_ONLY)       # Memory only; partitions that don't fit are recomputed
df.persist(StorageLevel.MEMORY_AND_DISK)   # Default for DataFrame.cache(); spills to disk if needed
df.persist(StorageLevel.DISK_ONLY)         # Only on disk
df.persist(StorageLevel.MEMORY_ONLY_2)     # Replicated on two nodes
df.persist(StorageLevel.OFF_HEAP)          # Off-heap memory (needs spark.memory.offHeap.* configured)
# Example usage
large_df = spark.read.parquet("/data/large")
# Allow disk spill for datasets that may not fit in memory
large_df.persist(StorageLevel.MEMORY_AND_DISK)
# Process
result = large_df.groupBy("category").count()
result.show()
# Clean up
large_df.unpersist()
Caching Best Practices
# 1. Cache after expensive operations
df = spark.read.parquet("/data/raw") \
.filter(col("year") == 2024) \
.join(dimension_table, "id") \
.withColumn("processed", complex_udf("value")) \
.cache() # Cache after expensive transformations
# 2. Cache DataFrames, not RDDs (when possible)
# DataFrames use columnar format, more memory efficient
# 3. Monitor cache usage
spark.catalog.clearCache() # Clear all cached tables
# 4. Selective caching
subset = df.select("id", "value", "timestamp").cache()
# 5. Cache at the right granularity
# Too early: Waste memory on unnecessary data
# Too late: Recompute expensive operations
Checkpoint vs Cache
# Cache: Stores in memory/disk, lineage retained
df_cached = df.cache()
# Checkpoint: Saves to reliable storage, breaks lineage
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df_checkpointed = df.checkpoint()
# Use checkpoint for:
# - Very long lineages (avoid stack overflow)
# - Iterative algorithms with many iterations
# - Recovery from failures in long-running jobs
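A minimal sketch of periodic checkpointing inside an iterative job; the input path and the per-iteration transform are placeholders:
from pyspark.sql.functions import lit
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
state = spark.read.parquet("/data/iterative_input")  # hypothetical input
for i in range(20):
    state = state.withColumn("iteration", lit(i))  # stand-in for the real update step
    if (i + 1) % 5 == 0:
        state = state.checkpoint()  # truncate lineage every 5 iterations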
Serialization
Kryo Serialization
# Enable Kryo serialization for better performance
spark = SparkSession.builder \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrationRequired", "false") \
.config("spark.kryoserializer.buffer.max", "512m") \
.getOrCreate()
# Register custom classes
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
Scala Kryo Registration
import org.apache.spark.sql.SparkSession
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import com.esotericsoftware.kryo.Kryo

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass])
    kryo.register(classOf[AnotherClass])
  }
}

val spark = SparkSession.builder()
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  .getOrCreate()
Efficient File Formats
# Use efficient file formats
# Parquet: Columnar, compressed, predicate pushdown
df.write.parquet("/data/output")
# ORC: Similar to Parquet, better for Hive
df.write.orc("/data/output")
# Avro: Row-based, schema evolution
df.write.format("avro").save("/data/output")
# Comparison
# CSV: Slow, no schema, human-readable
# JSON: Slow, schema inference, nested structures
# Parquet: Fast, columnar, best for analytics
# ORC: Fast, columnar, good compression
# Avro: Fast, row-based, schema evolution
Query Optimization
Catalyst Optimizer
# View query plan
df = spark.read.parquet("/data/transactions")
filtered = df.filter(col("amount") > 1000)
aggregated = filtered.groupBy("category").sum("amount")
# Logical plan
aggregated.explain(True)
# Physical plan
aggregated.explain()
# Cost-based optimization (relies on table statistics gathered with
# ANALYZE TABLE ... COMPUTE STATISTICS)
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
Predicate Pushdown
# Good: Predicate pushed to data source
df = spark.read.parquet("/data/large") \
.filter(col("year") == 2024) \
.filter(col("amount") > 1000)
# Only reads data for year 2024
# Column pruning
result = df.select("id", "amount", "timestamp")
# Only reads selected columns from Parquet
# Partition pruning
partitioned_df = spark.read.parquet("/data/partitioned")
# Path: /data/partitioned/year=2024/month=01/
filtered = partitioned_df.filter(col("year") == 2024)
# Only scans year=2024 partitions
Join Optimization
# 1. Broadcast hash join (small table < 10MB)
result = large_df.join(broadcast(small_df), "key")
# 2. Sort merge join (both tables large)
# Pre-sort and partition by join key
df1_sorted = df1.repartition("join_key").sortWithinPartitions("join_key")
df2_sorted = df2.repartition("join_key").sortWithinPartitions("join_key")
result = df1_sorted.join(df2_sorted, "join_key")
# 3. Bucket join (pre-bucketed tables)
df1.write.bucketBy(100, "join_key").saveAsTable("table1")
df2.write.bucketBy(100, "join_key").saveAsTable("table2")
result = spark.table("table1").join(spark.table("table2"), "join_key")
# 4. Skewed join handling
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
Adaptive Query Execution (AQE)
# Enable AQE for dynamic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
# AQE automatically:
# - Coalesces post-shuffle partitions
# - Converts sort-merge join to broadcast join
# - Optimizes skewed joins
Spark UI
# Access Spark UI at: http://driver-node:4040
# Key metrics to monitor:
# - Jobs: Number and duration
# - Stages: Task distribution and timing
# - Storage: Cached RDDs/DataFrames
# - Executors: Resource usage
# - SQL: Query plans and execution
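The same information is exposed programmatically through the public status API; a small sketch (it only returns data while jobs and stages are active):
tracker = spark.sparkContext.statusTracker()
for stage_id in tracker.getActiveStageIds():
    info = tracker.getStageInfo(stage_id)
    if info is not None:
        print(f"Stage {stage_id}: {info.numCompletedTasks}/{info.numTasks} tasks complete")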
Programmatic Monitoring
from pyspark.sql.functions import col, count, sum
import time
# Measure execution time
start_time = time.time()
df = spark.read.parquet("/data/large")
result = df.groupBy("category").agg(
count("*").alias("count"),
sum("amount").alias("total")
).cache()
result.show()
end_time = time.time()
print(f"Execution time: {end_time - start_time:.2f} seconds")
# Get metrics
print(f"Number of partitions: {result.rdd.getNumPartitions()}")
# Executor metrics
executor_metrics = spark.sparkContext._jsc.sc().getExecutorMemoryStatus()
print(f"Executor memory status: {executor_metrics}")
Logging and Debugging
# Set log level
spark.sparkContext.setLogLevel("WARN") # ERROR, WARN, INFO, DEBUG
# Custom logging
import logging
logger = logging.getLogger("MySparkApp")
logger.setLevel(logging.INFO)
# Log partitioning info
logger.info(f"Input partitions: {df.rdd.getNumPartitions()}")
logger.info(f"After processing: {result.rdd.getNumPartitions()}")
# Profile Python workers (spark.python.profile must be set when the SparkContext
# is created, e.g. spark-submit --conf spark.python.profile=true)
Advanced Configuration
Compression
# Configure compression
spark.conf.set("spark.sql.parquet.compression.codec", "snappy") # snappy, gzip, lzo, uncompressed
spark.conf.set("spark.sql.orc.compression.codec", "zlib")
spark.conf.set("spark.io.compression.codec", "lz4") # For shuffle
spark.conf.set("spark.rdd.compress", "true")
# Trade-offs:
# - snappy: Fast compression/decompression, moderate compression
# - gzip: Slower, better compression ratio
# - lz4: Fastest, good for shuffle
# - uncompressed: No CPU overhead, more I/O
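To choose a codec empirically, write a sample of your data with each candidate and compare output sizes. A rough sketch, assuming the test output lands on a local filesystem (add zstd to the list if your Spark build supports it):
import os
sample = df.sample(0.01)
for codec in ["snappy", "gzip"]:
    path = f"/tmp/codec_test_{codec}"  # hypothetical local test path
    sample.write.mode("overwrite").option("compression", codec).parquet(path)
    size_mb = sum(os.path.getsize(os.path.join(path, f))
                  for f in os.listdir(path) if f.endswith(".parquet")) / 1e6
    print(f"{codec}: {size_mb:.1f} MB")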
Speculation
# Enable speculative execution for stragglers
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.interval", "100ms")
spark.conf.set("spark.speculation.multiplier", "1.5")
spark.conf.set("spark.speculation.quantile", "0.75")
Garbage Collection
# GC tuning for executors
spark = SparkSession.builder \
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12") \
.getOrCreate()
# Monitor GC by passing verbose GC flags at submit time (executor JVM options cannot
# be changed with spark.conf.set on a running application). On Java 8:
#   --conf spark.executor.extraJavaOptions="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
# On Java 11+, use -Xlog:gc* instead of the PrintGC* flags.
Real-World Optimization Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import StorageLevel
# Create optimized Spark session
spark = SparkSession.builder \
.appName("OptimizedETL") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "5") \
.config("spark.executor.memoryOverhead", "2g") \
.config("spark.default.parallelism", "200") \
.getOrCreate()
# Read with optimal partitioning
raw_data = spark.read.parquet("/data/transactions") \
.repartition(200, "user_id")
# Broadcast small dimension table
users = spark.read.parquet("/data/users")
users_broadcast = broadcast(users)
# Optimized join
enriched = raw_data.join(users_broadcast, "user_id")
# Cache before multiple operations
enriched_cached = enriched.select(
    "user_id", "transaction_id", "amount",
    "category", "timestamp", "user_segment"
).persist(StorageLevel.MEMORY_AND_DISK)
# First aggregation
daily_stats = enriched_cached.groupBy(
window("timestamp", "1 day"),
"user_segment"
).agg(
count("*").alias("transaction_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount")
).coalesce(50)
# Save with compression
daily_stats.write \
.mode("overwrite") \
.parquet("/data/output/daily_stats")
# Second aggregation reuses cache
category_stats = enriched_cached.groupBy("category").agg(
count("*").alias("count"),
sum("amount").alias("total")
).coalesce(10)
category_stats.write \
.mode("overwrite") \
.parquet("/data/output/category_stats")
# Clean up
enriched_cached.unpersist()
# Stop session
spark.stop()
Pre-Production Checklist
- Resource Allocation
- Partitioning
- Shuffle Optimization
- Caching
- Serialization
- Query Optimization
Anti-Pattern 1: Too Many Small Files
# Bad: Creates many small files
df.write.mode("append").parquet("/data/output")
# Results in: 1000s of tiny files
# Good: Coalesce before writing
df.coalesce(10).write.mode("append").parquet("/data/output")
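Another option is to cap output file size at write time with the standard maxRecordsPerFile writer option; the record count below is an assumption to tune for your row width:
df.write \
    .option("maxRecordsPerFile", 1000000) \
    .mode("append") \
    .parquet("/data/output")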
Anti-Pattern 2: Unnecessary Shuffles
# Bad: Multiple shuffles
df.groupBy("key").count() \
.join(df.groupBy("key").sum("value"), "key")
# Good: Single shuffle
df.groupBy("key").agg(
count("*").alias("count"),
sum("value").alias("sum")
)
Anti-Pattern 3: Data Skew
# Problem: Skewed data causes stragglers
# Key "A" has 90% of data
# Solution 1: Salt keys
from pyspark.sql.functions import col, concat, lit, rand
salted = df.withColumn("salted_key",
    concat(col("key"), lit("_"), (rand() * 10).cast("int"))
)
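To complete the salted join, the other (typically smaller) side must be expanded with every salt value so the keys still line up. A sketch, where dim stands in for that table:
from pyspark.sql.functions import array, explode
dim_salted = dim.withColumn("salt", explode(array(*[lit(i) for i in range(10)]))) \
    .withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))
result = salted.join(dim_salted, "salted_key")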
# Solution 2: Adaptive query execution
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Solution 3: Split and union
common_keys = df.groupBy("key").count().filter(col("count") > 1000000)
skewed = df.join(broadcast(common_keys), "key")
normal = df.join(broadcast(common_keys), "key", "left_anti")
# Process separately and union
result = process_skewed(skewed).union(process_normal(normal))
Anti-Pattern 4: Collecting Large Data
# Bad: Brings all data to driver
all_data = df.collect() # OOM on driver
# Good: Process in distributed manner
df.write.parquet("/output") # Distributed write
# Or sample if analysis needed
sample = df.sample(0.01).collect()
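If some rows genuinely need to reach the driver, pull a bounded amount with standard DataFrame methods (toPandas additionally requires pandas installed on the driver):
preview = df.limit(1000).toPandas()  # bounded, local pandas DataFrame for inspection
first_rows = df.take(100)            # list of Row objects
for row in df.toLocalIterator():     # streams partitions to the driver one at a time
    pass                             # handle one row at a time here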
Hands-On Exercises
Exercise 1: Memory Tuning
# TODO: Optimize memory configuration
# 1. Calculate optimal executor memory
# 2. Configure memory overhead
# 3. Set storage fraction
# 4. Monitor memory usage
# Your code here
Exercise 2: Partition Optimization
# TODO: Optimize partitioning strategy
# 1. Analyze current partition distribution
# 2. Determine optimal partition count
# 3. Repartition by appropriate keys
# 4. Measure performance improvement
# Your code here
Exercise 3: Shuffle Reduction
# TODO: Minimize shuffle operations
# 1. Identify shuffle operations in query plan
# 2. Use broadcast joins where appropriate
# 3. Combine aggregations
# 4. Compare execution times
# Your code here
Summary
Performance tuning is essential for production Spark applications:
- Memory Management: Configure executor and driver memory appropriately
- Partitioning: Optimize partition size and distribution
- Shuffle: Minimize and optimize shuffle operations
- Caching: Cache intelligently with appropriate storage levels
- Serialization: Use Kryo and efficient file formats
- Monitoring: Continuously monitor and adjust configurations
Key Takeaways
- Profile before optimizing - measure, don’t guess
- Start with executor sizing and partitioning
- Minimize shuffles through query optimization
- Cache strategically for reused datasets
- Use adaptive query execution for dynamic optimization
- Monitor Spark UI for bottlenecks
- Test configurations with production-like data
Continue to the next module to learn about cluster deployment and operations.