Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.

Exposing Promise / Deferred Functionality On Streams In Node.js

By Ben Nadel on

As I've been diving into Node.js, I'm starting to see that Streams will often live side-by-side with Promises (I use the Q library for promises). Some of those streams make sense as streams; but, some of those streams might make more sense as promises. Or, at least, they might be easier to consume as promises. As such, I wanted to experiment with exposing promises on streams that will translate various stream events into deferred value resolution and rejection.

When thinking about this, the hardest part is mapping something so discrete (Promises) onto something so flexible (Streams). With promises, only a few distinct states can exist. With streams, on the other hand, the matter is much more fluid. Take an "error" event, for example. A deferred value can only be rejected once; but a stream can emit any number of "error" events and still continue to work properly. As such, I don't believe there is a good way to generically map streams onto promises; rather, such a mapping may only make sense on a per-case basis.
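Here's a minimal illustration of that mismatch, using Node's built-in EventEmitter as a stand-in for a stream and a native Promise as a stand-in for a Q deferred (both substitutions are assumptions made so the example is self-contained). The emitter delivers both "error" events, but only the first reject() call settles the promise; the second call is silently ignored.

```javascript
const { EventEmitter } = require("events");

// A plain emitter standing in for a stream (illustrative setup only).
const emitter = new EventEmitter();

// Count how many "error" events actually arrive.
let errorCount = 0;

const promise = new Promise((resolve, reject) => {
  emitter.on("error", (error) => {
    errorCount += 1;
    // Every error calls reject(); only the first call settles the promise.
    reject(error);
  });
});

emitter.emit("error", new Error("first"));
emitter.emit("error", new Error("second"));

promise.catch((error) => {
  console.log(`${errorCount} errors emitted; promise rejected with "${error.message}"`);
});
```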

The other hard part about this was determining how much one feature should overpower the other. Meaning, I think people really enjoy using streams; so, I didn't want to create a deferred value that fully encapsulated the stream. Instead, what I wanted to do was create a stream that exposes "thenable" functionality. This way, the developer could leverage the stream when it makes sense, such as when piping one stream into multiple destinations; and, they could leverage the exposed promise when it makes sense.
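As a rough sketch of that goal (native Promises standing in for Q, and createByteCounter() being a hypothetical name), the following Writable carries a bound .then(). It can sit in an ordinary pipe fan-out next to another destination, and it can also be handed to Promise.all(), which adopts any object that has a .then() method:

```javascript
const { once } = require("events");
const { PassThrough, Readable, Writable } = require("stream");

// Hypothetical thenable byte counter; a native Promise stands in for Q.
function createByteCounter() {
  let total = 0;
  const counter = new Writable({
    write(chunk, encoding, callback) {
      total += chunk.length;
      callback();
    },
  });
  const promise = new Promise((resolve, reject) => {
    counter.once("finish", () => resolve(total));
    counter.once("error", reject);
  });
  // Bind so that counter.then() runs against the promise, not the stream.
  counter.then = promise.then.bind(promise);
  return counter;
}

async function main() {
  const source = Readable.from(["hello ", "world"]);
  const echo = new PassThrough();
  const counter = createByteCounter();

  // Stream-style consumption: one source piped into two destinations.
  const echoed = [];
  echo.on("data", (chunk) => echoed.push(chunk.toString()));
  source.pipe(echo);
  source.pipe(counter);

  // Promise-style consumption: Promise.all() adopts the counter via its .then().
  const [total] = await Promise.all([counter, once(echo, "end")]);
  return { total, echoed: echoed.join("") };
}

main().then((result) => console.log(result));
```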

NOTE: I know there are very likely to be existing solutions that handle this in a "generic" way. But, I wanted a chance to noodle on the matter a bit before I looked for other solutions.

To explore this sexy cohabitation, I created a writable stream subclass that aggregates its write-content and then emits a "content" event once it is finished. This "content" event also corresponds to a promise resolution, which is "thenable" directly on the stream instance. To demonstrate this, I've created a super simple HTTP server that does nothing but pipe the incoming request into this writable stream and then echoes back the content (or the error):

```javascript
// Require our core node modules.
var http = require( "http" );
var util = require( "util" );

// Require our core application modules.
var createContentStream = require( "./content-stream" ).createWriteStream;


// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //


// Create a simple HTTP server so that we can aggregate incoming request content.
var httpServer = http.createServer(
    function handleHttpRequest( request, response ) {

        // Pipe the request into a ContentStream, which will aggregate the request
        // body and expose the content through a promise value.
        // --
        // NOTE: the ContentStream has a ".promise" property; but, as a convenience, it
        // also directly exposes ".then()", ".catch()", and ".finally()" methods which
        // are bound to the underlying promise.
        request
            .pipe( createContentStream() ) // <-- Returns stream, exposes .then().
            .on(
                "error",
                function handleErrorEvent( error ) {

                    // NOTE: This is just here to demonstrate that I can listen for error
                    // events directly on the stream (N possible events); or, I can use
                    // the .catch() method (1 possible event); or, both.
                    console.log( "Content error:", util.inspect( error ) );

                }
            )
            .then(
                function handleResolve( content ) {

                    response.write( "CONTENT: " );
                    response.write( content );
                    response.write( "\n" );

                }
            )
            .catch(
                function handleReject( error ) {

                    response.write( "ERROR: " );
                    response.write( util.inspect( error ) );
                    response.write( "\n" );

                }
            )
            .finally(
                function handleDone() {

                    // No matter what happens, close the response.
                    response.end( "Fin!" );

                }
            )
        ;

    }
);

httpServer.listen( 8080 );

console.log( "Node server listening on port 8080." );
```

As you can see, the createContentStream() function returns a stream that exposes both the Event Emitter's event-binding methods (e.g., .on()) and the promise-oriented methods .then(), .catch(), and .finally(). Once the .then() method is called, we drop down into a promise workflow, which means that the returned value is a (new) promise, not the stream. But, if we were to maintain a reference to the stream itself, we could continue to bind other events that don't necessarily fit nicely into a resolve/reject/notify model.
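To make that distinction concrete, here's a small sketch (a native Promise instead of Q; createSink() is a hypothetical stand-in for createContentStream()). The "pipe" listener is bound through the stream reference, while the value returned from .then() is a brand-new Promise rather than the stream:

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical thenable sink (native Promise in place of Q): collects chunks and
// resolves with the joined UTF-8 string once the writable side finishes.
function createSink() {
  const chunks = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  sink.promise = new Promise((resolve, reject) => {
    sink.once("finish", () => resolve(Buffer.concat(chunks).toString("utf8")));
    sink.once("error", reject);
  });
  sink.then = sink.promise.then.bind(sink.promise);
  return sink;
}

const sink = createSink();

// Holding on to the stream keeps non-promise events within reach.
sink.on("pipe", () => console.log("a source was piped in"));

// Calling .then() drops into the promise workflow: the result is a new Promise.
const chained = Readable.from(["foo", "bar"])
  .pipe(sink)
  .then((content) => content.toUpperCase());

console.log(chained instanceof Promise); // true
console.log(chained === sink); // false
chained.then((upper) => console.log(upper)); // FOOBAR
```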

Now, let's take a look at the content-stream.js module. Like I said before, it's a subclass of the Writable stream; but, it also exposes a .promise property. And, for convenience, the .then(), .catch(), and .finally() methods are also exposed directly on the stream itself, though they are bound to the promise for proper execution:

```javascript
// Require our core node modules.
var Buffer = require( "buffer" ).Buffer;
var Q = require( "q" );
var stream = require( "stream" );
var util = require( "util" );


// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //


// I am a factory function for creating writable content streams. If maxContentLength
// is omitted, the content stream will use the default value.
exports.createWriteStream = function( maxContentLength ) {

    return( new ContentStream( maxContentLength ) );

};


// I am a writable stream that accumulates content across writes and emits a "content"
// event once the entirety of the content has been aggregated. The content is emitted
// as a UTF-8 encoded value (for now).
function ContentStream( newMaxContentLength ) {

    // Call the super constructor.
    stream.Writable.call( this );

    // I am the max length that the aggregated content size can be. If the max size is
    // exceeded, an error is emitted.
    this.maxContentLength = ( newMaxContentLength || 65536 ); // 64KB.

    // I hold the running sum of buffer lengths being aggregated internally. I am used
    // to validate the max content length.
    this._contentLength = 0;

    // I hold the individual chunks across writes so that we don't have to concat all
    // the chunk values until the stream is finished.
    this._buffers = [];

    // I am the deferred value representation of the stream state.
    // --
    // NOTE: The stream and the deferred value are only linked because we say they are
    // linked. As such, it's up to us to determine the interplay between stream events
    // and the deferred value.
    this._deferred = Q.defer();

    // Expose the promise on the stream.
    var promise = this.promise = this._deferred.promise;

    // Expose the promise methods on the stream itself (as a convenience). Since the
    // Promise is implemented using prototypal inheritance (as opposed to relying on
    // lexical binding), we have to bind the method references back to the promise.
    // --
    // NOTE: This makes the stream a "thenable" object.
    this.then = promise.then.bind( promise );
    this.catch = promise.catch.bind( promise );
    this.finally = promise.finally.bind( promise );

    // When the stream is closed, and the finish event is emitted; at that point, we can
    // flatten all of the aggregated buffers.
    this.once( "finish", this._handleFinishEvent );

    // When the content event is emitted, we can use it to resolve the deferred value.
    // --
    // NOTE: We are using .once() since we can only resolve the value once.
    this.once( "content", this._deferred.resolve );

    // If anything goes wrong, the stream will emit an error, which we can use to reject
    // the deferred value.
    // --
    // CAUTION: Since we are binding to the error event, it will be sufficient to stop
    // the error event from bubbling up as an exception. However, any existing "pipe"
    // will be automatically unpiped due to the default stream behavior. That said, we
    // are using .on() (instead of .once()) so that we will catch more than one error
    // event. Just because we reject the deferred value, it doesn't mean that we want to
    // start letting subsequent error events go unhandled.
    this.on( "error", this._deferred.reject );

}

util.inherits( ContentStream, stream.Writable );


// ---
// PRIVATE METHODS.
// ---


// I handle the finish event emitted on the stream, which is emitted once the write
// stream has been closed.
ContentStream.prototype._handleFinishEvent = function() {

    // Collapse all the buffers into a single string value.
    var content = Buffer.concat( this._buffers ).toString( "utf-8" );

    this.emit( "content", content );

};


// I consume the chunk of data being written to the stream.
ContentStream.prototype._write = function( chunk, encoding, chunkConsumed ) {

    // The stream and the underlying deferred value are not inherently linked. As such,
    // there's nothing that will stop the stream from accepting writes just because the
    // deferred value has been resolved or rejected. As such, we have to reject any write
    // that is executed after the deferred value is no longer in a pending state.
    if ( ! this.promise.isPending() ) {

        return( chunkConsumed( new Error( "Stream is no longer pending." ) ) );

    }

    // Check to see if the incoming chunk puts the accumulated content length over
    // the max allowed length. If so, pass-through an error (which will lead to an
    // error event being emitted, which will lead to our deferred value being rejected).
    if ( ( this._contentLength += chunk.length ) > this.maxContentLength ) {

        return( chunkConsumed( new Error( "Content too large." ) ) );

    }

    this._buffers.push( chunk );

    chunkConsumed();

};
```

I tried to keep the stream and the deferred functionality as separate as I could. I use stream events to resolve and reject the deferred value rather than integrating that logic too deeply within the private functions of the stream. But, as you can see, the separation is not perfect: because a deferred value has a small, finite set of states, whereas a stream's states are open-ended, I had to add deferred logic into the ._write() method (where it rejects the write if the deferred value has already been resolved or rejected). This decision is arbitrary. Meaning, from a technical standpoint, there's no reason that the stream couldn't continue to accept writes even after the deferred value had been resolved (or, more likely, rejected). But, from a philosophical standpoint, I've purposefully created tighter coupling between the stream and the deferred value.

NOTE: It's this arbitrary co-dependence between the stream and the deferred value that I think makes it funky to think about making streams generically "thenable." While one stream might need to reject post-rejection writes, another stream may be perfectly happy accepting post-rejection writes.
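Q's promises expose isPending(), but native Promises do not. The following is a hedged sketch of the same ContentStream idea on a native Promise (an assumption, not the Q-based code above): NativeContentStream is a hypothetical name, it tracks settlement in its own _settled flag, and it keeps the same deliberate choice of refusing writes once the promise has settled. In recent Node.js releases, a write error also destroys the stream (autoDestroy), so later writes are refused before they ever reach _write(); the flag simply makes the coupling explicit.

```javascript
const { Writable } = require("stream");

// Sketch of the ContentStream idea using a native Promise instead of Q.
class NativeContentStream extends Writable {
  constructor(maxContentLength = 65536) {
    super();
    this.maxContentLength = maxContentLength;
    this._contentLength = 0;
    this._buffers = [];
    // Native promises have no isPending(), so we track settlement ourselves.
    this._settled = false;
    this.promise = new Promise((resolve, reject) => {
      this.once("finish", () => {
        this._settled = true;
        resolve(Buffer.concat(this._buffers).toString("utf8"));
      });
      // .on() rather than .once() so that later error events stay handled.
      this.on("error", (error) => {
        this._settled = true;
        reject(error);
      });
    });
  }

  // Thenable surface, delegating to the underlying promise.
  then(onResolve, onReject) {
    return this.promise.then(onResolve, onReject);
  }

  catch(onReject) {
    return this.promise.catch(onReject);
  }

  finally(onDone) {
    return this.promise.finally(onDone);
  }

  _write(chunk, encoding, callback) {
    // Deliberate coupling: refuse writes once the promise has settled.
    if (this._settled) {
      return callback(new Error("Stream is no longer pending."));
    }
    this._contentLength += chunk.length;
    if (this._contentLength > this.maxContentLength) {
      return callback(new Error("Content too large."));
    }
    this._buffers.push(chunk);
    callback();
  }
}

// Usage: write, end, and consume the aggregated content as a promise.
const content = new NativeContentStream();
content.write("Hello, ");
content.end("streams");
content.then((text) => console.log(text)); // Hello, streams
```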

Now, if I open up the Postman REST Client (a Google Chrome extension), and post to the Node.js HTTP server, you can see that I get my content echoed back to me:

Note that the incoming content is being echoed back in the .then() resolution handler of the "deferred stream."

I definitely "like" streams; but, I'm absolutely in "love" with promises. And, half the time I use streams, I think what I really want is a promise anyway. It's nice to see that you can make a stream "thenable" by exposing deferred and promise functionality on the stream. But, at this point, I'm not sure if this is exactly the approach that I want to take. I'll have to keep on experimenting with various configurations.




Reader Comments

I respect the idea, and promises seem like an awesome API; however, this kind of defeats the purpose of having a stream in the first place. Streams are more akin to lazy evaluation, where you want to modify the data as you get it and only use what you need at any point in time. If you are waiting for all of the buffers to come in, then concatenating all of them together, then converting all of them to UTF-8, we miss out on what is so nice about streams in the first place.

I definitely think there is room for improvement in the streams API, as I find myself destroying streams quite regularly whenever there is a parsing error; but I don't think this is the way to go.

@Sam,

I don't mean to say that Promises should replace the stream API. Streams are very cool. But, there are definitely situations where streams, in reality, have a much more finite state (in a meaningful sense). For example, imagine that your incoming request stream contains a JSON payload. Sure, you could probably pipe that into an event-based JSON parser that emits key-value pairs; but, in all likelihood, it would be much simpler to just buffer the request and parse the JSON. Pseudo code:

```javascript
request.pipe( contentBufferStream )
    .then( parseBodyAsJSON )
    .then( doSomethingWithParsedValue )
    .catch( handleErrors );
```
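Fleshing that pseudo code out into something runnable requires a few assumptions: a native Promise replaces Q, createBufferStream() is a hypothetical buffering sink, and a Readable built with Readable.from() stands in for the HTTP request. Because JSON.parse() throws on bad input, and a throw inside .then() becomes a rejection, a malformed body falls through to the .catch():

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical buffering sink: a Writable whose .then() resolves with the body text.
function createBufferStream() {
  const chunks = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  const promise = new Promise((resolve, reject) => {
    sink.once("finish", () => resolve(Buffer.concat(chunks).toString("utf8")));
    sink.once("error", reject);
  });
  sink.then = promise.then.bind(promise);
  return sink;
}

function parseBodyAsJSON(body) {
  // Throws on malformed input; inside .then() that turns into a rejection.
  return JSON.parse(body);
}

function handleRequest(request) {
  return request
    .pipe(createBufferStream())
    .then(parseBodyAsJSON)
    .then((payload) => `Hello, ${payload.name}`)
    .catch((error) => `ERROR: ${error.message}`);
}

// A Readable stands in for an incoming HTTP request body.
handleRequest(Readable.from(['{"name":', '"Ben"}'])).then((result) => console.log(result));
```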

Now, you may look at this and think that, if you need to parse the body, you should just wrap the entire request inside of a method that returns a Promise. And, you might be right; that might just be the way to go, in general, if you want a Promise API.
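For comparison, the "wrap it in a method" approach might look like the following sketch. readBody() is a hypothetical helper; the caller only ever deals with a Promise, never with a thenable stream:

```javascript
const { Readable } = require("stream");

// Hypothetical helper: consume a readable stream entirely and resolve with its
// contents as a UTF-8 string.
function readBody(readable) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    // Buffer.from() handles both string chunks and Buffer chunks.
    readable.on("data", (chunk) => chunks.push(Buffer.from(chunk)));
    readable.once("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
    readable.once("error", reject);
  });
}

readBody(Readable.from(['{"id":', "42}"]))
  .then(JSON.parse)
  .then((payload) => console.log(payload.id)); // 42
```

Newer Node.js releases also ship comparable helpers, such as text() and json(), in the built-in stream/consumers module.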

Since I haven't had a chance to write production Node.js code yet, this is mostly R&D, in preparation for contributing to our production code.

TBH, I recently did just that: I concatenated the buffers and parsed the result. So I'll definitely show some humility regarding these common use cases, where all you want is a JSON object, or where something else needs the whole buffer. This type of use case also applies to minifiers, though I haven't spent enough time to figure out whether they can be streamed.

Building on this point, there is also stream.Transform with object mode turned on. However, TBH, at that point you're basically just making it a promise anyway, since you only expect one piece of data.

```javascript
readable.pipe(ObjectModeTransform)
    .on("data", doSomethingWithTheObject)
    .on("error", handleTheError);
```

```javascript
readable.pipe(BufferedWriter)
    .then(JSONParse)
    .then(doSomethingWithTheObject)
    .catch(handleTheError);
```

I actually prefer that extra step, also because I know all of the overhead that I'm going to run into. The more verbose the flow is, the more we know about all the operations involved, rather than relying on a black box that "does stuff". And it's still clean enough to read easily.

Thanks for the response ^__^
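For reference, a buffering object-mode Transform of the kind described above could be sketched like this (createJSONTransform() is a hypothetical name). It accepts raw bytes on the writable side, buffers them, and pushes exactly one parsed object from its flush step, which is why it ends up behaving a lot like a promise:

```javascript
const { Readable, Transform } = require("stream");

// Hypothetical JSON transform: buffers bytes on the writable side and, once the
// input ends, pushes a single parsed value on the object-mode readable side.
// Caveat: a body of literally "null" would end the stream early, because
// push(null) signals end-of-stream.
function createJSONTransform() {
  const chunks = [];
  return new Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
    flush(callback) {
      let parsed;
      try {
        parsed = JSON.parse(Buffer.concat(chunks).toString("utf8"));
      } catch (error) {
        return callback(error);
      }
      this.push(parsed);
      callback();
    },
  });
}

Readable.from(['{"ok":', "true}"])
  .pipe(createJSONTransform())
  .on("data", (object) => console.log(object.ok)) // true
  .on("error", (error) => console.error("Bad JSON:", error.message));
```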

I apologize for being spammy; however, I can't edit my comments.

On second thought, for arbitrarily large objects, or objects that hold large arrays, etc., we will more than likely only want a few key-value pairs (KVs), so buffering may give us much more than we expect and/or open the door to an attack. That being said, we can always put a limit on the buffering and send the error to the .catch() handler.

Also, if there are more KVs than we expect, that may also become a type of attack, rather than us getting exactly what we want. This can be handled elsewhere (for example, in a validator) if we are attempting to store the object in the database or to let it be handled by multiple flows. We may also be storing more than we want (but, again, this can be handled by validation). However, instead of

```javascript
var ob = {};
var reqprops = validator.requiredProperties();
readable.pipe(getKVs()).on("kv", function (key, value) {
    if (!validator[key].test(value)) {
        // Can exit before the buffer is fully read.
        return destroyAndDoError("not valid", readable);
    }
    if (key in reqprops) delete reqprops[key];
    ob[key] = value;
}).on("end", function () {
    // Can check whether anything is missing, based on what existed before.
    if (Object.keys(reqprops).length > 0) return doError();
    storeIt(ob); // Happens nearly right after the stream is finished.
});
```

we get:

```javascript
readable
    .pipe(BufferIt)   // Wait for the stream to be finished.
    .then(parseIt)    // If an error happens early, we've wasted the previous step.
    .then(validateIt) // If an error happens early, we've wasted 2 steps.
    .then(storeIt)    // On success.
    .catch(doError);  // My favorite part about promises: one error handler instead of one on every asynchronous function.
```

Except one is delightfully pretty, and the other is... well... not so much

Perhaps something along the lines of:

```javascript
var reqprops = validator.requiredProperties();
readable.pipe(getKVs({}, function mykvHandler(obj, key, value) {
    if (!validator[key].test(value)) throw new Error("not valid"); // Will stop midway.
    if (key in reqprops) delete reqprops[key];
    obj[key] = value;
})).then(function (obj) {
    if (Object.keys(reqprops).length > 0) throw new Error("missing required");
    return obj;
}).then(storeIt).catch(doError);
```

That still allows for a clean-ish API. It's difficult to say. How do you maximize speed without making people feel like they have to hack their way to a solution?
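One possible reading of the getKVs( initial, handler ) idea, sketched under loud assumptions: the key-value pairs arrive as [ key, value ] arrays on an object-mode stream (the parser that would produce them is out of scope), and getKVs(), validator, and collectValid() are all hypothetical. An exception in the handler fails the current write, which errors the stream and rejects the exposed promise before the remaining pairs are processed:

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical getKVs(initial, handler): an object-mode Writable that hands each
// [key, value] pair to a handler and exposes .then() for the collected object.
function getKVs(initial, handler) {
  const target = initial;
  const sink = new Writable({
    objectMode: true,
    write([key, value], encoding, callback) {
      try {
        handler(target, key, value);
      } catch (error) {
        // Failing the write errors the stream and stops further processing.
        return callback(error);
      }
      callback();
    },
  });
  const promise = new Promise((resolve, reject) => {
    sink.once("finish", () => resolve(target));
    sink.once("error", reject);
  });
  sink.then = promise.then.bind(promise);
  return sink;
}

// Illustrative validators; not from any library.
const validator = {
  name: /^[a-z]+$/i,
  age: /^\d+$/,
};

function collectValid(pairs) {
  const required = new Set(Object.keys(validator));
  return Readable.from(pairs)
    .pipe(getKVs({}, (obj, key, value) => {
      if (!validator[key] || !validator[key].test(value)) {
        throw new Error(`not valid: ${key}`);
      }
      required.delete(key);
      obj[key] = value;
    }))
    .then((obj) => {
      if (required.size > 0) throw new Error("missing required");
      return obj;
    });
}

collectValid([["name", "Ben"], ["age", "42"]]).then((obj) => console.log(obj));
```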

@Sam,

Dealing with an "attack" is an interesting problem. In the code in my post, you'll see that as I buffer the incoming request, I'm also checking how many bytes I've buffered; and, if I've buffered more than the "max" allowable (based on the constructor configuration), I pass an error to the write callback, which causes the stream to emit an "error" event and reject the deferred value.

But, it's an interesting problem indeed. I was watching a video a while back from Douglas Crockford, and he was talking about how he had a client that needed to parse a JSON value that was over 2GB in size! Bananas, right?! You'll probably never need a user to POST/PUT a JSON value that large, so it's probably reasonable to set a max post size that is several orders of magnitude smaller.
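The post's ContentStream enforces its limit inside _write(). As an alternative sketch (not the post's code), the limit can live in its own Transform placed in front of the buffering step, with stream.pipeline() destroying the whole chain as soon as the guard fails; createLimitStream() and readLimited() are hypothetical names:

```javascript
const { Readable, Transform, pipeline } = require("stream");

// Hypothetical size guard: passes bytes through untouched until more than
// maxBytes have been seen, then fails the stream with an error.
function createLimitStream(maxBytes) {
  let seen = 0;
  return new Transform({
    transform(chunk, encoding, callback) {
      seen += chunk.length;
      if (seen > maxBytes) {
        return callback(new Error(`Body exceeds ${maxBytes} bytes`));
      }
      callback(null, chunk);
    },
  });
}

// Buffer a stream into a string, rejecting if the guard (or the source) fails.
function readLimited(readable, maxBytes) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const guard = createLimitStream(maxBytes);
    guard.on("data", (chunk) => chunks.push(chunk));
    guard.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
    // pipeline() destroys every stream in the chain if any of them fails.
    pipeline(readable, guard, (error) => {
      if (error) reject(error);
    });
  });
}

readLimited(Readable.from(["0123456789"]), 4)
  .then(JSON.parse)
  .catch((error) => console.log(error.message));
```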

Also, I'm with you in that having the .catch() at the end of the promise chain is just joyful to work with. In my ColdFusion code (which is very much synchronous), I am used to wrapping my code in a large try/catch block and then using errors to stop the control flow, dealing with them (typically) in a Catch block. So, this promise approach makes me feel very comfortable.
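In newer Node.js versions, async/await brings that try/catch feel back almost literally. A minimal sketch, assuming a Node.js version that supports async iteration over readable streams; readBody() and handle() are hypothetical names, and the log array stands in for writes to an HTTP response:

```javascript
const { Readable } = require("stream");

// Buffer a readable into a string using async iteration.
async function readBody(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(Buffer.from(chunk));
  }
  return Buffer.concat(chunks).toString("utf8");
}

// The promise chain from the post, written as one try/catch/finally block.
async function handle(readable) {
  const log = [];
  try {
    const payload = JSON.parse(await readBody(readable));
    log.push(`CONTENT: ${payload.message}`);
  } catch (error) {
    // Any failure above (stream error or bad JSON) lands here.
    log.push(`ERROR: ${error.message}`);
  } finally {
    // Runs no matter what, like the .finally() that ends the response.
    log.push("Fin!");
  }
  return log;
}

handle(Readable.from(['{"message":"hi"}'])).then((log) => console.log(log.join("\n")));
```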
