
Hadoop Ecosystem & Integration

Module Duration: 4-5 hours
Tools Covered: Hive, Pig, HBase, Sqoop, Flume, Oozie
Focus: When to use each tool and integration patterns

Ecosystem Overview

The Hadoop ecosystem provides specialized tools for different data processing needs:
┌─────────────────────────────────────────────────────────┐
│                   DATA SOURCES                          │
│   RDBMS  │  Logs  │  Streams  │  Files  │  APIs        │
└────┬──────┴────┬───┴─────┬─────┴────┬────┴────┬─────────┘
     │           │         │          │         │
     ▼           ▼         ▼          ▼         ▼
┌─────────┐ ┌────────┐ ┌────────┐ ┌────────┐ Custom
│  Sqoop  │ │ Flume  │ │ Kafka  │ │ Direct │ Ingest
│(RDBMS→  │ │ (Log   │ │(Stream)│ │  Copy  │
│ HDFS)   │ │Collect)│ │        │ │        │
└────┬────┘ └───┬────┘ └───┬────┘ └───┬────┘
     └──────────┴──────────┴──────────┘


          ┌──────────────────┐
          │       HDFS       │
          │  (Storage Layer) │
          └────────┬─────────┘

     ┌─────────────┼─────────────┬─────────────┐
     │             │             │             │
     ▼             ▼             ▼             ▼
┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
│  Hive   │  │   Pig   │  │MapReduce│  │  Spark  │
│ (SQL)   │  │(Scripts)│  │  (Java) │  │ (Fast)  │
└────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘
     └────────────┴────────────┴────────────┘


          ┌──────────────────┐
          │ Processed Data   │
          └────────┬─────────┘

     ┌─────────────┼─────────────┐
     │             │             │
     ▼             ▼             ▼
┌─────────┐  ┌─────────┐  ┌─────────┐
│  HBase  │  │  HDFS   │  │  BI     │
│(NoSQL)  │  │ (Files) │  │  Tools  │
└─────────┘  └─────────┘  └─────────┘

         Orchestration: Oozie / Airflow

Apache Hive: SQL on Hadoop

What is Hive?

Data warehouse system that provides SQL interface to Hadoop:
  • HiveQL: SQL-like query language
  • Schema on read: Define structure at query time
  • Translates to MapReduce/Tez/Spark: SQL → execution engine

Architecture

┌──────────────────────────────────────────┐
│        Hive Client (CLI/JDBC)            │
└───────────────┬──────────────────────────┘
                │ HiveQL Query

┌──────────────────────────────────────────┐
│         Hive Server (HS2)                │
│  • Parser                                │
│  • Compiler                              │
│  • Optimizer                             │
│  • Execution Engine                      │
└──────┬────────────────────┬──────────────┘
       │                    │
       │ Metadata           │ Job
       ▼                    ▼
┌──────────────┐     ┌─────────────────┐
│   Metastore  │     │  MapReduce/Tez  │
│   (Derby/    │     │   /Spark        │
│    MySQL)    │     └────────┬────────┘
└──────────────┘              │

                        ┌──────────┐
                        │   HDFS   │
                        └──────────┘
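
Connecting over JDBC (Java):

Most clients reach the architecture above through HiveServer2's JDBC endpoint. The sketch below is illustrative only: it assumes a hypothetical HS2 instance at hs2-host:10000, the default database, and the hive-jdbc driver jar on the classpath (the driver self-registers under JDBC 4).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical HS2 endpoint; adjust host, port, and database for your cluster
        String url = "jdbc:hive2://hs2-host:10000/default";

        try (Connection conn = DriverManager.getConnection(url, "hive_user", "");
             Statement stmt = conn.createStatement()) {

            // HS2 parses, optimizes, and runs the query on the configured execution engine
            ResultSet rs = stmt.executeQuery(
                "SELECT department, COUNT(*) FROM employees GROUP BY department");

            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
            }
        }
    }
}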

Basic HiveQL Examples

Create Table:
CREATE TABLE employees (
    id INT,
    name STRING,
    department STRING,
    salary DECIMAL(10,2),
    hire_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
Load Data:
-- Load from local file
LOAD DATA LOCAL INPATH '/tmp/employees.csv'
INTO TABLE employees;

-- Load from HDFS
LOAD DATA INPATH '/data/employees.csv'
INTO TABLE employees;
Partitioned Table (for performance):
CREATE TABLE sales (
    order_id INT,
    product STRING,
    amount DECIMAL(10,2)
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC;

-- Insert data
INSERT INTO TABLE sales PARTITION (year=2024, month=1)
SELECT order_id, product, amount
FROM raw_sales
WHERE year(order_date) = 2024 AND month(order_date) = 1;

-- Query specific partition (faster!)
SELECT SUM(amount)
FROM sales
WHERE year=2024 AND month=1;
Bucketing (for joins):
CREATE TABLE users (
    user_id INT,
    name STRING,
    email STRING
)
CLUSTERED BY (user_id) INTO 32 BUCKETS
STORED AS ORC;

Complex Queries

-- Join example
SELECT u.name, COUNT(o.order_id) as order_count
FROM users u
JOIN orders o ON u.user_id = o.user_id
WHERE o.order_date >= '2024-01-01'
GROUP BY u.name
HAVING COUNT(o.order_id) > 10
ORDER BY order_count DESC
LIMIT 100;

-- Window function
SELECT
    name,
    department,
    salary,
    AVG(salary) OVER (PARTITION BY department) as dept_avg,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) as salary_rank
FROM employees;

-- Common Table Expression (CTE)
WITH high_earners AS (
    SELECT * FROM employees WHERE salary > 100000
),
dept_summary AS (
    SELECT department, COUNT(*) as count
    FROM high_earners
    GROUP BY department
)
SELECT * FROM dept_summary WHERE count > 5;

Performance Optimization

Use ORC or Parquet Format:
CREATE TABLE sales_optimized (
    order_id INT,
    product STRING,
    amount DECIMAL(10,2)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Convert existing table
INSERT OVERWRITE TABLE sales_optimized
SELECT * FROM sales_text;
Enable Vectorization:
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;
Cost-Based Optimizer:
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
SET hive.stats.fetch.column.stats = true;

-- Gather statistics
ANALYZE TABLE employees COMPUTE STATISTICS;
ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS;

Apache Pig: Data Flow Scripting

What is Pig?

High-level platform for creating data processing programs:
  • Pig Latin: Procedural data flow language
  • ETL focus: Extract, Transform, Load pipelines
  • Compiles to MapReduce (or Tez): Like Hive, but with an explicit, step-by-step data flow (a Java embedding sketch follows this list)
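
Embedding Pig in Java (sketch):

Pig Latin statements can also be issued from a Java program through the PigServer API. This is a minimal sketch under assumed settings (local execution mode, a hypothetical /tmp/employees.csv input), not a full pipeline; the next section shows the same operations as a standalone script.

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class EmbeddedPigExample {
    public static void main(String[] args) throws Exception {
        // Local mode for illustration; use ExecType.MAPREDUCE on a cluster
        PigServer pig = new PigServer(ExecType.LOCAL);

        // Each registerQuery call adds one Pig Latin statement to the logical plan
        pig.registerQuery("employees = LOAD '/tmp/employees.csv' USING PigStorage(',') "
                + "AS (id:int, name:chararray, dept:chararray, salary:double);");
        pig.registerQuery("high_earners = FILTER employees BY salary > 100000;");

        // store() triggers execution and writes the filtered relation out
        pig.store("high_earners", "/tmp/high_earners_out");

        pig.shutdown();
    }
}

In practice most Pig pipelines live in standalone .pig scripts run with pig -f; PigServer is mainly useful when a Java application needs to build scripts dynamically.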

Pig Latin Basics

-- Load data
employees = LOAD '/data/employees.csv'
    USING PigStorage(',')
    AS (id:int, name:chararray, dept:chararray,
        salary:double, hire_date:chararray);

-- Filter
high_earners = FILTER employees BY salary > 100000;

-- Transform
employees_with_bonus = FOREACH high_earners GENERATE
    id,
    name,
    dept,
    salary,
    salary * 0.10 AS bonus;

-- Group
by_dept = GROUP employees_with_bonus BY dept;

-- Aggregate
dept_summary = FOREACH by_dept GENERATE
    group AS department,
    COUNT(employees_with_bonus) AS emp_count,
    AVG(employees_with_bonus.salary) AS avg_salary,
    SUM(employees_with_bonus.bonus) AS total_bonus;

-- Sort
sorted = ORDER dept_summary BY avg_salary DESC;

-- Store result
STORE sorted INTO '/output/dept_summary'
    USING PigStorage(',');

Join in Pig

-- Load datasets
users = LOAD '/data/users.csv'
    USING PigStorage(',')
    AS (user_id:int, name:chararray, city:chararray);

orders = LOAD '/data/orders.csv'
    USING PigStorage(',')
    AS (order_id:int, user_id:int, amount:double);

-- Inner join
joined = JOIN users BY user_id, orders BY user_id;

-- Left outer join
left_join = JOIN users BY user_id LEFT OUTER, orders BY user_id;

-- Result
result = FOREACH joined GENERATE
    users::name,
    users::city,
    orders::amount;

STORE result INTO '/output/user_orders';

User-Defined Functions (UDF)

// Custom Java UDF
package com.example;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class ToUpperCase extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) return null;

        String str = (String) input.get(0);
        return str == null ? null : str.toUpperCase();
    }
}
-- Register JAR
REGISTER myudfs.jar;

-- Use UDF
data = LOAD '/data/input.txt' AS (text:chararray);
upper = FOREACH data GENERATE com.example.ToUpperCase(text);

Apache HBase: NoSQL Database on HDFS

What is HBase?

Distributed, column-oriented NoSQL database:
  • Real-time read/write: Random access to individual rows, unlike HDFS, which is write-once and optimized for batch, sequential reads
  • Billions of rows: Horizontal scalability
  • Sparse data: Efficient storage of varying columns
  • Based on Google Bigtable: Similar design

Architecture

┌──────────────────────────────────────────┐
│           HBase Client                   │
└───────────────┬──────────────────────────┘


┌──────────────────────────────────────────┐
│          HMaster (Master)                │
│  • Schema management                     │
│  • Region assignment                     │
│  • Load balancing                        │
└───────────────┬──────────────────────────┘

      ┌─────────┴───┬─────────────┬─────────────┐
      │             │             │             │
      ▼             ▼             ▼             ▼
┌────────────┐┌────────────┐┌────────────┐┌────────────┐
│RegionServer││RegionServer││RegionServer││RegionServer│
│ ┌────────┐ ││ ┌────────┐ ││ ┌────────┐ ││ ┌────────┐ │
│ │ Region │ ││ │ Region │ ││ │ Region │ ││ │ Region │ │
│ └────────┘ ││ └────────┘ ││ └────────┘ ││ └────────┘ │
└────────────┘└────────────┘└────────────┘└────────────┘
      │             │             │             │
      └─────────────┴─────────────┴─────────────┘


               ┌─────────┐
               │  HDFS   │
               └─────────┘

Data Model

Table: users
┌─────────┬───────────────────────────────┬─────────────────────┐
│ Row Key │ Column Family: info           │ CF: activity        │
│         │ name    │ email               │ last_login │ count  │
├─────────┼─────────┼─────────────────────┼────────────┼────────┤
│ user001 │ Alice   │ alice@example.com   │ 2024-01-15 │ 42     │
│ user002 │ Bob     │ bob@example.com     │ 2024-01-14 │ 15     │
│ user003 │ Charlie │ charlie@example.com │ 2024-01-16 │ 7      │
└─────────┴─────────┴─────────────────────┴────────────┴────────┘
Key Concepts:
  • Row Key: Unique identifier, sorted order
  • Column Family: Group of columns, defined at table creation
  • Column: Qualified by family:qualifier (e.g., info:name)
  • Timestamp: Each cell can hold multiple timestamped versions (see the sketch below)
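
Reading multiple versions (Java sketch):

To make the timestamp point concrete, the hedged snippet below asks HBase for up to three stored versions of info:name and prints each cell's timestamp. It assumes an open Table handle to the users table, like the one obtained in the Java API example further down.

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VersionExample {

    // Assumes 'table' is an open handle to the 'users' table (see the Java API section)
    static void printNameVersions(Table table) throws Exception {
        Get get = new Get(Bytes.toBytes("user001"));
        get.setMaxVersions(3);  // return up to 3 versions instead of only the newest

        Result result = table.get(get);
        List<Cell> cells = result.getColumnCells(Bytes.toBytes("info"),
                                                 Bytes.toBytes("name"));
        for (Cell cell : cells) {
            // Every version of the cell carries its own timestamp
            System.out.println(cell.getTimestamp() + " -> "
                    + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}

A column family only retains multiple versions if it was created with VERSIONS greater than 1; with the default of 1, only the latest write comes back.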

HBase Shell

# Create table
create 'users', 'info', 'activity'

# Put data
put 'users', 'user001', 'info:name', 'Alice'
put 'users', 'user001', 'info:email', 'alice@example.com'
put 'users', 'user001', 'activity:last_login', '2024-01-15'

# Get data
get 'users', 'user001'

# Scan table
scan 'users'

# Scan with filter
scan 'users', {STARTROW => 'user001', ENDROW => 'user100'}

# Delete
delete 'users', 'user001', 'info:email'

# Drop table
disable 'users'
drop 'users'

Java API

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseExample {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);

        // Create table
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("users");

        if (!admin.tableExists(tableName)) {
            HTableDescriptor desc = new HTableDescriptor(tableName);
            desc.addFamily(new HColumnDescriptor("info"));
            desc.addFamily(new HColumnDescriptor("activity"));
            admin.createTable(desc);
        }

        // Get table
        Table table = connection.getTable(tableName);

        // Put data
        Put put = new Put(Bytes.toBytes("user001"));
        put.addColumn(Bytes.toBytes("info"),
                     Bytes.toBytes("name"),
                     Bytes.toBytes("Alice"));
        put.addColumn(Bytes.toBytes("info"),
                     Bytes.toBytes("email"),
                     Bytes.toBytes("[email protected]"));
        table.put(put);

        // Get data
        Get get = new Get(Bytes.toBytes("user001"));
        Result result = table.get(get);

        byte[] name = result.getValue(Bytes.toBytes("info"),
                                      Bytes.toBytes("name"));
        System.out.println("Name: " + Bytes.toString(name));

        // Scan with filter
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        ResultScanner scanner = table.getScanner(scan);

        for (Result r : scanner) {
            byte[] value = r.getValue(Bytes.toBytes("info"),
                                      Bytes.toBytes("name"));
            System.out.println(Bytes.toString(value));
        }

        scanner.close();
        table.close();
        connection.close();
    }
}

Apache Sqoop: RDBMS ↔ Hadoop

Import from RDBMS to HDFS

# Basic import
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password pass \
  --table employees \
  --target-dir /data/employees

# Import with query
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password-file hdfs://path/to/password \
  --query 'SELECT * FROM employees WHERE $CONDITIONS AND dept="Engineering"' \
  --target-dir /data/eng_employees \
  --split-by employee_id \
  --num-mappers 4

# Import to Hive
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --table employees \
  --hive-import \
  --hive-table employees \
  --create-hive-table

# Incremental import
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --table orders \
  --incremental append \
  --check-column order_id \
  --last-value 100000 \
  --target-dir /data/orders

Export from Hadoop to RDBMS

sqoop export \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password pass \
  --table employee_summary \
  --export-dir /output/summary \
  --input-fields-terminated-by ',' \
  --num-mappers 2

Apache Flume: Log Collection

Architecture

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Source    │────▶│   Channel    │────▶│    Sink     │
│ (Input)     │     │  (Buffer)    │     │  (Output)   │
└─────────────┘     └──────────────┘     └─────────────┘

Examples:
Source: Tail log files, HTTP, Syslog, Kafka, or Avro RPC from application code (see the sketch below)
Channel: Memory, File (durable)
Sink: HDFS, HBase, Kafka, Elasticsearch
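
Pushing events from application code (Java sketch):

An application can also hand events to an agent directly instead of having the agent tail a file. The sketch below uses Flume's RPC client API and assumes an agent whose source is of type avro listening on a hypothetical flume-host:41414; the exec-source configuration in the next example would not accept these events.

import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeRpcExample {
    public static void main(String[] args) throws Exception {
        // Assumed Avro source endpoint; not part of the exec-source config below
        RpcClient client = RpcClientFactory.getDefaultInstance("flume-host", 41414);
        try {
            // Wrap a log line as a Flume event and send it to the source
            Event event = EventBuilder.withBody("app started", StandardCharsets.UTF_8);
            client.append(event);
        } finally {
            client.close();
        }
    }
}

For the tail -F scenario configured below, no client code is needed; the exec source reads the file itself.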

Configuration Example

flume.conf:
# Agent name
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# Source: Tail log file
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/app.log
agent1.sources.source1.channels = channel1

# Channel: Memory buffer
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000

# Sink: HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hdfs.path = /logs/%Y/%m/%d/%H
# Needed so the %Y/%m/%d/%H escapes resolve; exec-source events carry no timestamp header
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 134217728
agent1.sinks.sink1.hdfs.rollCount = 0
Run Flume:
flume-ng agent \
  --name agent1 \
  --conf-file flume.conf \
  --conf /etc/flume/conf \
  -Dflume.root.logger=INFO,console

Apache Oozie: Workflow Scheduler

Workflow Example

workflow.xml:
<workflow-app name="data-pipeline" xmlns="uri:oozie:workflow:0.5">
    <start to="ingest"/>

    <!-- Sqoop import -->
    <action name="ingest">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>
                import --connect ${dbConnect}
                --table employees
                --target-dir /data/raw/employees
            </command>
        </sqoop>
        <ok to="transform"/>
        <error to="fail"/>
    </action>

    <!-- Hive transformation -->
    <action name="transform">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <script>transform.hql</script>
        </hive>
        <ok to="export"/>
        <error to="fail"/>
    </action>

    <!-- Sqoop export -->
    <action name="export">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>
                export --connect ${dbConnect}
                --table employee_summary
                --export-dir /data/processed/summary
            </command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed, error: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>

    <end name="end"/>
</workflow-app>
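
Submitting the workflow (Java sketch):

Workflows are normally launched with the oozie CLI, but Oozie also ships a Java client. The sketch below is a rough illustration under assumed values: an Oozie server at http://oozie-host:11000/oozie, the workflow.xml above already uploaded to HDFS, and placeholder jobTracker/nameNode/dbConnect parameters matching the ${...} variables in the workflow.

import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class SubmitWorkflow {
    public static void main(String[] args) throws Exception {
        // Assumed Oozie server URL
        OozieClient oozie = new OozieClient("http://oozie-host:11000/oozie");

        // Job configuration: where the workflow lives plus the parameters it references
        Properties conf = oozie.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs://namenode:8020/apps/data-pipeline");
        conf.setProperty("jobTracker", "resourcemanager:8032");
        conf.setProperty("nameNode", "hdfs://namenode:8020");
        conf.setProperty("dbConnect", "jdbc:mysql://dbserver/database");

        // run() submits and starts the workflow; poll until it leaves RUNNING
        String jobId = oozie.run(conf);
        while (oozie.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            Thread.sleep(10_000);
        }
        System.out.println("Final status: " + oozie.getJobInfo(jobId).getStatus());
    }
}

The equivalent CLI call is oozie job -config job.properties -run, with the same properties placed in job.properties.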

Tool Selection Guide

┌────────────────────────────┬──────────────────────────┐
│ Use Case                   │ Best Tool                │
├────────────────────────────┼──────────────────────────┤
│ Ad-hoc SQL queries on HDFS │ Hive                     │
│ Complex ETL pipelines      │ Pig or Spark             │
│ Real-time read/write       │ HBase                    │
│ Import from RDBMS          │ Sqoop                    │
│ Log aggregation            │ Flume or Kafka           │
│ Workflow orchestration     │ Oozie or Airflow         │
│ Fast iterative processing  │ Spark (not covered here) │
└────────────────────────────┴──────────────────────────┘

What’s Next?

Module 6: Data Processing Patterns & Best Practices

Learn proven patterns and anti-patterns for efficient data processing