Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.

Turning Buffers Into Readable Streams In Node.js

By Ben Nadel on

So far, I've been digging deep into error handling in Node.js streams, trying to build a more robust mental model. And, as I've done that, I've mostly created streams that were completely self-contained in terms of "state". As a new experiment, I wanted to look at creating a Node.js stream that had a slightly more complex state. This time, I'm going to turn a cached Buffer object into a Readable stream that can be piped into an HTTP response stream.

Once we have a cached Buffer object, we could theoretically write the entire buffer content to the HTTP response in a single .write() or .end() call. But, from what I've read, it seems that we want to avoid this approach, as it may consume a large amount of memory if response data gets buffered on slower client connections. As such, we should try our best to leverage the stream-like nature of HTTP responses such that data is served and flushed in a non-blocking, low-overhead manner.
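To make that backpressure concern concrete, here's a minimal sketch of what writing in chunks "by hand" looks like, pausing whenever .write() reports that the internal buffer is full. The writeWithBackpressure() name is just a hypothetical helper for this sketch, not part of Node.js:

```javascript
// I write the given buffer to the writable stream in chunks, pausing whenever
// write() returns false and resuming on the "drain" event.
// --
// NOTE: writeWithBackpressure is a hypothetical helper name for this sketch.
function writeWithBackpressure( writable, buffer, chunkSize ) {

	var offset = 0;

	function writeNext() {

		while ( offset < buffer.length ) {

			var chunk = buffer.slice( offset, ( offset + chunkSize ) );

			offset += chunk.length;

			// If this is the last chunk, end the stream with it.
			if ( offset >= buffer.length ) {

				return( writable.end( chunk ) );

			}

			// A false return value means the internal buffer is full - stop
			// writing and wait for the stream to drain before continuing.
			if ( ! writable.write( chunk ) ) {

				return( writable.once( "drain", writeNext ) );

			}

		}

	}

	writeNext();

}
```

This is exactly the bookkeeping that .pipe() handles for us; which is why the stream-based approach below is so much nicer.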

In this exploration, I'm going to wrap a cached Buffer object in a Readable stream that can be piped into the (writable) HTTP response stream. Since Buffers represent raw memory references, we don't have to duplicate the buffer content for each stream. Instead, we can simply iterate over the shared object and use the .slice() method to push portions of it onto the underlying Readable stream buffer.
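As a quick sanity check on that shared-memory claim, mutating the source buffer is visible through a slice, since no copy is made (Buffer.from() is the modern construction API; older Node.js versions used the Buffer constructor):

```javascript
var source = Buffer.from( "hello" );

// slice() returns a view over the same underlying memory - no copy is made.
var view = source.slice( 1, 4 );

// Mutating the source buffer is visible through the slice.
source[ 1 ] = 69; // ASCII "E".

console.log( view.toString() ); // Logs: Ell
```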

// Required module references.
var stream = require( "stream" );
var chalk = require( "chalk" );
var util = require( "util" );
var http = require( "http" );
var fileSystem = require( "fs" );


// ---------------------------------------------------------- //
// ---------------------------------------------------------- //


// I turn the given source Buffer into a Readable stream.
function BufferStream( source ) {

	if ( ! Buffer.isBuffer( source ) ) {

		throw( new Error( "Source must be a buffer." ) );

	}

	// Super constructor.
	stream.Readable.call( this );

	this._source = source;

	// I keep track of which portion of the source buffer is currently being pushed
	// onto the internal stream buffer during read actions.
	this._offset = 0;
	this._length = source.length;

	// When the stream has ended, try to clean up the memory references.
	this.on( "end", this._destroy );

}

util.inherits( BufferStream, stream.Readable );


// I attempt to clean up variable references once the stream has been ended.
// --
// NOTE: I am not sure this is necessary. But, I'm trying to be more cognizant of memory
// usage since my Node.js apps will (eventually) never restart.
BufferStream.prototype._destroy = function() {

	this._source = null;
	this._offset = null;
	this._length = null;

};


// I read chunks from the source buffer into the underlying stream buffer.
// --
// NOTE: We can assume the size value will always be available since we are not
// altering the readable state options when initializing the Readable stream.
BufferStream.prototype._read = function( size ) {

	// If we haven't reached the end of the source buffer, push the next chunk onto
	// the internal stream buffer.
	if ( this._offset < this._length ) {

		this.push( this._source.slice( this._offset, ( this._offset + size ) ) );

		this._offset += size;

	}

	// If we've consumed the entire source buffer, close the readable stream.
	if ( this._offset >= this._length ) {

		this.push( null );

	}

};


// ---------------------------------------------------------- //
// ---------------------------------------------------------- //


// Read the file into memory. We're using the "Sync" version of this to reduce the
// complexity of the exploration.
var tankGirlBuffer = fileSystem.readFileSync( "./tank-girl.png" );


// Create a web server that streams the cached file back on every request.
var server = http.createServer(
	function handleHttpRequest( request, response ) {

		// We're hard-coding this stuff since there's nothing dynamic about the demo.
		response.writeHead(
			200,
			"OK",
			{
				"Content-Type": "image/png",
				"Content-Disposition": "inline; filename=tank-girl.png",
				"Contente-Length": tankGirlBuffer.length
			}
		);

		// Create a new instance of the BufferStream to wrap the cached buffer. Then,
		// pipe that stream into the HTTP response.
		// --
		// NOTE: Once the BufferStream "ends", it will automatically end the HTTP
		// response stream as well.
		new BufferStream( tankGirlBuffer )
			.pipe( response )
		;

	}
);

server.listen( 8080 );

console.log( chalk.yellow( "Server running on port 8080." ) );

As you can see, once the Buffer is wrapped into a Readable stream sub-class, we can use the .pipe() method to stream it into the HTTP response. This means that we'll only be pushing data as the client is able to consume it (the beauty of using pipe).

Internally, my BufferStream is binding to its own "end" event. I am doing this in an attempt to help the garbage collector find unreachable objects. I don't know if this is necessary; but, I do know that a Node.js application doesn't get refreshed like a browser-based JavaScript application. As such, I'm trying to become overly aware of how memory gets used.

Also, I couldn't find much information on whether or not the HTTP response stream ever emits an "error" event; there's nothing in the documentation about it. Furthermore, I don't see any way that this particular Readable stream could emit an error. As such, I am not including any error handling in this demo.

Anyway, just some more Node.js stream exploration. I'm starting to feel more comfortable with Node.js streams; but, they aren't anywhere near second-nature yet.




Reader Comments

Nice Example!

A couple of minor fixes.

1. You mentioned your concern for memory... On line #33 you should bind your 'end' callback to the scope of your BufferStream instance.

this.on( "end", this._destroy.bind(this) );

2. Line #99 when setting the headers has a typo, 'Contente-Length' should be 'Content-Length'.

Hey Ben,

Thanks so much for this code. Any desire to publish this as an npm module? I can't find anything else that accomplishes this as elegantly as you have.

Thanks again!