Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.
Ben Nadel at the New York ColdFusion User Group (Jul. 2008) with: Michael Dinowitz
Ben Nadel at the New York ColdFusion User Group (Jul. 2008) with: Michael Dinowitz@mdinowitz )

Throwing Errors In The Future Using RxJS In Angular 2 Beta 6

By Ben Nadel on

It's been over a month since I started digging into Angular 2, and I'm only just now starting to play around with data-access layers. Before I started making HTTP requests to a server, however, I wanted to start by returning static data behind simulated network latency. Normally, I could do this quite easily with Promises. But, with Angular 2, the team has ditched Promises in favor of RxJS observables. Trying to simulate network latency with Observables, however, tripped me up immediately as being able to throw an error in the future doesn't appear to be a common part of the observable landscape.


 
 
 

 
 
 
 
 

Run this demo in my JavaScript Demos project on GitHub.

To give you a better context, when I'm experimenting with data-access, I almost never go "over the wire." Instead, I'll use something like the $timeout() service (in Angular 1.x) to simulate the delay that an HTTP request would incur. Sometimes this simulated network latency results in a resolved promise; sometimes it results in a rejected promise:

  • // Angular 1.x promise-base approach.
  •  
  • function getDataSafe() {
  •  
  • // In 1-second, resolve the promise.
  • // --
  • // NOTE: Using $timeout() to simulate network latency.
  • var promise = $timeout( 1000, angular.noop )
  • .then(
  • function handleResolve() {
  •  
  • return( "For the win!" );
  •  
  • }
  • )
  • ;
  •  
  • return( promise );
  •  
  • }
  •  
  • function getDataUnsafe() {
  •  
  • // In 2-seconds, reject the promise.
  • // --
  • // NOTE: Using $timeout() to simulate network latency.
  • var promise = $timeout( 2000, angular.noop )
  • .then(
  • function handleResolve() {
  •  
  • return( $q.reject( "Uh-oh!" ) );
  •  
  • }
  • )
  • ;
  •  
  • return( promise );
  •  
  • }

As you can see, I'm using the $timeout() service to start the promise chain and then I'm either returning a resolved promise or a rejected promise in order to provide the next step in the promise chain.

I have to say, I'm a huge fan of promises. With a very small API (just a couple of methods), you can take control of the asynchronous world of JavaScript development. The RxJS Observables library shipping with Angular 2 Beta 6, in comparison, is massively complex. Promises are like a "hammer". RxJS is like an entire "Home Depot." Sure, there's only so much I can do with a hammer - but it's small in scope and feels like something I can master. RxJS, at least at first glance, feels like it's several orders of magnitude more complex and will clearly require much more time to reach any level of proficiency.

What I thought would be a trivial task - simulating a failed HTTP response - turned out to be something that took me several days to figure out. And, even after days of reading alongside some trial-and-error, I'm still not even sure if I'm doing it "the right way" since there are seemingly so many ways to accomplish the same thing with these observable sequences.

In the following code, I'm doing the same thing - throwing an error in future of an observable stream - using eight different approaches. I start of by initiating an interval stream which logs the time every 1,000 milliseconds. Then, for each of the eight approaches, I'm throwing an error in each subsequent 1,000 millisecond offset.

