Source: client/firebase-client.js

/*
 * Copyright 2023, TeamDev. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Redistribution and use in source and/or binary forms, with or without
 * modification, must retain the above copyright notice and the following
 * disclaimer.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

"use strict";

import {Observable, Subject, Subscription} from 'rxjs';
import {
  Subscription as SubscriptionObject,
  SubscriptionId
} from '../proto/spine/client/subscription_pb';
import {ActorRequestFactory} from './actor-request-factory';
import {AbstractClientFactory} from './client-factory';
import {CommandingClient} from "./commanding-client";
import {CompositeClient} from "./composite-client";
import {HttpEndpoint} from './http-endpoint';
import {FirebaseDatabaseClient} from './firebase-database-client';
import {FirebaseSubscriptionService} from './firebase-subscription-service';
import ObjectToProto from './object-to-proto';
import {QueryingClient} from "./querying-client";
import {SubscribingClient} from "./subscribing-client";

/**
 * An abstract base for subscription objects.
 *
 * @abstract
 */
class SpineSubscription extends Subscription {

  /**
   * @param {Function} unsubscribe the callbacks that allows to cancel the subscription
   * @param {SubscriptionObject} subscription the wrapped subscription object
   *
   * @protected
   */
  constructor(unsubscribe, subscription) {
    super(unsubscribe);
    this._subscription = subscription;
  }

  /**
   * An internal Spine subscription which includes the topic the updates are received for.
   *
   * @return {SubscriptionObject} a `spine.client.Subscription` instance
   */
  internal() {
    return this._subscription;
  }

  /**
   * @return {String} a string value of the `internal` subscription ID.
   */
  id() {
    return this.internal().getId().getValue();
  }
}

/**
 * A subscription to entity changes on application backend.
 */
class EntitySubscription extends SpineSubscription {

  /**
   * @param {Function} unsubscribe the callback that allows to cancel the subscription
   * @param {{itemAdded: Observable, itemChanged: Observable, itemRemoved: Observable}} observables
   *        the observables for entity change
   * @param {SubscriptionObject} subscription the wrapped subscription object
   */
  constructor({
                unsubscribedBy: unsubscribe,
                withObservables: observables,
                forInternal: subscription
              }) {
    super(unsubscribe, subscription);
    this._observables = observables;
  }

  /**
   * @return {EntitySubscriptionObject} a plain object with observables and unsubscribe method
   */
  toObject() {
    return Object.assign({},
        this._observables,
        {
          unsubscribe: () => {
            return this.unsubscribe();
          }
        });
  }
}

/**
 * A subscription to events that occur in the system.
 */
class EventSubscription extends SpineSubscription {

  /**
   * @param {Function} unsubscribe the callbacks that allows to cancel the subscription
   * @param {Observable} eventEmitted the observable for the emitted events
   * @param {SubscriptionObject} subscription the wrapped subscription object
   */
  constructor({
                unsubscribedBy: unsubscribe,
                withObservable: observable,
                forInternal: subscription
              }) {
    super(unsubscribe, subscription);
    this._observable = observable;
  }

  /**
   * @return {EventSubscriptionObject} a plain object with observables and unsubscribe method
   */
  toObject() {
    return Object.assign(
        {},
        {eventEmitted: this._observable},
        {
          unsubscribe: () => {
            return this.unsubscribe();
          }
        }
    );
  }
}

class FirebaseQueryingClient extends QueryingClient {

  /**
   * Creates an instance of the client.
   *
   * @param {!HttpEndpoint} endpoint the server endpoint to execute queries and commands
   * @param {!FirebaseDatabaseClient} firebaseDatabase the client to read the query results from
   * @param {!ActorRequestFactory} actorRequestFactory a factory to instantiate the actor requests with
   */
  constructor(endpoint, firebaseDatabase, actorRequestFactory) {
    super(actorRequestFactory);
    this._endpoint = endpoint;
    this._firebase = firebaseDatabase;
  }

