# Complex Event Processing: Pattern Detection at Scale with Flink CEP

> Master pattern detection in event streams, from CEP fundamentals to fraud detection, anomaly detection, and production patterns

<Info>
  **Module Duration**: 6-7 hours
  **Focus**: Pattern API, sequences, quantifiers, conditions, fraud detection, system monitoring
  **Prerequisites**: Flink DataStream API, state management, event-time processing
  **Hands-on Labs**: 12+ CEP applications
</Info>

## Introduction: What is Complex Event Processing?

### The Problem CEP Solves

**Scenario**: Detect credit card fraud by identifying patterns like:

> "3 declined transactions from different merchants within 10 minutes, followed by 1 approved high-value transaction"

**Traditional Approach** (painful):

```java theme={null}
// Maintain complex state manually
class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    private ListState<Transaction> recentTransactions;
    private ValueState<Long> cleanupTimer;  // timestamp of the pending cleanup timer

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) throws Exception {
        // Manually track declines
        // Manually detect pattern
        // Manually clean up state
        // 100+ lines of error-prone code
    }
}
```

**CEP Approach** (elegant):

```java theme={null}
Pattern<Transaction, ?> fraudPattern = Pattern
    .<Transaction>begin("declines")
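        .where(SimpleCondition.of(t -> t.status == Status.DECLINED))
        .times(3)  // note: the "different merchants" requirement is not enforced here
    .next("approved")
        .where(SimpleCondition.of(t -> t.status == Status.APPROVED && t.amount > 500))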
    .within(Time.minutes(10));

// Flink handles state, cleanup, matching automatically!
```

### Real-World Use Cases

| Domain            | Pattern                                                         | Business Value             |
| ----------------- | --------------------------------------------------------------- | -------------------------- |
| **Finance**       | Fraud detection (velocity, location, amount patterns)           | Lower fraud losses                          |
| **E-Commerce**    | Cart abandonment, purchase funnel drop-off                      | Recover abandoned carts, improve conversion |
| **Cybersecurity** | Intrusion detection (failed logins → success)                   | Earlier detection of account takeover       |
| **IoT**           | Equipment failure prediction (temp spike → vibration → failure) | Less unplanned downtime                     |
| **Trading**       | Market anomalies (price spikes, order patterns)                 | Low-latency reaction to market events       |

***

## Part 1: CEP Fundamentals

### The Pattern API

```java theme={null}
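import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.List;
import java.util.Map;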

// 1. Define input stream
DataStream<Event> input = ...;

// 2. Define pattern
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("login");
        }
    });

// 3. Apply pattern to stream
PatternStream<Event> patternStream = CEP.pattern(input.keyBy(Event::getUserId), pattern);

// 4. Select matches
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) {
        Event loginEvent = pattern.get("start").get(0);
        return new Alert("User logged in: " + loginEvent.getUserId());
    }
});
```

### Pattern Building Blocks

```
┌─────────────────────────────────────────────────┐
│           Flink CEP Components                  │
├─────────────────────────────────────────────────┤
│ 1. Patterns          - Sequence definitions     │
│ 2. Conditions        - Event matching rules     │
│ 3. Quantifiers       - Occurrence counts        │
│ 4. Contiguity        - Strictness levels        │
│ 5. Time Constraints  - Within clauses           │
└─────────────────────────────────────────────────┘
```

***

## Part 2: Pattern Basics - Individual Patterns

### Simple Pattern

```java theme={null}
// Match single event type
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("login")
    .where(SimpleCondition.of(event -> event.getStatus().equals("success")));
```

### Chaining Patterns (next, followedBy, followedByAny)

#### 1. Strict Contiguity (.next)

**Only immediate successor, no events in between**.

```java theme={null}
// Login immediately followed by purchase (strict)
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .next("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// Matches:         login → purchase ✓
// Does not match:  login → browse → purchase ✗
```

#### 2. Relaxed Contiguity (.followedBy)

**Allows other events in between, but maintains order**.

```java theme={null}
Pattern<Event, ?> pattern = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// Matches:         login → browse → purchase ✓
// Matches:         login → purchase ✓
// Does not match:  purchase → login ✗ (wrong order)
```

#### 3. Non-Deterministic Relaxed (.followedByAny)

**Like `.followedBy()`, but every later matching event, not just the first, yields its own match**.

```java theme={null}
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
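    .where(SimpleCondition.of(e -> e.getName().equals("a")))
    .followedByAny("middle")
    .where(SimpleCondition.of(e -> e.getName().equals("b")))
    .followedByAny("end")
    .where(SimpleCondition.of(e -> e.getName().equals("c")));

// Input: a → b → d → b → c
// Matches:
//   - a → b(1st) → c  ✓
//   - a → b(2nd) → c  ✓
// With .followedBy("middle") instead, only a → b(1st) → c would match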
```
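To make the three contiguity modes concrete without a cluster, here is a plain-Java model (not Flink's NFA) of a two-stage pattern `a` → `b` in which every `a` opens its own attempt. `ContiguityDemo` and its methods are hypothetical names for illustration, and the model deliberately ignores time windows and skip strategies.

```java theme={null}
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of a two-stage pattern "a" -> "b".
// Each "a" opens its own attempt; a match is a pair of event indices.
class ContiguityDemo {

    // .next(): the "b" must be the event immediately after the "a"
    static List<int[]> strict(List<String> events) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals("a") && events.get(i + 1).equals("b")) {
                matches.add(new int[] {i, i + 1});
            }
        }
        return matches;
    }

    // .followedBy(): skip non-matching events, complete on the first "b"
    static List<int[]> relaxed(List<String> events) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("a")) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals("b")) {
                    matches.add(new int[] {i, j});
                    break; // only the first "b" is used
                }
            }
        }
        return matches;
    }

    // .followedByAny(): every later "b" produces its own match
    static List<int[]> nonDeterministicRelaxed(List<String> events) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("a")) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals("b")) {
                    matches.add(new int[] {i, j});
                }
            }
        }
        return matches;
    }
}
```

For the input `a, x, b, b`, `strict` finds no match (the `x` breaks adjacency), `relaxed` finds one (the `a` with the first `b`), and `nonDeterministicRelaxed` finds two (the `a` with each `b`).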

***

## Part 3: Quantifiers - Specifying Occurrences

### Fixed Count

```java theme={null}
// Exactly 3 failed login attempts
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("failures")
    .where(SimpleCondition.of(e -> e.getStatus().equals("failed")))
    .times(3);
```

### Range

```java theme={null}
// Between 2 and 5 purchase events
Pattern<Event, ?> pattern = Pattern.<Event>begin("purchases")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")))
    .times(2, 5);
```

### One or More (.oneOrMore)

```java theme={null}
// At least one page view
Pattern<Event, ?> pattern = Pattern.<Event>begin("views")
    .where(SimpleCondition.of(e -> e.getType().equals("page_view")))
    .oneOrMore();
```

### Zero or More (.oneOrMore().optional())

```java theme={null}
// Optional: Zero or more add-to-cart events
Pattern<Event, ?> pattern = Pattern.<Event>begin("optional")
    .where(SimpleCondition.of(e -> e.getType().equals("add_to_cart")))
    .oneOrMore().optional();
```

### Greedy Quantifiers (.greedy)

```java theme={null}
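// Match as many clicks as possible before the purchase (greedy)
Pattern<Event, ?> pattern = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> e.getType().equals("click")))
    .oneOrMore().greedy()
    .followedBy("purchase")  // a greedy quantifier cannot be the last stage of a pattern
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));

// Input: click → click → click → purchase (runs starting at the first click)
// Non-greedy: [click] → purchase, [click, click] → purchase, [click, click, click] → purchase
// Greedy:     [click, click, click] → purchase only (longest run)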
```

***

## Part 4: Conditions - Advanced Filtering

### Simple Condition

```java theme={null}
Pattern<Event, ?> pattern = Pattern.<Event>begin("high-value")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getAmount() > 1000;
        }
    });

// Lambda shorthand (SimpleCondition.of, available in recent Flink releases)
Pattern<Event, ?> lambdaPattern = Pattern.<Event>begin("high-value")
    .where(SimpleCondition.of(event -> event.getAmount() > 1000));
```

### Iterative Condition (Access Previous Events)

```java theme={null}
// Detect increasing transaction amounts
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first")
    .where(SimpleCondition.of(t -> t.getAmount() > 100))
    .next("second")
    .where(new IterativeCondition<Transaction>() {
        @Override
        public boolean filter(Transaction current, Context<Transaction> ctx) throws Exception {
            // Access previous events in pattern
            double firstAmount = 0;
            for (Transaction t : ctx.getEventsForPattern("first")) {
                firstAmount = t.getAmount();
            }
            return current.getAmount() > firstAmount * 1.5; // 50% increase
        }
    });
```

### Subtype Condition

```java theme={null}
// Match specific event subtype
Pattern<Event, ?> pattern = Pattern.<Event>begin("payment")
    .subtype(PaymentEvent.class)  // Only PaymentEvent instances
    .where(SimpleCondition.of(event -> event.getMethod().equals("credit_card")));
```

### Combining Conditions (.or, .where + .or)

```java theme={null}
// High amount OR international transaction
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("suspicious")
    .where(SimpleCondition.of(t -> t.getAmount() > 5000))
    .or(SimpleCondition.of(t -> !t.getCountry().equals("US")));
```
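Chained `.where()` calls are combined with logical AND, while `.or()` adds an alternative. The same composition can be sketched with plain `java.util.function.Predicate`; this is an analogy, not Flink's condition classes, and `ConditionCombination` with its `Txn` record are hypothetical names.

```java theme={null}
import java.util.function.Predicate;

// Analogy using java.util.function.Predicate (not Flink's IterativeCondition):
// .where(a).where(b) behaves like a.and(b); .where(a).or(b) like a.or(b).
class ConditionCombination {
    record Txn(double amount, String country) {}

    static final Predicate<Txn> HIGH_AMOUNT = t -> t.amount() > 5000;
    static final Predicate<Txn> INTERNATIONAL = t -> !t.country().equals("US");

    // Like .where(HIGH_AMOUNT).or(INTERNATIONAL): either condition suffices
    static final Predicate<Txn> HIGH_OR_INTERNATIONAL = HIGH_AMOUNT.or(INTERNATIONAL);

    // Like .where(HIGH_AMOUNT).where(INTERNATIONAL): both must hold
    static final Predicate<Txn> HIGH_AND_INTERNATIONAL = HIGH_AMOUNT.and(INTERNATIONAL);
}
```

A small transaction from Germany passes the OR version but not the AND version; only a large foreign transaction passes both.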

### Until Condition (.until)

```java theme={null}
// Collect page views until checkout
Pattern<Event, ?> pattern = Pattern.<Event>begin("browsing")
    .where(SimpleCondition.of(e -> e.getType().equals("page_view")))
    .oneOrMore()
    .until(SimpleCondition.of(e -> e.getType().equals("checkout")));
```

***

## Part 5: Time Constraints

### Within (Time Window)

```java theme={null}
// Pattern must complete within 10 minutes
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")))
    .within(Time.minutes(10));

// If the purchase happens more than 10 minutes after the login, the pattern doesn't match
```
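The window is measured from the first event of the attempt (here, the login). A simplified plain-Java model of `login` → `followedBy(purchase)` → `within(window)` for one key, assuming events arrive sorted by event time, is sketched below; it is not Flink's NFA, and `WithinDemo` is a hypothetical name.

```java theme={null}
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink's NFA) of
// begin("start").where(login).followedBy("purchase").where(purchase).within(window)
// for a single key, assuming events are sorted by timestamp.
class WithinDemo {
    record Event(String type, long timestampMs) {}

    // Returns [loginTs, purchaseTs] pairs for every completed match
    static List<long[]> matches(List<Event> events, long windowMs) {
        List<long[]> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event login = events.get(i);
            if (!login.type().equals("login")) continue;
            for (int j = i + 1; j < events.size(); j++) {
                Event e = events.get(j);
                // The attempt expires once the window since the login has elapsed
                // (exact boundary handling is a simplification here)
                if (e.timestampMs() - login.timestampMs() >= windowMs) break;
                if (e.type().equals("purchase")) {
                    result.add(new long[] {login.timestampMs(), e.timestampMs()});
                    break; // followedBy: the first purchase completes the match
                }
            }
        }
        return result;
    }
}
```

With a 10-minute window, a login at minute 0 and a purchase at minute 5 form a match, while a purchase at minute 12 arrives after the attempt has expired.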

### After Match Skip Strategy

```java theme={null}
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;

// Control what happens after pattern match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .where(SimpleCondition.of(e -> e.getType().equals("a")))
    .followedBy("end")
    .where(SimpleCondition.of(e -> e.getType().equals("b")));

// Input: a1 → a2 → b1
// skipPastLastEvent(): emits a1 → b1, then discards every partial match that
//   started before b1 ends (including the one started by a2)
//   Result: 1 match
// noSkip(): overlapping matches allowed
//   Result: 2 matches (a1 → b1, a2 → b1)
```

**Skip Strategies**:

```java theme={null}
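AfterMatchSkipStrategy.noSkip()                    // All overlapping matches (default)
AfterMatchSkipStrategy.skipToNext()                // Discard partial matches that started with the same event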
AfterMatchSkipStrategy.skipPastLastEvent()         // Skip to after last matched event
AfterMatchSkipStrategy.skipToFirst("pattern_name") // Skip to first event of named pattern
AfterMatchSkipStrategy.skipToLast("pattern_name")  // Skip to last event of named pattern
```

***

## Part 6: Real-World Patterns

### Pattern 1: Fraud Detection (Velocity Check)

```java theme={null}
// Detect: >5 transactions totaling >$10K within 1 hour
public class FraudDetectionCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

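        // KafkaSource uses the unified Source API: attach it with fromSource(), not addSource()
        DataStream<Transaction> transactions = env.fromSource(
            kafkaSource,        // hypothetical: a KafkaSource<Transaction> built elsewhere
            watermarkStrategy,  // hypothetical: a WatermarkStrategy<Transaction> built elsewhere
            "transactions");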

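        // Pattern: High-velocity transactions
        // (with the default noSkip strategy, overlapping matches can raise
        //  several alerts for the same burst of transactions)
        Pattern<Transaction, ?> fraudPattern = Pattern.<Transaction>begin("txns")
            .where(SimpleCondition.of(t -> t.getAmount() > 100))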
            .timesOrMore(5)
            .within(Time.hours(1));

        PatternStream<Transaction> patternStream = CEP.pattern(
            transactions.keyBy(Transaction::getCardId),
            fraudPattern
        );

        DataStream<FraudAlert> alerts = patternStream.process(
            new PatternProcessFunction<Transaction, FraudAlert>() {
                @Override
                public void processMatch(Map<String, List<Transaction>> match, Context ctx,
                                         Collector<FraudAlert> out) {
                    List<Transaction> txns = match.get("txns");
                    double total = txns.stream().mapToDouble(Transaction::getAmount).sum();

                    // Emit only when both thresholds are met (no null + filter needed)
                    if (txns.size() > 5 && total > 10000) {
                        out.collect(new FraudAlert(
                            txns.get(0).getCardId(),
                            txns.size(),
                            total,
                            "High velocity fraud"
                        ));
                    }
                }
            }
        );

        alerts.addSink(new AlertingSink());
        env.execute("Fraud Detection CEP");
    }
}
```
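Keeping the alert decision in a plain static method makes the thresholds unit-testable without starting a Flink job; the match-handling function then only collects the amounts and calls it. `VelocityRule` is a hypothetical helper that mirrors the thresholds in the example above (more than 5 transactions totaling more than \$10,000).

```java theme={null}
import java.util.List;

// Hypothetical helper: the threshold check from the velocity example,
// kept free of Flink types so it can be unit-tested on its own.
final class VelocityRule {
    static final int COUNT_THRESHOLD = 5;          // alert when the count is strictly greater
    static final double TOTAL_THRESHOLD = 10_000;  // alert when the total is strictly greater

    private VelocityRule() {}

    static boolean isVelocityFraud(List<Double> amounts) {
        double total = 0;
        for (double amount : amounts) {
            total += amount;
        }
        return amounts.size() > COUNT_THRESHOLD && total > TOTAL_THRESHOLD;
    }
}
```

Inside `processMatch`, the check would become `VelocityRule.isVelocityFraud(amounts)`, with `amounts` collected from `match.get("txns")`.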

### Pattern 2: Login Intrusion Detection

```java theme={null}
// Detect: 3 failed logins followed by 1 success (possible brute-force or credential-stuffing attempt)
Pattern<LoginEvent, ?> intrusionPattern = Pattern.<LoginEvent>begin("failures")
    .where(SimpleCondition.of(event -> event.getStatus().equals(LoginStatus.FAILED)))
    .times(3).consecutive()  // 3 consecutive failures
    .followedBy("success")
    .where(SimpleCondition.of(event -> event.getStatus().equals(LoginStatus.SUCCESS)))
    .within(Time.minutes(5));

PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginStream.keyBy(LoginEvent::getUserId),
    intrusionPattern
);

DataStream<SecurityAlert> alerts = patternStream.select(
    (Map<String, List<LoginEvent>> pattern) -> {
        List<LoginEvent> failures = pattern.get("failures");
        LoginEvent success = pattern.get("success").get(0);

        return new SecurityAlert(
            success.getUserId(),
            "Potential credential stuffing attack",
            failures.size(),
            success.getTimestamp()
        );
    }
);
```

### Pattern 3: E-Commerce Funnel Analysis

```java theme={null}
// Detect abandoned carts: view → add_to_cart → (no checkout within 1 hour)
Pattern<Event, ?> abandonmentPattern = Pattern.<Event>begin("view")
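    .where(SimpleCondition.of(e -> e.getType().equals(EventType.PRODUCT_VIEW)))
    .followedBy("add_cart")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.ADD_TO_CART)))
    .notFollowedBy("checkout")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.CHECKOUT)))
    .within(Time.hours(1));

// This uses a "NOT" pattern: no checkout may occur before the 1-hour window
// (measured from the product view) closes. A pattern may only end with
// notFollowedBy() when within() is set; the match is emitted once the window expires.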
```
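Because the alert depends on something *not* happening, the outcome can only be decided once the window has passed. A simplified plain-Java model of that decision for one user's session is sketched below, assuming events sorted by timestamp; it is not Flink's NFA, only the first product view is considered, and `AbandonmentDemo` is a hypothetical name.

```java theme={null}
import java.util.List;

// Simplified model (not Flink's NFA) of
// view -> followedBy(add_cart) -> notFollowedBy(checkout) -> within(window)
// for one session, starting from the first PRODUCT_VIEW.
class AbandonmentDemo {
    record Event(String type, long timestampMs) {}

    static boolean isAbandoned(List<Event> events, long windowMs) {
        Long viewTs = null;      // start of the window
        boolean carted = false;  // add_cart seen inside the window
        for (Event e : events) {
            if (viewTs == null) {
                if (e.type().equals("PRODUCT_VIEW")) {
                    viewTs = e.timestampMs();
                }
                continue;
            }
            if (e.timestampMs() - viewTs >= windowMs) {
                break; // window closed: later events no longer matter
            }
            if (!carted && e.type().equals("ADD_TO_CART")) {
                carted = true;
            } else if (carted && e.type().equals("CHECKOUT")) {
                return false; // checkout after add_cart: not abandoned
            }
        }
        // Abandoned only if the item was carted and no checkout followed in the window
        return carted;
    }
}
```

A checkout 20 minutes after the view cancels the alert; a checkout 90 minutes later falls outside the 1-hour window, so the cart still counts as abandoned.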

### Pattern 4: IoT Equipment Failure Prediction

```java theme={null}
// Detect the failure sequence temp_spike → vibration_increase → failure, all within 30 min
// (the match completes only when the failure status arrives; matching just the
//  first two stages would warn before the failure)
Pattern<SensorEvent, ?> failurePattern = Pattern.<SensorEvent>begin("temp_spike")
    .where(SimpleCondition.of(e -> e.getSensorType().equals("temperature") && e.getValue() > 80))
    .followedBy("vibration")
    .where(SimpleCondition.of(e -> e.getSensorType().equals("vibration") && e.getValue() > 5))
    .followedBy("failure")
    .where(SimpleCondition.of(e -> e.getSensorType().equals("status") && e.getValue() == 0))
    .within(Time.minutes(30));

PatternStream<SensorEvent> patternStream = CEP.pattern(
    sensorStream.keyBy(SensorEvent::getMachineId),
    failurePattern
);

DataStream<MaintenanceAlert> alerts = patternStream.select(
    (Map<String, List<SensorEvent>> pattern) -> {
        SensorEvent tempSpike = pattern.get("temp_spike").get(0);
        SensorEvent vibration = pattern.get("vibration").get(0);
        SensorEvent failure = pattern.get("failure").get(0);

        return new MaintenanceAlert(
            tempSpike.getMachineId(),
            "Failure preceded by temperature spike and vibration",
            tempSpike.getTimestamp(),
            failure.getTimestamp()
        );
    }
);
```

***

## Part 7: Advanced CEP Techniques

### Pattern Groups (Combining Multiple Patterns)

```java theme={null}
// Detect: (login from US OR login from EU) followed by purchase
Pattern<Event, ?> login = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login") && e.getCountry().equals("US")))
    .or(SimpleCondition.of(e -> e.getType().equals("login") && e.getCountry().equals("EU")));

// Pattern.begin(Pattern) wraps a sub-pattern into a group that is matched as one unit;
// groups are most useful when a whole sub-sequence needs a quantifier (e.g. .times(2))
Pattern<Event, ?> combinedPattern = Pattern.begin(login)
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));
```

### Timeout Handling (Partial Matches)

```java theme={null}
// Detect partial patterns that timed out (only patterns with .within() can time out)
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

SingleOutputStreamOperator<Alert> result = patternStream.select(
    timeoutTag,
    // Timeout handler (pattern didn't complete)
    new PatternTimeoutFunction<Event, String>() {
        @Override
        public String timeout(Map<String, List<Event>> pattern, long timestamp) {
            return "Pattern timed out for user: " + pattern.get("start").get(0).getUserId();
        }
    },
    // Match handler (pattern completed)
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) {
            return new Alert("Pattern matched!");
        }
    }
);

// Access timed-out patterns
DataStream<String> timeouts = result.getSideOutput(timeoutTag);
timeouts.print("TIMEOUTS");
```

### Flatselect (Multiple Outputs per Match)

```java theme={null}
// Emit multiple alerts per match
DataStream<Alert> alerts = patternStream.flatSelect(
    new PatternFlatSelectFunction<Transaction, Alert>() {
        @Override
        public void flatSelect(Map<String, List<Transaction>> pattern, Collector<Alert> out) {
            List<Transaction> txns = pattern.get("txns");

            // Emit multiple alerts (totalAmount and hasInternationalTxn are helpers defined elsewhere)
            out.collect(new Alert("Velocity fraud", txns.size()));
            out.collect(new Alert("Amount fraud", totalAmount(txns)));

            if (hasInternationalTxn(txns)) {
                out.collect(new Alert("International fraud"));
            }
        }
    }
);
```

***

## Part 8: Production Patterns

### Pattern 1: Multi-Stage Fraud Detection

```java theme={null}
public class ComprehensiveFraudDetection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env.addSource(...);

        // Stage 1: Velocity fraud
        Pattern<Transaction, ?> velocityPattern = Pattern.<Transaction>begin("rapid")
            .where(SimpleCondition.of(t -> t.getAmount() > 50))
            .timesOrMore(5)
            .within(Time.minutes(10));

        // Stage 2: Location fraud (multiple countries)
        Pattern<Transaction, ?> locationPattern = Pattern.<Transaction>begin("first")
            .where(SimpleCondition.of(t -> t.getCountry().equals("US")))
            .next("second")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction current, Context<Transaction> ctx) throws Exception {
                    String firstCountry = ctx.getEventsForPattern("first").iterator().next().getCountry();
                    return !current.getCountry().equals(firstCountry);
                }
            })
            .within(Time.hours(1));

        // Stage 3: Amount spike
        Pattern<Transaction, ?> spikePattern = Pattern.<Transaction>begin("baseline")
            .where(SimpleCondition.of(t -> t.getAmount() < 200))
            .times(3)
            .followedBy("spike")
            .where(SimpleCondition.of(t -> t.getAmount() > 2000))
            .within(Time.hours(6));

        // Apply all patterns
        KeyedStream<Transaction, String> keyed = transactions.keyBy(Transaction::getCardId);

        DataStream<FraudAlert> velocityAlerts = CEP.pattern(keyed, velocityPattern)
            .select(pattern -> createAlert(pattern, "VELOCITY_FRAUD"));

        DataStream<FraudAlert> locationAlerts = CEP.pattern(keyed, locationPattern)
            .select(pattern -> createAlert(pattern, "LOCATION_FRAUD"));

        DataStream<FraudAlert> spikeAlerts = CEP.pattern(keyed, spikePattern)
            .select(pattern -> createAlert(pattern, "AMOUNT_SPIKE"));

        // Combine alerts
        DataStream<FraudAlert> allAlerts = velocityAlerts
            .union(locationAlerts)
            .union(spikeAlerts);

        allAlerts.sinkTo(...);  // KafkaSink implements the unified Sink API: use sinkTo(), not addSink()

        env.execute("Comprehensive Fraud Detection");
    }
}
```

### Pattern 2: System Health Monitoring

```java theme={null}
// Detect cascading failures in microservices
Pattern<LogEvent, ?> cascadePattern = Pattern.<LogEvent>begin("error_start")
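    .where(SimpleCondition.of(e -> e.getLevel().equals("ERROR") && e.getService().equals("api-gateway")))
    .followedBy("service_errors")
    .where(SimpleCondition.of(e -> e.getLevel().equals("ERROR")))
    .oneOrMore()  // non-greedy: each further error adds matches, so expect several alerts per trace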
    .within(Time.minutes(5));

PatternStream<LogEvent> patternStream = CEP.pattern(
    logStream.keyBy(LogEvent::getTraceId),
    cascadePattern
);

DataStream<Alert> cascadeAlerts = patternStream.select(
    (Map<String, List<LogEvent>> pattern) -> {
        LogEvent gatewayError = pattern.get("error_start").get(0);
        List<LogEvent> serviceErrors = pattern.get("service_errors");

        Set<String> affectedServices = serviceErrors.stream()
            .map(LogEvent::getService)
            .collect(Collectors.toSet());

        return new Alert(
            "Cascading failure detected",
            gatewayError.getTraceId(),
            affectedServices,
            serviceErrors.size()
        );
    }
);
```

***

## Part 9: CEP vs Manual State Management

### Manual Approach (Complex, Error-Prone)

```java theme={null}
// DON'T DO THIS: Manual pattern detection
public class ManualFraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    private MapState<Long, Transaction> recentTransactions;
    private ValueState<Integer> failureCount;

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) throws Exception {
        // Manually track transactions
        recentTransactions.put(txn.getTimestamp(), txn);

        // Manually check pattern
        int count = 0;
        for (Map.Entry<Long, Transaction> entry : recentTransactions.entries()) {
            if (txn.getTimestamp() - entry.getKey() < 600000) { // 10 min
                count++;
            }
        }

        if (count >= 5) {
            out.collect(new Alert("Fraud detected"));
        }

        // Manually clean up old state
        Iterator<Map.Entry<Long, Transaction>> iter = recentTransactions.entries().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, Transaction> entry = iter.next();
            if (txn.getTimestamp() - entry.getKey() > 600000) {
                iter.remove();
            }
        }

        // 100+ more lines for complex patterns...
    }
}
```
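Even the simplest piece of that manual bookkeeping, counting events in a sliding window, has edge cases. Below is a plain-Java sketch (hypothetical `SlidingWindowCounter`, one instance per key) that evicts expired timestamps before counting. It assumes timestamps arrive in non-decreasing order; out-of-order events and fault-tolerant state, which CEP and Flink's state backends handle, are exactly what it leaves out.

```java theme={null}
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java sketch of per-key sliding-window counting.
// Assumes timestamps arrive in non-decreasing order.
class SlidingWindowCounter {
    private final long windowMs;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    SlidingWindowCounter(long windowMs) {
        this.windowMs = windowMs;
    }

    // Records an event and returns how many events fall in (ts - windowMs, ts]
    int add(long ts) {
        // Evict first, so expired events are never counted
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= ts - windowMs) {
            timestamps.pollFirst();
        }
        timestamps.addLast(ts);
        return timestamps.size();
    }
}
```

With a 10-minute window, events at 0 ms and 1,000 ms count as 2; an event at 600,000 ms evicts the first one, so the count stays at 2.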

### CEP Approach (Declarative, Maintainable)

```java theme={null}
// BETTER: Use CEP
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("txns")
    .where(SimpleCondition.of(t -> t.getAmount() > 100))
    .timesOrMore(5)
    .within(Time.minutes(10));

DataStream<Alert> alerts = CEP.pattern(transactions.keyBy(...), pattern)
    .select(match -> new Alert("Fraud detected"));  // lambda parameter must not reuse the name "pattern"

// Flink handles:
// - State management
// - Pattern matching
// - Cleanup
// - Event-time semantics
```

***

## Part 10: Performance Optimization

### State Size Management

```java theme={null}
// Use .within() to bound state
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
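    .where(SimpleCondition.of(e -> e.getType().equals("login")))
    .followedBy("end")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")))
    .within(Time.hours(1));  // CRITICAL: partial matches are discarded after 1 hour

// Without .within(), partial matches that never complete are kept forever,
// so state can grow without bound!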
```

### Avoid Overly Greedy Patterns

```java theme={null}
// BAD: oneOrMore over every event, with no upper bound
Pattern<Event, ?> bad = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> true))
    .oneOrMore()             // Matches ALL events!
    .within(Time.days(30));  // 30 days of state!

// GOOD: Bounded quantifier
Pattern<Event, ?> good = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> e.getType().equals("click")))
    .times(1, 100)  // Max 100 events
    .within(Time.hours(1));
```

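### Use Consecutive When the Semantics Allow

```java theme={null}
// .consecutive() imposes strict contiguity between repetitions: any other event
// in between ends the attempt instead of being skipped. This changes what matches,
// but it typically leaves far fewer partial matches (and less state) to track.
Pattern<Event, ?> pattern = Pattern.<Event>begin("failures")
    .where(SimpleCondition.of(e -> e.getStatus().equals("failed")))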
    .times(3).consecutive()  // Only matches consecutive failures
    .within(Time.minutes(5));
```
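The difference is easiest to see on a short login history. Here is a plain-Java model (not Flink's NFA, and ignoring the time window) of whether `times(n)` over failed logins can complete with and without `.consecutive()`; `ConsecutiveDemo` is a hypothetical name.

```java theme={null}
import java.util.List;

// Simplified model (not Flink's NFA) of whether times(n) over "failed"
// events can complete somewhere in a sequence of login outcomes.
class ConsecutiveDemo {

    // Default (relaxed) contiguity: other events between failures are skipped
    static boolean relaxedTimes(List<String> outcomes, int n) {
        long failures = outcomes.stream().filter("failed"::equals).count();
        return failures >= n;
    }

    // .consecutive(): any other event between failures breaks the run
    static boolean consecutiveTimes(List<String> outcomes, int n) {
        int run = 0;
        for (String outcome : outcomes) {
            run = outcome.equals("failed") ? run + 1 : 0;
            if (run >= n) {
                return true;
            }
        }
        return false;
    }
}
```

For `failed, success, failed, success, failed`, the relaxed version completes but the consecutive one does not; three failures in a row satisfy both.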

***

## Part 11: Testing CEP Patterns

```java theme={null}
public class CEPTest {
    @Test
    public void testFraudPattern() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

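        // Create test data; CEP runs in event time by default, so the events need timestamps
        DataStream<Transaction> testStream = env.fromElements(
            new Transaction("card1", 100, 1000L),
            new Transaction("card1", 200, 2000L),
            new Transaction("card1", 300, 3000L),
            new Transaction("card1", 400, 4000L),
            new Transaction("card1", 500, 5000L),  // 5th txn completes the pattern
            new Transaction("card1", 600, 6000L)
        ).assignTimestampsAndWatermarks(
            WatermarkStrategy.<Transaction>forMonotonousTimestamps()
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp()));

        // skipPastLastEvent() discards overlapping matches, so exactly one match is expected
        Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("txns", AfterMatchSkipStrategy.skipPastLastEvent())
            .where(SimpleCondition.of(t -> t.getAmount() > 50))
            .timesOrMore(5)
            .within(Time.seconds(10));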

        DataStream<String> result = CEP.pattern(testStream.keyBy(Transaction::getCardId), pattern)
            .select(p -> "FRAUD DETECTED");

        List<String> results = result.executeAndCollect(10);

        assertEquals(1, results.size());
        assertEquals("FRAUD DETECTED", results.get(0));
    }
}
```

***

## Part 12: Interview Questions

### Conceptual

**Q1: What is Complex Event Processing and when should you use it?**

**A**: CEP is pattern detection over event streams. Use it when you need to detect sequences or combinations of events (fraud patterns, user behavior, system anomalies) rather than processing events individually.

**Q2: Explain the difference between .next(), .followedBy(), and .followedByAny().**

**A**:

* `.next()`: Strict contiguity (no events in between)
* `.followedBy()`: Relaxed contiguity (other events allowed, single match)
* `.followedByAny()`: Non-deterministic relaxed (multiple matches possible)

**Q3: Why is the .within() clause important?**

**A**: Without `.within()`, partial matches that never complete must be kept indefinitely, so state can grow without bound. `.within()` bounds state by discarding partial matches whose time window has expired.

**Q4: What's the difference between greedy and non-greedy quantifiers?**

**A**:

* **Greedy** (`.oneOrMore().greedy()`): Keeps taking matching events as long as possible, producing only the longest run
* **Non-greedy** (`.oneOrMore()`): Also emits the shorter runs, one match per valid length

### Coding

**Q: Implement a pattern to detect 3 consecutive failed logins within 5 minutes.**

```java theme={null}
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("failures")
    .where(SimpleCondition.of(event -> event.getStatus().equals(LoginStatus.FAILED)))
    .times(3).consecutive()
    .within(Time.minutes(5));

PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginStream.keyBy(LoginEvent::getUserId),
    pattern
);

DataStream<Alert> alerts = patternStream.select(
    (Map<String, List<LoginEvent>> p) -> {
        LoginEvent first = p.get("failures").get(0);
        return new Alert("3 failed logins for user: " + first.getUserId());
    }
);
```

***

## Part 13: Hands-On Lab - Complete Fraud Detection System

```java theme={null}
public class ProductionFraudDetectionCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
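        // Event time is the default since Flink 1.12, so no setStreamTimeCharacteristic() call is needed

        // Kafka source (KafkaSource replaces the deprecated FlinkKafkaConsumer)
        KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
            .setProperties(kafkaProps)  // must include bootstrap.servers
            .setTopics("transactions")
            .setValueOnlyDeserializer(new TransactionSchema())
            .build();

        DataStream<Transaction> transactions = env.fromSource(
            source,
            WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp()),
            "transactions");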

        KeyedStream<Transaction, String> keyed = transactions.keyBy(Transaction::getCardId);

        // Pattern 1: Velocity fraud (>5 txns in 10 min)
        Pattern<Transaction, ?> velocityPattern = Pattern.<Transaction>begin("rapid_txns")
            .where(SimpleCondition.of(t -> t.getAmount() > 20))
            .timesOrMore(5)
            .within(Time.minutes(10));

        DataStream<FraudAlert> velocityAlerts = CEP.pattern(keyed, velocityPattern)
            .select(new VelocityFraudSelector());

        // Pattern 2: Declining pattern (3 declines → 1 approve)
        Pattern<Transaction, ?> decliningPattern = Pattern.<Transaction>begin("declines")
            .where(SimpleCondition.of(t -> t.getStatus() == Status.DECLINED))
            .times(3)
            .followedBy("approve")
            .where(SimpleCondition.of(t -> t.getStatus() == Status.APPROVED && t.getAmount() > 500))
            .within(Time.minutes(15));

        DataStream<FraudAlert> decliningAlerts = CEP.pattern(keyed, decliningPattern)
            .select(new DecliningPatternSelector());

        // Pattern 3: Geographic anomaly
        Pattern<Transaction, ?> geoPattern = Pattern.<Transaction>begin("first")
            .where(SimpleCondition.of(t -> true))
            .next("second")
            .where(new IterativeCondition<Transaction>() {
                @Override
                public boolean filter(Transaction current, Context<Transaction> ctx) throws Exception {
                    Transaction first = ctx.getEventsForPattern("first").iterator().next();
                    double distance = calculateDistance(first.getLocation(), current.getLocation()); // km
                    long timeDiff = current.getTimestamp() - first.getTimestamp();                   // ms
                    if (timeDiff <= 0) {
                        return distance > 0; // same instant, different places
                    }
                    double speedKmh = (distance / timeDiff) * 3600000; // km/ms → km/h
                    return speedKmh > 800; // Faster than typical air travel: physically implausible
                }
            })
            .within(Time.hours(2));

        DataStream<FraudAlert> geoAlerts = CEP.pattern(keyed, geoPattern)
            .select(new GeoAnomalySelector());

        // Combine all alerts
        DataStream<FraudAlert> allAlerts = velocityAlerts
            .union(decliningAlerts)
            .union(geoAlerts);

        // Enrich with ML score
        DataStream<EnrichedAlert> enrichedAlerts = allAlerts
            .map(new MLEnrichmentFunction());

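        // Sink to Kafka for downstream processing (KafkaSink replaces the deprecated FlinkKafkaProducer)
        enrichedAlerts.sinkTo(KafkaSink.<EnrichedAlert>builder()
            .setKafkaProducerConfig(kafkaProps)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("fraud-alerts")
                .setValueSerializationSchema(new AlertSchema())
                .build())
            .build());

        // Also page on-call for high-risk alerts (a filtered branch, not a side output)
        enrichedAlerts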
            .filter(alert -> alert.getRiskScore() > 0.8)
            .addSink(new PagerDutySink());

        env.execute("Production Fraud Detection CEP");
    }

    private static class VelocityFraudSelector implements PatternSelectFunction<Transaction, FraudAlert> {
        @Override
        public FraudAlert select(Map<String, List<Transaction>> pattern) {
            List<Transaction> txns = pattern.get("rapid_txns");
            double totalAmount = txns.stream().mapToDouble(Transaction::getAmount).sum();

            return new FraudAlert(
                txns.get(0).getCardId(),
                FraudType.VELOCITY,
                txns.size(),
                totalAmount,
                txns.get(0).getTimestamp(),
                txns.get(txns.size() - 1).getTimestamp()
            );
        }
    }

    private static class DecliningPatternSelector implements PatternSelectFunction<Transaction, FraudAlert> {
        @Override
        public FraudAlert select(Map<String, List<Transaction>> pattern) {
            List<Transaction> declines = pattern.get("declines");
            Transaction approve = pattern.get("approve").get(0);

            return new FraudAlert(
                approve.getCardId(),
                FraudType.CREDENTIAL_TESTING,
                declines.size() + 1,
                approve.getAmount(),
                declines.get(0).getTimestamp(),
                approve.getTimestamp()
            );
        }
    }
}
```
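The lab calls a `calculateDistance` helper without showing it. A hypothetical stand-in using the haversine great-circle formula (mean Earth radius 6,371 km), together with the km/h conversion and the zero-interval guard the geographic condition needs, could look like this. Taking raw latitude and longitude arguments instead of the lab's `getLocation()` objects is an assumption.

```java theme={null}
// Hypothetical stand-in for the lab's calculateDistance helper, plus the
// speed check used by the geographic-anomaly condition.
final class GeoMath {
    static final double EARTH_RADIUS_KM = 6371.0; // mean Earth radius

    private GeoMath() {}

    // Great-circle distance between two lat/lon points (degrees), via haversine
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                   * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // Converts km per millisecond to km/h (3,600,000 ms per hour)
    static double speedKmh(double distanceKm, long elapsedMs) {
        if (elapsedMs <= 0) {
            // Same instant: any movement at all is an infinite speed
            return distanceKm > 0 ? Double.POSITIVE_INFINITY : 0.0;
        }
        return distanceKm / elapsedMs * 3_600_000.0;
    }

    static boolean isImpossibleTravel(double distanceKm, long elapsedMs, double maxKmh) {
        return speedKmh(distanceKm, elapsedMs) > maxKmh;
    }
}
```

One degree of longitude at the equator comes out at roughly 111.2 km, and 1,000 km covered in one hour exceeds the 800 km/h threshold used in the lab.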

***

## Summary

### What You've Mastered

* ✅ CEP fundamentals and the Pattern API
* ✅ Pattern building blocks (conditions, quantifiers, contiguity)
* ✅ Time constraints and skip strategies
* ✅ Real-world fraud detection patterns
* ✅ Advanced techniques (groups, timeouts, flatSelect)
* ✅ Performance optimization
* ✅ Production deployment patterns

### Key Takeaways

1. **CEP Simplifies Complex Logic**: Detection logic that needs 100+ lines of hand-managed state often fits in about 10 lines of CEP
2. **Always Use .within()**: Unbounded state is dangerous in production
3. **Choose Contiguity Wisely**: `.next()` vs `.followedBy()` vs `.followedByAny()`
4. **Test Patterns Thoroughly**: Use unit tests with synthetic data
5. **Monitor State Size**: CEP patterns can consume significant state

***

## Next Module

<Card title="Module 8: Flink Operations & Production Deployment" icon="server" href="/distributed-systems-tools/flink-operations">
  Kubernetes deployment, HA, monitoring, and production best practices
</Card>

***

## Resources

### Documentation

* [Flink CEP](https://nightlies.apache.org/flink/flink-docs-stable/docs/libs/cep/)
* [Pattern API](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/cep/pattern/Pattern.html)

### Papers

* ["Efficient Pattern Matching over Event Streams"](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf)

<Info>
  **Practice**: Build a comprehensive fraud detection system using 5+ CEP patterns. Test it with synthetic transaction data, deploy it to a Flink cluster, and monitor the pattern-matching metrics!
</Info>
