Ben Nadel at CFUNITED 2009 (Lansdowne, VA) with: Kim

RxJS Streams Are Inconsistently Asynchronous In Angular 2 Beta 6

By
Published in Comments (15)

Now that I'm starting to dig into the RxJS Observable sequences in Angular 2, I'm trying to translate my Promise-based mental model into a Streams-based mental model. And, as I was trying to figure out how to throw errors in the future using RxJS, I noticed a behavior in RxJS streams that is fundamentally different from Promises: whereas promises are always asynchronous, RxJS streams are inconsistently asynchronous.

Run this demo in my JavaScript Demos project on GitHub.

To demonstrate what I mean, all we have to do is create two RxJS streams, one that uses a .delay() operator and one that does not. Then, we can log the timing of the subscription-based event handlers in the context of the overall page load:

<!doctype html>
<html>
<head>
	<meta charset="utf-8" />

	<title>
		RxJS Streams Are Inconsistently Asynchronous In Angular 2 Beta 6
	</title>
</head>
<body>

	<h1>
		RxJS Streams Are Inconsistently Asynchronous In Angular 2 Beta 6
	</h1>

	<!-- Load demo scripts. -->
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/es6-shim.min.js"></script>
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/Rx.umd.min.js"></script>
	<script type="text/javascript">

		// NOTE: While this demo is not Angular 2 Beta 6 specific, it is using the version
		// of RxJS that ships with the Angular 2 Betas. As such, I am referring to this
		// demo as having an Angular 2 Beta 6 context.

		// Here, we are subscribing to two different RxJS streams and simply logging the
		// observed values in the context of the general execution of the page to get a
		// sense of where, in time, sequences emit events.

		console.warn( "RxJS - Script - Begin." );

		getStreamA().subscribe(
			function handleNext( value ) {

				console.info( "Stream A value:", value );

			}
		);

		console.log( "In between A and B." );

		getStreamB().subscribe(
			function handleNext( value ) {

				console.info( "Stream B value:", value );

			}
		);

		console.warn( "RxJS - Script - End." );


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// I return an RxJS observable sequence that happens to be synchronous.
		function getStreamA() {

			return( Rx.Observable.of( "stream" ) );

		}


		// I return an RxJS observable sequence that happens to include a delay.
		function getStreamB() {

			return( Rx.Observable.of( "stream" ).delay( 10 ) );

		}

	</script>

</body>
</html>

As you can see, we're subscribing to the RxJS streams immediately after they are created (as one would typically do in an application). And, when we run the above code, we get the following output:

RxJS streams are inconsistently asynchronous, depending on the underlying stream implementation, in Angular 2 Beta 6.

Clearly, Stream A started emitting values immediately (i.e., synchronously) after the calling context subscribed to the stream. This is why we can log the Stream A value before we log the end of the page load.
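To get a feel for why Stream A fires right away, here is a minimal sketch that uses no RxJS at all. The `createStream()` helper is hypothetical (it is not the RxJS API); it only assumes that subscribing runs the producer function directly. Under that assumption, a producer that calls `next()` inline emits before `subscribe()` returns, while one that wraps the call in `setTimeout()` emits later:

```javascript
// Hypothetical, stripped-down stand-in for an observable: subscribing simply
// runs the producer function with the subscriber's next() callback.
function createStream( producer ) {

	return({
		subscribe: function( next ) {
			producer( next );
		}
	});

}

var log = [];

// Emits inline, during the subscribe() call (like Stream A).
var streamA = createStream(
	function( next ) {
		next( "stream" );
	}
);

// Emits from a timer, after the current call stack clears (like Stream B).
var streamB = createStream(
	function( next ) {
		setTimeout( function() { next( "stream" ); }, 10 );
	}
);

log.push( "Begin" );
streamA.subscribe( function( value ) { log.push( "A: " + value ); } );
streamB.subscribe( function( value ) { log.push( "B: " + value ); } );
log.push( "End" );

// At this point only Stream A has emitted.
console.log( log.join( " | " ) ); // Begin | A: stream | End
```

The "B" entry only shows up in the log once the 10ms timer fires, well after "End" has been recorded.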

Let's quickly compare this to Promises. In the following code, I'm attempting to recreate the same workflow using two Promises instead of RxJS streams:

<!doctype html>
<html ng-app="Demo">
<head>
	<meta charset="utf-8" />

	<title>
		Promises Are Always Asynchronous In Angular 1.x
	</title>
</head>
<body>

	<h1>
		Promises Are Always Asynchronous In Angular 1.x
	</h1>

	<!-- Load scripts. -->
	<script type="text/javascript" src="../../vendor/angularjs/angular-1.4.7.js"></script>
	<script type="text/javascript">

		angular.module( "Demo", [] ).run(
			function testPromises( $timeout, $q ) {

				// Here, we are binding to two different Promise chains and simply
				// logging the resolved values in the context of the general execution
				// of the page to get a sense of where, in time, promises emit values.

				console.warn( "Promise - Script - Begin." );

				getPromiseA().then(
					function handleResolve( value ) {

						console.info( "Promise A value:", value );

					}
				);

				console.log( "In between A and B." );

				getPromiseB().then(
					function handleResolve( value ) {

						console.info( "Promise B value:", value );

					}
				);

				console.warn( "Promise - Script - End." );


				// ------------------------------------------------------------------- //
				// ------------------------------------------------------------------- //


				// I return a Promise that resolves immediately.
				function getPromiseA() {

					return( $q.when( "promise" ) );

				}


				// I return a Promise that happens to include a delay before resolving.
				function getPromiseB() {

					var promise = $timeout( angular.noop, 10 )
						.then(
							function handleResolve() {

								return( "promise" );

							}
						)
					;

					return( promise );

				}

			}
		);

	</script>

</body>
</html>

As you can see, one function returns an already-resolved promise whereas the other function returns a promise that is delayed by a timer. And yet, when we run the above code, we can see that both promise subscriptions are invoked asynchronously, after the page has finished loading:

Promises are always asynchronous, regardless of the promise chain implementation.
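The same behavior can be sketched without AngularJS. This version swaps `$q` for the native `Promise` (an assumption on my part, since the demo above uses `$q`), but the ordering is the point: even an already-resolved promise queues its handler instead of running it inline.

```javascript
var log = [];

log.push( "Begin" );

// Already-resolved promise: the handler is still queued, not run inline.
Promise.resolve( "promise" ).then(
	function handleResolve( value ) {

		log.push( "A: " + value );
		console.log( log.join( " | " ) ); // Begin | End | A: promise

	}
);

log.push( "End" );

// The handler above has not run yet when this line executes.
console.log( log.join( " | " ) ); // Begin | End
```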

Now, I'm not saying that Promises are "right" and RxJS streams are "wrong." I'm just trying to build up a mental model for RxJS streams. That said, I do think that it is nice that you can always depend on Promises to be asynchronous - it means that your calling code never has to wonder about the underlying implementation of the Promise chain. With RxJS streams, on the other hand, your calling context may very well have to guard against the inconsistently asynchronous nature of observable sequences.


Reader Comments

38 Comments

If you move the .delay(10) to the getStreamA() in the first example, do you get the same console output? Line 59 instead of 67.

15,810 Comments

@Will,

If I *move* (ie, not copy) the .delay() from StreamB to StreamA, I get the following output:

> RxJS - Script - Begin.
> In between A and B.
> Stream B value: stream
> RxJS - Script - End.
> Stream A value: stream

Here, StreamB becomes *synchronous*, emitting a value immediately, before the page has finished loading. And, StreamA has become *asynchronous*, emitting a value after the page has finished loading.

1 Comments

@Simon,

Thanks, that's good to know.

@Ben,

If you change the getStreamA() method to the following, then it'll behave just like a promise:

function getStreamA() {
return( Rx.Observable.of( "stream" ).observeOn(Rx.Scheduler.default) );
}

By default, Rx.Scheduler.immediate is used, which makes the observable emit immediately (i.e. synchronously), as far as I understand.

15,810 Comments

@Eugene, @Simon,

Very interesting stuff. I tried to look into Schedulers a little when I was seeing if I could get them to work with the .throw() streams - the method signature looks like it accepts a scheduler as an optional argument. But, honestly, it went way over my head :D I'll have to circle back, now that my mental model is a little bit stronger.

That said, all of these approaches still rely on the implementation to be a "known quantity" - an agreed-upon contract, so to speak, between the code consuming the stream and the code producing the stream. That's the nice thing about *Promises* - the contract is implicit in the data-type. Meaning, there's no way to take a promise and *make it synchronous* - it's always async.

3 Comments

@Ben,

I think the big difference is that Promises only resolve once, so you don't have a big trade-off if they are asynchronous. Observable streams, on the other hand, can emit multiple times. This means that if you have an array with 1000 entries and emit each value within an Observable stream, it would take about 4 seconds to complete with the asynchronous approach (assuming setTimeout(function() {}, 0) takes 4ms). Most likely that wouldn't be what the developer wants.
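The estimate above is plain arithmetic. As a rough sketch, it assumes each value waits on its own timer tick, one after another, and takes the 4ms figure from the comment as given:

```javascript
// Assumed figures from the comment above: 1,000 values, roughly 4ms per timer tick.
var valueCount = 1000;
var timerDelayMs = 4;

// One timer tick per value, run back to back: the delays add up.
var totalMs = ( valueCount * timerDelayMs );

console.log( ( totalMs / 1000 ) + " seconds" ); // 4 seconds
```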

2 Comments

This comes up a lot. I think on the surface, it can feel strange that you aren't able to know whether an Observable is sync or not.

The Rx answer to that concern is generally that, done properly, it shouldn't really matter. Using Observables, you typically do everything "in band":

someObservable
.someOperator()
.someOperator()
.someOperator()
.subscribe(v => { /* do side effect */ });

In an angular2 app, you can take this even further - using the async pipe you need not subscribe at all, and the whole application's data flow happens "in band"

The other thing to mention is *generally* Observables will be async - Observables from arrays are typically not all that common (and generally better handled as vanilla arrays...)

15,810 Comments

@Simon,

Sorry, to clarify, I only mean that the initial emit should be async. Once the async workflow has been initiated, the rest of the events can be done in a synchronous fashion. I *believe* this is what AngularJS 1.x does to some degree. The promise chain is initiated async, but then it doesn't actually kick off a $digest for each .then()'able link - only after the last one... I think. It's been a while since I've actually looked at the internals.
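To make the "async start, synchronous rest" idea concrete, here is a small sketch. The `emitAfterFirstTick()` helper is hypothetical, not something RxJS or AngularJS provides; it defers only the start of the emission and then delivers every value in one pass:

```javascript
// Hypothetical helper: defer only the start of the emission, then deliver
// every value synchronously within that single deferred tick.
function emitAfterFirstTick( values, next ) {

	setTimeout(
		function() {
			values.forEach( function( value ) { next( value ); } );
		},
		0
	);

}

var received = [];

emitAfterFirstTick(
	[ "a", "b", "c" ],
	function( value ) {
		received.push( value );
	}
);

// Nothing has been delivered yet; all three values arrive together on the next tick.
console.log( received.length ); // 0
```

With this shape, the calling code can always count on the first value arriving after the current call stack clears, without paying for one timer per value.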

15,810 Comments

@Rob,

I tend to agree - I think it would rarely ever be a problem. And, I will certainly concede that most data-access methods are asynchronous anyway (usually because they are built on top of some underlying AJAX request). I'm definitely at a loss for a concrete example.

So much to learn!

3 Comments

@Ben,

I think which behavior you want really depends on what you are building with RxJS (which is the reason there are different Schedulers for different use-cases). Making only the first emit async and the rest synchronous may work in some cases, but not in all. For example, if you are writing an EventEmitter with RxJS, it wouldn't feel right if only the first emit() call is async and the rest are not.

2 Comments

Yes, this behavior is by design.

> Very interesting stuff. I tried to look into Schedulers a little when I was seeing if I could get them to work with the .throw() streams - the method signature looks like it accepts a scheduler as an optional argument. But, honestly, it went way over my head :D I'll have to circle back, now that my mental model is a little bit stronger.

Here's the idea behind schedulers: say you're going to call a function, and that function does something asynchronously via `setTimeout`. How would you unit test that function (without overriding global `setTimeout`), or otherwise change the asynchronous behavior (for example, if you need to render synchronously on the server)? The easiest way is to change the function signature to accept a `setTimeout` implementation.

Schedulers are just a generic interface for running a callback at some point in the future, just like setTimeout. And the operators in Rx that have the option of doing something in the future typically accept a Scheduler, so you can control how and when that operator's future things run.
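As a rough illustration of that idea (not the actual RxJS Scheduler interface), here is a sketch in which a function takes its "setTimeout implementation" as an argument. The names `greetLater()` and `runImmediately()` are made up for this example:

```javascript
// Hypothetical function: instead of calling the global setTimeout() directly,
// it accepts any function with the same ( callback, delay ) shape.
function greetLater( name, callback, schedule ) {

	schedule(
		function() {
			callback( "Hello, " + name );
		},
		10
	);

}

// A stand-in "scheduler" for tests or server rendering: runs the callback inline
// and ignores the delay.
function runImmediately( callback ) {

	callback();

}

var results = [];

// Production-style call: the real timer decides when the callback runs.
greetLater( "async", function( message ) { results.push( message ); }, setTimeout );

// Test-style call: same function, but the work completes before the next line.
greetLater( "sync", function( message ) { results.push( message ); }, runImmediately );

// Only the inline-scheduled greeting has arrived so far.
console.log( results ); // [ 'Hello, sync' ]
```

The function under test never changes; only the thing that decides "when" is swapped out, which is the role a Scheduler plays for Rx operators.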

> That said, all of these approaches still rely on the implementation to be a "known quantity" - an agreed-upon contract, so to speak, between the code consuming the stream and the code producing the stream.

That's one approach, but a better approach is to not make assumptions about whether the Observable will emit synchronously or asynchronously. Rather than modifying outside state from within your Observable handlers, you should put all the logic that depends on an event *inside* the functions you pass to `flatMap`, `subscribe`, etc. When you do that, it doesn't matter whether the Observable emits synchronously or asynchronously; your code is correct either way!
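A small sketch of the difference, using hypothetical callback-based sources rather than real Observables: `readOutside()` copies the value into outside state and reads it right after subscribing, while `readInside()` keeps the dependent logic in the handler.

```javascript
// Hypothetical sources, not RxJS: one calls back inline, one calls back from a timer.
function syncSource( next ) {
	next( 21 );
}

function asyncSource( next ) {
	setTimeout( function() { next( 21 ); }, 0 );
}

// Fragile: copies the value into outside state and reads it after subscribing.
function readOutside( source ) {

	var latest;
	source( function( value ) { latest = value; } );

	return( ( latest === undefined ) ? "no value yet" : ( latest * 2 ) );

}

// Robust: everything that depends on the value lives inside the handler.
function readInside( source, done ) {

	source( function( value ) { done( value * 2 ); } );

}

console.log( readOutside( syncSource ) );  // 42
console.log( readOutside( asyncSource ) ); // no value yet

var insideResults = [];
readInside( syncSource, function( doubled ) { insideResults.push( doubled ); } );
readInside( asyncSource, function( doubled ) { insideResults.push( doubled ); } );
```

`readOutside()` gives a different answer depending on the timing of the source, whereas `readInside()` ends up pushing 42 for both sources; only the moment of delivery differs.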

2 Comments

I would also encourage you to take a look at the decision trees for when you should use the various operators.

Operators for creating new Observables:
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/which-static.md

Operators for working with an existing Observable:
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/which-instance.md

For example, if you want to ensure your Observable dispatches asynchronously, that's just an operator away:

```
var Rx = require("rxjs");
var Observable = Rx.Observable;
var Scheduler = Rx.Scheduler;

Observable.of("a", "b", "c")
.observeOn(Scheduler.async)
.subscribe((val) => console.log(`observable ${val}`));

console.log('hey, this prints first!');

```

15,810 Comments

@Ptaylor,

Awesome stuff, thanks! When you explain schedulers in that way, it makes a lot more sense. I still don't know the technical details (like how to implement / select a scheduler). But, now that I know the basic concept, it should make the documentation more clear.

As far as the async vs. sync, I agree that the logic should all be inside the subscriber. But, I guess my only real concern is that the logic may rely on some sort of service or value that is not initialized at the time of the operator invocation. For example, imagine a trite scenario like this:

```
getValues().filter(
	function( value ) {
		return( _.indexOf( blackListedValues, value ) !== -1 );
	}
);

var blackListedValues = [ ..... ];
```

In this case, if getValues() happens to return synchronously, the code will not work "as expected" since `blackListedValues` will be `undefined` at the time the .filter() operator fires.

Now, if getValues() is always synchronous, you can solve this by just rearranging the order of expressions. But, imagine that getValues() *used to be* asynchronous, and this code worked fine. Then, someone updated the getValues() stream to pipe-in cache data, if it were available, and then concat'ed the async values. Then, suddenly, this code would start breaking since the cache data would create a synchronous first value.
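Here is a rough, RxJS-free sketch of that scenario. The `getValues()` function below is a hypothetical callback-based stand-in for the stream, and it uses the native `Array.prototype.indexOf()` instead of lodash, so the early call fails loudly:

```javascript
// Hypothetical stream that used to be fully async, then gained a synchronous
// cached value up front.
function getValues( next ) {

	next( "cached" ); // Newly added: emits inline, during the call.
	setTimeout( function() { next( "fresh" ); }, 0 ); // Original: emits later.

}

var kept = [];
var failed = [];

getValues(
	function( value ) {

		try {

			// blackListedValues is hoisted, but still undefined on the inline call.
			if ( blackListedValues.indexOf( value ) === -1 ) {
				kept.push( value );
			}

		} catch ( error ) {

			failed.push( value );

		}

	}
);

var blackListedValues = [ "spam" ];

// The cached value hit the handler before the blacklist was assigned.
console.log( failed ); // [ 'cached' ]
```

The "fresh" value arrives after `blackListedValues` has been assigned, so it is handled without error; only the newly synchronous first value breaks.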

Of course, you could poke a lot of holes in this:

* Maybe it's just poor design to begin with (having a value defined "after" it is referenced).
* Maybe the use of cache data was put in the wrong place - ie, not the responsibility of the stream.
* Maybe the stream "broke the original contract" of the async stream.

I think it's just something that people have to have in the back of their mind: while the "logic" may be contained entirely in the operators, that logic may depend on values that are not yet defined based on the timing.

2 Comments

Interesting. True, promises have an easily predictable interface; but in some situations that is actually a limitation.

With the Rx approach there is **much more** flexibility, which imho is a good thing. It is not only sync vs async, but also a built-in cancellation mechanism, handling of multiple values, and a bunch of really nice operators.

The ultimate beauty and power of RxJS is when you start designing the whole logic / data-flows around it.

The only benefits of promises over observables I can think of are:
1. they are native objects (observables should follow shortly)
2. the learning effort is lower

15,810 Comments

@Artur,

I don't disagree with what you are saying. In the last week or two I've been really trying to wrap my head around RxJS observable sequences. And, all the stuff that I am seeing is cool. So far, these are some of my favorites, some of which you mentioned:

* Being able to cancel the stream.
* Unhandled errors are rethrown (so they don't get lost).
* All the filtering, mapping, and transforming goodness.

That said, I still have soooo much to learn.

Ben Nadel