Capstone: Production-Grade Real-Time Fraud Detection System

Project Duration: 12-15 hours
Difficulty: Advanced
Technologies: Flink, Kafka, CEP, ML, RocksDB, Kubernetes, Prometheus
Deliverable: Complete production-ready fraud detection pipeline

Project Overview

What You’ll Build

A real-time fraud detection system that processes millions of transactions per day and detects fraudulent activity using:
  1. Multiple Detection Strategies: Velocity, location, amount spikes, behavioral patterns
  2. Complex Event Processing: Pattern matching across transaction sequences
  3. Machine Learning: Real-time feature extraction and model scoring
  4. State Management: Terabyte-scale keyed state with RocksDB
  5. Production Deployment: Kubernetes, HA, monitoring, alerting

System Architecture

┌───────────────────────────────────────────────────────────────┐
│                 Fraud Detection Pipeline                      │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌─────────┐      ┌──────────────────┐      ┌────────────┐  │
│  │  Kafka  │─────►│  Flink Pipeline  │─────►│   Alerts   │  │
│  │ (Source)│      │                  │      │  (Kafka)   │  │
│  └─────────┘      │  1. Enrichment   │      └────────────┘  │
│                   │  2. Feature Eng  │              │        │
│  ┌─────────┐      │  3. CEP Patterns │      ┌────────────┐  │
│  │  MySQL  │◄────►│  4. ML Scoring   │─────►│   MySQL    │  │
│  │(Lookup) │      │  5. Alerting     │      │  (Alerts)  │  │
│  └─────────┘      └──────────────────┘      └────────────┘  │
│                             │                                │
│                   ┌─────────▼─────────┐                      │
│                   │  State (RocksDB)  │                      │
│                   │  - User Profiles  │                      │
│                   │  - Tx History     │                      │
│                   │  - ML Features    │                      │
│                   └───────────────────┘                      │
│                                                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Monitoring: Prometheus + Grafana                    │   │
│  │  - Throughput, latency, state size                   │   │
│  │  - Checkpoint metrics, backpressure                  │   │
│  │  - Alert SLAs, fraud detection rate                  │   │
│  └──────────────────────────────────────────────────────┘   │
└───────────────────────────────────────────────────────────────┘

Part 1: Project Setup

Prerequisites

# Required tools
- Docker & Docker Compose
- Kubernetes (minikube or cloud)
- kubectl
- Maven 3.8+
- Java 11+
- Python 3.9+ (for ML model serving)

Project Structure

fraud-detection-flink/
├── pom.xml
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/frauddetection/
│   │   │       ├── FraudDetectionJob.java
│   │   │       ├── models/
│   │   │       │   ├── Transaction.java
│   │   │       │   ├── FraudAlert.java
│   │   │       │   └── UserProfile.java
│   │   │       ├── sources/
│   │   │       │   └── TransactionGenerator.java
│   │   │       ├── enrichment/
│   │   │       │   └── UserProfileEnrichment.java
│   │   │       ├── features/
│   │   │       │   └── FeatureExtractor.java
│   │   │       ├── cep/
│   │   │       │   ├── VelocityFraudDetector.java
│   │   │       │   ├── LocationAnomalyDetector.java
│   │   │       │   └── DecliningPatternDetector.java
│   │   │       ├── ml/
│   │   │       │   └── MLScoringFunction.java
│   │   │       └── sinks/
│   │   │           └── AlertSink.java
│   │   └── resources/
│   │       └── flink-conf.yaml
│   └── test/
│       └── java/
│           └── com/frauddetection/
│               └── FraudDetectionTest.java
├── k8s/
│   ├── flink-cluster.yaml
│   ├── kafka.yaml
│   └── monitoring.yaml
├── docker-compose.yml
└── README.md
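
The repository's docker-compose.yml is not reproduced in this guide. Local development needs at least Kafka and MySQL; the sketch below is one minimal assumed composition (image tags, ports, and credentials are illustrative, not the original file):

# docker-compose.yml (minimal local-dev sketch; values are assumptions)
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: fraud   # dev-only credentials
      MYSQL_DATABASE: fraud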

Maven Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.frauddetection</groupId>
    <artifactId>fraud-detection-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.18.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <java.version>11</java.version>
        <kafka.version>3.4.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Flink Core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Kafka Connector (versioned separately from Flink core since 1.17) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>

        <!-- Flink CEP -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink JDBC -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.18</version>
        </dependency>

        <!-- RocksDB State Backend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- MySQL Driver (coordinates moved to com.mysql as of 8.0.31) -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.33</version>
        </dependency>

        <!-- JSON -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <scope>provided</scope>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.frauddetection.FraudDetectionJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Part 2: Data Models

Transaction Model

package com.frauddetection.models;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Transaction implements Serializable {
    private String transactionId;
    private String cardId;
    private String userId;
    private double amount;
    private String merchantId;
    private String merchantCategory;
    private String country;
    private String city;
    private double latitude;
    private double longitude;
    private String status; // APPROVED, DECLINED
    private long timestamp;
    private String deviceId;
    private String ipAddress;

    // JSON (de)serialization for the Kafka connector. DeserializationSchema and
    // SerializationSchema are interfaces, so the schema implements both rather
    // than extending them.
    public static class Schema implements org.apache.flink.api.common.serialization.DeserializationSchema<Transaction>,
            org.apache.flink.api.common.serialization.SerializationSchema<Transaction> {

        private final com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();

        @Override
        public Transaction deserialize(byte[] message) throws IOException {
            return objectMapper.readValue(message, Transaction.class);
        }

        @Override
        public byte[] serialize(Transaction element) {
            try {
                return objectMapper.writeValueAsBytes(element);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize transaction", e);
            }
        }

        @Override
        public boolean isEndOfStream(Transaction nextElement) {
            return false; // Unbounded stream: never ends
        }

        @Override
        public TypeInformation<Transaction> getProducedType() {
            return TypeInformation.of(Transaction.class);
        }
    }
}

User Profile Model

package com.frauddetection.models;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserProfile implements Serializable {
    private String userId;
    private String cardId;
    private double avgTransactionAmount;
    private double stdDevAmount;
    private int totalTransactions;
    private String homeCountry;
    private String homeCity;
    private long lastTransactionTime;
    private boolean isHighRiskUser;

    public void updateWithTransaction(Transaction txn) {
        // Update running statistics
        double oldAvg = this.avgTransactionAmount;
        this.totalTransactions++;

        // Incremental mean
        this.avgTransactionAmount = oldAvg + (txn.getAmount() - oldAvg) / this.totalTransactions;

        // Incremental standard deviation (Welford's method)
        double delta = txn.getAmount() - oldAvg;
        double delta2 = txn.getAmount() - this.avgTransactionAmount;
        this.stdDevAmount = Math.sqrt(
            (Math.pow(this.stdDevAmount, 2) * (this.totalTransactions - 1) + delta * delta2) / this.totalTransactions
        );

        this.lastTransactionTime = txn.getTimestamp();
    }
}

Fraud Alert Model

package com.frauddetection.models;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class FraudAlert implements Serializable {
    private String alertId;
    private String cardId;
    private String userId;
    private String transactionId;
    private FraudType fraudType;
    private double riskScore;
    private String description;
    private long detectedAt;
    private String metadata; // JSON with additional context

    public enum FraudType {
        VELOCITY_FRAUD,          // Too many transactions in short time
        AMOUNT_SPIKE,            // Unusually high amount
        LOCATION_ANOMALY,        // Impossible travel distance
        DECLINING_PATTERN,       // Multiple declines followed by approval
        BEHAVIORAL_ANOMALY,      // ML-detected anomaly
        MERCHANT_RISK,           // High-risk merchant category
        DEVICE_MISMATCH          // Different device for same user
    }
}
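
Two supporting models are referenced throughout the pipeline but not defined above: EnrichedTransaction (a transaction paired with its user profile) and FeatureVector (the extracted features for ML scoring). Minimal sketches in the same Lombok style follow; the exact fields are assumptions inferred from how the pipeline uses them:

package com.frauddetection.models;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EnrichedTransaction implements Serializable {
    private Transaction transaction;
    private UserProfile userProfile;

    // Convenience key accessor used by keyBy() in the job
    public String getCardId() {
        return transaction.getCardId();
    }
}

// FeatureVector.java (same package; add java.util.Map to the imports)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FeatureVector implements Serializable {
    private String cardId;
    private String userId;
    private String transactionId;
    private long timestamp;
    private Map<String, Double> features;
}

The Kafka alert sink also expects a FraudAlert.Schema nested class; it can be a copy of Transaction.Schema with the types swapped to FraudAlert.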

Part 3: Core Pipeline Implementation

Main Job (FraudDetectionJob.java)

package com.frauddetection;

import com.frauddetection.models.*;
import com.frauddetection.cep.*;
import com.frauddetection.features.*;
import com.frauddetection.enrichment.*;
import com.frauddetection.ml.*;
import com.frauddetection.sinks.*;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        // 1. Setup execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Configure checkpointing
        env.enableCheckpointing(60000); // 1 minute
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(30000);
        checkpointConfig.setCheckpointTimeout(600000); // 10 minutes
        checkpointConfig.setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        checkpointConfig.enableUnalignedCheckpoints();

        // 3. Configure state backend: embedded RocksDB with incremental
        // checkpoints for state larger than memory; checkpoint storage is
        // configured separately in Flink 1.13+
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        checkpointConfig.setCheckpointStorage("s3://my-bucket/flink/checkpoints");

        // 4. Configure restart strategy: exponential backoff from 10s up to
        // 5 min, resetting after 1 hour of stable running; the final argument
        // is a jitter factor and must be in [0, 1]
        env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
            Time.seconds(10),
            Time.minutes(5),
            2.0,
            Time.hours(1),
            0.1
        ));

        // 5. Kafka source (KafkaSource is the current connector API;
        // FlinkKafkaConsumer is deprecated)
        KafkaSource<Transaction> kafkaSource = KafkaSource.<Transaction>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("transactions")
            .setGroupId("fraud-detection")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new Transaction.Schema())
            .build();

        // 6. Create source stream with event-time watermarks
        DataStream<Transaction> transactions = env.fromSource(
            kafkaSource,
            WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp()),
            "Kafka Transaction Source"
        );

        // 7. Enrich transactions with user profiles
        DataStream<EnrichedTransaction> enrichedTransactions = transactions
            .keyBy(Transaction::getCardId)
            .flatMap(new UserProfileEnrichment())
            .name("User Profile Enrichment");

        // 8. Extract ML features
        DataStream<FeatureVector> features = enrichedTransactions
            .keyBy(EnrichedTransaction::getCardId)
            .flatMap(new FeatureExtractor())
            .name("Feature Extraction");

        // 9. CEP-based fraud detection (note: DataStream exposes no name();
        // operator names can be set inside the detectors if desired)
        DataStream<FraudAlert> velocityAlerts = new VelocityFraudDetector()
            .detect(enrichedTransactions);

        DataStream<FraudAlert> locationAlerts = new LocationAnomalyDetector()
            .detect(enrichedTransactions);

        DataStream<FraudAlert> decliningAlerts = new DecliningPatternDetector()
            .detect(enrichedTransactions);

        // 10. ML-based fraud scoring
        DataStream<FraudAlert> mlAlerts = features
            .keyBy(FeatureVector::getCardId)
            .flatMap(new MLScoringFunction())
            .name("ML Fraud Scoring");

        // 11. Combine all alerts
        DataStream<FraudAlert> allAlerts = velocityAlerts
            .union(locationAlerts)
            .union(decliningAlerts)
            .union(mlAlerts);

        // 12. Deduplicate and prioritize alerts
        SingleOutputStreamOperator<FraudAlert> deduplicatedAlerts = allAlerts
            .keyBy(FraudAlert::getCardId)
            .flatMap(new AlertDeduplicator())
            .name("Alert Deduplication");

        // 13. Sink to Kafka for downstream consumers (KafkaSink replaces the
        // deprecated FlinkKafkaProducer)
        deduplicatedAlerts
            .sinkTo(KafkaSink.<FraudAlert>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<FraudAlert>builder()
                    .setTopic("fraud-alerts")
                    .setValueSerializationSchema(new FraudAlert.Schema())
                    .build())
                .build())
            .name("Kafka Alert Sink");

        // 14. Sink to MySQL (for dashboard/analysis)
        deduplicatedAlerts
            .addSink(new AlertSink())
            .name("MySQL Alert Sink");

        // 15. Route high-risk alerts (score > 0.9) to PagerDuty
        deduplicatedAlerts
            .filter(alert -> alert.getRiskScore() > 0.9)
            .addSink(new PagerDutySink())
            .name("PagerDuty High-Risk Alerts");

        // 16. Execute job
        env.execute("Real-Time Fraud Detection");
    }
}
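
Two operators referenced in the job above, UserProfileEnrichment (step 7) and AlertDeduplicator (step 12), are not shown elsewhere in this guide. The sketches below are one plausible shape for each, not definitive implementations: the enrichment keeps a per-card UserProfile in keyed state, and the deduplicator suppresses repeat alerts of the same fraud type within an assumed 15-minute cool-down. PagerDutySink (step 15) can follow the same pattern as the MySQL sink sketch at the end of Part 5, posting to the PagerDuty Events API instead.

package com.frauddetection.enrichment;

import com.frauddetection.models.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class UserProfileEnrichment extends RichFlatMapFunction<Transaction, EnrichedTransaction> {

    private transient ValueState<UserProfile> profileState;

    @Override
    public void open(Configuration config) {
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("user_profile", UserProfile.class));
    }

    @Override
    public void flatMap(Transaction txn, Collector<EnrichedTransaction> out) throws Exception {
        UserProfile profile = profileState.value();
        if (profile == null) {
            profile = new UserProfile();
            profile.setUserId(txn.getUserId());
            profile.setCardId(txn.getCardId());
            profile.setHomeCountry(txn.getCountry());
            profile.setHomeCity(txn.getCity());
        }
        // Emit the profile as it stood before this transaction, then fold the
        // transaction into the running statistics
        out.collect(new EnrichedTransaction(txn, profile));
        profile.updateWithTransaction(txn);
        profileState.update(profile);
    }
}

// AlertDeduplicator.java (package com.frauddetection; same imports plus
// MapState and MapStateDescriptor)
public class AlertDeduplicator extends RichFlatMapFunction<FraudAlert, FraudAlert> {

    private static final long COOLDOWN_MS = 15 * 60 * 1000; // assumed cool-down

    private transient MapState<String, Long> lastAlertTime;

    @Override
    public void open(Configuration config) {
        lastAlertTime = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("last_alert_time", String.class, Long.class));
    }

    @Override
    public void flatMap(FraudAlert alert, Collector<FraudAlert> out) throws Exception {
        String type = alert.getFraudType().name();
        Long last = lastAlertTime.get(type);
        if (last == null || alert.getDetectedAt() - last > COOLDOWN_MS) {
            lastAlertTime.put(type, alert.getDetectedAt());
            out.collect(alert);
        }
    }
}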

Part 4: CEP Fraud Detectors

Velocity Fraud Detector

package com.frauddetection.cep;

import com.frauddetection.models.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

public class VelocityFraudDetector {

    public DataStream<FraudAlert> detect(DataStream<EnrichedTransaction> stream) {
        // Pattern: 5+ transactions of over $100 each within 10 minutes;
        // the select() below additionally requires a total above $10K
        Pattern<EnrichedTransaction, ?> pattern = Pattern
            .<EnrichedTransaction>begin("rapid_txns")
            .where(SimpleCondition.of(txn -> txn.getTransaction().getAmount() > 100))
            .timesOrMore(5)
            .within(Time.minutes(10));

        PatternStream<EnrichedTransaction> patternStream = CEP.pattern(
            stream.keyBy(et -> et.getTransaction().getCardId()),
            pattern
        );

        return patternStream.select((map) -> {
            List<EnrichedTransaction> txns = map.get("rapid_txns");
            double totalAmount = txns.stream()
                .mapToDouble(et -> et.getTransaction().getAmount())
                .sum();

            EnrichedTransaction first = txns.get(0);
            EnrichedTransaction last = txns.get(txns.size() - 1);

            if (txns.size() >= 5 && totalAmount > 10000) {
                double riskScore = calculateRiskScore(txns, totalAmount);

                return new FraudAlert(
                    UUID.randomUUID().toString(),
                    first.getTransaction().getCardId(),
                    first.getTransaction().getUserId(),
                    last.getTransaction().getTransactionId(),
                    FraudAlert.FraudType.VELOCITY_FRAUD,
                    riskScore,
                    String.format("Velocity fraud: %d transactions ($%.2f) in 10 min",
                        txns.size(), totalAmount),
                    System.currentTimeMillis(),
                    buildMetadata(txns)
                );
            }
            return null;
        }).filter(Objects::nonNull);
    }

    private double calculateRiskScore(List<EnrichedTransaction> txns, double totalAmount) {
        // Risk factors:
        // 1. Number of transactions (weight: 0.3)
        // 2. Total amount (weight: 0.4)
        // 3. Deviation from user average (weight: 0.3)

        double txnScore = Math.min(txns.size() / 10.0, 1.0); // Max at 10 txns
        double amountScore = Math.min(totalAmount / 20000.0, 1.0); // Max at $20K

        EnrichedTransaction first = txns.get(0);
        double avgAmount = first.getUserProfile().getAvgTransactionAmount();
        double deviationScore = avgAmount > 0 ? Math.min(totalAmount / (avgAmount * 5), 1.0) : 0.5;

        return 0.3 * txnScore + 0.4 * amountScore + 0.3 * deviationScore;
    }

    private String buildMetadata(List<EnrichedTransaction> txns) {
        // Build JSON metadata
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("transaction_count", txns.size());
        metadata.put("total_amount", txns.stream().mapToDouble(et -> et.getTransaction().getAmount()).sum());
        metadata.put("transaction_ids", txns.stream().map(et -> et.getTransaction().getTransactionId()).collect(Collectors.toList()));

        try {
            return new ObjectMapper().writeValueAsString(metadata);
        } catch (Exception e) {
            return "{}";
        }
    }
}

Location Anomaly Detector

package com.frauddetection.cep;

import com.frauddetection.models.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.UUID;

public class LocationAnomalyDetector {

    public DataStream<FraudAlert> detect(DataStream<EnrichedTransaction> stream) {
        // Pattern: Two transactions from different locations with impossible travel speed
        Pattern<EnrichedTransaction, ?> pattern = Pattern
            .<EnrichedTransaction>begin("first")
            .where(SimpleCondition.of(et -> true))
            .next("second")
            .where(new IterativeCondition<EnrichedTransaction>() {
                @Override
                public boolean filter(EnrichedTransaction current, Context<EnrichedTransaction> ctx) throws Exception {
                    EnrichedTransaction first = ctx.getEventsForPattern("first").iterator().next();

                    // Calculate distance and time
                    double distance = calculateDistance(
                        first.getTransaction().getLatitude(),
                        first.getTransaction().getLongitude(),
                        current.getTransaction().getLatitude(),
                        current.getTransaction().getLongitude()
                    );

                    long timeDiffMs = current.getTransaction().getTimestamp() - first.getTransaction().getTimestamp();
                    if (timeDiffMs <= 0) {
                        return false; // Guard against division by zero for same-millisecond events
                    }
                    double timeDiffHours = timeDiffMs / 3600000.0;

                    // Calculate speed (km/h)
                    double speedKmh = distance / timeDiffHours;

                    // Flag speeds above 800 km/h within the 6-hour window
                    // (faster than a commercial flight plus ground time)
                    return speedKmh > 800 && timeDiffHours < 6;
                }
            })
            .within(Time.hours(6));

        PatternStream<EnrichedTransaction> patternStream = CEP.pattern(
            stream.keyBy(et -> et.getTransaction().getCardId()),
            pattern
        );

        return patternStream.select((map) -> {
            EnrichedTransaction first = map.get("first").get(0);
            EnrichedTransaction second = map.get("second").get(0);

            double distance = calculateDistance(
                first.getTransaction().getLatitude(),
                first.getTransaction().getLongitude(),
                second.getTransaction().getLatitude(),
                second.getTransaction().getLongitude()
            );

            long timeDiffMs = second.getTransaction().getTimestamp() - first.getTransaction().getTimestamp();
            double speedKmh = (distance / (timeDiffMs / 3600000.0));

            return new FraudAlert(
                UUID.randomUUID().toString(),
                second.getTransaction().getCardId(),
                second.getTransaction().getUserId(),
                second.getTransaction().getTransactionId(),
                FraudAlert.FraudType.LOCATION_ANOMALY,
                0.85, // High risk
                String.format("Impossible travel: %.0f km in %.1f hours (%.0f km/h)",
                    distance, timeDiffMs / 3600000.0, speedKmh),
                System.currentTimeMillis(),
                String.format("{\"distance_km\": %.2f, \"speed_kmh\": %.2f}", distance, speedKmh)
            );
        });
    }

    private double calculateDistance(double lat1, double lon1, double lat2, double lon2) {
        // Haversine formula
        final int R = 6371; // Radius of Earth in km

        double latDistance = Math.toRadians(lat2 - lat1);
        double lonDistance = Math.toRadians(lon2 - lon1);

        double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);

        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));

        return R * c;
    }
}

Declining Pattern Detector

package com.frauddetection.cep;

import com.frauddetection.models.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

public class DecliningPatternDetector {

    public DataStream<FraudAlert> detect(DataStream<EnrichedTransaction> stream) {
        // Pattern: 3 declined transactions followed by 1 approved high-value transaction
        Pattern<EnrichedTransaction, ?> pattern = Pattern
            .<EnrichedTransaction>begin("declines")
            .where(SimpleCondition.of(et -> et.getTransaction().getStatus().equals("DECLINED")))
            .times(3)
            .followedBy("approve")
            .where(SimpleCondition.of(et -> et.getTransaction().getStatus().equals("APPROVED")
                && et.getTransaction().getAmount() > 500))
            .within(Time.minutes(15));

        PatternStream<EnrichedTransaction> patternStream = CEP.pattern(
            stream.keyBy(et -> et.getTransaction().getCardId()),
            pattern
        );

        return patternStream.select((map) -> {
            List<EnrichedTransaction> declines = map.get("declines");
            EnrichedTransaction approve = map.get("approve").get(0);

            return new FraudAlert(
                UUID.randomUUID().toString(),
                approve.getTransaction().getCardId(),
                approve.getTransaction().getUserId(),
                approve.getTransaction().getTransactionId(),
                FraudAlert.FraudType.DECLINING_PATTERN,
                0.92, // Very high risk
                String.format("Credential testing: %d declines followed by $%.2f approval",
                    declines.size(), approve.getTransaction().getAmount()),
                System.currentTimeMillis(),
                buildDeclineMetadata(declines, approve)
            );
        });
    }

    private String buildDeclineMetadata(List<EnrichedTransaction> declines, EnrichedTransaction approve) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("declined_transaction_ids", declines.stream()
            .map(et -> et.getTransaction().getTransactionId())
            .collect(Collectors.toList()));
        metadata.put("approved_transaction_id", approve.getTransaction().getTransactionId());
        metadata.put("approved_amount", approve.getTransaction().getAmount());
        try {
            return new ObjectMapper().writeValueAsString(metadata);
        } catch (Exception e) {
            return "{}";
        }
    }
}

Part 5: Feature Engineering & ML Scoring

Feature Extractor

package com.frauddetection.features;

import com.frauddetection.models.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FeatureExtractor extends RichFlatMapFunction<EnrichedTransaction, FeatureVector> {

    private transient MapState<String, Double> features;
    private transient ListState<Transaction> recentTransactions;

    @Override
    public void open(Configuration config) {
        // Feature state
        MapStateDescriptor<String, Double> featureDesc =
            new MapStateDescriptor<>("features", String.class, Double.class);
        features = getRuntimeContext().getMapState(featureDesc);

        // Recent transactions (for velocity features)
        ListStateDescriptor<Transaction> txnDesc =
            new ListStateDescriptor<>("recent_txns", Transaction.class);
        txnDesc.enableTimeToLive(
            StateTtlConfig.newBuilder(Time.hours(24)).build()
        );
        recentTransactions = getRuntimeContext().getListState(txnDesc);
    }

    @Override
    public void flatMap(EnrichedTransaction enriched, Collector<FeatureVector> out) throws Exception {
        Transaction txn = enriched.getTransaction();
        UserProfile profile = enriched.getUserProfile();

        // Feature 1: Velocity features (last 1 hour)
        List<Transaction> recent = new ArrayList<>();
        long oneHourAgo = txn.getTimestamp() - 3600000;
        for (Transaction t : recentTransactions.get()) {
            if (t.getTimestamp() > oneHourAgo) {
                recent.add(t);
            }
        }
        recentTransactions.add(txn);

        features.put("txn_count_1h", (double) recent.size());
        features.put("total_amount_1h", recent.stream().mapToDouble(Transaction::getAmount).sum());

        // Feature 2: Behavioral features
        features.put("amount", txn.getAmount());
        features.put("hour_of_day", (double) (txn.getTimestamp() % 86400000) / 3600000);
        features.put("day_of_week", (double) ((txn.getTimestamp() / 86400000) % 7));

        // Feature 3: Historical features from profile
        features.put("avg_amount", profile.getAvgTransactionAmount());
        features.put("stddev_amount", profile.getStdDevAmount());
        features.put("total_txns", (double) profile.getTotalTransactions());

        // Feature 4: Amount deviation
        double zScore = profile.getStdDevAmount() > 0
            ? (txn.getAmount() - profile.getAvgTransactionAmount()) / profile.getStdDevAmount()
            : 0.0;
        features.put("amount_zscore", zScore);

        // Feature 5: Merchant category risk (simplified)
        features.put("merchant_risk", calculateMerchantRisk(txn.getMerchantCategory()));

        // Feature 6: Time since last transaction
        long timeSinceLastMs = txn.getTimestamp() - profile.getLastTransactionTime();
        features.put("time_since_last_txn_hours", timeSinceLastMs / 3600000.0);

        // Feature 7: International transaction flag
        features.put("is_international", txn.getCountry().equals(profile.getHomeCountry()) ? 0.0 : 1.0);

        // Feature 8: Unique merchants in last hour
        long uniqueMerchants = recent.stream()
            .map(Transaction::getMerchantId)
            .distinct()
            .count();
        features.put("unique_merchants_1h", (double) uniqueMerchants);

        // Create feature vector
        Map<String, Double> featureMap = new HashMap<>();
        for (Map.Entry<String, Double> entry : features.entries()) {
            featureMap.put(entry.getKey(), entry.getValue());
        }

        out.collect(new FeatureVector(
            txn.getCardId(),
            txn.getUserId(),
            txn.getTransactionId(),
            txn.getTimestamp(),
            featureMap
        ));
    }

    private double calculateMerchantRisk(String category) {
        // Simplified risk mapping
        switch (category.toLowerCase()) {
            case "gambling":
            case "cryptocurrency":
                return 0.9;
            case "travel":
            case "electronics":
                return 0.6;
            case "groceries":
            case "utilities":
                return 0.1;
            default:
                return 0.5;
        }
    }
}

ML Scoring Function

package com.frauddetection.ml;

import com.frauddetection.models.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class MLScoringFunction extends RichFlatMapFunction<FeatureVector, FraudAlert> {

    private transient MLModel model;

    @Override
    public void open(Configuration config) throws Exception {
        // Load pre-trained model (e.g., from S3 or model registry)
        model = MLModel.load("s3://models/fraud-detection-v1.model");
    }

    @Override
    public void flatMap(FeatureVector features, Collector<FraudAlert> out) throws Exception {
        // Score using ML model
        double fraudProbability = model.predict(features.getFeatures());

        // Threshold: Alert if probability > 0.7
        if (fraudProbability > 0.7) {
            out.collect(new FraudAlert(
                UUID.randomUUID().toString(),
                features.getCardId(),
                features.getUserId(),
                features.getTransactionId(),
                FraudAlert.FraudType.BEHAVIORAL_ANOMALY,
                fraudProbability,
                String.format("ML-detected anomaly (score: %.2f)", fraudProbability),
                System.currentTimeMillis(),
                String.format("{\"model_version\": \"v1\", \"features\": %s}",
                    new ObjectMapper().writeValueAsString(features.getFeatures()))
            ));
        }
    }
}

// Simplified ML Model wrapper
class MLModel {
    private Map<String, Double> weights;

    public static MLModel load(String path) {
        // In production: load serialized weights from `path` (S3, DynamoDB,
        // or a model registry). This stub ignores the path and hard-codes
        // offline-trained weights.
        MLModel model = new MLModel();
        model.weights = new HashMap<>();
        // Feature weights (trained offline)
        model.weights.put("amount_zscore", 0.25);
        model.weights.put("txn_count_1h", 0.20);
        model.weights.put("merchant_risk", 0.15);
        model.weights.put("is_international", 0.15);
        model.weights.put("time_since_last_txn_hours", -0.10);
        model.weights.put("unique_merchants_1h", 0.10);
        return model;
    }

    public double predict(Map<String, Double> features) {
        // Simple linear model: score = sigmoid(sum(weight_i * feature_i));
        // missing features default to 0
        double rawScore = 0.0;
        for (Map.Entry<String, Double> entry : weights.entrySet()) {
            rawScore += entry.getValue() * features.getOrDefault(entry.getKey(), 0.0);
        }
        // Sigmoid
        return 1.0 / (1.0 + Math.exp(-rawScore));
    }
}
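
The AlertSink used in step 14 of the job is likewise left to you. Below is a minimal sketch as a RichSinkFunction over plain JDBC; the table name, host, and credentials are assumptions. For production, the batching JdbcSink from the flink-connector-jdbc dependency already in the pom is the better fit.

package com.frauddetection.sinks;

import com.frauddetection.models.FraudAlert;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class AlertSink extends RichSinkFunction<FraudAlert> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration config) throws Exception {
        // Assumed host, database, and credentials; adjust for your setup
        connection = DriverManager.getConnection(
            "jdbc:mysql://mysql:3306/fraud", "fraud", "fraud");
        statement = connection.prepareStatement(
            "INSERT INTO fraud_alerts (alert_id, card_id, user_id, transaction_id, " +
            "fraud_type, risk_score, description, detected_at, metadata) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)");
    }

    @Override
    public void invoke(FraudAlert alert, Context context) throws Exception {
        statement.setString(1, alert.getAlertId());
        statement.setString(2, alert.getCardId());
        statement.setString(3, alert.getUserId());
        statement.setString(4, alert.getTransactionId());
        statement.setString(5, alert.getFraudType().name());
        statement.setDouble(6, alert.getRiskScore());
        statement.setString(7, alert.getDescription());
        statement.setLong(8, alert.getDetectedAt());
        statement.setString(9, alert.getMetadata());
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}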

Part 6: Deployment & Monitoring

Kubernetes Deployment

# k8s/flink-fraud-detection.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-fraud-config
  namespace: fraud-detection
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 8
    parallelism.default: 32
    state.backend: rocksdb
    state.checkpoints.dir: s3://fraud-detection/checkpoints
    state.savepoints.dir: s3://fraud-detection/savepoints
    high-availability: kubernetes
    high-availability.storageDir: s3://fraud-detection/ha
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249
    taskmanager.memory.process.size: 16384m
    taskmanager.memory.managed.fraction: 0.4
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jobmanager
  namespace: fraud-detection
spec:
  serviceName: flink-jobmanager
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: fraud-detection-flink:1.0
        args: ["jobmanager"]
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "4Gi"
            cpu: "4"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: fraud-detection
spec:
  replicas: 10
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: fraud-detection-flink:1.0
        args: ["taskmanager"]
        resources:
          requests:
            memory: "16Gi"
            cpu: "4"
          limits:
            memory: "16Gi"
            cpu: "8"

Prometheus Alerts

# monitoring/alerts.yaml
groups:
- name: fraud_detection
  rules:
  - alert: HighFraudDetectionLatency
    expr: flink_taskmanager_job_task_operator_latency_p99 > 5000
    for: 10m
    annotations:
      summary: "Fraud detection latency > 5s"

  - alert: CheckpointFailure
    expr: flink_jobmanager_job_numberOfFailedCheckpoints > 3
    for: 15m
    annotations:
      summary: "Multiple checkpoint failures"

  - alert: HighBackpressure
    expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 800
    for: 20m
    annotations:
      summary: "Sustained backpressure detected"

  - alert: AlertSinkLag
    expr: flink_taskmanager_job_task_operator_numRecordsOutPerSecond{operator_name="Kafka Alert Sink"} < 100
    for: 10m
    annotations:
      summary: "Alert sink throughput dropped"

Part 7: Testing & Validation

Unit Tests

package com.frauddetection;

import com.frauddetection.cep.VelocityFraudDetector;
import com.frauddetection.models.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Duration;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class FraudDetectionTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(4)
                .setNumberTaskManagers(1)
                .build());

    @Test
    public void testVelocityFraudDetection() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Create test transactions: six transactions of over $100 each within
        // 10 minutes, totaling $15K, enough to satisfy the velocity pattern
        // (5+ transactions and a total above $10K). Event-time watermarks are
        // required for the CEP window to fire.
        DataStream<Transaction> testStream = env.fromElements(
            new Transaction("txn1", "card1", "user1", 2000, "m1", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 1000L, "dev1", "1.1.1.1"),
            new Transaction("txn2", "card1", "user1", 2200, "m2", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 2000L, "dev1", "1.1.1.1"),
            new Transaction("txn3", "card1", "user1", 2400, "m3", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 3000L, "dev1", "1.1.1.1"),
            new Transaction("txn4", "card1", "user1", 2600, "m4", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 4000L, "dev1", "1.1.1.1"),
            new Transaction("txn5", "card1", "user1", 2800, "m5", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 5000L, "dev1", "1.1.1.1"),
            new Transaction("txn6", "card1", "user1", 3000, "m6", "retail", "US", "NYC", 40.7, -74.0, "APPROVED", 6000L, "dev1", "1.1.1.1") // Should trigger
        ).assignTimestampsAndWatermarks(
            WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((txn, ts) -> txn.getTimestamp())
        );

        // Apply velocity detector
        DataStream<FraudAlert> alerts = new VelocityFraudDetector()
            .detect(testStream.map(txn -> new EnrichedTransaction(txn, new UserProfile())));

        // Collect results
        List<FraudAlert> results = alerts.executeAndCollect(10);

        // Verify: with timesOrMore(5), CEP can emit one alert per matching
        // combination, so assert at least one velocity alert
        assertTrue(results.size() >= 1);
        assertEquals(FraudAlert.FraudType.VELOCITY_FRAUD, results.get(0).getFraudType());
    }
}

Summary & Next Steps

What You’ve Built

A production-grade fraud detection system with:

✅ Real-time processing: Sub-second latency for millions of transactions
✅ Multi-strategy detection: CEP patterns + ML scoring
✅ Stateful processing: User profiles, transaction history
✅ Fault tolerance: Checkpointing, HA, exactly-once semantics
✅ Scalability: Horizontal scaling with Kubernetes
✅ Observability: Prometheus metrics, Grafana dashboards, PagerDuty alerts

Production Checklist

  • Deploy to Kubernetes cluster
  • Configure S3 for checkpoints/savepoints
  • Setup Kafka cluster (at least 3 brokers)
  • Deploy MySQL for alert storage
  • Configure Prometheus scraping
  • Import Grafana dashboards
  • Setup PagerDuty integration
  • Load test with synthetic data (1M+ txns/day); a generator sketch follows this list
  • Test failover (kill JobManager, verify recovery)
  • Monitor checkpoint duration, backpressure, lag

Enhancements

  1. Advanced ML: Train XGBoost/LightGBM models, A/B testing
  2. Graph Analysis: Neo4j for transaction networks
  3. Real-time Model Updates: Online learning, model versioning
  4. Multi-Region Deployment: Active-active for global coverage
  5. Cost Optimization: Spot instances, auto-scaling

Congratulations! You’ve mastered Apache Flink by building a production-grade system. This project demonstrates all core Flink concepts: DataStream API, CEP, state management, checkpointing, and production deployment. You’re now ready to architect and deploy real-time streaming applications at scale!