# Demystifying the Dataflow Model: Write Once, Run Anywhere with Apache Beam

> Master the unified batch/streaming model from Google's Dataflow paper - portable pipelines across Spark, Flink, and Dataflow

<Info>
  **Module Duration**: 3-4 hours
  **Focus**: Dataflow Model foundations + Beam portability
  **Prerequisites**: Java or Python, basic data processing
  **Hands-on Labs**: 8+ portable pipelines
</Info>

## Introduction: The Portability Problem

### The Nightmare Before Beam (2010-2015)

Imagine you're a data engineer in 2015:

**Your Company**:

* Runs Spark for batch ETL
* Uses Storm for real-time alerting
* Evaluating Flink for complex event processing

**Your Reality**:

```java theme={null}
// Spark batch job
sparkContext.textFile("data.txt")
    .flatMap(line -> Arrays.asList(line.split(" ")))
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((a, b) -> a + b);

// Storm streaming job (COMPLETELY DIFFERENT API!)
builder.setSpout("words", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("words");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

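// Flink job (ANOTHER DIFFERENT API!)
env.readTextFile("data.txt")
    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
        for (String word : line.split(" ")) { out.collect(new Tuple2<>(word, 1)); }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))  // lambdas erase generic types
    .groupBy(0)
    .sum(1);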
```

**Problem**:

* 3 frameworks = 3 codebases
* Change execution engine? Rewrite everything!
* Want both batch and streaming? Learn TWO APIs per framework!
* Vendor lock-in at its worst

**What was missing?** A **unified, portable** abstraction.

***

## Part 1: The Research Foundation - Google's Dataflow Model

### The Paper That Solved Portability

**Full Citation**:
Tyler Akidau, Robert Bradshaw, Craig Chambers, Slava Chernyak, Rafael J. Fernández-Moctezuma, Reuven Lax, Sam McVeety, Daniel Mills, Frances Perry, Eric Schmidt, and Sam Whittle. 2015. **"The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing"**. VLDB '15.

### The Authors: Google's Data Infrastructure Team

* **Tyler Akidau** (lead author): Google engineer who was also lead author of the MillWheel paper; later a founding member of the Apache Beam project and co-author of the book "Streaming Systems".
* **Craig Chambers**: Lead author of the FlumeJava paper (Google's internal batch pipeline library).
* **Robert Bradshaw**: Google engineer on Cloud Dataflow and a longtime Apache Beam contributor.
* **Background**: The paper formalizes the model behind Google's internal systems (MillWheel for streaming, FlumeJava for batch) and Google Cloud Dataflow.

### Historical Context

**Before Dataflow Model (2010-2015)**:

* MapReduce → Spark (batch only)
* Storm, Samza (streaming only, at-least-once)
* Spark Streaming (micro-batching, high latency)
* **No open-source framework unified batch and streaming with event-time correctness**

**The Problem Statement**:

> In short: teams had to choose between batch systems with high latency and streaming systems that could not correctly handle out-of-order data.

### Publication and Impact

**Conference**: VLDB 2015 (Very Large Data Bases), one of the top database conferences

**Impact Metrics**:

* **Widely cited** in the stream-processing literature
* **Led to**: Apache Beam (2016), with runners for Flink and Spark; the paper also formalizes the model behind Google Cloud Dataflow, announced in 2014
* **Adopted by**: Google, LinkedIn, Lyft, PayPal, Spotify, Twitter

### The Core Innovation

The Dataflow Model introduced **four fundamental questions** that every data processing pipeline must answer:

1. **WHAT** are you computing? (transformations)
2. **WHERE** in event time? (windowing)
3. **WHEN** do you emit results? (triggers)
4. **HOW** do refinements relate? (accumulation modes)

**This framework is execution-engine agnostic**: the same four answers can be expressed on Spark, Flink, Dataflow, or any other runner that supports the features you use.

***

## Part 2: Apache Beam - The Dataflow Model Implemented

### What is Apache Beam?

**Apache Beam** = **B**atch + str**EAM**

An open-source, unified programming model for batch and streaming data processing that:

1. **Separates logic from execution**: Write once, run on any supported runner
2. **Implements the Dataflow Model**: Native support for event time, watermarks, triggers
3. **Multi-language**: Java, Python, Go, with cross-language transforms
4. **Extensible**: Write custom I/O connectors, transforms, runners

### Beam Architecture

```
┌──────────────────────────────────────────────────┐
│           Apache Beam Pipeline                   │
│  (Your application code in Java/Python/Go)       │
└──────────────┬───────────────────────────────────┘
               │
               ↓
       ┌──────────────┐
       │  Beam Model  │  (PCollections, PTransforms, Windowing)
       └───────┬──────┘
               │
       Portable Runner API
               │
    ┌──────────┼──────────┬──────────┬────────────┐
    │          │          │          │            │
    ↓          ↓          ↓          ↓            ↓
┌────────┐ ┌──────┐ ┌────────┐ ┌──────────┐ ┌─────────┐
│ Direct │ │ Spark│ │ Flink  │ │ Dataflow │ │ Samza   │
│ Runner │ │Runner│ │ Runner │ │  Runner  │ │ Runner  │
└────────┘ └──────┘ └────────┘ └──────────┘ └─────────┘
(local)    (batch & (batch &   (managed     (streaming)
            stream)  stream)    on GCP)
```

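**Key Insight**: Your pipeline code is **runner-independent**. Change execution engine by swapping runners, not rewriting code, provided the target runner supports the Beam features your pipeline uses (the Beam capability matrix lists this per runner).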

***

## Part 3: The Four Questions Framework (Deep Dive)

### Question 1: WHAT are you computing?

**Definition**: The transformation logic - what operations to apply to your data.

```java theme={null}
// Java SDK: Word count "WHAT"
PCollection<String> lines = pipeline.apply(TextIO.read().from("input.txt"));

PCollection<KV<String, Long>> wordCounts = lines
    .apply(FlatMapElements.into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split(" "))))  // Split into words
    .apply(Filter.by((String word) -> !word.isEmpty()))         // Remove empty
    .apply(Count.perElement());                                  // Count per word
```

```python theme={null}
# Python SDK: Same logic, different syntax
word_counts = (pipeline
    | beam.io.ReadFromText('input.txt')
    | beam.FlatMap(lambda line: line.split(' '))
    | beam.Filter(lambda word: word != '')
    | beam.combiners.Count.PerElement())
```
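Because the WHAT is just logic, it can be checked without any runner at all. The sketch below is plain Python (no Beam) that applies the same split, filter, and count steps to an in-memory list. It is an illustration and a possible test oracle for small inputs, not how a runner executes the pipeline; `count_words` is a hypothetical name.

```python
from collections import Counter


def count_words(lines):
    """Split each line on single spaces, drop empty tokens, and count each word."""
    words = (word for line in lines for word in line.split(" "))
    return Counter(word for word in words if word != "")


# The double space yields an empty token, which the filter step removes
print(count_words(["the cat", "the  dog"]))
```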

**Key**: The WHAT is the same regardless of runner (Spark, Flink, Dataflow).

### Question 2: WHERE in event time?

**Definition**: Windowing - how to group events by time.

```java theme={null}
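// WHERE: assign each word to a 5-minute event-time window, then count per window.
// `words` is the PCollection<String> of individual words from the WHAT step.
PCollection<KV<String, Long>> windowedCounts = words
    .apply(Window.<String>into(
        FixedWindows.of(Duration.standardMinutes(5))
    ))
    .apply(Count.perElement());  // windowing must come before the aggregation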
```

**Window Types**:

#### Fixed Windows (Tumbling)

```java theme={null}
// Non-overlapping windows
FixedWindows.of(Duration.standardMinutes(5))

Timeline:
[00:00-00:05) [00:05-00:10) [00:10-00:15) ...
```
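To make the assignment rule concrete, here is a minimal plain-Python sketch (not Beam's implementation) of how a fixed window is chosen from an element's event timestamp. It assumes windows aligned to multiples of the size with no offset and timestamps in seconds; `fixed_window` and `count_per_window` are hypothetical names.

```python
def fixed_window(ts, size):
    """Return the [start, end) fixed window that contains event timestamp ts."""
    start = ts - (ts % size)  # align down to a multiple of size
    return (start, start + size)


def count_per_window(events, size):
    """Count (word, timestamp) events per (window, word) pair."""
    counts = {}
    for word, ts in events:
        key = (fixed_window(ts, size), word)
        counts[key] = counts.get(key, 0) + 1
    return counts


FIVE_MINUTES = 5 * 60  # seconds
events = [("beam", 10), ("beam", 290), ("beam", 300), ("flink", 601)]
print(count_per_window(events, FIVE_MINUTES))
```

The event at 300 s lands in the second window because window ends are exclusive; only the timestamp matters, not the order in which events arrive.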

#### Sliding Windows

```java theme={null}
// Overlapping windows
SlidingWindows.of(Duration.standardMinutes(10))  // Window size
    .every(Duration.standardMinutes(5))           // Slide period

Timeline:
[00:00-00:10)
     [00:05-00:15)
          [00:10-00:20)
               [00:15-00:25) ...
```
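With sliding windows an element belongs to several windows at once (here 10 / 5 = 2). A plain-Python sketch of that assignment, assuming window starts aligned to multiples of the period with no offset; it mirrors the idea rather than Beam's code, and `sliding_windows` is a hypothetical name.

```python
def sliding_windows(ts, size, period):
    """Return every [start, end) sliding window that contains timestamp ts.

    Window starts are multiples of `period`; each window spans `size`.
    """
    start = ts - (ts % period)  # latest window start <= ts
    windows = []
    while start > ts - size:    # [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# 10-minute windows every 5 minutes, timestamps in minutes
print(sliding_windows(7, size=10, period=5))   # [(0, 10), (5, 15)]
print(sliding_windows(10, size=10, period=5))  # [(5, 15), (10, 20)]
```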

#### Session Windows

```java theme={null}
// Windows based on activity gaps
Sessions.withGapDuration(Duration.standardMinutes(30))

User Activity:
10:00 [click] ───┐
10:05 [click]    │ Session 1 (every gap < 30 min)
10:20 [click]    │
10:45 [click] ───┘
11:20 [click] ─── Session 2 (new session after 35 min gap)
```
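Beam's `Sessions` gives each element a window [t, t + gap) and merges windows that overlap. The plain-Python sketch below models that merge for a single key, with times in minutes; `sessions` is a hypothetical helper, not Beam's code.

```python
def sessions(timestamps, gap):
    """Merge per-element windows [t, t + gap) into sessions for one key.

    Returns a list of (start, end) session windows, end exclusive.
    """
    merged = []
    for t in sorted(timestamps):
        if merged and t < merged[-1][1]:
            # Overlaps the current session: extend its end
            start, end = merged[-1]
            merged[-1] = (start, max(end, t + gap))
        else:
            # No overlap: the pause was at least `gap`, so open a new session
            merged.append((t, t + gap))
    return merged


# Click times in minutes after 10:00, with a 30-minute gap
print(sessions([0, 5, 20, 45, 80], gap=30))  # [(0, 75), (80, 110)]
print(sessions([0, 5, 10, 45, 80], gap=30))  # [(0, 40), (45, 75), (80, 110)]
```

The second call shows that any pause of 30 minutes or more (10 to 45 is 35 minutes) starts a new session.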

### Question 3: WHEN do you emit results?

**Definition**: Triggers - when to materialize (emit) window results.

```java theme={null}
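// WHEN: Emit one result per window when the watermark passes the window's end.
// The trigger is configured on the windowing step, before the aggregation it controls.
PCollection<KV<String, Long>> triggered = words
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(AfterWatermark.pastEndOfWindow())  // like the default trigger, minus its re-firing for late data
        .withAllowedLateness(Duration.standardMinutes(1))
        .discardingFiredPanes())
    .apply(Count.perElement());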
```

**Advanced Triggers**:

```java theme={null}
// Early + On-time + Late firings
Trigger trigger = AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1)))  // Speculative results about once a minute (processing time)
    .withLateFirings(AfterPane.elementCountAtLeast(1));  // Update for each late element

PCollection<KV<String, Long>> results = words
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(trigger)
        .withAllowedLateness(Duration.standardMinutes(5))
        .accumulatingFiredPanes())  // Each pane includes all data seen so far
    .apply(Count.perElement());
```

**Behavior**:

```
Window [10:00-10:05), single key (left column is processing time, assuming the watermark keeps pace with it):

10:01 - Early firing (1 min elapsed)    → Emit count: 42 (speculative)
10:02 - Early firing                    → Emit count: 89 (speculative)
10:05 - Watermark passes window end     → Emit count: 156 (on-time)
10:06 - Late event arrives              → Emit count: 157 (late update)
```
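Whether a late event such as the one at 10:06 still produces a late firing depends on the watermark and the allowed lateness. The plain-Python sketch below classifies an arriving element for its window. It is a simplification under stated assumptions (times in minutes, window end exclusive, Beam's millisecond-level boundary details ignored), and `classify_arrival` is a hypothetical name.

```python
def classify_arrival(window_end, watermark, allowed_lateness):
    """Classify an element arriving while the watermark is at `watermark`.

    on-time : the watermark has not yet passed the end of the element's window
    late    : the watermark passed the window end, but within allowed lateness
    dropped : the window's state has expired, so the element is discarded
    """
    if watermark < window_end:
        return "on-time"
    if watermark < window_end + allowed_lateness:
        return "late"
    return "dropped"


# Window [10:00, 10:05) as minutes after 10:00, with 5 minutes of allowed lateness
for watermark in (4, 6, 12):
    print(watermark, classify_arrival(window_end=5, watermark=watermark, allowed_lateness=5))
```

A larger allowed lateness keeps window state around longer (more cost) in exchange for accepting more late data.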

### Question 4: HOW do refinements relate?

**Definition**: Accumulation mode - how to handle multiple emissions from the same window.

```java theme={null}
// Accumulating: Each emission includes all data
.accumulatingFiredPanes()

// Discarding: Each emission only has new data since last firing
.discardingFiredPanes()

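// Accumulating and Retracting: emit a retraction of the previous value, then the new one.
// Described in the Dataflow paper, but not implemented in Beam's SDKs (no such method exists):
// .accumulatingAndRetractingFiredPanes()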
```

**Example**:

```
Window [10:00-10:05):
Events: [5, 3, 7, 2, 4]

ACCUMULATING:
Early (after 5, 3):      Emit: 8
Early (after 7):         Emit: 15
On-time (after 2, 4):    Emit: 21   ← Includes everything

DISCARDING:
Early (after 5, 3):      Emit: 8
Early (after 7):         Emit: 7    ← Only new data
On-time (after 2, 4):    Emit: 6    ← Only new data since last firing

RETRACTING (paper only; not available in Beam):
Early (after 5, 3):      Emit: 8
Early (after 7):         Emit: -8, 15   ← Retract previous, emit new total
On-time (after 2, 4):    Emit: -15, 21  ← Retract previous, emit final
```
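The three columns can be reproduced mechanically. The sketch below is plain Python that replays the example's firings (values grouped by the firing that first sees them) under each accumulation mode for a sum. The retracting branch models the paper's idea and has no Beam counterpart; `fire_panes` is a hypothetical name.

```python
def fire_panes(firings, mode):
    """Return what each firing emits for a running sum.

    firings: list of lists; firings[i] holds the values that arrived since firing i-1.
    mode: "accumulating", "discarding", or "retracting".
    """
    emitted, total, previous = [], 0, None
    for new_values in firings:
        pane_sum = sum(new_values)
        total += pane_sum
        if mode == "discarding":
            emitted.append([pane_sum])  # only data since the last firing
        elif mode == "accumulating":
            emitted.append([total])     # everything seen so far
        elif mode == "retracting":
            retraction = [] if previous is None else [-previous]  # undo the old total first
            emitted.append(retraction + [total])
            previous = total
        else:
            raise ValueError(f"unknown mode: {mode}")
    return emitted


firings = [[5, 3], [7], [2, 4]]  # early, early, on-time
print(fire_panes(firings, "accumulating"))  # [[8], [15], [21]]
print(fire_panes(firings, "discarding"))    # [[8], [7], [6]]
print(fire_panes(firings, "retracting"))    # [[8], [-8, 15], [-15, 21]]
```

Discarding panes add up to the final total (8 + 7 + 6 = 21), so a downstream consumer must sum them; accumulating panes can simply overwrite the previous value.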

***

## Part 4: Core Beam Abstractions

### PCollection - Immutable Distributed Dataset

```java theme={null}
// PCollection<T> represents a dataset
PCollection<String> lines;           // Collection of strings
PCollection<KV<String, Integer>> kvs; // Collection of key-value pairs
PCollection<Event> events;            // Collection of custom objects
```

**Characteristics**:

* **Immutable**: Transformations create new PCollections
* **Distributed**: Data spread across workers
* **Bounded or Unbounded**: Batch files (bounded) or streams (unbounded)
* **Timestamped**: Each element has an event time timestamp

```java theme={null}
// Creating PCollections
Pipeline p = Pipeline.create(options);

// From memory (bounded)
PCollection<String> words = p.apply(Create.of("hello", "world", "beam"));

// From file (bounded)
PCollection<String> lines = p.apply(TextIO.read().from("gs://bucket/file.txt"));

// From Kafka (unbounded)
PCollection<KV<String, String>> messages = p.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("events")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata()  // KafkaRecord<K, V> -> KV<K, V>
);
```

### PTransform - Data Transformation

```java theme={null}
// PTransform<InputT, OutputT> transforms one PCollection to another
public class ExtractWords extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> expand(PCollection<String> input) {
        return input
            .apply(FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split(" "))));
    }
}

// Usage
PCollection<String> words = lines.apply(new ExtractWords());
```

**Built-in Transforms**:

```java theme={null}
// ParDo: Element-wise processing, zero or more outputs per input (generalizes map, flatMap, filter)
PCollection<Integer> lengths = words.apply(
    ParDo.of(new DoFn<String, Integer>() {
        @ProcessElement
        public void processElement(@Element String word, OutputReceiver<Integer> out) {
            out.output(word.length());
        }
    })
);

// Combine: Aggregation
PCollection<Integer> totalLength = lengths.apply(Sum.integersGlobally());

// GroupByKey: Group by key (shuffle)
PCollection<KV<String, Iterable<Integer>>> grouped = kvPairs.apply(GroupByKey.create());

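// CoGroupByKey: Join multiple keyed PCollections; TupleTags identify each input in the result
TupleTag<String> leftTag = new TupleTag<>();   // for leftCollection:  PCollection<KV<String, String>>
TupleTag<Long> rightTag = new TupleTag<>();    // for rightCollection: PCollection<KV<String, Long>>
PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
    .of(leftTag, leftCollection)
    .and(rightTag, rightCollection)
    .apply(CoGroupByKey.create());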
```
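Ignoring distribution, windowing, and ordering, the semantics of GroupByKey and CoGroupByKey fit in a few lines of plain Python. In this sketch keyword arguments stand in for TupleTags; the helper names and sample data are hypothetical, and the value order within a group is not something Beam guarantees.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Model GroupByKey: map each key to the list of all its values."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def co_group_by_key(**inputs):
    """Model CoGroupByKey: map each key to {tag: values from that input}.

    A key missing from one input still appears, with an empty list for that
    tag, so inner or outer joins can be built on top of the result.
    """
    grouped = {tag: group_by_key(pairs) for tag, pairs in inputs.items()}
    keys = set().union(*(g.keys() for g in grouped.values()))
    return {key: {tag: g.get(key, []) for tag, g in grouped.items()} for key in keys}


emails = [("amy", "amy@example.com"), ("bob", "bob@example.com")]
orders = [("amy", 30), ("amy", 12), ("cat", 5)]
print(group_by_key(orders))  # {'amy': [30, 12], 'cat': [5]}
print(co_group_by_key(emails=emails, orders=orders)["bob"])  # {'emails': ['bob@example.com'], 'orders': []}
```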

***

## Part 5: Hands-On Examples

### Example 1: Batch Word Count (Java)

```java theme={null}
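import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordCount {
    public static void main(String[] args) {
        // Parse --runner and other options from the command line
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);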

        p.apply("ReadLines", TextIO.read().from("input.txt"))
         .apply("ExtractWords", FlatMapElements
             .into(TypeDescriptors.strings())
             .via((String line) -> Arrays.asList(line.split(" "))))
         .apply("FilterEmpty", Filter.by((String word) -> !word.isEmpty()))
         .apply("CountPerWord", Count.perElement())
         .apply("FormatResults", MapElements
             .into(TypeDescriptors.strings())
             .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
         .apply("WriteResults", TextIO.write().to("output"));

        p.run().waitUntilFinish();
    }
}
```

**Run on Different Runners**:

```bash theme={null}
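# Assumes a project generated from Beam's word-count Maven archetype, whose pom.xml
# defines these profiles: -P<profile> adds the runner dependency, --runner selects it.

# Direct Runner (local testing)
mvn compile exec:java -Dexec.mainClass=WordCount \
    -Pdirect-runner \
    -Dexec.args="--runner=DirectRunner"

# Spark Runner
mvn compile exec:java -Dexec.mainClass=WordCount \
    -Pspark-runner \
    -Dexec.args="--runner=SparkRunner"

# Flink Runner
mvn compile exec:java -Dexec.mainClass=WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner"

# Dataflow Runner (Google Cloud; input/output paths must be on GCS for remote workers)
mvn compile exec:java -Dexec.mainClass=WordCount \
    -Pdataflow-runner \
    -Dexec.args="--runner=DataflowRunner \
                 --project=my-gcp-project \
                 --region=us-central1 \
                 --tempLocation=gs://my-bucket/temp"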
```

**Same code, different execution engines!**

### Example 2: Streaming Window Aggregation (Python)

```python theme={null}
import json

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.kafka import ReadFromKafka  # cross-language: needs Java for the expansion service
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.transforms.window import FixedWindows, TimestampedValue

def run():
    options = PipelineOptions()  # reads --runner etc. from the command line
    options.view_as(StandardOptions).streaming = True  # Kafka is an unbounded source

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromKafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092'},
             topics=['events'])

         # Records arrive as (key, value) byte pairs; the value holds the JSON event
         | 'ParseJSON' >> beam.Map(lambda msg: json.loads(msg[1]))

         # Assumes event['timestamp'] is event time in seconds since the Unix epoch
         | 'AddTimestamps' >> beam.Map(
             lambda event: TimestampedValue(event, event['timestamp']))

         | 'WindowInto5Minutes' >> beam.WindowInto(
             FixedWindows(5 * 60),  # 5-minute event-time windows
             trigger=AfterWatermark(early=AfterCount(100)),  # Early pane after every 100 elements
             accumulation_mode=AccumulationMode.ACCUMULATING)

         | 'ExtractUserId' >> beam.Map(lambda event: (event['user_id'], 1))

         | 'CountPerUser' >> beam.CombinePerKey(sum)

         | 'FormatOutput' >> beam.Map(
             lambda kv: f"User {kv[0]}: {kv[1]} events")

         # fileio.WriteToFiles handles windowed, unbounded input
         | 'WriteToSink' >> fileio.WriteToFiles(path='output/'))

if __name__ == '__main__':
    run()
```

**Run on Different Runners**:

```bash theme={null}
# Direct Runner (local; support for cross-language streaming sources such as Kafka may be limited)
python event_counts.py --runner DirectRunner

# Flink Runner
python event_counts.py --runner FlinkRunner \
    --flink_master=localhost:8081

# Dataflow Runner
python event_counts.py --runner DataflowRunner \
    --project my-gcp-project \
    --region us-central1 \
    --temp_location gs://my-bucket/temp
```

### Example 3: Complex Windowing with Late Data

```java theme={null}
public class SessionAnalysis {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

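        // Read unbounded stream (Event deserializer and coder setup omitted)
        PCollection<Event> events = p
            .apply(KafkaIO.<String, Event>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("user-events")
                // ... deserializer config
                .withoutMetadata())              // KafkaRecord<String, Event> -> KV<String, Event>
            .apply(Values.<Event>create());      // keep only the Event values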

        // Session windows (30 min gap); counts events per user per session
        PCollection<KV<String, Long>> eventsPerUserSession = events
            .apply(Window.<Event>into(
                Sessions.withGapDuration(Duration.standardMinutes(30)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)))
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardMinutes(10))
                .accumulatingFiredPanes())

            .apply(MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
                .via((Event e) -> KV.of(e.getUserId(), 1L)))

            .apply(Sum.longsPerKey());

        eventsPerUserSession.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c, BoundedWindow window) {
                String userId = c.element().getKey();
                Long count = c.element().getValue();
                System.out.println(String.format(
                    "User %s: %d events in session %s",
                    userId, count, window));
            }
        }));

        p.run().waitUntilFinish();
    }
}
```

***

## Part 6: Beam vs Framework-Specific APIs

### The Portability Advantage

**Scenario**: You start with Spark, later need to migrate to Flink.

**Without Beam**:

```java theme={null}
// Spark code
JavaRDD<String> lines = sc.textFile("input.txt");
JavaPairRDD<String, Integer> counts = lines
    .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((a, b) -> a + b);

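// MIGRATION REQUIRED: Rewrite in Flink (DataSet API, since deprecated in favor of DataStream)
DataSet<String> lines = env.readTextFile("input.txt");
DataSet<Tuple2<String, Integer>> counts = lines
    .flatMap((String line, Collector<String> out) -> {
        for (String word : line.split(" ")) out.collect(word);
    })
    .returns(Types.STRING)                          // lambdas erase generic types
    .map(word -> new Tuple2<>(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .groupBy(0)
    .sum(1);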
```

**Effort**: Days to weeks rewriting and testing.

**With Beam**:

```java theme={null}
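// Beam code (runs on Spark OR Flink)
Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("input.txt"))
 .apply(FlatMapElements
     .into(TypeDescriptors.strings())
     .via((String line) -> Arrays.asList(line.split(" "))))
 .apply(Count.perElement())
 .apply(MapElements                       // TextIO.write() expects Strings, not KVs
     .into(TypeDescriptors.strings())
     .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
 .apply(TextIO.write().to("output"));

p.run().waitUntilFinish();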
```

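**Migration**: Select a different runner. With the new runner's dependency on the classpath, pass `--runner=FlinkRunner` instead of `--runner=SparkRunner` on the command line (no code change), or set it on the options in code:

```java theme={null}
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

// From Spark:
options.setRunner(SparkRunner.class);

// To Flink:
options.setRunner(FlinkRunner.class);
```

**Effort**: Minutes to switch, plus time to re-test on the new runner, since runners differ in which Beam features they support.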

### Feature Comparison

| Feature               | Beam                                   | Spark                                                    | Flink                                             |
| --------------------- | -------------------------------------- | -------------------------------------------------------- | ------------------------------------------------- |
| **Portability**       | Multi-runner                           | Spark-only                                               | Flink-only                                        |
| **Batch + Streaming** | Unified model                          | Unified in Structured Streaming (DStreams were separate) | Unified (batch = bounded stream)                  |
| **Event Time**        | First-class                            | Supported in Structured Streaming                        | First-class                                       |
| **Windowing**         | Rich (fixed, sliding, session, custom) | Fixed, sliding, session (since Spark 3.2)                | Rich                                              |
| **Language**          | Java, Python, Go, cross-language       | Scala, Java, Python, SQL                                 | Java, Python (PyFlink), SQL; Scala API deprecated |
| **Runners**           | 7+ runners                             | 1 (Spark)                                                | 1 (Flink)                                         |

***

## Part 7: The Academic and Industry Reception

### Initial Reception (2015-2016)

**Reception**: The paper was widely seen as a practical unification of batch and streaming and as an answer to long-standing problems in handling out-of-order data.

**Research areas that build on it**:

* Stream processing systems
* Event-time processing
* Windowing semantics
* Trigger mechanisms

### Industry Adoption Timeline

**2015**: Google publishes Dataflow Model paper

* Internal use at Google (MillWheel, FlumeJava)
* Launches Google Cloud Dataflow (managed service)

**2016**: Apache Beam created

* Google donates the Cloud Dataflow SDK and runner code to the Apache Software Foundation (incubating in 2016; top-level project in January 2017)
* Initial runners: Direct, Dataflow, Spark, Flink

**2017-2018**: Rapid adoption

* LinkedIn integrates Beam with Samza runner
* PayPal builds streaming fraud detection on Beam
* Lyft uses Beam for real-time metrics

**2019-2020**: Maturity and expansion

* Python SDK matures, including streaming support
* Go SDK develops (experimental at the time)
* Cross-language transforms (e.g., use a Java connector such as KafkaIO from a Python pipeline)

**2021-2024**: Industry standard

* Used in production by: Google, LinkedIn, Lyft, PayPal, Spotify, Twitter
* 100+ companies with Beam deployments
* A widely used choice for portable data processing

### Why Beam Succeeded

**vs Spark** (2014-present):

* Spark: Locked to the Spark runtime; its original streaming API (DStreams) was separate from batch
* **Beam**: Portable, truly unified model

**vs Storm** (open-sourced 2011):

* Storm: Originally no event-time support, limited windowing
* **Beam**: Full Dataflow Model implementation

**vs Flink** (2014-present):

* Flink: Powerful but Flink-specific APIs
* **Beam**: Can run ON TOP of Flink (or Spark, or Dataflow...)

***

## Part 8: Common Misconceptions

### Misconception 1: "Beam is just an abstraction layer, it's slower"

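**Reality**: A runner translates the Beam pipeline into its engine's native job graph; on Flink, for example, the work is executed by Flink's own engine.

**Overhead**: Some remains, for example encoding elements with Beam coders and, for Python or Go pipelines on portable runners, exchanging data with separate SDK worker processes. How large it is depends on the runner and the pipeline, so benchmark your own workload rather than assuming a fixed percentage.

* **Trade-off**: Some runtime overhead in exchange for portability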

### Misconception 2: "I should always use Beam instead of Spark/Flink"

**Reality**: Use Beam when:

* ✅ You value portability (might change runners)
* ✅ Need unified batch/streaming code
* ✅ Complex windowing/triggering logic
* ✅ Multi-cloud deployments

**Use native APIs when**:

* ❌ Locked into one execution engine
* ❌ Need framework-specific optimizations
* ❌ Team expertise in one framework

### Misconception 3: "Beam is only for Google Cloud"

**Reality**: Beam is **open source** and runner-agnostic. Google Cloud Dataflow is just ONE of 7+ runners.

**Available Runners**:

1. DirectRunner (local testing)
2. ApexRunner (Apache Apex; since removed from Beam after Apex was retired)
3. FlinkRunner (Apache Flink)
4. SparkRunner (Apache Spark)
5. DataflowRunner (Google Cloud)
6. SamzaRunner (Apache Samza)
7. Twister2Runner (Twister2, from Indiana University)

***

## Part 9: Interview Preparation

### Conceptual Questions

**Q1: What is Apache Beam? How does it relate to the Dataflow Model?**

**A**:
Apache Beam is an open-source implementation of Google's Dataflow Model. It provides a unified, portable programming model for batch and streaming data processing. The Dataflow Model (from the 2015 VLDB paper) introduced the "What/Where/When/How" framework, which Beam implements across multiple execution engines (Spark, Flink, Dataflow, etc.).

**Q2: Explain the four questions of the Dataflow Model.**

**A**:

1. **WHAT**: The transformation logic (map, filter, aggregate)
2. **WHERE**: Windowing (fixed, sliding, session windows)
3. **WHEN**: Triggers (when to emit results - early, on-time, late)
4. **HOW**: Accumulation mode (accumulating, discarding, retracting)

**Q3: What's the difference between a PCollection and an RDD (Spark) or DataSet (Flink)?**

**A**:

* **PCollection**: Runner-agnostic, can represent batch OR streaming data, has built-in windowing and triggering
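* **RDD**: Spark-specific, batch-oriented (streaming came through the separate DStream API; Structured Streaming later unified both on DataFrames)
* **DataSet**: Flink's batch API (separate from DataStream for streaming; now deprecated in favor of DataStream's batch execution mode)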

**Q4: Why use Beam instead of writing directly in Spark or Flink?**

**A**:
**Portability**: One codebase runs on multiple runners. Switch from Spark to Flink without rewriting.
**Unified Model**: Same API for batch and streaming.
**Future-proof**: Not locked into one execution engine.
**Trade-off**: \~2-5% overhead vs native APIs.

### Coding Questions

**Q: Write a Beam pipeline that counts words in 5-minute windows with early firings.**

```java theme={null}
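Pipeline p = Pipeline.create();

p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("text-stream")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata())                          // PCollection<KV<String, String>>

 .apply(Window.<KV<String, String>>into(
     FixedWindows.of(Duration.standardMinutes(5)))
     .triggering(
         AfterWatermark.pastEndOfWindow()
             .withEarlyFirings(AfterProcessingTime
                 .pastFirstElementInPane()
                 .plusDelayOf(Duration.standardMinutes(1))))
     .withAllowedLateness(Duration.ZERO)         // required when setting a trigger on non-global windows
     .accumulatingFiredPanes())

 .apply(Values.<String>create())
 .apply(FlatMapElements
     .into(TypeDescriptors.strings())
     .via((String line) -> Arrays.asList(line.split(" "))))
 .apply(Count.perElement())
 .apply(ParDo.of(new PrintResultsFn()));  // PrintResultsFn: your own DoFn<KV<String, Long>, Void> (not shown)

p.run();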
```

***

## Summary and Key Takeaways

### What You've Mastered

* ✅ **Dataflow Model foundations** (the four questions)
* ✅ **Beam core abstractions** (PCollection, PTransform)
* ✅ **Windowing** (fixed, sliding, session)
* ✅ **Triggers and accumulation**
* ✅ **Portability** (write once, run anywhere)
* ✅ **Multi-language support** (Java, Python)
* ✅ **Real-world examples** (batch and streaming)

### Core Principles

1. **Portability First**: Code is separate from execution
2. **Unified Model**: Batch is just bounded streaming
3. **Event Time Native**: First-class event time support
4. **Dataflow Model**: What/Where/When/How framework
5. **Runner Agnostic**: Same pipeline, different engines

### The Dataflow Model's Legacy

The 2015 Dataflow Model paper didn't just lead to Beam - it created a **new way of thinking** about data processing:

* **Before**: "Which framework should I use? Batch or streaming?"
* **After**: "What do I want to compute? Beam will run it anywhere."

This abstraction is why Beam remains a leading option for portable data processing.

***

## Next Module

<Card title="Module 2: Core Beam Programming Model" icon="code" href="/distributed-systems-tools/beam-core">
  Master PCollections, ParDo, and composite transforms
</Card>

***

## Resources

### Papers

* ["The Dataflow Model"](https://research.google/pubs/pub43864/) (Akidau et al., VLDB 2015)
* ["MillWheel: Fault-Tolerant Stream Processing at Internet Scale"](https://research.google/pubs/pub41378/) (Akidau et al., VLDB 2013)

### Books

* **"Streaming Systems"** by Tyler Akidau et al. (O'Reilly, 2018)
  * Written by three of the Dataflow Model paper's authors (Tyler Akidau, Slava Chernyak, Reuven Lax)

### Documentation

* [Apache Beam Documentation](https://beam.apache.org/documentation/)
* [Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/)

<Info>
  **Practice**: Implement the word count example in BOTH Java and Python, then run it on DirectRunner, Spark, and Flink to experience true portability!
</Info>
