DataStream API
Module Duration: 4-5 hours
Focus: Stream processing with Apache Flink
Prerequisites: Streaming concepts, Java/Scala basics
Introduction
The DataStream API is Flink’s foundational API for stream processing, providing fine-grained control over data transformations, state management, and event-time processing. Unlike higher-level APIs such as SQL, the DataStream API gives you complete flexibility to implement complex streaming logic.
Core Concepts
DataStream Abstraction
A DataStream represents an unbounded sequence of records. Operations on DataStreams are lazy: they build a dataflow graph that is executed only when you call execute().
// Java: Basic DataStream creation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// From collection
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
// From socket
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
// From Kafka
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> kafkaStream = env.addSource(consumer);
// Scala: Basic DataStream creation
val env = StreamExecutionEnvironment.getExecutionEnvironment
// From collection
val numbers = env.fromElements(1, 2, 3, 4, 5)
// From socket
val socketStream = env.socketTextStream("localhost", 9999)
// From Kafka
val consumer = new FlinkKafkaConsumer[String](
"input-topic",
new SimpleStringSchema(),
properties
)
val kafkaStream = env.addSource(consumer)
# Python: Basic DataStream creation
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
env = StreamExecutionEnvironment.get_execution_environment()
# From collection
numbers = env.from_collection([1, 2, 3, 4, 5])
# From Kafka
kafka_source = FlinkKafkaConsumer(
topics='input-topic',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': 'localhost:9092'}
)
kafka_stream = env.add_source(kafka_source)
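None of the snippets above run anything by themselves; they only declare sources and build the dataflow graph. A minimal complete job (the class and job names are illustrative) showing that execution starts only when execute() is called:
// Java: a minimal end-to-end job
public class MinimalJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Declaring the pipeline only builds the dataflow graph
        env.fromElements(1, 2, 3, 4, 5)
            .map(x -> x * 2)
            .print();
        // Nothing is processed until the graph is submitted here
        env.execute("Minimal DataStream Job");
    }
}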
Sources and Sinks
Built-in Sources
// File sources
DataStream<String> textFiles = env.readTextFile("hdfs://path/to/files");
// Collection source
List<Event> events = Arrays.asList(/* events */);
DataStream<Event> fromCollection = env.fromCollection(events);
// Custom source
DataStream<CustomData> customStream = env.addSource(new CustomSourceFunction());
// Kafka source (Flink 1.14+)
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
Built-in Sinks
// Print sink (for debugging)
stream.print();
// File sink
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(1024))
.build())
.build();
stream.sinkTo(sink);
// Kafka sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("my-txn-prefix") // required when using EXACTLY_ONCE
.build();
stream.sinkTo(kafkaSink);
// JDBC sink
stream.addSink(JdbcSink.sink(
"INSERT INTO table (id, name, value) VALUES (?, ?, ?)",
(statement, record) -> {
statement.setLong(1, record.id);
statement.setString(2, record.name);
statement.setDouble(3, record.value);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://localhost:5432/mydb")
.withDriverName("org.postgresql.Driver")
.withUsername("user")
.withPassword("password")
.build()
));
Transformations
Basic Transformations
// Map: One-to-one transformation
DataStream<Integer> doubled = numbers.map(x -> x * 2);
// FlatMap: One-to-many transformation
DataStream<String> words = lines.flatMap(
(String line, Collector<String> out) -> {
for (String word : line.split(" ")) {
out.collect(word);
}
}
).returns(Types.STRING);
// Filter: Select elements
DataStream<Integer> evens = numbers.filter(x -> x % 2 == 0);
// KeyBy: Partition by key
KeyedStream<Transaction, String> byUser = transactions.keyBy(t -> t.userId);
// Scala transformations with type safety
val doubled = numbers.map(_ * 2)
val words = lines.flatMap(_.split(" "))
val evens = numbers.filter(_ % 2 == 0)
val byUser = transactions.keyBy(_.userId)
Rich Functions
Rich functions provide access to the runtime context (including keyed state and metrics) and to lifecycle methods such as open() and close().
public class EnrichmentMapper extends RichMapFunction<Transaction, EnrichedTransaction> {
private transient ValueState<UserProfile> profileState;
private transient Counter transactionCounter;
@Override
public void open(Configuration parameters) throws Exception {
// Initialize state
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
profileState = getRuntimeContext().getState(descriptor);
// Initialize metrics
transactionCounter = getRuntimeContext()
.getMetricGroup()
.counter("transactions");
}
@Override
public EnrichedTransaction map(Transaction transaction) throws Exception {
// Access state
UserProfile profile = profileState.value();
if (profile == null) {
// Fetch from external system (ideally cached)
profile = fetchUserProfile(transaction.userId);
profileState.update(profile);
}
transactionCounter.inc();
return new EnrichedTransaction(transaction, profile);
}
private UserProfile fetchUserProfile(String userId) {
// Implementation
return new UserProfile(userId);
}
}
// Usage
DataStream<EnrichedTransaction> enriched = transactions
.keyBy(t -> t.userId)
.map(new EnrichmentMapper());
Process Functions
ProcessFunction provides the most flexibility, with access to timers, state, and side outputs.
public class AlertProcessFunction extends KeyedProcessFunction<String, SensorReading, Alert> {
// State to track last reading
private ValueState<Double> lastTempState;
// State to track timer
private ValueState<Long> timerState;
@Override
public void open(Configuration parameters) {
lastTempState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastTemp", Double.class));
timerState = getRuntimeContext().getState(
new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(
SensorReading reading,
Context ctx,
Collector<Alert> out) throws Exception {
Double lastTemp = lastTempState.value();
Long timer = timerState.value();
// Update last temperature
lastTempState.update(reading.temperature);
// If temperature increased and no timer set
if (lastTemp != null && reading.temperature > lastTemp && timer == null) {
// Set timer for 1 minute from now
long timerTs = ctx.timerService().currentProcessingTime() + 60000;
ctx.timerService().registerProcessingTimeTimer(timerTs);
timerState.update(timerTs);
}
// If temperature decreased, cancel timer
else if (lastTemp != null && reading.temperature <= lastTemp && timer != null) {
ctx.timerService().deleteProcessingTimeTimer(timer);
timerState.clear();
}
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Alert> out) throws Exception {
out.collect(new Alert(
"Temperature consistently rising for sensor " + ctx.getCurrentKey()));
timerState.clear();
}
}
// Usage
DataStream<Alert> alerts = sensorData
.keyBy(r -> r.sensorId)
.process(new AlertProcessFunction());
Side Outputs
Side outputs allow splitting a stream into multiple output streams in a single pass.
// Define output tags
final OutputTag<Transaction> highValueTag =
new OutputTag<Transaction>("high-value"){};
final OutputTag<Transaction> suspiciousTag =
new OutputTag<Transaction>("suspicious"){};
// Process function with side outputs
public class TransactionClassifier extends ProcessFunction<Transaction, Transaction> {
@Override
public void processElement(
Transaction txn,
Context ctx,
Collector<Transaction> out) throws Exception {
// Main output: normal transactions
if (txn.amount < 1000 && !txn.isSuspicious) {
out.collect(txn);
}
// High value side output
else if (txn.amount >= 1000) {
ctx.output(highValueTag, txn);
}
// Suspicious side output
else if (txn.isSuspicious) {
ctx.output(suspiciousTag, txn);
}
}
}
// Apply and get side outputs
SingleOutputStreamOperator<Transaction> mainStream = transactions
.process(new TransactionClassifier());
DataStream<Transaction> highValue = mainStream.getSideOutput(highValueTag);
DataStream<Transaction> suspicious = mainStream.getSideOutput(suspiciousTag);
Parallel Execution
Parallelism Configuration
// Set default parallelism for the job
env.setParallelism(4);
// Set parallelism for specific operator
DataStream<String> processed = stream
.map(new MyMapper())
.setParallelism(8); // This operator runs with parallelism 8
// Disable parallel execution (parallelism = 1)
stream.filter(x -> x > 0).setParallelism(1);
Partitioning Strategies
// Key-based partitioning (hash partitioning)
KeyedStream<Event, String> keyed = events.keyBy(e -> e.key);
// Custom partitioning
stream.partitionCustom(
(key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions), // floorMod avoids negative partition indices
e -> e.customKey
);
// Round-robin partitioning (rebalance)
stream.rebalance();
// Rescale (local round-robin)
stream.rescale();
// Broadcast (send to all downstream tasks)
stream.broadcast();
// Global (send all to first partition)
stream.global();
// Forward (one-to-one, no network transfer)
stream.forward();
// Shuffle (random distribution)
stream.shuffle();
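A common use of rebalance() is to spread work out after a source whose parallelism is capped, for example a Kafka topic with fewer partitions than the downstream operator. A brief sketch, where ExpensiveParser and its ParsedEvent output are hypothetical:
// Source parallelism is bounded by the topic's partition count; rebalance()
// redistributes records round-robin so the heavier parsing stage can use all 16 subtasks
DataStream<ParsedEvent> parsed = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .rebalance()
    .map(new ExpensiveParser())
    .setParallelism(16);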
Slot Sharing and Chaining
// Disable operator chaining
stream
.map(new MyMapper())
.disableChaining() // This operator won't chain with others
.filter(x -> x != null);
// Start new chain
stream
.map(new MyMapper())
.startNewChain() // New chain starts here
.filter(x -> x != null);
// Set slot sharing group (operators with same group share slots)
stream
.map(new MyMapper())
.slotSharingGroup("group1")
.filter(x -> x != null)
.slotSharingGroup("group2"); // Different group, different slot
Connecting Streams
Union
Combine multiple streams of the same type into a single stream.
DataStream<Event> stream1 = /* ... */;
DataStream<Event> stream2 = /* ... */;
DataStream<Event> stream3 = /* ... */;
// Union multiple streams
DataStream<Event> combined = stream1
.union(stream2, stream3);
Connect (CoStream)
Combine two streams of different types so that a single operator can process both, typically sharing state between them.
// Connect two streams
ConnectedStreams<Transaction, Rule> connected =
transactions.connect(rules);
// CoMap: Transform both streams
DataStream<Alert> alerts = connected.map(
new CoMapFunction<Transaction, Rule, Alert>() {
@Override
public Alert map1(Transaction txn) {
// Process transaction
return checkTransaction(txn);
}
@Override
public Alert map2(Rule rule) {
// Process rule update
return updateRules(rule);
}
}
);
// CoProcessFunction: advanced processing with a per-instance rule store.
// Note: rules.broadcast() below replicates every rule to each parallel instance,
// but the in-memory map is not checkpointed; for fault-tolerant rule storage,
// prefer the broadcast state pattern shown in the next section.
public class RuleBasedProcessor extends CoProcessFunction<Transaction, Rule, Alert> {
    // In-memory rule store (one copy per parallel instance)
    private transient Map<String, Rule> ruleStore;
    @Override
    public void open(Configuration config) {
        ruleStore = new HashMap<>();
    }
    @Override
    public void processElement1(
            Transaction txn,
            Context ctx,
            Collector<Alert> out) throws Exception {
        // Check the transaction against all known rules
        for (Rule rule : ruleStore.values()) {
            if (rule.matches(txn)) {
                out.collect(new Alert(txn, rule));
            }
        }
    }
    @Override
    public void processElement2(
            Rule rule,
            Context ctx,
            Collector<Alert> out) throws Exception {
        // Add or update a rule
        ruleStore.put(rule.id, rule);
    }
}
DataStream<Alert> alerts = transactions
    .keyBy(t -> t.userId)
    .connect(rules.broadcast())
    .process(new RuleBasedProcessor());
Broadcast State Pattern
// Define broadcast state descriptor
MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>("RulesBroadcastState", String.class, Rule.class);
// Broadcast the rules stream
BroadcastStream<Rule> ruleBroadcastStream =
rules.broadcast(ruleStateDescriptor);
// Connect the keyed transaction stream with the broadcast stream
// (KeyedBroadcastProcessFunction requires the non-broadcast side to be keyed)
DataStream<Alert> alerts = transactions
    .keyBy(t -> t.userId)
    .connect(ruleBroadcastStream)
    .process(new KeyedBroadcastProcessFunction<String, Transaction, Rule, Alert>() {
@Override
public void processElement(
Transaction txn,
ReadOnlyContext ctx,
Collector<Alert> out) throws Exception {
// Access broadcast state (read-only)
ReadOnlyBroadcastState<String, Rule> broadcastState =
ctx.getBroadcastState(ruleStateDescriptor);
for (Map.Entry<String, Rule> entry : broadcastState.immutableEntries()) {
if (entry.getValue().matches(txn)) {
out.collect(new Alert(txn, entry.getValue()));
}
}
}
@Override
public void processBroadcastElement(
Rule rule,
Context ctx,
Collector<Alert> out) throws Exception {
// Update broadcast state
BroadcastState<String, Rule> broadcastState =
ctx.getBroadcastState(ruleStateDescriptor);
broadcastState.put(rule.id, rule);
}
});
Async I/O
Async I/O enables efficient enrichment from external systems using non-blocking requests, so a slow lookup does not stall the entire operator.
// Async function for database lookup
public class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, DatabaseRecord>> {
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient();
}
@Override
public void asyncInvoke(
String key,
ResultFuture<Tuple2<String, DatabaseRecord>> resultFuture) throws Exception {
// Async database query
CompletableFuture<DatabaseRecord> future = client.asyncQuery(key);
future.whenComplete((record, error) -> {
if (error != null) {
resultFuture.completeExceptionally(error);
} else {
resultFuture.complete(
Collections.singleton(new Tuple2<>(key, record)));
}
});
}
@Override
public void timeout(
String key,
ResultFuture<Tuple2<String, DatabaseRecord>> resultFuture) throws Exception {
// Handle timeout
resultFuture.complete(Collections.emptyList());
}
@Override
public void close() throws Exception {
client.close();
}
}
// Apply async I/O
DataStream<Tuple2<String, DatabaseRecord>> enriched =
AsyncDataStream.unorderedWait(
input,
new AsyncDatabaseRequest(),
1000, // timeout in milliseconds
TimeUnit.MILLISECONDS,
100 // max concurrent requests
);
// Ordered async I/O (preserves order)
DataStream<Tuple2<String, DatabaseRecord>> enrichedOrdered =
AsyncDataStream.orderedWait(
input,
new AsyncDatabaseRequest(),
1000,
TimeUnit.MILLISECONDS,
100
);
Iterations
Flink supports iterative stream processing, which can be useful for machine learning and graph algorithms. Note that records in transit inside the feedback loop are not covered by checkpoints, so iterations should be used with care.
// Define the iteration head
IterativeStream<Long> iteration = input.iterate();
// Iteration body: apply the step function to each element
DataStream<Long> iterationBody = iteration.map(new StepFunction());
// Feedback: elements that still need another pass go back to the head
DataStream<Long> feedback = iterationBody.filter(value -> value > 0);
iteration.closeWith(feedback);
// Output: elements that leave the loop continue downstream
DataStream<Long> output = iterationBody.filter(value -> value <= 0);
// Example: Fibonacci-style feedback loop (every element is fed back,
// so this toy loop emits nothing downstream)
DataStream<Tuple2<Long, Long>> fibonacci = env
    .fromElements(new Tuple2<>(0L, 1L))
    .iterate(5000); // max wait time (ms) for feedback before the iteration head shuts down
DataStream<Tuple2<Long, Long>> step = fibonacci.map(
    value -> new Tuple2<>(value.f1, value.f0 + value.f1)
);
fibonacci.closeWith(step);
Real-World Example: Real-Time ETL Pipeline
public class RealTimeETLPipeline {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(60000); // Checkpoint every minute
// Source: Read from Kafka
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("raw-events")
.setGroupId("etl-pipeline")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> rawEvents = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
// Transform: Parse JSON
DataStream<Event> parsedEvents = rawEvents
.map(new JsonParser())
.name("JSON Parser");
// Filter: Remove invalid events
DataStream<Event> validEvents = parsedEvents
.filter(e -> e.isValid())
.name("Validation Filter");
// Enrich: Add user data
DataStream<EnrichedEvent> enrichedEvents = AsyncDataStream.unorderedWait(
validEvents,
new UserDataEnricher(),
1000,
TimeUnit.MILLISECONDS,
100
).name("User Enrichment");
// Split: Separate high-value events
OutputTag<EnrichedEvent> highValueTag =
new OutputTag<EnrichedEvent>("high-value"){};
SingleOutputStreamOperator<EnrichedEvent> processedEvents =
enrichedEvents.process(new ProcessFunction<EnrichedEvent, EnrichedEvent>() {
@Override
public void processElement(
EnrichedEvent event,
Context ctx,
Collector<EnrichedEvent> out) {
if (event.value >= 1000) {
ctx.output(highValueTag, event);
} else {
out.collect(event);
}
}
}).name("Event Classifier");
// Sink: Write to different destinations
// Regular events to S3
FileSink<EnrichedEvent> s3Sink = FileSink
.forRowFormat(
new Path("s3://bucket/events/"),
new EnrichedEventEncoder()
)
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.build()
)
.build();
processedEvents.sinkTo(s3Sink).name("S3 Sink");
// High-value events to database
processedEvents.getSideOutput(highValueTag)
.addSink(JdbcSink.sink(
"INSERT INTO high_value_events (id, value, timestamp) VALUES (?, ?, ?)",
(statement, event) -> {
statement.setString(1, event.id);
statement.setDouble(2, event.value);
statement.setTimestamp(3, new Timestamp(event.timestamp));
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://localhost:5432/analytics")
.withDriverName("org.postgresql.Driver")
.build()
))
.name("Database Sink");
env.execute("Real-Time ETL Pipeline");
}
// Helper classes
static class JsonParser implements MapFunction<String, Event> {
private transient ObjectMapper mapper;
@Override
public Event map(String json) throws Exception {
if (mapper == null) {
mapper = new ObjectMapper();
}
return mapper.readValue(json, Event.class);
}
}
static class UserDataEnricher extends RichAsyncFunction<Event, EnrichedEvent> {
// Implementation similar to previous async example
@Override
public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
// Async enrichment logic
}
}
}
Performance Optimization
Chaining Strategy
// Good: Let Flink chain operators
stream
.map(x -> x * 2)
.filter(x -> x > 0)
.map(x -> x.toString());
// Bad: Unnecessary disabling of chaining
stream
.map(x -> x * 2).disableChaining()
.filter(x -> x > 0).disableChaining()
.map(x -> x.toString());
Object Reuse
// Enable object reuse for better performance (careful with state!)
env.getConfig().enableObjectReuse();
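Object reuse means the runtime may pass the same record instance between chained operators and reuse input objects during deserialization, so functions must not hold on to records they receive or emit. A minimal sketch of the kind of code that breaks under object reuse (the buffering field is purely illustrative):
// Unsafe with enableObjectReuse(): the mapper caches a reference to its input,
// but the runtime may later overwrite that very object with the next record.
public class UnsafeBufferingMapper extends RichMapFunction<Event, Event> {
    private transient Event lastSeen; // illustrative cached reference

    @Override
    public Event map(Event event) {
        lastSeen = event; // BAD under object reuse: copy the record instead
        return event;
    }
}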
Parallelism Tuning
// Set based on workload characteristics
// CPU-bound: parallelism = number of cores
// I/O-bound: parallelism can be higher
// For most operations
env.setParallelism(Runtime.getRuntime().availableProcessors());
// For I/O-heavy operations
stream.map(new IoHeavyMapper())
.setParallelism(Runtime.getRuntime().availableProcessors() * 2);
// For single-threaded coordination
stream.keyBy(x -> 0) // All to one key
.process(new CoordinatorFunction())
.setParallelism(1);
Network Buffer Tuning
# flink-conf.yaml (pre-1.10 option names; Flink 1.10+ uses taskmanager.memory.network.fraction/min/max)
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
Best Practices
1. Use KeyBy Wisely
// Good: Balanced key distribution
stream.keyBy(transaction -> transaction.userId);
// Bad: Skewed distribution
stream.keyBy(transaction -> transaction.country); // If most users from one country
// Solution: Add salt to skewed keys
stream.keyBy(transaction -> {
if (isHotKey(transaction.country)) {
return transaction.country + "_" + (transaction.hashCode() % 10);
}
return transaction.country;
});
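Salting only balances the load; the results per salted key are partial and must be merged in a second stage keyed by the original key. A sketch of the two-stage pattern using processing-time windows (windowing is covered in a later module; saltedKey, CountryCount, and CountAggregate are hypothetical helpers):
// Stage 1: pre-aggregate per salted key so the hot key is spread across subtasks
DataStream<CountryCount> partials = transactions
    .keyBy(t -> saltedKey(t.country))
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate()); // emits one partial CountryCount per salt
// Stage 2: merge the partial results under the original, unsalted key
DataStream<CountryCount> totals = partials
    .keyBy(c -> c.country)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> new CountryCount(a.country, a.count + b.count));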
2. Avoid Expensive Operations in Transformations
// Bad: Creating objects in hot path
stream.map(x -> new HeavyObject(x));
// Good: Reuse objects or use primitives
stream.map(new RichMapFunction<Input, Output>() {
private transient HeavyObject reusable;
@Override
public void open(Configuration parameters) {
reusable = new HeavyObject();
}
@Override
public Output map(Input input) {
reusable.update(input);
return reusable.process();
}
});
3. Handle Errors Gracefully
stream.map(new MapFunction<String, Event>() {
@Override
public Event map(String value) throws Exception {
try {
return parseEvent(value);
} catch (Exception e) {
// Log and return null, or use side output for errors
LOG.error("Failed to parse event: " + value, e);
return null;
}
}
}).filter(Objects::nonNull);
// Better: Use side output for error handling
OutputTag<String> errorTag = new OutputTag<String>("errors"){};
SingleOutputStreamOperator<Event> events = stream.process(
new ProcessFunction<String, Event>() {
@Override
public void processElement(String value, Context ctx, Collector<Event> out) {
try {
out.collect(parseEvent(value));
} catch (Exception e) {
ctx.output(errorTag, value);
}
}
}
);
// Process errors separately
events.getSideOutput(errorTag).addSink(new ErrorSink());
4. Monitor Backpressure
// Check for backpressure in Flink UI
// If backpressure occurs:
// 1. Increase parallelism
// 2. Optimize expensive operations
// 3. Use async I/O for blocking operations
// 4. Add more resources
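Backpressure can also be sampled programmatically through the JobManager's monitoring REST API (default port 8081). A minimal sketch; the job and vertex IDs are placeholders you would first look up via /jobs and /jobs/&lt;job-id&gt;:
// Java: query the backpressure endpoint for one job vertex (IDs are placeholders)
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BackpressureCheck {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>";
        String vertexId = "<vertex-id>";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure"))
            .GET()
            .build();
        // The JSON response reports a backpressure level (ok / low / high) per subtask
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}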
Exercises
Exercise 1: Word Count with State
Implement a streaming word count that maintains counts across the stream lifetime.
// TODO: Implement StatefulWordCount
public class StatefulWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999);
// Your code here:
// 1. Split lines into words
// 2. Key by word
// 3. Use ValueState to maintain count
// 4. Print results
env.execute("Stateful Word Count");
}
}
Exercise 2: Stream Join
Join two streams (users and transactions) based on userId.
// TODO: Implement stream join
// Stream 1: User updates (userId, name, country)
// Stream 2: Transactions (txnId, userId, amount)
// Output: Enriched transactions with user details
Exercise 3: Custom Source
Implement a custom source that generates simulated sensor data.
// TODO: Implement SensorSource
// Generate readings every second
// Each reading: (sensorId, temperature, timestamp)
Summary
In this module, you learned:
- DataStream API fundamentals and core concepts
- Sources and sinks for various data systems
- Basic and advanced transformations (map, flatMap, process)
- Rich functions and process functions for complex logic
- Parallel execution, partitioning, and operator chaining
- Connecting streams with union, connect, and broadcast
- Async I/O for efficient external data enrichment
- Real-world ETL pipeline implementation
- Performance optimization techniques
- Production best practices
Next Steps
Module 3: Event Time & Watermarks
Learn how to handle out-of-order events with event time processing