Data Processing Patterns & Best Practices
Module Duration: 4-5 hours
Focus: Real-world patterns, optimization, production code quality
Prerequisites: MapReduce and ecosystem knowledge
Core Design Patterns
Pattern 1: Filtering and Searching
Use Case: Extract a subset of data matching criteria
Example: Extract error logs from terabytes of application logs
public class ErrorLogExtractor {
public static class FilterMapper
extends Mapper<LongWritable, Text, Text, Text> {
private static final Pattern ERROR_PATTERN =
Pattern.compile("\\[(ERROR|FATAL)\\]");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
if (ERROR_PATTERN.matcher(line).find()) {
// Extract timestamp (assumes a 19-character prefix, e.g. yyyy-MM-dd HH:mm:ss)
String timestamp = line.length() >= 19 ? line.substring(0, 19) : line;
context.write(new Text(timestamp), value);
}
}
}
// No reducer needed for simple filtering
// Set job.setNumReduceTasks(0) for map-only job
}
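A minimal driver sketch for this map-only job; the driver class name and argument handling are illustrative, not part of the original listing:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ErrorLogExtractorDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "error-log-extract");
        job.setJarByClass(ErrorLogExtractorDriver.class);
        job.setMapperClass(ErrorLogExtractor.FilterMapper.class);
        job.setNumReduceTasks(0);             // map-only: mapper output is the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}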
Pattern 2: Summarization / Aggregation
Use Case: Compute statistics (sum, avg, min, max, count)
Example: User session analytics
public class SessionAnalytics {
public static class SessionMapper
extends Mapper<LongWritable, Text, Text, SessionStats> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: user_id,timestamp,event,duration
String[] fields = value.toString().split(",");
String userId = fields[0];
long duration = Long.parseLong(fields[3]);
SessionStats stats = new SessionStats();
stats.setCount(1);
stats.setTotalDuration(duration);
stats.setMinDuration(duration);
stats.setMaxDuration(duration);
context.write(new Text(userId), stats);
}
}
public static class SessionReducer
extends Reducer<Text, SessionStats, Text, SessionStats> {
@Override
public void reduce(Text key, Iterable<SessionStats> values,
Context context)
throws IOException, InterruptedException {
long count = 0;
long totalDuration = 0;
long minDuration = Long.MAX_VALUE;
long maxDuration = Long.MIN_VALUE;
for (SessionStats stats : values) {
count += stats.getCount();
totalDuration += stats.getTotalDuration();
minDuration = Math.min(minDuration, stats.getMinDuration());
maxDuration = Math.max(maxDuration, stats.getMaxDuration());
}
SessionStats result = new SessionStats();
result.setCount(count);
result.setTotalDuration(totalDuration);
result.setAvgDuration(totalDuration / count);
result.setMinDuration(minDuration);
result.setMaxDuration(maxDuration);
context.write(key, result);
}
}
// Custom Writable for SessionStats
public static class SessionStats implements Writable {
private long count;
private long totalDuration;
private long avgDuration;
private long minDuration;
private long maxDuration;
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(count);
out.writeLong(totalDuration);
out.writeLong(avgDuration);
out.writeLong(minDuration);
out.writeLong(maxDuration);
}
@Override
public void readFields(DataInput in) throws IOException {
count = in.readLong();
totalDuration = in.readLong();
avgDuration = in.readLong();
minDuration = in.readLong();
maxDuration = in.readLong();
}
// Getters and setters...
}
}
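Because merging count, total, min, and max is associative and commutative, the SessionReducer above can also serve as the combiner; the avg field a combiner instance writes is simply recomputed in the final reduce. A hedged wiring sketch (the config class and method names are illustrative):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class SessionAnalyticsConfig {
    // Wires the SessionAnalytics classes into an existing Job instance
    public static void configure(Job job) {
        job.setJarByClass(SessionAnalytics.class);
        job.setMapperClass(SessionAnalytics.SessionMapper.class);
        job.setCombinerClass(SessionAnalytics.SessionReducer.class); // safe: merge is associative/commutative
        job.setReducerClass(SessionAnalytics.SessionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SessionAnalytics.SessionStats.class);
    }
}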
Pattern 3: Top-N / Ranking
Use Case: Find top K items by some metric
Example: Top 100 products by revenue
public class TopProducts {
public static class RevenueMapper
extends Mapper<LongWritable, Text, DoubleWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: product_id,product_name,revenue
String[] fields = value.toString().split(",");
String productInfo = fields[0] + "," + fields[1];
double revenue = Double.parseDouble(fields[2]);
// Emit with negative revenue for descending sort
context.write(new DoubleWritable(-revenue), new Text(productInfo));
}
}
public static class TopNReducer
extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
private int N = 100;
private int count = 0;
@Override
protected void setup(Context context) {
N = context.getConfiguration().getInt("top.n", 100);
}
@Override
public void reduce(DoubleWritable key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
for (Text product : values) {
if (count >= N) return; // stop once N items emitted; count persists across reduce() calls in the single reducer
// Convert back to positive revenue
double revenue = -key.get();
context.write(product, new DoubleWritable(revenue));
count++;
}
}
}
// In driver: job.setNumReduceTasks(1) for total ordering
}
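A hedged driver sketch for the Top-N job; the single reducer is what makes the ordering global, and the top.n setting is read in TopNReducer.setup():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopProductsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("top.n", 100);                        // picked up in TopNReducer.setup()
        Job job = Job.getInstance(conf, "top-products");
        job.setJarByClass(TopProductsDriver.class);
        job.setMapperClass(TopProducts.RevenueMapper.class);
        job.setReducerClass(TopProducts.TopNReducer.class);
        job.setNumReduceTasks(1);                         // single reducer => total ordering
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}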
Pattern 4: Distinct / Deduplication
Use Case: Remove duplicate records
Example: Unique visitors
public class UniqueVisitors {
public static class DistinctMapper
extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: timestamp,user_id,page
String[] fields = value.toString().split(",");
String userId = fields[1];
context.write(new Text(userId), NullWritable.get());
}
}
public static class DistinctReducer
extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
public void reduce(Text key, Iterable<NullWritable> values,
Context context)
throws IOException, InterruptedException {
// Key is unique due to shuffle
context.write(key, NullWritable.get());
}
}
}
Pattern 5: Binning / Bucketing
Use Case: Group data into categories
Example: Age groups from user data
public class AgeBinning {
public static class BinMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: user_id,name,age
String[] fields = value.toString().split(",");
int age = Integer.parseInt(fields[2]);
String bin = getAgeBin(age);
context.write(new Text(bin), one);
}
private String getAgeBin(int age) {
if (age < 18) return "Under 18";
if (age < 25) return "18-24";
if (age < 35) return "25-34";
if (age < 50) return "35-49";
if (age < 65) return "50-64";
return "65+";
}
}
// Use standard IntSumReducer for counting
}
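A hedged wiring sketch using the stock IntSumReducer (from org.apache.hadoop.mapreduce.lib.reduce) as both combiner and reducer; the config class name is illustrative:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class AgeBinningConfig {
    // Counting per bin is a plain sum, so the stock reducer works as combiner and reducer
    public static void configure(Job job) {
        job.setJarByClass(AgeBinning.class);
        job.setMapperClass(AgeBinning.BinMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    }
}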
Pattern 6: Join Patterns
Reduce-Side Join (Default)
Use Case: Join large datasets
public class ReduceSideJoin {
public static class JoinMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
private String inputTag;
@Override
protected void setup(Context context) {
FileSplit split = (FileSplit) context.getInputSplit();
// Tag records by their parent directory name (assumes inputs live under .../users/ and .../orders/)
inputTag = split.getPath().getParent().getName();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (inputTag.equals("users")) {
// users: user_id,name,email
String userId = fields[0];
String data = fields[1] + "," + fields[2];
context.write(new TextPair(userId, "0"), // 0 for sorting
new Text("U" + data));
} else {
// orders: order_id,user_id,amount
String userId = fields[1];
String data = fields[0] + "," + fields[2];
context.write(new TextPair(userId, "1"), // 1 for sorting
new Text("O" + data));
}
}
}
public static class JoinReducer
extends Reducer<TextPair, Text, Text, Text> {
@Override
public void reduce(TextPair key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
// The "0"/"1" tag sorts the user record first; this relies on the partitioner
// and grouping comparator sketched after this class
Text user = iter.next();
if (!user.toString().startsWith("U")) {
return; // No user record for this key
}
String userData = user.toString().substring(1);
// Join user with all orders
while (iter.hasNext()) {
Text order = iter.next();
if (order.toString().startsWith("O")) {
String orderData = order.toString().substring(1);
context.write(new Text(key.getFirst()),
new Text(userData + "," + orderData));
}
}
}
}
// TextPair: composite key for secondary sort (user_id + record tag)
public static class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
// Implement write, readFields, and compareTo (sort by first, then second).
// The job must also set a partitioner and grouping comparator that use only
// `first`, so a user and all of that user's orders reach one reduce() call --
// see the sketch after this class.
}
}
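A minimal sketch of what TextPair and its supporting classes might look like, shown standalone for readability (the listing nests TextPair inside ReduceSideJoin). Names and details here are assumptions; the essential points are that compareTo() orders by user id and then by the 0/1 tag, while the partitioner and grouping comparator use only the user id so a user record and all of that user's orders arrive in one reduce() call:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

public class TextPair implements WritableComparable<TextPair> {
    private Text first = new Text();   // natural key (user_id)
    private Text second = new Text();  // sort tag: "0" = user record, "1" = order record

    public TextPair() {}
    public TextPair(String first, String second) {
        this.first.set(first);
        this.second.set(second);
    }
    public Text getFirst() { return first; }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int compareTo(TextPair other) {
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
    }

    @Override
    public int hashCode() { return first.hashCode(); }

    @Override
    public boolean equals(Object o) {
        return o instanceof TextPair
                && first.equals(((TextPair) o).first)
                && second.equals(((TextPair) o).second);
    }

    // Partition by user id only, so both tagged variants of one user go to the same reducer
    public static class FirstPartitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group by user id only, so one reduce() call sees the user record and all of its orders
    public static class FirstGroupingComparator extends WritableComparator {
        public FirstGroupingComparator() { super(TextPair.class, true); }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
        }
    }
}

// Driver wiring (illustrative):
//   job.setPartitionerClass(TextPair.FirstPartitioner.class);
//   job.setGroupingComparatorClass(TextPair.FirstGroupingComparator.class);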
Map-Side Join (Small Table)
Use Case: Join when one dataset fits in memory
public class MapSideJoin {
public static class JoinMapper
extends Mapper<LongWritable, Text, Text, Text> {
private Map<String, String> userMap = new HashMap<>();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
// Load the small table from the Distributed Cache; the mapper reads it via the
// local symlink name ("users.txt") registered in the driver (see the sketch below)
URI[] cacheFiles = context.getCacheFiles();
BufferedReader reader = new BufferedReader(
new FileReader(new File("users.txt")));
String line;
while ((line = reader.readLine()) != null) {
String[] fields = line.split(",");
String userId = fields[0];
String userData = fields[1] + "," + fields[2];
userMap.put(userId, userData);
}
reader.close();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Read orders (large table)
String[] fields = value.toString().split(",");
String userId = fields[1];
String userData = userMap.get(userId);
if (userData != null) {
context.write(new Text(userId),
new Text(userData + "," + value.toString()));
}
}
}
// No reducer needed
}
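A hedged driver excerpt for the map-side join; the HDFS path is illustrative, and the "#users.txt" fragment is what creates the local symlink the mapper opens in setup():

import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoinConfig {
    public static void configure(Job job, String ordersPath, String outputPath) throws Exception {
        job.setJarByClass(MapSideJoin.class);
        job.setMapperClass(MapSideJoin.JoinMapper.class);
        job.setNumReduceTasks(0);                                   // map-only join
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Ship the small users table to every mapper; "#users.txt" names the local symlink
        job.addCacheFile(new URI("hdfs:///data/users/users.txt#users.txt"));
        FileInputFormat.addInputPath(job, new Path(ordersPath));    // large orders table
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
    }
}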
Optimization Techniques
1. Combiner Usage
Rule: Use a combiner only when the operation is associative and commutative
// GOOD: Sum, Max, Min
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
// BAD: Average (not associative)
// avg(1,2) + avg(3,4) ≠ avg(1,2,3,4)
// SOLUTION: Emit sum and count, compute average in reducer
public class AvgCombiner extends Reducer<Text, IntPair, Text, IntPair> {
@Override
public void reduce(Text key, Iterable<IntPair> values, Context context)
throws IOException, InterruptedException {
int sum = 0, count = 0;
for (IntPair pair : values) {
sum += pair.getSum();
count += pair.getCount();
}
context.write(key, new IntPair(sum, count));
}
}
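The combiner above only pre-merges (sum, count) pairs; a final reducer still has to perform the division. A minimal sketch, assuming the same IntPair Writable (with getSum()/getCount()) used by the combiner:

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Final step: merge (sum, count) pairs and emit the true average
public class AvgReducer extends Reducer<Text, IntPair, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<IntPair> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0, count = 0;
        for (IntPair pair : values) {
            sum += pair.getSum();
            count += pair.getCount();
        }
        context.write(key, new DoubleWritable((double) sum / count));
    }
}

// Driver wiring (illustrative):
//   job.setCombinerClass(AvgCombiner.class);
//   job.setReducerClass(AvgReducer.class);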
2. Custom Partitioner for Load Balancing
// Problem: Skewed keys (e.g., one user with 90% of data)
// Solution: Add random salt to distribute load
public class SaltedPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
// The salt is already embedded in the key (see the mapper snippet below),
// so hashing the full salted key spreads a hot key across many partitions
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
// In Mapper: append a random salt so one hot key becomes ~10 distinct reduce keys
String saltedKey = key + "_" + random.nextInt(10);
// A second pass must strip the salt and merge the partial results (see Anti-Pattern 4)
3. Compression
Intermediate Compression:
// In driver
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
SnappyCodec.class, CompressionCodec.class);
Output Compression:
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
| Codec | Compression Ratio | Speed | Splittable |
|---|---|---|---|
| Gzip | High | Slow | No |
| Bzip2 | Highest | Slowest | Yes |
| Snappy | Medium | Fast | No* |
| LZO | Medium | Fast | Yes* |

*Snappy is not splittable as a raw file but works well inside splittable containers (SequenceFile, ORC, Parquet); LZO is splittable only after building an index. See the sketch below.
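A hedged driver excerpt showing Snappy inside a block-compressed SequenceFile, which keeps the output both fast to compress and splittable; the helper class name is illustrative:

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SnappyOutputConfig {
    // Block-compressed SequenceFile output: Snappy speed, still splittable
    public static void configure(Job job) {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
    }
}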
4. Speculative Execution
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>false</value>
<!-- Often disabled for reducers with side effects -->
</property>
5. Memory Management
// Reducer memory tuning (heap set to roughly 80% of the container size)
conf.setInt("mapreduce.reduce.memory.mb", 4096);
conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
// Sort buffer size
conf.setInt("mapreduce.task.io.sort.mb", 512);
// Spill threshold
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);
Anti-Patterns to Avoid
Anti-Pattern 1: Too Many Small Files
Problem:
1 million files × 1 KB each = only ~1 GB of data,
but 1 million NameNode metadata entries held in memory!
Solution:
// Use SequenceFile or HAR (Hadoop Archive)
public class SequenceFileWriter {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path seqFile = new Path("/output/combined.seq");
SequenceFile.Writer writer = SequenceFile.createWriter(
fs, conf, seqFile,
Text.class, // key
BytesWritable.class // value
);
// Write multiple small files into one sequence file
// (assumes a local directory of small files passed as args[0])
for (File smallFile : new File(args[0]).listFiles()) {
byte[] content = java.nio.file.Files.readAllBytes(smallFile.toPath());
writer.append(new Text(smallFile.getName()),
new BytesWritable(content));
}
writer.close();
}
}
Anti-Pattern 2: Not Reusing Objects
// BAD: Creates millions of objects
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
// GOOD: Reuse objects
private Text wordText = new Text();
private IntWritable one = new IntWritable(1);
for (String word : words) {
wordText.set(word);
context.write(wordText, one);
}
Anti-Pattern 3: Ignoring Data Locality
// BAD: random data access breaks locality (illustrative pseudocode)
List<String> files = hdfs.listFiles("/data/*");
Collections.shuffle(files);
for (String file : files) process(file);
// GOOD: Let Hadoop schedule tasks near data
// Just submit job, framework handles locality
Anti-Pattern 4: Skewed Keys
Problem: One key holds 90% of the data, so a single reducer does nearly all the work.
Solutions (a two-stage sketch follows this list):
- Salting: Add a random prefix/suffix to the key to spread it across reducers
- Custom Partitioner: Detect hot keys and distribute them specially
- Two-stage aggregation: Pre-aggregate the salted keys, then strip the salt and aggregate again
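A minimal two-stage aggregation sketch for a skewed count, assuming stage one writes salted partial counts and stage two strips the salt and merges them; all class names are illustrative:

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoStageAggregation {

    // Stage 1 mapper: append a random salt so a hot key spreads over many reducers
    public static class SaltingMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        private final Random random = new Random();
        private final Text saltedKey = new Text();
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String naturalKey = value.toString().split(",")[0];
            saltedKey.set(naturalKey + "_" + random.nextInt(10));
            context.write(saltedKey, ONE);
        }
    }

    // Stage 1 reducer (also usable as a combiner): partial count per salted key
    public static class PartialSumReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) sum += v.get();
            context.write(key, new LongWritable(sum));
        }
    }

    // Stage 2 mapper: strip the salt ("user42_7" -> "user42") from stage 1 output
    public static class UnsaltMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        private final Text naturalKey = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t");   // default TextOutputFormat separator
            String salted = parts[0];
            naturalKey.set(salted.substring(0, salted.lastIndexOf('_')));
            context.write(naturalKey, new LongWritable(Long.parseLong(parts[1])));
        }
    }

    // Stage 2 reducer: PartialSumReducer can be reused to produce the final counts
}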
Real-World Use Cases
Use Case 1: Log Analysis Pipeline
public class WebLogAnalysis {
// Stage 1: Parse and filter logs
public static class ParseMapper
extends Mapper<LongWritable, Text, Text, LogRecord> {
private static final Pattern LOG_PATTERN =
Pattern.compile("^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+)");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Matcher matcher = LOG_PATTERN.matcher(value.toString());
// find() (rather than matches()) tolerates trailing fields such as referrer/user-agent
if (matcher.find()) {
String ip = matcher.group(1);
String timestamp = matcher.group(2);
String method = matcher.group(3);
String url = matcher.group(4);
int statusCode = Integer.parseInt(matcher.group(6));
long bytes = Long.parseLong(matcher.group(7));
LogRecord record = new LogRecord(
ip, timestamp, method, url, statusCode, bytes);
// Key by URL for aggregation
context.write(new Text(url), record);
}
}
}
// Stage 2: Aggregate statistics
public static class AggregateReducer
extends Reducer<Text, LogRecord, Text, URLStats> {
@Override
public void reduce(Text key, Iterable<LogRecord> values,
Context context)
throws IOException, InterruptedException {
int totalRequests = 0;
long totalBytes = 0;
int errors = 0;
Set<String> uniqueIPs = new HashSet<>();
for (LogRecord record : values) {
totalRequests++;
totalBytes += record.getBytes();
if (record.getStatusCode() >= 400) errors++;
uniqueIPs.add(record.getIp());
}
URLStats stats = new URLStats(
totalRequests, totalBytes, errors, uniqueIPs.size());
context.write(key, stats);
}
}
}
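LogRecord and URLStats are custom Writables not shown in the listing. A minimal LogRecord sketch consistent with the constructor and getters used above (the original implementation may differ):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class LogRecord implements Writable {
    private final Text ip = new Text();
    private final Text timestamp = new Text();
    private final Text method = new Text();
    private final Text url = new Text();
    private int statusCode;
    private long bytes;

    public LogRecord() {}                            // required for deserialization

    public LogRecord(String ip, String timestamp, String method,
                     String url, int statusCode, long bytes) {
        this.ip.set(ip);
        this.timestamp.set(timestamp);
        this.method.set(method);
        this.url.set(url);
        this.statusCode = statusCode;
        this.bytes = bytes;
    }

    public String getIp() { return ip.toString(); }
    public int getStatusCode() { return statusCode; }
    public long getBytes() { return bytes; }

    @Override
    public void write(DataOutput out) throws IOException {
        ip.write(out);
        timestamp.write(out);
        method.write(out);
        url.write(out);
        out.writeInt(statusCode);
        out.writeLong(bytes);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        ip.readFields(in);
        timestamp.readFields(in);
        method.readFields(in);
        url.readFields(in);
        statusCode = in.readInt();
        bytes = in.readLong();
    }
}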
Use Case 2: Sessionization
Group user events into sessions (30-minute inactivity timeout):
public class Sessionization {
public static class SessionMapper
extends Mapper<LongWritable, Text, Text, Event> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Input: user_id,timestamp,event_type
String[] fields = value.toString().split(",");
String userId = fields[0];
long timestamp = Long.parseLong(fields[1]);
String eventType = fields[2];
Event event = new Event(timestamp, eventType);
context.write(new Text(userId), event);
}
}
public static class SessionReducer
extends Reducer<Text, Event, Text, Session> {
private static final long SESSION_TIMEOUT = 30 * 60 * 1000; // 30 min
@Override
public void reduce(Text key, Iterable<Event> values,
Context context)
throws IOException, InterruptedException {
// Copy and sort events by timestamp; the copy is required because Hadoop
// reuses the same Event instance across iterations (see Anti-Pattern 2)
List<Event> events = new ArrayList<>();
for (Event e : values) {
events.add(new Event(e)); // defensive copy via copy constructor
}
Collections.sort(events);
// Group into sessions
List<Session> sessions = new ArrayList<>();
Session currentSession = null;
for (Event event : events) {
if (currentSession == null ||
event.getTimestamp() - currentSession.getLastEventTime()
> SESSION_TIMEOUT) {
// Start new session
if (currentSession != null) {
sessions.add(currentSession);
}
currentSession = new Session(event.getTimestamp());
}
currentSession.addEvent(event);
}
if (currentSession != null) {
sessions.add(currentSession);
}
// Emit sessions
for (int i = 0; i < sessions.size(); i++) {
context.write(new Text(key + "_session_" + i),
sessions.get(i));
}
}
}
}
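Event and Session are likewise custom types. A minimal Event sketch that satisfies what the reducer relies on: a copy constructor (because Hadoop reuses the value object across iterations) and ordering by timestamp for Collections.sort():

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class Event implements WritableComparable<Event> {
    private long timestamp;
    private final Text eventType = new Text();

    public Event() {}                                    // required for deserialization

    public Event(long timestamp, String eventType) {
        this.timestamp = timestamp;
        this.eventType.set(eventType);
    }

    // Copy constructor: needed because Hadoop reuses the same Event instance
    // while iterating over reduce() values (see Anti-Pattern 2)
    public Event(Event other) {
        this.timestamp = other.timestamp;
        this.eventType.set(other.eventType);
    }

    public long getTimestamp() { return timestamp; }

    @Override
    public int compareTo(Event other) {                  // sort events chronologically
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        eventType.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        eventType.readFields(in);
    }
}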
Performance Checklist
- Use combiners where applicable
- Enable compression (intermediate and output)
- Optimize mapper/reducer memory settings
- Use appropriate file formats (ORC, Parquet for structured data)
- Minimize shuffle data (filter early, use combiner)
- Partition data appropriately (avoid skew)
- Use distributed cache for small lookup tables
- Monitor and tune based on metrics (shuffle time, GC time)
- Consider Spark for iterative workloads
What’s Next?
Module 7: Production Deployment & Operations
Learn to deploy, secure, monitor, and maintain production Hadoop clusters