Capstone Project: Real-Time Log Analytics Platform
Project Duration: 4-5 hours
Complexity: Production-grade implementation
Skills Tested: All modules 1-7
Project Overview
Build a complete log analytics platform that:
- Ingests web server logs in real-time
- Stores data reliably in HDFS
- Processes logs using MapReduce and Hive
- Stores aggregated results in HBase for fast queries
- Orchestrates the entire pipeline with Oozie
- Monitors system health and job performance
Business Requirements
Scenario: You’re building analytics for a high-traffic e-commerce website.
Requirements:
- Process 100GB of access logs daily
- Identify top products, traffic sources, and error rates
- Sessionize user journeys
- Detect anomalies (unusual traffic spikes, error patterns)
- Generate hourly reports
- Provide sub-second query response for dashboards
Architecture
┌──────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ Web Servers (Apache/Nginx) generating access.log │
└────────────────────┬─────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ INGESTION LAYER (Flume) │
│ • Tail log files from web servers │
│ • Buffer in memory channel │
│ • Write to HDFS in batches │
└────────────────────┬─────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ STORAGE LAYER (HDFS) │
│ /raw/logs/YYYY/MM/DD/HH/access_YYYYMMDD_HH.log │
│ Partitioned by date and hour │
└────────────────────┬─────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ PROCESSING LAYER (MapReduce + Hive) │
│ │
│ ┌────────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ Parse & Clean │→ │ Sessionize │→ │ Aggregate │ │
│ │ (MapReduce) │ │ (MapReduce) │ │ (Hive) │ │
│ └────────────────┘ └──────────────┘ └─────────────┘ │
└────────────────────┬─────────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ HDFS (Processed)│ │ HBase (Fast │
│ /processed/* │ │ Lookup) │
│ (Parquet) │ │ │
└──────────────────┘ └──────────────────┘
│ │
└──────────┬──────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ ORCHESTRATION (Oozie) │
│ Hourly workflow: Ingest → Process → Aggregate → Store │
└──────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ VISUALIZATION & MONITORING │
│ • Grafana dashboards (via Prometheus) │
│ • Custom web UI (queries HBase) │
└──────────────────────────────────────────────────────────┘
Implementation Steps
Step 1: Data Ingestion with Flume
Create Flume Configuration (web-logs.conf):
# Define agent
agent.sources = weblog_source
agent.channels = memory_channel
agent.sinks = hdfs_sink
# Source: Tail access log
agent.sources.weblog_source.type = exec
agent.sources.weblog_source.command = tail -F /var/log/apache2/access.log
agent.sources.weblog_source.channels = memory_channel
agent.sources.weblog_source.batchSize = 1000
# Interceptor: Add timestamp
agent.sources.weblog_source.interceptors = timestamp
agent.sources.weblog_source.interceptors.timestamp.type = timestamp
# Channel: Memory buffer
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 100000
agent.channels.memory_channel.transactionCapacity = 10000
# Sink: HDFS with time-based partitioning
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = memory_channel
agent.sinks.hdfs_sink.hdfs.path = /raw/logs/%Y/%m/%d/%H
agent.sinks.hdfs_sink.hdfs.filePrefix = access
agent.sinks.hdfs_sink.hdfs.fileSuffix = .log
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.rollSize = 134217728
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
Start the Flume agent:
flume-ng agent \
--name agent \
--conf-file web-logs.conf \
-Dflume.root.logger=INFO,console
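Before moving on, it is worth confirming that events are actually landing in the hourly partitions. The snippet below is a minimal sketch using the HDFS Java API; the class name IngestCheck and the partition path are just examples, so adjust them to your cluster and current hour:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IngestCheck {
    public static void main(String[] args) throws Exception {
        // Example partition for 10:00 on 15 Jan 2024 -- adjust as needed
        Path partition = new Path("/raw/logs/2024/01/15/10");
        FileSystem fs = FileSystem.get(new Configuration());
        if (!fs.exists(partition)) {
            System.out.println("No data yet for " + partition);
            return;
        }
        for (FileStatus status : fs.listStatus(partition)) {
            System.out.printf("%s\t%d bytes%n",
                    status.getPath().getName(), status.getLen());
        }
        fs.close();
    }
}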
Step 2: Parse Logs with MapReduce
LogParser.java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class LogParser {
public static class ParseMapper
extends Mapper<Object, Text, NullWritable, Text> {
// Apache Combined Log Format regex
private static final Pattern LOG_PATTERN = Pattern.compile(
"^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] " +
"\"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+) " +
"\"([^\"]*)\" \"([^\"]*)\"");
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
Matcher matcher = LOG_PATTERN.matcher(line);
if (matcher.matches()) {
String ip = matcher.group(1);
String timestamp = matcher.group(2);
String method = matcher.group(3);
String url = matcher.group(4);
String protocol = matcher.group(5);
int statusCode = Integer.parseInt(matcher.group(6));
long bytes = Long.parseLong(matcher.group(7));
String referer = matcher.group(8);
String userAgent = matcher.group(9);
// Output as CSV for easier Hive processing
String output = String.format(
"%s,%s,%s,%s,%s,%d,%d,%s,%s",
ip, timestamp, method, url, protocol,
statusCode, bytes, referer, userAgent
);
context.write(NullWritable.get(), new Text(output));
// Track errors
if (statusCode >= 400) {
context.getCounter("LogStats", "Errors").increment(1);
}
// Track requests
context.getCounter("LogStats", "TotalRequests").increment(1);
} else {
context.getCounter("LogStats", "MalformedLines").increment(1);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Log Parser");
job.setJarByClass(LogParser.class);
job.setMapperClass(ParseMapper.class);
job.setNumReduceTasks(0); // Map-only job
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Run the parser over one day of raw logs:
hadoop jar log-parser.jar LogParser \
/raw/logs/2024/01/15 \
/processed/parsed/2024/01/15
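The ParseMapper increments the LogStats counters (TotalRequests, Errors, MalformedLines), and it is useful to read them back once the job finishes. Below is a sketch of a helper you could add to LogParser and call after job.waitForCompletion(true); the method name reportCounters and the 5% threshold are assumptions, not part of the original driver:
// Hypothetical addition to LogParser
public static void reportCounters(org.apache.hadoop.mapreduce.Job job)
        throws java.io.IOException {
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    long total = counters.findCounter("LogStats", "TotalRequests").getValue();
    long errors = counters.findCounter("LogStats", "Errors").getValue();
    long malformed = counters.findCounter("LogStats", "MalformedLines").getValue();
    System.out.printf("requests=%d errors=%d malformed=%d%n", total, errors, malformed);
    // Fail the run if too many lines did not match the log pattern (threshold is arbitrary)
    if (malformed > 0.05 * (total + malformed)) {
        throw new IllegalStateException("More than 5% of input lines were malformed");
    }
}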
Step 3: Create Hive Tables
Create External Table on Parsed Logs:
CREATE EXTERNAL TABLE access_logs (
ip STRING,
log_timestamp STRING,
method STRING,
url STRING,
protocol STRING,
status_code INT,
bytes_sent BIGINT,
referer STRING,
user_agent STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/processed/parsed'
TBLPROPERTIES ('skip.header.line.count'='0');
-- Create partitioned table for better performance
CREATE TABLE access_logs_partitioned (
ip STRING,
log_timestamp STRING,
method STRING,
url STRING,
protocol STRING,
status_code INT,
bytes_sent BIGINT,
referer STRING,
user_agent STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');
-- Load data into partitioned table
INSERT INTO access_logs_partitioned
PARTITION (year=2024, month=1, day=15)
SELECT * FROM access_logs
WHERE log_timestamp LIKE '15/Jan/2024%';
Run analytical queries:
-- Top 10 URLs by traffic
SELECT url, COUNT(*) as hits, SUM(bytes_sent) as total_bytes
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY url
ORDER BY hits DESC
LIMIT 10;
-- Error rate by hour
SELECT
substr(log_timestamp, 13, 2) as hour,
COUNT(*) as total_requests,
SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors,
(SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) as error_rate
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY substr(log_timestamp, 13, 2)
ORDER BY hour;
-- Traffic sources (referers)
SELECT
CASE
WHEN referer LIKE '%google%' THEN 'Google'
WHEN referer LIKE '%facebook%' THEN 'Facebook'
WHEN referer LIKE '%twitter%' THEN 'Twitter'
WHEN referer = '-' THEN 'Direct'
ELSE 'Other'
END as traffic_source,
COUNT(*) as visits
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY
CASE
WHEN referer LIKE '%google%' THEN 'Google'
WHEN referer LIKE '%facebook%' THEN 'Facebook'
WHEN referer LIKE '%twitter%' THEN 'Twitter'
WHEN referer = '-' THEN 'Direct'
ELSE 'Other'
END
ORDER BY visits DESC;
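The dashboards in the final layer do not have to wait for the HBase load; the same aggregates can be pulled straight from HiveServer2 over JDBC. The sketch below is a minimal example, assuming HiveServer2 listens on localhost:10000, the hive-jdbc driver is on the classpath, and a "hive" user with an empty password (all assumptions; adjust to your cluster):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TopUrlsReport {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Assumed HiveServer2 endpoint and credentials
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT url, COUNT(*) AS hits " +
                 "FROM access_logs_partitioned " +
                 "WHERE year=2024 AND month=1 AND day=15 " +
                 "GROUP BY url ORDER BY hits DESC LIMIT 10")) {
            while (rs.next()) {
                System.out.printf("%s\t%d%n", rs.getString("url"), rs.getLong("hits"));
            }
        }
    }
}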
Step 4: Sessionization with MapReduce
SessionBuilder.java:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class SessionBuilder {
public static class SessionMapper
extends Mapper<Object, Text, Text, Event> {
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
String ip = fields[0];
String timestamp = fields[1];
String url = fields[3];
// Use IP as session key (in production, use cookie/user ID)
Event event = new Event(parseTimestamp(timestamp), url);
context.write(new Text(ip), event);
}
private long parseTimestamp(String timestamp) {
// Parse "15/Jan/2024:10:30:45 +0000" to epoch
// Simplified for example
return System.currentTimeMillis();
}
}
public static class SessionReducer
extends Reducer<Text, Event, Text, Session> {
private static final long SESSION_TIMEOUT = 30 * 60 * 1000; // 30 min
@Override
public void reduce(Text key, Iterable<Event> values, Context context)
throws IOException, InterruptedException {
List<Event> events = new ArrayList<>();
for (Event e : values) {
events.add(new Event(e)); // Deep copy
}
// Sort by timestamp
Collections.sort(events, (a, b) ->
Long.compare(a.getTimestamp(), b.getTimestamp()));
// Build sessions
List<Session> sessions = new ArrayList<>();
Session currentSession = null;
for (Event event : events) {
if (currentSession == null ||
event.getTimestamp() - currentSession.getLastEventTime()
> SESSION_TIMEOUT) {
// Start new session
if (currentSession != null) {
sessions.add(currentSession);
}
currentSession = new Session(event.getTimestamp());
}
currentSession.addEvent(event);
}
if (currentSession != null) {
sessions.add(currentSession);
}
// Emit sessions
for (int i = 0; i < sessions.size(); i++) {
String sessionKey = key.toString() + "_" + i;
context.write(new Text(sessionKey), sessions.get(i));
}
}
}
// Custom Writables for Event and Session
public static class Event implements Writable, Comparable<Event> {
private long timestamp;
private String url;
// Implement Writable and Comparable methods...
}
public static class Session implements Writable {
private long startTime;
private long endTime;
private List<String> urls;
// Methods to calculate:
// - Session duration
// - Page views
// - Bounce rate (single-page sessions)
}
}
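The Event writable above is left as a stub, and parseTimestamp in SessionMapper is deliberately simplified. Below is a minimal sketch of how both could be completed, assuming the standard Apache timestamp format ("15/Jan/2024:10:30:45 +0000"); treat it as one possible implementation, not the only one:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import org.apache.hadoop.io.Writable;

public class Event implements Writable, Comparable<Event> {
    private long timestamp;
    private String url;

    public Event() {}                         // Required by Hadoop for deserialization

    public Event(long timestamp, String url) {
        this.timestamp = timestamp;
        this.url = url;
    }

    public Event(Event other) {               // Deep copy used in SessionReducer
        this(other.timestamp, other.url);
    }

    public long getTimestamp() { return timestamp; }
    public String getUrl() { return url; }

    // Parses "15/Jan/2024:10:30:45 +0000" into epoch milliseconds
    public static long parseTimestamp(String raw) throws ParseException {
        SimpleDateFormat fmt =
                new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
        return fmt.parse(raw).getTime();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        url = in.readUTF();
    }

    @Override
    public int compareTo(Event other) {
        return Long.compare(timestamp, other.timestamp);
    }
}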
Step 5: Store Aggregates in HBase
Create HBase Table:
hbase shell
create 'url_stats', 'metrics', 'metadata'
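If you prefer to create the table from code (for example, as part of an automated setup step), the sketch below shows one way to do it, assuming the HBase 2.x client API; the class name CreateUrlStatsTable is hypothetical:
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateUrlStatsTable {
    public static void main(String[] args) throws Exception {
        try (Connection connection =
                 ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("url_stats");
            if (!admin.tableExists(name)) {
                // Same column families as the shell command above
                admin.createTable(TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("metrics"))
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("metadata"))
                        .build());
            }
        }
    }
}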
HiveToHBase.java:
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class HiveToHBase {
public static void main(String[] args) throws Exception {
Configuration config = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("url_stats"));
// Read aggregated data from Hive output
// For each URL statistic:
String url = "/products/widget";
long hits = 1523;
long totalBytes = 45678901;
Put put = new Put(Bytes.toBytes(url));
put.addColumn(Bytes.toBytes("metrics"),
Bytes.toBytes("hits"),
Bytes.toBytes(hits));
put.addColumn(Bytes.toBytes("metrics"),
Bytes.toBytes("total_bytes"),
Bytes.toBytes(totalBytes));
put.addColumn(Bytes.toBytes("metadata"),
Bytes.toBytes("last_updated"),
Bytes.toBytes(System.currentTimeMillis()));
table.put(put);
table.close();
connection.close();
}
}
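In practice the hardcoded URL statistic above would be replaced by a loop over the Hive output. The sketch below assumes the aggregates were exported as CSV rows of url,hits,total_bytes under /processed/url_stats (both the path and the layout are assumptions) and uses a BufferedMutator so the puts are batched client-side:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class UrlStatsLoader {
    public static void main(String[] args) throws Exception {
        // Assumed export directory of CSV part files: url,hits,total_bytes
        Path exportDir = new Path(args.length > 0 ? args[0] : "/processed/url_stats");
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        try (Connection connection = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator =
                 connection.getBufferedMutator(TableName.valueOf("url_stats"))) {
            for (FileStatus file : fs.listStatus(exportDir)) {
                if (file.getPath().getName().startsWith("_")) {
                    continue;  // Skip _SUCCESS and similar marker files
                }
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(fs.open(file.getPath())))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String[] parts = line.split(",");
                        Put put = new Put(Bytes.toBytes(parts[0]));
                        put.addColumn(Bytes.toBytes("metrics"), Bytes.toBytes("hits"),
                                Bytes.toBytes(Long.parseLong(parts[1])));
                        put.addColumn(Bytes.toBytes("metrics"), Bytes.toBytes("total_bytes"),
                                Bytes.toBytes(Long.parseLong(parts[2])));
                        mutator.mutate(put);  // Buffered; flushed when the mutator closes
                    }
                }
            }
        }
    }
}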
URLStatsQuery.java:
public class URLStatsQuery {
public static URLStats getStats(String url) throws Exception {
Configuration config = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("url_stats"));
Get get = new Get(Bytes.toBytes(url));
Result result = table.get(get);
long hits = Bytes.toLong(result.getValue(
Bytes.toBytes("metrics"), Bytes.toBytes("hits")));
long totalBytes = Bytes.toLong(result.getValue(
Bytes.toBytes("metrics"), Bytes.toBytes("total_bytes")));
table.close();
connection.close();
return new URLStats(url, hits, totalBytes);
}
}
Step 6: Orchestrate with Oozie
workflow.xml:
<workflow-app name="log-analytics-pipeline" xmlns="uri:oozie:workflow:0.5">
<start to="check-data"/>
<!-- Check if data exists -->
<decision name="check-data">
<switch>
<case to="parse-logs">
${fs:exists('/raw/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}')}
</case>
<default to="end"/>
</switch>
</decision>
<!-- Parse logs with MapReduce -->
<action name="parse-logs">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>LogParser</main-class>
<arg>/raw/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
<arg>/processed/parsed/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
</java>
<ok to="aggregate-hive"/>
<error to="fail"/>
</action>
<!-- Aggregate with Hive -->
<action name="aggregate-hive">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>aggregate.hql</script>
<param>YEAR=${YEAR}</param>
<param>MONTH=${MONTH}</param>
<param>DAY=${DAY}</param>
</hive>
<ok to="sessionize"/>
<error to="fail"/>
</action>
<!-- Build sessions -->
<action name="sessionize">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>SessionBuilder</main-class>
<arg>/processed/parsed/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
<arg>/processed/sessions/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
</java>
<ok to="load-hbase"/>
<error to="fail"/>
</action>
<!-- Load to HBase -->
<action name="load-hbase">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>HiveToHBase</main-class>
</java>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
coordinator.xml:
<coordinator-app name="log-analytics-coordinator"
frequency="${coord:hours(1)}"
start="${start}" end="${end}"
timezone="UTC"
xmlns="uri:oozie:coordinator:0.4">
<action>
<workflow>
<app-path>${workflowPath}</app-path>
<configuration>
<property>
<name>YEAR</name>
<value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
</property>
<property>
<name>MONTH</name>
<value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
</property>
<property>
<name>DAY</name>
<value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>HOUR</name>
<value>${coord:formatTime(coord:nominalTime(), 'HH')}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
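The coordinator can be submitted with the oozie CLI, or programmatically through the Oozie client API. Below is a minimal sketch of the latter, assuming the Oozie server is reachable at http://localhost:11000/oozie and the application files have been uploaded to the HDFS paths shown (all of these values are placeholders for your environment):
import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

public class SubmitCoordinator {
    public static void main(String[] args) throws OozieClientException {
        // Assumed Oozie server URL -- adjust to your cluster
        OozieClient oozie = new OozieClient("http://localhost:11000/oozie");
        Properties conf = oozie.createConfiguration();
        conf.setProperty(OozieClient.COORDINATOR_APP_PATH,
                "hdfs://namenode:8020/apps/log-analytics");
        conf.setProperty("workflowPath", "hdfs://namenode:8020/apps/log-analytics");
        conf.setProperty("start", "2024-01-15T00:00Z");
        conf.setProperty("end", "2024-01-22T00:00Z");
        conf.setProperty("jobTracker", "resourcemanager:8032");
        conf.setProperty("nameNode", "hdfs://namenode:8020");
        conf.setProperty("queueName", "default");
        String jobId = oozie.run(conf);
        System.out.println("Submitted coordinator job " + jobId);
    }
}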
Step 7: Monitoring Dashboard
Grafana Dashboard (Prometheus metrics):
{
"title": "Log Analytics Pipeline",
"panels": [
{
"title": "Requests per Minute",
"targets": [{"expr": "rate(total_requests[1m])"}]
},
{
"title": "Error Rate",
"targets": [{"expr": "errors / total_requests * 100"}]
},
{
"title": "Top URLs",
"type": "table",
"targets": [{"expr": "topk(10, url_hits)"}]
},
{
"title": "Pipeline Latency",
"targets": [{"expr": "oozie_workflow_duration_seconds"}]
}
]
}
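The panel expressions above assume that metrics named total_requests and errors already exist in Prometheus. One possible way to get them there from a batch pipeline is to push the LogStats counter values to a Pushgateway after each run; the sketch below assumes the io.prometheus simpleclient and simpleclient_pushgateway libraries and a Pushgateway at localhost:9091 (all assumptions):
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.PushGateway;

public class PipelineMetricsPusher {
    public static void main(String[] args) throws Exception {
        CollectorRegistry registry = new CollectorRegistry();
        Gauge totalRequests = Gauge.build()
                .name("total_requests").help("Requests parsed in the last pipeline run")
                .register(registry);
        Gauge errors = Gauge.build()
                .name("errors").help("4xx/5xx responses in the last pipeline run")
                .register(registry);
        // In practice these values would come from the LogStats counters of the last job
        totalRequests.set(Double.parseDouble(args[0]));
        errors.set(Double.parseDouble(args[1]));
        new PushGateway("localhost:9091").pushAdd(registry, "log_analytics_pipeline");
    }
}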
Testing
Unit Tests
@Test
public void testLogParser() {
String logLine = "192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] " +
"\"GET /products HTTP/1.1\" 200 1234 \"-\" \"Mozilla/5.0\"";
// Test parsing logic
Matcher matcher = LOG_PATTERN.matcher(logLine);
assertTrue(matcher.matches());
assertEquals("192.168.1.1", matcher.group(1));
assertEquals("200", matcher.group(6));
}
Integration Tests
# Generate sample logs
python generate_logs.py --lines 100000 > test.log
# Run pipeline
flume-ng agent --name agent --conf-file test-flume.conf &
hadoop jar log-parser.jar LogParser /raw/logs/test /processed/test
# Verify output
hdfs dfs -cat /processed/test/part-m-00000 | head -10
Project Deliverables
- Source Code: All MapReduce, Hive, and Java files
- Configuration: Flume, Oozie, HBase configs
- Documentation: Architecture diagram, setup instructions
- Dashboard: Grafana JSON export
- Performance Report: Job execution times, resource usage
- Lessons Learned: Challenges faced and solutions
Bonus Challenges
- Real-time Processing: Replace MapReduce with Spark Streaming
- Machine Learning: Detect anomalies using Spark MLlib
- Data Quality: Add data validation and cleansing steps
- Cost Optimization: Implement data lifecycle policies
- Multi-tenancy: Support multiple websites in same pipeline
Congratulations!
You’ve built a production-ready data pipeline using the full Hadoop ecosystem. This capstone demonstrates:
- Architecture Design: Multi-layer data pipeline
- Data Engineering: ETL, parsing, aggregation
- Performance: Optimization techniques
- Operations: Monitoring, orchestration
- Integration: Multiple Hadoop components working together
Next Steps
- Deploy to cloud (AWS EMR, Azure HDInsight, GCP Dataproc)
- Explore modern alternatives (Apache Spark, Apache Flink)
- Consider managed services (Databricks, Snowflake)
- Share your project on GitHub!