Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.

Encapsulating Firebase Using RxJS Streams In Angular 4.2.3

By Ben Nadel on

Firebase is a "realtime" database, which means that, unlike many other databases, it can "push" data down to a client using WebSockets. This allows for some very interesting, high-performance use-cases like magnetic poetry. But, it also means that the API for communicating with Firebase is quite different from the API provided by more traditional request-response persistence mechanisms. Unfortunately, in my applications, this often means that the implementation details of Firebase leak out of the data access layer (DAL) and into other parts of the application. This has never sat right with me (I would love to minimize leaky abstractions); so, today, I'm trying to use RxJS Streams to push the Firebase implementation details back down into the DAL.

Run this demo in my JavaScript Demos project on GitHub.

This is not a new concern for me. This issue of leaking Firebase implementation details has bothered me ever since I first looked at Firebase and AngularJS three years ago. But, I had no idea how to deal with it - Firebase's API was so different from anything I was used to that I didn't have any mental models for reconciling it with my architectural unease.

The other day, however, I had a revelation about database abstractions in my application development. Or rather, I had an "inversion." Rather than thinking about how my database abstraction layer could fit over multiple database implementations, I reversed the onus; I started to think about using the abstraction to drive the implementation. Meaning, I started to think about the database abstraction layer as the API definition, pushing the responsibility of conformity down to the actual database implementations.

This change in perspective on database abstractions helped me realize that I shouldn't focus on Firebase as the "problem." Instead, I should focus on designing an abstraction layer API that has some "realtime" capabilities; and only then, worry about getting Firebase - or any other database implementation - to conform to said realtime API.

To experiment with this inversion of responsibilities, I created a simple Angular 4 application in which you can add messages to, and delete messages from, a list. In addition to the CRUD (Create, Read, Update, Delete) methods exposed by the database abstraction, there is also a single "stream" method for realtime updates of the messages collection. I've defined this API through an abstract Gateway class:

  • // Import the core angular services.
  • import { Observable } from "rxjs/Observable";
  •  
  • export interface Message {
  • id?: string;
  • text: string;
  • createdAt: Date;
  • }
  •  
  • // All implementations of the MessageGateway must extend this class.
  • // --
  • // NOTE: By making this a Class instead of an Interface, we can also use it as the
  • // dependency-injection token since it represents a "Type".
  • export abstract class MessageGateway {
  • abstract createMessage( message: Message ) : Promise<string>;
  • abstract deleteMessage( id: string ) : Promise<void>;
  • abstract readMessages() : Promise<Message[]>;
  • abstract readMessagesAsStream() : Observable<Message[]>;
  • }

As you can see, the MessageGateway abstract class (and Angular dependency-injection token) defines a readMessagesAsStream() method which returns an RxJS Observable. This RxJS observable stream will emit Message[] collection events to subscribers as the collection changes in realtime.

Once we have this realtime database API contract, we can then create a consumer - the App component - that doesn't need to know anything about Firebase. In fact, we don't even have to be using Firebase at all; we can use any data persistence implementation that conforms to the abstract class's realtime API:

  • // Import the core angular services.
  • import { Component } from "@angular/core";
  • import { OnInit } from "@angular/core";
  •  
  • // Import the application components and services.
  • import { Message } from "./gateways/message.gateway";
  • import { MessageGateway } from "./gateways/message.gateway";
  •  
  • @Component({
  • selector: "my-app",
  • styleUrls: [ "./app.component.css" ],
  • template:
  • `
  • <h2>
  • You Have {{ messages.length }} Message(s)!
  • </h2>
  •  
  • <ul *ngIf="messages.length">
  • <li
  • *ngFor="let message of messages"
  • title="Created at {{ message.createdAt.toTimeString() }}">
  •  
  • {{ message.text }} &mdash;
  • <a (click)="deleteMessage( message )">Delete</a>
  •  
  • </li>
  • </ul>
  •  
  • <form (submit)="addMessage()">
  • <input type="text" name="message" [(ngModel)]="form.message" autofocus />
  • <input type="submit" value="Add Message" />
  • </form>
  •  
  • <hr />
  •  
  • <h2>
  • Choose A Gateway Implementation
  • </h2>
  •  
  • <ul>
  • <li><a href="./index.htm?default">In-Memory Gateway</a></li>
  • <li><a href="./index.htm?localstorage">LocalStorage Gateway</a></li>
  • <li><a href="./index.htm?firebase">Firebase Gateway</a></li>
  • </ul>
  •  
  • <p>
  • Note that all of these implementations provide a "stream" I/O method.
  • </p>
  • `
  • })
  • export class AppComponent implements OnInit {
  •  
  • public form: {
  • message: string;
  • };
  • public messages: Message[];
  •  
  • // NOTE: Even though the "type" here is the MessageGateway, the actual implementation
  • // may be anything that extends the "MessageGateway" base class. We're simply using
  • // the MessageGateway as the dependency-injection token for our gateway choice.
  • private messageGateway: MessageGateway;
  •  
  •  
  • // I initialize the app component.
  • constructor( messageGateway: MessageGateway ) {
  •  
  • this.messageGateway = messageGateway;
  •  
  • this.form = {
  • message: ""
  • };
  • this.messages = [];
  •  
  • }
  •  
  • // ---
  • // PUBLIC METHODS.
  • // ---
  •  
  • // I add a new message to the messages collection.
  • public addMessage() : void {
  •  
  • // If no message content has been entered, just ignore request.
  • if ( ! this.form.message ) {
  •  
  • return;
  •  
  • }
  •  
  • this.messageGateway.createMessage({
  • text: this.form.message,
  • createdAt: new Date()
  • });
  • this.form.message = "";
  •  
  • // NOTE: We're NOT applying the change OPTIMISTICALLY to the local model because
  • // we're reading the messages collection as a STREAM. As such, we should always
  • // get very fast local changes (from the STREAM) that we can apply in a uniform
  • // manner across our CRUD (Create, Read, Update, Delete) operations.
  •  
  • }
  •  
  •  
  • // I delete the given message from the messages collection.
  • public deleteMessage( message: Message ) : void {
  •  
  • this.messageGateway.deleteMessage( message.id );
  •  
  • // NOTE: We're NOT applying the change OPTIMISTICALLY to the local model because
  • // we're reading the messages collection as a STREAM. As such, we should always
  • // get very fast local changes (from the STREAM) that we can apply in a uniform
  • // manner across our CRUD (Create, Read, Update, Delete) operations.
  •  
  • }
  •  
  •  
  • // I get called once, right after component initialization.
  • public ngOnInit() : void {
  •  
  • // The message gateway provides stream-based access to the messages collection.
  • // This will emit a new messages collection whenever the messages collection
  • // has been altered in any way (initialized, updated, deleted, etc.).
  • var subscription = this.messageGateway
  • .readMessagesAsStream()
  • .subscribe(
  • ( messages: Message[] ) : void => {
  •  
  • this.messages = messages;
  •  
  • },
  • ( error: any ) : void => {
  •  
  • console.warn( "Read Message As Stream Error" );
  • console.error( error );
  •  
  • }
  • )
  • ;
  •  
  • }
  •  
  • }

As you can see, there's not much going on in this App component. We have a create method and a delete method. And, we have the MessageGateway RxJS stream that we subscribe to. We use this RxJS stream as both the initial source of data and as the ongoing source of message collection events.

If we run this application, we can see how it works:


[Figure: Encapsulating Firebase using RxJS streams.]


Notice that there is no concept of Firebase leaking into my controller / component tree. I'm only dealing with features exposed by the abstraction layer: Message domain models and RxJS streams. Of course, under the hood, we are using Firebase. Well, at least some of the time. If you look at the bottom of the App, you'll see that the user can actually choose from three different data persistence implementations:

  • In-Memory
  • LocalStorage
  • Firebase

While the App component only cares about the "MessageGateway", we can use the browser's Location to define alternate configurations for the dependency-injection container. Meaning, in our application's NgModule, we can provide different classes - Types - for the "MessageGateway" dependency-injection token:

  • // Import the core angular services.
  • import { BrowserModule } from "@angular/platform-browser";
  • import { FormsModule } from "@angular/forms";
  • import { NgModule } from "@angular/core";
  • import { Provider } from "@angular/core";
  •  
  • // Import the application components and services.
  • import { AppComponent } from "./app.component";
  • import { FirebaseMessageGateway } from "./gateways/firebase-message.gateway";
  • import { InMemoryMessageGateway } from "./gateways/in-memory-message.gateway";
  • import { MessageGateway } from "./gateways/message.gateway";
  • import { LocalStorageMessageGateway } from "./gateways/local-storage-message.gateway";
  •  
  • @NgModule({
  • bootstrap: [
  • AppComponent
  • ],
  • imports: [
  • BrowserModule,
  • FormsModule
  • ],
  • declarations: [
  • AppComponent
  • ],
  • providers: [
  • getGatewayImplementation() // Pick one of 3 implementations.
  • ]
  • })
  • export class AppModule {
  • // ...
  • }
  •  
  • // ----------------------------------------------------------------------------------- //
  • // ----------------------------------------------------------------------------------- //
  •  
  • // I return the MessageGateway provider based on the LOCATION state.
  • // --
  • // NOTE: I had originally set this up to return just the useClass value. However, when
  • // I did that, TypeScript kept complaining that I was missing a useFactory property in
  • // my Provider. As such, I just moved the entire Provider configuration here.
  • function getGatewayImplementation() : Provider {
  •  
  • switch ( window.location.search ) {
  • case "?localstorage":
  • return({
  • provide: MessageGateway,
  • useClass: LocalStorageMessageGateway
  • });
  • case "?firebase":
  • return({
  • provide: MessageGateway,
  • useClass: FirebaseMessageGateway
  • });
  • default:
  • return({
  • provide: MessageGateway,
  • useClass: InMemoryMessageGateway
  • });
  • }
  •  
  • // NOTE: Each case returns directly, so no "break" statements are needed and the
  • // compiler has no unreachable code to complain about.
  •  
  • }
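
As an aside, the URL-to-implementation mapping in the switch statement above can also be expressed as a lookup table. The following is a minimal, dependency-free sketch of that idea. The `GatewayName` type, the `GATEWAYS_BY_QUERY` table, and the `pickGatewayName()` function are hypothetical names used only for illustration (they are not part of the demo), and the string names stand in for the actual gateway classes:

```typescript
// Hypothetical names standing in for the three gateway classes in the demo.
type GatewayName = "in-memory" | "localstorage" | "firebase";

// Map each supported query string to a gateway. Any other value falls back
// to the in-memory default.
const GATEWAYS_BY_QUERY: { [ search: string ]: GatewayName } = {
    "?localstorage": "localstorage",
    "?firebase": "firebase"
};

function pickGatewayName( search: string ) : GatewayName {

    // Only trust keys that were explicitly defined on the table (and not
    // inherited Object properties like "toString").
    var match = Object.prototype.hasOwnProperty.call( GATEWAYS_BY_QUERY, search )
        ? GATEWAYS_BY_QUERY[ search ]
        : undefined
    ;

    return( match || "in-memory" );

}

console.log( pickGatewayName( "?firebase" ) );
console.log( pickGatewayName( "" ) );
```

In an NgModule, the same kind of table could presumably hold the gateway classes themselves, with the looked-up value used as the `useClass` setting.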

Depending on what's in the URL at Angular bootstrap time, the App component will receive one of the following instances when it asks for the "MessageGateway":

  • InMemoryMessageGateway
  • LocalStorageMessageGateway
  • FirebaseMessageGateway

Each of these classes extends the abstract MessageGateway class, which means that each has to conform to the realtime API required by the abstract class. First, let's look at the InMemoryMessageGateway class, which stores all messages in the application memory space:

CAUTION: I am a relative novice when it comes to RxJS; so, view my particular use of RxJS streams as exploratory and not necessarily as "best practice".

  • // Import the core angular services.
  • import { BehaviorSubject } from "rxjs/BehaviorSubject";
  • import { Observable } from "rxjs/Observable";
  •  
  • // Import the application components and services.
  • import { Message } from "./message.gateway";
  • import { MessageGateway } from "./message.gateway";
  •  
  • interface MessageDTO {
  • id: string;
  • text: string;
  • createdAt: number;
  • }
  •  
  • export class InMemoryMessageGateway extends MessageGateway {
  •  
  • private messages: MessageDTO[];
  • private subject: BehaviorSubject<Message[]>;
  • private uid: number;
  •  
  •  
  • // I initialize the in-memory message gateway implementation.
  • constructor() {
  •  
  • super();
  •  
  • this.messages = [];
  • // In this implementation, we're going to use a BehaviorSubject rather than
  • // a normal Subject so that we can replay the last value when the user first
  • // subscribes to the observable stream. This way, the subscription itself will
  • // act as the first read / load of data.
  • this.subject = new BehaviorSubject( [] );
  • this.uid = 0;
  •  
  • }
  •  
  • // ---
  • // PUBLIC METHODS.
  • // ---
  •  
  • // I add a new message to the collection. Returns a Promise with the new ID.
  • public createMessage( message: Message ) : Promise<string> {
  •  
  • var dto = {
  • id: this.newID(),
  • text: message.text,
  • createdAt: message.createdAt.getTime()
  • };
  •  
  • this.messages.push( dto );
  •  
  • this.emitMessages();
  •  
  • return( Promise.resolve( dto.id ) );
  •  
  • }
  •  
  •  
  • // I delete the message with the given ID. Returns a Promise.
  • public deleteMessage( id: string ) : Promise<void> {
  •  
  • this.messages = this.messages.filter(
  • ( message: MessageDTO ) : boolean => {
  •  
  • return( message.id !== id );
  •  
  • }
  • );
  •  
  • this.emitMessages();
  •  
  • return( Promise.resolve() );
  •  
  • }
  •  
  •  
  • // I read the messages collection. Returns a Promise.
  • public readMessages() : Promise<Message[]> {
  •  
  • var messages = this.messages.map(
  • ( message: MessageDTO ) : Message => {
  •  
  • return({
  • id: message.id,
  • text: message.text,
  • createdAt: new Date( message.createdAt )
  • });
  •  
  • }
  • );
  •  
  • return( Promise.resolve( messages ) );
  •  
  • }
  •  
  •  
  • // I read the messages collection as a stream. Returns an Observable stream.
  • public readMessagesAsStream() : Observable<Message[]> {
  •  
  • // Queue up an emission of the latest messages. NOTE: emitMessages() pushes its
  • // value asynchronously (after a Promise resolves); so, a new subscriber first
  • // receives whatever value the BehaviorSubject currently holds, followed shortly
  • // by the freshly-read messages collection.
  • this.emitMessages();
  •  
  • return( this.subject.asObservable() );
  •  
  • }
  •  
  • // ---
  • // PRIVATE METHODS.
  • // ---
  •  
  • // I read and emit the latest messages collection on the read-stream.
  • private emitMessages() : void {
  •  
  • this.readMessages().then(
  • ( messages: Message[] ) : void => {
  •  
  • this.subject.next( messages );
  •  
  • }
  • );
  •  
  • }
  •  
  •  
  • // I generate a new ID for a new Message object.
  • private newID() : string {
  •  
  • var id = ++this.uid;
  •  
  • return( "id-" + id );
  •  
  • }
  •  
  • }

Internally, this class creates and holds a single BehaviorSubject. The BehaviorSubject, like Subject, allows me to push values into a stream that is subscribed-to by multiple clients. With the BehaviorSubject, my CRUD (Create, Read, Update, Delete) methods can all emit Message[] collection events that will then be broadcast to anyone who calls readMessagesAsStream() and subscribes to the resultant Observable.
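
To make the "replay the last value on subscribe" behavior concrete, here is a tiny, dependency-free sketch. The `TinyBehaviorSubject` class is a hand-rolled stand-in written for illustration only; it is not the RxJS implementation and it skips error handling, completion, and everything else the real BehaviorSubject provides:

```typescript
type ValueListener<T> = ( value: T ) => void;

// Hypothetical, simplified stand-in for an RxJS BehaviorSubject.
class TinyBehaviorSubject<T> {

    private current: T;
    private listeners: ValueListener<T>[] = [];

    constructor( initialValue: T ) {
        this.current = initialValue;
    }

    // Store the value and broadcast it to every current listener.
    public next( value: T ) : void {
        this.current = value;
        this.listeners.slice().forEach( ( listener ) => listener( value ) );
    }

    // Replay the latest value immediately, then deliver future values.
    // Returns a function that removes the listener.
    public subscribe( listener: ValueListener<T> ) : () => void {
        this.listeners.push( listener );
        listener( this.current );
        return () => {
            this.listeners = this.listeners.filter( ( l ) => l !== listener );
        };
    }

}

const subject = new TinyBehaviorSubject<string[]>( [] );
subject.next( [ "hello" ] ); // Emitted BEFORE anyone has subscribed.

const received: string[][] = [];
const unsubscribe = subject.subscribe(
    ( messages ) => {
        received.push( messages );
    }
);

subject.next( [ "hello", "world" ] ); // Delivered to the subscriber.
unsubscribe();
subject.next( [] ); // Not delivered; the listener has been removed.

console.log( received );
```

The late subscriber still sees the `[ "hello" ]` collection that was emitted before it subscribed, which is the property that lets the subscription double as the first read of the data.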

This implementation allows my CRUD methods to push "realtime" updates even though all activity is taking place inside a single application. If we move from an in-memory collection to a localStorage collection, however, we can use storage events to push realtime updates across multiple applications all running in the same browser:

  • // Import the core angular services.
  • import { BehaviorSubject } from "rxjs/BehaviorSubject";
  • import { Observable } from "rxjs/Observable";
  •  
  • // Import the application components and services.
  • import { Message } from "./message.gateway";
  • import { MessageGateway } from "./message.gateway";
  •  
  • interface MessageDTO {
  • id: string;
  • text: string;
  • createdAt: number;
  • }
  •  
  • export class LocalStorageMessageGateway extends MessageGateway {
  •  
  • private storageKey: string;
  • private subject: BehaviorSubject<Message[]>;
  •  
  •  
  • // I initialize the localStorage message gateway implementation.
  • constructor() {
  •  
  • super();
  •  
  • this.storageKey = "ng4-demo-firebase-encapsulation";
  • // In this implementation, we're going to use a BehaviorSubject rather than
  • // a normal Subject so that we can replay the last value when the user first
  • // subscribes to the observable stream. This way, the subscription itself will
  • // act as the first read / load of data.
  • this.subject = new BehaviorSubject( [] );
  •  
  • // When the localStorage object is updated from ANOTHER WINDOW, pertaining to
  • // this origin, a "storage" event is triggered. This event, however, is NOT
  • // TRIGGERED if the current window updates the localStorage object. As such,
  • // we can use this event to update our in-memory cache of the localStorage
  • // messages content.
  • window.addEventListener(
  • "storage",
  • ( event: StorageEvent ) : void => {
  •  
  • // Since this event fires for all localStorage events, we want to ignore
  • // any event that may be triggered by a different application. Make sure
  • // it pertains to our localStorage key.
  • if ( event.key === this.storageKey ) {
  •  
  • this.emitMessages();
  •  
  • }
  •  
  • }
  • );
  •  
  • }
  •  
  • // ---
  • // PUBLIC METHODS.
  • // ---
  •  
  • // I add a new message to the collection. Returns a Promise with the new ID.
  • public createMessage( message: Message ) : Promise<string> {
  •  
  • var promise = new Promise<string>(
  • ( resolve, reject ) : void => {
  •  
  • var dto = {
  • id: this.newID(),
  • text: message.text,
  • createdAt: message.createdAt.getTime()
  • };
  •  
  • var messages = this.getMessagesFromLocalStorage();
  • messages.push( dto );
  • this.setMessagesToLocalStorage( messages );
  •  
  • this.emitMessages();
  •  
  • resolve( dto.id );
  •  
  • }
  • );
  •  
  • return( promise );
  •  
  • }
  •  
  •  
  • // I delete the message with the given ID. Returns a Promise.
  • public deleteMessage( id: string ) : Promise<void> {
  •  
  • var promise = new Promise<void>(
  • ( resolve, reject ) : void => {
  •  
  • var messages = this.getMessagesFromLocalStorage().filter(
  • ( message: MessageDTO ) : boolean => {
  •  
  • return( message.id !== id );
  •  
  • }
  • );
  • this.setMessagesToLocalStorage( messages );
  •  
  • this.emitMessages();
  •  
  • resolve();
  •  
  • }
  • );
  •  
  • return( promise );
  •  
  • }
  •  
  •  
  • // I read the messages collection. Returns a Promise.
  • public readMessages() : Promise<Message[]> {
  •  
  • var promise = new Promise<Message[]>(
  • ( resolve, reject ) : void => {
  •  
  • var messages = this.getMessagesFromLocalStorage().map(
  • ( message: MessageDTO ) : Message => {
  •  
  • return({
  • id: message.id,
  • text: message.text,
  • createdAt: new Date( message.createdAt )
  • });
  •  
  • }
  • );
  •  
  • resolve( messages );
  •  
  • }
  • );
  •  
  • return( promise );
  •  
  • }
  •  
  •  
  • // I read the messages collection as a stream. Returns an Observable stream.
  • public readMessagesAsStream() : Observable<Message[]> {
  •  
  • // Queue up an emission of the latest messages. NOTE: emitMessages() pushes its
  • // value asynchronously (after a Promise resolves); so, a new subscriber first
  • // receives whatever value the BehaviorSubject currently holds, followed shortly
  • // by the freshly-read messages collection.
  • this.emitMessages();
  •  
  • return( this.subject.asObservable() );
  •  
  • }
  •  
  • // ---
  • // PRIVATE METHODS.
  • // ---
  •  
  • // I read and emit the latest messages collection on the read-stream.
  • private emitMessages() : void {
  •  
  • this.readMessages().then(
  • ( messages: Message[] ) : void => {
  •  
  • this.subject.next( messages );
  •  
  • }
  • // CAUTION: To keep this demo simple, I'm not caring about any errors that
  • // could be thrown from this Promise.
  • );
  •  
  • }
  •  
  •  
  • // I read the messages out of localStorage.
  • private getMessagesFromLocalStorage() : MessageDTO[] {
  •  
  • var data = localStorage.getItem( this.storageKey );
  •  
  • if ( ! data ) {
  •  
  • return( [] );
  •  
  • }
  •  
  • return( JSON.parse( data ) );
  •  
  • }
  •  
  •  
  • // I generate a new ID for a new Message object.
  • private newID() : string {
  •  
  • // Since the localStorage is shared by all tabs on the same domain, we have to
  • // be more careful about how we generate the ID. Let's use a bunch of random
  • // characters to help prevent conflicts.
  • var validChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
  • var chars = [];
  •  
  • for ( var i = 0 ; i < 30 ; i++ ) {
  •  
  • chars.push(
  • validChars.charAt(
  • ( Math.floor( Math.random() * Date.now() ) % validChars.length )
  • )
  • );
  •  
  • }
  •  
  • return( "m" + chars.join( "" ) );
  •  
  • }
  •  
  •  
  • // I store the given messages in localStorage.
  • private setMessagesToLocalStorage( messages: MessageDTO[] ) : void {
  •  
  • localStorage.setItem( this.storageKey, JSON.stringify( messages ) );
  •  
  • }
  •  
  • }

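NOTE: This implementation uses the Promise constructor inside its CRUD methods so that any error thrown by localStorage (for example, when storage is full or doesn't exist in the current runtime) turns into a rejected Promise rather than a synchronous exception.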
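
To illustrate why wrapping storage access in the Promise constructor helps, here is a minimal sketch. The `brokenStorage` object and the `readRaw()` function are hypothetical; `brokenStorage` simply stands in for a localStorage that throws on access:

```typescript
// Hypothetical storage shim that always fails, standing in for a full or
// unavailable localStorage.
const brokenStorage = {
    getItem( key: string ) : string | null {
        throw new Error( "Storage unavailable for key: " + key );
    }
};

function readRaw( key: string ) : Promise<string | null> {

    return new Promise<string | null>(
        ( resolve ) => {

            // A throw inside the executor rejects the Promise instead of
            // propagating to the caller as a synchronous exception.
            resolve( brokenStorage.getItem( key ) );

        }
    );

}

// This call does not throw, even though the storage access fails.
const pending = readRaw( "demo" );

pending.catch(
    ( error: Error ) => {
        console.warn( "Rejected:", error.message );
    }
);
```

The calling context can then handle storage failures through the same rejection path it would use for any other gateway implementation.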

The localStorage implementation is very similar to the in-memory implementation; it conforms to the MessageGateway abstract class and exposes all the same CRUD methods as well as the realtime readMessagesAsStream() method. The major difference is that the CRUD methods aren't the only source of realtime events. When the LocalStorageMessageGateway class is initialized, it adds an event listener for the window's "storage" event. This storage event indicates that another tab (in the same browser) changed the localStorage state. This gateway implementation uses that event to trigger a new event on the realtime stream. This allows multiple browser tabs to stay synchronized.
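
The key check inside the "storage" event handler can be pulled out into a small, testable function. The sketch below uses a hypothetical `shouldReemit()` helper (not part of the demo). It also treats a `null` key as relevant, since browsers report a `null` key on the StorageEvent when `localStorage.clear()` is called; the demo's handler ignores that case:

```typescript
// Same storage key as the one used by the LocalStorageMessageGateway.
const STORAGE_KEY = "ng4-demo-firebase-encapsulation";

// Hypothetical helper: "eventKey" corresponds to StorageEvent.key, which is
// null when the whole storage area has been cleared.
function shouldReemit( eventKey: string | null, storageKey: string ) : boolean {

    return( ( eventKey === null ) || ( eventKey === storageKey ) );

}

console.log( shouldReemit( STORAGE_KEY, STORAGE_KEY ) );
console.log( shouldReemit( "some-other-app", STORAGE_KEY ) );
console.log( shouldReemit( null, STORAGE_KEY ) );
```

Inside the event listener, this would be called with `event.key` and the gateway's own storage key before deciding whether to emit.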

What's becoming obvious to me is that if we start to think of the database abstraction as driving the data layer's API, the architecture becomes a lot more flexible. We've already looked at two "realtime" implementations and we haven't even gotten to the Firebase implementation yet. So, not only does using RxJS for the realtime events hide the Firebase details, it also allows us to use non-Firebase implementations.

That said, let's look at the final, Firebase implementation:

  • // Import the core angular services.
  • import firebase = require( "firebase/app" );
  • import { Observable } from "rxjs/Observable";
  • import { Observer } from "rxjs/Observer";
  •  
  • // Import these libraries for their side-effects.
  • import "firebase/database";
  •  
  • // Import the application components and services.
  • import { Message } from "./message.gateway";
  • import { MessageGateway } from "./message.gateway";
  •  
  • interface MessageDTO {
  • id: string;
  • text: string;
  • createdAt: number;
  • }
  •  
  • export class FirebaseMessageGateway extends MessageGateway {
  •  
  • private firebaseApp: firebase.app.App;
  • private firebaseDB: firebase.database.Database;
  •  
  •  
  • // I initialize the firebase message gateway implementation.
  • constructor() {
  •  
  • super();
  •  
  • this.firebaseApp = firebase.initializeApp({
  • apiKey: "AIzaSyBL208rmWno59jE2k5xSFkRH7D3NIDH6C0",
  • authDomain: "popping-torch-33.firebaseapp.com",
  • databaseURL: "https://popping-torch-33.firebaseio.com",
  • projectId: "popping-torch-33",
  • storageBucket: "popping-torch-33.appspot.com",
  • messagingSenderId: "254167194432"
  • });
  • this.firebaseDB = this.firebaseApp.database();
  •  
  • }
  •  
  • // ---
  • // PUBLIC METHODS.
  • // ---
  •  
  • // I add a new message to the collection. Returns a Promise with the new ID.
  • public createMessage( message: Message ) : Promise<string> {
  •  
  • var ref = this.firebaseDB.ref( "/2017-09-03/" ).push();
  •  
  • var data = {
  • id: ref.key,
  • text: message.text,
  • createdAt: message.createdAt.getTime()
  • };
  •  
  • // NOTE: This will write locally and attempt to save the data to the remote
  • // Firebase server. It returns a Promise that will resolve when the data is
  • // persisted remotely. For now, we're only going to care about the local
  • // operation (hence returning a promise with the key).
  • ref
  • .set( data )
  • .catch(
  • ( error: any ) : void => {
  •  
  • console.log( "Create Message Failed" );
  • console.error( error );
  • console.log( "Transfer Object:" );
  • console.dir( data );
  •  
  • }
  • )
  • ;
  •  
  • // NOTE: We don't have to "emit" any event. Firebase ref nodes will already
  • // emit data as the underlying data is mutated.
  •  
  • return( Promise.resolve( ref.key ) );
  •  
  • }
  •  
  •  
  • // I delete the message with the given ID. Returns a Promise.
  • public deleteMessage( id: string ) : Promise<void> {
  •  
  • // NOTE: This will delete locally and attempt to remove the data from the remote
  • // Firebase server. It returns a Promise that will resolve when the data is
  • // deleted remotely. For now, we're only going to care about the local operation.
  • this.firebaseDB.ref( "/2017-09-03/" + id )
  • .remove()
  • .catch(
  • ( error: any ) : void => {
  •  
  • console.log( "Delete Message Failed" );
  • console.error( error );
  • console.log( "ID:", id );
  •  
  • }
  • )
  • ;
  •  
  • // NOTE: We don't have to "emit" any event. Firebase ref nodes will already
  • // emit data as the underlying data is mutated.
  •  
  • return( Promise.resolve() );
  •  
  • }
  •  
  •  
  • // I read the messages collection. Returns a Promise.
  • public readMessages() : Promise<Message[]> {
  •  
  • var promise = this.firebaseDB
  • .ref( "/2017-09-03/" )
  • .once( "value" )
  • .then(
  • ( snapshot: firebase.database.DataSnapshot ) : Message[] => {
  •  
  • var messages: Message[] = [];
  •  
  • // If there are no messages, just return an empty collection.
  • if ( ! snapshot.exists() ) {
  •  
  • return( messages );
  •  
  • }
  •  
  • // Convert the data transfer objects into actual Messages.
  • snapshot.forEach(
  • ( messageSnapshot: firebase.database.DataSnapshot ) : boolean => {
  •  
  • var message = messageSnapshot.val();
  • messages.push({
  • id: message.id,
  • text: message.text,
  • createdAt: new Date( message.createdAt )
  • });
  •  
  • // CAUTION: This boolean is here to tell Firebase to keep
  • // iterating over the collection (true cancels iteration). It
  • // should be omitted, but TypeScript will complain if it's
  • // not here.
  • return( false );
  •  
  • }
  • );
  •  
  • return( messages );
  •  
  • }
  • )
  • ;
  •  
  • // NOTE: We have to cast to the correct type of Promise otherwise we get a
  • // mismatch due to the use of Promise<any> in the Firebase type definitions.
  • return( <Promise<Message[]>>promise );
  •  
  • }
  •  
  •  
  • // I read the messages collection as a stream. Returns an Observable stream.
  • public readMessagesAsStream() : Observable<Message[]> {
  •  
  • // NOTE: In other implementations of this Gateway, we used a BehaviorSubject.
  • // In this version, we don't have to do that because we are creating on-demand
  • // subscriptions to Firebase references that, by default, start emitting data
  • // when you bind to them. As such, when the calling context goes to subscribe to
  • // this stream, we'll immediately emit the current state of the database as the
  • // first stream event.
  • var stream = new Observable<Message[]>(
  • ( observer: Observer<Message[]> ) : Function => {
  •  
  • var ref = this.firebaseDB.ref( "/2017-09-03/" );
  •  
  • // Bind to the value events on the messages collection. This will fire
  • // every time anything in the given ref tree is changed (ie, a message
  • // is added or removed).
  • var eventHandler = ref.on(
  • "value",
  • ( snapshot: firebase.database.DataSnapshot ) : void => {
  •  
  • var messages: Message[] = [];
  •  
  • // If there are no messages, just emit an empty collection.
  • if ( ! snapshot.exists() ) {
  •  
  • observer.next( messages );
  • return;
  •  
  • }
  •  
  • // Convert the data transfer objects into actual Messages.
  • // --
  • // The .forEach() method will iterate over the child nodes in the
  • // correct createdAt order (based on the structure of the keys).
  • snapshot.forEach(
  • ( messageSnapshot: firebase.database.DataSnapshot ) : boolean => {
  •  
  • var message = messageSnapshot.val();
  • messages.push({
  • id: message.id,
  • text: message.text,
  • createdAt: new Date( message.createdAt )
  • });
  •  
  • // CAUTION: This boolean is here to tell Firebase to
  • // keep iterating over the collection (true cancels
  • // iteration). It should be omitted, but TypeScript will
  • // complain if it's not here.
  • return( false );
  •  
  • }
  • );
  •  
  • observer.next( messages );
  •  
  • }
  • );
  •  
  • // Provide tear down logic so we can stop listening to the ref when the
  • // calling context unsubscribes from the returned stream.
  • function teardown() : void {
  •  
  • ref.off( "value", eventHandler );
  • ref = eventHandler = null;
  •  
  • }
  •  
  • return( teardown );
  •  
  • }
  • );
  •  
  • return( stream );
  •  
  • }
  •  
  • }

This implementation is quite a bit different from the other two implementations because Firebase collections are quite a bit different from in-memory or localStorage-based arrays. However, this implementation still extends the same abstract class, MessageGateway, and it still conforms to the same public API.

Unlike the other implementations, however, this version doesn't use a BehaviorSubject. Nor does it ever call an "emit" method. Instead, it uses Firebase as an event-source that automatically emits events when the underlying data is changed. And, when a client goes to read the realtime stream, this implementation creates a new Observable by binding to the "value" event of the Firebase collections node. Not only does this automatically emit events based on local data changes, it also synchronizes data across all Firebase clients.
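
The bind-on-subscribe, unbind-on-unsubscribe pattern can be sketched without Firebase or RxJS. In the following dependency-free example, `FakeRef` is a hypothetical stand-in for a Firebase reference (it delivers the current value as soon as a handler is bound), and `streamFrom()` is a hypothetical stand-in for the subscriber function passed to the Observable constructor:

```typescript
type MessagesHandler = ( value: string[] ) => void;

// Hypothetical stand-in for a Firebase reference: on() registers a handler
// and immediately delivers the current value; off() removes the handler.
class FakeRef {

    private handlers: MessagesHandler[] = [];
    private value: string[] = [];

    public on( handler: MessagesHandler ) : MessagesHandler {
        this.handlers.push( handler );
        handler( this.value );
        return( handler );
    }

    public off( handler: MessagesHandler ) : void {
        this.handlers = this.handlers.filter( ( h ) => h !== handler );
    }

    public set( value: string[] ) : void {
        this.value = value;
        this.handlers.slice().forEach( ( h ) => h( value ) );
    }

    public listenerCount() : number {
        return( this.handlers.length );
    }

}

// Bind to the ref on subscription and return the teardown logic that stops
// listening when the calling context is done with the stream.
function streamFrom( ref: FakeRef, onNext: MessagesHandler ) : () => void {
    const handler = ref.on( onNext );
    return () => ref.off( handler );
}

const ref = new FakeRef();
const seen: string[][] = [];
const teardown = streamFrom(
    ref,
    ( messages ) => {
        seen.push( messages );
    }
);

ref.set( [ "first" ] ); // Delivered to the subscriber.
teardown();
ref.set( [ "first", "second" ] ); // Not delivered; the handler was unbound.

console.log( seen, ref.listenerCount() );
```

Because each subscription binds its own handler and the initial value arrives on binding, there is no need for a shared subject to replay state.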

NOTE: You can see all the fancy synchronization in the demo video.

The Firebase library exposes all kinds of cool functionality, like the ability to traverse child nodes and subscribe to events on any of the nodes in the database document structure (security notwithstanding). Unfortunately, if you start passing Firebase references out of your database access layer, it creates very tight coupling between your application and your data persistence. But, if you change your perspective and shift from viewing Firebase as your data persistence mechanism to thinking about your database access layer as your API for data persistence, Firebase becomes nothing but an implementation detail. This creates a more flexible application architecture that codes to an Interface.

Now, obviously, this exploration is quite limited in scope. After all, it's just a single realtime method. But, I think it demonstrates how RxJS streams can be used to define and implement realtime methods that any number of implementations can conform to. In the past, I've shifted away from RxJS streams and moved back towards Promises. But, this kind of realtime API is exactly the kind of win that RxJS streams can deliver.

Reader Comments

Ben, thanks for a super blog. Very helpful. I also love to see how nicely Firebase works with Angular, with no intermediate web API layer needed.

@Oren,

Can you expand on that? Why would this not work with Ahead of Time (AoT) compiling? It's just a service that interfaces with another service.

Yeah, it is really a problem. I don't quite like this line: abstract readMessagesAsStream() : Observable<Message[]>; which means you will receive the whole array of messages each time the collection changes, which is OK (I guess), but there is a more sophisticated way of syncing a collection in Firestore. You could listen to, say, the stateChanges() event, which receives DocumentChangeAction[], and then reflect the changes in your messages, which is a bit harder to abstract :) Any thoughts about that?

@Taras,

It's an interesting question. I am not too familiar with the Firestore API; but, I assume that with an increased need for specialization, you may need to start having additional abstractions that maintain state and provide behavior.

For this demo, things are super simple, which allows the relatively small abstraction. But, you can imagine that if the "messages" could be grouped by "category" (or something), even the simple Stream would have to start filtering as part of its pipeline processing. I could probably start by just using the closure-based state (ie, keeping the "category ID" in the Observer closure) and then filtering on it when new messages were emitted. But, as the state management needs get more complex, closing over variables might not be sufficient - perhaps I would have to break out into a Class that exposes an Observable?

I'm mostly just thinking out-loud here because, frankly, I'm not very good with Observables :) I can only do a few things, so I don't have a great set of mental models on how they can be created, exposed, and consumed.