# Windows & Time Operations

> Time-based aggregations and windowing

<Info>
  **Module Duration**: 4-5 hours
  **Focus**: Windowing in Apache Flink
  **Prerequisites**: DataStream API, Event Time concepts
</Info>

## Introduction

Windows are essential for bounded computations over unbounded streams. They divide the infinite stream into finite chunks based on time or count, enabling aggregations, joins, and pattern detection.

## Window Types

### Tumbling Windows

Non-overlapping, fixed-size windows.

```java theme={null}
// Event-time tumbling windows
DataStream<Event> input = /* ... */;

DataStream<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce(new MyReduceFunction());

// Processing-time tumbling windows
DataStream<Result> processingResults = input
    .keyBy(e -> e.key)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .reduce(new MyReduceFunction());

// Offset windows (e.g., start at :02 instead of :00)
DataStream<Result> offsetResults = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5), Time.minutes(2)))
    .reduce(new MyReduceFunction());
```
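To make the offset variant concrete, the following plain-Java sketch (no Flink dependency) shows the arithmetic that maps a timestamp to its tumbling window. The class and method names are hypothetical, and the formula is meant to illustrate epoch-aligned windows shifted by an offset rather than to reproduce Flink's internal code.

```java
/** Hypothetical helper: maps a timestamp to its epoch-aligned tumbling window. */
final class TumblingWindowMath {

    /** Start (inclusive) of the window containing timestampMs, shifted by offsetMs. */
    static long windowStart(long timestampMs, long sizeMs, long offsetMs) {
        // floorMod keeps the result correct for timestamps smaller than the offset
        return timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs);
    }

    /** End (exclusive) of the window containing timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs, long offsetMs) {
        return windowStart(timestampMs, sizeMs, offsetMs) + sizeMs;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 5-minute windows with a 2-minute offset: which window holds minute 6?
        long start = windowStart(6 * minute, 5 * minute, 2 * minute);
        long end = windowEnd(6 * minute, 5 * minute, 2 * minute);
        System.out.println("window = [" + start / minute + ", " + end / minute + ") minutes");
    }
}
```

Because every timestamp maps to exactly one start value, each element belongs to exactly one tumbling window per key.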

### Sliding Windows

Overlapping windows with fixed size and slide interval.

```java theme={null}
// 10-minute windows sliding every 5 minutes
DataStream<Result> slidingResults = input
    .keyBy(e -> e.key)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  // window size
        Time.minutes(5)    // slide interval
    ))
    .aggregate(new MyAggregateFunction());

// Processing time variant
DataStream<Result> processingSlidingResults = input
    .keyBy(e -> e.key)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new MyAggregateFunction());
```
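With sliding windows, one element belongs to several windows at once. The sketch below lists them for a given timestamp in plain Java, assuming epoch-aligned windows with no offset; `SlidingWindowMath` and `windowsFor` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists the sliding windows that contain a timestamp. */
final class SlidingWindowMath {

    /** Returns {start, end} pairs (end exclusive) of every window containing timestampMs. */
    static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 10-minute windows sliding every 5 minutes, element at minute 7
        for (long[] w : windowsFor(7 * minute, 10 * minute, 5 * minute)) {
            System.out.println("[" + w[0] / minute + ", " + w[1] / minute + ") minutes");
        }
    }
}
```

Each element lands in roughly `size / slide` windows, so a small slide relative to the window size multiplies both the state held and the work done per element.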

### Session Windows

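Dynamic windows that group elements by activity: a session for a key closes once no element has arrived for that key within the configured gap.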

```java theme={null}
// Session window with 10-minute gap
DataStream<Result> sessionResults = input
    .keyBy(e -> e.userId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .process(new SessionWindowFunction());

// Dynamic gap based on element
DataStream<Result> dynamicSessionResults = input
    .keyBy(e -> e.userId)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
        @Override
        public long extract(Event element) {
            // Users with premium accounts get longer session gaps
            return element.isPremium() ? 30 * 60 * 1000 : 10 * 60 * 1000;
        }
    }))
    .process(new SessionWindowFunction());
```
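The way sessions form can be sketched without Flink: every element opens a window of `[timestamp, timestamp + gap)`, and overlapping windows are merged. The code below is a simplified model for a single key; the names are hypothetical, and treating an element that arrives exactly at the gap boundary as part of the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of session-window merging for a single key. */
final class SessionSketch {

    /** Returns {start, end} pairs; end is the last timestamp of the session plus the gap. */
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            if (current != null && ts <= current[1]) {
                // Element falls inside the open session: extend its end
                current[1] = Math.max(current[1], ts + gapMs);
            } else {
                // Gap exceeded (or first element): start a new session
                current = new long[] {ts, ts + gapMs};
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {0, 5 * minute, 20 * minute, 22 * minute};
        for (long[] s : sessions(events, 10 * minute)) {
            System.out.println("[" + s[0] / minute + ", " + s[1] / minute + ") minutes");
        }
    }
}
```

Note that the end of a session window includes the gap after the last element, so `end - start` overstates the time the key was actually active by one gap length.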

### Global Windows

All elements with the same key go to a single window that never ends, so nothing is emitted unless you supply a custom trigger.

```java theme={null}
DataStream<Result> globalResults = input
    .keyBy(e -> e.key)
    .window(GlobalWindows.create())
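    // Fires every 100 elements. The window is not purged, so each firing sees all
    // elements so far; wrap the trigger in PurgingTrigger.of(...) to clear them.
    .trigger(CountTrigger.of(100))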
    .process(new MyWindowFunction());
```

### Count Windows

Windows based on element count instead of time.

```java theme={null}
// Tumbling count window: every 100 elements
DataStream<Result> countResults = input
    .keyBy(e -> e.key)
    .countWindow(100)
    .reduce(new MyReduceFunction());

// Sliding count window: 100 elements, slide every 10
DataStream<Result> slidingCountResults = input
    .keyBy(e -> e.key)
    .countWindow(100, 10)
    .reduce(new MyReduceFunction());
```
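A sliding count window does not wait for `size` elements before producing its first result: it fires every `slide` elements with the most recent elements it has, up to `size` of them. The toy model below illustrates that behavior for a single key in plain Java. It is a sketch of the semantics, not Flink's implementation, and the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of countWindow(size, slide) for a single key. */
final class SlidingCountWindowSketch {

    /** Returns the window contents at each firing. */
    static List<List<Integer>> firings(int[] elements, int size, int slide) {
        List<List<Integer>> fired = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int element : elements) {
            buffer.addLast(element);
            if (buffer.size() > size) {
                buffer.removeFirst();  // keep only the most recent `size` elements
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                fired.add(new ArrayList<>(buffer));  // snapshot of the current window
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Window of 4 elements, sliding every 2 elements
        System.out.println(firings(new int[] {1, 2, 3, 4, 5, 6}, 4, 2));
    }
}
```

In this model, setting `slide` equal to `size` degenerates to the tumbling case: each firing contains exactly the elements that arrived since the previous one.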

## Window Functions

### ReduceFunction

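Incrementally combines two elements into one of the same type, so Flink keeps only a single value per key and window instead of every element.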

```java theme={null}
public class SumReduceFunction implements ReduceFunction<SensorReading> {
    @Override
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return new SensorReading(
            r1.sensorId,
            r1.temperature + r2.temperature,
            Math.max(r1.timestamp, r2.timestamp)
        );
    }
}

// Usage
DataStream<SensorReading> sums = readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new SumReduceFunction());
```

### AggregateFunction

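A generalization of `ReduceFunction`: the input, accumulator, and output types can differ. Like `reduce`, it aggregates incrementally, so only the accumulator is kept in state.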

```java theme={null}
public class AverageAggregate implements
        AggregateFunction<SensorReading, Tuple2<Double, Long>, Double> {

    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return new Tuple2<>(0.0, 0L);
    }

    @Override
    public Tuple2<Double, Long> add(SensorReading reading, Tuple2<Double, Long> acc) {
        return new Tuple2<>(acc.f0 + reading.temperature, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1, Tuple2<Double, Long> acc2) {
        return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
    }
}

// Usage
DataStream<Double> averages = readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AverageAggregate());
```
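The four methods form a small lifecycle: one accumulator is created per key and window, `add` is called for each element, `merge` combines accumulators when windows are merged (for example, session windows), and `getResult` runs when the window fires. The plain-Java sketch below walks through that lifecycle for an average. The `Acc` class and the empty-accumulator guard are illustrative assumptions, not part of the Flink API.

```java
/** Hypothetical stand-alone model of the accumulator lifecycle for an average. */
final class AverageAccumulatorSketch {

    /** Mutable accumulator: running sum and element count. */
    static final class Acc {
        double sum;
        long count;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static Acc add(double value, Acc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    static Acc merge(Acc a, Acc b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    static double getResult(Acc acc) {
        // Guard against an empty accumulator, which would otherwise yield NaN
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc first = add(22.0, add(20.0, createAccumulator()));
        Acc second = add(24.0, createAccumulator());
        // Merging two partial accumulators gives the average over all three values
        System.out.println(getResult(merge(first, second)));
    }
}
```

Whatever the number of elements, the state for this aggregation stays at one sum and one count.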

### ProcessWindowFunction

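Receives all elements of the window at once, along with metadata such as the window bounds. Flink has to buffer every element until the window fires, so this uses more state than `reduce` or `aggregate`.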

```java theme={null}
public class TopNWindowFunction extends
        ProcessWindowFunction<Event, String, String, TimeWindow> {

    private final int topN;

    public TopNWindowFunction(int topN) {
        this.topN = topN;
    }

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Event> events,
            Collector<String> out) {

        List<Event> eventList = StreamSupport
            .stream(events.spliterator(), false)
            .sorted(Comparator.comparingDouble(e -> -e.value))
            .limit(topN)
            .collect(Collectors.toList());

        out.collect(String.format(
            "Key: %s, Window: [%d - %d], Top %d: %s",
            key,
            context.window().getStart(),
            context.window().getEnd(),
            topN,
            eventList
        ));
    }
}

// Usage
DataStream<String> topN = events
    .keyBy(e -> e.category)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .process(new TopNWindowFunction(10));
```

### Combining Incremental and Full-Window Processing

```java theme={null}
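// Efficient: Pre-aggregate with AggregateFunction, then process with window context
public class MyAggregateFunction implements
        AggregateFunction<Event, Long, Long> {
    // Incremental aggregation (efficient).
    // Example implementation (an assumption for illustration): counts the events per window.

    @Override
    public Long createAccumulator() { return 0L; }

    @Override
    public Long add(Event event, Long count) { return count + 1; }

    @Override
    public Long getResult(Long count) { return count; }

    @Override
    public Long merge(Long a, Long b) { return a + b; }
}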

public class MyProcessWindowFunction extends
        ProcessWindowFunction<Long, Result, String, TimeWindow> {
    // Has access to window metadata

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Long> aggregates,
            Collector<Result> out) {

        Long aggregatedValue = aggregates.iterator().next();

        out.collect(new Result(
            key,
            aggregatedValue,
            context.window().getStart(),
            context.window().getEnd()
        ));
    }
}

// Combine both
DataStream<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());
```

## Triggers

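A trigger decides when a window's contents are evaluated and emitted. Every window assigner comes with a default trigger; supply your own to fire early, repeatedly, or on custom conditions.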

```java theme={null}
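import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Built-in triggers
// 1. EventTimeTrigger: Fires when the watermark passes the window end
// 2. ProcessingTimeTrigger: Fires when processing time passes the window end
// 3. CountTrigger: Fires after N elements
// 4. PurgingTrigger: Wraps another trigger and turns its FIRE into FIRE_AND_PURGE

// Custom trigger: early firings every 1000 elements, a final firing at the window end,
// and an immediate firing for each late element (late elements only reach the trigger
// when allowedLateness is set)
public class EarlyAndLateTrigger extends Trigger<Event, TimeWindow> {

    private static final long EARLY_FIRE_COUNT = 1000L;

    // Element counter, scoped to the current key and window
    private final ValueStateDescriptor<Long> countDescriptor =
        new ValueStateDescriptor<>("count", Long.class);

    @Override
    public TriggerResult onElement(
            Event element,
            long timestamp,
            TimeWindow window,
            TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Late element: the on-time firing already happened, so fire right away
            return TriggerResult.FIRE;
        }

        // Register a timer for the end of the window
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Early firing: every 1000 elements
        ValueState<Long> countState = ctx.getPartitionedState(countDescriptor);
        Long previous = countState.value();  // null until the first update
        long count = (previous == null ? 0L : previous) + 1;

        if (count >= EARLY_FIRE_COUNT) {
            countState.clear();  // restart the count for the next early firing
            return TriggerResult.FIRE;
        }

        countState.update(count);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(
            long time,
            TimeWindow window,
            TriggerContext ctx) throws Exception {

        // Final on-time firing; purging discards the buffered window contents
        return time == window.maxTimestamp()
            ? TriggerResult.FIRE_AND_PURGE
            : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long time,
            TimeWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDescriptor).clear();
    }
}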

// Usage
DataStream<Result> withCustomTrigger = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(new EarlyAndLateTrigger())
    .process(new MyWindowFunction());
```

## Evictors

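Remove elements from a window before or after the window function is applied. Specifying an evictor prevents incremental pre-aggregation, because Flink has to keep every element in state until the window fires.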

```java theme={null}
// Built-in evictors
// 1. CountEvictor: Keeps max N elements
// 2. TimeEvictor: Keeps elements from last N milliseconds
// 3. DeltaEvictor: Keeps elements within delta of last element

// Custom evictor
public class OutlierEvictor implements Evictor<Event, TimeWindow> {

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Event>> elements,
            int size,
            TimeWindow window,
            EvictorContext evictorContext) {

        // Calculate mean
        double mean = StreamSupport.stream(elements.spliterator(), false)
            .mapToDouble(e -> e.getValue().value)
            .average()
            .orElse(0.0);

        // Calculate standard deviation
        double variance = StreamSupport.stream(elements.spliterator(), false)
            .mapToDouble(e -> Math.pow(e.getValue().value - mean, 2))
            .average()
            .orElse(0.0);
        double stdDev = Math.sqrt(variance);

        // Remove outliers (> 3 standard deviations)
        Iterator<TimestampedValue<Event>> iterator = elements.iterator();
        while (iterator.hasNext()) {
            TimestampedValue<Event> element = iterator.next();
            if (Math.abs(element.getValue().value - mean) > 3 * stdDev) {
                iterator.remove();
            }
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Event>> elements,
            int size,
            TimeWindow window,
            EvictorContext evictorContext) {
        // No post-processing eviction
    }
}

// Usage
DataStream<Result> withEvictor = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .evictor(new OutlierEvictor())
    .process(new MyWindowFunction());
```

## Allowed Lateness

By default, Flink drops elements that arrive after the watermark has passed the end of their window. `allowedLateness` keeps the window state for an extra period so late elements can still update the result, and `sideOutputLateData` captures elements that arrive even after that period.

```java theme={null}
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-events"){};

SingleOutputStreamOperator<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2))  // Keep window state for 2 extra minutes
    .sideOutputLateData(lateDataTag)
    .aggregate(new MyAggregateFunction());

// Process late data
DataStream<Event> lateData = results.getSideOutput(lateDataTag);
lateData.addSink(new LateDataSink());
```
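How an element is treated depends on where the watermark stands relative to its window. The plain-Java sketch below classifies the three cases. The names are hypothetical, and the exact boundary comparisons reflect my understanding of event-time windows with the default trigger, so treat them as an assumption to verify against your Flink version.

```java
/** Hypothetical model of how allowed lateness affects an arriving element. */
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    /** Classifies an element by its window end, the allowed lateness, and the current watermark. */
    static Outcome classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1;  // last timestamp that belongs to the window
        if (maxTimestamp + allowedLatenessMs <= watermarkMs) {
            return Outcome.SIDE_OUTPUT;   // window state is gone; element goes to the side output
        }
        if (maxTimestamp <= watermarkMs) {
            return Outcome.LATE_FIRING;   // window already fired; element causes an updated result
        }
        return Outcome.ON_TIME;           // window has not fired yet
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long windowEnd = 5 * minute;
        long lateness = 2 * minute;
        System.out.println(classify(windowEnd, lateness, 4 * minute));
        System.out.println(classify(windowEnd, lateness, 6 * minute));
        System.out.println(classify(windowEnd, lateness, 8 * minute));
    }
}
```

Late firings emit an updated result for a window that has already produced output, so downstream consumers need to handle updates, for example by upserting on key and window.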

## Real-World Examples

### Example 1: Real-Time Analytics Dashboard

```java theme={null}
public class RealTimeAnalyticsDashboard {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<PageView> pageViews = env
            .addSource(new PageViewSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<PageView>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((view, ts) -> view.timestamp)
            );

        // 1-minute tumbling windows for page view counts
        DataStream<PageViewStats> stats = pageViews
            .keyBy(view -> view.page)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new PageViewAggregator());

        // 5-minute sliding windows for trending pages
        DataStream<TrendingPage> trending = pageViews
            .keyBy(view -> view.page)
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .process(new TrendingPageFunction());

        stats.addSink(new DashboardSink("page_views"));
        trending.addSink(new DashboardSink("trending"));

        env.execute("Real-Time Analytics Dashboard");
    }

    static class PageViewAggregator implements
            AggregateFunction<PageView, Tuple3<String, Long, Set<String>>, PageViewStats> {

        @Override
        public Tuple3<String, Long, Set<String>> createAccumulator() {
            return new Tuple3<>("", 0L, new HashSet<>());
        }

        @Override
        public Tuple3<String, Long, Set<String>> add(
                PageView view,
                Tuple3<String, Long, Set<String>> acc) {

            acc.f0 = view.page;
            acc.f1 += 1;
            acc.f2.add(view.userId);
            return acc;
        }

        @Override
        public PageViewStats getResult(Tuple3<String, Long, Set<String>> acc) {
            return new PageViewStats(acc.f0, acc.f1, acc.f2.size());
        }

        @Override
        public Tuple3<String, Long, Set<String>> merge(
                Tuple3<String, Long, Set<String>> acc1,
                Tuple3<String, Long, Set<String>> acc2) {

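            // Keep the page name if acc1 is still an empty accumulator
            if (acc1.f0.isEmpty()) {
                acc1.f0 = acc2.f0;
            }
            acc1.f1 += acc2.f1;
            acc1.f2.addAll(acc2.f2);
            return acc1;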
        }
    }
}
```

### Example 2: User Session Analysis

```java theme={null}
public class UserSessionAnalysis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<UserAction> actions = env
            .addSource(new UserActionSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<UserAction>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                    .withTimestampAssigner((action, ts) -> action.timestamp)
            );

        // Session windows with 30-minute inactivity gap
        DataStream<UserSession> sessions = actions
            .keyBy(action -> action.userId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .process(new ProcessWindowFunction<UserAction, UserSession, String, TimeWindow>() {

                @Override
                public void process(
                        String userId,
                        Context context,
                        Iterable<UserAction> actions,
                        Collector<UserSession> out) {

                    List<UserAction> actionList = StreamSupport
                        .stream(actions.spliterator(), false)
                        .sorted(Comparator.comparingLong(a -> a.timestamp))
                        .collect(Collectors.toList());

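                    // The session window's end includes the 30-minute gap, so measure
                    // the duration from the first to the last action instead
                    long sessionDuration = actionList.get(actionList.size() - 1).timestamp
                        - actionList.get(0).timestamp;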
                    int pageViews = (int) actionList.stream()
                        .filter(a -> a.type.equals("page_view"))
                        .count();
                    boolean hasConversion = actionList.stream()
                        .anyMatch(a -> a.type.equals("purchase"));

                    out.collect(new UserSession(
                        userId,
                        context.window().getStart(),
                        context.window().getEnd(),
                        sessionDuration,
                        actionList.size(),
                        pageViews,
                        hasConversion
                    ));
                }
            });

        sessions.addSink(new SessionAnalyticsSink());
        env.execute("User Session Analysis");
    }
}
```

### Example 3: Anomaly Detection with Windows

```java theme={null}
public class AnomalyDetection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Metric> metrics = env
            .addSource(new MetricSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Metric>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((metric, ts) -> metric.timestamp)
            );

        // Calculate statistics over 5-minute windows
        DataStream<MetricStats> stats = metrics
            .keyBy(m -> m.metricName)
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .aggregate(
                new StatisticsAggregator(),
                new StatsWindowFunction()
            );

        // Detect anomalies
        DataStream<Anomaly> anomalies = stats
            .keyBy(s -> s.metricName)
            .process(new AnomalyDetectionFunction());

        anomalies.addSink(new AlertingSink());
        env.execute("Anomaly Detection");
    }

    static class StatisticsAggregator implements
            AggregateFunction<Metric, StatAccumulator, StatAccumulator> {

        @Override
        public StatAccumulator createAccumulator() {
            return new StatAccumulator();
        }

        @Override
        public StatAccumulator add(Metric metric, StatAccumulator acc) {
            acc.count++;
            acc.sum += metric.value;
            acc.sumSquares += metric.value * metric.value;
            acc.min = Math.min(acc.min, metric.value);
            acc.max = Math.max(acc.max, metric.value);
            return acc;
        }

        @Override
        public StatAccumulator getResult(StatAccumulator acc) {
            return acc;
        }

        @Override
        public StatAccumulator merge(StatAccumulator acc1, StatAccumulator acc2) {
            acc1.count += acc2.count;
            acc1.sum += acc2.sum;
            acc1.sumSquares += acc2.sumSquares;
            acc1.min = Math.min(acc1.min, acc2.min);
            acc1.max = Math.max(acc1.max, acc2.max);
            return acc1;
        }
    }

    static class AnomalyDetectionFunction extends
            KeyedProcessFunction<String, MetricStats, Anomaly> {

        private ValueState<MetricStats> previousStats;

        @Override
        public void open(Configuration parameters) {
            previousStats = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous", MetricStats.class)
            );
        }

        @Override
        public void processElement(
                MetricStats current,
                Context ctx,
                Collector<Anomaly> out) throws Exception {

            MetricStats previous = previousStats.value();

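            // Skip the check when the previous window had zero variance (avoids division by zero)
            if (previous != null && previous.stdDev > 0) {
                // Z-score anomaly detection
                double zScore = Math.abs(current.mean - previous.mean) / previous.stdDev;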

                if (zScore > 3.0) {  // 3 standard deviations
                    out.collect(new Anomaly(
                        current.metricName,
                        current.mean,
                        previous.mean,
                        zScore,
                        current.windowEnd
                    ));
                }
            }

            previousStats.update(current);
        }
    }
}
```
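The example relies on a `StatsWindowFunction` that turns the accumulated `count`, `sum`, and `sumSquares` into a mean and a standard deviation, but it does not show that step. A minimal sketch of the arithmetic in plain Java follows; the class name, the population (not sample) variance, and returning 0 for a flat baseline are assumptions made for illustration.

```java
/** Hypothetical helper: derives statistics from running sums. */
final class StatsSketch {

    static double mean(long count, double sum) {
        return count == 0 ? 0.0 : sum / count;
    }

    /** Population standard deviation: sqrt(E[x^2] - E[x]^2). */
    static double stdDev(long count, double sum, double sumSquares) {
        if (count == 0) {
            return 0.0;
        }
        double m = sum / count;
        double variance = sumSquares / count - m * m;
        // Rounding can push the variance slightly below zero, so clamp it
        return Math.sqrt(Math.max(0.0, variance));
    }

    /** Distance of the current mean from the previous mean, in previous standard deviations. */
    static double zScore(double currentMean, double previousMean, double previousStdDev) {
        if (previousStdDev == 0.0) {
            return 0.0;  // assumption: a flat baseline yields no score
        }
        return Math.abs(currentMean - previousMean) / previousStdDev;
    }

    public static void main(String[] args) {
        // Values 2, 4, 4, 4, 5, 5, 7, 9: count = 8, sum = 40, sum of squares = 232
        System.out.println(mean(8, 40.0));
        System.out.println(stdDev(8, 40.0, 232.0));
    }
}
```

The sum-of-squares formula is convenient because it merges trivially, but it can lose precision when values are large relative to their spread; Welford's algorithm is a numerically more stable alternative.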

## Window Joins

### Tumbling Window Join

```java theme={null}
DataStream<ClickEvent> clicks = /* ... */;
DataStream<ImpressionEvent> impressions = /* ... */;

DataStream<JoinedEvent> joined = clicks
    .join(impressions)
    .where(click -> click.adId)
    .equalTo(impression -> impression.adId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply(new JoinFunction<ClickEvent, ImpressionEvent, JoinedEvent>() {
        @Override
        public JoinedEvent join(ClickEvent click, ImpressionEvent impression) {
            return new JoinedEvent(click, impression);
        }
    });
```

### Interval Join

```java theme={null}
// Join events that occur within a time interval of each other (event time only)
DataStream<JoinedEvent> intervalJoined = clicks
    .keyBy(click -> click.adId)
    .intervalJoin(impressions.keyBy(imp -> imp.adId))
    .between(Time.seconds(-10), Time.seconds(10))  // Click within ±10s of impression
    .process(new ProcessJoinFunction<ClickEvent, ImpressionEvent, JoinedEvent>() {
        @Override
        public void processElement(
                ClickEvent click,
                ImpressionEvent impression,
                Context ctx,
                Collector<JoinedEvent> out) {

            out.collect(new JoinedEvent(click, impression));
        }
    });
```
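The bounds passed to `between` are relative to the timestamp of the left-hand stream (the clicks in the example above). The matching condition can be written as a one-line predicate, sketched here in plain Java with hypothetical names and assuming both bounds are inclusive.

```java
/** Hypothetical model of the interval-join matching condition. */
final class IntervalJoinSketch {

    /** True when rightTs lies in [leftTs + lowerBound, leftTs + upperBound]. */
    static boolean matches(long leftTsMs, long rightTsMs, long lowerBoundMs, long upperBoundMs) {
        return rightTsMs >= leftTsMs + lowerBoundMs
            && rightTsMs <= leftTsMs + upperBoundMs;
    }

    public static void main(String[] args) {
        long clickTs = 100_000L;
        // Impression 10 s before the click, with bounds of -10 s and +10 s
        System.out.println(matches(clickTs, 90_000L, -10_000L, 10_000L));
        // Impression just over 10 s after the click
        System.out.println(matches(clickTs, 110_001L, -10_000L, 10_000L));
    }
}
```

Asymmetric bounds express ordering. For example, with impressions on the left and bounds of `0` and `+10s`, only clicks that happen up to 10 seconds after the impression would match.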

### CoGroup

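More flexible than a join: for each key and window, the function receives all elements from both sides, including keys that have elements on only one side. This makes outer joins and custom matching logic possible.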

```java theme={null}
DataStream<Result> cogrouped = stream1
    .coGroup(stream2)
    .where(e1 -> e1.key)
    .equalTo(e2 -> e2.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply(new CoGroupFunction<Event1, Event2, Result>() {
        @Override
        public void coGroup(
                Iterable<Event1> first,
                Iterable<Event2> second,
                Collector<Result> out) {

            // Process all matching elements from both streams
            for (Event1 e1 : first) {
                for (Event2 e2 : second) {
                    out.collect(new Result(e1, e2));
                }
            }
        }
    });
```

## Performance Optimization

### 1. Choose the Right Window Function

```java theme={null}
// Incremental: state holds one accumulator per key and window
.aggregate(new MyAggregateFunction())

// Incremental: state holds one reduced element per key and window
.reduce(new MyReduceFunction())

// Use sparingly: ProcessWindowFunction buffers every element until the window fires
.process(new MyProcessWindowFunction())

// Incremental aggregation plus window metadata:
// the process function receives only the aggregated result
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
```

### 2. Window Size Tuning

```java theme={null}
// Too small: High overhead, many windows
.window(TumblingEventTimeWindows.of(Time.seconds(1)))

// Too large: High latency, memory pressure
.window(TumblingEventTimeWindows.of(Time.hours(24)))

// Balanced: Based on requirements
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
```

### 3. State Management

```java theme={null}
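// Window state is cleared automatically once a window expires (window end + allowed lateness).
// For custom keyed state, e.g. in a KeyedProcessFunction, use state TTL to prevent unbounded growth.
// Note: Time here is org.apache.flink.api.common.time.Time, not the windowing Time class.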
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<MyState> descriptor =
    new ValueStateDescriptor<>("state", MyState.class);
descriptor.enableTimeToLive(ttlConfig);
```

## Best Practices

### 1. Always Handle Late Data

```java theme={null}
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Result> results = stream
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateTag)
    .aggregate(new MyAggregateFunction());

results.getSideOutput(lateTag).addSink(new LateDataSink());
```

### 2. Use Appropriate Watermark Strategy

```java theme={null}
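// Choose the out-of-orderness bound with the windows in mind: it delays every
// window result by that amount, but a larger bound means fewer late events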
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner((event, ts) -> event.timestamp);
```
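To see how the out-of-orderness bound translates into watermarks, the sketch below models a bounded-out-of-orderness generator in plain Java: the watermark trails the highest timestamp seen so far by the bound, minus one millisecond. The class is a hypothetical stand-in for illustration, not the Flink generator itself.

```java
/** Hypothetical model of a bounded-out-of-orderness watermark generator. */
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event timestamp; out-of-order events never move the maximum back. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Watermark = highest timestamp seen - bound - 1. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(2_000L);
        generator.onEvent(1_000L);
        generator.onEvent(5_000L);
        generator.onEvent(3_000L);  // out of order: does not lower the watermark
        System.out.println(generator.currentWatermark());
    }
}
```

Under this model, a window ending at `300_000` can only fire once an event with a timestamp of at least `302_000` has been seen, which is why the bound adds directly to result latency.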

### 3. Monitor Window Latency

```java theme={null}
public class LatencyMonitoringWindowFunction extends
        ProcessWindowFunction<Event, Result, String, TimeWindow> {

    private Counter windowsFired;
    private Histogram processingLatency;

    @Override
    public void open(Configuration parameters) {
        windowsFired = getRuntimeContext()
            .getMetricGroup()
            .counter("windows_fired");

        processingLatency = getRuntimeContext()
            .getMetricGroup()
            .histogram("processing_latency", new DescriptiveStatisticsHistogram(1000));
    }

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Event> elements,
            Collector<Result> out) {

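        // Delay between the window's end and the moment its result is produced.
        // Only meaningful when event timestamps are wall-clock times.
        long windowEnd = context.window().getEnd();
        long currentTime = System.currentTimeMillis();
        long latency = currentTime - windowEnd;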

        windowsFired.inc();
        processingLatency.update(latency);

        // Process elements
        // ...
    }
}
```

## Exercises

### Exercise 1: Implement Session Timeout Detection

Detect user sessions that stay active for longer than a threshold.

```java theme={null}
// TODO: Use session windows to detect user timeouts
// Alert when session duration exceeds threshold
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  public class SessionTimeoutDetection {

      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

          DataStream<UserActivity> activities = /* ... */;

          DataStream<TimeoutAlert> timeouts = activities
              .keyBy(a -> a.userId)
              .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
              .process(new ProcessWindowFunction<UserActivity, TimeoutAlert, String, TimeWindow>() {

                  @Override
                  public void process(
                          String userId,
                          Context context,
                          Iterable<UserActivity> activities,
                          Collector<TimeoutAlert> out) {

                      // Window length; note that the end includes the 30-minute inactivity gap
                      long sessionDuration = context.window().getEnd() -
                                           context.window().getStart();

                      // Alert if session longer than 4 hours
                      if (sessionDuration > 4 * 60 * 60 * 1000) {
                          out.collect(new TimeoutAlert(
                              userId,
                              context.window().getStart(),
                              sessionDuration
                          ));
                      }
                  }
              });

          timeouts.print();
          env.execute("Session Timeout Detection");
      }
  }
  ```
</details>

### Exercise 2: Top-N per Window

Find top N items in each window.

```java theme={null}
// TODO: Implement top-N products by sales per hour
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  public class TopNPerWindow {

      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

          DataStream<Sale> sales = /* ... */;

          DataStream<TopProducts> topN = sales
              .keyBy(sale -> "global")  // Constant key: all sales go to one task so products can be ranked together
              .window(TumblingEventTimeWindows.of(Time.hours(1)))
              .aggregate(
                  new SalesAggregator(),
                  new TopNWindowFunction(10)
              );

          topN.print();
          env.execute("Top-N per Window");
      }

      static class SalesAggregator implements
              AggregateFunction<Sale, Map<String, Long>, Map<String, Long>> {

          @Override
          public Map<String, Long> createAccumulator() {
              return new HashMap<>();
          }

          @Override
          public Map<String, Long> add(Sale sale, Map<String, Long> acc) {
              acc.merge(sale.productId, sale.quantity, Long::sum);
              return acc;
          }

          @Override
          public Map<String, Long> getResult(Map<String, Long> acc) {
              return acc;
          }

          @Override
          public Map<String, Long> merge(Map<String, Long> acc1, Map<String, Long> acc2) {
              acc2.forEach((k, v) -> acc1.merge(k, v, Long::sum));
              return acc1;
          }
      }

      static class TopNWindowFunction extends
              ProcessWindowFunction<Map<String, Long>, TopProducts, String, TimeWindow> {

          private final int topN;

          public TopNWindowFunction(int topN) {
              this.topN = topN;
          }

          @Override
          public void process(
                  String key,
                  Context context,
                  Iterable<Map<String, Long>> sales,
                  Collector<TopProducts> out) {

              Map<String, Long> salesMap = sales.iterator().next();

              List<ProductSales> topProducts = salesMap.entrySet().stream()
                  .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                  .limit(topN)
                  .map(e -> new ProductSales(e.getKey(), e.getValue()))
                  .collect(Collectors.toList());

              out.collect(new TopProducts(
                  context.window().getEnd(),
                  topProducts
              ));
          }
      }
  }
  ```
</details>

## Summary

In this module, you learned:

* Window types: tumbling, sliding, session, global, count
* Window functions: reduce, aggregate, process
* Triggers for custom window firing logic
* Evictors for element removal
* Handling late data with allowed lateness
* Real-world examples: analytics, sessions, anomaly detection
* Window joins and CoGroup
* Performance optimization techniques
* Production best practices

## Next Steps

<Card title="Module 6: Table API & Flink SQL" icon="table" href="/distributed-systems-tools/flink-sql">
  Learn declarative stream processing with SQL
</Card>

## Additional Resources

* [Flink Windows Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
* [Window Joins](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/joining/)
