Data Processing Patterns & Best Practices

Module Duration: 4-5 hours
Focus: Real-world patterns, optimization, production code quality
Prerequisites: MapReduce and ecosystem knowledge

Core Design Patterns

Pattern 1: Filtering and Searching

Use Case: Extract a subset of data matching criteria
Example: Extract error logs from terabytes of application logs
public class ErrorLogExtractor {

    public static class FilterMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        private static final Pattern ERROR_PATTERN =
            Pattern.compile("\\[(ERROR|FATAL)\\]");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();

            if (ERROR_PATTERN.matcher(line).find()) {
                // Extract the timestamp prefix (assumes a fixed-width
                // "yyyy-MM-dd HH:mm:ss" prefix of 19 characters)
                if (line.length() >= 19) {
                    String timestamp = line.substring(0, 19);
                    context.write(new Text(timestamp), value);
                }
            }
        }
    }

    // No reducer needed for simple filtering
    // Set job.setNumReduceTasks(0) for map-only job
}
Optimization: Map-only job (no shuffle overhead)
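
A minimal driver sketch for this map-only job (the class name, paths, and argument handling here are illustrative, not part of the module's codebase):
public class ErrorLogExtractorDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "error log extractor");
        job.setJarByClass(ErrorLogExtractor.class);
        job.setMapperClass(ErrorLogExtractor.FilterMapper.class);
        job.setNumReduceTasks(0); // map-only: no shuffle or sort phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}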

Pattern 2: Summarization / Aggregation

Use Case: Compute statistics (sum, avg, min, max, count)
Example: User session analytics
public class SessionAnalytics {

    public static class SessionMapper
            extends Mapper<LongWritable, Text, Text, SessionStats> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: user_id,timestamp,event,duration
            String[] fields = value.toString().split(",");
            String userId = fields[0];
            long duration = Long.parseLong(fields[3]);

            SessionStats stats = new SessionStats();
            stats.setCount(1);
            stats.setTotalDuration(duration);
            stats.setMinDuration(duration);
            stats.setMaxDuration(duration);

            context.write(new Text(userId), stats);
        }
    }

    public static class SessionReducer
            extends Reducer<Text, SessionStats, Text, SessionStats> {

        @Override
        public void reduce(Text key, Iterable<SessionStats> values,
                          Context context)
                throws IOException, InterruptedException {

            long count = 0;
            long totalDuration = 0;
            long minDuration = Long.MAX_VALUE;
            long maxDuration = Long.MIN_VALUE;

            for (SessionStats stats : values) {
                count += stats.getCount();
                totalDuration += stats.getTotalDuration();
                minDuration = Math.min(minDuration, stats.getMinDuration());
                maxDuration = Math.max(maxDuration, stats.getMaxDuration());
            }

            SessionStats result = new SessionStats();
            result.setCount(count);
            result.setTotalDuration(totalDuration);
            result.setAvgDuration(totalDuration / count);
            result.setMinDuration(minDuration);
            result.setMaxDuration(maxDuration);

            context.write(key, result);
        }
    }

    // Custom Writable for SessionStats
    public static class SessionStats implements Writable {
        private long count;
        private long totalDuration;
        private long avgDuration;
        private long minDuration;
        private long maxDuration;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(count);
            out.writeLong(totalDuration);
            out.writeLong(avgDuration);
            out.writeLong(minDuration);
            out.writeLong(maxDuration);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            count = in.readLong();
            totalDuration = in.readLong();
            avgDuration = in.readLong();
            minDuration = in.readLong();
            maxDuration = in.readLong();
        }

        // Getters and setters...
    }
}
Optimization: Use combiner with same logic as reducer
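
Because the reducer re-derives every statistic from count, total, min, and max (the average written at combine time is simply recomputed by the final reducer), it can be reused unchanged as the combiner. A possible driver fragment:
// Driver fragment: reuse the reducer as the combiner
job.setMapperClass(SessionAnalytics.SessionMapper.class);
job.setCombinerClass(SessionAnalytics.SessionReducer.class);
job.setReducerClass(SessionAnalytics.SessionReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(SessionAnalytics.SessionStats.class);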

Pattern 3: Top-N / Ranking

Use Case: Find the top K items by some metric
Example: Top 100 products by revenue
public class TopProducts {

    public static class RevenueMapper
            extends Mapper<LongWritable, Text, DoubleWritable, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: product_id,product_name,revenue
            String[] fields = value.toString().split(",");
            String productInfo = fields[0] + "," + fields[1];
            double revenue = Double.parseDouble(fields[2]);

            // Emit with negative revenue for descending sort
            context.write(new DoubleWritable(-revenue), new Text(productInfo));
        }
    }

    public static class TopNReducer
            extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {

        private int N = 100;
        private int count = 0;

        @Override
        protected void setup(Context context) {
            N = context.getConfiguration().getInt("top.n", 100);
        }

        @Override
        public void reduce(DoubleWritable key, Iterable<Text> values,
                          Context context)
                throws IOException, InterruptedException {

            for (Text product : values) {
                if (count >= N) return; // Stop after N items

                // Convert back to positive revenue
                double revenue = -key.get();
                context.write(product, new DoubleWritable(revenue));
                count++;
            }
        }
    }

    // In driver: job.setNumReduceTasks(1) for total ordering
}
Advanced: Use TotalOrderPartitioner for distributed Top-N
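
TotalOrderPartitioner aside, a common way to make Top-N scale is to keep a bounded local Top-N inside each mapper and emit only those candidates in cleanup(), so the single reducer receives at most N records per mapper. A hedged sketch (the class name and TreeMap bookkeeping are illustrative, and ties on revenue collapse in this simplified version):
public static class LocalTopNMapper
        extends Mapper<LongWritable, Text, DoubleWritable, Text> {

    private int n;
    // Sorted map of the best N records seen by this mapper (key = revenue)
    private final TreeMap<Double, String> topN = new TreeMap<>();

    @Override
    protected void setup(Context context) {
        n = context.getConfiguration().getInt("top.n", 100);
    }

    @Override
    public void map(LongWritable key, Text value, Context context) {
        // Input: product_id,product_name,revenue
        String[] fields = value.toString().split(",");
        double revenue = Double.parseDouble(fields[2]);
        topN.put(revenue, fields[0] + "," + fields[1]);
        if (topN.size() > n) {
            topN.remove(topN.firstKey()); // drop the smallest candidate
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Emit only this mapper's candidates, negated for descending sort;
        // the single reducer merges the per-mapper lists into the global top N
        for (Map.Entry<Double, String> e : topN.entrySet()) {
            context.write(new DoubleWritable(-e.getKey()), new Text(e.getValue()));
        }
    }
}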

Pattern 4: Distinct / Deduplication

Use Case: Remove duplicate records
Example: Unique visitors
public class UniqueVisitors {

    public static class DistinctMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: timestamp,user_id,page
            String[] fields = value.toString().split(",");
            String userId = fields[1];

            context.write(new Text(userId), NullWritable.get());
        }
    }

    public static class DistinctReducer
            extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterable<NullWritable> values,
                          Context context)
                throws IOException, InterruptedException {

            // Key is unique due to shuffle
            context.write(key, NullWritable.get());
        }
    }
}
Memory-Efficient Alternative: Bloom filter for approximate deduplication
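
A hedged sketch of that variant, using Hadoop's org.apache.hadoop.util.bloom.BloomFilter as a map-side pre-filter in front of the same DistinctReducer. The sizing numbers are illustrative; false positives can drop a small fraction of genuinely new IDs, which is why the result is approximate, but shuffle volume falls sharply:
public static class BloomDedupMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Sizing is illustrative: tune the bit-vector size and hash count to the
    // expected number of distinct users and the acceptable false-positive rate
    private final BloomFilter seen =
        new BloomFilter(10_000_000, 5, Hash.MURMUR_HASH);
    private final Text userId = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Input: timestamp,user_id,page
        String id = value.toString().split(",")[1];
        Key probe = new Key(id.getBytes(StandardCharsets.UTF_8));

        // Skip IDs this mapper has (probably) already emitted; false positives
        // may drop a few genuinely new IDs, making the result approximate
        if (!seen.membershipTest(probe)) {
            seen.add(probe);
            userId.set(id);
            context.write(userId, NullWritable.get());
        }
    }
}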

Pattern 5: Binning / Bucketing

Use Case: Group data into categories
Example: Age groups from user data
public class AgeBinning {

    public static class BinMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: user_id,name,age
            String[] fields = value.toString().split(",");
            int age = Integer.parseInt(fields[2]);

            String bin = getAgeBin(age);
            context.write(new Text(bin), one);
        }

        private String getAgeBin(int age) {
            if (age < 18) return "Under 18";
            if (age < 25) return "18-24";
            if (age < 35) return "25-34";
            if (age < 50) return "35-49";
            if (age < 65) return "50-64";
            return "65+";
        }
    }

    // Use standard IntSumReducer for counting
}
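
A possible driver fragment wiring this up with the stock IntSumReducer, which also serves as the combiner since sum is associative and commutative:
// Driver fragment for the binning job
job.setMapperClass(AgeBinning.BinMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);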

Pattern 6: Join Patterns

Reduce-Side Join (Default)

Use Case: Join large datasets
public class ReduceSideJoin {

    public static class JoinMapper
            extends Mapper<LongWritable, Text, TextPair, Text> {

        private String inputTag;

        @Override
        protected void setup(Context context) {
            // Tag each record by its source: assumes the inputs live in
            // directories named "users" and "orders"
            FileSplit split = (FileSplit) context.getInputSplit();
            inputTag = split.getPath().getParent().getName();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");

            if (inputTag.equals("users")) {
                // users: user_id,name,email
                String userId = fields[0];
                String data = fields[1] + "," + fields[2];
                context.write(new TextPair(userId, "0"), // 0 for sorting
                             new Text("U" + data));
            } else {
                // orders: order_id,user_id,amount
                String userId = fields[1];
                String data = fields[0] + "," + fields[2];
                context.write(new TextPair(userId, "1"), // 1 for sorting
                             new Text("O" + data));
            }
        }
    }

    public static class JoinReducer
            extends Reducer<TextPair, Text, Text, Text> {

        @Override
        public void reduce(TextPair key, Iterable<Text> values,
                          Context context)
                throws IOException, InterruptedException {

            Iterator<Text> iter = values.iterator();
            Text user = iter.next(); // User arrives first (sorted)

            if (!user.toString().startsWith("U")) {
                return; // No user found
            }

            String userData = user.toString().substring(1);

            // Join user with all orders
            while (iter.hasNext()) {
                Text order = iter.next();
                if (order.toString().startsWith("O")) {
                    String orderData = order.toString().substring(1);
                    context.write(new Text(key.getFirst()),
                                 new Text(userData + "," + orderData));
                }
            }
        }
    }

    // TextPair: composite key for secondary sort. This join also relies on a
    // partitioner and a grouping comparator that use only the first field
    // (user_id), so a user's record and all of its orders reach the same
    // reduce() call with the user first; see the fuller sketch below.
    public static class TextPair implements WritableComparable<TextPair> {
        private Text first;
        private Text second;

        // Implement compareTo, write, readFields...
    }
}
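
A fuller, illustrative sketch of the composite-key plumbing the join relies on. Only the idea (compare on both fields, but partition and group on the first) comes from the pattern above; the class bodies are filled in here as one possible implementation:
public class TextPair implements WritableComparable<TextPair> {
    private final Text first = new Text();
    private final Text second = new Text();

    public TextPair() {}

    public TextPair(String first, String second) {
        this.first.set(first);
        this.second.set(second);
    }

    public Text getFirst()  { return first; }
    public Text getSecond() { return second; }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int compareTo(TextPair other) {
        // Sort by user_id, then by the tag ("0" user before "1" order)
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
    }

    @Override
    public int hashCode() { return first.hashCode(); }

    @Override
    public boolean equals(Object o) {
        return o instanceof TextPair
            && first.equals(((TextPair) o).first)
            && second.equals(((TextPair) o).second);
    }
}

// Partition on the first field so a user's record and orders meet in one reducer
public class FirstFieldPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group on the first field so they arrive in the same reduce() call
public class KeyGroupingComparator extends WritableComparator {
    protected KeyGroupingComparator() { super(TextPair.class, true); }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
    }
}

// In the driver:
// job.setPartitionerClass(FirstFieldPartitioner.class);
// job.setGroupingComparatorClass(KeyGroupingComparator.class);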

Map-Side Join (Small Table)

Use Case: Join when one dataset fits in memory
public class MapSideJoin {

    public static class JoinMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        private Map<String, String> userMap = new HashMap<>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {

            // Load the small table from the Distributed Cache. The driver
            // registers it with a "#users.txt" fragment (see the driver
            // fragment below), so it is symlinked into the task's working
            // directory under that name.
            URI[] cacheFiles = context.getCacheFiles(); // full paths, if needed
            BufferedReader reader = new BufferedReader(
                new FileReader(new File("users.txt")));

            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                String userId = fields[0];
                String userData = fields[1] + "," + fields[2];
                userMap.put(userId, userData);
            }
            reader.close();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Read orders (large table)
            String[] fields = value.toString().split(",");
            String userId = fields[1];

            String userData = userMap.get(userId);
            if (userData != null) {
                context.write(new Text(userId),
                             new Text(userData + "," + value.toString()));
            }
        }
    }

    // No reducer needed
}
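
A driver fragment showing how the small table reaches the mappers; the HDFS path is a placeholder, and the "#users.txt" fragment is what makes the file readable by that name in setup():
// Driver fragment: register the small table with the Distributed Cache
Job job = Job.getInstance(conf, "map-side join");
job.addCacheFile(new URI("hdfs:///data/users/users.txt#users.txt"));
job.setMapperClass(MapSideJoin.JoinMapper.class);
job.setNumReduceTasks(0); // map-only join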

Optimization Techniques

1. Combiner Usage

Rule: Use combiner when operation is associative and commutative
// GOOD: Sum, Max, Min
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

// BAD: Average (a mean of partial means is wrong when the partial groups
// have different sizes)
// e.g. avg(avg(1,2), avg(3,4,5)) = avg(1.5, 4) = 2.75, but avg(1,2,3,4,5) = 3

// SOLUTION: Emit sum and count, compute average in reducer
public class AvgCombiner extends Reducer<Text, IntPair, Text, IntPair> {
    @Override
    public void reduce(Text key, Iterable<IntPair> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0, count = 0;
        for (IntPair pair : values) {
            sum += pair.getSum();
            count += pair.getCount();
        }
        context.write(key, new IntPair(sum, count));
    }
}

2. Custom Partitioner for Load Balancing

// Problem: Skewed keys (e.g., one user with 90% of the data)
// Solution: Add a random salt to the key so the hot key's records spread
// across reducers

// In the mapper: append a salt (random is a java.util.Random field)
String saltedKey = key + "_" + random.nextInt(10);

// Because the salt is part of the key, the default hash partitioning already
// spreads the load; a custom partitioner like the one below is only needed
// when hot keys must be routed explicitly.
public class SaltedPartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// After the salted aggregation, strip the salt and combine the partial
// results (see two-stage aggregation under Anti-Pattern 4)

3. Compression

Intermediate Compression:
// In driver
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
              SnappyCodec.class, CompressionCodec.class);
Output Compression:
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
Codec Comparison:
Codec     Compression ratio   Speed     Splittable
Gzip      High                Slow      No
Bzip2     Highest             Slowest   Yes
Snappy    Medium              Fast      No*
LZO       Medium              Fast      Yes*

* Snappy is splittable only inside a container format such as SequenceFile;
  LZO is splittable only after building an LZO index.

4. Speculative Execution

<property>
    <name>mapreduce.map.speculative</name>
    <value>true</value>
</property>

<property>
    <name>mapreduce.reduce.speculative</name>
    <value>false</value>
    <!-- Often disabled for reducers with side effects -->
</property>

5. Memory Management

// Reducer memory tuning
conf.setInt("mapreduce.reduce.memory.mb", 4096);
conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");

// Sort buffer size
conf.setInt("mapreduce.task.io.sort.mb", 512);

// Spill threshold
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);

Anti-Patterns to Avoid

Anti-Pattern 1: Too Many Small Files

Problem:
1 million files × 1KB each = 1GB data
But 1 million NameNode metadata entries!
Solution:
// Use SequenceFile or HAR (Hadoop Archive)
public class SequenceFileWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path seqFile = new Path("/output/combined.seq");

        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, seqFile,
            Text.class, // key
            BytesWritable.class // value
        );

        // Write multiple small files into one sequence file (smallFiles and
        // readFile stand in for however the local files are enumerated and read)
        for (File smallFile : smallFiles) {
            byte[] content = readFile(smallFile);
            writer.append(new Text(smallFile.getName()),
                         new BytesWritable(content));
        }

        writer.close();
    }
}

Anti-Pattern 2: Not Reusing Objects

// BAD: Creates millions of objects
for (String word : words) {
    context.write(new Text(word), new IntWritable(1));
}

// GOOD: Reuse objects
private Text wordText = new Text();
private IntWritable one = new IntWritable(1);

for (String word : words) {
    wordText.set(word);
    context.write(wordText, one);
}

Anti-Pattern 3: Ignoring Data Locality

// BAD: Random data access breaks locality
List<String> files = hdfs.listFiles("/data/*");
Collections.shuffle(files);
for (String file : files) process(file);

// GOOD: Let Hadoop schedule tasks near data
// Just submit job, framework handles locality

Anti-Pattern 4: Skewed Keys

Problem: One key has 90% of the data, so one reducer does almost all the work.
Solutions:
  1. Salting: Add a random prefix/suffix to the key to distribute load across reducers
  2. Custom Partitioner: Detect hot keys and route them specially
  3. Two-stage aggregation: Pre-aggregate on salted keys, then run a final aggregation (see the sketch below)
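
A minimal sketch of solution 3, two-stage aggregation for a skewed count (class and field names are illustrative): stage 1 spreads a hot key across reducers with a salt, stage 2 strips the salt and merges the partial counts.
public class TwoStageCount {

    // Stage 1: salt the key so a hot key is split across up to 10 reducers
    public static class SaltingMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private final Random random = new Random();
        private final Text saltedKey = new Text();
        private final LongWritable one = new LongWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String k = value.toString().split(",")[0];
            saltedKey.set(k + "_" + random.nextInt(10));
            context.write(saltedKey, one);
        }
    }

    // Stage 1 reducer (also usable as combiner): partial sums per salted key
    public static class PartialSumReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) sum += v.get();
            context.write(key, new LongWritable(sum));
        }
    }

    // Stage 2: strip the salt and re-sum; assumes stage 1 used the default
    // tab-separated text output
    public static class UnsaltMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private final Text originalKey = new Text();
        private final LongWritable partial = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t");
            String salted = parts[0];
            originalKey.set(salted.substring(0, salted.lastIndexOf('_')));
            partial.set(Long.parseLong(parts[1]));
            context.write(originalKey, partial);
        }
    }

    // Stage 2 reuses PartialSumReducer for the final per-key sums
}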

Real-World Use Cases

Use Case 1: Log Analysis Pipeline

public class WebLogAnalysis {

    // Stage 1: Parse and filter logs
    public static class ParseMapper
            extends Mapper<LongWritable, Text, Text, LogRecord> {

        // Matches the Common Log Format:
        //   host ident authuser [timestamp] "method url protocol" status bytes
        private static final Pattern LOG_PATTERN =
            Pattern.compile("^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+)");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            Matcher matcher = LOG_PATTERN.matcher(value.toString());
            if (matcher.matches()) {
                String ip = matcher.group(1);
                String timestamp = matcher.group(2);
                String method = matcher.group(3);
                String url = matcher.group(4);
                int statusCode = Integer.parseInt(matcher.group(6));
                long bytes = Long.parseLong(matcher.group(7));

                LogRecord record = new LogRecord(
                    ip, timestamp, method, url, statusCode, bytes);

                // Key by URL for aggregation
                context.write(new Text(url), record);
            }
        }
    }

    // Stage 2: Aggregate statistics
    public static class AggregateReducer
            extends Reducer<Text, LogRecord, Text, URLStats> {

        @Override
        public void reduce(Text key, Iterable<LogRecord> values,
                          Context context)
                throws IOException, InterruptedException {

            int totalRequests = 0;
            long totalBytes = 0;
            int errors = 0;
            // In-memory set of distinct IPs; for very high-cardinality URLs a
            // cardinality sketch (e.g., HyperLogLog) is a safer choice
            Set<String> uniqueIPs = new HashSet<>();

            for (LogRecord record : values) {
                totalRequests++;
                totalBytes += record.getBytes();
                if (record.getStatusCode() >= 400) errors++;
                uniqueIPs.add(record.getIp());
            }

            URLStats stats = new URLStats(
                totalRequests, totalBytes, errors, uniqueIPs.size());

            context.write(key, stats);
        }
    }
}
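
A driver fragment for this job. Because the map output value (LogRecord) differs from the final output value (URLStats), both must be declared explicitly; LogRecord and URLStats are assumed to be custom Writables, as in Pattern 2:
// Driver fragment for the log-analysis job
job.setMapperClass(WebLogAnalysis.ParseMapper.class);
job.setReducerClass(WebLogAnalysis.AggregateReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LogRecord.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(URLStats.class);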

Use Case 2: Sessionization

Group user events into sessions (30-minute timeout):
public class Sessionization {

    public static class SessionMapper
            extends Mapper<LongWritable, Text, Text, Event> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input: user_id,timestamp,event_type
            String[] fields = value.toString().split(",");
            String userId = fields[0];
            long timestamp = Long.parseLong(fields[1]);
            String eventType = fields[2];

            Event event = new Event(timestamp, eventType);
            context.write(new Text(userId), event);
        }
    }

    public static class SessionReducer
            extends Reducer<Text, Event, Text, Session> {

        private static final long SESSION_TIMEOUT = 30 * 60 * 1000; // 30 min

        @Override
        public void reduce(Text key, Iterable<Event> values,
                          Context context)
                throws IOException, InterruptedException {

            // Copy each event before sorting: Hadoop reuses the same object
            // instance while iterating over values, so references must not be
            // stored directly
            List<Event> events = new ArrayList<>();
            for (Event e : values) {
                events.add(new Event(e));
            }
            // Sort events by timestamp (Event is assumed to implement Comparable)
            Collections.sort(events);

            // Group into sessions
            List<Session> sessions = new ArrayList<>();
            Session currentSession = null;

            for (Event event : events) {
                if (currentSession == null ||
                    event.getTimestamp() - currentSession.getLastEventTime()
                        > SESSION_TIMEOUT) {

                    // Start new session
                    if (currentSession != null) {
                        sessions.add(currentSession);
                    }
                    currentSession = new Session(event.getTimestamp());
                }

                currentSession.addEvent(event);
            }

            if (currentSession != null) {
                sessions.add(currentSession);
            }

            // Emit sessions
            for (int i = 0; i < sessions.size(); i++) {
                context.write(new Text(key + "_session_" + i),
                             sessions.get(i));
            }
        }
    }
}
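
The reducer above leans on two properties of Event that the module leaves implicit: a copy constructor (because Hadoop reuses the object returned by the values iterator) and ordering by timestamp for Collections.sort. A minimal Event sketch under those assumptions (field names are illustrative):
public class Event implements Writable, Comparable<Event> {
    private long timestamp;
    private String eventType;

    public Event() {}                       // required by Hadoop serialization

    public Event(long timestamp, String eventType) {
        this.timestamp = timestamp;
        this.eventType = eventType;
    }

    public Event(Event other) {             // copy constructor used in the reducer
        this(other.timestamp, other.eventType);
    }

    public long getTimestamp() { return timestamp; }
    public String getEventType() { return eventType; }

    @Override
    public int compareTo(Event other) {     // order events chronologically
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeUTF(eventType);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        eventType = in.readUTF();
    }
}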

Performance Checklist

  • Use combiners where applicable
  • Enable compression (intermediate and output)
  • Optimize mapper/reducer memory settings
  • Use appropriate file formats (ORC, Parquet for structured data)
  • Minimize shuffle data (filter early, use combiner)
  • Partition data appropriately (avoid skew)
  • Use distributed cache for small lookup tables
  • Monitor and tune based on metrics (shuffle time, GC time)
  • Consider Spark for iterative workloads

What’s Next?

Module 7: Production Deployment & Operations

Learn to deploy, secure, monitor, and maintain production Hadoop clusters