Windows & Time Operations
Module Duration: 4-5 hours
Focus: Windowing in Apache Flink
Prerequisites: DataStream API, Event Time concepts
Introduction
Windows are essential for bounded computations over unbounded streams. They divide an infinite stream into finite chunks based on time or count, enabling aggregations, joins, and pattern detection.
Window Types
Tumbling Windows
Non-overlapping, fixed-size windows.
// Event-time tumbling windows
DataStream<Event> input = /* ... */;
DataStream<Result> results = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce(new MyReduceFunction());
// Processing-time tumbling windows
DataStream<Result> processingResults = input
.keyBy(e -> e.key)
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(new MyReduceFunction());
// Offset windows (e.g., start at :02 instead of :00)
DataStream<Result> offsetResults = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5), Time.minutes(2)))
.reduce(new MyReduceFunction());
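How an element lands in a particular tumbling window comes down to a single line of arithmetic. The sketch below reproduces the start-of-window formula (mirroring Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps); the class name `WindowMath` is purely illustrative.

```java
public class WindowMath {
    // Start of the tumbling window containing `timestamp`,
    // for a given window size and offset (all in milliseconds).
    public static long tumblingWindowStart(long timestamp, long offset, long sizeMs) {
        return timestamp - (timestamp - offset + sizeMs) % sizeMs;
    }

    public static void main(String[] args) {
        // 5-minute windows, no offset: 1,000,123 ms falls in [900000, 1200000)
        System.out.println(tumblingWindowStart(1_000_123L, 0L, 300_000L));       // 900000
        // Same timestamp with a 2-minute offset: window [720000, 1020000)
        System.out.println(tumblingWindowStart(1_000_123L, 120_000L, 300_000L)); // 720000
    }
}
```

The offset shifts every window boundary by the same amount, which is why the second window above starts at minute 12 (:02 modulo 5 minutes) rather than minute 10.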
Sliding Windows
Overlapping windows with fixed size and slide interval.
// 10-minute windows sliding every 5 minutes
DataStream<Result> slidingResults = input
.keyBy(e -> e.key)
.window(SlidingEventTimeWindows.of(
Time.minutes(10), // window size
Time.minutes(5) // slide interval
))
.aggregate(new MyAggregateFunction());
// Processing time variant
DataStream<Result> processingSlidingResults = input
.keyBy(e -> e.key)
.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.aggregate(new MyAggregateFunction());
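With sliding windows, every element is assigned to size/slide windows at once. The standalone sketch below reproduces that assignment (mirroring what `SlidingEventTimeWindows.assignWindows` does with zero offset); the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssignment {
    // Start timestamps of all sliding windows that contain `timestamp`
    // (zero-offset variant of SlidingEventTimeWindows' assignment logic).
    public static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp + slideMs) % slideMs;
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute windows sliding every 5 minutes: each element is in 2 windows
        System.out.println(windowStarts(1_000_000L, 600_000L, 300_000L)); // [900000, 600000]
    }
}
```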
Session Windows
Dynamic windows based on activity gaps.
// Session window with 10-minute gap
DataStream<Result> sessionResults = input
.keyBy(e -> e.userId)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.process(new SessionWindowFunction());
// Dynamic gap based on element
DataStream<Result> dynamicSessionResults = input
.keyBy(e -> e.userId)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
@Override
public long extract(Event element) {
// Users with premium accounts get longer session gaps
return element.isPremium() ? 30 * 60 * 1000 : 10 * 60 * 1000;
}
}))
.process(new SessionWindowFunction());
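Under the hood, each element initially opens its own proto-window [timestamp, timestamp + gap), and overlapping proto-windows are merged into one session. The sketch below reproduces that merge logic in isolation (it is not Flink's actual `MergingWindowAssigner` implementation); names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionMerge {
    // Merge per-element proto-windows [ts, ts + gap) into sessions.
    // Returns [start, end) pairs; assumes timestamps are sorted ascending.
    public static List<long[]> sessions(long[] sortedTimestamps, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = Math.max(last[1], ts + gapMs); // overlaps: extend the session
            } else {
                result.add(new long[]{ts, ts + gapMs});  // gap exceeded: new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Gap of 10 ms: events at 0 and 5 merge; 100 starts a new session
        for (long[] s : sessions(new long[]{0, 5, 100}, 10)) {
            System.out.println(Arrays.toString(s));
        }
    }
}
```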
Global Windows
All elements for a key go into a single window that never ends on its own, so a custom trigger is required.
DataStream<Result> globalResults = input
.keyBy(e -> e.key)
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(100))) // Fire and purge every 100 elements; a bare CountTrigger would retain window state forever
.process(new MyWindowFunction());
Count Windows
Windows based on element count instead of time.
// Tumbling count window: every 100 elements
DataStream<Result> countResults = input
.keyBy(e -> e.key)
.countWindow(100)
.reduce(new MyReduceFunction());
// Sliding count window: 100 elements, slide every 10
DataStream<Result> slidingCountResults = input
.keyBy(e -> e.key)
.countWindow(100, 10)
.reduce(new MyReduceFunction());
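Conceptually, a sliding count window keeps the most recent `size` elements per key and fires every `slide` elements (Flink builds this from a global window with a count evictor and count trigger). The standalone sketch below simulates that behavior for sums; all names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SlidingCountWindow {
    // Fire a sum over the last `size` elements every `slide` elements:
    // the per-key behavior that countWindow(size, slide) produces.
    public static List<Long> sums(long[] values, int size, int slide) {
        List<Long> fired = new ArrayList<>();
        Deque<Long> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (long v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) buffer.removeFirst(); // evict the oldest element
            if (++sinceLastFire == slide) {                 // trigger fires
                fired.add(buffer.stream().mapToLong(Long::longValue).sum());
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // size 4, slide 2 over 1..6: fires sums 3, 10, 18
        System.out.println(sums(new long[]{1, 2, 3, 4, 5, 6}, 4, 2)); // [3, 10, 18]
    }
}
```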
Window Functions
ReduceFunction
Incrementally combines elements; input, intermediate, and output types must all be the same.
public class SumReduceFunction implements ReduceFunction<SensorReading> {
@Override
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return new SensorReading(
r1.sensorId,
r1.temperature + r2.temperature,
Math.max(r1.timestamp, r2.timestamp)
);
}
}
// Usage
DataStream<SensorReading> sums = readings
.keyBy(r -> r.sensorId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new SumReduceFunction());
AggregateFunction
Also incremental like reduce, but with a separate accumulator type, so input, intermediate, and output types can all differ.
public class AverageAggregate implements
AggregateFunction<SensorReading, Tuple2<Double, Long>, Double> {
@Override
public Tuple2<Double, Long> createAccumulator() {
return new Tuple2<>(0.0, 0L);
}
@Override
public Tuple2<Double, Long> add(SensorReading reading, Tuple2<Double, Long> acc) {
return new Tuple2<>(acc.f0 + reading.temperature, acc.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Long> acc) {
return acc.f0 / acc.f1;
}
@Override
public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1, Tuple2<Double, Long> acc2) {
return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
}
}
// Usage
DataStream<Double> averages = readings
.keyBy(r -> r.sensorId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AverageAggregate());
ProcessWindowFunction
Provides access to all elements in the window plus window metadata, at the cost of buffering every element until the window fires.
public class TopNWindowFunction extends
ProcessWindowFunction<Event, String, String, TimeWindow> {
private final int topN;
public TopNWindowFunction(int topN) {
this.topN = topN;
}
@Override
public void process(
String key,
Context context,
Iterable<Event> events,
Collector<String> out) {
List<Event> eventList = StreamSupport
.stream(events.spliterator(), false)
.sorted(Comparator.comparingDouble(e -> -e.value))
.limit(topN)
.collect(Collectors.toList());
out.collect(String.format(
"Key: %s, Window: [%d - %d], Top %d: %s",
key,
context.window().getStart(),
context.window().getEnd(),
topN,
eventList
));
}
}
// Usage
DataStream<String> topN = events
.keyBy(e -> e.category)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new TopNWindowFunction(10));
Combining Incremental and Full-Window Processing
// Efficient: Pre-aggregate with AggregateFunction, then process with window context
public class MyAggregateFunction implements
AggregateFunction<Event, Long, Long> {
// Incremental aggregation (efficient): a simple count
@Override public Long createAccumulator() { return 0L; }
@Override public Long add(Event e, Long acc) { return acc + 1; }
@Override public Long getResult(Long acc) { return acc; }
@Override public Long merge(Long a, Long b) { return a + b; }
}
public class MyProcessWindowFunction extends
ProcessWindowFunction<Long, Result, String, TimeWindow> {
// Has access to window metadata
@Override
public void process(
String key,
Context context,
Iterable<Long> aggregates,
Collector<Result> out) {
Long aggregatedValue = aggregates.iterator().next();
out.collect(new Result(
key,
aggregatedValue,
context.window().getStart(),
context.window().getEnd()
));
}
}
// Combine both
DataStream<Result> results = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());
Triggers
Control when windows fire.
// Built-in triggers
// 1. EventTimeTrigger: Fires when watermark passes window end
// 2. ProcessingTimeTrigger: Fires when processing time passes window end
// 3. CountTrigger: Fires after N elements
// 4. PurgingTrigger: Fires and purges window state
// Custom trigger
public class EarlyAndLateTrigger extends Trigger<Event, TimeWindow> {
@Override
public TriggerResult onElement(
Event element,
long timestamp,
TimeWindow window,
TriggerContext ctx) throws Exception {
// Register the end-of-window timer
ctx.registerEventTimeTimer(window.maxTimestamp());
// Early firing: every 1000 elements
ValueState<Long> countState = ctx.getPartitionedState(
new ValueStateDescriptor<>("count", Long.class));
Long count = countState.value();
long newCount = (count == null) ? 1L : count + 1;
if (newCount >= 1000) {
countState.clear();
return TriggerResult.FIRE;
}
countState.update(newCount);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(
long time,
TimeWindow window,
TriggerContext ctx) throws Exception {
return time == window.maxTimestamp()
? TriggerResult.FIRE_AND_PURGE
: TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long time,
TimeWindow window,
TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
ctx.getPartitionedState(
new ValueStateDescriptor<>("count", Long.class)
).clear();
}
}
// Usage
DataStream<Result> withCustomTrigger = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.trigger(new EarlyAndLateTrigger())
.process(new MyWindowFunction());
Evictors
Remove elements from the window before or after the window function is applied.
// Built-in evictors
// 1. CountEvictor: Keeps max N elements
// 2. TimeEvictor: Keeps elements from last N milliseconds
// 3. DeltaEvictor: Keeps elements within delta of last element
// Custom evictor
public class OutlierEvictor implements Evictor<Event, TimeWindow> {
@Override
public void evictBefore(
Iterable<TimestampedValue<Event>> elements,
int size,
TimeWindow window,
EvictorContext evictorContext) {
// Calculate mean
double mean = StreamSupport.stream(elements.spliterator(), false)
.mapToDouble(e -> e.getValue().value)
.average()
.orElse(0.0);
// Calculate standard deviation
double variance = StreamSupport.stream(elements.spliterator(), false)
.mapToDouble(e -> Math.pow(e.getValue().value - mean, 2))
.average()
.orElse(0.0);
double stdDev = Math.sqrt(variance);
// Remove outliers (> 3 standard deviations)
Iterator<TimestampedValue<Event>> iterator = elements.iterator();
while (iterator.hasNext()) {
TimestampedValue<Event> element = iterator.next();
if (Math.abs(element.getValue().value - mean) > 3 * stdDev) {
iterator.remove();
}
}
}
@Override
public void evictAfter(
Iterable<TimestampedValue<Event>> elements,
int size,
TimeWindow window,
EvictorContext evictorContext) {
// No post-processing eviction
}
}
// Usage
DataStream<Result> withEvictor = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.evictor(new OutlierEvictor())
.process(new MyWindowFunction());
Allowed Lateness
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-events"){};
SingleOutputStreamOperator<Result> results = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(2)) // Keep window state for 2 extra minutes
.sideOutputLateData(lateDataTag)
.aggregate(new MyAggregateFunction());
// Process late data
DataStream<Event> lateData = results.getSideOutput(lateDataTag);
lateData.addSink(new LateDataSink());
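The fate of an arriving element can be summarized by comparing the current watermark against the window end and the allowed lateness. This is a simplified standalone model of that rule, not Flink's internal implementation; the class name is illustrative.

```java
public class LatenessRules {
    // Simplified classification of an element belonging to a window that
    // ends at `windowEnd`, given the current watermark and allowed lateness.
    public static String classify(long windowEnd, long watermark, long allowedLatenessMs) {
        if (watermark < windowEnd) {
            return "on-time";        // window has not fired yet
        } else if (watermark < windowEnd + allowedLatenessMs) {
            return "late-update";    // window state still held: re-fires with the element
        } else {
            return "side-output";    // state cleared: element goes to sideOutputLateData
        }
    }

    public static void main(String[] args) {
        long end = 600_000L, lateness = 120_000L;
        System.out.println(classify(end, 500_000L, lateness)); // on-time
        System.out.println(classify(end, 650_000L, lateness)); // late-update
        System.out.println(classify(end, 800_000L, lateness)); // side-output
    }
}
```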
Real-World Examples
Example 1: Real-Time Analytics Dashboard
public class RealTimeAnalyticsDashboard {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<PageView> pageViews = env
.addSource(new PageViewSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<PageView>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((view, ts) -> view.timestamp)
);
// 1-minute tumbling windows for page view counts
DataStream<PageViewStats> stats = pageViews
.keyBy(view -> view.page)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new PageViewAggregator());
// 5-minute sliding windows for trending pages
DataStream<TrendingPage> trending = pageViews
.keyBy(view -> view.page)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.process(new TrendingPageFunction());
stats.addSink(new DashboardSink("page_views"));
trending.addSink(new DashboardSink("trending"));
env.execute("Real-Time Analytics Dashboard");
}
static class PageViewAggregator implements
AggregateFunction<PageView, Tuple3<String, Long, Set<String>>, PageViewStats> {
@Override
public Tuple3<String, Long, Set<String>> createAccumulator() {
return new Tuple3<>("", 0L, new HashSet<>());
}
@Override
public Tuple3<String, Long, Set<String>> add(
PageView view,
Tuple3<String, Long, Set<String>> acc) {
acc.f0 = view.page;
acc.f1 += 1;
acc.f2.add(view.userId);
return acc;
}
@Override
public PageViewStats getResult(Tuple3<String, Long, Set<String>> acc) {
return new PageViewStats(acc.f0, acc.f1, acc.f2.size());
}
@Override
public Tuple3<String, Long, Set<String>> merge(
Tuple3<String, Long, Set<String>> acc1,
Tuple3<String, Long, Set<String>> acc2) {
acc1.f1 += acc2.f1;
acc1.f2.addAll(acc2.f2);
return acc1;
}
}
}
Example 2: User Session Analysis
public class UserSessionAnalysis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserAction> actions = env
.addSource(new UserActionSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<UserAction>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((action, ts) -> action.timestamp)
);
// Session windows with 30-minute inactivity gap
DataStream<UserSession> sessions = actions
.keyBy(action -> action.userId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.process(new ProcessWindowFunction<UserAction, UserSession, String, TimeWindow>() {
@Override
public void process(
String userId,
Context context,
Iterable<UserAction> actions,
Collector<UserSession> out) {
List<UserAction> actionList = StreamSupport
.stream(actions.spliterator(), false)
.sorted(Comparator.comparingLong(a -> a.timestamp))
.collect(Collectors.toList());
long sessionDuration = context.window().getEnd() - context.window().getStart();
int pageViews = (int) actionList.stream()
.filter(a -> a.type.equals("page_view"))
.count();
boolean hasConversion = actionList.stream()
.anyMatch(a -> a.type.equals("purchase"));
out.collect(new UserSession(
userId,
context.window().getStart(),
context.window().getEnd(),
sessionDuration,
actionList.size(),
pageViews,
hasConversion
));
}
});
sessions.addSink(new SessionAnalyticsSink());
env.execute("User Session Analysis");
}
}
Example 3: Anomaly Detection with Windows
public class AnomalyDetection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Metric> metrics = env
.addSource(new MetricSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Metric>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((metric, ts) -> metric.timestamp)
);
// Calculate statistics over 5-minute windows
DataStream<MetricStats> stats = metrics
.keyBy(m -> m.metricName)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(
new StatisticsAggregator(),
new StatsWindowFunction()
);
// Detect anomalies
DataStream<Anomaly> anomalies = stats
.keyBy(s -> s.metricName)
.process(new AnomalyDetectionFunction());
anomalies.addSink(new AlertingSink());
env.execute("Anomaly Detection");
}
static class StatisticsAggregator implements
AggregateFunction<Metric, StatAccumulator, StatAccumulator> {
@Override
public StatAccumulator createAccumulator() {
return new StatAccumulator();
}
@Override
public StatAccumulator add(Metric metric, StatAccumulator acc) {
acc.count++;
acc.sum += metric.value;
acc.sumSquares += metric.value * metric.value;
acc.min = Math.min(acc.min, metric.value);
acc.max = Math.max(acc.max, metric.value);
return acc;
}
@Override
public StatAccumulator getResult(StatAccumulator acc) {
return acc;
}
@Override
public StatAccumulator merge(StatAccumulator acc1, StatAccumulator acc2) {
acc1.count += acc2.count;
acc1.sum += acc2.sum;
acc1.sumSquares += acc2.sumSquares;
acc1.min = Math.min(acc1.min, acc2.min);
acc1.max = Math.max(acc1.max, acc2.max);
return acc1;
}
}
static class AnomalyDetectionFunction extends
KeyedProcessFunction<String, MetricStats, Anomaly> {
private ValueState<MetricStats> previousStats;
@Override
public void open(Configuration parameters) {
previousStats = getRuntimeContext().getState(
new ValueStateDescriptor<>("previous", MetricStats.class)
);
}
@Override
public void processElement(
MetricStats current,
Context ctx,
Collector<Anomaly> out) throws Exception {
MetricStats previous = previousStats.value();
if (previous != null && previous.stdDev > 0) {
// Z-score anomaly detection (guard against division by zero stdDev)
double zScore = Math.abs(current.mean - previous.mean) / previous.stdDev;
if (zScore > 3.0) { // 3 standard deviations
out.collect(new Anomaly(
current.metricName,
current.mean,
previous.mean,
zScore,
current.windowEnd
));
}
}
previousStats.update(current);
}
}
}
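The mean and standard deviation used above derive directly from the running moments the accumulator maintains (count, sum, sum of squares). The arithmetic, shown standalone with illustrative names:

```java
public class MomentStats {
    // Derive mean and (population) standard deviation from running moments,
    // as a StatAccumulator-style aggregate would.
    public static double mean(long count, double sum) {
        return sum / count;
    }

    public static double stdDev(long count, double sum, double sumSquares) {
        double m = mean(count, sum);
        return Math.sqrt(sumSquares / count - m * m);
    }

    public static void main(String[] args) {
        // Values 2,4,4,4,5,5,7,9: count=8, sum=40, sumSquares=232
        double m = mean(8, 40.0);               // 5.0
        double sd = stdDev(8, 40.0, 232.0);     // 2.0
        double zScore = Math.abs(11.0 - m) / sd; // 3.0 -> at the anomaly threshold
        System.out.println(m + " " + sd + " " + zScore);
    }
}
```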
Window Joins
Tumbling Window Join
DataStream<ClickEvent> clicks = /* ... */;
DataStream<ImpressionEvent> impressions = /* ... */;
DataStream<JoinedEvent> joined = clicks
.join(impressions)
.where(click -> click.adId)
.equalTo(impression -> impression.adId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(new JoinFunction<ClickEvent, ImpressionEvent, JoinedEvent>() {
@Override
public JoinedEvent join(ClickEvent click, ImpressionEvent impression) {
return new JoinedEvent(click, impression);
}
});
Interval Join
// Join events that occur within a time interval
DataStream<JoinedEvent> intervalJoined = clicks
.keyBy(click -> click.adId)
.intervalJoin(impressions.keyBy(imp -> imp.adId))
.between(Time.seconds(-10), Time.seconds(10)) // Click within ±10s of impression
.process(new ProcessJoinFunction<ClickEvent, ImpressionEvent, JoinedEvent>() {
@Override
public void processElement(
ClickEvent click,
ImpressionEvent impression,
Context ctx,
Collector<JoinedEvent> out) {
out.collect(new JoinedEvent(click, impression));
}
});
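The `between(lower, upper)` bounds are relative to the left (clicks) stream: an impression matches a click when clickTs + lower <= impressionTs <= clickTs + upper. That predicate, written out standalone with illustrative names:

```java
public class IntervalJoinPredicate {
    // between(lower, upper) on the clicks stream pairs a click with an
    // impression when: clickTs + lower <= impressionTs <= clickTs + upper.
    public static boolean matches(long clickTs, long impressionTs,
                                  long lowerMs, long upperMs) {
        return impressionTs >= clickTs + lowerMs && impressionTs <= clickTs + upperMs;
    }

    public static void main(String[] args) {
        // +/- 10 seconds around the click
        System.out.println(matches(100_000L, 105_000L, -10_000L, 10_000L)); // true
        System.out.println(matches(100_000L, 115_000L, -10_000L, 10_000L)); // false
    }
}
```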
CoGroup
More flexible than a join: the function receives all elements from both sides for a key, even when one side is empty, enabling outer-join-style logic.
DataStream<Result> cogrouped = stream1
.coGroup(stream2)
.where(e1 -> e1.key)
.equalTo(e2 -> e2.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(new CoGroupFunction<Event1, Event2, Result>() {
@Override
public void coGroup(
Iterable<Event1> first,
Iterable<Event2> second,
Collector<Result> out) {
// Process all matching elements from both streams
for (Event1 e1 : first) {
for (Event2 e2 : second) {
out.collect(new Result(e1, e2));
}
}
}
});
Performance Optimization
1. Choose Right Window Function
// Best: AggregateFunction (incremental, memory efficient)
.aggregate(new MyAggregateFunction())
// Good: ReduceFunction (incremental)
.reduce(new MyReduceFunction())
// Use sparingly: ProcessWindowFunction (keeps all elements)
.process(new MyProcessWindowFunction())
// Optimal: Combine both
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
2. Window Size Tuning
// Too small: High overhead, many windows
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
// Too large: High latency, memory pressure
.window(TumblingEventTimeWindows.of(Time.hours(24)))
// Balanced: Based on requirements
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
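For sliding windows, the overhead also scales with the overlap factor: each element is written into size/slide concurrent windows. A quick back-of-the-envelope check (names are illustrative):

```java
public class WindowOverlap {
    // Number of concurrent sliding windows each element is copied into.
    public static long windowsPerElement(long sizeMs, long slideMs) {
        return sizeMs / slideMs;
    }

    public static void main(String[] args) {
        // 10-minute window, 5-minute slide: 2 copies per element
        System.out.println(windowsPerElement(600_000L, 300_000L));   // 2
        // 24-hour window sliding every minute: 1440 copies per element
        System.out.println(windowsPerElement(86_400_000L, 60_000L)); // 1440
    }
}
```

The second case is why long windows with fine slides are usually better served by tumbling windows plus a downstream aggregation.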
3. State Management
// Use state TTL to prevent unbounded growth
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<MyState> descriptor =
new ValueStateDescriptor<>("state", MyState.class);
descriptor.enableTimeToLive(ttlConfig);
Best Practices
1. Always Handle Late Data
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<Result> results = stream
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateTag)
.aggregate(new MyAggregateFunction());
results.getSideOutput(lateTag).addSink(new LateDataSink());
2. Use Appropriate Watermark Strategy
// Align watermarks with window requirements
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((event, ts) -> event.timestamp);
3. Monitor Window Latency
public class LatencyMonitoringWindowFunction extends
ProcessWindowFunction<Event, Result, String, TimeWindow> {
private Counter windowsFired;
private Histogram processingLatency;
@Override
public void open(Configuration parameters) {
windowsFired = getRuntimeContext()
.getMetricGroup()
.counter("windows_fired");
processingLatency = getRuntimeContext()
.getMetricGroup()
.histogram("processing_latency", new DescriptiveStatisticsHistogram(1000));
}
@Override
public void process(
String key,
Context context,
Iterable<Event> elements,
Collector<Result> out) {
long windowEnd = context.window().getEnd();
long currentTime = System.currentTimeMillis();
long latency = currentTime - windowEnd;
windowsFired.inc();
processingLatency.update(latency);
// Process elements
// ...
}
}
Exercises
Exercise 1: Implement Session Timeout Detection
Detect when users have been inactive for too long.
// TODO: Use session windows to detect user timeouts
// Alert when session duration exceeds threshold
Exercise 2: Top-N per Window
Find the top N items in each window.
// TODO: Implement top-N products by sales per hour
Summary
In this module, you learned:
- Window types: tumbling, sliding, session, global, count
- Window functions: reduce, aggregate, process
- Triggers for custom window firing logic
- Evictors for element removal
- Handling late data with allowed lateness
- Real-world examples: analytics, sessions, anomaly detection
- Window joins and CoGroup
- Performance optimization techniques
- Production best practices
Next Steps
Module 6: Table API & Flink SQL
Learn declarative stream processing with SQL