Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.
Ben Nadel at dev.Objective() 2015 (Bloomington, MN) with: Ryan Vikander

How Error Events Affect Piped Streams In Node.js

By Ben Nadel on

Over the weekend, I demonstrated that "error" events don't have any inherent effect on how individual Streams work in Node.js. In that post, I stressed that I was talking about "individual" streams because multi-stream workflows that use .pipe() are somewhat affected by "error" events. The "error" still doesn't affect the individual streams; but, Node.js will unpipe the streams depending on the source of the error.

If you .pipe() one stream into another, error events emitted from the source stream have no bearing on the workflow (unless handled explicitly by the developer). The only error events that have any effect are those emitted by the target / destination stream. If the target stream emits an error, the source stream will disconnect from it (i.e., .unpipe() itself from the target stream).
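A minimal sketch of that rule, under a few assumptions: two PassThrough streams stand in for a real source and target, and the errors are emitted by hand (as this post does throughout) rather than raised by real I/O. The counter and handler names are illustrative only.

```javascript
// Sketch only: PassThrough streams stand in for a real source and target.
const { PassThrough } = require( "stream" );

const source = new PassThrough();
const target = new PassThrough();

// Illustrative counter: how many times the target has been unpiped.
let unpipeCount = 0;

// Attach "error" handlers so the manually emitted errors are not thrown as
// uncaught exceptions.
source.on( "error", function handleSourceError( error ) {
    console.log( "Source error:", error.message );
} );
target.on( "error", function handleTargetError( error ) {
    console.log( "Target error:", error.message );
} );
target.on( "unpipe", function handleTargetUnpipe() {
    unpipeCount++;
} );

source.pipe( target );

// An error emitted by the SOURCE leaves the pipe connection alone.
source.emit( "error", new Error( "SourceError" ) );
const unpipesAfterSourceError = unpipeCount;

// An error emitted by the TARGET causes the source to unpipe itself.
target.emit( "error", new Error( "TargetError" ) );
const unpipesAfterTargetError = unpipeCount;

console.log( "Unpipes after source error:", unpipesAfterSourceError );
console.log( "Unpipes after target error:", unpipesAfterTargetError );
```

Because emitting an event calls its listeners synchronously, the counter can be inspected immediately after each emit(). Note that this only mirrors the manual-emit approach; in current Node.js releases, an error reported through destroy() or a write callback also destroys the stream that reported it.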

That said, as we've seen before, other than the unpipe response, the error event has no bearing on either stream. This means that after the streams are disconnected, they continue to function normally on their own. To see this in action, I'm going to take one source stream and pipe it into two different target streams. One of the target streams will emit an error event which will cause an .unpipe(); but, the code will demonstrate that all three streams (source + 2 targets) still work like healthy streams.

    // Include module references.
    // NOTE: require() needs a CommonJS build of chalk (v4 or earlier).
    var stream = require( "stream" );
    var util = require( "util" );
    var chalk = require( "chalk" );


    // ---------------------------------------------------------- //
    // ---------------------------------------------------------- //


    // I am a readable stream in object-mode.
    function Source() {

        stream.Readable.call(
            this,
            {
                objectMode: true
            }
        );

        this._source = [ "What", "it", "be", "like?" ];

    }

    util.inherits( Source, stream.Readable );

    Source.prototype._read = function( sizeIsIgnored ) {

        // Emit an error every time we're asked to read data from the underlying source.
        // --
        // NOTE: You would never want to do this - I am only doing this to
        // demonstrate the interplay between Readable streams and error events.
        this.emit( "error", new Error( "StreamError" ) );

        // Push data until the internal buffer signals back-pressure.
        while ( this._source.length ) {

            if ( ! this.push( this._source.shift() ) ) {

                break;

            }

        }

        // Once the underlying source is exhausted, signal the end of the stream.
        if ( ! this._source.length ) {

            this.push( null );

        }

    };


    // ---------------------------------------------------------- //
    // ---------------------------------------------------------- //


    // I am a writable stream in object-mode that may or may not emit errors (based on
    // the instantiation arguments).
    function Target( doEmitError ) {

        stream.Writable.call(
            this,
            {
                objectMode: true
            }
        );

        this._emitError = ( doEmitError === true );

        this._buffer = "";

        // When the stream finishes, expose the aggregated buffer for debugging.
        this.on(
            "finish",
            function handleFinish() {

                this.emit( "debug", this._buffer );

            }
        );

    }

    util.inherits( Target, stream.Writable );

    Target.prototype._write = function( chunk, encoding, writeDone ) {

        this._buffer += ( chunk + " " );

        // Emit an error every time we go to write data into the running buffer.
        // --
        // NOTE: You would never want to do this - I am only doing this to
        // demonstrate the interplay between Writable streams and error events.
        if ( this._emitError ) {

            this.emit( "error", new Error( "StreamError (" + chunk + ")" ) );

        }

        writeDone();

    };


    // ---------------------------------------------------------- //
    // ---------------------------------------------------------- //


    // Create an instance of our readable stream and two writable targets.
    var source = new Source();
    var unsafeTarget = new Target( true );
    var safeTarget = new Target( false );

    // Debug errors on the source.
    source.on(
        "error",
        function handleSourceError( error ) {

            console.log( chalk.magenta( "Source error:", error.message ) );

        }
    );

    // Debug errors on the target.
    // --
    // NOTE: We're only doing this for one of the targets since we know that the
    // "safeTarget" will not emit any errors in this demo.
    unsafeTarget.on(
        "error",
        function handleTargetError( error ) {

            console.log( chalk.cyan( "Target error:", error.message ) );

        }
    );

    // When the target emits the error, the source is going to disconnect itself from
    // the destination.
    unsafeTarget.on(
        "unpipe",
        function handleTargetUnpipe( stream ) {

            console.log( chalk.yellow( "Unpiped source:", ( stream === source ) ) );

            // At this point, the two streams have been disconnected. BUT, the two streams
            // should continue to function 100% correctly. The errors have done nothing but
            // interrupted the pipe-connection. As such, we can still write to the target.
            this.write( "Written after pipe-break." );
            this.end( "Ended." );

        }
    );

    // Debug the state of the buffer when the UNSAFE target ends.
    unsafeTarget.on(
        "debug",
        function handleUnsafeTargetDebug( buffer ) {

            console.log( chalk.green( "Unsafe Target Buffer:", buffer ) );

        }
    );

    // Debug the state of the buffer when the SAFE target ends.
    safeTarget.on(
        "debug",
        function handleSafeTargetDebug( buffer ) {

            console.log( chalk.green( "Safe Target Buffer:", buffer ) );

        }
    );


    // ---------------------------------------------------------- //
    // ---------------------------------------------------------- //


    // Pipe the source into both the unsafe and safe targets.
    source.pipe( unsafeTarget );
    source.pipe( safeTarget );

Since the code is all event-driven, it's a little hard to follow - I suggest you watch the video. But, when we run the above code, we get the following terminal output:

bens-imac:pipe ben$ node test.js
Source error: StreamError
Unpiped source: true
Target error: StreamError (What)
Target error: StreamError (Written after pipe-break.)
Target error: StreamError (Ended.)
Unsafe Target Buffer: What Written after pipe-break. Ended.
Safe Target Buffer: it be like?

Let's try to break this down, line by line, so we can see how the error events are affecting the individual streams as well as the stream interactions.

Source error: StreamError

This is the source stream emitting an error when populating the underlying stream buffer. This has no effect at all - not on the source stream and not on the stream pipes.

Unpiped source: true

This is the source stream reacting to the error event in the unsafeTarget .write() method. It [the source] is unpiping itself from the unsafeTarget. However, the first chunk of data was still written to the unsafeTarget buffer since the error was emitted as part of the write-action.

Target error: StreamError (What)

This is that first chunk getting written to the unsafeTarget buffer, before the .unpipe() call has any effect.

Target error: StreamError (Written after pipe-break.)
Target error: StreamError (Ended.)

Inside the "unpipe" event on the unsafeTarget, we make two more explicit writes to the unsafeTarget stream. This is to demonstrate that the unsafeTarget stream continues to function properly even after the "error" event and the "unpipe" event.

Unsafe Target Buffer: What Written after pipe-break. Ended.
Safe Target Buffer: it be like?

This is the "debug" event on both target streams that outputs the aggregated buffer. As you can see, the source continued to pipe data into the safeTarget even after it [source] was disconnected from the unsafeTarget. Furthermore, the unsafeTarget was able to continue accepting writes after being disconnected from the source.

What we're seeing here is that "error" events emitted by a target stream, in Node.js, will disconnect piped streams. However, the streams in question will continue to work properly and, perhaps more importantly, will remain open. This means that after streams are unpiped, you probably have to end them explicitly in your error handling.
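One way to act on that advice, sketched under assumptions: PassThrough streams stand in for the Source and Target classes above, the error is emitted by hand as in the demo, and "end the target as soon as it errors" is a hypothetical cleanup policy chosen purely for illustration. The writableEnded property used here requires Node.js 12.9 or later.

```javascript
// Sketch only: PassThrough streams stand in for the post's Source / Target.
const { PassThrough } = require( "stream" );

const source = new PassThrough( { objectMode: true } );
const target = new PassThrough( { objectMode: true } );

// Illustrative flag: was the target still open when its error fired?
let wasOpenWhenErrorFired = null;

target.on( "error", function handleTargetError( error ) {

    // The error itself does not close the target; it is still writable here.
    wasOpenWhenErrorFired = ! target.writableEnded;

    // Hypothetical cleanup policy: explicitly end the target so that it does
    // not linger in an open state after the pipe connection is broken.
    target.end();

} );

source.pipe( target );

// Manually emit an error on the target. This breaks the pipe connection.
target.emit( "error", new Error( "StreamError" ) );

console.log( "Target open at error time:", wasOpenWhenErrorFired );
console.log( "Target ended after cleanup:", target.writableEnded );
console.log( "Source destroyed:", source.destroyed );
```

Whether the source should also be ended or destroyed depends on whether it still has other consumers, as with the safeTarget in the demo above. Newer versions of Node.js also provide stream.pipeline(), which destroys the connected streams when one of them errors; that is a different technique from the manual handling sketched here.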




Reader Comments

Hi

I have maybe a stupid comment: how does it happen that the safe target buffer got "it be like?" Why was "What" cut? I checked something like: var s = createReadStream("input.txt") and had it piped twice ( s.pipe(fs.createWriteStream("1.txt")); s.pipe(fs.createWriteStream("2.txt")); ) and both 1.txt and 2.txt hold an exact copy of the input, so it looks like a read stream can be piped to multiple write streams. Why does it not work in your case?

Just wasted hours with a bug caused by this strange behavior.
I think it's not particularly intuitive and helpful that the streams are unpiped.
Is there any profound reason I'm overlooking? ^^

@Adrian,

It would just be a guess on my part. I suppose the idea might be that if the destination stream throws an error, the assumption is that all subsequent writes to that stream would also throw an error; so, the unpipe prevents that from happening. I wonder if you could do some sort of re-pipe action upon unpipe... like an exponential back-off when trying to communicate with an API. Hmmm, gives me something to think about.

@Jmilkiewicz,

That's a really good question. I am not entirely sure. I assume that the error raised by the Unsafe pipe stopped that particular chunk from being written to the other streams (in this case, the Safe stream). I'll see if I can isolate that condition in a test case.

@Jmilkiewicz,

It looks like that was a bug in Node. I say that because I upgraded my version of node locally and re-ran this script and all the other pipes get the correct data. When I run it under *v0.12.2*, the last stream reads:

> Safe Target Buffer: What it be like?

... whereas in my blog post here, it read:

> Safe Target Buffer: it be like?

So, it seems to have been fixed in recent releases of node.