Event Time & Watermarks
Module Duration: 4-5 hours
Focus: Event time processing in Apache Flink
Prerequisites: DataStream API, streaming concepts
Introduction
Event time processing is fundamental to building correct streaming applications. Unlike processing time (when events are processed), event time represents when events actually occurred. This distinction is critical for handling out-of-order events, late data, and producing deterministic results.Time Semantics
Three Notions of Time
Copy
/**
* Processing Time: Time when the event is processed by the operator
* - Non-deterministic (depends on system speed, load)
* - Lowest latency
* - No coordination required
*
* Event Time: Time when the event actually occurred (embedded in record)
* - Deterministic results
* - Handles out-of-order events
* - Requires watermarks
*
* Ingestion Time: Time when event enters Flink
* - Middle ground between processing and event time
* - Rarely used in practice
*/
// Processing time (default)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// Event time (most common for correct results)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Note: Since Flink 1.12+, event time is the default
Why Event Time Matters
Copy
// Example: Two events arrive out of order
// Event A: timestamp=10:00:00, arrives at 10:00:05
// Event B: timestamp=10:00:02, arrives at 10:00:03
// Processing Time Window [10:00:00 - 10:00:05):
// - Would only include Event B (arrived first)
// - Results depend on network latency, processing speed
// Event Time Window [10:00:00 - 10:00:05):
// - Includes both events (based on actual event timestamp)
// - Results are deterministic and correct
Watermarks
Watermarks are timestamps that flow through the stream, indicating “all events with timestamp < T have arrived.”Watermark Semantics
Copy
/**
* Watermark(T) means: "No more events with timestamp < T will arrive"
*
* Properties:
* 1. Monotonically increasing (never decrease)
* 2. Allow processing despite out-of-order events
* 3. Trade-off between latency and completeness
*
* When watermark T arrives:
* - All windows with end time <= T are triggered
* - Events with timestamp < T arriving after are considered "late"
*/
public class Watermark implements Serializable {
private final long timestamp;
// Special watermark values
public static final long MAX_WATERMARK = Long.MAX_VALUE; // End of stream
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
public long getTimestamp() {
return timestamp;
}
}
Watermark Strategies
Flink 1.11+ introduced the WatermarkStrategy API:Copy
// Built-in watermark strategies
// 1. For monotonically ascending timestamps (no out-of-order)
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
DataStream<Event> withTimestamps = stream.assignTimestampsAndWatermarks(strategy);
// 2. For bounded out-of-orderness (most common)
WatermarkStrategy<Event> boundedStrategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // Max 10s out-of-order
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
// 3. Custom watermark generator
WatermarkStrategy<Event> customStrategy = WatermarkStrategy
.forGenerator((context) -> new CustomWatermarkGenerator())
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
Watermark Generation Patterns
Periodic Watermark Generator
Generated at fixed intervals (default: 200ms).Copy
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// Track maximum timestamp seen
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Emit watermark: max timestamp - max out-of-orderness
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
// Usage
WatermarkStrategy<Event> strategy = WatermarkStrategy
.forGenerator((context) -> new BoundedOutOfOrdernessGenerator())
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
DataStream<Event> stream = source
.assignTimestampsAndWatermarks(strategy);
Punctuated Watermark Generator
Emits watermarks based on special marker events.Copy
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// Emit watermark on special marker events
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(eventTimestamp));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Don't emit periodic watermarks
}
}
Advanced Watermark Strategies
Per-Partition Watermarks (Kafka)
Copy
// Watermark alignment for Kafka sources with multiple partitions
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
.withIdleness(Duration.ofMinutes(1)); // Handle idle partitions
// The idleness timeout prevents one slow partition from holding back watermarks
Custom Watermark with Multiple Timestamps
Copy
public class MultiTimestampWatermarkGenerator implements WatermarkGenerator<Event> {
private long maxEventTime = Long.MIN_VALUE;
private long maxIngestionTime = Long.MIN_VALUE;
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxEventTime = Math.max(maxEventTime, event.getEventTime());
maxIngestionTime = Math.max(maxIngestionTime, event.getIngestionTime());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Use minimum of both timestamps for conservative watermarking
long watermarkTimestamp = Math.min(
maxEventTime - 5000, // 5 sec lag for event time
maxIngestionTime - 1000 // 1 sec lag for ingestion time
);
output.emitWatermark(new Watermark(watermarkTimestamp));
}
}
Timestamp Extractors
Built-in Timestamp Assigners
Copy
// 1. AscendingTimestampExtractor (deprecated, use WatermarkStrategy)
public class MyAscendingExtractor implements TimestampAssigner<Event> {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getEventTime();
}
}
// Modern approach with WatermarkStrategy
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new MyAscendingExtractor());
// 2. Bounded out-of-orderness with custom timestamp field
WatermarkStrategy<SensorReading> sensorStrategy = WatermarkStrategy
.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((reading, ts) -> reading.timestamp);
Timestamp Extraction from Different Sources
Copy
// From JSON
public class JsonTimestampAssigner implements TimestampAssigner<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public long extractTimestamp(String element, long recordTimestamp) {
try {
JsonNode node = mapper.readTree(element);
return node.get("timestamp").asLong();
} catch (Exception e) {
return System.currentTimeMillis(); // Fallback
}
}
}
// From Kafka record metadata
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("events")
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
WatermarkStrategy<String> strategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((element, recordTimestamp) -> {
// recordTimestamp is the Kafka record timestamp
return recordTimestamp;
});
DataStream<String> stream = env.fromSource(
kafkaSource,
strategy,
"Kafka Source"
);
// From embedded timestamp in CSV
WatermarkStrategy<String> csvStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((csvLine, ts) -> {
String[] fields = csvLine.split(",");
return Long.parseLong(fields[0]); // Assuming first field is timestamp
});
Allowed Lateness
Handle events that arrive after the watermark has passed.Copy
// Configure allowed lateness for a window
DataStream<Event> input = /* ... */;
DataStream<Result> results = input
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // Allow events up to 1 minute late
.sideOutputLateData(lateDataTag) // Capture very late data
.process(new MyWindowFunction());
// Get the late data side output
DataStream<Event> lateData = results.getSideOutput(lateDataTag);
// Handle late data
lateData.addSink(new LateDataSink());
Complete Late Data Handling Example
Copy
public class LateDataHandlingExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Configure watermark interval
env.getConfig().setAutoWatermarkInterval(1000); // 1 second
// Late data output tag
final OutputTag<SensorReading> lateDataTag =
new OutputTag<SensorReading>("late-readings"){};
DataStream<SensorReading> readings = /* ... */;
// Assign timestamps and watermarks
DataStream<SensorReading> withTimestamps = readings
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((reading, ts) -> reading.timestamp)
);
// Windowed aggregation with allowed lateness
SingleOutputStreamOperator<TemperatureAlert> alerts = withTimestamps
.keyBy(reading -> reading.sensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(5)) // Accept events up to 5s late
.sideOutputLateData(lateDataTag) // Capture very late data
.process(new ProcessWindowFunction<SensorReading, TemperatureAlert, String, TimeWindow>() {
@Override
public void process(
String sensorId,
Context context,
Iterable<SensorReading> readings,
Collector<TemperatureAlert> out) {
double avgTemp = StreamSupport.stream(readings.spliterator(), false)
.mapToDouble(r -> r.temperature)
.average()
.orElse(0.0);
if (avgTemp > 75.0) {
out.collect(new TemperatureAlert(
sensorId,
avgTemp,
context.window().getEnd()
));
}
}
});
// Process main results
alerts.print();
// Process late data separately
alerts.getSideOutput(lateDataTag)
.map(reading -> "Late data: " + reading)
.print();
env.execute("Late Data Handling");
}
}
Watermark Propagation
Multi-Input Operators
Copy
// Watermarks propagate through the dataflow graph
// For operators with multiple inputs, the minimum watermark is used
DataStream<Event> stream1 = source1
.assignTimestampsAndWatermarks(strategy1);
DataStream<Event> stream2 = source2
.assignTimestampsAndWatermarks(strategy2);
// Union: watermark = min(stream1.watermark, stream2.watermark)
DataStream<Event> unioned = stream1.union(stream2);
// Connect: same watermark semantics
ConnectedStreams<Event, Event> connected = stream1.connect(stream2);
Watermark Alignment
Copy
// For Kafka sources with multiple partitions
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
.setBootstrapServers("localhost:9092")
.setTopics("events")
.setDeserializer(new EventDeserializer())
.build();
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.timestamp)
.withIdleness(Duration.ofMinutes(1)) // Mark idle partitions
.withWatermarkAlignment("alignment-group", Duration.ofSeconds(20)); // Align watermarks
DataStream<Event> stream = env.fromSource(kafkaSource, strategy, "Kafka");
Real-World Use Cases
Use Case 1: IoT Sensor Data with Clock Drift
Copy
public class IoTSensorProcessing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Custom watermark generator accounting for clock drift
WatermarkStrategy<SensorReading> strategy = WatermarkStrategy
.forGenerator(context -> new ClockDriftWatermarkGenerator())
.withTimestampAssigner((reading, ts) -> reading.deviceTimestamp);
DataStream<SensorReading> sensorData = env
.addSource(new IoTSensorSource())
.assignTimestampsAndWatermarks(strategy);
// Aggregate sensor readings in 1-minute windows
DataStream<SensorStats> stats = sensorData
.keyBy(reading -> reading.sensorId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30)) // Account for clock drift
.aggregate(new SensorAggregateFunction());
stats.print();
env.execute("IoT Sensor Processing");
}
static class ClockDriftWatermarkGenerator implements WatermarkGenerator<SensorReading> {
private long maxTimestamp = Long.MIN_VALUE;
private long maxDrift = 0;
@Override
public void onEvent(SensorReading event, long eventTimestamp, WatermarkOutput output) {
long serverTime = System.currentTimeMillis();
long drift = Math.abs(serverTime - event.deviceTimestamp);
maxTimestamp = Math.max(maxTimestamp, event.deviceTimestamp);
maxDrift = Math.max(maxDrift, drift);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Account for observed clock drift
output.emitWatermark(new Watermark(maxTimestamp - maxDrift - 1000));
}
}
}
Use Case 2: Log Aggregation from Distributed Systems
Copy
public class DistributedLogAggregation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Different log sources with different delays
DataStream<LogEntry> appLogs = env
.addSource(new KafkaSource<>("app-logs"))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((log, ts) -> log.timestamp)
);
DataStream<LogEntry> accessLogs = env
.addSource(new KafkaSource<>("access-logs"))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((log, ts) -> log.timestamp)
);
// Union and aggregate
DataStream<LogStats> aggregated = appLogs
.union(accessLogs)
.keyBy(log -> log.service)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.aggregate(new LogAggregator());
aggregated.print();
env.execute("Distributed Log Aggregation");
}
}
Use Case 3: Financial Transaction Processing
Copy
public class TransactionProcessing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Strict watermarking for financial data
WatermarkStrategy<Transaction> strategy = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofMillis(100))
.withTimestampAssigner((txn, ts) -> txn.transactionTime);
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.assignTimestampsAndWatermarks(strategy);
// Session windows for fraud detection
DataStream<FraudAlert> alerts = transactions
.keyBy(txn -> txn.accountId)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.allowedLateness(Time.seconds(10))
.process(new FraudDetectionFunction());
alerts.addSink(new AlertingSink());
env.execute("Transaction Processing");
}
static class FraudDetectionFunction extends
ProcessWindowFunction<Transaction, FraudAlert, String, TimeWindow> {
@Override
public void process(
String accountId,
Context context,
Iterable<Transaction> transactions,
Collector<FraudAlert> out) {
List<Transaction> txnList = StreamSupport
.stream(transactions.spliterator(), false)
.collect(Collectors.toList());
// Detect suspicious patterns
double totalAmount = txnList.stream()
.mapToDouble(t -> t.amount)
.sum();
if (totalAmount > 10000 && txnList.size() > 10) {
out.collect(new FraudAlert(
accountId,
"High frequency and volume",
totalAmount,
context.window().getEnd()
));
}
}
}
}
Debugging Watermarks
Watermark Monitoring
Copy
// Add watermark callbacks for debugging
public class WatermarkMonitor<T> implements WatermarkStrategy<T> {
private final WatermarkStrategy<T> baseStrategy;
public WatermarkMonitor(WatermarkStrategy<T> baseStrategy) {
this.baseStrategy = baseStrategy;
}
@Override
public WatermarkGenerator<T> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<T>() {
private final WatermarkGenerator<T> base =
baseStrategy.createWatermarkGenerator(context);
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
base.onEvent(event, eventTimestamp, new WatermarkOutput() {
@Override
public void emitWatermark(Watermark watermark) {
System.out.println("Emitting watermark: " + watermark.getTimestamp());
output.emitWatermark(watermark);
}
@Override
public void markIdle() {
System.out.println("Marking source as idle");
output.markIdle();
}
@Override
public void markActive() {
System.out.println("Marking source as active");
output.markActive();
}
});
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
base.onPeriodicEmit(output);
}
};
}
@Override
public TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
return baseStrategy.createTimestampAssigner(context);
}
}
// Usage
WatermarkStrategy<Event> debugStrategy = new WatermarkMonitor<>(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.timestamp)
);
Watermark Metrics
Copy
public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {
private long currentMaxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness = 5000;
private Gauge<Long> watermarkGauge;
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
// Update metric
if (watermarkGauge != null) {
watermarkGauge.setValue(currentMaxTimestamp - maxOutOfOrderness);
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
public void setWatermarkGauge(Gauge<Long> gauge) {
this.watermarkGauge = gauge;
}
}
Best Practices
1. Choose Appropriate Watermark Strategy
Copy
// For ordered data (e.g., database CDC)
WatermarkStrategy.forMonotonousTimestamps()
// For mostly ordered data with bounded delays (most common)
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
// For highly out-of-order data
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withIdleness(Duration.ofMinutes(1))
// For data with unpredictable delays
// Use custom generator with adaptive logic
2. Handle Idle Sources
Copy
// Prevent slow/idle partitions from blocking watermarks
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.timestamp)
.withIdleness(Duration.ofMinutes(1)); // Mark idle after 1 minute
3. Set Appropriate Allowed Lateness
Copy
// Balance between completeness and latency
dataStream
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // Reasonable for most cases
.sideOutputLateData(lateDataTag); // Always capture very late data
4. Monitor Watermark Lag
Copy
// Watermark lag = current processing time - current watermark
// High lag indicates:
// - Slow source
// - Too conservative watermark strategy
// - Processing bottleneck
// Monitor via Flink metrics
getRuntimeContext()
.getMetricGroup()
.gauge("watermark_lag", () ->
System.currentTimeMillis() - currentWatermark
);
5. Test with Out-of-Order Data
Copy
// Test source with controlled out-of-order events
public class OutOfOrderTestSource implements SourceFunction<Event> {
private volatile boolean running = true;
private final Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
long timestamp = System.currentTimeMillis();
while (running) {
// Generate events with random delays
long eventTime = timestamp - random.nextInt(10000); // Up to 10s in the past
ctx.collect(new Event("key", eventTime, "value"));
timestamp += 1000;
Thread.sleep(100);
}
}
@Override
public void cancel() {
running = false;
}
}
Performance Optimization
1. Watermark Interval Tuning
Copy
// Default watermark interval: 200ms
// Increase for high-throughput, low-latency scenarios
env.getConfig().setAutoWatermarkInterval(1000); // 1 second
// Decrease for low-throughput, high-accuracy scenarios
env.getConfig().setAutoWatermarkInterval(100); // 100ms
2. Parallelism and Watermarks
Copy
// Watermarks are generated per parallel instance
// Higher parallelism = more watermark generators
// If watermark computation is expensive, consider:
// 1. Reduce parallelism at source
source.setParallelism(4);
// 2. Or use lighter watermark strategy
WatermarkStrategy.forMonotonousTimestamps() // Lighter than bounded out-of-orderness
3. Avoid Frequent Watermark Emissions
Copy
// Bad: Emitting watermark for every event
public class BadWatermarkGenerator implements WatermarkGenerator<Event> {
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(eventTimestamp)); // Too frequent!
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {}
}
// Good: Let periodic emission handle watermarks
public class GoodWatermarkGenerator implements WatermarkGenerator<Event> {
private long maxTimestamp = Long.MIN_VALUE;
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
// Don't emit here
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - 5000));
}
}
Exercises
Exercise 1: Implement Adaptive Watermark Generator
Create a watermark generator that adapts to observed out-of-orderness.Copy
// TODO: Implement AdaptiveWatermarkGenerator
// Track observed delays and adjust watermark lag accordingly
Exercise 2: Handle Multi-Source Watermarks
Implement proper watermark handling for multiple sources with different characteristics.Copy
// TODO: Create a job that:
// 1. Reads from two sources (fast and slow)
// 2. Applies different watermark strategies
// 3. Unions the streams
// 4. Processes with event time windows
Exercise 3: Debug Watermark Issues
Given a job with late data, identify and fix the watermark configuration.Copy
// TODO: Fix the watermark issues in this code
DataStream<SensorReading> readings = env
.addSource(new SensorSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<SensorReading>forMonotonousTimestamps() // Issue 1: Should allow out-of-order
.withTimestampAssigner((reading, ts) -> reading.timestamp)
);
DataStream<Alert> alerts = readings
.keyBy(r -> r.sensorId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// Issue 2: No allowed lateness configured
.process(new AlertFunction());
Summary
In this module, you learned:- The three notions of time in stream processing
- Why event time is critical for correct results
- Watermark semantics and generation strategies
- Periodic vs punctuated watermark generators
- Timestamp extraction from various sources
- Handling late data with allowed lateness
- Watermark propagation in dataflow graphs
- Real-world use cases (IoT, logs, finance)
- Debugging and monitoring watermarks
- Performance optimization techniques
- Best practices for production systems
Next Steps
Module 4: Windows & Time Operations
Learn how to implement sophisticated windowing logic for time-based aggregations