/*
* Copyright 2023, TeamDev. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import {CommandId} from "../proto/spine/core/command_pb";
import {MessageId, Origin} from "../proto/spine/core/diagnostics_pb";
import {Filters} from "./actor-request-factory";
import {AnyPacker} from "./any-packer";
import {ClientRequest} from "./client-request";
import {Type} from "./typed-message";
const NOOP_CALLBACK = () => {};
/**
* A request to post a command to the Spine backend.
*
* Optionally allows to subscribe to events that are the immediate results of handling the command.
*
* A usage example:
* ```
* client.command(logInUser)
* .onOk(_logOk)
* .onError(_logError)
* .onImmediateRejection(_warnAboutRejection)
* .observe(UserLoggedIn.class, ({subscribe, unsubscribe}) => {
* subscribe(event => _logAndUnsubscribe(event, unsubscribe));
* setTimeout(unsubscribe, EVENT_WAIT_TIMEOUT);
* })
* .observe(UserAlreadyLoggedIn.class, (({subscribe, unsubscribe}) => {
* subscribe(event => _warnAboutAndUnsubscribe(event, unsubscribe));
* setTimeout(unsubscribe, EVENT_WAIT_TIMEOUT);
* })
* .post();
* ```
*
* The `subscribe` callback provided to the consumer allows to configure an event receival process
* while the `unsubscribe` callback allows to cancel the subscription.
*
* Please note that in the example above we make sure the subscription is cancelled even if the
* specified event type is never received.
*/
export class CommandRequest extends ClientRequest {
/**
* @param {!Message} commandMessage the command to post
* @param {!Client} client the client which initiated the request
* @param {!ActorRequestFactory} actorRequestFactory the request factory
*/
constructor(commandMessage, client, actorRequestFactory) {
super(client, actorRequestFactory);
this._commandMessage = commandMessage;
this._onAck = NOOP_CALLBACK;
this._onError = NOOP_CALLBACK;
this._onImmediateRejection = NOOP_CALLBACK;
this._observedTypes = [];
}
/**
* Runs the callback if the command is successfully handled by the Spine server.
*
* @param {!parameterlessCallback} callback the callback to run
* @return {this} self for method chaining
*/
onOk(callback) {
this._onAck = callback;
return this;
}
/**
* Runs the callback if the command could not be handled by the Spine server due to a
* technical error.
*
* @param {!consumerCallback<CommandHandlingError>} callback the callback to run
* @return {this} self for method chaining
*/
onError(callback) {
this._onError = callback;
return this;
}
/**
* Runs the callback if the server responded on a command with an immediate rejection.
*
* The immediate rejection means the command did not pass the command filters set up in the
* bounded context and was disqualified from execution right away.
*
* A typical example of this would be the command not passing filters due to user permissions
* being not broad enough.
*
* Please note that this rejection is different to a "normal" rejection when the command is
* acknowledged with the `OK` status and then reaches the handler method which processes it. Such
* rejections can be tracked using the `observe(...)` method of this request.
*
* @param {!consumerCallback<spine.core.Event>} callback
* @return {this} self for method chaining
*/
onImmediateRejection(callback) {
this._onImmediateRejection = callback;
return this;
}
/**
* Adds the event type to the list of observed command handling results.
*
* @param {!Class<Message>} eventType a type of the observed events
* @param {!consumerCallback<EventSubscriptionCallbacks>} consumer
* a consumer of the `subscribe` and `unsubscribe` callbacks which are responsible for
* accepting the incoming events and cancelling the subscription respectively
* @return {this} self for method chaining
*/
observe(eventType, consumer) {
this._observedTypes.push({type: eventType, consumer: consumer});
return this;
}
/**
* Posts the command to the server and subscribes to all observed types.
*
* @return {Promise<void>} a promise that signals if the command posting was done successfully,
* may be ignored
*/
post() {
const command = this._requestFactory.command().create(this._commandMessage);
const onAck = {
onOk: this._onAck,
onError: this._onError,
onImmediateRejection: this._onImmediateRejection
};
const promises = [];
this._observedTypes.forEach(({type, consumer}) => {
const originFilter = Filters.eq("context.past_message", this._asOrigin(command));
const promise = this._client.subscribeToEvent(type)
.where(originFilter)
.post()
.then(({eventEmitted, unsubscribe}) => {
const subscribe = eventConsumer => {
eventEmitted.subscribe({
next: eventConsumer
});
};
consumer({subscribe, unsubscribe});
});
promises.push(promise);
});
const subscriptionPromise = Promise.all(promises);
return subscriptionPromise.then(() => this._client.post(command, onAck));
}
/**
* @param {!Command} command
* @return {Origin}
*
* @private
*/
_asOrigin(command) {
const result = new Origin();
const messageId = new MessageId();
const commandIdType = Type.forClass(CommandId);
const packedId = AnyPacker.pack(command.getId()).as(commandIdType);
messageId.setId(packedId);
const typeUrl = command.getMessage().getTypeUrl();
messageId.setTypeUrl(typeUrl);
result.setMessage(messageId);
const actorContext = command.getContext().getActorContext();
result.setActorContext(actorContext);
return result;
}
}