// Imperative: HOW to process data
stream
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregateFunction())
    .print();
SQL Stream Processing:
-- Declarative: WHAT you want
SELECT
    userId,
    COUNT(*) as event_count
FROM events
GROUP BY
    userId,
    TUMBLE(event_time, INTERVAL '5' MINUTE);
SELECT userId, COUNT(*) as cnt FROM events GROUP BY userId;

-- Stream of changes:
+I[alice, 1]   -- Insert: alice has 1 event
+I[bob, 1]     -- Insert: bob has 1 event
-U[alice, 1]   -- Retract: alice NO LONGER has 1 event
+U[alice, 2]   -- Update: alice now has 2 events
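To observe this changelog yourself, the result table can be converted back into a DataStream. A minimal sketch, assuming a StreamTableEnvironment named tableEnv, its StreamExecutionEnvironment named env, and the events table already registered:

// Run the aggregation and watch the changelog it emits
Table counts = tableEnv.sqlQuery(
    "SELECT userId, COUNT(*) AS cnt FROM events GROUP BY userId");

// Each Row carries a RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
tableEnv.toChangelogStream(counts).print();

env.execute("changelog-demo");

The printed rows are prefixed with their RowKind, which is exactly the +I/-U/+U notation shown above.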
-- Total amount per user (running aggregation)
SELECT
    user_id,
    COUNT(*) as event_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount,
    MAX(amount) as max_amount
FROM kafka_events
GROUP BY user_id;
-- Running total over the last 1 hour
SELECT
    user_id,
    event_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) as rolling_1h_total
FROM kafka_events;
-- Match payment events with orders within 1 hour
SELECT
    o.order_id,
    o.order_time,
    p.payment_id,
    p.payment_time,
    p.amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.payment_time BETWEEN o.order_time
                         AND o.order_time + INTERVAL '1' HOUR;
Flink recognizes this as an interval join and keeps only about one hour of state per key, cleaning it up as the watermark advances instead of buffering both tables forever.
-- Keep only the first event per user per day
SELECT user_id, event_type, amount, event_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id, DATE_FORMAT(event_time, 'yyyy-MM-dd')
            ORDER BY event_time ASC
        ) as row_num
    FROM kafka_events
)
WHERE row_num = 1;
-- Top 3 purchases per user
SELECT user_id, product_id, amount, purchase_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id
            ORDER BY amount DESC
        ) as rnk
    FROM purchases
)
WHERE rnk <= 3;
Use Case: Detect a fraud pattern (a run of declined transactions followed by one approved high-value transaction).
SELECT *
FROM transactions
    MATCH_RECOGNIZE (
        PARTITION BY card_id
        ORDER BY transaction_time
        MEASURES
            FIRST(A.transaction_time) as pattern_start,
            LAST(B.transaction_time) as pattern_end,
            COUNT(A.*) as decline_count
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A+ B)
        DEFINE
            A AS A.status = 'DECLINED',
            B AS B.status = 'APPROVED' AND B.amount > 500
    ) AS fraud_patterns;
Explanation:
A+: One or more declined transactions
B: Followed by an approved transaction over $500
Output: Only matched patterns (potential fraud!)
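MATCH_RECOGNIZE orders rows by a time attribute, so the transactions table has to declare an event-time column with a watermark. A minimal sketch of such a table definition (the topic name, columns, and formats are illustrative assumptions):

-- Hypothetical source table backing the pattern query above
CREATE TABLE transactions (
    card_id          STRING,
    status           STRING,          -- 'DECLINED' / 'APPROVED'
    amount           DECIMAL(10, 2),
    transaction_time TIMESTAMP(3),
    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);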
More Complex Pattern:
-- Detect: a small purchase, then 3 large purchases within 1 hour
PATTERN (SMALL_TXN (LARGE_TXN){3})
DEFINE
    SMALL_TXN AS SMALL_TXN.amount < 10,
    LARGE_TXN AS LARGE_TXN.amount > 500
        AND LARGE_TXN.transaction_time <= SMALL_TXN.transaction_time + INTERVAL '1' HOUR;
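On its own this fragment will not run; it has to sit inside a full MATCH_RECOGNIZE clause. A sketch of how it might be embedded, reusing the transactions table sketched above (the MEASURES list and output alias are illustrative):

SELECT *
FROM transactions
    MATCH_RECOGNIZE (
        PARTITION BY card_id
        ORDER BY transaction_time
        MEASURES
            SMALL_TXN.transaction_time AS probe_time,
            SUM(LARGE_TXN.amount)      AS large_total
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (SMALL_TXN (LARGE_TXN){3})
        DEFINE
            SMALL_TXN AS SMALL_TXN.amount < 10,
            LARGE_TXN AS LARGE_TXN.amount > 500
                AND LARGE_TXN.transaction_time <= SMALL_TXN.transaction_time + INTERVAL '1' HOUR
    ) AS card_testing_patterns;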
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.table.functions.ScalarFunction;

// Define the UDF
public class HashFunction extends ScalarFunction {
    public String eval(String input) {
        return DigestUtils.sha256Hex(input);
    }
}

// Register the UDF
tableEnv.createTemporarySystemFunction("hash", HashFunction.class);

// Use it in SQL
tableEnv.executeSql(
    "SELECT user_id, hash(user_id) as user_id_hash FROM events");
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Split comma-separated tags into rows
@FunctionHint(output = @DataTypeHint("ROW<tag STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String tags) {
        for (String tag : tags.split(",")) {
            collect(Row.of(tag.trim()));
        }
    }
}

// Register
tableEnv.createTemporarySystemFunction("split_tags", SplitFunction.class);

// Use with LATERAL TABLE
tableEnv.executeSql(
    "SELECT event_id, tag " +
    "FROM events, LATERAL TABLE(split_tags(tags)) AS T(tag)");
Use Case 1: Real-Time Dashboard (Tumbling Windows)
-- Per-minute metrics for the dashboard
CREATE VIEW dashboard_metrics AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as metric_time,
    COUNT(*) as total_events,
    COUNT(DISTINCT user_id) as unique_users,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value,
    COUNT(CASE WHEN event_type = 'error' THEN 1 END) as error_count
FROM kafka_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

INSERT INTO kafka_dashboard_sink
SELECT * FROM dashboard_metrics;
Use Case 2: Fraud Detection (Stateful Aggregation)
-- Alert if a user makes >5 transactions totaling >$10,000 in 1 hour
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_start,
    COUNT(*) as txn_count,
    SUM(amount) as total_amount
FROM transactions
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
HAVING COUNT(*) > 5 AND SUM(amount) > 10000;
-- Real-time feature computation for a fraud model
CREATE VIEW user_features AS
SELECT
    user_id,
    -- Velocity features
    COUNT(*) as txns_last_1h,
    SUM(amount) as spend_last_1h,
    -- Behavioral features
    COUNT(DISTINCT merchant_id) as unique_merchants_1h,
    AVG(amount) as avg_txn_amount_1h,
    STDDEV_POP(amount) as stddev_amount_1h,
    -- Pattern features
    MAX(amount) as max_amount_1h,
    SUM(CASE WHEN declined = true THEN 1 ELSE 0 END) as declines_last_1h
FROM transactions
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR);
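The view by itself does not execute anything; the features start flowing once they are written to a sink. A minimal sketch using a hypothetical print sink (the sink table name and the column subset are assumptions):

-- Hypothetical sink for a few of the computed features
CREATE TABLE feature_sink (
    user_id       STRING,
    txns_last_1h  BIGINT,
    spend_last_1h DOUBLE
) WITH (
    'connector' = 'print'
);

INSERT INTO feature_sink
SELECT user_id, txns_last_1h, CAST(spend_last_1h AS DOUBLE)
FROM user_features;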
-- Source table with an event-time attribute and watermark
CREATE TABLE events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'format' = 'json'
);

-- Configure state TTL (via job config)
SET 'table.exec.state.ttl' = '7d';  -- Expire idle state after 7 days

-- Per-user counts: a user's state is dropped after 7 idle days
SELECT user_id, COUNT(*) FROM events GROUP BY user_id;
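The same option can also be set programmatically when the job is assembled with the Table API. A minimal sketch, assuming a TableEnvironment named tableEnv:

// Equivalent programmatic configuration of the state TTL
tableEnv.getConfig()
        .getConfiguration()
        .setString("table.exec.state.ttl", "7d");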
-- Enable mini-batch aggregation for higher throughput
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';

-- Aggregations now buffer up to 5s or 5000 records before emitting updates
SELECT user_id, COUNT(*) FROM events GROUP BY user_id;
-- Enable local-global aggregation for skewed keys
-- (requires mini-batch aggregation to be enabled, see above)
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

-- Flink now does:
-- 1. Local aggregation (reduces data sent over the network)
-- 2. Global aggregation (final result)
Benefit: 2-5x faster for aggregations with skewed keys.
// Inspect the optimized execution plan before running the query
System.out.println(
    tableEnv.explainSql("SELECT * FROM events WHERE amount > 100"));

// Execute and print rows for ad-hoc debugging
TableResult result = tableEnv.executeSql("SELECT * FROM events WHERE amount > 100");
result.print();
-- ❌ BAD: Keeps all user IDs in state forever!
SELECT COUNT(DISTINCT user_id) FROM events;

-- ✅ GOOD: Windowed distinct counts
SELECT
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_hour,
    COUNT(DISTINCT user_id) as unique_users
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
-- ❌ BAD: Unbounded state (both tables are kept in state forever!)
SELECT * FROM events e, users u WHERE e.user_id = u.user_id;

-- ✅ GOOD: Temporal join with bounded state
SELECT *
FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.event_time AS u
    ON e.user_id = u.user_id;
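The temporal join only works if users is a versioned table, i.e. it has a primary key and an event-time attribute. A minimal sketch of such a definition with the upsert-kafka connector (the topic name, columns, and formats are illustrative assumptions):

CREATE TABLE users (
    user_id     STRING,
    tier        STRING,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);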
-- Your solution here
CREATE VIEW dashboard AS
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_minute,
    COUNT(*) as total_events,
    COUNT(DISTINCT user_id) as unique_users,
    SUM(amount) as revenue,
    -- Use ARRAY_AGG for top products
    ARRAY_AGG(product_id ORDER BY amount DESC LIMIT 5) as top_products
FROM purchase_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);