# DataStream API

> Master the low-level streaming API

<Info>
  **Module Duration**: 4-5 hours
  **Focus**: Stream processing with Apache Flink
  **Prerequisites**: Streaming concepts, Java/Scala basics
</Info>

## Introduction

The DataStream API is Flink's foundational API for stream processing. It gives you fine-grained control over data transformations, state management, and event-time processing. Unlike higher-level APIs such as SQL, it lets you implement custom streaming logic directly, at the cost of writing more code yourself.

## Core Concepts

### DataStream Abstraction

A `DataStream` represents a sequence of records that is usually unbounded but can also be bounded. Operations on a `DataStream` are lazy: each call adds a node to a dataflow graph, and nothing runs until you call `execute()` on the environment.

```java theme={null}
// Java: Basic DataStream creation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From collection
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

// From socket
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

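// From Kafka (legacy consumer, deprecated since Flink 1.14; prefer KafkaSource)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "input-topic",
    new SimpleStringSchema(),
    properties
);
DataStream<String> kafkaStream = env.addSource(consumer);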
```

```scala theme={null}
// Scala: Basic DataStream creation
val env = StreamExecutionEnvironment.getExecutionEnvironment

// From collection
val numbers = env.fromElements(1, 2, 3, 4, 5)

// From socket
val socketStream = env.socketTextStream("localhost", 9999)

// From Kafka
val consumer = new FlinkKafkaConsumer[String](
  "input-topic",
  new SimpleStringSchema(),
  properties
)
val kafkaStream = env.addSource(consumer)
```

```python theme={null}
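# Python: Basic DataStream creation
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer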

env = StreamExecutionEnvironment.get_execution_environment()

# From collection
numbers = env.from_collection([1, 2, 3, 4, 5])

# From Kafka
kafka_source = FlinkKafkaConsumer(
    topics='input-topic',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092'}
)
kafka_stream = env.add_source(kafka_source)
```
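The lazy behaviour described at the start of this section can be illustrated without Flink. The following is a minimal JDK-only sketch: `LazyPipeline` and its methods are hypothetical names invented for this example, and the class only mimics the idea that transformations are recorded first and run when `execute()` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration class; not part of Flink.
class LazyPipeline {
    private final List<Integer> source;
    private final List<Function<Integer, Integer>> steps = new ArrayList<>();
    int invocations = 0; // counts how many times a step actually ran

    LazyPipeline(List<Integer> source) {
        this.source = source;
    }

    // Records the transformation; nothing is computed here.
    LazyPipeline map(Function<Integer, Integer> fn) {
        steps.add(fn);
        return this;
    }

    // Runs every recorded step over every source element.
    List<Integer> execute() {
        List<Integer> out = new ArrayList<>();
        for (Integer value : source) {
            Integer current = value;
            for (Function<Integer, Integer> step : steps) {
                current = step.apply(current);
                invocations++;
            }
            out.add(current);
        }
        return out;
    }
}
```

Calling `map` twice leaves `invocations` at zero; the counter only moves once `execute()` runs, which is the same contract the DataStream API follows at a much larger scale.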

## Sources and Sinks

### Built-in Sources

```java theme={null}
// File sources (three slashes: an empty authority falls back to the default NameNode)
DataStream<String> textFiles = env.readTextFile("hdfs:///path/to/files");

// Collection source
List<Event> events = Arrays.asList(/* events */);
DataStream<Event> fromCollection = env.fromCollection(events);

// Custom source
DataStream<CustomData> customStream = env.addSource(new CustomSourceFunction());

// Kafka source (Flink 1.14+)
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.noWatermarks(),
    "Kafka Source"
);
```

### Built-in Sinks

```java theme={null}
// Print sink (for debugging)
stream.print();

// File sink
final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build();

stream.sinkTo(sink);

// Kafka sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
    )
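    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // EXACTLY_ONCE relies on Kafka transactions, which need a transactional ID
    // prefix and enabled checkpointing; "my-app" is a placeholder
    .setTransactionalIdPrefix("my-app")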
    .build();

stream.sinkTo(kafkaSink);

// JDBC sink
stream.addSink(JdbcSink.sink(
    "INSERT INTO table (id, name, value) VALUES (?, ?, ?)",
    (statement, record) -> {
        statement.setLong(1, record.id);
        statement.setString(2, record.name);
        statement.setDouble(3, record.value);
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/mydb")
        .withDriverName("org.postgresql.Driver")
        .withUsername("user")
        .withPassword("password")
        .build()
));
```

## Transformations

### Basic Transformations

```java theme={null}
// Map: One-to-one transformation
DataStream<Integer> doubled = numbers.map(x -> x * 2);

// FlatMap: One-to-many transformation
DataStream<String> words = lines.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }
).returns(Types.STRING);

// Filter: Select elements
DataStream<Integer> evens = numbers.filter(x -> x % 2 == 0);

// KeyBy: Partition by key
KeyedStream<Transaction, String> byUser = transactions.keyBy(t -> t.userId);
```

```scala theme={null}
// Scala transformations with type safety
val doubled = numbers.map(_ * 2)

val words = lines.flatMap(_.split(" "))

val evens = numbers.filter(_ % 2 == 0)

val byUser = transactions.keyBy(_.userId)
```

### Rich Functions

Rich functions provide access to runtime context, state, and lifecycle methods.

```java theme={null}
public class EnrichmentMapper extends RichMapFunction<Transaction, EnrichedTransaction> {
    private transient ValueState<UserProfile> profileState;
    private transient Counter transactionCounter;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize state
        ValueStateDescriptor<UserProfile> descriptor =
            new ValueStateDescriptor<>("user-profile", UserProfile.class);
        profileState = getRuntimeContext().getState(descriptor);

        // Initialize metrics
        transactionCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("transactions");
    }

    @Override
    public EnrichedTransaction map(Transaction transaction) throws Exception {
        // Access state
        UserProfile profile = profileState.value();

        if (profile == null) {
            // Fetch from external system (ideally cached)
            profile = fetchUserProfile(transaction.userId);
            profileState.update(profile);
        }

        transactionCounter.inc();

        return new EnrichedTransaction(transaction, profile);
    }

    private UserProfile fetchUserProfile(String userId) {
        // Implementation
        return new UserProfile(userId);
    }
}

// Usage
DataStream<EnrichedTransaction> enriched = transactions
    .keyBy(t -> t.userId)
    .map(new EnrichmentMapper());
```

### Process Functions

Process functions are the most flexible operators. They expose side outputs and, when applied to a keyed stream (as `KeyedProcessFunction` is below), keyed state and timers.

```java theme={null}
public class AlertProcessFunction extends KeyedProcessFunction<String, SensorReading, Alert> {

    // State to track last reading
    private ValueState<Double> lastTempState;
    // State to track timer
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        lastTempState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastTemp", Double.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(
            SensorReading reading,
            Context ctx,
            Collector<Alert> out) throws Exception {

        Double lastTemp = lastTempState.value();
        Long timer = timerState.value();

        // Update last temperature
        lastTempState.update(reading.temperature);

        // If temperature increased and no timer set
        if (lastTemp != null && reading.temperature > lastTemp && timer == null) {
            // Set timer for 1 minute from now
            long timerTs = ctx.timerService().currentProcessingTime() + 60000;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            timerState.update(timerTs);
        }
        // If temperature decreased, cancel timer
        else if (lastTemp != null && reading.temperature <= lastTemp && timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
            timerState.clear();
        }
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Alert> out) throws Exception {

        out.collect(new Alert(
            "Temperature consistently rising for sensor " + ctx.getCurrentKey()));
        timerState.clear();
    }
}

// Usage
DataStream<Alert> alerts = sensorData
    .keyBy(r -> r.sensorId)
    .process(new AlertProcessFunction());
```
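The timer logic in `AlertProcessFunction` is easier to reason about when it is separated from the Flink runtime. The sketch below is a JDK-only model, not Flink code: `RisingTempMonitor`, `onReading`, and `advanceTo` are hypothetical names, and time is passed in explicitly instead of coming from a timer service.

```java
// Hypothetical JDK-only model of the state machine in AlertProcessFunction.
class RisingTempMonitor {
    private Double lastTemp; // mirrors lastTempState
    private Long timerAt;    // mirrors timerState; null means no timer is registered

    // Handles one reading: registers a timer on a rise, cancels it on a fall.
    void onReading(double temp, long now) {
        Double previous = lastTemp;
        lastTemp = temp;
        if (previous != null && temp > previous && timerAt == null) {
            timerAt = now + 60_000; // fire one minute after the first rise
        } else if (previous != null && temp <= previous && timerAt != null) {
            timerAt = null;         // temperature stopped rising, cancel
        }
    }

    // Advances the clock; returns true if the registered timer fires.
    boolean advanceTo(long now) {
        if (timerAt != null && now >= timerAt) {
            timerAt = null;
            return true;
        }
        return false;
    }
}
```

A second rise while a timer is pending does not push the deadline back, so an alert means the temperature never dropped during the full minute after the first rise.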

### Side Outputs

Side outputs allow splitting a stream into multiple outputs.

```java theme={null}
// Define output tags
final OutputTag<Transaction> highValueTag =
    new OutputTag<Transaction>("high-value"){};
final OutputTag<Transaction> suspiciousTag =
    new OutputTag<Transaction>("suspicious"){};

// Process function with side outputs
public class TransactionClassifier extends ProcessFunction<Transaction, Transaction> {

    @Override
    public void processElement(
            Transaction txn,
            Context ctx,
            Collector<Transaction> out) throws Exception {

        // Main output: normal transactions
        if (txn.amount < 1000 && !txn.isSuspicious) {
            out.collect(txn);
        }
        // High value side output
        else if (txn.amount >= 1000) {
            ctx.output(highValueTag, txn);
        }
        // Suspicious side output
        else if (txn.isSuspicious) {
            ctx.output(suspiciousTag, txn);
        }
    }
}

// Apply and get side outputs
SingleOutputStreamOperator<Transaction> mainStream = transactions
    .process(new TransactionClassifier());

DataStream<Transaction> highValue = mainStream.getSideOutput(highValueTag);
DataStream<Transaction> suspicious = mainStream.getSideOutput(suspiciousTag);
```
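The branch order in `TransactionClassifier` decides where a record goes when it matches more than one condition. As a small JDK-only sketch (the `RouteDemo` class and `Route` enum are hypothetical and exist only for this illustration), the same conditions can be written as a pure function:

```java
// Hypothetical mirror of the branch order in TransactionClassifier; JDK only.
class RouteDemo {
    enum Route { MAIN, HIGH_VALUE, SUSPICIOUS }

    static Route route(double amount, boolean suspicious) {
        if (amount < 1000 && !suspicious) {
            return Route.MAIN;
        } else if (amount >= 1000) {
            return Route.HIGH_VALUE; // wins even when the transaction is suspicious
        } else {
            return Route.SUSPICIOUS; // amount < 1000 and suspicious
        }
    }
}
```

A suspicious transaction of 5000 is routed only to the high-value output. If it should reach both side outputs, the classifier would need two independent `if` checks that each call `ctx.output`.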

## Parallel Execution

### Parallelism Configuration

```java theme={null}
// Set default parallelism for the job
env.setParallelism(4);

// Set parallelism for specific operator
DataStream<String> processed = stream
    .map(new MyMapper())
    .setParallelism(8);  // This operator runs with parallelism 8

// Disable parallel execution (parallelism = 1)
stream.filter(x -> x > 0).setParallelism(1);
```

### Partitioning Strategies

```java theme={null}
// Key-based partitioning (hash partitioning)
KeyedStream<Event, String> keyed = events.keyBy(e -> e.key);

// Custom partitioning (floorMod keeps the index non-negative for negative hash codes)
stream.partitionCustom(
    (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions),
    e -> e.customKey
);

// Round-robin partitioning (rebalance)
stream.rebalance();

// Rescale (local round-robin)
stream.rescale();

// Broadcast (send to all downstream tasks)
stream.broadcast();

// Global (send all records to the first downstream subtask)
stream.global();

// Forward (one-to-one, no network transfer)
stream.forward();

// Shuffle (random distribution)
stream.shuffle();
```
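A custom partitioner must return an index in `[0, numPartitions)`. Java's `%` operator keeps the sign of its left operand, so a key with a negative `hashCode()` produces a negative index. The sketch below is plain JDK code with a hypothetical `SafePartitioner` helper that contrasts the two approaches:

```java
// Hypothetical helper; plain JDK code, not a Flink class.
class SafePartitioner {
    // Maps a key to a partition index in [0, numPartitions).
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // The naive variant: the result takes the sign of hashCode().
    static int naivePartition(Object key, int numPartitions) {
        return key.hashCode() % numPartitions;
    }
}
```

For the `Integer` key `-7` and four partitions, the naive variant yields `-3` while `floorMod` yields `1`.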

### Slot Sharing and Chaining

```java theme={null}
// Disable operator chaining
stream
    .map(new MyMapper())
    .disableChaining()  // This operator won't chain with others
    .filter(x -> x != null);

// Start new chain
stream
    .map(new MyMapper())
    .startNewChain()  // New chain starts here
    .filter(x -> x != null);

// Set slot sharing group (operators with same group share slots)
stream
    .map(new MyMapper())
    .slotSharingGroup("group1")
    .filter(x -> x != null)
    .slotSharingGroup("group2");  // Different group, different slot
```

## Connecting Streams

### Union

Combine multiple streams of the same type.

```java theme={null}
DataStream<Event> stream1 = /* ... */;
DataStream<Event> stream2 = /* ... */;
DataStream<Event> stream3 = /* ... */;

// Union multiple streams
DataStream<Event> combined = stream1
    .union(stream2, stream3);
```

### Connect (CoStream)

Combine two streams of different types with shared state.

```java theme={null}
// Connect two streams
ConnectedStreams<Transaction, Rule> connected =
    transactions.connect(rules);

// CoMap: Transform both streams
DataStream<Alert> alerts = connected.map(
    new CoMapFunction<Transaction, Rule, Alert>() {
        @Override
        public Alert map1(Transaction txn) {
            // Process transaction
            return checkTransaction(txn);
        }

        @Override
        public Alert map2(Rule rule) {
            // Process rule update
            return updateRules(rule);
        }
    }
);

// CoProcessFunction: Advanced processing with state
public class RuleBasedProcessor extends CoProcessFunction<Transaction, Rule, Alert> {

    // Keyed MapState holding the rules received for the current key (not broadcast state)
    private MapState<String, Rule> rulesState;

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Rule> descriptor =
            new MapStateDescriptor<>("rules", String.class, Rule.class);
        rulesState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement1(
            Transaction txn,
            Context ctx,
            Collector<Alert> out) throws Exception {

        // Check transaction against all rules
        for (Map.Entry<String, Rule> entry : rulesState.entries()) {
            if (entry.getValue().matches(txn)) {
                out.collect(new Alert(txn, entry.getValue()));
            }
        }
    }

    @Override
    public void processElement2(
            Rule rule,
            Context ctx,
            Collector<Alert> out) throws Exception {

        // Update rules
        rulesState.put(rule.id, rule);
    }
}

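// Keyed state is only available when both inputs are keyed by the same key type.
// This assumes Rule carries a userId field; for rules that apply to every key,
// use the Broadcast State Pattern instead.
DataStream<Alert> ruleAlerts = transactions
    .keyBy(t -> t.userId)
    .connect(rules.keyBy(r -> r.userId))
    .process(new RuleBasedProcessor());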
```

### Broadcast State Pattern

```java theme={null}
// Define broadcast state descriptor
MapStateDescriptor<String, Rule> ruleStateDescriptor =
    new MapStateDescriptor<>("RulesBroadcastState", String.class, Rule.class);

// Broadcast the rules stream
BroadcastStream<Rule> ruleBroadcastStream =
    rules.broadcast(ruleStateDescriptor);

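// Key the transaction stream, then connect it with the broadcast stream
// (KeyedBroadcastProcessFunction requires a keyed non-broadcast input)
DataStream<Alert> alerts = transactions
    .keyBy(t -> t.userId)
    .connect(ruleBroadcastStream)
    .process(new KeyedBroadcastProcessFunction<String, Transaction, Rule, Alert>() {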

        @Override
        public void processElement(
                Transaction txn,
                ReadOnlyContext ctx,
                Collector<Alert> out) throws Exception {

            // Access broadcast state (read-only)
            ReadOnlyBroadcastState<String, Rule> broadcastState =
                ctx.getBroadcastState(ruleStateDescriptor);

            for (Map.Entry<String, Rule> entry : broadcastState.immutableEntries()) {
                if (entry.getValue().matches(txn)) {
                    out.collect(new Alert(txn, entry.getValue()));
                }
            }
        }

        @Override
        public void processBroadcastElement(
                Rule rule,
                Context ctx,
                Collector<Alert> out) throws Exception {

            // Update broadcast state
            BroadcastState<String, Rule> broadcastState =
                ctx.getBroadcastState(ruleStateDescriptor);
            broadcastState.put(rule.id, rule);
        }
    });
```

## Async I/O

Async I/O lets an operator issue non-blocking requests to external systems, so enrichment lookups overlap instead of stalling the stream one record at a time.

```java theme={null}
// Async function for database lookup
public class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, DatabaseRecord>> {

    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient();
    }

    @Override
    public void asyncInvoke(
            String key,
            ResultFuture<Tuple2<String, DatabaseRecord>> resultFuture) throws Exception {

        // Async database query
        CompletableFuture<DatabaseRecord> future = client.asyncQuery(key);

        future.whenComplete((record, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(
                    Collections.singleton(new Tuple2<>(key, record)));
            }
        });
    }

    @Override
    public void timeout(
            String key,
            ResultFuture<Tuple2<String, DatabaseRecord>> resultFuture) throws Exception {

        // Handle timeout
        resultFuture.complete(Collections.emptyList());
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}

// Apply async I/O
DataStream<Tuple2<String, DatabaseRecord>> enriched =
    AsyncDataStream.unorderedWait(
        input,
        new AsyncDatabaseRequest(),
        1000,  // timeout in milliseconds
        TimeUnit.MILLISECONDS,
        100    // max concurrent requests
    );

// Ordered async I/O (preserves order)
DataStream<Tuple2<String, DatabaseRecord>> enrichedOrdered =
    AsyncDataStream.orderedWait(
        input,
        new AsyncDatabaseRequest(),
        1000,
        TimeUnit.MILLISECONDS,
        100
    );
```
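The timeout handling in `AsyncDatabaseRequest` completes the result with an empty collection instead of failing the job. The same fallback idea can be sketched with the JDK's `CompletableFuture` alone (Java 9 or later for `completeOnTimeout`). `TimeoutFallback` is a hypothetical helper, not Flink's `AsyncFunction` API:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only sketch; not Flink's AsyncFunction API.
class TimeoutFallback {
    // Completes with a single-element result, or with an empty collection
    // if the lookup has not finished within timeoutMs.
    static CompletableFuture<Collection<String>> withTimeout(
            CompletableFuture<String> lookup, long timeoutMs) {
        return lookup
            .<Collection<String>>thenApply(Collections::singleton)
            .completeOnTimeout(Collections.emptyList(), timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

Emitting an empty result silently drops the record. Whether that is acceptable depends on the pipeline; the alternative is to complete exceptionally and let the job's restart strategy handle it.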

## Iterations

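Flink supports iterative stream processing through a feedback edge: part of an operator's output is routed back to the head of the iteration. This is useful for machine learning and graph algorithms. Note that `iterate()` is deprecated in recent Flink releases, so check the documentation for your version before relying on it.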

```java theme={null}
// Define iteration
IterativeStream<Long> iteration = input.iterate();

// Define iteration body
DataStream<Long> iterationBody = iteration
    .map(new StepFunction())
    .filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });

// Close iteration (feedback loop)
DataStream<Long> output = iteration.closeWith(iterationBody);

// Example: Fibonacci sequence
IterativeStream<Tuple2<Long, Long>> fibonacci = env
    .fromElements(new Tuple2<>(0L, 1L))
    .iterate(5000);  // Max wait of 5000 ms for feedback records, not an iteration count

DataStream<Tuple2<Long, Long>> step = fibonacci
    .map(value -> new Tuple2<>(value.f1, value.f0 + value.f1))
    .returns(Types.TUPLE(Types.LONG, Types.LONG));  // Lambdas lose generic type info

DataStream<Tuple2<Long, Long>> result = fibonacci.closeWith(step);
```
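To see what the Fibonacci step function computes on each pass through the feedback loop, here is a JDK-only sketch. `FibonacciFeedback` is a hypothetical class, and the explicit `rounds` argument is an assumption made for the illustration; a real iterative stream has no such counter and keeps feeding records back until the job is stopped or the feedback wait time elapses.

```java
// JDK-only sketch of the feedback loop; not Flink code.
class FibonacciFeedback {
    // One pass through the iteration body: (a, b) -> (b, a + b).
    static long[] step(long[] pair) {
        return new long[] { pair[1], pair[0] + pair[1] };
    }

    // Feeds the output of each step back in as the next input.
    static long[] iterate(long[] start, int rounds) {
        long[] current = start;
        for (int i = 0; i < rounds; i++) {
            current = step(current);
        }
        return current;
    }
}
```

Starting from `(0, 1)`, ten rounds produce `(55, 89)`.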

## Real-World Example: Real-Time ETL Pipeline

```java theme={null}
public class RealTimeETLPipeline {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);
        env.enableCheckpointing(60000);  // Checkpoint every minute

        // Source: Read from Kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("raw-events")
            .setGroupId("etl-pipeline")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        DataStream<String> rawEvents = env.fromSource(
            kafkaSource,
            WatermarkStrategy.noWatermarks(),
            "Kafka Source"
        );

        // Transform: Parse JSON
        DataStream<Event> parsedEvents = rawEvents
            .map(new JsonParser())
            .name("JSON Parser");

        // Filter: Remove invalid events
        DataStream<Event> validEvents = parsedEvents
            .filter(e -> e.isValid())
            .name("Validation Filter");

        // Enrich: Add user data
        DataStream<EnrichedEvent> enrichedEvents = AsyncDataStream.unorderedWait(
            validEvents,
            new UserDataEnricher(),
            1000,
            TimeUnit.MILLISECONDS,
            100
        ).name("User Enrichment");

        // Split: Separate high-value events
        OutputTag<EnrichedEvent> highValueTag =
            new OutputTag<EnrichedEvent>("high-value"){};

        SingleOutputStreamOperator<EnrichedEvent> processedEvents =
            enrichedEvents.process(new ProcessFunction<EnrichedEvent, EnrichedEvent>() {
                @Override
                public void processElement(
                        EnrichedEvent event,
                        Context ctx,
                        Collector<EnrichedEvent> out) {

                    if (event.value >= 1000) {
                        ctx.output(highValueTag, event);
                    } else {
                        out.collect(event);
                    }
                }
            }).name("Event Classifier");

        // Sink: Write to different destinations

        // Regular events to S3
        FileSink<EnrichedEvent> s3Sink = FileSink
            .forRowFormat(
                new Path("s3://bucket/events/"),
                new EnrichedEventEncoder()
            )
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                    .withRolloverInterval(Duration.ofMinutes(15))
                    .build()
            )
            .build();

        processedEvents.sinkTo(s3Sink).name("S3 Sink");

        // High-value events to database
        processedEvents.getSideOutput(highValueTag)
            .addSink(JdbcSink.sink(
                "INSERT INTO high_value_events (id, value, timestamp) VALUES (?, ?, ?)",
                (statement, event) -> {
                    statement.setString(1, event.id);
                    statement.setDouble(2, event.value);
                    statement.setTimestamp(3, new Timestamp(event.timestamp));
                },
                JdbcExecutionOptions.builder()
                    .withBatchSize(1000)
                    .withBatchIntervalMs(200)
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://localhost:5432/analytics")
                    .withDriverName("org.postgresql.Driver")
                    .build()
            ))
            .name("Database Sink");

        env.execute("Real-Time ETL Pipeline");
    }

    // Helper classes
    static class JsonParser implements MapFunction<String, Event> {
        private transient ObjectMapper mapper;

        @Override
        public Event map(String json) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            return mapper.readValue(json, Event.class);
        }
    }

    static class UserDataEnricher extends RichAsyncFunction<Event, EnrichedEvent> {
        // Implementation similar to previous async example

        @Override
        public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
            // Async enrichment logic
        }
    }
}
```

## Performance Optimization

### Chaining Strategy

```java theme={null}
// Good: Let Flink chain operators
stream
    .map(x -> x * 2)
    .filter(x -> x > 0)
    .map(x -> x.toString());

// Bad: Unnecessary disabling of chaining
stream
    .map(x -> x * 2).disableChaining()
    .filter(x -> x > 0).disableChaining()
    .map(x -> x.toString());
```

### Object Reuse

```java theme={null}
// Enable object reuse to skip defensive copies between chained operators.
// Only safe if functions neither cache input objects nor mutate them after emitting.
env.getConfig().enableObjectReuse();
```

### Parallelism Tuning

```java theme={null}
// Set based on workload characteristics
// CPU-bound: parallelism = number of cores
// I/O-bound: parallelism can be higher

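// For most operations (note: availableProcessors() is evaluated on the client
// that builds the job graph, not on the TaskManagers)
env.setParallelism(Runtime.getRuntime().availableProcessors());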

// For I/O-heavy operations
stream.map(new IoHeavyMapper())
    .setParallelism(Runtime.getRuntime().availableProcessors() * 2);

// For single-threaded coordination
stream.keyBy(x -> 0)  // All to one key
    .process(new CoordinatorFunction())
    .setParallelism(1);
```

### Network Buffer Tuning

```yaml theme={null}
# flink-conf.yaml (key names used since the Flink 1.10 memory model)
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
```
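The three settings above interact: the fraction proposes a size and the min and max values bound it. The sketch below shows that arithmetic under the assumption that the fraction is applied to the TaskManager's total Flink memory. `NetworkMemory` is a hypothetical helper that works in whole mebibytes; it is not how Flink exposes this calculation.

```java
// Hypothetical helper illustrating the fraction/min/max arithmetic; JDK only.
class NetworkMemory {
    // Returns the network memory in MiB: fraction of the total, clamped to [minMb, maxMb].
    static long networkMemoryMb(long totalFlinkMemoryMb, double fraction, long minMb, long maxMb) {
        long derived = (long) (totalFlinkMemoryMb * fraction);
        return Math.max(minMb, Math.min(maxMb, derived));
    }
}
```

With the values shown, a 4096 MiB TaskManager gets 409 MiB of network memory, a 256 MiB one is raised to the 64 MiB minimum, and a 20480 MiB one is capped at 1024 MiB.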

## Best Practices

### 1. Use KeyBy Wisely

```java theme={null}
// Good: Balanced key distribution
stream.keyBy(transaction -> transaction.userId);

// Bad: Skewed distribution
stream.keyBy(transaction -> transaction.country);  // If most users from one country

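// Solution: Add a deterministic salt to skewed keys.
// A key selector must return the same key for the same record on every call,
// so derive the salt from a stable field rather than Object.hashCode().
stream.keyBy(transaction -> {
    if (isHotKey(transaction.country)) {
        return transaction.country + "_" + Math.floorMod(transaction.userId.hashCode(), 10);
    }
    return transaction.country;
});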
```
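Salting spreads one hot key over several sub-keys, which means any aggregate computed per salted key is only partial. A second step keyed on the original key has to merge the partial results. The JDK-only sketch below shows the key handling for both steps; `KeySalting`, `saltedKey`, and `originalKey` are hypothetical names, and it assumes the original key contains no trailing `_<number>` of its own.

```java
// Hypothetical helper for salting and un-salting keys; JDK only.
class KeySalting {
    // Builds a salted key whose suffix is stable for a given userId.
    static String saltedKey(String country, String userId, int buckets) {
        return country + "_" + Math.floorMod(userId.hashCode(), buckets);
    }

    // Recovers the original key so partial results can be merged.
    static String originalKey(String salted) {
        return salted.substring(0, salted.lastIndexOf('_'));
    }
}
```

In a pipeline this would typically become two `keyBy` stages: the first on the salted key for the heavy per-record work, the second on `originalKey(...)` to combine at most `buckets` partial results per hot key.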

### 2. Avoid Expensive Operations in Transformations

```java theme={null}
// Bad: Creating objects in hot path
stream.map(x -> new HeavyObject(x));

// Good: Reuse objects or use primitives
stream.map(new RichMapFunction<Input, Output>() {
    private transient HeavyObject reusable;

    @Override
    public void open(Configuration parameters) {
        reusable = new HeavyObject();
    }

    @Override
    public Output map(Input input) {
        reusable.update(input);
        return reusable.process();
    }
});
```

### 3. Handle Errors Gracefully

```java theme={null}
stream.map(new MapFunction<String, Event>() {
    @Override
    public Event map(String value) throws Exception {
        try {
            return parseEvent(value);
        } catch (Exception e) {
            // Log and return null (dropped by the filter below), or use a side output for errors
            LOG.error("Failed to parse event: {}", value, e);
            return null;
        }
    }
}).filter(Objects::nonNull);

// Better: Use side output for error handling
OutputTag<String> errorTag = new OutputTag<String>("errors"){};

SingleOutputStreamOperator<Event> events = stream.process(
    new ProcessFunction<String, Event>() {
        @Override
        public void processElement(String value, Context ctx, Collector<Event> out) {
            try {
                out.collect(parseEvent(value));
            } catch (Exception e) {
                ctx.output(errorTag, value);
            }
        }
    }
);

// Process errors separately
events.getSideOutput(errorTag).addSink(new ErrorSink());
```

### 4. Monitor Backpressure

Check the backpressure indicators in the Flink UI. If backpressure occurs:

1. Increase parallelism
2. Optimize expensive operations
3. Use async I/O for blocking operations
4. Add more resources

## Exercises

### Exercise 1: Word Count with State

Implement a streaming word count that maintains counts across the stream lifetime.

```java theme={null}
// TODO: Implement StatefulWordCount
public class StatefulWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Your code here:
        // 1. Split lines into words
        // 2. Key by word
        // 3. Use ValueState to maintain count
        // 4. Print results

        env.execute("Stateful Word Count");
    }
}
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  public class StatefulWordCount {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

          DataStream<String> text = env.socketTextStream("localhost", 9999);

          DataStream<Tuple2<String, Long>> wordCounts = text
              .flatMap((String line, Collector<String> out) -> {
                  for (String word : line.split("\\s+")) {
                      out.collect(word);
                  }
              })
              .returns(Types.STRING)
              .keyBy(word -> word)
              .process(new KeyedProcessFunction<String, String, Tuple2<String, Long>>() {

                  private ValueState<Long> countState;

                  @Override
                  public void open(Configuration parameters) {
                      countState = getRuntimeContext().getState(
                          new ValueStateDescriptor<>("count", Long.class));
                  }

                  @Override
                  public void processElement(
                          String word,
                          Context ctx,
                          Collector<Tuple2<String, Long>> out) throws Exception {

                      Long current = countState.value();
                      if (current == null) {
                          current = 0L;
                      }
                      current++;
                      countState.update(current);
                      out.collect(new Tuple2<>(word, current));
                  }
              });

          wordCounts.print();

          env.execute("Stateful Word Count");
      }
  }
  ```
</details>

### Exercise 2: Stream Join

Join two streams (users and transactions) based on userId.

```java theme={null}
// TODO: Implement stream join
// Stream 1: User updates (userId, name, country)
// Stream 2: Transactions (txnId, userId, amount)
// Output: Enriched transactions with user details
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  public class StreamJoinExample {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

          DataStream<User> users = /* ... */;
          DataStream<Transaction> transactions = /* ... */;

          // Solution using CoProcessFunction
          DataStream<EnrichedTransaction> enriched = transactions
              .keyBy(txn -> txn.userId)
              .connect(users.keyBy(user -> user.userId))
              .process(new CoProcessFunction<Transaction, User, EnrichedTransaction>() {

                  private ValueState<User> userState;

                  @Override
                  public void open(Configuration parameters) {
                      userState = getRuntimeContext().getState(
                          new ValueStateDescriptor<>("user", User.class));
                  }

                  @Override
                  public void processElement1(
                          Transaction txn,
                          Context ctx,
                          Collector<EnrichedTransaction> out) throws Exception {

                      User user = userState.value();
                      if (user != null) {
                          out.collect(new EnrichedTransaction(txn, user));
                      } else {
                          // User not yet seen, could buffer or skip
                      }
                  }

                  @Override
                  public void processElement2(
                          User user,
                          Context ctx,
                          Collector<EnrichedTransaction> out) throws Exception {

                      userState.update(user);
                  }
              });

          enriched.print();
          env.execute("Stream Join");
      }
  }
  ```
</details>
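The solution above leaves the "user not yet seen" branch empty, which drops transactions that arrive before their user record. One option is to buffer them and flush when the user shows up. The sketch below models that logic in plain JDK code. `BufferedJoin` and its methods are hypothetical, the maps stand in for keyed state, and outputs are simplified to `txnId:name` strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only model of the join state; class and method names are hypothetical.
class BufferedJoin {
    private final Map<String, String> userNames = new HashMap<>();         // userId -> name
    private final Map<String, List<String>> pendingTxns = new HashMap<>(); // userId -> txnIds

    // Transaction side: emit immediately if the user is known, else buffer.
    List<String> onTransaction(String userId, String txnId) {
        List<String> out = new ArrayList<>();
        String name = userNames.get(userId);
        if (name != null) {
            out.add(txnId + ":" + name);
        } else {
            pendingTxns.computeIfAbsent(userId, k -> new ArrayList<>()).add(txnId);
        }
        return out;
    }

    // User side: store the user and flush any buffered transactions.
    List<String> onUser(String userId, String name) {
        userNames.put(userId, name);
        List<String> out = new ArrayList<>();
        List<String> buffered = pendingTxns.remove(userId);
        if (buffered != null) {
            for (String txnId : buffered) {
                out.add(txnId + ":" + name);
            }
        }
        return out;
    }
}
```

In a Flink job the buffer would more likely live in `ListState` inside the `CoProcessFunction`, with a timer to evict transactions whose user never arrives so that state does not grow without bound.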

### Exercise 3: Custom Source

Implement a custom source that generates simulated sensor data.

```java theme={null}
// TODO: Implement SensorSource
// Generate readings every second
// Each reading: (sensorId, temperature, timestamp)
```

<details>
  <summary>Solution</summary>

  ```java theme={null}
  public class SensorSource implements SourceFunction<SensorReading> {

      private volatile boolean running = true;
      private final int numSensors;
      private final Random random = new Random();

      public SensorSource(int numSensors) {
          this.numSensors = numSensors;
      }

      @Override
      public void run(SourceContext<SensorReading> ctx) throws Exception {
          while (running) {
              for (int i = 0; i < numSensors; i++) {
                  String sensorId = "sensor_" + i;
                  double temperature = 20 + random.nextGaussian() * 10;
                  long timestamp = System.currentTimeMillis();

                  ctx.collect(new SensorReading(sensorId, temperature, timestamp));
              }
              Thread.sleep(1000);
          }
      }

      @Override
      public void cancel() {
          running = false;
      }
  }

  // Usage
  DataStream<SensorReading> sensorData = env.addSource(new SensorSource(10));
  ```
</details>

## Summary

In this module, you learned:

* DataStream API fundamentals and core concepts
* Sources and sinks for various data systems
* Basic and advanced transformations (map, flatMap, process)
* Rich functions and process functions for complex logic
* Parallel execution, partitioning, and operator chaining
* Connecting streams with union, connect, and broadcast
* Async I/O for efficient external data enrichment
* Real-world ETL pipeline implementation
* Performance optimization techniques
* Production best practices

## Next Steps

<Card title="Module 3: Event Time & Watermarks" icon="clock" href="/distributed-systems-tools/flink-eventtime">
  Learn how to handle out-of-order events with event time processing
</Card>

## Additional Resources

* [Flink DataStream API Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/)
* [Flink Connectors](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/overview/)
* [Async I/O Guide](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/)
