Chunkservers are the workhorses of GFS, storing actual data and serving client requests. This chapter explores how chunks are stored, how data flows through the system, and how GFS ensures data integrity despite commodity hardware failures.
Chapter Goals:
Understand chunk storage implementation
Master the replication pipeline and data flow optimization
Learn detailed read, write, and record append flows
Explore checksumming and data integrity mechanisms
CHECKSUM STRUCTURE
─────────────────
Each chunk divided into 64KB blocks:

┌─────────────────────────────────────────┐
│ Chunk (64MB max)                        │
├─────────────────────────────────────────┤
│ Block 0 (64KB)    → Checksum 0 (4 bytes)│
│ Block 1 (64KB)    → Checksum 1 (4 bytes)│
│ Block 2 (64KB)    → Checksum 2 (4 bytes)│
│ ...                                     │
│ Block 1023 (64KB) → Checksum 1023       │
└─────────────────────────────────────────┘

Checksum = 32-bit CRC32

For a full 64MB chunk:
• 1024 blocks
• 1024 checksums
• 4KB checksum metadata
Stored: in memory + on disk

CHECKSUM COMPUTATION:
────────────────────

On Write:
─────────
def write_block(block_index, data):
    checksum = crc32(data)
    write_data_to_disk(block_index, data)
    update_checksum(block_index, checksum)
    persist_checksum()

On Read:
────────
def read_block(block_index):
    data = read_data_from_disk(block_index)
    expected_checksum = get_checksum(block_index)
    actual_checksum = crc32(data)
    if actual_checksum != expected_checksum:
        report_corruption()
        read_from_different_replica()
    else:
        return data

WHY 64KB BLOCKS?
───────────────
Trade-off analysis:

Smaller blocks (e.g., 4KB):
✓ Detect corruption precisely
✓ Re-read smaller amount
✗ More checksums (higher overhead)
✗ More CPU for checksum computation

Larger blocks (e.g., 1MB):
✓ Fewer checksums
✓ Less CPU overhead
✗ Must re-read larger amount on corruption
✗ Less precise error detection

64KB: Sweet spot
• 0.006% overhead (4KB per 64MB)
• Fast CRC32 computation
• Reasonable re-read size
• Matches common I/O sizes
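The read/write pseudocode above can be made concrete with Python's built-in CRC32. This is only a sketch: a small in-memory byte string stands in for a 64MB chunk file, and the helper names are illustrative, not GFS APIs.

```python
import zlib

BLOCK_SIZE = 64 * 1024  # 64KB blocks, as in GFS


def compute_block_checksums(chunk_data: bytes) -> list:
    """Compute a 32-bit CRC for each 64KB block of a chunk."""
    return [
        zlib.crc32(chunk_data[i:i + BLOCK_SIZE])
        for i in range(0, len(chunk_data), BLOCK_SIZE)
    ]


def verify_block(chunk_data: bytes, block_index: int, stored: int) -> bool:
    """Re-read one block and compare its CRC against the stored checksum."""
    start = block_index * BLOCK_SIZE
    block = chunk_data[start:start + BLOCK_SIZE]
    return zlib.crc32(block) == stored
```

Because each block is checked independently, a single flipped bit is localized to one 64KB block, which bounds how much data must be re-read from another replica.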
Detecting Stale Replicas:
VERSION NUMBER SYSTEM
────────────────────
Each chunk has a version number
Purpose: detect stale replicas

Lifecycle:
──────────
1. Chunk created: version = 1
2. Master grants lease: version++ (now version = 2)
   All current replicas updated to 2
3. Replica was down:
   Server was offline during the lease grant
   Its version still = 1 (STALE!)
4. Detection:
   Server comes back online
   Heartbeat: "I have chunk X, version 1"
   Master: "Current version is 2, yours is stale"
   → Mark for garbage collection

EXAMPLE SCENARIO:
────────────────
Chunk 0x1a2b, initially version 1
Replicas: [cs-5, cs-12, cs-23]

Time T0: All have version 1
────────────────────────────
cs-5:  version 1 ✓
cs-12: version 1 ✓
cs-23: version 1 ✓

Time T1: cs-23 goes offline
───────────────────────────
cs-23 crashes, network partition, etc.

Time T2: Client writes to chunk
───────────────────────────────
Master grants lease → version++
New version: 2
Master updates:
cs-5:  version 2 ✓
cs-12: version 2 ✓
cs-23: offline (still has version 1)

Time T3: cs-23 comes back online
────────────────────────────────
cs-23 heartbeat: "I have chunk 0x1a2b, version 1"
Master checks:
• Current version: 2
• cs-23 version: 1
• STALE REPLICA!

Master → cs-23: "Delete chunk 0x1a2b (stale)"
Master schedules re-replication:
• Copy from cs-5 or cs-12 (version 2)
• To a new chunkserver
• Maintain 3 replicas

PERSISTENCE:
───────────
Version stored:
• In master's operation log (persistent)
• In chunkserver's metadata (persistent)
• In master's memory (volatile)

After master restart:
• Loads versions from the operation log
• Chunkservers report their versions
• Master validates and cleans up stale replicas
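The master-side check at time T3 reduces to a simple comparison. A minimal sketch (the function and return labels are illustrative; the AHEAD case mirrors the paper's rule that a replica reporting a version higher than the master's record implies the master crashed during a grant and should adopt the higher version):

```python
def classify_replica(current_version: int, reported_version: int) -> str:
    """Compare a heartbeat-reported chunk version against the master's record."""
    if reported_version < current_version:
        return "STALE"    # schedule deletion and re-replicate from a current copy
    if reported_version > current_version:
        return "AHEAD"    # master missed the grant; adopt the higher version
    return "CURRENT"
```

In the T3 scenario above, `classify_replica(2, 1)` yields "STALE", triggering garbage collection of cs-23's copy.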
Globally Unique Identifier:
CHUNK HANDLE
───────────
64-bit globally unique identifier
Generation: master assigns at chunk creation

Properties:
──────────
• Unique across the entire GFS cluster
• Never reused
• Identifies the chunk forever
• Used as the filename on chunkservers

Format:
───────
0x1a2b3c4d5e6f7a8b
│         │
│         └─ Random/sequential component
└─── Timestamp component (optional)

Mapping:
────────
Master maintains:
File → List of chunk handles
/data/file1 → [0x1a2b..., 0x2c3d..., 0x3e4f...]

Each chunk handle → Metadata
0x1a2b... → {
  version: 5,
  replicas: [cs-5, cs-12, cs-23],
  primary: cs-5,
  lease_expiration: T+60
}

Chunkserver stores:
Chunk handle → Chunk file
0x1a2b... → /gfs/chunks/0x1a2b3c4d5e6f7a8b

Benefits:
────────
✓ Simple mapping
✓ No path dependencies
✓ Easy to move chunks between servers
✓ No name conflicts
✓ Efficient lookup
COMPLETE WRITE OPERATION
────────────────────────

CLIENT WRITE REQUEST:
────────────────────
gfs.write("/data/file1", offset=50MB, data=1MB)

PHASE 1: LEASE ACQUISITION
──────────────────────────
Client → Master: "Need to write to /data/file1, chunk 0"
Master checks:
  1. Chunk 0 exists? Yes
  2. Has current primary? No
  3. Select primary: cs-5 (lowest load)
  4. Grant lease to cs-5 (60 seconds)
  5. Increment version: 4 → 5
Master → cs-5, cs-12, cs-23: "Increment version to 5"
All chunkservers ACK
Master → Client: {
  chunk_handle: 0x1a2b...,
  version: 5,
  primary: cs-5,
  secondaries: [cs-12, cs-23],
  lease_expiration: T+60
}

PHASE 2: DATA PUSH
─────────────────
Client forms chain: cs-5 → cs-12 → cs-23
(based on network topology)

Client → cs-5: {
  type: DATA_PUSH,
  chunk_handle: 0x1a2b...,
  data_id: uuid_123,   // Unique ID for this data
  data: [1MB payload]
}
cs-5:
  1. Buffer data in memory (don't write to disk yet!)
  2. Forward to cs-12 while still receiving from the client
     → cs-12: DATA_PUSH with same data
cs-12:
  1. Buffer data in memory
  2. Forward to cs-23
     → cs-23: DATA_PUSH with same data
cs-23:
  1. Buffer data in memory
  2. Send ACK to cs-12
cs-12 → ACK to cs-5
cs-5 → ACK to client

Client now knows all replicas have buffered the data

PHASE 3: WRITE COMMAND
─────────────────────
Client → Primary (cs-5): {
  type: WRITE_COMMAND,
  chunk_handle: 0x1a2b...,
  data_id: uuid_123,   // Reference to buffered data
  offset: 50MB,
  length: 1MB
}

PRIMARY (cs-5) PROCESSING:
─────────────────────────
1. Assign serial number:
   (If multiple concurrent writes, serialize them)
   serial_number = next_serial++   // e.g., 42
2. Apply write to local disk:
   fd = open("/gfs/chunks/0x1a2b...")
   lseek(fd, 50MB)
   write(fd, buffered_data[uuid_123])
3. Update checksums:
   start_block = 50MB / 64KB
   end_block = 51MB / 64KB
   for block in [start_block..end_block]:
     checksum[block] = crc32(data[block])
4. Forward write command to secondaries:
   cs-5 → cs-12, cs-23: {
     type: APPLY_WRITE,
     chunk_handle: 0x1a2b...,
     data_id: uuid_123,
     offset: 50MB,
     length: 1MB,
     serial_number: 42
   }

SECONDARIES (cs-12, cs-23):
──────────────────────────
1. Apply in serial number order:
   (Wait if earlier serial numbers not yet applied)
2. Write to disk: same as primary
3. Update checksums: same as primary
4. Send ACK to primary:
   cs-12 → cs-5: ACK(serial_number=42)
   cs-23 → cs-5: ACK(serial_number=42)

PRIMARY RECEIVES ACKS:
─────────────────────
If ALL secondaries ACK:
  cs-5 → Client: SUCCESS
If ANY secondary fails:
  cs-5 → Client: ERROR
  (Client retries the entire operation)

A write returns SUCCESS only after every replica applies it; a failed
attempt can leave replicas inconsistent until a retry succeeds.

FAILURE SCENARIOS:
─────────────────

Scenario 1: cs-23 fails during data push
────────────────────────────────────────
• Client doesn't receive ACK from all replicas
• Client retries the entire write
• Master may detect cs-23 failure
• Master re-replicates the chunk elsewhere

Scenario 2: cs-12 fails during apply
────────────────────────────────────
• Primary doesn't receive ACK from cs-12
• Primary returns ERROR to client
• Client retries
• Chunk now inconsistent:
  - cs-5: has write
  - cs-12: may or may not have write
  - cs-23: has write
• But: all replicas have the same version (5)
• The next successful write creates a defined region
• GFS's relaxed consistency model allows this

Scenario 3: Primary fails after apply
─────────────────────────────────────
• Primary wrote locally and sent to secondaries
• Crashed before receiving ACKs
• Client sees a timeout
• Client retries
• Master's lease expires (60 sec max wait)
• Master grants the lease to a different replica
• New primary has the data (it was applied)
• Client retry may create a duplicate (application handles)
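The secondaries' "apply in serial number order" step can be sketched as a small reorder buffer. This is an illustrative model (class and method names are assumptions, not GFS code): mutations may arrive out of order, but each is applied only when all lower serial numbers have been applied.

```python
import heapq


class SecondaryApplier:
    """Apply mutations in the primary-assigned serial order."""

    def __init__(self):
        self.next_serial = 1
        self.pending = []  # min-heap of (serial, mutation)

    def receive(self, serial, mutation, apply_fn):
        """Buffer a mutation; apply any that are now in order.

        Returns the list of serial numbers that were just applied
        (these would be ACKed back to the primary).
        """
        heapq.heappush(self.pending, (serial, mutation))
        acked = []
        while self.pending and self.pending[0][0] == self.next_serial:
            _, m = heapq.heappop(self.pending)
            apply_fn(m)                    # write to disk, update checksums
            acked.append(self.next_serial)
            self.next_serial += 1
        return acked
```

If serial 2 arrives before serial 1, nothing is applied until 1 shows up, after which both are applied back to back, so every replica ends up with the identical mutation order.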
SUCCESSFUL RECORD APPEND
───────────────────────

CLIENT REQUEST:
──────────────
offset = gfs.record_append("/data/file1", data=100KB)

CLIENT → MASTER:
───────────────
"Need to append to /data/file1"
Master returns the last chunk (say, chunk 5):
{
  chunk_handle: 0x1a2b...,
  version: 3,
  primary: cs-5,
  secondaries: [cs-12, cs-23],
  current_size: 60MB   // Approximate
}

DATA PUSH PHASE:
───────────────
(Same as write: push to all replicas)
Client → cs-5 → cs-12 → cs-23
Data buffered in memory on all

APPEND COMMAND:
──────────────
Client → Primary (cs-5):
{
  type: RECORD_APPEND,
  chunk_handle: 0x1a2b...,
  data_id: uuid_456,
  length: 100KB
}

PRIMARY PROCESSING:
──────────────────
1. Check current offset:
   current_offset = get_chunk_size(0x1a2b...)   // 60MB
2. Check if the record fits:
   new_offset = 60MB + 100KB
   if new_offset <= 64MB:
     # Fits! Proceed
   else:
     # Doesn't fit: pad and use the next chunk
3. Assign offset: append_offset = 60MB
4. Assign serial number: serial = next_serial++
5. Apply locally:
   fd = open("/gfs/chunks/0x1a2b...")
   lseek(fd, 60MB)
   write(fd, buffered_data[uuid_456])
6. Update checksums
7. Forward to secondaries:
   cs-5 → cs-12, cs-23: {
     type: APPLY_APPEND,
     data_id: uuid_456,
     offset: 60MB,   // SAME offset for all!
     length: 100KB,
     serial: serial
   }

SECONDARIES APPLY:
─────────────────
cs-12: lseek(60MB), write(data), update_checksums(), ACK to primary
cs-23: same

PRIMARY → CLIENT:
────────────────
{
  status: SUCCESS,
  offset: 60MB
}
Client now knows the record is at offset 60MB
All replicas identical!
APPEND AT CHUNK BOUNDARY
───────────────────────
Current chunk state:
• Chunk 5: 63.9MB used (almost full)
• Want to append: 200KB

Check: 63.9MB + 200KB = 64.1MB > 64MB
→ Doesn't fit in the current chunk!

PRIMARY ACTION:
──────────────
1. Pad the current chunk to 64MB:
   padding = 64MB - 63.9MB = 100KB
   write(padding_bytes)   // All zeros or random
2. All secondaries do the same:
   cs-5 → cs-12, cs-23: {
     type: PAD_TO_END,
     chunk: 0x1a2b...,
     padding: 100KB
   }
3. Tell the client to retry with the next chunk:
   Primary → Client: {
     status: RETRY_NEXT_CHUNK,
     next_chunk: 6
   }

CLIENT RETRIES:
──────────────
Client → Master: "Need chunk 6 of /data/file1"
Master:
• Creates chunk 6 if it doesn't exist
• Assigns a chunk handle, selects replicas
• Returns metadata
Client retries the append with chunk 6
→ Succeeds at offset 0 of the new chunk

RESULT:
──────
Chunk 5: 63.9MB data + 100KB padding = 64MB (full)
Chunk 6: 200KB record at offset 0

Application sees:
• Record at chunk 6, offset 0
• Knows to skip padding when reading
• Uses record markers/checksums to identify data
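The primary's fit-or-pad decision is a one-line arithmetic check. A minimal sketch (function name and return shape are illustrative assumptions):

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64MB


def plan_append(current_size: int, record_len: int):
    """Decide whether a record fits in the current chunk.

    Returns (action, offset, padding): either APPEND at the current
    end of the chunk, or RETRY_NEXT_CHUNK after padding to 64MB.
    """
    if current_size + record_len <= CHUNK_SIZE:
        return ("APPEND", current_size, 0)
    padding = CHUNK_SIZE - current_size  # bytes of padding written to all replicas
    return ("RETRY_NEXT_CHUNK", None, padding)
```

This is also why GFS limits a record append to a fraction of the chunk size: the worst-case padding is bounded by one record length.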
APPEND WITH FAILURES
───────────────────
Scenario: secondary fails during append

CLIENT REQUEST:
──────────────
gfs.record_append("/data/file1", data=100KB)

DATA PUSH:
─────────
Client → cs-5 → cs-12 → cs-23
cs-5:  Buffered ✓
cs-12: Buffered ✓
cs-23: CRASH (network partition, disk failure, etc.)
Client receives ACKs from cs-5 and cs-12 only

APPEND COMMAND:
──────────────
Client → Primary (cs-5): RECORD_APPEND(data_id, 100KB)

PRIMARY APPLIES:
───────────────
cs-5:
• Assigns offset: 60MB
• Writes locally at 60MB ✓
cs-5 → cs-12: APPLY_APPEND(offset=60MB)
cs-12:
• Writes at 60MB ✓
• ACKs to primary ✓
cs-5 → cs-23: APPLY_APPEND(offset=60MB)
cs-23:
• TIMEOUT (offline!)

PRIMARY RECEIVES:
────────────────
ACKs: cs-12 ✓, cs-23 ✗
NOT all replicas ACKed!
Primary → Client: ERROR: append failed

CURRENT STATE:
─────────────
cs-5:  Has record at 60MB ✓
cs-12: Has record at 60MB ✓
cs-23: Doesn't have record ✗
Chunk is now INCONSISTENT!

CLIENT RETRIES:
──────────────
Client retries the entire operation:
1. Re-push data to all replicas (cs-23 may be back, or replaced)
2. Primary assigns a NEW offset: now 60.1MB (after the previous record)
3. Applies to all replicas
4. SUCCESS

RESULT:
──────
Chunk contents:
┌──────────────────────────────┐
│ ...                          │
│ 60.0MB: Record data (100KB)  │ ← First attempt (on cs-5, cs-12)
│ 60.1MB: Record data (100KB)  │ ← Retry (on all replicas)
│ ...                          │
└──────────────────────────────┘
• DUPLICATE at 60.0MB (only on cs-5, cs-12)
• Successful record at 60.1MB (on all)
• Client receives offset 60.1MB
• Application will see the duplicate when reading

APPLICATION HANDLING:
────────────────────
Reader scans the file:

for record in read_file("/data/file1"):
    if not valid_checksum(record):
        skip   // Inconsistent region (60.0MB on cs-23)
    if seen_id(record.id):
        skip   // Duplicate (60.0MB on cs-5, cs-12)
    process(record)   // Process the 60.1MB record

Application de-duplicates based on unique IDs
Application skips inconsistent regions

GFS guarantees:
• At-least-once delivery
• May have duplicates
• May have inconsistent regions
• Application handles this
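The reader loop above can be written as a runnable sketch. Note that the per-record checksum and unique ID are application-level conventions layered on top of GFS, not something GFS itself defines; the record shape here (a dict with an `id` field) is purely illustrative.

```python
def read_records(records, is_valid):
    """Filter a scanned record stream: drop records that fail their
    application-level checksum (inconsistent regions, padding) and
    records whose unique ID was already seen (duplicates from retries)."""
    seen = set()
    out = []
    for rec in records:
        if not is_valid(rec):
            continue          # skip inconsistent region
        if rec["id"] in seen:
            continue          # skip duplicate from a retried append
        seen.add(rec["id"])
        out.append(rec)
    return out
```

Given the failure above, a reader on cs-5 would see the 60.0MB record and the 60.1MB record with the same ID and keep only the first valid copy; a reader on cs-23 would see garbage at 60.0MB, fail the checksum, and skip it.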
MULTIPLE CONCURRENT APPENDS
──────────────────────────
Scenario: 3 clients append simultaneously

CLIENT A: append(data_A, 50KB)
CLIENT B: append(data_B, 75KB)
CLIENT C: append(data_C, 100KB)

All three:
1. Push data to replicas
2. Send an append command to the primary

PRIMARY SERIALIZES:
──────────────────
Primary receives three append requests:

Request queue:
1. Append(data_A, 50KB)  - arrived first
2. Append(data_B, 75KB)  - arrived second
3. Append(data_C, 100KB) - arrived third

Primary processes IN ORDER:

Step 1: Process data_A
──────────────────────
current_offset = 60MB
assign offset: 60MB
write data_A at 60MB
forward to secondaries: "write at 60MB"
new_offset = 60MB + 50KB = 60.05MB

Step 2: Process data_B
──────────────────────
current_offset = 60.05MB
assign offset: 60.05MB
write data_B at 60.05MB
forward to secondaries: "write at 60.05MB"
new_offset = 60.05MB + 75KB = 60.125MB

Step 3: Process data_C
──────────────────────
current_offset = 60.125MB
assign offset: 60.125MB
write data_C at 60.125MB
forward to secondaries: "write at 60.125MB"
new_offset = 60.125MB + 100KB = 60.225MB

RESULT:
──────
All replicas have identical contents:
┌────────────────────────────────┐
│ ...                            │
│ 60.000MB: data_A (50KB)        │
│ 60.050MB: data_B (75KB)        │
│ 60.125MB: data_C (100KB)       │
│ 60.225MB: <next append here>   │
└────────────────────────────────┘

Clients receive:
• Client A: offset 60.000MB
• Client B: offset 60.050MB
• Client C: offset 60.125MB

KEY POINT:
─────────
The primary serializes ALL appends
→ Consistent order across all replicas
→ No distributed consensus needed
→ Simple, fast, scalable

Throughput:
• Limited by the primary's processing
• Thousands of appends/second possible
• Perfect for MapReduce intermediate data
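The heart of this serialization is an atomic "reserve the next offset" step. A minimal sketch of the primary's bookkeeping, assuming a single lock as the serialization point (class and method names are illustrative):

```python
import threading


class PrimaryAppendSerializer:
    """Assign each concurrent append a unique, contiguous offset."""

    def __init__(self, start_offset=0):
        self.offset = start_offset
        self.lock = threading.Lock()

    def assign(self, length: int) -> int:
        """Atomically reserve `length` bytes; return the record's offset."""
        with self.lock:
            off = self.offset
            self.offset += length
            return off
```

Because every append goes through this one primary, all replicas receive the same (offset, data) pairs in the same order, with no cross-client coordination.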
READ PATH WITH CORRUPTION:
─────────────────────────
Client reads from cs-5:
1. Chunkserver reads the block from disk
2. Computes checksum
3. Compares with the stored checksum
4. MISMATCH! Corruption detected

Chunkserver → Client: ERROR: data corrupted

Client response:
1. Try a different replica (cs-12)
2. Report the corruption to the master
3. Return correct data to the application

Master action:
1. Mark cs-5's copy as corrupted
2. Schedule re-replication from cs-12
3. Eventually delete the corrupted copy from cs-5
4. Restore to 3 replicas

EXAMPLE:
───────
Read request: chunk 0x1a2b, offset 5MB, length 1MB
cs-5 reads blocks [80..95] (5MB/64KB to 6MB/64KB)
Block 87: checksum mismatch!
Expected: 0xabcd1234
Actual:   0xabcd1235  ← One bit different
Error returned to client

Client retries:
cs-12 → Checksum OK → Return data

Master notified:
• cs-5 chunk 0x1a2b: CORRUPTED
• Re-replicate from cs-12 or cs-23
• Delete from cs-5 after re-replication
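The client-side retry path can be sketched as a failover loop. This is an illustrative model, not the real GFS client library: `replicas` are stand-in objects with a `read()` method, and the exception type is an assumption.

```python
class ChecksumError(Exception):
    """Raised by a chunkserver stub when block verification fails."""


def read_with_failover(replicas, chunk_handle, offset, length, report):
    """Try replicas in order; report corrupt ones to the master.

    The master uses the reports to schedule re-replication and to
    delete the corrupted copy once a healthy replica is copied.
    """
    for server in replicas:
        try:
            return server.read(chunk_handle, offset, length)
        except ChecksumError:
            report(server, chunk_handle)  # master will re-replicate
    raise IOError("no replica returned valid data")
```

The application never sees the corruption: it only pays one extra round trip while the master repairs the replica count in the background.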
Scrubbing
Proactive Corruption Detection:
BACKGROUND SCRUBBING
───────────────────
Chunkserver runs a background task:

def scrubbing_task():
    while True:
        for chunk in all_chunks:
            if chunk.idle() and low_load():
                verify_chunk(chunk)
            sleep(random(60, 300))   # 1-5 minutes between chunks

def verify_chunk(chunk_handle):
    for block_index in range(num_blocks):
        data = read_block(chunk_handle, block_index)
        expected = get_checksum(block_index)
        actual = crc32(data)
        if expected != actual:
            # Found corruption!
            log_error(...)
            report_to_master(chunk_handle, block_index)
            # Don't fix locally; let the master coordinate

BENEFITS:
────────
• Detect corruption before a client reads it
• Find corruption in rarely-read data
• Catch disk degradation early
• Reduce read errors for clients

SCHEDULING:
──────────
• Low priority (idle time only)
• Rate limited (1-2 chunks/minute)
• Doesn't impact foreground I/O
• Full cluster scrub: weeks/months

MASTER RESPONSE:
───────────────
Chunkserver: "Chunk 0x1a2b is corrupted"
Master:
1. Verify the chunk has other good replicas
2. Increase the re-replication priority
3. Schedule re-replication
4. Mark the corrupted replica for deletion
5. Eventually delete it from the reporting chunkserver
Write-Time Optimization
Append Checksum Optimization:
APPEND CHECKSUM HANDLING
───────────────────────
Problem: the last block may be partial

Example:
────────
Chunk size: 5.5MB
Blocks: 0-87 (88 blocks)
Last block (87): only 32KB used (not the full 64KB)
Append: 100KB data

Naive approach:
──────────────
1. Read the last block (32KB existing data)
2. Append new data (100KB)
3. Compute checksums for the modified blocks
4. Write back
→ Extra read I/O! Slow!

GFS Optimization:
────────────────
1. Don't read the last block
2. Append data starting at a block boundary
3. Pad if necessary

Example:
────────
Block 87: 32KB data + 32KB padding
Block 88: 64KB new data (part of the 100KB)
Block 89: 36KB new data + 28KB unused

Trade-off:
• Some wasted space (padding)
• No extra read I/O
• Faster appends
• Acceptable for an append-heavy workload

CHECKSUM INCREMENTAL UPDATE:
───────────────────────────
For record append:
1. Primary assigns the offset
2. Writes the data
3. Computes checksums for the new blocks only
4. Sends checksums to secondaries with the data
5. Secondaries verify and store

No need to re-checksum the entire chunk
→ Fast, scalable
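The pad-to-boundary optimization described above reduces to a little modular arithmetic: compute the padding to the next 64KB boundary, then checksum only the freshly written blocks. A sketch (function name and dict-of-checksums shape are illustrative assumptions):

```python
import zlib

BLOCK = 64 * 1024  # 64KB checksum blocks


def append_at_boundary(chunk_len: int, data: bytes):
    """Place new append data at the next block boundary.

    Returns (padding_bytes, {block_index: crc32}) covering only the
    new blocks; the partial last block is never read or re-checksummed.
    """
    pad = (-chunk_len) % BLOCK               # padding to reach the boundary
    first_new_block = (chunk_len + pad) // BLOCK
    num_new_blocks = (len(data) + BLOCK - 1) // BLOCK
    new_checksums = {
        first_new_block + i: zlib.crc32(data[i * BLOCK:(i + 1) * BLOCK])
        for i in range(num_new_blocks)
    }
    return pad, new_checksums
```

For the example above (block 87 half full, 100KB appended), this yields 32KB of padding and fresh checksums for blocks 88 and 89 only.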
Expected Answer: GFS uses pipelined data transfer to maximize network bandwidth utilization.

Without Pipelining (Sequential):
Client sends to replica 1, waits for completion
Then sends to replica 2, waits
Then sends to replica 3
Time: 3× transfer time
Only one network link active at a time
With Pipelining:
Client sends to replica 1
Replica 1 forwards to replica 2 while still receiving from client
Replica 2 forwards to replica 3 while receiving from replica 1
All network links active simultaneously
Time: ~1× transfer time (plus small propagation delay)
Benefits:
3× speedup for 3 replicas
Fully utilizes network topology
Each link operates at full bandwidth
Essential for GFS’s high throughput goals
GFS chains replicas by network distance (closest first) to minimize cross-rack transfers and optimize pipelining.
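The claimed speedup is easy to sanity-check with a back-of-the-envelope model. A sketch, assuming an idealized 1 ms per-hop forwarding delay (the bandwidth and latency figures are illustrative, not from the GFS paper):

```python
def sequential_time(nbytes: float, bw: float, replicas: int) -> float:
    """Client sends the full payload to each replica in turn."""
    return replicas * nbytes / bw


def pipelined_time(nbytes: float, bw: float, replicas: int,
                   hop_latency: float = 0.001) -> float:
    """Chained forwarding: roughly one payload time plus a small
    per-hop propagation delay."""
    return nbytes / bw + (replicas - 1) * hop_latency
```

For a 1MB payload at 100 Mbps (12.5 MB/s) with 3 replicas, the model gives about 0.25 s sequential versus about 0.086 s pipelined, matching the "~1× transfer time" intuition above.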
Intermediate: How does record append handle concurrent writers?
Expected Answer: Record append enables multiple clients to append to the same file concurrently without coordination.

Process:
Data Push: Each client pushes data to all replicas independently
Append Request: Each client sends append command to primary (not to a specific offset!)
Primary Serializes: Primary assigns offset for each append in the order received
Sequential Application: Primary applies appends sequentially, each at its assigned offset
Replica Coordination: Primary tells secondaries exact offset for each append
Offset Return: Primary returns assigned offset to each client
Key Properties:
Primary acts as serialization point (no distributed consensus needed)
All replicas apply appends in same order at same offsets
Each client learns where its record was placed
No locking or coordination between clients required
Enables thousands of concurrent appenders
Failure Handling:
If append fails on any replica, client retries
Retry may create duplicate at different offset
Application de-duplicates using unique record IDs
At-least-once delivery guaranteed
Perfect for MapReduce where thousands of mappers append to shared output files.
Advanced: Explain GFS's checksum strategy and trade-offs
Expected Answer: GFS uses 32-bit CRC32 checksums on 64KB blocks.

Design:
Each 64MB chunk divided into 1024 × 64KB blocks
Each block has 32-bit checksum (4KB metadata per chunk)
Checksums stored in memory and on disk
Verified on every read
Why 64KB blocks?

Smaller blocks (4KB):
✓ More precise corruption detection
✗ More checksums (higher memory overhead)
✗ More CPU for computation
✗ Higher metadata storage
Larger blocks (1MB):
✓ Less overhead
✗ Must re-read more data on corruption
✗ Less precise detection
64KB: Balance of precision and overhead

Write Optimization:
For appends: Don’t re-read last partial block
Pad to block boundary instead
Trade space for speed (append-optimized workload)
Detection Timing:
Read-time: Every read verified
Scrubbing: Background task checks idle chunks
Replication: Verified during chunk copy
Corruption Response:
Read from different replica
Report to master
Master schedules re-replication
Corrupted replica deleted
Trade-offs:
0.006% space overhead (acceptable)
CPU cost negligible (modern CRC32 instructions)
False negative: ~1 in 4 billion (acceptable for commodity hardware)
Catches virtually all bit flips, disk corruption, memory errors
System Design: How would you optimize GFS for small random writes?
Expected Answer: GFS is optimized for large sequential writes/appends. For small random writes, several optimizations are possible.

1. Client-Side Buffering:
Buffer multiple small writes in client
Batch into larger writes (e.g., 1MB)
Flush periodically or when buffer full
Trade-off: Latency for throughput, memory usage
2. Log-Structured Approach:
Append small writes to log file
Background compaction merges into final locations
Like LevelDB/RocksDB approach
Trade-off: Complexity, write amplification
3. Write Coalescing at Chunkserver:
Chunkserver buffers writes in memory
Coalesces overlapping/adjacent writes
Flushes in batches
Trade-off: Durability concerns, memory usage
4. Smaller Chunks:
Reduce from 64MB to 4-8MB
Less internal fragmentation
More metadata overhead (acceptable for small files)
Trade-off: More master load, more metadata
5. Separate Tier:
Small-write tier with different chunk size/strategy
Large-write tier with current 64MB chunks
Route based on access pattern
Trade-off: System complexity, two code paths
6. Hybrid Storage:
Small writes to SSD/NVME (low latency)
Large sequential to HDD (high throughput)
Background migration based on patterns
Trade-off: Cost, complexity
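Option 1 (client-side buffering) can be sketched in a few lines. This is an illustrative sketch, not a real GFS client API: `flush_fn` stands in for whatever large-write call the underlying store provides.

```python
class WriteBuffer:
    """Coalesce small writes into one large batched write."""

    def __init__(self, flush_fn, limit=1024 * 1024):
        self.flush_fn = flush_fn        # e.g., a 1MB GFS write or append
        self.limit = limit
        self.pending = bytearray()

    def write(self, data: bytes):
        """Buffer a small write; flush once the batch reaches the limit."""
        self.pending += data
        if len(self.pending) >= self.limit:
            self.flush()

    def flush(self):
        """Force out any buffered data (call periodically and on close)."""
        if self.pending:
            self.flush_fn(bytes(self.pending))
            self.pending = bytearray()
```

The trade-off noted above is visible directly in the code: buffered bytes are not durable until `flush()` runs, so a client crash can lose up to `limit` bytes of acknowledged-to-caller data unless the caller also flushes on a timer.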
Real-World:
Colossus (GFS successor) uses metadata sharding and smaller chunks
HDFS has append-only model similar to GFS
Modern systems like Ceph use different strategies entirely
For truly random small writes, key-value stores (Bigtable, HBase) are a better fit