The true power of Hadoop lies not just in HDFS and MapReduce, but in the rich ecosystem of tools built on top of it. This chapter explores the key components that transform Hadoop from a low-level distributed file system and processing framework into a complete big data platform.
-- CREATE TABLE
CREATE TABLE employees (
  id INT,
  name STRING,
  salary DECIMAL(10,2),
  dept STRING,
  hire_date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- LOAD DATA
LOAD DATA INPATH '/user/data/employees.csv'
INTO TABLE employees;

-- SELECT QUERY
SELECT dept, AVG(salary) AS avg_salary
FROM employees
WHERE hire_date >= '2020-01-01'
GROUP BY dept
HAVING AVG(salary) > 50000
ORDER BY avg_salary DESC;

-- JOIN
SELECT e.name, e.salary, d.dept_name
FROM employees e
JOIN departments d
  ON e.dept = d.dept_id;

-- SUBQUERY
SELECT name, salary
FROM employees
WHERE salary > (
  SELECT AVG(salary) FROM employees
);
Behind the scenes, each query is translated into a MapReduce or Tez job:
SELECT dept, COUNT(*) FROM employees GROUP BY dept;

↓ Translates to:

Map Phase:
• Read employees table from HDFS
• For each row, emit (dept, 1)

Shuffle:
• Group by dept

Reduce Phase:
• For each dept, sum counts
• Write to HDFS

Result written to a temporary HDFS location.
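The phases above can be sketched as a small in-memory simulation, with plain Java collections standing in for HDFS and the shuffle. The class name and the row layout (dept as the fourth column) are illustrative, not anything Hive generates:

```java
import java.util.*;
import java.util.stream.*;

// In-memory simulation of the Map -> Shuffle -> Reduce pipeline that
// Hive generates for "SELECT dept, COUNT(*) FROM employees GROUP BY dept".
public class GroupByAsMapReduce {

    // Map phase: for each row, emit the pair (dept, 1).
    static List<Map.Entry<String, Integer>> map(List<String[]> rows) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String[] row : rows) {
            String dept = row[3]; // dept is the 4th column in this sketch
            emitted.add(Map.entry(dept, 1));
        }
        return emitted;
    }

    // Shuffle: group the emitted pairs by key (dept).
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    // Reduce phase: sum the counts for each dept.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new TreeMap<>();
        grouped.forEach((dept, ones) ->
                result.put(dept, ones.stream().mapToInt(Integer::intValue).sum()));
        return result;
    }

    public static Map<String, Integer> run(List<String[]> rows) {
        return reduce(shuffle(map(rows)));
    }
}
```

In the real job each phase runs on different machines and the shuffle moves data over the network; the logical data flow is the same.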
Organizing Data for Performance
-- CREATE PARTITIONED TABLE
CREATE TABLE sales (
  transaction_id STRING,
  amount DECIMAL(10,2),
  customer_id STRING
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

-- LOAD DATA INTO PARTITION
INSERT INTO TABLE sales
PARTITION (year=2023, month=1)
SELECT transaction_id, amount, customer_id
FROM raw_sales
WHERE year(transaction_date) = 2023
  AND month(transaction_date) = 1;
-- TEXT (default, human-readable)
CREATE TABLE users_text (
  id INT,
  name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- PARQUET (columnar, compressed)
CREATE TABLE users_parquet (
  id INT,
  name STRING,
  age INT,
  email STRING
)
STORED AS PARQUET;

-- ORC (Optimized Row Columnar)
CREATE TABLE users_orc (
  id INT,
  name STRING,
  age INT,
  email STRING
)
STORED AS ORC;

-- AVRO (schema evolution)
CREATE TABLE users_avro
STORED AS AVRO;
Format Comparison:
┌─────────┬──────────┬──────────┬─────────────┬────────┐
│ Format  │ Type     │ Compress │ Query Speed │ Write  │
├─────────┼──────────┼──────────┼─────────────┼────────┤
│ Text    │ Row      │ Poor     │ Slow        │ Fast   │
│ Sequence│ Row      │ Good     │ Medium      │ Fast   │
│ Avro    │ Row      │ Good     │ Medium      │ Medium │
│ Parquet │ Columnar │ Excellent│ Fast        │ Medium │
│ ORC     │ Columnar │ Excellent│ Fast        │ Medium │
└─────────┴──────────┴──────────┴─────────────┴────────┘

EXAMPLE: 1 billion rows, 100 columns
─────────────────────────────────────
Text (CSV):
• Size: 1 TB
• Compression: ~700 GB (gzip)
• Query (SELECT col1, col2): reads 700 GB

Parquet:
• Size: 200 GB (built-in compression)
• Query (SELECT col1, col2): reads 4 GB (only 2 columns!)
• ~175x less I/O for the query

WHEN TO USE:
───────────
Text/CSV:
• Human-readable debugging
• One-time ingestion
• Small datasets

Parquet:
• Analytical queries (SELECT few columns)
• Long-term storage
• Production data lakes

ORC:
• Hive-specific optimizations
• ACID transactions (Hive 3+)
• Slightly better than Parquet in Hive

Avro:
• Schema evolution (add/remove columns)
• Streaming data (Kafka)
• Row-based processing
Making Queries Faster
-- 1. BUCKETING (hash partitioning)
CREATE TABLE users_bucketed (
  id INT,
  name STRING,
  email STRING
)
CLUSTERED BY (id) INTO 32 BUCKETS
STORED AS PARQUET;
-- Benefit: efficient joins, sampling

-- 2. MAP-SIDE JOIN (for small tables)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=25000000; -- 25MB

SELECT /*+ MAPJOIN(small_table) */
  big_table.*, small_table.name
FROM big_table
JOIN small_table
  ON big_table.id = small_table.id;
-- Small table loaded into memory on each mapper
-- No shuffle needed!

-- 3. VECTORIZATION (process 1024 rows at once)
SET hive.vectorized.execution.enabled=true;
-- Benefit: 3-5x speedup on analytical queries

-- 4. COST-BASED OPTIMIZATION (statistics)
ANALYZE TABLE employees COMPUTE STATISTICS;
ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS;
-- Hive uses stats to:
-- • Choose join order
-- • Decide map-side vs reduce-side join
-- • Optimize the query plan

-- 5. TEZ EXECUTION ENGINE
SET hive.execution.engine=tez; -- Default: mr
-- Tez benefits:
-- • DAG-based (vs multiple MR stages)
-- • Container reuse
-- • 2-10x faster than MapReduce

-- 6. PARTITION PRUNING
SELECT * FROM sales
WHERE year = 2023 AND month = 1;
-- Only reads the year=2023/month=1/ partition

-- 7. PREDICATE PUSHDOWN
SELECT id, name FROM users
WHERE age > 30;
-- For Parquet/ORC:
-- • Filter applied during file read
-- • Skips entire row groups that don't match
-- • Non-matching data is never read
Query Performance Tips:
SLOW QUERY:
──────────
SELECT u.name, o.amount
FROM big_orders o
JOIN small_users u
  ON o.user_id = u.id
WHERE u.region = 'US';

Issues:
• Big table on left (not optimal)
• No hint for map-side join
• Filter applied after the join (wasteful)

OPTIMIZED:
─────────
SELECT /*+ MAPJOIN(u) */ u.name, o.amount
FROM small_users u
JOIN big_orders o
  ON u.id = o.user_id
WHERE u.region = 'US';

Improvements:
• Map-side join (small table in memory)
• Filter on the small table first
• Small table on the left side
• Result: 10-100x faster
-- MANAGED TABLE (Hive owns data)
CREATE TABLE managed_sales (
  id INT,
  amount DECIMAL
);
-- Data stored in the Hive warehouse:
-- /user/hive/warehouse/managed_sales/

DROP TABLE managed_sales;
-- Data DELETED from HDFS!

-- EXTERNAL TABLE (Hive doesn't own data)
CREATE EXTERNAL TABLE external_sales (
  id INT,
  amount DECIMAL
)
LOCATION '/data/sales';
-- Data at /data/sales/ (outside the Hive warehouse)

DROP TABLE external_sales;
-- Only metadata deleted, data remains!
When to Use Each:
MANAGED TABLES:
──────────────
Use when:
• Hive is the primary consumer
• You want Hive to manage the lifecycle
• Data is temporary/intermediate

Benefit:
• DROP TABLE cleans everything
• Simpler management

EXTERNAL TABLES:
───────────────
Use when:
• Multiple tools access the data (Hive, Spark, Impala)
• Data is produced outside Hive (Kafka, Flume)
• Production data (don't want accidental deletion)

Benefit:
• Data survives a table drop
• Flexibility in storage location

BEST PRACTICE:
─────────────
Production: use EXTERNAL tables
Reason: prevents accidental data loss
SerDe (Serializer/Deserializer)
Reading Custom Formats
-- Default: LazySimpleSerDe (CSV-like)
CREATE TABLE default_format (
  id INT,
  name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

-- JSON SerDe
CREATE TABLE json_data (
  id INT,
  name STRING,
  attributes MAP<STRING, STRING>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
-- Can read JSON files directly!

-- RegEx SerDe (parse logs)
CREATE TABLE apache_logs (
  ip STRING,
  timestamp STRING,
  request STRING,
  status INT,
  bytes BIGINT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) [^ ]* [^ ]* \\[([^\\]]*)\\] \"([^\"]*)\" ([0-9]*) ([0-9]*)"
);

-- Parquet SerDe (built-in)
CREATE TABLE parquet_data (
  id INT,
  name STRING
)
STORED AS PARQUET;
-- Uses org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

-- Custom SerDe (write your own!)
CREATE TABLE custom_format (
  ...
)
ROW FORMAT SERDE 'com.company.CustomSerDe'
WITH SERDEPROPERTIES (
  "field.delim" = "|",
  "escape.delim" = "\\"
);
In a standard RDBMS-backed Metastore (MySQL/PostgreSQL), a query like SELECT * FROM sales WHERE year=2023 requires the Metastore to:
Look up the table sales.
Scan the PARTITIONS table for all entries matching the filter.
Fetch the SDS (Storage Descriptor) for each partition to find the HDFS paths.
The Math of Failure:
If a table has 1,000,000 partitions (common in high-cardinality data), a single ad-hoc query might force the Metastore to load 1GB of metadata into memory just to plan the scan.
This leads to “Metastore OOM” and serialized query planning that can take minutes before a single mapper even starts.
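The arithmetic behind that claim can be made concrete. The bytes-per-partition figure below is an assumption for illustration (real partition descriptors vary with column count and storage-descriptor size), and the class name is hypothetical:

```java
// Back-of-the-envelope estimate of metastore planning cost: an unpruned
// query forces the Metastore to materialize one descriptor per partition.
public class MetastoreMath {

    // Total metadata the Metastore must load to plan one unpruned scan.
    public static long planningBytes(long partitionCount, long bytesPerPartition) {
        return partitionCount * bytesPerPartition;
    }
}
```

With 1,000,000 partitions at roughly 1 KB of metadata each, a single unpruned query pulls about 1 GB into the Metastore heap before any mapper starts, which is why partition filters that the planner can evaluate up front matter so much.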
To understand Hive’s performance, one must look past the SQL interface into the transformation pipeline that converts a declarative query into a distributed DAG.
While Hive 1.x relied on MapReduce, modern Hive (2.x/3.x) uses Apache Tez to eliminate the “HDFS barrier” between jobs.
MapReduce Barrier: Every stage in a complex query (e.g., multiple joins) must write intermediate data to HDFS, causing massive I/O overhead.
Tez DAG: Tez allows data to flow directly from one task to the next (e.g., Map -> Reduce -> Reduce) without intermediate HDFS writes. It uses a Directed Acyclic Graph of tasks.
-- LOAD data from HDFS
users = LOAD '/data/users.csv' USING PigStorage(',')
        AS (id:int, name:chararray, age:int, city:chararray);

-- FILTER rows
adults = FILTER users BY age >= 18;

-- FOREACH (transform each row)
names_ages = FOREACH adults GENERATE name, age;

-- GROUP BY
by_city = GROUP users BY city;
-- Result: {group: "NYC", users: {(1, "Alice", 30, "NYC"), (2, "Bob", 25, "NYC")}}

-- JOIN
orders = LOAD '/data/orders.csv' AS (order_id:int, user_id:int, amount:double);
joined = JOIN users BY id, orders BY user_id;

-- ORDER BY
sorted = ORDER users BY age DESC;

-- DISTINCT
unique_cities = DISTINCT (FOREACH users GENERATE city);

-- LIMIT
top_10 = LIMIT sorted 10;

-- STORE results
STORE top_10 INTO '/output' USING PigStorage('\t');
Example: Word Count in Pig
-- Load input
lines = LOAD '/input/books.txt' AS (line:chararray);

-- Split into words
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;

-- Group by word
grouped = GROUP words BY word;

-- Count
counts = FOREACH grouped GENERATE group AS word, COUNT(words) AS count;

-- Sort by count descending
sorted = ORDER counts BY count DESC;

-- Store result
STORE sorted INTO '/output/wordcount';
Equivalent in MapReduce: ~200 lines of Java!
Power User Techniques
-- COGROUP (group multiple datasets)
users = LOAD '/users' AS (id, name);
orders = LOAD '/orders' AS (order_id, user_id, amount);
cogrouped = COGROUP users BY id, orders BY user_id;
-- Result: {group: 1, users: {(1,"Alice")}, orders: {(101,1,50), (102,1,75)}}

-- FLATTEN (unnest)
flattened = FOREACH cogrouped GENERATE group, FLATTEN(users), FLATTEN(orders);

-- UDF (User Defined Function)
REGISTER 'my_udfs.jar';
processed = FOREACH users GENERATE id, com.company.MyUDF(name) AS processed_name;

-- NESTED FOREACH
by_city = GROUP users BY city;
result = FOREACH by_city {
  sorted = ORDER users BY age DESC;
  top_3 = LIMIT sorted 3;
  GENERATE group AS city, top_3;
}

-- CROSS (Cartesian product)
colors = LOAD '/colors' AS (color:chararray);
sizes = LOAD '/sizes' AS (size:chararray);
combinations = CROSS colors, sizes;

-- UNION
dataset1 = LOAD '/data1' AS (id, value);
dataset2 = LOAD '/data2' AS (id, value);
combined = UNION dataset1, dataset2;

-- SAMPLE (random sampling)
sampled = SAMPLE users 0.1; -- 10% sample

-- SPLIT (conditional split)
SPLIT users INTO young IF age < 30, old IF age >= 30;
When to Use Each
USE CASE: Calculate average order by city
─────────────────────────────────────────
HIVE (Declarative SQL):
──────────────────────
SELECT city, AVG(amount) AS avg_order
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY city;

PIG (Procedural Dataflow):
─────────────────────────
users = LOAD '/users' AS (id, name, city);
orders = LOAD '/orders' AS (order_id, user_id, amount);
joined = JOIN users BY id, orders BY user_id;
grouped = GROUP joined BY city;
result = FOREACH grouped GENERATE group AS city, AVG(joined.amount) AS avg_order;
STORE result INTO '/output';

COMPARISON:
──────────
┌──────────────┬─────────────┬─────────────────┐
│ Aspect       │ Hive        │ Pig             │
├──────────────┼─────────────┼─────────────────┤
│ Language     │ SQL-like    │ Procedural      │
│ Users        │ Analysts    │ Engineers       │
│ Use Case     │ Analytics   │ ETL pipelines   │
│ Learning     │ Easy (SQL)  │ Moderate        │
│ Flexibility  │ Medium      │ High            │
│ Schema       │ Required    │ Optional        │
│ Optimization │ Automatic   │ Manual control  │
└──────────────┴─────────────┴─────────────────┘

CHOOSE HIVE WHEN:
────────────────
• Users know SQL
• Ad-hoc analytics
• BI tool integration
• Formal schemas
• Let the optimizer decide the plan

CHOOSE PIG WHEN:
───────────────
• Complex data flows
• ETL pipelines
• Iterative development
• Schema evolution
• Need fine-grained control
• Custom UDFs are common

TREND:
─────
Both are declining in favor of Apache Spark:
• Faster (in-memory)
• More expressive (DataFrames, SQL)
• Unified API (batch + stream)
But Hive/Pig are still widely used in legacy systems.
// CREATE TABLE
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("users"));
tableDesc.addFamily(new HColumnDescriptor("info"));
tableDesc.addFamily(new HColumnDescriptor("prefs"));
admin.createTable(tableDesc);

// PUT (insert or update)
HTable table = new HTable(conf, "users");
Put put = new Put(Bytes.toBytes("user_001"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("email"), Bytes.toBytes("alice@example.com"));
table.put(put);

// GET (read single row)
Get get = new Get(Bytes.toBytes("user_001"));
Result result = table.get(get);
byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
System.out.println("Name: " + Bytes.toString(name));

// SCAN (read multiple rows)
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user_000"));
scan.setStopRow(Bytes.toBytes("user_100"));
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
  // Process each row
}
scanner.close();

// DELETE
Delete delete = new Delete(Bytes.toBytes("user_001"));
table.delete(delete);
PROBLEM: Sequential Row Keys
───────────────────────────
Bad design:
Row keys: timestamp_00001, timestamp_00002, ...

Issue:
• All writes go to the SAME region (hotspot!)
• Region 1: [timestamp_00001 to timestamp_10000]
• Region 2: [timestamp_10001 to timestamp_20000]
• New writes always hit the newest region
• Other regions sit idle

SOLUTION 1: Salting (hash prefix)
─────────────────────────────────
Original: timestamp_00001
Salted:   {hash(timestamp) % 10}_timestamp_00001

Result:
• Writes distributed across 10 regions
• No hotspot!
Trade-off:
• Range scans are more complex (must query all salts)

SOLUTION 2: Reverse Timestamp
─────────────────────────────
For most-recent data access:
Row key: {Long.MAX_VALUE - timestamp}_userId

Result:
• Recent data has the lowest row key
• Scanning from the beginning returns the newest data first

SOLUTION 3: Composite Keys
──────────────────────────
userId_timestamp

Benefits:
• All of a user's data stays together
• Can scan a user's history efficiently
• Natural distribution (different users)

BEST PRACTICES:
──────────────
1. Avoid sequential IDs
   Bad:  1, 2, 3, 4, ...
   Good: uuid(), hash(id), ...
2. Consider access patterns
   • Frequent: recent user events
   • Row key: userId_reverseTimestamp
3. Pre-split the table
   • Create regions upfront
   • Distribute load from the start
4. Keep row keys short
   • Stored in every cell
   • Long keys waste space

EXAMPLE: URL Shortener
─────────────────────
Bad:  shortCode (e.g., "abc123")
Why:  all reads/writes hit the same region initially
Good: {hash(shortCode) % 100}_shortCode
Why:  100-way distribution from the start
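Solution 1 can be sketched as a small helper. This is a hypothetical utility for illustration, not an HBase API; the bucket count of 10 matches the example above:

```java
// Row-key salting sketch: prefix each key with hash(key) % N so that
// sequential keys (e.g., timestamps) spread across N regions instead of
// hammering one. The trade-off shows up in scanPrefixes: a range scan
// must fan out into one scan per salt bucket.
public class SaltedKey {
    static final int SALT_BUCKETS = 10;

    // Deterministic salt: the same logical key always maps to the same bucket.
    public static String salt(String rowKey) {
        int bucket = Math.abs(rowKey.hashCode() % SALT_BUCKETS);
        return bucket + "_" + rowKey;
    }

    // A logical range scan over rowKeyPrefix becomes SALT_BUCKETS scans,
    // one per salted prefix, whose results the client merges.
    public static String[] scanPrefixes(String rowKeyPrefix) {
        String[] prefixes = new String[SALT_BUCKETS];
        for (int i = 0; i < SALT_BUCKETS; i++) {
            prefixes[i] = i + "_" + rowKeyPrefix;
        }
        return prefixes;
    }
}
```

Determinism matters: reads must compute the same salt as writes, or point lookups would miss.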
When to Use HBase
HBASE vs HDFS:
─────────────
┌──────────────────┬───────────┬──────────────┐
│ Feature          │ HDFS      │ HBase        │
├──────────────────┼───────────┼──────────────┤
│ Access Pattern   │ Batch     │ Random R/W   │
│ Latency          │ Minutes   │ Milliseconds │
│ Updates          │ Append    │ Yes          │
│ Deletes          │ No        │ Yes          │
│ Point Lookups    │ Slow      │ Fast         │
│ Full Scans       │ Fast      │ Slow         │
│ Use Case         │ Analytics │ Serving      │
└──────────────────┴───────────┴──────────────┘

HBASE vs RDBMS:
──────────────
┌──────────────────┬────────────┬──────────────┐
│ Feature          │ RDBMS      │ HBase        │
├──────────────────┼────────────┼──────────────┤
│ Schema           │ Strict     │ Flexible     │
│ Transactions     │ ACID       │ Row-level    │
│ Joins            │ Yes        │ No (manual)  │
│ Secondary Index  │ Yes        │ Limited      │
│ Scale            │ Vertical   │ Horizontal   │
│ Consistency      │ Strong     │ Strong*      │
│ Max Data         │ TBs        │ PBs          │
└──────────────────┴────────────┴──────────────┘
* HBase is strongly consistent within a single row; there are no
  multi-row transactions

USE HBASE WHEN:
──────────────
✓ Need random read/write at scale (billions of rows)
✓ Sparse data (rows with different columns)
✓ Time-series data (append-heavy)
✓ Key-value lookups (user profiles, session data)
✓ Write-heavy workloads
✓ Horizontal scalability required

DON'T USE HBASE WHEN:
────────────────────
✗ Need complex joins
✗ Need ACID transactions across rows
✗ Small dataset (< 1 billion rows)
✗ Primarily analytical queries (use Hive)
✗ Need rich secondary indexes (consider Cassandra or an RDBMS)

COMMON USE CASES:
────────────────
• User profiles (billions of users)
• Message/email storage (Facebook Messages)
• Time-series data (sensor data, logs)
• Recommendation systems (feature vectors)
• Content management (versioned documents)
• Social graphs (following relationships)
HBase is not a relational database; it is a Log-Structured Merge-Tree (LSM-Tree) based storage system. This architecture is optimized for high-write throughput and sequential disk I/O.
Because HFiles are immutable, a single row might have data scattered across multiple HFiles (e.g., an update in HFile 3 overriding a value in HFile 1).
Read Path: HBase must check the MemStore, then scan multiple HFiles, merging the results to find the latest version of a cell. This is known as Read Amplification.
Bloom Filters: To speed this up, HBase uses Bloom Filters to skip HFiles that definitely do not contain a specific row key.
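The idea behind those filters can be sketched as a toy Bloom filter: two simple hash functions over a bit set. HBase's real per-HFile filters (ROW and ROWCOL) are more sophisticated, so this is an illustration of the principle, not HBase code:

```java
import java.util.BitSet;

// Toy Bloom filter over row keys. A lookup answers "definitely absent"
// or "maybe present" -- never a false negative -- which is exactly what
// lets HBase skip HFiles that cannot contain a row key.
public class RowKeyBloom {
    private final BitSet bits;
    private final int size;

    public RowKeyBloom(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Two cheap hash functions; real implementations use more and better ones.
    private int h1(String key) { return Math.abs(key.hashCode() % size); }
    private int h2(String key) { return Math.abs((key.hashCode() * 31 + 7) % size); }

    public void add(String rowKey) {
        bits.set(h1(rowKey));
        bits.set(h2(rowKey));
    }

    // false -> row key is DEFINITELY not in this HFile: skip the file.
    // true  -> row key MIGHT be present: the HFile must be read.
    public boolean mightContain(String rowKey) {
        return bits.get(h1(rowKey)) && bits.get(h2(rowKey));
    }
}
```

On the read path, the merging scan consults each HFile's filter first; every "definitely absent" answer removes one file from the read-amplification cost.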
HBase achieves “Local Reads” by scheduling RegionServers on the same nodes as their HDFS DataNodes. When a MemStore flushes, the first replica is written to the local disk. Over time, as regions move, HBase relies on the HDFS Balancer and major compactions to restore data locality.
A common failure in production HBase clusters is “Region Squatting”: hosting too many regions per RegionServer, which fragments the available MemStore memory.

The Capacity Formula:
The maximum number of regions a RegionServer can safely handle is bounded by the total MemStore heap:

MaxRegions = (RS_Heap × MemStore_Fraction) / (MemStore_Flush_Size × Num_Column_Families)

Example:
Heap: 32GB
MemStore Fraction: 0.4 (40% of heap reserved for writes)
Flush Size: 128MB
Column Families: 2
Result: (32,768 MB × 0.4) / (128 MB × 2) ≈ 51 regions.
Consequence of Over-provisioning:
If you host 500 regions on this server, each region's share of the MemStore heap drops to roughly 26 MB (about 13 MB per column family), far below the 128 MB flush size. This causes “thundering flushes”: the server spends all its time flushing tiny HFiles, leading to massive compaction pressure and I/O saturation.
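The capacity formula and the over-provisioning scenario can be checked with a few lines of arithmetic (units in megabytes; the class and method names are illustrative):

```java
// Worked check of the RegionServer capacity formula:
// MaxRegions = (heap * memstoreFraction) / (flushSize * columnFamilies)
public class RegionCapacity {

    public static double maxRegions(double heapMb, double memstoreFraction,
                                    double flushSizeMb, int columnFamilies) {
        return (heapMb * memstoreFraction) / (flushSizeMb * columnFamilies);
    }

    // MemStore heap available per region when hosting `regions` regions.
    public static double memstorePerRegionMb(double heapMb, double memstoreFraction,
                                             int regions) {
        return (heapMb * memstoreFraction) / regions;
    }
}
```

Plugging in the example (32 GB heap, 0.4 fraction, 128 MB flush size, 2 column families) gives about 51 regions; at 500 regions, each region's MemStore share falls to roughly 26 MB, forcing early, tiny flushes.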
When a region exceeds the hbase.hregion.max.filesize (e.g., 10GB), it must split. This is a complex distributed transaction coordinated via ZooKeeper:
PRE_SPLIT: RegionServer (RS) creates a split znode in ZooKeeper.
OFFLINE: RS takes the parent region offline, stopping all writes.
DAUGHTER_CREATION: RS creates two “Reference Files” (daughter regions) pointing to the parent’s HFiles. This is a metadata-only operation (very fast).
OPEN_DAUGHTERS: RS opens the two daughter regions and begins serving requests.
POST_SPLIT: RS updates the .META. table and deletes the parent region metadata.
Compaction (Background): Over time, the daughter regions undergo compaction, which physically splits the parent HFiles into new, independent HFiles, eventually deleting the reference files.
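The strict ordering of these phases can be modeled as a simple state machine. This is a sketch for illustration only; real HBase drives the transitions through ZooKeeper znodes and its split-transaction code, not a class like this:

```java
// The split phases from the text, as an ordered enum. advance() enforces
// that no phase can be skipped, mirroring the coordinated transaction.
public class RegionSplit {
    public enum State { PRE_SPLIT, OFFLINE, DAUGHTER_CREATION, OPEN_DAUGHTERS, POST_SPLIT }

    private State state = State.PRE_SPLIT;

    // Move to the next phase in order; refuse to advance past completion.
    public State advance() {
        State[] all = State.values();
        if (state.ordinal() == all.length - 1)
            throw new IllegalStateException("split already complete");
        state = all[state.ordinal() + 1];
        return state;
    }

    public State current() { return state; }
}
```

The reason the split itself is fast is visible in the ordering: DAUGHTER_CREATION only writes reference files (metadata), while the expensive physical rewrite is deferred to background compaction after POST_SPLIT.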
Today, the “Hadoop Ecosystem” has evolved. While Hive remains the standard for SQL-on-HDFS, many processing tasks have shifted to Apache Spark due to its in-memory performance. However, the core principles of the Hadoop ecosystem—separation of storage (HDFS), resource management (YARN), and specialized processing engines—continue to define modern big data architecture.