
Production Flink: Deployment, Operations & Monitoring at Scale

Module Duration: 7-8 hours
Focus: Kubernetes/YARN deployment, HA, state backends, monitoring, troubleshooting
Prerequisites: Flink fundamentals, Kubernetes basics, distributed systems knowledge
Hands-on Labs: Production cluster setup and monitoring

Introduction: From Development to Production

The Production Gap

Development (laptop):
# Easy mode
./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar
Production (distributed cluster):
  • 100+ TaskManagers across multiple nodes
  • Terabytes of state in RocksDB
  • 24/7 uptime requirements (HA, failover)
  • Multi-tenant resource management
  • Metrics, logging, alerting for 1000s of jobs
  • Zero-downtime upgrades
This module teaches you how to bridge that gap.

Part 1: Deployment Architectures

┌─────────────────────────────────────────────────┐
│         Flink Cluster Architecture              │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌──────────────┐          ┌───────────────┐   │
│  │ JobManager   │          │ TaskManager 1 │   │
│  │ (Master)     │◄────────►│ (Worker)      │   │
│  │              │          │ - Task Slots  │   │
│  │ - Scheduling │          │ - State       │   │
│  │ - Checkpoint │          └───────────────┘   │
│  │ - Recovery   │                               │
│  └──────────────┘          ┌───────────────┐   │
│                            │ TaskManager 2 │   │
│                            │ (Worker)      │   │
│                            └───────────────┘   │
└─────────────────────────────────────────────────┘

Deployment Modes

1. Standalone Cluster (Development)

# Start cluster
./bin/start-cluster.sh

# Submit job
./bin/flink run -c com.example.Job job.jar

# Stop cluster
./bin/stop-cluster.sh
Pros: Simple setup
Cons: No resource management, no HA

2. YARN Deployment (Hadoop Ecosystem)

Per-Job Mode (deprecated since Flink 1.15 in favor of Application Mode, shown below):
# Submit job (YARN spins up a dedicated cluster just for this job)
# -yjm: JobManager memory (MB), -ytm: TaskManager memory (MB), -ys: slots per TaskManager
./bin/flink run -m yarn-cluster \
  -yjm 2048 \
  -ytm 4096 \
  -ys 4 \
  -c com.example.Job job.jar
# TaskManagers are allocated on demand; the old -yn flag is no longer needed
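Application Mode is the recommended replacement on recent Flink releases; a minimal sketch of an equivalent submission (values mirror the flags above, and the -D keys can be adjusted as needed):
# Application mode: the job's main() runs inside the YARN application master
./bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -c com.example.Job job.jar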
Session Mode (shared cluster):
# Start a YARN session (TaskManagers are allocated on demand in recent versions,
# so the old -n flag is gone; -s sets slots per TaskManager)
./bin/yarn-session.sh -jm 2048 -tm 4096 -s 4

# Submit jobs to session
./bin/flink run -c com.example.Job1 job1.jar
./bin/flink run -c com.example.Job2 job2.jar

3. Kubernetes Deployment (Cloud Native)

Standalone Flink on Kubernetes (explicit Deployments/Services; Flink also offers a native Kubernetes integration and a Kubernetes Operator that create these resources for you):
# flink-configuration.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: flink
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 4
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 4096m
    parallelism.default: 4
    # State backend
    state.backend: rocksdb
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints
    state.savepoints.dir: s3://my-bucket/flink/savepoints
    # HA configuration
    high-availability: kubernetes
    high-availability.storageDir: s3://my-bucket/flink/ha
---
# JobManager Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.18.0
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "2Gi"
            cpu: "2"
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
---
# JobManager Service
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
# JobManager UI (LoadBalancer)
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-webui
  namespace: flink
spec:
  type: LoadBalancer
  ports:
  - name: webui
    port: 80
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
---
# TaskManager Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 4  # Number of TaskManagers
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.18.0
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "4Gi"
            cpu: "4"
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
Deploy to Kubernetes:
# Create namespace
kubectl create namespace flink

# Apply configuration
kubectl apply -f flink-configuration.yaml

# Check status
kubectl get pods -n flink
kubectl get svc -n flink

# Access Flink UI
kubectl port-forward -n flink svc/flink-jobmanager-webui 8081:80
# Visit http://localhost:8081
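With the port-forward active, jobs can be submitted to this session cluster through its REST endpoint. A minimal sketch (job.jar and the main class are placeholders):
# Submit a job to the session cluster via the forwarded REST endpoint
./bin/flink run -m localhost:8081 -c com.example.Job job.jar

# List running jobs
./bin/flink list -m localhost:8081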

Part 2: High Availability (HA)

Why HA Matters

Without HA:
  • JobManager dies → All jobs fail
  • Manual recovery required
  • Downtime = hours
With HA:
  • JobManager dies → Standby takes over in seconds
  • Jobs recover from last checkpoint automatically
  • Downtime = seconds

HA Configuration (Kubernetes)

# flink-conf.yaml
high-availability: kubernetes
high-availability.storageDir: s3://my-bucket/flink/ha
high-availability.cluster-id: flink-cluster-prod

# Kubernetes-specific
kubernetes.cluster-id: flink-cluster-prod
kubernetes.namespace: flink

# JobManager HA
jobmanager.rpc.address: flink-jobmanager
high-availability.jobmanager.port: 6123
Deploy HA JobManager:
apiVersion: apps/v1
kind: StatefulSet  # Use StatefulSet for HA
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  serviceName: flink-jobmanager-headless
  replicas: 2  # Multiple JobManagers for HA
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.18.0
        args: ["jobmanager"]
        # HA itself is enabled by the high-availability* keys in flink-conf.yaml;
        # no special environment variable is required
        # ... rest of configuration (ports, volumes, resources as in the Deployment above)

HA with ZooKeeper (Alternative)

# flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: flink-cluster-prod

Testing HA

# Kill JobManager pod
kubectl delete pod flink-jobmanager-0 -n flink

# Watch recovery (should be < 30 seconds)
kubectl get pods -n flink -w

# Check job status (should continue from last checkpoint)
kubectl logs flink-jobmanager-1 -n flink
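Kubernetes HA keeps leader election and job/checkpoint pointers in ConfigMaps, so a quick sanity check is to confirm they exist and survive the JobManager restart (exact names depend on the configured cluster-id):
# HA metadata is stored in ConfigMaps named after the cluster-id
kubectl get configmaps -n flink | grep flink-cluster-prod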

Part 3: State Backends & Checkpointing

State Backend Selection

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: Heap-based (< 10 GB state)
env.setStateBackend(new FsStateBackend("s3://bucket/checkpoints"));

// Option 2: RocksDB (> 10 GB state, production recommended)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/checkpoints", true));

// Enable incremental checkpoints (RocksDB only)
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);
rocksDB.enableIncrementalCheckpointing(true);
env.setStateBackend(rocksDB);
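On Flink 1.13+ these classes are deprecated; the current API separates the state backend from checkpoint storage. A minimal equivalent sketch:
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

// Heap-based keyed state (small state)
env.setStateBackend(new HashMapStateBackend());

// Or: RocksDB with incremental checkpoints (large state)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

// Checkpoint storage is configured separately from the backend
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints");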

Production Checkpointing Configuration

// Checkpointing
env.enableCheckpointing(60000);  // Checkpoint every 60 seconds

CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once semantics
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Min pause between checkpoints (prevent checkpoint overload)
config.setMinPauseBetweenCheckpoints(30000);  // 30 seconds

// Checkpoint timeout
config.setCheckpointTimeout(600000);  // 10 minutes

// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

// Externalize checkpoints (keep after job cancellation)
config.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Unaligned checkpoints (faster for high backpressure scenarios)
config.enableUnalignedCheckpoints();
config.setAlignedCheckpointTimeout(Duration.ofSeconds(30));

// Checkpoint storage
config.setCheckpointStorage("s3://my-bucket/flink/checkpoints");

RocksDB Tuning (Production)

RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);
rocksDB.enableIncrementalCheckpointing(true);

// Tune RocksDB options
rocksDB.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
            .setMaxBackgroundJobs(4)        // Parallel compaction
            .setMaxOpenFiles(-1)            // Unlimited open files
            .setUseFsync(false);            // Async writes
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
            .setCompactionStyle(CompactionStyle.LEVEL)
            .setWriteBufferSize(64 * 1024 * 1024)  // 64 MB write buffer
            .setMaxWriteBufferNumber(3)
            .setMinWriteBufferNumberToMerge(1)
            .setTableFormatConfig(
                new BlockBasedTableConfig()
                    .setBlockSize(16 * 1024)  // 16 KB blocks
                    .setBlockCacheSize(256 * 1024 * 1024)  // 256 MB cache
            );
    }
});

env.setStateBackend(rocksDB);
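Much of this tuning can also be applied declaratively in flink-conf.yaml instead of code; a brief sketch using options available in recent Flink versions:
# Let RocksDB use Flink's managed memory budget (default: true)
state.backend.rocksdb.memory.managed: true

# Start from a predefined tuning profile, then override selectively
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM

# Background flush/compaction threads
state.backend.rocksdb.thread.num: 4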

Savepoints (Manual Checkpoints)

# Trigger savepoint
flink savepoint <job-id> s3://bucket/savepoints

# Cancel job with savepoint
flink cancel -s s3://bucket/savepoints <job-id>

# Restore from savepoint
flink run -s s3://bucket/savepoints/savepoint-xxx \
  -c com.example.Job job.jar
Use Cases:
  • Job upgrades (code changes; see the uid() sketch after this list)
  • Cluster migrations
  • Rescaling parallelism
  • Bug fixes with state preservation
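Savepoint restores map state to operators by their uid, so stateful operators should be given stable uids before the first savepoint is taken. A minimal sketch (the source and sink choices are purely illustrative; only the uid() calls matter):
env.socketTextStream("localhost", 9999)
   .uid("text-source")          // uids survive renames and reordering of operators
   .keyBy(word -> word)
   .map(String::toUpperCase)
   .uid("normalize")            // state (if any) of this operator is addressed by this uid on restore
   .print()
   .uid("stdout-sink");         // assign uids to sinks as well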

Part 4: Monitoring & Observability

Key Metrics to Monitor:
┌─────────────────────────────────────────────────┐
│         Critical Flink Metrics                  │
├─────────────────────────────────────────────────┤
│ 1. Throughput                                   │
│    - numRecordsInPerSecond                      │
│    - numRecordsOutPerSecond                     │
│                                                 │
│ 2. Latency                                      │
│    - latency.source_id.operator_id.latency      │
│    - latency.source_id.operator_id.latency_p99  │
│                                                 │
│ 3. Checkpointing                                │
│    - lastCheckpointDuration                     │
│    - lastCheckpointSize                         │
│    - numberOfFailedCheckpoints                  │
│                                                 │
│ 4. Backpressure                                 │
│    - backPressuredTimeMsPerSecond               │
│    - idleTimeMsPerSecond                        │
│    - busyTimeMsPerSecond                        │
│                                                 │
│ 5. State                                        │
│    - managedMemoryUsed                          │
│    - rocksdb.live-sst-files-size                │
│    - rocksdb.compaction-pending                 │
└─────────────────────────────────────────────────┘
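Beyond the built-in metrics above, user code can register its own counters and gauges through the operator metric group; a minimal sketch of a counter in a RichMapFunction (class and metric names are illustrative):
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter malformedEvents;  // exported like any other operator metric

    @Override
    public void open(Configuration parameters) {
        malformedEvents = getRuntimeContext()
            .getMetricGroup()
            .counter("malformedEvents");
    }

    @Override
    public String map(String value) {
        if (value == null || value.isEmpty()) {
            malformedEvents.inc();   // visible in the Flink UI and via reporters (e.g. Prometheus)
            return "";
        }
        return value;
    }
}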

Prometheus Integration

Flink Configuration:
# flink-conf.yaml
metrics.reporters: prom
# Since Flink 1.16 reporters are configured via a factory class
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249-9250
Prometheus Scrape Config:
# prometheus.yml
scrape_configs:
  - job_name: 'flink-jobmanager'
    static_configs:
      - targets: ['flink-jobmanager:9249']
        labels:
          component: 'jobmanager'

  - job_name: 'flink-taskmanager'
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names:
            - flink
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_component]
        action: keep
        regex: taskmanager
      - source_labels: [__address__]
        action: replace
        target_label: __address__
        regex: ([^:]+)(?::\d+)?
        replacement: $1:9249

Grafana Dashboard

Essential Panels:
{
  "dashboard": {
    "title": "Flink Monitoring",
    "panels": [
      {
        "title": "Records In/Out per Second",
        "targets": [
          {
            "expr": "rate(flink_taskmanager_job_task_operator_numRecordsIn[1m])"
          },
          {
            "expr": "rate(flink_taskmanager_job_task_operator_numRecordsOut[1m])"
          }
        ]
      },
      {
        "title": "Checkpoint Duration",
        "targets": [
          {
            "expr": "flink_jobmanager_job_lastCheckpointDuration"
          }
        ]
      },
      {
        "title": "Backpressure",
        "targets": [
          {
            "expr": "flink_taskmanager_job_task_backPressureTimeMsPerSecond"
          }
        ]
      },
      {
        "title": "State Size",
        "targets": [
          {
            "expr": "flink_taskmanager_job_task_operator_rocksdb_live_sst_files_size"
          }
        ]
      }
    ]
  }
}

Logging

Log4j Configuration (conf/log4j.properties, Log4j 2 properties syntax):
# Root logger
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.file.ref = FileAppender

# Console appender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# File appender
appender.file.name = FileAppender
appender.file.type = RollingFile
appender.file.fileName = ${sys:log.file}
appender.file.filePattern = ${sys:log.file}.%i
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.file.policies.type = Policies
appender.file.policies.size.type = SizeBasedTriggeringPolicy
appender.file.policies.size.size = 100MB
appender.file.strategy.type = DefaultRolloverStrategy
appender.file.strategy.max = 10

# Suppress verbose logs
logger.akka.name = akka
logger.akka.level = WARN
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = WARN
Centralized Logging (ELK Stack):
# Fluent Bit DaemonSet for log collection
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: logging
spec:
  selector:
    matchLabels:
      app: fluent-bit
  template:
    metadata:
      labels:
        app: fluent-bit
    spec:
      containers:
      - name: fluent-bit
        image: fluent/fluent-bit:latest
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config

Part 5: Backpressure Detection & Resolution

Understanding Backpressure

Symptoms:
  • Increasing checkpoint duration
  • Growing lag in Kafka consumer
  • High backPressuredTimeMsPerSecond metric
Detection:
# Via Flink UI
# Navigate to: Job → Running Tasks → Backpressure

# Via metrics
curl http://jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure

Common Causes & Solutions

Cause 1: Slow Sink

// PROBLEM: Slow database writes
stream.addSink(new JdbcSink(...));  // Blocks on every record

// SOLUTION: Batch writes
stream.addSink(
    JdbcSink.sink(
        "INSERT INTO table VALUES (?, ?)",
        (statement, event) -> {
            statement.setString(1, event.getId());
            statement.setString(2, event.getValue());
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(1000)  // Batch 1000 records
            .withBatchIntervalMs(200)  // Or flush every 200ms
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:mysql://mysql:3306/db")
            .build()
    )
);

Cause 2: Inefficient Operator

// PROBLEM: Expensive per-record processing
stream.map(event -> {
    // Heavy computation for EVERY record
    return expensiveTransformation(event);
});

// SOLUTION: Use async I/O
AsyncDataStream.unorderedWait(
    stream,
    new AsyncDatabaseLookup(),
    1000,  // Timeout (ms)
    TimeUnit.MILLISECONDS,
    100    // Max concurrent requests
);
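The AsyncDatabaseLookup referenced above would be an AsyncFunction; a minimal sketch using a hypothetical non-blocking client (MyAsyncDbClient and its methods are assumptions, not a real library):
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncDatabaseLookup extends RichAsyncFunction<String, String> {

    private transient MyAsyncDbClient client;  // hypothetical non-blocking client

    @Override
    public void open(Configuration parameters) {
        client = MyAsyncDbClient.connect("db-host:5432");  // hypothetical connect call
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture<String> lookup = client.get(key);  // hypothetical async lookup
        lookup.whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);   // fail the record (job restarts per strategy)
            } else {
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    @Override
    public void timeout(String key, ResultFuture<String> resultFuture) {
        resultFuture.complete(Collections.emptyList());      // drop on timeout instead of failing the job
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}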

Cause 3: Skewed Keys

// PROBLEM: All events go to one partition
stream.keyBy(event -> "constant_key");  // BAD!

// SOLUTION: Better key distribution
stream.keyBy(event -> event.getUserId());  // Distribute by user

// Or: Add a salt for hot keys (spreads one hot key across 10 subtasks;
// downstream must re-aggregate the partial results by the original key)
stream.keyBy(event -> {
    String key = event.getUserId();
    if (isHotKey(key)) {
        return key + "-" + (int) (Math.random() * 10);  // split hot key into 10 salted keys
    }
    return key;
});

Cause 4: Insufficient Resources

# Increase parallelism
parallelism.default: 16  # Was 4

# Increase TaskManager resources
taskmanager.memory.process.size: 8192m  # Was 4096m
taskmanager.numberOfTaskSlots: 8  # Was 4

Part 6: Resource Management & Tuning

Memory Configuration

# flink-conf.yaml

# JobManager memory
jobmanager.memory.process.size: 2048m
jobmanager.memory.jvm-overhead.fraction: 0.1

# TaskManager memory (total = 8 GB)
taskmanager.memory.process.size: 8192m

# Breakdown:
# - Flink managed memory (RocksDB, sorting, caching)
taskmanager.memory.managed.fraction: 0.4  # 40% of total Flink memory (roughly 2.9 GB here)

# - JVM heap (operators, user code)
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.task.heap.size: 2048m  # 2 GB

# - Network buffers
taskmanager.memory.network.fraction: 0.1  # 10% of total Flink memory, bounded by min/max below
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 1024m

# - JVM overhead
taskmanager.memory.jvm-overhead.fraction: 0.1  # 10%

Network Tuning

# Network buffer memory uses the taskmanager.memory.network.* keys shown above

# Exclusive buffers per channel and floating buffers per input gate
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8

# Enable network compression (reduces network traffic at some CPU cost)
taskmanager.network.compression.codec: LZ4

Parallelism & Scaling

// Set default parallelism
env.setParallelism(16);

// Set operator-specific parallelism
stream
    .map(...).setParallelism(8)   // CPU-intensive
    .keyBy(...)
    .window(...)
    .aggregate(...).setParallelism(32)  // I/O-intensive
    .addSink(...).setParallelism(4);     // Limited sink capacity
Auto-Scaling (Kubernetes HPA; the Pods-type metric below requires a custom-metrics adapter such as prometheus-adapter):
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager-hpa
  namespace: flink
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: flink_taskmanager_job_task_backPressuredTimeMsPerSecond
      target:
        type: AverageValue
        averageValue: "100"
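Note that adding TaskManager pods only helps if the job can actually use them: a running Flink job does not rescale on its own unless it is restarted with higher parallelism (from a savepoint) or the adaptive scheduler's reactive mode is enabled, which requires running the job as a standalone application cluster. A minimal flink-conf sketch:
# Reactive mode: the job automatically scales to use all available TaskManagers
scheduler-mode: reactive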

Part 7: Troubleshooting Common Issues

Issue 1: Out of Memory (OOM)

Symptoms:
java.lang.OutOfMemoryError: Java heap space
Diagnosis:
# Check heap usage (requires a JDK in the image; on JDK 9+ use jcmd instead of jmap -heap)
kubectl exec -it flink-taskmanager-0 -n flink -- jcmd <pid> GC.heap_info

# Heap dump
kubectl exec -it flink-taskmanager-0 -n flink -- \
  jmap -dump:format=b,file=/tmp/heap.bin <pid>
Solutions:
  1. Increase heap size: taskmanager.memory.task.heap.size: 4096m
  2. Use RocksDB (off-heap state): state.backend: rocksdb
  3. Enable state TTL to clean old state
  4. Reduce parallelism (fewer tasks per TaskManager)

Issue 2: Checkpoint Timeouts

Symptoms:
Checkpoint expired before completing
Solutions:
// Increase timeout
config.setCheckpointTimeout(900000);  // 15 minutes

// Enable incremental checkpoints (RocksDB)
rocksDB.enableIncrementalCheckpointing(true);

// Enable unaligned checkpoints (high backpressure)
config.enableUnalignedCheckpoints();

// Checkpoint less frequently (a longer interval reduces checkpoint pressure)
env.enableCheckpointing(120000);  // Every 2 minutes instead of 1

Issue 3: Kafka Consumer Lag

Diagnosis:
# Check consumer lag
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --group flink-consumer-group --describe
Solutions:
// Increase Kafka source parallelism (FlinkKafkaConsumer below is deprecated; see the KafkaSource sketch after this block)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "flink-consumer");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Set source parallelism to match Kafka partition count
env.addSource(consumer).setParallelism(16);  // 16 Kafka partitions
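On current Flink versions the KafkaSource API replaces FlinkKafkaConsumer; a minimal sketch of the same source with matching parallelism (topic and group names are taken from the example above):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("my-topic")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))  // committed offsets, else earliest
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// Match source parallelism to the Kafka partition count (16 here)
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
   .setParallelism(16);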

Issue 4: Job Stuck in Restart Loop

Symptoms:
Job is restarting (4/4 restarts within 5 minutes)
Diagnosis:
# Check logs
kubectl logs flink-jobmanager-0 -n flink | grep -i exception

# Check task failures
curl http://jobmanager:8081/jobs/<job-id>/exceptions
Solutions:
# Exponential backoff between restarts gives transient failures time to clear
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 5min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10min

# Or: Fixed delay
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30s

Part 8: Production Best Practices

1. Checkpointing Strategy

// Production configuration
env.enableCheckpointing(300000);  // 5 minutes (not too frequent)

CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(60000);  // 1 min pause
config.setCheckpointTimeout(600000);  // 10 min timeout
config.setMaxConcurrentCheckpoints(1);  // One at a time
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setTolerableCheckpointFailureNumber(3);  // Tolerate 3 failures

2. State Backend Selection

// For state < 10 GB: FsStateBackend
env.setStateBackend(new FsStateBackend("s3://bucket/checkpoints"));

// For state > 10 GB: RocksDB with incremental checkpoints
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);
rocksDB.enableIncrementalCheckpointing(true);
env.setStateBackend(rocksDB);

3. Monitoring & Alerting

Critical Alerts:
# Prometheus alert rules
groups:
- name: flink
  rules:
  # Checkpoint failures (cumulative counter, so alert on recent increases)
  - alert: FlinkCheckpointFailing
    expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[15m]) > 0
    for: 5m
    annotations:
      summary: "Flink job {{ $labels.job_name }} checkpoint failing"

  # High backpressure
  - alert: FlinkBackpressureHigh
    expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
    for: 10m
    annotations:
      summary: "High backpressure in job {{ $labels.job_name }}"

  # Consumer lag
  - alert: FlinkKafkaLag
    expr: flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max > 100000
    for: 15m
    annotations:
      summary: "Kafka lag > 100K for job {{ $labels.job_name }}"

  # Job failing or recovering (downtime is > 0 while a job is recovering)
  - alert: FlinkJobDown
    expr: flink_jobmanager_job_downtime > 0
    for: 5m
    annotations:
      summary: "Flink job {{ $labels.job_name }} is down or recovering"

4. Security

# flink-conf.yaml

# Enable SSL/TLS
security.ssl.enabled: true
security.ssl.keystore: /path/to/keystore.jks
security.ssl.keystore-password: changeit
security.ssl.truststore: /path/to/truststore.jks
security.ssl.truststore-password: changeit

# Enable Kerberos (YARN/HDFS)
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink@REALM

# Internal (RPC/data plane) and REST endpoint encryption
security.ssl.internal.enabled: true
security.ssl.rest.enabled: true

5. Cost Optimization

Spot Instances (AWS):
# Label and taint the spot node group so only tolerant workloads are scheduled there
kubectl label nodes <spot-node> workload-type=flink-taskmanager
kubectl taint nodes <spot-node> workload-type=spot:NoSchedule

# TaskManager deployment with spot toleration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  template:
    spec:
      tolerations:
      - key: workload-type
        operator: Equal
        value: spot
        effect: NoSchedule
      affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 1
            preference:
              matchExpressions:
              - key: workload-type
                operator: In
                values:
                - flink-taskmanager

Part 9: Upgrade Strategies

Blue-Green Deployment

# 1. Deploy new version (green)
kubectl apply -f flink-v2.yaml

# 2. Take savepoint from old version (blue)
flink savepoint <old-job-id> s3://bucket/savepoints

# 3. Cancel old job
flink cancel <old-job-id>

# 4. Start new job from savepoint
flink run -s s3://bucket/savepoints/savepoint-xxx \
  -c com.example.Job job-v2.jar

# 5. Verify new job (green)
# 6. Remove old deployment (blue)
kubectl delete deployment flink-taskmanager-v1

Canary Deployment

# 1. Deploy a canary: a second, independent instance of the job running the new
#    version, reading the same topic with its own consumer group and writing to
#    a shadow (non-production) sink
kubectl apply -f flink-canary.yaml

# 2. Monitor canary metrics (throughput, checkpoints, exceptions) and compare
#    its output with the production job

# 3. Promote: restart the production job on the new version from a savepoint
#    (as in blue-green above), then remove the canary
flink stop --savepointPath s3://bucket/savepoints <prod-job-id>
flink run -s s3://bucket/savepoints/savepoint-xxx -c com.example.Job job-v2.jar
kubectl delete -f flink-canary.yaml

Summary

What You’ve Mastered

✅ Deployment architectures (Standalone, YARN, Kubernetes)
✅ High availability with JobManager failover
✅ State backends and checkpointing strategies
✅ Production monitoring with Prometheus/Grafana
✅ Backpressure detection and resolution
✅ Resource tuning and auto-scaling
✅ Troubleshooting common production issues
✅ Security, cost optimization, upgrade strategies

Key Takeaways

  1. HA is Non-Negotiable: Always enable HA in production
  2. Monitor Everything: Checkpoints, backpressure, lag, state size
  3. RocksDB for Large State: Use incremental checkpoints
  4. Tune Memory Carefully: Heap, managed memory, network buffers
  5. Test Failover: Kill pods and verify recovery

Next Module

Module 9: Capstone Project - Fraud Detection System

Build a complete production-grade fraud detection system with Kafka, CEP, ML, and monitoring

Resources

Documentation

Tools

Practice: Deploy a Flink cluster to Kubernetes with HA, RocksDB state backend, and Prometheus monitoring. Run a stateful job, kill the JobManager, and verify automatic recovery!