
Introduction & Foundational Papers

  • Module Duration: 3-4 hours
  • Learning Style: Theory + Historical Context + Architectural Foundations
  • Outcome: Deep understanding of WHY Hadoop was built the way it is

Why Start with Research Papers?

Most Hadoop courses dive straight into hdfs dfs -ls commands and WordCount examples. We’re taking a different approach. Understanding the research papers that inspired Hadoop gives you:
  • Conceptual clarity: Know WHY design decisions were made
  • Troubleshooting intuition: Predict behavior based on first principles
  • Interview advantage: Explain trade-offs, not just features
  • Architectural thinking: Apply principles to other distributed systems
We’ll break down these papers in plain language - no PhD required. Focus on concepts, not complex mathematical proofs.

The Big Data Problem (Early 2000s)

The Challenge

Around 2003, companies like Google faced unprecedented data growth:
  • Web crawling: Billions of web pages to index
  • Search logs: Terabytes of user queries daily
  • Data analysis: Processing petabytes with commodity hardware
Traditional solutions failed:
  • Scale-up approach: Expensive supercomputers couldn’t keep pace
  • Traditional databases: Not designed for append-heavy workloads
  • Existing distributed systems: Too complex, required PhDs to operate

The Insight

Google researchers realized:
  1. Commodity hardware is cheap but unreliable → Design for failure
  2. Large sequential reads are fast → Optimize for streaming, not random access
  3. Moving data is expensive → Move computation to data instead
  4. Most developers don’t understand distributed systems → Hide complexity
This led to two foundational papers that changed big data forever.

Paper 1: The Google File System (GFS)

Full Citation: Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. “The Google File System.” In Proceedings of the 19th ACM Symposium on Operating Systems Principles (SOSP), 2003.

The Core Problem

How do you store petabytes of data reliably using thousands of cheap, failure-prone machines?

Key Assumptions (That Changed Everything)

The GFS paper made explicit assumptions that differed from traditional file systems:
Traditional Thinking: Build systems that rarely fail.
GFS Thinking: Failures happen constantly; detect and recover automatically.
With 1000s of machines, something is always failing:
  • Hard drives crash
  • Network switches die
  • Memory corrupts data
  • Servers reboot unexpectedly
Design Implication: Built-in monitoring, automatic replication, fast recovery
Traditional Thinking: Optimize for many small files.
GFS Thinking: Multi-GB files are common; optimize accordingly.
Typical workloads:
  • Web crawl results: 100GB+ files
  • Log files: Continuous appends to large files
  • Dataset processing: Read entire large files sequentially
Design Implication: Large chunk size (64MB vs. traditional 4KB blocks), dramatically less metadata per file
Traditional Thinking: Random writes and overwrites are common.
GFS Thinking: Most files are append-only or write-once-read-many.
Common patterns:
  • Logs: Continuously append new entries
  • Data pipelines: Write output once, read many times for analysis
  • Random overwrites: Rare
Design Implication: Optimized append operations, relaxed consistency for appends
Traditional Thinking: The file system provides a standard POSIX interface.
GFS Thinking: Relax some guarantees in exchange for performance and simplicity.
Trade-offs:
  • Atomic append operations instead of strict POSIX semantics
  • Relaxed consistency for file regions after concurrent mutations (namespace operations remain atomic)
  • Applications designed to tolerate duplicate records
Design Implication: Simpler implementation, better performance for target workloads

GFS Architecture

Here’s how GFS is structured (and how HDFS directly mirrors it):
┌─────────────────────────────────────────────────────────────┐
│                        GFS Master                           │
│  (Stores all metadata: namespace, file-to-chunk mapping)   │
│            Single point of coordination                     │
└────────────────┬────────────────────────────────────────────┘

                 │ Metadata operations (create, delete, locate)

    ┌────────────┼────────────┬────────────┬────────────┐
    │            │            │            │            │
    ▼            ▼            ▼            ▼            ▼
┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
│ Chunk   │  │ Chunk   │  │ Chunk   │  │ Chunk   │  │ Chunk   │
│ Server  │  │ Server  │  │ Server  │  │ Server  │  │ Server  │
│ (Store  │  │ (Store  │  │ (Store  │  │ (Store  │  │ (Store  │
│ actual  │  │ actual  │  │ actual  │  │ actual  │  │ actual  │
│ data)   │  │ data)   │  │ data)   │  │ data)   │  │ data)   │
└─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘
     │            │            │            │            │
     └────────────┴────────────┴────────────┴────────────┘
                Data flows directly between
                client and chunk servers
                (Master not in data path!)
Key Design Decisions:
  1. Single Master: Simplifies design, stores all metadata in memory
    • Trade-off: Potential bottleneck, single point of failure
    • Mitigation: Master not in data path, shadow masters for read-only access
  2. Large Chunk Size (64MB):
    • Pros: Fewer metadata entries, reduces client-master interaction, efficient for large reads
    • Cons: Can waste space for small files, potential hotspots
  3. Replication: Every chunk stored on 3+ chunk servers
    • Ensures: Availability despite failures
    • Placement: Spread across racks for fault tolerance
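To see why the large chunk size matters, work through the metadata for a single 1 TB file: at 64 MB per chunk the master tracks 1 TB / 64 MB = 16,384 chunk entries, whereas 4 KB blocks would require roughly 268 million entries - far too many to keep in one machine's memory once you have thousands of such files.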

How a Read Works in GFS

Client wants to read /data/logs.txt at offset 10MB

Step 1: Client → Master
   "Where is chunk containing offset 10MB of /data/logs.txt?"

Step 2: Master → Client
   "Chunk 5, stored on ChunkServers: [A, B, C]"
   (Client caches this info for future reads)

Step 3: Client → ChunkServer A (closest one)
   "Give me chunk 5, bytes 0-1MB"

Step 4: ChunkServer A → Client
   [Data stream directly to client]

Note: The master is only involved in Steps 1-2. Data flows directly in Steps 3-4!
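
HDFS clients follow the same pattern through Hadoop's FileSystem API: open() consults the NameNode for block locations, and read() then streams bytes directly from a DataNode. A minimal sketch (the path and offset are illustrative; error handling is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         // open() asks the NameNode which DataNodes hold the file's blocks (Steps 1-2)
         FSDataInputStream in = fs.open(new Path("/data/logs.txt"))) {
      in.seek(10L * 1024 * 1024);            // position the stream at offset 10MB
      byte[] buffer = new byte[1024 * 1024];
      int bytesRead = in.read(buffer);       // bytes stream directly from a DataNode (Steps 3-4)
      System.out.println("Read " + bytesRead + " bytes");
    }
  }
}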

How a Write Works in GFS

Writes are more complex due to replication:
Client wants to append 1MB to /data/logs.txt

Step 1: Client → Master
   "I want to append to /data/logs.txt"

Step 2: Master → Client
   "Current chunk is chunk 10, replicas at [A, B, C]"
   "A is the primary, lease expires at time T"

Step 3: Client → All replicas (A, B, C)
   [Push data to all replicas - they cache but don't write yet]

Step 4: Client → Primary (A)
   "Write the data I sent you at the next available offset"

Step 5: Primary (A) → Secondaries (B, C)
   "Write at offset X"
   (Primary chooses offset, ensuring serial order)

Step 6: Secondaries → Primary
   "Write complete" or "Write failed"

Step 7: Primary → Client
   "Success" or "Failure on replica B, try again"

If failure: Client retries Steps 3-7
Why This Complexity?
  • Consistency: Primary serializes all writes to a chunk
  • Performance: Data pushed once to all replicas (Step 3)
  • Fault tolerance: If primary fails, master grants lease to a secondary
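
An HDFS client never sees this pipeline: it simply writes to an output stream, and the client library drives the DataNode replication pipeline underneath. A minimal sketch using Hadoop's FileSystem API (the path is illustrative, and append must be enabled on the cluster):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         // append() asks the NameNode for the file's last block and its replica
         // pipeline, then streams data through the DataNodes (Steps 1-7 above)
         FSDataOutputStream out = fs.append(new Path("/data/logs.txt"))) {
      out.write("new log entry\n".getBytes(StandardCharsets.UTF_8));
      out.hflush();   // make the appended data visible to new readers
    }
  }
}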

Consistency Model

GFS provides a relaxed consistency model:
Operation         | Guarantees
Write (overwrite) | Consistent (all replicas see the same data) but possibly undefined (a mix of fragments from concurrent writes)
Record Append     | Guaranteed atomic “at least once” - may include duplicates or padding
Delete/Rename     | Atomic, serialized by the master
What This Means for Applications:
  • Apps must tolerate duplicate records (use unique IDs)
  • Checkpoints and write-ahead logs handle undefined regions
  • Most apps do bulk reads, process duplicates in MapReduce
This relaxed model is why GFS (and HDFS) are NOT suitable for OLTP databases, but perfect for log processing and batch analytics.
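
Because record append is at-least-once, pipelines conventionally tag each record with a unique ID chosen by the writer and drop duplicates downstream. A minimal Java sketch of that idea (the "id<TAB>payload" record layout is an assumption for illustration):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
  // Each appended record is assumed to be "id\tpayload"; the writer chooses a unique id.
  // Keep the first occurrence of each id; duplicates from retried appends are dropped.
  static List<String> dedupe(List<String> records) {
    Set<String> seenIds = new HashSet<>();
    List<String> unique = new ArrayList<>();
    for (String record : records) {
      String id = record.split("\t", 2)[0];
      if (seenIds.add(id)) {
        unique.add(record);
      }
    }
    return unique;
  }
}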

Paper 2: MapReduce

Full Citation: Jeffrey Dean and Sanjay Ghemawat. “MapReduce: Simplified Data Processing on Large Clusters.” In Proceedings of the 6th Symposium on Operating Systems Design and Implementation (OSDI), 2004.

The Core Problem

How do you let average programmers process terabytes of data across thousands of machines, without needing a PhD in distributed systems?

The Brilliant Insight

Most data processing tasks follow a common pattern:
  1. Map: Apply a function to each record independently (embarrassingly parallel)
  2. Shuffle: Group results by key
  3. Reduce: Aggregate grouped values
By providing this abstraction, Google could:
  • Hide complexity: Parallelization, distribution, fault tolerance automatic
  • Enable developers: Write simple map() and reduce() functions
  • Ensure reliability: Framework handles failures transparently

The MapReduce Programming Model

Here’s the canonical WordCount example:
// Input: Large text files
// Output: Word counts

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
What the framework does automatically:
  1. Splits input across machines
  2. Runs map() on each split in parallel
  3. Shuffles intermediate key-value pairs to reducers
  4. Groups by key
  5. Runs reduce() on each group
  6. Handles machine failures (re-executes tasks)
  7. Optimizes data locality (run map near data)
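
For comparison, here is the same WordCount written against Hadoop's Java MapReduce API - a minimal sketch using the conventional class names from the Hadoop tutorial (the key type assumes the default TextInputFormat):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // map(): emit (word, 1) for every word in the input line
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // reduce(): sum all counts for a word
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
}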

Execution Flow

┌─────────────────────────────────────────────────────────────┐
│                         Master                              │
│         (Assigns tasks, monitors progress)                  │
└──────┬──────────────────────────────────────────────┬───────┘
       │                                              │
       │ Assign Map Tasks                            │ Assign Reduce Tasks
       │                                              │
┌──────▼──────┐  ┌──────────┐  ┌──────────┐          │
│   Worker    │  │  Worker  │  │  Worker  │          │
│  (Map Task) │  │(Map Task)│  │(Map Task)│          │
└──────┬──────┘  └────┬─────┘  └────┬─────┘          │
       │              │              │                │
       │ Read Input   │              │                │
       ▼              ▼              ▼                │
  ┌────────┐     ┌────────┐     ┌────────┐           │
  │ Split 1│     │ Split 2│     │ Split 3│           │
  └────────┘     └────────┘     └────────┘           │
       │              │              │                │
       │ Emit (k,v)   │              │                │
       ▼              ▼              ▼                │
  ┌──────────────────────────────────────┐           │
  │    Intermediate Files (on local disk) │           │
  │  Partitioned by hash(key) % R        │           │
  └──────────────────┬──────────────┬────┘           │
                     │              │                │
       ┌─────────────┘              └─────────┐      │
       │                                      │      │
       │ Shuffle (network transfer)           │      │
       │                                      │      │
       ▼                                      ▼      ▼
  ┌──────────┐                          ┌──────────┐
  │  Worker  │                          │  Worker  │
  │ (Reduce) │                          │ (Reduce) │
  │  Task 1  │                          │  Task R  │
  └────┬─────┘                          └────┬─────┘
       │                                     │
       │ Read & Sort by key                  │
       │ Apply reduce() function             │
       ▼                                     ▼
  ┌──────────┐                          ┌──────────┐
  │ Output 1 │                          │ Output R │
  └──────────┘                          └──────────┘

Key Implementation Details

Input Splitting:
  • Input files divided into M splits (typically 16-64MB)
  • Each split processed by one map task
  • M typically >> number of machines for load balancing
Output Partitioning:
  • Intermediate keys partitioned into R buckets
  • Default: hash(key) % R
  • R typically a small multiple of the number of machines
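
Hadoop's default partitioner implements exactly this hash(key) % R rule. A sketch of the same logic as a custom Partitioner (the bitmask simply keeps the hash non-negative):

import org.apache.hadoop.mapreduce.Partitioner;

// Same logic as Hadoop's built-in HashPartitioner: map each intermediate key
// to one of the R reduce tasks by hashing it.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}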
Why M and R?
  • Larger M: Better load balancing, faster recovery
  • Larger R: More output files to merge
  • Typical: M = 200,000, R = 5,000 for 2,000 worker machines
The Problem: Moving data is expensive (network bandwidth is limited).
The Solution: Schedule map tasks on the machines that already store the input data.
GFS stores file X with replicas on machines [A, B, C]

Master tries to schedule map task for X on:
  1. Machine A, B, or C (same machine as data)
  2. Machine on same rack as A, B, or C (same-rack network)
  3. Any available machine (cross-rack network)
Impact: In typical runs, the majority of map tasks read their input locally, saving enormous amounts of network bandwidth
Worker Failure:
  • Master pings workers periodically
  • If worker fails:
    • Completed map tasks re-executed (output on local disk lost)
    • In-progress tasks re-executed on other machines
    • Completed reduce tasks NOT re-executed (output in GFS)
Master Failure:
  • Original paper: Abort and retry entire job (rare event)
  • Modern Hadoop: Uses ZooKeeper for master HA
Deterministic Functions Required:
  • Map and reduce must be deterministic
  • Ensures re-execution produces same output
  • Non-deterministic? Use unique task IDs and atomic commits
Problem: WordCount emits millions of (word, “1”) pairs.
Solution: A combiner function runs on mapper output BEFORE the shuffle.
// Without combiner:
// Mapper emits: ("the", 1), ("the", 1), ("the", 1) ... 1000 times
// Network transfer: 1000 pairs

// With combiner:
// Combiner aggregates locally: ("the", 1000)
// Network transfer: 1 pair!

combine(String key, Iterator values):
  // Same code as the reducer, applied locally to each mapper's output
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
When to use: When reduce function is commutative and associative
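
In Hadoop, the combiner is wired up in the job driver; reusing the reducer class as the combiner works precisely because summation is commutative and associative. A minimal driver sketch, building on the WordCount classes sketched earlier (assumed to be on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);  // local pre-aggregation before the shuffle
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}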

Real-World Examples from the Paper

The MapReduce paper included production use cases at Google:
  1. Distributed Grep:
    • Map: Emit line if matches pattern
    • Reduce: Identity (just copy)
    • ~150 seconds for 1TB on 1800 machines
  2. URL Access Frequency:
    • Map: Process web logs, emit (URL, 1)
    • Reduce: Sum counts per URL
    • Output: (URL, total_count) pairs
  3. Reverse Web-Link Graph:
    • Map: Parse page, emit (target_url, source_url) for each link
    • Reduce: Concatenate all source URLs for each target
    • Output: (target, list of sources)
  4. Inverted Index (for search engines):
    • Map: Parse document, emit (word, document_id)
    • Reduce: Sort document IDs, emit (word, list of doc_ids)
    • Output: Index for search queries
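
To make the inverted-index pattern concrete, here is a minimal Hadoop Java sketch; the assumption that each input line is "docId<TAB>document text" is purely for illustration:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndex {

  // Map: for each word in a document, emit (word, document_id).
  public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text word = new Text();
    private final Text docId = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] parts = value.toString().split("\t", 2);
      if (parts.length < 2) return;              // skip malformed lines
      docId.set(parts[0]);
      StringTokenizer itr = new StringTokenizer(parts[1]);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, docId);
      }
    }
  }

  // Reduce: collect the distinct document IDs for each word.
  public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      Set<String> docs = new HashSet<>();
      for (Text v : values) {
        docs.add(v.toString());
      }
      context.write(key, new Text(String.join(",", docs)));
    }
  }
}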

From Papers to Hadoop

The Translation

GFS Concept       | Hadoop Equivalent
GFS Master        | HDFS NameNode
Chunk Server      | HDFS DataNode
64MB Chunks       | 128MB Blocks (configurable)
GFS Client        | HDFS Client
Lease mechanism   | Block leases
MapReduce Concept  | Hadoop Equivalent
Master             | JobTracker (Hadoop 1.x) / ResourceManager (YARN)
Worker             | TaskTracker (Hadoop 1.x) / NodeManager (YARN)
Map/Reduce Tasks   | Tasks running in JVMs
Intermediate files | Local disk shuffle files

Key Differences

HDFS vs GFS:
  • HDFS is open source, GFS is proprietary
  • HDFS uses TCP/IP, GFS has custom RPC
  • HDFS block size larger (128MB default vs 64MB) - see the configuration sketch below
  • HDFS has evolved: Federation, HA, Erasure Coding
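
Both the block size and the replication factor are ordinary HDFS settings. A minimal sketch of overriding them from a Java client (the property names are the Hadoop 2+ ones; the same values can be set cluster-wide in hdfs-site.xml):

import org.apache.hadoop.conf.Configuration;

public class HdfsBlockSizeConfig {
  public static Configuration clientConfig() {
    Configuration conf = new Configuration();
    conf.setLong("dfs.blocksize", 256L * 1024 * 1024);  // default is 128MB; larger blocks mean fewer NameNode entries
    conf.setInt("dfs.replication", 3);                  // replica count, mirroring GFS's 3-way replication
    return conf;
  }
}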
Hadoop MapReduce vs Google MapReduce:
  • Hadoop is written in Java (Google's implementation is C++); Hadoop Streaming lets you write map/reduce functions in other languages
  • Hadoop 2.x introduced YARN (next-generation resource management)
  • Hadoop ecosystem expanded beyond MapReduce (Hive, Pig, Spark on YARN)

The Evolution: Why YARN?

Hadoop 1.x Limitations

The original Hadoop tightly coupled MapReduce with resource management:
┌─────────────────────────────────────┐
│          JobTracker                 │
│  (Resource Manager +                │
│   MapReduce Framework)              │
└─────────────────────────────────────┘

    ┌───────┴───────┐
    ▼               ▼
┌───────────┐  ┌───────────┐
│TaskTracker│  │TaskTracker│
│  (Run map │  │  (Run map │
│   reduce) │  │   reduce) │
└───────────┘  └───────────┘
Problems:
  • Scalability: JobTracker overwhelmed beyond 4000 nodes
  • Resource utilization: Fixed map and reduce slots left cluster resources underutilized
  • Multi-tenancy: Hard to run non-MapReduce apps (Spark, etc.)

YARN: Separation of Concerns

Hadoop 2.x introduced YARN (Yet Another Resource Negotiator):
┌─────────────────────────────────────┐
│       ResourceManager               │
│    (Only resource management)       │
└─────────────────────────────────────┘

    ┌───────┴───────┐
    ▼               ▼
┌───────────┐  ┌───────────┐
│NodeManager│  │NodeManager│
│ (Generic  │  │ (Generic  │
│containers)│  │containers)│
└───────────┘  └───────────┘
     │              │
     ▼              ▼
┌─────────┐    ┌─────────┐
│  App    │    │  App    │
│ Master  │    │ Master  │
│(MR/Spark│    │(MR/Spark│
│  etc.)  │    │  etc.)  │
└─────────┘    └─────────┘
Benefits:
  • Scalability to 10,000+ nodes
  • Better resource utilization (dynamic allocation)
  • Multi-framework support (MapReduce, Spark, Flink, etc.)
We’ll dive deep into YARN architecture in Module 4.

The CAP Theorem and Hadoop

Understanding the CAP theorem helps explain Hadoop’s design choices.

CAP Theorem Recap

The theorem says a distributed system can provide at most two of three guarantees:
  • Consistency: All nodes see the same data
  • Availability: The system responds to every request
  • Partition Tolerance: The system keeps working despite network splits
In practice, network partitions cannot be prevented, so the real choice is what to give up - consistency or availability - when a partition occurs.

Hadoop’s Choice: CP (Consistency + Partition Tolerance)

HDFS during a Network Partition:
  • All metadata lives on the NameNode; if clients can't reach it, writes and new reads fail rather than return stale answers (trades Availability)
  • DataNodes that can't heartbeat to the NameNode are marked dead, and their blocks are re-replicated from the surviving copies
  • Once the partition heals, the rejoined DataNodes report back in and excess replicas are cleaned up
Why this choice?
  • A single authoritative NameNode keeps the metadata model simple and strongly consistent
  • Batch analytics tolerate temporary unavailability (jobs simply retry)
  • Serving stale or conflicting metadata would silently corrupt downstream results
Even so, HDFS is NOT a transactional database: there are no random updates or ACID transactions. Use HBase (built on HDFS) when you need low-latency random reads and writes.

Key Takeaways

Before diving into implementation, internalize these principles:

Design for Failure

Failures are normal, not exceptional. Automatic detection and recovery must be built-in from day one.

Move Computation, Not Data

Network bandwidth is precious. Schedule tasks where data lives. This single principle drives countless Hadoop optimizations.

Simplicity Through Abstraction

MapReduce hides distributed systems complexity. Developers write simple functions; framework handles parallelization.

Relaxed Consistency for Performance

Strict consistency is expensive. For batch analytics, eventual consistency and duplicate tolerance enable massive scale.

Interview Focus

Understanding these papers gives you a huge advantage in interviews:
Architecture Questions:
  • “Why does HDFS use large block sizes?”
    • Answer: Reduces metadata overhead, minimizes client-NameNode interaction, optimizes for large sequential reads (GFS assumption 2)
  • “Explain the HDFS write pipeline”
    • Answer: Walk through primary-secondary replication, mention consistency trade-offs
  • “How does MapReduce handle worker failures?”
    • Answer: Re-execute failed tasks, explain why map tasks always re-run but reduces sometimes don’t
Trade-off Questions:
  • “Why isn’t HDFS good for random writes?”
    • Answer: Optimized for append-only, large sequential reads (design assumption from GFS)
  • “What are the downsides of a single NameNode?”
    • Answer: Bottleneck for metadata ops, single point of failure (mention Federation and HA as solutions)
Design Questions:
  • “Design a system to process web logs at scale”
    • Answer: Apply MapReduce pattern, discuss partitioning, combiner optimization

Primary Sources

  1. The Google File System (GFS Paper)
    • Full PDF
    • Read: Sections 1-3 (Introduction, Design, Architecture)
    • Skim: Section 4 (Implementation details)
  2. MapReduce Paper
    • Full PDF
    • Read: Sections 1-3 (Introduction, Programming Model, Implementation)
    • Examples: Section 5 (Applications)

Supplementary

  • “The Hadoop Distributed File System” (Shvachko et al., 2010) - HDFS architectural overview
  • “Bigtable: A Distributed Storage System for Structured Data” (Chang et al., 2006) - Complements GFS understanding
  • “The Chubby lock service for loosely-coupled distributed systems” (Burrows, 2006) - The coordination service GFS uses for master election

Practical Exercise

Before moving to implementation, solidify your understanding:
1. Read & Annotate

Read the GFS paper introduction and Section 2 (Design Overview). As you read, note:
  • Which assumptions surprise you?
  • How do design decisions follow from assumptions?
2. Think Through Scenarios

Consider these scenarios:
  • A DataNode fails during a read operation - what happens?
  • NameNode metadata fills up memory - what are the solutions?
  • A map task takes 10x longer than the others - how does the framework handle it?
3. Whiteboard Exercise

Draw the architecture of:
  1. HDFS read operation (client → NameNode → DataNode flow)
  2. MapReduce WordCount execution (map → shuffle → reduce)
Explain each step to yourself or a peer.

What’s Next?

Now that you understand the why behind Hadoop’s design, let’s dive into the how.

Module 2: HDFS Architecture & Internals

Deep dive into HDFS implementation, configuration, and hands-on labs
The concepts from this module will be referenced throughout the course. Bookmark this page for quick reference when architectural questions arise!