Source: client/command-request.js

  1. /*
  2. * Copyright 2023, TeamDev. All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Redistribution and use in source and/or binary forms, with or without
  11. * modification, must retain the above copyright notice and the following
  12. * disclaimer.
  13. *
  14. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  15. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  16. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  17. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  18. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  19. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  20. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  21. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  22. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  23. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  24. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  25. */
  26. import {CommandId} from "../proto/spine/core/command_pb";
  27. import {MessageId, Origin} from "../proto/spine/core/diagnostics_pb";
  28. import {Filters} from "./actor-request-factory";
  29. import {AnyPacker} from "./any-packer";
  30. import {ClientRequest} from "./client-request";
  31. import {Type} from "./typed-message";
  32. const NOOP_CALLBACK = () => {};
  33. /**
  34. * A request to post a command to the Spine backend.
  35. *
  36. * Optionally allows to subscribe to events that are the immediate results of handling the command.
  37. *
  38. * A usage example:
  39. * ```
  40. * client.command(logInUser)
  41. * .onOk(_logOk)
  42. * .onError(_logError)
  43. * .onImmediateRejection(_warnAboutRejection)
  44. * .observe(UserLoggedIn.class, ({subscribe, unsubscribe}) => {
  45. * subscribe(event => _logAndUnsubscribe(event, unsubscribe));
  46. * setTimeout(unsubscribe, EVENT_WAIT_TIMEOUT);
  47. * })
  48. * .observe(UserAlreadyLoggedIn.class, (({subscribe, unsubscribe}) => {
  49. * subscribe(event => _warnAboutAndUnsubscribe(event, unsubscribe));
  50. * setTimeout(unsubscribe, EVENT_WAIT_TIMEOUT);
  51. * })
  52. * .post();
  53. * ```
  54. *
  55. * The `subscribe` callback provided to the consumer allows to configure an event receival process
  56. * while the `unsubscribe` callback allows to cancel the subscription.
  57. *
  58. * Please note that in the example above we make sure the subscription is cancelled even if the
  59. * specified event type is never received.
  60. */
  61. export class CommandRequest extends ClientRequest {
  62. /**
  63. * @param {!Message} commandMessage the command to post
  64. * @param {!Client} client the client which initiated the request
  65. * @param {!ActorRequestFactory} actorRequestFactory the request factory
  66. */
  67. constructor(commandMessage, client, actorRequestFactory) {
  68. super(client, actorRequestFactory);
  69. this._commandMessage = commandMessage;
  70. this._onAck = NOOP_CALLBACK;
  71. this._onError = NOOP_CALLBACK;
  72. this._onImmediateRejection = NOOP_CALLBACK;
  73. this._observedTypes = [];
  74. }
  75. /**
  76. * Runs the callback if the command is successfully handled by the Spine server.
  77. *
  78. * @param {!parameterlessCallback} callback the callback to run
  79. * @return {this} self for method chaining
  80. */
  81. onOk(callback) {
  82. this._onAck = callback;
  83. return this;
  84. }
  85. /**
  86. * Runs the callback if the command could not be handled by the Spine server due to a
  87. * technical error.
  88. *
  89. * @param {!consumerCallback<CommandHandlingError>} callback the callback to run
  90. * @return {this} self for method chaining
  91. */
  92. onError(callback) {
  93. this._onError = callback;
  94. return this;
  95. }
  96. /**
  97. * Runs the callback if the server responded on a command with an immediate rejection.
  98. *
  99. * The immediate rejection means the command did not pass the command filters set up in the
  100. * bounded context and was disqualified from execution right away.
  101. *
  102. * A typical example of this would be the command not passing filters due to user permissions
  103. * being not broad enough.
  104. *
  105. * Please note that this rejection is different to a "normal" rejection when the command is
  106. * acknowledged with the `OK` status and then reaches the handler method which processes it. Such
  107. * rejections can be tracked using the `observe(...)` method of this request.
  108. *
  109. * @param {!consumerCallback<spine.core.Event>} callback
  110. * @return {this} self for method chaining
  111. */
  112. onImmediateRejection(callback) {
  113. this._onImmediateRejection = callback;
  114. return this;
  115. }
  116. /**
  117. * Adds the event type to the list of observed command handling results.
  118. *
  119. * @param {!Class<Message>} eventType a type of the observed events
  120. * @param {!consumerCallback<EventSubscriptionCallbacks>} consumer
  121. * a consumer of the `subscribe` and `unsubscribe` callbacks which are responsible for
  122. * accepting the incoming events and cancelling the subscription respectively
  123. * @return {this} self for method chaining
  124. */
  125. observe(eventType, consumer) {
  126. this._observedTypes.push({type: eventType, consumer: consumer});
  127. return this;
  128. }
  129. /**
  130. * Posts the command to the server and subscribes to all observed types.
  131. *
  132. * @return {Promise<void>} a promise that signals if the command posting was done successfully,
  133. * may be ignored
  134. */
  135. post() {
  136. const command = this._requestFactory.command().create(this._commandMessage);
  137. const onAck = {
  138. onOk: this._onAck,
  139. onError: this._onError,
  140. onImmediateRejection: this._onImmediateRejection
  141. };
  142. const promises = [];
  143. this._observedTypes.forEach(({type, consumer}) => {
  144. const originFilter = Filters.eq("context.past_message", this._asOrigin(command));
  145. const promise = this._client.subscribeToEvent(type)
  146. .where(originFilter)
  147. .post()
  148. .then(({eventEmitted, unsubscribe}) => {
  149. const subscribe = eventConsumer => {
  150. eventEmitted.subscribe({
  151. next: eventConsumer
  152. });
  153. };
  154. consumer({subscribe, unsubscribe});
  155. });
  156. promises.push(promise);
  157. });
  158. const subscriptionPromise = Promise.all(promises);
  159. return subscriptionPromise.then(() => this._client.post(command, onAck));
  160. }
  161. /**
  162. * @param {!Command} command
  163. * @return {Origin}
  164. *
  165. * @private
  166. */
  167. _asOrigin(command) {
  168. const result = new Origin();
  169. const messageId = new MessageId();
  170. const commandIdType = Type.forClass(CommandId);
  171. const packedId = AnyPacker.pack(command.getId()).as(commandIdType);
  172. messageId.setId(packedId);
  173. const typeUrl = command.getMessage().getTypeUrl();
  174. messageId.setTypeUrl(typeUrl);
  175. result.setMessage(messageId);
  176. const actorContext = command.getContext().getActorContext();
  177. result.setActorContext(actorContext);
  178. return result;
  179. }
  180. }