Monday, 11 March, 2019 UTC


Summary

In this article, we continue covering streams, since they play a significant role in Node.js development. This time we focus on writable streams and pipes. To illustrate how a writable stream works, we implement our own simple version of a stream writing to a file. We also provide examples of streams available in the Node.js environment through the global process object: stdin, stdout, and stderr.
Node.js TypeScript Writable Streams
In previous examples, we used the fs.writeFile function to create files and write to them:
import * as fs from 'fs';
import * as util from 'util';

const writeFile = util.promisify(fs.writeFile);

writeFile('./file.txt',
  'Hello world!',
  { encoding: 'utf8' }
)
  .then(() => {
    console.log('File created!');
  })
  .catch(error => console.log(error));
While this works, it is not a solution for every case. Writing large amounts of data this way performs poorly, because the whole content has to be held in memory before it is written. Also, calling fs.writeFile multiple times on the same file is only safe if each call waits for the previous one to finish. In such scenarios, fs.createWriteStream is the recommended approach. It creates a writable stream.
To write some data to it, we use the write method.
import * as fs from 'fs';

const stream = fs.createWriteStream('./file.txt');

stream.write('Hello world!', () => {
  console.log('File created!');
});
To indicate that no more data will be written to the stream, you can call the end method. You can also pass it the last chunk of data.
Since every stream is an instance of EventEmitter, which we covered in the second part of the series, the writable stream also has a set of events. One of them is 'finish'. The stream emits it after you call the end function and all the data has been transmitted.
import * as fs from 'fs';

const stream = fs.createWriteStream('./file.txt');

stream.on('finish', () => {
  console.log('All the data is transmitted');
});

stream.write('Hello ');
// Without calling end, the 'finish' event would never be emitted
stream.end('world!');
Since we now know both readable and writable streams, we can combine them. Let’s transfer one big file into another.
import * as fs from 'fs';

const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');

readable.on('data', chunk => {
  writable.write(chunk);
});
Here we create a readable stream and switch it into flowing mode by attaching a 'data' event listener. We pass every chunk we receive to the writable stream with the write function. While this looks quite convenient, we can do even better with pipes.
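One detail the manual version above leaves out is backpressure: write returns false once the internal buffer of the writable stream is full, and the stream emits 'drain' when it is ready for more data. Below is a minimal sketch of how this could be handled by hand. The copyRespectingBackpressure helper and its parameters are hypothetical names introduced only for this illustration.

```typescript
import * as fs from 'fs';

// Hypothetical helper: copies a file while reacting to the return value of write
function copyRespectingBackpressure(
  source: string,
  destination: string,
  done: () => void
) {
  const readable = fs.createReadStream(source);
  const writable = fs.createWriteStream(destination);

  readable.on('data', chunk => {
    // write returns false once the internal buffer is full
    if (!writable.write(chunk)) {
      readable.pause();
      // 'drain' signals that it is fine to write again
      writable.once('drain', () => readable.resume());
    }
  });

  // The manual approach also has to end the writable stream itself
  readable.on('end', () => writable.end());
  writable.on('finish', done);
}
```

This is roughly the kind of bookkeeping that we would otherwise have to repeat in every manual copy.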
Pipes
The pipe function is available on readable streams. When given a writable stream, it attaches it to the readable stream and pushes the data into it.
import * as fs from 'fs';

const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');

readable.pipe(writable);
That simple!
By default, when all the data has been transmitted and the readable stream emits the 'end' event, the writable stream is closed with the writable.end function.
import * as fs from 'fs';

const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');

writable.on('finish', () => {
  console.log('The end!');
});

readable.pipe(writable);
The end!
This behavior can be changed with the { end: false } option.
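For instance, keeping the writable stream open makes it possible to pipe several readable streams into it one after another. The sketch below assumes a hypothetical concatFiles helper; its name and parameters are made up for this illustration.

```typescript
import * as fs from 'fs';

// Hypothetical helper: writes the contents of two files into one destination file
function concatFiles(
  firstPath: string,
  secondPath: string,
  destinationPath: string,
  done: () => void
) {
  const writable = fs.createWriteStream(destinationPath);
  writable.on('finish', done);

  const first = fs.createReadStream(firstPath);

  // Keep the writable stream open when the first readable stream ends
  first.pipe(writable, { end: false });

  first.on('end', () => {
    // The second pipe uses the default behavior and ends the writable stream
    fs.createReadStream(secondPath).pipe(writable);
  });
}
```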
One note here is that if any error occurs during piping, the writable is not closed automatically, so it might be necessary to track it and close it manually.
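A minimal sketch of such manual tracking could look as follows, assuming we want to destroy the other stream whenever one side fails. The copyWithCleanup helper and its done callback are hypothetical names used only for illustration.

```typescript
import * as fs from 'fs';

// Hypothetical helper: pipes one file into another and cleans up on errors
function copyWithCleanup(
  source: string,
  destination: string,
  done: (error?: Error) => void
) {
  const readable = fs.createReadStream(source);
  const writable = fs.createWriteStream(destination);
  let finished = false;

  // Make sure the callback is called only once
  const finish = (error?: Error) => {
    if (finished) {
      return;
    }
    finished = true;
    done(error);
  };

  readable.on('error', error => {
    // pipe does not close the destination when the source fails
    writable.destroy();
    finish(error);
  });

  writable.on('error', error => {
    readable.destroy();
    finish(error);
  });

  writable.on('finish', () => finish());

  readable.pipe(writable);
}
```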
Writable stream under the hood
fs.createWriteStream is not the only way of making a writable stream. We can create our own writable stream to understand it better.
Every writable stream has to implement a _write method, which is called indirectly when we write data to the stream.
import { Writable } from 'stream';

const writable = new Writable();

writable._write = function(chunk, encoding, next) {
  console.log(chunk.toString());
  next();
};

writable.write('Hello world!');
Hello world!
In our simple example, every time we write to the stream, the string is logged to the console. The encoding is a string that might contain the encoding of our data. Calling the next function indicates that the chunk of data is flushed, meaning we finished handling it.
The _write method can also be provided by passing it as the write option to the Writable constructor, or by extending the Writable class.
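As a minimal sketch of the constructor variant: the option is named write, without the underscore. The received array and the collector stream are hypothetical names used only for this illustration.

```typescript
import { Writable } from 'stream';

// Hypothetical array that collects everything written to the stream
const received: string[] = [];

const collector = new Writable({
  // The write option plays the role of the _write method
  write(chunk, encoding, next) {
    received.push(chunk.toString());
    next();
  }
});

collector.write('Hello ');
collector.end('world!');
```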
Having all this knowledge, let’s implement a simplified version of a stream that writes data to a file.
import * as fs from 'fs';
import * as util from 'util';
import { Writable } from 'stream';

const writeFile = util.promisify(fs.writeFile);

class WritableFileStream extends Writable {
  path: string;

  constructor(path: string) {
    super();
    this.path = path;
  }

  _write(chunk: any, encoding: string, next: (error?: Error) => void) {
    // The 'a' flag appends each chunk; without it, writeFile would overwrite the file
    writeFile(this.path, chunk, { flag: 'a' })
      .then(() => next())
      .catch((error) => next(error));
  }
}

const readable = fs.createReadStream('./file1.txt');
const writable = new WritableFileStream('./file2.txt');

readable.pipe(writable);
In the above example, every time we write to our WritableFileStream, we add the data at the end of a file.
Process streams
In the first part of the series, we mentioned the global process object. Aside from properties like process.argv and process.execPath, it contains streams that our application can use.

process.stdin

process.stdin is a readable stream that gathers the data coming into our process. Using it, we can listen for data typed in the terminal. As we mentioned in the previous part of the series, readable streams have modes, and the stdin stream is in paused mode by default. To switch it to flowing mode and make the application listen for input, we need to resume it. This happens under the hood when we attach a 'data' event listener.
let a;
let b;

process.stdin.on('data', (data) => {
  if (a === undefined) {
    a = Number(data.toString());
  } else if (b === undefined) {
    b = Number(data.toString());
    console.log(`${a} + ${b} = ${a + b}`);
  }
});
In the example above, we expect two numbers from the terminal and add them together.
When you run this, the process does not exit after receiving two numbers, because the process.stdin stream is still flowing. To fix it, we need to pause it.
let a;
let b;

process.stdin.on('data', (data) => {
  if (a === undefined) {
    a = Number(data.toString());
  } else if (b === undefined) {
    b = Number(data.toString());
    console.log(`${a} + ${b} = ${a + b}`);
    process.stdin.pause();
  }
});

process.stdout and process.stderr

process.stdout and process.stderr are writable streams. console.log() and console.error() use them internally, and writing to them results in text appearing in the console. We can easily make use of that and, for example, print the contents of a file:
import * as fs from 'fs';

const readable = fs.createReadStream('./file1.txt');

readable.pipe(process.stdout);
These streams differ from other Node.js streams in terms of asynchronicity: depending on what they are connected to, writes to them may be synchronous. For more details, check out the documentation.
Summary
In this article, we covered writable streams: how to handle files with them and how to combine them with readable streams using pipes. We also implemented our own writable stream for handling files, which involved writing the _write function. Finally, we learned how to pass additional data to our process through the process.stdin stream and what the process.stdout and process.stderr streams are. This knowledge, combined with readable streams, gives quite a bit of insight into the topic of streams, but there is still more to explain. Stay tuned!
The post Node.js TypeScript #5. Writable streams, pipes, and the process streams appeared first on Marcin Wanago Blog - JavaScript, both frontend and backend.