NOTE: The interval stream is there to give context to the timing and order of the subsequent errors.

  • <!doctype html>
  • <html>
  • <head>
  • <meta charset="utf-8" />
  •  
  • <title>
  • Throwing Errors In The Future Using RxJS In Angular 2 Beta 6
  • </title>
  • </head>
  • <body>
  •  
  • <h1>
  • Throwing Errors In The Future Using RxJS In Angular 2 Beta 6
  • </h1>
  •  
  • <!-- Load demo scripts. -->
  • <script type="text/javascript" src="../../vendor/angularjs-2-beta/6/es6-shim.min.js"></script>
  • <script type="text/javascript" src="../../vendor/angularjs-2-beta/6/Rx.umd.min.js"></script>
  • <script type="text/javascript">
  •  
  • // NOTE: While this demo is not Angular 2 Beta 6 specific, it is using the version
  • // of RxJS that ships with the Angular 2 Betas. As such, I am referring to this
  • // demo as having an Angular 2 Beta 6 context.
  •  
  • // First, we're going to setup an interval that logs the current time out every
  • // second so we can see where our deferred errors fall in the space-time continuum.
  • Rx.Observable
  • .interval( 1000 )
  • .take( 10 )
  • .map(
  • function convertIntervalToTimeString( i ) {
  •  
  • return( new Date().toTimeString() );
  •  
  • }
  • )
  • .map(
  • function stripTimeZone( timeString ) {
  •  
  • return( timeString.split( " " ).shift() );
  •  
  • }
  • )
  • .subscribe(
  • function handleNext( time ) {
  •  
  • console.log( time );
  •  
  • }
  • )
  • ;
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // First approach: using the sequence factory function.
  • var errorStream = Rx.Observable.create(
  • function streamFactory( observer ) {
  •  
  • var timer = setTimeout(
  • function deferError() {
  •  
  • observer.error( new Error( "ApproachOneError" ) );
  •  
  • },
  • 2500
  • );
  •  
  • // Return the teardown method for the stream's unsubscribe action.
  • return(
  • function teardown() {
  •  
  • clearTimeout( timer );
  •  
  • }
  • );
  •  
  • }
  • );
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach One - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach One - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach One - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Second approach: mapping timer onto an error stream.
  • var errorStream = Rx.Observable
  • .timer( 3500 )
  • .flatMap(
  • function mapTimerToStream() {
  •  
  • return( Rx.Observable.throw( new Error( "ApproachTwoError" ) ) );
  •  
  • }
  • )
  • ;
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Two - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Two - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Two - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Approach three: Explicitly throwing an error during mapping.
  • var errorStream = Rx.Observable
  • .timer( 4500 )
  • .map(
  • function mapTimerToValue( i ) {
  •  
  • throw( new Error( "ApproachThreeError" ) );
  •  
  • }
  • )
  • ;
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Three - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Three - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Three - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Approach four: Implementing a custom stream creation method. Here, we're
  • // monkey-patching the core Rx.Observable library to expose a .throwLater()
  • // function which will take an error and a dueTime duration. This basically
  • // uses the same approach as above but encapsulates the implementation.
  • Rx.Observable.throwLater = function( error, dueTime ) {
  •  
  • var stream = Rx.Observable
  • .timer( dueTime )
  • .flatMap(
  • function mapTimerToStream() {
  •  
  • return( Rx.Observable.throw( error ) );
  •  
  • }
  • )
  • ;
  •  
  • return( stream );
  •  
  • };
  •  
  • // Consume the newly-exposed method to throw a future error.
  • var errorStream = Rx.Observable
  • .throwLater( new Error( "ApproachFourError" ), 5500 )
  • ;
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Four - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Four - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Four - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Approach five: concatenating two streams.
  • var errorStream = Rx.Observable
  • .timer( 6500 )
  • .concat( Rx.Observable.throw( new Error( "ApproachFiveError" ) ) )
  •  
  • // At this point, the stream will actually contains two values - the timer,
  • // which would trigger the .handleNext() callback, and the error value,
  • // which would trigger the .handleError() callback. We ONLY CARE about the
  • // error, so we're going to buffer values and just consume the last one,
  • // which is the error.
  • // --
  • // NOTE: We're not actually "waiting" until the last value - the error will
  • // be thrown the moment it is encountered. So, in this case, the concept of
  • // "last" is really just "not first."
  • .last()
  • ;
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Five - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Five - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Five - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Approach six: concatenating two streams.
  • var errorStream = Rx.Observable
  • .timer( 7500 )
  • .concat( Rx.Observable.throw( new Error( "ApproachSixError" ) ) )
  •  
  • // In this case, we're basically using the same approach as the .last()
  • // approach in the previous version. This time, however, we're relying on
  • // the fact that errors are greedily consumed in a buffer. So, while our
  • // error is in a buffer that already has a value (the timer index), the
  • // error, when encountered, will supersede the existing buffer content.
  • .bufferCount( 2 )
  • ;
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Six - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Six - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Six - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Approach seven: Creating a HOT stream.
  • (function isolateVariables() {
  •  
  • // This approach is no different than one above. Only, this time, we're
  • // creating a "Hot" stream instead of a "Cold" stream. This means that the
  • // stream will start emitting events before anyone has subscribed to it.
  • // Essentially, this means our timer will start immediately.
  • var errorStream = Rx.Observable
  • .timer( 8500 )
  • .concat( Rx.Observable.throw( new Error( "ApproachSevenError" ) ) )
  • .last()
  •  
  • // This creates a common "proxy" subscriber that will emit the same
  • // event instances to all subscribers.
  • .publish()
  • ;
  •  
  • // This connects the common proxy subscriber to the underlying stream,
  • // basically telling the underlying stream to start emitting events. The
  • // stream is now "Hot" and is producing events that future subscribers
  • // may never see (depending on when they subscribe).
  • errorStream.connect();
  •  
  • // To make sure that the underlying stream is actually producing events
  • // before we subscribe to it, I'm not actually going to subscribe to the
  • // proxy until a few milliseconds before the error is thrown. If the
  • // underlying stream is already producing events, the error will be thrown
  • // shortly after this.
  • setTimeout(
  • function setupSubscription() {
  •  
  • console.info( "Just subscribed approach seven to HOT stream." );
  • errorStream.subscribe( observer );
  •  
  • },
  • 8300 // Error will be thrown 200ms later.
  • );
  •  
  • var observer = Rx.Subscriber.create(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Seven - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Seven - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Seven - complete!" );
  •  
  • }
  • );
  •  
  • })();
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // Approach eight: Creating a custom operator. In this approach, we're creating
  • // a custom operator that exposes a .throw() operator on an existing stream.
  • // --
  • // CAUTION: I don't really understand this just yet - I'm basically copying the
  • // workflow outlined here:
  • // https://xgrommx.github.io/rx-book/content/guidelines/implementations/index.html
  • Rx.Observable.prototype.throw = function( error ) {
  •  
  • return( Rx.Observable.create( streamFactory.bind( this ) ) );
  •  
  • function streamFactory( observer ) {
  •  
  • // Any stream my emit zero or more "values" followed by a "complete"
  • // event. As such, we have to watch for two different use-cases: a
  • // populated set and an empty set. In cases where we have an empty
  • // set, we need to throw the error in the "complete" callback rather
  • // than the "next" callback.
  • var errorThrown = false;
  •  
  • var teardown = this.subscribe(
  • function handleNext( value ) {
  •  
  • errorThrown = true;
  • observer.error( error );
  •  
  • },
  • observer.error.bind( observer ),
  • function hanldeComplete() {
  •  
  • if ( ! errorThrown ) {
  •  
  • observer.error( error );
  •  
  • }
  •  
  • }
  • );
  •  
  • return( teardown );
  •  
  • }
  •  
  • };
  •  
  • // To me, this is the most natural way to think about this.
  • var errorStream = Rx.Observable
  • .timer( 9500 )
  • .throw( new Error( "ApproachEightError" ) )
  • ;
  •  
  • // NOTE: Stream is "Cold," meaning it won't start emitting events until it
  • // has a subscriber; and, each subscriber receives a unique set of events.
  • errorStream.subscribe(
  • function handleNext( value ) {
  •  
  • console.log( "Approach Eight - next:", value );
  •  
  • },
  • function handleError( error ) {
  •  
  • console.error( "Approach Eight - error:", error );
  •  
  • },
  • function handleComplete() {
  •  
  • console.log( "Approach Eight - complete!" );
  •  
  • }
  • );
  •  
  •  
  • // --------------------------------------------------------------------------- //
  • // --------------------------------------------------------------------------- //
  •  
  •  
  • // At this point, all of the event streams have been bound and will emit error
  • // events at some point in the near future.
  • console.log( "Main event loop has finished executing." );
  •  
  • </script>
  •  
  • </body>
  • </html>

