# Performance Tuning

> Optimize Spark applications for production

<Info>
  **Module Duration**: 4-5 hours
  **Focus**: Memory management, configuration, and optimization
  **Prerequisites**: Understanding of Spark architecture and RDDs/DataFrames
</Info>

## Overview

Performance tuning is critical for running Spark applications efficiently in production. This module covers memory management, executor configuration, shuffle optimization, and advanced tuning techniques to maximize throughput and minimize costs.

### Key Performance Factors

**Memory Management**: Properly configure executor and driver memory.

**Parallelism**: Optimize the number of partitions and tasks.

**Data Serialization**: Choose efficient serialization formats.

**Shuffle Operations**: Minimize and optimize data shuffling.

**Caching Strategy**: Cache data intelligently to improve performance.

## Memory Management

### Understanding Spark Memory Model

Spark divides executor memory into several regions:

1. **Execution Memory**: For shuffles, joins, sorts, and aggregations
2. **Storage Memory**: For caching and broadcasting
3. **User Memory**: For user data structures and internal metadata
4. **Reserved Memory**: Fixed overhead for Spark internals (300MB)

```python theme={null}
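# Memory calculation (unified memory manager, Spark 1.6+)
# Reserved memory = 300MB (fixed)
# Usable memory   = Executor Memory - Reserved memory
# Unified memory  = Usable memory * spark.memory.fraction        (execution + storage)
# User memory     = Usable memory * (1 - spark.memory.fraction)
# spark.memory.fraction = 0.6 (default)
# spark.memory.storageFraction = 0.5 (default)

# Example: 4GB executor
# Usable    = 4096MB - 300MB = 3796MB
# Unified   = 3796MB * 0.6 = ~2278MB
# Storage   = 2278MB * 0.5 = ~1139MB (cached blocks within this share are immune to eviction)
# Execution = 2278MB - 1139MB = ~1139MB (execution can also borrow unused storage memory)
# User      = 3796MB * 0.4 = ~1518MB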
```
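The region sizes for any heap size can be computed with a small helper based on the unified memory manager formulas. This is a minimal sketch assuming the default fractions; `memory_regions` is a hypothetical name, and real executors report slightly less usable heap than `spark.executor.memory` (the JVM's `Runtime.maxMemory()` excludes some space), so treat the results as approximate.

```python
RESERVED_MB = 300  # fixed reserved memory in the unified memory manager


def memory_regions(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate memory regions (in MB) for one executor heap."""
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction      # shared by execution and storage
    storage = unified * storage_fraction    # storage share protected from eviction
    return {
        "usable": usable,
        "unified": unified,
        "storage": storage,
        "execution": unified - storage,
        "user": usable - unified,           # user data structures, metadata
    }


for name, mb in memory_regions(4096).items():
    print(f"{name:>9}: {mb:,.0f} MB")
```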

### Executor Memory Configuration

```python theme={null}
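from pyspark.sql import SparkSession

# These settings are read when the SparkContext starts, so they only take effect
# if no session is running yet (getOrCreate() reuses an existing session).
spark = SparkSession.builder \
    .appName("MemoryTuning") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.memoryOverhead", "1g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

# Container size requested from the cluster manager per executor:
# executor.memory + executor.memoryOverhead = 8GB + 2GB = 10GB
# In client mode the driver JVM is already running, so set driver memory
# with --driver-memory (or spark-defaults.conf) instead of in code.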
```

### Memory Configuration Best Practices

```bash theme={null}
# Submit with explicit memory settings
# (spark-submit has no --executor-memoryOverhead flag; overhead is set with --conf)
spark-submit \
  --executor-memory 8g \
  --driver-memory 4g \
  --conf spark.executor.memoryOverhead=2g \
  --conf spark.driver.memoryOverhead=1g \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.storageFraction=0.5 \
  application.py
```

### Monitoring Memory Usage

```python theme={null}
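# Check executor memory metrics through the JVM SparkContext.
# Note: _jsc is a private PySpark attribute and may change between versions.
sc = spark.sparkContext

# Map of block manager "host:port" -> (max memory for caching, remaining memory for caching), in bytes
status = sc._jsc.sc().getExecutorMemoryStatus()
print(f"Executor Memory Status: {status}")

# Storage info for cached RDDs/DataFrames (developer API)
storage_status = sc._jsc.sc().getRDDStorageInfo()
for rdd in storage_status:
    print(f"RDD {rdd.id()}: {rdd.memSize()} bytes in memory")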
```
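Because `_jsc` is private, a sturdier option is Spark's monitoring REST API, which the driver UI serves under `/api/v1`. The sketch below assumes the driver UI is enabled and reachable from where the code runs (on YARN it may sit behind a proxy URL instead), and reads the `memoryUsed` and `maxMemory` fields of each executor summary, which describe storage memory. The helper names are hypothetical.

```python
import json
from urllib.request import urlopen

MB = 1024 * 1024


def fetch_executors(ui_url, app_id):
    """Fetch executor summaries from the Spark monitoring REST API."""
    with urlopen(f"{ui_url}/api/v1/applications/{app_id}/executors") as response:
        return json.load(response)


def summarize_memory(executors):
    """Return (id, storage MB used, storage MB max, percent used) per executor."""
    rows = []
    for e in executors:
        used, limit = e["memoryUsed"], e["maxMemory"]
        percent = 100.0 * used / limit if limit else 0.0
        rows.append((e["id"], round(used / MB, 1), round(limit / MB, 1), round(percent, 1)))
    return rows


# Usage on the driver (sc.uiWebUrl is None when the UI is disabled):
# sc = spark.sparkContext
# for row in summarize_memory(fetch_executors(sc.uiWebUrl, sc.applicationId)):
#     print(row)
```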

### Handling Memory Issues

```python theme={null}
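# Issue: OutOfMemoryError during execution

# Solution 1: Increase executor memory.
# Executor sizes are fixed when the application starts; spark.conf.set() at runtime
# cannot change them. Pass them at submit time instead, e.g.:
#   spark-submit --executor-memory 16g --conf spark.executor.memoryOverhead=4g ...

# Solution 2: Increase partitions to reduce per-task memory
df = df.repartition(1000)
spark.conf.set("spark.sql.shuffle.partitions", "1000")  # for shuffles in later queries

# Solution 3: Disable broadcast joins when broadcast tables are too large for memory
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Solution 4: Enable the external shuffle service (also a submit-time setting).
# It does not reduce memory use, but shuffle files stay available if an executor dies:
#   --conf spark.shuffle.service.enabled=true

# Note: spark.shuffle.spill is ignored since Spark 1.6; shuffles always spill
# to disk when execution memory runs out, so no setting is needed for spilling.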
```

## Executor Configuration

### Optimal Executor Sizing

```python theme={null}
# Formula for executor configuration:
# Total Cores = num_executors * executor_cores
# Total Memory = num_executors * (executor_memory + memoryOverhead)

# Example cluster: 10 nodes, 16 cores, 64GB RAM each
# Total: 160 cores, 640GB RAM

# Conservative approach:
# - Leave 1 core and ~7GB per node for the OS and Hadoop daemons
# - executor_cores = 5 (commonly cited sweet spot for HDFS client throughput)
# - ~19GB per executor container (heap + overhead)

# Calculation:
# Cores per node after overhead: 16 - 1 = 15
# Executors per node: 15 / 5 = 3
# Total executors: 10 * 3 = 30
# Memory per executor: (64GB - 7GB) / 3 = 19GB
# Split: 15GB executor.memory + 4GB overhead

spark = SparkSession.builder \
    .appName("OptimalConfig") \
    .config("spark.executor.instances", "30") \
    .config("spark.executor.cores", "5") \
    .config("spark.executor.memory", "15g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .getOrCreate()
```
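The sizing steps above can be wrapped in a helper so they are easy to redo for other node shapes. This is a sketch of the same heuristic, not an official formula, and `size_executors` is a hypothetical name. Spark's default overhead is 10% of executor memory with a 384MB floor; the `overhead_fraction=0.25` default here is an assumption chosen to reproduce the 15g + 4g split above (PySpark jobs often need extra overhead because Python worker memory lives outside the JVM heap).

```python
import math


def size_executors(nodes, cores_per_node, mem_gb_per_node,
                   reserved_cores=1, reserved_mem_gb=7,
                   executor_cores=5, overhead_fraction=0.25):
    """Apply the fixed-cores-per-executor heuristic to a homogeneous cluster."""
    executors_per_node = (cores_per_node - reserved_cores) // executor_cores
    if executors_per_node < 1:
        raise ValueError("executor_cores is larger than the usable cores per node")
    # Memory per executor container (heap + overhead), in whole GB
    container_gb = (mem_gb_per_node - reserved_mem_gb) // executors_per_node
    heap_gb = math.floor(container_gb / (1 + overhead_fraction))
    if heap_gb < 1:
        raise ValueError("not enough memory per executor")
    return {
        "spark.executor.instances": nodes * executors_per_node,
        "spark.executor.cores": executor_cores,
        "spark.executor.memory": f"{heap_gb}g",
        "spark.executor.memoryOverhead": f"{container_gb - heap_gb}g",
    }


settings = size_executors(nodes=10, cores_per_node=16, mem_gb_per_node=64)
for key, value in settings.items():
    print(key, value)
```

Because the keys are Spark config names, the result can be fed into a builder with `builder = builder.config(key, str(value))` in a loop before calling `getOrCreate()`.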

### Dynamic Allocation

```python theme={null}
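# Enable dynamic allocation for variable workloads
# (needs the external shuffle service, or spark.dynamicAllocation.shuffleTracking.enabled on Spark 3.0+)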
spark = SparkSession.builder \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "50") \
    .config("spark.dynamicAllocation.initialExecutors", "10") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()
```

### Core Configuration

```python theme={null}
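from pyspark.sql import SparkSession

# spark.executor.cores and spark.default.parallelism (RDD operations) are read at
# startup, so set them on the builder or with spark-submit, not with spark.conf.set()
spark = SparkSession.builder \
    .config("spark.executor.cores", "5") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()

# SQL shuffle partitions can be changed at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")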
```

## Partitioning Strategies

### Understanding Partitions

```python theme={null}
# Check current partitions
df = spark.read.parquet("/data/large_dataset")
print(f"Number of partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: roughly 128MB - 1GB of data per partition
# Calculate a partition count from the data size
data_size_gb = 100  # GB
partition_size_gb = 0.5  # ~500MB per partition
optimal_partitions = int(data_size_gb / partition_size_gb)
print(f"Optimal partitions: {optimal_partitions}")  # 200
```
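A slightly more robust version of the partition-count calculation works in bytes, rounds up, and optionally rounds to a multiple of the total executor cores so the last wave of tasks is not mostly idle. The multiple-of-cores rounding is a common heuristic rather than a Spark rule, and `target_partitions` is a hypothetical helper.

```python
import math


def target_partitions(total_bytes, target_partition_bytes=128 * 1024**2,
                      total_cores=None, min_partitions=1):
    """Suggest a partition count for roughly target_partition_bytes per partition."""
    count = max(min_partitions, math.ceil(total_bytes / target_partition_bytes))
    if total_cores:
        # Round up to a whole number of task "waves" across all cores
        count = math.ceil(count / total_cores) * total_cores
    return count


data_bytes = 100 * 1024**3  # 100GB
print(target_partitions(data_bytes, 512 * 1024**2))   # 200 (~512MB partitions)
print(target_partitions(data_bytes, total_cores=150))  # 900 (128MB partitions, 150 cores)
```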

### Repartition vs Coalesce

```python theme={null}
# Repartition: full shuffle; can increase or decrease the number of partitions
df_repartitioned = df.repartition(200)

# Coalesce: merges existing partitions without a shuffle; can only decrease the count
df_coalesced = df.coalesce(50)

# Repartition by column: hash-partitions rows so all rows with the same user_id
# land in the same partition (useful before repeated joins/aggregations on that key)
df_partitioned = df.repartition(100, "user_id")

# Multiple columns
df_partitioned = df.repartition(100, "date", "region")
```

### Partition Tuning Examples

```python theme={null}
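from pyspark.sql.functions import col, count, sum

# Example 1: Reduce partitions after a selective filter
# Assume the input is read as 1000 partitions
transactions = spark.read.parquet("/data/transactions")

# After the filter: still 1000 partitions, but many are now small or empty
filtered = transactions.filter(col("amount") > 1000)

# Coalesce to fewer partitions to cut per-task scheduling overhead
optimized = filtered.coalesce(200)

# Example 2: Partition by the grouping key before an expensive aggregation.
# The groupBy reuses this hash partitioning instead of adding another shuffle, but
# raw rows are shuffled (no map-side partial aggregation first), so this mainly pays
# off when the partitioned DataFrame is reused by several steps.
df = df.repartition(500, "customer_id") \
    .groupBy("customer_id") \
    .agg(
        sum("amount").alias("total"),
        count("*").alias("count")
    )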
```

### Custom Partitioning

```python theme={null}
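from pyspark.sql.functions import col, spark_partition_id

# Check partition distribution (rows per partition) to spot skew
df.withColumn("partition_id", spark_partition_id()) \
    .groupBy("partition_id") \
    .count() \
    .orderBy("partition_id") \
    .show()

# Hash partitioning: repartition() with a column already hashes the key
df_hash = df.repartition(100, col("user_id"))

# Range partitioning: rows are split into sorted, contiguous key ranges
# (range boundaries are estimated by sampling the data)
df_range = df.repartitionByRange(100, "timestamp")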
```
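The distribution query above prints one row per partition; to turn it into a single skew number, collect the counts and compare the largest partition to the median. This is a minimal sketch: `skew_report` is a hypothetical helper, and treating a high max-to-median ratio as a warning sign is a convention, not a Spark feature. The groupBy only returns partitions that contain rows, so empty partitions do not appear in its output.

```python
import statistics


def skew_report(partition_counts):
    """Summarize per-partition row counts collected from a Spark job."""
    counts = sorted(partition_counts)
    median = statistics.median(counts)
    largest = counts[-1]
    return {
        "partitions": len(counts),
        "max": largest,
        "median": median,
        "max_to_median": largest / median if median else float("inf"),
    }


# Usage with Spark:
# counts = [row["count"] for row in
#           df.groupBy(spark_partition_id().alias("pid")).count().collect()]
# print(skew_report(counts))
```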

## Shuffle Optimization

### Understanding Shuffle Operations

```python theme={null}
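# Operations that cause shuffles:
# - groupBy, reduceByKey, aggregateByKey
# - join (except broadcast joins)
# - repartition; coalesce only when called with shuffle=True (RDD API)
# - distinct, intersect (DataFrame) / intersection (RDD)
# - sort, orderBy

# Spot shuffles in a query plan: each "Exchange" node is a shuffle
df.groupBy("region").count().explain()

# Number of post-shuffle partitions for DataFrame/SQL queries
spark.conf.set("spark.sql.shuffle.partitions", "200")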
```
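To compare plans before and after a change, it can help to count shuffle (`Exchange`) and broadcast (`BroadcastExchange`) operators in the explain output. This sketch assumes PySpark's `DataFrame.explain()` prints the plan to standard output (as the classic PySpark API does in Spark 3.x) and uses the default simple mode. Run it before executing the query: after an adaptive (AQE) run, the plan text lists both the final and the initial plan, which would double-count. The helper names are hypothetical.

```python
import io
import re
from contextlib import redirect_stdout


def plan_text(df):
    """Capture the plan that df.explain() prints."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def count_operators(plan):
    """Count shuffle and broadcast exchanges in a plan string."""
    return {
        # \b keeps "BroadcastExchange" and "ReusedExchange" out of the shuffle count
        "shuffles": len(re.findall(r"\bExchange\b", plan)),
        "broadcasts": len(re.findall(r"\bBroadcastExchange\b", plan)),
    }


# Usage: print(count_operators(plan_text(df.groupBy("region").count())))
```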

### Reducing Shuffle

```python theme={null}
from pyspark.sql.functions import count, sum

users = spark.read.parquet("/data/users")                # assumed columns: user_id, region
transactions = spark.read.parquet("/data/transactions")  # assumed columns: user_id, amount

joined = transactions.join(users, "user_id")

# Bad: two separate aggregations of the same data, joined back together.
# Each groupBy shuffles, and the final join adds more work.
result = joined.groupBy("region").count() \
    .join(joined.groupBy("region").sum("amount"), "region")

# Better: compute both metrics in a single aggregation (one shuffle for the groupBy)
result = joined.groupBy("region").agg(
    count("*").alias("transaction_count"),
    sum("amount").alias("total_amount")
)
```

### Shuffle Configuration

```python theme={null}
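# Core shuffle settings are read at startup: set them on the builder or via spark-submit.
# The external shuffle service must also be running on every worker node
# (for example as a YARN NodeManager auxiliary service).
spark = SparkSession.builder \
    .config("spark.shuffle.compress", "true") \
    .config("spark.shuffle.spill.compress", "true") \
    .config("spark.shuffle.file.buffer", "64k") \
    .config("spark.reducer.maxSizeInFlight", "96m") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.shuffle.service.port", "7337") \
    .getOrCreate()

# SQL shuffle partitions can be changed at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")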
```

### Broadcast Joins

```python theme={null}
from pyspark.sql.functions import broadcast

# Small table broadcasting
small_df = spark.read.parquet("/data/dimension")  # < 10MB
large_df = spark.read.parquet("/data/facts")  # > 10GB

# Broadcast small table to avoid shuffle
result = large_df.join(broadcast(small_df), "key")

# Configure broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10MB

# Disable auto broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```
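The threshold is a byte count, and Spark interprets size strings with binary (1024-based) units, which is why 10MB appears as 10485760. A small converter makes such values easier to read and check. This is a sketch that assumes integer values and units up to terabytes; `size_to_bytes` is a hypothetical helper, not a PySpark function.

```python
import re

_UNIT_BYTES = {
    "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024**2, "mb": 1024**2,
    "g": 1024**3, "gb": 1024**3,
    "t": 1024**4, "tb": 1024**4,
}


def size_to_bytes(text):
    """Convert a size string such as '10MB' or '256m' to bytes (1024-based units)."""
    match = re.fullmatch(r"\s*(\d+)\s*([A-Za-z]*)\s*", text)
    if match is None:
        raise ValueError(f"unrecognized size: {text!r}")
    number, unit = match.groups()
    unit = unit.lower() or "b"
    if unit not in _UNIT_BYTES:
        raise ValueError(f"unknown unit: {unit!r}")
    return int(number) * _UNIT_BYTES[unit]


# Usage: raise the broadcast threshold to 50MB
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(size_to_bytes("50MB")))
```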

### Sort Merge Join Optimization

```python theme={null}
# Sort-merge join is already the default strategy for large equi-joins
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Pre-partition and sort both sides by the join key. The join can then skip its own
# exchange and sort, but repartition() is itself a shuffle, so this only pays off
# when the prepared DataFrames are reused across several joins.
df1 = df1.repartition(200, "join_key").sortWithinPartitions("join_key")
df2 = df2.repartition(200, "join_key").sortWithinPartitions("join_key")

result = df1.join(df2, "join_key")
```

## Caching Strategies

### When to Cache

```python theme={null}
from pyspark.sql.functions import col

# Cache when:
# 1. Data is accessed multiple times
# 2. Dataset is small enough to fit in memory (or spilling to disk is acceptable)
# 3. Computing the dataset is expensive

# Example: reuse across several actions
df = spark.read.parquet("/data/large_dataset")

# cache() is lazy: the data is materialized by the first action
df_cached = df.filter(col("active") == True).cache()

# Multiple operations on cached data
count1 = df_cached.count()       # computes and caches the data
df_cached.describe().show()      # show() prints and returns None
df_cached.sample(0.1).show()

# Unpersist when done
df_cached.unpersist()
```

### Storage Levels

```python theme={null}
from pyspark import StorageLevel

# Storage levels available in PySpark. Python data is always serialized, so the
# Scala/Java *_SER levels (e.g. MEMORY_ONLY_SER) do not exist in PySpark:
# StorageLevel.MEMORY_ONLY        # memory only; partitions that don't fit are recomputed
# StorageLevel.MEMORY_AND_DISK    # spill partitions that don't fit to disk
# StorageLevel.DISK_ONLY          # only on disk
# StorageLevel.MEMORY_ONLY_2      # replicated on two nodes
# StorageLevel.MEMORY_AND_DISK_2  # replicated, with disk spill
# StorageLevel.OFF_HEAP           # off-heap memory (requires spark.memory.offHeap.enabled)
# DataFrame.cache() uses a MEMORY_AND_DISK-based level by default.

# Example usage
large_df = spark.read.parquet("/data/large")

# Calling persist() again on an already-cached DataFrame has no effect;
# unpersist() first if you need a different level
large_df.persist(StorageLevel.MEMORY_AND_DISK)

# Process
result = large_df.groupBy("category").count()
result.show()

# Clean up
large_df.unpersist()
```

### Caching Best Practices

```python theme={null}
from pyspark.sql.functions import col

# 1. Cache after expensive operations
# (dimension_table and complex_udf are placeholders for your own DataFrame and UDF)
df = spark.read.parquet("/data/raw") \
    .filter(col("year") == 2024) \
    .join(dimension_table, "id") \
    .withColumn("processed", complex_udf("value")) \
    .cache()  # Cache after expensive transformations

# 2. Cache DataFrames, not RDDs (when possible)
# Cached DataFrames use a compressed columnar format, which is more memory efficient

# 3. Monitor cache usage (Spark UI "Storage" tab, or per DataFrame)
print(df.is_cached, df.storageLevel)
spark.catalog.clearCache()  # Remove all cached tables/DataFrames from memory

# 4. Selective caching: cache only the columns you need
subset = df.select("id", "value", "timestamp").cache()

# 5. Cache at the right granularity
# Too early: Waste memory on unnecessary data
# Too late: Recompute expensive operations
```

### Checkpoint vs Cache

```python theme={null}
# Cache: Stores in memory/disk, lineage retained
df_cached = df.cache()

# Checkpoint: Saves to reliable storage, truncates lineage
# (on a cluster, use a shared fault-tolerant path such as HDFS)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df_checkpointed = df.checkpoint()  # eager by default: runs a job to write the data

# Use checkpoint for:
# - Very long lineages (avoid stack overflow when planning or serializing)
# - Iterative algorithms with many iterations
# - Avoiding long recomputation after failures in long-running jobs
```

## Serialization

### Kryo Serialization

```python theme={null}
from pyspark.sql import SparkSession

# Kryo mainly speeds up JVM-side serialization (RDDs of JVM objects, shuffled or
# cached RDD data). DataFrames use Spark's own binary row format and Python objects
# are pickled, so pure DataFrame/PySpark code usually gains little from it.
spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .getOrCreate()

# Register custom JVM classes through a registrator class on the classpath
# (com.example.MyKryoRegistrator is a placeholder), set before the session starts:
#   .config("spark.kryo.registrator", "com.example.MyKryoRegistrator")
```

### Scala Kryo Registration

```scala theme={null}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.sql.SparkSession

// MyClass and AnotherClass stand in for your own application classes
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass])
    kryo.register(classOf[AnotherClass])
  }
}

val spark = SparkSession.builder()
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  .getOrCreate()
```

### Data Format Optimization

```python theme={null}
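# Use efficient file formats (separate output paths, since the default
# save mode fails if the path already exists)
# Parquet: Columnar, compressed, predicate pushdown
df.write.parquet("/data/output/parquet")

# ORC: Columnar like Parquet, common in Hive deployments
df.write.orc("/data/output/orc")

# Avro: Row-based, schema evolution
# (requires the external spark-avro package, e.g. via --packages org.apache.spark:spark-avro_<scala-version>:<spark-version>)
df.write.format("avro").save("/data/output/avro")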
# Comparison
# CSV: Slow, no schema, human-readable
# JSON: Slow, schema inference, nested structures
# Parquet: Fast, columnar, best for analytics
# ORC: Fast, columnar, good compression
# Avro: Fast, row-based, schema evolution
```

## Query Optimization

### Catalyst Optimizer

```python theme={null}
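from pyspark.sql.functions import col

# View query plan
df = spark.read.parquet("/data/transactions")
filtered = df.filter(col("amount") > 1000)
aggregated = filtered.groupBy("category").sum("amount")

# Parsed, analyzed, and optimized logical plans plus the physical plan
aggregated.explain(True)

# Physical plan only
aggregated.explain()

# Cost-based optimization relies on table/column statistics, which must be collected
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
# "transactions" is assumed to be a catalog table here
spark.sql("ANALYZE TABLE transactions COMPUTE STATISTICS FOR COLUMNS category, amount")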
```

### Predicate Pushdown

```python theme={null}
from pyspark.sql.functions import col

# Predicate pushdown: filters are passed to the Parquet reader, which skips
# row groups whose min/max statistics rule them out
df = spark.read.parquet("/data/large") \
    .filter(col("year") == 2024) \
    .filter(col("amount") > 1000)
# Check "PushedFilters" in df.explain() to confirm

# Column pruning
result = df.select("id", "amount", "timestamp")
# Only reads selected columns from Parquet

# Partition pruning
partitioned_df = spark.read.parquet("/data/partitioned")
# Path: /data/partitioned/year=2024/month=01/
filtered = partitioned_df.filter(col("year") == 2024)
# Only scans year=2024 directories ("PartitionFilters" in explain())
```

### Join Optimization

```python theme={null}
from pyspark.sql.functions import broadcast

# 1. Broadcast hash join (small table, below autoBroadcastJoinThreshold or hinted)
result = large_df.join(broadcast(small_df), "key")

# 2. Sort merge join (both tables large)
# Pre-sort and partition by join key
df1_sorted = df1.repartition("join_key").sortWithinPartitions("join_key")
df2_sorted = df2.repartition("join_key").sortWithinPartitions("join_key")
result = df1_sorted.join(df2_sorted, "join_key")

# 3. Bucket join (pre-bucketed tables)
df1.write.bucketBy(100, "join_key").saveAsTable("table1")
df2.write.bucketBy(100, "join_key").saveAsTable("table2")
result = spark.table("table1").join(spark.table("table2"), "join_key")

# 4. Skewed join handling
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```

### Adaptive Query Execution (AQE)

```python theme={null}
# Enable AQE for dynamic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# AQE automatically:
# - Coalesces post-shuffle partitions
# - Converts sort-merge join to broadcast join
# - Optimizes skewed joins
```

## Performance Monitoring

### Spark UI

```python theme={null}
# Access Spark UI at: http://driver-node:4040

# Key metrics to monitor:
# - Jobs: Number and duration
# - Stages: Task distribution and timing
# - Storage: Cached RDDs/DataFrames
# - Executors: Resource usage
# - SQL: Query plans and execution
```

### Programmatic Monitoring

```python theme={null}
from pyspark.sql.functions import count, sum
import time

# Measure execution time of a full query run
start_time = time.perf_counter()

df = spark.read.parquet("/data/large")
result = df.groupBy("category").agg(
    count("*").alias("count"),
    sum("amount").alias("total")
)

# show() may compute only what it needs for 20 rows; writing to the "noop"
# sink (Spark 3.0+) executes the whole query without storing any output
result.write.format("noop").mode("overwrite").save()

elapsed = time.perf_counter() - start_time
print(f"Execution time: {elapsed:.2f} seconds")

# Get metrics
print(f"Number of partitions: {result.rdd.getNumPartitions()}")

# Executor memory status (private _jsc API; may change between versions)
executor_metrics = spark.sparkContext._jsc.sc().getExecutorMemoryStatus()
print(f"Executor memory status: {executor_metrics}")
```
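Wrapping timings in a context manager keeps measurements consistent across experiments. This is a minimal sketch and `timed` is a hypothetical helper. The code inside the block must actually execute the query (for example `result.write.format("noop").mode("overwrite").save()` on Spark 3.0+, or `count()`), because transformations alone are lazy and would time almost nothing.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results=None):
    """Print (and optionally record) the wall-clock time spent inside the block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print(f"{label}: {elapsed:.2f}s")


# Usage with Spark (result is any DataFrame):
# timings = {}
# with timed("category aggregation", timings):
#     result.write.format("noop").mode("overwrite").save()
```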

### Logging and Debugging

```python theme={null}
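import logging

# Set Spark's log level
spark.sparkContext.setLogLevel("WARN")  # ERROR, WARN, INFO, DEBUG

# Custom logging on the driver (basicConfig adds a console handler)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MySparkApp")

# Log partitioning info (df and result from the earlier examples)
logger.info(f"Input partitions: {df.rdd.getNumPartitions()}")
logger.info(f"After processing: {result.rdd.getNumPartitions()}")

# Profile Python workers: the setting is read at startup, then print the results
# spark = SparkSession.builder.config("spark.python.profile", "true").getOrCreate()
# ... run jobs ...
# spark.sparkContext.show_profiles()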
```

## Advanced Configuration

### Compression

```python theme={null}
# Output file compression (SQL settings, can be changed at runtime)
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")  # snappy, gzip, lzo, zstd, uncompressed, ...
spark.conf.set("spark.sql.orc.compression.codec", "zlib")

# Internal compression (shuffle, broadcast, serialized RDD partitions) is read at startup
spark = SparkSession.builder \
    .config("spark.io.compression.codec", "lz4") \
    .config("spark.rdd.compress", "true") \
    .getOrCreate()

# Trade-offs:
# - snappy: Fast compression/decompression, moderate compression
# - gzip: Slower, better compression ratio
# - lz4: Fastest, good for shuffle
# - uncompressed: No CPU overhead, more I/O
```

### Speculation

```python theme={null}
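# Enable speculative execution for stragglers (read at startup).
# Once 75% of a stage's tasks finish, tasks running more than 1.5x slower
# than the median become candidates for a speculative copy.
spark = SparkSession.builder \
    .config("spark.speculation", "true") \
    .config("spark.speculation.interval", "100ms") \
    .config("spark.speculation.multiplier", "1.5") \
    .config("spark.speculation.quantile", "0.75") \
    .getOrCreate()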
```
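The multiplier and quantile settings work together: once the quantile fraction of a stage's tasks has finished, a running task becomes a speculation candidate if it has run longer than the multiplier times the median duration of the finished tasks. The function below is a simplified model of that rule for reasoning about settings, not Spark's scheduler code; it ignores details such as the minimum task runtime, and `speculation_candidates` is a hypothetical name.

```python
import statistics


def speculation_candidates(finished_secs, running_secs, total_tasks,
                           multiplier=1.5, quantile=0.75):
    """Return ids of running tasks that the multiplier/quantile rule would flag.

    finished_secs: durations of completed tasks in the stage
    running_secs:  {task_id: elapsed seconds} for tasks still running
    """
    if len(finished_secs) < quantile * total_tasks:
        return []  # not enough tasks finished for speculation to start
    threshold = multiplier * statistics.median(finished_secs)
    return sorted(t for t, elapsed in running_secs.items() if elapsed > threshold)


finished = [10, 10, 12, 11, 10, 11, 10, 9]  # 8 of 10 tasks done
print(speculation_candidates(finished, {"t8": 30, "t9": 12}, total_tasks=10))
```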

### Garbage Collection

```python theme={null}
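# GC tuning for executors. G1 is already the default collector on Java 9+.
# extraJavaOptions is a single string read at startup, so put all flags in one setting.
# GC logging: -Xlog:gc* on Java 9+; on Java 8 use -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
executor_java_opts = (
    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 "
    "-Xlog:gc*"
)
spark = SparkSession.builder \
    .config("spark.executor.extraJavaOptions", executor_java_opts) \
    .getOrCreate()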
```

## Real-World Optimization Example

```python theme={null}
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, broadcast, count, sum, window
from pyspark import StorageLevel

# Create optimized Spark session
spark = SparkSession.builder \
    .appName("OptimizedETL") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "5") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()

# Read the fact table; a broadcast join does not need the large side repartitioned
raw_data = spark.read.parquet("/data/transactions")

# Broadcast small dimension table
users = spark.read.parquet("/data/users")
users_broadcast = broadcast(users)

# Optimized join
enriched = raw_data.join(users_broadcast, "user_id")

# Cache before multiple operations (PySpark has no *_SER storage levels)
enriched_cached = enriched.select(
    "user_id", "transaction_id", "amount",
    "category", "timestamp", "user_segment"
).persist(StorageLevel.MEMORY_AND_DISK)

# First aggregation
daily_stats = enriched_cached.groupBy(
    window("timestamp", "1 day"),
    "user_segment"
).agg(
    count("*").alias("transaction_count"),
    sum("amount").alias("total_amount"),
    avg("amount").alias("avg_amount")
).coalesce(50)

# Save (Parquet output is snappy-compressed by default)
daily_stats.write \
    .mode("overwrite") \
    .parquet("/data/output/daily_stats")

# Second aggregation reuses cache
category_stats = enriched_cached.groupBy("category").agg(
    count("*").alias("count"),
    sum("amount").alias("total")
).coalesce(10)

category_stats.write \
    .mode("overwrite") \
    .parquet("/data/output/category_stats")

# Clean up
enriched_cached.unpersist()

# Stop session
spark.stop()
```

## Performance Checklist

### Pre-Production Checklist

1. **Resource Allocation**
   * [ ] Executor memory sized appropriately
   * [ ] Executor cores optimized (4-5 per executor)
   * [ ] Driver memory sufficient for collect operations
   * [ ] Dynamic allocation configured

2. **Partitioning**
   * [ ] Data partitioned by common join/group keys
   * [ ] Partition size between 128MB-1GB
   * [ ] Avoid small files problem
   * [ ] Use bucketing for repeated joins

3. **Shuffle Optimization**
   * [ ] Minimize shuffle operations
   * [ ] Broadcast small tables
   * [ ] Configure shuffle partitions appropriately
   * [ ] Enable adaptive query execution

4. **Caching**
   * [ ] Cache frequently accessed data
   * [ ] Use appropriate storage level
   * [ ] Unpersist when done
   * [ ] Monitor cache usage

5. **Serialization**
   * [ ] Kryo serializer enabled
   * [ ] Custom classes registered
   * [ ] Use efficient file formats (Parquet/ORC)

6. **Query Optimization**
   * [ ] Predicate pushdown utilized
   * [ ] Column pruning effective
   * [ ] Join strategy optimized
   * [ ] Statistics collected for CBO

## Common Performance Anti-Patterns

### Anti-Pattern 1: Too Many Small Files

```python theme={null}
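# Bad: Creates many small files
df.write.mode("append").parquet("/data/output")
# Results in: one file per partition, often thousands of tiny files

# Good: Coalesce before writing
df.coalesce(10).write.mode("append").parquet("/data/output")
# coalesce() also shrinks the parallelism of the upstream stage; if that stage is
# heavy, repartition(10) adds a shuffle but keeps the upstream work parallel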
```

### Anti-Pattern 2: Unnecessary Shuffles

```python theme={null}
from pyspark.sql.functions import count, sum

# Bad: Multiple shuffles
df.groupBy("key").count() \
  .join(df.groupBy("key").sum("value"), "key")

# Good: Single shuffle
df.groupBy("key").agg(
    count("*").alias("count"),
    sum("value").alias("sum")
)
```

### Anti-Pattern 3: Data Skew

```python theme={null}
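from pyspark.sql.functions import broadcast, col, concat, lit, rand

# Problem: Skewed data causes stragglers
# Key "A" has 90% of data

# Solution 1: Salt keys
salted = df.withColumn("salted_key",
    concat(col("key"), lit("_"), (rand() * 10).cast("int"))
)
# Aggregate on salted_key first, then again on key. For joins, the other side must be
# replicated once per salt value (0-9) so every salted key still finds its match.

# Solution 2: Adaptive query execution (splits skewed partitions in sort-merge joins)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Solution 3: Split and union
hot_keys = df.groupBy("key").count() \
    .filter(col("count") > 1000000) \
    .select("key")
skewed = df.join(broadcast(hot_keys), "key")
normal = df.join(broadcast(hot_keys), "key", "left_anti")

# Process separately and union
# (process_skewed and process_normal are placeholders for your own logic;
#  both results must have the same columns)
result = process_skewed(skewed).unionByName(process_normal(normal))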
```
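Salting works because the heavy key is split into several groups that different tasks can process, and a second aggregation on the original key recombines the partial results. For simple sums Spark's map-side partial aggregation already softens skew, so salting matters most for joins and aggregations that cannot be pre-combined. The plain-Python sketch below models a two-stage salted sum to show both effects; it uses a deterministic round-robin salt instead of `rand()` so the result is reproducible, and all helper names are hypothetical.

```python
from collections import Counter, defaultdict


def largest_group(keys):
    """Rows in the biggest group: without pre-combining (e.g. in a join), one task gets them all."""
    return max(Counter(keys).values())


def salt(rows, num_salts):
    """Attach a salt to each (key, value) row; round-robin stands in for rand()."""
    return [((key, i % num_salts), value) for i, (key, value) in enumerate(rows)]


def two_stage_sum(rows, num_salts):
    # Stage 1: partial sums per (key, salt); the hot key is spread over num_salts groups
    partial = defaultdict(int)
    for salted_key, value in salt(rows, num_salts):
        partial[salted_key] += value
    # Stage 2: drop the salt and combine the partial sums per original key
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals)


rows = [("A", 1)] * 900 + [("B", 1)] * 50 + [("C", 1)] * 50  # "A" holds 90% of rows
print(largest_group(k for k, _ in rows))            # 900 (unsalted)
print(largest_group(k for k, _ in salt(rows, 10)))  # 90 (salted)
print(two_stage_sum(rows, 10))
```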

### Anti-Pattern 4: Collecting Large Data

```python theme={null}
# Bad: Brings all data to driver
all_data = df.collect()  # OOM on driver

# Good: Process in distributed manner
df.write.parquet("/output")  # Distributed write

# Or sample if analysis needed
sample = df.sample(0.01).collect()
```

## Hands-On Exercises

### Exercise 1: Memory Tuning

```python theme={null}
# TODO: Optimize memory configuration
# 1. Calculate optimal executor memory
# 2. Configure memory overhead
# 3. Set storage fraction
# 4. Monitor memory usage

# Your code here
```

### Exercise 2: Partition Optimization

```python theme={null}
# TODO: Optimize partitioning strategy
# 1. Analyze current partition distribution
# 2. Determine optimal partition count
# 3. Repartition by appropriate keys
# 4. Measure performance improvement

# Your code here
```

### Exercise 3: Shuffle Reduction

```python theme={null}
# TODO: Minimize shuffle operations
# 1. Identify shuffle operations in query plan
# 2. Use broadcast joins where appropriate
# 3. Combine aggregations
# 4. Compare execution times

# Your code here
```

## Summary

Performance tuning is essential for production Spark applications:

* **Memory Management**: Configure executor and driver memory appropriately
* **Partitioning**: Optimize partition size and distribution
* **Shuffle**: Minimize and optimize shuffle operations
* **Caching**: Cache intelligently with appropriate storage levels
* **Serialization**: Use Kryo and efficient file formats
* **Monitoring**: Continuously monitor and adjust configurations

### Key Takeaways

1. Profile before optimizing - measure, don't guess
2. Start with executor sizing and partitioning
3. Minimize shuffles through query optimization
4. Cache strategically for reused datasets
5. Use adaptive query execution for dynamic optimization
6. Monitor Spark UI for bottlenecks
7. Test configurations with production-like data

***

<Note>
  Continue to the next module to learn about cluster deployment and operations.
</Note>
