Cluster Deployment and Operations
Module Duration: 4-5 hours
Focus: Production deployment, monitoring, and maintenance
Prerequisites: Understanding of Spark architecture and cluster computing
Overview
This module covers deploying Spark applications in production environments, including cluster management, resource allocation, monitoring, troubleshooting, and best practices for reliable operation at scale.
Deployment Modes
- Local Mode: Single JVM for development and testing.
- Standalone: Spark's built-in cluster manager.
- YARN: Hadoop's resource manager for multi-tenant clusters.
- Kubernetes: Container orchestration for cloud-native deployments.
- Mesos: General-purpose cluster manager (deprecated).
Spark Architecture Review
Cluster Components
Driver Program
├── SparkContext
├── DAG Scheduler
├── Task Scheduler
└── Block Manager
Cluster Manager (YARN/K8s/Standalone)
├── Resource Allocation
├── Executor Lifecycle
└── Application Monitoring
Executors (Worker Nodes)
├── Tasks
├── Block Manager
├── Cache
└── Shuffle Data
Execution Flow
- Application Submission: spark-submit sends application to cluster manager
- Resource Allocation: Cluster manager allocates executors
- Driver Initialization: SparkContext created, DAG scheduler initialized
- Job Submission: Actions trigger job submission to DAG scheduler
- Task Distribution: Tasks sent to executors for execution
- Result Collection: Results aggregated and returned to driver
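To make steps 4-6 concrete, here is a minimal PySpark illustration (names are arbitrary): transformations only build the DAG, and the action at the end is what triggers job submission, task distribution, and result collection.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ExecutionFlowDemo").getOrCreate()
df = spark.range(1_000_000)                  # lazy: only builds the logical plan
doubled = df.selectExpr("id * 2 AS value")   # still lazy: extends the DAG
total = doubled.count()                      # action: stages are built, tasks run on executors
print(total)                                 # result has been collected on the driver
spark.stop()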
Standalone Cluster
Setting Up Standalone Cluster
# Start master
$SPARK_HOME/sbin/start-master.sh
# Master URL will be displayed: spark://master-host:7077
# Start workers
$SPARK_HOME/sbin/start-worker.sh spark://master-host:7077
# Start all (master + workers)
$SPARK_HOME/sbin/start-all.sh
# Stop all
$SPARK_HOME/sbin/stop-all.sh
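start-all.sh launches a master on the local host and then a worker on every host listed in conf/workers (one hostname per line, reached over passwordless SSH). A minimal example with placeholder hostnames:
# conf/workers
worker-node-1
worker-node-2
worker-node-3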
Master Configuration
# conf/spark-env.sh
export SPARK_MASTER_HOST=master-node
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=8"
Worker Configuration
# conf/spark-env.sh on workers
export SPARK_WORKER_CORES=16
export SPARK_WORKER_MEMORY=64g
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_DIR=/data/spark/work
Submitting to Standalone
spark-submit \
--master spark://master-host:7077 \
--deploy-mode cluster \
--executor-memory 8g \
--executor-cores 4 \
--total-executor-cores 32 \
--driver-memory 4g \
--conf spark.executor.memoryOverhead=2g \
--class com.example.MyApp \
application.jar \
arg1 arg2
High Availability
# Use ZooKeeper for master HA
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
-Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
-Dspark.deploy.zookeeper.dir=/spark"
# Start multiple masters
./sbin/start-master.sh # On master1
./sbin/start-master.sh # On master2
./sbin/start-master.sh # On master3
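With recovery mode enabled, point workers and applications at the full list of masters so they can fail over to whichever master is active; the standalone master URL accepts a comma-separated host list. A minimal PySpark sketch (hostnames are placeholders):
from pyspark.sql import SparkSession
# The multi-host URL registers with the currently active master and
# reconnects to a standby if the active master fails.
spark = SparkSession.builder \
    .master("spark://master1:7077,master2:7077,master3:7077") \
    .appName("StandaloneHAExample") \
    .getOrCreate()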
YARN Deployment
YARN Architecture
ResourceManager (RM)
├── Scheduler
├── ApplicationsManager
└── Resource Tracker
NodeManager (NM) - on each worker
├── Container Manager
├── Node Health Checker
└── Log Aggregation
ApplicationMaster (AM) - Spark Driver
├── Resource Negotiation
├── Task Scheduling
└── Progress Monitoring
YARN Configuration
<!-- yarn-site.xml -->
<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>rm-host</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>65536</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>16</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>32768</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
  </property>
</configuration>
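Each executor container requests executor memory plus memory overhead, and YARN rejects any container larger than yarn.scheduler.maximum-allocation-mb. A quick sanity check, using the yarn-site.xml values above and an 8g executor as an example:
# Does one executor container fit within YARN's limits?
executor_memory_mb = 8 * 1024                                   # --executor-memory 8g
memory_overhead_mb = max(384, int(0.10 * executor_memory_mb))   # Spark's default overhead rule
container_mb = executor_memory_mb + memory_overhead_mb
yarn_max_allocation_mb = 32768    # yarn.scheduler.maximum-allocation-mb
node_memory_mb = 65536            # yarn.nodemanager.resource.memory-mb
assert container_mb <= yarn_max_allocation_mb, "container exceeds YARN max allocation"
print(f"Container size: {container_mb} MB, executors per node: {node_memory_mb // container_mb}")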
YARN Client Mode
# Driver runs on client machine
spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--driver-memory 4g \
--conf spark.yarn.am.memory=2g \
--conf spark.yarn.am.cores=1 \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.yarn.maxAppAttempts=3 \
application.py
YARN Cluster Mode
# Driver runs on YARN cluster
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--driver-memory 4g \
--driver-cores 2 \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.yarn.maxAppAttempts=3 \
application.py
# Note: spark.yarn.am.memory and spark.yarn.am.cores apply only in client mode;
# in cluster mode the ApplicationMaster is the driver, sized by --driver-memory and --driver-cores.
YARN Resource Queues
# Submit to specific queue
spark-submit \
--master yarn \
--queue production \
--executor-memory 8g \
application.py
# Configure queue capacity (capacity-scheduler.xml)
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>production,development</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.production.capacity</name>
  <value>70</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.development.capacity</name>
  <value>30</value>
</property>
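The target queue can also be set programmatically; the --queue flag is shorthand for the spark.yarn.queue property. A minimal sketch:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("QueueExample") \
    .config("spark.yarn.queue", "production") \
    .getOrCreate()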
Kubernetes Deployment
Prerequisites
# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
# Configure kubeconfig
kubectl config use-context my-cluster
# Create namespace
kubectl create namespace spark-apps
# Create service account
kubectl create serviceaccount spark -n spark-apps
# Grant permissions
kubectl create clusterrolebinding spark-role \
--clusterrole=edit \
--serviceaccount=spark-apps:spark \
--namespace=spark-apps
Building Docker Image
# Dockerfile
FROM apache/spark:3.5.0
# Add dependencies
COPY requirements.txt /opt/spark/work-dir/
RUN pip install -r /opt/spark/work-dir/requirements.txt
# Add application
COPY application.py /opt/spark/work-dir/
COPY config.yaml /opt/spark/work-dir/
WORKDIR /opt/spark/work-dir
ENTRYPOINT ["/opt/spark/bin/spark-submit"]
# Build and push image
docker build -t myregistry/spark-app:v1.0 .
docker push myregistry/spark-app:v1.0
Submitting to Kubernetes
spark-submit \
--master k8s://https://k8s-master:6443 \
--deploy-mode cluster \
--name spark-app \
--conf spark.kubernetes.container.image=myregistry/spark-app:v1.0 \
--conf spark.kubernetes.namespace=spark-apps \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=5 \
--conf spark.executor.memory=8g \
--conf spark.executor.cores=4 \
--conf spark.driver.memory=4g \
--conf spark.driver.cores=2 \
--conf spark.kubernetes.driver.request.cores=2 \
--conf spark.kubernetes.executor.request.cores=4 \
--conf spark.kubernetes.driver.limit.cores=2 \
--conf spark.kubernetes.executor.limit.cores=4 \
local:///opt/spark/work-dir/application.py
Kubernetes ConfigMap
# spark-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-config
  namespace: spark-apps
data:
  spark-defaults.conf: |
    spark.executor.memory=8g
    spark.executor.cores=4
    spark.sql.shuffle.partitions=200
    spark.serializer=org.apache.spark.serializer.KryoSerializer
Kubernetes Persistent Volumes
# persistent-volume.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-data-pvc
  namespace: spark-apps
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  storageClassName: nfs-storage
# Mount in spark-submit
spark-submit \
--master k8s://https://k8s-master:6443 \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-data.mount.path=/data \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-data.options.claimName=spark-data-pvc \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.mount.path=/data \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-data.options.claimName=spark-data-pvc \
local:///opt/spark/work-dir/application.py
Spark Operator
# Install Spark Operator
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace
# Create SparkApplication
# spark-application.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-apps
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "apache/spark:3.5.0"
  imagePullPolicy: Always
  mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py"
  sparkVersion: "3.5.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 2
    memory: "4g"
    serviceAccount: spark
    labels:
      version: 3.5.0
  executor:
    cores: 4
    instances: 5
    memory: "8g"
    labels:
      version: 3.5.0
kubectl apply -f spark-application.yaml
kubectl get sparkapplication -n spark-apps
kubectl logs spark-pi-driver -n spark-apps
Resource Management
Calculating Resources
# Formula for executor configuration
"""
Given cluster resources:
- N nodes
- C cores per node
- M memory per node
Recommended:
- executor_cores = 4-5 (optimal for HDFS throughput)
- Leave 1 core and ~10% memory for OS/services
- Memory overhead = 10% of executor memory (min 384MB)
Example: 10 nodes, 32 cores, 128GB RAM each
- Usable: 31 cores, 115GB per node
- Executors per node: 31 / 5 = 6
- Total executors: 10 * 6 = 60
- Memory per executor: 115GB / 6 = 19GB
- Split: ~17GB executor.memory + ~2GB memoryOverhead (roughly 10%)
"""
def calculate_executor_config(nodes, cores_per_node, memory_per_node_gb):
    """Calculate optimal executor configuration"""
    # Reserve resources for OS
    usable_cores = cores_per_node - 1
    usable_memory = memory_per_node_gb * 0.9
    # Optimal cores per executor
    executor_cores = 5
    # Calculate executors per node
    executors_per_node = usable_cores // executor_cores
    total_executors = nodes * executors_per_node
    # Calculate memory per executor
    memory_per_executor = usable_memory / executors_per_node
    # Split memory and overhead (10%)
    executor_memory = int(memory_per_executor * 0.9)
    memory_overhead = int(memory_per_executor * 0.1)
    return {
        "num_executors": total_executors,
        "executor_cores": executor_cores,
        "executor_memory": f"{executor_memory}g",
        "memory_overhead": f"{memory_overhead}g",
        "total_cores": total_executors * executor_cores
    }
# Example
config = calculate_executor_config(
    nodes=10,
    cores_per_node=32,
    memory_per_node_gb=128
)
print(config)
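The returned dictionary maps directly onto spark-submit flags; a small, illustrative helper (to_spark_submit_args is not part of Spark) renders it:
def to_spark_submit_args(config):
    """Render the calculated configuration as spark-submit arguments."""
    return [
        "--num-executors", str(config["num_executors"]),
        "--executor-cores", str(config["executor_cores"]),
        "--executor-memory", config["executor_memory"],
        "--conf", f"spark.executor.memoryOverhead={config['memory_overhead']}",
    ]
print(" ".join(to_spark_submit_args(config)))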
Dynamic Resource Allocation
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("DynamicAllocation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.dynamicAllocation.initialExecutors", "10") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()
External Shuffle Service
# Start external shuffle service (on each node)
$SPARK_HOME/sbin/start-shuffle-service.sh
# Configure in spark-defaults.conf
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
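On Kubernetes there is no external shuffle service; for dynamic allocation there, Spark 3.x can instead track shuffle data and keep the executors that hold it alive until it is no longer needed:
# spark-defaults.conf (Kubernetes alternative to the external shuffle service)
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true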
Monitoring and Observability
Spark UI
# Access Spark UI
# During execution: http://driver-host:4040
# History server: http://history-server:18080
# Key tabs:
# - Jobs: Job execution timeline
# - Stages: Stage details and task metrics
# - Storage: Cached RDDs and memory usage
# - Environment: Spark configuration
# - Executors: Executor metrics and logs
# - SQL: Query execution plans
History Server
# Configure history server (spark-defaults.conf)
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark-logs
spark.history.fs.logDirectory=hdfs:///spark-logs
# Start history server
$SPARK_HOME/sbin/start-history-server.sh
# Access at http://history-server:18080
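Event logs for long-running applications (especially streaming) grow without bound; Spark 3.x can roll them, and the history server can clean up old logs. Example settings:
# spark-defaults.conf
spark.eventLog.rolling.enabled=true
spark.eventLog.rolling.maxFileSize=128m
spark.history.fs.cleaner.enabled=true
spark.history.fs.cleaner.maxAge=7d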
Metrics System
# metrics.properties
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite-server
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark
# Prometheus metrics
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Prometheus Integration
# Enable Prometheus metrics
spark = SparkSession.builder \
.config("spark.ui.prometheus.enabled", "true") \
.config("spark.metrics.conf", "/path/to/metrics.properties") \
.getOrCreate()
# prometheus.yml
scrape_configs:
  - job_name: 'spark'
    static_configs:
      - targets: ['spark-driver:4040']
    metrics_path: '/metrics/prometheus'
Grafana Dashboards
{
  "dashboard": {
    "title": "Spark Application Metrics",
    "panels": [
      {
        "title": "Executor Memory Usage",
        "targets": [
          {"expr": "spark_executor_memory_usedMemory"}
        ]
      },
      {
        "title": "Task Completion Rate",
        "targets": [
          {"expr": "rate(spark_executor_completedTasks[5m])"}
        ]
      },
      {
        "title": "GC Time",
        "targets": [
          {"expr": "spark_executor_jvmGCTime"}
        ]
      }
    ]
  }
}
Structured Streaming Monitoring
from pyspark.sql.streaming import StreamingQuery
query = df.writeStream \
    .format("console") \
    .start()
# Monitor query progress
progress = query.lastProgress
if progress:
    print(f"Input rate: {progress['inputRowsPerSecond']}")
    print(f"Process rate: {progress['processedRowsPerSecond']}")
    print(f"Batch duration: {progress['durationMs']['triggerExecution']}ms")
    print(f"State rows: {(progress.get('stateOperators') or [{}])[0].get('numRowsTotal', 0)}")
# Status information
status = query.status
print(f"Is active: {query.isActive}")
print(f"Data available: {status['isDataAvailable']}")
print(f"Message: {status['message']}")
Logging
Log Configuration
# log4j.properties
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Application logging
log4j.logger.com.mycompany=INFO
log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.hadoop=WARN
# Separate file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/spark/application.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
Python Logging
import logging
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Use in application
logger.info("Starting Spark application")
logger.warning("Low memory warning")
logger.error("Task failed", exc_info=True)
# Set Spark log level
spark.sparkContext.setLogLevel("WARN")
Centralized Logging
# Configure log aggregation (log4j.properties)
log4j.appender.fluentd=org.fluentd.log4j.appender.FluentdAppender
log4j.appender.fluentd.host=fluentd-server
log4j.appender.fluentd.port=24224
log4j.appender.fluentd.tag=spark.application
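If a Fluentd log4j appender is not available in your environment, a common alternative is to have the driver write one JSON object per line to a file that a shipper such as Fluentd or Fluent Bit tails. A minimal sketch; the path and field names are illustrative:
import json
import logging
class JsonLineFormatter(logging.Formatter):
    """Format each record as a single JSON line for a log shipper to tail."""
    def format(self, record):
        return json.dumps({
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })
handler = logging.FileHandler("/var/log/spark/application-json.log")  # illustrative path
handler.setFormatter(JsonLineFormatter())
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("Driver started")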
Troubleshooting
Common Issues
Issue 1: OutOfMemoryError
# Symptoms: Tasks fail with OOM, executor lost
# Solutions (note: static settings such as executor memory, memory overhead, and the
# shuffle service take effect only when set at submit time, e.g. via spark-submit --conf
# or SparkSession.builder; spark.conf.set below indicates which keys to adjust):
# 1. Increase executor memory
spark.conf.set("spark.executor.memory", "16g")
spark.conf.set("spark.executor.memoryOverhead", "4g")
# 2. Increase partitions
df = df.repartition(1000)
# 3. Reduce batch size for streaming
spark.conf.set("spark.sql.shuffle.partitions", "500")
# 4. Disable broadcast for large tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# 5. Use external shuffle service
spark.conf.set("spark.shuffle.service.enabled", "true")
Issue 2: Slow Performance
# Diagnose:
# - Check Spark UI for stage bottlenecks
# - Look for data skew in task duration
# - Monitor shuffle read/write sizes
# Solutions:
# 1. Optimize partition count
optimal_partitions = data_size_gb / 0.5 # 500MB per partition
# 2. Cache frequently used data
df.cache()
# 3. Use broadcast joins
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# 4. Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
# 5. Optimize file format
df.write.parquet("/output") # Use columnar format
Issue 3: Executor Lost
# Symptoms: Executors crash or become unresponsive
# Solutions:
# 1. Increase executor memory overhead
spark.conf.set("spark.executor.memoryOverhead", "4g")
# 2. Enable speculation
spark.conf.set("spark.speculation", "true")
# 3. Configure heartbeat timeout
spark.conf.set("spark.network.timeout", "600s")
spark.conf.set("spark.executor.heartbeatInterval", "60s")
# 4. Check for GC issues
spark.conf.set("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -verbose:gc")
Issue 4: Data Skew
# Diagnose: Some tasks take much longer than others
# Solution 1: Salt keys
from pyspark.sql.functions import broadcast, col, concat, lit, rand
df_salted = df.withColumn(
    "salted_key",
    concat(col("key"), lit("_"), (rand() * 10).cast("int"))
)
# Solution 2: Adaptive skew join
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# Solution 3: Separate processing for hot keys
hot_keys = df.groupBy("key").count().filter(col("count") > 1000000)
hot_data = df.join(broadcast(hot_keys), "key")
normal_data = df.join(broadcast(hot_keys), "key", "left_anti")
result = process_hot(hot_data).union(process_normal(normal_data))
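Solution 1 only salts the skewed side; to join on the salted key, the other (usually smaller) side must be replicated across the same salt range so the keys still match. A complementary sketch (other_df is a placeholder, and the bucket count must match the multiplier used above):
from pyspark.sql.functions import array, col, concat, explode, lit
SALT_BUCKETS = 10  # must match the (rand() * 10) factor used when salting
other_salted = other_df \
    .withColumn("salt", explode(array([lit(i) for i in range(SALT_BUCKETS)]))) \
    .withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
result = df_salted.join(other_salted, "salted_key")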
Debugging Tools
# 1. Explain query plan
df.explain(True)
# 2. Show partition distribution
from pyspark.sql.functions import spark_partition_id
df.withColumn("partition_id", spark_partition_id()) \
    .groupBy("partition_id").count() \
    .show()
# 3. Sample for testing
sample = df.sample(0.01, seed=42)
# 4. Collect metrics
import time
start_time = time.time()
result = df.count()
duration = time.time() - start_time
print(f"Execution time: {duration}s")
# 5. Profile Python UDFs
spark.conf.set("spark.python.profile", "true")
Security
Authentication
# Enable Spark authentication
spark-submit \
--conf spark.authenticate=true \
--conf spark.authenticate.secret=my-secret-key \
--conf spark.network.crypto.enabled=true \
--conf spark.network.crypto.keyLength=128 \
application.py
Kerberos Integration
# Kinit before submitting
kinit -kt /path/to/keytab user@REALM
# Submit with Kerberos
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal user@REALM \
--keytab /path/to/keytab \
--conf spark.yarn.principal=user@REALM \
--conf spark.yarn.keytab=/path/to/keytab \
application.py
SSL/TLS
# spark-defaults.conf
spark.ssl.enabled=true
spark.ssl.protocol=TLSv1.2
spark.ssl.keyStore=/path/to/keystore.jks
spark.ssl.keyStorePassword=password
spark.ssl.trustStore=/path/to/truststore.jks
spark.ssl.trustStorePassword=password
Access Control
# Enable SQL authorization
spark.conf.set("spark.sql.authorization.enabled", "true")
spark.conf.set("spark.sql.warehouse.dir", "hdfs:///warehouse")
# Configure Hive metastore security
spark.conf.set("hive.metastore.uris", "thrift://metastore:9083")
spark.conf.set("hive.metastore.sasl.enabled", "true")
Best Practices
Configuration Management
# Use configuration files
# spark-defaults.conf
"""
spark.executor.memory=8g
spark.executor.cores=4
spark.sql.shuffle.partitions=200
spark.serializer=org.apache.spark.serializer.KryoSerializer
"""
# Override in code when needed
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.shuffle.partitions", "500") \
.getOrCreate()
Deployment Checklist
"""
Pre-deployment checklist:
1. Resource Configuration
- Executor memory sized correctly
- Number of executors calculated
- Dynamic allocation configured
2. Performance
- Caching strategy defined
- Shuffle partitions tuned
- Broadcast joins identified
3. Reliability
- Checkpointing enabled (streaming)
- Speculation configured
- Max attempts set
4. Monitoring
- Metrics collection enabled
- Logging configured
- Alerting set up
5. Security
- Authentication enabled
- Encryption configured
- Access controls verified
6. Testing
- Tested with production data volume
- Load tested
- Failure scenarios tested
"""
Production Patterns
# Pattern 1: Graceful shutdown
import signal
import sys
def signal_handler(sig, frame):
    logger.info("Received shutdown signal")
    spark.stop()
    sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Pattern 2: Retry logic
from time import sleep
def read_with_retry(path, max_retries=3):
    for attempt in range(max_retries):
        try:
            return spark.read.parquet(path)
        except Exception:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Retry {attempt + 1}/{max_retries}")
            sleep(2 ** attempt)
# Pattern 3: Health checks
def health_check():
    try:
        spark.sparkContext.getConf().get("spark.app.id")
        return True
    except Exception:
        return False
Hands-On Exercises
Exercise 1: Deploy to YARN
# TODO: Deploy application to YARN cluster
# 1. Calculate optimal executor configuration
# 2. Submit application in cluster mode
# 3. Monitor via YARN UI
# 4. Collect application logs
# Your solution here
Exercise 2: Set Up Monitoring
# TODO: Configure comprehensive monitoring
# 1. Enable Prometheus metrics
# 2. Create Grafana dashboard
# 3. Set up alerting
# 4. Test with sample workload
# Your code here
Exercise 3: Kubernetes Deployment
# TODO: Deploy to Kubernetes
# 1. Build Docker image
# 2. Create Kubernetes manifests
# 3. Deploy with Spark Operator
# 4. Monitor and troubleshoot
# Your solution here
Summary
Successful Spark operations require:
- Cluster Management: Choose appropriate deployment mode
- Resource Allocation: Size executors correctly
- Monitoring: Comprehensive observability
- Troubleshooting: Quick issue resolution
- Security: Authentication and encryption
- Reliability: Fault tolerance and recovery
Key Takeaways
- Choose deployment mode based on infrastructure
- Calculate resources carefully for efficiency
- Monitor continuously with proper tooling
- Log comprehensively for troubleshooting
- Secure all communication channels
- Test thoroughly before production
- Have runbooks for common issues
Continue to the next module for advanced Spark techniques and optimizations.