
Demystifying RDD Programming: From Theory to Production

Module Duration: 4-5 hours
Focus: Complete RDD API mastery with production-ready patterns
Prerequisites: Spark Introduction (Module 1)
Hands-on Labs: 12+ coding exercises in Scala and Python

Introduction: Why RDDs Matter

While DataFrames and Datasets are now the recommended APIs for most Spark applications, understanding RDDs (Resilient Distributed Datasets) is critical for:
  1. Deep Performance Tuning: Understanding how DataFrames compile down to RDDs helps you optimize queries
  2. Unstructured Data: Text processing, binary data, and complex nested structures often require RDD-level control
  3. Custom Partitioning: Advanced optimization requires understanding RDD partitioning
  4. Interview Success: Most Spark interviews deeply probe RDD concepts
  5. Legacy Code: Many production systems still use RDD APIs
Common Misconception: “RDDs are deprecated, I can skip them.”
Reality: DataFrames are built ON TOP of RDDs. You’re using RDDs whether you know it or not. Understanding them makes you a better Spark developer.
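You can see this relationship yourself. A minimal sketch (assuming a local SparkSession named spark) that exposes the RDD backing a DataFrame:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rdd-under-the-dataframe")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// A tiny DataFrame built from an in-memory collection
val df = Seq(("spark", 1), ("hadoop", 2)).toDF("word", "count")

// Every DataFrame is backed by an RDD of Row objects
val underlying = df.rdd
println(underlying.toDebugString)  // Prints the lineage of the RDD that executes the DataFrame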

Part 1: RDD Fundamentals - The Core Abstraction

What is an RDD? (Deep Dive)

From the original paper (Zaharia et al., NSDI 2012):
“An RDD is a read-only, partitioned collection of records. RDDs can only be created through deterministic operations on either (1) data in stable storage or (2) other RDDs.”
Let’s break this down:

1. Read-Only (Immutable)

// This doesn't modify the original RDD
val data = sc.parallelize(1 to 1000)
val doubled = data.map(_ * 2)  // Creates NEW RDD

// data is still [1, 2, 3, ..., 1000]
// doubled is [2, 4, 6, ..., 2000]
Why Immutability?
  • Enables lineage-based fault tolerance
  • Thread-safe by default (no race conditions)
  • Easy reasoning about distributed state
  • Enables speculative execution
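Immutability and lineage go hand in hand: each transformation only records how to derive a new RDD from its parent, and Spark can print that recipe. A small illustration (exact output varies by Spark version):

val base = sc.parallelize(1 to 1000)
val doubled = base.map(_ * 2)              // New RDD that records "map" on top of base
val filtered = doubled.filter(_ % 4 == 0)  // Another new RDD on top of doubled

// Lineage: filter -> map -> parallelize
println(filtered.toDebugString)
// If a partition of `filtered` is lost, Spark replays just this chain for that partition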

2. Partitioned Collection

val data = sc.parallelize(1 to 1000, numSlices = 4)
println(s"Number of partitions: ${data.getNumPartitions}")
// Output: Number of partitions: 4

// Each partition runs on a different executor
data.mapPartitionsWithIndex { (idx, iter) =>
  Iterator(s"Partition $idx has ${iter.size} elements")
}.collect()
// Output:
// Partition 0 has 250 elements
// Partition 1 has 250 elements
// Partition 2 has 250 elements
// Partition 3 has 250 elements
Partitioning is Critical:
  • Each partition = one task
  • More partitions = more parallelism (up to a point)
  • Fewer partitions = less overhead but underutilized cluster
Rule of Thumb: Use 2-4 partitions per CPU core in your cluster. For a cluster with 10 executors × 4 cores = 40 cores total, aim for 80-160 partitions.
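One hedged way to apply the rule in code is to derive the partition count from sc.defaultParallelism, which typically reflects the total cores Spark sees (the 3x multiplier below is just a middle-of-the-range choice):

val coresAvailable = sc.defaultParallelism
val targetPartitions = coresAvailable * 3   // Somewhere in the 2-4x rule-of-thumb range
val data = sc.parallelize(1 to 1000000, targetPartitions)
println(s"$coresAvailable cores -> ${data.getNumPartitions} partitions")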

3. Deterministic Creation

RDDs can ONLY be created through:

A. From Stable Storage:
// From HDFS
val logsRDD = sc.textFile("hdfs://namenode:9000/logs/access.log")

// From S3
val s3Data = sc.textFile("s3a://my-bucket/data/*.json")

// From local file (testing only)
val localData = sc.textFile("file:///tmp/test.txt")
B. From Existing RDDs (Transformations):
val numbers = sc.parallelize(1 to 100)
val evens = numbers.filter(_ % 2 == 0)      // Transformation
val squared = evens.map(x => x * x)         // Another transformation
C. From In-Memory Collections:
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val keyValue = sc.parallelize(Seq(("a", 1), ("b", 2)))
Why “Deterministic” Matters:
  • If partition is lost, Spark can recompute it using the lineage
  • Non-deterministic operations (random numbers, timestamps) break this guarantee
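As a sketch of what can go wrong: if an executor dies and a partition of the first RDD below is recomputed from lineage, the regenerated random values will not match the originals that downstream results were built on. Seeding per partition, as in the second RDD, keeps recomputation reproducible.

import scala.util.Random

// RISKY: a recomputed partition produces DIFFERENT random values
val unstable = sc.parallelize(1 to 100).map(x => (x, Random.nextDouble()))

// SAFER: derive the seed from the partition index so recomputation repeats exactly
val reproducible = sc.parallelize(1 to 100).mapPartitionsWithIndex { (idx, iter) =>
  val rng = new Random(idx)  // Same partition => same seed => same values
  iter.map(x => (x, rng.nextDouble()))
}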

Part 2: RDD Operations - Transformations vs Actions

The Two Types of Operations

Every RDD operation falls into one of two categories:
| Transformations | Actions |
| --- | --- |
| Lazy (not executed immediately) | Eager (triggers execution) |
| Returns a new RDD | Returns a value to the driver |
| Builds the computation DAG | Executes the DAG |
| Examples: map, filter, join | Examples: collect, count, save |

Lazy Evaluation: Why It’s Genius

val lines = sc.textFile("huge_10TB_file.txt")        // NOT executed
val errors = lines.filter(_.contains("ERROR"))       // NOT executed
val errorWords = errors.flatMap(_.split(" "))       // NOT executed
val wordCounts = errorWords.map(w => (w, 1))        // NOT executed
val aggregated = wordCounts.reduceByKey(_ + _)      // STILL not executed!

// Only NOW does Spark execute everything in one optimized pass
val result = aggregated.collect()  // ACTION - triggers execution
What Spark Does During Lazy Evaluation:
  1. Builds a DAG (Directed Acyclic Graph) of operations
  2. Optimizes the DAG (pipeline stages, minimize shuffles)
  3. Only reads necessary data (if you filter early, less data flows through)
Performance Impact:
// BAD: Multiple passes over data
val lines = sc.textFile("data.txt")
val count1 = lines.filter(_.contains("ERROR")).count()   // Pass 1
val count2 = lines.filter(_.contains("WARN")).count()    // Pass 2
val count3 = lines.filter(_.contains("INFO")).count()    // Pass 3

// GOOD: Single pass with one action
val lines = sc.textFile("data.txt").cache()  // Optional: cache only pays off if lines is reused by later actions
val counts = lines.map { line =>
  if (line.contains("ERROR")) ("ERROR", 1)
  else if (line.contains("WARN")) ("WARN", 1)
  else ("INFO", 1)
}.reduceByKey(_ + _).collect()  // Single pass!

Part 3: Common Transformations (with Real Examples)

3.1 Element-Wise Transformations

map(func) - One-to-One Transformation

// Example: Parse log lines to extract timestamp
val logs = sc.textFile("access.log")
val timestamps = logs.map { line =>
  val parts = line.split(" ")
  parts(3)  // Timestamp is 4th field
}

// PySpark equivalent
logs = sc.textFile("access.log")
timestamps = logs.map(lambda line: line.split(" ")[3])
Performance Note: map is a narrow transformation - no shuffle required, very fast.

flatMap(func) - One-to-Many Transformation

// Classic example: Split sentences into words
val sentences = sc.parallelize(Seq(
  "Apache Spark is fast",
  "Spark handles big data"
))

val words = sentences.flatMap(_.split(" "))
// Result: ["Apache", "Spark", "is", "fast", "Spark", "handles", "big", "data"]

// vs map (notice the nested structure)
val wordsNested = sentences.map(_.split(" "))
// Result: [Array("Apache", "Spark", "is", "fast"), Array("Spark", "handles", "big", "data")]
Real-World Use Case: Log parsing that skips malformed entries
case class LogEntry(timestamp: String, level: String, message: String)

val multiLineLogRDD = sc.textFile("app.log")
val parsedLogs = multiLineLogRDD.flatMap { line =>
  try {
    // parseLogLine is a custom parser; malformed lines are dropped via None below
    val parsed = parseLogLine(line)  // Custom parser
    Some(parsed)
  } catch {
    case e: Exception => None  // Skip malformed lines
  }
}

filter(func) - Select Elements

// Filter errors from logs
val logs = sc.textFile("system.log")
val errors = logs.filter(_.contains("ERROR"))

// Filter with complex logic
case class Transaction(id: String, amount: Double, status: String)
val transactions: RDD[Transaction] = loadTransactions()

val suspiciousTransactions = transactions.filter { txn =>
  txn.amount > 10000 && txn.status == "PENDING"
}
Performance Tip: Filter early in your pipeline to reduce data volume.
// GOOD: Filter before expensive operations
sc.textFile("huge.log")
  .filter(_.contains("ERROR"))      // Reduce data volume first
  .map(parseComplexLog)              // Expensive parsing on less data
  .collect()

// BAD: Parse everything, then filter
sc.textFile("huge.log")
  .map(parseComplexLog)              // Parsing billions of records
  .filter(_.level == "ERROR")        // Filter at the end
  .collect()

3.2 Pair RDD Transformations

Pair RDDs (RDDs of (K, V) tuples) unlock powerful operations:

reduceByKey(func) - Aggregate by Key

// Word count example
val words = sc.textFile("book.txt").flatMap(_.split(" "))
val wordCounts = words
  .map(word => (word, 1))
  .reduceByKey(_ + _)  // Combine counts for same word

// Result: [("spark", 42), ("hadoop", 18), ...]
How it Works (with shuffle optimization):
Partition 1:                    Partition 2:
("spark", 1)                    ("spark", 1)
("spark", 1)                    ("hadoop", 1)
("hadoop", 1)                   ("spark", 1)
    ↓                               ↓
Local reduce (pre-shuffle):     Local reduce:
("spark", 2)                    ("spark", 2)
("hadoop", 1)                   ("hadoop", 1)
    ↓                               ↓
        -------- SHUFFLE --------

        Final reduce:
        ("spark", 4)
        ("hadoop", 2)
Scala vs PySpark Performance:
// Scala: Uses efficient combiners
rdd.reduceByKey(_ + _)  // Fast, minimizes shuffle data

# Python: Less efficient due to serialization
rdd.reduceByKey(lambda a, b: a + b)  # Slower for large datasets
Performance Critical: reduceByKey is much faster than groupByKey followed by reduce!
// BAD: Shuffles ALL data
rdd.groupByKey()                    // Shuffle everything
   .mapValues(_.sum)                // Then aggregate

// GOOD: Reduces locally first, then shuffles
rdd.reduceByKey(_ + _)              // Local reduce, shuffle less data

groupByKey() - Group Values by Key

// Group user actions by user ID
case class Action(userId: String, action: String, timestamp: Long)
val actions: RDD[Action] = loadActions()

val actionsByUser = actions
  .map(a => (a.userId, a))
  .groupByKey()  // Groups all actions per user

// Result: ("user123", Iterator[Action1, Action2, ...])
When to Use vs Avoid: Use groupByKey when:
  • You need ALL values grouped together (e.g., building per-user sessions)
  • Subsequent operation requires full value list
Avoid groupByKey when:
  • You’re going to aggregate (use reduceByKey or aggregateByKey instead)
  • Values are large (shuffle will be enormous)

aggregateByKey() - Most Powerful (and Flexible)

// Calculate average transaction amount per user
case class Transaction(userId: String, amount: Double)
val transactions: RDD[Transaction] = loadTransactions()

val txnPairs = transactions.map(t => (t.userId, t.amount))

val avgPerUser = txnPairs.aggregateByKey((0.0, 0))( // (sum, count)
  // Sequence operation: combines values within partition
  seqOp = { case ((sum, count), amount) =>
    (sum + amount, count + 1)
  },
  // Combine operation: merges partition results
  combOp = { case ((sum1, count1), (sum2, count2)) =>
    (sum1 + sum2, count1 + count2)
  }
).mapValues { case (sum, count) => sum / count }

// Result: ("user1", 156.73), ("user2", 203.45), ...
Why This is Powerful:
  • seqOp runs locally within each partition (no shuffle)
  • Only aggregated results shuffle across network
  • Generalizes reduceByKey, foldByKey, combineByKey
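For reference, here is a sketch of the same per-user average written against combineByKey, the more general primitive behind aggregateByKey and reduceByKey (arguments are positional: create a combiner from the first value, merge values within a partition, merge combiners across partitions):

val avgViaCombine = txnPairs.combineByKey(
  (amount: Double) => (amount, 1),                                        // first value seen for a key
  (acc: (Double, Int), amount: Double) => (acc._1 + amount, acc._2 + 1),  // merge a value within a partition
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)      // merge partials across partitions
).mapValues { case (sum, count) => sum / count }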

join() - Combine Two RDDs by Key

// Join users with their orders
val users: RDD[(String, User)] = loadUsers()      // (userId, User)
val orders: RDD[(String, Order)] = loadOrders()   // (userId, Order)

val usersWithOrders = users.join(orders)
// Result: (userId, (User, Order))

// Real example: Enrich clickstream with user profiles
val clicks: RDD[(String, Click)] = sc.textFile("clicks.log")
  .map(parseClick)
  .map(c => (c.userId, c))

val profiles: RDD[(String, Profile)] = loadFromDB()

val enrichedClicks = clicks.join(profiles)
// Now each click has full user profile attached
Join Types:
val a = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
val b = sc.parallelize(Seq(("k1", "w1"), ("k3", "w3")))

a.join(b).collect()
// Inner join: [("k1", ("v1", "w1"))]

a.leftOuterJoin(b).collect()
// [("k1", (Some("v1"), Some("w1"))), ("k2", (Some("v2"), None))]

a.rightOuterJoin(b).collect()
// [("k1", (Some("v1"), Some("w1"))), ("k3", (None, Some("w3")))]

a.fullOuterJoin(b).collect()
// [("k1", (Some("v1"), Some("w1"))),
//  ("k2", (Some("v2"), None)),
//  ("k3", (None, Some("w3")))]
Performance of Joins (Critical!):
// Scenario: Small RDD (100 KB) joining with Large RDD (100 GB)

// BAD: Regular join causes huge shuffle
val smallRDD: RDD[(String, Data)] = ...  // 100 KB
val largeRDD: RDD[(String, Data)] = ...  // 100 GB

val result = smallRDD.join(largeRDD)  // Shuffles 100 GB across network!

// GOOD: Broadcast small RDD to all nodes
val broadcastVar = sc.broadcast(smallRDD.collectAsMap())
val result = largeRDD.mapPartitions { partition =>
  val smallMap = broadcastVar.value  // Available locally on each node
  partition.flatMap { case (key, value) =>
    smallMap.get(key).map(smallValue => (key, (value, smallValue)))
  }
}
// No shuffle! Small RDD sent once to each executor

Part 4: Actions - Triggering Execution

4.1 Basic Actions

collect() - Return All Elements to Driver

val data = sc.parallelize(1 to 100)
val results: Array[Int] = data.collect()
DANGER: collect() brings ALL data to the driver. If your RDD is 1 TB, your driver will crash!
Safe Usage:
// BAD: Collect huge dataset
val allData = sc.textFile("1TB_file.txt").collect()  // OOM!

// GOOD: Collect after aggressive filtering
val sample = sc.textFile("1TB_file.txt")
  .filter(_.contains("CRITICAL_ERROR"))  // Maybe 1000 lines
  .collect()  // Safe

count() - Count Elements

val errorCount = logs.filter(_.contains("ERROR")).count()
println(s"Found $errorCount errors")
Performance Note: count() never brings the data to the driver - each partition is counted locally and only the per-partition counts are summed.

take(n) - Return First n Elements

// Preview first 10 results without collecting everything
val preview = hugeRDD.take(10)
preview.foreach(println)
Implementation: Spark scans as few partitions as possible, starting with one and fetching from more only if it still needs elements.

first() - Return First Element

val firstError = logs.filter(_.contains("ERROR")).first()
// Stops scanning as soon as first match found

4.2 Saving Actions

saveAsTextFile(path) - Write to Storage

// Save results to HDFS
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.saveAsTextFile("hdfs://namenode:9000/output/wordcount")

// Save to S3
wordCounts.saveAsTextFile("s3a://my-bucket/results/wordcount")
Output Structure:
/output/wordcount/
  _SUCCESS           (empty marker file)
  part-00000         (partition 0 results)
  part-00001         (partition 1 results)
  ...
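Each partition writes its own part-NNNNN file. If a downstream consumer needs a single file and the result is small, one option (a sketch, not a default) is to coalesce to one partition before saving; this funnels the final write through a single task, so avoid it for large outputs:

// Single part-00000 file; only safe when the result comfortably fits in one task
wordCounts.coalesce(1).saveAsTextFile("hdfs://namenode:9000/output/wordcount-single")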

saveAsSequenceFile(path) - Binary Format

val keyValueRDD: RDD[(String, Int)] = wordCounts
keyValueRDD.saveAsSequenceFile("hdfs://namenode:9000/output/binary")
When to Use: Hadoop SequenceFiles are more efficient than text for Spark-to-Spark data exchange.
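Reading the file back is symmetric; a sketch (the type parameters must match what was written):

// Restore the (String, Int) pairs in a later job
val restored = sc.sequenceFile[String, Int]("hdfs://namenode:9000/output/binary")
restored.take(5).foreach(println)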

4.3 Aggregation Actions

reduce(func) - Aggregate All Elements

val numbers = sc.parallelize(1 to 100)
val sum = numbers.reduce(_ + _)  // 5050

// Find maximum value
val max = numbers.reduce((a, b) => if (a > b) a else b)
Requirement: Operation must be commutative and associative.
// GOOD: Addition is commutative and associative
numbers.reduce(_ + _)  // 1+2+3 = 2+1+3 = 3+2+1

// BAD: Subtraction is NOT
numbers.reduce(_ - _)  // Result depends on evaluation order!

fold(zeroValue)(func) - Reduce with Initial Value

val numbers = sc.parallelize(1 to 100)
val sum = numbers.fold(0)(_ + _)  // 5050

// Merge all maps
val maps = sc.parallelize(Seq(
  Map("a" -> 1, "b" -> 2),
  Map("b" -> 3, "c" -> 4)
))
val merged = maps.fold(Map.empty[String, Int]) { (acc, map) =>
  acc ++ map
}
// Result: Map("a" -> 1, "b" -> 3, "c" -> 4)

aggregate(zeroValue)(seqOp, combOp) - Most Flexible

// Calculate sum and count in one pass
val numbers = sc.parallelize(1 to 100)
val (sum, count) = numbers.aggregate((0, 0))(
  seqOp = { case ((s, c), num) => (s + num, c + 1) },
  combOp = { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
)
val average = sum.toDouble / count  // 50.5

Part 5: Partitioning - The Key to Performance

Why Partitioning Matters

Bad Partitioning = Slow Performance:
  • Data skew (one partition has 90% of data)
  • Unnecessary shuffles
  • Poor parallelism
Good Partitioning = Fast Performance:
  • Even data distribution
  • Minimal shuffles
  • Maximum parallelism

Default Partitioning

// HDFS file partitioning
val data = sc.textFile("hdfs://namenode:9000/data")
// Default: One partition per HDFS block (128 MB)

// Programmatic partitioning
val data = sc.parallelize(1 to 1000, numSlices = 4)
// Creates 4 partitions

Hash Partitioning (Default for Pair RDDs)

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(
  ("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 4)
))

val partitioned = pairs.partitionBy(new HashPartitioner(2))
// Keys hashed to 2 partitions

// Check partitioning
partitioned.mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.toList))
}.collect()

// Output might be:
// (0, List(("banana", 2), ("cherry", 4)))  // hash("banana") % 2 = 0
// (1, List(("apple", 1), ("apple", 3)))    // hash("apple") % 2 = 1

Range Partitioning

import org.apache.spark.RangePartitioner

val data = sc.parallelize(1 to 100)
val pairs = data.map(x => (x, x * x))

val rangePartitioned = pairs.partitionBy(
  new RangePartitioner(4, pairs)
)
// Partition 0: 1-25
// Partition 1: 26-50
// Partition 2: 51-75
// Partition 3: 76-100
When to Use:
  • Data needs to be sorted
  • Range queries are common
  • Join on sorted keys
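In practice you rarely build a RangePartitioner by hand: sortByKey samples the keys and range-partitions internally, which is why its output partitions hold contiguous, sorted key ranges. A quick sketch using the pairs RDD above:

val sorted = pairs.sortByKey()     // Samples keys, then range-partitions before sorting each partition
println(sorted.partitioner)        // e.g. Some(org.apache.spark.RangePartitioner@...)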

Custom Partitioning

import org.apache.spark.Partitioner

// Example: Partition by user region
class RegionPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val userId = key.asInstanceOf[String]
    // Extract region from userId (e.g., "US-12345" -> "US")
    val region = userId.split("-")(0)

    region match {
      case "US" => 0
      case "EU" => 1
      case "ASIA" => 2
      case _ => 3
    }
  }
}

// Usage
val users: RDD[(String, UserData)] = loadUsers()
val partitionedByRegion = users.partitionBy(new RegionPartitioner(4))

// Now all US users are in partition 0, EU in 1, etc.
// Queries like "get all US users" scan only one partition!

Repartitioning Operations

repartition(numPartitions) - Increase/Decrease Partitions

val data = sc.textFile("file.txt")  // Suppose 100 partitions
val fewer = data.repartition(10)     // Reduce to 10 (full shuffle)

val more = data.repartition(200)     // Increase to 200 (full shuffle)
Cost: Always causes a full shuffle (expensive).

coalesce(numPartitions) - Decrease Partitions Efficiently

val data = sc.textFile("file.txt")  // 100 partitions
val fewer = data.coalesce(10)        // Reduce to 10 (minimal shuffle)
Performance:
// Scenario: After filtering, most partitions are empty
val largeFile = sc.textFile("huge.txt")  // 1000 partitions
val filtered = largeFile.filter(_.contains("RARE_PATTERN"))  // 99% filtered out

// BAD: Running 1000 tasks for mostly empty partitions
filtered.count()  // 1000 tasks, but 990 are empty!

// GOOD: Coalesce to fewer partitions
val optimized = filtered.coalesce(10)
optimized.count()  // Only 10 tasks
Rule: Use coalesce after aggressive filtering to reduce task overhead.

Part 6: Persistence and Caching

Why Cache?

// BAD: Recompute expensive RDD multiple times
val expensive = sc.textFile("huge.txt")
  .map(parseComplexFormat)
  .filter(validateData)
  .map(enrichWithExternalData)  // Calls external API!

val count1 = expensive.filter(_.category == "A").count()  // Full computation
val count2 = expensive.filter(_.category == "B").count()  // Recomputed again!
val count3 = expensive.filter(_.category == "C").count()  // And again!

// GOOD: Cache the expensive computation
val expensive = sc.textFile("huge.txt")
  .map(parseComplexFormat)
  .filter(validateData)
  .map(enrichWithExternalData)
  .cache()  // or .persist()

val count1 = expensive.filter(_.category == "A").count()  // Computed and cached
val count2 = expensive.filter(_.category == "B").count()  // Read from cache
val count3 = expensive.filter(_.category == "C").count()  // Read from cache

Storage Levels

import org.apache.spark.storage.StorageLevel

// Memory only (default for cache())
rdd.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk (spill to disk if memory full)
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// Serialized in memory (more compact, slower access)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

// Replicated for fault tolerance
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)  // 2x replication

// Disk only
rdd.persist(StorageLevel.DISK_ONLY)

// Off-heap memory (requires spark.memory.offHeap.enabled and an off-heap size to be configured)
rdd.persist(StorageLevel.OFF_HEAP)

Comparison Table

| Storage Level | Space Used | CPU Time | In Memory | On Disk | Fault Tolerance |
| --- | --- | --- | --- | --- | --- |
| MEMORY_ONLY | High | Low | Yes | No | Recompute |
| MEMORY_ONLY_SER | Low | High | Yes | No | Recompute |
| MEMORY_AND_DISK | High | Medium | Some | Some | Recompute or disk |
| MEMORY_AND_DISK_SER | Low | High | Some | Some | Recompute or disk |
| DISK_ONLY | Low | High | No | Yes | Read from disk |

When to Cache

Cache when:
  • RDD is used multiple times (iterative algorithms, machine learning)
  • RDD is expensive to compute
  • RDD is result of wide transformations (shuffle)
Don’t cache when:
  • RDD is only used once
  • RDD is cheap to recompute
  • Limited cluster memory
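When in doubt, you can inspect what is cached both in the Spark UI's Storage tab and programmatically; a small sketch:

import org.apache.spark.storage.StorageLevel

val cached = sc.textFile("file.txt").persist(StorageLevel.MEMORY_AND_DISK)
println(cached.getStorageLevel)       // e.g. StorageLevel(disk, memory, deserialized, 1 replicas)
println(sc.getPersistentRDDs.size)    // How many RDDs are currently marked as persistent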

Unpersisting

// Cache RDD
val data = sc.textFile("file.txt").cache()
data.count()  // Triggers caching

// ... use data multiple times ...

// Done with it, free memory
data.unpersist()
Spark automatically manages cache using LRU (Least Recently Used). But manual unpersist() is good practice for large cached RDDs.

Part 7: Real-World Patterns and Best Practices

Pattern 1: ETL Pipeline

object ProductETL {
  case class RawProduct(id: String, name: String, price: String, category: String)
  case class Product(id: String, name: String, price: Double, category: String)

  def run(spark: SparkContext): Unit = {
    // Extract
    val rawData = spark.textFile("s3://data/products.csv")

    // Transform
    val products = rawData
      .filter(!_.startsWith("#"))  // Skip comments
      .map(_.split(","))
      .filter(_.length == 4)       // Validate structure
      .map { fields =>
        try {
          Some(Product(
            id = fields(0).trim,
            name = fields(1).trim,
            price = fields(2).trim.toDouble,
            category = fields(3).trim
          ))
        } catch {
          case _: Exception => None
        }
      }
      .filter(_.isDefined)
      .map(_.get)
      .cache()  // Cache for validation and stats

    // Validation
    val validCount = products.count()
    val invalidCount = rawData.count() - validCount
    println(s"Valid: $validCount, Invalid: $invalidCount")

    // Load
    products
      .map(p => s"${p.id}\t${p.name}\t${p.price}\t${p.category}")
      .saveAsTextFile("s3://data/products-clean")

    // Cleanup
    products.unpersist()
  }
}

Pattern 2: Log Analysis

object LogAnalyzer {
  case class LogEntry(
    timestamp: Long,
    level: String,
    component: String,
    message: String
  )

  def analyzeErrors(sc: SparkContext, logPath: String): Unit = {
    val logs = sc.textFile(logPath)
      .map(parseLog)
      .filter(_.level == "ERROR")
      .cache()  // Multiple analyses

    // Top error messages
    val topErrors = logs
      .map(l => (l.message, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)

    println("Top 10 Errors:")
    topErrors.foreach { case (msg, count) =>
      println(s"  $count: $msg")
    }

    // Errors by component
    val errorsByComponent = logs
      .map(l => (l.component, 1))
      .reduceByKey(_ + _)
      .collect()

    println("Errors by Component:")
    errorsByComponent.foreach { case (comp, count) =>
      println(s"  $comp: $count")
    }

    logs.unpersist()
  }

  def parseLog(line: String): LogEntry = {
    // Simplified parser
    val parts = line.split("\\|")
    LogEntry(
      timestamp = parts(0).toLong,
      level = parts(1),
      component = parts(2),
      message = parts(3)
    )
  }
}

Pattern 3: Join Optimization

object JoinOptimization {
  // Small dataset (user profiles): 10 MB
  // Large dataset (clickstream): 1 TB

  def badJoin(sc: SparkContext): Unit = {
    val profiles: RDD[(String, Profile)] = loadProfiles(sc)
    val clicks: RDD[(String, Click)] = loadClicks(sc)

    // BAD: Both sides shuffle
    val enriched = clicks.join(profiles)  // 1 TB shuffle!
    enriched.saveAsTextFile("output")
  }

  def goodJoin(sc: SparkContext): Unit = {
    val profiles: RDD[(String, Profile)] = loadProfiles(sc)
    val clicks: RDD[(String, Click)] = loadClicks(sc)

    // GOOD: Broadcast small side
    val profileMap = sc.broadcast(profiles.collectAsMap())

    val enriched = clicks.mapPartitions { partition =>
      val localProfiles = profileMap.value
      partition.flatMap { case (userId, click) =>
        localProfiles.get(userId).map { profile =>
          (userId, EnrichedClick(click, profile))
        }
      }
    }

    enriched.saveAsTextFile("output")
    profileMap.unpersist()  // Free broadcast memory
  }
}

Pattern 4: Iterative Algorithm (PageRank)

object PageRank {
  def run(sc: SparkContext, links: RDD[(String, Seq[String])], iterations: Int): RDD[(String, Double)] = {
    var ranks = links.mapValues(_ => 1.0)

    // Cache links since used in every iteration
    links.cache()

    for (i <- 1 to iterations) {
      val contributions = links.join(ranks).flatMap {
        case (url, (neighbors, rank)) =>
          neighbors.map(dest => (dest, rank / neighbors.size))
      }

      ranks = contributions
        .reduceByKey(_ + _)
        .mapValues(0.15 + 0.85 * _)

      // Cache intermediate ranks for next iteration
      if (i < iterations) ranks.cache()
    }

    links.unpersist()
    ranks
  }
}

Part 8: Performance Optimization Checklist

🎯 Partitioning

  • Use 2-4 partitions per CPU core
  • Coalesce after filtering (reduce task overhead)
  • Custom partitioning for skewed data
  • Hash partition for joins on same key
  • Check partition sizes: rdd.mapPartitions(iter => Iterator(iter.size)).collect()

🎯 Shuffles

  • Minimize shuffles (avoid groupByKey, prefer reduceByKey)
  • Broadcast small RDDs in joins (< 100 MB)
  • Use mapPartitions instead of map for batch processing
  • Pre-partition data if multiple joins on same key

🎯 Caching

  • Cache RDDs used multiple times
  • Choose appropriate storage level
  • Unpersist when done
  • Monitor cache usage in Spark UI

🎯 Data Format

  • Use efficient formats (Parquet > JSON > Text)
  • Enable compression (Snappy for speed, GZIP for size)
  • Schema evolution support (Parquet, Avro)

🎯 Code Patterns

  • Filter early in pipeline
  • Avoid collect() on large RDDs
  • Use mapPartitions for connection pooling (see the sketch after this list)
  • Prefer built-in operations over UDFs
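To illustrate the mapPartitions item above: per-record setup (formatters, JSON parsers, database connections) should happen once per partition, not once per element. A minimal runnable sketch with a date formatter; the same shape applies to pooling connections:

val parsedTimes = sc.parallelize(Seq("2024-01-15 10:00:00", "2024-01-15 11:30:00"))
  .mapPartitions { partition =>
    // Build the costly, non-thread-safe formatter once per partition
    val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    partition.map(s => fmt.parse(s).getTime)   // Epoch millis per record
  }
parsedTimes.collect().foreach(println)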

Part 9: Common Pitfalls and Solutions

Pitfall 1: OutOfMemoryError on Driver

// WRONG: Collecting huge RDD
val allData = sc.textFile("1TB_file.txt").collect()  // CRASH!

// RIGHT: Process in distributed manner
sc.textFile("1TB_file.txt")
  .map(processLine)
  .saveAsTextFile("output")  // Results stay distributed

Pitfall 2: Data Skew

// WRONG: One key has 90% of data
val skewed = data.groupByKey()  // One task gets 90% of work

// RIGHT: Add salt to spread load
import scala.util.Random

val salted = data.map { case (key, value) =>
  val salt = Random.nextInt(10)
  ((key, salt), value)
}.groupByKey()
// Now each hot key is spread across up to 10 tasks; merge the per-salt groups
// in a second stage to get final per-key results (see the sketch below)
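When the end goal is an aggregate (count, sum) rather than the full grouped values, the salt can be folded back out in a cheap second stage. A sketch, assuming data is a pair RDD with numeric values as in the snippet above:

import scala.util.Random

val skewSafeSums = data
  .map { case (key, value) => ((key, Random.nextInt(10)), value) }
  .reduceByKey(_ + _)                                   // Stage 1: per-(key, salt) partials, hot keys split 10 ways
  .map { case ((key, _), partial) => (key, partial) }   // Drop the salt
  .reduceByKey(_ + _)                                   // Stage 2: combine at most 10 partials per key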

Pitfall 3: Unnecessary Shuffles

// WRONG: Multiple shuffles
data.groupByKey().mapValues(_.sum)  // Shuffle all data

// RIGHT: Reduce locally first
data.reduceByKey(_ + _)  // Shuffle only aggregates

Pitfall 4: Not Caching Iterative Algorithms

// WRONG: Recompute every iteration
var data = loadData()
for (i <- 1 to 100) {
  data = data.map(expensiveTransform)  // Recomputes from source!
}

// RIGHT: Cache intermediate results
var data = loadData().cache()
for (i <- 1 to 100) {
  data = data.map(expensiveTransform).cache()
}

Part 10: Interview Preparation

Conceptual Questions

Q: Explain narrow vs wide transformations. A:
  • Narrow: Each input partition contributes to at most one output partition. No shuffle. Examples: map, filter, union.
  • Wide: Each input partition may contribute to multiple output partitions. Requires shuffle. Examples: groupByKey, join, reduceByKey.
Q: Why is reduceByKey faster than groupByKey? A: reduceByKey performs local aggregation (combiner) before shuffle, reducing network data. groupByKey shuffles all values, then aggregates.
// groupByKey: Shuffle (k1, v1), (k1, v2), (k1, v3) -> 3 values over network
// reduceByKey: Local combine -> Shuffle (k1, v1+v2) -> 1 value over network
Q: What is lineage in RDD? A: Lineage is the graph of transformations used to build an RDD. If a partition is lost, Spark uses lineage to recompute only that partition, not the entire RDD.

Coding Questions

Q: Find top 10 most frequent words in a text file.
val words = sc.textFile("book.txt")
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)

words.foreach(println)
Q: Remove duplicate lines from a file.
sc.textFile("input.txt")
  .distinct()
  .saveAsTextFile("output-unique")
Q: Calculate average salary by department.
case class Employee(name: String, dept: String, salary: Double)
val employees: RDD[Employee] = loadEmployees()

val avgByDept = employees
  .map(e => (e.dept, e.salary))
  .aggregateByKey((0.0, 0))(
    seqOp = { case ((sum, count), salary) => (sum + salary, count + 1) },
    combOp = { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
  )
  .mapValues { case (sum, count) => sum / count }

avgByDept.collect().foreach(println)

Summary and Next Steps

What You’ve Mastered

✅ RDD fundamentals (immutability, partitioning, lineage)
✅ Transformations (map, filter, flatMap, join, aggregateByKey)
✅ Actions (collect, count, reduce, save)
✅ Partitioning strategies (hash, range, custom)
✅ Caching and persistence
✅ Real-world ETL and analytics patterns
✅ Performance optimization techniques
✅ Common pitfalls and solutions

Key Takeaways

  1. Filter Early: Reduce data volume as soon as possible
  2. Avoid Shuffles: Use reduceByKey over groupByKey, broadcast small RDDs
  3. Cache Wisely: Cache RDDs used multiple times, unpersist when done
  4. Partition Smart: 2-4 partitions per core, coalesce after filtering
  5. Think Lazy: Spark doesn’t execute until an action

Next Module Preview

In Module 3: Spark SQL & DataFrames, you’ll learn:
  • Why DataFrames are often 5-10x faster than RDDs for structured workloads
  • Catalyst optimizer internals
  • Tungsten execution engine
  • When to use DataFrames vs RDDs
  • Advanced SQL patterns


Additional Resources

Practice Datasets

Further Reading

  • Research Paper: “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing” (Zaharia et al., NSDI 2012)
  • Book: “Learning Spark” (2nd Edition) - Chapters 3-4
  • Documentation: Spark RDD Programming Guide

Hands-On Labs

Try these exercises to reinforce learning:
  1. Word Count Variations: Implement word count with case-insensitivity, stop word removal, and bigrams
  2. Log Analysis: Parse Apache access logs, find top IPs, most requested URLs, and error rates
  3. Join Practice: Join datasets (e.g., orders + customers), try broadcast joins
  4. PageRank: Implement full PageRank algorithm with link parsing and iteration
  5. Custom Partitioner: Build a domain-specific partitioner for your use case
Estimated Practice Time: 6-8 hours of hands-on coding to master RDD concepts