Hadoop Ecosystem & Integration
Module Duration: 4-5 hours
Tools Covered: Hive, Pig, HBase, Sqoop, Flume, Oozie
Focus: When to use each tool and integration patterns
Ecosystem Overview
The Hadoop ecosystem provides specialized tools for different data processing needs:
┌──────────────────────────────────────────────────────┐
│                     DATA SOURCES                     │
│  RDBMS   │   Logs   │  Streams  │   Files   │  APIs  │
└────┬─────┴─────┬────┴──────┬────┴──────┬────┴───┬────┘
     │           │           │           │        │
     ▼           ▼           ▼           ▼        ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   Custom
│  Sqoop  │ │  Flume  │ │  Kafka  │ │ Direct  │   Ingest
│ (RDBMS→ │ │  (Log   │ │(Stream) │ │  Copy   │
│  HDFS)  │ │Collect) │ │         │ │         │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
     └───────────┴─────┬─────┴───────────┘
                       │
                       ▼
              ┌──────────────────┐
              │       HDFS       │
              │ (Storage Layer)  │
              └────────┬─────────┘
                       │
         ┌─────────────┼─────────────┬─────────────┐
         │             │             │             │
         ▼             ▼             ▼             ▼
    ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
    │  Hive   │   │   Pig   │   │MapReduce│   │  Spark  │
    │  (SQL)  │   │(Scripts)│   │ (Java)  │   │ (Fast)  │
    └────┬────┘   └────┬────┘   └────┬────┘   └────┬────┘
         └─────────────┴──────┬──────┴─────────────┘
                              │
                              ▼
                     ┌──────────────────┐
                     │  Processed Data  │
                     └────────┬─────────┘
                              │
                ┌─────────────┼─────────────┐
                │             │             │
                ▼             ▼             ▼
           ┌─────────┐   ┌─────────┐   ┌─────────┐
           │  HBase  │   │  HDFS   │   │   BI    │
           │ (NoSQL) │   │ (Files) │   │  Tools  │
           └─────────┘   └─────────┘   └─────────┘
Orchestration: Oozie / Airflow
Apache Hive: SQL on Hadoop
What is Hive?
Data warehouse system that provides a SQL interface to Hadoop:
- HiveQL: SQL-like query language
- Schema on read: Define structure at query time (see the example below)
- Translates to MapReduce/Tez/Spark: SQL → execution engine
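To make schema on read concrete, here is a minimal sketch (the /data/raw/logs path and column names are illustrative): an EXTERNAL table simply overlays a schema on files that already sit in HDFS, and dropping the table later leaves those files untouched:
-- Overlay a schema on existing HDFS files; nothing is loaded or converted
CREATE EXTERNAL TABLE raw_logs (
  ts STRING,
  log_level STRING,
  message STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/data/raw/logs';
-- The structure is applied only when the data is read
SELECT log_level, COUNT(*) AS cnt FROM raw_logs GROUP BY log_level;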
Architecture
┌──────────────────────────────────────────┐
│          Hive Client (CLI/JDBC)          │
└───────────────────┬──────────────────────┘
                    │ HiveQL Query
                    ▼
┌──────────────────────────────────────────┐
│            Hive Server (HS2)             │
│  • Parser                                │
│  • Compiler                              │
│  • Optimizer                             │
│  • Execution Engine                      │
└──────┬────────────────────┬──────────────┘
       │ Metadata           │ Job
       ▼                    ▼
┌──────────────┐   ┌─────────────────┐
│  Metastore   │   │  MapReduce/Tez  │
│  (Derby/     │   │     /Spark      │
│   MySQL)     │   └────────┬────────┘
└──────────────┘            │
                            ▼
                       ┌──────────┐
                       │   HDFS   │
                       └──────────┘
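Clients reach HiveServer2 over JDBC; as a quick sketch (the host name and user are placeholders, 10000 is the default HS2 port), the bundled Beeline CLI connects like this:
# Connect to HiveServer2 with Beeline over JDBC
beeline -u "jdbc:hive2://hs2-host:10000/default" -n your_user
# Run a query non-interactively
beeline -u "jdbc:hive2://hs2-host:10000/default" -e "SHOW TABLES;"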
Basic HiveQL Examples
Create Table:
CREATE TABLE employees (
id INT,
name STRING,
department STRING,
salary DECIMAL(10,2),
hire_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
Load Data:
-- Load from local file
LOAD DATA LOCAL INPATH '/tmp/employees.csv'
INTO TABLE employees;
-- Load from HDFS
LOAD DATA INPATH '/data/employees.csv'
INTO TABLE employees;
Partitioned Table:
CREATE TABLE sales (
order_id INT,
product STRING,
amount DECIMAL(10,2)
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC;
-- Insert data
INSERT INTO TABLE sales PARTITION (year=2024, month=1)
SELECT order_id, product, amount
FROM raw_sales
WHERE year(order_date) = 2024 AND month(order_date) = 1;
-- Query specific partition (faster!)
SELECT SUM(amount)
FROM sales
WHERE year=2024 AND month=1;
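The insert above uses static partition values; Hive can also derive the partition columns from the query itself (dynamic partitioning), which is convenient when one insert spans many months. A sketch using standard Hive settings and the same raw_sales source:
-- Partition columns must come last in the SELECT list
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT INTO TABLE sales PARTITION (year, month)
SELECT order_id, product, amount,
  year(order_date) AS year,
  month(order_date) AS month
FROM raw_sales;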
Bucketed Table:
CREATE TABLE users (
user_id INT,
name STRING,
email STRING
)
CLUSTERED BY (user_id) INTO 32 BUCKETS
STORED AS ORC;
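Bucketing hashes rows on user_id into a fixed number of files, which enables efficient sampling and bucket map joins. As a small sketch against the table above, TABLESAMPLE reads roughly one bucket instead of the full table:
-- Sample approximately 1/32 of the rows by reading a single bucket
SELECT * FROM users TABLESAMPLE(BUCKET 1 OUT OF 32 ON user_id);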
Complex Queries
-- Join example
SELECT u.name, COUNT(o.order_id) as order_count
FROM users u
JOIN orders o ON u.user_id = o.user_id
WHERE o.order_date >= '2024-01-01'
GROUP BY u.name
HAVING COUNT(o.order_id) > 10
ORDER BY order_count DESC
LIMIT 100;
-- Window function
SELECT
name,
department,
salary,
AVG(salary) OVER (PARTITION BY department) as dept_avg,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as salary_rank
FROM employees;
-- Common Table Expression (CTE)
WITH high_earners AS (
SELECT * FROM employees WHERE salary > 100000
),
dept_summary AS (
SELECT department, COUNT(*) as count
FROM high_earners
GROUP BY department
)
SELECT * FROM dept_summary WHERE count > 5;
Performance Optimization
Use ORC or Parquet Format:
CREATE TABLE sales_optimized (
order_id INT,
product STRING,
amount DECIMAL(10,2)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Convert existing table
INSERT OVERWRITE TABLE sales_optimized
SELECT * FROM sales_text;
Enable Vectorized Execution:
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;
Enable the Cost-Based Optimizer and Statistics:
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
SET hive.stats.fetch.column.stats = true;
-- Gather statistics
ANALYZE TABLE employees COMPUTE STATISTICS;
ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS;
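To verify that statistics, the cost-based optimizer, and partition pruning are actually taking effect, EXPLAIN shows the plan Hive has chosen without running the query:
-- Inspect the execution plan for a partition-pruned query
EXPLAIN
SELECT SUM(amount) FROM sales WHERE year = 2024 AND month = 1;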
Apache Pig: Data Flow Scripting
What is Pig?
High-level platform for creating data processing programs:
- Pig Latin: Procedural data flow language
- ETL focus: Extract, Transform, Load pipelines
- Compiles to MapReduce: Like Hive, but procedural
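Pig scripts are usually tried in local mode on a small file before being submitted to the cluster; a quick sketch (the script name is illustrative):
# Run against local files while developing
pig -x local dept_summary.pig
# Run on the cluster with MapReduce execution
pig -x mapreduce dept_summary.pig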
Pig Latin Basics
-- Load data
employees = LOAD '/data/employees.csv'
USING PigStorage(',')
AS (id:int, name:chararray, dept:chararray,
salary:double, hire_date:chararray);
-- Filter
high_earners = FILTER employees BY salary > 100000;
-- Transform
employees_with_bonus = FOREACH high_earners GENERATE
id,
name,
dept,
salary,
salary * 0.10 AS bonus;
-- Group
by_dept = GROUP employees_with_bonus BY dept;
-- Aggregate
dept_summary = FOREACH by_dept GENERATE
group AS department,
COUNT(employees_with_bonus) AS emp_count,
AVG(employees_with_bonus.salary) AS avg_salary,
SUM(employees_with_bonus.bonus) AS total_bonus;
-- Sort
sorted = ORDER dept_summary BY avg_salary DESC;
-- Store result
STORE sorted INTO '/output/dept_summary'
USING PigStorage(',');
Join in Pig
-- Load datasets
users = LOAD '/data/users.csv'
USING PigStorage(',')
AS (user_id:int, name:chararray, city:chararray);
orders = LOAD '/data/orders.csv'
USING PigStorage(',')
AS (order_id:int, user_id:int, amount:double);
-- Inner join
joined = JOIN users BY user_id, orders BY user_id;
-- Left outer join
left_join = JOIN users BY user_id LEFT OUTER, orders BY user_id;
-- Result
result = FOREACH joined GENERATE
users::name,
users::city,
orders::amount;
STORE result INTO '/output/user_orders';
User-Defined Functions (UDF)
// Custom Java UDF
package com.example;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class ToUpperCase extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) return null;
        String str = (String) input.get(0);
        return str == null ? null : str.toUpperCase();
    }
}
Use in Pig:
-- Register JAR
REGISTER myudfs.jar;
-- Use UDF
data = LOAD '/data/input.txt' AS (text:chararray);
upper = FOREACH data GENERATE com.example.ToUpperCase(text);
Apache HBase: NoSQL Database on HDFS
What is HBase?
Distributed, column-oriented NoSQL database:
- Real-time read/write: Unlike HDFS (batch only)
- Billions of rows: Horizontal scalability
- Sparse data: Efficient storage of varying columns
- Based on Google Bigtable: Similar design
Architecture
┌──────────────────────────────────────────┐
│               HBase Client               │
└───────────────────┬──────────────────────┘
                    │
                    ▼
┌──────────────────────────────────────────┐
│             HMaster (Master)             │
│  • Schema management                     │
│  • Region assignment                     │
│  • Load balancing                        │
└───────────────────┬──────────────────────┘
                    │
      ┌─────────────┼─────────────┬─────────────┐
      │             │             │             │
      ▼             ▼             ▼             ▼
┌────────────┐┌────────────┐┌────────────┐┌────────────┐
│RegionServer││RegionServer││RegionServer││RegionServer│
│ ┌────────┐ ││ ┌────────┐ ││ ┌────────┐ ││ ┌────────┐ │
│ │ Region │ ││ │ Region │ ││ │ Region │ ││ │ Region │ │
│ └────────┘ ││ └────────┘ ││ └────────┘ ││ └────────┘ │
└──────┬─────┘└──────┬─────┘└──────┬─────┘└──────┬─────┘
       └──────────────┴──────┬──────┴──────────────┘
                             │
                             ▼
                       ┌──────────┐
                       │   HDFS   │
                       └──────────┘
Data Model
Table: users
┌─────────┬───────────────────────────────┬────────────────────┐
│ Row Key │ Column Family: info           │ CF: activity       │
│         │ name    │ email               │ last_login │ count │
├─────────┼─────────┼─────────────────────┼────────────┼───────┤
│ user001 │ Alice   │ alice@example.com   │ 2024-01-15 │ 42    │
│ user002 │ Bob     │ bob@example.com     │ 2024-01-14 │ 15    │
│ user003 │ Charlie │ charlie@example.com │ 2024-01-16 │ 7     │
└─────────┴─────────┴─────────────────────┴────────────┴───────┘
- Row Key: Unique identifier; rows are stored in sorted order
- Column Family: Group of columns, defined at table creation
- Column: Qualified as family:qualifier (e.g., info:name)
- Timestamp: Multiple versions of the same cell (see the shell example below)
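To see the version dimension in action, here is a small HBase shell sketch; note that the column family must be configured to retain more than one version (the alter command assumes the users table from this section):
# Keep up to 3 versions of each cell in the 'info' family
alter 'users', {NAME => 'info', VERSIONS => 3}
# Read the most recent 3 versions of a single cell
get 'users', 'user001', {COLUMN => 'info:name', VERSIONS => 3}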
HBase Shell
# Create table
create 'users', 'info', 'activity'
# Put data
put 'users', 'user001', 'info:name', 'Alice'
put 'users', 'user001', 'info:email', 'alice@example.com'
put 'users', 'user001', 'activity:last_login', '2024-01-15'
# Get data
get 'users', 'user001'
# Scan table
scan 'users'
# Scan with filter
scan 'users', {STARTROW => 'user001', ENDROW => 'user100'}
# Delete
delete 'users', 'user001', 'info:email'
# Drop table
disable 'users'
drop 'users'
Java API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseExample {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        // Create table if it does not exist
        // (HTableDescriptor/HColumnDescriptor are the classic API; HBase 2.x also offers TableDescriptorBuilder)
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("users");
        if (!admin.tableExists(tableName)) {
            HTableDescriptor desc = new HTableDescriptor(tableName);
            desc.addFamily(new HColumnDescriptor("info"));
            desc.addFamily(new HColumnDescriptor("activity"));
            admin.createTable(desc);
        }
        // Get table
        Table table = connection.getTable(tableName);
        // Put data
        Put put = new Put(Bytes.toBytes("user001"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),
                Bytes.toBytes("Alice"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("email"),
                Bytes.toBytes("alice@example.com"));
        table.put(put);
        // Get data
        Get get = new Get(Bytes.toBytes("user001"));
        Result result = table.get(get);
        byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
        System.out.println("Name: " + Bytes.toString(name));
        // Scan a single column across all rows
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            byte[] value = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(value));
        }
        scanner.close();
        table.close();
        connection.close();
    }
}
Apache Sqoop: RDBMS ↔ Hadoop
Import from RDBMS to HDFS
# Basic import
sqoop import \
--connect jdbc:mysql://dbserver/database \
--username user \
--password pass \
--table employees \
--target-dir /data/employees
# Import with query
sqoop import \
--connect jdbc:mysql://dbserver/database \
--username user \
--password-file hdfs://path/to/password \
--query 'SELECT * FROM employees WHERE $CONDITIONS AND dept="Engineering"' \
--target-dir /data/eng_employees \
--split-by employee_id \
--num-mappers 4
# Import to Hive
sqoop import \
--connect jdbc:mysql://dbserver/database \
--table employees \
--hive-import \
--hive-table employees \
--create-hive-table
# Incremental import
sqoop import \
--connect jdbc:mysql://dbserver/database \
--table orders \
--incremental append \
--check-column order_id \
--last-value 100000 \
--target-dir /data/orders
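Append mode only picks up new rows; for source tables whose rows are updated in place, Sqoop also offers an incremental lastmodified mode keyed on a timestamp column, and a saved job can remember the watermark between runs. A sketch (the updated_at column and the job name are illustrative):
# Saved job stores --last-value automatically between executions
sqoop job --create orders_sync -- import \
  --connect jdbc:mysql://dbserver/database \
  --table orders \
  --incremental lastmodified \
  --check-column updated_at \
  --merge-key order_id \
  --target-dir /data/orders
sqoop job --exec orders_sync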
Export from Hadoop to RDBMS
sqoop export \
--connect jdbc:mysql://dbserver/database \
--username user \
--password pass \
--table employee_summary \
--export-dir /output/summary \
--input-fields-terminated-by ',' \
--num-mappers 2
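By default the export issues plain INSERT statements; when the target table may already contain some of the rows, Sqoop can update them instead and optionally insert the missing ones. A sketch of the relevant flags (using dept as the update key is an assumption about the summary table's schema):
# Upsert-style export: update existing rows, insert new ones
sqoop export \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password pass \
  --table employee_summary \
  --export-dir /output/summary \
  --update-key dept \
  --update-mode allowinsert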
Apache Flume: Log Collection
Architecture
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Source    │────▶│   Channel    │────▶│    Sink     │
│   (Input)   │     │   (Buffer)   │     │  (Output)   │
└─────────────┘     └──────────────┘     └─────────────┘
Examples:
Source: Tail log files, HTTP, Syslog, Kafka
Channel: Memory, File (durable)
Sink: HDFS, HBase, Kafka, Elasticsearch
Configuration Example
flume.conf:
# Agent name
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# Source: Tail log file
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/app.log
agent1.sources.source1.channels = channel1
# Channel: Memory buffer
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
# Sink: HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hdfs.path = /logs/%Y/%m/%d/%H
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 134217728
agent1.sinks.sink1.hdfs.rollCount = 0
# Needed so the %Y/%m/%d/%H escapes resolve without a timestamp interceptor
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
Start the agent:
flume-ng agent \
--name agent1 \
--conf-file flume.conf \
--conf /etc/flume/conf \
-Dflume.root.logger=INFO,console
Apache Oozie: Workflow Scheduler
Workflow Example
workflow.xml:
<workflow-app name="data-pipeline" xmlns="uri:oozie:workflow:0.5">
<start to="ingest"/>
<!-- Sqoop import -->
<action name="ingest">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>
import --connect ${dbConnect}
--table employees
--target-dir /data/raw/employees
</command>
</sqoop>
<ok to="transform"/>
<error to="fail"/>
</action>
<!-- Hive transformation -->
<action name="transform">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>transform.hql</script>
</hive>
<ok to="export"/>
<error to="fail"/>
</action>
<!-- Sqoop export -->
<action name="export">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>
export --connect ${dbConnect}
--table employee_summary
--export-dir /data/processed/summary
</command>
</sqoop>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Workflow failed, error: ${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
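The ${jobTracker}, ${nameNode}, and ${dbConnect} placeholders are resolved at submission time, typically from a job.properties file passed to the Oozie CLI. A minimal sketch (host names, ports, and the application path are placeholders):
# job.properties
nameNode=hdfs://namenode:8020
jobTracker=resourcemanager:8032
dbConnect=jdbc:mysql://dbserver/database
oozie.wf.application.path=${nameNode}/apps/data-pipeline
# Submit and start the workflow
oozie job -oozie http://oozie-host:11000/oozie -config job.properties -run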
Tool Selection Guide
| Use Case | Best Tool |
|---|---|
| Ad-hoc SQL queries on HDFS | Hive |
| Complex ETL pipelines | Pig or Spark |
| Real-time read/write | HBase |
| Import from RDBMS | Sqoop |
| Log aggregation | Flume or Kafka |
| Workflow orchestration | Oozie or Airflow |
| Fast iterative processing | Spark (not covered here) |
What’s Next?
Module 6: Data Processing Patterns & Best Practices
Learn proven patterns and anti-patterns for efficient data processing