
Windows & Time Operations

Module Duration: 4-5 hours
Focus: Windowing in Apache Flink
Prerequisites: DataStream API, Event Time concepts

Introduction

Windows are essential for bounded computations over unbounded streams. They divide the infinite stream into finite chunks based on time or count, enabling aggregations, joins, and pattern detection.

Window Types

Tumbling Windows

Non-overlapping, fixed-size windows.
// Event-time tumbling windows
DataStream<Event> input = /* ... */;

DataStream<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce(new MyReduceFunction());

// Processing-time tumbling windows
DataStream<Result> processingResults = input
    .keyBy(e -> e.key)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .reduce(new MyReduceFunction());

// Offset windows (e.g., start at :02 instead of :00)
DataStream<Result> offsetResults = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5), Time.minutes(2)))
    .reduce(new MyReduceFunction());
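The offset variant shifts window boundaries without changing their size. Flink aligns each timestamp to its window start with simple modular arithmetic; the sketch below mirrors that calculation in plain Java (class and method names are illustrative, not Flink API):

```java
public class TumblingAssignDemo {
    // Mirrors the alignment Flink performs internally (see
    // TimeWindow.getWindowStartWithOffset); all values are milliseconds.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000;
        long twoMin = 2 * 60 * 1000;
        // An event at minute 3 falls in [0:00, 0:05) with no offset...
        System.out.println(windowStart(3 * 60 * 1000, 0, fiveMin));       // 0
        // ...but in [0:02, 0:07) with a 2-minute offset.
        System.out.println(windowStart(3 * 60 * 1000, twoMin, fiveMin));  // 120000
    }
}
```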

Sliding Windows

Overlapping windows with fixed size and slide interval.
// 10-minute windows sliding every 5 minutes
DataStream<Result> slidingResults = input
    .keyBy(e -> e.key)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  // window size
        Time.minutes(5)    // slide interval
    ))
    .aggregate(new MyAggregateFunction());

// Processing time variant
DataStream<Result> processingSlidingResults = input
    .keyBy(e -> e.key)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new MyAggregateFunction());
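Because sliding windows overlap, one element belongs to several windows at once. The following standalone sketch (assuming zero offset; names are illustrative) shows which window starts an element is assigned to, in the spirit of Flink's own assignment logic:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssignDemo {
    // An element belongs to every window whose [start, start + size)
    // interval covers its timestamp; starts are multiples of the slide.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute windows sliding every 5 minutes: an event at minute 12
        // lands in the windows starting at minute 10 and minute 5.
        System.out.println(
            windowStarts(12 * 60 * 1000, 10 * 60 * 1000, 5 * 60 * 1000));
        // [600000, 300000]
    }
}
```

Note that each element appears in size/slide windows, which is why small slide intervals increase state size.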

Session Windows

Dynamic windows based on activity gaps.
// Session window with 10-minute gap
DataStream<Result> sessionResults = input
    .keyBy(e -> e.userId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .process(new SessionWindowFunction());

// Dynamic gap based on element
DataStream<Result> dynamicSessionResults = input
    .keyBy(e -> e.userId)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
        @Override
        public long extract(Event element) {
            // Users with premium accounts get longer session gaps
            return element.isPremium() ? 30 * 60 * 1000 : 10 * 60 * 1000;
        }
    }))
    .process(new SessionWindowFunction());
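Under the hood, each event opens a candidate window [t, t + gap), and overlapping candidates are merged; a session therefore ends `gap` milliseconds after its last event. A minimal plain-Java sketch of that merging semantics (illustrative names, sorted input assumed):

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeDemo {
    // Merges sorted event timestamps into sessions [firstEvent, lastEvent + gap).
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long start = sortedTimestamps[0];
        long end = sortedTimestamps[0] + gap;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long t = sortedTimestamps[i];
            if (t < end) {
                end = t + gap;              // within the gap: extend the session
            } else {
                result.add(new long[]{start, end});
                start = t;                  // gap exceeded: start a new session
                end = t + gap;
            }
        }
        result.add(new long[]{start, end});
        return result;
    }

    public static void main(String[] args) {
        long gap = 10 * 60 * 1000;                    // 10-minute gap
        long[] ts = {0, 5 * 60 * 1000, 30 * 60 * 1000};
        // Two sessions: [0, 15min) and [30min, 40min)
        for (long[] s : sessions(ts, gap)) {
            System.out.println(s[0] + " - " + s[1]);
        }
    }
}
```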

Global Windows

All elements go to a single window (requires custom trigger).
DataStream<Result> globalResults = input
    .keyBy(e -> e.key)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))  // Fire every 100 elements
    .process(new MyWindowFunction());

Count Windows

Windows based on element count instead of time.
// Tumbling count window: every 100 elements
DataStream<Result> countResults = input
    .keyBy(e -> e.key)
    .countWindow(100)
    .reduce(new MyReduceFunction());

// Sliding count window: 100 elements, slide every 10
DataStream<Result> slidingCountResults = input
    .keyBy(e -> e.key)
    .countWindow(100, 10)
    .reduce(new MyReduceFunction());

Window Functions

ReduceFunction

Incrementally combines elements.
public class SumReduceFunction implements ReduceFunction<SensorReading> {
    @Override
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return new SensorReading(
            r1.sensorId,
            r1.temperature + r2.temperature,
            Math.max(r1.timestamp, r2.timestamp)
        );
    }
}

// Usage
DataStream<SensorReading> sums = readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new SumReduceFunction());

AggregateFunction

More flexible than reduce: the accumulator type can differ from both the input and output types, while still aggregating incrementally.
public class AverageAggregate implements
        AggregateFunction<SensorReading, Tuple2<Double, Long>, Double> {

    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return new Tuple2<>(0.0, 0L);
    }

    @Override
    public Tuple2<Double, Long> add(SensorReading reading, Tuple2<Double, Long> acc) {
        return new Tuple2<>(acc.f0 + reading.temperature, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1, Tuple2<Double, Long> acc2) {
        return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
    }
}

// Usage
DataStream<Double> averages = readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AverageAggregate());
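The accumulator lifecycle can be exercised without a Flink cluster. This sketch replays the same add/merge/getResult semantics as AverageAggregate, with a plain `double[]{sum, count}` standing in for `Tuple2<Double, Long>` so the example is self-contained:

```java
public class AverageAggregateDemo {
    // Plain-Java mirror of AverageAggregate's four methods.
    static double[] createAccumulator() { return new double[]{0.0, 0.0}; }

    static double[] add(double temperature, double[] acc) {
        return new double[]{acc[0] + temperature, acc[1] + 1};
    }

    static double getResult(double[] acc) { return acc[0] / acc[1]; }

    static double[] merge(double[] a, double[] b) {
        return new double[]{a[0] + b[0], a[1] + b[1]};
    }

    public static void main(String[] args) {
        // Two partial accumulators (as happens when session windows merge)...
        double[] acc1 = add(20.0, add(10.0, createAccumulator()));
        double[] acc2 = add(30.0, createAccumulator());
        // ...merged, then the result extracted: (10 + 20 + 30) / 3
        System.out.println(getResult(merge(acc1, acc2)));   // 20.0
    }
}
```

Only the accumulator (two numbers) is kept in window state, not the individual readings; that is the efficiency win over ProcessWindowFunction.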

ProcessWindowFunction

Access to all window elements and metadata.
public class TopNWindowFunction extends
        ProcessWindowFunction<Event, String, String, TimeWindow> {

    private final int topN;

    public TopNWindowFunction(int topN) {
        this.topN = topN;
    }

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Event> events,
            Collector<String> out) {

        List<Event> eventList = StreamSupport
            .stream(events.spliterator(), false)
            .sorted(Comparator.comparingDouble(e -> -e.value))
            .limit(topN)
            .collect(Collectors.toList());

        out.collect(String.format(
            "Key: %s, Window: [%d - %d], Top %d: %s",
            key,
            context.window().getStart(),
            context.window().getEnd(),
            topN,
            eventList
        ));
    }
}

// Usage
DataStream<String> topN = events
    .keyBy(e -> e.category)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .process(new TopNWindowFunction(10));

Combining Incremental and Full-Window Processing

// Efficient: Pre-aggregate with AggregateFunction, then process with window context
public class MyAggregateFunction implements
        AggregateFunction<Event, Long, Long> {
    // Incremental aggregation (efficient)
    // Implementation similar to above
}

public class MyProcessWindowFunction extends
        ProcessWindowFunction<Long, Result, String, TimeWindow> {
    // Has access to window metadata

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Long> aggregates,
            Collector<Result> out) {

        Long aggregatedValue = aggregates.iterator().next();

        out.collect(new Result(
            key,
            aggregatedValue,
            context.window().getStart(),
            context.window().getEnd()
        ));
    }
}

// Combine both
DataStream<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

Triggers

Control when windows fire.
// Built-in triggers
// 1. EventTimeTrigger: Fires when watermark passes window end
// 2. ProcessingTimeTrigger: Fires when processing time passes window end
// 3. CountTrigger: Fires after N elements
// 4. PurgingTrigger: Fires and purges window state

// Custom trigger
public class EarlyAndLateTrigger extends Trigger<Event, TimeWindow> {

    @Override
    public TriggerResult onElement(
            Event element,
            long timestamp,
            TimeWindow window,
            TriggerContext ctx) throws Exception {

        // Fire the final result when the watermark reaches the window end
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Early firing: every 1000 elements
        ValueState<Long> countState = ctx.getPartitionedState(
            new ValueStateDescriptor<>("count", Long.class));
        Long count = countState.value();
        count = (count == null) ? 1L : count + 1;

        if (count >= 1000) {
            countState.update(0L);  // reset so the next 1000 elements fire again
            return TriggerResult.FIRE;
        }

        countState.update(count);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(
            long time,
            TimeWindow window,
            TriggerContext ctx) throws Exception {

        return time == window.maxTimestamp()
            ? TriggerResult.FIRE_AND_PURGE
            : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long time,
            TimeWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(
            new ValueStateDescriptor<>("count", Long.class)
        ).clear();
    }
}

// Usage
DataStream<Result> withCustomTrigger = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(new EarlyAndLateTrigger())
    .process(new MyWindowFunction());
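The firing pattern of this trigger can be simulated without Flink. The sketch below (illustrative names, no Flink types) reproduces the decision logic: CONTINUE until 1000 elements arrive, FIRE an early partial result on the 1000th, and FIRE_AND_PURGE when the watermark reaches the window end:

```java
public class EarlyFiringDemo {
    enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE }

    long count = 0;

    TriggerResult onElement() {
        if (++count >= 1000) {
            count = 0;
            return TriggerResult.FIRE;       // early, partial result
        }
        return TriggerResult.CONTINUE;
    }

    TriggerResult onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp
            ? TriggerResult.FIRE_AND_PURGE   // final result, state dropped
            : TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        EarlyFiringDemo trigger = new EarlyFiringDemo();
        int earlyFirings = 0;
        for (int i = 0; i < 2500; i++) {
            if (trigger.onElement() == TriggerResult.FIRE) earlyFirings++;
        }
        System.out.println(earlyFirings);                  // 2
        System.out.println(trigger.onEventTime(99, 99));   // FIRE_AND_PURGE
    }
}
```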

Evictors

Remove elements from window before/after function application.
// Built-in evictors
// 1. CountEvictor: Keeps max N elements
// 2. TimeEvictor: Keeps elements from last N milliseconds
// 3. DeltaEvictor: Keeps elements within delta of last element

// Custom evictor
public class OutlierEvictor implements Evictor<Event, TimeWindow> {

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Event>> elements,
            int size,
            TimeWindow window,
            EvictorContext evictorContext) {

        // Calculate mean
        double mean = StreamSupport.stream(elements.spliterator(), false)
            .mapToDouble(e -> e.getValue().value)
            .average()
            .orElse(0.0);

        // Calculate standard deviation
        double variance = StreamSupport.stream(elements.spliterator(), false)
            .mapToDouble(e -> Math.pow(e.getValue().value - mean, 2))
            .average()
            .orElse(0.0);
        double stdDev = Math.sqrt(variance);

        // Remove outliers (> 3 standard deviations)
        Iterator<TimestampedValue<Event>> iterator = elements.iterator();
        while (iterator.hasNext()) {
            TimestampedValue<Event> element = iterator.next();
            if (Math.abs(element.getValue().value - mean) > 3 * stdDev) {
                iterator.remove();
            }
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Event>> elements,
            int size,
            TimeWindow window,
            EvictorContext evictorContext) {
        // No post-processing eviction
    }
}

// Usage
DataStream<Result> withEvictor = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .evictor(new OutlierEvictor())
    .process(new MyWindowFunction());

Allowed Lateness

Keep window state alive after the watermark passes so that late events can still update results; events arriving after the lateness bound are routed to a side output instead of being dropped silently.

OutputTag<Event> lateDataTag = new OutputTag<Event>("late-events"){};

SingleOutputStreamOperator<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2))  // Keep window state for 2 extra minutes
    .sideOutputLateData(lateDataTag)
    .aggregate(new MyAggregateFunction());

// Process late data
DataStream<Event> lateData = results.getSideOutput(lateDataTag);
lateData.addSink(new LateDataSink());
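How an event is handled depends on where the watermark stands relative to the window end. This standalone sketch classifies the three cases (illustrative names; it ignores the one-millisecond maxTimestamp offset Flink applies internally):

```java
public class LatenessDemo {
    // For a window [start, end) with allowedLateness L:
    //  watermark <  end      -> normal processing
    //  end <= watermark < end + L -> window refires with the late event
    //  watermark >= end + L  -> event goes to the side output
    static String classify(long windowEnd, long watermark, long allowedLateness) {
        if (watermark < windowEnd) return "on-time";
        if (watermark < windowEnd + allowedLateness) return "late firing";
        return "side output";
    }

    public static void main(String[] args) {
        long end = 5 * 60 * 1000;        // window [0, 5min)
        long lateness = 2 * 60 * 1000;   // allowedLateness of 2 minutes
        System.out.println(classify(end, 4 * 60 * 1000, lateness));  // on-time
        System.out.println(classify(end, 6 * 60 * 1000, lateness));  // late firing
        System.out.println(classify(end, 8 * 60 * 1000, lateness));  // side output
    }
}
```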

Real-World Examples

Example 1: Real-Time Analytics Dashboard

public class RealTimeAnalyticsDashboard {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<PageView> pageViews = env
            .addSource(new PageViewSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<PageView>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((view, ts) -> view.timestamp)
            );

        // 1-minute tumbling windows for page view counts
        DataStream<PageViewStats> stats = pageViews
            .keyBy(view -> view.page)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new PageViewAggregator());

        // 5-minute sliding windows for trending pages
        DataStream<TrendingPage> trending = pageViews
            .keyBy(view -> view.page)
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .process(new TrendingPageFunction());

        stats.addSink(new DashboardSink("page_views"));
        trending.addSink(new DashboardSink("trending"));

        env.execute("Real-Time Analytics Dashboard");
    }

    static class PageViewAggregator implements
            AggregateFunction<PageView, Tuple3<String, Long, Set<String>>, PageViewStats> {

        @Override
        public Tuple3<String, Long, Set<String>> createAccumulator() {
            return new Tuple3<>("", 0L, new HashSet<>());
        }

        @Override
        public Tuple3<String, Long, Set<String>> add(
                PageView view,
                Tuple3<String, Long, Set<String>> acc) {

            acc.f0 = view.page;
            acc.f1 += 1;
            acc.f2.add(view.userId);
            return acc;
        }

        @Override
        public PageViewStats getResult(Tuple3<String, Long, Set<String>> acc) {
            return new PageViewStats(acc.f0, acc.f1, acc.f2.size());
        }

        @Override
        public Tuple3<String, Long, Set<String>> merge(
                Tuple3<String, Long, Set<String>> acc1,
                Tuple3<String, Long, Set<String>> acc2) {

            acc1.f1 += acc2.f1;
            acc1.f2.addAll(acc2.f2);
            return acc1;
        }
    }
}

Example 2: User Session Analysis

public class UserSessionAnalysis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<UserAction> actions = env
            .addSource(new UserActionSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<UserAction>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                    .withTimestampAssigner((action, ts) -> action.timestamp)
            );

        // Session windows with 30-minute inactivity gap
        DataStream<UserSession> sessions = actions
            .keyBy(action -> action.userId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .process(new ProcessWindowFunction<UserAction, UserSession, String, TimeWindow>() {

                @Override
                public void process(
                        String userId,
                        Context context,
                        Iterable<UserAction> actions,
                        Collector<UserSession> out) {

                    List<UserAction> actionList = StreamSupport
                        .stream(actions.spliterator(), false)
                        .sorted(Comparator.comparingLong(a -> a.timestamp))
                        .collect(Collectors.toList());

                    // Note: a session window's end is last event + gap, so this
                    // duration includes the inactivity gap; use the first/last
                    // action timestamps if you need the pure activity span.
                    long sessionDuration = context.window().getEnd() - context.window().getStart();
                    int pageViews = (int) actionList.stream()
                        .filter(a -> a.type.equals("page_view"))
                        .count();
                    boolean hasConversion = actionList.stream()
                        .anyMatch(a -> a.type.equals("purchase"));

                    out.collect(new UserSession(
                        userId,
                        context.window().getStart(),
                        context.window().getEnd(),
                        sessionDuration,
                        actionList.size(),
                        pageViews,
                        hasConversion
                    ));
                }
            });

        sessions.addSink(new SessionAnalyticsSink());
        env.execute("User Session Analysis");
    }
}

Example 3: Anomaly Detection with Windows

public class AnomalyDetection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Metric> metrics = env
            .addSource(new MetricSource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Metric>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((metric, ts) -> metric.timestamp)
            );

        // Calculate statistics over 5-minute windows
        DataStream<MetricStats> stats = metrics
            .keyBy(m -> m.metricName)
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .aggregate(
                new StatisticsAggregator(),
                new StatsWindowFunction()
            );

        // Detect anomalies
        DataStream<Anomaly> anomalies = stats
            .keyBy(s -> s.metricName)
            .process(new AnomalyDetectionFunction());

        anomalies.addSink(new AlertingSink());
        env.execute("Anomaly Detection");
    }

    static class StatisticsAggregator implements
            AggregateFunction<Metric, StatAccumulator, StatAccumulator> {

        @Override
        public StatAccumulator createAccumulator() {
            return new StatAccumulator();
        }

        @Override
        public StatAccumulator add(Metric metric, StatAccumulator acc) {
            acc.count++;
            acc.sum += metric.value;
            acc.sumSquares += metric.value * metric.value;
            acc.min = Math.min(acc.min, metric.value);
            acc.max = Math.max(acc.max, metric.value);
            return acc;
        }

        @Override
        public StatAccumulator getResult(StatAccumulator acc) {
            return acc;
        }

        @Override
        public StatAccumulator merge(StatAccumulator acc1, StatAccumulator acc2) {
            acc1.count += acc2.count;
            acc1.sum += acc2.sum;
            acc1.sumSquares += acc2.sumSquares;
            acc1.min = Math.min(acc1.min, acc2.min);
            acc1.max = Math.max(acc1.max, acc2.max);
            return acc1;
        }
    }

    static class AnomalyDetectionFunction extends
            KeyedProcessFunction<String, MetricStats, Anomaly> {

        private ValueState<MetricStats> previousStats;

        @Override
        public void open(Configuration parameters) {
            previousStats = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous", MetricStats.class)
            );
        }

        @Override
        public void processElement(
                MetricStats current,
                Context ctx,
                Collector<Anomaly> out) throws Exception {

            MetricStats previous = previousStats.value();

            if (previous != null && previous.stdDev > 0) {
                // Z-score anomaly detection (a zero std dev would divide by zero)
                double zScore = Math.abs(current.mean - previous.mean) / previous.stdDev;

                if (zScore > 3.0) {  // 3 standard deviations
                    out.collect(new Anomaly(
                        current.metricName,
                        current.mean,
                        previous.mean,
                        zScore,
                        current.windowEnd
                    ));
                }
            }

            previousStats.update(current);
        }
    }
}
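The accumulator's running sums are enough to derive the statistics the detector needs. A sketch of how a StatsWindowFunction (not shown above, so this is an assumption about its role) could compute mean and standard deviation, using the identity variance = E[x²] − E[x]²:

```java
public class StatsDemo {
    // Derives mean and standard deviation from count, sum, and sum of squares.
    static double[] meanAndStdDev(long count, double sum, double sumSquares) {
        double mean = sum / count;
        double variance = sumSquares / count - mean * mean;
        // Clamp at zero: floating-point rounding can yield a tiny negative variance
        return new double[]{mean, Math.sqrt(Math.max(variance, 0.0))};
    }

    public static void main(String[] args) {
        // Values 2, 4, 6: count = 3, sum = 12, sumSquares = 4 + 16 + 36 = 56
        double[] stats = meanAndStdDev(3, 12.0, 56.0);
        System.out.println(stats[0]);   // 4.0
        System.out.println(stats[1]);   // ~1.633
    }
}
```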

Window Joins

Tumbling Window Join

DataStream<ClickEvent> clicks = /* ... */;
DataStream<ImpressionEvent> impressions = /* ... */;

DataStream<JoinedEvent> joined = clicks
    .join(impressions)
    .where(click -> click.adId)
    .equalTo(impression -> impression.adId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply(new JoinFunction<ClickEvent, ImpressionEvent, JoinedEvent>() {
        @Override
        public JoinedEvent join(ClickEvent click, ImpressionEvent impression) {
            return new JoinedEvent(click, impression);
        }
    });

Interval Join

// Join events that occur within a time interval
DataStream<JoinedEvent> intervalJoined = clicks
    .keyBy(click -> click.adId)
    .intervalJoin(impressions.keyBy(imp -> imp.adId))
    .between(Time.seconds(-10), Time.seconds(10))  // Impression within ±10s of the click
    .process(new ProcessJoinFunction<ClickEvent, ImpressionEvent, JoinedEvent>() {
        @Override
        public void processElement(
                ClickEvent click,
                ImpressionEvent impression,
                Context ctx,
                Collector<JoinedEvent> out) {

            out.collect(new JoinedEvent(click, impression));
        }
    });
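The join condition itself is a simple interval check on the two timestamps. With clicks as the left stream, `between(lower, upper)` pairs a click with every impression whose timestamp falls in `[click.ts + lower, click.ts + upper]`, bounds inclusive. A standalone sketch (illustrative names):

```java
public class IntervalJoinDemo {
    // True when the right-stream element falls inside the interval
    // anchored at the left-stream element's timestamp.
    static boolean joins(long clickTs, long impressionTs, long lower, long upper) {
        return impressionTs >= clickTs + lower && impressionTs <= clickTs + upper;
    }

    public static void main(String[] args) {
        // between(-10s, +10s): impression 5s before the click joins...
        System.out.println(joins(60_000, 55_000, -10_000, 10_000));  // true
        // ...one 15s after it does not.
        System.out.println(joins(60_000, 75_000, -10_000, 10_000));  // false
    }
}
```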

CoGroup

More flexible than a join: the function receives all elements from both sides, including keys that matched on only one side, so it can express outer-join semantics.
DataStream<Result> cogrouped = stream1
    .coGroup(stream2)
    .where(e1 -> e1.key)
    .equalTo(e2 -> e2.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply(new CoGroupFunction<Event1, Event2, Result>() {
        @Override
        public void coGroup(
                Iterable<Event1> first,
                Iterable<Event2> second,
                Collector<Result> out) {

            // Process all matching elements from both streams
            for (Event1 e1 : first) {
                for (Event2 e2 : second) {
                    out.collect(new Result(e1, e2));
                }
            }
        }
    });

Performance Optimization

1. Choose Right Window Function

// Best: AggregateFunction (incremental, memory efficient)
.aggregate(new MyAggregateFunction())

// Good: ReduceFunction (incremental)
.reduce(new MyReduceFunction())

// Use sparingly: ProcessWindowFunction (keeps all elements)
.process(new MyProcessWindowFunction())

// Optimal: Combine both
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

2. Window Size Tuning

// Too small: High overhead, many windows
.window(TumblingEventTimeWindows.of(Time.seconds(1)))

// Too large: High latency, memory pressure
.window(TumblingEventTimeWindows.of(Time.hours(24)))

// Balanced: Based on requirements
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
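For sliding windows, the slide interval matters as much as the size: each element is stored in roughly size/slide windows, so a small slide multiplies state. A quick back-of-the-envelope sketch (illustrative names):

```java
public class WindowStateDemo {
    // Number of sliding windows each element is stored in: ceil(size / slide).
    static long copiesPerElement(long sizeMs, long slideMs) {
        return (sizeMs + slideMs - 1) / slideMs;   // ceiling division
    }

    public static void main(String[] args) {
        // 10-minute window sliding every 5 minutes: 2 copies per element
        System.out.println(copiesPerElement(10 * 60 * 1000, 5 * 60 * 1000));  // 2
        // 1-hour window sliding every second: 3600 copies per element
        System.out.println(copiesPerElement(60 * 60 * 1000, 1000));           // 3600
    }
}
```

Incremental functions (reduce/aggregate) soften this cost because each window holds only an accumulator, but the per-window overhead still scales with size/slide.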

3. State Management

// Use state TTL to prevent unbounded growth
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<MyState> descriptor =
    new ValueStateDescriptor<>("state", MyState.class);
descriptor.enableTimeToLive(ttlConfig);

Best Practices

1. Always Handle Late Data

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Result> results = stream
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateTag)
    .aggregate(new MyAggregateFunction());

results.getSideOutput(lateTag).addSink(new LateDataSink());

2. Use Appropriate Watermark Strategy

// Align watermarks with window requirements
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner((event, ts) -> event.timestamp);
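The bound directly determines how far the watermark trails the data. This standalone sketch mimics the behavior of a bounded-out-of-orderness generator (illustrative class, not Flink API): the watermark follows the highest timestamp seen, minus the bound, minus one millisecond as Flink does internally:

```java
public class WatermarkDemo {
    long maxTimestamp = Long.MIN_VALUE;
    final long outOfOrdernessMs;

    WatermarkDemo(long outOfOrdernessMs) { this.outOfOrdernessMs = outOfOrdernessMs; }

    // Track the highest timestamp; out-of-order events never move it backwards.
    void onEvent(long timestamp) { maxTimestamp = Math.max(maxTimestamp, timestamp); }

    long currentWatermark() { return maxTimestamp - outOfOrdernessMs - 1; }

    public static void main(String[] args) {
        WatermarkDemo wm = new WatermarkDemo(30_000);   // 30-second bound, as above
        wm.onEvent(100_000);
        wm.onEvent(90_000);    // late arrival: watermark does not regress
        System.out.println(wm.currentWatermark());      // 69999
    }
}
```

A larger bound tolerates more disorder but delays window firing by the same amount, so tune it against your latency budget.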

3. Monitor Window Latency

public class LatencyMonitoringWindowFunction extends
        ProcessWindowFunction<Event, Result, String, TimeWindow> {

    private Counter windowsFired;
    private Histogram processingLatency;

    @Override
    public void open(Configuration parameters) {
        windowsFired = getRuntimeContext()
            .getMetricGroup()
            .counter("windows_fired");

        processingLatency = getRuntimeContext()
            .getMetricGroup()
            .histogram("processing_latency", new DescriptiveStatisticsHistogram(1000));
    }

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Event> elements,
            Collector<Result> out) {

        long windowEnd = context.window().getEnd();
        long currentTime = System.currentTimeMillis();
        long latency = currentTime - windowEnd;

        windowsFired.inc();
        processingLatency.update(latency);

        // Process elements
        // ...
    }
}

Exercises

Exercise 1: Implement Session Timeout Detection

Detect when users have been inactive for too long.
// TODO: Use session windows to detect user timeouts
// Alert when session duration exceeds threshold

Exercise 2: Top-N per Window

Find top N items in each window.
// TODO: Implement top-N products by sales per hour

Summary

In this module, you learned:
  • Window types: tumbling, sliding, session, global, count
  • Window functions: reduce, aggregate, process
  • Triggers for custom window firing logic
  • Evictors for element removal
  • Handling late data with allowed lateness
  • Real-world examples: analytics, sessions, anomaly detection
  • Window joins and CoGroup
  • Performance optimization techniques
  • Production best practices

Next Steps

Module 6: Table API & Flink SQL

Learn declarative stream processing with SQL
