Advanced HDFS Features
Module Duration : 3-4 hours
Focus : Enterprise HDFS features for production scale
Prerequisites : HDFS Architecture basics from Module 2
HDFS Federation
The Scalability Problem
Traditional HDFS Limitation :
Single NameNode bottleneck:
- All metadata in one NameNode's RAM
- 1GB of NameNode heap ≈ 1 million blocks (rule of thumb)
- For ~1 billion files/blocks → 1TB+ of heap needed (see the rough estimate below)
- Single point of failure
- All namespace operations go through one node
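To make the rule of thumb concrete, here is a quick back-of-the-envelope heap estimate in plain Java. The 1 GB-per-million-blocks constant is the approximation quoted above, not an official sizing formula; real heap needs also depend on file counts, directories, and Hadoop version.

// Rough NameNode heap estimate using the ~1 GB per 1 million blocks
// rule of thumb. Illustration only, not official sizing guidance.
public class NameNodeHeapEstimate {
    static final double GB_PER_MILLION_BLOCKS = 1.0; // rule of thumb

    static double estimateHeapGb(long blocks) {
        return (blocks / 1_000_000.0) * GB_PER_MILLION_BLOCKS;
    }

    public static void main(String[] args) {
        long[] blockCounts = {10_000_000L, 100_000_000L, 1_000_000_000L};
        for (long blocks : blockCounts) {
            System.out.printf("%,d blocks -> ~%.0f GB NameNode heap%n",
                    blocks, estimateHeapGb(blocks));
        }
    }
}

A single billion-block namespace lands around a terabyte of heap, which is exactly why federation splits the namespace across multiple NameNodes.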
Federation Architecture
Multiple independent NameNodes sharing the same DataNode pool:
┌─────────────────────────────────────────────────────────┐
│ HDFS FEDERATION │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ NameNode 1 │ │ NameNode 2 │ │ NameNode 3 │ │
│ │ Namespace 1 │ │ Namespace 2 │ │ Namespace 3 │ │
│ │ /projects │ │ /users │ │ /data │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ┌────────────────────────┼──────────────────────┐ │
│ │ DataNode Pool (Shared) │ │
│ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │
│ │ │ DN 1 │ │ DN 2 │ │ DN 3 │ │ DN 4 │ │ │
│ │ └──────┘ └──────┘ └──────┘ └──────┘ │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
Key Concepts :
Block Pool : Each NameNode manages its own block pool
Blocks from different namespaces don’t mix
Each block belongs to exactly one block pool, identified by a unique block pool ID
Namespace Volume : Namespace + Block Pool = one unit
Independent namespaces
No coordination between NameNodes needed
ViewFS : Client-side mount table to access federated cluster
Configuration
hdfs-site.xml (NameNode 1):
<configuration>
  <!-- Federation identifier: all nameservices in the cluster -->
  <property>
    <name>dfs.nameservices</name>
    <value>ns1,ns2,ns3</value>
  </property>

  <!-- NameNode 1 configuration -->
  <property>
    <name>dfs.namenode.rpc-address.ns1</name>
    <value>namenode1:8020</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns1</name>
    <value>namenode1:50070</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir.ns1</name>
    <value>file:///data/hadoop/dfs/name-ns1</value>
  </property>

  <!-- Similar config for ns2, ns3... -->
</configuration>
ViewFS Mount Table (core-site.xml on clients):
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>viewfs://mycluster</value>
  </property>

  <!-- Mount points -->
  <property>
    <name>fs.viewfs.mounttable.mycluster.link./projects</name>
    <value>hdfs://ns1/projects</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.mycluster.link./users</name>
    <value>hdfs://ns2/users</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.mycluster.link./data</name>
    <value>hdfs://ns3/data</value>
  </property>
</configuration>
Client Access :
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("viewfs://mycluster"), conf);

// Access /projects/file.txt → routed to ns1
fs.open(new Path("/projects/file.txt"));

// Access /users/alice/data → routed to ns2
fs.open(new Path("/users/alice/data"));
HDFS High Availability (HA)
Standby NameNode Architecture
┌─────────────────────────────────────────────────────────┐
│ HDFS HA CLUSTER │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Active NameNode │ │ Standby NameNode │ │
│ │ │ │ │ │
│ │ • Serves reads │ │ • Tails logs │ │
│ │ • Serves writes │ │ • Stays current │ │
│ │ • Writes edits │ │ • Ready takeover│ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Shared Edit Log (JournalNodes) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Journal N1│ │Journal N2│ │Journal N3│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ Quorum-based journal (2N+1 nodes) │ │
│ └──────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌────────┴────────┐ │
│ │ ZooKeeper │ │
│ │ (Coordination) │ │
│ │ • Leader elect │ │
│ │ • Fencing │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────┘
Quorum Journal Manager (QJM)
How It Works :
Active NameNode writes edit logs to JournalNodes
Writes to quorum (majority) of JournalNodes
For 3 JNs: needs 2 successful writes
For 5 JNs: needs 3 successful writes
Standby NameNode tails edit logs
Reads from JournalNodes continuously
Applies edits to its in-memory state
Always ready to take over
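The quorum requirement is simple majority arithmetic. A minimal sketch of the math (illustrative only; the real QJM also uses epoch numbers to reject writes from a deposed Active NameNode):

// Quorum size and failure tolerance for a JournalNode ensemble of size n:
// a write succeeds once floor(n/2) + 1 JournalNodes acknowledge it,
// so the ensemble tolerates floor((n-1)/2) failed JournalNodes.
public class JournalQuorum {
    static int quorum(int journalNodes) {
        return journalNodes / 2 + 1;
    }

    static int toleratedFailures(int journalNodes) {
        return (journalNodes - 1) / 2;
    }

    public static void main(String[] args) {
        for (int n : new int[]{3, 5, 7}) {
            System.out.printf("%d JournalNodes: quorum=%d, tolerates %d failure(s)%n",
                    n, quorum(n), toleratedFailures(n));
        }
    }
}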
Failover Process :
Active NN crashes
↓
ZooKeeper detects failure (missed heartbeats)
↓
ZKFC (ZK Failover Controller) triggers failover
↓
Fences old Active NN (prevents split-brain)
↓
Standby promoted to Active
↓
New Active starts serving requests
HA Configuration
hdfs-site.xml :
<configuration>
  <!-- Enable HA: logical nameservice -->
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>

  <!-- NameNode addresses -->
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.mycluster.nn1</name>
    <value>namenode1.example.com:50070</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.mycluster.nn2</name>
    <value>namenode2.example.com:50070</value>
  </property>

  <!-- Shared edits directory (JournalNodes) -->
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/mycluster</value>
  </property>

  <!-- Client failover configuration -->
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>

  <!-- Automatic failover (also requires ha.zookeeper.quorum in core-site.xml) -->
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>

  <!-- Fencing methods (tried in order) -->
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence
shell(/bin/true)</value>
  </property>
  <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/home/hadoop/.ssh/id_rsa</value>
  </property>
</configuration>
Manual Failover :
# Check which NN is active
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
# Perform manual failover
hdfs haadmin -failover nn1 nn2
# Force failover (fence old active)
hdfs haadmin -failover --forcefence nn1 nn2
Erasure Coding
The Space Efficiency Problem
Traditional Replication :
Original data: 100TB
Replication factor: 3
Total storage: 300TB
Overhead: 200% (3x)
Erasure Coding Alternative :
Original data: 100TB
Erasure coding: RS-6-3 (6 data + 3 parity blocks)
Total storage: 150TB
Overhead: 50% (1.5x)
Space saved: 150TB (50% reduction)
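A quick way to sanity-check these numbers is to compute the total footprint as data × (data blocks + parity blocks) / data blocks. A small sketch in plain Java; the policy names match the HDFS built-ins, but the arithmetic is generic:

// Storage footprint for 100 TB of raw data under 3x replication versus
// a few erasure coding layouts. Overhead = parity blocks / data blocks.
public class StorageOverhead {
    static double totalTb(double dataTb, int dataBlocks, int parityBlocks) {
        return dataTb * (dataBlocks + parityBlocks) / dataBlocks;
    }

    public static void main(String[] args) {
        double dataTb = 100.0;
        System.out.printf("3x replication : %.0f TB%n", dataTb * 3);
        System.out.printf("RS-6-3         : %.0f TB%n", totalTb(dataTb, 6, 3));
        System.out.printf("RS-10-4        : %.0f TB%n", totalTb(dataTb, 10, 4));
        System.out.printf("RS-3-2         : %.1f TB%n", totalTb(dataTb, 3, 2));
    }
}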
How Erasure Coding Works
Reed-Solomon (RS) Encoding :
Original File (54MB):
┌────┬────┬────┬────┬────┬────┐
│ D1 │ D2 │ D3 │ D4 │ D5 │ D6 │ (6 data blocks × 9MB each)
└────┴────┴────┴────┴────┴────┘
│
│ Encode
▼
┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ D1 │ D2 │ D3 │ D4 │ D5 │ D6 │ P1 │ P2 │ P3 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┘
(6 data blocks + 3 parity blocks)
Fault Tolerance: Can lose any 3 blocks and still recover
Example - Lost D2, D4, P1:
┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
│ D1 │ ✗ │ D3 │ ✗ │ D5 │ D6 │ ✗ │ P2 │ P3 │
└────┴────┴────┴────┴────┴────┴────┴────┴────┘
│
│ Decode (using remaining 6 blocks)
▼
┌────┬────┬────┬────┬────┬────┐
│ D1 │ D2'│ D3 │ D4'│ D5 │ D6 │ (D2 and D4 recovered)
└────┴────┴────┴────┴────┴────┘
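Full Reed-Solomon decoding is beyond this module, but the simplest built-in policy, XOR-2-1 (listed in the table below), shows the principle: parity is computed so that any single missing block can be rebuilt from the survivors. A toy sketch in plain Java, not the Hadoop codec:

// Toy XOR-2-1 erasure coding: one parity block is the XOR of two data blocks,
// so any single lost block can be rebuilt by XOR-ing the remaining two.
// Illustration of the principle only; HDFS's RS policies use a Reed-Solomon
// codec that tolerates multiple simultaneous failures.
import java.util.Arrays;

public class XorErasureDemo {
    static byte[] xor(byte[] a, byte[] b) {
        byte[] out = new byte[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = (byte) (a[i] ^ b[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] d1 = "block-one-data__".getBytes();
        byte[] d2 = "block-two-data__".getBytes();
        byte[] parity = xor(d1, d2);          // encode: P = D1 XOR D2

        byte[] recoveredD1 = xor(parity, d2); // lose D1, rebuild it from P and D2
        System.out.println("Recovered D1: " + new String(recoveredD1));
        System.out.println("Matches original: " + Arrays.equals(d1, recoveredD1));
    }
}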
Erasure Coding Policies
Built-in Policies :
Policy    Data Blocks  Parity Blocks  Total  Overhead  Max Failures
RS-3-2    3            2              5      67%       2
RS-6-3    6            3              9      50%       3
RS-10-4   10           4              14     40%       4
XOR-2-1   2            1              3      50%       1
Configuration and Usage
Enable Erasure Coding :
# List available policies
hdfs ec -listPolicies
# Enable a policy
hdfs ec -enablePolicy -policy RS-6-3-1024k
# Set policy on a directory
hdfs ec -setPolicy -path /archive -policy RS-6-3-1024k
# All files created in /archive will use erasure coding
# Check policy
hdfs ec -getPolicy -path /archive/file.txt
When to Use Erasure Coding :
✅ Good for :
Archive/cold data (rarely accessed)
Large files (>100MB)
Write-once, read-many workloads
Cost-sensitive storage
❌ Not ideal for :
Hot data (frequently accessed)
Small files (<10MB)
Data requiring low-latency reads
High write throughput workloads
Performance Considerations :
// Erasure coding read performance
// Traditional replication: Read from 1 DataNode
// Erasure coding: Read from 6+ DataNodes (for RS-6-3)
// Example: Reading 54MB file with RS-6-3
// - Need to read 6 data blocks (9MB each)
// - Contacts 6 different DataNodes
// - More network overhead
// - Slower than replication for small reads
HDFS Snapshots
Snapshot Basics
Snapshots provide point-in-time, read-only copies of directories:
Directory /data at different points in time:
t0 (Initial):
/data
├── file1.txt (v1)
├── file2.txt (v1)
└── file3.txt (v1)
Create snapshot: .snapshot/snap1
↓
t1 (Modify file1, delete file2):
/data
├── file1.txt (v2) ← modified
└── file3.txt (v1)
Create snapshot: .snapshot/snap2
↓
t2 (Add file4):
/data
├── file1.txt (v2)
├── file3.txt (v1)
└── file4.txt (v1) ← new file
Snapshots:
/data/.snapshot/snap1/ → Shows state at t0
├── file1.txt (v1)
├── file2.txt (v1)
└── file3.txt (v1)
/data/.snapshot/snap2/ → Shows state at t1
├── file1.txt (v2)
└── file3.txt (v1)
Copy-on-Write Mechanism
How Snapshots Save Space :
No duplication of unchanged data!
Snapshot references existing blocks:
- Unchanged files: Point to same blocks
- Modified files: Only new blocks created
- Deleted files: Blocks retained if in snapshot
Example:
Original: file.txt (100MB, 1 block)
Snapshot created
Modify file.txt → New version (100MB, 1 new block)
Storage:
- Original block: 100MB (referenced by snapshot)
- New block: 100MB (current version)
- Total: 200MB (not 300MB with replication!)
If file deleted from current directory:
- Snapshot still references original block
- Block not deleted until snapshot removed
Snapshot Operations
Enable Snapshots :
# Make directory snapshottable
hdfs dfsadmin -allowSnapshot /data
# Create snapshot
hdfs dfs -createSnapshot /data snap_$(date +%Y%m%d)
# List snapshots
hdfs dfs -ls /data/.snapshot
# Access snapshot data
hdfs dfs -cat /data/.snapshot/snap_20240115/file.txt
# Compare snapshots
hdfs snapshotDiff /data snap_20240115 snap_20240116
# Delete snapshot
hdfs dfs -deleteSnapshot /data snap_20240115
# Disable snapshots (must delete all snapshots first)
hdfs dfsadmin -disallowSnapshot /data
Snapshot Diff :
# Show differences between snapshots
hdfs snapshotDiff /data snap1 snap2
# Output:
# Difference between snapshot snap1 and snapshot snap2:
# M ./file1.txt (Modified)
# - ./file2.txt (Deleted)
# + ./file4.txt (Added)
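The same operations are available programmatically through DistributedFileSystem. A minimal sketch, assuming fs.defaultFS points at the cluster and the caller has admin rights on /data (allowing snapshots is an admin operation):

// Programmatic snapshot workflow via the DistributedFileSystem API.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;

public class SnapshotExample {
    public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
                (DistributedFileSystem) FileSystem.get(new Configuration());
        Path dir = new Path("/data");

        dfs.allowSnapshot(dir);                        // like: hdfs dfsadmin -allowSnapshot
        Path snap1 = dfs.createSnapshot(dir, "snap1"); // like: hdfs dfs -createSnapshot
        System.out.println("Created " + snap1);

        // ... files under /data are modified here ...

        dfs.createSnapshot(dir, "snap2");
        SnapshotDiffReport diff = dfs.getSnapshotDiffReport(dir, "snap1", "snap2");
        System.out.println(diff);                      // same info as hdfs snapshotDiff
    }
}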
Use Cases
1. Backup and Recovery :
# Daily snapshots
hdfs dfs -createSnapshot /data daily_$(date +%Y%m%d)
# User accidentally deletes file
hdfs dfs -rm /data/important.txt
# Restore from snapshot
hdfs dfs -cp /data/.snapshot/daily_20240115/important.txt /data/
2. Testing and Validation :
# Before risky operation
hdfs dfs -createSnapshot /data before_migration
# Run migration
./migrate_data.sh
# Verify or rollback
hdfs snapshotDiff /data before_migration .
# If the migration failed, restore the affected data from the snapshot
# (snapshots are read-only; deleting a snapshot never changes the live directory)
hdfs dfs -cp /data/.snapshot/before_migration/<path> /data/
# Clean up the snapshot once everything is verified
hdfs dfs -deleteSnapshot /data before_migration
3. Compliance and Auditing :
# Monthly compliance snapshots (retained for 7 years)
hdfs dfs -createSnapshot /financial_data compliance_$(date +%Y%m)
# Access historical data
hdfs dfs -cat /financial_data/.snapshot/compliance_201701/report.csv
HDFS Caching
Centralized Cache Management
Problem : Hot data read repeatedly from disk
Solution : Cache frequently accessed blocks in DataNode memory
┌──────────────────────────────────────────────────┐
│ NameNode │
│ • Tracks cache directives │
│ • Instructs DataNodes what to cache │
│ • Monitors cache usage │
└────────────────┬─────────────────────────────────┘
│
│ Cache directives
▼
┌──────────────────────────────────────────────────┐
│ DataNode │
│ ┌────────────────────────────────┐ │
│ │ Memory Cache │ │
│ │ (Hot blocks pinned in RAM) │ │
│ │ ┌──────┐ ┌──────┐ ┌──────┐ │ │
│ │ │Block1│ │Block2│ │Block3│ │ │
│ │ └──────┘ └──────┘ └──────┘ │ │
│ └────────────────────────────────┘ │
│ ┌────────────────────────────────┐ │
│ │ Disk Storage │ │
│ │ (All blocks) │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
Cache Pool and Directive Management
Create Cache Pool :
# Create pool with limits
hdfs cacheadmin -addPool analytics_pool \
-owner analytics_team \
-group analytics \
-mode 0755 \
-limit 10737418240 # 10GB limit
# List pools
hdfs cacheadmin -listPools
Add Cache Directive :
# Cache entire directory
hdfs cacheadmin -addDirective \
-path /hot_data/daily_reports \
-pool analytics_pool \
-replication 1
# Cache specific file
hdfs cacheadmin -addDirective \
-path /hot_data/popular_dataset.csv \
-pool analytics_pool
# List directives
hdfs cacheadmin -listDirectives -pool analytics_pool
# Remove directive
hdfs cacheadmin -removeDirective <directive_id>
Programmatic Caching :
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.*;

public class CacheManager {

    public static void cacheFile(String filePath, String poolName) throws Exception {
        DistributedFileSystem dfs =
                (DistributedFileSystem) FileSystem.get(new Configuration());

        // Add cache directive
        CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder();
        builder.setPath(new Path(filePath))
               .setPool(poolName)
               .setReplication((short) 1);

        long directiveId = dfs.addCacheDirective(builder.build());
        System.out.println("Cached " + filePath +
                " with directive ID: " + directiveId);
    }

    public static void checkCacheStatus(String filePath) throws Exception {
        DistributedFileSystem dfs =
                (DistributedFileSystem) FileSystem.get(new Configuration());

        RemoteIterator<CacheDirectiveEntry> iter = dfs.listCacheDirectives(null);
        while (iter.hasNext()) {
            CacheDirectiveEntry entry = iter.next();
            CacheDirectiveInfo info = entry.getInfo();
            CacheDirectiveStats stats = entry.getStats();
            if (info.getPath().toString().equals(filePath)) {
                System.out.println("Bytes cached: " +
                        stats.getBytesCached() + " / " + stats.getBytesNeeded());
                break;
            }
        }
    }
}
Short-Circuit Local Reads
Enable reading from local DataNode without network :
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/run/hadoop-hdfs/dn_socket</value>
</property>
<property>
  <name>dfs.client.read.shortcircuit.skip.checksum</name>
  <value>false</value>
  <description>Keep checksums enabled to preserve data-integrity verification</description>
</property>
Performance Impact : 30-50% faster for local reads
Hedged Reads
Read from multiple replicas simultaneously for tail latency :
<property>
  <name>dfs.client.hedged.read.threadpool.size</name>
  <value>5</value>
</property>
<property>
  <name>dfs.client.hedged.read.threshold.millis</name>
  <value>10</value>
  <description>If first read takes >10ms, start hedged read</description>
</property>
How It Works :
Client requests block from DataNode A
↓
Wait 10ms
↓
If not received, also request from DataNode B
↓
Use whichever responds first
↓
Cancel other request
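Both hedged reads and short-circuit reads are client-side settings, so they can also be enabled on a per-job Configuration instead of cluster-wide XML. A sketch, assuming the property values shown above and a placeholder file path:

// Enable hedged reads (and short-circuit reads) from client code.
// /hot_data/popular_dataset.csv is a placeholder path.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HedgedReadClient {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
        conf.setLong("dfs.client.hedged.read.threshold.millis", 10);
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        conf.set("dfs.domain.socket.path", "/var/run/hadoop-hdfs/dn_socket");

        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/hot_data/popular_dataset.csv"))) {
            byte[] buf = new byte[8192];
            int n = in.read(buf);   // a slow first replica triggers a hedged read
            System.out.println("Read " + n + " bytes");
        }
    }
}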
DataNode Configuration
<!-- DataNode RPC handler threads (default is 10) -->
<property>
  <name>dfs.datanode.handler.count</name>
  <value>20</value>
  <description>More threads for concurrent block operations</description>
</property>

<!-- Increase max transfer threads -->
<property>
  <name>dfs.datanode.max.transfer.threads</name>
  <value>8192</value>
</property>

<!-- Memory the DataNode may pin for centralized caching -->
<property>
  <name>dfs.datanode.max.locked.memory</name>
  <value>4294967296</value>
  <description>Bytes of RAM usable for cached blocks; must not exceed the DataNode user's "ulimit -l"</description>
</property>
Monitoring and Metrics
JMX Metrics Exposure
NameNode Metrics :
# Query via HTTP
# Query via HTTP
curl "http://namenode:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"

# Key metrics (from the NameNodeInfo and FSNamesystem beans):
# - PercentUsed
# - PercentRemaining
# - TotalBlocks
# - MissingBlocks
# - UnderReplicatedBlocks
# - CorruptBlocks
# - LiveNodes
# - DeadNodes
Programmatic Monitoring :
import javax.management.*;
import java.lang.management.*;

public class HDFSMonitor {

    public static void getNameNodeMetrics() throws Exception {
        // Note: the platform MBean server only exposes beans of the local JVM,
        // so this must run inside the NameNode process (or be adapted to use a
        // remote JMX connector).
        MBeanServerConnection mbs = ManagementFactory.getPlatformMBeanServer();

        ObjectName nameNodeMBean = new ObjectName(
                "Hadoop:service=NameNode,name=FSNamesystem");

        // Get capacity
        long capacityTotal = (Long) mbs.getAttribute(nameNodeMBean, "CapacityTotal");
        long capacityUsed = (Long) mbs.getAttribute(nameNodeMBean, "CapacityUsed");
        long capacityRemaining = (Long) mbs.getAttribute(nameNodeMBean, "CapacityRemaining");

        System.out.println("Capacity Total: " + capacityTotal);
        System.out.println("Capacity Used: " + capacityUsed +
                " (" + (capacityUsed * 100.0 / capacityTotal) + "%)");
        System.out.println("Capacity Remaining: " + capacityRemaining);

        // Get block stats
        long underReplicatedBlocks = (Long) mbs.getAttribute(
                nameNodeMBean, "UnderReplicatedBlocks");
        long corruptBlocks = (Long) mbs.getAttribute(nameNodeMBean, "CorruptBlocks");

        System.out.println("Under-replicated Blocks: " + underReplicatedBlocks);
        System.out.println("Corrupt Blocks: " + corruptBlocks);
    }
}
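Because the platform MBean server only sees the local JVM, external monitors usually scrape the NameNode's /jmx HTTP endpoint instead. A minimal sketch using plain HttpURLConnection; the host, port, and crude string filtering are illustrative, and a real monitor would parse the JSON response properly:

// Pull FSNamesystem metrics over the NameNode's /jmx HTTP endpoint,
// which works from outside the NameNode JVM.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class JmxHttpMonitor {
    public static void main(String[] args) throws Exception {
        URL url = new URL(
            "http://namenode:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);

        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Crude filtering of the JSON lines we care about.
                if (line.contains("CapacityUsed") || line.contains("CorruptBlocks")
                        || line.contains("UnderReplicatedBlocks")) {
                    System.out.println(line.trim());
                }
            }
        } finally {
            conn.disconnect();
        }
    }
}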
Best Practices Summary
Use Federation for Scale : When metadata exceeds roughly 100M files or a single NameNode's RAM limits, deploy federation to scale the namespace horizontally.
HA is Mandatory : Always run HA in production. Manual NameNode recovery takes hours and risks data inconsistency.
Erasure Code Cold Data : Archive data older than 90 days with erasure coding to cut storage costs by about 50%.
Snapshot for Safety : Take daily snapshots and snapshot before risky operations. Retention: 7 daily, 4 weekly, 12 monthly.
Interview Focus
Common Questions :
“How does HDFS HA prevent split-brain?”
Fencing: Active NN cannot write to JournalNodes after losing quorum
ZooKeeper coordination ensures only one Active NN at a time
SSH fencing kills old NN process if needed
“When to use Federation vs HA?”
HA: High availability, failover (same namespace)
Federation: Horizontal scalability (multiple independent namespaces)
Can combine both: Federated cluster with HA for each namespace
“Why is erasure coding slower than replication?”
Must read from 6+ DataNodes vs 1 for replication
Decode overhead for reconstructing data
Trade-off: 50% storage savings for slightly slower reads
What’s Next?
Module 3: MapReduce Programming Model. Now that you have mastered HDFS storage, learn to process data with MapReduce.