YARN (Yet Another Resource Negotiator) represents a fundamental redesign of Hadoop introduced in Hadoop 2.0. It decouples resource management from data processing, transforming Hadoop from a MapReduce-only system into a general-purpose cluster operating system capable of running diverse workloads.
Chapter Goals:
Understand why YARN was created and what problems it solves
HADOOP 1.x:
──────────
JobTracker  = Resource Manager + MapReduce Scheduler + Job Monitor
TaskTracker = Resource Provider + Map/Reduce Executor

HADOOP 2.x (YARN):
─────────────────
ResourceManager   = Cluster Resource Manager (ONLY)
NodeManager       = Resource Provider (ONLY)
ApplicationMaster = Per-Application Scheduler and Monitor

KEY INSIGHT: Resource Management is Generic
──────────────────────────────────────────
ResourceManager doesn't know about MapReduce!
It only knows about:
• Resources:    CPU, memory, disk, network
• Containers:   Resource abstractions
• Applications: Things that request containers

Processing logic (MapReduce, Spark, etc.) lives in
ApplicationMaster, which is application-specific.

BENEFITS:
────────
✓ Scalability:   ResourceManager does less work
✓ Flexibility:   Run any framework, not just MapReduce
✓ Isolation:     Each app has its own ApplicationMaster
✓ Multi-tenancy: Different apps can coexist
✓ Innovation:    New frameworks without changing core
FRAMEWORKS ON YARN:
──────────────────
┌────────────────────────────────────────┐
│              YARN Cluster              │
├────────────────────────────────────────┤
│     ResourceManager + NodeManagers     │
└────────────────────────────────────────┘
                    ↑
        ┌───────────┼───────────┐
        │           │           │
   ┌─────────┐  ┌────────┐  ┌─────────┐
   │MapReduce│  │ Spark  │  │  Flink  │
   │   v2    │  │        │  │         │
   └─────────┘  └────────┘  └─────────┘
   ┌─────────┐  ┌────────┐  ┌─────────┐
   │  Hive   │  │  Tez   │  │  Storm  │
   └─────────┘  └────────┘  └─────────┘

SUPPORTED FRAMEWORKS:
────────────────────
Batch Processing:
• MapReduce v2 (compatibility)
• Apache Spark (most popular)
• Apache Tez (Hive backend)

Stream Processing:
• Apache Flink
• Apache Storm
• Spark Streaming

Interactive:
• Apache Impala (via YARN)
• Presto (can use YARN)

Graph Processing:
• Apache Giraph

Machine Learning:
• Spark MLlib
• TensorFlow on YARN

WHY THIS MATTERS:
────────────────
Single cluster can run:
• Batch jobs (Spark)
• Streaming pipelines (Flink)
• SQL queries (Hive/Tez)
• ML training (TensorFlow)
All sharing the same resources efficiently!
Eliminating Single Points of Failure
RESOURCEMANAGER HA:
──────────────────
┌──────────────────────────────────┐
│ Active ResourceManager           │
│ (handles all requests)           │
└──────────────────────────────────┘
                ↓
        ┌──────────────┐
        │  ZooKeeper   │  (leader election)
        └──────────────┘
                ↑
┌──────────────────────────────────┐
│ Standby ResourceManager          │
│ (ready to take over)             │
└──────────────────────────────────┘

FAILOVER PROCESS:
────────────────
1. Active RM fails
2. ZooKeeper detects failure (no heartbeat)
3. Standby RM becomes Active
4. Running applications reconnect
5. New applications submitted to new Active

Downtime: Seconds (not hours!)

APPLICATION STATE RECOVERY:
──────────────────────────
ApplicationMaster checkpoints state:
• Completed tasks
• Running tasks
• Pending work

On RM failover:
• ApplicationMaster reconnects
• Resumes from checkpoint
• Only re-run in-flight work

NODEMANAGER RESILIENCE:
──────────────────────
If NodeManager fails:
• ResourceManager detects via heartbeat
• Containers on that node are lost
• ApplicationMaster reschedules work
• Other NodeManagers unaffected

No global impact from single node failure!
RESOURCEMANAGER INTERNALS:
─────────────────────────
ResourceManager components:

ApplicationsManager (ASM)
• Accept application submissions
• Negotiate first container for AM
• Restart AM on failure

ApplicationMasterService (AMS)
• AM registration
• Heartbeat from AMs
• Resource requests
• Container allocation responses

Scheduler
• Capacity Scheduler (default)
• Fair Scheduler
• FIFO Scheduler
• Allocates containers to apps

Resource Tracker Service
• NodeManager registration
• Heartbeat from NMs
• Track node health and resources

Client Service
• Application submission
• Application kill/status
• Cluster metrics queries

KEY RESPONSIBILITIES:
────────────────────
1. Accept Applications:
   - Client submits ApplicationSubmissionContext
   - Validate submission (resources, queue, ACLs)
   - Assign application ID

2. Allocate AM Container:
   - Find suitable NodeManager
   - Allocate first container for ApplicationMaster
   - Track AM container location

3. Resource Scheduling:
   - Receive resource requests from AMs
   - Use scheduler to allocate containers
   - Respond with container grants

4. Node Management:
   - Track available resources per node
   - Mark nodes as healthy/unhealthy
   - Decommission/recommission nodes

5. Application Lifecycle:
   - Monitor AM health via heartbeat
   - Restart AM on failure (configurable attempts)
   - Clean up on application completion
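To make the Client Service path concrete, here is a minimal sketch of submitting an application through the YARN Java client API (YarnClient and ApplicationSubmissionContext are the standard client classes; the queue, memory, and launch command below are placeholders, and details such as local resources, environment, and security tokens are omitted):

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SubmitAppSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Ask the RM for a new application ID
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();

        // Describe the ApplicationMaster container the RM should launch
        ctx.setApplicationName("demo-app");               // placeholder name
        ctx.setQueue("default");                           // placeholder queue
        ctx.setResource(Resource.newInstance(1536, 1));    // AM: 1.5 GB, 1 vCore
        ctx.setAMContainerSpec(ContainerLaunchContext.newInstance(
            null, null,
            Collections.singletonList("/path/to/launch_am.sh"),  // placeholder command
            null, null, null));

        // Submit; the ApplicationsManager validates it and schedules the AM container
        ApplicationId appId = yarnClient.submitApplication(ctx);
        System.out.println("Submitted " + appId);
    }
}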
Scheduling Algorithms
FIFO, Capacity, and Fair Schedulers
SCHEDULER COMPARISON:
────────────────────
FIFO:
• Simple queue, first-come first-serve
• Good for: Single user, batch jobs
• Bad for: Multi-tenant, mixed SLAs

Capacity:
• Multiple queues, guaranteed capacity
• Good for: Orgs with resource quotas
• Bad for: Dynamic, changing workloads

Fair:
• Equal share for all apps/users
• Good for: Interactive, multi-tenant
• Bad for: Strict capacity guarantees

CAPACITY SCHEDULER (Default):
────────────────────────────
Queue Hierarchy:
root (100%)
├─ engineering (60%)
│   ├─ dev (30%)
│   └─ prod (30%)
├─ marketing (20%)
└─ finance (20%)

Properties:
• Each queue guaranteed minimum capacity
• Can use more if available (elasticity)
• Can set max capacity (prevent starvation)
• Hierarchical queues
• ACLs per queue

Configuration:
──────────────
yarn.scheduler.capacity.root.queues=engineering,marketing,finance
yarn.scheduler.capacity.root.engineering.capacity=60
yarn.scheduler.capacity.root.marketing.capacity=20
yarn.scheduler.capacity.root.finance.capacity=20
yarn.scheduler.capacity.root.engineering.queues=dev,prod
yarn.scheduler.capacity.root.engineering.dev.capacity=50
yarn.scheduler.capacity.root.engineering.prod.capacity=50

FAIR SCHEDULER:
──────────────
Concept: All applications get equal share over time

Without Fair Scheduler:
──────────────────────
App A submitted at t=0:  Gets all resources
App B submitted at t=10: Waits for App A to finish

With Fair Scheduler:
───────────────────
App A at t=0:  Gets 100% (no other apps)
App B at t=10: Each app gets 50%
App C at t=20: Each app gets 33%
App A finishes at t=30: App B and C get 50% each

Fair Share Calculation:
──────────────────────
Total resources: 100 GB memory, 50 cores
Apps:
• App A (weight=1): 33.3 GB, 16 cores
• App B (weight=1): 33.3 GB, 17 cores
• App C (weight=1): 33.3 GB, 17 cores

If App A doesn't use full share:
• App B: 50 GB, 25 cores
• App C: 50 GB, 25 cores

Weighted Fair Share:
───────────────────
Total: 100 GB, 50 cores
App A (weight=1): 25 GB, 12.5 cores
App B (weight=2): 50 GB, 25 cores
App C (weight=1): 25 GB, 12.5 cores

Configuration:
─────────────
yarn.scheduler.fair.preemption=true
(kill containers from over-allocated apps)
yarn.scheduler.fair.sizebasedweight=true
(larger jobs get more weight)

FIFO SCHEDULER:
──────────────
Simplest: First job gets all resources until done.

Timeline:
─────────
t=0:   Submit Job 1 (big)   → runs
t=10:  Submit Job 2 (small) → waits
t=20:  Submit Job 3 (big)   → waits
t=100: Job 1 finishes
t=100: Job 2 starts
t=110: Job 2 finishes
t=110: Job 3 starts

Problem: Short jobs wait behind long jobs (poor turnaround)

Use only for:
• Single-user clusters
• Batch processing with known schedule
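The Fair Scheduler's per-queue weights and preemption timeouts are normally defined in an allocation file (pointed to by yarn.scheduler.fair.allocation.file) rather than in yarn-site.xml. A minimal sketch, assuming the standard allocation-file elements (queue, weight, minResources, schedulingPolicy, fairSharePreemptionTimeout); the queue names and values are illustrative:

<?xml version="1.0"?>
<allocations>
  <queue name="engineering">
    <weight>2.0</weight>
    <minResources>20000 mb, 10 vcores</minResources>
    <schedulingPolicy>drf</schedulingPolicy>
  </queue>
  <queue name="adhoc">
    <weight>1.0</weight>
    <fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>
  </queue>
</allocations>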
In large production clusters, YARN is shared by multiple departments, teams, and project types. To manage this complexity, YARN uses Hierarchical Queues.
The FairScheduler ensures that all applications get an equal share of resources over time.
Fair Share: If two apps are running, they each get 50%.
Dominant Resource Fairness (DRF): Handles multi-dimensional resources. If App A is CPU-heavy and App B is Memory-heavy, DRF calculates fairness based on the “dominant” resource each app consumes.
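A classic worked example (numbers chosen for illustration): a cluster with 9 CPUs and 18 GB of memory runs App A, whose tasks each need 1 CPU and 4 GB, and App B, whose tasks each need 3 CPUs and 1 GB. A's dominant resource is memory (4/18 ≈ 22% of the cluster's memory per task, versus 1/9 ≈ 11% of its CPU); B's dominant resource is CPU (3/9 ≈ 33% per task). DRF allocates tasks so the dominant shares come out equal: A ends up with 3 tasks (3 CPUs, 12 GB, i.e. 67% of memory) and B with 2 tasks (6 CPUs, 2 GB, i.e. 67% of CPU), at which point the CPUs are exhausted and both apps hold the same 67% dominant share.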
What happens if Queue A is using its guaranteed 50%, and Queue B (also guaranteed 50%) is currently empty? Queue A will scale up to 100%.
If a job is then submitted to Queue B, YARN must reclaim resources from Queue A.
Graceful Termination: YARN sends a preemption request to the AM that owns the over-allocated containers, asking it to finish up and release them.
Hard Kill: If the AM doesn’t release containers within a timeout (e.g., 15 seconds), the RM will forcefully kill those containers to satisfy the guarantee of Queue B.
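For the Capacity Scheduler, this behavior is switched on through the preemption monitor; a sketch of the usual knobs (these property names come from the standard Capacity Scheduler preemption policy, and 15000 ms is the commonly cited default matching the 15-second grace period above):

yarn.resourcemanager.scheduler.monitor.enable=true
yarn.resourcemanager.scheduler.monitor.policies=
  org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy
yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval=3000
(how often the monitor checks for over-allocated queues, in ms)
yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill=15000
(grace period before containers are forcefully killed, in ms)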
Strategy   When to use
────────   ───────────
FIFO       Small, private development clusters.
Capacity   Large corporate environments with fixed budgets.
Fair       Clusters with many interactive users (e.g., Spark Shells).
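Whichever strategy you choose, the scheduler is selected cluster-wide in yarn-site.xml; a minimal sketch using the standard scheduler class names (verify the exact class path against your Hadoop version):

yarn.resourcemanager.scheduler.class=
  org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
(swap in ...scheduler.fair.FairScheduler or ...scheduler.fifo.FifoScheduler for the other strategies)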
Resource Requests
How Applications Request Containers
RESOURCE REQUEST STRUCTURE:
──────────────────────────
ResourceRequest:
{
  priority: 1,            // Higher = more important
  resourceName: "node1",  // Specific node, rack, or "*"
  capability: {
    memory: 2048,         // MB
    vCores: 2
  },
  numContainers: 5,       // How many containers
  relaxLocality: true     // Can use other nodes?
}

LOCALITY LEVELS:
───────────────
1. Node-local: Specific node
   resourceName = "datanode1.example.com"
   Best: Data locality (HDFS block on this node)

2. Rack-local: Any node in rack
   resourceName = "/rack1"
   Good: Data on same rack (faster network)

3. Off-rack: Any node in cluster
   resourceName = "*"
   OK: No data locality, but get resources

RELAXED LOCALITY:
────────────────
relaxLocality = true (default):
• Try node-local first
• If not available, try rack-local
• If not available, use any node

relaxLocality = false:
• MUST get requested node/rack
• Wait indefinitely if not available
• Use for data-local workloads only

EXAMPLE: MapReduce Request
─────────────────────────
For a Map Task (data on node3, rack /rack1):

Request 1 (Preferred):
{
  resourceName: "node3",
  capability: {memory: 1024, vCores: 1},
  numContainers: 1,
  relaxLocality: true
}

Request 2 (Fallback - rack):
{
  resourceName: "/rack1",
  capability: {memory: 1024, vCores: 1},
  numContainers: 1,
  relaxLocality: true
}

Request 3 (Fallback - any):
{
  resourceName: "*",
  capability: {memory: 1024, vCores: 1},
  numContainers: 1,
  relaxLocality: true
}

Scheduler will try to satisfy in order of preference.

PRIORITY:
────────
ApplicationMaster can assign priorities:
• Priority 1: Map tasks (need locality)
• Priority 2: Reduce tasks (locality less critical)
Higher priority requests served first.

RESOURCE REQUEST FLOW:
─────────────────────
1. AM sends ResourceRequest to RM (via heartbeat, every 1 second)
2. RM's Scheduler evaluates:
   - Check queue limits
   - Check node availability
   - Match resource requirements
   - Consider locality preferences
3. RM responds with Container allocations:
   {
     containerId: "container_123",
     nodeId: "node3:8041",
     resource: {memory: 1024, vCores: 1},
     token: "..."
   }
4. AM launches container on NodeManager
5. When containers finish, the RM reports the completed containers back to the AM on later heartbeats
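Frameworks built on the YARN Java client library express these requests through AMRMClient.ContainerRequest, which bundles the capability, preferred nodes/racks, priority, and relax-locality flag. A minimal sketch (class and method names from the Hadoop YARN client API; node3 and /rack1 are placeholders matching the example above):

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class LocalityRequestSketch {
    // Adds a request for one 1 GB / 1 vCore container, preferring node3, then /rack1,
    // and finally any node because relaxLocality is true.
    public static void addMapTaskRequest(AMRMClient<ContainerRequest> rmClient) {
        Resource capability = Resource.newInstance(1024, 1);  // memory MB, vCores
        Priority priority = Priority.newInstance(1);          // map-task priority

        String[] nodes = {"node3"};
        String[] racks = {"/rack1"};

        ContainerRequest request =
            new ContainerRequest(capability, nodes, racks, priority, true /* relaxLocality */);
        rmClient.addContainerRequest(request);
    }
}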
High Availability
Fault Tolerance and Failover
RESOURCEMANAGER HA ARCHITECTURE:
───────────────────────────────
┌──────────────────────────────────────┐
│ ResourceManager 1 (ACTIVE)           │
│ • Handles all client requests        │
│ • Schedules containers               │
│ • Monitors applications              │
└──────────────┬───────────────────────┘
               │
               ↓
       ┌───────────────┐
       │   ZooKeeper   │
       │   Ensemble    │  (leader election + coordination)
       └───────────────┘
               ↑
               │
┌──────────────┴───────────────────────┐
│ ResourceManager 2 (STANDBY)          │
│ • Waits for active to fail           │
│ • Syncs state from ZooKeeper         │
│ • Ready to become active             │
└──────────────────────────────────────┘

STATE STORAGE:
─────────────
Stores in ZooKeeper or HDFS:
• Running applications
• Application attempts
• Completed applications
• Delegation tokens

On failover, new active RM:
• Reads state from ZooKeeper
• Recovers application state
• AMs re-register
• NMs re-register

FAILOVER PROCESS:
────────────────
Normal Operation:
1. Active RM processes requests
2. Active RM writes state to ZK
3. Standby RM monitors ZK

Active RM Fails:
1. ZooKeeper detects no heartbeat
2. Standby RM wins leader election
3. Standby becomes Active
4. New Active loads state from ZK
5. ApplicationMasters reconnect
6. NodeManagers reconnect

Downtime: Typically < 30 seconds

CLIENT FAILOVER:
───────────────
Clients configured with both RMs:
yarn.resourcemanager.ha.rm-ids=rm1,rm2
yarn.resourcemanager.hostname.rm1=host1
yarn.resourcemanager.hostname.rm2=host2

Client behavior:
• Try rm1
• If connection fails, try rm2
• Automatic retry on failover

APPLICATIONMASTER RECOVERY:
──────────────────────────
When RM fails and restarts:

Option 1: Work-Preserving Recovery
• AM reconnects to new RM
• Reports running containers
• Continues from current state
• Only in-flight work re-executed

Option 2: Non-Work-Preserving
• AM killed and restarted
• Application restarts from beginning
• Simpler, but more expensive

CONFIGURATION:
─────────────
Enable RM HA:
yarn.resourcemanager.ha.enabled=true
yarn.resourcemanager.ha.rm-ids=rm1,rm2

Enable work-preserving recovery:
yarn.resourcemanager.recovery.enabled=true
yarn.resourcemanager.store.class=
  org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore

ZooKeeper connection:
yarn.resourcemanager.zk-address=
  zk1:2181,zk2:2181,zk3:2181
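Operationally, you can ask each configured RM which role it currently holds (standard yarn rmadmin command; output wording can vary by version):

yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
(one should report active, the other standby)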
Deep Dive: YARN State Machines and Lifecycle Internals
The robustness of YARN comes from its strict state-machine-driven design. Both the ResourceManager and ApplicationMaster operate as complex event-driven state machines.
TASK FAILURE:
────────────
Hadoop 1.x:
• JobTracker detects failure
• Reschedules task on different TaskTracker
• Same as YARN

Hadoop 2.x:
• ApplicationMaster detects failure
• Requests new container from RM
• Launches task in new container
• Same outcome, but AM handles it

MASTER FAILURE:
──────────────
Hadoop 1.x:
• JobTracker dies → all jobs LOST
• Must manually restart JobTracker
• All jobs must be resubmitted
• Expensive restart

Hadoop 2.x:
• Active RM dies → Standby RM takes over
• ZooKeeper coordinates failover
• ApplicationMasters reconnect
• Jobs continue from checkpoint
• Downtime: seconds

YARN is production-grade HA!

WORKER FAILURE:
──────────────
Hadoop 1.x:
• TaskTracker dies
• All tasks on node fail
• JobTracker reschedules
• Completed map outputs LOST (re-run maps)

Hadoop 2.x:
• NodeManager dies
• All containers on node fail
• RM detects via heartbeat
• ApplicationMasters reschedule tasks
• Same behavior

Both handle worker failures well.
YARN: ResourceManager focuses only on resources, scales to 10K+ nodes
3. Poor Resource Utilization:
Fixed map/reduce slots led to idle resources (map slots unused while reduce slots busy)
YARN: Flexible containers can be allocated for any purpose
Utilization improved from ~60% to ~90%
4. No High Availability:
JobTracker failure meant all jobs lost
YARN: Built-in RM HA with ZooKeeper, work-preserving recovery
Core Innovation: Separation of concerns—resource management (RM) separate from application logic (ApplicationMaster).
Intermediate: Explain the role of ApplicationMaster in YARN
Expected Answer:
ApplicationMaster (AM) is a per-application coordinator that manages the lifecycle of a single application.
Responsibilities:
Resource Negotiation:
Calculates resource needs (memory, CPU per task)
Sends ResourceRequests to ResourceManager
Receives Container allocations
Task Scheduling:
Decides which tasks run in which containers
Handles data locality (for HDFS-aware apps like MapReduce)
Manages task dependencies (map before reduce)
Task Monitoring:
Tracks progress of all tasks
Detects task failures
Requests replacement containers
Failure Handling:
Re-launches failed tasks
Implements speculative execution
Handles node failures
Lifecycle Management:
Starts when application begins
Unregisters from RM when application completes
Exits and releases all resources
Key Insight: The AM is application-specific. MapReduce ships MRAppMaster, and Spark provides its own YARN ApplicationMaster (which hosts or communicates with the Spark driver). This allows YARN to support any framework without changing core YARN code.
AM Failure: If the AM crashes, the ResourceManager can restart it (configurable max attempts). On restart, the AM can recover state and continue, or restart the application.
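As a rough sketch of what those responsibilities look like in code, the outline below uses the AMRMClient API from the Hadoop YARN client library (the cluster-wide restart cap mentioned above is typically yarn.resourcemanager.am.max-attempts); container launch via NMClient, failure tracking, and error handling are omitted, so treat this as a skeleton rather than a working AM:

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SkeletonAppMaster {
    public static void main(String[] args) throws Exception {
        AMRMClient<ContainerRequest> rm = AMRMClient.createAMRMClient();
        rm.init(new YarnConfiguration());
        rm.start();

        // 1. Register with the ResourceManager (host/port/tracking URL omitted)
        rm.registerApplicationMaster("", 0, "");

        // 2. Negotiate resources: ask for two 1 GB / 1 vCore containers anywhere
        Resource capability = Resource.newInstance(1024, 1);
        for (int i = 0; i < 2; i++) {
            rm.addContainerRequest(
                new ContainerRequest(capability, null, null, Priority.newInstance(0)));
        }

        // 3. Heartbeat loop: collect allocated containers; in a real AM, launch
        //    tasks on them via NMClient, track completions, re-request on failure
        int granted = 0;
        while (granted < 2) {
            AllocateResponse response = rm.allocate(0.0f);  // progress indicator
            for (Container c : response.getAllocatedContainers()) {
                granted++;
                // launch a task in container c via NMClient here
            }
            Thread.sleep(1000);
        }

        // 4. Unregister and release everything when the application is done
        rm.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "done", "");
        rm.stop();
    }
}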
Advanced: How does YARN achieve data locality for HDFS-aware applications?
Expected Answer:
Data locality is critical for performance: moving computation to data is far cheaper than moving data to computation.
YARN's Data Locality Mechanism:
1. ResourceRequest with Locality Preferences:
ApplicationMaster sends:
{
  resourceName: "node3.example.com",    // Preferred node
  capability: {memory: 2GB, vCores: 1},
  numContainers: 1,
  relaxLocality: true                   // Can use other nodes if needed
}
2. Locality Levels:
Node-local (best): Container on same node as HDFS block
Rack-local (good): Container on same rack (faster network)
Off-rack (acceptable): Any available node
3. Scheduler Considers Locality:
Capacity/Fair Schedulers try to satisfy locality first
Wait a configured delay for a node-local slot (measured in missed scheduling opportunities or seconds, depending on the scheduler)
If not available, relax to rack-local
Last resort: any node
4. Example (MapReduce):
Input Split: 128MB block on [DN1, DN3, DN5]

MRAppMaster sends 3 requests:
1. resourceName="DN1"     (preferred)
2. resourceName="/rack1"  (DN1's rack)
3. resourceName="*"       (any node)

Scheduler tries DN1 first. If busy, tries rack1 nodes.
If none available, allocates on any node.
5. Impact on Performance:
Node-local: ~10 Gbps (local disk)
Rack-local: ~1 Gbps (rack switch)
Off-rack: ~500 Mbps (core switch)
Reading a 128MB block (order-of-magnitude, given the bandwidths above):
Node-local: ~0.1 seconds
Rack-local: ~1 second
Off-rack: ~2 seconds
Configuration:
yarn.scheduler.capacity.node-locality-delay=40
(number of missed opportunities before relaxing)
Key Takeaway: YARN doesn’t guarantee locality, but makes best effort. Framework (AM) provides preferences, scheduler tries to satisfy them.
System Design: Design a YARN scheduler for machine learning workloads
Expected Answer:
ML workloads have unique requirements that differ from batch MapReduce.
Requirements Analysis:
Long-Running Jobs: ML training jobs run for hours/days (vs minutes for MR)
GPU Resources: Need GPU allocation, not just CPU/memory
Gang Scheduling: Distributed training needs all workers to start together
Preemption Concerns: Can’t kill ML job midway (lose expensive training progress)
Priority: Critical experiments should preempt less important ones
Reservation: Reserve resources for scheduled training runs
MLScheduler:
├─ Supports GPU as first-class resource
├─ Gang scheduling for distributed training
├─ Checkpoint-aware preemption
└─ Reservation system for large jobs
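The GPU part of this design is something recent Hadoop releases (3.x) already provide through pluggable resource types; a rough configuration sketch (property and class names as documented for the GPU plugin and DominantResourceCalculator; check your version's docs before relying on them):

resource-types.xml:
  yarn.resource-types=yarn.io/gpu

yarn-site.xml:
  yarn.nodemanager.resource-plugins=yarn.io/gpu

capacity-scheduler.xml:
  yarn.scheduler.capacity.resource-calculator=
    org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
(needed so scheduling accounts for GPUs alongside memory and CPU)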
3. Gang Scheduling:
Problem: Distributed TensorFlow needs 10 workers. If only 7 available, job can't start.
Solution:
AM requests 10 containers atomically:
{
  numContainers: 10,
  gangScheduling: true,
  allOrNothing: true
}

Scheduler behavior:
• Collect 10 available containers
• Allocate ALL at once
• If can't get 10, don't allocate any (prevent deadlock)
• Timeout: If can't satisfy in N minutes, fail job
4. Checkpoint-Aware Preemption:
1. Scheduler decides to preempt container
2. Send PREEMPTION_WARNING to AM
3. AM checkpoints model state to HDFS (5 minutes)
4. AM signals CHECKPOINT_COMPLETE
5. Scheduler kills container
6. When resources available, AM resumes from checkpoint
5. Queue Structure:
root
├─ production (40%, preemption disabled)
│   └─ inference (40%)
├─ research (40%, preemption enabled)
│   ├─ critical (20%, min=10%, max=30%)
│   ├─ normal (15%, min=5%, max=25%)
│   └─ experimental (5%, min=0%, max=15%)
└─ adhoc (20%, preemptable)
6. Reservation System:
For large, planned training jobs:
yarn reservation-create \
  -queue research/critical \
  -start-time "2024-01-25T00:00:00" \
  -duration 36000 \              // 10 hours
  -resource-spec memory=160GB,vCores=32,gpus=8

Scheduler reserves these resources.
Other jobs can't use during reservation window.
Trade-offs:

Feature                        Benefit                        Cost
───────                        ───────                        ────
Gang scheduling                Prevents partial allocation    Lower utilization (waiting for all resources)
Checkpoint-aware preemption    Don't lose training progress   Slower preemption (wait for checkpoint)
Reservation                    Guaranteed resources           Idle resources if job doesn't start
GPU as resource                Proper GPU allocation          Requires custom NodeManager plugins
Alternative: Use Kubernetes with Kubeflow. K8s has better GPU support, custom resource definitions, and ML-specific operators. Many companies are moving ML workloads from YARN to K8s.
Deep Dive: How would you debug a YARN application that's stuck in ACCEPTED state?
Expected Answer:
Problem: Application submitted, ResourceManager accepted it, but ApplicationMaster never starts.
Debugging Approach:
1. Check ResourceManager Logs:
# Look for application ID
yarn logs -applicationId app_1234_0001 -log_files resourcemanager.log

# Common issues:
# - "No nodes available"         → All nodes full or unhealthy
# - "Queue at max capacity"      → Queue limits exceeded
# - "User exceeded memory limit" → User quota exhausted
2. Check Queue Status:
yarn queue -status research

# Check:
# - Queue capacity used (at 100%?)
# - Number of pending applications
# - Max AM resource percentage
3. Check Cluster Capacity:
yarn node -list -all

# Look for:
# - Number of healthy nodes (enough resources?)
# - Available memory/cores
# - Unhealthy nodes (disk issues, heartbeat failures)
4. Check AM Resource Requirements:
yarn application -status app_1234_0001

# Check AM container requirements:
# - AM memory (default: 1.5GB)
# - AM vCores (default: 1)
# - Can cluster satisfy these requirements?
Common Root Causes:
A. All Nodes Full:
Cluster: 10 nodes x 8GB = 80GB total
Running apps using: 78GB
New app needs: 4GB for AM

Issue: Not enough free memory on any single node

Solution:
• Wait for running jobs to finish
• Kill lower-priority jobs
• Add more nodes
• Reduce AM memory requirement
B. Queue at Capacity:
Queue "research" capacity: 40%Cluster total: 100GBQueue limit: 40GBQueue currently using: 40GBNew app: Waiting...Solution:• Wait for jobs in queue to finish• Increase queue capacity• Use different queue• Enable queue elasticity (use spare capacity from other queues)
C. Max AM Percentage Exceeded:
Configuration:
yarn.scheduler.capacity.maximum-am-resource-percent=0.1
(Only 10% of cluster can be used for AMs)

Cluster: 100GB
Max for AMs: 10GB
Current AM usage: 9.5GB
New AM needs: 1GB

Issue: Would exceed 10% limit

Solution:
• Increase max-am-resource-percent to 0.2
• Wait for some applications to complete
• Reduce AM memory for new application
D. No Single Node Can Fit the AM:

AM needs: 4GB memory
Nodes:
• node1: 3GB available
• node2: 2GB available
• node3: 3GB available

Issue: No single node has 4GB available

Solution:
• Reduce AM memory requirement
• Wait for containers to finish on one node
• Kill containers on a node to free space
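A quick first triage step, regardless of cause, is to list everything currently stuck by state (standard yarn CLI):

yarn application -list -appStates ACCEPTED
(shows every application still waiting for its AM container, along with its user and queue)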
We’ve mastered YARN, Hadoop’s resource management layer. Next, we’ll see how the rich ecosystem of tools builds on HDFS and YARN to provide higher-level abstractions for data processing.