  read(query) {
    return new Promise((resolve, reject) => {
      this._endpoint.query(query)
          .then(({path}) => this._firebase.getValues(path, values => {
            const typeUrl = query.getTarget().getType();
            const messages = values.map(value => ObjectToProto.convert(value, typeUrl));
            resolve(messages);
          }))
          .catch(error => reject(error));
    });
  }
}

const EVENT_TYPE_URL = 'type.spine.io/spine.core.Event';

/**
 * A {@link SubscribingClient} which receives entity state updates and events from
 * a Firebase Realtime Database.
 */
class FirebaseSubscribingClient extends SubscribingClient {

  /**
   * Creates an instance of the client.
   *
   * @param {!HttpEndpoint} endpoint
   *  the server endpoint to execute queries and commands
   * @param {!FirebaseDatabaseClient} firebaseDatabase
   *  the client to read the query results from
   * @param {!ActorRequestFactory} actorRequestFactory
   *  a factory to instantiate the actor requests with
   * @param {!FirebaseSubscriptionService} subscriptionService
   *  a service handling the subscriptions
   */
  constructor(endpoint, firebaseDatabase, actorRequestFactory, subscriptionService) {
    super(actorRequestFactory);
    this._endpoint = endpoint;
    this._firebase = firebaseDatabase;
    this._subscriptionService = subscriptionService;
  }

  /**
   * @inheritDoc
   */
  subscribe(topic) {
    return this._doSubscribe(topic, this._entitySubscription);
  }

  /**
   * @inheritDoc
   */
  subscribeToEvents(topic) {
    return this._doSubscribe(topic, this._eventSubscription);
  }

  /**
   * @private
   */
  _doSubscribe(topic, createSubscriptionFn) {
    return new Promise((resolve, reject) => {
      this._endpoint.subscribeTo(topic)
          .then(response => {
            const path = response.nodePath.value;
            const internalSubscription =
                FirebaseSubscribingClient.internalSubscription(path, topic);

            const subscription = createSubscriptionFn.call(this, path, internalSubscription);

            resolve(subscription.toObject());
            this._subscriptionService.add(subscription);
          })
          .catch(reject);
    });
  }

  /**
   * @private
   */
  _entitySubscription(path, subscription) {
    const itemAdded = new Subject();
    const itemChanged = new Subject();
    const itemRemoved = new Subject();

    const pathSubscriptions = [
      this._firebase.onChildAdded(path, itemAdded),
      this._firebase.onChildChanged(path, itemChanged),
      this._firebase.onChildRemoved(path, itemRemoved)
    ];

    const typeUrl = subscription.getTopic().getTarget().getType();

    return new EntitySubscription({
      unsubscribedBy: () => {
        FirebaseSubscribingClient._unsubscribe(pathSubscriptions);
        this._subscriptionService.cancelSubscription(subscription);
      },
      withObservables: {
        itemAdded: ObjectToProto.map(itemAdded.asObservable(), typeUrl),
        itemChanged: ObjectToProto.map(itemChanged.asObservable(), typeUrl),
        itemRemoved: ObjectToProto.map(itemRemoved.asObservable(), typeUrl)
      },
      forInternal: subscription
    });
  }

  /**
   * @private
   */
  _eventSubscription(path, subscription) {
    const itemAdded = new Subject();
    const pathSubscription = this._firebase.onChildAdded(path, itemAdded);

    return new EventSubscription({
      unsubscribedBy: () => {
        FirebaseSubscribingClient._unsubscribe([pathSubscription]);
        this._subscriptionService.cancelSubscription(subscription);
      },
      withObservable: ObjectToProto.map(itemAdded.asObservable(), EVENT_TYPE_URL),
      forInternal: subscription
    });
  }

  /**
   * @override
   */
  cancelAllSubscriptions() {
    this._subscriptionService.cancelAllSubscriptions();
  }

  /**
   * Unsubscribes the provided Firebase subscriptions.
   *
   * @param {Array<Subscription>} subscriptions
   * @private
   */
  static _unsubscribe(subscriptions) {
    subscriptions.forEach(subscription => {
      if (!subscription.closed) {
        subscription.unsubscribe();
      }
    });
  }

