Complex Event Processing: Pattern Detection at Scale
Module Duration: 6-7 hours
Focus: Pattern API, sequences, quantifiers, conditions, fraud detection, system monitoring
Prerequisites: Flink DataStream API, state management, event-time processing
Hands-on Labs: 12+ CEP applications
Introduction: What is Complex Event Processing?
The Problem CEP Solves
Scenario: Detect credit card fraud by identifying patterns like:
“3 declined transactions from different merchants within 10 minutes, followed by 1 approved high-value transaction”
Traditional Approach (painful):
// Maintain complex state manually
class FraudDetector extends KeyedProcessFunction < String , Transaction , Alert > {
private ListState < Transaction > recentTransactions ;
private MapState < Long , Timer > timers ;
@ Override
public void processElement ( Transaction txn , Context ctx , Collector < Alert > out ) {
// Manually track declines
// Manually detect pattern
// Manually clean up state
// 100+ lines of error-prone code
}
}
CEP Approach (elegant):
Pattern<Transaction, ?> fraudPattern = Pattern
    .<Transaction>begin("declines")
    .where(SimpleCondition.of(t -> t.status == Status.DECLINED))
    .times(3)
    .next("approved")
    .where(SimpleCondition.of(t -> t.status == Status.APPROVED && t.amount > 500))
    .within(Time.minutes(10));
// Flink handles state, cleanup, and matching automatically.
// The "different merchants" part of the scenario is not checked here; it needs an IterativeCondition (see Part 4).
Real-World Use Cases
| Domain | Pattern | Business Value |
| --- | --- | --- |
| Finance | Fraud detection (velocity, location, amount patterns) | Save $M in fraud losses |
| E-Commerce | Cart abandonment, purchase funnel drop-off | 20-30% conversion increase |
| Cybersecurity | Intrusion detection (failed logins → success) | Prevent data breaches |
| IoT | Equipment failure prediction (temp spike → vibration → failure) | Reduce downtime 40% |
| Trading | Market anomalies (price spikes, order patterns) | Millisecond advantage |
Part 1: CEP Fundamentals
The Pattern API
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
// 1. Define input stream
DataStream < Event > input = ...;
// 2. Define pattern
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "start" )
. where ( new SimpleCondition < Event >() {
@ Override
public boolean filter ( Event event ) {
return event . getName (). equals ( "login" );
}
});
// 3. Apply pattern to stream
PatternStream < Event > patternStream = CEP . pattern ( input . keyBy (Event :: getUserId), pattern); // key selector instead of the deprecated string-based keyBy
// 4. Select matches
DataStream < Alert > alerts = patternStream . select ( new PatternSelectFunction < Event , Alert >() {
@ Override
public Alert select ( Map < String , List < Event >> pattern ) {
Event loginEvent = pattern . get ( "start" ). get ( 0 );
return new Alert ( "User logged in: " + loginEvent . getUserId ());
}
});
Pattern Building Blocks
┌─────────────────────────────────────────────────┐
│ Flink CEP Components │
├─────────────────────────────────────────────────┤
│ 1. Patterns - Sequence definitions │
│ 2. Conditions - Event matching rules │
│ 3. Quantifiers - Occurrence counts │
│ 4. Contiguity - Strictness levels │
│ 5. Time Constraints - Within clauses │
└─────────────────────────────────────────────────┘
Part 2: Pattern Basics - Individual Patterns
Simple Pattern
// Match single event type
Pattern < LoginEvent , ? > pattern = Pattern. < LoginEvent > begin ( "login" )
. where (event -> event . getStatus (). equals ( "success" ));
Chaining Patterns (next, followedBy, followedByAny)
1. Strict Contiguity (.next)
Only the immediate successor can match; no events may occur in between.
// Login immediately followed by purchase (strict)
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "login" )
. where (e -> e . getType (). equals ( "login" ))
. next ( "purchase" )
. where (e -> e . getType (). equals ( "purchase" ));
// Matches: login → purchase ✓
// No match: login → browse → purchase ✗
2. Relaxed Contiguity (.followedBy)
Allows other events in between, but maintains order.
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "login" )
. where (e -> e . getType (). equals ( "login" ))
. followedBy ( "purchase" )
. where (e -> e . getType (). equals ( "purchase" ));
// Matches: login → browse → purchase ✓
// Matches: login → purchase ✓
// No match: purchase → login ✗ (wrong order)
3. Non-Deterministic Relaxed (.followedByAny)
Like .followedBy, but events that match may also be skipped, so one start event can produce several matches.
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "start" )
. where (e -> e . getName (). equals ( "a" ))
. followedByAny ( "middle" )
. where (e -> e . getName (). equals ( "b" ))
. followedByAny ( "end" )
. where (e -> e . getName (). equals ( "c" ));
// Input: a → b → d → b → c
// Matches:
// - a → b(1st) → c ✓
// - a → b(2nd) → c ✓
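To make the three contiguity modes concrete, here is a minimal plain-Java model (not Flink code) that enumerates matches for a two-step pattern, a first event followed by a second event, over a list of event labels. The class and method names are hypothetical, and the sketch assumes every match is reported (no skip strategy).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of contiguity for a two-step pattern; hypothetical helper, not Flink API. */
final class ContiguityDemo {

    /** Strict (.next): the second event must directly follow the first. */
    static List<int[]> strict(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                matches.add(new int[] {i, i + 1});
            }
        }
        return matches;
    }

    /** Relaxed (.followedBy): skip non-matching events and take the first matching one. */
    static List<int[]> relaxed(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(new int[] {i, j});
                    break; // only the first matching successor counts
                }
            }
        }
        return matches;
    }

    /** Non-deterministic relaxed (.followedByAny): every later matching event counts. */
    static List<int[]> relaxedAny(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(new int[] {i, j});
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "c", "b", "b");
        System.out.println("strict:     " + strict(input, "a", "b").size());
        System.out.println("relaxed:    " + relaxed(input, "a", "b").size());
        System.out.println("relaxedAny: " + relaxedAny(input, "a", "b").size());
    }
}
```

For the input a, c, b, b the model reports no strict match, one relaxed match (a with the first b), and two non-deterministic matches (a with each b).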
Part 3: Quantifiers - Specifying Occurrences
Fixed Count
// Exactly 3 failed login attempts
Pattern < LoginEvent , ? > pattern = Pattern. < LoginEvent > begin ( "failures" )
. where (e -> e . getStatus (). equals ( "failed" ))
. times ( 3 );
Range
// Between 2 and 5 purchase events
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "purchases" )
. where (e -> e . getType (). equals ( "purchase" ))
. times ( 2 , 5 );
One or More (.oneOrMore)
// At least one page view
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "views" )
. where (e -> e . getType (). equals ( "page_view" ))
. oneOrMore ();
Zero or More (.oneOrMore().optional())
// Optional: zero or more add-to-cart events.
// timesOrMore(n) requires n >= 1, so "zero or more" is written as oneOrMore().optional()
Pattern<Event, ?> pattern = Pattern.<Event>begin("optional")
    .where(SimpleCondition.of(e -> e.getType().equals("add_to_cart")))
    .oneOrMore().optional();
Greedy Quantifiers (.greedy)
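// Match as many as possible (greedy)
Pattern<Event, ?> pattern = Pattern.<Event>begin("events")
    .where(SimpleCondition.of(e -> e.getType().equals("click")))
    .oneOrMore().greedy();
// Input: click → click → click → purchase
// Without greedy: each valid run of clicks ([click], [click, click], ...) can produce its own match, subject to the skip strategy
// Greedy: the quantifier takes as many clicks as it can before the pattern moves on;
// this matters most when an event could match both this step and the next one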
Part 4: Conditions - Advanced Filtering
Simple Condition
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "high-value" )
. where ( new SimpleCondition < Event >() {
@ Override
public boolean filter ( Event event ) {
return event . getAmount () > 1000 ;
}
});
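// Lambda shorthand: where() expects an IterativeCondition (an abstract class), so a bare lambda
// does not compile in Java. Recent Flink releases provide SimpleCondition.of(...) to wrap one;
// read the bare .where(e -> ...) calls elsewhere in this module as this shorthand.
Pattern<Event, ?> pattern = Pattern.<Event>begin("high-value")
    .where(SimpleCondition.of(event -> event.getAmount() > 1000));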
Iterative Condition (Access Previous Events)
// Detect increasing transaction amounts
Pattern < Transaction , ? > pattern = Pattern. < Transaction > begin ( "first" )
. where (t -> t . getAmount () > 100 )
. next ( "second" )
. where ( new IterativeCondition < Transaction >() {
@ Override
public boolean filter ( Transaction current , Context < Transaction > ctx ) throws Exception {
// Access previous events in pattern
double firstAmount = 0 ;
for ( Transaction t : ctx . getEventsForPattern ( "first" )) {
firstAmount = t . getAmount ();
}
return current . getAmount () > firstAmount * 1.5 ; // 50% increase
}
});
Subtype Condition
// Match specific event subtype
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "payment" )
. subtype ( PaymentEvent . class ) // Only PaymentEvent instances
. where (event -> event . getMethod (). equals ( "credit_card" ));
Combining Conditions (.or, .where + .or)
// High amount OR international transaction
Pattern < Transaction , ? > pattern = Pattern. < Transaction > begin ( "suspicious" )
. where (t -> t . getAmount () > 5000 )
. or (t -> ! t . getCountry (). equals ( "US" ));
Until Condition (.until)
// Collect page views until checkout
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "browsing" )
. where (e -> e . getType (). equals ( "page_view" ))
. oneOrMore ()
. until (e -> e . getType (). equals ( "checkout" ));
Part 5: Time Constraints
Within (Time Window)
// Pattern must complete within 10 minutes
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "start" )
. where (e -> e . getType (). equals ( "login" ))
. followedBy ( "purchase" )
. where (e -> e . getType (). equals ( "purchase" ))
. within ( Time . minutes ( 10 ));
// If purchase happens after 10 min from login, pattern doesn't match
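As a rough mental model of the time constraint, the plain-Java sketch below (not Flink code) checks whether some login is followed by a purchase inside the window, measured from the login's timestamp. The `Ev` type and method name are hypothetical, events are assumed to be sorted by timestamp, and treating the window boundary as exclusive is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a time-bounded login → purchase match; hypothetical helper, not Flink API. */
final class WithinDemo {

    /** Minimal event: a type label and an event-time timestamp in milliseconds. */
    static final class Ev {
        final String type;
        final long ts;

        Ev(String type, long ts) {
            this.type = type;
            this.ts = ts;
        }
    }

    /**
     * Returns true if some "login" is followed by a later "purchase" whose timestamp
     * is less than windowMs after that login. Events must be sorted by timestamp.
     */
    static boolean matchesWithin(List<Ev> events, long windowMs) {
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).type.equals("login")) {
                continue;
            }
            long start = events.get(i).ts;
            for (int j = i + 1; j < events.size(); j++) {
                Ev e = events.get(j);
                if (e.type.equals("purchase") && e.ts - start < windowMs) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        List<Ev> inTime = Arrays.asList(new Ev("login", 0L), new Ev("browse", 60_000L), new Ev("purchase", 300_000L));
        List<Ev> tooLate = Arrays.asList(new Ev("login", 0L), new Ev("purchase", 660_000L));
        System.out.println(matchesWithin(inTime, tenMinutes));
        System.out.println(matchesWithin(tooLate, tenMinutes));
    }
}
```

A purchase five minutes after the login satisfies the ten-minute window, while one eleven minutes later does not.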
After Match Skip Strategy
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
// Control what happens after a pattern match
Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .where(SimpleCondition.of(e -> e.getType().equals("a")))
    .followedBy("end")
    .where(SimpleCondition.of(e -> e.getType().equals("b")));
// Input: a1 → a2 → b
// noSkip(): every match is emitted, including overlapping ones
// Result: 2 matches (a1→b, a2→b)
// skipPastLastEvent(): after a1→b is emitted, partial matches that started before the end of that match are discarded
// Result: 1 match (a1→b)
// With strict .next() and input a → b → a → b, both strategies give the same 2 matches, because nothing overlaps
Skip Strategies:
AfterMatchSkipStrategy . noSkip () // All overlapping matches
AfterMatchSkipStrategy . skipPastLastEvent () // Skip to after last matched event
AfterMatchSkipStrategy . skipToFirst ( "pattern_name" ) // Skip to first event of named pattern
AfterMatchSkipStrategy . skipToLast ( "pattern_name" ) // Skip to last event of named pattern
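The difference between keeping and discarding overlapping matches can be modelled in a few lines of plain Java. This sketch is not Flink code: it hard-codes the pattern "a followedBy b" over a list of labels, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of two after-match skip strategies; hypothetical helper, not Flink API. */
final class SkipStrategyDemo {

    /** All matches of "a followedBy b" as {startIndex, endIndex} pairs, ordered by start. */
    static List<int[]> noSkip(List<String> events) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("a")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals("b")) {
                    matches.add(new int[] {i, j});
                    break; // relaxed contiguity: take the first b after this a
                }
            }
        }
        return matches;
    }

    /** Keeps a match only if it starts after the last event of the previously kept match. */
    static List<int[]> skipPastLastEvent(List<String> events) {
        List<int[]> kept = new ArrayList<>();
        int lastEnd = -1;
        for (int[] m : noSkip(events)) {
            if (m[0] > lastEnd) {
                kept.add(m);
                lastEnd = m[1];
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> overlapping = Arrays.asList("a", "a", "b");
        System.out.println("noSkip:            " + noSkip(overlapping).size());
        System.out.println("skipPastLastEvent: " + skipPastLastEvent(overlapping).size());
    }
}
```

On the input a, a, b the model keeps two matches without skipping and one with skip-past-last-event; on a, b, a, b both variants keep two, since those matches do not overlap.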
Part 6: Real-World Patterns
Pattern 1: Fraud Detection (Velocity Check)
// Detect: >5 transactions totaling >$10K within 1 hour
public class FraudDetectionCEP {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A KafkaSource is attached with env.fromSource(source, watermarkStrategy, name);
        // it is not a SourceFunction, so addSource(...) does not accept it.
        DataStream<Transaction> transactions = env.fromSource(...);
        // Pattern: High-velocity transactions (5 or more qualifying events; the stricter >5 check happens below)
        Pattern<Transaction, ?> fraudPattern = Pattern.<Transaction>begin("txns")
            .where(SimpleCondition.of(t -> t.getAmount() > 100))
            .timesOrMore(5)
            .within(Time.hours(1));
        PatternStream<Transaction> patternStream = CEP.pattern(
            transactions.keyBy(Transaction::getCardId),
            fraudPattern
        );
        // flatSelect lets a match emit nothing; returning null from select() is not safe
        DataStream<FraudAlert> alerts = patternStream.flatSelect(
            new PatternFlatSelectFunction<Transaction, FraudAlert>() {
                @Override
                public void flatSelect(Map<String, List<Transaction>> pattern, Collector<FraudAlert> out) {
                    List<Transaction> txns = pattern.get("txns");
                    double total = txns.stream().mapToDouble(Transaction::getAmount).sum();
                    if (txns.size() > 5 && total > 10000) {
                        out.collect(new FraudAlert(
                            txns.get(0).getCardId(),
                            txns.size(),
                            total,
                            "High velocity fraud"
                        ));
                    }
                }
            }
        );
        alerts.addSink(new AlertingSink());
        env.execute("Fraud Detection CEP");
    }
}
Pattern 2: Login Intrusion Detection
// Detect: 3 failed logins followed by 1 success (credential stuffing)
Pattern < LoginEvent , ? > intrusionPattern = Pattern. < LoginEvent > begin ( "failures" )
. where (event -> event . getStatus (). equals ( LoginStatus . FAILED ))
. times ( 3 ). consecutive () // 3 consecutive failures
. followedBy ( "success" )
. where (event -> event . getStatus (). equals ( LoginStatus . SUCCESS ))
. within ( Time . minutes ( 5 ));
PatternStream < LoginEvent > patternStream = CEP . pattern (
loginStream . keyBy (LoginEvent :: getUserId),
intrusionPattern
);
DataStream < SecurityAlert > alerts = patternStream . select (
( Map < String, List < LoginEvent >> pattern) -> {
List < LoginEvent > failures = pattern . get ( "failures" );
LoginEvent success = pattern . get ( "success" ). get ( 0 );
return new SecurityAlert (
success . getUserId (),
"Potential credential stuffing attack" ,
failures . size (),
success . getTimestamp ()
);
}
);
Pattern 3: E-Commerce Funnel Analysis
// Detect abandoned carts: view → add_to_cart → (no checkout within 1 hour)
Pattern<Event, ?> abandonmentPattern = Pattern.<Event>begin("view")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.PRODUCT_VIEW)))
    .followedBy("add_cart")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.ADD_TO_CART)))
    .notFollowedBy("checkout")
    .where(SimpleCondition.of(e -> e.getType().equals(EventType.CHECKOUT)))
    .within(Time.hours(1));
// This uses a "NOT" pattern: checkout must NOT occur within 1 hour.
// Ending a pattern with notFollowedBy() is only accepted together with within(), and only in newer Flink releases.
// Older releases reject it; there, the timeout side output (see Timeout Handling in Part 7) is the usual workaround.
Pattern 4: IoT Equipment Failure Prediction
// Detect equipment failure: temp_spike → vibration_increase → (failure within 30 min)
Pattern < SensorEvent , ? > failurePattern = Pattern. < SensorEvent > begin ( "temp_spike" )
. where (e -> e . getSensorType (). equals ( "temperature" ) && e . getValue () > 80 )
. followedBy ( "vibration" )
. where (e -> e . getSensorType (). equals ( "vibration" ) && e . getValue () > 5 )
. followedBy ( "failure" )
. where (e -> e . getSensorType (). equals ( "status" ) && e . getValue () == 0 )
. within ( Time . minutes ( 30 ));
PatternStream < SensorEvent > patternStream = CEP . pattern (
sensorStream . keyBy (SensorEvent :: getMachineId),
failurePattern
);
DataStream < MaintenanceAlert > alerts = patternStream . select (
( Map < String, List < SensorEvent >> pattern) -> {
SensorEvent tempSpike = pattern . get ( "temp_spike" ). get ( 0 );
SensorEvent vibration = pattern . get ( "vibration" ). get ( 0 );
SensorEvent failure = pattern . get ( "failure" ). get ( 0 );
return new MaintenanceAlert (
tempSpike . getMachineId (),
"Equipment failure predicted" ,
tempSpike . getTimestamp (),
failure . getTimestamp ()
);
}
);
Part 7: Advanced CEP Techniques
Pattern Groups (Combining Multiple Patterns)
// Detect: (login from US OR login from EU) followed by purchase
// CEP has no alternation between sub-patterns, so the OR is expressed inside one step with .or()
Pattern<Event, ?> login = Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("login") && e.getCountry().equals("US")))
    .or(SimpleCondition.of(e -> e.getType().equals("login") && e.getCountry().equals("EU")));
// Pattern.begin(pattern) wraps a whole sub-sequence as a group that can be chained or quantified as a unit
Pattern<Event, ?> combinedPattern = Pattern.begin(login)
    .followedBy("purchase")
    .where(SimpleCondition.of(e -> e.getType().equals("purchase")));
Timeout Handling (Partial Matches)
// Detect partial patterns that timed out
OutputTag < String > timeoutTag = new OutputTag < String >( "timeout" ){};
PatternStream < Event > patternStream = CEP . pattern (input, pattern);
SingleOutputStreamOperator < Alert > result = patternStream . select (
timeoutTag,
// Timeout handler (pattern didn't complete)
new PatternTimeoutFunction < Event , String >() {
@ Override
public String timeout ( Map < String , List < Event >> pattern , long timestamp ) {
return "Pattern timed out for user: " + pattern . get ( "start" ). get ( 0 ). getUserId ();
}
},
// Match handler (pattern completed)
new PatternSelectFunction < Event , Alert >() {
@ Override
public Alert select ( Map < String , List < Event >> pattern ) {
return new Alert ( "Pattern matched!" );
}
}
);
// Access timed-out patterns
DataStream < String > timeouts = result . getSideOutput (timeoutTag);
timeouts . print ( "TIMEOUTS" );
Flatselect (Multiple Outputs per Match)
// Emit multiple alerts per match
DataStream < Alert > alerts = patternStream . flatSelect (
new PatternFlatSelectFunction < Transaction , Alert >() {
@ Override
public void flatSelect ( Map < String , List < Transaction >> pattern , Collector < Alert > out ) {
List < Transaction > txns = pattern . get ( "txns" );
// Emit multiple alerts
out . collect ( new Alert ( "Velocity fraud" , txns . size ()));
out . collect ( new Alert ( "Amount fraud" , totalAmount (txns)));
if ( hasInternationalTxn (txns)) {
out . collect ( new Alert ( "International fraud" ));
}
}
}
);
Part 8: Production Patterns
Pattern 1: Multi-Stage Fraud Detection
public class ComprehensiveFraudDetection {
public static void main ( String [] args ) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ();
DataStream < Transaction > transactions = env . addSource (...);
// Stage 1: Velocity fraud
Pattern < Transaction , ? > velocityPattern = Pattern. < Transaction > begin ( "rapid" )
. where (t -> t . getAmount () > 50 )
. timesOrMore ( 5 )
. within ( Time . minutes ( 10 ));
// Stage 2: Location fraud (multiple countries)
Pattern < Transaction , ? > locationPattern = Pattern. < Transaction > begin ( "first" )
. where (t -> t . getCountry (). equals ( "US" ))
. next ( "second" )
. where ( new IterativeCondition < Transaction >() {
@ Override
public boolean filter ( Transaction current , Context < Transaction > ctx ) throws Exception { // getEventsForPattern declares a checked exception
String firstCountry = ctx . getEventsForPattern ( "first" ). iterator (). next (). getCountry ();
return ! current . getCountry (). equals (firstCountry);
}
})
. within ( Time . hours ( 1 ));
// Stage 3: Amount spike
Pattern < Transaction , ? > spikePattern = Pattern. < Transaction > begin ( "baseline" )
. where (t -> t . getAmount () < 200 )
. times ( 3 )
. followedBy ( "spike" )
. where (t -> t . getAmount () > 2000 )
. within ( Time . hours ( 6 ));
// Apply all patterns
KeyedStream < Transaction , String > keyed = transactions . keyBy (Transaction :: getCardId);
DataStream < FraudAlert > velocityAlerts = CEP . pattern (keyed, velocityPattern)
. select (pattern -> createAlert (pattern, "VELOCITY_FRAUD" ));
DataStream < FraudAlert > locationAlerts = CEP . pattern (keyed, locationPattern)
. select (pattern -> createAlert (pattern, "LOCATION_FRAUD" ));
DataStream < FraudAlert > spikeAlerts = CEP . pattern (keyed, spikePattern)
. select (pattern -> createAlert (pattern, "AMOUNT_SPIKE" ));
// Combine alerts
DataStream < FraudAlert > allAlerts = velocityAlerts
. union (locationAlerts)
. union (spikeAlerts);
allAlerts . sinkTo (...); // a KafkaSink is built with KafkaSink.builder() and attached with sinkTo(...), not addSink(...)
env . execute ( "Comprehensive Fraud Detection" );
}
}
Pattern 2: System Health Monitoring
// Detect cascading failures in microservices
Pattern < LogEvent , ? > cascadePattern = Pattern. < LogEvent > begin ( "error_start" )
. where (e -> e . getLevel (). equals ( "ERROR" ) && e . getService (). equals ( "api-gateway" ))
. followedBy ( "service_errors" )
. where (e -> e . getLevel (). equals ( "ERROR" ))
. oneOrMore ()
. within ( Time . minutes ( 5 ));
PatternStream < LogEvent > patternStream = CEP . pattern (
logStream . keyBy (LogEvent :: getTraceId),
cascadePattern
);
DataStream < Alert > cascadeAlerts = patternStream . select (
( Map < String, List < LogEvent >> pattern) -> {
LogEvent gatewayError = pattern . get ( "error_start" ). get ( 0 );
List < LogEvent > serviceErrors = pattern . get ( "service_errors" );
Set < String > affectedServices = serviceErrors . stream ()
. map (LogEvent :: getService)
. collect ( Collectors . toSet ());
return new Alert (
"Cascading failure detected" ,
gatewayError . getTraceId (),
affectedServices,
serviceErrors . size ()
);
}
);
Part 9: CEP vs Manual State Management
Manual Approach (Complex, Error-Prone)
// DON'T DO THIS: Manual pattern detection
public class ManualFraudDetector extends KeyedProcessFunction < String , Transaction , Alert > {
private MapState < Long , Transaction > recentTransactions ;
private ValueState < Integer > failureCount ;
@ Override
public void processElement ( Transaction txn , Context ctx , Collector < Alert > out ) throws Exception {
// Manually track transactions
recentTransactions . put ( txn . getTimestamp (), txn);
// Manually check pattern
int count = 0 ;
for ( Map . Entry < Long , Transaction > entry : recentTransactions . entries ()) {
if ( txn . getTimestamp () - entry . getKey () < 600000 ) { // 10 min
count ++ ;
}
}
if (count >= 5 ) {
out . collect ( new Alert ( "Fraud detected" ));
}
// Manually clean up old state
Iterator < Map . Entry < Long , Transaction >> iter = recentTransactions . entries (). iterator ();
while ( iter . hasNext ()) {
Map . Entry < Long , Transaction > entry = iter . next ();
if ( txn . getTimestamp () - entry . getKey () > 600000 ) {
iter . remove ();
}
}
// 100+ more lines for complex patterns...
}
}
CEP Approach (Declarative, Maintainable)
// BETTER: Use CEP
Pattern < Transaction , ? > pattern = Pattern. < Transaction > begin ( "txns" )
. where (t -> t . getAmount () > 100 )
. timesOrMore ( 5 )
. within ( Time . minutes ( 10 ));
DataStream < Alert > alerts = CEP . pattern ( transactions . keyBy (...), pattern)
. select (match -> new Alert ( "Fraud detected" )); // lambda parameter must not reuse the name of the local variable "pattern"
// Flink handles:
// - State management
// - Pattern matching
// - Cleanup
// - Event-time semantics
Part 10: Performance Optimization
State Size Management
// Use .within() to bound state
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "start" )
. where (e -> e . getType (). equals ( "login" ))
. followedBy ( "end" )
. where (e -> e . getType (). equals ( "purchase" ))
. within ( Time . hours ( 1 )); // CRITICAL: State only kept for 1 hour
// Without .within(), state grows unbounded!
Avoid Overly Greedy Patterns
// BAD: Greedy oneOrMore without bound
Pattern < Event , ? > bad = Pattern. < Event > begin ( "events" )
. where (e -> true )
. oneOrMore (). greedy () // Matches ALL events!
. within ( Time . days ( 30 )); // 30 days of state!
// GOOD: Bounded quantifier
Pattern < Event , ? > good = Pattern. < Event > begin ( "events" )
. where (e -> e . getType (). equals ( "click" ))
. times ( 1 , 100 ) // Max 100 events
. within ( Time . hours ( 1 ));
Use Consecutive When Possible
// .consecutive() restricts matching to adjacent events, which leaves fewer partial matches to track
Pattern < Event , ? > pattern = Pattern. < Event > begin ( "failures" )
. where (e -> e . getStatus (). equals ( "failed" ))
. times ( 3 ). consecutive () // Only matches consecutive failures
. within ( Time . minutes ( 5 ));
Part 11: Testing CEP Patterns
public class CEPTest {
    @Test
    public void testFraudPattern() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Create test data. CEP evaluates in event time by default, so the stream needs timestamps and watermarks.
        DataStream<Transaction> testStream = env.fromElements(
                new Transaction("card1", 100, 1000L),
                new Transaction("card1", 200, 2000L),
                new Transaction("card1", 300, 3000L),
                new Transaction("card1", 400, 4000L),
                new Transaction("card1", 500, 5000L), // 5th txn completes the first match
                new Transaction("card1", 600, 6000L)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Transaction>forMonotonousTimestamps()
                    .withTimestampAssigner((txn, ts) -> txn.getTimestamp()));
        // With the default noSkip(), timesOrMore(5) over six qualifying events emits several overlapping
        // matches, so the test pins the skip strategy to get a single alert.
        Pattern<Transaction, ?> pattern = Pattern
            .<Transaction>begin("txns", AfterMatchSkipStrategy.skipPastLastEvent())
            .where(SimpleCondition.of(t -> t.getAmount() > 50))
            .timesOrMore(5)
            .within(Time.seconds(10));
        DataStream<String> result = CEP.pattern(testStream.keyBy(Transaction::getCardId), pattern)
            .select(p -> "FRAUD DETECTED");
        List<String> results = result.executeAndCollect(10);
        assertEquals(1, results.size());
        assertEquals("FRAUD DETECTED", results.get(0));
    }
}
Part 12: Interview Questions
Conceptual
Q1: What is Complex Event Processing and when should you use it?
A: CEP is pattern detection over event streams. Use it when you need to detect sequences or combinations of events (fraud patterns, user behavior, system anomalies) rather than processing events individually.
Q2: Explain the difference between .next(), .followedBy(), and .followedByAny().
A:
.next(): Strict contiguity (no events in between)
.followedBy(): Relaxed contiguity (non-matching events are skipped; the first matching event is taken)
.followedByAny(): Non-deterministic relaxed contiguity (matching events may also be skipped, so one start event can produce multiple matches)
Q3: Why is the .within() clause important?
A: Without .within(), partial matches never expire, so state can grow without bound while Flink waits for events that may never arrive. .within() bounds state by discarding partial matches once the time window has elapsed.
Q4: What’s the difference between greedy and non-greedy quantifiers?
A:
Greedy (.oneOrMore().greedy()): The quantifier consumes as many matching events as possible before the pattern moves to the next step
Non-greedy (.oneOrMore()): Every valid number of repetitions can produce its own match
Coding
Q: Implement a pattern to detect 3 consecutive failed logins within 5 minutes.
Pattern < LoginEvent , ? > pattern = Pattern. < LoginEvent > begin ( "failures" )
. where (event -> event . getStatus (). equals ( LoginStatus . FAILED ))
. times ( 3 ). consecutive ()
. within ( Time . minutes ( 5 ));
PatternStream < LoginEvent > patternStream = CEP . pattern (
loginStream . keyBy (LoginEvent :: getUserId),
pattern
);
DataStream < Alert > alerts = patternStream . select (
( Map < String, List < LoginEvent >> p) -> {
LoginEvent first = p . get ( "failures" ). get ( 0 );
return new Alert ( "3 failed logins for user: " + first . getUserId ());
}
);
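When unit-testing a pattern like this, it can help to have an independent reference to compare against. The plain-Java sketch below (not Flink code) counts runs of three back-to-back failures whose first and last timestamps fall inside the window. The `Login` type and method name are hypothetical, input is assumed to be one user's logins sorted by timestamp, overlapping runs are counted separately (as with no skip strategy), and the window boundary is treated as exclusive by assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java reference check for "3 consecutive failed logins within a window"; not Flink API. */
final class ConsecutiveFailureOracle {

    /** Minimal login record: outcome plus event-time timestamp in milliseconds. */
    static final class Login {
        final boolean failed;
        final long ts;

        Login(boolean failed, long ts) {
            this.failed = failed;
            this.ts = ts;
        }
    }

    /** Counts every window of three adjacent failures spanning less than windowMs. */
    static int countMatches(List<Login> logins, long windowMs) {
        int matches = 0;
        for (int i = 0; i + 2 < logins.size(); i++) {
            Login a = logins.get(i);
            Login b = logins.get(i + 1);
            Login c = logins.get(i + 2);
            if (a.failed && b.failed && c.failed && c.ts - a.ts < windowMs) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        List<Login> logins = Arrays.asList(
            new Login(true, 0L), new Login(true, 10_000L), new Login(true, 20_000L));
        System.out.println(countMatches(logins, fiveMinutes));
    }
}
```

A success in the middle of a run breaks consecutiveness, and four failures in a row count as two overlapping runs.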
Part 13: Hands-On Lab - Complete Fraud Detection System
public class ProductionFraudDetectionCEP {
public static void main ( String [] args ) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ();
// Event time is the default since Flink 1.12, so the deprecated setStreamTimeCharacteristic(...) call is not needed
// Kafka source
DataStream < Transaction > transactions = env
. addSource ( new FlinkKafkaConsumer <>(
"transactions" ,
new TransactionSchema (),
kafkaProps
))
. assignTimestampsAndWatermarks (
WatermarkStrategy
. < Transaction > forBoundedOutOfOrderness ( Duration . ofSeconds ( 10 ))
. withTimestampAssigner ((txn, ts) -> txn . getTimestamp ())
);
KeyedStream < Transaction , String > keyed = transactions . keyBy (Transaction :: getCardId);
// Pattern 1: Velocity fraud (5 or more txns in 10 min)
Pattern < Transaction , ? > velocityPattern = Pattern. < Transaction > begin ( "rapid_txns" )
. where (t -> t . getAmount () > 20 )
. timesOrMore ( 5 )
. within ( Time . minutes ( 10 ));
DataStream < FraudAlert > velocityAlerts = CEP . pattern (keyed, velocityPattern)
. select ( new VelocityFraudSelector ());
// Pattern 2: Declining pattern (3 declines → 1 approve)
Pattern < Transaction , ? > decliningPattern = Pattern. < Transaction > begin ( "declines" )
. where (t -> t . getStatus () == Status . DECLINED )
. times ( 3 )
. followedBy ( "approve" )
. where (t -> t . getStatus () == Status . APPROVED && t . getAmount () > 500 )
. within ( Time . minutes ( 15 ));
DataStream < FraudAlert > decliningAlerts = CEP . pattern (keyed, decliningPattern)
. select ( new DecliningPatternSelector ());
// Pattern 3: Geographic anomaly
Pattern < Transaction , ? > geoPattern = Pattern. < Transaction > begin ( "first" )
. where (t -> true )
. next ( "second" )
. where ( new IterativeCondition < Transaction >() {
@ Override
public boolean filter ( Transaction current , Context < Transaction > ctx ) throws Exception { // getEventsForPattern declares a checked exception
Transaction first = ctx . getEventsForPattern ( "first" ). iterator (). next ();
double distance = calculateDistance ( first . getLocation (), current . getLocation ());
long timeDiff = current . getTimestamp () - first . getTimestamp ();
double speedKmh = (distance / timeDiff) * 3600000 ; // km/h
return speedKmh > 800 ; // Impossible speed (flight required)
}
})
. within ( Time . hours ( 2 ));
DataStream < FraudAlert > geoAlerts = CEP . pattern (keyed, geoPattern)
. select ( new GeoAnomalySelector ());
// Combine all alerts
DataStream < FraudAlert > allAlerts = velocityAlerts
. union (decliningAlerts)
. union (geoAlerts);
// Enrich with ML score
DataStream < EnrichedAlert > enrichedAlerts = allAlerts
. map ( new MLEnrichmentFunction ());
// Sink to Kafka for downstream processing
enrichedAlerts . addSink ( new FlinkKafkaProducer <>(
"fraud-alerts" ,
new AlertSchema (),
kafkaProps
));
// High-risk alerts are additionally sent to the paging system
enrichedAlerts
. filter (alert -> alert . getRiskScore () > 0.8 )
. addSink ( new PagerDutySink ());
env . execute ( "Production Fraud Detection CEP" );
}
private static class VelocityFraudSelector implements PatternSelectFunction < Transaction , FraudAlert > {
@ Override
public FraudAlert select ( Map < String , List < Transaction >> pattern ) {
List < Transaction > txns = pattern . get ( "rapid_txns" );
double totalAmount = txns . stream (). mapToDouble (Transaction :: getAmount). sum ();
return new FraudAlert (
txns . get ( 0 ). getCardId (),
FraudType . VELOCITY ,
txns . size (),
totalAmount,
txns . get ( 0 ). getTimestamp (),
txns . get ( txns . size () - 1 ). getTimestamp ()
);
}
}
private static class DecliningPatternSelector implements PatternSelectFunction < Transaction , FraudAlert > {
@ Override
public FraudAlert select ( Map < String , List < Transaction >> pattern ) {
List < Transaction > declines = pattern . get ( "declines" );
Transaction approve = pattern . get ( "approve" ). get ( 0 );
return new FraudAlert (
approve . getCardId (),
FraudType . CREDENTIAL_TESTING ,
declines . size () + 1 ,
approve . getAmount (),
declines . get ( 0 ). getTimestamp (),
approve . getTimestamp ()
);
}
}
}
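The geographic-anomaly condition in this lab calls a calculateDistance helper that is not shown. One possible shape for it, as a sketch only, is a haversine great-circle distance plus a speed calculation that guards against a zero time difference. The class name, the latitude/longitude parameters, and the 6371 km Earth radius are assumptions of this example; the lab's own location type may look different.

```java
/** Sketch of distance and speed helpers for an "impossible travel" check; names are hypothetical. */
final class GeoSpeed {

    /** Mean Earth radius in kilometres (a common approximation). */
    static final double EARTH_RADIUS_KM = 6371.0;

    /** Great-circle distance in kilometres between two points given in decimal degrees. */
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
            + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
            * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    /** Speed in km/h for a distance covered in elapsedMs milliseconds. */
    static double speedKmh(double distanceKm, long elapsedMs) {
        if (elapsedMs <= 0) {
            // Same-instant events: any real distance is treated as infinitely fast
            return distanceKm > 0 ? Double.POSITIVE_INFINITY : 0.0;
        }
        return distanceKm / elapsedMs * 3_600_000.0;
    }

    public static void main(String[] args) {
        // London to Paris, ten minutes apart
        double km = haversineKm(51.5074, -0.1278, 48.8566, 2.3522);
        System.out.println(speedKmh(km, 10 * 60 * 1000L) > 800);
    }
}
```

Handling the zero-elapsed-time case explicitly keeps the intent visible instead of relying on floating-point division by zero.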
Summary
What You’ve Mastered
✅ CEP fundamentals and pattern API
✅ Pattern building blocks (conditions, quantifiers, contiguity)
✅ Time constraints and skip strategies
✅ Real-world fraud detection patterns
✅ Advanced techniques (groups, timeouts, flatselect)
✅ Performance optimization
✅ Production deployment patterns
Key Takeaways
CEP Simplifies Complex Logic : What takes 200 lines manually is 10 lines with CEP
Always Use .within() : Unbounded state is dangerous in production
Choose Contiguity Wisely : .next() vs .followedBy() vs .followedByAny()
Test Patterns Thoroughly : Use unit tests with synthetic data
Monitor State Size : CEP patterns can consume significant state
Next Module
Module 8: Flink Operations & Production Deployment
Kubernetes deployment, HA, monitoring, and production best practices
Practice: Build a comprehensive fraud detection system using 5+ CEP patterns. Test with synthetic transaction data. Deploy to a Flink cluster and monitor pattern matching metrics!