# Windowing & Time

> Time-based processing strategies

<Info>
  **Module Duration**: 3-4 hours
  **Focus**: Time-based data partitioning and processing
  **Prerequisites**: Core Programming Model, basic streaming concepts
</Info>

## Overview

Windowing in Apache Beam allows you to partition unbounded (and bounded) data into logical windows for aggregation. This is essential for streaming analytics where you need to group events by time periods.

### Key Concepts

* **Window**: A logical grouping of data based on time or other criteria
* **Event Time**: When the event actually occurred (embedded in data)
* **Processing Time**: When the event is processed by the pipeline
* **Watermark**: Estimate of event time progress
* **Window Assignment**: Determining which window(s) each element belongs to

***

## Understanding Time in Beam

Beam's time model distinguishes between different notions of time for robust stream processing.

### Event Time vs Processing Time

**Event Time**: The timestamp when the event actually occurred, embedded in the data itself.

* More accurate for business logic
* Accounts for late-arriving data
* Requires watermark tracking

**Processing Time**: The timestamp when the pipeline processes the element.

* Simpler to implement
* No late data handling needed
* Less accurate for business analysis
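
To make the difference concrete, here is a minimal plain-Python sketch (no Beam required) that buckets the same four events by event time and by processing time. The timestamps and the `bucket` helper are made up for illustration; the third event occurred at t=50 but was delayed in transit and processed at t=130.

```python
from collections import Counter

# (event_time_seconds, processing_time_seconds) pairs; illustrative values only.
events = [(10, 12), (40, 41), (50, 130), (70, 72)]

WINDOW = 60  # one-minute buckets


def bucket(ts, size=WINDOW):
    """Return the start of the fixed-size bucket that contains ts."""
    return ts - ts % size


by_event_time = Counter(bucket(e) for e, _ in events)
by_processing_time = Counter(bucket(p) for _, p in events)

print(dict(by_event_time))       # {0: 3, 60: 1}
print(dict(by_processing_time))  # {0: 2, 120: 1, 60: 1}
```

Grouping by event time puts the delayed event in the minute where it actually happened; grouping by processing time moves it to whichever minute the pipeline happened to see it.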

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam.transforms.window import TimestampedValue
  from datetime import datetime, timezone

  # Setting event time on elements
  with beam.Pipeline() as pipeline:
      # Elements paired with the time each event occurred (UTC)
      timestamped_data = (pipeline
          | "Create" >> beam.Create([
              ('event1', datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc)),
              ('event2', datetime(2024, 1, 1, 10, 0, 5, tzinfo=timezone.utc)),
              ('event3', datetime(2024, 1, 1, 10, 1, 0, tzinfo=timezone.utc)),
          ])
          | "Add Timestamps" >> beam.Map(
              # TimestampedValue expects seconds since the Unix epoch
              lambda x: TimestampedValue(x[0], x[1].timestamp())
          )
      )

      # No explicit timestamps: the source decides what each element gets.
      # beam.Create assigns the minimum representable timestamp, not the
      # processing time, so attach event timestamps before time-based windowing.
      untimestamped_data = (pipeline
          | "Create Untimestamped" >> beam.Create(['a', 'b', 'c'])
      )
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;
  import org.joda.time.Instant;

  // Custom DoFn to set event time
  static class SetEventTimeFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      @ProcessElement
      public void processElement(@Element KV<String, Long> element,
                                 OutputReceiver<KV<String, Long>> out) {
          // Extract timestamp from element
          Long eventTimestamp = element.getValue();
          Instant eventTime = new Instant(eventTimestamp);

          // Output with event time
          out.outputWithTimestamp(element, eventTime);
      }
  }

  PCollection<KV<String, Long>> timestamped = data.apply(
      "Set Event Time",
      ParDo.of(new SetEventTimeFn())
  );
  ```
</CodeGroup>

### Watermarks

Watermarks are how Beam tracks progress in event time. A watermark is the system's estimate that all data with event timestamps earlier than the watermark has already arrived; elements that show up behind it are treated as late.

**Properties**:

* Monotonically increasing
* Heuristic-based (not perfect)
* Allows pipeline to make progress while handling late data

**Impact**:

* Watermark advances too early: elements that arrive behind it are late and may be dropped
* Watermark advances too late: windows close later, so results are delayed
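
The trade-off can be illustrated with a toy model. The sketch below is not how any Beam runner computes its watermark; it assumes a simple heuristic (largest event time seen so far minus a tolerated out-of-order delay), and the `classify` function and the arrival values are hypothetical.

```python
def classify(events, max_out_of_order):
    """Label each event 'on_time' or 'late' against a heuristic watermark.

    events: event-time timestamps, listed in arrival order.
    The watermark is (largest event time seen so far) - max_out_of_order
    and never moves backwards.
    """
    watermark = float("-inf")
    labels = []
    for ts in events:
        labels.append((ts, "late" if ts < watermark else "on_time"))
        watermark = max(watermark, ts - max_out_of_order)
    return labels


arrivals = [100, 105, 103, 120, 101, 118]

# Aggressive watermark: results are ready sooner, but 101 arrives behind it.
aggressive = classify(arrivals, max_out_of_order=5)
# Conservative watermark: nothing is late, but windows stay open longer.
conservative = classify(arrivals, max_out_of_order=30)

print([ts for ts, label in aggressive if label == "late"])    # [101]
print([ts for ts, label in conservative if label == "late"])  # []
```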

***

## Window Types

Beam provides several built-in windowing strategies.

### Fixed Windows

Divides time into fixed-size, non-overlapping intervals.

**Use Cases**:

* Hourly/daily reports
* Regular interval aggregations
* Time-series bucketing
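
Assigning an element to a fixed window comes down to rounding its timestamp down to a multiple of the window size. A minimal plain-Python sketch of that arithmetic follows; the `fixed_window` helper and its `offset` parameter are illustrative names, and windows are treated as half-open intervals `[start, end)`.

```python
def fixed_window(timestamp, size, offset=0):
    """Return the half-open [start, end) window that contains timestamp."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


for ts in (0, 59, 60, 125):
    print(ts, fixed_window(ts, size=60))
# 0 (0, 60)
# 59 (0, 60)
# 60 (60, 120)
# 125 (120, 180)
```

Because the end bound is exclusive, an element stamped exactly 60 falls in the second window, not the first.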

<CodeGroup>
  ```python Python theme={null}
  import json

  import apache_beam as beam
  from apache_beam import window

  def process_fixed_windows():
      with beam.Pipeline() as pipeline:
          data = (pipeline
              | "Read Stream" >> beam.io.ReadFromPubSub(topic='events')
              | "Parse JSON" >> beam.Map(lambda x: json.loads(x))
              | "Extract Timestamp" >> beam.Map(
                  lambda x: window.TimestampedValue(
                      x,
                      x['timestamp']
                  )
              )
          )

          # Fixed windows of 1 minute
          windowed_data = (data
              | "1-Minute Windows" >> beam.WindowInto(
                  window.FixedWindows(60)  # 60 seconds
              )
          )

          # Aggregate per window
          counts = (windowed_data
              | "Extract Key" >> beam.Map(lambda x: (x['user_id'], 1))
              | "Count per Window" >> beam.CombinePerKey(sum)
          )

          counts | "Print" >> beam.Map(print)

  # Example with multiple window sizes
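  def multi_granularity_windows():
      # ReadFromKafka is defined in the apache_beam.io.kafka module
      from apache_beam.io.kafka import ReadFromKafka

      with beam.Pipeline() as pipeline:
          events = (pipeline
              | ReadFromKafka(...)
              | beam.Map(parse_event)  # must emit (key, number) pairs for CombinePerKey(sum)
          )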

          # 1-minute windows for real-time monitoring
          minute_metrics = (events
              | "1-Min Window" >> beam.WindowInto(window.FixedWindows(60))
              | "Minute Aggregation" >> beam.CombinePerKey(sum)
          )

          # 1-hour windows for hourly reports
          hourly_metrics = (events
              | "1-Hour Window" >> beam.WindowInto(window.FixedWindows(3600))
              | "Hourly Aggregation" >> beam.CombinePerKey(sum)
          )

          # 1-day windows for daily reports
          daily_metrics = (events
              | "1-Day Window" >> beam.WindowInto(window.FixedWindows(86400))
              | "Daily Aggregation" >> beam.CombinePerKey(sum)
          )
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.joda.time.Duration;

  // 1-minute fixed windows
  PCollection<KV<String, Integer>> windowedData = data.apply(
      "1-Minute Windows",
      Window.<KV<String, Integer>>into(
          FixedWindows.of(Duration.standardMinutes(1))
      )
  );

  // Aggregate per window
  PCollection<KV<String, Integer>> counts = windowedData.apply(
      "Count per Window",
      Sum.integersPerKey()
  );

  // Multiple granularities
  PCollection<Event> events = pipeline.apply(KafkaIO.read()...);

  // 1-minute windows
  PCollection<KV<String, Long>> minuteMetrics = events
      .apply("1-Min Window", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
      .apply("Minute Aggregation", Count.perKey());

  // 1-hour windows
  PCollection<KV<String, Long>> hourlyMetrics = events
      .apply("1-Hour Window", Window.into(FixedWindows.of(Duration.standardHours(1))))
      .apply("Hourly Aggregation", Count.perKey());

  // 1-day windows
  PCollection<KV<String, Long>> dailyMetrics = events
      .apply("1-Day Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
      .apply("Daily Aggregation", Count.perKey());
  ```
</CodeGroup>

### Sliding Windows

Creates overlapping windows that slide by a specified period.

**Use Cases**:

* Moving averages
* Rolling statistics
* Overlapping time period analysis

**Parameters**:

* Window size: Length of each window
* Slide period: How often a new window starts
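
With sliding windows a single element lands in `size / period` windows at once. The plain-Python sketch below enumerates them; `sliding_windows` is a hypothetical helper for illustration, and windows are half-open `[start, end)` intervals.

```python
def sliding_windows(timestamp, size, period):
    """Return every half-open [start, end) window that contains timestamp."""
    # Start of the most recent window that begins at or before the timestamp.
    start = timestamp - timestamp % period
    windows = []
    # Walk backwards one period at a time while the window still covers it.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


print(sliding_windows(25, size=30, period=10))
# [(20, 50), (10, 40), (0, 30)]
```

An element at t=25 with a 30-second size and a 10-second period therefore contributes to three separate aggregates, which is what produces the rolling effect.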

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window

  def sliding_window_example():
      with beam.Pipeline() as pipeline:
          data = (pipeline
              | "Create Data" >> beam.Create([
                  (1, 100), (2, 200), (3, 300), (4, 400), (5, 500)
              ])
              | "Add Timestamps" >> beam.Map(
                  lambda x: window.TimestampedValue(x, x[0])
              )
          )

          # Sliding window: 30-second window, 10-second slide
          # Creates half-open windows: [0, 30), [10, 40), [20, 50), etc.
          sliding = (data
              | "Sliding Windows" >> beam.WindowInto(
                  window.SlidingWindows(
                      size=30,    # 30-second windows
                      period=10   # New window every 10 seconds
                  )
              )
              | "Extract Value" >> beam.Map(lambda x: ('key', x[1]))
              | "Calculate Average" >> beam.CombinePerKey(
                  beam.combiners.MeanCombineFn()
              )
          )

          sliding | beam.Map(print)

  # Moving average example
  def calculate_moving_average():
      with beam.Pipeline() as pipeline:
          stock_prices = (pipeline
              | beam.io.ReadFromPubSub(topic='stock-prices')
              | beam.Map(parse_stock_data)
          )

          # 5-minute moving average, updated every minute
          moving_avg = (stock_prices
              | "5-Min Moving Window" >> beam.WindowInto(
                  window.SlidingWindows(
                      size=300,   # 5 minutes
                      period=60   # Slide every minute
                  )
              )
              | "Key by Stock" >> beam.Map(lambda x: (x['symbol'], x['price']))
              | "Average Price" >> beam.CombinePerKey(
                  beam.combiners.MeanCombineFn()
              )
          )

          moving_avg | "Write to BigQuery" >> beam.io.WriteToBigQuery(...)
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.windowing.SlidingWindows;

  // 30-second window, 10-second slide
  PCollection<KV<String, Integer>> sliding = data.apply(
      "Sliding Windows",
      Window.<KV<String, Integer>>into(
          SlidingWindows.of(Duration.standardSeconds(30))
              .every(Duration.standardSeconds(10))
      )
  );

  PCollection<KV<String, Double>> average = sliding.apply(
      "Calculate Average",
      Mean.perKey()
  );

  // 5-minute moving average, updated every minute
  PCollection<StockPrice> stockPrices = pipeline.apply(
      PubsubIO.readStrings()...
  ).apply(ParDo.of(new ParseStockDataFn()));

  PCollection<KV<String, Double>> movingAvg = stockPrices
      .apply("5-Min Moving Window",
          Window.into(SlidingWindows
              .of(Duration.standardMinutes(5))
              .every(Duration.standardMinutes(1))))
      .apply("Key by Stock", MapElements.via(...))
      .apply("Average Price", Mean.perKey());
  ```
</CodeGroup>

### Session Windows

Groups elements into sessions based on gaps in activity.

**Use Cases**:

* User session analysis
* Activity bursts detection
* Click stream analysis

**Parameters**:

* Gap duration: Minimum period of inactivity (per key) that separates one session from the next
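
Session boundaries are easiest to see on a small example. The following plain-Python sketch treats each event as a proto-window `[ts, ts + gap)` and merges windows that overlap, for a single key. `sessionize` and the click timestamps are illustrative assumptions, not Beam's implementation.

```python
def sessionize(timestamps, gap):
    """Merge one key's event timestamps into half-open [start, end) sessions."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the current session: extend its end.
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
        else:
            # Gap exceeded (or first event): start a new session.
            sessions.append([ts, ts + gap])
    return [tuple(s) for s in sessions]


clicks = [0, 120, 500, 1500, 1700]  # seconds
print(sessionize(clicks, gap=600))
# [(0, 1100), (1500, 2300)]
```

The first three clicks are each within 10 minutes of the previous one, so they form one session; the click at 1500 arrives after the first session has ended and opens a second one.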

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window

  def session_window_example():
      with beam.Pipeline() as pipeline:
          # User click events
          clicks = (pipeline
              | "Read Clicks" >> beam.io.ReadFromPubSub(topic='user-clicks')
              | "Parse" >> beam.Map(parse_click_event)
          )

          # Session window with 10-minute gap
          # If no activity for 10 minutes, start new session
          sessions = (clicks
              | "Session Windows" >> beam.WindowInto(
                  window.Sessions(10 * 60)  # 10-minute gap
              )
              | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
              | "Collect Session Events" >> beam.GroupByKey()
          )

          # Analyze each session
          session_stats = (sessions
              | "Analyze Session" >> beam.MapTuple(analyze_user_session)
          )

          session_stats | "Write Results" >> beam.io.WriteToText('sessions.txt')

  def analyze_user_session(user_id, events):
      """Analyze a single user session."""
      events_list = list(events)
      return {
          'user_id': user_id,
          'session_length': len(events_list),
          'duration': calculate_duration(events_list),
          'pages_visited': [e['page'] for e in events_list],
          'conversion': any(e['event_type'] == 'purchase' for e in events_list)
      }

  # E-commerce session analysis
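  def ecommerce_sessions():
      # ReadFromKafka is defined in the apache_beam.io.kafka module
      from apache_beam.io.kafka import ReadFromKafka

      with beam.Pipeline() as pipeline:
          events = (pipeline
              | ReadFromKafka(...)
              | beam.Map(parse_event)
          )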

          # 30-minute session gap
          user_sessions = (events
              | "30-Min Sessions" >> beam.WindowInto(window.Sessions(30 * 60))
              | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
              | "Group by Session" >> beam.GroupByKey()
          )

          # Calculate session metrics
          metrics = (user_sessions
              | "Session Metrics" >> beam.Map(lambda kv: {
                  'user_id': kv[0],
                  'events': list(kv[1]),
                  'revenue': sum(e.get('amount', 0) for e in kv[1]),
                  'product_views': sum(1 for e in kv[1] if e['type'] == 'view'),
                  'purchases': sum(1 for e in kv[1] if e['type'] == 'purchase')
              })
          )

          metrics | beam.io.WriteToBigQuery(...)
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.windowing.Sessions;

  // Session window with 10-minute gap
  PCollection<ClickEvent> clicks = pipeline.apply(
      PubsubIO.readStrings()...
  ).apply(ParDo.of(new ParseClickEventFn()));

  PCollection<KV<String, Iterable<ClickEvent>>> sessions = clicks
      .apply("Session Windows",
          Window.<ClickEvent>into(
              Sessions.withGapDuration(Duration.standardMinutes(10))
          ))
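      .apply("Key by User",
          // A lambda needs an explicit output type via into(...)
          MapElements
              .into(TypeDescriptors.kvs(
                  TypeDescriptors.strings(), TypeDescriptor.of(ClickEvent.class)))
              .via((ClickEvent click) -> KV.of(click.getUserId(), click)))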
      .apply("Collect Session Events",
          GroupByKey.create());

  // Analyze sessions
  PCollection<SessionStats> sessionStats = sessions.apply(
      "Analyze Session",
      ParDo.of(new AnalyzeSessionFn())
  );

  static class AnalyzeSessionFn extends DoFn<KV<String, Iterable<ClickEvent>>, SessionStats> {
      @ProcessElement
      public void processElement(@Element KV<String, Iterable<ClickEvent>> element,
                                 OutputReceiver<SessionStats> out) {
          String userId = element.getKey();
          List<ClickEvent> events = Lists.newArrayList(element.getValue());

          SessionStats stats = new SessionStats(
              userId,
              events.size(),
              calculateDuration(events),
              events.stream().map(ClickEvent::getPage).collect(Collectors.toList()),
              events.stream().anyMatch(e -> e.getType().equals("purchase"))
          );

          out.output(stats);
      }
  }
  ```
</CodeGroup>

### Global Windows

Default window that spans all time. Useful for batch processing or when you don't need time-based grouping.

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window

  # Global window (default for batch)
  with beam.Pipeline() as pipeline:
      data = (pipeline
          | beam.Create([1, 2, 3, 4, 5])
          # Implicitly uses GlobalWindow
          | beam.CombineGlobally(sum)
      )

  # Explicit global window
  with beam.Pipeline() as pipeline:
      data = (pipeline
          | beam.Create([1, 2, 3, 4, 5])
          | beam.WindowInto(window.GlobalWindows())
          | beam.CombineGlobally(sum)
      )
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.windowing.GlobalWindows;

  // Global window (default for batch)
  PCollection<Integer> total = numbers.apply(Sum.integersGlobally());

  // Explicit global window
  PCollection<Integer> explicitTotal = numbers
      .apply(Window.<Integer>into(new GlobalWindows()))
      .apply(Sum.integersGlobally());
  ```
</CodeGroup>

***

## Custom Windowing

You can create custom window functions for specialized use cases.

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam.coders.coders import IntervalWindowCoder
  from apache_beam.transforms.window import WindowFn, IntervalWindow

  class CustomDayOfWeekWindow(WindowFn):
      """Assigns each element to the UTC calendar day that contains its timestamp.

      Note: one window per date, so two different Mondays land in different windows.
      """

      def assign(self, context):
          # context.timestamp is the element's event time (seconds since the epoch)
          timestamp = context.timestamp

          # Create window spanning that entire day
          start = timestamp - (timestamp % 86400)  # Start of day (midnight UTC)
          end = start + 86400  # End of day

          return [IntervalWindow(start, end)]

      def merge(self, merge_context):
          # Daily windows never merge
          pass

      def get_window_coder(self):
          return IntervalWindowCoder()

  # Usage
  with beam.Pipeline() as pipeline:
      data = (pipeline
          | beam.Create(events)
          | beam.WindowInto(CustomDayOfWeekWindow())
          | beam.CombinePerKey(sum)
      )
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.windowing.WindowFn;
  import org.apache.beam.sdk.transforms.windowing.IntervalWindow;

  public class CustomDayOfWeekWindow extends WindowFn<Object, IntervalWindow> {

      @Override
      public Collection<IntervalWindow> assignWindows(AssignContext c) {
          Instant timestamp = c.timestamp();
          DateTime dt = new DateTime(timestamp);

          // Create window for the day of week
          int dayOfWeek = dt.getDayOfWeek();

          // Create window spanning that entire day
          Instant start = timestamp.minus(timestamp.getMillis() % 86400000);
          Instant end = start.plus(Duration.standardDays(1));

          return Collections.singletonList(new IntervalWindow(start, end));
      }

      @Override
      public void mergeWindows(MergeContext c) {
          // No merging for day-of-week windows
      }

      @Override
      public Coder<IntervalWindow> windowCoder() {
          return IntervalWindow.getCoder();
      }
  }

  // Usage
  PCollection<KV<String, Integer>> windowed = data.apply(
      "Custom Windows",
      Window.into(new CustomDayOfWeekWindow())
  );
  ```
</CodeGroup>

***

## Window-Aware Transforms

Some transforms behave differently based on windowing.

### GroupByKey with Windows

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window

  with beam.Pipeline() as pipeline:
      data = (pipeline
          | beam.Create([
              ('A', 1, 0), ('A', 2, 5), ('A', 3, 65),
              ('B', 4, 10), ('B', 5, 70)
          ])
          | beam.Map(lambda x: window.TimestampedValue((x[0], x[1]), x[2]))
      )

      # Fixed windows of 60 seconds
      windowed = (data
          | "60-Second Windows" >> beam.WindowInto(window.FixedWindows(60))
          | "Group by Key" >> beam.GroupByKey()
      )

      # Results grouped per window:
      # Window [0, 60): ('A', [1, 2]), ('B', [4])
      # Window [60, 120): ('A', [3]), ('B', [5])

      windowed | beam.Map(print)
  ```

  ```java Java theme={null}
  PCollection<KV<String, Integer>> data = pipeline.apply(...);

  PCollection<KV<String, Iterable<Integer>>> windowed = data
      .apply("60-Second Windows",
          Window.into(FixedWindows.of(Duration.standardSeconds(60))))
      .apply("Group by Key",
          GroupByKey.create());

  // Results grouped per window:
  // Window [0, 60): ("A", [1, 2]), ("B", [4])
  // Window [60, 120): ("A", [3]), ("B", [5])
  ```
</CodeGroup>
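
The per-window grouping described in the comments above can be checked without running a pipeline. This plain-Python sketch simulates the behaviour for the same five elements; `group_by_key_and_window` is a hypothetical helper, not a Beam API.

```python
from collections import defaultdict


def group_by_key_and_window(elements, size):
    """Group (key, value, timestamp) triples by fixed window, then by key."""
    grouped = defaultdict(lambda: defaultdict(list))
    for key, value, ts in elements:
        start = ts - ts % size  # start of the fixed window containing ts
        grouped[(start, start + size)][key].append(value)
    return {win: dict(keys) for win, keys in grouped.items()}


data = [('A', 1, 0), ('A', 2, 5), ('A', 3, 65), ('B', 4, 10), ('B', 5, 70)]
result = group_by_key_and_window(data, size=60)

for win, groups in sorted(result.items()):
    print(win, groups)
# (0, 60) {'A': [1, 2], 'B': [4]}
# (60, 120) {'A': [3], 'B': [5]}
```

The same key appears once per window: grouping is effectively keyed on the pair (key, window), not on the key alone.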

### Combine with Windows

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window

  with beam.Pipeline() as pipeline:
      sensor_data = (pipeline
          | beam.io.ReadFromPubSub(topic='sensors')
          | beam.Map(parse_sensor_data)
      )

      # Average sensor values per 5-minute window
      avg_per_window = (sensor_data
          | "5-Min Windows" >> beam.WindowInto(window.FixedWindows(300))
          | "Key by Sensor" >> beam.Map(lambda x: (x['sensor_id'], x['value']))
          | "Average per Window" >> beam.CombinePerKey(
              beam.combiners.MeanCombineFn()
          )
      )

      avg_per_window | beam.io.WriteToBigQuery(...)
  ```

  ```java Java theme={null}
  PCollection<SensorData> sensorData = pipeline.apply(
      PubsubIO.readStrings()...
  ).apply(ParDo.of(new ParseSensorDataFn()));

  PCollection<KV<String, Double>> avgPerWindow = sensorData
      .apply("5-Min Windows",
          Window.into(FixedWindows.of(Duration.standardMinutes(5))))
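      .apply("Key by Sensor",
          MapElements
              .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.doubles()))
              // Assumes getSensorId() returns String and getValue() returns Double
              .via((SensorData s) -> KV.of(s.getSensorId(), s.getValue())))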
      .apply("Average per Window",
          Mean.perKey());
  ```
</CodeGroup>

***

## Working with Window Metadata

Access window information in your transforms.

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window, DoFn

  class AddWindowInfoFn(DoFn):
      def process(self, element, window=DoFn.WindowParam):
          """Add window information to each element."""
          yield {
              'element': element,
              'window_start': window.start.to_utc_datetime(),
              'window_end': window.end.to_utc_datetime(),
              'window_max_timestamp': window.max_timestamp()
          }

  with beam.Pipeline() as pipeline:
      data = (pipeline
          | beam.Create([(i, i * 10) for i in range(100)])
          | beam.Map(lambda x: window.TimestampedValue(x, x[0]))
          | beam.WindowInto(window.FixedWindows(10))
          | beam.ParDo(AddWindowInfoFn())
      )

      data | beam.Map(print)

  # Another example: Window-aware aggregation
  class WindowAwareAggregationFn(DoFn):
      def process(self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):
          key, values = element
          values_list = list(values)

          yield {
              'key': key,
              'count': len(values_list),
              'window': {
                  'start': window.start.to_utc_datetime().isoformat(),
                  'end': window.end.to_utc_datetime().isoformat()
              },
              # TimestampParam is the element's event-time timestamp, not the processing time
              'element_timestamp': timestamp.to_utc_datetime().isoformat(),
              'values': values_list
          }
  ```

  ```java Java theme={null}
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.transforms.windowing.IntervalWindow;

  static class AddWindowInfoFn extends DoFn<KV<String, Integer>, WindowedValue> {
      @ProcessElement
      public void processElement(@Element KV<String, Integer> element,
                                 BoundedWindow window,
                                 OutputReceiver<WindowedValue> out) {
          IntervalWindow iw = (IntervalWindow) window;

          WindowedValue result = new WindowedValue(
              element,
              iw.start(),
              iw.end(),
              iw.maxTimestamp()
          );

          out.output(result);
      }
  }

  static class WindowAwareAggregationFn
      extends DoFn<KV<String, Iterable<Integer>>, AggregationResult> {

      @ProcessElement
      public void processElement(@Element KV<String, Iterable<Integer>> element,
                                 BoundedWindow window,
                                 @Timestamp Instant timestamp,
                                 OutputReceiver<AggregationResult> out) {
          String key = element.getKey();
          List<Integer> values = Lists.newArrayList(element.getValue());
          IntervalWindow iw = (IntervalWindow) window;

          AggregationResult result = new AggregationResult(
              key,
              values.size(),
              iw.start(),
              iw.end(),
              timestamp,
              values
          );

          out.output(result);
      }
  }
  ```
</CodeGroup>

***

## Real-World Use Cases

### Real-Time Analytics Dashboard

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window
  from datetime import datetime

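  def realtime_analytics_pipeline():
      # ReadFromKafka is defined in the apache_beam.io.kafka module
      from apache_beam.io.kafka import ReadFromKafka

      with beam.Pipeline() as pipeline:
          # Read events from Kafka
          events = (pipeline
              | "Read Kafka" >> ReadFromKafka(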
                  consumer_config={'bootstrap.servers': 'localhost:9092'},
                  topics=['user-events']
              )
              | "Parse Events" >> beam.Map(parse_event)
          )

          # 1-minute windows for real-time metrics
          minute_windows = events | "1-Min Windows" >> beam.WindowInto(
              window.FixedWindows(60)
          )

          # Page views per minute
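          page_views = (minute_windows
              | "Extract Page Views" >> beam.Filter(lambda x: x['event_type'] == 'page_view')
              # Global combines need without_defaults() once the data is in non-global windows
              | "Count Views" >> beam.CombineGlobally(
                  beam.combiners.CountCombineFn()
              ).without_defaults()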
              | "Format Views" >> beam.Map(lambda count: {
                  'metric': 'page_views_per_minute',
                  'value': count,
                  'timestamp': datetime.utcnow().isoformat()
              })
          )

          # Unique users per minute
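          unique_users = (minute_windows
              | "Extract User IDs" >> beam.Map(lambda x: x['user_id'])
              | "Deduplicate Users" >> beam.Distinct()
              # without_defaults() is required because the input is not globally windowed
              | "Count Unique" >> beam.CombineGlobally(
                  beam.combiners.CountCombineFn()
              ).without_defaults()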
              | "Format Users" >> beam.Map(lambda count: {
                  'metric': 'unique_users_per_minute',
                  'value': count,
                  'timestamp': datetime.utcnow().isoformat()
              })
          )

          # Average session duration (5-minute sliding window)
          session_duration = (events
              | "5-Min Sliding" >> beam.WindowInto(
                  window.SlidingWindows(size=300, period=60)
              )
              | "Calculate Duration" >> beam.Map(calculate_session_duration)
              | "Average Duration" >> beam.CombineGlobally(
                  beam.combiners.MeanCombineFn()
              ).without_defaults()
          )

          # Write metrics to different sinks
          page_views | "Write Views to Pub/Sub" >> beam.io.WriteToPubSub(
              topic='metrics-pageviews'
          )
          unique_users | "Write Users to Pub/Sub" >> beam.io.WriteToPubSub(
              topic='metrics-users'
          )
          session_duration | "Write Duration to BigQuery" >> beam.io.WriteToBigQuery(
              table='analytics.session_duration',
              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
          )

  def parse_event(kafka_record):
      import json
      return json.loads(kafka_record[1].decode('utf-8'))

  def calculate_session_duration(event):
      # Extract session duration from event
      return event.get('session_duration', 0)
  ```
</CodeGroup>

### Click Stream Analysis

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window

  def clickstream_analysis():
      with beam.Pipeline() as pipeline:
          clicks = (pipeline
              | "Read Clicks" >> beam.io.ReadFromPubSub(topic='clickstream')
              | "Parse Clicks" >> beam.Map(parse_click)
          )

          # User sessions (30-minute gap)
          sessions = (clicks
              | "Session Windows" >> beam.WindowInto(window.Sessions(30 * 60))
              | "Key by User" >> beam.Map(lambda x: (x['user_id'], x))
              | "Group Sessions" >> beam.GroupByKey()
          )

          # Session analysis
          session_metrics = (sessions
              | "Analyze Sessions" >> beam.Map(analyze_session)
          )

          # Conversion funnel per hour
          hourly_funnel = (clicks
              | "Hourly Windows" >> beam.WindowInto(window.FixedWindows(3600))
              # without_defaults() is required for global combines in non-global windows
              | "Calculate Funnel" >> beam.CombineGlobally(
                  FunnelCombineFn()
              ).without_defaults()
          )

          # Popular pages (5-minute sliding window)
          popular_pages = (clicks
              | "5-Min Sliding" >> beam.WindowInto(
                  window.SlidingWindows(size=300, period=60)
              )
              | "Extract Page" >> beam.Map(lambda x: (x['page_url'], 1))
              | "Count Pages" >> beam.CombinePerKey(sum)
              # Rank the (page, count) pairs by count within each window
              | "Top 10 Pages" >> beam.CombineGlobally(
                  beam.combiners.TopCombineFn(10, key=lambda kv: kv[1])
              ).without_defaults()
          )

          # Write results
          session_metrics | "Write Sessions" >> beam.io.WriteToBigQuery(
              table='analytics.user_sessions'
          )
          hourly_funnel | "Write Funnel" >> beam.io.WriteToBigQuery(
              table='analytics.conversion_funnel'
          )
          popular_pages | "Write Popular Pages" >> beam.io.WriteToPubSub(
              topic='popular-pages'
          )

  def analyze_session(user_session):
      user_id, clicks = user_session
      clicks_list = list(clicks)

      return {
          'user_id': user_id,
          'click_count': len(clicks_list),
          'pages_visited': len(set(c['page_url'] for c in clicks_list)),
          'duration_seconds': max(c['timestamp'] for c in clicks_list) -
                            min(c['timestamp'] for c in clicks_list),
          'bounced': len(clicks_list) == 1,
          'converted': any(c.get('conversion') for c in clicks_list)
      }

  from apache_beam.transforms import CombineFn

  class FunnelCombineFn(CombineFn):
      """Calculate conversion funnel metrics."""

      def create_accumulator(self):
          return {
              'landing': 0,
              'product_view': 0,
              'add_to_cart': 0,
              'checkout': 0,
              'purchase': 0
          }

      def add_input(self, accumulator, click):
          event_type = click.get('event_type')
          if event_type in accumulator:
              accumulator[event_type] += 1
          return accumulator

      def merge_accumulators(self, accumulators):
          merged = self.create_accumulator()
          for acc in accumulators:
              for key in merged:
                  merged[key] += acc.get(key, 0)
          return merged

      def extract_output(self, accumulator):
          return {
              'landing_pages': accumulator['landing'],
              'product_views': accumulator['product_view'],
              'add_to_cart': accumulator['add_to_cart'],
              'checkouts': accumulator['checkout'],
              'purchases': accumulator['purchase'],
              'landing_to_purchase_rate': (
                  accumulator['purchase'] / accumulator['landing']
                  if accumulator['landing'] > 0 else 0
              )
          }

  def parse_click(pubsub_message):
      import json
      return json.loads(pubsub_message.decode('utf-8'))
  ```
</CodeGroup>

### IoT Sensor Monitoring

<CodeGroup>
  ```python Python theme={null}
  import apache_beam as beam
  from apache_beam import window, DoFn, CombineFn

  def iot_monitoring_pipeline():
      with beam.Pipeline() as pipeline:
          sensor_data = (pipeline
              | "Read Sensors" >> beam.io.ReadFromPubSub(topic='iot-sensors')
              | "Parse Sensor Data" >> beam.Map(parse_sensor_reading)
          )

          # 1-minute windows for anomaly detection
          minute_windows = (sensor_data
              | "1-Min Windows" >> beam.WindowInto(window.FixedWindows(60))
          )

          # Detect anomalies
          anomalies = (minute_windows
              | "Key by Sensor" >> beam.Map(lambda x: (x['sensor_id'], x))
              | "Group by Sensor" >> beam.GroupByKey()
              | "Detect Anomalies" >> beam.ParDo(AnomalyDetectionFn())
          )

          # Calculate statistics (5-minute sliding window)
          stats = (sensor_data
              | "5-Min Sliding" >> beam.WindowInto(
                  window.SlidingWindows(size=300, period=60)
              )
              | "Key by Sensor and Type" >> beam.Map(
                  lambda x: ((x['sensor_id'], x['metric_type']), x['value'])
              )
              | "Calculate Stats" >> beam.CombinePerKey(StatisticsCombineFn())
          )

          # Alert on high values (30-second windows)
          alerts = (sensor_data
              | "30-Sec Windows" >> beam.WindowInto(window.FixedWindows(30))
              | "Filter High Values" >> beam.Filter(lambda x: x['value'] > x['threshold'])
              | "Format Alerts" >> beam.Map(format_alert)
          )

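          # Write outputs. WriteToPubSub expects bytes, so dicts are JSON-encoded first.
          # 'my-project' is a placeholder project ID.
          import json

          (anomalies
              | "Encode Anomalies" >> beam.Map(lambda a: json.dumps(a).encode('utf-8'))
              | "Write Anomalies" >> beam.io.WriteToPubSub(
                  topic='projects/my-project/topics/sensor-anomalies')
          )
          # Flatten ((sensor_id, metric_type), stats) into one row dict per key.
          # This assumes the table already exists with matching columns;
          # pass schema=... if Beam should create it.
          (stats
              | "Format Stats Rows" >> beam.Map(lambda kv: {
                  'sensor_id': kv[0][0], 'metric_type': kv[0][1], **kv[1]})
              | "Write Stats" >> beam.io.WriteToBigQuery(table='iot.sensor_statistics')
          )
          (alerts
              | "Encode Alerts" >> beam.Map(lambda a: json.dumps(a).encode('utf-8'))
              | "Write Alerts" >> beam.io.WriteToPubSub(
                  topic='projects/my-project/topics/sensor-alerts')
          )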

  class AnomalyDetectionFn(DoFn):
      def process(self, element):
          sensor_id, readings = element
          readings_list = list(readings)

          if len(readings_list) < 10:
              return  # Not enough data

          values = [r['value'] for r in readings_list]
          mean = sum(values) / len(values)
          variance = sum((x - mean) ** 2 for x in values) / len(values)
          std_dev = variance ** 0.5

          # Detect values outside 3 standard deviations
          for reading in readings_list:
              if abs(reading['value'] - mean) > 3 * std_dev:
                  yield {
                      'sensor_id': sensor_id,
                      'timestamp': reading['timestamp'],
                      'value': reading['value'],
                      'mean': mean,
                      'std_dev': std_dev,
                      'anomaly_type': 'outlier'
                  }

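  class StatisticsCombineFn(CombineFn):
      # Keeps every value so percentiles can be computed;
      # the accumulator grows with the number of readings per key and window
      def create_accumulator(self):
          return []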

      def add_input(self, accumulator, input_value):
          accumulator.append(input_value)
          return accumulator

      def merge_accumulators(self, accumulators):
          merged = []
          for acc in accumulators:
              merged.extend(acc)
          return merged

      def extract_output(self, accumulator):
          if not accumulator:
              return None

          sorted_values = sorted(accumulator)
          n = len(sorted_values)

          return {
              'count': n,
              'min': sorted_values[0],
              'max': sorted_values[-1],
              'mean': sum(sorted_values) / n,
              # Median and percentiles pick the element at the truncated rank (no interpolation)
              'median': sorted_values[n // 2],
              'p95': sorted_values[int(n * 0.95)],
              'p99': sorted_values[int(n * 0.99)]
          }

  def parse_sensor_reading(pubsub_message):
      import json
      data = json.loads(pubsub_message.decode('utf-8'))
      return data

  def format_alert(reading):
      return {
          'alert_type': 'high_value',
          'sensor_id': reading['sensor_id'],
          'value': reading['value'],
          'threshold': reading['threshold'],
          'timestamp': reading['timestamp']
      }
  ```
</CodeGroup>
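
The outlier rule inside `AnomalyDetectionFn` can be tried without running a pipeline. The sketch below reimplements the same 3-sigma check as a plain function. `find_outliers` and its parameters are hypothetical names used for illustration and are not part of Beam.

```python
def find_outliers(values, num_std=3.0, min_readings=10):
    """Return values more than num_std population standard deviations from the mean.

    Hypothetical helper that mirrors the check in AnomalyDetectionFn.
    """
    if len(values) < min_readings:
        return []  # Not enough data, same guard as the DoFn

    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    std_dev = variance ** 0.5

    return [v for v in values if abs(v - mean) > num_std * std_dev]


# 19 steady readings and one spike, as they might arrive in a single window
readings = [20.0] * 19 + [100.0]
print(find_outliers(readings))
```

This prints `[100.0]`. Because the check uses the population standard deviation, a single value can lie at most `sqrt(n - 1)` standard deviations from the mean of `n` readings. In exact arithmetic, a window therefore needs more than 10 readings before the 3-sigma rule can flag anything.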

***

## Best Practices

### Choosing the Right Window

**Fixed Windows**:

* Regular reporting intervals
* Time-series data bucketing
* When you need non-overlapping periods

**Sliding Windows**:

* Moving averages
* Trend analysis
* When you need overlapping perspectives

**Session Windows**:

* User behavior analysis
* Activity burst detection
* Variable-length time periods

**Global Windows**:

* Batch processing of bounded data
* When event time does not affect the result
* Simple aggregations
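
To make the difference between fixed and sliding windows concrete, the following sketch computes in plain Python which windows a timestamp falls into. It only illustrates the assignment arithmetic, assuming integer timestamps in seconds and windows aligned to the epoch (offset 0). `fixed_window` and `sliding_windows` are hypothetical helpers, not Beam APIs.

```python
def fixed_window(timestamp, size):
    """Return the single [start, end) fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def sliding_windows(timestamp, size, period):
    """Return every [start, end) sliding window containing timestamp, newest first."""
    last_start = timestamp - (timestamp % period)
    return [
        (start, start + size)
        for start in range(last_start, last_start - size, -period)
    ]


event_time = 125  # seconds, chosen for illustration

print(fixed_window(event_time, size=60))
print(sliding_windows(event_time, size=300, period=60))
```

The fixed case yields one window, `(120, 180)`. The sliding case yields five overlapping windows, one for each `period` that fits inside `size`, so every element is processed five times downstream.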

### Window Size Considerations

<CodeGroup>
  ```python Python theme={null}
  # Too small: High overhead, many small windows
  small_windows = data | beam.WindowInto(window.FixedWindows(1))  # 1 second

  # Too large: High latency, delayed results
  large_windows = data | beam.WindowInto(window.FixedWindows(86400))  # 1 day

  # Good balance for real-time: 1-5 minutes
  balanced_windows = data | beam.WindowInto(window.FixedWindows(60))  # 1 minute
  ```
</CodeGroup>
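
The trade-off can be put into numbers with a rough back-of-the-envelope sketch. It assumes fixed windows aligned to the epoch and results that are emitted only once the watermark passes the end of the window (the default trigger behavior). `window_tradeoff` is a hypothetical helper for illustration.

```python
SECONDS_PER_DAY = 86400


def window_tradeoff(size, event_time):
    """Summarize overhead and delay for a fixed window of `size` seconds."""
    window_end = event_time - (event_time % size) + size
    return {
        # More windows per day means more per-window bookkeeping and output
        'windows_per_day': SECONDS_PER_DAY // size,
        # Earliest a result containing this event can be emitted
        'min_wait_seconds': window_end - event_time,
    }


event_time = 30  # an event 30 seconds into the day, for illustration
for size in (1, 60, 86400):
    print(size, window_tradeoff(size, event_time))
```

With 1-second windows the event waits at most a second but the pipeline manages 86,400 windows per key per day. With a 1-day window there is a single window, but this event waits 86,370 seconds for its result.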

### Memory Management

<CodeGroup>
  ```python Python theme={null}
  # GroupByKey buffers every value for a key until the window closes,
  # so a large window can exhaust worker memory
  problematic = (data
      | beam.WindowInto(window.FixedWindows(86400))  # 1 day
      | beam.GroupByKey()  # Holds a full day of values per key
  )

  # Better: CombinePerKey folds values into a small accumulator as they arrive
  better = (data
      | beam.WindowInto(window.FixedWindows(86400))  # 1 day
      | beam.CombinePerKey(sum)  # Only a running total is kept per key
  )
  ```
</CodeGroup>
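
Incremental combining saves memory because each worker keeps only a small accumulator, and partial accumulators can be merged later. The sketch below walks through this in plain Python with a class that mirrors the four `CombineFn` methods. `MeanCombine` is a hypothetical stand-in that does not depend on Beam.

```python
class MeanCombine:
    """Plain-Python stand-in mirroring the four CombineFn methods."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        total, count = 0.0, 0
        for partial_total, partial_count in accumulators:
            total += partial_total
            count += partial_count
        return (total, count)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


fn = MeanCombine()

# Two workers each fold their share of the values into a two-number accumulator
worker_a = fn.create_accumulator()
for value in [10.0, 20.0, 30.0]:
    worker_a = fn.add_input(worker_a, value)

worker_b = fn.create_accumulator()
for value in [40.0, 50.0]:
    worker_b = fn.add_input(worker_b, value)

merged = fn.merge_accumulators([worker_a, worker_b])
print(fn.extract_output(merged))
```

The merged accumulator is `(150.0, 5)` and the printed mean is `30.0`, the same as computing over all five values at once. The accumulator stays at two numbers no matter how many values arrive in the window.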

***

## Summary

In this module, you learned:

* Event time vs processing time concepts
* Watermarks and their role in streaming
* Fixed, sliding, session, and global windows
* Custom windowing functions
* Window-aware transforms
* Real-world windowing use cases
* Best practices for window selection

### Key Takeaways

1. Choose window type based on your use case
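2. Event time reflects when events actually happened, so it gives more accurate results for business logic
3. Watermarks track event-time progress, letting the pipeline emit results while still accounting for late data
4. Prefer CombinePerKey over GroupByKey for aggregations over large windows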
5. Balance window size between latency and overhead
6. Session windows are ideal for user behavior analysis

### Next Steps

Continue to the next module on **Triggers & Watermarks** to learn how to control when results are materialized and how to handle late-arriving data effectively.

***

## Additional Resources

* [Beam Windowing Documentation](https://beam.apache.org/documentation/programming-guide/#windowing)
* [The Dataflow Model Paper](https://research.google/pubs/pub43864/)
* [Streaming 101/102 Blog Series](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/)
* [Window Assignment Examples](https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function)