Like I said, there's a boat-load of ways to accomplish the same thing. Of all of these approaches, however, monkey-patching the Observable.prototype to include a .throw() instance method feels like the cleanest and most natural way to think about. That said, these all accomplish roughly the same thing (Hot vs. Cold concepts aside). And, when we run the above code, we get the following output:


 
 
 

 
 Throwing errors in the future using RxJS in Angular 2 Beta 6. 
 
 
 

As you can see, each of the eight approaches triggered an error event at some point in the future after the stream was defined.

As someone who is basically on day-3 of leaning RxJS, I am trying very hard to reserve judgement. That said, I can't help but already miss the comparative simplicity of Promises. Sure, I understand that some tasks with Promises are not as obvious - like canceling a promise or ignoring a result. But, the complexity of some tasks are outweighed, in my eyes, by the simplicity of the API. That said, the Angular 2 team has gone whole-hog into Observables; so, I'll definitely do my best to reseve judgement, dig in, and see what all the excitement is about!




Reader Comments

Yes, Observables are more complex, but also more efficient and gøactually make your components much simpler

The fact that information is pushed to subscribers, rather than components pulling for data is mindblowing at first.

But once you 'get' it, it's so much more elegant and fast.

If your subscribing component only listens to observables Angular2 can optimize change detection a lot.

