Understanding Node.js Streams: A Comprehensive Guide
Node.js streams provide a powerful way to handle data, especially when working with large amounts of information or when processing data from an external source piece by piece. They allow you to read or write data chunk by chunk, rather than loading everything into memory at once. This approach offers significant performance benefits and is one of Node.js's core strengths.
This article will explore Node.js streams in depth - what they are, how they work, when to use them, and practical examples to help you implement them in your own applications.
What are Node.js streams?
Streams are collections of data that might not be available all at once and don't have to fit in memory. Think of them like a conveyor belt where data arrives and is processed piece by piece, rather than as a whole batch.
In Node.js, streams are instances of EventEmitter, which means they emit events that can be used to read and write data. There are four fundamental stream types in Node.js:
- Readable - streams from which data can be read (e.g., reading a file)
- Writable - streams to which data can be written (e.g., writing to a file)
- Duplex - streams that are both Readable and Writable (e.g., TCP sockets)
- Transform - Duplex streams that can modify or transform data as it's written or read (e.g., compression streams)
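Because every stream inherits from EventEmitter, you interact with all four types through the same event API. A quick sketch of that relationship, using Readable.from() (available in Node.js 12 and later) to build a stream from an array:
const { Readable } = require('stream');
const { EventEmitter } = require('events');

// Readable.from() builds a stream from any iterable
const stream = Readable.from(['a', 'b', 'c']);

// Streams inherit from EventEmitter, so the familiar event API applies
console.log(stream instanceof EventEmitter); // true
stream.on('data', (chunk) => console.log('chunk:', chunk));
stream.on('end', () => console.log('done'));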
Why use streams?
Before diving deeper, let's understand why streams are beneficial:
- Memory efficiency: Process large files without loading everything into memory
- Time efficiency: Start processing data as soon as it's available, rather than waiting for all data
- Composability: Easily pipe streams together to create powerful data pipelines
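To make the memory point concrete, here is a minimal sketch of serving a file over HTTP with and without streams; the file name large-video.mp4 is just a placeholder:
const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // Buffered approach (commented out): the whole file sits in memory before sending
  // fs.readFile('large-video.mp4', (err, data) => res.end(data));

  // Streamed approach: chunks flow to the client as they are read from disk
  fs.createReadStream('large-video.mp4')
    .on('error', () => {
      res.statusCode = 500;
      res.end('Read failed');
    })
    .pipe(res);
}).listen(3000);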
Creating and using readable streams
Readable streams are sources from which data can be consumed. Let's look at how to create and use them.
Reading from a file using streams
The simplest example is reading from a file:
const fs = require('fs');
// Create a readable stream
const readableStream = fs.createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024 // 16KB chunks
});
// Handle stream events
readableStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} characters`);
});
readableStream.on('end', () => {
console.log('File reading completed');
});
readableStream.on('error', (error) => {
console.error('Error reading file:', error);
});
In this example:
- We create a readable stream using fs.createReadStream()
- The highWaterMark option sets the buffer size to 16KB
- We listen for 'data' events which provide chunks of the file
- The 'end' event signals when the file has been completely read
- The 'error' event catches any problems
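Readable streams are also async iterables in modern Node.js, so the same file can be consumed with for await...of; a sketch assuming the same large-file.txt:
const fs = require('fs');

async function countCharacters() {
  const stream = fs.createReadStream('large-file.txt', { encoding: 'utf8' });
  let total = 0;
  // Each iteration yields one chunk; stream errors reject the loop's promise
  for await (const chunk of stream) {
    total += chunk.length;
  }
  console.log(`Read ${total} characters`);
}

countCharacters().catch((err) => console.error('Error reading file:', err));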
Creating a custom readable stream
You can also implement your own readable stream:
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(max) {
super();
this.max = max;
this.counter = 0;
}
_read() {
this.counter += 1;
if (this.counter <= this.max) {
// Convert to string because streams work with strings or Buffers
const str = `${this.counter}\n`;
this.push(str);
} else {
// Pushing null signals the end of the stream
this.push(null);
}
}
}
const counterStream = new CounterStream(10);
counterStream.on('data', (chunk) => {
console.log(chunk.toString());
});
counterStream.on('end', () => {
console.log('Counter stream finished');
});
This example:
- Creates a custom Readable stream that emits numbers from 1 to max
- Implements the _read() method, which is called when the stream wants to fetch more data
- Uses push(chunk) to send data to the consumer
- Uses push(null) to signal the end of the stream
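If you prefer not to subclass, the same counter can be sketched by passing a read() implementation directly to the Readable constructor:
const { Readable } = require('stream');

function createCounterStream(max) {
  let counter = 0;
  return new Readable({
    read() {
      counter += 1;
      // Pushing null ends the stream once we pass max
      this.push(counter <= max ? `${counter}\n` : null);
    }
  });
}

createCounterStream(5).pipe(process.stdout);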
Reading stream modes
Readable streams operate in two modes:
- Flowing mode: Data is read automatically and provided as quickly as possible
- Paused mode: The read() method must be called explicitly to get chunks
The examples above use flowing mode by attaching a 'data' event handler. To use paused mode:
const fs = require('fs');
const readableStream = fs.createReadStream('large-file.txt');
// Switch to paused mode if it was in flowing mode
readableStream.pause();
// Manually read chunks
process.stdin.on('data', () => {
const chunk = readableStream.read(64);
if (chunk) {
console.log(`Read ${chunk.length} bytes:`, chunk.toString());
} else {
console.log('No more data to read at the moment');
}
});
readableStream.on('end', () => {
console.log('End of stream');
process.exit(0);
});
console.log('Press enter to read 64 bytes');
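Another common paused-mode pattern, sketched here against the same large-file.txt, is to listen for the 'readable' event and drain the internal buffer with read():
const fs = require('fs');

const stream = fs.createReadStream('large-file.txt');

stream.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer can't satisfy the request
  while ((chunk = stream.read(64)) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

stream.on('end', () => console.log('End of stream'));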
Creating and using writable streams
Writable streams allow you to send data to a destination. Let's see how to create and use them.
Writing to a file using streams
Here's a basic example of writing to a file:
const fs = require('fs');
// Create a writable stream
const writableStream = fs.createWriteStream('output.txt');
// Write data to the stream
writableStream.write('This is the first line\n');
writableStream.write('This is the second line\n');
writableStream.write('This is the third line\n');
// End the stream
writableStream.end('This is the final line\n');
writableStream.on('finish', () => {
console.log('All data has been written');
});
writableStream.on('error', (error) => {
console.error('Error writing to file:', error);
});
In this example:
- We create a writable stream with fs.createWriteStream()
- We write multiple lines with the write() method
- We end the stream with end(), which can also write final data
- The 'finish' event tells us when all data has been written
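Both write() and end() also accept an optional callback that runs once the chunk has been flushed, which is handy for per-chunk error handling; a minimal sketch:
const fs = require('fs');

const out = fs.createWriteStream('output.txt');

// The callback receives an error if the write failed
out.write('first line\n', (err) => {
  if (err) return console.error('Write failed:', err);
  console.log('First line flushed');
});

out.end('last line\n', () => console.log('Stream ended'));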
Creating a custom writable stream
Let's create a custom writable stream that transforms text to uppercase:
const { Writable } = require('stream');
class UppercaseWriter extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor
super(options);
}
_write(chunk, encoding, callback) {
// Convert the chunk to uppercase
const uppercase = chunk.toString().toUpperCase();
// Print to the console
process.stdout.write(uppercase);
// Call callback to indicate we're ready for the next chunk
callback();
}
}
// Create an instance of our custom stream
const uppercaseWriter = new UppercaseWriter();
// Write data to it
uppercaseWriter.write('Hello, ');
uppercaseWriter.write('world!\n');
uppercaseWriter.write('This text will be uppercase.\n');
uppercaseWriter.end();
uppercaseWriter.on('finish', () => {
console.log('All data has been processed');
});
This example:
- Creates a custom Writable stream that transforms text to uppercase
- Implements the _write() method, which processes incoming chunks
- Uses process.stdout.write() to output the transformed data
- Calls the callback() function to signal we're ready for more data
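As with readable streams, a one-off writable can be sketched without a class by passing a write() implementation to the Writable constructor:
const { Writable } = require('stream');

// Constructor form: supply write() directly instead of subclassing
const uppercaseWriter = new Writable({
  write(chunk, encoding, callback) {
    process.stdout.write(chunk.toString().toUpperCase());
    callback();
  }
});

uppercaseWriter.write('hello from the constructor form\n');
uppercaseWriter.end();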
Duplex and transform streams
Duplex and transform streams are more advanced stream types that combine reading and writing capabilities.
Duplex streams
A duplex stream is both readable and writable, like a TCP socket:
const { Duplex } = require('stream');
class NumberDuplex extends Duplex {
constructor(options) {
super(options);
this.current = 0;
this.max = 5;
this.received = [];
}
_read() {
this.current++;
if (this.current <= this.max) {
this.push(`${this.current}\n`);
} else {
this.push(null);
}
}
_write(chunk, encoding, callback) {
this.received.push(chunk.toString().trim());
callback();
}
_final(callback) {
console.log('Received messages:', this.received);
callback();
}
}
const duplex = new NumberDuplex();
// Read side
duplex.on('data', (chunk) => {
console.log('Read:', chunk.toString());
});
duplex.on('end', () => {
console.log('Read complete');
});
// Write side
duplex.write('a');
duplex.write('b');
duplex.write('c');
duplex.end();
This duplex stream:
- Produces numbers from 1 to 5 on its readable side
- Collects letters on its writable side
- Operates the read and write sides independently
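For a real-world duplex stream, TCP sockets from the net module behave the same way; a minimal echo-server sketch (port 4000 is arbitrary):
const net = require('net');

// Each incoming socket is a Duplex stream: we read client data and write replies
const server = net.createServer((socket) => {
  socket.write('Connected. Anything you send will be echoed back.\n');
  socket.pipe(socket); // pipe the readable side back into the writable side
});

server.listen(4000, () => console.log('Echo server on port 4000'));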
Transform streams
Transform streams are duplex streams that can modify data as it passes through:
const { Transform } = require('stream');
const fs = require('fs');
class ReverseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Reverse the string in the chunk
const reversed = chunk.toString().split('').reverse().join('');
// Push the transformed data
this.push(reversed);
callback();
}
}
// Create transform stream
const reverser = new ReverseTransform();
// Create readable and writable streams
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('reversed.txt');
// Pipe data through the transform stream
readStream
.pipe(reverser)
.pipe(writeStream);
writeStream.on('finish', () => {
console.log('Transformation complete');
});
This transform stream:
- Reverses the text of each chunk as it passes through
- Uses the _transform() method, which receives written chunks and pushes the transformed output to the readable side
- Sits between a readable and a writable stream in a pipe chain
Stream piping
One of the most powerful features of streams is the ability to pipe them together, creating data processing pipelines.
Basic piping
Here's a simple example that copies a file:
const fs = require('fs');
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');
// Pipe the read stream into the write stream
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('File copy completed');
});
Building a pipeline
For more complex pipelines, you can chain multiple streams:
const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');
// Create a transform stream to convert to uppercase
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
// Create the streams
const readStream = fs.createReadStream('input.txt');
const uppercaseStream = new UppercaseTransform();
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream('output.txt.gz');
// Build the pipeline
readStream
.pipe(uppercaseStream) // First transform the data to uppercase
.pipe(gzipStream) // Then compress it
.pipe(writeStream); // Finally, write to a file
writeStream.on('finish', () => {
console.log('Pipeline processing completed');
});
This example:
- Reads data from a file
- Transforms it to uppercase
- Compresses it using gzip
- Writes the compressed data to a file
Using pipeline API
The stream.pipeline() method provides better error handling than .pipe():
const fs = require('fs');
const zlib = require('zlib');
const { pipeline, Transform } = require('stream');
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
// Create the streams
const readStream = fs.createReadStream('input.txt');
const uppercaseStream = new UppercaseTransform();
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream('output.txt.gz');
// Build the pipeline with error handling
pipeline(
readStream,
uppercaseStream,
gzipStream,
writeStream,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
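If you're on a recent Node.js version (15 or later), the promise-based variant in stream/promises pairs naturally with async/await; a sketch of the same compression pipeline without the uppercase step:
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function compressFile() {
  // Resolves when every stream has finished; rejects on the first error
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz')
  );
  console.log('Pipeline succeeded');
}

compressFile().catch((err) => console.error('Pipeline failed:', err));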
Handling stream errors
Proper error handling is crucial when working with streams:
const fs = require('fs');
const { pipeline } = require('stream');
const readStream = fs.createReadStream('non-existent-file.txt');
const writeStream = fs.createWriteStream('output.txt');
// Method 1: Error events on individual streams
readStream.on('error', (err) => {
console.error('Read error:', err.message);
});
writeStream.on('error', (err) => {
console.error('Write error:', err.message);
});
// Method 2: Using pipeline for better error handling
pipeline(
fs.createReadStream('another-non-existent-file.txt'),
fs.createWriteStream('another-output.txt'),
(err) => {
if (err) {
console.error('Pipeline error:', err.message);
}
}
);
Practical examples
Let's explore some real-world applications of Node.js streams.
Building a file upload server
Here's how you could use streams to handle file uploads:
const http = require('http');
const fs = require('fs');
const path = require('path');
const server = http.createServer((req, res) => {
if (req.method === 'POST' && req.url === '/upload') {
// Create a unique filename
const filename = path.join(__dirname, 'uploads', `upload-${Date.now()}.file`);
// Create a writable stream
const fileStream = fs.createWriteStream(filename);
// Handle file stream errors
fileStream.on('error', (err) => {
console.error('File write error:', err);
res.statusCode = 500;
res.end('Error saving file');
});
// Track upload progress
let uploadedBytes = 0;
req.on('data', (chunk) => {
uploadedBytes += chunk.length;
console.log(`Received ${uploadedBytes} bytes`);
});
// Pipe the request body to the file
req.pipe(fileStream);
// When the upload is finished
fileStream.on('finish', () => {
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({
success: true,
filename: path.basename(filename),
size: uploadedBytes
}));
});
// Handle request errors
req.on('error', (err) => {
console.error('Request error:', err);
fileStream.destroy(err);
});
// If the client aborts the request
req.on('aborted', () => {
fileStream.destroy();
fs.unlink(filename, () => {});
});
} else {
res.statusCode = 404;
res.end('Not found');
}
});
// Create uploads directory if it doesn't exist
const uploadsDir = path.join(__dirname, 'uploads');
if (!fs.existsSync(uploadsDir)) {
fs.mkdirSync(uploadsDir);
}
server.listen(3000, () => {
console.log('Upload server listening on port 3000');
});
This server:
- Accepts file uploads via HTTP POST
- Streams the uploaded data directly to disk
- Tracks upload progress
- Handles various error conditions
- Cleans up incomplete files if the connection is aborted
Creating a CSV parser
Here's a transform stream that parses CSV data:
const { Transform } = require('stream');
const fs = require('fs');
class CSVParser extends Transform {
constructor(options = {}) {
// Emit parsed records as objects rather than strings or Buffers
super({ ...options, objectMode: true });
this.headers = null;
this.buffer = '';
this.delimiter = options.delimiter || ',';
}
_transform(chunk, encoding, callback) {
// Add new data to the buffer
this.buffer += chunk.toString();
// Split on newlines
const lines = this.buffer.split('\n');
// Last line might be incomplete, save for next chunk
this.buffer = lines.pop();
// Process lines
for (const line of lines) {
// Skip empty lines
if (line.trim() === '') continue;
// Parse the CSV line
const values = this._parseLine(line);
// Extract headers from the first line
if (!this.headers) {
this.headers = values;
continue;
}
// Convert arrays to objects using headers as keys
const record = {};
for (let i = 0; i < this.headers.length; i++) {
record[this.headers[i]] = values[i];
}
// Push the parsed record
this.push(record);
}
callback();
}
_flush(callback) {
// Process any remaining data
if (this.buffer.trim() !== '') {
const values = this._parseLine(this.buffer);
if (this.headers) {
const record = {};
for (let i = 0; i < this.headers.length; i++) {
record[this.headers[i]] = values[i];
}
this.push(record);
}
}
callback();
}
_parseLine(line) {
// Very basic CSV parsing (doesn't handle quoted fields properly)
return line.split(this.delimiter).map(field => field.trim());
}
}
// Usage example
const parser = new CSVParser();
const readStream = fs.createReadStream('data.csv');
readStream.pipe(parser);
parser.on('data', (record) => {
console.log(record);
});
parser.on('end', () => {
console.log('CSV parsing complete');
});
Note: This is a simplified CSV parser. For production use, consider a library like csv-parse, which handles quoted fields and other edge cases.
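To try the parser in isolation (assuming the CSVParser class above is in scope), you can feed it an in-memory string with Readable.from(); the data here is made up:
const { Readable } = require('stream');

// Hypothetical CSV content; in practice it would come from a file or network stream
const csv = 'name,age\nAda,36\nGrace,45\n';

Readable.from([csv])
  .pipe(new CSVParser())
  .on('data', (record) => console.log(record)); // e.g. { name: 'Ada', age: '36' }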
HTTP streaming API
Here's an example of an API that streams data to clients:
const http = require('http');
const { Readable } = require('stream');
class DataSource extends Readable {
constructor(options = {}) {
super(options);
this.counter = 0;
this.max = options.max || 1000000;
this.interval = options.interval || 100;
this.timer = null;
}
_read() {
// _read() may be called many times as the consumer asks for data; only start one interval
if (this.timer) return;
this.timer = setInterval(() => {
this.counter++;
if (this.counter <= this.max) {
// Create a data point
const data = {
id: this.counter,
timestamp: new Date().toISOString(),
value: Math.random() * 100
};
// Push as JSON with newline for streaming
this.push(JSON.stringify(data) + '\n');
} else {
// End of stream
clearInterval(this.timer);
this.push(null);
}
}, this.interval);
// Handle premature stream destruction
this.on('close', () => {
clearInterval(this.timer);
});
}
}
const server = http.createServer((req, res) => {
// Parse the URL first so query strings don't break the route match
const url = new URL(req.url, `http://${req.headers.host}`);
if (url.pathname === '/api/data') {
const max = parseInt(url.searchParams.get('max') || '100');
const interval = parseInt(url.searchParams.get('interval') || '500');
// Set appropriate headers for streaming
res.setHeader('Content-Type', 'application/json');
res.setHeader('Transfer-Encoding', 'chunked');
// Create the data source
const dataSource = new DataSource({ max, interval });
// Stream data to client
dataSource.pipe(res);
// Handle client disconnect
req.on('close', () => {
dataSource.destroy();
});
} else {
res.statusCode = 404;
res.end('Not found');
}
});
server.listen(3000, () => {
console.log('Streaming API server listening on port 3000');
});
This API:
- Generates a stream of JSON data points
- Allows clients to control the flow rate and amount
- Properly handles client disconnects
- Uses chunked transfer encoding for streaming
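On the consumer side, the response is itself a readable stream; one way to process it line by line is the readline module, sketched here against the server above:
const http = require('http');
const readline = require('readline');

// Request ten data points at 200ms intervals
http.get('http://localhost:3000/api/data?max=10&interval=200', (res) => {
  const lines = readline.createInterface({ input: res });
  lines.on('line', (line) => console.log('Data point:', JSON.parse(line)));
  lines.on('close', () => console.log('Stream ended'));
});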
Stream backpressure
Backpressure is an important concept in streams that prevents memory overflow when a fast producer is paired with a slow consumer.
Understanding backpressure
When you write data to a stream, the write() method returns a boolean indicating whether it is safe to keep writing. If it returns false, the internal buffer has reached its highWaterMark and you should stop writing until the 'drain' event is emitted.
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
// Function to write a lot of data
function writeData(writer, data) {
let i = 100000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last write
writer.write(data + '\n');
} else {
// Check if we should continue
ok = writer.write(data + '\n');
}
} while (i > 0 && ok);
if (i > 0) {
// Wait for the drain event before continuing
writer.once('drain', write);
}
}
}
writeData(writeStream, 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.');
This example demonstrates how to:
- Check the return value of write()
- Pause writing when the buffer is full
- Resume when the 'drain' event is emitted
Pipe handling of backpressure
When using pipe(), backpressure is handled automatically for you:
const fs = require('fs');
const { Transform } = require('stream');
// Create a slow transform stream
class SlowTransform extends Transform {
_transform(chunk, encoding, callback) {
// Simulate slow processing
setTimeout(() => {
this.push(chunk);
callback();
}, 100);
}
}
// Fast readable stream
const readStream = fs.createReadStream('large-file.txt', {
highWaterMark: 1024 * 1024 // 1MB chunks
});
// Slow transform stream
const slowTransform = new SlowTransform({
highWaterMark: 16 * 1024 // 16KB buffer
});
// Destination
const writeStream = fs.createWriteStream('output.txt');
// Pipe will handle backpressure automatically
readStream
.pipe(slowTransform)
.pipe(writeStream);
writeStream.on('finish', () => {
console.log('All data processed');
});
Performance considerations
When using streams, keep these performance tips in mind:
- Buffer size: The highWaterMark option controls the buffer size and affects memory usage and throughput
- Object mode: Streams in object mode have higher overhead than binary streams
- Avoid synchronous operations: Don't block the event loop in stream callbacks
- Pipeline over multiple pipes: Use pipeline() for better error handling and resource cleanup
- Monitor memory usage: Watch for memory leaks, especially with improper backpressure handling
Final thoughts
Node.js streams provide a powerful and efficient way to handle data, particularly when dealing with large datasets or I/O operations. They enable you to process data incrementally, which reduces memory usage and can significantly improve application performance. The composability of streams through piping allows you to build complex data processing pipelines with clean, maintainable code.
While streams do have a learning curve, mastering them is well worth the effort for Node.js developers. They're used extensively throughout the Node.js ecosystem, from file system operations to HTTP requests and responses. Understanding streams will not only help you write more efficient code but also give you deeper insight into how many Node.js APIs work under the hood.
As you continue working with Node.js, look for opportunities to apply streams in your applications, especially when handling large files, processing real-time data, or building APIs that need to deliver responsive experiences. The streaming paradigm is one of Node.js's greatest strengths and a key reason for its success in data-intensive applications.