Capstone: Production-Grade Real-Time Fraud Detection System
Project Duration: 12-15 hours
Difficulty: Advanced
Technologies: Flink, Kafka, CEP, ML, RocksDB, Kubernetes, Prometheus
Deliverable: Complete production-ready fraud detection pipeline
Project Overview
What You’ll Build
A real-time fraud detection system that processes millions of transactions per day and detects fraudulent activity using:
- Multiple Detection Strategies: Velocity, location, amount spikes, behavioral patterns
- Complex Event Processing: Pattern matching across transaction sequences
- Machine Learning: Real-time feature extraction and model scoring
- State Management: Terabyte-scale keyed state with RocksDB
- Production Deployment: Kubernetes, HA, monitoring, alerting
System Architecture
┌───────────────────────────────────────────────────────────────┐
│ Fraud Detection Pipeline │
├───────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌──────────────────┐ ┌────────────┐ │
│ │ Kafka │─────►│ Flink Pipeline │─────►│ Alerts │ │
│ │ (Source)│ │ │ │ (Kafka) │ │
│ └─────────┘ │ 1. Enrichment │ └────────────┘ │
│ │ 2. Feature Eng │ │ │
│ ┌─────────┐ │ 3. CEP Patterns │ ┌────────────┐ │
│ │ MySQL │◄────►│ 4. ML Scoring │─────►│ MySQL │ │
│ │(Lookup) │ │ 5. Alerting │ │ (Alerts) │ │
│ └─────────┘ └──────────────────┘ └────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ State (RocksDB) │ │
│ │ - User Profiles │ │
│ │ - Tx History │ │
│ │ - ML Features │ │
│ └───────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Monitoring: Prometheus + Grafana │ │
│ │ - Throughput, latency, state size │ │
│ │ - Checkpoint metrics, backpressure │ │
│ │ - Alert SLAs, fraud detection rate │ │
│ └──────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────┘
Part 1: Project Setup
Prerequisites
# Required tools
- Docker & Docker Compose
- Kubernetes (minikube or cloud)
- kubectl
- Maven 3.8+
- Java 11+
- Python 3.9+ (for ML model serving)
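The project structure below also lists a docker-compose.yml that this guide never shows. For local development, a minimal sketch might look like the following; image tags, ports, and credentials are assumptions, so adapt them to your environment.
# docker-compose.yml (illustrative sketch)
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: fraud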
Project Structure
fraud-detection-flink/
├── pom.xml
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/frauddetection/
│ │ │ ├── FraudDetectionJob.java
│ │ │ ├── models/
│ │ │ │ ├── Transaction.java
│ │ │ │ ├── EnrichedTransaction.java
│ │ │ │ ├── FeatureVector.java
│ │ │ │ ├── FraudAlert.java
│ │ │ │ └── UserProfile.java
│ │ │ ├── sources/
│ │ │ │ └── TransactionGenerator.java
│ │ │ ├── enrichment/
│ │ │ │ └── UserProfileEnrichment.java
│ │ │ ├── features/
│ │ │ │ └── FeatureExtractor.java
│ │ │ ├── cep/
│ │ │ │ ├── VelocityFraudDetector.java
│ │ │ │ ├── LocationAnomalyDetector.java
│ │ │ │ └── DecliningPatternDetector.java
│ │ │ ├── ml/
│ │ │ │ └── MLScoringFunction.java
│ │ │ └── sinks/
│ │ │ └── AlertSink.java
│ │ └── resources/
│ │ └── flink-conf.yaml
│ └── test/
│ └── java/
│ └── com/frauddetection/
│ └── FraudDetectionTest.java
├── k8s/
│ ├── flink-cluster.yaml
│ ├── kafka.yaml
│ └── monitoring.yaml
├── docker-compose.yml
└── README.md
Maven Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<groupId>com.frauddetection</groupId>
<artifactId>fraud-detection-flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.18.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<java.version>11</java.version>
<kafka.version>3.4.0</kafka.version>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink Kafka Connector (externalized; versioned separately from Flink core) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.2-1.18</version>
</dependency>
<!-- Flink CEP -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink JDBC -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
<!-- RocksDB State Backend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
<scope>provided</scope>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.frauddetection.FraudDetectionJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Part 2: Data Models
Transaction Model
package com.frauddetection.models;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Transaction implements Serializable {
private String transactionId;
private String cardId;
private String userId;
private double amount;
private String merchantId;
private String merchantCategory;
private String country;
private String city;
private double latitude;
private double longitude;
private String status; // APPROVED, DECLINED
private long timestamp;
private String deviceId;
private String ipAddress;
// For JSON (de)serialization with the Kafka connector
public static class Schema implements
org.apache.flink.api.common.serialization.DeserializationSchema<Transaction>,
org.apache.flink.api.common.serialization.SerializationSchema<Transaction> {
private final com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
@Override
public Transaction deserialize(byte[] message) throws java.io.IOException {
return objectMapper.readValue(message, Transaction.class);
}
@Override
public boolean isEndOfStream(Transaction nextElement) {
return false; // the Kafka topic is unbounded
}
@Override
public byte[] serialize(Transaction element) {
try {
return objectMapper.writeValueAsBytes(element);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize transaction", e);
}
}
@Override
public org.apache.flink.api.common.typeinfo.TypeInformation<Transaction> getProducedType() {
return org.apache.flink.api.common.typeinfo.TypeInformation.of(Transaction.class);
}
}
}
User Profile Model
package com.frauddetection.models;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserProfile implements Serializable {
private String userId;
private String cardId;
private double avgTransactionAmount;
private double stdDevAmount;
private int totalTransactions;
private String homeCountry;
private String homeCity;
private long lastTransactionTime;
private boolean isHighRiskUser;
public void updateWithTransaction(Transaction txn) {
// Update running statistics
double oldAvg = this.avgTransactionAmount;
this.totalTransactions++;
// Incremental mean
this.avgTransactionAmount = oldAvg + (txn.getAmount() - oldAvg) / this.totalTransactions;
// Incremental standard deviation (Welford's method)
double delta = txn.getAmount() - oldAvg;
double delta2 = txn.getAmount() - this.avgTransactionAmount;
this.stdDevAmount = Math.sqrt(
(Math.pow(this.stdDevAmount, 2) * (this.totalTransactions - 1) + delta * delta2) / this.totalTransactions
);
this.lastTransactionTime = txn.getTimestamp();
}
}
Fraud Alert Model
package com.frauddetection.models;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FraudAlert implements Serializable {
private String alertId;
private String cardId;
private String userId;
private String transactionId;
private FraudType fraudType;
private double riskScore;
private String description;
private long detectedAt;
private String metadata; // JSON with additional context
public enum FraudType {
VELOCITY_FRAUD, // Too many transactions in short time
AMOUNT_SPIKE, // Unusually high amount
LOCATION_ANOMALY, // Impossible travel distance
DECLINING_PATTERN, // Multiple declines followed by approval
BEHAVIORAL_ANOMALY, // ML-detected anomaly
MERCHANT_RISK, // High-risk merchant category
DEVICE_MISMATCH // Different device for same user
}
}
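The enrichment, CEP, and ML stages below also pass around two small wrapper types that the pipeline references but this guide never defines: EnrichedTransaction (a transaction paired with its user profile) and FeatureVector (the named ML features for one transaction). Minimal sketches, with fields inferred from how the later code uses them, could look like this:

// models/EnrichedTransaction.java
package com.frauddetection.models;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EnrichedTransaction implements Serializable {
    private Transaction transaction;
    private UserProfile userProfile;

    // Convenience accessor used by keyBy() in the main job
    public String getCardId() {
        return transaction.getCardId();
    }
}

// models/FeatureVector.java
package com.frauddetection.models;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Map;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class FeatureVector implements Serializable {
    private String cardId;
    private String userId;
    private String transactionId;
    private long timestamp;
    private Map<String, Double> features;
}

The FraudAlert.Schema used by the Kafka alert sink is likewise not shown; it can mirror Transaction.Schema and serialize FraudAlert objects as JSON.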
Part 3: Core Pipeline Implementation
Main Job (FraudDetectionJob.java)
package com.frauddetection;
import com.frauddetection.models.*;
import com.frauddetection.cep.*;
import com.frauddetection.features.*;
import com.frauddetection.enrichment.*;
import com.frauddetection.ml.*;
import com.frauddetection.sinks.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// 1. Setup execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Configure checkpointing
env.enableCheckpointing(60000); // 1 minute
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setMinPauseBetweenCheckpoints(30000);
checkpointConfig.setCheckpointTimeout(600000); // 10 minutes
checkpointConfig.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
checkpointConfig.enableUnalignedCheckpoints();
// 3. Configure state backend (embedded RocksDB for large keyed state, incremental checkpoints)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
checkpointConfig.setCheckpointStorage("s3://my-bucket/flink/checkpoints");
// 4. Configure restart strategy (10s initial backoff, 5min max, reset after 1h, 10% jitter)
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.seconds(10),
Time.minutes(5),
2.0,
Time.hours(1),
0.1
));
// 5. Kafka source (KafkaSource replaces the deprecated FlinkKafkaConsumer)
KafkaSource<Transaction> kafkaSource = KafkaSource.<Transaction>builder()
.setBootstrapServers("kafka:9092")
.setTopics("transactions")
.setGroupId("fraud-detection")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new Transaction.Schema())
.build();
// 6. Create source stream with event-time watermarks (10s out-of-orderness)
DataStream<Transaction> transactions = env.fromSource(
kafkaSource,
WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((txn, ts) -> txn.getTimestamp()),
"Kafka Transaction Source"
);
// 7. Enrich transactions with user profiles
DataStream<EnrichedTransaction> enrichedTransactions = transactions
.keyBy(Transaction::getCardId)
.flatMap(new UserProfileEnrichment())
.name("User Profile Enrichment");
// 8. Extract ML features
DataStream<FeatureVector> features = enrichedTransactions
.keyBy(EnrichedTransaction::getCardId)
.flatMap(new FeatureExtractor())
.name("Feature Extraction");
// 9. CEP-based fraud detection (operator naming happens inside each detector,
// since detect() returns a plain DataStream)
DataStream<FraudAlert> velocityAlerts =
new VelocityFraudDetector().detect(enrichedTransactions);
DataStream<FraudAlert> locationAlerts =
new LocationAnomalyDetector().detect(enrichedTransactions);
DataStream<FraudAlert> decliningAlerts =
new DecliningPatternDetector().detect(enrichedTransactions);
// 10. ML-based fraud scoring
DataStream<FraudAlert> mlAlerts = features
.keyBy(FeatureVector::getCardId)
.flatMap(new MLScoringFunction())
.name("ML Fraud Scoring");
// 11. Combine all alerts
DataStream<FraudAlert> allAlerts = velocityAlerts
.union(locationAlerts)
.union(decliningAlerts)
.union(mlAlerts);
// 12. Deduplicate and prioritize alerts
SingleOutputStreamOperator<FraudAlert> deduplicatedAlerts = allAlerts
.keyBy(FraudAlert::getCardId)
.flatMap(new AlertDeduplicator())
.name("Alert Deduplication");
// 13. Sink to Kafka for downstream consumers (KafkaSink replaces the deprecated FlinkKafkaProducer)
KafkaSink<FraudAlert> alertKafkaSink = KafkaSink.<FraudAlert>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("fraud-alerts")
.setValueSerializationSchema(new FraudAlert.Schema())
.build())
// use EXACTLY_ONCE plus a transaction-id prefix if end-to-end exactly-once is required
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
deduplicatedAlerts
.sinkTo(alertKafkaSink)
.name("Kafka Alert Sink");
// 14. Sink to MySQL (for dashboard/analysis)
deduplicatedAlerts
.addSink(new AlertSink())
.name("MySQL Alert Sink");
// 15. Route high-risk alerts (score > 0.9) to PagerDuty
deduplicatedAlerts
.filter(alert -> alert.getRiskScore() > 0.9)
.addSink(new PagerDutySink())
.name("PagerDuty High-Risk Alerts");
// 16. Execute job
env.execute("Real-Time Fraud Detection");
}
}
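Step 7 of the job uses UserProfileEnrichment (under enrichment/ in the project structure), which the guide doesn't show. A minimal sketch that keeps each card's profile purely in Flink keyed state, leaving out the MySQL lookup shown in the architecture diagram, might look like:

package com.frauddetection.enrichment;

import com.frauddetection.models.EnrichedTransaction;
import com.frauddetection.models.Transaction;
import com.frauddetection.models.UserProfile;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed by cardId: attaches the card's running profile to each transaction,
// then folds the transaction back into the profile (Welford update in UserProfile).
public class UserProfileEnrichment extends RichFlatMapFunction<Transaction, EnrichedTransaction> {

    private transient ValueState<UserProfile> profileState;

    @Override
    public void open(Configuration parameters) {
        profileState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("user-profile", UserProfile.class));
    }

    @Override
    public void flatMap(Transaction txn, Collector<EnrichedTransaction> out) throws Exception {
        UserProfile profile = profileState.value();
        if (profile == null) {
            // First transaction seen for this card: start a fresh profile.
            profile = new UserProfile();
            profile.setUserId(txn.getUserId());
            profile.setCardId(txn.getCardId());
            profile.setHomeCountry(txn.getCountry());
            profile.setHomeCity(txn.getCity());
        }
        // Emit the transaction with the profile as it was *before* this transaction.
        out.collect(new EnrichedTransaction(txn, profile));
        // Update running statistics and persist.
        profile.updateWithTransaction(txn);
        profileState.update(profile);
    }
}

In a production build, open() or an Async I/O stage could seed the profile from the MySQL lookup table instead of starting from an empty profile.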
Part 4: CEP Fraud Detectors
Velocity Fraud Detector
package com.frauddetection.cep;
import com.frauddetection.models.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.*;
import java.util.stream.Collectors;
public class VelocityFraudDetector {
public DataStream<FraudAlert> detect(DataStream<EnrichedTransaction> stream) {
// Pattern: 5+ transactions over $100 each within 10 minutes, totaling >$10K
Pattern<EnrichedTransaction, ?> pattern = Pattern
.<EnrichedTransaction>begin("rapid_txns")
.where(SimpleCondition.of(txn -> txn.getTransaction().getAmount() > 100))
.timesOrMore(5)
.within(Time.minutes(10));
PatternStream<EnrichedTransaction> patternStream = CEP.pattern(
stream.keyBy(et -> et.getTransaction().getCardId()),
pattern
);
return patternStream.select((map) -> {
List<EnrichedTransaction> txns = map.get("rapid_txns");
double totalAmount = txns.stream()
.mapToDouble(et -> et.getTransaction().getAmount())
.sum();
EnrichedTransaction first = txns.get(0);
EnrichedTransaction last = txns.get(txns.size() - 1);
if (txns.size() >= 5 && totalAmount > 10000) {
double riskScore = calculateRiskScore(txns, totalAmount);
return new FraudAlert(
UUID.randomUUID().toString(),
first.getTransaction().getCardId(),
first.getTransaction().getUserId(),
last.getTransaction().getTransactionId(),
FraudAlert.FraudType.VELOCITY_FRAUD,
riskScore,
String.format("Velocity fraud: %d transactions ($%.2f) in 10 min",
txns.size(), totalAmount),
System.currentTimeMillis(),
buildMetadata(txns)
);
}
return null;
}, TypeInformation.of(FraudAlert.class)).filter(Objects::nonNull);
}
private double calculateRiskScore(List<EnrichedTransaction> txns, double totalAmount) {
// Risk factors:
// 1. Number of transactions (weight: 0.3)
// 2. Total amount (weight: 0.4)
// 3. Deviation from user average (weight: 0.3)
double txnScore = Math.min(txns.size() / 10.0, 1.0); // Max at 10 txns
double amountScore = Math.min(totalAmount / 20000.0, 1.0); // Max at $20K
EnrichedTransaction first = txns.get(0);
double avgAmount = first.getUserProfile().getAvgTransactionAmount();
double deviationScore = avgAmount > 0 ? Math.min(totalAmount / (avgAmount * 5), 1.0) : 0.5;
return 0.3 * txnScore + 0.4 * amountScore + 0.3 * deviationScore;
}
private String buildMetadata(List<EnrichedTransaction> txns) {
// Build JSON metadata
Map<String, Object> metadata = new HashMap<>();
metadata.put("transaction_count", txns.size());
metadata.put("total_amount", txns.stream().mapToDouble(et -> et.getTransaction().getAmount()).sum());
metadata.put("transaction_ids", txns.stream().map(et -> et.getTransaction().getTransactionId()).collect(Collectors.toList()));
try {
return new ObjectMapper().writeValueAsString(metadata);
} catch (Exception e) {
return "{}";
}
}
}
Location Anomaly Detector
package com.frauddetection.cep;
import com.frauddetection.models.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.UUID;
public class LocationAnomalyDetector {
public DataStream<FraudAlert> detect(DataStream<EnrichedTransaction> stream) {
// Pattern: Two consecutive transactions whose implied travel speed is physically impossible
Pattern<EnrichedTransaction, ?> pattern = Pattern
.<EnrichedTransaction>begin("first")
.where(SimpleCondition.of(et -> true))
.next("second")
.where(new IterativeCondition<EnrichedTransaction>() {
@Override
public boolean filter(EnrichedTransaction current, Context<EnrichedTransaction> ctx) throws Exception {
EnrichedTransaction first = ctx.getEventsForPattern("first").iterator().next();
// Calculate distance and time
double distance = calculateDistance(
first.getTransaction().getLatitude(),
first.getTransaction().getLongitude(),
current.getTransaction().getLatitude(),
current.getTransaction().getLongitude()
);
long timeDiffMs = current.getTransaction().getTimestamp() - first.getTransaction().getTimestamp();
double timeDiffHours = timeDiffMs / 3600000.0;
// Calculate speed (km/h)
double speedKmh = distance / timeDiffHours;
// Flag if speed > 800 km/h (requires flight, suspicious for short time)
return speedKmh > 800 && timeDiffHours < 6;
}
})
.within(Time.hours(6));
PatternStream<EnrichedTransaction> patternStream = CEP.pattern(
stream.keyBy(et -> et.getTransaction().getCardId()),
pattern
);
return patternStream.select((map) -> {
EnrichedTransaction first = map.get("first").get(0);
EnrichedTransaction second = map.get("second").get(0);
double distance = calculateDistance(
first.getTransaction().getLatitude(),
first.getTransaction().getLongitude(),
second.getTransaction().getLatitude(),
second.getTransaction().getLongitude()
);
long timeDiffMs = second.getTransaction().getTimestamp() - first.getTransaction().getTimestamp();
double speedKmh = (distance / (timeDiffMs / 3600000.0));
return new FraudAlert(
UUID.randomUUID().toString(),
second.getTransaction().getCardId(),
second.getTransaction().getUserId(),
second.getTransaction().getTransactionId(),
FraudAlert.FraudType.LOCATION_ANOMALY,
0.85, // High risk
String.format("Impossible travel: %.0f km in %.1f hours (%.0f km/h)",
distance, timeDiffMs / 3600000.0, speedKmh),
System.currentTimeMillis(),
String.format("{\"distance_km\": %.2f, \"speed_kmh\": %.2f}", distance, speedKmh)
);
}, TypeInformation.of(FraudAlert.class));
}
private double calculateDistance(double lat1, double lon1, double lat2, double lon2) {
// Haversine formula
final int R = 6371; // Radius of Earth in km
double latDistance = Math.toRadians(lat2 - lat1);
double lonDistance = Math.toRadians(lon2 - lon1);
double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
+ Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
* Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return R * c;
}
}
Declining Pattern Detector
package com.frauddetection.cep;
import com.frauddetection.models.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.*;
import java.util.stream.Collectors;
public class DecliningPatternDetector {
public DataStream<FraudAlert> detect(DataStream<EnrichedTransaction> stream) {
// Pattern: 3 declined transactions followed by 1 approved high-value transaction
Pattern<EnrichedTransaction, ?> pattern = Pattern
.<EnrichedTransaction>begin("declines")
.where(SimpleCondition.of(et -> et.getTransaction().getStatus().equals("DECLINED")))
.times(3)
.followedBy("approve")
.where(SimpleCondition.of(et -> et.getTransaction().getStatus().equals("APPROVED")
&& et.getTransaction().getAmount() > 500))
.within(Time.minutes(15));
PatternStream<EnrichedTransaction> patternStream = CEP.pattern(
stream.keyBy(et -> et.getTransaction().getCardId()),
pattern
);
return patternStream.select((map) -> {
List<EnrichedTransaction> declines = map.get("declines");
EnrichedTransaction approve = map.get("approve").get(0);
return new FraudAlert(
UUID.randomUUID().toString(),
approve.getTransaction().getCardId(),
approve.getTransaction().getUserId(),
approve.getTransaction().getTransactionId(),
FraudAlert.FraudType.DECLINING_PATTERN,
0.92, // Very high risk
String.format("Credential testing: %d declines followed by $%.2f approval",
declines.size(), approve.getTransaction().getAmount()),
System.currentTimeMillis(),
buildDeclineMetadata(declines, approve)
);
}, TypeInformation.of(FraudAlert.class));
}
// JSON metadata: ids of the declined attempts plus the approved transaction
private String buildDeclineMetadata(List<EnrichedTransaction> declines, EnrichedTransaction approve) {
Map<String, Object> metadata = new HashMap<>();
metadata.put("declined_transaction_ids",
declines.stream().map(et -> et.getTransaction().getTransactionId()).collect(Collectors.toList()));
metadata.put("approved_transaction_id", approve.getTransaction().getTransactionId());
metadata.put("approved_amount", approve.getTransaction().getAmount());
try {
return new ObjectMapper().writeValueAsString(metadata);
} catch (Exception e) {
return "{}";
}
}
}
Part 5: Feature Engineering & ML Scoring
Feature Extractor
package com.frauddetection.features;
import com.frauddetection.models.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.util.*;
public class FeatureExtractor extends RichFlatMapFunction<EnrichedTransaction, FeatureVector> {
private transient MapState<String, Double> features;
private transient ListState<Transaction> recentTransactions;
@Override
public void open(Configuration config) {
// Feature state
MapStateDescriptor<String, Double> featureDesc =
new MapStateDescriptor<>("features", String.class, Double.class);
features = getRuntimeContext().getMapState(featureDesc);
// Recent transactions (for velocity features)
ListStateDescriptor<Transaction> txnDesc =
new ListStateDescriptor<>("recent_txns", Transaction.class);
txnDesc.enableTimeToLive(
StateTtlConfig.newBuilder(Time.hours(24)).build()
);
recentTransactions = getRuntimeContext().getListState(txnDesc);
}
@Override
public void flatMap(EnrichedTransaction enriched, Collector<FeatureVector> out) throws Exception {
Transaction txn = enriched.getTransaction();
UserProfile profile = enriched.getUserProfile();
// Feature 1: Velocity features (last 1 hour)
List<Transaction> recent = new ArrayList<>();
long oneHourAgo = txn.getTimestamp() - 3600000;
for (Transaction t : recentTransactions.get()) {
if (t.getTimestamp() > oneHourAgo) {
recent.add(t);
}
}
recentTransactions.add(txn);
features.put("txn_count_1h", (double) recent.size());
features.put("total_amount_1h", recent.stream().mapToDouble(Transaction::getAmount).sum());
// Feature 2: Behavioral features
features.put("amount", txn.getAmount());
features.put("hour_of_day", (double) (txn.getTimestamp() % 86400000) / 3600000);
features.put("day_of_week", (double) ((txn.getTimestamp() / 86400000) % 7));
// Feature 3: Historical features from profile
features.put("avg_amount", profile.getAvgTransactionAmount());
features.put("stddev_amount", profile.getStdDevAmount());
features.put("total_txns", (double) profile.getTotalTransactions());
// Feature 4: Amount deviation
double zScore = profile.getStdDevAmount() > 0
? (txn.getAmount() - profile.getAvgTransactionAmount()) / profile.getStdDevAmount()
: 0.0;
features.put("amount_zscore", zScore);
// Feature 5: Merchant category risk (simplified)
features.put("merchant_risk", calculateMerchantRisk(txn.getMerchantCategory()));
// Feature 6: Time since last transaction
long timeSinceLastMs = txn.getTimestamp() - profile.getLastTransactionTime();
features.put("time_since_last_txn_hours", timeSinceLastMs / 3600000.0);
// Feature 7: International transaction flag
features.put("is_international", txn.getCountry().equals(profile.getHomeCountry()) ? 0.0 : 1.0);
// Feature 8: Unique merchants in last hour
long uniqueMerchants = recent.stream()
.map(Transaction::getMerchantId)
.distinct()
.count();
features.put("unique_merchants_1h", (double) uniqueMerchants);
// Create feature vector
Map<String, Double> featureMap = new HashMap<>();
for (Map.Entry<String, Double> entry : features.entries()) {
featureMap.put(entry.getKey(), entry.getValue());
}
out.collect(new FeatureVector(
txn.getCardId(),
txn.getUserId(),
txn.getTransactionId(),
txn.getTimestamp(),
featureMap
));
}
private double calculateMerchantRisk(String category) {
// Simplified risk mapping
switch (category.toLowerCase()) {
case "gambling":
case "cryptocurrency":
return 0.9;
case "travel":
case "electronics":
return 0.6;
case "groceries":
case "utilities":
return 0.1;
default:
return 0.5;
}
}
}
ML Scoring Function
package com.frauddetection.ml;
import com.frauddetection.models.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class MLScoringFunction extends RichFlatMapFunction<FeatureVector, FraudAlert> {
private transient MLModel model;
@Override
public void open(Configuration config) throws Exception {
// Load pre-trained model (e.g., from S3 or model registry)
model = MLModel.load("s3://models/fraud-detection-v1.model");
}
@Override
public void flatMap(FeatureVector features, Collector<FraudAlert> out) throws Exception {
// Score using ML model
double fraudProbability = model.predict(features.getFeatures());
// Threshold: Alert if probability > 0.7
if (fraudProbability > 0.7) {
out.collect(new FraudAlert(
UUID.randomUUID().toString(),
features.getCardId(),
features.getUserId(),
features.getTransactionId(),
FraudAlert.FraudType.BEHAVIORAL_ANOMALY,
fraudProbability,
String.format("ML-detected anomaly (score: %.2f)", fraudProbability),
System.currentTimeMillis(),
String.format("{\"model_version\": \"v1\", \"features\": %s}",
new ObjectMapper().writeValueAsString(features.getFeatures()))
));
}
}
}
// Simplified ML Model wrapper
class MLModel {
private Map<String, Double> weights;
public static MLModel load(String path) {
// In production: Load from S3, DynamoDB, or model registry
MLModel model = new MLModel();
model.weights = new HashMap<>();
// Feature weights (trained offline)
model.weights.put("amount_zscore", 0.25);
model.weights.put("txn_count_1h", 0.20);
model.weights.put("merchant_risk", 0.15);
model.weights.put("is_international", 0.15);
model.weights.put("time_since_last_txn_hours", -0.10);
model.weights.put("unique_merchants_1h", 0.10);
return model;
}
public double predict(Map<String, Double> features) {
// Simple linear model: score = sigmoid(sum(weight_i * feature_i))
double rawScore = 0.0;
for (Map.Entry<String, Double> entry : weights.entrySet()) {
rawScore += entry.getValue() * features.getOrDefault(entry.getKey(), 0.0);
}
// Sigmoid
return 1.0 / (1.0 + Math.exp(-rawScore));
}
}
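The AlertSink referenced in step 14 of the main job (sinks/AlertSink.java in the project structure) is also not shown. A minimal sketch using plain JDBC with the MySQL driver from the pom might look like the following; the table name, columns, and connection settings are assumptions. JdbcSink.sink(...) from the flink-connector-jdbc dependency would be the more idiomatic choice if you want batching and built-in retries.

package com.frauddetection.sinks;

import com.frauddetection.models.FraudAlert;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Writes each alert as one row; assumes a fraud_alerts table with matching columns.
public class AlertSink extends RichSinkFunction<FraudAlert> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Connection settings are illustrative; in production read them from job configuration.
        connection = DriverManager.getConnection(
                "jdbc:mysql://mysql:3306/fraud", "fraud_user", "fraud_password");
        statement = connection.prepareStatement(
                "INSERT INTO fraud_alerts " +
                "(alert_id, card_id, user_id, transaction_id, fraud_type, risk_score, description, detected_at, metadata) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)");
    }

    @Override
    public void invoke(FraudAlert alert, Context context) throws Exception {
        statement.setString(1, alert.getAlertId());
        statement.setString(2, alert.getCardId());
        statement.setString(3, alert.getUserId());
        statement.setString(4, alert.getTransactionId());
        statement.setString(5, alert.getFraudType().name());
        statement.setDouble(6, alert.getRiskScore());
        statement.setString(7, alert.getDescription());
        statement.setLong(8, alert.getDetectedAt());
        statement.setString(9, alert.getMetadata());
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}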
Part 6: Deployment & Monitoring
Kubernetes Deployment
# k8s/flink-fraud-detection.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-fraud-config
  namespace: fraud-detection
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 8
    parallelism.default: 32
    state.backend: rocksdb
    state.checkpoints.dir: s3://fraud-detection/checkpoints
    state.savepoints.dir: s3://fraud-detection/savepoints
    high-availability: kubernetes
    high-availability.storageDir: s3://fraud-detection/ha
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249
    taskmanager.memory.process.size: 16384m
    taskmanager.memory.managed.fraction: 0.4
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jobmanager
  namespace: fraud-detection
spec:
  serviceName: flink-jobmanager
  replicas: 2   # one leader + one standby (Kubernetes HA)
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: fraud-detection-flink:1.0
          args: ["jobmanager"]
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "4Gi"
              cpu: "4"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: fraud-detection
spec:
  replicas: 10
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: fraud-detection-flink:1.0
          args: ["taskmanager"]
          resources:
            requests:
              memory: "16Gi"
              cpu: "4"
            limits:
              memory: "16Gi"
              cpu: "8"
Prometheus Alerts
# monitoring/alerts.yaml
groups:
  - name: fraud_detection
    rules:
      - alert: HighFraudDetectionLatency
        expr: flink_taskmanager_job_task_operator_latency_p99 > 5000
        for: 10m
        annotations:
          summary: "Fraud detection latency > 5s"
      - alert: CheckpointFailure
        expr: flink_jobmanager_job_numberOfFailedCheckpoints > 3
        for: 15m
        annotations:
          summary: "Multiple checkpoint failures"
      - alert: HighBackpressure
        expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 800
        for: 20m
        annotations:
          summary: "Sustained backpressure detected"
      - alert: AlertSinkLag
        expr: flink_taskmanager_job_task_operator_numRecordsOutPerSecond{operator_name="Kafka Alert Sink"} < 100
        for: 10m
        annotations:
          summary: "Alert sink throughput dropped"
Part 7: Testing & Validation
Unit Tests
package com.frauddetection;
import com.frauddetection.cep.VelocityFraudDetector;
import com.frauddetection.models.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.*;
public class FraudDetectionTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(4)
.setNumberTaskManagers(1)
.build());
@Test
public void testVelocityFraudDetection() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Six transactions on one card within 10 minutes; amounts are large enough that
// any 5 of them exceed the $10K velocity threshold
DataStream<Transaction> testStream = env.fromElements(
new Transaction("txn1", "card1", "user1", 2100, "m1", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 1000L, "dev1", "1.1.1.1"),
new Transaction("txn2", "card1", "user1", 2200, "m2", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 2000L, "dev1", "1.1.1.1"),
new Transaction("txn3", "card1", "user1", 2300, "m3", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 3000L, "dev1", "1.1.1.1"),
new Transaction("txn4", "card1", "user1", 2400, "m4", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 4000L, "dev1", "1.1.1.1"),
new Transaction("txn5", "card1", "user1", 2500, "m5", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 5000L, "dev1", "1.1.1.1"),
new Transaction("txn6", "card1", "user1", 2600, "m6", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 6000L, "dev1", "1.1.1.1")
);
// CEP evaluates within() on event time, so the test stream needs timestamps and watermarks
DataStream<EnrichedTransaction> enriched = testStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Transaction>forMonotonousTimestamps()
.withTimestampAssigner((txn, ts) -> txn.getTimestamp()))
.map(txn -> new EnrichedTransaction(txn, new UserProfile()))
.returns(EnrichedTransaction.class);
// Apply velocity detector
DataStream<FraudAlert> alerts = new VelocityFraudDetector().detect(enriched);
// Collect results (bounded input, so the job terminates on its own)
List<FraudAlert> results = alerts.executeAndCollect(10);
// Verify: at least one velocity alert was raised
assertFalse(results.isEmpty());
assertEquals(FraudAlert.FraudType.VELOCITY_FRAUD, results.get(0).getFraudType());
}
}
Summary & Next Steps
What You’ve Built
A production-grade fraud detection system with:
✅ Real-time processing: Sub-second latency for millions of transactions
✅ Multi-strategy detection: CEP patterns + ML scoring
✅ Stateful processing: User profiles, transaction history
✅ Fault tolerance: Checkpointing, HA, exactly-once semantics
✅ Scalability: Horizontal scaling with Kubernetes
✅ Observability: Prometheus metrics, Grafana dashboards, PagerDuty alerts
Production Checklist
- Deploy to Kubernetes cluster
- Configure S3 for checkpoints/savepoints
- Setup Kafka cluster (at least 3 brokers)
- Deploy MySQL for alert storage
- Configure Prometheus scraping
- Import Grafana dashboards
- Setup PagerDuty integration
- Load test with synthetic data (1M+ txns/day; a generator sketch follows this checklist)
- Test failover (kill JobManager, verify recovery)
- Monitor checkpoint duration, backpressure, lag
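For the load-testing item above, the project structure lists sources/TransactionGenerator.java but this guide doesn't include it. A minimal standalone generator sketch (topic name, rate, and field values are assumptions) could look like:

package com.frauddetection.sources;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.frauddetection.models.Transaction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;

// Standalone load generator: writes random transactions to the "transactions" topic.
public class TransactionGenerator {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        ObjectMapper mapper = new ObjectMapper();
        Random random = new Random();

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            while (true) {
                String cardId = "card" + random.nextInt(1000);
                Transaction txn = new Transaction(
                        UUID.randomUUID().toString(), cardId, "user" + random.nextInt(1000),
                        10 + random.nextDouble() * 500, "m" + random.nextInt(50), "retail",
                        "US", "NYC", 40.7, -74.0,
                        random.nextInt(10) == 0 ? "DECLINED" : "APPROVED",
                        System.currentTimeMillis(), "dev1", "1.1.1.1");
                // Key by cardId so all transactions for a card land in the same partition
                producer.send(new ProducerRecord<>("transactions", cardId, mapper.writeValueAsBytes(txn)));
                Thread.sleep(10); // ~100 txns/sec, roughly 8.6M/day; tune for your load test
            }
        }
    }
}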
Enhancements
- Advanced ML: Train XGBoost/LightGBM models, A/B testing
- Graph Analysis: Neo4j for transaction networks
- Real-time Model Updates: Online learning, model versioning
- Multi-Region Deployment: Active-active for global coverage
- Cost Optimization: Spot instances, auto-scaling
Congratulations! You’ve mastered Apache Flink by building a production-grade system. This project demonstrates all core Flink concepts: DataStream API, CEP, state management, checkpointing, and production deployment. You’re now ready to architect and deploy real-time streaming applications at scale!