Skip to main content
Ben Nadel at the New York ColdFusion User Group (May. 2009) with: Gert Franz and Peter Bell and Mark Drew
Ben Nadel at the New York ColdFusion User Group (May. 2009) with: Gert Franz ( @gert_railo ) Peter Bell ( @peterbell ) Mark Drew ( @markdrew )

Ward Bell: Do Not Expect EventEmitter To Be Observable In Angular 2

By on

Last week, I blogged about how the EventEmitter class, in Angular 2, extends the RxJS Subject class and, therefore, the RxJS Observer and Observable classes as well. With this realization, I then demonstrated that you could use the EventEmitter to coordinate error logging to the server. In the comments of my first post, however, Ward Bell warned that we should not assume that the EventEmitter is a Subject and that this behavior would likely be removed before the final Angular 2 release:

Do NOT count on EventEmitter continuing to be an Observable! Do NOT count on those Observable operators being there in the future! These will be deprecated soon and probably removed before release. Use EventEmitter only for event binding between a child and parent component. Do not subscribe to it. Do not call any of those methods. Only call eve.emit().

In another comment, he recommended using the RxJS Subject class instead. So, as an experiment, I wanted to revisit the idea of error logging, this time using the RxJS Subject class instead of the EventEmitter as the coordinating stream.

Run this demo in my JavaScript Demos project on GitHub.

This is an experiment. I am not necessarily recommending this level of complexity when it comes to error logging. But, I did want to try and build something non-trivial that dealt with errors and RxJS Subjects and observable streams. In this case, rather than having the ExceptionHandler log the errors to the server (as I did in my previous post), the ExceptionHandler implementation is going to expose an RxJS stream of errors. Then, another class - ErrorLogger - is going to subscribe to those errors and log them to the server:

Using the RxJS Subject class to coordinate class communication in Angular 2.

Internally, the ExceptionHandler implementation is going to maintain two different RxJS Subjects: errors and stringifiedErrors. Each of these will emit error events; but, the "stringified" version will emit the errors in String format. Now, we don't want to expose the Subjects directly because that would allow consumers to tinker with the Observer portion of the API. As such, when the ExceptionHandler exposes them, it does so by wrapping them in another RxJS stream which only exposes the Observable interface.

The ErrorLogger class will then require the ExceptionHandler as an injectable, where it will merge the stringifiedErrors stream with its own internal error stream before it logs the errors to the server. This way, the ErrorLogger class can be consumed explicitly by other areas of the application as well as implicitly by way of the ExceptionHandler and uncaught exceptions.

<!doctype html>
<html>
<head>
	<meta charset="utf-8" />

	<title>
		Ward Bell: Do Not Expect EventEmitter To Be Observable In Angular 2
	</title>

	<link rel="stylesheet" type="text/css" href="./demo.css"></link>