  /**
   * Creates a `SubscriptionObject` instance to communicate with Spine server.
   *
   * @param {!String} path a path to object which gets updated in Firebase
   * @param {!spine.client.Topic} topic a topic for which the Subscription gets updates
   * @return {SubscriptionObject} a `SubscriptionObject` instance to communicate with Spine server
   */
  static internalSubscription(path, topic) {
    const subscription = new SubscriptionObject();
    const id = new SubscriptionId();
    id.setValue(path);
    subscription.setId(id);
    subscription.setTopic(topic);
    return subscription;
  }
}

/**
 * An implementation of the `AbstractClientFactory` that creates instances of client which exchanges
 * data with the server via Firebase Realtime Database.
 */
export class FirebaseClientFactory extends AbstractClientFactory {

  /**
   * Creates a new `FirebaseClient` instance which will send the requests on behalf of the provided
   * actor to the provided endpoint, retrieving the data from the provided Firebase storage.
   *
   * Expects that given options contain backend endpoint URL, firebase Database instance and
   * the actor provider.
   *
   * @param {ClientOptions} options
   * @return {Client} a new backend client instance which will send the requests on behalf
   *                  of the provided actor to the provided endpoint, retrieving the data
   *                  from the provided Firebase storage
   * @override
   */
  static _clientFor(options) {
    const httpClient = this._createHttpClient(options);
    const httpResponseHandler = this._createHttpResponseHandler(options);
    const endpoint = new HttpEndpoint(httpClient, httpResponseHandler, options.routing);
    const firebaseDatabaseClient = new FirebaseDatabaseClient(options.firebaseDatabase);
    const requestFactory = ActorRequestFactory.create(options);
    const subscriptionService =
        new FirebaseSubscriptionService(endpoint, options.subscriptionKeepUpInterval);

    const querying = new FirebaseQueryingClient(endpoint, firebaseDatabaseClient, requestFactory);
    const subscribing = new FirebaseSubscribingClient(endpoint,
                                                      firebaseDatabaseClient,
                                                      requestFactory,
                                                      subscriptionService);
    const commanding = new CommandingClient(endpoint, requestFactory);
    return new CompositeClient(querying, subscribing, commanding);
  }

  static createQuerying(options) {
    const httpClient = this._createHttpClient(options);
    const httpResponseHandler = this._createHttpResponseHandler(options);
    const endpoint = new HttpEndpoint(httpClient, httpResponseHandler, options.routing);
    const firebaseDatabaseClient = new FirebaseDatabaseClient(options.firebaseDatabase);
    const requestFactory = ActorRequestFactory.create(options);

    return new FirebaseQueryingClient(endpoint, firebaseDatabaseClient, requestFactory);
  }

  static createSubscribing(options) {
    const httpClient = this._createHttpClient(options);
    const httpResponseHandler = this._createHttpResponseHandler(options);
    const endpoint = new HttpEndpoint(httpClient, httpResponseHandler, options.routing);
    const firebaseDatabaseClient = new FirebaseDatabaseClient(options.firebaseDatabase);
    const requestFactory = ActorRequestFactory.create(options);
    const subscriptionService =
        new FirebaseSubscriptionService(endpoint, options.subscriptionKeepUpInterval);

    return new FirebaseSubscribingClient(endpoint,
                                         firebaseDatabaseClient,
                                         requestFactory,
                                         subscriptionService);
  }

  /**
   * @override
   */
  static _ensureOptionsSufficient(options) {
    super._ensureOptionsSufficient(options);
    const messageForMissing = (option) =>
        `Unable to initialize Client with Firebase storage. The ClientOptions.${option} not specified.`;
    if (!options.endpointUrl) {
      throw new Error(messageForMissing('endpointUrl'));
    }
    if (!options.firebaseDatabase) {
      throw new Error(messageForMissing('firebaseDatabase'));
    }
    if (!options.actorProvider) {
      throw new Error(messageForMissing('actorProvider'));
    }
  }
}