Source: client/firebase-subscription-service.js

/*
 * Copyright 2023, TeamDev. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Redistribution and use in source and/or binary forms, with or without
 * modification, must retain the above copyright notice and the following
 * disclaimer.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

"use strict";

import {Duration} from './time-utils';
import ObjectToProto from './object-to-proto';
import {Status} from '../proto/spine/core/response_pb';

/**
 * The default interval (2 minutes) between subscription keep-up requests.
 *
 * Used by `FirebaseSubscriptionService` when no custom interval is passed to its constructor.
 *
 * @type {Duration}
 */
const DEFAULT_KEEP_UP_INTERVAL = new Duration({minutes: 2});

/**
 * A service that manages the active subscriptions, periodically sending requests
 * to keep them running and cancelling them on the server when they are no longer needed.
 */
export class FirebaseSubscriptionService {

  /**
   * @param {Endpoint} endpoint an endpoint to communicate with
   * @param {?Duration} keepUpInterval a custom interval for sending subscription keep-up
   *                                   requests; if omitted, the default interval is used
   */
  constructor(endpoint, keepUpInterval) {
    /**
     * The subscriptions currently tracked by this service.
     *
     * @type {SpineSubscription[]}
     * @private
     */
    this._subscriptions = [];
    /**
     * @type {Endpoint}
     * @private
     */
    this._endpoint = endpoint;
    /**
     * @type {Duration}
     * @private
     */
    this._keepUpInterval = keepUpInterval
        ? keepUpInterval
        : DEFAULT_KEEP_UP_INTERVAL;
    /**
     * The handle returned by `setInterval` while the keep-up timer is active;
     * `null` while the service is not running.
     *
     * @private
     */
    this._interval = null;
  }

  /**
   * Adds a subscription to the service to handle the keep-up requests and cancel in
   * case of unsubscribe.
   *
   * Starts the keep-up timer if it is not running yet.
   *
   * @param {SpineSubscription} subscription an active subscription to keep running
   * @throws {Error} if a subscription with the same ID is already registered
   */
  add(subscription) {
    if (this._isRegistered(subscription)) {
      throw new Error('This subscription is already registered in subscription service');
    }
    this._subscriptions.push(subscription);

    if (!this._isRunning()) {
      this._run();
    }
  }

  /**
   * Immediately cancels all active subscriptions previously created through this service.
   *
   * Sends a single "cancel all" request for the subscriptions which are not closed yet,
   * then unsubscribes each of them and removes it from this service.
   */
  cancelAllSubscriptions() {
    const activeSubscriptions = this._subscriptions.filter(s => !s.closed);
    if (activeSubscriptions.length === 0) {
      return;
    }
    const subscriptionMessages = activeSubscriptions.map(s => s.internal());
    this._endpoint.cancelAll(subscriptionMessages);
    activeSubscriptions.forEach(s => {
      // Calling RxJS's `unsubscribe` to stop propagating the updates.
      s.unsubscribe();
      this._removeSubscription(s);
    });
  }

  /**
   * Immediately cancels the given subscription, including cancelling it on the server-side.
   *
   * The subscription is removed from this service once the endpoint confirms the cancellation.
   *
   * @param {SubscriptionObject} subscription the subscription to cancel
   * @returns {Promise<void>} a promise settled when the cancellation request completes;
   *                          rejected if the endpoint call fails
   */
  cancelSubscription(subscription) {
    return this._endpoint.cancelSubscription(subscription)
        .then(() => {
          this._removeSubscription(subscription);
        });
  }

  /**
   * Indicates whether this service is running keeping up subscriptions.
   *
   * @returns {boolean}
   * @private
   */
  _isRunning() {
    return !!this._interval;
  }

  /**
   * Starts the subscription service, keeping up the added subscriptions.
   *
   * @private
   */
  _run() {
    this._interval = setInterval(() => {
      this._keepUpSubscriptions();
    }, this._keepUpInterval.inMs());
  }

  /**
   * Stops the subscription service.
   *
   * @private
   */
  _stop() {
    clearInterval(this._interval);
    this._interval = null;
  }

  /**
   * Sends the "keep-up" request for all active subscriptions.
   *
   * Before sending the request, the subscriptions which have been closed on the client are
   * cancelled on the server and removed from this service.
   *
   * The non-`OK` response status means the subscription has already been canceled on the server,
   * most likely due to a timeout. So, in such case, the subscription is removed from the list of
   * active ones.
   *
   * @private
   */
  _keepUpSubscriptions() {
    const cancelledSubscriptions = this._subscriptions.filter(s => s.closed);
    if (cancelledSubscriptions.length > 0) {
      const subscriptionMessages = cancelledSubscriptions.map(s => s.internal());
      this._endpoint.cancelAll(subscriptionMessages);
      cancelledSubscriptions.forEach(s => this._removeSubscription(s));
    }
    const subscriptions = this._subscriptions.map(value => value.internal());
    if (subscriptions.length === 0) {
      return;
    }
    this._endpoint.keepUpSubscriptions(subscriptions).then(response => {
      // The responses are matched to the requested subscriptions by their position.
      for (let i = 0; i < response.response.length; i++) {
        const r = response.response[i];
        const status = ObjectToProto.convert(r.status, Status.typeUrl());
        if (status.getStatusCase() !== Status.StatusCase.OK) {
          // The subscription may have been removed while the request was in flight;
          // `_removeSubscription` ignores unknown subscriptions.
          this._removeSubscription(subscriptions[i]);
        }
      }
    });
  }

  /**
   * Removes the provided subscription from subscriptions list, which stops any attempts
   * to update it. In case no more subscriptions are left, stops this service.
   *
   * In case the passed subscription is not known to this service, does nothing.
   *
   * @param subscription a subscription to cancel;
   *                     this method accepts values of both `SpineSubscription`
   *                     and Proto `Subscription` types,
   *                     and operates based on the subscription ID
   * @private
   */
  _removeSubscription(subscription) {
    const id = typeof subscription.id === 'function'
        ? subscription.id()
        : subscription.getId().getValue();
    const index = this._subscriptions.findIndex(item => item.id() === id);
    if (index === -1) {
      // Without this guard `splice(-1, 1)` would drop the last registered subscription.
      return;
    }
    this._subscriptions.splice(index, 1);

    if (this._subscriptions.length === 0) {
      this._stop();
    }
  }

  /**
   * Tells whether a subscription with the same ID is already tracked by this service.
   *
   * @param {SpineSubscription} subscription the subscription to look up
   * @returns {boolean} `true` if a subscription with the same ID is registered
   * @private
   */
  _isRegistered(subscription) {
    const id = subscription.id();
    return this._subscriptions.some(registered => registered.id() === id);
  }
}