Module Duration: 4-5 hours
Research Focus: In-depth analysis of the foundational Spark paper
Outcome: Deep understanding of WHY Spark works the way it does
Full Citation:
Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, and Ion Stoica. 2012. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. In Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation (NSDI'12). USENIX Association, USA, 2.
Published: April 2012, NSDI (top-tier systems conference)
Authors: UC Berkeley AMPLab (now RISELab)
Matei Zaharia (lead author, Spark creator, now Databricks CTO)
Ion Stoica (UC Berkeley professor, systems legend)
Michael Franklin (database systems expert)
The same team also created Mesos and Alluxio
Impact:
10,000+ citations (one of the most cited systems papers)
Problem 1: Disk I/O Bottleneck
Every MapReduce operation writes to disk:
MapReduce WordCount:

┌─────────┐
│  Input  │  (Read from HDFS)
│  Data   │
└────┬────┘
     │
     ▼
┌─────────┐
│   Map   │  (Process in memory)
└────┬────┘
     │
     ▼  WRITE TO DISK (shuffle)
┌──────────┐
│Local Disk│
└────┬─────┘
     │
     ▼  READ FROM DISK
┌─────────┐
│ Shuffle │  (Network + disk)
└────┬────┘
     │
     ▼  WRITE TO DISK
┌─────────┐
│ Reduce  │
└────┬────┘
     │
     ▼  WRITE TO DISK (output)
┌─────────┐
│  HDFS   │
└─────────┘

Total disk writes: 3x data size
Total disk reads:  2x data size
Why This Matters:
Disk I/O: ~100 MB/s
Memory: ~10 GB/s
100x performance gap
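To make the gap concrete, here is a quick back-of-the-envelope calculation using the throughput figures above (order-of-magnitude estimates, not measurements):

// Rough time to scan 100 GB at the rates quoted above
val dataGB   = 100.0
val diskMBps = 100.0  // ~100 MB/s sequential disk read
val memGBps  = 10.0   // ~10 GB/s memory bandwidth

val diskSeconds = dataGB * 1024 / diskMBps  // ≈ 1024 s (~17 minutes)
val memSeconds  = dataGB / memGBps          // ≈ 10 s
// Same scan, roughly 100x apart: this is the gap RDDs exploit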
Problem 2: Multi-Pass Algorithms Are Painfully Slow
Many important algorithms require iteration:
// Machine Learning: Logistic Regression
// Needs 10-20 iterations over the same dataset

// MapReduce approach (pseudo-code):
for (i <- 1 to 20) {
  // Read from HDFS
  data = loadFromHDFS("training_data")
  // Run map-reduce iteration
  gradient = data.map(computeGradient).reduce(sum)
  updateWeights(gradient)
  // Write back to HDFS
  saveToHDFS(weights)
}

// Problem: Reads the entire dataset from disk 20 times!
// 100GB dataset × 20 iterations = 2TB of disk reads
// On a cluster: Hours of wasted time
Real-World Impact:
PageRank: 10+ iterations
K-Means: 20-50 iterations
Gradient Descent: 50-100 iterations
Each iteration: Full disk read/write cycle
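For contrast, here is a minimal sketch of the same loop in Spark, where the dataset is loaded and parsed once, then cached; `parsePoint`, `computeGradient`, `sum`, and `updateWeights` are hypothetical placeholders, just as in the pseudo-code above:

// Spark approach: load once, cache, iterate in memory
val data = sc.textFile("training_data")
  .map(parsePoint)  // parsePoint: hypothetical parser for one training record
  .cache()          // keep the parsed dataset in cluster RAM

for (i <- 1 to 20) {
  // Each iteration reads from memory, not HDFS
  val gradient = data.map(computeGradient).reduce(sum)
  updateWeights(gradient)
}
// Disk reads: 1x the dataset, instead of 20x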
Problem 3: Interactive Queries Are Impossible
Data scientist workflow:
1. Load 1TB dataset from HDFS (2 minutes)
2. Run query 1 → Wait 5 minutes
3. Run query 2 on same data → Wait 5 minutes again!
4. Run query 3 → Wait 5 minutes again!

Why? MapReduce can't keep data in memory between queries
“Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory. This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations.”
Translation: MapReduce is great for simple ETL, terrible for everything else we actually want to do.
RDD: An immutable, partitioned collection of records that can be operated on in parallel.
The Magic: Instead of writing intermediate results to disk, keep them in memory with a fault-tolerant abstraction.
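A minimal illustration of that definition in code, assuming an existing SparkContext named `sc` (the paths are placeholders):

// An RDD is a partitioned collection, processed in parallel
val nums = sc.parallelize(1 to 1000000, numSlices = 8)
println(nums.getNumPartitions)  // 8 partitions, one task per partition

// Cache once; subsequent queries are served from memory
val logs = sc.textFile("hdfs://logs/app.log").cache()
logs.filter(_.contains("ERROR")).count()  // first action: reads HDFS, fills the cache
logs.filter(_.contains("WARN")).count()   // later actions: read from RAM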
Spark approach to WordCount:

┌─────────┐
│  Input  │  (Read from HDFS once)
│  Data   │
└────┬────┘
     │
     ▼  STAYS IN MEMORY
┌─────────┐
│   RDD   │  (Distributed across cluster RAM)
│  (Map)  │
└────┬────┘
     │
     ▼  STAYS IN MEMORY (no disk!)
┌─────────┐
│   RDD   │
│ (Reduce)│
└────┬────┘
     │
     ▼  WRITE TO DISK (only final result)
┌─────────┐
│  HDFS   │
└─────────┘

Total disk writes: 1x data size (final output only)
Total disk reads:  1x data size (initial input only)
Performance Impact: up to 100x faster for iterative, in-memory workloads (the NSDI paper itself reports speedups of up to 20x over Hadoop in its benchmarks)
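The WordCount pipeline from the diagram, written out as Spark code (a standard sketch; the paths are placeholders):

// Spark WordCount: only the first read and the final write touch disk
val counts = sc.textFile("hdfs://input/data.txt")  // read from HDFS once
  .flatMap(_.split("\\s+"))                        // split lines into words
  .map(word => (word, 1))                          // pair each word with a count of 1
  .reduceByKey(_ + _)                              // sum counts per word (in memory)

counts.saveAsTextFile("hdfs://output/wordcounts")  // write only the final result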
Operations that define new RDDs from existing ones:
// Narrow transformations (no shuffle needed)
map(f: T => U)           // Apply f to each element
filter(f: T => Boolean)  // Keep elements where f returns true
flatMap(f: T => Seq[U])  // Map then flatten results
mapPartitions(f)         // Apply f to an entire partition at once

// Wide transformations (require shuffle)
groupByKey()             // Group values by key (avoid if possible!)
reduceByKey(f)           // Reduce values per key (preferred over groupByKey)
sortByKey()              // Sort RDD by key
join(other)              // Join two RDDs by key
cogroup(other)           // Group multiple RDDs by key
Lazy Evaluation Explained:
val data = sc.textFile("huge_file.txt") // ① Not executed yet!val filtered = data.filter(_.contains("ERROR")) // ② Still not executed!val mapped = filtered.map(_.length) // ③ Still not executed!// Only when an action is called:val result = mapped.collect() // ④ NOW everything executes!// Spark builds execution plan:// textFile → filter → map → collect// Then optimizes and executes all at once
Why Lazy Evaluation?
Query Optimization:
// User writes:
data.filter(x => x > 10).map(x => x * 2)

// Spark can optimize to:
data.mapPartitions { partition =>
  partition.filter(x => x > 10).map(x => x * 2)
}
// Fuses operations, avoids intermediate RDD materialization
Avoid Unnecessary Work:
val data = sc.textFile("1TB_file.txt")val processed = data.map(expensiveOperation) // Not executedval sample = processed.take(10) // Only processes enough to get 10 items!
Better Resource Utilization: Only allocate resources when actually needed
Operations that trigger execution and return values:
// Return data to the driver
reduce(f: (T, T) => T)          // Aggregate all elements
collect()                       // Return all elements to driver (dangerous for big data!)
count()                         // Count number of elements
first()                         // Return first element
take(n)                         // Return first n elements
takeSample(withReplacement, n)  // Random sample of n elements

// Write to storage
saveAsTextFile(path)            // Write as text files
saveAsSequenceFile(path)        // Write as Hadoop SequenceFile
saveAsObjectFile(path)          // Serialize objects to file

// Side effects
foreach(f: T => Unit)           // Apply f to each element (for side effects)
foreachPartition(f)             // Apply f to each partition
Critical Warning:
// DANGER: Don't collect large RDDs!
val bigData = sc.textFile("1PB_of_data.txt")
val result = bigData.collect()  // ❌ Will crash the driver with OutOfMemoryError!

// Instead: Use distributed operations
val count = bigData.count()     // ✅ Computed on the cluster, only the count returned
val sample = bigData.take(100)  // ✅ Only 100 items sent to the driver
import org.apache.spark.storage.StorageLevel

// Common persistence levels
MEMORY_ONLY          // Store deserialized in JVM heap (fastest, most memory)
MEMORY_AND_DISK      // Spill to disk if memory is full
MEMORY_ONLY_SER      // Store serialized (saves space, slower)
DISK_ONLY            // Store only on disk
OFF_HEAP             // Store in off-heap memory (Tachyon/Alluxio)
MEMORY_AND_DISK_SER  // Serialized in memory, spill to disk

// Usage example
val important = data.filter(_.important)
important.persist(StorageLevel.MEMORY_AND_DISK)
// Now 'important' will be kept in memory for reuse

// Or use cache(), shorthand for MEMORY_ONLY
important.cache()

// Don't forget to unpersist when done!
important.unpersist()
Storage Level Decision Tree:
Is data reused multiple times?
├─ No → Don't persist
└─ Yes
   ↓
   Does it fit in memory?
   ├─ Yes → MEMORY_ONLY (fastest)
   └─ No
      ↓
      Can you afford to recompute if evicted?
      ├─ Yes → MEMORY_ONLY (let it recompute)
      └─ No → MEMORY_AND_DISK

Is serialization overhead acceptable?
├─ No → Keep deserialized
└─ Yes → Use _SER variants (save memory)
// Examples: groupByKey, reduceByKey, join with non-co-partitioned inputs

// Visualization
Parent RDD: [P1] [P2] [P3] [P4]
              ↓↘  ↓↗↘  ↓↗   ↓
Child RDD:  [C1]  [C2]  [C3]

// Each child partition depends on MULTIPLE parent partitions
Why This Classification Matters:
Fault Tolerance:
// Narrow: Only recompute the lost partition
Parent: [P1] [P2] [P3]
          ↓    ✗    ↓
Child:  [C1] [C2] [C3]
// If C2 is lost: Only recompute P2 → C2

// Wide: Must recompute from multiple parents
Parent: [P1] [P2] [P3]
          ↓↘   ↓↗   ↓
Child:  [C1]  [✗]
// If C2 is lost: Need data from P1, P2, P3
// If the parents are not cached: Recompute them all!
Performance:
// Narrow transformations can be pipelined:
data.map(f1).filter(f2).map(f3)
// Executes as: x => f3(f2(f1(x)))
// One pass through the data!

// A wide transformation creates a stage boundary:
data.map(f1).groupByKey().map(f2)
// ↑ STAGE 1 ↑  shuffle  ↑ STAGE 2 ↑
// Two stages; the shuffle writes/reads disk
Optimization:
// BAD: Wide dependency early
data.groupByKey().filter(hasMany)
// Shuffles ALL data, then filters

// GOOD: Narrow dependency early
data.filter(relevant).groupByKey()
// Filters first (no shuffle), then shuffles less data

// EVEN BETTER: Use reduceByKey instead of groupByKey
data.map(x => (x.key, 1)).reduceByKey(_ + _)
// Combines locally before the shuffle (combiner pattern)
case class Lineage(
  rdd: RDD[_],
  dependency: Dependency[_],
  parent: Option[Lineage]
)

// Example lineage:
val logs = sc.textFile("input.txt")
val errors = logs.filter(_.contains("ERROR"))
val counts = errors.map(x => (x, 1)).reduceByKey(_ + _)

// Lineage graph:
counts.toDebugString
/*
(2) ShuffledRDD[2] at reduceByKey
 +-(2) MapPartitionsRDD[1] at map
    +-(2) MapPartitionsRDD[0] at filter
       +-(2) input.txt MapPartitionsRDD
*/
Recovery Algorithm (from paper):
Algorithm: RecoverPartition(rdd, partition)
1. If partition is cached and available:
     Return cached data
2. Else if partition's parent RDDs are available:
     parentData = for each parent in rdd.dependencies:
                    RecoverPartition(parent.rdd, parent.partition)
     Return rdd.compute(partition, parentData)
3. Else:
     Recursively recover parent partitions first
     Then compute this partition

Optimization: Only recompute lost partitions, not the entire RDD
Cost Analysis:
Scenario: 1000-partition RDD, 1 partition lost

Replication approach:
- Storage: 1000 partitions × 3 replicas = 3000 partition-copies
- Recovery: Read from a replica (instant)

Lineage approach:
- Storage: Only the lineage graph (typically < 1KB)
- Recovery: Recompute 1 partition from source

Winner: Lineage (for most workloads)
Exception: Very long lineage chains → use checkpointing
Hadoop:
- Time: 171 seconds per iteration
- Total: 1710 seconds
- Bottleneck: Shuffle and disk I/O

Spark:
- Time: 23 seconds per iteration (with caching)
- Total: 230 seconds
- Speedup: 7.4x

Why a smaller speedup than logistic regression?
- PageRank has wide dependencies (join)
- The shuffle still goes through disk in Spark
- But keeping data in memory between iterations helps
Code Comparison:
// PageRank in Spark (simplified)
var ranks = links.mapValues(v => 1.0)
links.cache()  // Cache the link structure

for (i <- 1 to 10) {
  val contribs = links.join(ranks).flatMap {
    case (url, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}

// Key: the links RDD is cached and reused 10 times
// Only the ranks RDD is recomputed each iteration
Scheduling preferences:
1. PROCESS_LOCAL: Task on the same executor that caches the data
2. NODE_LOCAL: Task on the same node as the data
3. RACK_LOCAL: Task on the same rack
4. ANY: No locality preference

Example:
Task 1 needs partition 0
- HDFS block 0 is on node5
- Spark tries to schedule on node5
- If node5 is busy, tries rack-local
- Last resort: any available node
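How long the scheduler waits at each level before downgrading is configurable. A sketch using Spark's locality settings (3s is the documented default for the base setting; the level-specific ones fall back to it unless set):

import org.apache.spark.SparkConf

// Locality wait: how long to hold a task hoping for a better-placed slot
val conf = new SparkConf()
  .setAppName("LocalityTuning")
  .set("spark.locality.wait", "3s")          // base wait before dropping a locality level
  .set("spark.locality.wait.process", "3s")  // wait for PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait for NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // wait for RACK_LOCAL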
Step 5: Execute and Monitor
Driver tracks:
- Task completion status
- Partial results
- Failures and retries
- Shuffle data locations

On failure:
- Resubmit the failed task
- If an executor is lost: Recompute lost RDD partitions using lineage
- If too many failures: Abort the job
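The "too many failures" threshold is a real Spark setting; a minimal sketch (4 is the documented default number of attempts per task):

import org.apache.spark.SparkConf

// How many times a single task may fail before the whole job is aborted
val conf = new SparkConf()
  .setAppName("RetryTuning")
  .set("spark.task.maxFailures", "4")  // default: 4 attempts per task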
import org.apache.spark.{SparkConf, SparkContext}

object PageRank {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageRank"))

    // Load links: each line is "source_url dest_url1 dest_url2 ..."
    val lines = sc.textFile("hdfs://pagerank/links.txt")

    // Parse into (source, Array[destinations])
    val links = lines.map { line =>
      val parts = line.split("\\s+")
      (parts(0), parts.tail)
    }.cache()  // Cache because it is used in every iteration

    // Initialize ranks
    var ranks = links.mapValues(v => 1.0)

    // Iterate
    for (i <- 1 to 10) {
      // Calculate contributions from each page
      val contribs = links.join(ranks).flatMap {
        case (url, (links, rank)) =>
          val size = links.length
          links.map(dest => (dest, rank / size))
      }
      // Aggregate contributions and apply the damping factor
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

      // Optional: Show progress
      if (i % 2 == 0) {
        println(s"Iteration $i complete")
      }
    }

    // Output the top 10 pages
    val top = ranks.sortBy(_._2, ascending = false).take(10)
    top.foreach { case (url, rank) => println(s"$url: $rank") }

    sc.stop()
  }
}
2010: Spark research project begins at UC Berkeley
2012: RDD paper published at NSDI
2013: Spark becomes an Apache incubator project; Yahoo begins production deployment
2014: Spark becomes an Apache top-level project; Databricks founded by Spark's creators; Spark 1.0 released
2015: Spark surpasses Hadoop in popularity surveys; over 1000 contributors
2016: Spark 2.0 with structured APIs; Databricks raises $140M
2018: Spark 3.0 development begins
2020: Spark 3.0 released (Adaptive Query Execution)
2024: Still the dominant big data framework; 10,000+ companies using it in production
2012-2014: RDDs only
- Low-level, flexible
- Manual optimization needed
- Type-safe but verbose

2015: DataFrames introduced
- Higher-level API
- Automatic optimization (Catalyst)
- Better performance
- But not type-safe in Scala

2016: Datasets (type-safe DataFrames)
- Best of both worlds
- Type-safe + optimized
- Recommended for most use cases

2024 Recommendation:
- Use DataFrames/Datasets by default
- Use RDDs only for:
  * Unstructured data
  * Low-level control
  * Custom partitioning logic
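To see the difference in practice, compare the same word count in both APIs (a sketch; assumes a SparkSession named `spark`, with placeholder paths):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ApiComparison").getOrCreate()
import spark.implicits._

// RDD API: explicit steps, opaque to the optimizer
val rddCounts = spark.sparkContext
  .textFile("hdfs://input/words.txt")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

// DataFrame API: declarative, optimized by Catalyst
val dfCounts = spark.read.text("hdfs://input/words.txt")
  .select(explode(split($"value", "\\s+")).as("word"))
  .groupBy("word")
  .count()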
Misconception 4: “Lineage makes Spark fault-tolerant for free”
Partially wrong. Challenges:
Long lineage chains:
var rdd = sc.textFile("input")for (i <- 1 to 1000) { rdd = rdd.map(someTransformation)}// Lineage is 1000 transformations deep!// If partition lost late: Expensive to recomputeSolution: Checkpoint periodicallyrdd.checkpoint() // every 100 iterations
Wide dependencies:
val grouped = data.groupByKey()  // Wide dependency
// If a partition is lost: Must re-shuffle!
// The shuffle data may be gone

// Solution: Cache intermediate results
grouped.cache()
Non-deterministic functions:
import scala.util.Random

rdd.map(x => (x, Random.nextInt()))  // Non-deterministic!
// Recomputation gives different results
// Breaks lineage-based recovery

// Solution: Use deterministic transformations
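One common workaround, sketched below, is to derive the seed from the partition index so that recomputing a partition reproduces identical values (this assumes element order within a partition is stable):

import scala.util.Random

// Deterministic "randomness": the seed depends only on the partition index,
// so lineage-based recomputation yields exactly the same output
val tagged = rdd.mapPartitionsWithIndex { (partitionIndex, iter) =>
  val rng = new Random(partitionIndex)
  iter.map(x => (x, rng.nextInt()))
}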
Q4: “When would you use cache() vs persist()?”
Answer:
// cache() is shorthand for persist(MEMORY_ONLY)
rdd.cache()
// Same as:
rdd.persist(StorageLevel.MEMORY_ONLY)

// Use cache() when:
// - Data fits in memory
// - Recomputation is expensive
// - Data is used multiple times

// Use persist(MEMORY_AND_DISK) when:
// - Data might not fit in memory
// - You don't want to recompute if evicted

// Use persist(MEMORY_ONLY_SER) when:
// - Data fits but memory is tight
// - You can afford the serialization overhead
Q5: “How do you optimize this Spark job?”
// BEFORE (inefficient); assume data: RDD[(String, Int)]
val result = data
  .groupByKey()                              // ❌ Wide shuffle of ALL values
  .filter { case (_, vs) => vs.size > 100 }  // ❌ Filters only after the shuffle
  .mapValues(_.sum)

// AFTER (optimized)
val result = data
  .mapValues(v => (v, 1))
  .reduceByKey { case ((sum1, cnt1), (sum2, cnt2)) =>
    (sum1 + sum2, cnt1 + cnt2)
  }                                          // ✅ Combines locally before the shuffle
  .filter { case (_, (_, cnt)) => cnt > 100 }
  .mapValues { case (sum, _) => sum }

// Improvements:
// 1. Any row-level filtering should still happen before the shuffle (less data to move)
// 2. reduceByKey replaces groupByKey: values are combined locally (combiner pattern)
// 3. The size check runs on small (sum, count) pairs, not on full value lists
Study Tip: The RDD paper is remarkably readable. Read it alongside this module for maximum understanding. Every design decision will make sense in context!