Node Readable Stream
[ View accompanying code on GitHub ]
Readable streams are one of those things that are somehow both wildly prolific and a complete mystery to so many. If you’ve been copy-pasta-ing your way through them, this is for you.
Two Modes, Three States
A readable stream is simply an abstraction of a data source that is consumed iteratively. The data source could be anything from an HTTP request to a single file. Typically, we use a plumbing analogy when thinking about them: they can be piped, their flow can be turned off and on, and, if poorly implemented, can turn into a nasty mess 💩.
Streams operate in two modes: paused or flowing. Once turned on, you interface with the stream through the EventEmitter class. The important thing to know here is that data is not generated until there is some mechanism in place to handle it (i.e. it’s flowing). For this reason, there are three states: paused, flowing, and null.
Let’s take the following example:
import { Readable } from "node:stream";

const stream = new Readable({
  read(size) {
    // yada yada yada
  },
});
// no piping or event listeners have been defined yet, so the stream is inactive
console.log(stream.readableFlowing); // null

// assigning a handler gets the stream going
stream.on("data", (chunk) => { /* ... */ });
console.log(stream.readableFlowing); // true

// pausing the stream switches it back out of flowing mode
stream.pause();
console.log(stream.readableFlowing); // false
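Flipping the tap back on is just as easy. Here's a minimal sketch, picking up from the paused stream above, using resume() and isPaused() from the standard Readable API:

console.log(stream.isPaused()); // true

// resume() switches the stream back into flowing mode
stream.resume();
console.log(stream.readableFlowing); // true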
Alternatively, the readable event signals that data is available but keeps the stream paused. In this case, the stream has to be read manually:
import { createReadStream } from "node:fs";
import path from "node:path";
import { fileURLToPath } from "node:url";

// __dirname isn't available in ES modules, so derive it from import.meta.url
const __dirname = path.dirname(fileURLToPath(import.meta.url));

const bigOlFile = path.resolve(__dirname, "big-ol-file.txt");
const stream = createReadStream(bigOlFile, { encoding: "utf8" });

stream.on("readable", () => {
  const chunk = stream.read();
  console.log(chunk);
});

console.log(stream.readableFlowing); // false
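One gotcha with the readable event: stream.read() hands back whatever is sitting in the internal buffer and returns null once it's drained. The Node docs recommend looping until you hit that null. Here's a sketch of that pattern against the same hypothetical file:

import { createReadStream } from "node:fs";

const stream = createReadStream("big-ol-file.txt", { encoding: "utf8" });

stream.on("readable", () => {
  let chunk;
  // keep calling read() until the internal buffer is empty
  while ((chunk = stream.read()) !== null) {
    console.log(`read ${chunk.length} characters`);
  }
});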
Async and the Internal Buffer
So far, we’ve really just been focused on the client, which interfaces with the stream, but there’s a whole internal gizmo to be wary of. Just because the client turns off the tap doesn’t mean the data source is going to call it quits. This is called out in the Node docs:
While readable.readableFlowing is false, data may be accumulating within the stream’s internal buffer.
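You can actually watch this happen. Every Readable exposes readableLength (how much data is currently sitting in the internal buffer) and readableHighWaterMark (the threshold it tries to stay under). A rough sketch, again assuming our hypothetical big-ol-file.txt:

import { createReadStream } from "node:fs";

const stream = createReadStream("big-ol-file.txt");

// attaching a 'readable' listener starts filling the buffer,
// but the stream stays paused (readableFlowing === false)
stream.once("readable", () => {
  console.log(stream.readableLength); // > 0: bytes waiting in the buffer
  console.log(stream.readableHighWaterMark); // the buffering threshold (64 KiB by default for fs streams)
});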
This becomes especially apparent when you're handling data asynchronously:
import { createReadStream } from "node:fs";

const stream = createReadStream("path/to/file.txt", { encoding: "utf8" });

console.log("this logs first!");

stream.on("data", async (chunk) => {
  // classic "wait"
  await new Promise((resolve) => setTimeout(resolve, 1000));
  console.log("this logs last!");
});

stream.on("end", () => console.log("this logs second!"));
Huh? So here’s the breakdown: though we’re waiting for some data-processing functionality to resolve, the internal buffer is still trucking along. This is classic JavaScript. The stream is emitting data events, and the client responds by scheduling some async work on the event loop. Presuming the stream takes less than a second to process, it will emit an end event before the async code resolves.
So WTF are we to do? You have two options: save the data inside some massive variable and resolve it once the stream is complete… which seems more like a lake and less like a stream. Or, use an async iterator:
import { createReadStream } from "node:fs";

const stream = createReadStream("path/to/file.txt", { encoding: "utf8" });

for await (const chunk of stream) {
  console.log("still going!");
}

console.log("the end!");
Conclusion
That’s pretty much the extent of it. The next step would be diving into pipes, but we’ll save that for another day.
If you’re hungry for more and want to see it in action, then pull down this interactive demo and let it rip.