
Capstone Project: Real-Time Log Analytics Platform

Project Duration: 4-5 hours
Complexity: Production-grade implementation
Skills Tested: All modules (1-7)

Project Overview

Build a complete log analytics platform that:
  1. Ingests web server logs in real-time
  2. Stores data reliably in HDFS
  3. Processes logs using MapReduce and Hive
  4. Stores aggregated results in HBase for fast queries
  5. Orchestrates the entire pipeline with Oozie
  6. Monitors system health and job performance

Business Requirements

Scenario: You’re building analytics for a high-traffic e-commerce website.

Requirements:
  • Process 100GB of access logs daily
  • Identify top products, traffic sources, and error rates
  • Sessionize user journeys
  • Detect anomalies (unusual traffic spikes, error patterns)
  • Generate hourly reports
  • Provide sub-second query response for dashboards

Architecture

┌──────────────────────────────────────────────────────────┐
│                    DATA SOURCES                          │
│    Web Servers (Apache/Nginx) generating access.log     │
└────────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│              INGESTION LAYER (Flume)                     │
│  • Tail log files from web servers                      │
│  • Buffer in memory channel                             │
│  • Write to HDFS in batches                             │
└────────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│           STORAGE LAYER (HDFS)                           │
│  /raw/logs/YYYY/MM/DD/HH/access_YYYYMMDD_HH.log         │
│  Partitioned by date and hour                           │
└────────────────────┬─────────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│        PROCESSING LAYER (MapReduce + Hive)               │
│                                                          │
│  ┌────────────────┐  ┌──────────────┐  ┌─────────────┐ │
│  │  Parse & Clean │→ │ Sessionize   │→ │  Aggregate  │ │
│  │  (MapReduce)   │  │ (MapReduce)  │  │   (Hive)    │ │
│  └────────────────┘  └──────────────┘  └─────────────┘ │
└────────────────────┬─────────────────────────────────────┘

          ┌──────────┴──────────┐
          │                     │
          ▼                     ▼
┌──────────────────┐  ┌──────────────────┐
│  HDFS (Processed)│  │   HBase (Fast    │
│  /processed/*    │  │   Lookup)        │
│  (Parquet)       │  │                  │
└──────────────────┘  └──────────────────┘
          │                     │
          └──────────┬──────────┘


┌──────────────────────────────────────────────────────────┐
│         ORCHESTRATION (Oozie)                            │
│  Hourly workflow: Ingest → Process → Aggregate → Store  │
└──────────────────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────┐
│           VISUALIZATION & MONITORING                     │
│  • Grafana dashboards (via Prometheus)                  │
│  • Custom web UI (queries HBase)                        │
└──────────────────────────────────────────────────────────┘

Implementation Steps

Step 1: Data Ingestion with Flume

Create Flume Configuration (web-logs.conf):
# Define agent
agent.sources = weblog_source
agent.channels = memory_channel
agent.sinks = hdfs_sink

# Source: Tail access log
agent.sources.weblog_source.type = exec
agent.sources.weblog_source.command = tail -F /var/log/apache2/access.log
agent.sources.weblog_source.channels = memory_channel
agent.sources.weblog_source.batchSize = 1000

# Interceptor: Add timestamp
agent.sources.weblog_source.interceptors = timestamp
agent.sources.weblog_source.interceptors.timestamp.type = timestamp

# Channel: Memory buffer
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 100000
agent.channels.memory_channel.transactionCapacity = 10000

# Sink: HDFS with time-based partitioning
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = memory_channel
agent.sinks.hdfs_sink.hdfs.path = /raw/logs/%Y/%m/%d/%H
agent.sinks.hdfs_sink.hdfs.filePrefix = access
agent.sinks.hdfs_sink.hdfs.fileSuffix = .log
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.rollSize = 134217728
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
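The memory channel above trades durability for speed: events buffered in RAM are lost if the agent crashes. If the reliability requirement outweighs throughput, a file channel can be swapped in; a minimal sketch (the directories are placeholders for your environment):
# Optional durable alternative: point the source and sink at file_channel instead of memory_channel
agent.channels.file_channel.type = file
agent.channels.file_channel.checkpointDir = /var/flume/checkpoint
agent.channels.file_channel.dataDirs = /var/flume/data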
Start Flume Agent:
flume-ng agent \
  --name agent \
  --conf-file web-logs.conf \
  -Dflume.root.logger=INFO,console
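Verify ingestion (assumes the agent has run long enough to roll a file, and that local time matches the sink's useLocalTimeStamp partitioning):
# List the current hour's partition and peek at a completed file
hdfs dfs -ls /raw/logs/$(date +%Y/%m/%d/%H)
hdfs dfs -cat /raw/logs/$(date +%Y/%m/%d/%H)/access*.log | head -5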

Step 2: Parse Logs with MapReduce
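Each input record is one line in the Apache Combined Log Format; an illustrative (made-up) example:
192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] "GET /products/widget HTTP/1.1" 200 1234 "https://www.google.com/" "Mozilla/5.0"
The mapper below extracts the client IP, timestamp, request line, status code, bytes sent, referer, and user agent from this structure.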

LogParser.java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogParser {

    public static class ParseMapper
            extends Mapper<Object, Text, NullWritable, Text> {

        // Apache Combined Log Format regex
        private static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] " +
            "\"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+) " +
            "\"([^\"]*)\" \"([^\"]*)\"");

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            Matcher matcher = LOG_PATTERN.matcher(line);

            if (matcher.matches()) {
                String ip = matcher.group(1);
                String timestamp = matcher.group(2);
                String method = matcher.group(3);
                String url = matcher.group(4);
                String protocol = matcher.group(5);
                int statusCode = Integer.parseInt(matcher.group(6));
                // The size field is "-" when no response body was sent
                long bytes = "-".equals(matcher.group(7))
                        ? 0L : Long.parseLong(matcher.group(7));
                String referer = matcher.group(8);
                String userAgent = matcher.group(9);

                // Output as CSV for easier Hive processing
                String output = String.format(
                    "%s,%s,%s,%s,%s,%d,%d,%s,%s",
                    ip, timestamp, method, url, protocol,
                    statusCode, bytes, referer, userAgent
                );

                context.write(NullWritable.get(), new Text(output));

                // Track errors
                if (statusCode >= 400) {
                    context.getCounter("LogStats", "Errors").increment(1);
                }

                // Track requests
                context.getCounter("LogStats", "TotalRequests").increment(1);

            } else {
                context.getCounter("LogStats", "MalformedLines").increment(1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Log Parser");

        job.setJarByClass(LogParser.class);
        job.setMapperClass(ParseMapper.class);
        job.setNumReduceTasks(0); // Map-only job

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Run:
hadoop jar log-parser.jar LogParser \
  /raw/logs/2024/01/15 \
  /processed/parsed/2024/01/15
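The counters incremented in the mapper can be read back by the driver once the job completes; a sketch of how the tail of main() could surface them (this would replace the final System.exit line and assumes an org.apache.hadoop.mapreduce.Counters import):
// Sketch: log the data-quality counters before exiting
boolean ok = job.waitForCompletion(true);
Counters counters = job.getCounters();
long total = counters.findCounter("LogStats", "TotalRequests").getValue();
long errors = counters.findCounter("LogStats", "Errors").getValue();
long malformed = counters.findCounter("LogStats", "MalformedLines").getValue();
System.out.println("requests=" + total + ", errors=" + errors + ", malformed=" + malformed);
System.exit(ok ? 0 : 1);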

Step 3: Create Hive Tables

Create External Table on Parsed Logs:
CREATE EXTERNAL TABLE access_logs (
    ip STRING,
    log_timestamp STRING,
    method STRING,
    url STRING,
    protocol STRING,
    status_code INT,
    bytes_sent BIGINT,
    referer STRING,
    user_agent STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/processed/parsed'
TBLPROPERTIES ('skip.header.line.count'='0');

-- Create partitioned table for better performance
CREATE TABLE access_logs_partitioned (
    ip STRING,
    log_timestamp STRING,
    method STRING,
    url STRING,
    protocol STRING,
    status_code INT,
    bytes_sent BIGINT,
    referer STRING,
    user_agent STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- Load data into partitioned table
INSERT INTO access_logs_partitioned
PARTITION (year=2024, month=1, day=15)
SELECT * FROM access_logs
WHERE log_timestamp LIKE '15/Jan/2024%';
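The static-partition INSERT above matches the hourly Oozie run. For backfilling several days in one pass, Hive's dynamic partitioning is an option; a sketch that derives the partition columns from the log timestamp (the format pattern is an assumption based on the Combined Log Format shown earlier):
-- Dynamic-partition backfill (partition columns must come last in the SELECT)
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

INSERT INTO access_logs_partitioned
PARTITION (year, month, day)
SELECT
    ip, log_timestamp, method, url, protocol, status_code, bytes_sent, referer, user_agent,
    year(from_unixtime(unix_timestamp(log_timestamp, 'dd/MMM/yyyy:HH:mm:ss Z')))  AS year,
    month(from_unixtime(unix_timestamp(log_timestamp, 'dd/MMM/yyyy:HH:mm:ss Z'))) AS month,
    day(from_unixtime(unix_timestamp(log_timestamp, 'dd/MMM/yyyy:HH:mm:ss Z')))   AS day
FROM access_logs;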
Analytical Queries:
-- Top 10 URLs by traffic
SELECT url, COUNT(*) as hits, SUM(bytes_sent) as total_bytes
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY url
ORDER BY hits DESC
LIMIT 10;

-- Error rate by hour
SELECT
    substr(log_timestamp, 13, 2) as hour,
    COUNT(*) as total_requests,
    SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors,
    (SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) as error_rate
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY substr(log_timestamp, 13, 2)
ORDER BY hour;

-- Traffic sources (referers)
SELECT
    CASE
        WHEN referer LIKE '%google%' THEN 'Google'
        WHEN referer LIKE '%facebook%' THEN 'Facebook'
        WHEN referer LIKE '%twitter%' THEN 'Twitter'
        WHEN referer = '-' THEN 'Direct'
        ELSE 'Other'
    END as traffic_source,
    COUNT(*) as visits
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY
    CASE
        WHEN referer LIKE '%google%' THEN 'Google'
        WHEN referer LIKE '%facebook%' THEN 'Facebook'
        WHEN referer LIKE '%twitter%' THEN 'Twitter'
        WHEN referer = '-' THEN 'Direct'
        ELSE 'Other'
    END
ORDER BY visits DESC;
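The anomaly-detection requirement can be approximated in Hive by flagging hours whose traffic is far above the daily norm; a sketch (the 2x threshold is an arbitrary illustration):
-- Hours with traffic more than twice the daily hourly average
WITH hourly AS (
    SELECT substr(log_timestamp, 13, 2) AS hour, COUNT(*) AS hits
    FROM access_logs_partitioned
    WHERE year=2024 AND month=1 AND day=15
    GROUP BY substr(log_timestamp, 13, 2)
),
daily AS (
    SELECT AVG(hits) AS avg_hits FROM hourly
)
SELECT h.hour, h.hits, d.avg_hits
FROM hourly h CROSS JOIN daily d
WHERE h.hits > 2 * d.avg_hits
ORDER BY h.hour;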

Step 4: Sessionization with MapReduce

SessionBuilder.java:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SessionBuilder {

    public static class SessionMapper
            extends Mapper<Object, Text, Text, Event> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");
            String ip = fields[0];
            String timestamp = fields[1];
            String url = fields[3];

            // Use IP as session key (in production, use cookie/user ID)
            Event event = new Event(parseTimestamp(timestamp), url);
            context.write(new Text(ip), event);
        }

        private long parseTimestamp(String timestamp) {
            // Parse "15/Jan/2024:10:30:45 +0000" into epoch milliseconds
            try {
                return new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
                        .parse(timestamp).getTime();
            } catch (ParseException e) {
                return 0L;   // treat unparseable timestamps as epoch start
            }
        }
    }

    public static class SessionReducer
            extends Reducer<Text, Event, Text, Session> {

        private static final long SESSION_TIMEOUT = 30 * 60 * 1000; // 30 min

        @Override
        public void reduce(Text key, Iterable<Event> values, Context context)
                throws IOException, InterruptedException {

            List<Event> events = new ArrayList<>();
            for (Event e : values) {
                events.add(new Event(e)); // Deep copy
            }

            // Sort by timestamp
            Collections.sort(events, (a, b) ->
                Long.compare(a.getTimestamp(), b.getTimestamp()));

            // Build sessions
            List<Session> sessions = new ArrayList<>();
            Session currentSession = null;

            for (Event event : events) {
                if (currentSession == null ||
                    event.getTimestamp() - currentSession.getLastEventTime()
                        > SESSION_TIMEOUT) {

                    // Start new session
                    if (currentSession != null) {
                        sessions.add(currentSession);
                    }
                    currentSession = new Session(event.getTimestamp());
                }

                currentSession.addEvent(event);
            }

            if (currentSession != null) {
                sessions.add(currentSession);
            }

            // Emit sessions
            for (int i = 0; i < sessions.size(); i++) {
                String sessionKey = key.toString() + "_" + i;
                context.write(new Text(sessionKey), sessions.get(i));
            }
        }
    }

    // Custom Writables for Event and Session
    public static class Event implements Writable, Comparable<Event> {
        private long timestamp;
        private String url;

        // Implement Writable and Comparable methods...
    }

    public static class Session implements Writable {
        private long startTime;
        private long endTime;
        private List<String> urls;

        // Methods to calculate:
        // - Session duration
        // - Page views
        // - Bounce rate (single-page sessions)
    }
}
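A minimal sketch of the Event Writable left as a stub above (one possible implementation; the Session Writable follows the same pattern):
// Sketch of the Event Writable; relies on the java.io.DataInput/DataOutput imports above.
// write() and readFields() must serialize the fields in the same order.
public static class Event implements Writable, Comparable<Event> {
    private long timestamp;
    private String url;

    public Event() { }                               // no-arg constructor required by Hadoop
    public Event(long timestamp, String url) { this.timestamp = timestamp; this.url = url; }
    public Event(Event other) { this(other.timestamp, other.url); }   // deep copy used in the reducer

    public long getTimestamp() { return timestamp; }
    public String getUrl()     { return url; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        url = in.readUTF();
    }

    @Override
    public int compareTo(Event other) {
        return Long.compare(timestamp, other.timestamp);
    }
}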

Step 5: Store Aggregates in HBase

Create HBase Table:
hbase shell

create 'url_stats', 'metrics', 'metadata'
Populate from Hive:
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HiveToHBase {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        Table table = connection.getTable(TableName.valueOf("url_stats"));

        // Read aggregated data from Hive output
        // For each URL statistic:
        String url = "/products/widget";
        long hits = 1523;
        long totalBytes = 45678901;

        Put put = new Put(Bytes.toBytes(url));
        put.addColumn(Bytes.toBytes("metrics"),
                     Bytes.toBytes("hits"),
                     Bytes.toBytes(hits));
        put.addColumn(Bytes.toBytes("metrics"),
                     Bytes.toBytes("total_bytes"),
                     Bytes.toBytes(totalBytes));
        put.addColumn(Bytes.toBytes("metadata"),
                     Bytes.toBytes("last_updated"),
                     Bytes.toBytes(System.currentTimeMillis()));

        table.put(put);

        table.close();
        connection.close();
    }
}
Query HBase (fast lookups for dashboard):
public class URLStatsQuery {

    public static URLStats getStats(String url) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        Table table = connection.getTable(TableName.valueOf("url_stats"));

        Get get = new Get(Bytes.toBytes(url));
        Result result = table.get(get);

        long hits = Bytes.toLong(result.getValue(
            Bytes.toBytes("metrics"), Bytes.toBytes("hits")));
        long totalBytes = Bytes.toLong(result.getValue(
            Bytes.toBytes("metrics"), Bytes.toBytes("total_bytes")));

        table.close();
        connection.close();

        return new URLStats(url, hits, totalBytes);
    }
}
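A quick spot check from the HBase shell confirms the load (row key taken from the example above):
get 'url_stats', '/products/widget'
scan 'url_stats', {LIMIT => 5}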

Step 6: Orchestrate with Oozie

workflow.xml:
<workflow-app name="log-analytics-pipeline" xmlns="uri:oozie:workflow:0.5">
    <start to="check-data"/>

    <!-- Check if data exists -->
    <decision name="check-data">
        <switch>
            <case to="parse-logs">
                ${fs:exists(concat(concat(concat(concat(concat(concat(concat('/raw/logs/', YEAR), '/'), MONTH), '/'), DAY), '/'), HOUR))}
            </case>
            <default to="end"/>
        </switch>
    </decision>

    <!-- Parse logs with MapReduce -->
    <action name="parse-logs">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>LogParser</main-class>
            <arg>/raw/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
            <arg>/processed/parsed/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
        </java>
        <ok to="aggregate-hive"/>
        <error to="fail"/>
    </action>

    <!-- Aggregate with Hive -->
    <action name="aggregate-hive">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <script>aggregate.hql</script>
            <param>YEAR=${YEAR}</param>
            <param>MONTH=${MONTH}</param>
            <param>DAY=${DAY}</param>
        </hive>
        <ok to="sessionize"/>
        <error to="fail"/>
    </action>

    <!-- Build sessions -->
    <action name="sessionize">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>SessionBuilder</main-class>
            <arg>/processed/parsed/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
            <arg>/processed/sessions/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
        </java>
        <ok to="load-hbase"/>
        <error to="fail"/>
    </action>

    <!-- Load to HBase -->
    <action name="load-hbase">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>HiveToHBase</main-class>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>

    <end name="end"/>
</workflow-app>
coordinator.xml (Schedule hourly):
<coordinator-app name="log-analytics-coordinator"
                 frequency="${coord:hours(1)}"
                 start="${start}" end="${end}"
                 timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">

    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
            <configuration>
                <property>
                    <name>YEAR</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
                </property>
                <property>
                    <name>MONTH</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
                </property>
                <property>
                    <name>DAY</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>HOUR</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'HH')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
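A job.properties sketch for submitting the coordinator; host names, ports, and HDFS paths below are placeholders for your cluster:
# job.properties (illustrative values)
nameNode=hdfs://namenode:8020
jobTracker=resourcemanager:8032
queueName=default
workflowPath=${nameNode}/apps/log-analytics
start=2024-01-15T00:00Z
end=2024-12-31T23:00Z
oozie.coord.application.path=${nameNode}/apps/log-analytics/coordinator.xml
Submit the coordinator:
oozie job -oozie http://localhost:11000/oozie -config job.properties -run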

Step 7: Monitoring Dashboard

Grafana Dashboard (simplified panel definitions, backed by Prometheus metrics):
{
  "title": "Log Analytics Pipeline",
  "panels": [
    {
      "title": "Requests per Minute",
      "targets": [{"expr": "rate(total_requests[1m])"}]
    },
    {
      "title": "Error Rate",
      "targets": [{"expr": "errors / total_requests * 100"}]
    },
    {
      "title": "Top URLs",
      "type": "table",
      "targets": [{"expr": "topk(10, url_hits)"}]
    },
    {
      "title": "Pipeline Latency",
      "targets": [{"expr": "oozie_workflow_duration_seconds"}]
    }
  ]
}

Testing

Unit Tests

@Test
public void testLogParser() {
    String logLine = "192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] " +
                    "\"GET /products HTTP/1.1\" 200 1234 \"-\" \"Mozilla/5.0\"";

    // Test the parsing regex (assumes LOG_PATTERN is visible to the test,
    // e.g. package-private rather than private)
    Matcher matcher = LOG_PATTERN.matcher(logLine);
    assertTrue(matcher.matches());
    assertEquals("192.168.1.1", matcher.group(1));
    assertEquals("200", matcher.group(6));
}
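Because the size field can legitimately be "-" (e.g. 304 responses), a second case is worth covering; a sketch mirroring the test above:
@Test
public void testLogParserDashBytes() {
    String logLine = "192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] " +
                    "\"GET /health HTTP/1.1\" 304 - \"-\" \"Mozilla/5.0\"";

    Matcher matcher = LOG_PATTERN.matcher(logLine);
    assertTrue(matcher.matches());
    assertEquals("-", matcher.group(7));
}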

Integration Tests

# Generate sample logs
python generate_logs.py --lines 100000 > test.log

# Run pipeline
flume-ng agent --name agent --conf-file test-flume.conf &
hadoop jar log-parser.jar LogParser /raw/logs/test /processed/test

# Verify output
hdfs dfs -cat /processed/test/part-m-00000 | head -10
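
# Record-count comparison: the difference between the two numbers
# should equal the MalformedLines counter reported by the job
hdfs dfs -cat /raw/logs/test/* | wc -l
hdfs dfs -cat /processed/test/part-m-* | wc -l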

Project Deliverables

  1. Source Code: All MapReduce, Hive, and Java files
  2. Configuration: Flume, Oozie, HBase configs
  3. Documentation: Architecture diagram, setup instructions
  4. Dashboard: Grafana JSON export
  5. Performance Report: Job execution times, resource usage
  6. Lessons Learned: Challenges faced and solutions

Bonus Challenges

  1. Real-time Processing: Replace MapReduce with Spark Streaming
  2. Machine Learning: Detect anomalies using Spark MLlib
  3. Data Quality: Add data validation and cleansing steps
  4. Cost Optimization: Implement data lifecycle policies
  5. Multi-tenancy: Support multiple websites in same pipeline

Congratulations!

You’ve built a production-ready data pipeline using the full Hadoop ecosystem. This capstone demonstrates:
  • Architecture Design: Multi-layer data pipeline
  • Data Engineering: ETL, parsing, aggregation
  • Performance: Optimization techniques
  • Operations: Monitoring, orchestration
  • Integration: Multiple Hadoop components working together

Next Steps

  • Deploy to cloud (AWS EMR, Azure HDInsight, GCP Dataproc)
  • Explore modern alternatives (Apache Spark, Apache Flink)
  • Consider managed services (Databricks, Snowflake)
  • Share your project on GitHub!
