In this article, we continue covering streams, since they play a significant role in Node.js development. This time we focus on writable streams and pipes. To illustrate how a writable stream works, we implement our own simple version of a stream that writes to a file. We also look at the streams that Node.js exposes on the global process object: stdin, stdout, and stderr.
Node.js TypeScript Writable Streams
In previous examples, we used the fs.writeFile function to create files and write to them:
import * as fs from 'fs';
import * as util from 'util';
const writeFile = util.promisify(fs.writeFile);
writeFile('./file.txt',
'Hello world!',
{ encoding: 'utf8' }
)
.then(() => {
console.log('File created!');
})
.catch(error => console.log(error));
While this works, it is not a solution for every case. The whole payload has to sit in memory before the call, so it does not perform well with large amounts of data. Also, using fs.writeFile multiple times on the same file is only safe if you wait for each call to finish before starting the next one. In this scenario, fs.createWriteStream is strongly encouraged. It creates a writable stream.
To write some data to it, we use the
write
method.
import * as fs from 'fs';
const stream = fs.createWriteStream('./file.txt');
stream.write('Hello world!', () => {
console.log('File created!');
});
To indicate that you intend for no more data to be written to the stream, you can call the
end
method. You can also provide it with the last chunk of the data.
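As a minimal sketch of the end method receiving the last chunk, the function below writes a greeting in two parts. The writeGreeting name and the ./greeting.txt path are hypothetical and serve only as an illustration.

```typescript
import * as fs from 'fs';

// writeGreeting is a hypothetical helper name used for illustration
function writeGreeting(filePath: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const stream = fs.createWriteStream(filePath);
    stream.on('error', reject);

    stream.write('Hello ');
    // end() takes an optional final chunk and an optional callback
    // that runs once the stream has finished
    stream.end('world!', () => resolve());
  });
}

// './greeting.txt' is an assumed path
writeGreeting('./greeting.txt')
  .then(() => console.log('File created!'))
  .catch(error => console.log(error));
```

Wrapping the stream in a promise is a design choice made here to keep the usage similar to the promisified fs.writeFile example; the callback form works just as well.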
Since every stream is an instance of EventEmitter, which we cover in the second part of the series, the writable stream also has a set of events. One of them is 'finish'. The stream emits it after you call the end function and all the data has been flushed to the underlying system.
import * as fs from 'fs';
const stream = fs.createWriteStream('./file.txt');
stream.on('finish', () => {
console.log('All the data is transmitted');
});
stream.write('Hello ');
stream.write('world!');
// Without calling end(), the 'finish' event is never emitted
stream.end();
Since we now know both readable and writable streams, we can combine them. Let’s transfer one big file into another.
import * as fs from 'fs';
const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');
readable.on('data', chunk => {
writable.write(chunk);
});
Here we create a readable stream and switch it into flowing mode by attaching the 'data' event listener. We pass every chunk we receive to the writable stream with the write function. While it looks quite convenient, we can do it even better with pipes.
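One thing the manual approach above does not do is react when the writable stream cannot keep up. The write function returns false when the internal buffer is full, and the stream emits 'drain' when it is ready for more. The sketch below shows one possible way to respect that; the copyWithBackpressure name is hypothetical.

```typescript
import * as fs from 'fs';

// copyWithBackpressure is a hypothetical helper name used for illustration
function copyWithBackpressure(source: string, destination: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const readable = fs.createReadStream(source);
    const writable = fs.createWriteStream(destination);

    readable.on('data', chunk => {
      // write() returns false when the internal buffer of the writable is full
      if (!writable.write(chunk)) {
        readable.pause();
        // 'drain' signals that the writable is ready to accept more data
        writable.once('drain', () => readable.resume());
      }
    });

    // Close the writable once the source has been fully read
    readable.on('end', () => writable.end());
    readable.on('error', reject);
    writable.on('error', reject);
    writable.on('finish', () => resolve());
  });
}
```

Calling copyWithBackpressure('./file1.txt', './file2.txt') would copy the file while pausing the source whenever the destination falls behind.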
Pipes
The pipe function is available on readable streams. When provided with a writable stream, it attaches it to the readable stream, pushes the data to the writable stream, and manages the flow so that a fast readable does not overwhelm a slower writable.
import * as fs from 'fs';
const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');
readable.pipe(writable);
That simple!
By default, when all the data is transmitted and the readable emits the 'end' event, the writable stream is closed with the writable.end function.
import * as fs from 'fs';
const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');
writable.on('finish', () => {
console.log('The end!');
});
readable.pipe(writable);
The end!
This behavior can be changed with the
{ end: false }
option.
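A case where keeping the writable open might be useful is joining two files into one. The following is a sketch under that assumption; concatFiles and its parameters are hypothetical names.

```typescript
import * as fs from 'fs';

// concatFiles is a hypothetical helper name used for illustration
function concatFiles(first: string, second: string, destination: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const writable = fs.createWriteStream(destination);
    writable.on('finish', () => resolve());
    writable.on('error', reject);

    const firstReadable = fs.createReadStream(first);
    firstReadable.on('error', reject);
    // Keep the writable open after the first file has been fully read
    firstReadable.pipe(writable, { end: false });

    firstReadable.on('end', () => {
      const secondReadable = fs.createReadStream(second);
      secondReadable.on('error', reject);
      // Default behavior: the writable is ended when the second file is done
      secondReadable.pipe(writable);
    });
  });
}
```

Only the first pipe call uses { end: false }, so the writable still emits 'finish' after the second file.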
One note here is that if any error occurs during piping, the writable is not closed automatically, so it might be necessary to track it and close it manually.
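One way to avoid tracking errors by hand is the pipeline function from the stream module, which forwards errors to a single callback and destroys the streams involved. The sketch below assumes a Node.js version that ships stream.pipeline (10 or newer); copyWithPipeline is a hypothetical name.

```typescript
import * as fs from 'fs';
import { pipeline } from 'stream';

// copyWithPipeline is a hypothetical helper name used for illustration
function copyWithPipeline(source: string, destination: string): Promise<void> {
  return new Promise((resolve, reject) => {
    pipeline(
      fs.createReadStream(source),
      fs.createWriteStream(destination),
      error => {
        // The callback runs once, either with an error or after everything is done
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      }
    );
  });
}
```

If the source file does not exist, the returned promise rejects and the writable stream is destroyed instead of being left open.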
Writable stream under the hood
The fs.createWriteStream function is not the only way of making a writable stream. We can create our own writable stream to understand it better.
Every writable stream has to implement a _write method that is called indirectly when we write data to the stream.
import { Writable } from 'stream';
const writable = new Writable();
writable._write = function(chunk, encoding, next) {
console.log(chunk.toString());
next();
};
writable.write('Hello world!');
Hello world!
In our simple example, every time we write to the stream, the chunk is logged to the console. The encoding argument is meaningful when the chunk is a string; by default, strings are converted to Buffers before they reach _write, and the encoding is then 'buffer'. Calling the next function indicates that we finished handling the chunk, and passing an error to it signals that the write failed.
The _write method can also be declared by passing it to the Writable constructor, or by extending the Writable class.
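A minimal sketch of the constructor variant: the Writable constructor accepts an options object, and its write property plays the role of _write. The received array is a hypothetical addition that only makes the effect visible.

```typescript
import { Writable } from 'stream';

// received is a hypothetical array used only to make the effect visible
const received: string[] = [];

const writable = new Writable({
  // Has the same effect as assigning writable._write after construction
  write(chunk, encoding, next) {
    received.push(chunk.toString());
    next();
  },
});

writable.on('finish', () => {
  console.log(received.join(''));
});

writable.write('Hello ');
writable.write('world!');
writable.end();
```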
Having all this knowledge, let’s implement a simplified version of a stream that writes data to a file.
import * as fs from 'fs';
import * as util from 'util';
import { Writable } from 'stream';
// appendFile adds data at the end of the file; writeFile would overwrite it with every chunk
const appendFile = util.promisify(fs.appendFile);
class WritableFileStream extends Writable {
  path: string;
  constructor(path: string) {
    super();
    this.path = path;
  }
  _write(chunk: any, encoding: string, next: (error?: Error) => void) {
    appendFile(this.path, chunk)
      .then(() => next())
      .catch((error) => next(error));
  }
}
const readable = fs.createReadStream('./file1.txt');
const writable = new WritableFileStream('./file2.txt');
readable.pipe(writable);
In the above example, every time we write to our WritableFileStream, we add the data at the end of a file.
Process streams
In the first part of the series, we mention the global process object. Aside from properties like process.argv and process.execPath, it contains streams that our application can use.
process.stdin
The process.stdin is a readable stream that gathers the data incoming to our process. Using it, we can listen for data in the terminal. As we mention in the previous part of the series, readable streams have modes, and the stdin stream is in paused mode by default. To switch the stdin stream to flowing mode and make the application listen for input, we need to resume it. This happens under the hood when we attach the 'data' event listener.
let a;
let b;
process.stdin.on('data', (data) => {
if (a === undefined) {
a = Number(data.toString());
} else if (b === undefined) {
b = Number(data.toString());
console.log(`${a} + ${b} = ${a + b}`);
}
});
In the example above, we expect two numbers from the terminal and add them together.
If you run it, you will notice that the process does not exit after receiving two numbers. This is because the process.stdin stream is still flowing. To fix it, we need to pause it.
let a;
let b;
process.stdin.on('data', (data) => {
if(a === undefined) {
a = Number(data.toString());
} else if(b === undefined) {
b = Number(data.toString());
console.log(`${a} + ${b} = ${a + b}`);
process.stdin.pause();
}
});
process.stdout and process.stderr
The process.stdout and process.stderr are writable streams. They are used by console.log() and console.error() respectively, and writing to them results in text appearing in the console. We can easily make use of that and, for example, log a file:
import * as fs from 'fs';
const readable = fs.createReadStream('./file1.txt');
readable.pipe(process.stdout);
These streams differ from other Node.js streams in terms of asynchronicity: depending on what they are connected to, writes to them can be synchronous. For more details, check out the documentation.
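Since both are regular writable streams, we can also call write on them directly. The sketch below uses a hypothetical writeLine helper that appends a newline, roughly as console.log does.

```typescript
// writeLine is a hypothetical helper; it accepts any writable stream
function writeLine(target: NodeJS.WritableStream, message: string): void {
  // Unlike console.log, write() does not add a trailing newline on its own
  target.write(`${message}\n`);
}

writeLine(process.stdout, 'Regular output');
writeLine(process.stderr, 'Something went wrong');
```

Taking the stream as a parameter is a design choice that lets the same function write to a file stream as well.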
Summary
In this article, we covered writable streams: how to handle files using them and how to combine them with readable streams thanks to pipes. We also implemented our own writable stream for handling files, which included writing the _write function. Finally, we learned how to pass additional data to our process through the process.stdin stream and what the process.stdout and process.stderr streams are. This knowledge, combined with readable streams, gives quite a bit of insight into the topic of streams, but there are still some things left to explain. Stay tuned!
The post Node.js TypeScript #5. Writable streams, pipes, and the process streams appeared first on Marcin Wanago Blog - JavaScript, both frontend and backend.