Also, you can always use the .toPromise() method and you have yourself a good ol promise

Reply to this Comment

Rather than monkey patching (which, while workable, is a bit sketchy with Typescript!), the best advice I can give is think in terms of transforms.

By far the most powerful operator in Rx is flatMap - it does all sorts of magical things.

A simple example of an Observable that starts with a value, delays 5 seconds, and then *flatMaps* the stream into an error:
https://jsfiddle.net/robwormald/z5xm3yz8/

To make this a bit more re-useable, try out of ridiculously-simple-yet-powerful .let() operator. It simply takes a function which is called with the source stream, and you return a newly transformed stream. This is in the latest version of Rx (beta2) which is available in Angular2 beta7.

https://jsfiddle.net/robwormald/jxyxf37f/

Reply to this Comment

Another option:

Rx.Observable.never()
.timeout(5000, Rx.Observable.throw('error'))

(And while i've been throwing strings, you could of course throw an error object as you did in your versions)

Reply to this Comment

@Nick,

Ha ha, even more approaches! I didn't even notice the .timeout() operator when I was looking through the documentation. Great stuff!

Reply to this Comment

@Rob,

.flatMap() is pretty cool - it's actually the approach I'm using in "Four" in the above demo.

The .let() stuff looks very interesting. If I'm understanding it correctly, it basically gives you a way to implement a custom operator as factory function for the observable binding (mostly just a cup full of keywords there ;)). I'll have to dig into that a bit more. Thanks!

Reply to this Comment

@Lars,

Right now, the biggest selling point for me, in terms of Streams, is the ability to cancel a stream (ie, dispose / unsubscribe). This is something that will be hugely useful when tearing down Components that have pending AJAX requests or other timers. You *could* do this with Promises; but, you end up having to guard against callbacks since you can't really cancel a promise - you have to just "disregard" the callback logic. It was a pain.

As far as the change-detection, I like the idea, but as with Immutable data, I am not sure I want to use it all the time to deal with performance edge-cases. And, I think you can use OnPush change detection to perhaps get many of the same benefits? Am not sure - haven't really dug into change detection at that level yet.

Reply to this Comment

@Julia,

Yes, I think the delay() will work *as long as* the throwing of an error is in the .flatMap() method.

Reply to this Comment

Post A Comment

You — Get Out Of My Dreams, Get Into My Comments
Live in the Now
Oops!
Comment Etiquette: Please do not post spam. Please keep the comments on-topic. Please do not post unrelated questions or large chunks of code. And, above all, please be nice to each other - we're trying to have a good conversation here.