Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.

HTTP Requests Are Cold / Lazy Streams In Angular 2 Beta 6

By Ben Nadel

Yesterday, I watched a great video on RxJS streams titled "Everything is a Stream" by Rob Wormald. In the video, Rob casually mentioned that, since HTTP requests in Angular 2 are now RxJS observable streams, the underlying AJAX (Asynchronous JavaScript and XML) request won't be opened until someone actually subscribes to the response. This sounded very suspicious! I thought for sure that he must be mistaken. So, I tested it myself. And, sure enough, in Angular 2, HTTP requests are implemented using cold (i.e., lazy) RxJS streams.

Run this demo in my JavaScript Demos project on GitHub.

First, I should clarify that Rob's statement sounded odd to me because, in an HTTP request, I view the response as a "nice to have." Now, I don't mean that the response is optional; but rather, that there are cases in which I can imagine triggering an HTTP request without caring about what happens to the response (i.e., "fire and forget"). For example, creating a heartbeat or logging a client-side StatsD metric: sure, the request may fail for some reason; but, that doesn't mean that I care about it or that I can even do anything meaningful about the error.

That said, I wanted to see this new HTTP behavior for myself. So, I created a small demo in which I inject a "heartbeat" service into the root App component. This HeartBeatService will ping a given URL on a given interval in an effort to keep the user's session alive. As a heartbeat, I don't necessarily care whether or not the request works; so, I don't bother subscribing to the response:

	<!doctype html>
	<html>
	<head>
		<meta charset="utf-8" />

		<title>
			HTTP Requests Are Cold / Lazy Streams In Angular 2 Beta 6
		</title>

		<link rel="stylesheet" type="text/css" href="./demo.css"></link>
	</head>
	<body>

		<h1>
			HTTP Requests Are Cold / Lazy Streams In Angular 2 Beta 6
		</h1>

		<my-app>
			Loading...
		</my-app>

		<!-- Load demo scripts. -->
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/es6-shim.min.js"></script>
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/Rx.umd.min.js"></script>
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/angular2-polyfills.min.js"></script>
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/angular2-all.umd.js"></script>
		<!-- AlmondJS - minimal implementation of RequireJS. -->
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/almond.js"></script>
		<script type="text/javascript">

			// Defer bootstrapping until all of the components have been declared.
			// --
			// NOTE: Not all components have to be required here since they will be
			// implicitly required by other components.
			requirejs(
				[ /* Using require() for better readability. */ ],
				function run() {

					var App = require( "App" );
					var HeartBeatService = require( "HeartBeatService" );

					ng.platform.browser.bootstrap(
						App,
						[
							ng.http.HTTP_PROVIDERS,
							HeartBeatService
						]
					);

				}
			);


			// --------------------------------------------------------------------------- //
			// --------------------------------------------------------------------------- //


			// I provide the root App component.
			define(
				"App",
				function registerApp() {

					var HeartBeatService = require( "HeartBeatService" );

					// Configure the App component definition.
					ng.core
						.Component({
							selector: "my-app",
							template:
							`
								<p>
									<a (click)="startHeartbeat()">Start heartbeat</a>
									&mdash;
									<a (click)="stopHeartbeat()">Stop heartbeat</a>
								</p>

								<p>
									The heartbeat will start pinging a URL on an interval
									to ensure that the users session does not die.
								</p>
							`
						})
						.Class({
							constructor: AppController
						})
					;

					AppController.parameters = [ new ng.core.Inject( HeartBeatService ) ];

					return( AppController );


					// I control the App component.
					function AppController( heartbeat ) {

						var vm = this;

						// Expose the public methods.
						vm.startHeartbeat = startHeartbeat;
						vm.stopHeartbeat = stopHeartbeat;


						// ---
						// PUBLIC METHODS.
						// ---


						// I start pinging the heartbeat URL to keep the user's session alive.
						function startHeartbeat() {

							heartbeat.start( "./heartbeat.json", 1000 );

						}


						// I stop pinging the heartbeat URL.
						function stopHeartbeat() {

							heartbeat.stop();

						}

					}

				}
			);


			// --------------------------------------------------------------------------- //
			// --------------------------------------------------------------------------- //


			// I provide a heartbeat service that will make an HTTP request to a given
			// URL on an interval so as to keep a user's session alive.
			define(
				"HeartBeatService",
				function registerHeartBeatService() {

					HeartBeatService.parameters = [ new ng.core.Inject( ng.http.Http ) ];

					return( HeartBeatService );


					// I make an HTTP request to a given URL on an interval so as to keep a
					// user's session alive. Only one heartbeat URL can be pinged at a time.
					// Any attempt to ping a new URL will result in the previous heartbeat
					// being stopped.
					function HeartBeatService( http ) {

						// I keep track of the active heartbeat interval.
						var interval = null;

						// Return the public API.
						return({
							start: start,
							stop: stop
						});


						// ---
						// PUBLIC METHODS.
						// ---


						// I start pinging the given URL on the given interval. Any existing
						// heartbeat will be stopped before the new one is started.
						function start( url, intervalTime ) {

							// Stop any active heartbeat before we start a new interval.
							stop();

							console.warn( "Starting session heartbeat" );
							console.info( "You should start seeing HTTP requests being made." );

							interval = setInterval(
								function ping() {

									// Here, we are actually initiating the HTTP request to
									// the heartbeat URL. However, since this is just a
									// heartbeat, we don't care about the return value. As
									// such, we're not going to subscribe to it - what would
									// be the point, unless we were going to include logic
									// for stopping the heartbeat on error (which we don't
									// currently have logic for).
									http.get( url );

									// CAUTION: ^ This doesn't actually work!!!!!!

									// Side-note: If you convert the stream to a Promise,
									// this will initiate the HTTP request as the underlying
									// operator has to subscribe to the stream in order to
									// fulfill the promise. But, at that point, you might as
									// well just use a no-op subscription.
									// --
									// http.get( url ).toPromise();
									// --
									// http.get( url ).subscribe();

								},
								intervalTime
							);

						}


						// I stop pinging the current heartbeat URL.
						function stop() {

							if ( ! interval ) {

								return;

							}

							console.warn( "Stopping session heartbeat" );

							clearInterval( interval );
							interval = null;

						}

					}

				}
			);

		</script>

	</body>
	</html>

As you can see, when I call .start() on the HeartBeatService, it sets up an interval to start pinging the given URL. However, when we run this page and start the heartbeat, we don't see any HTTP requests show up in the console log:

Figure: HTTP requests are implemented as cold / lazy RxJS streams in Angular 2.

The problem, as Rob pointed out, is that HTTP requests are being implemented as "cold" RxJS observable streams, which means that they don't start producing values until they know that someone (a subscriber) is there to observe them. Calling http.get() only describes the request; nothing goes over the wire until .subscribe() is called.
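
To make that laziness concrete, here is a minimal sketch. It does not use RxJS or Angular at all: createColdStream() and fakeGet() are hypothetical stand-ins that I'm assuming purely for illustration, and the real Http service is certainly more involved. The only point is that the "request" logic lives inside the subscribe step, so nothing runs until a subscriber shows up.

```javascript
// A tiny stand-in for a cold observable (hypothetical helper, NOT the RxJS
// or Angular implementation). The producer only runs inside subscribe().
function createColdStream( producer ) {

	return({
		subscribe: function( onNext ) {

			// The work starts here, once per subscriber.
			producer( onNext || function noop() {} );

		}
	});

}

// I keep track of how many "requests" were actually sent.
var requestsSent = 0;

// Hypothetical stand-in for http.get(): it only DESCRIBES the request.
function fakeGet( url ) {

	return createColdStream(
		function sendRequest( onNext ) {

			requestsSent++;
			onNext({ url: url, status: 200 });

		}
	);

}

var response = fakeGet( "./heartbeat.json" );

// Nothing has been sent yet - no one has subscribed.
var sentBeforeSubscribe = requestsSent;

// Subscribing is what triggers the request.
response.subscribe();
var sentAfterSubscribe = requestsSent;

// In this sketch, a second subscriber re-runs the producer.
response.subscribe();
var sentAfterSecondSubscribe = requestsSent;

console.log( sentBeforeSubscribe, sentAfterSubscribe, sentAfterSecondSubscribe );
```

In this sketch, the counter stays at zero until the first .subscribe() call, which mirrors what the un-subscribed http.get( url ) call was doing in the heartbeat.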

Ok, so if we have to think about HTTP requests as cold RxJS observable streams, then I wanted to take a stab at reworking the demo to maximize my stream usage. This time, not only am I subscribing to the HTTP stream, I'm also using RxJS to implement the interval:

	<!doctype html>
	<html>
	<head>
		<meta charset="utf-8" />

		<title>
			HTTP Requests Are Cold / Lazy Streams In Angular 2 Beta 6
		</title>

		<link rel="stylesheet" type="text/css" href="./demo.css"></link>
	</head>
	<body>

		<h1>
			HTTP Requests Are Cold / Lazy Streams In Angular 2 Beta 6
		</h1>

		<my-app>
			Loading...
		</my-app>

		<!-- Load demo scripts. -->
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/es6-shim.min.js"></script>
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/Rx.umd.min.js"></script>
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/angular2-polyfills.min.js"></script>
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/angular2-all.umd.js"></script>
		<!-- AlmondJS - minimal implementation of RequireJS. -->
		<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/almond.js"></script>
		<script type="text/javascript">

			// Defer bootstrapping until all of the components have been declared.
			// --
			// NOTE: Not all components have to be required here since they will be
			// implicitly required by other components.
			requirejs(
				[ /* Using require() for better readability. */ ],
				function run() {

					var App = require( "App" );
					var HeartBeatService = require( "HeartBeatService" );

					ng.platform.browser.bootstrap(
						App,
						[
							ng.http.HTTP_PROVIDERS,
							HeartBeatService
						]
					);

				}
			);


			// --------------------------------------------------------------------------- //
			// --------------------------------------------------------------------------- //


			// I provide the root App component.
			define(
				"App",
				function registerApp() {

					var HeartBeatService = require( "HeartBeatService" );

					// Configure the App component definition.
					ng.core
						.Component({
							selector: "my-app",
							template:
							`
								<p>
									<a (click)="startHeartbeat()">Start heartbeat</a>
									&mdash;
									<a (click)="stopHeartbeat()">Stop heartbeat</a>
								</p>

								<p>
									The heartbeat will start pinging a URL on an interval
									to ensure that the users session does not die.
								</p>
							`
						})
						.Class({
							constructor: AppController
						})
					;

					AppController.parameters = [ new ng.core.Inject( HeartBeatService ) ];

					return( AppController );


					// I control the App component.
					function AppController( heartbeat ) {

						var vm = this;

						// Expose the public methods.
						vm.startHeartbeat = startHeartbeat;
						vm.stopHeartbeat = stopHeartbeat;


						// ---
						// PUBLIC METHODS.
						// ---


						// I start pinging the heartbeat URL to keep the user's session alive.
						function startHeartbeat() {

							heartbeat.start( "./heartbeat.json", 1000 );

						}


						// I stop pinging the heartbeat URL.
						function stopHeartbeat() {

							heartbeat.stop();

						}

					}

				}
			);


			// --------------------------------------------------------------------------- //
			// --------------------------------------------------------------------------- //


			// I provide a heartbeat service that will make an HTTP request to a given
			// URL on an interval so as to keep a user's session alive.
			define(
				"HeartBeatService",
				function registerHeartBeatService() {

					HeartBeatService.parameters = [ new ng.core.Inject( ng.http.Http ) ];

					return( HeartBeatService );


					// I make an HTTP request to a given URL on an interval so as to keep a
					// user's session alive. Only one heartbeat URL can be pinged at a time.
					// Any attempt to ping a new URL will result in the previous heartbeat
					// being stopped.
					function HeartBeatService( http ) {

						// I keep track of the active heartbeat stream.
						var subscription = null;

						// Return the public API.
						return({
							start: start,
							stop: stop
						});


						// ---
						// PUBLIC METHODS.
						// ---


						// I start pinging the given URL on the given interval. Any existing
						// heartbeat will be stopped before the new one is started.
						function start( url, intervalTime ) {

							// Stop any active heartbeat before we start a new interval.
							stop();

							console.warn( "Starting session heartbeat" );
							console.info( "You should start seeing HTTP requests being made." );

							// NOTE: We are holding onto the stream subscription so that we
							// cancel the subscription when the heartbeat is terminated.
							subscription = Rx.Observable
								.interval( intervalTime )
								.switchMap(
									function ping() {

										// Here, we are actually initiating the HTTP request
										// to the heartbeat URL.
										return( http.get( url ) );

									}
								)

								// In order to get the interval stream to start emitting
								// next values (which, in turn, will trigger HTTP requests),
								// we have to subscribe to it.
								.subscribe()
							;

						}


						// I stop pinging the current heartbeat URL.
						function stop() {

							if ( ! subscription ) {

								return;

							}

							console.warn( "Stopping session heartbeat" );

							subscription.unsubscribe();
							subscription = null;

						}

					}

				}
			);

		</script>

	</body>
	</html>

This time, I'm using the .interval() operator to manage the timing of the heartbeat and the .switchMap() operator to map each interval tick onto an actual AJAX request. Of course, none of this would do anything without the final .subscribe() call, which registers an observer and tells the RxJS stream to start emitting values. And, when we run this page, we finally start seeing some AJAX requests in the console log:

Figure: HTTP requests are implemented as cold / lazy RxJS streams in Angular 2.

Finally, we get the behavior that we expected.
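
For anyone trying to picture what the interval-plus-switchMap pipeline is doing, here is a rough, hand-rolled sketch. It is not RxJS: createTicker(), switchMapLike(), and fakeGet() are hypothetical helpers that I'm assuming for illustration, the ticker is driven manually instead of by a timer, the "?tick=" query string is only there to make the log readable, and the fake request never completes so that the switching is visible.

```javascript
function noop() {}

// I record what the fake requests do.
var log = [];

// Hypothetical stand-in for Rx.Observable.interval(), driven by hand.
function createTicker() {

	var listener = null;

	return({
		subscribe: function( onTick ) {
			listener = onTick;
			return({
				unsubscribe: function() { listener = null; }
			});
		},
		// Manually emit a tick (a real interval would do this on a timer).
		tick: function( value ) {
			if ( listener ) { listener( value ); }
		}
	});

}

// Hypothetical stand-in for http.get(): the request only starts on
// subscribe(), and unsubscribing while it is pending "aborts" it.
function fakeGet( url ) {

	return({
		subscribe: function() {
			var pending = true;
			log.push( "GET " + url );
			return({
				unsubscribe: function() {
					if ( pending ) {
						pending = false;
						log.push( "ABORT " + url );
					}
				}
			});
		}
	});

}

// A switchMap-like helper: each outer value starts a new inner stream and
// drops the previous one.
function switchMapLike( source, project ) {

	return({
		subscribe: function( onNext ) {
			var inner = null;
			var outer = source.subscribe(
				function( value ) {
					if ( inner ) { inner.unsubscribe(); }
					inner = project( value ).subscribe( onNext || noop );
				}
			);
			return({
				unsubscribe: function() {
					outer.unsubscribe();
					if ( inner ) { inner.unsubscribe(); }
				}
			});
		}
	});

}

var ticker = createTicker();
var heartbeat = switchMapLike(
	ticker,
	function ping( tick ) {
		return( fakeGet( "./heartbeat.json?tick=" + tick ) );
	}
);

ticker.tick( 0 ); // No subscriber yet, so nothing is requested.
var subscription = heartbeat.subscribe();
ticker.tick( 1 ); // Starts the first request.
ticker.tick( 2 ); // Drops the first request, starts the second.
subscription.unsubscribe(); // Drops the second request.
ticker.tick( 3 ); // Heartbeat is stopped, so nothing is requested.

console.log( log );
```

In the sketch, holding onto the subscription is what makes stop() possible: unsubscribing tears down both the ticker and whatever request is still in flight.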

Implementing HTTP requests as cold / lazy RxJS observable streams seems a little odd to me. It seems like it produces a "surprising" behavior; and, surprises in software are generally not a good thing. But, I do understand that implementing HTTP requests as streams makes them easily composable with other operators like .delay(), .flatMap(), .switch(), and .retry(), which is very cool. So, in this case, I'll consider the surprising behavior a known tradeoff.
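
The composability argument is easier to see with a small sketch. Again, this is not RxJS: flakyGet() and retryLike() are hypothetical helpers that I'm assuming for illustration. The idea is that, because a cold stream does its work inside subscribe(), a retry-style operator can simply subscribe again to run the request again.

```javascript
// I count how many times the "request" was attempted.
var attempts = 0;

// Hypothetical flaky request: it fails twice and then succeeds. Since the
// stream is cold, each subscribe() call is a brand new attempt.
function flakyGet( url ) {

	return({
		subscribe: function( onNext, onError ) {

			attempts++;

			if ( attempts < 3 ) {

				onError( new Error( "Attempt " + attempts + " failed" ) );

			} else {

				onNext({ url: url, attempt: attempts });

			}

		}
	});

}

// A retry-like helper: on error, re-subscribe to the source until the given
// number of retries has been used up.
function retryLike( source, maxRetries ) {

	return({
		subscribe: function( onNext, onError ) {

			var retriesLeft = maxRetries;

			function attempt() {

				source.subscribe(
					onNext,
					function handleError( error ) {

						if ( retriesLeft-- > 0 ) {

							attempt();

						} else {

							onError( error );

						}

					}
				);

			}

			attempt();

		}
	});

}

var result = null;
var failure = null;

retryLike( flakyGet( "./heartbeat.json" ), 2 ).subscribe(
	function handleNext( response ) { result = response; },
	function handleError( error ) { failure = error; }
);

console.log( attempts, result, failure );
```

With an eager request (like a Promise that is already in flight), there would be nothing to re-run; the laziness is what gives the operator something to repeat.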




Reader Comments

afaik all observables ( Rx ) are lazy, and that's a good thing. You have explicit control over them, which you don't get with Promises for example.

What Rob meant by saying HTTP streams are cold, is imo that they are implemented as "cold observables".

There are hot and cold observables.
A cold observable always starts a new stream of values for each subscriber, while a hot observable hands you whatever is currently being emitted at the moment you subscribe.

Cold - you've started to watch a movie on Netflix or so
Hot - live webinar

anyway, another amazing post Ben. Good job!

thanks


@Martin,

Thank you kind sir! It's really interesting to try and wrap my head around this stuff. It's taking a surprising amount of trial and error and "wait, what?" moments. Especially the concept of hot and cold and how that is perhaps altogether separate from the concept of subscribers.

Take, for example, the EventEmitter in Angular 2. You pass values to it through the .next() method:

new ng.core.EventEmitter().next( "some value" )

As such, can you think of the EventEmitter as hot, since it doesn't control the origin of its own values?

But, on the other hand, I am not sure that it will pass values on down the stream unless it has a subscriber:

new ng.core.EventEmitter().map( mapOperator )

... I don't think will trigger the .map() operator until someone actually subscribes to the EventEmitter instance. So, in that regard, it seems like a "cold" stream.

Very confusing -- until you get used to thinking about it. And of course, I'm just shooting from the hip here based on what I can remember. So, some of what I am rambling about may be way off base. Actually, I should really do some R&D with EventEmitter (beyond what I've done so far).
