# Capstone Project: End-to-End Data Pipeline

> Build a production-ready, real-time log analytics platform using the full Hadoop ecosystem

# Capstone Project: Real-Time Log Analytics Platform

<Info>
  **Project Duration**: 4-5 hours
  **Complexity**: Production-grade implementation
  **Skills Tested**: All modules 1-7
</Info>

## Project Overview

Build a complete log analytics platform that:

1. **Ingests** web server logs in real time
2. **Stores** data reliably in HDFS
3. **Processes** logs using MapReduce and Hive
4. **Stores** aggregated results in HBase for fast queries
5. **Orchestrates** the entire pipeline with Oozie
6. **Monitors** system health and job performance

### Business Requirements

**Scenario**: You're building analytics for a high-traffic e-commerce website

**Requirements**:

* Process 100GB of access logs daily
* Identify top products, traffic sources, and error rates
* Sessionize user journeys
* Detect anomalies (unusual traffic spikes, error patterns)
* Generate hourly reports
* Provide sub-second query response for dashboards
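
As a rough sizing aid, the 100GB daily volume can be turned into hourly figures. The sketch below is a back-of-the-envelope calculation, not a benchmark: it assumes binary units, evenly spread traffic, and a 128 MB target file size. The class and method names are illustrative.

```java
final class DailyVolumeEstimate {
    static final long BYTES_PER_GB = 1024L * 1024 * 1024;

    /** Average bytes arriving per hour for a given daily volume. */
    static long bytesPerHour(long dailyBytes) {
        return dailyBytes / 24;
    }

    /** Files of at most fileBytes needed to hold totalBytes (ceiling division). */
    static long filesNeeded(long totalBytes, long fileBytes) {
        return (totalBytes + fileBytes - 1) / fileBytes;
    }

    public static void main(String[] args) {
        long daily = 100 * BYTES_PER_GB;
        long hourly = bytesPerHour(daily);
        long fileSize = 128L * 1024 * 1024; // assumed 128 MB target file size

        System.out.println("MB per hour: " + hourly / (1024 * 1024));
        System.out.println("128 MB files per hour: " + filesNeeded(hourly, fileSize)); // 34
        System.out.printf("Average MB/s: %.2f%n", hourly / 3600.0 / (1024 * 1024));
    }
}
```

Real traffic is rarely uniform, so peak hours will exceed these averages; treat the numbers as a lower bound when sizing channels and queues.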

***

## Architecture

```
┌──────────────────────────────────────────────────────────┐
│                    DATA SOURCES                          │
│    Web Servers (Apache/Nginx) generating access.log     │
└────────────────────┬─────────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────────┐
│              INGESTION LAYER (Flume)                     │
│  • Tail log files from web servers                      │
│  • Buffer in memory channel                             │
│  • Write to HDFS in batches                             │
└────────────────────┬─────────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────────┐
│           STORAGE LAYER (HDFS)                           │
│  /raw/logs/YYYY/MM/DD/HH/access.<timestamp>.log         │
│  Partitioned by date and hour                           │
└────────────────────┬─────────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────────┐
│        PROCESSING LAYER (MapReduce + Hive)               │
│                                                          │
│  ┌────────────────┐  ┌──────────────┐  ┌─────────────┐ │
│  │  Parse & Clean │→ │ Sessionize   │→ │  Aggregate  │ │
│  │  (MapReduce)   │  │ (MapReduce)  │  │   (Hive)    │ │
│  └────────────────┘  └──────────────┘  └─────────────┘ │
└────────────────────┬─────────────────────────────────────┘
                     │
          ┌──────────┴──────────┐
          │                     │
          ▼                     ▼
┌──────────────────┐  ┌──────────────────┐
│  HDFS (Processed)│  │   HBase (Fast    │
│  /processed/*    │  │   Lookup)        │
│  (CSV, ORC)      │  │                  │
└──────────────────┘  └──────────────────┘
          │                     │
          └──────────┬──────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────────┐
│         ORCHESTRATION (Oozie)                            │
│  Hourly workflow: Ingest → Process → Aggregate → Store  │
└──────────────────────────────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────────┐
│           VISUALIZATION & MONITORING                     │
│  • Grafana dashboards (via Prometheus)                  │
│  • Custom web UI (queries HBase)                        │
└──────────────────────────────────────────────────────────┘
```

***

## Implementation Steps

### Step 1: Data Ingestion with Flume

**Create Flume Configuration** (`web-logs.conf`):

```properties
# Define agent
agent.sources = weblog_source
agent.channels = memory_channel
agent.sinks = hdfs_sink

# Source: Tail access log
agent.sources.weblog_source.type = exec
agent.sources.weblog_source.command = tail -F /var/log/apache2/access.log
agent.sources.weblog_source.channels = memory_channel
agent.sources.weblog_source.batchSize = 1000

# Interceptor: Add timestamp
agent.sources.weblog_source.interceptors = timestamp
agent.sources.weblog_source.interceptors.timestamp.type = timestamp

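# Channel: Memory buffer (fast, but buffered events are lost if the agent
# stops; a file channel trades throughput for durability)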
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 100000
agent.channels.memory_channel.transactionCapacity = 10000

# Sink: HDFS with time-based partitioning
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = memory_channel
agent.sinks.hdfs_sink.hdfs.path = /raw/logs/%Y/%m/%d/%H
agent.sinks.hdfs_sink.hdfs.filePrefix = access
agent.sinks.hdfs_sink.hdfs.fileSuffix = .log
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.rollSize = 134217728
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
```

**Start Flume Agent**:

```bash
flume-ng agent \
  --name agent \
  --conf-file web-logs.conf \
  -Dflume.root.logger=INFO,console
```
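
The `hdfs.path` escape sequences (`%Y/%m/%d/%H`) expand to an hourly directory based on the event timestamp. The following sketch reproduces that expansion in plain Java so the expected directory for a given timestamp can be checked; it assumes UTC, whereas the agent uses its local time zone unless configured otherwise. `PartitionPath` and `rawLogDir` are illustrative names, not part of Flume.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class PartitionPath {
    private static final DateTimeFormatter HOURLY =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    /** Mirrors /raw/logs/%Y/%m/%d/%H for an event timestamp in the given zone. */
    static String rawLogDir(long epochMillis, ZoneId zone) {
        return "/raw/logs/" + HOURLY.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2024-01-15T10:30:45Z").toEpochMilli();
        System.out.println(rawLogDir(ts, ZoneOffset.UTC)); // /raw/logs/2024/01/15/10
    }
}
```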

***

### Step 2: Parse Logs with MapReduce

**LogParser.java**:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogParser {

    public static class ParseMapper
            extends Mapper<Object, Text, NullWritable, Text> {

        // Apache Combined Log Format regex.
        // The size field is "-" when no body was sent (e.g. 304 responses).
        // Package-private so unit tests can reuse the pattern.
        static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] " +
            "\"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+|-) " +
            "\"([^\"]*)\" \"([^\"]*)\"");

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            Matcher matcher = LOG_PATTERN.matcher(line);

            if (matcher.matches()) {
                String ip = matcher.group(1);
                String timestamp = matcher.group(2);
                String method = matcher.group(3);
                String url = matcher.group(4);
                String protocol = matcher.group(5);
                int statusCode = Integer.parseInt(matcher.group(6));
                long bytes = "-".equals(matcher.group(7))
                    ? 0L : Long.parseLong(matcher.group(7));
                String referer = matcher.group(8);
                String userAgent = matcher.group(9);

                // Output as CSV for easier Hive processing.
                // Free-text fields can contain commas (user agents often do),
                // so encode them to keep the column count stable.
                String output = String.format(
                    "%s,%s,%s,%s,%s,%d,%d,%s,%s",
                    ip, timestamp, method, encodeCommas(url), protocol,
                    statusCode, bytes,
                    encodeCommas(referer), encodeCommas(userAgent)
                );

                context.write(NullWritable.get(), new Text(output));

                // Track errors
                if (statusCode >= 400) {
                    context.getCounter("LogStats", "Errors").increment(1);
                }

                // Track requests
                context.getCounter("LogStats", "TotalRequests").increment(1);

            } else {
                context.getCounter("LogStats", "MalformedLines").increment(1);
            }
        }

        // Percent-encode commas so they cannot be mistaken for delimiters
        private static String encodeCommas(String field) {
            return field.replace(",", "%2C");
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Log Parser");

        job.setJarByClass(LogParser.class);
        job.setMapperClass(ParseMapper.class);
        job.setNumReduceTasks(0); // Map-only job

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

**Run**:

```bash
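# Input files sit in hourly subdirectories, which are not read recursively
# by default, so match them with a glob (quoted to avoid shell expansion)
hadoop jar log-parser.jar LogParser \
  '/raw/logs/2024/01/15/*' \
  /processed/parsed/2024/01/15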
```
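
Before submitting the job, it can help to try the parsing logic on a single line without a cluster. The class below is a minimal, Hadoop-free sketch of that idea; `AccessLogLine` is a hypothetical helper, and its regex is modeled on the mapper's pattern with one assumption added: a `-` in the size field is accepted and treated as 0 bytes.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative parser for one Combined Log Format line. */
final class AccessLogLine {
    private static final Pattern LOG_PATTERN = Pattern.compile(
        "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] "
        + "\"(\\S+) (\\S+) (\\S+)\" (\\d+) (\\d+|-) "
        + "\"([^\"]*)\" \"([^\"]*)\"");

    final String ip;
    final String timestamp;
    final String method;
    final String url;
    final int status;
    final long bytes;

    private AccessLogLine(Matcher m) {
        ip = m.group(1);
        timestamp = m.group(2);
        method = m.group(3);
        url = m.group(4);
        status = Integer.parseInt(m.group(6));
        bytes = "-".equals(m.group(7)) ? 0L : Long.parseLong(m.group(7));
    }

    /** Returns the parsed line, or empty if the line does not match. */
    static Optional<AccessLogLine> parse(String line) {
        Matcher m = LOG_PATTERN.matcher(line);
        return m.matches() ? Optional.of(new AccessLogLine(m)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] "
            + "\"GET /products HTTP/1.1\" 200 1234 \"-\" \"Mozilla/5.0\"";
        parse(line).ifPresent(l ->
            System.out.println(l.ip + " " + l.method + " " + l.url + " " + l.status));
    }
}
```

Returning an empty `Optional` for non-matching input mirrors the mapper's `MalformedLines` counter: bad lines are counted and skipped rather than failing the job.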

***

### Step 3: Create Hive Tables

**Create External Table on Parsed Logs**:

```sql
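-- Parsed files sit in date subdirectories under /processed/parsed,
-- so allow Hive to read the table location recursively
SET hive.mapred.supports.subdirectories=true;
SET mapreduce.input.fileinputformat.input.dir.recursive=true;

CREATE EXTERNAL TABLE access_logs (
    ip STRING,
    log_timestamp STRING,
    method STRING,
    url STRING,
    protocol STRING,
    status_code INT,
    bytes_sent BIGINT,
    referer STRING,
    user_agent STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/processed/parsed';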

-- Create partitioned table for better performance
CREATE TABLE access_logs_partitioned (
    ip STRING,
    log_timestamp STRING,
    method STRING,
    url STRING,
    protocol STRING,
    status_code INT,
    bytes_sent BIGINT,
    referer STRING,
    user_agent STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

-- Load data into partitioned table
-- (OVERWRITE keeps reruns idempotent instead of duplicating rows)
INSERT OVERWRITE TABLE access_logs_partitioned
PARTITION (year=2024, month=1, day=15)
SELECT * FROM access_logs
WHERE log_timestamp LIKE '15/Jan/2024%';
```

**Analytical Queries**:

```sql
-- Top 10 URLs by traffic
SELECT url, COUNT(*) as hits, SUM(bytes_sent) as total_bytes
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY url
ORDER BY hits DESC
LIMIT 10;

-- Error rate by hour
SELECT
    substr(log_timestamp, 13, 2) as hour,
    COUNT(*) as total_requests,
    SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors,
    (SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) as error_rate
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY substr(log_timestamp, 13, 2)
ORDER BY hour;

-- Traffic sources (referers)
SELECT
    CASE
        WHEN referer LIKE '%google%' THEN 'Google'
        WHEN referer LIKE '%facebook%' THEN 'Facebook'
        WHEN referer LIKE '%twitter%' THEN 'Twitter'
        WHEN referer = '-' THEN 'Direct'
        ELSE 'Other'
    END as traffic_source,
    COUNT(*) as visits
FROM access_logs_partitioned
WHERE year=2024 AND month=1 AND day=15
GROUP BY
    CASE
        WHEN referer LIKE '%google%' THEN 'Google'
        WHEN referer LIKE '%facebook%' THEN 'Facebook'
        WHEN referer LIKE '%twitter%' THEN 'Twitter'
        WHEN referer = '-' THEN 'Direct'
        ELSE 'Other'
    END
ORDER BY visits DESC;
```
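
To sanity-check the error-rate query on a handful of rows, the same logic can be reproduced in plain Java. This is only a sketch for small test inputs: `HourlyErrorRate` is a hypothetical helper, and each row is assumed to be a `{log_timestamp, status_code}` pair in the format the parser emits.

```java
import java.util.Map;
import java.util.TreeMap;

final class HourlyErrorRate {
    /** Mirrors substr(log_timestamp, 13, 2): the hour in "15/Jan/2024:10:30:45 +0000". */
    static String hourOf(String logTimestamp) {
        return logTimestamp.substring(12, 14);
    }

    /** Returns hour -> percentage of requests with status >= 400, sorted by hour. */
    static Map<String, Double> errorRateByHour(String[][] rows) {
        Map<String, int[]> counts = new TreeMap<>(); // value = {total, errors}
        for (String[] row : rows) {
            int[] c = counts.computeIfAbsent(hourOf(row[0]), h -> new int[2]);
            c[0]++;
            if (Integer.parseInt(row[1]) >= 400) {
                c[1]++;
            }
        }
        Map<String, Double> rates = new TreeMap<>();
        counts.forEach((hour, c) -> rates.put(hour, c[1] * 100.0 / c[0]));
        return rates;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"15/Jan/2024:10:00:01 +0000", "200"},
            {"15/Jan/2024:10:05:00 +0000", "404"},
            {"15/Jan/2024:10:30:00 +0000", "200"},
            {"15/Jan/2024:10:59:59 +0000", "500"},
            {"15/Jan/2024:11:00:00 +0000", "200"},
            {"15/Jan/2024:11:10:00 +0000", "301"},
        };
        System.out.println(errorRateByHour(rows)); // {10=50.0, 11=0.0}
    }
}
```

Note that SQL `substr` is 1-based while Java `substring` is 0-based, which is why position 13 in the query becomes index 12 here.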

***

### Step 4: Sessionization with MapReduce

**SessionBuilder.java**:

```java
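import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// The driver (main) is omitted here; it follows LogParser.main, plus
// setReducerClass and setMapOutputKeyClass/setMapOutputValueClass.
public class SessionBuilder {

    public static class SessionMapper
            extends Mapper<Object, Text, Text, Event> {

        // Matches timestamps such as "15/Jan/2024:10:30:45 +0000"
        private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");
            if (fields.length < 4) {
                context.getCounter("SessionStats", "MalformedLines").increment(1);
                return;
            }
            String ip = fields[0];
            String timestamp = fields[1];
            String url = fields[3];

            try {
                // Use IP as session key (in production, use cookie/user ID)
                Event event = new Event(parseTimestamp(timestamp), url);
                context.write(new Text(ip), event);
            } catch (DateTimeParseException e) {
                context.getCounter("SessionStats", "BadTimestamps").increment(1);
            }
        }

        // Parse "15/Jan/2024:10:30:45 +0000" to epoch milliseconds
        private long parseTimestamp(String timestamp) {
            return OffsetDateTime.parse(timestamp, LOG_TIME)
                    .toInstant().toEpochMilli();
        }
    }

    public static class SessionReducer
            extends Reducer<Text, Event, Text, Session> {

        private static final long SESSION_TIMEOUT = 30 * 60 * 1000; // 30 min

        @Override
        public void reduce(Text key, Iterable<Event> values, Context context)
                throws IOException, InterruptedException {

            // Hadoop reuses the value object while iterating, so copy each event
            List<Event> events = new ArrayList<>();
            for (Event e : values) {
                events.add(new Event(e)); // Deep copy
            }

            // Sort by timestamp (see Event.compareTo)
            Collections.sort(events);

            // Build sessions
            List<Session> sessions = new ArrayList<>();
            Session currentSession = null;

            for (Event event : events) {
                if (currentSession == null ||
                    event.getTimestamp() - currentSession.getLastEventTime()
                        > SESSION_TIMEOUT) {

                    // Start new session
                    if (currentSession != null) {
                        sessions.add(currentSession);
                    }
                    currentSession = new Session(event.getTimestamp());
                }

                currentSession.addEvent(event);
            }

            if (currentSession != null) {
                sessions.add(currentSession);
            }

            // Emit sessions
            for (int i = 0; i < sessions.size(); i++) {
                String sessionKey = key.toString() + "_" + i;
                context.write(new Text(sessionKey), sessions.get(i));
            }
        }
    }

    // Custom Writables for Event and Session
    public static class Event implements Writable, Comparable<Event> {
        private long timestamp;
        private String url = "";

        public Event() {} // No-arg constructor required by Hadoop

        public Event(long timestamp, String url) {
            this.timestamp = timestamp;
            this.url = url;
        }

        public Event(Event other) { this(other.timestamp, other.url); }

        public long getTimestamp() { return timestamp; }
        public String getUrl() { return url; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(timestamp);
            Text.writeString(out, url);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            timestamp = in.readLong();
            url = Text.readString(in);
        }

        @Override
        public int compareTo(Event other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    public static class Session implements Writable {
        private long startTime;
        private long endTime;
        private List<String> urls = new ArrayList<>();

        public Session() {} // No-arg constructor required by Hadoop

        public Session(long startTime) {
            this.startTime = startTime;
            this.endTime = startTime;
        }

        public void addEvent(Event event) {
            endTime = event.getTimestamp();
            urls.add(event.getUrl());
        }

        public long getLastEventTime() { return endTime; }
        public long getDurationMillis() { return endTime - startTime; }
        public int getPageViews() { return urls.size(); }
        public boolean isBounce() { return urls.size() == 1; } // single-page session

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(startTime);
            out.writeLong(endTime);
            out.writeInt(urls.size());
            for (String url : urls) {
                Text.writeString(out, url);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            startTime = in.readLong();
            endTime = in.readLong();
            int count = in.readInt();
            urls = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                urls.add(Text.readString(in));
            }
        }

        // TextOutputFormat writes this string as the value column
        @Override
        public String toString() {
            return startTime + "\t" + endTime + "\t" + urls.size();
        }
    }
}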
```
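
The core of the reducer is the timeout rule: a gap of more than 30 minutes between consecutive events starts a new session. The sketch below isolates that rule in plain Java so it can be unit-tested without Hadoop. `SessionSplitter` is an illustrative name, and the input is assumed to be the epoch-millisecond timestamps of one visitor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionSplitter {
    static final long SESSION_TIMEOUT_MS = 30 * 60 * 1000L;

    /** Groups timestamps into sessions; a gap above the timeout starts a new one. */
    static List<List<Long>> split(long[] timestamps) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long last = 0;
        for (long ts : sorted) {
            if (current == null || ts - last > SESSION_TIMEOUT_MS) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts);
            last = ts;
        }
        return sessions;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Events at minutes 0, 10, 35, 70 and 71, given out of order
        long[] events = {70 * minute, 0, 10 * minute, 35 * minute, 71 * minute};
        for (List<Long> session : split(events)) {
            System.out.println(session.size() + " events");
        }
    }
}
```

In this example the 35-minute gap between minute 35 and minute 70 splits the events into two sessions of three and two events. A gap of exactly 30 minutes stays in the same session because the comparison is strict.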

***

### Step 5: Store Aggregates in HBase

**Create HBase Table**:

```bash
hbase shell

create 'url_stats', 'metrics', 'metadata'
```

**Populate from Hive**:

```java
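import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HiveToHBase {

    private static final byte[] METRICS = Bytes.toBytes("metrics");
    private static final byte[] METADATA = Bytes.toBytes("metadata");

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();

        // try-with-resources closes the table and connection even on failure
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("url_stats"))) {

            // Read aggregated data from Hive output.
            // Placeholder values standing in for one URL statistic:
            String url = "/products/widget";
            long hits = 1523;
            long totalBytes = 45678901;

            // Row key is the URL; longs are stored as 8-byte values
            Put put = new Put(Bytes.toBytes(url));
            put.addColumn(METRICS, Bytes.toBytes("hits"),
                          Bytes.toBytes(hits));
            put.addColumn(METRICS, Bytes.toBytes("total_bytes"),
                          Bytes.toBytes(totalBytes));
            put.addColumn(METADATA, Bytes.toBytes("last_updated"),
                          Bytes.toBytes(System.currentTimeMillis()));

            table.put(put);
        }
    }
}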
```

**Query HBase** (fast lookups for dashboard):

```java
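import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class URLStatsQuery {

    private static final byte[] METRICS = Bytes.toBytes("metrics");

    // URLStats is a plain value class (url, hits, totalBytes), not shown here.
    // Returns null when the URL has no row in url_stats.
    public static URLStats getStats(String url) throws Exception {
        Configuration config = HBaseConfiguration.create();

        // Creating a Connection is expensive; a real dashboard should create
        // one at startup and reuse it rather than reconnecting per lookup.
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("url_stats"))) {

            Result result = table.get(new Get(Bytes.toBytes(url)));
            if (result.isEmpty()) {
                return null; // Avoids a NullPointerException in Bytes.toLong
            }

            long hits = Bytes.toLong(result.getValue(
                METRICS, Bytes.toBytes("hits")));
            long totalBytes = Bytes.toLong(result.getValue(
                METRICS, Bytes.toBytes("total_bytes")));

            return new URLStats(url, hits, totalBytes);
        }
    }
}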
```
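
HBase stores cell values as raw bytes, and `Bytes.toBytes(long)` writes a long as 8 bytes in big-endian order. The sketch below reproduces that layout with `java.nio.ByteBuffer` so the encoding can be inspected without an HBase dependency; `LongCellEncoding` is an illustrative helper, not part of the HBase API.

```java
import java.nio.ByteBuffer;

final class LongCellEncoding {
    /** Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes an 8-byte cell back into a long. */
    static long decode(byte[] cell) {
        if (cell == null || cell.length != Long.BYTES) {
            throw new IllegalArgumentException("expected an 8-byte cell value");
        }
        return ByteBuffer.wrap(cell).getLong();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(1523L)) {
            hex.append(String.format("\\x%02X", b));
        }
        System.out.println(hex); // \x00\x00\x00\x00\x00\x00\x05\xF3
        System.out.println(decode(encode(1523L))); // 1523
    }
}
```

This is why a `get` in the HBase shell shows the `hits` column as escaped bytes rather than a readable number, and why a missing or wrongly sized cell must be checked before decoding.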

***

### Step 6: Orchestrate with Oozie

**workflow\.xml**:

```xml
<workflow-app name="log-analytics-pipeline" xmlns="uri:oozie:workflow:0.5">
    <start to="check-data"/>

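    <!-- Check if data exists. EL expressions are not expanded inside a
         string literal, so the path is assembled with concat(). -->
    <decision name="check-data">
        <switch>
            <case to="parse-logs">
                ${fs:exists(concat(concat(concat(concat(concat(concat(concat(concat(nameNode, '/raw/logs/'), YEAR), '/'), MONTH), '/'), DAY), '/'), HOUR))}
            </case>
            <default to="end"/>
        </switch>
    </decision>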

    <!-- Parse logs with MapReduce. LogParser is a driver class with main(),
         so it runs through the java action; the map-reduce action does not
         accept main-class or arg elements. -->
    <action name="parse-logs">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>LogParser</main-class>
            <arg>/raw/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
            <arg>/processed/parsed/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
        </java>
        <ok to="aggregate-hive"/>
        <error to="fail"/>
    </action>

    <!-- Aggregate with Hive -->
    <action name="aggregate-hive">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <script>aggregate.hql</script>
            <param>YEAR=${YEAR}</param>
            <param>MONTH=${MONTH}</param>
            <param>DAY=${DAY}</param>
        </hive>
        <ok to="sessionize"/>
        <error to="fail"/>
    </action>

    <!-- Build sessions. Runs through the java action, which assumes
         SessionBuilder provides a main() driver like LogParser's. -->
    <action name="sessionize">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>SessionBuilder</main-class>
            <arg>/processed/parsed/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
            <arg>/processed/sessions/${YEAR}/${MONTH}/${DAY}/${HOUR}</arg>
        </java>
        <ok to="load-hbase"/>
        <error to="fail"/>
    </action>

    <!-- Load to HBase -->
    <action name="load-hbase">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>HiveToHBase</main-class>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>

    <end name="end"/>
</workflow-app>
```

**coordinator.xml** (Schedule hourly):

```xml
<coordinator-app name="log-analytics-coordinator"
                 frequency="${coord:hours(1)}"
                 start="${start}" end="${end}"
                 timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">

    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
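            <configuration>
                <!-- Each run targets the previous hour, whose directory is
                     complete; the current hour is still being written. -->
                <property>
                    <name>YEAR</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), 'yyyy')}</value>
                </property>
                <property>
                    <name>MONTH</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), 'MM')}</value>
                </property>
                <property>
                    <name>DAY</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), 'dd')}</value>
                </property>
                <property>
                    <name>HOUR</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), 'HH')}</value>
                </property>
            </configuration>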
        </workflow>
    </action>
</coordinator-app>
```
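
The coordinator derives the `YEAR`, `MONTH`, `DAY`, and `HOUR` workflow parameters from each run's nominal time. The sketch below computes the same values in plain Java, with an optional hour offset (for example `-1` to target the hour that has just finished), which makes day and month rollovers easy to check. `NominalTimeParams` is a hypothetical helper and assumes UTC, matching the coordinator's `timezone` attribute.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

final class NominalTimeParams {
    /** Returns YEAR/MONTH/DAY/HOUR (UTC) for the hour at nominalTime + hourOffset. */
    static Map<String, String> forHour(Instant nominalTime, int hourOffset) {
        ZonedDateTime t = nominalTime.atZone(ZoneOffset.UTC).plusHours(hourOffset);
        Map<String, String> params = new LinkedHashMap<>();
        params.put("YEAR", DateTimeFormatter.ofPattern("yyyy").format(t));
        params.put("MONTH", DateTimeFormatter.ofPattern("MM").format(t));
        params.put("DAY", DateTimeFormatter.ofPattern("dd").format(t));
        params.put("HOUR", DateTimeFormatter.ofPattern("HH").format(t));
        return params;
    }

    public static void main(String[] args) {
        Instant nominal = Instant.parse("2024-01-16T00:00:00Z");
        System.out.println(forHour(nominal, 0));  // {YEAR=2024, MONTH=01, DAY=16, HOUR=00}
        System.out.println(forHour(nominal, -1)); // {YEAR=2024, MONTH=01, DAY=15, HOUR=23}
    }
}
```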

***

### Step 7: Monitoring Dashboard

**Grafana Dashboard** (Prometheus metrics):

```json
{
  "title": "Log Analytics Pipeline",
  "panels": [
    {
      "title": "Requests per Minute",
      "targets": [{"expr": "rate(total_requests[1m]) * 60"}]
    },
    {
      "title": "Error Rate",
      "targets": [{"expr": "rate(errors[5m]) / rate(total_requests[5m]) * 100"}]
    },
    {
      "title": "Top URLs",
      "type": "table",
      "targets": [{"expr": "topk(10, url_hits)"}]
    },
    {
      "title": "Pipeline Latency",
      "targets": [{"expr": "oozie_workflow_duration_seconds"}]
    }
  ]
}
```
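
The request and error panels rest on counter arithmetic: a rate is the increase of a counter divided by the elapsed time, and the error percentage is the ratio of two such rates. The sketch below shows that arithmetic on two samples. It is a simplification under stated assumptions: `CounterRate` is an illustrative class, and unlike Prometheus's `rate()` it does not handle counter resets or extrapolate to window boundaries.

```java
final class CounterRate {
    /** Per-second increase between two samples of a monotonically increasing counter. */
    static double perSecond(double v1, long t1Seconds, double v2, long t2Seconds) {
        if (t2Seconds <= t1Seconds) {
            throw new IllegalArgumentException("second sample must be later than the first");
        }
        return (v2 - v1) / (t2Seconds - t1Seconds);
    }

    /** Errors as a percentage of all requests; 0 when there is no traffic. */
    static double errorPercent(double errorRate, double totalRate) {
        return totalRate == 0 ? 0 : errorRate * 100.0 / totalRate;
    }

    public static void main(String[] args) {
        // Two samples taken 60 seconds apart
        double requests = perSecond(1000, 0, 1600, 60); // 600 requests in 60 s
        double errors = perSecond(20, 0, 50, 60);       // 30 errors in 60 s
        System.out.println("requests/s: " + requests);
        System.out.println("requests/min: " + requests * 60);
        System.out.println("error %: " + errorPercent(errors, requests));
    }
}
```

A per-second rate has to be multiplied by 60 to be shown as requests per minute, which is an easy detail to miss when labeling a panel.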

***

## Testing

### Unit Tests

```java
@Test
public void testLogParser() {
    String logLine = "192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] " +
                    "\"GET /products HTTP/1.1\" 200 1234 \"-\" \"Mozilla/5.0\"";

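    // Test parsing logic. LOG_PATTERN is the regex from LogParser.ParseMapper;
    // it must be visible to the test class (e.g. package-private) or copied here.
    Matcher matcher = LogParser.ParseMapper.LOG_PATTERN.matcher(logLine);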
    assertTrue(matcher.matches());
    assertEquals("192.168.1.1", matcher.group(1));
    assertEquals("200", matcher.group(6));
}
```

### Integration Tests

```bash
# Generate sample logs
python generate_logs.py --lines 100000 > test.log

# Run pipeline (--name must match the agent name used in test-flume.conf)
flume-ng agent --name agent --conf-file test-flume.conf &
hadoop jar log-parser.jar LogParser /raw/logs/test /processed/test

# Verify output
hdfs dfs -cat /processed/test/part-m-00000 | head -10
```
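
The `generate_logs.py` script is not shown in this project. As one possible stand-in, the sketch below generates synthetic Combined Log Format lines in Java. Everything in it is an assumption made for testing: the class name, the URL list, the status mix, and the fixed random seed that keeps runs reproducible.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Random;

final class SampleLogGenerator {
    private static final DateTimeFormatter TS = DateTimeFormatter
        .ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
        .withZone(ZoneOffset.UTC);
    private static final String[] URLS =
        {"/", "/products", "/products/widget", "/cart", "/checkout"};
    private static final int[] STATUSES = {200, 200, 200, 200, 301, 404, 500};

    /** Builds one synthetic access-log line for the given instant. */
    static String line(Random rnd, Instant when) {
        String ip = "10.0." + rnd.nextInt(256) + "." + (1 + rnd.nextInt(254));
        String url = URLS[rnd.nextInt(URLS.length)];
        int status = STATUSES[rnd.nextInt(STATUSES.length)];
        int bytes = 200 + rnd.nextInt(50_000);
        return String.format(Locale.ROOT,
            "%s - - [%s] \"GET %s HTTP/1.1\" %d %d \"-\" \"Mozilla/5.0\"",
            ip, TS.format(when), url, status, bytes);
    }

    public static void main(String[] args) {
        int lines = args.length > 0 ? Integer.parseInt(args[0]) : 10;
        Random rnd = new Random(42); // fixed seed for reproducible test data
        Instant start = Instant.parse("2024-01-15T10:00:00Z");
        for (int i = 0; i < lines; i++) {
            System.out.println(line(rnd, start.plusSeconds(i)));
        }
    }
}
```

With Java 11 or later, saving this as `SampleLogGenerator.java` and running `java SampleLogGenerator.java 100000 > test.log` produces a file comparable to the one the integration test expects.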

***

## Project Deliverables

1. **Source Code**: All MapReduce, Hive, and Java files
2. **Configuration**: Flume, Oozie, HBase configs
3. **Documentation**: Architecture diagram, setup instructions
4. **Dashboard**: Grafana JSON export
5. **Performance Report**: Job execution times, resource usage
6. **Lessons Learned**: Challenges faced and solutions

***

## Bonus Challenges

1. **Real-time Processing**: Replace MapReduce with Spark Streaming
2. **Machine Learning**: Detect anomalies using Spark MLlib
3. **Data Quality**: Add data validation and cleansing steps
4. **Cost Optimization**: Implement data lifecycle policies
5. **Multi-tenancy**: Support multiple websites in same pipeline

***

## Congratulations!

You've built a production-ready data pipeline using the full Hadoop ecosystem. This capstone demonstrates:

* **Architecture Design**: Multi-layer data pipeline
* **Data Engineering**: ETL, parsing, aggregation
* **Performance**: Optimization techniques
* **Operations**: Monitoring, orchestration
* **Integration**: Multiple Hadoop components working together

### Next Steps

* Deploy to cloud (AWS EMR, Azure HDInsight, GCP Dataproc)
* Explore modern alternatives (Apache Spark, Apache Flink)
* Consider managed services (Databricks, Snowflake)
* Share your project on GitHub!

<Card title="Back to Course Overview" icon="book" href="/distributed-systems-tools/hadoop-overview">
  Review course structure and explore other modules
</Card>
