Node.js Streams

Streams & Buffers

Buffers

JavaScript (in the browser) has historically been poor at handling binary data. Node.js introduced the Buffer class to handle binary data efficiently. A Buffer is a chunk of memory that stores raw binary data. It is similar to an array of integers but corresponds to a raw memory allocation outside the V8 heap.

Creating a Buffer

// Create a buffer of 10 bytes
const buf1 = Buffer.alloc(10);

// Create a buffer from a string
const buf2 = Buffer.from('Hello World');

console.log(buf2); 
// Output: <Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>
console.log(buf2.toString()); 
// Output: Hello World
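
Common Buffer Operations

Because a Buffer behaves like an array of bytes, you can index into it, concatenate buffers, and convert between string encodings. A minimal sketch of a few common operations:
const a = Buffer.from('Hello ');
const b = Buffer.from('World');

// Index access returns the byte value at that position
console.log(a[0]);
// Output: 72 (the character code for 'H')

// Concatenate buffers
const joined = Buffer.concat([a, b]);
console.log(joined.toString());
// Output: Hello World

// Convert to other encodings
console.log(joined.toString('hex'));
// Output: 48656c6c6f20576f726c64
console.log(joined.toString('base64'));
// Output: SGVsbG8gV29ybGQ=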

Streams

Streams are objects that let you read data from a source or write data to a destination in continuous chunks. There are four types of streams:
  1. Readable: Stream you can read from (e.g., fs.createReadStream).
  2. Writable: Stream you can write to (e.g., fs.createWriteStream).
  3. Duplex: Stream that is both Readable and Writable (e.g., net.Socket).
  4. Transform: Stream that can modify or transform the data as it is written and read (e.g., zlib.createGzip).

Why Streams?

If you want to read a massive file (e.g., 2GB video) and send it to a client:
  • Without Streams: You read the entire 2GB into memory. If 100 users do this, your server crashes.
  • With Streams: You read small chunks (e.g., 64KB) and send them one by one, so memory usage stays low (the sketch below contrasts the two approaches).
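
A minimal sketch of that contrast (the file name and ports are placeholders): the first server buffers the entire file in memory for every request, while the second streams it chunk by chunk.
const fs = require('fs');
const http = require('http');

// Without streams: readFile loads the whole file into memory per request
http.createServer((req, res) => {
  fs.readFile('big-video.mp4', (err, data) => {
    if (err) {
      res.writeHead(500);
      return res.end('Error reading file');
    }
    res.end(data); // the entire file sits in memory until the response finishes
  });
}).listen(3001);

// With streams: only small chunks are in memory at any moment
http.createServer((req, res) => {
  const stream = fs.createReadStream('big-video.mp4');
  stream.on('error', () => res.end()); // minimal error handling for the sketch
  stream.pipe(res);
}).listen(3002);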

Readable Stream Example

const fs = require('fs');

const readStream = fs.createReadStream('largefile.txt', { encoding: 'utf8' });

readStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk.length);
});

readStream.on('end', () => {
  console.log('Finished reading file.');
});
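
Readable streams are also async iterable, so the same file can be consumed with for await...of instead of listening for 'data' and 'end' (a sketch; readChunks is just an illustrative wrapper):
const fs = require('fs');

async function readChunks() {
  const readStream = fs.createReadStream('largefile.txt', { encoding: 'utf8' });

  for await (const chunk of readStream) {
    console.log('Received chunk:', chunk.length);
  }

  console.log('Finished reading file.');
}

readChunks().catch(console.error);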

Piping

Piping is a mechanism that connects the output of one stream to the input of another. Because pipe() returns the destination stream, calls can be chained to build multi-step pipelines:
const fs = require('fs');
const zlib = require('zlib');

const gzip = zlib.createGzip();
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('input.txt.gz');

// Read -> Compress -> Write
readStream.pipe(gzip).pipe(writeStream);

Stream Events

Streams are EventEmitters with specific events:

Readable Stream Events

const readStream = fs.createReadStream('large.txt');

readStream.on('open', () => console.log('File opened'));
readStream.on('data', (chunk) => console.log('Received', chunk.length, 'bytes'));
readStream.on('end', () => console.log('Finished reading'));
readStream.on('close', () => console.log('File closed'));
readStream.on('error', (err) => console.error('Error:', err));

Writable Stream Events

const writeStream = fs.createWriteStream('output.txt');

writeStream.on('finish', () => console.log('Finished writing'));
writeStream.on('close', () => console.log('File closed'));
writeStream.on('error', (err) => console.error('Error:', err));
writeStream.on('drain', () => console.log('Buffer drained, ready for more'));

writeStream.write('some data\n');
writeStream.end(); // 'finish' fires once end() has flushed all pending writes

Backpressure Handling

Backpressure occurs when data comes in faster than it can be written:
const fs = require('fs');

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  // write() returns false if buffer is full
  const canContinue = writeStream.write(chunk);
  
  if (!canContinue) {
    // Pause reading until buffer drains
    readStream.pause();
    console.log('Backpressure: pausing read stream');
  }
});

writeStream.on('drain', () => {
  // Buffer drained, resume reading
  readStream.resume();
  console.log('Drain: resuming read stream');
});

// Easier approach: pipe() handles backpressure automatically.
// Use it *instead of* the manual 'data'/'drain' handlers above:
// readStream.pipe(writeStream);

Creating Custom Streams

Custom Readable Stream

const { Readable } = require('stream');

class NumberStream extends Readable {
  constructor(max) {
    super();
    this.current = 0;
    this.max = max;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++) + '\n');
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

const numbers = new NumberStream(10);
numbers.pipe(process.stdout);
// Outputs: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

Custom Writable Stream

const { Writable } = require('stream');

class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    const timestamp = new Date().toISOString();
    console.log(`[${timestamp}] ${chunk.toString().trim()}`);
    callback(); // Signal write complete
  }
}

const logger = new LogStream();
logger.write('Hello World');
logger.write('Another message');

Custom Transform Stream

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const uppercase = new UppercaseTransform();

process.stdin
  .pipe(uppercase)
  .pipe(process.stdout);
// Converts all input to uppercase
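
Custom Duplex Stream

The fourth stream type, Duplex, has independent readable and writable sides. A minimal sketch of a Duplex that echoes whatever is written to it back out of its readable side (readable-side backpressure is ignored to keep the example short):
const { Duplex } = require('stream');

class EchoStream extends Duplex {
  // Writable side: forward each written chunk straight to the readable side
  _write(chunk, encoding, callback) {
    this.push(chunk);
    callback();
  }

  // Readable side: nothing to do on demand; data arrives via _write()
  _read() {}

  // When the writable side ends, end the readable side too
  _final(callback) {
    this.push(null);
    callback();
  }
}

const echo = new EchoStream();
echo.pipe(process.stdout);
echo.write('Hello ');
echo.write('Duplex\n');
echo.end();
// Outputs: Hello Duplex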

Practical Example: HTTP File Download

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  if (req.url === '/download') {
    const filePath = './large-video.mp4';
    const stat = fs.statSync(filePath);

    res.writeHead(200, {
      'Content-Type': 'video/mp4',
      'Content-Length': stat.size,
      'Content-Disposition': 'attachment; filename="video.mp4"'
    });

    // Stream the file instead of loading it into memory
    const readStream = fs.createReadStream(filePath);
    readStream.pipe(res);

    readStream.on('error', (err) => {
      // Headers may already have been sent once streaming started
      if (!res.headersSent) {
        res.writeHead(500);
        res.end('Error reading file');
      } else {
        res.destroy();
      }
    });
  } else {
    // Respond to unknown routes so requests do not hang
    res.writeHead(404);
    res.end('Not Found');
  }
});

server.listen(3000);

Practical Example: Real-time Log Parser

const fs = require('fs');
const readline = require('readline');

async function parseLogFile(filePath) {
  const fileStream = fs.createReadStream(filePath);
  
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity // Handle both \r\n and \n
  });
  
  const stats = {
    total: 0,
    errors: 0,
    warnings: 0
  };
  
  for await (const line of rl) {
    stats.total++;
    if (line.includes('ERROR')) stats.errors++;
    if (line.includes('WARN')) stats.warnings++;
  }
  
  return stats;
}

parseLogFile('./server.log').then(console.log);

pipeline() - Better Error Handling

The pipeline() function handles errors and cleanup automatically:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

// Promise version (Node.js 15+)
const { pipeline: pipelineAsync } = require('stream/promises');

async function compress() {
  await pipelineAsync(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('input.txt.gz')
  );
  console.log('Compression complete');
}
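
In Node.js versions that provide stream/promises, pipeline() also accepts async generator functions as intermediate steps, a convenient way to write a one-off transform without a Transform subclass (a sketch using the pipelineAsync import from above; the output file name is illustrative):
async function uppercase() {
  await pipelineAsync(
    fs.createReadStream('input.txt', { encoding: 'utf8' }),
    // Async generator acting as an inline transform step
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toUpperCase();
      }
    },
    fs.createWriteStream('input-upper.txt')
  );
  console.log('Uppercase pipeline complete');
}

uppercase().catch(console.error);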

Summary

  • Buffers handle raw binary data efficiently
  • Streams process data in chunks, enabling memory-efficient operations
  • Four stream types: Readable, Writable, Duplex, Transform
  • Piping connects streams and handles backpressure
  • Backpressure prevents memory overflow when producer is faster than consumer
  • Use pipeline() for proper error handling and cleanup
  • Create custom streams by extending stream classes