Production Flink: Deployment, Operations & Monitoring at Scale
Module Duration: 7-8 hours
Focus: Kubernetes/YARN deployment, HA, state backends, monitoring, troubleshooting
Prerequisites: Flink fundamentals, Kubernetes basics, distributed systems knowledge
Hands-on Labs: Production cluster setup and monitoring
Introduction: From Development to Production
The Production Gap
Development (laptop):
# Easy mode
./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar
Production (distributed cluster):
100+ TaskManagers across multiple nodes
Terabytes of state in RocksDB
24/7 uptime requirements (HA, failover)
Multi-tenant resource management
Metrics, logging, alerting for 1000s of jobs
Zero-downtime upgrades
This module teaches you how to bridge that gap.
Part 1: Deployment Architectures
Flink Cluster Components
┌─────────────────────────────────────────────────┐
│ Flink Cluster Architecture │
├─────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌───────────────┐ │
│ │ JobManager │ │ TaskManager 1 │ │
│ │ (Master) │◄────────►│ (Worker) │ │
│ │ │ │ - Task Slots │ │
│ │ - Scheduling │ │ - State │ │
│ │ - Checkpoint │ └───────────────┘ │
│ │ - Recovery │ │
│ └──────────────┘ ┌───────────────┐ │
│ │ TaskManager 2 │ │
│ │ (Worker) │ │
│ └───────────────┘ │
└─────────────────────────────────────────────────┘
Deployment Modes
1. Standalone Cluster (Development)
# Start cluster
./bin/start-cluster.sh
# Submit job
./bin/flink run -c com.example.Job job.jar
# Stop cluster
./bin/stop-cluster.sh
Pros: Simple setup
Cons: No resource management, no HA
2. YARN Deployment (Hadoop Ecosystem)
Per-Job Mode (one dedicated cluster per job):
# Submit job (YARN spins up a cluster just for this job).
# Note: per-job mode is deprecated in recent Flink releases in favor of
# application mode (./bin/flink run-application -t yarn-application ...).
./bin/flink run -m yarn-cluster \
  -yjm 2048 \
  -ytm 4096 \
  -ys 4 \
  -c com.example.Job job.jar
# -yjm: JobManager memory (MB), -ytm: TaskManager memory (MB), -ys: slots per TaskManager.
# The old -yn (number of TaskManagers) option has been removed; TaskManagers are
# allocated based on the job's parallelism and slot count.
Session Mode (shared cluster):
# Start a YARN session (TaskManagers are requested on demand;
# the old -n flag for a fixed container count has been removed)
./bin/yarn-session.sh -jm 2048 -tm 4096 -s 4
# Submit jobs to the session
./bin/flink run -c com.example.Job1 job1.jar
./bin/flink run -c com.example.Job2 job2.jar
3. Kubernetes Deployment (Cloud Native)
Standalone Kubernetes deployment (manifests): the resources below run Flink as plain Deployments and Services. Flink also provides a native Kubernetes integration and the Flink Kubernetes Operator; the standalone manifests are shown here because they make every component explicit.
# flink-configuration.yaml
apiVersion : v1
kind : ConfigMap
metadata :
name : flink-config
namespace : flink
data :
flink-conf.yaml : |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 4
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
parallelism.default: 4
# State backend
state.backend: rocksdb
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints
# HA configuration
high-availability: kubernetes
high-availability.storageDir: s3://my-bucket/flink/ha
---
# JobManager Deployment
apiVersion : apps/v1
kind : Deployment
metadata :
name : flink-jobmanager
namespace : flink
spec :
replicas : 1
selector :
matchLabels :
app : flink
component : jobmanager
template :
metadata :
labels :
app : flink
component : jobmanager
spec :
containers :
- name : jobmanager
image : flink:1.18.0
args : [ "jobmanager" ]
ports :
- containerPort : 6123
name : rpc
- containerPort : 6124
name : blob-server
- containerPort : 8081
name : webui
env :
- name : JOB_MANAGER_RPC_ADDRESS
value : flink-jobmanager
volumeMounts :
- name : flink-config-volume
mountPath : /opt/flink/conf
resources :
requests :
memory : "2Gi"
cpu : "1"
limits :
memory : "2Gi"
cpu : "2"
volumes :
- name : flink-config-volume
configMap :
name : flink-config
items :
- key : flink-conf.yaml
path : flink-conf.yaml
---
# JobManager Service
apiVersion : v1
kind : Service
metadata :
name : flink-jobmanager
namespace : flink
spec :
type : ClusterIP
ports :
- name : rpc
port : 6123
- name : blob-server
port : 6124
- name : webui
port : 8081
selector :
app : flink
component : jobmanager
---
# JobManager UI (LoadBalancer)
apiVersion : v1
kind : Service
metadata :
name : flink-jobmanager-webui
namespace : flink
spec :
type : LoadBalancer
ports :
- name : webui
port : 80
targetPort : 8081
selector :
app : flink
component : jobmanager
---
# TaskManager Deployment
apiVersion : apps/v1
kind : Deployment
metadata :
name : flink-taskmanager
namespace : flink
spec :
replicas : 4 # Number of TaskManagers
selector :
matchLabels :
app : flink
component : taskmanager
template :
metadata :
labels :
app : flink
component : taskmanager
spec :
containers :
- name : taskmanager
image : flink:1.18.0
args : [ "taskmanager" ]
ports :
- containerPort : 6122
name : rpc
- containerPort : 6125
name : query-state
env :
- name : JOB_MANAGER_RPC_ADDRESS
value : flink-jobmanager
volumeMounts :
- name : flink-config-volume
mountPath : /opt/flink/conf
resources :
requests :
memory : "4Gi"
cpu : "2"
limits :
memory : "4Gi"
cpu : "4"
volumes :
- name : flink-config-volume
configMap :
name : flink-config
items :
- key : flink-conf.yaml
path : flink-conf.yaml
Deploy to Kubernetes :
# Create namespace
kubectl create namespace flink
# Apply configuration
kubectl apply -f flink-configuration.yaml
# Check status
kubectl get pods -n flink
kubectl get svc -n flink
# Access Flink UI
kubectl port-forward -n flink svc/flink-jobmanager-webui 8081:80
# Visit http://localhost:8081
Part 2: High Availability (HA)
Why HA Matters
Without HA :
JobManager dies → All jobs fail
Manual recovery required
Downtime = hours
With HA :
JobManager dies → Standby takes over in seconds
Jobs recover from last checkpoint automatically
Downtime = seconds
HA Configuration (Kubernetes)
# flink-conf.yaml
high-availability : kubernetes
high-availability.storageDir : s3://my-bucket/flink/ha
high-availability.cluster-id : flink-cluster-prod
# Kubernetes-specific
kubernetes.cluster-id : flink-cluster-prod
kubernetes.namespace : flink
# JobManager HA
jobmanager.rpc.address : flink-jobmanager
high-availability.jobmanager.port : 6123
Deploy HA JobManager :
apiVersion : apps/v1
kind : StatefulSet # Use StatefulSet for HA
metadata :
name : flink-jobmanager
namespace : flink
spec :
serviceName : flink-jobmanager-headless
replicas : 2 # Multiple JobManagers for HA
selector :
matchLabels :
app : flink
component : jobmanager
template :
metadata :
labels :
app : flink
component : jobmanager
spec :
containers :
- name : jobmanager
image : flink:1.18.0
args : [ "jobmanager" ]
# HA settings come from flink-conf.yaml in the mounted ConfigMap (see above);
# the official image also supports a FLINK_PROPERTIES env var for overrides.
# Note: Kubernetes HA requires a service account with RBAC permission to
# create and edit ConfigMaps (used for leader election and HA metadata).
# ... rest of configuration (ports, volumes, resources as in the Deployment above)
HA with ZooKeeper (Alternative)
# flink-conf.yaml
high-availability : zookeeper
high-availability.storageDir : hdfs:///flink/ha
high-availability.zookeeper.quorum : zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root : /flink
high-availability.cluster-id : flink-cluster-prod
Testing HA
# Kill JobManager pod
kubectl delete pod flink-jobmanager-0 -n flink
# Watch recovery (should be < 30 seconds)
kubectl get pods -n flink -w
# Check job status (should continue from last checkpoint)
kubectl logs flink-jobmanager-1 -n flink
Part 3: State Backends & Checkpointing
State Backend Selection
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: Heap-based state, checkpoints on a filesystem (state < ~10 GB)
env.setStateBackend(new FsStateBackend("s3://bucket/checkpoints"));

// Option 2: RocksDB (large state, production recommended).
// The second constructor argument enables incremental checkpoints.
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);
env.setStateBackend(rocksDB);
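Note that FsStateBackend and RocksDBStateBackend are deprecated since Flink 1.13; the newer API splits the state backend (HashMapStateBackend or EmbeddedRocksDBStateBackend) from the checkpoint storage location. A minimal sketch of the equivalent setup:
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

// Small state: keep it on the JVM heap, checkpoint to object storage
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints");

// Large state: RocksDB with incremental checkpoints (constructor flag)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints");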
Production Checkpointing Configuration
// Checkpointing
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once semantics
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Min pause between checkpoints (prevent checkpoint overload)
config.setMinPauseBetweenCheckpoints(30000); // 30 seconds

// Checkpoint timeout
config.setCheckpointTimeout(600000); // 10 minutes

// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

// Externalize checkpoints (keep them after job cancellation)
config.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Unaligned checkpoints (faster under heavy backpressure)
config.enableUnalignedCheckpoints();
config.setAlignedCheckpointTimeout(Duration.ofSeconds(30));

// Checkpoint storage
config.setCheckpointStorage("s3://my-bucket/flink/checkpoints");
RocksDB Tuning (Production)
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true); // incremental checkpoints via constructor flag

// Tune RocksDB options (newer Flink versions express this via RocksDBOptionsFactory
// and EmbeddedRocksDBStateBackend, but the shape of the tuning is the same)
rocksDB.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
            .setMaxBackgroundJobs(4)   // Parallel compaction/flush threads
            .setMaxOpenFiles(-1)       // Unlimited open files
            .setUseFsync(false);       // Async writes
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
            .setCompactionStyle(CompactionStyle.LEVEL)
            .setWriteBufferSize(64 * 1024 * 1024)      // 64 MB write buffer
            .setMaxWriteBufferNumber(3)
            .setMinWriteBufferNumberToMerge(1)
            .setTableFormatConfig(
                new BlockBasedTableConfig()
                    .setBlockSize(16 * 1024)               // 16 KB blocks
                    .setBlockCacheSize(256 * 1024 * 1024)  // 256 MB block cache
            );
    }
});
env.setStateBackend(rocksDB);
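If tuning individual RocksDB options is more than you need, Flink also ships predefined RocksDB option profiles for common disk setups; a shorter alternative to the OptionsFactory above (which profile fits depends on your hardware):
import org.apache.flink.contrib.streaming.state.PredefinedOptions;

RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);
// Profiles: DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM, FLASH_SSD_OPTIMIZED
rocksDB.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
env.setStateBackend(rocksDB);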
Savepoints (Manual Checkpoints)
# Trigger savepoint
flink savepoint <job-id> s3://bucket/savepoints

# Stop job with a savepoint (newer CLIs; `flink cancel -s` is the older, deprecated form)
flink stop --savepointPath s3://bucket/savepoints <job-id>

# Restore from savepoint
flink run -s s3://bucket/savepoints/savepoint-xxx \
  -c com.example.Job job.jar
Use Cases:
Job upgrades (code changes); see the uid() sketch below
Cluster migrations
Rescaling parallelism
Bug fixes with state preservation
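Savepoint-based upgrades and rescaling work best when every stateful operator carries a stable uid(), because Flink maps savepoint state back to operators by that ID. A minimal, self-contained sketch (the pipeline itself is illustrative):
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> events = env
    .fromElements("user-1", "user-2")
    .uid("events-source");           // stable ID survives code changes

events
    .keyBy(value -> value)
    .reduce((a, b) -> a + "," + b)   // keyed (stateful) operator
    .uid("session-reduce")           // without a uid, Flink derives one from the job graph
    .addSink(new PrintSinkFunction<>())
    .uid("print-sink");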
Part 4: Monitoring & Observability
Flink Metrics System
Key Metrics to Monitor :
┌─────────────────────────────────────────────────┐
│ Critical Flink Metrics │
├─────────────────────────────────────────────────┤
│ 1. Throughput │
│ - numRecordsInPerSecond │
│ - numRecordsOutPerSecond │
│ │
│ 2. Latency │
│ - latency.source_id.operator_id.latency │
│ - latency.source_id.operator_id.latency_p99 │
│ │
│ 3. Checkpointing │
│ - lastCheckpointDuration │
│ - lastCheckpointSize │
│ - numberOfFailedCheckpoints │
│ │
│ 4. Backpressure │
│ - backPressureTimeMsPerSecond │
│ - idleTimeMsPerSecond │
│ - busyTimeMsPerSecond │
│ │
│ 5. State │
│ - managedMemoryUsed │
│ - rocksdb.live-sst-files-size │
│ - rocksdb.compaction-pending │
└─────────────────────────────────────────────────┘
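Beyond the built-in metrics above, user functions can register their own counters, gauges, and histograms via the runtime metric group, and they are exported through whatever reporter is configured below. A minimal sketch (class and metric names are illustrative):
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {
    private transient Counter eventsProcessed;

    @Override
    public void open(Configuration parameters) {
        // Registered under the operator's metric scope as "eventsProcessed"
        eventsProcessed = getRuntimeContext()
            .getMetricGroup()
            .counter("eventsProcessed");
    }

    @Override
    public String map(String value) {
        eventsProcessed.inc();
        return value.toUpperCase();
    }
}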
Prometheus Integration
Flink Configuration :
# flink-conf.yaml
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249-9250
# (Older Flink versions configured the reporter via metrics.reporter.prom.class
#  pointing at org.apache.flink.metrics.prometheus.PrometheusReporter.)
Prometheus Scrape Config :
# prometheus.yml
scrape_configs :
- job_name : 'flink-jobmanager'
static_configs :
- targets : [ 'flink-jobmanager:9249' ]
labels :
component : 'jobmanager'
- job_name : 'flink-taskmanager'
kubernetes_sd_configs :
- role : pod
namespaces :
names :
- flink
relabel_configs :
- source_labels : [ __meta_kubernetes_pod_label_component ]
action : keep
regex : taskmanager
- source_labels : [ __address__ ]
action : replace
target_label : __address__
regex : ([^:]+)(?::\d+)?
replacement : $1:9249
Grafana Dashboard
Essential Panels :
{
"dashboard" : {
"title" : "Flink Monitoring" ,
"panels" : [
{
"title" : "Records In/Out per Second" ,
"targets" : [
{
"expr" : "rate(flink_taskmanager_job_task_operator_numRecordsIn[1m])"
},
{
"expr" : "rate(flink_taskmanager_job_task_operator_numRecordsOut[1m])"
}
]
},
{
"title" : "Checkpoint Duration" ,
"targets" : [
{
"expr" : "flink_jobmanager_job_lastCheckpointDuration"
}
]
},
{
"title" : "Backpressure" ,
"targets" : [
{
"expr" : "flink_taskmanager_job_task_backPressureTimeMsPerSecond"
}
]
},
{
"title" : "State Size" ,
"targets" : [
{
"expr" : "flink_taskmanager_job_task_operator_rocksdb_live_sst_files_size"
}
]
}
]
}
}
Logging
Log4j Configuration (log4j.properties):
# Root logger
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.file.ref = FileAppender
# Console appender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# File appender
appender.file.name = FileAppender
appender.file.type = RollingFile
appender.file.fileName = ${sys:log.file}
appender.file.filePattern = ${sys:log.file}.%i
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.file.policies.type = Policies
appender.file.policies.size.type = SizeBasedTriggeringPolicy
appender.file.policies.size.size = 100MB
appender.file.strategy.type = DefaultRolloverStrategy
appender.file.strategy.max = 10
# Suppress verbose logs
logger.akka.name = akka
logger.akka.level = WARN
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = WARN
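Inside user functions, this Log4j configuration is reached through SLF4J, the logging facade Flink uses, so operators should log through an SLF4J logger rather than System.out. A minimal sketch (the class is illustrative):
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SuspiciousEventFilter extends RichFilterFunction<String> {
    // static + final: one logger per class, safe across task restarts
    private static final Logger LOG = LoggerFactory.getLogger(SuspiciousEventFilter.class);

    @Override
    public boolean filter(String value) {
        boolean suspicious = value.contains("ALERT");
        if (suspicious) {
            // Ends up in the TaskManager log file picked up by Fluent Bit above
            LOG.warn("Dropping suspicious event: {}", value);
        }
        return !suspicious; // keep only non-suspicious events
    }
}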
Centralized Logging (ELK Stack) :
# Fluent Bit DaemonSet for log collection
apiVersion : apps/v1
kind : DaemonSet
metadata :
name : fluent-bit
namespace : logging
spec :
selector :
matchLabels :
app : fluent-bit
template :
metadata :
labels :
app : fluent-bit
spec :
containers :
- name : fluent-bit
image : fluent/fluent-bit:latest
volumeMounts :
- name : varlog
mountPath : /var/log
- name : varlibdockercontainers
mountPath : /var/lib/docker/containers
readOnly : true
- name : fluent-bit-config
mountPath : /fluent-bit/etc/
volumes :
- name : varlog
hostPath :
path : /var/log
- name : varlibdockercontainers
hostPath :
path : /var/lib/docker/containers
- name : fluent-bit-config
configMap :
name : fluent-bit-config
Part 5: Backpressure Detection & Resolution
Understanding Backpressure
Symptoms :
Increasing checkpoint duration
Growing lag in Kafka consumer
High backPressuredTimeMsPerSecond metric
Detection :
# Via Flink UI
# Navigate to: Job → Running Tasks → Backpressure
# Via REST API
curl http://jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
Common Causes & Solutions
Cause 1: Slow Sink
// PROBLEM: writing to the database one record at a time blocks the pipeline on every write
// (illustrative; the real JdbcSink is built via JdbcSink.sink() as shown below)
stream.addSink(new JdbcSink(...));

// SOLUTION: batch writes
stream.addSink(
    JdbcSink.sink(
        "INSERT INTO table VALUES (?, ?)",
        (statement, event) -> {
            statement.setString(1, event.getId());
            statement.setString(2, event.getValue());
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(1000)        // Batch up to 1000 records
            .withBatchIntervalMs(200)   // Or flush every 200 ms
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:mysql://mysql:3306/db")
            .build()
    )
);
Cause 2: Inefficient Operator
// PROBLEM: Expensive per-record processing
stream.map(event -> {
    // Heavy computation for EVERY record
    return expensiveTransformation(event);
});

// SOLUTION: Use async I/O for external lookups
AsyncDataStream.unorderedWait(
    stream,
    new AsyncDatabaseLookup(),
    1000,                     // Timeout (ms)
    TimeUnit.MILLISECONDS,
    100                       // Max concurrent requests
);
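The AsyncDatabaseLookup used above is not defined in this module; a minimal sketch of such a function, assuming an asynchronous lookup (faked here with CompletableFuture; a real implementation would call a non-blocking database or HTTP client):
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical example: enrich an event ID with a value loaded asynchronously.
public class AsyncDatabaseLookup extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String eventId, ResultFuture<String> resultFuture) {
        // Replace with your client's non-blocking call
        CompletableFuture
            .supplyAsync(() -> lookup(eventId))
            .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
    }

    @Override
    public void timeout(String eventId, ResultFuture<String> resultFuture) {
        // Emit a fallback instead of failing the job on a slow lookup
        resultFuture.complete(Collections.singleton(eventId + ":timeout"));
    }

    private String lookup(String eventId) {
        return eventId + ":enriched"; // placeholder for the real lookup
    }
}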
Cause 3: Skewed Keys
// PROBLEM: every event maps to the same key, so one subtask does all the work
stream.keyBy(event -> "constant_key"); // BAD!

// SOLUTION: key by something with real cardinality
stream.keyBy(event -> event.getUserId()); // Distribute by user

// Or: add a salt for known hot keys (note the int cast; without it the random
// double would make every key unique)
stream.keyBy(event -> {
    String key = event.getUserId();
    if (isHotKey(key)) {
        return key + "-" + (int) (Math.random() * 10); // Split hot key into 10 sub-keys
    }
    return key;
});
// Salting spreads a hot key's aggregates across sub-keys, so a second aggregation
// keyed on the original key is needed to merge the partial results; see the sketch below.
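Because salting splits one logical key across sub-keys, a second aggregation keyed on the original key merges the partial results. A minimal sketch, assuming an event type with getUserId() and a merge() method for combining partial aggregates (both illustrative):
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

stream
    // Stage 1: spread the hot key over 10 salted sub-keys and pre-aggregate
    .keyBy(event -> event.getUserId() + "-" + ThreadLocalRandom.current().nextInt(10))
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce((a, b) -> a.merge(b))
    // Stage 2: re-key on the original user id and merge the partial aggregates
    .keyBy(partial -> partial.getUserId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce((a, b) -> a.merge(b));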
Cause 4: Insufficient Resources
# Increase parallelism
parallelism.default : 16 # Was 4
# Increase TaskManager resources
taskmanager.memory.process.size : 8192m # Was 4096m
taskmanager.numberOfTaskSlots : 8 # Was 4
Part 6: Resource Management & Tuning
Memory Configuration
# flink-conf.yaml
# JobManager memory
jobmanager.memory.process.size : 2048m
jobmanager.memory.jvm-overhead.fraction : 0.1
# TaskManager memory (total = 8 GB)
taskmanager.memory.process.size : 8192m
# Breakdown (fractions apply to "total Flink memory", i.e. the process size minus
# JVM metaspace and overhead, so absolute values land a bit below the naive math):
# - Flink managed memory (RocksDB, sorting, caching)
taskmanager.memory.managed.fraction: 0.4   # ~40%, roughly 3 GB here
# - JVM heap (operators, user code)
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.task.heap.size: 2048m   # 2 GB
# - Network buffers
taskmanager.memory.network.fraction: 0.1   # ~10%, bounded by min/max below
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 1024m
# - JVM overhead
taskmanager.memory.jvm-overhead.fraction: 0.1   # 10% of the process size
Network Tuning
# Network buffer configuration (same keys as in the memory section above)
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb

# Buffers per channel (replaces the legacy taskmanager.network.numberOfBuffers setting)
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8

# Compression codec for shuffled data (reduces network traffic at some CPU cost)
taskmanager.network.compression.codec: LZ4
Parallelism & Scaling
// Set default parallelism
env.setParallelism(16);

// Set operator-specific parallelism
stream
    .map(...).setParallelism(8)          // CPU-intensive
    .keyBy(...)
    .window(...)
    .aggregate(...).setParallelism(32)   // I/O-intensive
    .addSink(...).setParallelism(4);     // Limited sink capacity
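One related setting that is easy to miss when planning the rescaling discussed in Part 3: max parallelism fixes the number of key groups, and raising it later is not possible without giving up existing keyed state, so set it generously up front. A short sketch, reusing env and stream from the snippet above (the amount field is illustrative):
// Default parallelism can be changed when restoring from a savepoint; max parallelism cannot.
env.setParallelism(16);
env.setMaxParallelism(720);   // generous, highly divisible value leaves room to rescale evenly

// Max parallelism can also be set per operator
stream
    .keyBy(event -> event.getUserId())
    .sum("amount")                 // hypothetical numeric field
    .setMaxParallelism(720);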
Auto-Scaling (Kubernetes HPA). Note that adding TaskManager pods only helps a running job if the adaptive scheduler's reactive mode is enabled or the job is redeployed with higher parallelism; the Flink Kubernetes Operator's autoscaler is another option. Example HPA:
apiVersion : autoscaling/v2
kind : HorizontalPodAutoscaler
metadata :
name : flink-taskmanager-hpa
namespace : flink
spec :
scaleTargetRef :
apiVersion : apps/v1
kind : Deployment
name : flink-taskmanager
minReplicas : 2
maxReplicas : 20
metrics :
- type : Resource
resource :
name : cpu
target :
type : Utilization
averageUtilization : 70
- type : Pods
pods :
metric :
name: flink_taskmanager_job_task_backPressuredTimeMsPerSecond
target :
type : AverageValue
averageValue : "100"
Part 7: Troubleshooting Common Issues
Issue 1: Out of Memory (OOM)
Symptoms :
java.lang.OutOfMemoryError: Java heap space
Diagnosis :
# Check heap usage (jmap -heap was removed in modern JDKs; use jcmd instead)
kubectl exec -it flink-taskmanager-0 -n flink -- jcmd <pid> GC.heap_info

# Heap dump
kubectl exec -it flink-taskmanager-0 -n flink -- \
  jmap -dump:format=b,file=/tmp/heap.bin <pid>
Solutions:
Increase heap size: taskmanager.memory.task.heap.size: 4096m
Use RocksDB (off-heap state): state.backend: rocksdb
Enable state TTL to clean up old state (see the sketch below)
Reduce slots per TaskManager (fewer tasks sharing one JVM heap)
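State TTL, mentioned in the third item above, is configured per state descriptor; a minimal sketch that expires keyed state 24 hours after the last write (descriptor name and cleanup settings are illustrative):
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))                                   // expire 24h after the last update
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)                          // clean up during RocksDB compaction
    .build();

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastSeen", Long.class);
descriptor.enableTimeToLive(ttlConfig);
// Then obtain the state via getRuntimeContext().getState(descriptor) inside a keyed RichFunction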
Issue 2: Checkpoint Timeouts
Symptoms :
Checkpoint expired before completing
Solutions :
// Increase the timeout
config.setCheckpointTimeout(900000); // 15 minutes

// Enable incremental checkpoints for RocksDB
// (second constructor argument of RocksDBStateBackend, or state.backend.incremental: true)

// Enable unaligned checkpoints (helps under heavy backpressure)
config.enableUnalignedCheckpoints();

// Checkpoint less frequently
env.enableCheckpointing(120000); // Every 2 minutes instead of every 1
Issue 3: Kafka Consumer Lag
Diagnosis :
# Check consumer lag
kafka-consumer-groups --bootstrap-server kafka:9092 \
--group flink-consumer-group --describe
Solutions :
// Increase Kafka source parallelism
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "flink-consumer");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Set source parallelism to match the Kafka partition count
env.addSource(consumer).setParallelism(16); // 16 Kafka partitions
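FlinkKafkaConsumer shown above is deprecated in recent Flink releases; KafkaSource is its replacement on the unified source API. A minimal sketch of the same setup (topic, group, and parallelism as above):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("my-topic")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// Match parallelism to the topic's partition count to avoid idle subtasks
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
   .setParallelism(16);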
Issue 4: Job Stuck in Restart Loop
Symptoms :
Job is restarting (4/4 restarts within 5 minutes)
Diagnosis :
# Check logs
kubectl logs flink-jobmanager-0 -n flink | grep -i exception

# Check task failures
curl http://jobmanager:8081/jobs/<job-id>/exceptions
Solutions :
# Exponential backoff between restarts (prevents tight restart loops)
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 5min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 1h

# Or: fixed delay with a bounded number of attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30s
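The same restart strategies can also be set per job in the DataStream API, which is useful when one job needs different behavior than the cluster default; a minimal sketch using the env from the Java snippets above:
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Exponential backoff: 10s initial, capped at 5min, doubled each failure,
// reset after 1h of stable running, 10% jitter
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
    Time.seconds(10), Time.minutes(5), 2.0, Time.hours(1), 0.1));

// Or: fixed delay, give up after 10 attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(30)));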
Part 8: Production Best Practices
1. Checkpointing Strategy
// Production configuration
env.enableCheckpointing(300000); // 5 minutes (not too frequent)

CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(60000);   // 1 min pause between checkpoints
config.setCheckpointTimeout(600000);           // 10 min timeout
config.setMaxConcurrentCheckpoints(1);         // One at a time
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setTolerableCheckpointFailureNumber(3); // Tolerate 3 consecutive failures
2. State Backend Selection
// For state < ~10 GB: FsStateBackend (heap state, filesystem checkpoints)
env.setStateBackend(new FsStateBackend("s3://bucket/checkpoints"));

// For larger state: RocksDB with incremental checkpoints
// (enabled via the constructor flag; newer API: EmbeddedRocksDBStateBackend, see Part 3)
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);
env.setStateBackend(rocksDB);
3. Monitoring & Alerting
Critical Alerts :
# Prometheus alert rules
groups :
- name : flink
rules :
# Checkpoint failures (numberOfFailedCheckpoints is cumulative, so alert on its increase)
- alert: FlinkCheckpointFailing
expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[15m]) > 0
for: 5m
annotations:
summary: "Flink job {{ $labels.job_name }} checkpoint failing"
# High backpressure
- alert : FlinkBackpressureHigh
expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
for : 10m
annotations :
summary : "High backpressure in job {{ $labels.job_name }}"
# Consumer lag
- alert : FlinkKafkaLag
expr : flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max > 100000
for : 15m
annotations :
summary : "Kafka lag > 100K for job {{ $labels.job_name }}"
# Job down
- alert : FlinkJobDown
expr : flink_jobmanager_job_uptime == 0
for : 5m
annotations :
summary : "Flink job {{ $labels.job_name }} is down"
4. Security
# flink-conf.yaml
# Enable SSL/TLS for internal connectivity (RPC and data plane) and the REST endpoint
security.ssl.internal.enabled: true
security.ssl.rest.enabled: true
security.ssl.internal.keystore: /path/to/keystore.jks
security.ssl.internal.keystore-password: changeit
security.ssl.internal.truststore: /path/to/truststore.jks
security.ssl.internal.truststore-password: changeit
# (the REST endpoint has its own security.ssl.rest.keystore / truststore options)

# Enable Kerberos (YARN/HDFS)
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink@REALM

# Internal SSL covers the RPC layer (formerly configured via akka.ssl.enabled)
5. Cost Optimization
Spot Instances (AWS) :
# Run TaskManagers on spot/preemptible nodes. Node labels and taints are normally
# applied through the node pool configuration or kubectl rather than a Node manifest:
#   kubectl label node <node-name> workload-type=flink-taskmanager
#   kubectl taint node <node-name> workload-type=spot:NoSchedule
# TaskManager deployment with spot toleration
apiVersion : apps/v1
kind : Deployment
metadata :
name : flink-taskmanager
spec :
template :
spec :
tolerations :
- key : workload-type
operator : Equal
value : spot
effect : NoSchedule
affinity :
nodeAffinity :
preferredDuringSchedulingIgnoredDuringExecution :
- weight : 1
preference :
matchExpressions :
- key : workload-type
operator : In
values :
- flink-taskmanager
Part 9: Upgrade Strategies
Blue-Green Deployment
# 1. Deploy new version (green)
kubectl apply -f flink-v2.yaml
# 2. Take a savepoint from the old version (blue)
flink savepoint <old-job-id> s3://bucket/savepoints

# 3. Cancel the old job
flink cancel <old-job-id>
# 4. Start new job from savepoint
flink run -s s3://bucket/savepoints/savepoint-xxx \
-c com.example.Job job-v2.jar
# 5. Verify new job (green)
# 6. Remove old deployment (blue)
kubectl delete deployment flink-taskmanager-v1
Canary Deployment
# Note: all TaskManagers of one cluster run the same job version, so a Flink
# "canary" is a second, smaller job/cluster consuming the same input, not a mix
# of old and new TaskManagers inside one deployment.

# 1. Deploy the canary job alongside the existing one (~10% of capacity)
kubectl apply -f flink-canary.yaml          # e.g. 1 TaskManager vs. 10 in the main deployment
# 2. Monitor canary metrics (checkpoints, lag, error rate)
# 3. Gradually shift capacity to the canary
kubectl scale deployment flink-canary --replicas=5
kubectl scale deployment flink-taskmanager --replicas=5   # now 50/50
# 4. Full rollout
kubectl scale deployment flink-canary --replicas=10
kubectl delete deployment flink-taskmanager
Summary
What You’ve Mastered
✅ Deployment architectures (Standalone, YARN, Kubernetes)
✅ High availability with JobManager failover
✅ State backends and checkpointing strategies
✅ Production monitoring with Prometheus/Grafana
✅ Backpressure detection and resolution
✅ Resource tuning and auto-scaling
✅ Troubleshooting common production issues
✅ Security, cost optimization, upgrade strategies
Key Takeaways
HA is Non-Negotiable : Always enable HA in production
Monitor Everything : Checkpoints, backpressure, lag, state size
RocksDB for Large State : Use incremental checkpoints
Tune Memory Carefully : Heap, managed memory, network buffers
Test Failover : Kill pods and verify recovery
Next Module
Module 9: Capstone Project - Fraud Detection System. Build a complete production-grade fraud detection system with Kafka, CEP, ML, and monitoring.
Resources
Documentation
Practice : Deploy a Flink cluster to Kubernetes with HA, RocksDB state backend, and Prometheus monitoring. Run a stateful job, kill the JobManager, and verify automatic recovery!