
DataStream API

Module Duration: 4-5 hours
Focus: Stream processing with Apache Flink
Prerequisites: Streaming concepts, Java/Scala basics

Introduction

The DataStream API is Flink’s foundational API for stream processing, providing fine-grained control over data transformations, state management, and event-time processing. Unlike higher-level APIs such as Flink SQL and the Table API, it gives you complete flexibility to implement complex streaming logic.

Core Concepts

DataStream Abstraction

A DataStream represents an unbounded (or bounded) sequence of records. Operations on DataStreams are lazy: they only build a dataflow graph, which is executed when you call execute(). A minimal end-to-end example appears at the end of this subsection.

// Java: Basic DataStream creation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From collection
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

// From socket
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// From Kafka (legacy FlinkKafkaConsumer; deprecated in favor of KafkaSource, shown below)
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "input-topic",
    new SimpleStringSchema(),
    properties
);
DataStream<String> kafkaStream = env.addSource(consumer);

// Scala: Basic DataStream creation
val env = StreamExecutionEnvironment.getExecutionEnvironment

// From collection
val numbers = env.fromElements(1, 2, 3, 4, 5)

// From socket
val socketStream = env.socketTextStream("localhost", 9999)

// From Kafka
val consumer = new FlinkKafkaConsumer[String](
  "input-topic",
  new SimpleStringSchema(),
  properties
)
val kafkaStream = env.addSource(consumer)

# Python: Basic DataStream creation
# Imports for recent PyFlink releases; the Kafka example also requires the
# Kafka connector JAR on the classpath (e.g. added via env.add_jars()).
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# From collection
numbers = env.from_collection([1, 2, 3, 4, 5])

# From Kafka
kafka_source = FlinkKafkaConsumer(
    topics='input-topic',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092'}
)
kafka_stream = env.add_source(kafka_source)
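
To make the lazy-execution point above concrete, here is a minimal end-to-end Java job (a sketch; the element values and job name are arbitrary). Nothing runs until execute() is called.
// Java: Minimal job - the transformations only build the dataflow graph
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3, 4, 5)
    .map(x -> x * 10)   // recorded in the graph, not yet executed
    .print();           // print sink, also lazy

env.execute("Minimal DataStream Job");  // submits the graph and starts processing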

Sources and Sinks

Built-in Sources

// File sources
DataStream<String> textFiles = env.readTextFile("hdfs://path/to/files");

// Collection source
List<Event> events = Arrays.asList(/* events */);
DataStream<Event> fromCollection = env.fromCollection(events);

// Custom source
DataStream<CustomData> customStream = env.addSource(new CustomSourceFunction());

// Kafka source (Flink 1.14+)
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.noWatermarks(),
    "Kafka Source"
);

Built-in Sinks

// Print sink (for debugging)
stream.print();

// File sink
final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build();

stream.sinkTo(sink);

// Kafka sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // EXACTLY_ONCE additionally requires a transactional ID prefix and enabled checkpointing
    .setTransactionalIdPrefix("my-kafka-sink")
    .build();

stream.sinkTo(kafkaSink);
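
// Note: with EXACTLY_ONCE, downstream Kafka consumers should read with
// isolation.level=read_committed; records become visible to them only once
// the enclosing Flink checkpoint completes.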

// JDBC sink
stream.addSink(JdbcSink.sink(
    "INSERT INTO table (id, name, value) VALUES (?, ?, ?)",
    (statement, record) -> {
        statement.setLong(1, record.id);
        statement.setString(2, record.name);
        statement.setDouble(3, record.value);
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/mydb")
        .withDriverName("org.postgresql.Driver")
        .withUsername("user")
        .withPassword("password")
        .build()
));

Transformations

Basic Transformations

// Map: One-to-one transformation
DataStream<Integer> doubled = numbers.map(x -> x * 2);

// FlatMap: One-to-many transformation
DataStream<String> words = lines.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }
).returns(Types.STRING);

// Filter: Select elements
DataStream<Integer> evens = numbers.filter(x -> x % 2 == 0);

// KeyBy: Partition by key
KeyedStream<Transaction, String> byUser = transactions.keyBy(t -> t.userId);

// Scala transformations with type safety
val doubled = numbers.map(_ * 2)

val words = lines.flatMap(_.split(" "))

val evens = numbers.filter(_ % 2 == 0)

val byUser = transactions.keyBy(_.userId)

Rich Functions

Rich functions provide access to runtime context, state, and lifecycle methods.
public class EnrichmentMapper extends RichMapFunction<Transaction, EnrichedTransaction> {
    private transient ValueState<UserProfile> profileState;
    private transient Counter transactionCounter;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize state
        ValueStateDescriptor<UserProfile> descriptor =
            new ValueStateDescriptor<>("user-profile", UserProfile.class);
        profileState = getRuntimeContext().getState(descriptor);

        // Initialize metrics
        transactionCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("transactions");
    }

    @Override
    public EnrichedTransaction map(Transaction transaction) throws Exception {
        // Access state
        UserProfile profile = profileState.value();

        if (profile == null) {
            // Fetch from external system (ideally cached)
            profile = fetchUserProfile(transaction.userId);
            profileState.update(profile);
        }

        transactionCounter.inc();

        return new EnrichedTransaction(transaction, profile);
    }

    private UserProfile fetchUserProfile(String userId) {
        // Implementation
        return new UserProfile(userId);
    }
}

// Usage
DataStream<EnrichedTransaction> enriched = transactions
    .keyBy(t -> t.userId)
    .map(new EnrichmentMapper());
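
Rich functions also provide close() for releasing resources (for example, connections or clients opened in open()) when the task shuts down.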

Process Functions

ProcessFunction provides the most flexibility with access to timers, state, and side outputs.
public class AlertProcessFunction extends KeyedProcessFunction<String, SensorReading, Alert> {

    // State to track last reading
    private ValueState<Double> lastTempState;
    // State to track timer
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        lastTempState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastTemp", Double.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(
            SensorReading reading,
            Context ctx,
            Collector<Alert> out) throws Exception {

        Double lastTemp = lastTempState.value();
        Long timer = timerState.value();

        // Update last temperature
        lastTempState.update(reading.temperature);

        // If temperature increased and no timer set
        if (lastTemp != null && reading.temperature > lastTemp && timer == null) {
            // Set timer for 1 minute from now
            long timerTs = ctx.timerService().currentProcessingTime() + 60000;
            ctx.timerService().registerProcessingTimeTimer(timerTs);
            timerState.update(timerTs);
        }
        // If temperature decreased, cancel timer
        else if (lastTemp != null && reading.temperature <= lastTemp && timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
            timerState.clear();
        }
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Alert> out) throws Exception {

        out.collect(new Alert(
            "Temperature consistently rising for sensor " + ctx.getCurrentKey()));
        timerState.clear();
    }
}

// Usage
DataStream<Alert> alerts = sensorData
    .keyBy(r -> r.sensorId)
    .process(new AlertProcessFunction());

Side Outputs

Side outputs allow splitting a stream into multiple outputs.
// Define output tags (anonymous subclasses {} let Flink capture the generic element type)
final OutputTag<Transaction> highValueTag =
    new OutputTag<Transaction>("high-value"){};
final OutputTag<Transaction> suspiciousTag =
    new OutputTag<Transaction>("suspicious"){};

// Process function with side outputs
public class TransactionClassifier extends ProcessFunction<Transaction, Transaction> {

    @Override
    public void processElement(
            Transaction txn,
            Context ctx,
            Collector<Transaction> out) throws Exception {

        // Main output: normal transactions
        if (txn.amount < 1000 && !txn.isSuspicious) {
            out.collect(txn);
        }
        // High value side output
        else if (txn.amount >= 1000) {
            ctx.output(highValueTag, txn);
        }
        // Suspicious side output
        else if (txn.isSuspicious) {
            ctx.output(suspiciousTag, txn);
        }
    }
}

// Apply and get side outputs
SingleOutputStreamOperator<Transaction> mainStream = transactions
    .process(new TransactionClassifier());

DataStream<Transaction> highValue = mainStream.getSideOutput(highValueTag);
DataStream<Transaction> suspicious = mainStream.getSideOutput(suspiciousTag);

Parallel Execution

Parallelism Configuration

// Set default parallelism for the job
env.setParallelism(4);

// Set parallelism for specific operator
DataStream<String> processed = stream
    .map(new MyMapper())
    .setParallelism(8);  // This operator runs with parallelism 8

// Disable parallel execution (parallelism = 1)
stream.filter(x -> x > 0).setParallelism(1);
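
Operator-level parallelism overrides the environment default, which in turn overrides the cluster-wide parallelism.default setting; parallelism can also be supplied at submission time (for example with flink run -p).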

Partitioning Strategies

// Key-based partitioning (hash partitioning)
KeyedStream<Event, String> keyed = events.keyBy(e -> e.key);

// Custom partitioning (Math.floorMod avoids negative partition indices from hashCode())
stream.partitionCustom(
    (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions),
    e -> e.customKey
);

// Round-robin partitioning (rebalance)
stream.rebalance();

// Rescale (local round-robin)
stream.rescale();

// Broadcast (send to all downstream tasks)
stream.broadcast();

// Global (send all to first partition)
stream.global();

// Forward (one-to-one, no network transfer)
stream.forward();

// Shuffle (random distribution)
stream.shuffle();

Slot Sharing and Chaining
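
By default, Flink chains operators that have the same parallelism and a forward connection into a single task to avoid serialization and network overhead, and all operators of a job share one slot sharing group. The calls below override these defaults.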

// Disable operator chaining
stream
    .map(new MyMapper())
    .disableChaining()  // This operator won't chain with others
    .filter(x -> x != null);

// Start new chain
stream
    .map(new MyMapper())
    .startNewChain()  // New chain starts here
    .filter(x -> x != null);

// Set slot sharing group (operators with same group share slots)
stream
    .map(new MyMapper())
    .slotSharingGroup("group1")
    .filter(x -> x != null)
    .slotSharingGroup("group2");  // Different group, different slot

Connecting Streams

Union

Combine multiple streams of the same type.
DataStream<Event> stream1 = /* ... */;
DataStream<Event> stream2 = /* ... */;
DataStream<Event> stream3 = /* ... */;

// Union multiple streams
DataStream<Event> combined = stream1
    .union(stream2, stream3);

Connect (CoStream)

Connect combines two streams of possibly different types so that a single operator can process both inputs and share state between them.
// Connect two streams
ConnectedStreams<Transaction, Rule> connected =
    transactions.connect(rules);

// CoMap: Transform both streams
DataStream<Alert> alerts = connected.map(
    new CoMapFunction<Transaction, Rule, Alert>() {
        @Override
        public Alert map1(Transaction txn) {
            // Process transaction
            return checkTransaction(txn);
        }

        @Override
        public Alert map2(Rule rule) {
            // Process rule update
            return updateRules(rule);
        }
    }
);

// CoProcessFunction: Advanced processing with state
public class RuleBasedProcessor extends CoProcessFunction<Transaction, Rule, Alert> {

    // Keyed MapState holding the rules for the current key
    private MapState<String, Rule> rulesState;

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Rule> descriptor =
            new MapStateDescriptor<>("rules", String.class, Rule.class);
        rulesState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement1(
            Transaction txn,
            Context ctx,
            Collector<Alert> out) throws Exception {

        // Check the transaction against the rules stored for this key
        for (Map.Entry<String, Rule> entry : rulesState.entries()) {
            if (entry.getValue().matches(txn)) {
                out.collect(new Alert(txn, entry.getValue()));
            }
        }
    }

    @Override
    public void processElement2(
            Rule rule,
            Context ctx,
            Collector<Alert> out) throws Exception {

        // Update rules
        rulesState.put(rule.id, rule);
    }
}

// Keyed state requires both connected inputs to be keyed; here we assume each
// Rule carries a userId so rules co-partition with transactions. To make every
// rule visible to every key, use the broadcast state pattern shown next.
DataStream<Alert> alerts = transactions
    .keyBy(t -> t.userId)
    .connect(rules.keyBy(r -> r.userId))
    .process(new RuleBasedProcessor());

Broadcast State Pattern
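
The broadcast state pattern distributes a low-throughput stream, such as rules or configuration, to every parallel instance of an operator; each instance stores it in broadcast state and combines it with the keyed main stream.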

// Define broadcast state descriptor
MapStateDescriptor<String, Rule> ruleStateDescriptor =
    new MapStateDescriptor<>("RulesBroadcastState", String.class, Rule.class);

// Broadcast the rules stream
BroadcastStream<Rule> ruleBroadcastStream =
    rules.broadcast(ruleStateDescriptor);

// Connect transaction stream with broadcast stream
DataStream<Alert> alerts = transactions
    .keyBy(t -> t.userId)  // KeyedBroadcastProcessFunction requires a keyed non-broadcast input
    .connect(ruleBroadcastStream)
    .process(new KeyedBroadcastProcessFunction<String, Transaction, Rule, Alert>() {

        @Override
        public void processElement(
                Transaction txn,
                ReadOnlyContext ctx,
                Collector<Alert> out) throws Exception {

            // Access broadcast state (read-only)
            ReadOnlyBroadcastState<String, Rule> broadcastState =
                ctx.getBroadcastState(ruleStateDescriptor);

            for (Map.Entry<String, Rule> entry : broadcastState.immutableEntries()) {
                if (entry.getValue().matches(txn)) {
                    out.collect(new Alert(txn, entry.getValue()));
                }
            }
        }

        @Override
        public void processBroadcastElement(
                Rule rule,
                Context ctx,
                Collector<Alert> out) throws Exception {

            // Update broadcast state
            BroadcastState<String, Rule> broadcastState =
                ctx.getBroadcastState(ruleStateDescriptor);
            broadcastState.put(rule.id, rule);
        }
    });

Async I/O

Async I/O lets you enrich a stream from external systems (databases, REST services) without blocking: lookups are issued asynchronously, so the operator is not stalled waiting for each response.
// Async function for database lookup
public class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, DatabaseRecord>> {

    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient();
    }

    @Override
    public void asyncInvoke(
            String key,
            ResultFuture<Tuple2<String, DatabaseRecord>> resultFuture) throws Exception {

        // Async database query
        CompletableFuture<DatabaseRecord> future = client.asyncQuery(key);

        future.whenComplete((record, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(
                    Collections.singleton(new Tuple2<>(key, record)));
            }
        });
    }

    @Override
    public void timeout(
            String key,
            ResultFuture<Tuple2<String, DatabaseRecord>> resultFuture) throws Exception {

        // Handle timeout
        resultFuture.complete(Collections.emptyList());
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}

// Apply async I/O
DataStream<Tuple2<String, DatabaseRecord>> enriched =
    AsyncDataStream.unorderedWait(
        input,
        new AsyncDatabaseRequest(),
        1000,  // timeout in milliseconds
        TimeUnit.MILLISECONDS,
        100    // max concurrent requests
    );

// Ordered async I/O (preserves order)
DataStream<Tuple2<String, DatabaseRecord>> enrichedOrdered =
    AsyncDataStream.orderedWait(
        input,
        new AsyncDatabaseRequest(),
        1000,
        TimeUnit.MILLISECONDS,
        100
    );
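
unorderedWait emits results as soon as the async calls complete, giving lower latency; orderedWait buffers results to preserve the input order, at the cost of some extra latency and state.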

Iterations

Flink supports iterative stream processing for machine learning and graph algorithms.
// Define iteration
IterativeStream<Long> iteration = input.iterate();

// Define iteration body
DataStream<Long> iterationBody = iteration
    .map(new StepFunction())
    .filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });

// Close iteration (feedback loop)
DataStream<Long> output = iteration.closeWith(iterationBody);

// Example: Fibonacci sequence
IterativeStream<Tuple2<Long, Long>> fibonacci = env
    .fromElements(new Tuple2<>(0L, 1L))
    .iterate(5000);  // Max feedback wait time in ms, not an iteration count

DataStream<Tuple2<Long, Long>> step = fibonacci
    .map(value -> new Tuple2<>(value.f1, value.f0 + value.f1))
    .returns(Types.TUPLE(Types.LONG, Types.LONG));  // type hint needed for generic tuples from lambdas

DataStream<Tuple2<Long, Long>> result = fibonacci.closeWith(step);
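
In practice the feedback stream usually includes a filter (as in the first example) so that elements eventually leave the loop; otherwise records keep circulating until the job is cancelled.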

Real-World Example: Real-Time ETL Pipeline

public class RealTimeETLPipeline {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);
        env.enableCheckpointing(60000);  // Checkpoint every minute

        // Source: Read from Kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("raw-events")
            .setGroupId("etl-pipeline")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        DataStream<String> rawEvents = env.fromSource(
            kafkaSource,
            WatermarkStrategy.noWatermarks(),
            "Kafka Source"
        );

        // Transform: Parse JSON
        DataStream<Event> parsedEvents = rawEvents
            .map(new JsonParser())
            .name("JSON Parser");

        // Filter: Remove invalid events
        DataStream<Event> validEvents = parsedEvents
            .filter(e -> e.isValid())
            .name("Validation Filter");

        // Enrich: Add user data
        DataStream<EnrichedEvent> enrichedEvents = AsyncDataStream.unorderedWait(
            validEvents,
            new UserDataEnricher(),
            1000,
            TimeUnit.MILLISECONDS,
            100
        ).name("User Enrichment");

        // Split: Separate high-value events
        OutputTag<EnrichedEvent> highValueTag =
            new OutputTag<EnrichedEvent>("high-value"){};

        SingleOutputStreamOperator<EnrichedEvent> processedEvents =
            enrichedEvents.process(new ProcessFunction<EnrichedEvent, EnrichedEvent>() {
                @Override
                public void processElement(
                        EnrichedEvent event,
                        Context ctx,
                        Collector<EnrichedEvent> out) {

                    if (event.value >= 1000) {
                        ctx.output(highValueTag, event);
                    } else {
                        out.collect(event);
                    }
                }
            }).name("Event Classifier");

        // Sink: Write to different destinations

        // Regular events to S3
        FileSink<EnrichedEvent> s3Sink = FileSink
            .forRowFormat(
                new Path("s3://bucket/events/"),
                new EnrichedEventEncoder()
            )
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                    .withRolloverInterval(Duration.ofMinutes(15))
                    .build()
            )
            .build();

        processedEvents.sinkTo(s3Sink).name("S3 Sink");

        // High-value events to database
        processedEvents.getSideOutput(highValueTag)
            .addSink(JdbcSink.sink(
                "INSERT INTO high_value_events (id, value, timestamp) VALUES (?, ?, ?)",
                (statement, event) -> {
                    statement.setString(1, event.id);
                    statement.setDouble(2, event.value);
                    statement.setTimestamp(3, new Timestamp(event.timestamp));
                },
                JdbcExecutionOptions.builder()
                    .withBatchSize(1000)
                    .withBatchIntervalMs(200)
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://localhost:5432/analytics")
                    .withDriverName("org.postgresql.Driver")
                    .build()
            ))
            .name("Database Sink");

        env.execute("Real-Time ETL Pipeline");
    }

    // Helper classes
    static class JsonParser implements MapFunction<String, Event> {
        private transient ObjectMapper mapper;

        @Override
        public Event map(String json) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            return mapper.readValue(json, Event.class);
        }
    }

    static class UserDataEnricher extends RichAsyncFunction<Event, EnrichedEvent> {
        // Implementation similar to previous async example

        @Override
        public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
            // Async enrichment logic
        }
    }
}

Performance Optimization

Chaining Strategy

// Good: Let Flink chain operators
stream
    .map(x -> x * 2)
    .filter(x -> x > 0)
    .map(x -> x.toString());

// Bad: Unnecessary disabling of chaining
stream
    .map(x -> x * 2).disableChaining()
    .filter(x -> x > 0).disableChaining()
    .map(x -> x.toString());

Object Reuse

// Enable object reuse to avoid defensive copies between chained operators.
// Caution: with reuse enabled, user functions must not cache or modify input records.
env.getConfig().enableObjectReuse();

Parallelism Tuning

// Set based on workload characteristics
// CPU-bound: parallelism = number of cores
// I/O-bound: parallelism can be higher

// For most operations
env.setParallelism(Runtime.getRuntime().availableProcessors());

// For I/O-heavy operations
stream.map(new IoHeavyMapper())
    .setParallelism(Runtime.getRuntime().availableProcessors() * 2);

// For single-threaded coordination
stream.keyBy(x -> 0)  // All to one key
    .process(new CoordinatorFunction())
    .setParallelism(1);

Network Buffer Tuning

# flink-conf.yaml (Flink 1.10+ keys; older releases used taskmanager.network.memory.*)
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb

Best Practices

1. Use KeyBy Wisely

// Good: Balanced key distribution
stream.keyBy(transaction -> transaction.userId);

// Bad: Skewed distribution
stream.keyBy(transaction -> transaction.country);  // If most users from one country

// Solution: Add salt to hot keys (a second aggregation step is then needed
// to merge the per-salt partial results)
stream.keyBy(transaction -> {
    if (isHotKey(transaction.country)) {
        return transaction.country + "_" + (transaction.hashCode() % 10);
    }
    return transaction.country;
});

2. Avoid Expensive Operations in Transformations

// Bad: Creating objects in hot path
stream.map(x -> new HeavyObject(x));

// Good: Reuse objects or use primitives
stream.map(new RichMapFunction<Input, Output>() {
    private transient HeavyObject reusable;

    @Override
    public void open(Configuration parameters) {
        reusable = new HeavyObject();
    }

    @Override
    public Output map(Input input) {
        reusable.update(input);
        return reusable.process();
    }
});

3. Handle Errors Gracefully

stream.map(new MapFunction<String, Event>() {
    @Override
    public Event map(String value) throws Exception {
        try {
            return parseEvent(value);
        } catch (Exception e) {
            // Log and return null, or use side output for errors
            LOG.error("Failed to parse event: " + value, e);
            return null;
        }
    }
}).filter(Objects::nonNull);

// Better: Use side output for error handling
OutputTag<String> errorTag = new OutputTag<String>("errors"){};

SingleOutputStreamOperator<Event> events = stream.process(
    new ProcessFunction<String, Event>() {
        @Override
        public void processElement(String value, Context ctx, Collector<Event> out) {
            try {
                out.collect(parseEvent(value));
            } catch (Exception e) {
                ctx.output(errorTag, value);
            }
        }
    }
);

// Process errors separately
events.getSideOutput(errorTag).addSink(new ErrorSink());

4. Monitor Backpressure

Check for backpressure in the Flink Web UI. If backpressure occurs:
  • Increase parallelism
  • Optimize expensive operations
  • Use async I/O for blocking operations
  • Add more resources

Exercises

Exercise 1: Word Count with State

Implement a streaming word count that maintains counts across the stream lifetime.
// TODO: Implement StatefulWordCount
public class StatefulWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Your code here:
        // 1. Split lines into words
        // 2. Key by word
        // 3. Use ValueState to maintain count
        // 4. Print results

        env.execute("Stateful Word Count");
    }
}

Exercise 2: Stream Join

Join two streams (users and transactions) based on userId.
// TODO: Implement stream join
// Stream 1: User updates (userId, name, country)
// Stream 2: Transactions (txnId, userId, amount)
// Output: Enriched transactions with user details

Exercise 3: Custom Source

Implement a custom source that generates simulated sensor data.
// TODO: Implement SensorSource
// Generate readings every second
// Each reading: (sensorId, temperature, timestamp)

Summary

In this module, you learned:
  • DataStream API fundamentals and core concepts
  • Sources and sinks for various data systems
  • Basic and advanced transformations (map, flatMap, process)
  • Rich functions and process functions for complex logic
  • Parallel execution, partitioning, and operator chaining
  • Connecting streams with union, connect, and broadcast
  • Async I/O for efficient external data enrichment
  • Real-world ETL pipeline implementation
  • Performance optimization techniques
  • Production best practices

Next Steps

Module 3: Event Time & Watermarks

Learn how to handle out-of-order events with event time processing
