Hadoop Ecosystem & Integration
Module Duration: 4-5 hours
Tools Covered: Hive, Pig, HBase, Sqoop, Flume, Oozie
Focus: When to use each tool and integration patterns
Ecosystem Overview
The Hadoop ecosystem provides specialized tools for different data processing needs:
┌─────────────────────────────────────────────────────────┐
│                      DATA SOURCES                       │
│  RDBMS   │   Logs    │  Streams  │   Files   │   APIs   │
└────┬─────┴─────┬─────┴─────┬─────┴─────┬─────┴─────┬────┘
     │           │           │           │           │
     ▼           ▼           ▼           ▼           ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   Custom
│  Sqoop  │ │  Flume  │ │  Kafka  │ │ Direct  │   Ingest
│ (RDBMS→ │ │  (Log   │ │(Stream) │ │  Copy   │
│  HDFS)  │ │Collect) │ │         │ │         │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
     └───────────┴─────┬─────┴───────────┘
                       │
                       ▼
              ┌──────────────────┐
              │       HDFS       │
              │ (Storage Layer)  │
              └────────┬─────────┘
                       │
     ┌───────────┬─────┴─────┬───────────┐
     │           │           │           │
     ▼           ▼           ▼           ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│  Hive   │ │   Pig   │ │MapReduce│ │  Spark  │
│  (SQL)  │ │(Scripts)│ │ (Java)  │ │ (Fast)  │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
     └───────────┴─────┬─────┴───────────┘
                       │
                       ▼
              ┌──────────────────┐
              │  Processed Data  │
              └────────┬─────────┘
                       │
           ┌───────────┼───────────┐
           │           │           │
           ▼           ▼           ▼
      ┌─────────┐ ┌─────────┐ ┌─────────┐
      │  HBase  │ │  HDFS   │ │   BI    │
      │ (NoSQL) │ │ (Files) │ │  Tools  │
      └─────────┘ └─────────┘ └─────────┘

Orchestration: Oozie / Airflow
Apache Hive: SQL on Hadoop
What is Hive?
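A data warehouse system that provides a SQL interface to data stored in Hadoop:
HiveQL: SQL-like query language
Schema on read: The table schema is applied when data is read, not validated when it is loaded
Translates to MapReduce/Tez/Spark: HiveQL is compiled into jobs for the configured execution engine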
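Schema on read can be illustrated with a small, self-contained sketch. This is not Hive code: the class `SchemaOnRead` and its three-column schema (`id INT, name STRING, salary DOUBLE`) are hypothetical, chosen only to show that loading stores raw lines untouched and that a field which does not fit the declared type surfaces as a null at read time instead of failing the load.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of schema on read; not part of Hive.
final class SchemaOnRead {
    // Interpret one raw CSV line as (id INT, name STRING, salary DOUBLE).
    // A field that cannot be converted becomes null instead of raising an error.
    static Object[] applySchema(String rawLine) {
        String[] parts = rawLine.split(",", -1);
        Object[] row = new Object[3];
        row[0] = parts.length > 0 ? parseInt(parts[0]) : null;
        row[1] = parts.length > 1 ? parts[1] : null;
        row[2] = parts.length > 2 ? parseDouble(parts[2]) : null;
        return row;
    }

    static Integer parseInt(String s) {
        try {
            return Integer.valueOf(s.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    static Double parseDouble(String s) {
        try {
            return Double.valueOf(s.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // "Loading" only stores the raw lines; nothing is validated at this point.
        List<String> storedFile = new ArrayList<>();
        storedFile.add("1,Alice,120000.50");
        storedFile.add("oops,Bob,not-a-number");

        // The schema is applied only when the rows are read.
        for (String line : storedFile) {
            Object[] row = applySchema(line);
            System.out.println(row[0] + " | " + row[1] + " | " + row[2]);
        }
    }
}
```

The practical consequence is that malformed input is not rejected at load time, so data quality checks belong in the queries or in a separate validation step.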
Architecture
┌──────────────────────────────────────────┐
│          Hive Client (CLI/JDBC)          │
└───────────────┬──────────────────────────┘
                │ HiveQL Query
                ▼
┌──────────────────────────────────────────┐
│            Hive Server (HS2)             │
│  • Parser                                │
│  • Compiler                              │
│  • Optimizer                             │
│  • Execution Engine                      │
└──────┬────────────────────┬──────────────┘
       │                    │
       │ Metadata           │ Job
       ▼                    ▼
┌──────────────┐   ┌─────────────────┐
│  Metastore   │   │  MapReduce/Tez  │
│   (Derby/    │   │     /Spark      │
│    MySQL)    │   └────────┬────────┘
└──────────────┘            │
                            ▼
                       ┌──────────┐
                       │   HDFS   │
                       └──────────┘
Basic HiveQL Examples
Create Table:
CREATE TABLE employees (
  id INT,
  name STRING,
  department STRING,
  salary DECIMAL(10, 2),
  hire_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
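Load Data:
-- Load from a local file (the file is copied into the table directory)
LOAD DATA LOCAL INPATH '/tmp/employees.csv'
INTO TABLE employees;
-- Load from HDFS (the file is moved, not copied, into the table directory)
LOAD DATA INPATH '/data/employees.csv'
INTO TABLE employees;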
Partitioned Table (for performance):
CREATE TABLE sales (
  order_id INT,
  product STRING,
  amount DECIMAL(10, 2)
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC;
-- Insert data into one partition
INSERT INTO TABLE sales PARTITION (year = 2024, month = 1)
SELECT order_id, product, amount
FROM raw_sales
WHERE year(order_date) = 2024 AND month(order_date) = 1;
-- Query a specific partition (faster: only that partition's files are read)
SELECT SUM(amount)
FROM sales
WHERE year = 2024 AND month = 1;
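Why the partition filter is faster can be sketched as follows. Hive stores each partition in its own directory named after the partition values (for example `year=2024/month=1`), so a filter on the partition columns selects directories before any file is opened. The class `PartitionPruning` and the `/warehouse/sales` path are hypothetical and only model the idea; they are not Hive internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of partition pruning; the warehouse path is an assumption.
final class PartitionPruning {
    // Directory that would hold one (year, month) partition of a table.
    static String partitionDir(String tableDir, int year, int month) {
        return tableDir + "/year=" + year + "/month=" + month;
    }

    // Every monthly partition directory between two years, inclusive.
    static List<String> allPartitions(String tableDir, int fromYear, int toYear) {
        List<String> dirs = new ArrayList<>();
        for (int y = fromYear; y <= toYear; y++) {
            for (int m = 1; m <= 12; m++) {
                dirs.add(partitionDir(tableDir, y, m));
            }
        }
        return dirs;
    }

    // Keep only the directories that match the filter on the partition columns.
    static List<String> prune(List<String> dirs, String tableDir, int year, int month) {
        String wanted = partitionDir(tableDir, year, month);
        List<String> selected = new ArrayList<>();
        for (String dir : dirs) {
            if (dir.equals(wanted)) {
                selected.add(dir);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> all = allPartitions("/warehouse/sales", 2023, 2024);
        List<String> scanned = prune(all, "/warehouse/sales", 2024, 1);
        System.out.println("Partitions in table: " + all.size());
        System.out.println("Partitions scanned:  " + scanned);
    }
}
```

A query that filters only on non-partition columns (for example `product`) gets no such shortcut and has to read every partition.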
Bucketing (for joins):
CREATE TABLE users (
  user_id INT,
  name STRING,
  email STRING
)
CLUSTERED BY (user_id) INTO 32 BUCKETS
STORED AS ORC;
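The reason bucketing helps joins is that a row's bucket is a pure function of the bucketing column. If two tables are bucketed on the same key into the same number of buckets, rows with equal keys land in the same bucket number in both tables, so the join can proceed bucket by bucket. The sketch below is a simplification: `BucketAssignment` is a hypothetical class, and the hash function Hive actually uses depends on the Hive version and the column type.

```java
// Simplified sketch of bucket assignment; the real hash depends on Hive version and type.
final class BucketAssignment {
    // Map a user_id to one of numBuckets buckets.
    static int bucketFor(int userId, int numBuckets) {
        int hash = Integer.hashCode(userId);             // for an int this is the value itself
        return (hash & Integer.MAX_VALUE) % numBuckets;  // clear the sign bit, then take the modulo
    }

    public static void main(String[] args) {
        int numBuckets = 32; // matches CLUSTERED BY (user_id) INTO 32 BUCKETS
        for (int userId : new int[] {1, 33, 65, 100}) {
            System.out.println("user_id " + userId + " -> bucket " + bucketFor(userId, numBuckets));
        }
    }
}
```

Because the assignment depends only on the key and the bucket count, changing the number of buckets on one side of a join breaks the alignment between the two tables.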
Complex Queries
-- Join example
SELECT u.name, COUNT(o.order_id) AS order_count
FROM users u
JOIN orders o ON u.user_id = o.user_id
WHERE o.order_date >= '2024-01-01'
GROUP BY u.name
HAVING COUNT(o.order_id) > 10
ORDER BY order_count DESC
LIMIT 100;
-- Window function
SELECT
  name,
  department,
  salary,
  AVG(salary) OVER (PARTITION BY department) AS dept_avg,
  RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
FROM employees;
-- Common Table Expression (CTE)
WITH high_earners AS (
  SELECT * FROM employees WHERE salary > 100000
),
dept_summary AS (
  SELECT department, COUNT(*) AS count
  FROM high_earners
  GROUP BY department
)
SELECT * FROM dept_summary WHERE count > 5;
Use ORC or Parquet Format:
CREATE TABLE sales_optimized (
  order_id INT,
  product STRING,
  amount DECIMAL(10, 2)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "SNAPPY");
-- Convert existing table
INSERT OVERWRITE TABLE sales_optimized
SELECT * FROM sales_text;
Enable Vectorization:
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;
Cost-Based Optimizer:
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
SET hive.stats.fetch.column.stats = true;
-- Gather statistics
ANALYZE TABLE employees COMPUTE STATISTICS;
ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS;
Apache Pig: Data Flow Scripting
What is Pig?
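A high-level platform for writing data processing programs:
Pig Latin: Procedural data flow language
ETL focus: Extract, Transform, Load pipelines
Compiles to MapReduce: Like Hive, but a script is written as a sequence of transformation steps rather than as a declarative query (Pig can also run on Tez)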
Pig Latin Basics
-- Load data
employees = LOAD '/data/employees.csv'
    USING PigStorage(',')
    AS (id:int, name:chararray, dept:chararray,
        salary:double, hire_date:chararray);
-- Filter
high_earners = FILTER employees BY salary > 100000;
-- Transform
employees_with_bonus = FOREACH high_earners GENERATE
    id,
    name,
    dept,
    salary,
    salary * 0.10 AS bonus;
-- Group
by_dept = GROUP employees_with_bonus BY dept;
-- Aggregate
dept_summary = FOREACH by_dept GENERATE
    group AS department,
    COUNT(employees_with_bonus) AS emp_count,
    AVG(employees_with_bonus.salary) AS avg_salary,
    SUM(employees_with_bonus.bonus) AS total_bonus;
-- Sort
sorted = ORDER dept_summary BY avg_salary DESC;
-- Store result
STORE sorted INTO '/output/dept_summary'
    USING PigStorage(',');
Join in Pig
-- Load datasets
users = LOAD '/data/users.csv'
    USING PigStorage(',')
    AS (user_id:int, name:chararray, city:chararray);
orders = LOAD '/data/orders.csv'
    USING PigStorage(',')
    AS (order_id:int, user_id:int, amount:double);
-- Inner join
joined = JOIN users BY user_id, orders BY user_id;
-- Left outer join
left_join = JOIN users BY user_id LEFT OUTER, orders BY user_id;
-- Result
result = FOREACH joined GENERATE
    users::name,
    users::city,
    orders::amount;
STORE result INTO '/output/user_orders';
User-Defined Functions (UDF)
// Custom Java UDF
package com.example;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class ToUpperCase extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        // Return null for missing input so the script does not fail on empty rows
        if (input == null || input.size() == 0) return null;
        String str = (String) input.get(0);
        return str == null ? null : str.toUpperCase();
    }
}
-- Register JAR
REGISTER myudfs.jar;
-- Use UDF
data = LOAD '/data/input.txt' AS (text:chararray);
upper = FOREACH data GENERATE com.example.ToUpperCase(text);
Apache HBase: NoSQL Database on HDFS
What is HBase?
A distributed, column-oriented NoSQL database:
Real-time read/write: Low-latency random access to individual rows, unlike plain HDFS files, which suit large sequential batch reads
Billions of rows: Horizontal scalability
Sparse data: Efficient storage when rows populate different columns
Based on Google Bigtable: Similar design
Architecture
┌─────────────────────────────────────────────────────────┐
│                      HBase Client                       │
└────────────────────────────┬────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────┐
│                    HMaster (Master)                     │
│  • Schema management                                    │
│  • Region assignment                                    │
│  • Load balancing                                       │
└────────────────────────────┬────────────────────────────┘
                             │
       ┌──────────────┬──────┴───────┬──────────────┐
       │              │              │              │
       ▼              ▼              ▼              ▼
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│RegionServer│ │RegionServer│ │RegionServer│ │RegionServer│
│            │ │            │ │            │ │            │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │ Region │ │ │ │ Region │ │ │ │ Region │ │ │ │ Region │ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │ │ └────────┘ │
└──────┬─────┘ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘
       └──────────────┴──────┬───────┴──────────────┘
                             │
                             ▼
                        ┌─────────┐
                        │  HDFS   │
                        └─────────┘
Data Model
Table: users
┌─────────┬────────────────────────┬─────────────────────┐
│ Row Key │  Column Family: info   │    CF: activity     │
│         ├─────────┬──────────────┼────────────┬────────┤
│         │ name    │ email        │ last_login │ count  │
├─────────┼─────────┼──────────────┼────────────┼────────┤
│ user001 │ Alice   │ a@ex.com     │ 2024-01-15 │ 42     │
│ user002 │ Bob     │ b@ex.com     │ 2024-01-14 │ 15     │
│ user003 │ Charlie │ c@ex.com     │ 2024-01-16 │ 7      │
└─────────┴─────────┴──────────────┴────────────┴────────┘
Key Concepts:
Row Key: Unique identifier; rows are stored sorted by row key in lexicographic byte order
Column Family: Group of columns, defined at table creation
Column: Qualified by family:qualifier (e.g., info:name)
Timestamp: Each cell value is versioned by timestamp, so one cell can hold multiple versions
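The key concepts above can be modeled with nested sorted maps. This is a minimal in-memory sketch, not HBase code: `HBaseModelSketch` is a hypothetical class that maps row key to column to timestamp to value. It shows two consequences of the model: reads return the newest version of a cell, and range scans follow string order of the row keys, so `user10` sorts before `user2`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of the HBase data model; not the HBase API.
final class HBaseModelSketch {
    // rowKey -> "family:qualifier" -> timestamp (newest first) -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<>();

    void put(String rowKey, String column, long timestamp, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<String, NavigableMap<Long, String>>())
            .computeIfAbsent(column, k -> new TreeMap<Long, String>(Comparator.reverseOrder()))
            .put(timestamp, value);
    }

    // Latest version of a cell, or null if the row or column does not exist.
    String get(String rowKey, String column) {
        NavigableMap<String, NavigableMap<Long, String>> row = rows.get(rowKey);
        if (row == null) return null;
        NavigableMap<Long, String> versions = row.get(column);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    // Row keys in [startRow, stopRow), returned in sorted order.
    List<String> scan(String startRow, String stopRow) {
        return new ArrayList<>(rows.subMap(startRow, true, stopRow, false).keySet());
    }

    public static void main(String[] args) {
        HBaseModelSketch table = new HBaseModelSketch();
        table.put("user2", "info:name", 1L, "Bob");
        table.put("user10", "info:name", 1L, "Carol");
        table.put("user001", "info:email", 1L, "a@ex.com");
        table.put("user001", "info:email", 2L, "alice@example.com");

        System.out.println(table.get("user001", "info:email")); // newest version wins
        System.out.println(table.scan("user001", "user100"));   // lexicographic range
    }
}
```

This ordering is why numeric identifiers in row keys are usually zero-padded (`user001` rather than `user1`): padding makes string order match numeric order.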
HBase Shell
# Create table
create 'users', 'info', 'activity'
# Put data
put 'users', 'user001', 'info:name', 'Alice'
put 'users', 'user001', 'info:email', 'alice@example.com'
put 'users', 'user001', 'activity:last_login', '2024-01-15'
# Get data
get 'users', 'user001'
# Scan table
scan 'users'
# Scan a row-key range (start row inclusive, stop row exclusive)
scan 'users', {STARTROW => 'user001', STOPROW => 'user100'}
# Delete
delete 'users', 'user001', 'info:email'
# Drop table
disable 'users'
drop 'users'
Java API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseExample {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(config);
        // Create table. HTableDescriptor and HColumnDescriptor are the HBase 1.x API;
        // HBase 2.x deprecates them in favor of TableDescriptorBuilder.
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("users");
        if (!admin.tableExists(tableName)) {
            HTableDescriptor desc = new HTableDescriptor(tableName);
            desc.addFamily(new HColumnDescriptor("info"));
            desc.addFamily(new HColumnDescriptor("activity"));
            admin.createTable(desc);
        }
        admin.close();
        // Get table
        Table table = connection.getTable(tableName);
        // Put data
        Put put = new Put(Bytes.toBytes("user001"));
        put.addColumn(Bytes.toBytes("info"),
                      Bytes.toBytes("name"),
                      Bytes.toBytes("Alice"));
        put.addColumn(Bytes.toBytes("info"),
                      Bytes.toBytes("email"),
                      Bytes.toBytes("alice@example.com"));
        table.put(put);
        // Get data
        Get get = new Get(Bytes.toBytes("user001"));
        Result result = table.get(get);
        byte[] name = result.getValue(Bytes.toBytes("info"),
                                      Bytes.toBytes("name"));
        System.out.println("Name: " + Bytes.toString(name));
        // Scan all rows, returning only the info:name column
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            byte[] value = r.getValue(Bytes.toBytes("info"),
                                      Bytes.toBytes("name"));
            System.out.println(Bytes.toString(value));
        }
        scanner.close();
        table.close();
        connection.close();
    }
}
Apache Sqoop: RDBMS ↔ Hadoop
Import from RDBMS to HDFS
# Basic import (--password exposes the secret in shell history and the process list;
# prefer --password-file or -P for anything beyond a quick test)
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password pass \
  --table employees \
  --target-dir /data/employees
# Import with query ($CONDITIONS is required; Sqoop replaces it with each mapper's range)
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password-file hdfs://path/to/password \
  --query 'SELECT * FROM employees WHERE $CONDITIONS AND dept="Engineering"' \
  --target-dir /data/eng_employees \
  --split-by employee_id \
  --num-mappers 4
# Import to Hive
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --table employees \
  --hive-import \
  --hive-table employees \
  --create-hive-table
# Incremental import (only rows with order_id greater than --last-value)
sqoop import \
  --connect jdbc:mysql://dbserver/database \
  --table orders \
  --incremental append \
  --check-column order_id \
  --last-value 100000 \
  --target-dir /data/orders
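How `--split-by` and `--num-mappers` interact can be sketched as follows. Sqoop looks up the minimum and maximum of the split column and gives each mapper one slice of that range, substituting the slice's bounds for `$CONDITIONS`. The class `SplitRanges` below is a hypothetical simplification: it shows the even division of the range, not Sqoop's exact boundary arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of range splitting; Sqoop's real splitter differs in detail.
final class SplitRanges {
    // Divide [min, max] into at most numMappers contiguous, non-overlapping ranges.
    // Each range is returned as {low, high}, both inclusive.
    static List<long[]> splits(long min, long max, int numMappers) {
        List<long[]> ranges = new ArrayList<>();
        long total = max - min + 1;
        long base = total / numMappers;   // minimum size of every range
        long extra = total % numMappers;  // the first 'extra' ranges get one more value
        long low = min;
        for (int i = 0; i < numMappers && low <= max; i++) {
            long size = base + (i < extra ? 1 : 0);
            long high = low + size - 1;
            ranges.add(new long[] {low, high});
            low = high + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Assumed values: employee_id runs from 1 to 100000, with 4 mappers.
        for (long[] r : splits(1, 100000, 4)) {
            System.out.println("employee_id >= " + r[0] + " AND employee_id <= " + r[1]);
        }
    }
}
```

The ranges are equal in width, not in row count. If the split column's values are clustered (for example, most IDs fall in the last quarter of the range), some mappers do most of the work, so a roughly uniformly distributed column makes the better `--split-by` choice.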
Export from Hadoop to RDBMS
sqoop export \
  --connect jdbc:mysql://dbserver/database \
  --username user \
  --password pass \
  --table employee_summary \
  --export-dir /output/summary \
  --input-fields-terminated-by ',' \
  --num-mappers 2
Apache Flume: Log Collection
Architecture
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Source    │────▶│   Channel    │────▶│    Sink     │
│   (Input)   │     │   (Buffer)   │     │  (Output)   │
└─────────────┘     └──────────────┘     └─────────────┘
Examples:
  Source:  Tail log files, HTTP, Syslog, Kafka
  Channel: Memory, File (durable)
  Sink:    HDFS, HBase, Kafka, Elasticsearch
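The source, channel, and sink roles can be sketched with a bounded queue. This is an illustration of the buffering idea only, not Flume's implementation: `ChannelSketch` is a hypothetical class in which the source side offers events, the channel holds at most `capacity` of them, and the sink side drains them in batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a bounded in-memory channel; not Flume's implementation.
final class ChannelSketch {
    private final BlockingQueue<String> channel;

    ChannelSketch(int capacity) {
        channel = new ArrayBlockingQueue<>(capacity);
    }

    // Source side: returns false when the channel is full, so the caller must retry or drop.
    boolean offer(String event) {
        return channel.offer(event);
    }

    // Sink side: remove up to batchSize events in arrival order.
    List<String> drain(int batchSize) {
        List<String> batch = new ArrayList<>();
        channel.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) {
        ChannelSketch ch = new ChannelSketch(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("offer e" + i + ": " + ch.offer("e" + i));
        }
        System.out.println("sink batch: " + ch.drain(2));
    }
}
```

The capacity is the trade-off to tune: a larger buffer absorbs longer sink outages, but an in-memory buffer loses its contents if the process dies, which is the reason a durable file channel exists as the alternative.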
Configuration Example
flume.conf:
# Agent name
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# Source: Tail log file
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/app.log
agent1.sources.source1.channels = channel1
# Channel: Memory buffer
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
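# Sink: HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hdfs.path = /logs/%Y/%m/%d/%H
# The time escapes in hdfs.path need a timestamp; the exec source sets no
# timestamp header, so use the agent's local time instead
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# Roll files every hour or at 128 MB, whichever comes first; never by event count
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 134217728
agent1.sinks.sink1.hdfs.rollCount = 0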
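The `%Y/%m/%d/%H` escapes in `hdfs.path` expand to a four-digit year, two-digit month and day, and a 00-23 hour, taken from the event's timestamp header (or from the agent's clock when `hdfs.useLocalTimeStamp` is true). The sketch below reproduces that expansion with `java.time`; `BucketPath` is a hypothetical helper, and using UTC is an assumption made so the result does not depend on the machine's time zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors the /logs/%Y/%m/%d/%H path layout.
final class BucketPath {
    // %Y -> yyyy, %m -> MM, %d -> dd, %H -> HH
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    // Directory an event with this timestamp would be written to.
    static String pathFor(long epochMillis, ZoneId zone) {
        return "/logs/" + HOURLY.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2024-01-15T10:30:00Z").toEpochMilli();
        System.out.println(pathFor(ts, ZoneOffset.UTC));
    }
}
```

With this layout every hour gets its own directory, which lines up with hourly Hive partitions and keeps any single directory from accumulating an unbounded number of files.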
Run Flume:
flume-ng agent \
  --name agent1 \
  --conf-file flume.conf \
  --conf /etc/flume/conf \
  -Dflume.root.logger=INFO,console
Apache Oozie: Workflow Scheduler
Workflow Example
workflow.xml:
<workflow-app name="data-pipeline" xmlns="uri:oozie:workflow:0.5">
  <start to="ingest"/>
  <!-- Sqoop import. Oozie splits the command element on spaces, so keep it on one line. -->
  <action name="ingest">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <command>import --connect ${dbConnect} --table employees --target-dir /data/raw/employees</command>
    </sqoop>
    <ok to="transform"/>
    <error to="fail"/>
  </action>
  <!-- Hive transformation -->
  <action name="transform">
    <hive xmlns="uri:oozie:hive-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <script>transform.hql</script>
    </hive>
    <ok to="export"/>
    <error to="fail"/>
  </action>
  <!-- Sqoop export -->
  <action name="export">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <command>export --connect ${dbConnect} --table employee_summary --export-dir /data/processed/summary</command>
    </sqoop>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Workflow failed, error: ${wf:errorMessage(wf:lastErrorNode())}</message>
  </kill>
  <end name="end"/>
</workflow-app>
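The control flow of this workflow is a small state machine: each action names one node to go to on success (`ok`) and one on failure (`error`). The sketch below walks those transitions. `WorkflowWalk` is a hypothetical class written for illustration; it copies the node names from the workflow above and does not use any Oozie API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical walk over the workflow's ok/error transitions; not Oozie code.
final class WorkflowWalk {
    // For each action: {okTarget, errorTarget}, mirroring the <ok> and <error> elements.
    static final Map<String, String[]> TRANSITIONS = new LinkedHashMap<>();
    static {
        TRANSITIONS.put("ingest", new String[] {"transform", "fail"});
        TRANSITIONS.put("transform", new String[] {"export", "fail"});
        TRANSITIONS.put("export", new String[] {"end", "fail"});
    }

    // Follow transitions from the start node until a terminal node ("end" or "fail").
    static List<String> run(Set<String> failingActions) {
        List<String> visited = new ArrayList<>();
        String node = "ingest"; // <start to="ingest"/>
        while (TRANSITIONS.containsKey(node)) {
            visited.add(node);
            String[] next = TRANSITIONS.get(node);
            node = failingActions.contains(node) ? next[1] : next[0];
        }
        visited.add(node);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run(Collections.<String>emptySet()));
        System.out.println(run(Collections.singleton("transform")));
    }
}
```

Because every action routes its error to the same `fail` node, a failure in any step stops the pipeline before later steps run, so the export never publishes a summary built from a failed transformation.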
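┌────────────────────────────┬──────────────────────────┐
│ Use Case                   │ Best Tool                │
├────────────────────────────┼──────────────────────────┤
│ Ad-hoc SQL queries on HDFS │ Hive                     │
│ Complex ETL pipelines      │ Pig or Spark             │
│ Real-time read/write       │ HBase                    │
│ Import from RDBMS          │ Sqoop                    │
│ Log aggregation            │ Flume or Kafka           │
│ Workflow orchestration     │ Oozie or Airflow         │
│ Fast iterative processing  │ Spark (not covered here) │
└────────────────────────────┴──────────────────────────┘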
What’s Next?
Module 6: Data Processing Patterns & Best Practices
Learn proven patterns and anti-patterns for efficient data processing.