
Demystifying the Dataflow Model: True Stream Processing with Flink

Module Duration: 3-4 hours
Focus: Research foundations + Flink’s streaming architecture
Prerequisites: Basic distributed systems, Java or Scala
Hands-on Labs: 8+ streaming examples

Introduction: The Streaming Revolution

The Problem That Needed Solving

In 2013, Google faced a critical challenge: existing batch processing frameworks (like MapReduce and Spark) couldn’t handle real-time streaming data properly. The industry had two bad options:
  1. Batch Processing (MapReduce, early Spark):
    • Wait hours for results
    • Can’t handle continuous data
    • Good for historical analysis, terrible for real-time
  2. Micro-Batching (Spark Streaming):
    • Chop stream into tiny batches
    • Latency measured in seconds (at best)
    • Pretends batches are streams
    • Event time processing is a hack
What was missing? A framework that treats streaming as the default, not an afterthought.
Critical Distinction:

Micro-batching (Spark Streaming):
Stream → [Batch 1] → [Batch 2] → [Batch 3] → Results
         2 seconds   2 seconds   2 seconds
         (minimum latency: 2 seconds)
True Streaming (Flink):
Stream → Process → Process → Process → Results
         ↓          ↓          ↓         (millisecond latency)
       event     event     event

Part 1: The Research Foundation - Google’s Dataflow Model

The Paper That Changed Everything

Full Citation: Tyler Akidau, Robert Bradshaw, Craig Chambers, Slava Chernyak, Rafael J. Fernández-Moctezuma, Reuven Lax, Sam McVeety, Daniel Mills, Frances Perry, Eric Schmidt, and Sam Whittle. 2015. “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing”. VLDB ‘15.

The Authors: Google’s Stream Processing Team

  • Tyler Akidau (Lead Author): Principal Engineer at Google, later joined Snowflake. A creator of Apache Beam and author of the book “Streaming Systems”.
  • Craig Chambers: Senior Staff Engineer at Google, previously led the FlumeJava project.
  • Robert Bradshaw: Tech Lead for Google Cloud Dataflow, Apache Beam PMC member.
  • Team Background: This wasn’t academic research - it was the formalization of Google’s internal streaming infrastructure (MillWheel + FlumeJava).

Publication Venue and Impact

Conference: VLDB 2015 (Very Large Data Bases) - one of the premier database conferences

Impact Metrics:
  • 5,000+ citations (as of 2024)
  • Industry Adoption: Led to Apache Beam, influenced Flink design
  • Google’s Implementation: Powers Google Cloud Dataflow
  • Academic Recognition: Foundational paper for stream processing research

Historical Context: 2013-2015

Before the Dataflow Model:
  • MapReduce dominated batch processing
  • Storm provided low-latency streaming (but no correctness guarantees)
  • Spark Streaming used micro-batching (high latency)
  • No framework handled out-of-order, unbounded data with event time semantics
The Problem Statement from the Paper:
“Existing systems force an uncomfortable choice: either sacrifice latency (batch systems), or sacrifice correctness (streaming systems that ignore event time).”

Part 2: Core Concepts from the Dataflow Model

The Four Questions Framework

The Dataflow Model paper introduced a simple but profound framework for reasoning about stream processing. Every streaming pipeline must answer:

1. WHAT are you computing?

// Example: Counting events by type
DataStream<Event> events = ...;

// WHAT: Sum of counts per event type
DataStream<Tuple2<String, Integer>> counts = events
    .map(event -> Tuple2.of(event.type, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))  // lambdas erase Tuple2's type arguments
    .keyBy(value -> value.f0)                       // key by event type
    .sum(1);
This is the transformation logic - the “business logic” of your pipeline.

2. WHERE in event time are you computing it?

// WHERE: 5-minute tumbling windows
DataStream<Tuple2<String, Integer>> windowedCounts = events
    .map(event -> Tuple2.of(event.type, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum(1);
Windowing divides the infinite stream into finite chunks for aggregation.

3. WHEN in processing time do you materialize results?

// WHEN: Emit results when watermark passes end of window
// (This is the DEFAULT trigger in Flink)

// Or: Emit early results every 1 minute, then final result at watermark
DataStream<Tuple2<String, Integer>> earlyResults = events
    .map(event -> Tuple2.of(event.type, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(
        ContinuousEventTimeTrigger.of(Time.minutes(1))
    )
    .sum(1);
Triggers control when windows emit results (early, on-time, late).

4. HOW do refinements relate to each other?

// HOW: Accumulating mode (updates include previous values)
// vs. Discarding mode (each update is independent)

// Accumulating: [5, 12, 18]  (cumulative totals)
// Discarding:   [5,  7,  6]  (deltas)
Accumulation mode determines whether results are cumulative or deltas.
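Flink’s DataStream windows accumulate by default: each firing re-emits the running total. A minimal sketch of how discarding-style output can be approximated, reusing the events stream from the WHAT example above - Flink’s PurgingTrigger clears the window’s contents after every firing, so each early result is a delta:

// Discarding-style refinements: PurgingTrigger clears window state after
// every firing, so each early result is a delta rather than a running total
DataStream<Tuple2<String, Integer>> deltas = events
    .map(event -> Tuple2.of(event.type, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(1))))
    .sum(1);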

Deep Dive: Event Time vs Processing Time

This is the most important concept in stream processing.

Event Time

Definition: When an event actually occurred (according to embedded timestamp).
class SensorReading {
    String sensorId;
    double temperature;
    long eventTime;  // When sensor took reading (milliseconds since epoch)
}

// Event created at: 2024-01-15 10:00:00 (event time)
// Arrived at Flink: 2024-01-15 10:05:23 (processing time)
// Delay: 5 minutes 23 seconds

Processing Time

Definition: When an event is processed by the streaming system.
// Processing time: no event timestamps or watermarks are required.
// Flink uses the wall clock of the machine doing the processing.
DataStream<SensorReading> stream = env.fromSource(
    kafkaSource,                          // a KafkaSource<SensorReading>
    WatermarkStrategy.noWatermarks(),     // no event-time semantics
    "sensor-source");

stream
    .keyBy(r -> r.sensorId)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))  // wall-clock windows
    .max("temperature");

Why Event Time is Critical

Scenario: Mobile app usage analytics
User opens app at:     10:00:00 (EVENT TIME)
User goes into tunnel: 10:00:05 (loses connection)
User exits tunnel:     10:15:30 (connection restored)
Event arrives at DC:   10:15:35 (PROCESSING TIME)

Delay: 15 minutes 35 seconds!
With Processing Time (WRONG):
Event goes into 10:15 window → Metrics show spike at 10:15
Reality: User was active at 10:00, not 10:15!
With Event Time (CORRECT):
Event goes into 10:00 window → Metrics accurately reflect 10:00 usage
Even though we processed it at 10:15!
Rule of Thumb:

Use Event Time when:
  • Events can arrive out-of-order
  • Delays are unpredictable (mobile, IoT, distributed systems)
  • Correctness matters more than simplicity
Use Processing Time when:
  • Events arrive in order
  • You control the entire pipeline
  • Latency is negligible
  • You’re aggregating server logs from a single machine
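The choice surfaces directly in the window assigner. A minimal sketch of both variants, assuming events already carries timestamps and watermarks for the event-time case, and where CountAggregator stands in for any AggregateFunction:

// Event time: an event's window is determined by the timestamp it carries
events.keyBy(e -> e.type)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .aggregate(new CountAggregator());

// Processing time: an event's window is determined by the wall clock on arrival
events.keyBy(e -> e.type)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .aggregate(new CountAggregator());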

Part 3: Watermarks - The Core Innovation

What are Watermarks?

From the Dataflow Model paper:
“A watermark is a monotonically increasing timestamp indicating that no more events with timestamps less than the watermark will arrive.”
In Plain English: A watermark is the streaming system saying: “I’m confident that all events with timestamps before time T have arrived. I can now safely compute results for time T.”

Visual Example

Timeline (Event Time):
10:00  10:01  10:02  10:03  10:04  10:05
  ↓      ↓      ↓      ↓      ↓      ↓

Events Arriving (Processing Time):
10:00 → [event@10:00]  Watermark: 10:00
10:01 → [event@10:01]  Watermark: 10:01
10:02 → [event@10:02]  Watermark: 10:02
10:03 → [event@10:01]  Watermark: 10:02 (LATE EVENT! Watermark doesn't move backward)
10:04 → [event@10:03]  Watermark: 10:03
10:05 → [event@10:04]  Watermark: 10:04
What Happens:
  • At processing time 10:02, watermark reaches 10:02
  • This means: “All events with timestamps ≤ 10:02 have arrived”
  • Windows covering time ≤ 10:02 can now emit results
  • At processing time 10:03, event with timestamp 10:01 arrives (LATE!)
  • Watermark stays at 10:02 (watermarks are monotonic)

Watermark Strategies

Perfect Watermarks (Rare in Practice)

// Example: Reading from a Kafka topic with known partitions
// and timestamps are perfectly ordered within each partition

WatermarkStrategy<Event> perfectStrategy =
    WatermarkStrategy
        .forMonotonousTimestamps()  // Assumes perfect order
        .withTimestampAssigner((event, timestamp) -> event.eventTime);
When to Use: Controlled environments, pre-sorted data, append-only logs. Downside: Any out-of-order event is immediately late (the watermark has already passed its timestamp) and may be silently dropped.

Bounded Out-of-Orderness (Production Standard)

// Allow up to 10 seconds of out-of-orderness
WatermarkStrategy<Event> boundedStrategy =
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.eventTime);

// Watermark = max_seen_timestamp - 10_seconds
How it Works:
Events arrive with timestamps (shown as mm:ss): [10:00, 10:05, 10:03, 10:08]

After event@10:00: Watermark = 10:00 - 10s = 09:50
After event@10:05: Watermark = 10:05 - 10s = 09:55
After event@10:03: Watermark = 10:05 - 10s = 09:55 (no change)
After event@10:08: Watermark = 10:08 - 10s = 09:58
When to Use: Most production scenarios (IoT, mobile, distributed logs).

Custom Watermark Generators

public class SensorWatermarkGenerator implements WatermarkGenerator<SensorReading> {
    private final long maxOutOfOrderness = 5000; // 5 seconds
    // Start just above Long.MIN_VALUE so the first emitted watermark doesn't underflow
    private long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

    @Override
    public void onEvent(SensorReading event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, event.getEventTime());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emit watermark every 200ms (default)
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
    }
}

// Usage
WatermarkStrategy<SensorReading> customStrategy =
    WatermarkStrategy
        .forGenerator(ctx -> new SensorWatermarkGenerator())
        .withTimestampAssigner((event, ts) -> event.getEventTime());

Late Events and Allowed Lateness

final OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data"){};

SingleOutputStreamOperator<Event> counts = events
    .keyBy(event -> event.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Accept events up to 1 min late
    .sideOutputLateData(lateDataTag)   // Capture very late events
    .sum("value");

// Get the very late events that were rejected
DataStream<Event> veryLateEvents = counts.getSideOutput(lateDataTag);
Behavior:
  1. Watermark passes end of window → Window closes and emits result
  2. Late events (within allowed lateness) → Window reopens, updates result
  3. Very late events (beyond allowed lateness) → Sent to side output

Part 4: The Streaming Landscape (2015-2024)

Framework        | Processing Model | Latency      | Event Time | Exactly-Once  | Best For
Apache Flink     | True streaming   | Milliseconds | Native     | Yes           | Real-time analytics, CEP
Spark Streaming  | Micro-batching   | Seconds      | Add-on     | Yes           | Batch + streaming hybrid
Apache Storm     | True streaming   | Milliseconds | Limited    | At-least-once | Low-latency, simple apps
Kafka Streams    | True streaming   | Milliseconds | Native     | Yes           | Kafka-centric apps
Google Dataflow  | True streaming   | Milliseconds | Native     | Yes           | GCP-only

Flink’s Unique Advantages

1. True Streaming (Not Micro-Batching)

// Spark Streaming (Micro-batching)
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Minimum latency: 2 seconds (batch interval)

// Flink (True Streaming)
DataStream<String> stream = env.socketTextStream("localhost", 9999);
stream.map(new MapFunction<String, String>() {
    public String map(String value) {
        return value.toUpperCase();
    }
});
// Latency: Milliseconds (per-record processing)

2. Stateful Stream Processing

// Flink's Stateful Processing
public class CountWithState extends RichFlatMapFunction<Event, Tuple2<String, Long>> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Long.class);
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();                    // null the first time a key is seen
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        out.collect(Tuple2.of(event.key, next));
    }
}

// State is:
// - Fault-tolerant (checkpointed)
// - Scalable (partitioned by key)
// - Queryable (can be accessed externally)
Storm doesn’t have built-in managed state. Spark has state but with higher latency.

3. Exactly-Once Semantics with Chandy-Lamport

From the research paper “Lightweight Asynchronous Snapshots for Distributed Dataflows” (Paris Carbone et al., 2015): Flink implements a variant of the Chandy-Lamport distributed snapshot algorithm (asynchronous barrier snapshotting) for exactly-once processing. How it Works (Simplified):
1. JobManager injects "barrier" into source
2. Barrier flows through the DAG with data
3. When operator receives barrier from all inputs:
   - Snapshots its state to durable storage
   - Forwards barrier downstream
4. When all operators snapshot: Checkpoint complete
5. On failure: Restore from last checkpoint
Result: Even if a node crashes mid-processing, each event is processed exactly once.
// Enable checkpointing
env.enableCheckpointing(60000); // Every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Flink guarantees:
// - No duplicates in output
// - No lost events
// - Consistent state
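In production, the interval is usually paired with a few more knobs on CheckpointConfig; a sketch with illustrative values:

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(30000);  // breathing room between checkpoints
checkpointConfig.setCheckpointTimeout(120000);          // abort checkpoints that hang
checkpointConfig.setMaxConcurrentCheckpoints(1);        // one in-flight checkpoint at a time
// Keep the latest checkpoint if the job is cancelled, enabling manual restarts
checkpointConfig.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);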

Part 5: Flink Architecture

Components

┌─────────────────────────────────────────┐
│          Client Application             │
│    (Submits JobGraph to Flink)          │
└──────────────┬──────────────────────────┘
               │
               ▼
┌──────────────────────────────────────────┐      ┌──────────────┐
│         JobManager (Master)              │◄────►│  ZooKeeper   │
│  - Coordinates execution                 │      │ (HA Coord.)  │
│  - Tracks checkpoints                    │      └──────────────┘
│  - Assigns tasks to TaskManagers         │
└──────────┬───────────────────────────────┘
           │
           ▼
┌──────────────────────────────────────────┐
│  ┌────────────────┐  ┌────────────────┐  │
│  │ TaskManager 1  │  │ TaskManager 2  │  │
│  │                │  │                │  │
│  │ [Task] [Task]  │  │ [Task] [Task]  │  │
│  │ [Task] [Task]  │  │ [Task] [Task]  │  │
│  └────────────────┘  └────────────────┘  │
│        Workers (Execute Tasks)           │
└──────────────────────────────────────────┘

JobManager (Master)

Responsibilities:
  1. Job Scheduling: Converts logical plan to physical execution plan
  2. Checkpoint Coordination: Triggers and tracks checkpoints
  3. Resource Management: Allocates slots to tasks
  4. Failure Recovery: Restarts failed tasks from checkpoints
High Availability:
# Flink HA configuration
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
Multiple standby JobManagers, one active. On failure, standby takes over.

TaskManager (Worker)

Responsibilities:
  1. Execute Tasks: Run operator instances (map, filter, window, etc.)
  2. Buffer Data: Manage network buffers for data exchange
  3. Maintain State: Store keyed state (backed by RocksDB or heap)
  4. Checkpoint State: Snapshot state to durable storage
Configuration:
# TaskManager resources
taskmanager.numberOfTaskSlots: 4  # 4 parallel tasks per TM
taskmanager.memory.process.size: 4g
taskmanager.memory.managed.size: 1g  # For RocksDB state backend
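The managed memory above is what the RocksDB state backend uses; a companion sketch of the related settings (directory paths are illustrative):

# State backend selection (paths are illustrative)
state.backend: rocksdb                   # keyed state on local disk via RocksDB
state.backend.incremental: true          # checkpoint only newly written SST files
state.checkpoints.dir: hdfs:///flink/checkpoints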

Data Exchange: Task Slots and Parallelism

// Set parallelism
env.setParallelism(4);

DataStream<String> stream = env.socketTextStream("localhost", 9999);

stream
    .flatMap(new Tokenizer())      // 4 parallel instances
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1)                         // 4 parallel instances
    .print();                       // 4 parallel instances
Task Slot Allocation:
TaskManager 1 (4 slots):
  Slot 0: [Source → FlatMap → KeyBy → Window → Sum → Sink]  (Pipeline 1)
  Slot 1: [Source → FlatMap → KeyBy → Window → Sum → Sink]  (Pipeline 2)
  Slot 2: [Source → FlatMap → KeyBy → Window → Sum → Sink]  (Pipeline 3)
  Slot 3: [Source → FlatMap → KeyBy → Window → Sum → Sink]  (Pipeline 4)
Flink chains consecutive operators into a single task where possible (“operator chaining”), avoiding serialization and network hops between them.
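Chaining is automatic, but it can be steered per operator; a brief sketch, where HeavyEnrichment stands in for any expensive function:

stream
    .flatMap(new Tokenizer())
    .startNewChain()        // begin a fresh chain at this operator
    .map(new HeavyEnrichment())
    .disableChaining();     // never chain this operator with its neighbors

// Or disable chaining for the whole job:
env.disableOperatorChaining();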

Part 6: Hands-On Examples

Example 1: Word Count with Event Time

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class EventTimeWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event time is the default time characteristic since Flink 1.12,
        // so no setStreamTimeCharacteristic() call is needed

        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Assign timestamps and watermarks
        // (for this demo, arrival time doubles as the event timestamp)
        DataStream<Tuple2<String, Long>> withTimestamps = text
            .map(line -> Tuple2.of(line, System.currentTimeMillis()))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))  // lambdas erase tuple types
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.f1)
            );

        // Word count with 10-second tumbling windows
        DataStream<Tuple2<String, Integer>> wordCounts = withTimestamps
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .sum(1);

        wordCounts.print();
        env.execute("Event Time Word Count");
    }

    public static class Tokenizer implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Integer>> {
        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.f0.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }
    }
}
Output:
(apache, 5)   [Window: 10:00:00 - 10:00:10]
(flink, 8)    [Window: 10:00:00 - 10:00:10]
(streaming, 3) [Window: 10:00:00 - 10:00:10]
...
(apache, 7)   [Window: 10:00:10 - 10:00:20]
(flink, 12)   [Window: 10:00:10 - 10:00:20]

Example 2: Sensor Data with Late Events

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class SensorMonitoring {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample sensor data (sensorId, temperature, timestamp)
        DataStream<SensorReading> sensorData = env
            .addSource(new SensorSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((reading, ts) -> reading.timestamp)
            );

        // Define output tag for late data
        final OutputTag<SensorReading> lateDataTag = new OutputTag<SensorReading>("late-data"){};

        // Compute average temperature per sensor per minute
        // (AvgTempAggregator emits one Double per window)
        SingleOutputStreamOperator<Double> avgTemp = sensorData
            .keyBy(r -> r.sensorId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(30))  // Allow 30 seconds of lateness
            .sideOutputLateData(lateDataTag)
            .aggregate(new AvgTempAggregator());

        avgTemp.print("Average Temperature");

        // Handle late data separately
        DataStream<SensorReading> lateData = avgTemp.getSideOutput(lateDataTag);
        lateData.print("Late Data");

        env.execute("Sensor Monitoring");
    }

    public static class SensorReading {
        public String sensorId;
        public double temperature;
        public long timestamp;

        public SensorReading(String sensorId, double temperature, long timestamp) {
            this.sensorId = sensorId;
            this.temperature = temperature;
            this.timestamp = timestamp;
        }
    }

    public static class AvgTempAggregator
        implements AggregateFunction<SensorReading, Tuple2<Double, Long>, Double> {

        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return Tuple2.of(0.0, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(SensorReading reading, Tuple2<Double, Long> acc) {
            return Tuple2.of(acc.f0 + reading.temperature, acc.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> acc) {
            return acc.f0 / acc.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}

Example 3: Scala API (for Scala developers)

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

object FlinkScalaExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text: DataStream[String] = env.socketTextStream("localhost", 9999)

    val counts: DataStream[(String, Int)] = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // socket text carries no event timestamps
      .sum(1)

    counts.print()
    env.execute("Scala Word Count")
  }
}

Part 7: The Academic Reception and Industry Impact

Initial Reception (2015-2016)

VLDB 2015 Reviews:
  • “Significant contribution to stream processing theory”
  • “Elegant unification of batch and streaming”
  • “Practical impact is enormous”
Academic Citations (by research area):
  • Stream Processing Systems: 2,800+ citations
  • Event Time Processing: 1,200+ citations
  • Distributed Snapshots: 600+ citations
  • Windowing Semantics: 400+ citations

Industry Adoption Timeline

2015: Google publishes Dataflow Model paper
  • Google Cloud Dataflow launched (proprietary)
  • Apache Flink adopts Dataflow Model concepts
2016: Apache Beam created
  • Unified programming model (Google-donated)
  • Flink becomes a Beam runner
2017-2018: Explosion of Flink adoption
  • Alibaba (largest Flink deployment: 10,000+ nodes)
  • Uber (real-time analytics platform)
  • Netflix (Keystone real-time data platform)
2019-2020: Flink becomes industry standard
  • AWS Kinesis Data Analytics (managed Flink)
  • Ververica (founded by Flink’s creators) acquired by Alibaba
2021-2024: Maturity and dominance
  • Confluent integrates Flink with Kafka (Immerok acquisition, 2023)
  • 25,000+ production deployments
  • De facto standard for stateful stream processing
  • Chosen for: fraud detection, real-time ML, CEP, analytics
Comparison with Contemporaries

vs Storm (2011-2015):
  • Storm: No exactly-once semantics in core (at-least-once; Trident later added exactly-once via micro-batching)
  • Storm: No managed state (developers roll their own)
  • Storm: No event time support
  • Flink: All of the above, built-in
vs Spark Streaming (2013-present):
  • Spark: Micro-batching (seconds latency)
  • Spark: Event time added later (second-class citizen)
  • Flink: True streaming from day one (millisecond latency)
vs Samza (LinkedIn, 2013-present):
  • Samza: Tightly coupled to Kafka
  • Samza: Limited adoption outside LinkedIn
  • Flink: Source-agnostic, wider ecosystem

Part 8: Common Misconceptions

Misconception 1: “Flink is streaming-only and can’t do batch”

Reality: Flink treats batch as a special case of streaming (bounded streams).
// Batch processing in Flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("hdfs://data/input");
DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new Tokenizer())
    .groupBy(0)
    .sum(1);

counts.writeAsCsv("hdfs://data/output");
env.execute("Batch Word Count");
Flink’s DataSet API (batch) and DataStream API (streaming) share the same execution engine.
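The DataSet API shown above is the legacy route; newer Flink versions express the same unification by running the DataStream API in batch execution mode over bounded sources. A sketch (Tokenizer as in the snippet above):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With bounded sources, BATCH mode executes the streaming program as a batch job
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<String> text = env.readTextFile("hdfs://data/input");  // bounded source
text.flatMap(new Tokenizer())
    .keyBy(value -> value.f0)
    .sum(1)
    .print();

env.execute("Batch Mode Word Count");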

Misconception 2: “Watermarks solve all late data problems”

Reality: Watermarks are heuristic. They can be:
  • Too aggressive (drop valid late data)
  • Too conservative (delay results unnecessarily)
You must tune watermarks based on your data characteristics.
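Tuning usually means two dials: the out-of-orderness bound (measured from your data’s actual delay distribution) and idleness handling, so one quiet partition doesn’t stall the watermark. A sketch using the Event type from earlier, with illustrative values:

WatermarkStrategy<Event> tuned =
    WatermarkStrategy
        // bound chosen from observed delay percentiles, not guessed
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        // idle partitions stop holding back the global watermark
        .withIdleness(Duration.ofMinutes(1))
        .withTimestampAssigner((event, ts) -> event.eventTime);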

Misconception 3: “Exactly-once means no duplicates in external systems”

Reality: Exactly-once is within Flink’s state. External sinks (databases, files) may still see duplicates on retries. Solution: Use idempotent sinks or two-phase commit sinks (Flink’s KafkaSink, or JdbcSink with XA transactions).
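With the Kafka connector, the two-phase commit path is selected through the delivery guarantee; a sketch assuming flink-connector-kafka is on the classpath (broker address, topic, and prefix are illustrative):

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("broker:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
    // Kafka transactions commit only when the corresponding checkpoint completes
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("my-pipeline")  // required for EXACTLY_ONCE
    .build();

results.sinkTo(sink);  // results: any DataStream<String>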

Part 9: Interview Preparation

Conceptual Questions

Q1: Explain event time vs processing time. When would you use each? Answer:
  • Event Time: Timestamp embedded in the event (when it happened). Use when events can arrive out-of-order or with delays (mobile, IoT, distributed logs).
  • Processing Time: Timestamp when Flink processes the event. Use when events arrive in order and low latency matters more than correctness.
Q2: What is a watermark? Why is it necessary? Answer: A watermark is a monotonically increasing timestamp indicating “all events before time T have (probably) arrived.” Necessary because:
  • Infinite streams have no natural “end”
  • Need to know when to close windows and emit results
  • Allows handling late data gracefully
Q3: How does Flink achieve exactly-once semantics? Answer: Flink uses the Chandy-Lamport distributed snapshot algorithm:
  1. Periodically injects barriers into the stream
  2. Operators snapshot state when barrier arrives
  3. On failure, restores from last successful checkpoint
  4. Replays events from checkpoint point
Q4: Why is Flink better than Spark Streaming for low-latency use cases? Answer:
  • Flink: True per-record processing (millisecond latency)
  • Spark: Micro-batching (minimum 0.5-2 second latency)
  • Flink’s event time support is first-class, Spark’s is retrofitted

Coding Questions

Q: Implement a Flink job that counts events per 5-second window with 2-second allowed lateness.
DataStream<Event> events = ...;

events
    .keyBy(event -> event.type)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(2))
    .aggregate(new CountAggregator())
    .print();
Q: How would you handle a data stream where 10% of events arrive > 1 hour late? Answer:
OutputTag<Event> veryLateTag = new OutputTag<Event>("very-late"){};

SingleOutputStreamOperator<Result> mainStream = events
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.minutes(10))  // Reasonable lateness
    .sideOutputLateData(veryLateTag)    // Capture very late events
    .aggregate(...);

// Process very late data separately (e.g., batch reprocessing)
DataStream<Event> veryLateData = mainStream.getSideOutput(veryLateTag);
veryLateData.addSink(new LateDataSink());

Summary and Key Takeaways

What You’ve Learned

✅ Dataflow Model Foundations: The four questions (What, Where, When, How)
✅ Event Time Processing: Why it matters, how it works
✅ Watermarks: The core innovation for handling infinite streams
✅ Flink Architecture: JobManager, TaskManager, task slots, parallelism
✅ Exactly-Once Semantics: Chandy-Lamport-style snapshots
✅ Hands-On Examples: Word count, sensor monitoring, late data handling

Core Principles to Remember

  1. Streaming First: Flink treats batch as bounded streaming
  2. Event Time Default: Always prefer event time unless you have a good reason not to
  3. Watermarks are Heuristics: Tune them for your data characteristics
  4. State is First-Class: Flink’s managed state is a superpower
  5. Exactly-Once is Hard: Flink makes it easy (within the system)

The Dataflow Model’s Legacy

The 2015 Dataflow Model paper didn’t just create a framework - it created a new way of thinking about stream processing:
  • Before: “How do I adapt my batch code to streaming?”
  • After: “How do I express my logic independently of execution?”
This mental model shift is why Flink (and Beam) succeeded where earlier frameworks failed.

Next Steps

Next Module Preview

In Module 2: DataStream API & Transformations, you’ll learn:
  • Complete DataStream API reference
  • Stateful transformations (mapWithState, process functions)
  • Stream joins and patterns
  • Async I/O for enrichment
  • Production ETL pipelines


Additional Resources

Research Papers

  1. Tyler Akidau et al. “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing.” VLDB 2015. The foundational paper this module is based on.
  2. Paris Carbone et al. “Lightweight Asynchronous Snapshots for Distributed Dataflows.” 2015. The mechanism behind Flink’s exactly-once semantics.
  3. Paris Carbone et al. “State Management in Apache Flink: Consistent Stateful Distributed Stream Processing.” VLDB 2017. Deep dive into Flink’s state backends.

Books

  • “Streaming Systems” by Tyler Akidau et al. (O’Reilly, 2018)
    • Written by the Dataflow Model authors
    • Definitive guide to stream processing concepts
  • “Stream Processing with Apache Flink” by Fabian Hueske and Vasiliki Kalavri (O’Reilly, 2019)
    • Practical Flink programming guide
    • Written by Flink committers

Online Resources

Practice Time: Spend 4-6 hours implementing the examples in this module with your own data sources (Kafka, files, sockets) to truly internalize these concepts.