
Searching For The Wrong Type Of Simplicity Can Lead To An Explosion Of Complexity

By Ben Nadel

For the last few weeks, I've been revisiting the idea of state management in an Angular application. It's a topic that I've never felt very confident about; so, I've decided to start building some state management classes as a means to dig in and feel the pain, such that I can better understand the existing solutions. That said, I found myself at the end of my rope yesterday, completely bewildered by the nuances of RxJS streams. That is, until I realized that my search for the wrong type of simplicity had led me to an explosion of complexity. I feel like there's a life-lesson in here somewhere; so, I just wanted to share what I was doing.

It all started when I came across the post, Redux in a single line of code with RxJS, by Rudi Yardley. In that post, Rudi demonstrated that the core of Redux could be implemented as a one-liner RxJS stream:

action$.scan( reducer ).subscribe( renderer )

To be honest, I absolutely suck at RxJS. All but the simplest streams make my head hurt. So, when I see code like this, I am both jealous and inspired by how such simplicity can lead to so much power.
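For reference, here is a minimal sketch of how that one-liner might look with the pipeable operators of RxJS 6. The counter reducer, the `CounterAction` type, and the `rendered` array (standing in for a real renderer) are assumptions made up for illustration; none of them come from Rudi's post.

```typescript
import { Subject } from "rxjs";
import { scan } from "rxjs/operators";

// Hypothetical action type for a tiny counter.
type CounterAction = { type: "increment" } | { type: "decrement" };

// Hypothetical reducer: returns the next state for the given action.
function reducer( state: number, action: CounterAction ) : number {

	return ( action.type === "increment" ) ? ( state + 1 ) : ( state - 1 );

}

const action$ = new Subject<CounterAction>();

// Stand-in for a renderer: just records each state that it is handed.
const rendered: number[] = [];

// The RxJS 6 spelling of: action$.scan( reducer ).subscribe( renderer )
action$
	.pipe( scan( reducer, 0 ) )
	.subscribe( ( state ) => rendered.push( state ) );

action$.next( { type: "increment" } );
action$.next( { type: "increment" } );
action$.next( { type: "decrement" } );

// rendered is now [ 1, 2, 1 ]
```

Note that the initial state (0) is never rendered here; only the results of processing actions are.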

I decided to take Rudi's line of code and wrap it in a TypeScript class so that I could then inject it into other Angular services. But, the more I tried to play with it in a demo application, the harder I seemed to have to fight.

First, I knew that I couldn't trust the reducer method since, as the store-provider, I wasn't going to own it. I didn't want to push the burden of error handling down to the consumer of the Store. So, I had to create a proxy function that would add error handling around the reducer.
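Stripped of any RxJS, the proxy idea can be sketched roughly as follows. The `withErrorHandling` helper, the `Reducer` type, and the string-based actions are hypothetical names used only for this illustration; the store code in this post inlines the same try/catch rather than using a helper like this.

```typescript
// A reducer takes the current state and an action, and returns the next state.
type Reducer<S, A> = ( state: S, action: A ) => S;

// Hypothetical helper: wraps an untrusted reducer so that a thrown error is
// reported to the given handler and the previous state is returned unchanged.
function withErrorHandling<S, A>(
	reduce: Reducer<S, A>,
	handleError: ( error: unknown ) => void
	) : Reducer<S, A> {

	return ( state: S, action: A ) : S => {

		try {

			return reduce( state, action );

		} catch ( error ) {

			handleError( error );
			return state;

		}

	};

}

// Usage: a reducer that throws on any action it doesn't recognize.
const caught: unknown[] = [];
const safeReduce = withErrorHandling(
	( total: number, action: string ) : number => {

		if ( action !== "increment" ) {

			throw new Error( `Unknown action: ${ action }` );

		}

		return total + 1;

	},
	( error ) => caught.push( error )
);

const afterGood = safeReduce( 0, "increment" ); // 1
const afterBad = safeReduce( afterGood, "bogus" ); // Still 1; the error lands in caught.
```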

Then, I wanted the initial state to be immediately available to any subscribers. But, if I used the "seed" argument of the scan() operator, nothing was happening. So, I had to move the initial state into a startWith() operator, which meant that I was now piping both state and action values into scan() - a heterogeneity which left me feeling gross.
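The seed behavior can be seen in a small sketch. The `add` accumulator and the result arrays are made up for illustration. The third variant, which keeps the seed in scan() and places startWith() after it, is an alternative ordering that avoids mixing state and action values; it is an assumption about what could work here, not what the code in this post does.

```typescript
import { Subject } from "rxjs";
import { scan, startWith } from "rxjs/operators";

const add = ( total: number, value: number ) : number => total + value;

// Variant 1: the seed is passed to scan(). The seed itself is never emitted;
// subscribers see nothing until the first source value arrives.
const seededSource = new Subject<number>();
const seeded: number[] = [];
seededSource.pipe( scan( add, 10 ) ).subscribe( ( total ) => seeded.push( total ) );
const seededCountBeforeFirstValue = seeded.length; // 0
seededSource.next( 5 ); // seeded is now [ 15 ]

// Variant 2: startWith() before a seedless scan(). The initial value flows
// through scan() as if it were a source value and is emitted immediately.
const startedSource = new Subject<number>();
const started: number[] = [];
startedSource.pipe( startWith( 10 ), scan( add ) ).subscribe( ( total ) => started.push( total ) );
const startedCountBeforeFirstValue = started.length; // 1
startedSource.next( 5 ); // started is now [ 10, 15 ]

// Variant 3 (alternative ordering): seed scan() AND emit the same initial value
// afterwards, so only source values ever pass through the accumulator.
const bothSource = new Subject<number>();
const both: number[] = [];
bothSource.pipe( scan( add, 10 ), startWith( 10 ) ).subscribe( ( total ) => both.push( total ) );
bothSource.next( 5 ); // both is now [ 10, 15 ]
```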

Then, I wanted to make sure that actions would actually get processed even if no one was listening to them. Which meant that I had to convert the Cold stream into a Hot stream. And, this is where I felt like I was really losing my marbles (no RxJS pun intended) - nothing I tried seemed to be working. And, the more I Googled, the more frustrated I became.
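To make the Cold vs. Hot distinction concrete, here is a small sketch written against the RxJS 6 API that this post uses. The numeric "actions" and the `add` accumulator are hypothetical stand-ins for real actions and a real reducer.

```typescript
import { ConnectableObservable, Subject } from "rxjs";
import { publishReplay, scan } from "rxjs/operators";

const add = ( total: number, value: number ) : number => total + value;

// COLD: the scan() only runs once someone subscribes to the piped stream, so
// an action dispatched before that point is never processed.
const coldActions = new Subject<number>();
const coldTotals = coldActions.pipe( scan( add, 0 ) );
coldActions.next( 1 ); // No subscriber yet; this action is lost.

const coldSeen: number[] = [];
coldTotals.subscribe( ( total ) => coldSeen.push( total ) );
coldActions.next( 2 ); // coldSeen is now [ 2 ] (the earlier 1 was never added).

// HOT: publishReplay( 1 ) plus connect() subscribes to the underlying stream
// right away, so actions are processed even when no one is listening.
const hotActions = new Subject<number>();
const hotTotals = hotActions.pipe(
	scan( add, 0 ),
	publishReplay( 1 )
) as ConnectableObservable<number>;
hotTotals.connect();
hotActions.next( 1 ); // Processed immediately; the running total is 1.

const hotSeen: number[] = [];
hotTotals.subscribe( ( total ) => hotSeen.push( total ) ); // Replays the last total.
hotActions.next( 2 ); // hotSeen is now [ 1, 3 ]
```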

ASIDE: Finding the right RxJS 6 documentation felt like a Herculean effort! For some reason, when I Googled for "rxjs 6 operators", no actual documentation about operators came up. Some of the links that looked promising ended up leading to 404 Not Found. And, the links that gave me technical documentation had no explanation of what the operators actually did. Finally, I found RxJS Operators by Example by Brian Troncone, which was a total life-saver!

And, once I had the Hot stream working, I wanted to make sure that an unsubscribe() invocation didn't stop future actions from being processed, or attempt to restart the stream upon subsequent subscriptions.

When I finally hit rock bottom, this is the monstrosity that I was dealing with. Look primarily at the class constructor and the RxJS stream I was creating:

// Import the core angular services.
import { ConnectableObservable } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";
import { ErrorHandler } from "@angular/core";
import { Observable } from "rxjs";
import { map } from "rxjs/operators";
import { publishReplay } from "rxjs/operators";
import { scan } from "rxjs/operators";
import { startWith } from "rxjs/operators";
import { Subject } from "rxjs";

// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //

export abstract class AbstractStore<StateType = any, ActionTypes = any> {

	private actionStream: Subject<ActionTypes>;
	private errorHandler: ErrorHandler;
	private state: StateType;
	private stateStream: Observable<StateType>;

	// I initialize the concrete class that extends the Abstract Store.
	constructor( errorHandler: ErrorHandler ) {

		this.errorHandler = errorHandler;

		// I keep track of the current state snapshot.
		this.state = this.getInitialState();

		// I provide an ingress stream for dispatched actions.
		this.actionStream = new Subject();

		// I provide an egress stream for the reduced state. As actions are piped into
		// the actionStream, they will be passed through the reduce() method and then
		// stored into the current state snapshot.
		this.stateStream = this.actionStream.pipe(
			startWith( this.state ),
			scan(
				( currentState: StateType, action: ActionTypes ) : StateType => {

					// Since the .reduce() function is outside our scope of control, we
					// can't trust it. As such, we have to assume that it can throw an
					// error; and, if so, we have to catch that error such that we don't
					// push the error handling responsibility down onto the subscribers.
					try {

						return( this.state = this.reduce( currentState, action ) );

					} catch ( error ) {

						this.errorHandler.handleError( error );

						// Since the reduce did not alter the state, just forward the
						// current state aggregation.
						return( currentState );

					}

				}
				// this.state
			),
			// I ensure that if the state was not changed by the action, we don't emit a
			// new value down to the subscribers.
			distinctUntilChanged(),
			// Right now, the state stream is a COLD stream which means that it will only
			// process actions if someone has subscribed to it. In order to enable
			// actions to be processed prior to a subscription, we have to convert it
			// from a COLD stream to a HOT stream. To do this, we will publish the
			// COLD stream. This will allow it to be multi-casted.
			// --
			// NOTE: the "replay" part of "publishReplay()" will allow all new stream
			// subscribers to immediately get the last emitted value.
			publishReplay( 1 )
		);

		// Now that we've created a multi-casting stream, we need to connect the HOT
		// stream to the underlying COLD stream such that events will start flowing.
		// --
		// NOTE: The typing for .pipe() does not understand the connected observable. As
		// such, we have to cast it before we call connect.
		// --
		// READ MORE: https://stackoverflow.com/questions/50371887/rxjs-6-get-connectableobservable-from-observable
		( this.stateStream as ConnectableObservable<StateType> ).connect();

	}

	// ---
	// PUBLIC METHODS.
	// ---

	// I dispatch the given action to the store reducers.
	public dispatch( action: ActionTypes ) : void {

		this.actionStream.next( action );

	}


	// I get the current state snapshot.
	public getState() : StateType {

		return( this.state );

	}


	// I get the current state as a stream (will always emit the current state value as
	// the first item in the stream).
	public getStateAsStream(): Observable<StateType> {

		return( this.stateStream );

	}


	// I return the given top-level state key as a stream (will always emit the current
	// key value as the first item in the stream).
	public select<K extends keyof StateType>( key: K ) : Observable<StateType[K]> {

		var selectStream = this.stateStream.pipe(
			map(
				( state: StateType ) => {

					return( state[ key ] );

				}
			),
			distinctUntilChanged()
		);

		return( selectStream );

	}

	// ---
	// PRIVATE METHODS.
	// ---

	// ABSTRACT METHOD: I return the initial value of the state snapshot.
	protected getInitialState() : StateType {

		throw( new Error( "The abstract method [getInitialState] must be implemented." ) );

	}


	// ABSTRACT METHOD: I return the new state value based on the given action.
	protected reduce( state: StateType, action: ActionTypes ) : StateType {

		throw( new Error( "The abstract method [reduce] must be implemented." ) );

	}

}

Excuse me for a moment ....

[Animated GIF: "I just threw up in my mouth a little."]

To be 100% clear, I am in no way bashing RxJS here. I suck at RxJS. Like I said above, streams make my head hurt. I'm just an unfrozen caveman lawyer, and I don't understand how people can reason about these kinds of abstractions.

What I am trying to illustrate here is how my search for "simplicity" became a slippery slope when paired with my level-of-expertise (or lack thereof). When wielded by inexperienced hands, simple abstractions can lead to absurd complexity like what we see above.

Luckily, while walking the dog, I remembered that a BehaviorSubject() has a .getValue() method. And this gave me a moment of delightful clarity:

I realized that all I actually needed for any of this was a BehaviorSubject(). And that I was seeking the wrong type of simplicity. So I sat down and started refactoring until things made sense again:

// Import the core angular services.
import { BehaviorSubject } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";
import { ErrorHandler } from "@angular/core";
import { Observable } from "rxjs";
import { map } from "rxjs/operators";

// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //

export abstract class AbstractStore<StateType = any, ActionTypes = any> {

	private errorHandler: ErrorHandler;
	private stateSubject: BehaviorSubject<StateType>;

	// I initialize the concrete class that extends the Abstract Store.
	constructor( errorHandler: ErrorHandler ) {

		this.errorHandler = errorHandler;

		// I provide an ingress and egress stream for the current state. When a new
		// action is dispatched into the store, the results of the reduction will be
		// emitted into this stream.
		this.stateSubject = new BehaviorSubject( this.getInitialState() );

	}

	// ---
	// PUBLIC METHODS.
	// ---

	// I dispatch the given action to the store reducers.
	public dispatch( action: ActionTypes ) : void {

		// Since the .reduce() function is outside our scope of control, we can't trust
		// it. As such, we have to assume that it can throw an error; and, if so, we have
		// to catch that error such that we don't push the error handling responsibility
		// down onto the calling context.
		try {

			this.stateSubject.next(
				this.reduce( this.getStateSnapshot(), action )
			);

		} catch ( error ) {

			this.errorHandler.handleError( error );

		}

	}


	// I get the current state as a stream (will always emit the current state value as
	// the first item in the stream).
	public getState(): Observable<StateType> {

		return( this.stateSubject.pipe( distinctUntilChanged() ) );

	}


	// I get the current state snapshot.
	public getStateSnapshot() : StateType {

		return( this.stateSubject.getValue() );

	}


	// I return the given top-level state key as a stream (will always emit the current
	// key value as the first item in the stream).
	public select<K extends keyof StateType>( key: K ) : Observable<StateType[K]> {

		var selectStream = this.stateSubject.pipe(
			map(
				( state: StateType ) => {

					return( state[ key ] );

				}
			),
			distinctUntilChanged()
		);

		return( selectStream );

	}

	// ---
	// PRIVATE METHODS.
	// ---

	// ABSTRACT METHOD: I return the initial value of the state snapshot.
	protected getInitialState() : StateType {

		throw( new Error( "The abstract method [getInitialState] must be implemented." ) );

	}


	// ABSTRACT METHOD: I return the new state value based on the given action.
	protected reduce( state: StateType, action: ActionTypes ) : StateType {

		throw( new Error( "The abstract method [reduce] must be implemented." ) );

	}

}

This code does the same thing, but is much simpler and far easier to reason about. It is the right kind of simple because it is the right separation of concerns. In my first approach, I was trying to make the stream do "all the things," which had no intrinsic value. In my latter approach, I'm handling the action processing outside of the stream, which is the right place to do it; it leads to cleaner error handling and completely side-steps the Cold vs. Hot stream concepts.
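To see those properties in action without an Angular application around it, here is a condensed, standalone sketch of the same BehaviorSubject approach. The `CounterStore` class, its state and action types, and the `errors` array (standing in for Angular's ErrorHandler) are hypothetical and exist only for this illustration.

```typescript
import { BehaviorSubject, Observable } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";

interface CounterState { count: number; }
type CounterAction = { type: "increment" } | { type: "explode" };

class CounterStore {

	// Stand-in for an injected ErrorHandler: just collects whatever was thrown.
	public errors: unknown[] = [];

	private stateSubject = new BehaviorSubject<CounterState>( { count: 0 } );

	// The reduction happens here, outside of any stream.
	public dispatch( action: CounterAction ) : void {

		try {

			this.stateSubject.next( this.reduce( this.getStateSnapshot(), action ) );

		} catch ( error ) {

			this.errors.push( error );

		}

	}

	public getState() : Observable<CounterState> {

		return this.stateSubject.pipe( distinctUntilChanged() );

	}

	public getStateSnapshot() : CounterState {

		return this.stateSubject.getValue();

	}

	private reduce( state: CounterState, action: CounterAction ) : CounterState {

		if ( action.type === "explode" ) {

			throw new Error( "The reducer blew up." );

		}

		return { count: state.count + 1 };

	}

}

const store = new CounterStore();
store.dispatch( { type: "increment" } ); // Processed even with no subscribers.

const seen: number[] = [];
const subscription = store.getState().subscribe( ( state ) => seen.push( state.count ) );
store.dispatch( { type: "increment" } ); // seen is now [ 1, 2 ]

subscription.unsubscribe();
store.dispatch( { type: "explode" } ); // Error is recorded; state is untouched.
store.dispatch( { type: "increment" } ); // Still processed after the unsubscribe.

const finalCount = store.getStateSnapshot().count; // 3
```

Since dispatch() is an ordinary method call, there is no stream that needs to be connected, and a thrown error never reaches the subscribers.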

For me, the core lesson here is to do the right things for the right reasons. When I first embarked on this journey, my goal wasn't to create a Store, it was to "create a Store using an RxJS stream." Such an approach was merely a solution in search of a problem. And, it led me down a path of unnecessary and deleterious complexity. It wasn't until I stepped back and questioned my motivations that I was able to see the simple solution and the clear separation of responsibilities.


Reader Comments


@All,

Not that it's specifically related to this post, but the BehaviorSubject() keeps coming back into my life as a solution for stream-based behaviors. This morning, I took a look at using the BehaviorSubject() to implement a React-inspired simple state store that exposes a .setState() method:

www.bennadel.com/blog/3522-creating-a-simple-setstate-store-using-an-rxjs-behaviorsubject-in-angular-6-1-10.htm

My proxy class is just a thin layer that adds the lightest bit of logic before deferring to the underlying RxJS stream.
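As a rough idea of what such a thin layer might look like, here is a sketch of a setState()-style store built on a BehaviorSubject. This is not the code from the linked post; the `SimpleStore` class name, its method names other than setState(), and the example state shape are all assumptions made for illustration.

```typescript
import { BehaviorSubject, Observable } from "rxjs";

// Hypothetical store: a thin proxy around a BehaviorSubject.
class SimpleStore<StateType extends object> {

	private stateSubject: BehaviorSubject<StateType>;

	constructor( initialState: StateType ) {

		this.stateSubject = new BehaviorSubject( initialState );

	}

	// I get the current state as a stream.
	public getState() : Observable<StateType> {

		return this.stateSubject.asObservable();

	}

	// I get the current state snapshot.
	public getStateSnapshot() : StateType {

		return this.stateSubject.getValue();

	}

	// I shallow-merge the given partial state into the current state and emit
	// the result as the new state.
	public setState( partialState: Partial<StateType> ) : void {

		this.stateSubject.next( { ...this.getStateSnapshot(), ...partialState } );

	}

}

// Usage with a made-up state shape.
const simpleStore = new SimpleStore( { name: "Kim", visits: 0 } );
simpleStore.setState( { visits: 1 } );

const snapshot = simpleStore.getStateSnapshot(); // { name: "Kim", visits: 1 }
```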

Ben Nadel