Node.js Readable Streams

Readable streams are one of those things that are somehow both wildly common and a complete mystery to so many. If you've been copy-pasta-ing your way through them, this is for you.

Two Modes, Three States

A readable stream is simply an abstraction of a data source that is consumed iteratively. The data source could be anything from an HTTP request to a single file. Typically, we use a plumbing analogy when thinking about them: they can be piped, their flow can be turned off and on, and, if poorly implemented, they can turn into a nasty mess 💩.

Streams operate in two modes: paused or flowing. Once a stream is turned on, you interface with it through the EventEmitter API it inherits. The important thing to know here is that data is not generated until there is some mechanism to handle it. For this reason, the readableFlowing property has 3 states: null (nothing is consuming the stream yet), false (paused), and true (flowing).

Let's take the following example:

import { Readable } from "node:stream";

const stream = new Readable({
  read(size) {
    // yada yada yada
  },
});

// no piping or event listeners have been defined yet, so the stream is inactive
console.log(stream.readableFlowing); // null

// assigning a handler gets the stream going
stream.on("data", (chunk) => { /* handle the chunk */ });
console.log(stream.readableFlowing); // true

// pausing the stream stops the flow (removing the handler alone does not)
stream.pause();
console.log(stream.readableFlowing); // false
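
To watch those states flip without a real data source, here's a minimal sketch. The no-op read() and the onData handler are placeholders of mine and don't produce or consume anything. Note that dropping the data handler leaves the stream flowing; it takes an explicit pause() to stop it.

```javascript
import { Readable } from "node:stream";

// Placeholder stream: read() does nothing, so no data is ever produced
const stream = new Readable({ read() {} });
const onData = () => {};

const initial = stream.readableFlowing; // null: nothing is consuming yet

stream.on("data", onData);
const afterOn = stream.readableFlowing; // true: a data handler starts the flow

stream.off("data", onData);
const afterOff = stream.readableFlowing; // true: removing the handler does not pause

stream.pause();
const afterPause = stream.readableFlowing; // false

stream.resume();
const afterResume = stream.readableFlowing; // true

console.log(initial, afterOn, afterOff, afterPause, afterResume);
```

A flowing stream with no data handler simply drops whatever it reads, so pause first if you plan to come back for the data.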

Alternatively, the readable event signals that data is available, but keeps the stream paused. In this case, the stream has to be read manually with read():

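import { createReadStream } from "node:fs";
import path from "node:path";
import { fileURLToPath } from "node:url";

// __dirname does not exist in ES modules, so derive it from import.meta.url
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const bigOlFile = path.resolve(__dirname, "big-ol-file.txt");
const stream = createReadStream(bigOlFile, { encoding: "utf8" });

stream.on("readable", () => {
  // read() returns null once the internal buffer is empty
  const chunk = stream.read();
  console.log(chunk);
});

console.log(stream.readableFlowing); // false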
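
Since read() hands back null when the buffer runs dry, a common way to consume a paused stream is to keep reading inside the readable handler until that happens. Here's a rough sketch of that loop. The readAllPaused helper is a name I made up for illustration, and Readable.from stands in for a real file so the snippet runs on its own.

```javascript
import { Readable } from "node:stream";

// Hypothetical helper: drains a stream in paused mode and resolves with every chunk
function readAllPaused(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on("readable", () => {
      let chunk;
      // Keep pulling until the internal buffer is empty
      while ((chunk = stream.read()) !== null) {
        chunks.push(chunk);
      }
    });
    stream.on("end", () => resolve(chunks));
    stream.on("error", reject);
  });
}

const chunks = await readAllPaused(Readable.from(["a", "b", "c"]));
console.log(chunks.join("")); // abc
```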

Async and the Internal Buffer

So far, we've really just been focused on the client, which interfaces with the stream, but there's a whole internal gizmo to be wary of. Just because the client is turning off the tap doesn't mean the data source is going to call it quits. This is called out in the Node docs:

While readable.readableFlowing is false, data may be accumulating within the stream's internal buffer.
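
You can see the buffer fill up with a small experiment. This is only a sketch: the stream below is a toy of my own with a no-op read(), and I'm pushing data into it by hand to play the part of a source that doesn't care whether anyone is listening.

```javascript
import { Readable } from "node:stream";
import { once } from "node:events";

// Push-style source: read() is a no-op and the data shows up from "outside"
const stream = new Readable({ read() {} });

stream.pause();
stream.push("abc");
stream.push("de");
stream.push(null); // no more data

console.log(stream.readableFlowing); // false
const bufferedBytes = stream.readableLength;
console.log(bufferedBytes); // 5

// Turn the tap back on and drain whatever piled up
const received = [];
stream.on("data", (chunk) => received.push(chunk.toString()));
stream.resume();
await once(stream, "end");
console.log(received.join("")); // abcde
```

All five bytes sat in the internal buffer the whole time the stream was paused.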

This becomes especially apparent when you're handling data asynchronously:

import { createReadStream } from "node:fs";

const stream = createReadStream("path/to/file.txt", { encoding: "utf8" });

console.log("this logs first!");

stream.on("data", async (chunk) => {
  // classic "wait"
  await new Promise((resolve) => setTimeout(resolve, 1000));
  console.log("this logs last!");
});

stream.on("end", () => console.log("this logs second!"));

Huh? So here's the breakdown: though we're waiting for some data-processing functionality to resolve, the stream is still trucking along. This is classic JavaScript. The stream emits data events, and the client responds by scheduling some async work on the event loop. The stream never waits for that work, because an event emitter ignores the promise a handler returns. Presuming the stream takes less than a second to get through the file, it will emit an end event before the async code resolves.
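
If you'd rather check that ordering without a file on disk, here's a small sketch. Readable.from and the 50 ms sleep are my stand-ins for the file stream and the slow processing, and the order array just records who finished when.

```javascript
import { Readable } from "node:stream";
import { once } from "node:events";
import { setTimeout as sleep } from "node:timers/promises";

const order = [];
const stream = Readable.from(["a", "b", "c"]);

order.push("first");

stream.on("data", async (chunk) => {
  await sleep(50); // stand-in for slow processing
  order.push(`handled ${chunk}`);
});

// The stream finishes long before any handler does
await once(stream, "end");
order.push("end");

await sleep(100); // give the pending handlers time to finish
console.log(order.join(" -> "));
// first -> end -> handled a -> handled b -> handled c
```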

So WTF are we to do? You have two options: save the data inside some massive variable and process it once the stream is complete... which seems more like a lake and less like a stream. Or, use an async iterator, which doesn't hand over the next chunk until the loop body has finished:

import { createReadStream } from "node:fs";

const stream = createReadStream("path/to/file.txt", { encoding: "utf8" });

for await (const chunk of stream) {
  console.log("still going!");
}

console.log("the end!");
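
To see the difference once slow work is involved, here's the same kind of experiment with an awaited delay inside the loop. As before, this is just a sketch: Readable.from and the 50 ms sleep are stand-ins of mine for the file stream and the real processing.

```javascript
import { Readable } from "node:stream";
import { setTimeout as sleep } from "node:timers/promises";

const order = [];
const stream = Readable.from(["a", "b", "c"]);

for await (const chunk of stream) {
  await sleep(50); // stand-in for slow processing
  order.push(`handled ${chunk}`);
}

// Only reached after every chunk has been fully handled
order.push("end");
console.log(order.join(" -> "));
// handled a -> handled b -> handled c -> end
```

This time the end really is the end.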

Conclusion

That's pretty much the extent of it. The next step would be diving into pipes, but we'll save that for another day.

If you're hungry for more and want to see it in action, then pull down this interactive demo and let it rip.