MapReduce Programming Model

Module Duration: 5-6 hours
Hands-on Projects: 8 coding exercises
Prerequisites: Java programming, HDFS basics from Module 2

Introduction

MapReduce is the programming paradigm that made distributed computing accessible to developers. By providing a simple abstraction (map and reduce functions), it hides the complexity of:
  • Parallelization
  • Fault tolerance
  • Data distribution
  • Load balancing
In this module, you’ll write production-quality MapReduce code and master optimization techniques.

The MapReduce Paradigm

Core Concept

All MapReduce programs follow this pattern:
INPUT → MAP → SHUFFLE & SORT → REDUCE → OUTPUT
Map Phase: Transform input records into intermediate key-value pairs
Shuffle Phase: Group all values by key (the framework handles this)
Reduce Phase: Process each group to produce the final output

Classic Example: WordCount

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    // Mapper: Splits lines into words, emits (word, 1)
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // key: byte offset in file (not used here)
            // value: one line of text

            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken().toLowerCase());
                context.write(word, one);
                // Emits: ("hello", 1), ("world", 1), etc.
            }
        }
    }

    // Reducer: Sums up counts for each word
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                          Context context)
                throws IOException, InterruptedException {

            // key: a word
            // values: all counts for that word [1, 1, 1, ...]

            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result);
            // Emits: ("hello", 42)
        }
    }

    // Driver: Configures and runs the job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // Set jar containing this class
        job.setJarByClass(WordCount.class);

        // Set Mapper and Reducer classes
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        // Set output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input/output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Running the Job:
# Compile
mkdir -p wordcount_classes
javac -classpath `hadoop classpath` -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .

# Run on Hadoop
hadoop jar wordcount.jar WordCount /input/textfiles /output/wordcount

# View results
hdfs dfs -cat /output/wordcount/part-r-00000

MapReduce Execution Flow

Job Submission and Task Execution

The flow below uses classic MRv1 terminology (JobTracker/TaskTracker). In Hadoop 2+, the same responsibilities are handled by YARN's ResourceManager and a per-job ApplicationMaster, which we cover in Module 4.

┌──────────────────────────────────────────────────────────────┐
│                     CLIENT SUBMITS JOB                       │
└────────────────────────┬─────────────────────────────────────┘
                         │
                         ▼
┌──────────────────────────────────────────────────────────────┐
│                     JobTracker (Master)                      │
│  • Receives job JAR and configuration                        │
│  • Splits input into InputSplits                             │
│  • Assigns tasks to TaskTrackers                             │
│  • Monitors progress and handles failures                    │
└────────┬────────────────┬────────────────┬──────────────────┘
         │                │                │
         ▼                ▼                ▼
   ┌───────────┐     ┌───────────┐     ┌───────────┐
   │TaskTracker│     │TaskTracker│     │TaskTracker│
   │  Node 1   │     │  Node 2   │     │  Node 3   │
   └─────┬─────┘     └─────┬─────┘     └─────┬─────┘
         │                 │                 │
    ┌────▼─────┐      ┌────▼─────┐      ┌────▼─────┐
    │ Map Task │      │ Map Task │      │ Map Task │
    │    1     │      │    2     │      │    3     │
    └────┬─────┘      └────┬─────┘      └────┬─────┘
         │                 │                 │
         │    Intermediate │                 │
         │    Key-Value    │                 │
         │    Pairs        │                 │
         └────────┬────────┴─────────────────┘
                  │
                  ▼
         ┌────────────────┐
         │ SHUFFLE & SORT │
         │  (Group by Key)│
         └────────┬───────┘

         ┌────────┴─────────┬─────────────┐
         │                  │             │
         ▼                  ▼             ▼
    ┌─────────┐        ┌─────────┐   ┌─────────┐
    │ Reduce  │        │ Reduce  │   │ Reduce  │
    │  Task 1 │        │  Task 2 │   │  Task 3 │
    └────┬────┘        └────┬────┘   └────┬────┘
         │                  │             │
         ▼                  ▼             ▼
    ┌─────────┐        ┌─────────┐   ┌─────────┐
    │Output   │        │Output   │   │Output   │
    │part-r-  │        │part-r-  │   │part-r-  │
    │  00000  │        │  00001  │   │  00002  │
    └─────────┘        └─────────┘   └─────────┘

InputSplit vs HDFS Block

HDFS File (350MB):
┌─────────────┬─────────────┬─────────────┐
│   Block 1   │   Block 2   │   Block 3   │
│   128MB     │   128MB     │   94MB      │
└─────────────┴─────────────┴─────────────┘

InputSplits (for text files):
┌─────────────┬─────────────┬─────────────┐
│   Split 1   │   Split 2   │   Split 3   │
│  ~128MB     │  ~128MB     │  ~94MB      │
│  (aligned   │             │             │
│  to line    │             │             │
│  boundaries)│             │             │
└─────────────┴─────────────┴─────────────┘
       ↓             ↓             ↓
   Map Task 1    Map Task 2    Map Task 3
Key Point: One InputSplit = One Map Task
  • Framework tries to align splits with HDFS blocks for data locality
  • Text splits may span blocks slightly to avoid breaking lines
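
Split size is normally derived from the HDFS block size, but it can also be tuned per job. Below is a minimal driver sketch using the standard FileInputFormat helpers; the 64 MB figure is only an illustrative value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split tuning demo");

        // Cap each InputSplit at 64 MB (illustrative): a 350MB file now
        // yields ~6 map tasks instead of 3, giving finer-grained parallelism
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

        // Raising the minimum split size has the opposite effect:
        // fewer, larger map tasks
        // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
    }
}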

Advanced MapReduce Patterns

Pattern 1: Filtering

Extract records matching criteria.
// Example: Filter log entries with ERROR level
public class ErrorLogFilter {

    public static class FilterMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();

            // Parse log line
            if (line.contains("ERROR")) {
                // Extract timestamp and message
                String[] parts = line.split("\\|");
                if (parts.length >= 3) {
                    String timestamp = parts[0].trim();
                    String message = parts[2].trim();
                    context.write(new Text(timestamp), new Text(message));
                }
            }
        }
    }

    // No reducer needed - identity reducer or no reducer at all
    // Set job.setNumReduceTasks(0) for map-only job
}
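
The driver for a map-only job like this differs from the WordCount driver mainly in setting zero reduce tasks. A minimal sketch, assuming the FilterMapper above (the class name ErrorLogFilterDriver is just illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ErrorLogFilterDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "error log filter");
        job.setJarByClass(ErrorLogFilter.class);
        job.setMapperClass(ErrorLogFilter.FilterMapper.class);

        // No reducer: map output is written straight to HDFS as part-m-* files
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}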

Pattern 2: Summarization (Aggregation)

Compute summary statistics.
// Example: Calculate average, min, max temperature per day
public class TemperatureSummary {

    public static class TempMapper
            extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: date,station,temperature
            // Example: 2024-01-15,NYC001,42.5
            String[] fields = value.toString().split(",");

            if (fields.length == 3) {
                String date = fields[0];
                double temp = Double.parseDouble(fields[2]);
                context.write(new Text(date), new DoubleWritable(temp));
            }
        }
    }

    public static class TempReducer
            extends Reducer<Text, DoubleWritable, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values,
                          Context context)
                throws IOException, InterruptedException {

            double sum = 0;
            double min = Double.POSITIVE_INFINITY;
            // Note: Double.MIN_VALUE is the smallest *positive* double,
            // so it would break for sub-zero temperatures
            double max = Double.NEGATIVE_INFINITY;
            int count = 0;

            for (DoubleWritable val : values) {
                double temp = val.get();
                sum += temp;
                min = Math.min(min, temp);
                max = Math.max(max, temp);
                count++;
            }

            double avg = sum / count;

            String summary = String.format("avg=%.2f, min=%.2f, max=%.2f",
                avg, min, max);
            context.write(key, new Text(summary));
        }
    }
}
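
One wiring detail this pattern introduces: the mapper's output value type (DoubleWritable) no longer matches the reducer's output value type (Text), so the driver must declare the map output types explicitly or the job fails at runtime with a type-mismatch error. A sketch of the relevant driver lines, assuming a driver structured like WordCount's:

        job.setMapperClass(TemperatureSummary.TempMapper.class);
        job.setReducerClass(TemperatureSummary.TempReducer.class);

        // Intermediate (map output) types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // Final (reducer output) types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);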

Pattern 3: Joins (Reduce-Side Join)

Join two datasets on a common key.
// Example: Join users with orders
// Input 1: users.txt → user_id,name,city
// Input 2: orders.txt → order_id,user_id,amount

public class ReduceSideJoin {

    public static class JoinMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        private String filename;

        @Override
        protected void setup(Context context) {
            // Determine which file we're reading
            FileSplit split = (FileSplit) context.getInputSplit();
            filename = split.getPath().getName();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = line.split(",");

            if (filename.contains("users")) {
                // users.txt: user_id,name,city
                String userId = fields[0];
                String userData = "USER:" + fields[1] + "," + fields[2];
                context.write(new Text(userId), new Text(userData));

            } else if (filename.contains("orders")) {
                // orders.txt: order_id,user_id,amount
                String userId = fields[1];
                String orderData = "ORDER:" + fields[0] + "," + fields[2];
                context.write(new Text(userId), new Text(orderData));
            }
        }
    }

    public static class JoinReducer
            extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // key: user_id
            // values: [USER:..., ORDER:..., ORDER:..., ...]

            String userData = null;
            List<String> orders = new ArrayList<>();

            for (Text val : values) {
                String value = val.toString();
                if (value.startsWith("USER:")) {
                    userData = value.substring(5); // Remove "USER:" prefix
                } else if (value.startsWith("ORDER:")) {
                    orders.add(value.substring(6)); // Remove "ORDER:"
                }
            }

            // Join: emit each order with user data
            if (userData != null) {
                for (String order : orders) {
                    String output = userData + "," + order;
                    context.write(key, new Text(output));
                }
            }
        }
    }
}
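
The filename check in setup() works, but it breaks silently if the input paths are renamed. A common alternative is MultipleInputs (in org.apache.hadoop.mapreduce.lib.input), which binds a dedicated mapper to each input path. A driver sketch, where UserMapper and OrderMapper are hypothetical mappers containing just the users/orders branches of JoinMapper above, and the paths are illustrative:

        // In driver:
        MultipleInputs.addInputPath(job, new Path("/data/users"),
                TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, new Path("/data/orders"),
                TextInputFormat.class, OrderMapper.class);
        job.setReducerClass(ReduceSideJoin.JoinReducer.class);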

Pattern 4: Secondary Sort

Control the order in which values arrive at the reducer.
// Example: Group events by user, sorted by timestamp
public class SecondarySortExample {

    // Composite key: (userId, timestamp)
    public static class CompositeKey
            implements WritableComparable<CompositeKey> {

        private Text userId;
        private LongWritable timestamp;

        public CompositeKey() {
            this.userId = new Text();
            this.timestamp = new LongWritable();
        }

        public CompositeKey(String userId, long timestamp) {
            this.userId = new Text(userId);
            this.timestamp = new LongWritable(timestamp);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            userId.write(out);
            timestamp.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            userId.readFields(in);
            timestamp.readFields(in);
        }

        @Override
        public int compareTo(CompositeKey other) {
            // First compare by userId
            int cmp = this.userId.compareTo(other.userId);
            if (cmp != 0) return cmp;

            // Then by timestamp (ascending)
            return this.timestamp.compareTo(other.timestamp);
        }

        // Getters and setters...
    }

    // Partition by userId only (not timestamp)
    public static class UserPartitioner
            extends Partitioner<CompositeKey, Text> {

        @Override
        public int getPartition(CompositeKey key, Text value,
                               int numPartitions) {
            // Mask the sign bit instead of using Math.abs(): abs(Integer.MIN_VALUE)
            // is still negative and would yield an invalid partition number
            return (key.userId.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group by userId only
    public static class UserGroupComparator
            extends WritableComparator {

        protected UserGroupComparator() {
            super(CompositeKey.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CompositeKey k1 = (CompositeKey) a;
            CompositeKey k2 = (CompositeKey) b;
            return k1.userId.compareTo(k2.userId);
        }
    }

    public static class SortMapper
            extends Mapper<LongWritable, Text, CompositeKey, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: userId,timestamp,event
            String[] fields = value.toString().split(",");
            String userId = fields[0];
            long timestamp = Long.parseLong(fields[1]);
            String event = fields[2];

            context.write(new CompositeKey(userId, timestamp),
                         new Text(event));
        }
    }

    public static class SortReducer
            extends Reducer<CompositeKey, Text, Text, Text> {

        @Override
        public void reduce(CompositeKey key, Iterable<Text> values,
                          Context context)
                throws IOException, InterruptedException {

            // Values arrive sorted by timestamp!
            StringBuilder events = new StringBuilder();

            for (Text event : values) {
                events.append(event.toString()).append(";");
            }

            context.write(key.userId, new Text(events.toString()));
        }
    }

    // In driver:
    // job.setMapOutputKeyClass(CompositeKey.class);   // map key type differs from output key
    // job.setMapOutputValueClass(Text.class);
    // job.setPartitionerClass(UserPartitioner.class);
    // job.setGroupingComparatorClass(UserGroupComparator.class);
}

Optimization Techniques

Combiner Functions

Reduce shuffle data volume by pre-aggregating on the map side.
public class WordCountWithCombiner {

    // Same Mapper as before...

    // Combiner: Same logic as Reducer
    public static class SumCombiner
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                          Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // In driver:
    // job.setCombinerClass(SumCombiner.class);
}
Impact:
Without Combiner:
  Mapper emits 1 million pairs: ("the", 1), ("the", 1), ...
  Shuffle transfers: 1 million pairs

With Combiner:
  Mapper still emits 1 million pairs locally
  Combiner pre-aggregates each word before the shuffle: e.g. ("the", 50000)
  Shuffle transfers: roughly one pair per distinct word per mapper
  Reduction: often orders of magnitude less network traffic
When to Use: Function must be commutative and associative
  • ✅ Sum, Count, Max, Min
  • ❌ Average (use sum + count instead; see the sketch below), Median
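
A combiner-safe average carries a partial (sum, count) pair through the shuffle and divides only in the reducer. A minimal sketch building on the temperature example; the class names are illustrative, the usual Hadoop imports are assumed, and the "sum,count" Text encoding is just the simplest option (a custom Writable would be cleaner):

    public static class AvgMapper
            extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input: date,station,temperature
            String[] f = value.toString().split(",");
            if (f.length == 3) {
                // Emit a partial pair: (temperature, count of 1)
                context.write(new Text(f[0]), new Text(f[2] + ",1"));
            }
        }
    }

    // Combiner: merges partial pairs but never divides
    public static class PartialSumCombiner
            extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text v : values) {
                String[] p = v.toString().split(",");
                sum += Double.parseDouble(p[0]);
                count += Long.parseLong(p[1]);
            }
            // Still a partial result: emit sum and count, not the average
            context.write(key, new Text(sum + "," + count));
        }
    }

    // Reducer: merges the (possibly pre-combined) partials, then divides
    public static class AvgReducer
            extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text v : values) {
                String[] p = v.toString().split(",");
                sum += Double.parseDouble(p[0]);
                count += Long.parseLong(p[1]);
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }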

Custom Partitioners

Control which reducer receives which keys.
public class CustomPartitioner
        extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value,
                           int numReduceTasks) {

        // Example: Partition by first letter of word
        String word = key.toString();
        char firstLetter = word.charAt(0);

        if (firstLetter >= 'a' && firstLetter <= 'm') {
            return 0; // First half of alphabet to reducer 0
        } else {
            return 1 % numReduceTasks; // Second half to reducer 1
        }
    }
}
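
Wiring it into the driver; note that this particular partition function only makes sense with two reducers:

    // In driver:
    job.setPartitionerClass(CustomPartitioner.class);
    job.setNumReduceTasks(2);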

Distributed Cache

Share read-only data across all tasks.
public class DistributedCacheExample {

    public static class LookupMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        private Map<String, String> lookupTable = new HashMap<>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {

            // Load distributed cache file
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles != null && cacheFiles.length > 0) {
                BufferedReader reader = new BufferedReader(
                    new FileReader(new File("lookup.txt")));

                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    lookupTable.put(parts[0], parts[1]);
                }
                reader.close();
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String id = value.toString();
            String name = lookupTable.get(id);

            if (name != null) {
                context.write(new Text(id), new Text(name));
            }
        }
    }

    // In driver:
    // job.addCacheFile(new URI("/shared/lookup.txt#lookup.txt"));
}

Counters and Monitoring

Track job statistics and custom metrics.
public class CounterExample {

    enum RecordCounters {
        TOTAL_RECORDS,
        VALID_RECORDS,
        INVALID_RECORDS
    }

    public static class CountingMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            context.getCounter(RecordCounters.TOTAL_RECORDS).increment(1);

            String line = value.toString();
            String[] fields = line.split(",");

            if (fields.length == 3) {
                context.getCounter(RecordCounters.VALID_RECORDS).increment(1);
                // Process record...
            } else {
                context.getCounter(RecordCounters.INVALID_RECORDS).increment(1);
                // Skip malformed record
            }
        }
    }

    // View counters after job completion:
    // Counters counters = job.getCounters();
    // long totalRecords = counters.findCounter(
    //     RecordCounters.TOTAL_RECORDS).getValue();
}

Unit Testing MapReduce

Test your map and reduce functions independently.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class WordCountTest {

    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;

    @Before
    public void setUp() {
        // Reference the nested classes from WordCount
        WordCount.TokenizerMapper mapper = new WordCount.TokenizerMapper();
        WordCount.IntSumReducer reducer = new WordCount.IntSumReducer();

        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
    }

    @Test
    public void testMapper() throws IOException {
        mapDriver.withInput(new LongWritable(0),
                           new Text("hello world hello"));

        mapDriver.withOutput(new Text("hello"), new IntWritable(1));
        mapDriver.withOutput(new Text("world"), new IntWritable(1));
        mapDriver.withOutput(new Text("hello"), new IntWritable(1));

        mapDriver.runTest();
    }

    @Test
    public void testReducer() throws IOException {
        List<IntWritable> values = Arrays.asList(
            new IntWritable(1),
            new IntWritable(1),
            new IntWritable(1)
        );

        reduceDriver.withInput(new Text("hello"), values);
        reduceDriver.withOutput(new Text("hello"), new IntWritable(3));

        reduceDriver.runTest();
    }
}

Common Pitfalls

Anti-Pattern 1: Not Reusing Objects
// BAD: Creates new object for each emit
public void map(...) {
    for (String word : words) {
        context.write(new Text(word), new IntWritable(1));
        // Creates 1 million objects → GC pressure
    }
}

// GOOD: Reuse objects
private Text wordText = new Text();
private IntWritable one = new IntWritable(1);

public void map(...) {
    for (String word : words) {
        wordText.set(word);
        context.write(wordText, one);
        // Reuses same objects → no GC pressure
    }
}
Anti-Pattern 2: Skewed Keys
// Problem: One key has 90% of data
// Result: One reducer does 90% of work, others idle

// Solution: Add random salt to key
String saltedKey = key + "_" + random.nextInt(10);
// Distribute across 10 reducers, then combine results
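
A minimal sketch of stage 1 of the salting approach for a WordCount-style sum; the salt width of 10 and the class name SaltedMapper are illustrative, and java.util.Random plus the usual Hadoop imports are assumed:

public static class SaltedMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int SALT_BUCKETS = 10;
    private final Random random = new Random();
    private final Text saltedWord = new Text();
    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().toLowerCase().split("\\s+")) {
            if (word.isEmpty()) continue;
            // "the" becomes "the_0" ... "the_9", spreading the hot key
            // over up to 10 reducers instead of one
            saltedWord.set(word + "_" + random.nextInt(SALT_BUCKETS));
            context.write(saltedWord, one);
        }
    }
}
// Stage 1 reducer: the ordinary IntSumReducer → partial counts per salted key
// Stage 2 (a second, much smaller job): strip the salt from the key and
// sum the partial counts to produce the final totals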

Interview Focus

Key Questions:
  1. “Explain shuffle and sort phase”
    • Map output partitioned by key hash
    • Sorted by key
    • Transferred to reducers over network
    • Merged and grouped before reduce
  2. “When would you use a combiner?”
    • Reduce shuffle traffic
    • Function must be associative + commutative
    • Example: SUM yes, AVERAGE no (without modification)
  3. “How to optimize a slow MapReduce job?”
    • Add combiner
    • Increase parallelism (more mappers/reducers)
    • Use compression (see the sketch after this list)
    • Fix data skew
    • Optimize code (reuse objects)
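
As an example of the compression point, map-output compression is a small configuration change in the driver. A sketch using the standard Hadoop 2.x property names (Snappy must be available on the cluster for the codec line to work):

        // In driver, before Job.getInstance(conf, ...):
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                org.apache.hadoop.io.compress.SnappyCodec.class,
                org.apache.hadoop.io.compress.CompressionCodec.class);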

What’s Next?

Module 4: YARN Resource Management

Learn how YARN manages cluster resources and schedules applications