</head>
<body>

	<h1>
		Ward Bell: Do Not Expect EventEmitter To Be Observable In Angular 2
	</h1>

	<my-app>
		Loading...
	</my-app>

	<!-- Load demo scripts. -->
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/es6-shim.min.js"></script>
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/Rx.umd.min.js"></script>
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/angular2-polyfills.min.js"></script>
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/angular2-all.umd.js"></script>
	<!-- AlmondJS - minimal implementation of RequireJS. -->
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/almond.js"></script>
	<script type="text/javascript">

		// Defer bootstrapping until all of the components have been declared.
		// --
		// NOTE: Not all components have to be required here since they will be
		// implicitly required by other components.
		requirejs(
			[ /* Using require() for better readability. */ ],
			function run() {

				var App = require( "App" );
				var EmittingExceptionHandler = require( "EmittingExceptionHandler" );
				var ErrorLogger = require( "ErrorLogger" );

				ng.platform.browser.bootstrap(
					App,
					[
						// Here, we are overriding the core implementation of the
						// ExceptionHandler service with our own implementation. In this
						// case, we have to override the core implementation rather than
						// using some sort of class composition because we need AngularJS
						// to pick up our version of the ExceptionHandler service and use
						// it internally within its own Zone.js error handling.
						ng.core.provide(
							ng.core.ExceptionHandler,
							{
								useClass: EmittingExceptionHandler
							}
						),

						// Provide our error logger class token.
						ErrorLogger
					]
				);

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// I provide the root App component.
		define(
			"App",
			function registerApp() {

				var ErrorLogger = require( "ErrorLogger" );

				// Configure the App component definition.
				ng.core
					.Component({
						selector: "my-app",
						template:
						`
							<p>
								<a (click)="triggerError( 1 )">Trigger Error One</a>
								&mdash;
								<a (click)="triggerError( 2 )">Trigger Error Two</a>
								&mdash;
								<a (click)="triggerError( 3 )">Trigger Error Three</a>
							</p>
						`
					})
					.Class({
						constructor: AppController
					})
				;

				AppController.parameters = [
					new ng.core.Inject( ErrorLogger )
				];

				return( AppController );


				// I control the App component.
				function AppController( errorLogger ) {

					var vm = this;

					// Expose the public methods.
					vm.triggerError = triggerError;


					// ---
					// PUBLIC METHODS.
					// ---


					// I trigger the given type of error.
					function triggerError( which ) {

						// In this demo, we are using two different types of errors so
						// that we can see how subsequent errors of the same type are
						// consumed by both the custom ExceptionHandler implementation
						// and the EventLogger, which depends on its own internal RxJS
						// stream configuration.
						if ( which === 1 ) {

							var x = y; // Undefined y reference.

						} else if ( which === 2 ) {

							var foo = bar; // Undefined bar reference.

						} else {

							// For this error, rather than letting the error bubble up,
							// we're going to catch it and log it explicitly to the
							// ErrorLogger instance (bypassing ExceptionHandler).
							try {

								var hello = world; // Undefined world reference.

							} catch ( error ) {

								errorLogger.logError( error );

							}

						}

					}

				}

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// I provide a custom ExceptionHandler that emits errors in addition to logging
		// them to the console.
		define(
			"EmittingExceptionHandler",
			function registerEmittingExceptionHandler() {

				return( EmittingExceptionHandler );


				// I provide a custom implementation of the ExceptionHandler service
				// that exposes an error stream that other services can subscribe to
				// (via the public property).
				function EmittingExceptionHandler() {

					// As the application tells the ExceptionHandler about the errors,
					// we are going to emit both the raw errors and the stringified
					// errors on the public streams so that other services can consume
					// the errors as well.
					var errors = new Rx.Subject();
					var stringifiedErrors = new Rx.Subject();

					// Return the public API.
					// --
					// NOTE: Instead of exposing the error streams directly, I am exposing
					// new streams that are based on the error streams. This will hide the
					// "Observer" portion of the "Subject" class and expose only the
					// "Observable" portion of the API (at least, I think so).
					return({
						call: call,
						errors: Rx.Observable.from( errors ),
						stringifiedErrors: Rx.Observable.from( stringifiedErrors )
					});


					// ---
					// PUBLIC METHODS.
					// ---


					// I handle the given exception data.
					function call( exception, stackTrace, reason ) {

						// While we are overriding the ExceptionHandler service in the
						// context of the local provider chain, we can still leverage
						// the STATIC METHODS of the core ExceptionHandler. And, in this
						// case, the core ExceptionHandler provides a convenience method
						// for converting error data into a normalized string. Since this
						// can be a complicated process, we might as well hand it off to
						// the method that already knows how to get it done right.
						var stringified = ng.core.ExceptionHandler.exceptionToString( exception, stackTrace, reason );

						// If the console is available, log the error to the console.
						if ( window.console ) {

							console.error( stringified );

						}

						// Pass the error versions off to the streams where they can be
						// consumed by the subscribers.
						errors.next( exception );
						stringifiedErrors.next( stringified );

					}

				}

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// I provide an error logger.
		define(
			"ErrorLogger",
			function registerErrorLogger() {

				ErrorLogger.parameters = [
					new ng.core.Inject( ng.core.ExceptionHandler )
				];

				return( ErrorLogger );


				// I provide error logging that will log errors to the server (simulated
				// in this case). Errors can either be explicitly passed in via the
				// .logError() method or implicitly via the ExceptionHandler error event
				// stream.
				function ErrorLogger( exceptionHandler ) {

					// Since we are going to log errors from two different sources - the
					// explicit logError() calls and the implicit ExceptionHandler error
					// stream - we need a way to merge the two streams together so that
					// we can treat them uniformly. This error stream will act as our
					// internal entry point, into which we will merge the ExceptionHandler
					// error stream.
					var errorStream = new Rx.Subject();

					// Merge in the ExceptionHandler stringified error stream and log the
					// resultant errors to the server (simulated).
					// --
					// NOTE: We are using the .distinctUntilChanged() operator in order
					// to prevent repetitive errors from being logged.
					errorStream
						// Since internal method calls will send over Error objects, we
						// want to map those errors to strings before we merged in the
						// stringified errors that come from the ExceptionHandler.
						.map( mapToString )

						// Now, merge both "stringified" streams together and handle
						// the errors in uniformity.
						.merge( exceptionHandler.stringifiedErrors )
						.distinctUntilChanged()
						.flatMap( sendToServer )
						.subscribe(
							function handleValue( value ) {

								// Nothing to do here, but WE DO NEED TO SUBSCRIBE to
								// the stream in order for the event emitter to pass its
								// values downstream to the HTTP flat-mapper.
								console.debug( "Error logging success.", value );

							},
							function handleError( error ) {

								// CAUTION: If we ever get this, our stream has broken
								// and will no longer log errors.
								console.debug( "Error logging error.", error );

							}
						)
					;

					// Return the public API.
					return({
						logError: logError
					});


					// ---
					// PUBLIC METHODS.
					// ---


					// I log the given exception data.
					// --
					// NOTE: The error, in this case, is an Error instance and will be
					// stringified in the error stream.
					function logError( error ) {

						errorStream.next( error );

					}


					// ---
					// PRIVATE METHODS.
					// ---


					// I convert the given error object to a string.
					function mapToString( error ) {

						console.warn( "Mapping to string:", error );

						return( error.toString() );

					}


					// I send the given stringified error to the server and return the
					// normalized response sequence.
					function sendToServer( stringifiedError ) {

						return( Rx.Observable.of( "SIMULATED_HTTP_RESPONSE" ) );

					}

				}

			}
		);

	</script>

</body>
</html>

As you can see, I'm using three different RxJS Subject instances: the two for the error streams exposed by the ExcpetionHandler implementation and the one used internally by the ErrorLogger so that it can then merge the various error sources together into a unified stream. And, when we run this code and trigger some errors, we can see the error values passing from the ExceptionHandler to the ErrorLogger by way of the Subject sequences:

Logging errors to the server using RxJS Subject streams in Angular 2.

Again, I wouldn't necessarily go with this level of complexity when logging errors. Really, I would just keep it all inside the ExceptionHandler service. But, I wanted a context in which to play with the RxJS Subject class. And, I think this worked out quite nicely.

Want to use code from this post? Check out the license.

Reader Comments

18 Comments

Not sure if I should be honored or frightened to see my name attached to your post. ;-)

It's great that you're exploring RxJS. It is the new hotness. It's a big, complex API too. Scares me but I'm learning.

A few thoughts:

I didn't see anything in your example that consumed the `errors` stream; only the `stringifiedErrors` stream. Did I miss it?

By convention, observable variable names are suffixed by `$` so one might see `stringifiedErrors$`. Yup ... another example of Hungarian Notation. Not saying you should do it. I'm doing it (I think).

Hiding the `Observer` side of the `Subject` with `new Observable.from(subject)` seems like a good approach. I wonder what Rx gurus do.

I see that you want to push errors to the server (and console?) whether anyone subscribes to your error streams or not. In Rx terms, you want the stream to be "hot". You can do that with the `publish` operator too (see "Cold v. Hot" in https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md). Not sure it matters.

The problem with your subscribe error path is, as you noted, the "stream has broken and will no longer log errors". You haven't solved that. You can if you wish by using `catch`.

```
.merge( exceptionHandler.stringifiedErrors )
.distinctUntilChanged()
.flatMap( sendToServer )
.catch(handleError)
.subscribe(...)
```
The `handleError` will message the error to console. Because it doesn't do anything (like re-throw the error or return `Rx.Observable.throw(new Error(revised-error))`), the stream stays alive.

That also means you'll get a funky value coming into your subscribe `handleValue`. That's because, the way we've written the catch, you're back on the "happy path". You might want to alter your catch to return a value that `handleValue` can recognize as a "fail".

See https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/errors.md

It never stops, does it? ;-)

15,666 Comments

@Ward,

Honored, of course :P

You are correct about the .errors property - nothing subscribes to it. I included it only because I didn't like the idea of the ExceptionHandler exposing a stream ONLY for strings if it accepts Errors - felt like a mismatch. Was just an emotional decision.

And good point about the publish stuff. You are right, I could have thrown a .publish() in there to automatically pull-through from the top.

Now, the error stuff is a bit confusing for me so far. Because, I actually tried using the .catch() and found that I was not getting the results I expected. After some Googling, I came across a StackOverflow answer that did mention that .catch() won't necessarily prevent *certain parts* of the stream from being unsubscribed - it had to do with where the .catch() was and what it was protecting .... honest, I didn't really understand.

What I was finding was that even with the .catch(), I could get the console-logging to keep working, but the [simulated] HTTP requests stopped firing.

I also realized that even if I DID protect against it, I would have the problem of an "error" coming through to the "value" handler. I'd have to do something like a "flag" the way you suggested. While I don't have the answers, I definitely knew I wouldn't get them in this post regardless; so, I just sort of punted on the issue altogether.

I'll see if I can go back and isolate the issue to be more RxJS specific (without all the Angular 2 scenario), see if I can get something working.

18 Comments

The `catch` is just another operator in the observable chain. It doesn't make the stream "hot". That's why you only got those errors going when you subscribed. Too separate concerns.

The `catch` operator takes the flow off the "sad path" and puts it back on the "happy path". That's why control continues onto the success callback in your `subscribe`. That's why you'll want the catch handler to return a well-known value (object perhaps) that your success callback can distinguish.

I don't know that I'm "right" about `publish`. The `subscribe` is doing the trick and with `publish` I think you also have to activate it by calling `observable.connect();`. That's not the most obvious thing in the world either.

It might matter if you were returning the `errorStream` (aka `error$` ;-)) so that something "downstream" could consume it.

What often surprises people familiar with promises is that `subscribe` returns a `Subscription`, not an `Observable`. I've seen too many people write
```
someStream$.subscribe(...).map(...); // Oops!
```
In their heads they're thinking `subscribe` is like `then`. It is NOT!

Just another of the many ways in which applying our familiarity with promises to observables leads to grief.

15,666 Comments

@Ward,

Word up - translating the mental model from promises to streams has been an interesting journey. I get the subscription stuff (and the subsequent .unsubscribe() you can call on it). But, I still haven't figured out how to "disconnect" a hot stream from its underlying source. Perhaps next on my list of things to explore.

15,666 Comments

@Ward,

I've been thinking more about the .catch() issue. Upon some further contemplation, I realize that the .catch() returns a *stream*. So, in that sense, it is NOT like a Promise chain in that you are using the same chain but transforming a value. If you .catch() an error in one stream, you return a reference to *another stream*, which essentially disconnects you from the original stream that threw the error.

Right now, I'm playing around with returning a reference to the original stream. But, so far, running into problems with some chain configuration. Very interesting, mind bending stuff.

re: onComplete - thanks, I'll try that out!

15,666 Comments

Actually, I think that last statement is a bit unfair - contrasting Promises and Streams. After all, in a Promise chain, you can only ever evaluate the chain *once*. Promises only ever settle on one value, be it through resolution or rejection. So, in that case - in the case where you only ever evaluate the chain / stream *once*, then the .catch() operator in RxJS is very much like the .catch() method in Promise.

The main difference is that in Streams, you can emit more than one value. And, in cases where you have a hot observable, the .catch() operator does NOT work like one might imagine the .catch() Promise method to work in so much as it seems to unsubscribe from the original source and move on to the "catch" source.

15,666 Comments

Ok, so I took a few hours and really experimented with the .catch() operator. I think I get it now - it actually unsubscribes from the source stream and then subscribes to the stream returned *from the CATCH* operator. So, it complete disconnects from the error stream, which is why I was having problems.

Some trial and error and experimentation:

www.bennadel.com/blog/3046-experimenting-with-the-catch-operator-and-stream-continuation-in-rxjs-and-angular-2.htm

Apparently, I can get around all of this by just using the ".retry()" operator, assuming I don't want to actually deal with the error directly. That said, the .retry() operator STILL disconnects from the underlying source ... only that it then reconnects to the source (which in this case is a HOT stream, so it just carries on as if nothing happened).

It's so simple :P

15,666 Comments

@Ward,

Turns out the `.from( subject )` approach was incorrect. While it *looked* like it was working, it actually just passed the Subject through (since it already implemented the Observable interface). It looks like the right was to do this is to call .asObservable() on the Observable portion of the Subject interface:

www.bennadel.com/blog/3048-converting-a-subject-to-an-observable-using-rxjs-in-angular-2.htm

This wraps the Subject in a new Observable instance, protecting the Observer portion of the Subject interface.

I believe in love. I believe in compassion. I believe in human rights. I believe that we can afford to give more of these gifts to the world around us because it costs us nothing to be decent and kind and understanding. And, I want you to know that when you land on this site, you are accepted for who you are, no matter how you identify, what truths you live, or whatever kind of goofy shit makes you feel alive! Rock on with your bad self!
Ben Nadel