Advanced HDFS Features

Module Duration: 3-4 hours
Focus: Enterprise HDFS features for production scale
Prerequisites: HDFS Architecture basics from Module 2

HDFS Federation

The Scalability Problem

Traditional HDFS Limitation:
Single NameNode bottleneck:
- All metadata in one NameNode's RAM
- 1GB RAM ≈ 1 million blocks
- For 1 billion files → 1TB+ RAM needed
- Single point of failure
- All namespace operations go through one node

Federation Architecture

Multiple independent NameNodes sharing the same DataNode pool:
┌─────────────────────────────────────────────────────────┐
│                  HDFS FEDERATION                        │
│                                                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐ │
│  │  NameNode 1  │  │  NameNode 2  │  │  NameNode 3  │ │
│  │ Namespace 1  │  │ Namespace 2  │  │ Namespace 3  │ │
│  │  /projects   │  │   /users     │  │    /data     │ │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘ │
│         │                 │                 │          │
│         └─────────────────┼─────────────────┘          │
│                           │                            │
│  ┌────────────────────────┼──────────────────────┐     │
│  │        DataNode Pool (Shared)                 │     │
│  │  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐     │     │
│  │  │ DN 1 │  │ DN 2 │  │ DN 3 │  │ DN 4 │     │     │
│  │  └──────┘  └──────┘  └──────┘  └──────┘     │     │
│  └───────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────┘
Key Concepts:
  1. Block Pool: Each NameNode manages its own block pool
    • Blocks from different namespaces don’t mix
    • Each block belongs to exactly one block pool, identified by its block pool ID
  2. Namespace Volume: Namespace + Block Pool = one unit
    • Independent namespaces
    • No coordination between NameNodes needed
  3. ViewFS: Client-side mount table to access federated cluster

Configuration

hdfs-site.xml (NameNode 1):
<configuration>
  <!-- Federation identifier -->
  <property>
    <name>dfs.nameservices</name>
    <value>ns1,ns2,ns3</value>
  </property>

  <!-- NameNode 1 configuration -->
  <property>
    <name>dfs.namenode.rpc-address.ns1</name>
    <value>namenode1:8020</value>
  </property>

  <property>
    <name>dfs.namenode.http-address.ns1</name>
    <value>namenode1:50070</value>
  </property>

  <property>
    <name>dfs.namenode.name.dir.ns1</name>
    <value>file:///data/hadoop/dfs/name-ns1</value>
  </property>

  <!-- Similar config for ns2, ns3... -->
</configuration>
ViewFS Mount Table (core-site.xml on clients):
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>viewfs://mycluster</value>
  </property>

  <!-- Mount points -->
  <property>
    <name>fs.viewfs.mounttable.mycluster.link./projects</name>
    <value>hdfs://ns1/projects</value>
  </property>

  <property>
    <name>fs.viewfs.mounttable.mycluster.link./users</name>
    <value>hdfs://ns2/users</value>
  </property>

  <property>
    <name>fs.viewfs.mounttable.mycluster.link./data</name>
    <value>hdfs://ns3/data</value>
  </property>
</configuration>
Client Access:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("viewfs://mycluster"), conf);

// Access /projects/file.txt → routed to ns1
fs.open(new Path("/projects/file.txt"));

// Access /users/alice/data → routed to ns2
fs.open(new Path("/users/alice/data"));

HDFS High Availability (HA)

Standby NameNode Architecture

┌─────────────────────────────────────────────────────────┐
│                 HDFS HA CLUSTER                         │
│                                                         │
│  ┌──────────────────┐         ┌──────────────────┐     │
│  │  Active NameNode │         │ Standby NameNode │     │
│  │                  │         │                  │     │
│  │  • Serves reads  │         │  • Tails logs    │     │
│  │  • Serves writes │         │  • Stays current │     │
│  │  • Writes edits  │         │  • Ready takeover│     │
│  └────────┬─────────┘         └────────┬─────────┘     │
│           │                            │               │
│           ▼                            ▼               │
│  ┌──────────────────────────────────────────────┐      │
│  │      Shared Edit Log (JournalNodes)          │      │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐     │      │
│  │  │Journal N1│ │Journal N2│ │Journal N3│     │      │
│  │  └──────────┘ └──────────┘ └──────────┘     │      │
│  │        Quorum-based journal (2N+1 nodes)     │      │
│  └──────────────────────────────────────────────┘      │
│                        ▲                                │
│                        │                                │
│               ┌────────┴────────┐                       │
│               │   ZooKeeper     │                       │
│               │  (Coordination) │                       │
│               │  • Leader elect │                       │
│               │  • Fencing      │                       │
│               └─────────────────┘                       │
└─────────────────────────────────────────────────────────┘

Quorum Journal Manager (QJM)

How It Works:
  1. Active NameNode writes edit logs to JournalNodes
    • Writes to quorum (majority) of JournalNodes
    • For 3 JNs: needs 2 successful writes
    • For 5 JNs: needs 3 successful writes
  2. Standby NameNode tails edit logs
    • Reads from JournalNodes continuously
    • Applies edits to its in-memory state
    • Always ready to take over
  3. Failover Process:
    Active NN crashes
         ↓
    ZooKeeper detects failure (missed heartbeats)
         ↓
    ZKFC (ZK Failover Controller) triggers failover
         ↓
    Fences old Active NN (prevents split-brain)
         ↓
    Standby promoted to Active
         ↓
    New Active starts serving requests

HA Configuration

hdfs-site.xml:
<configuration>
  <!-- Enable HA -->
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>

  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>

  <!-- NameNode addresses -->
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>

  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>

  <property>
    <name>dfs.namenode.http-address.mycluster.nn1</name>
    <value>namenode1.example.com:50070</value>
  </property>

  <property>
    <name>dfs.namenode.http-address.mycluster.nn2</name>
    <value>namenode2.example.com:50070</value>
  </property>

  <!-- Shared edits directory (JournalNodes) -->
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/mycluster</value>
  </property>

  <!-- Client failover configuration -->
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>

  <!-- Automatic failover -->
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>

  <!-- Fencing method -->
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence
shell(/bin/true)</value>
  </property>

  <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/home/hadoop/.ssh/id_rsa</value>
  </property>
</configuration>
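Client Access: with dfs.nameservices and the failover proxy provider in place, clients address the logical nameservice rather than an individual NameNode host, and the DFS client retries against the other NameNode on failover. A minimal client sketch, assuming the hdfs-site.xml above is on the client classpath:
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HAClientExample {
    public static void main(String[] args) throws Exception {
        // Loads core-site.xml / hdfs-site.xml from the classpath,
        // including dfs.nameservices and the failover proxy provider
        Configuration conf = new Configuration();

        // Address the logical nameservice, not a specific NameNode host.
        // ConfiguredFailoverProxyProvider locates the active NameNode and
        // transparently retries against the other one during failover.
        FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);

        System.out.println("Connected to " + fs.getUri());
        fs.close();
    }
}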
Manual Failover:
# Check which NN is active
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2

# Perform manual failover
hdfs haadmin -failover nn1 nn2

# Force failover (fence old active)
hdfs haadmin -failover --forcefence nn1 nn2

Erasure Coding

The Space Efficiency Problem

Traditional Replication:
Original data: 100TB
Replication factor: 3
Total storage: 300TB
Overhead: 200% (3x)
Erasure Coding Alternative:
Original data: 100TB
Erasure coding: RS-6-3 (6 data + 3 parity blocks)
Total storage: 150TB
Overhead: 50% (1.5x)

Space saved: 150TB (50% reduction)

How Erasure Coding Works

Reed-Solomon (RS) Encoding:
Original File (54MB):
┌────┬────┬────┬────┬────┬────┐
│ D1 │ D2 │ D3 │ D4 │ D5 │ D6 │  (6 data blocks × 9MB each)
└────┴────┴────┴────┴────┴────┘

         │ Encode

┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ D1 │ D2 │ D3 │ D4 │ D5 │ D6 │ P1 │ P2 │ P3 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┘
 (6 data blocks + 3 parity blocks)

Fault Tolerance: Can lose any 3 blocks and still recover

Example - Lost D2, D4, P1:
┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ D1 │ ✗  │ D3 │ ✗  │ D5 │ D6 │ ✗  │ P2 │ P3 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┘

         │ Decode (using remaining 6 blocks)

┌────┬────┬────┬────┬────┬────┐
│ D1 │ D2'│ D3 │ D4'│ D5 │ D6 │  (D2 and D4 recovered)
└────┴────┴────┴────┴────┴────┘

Erasure Coding Policies

Built-in Policies:
Policy    Data Blocks  Parity Blocks  Total  Overhead  Max Failures
RS-3-2    3            2              5      67%       2
RS-6-3    6            3              9      50%       3
RS-10-4   10           4              14     40%       4
XOR-2-1   2            1              3      50%       1
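The overhead column is simply parity blocks divided by data blocks, and the max-failures column equals the parity count. A quick arithmetic sketch (the values below mirror the table; this is plain Java, not a Hadoop API):
public class EcOverhead {
    public static void main(String[] args) {
        // {name, data blocks, parity blocks}, mirroring the table above
        Object[][] policies = {
            {"RS-3-2", 3, 2}, {"RS-6-3", 6, 3},
            {"RS-10-4", 10, 4}, {"XOR-2-1", 2, 1}
        };
        for (Object[] p : policies) {
            String name = (String) p[0];
            int data = (Integer) p[1];
            int parity = (Integer) p[2];
            // Overhead = parity / data (extra storage beyond the raw data)
            double overheadPct = 100.0 * parity / data;
            // Any 'parity' missing blocks (data or parity) can be reconstructed
            System.out.printf("%-8s total=%d overhead=%.0f%% tolerates %d lost blocks%n",
                name, data + parity, overheadPct, parity);
        }
    }
}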

Configuration and Usage

Enable Erasure Coding:
# List available policies
hdfs ec -listPolicies

# Enable a policy
hdfs ec -enablePolicy -policy RS-6-3-1024k

# Set policy on a directory
hdfs ec -setPolicy -path /archive -policy RS-6-3-1024k

# All files created in /archive will use erasure coding

# Check policy
hdfs ec -getPolicy -path /archive/file.txt
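The same policy management is also available from Java in Hadoop 3.x through DistributedFileSystem. A hedged sketch, assuming fs.defaultFS points at the cluster and RS-6-3-1024k has already been enabled:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

public class ErasureCodingExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);

        Path archive = new Path("/archive");

        // Apply the policy to a directory; new files written under it are EC-encoded
        dfs.setErasureCodingPolicy(archive, "RS-6-3-1024k");

        // Read the effective policy back (null means plain replication)
        ErasureCodingPolicy policy = dfs.getErasureCodingPolicy(archive);
        System.out.println("Policy on " + archive + ": " +
            (policy == null ? "replication" : policy.getName()));
    }
}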
When to Use Erasure Coding:
Good for:
  • Archive/cold data (rarely accessed)
  • Large files (>100MB)
  • Write-once, read-many workloads
  • Cost-sensitive storage
Not ideal for:
  • Hot data (frequently accessed)
  • Small files (<10MB)
  • Data requiring low-latency reads
  • High write throughput workloads
Performance Considerations:
// Erasure coding read performance
// Traditional replication: Read from 1 DataNode
// Erasure coding: Read from 6+ DataNodes (for RS-6-3)

// Example: Reading 54MB file with RS-6-3
// - Need to read 6 data blocks (9MB each)
// - Contacts 6 different DataNodes
// - More network overhead
// - Slower than replication for small reads

HDFS Snapshots

Snapshot Basics

Snapshots provide point-in-time, read-only copies of directories:
Directory /data at different points in time:

t0 (Initial):
/data
  ├── file1.txt (v1)
  ├── file2.txt (v1)
  └── file3.txt (v1)

Create snapshot: .snapshot/snap1


t1 (Modify file1, delete file2):
/data
  ├── file1.txt (v2)  ← modified
  └── file3.txt (v1)

Create snapshot: .snapshot/snap2


t2 (Add file4):
/data
  ├── file1.txt (v2)
  ├── file3.txt (v1)
  └── file4.txt (v1)  ← new file

Snapshots:
/data/.snapshot/snap1/  → Shows state at t0
  ├── file1.txt (v1)
  ├── file2.txt (v1)
  └── file3.txt (v1)

/data/.snapshot/snap2/  → Shows state at t1
  ├── file1.txt (v2)
  └── file3.txt (v1)

Copy-on-Write Mechanism

How Snapshots Save Space:
No duplication of unchanged data!

Snapshot references existing blocks:
- Unchanged files: Point to same blocks
- Modified files: Only new blocks created
- Deleted files: Blocks retained if in snapshot

Example:
Original: file.txt (100MB, 1 block)
Snapshot created
Modify file.txt → New version (100MB, 1 new block)

Storage:
  - Original block: 100MB (referenced by snapshot)
  - New block: 100MB (current version)
  - Total: 200MB (not 300MB with replication!)

If file deleted from current directory:
  - Snapshot still references original block
  - Block not deleted until snapshot removed

Snapshot Operations

Enable Snapshots:
# Make directory snapshottable
hdfs dfsadmin -allowSnapshot /data

# Create snapshot
hdfs dfs -createSnapshot /data snap_$(date +%Y%m%d)

# List snapshots
hdfs dfs -ls /data/.snapshot

# Access snapshot data
hdfs dfs -cat /data/.snapshot/snap_20240115/file.txt

# Compare snapshots
hdfs snapshotDiff /data snap_20240115 snap_20240116

# Delete snapshot
hdfs dfs -deleteSnapshot /data snap_20240115

# Disable snapshots (must delete all snapshots first)
hdfs dfsadmin -disallowSnapshot /data
Snapshot Diff:
# Show differences between snapshots
hdfs snapshotDiff /data snap1 snap2

# Output:
# Difference between snapshot snap1 and snapshot snap2:
# M       ./file1.txt   (Modified)
# -       ./file2.txt   (Deleted)
# +       ./file4.txt   (Added)
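Snapshots can also be managed from Java through the FileSystem/DistributedFileSystem snapshot calls. A minimal sketch; the /data path and snapshot names are illustrative, and allowSnapshot requires administrator privileges:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;

public class SnapshotExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);

        Path dir = new Path("/data");

        // Make the directory snapshottable
        // (equivalent to: hdfs dfsadmin -allowSnapshot /data)
        dfs.allowSnapshot(dir);

        // Create two snapshots; each call returns the path under .snapshot/
        Path snap1 = dfs.createSnapshot(dir, "snap1");
        Path snap2 = dfs.createSnapshot(dir, "snap2");
        System.out.println("Created " + snap1 + " and " + snap2);

        // Diff between the two snapshots (equivalent to hdfs snapshotDiff)
        SnapshotDiffReport report = dfs.getSnapshotDiffReport(dir, "snap1", "snap2");
        System.out.println(report);

        // Clean up
        dfs.deleteSnapshot(dir, "snap1");
        dfs.deleteSnapshot(dir, "snap2");
    }
}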

Use Cases

1. Backup and Recovery:
# Daily snapshots
hdfs dfs -createSnapshot /data daily_$(date +%Y%m%d)

# User accidentally deletes file
hdfs dfs -rm /data/important.txt

# Restore from snapshot
hdfs dfs -cp /data/.snapshot/daily_20240115/important.txt /data/
2. Testing and Validation:
# Before risky operation
hdfs dfs -createSnapshot /data before_migration

# Run migration
./migrate_data.sh

# Verify or rollback
hdfs snapshotDiff /data before_migration .

# If the migration failed, restore files from the snapshot
hdfs dfs -cp -f /data/.snapshot/before_migration/* /data/
# Note: snapshots are read-only; deleting one never changes the live directory

# Clean up the snapshot once the data is verified
hdfs dfs -deleteSnapshot /data before_migration
3. Compliance and Auditing:
# Monthly compliance snapshots (retained for 7 years)
hdfs dfs -createSnapshot /financial_data compliance_$(date +%Y%m)

# Access historical data
hdfs dfs -cat /financial_data/.snapshot/compliance_201701/report.csv

HDFS Caching

Centralized Cache Management

Problem: Hot data is read repeatedly from disk
Solution: Cache frequently accessed blocks in DataNode memory
┌──────────────────────────────────────────────────┐
│              NameNode                            │
│  • Tracks cache directives                      │
│  • Instructs DataNodes what to cache             │
│  • Monitors cache usage                          │
└────────────────┬─────────────────────────────────┘

                 │ Cache directives

┌──────────────────────────────────────────────────┐
│           DataNode                               │
│  ┌────────────────────────────────┐              │
│  │  Memory Cache                  │              │
│  │  (Hot blocks pinned in RAM)    │              │
│  │  ┌──────┐ ┌──────┐ ┌──────┐   │              │
│  │  │Block1│ │Block2│ │Block3│   │              │
│  │  └──────┘ └──────┘ └──────┘   │              │
│  └────────────────────────────────┘              │
│  ┌────────────────────────────────┐              │
│  │  Disk Storage                  │              │
│  │  (All blocks)                  │              │
│  └────────────────────────────────┘              │
└──────────────────────────────────────────────────┘

Cache Pool and Directive Management

Create Cache Pool:
# Create pool with limits
hdfs cacheadmin -addPool analytics_pool \
  -owner analytics_team \
  -group analytics \
  -mode 0755 \
  -limit 10737418240  # 10GB limit

# List pools
hdfs cacheadmin -listPools
Add Cache Directive:
# Cache entire directory
hdfs cacheadmin -addDirective \
  -path /hot_data/daily_reports \
  -pool analytics_pool \
  -replication 1

# Cache specific file
hdfs cacheadmin -addDirective \
  -path /hot_data/popular_dataset.csv \
  -pool analytics_pool

# List directives
hdfs cacheadmin -listDirectives -pool analytics_pool

# Remove directive
hdfs cacheadmin -removeDirective <directive_id>
Programmatic Caching:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.*;

public class CacheManager {

    public static void cacheFile(String filePath, String poolName)
            throws Exception {

        DistributedFileSystem dfs = (DistributedFileSystem)
            FileSystem.get(new Configuration());

        // Add cache directive
        CacheDirectiveInfo.Builder builder =
            new CacheDirectiveInfo.Builder();

        builder.setPath(new Path(filePath))
               .setPool(poolName)
               .setReplication((short) 1);

        long directiveId = dfs.addCacheDirective(builder.build());

        System.out.println("Cached " + filePath +
                          " with directive ID: " + directiveId);
    }

    public static void checkCacheStatus(String filePath)
            throws Exception {

        DistributedFileSystem dfs = (DistributedFileSystem)
            FileSystem.get(new Configuration());

        RemoteIterator<CacheDirectiveEntry> iter =
            dfs.listCacheDirectives(null);

        while (iter.hasNext()) {
            CacheDirectiveEntry entry = iter.next();
            CacheDirectiveInfo info = entry.getInfo();
            CacheDirectiveStats stats = entry.getStats();

            if (info.getPath().toString().equals(filePath)) {
                System.out.println("Bytes cached: " +
                    stats.getBytesCached() + " / " +
                    stats.getBytesNeeded());
                break;
            }
        }
    }
}

HDFS Performance Tuning

Short-Circuit Local Reads

Enable reading from local DataNode without network:
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>

<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/run/hadoop-hdfs/dn_socket</value>
</property>

<property>
  <name>dfs.client.read.shortcircuit.skip.checksum</name>
  <value>false</value>
  <description>Don't skip checksums for security</description>
</property>
Performance Impact: local reads bypass the DataNode's TCP path entirely; gains of 30-50% are commonly cited for data-local, read-heavy workloads

Hedged Reads

Read from multiple replicas simultaneously for tail latency:
<property>
  <name>dfs.client.hedged.read.threadpool.size</name>
  <value>5</value>
</property>

<property>
  <name>dfs.client.hedged.read.threshold.millis</name>
  <value>10</value>
  <description>If first read takes >10ms, start hedged read</description>
</property>
How It Works:
Client requests block from DataNode A
     ↓
Wait 10ms (the hedged read threshold)
     ↓
If not received, also request from DataNode B
     ↓
Use whichever responds first
     ↓
Cancel the other request
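The hedged-read settings are client-side, so they can also be applied per application through a Configuration object instead of hdfs-site.xml. A small sketch, assuming an HA nameservice named mycluster as configured earlier:
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HedgedReadClient {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Same keys as in hdfs-site.xml, applied only to this client
        conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
        conf.setLong("dfs.client.hedged.read.threshold.millis", 10);

        // Reads through this FileSystem instance may hedge to a second
        // replica when the first DataNode is slow to respond
        FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
        fs.close();
    }
}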

DataNode Configuration

<!-- Increase handler threads -->
<property>
  <name>dfs.datanode.handler.count</name>
  <value>10</value>
  <description>Server threads handling DataNode RPC requests</description>
</property>

<!-- Increase max transfer threads -->
<property>
  <name>dfs.datanode.max.transfer.threads</name>
  <value>8192</value>
</property>


Monitoring and Metrics

JMX Metrics Exposure

NameNode Metrics:
# Query via HTTP
curl http://namenode:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo

# Key metrics:
# - PercentUsed
# - PercentRemaining
# - TotalBlocks
# - MissingBlocks
# - UnderReplicatedBlocks
# - CorruptBlocks
# - LiveNodes
# - DeadNodes
Programmatic Monitoring:
import javax.management.*;
import java.lang.management.*;

public class HDFSMonitor {

    // Note: getPlatformMBeanServer() only sees MBeans in the local JVM, so
    // this must run inside the NameNode process; for a remote NameNode,
    // connect via JMX remoting or the /jmx HTTP endpoint shown above.
    public static void getNameNodeMetrics() throws Exception {
        MBeanServerConnection mbs =
            ManagementFactory.getPlatformMBeanServer();

        ObjectName nameNodeMBean = new ObjectName(
            "Hadoop:service=NameNode,name=FSNamesystem");

        // Get capacity
        long capacityTotal = (Long) mbs.getAttribute(
            nameNodeMBean, "CapacityTotal");
        long capacityUsed = (Long) mbs.getAttribute(
            nameNodeMBean, "CapacityUsed");
        long capacityRemaining = (Long) mbs.getAttribute(
            nameNodeMBean, "CapacityRemaining");

        System.out.println("Capacity Total: " + capacityTotal);
        System.out.println("Capacity Used: " + capacityUsed +
            " (" + (capacityUsed * 100.0 / capacityTotal) + "%)");
        System.out.println("Capacity Remaining: " + capacityRemaining);

        // Get block stats
        long underReplicatedBlocks = (Long) mbs.getAttribute(
            nameNodeMBean, "UnderReplicatedBlocks");
        long corruptBlocks = (Long) mbs.getAttribute(
            nameNodeMBean, "CorruptBlocks");

        System.out.println("Under-replicated Blocks: " +
            underReplicatedBlocks);
        System.out.println("Corrupt Blocks: " + corruptBlocks);
    }
}

Best Practices Summary

Use Federation for Scale

When metadata grows beyond roughly 100 million files or exceeds a single NameNode's RAM, deploy federation to scale the namespace horizontally.

HA is Mandatory

Always use HA in production. Manual NameNode recovery takes hours and risks data inconsistency.

Erasure Code Cold Data

Archive data older than 90 days with erasure coding to save 50% storage costs.

Snapshot for Safety

Daily snapshots before risky operations. Retention: 7 daily, 4 weekly, 12 monthly.

Interview Focus

Common Questions:
  1. “How does HDFS HA prevent split-brain?”
    • Fencing: Active NN cannot write to JournalNodes after losing quorum
    • ZooKeeper coordination ensures only one Active NN at a time
    • SSH fencing kills old NN process if needed
  2. “When to use Federation vs HA?”
    • HA: High availability, failover (same namespace)
    • Federation: Horizontal scalability (multiple independent namespaces)
    • Can combine both: Federated cluster with HA for each namespace
  3. “Why is erasure coding slower than replication?”
    • Must read from 6+ DataNodes vs 1 for replication
    • Decode overhead for reconstructing data
    • Trade-off: 50% storage savings for slightly slower reads

What’s Next?

Module 3: MapReduce Programming Model

Now that you master storage, learn to process data with MapReduce