MapReduce Programming Model
Module Duration: 5-6 hours
Hands-on Projects: 8 coding exercises
Prerequisites: Java programming, HDFS basics from Module 2
Introduction
MapReduce is the programming paradigm that made distributed computing accessible to ordinary developers. By providing a simple abstraction (a map function and a reduce function), it hides the complexity of:
- Parallelization
- Fault tolerance
- Data distribution
- Load balancing
The MapReduce Paradigm
Core Concept
All MapReduce programs follow this pattern:
INPUT → MAP → SHUFFLE & SORT → REDUCE → OUTPUT
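To make the stages concrete, here is a small worked example (hypothetical input, using word counting as the running example developed below) traced through the pipeline:
Input lines:      "hello world"   "hello hadoop"
Map output:       ("hello",1) ("world",1) ("hello",1) ("hadoop",1)
Shuffle & sort:   ("hadoop",[1])  ("hello",[1,1])  ("world",[1])
Reduce output:    ("hadoop",1)  ("hello",2)  ("world",1)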
Classic Example: WordCount
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
// Mapper: Splits lines into words, emits (word, 1)
public static class TokenizerMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// key: byte offset in file (not used here)
// value: one line of text
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
// Emits: ("hello", 1), ("world", 1), etc.
}
}
}
// Reducer: Sums up counts for each word
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
// key: a word
// values: all counts for that word [1, 1, 1, ...]
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
// Emits: ("hello", 42)
}
}
// Driver: Configures and runs the job
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
// Set jar containing this class
job.setJarByClass(WordCount.class);
// Set Mapper and Reducer classes
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
// Set output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set input/output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Run job and wait for completion
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
# Compile
mkdir -p wordcount_classes
javac -classpath `hadoop classpath` -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
# Run on Hadoop
hadoop jar wordcount.jar WordCount /input/textfiles /output/wordcount
# View results
hdfs dfs -cat /output/wordcount/part-r-00000
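Each output line is a word and its count separated by a tab, sorted by key within the file; a hypothetical excerpt:
hadoop  15
hello   42
world   27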
MapReduce Execution Flow
Job Submission and Task Execution
┌──────────────────────────────────────────────────┐
│                CLIENT SUBMITS JOB                │
└────────────────────────┬─────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────┐
│               JobTracker (Master)                │
│  • Receives job JAR and configuration            │
│  • Splits input into InputSplits                 │
│  • Assigns tasks to TaskTrackers                 │
│  • Monitors progress and handles failures        │
└─────────┬─────────────────┬─────────────────┬────┘
          │                 │                 │
          ▼                 ▼                 ▼
   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
   │ TaskTracker │   │ TaskTracker │   │ TaskTracker │
   │   Node 1    │   │   Node 2    │   │   Node 3    │
   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
          │                 │                 │
   ┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
   │ Map Task 1  │   │ Map Task 2  │   │ Map Task 3  │
   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
          │                 │                 │
          │ Intermediate    │                 │
          │ key-value pairs │                 │
          └─────────────────┼─────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │  SHUFFLE & SORT │
                   │  (Group by Key) │
                   └────────┬────────┘
                            │
          ┌─────────────────┼─────────────────┐
          │                 │                 │
          ▼                 ▼                 ▼
   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
   │ Reduce Task │   │ Reduce Task │   │ Reduce Task │
   │      1      │   │      2      │   │      3      │
   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
          │                 │                 │
          ▼                 ▼                 ▼
   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
   │   Output    │   │   Output    │   │   Output    │
   │part-r-00000 │   │part-r-00001 │   │part-r-00002 │
   └─────────────┘   └─────────────┘   └─────────────┘
Note: this diagram shows the classic MapReduce v1 daemons. In Hadoop 2 and later, YARN's ResourceManager, NodeManagers, and a per-job MapReduce ApplicationMaster take over the JobTracker/TaskTracker roles (covered in Module 4); the map → shuffle → reduce data flow stays the same.
InputSplit vs HDFS Block
HDFS File (350MB):
┌─────────────┬─────────────┬─────────────┐
│   Block 1   │   Block 2   │   Block 3   │
│    128MB    │    128MB    │    94MB     │
└─────────────┴─────────────┴─────────────┘
InputSplits (for text files):
┌─────────────┬─────────────┬─────────────┐
│   Split 1   │   Split 2   │   Split 3   │
│   ~128MB    │   ~128MB    │    ~94MB    │
│  (aligned   │             │             │
│   to line   │             │             │
│ boundaries) │             │             │
└─────────────┴─────────────┴─────────────┘
       ↓             ↓             ↓
  Map Task 1    Map Task 2    Map Task 3
- Framework tries to align splits with HDFS blocks for data locality
- Text splits may span block boundaries slightly so that lines are never broken
- Split sizes can also be tuned explicitly in the driver (see the snippet below)
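A minimal sketch of split-size tuning with the new-API FileInputFormat (the sizes here are purely illustrative values):
// Fewer, larger splits → fewer map tasks; more, smaller splits → more map tasks
FileInputFormat.setMinInputSplitSize(job, 32 * 1024 * 1024L);  // 32 MB lower bound
FileInputFormat.setMaxInputSplitSize(job, 64 * 1024 * 1024L);  // 64 MB upper bound
// Equivalent configuration properties:
//   mapreduce.input.fileinputformat.split.minsize
//   mapreduce.input.fileinputformat.split.maxsize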
Advanced MapReduce Patterns
Pattern 1: Filtering
Extract records matching given criteria.
// Example: Filter log entries with ERROR level
public class ErrorLogFilter {
public static class FilterMapper
extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
// Parse log line
if (line.contains("ERROR")) {
// Extract timestamp and message
String[] parts = line.split("\\|");
if (parts.length >= 3) {
String timestamp = parts[0].trim();
String message = parts[2].trim();
context.write(new Text(timestamp), new Text(message));
}
}
}
}
// No reducer needed - identity reducer or no reducer at all
// Set job.setNumReduceTasks(0) for map-only job
}
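A minimal driver sketch for this map-only job (job name and paths are illustrative; imports mirror the WordCount driver shown earlier). Setting the number of reduce tasks to zero skips the shuffle entirely and writes map output straight to HDFS:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "error log filter");
    job.setJarByClass(ErrorLogFilter.class);
    job.setMapperClass(FilterMapper.class);
    job.setNumReduceTasks(0);               // map-only: no shuffle, no reduce
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // With zero reducers, output files are named part-m-00000, part-m-00001, ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}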
Pattern 2: Summarization (Aggregation)
Compute summary statistics.
// Example: Calculate average, min, max temperature per day
public class TemperatureSummary {
public static class TempMapper
extends Mapper<LongWritable, Text, Text, DoubleWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: date,station,temperature
// Example: 2024-01-15,NYC001,42.5
String[] fields = value.toString().split(",");
if (fields.length == 3) {
String date = fields[0];
double temp = Double.parseDouble(fields[2]);
context.write(new Text(date), new DoubleWritable(temp));
}
}
}
public static class TempReducer
extends Reducer<Text, DoubleWritable, Text, Text> {
@Override
public void reduce(Text key, Iterable<DoubleWritable> values,
Context context)
throws IOException, InterruptedException {
double sum = 0;
double min = Double.MAX_VALUE;
double max = Double.NEGATIVE_INFINITY; // Double.MIN_VALUE is the smallest positive double, not the most negative
int count = 0;
for (DoubleWritable val : values) {
double temp = val.get();
sum += temp;
min = Math.min(min, temp);
max = Math.max(max, temp);
count++;
}
double avg = sum / count;
String summary = String.format("avg=%.2f, min=%.2f, max=%.2f",
avg, min, max);
context.write(key, new Text(summary));
}
}
}
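One driver detail worth calling out for this pattern (a sketch using the classes above): the map output value type (DoubleWritable) differs from the final output value type (Text), so the job must declare the map output types explicitly or it will fail at runtime with a "type mismatch in value from map" error:
job.setMapperClass(TempMapper.class);
job.setReducerClass(TempReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);  // map output differs from final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);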
Pattern 3: Joins (Reduce-Side Join)
Join two datasets on a common key.
// Example: Join users with orders
// Input 1: users.txt → user_id,name,city
// Input 2: orders.txt → order_id,user_id,amount
// Requires additional imports: org.apache.hadoop.mapreduce.lib.input.FileSplit,
// java.util.ArrayList, java.util.List
public class ReduceSideJoin {
public static class JoinMapper
extends Mapper<LongWritable, Text, Text, Text> {
private String filename;
@Override
protected void setup(Context context) {
// Determine which file we're reading
FileSplit split = (FileSplit) context.getInputSplit();
filename = split.getPath().getName();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
if (filename.contains("users")) {
// users.txt: user_id,name,city
String userId = fields[0];
String userData = "USER:" + fields[1] + "," + fields[2];
context.write(new Text(userId), new Text(userData));
} else if (filename.contains("orders")) {
// orders.txt: order_id,user_id,amount
String userId = fields[1];
String orderData = "ORDER:" + fields[0] + "," + fields[2];
context.write(new Text(userId), new Text(orderData));
}
}
}
public static class JoinReducer
extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// key: user_id
// values: [USER:..., ORDER:..., ORDER:..., ...]
String userData = null;
List<String> orders = new ArrayList<>();
for (Text val : values) {
String value = val.toString();
if (value.startsWith("USER:")) {
userData = value.substring(5); // Remove "USER:" prefix
} else if (value.startsWith("ORDER:")) {
orders.add(value.substring(6)); // Remove "ORDER:"
}
}
// Join: emit each order with user data
if (userData != null) {
for (String order : orders) {
String output = userData + "," + order;
context.write(key, new Text(output));
}
}
}
}
}
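A driver sketch for the join (paths are placeholders; imports mirror the WordCount driver). Both files are fed through the same JoinMapper here; MultipleInputs from org.apache.hadoop.mapreduce.lib.input could be used instead if the two inputs needed different mapper classes:
Job job = Job.getInstance(new Configuration(), "reduce-side join");
job.setJarByClass(ReduceSideJoin.class);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Adding both paths sends users.txt and orders.txt through the same mapper
FileInputFormat.addInputPath(job, new Path("/input/users.txt"));
FileInputFormat.addInputPath(job, new Path("/input/orders.txt"));
FileOutputFormat.setOutputPath(job, new Path("/output/joined"));
System.exit(job.waitForCompletion(true) ? 0 : 1);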
Pattern 4: Secondary Sort
Control the order in which values arrive at the reducer.
// Example: Group events by user, sorted by timestamp
public class SecondarySortExample {
// Composite key: (userId, timestamp)
public static class CompositeKey
implements WritableComparable<CompositeKey> {
private Text userId;
private LongWritable timestamp;
public CompositeKey() {
this.userId = new Text();
this.timestamp = new LongWritable();
}
public CompositeKey(String userId, long timestamp) {
this.userId = new Text(userId);
this.timestamp = new LongWritable(timestamp);
}
@Override
public void write(DataOutput out) throws IOException {
userId.write(out);
timestamp.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
userId.readFields(in);
timestamp.readFields(in);
}
@Override
public int compareTo(CompositeKey other) {
// First compare by userId
int cmp = this.userId.compareTo(other.userId);
if (cmp != 0) return cmp;
// Then by timestamp (ascending)
return this.timestamp.compareTo(other.timestamp);
}
// Getters and setters...
}
// Partition by userId only (not timestamp)
public static class UserPartitioner
extends Partitioner<CompositeKey, Text> {
@Override
public int getPartition(CompositeKey key, Text value,
int numPartitions) {
// Mask the sign bit instead of Math.abs(): abs(Integer.MIN_VALUE) is still negative
return (key.userId.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
// Group by userId only
public static class UserGroupComparator
extends WritableComparator {
protected UserGroupComparator() {
super(CompositeKey.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
CompositeKey k1 = (CompositeKey) a;
CompositeKey k2 = (CompositeKey) b;
return k1.userId.compareTo(k2.userId);
}
}
public static class SortMapper
extends Mapper<LongWritable, Text, CompositeKey, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: userId,timestamp,event
String[] fields = value.toString().split(",");
String userId = fields[0];
long timestamp = Long.parseLong(fields[1]);
String event = fields[2];
context.write(new CompositeKey(userId, timestamp),
new Text(event));
}
}
public static class SortReducer
extends Reducer<CompositeKey, Text, Text, Text> {
@Override
public void reduce(CompositeKey key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// Values arrive sorted by timestamp!
StringBuilder events = new StringBuilder();
for (Text event : values) {
events.append(event.toString()).append(";");
}
context.write(key.userId, new Text(events.toString()));
}
}
// In driver:
// job.setMapOutputKeyClass(CompositeKey.class);
// job.setMapOutputValueClass(Text.class);
// job.setPartitionerClass(UserPartitioner.class);
// job.setGroupingComparatorClass(UserGroupComparator.class);
}
Optimization Techniques
Combiner Functions
Reduce the volume of shuffled data by pre-aggregating on the map side.
public class WordCountWithCombiner {
// Same Mapper as before...
// Combiner: Same logic as Reducer
public static class SumCombiner
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
// In driver:
// job.setCombinerClass(SumCombiner.class);
}
Without Combiner:
  Mapper emits 1,000,000 pairs: ("the", 1), ("the", 1), ...
  Shuffle transfers all 1,000,000 pairs over the network
With Combiner:
  Mapper still emits 1,000,000 pairs locally
  Combiner pre-aggregates each map-side spill, e.g. ("the", 1000000)
  Shuffle transfers roughly one pair per key per mapper
  Reduction: up to ~99.9% less network traffic in this example
Only functions that are associative and commutative are safe to use as combiners:
- ✅ Sum, Count, Max, Min
- ❌ Average, Median (average works if you emit sum and count separately; see the sketch below)
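A minimal sketch (not part of the module's exercises, and using a simple Text-encoded "sum,count" value for brevity) of how an average can be made combiner-safe: mappers emit partial (sum, count) pairs, the combiner adds them, and only the reducer divides.
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class AverageWithCombiner {
    // Mapper: input lines assumed to look like "key,amount"
    public static class AvgMapper
            extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length == 2) {
                // Emit a partial aggregate: sum = amount, count = 1
                context.write(new Text(fields[0]), new Text(fields[1] + ",1"));
            }
        }
    }
    // Combiner: adds partial sums and counts; output types match the map output
    public static class SumCountCombiner
            extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text val : values) {
                String[] parts = val.toString().split(",");
                sum += Double.parseDouble(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new Text(sum + "," + count)); // still (sum, count)
        }
    }
    // Reducer: same accumulation, but divides only at the very end
    public static class AvgReducer
            extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text val : values) {
                String[] parts = val.toString().split(",");
                sum += Double.parseDouble(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }
    // In driver (sketch):
    // job.setCombinerClass(SumCountCombiner.class);
    // job.setMapOutputValueClass(Text.class);
    // job.setOutputValueClass(DoubleWritable.class);
}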
Custom Partitioners
Control which reducer receives which keys.
public class CustomPartitioner
extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value,
int numReduceTasks) {
// Example: Partition by first letter of word
String word = key.toString();
char firstLetter = word.charAt(0);
if (firstLetter >= 'a' && firstLetter <= 'm') {
return 0; // First half of alphabet to reducer 0
} else {
return 1 % numReduceTasks; // Second half to reducer 1
}
}
}
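Wiring this up in the driver (a sketch; the partitioning logic above only makes sense with exactly two reducers):
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(2);  // partition 0 = a-m words, partition 1 = everything else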
Distributed Cache
Share read-only data across all tasks.
public class DistributedCacheExample {
public static class LookupMapper
extends Mapper<LongWritable, Text, Text, Text> {
private Map<String, String> lookupTable = new HashMap<>();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
// Load the cached file; it appears in the task's working directory
// via the "#lookup.txt" symlink set in the driver (see below)
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
BufferedReader reader = new BufferedReader(
new FileReader(new File("lookup.txt")));
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
lookupTable.put(parts[0], parts[1]);
}
reader.close();
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String id = value.toString();
String name = lookupTable.get(id);
if (name != null) {
context.write(new Text(id), new Text(name));
}
}
}
// In driver:
// job.addCacheFile(new URI("/shared/lookup.txt#lookup.txt"));
}
Counters and Monitoring
Track job statistics and custom metrics.
public class CounterExample {
enum RecordCounters {
TOTAL_RECORDS,
VALID_RECORDS,
INVALID_RECORDS
}
public static class CountingMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(RecordCounters.TOTAL_RECORDS).increment(1);
String line = value.toString();
String[] fields = line.split(",");
if (fields.length == 3) {
context.getCounter(RecordCounters.VALID_RECORDS).increment(1);
// Process record...
} else {
context.getCounter(RecordCounters.INVALID_RECORDS).increment(1);
// Skip malformed record
}
}
}
// View counters after job completion:
// Counters counters = job.getCounters();
// long totalRecords = counters.findCounter(
// RecordCounters.TOTAL_RECORDS).getValue();
}
Unit Testing MapReduce
Test your map and reduce functions independently with MRUnit.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class WordCountTest {
private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
@Before
public void setUp() {
WordCount.TokenizerMapper mapper = new WordCount.TokenizerMapper();
WordCount.IntSumReducer reducer = new WordCount.IntSumReducer();
mapDriver = MapDriver.newMapDriver(mapper);
reduceDriver = ReduceDriver.newReduceDriver(reducer);
}
@Test
public void testMapper() throws IOException {
mapDriver.withInput(new LongWritable(0),
new Text("hello world hello"));
mapDriver.withOutput(new Text("hello"), new IntWritable(1));
mapDriver.withOutput(new Text("world"), new IntWritable(1));
mapDriver.withOutput(new Text("hello"), new IntWritable(1));
mapDriver.runTest();
}
@Test
public void testReducer() throws IOException {
List<IntWritable> values = Arrays.asList(
new IntWritable(1),
new IntWritable(1),
new IntWritable(1)
);
reduceDriver.withInput(new Text("hello"), values);
reduceDriver.withOutput(new Text("hello"), new IntWritable(3));
reduceDriver.runTest();
}
}
Common Pitfalls
Anti-Pattern 1: Not Reusing Objects
// BAD: Creates new object for each emit
public void map(...) {
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
// Creates 1 million objects → GC pressure
}
}
// GOOD: Reuse objects
private Text wordText = new Text();
private IntWritable one = new IntWritable(1);
public void map(...) {
for (String word : words) {
wordText.set(word);
context.write(wordText, one);
// Reuses the same two objects → far less GC pressure
}
}
Anti-Pattern 2: Skewed Keys
// Problem: One key has 90% of data
// Result: One reducer does 90% of work, others idle
// Solution: Add random salt to key
String saltedKey = key + "_" + random.nextInt(10);
// Distribute the hot key across 10 reducers, then combine the partial
// results in a second pass (see the sketch below)
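A minimal stage-2 sketch (hypothetical class and field names; it assumes stage 1 wrote standard TextOutputFormat lines of the form "key_salt<TAB>partialSum"):
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class UnsaltStage {
    // Stage-2 mapper: strips the "_<salt>" suffix and re-emits the partial sum
    public static class UnsaltMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] kv = value.toString().split("\t");
            String saltedKey = kv[0];
            long partialSum = Long.parseLong(kv[1]);
            // Remove the "_<salt>" suffix added in stage 1
            String originalKey = saltedKey.substring(0, saltedKey.lastIndexOf('_'));
            context.write(new Text(originalKey), new LongWritable(partialSum));
        }
    }
    // A normal summing reducer (like IntSumReducer, but with LongWritable)
    // then produces the final per-key totals.
}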
Interview Focus
Key Questions:
“Explain the shuffle and sort phase.”
- Map output is partitioned by key hash
- Each partition is sorted by key
- Partitions are transferred to the reducers over the network
- Merged and grouped by key before reduce is called
“When would you use a combiner?”
- To cut shuffle traffic
- The function must be associative and commutative
- Example: SUM yes, AVERAGE no (unless sum and count are emitted separately)
“How would you optimize a slow MapReduce job?”
- Add a combiner
- Increase parallelism (more mappers/reducers)
- Compress map output
- Fix data skew
- Optimize code (reuse Writable objects)
What’s Next?
Module 4: YARN Resource Management
Learn how YARN manages cluster resources and schedules applications.