
Declarative Stream Processing: Flink SQL & Table API

Module Duration: 6-7 hours
Focus: Table API, SQL DDL/DML, dynamic tables, temporal joins, UDFs, MATCH_RECOGNIZE
Prerequisites: SQL knowledge, Flink DataStream API, streaming concepts
Hands-on Labs: 15+ SQL streaming applications

Introduction: Why SQL for Streaming?

The Paradigm Shift

Traditional Stream Processing (DataStream API):
// Imperative: HOW to process data
stream
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregateFunction())
    .print();
SQL Stream Processing:
-- Declarative: WHAT you want
SELECT userId, COUNT(*) as event_count
FROM events
GROUP BY userId, TUMBLE(event_time, INTERVAL '5' MINUTES);

Why This Matters

Business Impact:
  • Democratization: Analysts can write streaming pipelines (not just engineers)
  • Productivity: substantially faster development than equivalent Java/Scala code
  • Portability: Same SQL across batch and streaming
  • Optimization: Query planner optimizes execution automatically
Industry Adoption:
  • Uber: Migrated 60% of streaming jobs to Flink SQL
  • Netflix: Keystone platform built on Flink SQL
  • Alibaba: Largest Flink SQL deployment (10,000+ jobs)
  • LinkedIn: Real-time metrics pipelines in SQL

Part 1: The Relational Model for Streams

Dynamic Tables: The Core Abstraction

As the Flink documentation describes dynamic tables: a dynamic table is a table that changes over time, and a query on a dynamic table yields another dynamic table.
Key Insight: A stream can be read as the changelog of a table, and a table as a materialized view of a stream.
Stream → Table:
[{userId: "alice", amount: 100, ts: 10:00}]
[{userId: "bob",   amount: 50,  ts: 10:01}]
[{userId: "alice", amount: 200, ts: 10:02}]

           ↓ (append to table)

Dynamic Table:
+--------+--------+-------+
| userId | amount | ts    |
+--------+--------+-------+
| alice  | 100    | 10:00 |
| bob    | 50     | 10:01 |
| alice  | 200    | 10:02 |
+--------+--------+-------+

Stream-Table Duality

┌─────────────────────────────────────┐
│       Stream ↔ Table                │
├─────────────────────────────────────┤
│ Append Stream    → Append Table     │
│ Retract Stream   → Update Table     │
│ Upsert Stream    → Upsert Table     │
└─────────────────────────────────────┘
Example: Aggregation Creates Retractions:
SELECT userId, COUNT(*) as cnt FROM events GROUP BY userId;

-- Stream of changes:
+I[alice, 1]   -- Insert: alice has 1 event
+I[bob, 1]     -- Insert: bob has 1 event
-U[alice, 1]   -- Retract: alice NO LONGER has 1 event
+U[alice, 2]   -- Update: alice now has 2 events
Change Types:
  • +I: Insert (new row)
  • -U: Update retract (delete old version)
  • +U: Update insert (insert new version)
  • -D: Delete
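To observe these change flags directly, the SQL Client can render query results as a changelog; a minimal sketch, assuming the aggregation above and a registered events table like the one defined in Part 2:
-- In the Flink SQL Client
SET 'sql-client.execution.result-mode' = 'changelog';

-- Each emitted row is now prefixed with its change flag (+I, -U, +U, -D)
SELECT userId, COUNT(*) AS cnt FROM events GROUP BY userId;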

Part 2: Table API Fundamentals

Creating Tables from Streams

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Method 1: From DataStream
DataStream<Event> eventStream = env.addSource(...);
Table eventTable = tableEnv.fromDataStream(
    eventStream,
    $("userId"),
    $("eventType"),
    $("timestamp").rowtime(),  // Event-time attribute
    $("proctime").proctime()   // Processing-time attribute
);

// Method 2: From DDL (recommended)
tableEnv.executeSql(
    "CREATE TABLE events (" +
    "  userId STRING," +
    "  eventType STRING," +
    "  amount DOUBLE," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

Basic Table Operations

// Select and filter
Table result = eventTable
    .select($("userId"), $("amount"))
    .where($("amount").isGreater(100));

// Aggregation
Table aggregated = eventTable
    .groupBy($("userId"))
    .select($("userId"), $("amount").sum().as("total"));

// Join
Table orders = tableEnv.from("orders");
Table enriched = eventTable
    .join(orders, $("userId").isEqual($("customerId")))
    .select($("userId"), $("eventType"), $("orderAmount"));

// Convert back to stream
DataStream<Row> resultStream = tableEnv.toChangelogStream(result);

Part 3: SQL DDL & DML

DDL: Creating Tables and Connectors

Kafka Source Table

CREATE TABLE kafka_events (
    user_id STRING,
    event_type STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3) METADATA FROM 'timestamp',  -- Kafka timestamp
    proc_time AS PROCTIME(),  -- Processing-time attribute (used by lookup joins later)
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

JDBC Sink Table (MySQL)

CREATE TABLE mysql_results (
    user_id STRING,
    total_amount DOUBLE,
    event_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/analytics',
    'table-name' = 'user_aggregates',
    'username' = 'flink',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
);

Upsert Kafka Sink (Changelog Stream)

CREATE TABLE kafka_upsert_sink (
    user_id STRING,
    total_purchases BIGINT,
    last_update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

Filesystem Sink (Parquet)

CREATE TABLE filesystem_sink (
    user_id STRING,
    event_date DATE,
    event_count BIGINT,
    total_amount DOUBLE
) PARTITIONED BY (event_date) WITH (
    'connector' = 'filesystem',
    'path' = 's3://my-bucket/analytics/user-events/',
    'format' = 'parquet',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file'
);

DML: Querying Streams

Simple Filtering and Projection

-- Filter high-value events
INSERT INTO high_value_events
SELECT user_id, event_type, amount, event_time
FROM kafka_events
WHERE amount > 1000;

Aggregations

-- Total amount per user (running aggregation)
SELECT
    user_id,
    COUNT(*) as event_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount,
    MAX(amount) as max_amount
FROM kafka_events
GROUP BY user_id;

Part 4: Windowed Aggregations (The Heart of Streaming SQL)

Tumbling Windows

Fixed-size, non-overlapping windows.
-- Events per 5-minute tumbling window
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTES) as window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTES) as window_end,
    COUNT(*) as event_count,
    SUM(amount) as total_amount
FROM kafka_events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '5' MINUTES);
Output:
+--------+--------------+------------+-------------+--------------+
| user_id| window_start | window_end | event_count | total_amount |
+--------+--------------+------------+-------------+--------------+
| alice  | 10:00:00     | 10:05:00   | 15          | 1500.00      |
| alice  | 10:05:00     | 10:10:00   | 22          | 2200.00      |
| bob    | 10:00:00     | 10:05:00   | 8           | 800.00       |
+--------+--------------+------------+-------------+--------------+
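Note: since Flink 1.13 the same aggregation can also be written with a window table-valued function, the syntax recommended in newer releases; a sketch against the kafka_events table above:
SELECT
    user_id,
    window_start,   -- column added by the TUMBLE TVF
    window_end,     -- column added by the TUMBLE TVF
    COUNT(*) as event_count,
    SUM(amount) as total_amount
FROM TABLE(
    TUMBLE(TABLE kafka_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTES))
GROUP BY user_id, window_start, window_end;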

Sliding Windows (Hopping)

Fixed-size, overlapping windows.
-- 10-minute window, sliding every 5 minutes
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) as window_start,
    HOP_END(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) as window_end,
    COUNT(*) as event_count
FROM kafka_events
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES);
Visualization:
Timeline: |----[Window 1: 10:00-10:10]-----|
          |         |----[Window 2: 10:05-10:15]-----|
          |                  |----[Window 3: 10:10-10:20]-----|

Session Windows

Variable-size windows based on inactivity gap.
-- Session window with 30-minute inactivity gap
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTES) as session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTES) as session_end,
    COUNT(*) as events_in_session,
    SUM(amount) as session_revenue
FROM kafka_events
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '30' MINUTES);
Example:
User "alice" events:
10:00, 10:05, 10:15 → Session 1 (10:00 - 10:45)
11:30, 11:35        → Session 2 (11:30 - 12:05)

Over Windows (Running Aggregations)

Running aggregations with bounded look-back.
-- Running total over last 1 hour
SELECT
    user_id,
    event_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) as rolling_1h_total
FROM kafka_events;

Part 5: Joins in Streaming SQL

Regular Join

-- Enrich events with user metadata (dimension table)
SELECT
    e.user_id,
    e.event_type,
    e.amount,
    u.country,
    u.premium_tier
FROM kafka_events e
JOIN user_metadata u ON e.user_id = u.user_id;
Warning: A regular join keeps both input sides in state indefinitely. Without a state TTL (see Part 8) or time bounds, state grows without limit!
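One common mitigation (covered in Part 8) is a state TTL, so that idle join state is eventually dropped; a minimal sketch, assuming the same tables:
-- Cap regular-join state; entries idle longer than the TTL may be evicted,
-- trading bounded state for possibly missed matches against very old rows
SET 'table.exec.state.ttl' = '1d';

SELECT e.user_id, e.event_type, e.amount, u.country, u.premium_tier
FROM kafka_events e
JOIN user_metadata u ON e.user_id = u.user_id;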

Interval Join (Time-Bounded)

-- Match payment events with orders within 1 hour
SELECT
    o.order_id,
    o.order_time,
    p.payment_id,
    p.payment_time,
    p.amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL '1' HOUR;
Flink optimizes this: the explicit time bound lets it retain only about one hour of state per key.

Temporal Join (Versioned Tables)

Problem: Dimension tables change over time, so which version should an event join against?
Solution: A temporal join against a versioned table.
-- Create versioned dimension table
CREATE TABLE product_catalog (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DOUBLE,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECONDS,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'product-updates',
    'format' = 'debezium-json'  -- Changelog format, so Flink treats this table as versioned
);

-- Temporal join: Use product info as of event time
SELECT
    e.event_time,
    e.product_id,
    e.quantity,
    p.product_name,
    p.price,
    e.quantity * p.price as line_total
FROM purchase_events e
LEFT JOIN product_catalog FOR SYSTEM_TIME AS OF e.event_time AS p
ON e.product_id = p.product_id;
Key Benefit: Correct historical lookups even if product price changed!

Lookup Join (External Database)

-- Create JDBC lookup table
CREATE TABLE user_profile (
    user_id STRING,
    full_name STRING,
    email STRING,
    country STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/users',
    'table-name' = 'user_profiles',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '10min'
);

-- Lookup join with caching
SELECT
    e.user_id,
    u.full_name,
    u.country,
    e.event_type,
    e.amount
FROM kafka_events e
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;
Performance: Flink caches lookups (per the lookup.cache options above) to minimize database queries. Note that lookup joins key off a processing-time attribute (proc_time here), so the external table is queried as of processing time.

Part 6: Advanced SQL Features

Deduplication

-- Keep only first event per user per day
SELECT user_id, event_type, amount, event_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id, DATE_FORMAT(event_time, 'yyyy-MM-dd')
            ORDER BY event_time ASC
        ) as row_num
    FROM kafka_events
)
WHERE row_num = 1;

Top-N per Key

-- Top 3 purchases per user
SELECT user_id, product_id, amount, purchase_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id
            ORDER BY amount DESC
        ) as rownum
    FROM purchases
)
WHERE rownum <= 3;

MATCH_RECOGNIZE (Pattern Detection in SQL!)

Use Case: Detect a suspicious card pattern: a run of declined transactions followed by a high-value approval.
SELECT *
FROM transactions
MATCH_RECOGNIZE (
    PARTITION BY card_id
    ORDER BY transaction_time
    MEASURES
        FIRST(A.transaction_time) as pattern_start,
        LAST(B.transaction_time) as pattern_end,
        COUNT(A.*) as decline_count
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A+ B)
    DEFINE
        A AS A.status = 'DECLINED',
        B AS B.status = 'APPROVED' AND B.amount > 500
) AS fraud_patterns;
Explanation:
  • A+: One or more declined transactions
  • B: Followed by an approved high-value transaction
  • Output: Only matched patterns (potential fraud!)
More Complex Pattern:
-- Detect: Small purchase, then 3 large purchases within 1 hour
PATTERN (SMALL_TXN (LARGE_TXN){3})
DEFINE
    SMALL_TXN AS SMALL_TXN.amount < 10,
    LARGE_TXN AS LARGE_TXN.amount > 500
        AND LARGE_TXN.transaction_time <= SMALL_TXN.transaction_time + INTERVAL '1' HOUR;
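For reference, a sketch of how this fragment could be embedded in a complete statement, using Flink's WITHIN clause for the one-hour bound instead of the condition on SMALL_TXN (table and column names as above):
SELECT *
FROM transactions
MATCH_RECOGNIZE (
    PARTITION BY card_id
    ORDER BY transaction_time
    MEASURES
        FIRST(SMALL_TXN.transaction_time) AS probe_time,
        LAST(LARGE_TXN.transaction_time) AS last_large_time,
        SUM(LARGE_TXN.amount) AS large_total
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (SMALL_TXN LARGE_TXN{3}) WITHIN INTERVAL '1' HOUR
    DEFINE
        SMALL_TXN AS SMALL_TXN.amount < 10,
        LARGE_TXN AS LARGE_TXN.amount > 500
) AS card_testing_patterns;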

User-Defined Functions (UDFs)

Scalar UDF

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.table.functions.ScalarFunction;

// Define UDF
public class HashFunction extends ScalarFunction {
    public String eval(String input) {
        return DigestUtils.sha256Hex(input);
    }
}

// Register UDF
tableEnv.createTemporarySystemFunction("hash", HashFunction.class);

// Use in SQL
tableEnv.executeSql(
    "SELECT user_id, hash(user_id) as user_id_hash FROM events"
);

Table Function (UDTF)

// Split comma-separated tags into rows
@FunctionHint(output = @DataTypeHint("ROW<tag STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String tags) {
        for (String tag : tags.split(",")) {
            collect(Row.of(tag.trim()));
        }
    }
}

// Register
tableEnv.createTemporarySystemFunction("split_tags", SplitFunction.class);

// Use with LATERAL TABLE
tableEnv.executeSql(
    "SELECT event_id, tag " +
    "FROM events, LATERAL TABLE(split_tags(tags)) AS T(tag)"
);

Aggregate Function (UDAF)

// Weighted average
public class WeightedAvgAccumulator {
    public double sum = 0;
    public double weight = 0;
}

public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    public void accumulate(WeightedAvgAccumulator acc, double value, double weight) {
        acc.sum += value * weight;
        acc.weight += weight;
    }

    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        return acc.weight == 0 ? null : acc.sum / acc.weight;
    }
}

// Register and use
tableEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);
tableEnv.executeSql(
    "SELECT user_id, weighted_avg(score, weight) FROM events GROUP BY user_id"
);

Part 7: Real-World Use Cases

Use Case 1: Real-Time Dashboard (Tumbling Windows)

-- Minutely metrics for dashboard
CREATE VIEW dashboard_metrics AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as metric_time,
    COUNT(*) as total_events,
    COUNT(DISTINCT user_id) as unique_users,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value,
    COUNT(CASE WHEN event_type = 'error' THEN 1 END) as error_count
FROM kafka_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

INSERT INTO kafka_dashboard_sink SELECT * FROM dashboard_metrics;

Use Case 2: Fraud Detection (Stateful Aggregation)

-- Alert if user makes >5 transactions totaling >$10,000 in 1 hour
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR) as window_start,
    COUNT(*) as txn_count,
    SUM(amount) as total_amount
FROM transactions
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
HAVING COUNT(*) > 5 AND SUM(amount) > 10000;

Use Case 3: Sessionization (Session Windows)

-- Compute session metrics
CREATE VIEW user_sessions AS
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTES) as session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTES) as session_end,
    COUNT(*) as page_views,
    COUNT(DISTINCT page_url) as unique_pages,
    SUM(CASE WHEN event_type = 'purchase' THEN amount ELSE 0 END) as session_revenue,
    MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as converted
FROM clickstream
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '30' MINUTES);

Use Case 4: Feature Engineering for ML

-- Real-time feature computation for fraud model
CREATE VIEW user_features AS
SELECT
    user_id,
    -- Velocity features
    COUNT(*) as txns_last_1h,
    SUM(amount) as spend_last_1h,
    -- Behavioral features
    COUNT(DISTINCT merchant_id) as unique_merchants_1h,
    AVG(amount) as avg_txn_amount_1h,
    STDDEV_POP(amount) as stddev_amount_1h,
    -- Pattern features
    MAX(amount) as max_amount_1h,
    SUM(CASE WHEN declined = true THEN 1 ELSE 0 END) as declines_last_1h
FROM transactions
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR);

Part 8: Performance Optimization

State Management in SQL

Problem: Unbounded State

-- DANGEROUS: Unbounded state (keeps all users forever!)
SELECT user_id, COUNT(*) FROM events GROUP BY user_id;

Solution 1: Table State TTL

-- Source table (the state TTL is applied via the job/session config below)
CREATE TABLE events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'format' = 'json'
);

-- Configure state TTL (via job config)
SET 'table.exec.state.ttl' = '7d';  -- Expire state after 7 days

SELECT user_id, COUNT(*) FROM events GROUP BY user_id;

Solution 2: Windowed Aggregation

-- Bounded state: Only keeps 1 hour per window
SELECT
    user_id,
    COUNT(*)
FROM events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '1' HOUR);

Mini-Batch Optimization

-- Enable mini-batch aggregation for higher throughput
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';

-- Now aggregations buffer up to 5s or 5000 records before emitting
SELECT user_id, COUNT(*) FROM events GROUP BY user_id;
Trade-off: Higher throughput, slightly higher latency (5s max).

Local-Global Aggregation

-- Enable local-global optimization for skewed keys
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

-- Flink now does:
-- 1. Local aggregation (reduce data sent over network)
-- 2. Global aggregation (final result)
Benefit: Often several times faster for aggregations with skewed keys. Note: the two-phase (local-global) strategy only takes effect when mini-batch is enabled (see above).

Part 9: Connectors Deep Dive

Kafka Connector Options

CREATE TABLE kafka_table (
    user_id STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'kafka:9092',

    -- Consumer options
    'scan.startup.mode' = 'earliest-offset',  -- or 'latest-offset', 'group-offsets', 'timestamp'
    'scan.startup.timestamp-millis' = '1640995200000',  -- If using timestamp mode
    'properties.group.id' = 'flink-sql-group',

    -- Format
    'format' = 'json',  -- or 'avro', 'csv', 'debezium-json', 'canal-json'
    'json.timestamp-format.standard' = 'ISO-8601',

    -- Performance
    'scan.parallelism' = '4',  -- Override source parallelism

    -- Metadata
    'value.fields-include' = 'ALL',  -- Whether key fields also appear in the value ('ALL' or 'EXCEPT_KEY')

    -- Kafka-specific
    'properties.max.poll.records' = '500',
    'properties.fetch.min.bytes' = '1048576'  -- 1 MB
);

Filesystem Connector (S3/HDFS)

CREATE TABLE s3_parquet_sink (
    user_id STRING,
    event_date DATE,
    event_count BIGINT,
    total_amount DOUBLE
) PARTITIONED BY (event_date) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket/events/',
    'format' = 'parquet',

    -- Partitioning
    'partition.time-extractor.timestamp-pattern' = '$event_date 00:00:00',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file,metastore',

    -- S3-specific
    'sink.rolling-policy.file-size' = '128MB',
    'sink.rolling-policy.rollover-interval' = '30min',

    -- Compression
    'parquet.compression' = 'snappy'
);

JDBC Connector

CREATE TABLE mysql_sink (
    user_id STRING,
    total_amount DOUBLE,
    update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/analytics',
    'table-name' = 'user_stats',
    'username' = 'flink',
    'password' = 'secret',

    -- Performance
    'sink.buffer-flush.max-rows' = '1000',  -- Batch size
    'sink.buffer-flush.interval' = '2s',    -- Max wait time
    'sink.max-retries' = '3',

    -- Connection pool
    'connection.max-retry-timeout' = '30s',

    -- Writes in upsert mode because a PRIMARY KEY is defined
    'sink.parallelism' = '4'
);

Part 10: Production Patterns

Pattern 1: End-to-End Pipeline

-- Source: Kafka events
CREATE TABLE raw_events (
    user_id STRING,
    event_type STRING,
    product_id STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME(),  -- Processing-time attribute for the lookup join below
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'raw-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Enrichment: Product catalog
CREATE TABLE product_catalog (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DOUBLE,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/catalog',
    'table-name' = 'products'
);

-- Transformation: Enrich and aggregate
CREATE VIEW enriched_metrics AS
SELECT
    e.user_id,
    p.category,
    TUMBLE_START(e.event_time, INTERVAL '5' MINUTES) as window_start,
    COUNT(*) as event_count,
    SUM(e.amount) as total_amount,
    COUNT(DISTINCT e.product_id) as unique_products
FROM raw_events e
LEFT JOIN product_catalog FOR SYSTEM_TIME AS OF e.proc_time AS p
    ON e.product_id = p.product_id
GROUP BY
    e.user_id,
    p.category,
    TUMBLE(e.event_time, INTERVAL '5' MINUTES);

-- Sink: Back to Kafka for downstream consumers
CREATE TABLE kafka_metrics_sink (
    user_id STRING,
    category STRING,
    window_start TIMESTAMP(3),
    event_count BIGINT,
    total_amount DOUBLE,
    unique_products BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-metrics',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Execute pipeline
INSERT INTO kafka_metrics_sink SELECT * FROM enriched_metrics;

Pattern 2: Multi-Sink Fanout

-- One source, multiple sinks
CREATE VIEW processed_events AS
SELECT
    user_id,
    event_type,
    amount,
    event_time,
    DATE_FORMAT(event_time, 'yyyy-MM-dd') as event_date
FROM raw_events
WHERE amount > 0;

-- Sink 1: Real-time to Kafka
INSERT INTO kafka_realtime_sink
SELECT user_id, event_type, amount, event_time
FROM processed_events;

-- Sink 2: Batch to S3 (partitioned by date)
INSERT INTO s3_archive_sink
SELECT user_id, event_type, amount, event_date
FROM processed_events;

-- Sink 3: Aggregates to MySQL
INSERT INTO mysql_dashboard_sink
SELECT
    user_id,
    COUNT(*) as total_events,
    SUM(amount) as total_amount
FROM processed_events
GROUP BY user_id;
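Submitted one by one (e.g., from the SQL Client), each INSERT above becomes its own job. In newer Flink versions the three can be wrapped in a statement set so they run as a single job sharing one scan of processed_events; a sketch:
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO kafka_realtime_sink
    SELECT user_id, event_type, amount, event_time FROM processed_events;

    INSERT INTO s3_archive_sink
    SELECT user_id, event_type, amount, event_date FROM processed_events;

    INSERT INTO mysql_dashboard_sink
    SELECT user_id, COUNT(*) AS total_events, SUM(amount) AS total_amount
    FROM processed_events
    GROUP BY user_id;
END;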

Pattern 3: Late Data Handling

-- Side output for late events
CREATE TABLE late_events_sink (
    user_id STRING,
    event_time TIMESTAMP(3),
    arrival_time TIMESTAMP(3),
    lateness_seconds BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'late-events',
    'format' = 'json'
);

-- Main pipeline with allowed lateness
CREATE VIEW windowed_metrics AS
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as event_count
FROM raw_events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '1' MINUTE);

-- Track late data
INSERT INTO late_events_sink
SELECT
    user_id,
    event_time,
    PROCTIME() as arrival_time,
    TIMESTAMPDIFF(SECOND, event_time, PROCTIME()) as lateness_seconds
FROM raw_events
WHERE event_time < CURRENT_WATERMARK(event_time);

Part 11: Debugging & Monitoring

Query Execution Plan

String query = "SELECT user_id, COUNT(*) FROM events GROUP BY user_id";
String plan = tableEnv.explainSql(query);
System.out.println(plan);
Output:
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+- LogicalTableScan(table=[[default_catalog, default_database, events]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[user_id], select=[user_id, COUNT(*) AS EXPR$1])
+- Exchange(distribution=[hash[user_id]])
   +- TableSourceScan(table=[[default_catalog, default_database, events]], fields=[user_id, ...])

Logging for Debugging

// Quick debugging: execute the query and print results
// (updating results are printed with their change flags)
TableResult result = tableEnv.executeSql("SELECT * FROM events WHERE amount > 100");
result.print();

// For deeper inspection, raise the log level of org.apache.flink.table in your log4j configuration

Common SQL Pitfalls

Pitfall 1: Unbounded Distinct

-- ❌ BAD: Keeps all user IDs in state forever!
SELECT COUNT(DISTINCT user_id) FROM events;

-- ✅ GOOD: Window distinct counts
SELECT
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as hour,
    COUNT(DISTINCT user_id) as unique_users
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);

Pitfall 2: Unbounded Regular Joins

-- ❌ BAD: Unbounded state (keeps both tables in state forever!)
SELECT * FROM events e, users u WHERE e.user_id = u.user_id;

-- ✅ GOOD: Use a temporal join so only the relevant version of each user is kept
SELECT * FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.event_time AS u
ON e.user_id = u.user_id;

Part 12: Exercises

Exercise 1: Real-Time Analytics Dashboard

Task: Build a minutely dashboard showing:
  • Total events
  • Unique users
  • Revenue
  • Top 5 products by sales
-- Your solution here
CREATE VIEW dashboard AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as minute,
    COUNT(*) as total_events,
    COUNT(DISTINCT user_id) as unique_users,
    SUM(amount) as revenue
    -- Top 5 products by sales: see the companion Top-N query sketched below
FROM purchase_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
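A sketch of the companion Top-N query referenced above, following the pattern from Part 6 (assuming the same purchase_events table; names are illustrative):
-- Top 5 products per minute by revenue
SELECT window_start, product_id, product_revenue
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY window_start
            ORDER BY product_revenue DESC
        ) AS rownum
    FROM (
        SELECT
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
            product_id,
            SUM(amount) AS product_revenue
        FROM purchase_events
        GROUP BY product_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
    )
)
WHERE rownum <= 5;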

Exercise 2: Fraud Detection

Task: Detect users with >3 declining transactions within 10 minutes.
-- Your solution here

Exercise 3: Sessionization

Task: Compute session duration and page views per session (30-min inactivity).
-- Your solution here

Summary

What You’ve Mastered

✅ Dynamic tables and stream-table duality
✅ Table API and SQL DDL/DML
✅ Windowed aggregations (tumbling, sliding, session)
✅ Joins (regular, interval, temporal, lookup)
✅ Advanced features (MATCH_RECOGNIZE, UDFs, Top-N)
✅ Connectors (Kafka, JDBC, filesystem)
✅ Performance optimization (mini-batch, state TTL)
✅ Production patterns (multi-sink, late data handling)

Key Takeaways

  1. SQL Democratizes Streaming: Analysts can build real-time pipelines
  2. Windowing is Essential: Use tumbling/sliding/session windows to bound state
  3. Temporal Joins are Powerful: Correct historical lookups with versioned tables
  4. State Management Matters: Always consider state size and TTL
  5. Connectors are Production-Ready: Kafka, JDBC, S3 all have mature connectors

Next Module

Module 7: Complex Event Processing (CEP)

Pattern detection and sequencing with Flink CEP

Resources

Practice: Build a complete real-time analytics pipeline: Kafka → Flink SQL (enrich, aggregate) → Multiple sinks (Kafka, MySQL, S3). Monitor state growth and optimize for production!