MapReduce is the distributed data processing framework that sits at the heart of Hadoop. Inspired by Google’s MapReduce paper, it provides a simple yet powerful programming model that abstracts away the complexities of parallel programming, fault tolerance, and data distribution.
Chapter Goals:
Understand the MapReduce programming model
Master map and reduce phases in detail
Learn the complete execution flow from job submission to completion
Explore shuffle and sort mechanisms
Study fault tolerance and speculative execution
Compare Hadoop MapReduce with Google’s original implementation
MAP FUNCTION SIGNATURE
──────────────────────
map(K1 key, V1 value) → list<K2, V2>

Input:
• K1: Input key type (e.g., line number, file offset)
• V1: Input value type (e.g., line of text)

Output:
• Emit zero or more (K2, V2) pairs
• K2: Intermediate key (determines grouping)
• V2: Intermediate value

EXAMPLE: Word Count Map
───────────────────────
Input: (offset, "hello world hello")
Process:
1. Split into words: ["hello", "world", "hello"]
2. Emit for each word:
   emit("hello", 1)
   emit("world", 1)
   emit("hello", 1)
Output: [("hello", 1), ("world", 1), ("hello", 1)]

KEY INSIGHTS:
─────────────
• Maps are stateless and independent
• Each map processes ONE input record
• Maps run in parallel across the cluster
• No communication between mappers
• Can emit 0, 1, or many outputs per input
Map in Java (Hadoop API):
public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\s+");
        for (String w : words) {
            word.set(w);
            context.write(word, one);   // emit(word, 1)
        }
    }
}
Reduce: Aggregate Values per Key
REDUCE FUNCTION SIGNATURE
─────────────────────────
reduce(K2 key, Iterator<V2> values) → list<K3, V3>

Input:
• K2: Intermediate key (from map)
• values: ALL values for this key (grouped by framework)

Output:
• Emit zero or more (K3, V3) pairs
• Typically K3 = K2, V3 is aggregated result

EXAMPLE: Word Count Reduce
──────────────────────────
Input: ("hello", [1, 1, 1, 1, 1])
Process:
1. Sum all values: 1 + 1 + 1 + 1 + 1 = 5
2. Emit result: emit("hello", 5)
Output: ("hello", 5)

Another Input: ("world", [1, 1])
Process:
1. Sum: 1 + 1 = 2
2. Emit: emit("world", 2)
Output: ("world", 2)

KEY INSIGHTS:
─────────────
• Reducer receives ALL values for a key
• Keys are sorted before reduce
• Reducers run in parallel (different keys)
• Each reducer processes multiple keys
• Values arrive as an iterator (may not fit in memory)
Reduce in Java (Hadoop API):
public class WordCountReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Full Word Count MapReduce Job
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map Phase
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reduce Phase
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Main: Configure and submit job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
PARTITIONING ALGORITHM
──────────────────────
For each emitted (key, value) from the mapper:

    partitionId = hash(key) % numReducers

Example with 3 Reducers:
────────────────────────
Mapper emits:
("apple", 1)  → hash("apple")  % 3 = 2 → Reducer 2
("banana", 1) → hash("banana") % 3 = 0 → Reducer 0
("cherry", 1) → hash("cherry") % 3 = 1 → Reducer 1
("apple", 1)  → hash("apple")  % 3 = 2 → Reducer 2
("banana", 1) → hash("banana") % 3 = 0 → Reducer 0

KEY PROPERTY: Deterministic Key Assignment
──────────────────────────────────────────
• Same key always goes to the same reducer
• All values for a key end up together
• Enables grouping in the reduce phase

CUSTOM PARTITIONER:
───────────────────
The default partitioner is HashPartitioner.
You can provide custom logic:

public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Example: partition by first letter
        char firstLetter = key.toString().charAt(0);
        return (firstLetter - 'a') % numPartitions;
    }
}

This ensures all words starting with the same letter
go to the same reducer.

LOAD BALANCING CONCERN:
───────────────────────
• Hashing may create unbalanced partitions (skew)
• Some reducers get more data than others
• Solution: Better partitioning or use Combiners
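A custom partitioner only takes effect once it is registered on the job, and the modulo should be guarded against negative hash values (Hadoop's own HashPartitioner masks the sign bit for this reason). A minimal sketch with an illustrative class name:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Sketch of a sign-safe custom partitioner. Masking the sign bit before the
 * modulo guarantees the result lands in [0, numPartitions), even for keys
 * whose hash code is negative.
 */
public class SignSafePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Driver wiring (illustrative):
//   job.setPartitionerClass(SignSafePartitioner.class);
//   job.setNumReduceTasks(3);   // partition IDs 0, 1, 2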
In-Memory Buffer and Spill Files
MAP OUTPUT BUFFER
─────────────────
Configuration: io.sort.mb = 100 (default 100MB)

┌─────────────────────────────────────┐
│ Circular Memory Buffer (100MB)      │
├─────────────────────────────────────┤
│ (key1, val1, partition1, metadata)  │
│ (key2, val2, partition2, metadata)  │
│ (key3, val3, partition3, metadata)  │
│ ...                                 │
└─────────────────────────────────────┘
          │
          │ When 80% full (configurable)
          ↓
BACKGROUND SPILL TO DISK
────────────────────────
1. Sort by (partition, key):
   - Primary sort: partition ID
   - Secondary sort: key
2. Optionally run Combiner (mini-reduce):
   - Local aggregation to reduce data
   - E.g., for word count: ("hello", [1,1,1]) → ("hello", 3)
3. Write sorted partitions to disk:

   spill_0.out:
   ┌──────────────────────────────┐
   │ Partition 0 (sorted by key)  │
   │   ("apple", 2)               │
   │   ("banana", 1)              │
   ├──────────────────────────────┤
   │ Partition 1 (sorted by key)  │
   │   ("cherry", 3)              │
   │   ("date", 1)                │
   ├──────────────────────────────┤
   │ Partition 2 (sorted by key)  │
   │   ("elderberry", 1)          │
   │   ("fig", 2)                 │
   └──────────────────────────────┘

MULTIPLE SPILLS:
────────────────
If the map produces lots of output:
- spill_0.out
- spill_1.out
- spill_2.out
- ...
These must be merged before shuffle.

FINAL MERGE:
────────────
Merge all spill files into a single output file:
- Multi-way merge (up to io.sort.factor files at once)
- Maintains sort order
- Run combiner again if configured
- Result: Single sorted file per map task
Combiner: Local Reduce for Optimization
COMBINER OPTIMIZATION
─────────────────────
Problem:
• Map emits many duplicate keys
• Lots of network transfer during shuffle
• Reducers receive massive input

Example (Word Count without Combiner):
──────────────────────────────────────
Mapper 1 output:
("hello", 1), ("hello", 1), ("hello", 1), ("world", 1)

Mapper 2 output:
("hello", 1), ("hello", 1), ("world", 1), ("world", 1)

Shuffle transfers: 8 (key, value) pairs

Solution: COMBINER (local aggregation)
──────────────────────────────────────
Combiner runs on the map node BEFORE shuffle:

Mapper 1 output after combiner:
("hello", 3), ("world", 1)

Mapper 2 output after combiner:
("hello", 2), ("world", 2)

Shuffle transfers: 4 (key, value) pairs
→ 50% reduction!

COMBINER FUNCTION:
──────────────────
Often SAME as the reducer function:

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

// Configure job
job.setCombinerClass(WordCountCombiner.class);

WHEN TO USE COMBINER:
─────────────────────
✓ Associative and commutative operations:
  - Sum, count, max, min
  - reduce(reduce(A, B), C) = reduce(A, reduce(B, C))
✗ NOT suitable for:
  - Average (need count and sum separately)
  - Median, percentiles
  - Operations where order matters

COMBINER GUARANTEES:
────────────────────
• May run 0, 1, or multiple times
• Framework decides when to run
• Your code must work with or without the combiner
• Must produce the same result regardless
SHUFFLE PHASE (from Reducer's perspective)
──────────────────────────────────────────
Reducer Knows:
• Which mappers to fetch from (all of them)
• Which partition to fetch (based on reducer ID)

FETCH PROTOCOL:
───────────────
For each completed map task:
1. Reducer sends HTTP GET request:
   http://tasktracker1:50060/mapOutput?
     jobId=job_202401241200_0001&
     mapId=task_0001_m_000001&
     reduce=2
2. TaskTracker serves partition 2 from the map output
3. Reducer saves to local disk or memory

FETCH TIMELINE:
───────────────
Reducers start fetching AS SOON AS the first map completes.
They don't wait for all maps to finish!

Timeline:
───────────────────────────────────────────────────
Map 1 done    → Reduce starts fetching from Map 1
Map 2 done    → Reduce fetches from Map 2
...
Map N done    → Reduce fetches from Map N
All maps done → Reduce proceeds to merge/sort
───────────────────────────────────────────────────

FETCH STRATEGIES:
─────────────────
Memory Buffer:
• Small map outputs kept in memory (default 70% of heap)
• Fast access, no disk I/O

Disk Spill:
• Large outputs spilled to disk
• Merged later with on-disk merge

Configuration:
• mapreduce.reduce.shuffle.parallelcopies = 5
  (fetch from 5 mappers simultaneously)
• mapreduce.reduce.shuffle.memory.limit.percent = 0.25
  (use 25% of heap for in-memory shuffle)

FETCH FAILURES:
───────────────
If a fetch fails:
• Retry the same TaskTracker (transient error)
• After N retries, inform the JobTracker
• JobTracker may re-run the map task elsewhere
The Critical Data Transfer Phase: The shuffle is often the bottleneck in MapReduce jobs. Understanding its internals is key to performance optimization.
SHUFFLE INTERNALS: MAP-SIDE
───────────────────────────
1. Memory Buffer (io.sort.mb, default 100MB):
   - Circular buffer where (key, value) pairs are initially written
   - When the buffer reaches `io.sort.spill.percent` (default 80%), a "spill thread" kicks in
   - Spill sorts by (partition, key) using quicksort
   - If a combiner is configured, it runs here to reduce output

2. Spill to Disk:
   - Sorted data is written to a spill file: `map-00001.out`
   - Each spill file contains data for all partitions, but sorted within each partition
   - Multiple spills may occur if the map task processes large amounts of data

3. Final Merge:
   - Before the map task finishes, all spill files are merged into a single output file
   - This is a multi-way merge, respecting the `io.sort.factor` (default 10) limit on simultaneous merges
   - The final file is named `map-00001.out` and stored locally on the DataNode

4. Index File:
   - Alongside the output file, an index file `map-00001.out.index` is created
   - This index maps each partition ID to its location and length in the output file
   - Crucial for the reduce-side fetch process

SHUFFLE INTERNALS: REDUCE-SIDE
──────────────────────────────
1. Fetch Initiation:
   - The reduce task starts fetching map outputs as soon as any map task completes
   - It connects to the HTTP server running on each TaskTracker that ran a map task
   - Fetches are performed in parallel, up to `mapreduce.reduce.shuffle.parallelcopies` (default 5)

2. In-Memory Buffering:
   - Fetched data is kept in a memory buffer (size controlled by `mapreduce.reduce.shuffle.input.buffer.percent`)
   - If data arrives faster than it can be processed, it spills to disk

3. On-Disk Merge:
   - Multiple fetched files are merged on the reducer's local disk
   - This uses the same multi-way merge algorithm as the map-side final merge
   - Result is a single, sorted file per reduce task

4. Final Sort and Group:
   - All data is sorted by key (secondary sort during merge ensures this)
   - Values for each key are grouped together
   - The user's reduce function is called for each key-group

PERFORMANCE BOTTLENECKS:
────────────────────────
- Network Bandwidth: Shuffle is often network-bound
- Disk I/O: Writing/reading spill files can be slow
- Memory Pressure: Both map and reduce sides need adequate heap
- Skewed Keys: A few keys with disproportionately large value lists can slow the entire reduce task
Merging All Map Outputs
MERGE PHASE
───────────
Reducer has fetched outputs from ALL mappers:

In Memory:
• 20 small files (each sorted)

On Disk:
• 50 larger files (each sorted)

Goal: Merge into a single sorted stream

MULTI-LEVEL MERGE:
──────────────────
Configuration: io.sort.factor = 10
(merge up to 10 files at once)

Round 1: Merge on-disk files
┌──────────────────────────────┐
│ Merge files  1-10 → merged1  │
│ Merge files 11-20 → merged2  │
│ Merge files 21-30 → merged3  │
│ Merge files 31-40 → merged4  │
│ Merge files 41-50 → merged5  │
└──────────────────────────────┘

Round 2:
┌──────────────────────────────┐
│ Merge merged1-merged5        │
│ + in-memory files            │
│ → Final sorted stream        │
└──────────────────────────────┘

MERGE ALGORITHM (K-way merge):
──────────────────────────────
Use a min-heap to efficiently merge K sorted files:

Input Files:
File 1: [1, 5, 9, 13]
File 2: [2, 6, 10, 14]
File 3: [3, 7, 11, 15]
File 4: [4, 8, 12, 16]

Min-Heap (track next element from each file):
┌───────────────────────────┐
│ (1, file1)  ← root (min)  │
│ (2, file2)                │
│ (3, file3)                │
│ (4, file4)                │
└───────────────────────────┘

Algorithm:
1. Pop min from heap (1, file1)
2. Output: 1
3. Read next from file1 (5), insert into heap
4. Repeat...

Result: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

Time Complexity: O(N log K)
where N = total elements, K = number of files

FINAL SORTED OUTPUT:
────────────────────
(key, [values]) grouped by key:
("apple", [1, 2, 1, 3, 1])
("banana", [1, 1, 4, 1])
("cherry", [2, 1, 1])
...
Ready for the reduce() function!
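The K-way merge described above can be illustrated with a small standalone sketch that uses Java's PriorityQueue as the min-heap. This is a toy illustration of the algorithm over in-memory lists, not Hadoop's actual merge implementation:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy K-way merge over pre-sorted runs, illustrating the min-heap algorithm. */
public class KWayMergeSketch {

    private static final class Head {
        final int value;
        final Iterator<Integer> rest;
        Head(int value, Iterator<Integer> rest) { this.value = value; this.rest = rest; }
    }

    static List<Integer> merge(List<List<Integer>> runs) {
        // Smallest head element of any run is always at the root of the heap.
        PriorityQueue<Head> heap = new PriorityQueue<>(Comparator.comparingInt((Head h) -> h.value));
        for (List<Integer> run : runs) {                  // seed heap with the first element of each run
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();                         // pop the global minimum
            out.add(h.value);
            if (h.rest.hasNext()) heap.add(new Head(h.rest.next(), h.rest));
        }
        return out;                                       // O(N log K) overall
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 5, 9, 13), Arrays.asList(2, 6, 10, 14),
                Arrays.asList(3, 7, 11, 15), Arrays.asList(4, 8, 12, 16));
        System.out.println(merge(runs));                  // [1, 2, 3, ..., 16]
    }
}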
MAP TASK MEMORY:
────────────────
mapreduce.map.memory.mb = 1024
(1GB physical memory for the map container)

mapreduce.map.java.opts = -Xmx800m
(800MB JVM heap for the map task)

Why less than 1024?
→ Leave room for off-heap memory and OS overhead

REDUCE TASK MEMORY:
───────────────────
mapreduce.reduce.memory.mb = 2048
(2GB physical memory for the reduce container)

mapreduce.reduce.java.opts = -Xmx1600m
(1.6GB JVM heap for the reduce task)

Reducers often need more memory (shuffle, sort)

SORT BUFFER:
────────────
mapreduce.task.io.sort.mb = 100
(100MB for the map output buffer)

Larger buffer → fewer spills → better performance
But: Takes memory from the heap

mapreduce.task.io.sort.factor = 10
(Merge up to 10 files at once)

Higher factor → fewer merge rounds

SHUFFLE BUFFER:
───────────────
mapreduce.reduce.shuffle.input.buffer.percent = 0.70
(Use 70% of the reduce heap for shuffle)

mapreduce.reduce.shuffle.merge.percent = 0.66
(Spill to disk when 66% full)

RECOMMENDATIONS:
────────────────
Small Jobs (< 100GB):
• map.memory.mb: 1024
• reduce.memory.mb: 2048

Medium Jobs (100GB - 1TB):
• map.memory.mb: 1536
• reduce.memory.mb: 3072

Large Jobs (> 1TB):
• map.memory.mb: 2048
• reduce.memory.mb: 4096
• sort.mb: 200
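These properties can also be set per job in the driver rather than cluster-wide. A minimal sketch using the "medium job" suggestions above (the numbers are illustrative, not universal defaults):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

/** Sketch: per-job memory tuning using the properties discussed above. */
public class TunedJobDriver {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.map.memory.mb", 1536);        // map container size
        conf.set("mapreduce.map.java.opts", "-Xmx1200m");    // map heap, leave off-heap headroom
        conf.setInt("mapreduce.reduce.memory.mb", 3072);     // reduce container size
        conf.set("mapreduce.reduce.java.opts", "-Xmx2400m"); // reduce heap
        conf.setInt("mapreduce.task.io.sort.mb", 200);       // larger sort buffer → fewer spills
        conf.setInt("mapreduce.task.io.sort.factor", 20);    // wider merges → fewer merge rounds
        return Job.getInstance(conf, "tuned job");
    }
}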
Reducing I/O with Compression
WHY COMPRESS:
─────────────
✓ Less disk I/O (spills, shuffle)
✓ Less network transfer
✓ Faster job completion
✗ CPU overhead (compression/decompression)

COMPRESSION CODECS:
───────────────────
┌──────────────┬──────────┬──────────┬────────────┐
│ Codec        │ Ratio    │ Speed    │ Splittable │
├──────────────┼──────────┼──────────┼────────────┤
│ Gzip         │ High     │ Slow     │ No         │
│ Bzip2        │ Highest  │ Slowest  │ Yes        │
│ LZO          │ Medium   │ Fast     │ Yes*       │
│ Snappy       │ Medium   │ Fastest  │ No         │
└──────────────┴──────────┴──────────┴────────────┘
* LZO requires indexing for splittability

WHERE TO COMPRESS:
──────────────────
1. Input Files:
   - Reduces map input I/O
   - Must be splittable or small files
   - Configure: InputFormat handles this automatically

2. Map Output (Intermediate):
   - Reduces shuffle traffic (BIG win!)
   - CPU overhead acceptable
   Configuration:
     mapreduce.map.output.compress=true
     mapreduce.map.output.compress.codec=
       org.apache.hadoop.io.compress.SnappyCodec

3. Final Output:
   - Saves HDFS space
   - Slows things down if consumed by non-Hadoop tools
   Configuration:
     mapreduce.output.fileoutputformat.compress=true
     mapreduce.output.fileoutputformat.compress.codec=
       org.apache.hadoop.io.compress.GzipCodec

RECOMMENDATION:
───────────────
ALWAYS compress intermediate data (map output):
→ 30-50% reduction in shuffle time
→ Minimal CPU overhead with Snappy/LZO

Example:
────────
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
              SnappyCodec.class, CompressionCodec.class);

COMPRESSION IMPACT:
───────────────────
Uncompressed shuffle: 100 GB
Snappy compressed:     40 GB
Network time (1 Gbps): 100s → 40s (60% faster!)
Local Aggregation
COMBINER EFFECTIVENESS:
───────────────────────
Word Count Example:
───────────────────
WITHOUT Combiner:
Map output: 10 million (word, 1) pairs
Shuffle: 10 million pairs transferred
Reduce input: 10 million pairs

WITH Combiner:
Map output: 10 million (word, 1) pairs
After combiner: 100,000 (word, count) pairs
Shuffle: 100,000 pairs transferred (99% reduction!)
Reduce input: 100,000 pairs

WHEN COMBINER HELPS:
────────────────────
High Reduction Factor:
• Many duplicate keys
• Simple aggregation (sum, count, max)
• Word count, click counting, log aggregation

Low Reduction Factor:
• Few duplicate keys
• Unique IDs, timestamps
• Combiner overhead not worth it

COMBINER USAGE PATTERNS:
────────────────────────
1. Same as Reducer (common):
   job.setCombinerClass(MyReducer.class);

2. Different from Reducer:
   job.setCombinerClass(MyCombiner.class);
   job.setReducerClass(MyReducer.class);

3. In-Mapper Combining (advanced):
   // Maintain HashMap in mapper
   // Emit aggregated results in cleanup()

CONFIGURATION:
──────────────
mapreduce.job.combine.class = MyReducer

COMBINER FREQUENCY:
───────────────────
Combiner runs:
• During spills (map side)
• During merge (map side)
• Possibly during shuffle merge (reduce side)
→ May run 0, 1, or multiple times!
→ Must be idempotent and associative
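The in-mapper combining pattern listed above (item 3) can be sketched as follows for word count. This is a sketch under the assumption that each mapper's distinct-word set fits in the task heap; the class name is illustrative:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Sketch of in-mapper combining: aggregate counts in a HashMap and emit one
 * record per distinct word in cleanup(), instead of one record per occurrence.
 */
public class InMapperCombiningMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    public void map(LongWritable key, Text value, Context context) {
        for (String w : value.toString().split("\\s+")) {
            if (!w.isEmpty()) {
                counts.merge(w, 1, Integer::sum);   // aggregate locally, nothing emitted yet
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Text word = new Text();
        IntWritable count = new IntWritable();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            word.set(e.getKey());
            count.set(e.getValue());
            context.write(word, count);             // one record per distinct word
        }
    }
}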
Balancing Reducer Load
PARTITIONING STRATEGIES:
────────────────────────
1. Hash Partitioning (Default):
   partition = hash(key) % numReducers
   Pros: Simple, uniform for random keys
   Cons: Poor for skewed keys

2. Range Partitioning:
   Partition based on key range
   Example:
   • Reducer 0: A-F
   • Reducer 1: G-M
   • Reducer 2: N-S
   • Reducer 3: T-Z
   Pros: Output is globally sorted
   Cons: Requires sampling, skew issues

3. Custom Partitioning:

   public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
       @Override
       public int getPartition(Text key, IntWritable value, int numPartitions) {
           // Partition by first letter
           char first = key.toString().toLowerCase().charAt(0);
           if (first >= 'a' && first <= 'm') {
               return 0 % numPartitions;
           } else {
               return 1 % numPartitions;
           }
       }
   }

SKEW PROBLEM:
─────────────
Unbalanced partitions:
Reducer 0:  1 GB (finishes fast)
Reducer 1: 10 GB (straggles)
Reducer 2:  2 GB
Reducer 3:  1 GB

Job completion time = slowest reducer

SKEW SOLUTIONS:
───────────────
1. Better Partitioner:
   • Sample data first
   • Learn the key distribution
   • Assign ranges to balance load

2. Increase Number of Reducers:
   • More parallelism
   • Smaller data per reducer
   • job.setNumReduceTasks(20);

3. Salting:
   • Add a random suffix to skewed keys
   • key → key_0, key_1, key_2, ...
   • Process in two MapReduce rounds

4. Combiners:
   • Reduce data before shuffle
   • Less data to partition
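The salting idea can be sketched as a mapper that spreads a known hot key across several buckets; a second job (not shown) would strip the suffix and merge the partial counts. The hot key and fan-out here are hypothetical:

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Sketch of key salting for skew: append a random suffix to known hot keys so
 * their values spread over several reducers. Partial results must be
 * re-aggregated in a second pass.
 */
public class SaltingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int SALT_BUCKETS = 8;       // illustrative fan-out
    private static final String HOT_KEY = "the";     // hypothetical skewed key
    private final Random random = new Random();
    private final IntWritable one = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : value.toString().split("\\s+")) {
            if (w.isEmpty()) continue;
            // Hot keys get a _0.._7 suffix; everything else is emitted unchanged.
            outKey.set(w.equals(HOT_KEY) ? w + "_" + random.nextInt(SALT_BUCKETS) : w);
            context.write(outKey, one);
        }
    }
}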
PATTERN: Filter
GOAL: Keep only records matching criteria
Use Case: Extract error logs from all logs

MAP FUNCTION:
─────────────
map(offset, line):
    if line.contains("ERROR"):
        emit(line, null)
    // Don't emit non-error lines

REDUCE FUNCTION:
────────────────
NONE (map-only job)
job.setNumReduceTasks(0);

EXAMPLE:
────────
Input:
2024-01-24 10:00:00 INFO Started service
2024-01-24 10:00:05 ERROR Connection failed
2024-01-24 10:00:10 INFO Retrying...
2024-01-24 10:00:15 ERROR Timeout

Output:
2024-01-24 10:00:05 ERROR Connection failed
2024-01-24 10:00:15 ERROR Timeout

OPTIMIZATION:
─────────────
• No reducer needed (map-only)
• Each mapper writes directly to HDFS
• Very efficient for simple filtering
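A minimal Hadoop Java sketch of this map-only filter, mirroring the pseudocode above (the class name is illustrative):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Sketch of the map-only filter pattern: emit only lines containing "ERROR". */
public class ErrorFilterMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().contains("ERROR")) {
            context.write(value, NullWritable.get());   // keep the matching line, drop the rest
        }
    }
}

// Driver note: job.setNumReduceTasks(0) makes this a map-only job, so mappers
// write their output straight to HDFS with no shuffle.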
Top N
Find Top N Items
PATTERN: Top N
GOAL: Find N largest/smallest items
Use Case: Top 10 most frequent words

APPROACH 1: In-Mapper Top N
───────────────────────────
map(offset, line):
    words = line.split()
    for word in words:
        localCounts[word] += 1

// In cleanup():
topN = localCounts.topN(10)
for (word, count) in topN:
    emit(word, count)

reduce(word, counts):
    totalCount = sum(counts)
    emit(word, totalCount)

Final: Sort all reducer output, take top 10

APPROACH 2: Two-Stage
─────────────────────
Job 1: Word Count
  → Output: All (word, count) pairs
Job 2: Sort and Top N
  → Mapper: emit(count, word)   // flip key-value
  → Reducer: emit top N

APPROACH 3: Single Reducer
──────────────────────────
map(offset, line):
    // Normal word count map
    for word in line.split():
        emit(word, 1)

reduce(word, counts):
    emit(sum(counts), word)
    // Emit (count, word) instead of (word, count)

Configuration:
• Set numReducers = 1
• Total sort in reducer
• Emit top N

TRADE-OFFS:
───────────
Approach 1: Less data shuffled, but complex
Approach 2: Simpler, but two jobs
Approach 3: Simple, but single reducer bottleneck
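One way to make the "local top N" idea concrete is a mapper over already-counted word<TAB>count records (for example, the output of a word count job) that keeps only its own top 10 in a bounded min-heap and emits it in cleanup(); a single reducer (not shown) then picks the global top 10 from at most 10 × #mappers candidates. This is a sketch under those assumptions, not the only way to implement the pattern:

import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Sketch: per-mapper Top-N over "word<TAB>count" records, emitted in cleanup(). */
public class LocalTopNMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private static final int N = 10;

    private static final class Entry {
        final long count; final String word;
        Entry(long count, String word) { this.count = count; this.word = word; }
    }

    // Min-heap ordered by count: the smallest of the current top N sits at the head.
    private final PriorityQueue<Entry> topN =
            new PriorityQueue<>((a, b) -> Long.compare(a.count, b.count));

    @Override
    public void map(LongWritable key, Text value, Context context) {
        String[] parts = value.toString().split("\t");   // expects "word<TAB>count"
        if (parts.length != 2) return;
        long count;
        try { count = Long.parseLong(parts[1]); } catch (NumberFormatException e) { return; }
        topN.add(new Entry(count, parts[0]));
        if (topN.size() > N) topN.poll();                // evict the smallest candidate
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Entry e : topN) {
            context.write(new IntWritable((int) e.count), new Text(e.word));
        }
    }
}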
Join
Joining Two Datasets
PATTERN: Join
GOAL: Combine records from two datasets
Use Case: Join users with their orders

Dataset 1 (users.txt):
user123,Alice
user456,Bob

Dataset 2 (orders.txt):
order001,user123,100
order002,user456,200
order003,user123,50

REDUCE-SIDE JOIN:
─────────────────
map(filename, line):
    if filename == "users.txt":
        userId, name = line.split(',')
        emit(userId, ("USER", name))
    else:  // orders.txt
        orderId, userId, amount = line.split(',')
        emit(userId, ("ORDER", orderId, amount))

reduce(userId, values):
    userName = null
    orders = []
    for value in values:
        if value[0] == "USER":
            userName = value[1]
        else:  // ORDER
            orders.append(value)
    for order in orders:
        emit(userId, userName, order)

Output:
user123,Alice,order001,100
user123,Alice,order003,50
user456,Bob,order002,200

MAP-SIDE JOIN (if one dataset is small):
────────────────────────────────────────
Setup:
• Load the small dataset (users) into memory
• Use the Distributed Cache

map(offset, line):
    // Parse order
    orderId, userId, amount = line.split(',')
    // Look up user in memory
    userName = usersMap.get(userId)
    emit(userId, userName + "," + orderId + "," + amount)

No Reducer Needed!

TRADE-OFFS:
───────────
Reduce-Side Join:
✓ Works for any dataset size
✗ Expensive shuffle
✗ All data must go through reduce

Map-Side Join:
✓ Very efficient (no shuffle)
✓ Happens in the map phase
✗ Requires a small dataset
✗ Dataset must fit in memory
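A hedged Java sketch of the reduce-side join reducer above. It assumes the mappers (not shown) tag each value as "USER|<name>" or "ORDER|<id>,<amount>"; the tag format and class name are assumptions of this sketch, not part of the Hadoop API:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** Sketch: reduce-side join of users and orders keyed by userId, using tagged values. */
public class UserOrderJoinReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text userId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String userName = null;
        List<String> orders = new ArrayList<>();   // buffer orders until the user record is seen

        for (Text value : values) {
            String v = value.toString();
            if (v.startsWith("USER|")) {
                userName = v.substring("USER|".length());
            } else if (v.startsWith("ORDER|")) {
                orders.add(v.substring("ORDER|".length()));
            }
        }

        if (userName == null) return;              // inner join: skip orders with no matching user
        for (String order : orders) {
            // e.g. user123 <TAB> Alice,order001,100
            context.write(userId, new Text(userName + "," + order));
        }
    }
}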
Secondary Sort
Control Value Order in Reduce
PATTERN: Secondary Sort
GOAL: Control the order of values arriving at the reducer
Use Case: Process user events in chronological order

PROBLEM:
────────
Default: Reducer receives values in ARBITRARY order

reduce(userId, events):
    // events might arrive as: [event3, event1, event2]
    // Want chronological: [event1, event2, event3]

SOLUTION: Composite Key
───────────────────────
1. Create a composite key: (userId, timestamp)

2. Custom Partitioner:
   // Partition by userId only
   partition = hash(userId) % numReducers

3. Custom Comparator:
   // Sort by userId, then timestamp
   compare(key1, key2):
       if key1.userId != key2.userId:
           return compare(key1.userId, key2.userId)
       else:
           return compare(key1.timestamp, key2.timestamp)

4. Grouping Comparator:
   // Group by userId only
   compare(key1, key2):
       return compare(key1.userId, key2.userId)

IMPLEMENTATION:
───────────────
public class CompositeKey implements WritableComparable {
    String userId;
    Long timestamp;

    @Override
    public int compareTo(CompositeKey other) {
        int cmp = userId.compareTo(other.userId);
        if (cmp != 0) return cmp;
        return timestamp.compareTo(other.timestamp);
    }
}

job.setPartitionerClass(UserPartitioner.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setGroupingComparatorClass(UserGroupingComparator.class);

RESULT:
───────
Reducer receives:
userId: user123
values: [event1@t1, event2@t2, event3@t3]   // sorted!
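The grouping comparator is the piece most often left out; a sketch of one that groups composite keys by userId only is shown below. It assumes the CompositeKey class above (with its Writable serialization methods, omitted there) and direct access to its userId field:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sketch of the grouping comparator for secondary sort: two composite keys are
 * treated as "equal" for grouping when their userId matches, so a single
 * reduce() call receives all of a user's events, already ordered by timestamp
 * thanks to the sort comparator.
 */
public class UserGroupingComparator extends WritableComparator {

    protected UserGroupingComparator() {
        super(CompositeKey.class, true);   // create key instances for comparison
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey left = (CompositeKey) a;
        CompositeKey right = (CompositeKey) b;
        return left.userId.compareTo(right.userId);   // ignore the timestamp for grouping
    }
}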
Distributed Cache
Sharing Read-Only Data
PATTERN: Distributed Cache
GOAL: Share small files across all mappers/reducers
Use Case: Load a lookup table for a map-side join

PROBLEM:
────────
Need a small dataset available to all tasks:
• IP → Country mapping
• User ID → Name mapping
• Product catalog

Don't want to:
• Embed it in the JAR (inflexible)
• Read it from HDFS for each record (slow)

SOLUTION: Distributed Cache
───────────────────────────
1. Upload the file to HDFS:
   hdfs dfs -put users.txt /shared/users.txt

2. Add it to the job configuration:
   job.addCacheFile(new URI("/shared/users.txt"));

3. In mapper setup, load it into memory:

   public class MyMapper extends Mapper {
       Map<String, String> usersMap = new HashMap<>();

       @Override
       protected void setup(Context context) {
           // Read cached file
           Path[] cacheFiles = context.getLocalCacheFiles();
           BufferedReader reader = new BufferedReader(
               new FileReader(cacheFiles[0].toString()));
           String line;
           while ((line = reader.readLine()) != null) {
               String[] parts = line.split(",");
               usersMap.put(parts[0], parts[1]);
           }
           reader.close();
       }

       @Override
       public void map(K key, V value, Context context) {
           // Use usersMap for lookups
           String userId = ...;
           String userName = usersMap.get(userId);
           ...
       }
   }

DISTRIBUTED CACHE PROPERTIES:
─────────────────────────────
✓ Files copied to each node once
✓ Cached on local disk
✓ Read-only (no modifications)
✓ Efficient for small files (< 100MB)
✗ Not for large datasets
✗ All tasks share the same copy (no per-task data)

USE CASES:
──────────
• Lookup tables
• Configuration files
• Small dimension tables (star schema)
• Stop word lists
• Machine learning models
Google's Choice (C++):
──────────────────────
• Full control over performance
• Tight integration with internal systems
• Systems programming expertise available
• Don't need cross-platform compatibility

Hadoop's Choice (Java):
───────────────────────
• Write once, run anywhere (JVM)
• Larger developer community
• Easier to attract contributors
• Faster development cycle
• Good-enough performance

Result:
───────
Hadoop's Java choice enabled:
✓ Rapid adoption across diverse environments
✓ Rich ecosystem (Hive, Pig, HBase in Java)
✓ Lower barrier to entry
✗ Some performance overhead (GC pauses)
Community vs Control
Google's Approach:
──────────────────
• Publish papers, keep code internal
• Full control over evolution
• Optimize for Google's workloads
• No backward compatibility burden

Hadoop's Approach:
──────────────────
• Open source from day one
• Community-driven development
• Serve diverse use cases
• Maintain backward compatibility

Impact:
───────
• Hadoop became the industry standard
• Google's ideas reached millions
• Ecosystem innovations (Spark, Hive)
• Multiple commercial distributions
Public vs Private Evolution
Google's Evolution (Opaque):
────────────────────────────
MapReduce (2004)
      ↓
Flume (streaming, internal)
      ↓
MillWheel (streaming)
      ↓
Cloud Dataflow (public product)

Likely deprecated internally, replaced by newer systems.

Hadoop's Evolution (Transparent):
─────────────────────────────────
MapReduce (2006)
      ↓
YARN (2012) - Generic resource management
      ↓
Multiple frameworks: Spark, Flink, Tez
      ↓
Modern: Spark dominates, MapReduce legacy

Every step documented and community-driven.

Lessons:
────────
• Open evolution allows external innovation
• The community found MapReduce's limitations
• Led to better alternatives (Spark)
• Google's internal evolution stayed hidden
Simple Model, Powerful Abstraction: Map and reduce are simple functions, but the framework handles all the complexity (distribution, fault tolerance, optimization)
Shuffle is the Bottleneck: Most optimization focuses on reducing shuffle data (combiners, compression, partitioning)
Data Locality is Critical: Moving computation to data (not vice versa) is fundamental to MapReduce efficiency
Fault Tolerance via Stateless Tasks: Re-execution works because tasks are stateless and deterministic
Speculative Execution Handles Stragglers: Don’t wait for slow tasks—run duplicates and take first result
Map-Only Jobs for Simple Cases: Not everything needs reduce—filtering, transformation can be map-only
Combiners are Free Performance: If your reduce is associative and commutative, always use a combiner
Java Enabled the Ecosystem: Performance trade-off was worth it for portability and community growth
Best Practice: Keep speculative execution enabled for most production workloads, but ensure tasks are idempotent.
System Design: Design a MapReduce job to find the top 10 most frequent words
Expected Answer:
Approach 1: Two-Job Pipeline
Job 1: Word Count
Map:    (offset, line) → emit(word, 1) for each word
Reduce: (word, [1,1,1,...]) → emit(word, count)
Output: (word, count) pairs in HDFS
Job 2: Top 10
Map: (word, count) → emit(count, word)   // flip key-value
Reduce:
  - Single reducer (numReduceTasks=1)
  - Sort all (count, word) pairs descending
  - Emit top 10
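Because the shuffle sorts keys in ascending order by default, Job 2 needs a descending sort comparator on the count key. A sketch of that piece (the class name is illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sketch: sort comparator that reverses the natural IntWritable order so the
 * single reducer sees the largest counts first and can stop after 10 keys.
 */
public class DescendingIntComparator extends WritableComparator {

    protected DescendingIntComparator() {
        super(IntWritable.class, true);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b);   // flip ascending to descending
    }
}

// Driver for Job 2 (illustrative):
//   job.setSortComparatorClass(DescendingIntComparator.class);
//   job.setNumReduceTasks(1);   // single reducer sees a globally sorted stream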
Approach 2: Single Job with In-Mapper Aggregation
Map:
  setup():
      Initialize HashMap<String, Integer> localCounts
      Initialize PriorityQueue<Pair> topN (size 10)

  map(offset, line):
      for word in line.split():
          localCounts[word] += 1

  cleanup():
      for (word, count) in localCounts:
          if topN.size() < 10 or count > topN.peek():
              topN.add((count, word))
              if topN.size() > 10:
                  topN.poll()
      for (count, word) in topN:
          emit(count, word)

Reduce:
  - Single reducer
  - Maintain global topN heap
  - Emit final top 10
Trade-offs:
Approach 1: Simpler, reusable word count, but two jobs
Approach 2: More efficient, less shuffle data, but more complex
Optimizations:
Use combiner in Approach 1 to reduce shuffle
Consider emitting only the top 100 per mapper, then selecting the top 10 in the reducer (reduces network traffic)
If only an approximate top 10 is needed, use sampling
Deep Dive: How would you optimize a MapReduce job that's running slowly?
Expected Answer:
1. Identify the Bottleneck
Check job counters and logs:
Map time vs reduce time vs shuffle time
Data skew (some reducers much slower)
Spill counts (too many disk writes)
GC time (memory pressure)
2. Map Phase Optimization
Input Splits: Ensure 1 split = 1 block for locality
Combiner: Add combiner to reduce map output
Compression: Compress map output (Snappy)
Memory: Increase sort buffer (io.sort.mb)
Avoid Small Files: Combine small files before processing
3. Shuffle Optimization
Compression: Always compress intermediate data
Combiner: Reduces shuffle volume dramatically
Fetch Parallelism: Increase parallel copies
Memory: Increase shuffle buffer percentage
4. Reduce Phase Optimization
Number of Reducers:
Too few: Reducers become bottleneck
Too many: Overhead, small output files
Rule of thumb: 0.95 or 1.75 × (nodes × max containers per node)
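Worked example (hypothetical numbers): a 20-node cluster with 8 reduce containers per node has 160 reduce slots, so 0.95 × 160 ≈ 152 reducers fills one wave with headroom for failed or slow tasks, while 1.75 × 160 = 280 runs two waves, letting faster nodes start a second round while stragglers finish.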
Coming up in the next chapter:
Generic resource management for multiple frameworks
ApplicationMaster pattern
How YARN enables Spark, Flink, and other frameworks
We’ve mastered MapReduce, the original Hadoop processing model. Next, we’ll see how YARN generalized resource management to support any distributed application, not just MapReduce.