Chapter 3: MapReduce Framework

MapReduce is the distributed data processing framework that sits at the heart of Hadoop. Inspired by Google’s MapReduce paper, it provides a simple yet powerful programming model that abstracts away the complexities of parallel programming, fault tolerance, and data distribution.
Chapter Goals:
  • Understand the MapReduce programming model
  • Master map and reduce phases in detail
  • Learn the complete execution flow from job submission to completion
  • Explore shuffle and sort mechanisms
  • Study fault tolerance and speculative execution
  • Compare Hadoop MapReduce with Google’s original implementation

The MapReduce Programming Model

Core Concept

MapReduce is inspired by functional programming concepts, specifically the map and reduce operations:
+---------------------------------------------------------------+
|              MAPREDUCE PROGRAMMING MODEL                      |
+---------------------------------------------------------------+
|                                                               |
|  Input Data → MAP Function → Intermediate Data                |
|               (per record)   (key-value pairs)                |
|                                     ↓                         |
|                              SHUFFLE & SORT                   |
|                              (group by key)                   |
|                                     ↓                         |
|              Grouped Data → REDUCE Function → Output          |
|                             (per key)                         |
|                                                               |
|  Simple Model, Powerful Results:                              |
|  ─────────────────────────────                                |
|  • Programmers write TWO functions (map + reduce)             |
|  • Framework handles parallelization automatically            |
|  • No manual thread management needed                         |
|  • Fault tolerance built in                                   |
|  • Data locality optimized by framework                       |
|                                                               |
+---------------------------------------------------------------+

Map and Reduce Functions

Map: Transform Input Records
MAP FUNCTION SIGNATURE
─────────────────────

map(K1 key, V1 value) → list<K2, V2>

Input:
• K1: Input key type (e.g., line number, file offset)
• V1: Input value type (e.g., line of text)

Output:
• Emit zero or more (K2, V2) pairs
• K2: Intermediate key (determines grouping)
• V2: Intermediate value


EXAMPLE: Word Count Map
───────────────────────

Input: (offset, "hello world hello")

Process:
1. Split into words: ["hello", "world", "hello"]
2. Emit for each word:
   emit("hello", 1)
   emit("world", 1)
   emit("hello", 1)

Output: [("hello", 1), ("world", 1), ("hello", 1)]


KEY INSIGHTS:
────────────
• Maps are stateless and independent
• Each map processes ONE input record
• Maps run in parallel across cluster
• No communication between mappers
• Can emit 0, 1, or many outputs per input
Map in Java (Hadoop API):
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String[] words = line.split("\\s+");

    for (String w : words) {
      if (w.isEmpty()) {
        continue;  // split("\\s+") can yield an empty first token on leading whitespace
      }
      word.set(w);
      context.write(word, one);  // emit(word, 1)
    }
  }
}
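
The reduce counterpart has the signature reduce(K2, list<V2>) → list<K3, V3>: it receives one intermediate key together with all of its values and emits the final output. For completeness, here is a minimal word-count reducer sketch in the same Hadoop API (the class name WordCountReducer is simply chosen to mirror the mapper above):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();            // add up the 1s emitted for this word
    }
    result.set(sum);
    context.write(key, result);  // emit(word, totalCount)
  }
}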

MapReduce Execution Architecture

Components

+---------------------------------------------------------------+
|              MAPREDUCE EXECUTION COMPONENTS                   |
+---------------------------------------------------------------+
|                                                               |
|                      ┌──────────────────┐                     |
|                      │   Client         │                     |
|                      │ (Submits Job)    │                     |
|                      └────────┬─────────┘                     |
|                               │                               |
|                               │ 1. Submit Job                 |
|                               ↓                               |
|                      ┌──────────────────┐                     |
|                      │   JobTracker     │                     |
|                      │   (Master)       │                     |
|                      ├──────────────────┤                     |
|                      │ • Job Scheduling │                     |
|                      │ • Task Allocation│                     |
|                      │ • Progress Track │                     |
|                      │ • Failure Handle │                     |
|                      └────────┬─────────┘                     |
|                               │                               |
|              ┌────────────────┼────────────────┐              |
|              │                │                │              |
|              ↓                ↓                ↓              |
|    ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      |
|    │TaskTracker 1 │  │TaskTracker 2 │  │TaskTracker 3 │      |
|    ├──────────────┤  ├──────────────┤  ├──────────────┤      |
|    │ Map Tasks    │  │ Map Tasks    │  │ Map Tasks    │      |
|    │ Reduce Tasks │  │ Reduce Tasks │  │ Reduce Tasks │      |
|    └──────────────┘  └──────────────┘  └──────────────┘      |
|                                                               |
|  JobTracker (Hadoop 1.x):                                     |
|  • One per cluster                                            |
|  • Single point of failure (before HA)                        |
|  • Schedules all tasks                                        |
|  • Monitors task progress                                     |
|                                                               |
|  TaskTracker:                                                 |
|  • One per worker node                                        |
|  • Reports to JobTracker via heartbeat (every 3 sec)          |
|  • Executes map and reduce tasks                              |
|  • Has fixed number of "slots" for tasks                      |
|                                                               |
+---------------------------------------------------------------+

NOTE: Hadoop 2.x replaced JobTracker/TaskTracker with YARN
      (covered in Chapter 4)

Job Submission Flow

Client Submits Job to JobTracker
CLIENT ACTIONS:
──────────────

1. Configure Job
   - Set mapper class
   - Set reducer class
   - Set input/output formats
   - Specify input/output paths
   - Set number of reducers

2. Submit Job JAR
   job.submit() OR job.waitForCompletion(true)

3. JobTracker Assigns Job ID
   job_202401241200_0001


WHAT GETS UPLOADED:
──────────────────

→ Job JAR file (contains mapper/reducer code)
→ Job configuration (XML)
→ Input splits metadata (which files/blocks to process)


JOBTRACKER RECEIVES:
───────────────────

{
  "jobId": "job_202401241200_0001",
  "user": "hadoop",
  "jarPath": "hdfs://namenode/tmp/hadoop-jars/job.jar",
  "inputPath": "hdfs://namenode/user/input/",
  "outputPath": "hdfs://namenode/user/output/",
  "mapperClass": "com.example.WordCountMapper",
  "reducerClass": "com.example.WordCountReducer",
  "numReducers": 5,
  "inputFormat": "TextInputFormat"
}
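
Putting the client-side steps above into code, here is a minimal driver sketch using the standard Hadoop 2.x API (the paths and the WordCountDriver class name are illustrative assumptions; the mapper and reducer are the word-count classes shown earlier):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");

    job.setJarByClass(WordCountDriver.class);   // locates the job JAR to ship
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(5);                   // number of reducers

    FileInputFormat.addInputPath(job, new Path("/user/input/"));
    FileOutputFormat.setOutputPath(job, new Path("/user/output/"));

    // Blocks until the job finishes; 'true' prints progress to the console.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}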
JobTracker Computes Input Splits
INPUT SPLITS COMPUTATION
───────────────────────

Goal: Divide input into chunks for parallel processing

Input: /user/input/largefile.txt (1GB file)
       Stored in HDFS with 128MB blocks


HDFS Blocks:
┌────────────────────────────────────────┐
│ Block 1 (128MB) → DN1, DN3, DN5        │
│ Block 2 (128MB) → DN2, DN4, DN6        │
│ Block 3 (128MB) → DN1, DN2, DN7        │
│ Block 4 (128MB) → DN3, DN4, DN8        │
│ Block 5 (128MB) → DN5, DN6, DN9        │
│ Block 6 (128MB) → DN7, DN8, DN10       │
│ Block 7 (128MB) → DN1, DN4, DN9        │
│ Block 8 (96MB)  → DN2, DN5, DN10       │
└────────────────────────────────────────┘


Input Splits (typically 1 split = 1 block):
┌────────────────────────────────────────┐
│ Split 1: Block 1, size=128MB           │
│   Locations: [DN1, DN3, DN5]           │
│   Preferred: DN1                       │
├────────────────────────────────────────┤
│ Split 2: Block 2, size=128MB           │
│   Locations: [DN2, DN4, DN6]           │
│   Preferred: DN2                       │
├────────────────────────────────────────┤
│ ... (splits 3-7) ...                   │
├────────────────────────────────────────┤
│ Split 8: Block 8, size=96MB            │
│   Locations: [DN2, DN5, DN10]          │
│   Preferred: DN2                       │
└────────────────────────────────────────┘

Result: 8 map tasks (one per split)


KEY INSIGHT: Data Locality
──────────────────────────
• Each split knows which DataNodes have the data
• JobTracker tries to schedule map task on node with data
• If local node busy, use rack-local node
• Last resort: off-rack node (expensive network transfer)
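
The number of splits follows directly from the split size. A simplified sketch of how FileInputFormat sizes splits (property names are the Hadoop 2.x ones; the real code also handles compressed and unsplittable files):

// minSize comes from mapreduce.input.fileinputformat.split.minsize (default 1),
// maxSize from mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE).
long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

With the defaults, the split size equals the block size, which is why the 1GB file above produces 8 splits and therefore 8 map tasks.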
JobTracker Schedules Tasks
TASK SCHEDULING ALGORITHM
────────────────────────

For Each Input Split:
1. Create a Map Task
2. Find TaskTracker with:
   a) Data locality (same node as block)
   b) Available slot
   c) Healthy status

Example:
────────

Split 1 (Block 1 on [DN1, DN3, DN5]):
  Best: Schedule on TaskTracker@DN1 (node-local)
  OK:   Schedule on TaskTracker@DN3 (node-local)
  Fair: Schedule on TaskTracker@DN2 (rack-local)
  Poor: Schedule on TaskTracker@DN20 (off-rack)


SCHEDULER TYPES (Pluggable):
───────────────────────────

1. FIFO Scheduler (Default):
   - First job gets all resources
   - Simple but unfair

2. Fair Scheduler:
   - Equal share for all jobs
   - Better for multi-tenant clusters

3. Capacity Scheduler:
   - Multiple queues with capacity guarantees
   - Used in production (Yahoo, etc.)


SLOT ALLOCATION:
───────────────

Each TaskTracker has fixed slots:

TaskTracker@DN1:
┌─────────────────────────────┐
│ Map Slots:    [4 total]     │
│   Slot 1: RUNNING (map_001) │
│   Slot 2: RUNNING (map_005) │
│   Slot 3: AVAILABLE         │
│   Slot 4: AVAILABLE         │
├─────────────────────────────┤
│ Reduce Slots: [2 total]     │
│   Slot 1: RUNNING (red_002) │
│   Slot 2: AVAILABLE         │
└─────────────────────────────┘

Problem: Rigid slots (can't share between map/reduce)
Solution: YARN (Hadoop 2.x) uses flexible containers
TaskTracker Executes Tasks
MAP TASK EXECUTION
─────────────────

1. TaskTracker Receives Assignment:
   {
     "taskId": "task_202401241200_0001_m_000001",
     "jobId": "job_202401241200_0001",
     "splitLocation": "hdfs://namenode/user/input/file.txt",
     "splitOffset": 0,
     "splitLength": 134217728,
     "jarPath": "hdfs://namenode/tmp/job.jar"
   }

2. Prepare Execution Environment:
   - Download job JAR from HDFS
   - Create local working directory
   - Set up classpath

3. Launch Map Task (separate JVM):
   java -Xmx1024m \
        -classpath job.jar:hadoop-libs \
        org.apache.hadoop.mapred.MapTask \
        task_202401241200_0001_m_000001

4. Map Task Reads Input Split:
   - Open HDFS file at offset
   - Read splitLength bytes
   - Parse using InputFormat (e.g., TextInputFormat)
   - For each record, call user's map() function

5. Write Intermediate Output:
   - Buffer in memory (default 100MB)
   - Spill to disk when buffer full
   - Partition by reducer (hash(key) % numReducers)
   - Sort within each partition
   - Optionally combine (local reduce; a combiner example follows at the end of this section)


REDUCE TASK EXECUTION
────────────────────

1. Shuffle Phase (copy map outputs):
   - Connect to all mappers
   - Download intermediate data for this reducer's partition
   - Merge sorted files on disk

2. Sort Phase:
   - Final merge sort of all data
   - Group values by key

3. Reduce Phase:
   - For each key, call user's reduce() function
   - Write output to HDFS


PROGRESS REPORTING:
──────────────────

Every 3 seconds, TaskTracker → JobTracker:
{
  "taskId": "task_202401241200_0001_m_000001",
  "progress": 0.45,  // 45% complete
  "status": "RUNNING",
  "counters": {
    "MAP_INPUT_RECORDS": 1000000,
    "MAP_OUTPUT_RECORDS": 5000000,
    "MAP_OUTPUT_BYTES": 125000000
  }
}
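
Step 5 of the map task above mentions an optional combiner: a local, map-side reduce that shrinks intermediate output before it is shuffled. When the reduce function is associative and commutative, as in word count, the reducer class itself can usually be reused as the combiner. A minimal driver fragment, assuming the WordCountMapper and WordCountReducer classes shown earlier:

job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);  // runs on each mapper's local output
job.setReducerClass(WordCountReducer.class);

For word count this collapses many ("hello", 1) pairs into a single ("hello", n) per mapper, often cutting shuffle volume dramatically.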

The Shuffle and Sort Phase

The shuffle is the most complex and critical part of MapReduce—it’s where intermediate data is transferred from mappers to reducers and sorted.

Shuffle Architecture

+---------------------------------------------------------------+
|                  SHUFFLE AND SORT MECHANISM                   |
+---------------------------------------------------------------+
|                                                               |
|   MAP TASK 1          MAP TASK 2          MAP TASK 3          |
|   ┌──────────┐       ┌──────────┐       ┌──────────┐         |
|   │  map()   │       │  map()   │       │  map()   │         |
|   └────┬─────┘       └────┬─────┘       └────┬─────┘         |
|        │                  │                  │                |
|        │ emit(k,v)        │                  │                |
|        ↓                  ↓                  ↓                |
|   ┌──────────┐       ┌──────────┐       ┌──────────┐         |
|   │ In-Memory│       │ In-Memory│       │ In-Memory│         |
|   │  Buffer  │       │  Buffer  │       │  Buffer  │         |
|   │ (100MB)  │       │ (100MB)  │       │ (100MB)  │         |
|   └────┬─────┘       └────┬─────┘       └────┬─────┘         |
|        │ spill            │                  │                |
|        ↓                  ↓                  ↓                |
|   ┌──────────┐       ┌──────────┐       ┌──────────┐         |
|   │Partition │       │Partition │       │Partition │         |
|   │  & Sort  │       │  & Sort  │       │  & Sort  │         |
|   └────┬─────┘       └────┬─────┘       └────┬─────┘         |
|        │                  │                  │                |
|        ↓                  ↓                  ↓                |
|   Spill Files        Spill Files        Spill Files          |
|   ┌──┬──┬──┐         ┌──┬──┬──┐         ┌──┬──┬──┐           |
|   │R0│R1│R2│         │R0│R1│R2│         │R0│R1│R2│           |
|   └──┴──┴──┘         └──┴──┴──┘         └──┴──┴──┘           |
|        │                  │                  │                |
|        └──────────────────┼──────────────────┘                |
|                           │                                   |
|                    HTTP SHUFFLE                               |
|                           │                                   |
|        ┌──────────────────┼──────────────────┐                |
|        │                  │                  │                |
|        ↓                  ↓                  ↓                |
|   REDUCE TASK 0      REDUCE TASK 1      REDUCE TASK 2        |
|   ┌──────────┐       ┌──────────┐       ┌──────────┐         |
|   │  Fetch   │       │  Fetch   │       │  Fetch   │         |
|   │ (R0 from │       │ (R1 from │       │ (R2 from │         |
|   │all maps) │       │all maps) │       │all maps) │         |
|   └────┬─────┘       └────┬─────┘       └────┬─────┘         |
|        │                  │                  │                |
|        ↓                  ↓                  ↓                |
|   ┌──────────┐       ┌──────────┐       ┌──────────┐         |
|   │  Merge   │       │  Merge   │       │  Merge   │         |
|   │  & Sort  │       │  & Sort  │       │  & Sort  │         |
|   └────┬─────┘       └────┬─────┘       └────┬─────┘         |
|        │                  │                  │                |
|        ↓                  ↓                  ↓                |
|   ┌──────────┐       ┌──────────┐       ┌──────────┐         |
|   │ reduce() │       │ reduce() │       │ reduce() │         |
|   └────┬─────┘       └────┬─────┘       └────┬─────┘         |
|        │                  │                  │                |
|        ↓                  ↓                  ↓                |
|     Output           Output           Output                 |
|                                                               |
+---------------------------------------------------------------+

Map-Side Processing

Determining Which Reducer Gets Each Key
PARTITIONING ALGORITHM
─────────────────────

For each emitted (key, value) from mapper:

partitionId = hash(key) % numReducers


Example with 3 Reducers:
───────────────────────

Mapper emits:
("apple", 1)   → hash("apple") % 3   = 2 → Reducer 2
("banana", 1)  → hash("banana") % 3  = 0 → Reducer 0
("cherry", 1)  → hash("cherry") % 3  = 1 → Reducer 1
("apple", 1)   → hash("apple") % 3   = 2 → Reducer 2
("banana", 1)  → hash("banana") % 3  = 0 → Reducer 0


KEY PROPERTY: Consistent Hashing
────────────────────────────────
• Same key always goes to same reducer
• All values for a key end up together
• Enables grouping in reduce phase
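
In Hadoop, the hash above is the key's Java hashCode(), and the default HashPartitioner masks the sign bit so the partition number is never negative. Its getPartition() is essentially:

public int getPartition(K key, V value, int numReduceTasks) {
  // mask the sign bit, then take the remainder by the number of reducers
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}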


CUSTOM PARTITIONER:
──────────────────

Default partitioner is HashPartitioner.
You can provide custom logic:

public class CustomPartitioner
    extends Partitioner<Text, IntWritable> {

  @Override
  public int getPartition(Text key, IntWritable value,
                         int numPartitions) {
    // Example: partition by first letter (lower-cased, with Math.abs so the
    // result can never be a negative, invalid partition number)
    char firstLetter = Character.toLowerCase(key.toString().charAt(0));
    return Math.abs(firstLetter - 'a') % numPartitions;
  }
}

This ensures that all words starting with the same letter
go to the same reducer.


LOAD BALANCING CONCERN:
──────────────────────
• Hash may create unbalanced partitions (skew)
• Some reducers get more data than others
• Solution: Better partitioning or use Combiners

Reduce-Side Processing

Fetching Map Outputs
SHUFFLE PHASE (from Reducer's perspective)
─────────────────────────────────────────

Reducer Knows:
• Which mappers to fetch from (all of them)
• Which partition to fetch (based on reducer ID)


FETCH PROTOCOL:
──────────────

For each completed map task:

1. Reducer sends HTTP GET request:
   http://tasktracker1:50060/mapOutput?
     jobId=job_202401241200_0001&
     mapId=task_0001_m_000001&
     reduce=2

2. TaskTracker serves partition 2 from map output

3. Reducer saves to local disk or memory


FETCH TIMELINE:
──────────────

Reducers start fetching AS SOON AS first map completes.
Don't wait for all maps to finish!

Timeline:
───────────────────────────────────────────────────
Map 1 done    → Reduce starts fetching from Map 1
Map 2 done    → Reduce fetches from Map 2
...
Map N done    → Reduce fetches from Map N
All maps done → Reduce proceeds to merge/sort
───────────────────────────────────────────────────


FETCH STRATEGIES:
────────────────

Memory Buffer:
• Small map outputs kept in memory (default 70% of heap)
• Fast access, no disk I/O

Disk Spill:
• Large outputs spilled to disk
• Merged later with on-disk merge

Configuration:
• mapreduce.reduce.shuffle.parallelcopies = 5
  (fetch from 5 mappers simultaneously)
• mapreduce.reduce.shuffle.memory.limit.percent = 0.25
  (a single map output may use at most 25% of the in-memory shuffle buffer)


FETCH FAILURES:
──────────────

If fetch fails:
• Retry same TaskTracker (transient error)
• After N retries, inform JobTracker
• JobTracker may re-run map task elsewhere

Fault Tolerance in MapReduce

Handling Failures

Task Failure

Map or Reduce Task Crashes:
Detection:
  • Task not reporting progress
  • Task sends failure notification
  • TaskTracker crashes
Recovery:
  • Reschedule task on different node
  • Re-execute from beginning (stateless)
  • No cascading failures

Node Failure

TaskTracker Fails:
Detection:
  • No heartbeat for 10 minutes
  • JobTracker marks node dead
Recovery:
  • All tasks on node marked failed
  • Reschedule on healthy nodes
  • Even completed map tasks re-run (outputs lost)

Straggler Tasks

Slow Tasks (Stragglers):
Problem:
  • One slow task delays entire job
  • Common due to hardware issues, data skew
Solution: Speculative Execution
  • Launch duplicate task on different node
  • First to complete wins
  • Other task killed

JobTracker Failure

Master Fails (Hadoop 1.x):
Problem:
  • Single point of failure
  • Job must restart from beginning
Solution (Hadoop 2.x):
  • YARN ResourceManager HA
  • Job state checkpointed
  • Can resume after failover

Speculative Execution

+---------------------------------------------------------------+
|                  SPECULATIVE EXECUTION                        |
+---------------------------------------------------------------+
|                                                               |
|  Problem: Stragglers (Slow Tasks)                             |
|  ──────────────────────────────                               |
|                                                               |
|  Job has 100 map tasks.                                       |
|  99 tasks complete in 2 minutes.                              |
|  1 task still running after 10 minutes (bad disk, CPU).       |
|  → Entire job delayed!                                        |
|                                                               |
|                                                               |
|  Solution: Speculative Execution                              |
|  ───────────────────────────────                              |
|                                                               |
|  JobTracker monitors progress of all tasks.                   |
|                                                               |
|  Identifies stragglers:                                       |
|  • Tasks slower than average                                  |
|  • Tasks not making progress                                  |
|                                                               |
|  For each straggler:                                          |
|  1. Launch duplicate task on different node                   |
|  2. Two tasks race to completion                              |
|  3. First to finish wins                                      |
|  4. Kill the other task                                       |
|                                                               |
|                                                               |
|  Example Timeline:                                            |
|  ────────────────                                             |
|                                                               |
|  Normal Task (Task A):                                        |
|  [========================================] 100% (2 min)      |
|                                                               |
|  Straggler (Task B):                                          |
|  [==========>........................] 25% (2 min)            |
|           ^                                                   |
|           │                                                   |
|       At 2 min mark, JobTracker launches speculative copy:    |
|                                                               |
|  Speculative (Task B'):                                       |
|           [========================================] (2 min)   |
|                                                               |
|  Result: Job finishes at 4 min instead of 10 min              |
|                                                               |
|                                                               |
|  Configuration:                                               |
|  ──────────────                                               |
|                                                               |
|  mapreduce.map.speculative=true     (default: true)           |
|  mapreduce.reduce.speculative=true  (default: true)           |
|                                                               |
|                                                               |
|  When to Disable:                                             |
|  ────────────────                                             |
|                                                               |
|  • Tasks have side effects (write to external DB)             |
|  • Cluster at full capacity (no spare resources)              |
|  • Non-idempotent operations                                  |
|                                                               |
+---------------------------------------------------------------+
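
Speculation can also be turned off for a single job rather than cluster-wide by setting the same properties on the job's configuration. A minimal driver fragment (the job name is illustrative):

Job job = Job.getInstance(new Configuration(), "load-to-external-db");
// Tasks write to an external system, so duplicate execution would be unsafe.
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);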

MapReduce Optimizations

Configuration Tuning

JVM Heap and Buffer Sizes
MAP TASK MEMORY:
───────────────

mapreduce.map.memory.mb = 1024
(1GB physical memory for map container)

mapreduce.map.java.opts = -Xmx800m
(800MB JVM heap for map task)

Why less than 1024?
→ Leave room for off-heap memory, OS overhead


REDUCE TASK MEMORY:
──────────────────

mapreduce.reduce.memory.mb = 2048
(2GB physical memory for reduce container)

mapreduce.reduce.java.opts = -Xmx1600m
(1.6GB JVM heap for reduce task)

Reducers often need more memory (shuffle, sort)


SORT BUFFER:
───────────

mapreduce.task.io.sort.mb = 100
(100MB for map output buffer)

Larger buffer → fewer spills → better performance
But: Takes memory from heap

mapreduce.task.io.sort.factor = 10
(Merge up to 10 files at once)

Higher factor → fewer merge rounds


SHUFFLE BUFFER:
──────────────

mapreduce.reduce.shuffle.input.buffer.percent = 0.70
(Use 70% of reduce heap for shuffle)

mapreduce.reduce.shuffle.merge.percent = 0.66
(Spill to disk when 66% full)


RECOMMENDATIONS:
───────────────

Small Jobs (< 100GB):
• map.memory.mb: 1024
• reduce.memory.mb: 2048

Medium Jobs (100GB - 1TB):
• map.memory.mb: 1536
• reduce.memory.mb: 3072

Large Jobs (> 1TB):
• map.memory.mb: 2048
• reduce.memory.mb: 4096
• sort.mb: 200
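
These properties can be set cluster-wide in mapred-site.xml or overridden per job in the driver. A sketch for a large job following the recommendations above (the job name is illustrative, and the heap sizes follow the roughly 80%-of-container rule of thumb shown earlier):

Configuration conf = new Configuration();
conf.setInt("mapreduce.map.memory.mb", 2048);
conf.set("mapreduce.map.java.opts", "-Xmx1600m");   // leave headroom below the container limit
conf.setInt("mapreduce.reduce.memory.mb", 4096);
conf.set("mapreduce.reduce.java.opts", "-Xmx3200m");
conf.setInt("mapreduce.task.io.sort.mb", 200);      // larger sort buffer, fewer spills
Job job = Job.getInstance(conf, "large-etl-job");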

Advanced MapReduce Patterns

Common Design Patterns

Select Subset of Records
PATTERN: Filter
GOAL: Keep only records matching criteria

Use Case: Extract error logs from all logs


MAP FUNCTION:
────────────

map(offset, line):
  if line.contains("ERROR"):
    emit(line, null)
  // Don't emit non-error lines


REDUCE FUNCTION:
───────────────

NONE (map-only job)
job.setNumReduceTasks(0);


EXAMPLE:
───────

Input:
2024-01-24 10:00:00 INFO Started service
2024-01-24 10:00:05 ERROR Connection failed
2024-01-24 10:00:10 INFO Retrying...
2024-01-24 10:00:15 ERROR Timeout

Output:
2024-01-24 10:00:05 ERROR Connection failed
2024-01-24 10:00:15 ERROR Timeout


OPTIMIZATION:
────────────

• No reducer needed (map-only)
• Each mapper writes directly to HDFS
• Very efficient for simple filtering
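
A minimal Java version of this map-only filter (ErrorFilterMapper is an illustrative name); with zero reducers configured, each mapper writes its output straight to HDFS:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ErrorFilterMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (value.toString().contains("ERROR")) {
      context.write(value, NullWritable.get());  // keep the line; no value needed
    }
    // Non-error lines are simply not emitted.
  }
}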
Find Top N Items
PATTERN: Top N
GOAL: Find N largest/smallest items

Use Case: Top 10 most frequent words


APPROACH 1: In-Mapper Top N
───────────────────────────

map(offset, line):
  words = line.split()
  for word in words:
    localCounts[word] += 1

  // In cleanup():
  topN = localCounts.topN(10)
  for (word, count) in topN:
    emit(word, count)

reduce(word, counts):
  totalCount = sum(counts)
  emit(word, totalCount)

Final: Sort all reducer output, take top 10


APPROACH 2: Two-Stage
─────────────────────

Job 1: Word Count
  → Output: All (word, count) pairs

Job 2: Sort and Top N
  → Mapper: emit(count, word)  // flip key-value
  → Reducer: emit top N


APPROACH 3: Single Reducer
──────────────────────────

map(offset, line):
  // Normal word count map
  for word in line.split():
    emit(word, 1)

reduce(word, counts):
  emit(sum(counts), word)
  // Emit (count, word) instead of (word, count)

Configuration:
• Set numReducers = 1
• Total sort in reducer
• Emit top N


TRADE-OFFS:
──────────

Approach 1: Less data shuffled, but complex
Approach 2: Simpler, but two jobs
Approach 3: Simple, but single reducer bottleneck
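
A rough Java sketch of Approach 1 (LocalTopNMapper is an illustrative name): the HashMap accumulates counts across map() calls, and cleanup() emits only each mapper's local top 10, so far less data reaches the shuffle:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LocalTopNMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final Map<String, Integer> localCounts = new HashMap<>();

  @Override
  public void map(LongWritable key, Text value, Context context) {
    for (String w : value.toString().split("\\s+")) {
      if (!w.isEmpty()) {
        localCounts.merge(w, 1, Integer::sum);  // in-mapper aggregation
      }
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    List<Map.Entry<String, Integer>> entries =
        new ArrayList<>(localCounts.entrySet());
    entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
    for (Map.Entry<String, Integer> e
        : entries.subList(0, Math.min(10, entries.size()))) {
      context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
    }
  }
}

A reducer (or a second pass) is still needed to merge the per-mapper candidates into the global top 10, as the trade-off list above notes.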
Joining Two Datasets
PATTERN: Join
GOAL: Combine records from two datasets

Use Case: Join users with their orders


Dataset 1 (users.txt):
user123,Alice
user456,Bob

Dataset 2 (orders.txt):
order001,user123,100
order002,user456,200
order003,user123,50


REDUCE-SIDE JOIN:
────────────────

map(filename, line):
  if filename == "users.txt":
    userId, name = line.split(',')
    emit(userId, ("USER", name))
  else:  // orders.txt
    orderId, userId, amount = line.split(',')
    emit(userId, ("ORDER", orderId, amount))


reduce(userId, values):
  userName = null
  orders = []

  for value in values:
    if value[0] == "USER":
      userName = value[1]
    else:  // ORDER
      orders.append(value)

  for order in orders:
    emit(userId, userName, order)


Output:
user123,Alice,order001,100
user123,Alice,order003,50
user456,Bob,order002,200


MAP-SIDE JOIN (if one dataset is small):
───────────────────────────────────────

Setup:
• Load small dataset (users) into memory
• Use Distributed Cache

map(offset, line):
  // Parse order
  orderId, userId, amount = line.split(',')

  // Lookup user in memory
  userName = usersMap.get(userId)

  emit(userId, userName + "," + orderId + "," + amount)

No Reducer Needed!


TRADE-OFFS:
──────────

Reduce-Side Join:
✓ Works for any dataset size
✗ Expensive shuffle
✗ All data must go through reduce

Map-Side Join:
✓ Very efficient (no shuffle)
✓ Happens in map phase
✗ Requires small dataset
✗ Dataset must fit in memory
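
A driver sketch for the reduce-side join above using MultipleInputs, which lets each input path use its own mapper to tag records by source (UserMapper, OrderMapper, JoinReducer, and the paths are illustrative assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "user-order-join");
    job.setJarByClass(JoinDriver.class);

    // Each dataset gets its own mapper; both emit records keyed by userId.
    MultipleInputs.addInputPath(job, new Path("/data/users"),
        TextInputFormat.class, UserMapper.class);
    MultipleInputs.addInputPath(job, new Path("/data/orders"),
        TextInputFormat.class, OrderMapper.class);

    job.setReducerClass(JoinReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(job, new Path("/data/joined"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}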
Control Value Order in Reduce
PATTERN: Secondary Sort
GOAL: Control order of values arriving at reducer

Use Case: Process user events in chronological order


PROBLEM:
───────

Default: Reducer receives values in ARBITRARY order

reduce(userId, events):
  // events might arrive as: [event3, event1, event2]
  // Want chronological: [event1, event2, event3]


SOLUTION: Composite Key
──────────────────────

1. Create composite key: (userId, timestamp)

2. Custom Partitioner:
   // Partition by userId only
   partition = hash(userId) % numReducers

3. Custom Comparator:
   // Sort by userId, then timestamp
   compare (key1, key2):
     if key1.userId != key2.userId:
       return compare(key1.userId, key2.userId)
     else:
       return compare(key1.timestamp, key2.timestamp)

4. Grouping Comparator:
   // Group by userId only
   compare(key1, key2):
     return compare(key1.userId, key2.userId)


IMPLEMENTATION:
──────────────

public class CompositeKey implements WritableComparable<CompositeKey> {
  String userId;
  Long timestamp;

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(userId);        // serialize the key for the shuffle
    out.writeLong(timestamp);
  }
  @Override
  public void readFields(DataInput in) throws IOException {
    userId = in.readUTF();
    timestamp = in.readLong();
  }

  @Override
  public int compareTo(CompositeKey other) {
    int cmp = userId.compareTo(other.userId);
    if (cmp != 0) return cmp;
    return timestamp.compareTo(other.timestamp);
  }
}

job.setPartitionerClass(UserPartitioner.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setGroupingComparatorClass(UserGroupingComparator.class);


RESULT:
──────

Reducer receives:
userId: user123
values: [event1@t1, event2@t2, event3@t3]  // sorted!
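
A sketch of the grouping comparator referenced above, built on the CompositeKey class (UserGroupingComparator is the name used in the job setup; the sort comparator is analogous but compares timestamps as well):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class UserGroupingComparator extends WritableComparator {

  protected UserGroupingComparator() {
    super(CompositeKey.class, true);   // 'true' creates key instances for compare()
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    CompositeKey k1 = (CompositeKey) a;
    CompositeKey k2 = (CompositeKey) b;
    return k1.userId.compareTo(k2.userId);  // group by userId only; ignore timestamp
  }
}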
Sharing Read-Only Data
PATTERN: Distributed Cache
GOAL: Share small files across all mappers/reducers

Use Case: Load lookup table in map-side join


PROBLEM:
───────

Need small dataset available to all tasks:
• IP → Country mapping
• User ID → Name mapping
• Product catalog

Don't want to:
• Embed in JAR (inflexible)
• Read from HDFS for each record (slow)


SOLUTION: Distributed Cache
──────────────────────────

1. Upload file to HDFS:
   hdfs dfs -put users.txt /shared/users.txt

2. Add to job configuration:
   job.addCacheFile(new URI("/shared/users.txt"));

3. In mapper setup, load into memory:

   public class MyMapper
       extends Mapper<LongWritable, Text, Text, Text> {
     // Input key/value types assume TextInputFormat (byte offset, line of text).

     Map<String, String> usersMap = new HashMap<>();

     @Override
     protected void setup(Context context)
         throws IOException, InterruptedException {
       // Read cached file
       Path[] cacheFiles = context.getLocalCacheFiles();
       BufferedReader reader = new BufferedReader(
         new FileReader(cacheFiles[0].toString()));

       String line;
       while ((line = reader.readLine()) != null) {
         String[] parts = line.split(",");
         usersMap.put(parts[0], parts[1]);
       }
       reader.close();
     }

     @Override
     public void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {
       // Use usersMap for lookups
       String userId = ...;
       String userName = usersMap.get(userId);
       ...
     }
   }


DISTRIBUTED CACHE PROPERTIES:
────────────────────────────

✓ Files copied to each node once
✓ Cached on local disk
✓ Read-only (no modifications)
✓ Efficient for small files (< 100MB)
✗ Not for large datasets
✗ All tasks share same copy (no per-task data)


USE CASES:
─────────

• Lookup tables
• Configuration files
• Small dimension tables (star schema)
• Stop words lists
• Machine learning models

Comparing Hadoop MapReduce with Google’s MapReduce

Key Differences

+---------------------------------------------------------------+
|          GOOGLE MAPREDUCE vs HADOOP MAPREDUCE                 |
+---------------------------------------------------------------+
|                                                               |
|  GOOGLE (Proprietary)        HADOOP (Open Source)            |
|  ───────────────────         ──────────────────              |
|                                                               |
|  Language: C++               Language: Java                   |
|  ────────────────            ───────────────                  |
|  • High performance          • Portability                    |
|  • Memory efficient          • Easier development             |
|  • Complex to develop        • Rich ecosystem                 |
|                                                               |
|  Scheduler: Internal         Scheduler: JobTracker (1.x)      |
|  ───────────────────         YARN (2.x+)                      |
|  • Integrated with Borg      • Pluggable schedulers           |
|  • Advanced features         • Fair, FIFO, Capacity           |
|                                                               |
|  Storage: GFS                Storage: HDFS                    |
|  ────────────                ───────────                      |
|  • 64MB chunks               • 128MB+ blocks                  |
|  • Optimized for Google      • Configurable                   |
|                                                               |
|  Locality: Automatic         Locality: Best-effort            |
|  ───────────────────         ──────────────────              |
|  • Tight GFS integration     • DataNode affinity              |
|  • High locality %           • Good locality %                |
|                                                               |
|  Network: Custom             Network: TCP/HTTP                |
|  ───────────────────         ──────────────────              |
|  • Google's network stack    • Standard protocols             |
|  • Optimized protocols       • Easier debugging               |
|                                                               |
|  Evolution: Unknown          Evolution: YARN → Spark          |
|  ──────────────────          ──────────────────────          |
|  • Internal only             • Public, community-driven       |
|  • Likely replaced           • Continuous improvement         |
|                                                               |
+---------------------------------------------------------------+

SIMILARITIES:
────────────
• Same programming model (map + reduce)
• Data locality optimization
• Fault tolerance via re-execution
• Shuffle and sort mechanisms
• Speculative execution

Why Hadoop Made Different Choices

Portability Over Performance
Google's Choice (C++):
─────────────────────
• Full control over performance
• Tight integration with internal systems
• Systems programming expertise available
• Don't need cross-platform compatibility

Hadoop's Choice (Java):
──────────────────────
• Write once, run anywhere (JVM)
• Larger developer community
• Easier to attract contributors
• Faster development cycle
• Good-enough performance

Result:
──────
Hadoop's Java choice enabled:
✓ Rapid adoption across diverse environments
✓ Rich ecosystem (Hive, Pig, HBase in Java)
✓ Lower barrier to entry
✗ Some performance overhead (GC pauses)

Key Takeaways

Remember These Core Insights:
  1. Simple Model, Powerful Abstraction: Map and reduce are simple functions, but framework handles all complexity (distribution, fault tolerance, optimization)
  2. Shuffle is the Bottleneck: Most optimization focuses on reducing shuffle data (combiners, compression, partitioning)
  3. Data Locality is Critical: Moving computation to data (not vice versa) is fundamental to MapReduce efficiency
  4. Fault Tolerance via Stateless Tasks: Re-execution works because tasks are stateless and deterministic
  5. Speculative Execution Handles Stragglers: Don’t wait for slow tasks—run duplicates and take first result
  6. Map-Only Jobs for Simple Cases: Not everything needs reduce—filtering, transformation can be map-only
  7. Combiners are Free Performance: If your reduce is associative/commutative, always use combiner
  8. Java Enabled the Ecosystem: Performance trade-off was worth it for portability and community growth

Interview Questions

Question: What is MapReduce, and what does the framework handle for you?

Expected Answer:
MapReduce is a programming model for processing large datasets in parallel across a distributed cluster.
Two Main Functions:
  1. Map: Transforms each input record into key-value pairs
    • Example: For word count, map(“hello world”) → [(“hello”, 1), (“world”, 1)]
  2. Reduce: Aggregates values for each key
    • Example: reduce(“hello”, [1, 1, 1]) → (“hello”, 3)
Framework Responsibilities:
  • Automatically parallelizes map and reduce tasks
  • Distributes data across cluster
  • Handles failures by re-executing tasks
  • Optimizes data locality (schedule tasks near data)
  • Manages shuffle (moving data from mappers to reducers)
Key Benefit: Developers write simple sequential code (map and reduce functions), framework handles all distributed systems complexity.
Question: Walk through the shuffle and sort phase in detail.

Expected Answer:
The shuffle is the process of moving intermediate data from mappers to reducers.
Map Side (Pre-Shuffle):
  1. Buffer: Map outputs go to circular in-memory buffer (default 100MB)
  2. Spill: When 80% full, background thread spills to disk
    • Partition by reducer (hash(key) % numReducers)
    • Sort within each partition
    • Optionally run combiner (local aggregation)
  3. Merge: Multiple spills merged into single sorted file per map task
Network Transfer:
  1. Fetch: Reducers fetch their partitions from all mappers via HTTP
    • Starts as soon as first map completes (don’t wait for all)
    • Parallel copies from multiple mappers (default 5)
Reduce Side (Post-Shuffle):
  1. Merge: Reducer merges fetched data
    • Keep small segments in memory
    • Spill large segments to disk
    • Multi-way merge sort (k-way merge using min-heap)
  2. Group: Sorted data automatically groups values by key
  3. Reduce: Call reduce() for each unique key with all its values
Optimizations:
  • Compression reduces network transfer
  • Combiner reduces data volume
  • Pipelining (fetch while maps still running)
Question: What is speculative execution, and when would you disable it?

Expected Answer:
How Speculative Execution Works:
  1. Monitoring: JobTracker tracks progress of all tasks
  2. Straggler Detection: Identifies tasks significantly slower than average
    • Based on progress rate, not absolute time
    • Considers task has made progress recently
  3. Duplicate Launch: Launches speculative copy on different node
    • Runs in parallel with original
    • Both tasks process same input split
  4. Race to Completion: First task to complete wins
    • Outputs from winner are used
    • Loser task is killed
When to Disable:
  1. Side Effects: Tasks write to external database
    • Duplicate writes could corrupt data
    • Non-idempotent operations
  2. Resource Constraints: Cluster at full utilization
    • No spare slots for speculative tasks
    • Would delay other jobs
  3. Heterogeneous Hardware: Some nodes intentionally slower
    • Speculative execution would waste resources
    • Example: mixed SSD and HDD nodes
  4. Debugging: Want to see actual task failures
    • Speculative execution masks underlying issues
Configuration:
mapreduce.map.speculative=false
mapreduce.reduce.speculative=false
Best Practice: Keep enabled for most production workloads, but ensure tasks are idempotent.
Question: How would you find the top 10 most frequent words in a large dataset using MapReduce?

Expected Answer:
Approach 1: Two-Job Pipeline

Job 1: Word Count
Map: (offset, line) → emit(word, 1) for each word
Reduce: (word, [1,1,1,...]) → emit(word, count)
Output: (word, count) pairs in HDFS
Job 2: Top 10
Map: (word, count) → emit(count, word)  // flip key-value
Reduce:
  - Single reducer (numReduceTasks=1)
  - Sort all (count, word) pairs descending
  - Emit top 10
Approach 2: Single Job with In-Mapper Aggregation
Map:
  setup():
    Initialize HashMap<String, Integer> localCounts
    Initialize PriorityQueue<Pair> topN (size 10)

  map(offset, line):
    for word in line.split():
      localCounts[word] += 1

  cleanup():
    for (word, count) in localCounts:
      if topN.size() < 10 or count > topN.peek():
        topN.add((count, word))
        if topN.size() > 10:
          topN.poll()

    for (count, word) in topN:
      emit(count, word)

Reduce:
  - Single reducer
  - Maintain global topN heap
  - Emit final top 10
Trade-offs:
  • Approach 1: Simpler, reusable word count, but two jobs
  • Approach 2: More efficient, less shuffle data, but more complex
Optimizations:
  • Use combiner in Approach 1 to reduce shuffle
  • Consider top 100 per mapper, then top 10 in reducer (reduce network)
  • If only need approximate top 10, use sampling
Question: A MapReduce job is running slowly. How would you diagnose and optimize it?

Expected Answer:
1. Identify Bottleneck
Check job counters and logs:
  • Map time vs reduce time vs shuffle time
  • Data skew (some reducers much slower)
  • Spill counts (too many disk writes)
  • GC time (memory pressure)
2. Map Phase Optimization
  • Input Splits: Ensure 1 split = 1 block for locality
  • Combiner: Add combiner to reduce map output
  • Compression: Compress map output (Snappy)
  • Memory: Increase sort buffer (io.sort.mb)
  • Avoid Small Files: Combine small files before processing
3. Shuffle Optimization
  • Compression: Always compress intermediate data
  • Combiner: Reduces shuffle volume dramatically
  • Fetch Parallelism: Increase parallel copies
  • Memory: Increase shuffle buffer percentage
4. Reduce Phase Optimization
  • Number of Reducers:
    • Too few: Reducers become bottleneck
    • Too many: Overhead, small output files
    • Rule of thumb: 0.95 or 1.75 × (nodes × max containers per node)
  • Skew Handling:
    • Use better partitioner
    • Salt skewed keys
    • Increase reducers
  • Memory: Increase reducer heap size
5. Code Optimization
  • Avoid object creation in map/reduce
  • Reuse Writable objects
  • Use efficient data structures
  • Profile with JVM profiler
6. Cluster Configuration
  • Slots: Ensure map/reduce slots properly configured
  • Locality: Check data locality percentage
  • Speculative Execution: Enable for stragglers
  • JVM Reuse: Reuse JVMs for multiple tasks
Example Diagnosis:
Problem: Job takes 2 hours, shuffle takes 90% of time

Analysis:
- Check map output size: 500 GB
- Check reduce input size: 500 GB
- No combiner being used
- Uncompressed intermediate data

Solution:
- Add combiner: 500 GB → 50 GB (90% reduction)
- Enable compression: 50 GB → 20 GB (60% reduction)
- Result: Shuffle time drops from 108 min to ~10 min
- Total job time: 2 hours → 30 minutes

Further Reading

MapReduce Paper

“MapReduce: Simplified Data Processing on Large Clusters” (2004), Jeffrey Dean and Sanjay Ghemawat - the original Google paper

Hadoop Documentation

Official Apache Hadoop MapReduce documentation - configuration, APIs, and best practices

Designing Data-Intensive Applications

Martin Kleppmann - Chapter 10: Batch Processing with MapReduce

Hadoop: The Definitive Guide

Tom White - Chapters 6-8: comprehensive MapReduce coverage

Up Next

In Chapter 4: YARN, we’ll explore:
  • How Hadoop 2.x evolved beyond MapReduce
  • ResourceManager and NodeManager architecture
  • Generic resource management for multiple frameworks
  • ApplicationMaster pattern
  • How YARN enables Spark, Flink, and other frameworks
We’ve mastered MapReduce, the original Hadoop processing model. Next, we’ll see how YARN generalized resource management to support any distributed application, not just MapReduce.