# Declarative Stream Processing: Flink SQL & Table API Deep Dive

> Master SQL-based stream processing - from dynamic tables to temporal joins, window aggregations, and production patterns

<Info>
  **Module Duration**: 6-7 hours
  **Focus**: Table API, SQL DDL/DML, dynamic tables, temporal joins, UDFs, MATCH\_RECOGNIZE
  **Prerequisites**: SQL knowledge, Flink DataStream API, streaming concepts
  **Hands-on Labs**: 15+ SQL streaming applications
</Info>

## Introduction: Why SQL for Streaming?

### The Paradigm Shift

**Traditional Stream Processing (DataStream API)**:

```java theme={null}
// Imperative: HOW to process data
stream
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregateFunction())
    .print();
```

**SQL Stream Processing**:

```sql theme={null}
-- Declarative: WHAT you want
SELECT userId, COUNT(*) as event_count
FROM events
GROUP BY userId, TUMBLE(event_time, INTERVAL '5' MINUTES);
```

### Why This Matters

**Business Impact**:

* **Democratization**: Analysts can write streaming pipelines (not just engineers)
* **Productivity**: Typically far less code to write and review than the equivalent Java/Scala job (compare the two snippets above)
* **Portability**: Same SQL across batch and streaming
* **Optimization**: Query planner optimizes execution automatically

**Industry Adoption**:

* **Uber**: Migrated 60% of streaming jobs to Flink SQL
* **Netflix**: Keystone platform built on Flink SQL
* **Alibaba**: Largest Flink SQL deployment (10,000+ jobs)
* **LinkedIn**: Real-time metrics pipelines in SQL

***

## Part 1: The Relational Model for Streams

### Dynamic Tables: The Core Abstraction

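Dynamic tables are the core concept behind Flink's Table API and SQL (the engine underneath is described in the paper "Apache Flink: Stream and Batch Processing in a Single Engine"). In short:

> A dynamic table is a table that changes over time. A continuous query on a dynamic table yields another dynamic table.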

**Key Insight**: Streams are **unbounded tables**, tables are **materialized streams**.

```
Stream → Table:
[{userId: "alice", amount: 100, ts: 10:00}]
[{userId: "bob",   amount: 50,  ts: 10:01}]
[{userId: "alice", amount: 200, ts: 10:02}]

           ↓ (append to table)

Dynamic Table:
+--------+--------+-------+
| userId | amount | ts    |
+--------+--------+-------+
| alice  | 100    | 10:00 |
| bob    | 50     | 10:01 |
| alice  | 200    | 10:02 |
+--------+--------+-------+
```

### Stream-Table Duality

```
┌─────────────────────────────────────┐
│       Stream ↔ Table                │
├─────────────────────────────────────┤
│ Append Stream    → Append Table     │
│ Retract Stream   → Update Table     │
│ Upsert Stream    → Upsert Table     │
└─────────────────────────────────────┘
```

**Example: Aggregation Creates Retractions**:

```sql theme={null}
SELECT userId, COUNT(*) as cnt FROM events GROUP BY userId;

-- Stream of changes:
+I[alice, 1]   -- Insert: alice has 1 event
+I[bob, 1]     -- Insert: bob has 1 event
-U[alice, 1]   -- Retract: alice NO LONGER has 1 event
+U[alice, 2]   -- Update: alice now has 2 events
```

**Change Types**:

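* `+I`: Insert (new row; `RowKind.INSERT`)
* `-U`: Update-before (retracts the previous version of a row; `RowKind.UPDATE_BEFORE`)
* `+U`: Update-after (the new version of that row; `RowKind.UPDATE_AFTER`)
* `-D`: Delete (`RowKind.DELETE`)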
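
To make the retraction mechanics concrete, here is a minimal sketch in plain Java of how a running `COUNT(*) ... GROUP BY userId` could turn incoming events into the changelog shown above. It is an illustrative model only, not Flink's operator; the class name `CountChangelog` and its method `onEvent` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: plain Java, not Flink's group-aggregate operator.
class CountChangelog {
    // Current count per key (what Flink would hold in keyed state).
    private final Map<String, Long> counts = new HashMap<>();

    // Returns the changelog rows emitted when one more event arrives for `key`.
    List<String> onEvent(String key) {
        List<String> out = new ArrayList<>();
        Long previous = counts.get(key);
        long current = (previous == null) ? 1L : previous + 1L;
        counts.put(key, current);
        if (previous == null) {
            out.add("+I[" + key + ", " + current + "]");  // first result for this key
        } else {
            out.add("-U[" + key + ", " + previous + "]"); // retract the old result
            out.add("+U[" + key + ", " + current + "]");  // emit the new result
        }
        return out;
    }

    public static void main(String[] args) {
        CountChangelog agg = new CountChangelog();
        for (String user : new String[] {"alice", "bob", "alice"}) {
            agg.onEvent(user).forEach(System.out::println);
        }
    }
}
```

Feeding it `alice`, `bob`, `alice` prints the same four rows as the example: the second `alice` event produces a `-U`/`+U` pair rather than a second insert.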

***

## Part 2: Table API Fundamentals

### Creating Tables from Streams

```java theme={null}
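import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;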

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Method 1: From DataStream
DataStream<Event> eventStream = env.addSource(...);
Table eventTable = tableEnv.fromDataStream(
    eventStream,
    $("userId"),
    $("eventType"),
    $("timestamp").rowtime(),  // Event-time attribute
    $("proctime").proctime()   // Processing-time attribute
);

// Method 2: From DDL (recommended)
tableEnv.executeSql(
    "CREATE TABLE events (" +
    "  userId STRING," +
    "  eventType STRING," +
    "  amount DOUBLE," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);
```

### Basic Table Operations

```java theme={null}
// Select and filter
Table result = eventTable
    .select($("userId"), $("amount"))
    .where($("amount").isGreater(100));

// Aggregation
Table aggregated = eventTable
    .groupBy($("userId"))
    .select($("userId"), $("amount").sum().as("total"));

// Join
Table orders = tableEnv.from("orders");
Table enriched = eventTable
    .join(orders, $("userId").isEqual($("customerId")))
    .select($("userId"), $("eventType"), $("orderAmount"));

// Convert back to stream
DataStream<Row> resultStream = tableEnv.toChangelogStream(result);
```

***

## Part 3: Flink SQL - The Power of Declarative Processing

### DDL: Creating Tables and Connectors

#### Kafka Source Table

```sql theme={null}
CREATE TABLE kafka_events (
    user_id STRING,
    event_type STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3) METADATA FROM 'timestamp',  -- Kafka timestamp
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
```

#### JDBC Sink Table (MySQL)

```sql theme={null}
CREATE TABLE mysql_results (
    user_id STRING,
    total_amount DOUBLE,
    event_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/analytics',
    'table-name' = 'user_aggregates',
    'username' = 'flink',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
);
```

#### Upsert Kafka Sink (Changelog Stream)

```sql theme={null}
CREATE TABLE kafka_upsert_sink (
    user_id STRING,
    total_purchases BIGINT,
    last_update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
```

#### Filesystem Sink (Parquet)

```sql theme={null}
CREATE TABLE filesystem_sink (
    user_id STRING,
    event_date DATE,
    event_count BIGINT,
    total_amount DOUBLE
) PARTITIONED BY (event_date) WITH (
    'connector' = 'filesystem',
    'path' = 's3://my-bucket/analytics/user-events/',
    'format' = 'parquet',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file'
);
```

### DML: Querying Streams

#### Simple Filtering and Projection

```sql theme={null}
-- Filter high-value events
INSERT INTO high_value_events
SELECT user_id, event_type, amount, event_time
FROM kafka_events
WHERE amount > 1000;
```

#### Aggregations

```sql theme={null}
-- Total amount per user (running aggregation)
SELECT
    user_id,
    COUNT(*) as event_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount,
    MAX(amount) as max_amount
FROM kafka_events
GROUP BY user_id;
```

***

## Part 4: Windowed Aggregations (The Heart of Streaming SQL)

### Tumbling Windows

**Fixed-size, non-overlapping windows**.

```sql theme={null}
-- Events per 5-minute tumbling window
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTES) as window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTES) as window_end,
    COUNT(*) as event_count,
    SUM(amount) as total_amount
FROM kafka_events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '5' MINUTES);
```

**Output**:

```
+--------+--------------+------------+-------------+--------------+
| user_id| window_start | window_end | event_count | total_amount |
+--------+--------------+------------+-------------+--------------+
| alice  | 10:00:00     | 10:05:00   | 15          | 1500.00      |
| alice  | 10:05:00     | 10:10:00   | 22          | 2200.00      |
| bob    | 10:00:00     | 10:05:00   | 8           | 800.00       |
+--------+--------------+------------+-------------+--------------+
```
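
The window boundaries in that output follow from simple arithmetic on the event timestamp. The sketch below shows one way to compute them, assuming epoch-aligned windows with no offset; `TumblingWindows` and its methods are hypothetical names, and this is a plain-Java model rather than Flink's window assigner.

```java
// Illustrative model only: plain Java, not Flink's window assigner.
class TumblingWindows {
    // Start (inclusive) of the epoch-aligned window that contains timestampMillis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long event = ((10 * 60 + 3) * 60 + 20) * 1000L; // 10:03:20 as millis since midnight
        System.out.println(windowStart(event, fiveMinutes)); // 36000000 (10:00:00)
        System.out.println(windowEnd(event, fiveMinutes));   // 36300000 (10:05:00)
    }
}
```

Because the end is exclusive, an event stamped exactly 10:05:00 belongs to the 10:05 to 10:10 window, which is why each event lands in exactly one tumbling window.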

### Sliding Windows (Hopping)

**Fixed-size, overlapping windows**.

```sql theme={null}
-- 10-minute window, sliding every 5 minutes
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) as window_start,
    HOP_END(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) as window_end,
    COUNT(*) as event_count
FROM kafka_events
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES);
```

**Visualization**:

```
Timeline: |----[Window 1: 10:00-10:10]-----|
          |         |----[Window 2: 10:05-10:15]-----|
          |                  |----[Window 3: 10:10-10:20]-----|
```
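
Unlike a tumbling window, a hopping window assigns each event to several windows (size divided by slide, here two). A minimal plain-Java sketch of that assignment, assuming epoch-aligned windows and using minutes since midnight as the time unit for readability; `HoppingWindows.assign` is a hypothetical helper, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: plain Java, not Flink's window assigner.
class HoppingWindows {
    // Returns every [start, end) window of the given size and slide that contains ts.
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Event at 10:07 (607 minutes), 10-minute windows sliding every 5 minutes.
        for (long[] w : assign(607, 10, 5)) {
            System.out.println(w[0] + " - " + w[1]); // 605 - 615, then 600 - 610
        }
    }
}
```

An event at 10:07 therefore counts toward both the 10:00 to 10:10 and the 10:05 to 10:15 windows, which is also why hopping windows hold roughly size/slide times the state of a tumbling window.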

### Session Windows

**Variable-size windows based on inactivity gap**.

```sql theme={null}
-- Session window with 30-minute inactivity gap
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTES) as session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTES) as session_end,
    COUNT(*) as events_in_session,
    SUM(amount) as session_revenue
FROM kafka_events
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '30' MINUTES);
```

**Example**:

```
User "alice" events:
10:00, 10:05, 10:15 → Session 1 (10:00 - 10:45)
11:30, 11:35        → Session 2 (11:30 - 12:05)
```
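
The session boundaries in this example can be reproduced with a short plain-Java sketch. It assumes events for one user arrive sorted by time, uses minutes since midnight as the unit, and starts a new session when the pause between events exceeds the gap (how a pause of exactly one gap is treated is an assumption of this sketch). `SessionWindows.sessions` is a hypothetical helper, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: plain Java, not Flink's session window operator.
class SessionWindows {
    // Groups sorted event times into [start, end) sessions; end = last event + gap.
    static List<long[]> sessions(long[] sortedTimes, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimes.length == 0) {
            return result;
        }
        long start = sortedTimes[0];
        long last = sortedTimes[0];
        for (int i = 1; i < sortedTimes.length; i++) {
            long t = sortedTimes[i];
            if (t - last > gap) {                       // pause longer than the gap
                result.add(new long[] {start, last + gap});
                start = t;                              // open a new session
            }
            last = t;
        }
        result.add(new long[] {start, last + gap});     // close the final session
        return result;
    }

    public static void main(String[] args) {
        long[] alice = {600, 605, 615, 690, 695};       // 10:00, 10:05, 10:15, 11:30, 11:35
        for (long[] s : sessions(alice, 30)) {
            System.out.println(s[0] + " - " + s[1]);    // 600 - 645, then 690 - 725
        }
    }
}
```

The results, 600 to 645 and 690 to 725 minutes, correspond to 10:00 to 10:45 and 11:30 to 12:05.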

### Over Windows (OVER Aggregation)

**Running aggregations with bounded look-back**.

```sql theme={null}
-- Running total over last 1 hour
SELECT
    user_id,
    event_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) as rolling_1h_total
FROM kafka_events;
```

***

## Part 5: Joins in Streaming SQL

### Regular Join

```sql theme={null}
-- Enrich events with user metadata (dimension table)
SELECT
    e.user_id,
    e.event_type,
    e.amount,
    u.country,
    u.premium_tier
FROM kafka_events e
JOIN user_metadata u ON e.user_id = u.user_id;
```

**Warning**: A regular join over two unbounded streams keeps every row from both sides in state indefinitely. Bound it with a time condition (interval or temporal join) or set a state TTL.

### Interval Join (Time-Bounded)

```sql theme={null}
-- Match payment events with orders within 1 hour
SELECT
    o.order_id,
    o.order_time,
    p.payment_id,
    p.payment_time,
    p.amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL '1' HOUR;
```

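**Flink optimizes this**: Because the time bound limits which rows can still match, Flink only needs to keep about 1 hour of rows per key in state. Both `order_time` and `payment_time` must be time attributes.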
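
The reasoning behind the bounded state can be sketched in plain Java. The first method is the join predicate from the query; the second shows why an order row becomes safe to drop once the watermark has passed the end of its match interval. This is a simplified model under that assumption, not Flink's actual cleanup logic, and `IntervalJoinSketch` is a hypothetical name.

```java
// Illustrative model only: plain Java, not Flink's interval join operator.
class IntervalJoinSketch {
    // Mirrors: payment_time BETWEEN order_time AND order_time + bound
    static boolean matches(long orderTime, long paymentTime, long boundMillis) {
        return paymentTime >= orderTime && paymentTime <= orderTime + boundMillis;
    }

    // Once the watermark is past orderTime + bound, no on-time payment can
    // still match this order, so its row no longer needs to be kept in state.
    static boolean canExpireOrder(long orderTime, long boundMillis, long watermark) {
        return watermark > orderTime + boundMillis;
    }

    public static void main(String[] args) {
        long oneHour = 60 * 60 * 1000L;
        System.out.println(matches(0L, 30 * 60 * 1000L, oneHour));   // payment after 30 min
        System.out.println(matches(0L, 2 * oneHour, oneHour));       // payment after 2 hours
        System.out.println(canExpireOrder(0L, oneHour, oneHour + 1));
    }
}
```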

### Temporal Join (Versioned Tables)

**Problem**: Dimension tables change over time. Which version to use?

**Solution**: Temporal join with versioned table.

```sql theme={null}
-- Create versioned dimension table
CREATE TABLE product_catalog (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DOUBLE,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECONDS,
    PRIMARY KEY (product_id) NOT ENFORCED
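) WITH (
    -- A plain 'kafka' table with 'json' format is append-only and cannot declare a
    -- PRIMARY KEY; a versioned table needs a changelog source such as 'upsert-kafka'
    'connector' = 'upsert-kafka',
    'topic' = 'product-updates',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);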

-- Temporal join: Use product info as of event time
SELECT
    e.event_time,
    e.product_id,
    e.quantity,
    p.product_name,
    p.price,
    e.quantity * p.price as line_total
FROM purchase_events e
LEFT JOIN product_catalog FOR SYSTEM_TIME AS OF e.event_time AS p
ON e.product_id = p.product_id;
```

**Key Benefit**: Correct historical lookups even if product price changed!
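
Conceptually, a versioned table keeps the history of each key ordered by time, and the temporal join picks the latest version that was valid at the event's timestamp. A minimal plain-Java sketch of that lookup, assuming prices keyed by product ID and a `validFrom` timestamp per version; `VersionedPrices` and its methods are hypothetical names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only: plain Java, not Flink's temporal join operator.
class VersionedPrices {
    // productId -> (validFrom timestamp -> price), versions sorted by time
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    void update(String productId, long validFrom, double price) {
        versions.computeIfAbsent(productId, k -> new TreeMap<>()).put(validFrom, price);
    }

    // Price valid at eventTime: the latest version with validFrom <= eventTime.
    // Returns null when no version existed yet (a LEFT JOIN would yield NULLs).
    Double priceAsOf(String productId, long eventTime) {
        TreeMap<Long, Double> history = versions.get(productId);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = history.floorEntry(eventTime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedPrices catalog = new VersionedPrices();
        catalog.update("p1", 100L, 9.99);
        catalog.update("p1", 200L, 12.49);
        System.out.println(catalog.priceAsOf("p1", 150L)); // 9.99, the price before the change
        System.out.println(catalog.priceAsOf("p1", 250L)); // 12.49
    }
}
```

A purchase stamped at time 150 is priced at 9.99 even if it is processed after the update at time 200 has arrived, which is the "correct historical lookup" property described above.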

### Lookup Join (External Database)

```sql theme={null}
-- Create JDBC lookup table
CREATE TABLE user_profile (
    user_id STRING,
    full_name STRING,
    email STRING,
    country STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/users',
    'table-name' = 'user_profiles',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '10min'
);

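-- Lookup join with caching.
-- A lookup join must use a processing-time attribute, so this assumes
-- kafka_events also declares the computed column `proc_time AS PROCTIME()`.
SELECT
    e.user_id,
    u.full_name,
    u.country,
    e.event_type,
    e.amount
FROM kafka_events e
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF e.proc_time AS u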
ON e.user_id = u.user_id;
```

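**Performance**: With the `lookup.cache.*` options set, the connector caches looked-up rows to reduce database queries, at the cost of serving values that may be up to one TTL stale.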

***

## Part 6: Advanced SQL Features

### Deduplication

```sql theme={null}
-- Keep only first event per user per day
SELECT user_id, event_type, amount, event_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id, DATE_FORMAT(event_time, 'yyyy-MM-dd')
            ORDER BY event_time ASC
        ) as row_num
    FROM kafka_events
)
WHERE row_num = 1;
```

### Top-N per Key

```sql theme={null}
-- Top 3 purchases per user
SELECT user_id, product_id, amount, purchase_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id
            ORDER BY amount DESC
        ) as row_num
    FROM purchases
)
WHERE row_num <= 3;
```

### MATCH\_RECOGNIZE (Pattern Detection in SQL!)

**Use Case**: Detect fraud patterns (one or more declined transactions followed by a high-value approval).

```sql theme={null}
SELECT *
FROM transactions
MATCH_RECOGNIZE (
    PARTITION BY card_id
    ORDER BY transaction_time
    MEASURES
        FIRST(A.transaction_time) as pattern_start,
        B.transaction_time as pattern_end,
        COUNT(A.transaction_time) as decline_count
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A+ B)
    DEFINE
        A AS A.status = 'DECLINED',
        B AS B.status = 'APPROVED' AND B.amount > 500
) AS fraud_patterns;
```

**Explanation**:

* `A+`: One or more declined transactions
* `B`: Followed by an approved high-value transaction
* **Output**: Only matched patterns (potential fraud!)

**More Complex Pattern**:

```sql theme={null}
-- Detect: Small purchase, then 3 large purchases within 1 hour
PATTERN (SMALL_TXN LARGE_TXN{3})
DEFINE
    SMALL_TXN AS SMALL_TXN.amount < 10,
    LARGE_TXN AS LARGE_TXN.amount > 500
        AND LARGE_TXN.transaction_time <= SMALL_TXN.transaction_time + INTERVAL '1' HOUR;
```

### User-Defined Functions (UDFs)

#### Scalar UDF

```java theme={null}
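// Assumes Apache Commons Codec is on the classpath
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.table.functions.ScalarFunction;

// Define UDF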
public class HashFunction extends ScalarFunction {
    public String eval(String input) {
        return DigestUtils.sha256Hex(input);
    }
}

// Register UDF
tableEnv.createTemporarySystemFunction("hash", HashFunction.class);

// Use in SQL
tableEnv.executeSql(
    "SELECT user_id, hash(user_id) as user_id_hash FROM events"
);
```

#### Table Function (UDTF)

```java theme={null}
// Split comma-separated tags into rows
@FunctionHint(output = @DataTypeHint("ROW<tag STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String tags) {
        for (String tag : tags.split(",")) {
            collect(Row.of(tag.trim()));
        }
    }
}

// Register
tableEnv.createTemporarySystemFunction("split_tags", SplitFunction.class);

// Use with LATERAL TABLE
tableEnv.executeSql(
    "SELECT event_id, tag " +
    "FROM events, LATERAL TABLE(split_tags(tags)) AS T(tag)"
);
```

#### Aggregate Function (UDAF)

```java theme={null}
// Weighted average
public class WeightedAvgAccumulator {
    public double sum = 0;
    public double weight = 0;
}

public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    public void accumulate(WeightedAvgAccumulator acc, double value, double weight) {
        acc.sum += value * weight;
        acc.weight += weight;
    }

    @Override
    public Double getValue(WeightedAvgAccumulator acc) {
        return acc.weight == 0 ? null : acc.sum / acc.weight;
    }
}

// Register
tableEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);

// Use
tableEnv.executeSql(
    "SELECT user_id, weighted_avg(score, weight) FROM events GROUP BY user_id"
);
```

***

## Part 7: Real-World Use Cases

### Use Case 1: Real-Time Dashboard (Tumbling Windows)

```sql theme={null}
-- Minutely metrics for dashboard
CREATE VIEW dashboard_metrics AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as metric_time,
    COUNT(*) as total_events,
    COUNT(DISTINCT user_id) as unique_users,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value,
    COUNT(CASE WHEN event_type = 'error' THEN 1 END) as error_count
FROM kafka_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

INSERT INTO kafka_dashboard_sink SELECT * FROM dashboard_metrics;
```

### Use Case 2: Fraud Detection (Stateful Aggregation)

```sql theme={null}
-- Alert if user makes >5 transactions totaling >$10,000 in 1 hour
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR) as window_start,
    COUNT(*) as txn_count,
    SUM(amount) as total_amount
FROM transactions
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
HAVING COUNT(*) > 5 AND SUM(amount) > 10000;
```

### Use Case 3: Sessionization (Session Windows)

```sql theme={null}
-- Compute session metrics
CREATE VIEW user_sessions AS
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTES) as session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTES) as session_end,
    COUNT(*) as page_views,
    COUNT(DISTINCT page_url) as unique_pages,
    SUM(CASE WHEN event_type = 'purchase' THEN amount ELSE 0 END) as session_revenue,
    MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as converted
FROM clickstream
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '30' MINUTES);
```

### Use Case 4: Feature Engineering for ML

```sql theme={null}
-- Real-time feature computation for fraud model
CREATE VIEW user_features AS
SELECT
    user_id,
    -- Velocity features
    COUNT(*) as txns_last_1h,
    SUM(amount) as spend_last_1h,
    -- Behavioral features
    COUNT(DISTINCT merchant_id) as unique_merchants_1h,
    AVG(amount) as avg_txn_amount_1h,
    STDDEV_POP(amount) as stddev_amount_1h,
    -- Pattern features
    MAX(amount) as max_amount_1h,
    SUM(CASE WHEN declined = true THEN 1 ELSE 0 END) as declines_last_1h
FROM transactions
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR);
```

***

## Part 8: Performance Optimization

### State Management in SQL

#### Problem: Unbounded State

```sql theme={null}
-- DANGEROUS: Unbounded state (keeps all users forever!)
SELECT user_id, COUNT(*) FROM events GROUP BY user_id;
```

#### Solution 1: Table State TTL

```sql theme={null}
-- Source table (the state TTL itself is a job-level setting, configured after the DDL)
CREATE TABLE events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'format' = 'json'
);

-- Configure idle-state retention (job-level setting)
SET 'table.exec.state.ttl' = '7d';  -- Drop a key's state after 7 days without updates; a key that returns later restarts from zero

SELECT user_id, COUNT(*) FROM events GROUP BY user_id;
```

#### Solution 2: Windowed Aggregation

```sql theme={null}
-- Bounded state: Only keeps 1 hour per window
SELECT
    user_id,
    COUNT(*)
FROM events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '1' HOUR);
```

### Mini-Batch Optimization

```sql theme={null}
-- Enable mini-batch aggregation for higher throughput
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';

-- Now aggregations buffer up to 5s or 5000 records before emitting
SELECT user_id, COUNT(*) FROM events GROUP BY user_id;
```

**Trade-off**: Higher throughput, slightly higher latency (5s max).
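
The throughput gain comes from touching keyed state once per key per batch instead of once per record. A minimal plain-Java sketch of that idea for a count aggregation, flushing on batch size only (the latency timer is omitted); `MiniBatchCount` and its fields are hypothetical names, not Flink internals.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only: plain Java, not Flink's mini-batch operator.
class MiniBatchCount {
    final Map<String, Long> state = new HashMap<>();           // stands in for keyed state
    private final Map<String, Long> buffer = new LinkedHashMap<>(); // in-memory batch
    private final int maxSize;
    private int buffered = 0;
    int stateWrites = 0;                                        // state accesses performed

    MiniBatchCount(int maxSize) {
        this.maxSize = maxSize;
    }

    // Buffers the event; flushes once maxSize records have been collected.
    void onEvent(String key) {
        buffer.merge(key, 1L, Long::sum);
        if (++buffered >= maxSize) {
            flush();
        }
    }

    // Applies one state update per distinct key in the batch.
    void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            state.merge(e.getKey(), e.getValue(), Long::sum);
            stateWrites++;
        }
        buffer.clear();
        buffered = 0;
    }

    public static void main(String[] args) {
        MiniBatchCount agg = new MiniBatchCount(4);
        for (String user : new String[] {"alice", "alice", "bob", "alice"}) {
            agg.onEvent(user);
        }
        System.out.println(agg.state);       // counts: alice=3, bob=1
        System.out.println(agg.stateWrites); // 2 state updates instead of 4
    }
}
```

The fewer distinct keys a batch contains, the bigger the saving, and downstream operators also receive one update per key instead of one per record.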

### Local-Global Aggregation

```sql theme={null}
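-- Enable local-global aggregation for skewed keys (requires mini-batch to be enabled)
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';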

-- Flink now does:
-- 1. Local aggregation (reduce data sent over network)
-- 2. Global aggregation (final result)
```

**Benefit**: Hot keys are pre-aggregated before the shuffle, so far fewer records reach the overloaded task; the speed-up depends on how skewed the keys are.
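
The two phases can be sketched in plain Java: each upstream partition first collapses its own records into partial counts, and only those partials are merged globally. This is a simplified model of the idea with hypothetical names (`TwoPhaseCount`, `localCount`, `globalMerge`), not Flink's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: plain Java, not Flink's local/global aggregate operators.
class TwoPhaseCount {
    // Phase 1: runs on each upstream partition before the network shuffle.
    static Map<String, Long> localCount(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2: merges the partial counts that arrive for each key.
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // "alice" is a hot key spread across two partitions.
        Map<String, Long> p1 = localCount(List.of("alice", "alice", "alice", "bob"));
        Map<String, Long> p2 = localCount(List.of("alice", "alice"));
        System.out.println(globalMerge(List.of(p1, p2))); // counts: alice=5, bob=1
    }
}
```

Here the task responsible for `alice` receives two partial rows instead of five raw records; with a heavily skewed key the reduction is proportionally larger.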

***

## Part 9: Connectors Deep Dive

### Kafka Connector Options

```sql theme={null}
CREATE TABLE kafka_table (
    user_id STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'kafka:9092',

    -- Consumer options
    'scan.startup.mode' = 'earliest-offset',  -- or 'latest-offset', 'group-offsets', 'timestamp'
    'scan.startup.timestamp-millis' = '1640995200000',  -- If using timestamp mode
    'properties.group.id' = 'flink-sql-group',

    -- Format
    'format' = 'json',  -- or 'avro', 'csv', 'debezium-json', 'canal-json'
    'json.timestamp-format.standard' = 'ISO-8601',

    -- Performance
    'scan.parallelism' = '4',  -- Override source parallelism

    -- Key/value layout
    'value.fields-include' = 'ALL',  -- Value format carries all physical columns, including key columns

    -- Kafka-specific
    'properties.max.poll.records' = '500',
    'properties.fetch.min.bytes' = '1048576'  -- 1 MB
);
```

### Filesystem Connector (S3/HDFS)

```sql theme={null}
CREATE TABLE s3_parquet_sink (
    user_id STRING,
    event_date DATE,
    event_count BIGINT,
    total_amount DOUBLE
) PARTITIONED BY (event_date) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket/events/',
    'format' = 'parquet',

    -- Partitioning
    'partition.time-extractor.timestamp-pattern' = '$event_date 00:00:00',  -- Built from the partition column
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 d',  -- Daily partitions: commit once the day is complete
    'sink.partition-commit.policy.kind' = 'success-file',  -- 'metastore' applies only to Hive tables

    -- File rolling
    'sink.rolling-policy.file-size' = '128MB',
    'sink.rolling-policy.rollover-interval' = '30min',

    -- Compression
    'parquet.compression' = 'snappy'
);
```

### JDBC Connector

```sql theme={null}
CREATE TABLE mysql_sink (
    user_id STRING,
    total_amount DOUBLE,
    update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/analytics',
    'table-name' = 'user_stats',
    'username' = 'flink',
    'password' = 'secret',

    -- Performance
    'sink.buffer-flush.max-rows' = '1000',  -- Batch size
    'sink.buffer-flush.interval' = '2s',    -- Max wait time
    'sink.max-retries' = '3',

    -- Connection
    'connection.max-retry-timeout' = '30s',

    -- Parallelism (upsert writes are enabled by the PRIMARY KEY above, not by an option)
    'sink.parallelism' = '4'
);
```

***

## Part 10: Production Patterns

### Pattern 1: End-to-End Pipeline

```sql theme={null}
-- Source: Kafka events
CREATE TABLE raw_events (
    user_id STRING,
    event_type STRING,
    product_id STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME(),  -- Processing-time attribute, required by the lookup join below
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'raw-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Enrichment: Product catalog
CREATE TABLE product_catalog (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DOUBLE,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/catalog',
    'table-name' = 'products'
);

-- Transformation: Enrich and aggregate
CREATE VIEW enriched_metrics AS
SELECT
    e.user_id,
    p.category,
    TUMBLE_START(e.event_time, INTERVAL '5' MINUTES) as window_start,
    COUNT(*) as event_count,
    SUM(e.amount) as total_amount,
    COUNT(DISTINCT e.product_id) as unique_products
FROM raw_events e
LEFT JOIN product_catalog FOR SYSTEM_TIME AS OF e.proc_time AS p
    ON e.product_id = p.product_id
GROUP BY
    e.user_id,
    p.category,
    TUMBLE(e.event_time, INTERVAL '5' MINUTES);

-- Sink: Back to Kafka for downstream consumers
CREATE TABLE kafka_metrics_sink (
    user_id STRING,
    category STRING,
    window_start TIMESTAMP(3),
    event_count BIGINT,
    total_amount DOUBLE,
    unique_products BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-metrics',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Execute pipeline
INSERT INTO kafka_metrics_sink SELECT * FROM enriched_metrics;
```

### Pattern 2: Multi-Sink Fanout

```sql theme={null}
-- One source, multiple sinks
CREATE VIEW processed_events AS
SELECT
    user_id,
    event_type,
    amount,
    event_time,
    DATE_FORMAT(event_time, 'yyyy-MM-dd') as event_date
FROM raw_events
WHERE amount > 0;

-- Sink 1: Real-time to Kafka
INSERT INTO kafka_realtime_sink
SELECT user_id, event_type, amount, event_time
FROM processed_events;

-- Sink 2: Batch to S3 (partitioned by date)
INSERT INTO s3_archive_sink
SELECT user_id, event_type, amount, event_date
FROM processed_events;

-- Sink 3: Aggregates to MySQL
INSERT INTO mysql_dashboard_sink
SELECT
    user_id,
    COUNT(*) as total_events,
    SUM(amount) as total_amount
FROM processed_events
GROUP BY user_id;
```

### Pattern 3: Late Data Handling

```sql theme={null}
-- Sink for late events (Flink SQL has no side outputs, so late rows are selected with a filter)
CREATE TABLE late_events_sink (
    user_id STRING,
    event_time TIMESTAMP(3),
    arrival_time TIMESTAMP(3),
    lateness_seconds BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'late-events',
    'format' = 'json'
);

-- Main pipeline: the window operator drops rows that arrive behind the watermark
CREATE VIEW windowed_metrics AS
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as event_count
FROM raw_events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '1' MINUTE);

-- Track late data
INSERT INTO late_events_sink
SELECT
    user_id,
    event_time,
    PROCTIME() as arrival_time,
    TIMESTAMPDIFF(SECOND, event_time, PROCTIME()) as lateness_seconds
FROM raw_events
WHERE event_time < CURRENT_WATERMARK(event_time);
```

***

## Part 11: Debugging & Monitoring

### Query Execution Plan

```java theme={null}
String query = "SELECT user_id, COUNT(*) FROM events GROUP BY user_id";
String plan = tableEnv.explainSql(query);
System.out.println(plan);
```

**Output**:

```
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+- LogicalTableScan(table=[[default_catalog, default_database, events]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[user_id], select=[user_id, COUNT(*) AS EXPR$1])
+- Exchange(distribution=[hash[user_id]])
   +- TableSourceScan(table=[[default_catalog, default_database, events]], fields=[user_id, ...])
```

### Logging for Debugging

```java theme={null}
// Flink SQL has no per-query logging option; for local debugging, print the
// results to stdout (or write them to a table backed by the 'print' connector)
TableResult result = tableEnv.executeSql("SELECT * FROM events WHERE amount > 100");
result.print();
```

### Common SQL Pitfalls

#### Pitfall 1: Unbounded Distinct

```sql theme={null}
-- ❌ BAD: Keeps all user IDs in state forever!
SELECT COUNT(DISTINCT user_id) FROM events;

-- ✅ GOOD: Window distinct counts
SELECT
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    COUNT(DISTINCT user_id) as unique_users
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
```

#### Pitfall 2: Unbounded Regular Joins

```sql theme={null}
-- ❌ BAD: Regular join on two unbounded streams keeps both inputs in state forever!
SELECT * FROM events e, users u WHERE e.user_id = u.user_id;

-- ✅ GOOD: Temporal join against a versioned table bounds the state
SELECT * FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.event_time AS u
ON e.user_id = u.user_id;
```

***

## Part 12: Exercises

### Exercise 1: Real-Time Analytics Dashboard

**Task**: Build a minutely dashboard showing:

* Total events
* Unique users
* Revenue
* Top 5 products by sales

```sql theme={null}
-- Your solution here
CREATE VIEW dashboard AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(*) as total_events,
    COUNT(DISTINCT user_id) as unique_users,
    SUM(amount) as revenue
    -- Top 5 products: compute in a separate Top-N query (ROW_NUMBER() over
    -- per-minute product totals); an aggregate call cannot take ORDER BY ... LIMIT
FROM purchase_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
```

### Exercise 2: Fraud Detection

**Task**: Detect users with >3 declined transactions within 10 minutes.

```sql theme={null}
-- Your solution here
```

### Exercise 3: Sessionization

**Task**: Compute session duration and page views per session (30-min inactivity).

```sql theme={null}
-- Your solution here
```

***

## Summary

### What You've Mastered

✅ Dynamic tables and stream-table duality
✅ Table API and SQL DDL/DML
✅ Windowed aggregations (tumbling, sliding, session)
✅ Joins (regular, interval, temporal, lookup)
✅ Advanced features (MATCH\_RECOGNIZE, UDFs, Top-N)
✅ Connectors (Kafka, JDBC, filesystem)
✅ Performance optimization (mini-batch, state TTL)
✅ Production patterns (multi-sink, late data handling)

### Key Takeaways

1. **SQL Democratizes Streaming**: Analysts can build real-time pipelines
2. **Windowing is Essential**: Use tumbling/sliding/session windows to bound state
3. **Temporal Joins are Powerful**: Correct historical lookups with versioned tables
4. **State Management Matters**: Always consider state size and TTL
5. **Connectors are Production-Ready**: Kafka, JDBC, S3 all have mature connectors

***

## Next Module

<Card title="Module 7: Complex Event Processing (CEP)" icon="sitemap" href="/distributed-systems-tools/flink-cep">
  Pattern detection and sequencing with Flink CEP
</Card>

***

## Resources

### Documentation

* [Flink SQL](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/)
* [Table API](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/tableapi/)
* [Connectors](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/overview/)

### Papers

* ["Apache Flink: Stream and Batch Processing in a Single Engine"](https://arxiv.org/abs/1506.00556)

<Info>
  **Practice**: Build a complete real-time analytics pipeline: Kafka → Flink SQL (enrich, aggregate) → Multiple sinks (Kafka, MySQL, S3). Monitor state growth and optimize for production!
</Info>
