
Event Time & Watermarks

Module Duration: 4-5 hours
Focus: Event time processing in Apache Flink
Prerequisites: DataStream API, streaming concepts

Introduction

Event time processing is fundamental to building correct streaming applications. Unlike processing time (when events are processed), event time represents when events actually occurred. This distinction is critical for handling out-of-order events, late data, and producing deterministic results.

Time Semantics

Three Notions of Time

/**
 * Processing Time: Time when the event is processed by the operator
 * - Non-deterministic (depends on system speed, load)
 * - Lowest latency
 * - No coordination required
 *
 * Event Time: Time when the event actually occurred (embedded in record)
 * - Deterministic results
 * - Handles out-of-order events
 * - Requires watermarks
 *
 * Ingestion Time: Time when event enters Flink
 * - Middle ground between processing and event time
 * - Rarely used in practice
 */

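// Legacy API (Flink 1.11 and earlier, where processing time was the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// Legacy API: opting in to event time explicitly
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Note: Since Flink 1.12, event time is the default and setStreamTimeCharacteristic()
// is deprecated. To use processing time, choose processing-time operations explicitly,
// e.g. TumblingProcessingTimeWindows instead of TumblingEventTimeWindows.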

Why Event Time Matters

// Example: Two events arrive out of order

// Event A: timestamp=10:00:00, arrives at 10:00:05
// Event B: timestamp=10:00:02, arrives at 10:00:03

// Processing Time Window [10:00:00 - 10:00:05):
// - Would only include Event B: Event A arrives at 10:00:05, so it lands in the next window
// - Results depend on network latency and processing speed

// Event Time Window [10:00:00 - 10:00:05):
// - Includes both events (based on actual event timestamp)
// - Results are deterministic and correct
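The following plain-Java model (no Flink dependency) makes the difference concrete. It assigns the two events above to a 5-second tumbling window twice: once by arrival time and once by event timestamp. The class and field names are illustrative only, and times are expressed as milliseconds since midnight.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the example above; not Flink code.
final class TimeSemanticsDemo {

    static final class Event {
        final String name;
        final long eventTimeMs;    // when the event happened
        final long arrivalTimeMs;  // when the operator sees it (processing time)

        Event(String name, long eventTimeMs, long arrivalTimeMs) {
            this.name = name;
            this.eventTimeMs = eventTimeMs;
            this.arrivalTimeMs = arrivalTimeMs;
        }
    }

    // Start of the tumbling window of size sizeMs that contains ts
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Names of the events that fall into the window [start, start + sizeMs)
    static List<String> inWindow(List<Event> events, long start, long sizeMs, boolean useEventTime) {
        List<String> names = new ArrayList<>();
        for (Event e : events) {
            long ts = useEventTime ? e.eventTimeMs : e.arrivalTimeMs;
            if (windowStart(ts, sizeMs) == start) {
                names.add(e.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        long t0 = 10 * 3_600_000L;  // 10:00:00
        List<Event> events = List.of(
            new Event("A", t0, t0 + 5_000),           // occurred 10:00:00, arrived 10:00:05
            new Event("B", t0 + 2_000, t0 + 3_000));  // occurred 10:00:02, arrived 10:00:03

        System.out.println("Processing time: " + inWindow(events, t0, 5_000, false));  // [B]
        System.out.println("Event time:      " + inWindow(events, t0, 5_000, true));   // [A, B]
    }
}
```

The arrival time of A (10:00:05) equals the window's exclusive end, so processing-time assignment pushes A into the following window.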

Watermarks

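Watermarks are special records that carry a timestamp and flow through the stream alongside the data. A watermark with timestamp T declares that event time has reached T: no more events with timestamp <= T are expected.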

Watermark Semantics

/**
 * Watermark(T) means: "No more events with timestamp <= T are expected"
 *
 * Properties:
 * 1. Monotonically increasing (never decrease)
 * 2. Allow processing to make progress despite out-of-order events
 * 3. Trade-off between latency and completeness: holding the watermark back
 *    waits for more stragglers but delays results
 *
 * When watermark T arrives:
 * - Event-time windows [start, end) with end - 1 <= T fire
 *   (the watermark has reached the window's last millisecond)
 * - Events with timestamp <= T that arrive afterwards are considered "late"
 */

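import java.io.Serializable;

// Simplified view of org.apache.flink.api.common.eventtime.Watermark
public final class Watermark implements Serializable {
    private final long timestamp;

    // Special watermark signalling end of input (e.g. when a bounded source finishes)
    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    public long getTimestamp() {
        return timestamp;
    }
}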
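Both rules in the comment above come down to a single comparison each. The plain-Java sketch below models them the way Flink's built-in event-time windows apply them. A window [start, end) registers its timer at its last millisecond, end - 1, and that timer fires once the watermark reaches it. The class and method names are hypothetical.

```java
// Illustrative model of the Watermark(T) contract; not Flink code.
final class WatermarkRules {

    // A window [start, end) fires once the watermark reaches its last millisecond, end - 1
    static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    // After Watermark(T), a record with timestamp <= T is behind the watermark, i.e. late
    static boolean isLate(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long end = 10_000;  // window [0, 10000)
        System.out.println(windowFires(end, 9_998));  // false: a record at 9999 may still arrive
        System.out.println(windowFires(end, 9_999));  // true: no record <= 9999 is expected anymore
        System.out.println(isLate(9_999, 9_999));     // true
        System.out.println(isLate(10_000, 9_999));    // false: it belongs to the next window anyway
    }
}
```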

Watermark Strategies

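Flink 1.11 introduced the WatermarkStrategy API, which bundles a TimestampAssigner (where each event's timestamp comes from) with a WatermarkGenerator (when watermarks are emitted):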
// Built-in watermark strategies

// 1. For monotonically ascending timestamps (no out-of-order)
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

DataStream<Event> withTimestamps = stream.assignTimestampsAndWatermarks(strategy);

// 2. For bounded out-of-orderness (most common)
WatermarkStrategy<Event> boundedStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))  // Max 10s out-of-order
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

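// 3. Custom watermark generator (CustomWatermarkGenerator stands for your own
//    WatermarkGenerator<Event> implementation, such as the ones below)
WatermarkStrategy<Event> customStrategy = WatermarkStrategy
    .<Event>forGenerator((context) -> new CustomWatermarkGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());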

Watermark Generation Patterns

Periodic Watermark Generator

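Flink calls onPeriodicEmit() at a fixed interval, set with ExecutionConfig#setAutoWatermarkInterval (default: 200 ms). The onEvent() method only updates internal state.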
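import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    // Start low enough that "max - maxOutOfOrderness - 1" cannot underflow before the first event
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Track maximum timestamp seen
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emit watermark: max timestamp - max out-of-orderness - 1
        // (the -1 keeps an event exactly maxOutOfOrderness behind the maximum on time)
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}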

// Usage
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .forGenerator((context) -> new BoundedOutOfOrdernessGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

DataStream<Event> stream = source
    .assignTimestampsAndWatermarks(strategy);
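The plain-Java sketch below reproduces the generator's arithmetic without Flink on the classpath, so you can see exactly what it emits. `SimpleOutput` is a hypothetical stand-in for `WatermarkOutput`. Each inner array represents the records that arrive within one auto-watermark interval, and onPeriodicEmit runs after each batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's WatermarkOutput
interface SimpleOutput {
    void emitWatermark(long watermark);
}

// Same arithmetic as BoundedOutOfOrdernessGenerator above, minus the Flink types
final class SimBoundedGenerator {
    private final long bound;
    private long maxTimestamp;

    SimBoundedGenerator(long bound) {
        this.bound = bound;
        this.maxTimestamp = Long.MIN_VALUE + bound + 1;  // avoids underflow before the first event
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    void onPeriodicEmit(SimpleOutput out) {
        out.emitWatermark(maxTimestamp - bound - 1);
    }

    // Feeds one batch per interval and collects the watermark emitted after each batch
    static List<Long> run(long bound, long[][] batches) {
        SimBoundedGenerator gen = new SimBoundedGenerator(bound);
        List<Long> emitted = new ArrayList<>();
        for (long[] batch : batches) {
            for (long ts : batch) {
                gen.onEvent(ts);
            }
            gen.onPeriodicEmit(emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Out-of-order timestamps arriving over three intervals, bound = 3500 ms
        long[][] batches = { {1_000, 4_000, 2_000}, {3_000, 9_000}, {7_000} };
        System.out.println(run(3_500, batches));  // [499, 5499, 5499]
    }
}
```

The record at 3000 arrives after the first watermark (499) and is still on time. By contrast, a record at 5000 arriving after the second emission (5499) would be late.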

Punctuated Watermark Generator

Emits watermarks based on special marker events.
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Emit watermark on special marker events
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Don't emit periodic watermarks
    }
}
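The plain-Java model below shows how punctuated emission interacts with lateness. `Rec` and `isMarker` are hypothetical stand-ins for `Event` and `hasWatermarkMarker()`. The model checks lateness before applying a marker, so a marker record is never late relative to its own watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of punctuated watermarking; not Flink code.
final class PunctuatedDemo {

    static final class Rec {
        final long timestamp;
        final boolean isMarker;

        Rec(long timestamp, boolean isMarker) {
            this.timestamp = timestamp;
            this.isMarker = isMarker;
        }
    }

    // Timestamps of records that arrive at or behind the current watermark
    static List<Long> lateTimestamps(List<Rec> records) {
        long watermark = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (Rec r : records) {
            if (r.timestamp <= watermark) {
                late.add(r.timestamp);
            }
            if (r.isMarker) {
                // Corresponds to output.emitWatermark(new Watermark(eventTimestamp))
                watermark = Math.max(watermark, r.timestamp);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(
            new Rec(100, false), new Rec(250, true), new Rec(180, false),
            new Rec(400, false), new Rec(500, true));
        System.out.println(lateTimestamps(records));  // [180]
    }
}
```

Between markers the watermark does not move at all. A stream whose markers are rare therefore delays results just as much as a large out-of-orderness bound would.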

Advanced Watermark Strategies

Per-Partition Watermarks (Kafka)

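// Per-partition watermarks: when this strategy is passed to env.fromSource() with a
// KafkaSource, each partition gets its own watermark generator, and the source emits
// the minimum across its partitions
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime())
    .withIdleness(Duration.ofMinutes(1));  // Handle idle partitions

// The idleness timeout removes a partition that has received no data for 1 minute from that
// minimum, so an empty partition cannot hold back the watermark. It does not help with
// partitions that are slow but still active; watermark alignment (below) addresses those.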
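The plain-Java model below shows how idleness changes the combined watermark. The partition names and values are made up. The model also simplifies one case: when every partition is idle, it returns Long.MIN_VALUE, whereas Flink marks the whole output as idle.

```java
import java.util.Map;
import java.util.Set;

// Illustrative model of combining per-partition watermarks; not Flink code.
final class IdlenessDemo {

    // Minimum over partitions that are not idle (Long.MIN_VALUE if all are idle)
    static long combined(Map<String, Long> partitionWatermarks, Set<String> idlePartitions) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : partitionWatermarks.entrySet()) {
            if (!idlePartitions.contains(e.getKey())) {
                min = Math.min(min, e.getValue());
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // p2 stopped receiving data long ago, so its watermark is stuck at 1000
        Map<String, Long> wms = Map.of("p0", 60_000L, "p1", 58_000L, "p2", 1_000L);
        System.out.println(combined(wms, Set.of()));      // 1000: held back by p2
        System.out.println(combined(wms, Set.of("p2")));  // 58000: p2 excluded after the idleness timeout
    }
}
```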

Custom Watermark with Multiple Timestamps

public class MultiTimestampWatermarkGenerator implements WatermarkGenerator<Event> {

    private long maxEventTime = Long.MIN_VALUE;
    private long maxIngestionTime = Long.MIN_VALUE;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxEventTime = Math.max(maxEventTime, event.getEventTime());
        maxIngestionTime = Math.max(maxIngestionTime, event.getIngestionTime());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxEventTime == Long.MIN_VALUE) {
            // No events yet: Long.MIN_VALUE - 5000 would overflow into a huge positive watermark
            return;
        }
        // Use the minimum of both candidates for conservative watermarking
        long watermarkTimestamp = Math.min(
            maxEventTime - 5000,      // 5 sec lag for event time
            maxIngestionTime - 1000   // 1 sec lag for ingestion time
        );
        output.emitWatermark(new Watermark(watermarkTimestamp));
    }
}

Timestamp Extractors

Built-in Timestamp Assigners

// 1. A reusable timestamp assigner class (replaces the deprecated AscendingTimestampExtractor).
//    withTimestampAssigner() expects a SerializableTimestampAssigner, because the
//    assigner is shipped to the cluster together with the job.
public class MyAscendingExtractor implements SerializableTimestampAssigner<Event> {
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        return element.getEventTime();
    }
}

// Modern approach with WatermarkStrategy
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner(new MyAscendingExtractor());

// 2. Bounded out-of-orderness with custom timestamp field
WatermarkStrategy<SensorReading> sensorStrategy = WatermarkStrategy
    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((reading, ts) -> reading.timestamp);

Timestamp Extraction from Different Sources

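// From JSON (assumes a numeric epoch-millis "timestamp" field)
public class JsonTimestampAssigner implements SerializableTimestampAssigner<String> {
    // Created lazily on the worker instead of being serialized with the job
    private transient ObjectMapper mapper;

    @Override
    public long extractTimestamp(String element, long recordTimestamp) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            JsonNode node = mapper.readTree(element);
            return node.get("timestamp").asLong();
        } catch (Exception e) {
            // Fall back to the timestamp the record already carries (e.g. from Kafka).
            // A wall-clock fallback would make results non-deterministic and could push
            // the watermark far ahead of the real event time.
            return recordTimestamp;
        }
    }
}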

// From Kafka record metadata
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

WatermarkStrategy<String> strategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((element, recordTimestamp) -> {
        // recordTimestamp is the Kafka record timestamp
        return recordTimestamp;
    });

DataStream<String> stream = env.fromSource(
    kafkaSource,
    strategy,
    "Kafka Source"
);

// From embedded timestamp in CSV
WatermarkStrategy<String> csvStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((csvLine, ts) -> {
        String[] fields = csvLine.split(",");
        return Long.parseLong(fields[0]);  // Assuming first field is timestamp
    });

Allowed Lateness

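Allowed lateness keeps a window's state for an extra period after the watermark passes the window's end. A late event that arrives within that period is added to the window and makes it fire again with an updated result. Events later than that are dropped, or routed to a side output if one is configured.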
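// Configure allowed lateness for a window
// (the anonymous subclass {} preserves the generic type information of the tag)
final OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data"){};
DataStream<Event> input = /* ... */;

// getSideOutput() is defined on SingleOutputStreamOperator, not on DataStream
SingleOutputStreamOperator<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Allow events up to 1 minute late
    .sideOutputLateData(lateDataTag)   // Capture data later than the allowed lateness
    .process(new MyWindowFunction());

// Get the late data side output
DataStream<Event> lateData = results.getSideOutput(lateDataTag);

// Handle late data
lateData.addSink(new LateDataSink());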
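A single record can meet one of three fates, and a few lines of plain Java can model them. The sketch follows the rules of Flink's WindowOperator and EventTimeTrigger for one tumbling window. It ignores merging windows and custom triggers, and the class and enum names are hypothetical. All sizes are in milliseconds.

```java
// Illustrative model of allowed lateness for a tumbling event-time window; not Flink code.
final class LatenessDemo {

    enum Outcome { BUFFERED, REFIRES_WINDOW, TOO_LATE }

    static Outcome classify(long timestamp, long watermark, long windowSize, long allowedLateness) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long maxTimestamp = start + windowSize - 1;  // last millisecond of [start, start + size)
        if (maxTimestamp + allowedLateness <= watermark) {
            return Outcome.TOO_LATE;        // state already cleaned up: dropped, or sent to the side output
        }
        if (maxTimestamp <= watermark) {
            return Outcome.REFIRES_WINDOW;  // window already fired; this record triggers another firing
        }
        return Outcome.BUFFERED;            // on time: waits for the watermark to pass the window end
    }

    public static void main(String[] args) {
        long size = 300_000;     // 5-minute windows
        long lateness = 60_000;  // allowedLateness(Time.minutes(1))
        long ts = 100_000;       // falls into window [0, 300000)
        System.out.println(classify(ts, 200_000, size, lateness));  // BUFFERED
        System.out.println(classify(ts, 320_000, size, lateness));  // REFIRES_WINDOW
        System.out.println(classify(ts, 400_000, size, lateness));  // TOO_LATE
    }
}
```

Because each record in the lateness period re-fires the window, downstream consumers may see several results for the same window. Those consumers should treat later results as updates.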

Complete Late Data Handling Example

public class LateDataHandlingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure watermark interval
        env.getConfig().setAutoWatermarkInterval(1000);  // 1 second

        // Late data output tag
        final OutputTag<SensorReading> lateDataTag =
            new OutputTag<SensorReading>("late-readings"){};

        DataStream<SensorReading> readings = /* ... */;

        // Assign timestamps and watermarks
        DataStream<SensorReading> withTimestamps = readings
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((reading, ts) -> reading.timestamp)
            );

        // Windowed aggregation with allowed lateness
        SingleOutputStreamOperator<TemperatureAlert> alerts = withTimestamps
            .keyBy(reading -> reading.sensorId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .allowedLateness(Time.seconds(5))  // Accept events up to 5s late
            .sideOutputLateData(lateDataTag)   // Capture very late data
            .process(new ProcessWindowFunction<SensorReading, TemperatureAlert, String, TimeWindow>() {

                @Override
                public void process(
                        String sensorId,
                        Context context,
                        Iterable<SensorReading> readings,
                        Collector<TemperatureAlert> out) {

                    double avgTemp = StreamSupport.stream(readings.spliterator(), false)
                        .mapToDouble(r -> r.temperature)
                        .average()
                        .orElse(0.0);

                    if (avgTemp > 75.0) {
                        out.collect(new TemperatureAlert(
                            sensorId,
                            avgTemp,
                            context.window().getEnd()
                        ));
                    }
                }
            });

        // Process main results
        alerts.print();

        // Process late data separately
        alerts.getSideOutput(lateDataTag)
            .map(reading -> "Late data: " + reading)
            .print();

        env.execute("Late Data Handling");
    }
}

Watermark Propagation

Multi-Input Operators

// Watermarks propagate through the dataflow graph
// An operator's watermark is the minimum over all of its input channels: every upstream
// parallel instance and, for union/connect, every input stream

DataStream<Event> stream1 = source1
    .assignTimestampsAndWatermarks(strategy1);

DataStream<Event> stream2 = source2
    .assignTimestampsAndWatermarks(strategy2);

// Union: watermark = min(stream1.watermark, stream2.watermark)
DataStream<Event> unioned = stream1.union(stream2);

// Connect: same watermark semantics
ConnectedStreams<Event, Event> connected = stream1.connect(stream2);
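The min rule can be modelled in plain Java. The sketch below is loosely based on how a Flink task combines watermarks from its input channels. It keeps the latest watermark per channel, takes the minimum, and forwards that minimum only when it increases. It ignores idleness, and the class name is hypothetical.

```java
import java.util.Arrays;

// Illustrative model of watermark combination in a multi-input operator; not Flink code.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkTracker(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Returns the new operator watermark if it advanced, otherwise null
    Long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        MinWatermarkTracker union = new MinWatermarkTracker(2);  // channel 0 = stream1, 1 = stream2
        System.out.println(union.onWatermark(0, 5_000));  // null: stream2 has not reported yet
        System.out.println(union.onWatermark(1, 3_000));  // 3000
        System.out.println(union.onWatermark(0, 9_000));  // null: still held at 3000 by stream2
        System.out.println(union.onWatermark(1, 7_000));  // 7000
    }
}
```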

Watermark Alignment

// For Kafka sources with multiple partitions
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setDeserializer(new EventDeserializer())
    .build();

WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, ts) -> event.timestamp)
    .withIdleness(Duration.ofMinutes(1))  // Mark idle partitions
    // Pause reading from any source/split whose watermark runs more than 20s ahead of the group's minimum
    .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20));

DataStream<Event> stream = env.fromSource(kafkaSource, strategy, "Kafka");
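The alignment idea fits in a small plain-Java sketch. Every source in the group reports its current watermark, and any source more than the allowed drift ahead of the group minimum is paused until the others catch up. In Flink, the source coordinator performs this check at a periodic update interval; the exact boundary test here (strictly greater than) is a simplifying assumption. Split names and values are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative model of watermark alignment; not Flink code.
final class AlignmentDemo {

    // Sources whose watermark exceeds groupMin + maxDriftMs (sorted for stable output)
    static Set<String> paused(Map<String, Long> watermarks, long maxDriftMs) {
        long groupMin = Collections.min(watermarks.values());
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (e.getValue() > groupMin + maxDriftMs) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> wms = Map.of(
            "partition-0", 100_000L, "partition-1", 115_000L, "partition-2", 130_000L);
        System.out.println(paused(wms, 20_000));  // [partition-2]: 130000 > 100000 + 20000
    }
}
```

Pausing the fastest reader bounds how much data downstream operators have to buffer while they wait for the slowest partition's watermark.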

Real-World Use Cases

Use Case 1: IoT Sensor Data with Clock Drift

public class IoTSensorProcessing {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Custom watermark generator accounting for clock drift
        WatermarkStrategy<SensorReading> strategy = WatermarkStrategy
            .forGenerator(context -> new ClockDriftWatermarkGenerator())
            .withTimestampAssigner((reading, ts) -> reading.deviceTimestamp);

        DataStream<SensorReading> sensorData = env
            .addSource(new IoTSensorSource())
            .assignTimestampsAndWatermarks(strategy);

        // Aggregate sensor readings in 1-minute windows
        DataStream<SensorStats> stats = sensorData
            .keyBy(reading -> reading.sensorId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(30))  // Account for clock drift
            .aggregate(new SensorAggregateFunction());

        stats.print();
        env.execute("IoT Sensor Processing");
    }

    static class ClockDriftWatermarkGenerator implements WatermarkGenerator<SensorReading> {

        private long maxTimestamp = Long.MIN_VALUE;
        private long maxDrift = 0;

        @Override
        public void onEvent(SensorReading event, long eventTimestamp, WatermarkOutput output) {
            // The observed "drift" also includes network and queuing delay and only ever grows.
            // Using the wall clock makes watermarks non-reproducible when replaying old data.
            long serverTime = System.currentTimeMillis();
            long drift = Math.abs(serverTime - event.deviceTimestamp);

            maxTimestamp = Math.max(maxTimestamp, event.deviceTimestamp);
            maxDrift = Math.max(maxDrift, drift);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (maxTimestamp == Long.MIN_VALUE) {
                return;  // No readings yet: Long.MIN_VALUE - drift would overflow
            }
            // Account for observed clock drift
            output.emitWatermark(new Watermark(maxTimestamp - maxDrift - 1000));
        }
    }
}

Use Case 2: Log Aggregation from Distributed Systems

public class DistributedLogAggregation {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

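        // Different log sources with different delays, so each gets its own out-of-orderness bound
        DataStream<LogEntry> appLogs = env.fromSource(
            logSource("app-logs"),
            WatermarkStrategy
                .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((log, ts) -> log.timestamp),
            "App Logs");

        DataStream<LogEntry> accessLogs = env.fromSource(
            logSource("access-logs"),
            WatermarkStrategy
                .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((log, ts) -> log.timestamp),
            "Access Logs");

        // Union and aggregate: the union's watermark is the minimum of both inputs,
        // so a window fires only after the slower (10 s bound) access-log stream allows it
        DataStream<LogStats> aggregated = appLogs
            .union(accessLogs)
            .keyBy(log -> log.service)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1))
            .aggregate(new LogAggregator());

        aggregated.print();
        env.execute("Distributed Log Aggregation");
    }

    // KafkaSource is created with its builder and attached via env.fromSource(), not addSource().
    // LogEntryDeserializer stands for your own DeserializationSchema<LogEntry>.
    private static KafkaSource<LogEntry> logSource(String topic) {
        return KafkaSource.<LogEntry>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics(topic)
            .setValueOnlyDeserializer(new LogEntryDeserializer())
            .build();
    }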
}

Use Case 3: Financial Transaction Processing

public class TransactionProcessing {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Tight (100 ms) out-of-orderness bound for low-latency results;
        // stragglers are covered by allowedLateness below
        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
            .<Transaction>forBoundedOutOfOrderness(Duration.ofMillis(100))
            .withTimestampAssigner((txn, ts) -> txn.transactionTime);

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .assignTimestampsAndWatermarks(strategy);

        // Session windows for fraud detection
        DataStream<FraudAlert> alerts = transactions
            .keyBy(txn -> txn.accountId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
            .allowedLateness(Time.seconds(10))
            .process(new FraudDetectionFunction());

        alerts.addSink(new AlertingSink());
        env.execute("Transaction Processing");
    }

    static class FraudDetectionFunction extends
            ProcessWindowFunction<Transaction, FraudAlert, String, TimeWindow> {

        @Override
        public void process(
                String accountId,
                Context context,
                Iterable<Transaction> transactions,
                Collector<FraudAlert> out) {

            List<Transaction> txnList = StreamSupport
                .stream(transactions.spliterator(), false)
                .collect(Collectors.toList());

            // Detect suspicious patterns
            double totalAmount = txnList.stream()
                .mapToDouble(t -> t.amount)
                .sum();

            if (totalAmount > 10000 && txnList.size() > 10) {
                out.collect(new FraudAlert(
                    accountId,
                    "High frequency and volume",
                    totalAmount,
                    context.window().getEnd()
                ));
            }
        }
    }
}

Debugging Watermarks

Watermark Monitoring

// Wrap any WatermarkStrategy to log what its generator emits (for debugging only)
public class WatermarkMonitor<T> implements WatermarkStrategy<T> {

    private final WatermarkStrategy<T> baseStrategy;

    public WatermarkMonitor(WatermarkStrategy<T> baseStrategy) {
        this.baseStrategy = baseStrategy;
    }

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {

        final WatermarkGenerator<T> base = baseStrategy.createWatermarkGenerator(context);

        return new WatermarkGenerator<T>() {
            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                base.onEvent(event, eventTimestamp, logging(output));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Periodic strategies such as forBoundedOutOfOrderness emit here,
                // so this output must be wrapped too, or nothing gets logged
                base.onPeriodicEmit(logging(output));
            }
        };
    }

    private static WatermarkOutput logging(WatermarkOutput output) {
        return new WatermarkOutput() {
            @Override
            public void emitWatermark(Watermark watermark) {
                System.out.println("Emitting watermark: " + watermark.getTimestamp());
                output.emitWatermark(watermark);
            }

            @Override
            public void markIdle() {
                System.out.println("Marking source as idle");
                output.markIdle();
            }

            // markActive() only exists on WatermarkOutput in newer Flink releases
            @Override
            public void markActive() {
                System.out.println("Marking source as active");
                output.markActive();
            }
        };
    }

    @Override
    public TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return baseStrategy.createTimestampAssigner(context);
    }
}

// Usage
WatermarkStrategy<Event> debugStrategy = new WatermarkMonitor<>(
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.timestamp)
);

Watermark Metrics

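// Flink's Gauge interface only has getValue(), so keep the value in a field and register a
// gauge that reads it. The MetricGroup comes from the context passed to forGenerator().
// (Operators already report currentInputWatermark/currentOutputWatermark out of the box.)
public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {

    private final long maxOutOfOrderness = 5000;
    // Start low enough that "max - maxOutOfOrderness - 1" cannot underflow
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    // Written in onPeriodicEmit, read by the metric reporter thread
    private volatile long lastEmittedWatermark = Long.MIN_VALUE;

    public WatermarkMetricsGenerator(MetricGroup metricGroup) {
        metricGroup.gauge("generatedWatermark", (Gauge<Long>) () -> lastEmittedWatermark);
    }

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        lastEmittedWatermark = currentMaxTimestamp - maxOutOfOrderness - 1;
        output.emitWatermark(new Watermark(lastEmittedWatermark));
    }
}

// Usage
WatermarkStrategy<Event> metricsStrategy = WatermarkStrategy
    .<Event>forGenerator(context -> new WatermarkMetricsGenerator(context.getMetricGroup()))
    .withTimestampAssigner((event, ts) -> event.timestamp);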

Best Practices

1. Choose Appropriate Watermark Strategy

// For ordered data (e.g., database CDC)
WatermarkStrategy.forMonotonousTimestamps()

// For mostly ordered data with bounded delays (most common)
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))

// For highly out-of-order data
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(5))
    .withIdleness(Duration.ofMinutes(1))

// For data with unpredictable delays
// Use custom generator with adaptive logic

2. Handle Idle Sources

// Prevent idle partitions (no data at all) from blocking watermarks;
// slow but active partitions need watermark alignment instead
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.timestamp)
    .withIdleness(Duration.ofMinutes(1));  // Mark idle after 1 minute

3. Set Appropriate Allowed Lateness

// Balance between completeness and latency
dataStream
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Size from observed lateness; longer values keep window state longer
    .sideOutputLateData(lateDataTag);  // Capture data later than the allowed lateness instead of silently dropping it

4. Monitor Watermark Lag

// Watermark lag = current processing time - current watermark
// High lag indicates:
// - Slow source
// - Too conservative watermark strategy
// - Processing bottleneck

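// Monitor via Flink metrics: a pass-through function that samples the current watermark
public class WatermarkLagMonitor<T> extends ProcessFunction<T, T> {
    private volatile long currentWatermark = Long.MIN_VALUE;  // also read by the metrics thread

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup().gauge("watermark_lag", (Gauge<Long>) () ->
            currentWatermark == Long.MIN_VALUE
                ? -1L  // no watermark seen yet
                : System.currentTimeMillis() - currentWatermark);
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        currentWatermark = ctx.timerService().currentWatermark();
        out.collect(value);
    }
}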

5. Test with Out-of-Order Data

// Test source with controlled out-of-order events
public class OutOfOrderTestSource implements SourceFunction<Event> {

    private volatile boolean running = true;
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        long timestamp = System.currentTimeMillis();

        while (running) {
            // Generate events with random delays
            long eventTime = timestamp - random.nextInt(10000);  // Up to 10s in the past

            ctx.collect(new Event("key", eventTime, "value"));

            timestamp += 1000;
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Performance Optimization

1. Watermark Interval Tuning

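// Default watermark interval: 200ms
// Increase to reduce watermark overhead when slightly later results are acceptable:
// a window can fire up to one interval after the watermark could have passed its end
env.getConfig().setAutoWatermarkInterval(1000);  // 1 second

// Decrease for lower end-to-end latency, at the cost of more frequent watermark emission
env.getConfig().setAutoWatermarkInterval(100);  // 100ms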
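The idealized plain-Java model below shows why the interval affects latency. It assumes records arrive in event-time order, one every `arrivalStepMs` starting at `firstArrivalMs`. It also assumes event time equals arrival time, a zero out-of-orderness bound, and periodic emission at multiples of the interval. All names are hypothetical.

```java
// Idealized model of how the auto-watermark interval delays window firing; not Flink code.
final class WatermarkIntervalDemo {

    // Processing time at which the window ending at windowEndMs (exclusive) fires
    static long firingTime(long windowEndMs, long firstArrivalMs, long arrivalStepMs, long intervalMs) {
        // The first record with timestamp >= windowEnd lifts the watermark (max - 0 - 1) to >= windowEnd - 1
        long arrival = firstArrivalMs;
        while (arrival < windowEndMs) {
            arrival += arrivalStepMs;
        }
        // That watermark only leaves the generator at the next periodic emission
        return ((arrival + intervalMs - 1) / intervalMs) * intervalMs;
    }

    public static void main(String[] args) {
        for (long interval : new long[] {100, 200, 1_000}) {
            long fired = firingTime(1_000, 5, 10, interval);
            System.out.println("interval " + interval + " ms -> window [0, 1000) fires at " + fired);
        }
        // interval 100 ms -> window [0, 1000) fires at 1100
        // interval 200 ms -> window [0, 1000) fires at 1200
        // interval 1000 ms -> window [0, 1000) fires at 2000
    }
}
```

The record that closes the window arrives at 1005 in every case. Only the wait for the next periodic emission differs, and that wait is bounded by the interval.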

2. Parallelism and Watermarks

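// Watermarks are generated per parallel source instance (per split for FLIP-27 sources),
// and downstream operators wait for the minimum across all of them

// Per-record cost is usually dominated by timestamp extraction, which parallelism does not reduce.
// If watermark generation is expensive, consider:
// 1. Keep source parallelism <= number of partitions, so no instance sits without data
source.setParallelism(4);  // e.g. for a topic with 4 partitions

// 2. Extract timestamps from already-deserialized fields instead of re-parsing raw records.
//    Switching to forMonotonousTimestamps() does not help: it is forBoundedOutOfOrderness
//    with a zero bound, so it costs the same and treats every out-of-order event as late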

3. Avoid Frequent Watermark Emissions

// Bad: Emitting watermark for every event
public class BadWatermarkGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        output.emitWatermark(new Watermark(eventTimestamp));  // Too frequent!
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {}
}

// Good: Let periodic emission handle watermarks
public class GoodWatermarkGenerator implements WatermarkGenerator<Event> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        // Don't emit here
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - 5000));
    }
}

Exercises

Exercise 1: Implement Adaptive Watermark Generator

Create a watermark generator that adapts to observed out-of-orderness.
// TODO: Implement AdaptiveWatermarkGenerator
// Track observed delays and adjust watermark lag accordingly

Exercise 2: Handle Multi-Source Watermarks

Implement proper watermark handling for multiple sources with different characteristics.
// TODO: Create a job that:
// 1. Reads from two sources (fast and slow)
// 2. Applies different watermark strategies
// 3. Unions the streams
// 4. Processes with event time windows

Exercise 3: Debug Watermark Issues

Given a job with late data, identify and fix the watermark configuration.
// TODO: Fix the watermark issues in this code

DataStream<SensorReading> readings = env
    .addSource(new SensorSource())
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<SensorReading>forMonotonousTimestamps()  // Issue 1: Should allow out-of-order
            .withTimestampAssigner((reading, ts) -> reading.timestamp)
    );

DataStream<Alert> alerts = readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // Issue 2: No allowed lateness configured
    .process(new AlertFunction());

Summary

In this module, you learned:
  • The three notions of time in stream processing
  • Why event time is critical for correct results
  • Watermark semantics and generation strategies
  • Periodic vs punctuated watermark generators
  • Timestamp extraction from various sources
  • Handling late data with allowed lateness
  • Watermark propagation in dataflow graphs
  • Real-world use cases (IoT, logs, finance)
  • Debugging and monitoring watermarks
  • Performance optimization techniques
  • Best practices for production systems

Next Steps

Module 4: Windows & Time Operations

Learn how to implement sophisticated windowing logic for time-based aggregations

Additional Resources