
Windowing & Time

Module Duration: 3-4 hours
Focus: Time-based data partitioning and processing
Prerequisites: Core Programming Model, basic streaming concepts

Overview

Windowing in Apache Beam allows you to partition unbounded (and bounded) data into logical windows for aggregation. This is essential for streaming analytics where you need to group events by time periods.

Key Concepts

  • Window: A logical grouping of data based on time or other criteria
  • Event Time: When the event actually occurred (embedded in data)
  • Processing Time: When the event is processed by the pipeline
  • Watermark: Estimate of event time progress
  • Window Assignment: Determining which window(s) each element belongs to

Understanding Time in Beam

Beam’s time model distinguishes between different notions of time for robust stream processing.

Event Time vs Processing Time

Event Time: The timestamp when the event actually occurred, embedded in the data itself.
  • More accurate for business logic
  • Accounts for late-arriving data
  • Requires watermark tracking
Processing Time: The timestamp when the pipeline processes the element.
  • Simpler to implement
  • No late data handling needed
  • Less accurate for business analysis
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.window import TimestampedValue
from datetime import datetime

# Setting event time on elements
with beam.Pipeline() as pipeline:
    # Elements with event time
    timestamped_data = (pipeline
        | "Create" >> beam.Create([
            ('event1', datetime(2024, 1, 1, 10, 0, 0)),
            ('event2', datetime(2024, 1, 1, 10, 0, 5)),
            ('event3', datetime(2024, 1, 1, 10, 1, 0)),
        ])
        | "Add Timestamps" >> beam.Map(
            lambda x: TimestampedValue(x[0], x[1].timestamp())
        )
    )

    # Elements created without explicit timestamps carry only a default
    # timestamp; unbounded sources such as Pub/Sub attach one automatically.
    untimestamped_data = (pipeline
        | beam.Create(['a', 'b', 'c'])
    )

Watermarks

Watermarks are Beam’s mechanism for tracking progress in event time. A watermark is the system’s estimate that no more data with event timestamps earlier than the watermark will arrive. Properties:
  • Monotonically increasing
  • Heuristic-based (not perfect)
  • Allows pipeline to make progress while handling late data
Impact:
  • Watermark that advances too quickly: data arriving behind it is treated as late and may be dropped
  • Watermark that advances too slowly: results are held back, increasing latency
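
To make the interaction between watermarks and late data concrete, here is a hedged sketch of a fixed-window count that keeps each window open a while longer for late elements via allowed_lateness. The topic name 'events' and the 'event_time' and 'user_id' fields are illustrative assumptions; triggers themselves are covered in the next module.

import json

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark

def late_data_example():
    """Sketch: 1-minute windows that accept data up to 5 minutes late."""
    with beam.Pipeline() as pipeline:
        counts = (pipeline
            # Illustrative topic; messages assumed to be JSON objects with
            # 'event_time' (epoch seconds) and 'user_id' fields.
            | "Read" >> beam.io.ReadFromPubSub(topic='events')
            | "Parse" >> beam.Map(lambda msg: json.loads(msg.decode('utf-8')))
            | "Stamp" >> beam.Map(
                lambda e: window.TimestampedValue(e, e['event_time']))
            | "Window" >> beam.WindowInto(
                window.FixedWindows(60),
                trigger=AfterWatermark(late=AfterCount(1)),  # re-fire for late elements
                accumulation_mode=AccumulationMode.ACCUMULATING,
                allowed_lateness=300)  # data up to 5 min behind the watermark still counts
            | "Key" >> beam.Map(lambda e: (e['user_id'], 1))
            | "Count" >> beam.CombinePerKey(sum)
        )
        counts | "Print" >> beam.Map(print)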

Window Types

Beam provides several built-in windowing strategies.

Fixed Windows

Divides time into fixed-size, non-overlapping intervals. Use Cases:
  • Hourly/daily reports
  • Regular interval aggregations
  • Time-series bucketing
import json

import apache_beam as beam
from apache_beam import window

def process_fixed_windows():
    with beam.Pipeline() as pipeline:
        data = (pipeline
            | "Read Stream" >> beam.io.ReadFromPubSub(topic='events')
            | "Parse JSON" >> beam.Map(lambda x: json.loads(x))
            | "Extract Timestamp" >> beam.Map(
                lambda x: window.TimestampedValue(
                    x,
                    x['timestamp']
                )
            )
        )

        # Fixed windows of 1 minute
        windowed_data = (data
            | "1-Minute Windows" >> beam.WindowInto(
                window.FixedWindows(60)  # 60 seconds
            )
        )

        # Aggregate per window
        counts = (windowed_data
            | "Extract Key" >> beam.Map(lambda x: (x['user_id'], 1))
            | "Count per Window" >> beam.CombinePerKey(sum)
        )

        counts | "Print" >> beam.Map(print)

# Example with multiple window sizes
def multi_granularity_windows():
    with beam.Pipeline() as pipeline:
        events = (pipeline
            | beam.io.ReadFromKafka(...)
            # parse_event is assumed to yield timestamped (key, value) pairs
            | beam.Map(parse_event)
        )

        # 1-minute windows for real-time monitoring
        minute_metrics = (events
            | "1-Min Window" >> beam.WindowInto(window.FixedWindows(60))
            | "Minute Aggregation" >> beam.CombinePerKey(sum)
        )

        # 1-hour windows for hourly reports
        hourly_metrics = (events
            | "1-Hour Window" >> beam.WindowInto(window.FixedWindows(3600))
            | "Hourly Aggregation" >> beam.CombinePerKey(sum)
        )

        # 1-day windows for daily reports
        daily_metrics = (events
            | "1-Day Window" >> beam.WindowInto(window.FixedWindows(86400))
            | "Daily Aggregation" >> beam.CombinePerKey(sum)
        )

Sliding Windows

Creates overlapping windows that slide by a specified period. Use Cases:
  • Moving averages
  • Rolling statistics
  • Overlapping time period analysis
Parameters:
  • Window size: Length of each window
  • Slide period: How often a new window starts
import apache_beam as beam
from apache_beam import window

def sliding_window_example():
    with beam.Pipeline() as pipeline:
        data = (pipeline
            | "Create Data" >> beam.Create([
                (1, 100), (2, 200), (3, 300), (4, 400), (5, 500)
            ])
            | "Add Timestamps" >> beam.Map(
                lambda x: window.TimestampedValue(x, x[0])
            )
        )

        # Sliding window: 30-second window, 10-second slide
        # Creates windows: [0-30], [10-40], [20-50], etc.
        sliding = (data
            | "Sliding Windows" >> beam.WindowInto(
                window.SlidingWindows(
                    size=30,    # 30-second windows
                    period=10   # New window every 10 seconds
                )
            )
            | "Extract Value" >> beam.Map(lambda x: ('key', x[1]))
            | "Calculate Average" >> beam.CombinePerKey(
                beam.combiners.MeanCombineFn()
            )
        )

        sliding | beam.Map(print)

# Moving average example
def calculate_moving_average():
    with beam.Pipeline() as pipeline:
        stock_prices = (pipeline
            | beam.io.ReadFromPubSub(topic='stock-prices')
            | beam.Map(parse_stock_data)
        )

        # 5-minute moving average, updated every minute
        moving_avg = (stock_prices
            | "5-Min Moving Window" >> beam.WindowInto(
                window.SlidingWindows(
                    size=300,   # 5 minutes
                    period=60   # Slide every minute
                )
            )
            | "Key by Stock" >> beam.Map(lambda x: (x['symbol'], x['price']))
            | "Average Price" >> beam.CombinePerKey(
                beam.combiners.MeanCombineFn()
            )
        )

        moving_avg | "Write to BigQuery" >> beam.io.WriteToBigQuery(...)

Session Windows

Groups elements into sessions based on gaps in activity. Use Cases:
  • User session analysis
  • Activity bursts detection
  • Click stream analysis
Parameters:
  • Gap duration: Minimum gap between sessions
import apache_beam as beam
from apache_beam import window

def session_window_example():
    with beam.Pipeline() as pipeline:
        # User click events
        clicks = (pipeline
            | "Read Clicks" >> beam.io.ReadFromPubSub(topic='user-clicks')
            | "Parse" >> beam.Map(parse_click_event)
        )

        # Session window with 10-minute gap
        # If no activity for 10 minutes, start new session
        sessions = (clicks
            | "Session Windows" >> beam.WindowInto(
                window.Sessions(10 * 60)  # 10-minute gap
            )
            | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
            | "Collect Session Events" >> beam.GroupByKey()
        )

        # Analyze each session
        session_stats = (sessions
            | "Analyze Session" >> beam.MapTuple(analyze_user_session)
        )

        session_stats | "Write Results" >> beam.io.WriteToText('sessions.txt')

def analyze_user_session(user_id, events):
    """Analyze a single user session."""
    events_list = list(events)
    return {
        'user_id': user_id,
        'session_length': len(events_list),
        'duration': calculate_duration(events_list),
        'pages_visited': [e['page'] for e in events_list],
        'conversion': any(e['event_type'] == 'purchase' for e in events_list)
    }
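
The helper calculate_duration used above is not defined in this module; a minimal sketch, assuming each event dict carries a numeric 'timestamp' field in epoch seconds:

def calculate_duration(events_list):
    """Span in seconds between the first and last event of the session."""
    if not events_list:
        return 0
    timestamps = [e['timestamp'] for e in events_list]
    return max(timestamps) - min(timestamps)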

# E-commerce session analysis
def ecommerce_sessions():
    with beam.Pipeline() as pipeline:
        events = (pipeline
            | beam.io.ReadFromKafka(...)
            | beam.Map(parse_event)
        )

        # 30-minute session gap
        user_sessions = (events
            | "30-Min Sessions" >> beam.WindowInto(window.Sessions(30 * 60))
            | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
            | "Group by Session" >> beam.GroupByKey()
        )

        # Calculate session metrics
        metrics = (user_sessions
            | "Session Metrics" >> beam.Map(lambda kv: {
                'user_id': kv[0],
                'events': list(kv[1]),
                'revenue': sum(e.get('amount', 0) for e in kv[1]),
                'product_views': sum(1 for e in kv[1] if e['type'] == 'view'),
                'purchases': sum(1 for e in kv[1] if e['type'] == 'purchase')
            })
        )

        metrics | beam.io.WriteToBigQuery(...)

Global Windows

Default window that spans all time. Useful for batch processing or when you don’t need time-based grouping.
import apache_beam as beam
from apache_beam import window

# Global window (default for batch)
with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([1, 2, 3, 4, 5])
        # Implicitly uses GlobalWindow
        | beam.CombineGlobally(sum)
    )

# Explicit global window
with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([1, 2, 3, 4, 5])
        | beam.WindowInto(window.GlobalWindows())
        | beam.CombineGlobally(sum)
    )

Custom Windowing

You can create custom window functions for specialized use cases.
import apache_beam as beam
from apache_beam.coders.coders import IntervalWindowCoder
from apache_beam.transforms.window import WindowFn, IntervalWindow

class CustomDayOfWeekWindow(WindowFn):
    """Assigns each element to a window spanning its UTC calendar day.

    The day of week (0=Monday, 6=Sunday) can then be derived from the
    window start for day-of-week style aggregations.
    """

    def assign(self, context):
        timestamp = context.timestamp  # apache_beam.utils.timestamp.Timestamp

        # Window spanning the entire UTC day containing this timestamp
        start = timestamp - (timestamp % 86400)  # start of day
        end = start + 86400                      # end of day

        return [IntervalWindow(start, end)]

    def merge(self, merge_context):
        # Daily windows never merge
        pass

    def get_window_coder(self):
        return IntervalWindowCoder()

# Usage: `events` is assumed to be (key, value) pairs that already carry
# event-time timestamps (e.g. assigned upstream via TimestampedValue).
with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create(events)
        | beam.WindowInto(CustomDayOfWeekWindow())
        | beam.CombinePerKey(sum)
    )

Window-Aware Transforms

Some transforms behave differently based on windowing.

GroupByKey with Windows

import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([
            ('A', 1, 0), ('A', 2, 5), ('A', 3, 65),
            ('B', 4, 10), ('B', 5, 70)
        ])
        | beam.Map(lambda x: window.TimestampedValue((x[0], x[1]), x[2]))
    )

    # Fixed windows of 60 seconds
    windowed = (data
        | "60-Second Windows" >> beam.WindowInto(window.FixedWindows(60))
        | "Group by Key" >> beam.GroupByKey()
    )

    # Results grouped per window:
    # Window [0, 60): ('A', [1, 2]), ('B', [4])
    # Window [60, 120): ('A', [3]), ('B', [5])

    windowed | beam.Map(print)

Combine with Windows

import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as pipeline:
    sensor_data = (pipeline
        | beam.io.ReadFromPubSub(topic='sensors')
        | beam.Map(parse_sensor_data)
    )

    # Average sensor values per 5-minute window
    avg_per_window = (sensor_data
        | "5-Min Windows" >> beam.WindowInto(window.FixedWindows(300))
        | "Key by Sensor" >> beam.Map(lambda x: (x['sensor_id'], x['value']))
        | "Average per Window" >> beam.CombinePerKey(
            beam.combiners.MeanCombineFn()
        )
    )

    avg_per_window | beam.io.WriteToBigQuery(...)

Working with Window Metadata

Access window information in your transforms.
import apache_beam as beam
from apache_beam import window, DoFn

class AddWindowInfoFn(DoFn):
    def process(self, element, window=DoFn.WindowParam):
        """Add window information to each element."""
        yield {
            'element': element,
            'window_start': window.start.to_utc_datetime(),
            'window_end': window.end.to_utc_datetime(),
            'window_max_timestamp': window.max_timestamp()
        }

with beam.Pipeline() as pipeline:
    data = (pipeline
        | beam.Create([(i, i * 10) for i in range(100)])
        | beam.Map(lambda x: window.TimestampedValue(x, x[0]))
        | beam.WindowInto(window.FixedWindows(10))
        | beam.ParDo(AddWindowInfoFn())
    )

    data | beam.Map(print)

# Another example: Window-aware aggregation
class WindowAwareAggregationFn(DoFn):
    def process(self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):
        key, values = element
        values_list = list(values)

        yield {
            'key': key,
            'count': len(values_list),
            'window': {
                'start': window.start.to_utc_datetime().isoformat(),
                'end': window.end.to_utc_datetime().isoformat()
            },
            'element_timestamp': timestamp.to_utc_datetime().isoformat(),  # event-time timestamp of the element
            'values': values_list
        }
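
WindowAwareAggregationFn is not wired into a pipeline above; a minimal usage sketch, reusing the style of keyed, timestamped tuples from the earlier GroupByKey example, since the DoFn expects the grouped (key, iterable) pairs a windowed GroupByKey produces:

import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as pipeline:
    per_window_summaries = (pipeline
        | beam.Create([('A', 1, 0), ('A', 2, 5), ('A', 3, 65), ('B', 4, 10)])
        | beam.Map(lambda x: window.TimestampedValue((x[0], x[1]), x[2]))
        | beam.WindowInto(window.FixedWindows(60))
        | beam.GroupByKey()  # yields (key, iterable of values) per window
        | beam.ParDo(WindowAwareAggregationFn())
    )
    per_window_summaries | beam.Map(print)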

Real-World Use Cases

Real-Time Analytics Dashboard

import apache_beam as beam
from apache_beam import window
from datetime import datetime

def realtime_analytics_pipeline():
    with beam.Pipeline() as pipeline:
        # Read events from Kafka
        events = (pipeline
            | "Read Kafka" >> beam.io.ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:9092'},
                topics=['user-events']
            )
            | "Parse Events" >> beam.Map(parse_event)
        )

        # 1-minute windows for real-time metrics
        minute_windows = events | "1-Min Windows" >> beam.WindowInto(
            window.FixedWindows(60)
        )

        # Page views per minute
        page_views = (minute_windows
            | "Extract Page Views" >> beam.Filter(lambda x: x['event_type'] == 'page_view')
            | "Count Views" >> beam.combiners.Count.Globally()
            | "Format Views" >> beam.Map(lambda count: {
                'metric': 'page_views_per_minute',
                'value': count,
                'timestamp': datetime.utcnow().isoformat()
            })
        )

        # Unique users per minute
        unique_users = (minute_windows
            | "Extract User IDs" >> beam.Map(lambda x: x['user_id'])
            | "Deduplicate Users" >> beam.Distinct()
            | "Count Unique" >> beam.combiners.Count.Globally()
            | "Format Users" >> beam.Map(lambda count: {
                'metric': 'unique_users_per_minute',
                'value': count,
                'timestamp': datetime.utcnow().isoformat()
            })
        )

        # Average session duration (5-minute sliding window)
        session_duration = (events
            | "5-Min Sliding" >> beam.WindowInto(
                window.SlidingWindows(size=300, period=60)
            )
            | "Calculate Duration" >> beam.Map(calculate_session_duration)
            | "Average Duration" >> beam.CombineGlobally(
                beam.combiners.MeanCombineFn()
            ).without_defaults()
        )

        # Write metrics to different sinks. Note: WriteToPubSub expects byte
        # payloads and fully qualified 'projects/<project>/topics/<name>'
        # topic paths; the short names below are illustrative.
        page_views | "Write Views to Pub/Sub" >> beam.io.WriteToPubSub(
            topic='metrics-pageviews'
        )
        unique_users | "Write Users to Pub/Sub" >> beam.io.WriteToPubSub(
            topic='metrics-users'
        )
        session_duration | "Write Duration to BigQuery" >> beam.io.WriteToBigQuery(
            table='analytics.session_duration',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )

def parse_event(kafka_record):
    import json
    return json.loads(kafka_record[1].decode('utf-8'))

def calculate_session_duration(event):
    # Extract session duration from event
    return event.get('session_duration', 0)

Click Stream Analysis

import apache_beam as beam
from apache_beam import window

def clickstream_analysis():
    with beam.Pipeline() as pipeline:
        clicks = (pipeline
            | "Read Clicks" >> beam.io.ReadFromPubSub(topic='clickstream')
            | "Parse Clicks" >> beam.Map(parse_click)
        )

        # User sessions (30-minute gap)
        sessions = (clicks
            | "Session Windows" >> beam.WindowInto(window.Sessions(30 * 60))
            | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
            | "Group Sessions" >> beam.GroupByKey()
        )

        # Session analysis
        session_metrics = (sessions
            | "Analyze Sessions" >> beam.Map(analyze_session)
        )

        # Conversion funnel per hour
        hourly_funnel = (clicks
            | "Hourly Windows" >> beam.WindowInto(window.FixedWindows(3600))
            | "Calculate Funnel" >> beam.CombineGlobally(FunnelCombineFn())
        )

        # Popular pages (5-minute sliding window)
        popular_pages = (clicks
            | "5-Min Sliding" >> beam.WindowInto(
                window.SlidingWindows(size=300, period=60)
            )
            | "Extract Page" >> beam.Map(lambda x: (x['page_url'], 1))
            | "Count Pages" >> beam.CombinePerKey(sum)
            | "Top 10 Pages" >> beam.combiners.Top.PerKey(10)
        )

        # Write results
        session_metrics | "Write Sessions" >> beam.io.WriteToBigQuery(
            table='analytics.user_sessions'
        )
        hourly_funnel | "Write Funnel" >> beam.io.WriteToBigQuery(
            table='analytics.conversion_funnel'
        )
        popular_pages | "Write Popular Pages" >> beam.io.WriteToPubSub(
            topic='popular-pages'
        )

def analyze_session(user_session):
    user_id, clicks = user_session
    clicks_list = list(clicks)

    return {
        'user_id': user_id,
        'click_count': len(clicks_list),
        'pages_visited': len(set(c['page_url'] for c in clicks_list)),
        'duration_seconds': max(c['timestamp'] for c in clicks_list) -
                          min(c['timestamp'] for c in clicks_list),
        'bounced': len(clicks_list) == 1,
        'converted': any(c.get('conversion') for c in clicks_list)
    }

from apache_beam.transforms import CombineFn

class FunnelCombineFn(CombineFn):
    """Calculate conversion funnel metrics."""

    def create_accumulator(self):
        return {
            'landing': 0,
            'product_view': 0,
            'add_to_cart': 0,
            'checkout': 0,
            'purchase': 0
        }

    def add_input(self, accumulator, click):
        event_type = click.get('event_type')
        if event_type in accumulator:
            accumulator[event_type] += 1
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            for key in merged:
                merged[key] += acc.get(key, 0)
        return merged

    def extract_output(self, accumulator):
        return {
            'landing_pages': accumulator['landing'],
            'product_views': accumulator['product_view'],
            'add_to_cart': accumulator['add_to_cart'],
            'checkouts': accumulator['checkout'],
            'purchases': accumulator['purchase'],
            'landing_to_purchase_rate': (
                accumulator['purchase'] / accumulator['landing']
                if accumulator['landing'] > 0 else 0
            )
        }

def parse_click(pubsub_message):
    import json
    return json.loads(pubsub_message.decode('utf-8'))

IoT Sensor Monitoring

import apache_beam as beam
from apache_beam import window, DoFn

def iot_monitoring_pipeline():
    with beam.Pipeline() as pipeline:
        sensor_data = (pipeline
            | "Read Sensors" >> beam.io.ReadFromPubSub(topic='iot-sensors')
            | "Parse Sensor Data" >> beam.Map(parse_sensor_reading)
        )

        # 1-minute windows for anomaly detection
        minute_windows = (sensor_data
            | "1-Min Windows" >> beam.WindowInto(window.FixedWindows(60))
        )

        # Detect anomalies
        anomalies = (minute_windows
            | "Key by Sensor" >> beam.Map(lambda x: (x['sensor_id'], x))
            | "Group by Sensor" >> beam.GroupByKey()
            | "Detect Anomalies" >> beam.ParDo(AnomalyDetectionFn())
        )

        # Calculate statistics (5-minute sliding window)
        stats = (sensor_data
            | "5-Min Sliding" >> beam.WindowInto(
                window.SlidingWindows(size=300, period=60)
            )
            | "Key by Sensor and Type" >> beam.Map(
                lambda x: ((x['sensor_id'], x['metric_type']), x['value'])
            )
            | "Calculate Stats" >> beam.CombinePerKey(StatisticsCombineFn())
        )

        # Alert on high values (30-second windows)
        alerts = (sensor_data
            | "30-Sec Windows" >> beam.WindowInto(window.FixedWindows(30))
            | "Filter High Values" >> beam.Filter(lambda x: x['value'] > x['threshold'])
            | "Format Alerts" >> beam.Map(format_alert)
        )

        # Write outputs
        anomalies | "Write Anomalies" >> beam.io.WriteToPubSub(
            topic='sensor-anomalies'
        )
        stats | "Write Stats" >> beam.io.WriteToBigQuery(
            table='iot.sensor_statistics'
        )
        alerts | "Write Alerts" >> beam.io.WriteToPubSub(
            topic='sensor-alerts'
        )

class AnomalyDetectionFn(DoFn):
    def process(self, element):
        sensor_id, readings = element
        readings_list = list(readings)

        if len(readings_list) < 10:
            return  # Not enough data

        values = [r['value'] for r in readings_list]
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        std_dev = variance ** 0.5

        # Detect values outside 3 standard deviations
        for reading in readings_list:
            if abs(reading['value'] - mean) > 3 * std_dev:
                yield {
                    'sensor_id': sensor_id,
                    'timestamp': reading['timestamp'],
                    'value': reading['value'],
                    'mean': mean,
                    'std_dev': std_dev,
                    'anomaly_type': 'outlier'
                }

class StatisticsCombineFn(beam.CombineFn):
    """Collects raw values to compute exact order statistics per key/window.

    Note: the accumulator buffers every value, so keep windows modest.
    """

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input_value):
        accumulator.append(input_value)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        return merged

    def extract_output(self, accumulator):
        if not accumulator:
            return None

        sorted_values = sorted(accumulator)
        n = len(sorted_values)

        return {
            'count': n,
            'min': sorted_values[0],
            'max': sorted_values[-1],
            'mean': sum(sorted_values) / n,
            'median': sorted_values[n // 2],
            'p95': sorted_values[int(n * 0.95)],
            'p99': sorted_values[int(n * 0.99)]
        }

def parse_sensor_reading(pubsub_message):
    import json
    data = json.loads(pubsub_message.decode('utf-8'))
    return data

def format_alert(reading):
    return {
        'alert_type': 'high_value',
        'sensor_id': reading['sensor_id'],
        'value': reading['value'],
        'threshold': reading['threshold'],
        'timestamp': reading['timestamp']
    }

Best Practices

Choosing the Right Window

Fixed Windows:
  • Regular reporting intervals
  • Time-series data bucketing
  • When you need non-overlapping periods
Sliding Windows:
  • Moving averages
  • Trend analysis
  • When you need overlapping perspectives
Session Windows:
  • User behavior analysis
  • Activity burst detection
  • Variable-length time periods
Global Windows:
  • Batch processing
  • When time doesn’t matter
  • Simple aggregations
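
For quick reference, a minimal sketch of the four built-in strategies side by side (sizes are illustrative):

import apache_beam as beam
from apache_beam import window

# One WindowInto per built-in strategy; apply whichever matches your use case.
fixed_hourly    = beam.WindowInto(window.FixedWindows(3600))                   # hourly report buckets
sliding_5min    = beam.WindowInto(window.SlidingWindows(size=300, period=60))  # 5-min moving view, updated every minute
sessions_30min  = beam.WindowInto(window.Sessions(30 * 60))                    # sessions split on 30-min inactivity gaps
global_all_time = beam.WindowInto(window.GlobalWindows())                      # explicit all-time window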

Window Size Considerations

# Too small: High overhead, many small windows
small_windows = data | beam.WindowInto(window.FixedWindows(1))  # 1 second

# Too large: High latency, delayed results
large_windows = data | beam.WindowInto(window.FixedWindows(86400))  # 1 day

# Good balance for real-time: 1-5 minutes
balanced_windows = data | beam.WindowInto(window.FixedWindows(60))  # 1 minute

Memory Management

# Be careful with GroupByKey in large windows
# This can cause memory issues if windows are too large
problematic = (data
    | beam.WindowInto(window.FixedWindows(86400))  # 1 day
    | beam.GroupByKey()  # May accumulate too much data
)

# Better: Use CombinePerKey for aggregations
better = (data
    | beam.WindowInto(window.FixedWindows(86400))
    | beam.CombinePerKey(sum)  # Incrementally combines
)

Summary

In this module, you learned:
  • Event time vs processing time concepts
  • Watermarks and their role in streaming
  • Fixed, sliding, session, and global windows
  • Custom windowing functions
  • Window-aware transforms
  • Real-world windowing use cases
  • Best practices for window selection

Key Takeaways

  1. Choose window type based on your use case
  2. Event time provides more accurate business logic
  3. Watermarks enable progress while handling late data
  4. Use CombinePerKey instead of GroupByKey for large windows
  5. Balance window size between latency and overhead
  6. Session windows are ideal for user behavior analysis

Next Steps

Continue to the next module on Triggers & Watermarks to learn how to control when results are materialized and how to handle late-arriving data effectively.
