Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.
Ben Nadel at CFUNITED 2009 (Lansdowne, VA) with: Randy Brown

You Have To Explicitly End Streams After Pipes Break In Node.js

By Ben Nadel on

I've spent the last few months learning about Node.js in an effort to figure out why my Gulp.js script doesn't work (I still don't know). This has become quite the Herculean task, winding my way down the rabbit hole of Node.js streams. This morning, I wanted to look at one interesting behavior - the fact that you have to explicitly end your streams after your pipes break.

Last month, I started to look at how error events affect Node.js pipes. I demonstrated that error events don't stop streams, and they only affect the piped-status of connected streams. As such, it shouldn't be surprising that you have to explicitly end streams after pipes break; but, this point has been clouded in my mind thanks to the enormous learning curve of Node.js streams, and a number of misleading examples.
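
That unpipe-but-don't-end behavior can be sketched in isolation. The following is a minimal illustration, not code from the earlier post: the `source` and `destination` streams are hypothetical stand-ins, and the error is emitted by hand to keep the example synchronous. One assumption worth flagging: on newer Node.js releases, a stream whose own `_write()` or `_transform()` reports an error is also destroyed by default (the `autoDestroy` option), so this sketch only shows what the pipe itself does in response to an error event.

```javascript
const { PassThrough, Writable } = require( "stream" );

// Hypothetical stand-ins for two connected streams.
const source = new PassThrough( { objectMode: true } );
const destination = new Writable({
    objectMode: true,
    write( chunk, encoding, done ) {
        done();
    }
});

// Record the order in which the destination reports events.
const events = [];
destination.on( "unpipe", () => events.push( "unpipe" ) );
destination.on( "error", ( error ) => events.push( "error: " + error.message ) );

source.pipe( destination );

// Simulate a failure by emitting an error on the destination by hand.
destination.emit( "error", new Error( "Something broke" ) );

// The pipe reacted by disconnecting the source...
console.log( events ); // [ 'unpipe', 'error: Something broke' ]

// ... but nothing ended or destroyed the destination itself.
console.log( destination.writableEnded ); // false
console.log( destination.destroyed ); // false
```

The only thing the pipe did was disconnect the source; the destination is left open until something explicitly ends it.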

As I've been trying to debug my Gulp.js streams, I've come across a number of examples that look like this:

  • someStream.on( "error", function( error ) {
  •  
  • this.emit( "end" );
  •  
  • });

At first glance, this looks kind of right. But, the problem is that the approach doesn't fulfill the intent. The intent is to tell other interested parties (e.g., other streams) that the current stream has ended. But, the reality is, the stream hasn't ended - you've just told people it has. The stream will only end when you call .end() on it.

In a pipe context, this miscommunication becomes even murkier because the wrong approach almost looks like it works. But, it will fail in interesting ways. To see this in action, check out the video above.
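
The gap between announcing an end and actually ending can also be sketched in a few lines. This is a minimal illustration under assumptions: `upstream` and `downstream` are hypothetical stand-ins for a Transform stream and its destination, and `writableEnded` is a property available on newer Node.js releases (12.9 and later).

```javascript
const { PassThrough, Writable } = require( "stream" );

// Hypothetical stand-ins for a Transform stream and its destination.
const upstream = new PassThrough( { objectMode: true } );
const downstream = new Writable({
    objectMode: true,
    write( chunk, encoding, done ) {
        done();
    }
});

upstream.pipe( downstream );

// Announce an "end" without actually ending anything.
upstream.emit( "end" );

// The pipe heard the announcement and ended the destination...
console.log( downstream.writableEnded ); // true

// ... but the stream that made the announcement is still open for writes.
const upstreamEndedAfterEmit = upstream.writableEnded;
console.log( upstreamEndedAfterEmit ); // false

// Calling .end() is what actually closes the writable side.
upstream.end();
console.log( upstream.writableEnded ); // true
```

The destination has been closed on the word of a stream that is still accepting writes, which is where the "almost works" behavior comes from.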

Now, it doesn't seem that there is any rule of thumb that says you should end a stream just because an error occurred. After all, there's no reason that a stream (aka, an EventEmitter) can't emit more than one error. As such, the appropriate reaction to an error must depend on your particular context. That said, if an error causes streams to be unpiped, you'll probably want to clean that up.

To explore this concept, I've put together a small demo that pipes three streams together:

  • Readable - FriendStream
  • Transform - ComplimentStream
  • Writable - JournalStream

I used these three types of Node.js streams because they mirror the way Gulp.js works with source streams, Transform streams, and destination streams. In my demo, the compliment stream emits an error halfway through the transformations. This causes an unpipe event which has to be dealt with:

  • // Require module references.
  • var stream = require( "stream" );
  • var util = require( "util" );
  • var chalk = require( "chalk" );
  •  
  •  
  • // ---------------------------------------------------------- //
  • // ---------------------------------------------------------- //
  •  
  •  
  • // I am a SOURCE stream, providing a stream of Friends, in object mode.
  • function FriendStream() {
  •  
  • stream.Readable.call(
  • this,
  • {
  • objectMode: true
  • }
  • );
  •  
  • this._friends = [ "Kim", "Sarah", "Kit", "Tricia", "Libby", "Joanna" ];
  •  
  • }
  •  
  • util.inherits( FriendStream, stream.Readable );
  •  
  •  
  • // I read data out of the underlying source and push it onto the underlying buffer.
  • FriendStream.prototype._read = function( size ) {
  •  
  • // While we still have Friends, and the buffer is not full, keep pushing friends.
  • while ( this._friends.length && size-- ) {
  •  
  • // NOTE: push() returns false (not null) once the internal buffer is full.
  • if ( this.push( this._friends.shift() ) === false ) {
  •  
  • break;
  •  
  • }
  •  
  • }
  •  
  • // If we have no more friends, end the stream.
  • if ( ! this._friends.length ) {
  •  
  • this.push( null );
  •  
  • }
  •  
  • };
  •  
  •  
  • // ---------------------------------------------------------- //
  • // ---------------------------------------------------------- //
  •  
  •  
  • // I am a TRANSFORM stream, decorating each friend with a compliment, in object mode.
  • function ComplimentStream() {
  •  
  • stream.Transform.call(
  • this,
  • {
  • objectMode: true
  • }
  • );
  •  
  • }
  •  
  • util.inherits( ComplimentStream, stream.Transform );
  •  
  •  
  • // I transform the input chunk to the output chunk.
  • ComplimentStream.prototype._transform = function( friend, encoding, getNextChunk ) {
  •  
  • // Issuing an error for the exploration.
  • if ( friend === "Kit" ) {
  •  
  • return( getNextChunk( new Error( "No Kits allowed!" ) ) );
  •  
  • }
  •  
  • this.push( friend + ", you are awesome!" );
  •  
  • getNextChunk();
  •  
  • };
  •  
  •  
  • // ---------------------------------------------------------- //
  • // ---------------------------------------------------------- //
  •  
  •  
  • // I am a DESTINATION stream, keeping track of journal line items, in object mode.
  • function JournalStream() {
  •  
  • stream.Writable.call(
  • this,
  • {
  • objectMode: true
  • }
  • );
  •  
  • this._entries = [];
  •  
  • }
  •  
  • util.inherits( JournalStream, stream.Writable );
  •  
  •  
  • // I write the given entry to the internal journal.
  • JournalStream.prototype._write = function( entry, encoding, done ) {
  •  
  • this._entries.push( entry );
  •  
  • done();
  •  
  • };
  •  
  •  
  • // ---------------------------------------------------------- //
  • // ---------------------------------------------------------- //
  •  
  •  
  • // Create a new instance of our compliment stream (ie, our TRANSFORM stream). This acts
  • // as both a Writable and a Readable stream.
  • var complimentStream = new ComplimentStream()
  • .on(
  • "unpipe",
  • function handleUnpipeEvent( source ) {
  •  
  • console.log( chalk.bgYellow( "FriendStream unpiped from ComplimentStream." ) );
  •  
  • }
  • )
  • .on(
  • "error",
  • function handleErrorEvent( error ) {
  •  
  • console.log( chalk.red( "Compliment error:", error ) );
  •  
  • // When the compliment stream raises an error, the FriendStream is
  • // automatically going to unpipe itself from this [ComplimentStream] stream.
  • // That's all that Node.js does in the event of an error in a pipe-context.
  • // The stream itself is still left open. But, since we know that no more
  • // friends are going to be written, we have to explicitly END the Writable
  • // aspect of the Transform stream.
  • // --
  • // NOTE: Sometimes you see people "emit" an "end" event here. That is the
  • // wrong approach as it signals the end of the stream _without_ actually
  • // ending it, which is a poor implementation of intent.
  • this.end();
  •  
  • }
  • )
  • ;
  •  
  • // Create our streams and pipe them : FRIENDS -> COMPLIMENT -> JOURNAL.
  • var journalStream = new FriendStream()
  • .pipe( complimentStream )
  • .pipe( new JournalStream() )
  • ;
  •  
  • // When the DESTINATION stream is finished, log the state of the journal entries.
  • journalStream.on(
  • "finish",
  • function handleFinishEvent() {
  •  
  • console.log( chalk.cyan( "Stream finished." ) );
  • console.dir( this._entries );
  •  
  • }
  • );

As you can see, the ComplimentStream will emit an error if one of the inputs is "Kit." And, when we run the above Node.js code, we get the following terminal output:

node test.js
FriendStream unpiped from ComplimentStream.
Compliment error: Error: No Kits allowed!
Stream finished.
[ 'Kim, you are awesome!', 'Sarah, you are awesome!' ]

Now, if you had tried to emit("end") instead of calling end(), you would have gotten a very different result; and, one that is not necessarily consistent depending on settings.

For you Node.js developers, this is probably a "yeah, duh" moment - of course you have to end streams. But, for me, building a solid and consistent mental model for Node.js streams has been an uphill battle. Every time I think I have a decent handle on it, I find myself making mistakes and uncovering false assumptions.
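
For comparison, newer Node.js releases ship `stream.pipeline()`, which takes over the cleanup that the demo above performs by hand: when any stream in the chain fails, it destroys the streams and reports the error to a single callback. The sketch below is a rough equivalent of the demo, not a drop-in replacement. It assumes Node.js 12.3 or later (for `Readable.from()`), and the `outcome` object and the shortened friend list are made up for illustration.

```javascript
const { Readable, Transform, Writable, pipeline } = require( "stream" );

// Hypothetical holder for what the pipeline reports, so it can be inspected later.
const outcome = { error: null, entries: [] };

// SOURCE stream: Readable.from() yields the array items in object mode.
const friends = Readable.from( [ "Kim", "Sarah", "Kit", "Tricia" ] );

// TRANSFORM stream: fails on "Kit", just like the demo above.
const compliments = new Transform({
    objectMode: true,
    transform( friend, encoding, next ) {
        if ( friend === "Kit" ) {
            return next( new Error( "No Kits allowed!" ) );
        }
        next( null, friend + ", you are awesome!" );
    }
});

// DESTINATION stream: collects the journal entries.
const journal = new Writable({
    objectMode: true,
    write( entry, encoding, done ) {
        outcome.entries.push( entry );
        done();
    }
});

// pipeline() wires the streams together and invokes the callback exactly once,
// either after the journal finishes or after the first error.
pipeline( friends, compliments, journal, function handleDone( error ) {
    outcome.error = error || null;
    console.log( "Pipeline done:", error ? error.message : "no error" );
    console.log( outcome.entries );
});
```

Note that `pipeline()` tears the chain down rather than letting the journal finish normally, so it suits cases where an error should abort the whole flow.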




Reader Comments

Why not both? Emit an "end" and then .end() it. As Bill Cosby once quipped: "First you say it, then you do it."


@JC,

I like the quote :) I think you won't need end. If you call .end(), the "end" event should be emitted automatically.


This post is rather old, but by now I'm sure that you know that it's not you, it's them. The reason why it's hard to build a solid and consistent mental model of Node streams is that there isn't one. They are simply incredibly quirky and inconsistent, and only a few people have a complete mental picture of all the weirdness.

I'm betting it doesn't have to be that way, though. I started collecting a list of all the issues here: https://github.com/spion/promise-streams/issues/8 and I'm hoping that once we have all of them collected, a proper redesign that works much better will emerge. I'll try to add all your observations from this article and any others you may have - thanks for that!


What would you recommend to do if you want the stream to continue after unpiping? I would just want the output to include "Tricia", "Libby" and "Joanna" as well, so a simple continue if you would call it that.

Was looking into domains as a solution but these are deprecated right now so that does not seem like a great idea...

More on-topic: thanks for this great post, it cleared up some things for me!


@Ben,

Regarding this.end and this.emit('end'), I have experienced this.end() not always triggering the end event. I did both and everything seemed to work. Thanks for your posts; the general docs for error handling in streams are woeful.


@Ryan

If you have a transform stream and you call .end(), then it should emit the finish event once it has piped all its data.

If the stream you were piping to has died, then it may be stuck with data in its buffer after calling .end(), and therefore it does not emit the 'finish' event. A workaround for transform streams I have used in the past is:

const PassThrough = require('stream').PassThrough
...
this.end();
this.pipe(new PassThrough());

Pretty ugly, but it seems to work OK when emitting an 'error' event is unsuitable.


Hi Ben,

Quite some time since, but how about all those examples that use `process.exit(1)` in the onError handler. Does it do the same thing as `this.end()` ?

I'm having a case where I pipe browserify output into uglifyjs (on cmd) and the command exits with 0 even if there is an error in the browserify part...
