# Event Time & Watermarks

> Handle out-of-order events with watermarks

<Info>
  **Module Duration**: 4-5 hours
  **Focus**: Event time processing in Apache Flink
  **Prerequisites**: DataStream API, streaming concepts
</Info>

## Introduction

Event time processing is fundamental to building correct streaming applications. Unlike processing time (when events are processed), event time represents when events actually occurred. This distinction is critical for handling out-of-order events, late data, and producing deterministic results.

## Time Semantics

### Three Notions of Time

```java theme={null}
/**
 * Processing Time: Time when the event is processed by the operator
 * - Non-deterministic (depends on system speed, load)
 * - Lowest latency
 * - No coordination required
 *
 * Event Time: Time when the event actually occurred (embedded in record)
 * - Deterministic results
 * - Handles out-of-order events
 * - Requires watermarks
 *
 * Ingestion Time: Time when event enters Flink
 * - Middle ground between processing and event time
 * - Rarely used in practice
 */

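// Before Flink 1.12 the time characteristic was set globally, with processing
// time as the default (this API is deprecated since 1.12):
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Since Flink 1.12, event time is the default and no call is needed. Processing
// time is chosen per operation, e.g. TumblingProcessingTimeWindows instead of
// TumblingEventTimeWindows.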
```

### Why Event Time Matters

```java theme={null}
// Example: Two events arrive out of order

// Event A: timestamp=10:00:00, arrives at 10:00:05
// Event B: timestamp=10:00:02, arrives at 10:00:03

// Processing Time Window [10:00:00 - 10:00:05):
// - Includes only Event B (arrived at 10:00:03)
// - Event A arrived at 10:00:05, outside the half-open window, so it lands in the next one
// - Results depend on network latency and processing speed

// Event Time Window [10:00:00 - 10:00:05):
// - Includes both events (based on actual event timestamp)
// - Results are deterministic and correct
```
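To make the arithmetic concrete, here is a small plain-Java sketch (not Flink API; the class name is illustrative) that assigns each event to a 5-second tumbling window using `ts - (ts % size)`, which, as far as we know, matches what a tumbling assigner computes for non-negative timestamps and a zero offset. Only the timestamp fed in changes between the two notions of time.

```java
import java.time.LocalTime;

/**
 * Plain-Java sketch (not Flink API): assigns a timestamp to a tumbling
 * window [start, start + size). The same function is used for both notions
 * of time; only the timestamp we pass in differs.
 */
final class TumblingAssignment {

    /** Start of the tumbling window containing ts (ms, ts >= 0). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** "HH:mm:ss" -> milliseconds since midnight. */
    static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    public static void main(String[] args) {
        long size = 5_000;                 // 5-second windows
        long target = ms("10:00:00");      // window [10:00:00, 10:00:05)

        // Event A: occurred 10:00:00, arrived 10:00:05
        // Event B: occurred 10:00:02, arrived 10:00:03
        boolean aByProcessing = windowStart(ms("10:00:05"), size) == target;
        boolean bByProcessing = windowStart(ms("10:00:03"), size) == target;
        boolean aByEvent = windowStart(ms("10:00:00"), size) == target;
        boolean bByEvent = windowStart(ms("10:00:02"), size) == target;

        System.out.println("processing time: A=" + aByProcessing + " B=" + bByProcessing);
        System.out.println("event time:      A=" + aByEvent + " B=" + bByEvent);
    }
}
```

Running `main` prints `A=false B=true` for processing time and `A=true B=true` for event time.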

## Watermarks

Watermarks are special records that flow through the stream alongside the data. A watermark with timestamp T declares that event time has reached T: "all events with timestamp \<= T should have arrived."

### Watermark Semantics

```java theme={null}
/**
 * Watermark(T) means: "No more events with timestamp <= T are expected"
 *
 * Properties:
 * 1. Monotonically non-decreasing (a watermark never moves backwards)
 * 2. Let processing make progress despite out-of-order events
 * 3. Trade-off between latency and completeness
 *
 * When watermark T arrives:
 * - Event-time windows [start, end) whose last timestamp (end - 1) is <= T are triggered
 * - Events with timestamp <= T arriving afterwards are considered "late"
 */

// Simplified view of org.apache.flink.api.common.eventtime.Watermark
public final class Watermark implements Serializable {

    // Special watermark: end of stream (event time jumps to "infinity")
    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

    private final long timestamp;

    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
```
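The two rules in the comment can be written as plain predicates. The sketch below is a simplified model in plain Java (not Flink classes; names are illustrative), assuming half-open windows `[start, end)` whose last timestamp is `end - 1`, the point at which an event-time trigger fires.

```java
/**
 * Plain-Java sketch of the two checks a watermark drives.
 * Assumes half-open windows [start, end) with last timestamp end - 1.
 */
final class WatermarkRules {

    /** A window fires once the watermark reaches its last timestamp (end - 1). */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** An event is late if the watermark already covers its timestamp. */
    static boolean isLate(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long end = 10_000;  // window [0, 10000)
        System.out.println(windowFires(end, 9_998));  // false
        System.out.println(windowFires(end, 9_999));  // true
        System.out.println(isLate(9_999, 9_999));     // true
        System.out.println(isLate(10_000, 9_999));    // false
    }
}
```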

### Watermark Strategies

Flink 1.11 introduced the `WatermarkStrategy` API, which bundles a timestamp assigner and a watermark generator:

```java theme={null}
// Built-in watermark strategies

// 1. For monotonically ascending timestamps (no out-of-order)
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

DataStream<Event> withTimestamps = stream.assignTimestampsAndWatermarks(strategy);

// 2. For bounded out-of-orderness (most common)
WatermarkStrategy<Event> boundedStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))  // Max 10s out-of-order
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

// 3. Custom watermark generator (CustomWatermarkGenerator is your own WatermarkGenerator<Event>)
WatermarkStrategy<Event> customStrategy = WatermarkStrategy
    .<Event>forGenerator((context) -> new CustomWatermarkGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());
```

### Watermark Generation Patterns

#### Periodic Watermark Generator

The generator's `onPeriodicEmit()` is called at a fixed interval (default: 200ms, set via `env.getConfig().setAutoWatermarkInterval(...)`).

```java theme={null}
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Track maximum timestamp seen
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emit watermark: max timestamp - max out-of-orderness
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

// Usage
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .forGenerator((context) -> new BoundedOutOfOrdernessGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

DataStream<Event> stream = source
    .assignTimestampsAndWatermarks(strategy);
```
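The generator's behavior is easier to see outside Flink. This plain-Java simulation (hypothetical class; each "tick" stands in for one auto-watermark interval) feeds batches of timestamps between periodic emissions. It starts the running maximum at `Long.MIN_VALUE + bound + 1`, an assumption borrowed from how bounded-out-of-orderness generators commonly avoid underflow, so the first watermark is `Long.MIN_VALUE` rather than a wrapped-around positive number.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation of a periodic bounded-out-of-orderness generator:
 * onEvent tracks the max timestamp, onPeriodicEmit returns max - bound - 1.
 * Not Flink code.
 */
final class BoundedOutOfOrdernessSim {

    private final long bound;
    private long maxTimestamp;

    BoundedOutOfOrdernessSim(long bound) {
        this.bound = bound;
        // Start low enough that maxTimestamp - bound - 1 cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + bound + 1;
    }

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    long onPeriodicEmit() {
        return maxTimestamp - bound - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSim gen = new BoundedOutOfOrdernessSim(3_500);
        // Events that arrive between consecutive ticks
        long[][] batches = { {1_000, 4_000}, {2_500}, {9_000, 7_000} };
        List<Long> watermarks = new ArrayList<>();
        for (long[] batch : batches) {
            for (long ts : batch) {
                gen.onEvent(ts);
            }
            watermarks.add(gen.onPeriodicEmit());
        }
        System.out.println(watermarks);  // [499, 499, 5499]
    }
}
```

The out-of-order event at 2,500 does not move the watermark, and the one at 7,000 arrives after 9,000 without being late because 7,000 is still above the watermark of 5,499.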

#### Punctuated Watermark Generator

Emits watermarks based on special marker events.

```java theme={null}
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Emit watermark on special marker events
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Don't emit periodic watermarks
    }
}
```
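A plain-Java simulation of the punctuated approach (the `Event` type with a marker flag is hypothetical) shows that watermarks only move when a marker arrives. The sketch also skips markers that would move the watermark backwards, which keeps its own output monotonic.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation of punctuated watermarking: a watermark is emitted
 * only when a marker record arrives, and never moves backwards.
 */
final class PunctuatedSim {

    /** Hypothetical event: a timestamp plus a "this is a watermark marker" flag. */
    static final class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    static List<Long> watermarks(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (Event e : events) {
            if (e.marker && e.timestamp > current) {
                current = e.timestamp;
                emitted.add(current);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1_000, false),
            new Event(2_000, true),   // marker -> watermark 2000
            new Event(1_500, false),
            new Event(1_800, true),   // marker behind current watermark: skipped
            new Event(5_000, true));  // marker -> watermark 5000
        System.out.println(watermarks(events));  // [2000, 5000]
    }
}
```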

### Advanced Watermark Strategies

#### Per-Partition Watermarks (Kafka)

```java theme={null}
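// Per-partition watermarking: when this strategy is passed to env.fromSource() with a
// KafkaSource, watermarks are tracked per partition and the source emits the minimum
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime())
    .withIdleness(Duration.ofMinutes(1));  // Handle idle partitions

// The idleness timeout keeps a partition that receives no data at all from holding
// back the watermark; a slow partition that still receives data continues to hold it back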
```
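Conceptually, the source combines per-partition watermarks by taking the minimum over partitions that are not idle. The plain-Java model below (a simplification with illustrative names, not Flink's implementation) shows that an idle partition stops holding the watermark back, while a slow partition that still receives data gets no such exemption.

```java
import java.util.Map;
import java.util.OptionalLong;

/**
 * Plain-Java sketch: combine per-partition watermarks as the minimum over
 * partitions that are not marked idle. Empty result = every partition idle.
 */
final class PartitionWatermarks {

    /**
     * @param watermarks partition -> current watermark
     * @param idle       partition -> idle flag (missing means active)
     */
    static OptionalLong combined(Map<Integer, Long> watermarks, Map<Integer, Boolean> idle) {
        return watermarks.entrySet().stream()
            .filter(e -> !idle.getOrDefault(e.getKey(), false))
            .mapToLong(Map.Entry::getValue)
            .min();
    }

    public static void main(String[] args) {
        // Partition 1 is slow but active; partition 2 stopped receiving data long ago
        Map<Integer, Long> wms = Map.of(0, 50_000L, 1, 48_000L, 2, 1_000L);
        System.out.println(combined(wms, Map.of()).getAsLong());         // 1000
        System.out.println(combined(wms, Map.of(2, true)).getAsLong());  // 48000
        System.out.println(combined(wms, Map.of(0, true, 1, true, 2, true)).isPresent());  // false
    }
}
```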

#### Custom Watermark with Multiple Timestamps

```java theme={null}
public class MultiTimestampWatermarkGenerator implements WatermarkGenerator<Event> {

    private long maxEventTime = Long.MIN_VALUE;
    private long maxIngestionTime = Long.MIN_VALUE;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxEventTime = Math.max(maxEventTime, event.getEventTime());
        maxIngestionTime = Math.max(maxIngestionTime, event.getIngestionTime());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing seen yet: Long.MIN_VALUE - 5000 would overflow to a huge positive
        // value and close every window at once, so wait for the first event
        if (maxEventTime == Long.MIN_VALUE || maxIngestionTime == Long.MIN_VALUE) {
            return;
        }
        // Use the minimum of both candidates for conservative watermarking
        long watermarkTimestamp = Math.min(
            maxEventTime - 5000,      // 5 sec lag for event time
            maxIngestionTime - 1000   // 1 sec lag for ingestion time
        );
        output.emitWatermark(new Watermark(watermarkTimestamp));
    }
}
```

## Timestamp Extractors

### Built-in Timestamp Assigners

```java theme={null}
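// 1. The legacy AscendingTimestampExtractor is deprecated. With WatermarkStrategy, pass a
//    SerializableTimestampAssigner (a lambda or a class like this one)
public class MyAscendingExtractor implements SerializableTimestampAssigner<Event> {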
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        return element.getEventTime();
    }
}

// Modern approach with WatermarkStrategy
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner(new MyAscendingExtractor());

// 2. Bounded out-of-orderness with custom timestamp field
WatermarkStrategy<SensorReading> sensorStrategy = WatermarkStrategy
    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((reading, ts) -> reading.timestamp);
```

### Timestamp Extraction from Different Sources

```java theme={null}
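// From JSON (must be serializable to be passed to withTimestampAssigner)
public class JsonTimestampAssigner implements SerializableTimestampAssigner<String> {
    // Created lazily on the task instead of being serialized with the job graph
    private transient ObjectMapper mapper;

    @Override
    public long extractTimestamp(String element, long recordTimestamp) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            JsonNode node = mapper.readTree(element);
            return node.get("timestamp").asLong();
        } catch (Exception e) {
            // Fallback to processing time: keeps the job running, but results
            // for malformed records are no longer deterministic
            return System.currentTimeMillis();
        }
    }
}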

// From Kafka record metadata
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

WatermarkStrategy<String> strategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((element, recordTimestamp) -> {
        // recordTimestamp is the Kafka record timestamp
        return recordTimestamp;
    });

DataStream<String> stream = env.fromSource(
    kafkaSource,
    strategy,
    "Kafka Source"
);

// From embedded timestamp in CSV
WatermarkStrategy<String> csvStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((csvLine, ts) -> {
        String[] fields = csvLine.split(",");
        return Long.parseLong(fields[0]);  // Assuming first field is timestamp
    });
```
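The CSV assigner above throws `NumberFormatException` on a malformed line. A more defensive extraction, sketched here in plain Java (the class name and the fallback policy are assumptions), trims the first field and falls back to the record's own timestamp when parsing fails. Routing bad records to a side output is a reasonable alternative if a silent fallback is not acceptable.

```java
/**
 * Plain-Java sketch of defensive CSV timestamp extraction: parse the first
 * field, falling back to the record's own timestamp on failure.
 */
final class CsvTimestamps {

    static long extract(String csvLine, long recordTimestamp) {
        int comma = csvLine.indexOf(',');
        String first = (comma >= 0 ? csvLine.substring(0, comma) : csvLine).trim();
        try {
            return Long.parseLong(first);
        } catch (NumberFormatException e) {
            return recordTimestamp;  // assumed fallback policy
        }
    }

    public static void main(String[] args) {
        System.out.println(extract("1700000000000,sensor-1,21.5", 42L));  // 1700000000000
        System.out.println(extract(" 1700000000000 ,sensor-1", 42L));     // 1700000000000
        System.out.println(extract("not-a-timestamp,sensor-1", 42L));     // 42
    }
}
```

Inside a strategy it could be wired in as `.withTimestampAssigner((line, ts) -> CsvTimestamps.extract(line, ts))`; if the source attaches no timestamp of its own, `ts` is not a meaningful fallback.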

## Allowed Lateness

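Allowed lateness keeps a window's state for a while after the watermark passes the window's end, so events arriving in that period still update the window and re-fire it. Events that arrive even later can be sent to a side output.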

```java theme={null}
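// Configure allowed lateness for a window
DataStream<Event> input = /* ... */;

// Tag for events that arrive after the allowed lateness has expired
final OutputTag<Event> lateDataTag = new OutputTag<Event>("late-events"){};

// getSideOutput() is defined on SingleOutputStreamOperator, not DataStream
SingleOutputStreamOperator<Result> results = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Keep window state 1 minute past the watermark
    .sideOutputLateData(lateDataTag)   // Capture events later than that
    .process(new MyWindowFunction());

// Get the late data side output
DataStream<Event> lateData = results.getSideOutput(lateDataTag);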

// Handle late data
lateData.addSink(new LateDataSink());
```
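The decision made for each incoming event can be modeled as a small function. This plain-Java sketch (illustrative names, not Flink API) assumes the default event-time trigger, which fires at `end - 1` and fires again for each late event accepted within the allowed lateness.

```java
/**
 * Plain-Java sketch: classify an event for its window given the current
 * watermark and the allowed lateness (all values in ms).
 */
final class LatenessClassifier {

    enum Outcome { ON_TIME, LATE_ACCEPTED, SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long watermark, long allowedLateness) {
        long lastTimestamp = windowEnd - 1;
        if (watermark < lastTimestamp) {
            return Outcome.ON_TIME;        // window has not fired yet
        }
        if (watermark < lastTimestamp + allowedLateness) {
            return Outcome.LATE_ACCEPTED;  // window state still kept: re-fire
        }
        return Outcome.SIDE_OUTPUT;        // window state already purged
    }

    public static void main(String[] args) {
        long end = 300_000;     // 5-minute window [0, 300000)
        long lateness = 60_000; // 1 minute
        System.out.println(classify(end, 200_000, lateness));  // ON_TIME
        System.out.println(classify(end, 299_999, lateness));  // LATE_ACCEPTED
        System.out.println(classify(end, 359_999, lateness));  // SIDE_OUTPUT
    }
}
```

With zero allowed lateness, anything arriving after the window fires goes straight to `SIDE_OUTPUT`.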

### Complete Late Data Handling Example

```java theme={null}
public class LateDataHandlingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure watermark interval
        env.getConfig().setAutoWatermarkInterval(1000);  // 1 second

        // Late data output tag
        final OutputTag<SensorReading> lateDataTag =
            new OutputTag<SensorReading>("late-readings"){};

        DataStream<SensorReading> readings = /* ... */;

        // Assign timestamps and watermarks
        DataStream<SensorReading> withTimestamps = readings
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((reading, ts) -> reading.timestamp)
            );

        // Windowed aggregation with allowed lateness
        SingleOutputStreamOperator<TemperatureAlert> alerts = withTimestamps
            .keyBy(reading -> reading.sensorId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .allowedLateness(Time.seconds(5))  // Accept events up to 5s late
            .sideOutputLateData(lateDataTag)   // Capture very late data
            .process(new ProcessWindowFunction<SensorReading, TemperatureAlert, String, TimeWindow>() {

                @Override
                public void process(
                        String sensorId,
                        Context context,
                        Iterable<SensorReading> readings,
                        Collector<TemperatureAlert> out) {

                    double avgTemp = StreamSupport.stream(readings.spliterator(), false)
                        .mapToDouble(r -> r.temperature)
                        .average()
                        .orElse(0.0);

                    if (avgTemp > 75.0) {
                        out.collect(new TemperatureAlert(
                            sensorId,
                            avgTemp,
                            context.window().getEnd()
                        ));
                    }
                }
            });

        // Process main results
        alerts.print();

        // Process late data separately
        alerts.getSideOutput(lateDataTag)
            .map(reading -> "Late data: " + reading)
            .print();

        env.execute("Late Data Handling");
    }
}
```

## Watermark Propagation

### Multi-Input Operators

```java theme={null}
// Watermarks propagate through the dataflow graph
// For operators with multiple inputs, the minimum watermark is used

DataStream<Event> stream1 = source1
    .assignTimestampsAndWatermarks(strategy1);

DataStream<Event> stream2 = source2
    .assignTimestampsAndWatermarks(strategy2);

// Union: watermark = min(stream1.watermark, stream2.watermark)
DataStream<Event> unioned = stream1.union(stream2);

// Connect: same watermark semantics
ConnectedStreams<Event, Event> connected = stream1.connect(stream2);
```
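A two-input operator keeps the latest watermark from each input and forwards the minimum, and only when that minimum advances. The plain-Java model below is a simplification of that bookkeeping (illustrative names, not Flink's implementation); conceptually, union applies the same rule across its input channels.

```java
/**
 * Plain-Java sketch of a two-input operator's watermark bookkeeping.
 */
final class TwoInputWatermark {

    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;
    private long emitted = Long.MIN_VALUE;

    /** Returns the watermark to forward downstream, or null if it did not advance. */
    Long onWatermark(int input, long watermark) {
        if (input == 1) {
            input1 = Math.max(input1, watermark);
        } else {
            input2 = Math.max(input2, watermark);
        }
        long combined = Math.min(input1, input2);
        if (combined > emitted) {
            emitted = combined;
            return combined;
        }
        return null;
    }

    public static void main(String[] args) {
        TwoInputWatermark op = new TwoInputWatermark();
        System.out.println(op.onWatermark(1, 10_000));  // null (input 2 has no watermark yet)
        System.out.println(op.onWatermark(2, 4_000));   // 4000
        System.out.println(op.onWatermark(2, 12_000));  // 10000
        System.out.println(op.onWatermark(1, 11_000));  // 11000
    }
}
```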

### Watermark Alignment

```java theme={null}
// Watermark alignment (Flink 1.15+): keeps sources in the same group from drifting
// too far apart in event time
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setDeserializer(new EventDeserializer())  // your KafkaRecordDeserializationSchema<Event>
    .build();

WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, ts) -> event.timestamp)
    .withIdleness(Duration.ofMinutes(1))  // Exclude idle partitions from the minimum
    // Pause any source in "alignment-group" whose watermark is more than 20s
    // ahead of the group's minimum watermark
    .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20));

DataStream<Event> stream = env.fromSource(kafkaSource, strategy, "Kafka");
```
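Alignment differs from idleness: instead of ignoring a source, it slows down the ones that run ahead. Here is a conceptual plain-Java sketch of the pause decision (not Flink's coordinator logic; names are illustrative), using a 20-second maximum drift.

```java
/**
 * Plain-Java sketch of the watermark-alignment idea: a source whose watermark
 * is more than maxDrift ahead of its group's minimum should pause reading.
 */
final class AlignmentSketch {

    static boolean shouldPause(long ownWatermark, long[] groupWatermarks, long maxDriftMs) {
        if (groupWatermarks.length == 0) {
            return false;  // nothing to align against
        }
        long groupMin = Long.MAX_VALUE;
        for (long wm : groupWatermarks) {
            groupMin = Math.min(groupMin, wm);
        }
        return ownWatermark > groupMin + maxDriftMs;
    }

    public static void main(String[] args) {
        long[] group = {100_000, 125_000, 90_000};
        long maxDrift = 20_000;  // 20 seconds
        System.out.println(shouldPause(100_000, group, maxDrift));  // false
        System.out.println(shouldPause(125_000, group, maxDrift));  // true
    }
}
```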

## Real-World Use Cases

### Use Case 1: IoT Sensor Data with Clock Drift

```java theme={null}
public class IoTSensorProcessing {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Custom watermark generator accounting for clock drift
        WatermarkStrategy<SensorReading> strategy = WatermarkStrategy
            .<SensorReading>forGenerator(context -> new ClockDriftWatermarkGenerator())
            .withTimestampAssigner((reading, ts) -> reading.deviceTimestamp);

        DataStream<SensorReading> sensorData = env
            .addSource(new IoTSensorSource())
            .assignTimestampsAndWatermarks(strategy);

        // Aggregate sensor readings in 1-minute windows
        DataStream<SensorStats> stats = sensorData
            .keyBy(reading -> reading.sensorId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(30))  // Account for clock drift
            .aggregate(new SensorAggregateFunction());

        stats.print();
        env.execute("IoT Sensor Processing");
    }

    static class ClockDriftWatermarkGenerator implements WatermarkGenerator<SensorReading> {

        private long maxTimestamp = Long.MIN_VALUE;
        private long maxDrift = 0;

        @Override
        public void onEvent(SensorReading event, long eventTimestamp, WatermarkOutput output) {
            long serverTime = System.currentTimeMillis();
            long drift = Math.abs(serverTime - event.deviceTimestamp);

            maxTimestamp = Math.max(maxTimestamp, event.deviceTimestamp);
            maxDrift = Math.max(maxDrift, drift);
        }

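        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // No readings yet: avoid Long.MIN_VALUE underflow
            if (maxTimestamp == Long.MIN_VALUE) {
                return;
            }
            // Account for the largest drift observed so far (plus a 1s buffer).
            // Note: the measured drift includes network delay and never shrinks,
            // so one badly skewed device holds the watermark back for every sensor.
            output.emitWatermark(new Watermark(maxTimestamp - maxDrift - 1000));
        }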
    }
}
```

### Use Case 2: Log Aggregation from Distributed Systems

```java theme={null}
public class DistributedLogAggregation {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Different log sources with different delays. KafkaSource is not a
        // SourceFunction: build it with its builder and attach it via fromSource().
        DataStream<LogEntry> appLogs = env.fromSource(
            logSource("app-logs"),
            WatermarkStrategy
                .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((log, ts) -> log.timestamp),
            "App Logs");

        DataStream<LogEntry> accessLogs = env.fromSource(
            logSource("access-logs"),
            WatermarkStrategy
                .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((log, ts) -> log.timestamp),
            "Access Logs");

        // Union and aggregate: the union's watermark is the minimum of both inputs
        DataStream<LogStats> aggregated = appLogs
            .union(accessLogs)
            .keyBy(log -> log.service)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1))
            .aggregate(new LogAggregator());

        aggregated.print();
        env.execute("Distributed Log Aggregation");
    }

    // LogEntryDeserializationSchema is assumed: your DeserializationSchema<LogEntry>
    private static KafkaSource<LogEntry> logSource(String topic) {
        return KafkaSource.<LogEntry>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics(topic)
            .setValueOnlyDeserializer(new LogEntryDeserializationSchema())
            .build();
    }
}
```

### Use Case 3: Financial Transaction Processing

```java theme={null}
public class TransactionProcessing {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Strict watermarking for financial data
        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
            .<Transaction>forBoundedOutOfOrderness(Duration.ofMillis(100))
            .withTimestampAssigner((txn, ts) -> txn.transactionTime);

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .assignTimestampsAndWatermarks(strategy);

        // Session windows for fraud detection
        DataStream<FraudAlert> alerts = transactions
            .keyBy(txn -> txn.accountId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
            .allowedLateness(Time.seconds(10))
            .process(new FraudDetectionFunction());

        alerts.addSink(new AlertingSink());
        env.execute("Transaction Processing");
    }

    static class FraudDetectionFunction extends
            ProcessWindowFunction<Transaction, FraudAlert, String, TimeWindow> {

        @Override
        public void process(
                String accountId,
                Context context,
                Iterable<Transaction> transactions,
                Collector<FraudAlert> out) {

            List<Transaction> txnList = StreamSupport
                .stream(transactions.spliterator(), false)
                .collect(Collectors.toList());

            // Detect suspicious patterns
            double totalAmount = txnList.stream()
                .mapToDouble(t -> t.amount)
                .sum();

            if (totalAmount > 10000 && txnList.size() > 10) {
                out.collect(new FraudAlert(
                    accountId,
                    "High frequency and volume",
                    totalAmount,
                    context.window().getEnd()
                ));
            }
        }
    }
}
```

## Debugging Watermarks

### Watermark Monitoring

```java theme={null}
// Wraps another strategy and logs every watermark and idleness signal it produces
public class WatermarkMonitor<T> implements WatermarkStrategy<T> {

    private final WatermarkStrategy<T> baseStrategy;

    public WatermarkMonitor(WatermarkStrategy<T> baseStrategy) {
        this.baseStrategy = baseStrategy;
    }

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {

        final WatermarkGenerator<T> base = baseStrategy.createWatermarkGenerator(context);

        return new WatermarkGenerator<T>() {
            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                base.onEvent(event, eventTimestamp, logging(output));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Periodic generators (e.g. bounded out-of-orderness) emit here,
                // so this output must be wrapped as well
                base.onPeriodicEmit(logging(output));
            }
        };
    }

    @Override
    public TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return baseStrategy.createTimestampAssigner(context);
    }

    private static WatermarkOutput logging(WatermarkOutput output) {
        return new WatermarkOutput() {
            @Override
            public void emitWatermark(Watermark watermark) {
                System.out.println("Emitting watermark: " + watermark.getTimestamp());
                output.emitWatermark(watermark);
            }

            @Override
            public void markIdle() {
                System.out.println("Marking source as idle");
                output.markIdle();
            }

            @Override
            public void markActive() {
                System.out.println("Marking source as active");
                output.markActive();
            }
        };
    }
}

// Usage
WatermarkStrategy<Event> debugStrategy = new WatermarkMonitor<>(
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.timestamp)
);
```

### Watermark Metrics

```java theme={null}
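// Flink's Gauge has no setValue(): register a gauge that reads a field instead.
// The MetricGroup comes from WatermarkGeneratorSupplier.Context.
public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {

    private final long maxOutOfOrderness = 5000;
    // Start low enough that the subtraction below cannot underflow
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    // Read by the metrics reporter thread
    private volatile long lastEmittedWatermark = Long.MIN_VALUE;

    public WatermarkMetricsGenerator(MetricGroup metricGroup) {
        metricGroup.gauge("lastEmittedWatermark", (Gauge<Long>) () -> lastEmittedWatermark);
    }

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        lastEmittedWatermark = currentMaxTimestamp - maxOutOfOrderness - 1;
        output.emitWatermark(new Watermark(lastEmittedWatermark));
    }
}

// Usage
WatermarkStrategy<Event> metricsStrategy = WatermarkStrategy
    .<Event>forGenerator(ctx -> new WatermarkMetricsGenerator(ctx.getMetricGroup()))
    .withTimestampAssigner((event, ts) -> event.timestamp);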
```

## Best Practices

### 1. Choose Appropriate Watermark Strategy

```java theme={null}
// For ordered data (e.g., database CDC)
WatermarkStrategy.<Event>forMonotonousTimestamps();

// For mostly ordered data with bounded delays (most common)
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10));

// For highly out-of-order data, plus idleness handling for sparse partitions
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
    .withIdleness(Duration.ofMinutes(1));

// For data with unpredictable delays:
// use a custom WatermarkGenerator with adaptive logic (see Exercise 1)
```

### 2. Handle Idle Sources

```java theme={null}
// Prevent idle partitions (no data at all) from blocking watermarks;
// slow partitions that still receive data continue to hold them back
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.timestamp)
    .withIdleness(Duration.ofMinutes(1));  // Mark idle after 1 minute
```

### 3. Set Appropriate Allowed Lateness

```java theme={null}
// Balance between completeness and latency
dataStream
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Example value: size it from observed lateness
    .sideOutputLateData(lateDataTag)   // Always capture data later than that
    .process(new MyWindowFunction());
```

### 4. Monitor Watermark Lag

```java theme={null}
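// Watermark lag = current processing time - current watermark
// High lag indicates:
// - Slow source
// - Too conservative watermark strategy
// - Processing bottleneck

// Flink already reports currentInputWatermark / currentOutputWatermark per operator.
// To expose the lag directly, a pass-through ProcessFunction can register a gauge:
public class WatermarkLagFunction extends ProcessFunction<Event, Event> {

    // Not transient, so the initial value survives serialization to the task
    private volatile long currentWatermark = Long.MIN_VALUE;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext()
            .getMetricGroup()
            .gauge("watermark_lag", (Gauge<Long>) () ->
                currentWatermark == Long.MIN_VALUE
                    ? 0L  // no watermark received yet
                    : System.currentTimeMillis() - currentWatermark);
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) {
        currentWatermark = ctx.timerService().currentWatermark();
        out.collect(value);
    }
}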
```

### 5. Test with Out-of-Order Data

```java theme={null}
// Test source with controlled out-of-order events
public class OutOfOrderTestSource implements SourceFunction<Event> {

    private volatile boolean running = true;
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        long timestamp = System.currentTimeMillis();

        while (running) {
            // Generate events with random delays
            long eventTime = timestamp - random.nextInt(10000);  // Up to 10s in the past

            ctx.collect(new Event("key", eventTime, "value"));

            timestamp += 1000;
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```
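The test source above uses `System.currentTimeMillis()` and an unseeded `Random`, so every run differs. For assertions, a deterministic variant helps. This plain-Java helper (hypothetical names) generates timestamps that advance by a fixed step but jitter up to a maximum delay into the past using a fixed seed, and measures the resulting out-of-orderness so you can choose a `forBoundedOutOfOrderness` bound for the test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Plain-Java test helper: reproducible, bounded out-of-order timestamps.
 */
final class OutOfOrderTimestamps {

    static List<Long> generate(long start, int count, long stepMs, int maxDelayMs, long seed) {
        Random random = new Random(seed);  // fixed seed -> same sequence every run
        List<Long> out = new ArrayList<>(count);
        long logical = start;
        for (int i = 0; i < count; i++) {
            out.add(logical - random.nextInt(maxDelayMs + 1));  // delay in [0, maxDelayMs]
            logical += stepMs;
        }
        return out;
    }

    /** Largest amount by which an event trails the max timestamp seen before it. */
    static long maxOutOfOrderness(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        long worst = 0;
        for (long ts : timestamps) {
            if (max != Long.MIN_VALUE && ts < max) {
                worst = Math.max(worst, max - ts);
            }
            max = Math.max(max, ts);
        }
        return worst;
    }

    public static void main(String[] args) {
        List<Long> ts = generate(0, 1_000, 1_000, 10_000, 42L);
        System.out.println("observed out-of-orderness: " + maxOutOfOrderness(ts) + " ms");
    }
}
```

With a 1-second step and at most 10 seconds of delay, no event can trail the running maximum by more than 10 seconds, so a 10-second bound would mark none of these events late.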

## Performance Optimization

### 1. Watermark Interval Tuning

```java theme={null}
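// Default watermark interval: 200ms
// Increase to cut watermark overhead (fewer emissions to propagate downstream);
// event-time results may fire up to one interval later
env.getConfig().setAutoWatermarkInterval(1000);  // 1 second

// Decrease for lower latency of event-time results, at the cost of more watermark
// traffic; the interval affects timeliness, not correctness
env.getConfig().setAutoWatermarkInterval(100);  // 100ms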
```

### 2. Parallelism and Watermarks

```java theme={null}
// Watermarks are generated per parallel instance
// Higher parallelism = more watermark generators

// If watermark computation is expensive, consider:
// 1. Reduce parallelism at source
source.setParallelism(4);

// 2. Pick the strategy for correctness, not speed: forMonotonousTimestamps() is
//    bounded out-of-orderness with a zero bound, so it costs the same
WatermarkStrategy.<Event>forMonotonousTimestamps();
```

### 3. Avoid Frequent Watermark Emissions

```java theme={null}
// Bad: Emitting watermark for every event
public class BadWatermarkGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        output.emitWatermark(new Watermark(eventTimestamp));  // Too frequent!
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {}
}

// Good: Let periodic emission handle watermarks
public class GoodWatermarkGenerator implements WatermarkGenerator<Event> {
    // Start low enough that maxTimestamp - 5000 - 1 cannot underflow
    private long maxTimestamp = Long.MIN_VALUE + 5000 + 1;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        // Don't emit here
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - 5000 - 1));
    }
}
```

## Exercises

### Exercise 1: Implement Adaptive Watermark Generator

Create a watermark generator that adapts to observed out-of-orderness.

```java theme={null}
// TODO: Implement AdaptiveWatermarkGenerator
// Track observed delays and adjust watermark lag accordingly
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
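  public class AdaptiveWatermarkGenerator implements WatermarkGenerator<Event> {

      private static final long INITIAL_DELAY = 5000;  // used until out-of-orderness is observed
      private static final double SAFETY_MARGIN = 1.5;

      private long maxTimestamp = Long.MIN_VALUE;
      private long maxObservedOutOfOrderness = 0;
      private long lastEmitted = Long.MIN_VALUE;

      @Override
      public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
          // Out-of-orderness = how far this event trails the newest timestamp seen so far.
          // (Comparing against the wall clock would measure transport delay instead,
          // and breaks down when replaying historical data.)
          if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
              maxObservedOutOfOrderness =
                  Math.max(maxObservedOutOfOrderness, maxTimestamp - eventTimestamp);
          }
          maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
      }

      @Override
      public void onPeriodicEmit(WatermarkOutput output) {
          if (maxTimestamp == Long.MIN_VALUE) {
              return;  // no events yet; avoids Long.MIN_VALUE underflow
          }
          // Use observed out-of-orderness with a safety margin
          long adaptiveDelay = maxObservedOutOfOrderness > 0
              ? (long) (maxObservedOutOfOrderness * SAFETY_MARGIN)
              : INITIAL_DELAY;
          long watermark = maxTimestamp - adaptiveDelay - 1;

          // A growing delay could move the watermark backwards; never emit that
          if (watermark > lastEmitted) {
              lastEmitted = watermark;
              output.emitWatermark(new Watermark(watermark));
          }
      }
  }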
  ```
</details>

### Exercise 2: Handle Multi-Source Watermarks

Implement proper watermark handling for multiple sources with different characteristics.

```java theme={null}
// TODO: Create a job that:
// 1. Reads from two sources (fast and slow)
// 2. Applies different watermark strategies
// 3. Unions the streams
// 4. Processes with event time windows
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  public class MultiSourceWatermarks {

      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

          // Fast source: low latency, minimal out-of-order
          DataStream<Event> fastSource = env
              .addSource(new FastSource())
              .assignTimestampsAndWatermarks(
                  WatermarkStrategy
                      .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                      .withTimestampAssigner((event, ts) -> event.timestamp)
              );

          // Slow source: high latency, significant out-of-order
          DataStream<Event> slowSource = env
              .addSource(new SlowSource())
              .assignTimestampsAndWatermarks(
                  WatermarkStrategy
                      .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                      .withTimestampAssigner((event, ts) -> event.timestamp)
                      .withIdleness(Duration.ofMinutes(1))  // Handle idle periods
              );

          // Union: watermark will be minimum of both sources
          DataStream<Event> combined = fastSource.union(slowSource);

          // Process with event time windows
          DataStream<String> results = combined
              .keyBy(e -> e.key)
              .window(TumblingEventTimeWindows.of(Time.minutes(5)))
              .allowedLateness(Time.seconds(30))  // Account for slow source
              .aggregate(new EventAggregator());

          results.print();
          env.execute("Multi-Source Watermarks");
      }
  }
  ```
</details>

### Exercise 3: Debug Watermark Issues

Given a job with late data, identify and fix the watermark configuration.

```java theme={null}
// TODO: Fix the watermark issues in this code

DataStream<SensorReading> readings = env
    .addSource(new SensorSource())
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<SensorReading>forMonotonousTimestamps()  // Issue 1: Should allow out-of-order
            .withTimestampAssigner((reading, ts) -> reading.timestamp)
    );

DataStream<Alert> alerts = readings
    .keyBy(r -> r.sensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // Issue 2: No allowed lateness configured
    .process(new AlertFunction());
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  // Fixed version
  DataStream<SensorReading> readings = env
      .addSource(new SensorSource())
      .assignTimestampsAndWatermarks(
          WatermarkStrategy
              .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(10))
              .withTimestampAssigner((reading, ts) -> reading.timestamp)
              .withIdleness(Duration.ofMinutes(1))
      );

  OutputTag<SensorReading> lateDataTag =
      new OutputTag<SensorReading>("late-data"){};

  SingleOutputStreamOperator<Alert> alerts = readings
      .keyBy(r -> r.sensorId)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .sideOutputLateData(lateDataTag)
      .process(new AlertFunction());

  // Monitor late data
  alerts.getSideOutput(lateDataTag)
      .map(r -> "Late data: " + r)
      .print();
  ```
</details>

## Summary

In this module, you learned:

* The three notions of time in stream processing
* Why event time is critical for correct results
* Watermark semantics and generation strategies
* Periodic vs punctuated watermark generators
* Timestamp extraction from various sources
* Handling late data with allowed lateness
* Watermark propagation in dataflow graphs
* Real-world use cases (IoT, logs, finance)
* Debugging and monitoring watermarks
* Performance optimization techniques
* Best practices for production systems

## Next Steps

<Card title="Module 4: Windows & Time Operations" icon="window-maximize" href="/distributed-systems-tools/flink-windows">
  Learn how to implement sophisticated windowing logic for time-based aggregations
</Card>

## Additional Resources

* [Flink Event Time Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/concepts/time/)
* [Watermark Strategies](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/)
* [Google Dataflow Model Paper](https://research.google/pubs/pub43864/)
