While DataFrames and Datasets are now the recommended APIs for most Spark applications, understanding RDDs (Resilient Distributed Datasets) is critical for:
Deep Performance Tuning: Understanding how DataFrames compile down to RDDs helps you optimize queries
Unstructured Data: Text processing, binary data, and complex nested structures often require RDD-level control
Interview Success: Most Spark interviews deeply probe RDD concepts
Legacy Code: Many production systems still use RDD APIs
Common Misconception: “RDDs are deprecated, I can skip them.”
Reality: DataFrames are built ON TOP of RDDs. You’re using RDDs whether you know it or not. Understanding them makes you a better Spark developer.
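You can see this for yourself: every DataFrame exposes the RDD it is built on. A minimal sketch, assuming a SparkSession named spark:

// Every DataFrame is backed by an RDD; .rdd exposes the underlying RDD[Row]
val df = spark.range(1, 1000).toDF("id")   // DataFrame API
val underlying = df.rdd                    // org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
println(underlying.getNumPartitions)       // same physical partitioning
println(underlying.toDebugString)          // the RDD lineage behind the DataFrame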
From the original paper (Zaharia et al., NSDI 2012):
“An RDD is a read-only, partitioned collection of records. RDDs can only be created through deterministic operations on either (1) data in stable storage or (2) other RDDs.”
// This doesn't modify the original RDD
val data = sc.parallelize(1 to 1000)
val doubled = data.map(_ * 2)  // Creates NEW RDD

// data is still [1, 2, 3, ..., 1000]
// doubled is [2, 4, 6, ..., 2000]
val data = sc.parallelize(1 to 1000, numSlices = 4)
println(s"Number of partitions: ${data.getNumPartitions}")
// Output: Number of partitions: 4

// Each partition runs on a different executor
data.mapPartitionsWithIndex { (idx, iter) =>
  Iterator(s"Partition $idx has ${iter.size} elements")
}.collect()
// Output:
// Partition 0 has 250 elements
// Partition 1 has 250 elements
// Partition 2 has 250 elements
// Partition 3 has 250 elements
Partitioning is Critical:
Each partition = one task
More partitions = more parallelism (up to a point)
Fewer partitions = less overhead but underutilized cluster
Rule of Thumb: Use 2-4 partitions per CPU core in your cluster. For a cluster with 10 executors × 4 cores = 40 cores total, aim for 80-160 partitions.
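A minimal sketch of applying that rule, assuming a SparkContext named sc (defaultParallelism usually reflects the total executor cores):

val totalCores = sc.defaultParallelism              // typically total cores across executors
val targetPartitions = totalCores * 3               // 2-4x cores, per the rule of thumb

val raw = sc.textFile("hdfs://namenode:9000/logs/access.log")
val tuned =
  if (raw.getNumPartitions < targetPartitions)
    raw.repartition(targetPartitions)               // increase partitions (full shuffle)
  else
    raw.coalesce(targetPartitions)                  // decrease partitions (no shuffle)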
RDDs can ONLY be created in one of three ways:

A. From Stable Storage:
// From HDFS
val logsRDD = sc.textFile("hdfs://namenode:9000/logs/access.log")

// From S3
val s3Data = sc.textFile("s3a://my-bucket/data/*.json")

// From local file (testing only)
val localData = sc.textFile("file:///tmp/test.txt")
B. From Existing RDDs (Transformations):
val numbers = sc.parallelize(1 to 100)
val evens = numbers.filter(_ % 2 == 0)  // Transformation
val squared = evens.map(x => x * x)     // Another transformation
C. From In-Memory Collections:
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val keyValue = sc.parallelize(Seq(("a", 1), ("b", 2)))
Why “Deterministic” Matters:
If a partition is lost, Spark can recompute it using its lineage
Non-deterministic operations (random numbers, timestamps) break this guarantee
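A sketch of the difference, using hypothetical data; the usual fix is to derive randomness deterministically from the partition index:

import scala.util.Random

val ids = sc.parallelize(1 to 1000, 4)

// Fragile: if a partition is lost and recomputed, it gets DIFFERENT random values
val fragile = ids.map(_ => Random.nextDouble())

// Deterministic: seeding from the partition index makes recomputation reproducible
val reproducible = ids.mapPartitionsWithIndex { (idx, iter) =>
  val rng = new Random(idx)
  iter.map(_ => rng.nextDouble())
}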
val lines = sc.textFile("huge_10TB_file.txt") // NOT executedval errors = lines.filter(_.contains("ERROR")) // NOT executedval errorWords = errors.flatMap(_.split(" ")) // NOT executedval wordCounts = errorWords.map(w => (w, 1)) // NOT executedval aggregated = wordCounts.reduceByKey(_ + _) // STILL not executed!// Only NOW does Spark execute everything in one optimized passval result = aggregated.collect() // ACTION - triggers execution
What Spark Does During Lazy Evaluation:
Builds a DAG (Directed Acyclic Graph) of operations
Optimizes the DAG (pipeline stages, minimize shuffles)
Only reads necessary data (if you filter early, less data flows through)
Performance Impact:
// BAD: Multiple passes over data
val lines = sc.textFile("data.txt")
val count1 = lines.filter(_.contains("ERROR")).count()  // Pass 1
val count2 = lines.filter(_.contains("WARN")).count()   // Pass 2
val count3 = lines.filter(_.contains("INFO")).count()   // Pass 3

// GOOD: Single pass with one action
val lines = sc.textFile("data.txt").cache()  // Cache for reuse
val counts = lines.map { line =>
  if (line.contains("ERROR")) ("ERROR", 1)
  else if (line.contains("WARN")) ("WARN", 1)
  else ("INFO", 1)
}.reduceByKey(_ + _).collect()  // Single pass!
Performance Tip: Filter early in your pipeline to reduce data volume.
// GOOD: Filter before expensive operations
sc.textFile("huge.log")
  .filter(_.contains("ERROR"))  // Reduce data volume first
  .map(parseComplexLog)         // Expensive parsing on less data
  .collect()

// BAD: Parse everything, then filter
sc.textFile("huge.log")
  .map(parseComplexLog)         // Parsing billions of records
  .filter(_.level == "ERROR")   // Filter at the end
  .collect()
// Scala: uses efficient combiners
rdd.reduceByKey(_ + _)               // Fast, minimizes shuffle data

# Python (PySpark): less efficient due to serialization overhead
rdd.reduceByKey(lambda a, b: a + b)  # Slower for large datasets
Performance Critical: reduceByKey is much faster than groupByKey followed by reduce!
// BAD: Shuffles ALL data
rdd.groupByKey()      // Shuffle everything
   .mapValues(_.sum)  // Then aggregate

// GOOD: Reduces locally first, then shuffles
rdd.reduceByKey(_ + _)  // Local reduce, shuffle less data
// Group user actions by user ID
case class Action(userId: String, action: String, timestamp: Long)

val actions: RDD[Action] = loadActions()
val actionsByUser = actions
  .map(a => (a.userId, a))
  .groupByKey()  // Groups all actions per user

// Result: ("user123", Iterable(Action1, Action2, ...))
When to Use vs Avoid:

✅ Use groupByKey when:
You need ALL values grouped together (e.g., building per-user sessions)
Subsequent operation requires full value list
❌ Avoid groupByKey when:
You’re going to aggregate (use reduceByKey or aggregateByKey instead; see the sketch below)
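A minimal sketch of the aggregateByKey alternative: computing a per-key average with (sum, count) accumulators instead of materializing every value (hypothetical scores data):

import org.apache.spark.rdd.RDD

val scores: RDD[(String, Double)] = sc.parallelize(Seq(
  ("user1", 3.0), ("user1", 5.0), ("user2", 4.0)
))

// Accumulate (sum, count) per key locally, then merge partial results
val sumCount = scores.aggregateByKey((0.0, 0L))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // within a partition
  (a, b)   => (a._1 + b._1, a._2 + b._2)   // across partitions
)
val averages = sumCount.mapValues { case (sum, count) => sum / count }
averages.collect()  // Array(("user1", 4.0), ("user2", 4.0))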
// Scenario: Small RDD (100 KB) joining with Large RDD (100 GB)

// BAD: Regular join causes huge shuffle
val smallRDD: RDD[(String, Data)] = ...  // 100 KB
val largeRDD: RDD[(String, Data)] = ...  // 100 GB
val result = smallRDD.join(largeRDD)     // Shuffles 100 GB across network!

// GOOD: Broadcast small RDD to all nodes
val broadcastVar = sc.broadcast(smallRDD.collectAsMap())
val result = largeRDD.mapPartitions { partition =>
  val smallMap = broadcastVar.value  // Available locally on each node
  partition.flatMap { case (key, value) =>
    smallMap.get(key).map(smallValue => (key, (value, smallValue)))
  }
}
// No shuffle! Small RDD sent once to each executor
// Save results to HDFS
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.saveAsTextFile("hdfs://namenode:9000/output/wordcount")

// Save to S3
wordCounts.saveAsTextFile("s3a://my-bucket/results/wordcount")
val numbers = sc.parallelize(1 to 100)
val sum = numbers.reduce(_ + _)  // 5050

// Find maximum value
val max = numbers.reduce((a, b) => if (a > b) a else b)
Requirement: Operation must be commutative and associative.
// GOOD: Addition is commutative and associative
numbers.reduce(_ + _)  // 1+2+3 = 2+1+3 = 3+2+1

// BAD: Subtraction is NOT
numbers.reduce(_ - _)  // Result depends on evaluation order!
import org.apache.spark.Partitioner

// Example: Partition by user region
class RegionPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val userId = key.asInstanceOf[String]
    // Extract region from userId (e.g., "US-12345" -> "US")
    val region = userId.split("-")(0)
    region match {
      case "US"   => 0
      case "EU"   => 1
      case "ASIA" => 2
      case _      => 3
    }
  }
}

// Usage
val users: RDD[(String, UserData)] = loadUsers()
val partitionedByRegion = users.partitionBy(new RegionPartitioner(4))

// Now all US users are in partition 0, EU in 1, etc.
// Queries like "get all US users" scan only one partition!
import org.apache.spark.storage.StorageLevel

// Memory only (default for cache())
rdd.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk (spill to disk if memory full)
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// Serialized in memory (more compact, slower access)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

// Replicated for fault tolerance
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)  // 2x replication

// Disk only
rdd.persist(StorageLevel.DISK_ONLY)

// Off-heap memory (requires spark.memory.offHeap.enabled)
rdd.persist(StorageLevel.OFF_HEAP)
// Cache RDD
val data = sc.textFile("file.txt").cache()
data.count()  // Triggers caching

// ... use data multiple times ...

// Done with it, free memory
data.unpersist()
Spark automatically manages the cache with LRU (Least Recently Used) eviction, but calling unpersist() manually is good practice for large cached RDDs.
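A quick sketch, assuming a SparkContext named sc, for checking what is currently cached before deciding what to unpersist:

// List every RDD currently marked as persistent in this application
sc.getPersistentRDDs.foreach { case (id, cached) =>
  println(s"RDD $id (${cached.name}): storage level = ${cached.getStorageLevel}")
}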
// WRONG: One key has 90% of data
val skewed = data.groupByKey()  // One task gets 90% of work

// RIGHT: Add salt to spread load
import scala.util.Random

val salted = data.map { case (key, value) =>
  val salt = Random.nextInt(10)
  ((key, salt), value)
}.groupByKey()
// Each hot key is now spread across up to 10 tasks
// (a second aggregation is needed afterwards to merge the salted groups)
// WRONG: Recompute every iteration
var data = loadData()
for (i <- 1 to 100) {
  data = data.map(expensiveTransform)  // Lineage grows; recomputed from source at each action
}

// RIGHT: Cache intermediate results (and release the previous iteration's cache)
var data = loadData().cache()
for (i <- 1 to 100) {
  val previous = data
  data = data.map(expensiveTransform).cache()
  data.count()          // Action to materialize the new cache
  previous.unpersist()  // Free the old cached data
}
Narrow: Each input partition contributes to at most one output partition. No shuffle. Examples: map, filter, union.
Wide: Each input partition may contribute to multiple output partitions. Requires shuffle. Examples: groupByKey, join, reduceByKey.
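If you want to check which kind of dependency a transformation produced, an RDD's dependencies reveal it. A small sketch with a hypothetical pair RDD:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val mapped  = pairs.mapValues(_ * 10)   // narrow transformation
val reduced = pairs.reduceByKey(_ + _)  // wide transformation

println(mapped.dependencies)   // List(org.apache.spark.OneToOneDependency@...)
println(reduced.dependencies)  // List(org.apache.spark.ShuffleDependency@...)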
Q: Why is reduceByKey faster than groupByKey?
A:
reduceByKey performs local aggregation (combiner) before shuffle, reducing network data. groupByKey shuffles all values, then aggregates.
// groupByKey:  shuffle (k1, v1), (k1, v2), (k1, v3)  -> 3 values over the network
// reduceByKey: combine locally, shuffle partial sums  -> 1 value per partition over the network
Q: What is lineage in an RDD?
A:
Lineage is the graph of transformations used to build an RDD. If a partition is lost, Spark uses lineage to recompute only that partition, not the entire RDD.
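A quick way to see the lineage yourself is toDebugString; a sketch (output abbreviated and illustrative):

val counts = sc.textFile("data.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

println(counts.toDebugString)
// (2) ShuffledRDD[4] at reduceByKey ...
//  +-(2) MapPartitionsRDD[3] at map ...
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  data.txt MapPartitionsRDD[1] at textFile ...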