Last week, I blogged about how the EventEmitter class, in Angular 2, extends the RxJS Subject class and, therefore, the RxJS Observer and Observable classes as well. With this realization, I then demonstrated that you could use the EventEmitter to coordinate error logging to the server. In the comments of my first post, however, Ward Bell warned that we should not assume that the EventEmitter is a Subject and that this behavior would likely be removed before the final Angular 2 release:
Do NOT count on EventEmitter continuing to be an Observable! Do NOT count on those Observable operators being there in the future! These will be deprecated soon and probably removed before release. Use EventEmitter only for event binding between a child and parent component. Do not subscribe to it. Do not call any of those methods. Only call eve.emit().
In another comment, he recommended using the RxJS Subject class instead. So, as an experiment, I wanted to revisit the idea of error logging, this time using the RxJS Subject class instead of the EventEmitter as the coordinating stream.
This is an experiment. I am not necessarily recommending this level of complexity when it comes to error logging. But, I did want to try and build something non-trivial that dealt with errors and RxJS Subjects and observable streams. In this case, rather than having the ExceptionHandler log the errors to the server (as I did in my previous post), the ExceptionHandler implementation is going to expose an RxJS stream of errors. Then, another class - ErrorLogger - is going to subscribe to those errors and log them to the server:
Internally, the ExceptionHandler implementation is going to maintain two different RxJS Subjects: errors and stringifiedErrors. Each of these will emit error events; but, the "stringified" version will emit the errors in String format. Now, we don't want to expose the Subjects directly because that would allow consumers to tinker with the Observer portion of the API. As such, when the ExceptionHandler exposes them, it does so by wrapping them in another RxJS stream which only exposes the Observable interface.
The ErrorLogger class will then require the ExceptionHandler as an injectable, where it will merge the stringifiedErrors stream with its own internal error stream before it logs the errors to the server. This way, the ErrorLogger class can be consumed explicitly by other areas of the application as well as implicitly by way of the ExceptionHandler and uncaught exceptions.
As you can see, I'm using three different RxJS Subject instances: the two for the error streams exposed by the ExcpetionHandler implementation and the one used internally by the ErrorLogger so that it can then merge the various error sources together into a unified stream. And, when we run this code and trigger some errors, we can see the error values passing from the ExceptionHandler to the ErrorLogger by way of the Subject sequences:
Again, I wouldn't necessarily go with this level of complexity when logging errors. Really, I would just keep it all inside the ExceptionHandler service. But, I wanted a context in which to play with the RxJS Subject class. And, I think this worked out quite nicely.
Want to use code from this post? Check out the license.
Not sure if I should be honored or frightened to see my name attached to your post. ;-)
It's great that you're exploring RxJS. It is the new hotness. It's a big, complex API too. Scares me but I'm learning.
A few thoughts:
I didn't see anything in your example that consumed the `errors` stream; only the `stringifiedErrors` stream. Did I miss it?
By convention, observable variable names are suffixed by `$` so one might see `stringifiedErrors$`. Yup ... another example of Hungarian Notation. Not saying you should do it. I'm doing it (I think).
Hiding the `Observer` side of the `Subject` with `new Observable.from(subject)` seems like a good approach. I wonder what Rx gurus do.
I see that you want to push errors to the server (and console?) whether anyone subscribes to your error streams or not. In Rx terms, you want the stream to be "hot". You can do that with the `publish` operator too (see "Cold v. Hot" in https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md). Not sure it matters.
The problem with your subscribe error path is, as you noted, the "stream has broken and will no longer log errors". You haven't solved that. You can if you wish by using `catch`.
.merge( exceptionHandler.stringifiedErrors )
.flatMap( sendToServer )
The `handleError` will message the error to console. Because it doesn't do anything (like re-throw the error or return `Rx.Observable.throw(new Error(revised-error))`), the stream stays alive.
That also means you'll get a funky value coming into your subscribe `handleValue`. That's because, the way we've written the catch, you're back on the "happy path". You might want to alter your catch to return a value that `handleValue` can recognize as a "fail".
It never stops, does it? ;-)
Honored, of course :P
You are correct about the .errors property - nothing subscribes to it. I included it only because I didn't like the idea of the ExceptionHandler exposing a stream ONLY for strings if it accepts Errors - felt like a mismatch. Was just an emotional decision.
And good point about the publish stuff. You are right, I could have thrown a .publish() in there to automatically pull-through from the top.
Now, the error stuff is a bit confusing for me so far. Because, I actually tried using the .catch() and found that I was not getting the results I expected. After some Googling, I came across a StackOverflow answer that did mention that .catch() won't necessarily prevent *certain parts* of the stream from being unsubscribed - it had to do with where the .catch() was and what it was protecting .... honest, I didn't really understand.
What I was finding was that even with the .catch(), I could get the console-logging to keep working, but the [simulated] HTTP requests stopped firing.
I also realized that even if I DID protect against it, I would have the problem of an "error" coming through to the "value" handler. I'd have to do something like a "flag" the way you suggested. While I don't have the answers, I definitely knew I wouldn't get them in this post regardless; so, I just sort of punted on the issue altogether.
I'll see if I can go back and isolate the issue to be more RxJS specific (without all the Angular 2 scenario), see if I can get something working.
The `catch` is just another operator in the observable chain. It doesn't make the stream "hot". That's why you only got those errors going when you subscribed. Too separate concerns.
The `catch` operator takes the flow off the "sad path" and puts it back on the "happy path". That's why control continues onto the success callback in your `subscribe`. That's why you'll want the catch handler to return a well-known value (object perhaps) that your success callback can distinguish.
I don't know that I'm "right" about `publish`. The `subscribe` is doing the trick and with `publish` I think you also have to activate it by calling `observable.connect();`. That's not the most obvious thing in the world either.
It might matter if you were returning the `errorStream` (aka `error$` ;-)) so that something "downstream" could consume it.
What often surprises people familiar with promises is that `subscribe` returns a `Subscription`, not an `Observable`. I've seen too many people write
someStream$.subscribe(...).map(...); // Oops!
In their heads they're thinking `subscribe` is like `then`. It is NOT!
Just another of the many ways in which applying our familiarity with promises to observables leads to grief.
Word up - translating the mental model from promises to streams has been an interesting journey. I get the subscription stuff (and the subsequent .unsubscribe() you can call on it). But, I still haven't figured out how to "disconnect" a hot stream from its underlying source. Perhaps next on my list of things to explore.
I think you want `subject.onCompleted();`
I've been thinking more about the .catch() issue. Upon some further contemplation, I realize that the .catch() returns a *stream*. So, in that sense, it is NOT like a Promise chain in that you are using the same chain but transforming a value. If you .catch() an error in one stream, you return a reference to *another stream*, which essentially disconnects you from the original stream that threw the error.
Right now, I'm playing around with returning a reference to the original stream. But, so far, running into problems with some chain configuration. Very interesting, mind bending stuff.
re: onComplete - thanks, I'll try that out!
Actually, I think that last statement is a bit unfair - contrasting Promises and Streams. After all, in a Promise chain, you can only ever evaluate the chain *once*. Promises only ever settle on one value, be it through resolution or rejection. So, in that case - in the case where you only ever evaluate the chain / stream *once*, then the .catch() operator in RxJS is very much like the .catch() method in Promise.
The main difference is that in Streams, you can emit more than one value. And, in cases where you have a hot observable, the .catch() operator does NOT work like one might imagine the .catch() Promise method to work in so much as it seems to unsubscribe from the original source and move on to the "catch" source.
Ok, so I took a few hours and really experimented with the .catch() operator. I think I get it now - it actually unsubscribes from the source stream and then subscribes to the stream returned *from the CATCH* operator. So, it complete disconnects from the error stream, which is why I was having problems.
Some trial and error and experimentation:
Apparently, I can get around all of this by just using the ".retry()" operator, assuming I don't want to actually deal with the error directly. That said, the .retry() operator STILL disconnects from the underlying source ... only that it then reconnects to the source (which in this case is a HOT stream, so it just carries on as if nothing happened).
It's so simple :P
Turns out the `.from( subject )` approach was incorrect. While it *looked* like it was working, it actually just passed the Subject through (since it already implemented the Observable interface). It looks like the right was to do this is to call .asObservable() on the Observable portion of the Subject interface:
This wraps the Subject in a new Observable instance, protecting the Observer portion of the Subject interface.