Full Citation:
Paris Carbone, Stephan Ewen, Gyula Fóra, Seif Haridi, Stefan Richter, and Kostas Tzoumas. 2017. “State Management in Apache Flink: Consistent Stateful Distributed Stream Processing”. Proceedings of the VLDB Endowment, Vol. 10, No. 12. Published at VLDB 2017 (Very Large Data Bases), a top-tier database conference.
// WITHOUT TTL: State grows forever
MapState<String, UserProfile> userProfiles;
// After 1 year: Billions of users, hundreds of GB of state
// Even for users who haven't been active in months!
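For contrast, a minimal sketch of the same MapState with TTL enabled. The 30-day retention period is an illustrative choice, and the snippet assumes it runs inside a rich function's open() on a keyed stream:

// WITH TTL: entries expire 30 days after their last write (illustrative retention period)
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(30))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)           // writes refresh the TTL
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never hand back expired values
    .build();

MapStateDescriptor<String, UserProfile> profilesDescriptor =
    new MapStateDescriptor<>("userProfiles", String.class, UserProfile.class);
profilesDescriptor.enableTimeToLive(ttlConfig);

MapState<String, UserProfile> userProfiles =
    getRuntimeContext().getMapState(profilesDescriptor);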
public class SessionWindowFunction
        extends KeyedProcessFunction<String, Event, SessionSummary> {

    private ValueState<List<Event>> sessionEvents;
    private ValueState<Long> sessionEnd;

    @Override
    public void open(Configuration config) {
        sessionEvents = getRuntimeContext().getState(
            new ValueStateDescriptor<>("session", new TypeHint<List<Event>>(){}.getTypeInfo())
        );
        sessionEnd = getRuntimeContext().getState(
            new ValueStateDescriptor<>("sessionEnd", Long.class)
        );
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<SessionSummary> out)
            throws Exception {
        List<Event> events = sessionEvents.value();
        if (events == null) {
            events = new ArrayList<>();
        }
        events.add(event);
        sessionEvents.update(events);

        // Session timeout: 30 minutes of inactivity
        long newSessionEnd = event.timestamp + 1800000; // 30 min

        // Delete old timer if exists
        Long oldSessionEnd = sessionEnd.value();
        if (oldSessionEnd != null) {
            ctx.timerService().deleteEventTimeTimer(oldSessionEnd);
        }

        // Register new timer
        ctx.timerService().registerEventTimeTimer(newSessionEnd);
        sessionEnd.update(newSessionEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionSummary> out)
            throws Exception {
        // Timer fired: Session ended
        List<Event> events = sessionEvents.value();
        SessionSummary summary = new SessionSummary(
            ctx.getCurrentKey(),
            events.size(),
            events.get(0).timestamp,
            events.get(events.size() - 1).timestamp
        );
        out.collect(summary);

        // Clean up state
        sessionEvents.clear();
        sessionEnd.clear();
    }
}
How It Works:
10:00:00 - User action → Set timer for 10:30:00
10:15:00 - User action → Delete 10:30:00 timer, set timer for 10:45:00
10:20:00 - User action → Delete 10:45:00 timer, set timer for 10:50:00
10:50:00 - Timer fires → Emit session summary, clear state
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
// or S3:
env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));
Characteristics:
State stored in Java heap (working state)
Checkpoints stored in distributed filesystem (HDFS, S3)
Max state size: Heap size (~10-50 GB per TaskManager)
Checkpoint size: Unlimited (stored in FS)
Use Case: Production with moderate state (< 10 GB per TaskManager).
Pros: Working state stays on the heap, so reads and writes are fast; checkpoints are durable in HDFS/S3 and survive TaskManager failures.
// The second constructor argument enables incremental checkpointing (CRITICAL for large state)
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://bucket/checkpoints", true);

// Tune RocksDB options
rocksDB.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
            .setMaxBackgroundJobs(4)  // Parallel compaction
            .setMaxOpenFiles(-1);     // Unlimited open files
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
            .setCompactionStyle(CompactionStyle.LEVEL)
            .setWriteBufferSize(64 * 1024 * 1024) // 64 MB write buffer
            .setMaxWriteBufferNumber(3);
    }
});

env.setStateBackend(rocksDB);
Production Recommendation:
// For state > 1 GB: Use RocksDB with incremental checkpointing
// (the second constructor argument turns incremental checkpointing on)
RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints", true);
env.setStateBackend(backend);
From the paper “Lightweight Asynchronous Snapshots for Distributed Dataflows” (Carbone et al.):
“A checkpoint is a consistent snapshot of the distributed state of a Flink job, capturing the state of all operators and the position in the input streams.”
In Practice: Periodically save state to durable storage so that, on failure, the job can resume from the last completed checkpoint.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60000);

// Checkpoint configuration
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(600000);         // 10 min timeout
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);       // One at a time
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION        // Keep checkpoints on cancel
);

// Optional: Unaligned checkpoints (faster, more storage)
env.getCheckpointConfig().enableUnalignedCheckpoints();
Exactly-Once (CheckpointingMode.EXACTLY_ONCE):
Guarantees: Each event affects state exactly once, even with failures.
Mechanism: Aligns checkpoint barriers across all input streams before snapshotting operator state.
Use Case: Most applications (correctness critical).
At-Least-Once (CheckpointingMode.AT_LEAST_ONCE):
Guarantees: Events may be processed multiple times after a failure.
Mechanism: No barrier alignment, so checkpoints complete faster.
Use Case: High-throughput pipelines with idempotent operations.
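A minimal sketch of selecting at-least-once mode, assuming the same env and checkpoint interval as in the configuration above:

// Switch the checkpoint mode to at-least-once (no barrier alignment)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
// Or pass the mode directly when enabling checkpointing:
// env.enableCheckpointing(60000, CheckpointingMode.AT_LEAST_ONCE);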
// The second constructor argument enables incremental checkpointing (only checkpoint changes)
RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints", true);
env.setStateBackend(backend);
How It Works:
Checkpoint 1: Full state (10 GB)
Checkpoint 2: Delta since checkpoint 1 (100 MB)
Checkpoint 3: Delta since checkpoint 2 (150 MB)
...
Benefit: For large state (> 1 GB), incremental checkpoints are 10-100x faster.
Trade-off: Slightly slower recovery (must replay deltas).
public class Deduplicator extends RichFlatMapFunction<Event, Event> {

    private ValueState<Boolean> seen;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
            new ValueStateDescriptor<>("seen", Boolean.class);

        // TTL: Keep seen IDs for 24 hours
        descriptor.enableTimeToLive(
            StateTtlConfig.newBuilder(Time.hours(24))
                .cleanupIncrementally(1000, true)
                .build()
        );

        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(event); // First time seeing this ID
        }
        // Else: Duplicate, drop it
    }
}

// Usage
stream
    .keyBy(event -> event.id) // Partition by event ID
    .flatMap(new Deduplicator())
    .print();
// EFFICIENT: Minimal state access
public void processElement(Event event) throws Exception {
    // Access all state at once
    Long count = counter.value();
    Long sum = total.value();

    // Update in-memory
    count += 1;
    sum += event.value;

    // Write back
    counter.update(count);
    total.update(sum);
}
Q1: What’s the difference between keyed state and operator state?
A:
Keyed state: Partitioned by key, each key has independent state (e.g., ValueState<T>). Used for most stateful operations.
Operator state: Shared across all events in an operator instance, not partitioned by key (e.g., Kafka source offsets). Used for sources/sinks (see the sketch below).
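A minimal operator-state sketch: a buffering sink that stores its in-memory buffer in ListState on every checkpoint. The class name, state name, buffer logic, and Event type are illustrative assumptions:

// Operator state: the buffer belongs to the operator instance, not to any key
public class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {

    private transient ListState<Event> checkpointedBuffer; // operator state
    private final List<Event> buffer = new ArrayList<>();

    @Override
    public void invoke(Event value, Context context) {
        buffer.add(value);
        // flush buffer to the external system when it reaches a threshold (omitted)
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called on every checkpoint: copy the in-memory buffer into operator state
        checkpointedBuffer.clear();
        for (Event e : buffer) {
            checkpointedBuffer.add(e);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("buffered-events", Event.class));
        if (context.isRestored()) {
            for (Event e : checkpointedBuffer.get()) {
                buffer.add(e); // rebuild the buffer after a failure
            }
        }
    }
}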
Q2: Explain state TTL. When would you use it?
A:
State TTL automatically expires state after inactivity. Use it to:
Prevent state from growing unbounded
Remove stale user sessions
Comply with data retention policies (GDPR)
Q3: What’s the difference between MemoryStateBackend and RocksDBStateBackend?
A:
MemoryStateBackend: Working state and checkpoints both live on the JVM heap (checkpoints go to the JobManager), so it is fast but limited to small state, from megabytes up to a few GB. For development and testing.
RocksDBStateBackend: Working state lives in an embedded RocksDB instance on local disk (off-heap), so every access pays serialization cost and is slower, but state scales to TBs and supports incremental checkpoints. For production with large state.
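A minimal sketch of configuring each backend (the bucket path is a placeholder; the second RocksDB constructor argument enables incremental checkpointing):

// Development: state and checkpoints kept on the heap
env.setStateBackend(new MemoryStateBackend());

// Production with large state: RocksDB with incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("s3://bucket/checkpoints", true));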
Q4: What is incremental checkpointing? Why is it important?
A:
Incremental checkpointing (RocksDB only) saves only the state changes since the last checkpoint, not the full state. It is critical for large state (> 1 GB) to avoid long checkpoint times, which can back-pressure processing.