
Triggers & Watermarks

Module Duration: 3-4 hours
Focus: Advanced timing control and late data handling
Prerequisites: Core Programming Model, Windowing & Time

Overview

Triggers determine when results are materialized for a window. They provide fine-grained control over when computations complete and emit results, enabling you to balance latency, completeness, and cost.

Key Concepts

  • Trigger: Determines when to emit aggregated results
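  • Watermark: The system’s notion of event-time progress
  • Early Firing: Emit speculative results before the watermark reaches the end of the window
  • On-Time Firing: Emit results when the watermark passes the end of the window
  • Late Firing: Emit updated results for data that arrives after the watermark has passed the end of the window
  • Accumulation Mode: How successive firings (panes) of the same window relate to each other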

Understanding Watermarks

Watermarks are the foundation of Beam’s event-time processing. They represent a heuristic about event time completeness.

Watermark Semantics

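Definition: A watermark with value T indicates that the system expects no more elements with timestamps less than T. Elements that arrive with timestamps behind the watermark are late.

Properties:
  • Monotonically increasing: a watermark never moves backwards
  • Tracked per source and propagated through each stage of the pipeline
  • Usually heuristic (not a guarantee): late data can still arrive unless the source can compute a perfect watermark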
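The properties above can be sketched as a toy model in plain Python. This is not Beam's implementation; the class `MonotonicWatermark` and its methods are hypothetical names chosen for illustration. The watermark only moves forward, and an element counts as late if its event time is already behind it.

```python
class MonotonicWatermark:
    """Toy watermark (hypothetical, not a Beam API): moves only forward."""

    def __init__(self, start=float("-inf")):
        self.value = start

    def advance(self, proposed):
        # A proposal behind the current value is ignored (monotonicity).
        self.value = max(self.value, proposed)
        return self.value

    def is_late(self, event_time):
        # Late means the event time is already behind the watermark.
        return event_time < self.value


wm = MonotonicWatermark()
wm.advance(100)
wm.advance(80)  # ignored: the watermark never moves backwards
print(wm.value)         # 100
print(wm.is_late(95))   # True
print(wm.is_late(120))  # False
```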

Watermark Behavior

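import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions

# Pub/Sub sources require streaming mode and a full topic path
# ('my-project' is a placeholder). parse_event is a user-defined
# function that turns each message (bytes) into a (key, value) pair.
options = PipelineOptions(streaming=True)

# Default watermark behavior
with beam.Pipeline(options=options) as pipeline:
    data = (pipeline
        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        # The source estimates the watermark; Beam propagates it downstream
        | beam.WindowInto(window.FixedWindows(60))
        | beam.CombinePerKey(sum)
    )

# Watermark advances as (simplified):
# 1. The source assigns event timestamps and estimates its own watermark
# 2. Each stage holds its output watermark back to the oldest timestamp
#    of data it has not finished processing
# 3. Output watermark = min(input watermark, oldest pending timestamp)
# 4. When the watermark passes window.end, the window's on-time pane fires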
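The pending-data rule in the comments above can be sketched in plain Python. This is a simplified model, not Beam's internal algorithm; `output_watermark` and `window_can_fire` are hypothetical helpers, and real runners also account for timers and buffered state.

```python
def output_watermark(input_watermark, pending_timestamps):
    """Simplified stage watermark: held back by unfinished data."""
    if not pending_timestamps:
        return input_watermark
    return min(input_watermark, min(pending_timestamps))


def window_can_fire(watermark, window_end):
    # The on-time pane fires once the watermark reaches the window end.
    return watermark >= window_end


wm = output_watermark(130, pending_timestamps=[95, 110])
print(wm)                        # 95
print(window_can_fire(wm, 120))  # False: data at t=95 is still in flight
wm = output_watermark(130, pending_timestamps=[])
print(window_can_fire(wm, 120))  # True
```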

Watermark Lag

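Watermark lag is how far the watermark trails the current wall-clock time. An ordinary DoFn has no direct access to the watermark, so the example below measures a closely related signal: each element's event-time lag (processing time minus event timestamp). The runner's own monitoring reports the actual watermark lag.
# Monitoring per-element event-time lag (a proxy for watermark lag)
import time

import apache_beam as beam
from apache_beam import DoFn

class MonitorEventTimeLagFn(DoFn):
    def process(self, element, timestamp=DoFn.TimestampParam):
        current_time = time.time()
        # Timestamp.micros is microseconds since the Unix epoch
        event_time = timestamp.micros / 1_000_000
        lag = current_time - event_time

        yield {
            'element': element,
            'event_time': event_time,
            'processing_time': current_time,
            'event_time_lag_seconds': lag
        }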

Default Trigger

The default trigger fires when the watermark passes the end of the window. If allowed lateness is non-zero, it then fires again each time late data arrives for the window.
import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([
            (1, 'a'), (59, 'b'), (61, 'c'), (120, 'd')
        ])
        | beam.Map(lambda x: window.TimestampedValue(x[1], x[0]))
        | beam.WindowInto(window.FixedWindows(60))
        # Default trigger: fires when watermark passes window end
        # ToListCombineFn collects each window's elements into one list
        | beam.CombineGlobally(
            beam.combiners.ToListCombineFn()).without_defaults()
        | beam.Map(print)
    )

    # Window [0, 60): fires when watermark >= 60
    #   Contains: 'a', 'b'
    # Window [60, 120): fires when watermark >= 120
    #   Contains: 'c'
    # Window [120, 180): fires when watermark >= 180
    #   Contains: 'd'
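The window assignment and firing rule above can be reproduced with a small pure-Python model (hypothetical helpers, not Beam APIs): each timestamp maps to the fixed window `[start, start + size)` containing it, and a window's on-time pane fires once the watermark reaches its end.

```python
from collections import defaultdict


def assign_fixed_windows(timestamped, size):
    """Group (timestamp, value) pairs into fixed windows [start, start + size)."""
    windows = defaultdict(list)
    for ts, value in timestamped:
        start = ts - (ts % size)
        windows[(start, start + size)].append(value)
    return dict(windows)


def on_time_panes(windows, watermark):
    """Windows whose end the watermark has reached, in window order."""
    return {w: vals for w, vals in sorted(windows.items()) if watermark >= w[1]}


windows = assign_fixed_windows([(1, 'a'), (59, 'b'), (61, 'c'), (120, 'd')], 60)
print(on_time_panes(windows, watermark=60))   # {(0, 60): ['a', 'b']}
print(on_time_panes(windows, watermark=180))  # all three windows
```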

Trigger Types

Beam provides several trigger types that can be combined to create sophisticated timing strategies.

AfterWatermark

Fires when the watermark passes the end of the window.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(60),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | beam.CombinePerKey(sum)
    )

AfterProcessingTime

Fires once a given amount of processing time has passed since the first element arrived in the pane.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
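        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(300),  # 5-minute windows
            # Fires once, 60 s after the first element arrives in the pane
            trigger=trigger.AfterProcessingTime(60),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | beam.CombinePerKey(sum)
    )

    # Fires once, 60 seconds of processing time after the window's first
    # element arrives, regardless of event time or the watermark. The
    # trigger then finishes and later elements for the window are dropped;
    # wrap it in Repeatedly to keep firing.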
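A pure-Python model of this one-shot behavior, under simplifying assumptions: times are processing-time seconds, the timer starts with the window's first element, and elements arriving once the trigger has finished are discarded. `bare_processing_time_trigger` is a hypothetical helper, not a Beam API.

```python
def bare_processing_time_trigger(arrival_times, delay):
    """Model of AfterProcessingTime(delay) used without Repeatedly.

    Returns (fire_time, emitted, discarded) in processing-time seconds.
    """
    arrivals = sorted(arrival_times)
    if not arrivals:
        return None, [], []
    fire_time = arrivals[0] + delay  # timer starts with the first element
    emitted = [t for t in arrivals if t < fire_time]
    # After firing, the trigger is finished; later elements are discarded.
    discarded = [t for t in arrivals if t >= fire_time]
    return fire_time, emitted, discarded


print(bare_processing_time_trigger([0, 20, 59, 61, 130], delay=60))
# (60, [0, 20, 59], [61, 130])
```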

AfterCount

Fires after a certain number of elements arrive in a pane.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(60),
            # Fires once, after 100 elements; wrap in Repeatedly to fire every 100
            trigger=trigger.AfterCount(100),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | beam.CombinePerKey(sum)
    )
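Used without Repeatedly, AfterCount fires once and then finishes, so elements beyond the first `count` are discarded. The sketch below (a hypothetical pure-Python helper, not a Beam API) contrasts that with the repeated form.

```python
def after_count_panes(elements, count, repeated=False):
    """Split arrivals into panes for an AfterCount-style trigger.

    Returns (panes, leftover). Without `repeated`, the trigger finishes
    after its first firing and `leftover` is discarded data; with it,
    `leftover` is still buffered, waiting for a later firing.
    """
    panes, buffer = [], []
    for i, element in enumerate(elements):
        buffer.append(element)
        if len(buffer) == count:
            panes.append(buffer)
            buffer = []
            if not repeated:
                return panes, list(elements[i + 1:])
    return panes, buffer


print(after_count_panes(list(range(7)), 3))                 # ([[0, 1, 2]], [3, 4, 5, 6])
print(after_count_panes(list(range(7)), 3, repeated=True))  # ([[0, 1, 2], [3, 4, 5]], [6])
```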

Repeatedly

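Repeatedly wraps another trigger and re-arms it each time it fires, so the wrapped trigger keeps firing for the lifetime of the window instead of finishing after its first firing.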
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(300),
            trigger=trigger.Repeatedly(
                # Fires 30 s after the first element of each new pane
                trigger.AfterProcessingTime(30)
            ),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | beam.CombinePerKey(sum)
    )
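The re-arming can be modeled in plain Python: each pane's timer starts with its first element and fires `delay` seconds later, after which the next element opens a new pane. This is a simplified sketch (hypothetical helper, processing-time seconds, no watermark interaction).

```python
def repeated_processing_time_panes(arrival_times, delay):
    """Model of Repeatedly(AfterProcessingTime(delay)).

    Returns a list of (fire_time, elements_in_pane).
    """
    panes = []
    current, fire_at = [], None
    for t in sorted(arrival_times):
        if fire_at is not None and t >= fire_at:
            panes.append((fire_at, current))  # timer fired before t arrived
            current, fire_at = [], None
        if fire_at is None:
            fire_at = t + delay  # first element of a new pane starts the timer
        current.append(t)
    if current:
        panes.append((fire_at, current))
    return panes


print(repeated_processing_time_panes([0, 10, 45, 50, 100], delay=30))
# [(30, [0, 10]), (75, [45, 50]), (130, [100])]
```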

Early and Late Firings

Combine early, on-time, and late firings for flexible result emission.

Early Firing Pattern

Emit speculative results before the watermark passes the window end.
import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import trigger

def early_firing_pipeline():
    with beam.Pipeline() as pipeline:
        data = (pipeline
            # Cross-language Kafka source; yields (key, value) byte pairs
            | ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:9092'},
                topics=['events']
            )
            | beam.Map(parse_event)
            | beam.WindowInto(
                window.FixedWindows(300),  # 5-minute windows
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(60)  # Early: every minute
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                allowed_lateness=0
            )
            | beam.CombinePerKey(sum)
        )

        data | beam.Map(print)

# Approximate timeline for a 5-minute window [0:00, 5:00), assuming data
# arrives steadily and the watermark keeps pace with processing time:
# 1:00 - Early firing (partial results)
# 2:00 - Early firing (updated partial results)
# 3:00 - Early firing (updated partial results)
# 4:00 - Early firing (updated partial results)
# 5:00 - On-time firing (watermark passed, final result)

Late Firing Pattern

Handle data that arrives after the watermark has passed the end of the window.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

def late_firing_pipeline():
    with beam.Pipeline() as pipeline:
        data = (pipeline
            | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
            | beam.Map(parse_event)
            | beam.WindowInto(
                window.FixedWindows(300),
                trigger=trigger.AfterWatermark(
                    late=trigger.AfterProcessingTime(60)  # Late: every minute
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                allowed_lateness=600  # Allow 10 minutes of lateness
            )
            | beam.CombinePerKey(sum)
        )

        data | beam.Map(print)

# Approximate timeline for a 5-minute window [0:00, 5:00):
# 5:00 - On-time firing (watermark passed the window end)
# 6:00 - Late firing (one minute after the first late element arrived)
# 7:00 - Late firing (if more late data arrived)
# ...
# 15:00 - Watermark passes window end + allowed lateness; state is dropped

Complete Early/Late Pattern

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import trigger

def complete_early_late_pipeline():
    with beam.Pipeline() as pipeline:
        data = (pipeline
            | ReadFromKafka(...)
            | beam.Map(parse_event)
            | beam.WindowInto(
                window.FixedWindows(300),  # 5-minute windows
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(60),  # Early: every minute
                    late=trigger.AfterCount(100)  # Late: every 100 elements
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                allowed_lateness=600  # 10 minutes
            )
            | beam.CombinePerKey(sum)
        )

        # Add metadata to track firing type
        tagged_data = data | beam.ParDo(AddFiringMetadataFn())

        tagged_data | beam.io.WriteToBigQuery(
            table='analytics.windowed_results'
        )

from apache_beam import DoFn
from apache_beam.utils.windowed_value import PaneInfoTiming

# Readable labels for Beam's pane timing values
_TIMING_LABELS = {
    PaneInfoTiming.EARLY: 'early',
    PaneInfoTiming.ON_TIME: 'on_time',
    PaneInfoTiming.LATE: 'late',
}

class AddFiringMetadataFn(DoFn):
    def process(self, element, pane_info=DoFn.PaneInfoParam):
        key, value = element

        # pane_info.timing records whether the pane fired before (EARLY),
        # when (ON_TIME) or after (LATE) the watermark passed the window
        # end; is_first/is_last alone cannot distinguish these cases.
        firing_type = _TIMING_LABELS.get(pane_info.timing, 'unknown')

        yield {
            'key': key,
            'value': value,
            'firing_type': firing_type,
            'is_first': pane_info.is_first,
            'is_last': pane_info.is_last,
            'index': pane_info.index
        }

Accumulation Modes

Accumulation modes determine how multiple firings of the same window are combined.

Discarding Mode

Each firing contains only new data since the last firing.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([
            (0, 1), (30, 2), (60, 3), (90, 4)
        ])
        | beam.Map(lambda x: window.TimestampedValue(x[1], x[0]))
        | beam.WindowInto(
            window.FixedWindows(120),
            trigger=trigger.Repeatedly(
                trigger.AfterProcessingTime(30)
            ),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | beam.CombineGlobally(sum).without_defaults()
    )

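    # Firings for window [0, 120), assuming the elements arrive about
    # 30 s apart in processing time, as from a streaming source (with
    # beam.Create in batch, all four typically land in a single pane):
    # Pane 1: sum([1]) = 1 (first element)
    # Pane 2: sum([2]) = 2 (only new element)
    # Pane 3: sum([3]) = 3 (only new element)
    # Pane 4: sum([4]) = 4 (only new element)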

Accumulating Mode

Each firing contains all data seen so far for the window.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([
            (0, 1), (30, 2), (60, 3), (90, 4)
        ])
        | beam.Map(lambda x: window.TimestampedValue(x[1], x[0]))
        | beam.WindowInto(
            window.FixedWindows(120),
            trigger=trigger.Repeatedly(
                trigger.AfterProcessingTime(30)
            ),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | beam.CombineGlobally(sum).without_defaults()
    )

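    # Firings for window [0, 120), assuming the elements arrive about
    # 30 s apart in processing time, as from a streaming source:
    # Pane 1: sum([1]) = 1
    # Pane 2: sum([1, 2]) = 3 (cumulative)
    # Pane 3: sum([1, 2, 3]) = 6 (cumulative)
    # Pane 4: sum([1, 2, 3, 4]) = 10 (cumulative)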
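Both modes can be compared with a pure-Python model of the panes above (a sketch, not Beam's implementation; `pane_outputs` is a hypothetical helper).

```python
from itertools import accumulate


def pane_outputs(pane_inputs, mode):
    """Value each firing emits for a sum aggregation."""
    pane_sums = [sum(pane) for pane in pane_inputs]
    if mode == "DISCARDING":
        return pane_sums                    # only data new since last firing
    if mode == "ACCUMULATING":
        return list(accumulate(pane_sums))  # running total for the window
    raise ValueError(f"unknown mode: {mode}")


panes = [[1], [2], [3], [4]]
print(pane_outputs(panes, "DISCARDING"))    # [1, 2, 3, 4]
print(pane_outputs(panes, "ACCUMULATING"))  # [1, 3, 6, 10]
```

The choice affects consumers: discarding panes should be added together downstream (1 + 2 + 3 + 4 = 10), while each accumulating pane should overwrite the previous value, since adding them would report 20.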

Accumulating and Retracting

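Like accumulating mode, but each new firing is preceded by a retraction of the previous firing's result, so downstream aggregations can subtract the stale value instead of double counting it.
# Note: the Python SDK's AccumulationMode offers only DISCARDING and
# ACCUMULATING. Retractions are part of the Beam model but are not
# generally supported by current runners.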
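Because retractions are hard to try out in the Python SDK, a plain-Python model is the easiest way to see the idea: each firing emits the new running total preceded by a negated copy of the previous one, so a downstream sum always reflects only the latest result. `retracting_outputs` is a hypothetical helper.

```python
def retracting_outputs(pane_inputs):
    """Model of accumulating-and-retracting panes for a sum aggregation."""
    outputs, previous, total = [], None, 0
    for pane in pane_inputs:
        total += sum(pane)
        if previous is not None:
            outputs.append(-previous)  # retract the stale result
        outputs.append(total)          # emit the new cumulative result
        previous = total
    return outputs


out = retracting_outputs([[1], [2], [3]])
print(out)       # [1, -1, 3, -3, 6]
print(sum(out))  # 6: a downstream sum sees only the latest total
```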

Composite Triggers

Combine multiple triggers with logical operators.

AfterEach (Sequential)

AfterEach runs its subtriggers in order: each one starts only after the previous one has fired and finished.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(300),
            trigger=trigger.AfterEach(
                trigger.AfterCount(1000),  # First: fire after 1000 elements
                trigger.Repeatedly(
                    trigger.AfterProcessingTime(60)  # Then: every minute
                )
            ),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | beam.CombinePerKey(sum)
    )
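The sequencing can be sketched in plain Python for this exact composite, `AfterEach(AfterCount(n), Repeatedly(AfterProcessingTime(d)))`, using small numbers. The helper is hypothetical and models only firing times in processing-time seconds.

```python
def after_each_fire_times(arrival_times, first_count, then_delay):
    """Firing times for AfterEach(AfterCount(n), Repeatedly(AfterProcessingTime(d)))."""
    arrivals = sorted(arrival_times)
    fires = []
    if len(arrivals) < first_count:
        return fires  # phase 1 never completes
    fires.append(arrivals[first_count - 1])  # phase 1: n-th element arrives
    fire_at = None
    # Phase 2: each pane fires `then_delay` after its first element.
    for t in arrivals[first_count:]:
        if fire_at is not None and t >= fire_at:
            fires.append(fire_at)
            fire_at = None
        if fire_at is None:
            fire_at = t + then_delay
    if fire_at is not None:
        fires.append(fire_at)
    return fires


print(after_each_fire_times([0, 1, 2, 3, 10, 70, 75], first_count=3, then_delay=60))
# [2, 63, 130]
```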

AfterAll (Conjunction)

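Fire when all subtriggers have fired. AfterAll is used less often; AfterWatermark with early/late firings covers most use cases.
from apache_beam.transforms import trigger
# Fire when the pane has 100+ elements AND 60 s have passed since its first element
all_of = trigger.Repeatedly(trigger.AfterAll(
    trigger.AfterCount(100), trigger.AfterProcessingTime(60)))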

AfterAny (Disjunction)

Fire when any trigger fires.
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(300),
            # Repeatedly re-arms AfterAny so it fires more than once
            trigger=trigger.Repeatedly(
                trigger.AfterAny(
                    trigger.AfterCount(1000),  # Fire after 1000 elements
                    trigger.AfterProcessingTime(120)  # OR 2 min after the first element
                )
            ),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | beam.CombinePerKey(sum)
    )
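The difference between AfterAny and AfterAll over the same two subtriggers comes down to taking the earliest or the latest of their firing times. A simplified pure-Python sketch (hypothetical helper; the processing-time delay is measured from the pane's first element):

```python
def first_fire_time(arrival_times, count, delay, mode):
    """First firing of AfterAny/AfterAll over AfterCount(count) and
    AfterProcessingTime(delay); returns None if it never fires."""
    arrivals = sorted(arrival_times)
    if not arrivals:
        return None
    time_fire = arrivals[0] + delay  # processing-time subtrigger
    count_fire = arrivals[count - 1] if len(arrivals) >= count else None
    if mode == "any":
        # The earliest subtrigger to fire wins.
        return min(t for t in (count_fire, time_fire) if t is not None)
    if mode == "all":
        # Every subtrigger must have fired.
        return None if count_fire is None else max(count_fire, time_fire)
    raise ValueError(f"unknown mode: {mode}")


print(first_fire_time([0, 1, 2, 50], count=3, delay=10, mode="any"))  # 2
print(first_fire_time([0, 1, 2, 50], count=3, delay=10, mode="all"))  # 10
```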

Allowed Lateness

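Control how long to keep window state after the watermark passes the end of the window. Lateness is measured in event time against the watermark, not in wall-clock time.
import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    data = (pipeline
        | ReadFromKafka(...)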
        | beam.Map(parse_event)
        | beam.WindowInto(
            window.FixedWindows(300),
            trigger=trigger.AfterWatermark(
                late=trigger.AfterCount(1)  # Fire on each late element
            ),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            allowed_lateness=600  # Keep state for 10 minutes after watermark
        )
        | beam.CombinePerKey(sum)
    )

# Window [0:00, 5:00); times are watermark (event-time) positions:
# 5:00 - Watermark passes the window end, on-time firing
# 5:01 - Late element arrives, late firing
# ...
# 14:59 - Late element arrives, late firing
# 15:00 - Watermark passes window end + allowed_lateness; state is dropped
# After 15:00 - Any further elements for this window are dropped
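The lifecycle above amounts to a three-way rule, sketched here in plain Python for fixed windows. `classify_element` is a hypothetical helper; real runners compare against the window's maximum timestamp and their own watermark, so exact boundaries may differ slightly.

```python
def classify_element(event_time, watermark, window_size, allowed_lateness):
    """Classify an element against its fixed window (simplified model)."""
    window_start = event_time - (event_time % window_size)
    window_end = window_start + window_size
    if watermark < window_end:
        return 'not_late'  # window still open: early/on-time panes
    if watermark < window_end + allowed_lateness:
        return 'late'      # accepted; can trigger a late firing
    return 'dropped'       # window state already garbage-collected


# 5-minute windows with 10 minutes of allowed lateness (seconds)
print(classify_element(100, watermark=200, window_size=300, allowed_lateness=600))  # not_late
print(classify_element(100, watermark=400, window_size=300, allowed_lateness=600))  # late
print(classify_element(100, watermark=900, window_size=300, allowed_lateness=600))  # dropped
```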

Real-World Use Cases

Low-Latency Dashboard

Provide quick updates with early firings, refine with late data.
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import trigger

def low_latency_dashboard():
    with beam.Pipeline() as pipeline:
        events = (pipeline
            | "Read Events" >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:9092'},
                topics=['user-events']
            )
            | "Parse" >> beam.Map(parse_event)
        )

        # Real-time metrics with early/late firings
        metrics = (events
            | "1-Min Windows" >> beam.WindowInto(
                window.FixedWindows(60),
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(10),  # Update every 10s
                    late=trigger.AfterCount(50)  # Update every 50 late elements
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                allowed_lateness=300  # 5 minutes
            )
            | "Count Events" >> beam.combiners.Count.PerKey()
            | "Add Metadata" >> beam.ParDo(AddDashboardMetadataFn())
        )

        # Write to real-time dashboard; WriteToPubSub expects bytes
        (metrics
            | "Encode" >> beam.Map(lambda row: json.dumps(row).encode('utf-8'))
            | "Write to Dashboard" >> beam.io.WriteToPubSub(
                topic='projects/my-project/topics/dashboard-metrics'
            ))

from apache_beam.utils.windowed_value import PaneInfoTiming

class AddDashboardMetadataFn(beam.DoFn):
    def process(self, element, pane_info=beam.DoFn.PaneInfoParam,
                win=beam.DoFn.WindowParam):
        key, count = element

        # Determine update type from the pane's timing
        # (is_first/is_last cannot tell early panes from on-time ones)
        if pane_info.timing == PaneInfoTiming.EARLY:
            update_type = 'preliminary'
        elif pane_info.timing == PaneInfoTiming.ON_TIME:
            update_type = 'on_time'
        elif pane_info.timing == PaneInfoTiming.LATE:
            update_type = 'updated'
        else:
            update_type = 'unknown'

        yield {
            'metric_key': key,
            'count': count,
            'update_type': update_type,
            'window_start': win.start.to_utc_datetime().isoformat(),
            'window_end': win.end.to_utc_datetime().isoformat(),
            'firing_index': pane_info.index,
            'is_final': pane_info.is_last
        }

Session Analysis with Late Data

Handle user sessions with realistic late-arrival handling.
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

def session_analysis_with_late_data():
    with beam.Pipeline() as pipeline:
        clicks = (pipeline
            | "Read Clicks" >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/clickstream')
            # parse_click (user-defined) is assumed to return dicts with
            # 'user_id', 'page' and 'timestamp' keys
            | "Parse" >> beam.Map(parse_click)
        )

        # Session windows with late data handling; sessions are merged
        # per key at the GroupByKey
        sessions = (clicks
            | "Session Windows" >> beam.WindowInto(
                window.Sessions(gap_size=600),  # 10-minute gap
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(120),  # Updates every 2 min
                    late=trigger.AfterCount(1)  # Fire on each late click
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                allowed_lateness=1800  # 30 minutes
            )
            | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
            | "Group Sessions" >> beam.GroupByKey()
            | "Analyze" >> beam.ParDo(SessionAnalysisFn())
        )

        # Separate early results from on-time/late results
        early_sessions = sessions | "Filter Early" >> beam.Filter(
            lambda x: x['firing_type'] == 'early'
        )
        final_sessions = sessions | "Filter Final" >> beam.Filter(
            lambda x: x['firing_type'] in ['on_time', 'late']
        )

        # Early results for real-time monitoring; WriteToPubSub expects bytes
        (early_sessions
            | "Encode Early" >> beam.Map(
                lambda row: json.dumps(row).encode('utf-8'))
            | "Write Early" >> beam.io.WriteToPubSub(
                topic='projects/my-project/topics/session-updates'
            ))

        # On-time and late results for analytics. In ACCUMULATING mode each
        # late pane restates the whole session, so expect multiple rows.
        final_sessions | "Write Final" >> beam.io.WriteToBigQuery(
            table='analytics.user_sessions'
        )

from apache_beam.utils.windowed_value import PaneInfoTiming

class SessionAnalysisFn(beam.DoFn):
    def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
        user_id, clicks = element
        clicks_list = list(clicks)

        # Classify by pane timing; is_first/is_last cannot tell an early
        # pane from an on-time one
        if pane_info.timing == PaneInfoTiming.EARLY:
            firing_type = 'early'
        elif pane_info.timing == PaneInfoTiming.LATE:
            firing_type = 'late'
        else:
            firing_type = 'on_time'

        yield {
            'user_id': user_id,
            'click_count': len(clicks_list),
            'unique_pages': len(set(c['page'] for c in clicks_list)),
            'session_duration': (
                max(c['timestamp'] for c in clicks_list) -
                min(c['timestamp'] for c in clicks_list)
            ),
            'firing_type': firing_type,
            'firing_index': pane_info.index
        }
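Late data matters more for sessions than for fixed windows because a late click can bridge two sessions into one. A pure-Python sketch of gap-based merging (hypothetical helper, not Beam's `Sessions` implementation; each event opens the interval `[ts, ts + gap)` and overlapping intervals merge):

```python
def merge_sessions(timestamps, gap):
    """Merge per-event intervals [ts, ts + gap) into sessions."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            start, end = sessions[-1]
            sessions[-1] = (start, max(end, ts + gap))  # extend the session
        else:
            sessions.append((ts, ts + gap))  # gap exceeded: new session
    return sessions


on_time = [0, 100, 900, 1000]
print(merge_sessions(on_time, 600))          # [(0, 700), (900, 1600)]
print(merge_sessions(on_time + [650], 600))  # [(0, 1600)]: late click bridges them
```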

Trading Analytics

High-frequency updates with precise control.
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import CombineFn, trigger

def trading_analytics():
    with beam.Pipeline() as pipeline:
        trades = (pipeline
            | "Read Trades" >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:9092'},
                topics=['market-trades']
            )
            # parse_trade (user-defined) is assumed to return dicts with
            # 'symbol', 'price', 'timestamp' and optional 'volume' keys
            | "Parse" >> beam.Map(parse_trade)
        )

        # 1-second windows with sub-second updates (costly: up to ~10
        # panes per symbol per second)
        second_stats = (trades
            | "1-Sec Windows" >> beam.WindowInto(
                window.FixedWindows(1),  # 1-second windows
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(0.1)  # Every 100ms
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                allowed_lateness=5  # 5 seconds
            )
            | "Key by Symbol" >> beam.Map(
                lambda x: (x['symbol'], (x['price'], x.get('volume', 1))))
            | "Calculate VWAP" >> beam.CombinePerKey(VWAPCombineFn())
        )

        # 1-minute windows for aggregated stats
        minute_stats = (trades
            | "1-Min Windows" >> beam.WindowInto(
                window.FixedWindows(60),
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(5)  # Every 5 seconds
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING
            )
            | "Key by Symbol (Min)" >> beam.Map(
                lambda x: (x['symbol'], x)
            )
            | "Aggregate Minute" >> beam.CombinePerKey(
                MinuteStatsCombineFn()
            )
        )

        # Real-time updates; WriteToPubSub expects bytes
        (second_stats
            | "Encode Second Stats" >> beam.Map(
                lambda kv: json.dumps(
                    {'symbol': kv[0], 'vwap': kv[1]}).encode('utf-8'))
            | "Publish Second Stats" >> beam.io.WriteToPubSub(
                topic='projects/my-project/topics/market-stats-1s'
            ))

        # Persistent storage; BigQuery rows must be dicts
        (minute_stats
            | "To Rows" >> beam.Map(lambda kv: dict(symbol=kv[0], **kv[1]))
            | "Write Minute Stats" >> beam.io.WriteToBigQuery(
                table='market.minute_stats'
            ))

class VWAPCombineFn(CombineFn):
    """Volume-Weighted Average Price over (price, volume) inputs."""

    def create_accumulator(self):
        return {'total_value': 0.0, 'total_volume': 0}

    def add_input(self, accumulator, element):
        price, volume = element
        accumulator['total_value'] += price * volume
        accumulator['total_volume'] += volume
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            merged['total_value'] += acc['total_value']
            merged['total_volume'] += acc['total_volume']
        return merged

    def extract_output(self, accumulator):
        if accumulator['total_volume'] == 0:
            return 0.0
        return accumulator['total_value'] / accumulator['total_volume']

class MinuteStatsCombineFn(CombineFn):
    """OHLC, volume and VWAP per window, tracked incrementally.

    Open/close come from trade timestamps rather than arrival order,
    because Beam guarantees no ordering of inputs or merges.
    """

    def create_accumulator(self):
        return {
            'open': None,   # (timestamp, price) of the earliest trade
            'close': None,  # (timestamp, price) of the latest trade
            'high': float('-inf'),
            'low': float('inf'),
            'volume': 0,
            'total_value': 0.0
        }

    def add_input(self, accumulator, trade):
        price = trade['price']
        volume = trade.get('volume', 1)
        point = (trade['timestamp'], price)
        if accumulator['open'] is None or point < accumulator['open']:
            accumulator['open'] = point
        if accumulator['close'] is None or point > accumulator['close']:
            accumulator['close'] = point
        accumulator['high'] = max(accumulator['high'], price)
        accumulator['low'] = min(accumulator['low'], price)
        accumulator['volume'] += volume
        accumulator['total_value'] += price * volume
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            if acc['open'] is None:
                continue  # empty accumulator
            if merged['open'] is None or acc['open'] < merged['open']:
                merged['open'] = acc['open']
            if merged['close'] is None or acc['close'] > merged['close']:
                merged['close'] = acc['close']
            merged['high'] = max(merged['high'], acc['high'])
            merged['low'] = min(merged['low'], acc['low'])
            merged['volume'] += acc['volume']
            merged['total_value'] += acc['total_value']
        return merged

    def extract_output(self, accumulator):
        if accumulator['open'] is None or accumulator['volume'] == 0:
            return None
        return {
            'open': accumulator['open'][1],
            'high': accumulator['high'],
            'low': accumulator['low'],
            'close': accumulator['close'][1],
            'volume': accumulator['volume'],
            'vwap': accumulator['total_value'] / accumulator['volume']
        }
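Two properties these combiners depend on can be checked in plain Python (hypothetical helpers, not Beam APIs): OHLC computed from timestamps does not depend on arrival order, and VWAP weights each price by its volume rather than averaging prices.

```python
def ohlc(trades):
    """OHLC from (timestamp, price) pairs, independent of arrival order."""
    prices = [price for _, price in sorted(trades)]  # order by timestamp
    return {'open': prices[0], 'high': max(prices),
            'low': min(prices), 'close': prices[-1]}


def vwap(priced_volumes):
    """Volume-weighted average price over (price, volume) pairs."""
    total_value = sum(price * volume for price, volume in priced_volumes)
    total_volume = sum(volume for _, volume in priced_volumes)
    return total_value / total_volume if total_volume else 0.0


in_order = [(1, 10.0), (2, 12.0), (3, 9.0), (4, 11.0)]
shuffled = [in_order[i] for i in (2, 0, 3, 1)]  # simulated arrival order
print(ohlc(shuffled))                       # {'open': 10.0, 'high': 12.0, 'low': 9.0, 'close': 11.0}
print(vwap([(10.0, 100), (12.0, 300)]))     # 11.5 (a plain average would be 11.0)
```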

Best Practices

Choosing Trigger Strategies

Use Default Trigger When:
  • Latency is not critical
  • You can wait for the watermark
  • Data completeness is more important than speed
Use Early Firings When:
  • Low latency is required
  • Users expect real-time updates
  • Approximate results are acceptable
Use Late Firings When:
  • Data frequently arrives late
  • Downstream consumers can handle updated results
  • Completeness is critical

Balancing Latency and Completeness

# Low latency (more frequent updates, less complete)
low_latency = beam.WindowInto(
    window.FixedWindows(60),
    trigger=trigger.AfterWatermark(
        early=trigger.AfterProcessingTime(5)  # Very frequent
    ),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)

# Balanced (moderate latency, good completeness)
balanced = beam.WindowInto(
    window.FixedWindows(300),
    trigger=trigger.AfterWatermark(
        early=trigger.AfterProcessingTime(60),
        late=trigger.AfterCount(100)
    ),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
    allowed_lateness=600
)

# High completeness (wait for watermark, handle late data)
high_completeness = beam.WindowInto(
    window.FixedWindows(3600),
    trigger=trigger.AfterWatermark(
        late=trigger.AfterCount(1)
    ),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
    allowed_lateness=3600
)
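A rough way to compare these configurations is the number of early panes each key emits per window. The estimate below assumes data arrives continuously and the watermark keeps pace with processing time; `approx_early_panes` is a hypothetical helper, and real counts depend on arrival patterns.

```python
import math


def approx_early_panes(window_size, early_delay):
    """Rough upper bound on early panes per key per window (seconds)."""
    return math.ceil(window_size / early_delay)


configs = {'low_latency': (60, 5), 'balanced': (300, 60)}
for name, (size, delay) in configs.items():
    print(name, approx_early_panes(size, delay))
# low_latency 12
# balanced 5
```

Multiplying by the number of active keys gives a first-order estimate of output volume, which is usually what makes very frequent early firings expensive.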

Resource Management

# Careful with allowed lateness - impacts state retention
# Too long: High memory usage
too_long = beam.WindowInto(
    window.FixedWindows(60),
    allowed_lateness=86400  # 24 hours - expensive!
)

# Too short: May lose important late data
too_short = beam.WindowInto(
    window.FixedWindows(60),
    allowed_lateness=0  # No late data handling
)

# Reasonable: Balance completeness and resources
reasonable = beam.WindowInto(
    window.FixedWindows(60),
    allowed_lateness=300  # 5 minutes
)
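The state cost of allowed lateness can be estimated with simple arithmetic: a fixed window's state lives until the watermark passes its end plus the allowed lateness, so roughly `(size + lateness) / size` windows per key hold state at any moment, assuming every window receives data. `windows_holding_state` is a hypothetical helper.

```python
import math


def windows_holding_state(window_size, allowed_lateness):
    """Approximate fixed windows per key with retained state (seconds)."""
    return math.ceil((window_size + allowed_lateness) / window_size)


print(windows_holding_state(60, 86400))  # 1441 (too_long)
print(windows_holding_state(60, 0))      # 1    (too_short)
print(windows_holding_state(60, 300))    # 6    (reasonable)
```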

Summary

In this module, you learned:
  • Watermark semantics and behavior
  • Trigger types: AfterWatermark, AfterProcessingTime, AfterCount
  • Early, on-time, and late firings
  • Accumulation modes: discarding vs accumulating
  • Composite triggers for complex timing logic
  • Allowed lateness for state management
  • Real-world trigger strategies

Key Takeaways

  1. Triggers control when to emit results for a window
  2. Watermarks track event time progress
  3. Early firings reduce latency, late firings improve completeness
  4. Accumulation mode determines how multiple firings combine
  5. Allowed lateness balances completeness and resource usage
  6. Choose trigger strategy based on latency/completeness requirements

Next Steps

Continue to the next module on State & Timers to learn how to maintain per-key state and implement complex stateful processing patterns.

Additional Resources