
Understanding Node.js Streams: A Comprehensive Guide

Ayooluwa Isaiah
Updated on March 24, 2025

Node.js streams provide a powerful way to handle data, especially when working with large amounts of information or when processing data from an external source piece by piece. They allow you to read or write data chunk by chunk, rather than loading everything into memory at once. This approach offers significant performance benefits and is one of Node.js's core strengths.

This article explores Node.js streams in depth: what they are, how they work, and when to use them, with practical examples to help you implement them in your own applications.

What are Node.js streams?

Streams are collections of data that might not be available all at once and don't have to fit in memory. Think of them like a conveyor belt where data arrives and is processed piece by piece, rather than as a whole batch.

In Node.js, streams are instances of EventEmitter, which means they emit events that can be used to read and write data. There are four fundamental stream types in Node.js:

  1. Readable - streams from which data can be read (e.g., reading a file)
  2. Writable - streams to which data can be written (e.g., writing to a file)
  3. Duplex - streams that are both Readable and Writable (e.g., TCP sockets)
  4. Transform - Duplex streams that can modify or transform data as it's written or read (e.g., compression streams)
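
All four types are exposed as classes by the built-in stream module, and many familiar core APIs return instances of them:

stream-types.js
const { Readable, Writable, Duplex, Transform } = require('stream');

// Examples of each type in core Node.js:
// fs.createReadStream()  -> Readable
// fs.createWriteStream() -> Writable
// net.Socket             -> Duplex
// zlib.createGzip()      -> Transform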

Why use streams?

Before diving deeper, let's understand why streams are beneficial:

  • Memory efficiency: Process large files without loading everything into memory
  • Time efficiency: Start processing data as soon as it's available, rather than waiting for all data
  • Composability: Easily pipe streams together to create powerful data pipelines
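
To make the memory-efficiency point concrete, here's a minimal sketch (the file name is a placeholder) contrasting reading a whole file into memory with streaming it chunk by chunk:

buffered-vs-streamed.js
const fs = require('fs');

// Buffered: the entire file must fit in memory before the callback runs
fs.readFile('large-file.txt', (err, data) => {
  if (err) throw err;
  console.log(`Loaded ${data.length} bytes at once`);
});

// Streamed: only one chunk (64KB by default for fs streams) is held at a time
fs.createReadStream('large-file.txt')
  .on('data', (chunk) => console.log(`Processing ${chunk.length} bytes`))
  .on('end', () => console.log('Done'));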

Creating and using readable streams

Readable streams are sources from which data can be consumed. Let's look at how to create and use them.

Reading from a file using streams

The simplest example is reading from a file:

file-read-stream.js
const fs = require('fs');

// Create a readable stream
const readableStream = fs.createReadStream('large-file.txt', {
 encoding: 'utf8',
 highWaterMark: 16 * 1024 // 16KB chunks
});

// Handle stream events
readableStream.on('data', (chunk) => {
 console.log(`Received ${chunk.length} characters`);
});

readableStream.on('end', () => {
 console.log('File reading completed');
});

readableStream.on('error', (error) => {
 console.error('Error reading file:', error);
});

In this example:

  • We create a readable stream using fs.createReadStream()
  • The highWaterMark option sets the buffer size to 16KB
  • We listen for 'data' events which provide chunks of the file
  • The 'end' event signals when the file has been completely read
  • The 'error' event catches any problems
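
Readable streams are also async iterables, so the same file can be consumed with a for await...of loop; errors surface as exceptions you can handle with try/catch. A sketch of the equivalent logic:

read-with-async-iteration.js
const fs = require('fs');

async function readFile() {
  const readableStream = fs.createReadStream('large-file.txt', {
    encoding: 'utf8',
    highWaterMark: 16 * 1024
  });

  // Each iteration yields one chunk of up to 16KB
  for await (const chunk of readableStream) {
    console.log(`Received ${chunk.length} characters`);
  }

  console.log('File reading completed');
}

readFile().catch((err) => console.error('Error reading file:', err));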

Creating a custom readable stream

You can also implement your own readable stream:

custom-readable.js
const { Readable } = require('stream');

class CounterStream extends Readable {
 constructor(max) {
   super();
   this.max = max;
   this.counter = 0;
 }

 _read() {
   this.counter += 1;

   if (this.counter <= this.max) {
     // Convert to string because streams work with strings or Buffers
     const str = `${this.counter}\n`;
     this.push(str);
   } else {
     // Pushing null signals the end of the stream
     this.push(null);
   }
 }
}

const counterStream = new CounterStream(10);

counterStream.on('data', (chunk) => {
 console.log(chunk.toString());
});

counterStream.on('end', () => {
 console.log('Counter stream finished');
});

This example:

  • Creates a custom Readable stream that emits numbers from 1 to max
  • Implements the _read() method which is called when the stream wants to fetch more data
  • Uses push(chunk) to send data to the consumer
  • Uses push(null) to signal the end of the stream

Reading stream modes

Readable streams operate in two modes:

  1. Flowing mode: Data is read automatically and provided as quickly as possible
  2. Paused mode: The read() method must be called explicitly to get chunks

The examples above use flowing mode by attaching a 'data' event handler. To use paused mode:

paused-read.js
const fs = require('fs');
const readableStream = fs.createReadStream('large-file.txt');

// The stream starts in paused mode because no 'data' handler is attached;
// pause() is only needed if it had already been switched to flowing mode
readableStream.pause();

// Manually read chunks
process.stdin.on('data', () => {
 const chunk = readableStream.read(64);
 if (chunk) {
   console.log(`Read ${chunk.length} bytes:`, chunk.toString());
 } else {
   console.log('No more data to read at the moment');
 }
});

readableStream.on('end', () => {
 console.log('End of stream');
 process.exit(0);
});

console.log('Press enter to read 64 bytes');
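
A more idiomatic paused-mode pattern is the 'readable' event, which fires whenever data can be read; you then call read() in a loop until it returns null:

readable-event.js
const fs = require('fs');
const readableStream = fs.createReadStream('large-file.txt');

readableStream.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer has been drained
  while ((chunk = readableStream.read(64)) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

readableStream.on('end', () => {
  console.log('End of stream');
});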

Creating and using writable streams

Writable streams allow you to send data to a destination. Let's see how to create and use them.

Writing to a file using streams

Here's a basic example of writing to a file:

file-write-stream.js
const fs = require('fs');

// Create a writable stream
const writableStream = fs.createWriteStream('output.txt');

// Write data to the stream
writableStream.write('This is the first line\n');
writableStream.write('This is the second line\n');
writableStream.write('This is the third line\n');

// End the stream
writableStream.end('This is the final line\n');

writableStream.on('finish', () => {
 console.log('All data has been written');
});

writableStream.on('error', (error) => {
 console.error('Error writing to file:', error);
});

In this example:

  • We create a writable stream with fs.createWriteStream()
  • We write multiple lines with the write() method
  • We end the stream with end(), which can also write final data
  • The 'finish' event tells us when all data has been written
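
If you prefer promises, the finished() helper from stream/promises (Node 15+) resolves once the stream is done and rejects if it errors. A sketch of the same write, assuming the same output.txt:

write-with-finished.js
const fs = require('fs');
const { finished } = require('stream/promises');

async function writeLines() {
  const writableStream = fs.createWriteStream('output.txt');

  writableStream.write('This is the first line\n');
  writableStream.end('This is the final line\n');

  // Resolves when all data has been flushed, rejects on error
  await finished(writableStream);
  console.log('All data has been written');
}

writeLines().catch((err) => console.error('Error writing to file:', err));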

Creating a custom writable stream

Let's create a custom writable stream that uppercases incoming text and writes it to standard output:

custom-writable.js
const { Writable } = require('stream');

class UppercaseWriter extends Writable {
 constructor(options) {
   // Calls the stream.Writable() constructor
   super(options);
 }

 _write(chunk, encoding, callback) {
   // Convert the chunk to uppercase
   const uppercase = chunk.toString().toUpperCase();

   // Print to the console
   process.stdout.write(uppercase);

   // Call callback to indicate we're ready for the next chunk
   callback();
 }
}

// Create an instance of our custom stream
const uppercaseWriter = new UppercaseWriter();

// Write data to it
uppercaseWriter.write('Hello, ');
uppercaseWriter.write('world!\n');
uppercaseWriter.write('This text will be uppercase.\n');
uppercaseWriter.end();

uppercaseWriter.on('finish', () => {
 console.log('All data has been processed');
});

This example:

  • Creates a custom Writable stream that uppercases each chunk it receives
  • Implements the _write() method which processes incoming chunks
  • Uses process.stdout.write() to output the transformed data
  • Calls the callback() function to signal we're ready for more data

Duplex and transform streams

Duplex and transform streams are more advanced stream types that combine reading and writing capabilities.

Duplex streams

A duplex stream is both readable and writable, like a TCP socket:

duplex-example.js
const { Duplex } = require('stream');

class NumberDuplex extends Duplex {
 constructor(options) {
   super(options);
   this.current = 0;
   this.max = 5;
   this.received = [];
 }

 _read() {
   this.current++;
   if (this.current <= this.max) {
     this.push(`${this.current}\n`);
   } else {
     this.push(null);
   }
 }

 _write(chunk, encoding, callback) {
   this.received.push(chunk.toString().trim());
   callback();
 }

 _final(callback) {
   console.log('Received messages:', this.received);
   callback();
 }
}

const duplex = new NumberDuplex();

// Read side
duplex.on('data', (chunk) => {
 console.log('Read:', chunk.toString());
});

duplex.on('end', () => {
 console.log('Read complete');
});

// Write side
duplex.write('a');
duplex.write('b');
duplex.write('c');
duplex.end();

This duplex stream:

  • Produces numbers from 1 to 5 on its readable side
  • Collects letters on its writable side
  • Operates the read and write sides independently

Transform streams

Transform streams are duplex streams that can modify data as it passes through:

transform-example.js
const { Transform } = require('stream');
const fs = require('fs');

class ReverseTransform extends Transform {
 _transform(chunk, encoding, callback) {
   // Reverse the string in the chunk
   const reversed = chunk.toString().split('').reverse().join('');
   // Push the transformed data
   this.push(reversed);
   callback();
 }
}

// Create transform stream
const reverser = new ReverseTransform();

// Create readable and writable streams
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('reversed.txt');

// Pipe data through the transform stream
readStream
 .pipe(reverser)
 .pipe(writeStream);

writeStream.on('finish', () => {
 console.log('Transformation complete');
});

This transform stream:

  • Reverses each chunk of text as it passes through (note that it reverses chunk by chunk, not the file as a whole)
  • Implements the _transform() method, which receives each written chunk and pushes the transformed output to the readable side
  • Is placed between a readable and writable stream using pipes

Stream piping

One of the most powerful features of streams is the ability to pipe them together, creating data processing pipelines.

Basic piping

Here's a simple example that copies a file:

basic-pipe.js
const fs = require('fs');

const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');

// Pipe the read stream into the write stream
readStream.pipe(writeStream);

writeStream.on('finish', () => {
 console.log('File copy completed');
});

Building a pipeline

For more complex pipelines, you can chain multiple streams:

pipeline-example.js
const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');

// Create a transform stream to convert to uppercase
class UppercaseTransform extends Transform {
 _transform(chunk, encoding, callback) {
   this.push(chunk.toString().toUpperCase());
   callback();
 }
}

// Create the streams
const readStream = fs.createReadStream('input.txt');
const uppercaseStream = new UppercaseTransform();
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream('output.txt.gz');

// Build the pipeline
readStream
 .pipe(uppercaseStream)  // First transform the data to uppercase
 .pipe(gzipStream)       // Then compress it
 .pipe(writeStream);     // Finally, write to a file

writeStream.on('finish', () => {
 console.log('Pipeline processing completed');
});

This example:

  • Reads data from a file
  • Transforms it to uppercase
  • Compresses it using gzip
  • Writes the compressed data to a file

Using pipeline API

The stream.pipeline() method provides better error handling than .pipe():

stream-pipeline.js
const fs = require('fs');
const zlib = require('zlib');
const { pipeline, Transform } = require('stream');

class UppercaseTransform extends Transform {
 _transform(chunk, encoding, callback) {
   this.push(chunk.toString().toUpperCase());
   callback();
 }
}

// Create the streams
const readStream = fs.createReadStream('input.txt');
const uppercaseStream = new UppercaseTransform();
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream('output.txt.gz');

// Build the pipeline with error handling
pipeline(
 readStream,
 uppercaseStream,
 gzipStream,
 writeStream,
 (err) => {
   if (err) {
     console.error('Pipeline failed:', err);
   } else {
     console.log('Pipeline succeeded');
   }
 }
);
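
Node 15 and later also ship a promise-based pipeline() in stream/promises, which pairs naturally with async/await. A sketch of the same pipeline without the custom transform:

promise-pipeline.js
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz')
  );
  console.log('Pipeline succeeded');
}

run().catch((err) => console.error('Pipeline failed:', err));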

Handling stream errors

Proper error handling is crucial when working with streams:

error-handling.js
const fs = require('fs');
const { pipeline } = require('stream');

const readStream = fs.createReadStream('non-existent-file.txt');
const writeStream = fs.createWriteStream('output.txt');

// Method 1: Error events on individual streams
readStream.on('error', (err) => {
 console.error('Read error:', err.message);
});

writeStream.on('error', (err) => {
 console.error('Write error:', err.message);
});

// Method 2: Using pipeline for better error handling
pipeline(
 fs.createReadStream('another-non-existent-file.txt'),
 fs.createWriteStream('another-output.txt'),
 (err) => {
   if (err) {
     console.error('Pipeline error:', err.message);
   }
 }
);

Practical examples

Let's explore some real-world applications of Node.js streams.

Building a file upload server

Here's how you could use streams to handle file uploads:

file-upload-server.js
const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer((req, res) => {
 if (req.method === 'POST' && req.url === '/upload') {
   // Create a unique filename
   const filename = path.join(__dirname, 'uploads', `upload-${Date.now()}.file`);

   // Create a writable stream
   const fileStream = fs.createWriteStream(filename);

   // Handle file stream errors
   fileStream.on('error', (err) => {
     console.error('File write error:', err);
     res.statusCode = 500;
     res.end('Error saving file');
   });

   // Track upload progress
   let uploadedBytes = 0;
   req.on('data', (chunk) => {
     uploadedBytes += chunk.length;
     console.log(`Received ${uploadedBytes} bytes`);
   });

   // Pipe the request body to the file
   req.pipe(fileStream);

   // When the upload is finished
   fileStream.on('finish', () => {
     res.statusCode = 200;
     res.setHeader('Content-Type', 'application/json');
     res.end(JSON.stringify({
       success: true,
       filename: path.basename(filename),
       size: uploadedBytes
     }));
   });

   // Handle request errors
   req.on('error', (err) => {
     console.error('Request error:', err);
     fileStream.destroy(err);
   });

   // If the client aborts the request
   req.on('aborted', () => {
     fileStream.destroy();
     fs.unlink(filename, () => {});
   });
 } else {
   res.statusCode = 404;
   res.end('Not found');
 }
});

// Create uploads directory if it doesn't exist
const uploadsDir = path.join(__dirname, 'uploads');
if (!fs.existsSync(uploadsDir)) {
 fs.mkdirSync(uploadsDir);
}

server.listen(3000, () => {
 console.log('Upload server listening on port 3000');
});

This server:

  • Accepts file uploads via HTTP POST
  • Streams the uploaded data directly to disk
  • Tracks upload progress
  • Handles various error conditions
  • Cleans up incomplete files if the connection is aborted
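
To try it out, a minimal client can stream a local file into the request body (the file path and port here are assumptions):

upload-client.js
const http = require('http');
const fs = require('fs');

const request = http.request({
  method: 'POST',
  host: 'localhost',
  port: 3000,
  path: '/upload'
}, (res) => {
  // Print the JSON response from the server
  res.pipe(process.stdout);
});

request.on('error', (err) => console.error('Upload failed:', err));

// Pipe the file into the request body; pipe() ends the request when the file ends
fs.createReadStream('some-large-file.bin').pipe(request);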

Creating a CSV parser

Here's a transform stream that parses CSV data:

csv-parser.js
const { Transform } = require('stream');
const fs = require('fs');

class CSVParser extends Transform {
 constructor(options = {}) {
   options.objectMode = true;
   super(options);
   this.headers = null;
   this.buffer = '';
   this.delimiter = options.delimiter || ',';
 }

 _transform(chunk, encoding, callback) {
   // Add new data to the buffer
   this.buffer += chunk.toString();

   // Split on newlines
   const lines = this.buffer.split('\n');

   // Last line might be incomplete, save for next chunk
   this.buffer = lines.pop();

   // Process lines
   for (const line of lines) {
     // Skip empty lines
     if (line.trim() === '') continue;

     // Parse the CSV line
     const values = this._parseLine(line);

     // Extract headers from the first line
     if (!this.headers) {
       this.headers = values;
       continue;
     }

     // Convert arrays to objects using headers as keys
     const record = {};
     for (let i = 0; i < this.headers.length; i++) {
       record[this.headers[i]] = values[i];
     }

     // Push the parsed record
     this.push(record);
   }

   callback();
 }

 _flush(callback) {
   // Process any remaining data
   if (this.buffer.trim() !== '') {
     const values = this._parseLine(this.buffer);

     if (this.headers) {
       const record = {};
       for (let i = 0; i < this.headers.length; i++) {
         record[this.headers[i]] = values[i];
       }
       this.push(record);
     }
   }

   callback();
 }

 _parseLine(line) {
   // Very basic CSV parsing (doesn't handle quoted fields properly)
   return line.split(this.delimiter).map(field => field.trim());
 }
}

// Usage example
const parser = new CSVParser();
const readStream = fs.createReadStream('data.csv');

readStream.pipe(parser);

parser.on('data', (record) => {
 console.log(record);
});

parser.on('end', () => {
 console.log('CSV parsing complete');
});

Note: This is a simplified CSV parser. For production use, consider libraries like csv-parse which handle all the edge cases.

HTTP streaming API

Here's an example of an API that streams data to clients:

streaming-api.js
const http = require('http');
const { Readable } = require('stream');

class DataSource extends Readable {
 constructor(options = {}) {
   super(options);
   this.counter = 0;
   this.max = options.max || 1000000;
   this.interval = options.interval || 100;
   this.timer = null;
 }

 _read() {
   // _read() is called repeatedly by the stream machinery; only start the interval once
   if (this.timer) return;

   this.timer = setInterval(() => {
     this.counter++;

     if (this.counter <= this.max) {
       // Create a data point
       const data = {
         id: this.counter,
         timestamp: new Date().toISOString(),
         value: Math.random() * 100
       };

       // Push as JSON with newline for streaming
       this.push(JSON.stringify(data) + '\n');
     } else {
       // End of stream
       clearInterval(this.timer);
       this.push(null);
     }
   }, this.interval);
 }

 _destroy(err, callback) {
   // Stop the interval if the stream is destroyed before it finishes
   clearInterval(this.timer);
   callback(err);
 }
}

const server = http.createServer((req, res) => {
 // Parse the URL up front so requests like /api/data?max=10 still match
 const url = new URL(req.url, `http://${req.headers.host}`);

 if (url.pathname === '/api/data') {
   const max = parseInt(url.searchParams.get('max') || '100', 10);
   const interval = parseInt(url.searchParams.get('interval') || '500', 10);

   // Set appropriate headers for streaming
   res.setHeader('Content-Type', 'application/json');
   res.setHeader('Transfer-Encoding', 'chunked');

   // Create the data source
   const dataSource = new DataSource({ max, interval });

   // Stream data to client
   dataSource.pipe(res);

   // Handle client disconnect
   req.on('close', () => {
     dataSource.destroy();
   });
 } else {
   res.statusCode = 404;
   res.end('Not found');
 }
});

server.listen(3000, () => {
 console.log('Streaming API server listening on port 3000');
});

This API:

  • Generates a stream of JSON data points
  • Allows clients to control the flow rate and amount
  • Properly handles client disconnects
  • Uses chunked transfer encoding for streaming
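
A client can consume this endpoint line by line with the readline module, parsing each JSON record as it arrives (the URL and query values are assumptions):

streaming-client.js
const http = require('http');
const readline = require('readline');

http.get('http://localhost:3000/api/data?max=10&interval=200', (res) => {
  // Treat the response as a stream of newline-delimited JSON
  const rl = readline.createInterface({ input: res });

  rl.on('line', (line) => {
    if (line.trim() === '') return;
    const record = JSON.parse(line);
    console.log('Received:', record.id, record.value.toFixed(2));
  });

  rl.on('close', () => console.log('Stream ended'));
});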

Stream backpressure

Backpressure is an important concept in streams that prevents memory overflow when a fast producer is paired with a slow consumer.

Understanding backpressure

When you write data to a stream, the write() method returns a boolean indicating if the internal buffer is full. If it returns false, you should stop writing until the 'drain' event is emitted.

backpressure-example.js
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');

// Function to write a lot of data
function writeData(writer, data) {
 let i = 100000;
 write();

 function write() {
   let ok = true;
   do {
     i--;
     if (i === 0) {
       // Last write
       writer.write(data + '\n');
     } else {
       // Check if we should continue
       ok = writer.write(data + '\n');
     }
   } while (i > 0 && ok);

   if (i > 0) {
     // Wait for the drain event before continuing
     writer.once('drain', write);
   }
 }
}

writeData(writeStream, 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.');

This example demonstrates how to:

  • Check the return value of write()
  • Pause writing when the buffer is full
  • Resume when the 'drain' event is emitted
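
With async/await, the same backpressure-aware loop can be written using once() from the events module, waiting for 'drain' whenever write() returns false. A sketch writing to the same output.txt:

async-backpressure.js
const fs = require('fs');
const { once } = require('events');

async function writeData(writer, data) {
  for (let i = 0; i < 100000; i++) {
    // write() returns false when the internal buffer is full
    if (!writer.write(data + '\n')) {
      // Pause until the buffer has drained
      await once(writer, 'drain');
    }
  }
  writer.end();
}

writeData(fs.createWriteStream('output.txt'), 'Lorem ipsum dolor sit amet.');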

Pipe handling of backpressure

When using pipe(), backpressure is automatically handled for you:

pipe-backpressure.js
const fs = require('fs');
const { Transform } = require('stream');

// Create a slow transform stream
class SlowTransform extends Transform {
 _transform(chunk, encoding, callback) {
   // Simulate slow processing
   setTimeout(() => {
     this.push(chunk);
     callback();
   }, 100);
 }
}

// Fast readable stream
const readStream = fs.createReadStream('large-file.txt', {
 highWaterMark: 1024 * 1024 // 1MB chunks
});

// Slow transform stream
const slowTransform = new SlowTransform({
 highWaterMark: 16 * 1024 // 16KB buffer
});

// Destination
const writeStream = fs.createWriteStream('output.txt');

// Pipe will handle backpressure automatically
readStream
 .pipe(slowTransform)
 .pipe(writeStream);

writeStream.on('finish', () => {
 console.log('All data processed');
});

Performance considerations

When using streams, keep these performance tips in mind:

  1. Buffer size: The highWaterMark option controls the buffer size and affects memory usage and throughput
  2. Object mode: Streams in object mode have higher overhead than binary streams
  3. Avoid synchronous operations: Don't block the event loop in stream callbacks
  4. Pipeline over multiple pipes: Use pipeline() for better error handling and resource cleanup
  5. Monitor memory usage: Watch for memory leaks, especially with improper backpressure handling
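
As a rough illustration of tips 1, 2, and 4, here is how those options are typically applied; the file names and buffer sizes are arbitrary:

tuned-pipeline.js
const fs = require('fs');
const { Transform, pipeline } = require('stream');

// Tip 1: tune highWaterMark per stream; larger buffers mean fewer reads but more memory
const source = fs.createReadStream('input.txt', { highWaterMark: 64 * 1024 });

// Tip 2: object-mode streams carry JavaScript values instead of bytes, at some extra overhead
const toRecords = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    // Tip 3: keep this callback fast and non-blocking
    callback(null, { length: chunk.length });
  }
});

const toJson = new Transform({
  writableObjectMode: true,
  transform(record, encoding, callback) {
    callback(null, JSON.stringify(record) + '\n');
  }
});

// Tip 4: pipeline() tears down every stream and reports errors in one place
pipeline(source, toRecords, toJson, fs.createWriteStream('lengths.ndjson'), (err) => {
  if (err) console.error('Pipeline failed:', err);
  else console.log('Pipeline succeeded');
});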

Final thoughts

Node.js streams provide a powerful and efficient way to handle data, particularly when dealing with large datasets or I/O operations. They enable you to process data incrementally, which reduces memory usage and can significantly improve application performance. The composability of streams through piping allows you to build complex data processing pipelines with clean, maintainable code.

While streams do have a learning curve, mastering them is well worth the effort for Node.js developers. They're used extensively throughout the Node.js ecosystem, from file system operations to HTTP requests and responses. Understanding streams will not only help you write more efficient code but also give you deeper insight into how many Node.js APIs work under the hood.

As you continue working with Node.js, look for opportunities to apply streams in your applications, especially when handling large files, processing real-time data, or building APIs that need to deliver responsive experiences. The streaming paradigm is one of Node.js's greatest strengths and a key reason for its success in data-intensive applications.
