
Core Programming Model

Module Duration: 3-4 hours
Focus: Fundamental Beam abstractions and transformations
Prerequisites: Java or Python, basic data processing concepts

Overview

Apache Beam’s core programming model provides a unified abstraction for both batch and streaming data processing. This module covers the fundamental concepts that make Beam portable across different execution engines.

Key Concepts

  • PCollection: Immutable, distributed dataset
  • Pipeline: DAG of transformations
  • PTransform: Data transformation operation
  • ParDo: Parallel element-wise processing
  • DoFn: User-defined function for ParDo
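
To make these concrete, here is a minimal sketch that uses all five pieces: a Pipeline builds a PCollection with Create, a ParDo applies a user-defined DoFn, and each step in the chain is a PTransform. FormatFn is an illustrative name, not a Beam class.
import apache_beam as beam

class FormatFn(beam.DoFn):
    """DoFn: user-defined per-element processing logic."""
    def process(self, element):
        yield f"value={element}"

with beam.Pipeline() as pipeline:                    # Pipeline: the DAG being built
    numbers = pipeline | beam.Create([1, 2, 3])      # PCollection of integers
    formatted = numbers | beam.ParDo(FormatFn())     # ParDo applies the DoFn
    formatted | beam.Map(print)                      # Map is another PTransform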

Understanding PCollections

A PCollection (Parallel Collection) represents a distributed dataset that your Beam pipeline operates on. PCollections are immutable and can be bounded (batch) or unbounded (streaming).

PCollection Characteristics

  • Immutability: Once created, a PCollection cannot be modified; transformations create new PCollections.
  • Distributed: Elements are distributed across multiple workers for parallel processing.
  • Timestamped: Each element has an associated timestamp (critical for streaming).
  • Windowed: Elements are organized into windows for grouping operations.
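
As a small sketch of the timestamp property, elements can carry explicit event-time timestamps via beam.window.TimestampedValue; the event_time field below is an assumed example attribute.
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

with beam.Pipeline() as pipeline:
    stamped = (pipeline
        | beam.Create([
            {'user': 'alice', 'event_time': 1700000000},
            {'user': 'bob', 'event_time': 1700000060},
        ])
        # Re-emit each element with its event time as the Beam timestamp
        | beam.Map(lambda e: TimestampedValue(e, e['event_time']))
    )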

Creating PCollections

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka

# Create pipeline
with beam.Pipeline(options=PipelineOptions()) as pipeline:

    # From in-memory data
    numbers = pipeline | "Create Numbers" >> beam.Create([1, 2, 3, 4, 5])

    # From text file
    lines = pipeline | "Read File" >> beam.io.ReadFromText('input.txt')

    # From multiple files with pattern
    logs = pipeline | "Read Logs" >> beam.io.ReadFromText('logs/*.log')

    # From Kafka (unbounded). ReadFromKafka is a cross-language transform,
    # so a Java runtime must be available to expand it at construction time.
    events = pipeline | "Read Kafka" >> ReadFromKafka(
        consumer_config={'bootstrap.servers': 'localhost:9092'},
        topics=['events']
    )

PCollection Types

Bounded PCollection: Finite dataset with a known end (batch processing)
  • Reading from files
  • Database queries
  • Fixed in-memory collections
Unbounded PCollection: Infinite dataset that continues indefinitely (streaming)
  • Kafka topics
  • Pub/Sub subscriptions
  • Real-time sensor data
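
The sketch below contrasts the two (a rough illustration; it assumes the GCP extras are installed, and the Pub/Sub subscription path is a placeholder): reading a text file yields a bounded PCollection, while reading from Pub/Sub with streaming enabled yields an unbounded one.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # unbounded sources need streaming mode

with beam.Pipeline(options=options) as pipeline:
    # Bounded: the file has a known end
    bounded = pipeline | "Read File" >> beam.io.ReadFromText('input.txt')

    # Unbounded: the subscription delivers data indefinitely (placeholder path)
    unbounded = pipeline | "Read PubSub" >> beam.io.ReadFromPubSub(
        subscription='projects/my-project/subscriptions/my-subscription'
    )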

Pipeline Construction

A Pipeline represents the entire data processing workflow as a Directed Acyclic Graph (DAG) of transformations.
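
Because the pipeline is a DAG rather than a single chain, one PCollection can feed several independent branches; a minimal sketch:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    numbers = pipeline | beam.Create([1, 2, 3, 4, 5])

    # Two branches consume the same PCollection
    evens = numbers | "Keep Evens" >> beam.Filter(lambda x: x % 2 == 0)
    squares = numbers | "Square" >> beam.Map(lambda x: x * x)

    evens | "Print Evens" >> beam.Map(print)
    squares | "Print Squares" >> beam.Map(print)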

Pipeline Creation and Configuration

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Basic pipeline
pipeline = beam.Pipeline()

# Pipeline with options
options = PipelineOptions([
    '--runner=DirectRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
    '--job_name=word-count-job'
])

pipeline = beam.Pipeline(options=options)

# Context manager (recommended)
with beam.Pipeline(options=options) as p:
    # Pipeline operations
    result = (p
        | "Read" >> beam.io.ReadFromText('input.txt')
        | "Process" >> beam.Map(lambda x: x.upper())
        | "Write" >> beam.io.WriteToText('output.txt')
    )
    # Pipeline automatically runs and waits at end of context

Pipeline Options

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input', required=True)
        parser.add_argument('--output', required=True)
        parser.add_argument('--max_workers', type=int, default=10)

options = MyOptions([
    '--input=gs://bucket/input',
    '--output=gs://bucket/output',
    '--max_workers=20',
    '--runner=DataflowRunner'
])

with beam.Pipeline(options=options) as pipeline:
    input_path = options.input
    output_path = options.output
    # Use options in pipeline
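
If you start from a generic PipelineOptions instance (for example, one built from sys.argv), the same custom flags are reachable through view_as; a short sketch, assuming --input and --output are supplied on the command line:
import sys
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(sys.argv[1:])
my_options = options.view_as(MyOptions)  # MyOptions as defined above
print(my_options.input, my_options.output, my_options.max_workers)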

PTransforms: Data Transformations

PTransforms are operations that transform PCollections. They represent the nodes in your pipeline DAG.

Core Transform Types

  • Element-wise: Process each element independently (Map, FlatMap, Filter)
  • Aggregating: Combine elements (GroupByKey, Combine, Count)
  • Composite: Combine multiple transforms into reusable units

Map Transform

Apply a 1:1 function to each element.
import apache_beam as beam

def double(x):
    return x * 2

with beam.Pipeline() as pipeline:
    # Using Map with function
    doubled = (pipeline
        | beam.Create([1, 2, 3, 4, 5])
        | beam.Map(double)
    )

    # Using Map with lambda
    squared = (pipeline
        | beam.Create([1, 2, 3, 4, 5])
        | beam.Map(lambda x: x * x)
    )

    # Map with additional arguments
    multiplied = (pipeline
        | beam.Create([1, 2, 3, 4, 5])
        | beam.Map(lambda x, factor: x * factor, factor=10)
    )

FlatMap Transform

Apply a 1:N function (each input produces zero or more outputs).
import apache_beam as beam

def split_words(text):
    return text.split()

with beam.Pipeline() as pipeline:
    words = (pipeline
        | beam.Create(['Hello World', 'Apache Beam', 'Data Processing'])
        | beam.FlatMap(split_words)
    )
    # Output: ['Hello', 'World', 'Apache', 'Beam', 'Data', 'Processing']

    # FlatMap with lambda
    chars = (pipeline
        | beam.Create(['abc', 'def'])
        | beam.FlatMap(lambda x: list(x))
    )
    # Output: ['a', 'b', 'c', 'd', 'e', 'f']

    # FlatMap can filter by returning empty list
    evens = (pipeline
        | beam.Create([1, 2, 3, 4, 5])
        | beam.FlatMap(lambda x: [x] if x % 2 == 0 else [])
    )

Filter Transform

Keep only elements matching a predicate.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    even_numbers = (pipeline
        | beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        | beam.Filter(lambda x: x % 2 == 0)
    )

    # Filter with named function
    def is_valid(record):
        return record.get('status') == 'active' and record.get('value') > 0

    valid_records = (pipeline
        | beam.Create([
            {'status': 'active', 'value': 100},
            {'status': 'inactive', 'value': 50},
            {'status': 'active', 'value': -10}
        ])
        | beam.Filter(is_valid)
    )

ParDo and DoFn

ParDo is the most fundamental and flexible transform in Beam. It allows you to implement custom processing logic through DoFn (Do Function).

Basic DoFn

import apache_beam as beam
from apache_beam.transforms import DoFn

class ExtractWordsFn(DoFn):
    def process(self, element):
        """Process a single element.

        Args:
            element: Input element

        Yields:
            Processed outputs
        """
        words = element.split()
        for word in words:
            yield word.lower()

# Using DoFn
with beam.Pipeline() as pipeline:
    words = (pipeline
        | beam.Create(['Hello World', 'Apache Beam'])
        | beam.ParDo(ExtractWordsFn())
    )

DoFn Lifecycle Methods

DoFn provides lifecycle methods for setup and teardown operations.
import apache_beam as beam
from apache_beam.transforms import DoFn
import logging

class EnrichDataFn(DoFn):
    def __init__(self, api_endpoint):
        self.api_endpoint = api_endpoint
        self.client = None

    def setup(self):
        """Called once per worker before processing."""
        logging.info("Setting up worker resources")
        import requests
        self.client = requests.Session()
        self.cache = {}

    def start_bundle(self):
        """Called at the start of each bundle."""
        logging.info("Starting new bundle")
        self.bundle_count = 0

    def process(self, element):
        """Process each element."""
        self.bundle_count += 1

        # Use cache to avoid repeated API calls
        key = element['id']
        if key not in self.cache:
            response = self.client.get(f"{self.api_endpoint}/{key}")
            self.cache[key] = response.json()

        enriched = {**element, 'metadata': self.cache[key]}
        yield enriched

    def finish_bundle(self):
        """Called at the end of each bundle."""
        logging.info(f"Finished bundle with {self.bundle_count} elements")

    def teardown(self):
        """Called once per worker after all processing."""
        logging.info("Tearing down worker resources")
        if self.client:
            self.client.close()

with beam.Pipeline() as pipeline:
    enriched = (pipeline
        | beam.Create([{'id': '1'}, {'id': '2'}])
        | beam.ParDo(EnrichDataFn('https://api.example.com'))
    )

Multiple Outputs (Side Outputs)

DoFn can emit elements to multiple output PCollections using tagged outputs.
import apache_beam as beam
from apache_beam.transforms import DoFn

class SplitDataFn(DoFn):
    # Define output tags
    OUTPUT_TAG_VALID = 'valid'
    OUTPUT_TAG_INVALID = 'invalid'
    OUTPUT_TAG_WARNING = 'warning'

    def process(self, element):
        value = element.get('value', 0)

        if value < 0:
            # Emit to invalid output
            yield beam.pvalue.TaggedOutput(self.OUTPUT_TAG_INVALID, element)
        elif value > 1000:
            # Emit to warning output
            yield beam.pvalue.TaggedOutput(self.OUTPUT_TAG_WARNING, element)
        else:
            # Main output (no tag)
            yield element

with beam.Pipeline() as pipeline:
    data = pipeline | beam.Create([
        {'id': 1, 'value': 100},
        {'id': 2, 'value': -50},
        {'id': 3, 'value': 2000},
        {'id': 4, 'value': 500}
    ])

    # Apply DoFn with side outputs
    results = data | beam.ParDo(SplitDataFn()).with_outputs(
        SplitDataFn.OUTPUT_TAG_INVALID,
        SplitDataFn.OUTPUT_TAG_WARNING,
        main='valid'
    )

    # Access outputs
    valid = results.valid
    invalid = results.invalid
    warning = results.warning

    # Process each output separately
    valid | "Write Valid" >> beam.io.WriteToText('valid.txt')
    invalid | "Write Invalid" >> beam.io.WriteToText('invalid.txt')
    warning | "Write Warning" >> beam.io.WriteToText('warning.txt')

Side Inputs

Side inputs allow you to use additional data alongside the main input in a ParDo.
import apache_beam as beam
from apache_beam.transforms import DoFn

class EnrichWithDictFn(DoFn):
    def process(self, element, lookup_dict):
        """Enrich element using side input dictionary.

        Args:
            element: Main input element
            lookup_dict: Side input (an iterable of dicts via AsIter)
        """
        # The side input arrives as an iterable of dicts; flatten it into one dict
        lookup = {k: v for d in lookup_dict for k, v in d.items()}

        key = element['id']
        enriched_value = lookup.get(key, 'unknown')

        yield {
            **element,
            'enriched': enriched_value
        }

with beam.Pipeline() as pipeline:
    # Main input
    main_data = pipeline | "Main Data" >> beam.Create([
        {'id': 'A', 'value': 100},
        {'id': 'B', 'value': 200},
        {'id': 'C', 'value': 300}
    ])

    # Side input (reference data)
    lookup_data = pipeline | "Lookup Data" >> beam.Create([
        {'A': 'Alpha', 'B': 'Beta', 'C': 'Gamma'}
    ])

    # Use side input in ParDo
    enriched = (main_data
        | beam.ParDo(EnrichWithDictFn(), lookup_dict=beam.pvalue.AsIter(lookup_data))
    )

    enriched | beam.Map(print)
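
Beam also offers other side-input views such as beam.pvalue.AsSingleton, beam.pvalue.AsList, and beam.pvalue.AsDict. When the reference data is a set of key-value pairs, AsDict avoids the manual flattening above; a minimal sketch with the same lookup data expressed as (key, value) tuples:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    main_data = pipeline | "Main" >> beam.Create([{'id': 'A', 'value': 100}])
    lookup_kv = pipeline | "Lookup" >> beam.Create([('A', 'Alpha'), ('B', 'Beta')])

    enriched = main_data | "Enrich" >> beam.Map(
        # AsDict materializes the (key, value) pairs as a plain dict
        lambda e, lookup: {**e, 'enriched': lookup.get(e['id'], 'unknown')},
        lookup=beam.pvalue.AsDict(lookup_kv)
    )
    enriched | beam.Map(print)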

Composite Transforms

Composite transforms encapsulate multiple transforms into reusable components.

Creating Composite Transforms

import apache_beam as beam
from apache_beam.transforms import PTransform

class CountWords(PTransform):
    """Composite transform to count word frequencies."""

    def expand(self, pcoll):
        return (pcoll
            | "Split into words" >> beam.FlatMap(lambda x: x.split())
            | "Lowercase" >> beam.Map(str.lower)
            | "Filter empty" >> beam.Filter(lambda x: len(x) > 0)
            | "Pair with 1" >> beam.Map(lambda x: (x, 1))
            | "Group by key" >> beam.GroupByKey()
            | "Sum counts" >> beam.MapTuple(lambda word, ones: (word, sum(ones)))
        )

class FilterAndTransform(PTransform):
    """Composite transform with parameters."""

    def __init__(self, min_value, transform_fn):
        super().__init__()
        self.min_value = min_value
        self.transform_fn = transform_fn

    def expand(self, pcoll):
        return (pcoll
            | "Filter" >> beam.Filter(lambda x: x >= self.min_value)
            | "Transform" >> beam.Map(self.transform_fn)
        )

# Using composite transforms
with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create([
        'Hello World',
        'Apache Beam',
        'Data Processing'
    ])

    # Use CountWords composite transform
    word_counts = lines | CountWords()

    # Use parameterized composite transform
    numbers = pipeline | beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    result = numbers | FilterAndTransform(5, lambda x: x * x)

Aggregation Operations

Beam provides several built-in transforms for aggregating data.

GroupByKey

Groups all values that share a key. Requires an input PCollection of key-value pairs.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    # Create key-value pairs
    kv_pairs = pipeline | beam.Create([
        ('cat', 'meow'),
        ('dog', 'woof'),
        ('cat', 'purr'),
        ('dog', 'bark'),
        ('cat', 'hiss')
    ])

    # Group by key
    grouped = kv_pairs | beam.GroupByKey()
    # Result: [('cat', ['meow', 'purr', 'hiss']), ('dog', ['woof', 'bark'])]

    # Process grouped results
    counts = grouped | beam.MapTuple(lambda key, values: (key, len(list(values))))

Combine

Aggregates values, globally or per key, using associative and commutative functions. Because partial results can be pre-combined on each worker before shuffling, Combine is usually more efficient than GroupByKey followed by a manual reduction.
import apache_beam as beam
from apache_beam.transforms import CombineFn

# Using built-in combiners
with beam.Pipeline() as pipeline:
    numbers = pipeline | beam.Create([1, 2, 3, 4, 5])

    # Sum all elements
    total = numbers | beam.CombineGlobally(sum)

    # Mean
    average = numbers | beam.CombineGlobally(beam.combiners.MeanCombineFn())

    # Count
    count = numbers | beam.combiners.Count.Globally()

# Custom CombineFn
class AverageFn(CombineFn):
    def create_accumulator(self):
        return (0.0, 0)  # (sum, count)

    def add_input(self, accumulator, input):
        sum_val, count = accumulator
        return (sum_val + input, count + 1)

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return (sum(sums), sum(counts))

    def extract_output(self, accumulator):
        sum_val, count = accumulator
        return sum_val / count if count > 0 else 0

# Per-key combine
with beam.Pipeline() as pipeline:
    kv_pairs = pipeline | beam.Create([
        ('A', 1), ('B', 2), ('A', 3), ('B', 4), ('A', 5)
    ])

    # Sum per key
    sums = kv_pairs | beam.CombinePerKey(sum)
    # Result: [('A', 9), ('B', 6)]

    # Average per key
    averages = kv_pairs | beam.CombinePerKey(AverageFn())

CoGroupByKey

Joins multiple PCollections with the same key type.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    # Email addresses
    emails = pipeline | "Create Emails" >> beam.Create([
        ('alice', 'alice@example.com'),
        ('bob', 'bob@example.com')
    ])

    # Phone numbers
    phones = pipeline | "Create Phones" >> beam.Create([
        ('alice', '555-1234'),
        ('bob', '555-5678'),
        ('bob', '555-9999')
    ])

    # CoGroup
    joined = ({'emails': emails, 'phones': phones}
        | beam.CoGroupByKey()
    )

    # Process joined data
    def format_contact(key_value):
        name, data = key_value
        emails = list(data['emails'])
        phones = list(data['phones'])
        return f"{name}: {emails}, {phones}"

    result = joined | beam.Map(format_contact)
    # Result: ['alice: [alice@example.com], [555-1234]',
    #          'bob: [bob@example.com], [555-5678, 555-9999]']

Real-World Use Cases

Log Processing Pipeline

import apache_beam as beam
from apache_beam.transforms import DoFn
import json
import re
from datetime import datetime

class ParseLogFn(DoFn):
    LOG_PATTERN = re.compile(
        r'(?P<ip>\S+) - - \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+)'
    )

    def process(self, element):
        match = self.LOG_PATTERN.match(element)
        if match:
            yield {
                'ip': match.group('ip'),
                'timestamp': match.group('timestamp'),
                'method': match.group('method'),
                'path': match.group('path'),
                'status': int(match.group('status')),
                'size': int(match.group('size'))
            }

class DetectAnomaliesFn(DoFn):
    def process(self, element):
        ip, requests = element
        request_list = list(requests)

        # Anomaly: More than 100 requests from single IP
        if len(request_list) > 100:
            yield {
                'type': 'high_volume',
                'ip': ip,
                'count': len(request_list)
            }

        # Anomaly: High error rate
        errors = sum(1 for r in request_list if r['status'] >= 400)
        if errors > len(request_list) * 0.5:
            yield {
                'type': 'high_error_rate',
                'ip': ip,
                'error_rate': errors / len(request_list)
            }

def run_log_pipeline(input_path, output_path):
    with beam.Pipeline() as pipeline:
        logs = pipeline | "Read Logs" >> beam.io.ReadFromText(input_path)

        parsed = logs | "Parse Logs" >> beam.ParDo(ParseLogFn())

        # Group requests per IP and detect anomalies
        anomalies = (parsed
            | "Extract IP" >> beam.Map(lambda x: (x['ip'], x))
            | "Group by IP" >> beam.GroupByKey()
            | "Detect Anomalies" >> beam.ParDo(DetectAnomaliesFn())
        )

        # Count status codes
        status_counts = (parsed
            | "Extract Status" >> beam.Map(lambda x: (x['status'], 1))
            | "Count per Status" >> beam.CombinePerKey(sum)
        )

        # Write results
        ip_counts | "Write Anomalies" >> beam.io.WriteToText(f"{output_path}/anomalies")
        status_counts | "Write Status" >> beam.io.WriteToText(f"{output_path}/status_counts")

ETL Pipeline

import apache_beam as beam
from apache_beam.transforms import DoFn
import json

class ValidateRecordFn(DoFn):
    def process(self, element):
        try:
            record = json.loads(element)

            # Validation rules
            required_fields = ['id', 'name', 'email', 'created_at']
            if all(field in record for field in required_fields):
                if '@' in record['email']:
                    yield record
                else:
                    yield beam.pvalue.TaggedOutput('invalid_email', record)
            else:
                yield beam.pvalue.TaggedOutput('missing_fields', record)
        except json.JSONDecodeError:
            yield beam.pvalue.TaggedOutput('parse_error', element)

class TransformRecordFn(DoFn):
    def process(self, element):
        from datetime import datetime

        transformed = {
            'user_id': element['id'],
            'full_name': element['name'].upper(),
            'email': element['email'].lower(),
            'signup_date': datetime.fromisoformat(element['created_at']).date().isoformat(),
            'processed_at': datetime.utcnow().isoformat()
        }

        # Add computed fields
        transformed['email_domain'] = element['email'].split('@')[1]

        yield transformed

def run_etl_pipeline():
    with beam.Pipeline() as pipeline:
        raw_data = pipeline | "Read JSON" >> beam.io.ReadFromText('input.json')

        # Validate
        validation_results = (raw_data
            | "Validate" >> beam.ParDo(ValidateRecordFn()).with_outputs(
                'invalid_email',
                'missing_fields',
                'parse_error',
                main='valid'
            )
        )

        # Transform valid records
        transformed = (validation_results.valid
            | "Transform" >> beam.ParDo(TransformRecordFn())
        )

        # Aggregate by email domain
        domain_counts = (transformed
            | "Extract Domain" >> beam.Map(lambda x: (x['email_domain'], 1))
            | "Count per Domain" >> beam.CombinePerKey(sum)
        )

        # Write outputs
        transformed | "Write Valid" >> beam.io.WriteToText('output/valid')
        validation_results.invalid_email | "Write Invalid Email" >> beam.io.WriteToText('output/invalid_email')
        validation_results.missing_fields | "Write Missing" >> beam.io.WriteToText('output/missing_fields')
        domain_counts | "Write Domains" >> beam.io.WriteToText('output/domain_counts')

Best Practices

Performance Optimization

1. Combine small element-wise operations
# Less efficient: each extra transform adds per-element function-call overhead
# (and, on some runners, serialization between steps)
result = (data
    | beam.Map(lambda x: x.upper())
    | beam.Map(lambda x: x.strip())
    | beam.Map(lambda x: x.replace(' ', '_'))
)

# Better: a single Map that combines the operations
result = data | beam.Map(lambda x: x.upper().strip().replace(' ', '_'))
2. Use Combine instead of GroupByKey when possible
# Less efficient: GroupByKey + Map
totals = (data
    | beam.Map(lambda x: (x['key'], x['value']))
    | beam.GroupByKey()
    | beam.MapTuple(lambda k, vs: (k, sum(vs)))
)

# More efficient: CombinePerKey
totals = (data
    | beam.Map(lambda x: (x['key'], x['value']))
    | beam.CombinePerKey(sum)
)
3. Reuse expensive resources with setup/teardown
class OptimizedDoFn(DoFn):
    def setup(self):
        # Initialize expensive resources once per worker
        # (load_ml_model and create_connection_pool stand in for your own
        # initialization code)
        self.model = load_ml_model()
        self.connection_pool = create_connection_pool()

    def process(self, element):
        # Reuse resources for each element
        prediction = self.model.predict(element)
        yield prediction

    def teardown(self):
        # Clean up resources
        self.connection_pool.close()

Testing

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def test_word_count():
    with TestPipeline() as p:
        input_data = ['Hello World', 'Hello Beam']
        expected_output = [('hello', 2), ('world', 1), ('beam', 1)]

        result = (p
            | beam.Create(input_data)
            | beam.FlatMap(lambda x: x.lower().split())
            | beam.Map(lambda x: (x, 1))
            | beam.CombinePerKey(sum)
        )

        assert_that(result, equal_to(expected_output))

def test_custom_dofn():
    class DoubleFn(beam.DoFn):
        def process(self, element):
            yield element * 2

    with TestPipeline() as p:
        result = (p
            | beam.Create([1, 2, 3, 4, 5])
            | beam.ParDo(DoubleFn())
        )

        assert_that(result, equal_to([2, 4, 6, 8, 10]))

Summary

In this module, you learned:
  • PCollections as the fundamental data abstraction in Beam
  • How to construct and configure pipelines
  • Core transforms: Map, FlatMap, Filter, GroupByKey, Combine
  • ParDo and DoFn for custom processing logic
  • DoFn lifecycle methods and advanced features
  • Side outputs and side inputs
  • Creating reusable composite transforms
  • Best practices for performance and testing

Key Takeaways

  1. PCollections are immutable and distributed
  2. Transforms create new PCollections rather than modifying existing ones
  3. ParDo is the most flexible transform for custom logic
  4. Use Combine instead of GroupByKey for aggregations
  5. Leverage DoFn lifecycle methods for resource management
  6. Write testable code using composite transforms

Next Steps

Continue to the next module on Windowing & Time to learn how to process data based on temporal characteristics and handle late-arriving data in streaming pipelines.

Additional Resources