
Complex Event Processing: Pattern Detection at Scale

Module Duration: 6-7 hours
Focus: Pattern API, sequences, quantifiers, conditions, fraud detection, system monitoring
Prerequisites: Flink DataStream API, state management, event-time processing
Hands-on Labs: 12+ CEP applications

Introduction: What is Complex Event Processing?

The Problem CEP Solves

Scenario: Detect credit card fraud by identifying patterns like:
“3 declined transactions from different merchants within 10 minutes, followed by 1 approved high-value transaction”
Traditional Approach (painful):
// Maintain complex state manually
class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    private ListState<Transaction> recentTransactions;
    private ValueState<Long> cleanupTimer;  // timestamp of the pending cleanup timer

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) throws Exception {
        // Manually track declines
        // Manually detect pattern
        // Manually register timers (ctx.timerService()) and clean up state
        // 100+ lines of error-prone code
    }
}
CEP Approach (elegant):
Pattern<Transaction, ?> fraudPattern = Pattern
    .<Transaction>begin("declines")
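        .where(SimpleCondition.of(t -> t.status == Status.DECLINED))
        .times(3)
    .next("approved")
        .where(SimpleCondition.of(t -> t.status == Status.APPROVED && t.amount > 500))
    .within(Time.minutes(10));

// Flink handles state, cleanup, and matching automatically!
// where() expects a condition object rather than a bare lambda; SimpleCondition.of(...) wraps one
// (recent Flink releases; older ones need an anonymous SimpleCondition subclass).
// The "different merchants" check would need an IterativeCondition (see Part 4).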

Real-World Use Cases

Domain | Pattern | Business Value
--- | --- | ---
Finance | Fraud detection (velocity, location, amount patterns) | Save $M in fraud losses
E-Commerce | Cart abandonment, purchase funnel drop-off | 20-30% conversion increase
Cybersecurity | Intrusion detection (failed logins → success) | Prevent data breaches
IoT | Equipment failure prediction (temp spike → vibration → failure) | Reduce downtime 40%
Trading | Market anomalies (price spikes, order patterns) | Millisecond advantage

Part 1: CEP Fundamentals

The Pattern API

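import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;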

// 1. Define input stream
DataStream<Event> input = ...;

// 2. Define pattern
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("login");
        }
    });

// 3. Apply pattern to stream
PatternStream<Event> patternStream = CEP.pattern(input.keyBy(Event::getUserId), pattern);  // KeySelector; string keys are deprecated

// 4. Select matches
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) {
        Event loginEvent = pattern.get("start").get(0);
        return new Alert("User logged in: " + loginEvent.getUserId());
    }
});

Pattern Building Blocks

┌─────────────────────────────────────────────────┐
│           Flink CEP Components                  │
├─────────────────────────────────────────────────┤
│ 1. Patterns          - Sequence definitions     │
│ 2. Conditions        - Event matching rules     │
│ 3. Quantifiers       - Occurrence counts        │
│ 4. Contiguity        - Strictness levels        │
│ 5. Time Constraints  - Within clauses           │
└─────────────────────────────────────────────────┘

Part 2: Pattern Basics - Individual Patterns

Simple Pattern

// Match single event type
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("login")
    .where(SimpleCondition.of(event -> event.getStatus().equals("success")));

Chaining Patterns (next, followedBy, followedByAny)

1. Strict Contiguity (.next)

Only immediate successor, no events in between.
// Login immediately followed by purchase (strict)
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .next("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// Matches:        login → purchase ✓
// Does not match: login → browse → purchase ✗

2. Relaxed Contiguity (.followedBy)

Skips non-matching events in between but keeps the order; each step binds to the first event that matches it.
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// Matches:        login → browse → purchase ✓
// Matches:        login → purchase ✓
// Does not match: purchase → login ✗ (wrong order)

3. Non-Deterministic Relaxed (.followedByAny)

Allows multiple matches with different intermediate events.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
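    .where(SimpleCondition.of(e -> e.getName().equals("a")))
    .followedByAny("middle")
    .where(SimpleCondition.of(e -> e.getName().equals("b")))
    .followedByAny("end")
    .where(SimpleCondition.of(e -> e.getName().equals("c")));

// Input: a → b → d → b → c
// Matches:
//   - a → b(1st) → c  ✓
//   - a → b(2nd) → c  ✓
// With .followedBy() instead, only a → b(1st) → c would match.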
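The three contiguity modes can be modeled outside Flink to check your intuition. The sketch below is plain Java, not Flink's API: `ContiguityDemo` and its methods are hypothetical, every step matches exactly one event by name, and quantifiers, keys, and time windows are ignored. It counts how many matches a sequence of steps produces under each mode.

```java
import java.util.List;

// Hypothetical plain-Java model of CEP contiguity; not part of Flink.
// Every step matches exactly one event by name; quantifiers and windows are ignored.
final class ContiguityDemo {

    // Strict contiguity (.next): the steps must appear back to back.
    static int strict(List<String> events, List<String> steps) {
        int matches = 0;
        for (int start = 0; start + steps.size() <= events.size(); start++) {
            boolean ok = true;
            for (int k = 0; k < steps.size() && ok; k++) {
                ok = events.get(start + k).equals(steps.get(k));
            }
            if (ok) matches++;
        }
        return matches;
    }

    // Relaxed contiguity (.followedBy): non-matching events are skipped and
    // each step binds to the first later event that matches it.
    static int relaxed(List<String> events, List<String> steps) {
        int matches = 0;
        for (int start = 0; start < events.size(); start++) {
            if (!events.get(start).equals(steps.get(0))) continue;
            int next = 1; // index of the step we are waiting for
            for (int i = start + 1; i < events.size() && next < steps.size(); i++) {
                if (events.get(i).equals(steps.get(next))) next++;
            }
            if (next == steps.size()) matches++;
        }
        return matches;
    }

    // Non-deterministic relaxed contiguity (.followedByAny): every in-order
    // combination of matching events is a separate match.
    static long nonDeterministic(List<String> events, List<String> steps) {
        long[] ways = new long[steps.size() + 1]; // ways[k] = ways to match the first k steps
        ways[0] = 1;
        for (String e : events) {
            // Descending k so a single event fills at most one step per combination
            for (int k = steps.size(); k >= 1; k--) {
                if (e.equals(steps.get(k - 1))) ways[k] += ways[k - 1];
            }
        }
        return ways[steps.size()];
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "d", "b", "c");
        List<String> steps = List.of("a", "b", "c");
        System.out.println(strict(input, steps));           // 0
        System.out.println(relaxed(input, steps));          // 1
        System.out.println(nonDeterministic(input, steps)); // 2
    }
}
```

The counts line up with the comments in the examples above: strict finds no back-to-back a, b, c; relaxed binds each step to the first match; followedByAny enumerates both choices of b.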

Part 3: Quantifiers - Specifying Occurrences

Fixed Count

// Exactly 3 failed login attempts
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("failures")
    .where(SimpleCondition.of(e -> e.getStatus().equals("failed")))
    .times(3);  // other events may occur in between unless .consecutive() is added

Range

// Between 2 and 5 purchase events
Pattern<Event, ?> pattern = Pattern.<Event>begin("purchases")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")))
    .times(2, 5);

One or More (.oneOrMore)

// At least one page view
Pattern<Event, ?> pattern = Pattern.<Event>begin("views")
    .where(SimpleCondition.of(e -> e.getType().equals("page_view")))
    .oneOrMore();

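Zero or More (.oneOrMore().optional())

// Optional: zero or more add-to-cart events
// (timesOrMore(0) is not the idiom here; Flink expresses "zero or more" as oneOrMore().optional())
Pattern<Event, ?> pattern = Pattern.<Event>begin("optional")
    .where(SimpleCondition.of(e -> e.getType().equals("add_to_cart")))
    .oneOrMore()
    .optional();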

Greedy Quantifiers (.greedy)

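// Match as many clicks as possible before handing over to "purchase"
Pattern<Event, ?> pattern = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> e.getType().equals("click")))
    .oneOrMore().greedy()
    .followedBy("purchase")  // Flink does not allow a greedy quantifier as the last element
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// Input: click → click → click → purchase
// Without greedy(): the looping stage may hand over early, so shorter matches such as
//   [click, purchase] are emitted alongside [click, click, click, purchase]
// With greedy():    each match consumes every click from its start up to the purchase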

Part 4: Conditions - Advanced Filtering

Simple Condition

Pattern<Event, ?> pattern = Pattern.<Event>begin("high-value")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getAmount() > 1000;
        }
    });

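// Lambda shorthand (wrapped, because where() expects a condition object, not a functional interface)
Pattern<Event, ?> pattern = Pattern.<Event>begin("high-value")
    .where(SimpleCondition.of(event -> event.getAmount() > 1000));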

Iterative Condition (Access Previous Events)

// Detect increasing transaction amounts
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first")
    .where(SimpleCondition.of(t -> t.getAmount() > 100))
    .next("second")
    .where(new IterativeCondition<Transaction>() {
        @Override
        public boolean filter(Transaction current, Context<Transaction> ctx) throws Exception {
            // Access previous events in pattern
            double firstAmount = 0;
            for (Transaction t : ctx.getEventsForPattern("first")) {
                firstAmount = t.getAmount();
            }
            return current.getAmount() > firstAmount * 1.5; // 50% increase
        }
    });
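What the iterative condition adds is the ability to look back at an event already bound to an earlier step; a `SimpleCondition` only sees the current event. As a hedged illustration, the plain-Java sketch below (hypothetical class `IncreasingAmountsDemo`, not Flink code) applies the same two checks to an array of amounts. Because the pattern uses `.next()`, only adjacent pairs are considered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of "first > 100, next > 1.5 x first"; not Flink code.
final class IncreasingAmountsDemo {

    // Returns the index of every "first" transaction that is immediately
    // followed by a transaction at least 50% larger.
    static List<Integer> spikes(double[] amounts) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i + 1 < amounts.length; i++) {
            boolean firstOk = amounts[i] > 100;                   // SimpleCondition on "first"
            boolean secondOk = amounts[i + 1] > amounts[i] * 1.5; // "second" looks back at "first"
            if (firstOk && secondOk) starts.add(i);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(spikes(new double[] {120, 200, 150, 400, 90, 500})); // [0, 2]
    }
}
```

In the real pattern, `ctx.getEventsForPattern("first")` plays the role of `amounts[i]` here.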

Subtype Condition

// Match specific event subtype
Pattern<Event, ?> pattern = Pattern.<Event>begin("payment")
    .subtype(PaymentEvent.class)  // Only PaymentEvent instances
    .where(SimpleCondition.of(event -> event.getMethod().equals("credit_card")));  // event is a PaymentEvent here

Combining Conditions (.or, .where + .or)

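// High amount OR international transaction
// (chained .where() calls are combined with AND; .or() adds an alternative)
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("suspicious")
    .where(SimpleCondition.of(t -> t.getAmount() > 5000))
    .or(SimpleCondition.of(t -> !t.getCountry().equals("US")));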

Until Condition (.until)

// Collect page views until checkout
Pattern<Event, ?> pattern = Pattern.<Event>begin("browsing")
    .where(SimpleCondition.of(e -> e.getType().equals("page_view")))
    .oneOrMore()
    .until(SimpleCondition.of(e -> e.getType().equals("checkout")));  // stop looping once a checkout arrives

Part 5: Time Constraints

Within (Time Window)

// Pattern must complete within 10 minutes
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
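    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")))
    .within(Time.minutes(10));  // measured from the first event of the match

// If the purchase arrives more than 10 minutes after the login (in event time),
// the partial match is discarded and nothing is emitted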
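The window rule can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Flink code: `WithinDemo` handles only the login/purchase pattern above, assumes events arrive sorted by event time, and treats a gap of exactly the window length as too late (Flink's exact boundary handling is not modeled here).

```java
import java.util.List;

// Hypothetical plain-Java model of login.followedBy(purchase).within(window); not Flink code.
// Events are assumed to be sorted by event time.
final class WithinDemo {

    static final class Event {
        final String type;
        final long timestampMs;

        Event(String type, long timestampMs) {
            this.type = type;
            this.timestampMs = timestampMs;
        }
    }

    // Counts logins whose first later purchase arrives less than windowMs after the login.
    static int matches(List<Event> events, long windowMs) {
        int count = 0;
        for (int i = 0; i < events.size(); i++) {
            Event login = events.get(i);
            if (!login.type.equals("login")) continue;
            for (int j = i + 1; j < events.size(); j++) {
                Event e = events.get(j);
                // The window is measured from the first event: past it, the partial match expires
                if (e.timestampMs - login.timestampMs >= windowMs) break;
                if (e.type.equals("purchase")) { // relaxed contiguity: first purchase completes it
                    count++;
                    break;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Event> inTime = List.of(new Event("login", 0),
                new Event("browse", 2 * minute), new Event("purchase", 5 * minute));
        List<Event> tooLate = List.of(new Event("login", 0), new Event("purchase", 15 * minute));
        System.out.println(matches(inTime, 10 * minute));  // 1
        System.out.println(matches(tooLate, 10 * minute)); // 0
    }
}
```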

After Match Skip Strategy

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;

// Control what happens after a pattern match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .where(SimpleCondition.of(e -> e.getType().equals("a")))
    .followedBy("end")
    .where(SimpleCondition.of(e -> e.getType().equals("b")));

// Input: a1 → a2 → b
// noSkip():            both partial matches complete on b
//   Result: 2 matches (a1→b, a2→b)
// skipPastLastEvent(): after a1→b is emitted, partial matches that started
//   before b ended the match (here a2→b) are discarded
//   Result: 1 match
// For non-overlapping input such as a → b → a → b, both strategies give 2 matches.
Skip Strategies:
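AfterMatchSkipStrategy.noSkip()                    // Every possible match, including overlaps (default)
AfterMatchSkipStrategy.skipToNext()                // Drop partial matches that started with the same event
AfterMatchSkipStrategy.skipPastLastEvent()         // Drop partial matches that started before the match ended
AfterMatchSkipStrategy.skipToFirst("pattern_name") // Drop partial matches that started before the first event of the named pattern
AfterMatchSkipStrategy.skipToLast("pattern_name")  // Drop partial matches that started before the last event of the named pattern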
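The difference between `noSkip()` and `skipPastLastEvent()` only shows up when matches overlap. The plain-Java sketch below is a hypothetical model (not Flink code) of `"a".followedBy("b")` under both strategies: every "a" opens a partial match, the first later "b" completes all open ones, and with `skipPastLastEvent()` only the earliest is emitted while the others, having started before the match ended, are dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of "a".followedBy("b") under two skip strategies; not Flink code.
final class SkipStrategyDemo {

    // Returns each match as {startIndex, endIndex}, in emission order.
    static List<int[]> run(List<String> events, boolean skipPastLastEvent) {
        List<int[]> matches = new ArrayList<>();
        List<Integer> open = new ArrayList<>(); // start indexes of partial matches waiting for "b"
        for (int i = 0; i < events.size(); i++) {
            String e = events.get(i);
            if (e.equals("b")) {
                // Relaxed contiguity: every open partial match completes on this "b",
                // earliest start first.
                for (int start : open) {
                    matches.add(new int[] {start, i});
                    if (skipPastLastEvent) {
                        break; // the remaining partial matches started before i: discarded
                    }
                }
                open.clear();
            } else if (e.equals("a")) {
                open.add(i); // every "a" starts a new partial match
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> overlapping = List.of("a", "a", "b");
        System.out.println(run(overlapping, false).size()); // 2 (noSkip)
        System.out.println(run(overlapping, true).size());  // 1 (skipPastLastEvent)
    }
}
```

Running it on `a, b, a, b` gives 2 matches for both strategies, which is why non-overlapping input cannot tell the strategies apart.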

Part 6: Real-World Patterns

Pattern 1: Fraud Detection (Velocity Check)

// Detect: 5 or more transactions over $100, totaling more than $10K, within 1 hour
public class FraudDetectionCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSource is attached with fromSource(), not addSource(); source config elided
        KafkaSource<Transaction> source = ...;
        DataStream<Transaction> transactions = env.fromSource(
            source,
            WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp()),
            "transactions");

        // Pattern: High-velocity transactions
        // (with the default noSkip() strategy, one burst can yield several overlapping matches)
        Pattern<Transaction, ?> fraudPattern = Pattern.<Transaction>begin("txns")
            .where(SimpleCondition.of(t -> t.getAmount() > 100))
            .timesOrMore(5)
            .within(Time.hours(1));

        PatternStream<Transaction> patternStream = CEP.pattern(
            transactions.keyBy(Transaction::getCardId),
            fraudPattern
        );

        // flatSelect emits zero or one alert per match, instead of returning null from select()
        DataStream<FraudAlert> alerts = patternStream.flatSelect(
            new PatternFlatSelectFunction<Transaction, FraudAlert>() {
                @Override
                public void flatSelect(Map<String, List<Transaction>> pattern, Collector<FraudAlert> out) {
                    List<Transaction> txns = pattern.get("txns");
                    double total = txns.stream().mapToDouble(Transaction::getAmount).sum();

                    if (total > 10000) {
                        out.collect(new FraudAlert(
                            txns.get(0).getCardId(),
                            txns.size(),
                            total,
                            "High velocity fraud"
                        ));
                    }
                }
            }
        );

        alerts.addSink(new AlertingSink());
        env.execute("Fraud Detection CEP");
    }
}

Pattern 2: Login Intrusion Detection

// Detect: 3 consecutive failed logins followed by 1 success (brute force or credential stuffing)
Pattern<LoginEvent, ?> intrusionPattern = Pattern.<LoginEvent>begin("failures")
    .where(SimpleCondition.of(event -> event.getStatus().equals(LoginStatus.FAILED)))
    .times(3).consecutive()  // 3 failures with no other events in between
    .followedBy("success")
    .where(SimpleCondition.of(event -> event.getStatus().equals(LoginStatus.SUCCESS)))
    .within(Time.minutes(5));

PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginStream.keyBy(LoginEvent::getUserId),
    intrusionPattern
);

DataStream<SecurityAlert> alerts = patternStream.select(
    (Map<String, List<LoginEvent>> pattern) -> {
        List<LoginEvent> failures = pattern.get("failures");
        LoginEvent success = pattern.get("success").get(0);

        return new SecurityAlert(
            success.getUserId(),
            "Potential credential stuffing attack",
            failures.size(),
            success.getTimestamp()
        );
    }
);

Pattern 3: E-Commerce Funnel Analysis

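// Detect abandoned carts: view → add_to_cart → (no checkout within 1 hour)
Pattern<Event, ?> abandonmentPattern = Pattern.<Event>begin("view")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.PRODUCT_VIEW)))
    .followedBy("add_cart")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.ADD_TO_CART)))
    .notFollowedBy("checkout")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.CHECKOUT)))
    .within(Time.hours(1));

// notFollowedBy() is a negative ("NOT") pattern: the match is emitted only if no checkout
// arrives before the 1-hour window (counted from the view) expires. Ending a pattern with
// notFollowedBy() requires within(); older Flink releases do not allow it at all.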
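The negative pattern is easier to reason about with a small model. The sketch below is hypothetical plain Java, not Flink code: `AbandonedCartDemo` assumes events are sorted by event time, that the list covers the whole window (in Flink the match is only emitted once the watermark passes the window end), and that each view opens its own partial match, so several views before one cart would each raise an alert.

```java
import java.util.List;

// Hypothetical plain-Java model of view -> add_cart -> notFollowedBy(checkout) within a window.
// Not Flink code; events are assumed sorted by event time and to cover the whole window.
final class AbandonedCartDemo {

    static final class Event {
        final String type;
        final long timestampMs;

        Event(String type, long timestampMs) {
            this.type = type;
            this.timestampMs = timestampMs;
        }
    }

    static int abandonedCarts(List<Event> events, long windowMs) {
        int alerts = 0;
        for (int i = 0; i < events.size(); i++) {
            Event view = events.get(i);
            if (!view.type.equals("view")) continue;
            long windowEnd = view.timestampMs + windowMs; // window counted from the first event
            int cart = -1;
            for (int j = i + 1; j < events.size() && events.get(j).timestampMs < windowEnd; j++) {
                if (events.get(j).type.equals("add_cart")) { cart = j; break; } // first add_cart
            }
            if (cart < 0) continue; // no add_cart inside the window: no match
            boolean checkedOut = false;
            for (int k = cart + 1; k < events.size() && events.get(k).timestampMs < windowEnd; k++) {
                if (events.get(k).type.equals("checkout")) { checkedOut = true; break; }
            }
            if (!checkedOut) alerts++; // negative condition held until the window closed
        }
        return alerts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        System.out.println(abandonedCarts(List.of(new Event("view", 0),
                new Event("add_cart", 10 * minute)), hour));                                   // 1
        System.out.println(abandonedCarts(List.of(new Event("view", 0),
                new Event("add_cart", 10 * minute), new Event("checkout", 20 * minute)), hour)); // 0
    }
}
```

A checkout that arrives after the window (for example 90 minutes after the view) does not cancel the alert, which is exactly the "within 1 hour" reading of the pattern.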

Pattern 4: IoT Equipment Failure Prediction

// Detect equipment failure: temp_spike → vibration_increase → failure, all within 30 min
// (this confirms the precursor sequence; to alert before the failure, end the pattern at "vibration")
Pattern<SensorEvent, ?> failurePattern = Pattern.<SensorEvent>begin("temp_spike")
    .where(SimpleCondition.of(e -> e.getSensorType().equals("temperature") && e.getValue() > 80))
    .followedBy("vibration")
    .where(SimpleCondition.of(e -> e.getSensorType().equals("vibration") && e.getValue() > 5))
    .followedBy("failure")
    .where(SimpleCondition.of(e -> e.getSensorType().equals("status") && e.getValue() == 0))
    .within(Time.minutes(30));

PatternStream<SensorEvent> patternStream = CEP.pattern(
    sensorStream.keyBy(SensorEvent::getMachineId),
    failurePattern
);

DataStream<MaintenanceAlert> alerts = patternStream.select(
    (Map<String, List<SensorEvent>> pattern) -> {
        SensorEvent tempSpike = pattern.get("temp_spike").get(0);
        SensorEvent vibration = pattern.get("vibration").get(0);
        SensorEvent failure = pattern.get("failure").get(0);

        return new MaintenanceAlert(
            tempSpike.getMachineId(),
            "Equipment failure predicted",
            tempSpike.getTimestamp(),
            failure.getTimestamp()
        );
    }
);

Part 7: Advanced CEP Techniques

Pattern Groups (Combining Multiple Patterns)

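// Detect: (login from US OR login from EU) followed by purchase.
// Alternatives for a single event are expressed with .where(...).or(...)
Pattern<Event, ?> loginThenPurchase = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login") && e.getCountry().equals("US")))
    .or(SimpleCondition.of(e -> e.getType().equals("login") && e.getCountry().equals("EU")))
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// A group pattern treats a whole sub-sequence as one unit, e.g. to quantify it:
// two (login → purchase) sessions
Pattern<Event, ?> repeatedSessions = Pattern.begin(
        Pattern.<Event>begin("login")
            .where(SimpleCondition.of(e -> e.getType().equals("login")))
            .followedBy("purchase")
            .where(SimpleCondition.of(e -> e.getType().equals("purchase"))))
    .times(2);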

Timeout Handling (Partial Matches)

// Detect partial patterns that timed out
// (timeouts only occur if the pattern defines .within())
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

SingleOutputStreamOperator<Alert> result = patternStream.select(
    timeoutTag,
    // Timeout handler (pattern didn't complete)
    new PatternTimeoutFunction<Event, String>() {
        @Override
        public String timeout(Map<String, List<Event>> pattern, long timestamp) {
            return "Pattern timed out for user: " + pattern.get("start").get(0).getUserId();
        }
    },
    // Match handler (pattern completed)
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            return new Alert("Pattern matched!");
        }
    }
);

// Access timed-out patterns
DataStream<String> timeouts = result.getSideOutput(timeoutTag);
timeouts.print("TIMEOUTS");

flatSelect (Multiple Outputs per Match)

// Emit multiple alerts per match (totalAmount() and hasInternationalTxn() are user-defined helpers)
DataStream<Alert> alerts = patternStream.flatSelect(
    new PatternFlatSelectFunction<Transaction, Alert>() {
        @Override
        public void flatSelect(Map<String, List<Transaction>> pattern, Collector<Alert> out) {
            List<Transaction> txns = pattern.get("txns");

            // Emit multiple alerts
            out.collect(new Alert("Velocity fraud", txns.size()));
            out.collect(new Alert("Amount fraud", totalAmount(txns)));

            if (hasInternationalTxn(txns)) {
                out.collect(new Alert("International fraud"));
            }
        }
    }
);
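The flatSelect example above relies on `totalAmount(...)` and `hasInternationalTxn(...)`, which it does not define. One possible version is sketched below; both helpers and the minimal `Transaction` stand-in are hypothetical, and treating "US" as the home country is an assumption borrowed from the `.or(...)` example in Part 4.

```java
import java.util.List;

// Hypothetical helpers for the flatSelect example; Transaction is a minimal stand-in
// and "US" as the home country is an assumption taken from the .or(...) example.
final class FlatSelectHelpers {

    static final class Transaction {
        private final double amount;
        private final String country;

        Transaction(double amount, String country) {
            this.amount = amount;
            this.country = country;
        }

        double getAmount() { return amount; }
        String getCountry() { return country; }
    }

    // Sum of all matched transaction amounts.
    static double totalAmount(List<Transaction> txns) {
        return txns.stream().mapToDouble(Transaction::getAmount).sum();
    }

    // True if any matched transaction happened outside the home country.
    static boolean hasInternationalTxn(List<Transaction> txns) {
        return txns.stream().anyMatch(t -> !"US".equals(t.getCountry()));
    }
}
```

Keeping the helpers static and free of Flink types makes them easy to unit-test separately from the pattern.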

Part 8: Production Patterns

Pattern 1: Multi-Stage Fraud Detection

public class ComprehensiveFraudDetection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env.addSource(...);

        // Stage 1: Velocity fraud
        Pattern<Transaction, ?> velocityPattern = Pattern.<Transaction>begin("rapid")
            .where(SimpleCondition.of(t -> t.getAmount() > 50))
            .timesOrMore(5)
            .within(Time.minutes(10));

        // Stage 2: Location fraud (a US transaction immediately followed by one from another country)
        Pattern<Transaction, ?> locationPattern = Pattern.<Transaction>begin("first")
            .where(SimpleCondition.of(t -> t.getCountry().equals("US")))
            .next("second")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction current, Context<Transaction> ctx) throws Exception {
                    String firstCountry = ctx.getEventsForPattern("first").iterator().next().getCountry();
                    return !current.getCountry().equals(firstCountry);
                }
            })
            .within(Time.hours(1));

        // Stage 3: Amount spike
        Pattern<Transaction, ?> spikePattern = Pattern.<Transaction>begin("baseline")
            .where(SimpleCondition.of(t -> t.getAmount() < 200))
            .times(3)
            .followedBy("spike")
            .where(SimpleCondition.of(t -> t.getAmount() > 2000))
            .within(Time.hours(6));

        // Apply all patterns (createAlert(...) is a user-defined helper)
        KeyedStream<Transaction, String> keyed = transactions.keyBy(Transaction::getCardId);

        DataStream<FraudAlert> velocityAlerts = CEP.pattern(keyed, velocityPattern)
            .select(pattern -> createAlert(pattern, "VELOCITY_FRAUD"));

        DataStream<FraudAlert> locationAlerts = CEP.pattern(keyed, locationPattern)
            .select(pattern -> createAlert(pattern, "LOCATION_FRAUD"));

        DataStream<FraudAlert> spikeAlerts = CEP.pattern(keyed, spikePattern)
            .select(pattern -> createAlert(pattern, "AMOUNT_SPIKE"));

        // Combine alerts
        DataStream<FraudAlert> allAlerts = velocityAlerts
            .union(locationAlerts)
            .union(spikeAlerts);

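        KafkaSink<FraudAlert> kafkaSink = ...;  // built with KafkaSink.builder(); config elided
        allAlerts.sinkTo(kafkaSink);            // KafkaSink is attached with sinkTo(), not addSink()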

        env.execute("Comprehensive Fraud Detection");
    }
}

Pattern 2: System Health Monitoring

// Detect cascading failures in microservices
Pattern<LogEvent, ?> cascadePattern = Pattern.<LogEvent>begin("error_start")
    .where(SimpleCondition.of(e -> e.getLevel().equals("ERROR") && e.getService().equals("api-gateway")))
    .followedBy("service_errors")
    .where(SimpleCondition.of(e -> e.getLevel().equals("ERROR")))
    .oneOrMore()  // can emit several overlapping matches as further errors arrive
    .within(Time.minutes(5));

PatternStream<LogEvent> patternStream = CEP.pattern(
    logStream.keyBy(LogEvent::getTraceId),
    cascadePattern
);

DataStream<Alert> cascadeAlerts = patternStream.select(
    (Map<String, List<LogEvent>> pattern) -> {
        LogEvent gatewayError = pattern.get("error_start").get(0);
        List<LogEvent> serviceErrors = pattern.get("service_errors");

        Set<String> affectedServices = serviceErrors.stream()
            .map(LogEvent::getService)
            .collect(Collectors.toSet());

        return new Alert(
            "Cascading failure detected",
            gatewayError.getTraceId(),
            affectedServices,
            serviceErrors.size()
        );
    }
);

Part 9: CEP vs Manual State Management

Manual Approach (Complex, Error-Prone)

// DON'T DO THIS: Manual pattern detection
public class ManualFraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    private MapState<Long, Transaction> recentTransactions;
    private ValueState<Integer> failureCount;

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) throws Exception {
        // Manually track transactions
        recentTransactions.put(txn.getTimestamp(), txn);

        // Manually check pattern
        int count = 0;
        for (Map.Entry<Long, Transaction> entry : recentTransactions.entries()) {
            if (txn.getTimestamp() - entry.getKey() < 600000) { // 10 min
                count++;
            }
        }

        if (count >= 5) {
            out.collect(new Alert("Fraud detected"));
        }

        // Manually clean up old state
        Iterator<Map.Entry<Long, Transaction>> iter = recentTransactions.entries().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, Transaction> entry = iter.next();
            if (txn.getTimestamp() - entry.getKey() > 600000) {
                iter.remove();
            }
        }

        // 100+ more lines for complex patterns...
    }
}

CEP Approach (Declarative, Maintainable)

// BETTER: Use CEP
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("txns")
    .where(SimpleCondition.of(t -> t.getAmount() > 100))
    .timesOrMore(5)
    .within(Time.minutes(10));

DataStream<Alert> alerts = CEP.pattern(transactions.keyBy(...), pattern)
    .select(match -> new Alert("Fraud detected"));  // lambda parameter must not reuse the name "pattern"

// Flink handles:
// - State management
// - Pattern matching
// - Cleanup
// - Event-time semantics

Part 10: Performance Optimization

State Size Management

// Use .within() to bound state
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .followedBy("end")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")))
    .within(Time.hours(1));  // CRITICAL: partial matches older than 1 hour are pruned

// Without .within(), a login that is never followed by a purchase keeps its partial match in state forever!

Avoid Overly Greedy Patterns

// BAD: Greedy oneOrMore without bound
Pattern<Event, ?> bad = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> true))
    .oneOrMore().greedy()  // Matches ALL events!
    .within(Time.days(30));  // 30 days of state!

// GOOD: Bounded quantifier
Pattern<Event, ?> good = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> e.getType().equals("click")))
    .times(1, 100)  // Max 100 events
    .within(Time.hours(1));

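Use Consecutive When the Semantics Fit

// .consecutive() requires the occurrences to be strictly contiguous: any non-matching event
// in between discards the partial match, so fewer partial matches stay alive in state.
// It also changes what matches, so use it only when "back to back" is what you mean.
Pattern<Event, ?> pattern = Pattern.<Event>begin("failures")
    .where(SimpleCondition.of(e -> e.getStatus().equals("failed")))
    .times(3).consecutive()  // Only matches consecutive failures
    .within(Time.minutes(5));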
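The semantic difference is easy to see on a short login history. The plain-Java sketch below is a hypothetical model (not Flink code) of `times(n)` versus `times(n).consecutive()` for "failed" events; the time window is ignored.

```java
import java.util.List;

// Hypothetical plain-Java model of times(n) versus times(n).consecutive(); not Flink code.
// The time window is ignored.
final class ConsecutiveDemo {

    // times(n): n matching events in order; non-matching events in between are skipped.
    static boolean relaxed(List<String> statuses, int n) {
        long failures = statuses.stream().filter("failed"::equals).count();
        return failures >= n;
    }

    // times(n).consecutive(): n matching events with nothing in between.
    static boolean consecutive(List<String> statuses, int n) {
        int run = 0;
        for (String s : statuses) {
            run = s.equals("failed") ? run + 1 : 0; // any other event breaks the run
            if (run >= n) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> interrupted = List.of("failed", "failed", "success", "failed");
        System.out.println(relaxed(interrupted, 3));     // true
        System.out.println(consecutive(interrupted, 3)); // false
    }
}
```

The interrupting "success" is what lets `.consecutive()` discard the partial match early, which is where the state saving comes from.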

Part 11: Testing CEP Patterns

public class CEPTest {
    @Test
    public void testFraudPattern() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Create test data (cardId, amount, event timestamp in ms)
        DataStream<Transaction> testStream = env.fromElements(
            new Transaction("card1", 100, 1000L),
            new Transaction("card1", 200, 2000L),
            new Transaction("card1", 300, 3000L),
            new Transaction("card1", 400, 4000L),
            new Transaction("card1", 500, 5000L),  // 5th txn completes the match
            new Transaction("card1", 600, 6000L)
        ).assignTimestampsAndWatermarks(
            // CEP runs in event time by default, so the elements need timestamps
            WatermarkStrategy.<Transaction>forMonotonousTimestamps()
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp()));

        // skipPastLastEvent(): without it, the 6th txn would add overlapping matches
        Pattern<Transaction, ?> pattern = Pattern
            .<Transaction>begin("txns", AfterMatchSkipStrategy.skipPastLastEvent())
            .where(SimpleCondition.of(t -> t.getAmount() > 50))
            .timesOrMore(5)
            .within(Time.seconds(10));

        DataStream<String> result = CEP.pattern(testStream.keyBy(Transaction::getCardId), pattern)
            .select(p -> "FRAUD DETECTED");

        List<String> results = result.executeAndCollect(10);

        assertEquals(1, results.size());
        assertEquals("FRAUD DETECTED", results.get(0));
    }
}
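Why does a test like this need a skip strategy? Under the default `noSkip()`, `timesOrMore(5)` keeps extending a match as more qualifying transactions arrive, and later start events can form their own matches, so asserting exactly one result would fail. The arithmetic below is a hedged back-of-the-envelope model of that growth (our reading of the NFA semantics, assuming every event satisfies the condition, none falls out of the window, and the default relaxed contiguity between occurrences). `OverlapCount` is a hypothetical name.

```java
// Hypothetical back-of-the-envelope model, not Flink code: n qualifying events in one window,
// pattern timesOrMore(k), default noSkip(). Each start s and each end e >= s + k - 1
// is assumed to produce exactly one match.
final class OverlapCount {

    static long bruteForce(int n, int k) {
        long count = 0;
        for (int start = 0; start < n; start++) {
            for (int end = start + k - 1; end < n; end++) {
                count++; // one match covering events start..end
            }
        }
        return count;
    }

    // Closed form: m = n - k + 1 valid starts; start s has m - s possible ends,
    // so the total is m + (m - 1) + ... + 1 = m(m + 1) / 2.
    static long closedForm(int n, int k) {
        if (n < k) return 0;
        long m = n - k + 1;
        return m * (m + 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(bruteForce(6, 5) + " " + closedForm(6, 5)); // 3 3
    }
}
```

Six qualifying events give three matches ([t1..t5], [t1..t6], [t2..t6]), and the count grows quadratically with the burst length, which is also why production velocity patterns benefit from a skip strategy or downstream deduplication.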

Part 12: Interview Questions

Conceptual

Q1: What is Complex Event Processing and when should you use it?
A: CEP is pattern detection over event streams. Use it when you need to detect sequences or combinations of events (fraud patterns, user behavior, system anomalies) rather than processing events individually.

Q2: Explain the difference between .next(), .followedBy(), and .followedByAny().
A:
  • .next(): Strict contiguity (no events in between)
  • .followedBy(): Relaxed contiguity (non-matching events in between are skipped; each step uses the first matching event)
  • .followedByAny(): Non-deterministic relaxed contiguity (any later matching event can be used, so one start can yield multiple matches)

Q3: Why is the .within() clause important?
A: Without .within(), partial matches that never complete stay in state indefinitely, so state can grow without bound. .within() bounds state by discarding partial matches older than the time window.

Q4: What’s the difference between greedy and non-greedy quantifiers?
A:
  • Greedy (.oneOrMore().greedy()): The looping stage consumes as many matching events as possible before the next stage is tried
  • Default (.oneOrMore()): The looping stage can hand over to the next stage after any occurrence, so shorter matches are emitted too

Coding

Q: Implement a pattern to detect 3 consecutive failed logins within 5 minutes.
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("failures")
    .where(SimpleCondition.of(event -> event.getStatus().equals(LoginStatus.FAILED)))
    .times(3).consecutive()
    .within(Time.minutes(5));

PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginStream.keyBy(LoginEvent::getUserId),
    pattern
);

DataStream<Alert> alerts = patternStream.select(
    (Map<String, List<LoginEvent>> p) -> {
        LoginEvent first = p.get("failures").get(0);
        return new Alert("3 failed logins for user: " + first.getUserId());
    }
);

Part 13: Hands-On Lab - Complete Fraud Detection System

public class ProductionFraudDetectionCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time is the default since Flink 1.12; setStreamTimeCharacteristic() is deprecated

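        // Kafka source (kafkaProps is assumed to hold bootstrap servers and consumer settings)
        KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
            .setProperties(kafkaProps)
            .setTopics("transactions")
            .setValueOnlyDeserializer(new TransactionSchema())
            .build();

        DataStream<Transaction> transactions = env.fromSource(
            source,
            WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp()),
            "transactions"
        );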

        KeyedStream<Transaction, String> keyed = transactions.keyBy(Transaction::getCardId);

        // Pattern 1: Velocity fraud (>5 txns in 10 min)
        Pattern<Transaction, ?> velocityPattern = Pattern.<Transaction>begin("rapid_txns")
            .where(SimpleCondition.of(t -> t.getAmount() > 20))
            .timesOrMore(5)
            .within(Time.minutes(10));

        DataStream<FraudAlert> velocityAlerts = CEP.pattern(keyed, velocityPattern)
            .select(new VelocityFraudSelector());

        // Pattern 2: Declining pattern (3 declines → 1 approve)
        Pattern<Transaction, ?> decliningPattern = Pattern.<Transaction>begin("declines")
            .where(SimpleCondition.of(t -> t.getStatus() == Status.DECLINED))
            .times(3)
            .followedBy("approve")
            .where(SimpleCondition.of(t -> t.getStatus() == Status.APPROVED && t.getAmount() > 500))
            .within(Time.minutes(15));

        DataStream<FraudAlert> decliningAlerts = CEP.pattern(keyed, decliningPattern)
            .select(new DecliningPatternSelector());

        // Pattern 3: Geographic anomaly (calculateDistance() is a user-defined helper returning km)
        Pattern<Transaction, ?> geoPattern = Pattern.<Transaction>begin("first")
            .where(SimpleCondition.of(t -> true))
            .next("second")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction current, Context<Transaction> ctx) throws Exception {
                    Transaction first = ctx.getEventsForPattern("first").iterator().next();
                    double distance = calculateDistance(first.getLocation(), current.getLocation());
                    long timeDiff = current.getTimestamp() - first.getTimestamp();
                    if (timeDiff <= 0) {
                        return distance > 0; // same timestamp, different place: avoid dividing by zero
                    }
                    double speedKmh = (distance / timeDiff) * 3600000; // km per ms → km/h
                    return speedKmh > 800; // Impossible speed (flight required)
                }
            })
            .within(Time.hours(2));

        DataStream<FraudAlert> geoAlerts = CEP.pattern(keyed, geoPattern)
            .select(new GeoAnomalySelector());

        // Combine all alerts
        DataStream<FraudAlert> allAlerts = velocityAlerts
            .union(decliningAlerts)
            .union(geoAlerts);

        // Enrich with ML score
        DataStream<EnrichedAlert> enrichedAlerts = allAlerts
            .map(new MLEnrichmentFunction());

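        // Sink to Kafka for downstream processing (KafkaSink is attached with sinkTo())
        enrichedAlerts.sinkTo(KafkaSink.<EnrichedAlert>builder()
            .setKafkaProducerConfig(kafkaProps)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("fraud-alerts")
                .setValueSerializationSchema(new AlertSchema())
                .build())
            .build());

        // Second branch: page the on-call engineer for high-risk alerts
        enrichedAlerts
            .filter(alert -> alert.getRiskScore() > 0.8)
            .addSink(new PagerDutySink());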

        env.execute("Production Fraud Detection CEP");
    }

    private static class VelocityFraudSelector implements PatternSelectFunction<Transaction, FraudAlert> {
        @Override
        public FraudAlert select(Map<String, List<Transaction>> pattern) {
            List<Transaction> txns = pattern.get("rapid_txns");
            double totalAmount = txns.stream().mapToDouble(Transaction::getAmount).sum();

            return new FraudAlert(
                txns.get(0).getCardId(),
                FraudType.VELOCITY,
                txns.size(),
                totalAmount,
                txns.get(0).getTimestamp(),
                txns.get(txns.size() - 1).getTimestamp()
            );
        }
    }

    private static class DecliningPatternSelector implements PatternSelectFunction<Transaction, FraudAlert> {
        @Override
        public FraudAlert select(Map<String, List<Transaction>> pattern) {
            List<Transaction> declines = pattern.get("declines");
            Transaction approve = pattern.get("approve").get(0);

            return new FraudAlert(
                approve.getCardId(),
                FraudType.CREDENTIAL_TESTING,
                declines.size() + 1,
                approve.getAmount(),
                declines.get(0).getTimestamp(),
                approve.getTimestamp()
            );
        }
    }
}
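The lab calls `calculateDistance(...)` without defining it. One common choice is the haversine great-circle distance. The sketch below is hypothetical: it takes latitude/longitude in degrees instead of the lab's location objects, assumes a mean Earth radius of 6371 km, and repeats the geo pattern's km/h conversion (km per ms × 3,600,000) with a guard for identical timestamps.

```java
// Hypothetical helpers for the geo-anomaly pattern; the lat/lon signature is an assumption.
final class GeoVelocity {

    private static final double EARTH_RADIUS_KM = 6371.0; // mean Earth radius (assumption)

    // Great-circle distance in km between two points given in degrees (haversine formula).
    static double calculateDistance(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // Same speed test as the pattern: km per ms times 3,600,000 ms/h gives km/h.
    static boolean impossibleTravel(double distanceKm, long timeDiffMs, double maxKmh) {
        if (timeDiffMs <= 0) {
            return distanceKm > 0; // simultaneous transactions in different places
        }
        double speedKmh = distanceKm / timeDiffMs * 3_600_000;
        return speedKmh > maxKmh;
    }

    public static void main(String[] args) {
        System.out.println(calculateDistance(0, 0, 0, 1));            // about 111.2 km
        System.out.println(impossibleTravel(500, 30 * 60_000L, 800)); // true: 1000 km/h
    }
}
```

Haversine ignores altitude and the Earth's flattening, which is usually acceptable when the threshold (800 km/h) is this coarse.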

Summary

What You’ve Mastered

✅ CEP fundamentals and pattern API
✅ Pattern building blocks (conditions, quantifiers, contiguity)
✅ Time constraints and skip strategies
✅ Real-world fraud detection patterns
✅ Advanced techniques (groups, timeouts, flatSelect)
✅ Performance optimization
✅ Production deployment patterns

Key Takeaways

  1. CEP Simplifies Complex Logic: Detection logic that needs 100+ lines of hand-written state management becomes a short declarative pattern
  2. Always Use .within(): Unbounded state is dangerous in production
  3. Choose Contiguity Wisely: .next() vs .followedBy() vs .followedByAny()
  4. Test Patterns Thoroughly: Use unit tests with synthetic data
  5. Monitor State Size: CEP patterns can consume significant state

Next Module

Module 8: Flink Operations & Production Deployment

Kubernetes deployment, HA, monitoring, and production best practices


Practice: Build a comprehensive fraud detection system using 5+ CEP patterns. Test with synthetic transaction data. Deploy to a Flink cluster and monitor pattern matching metrics!