Delivery

public final class Delivery

Delivers the messages to the entities.

Splits the incoming messages into shards and delivers the messages to their destinations on a per-shard basis. Guarantees that one and only one application server node serves the messages from the given shard at a time, thus preventing any concurrent modifications of entity state.

Delegates the message dispatching and low-level handling of message duplicates to the Inbox of each target entity. The respective Inbox instances should be created in each of the Entity repositories.

Configuration

Delivery Strategy

By default, a shard is assigned according to the identifier of the target entity. The messages heading to a single entity will always reside in a single shard. However, the framework users may customize this behavior.

The typical customization would be to specify the same shard index for the related targets. E.g. if there is an OrderAggregate, OrderItemAggregate and OrderItemProjection, they could share the same shard index. In this way the messages headed to these entities will be dispatched and processed together. In turn, that will reduce the eventual consistency lag between C side (i.e. aggregate state updates) and Q side (i.e. the respective updates in projections).
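The customization described above can be illustrated with a minimal sketch. It is a model only: the Strategy interface, the BY_ID constant and the pinning helper are hypothetical names introduced for this example and are not the framework's delivery strategy API. It assumes a hash-based default and shows related entity types pinned to one shard index.

```java
import java.util.Objects;
import java.util.Set;

/** A hypothetical model of shard assignment; none of these types belong to the framework. */
final class ShardingSketch {

    /** Hypothetical strategy: maps a target (entity type and ID) to a shard index. */
    interface Strategy {
        int shardFor(String entityType, Object entityId, int totalShards);
    }

    /** Assumed default-like behavior: the shard depends on the entity ID only. */
    static final Strategy BY_ID =
            (type, id, total) -> Math.floorMod(Objects.hashCode(id), total);

    /** Pins every listed entity type to one shard; other types fall back to BY_ID. */
    static Strategy pinning(Set<String> relatedTypes, int pinnedShard) {
        return (type, id, total) -> relatedTypes.contains(type)
                ? Math.floorMod(pinnedShard, total)
                : BY_ID.shardFor(type, id, total);
    }

    public static void main(String[] args) {
        Strategy strategy = pinning(
                Set.of("OrderAggregate", "OrderItemAggregate", "OrderItemProjection"), 2);
        // The related targets share a shard index regardless of their IDs.
        System.out.println(strategy.shardFor("OrderAggregate", "order-1", 4));
        System.out.println(strategy.shardFor("OrderItemProjection", "item-7", 4));
    }
}
```

Pinning related types to one shard trades some parallelism for a shorter lag between the aggregate and projection updates, since a single node processes them together.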

Deduplication

Because the underlying storage and transport mechanisms are restricted by the CAP theorem, there may be duplicates among the messages written, read or dispatched. The Delivery responds to this by storing some of the already delivered messages for longer and using them as a source for deduplication.

The Delivery provides time-based deduplication to eliminate the messages which may have already been delivered to their targets. The duplicates are detected among the messages which are not older than now - [deduplication window].
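The window rule can be sketched as follows. This is a simplified in-memory model under stated assumptions, not the framework's implementation: the class name, the accept method and the use of string message IDs are all hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** A hypothetical model of time-based deduplication; not the framework's code. */
final class DeduplicationSketch {

    private final Duration window;
    /** IDs of delivered messages mapped to the time of their delivery. */
    private final Map<String, Instant> delivered = new HashMap<>();

    DeduplicationSketch(Duration window) {
        this.window = window;
    }

    /**
     * Returns true if the message should be dispatched, or false if the same ID
     * was already delivered no earlier than `now - window`.
     */
    boolean accept(String messageId, Instant now) {
        Instant threshold = now.minus(window);
        // Records older than the threshold no longer take part in deduplication.
        delivered.values().removeIf(when -> when.isBefore(threshold));
        if (delivered.containsKey(messageId)) {
            return false;
        }
        delivered.put(messageId, now);
        return true;
    }

    public static void main(String[] args) {
        DeduplicationSketch dedup = new DeduplicationSketch(Duration.ofSeconds(10));
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(dedup.accept("m1", start));                 // first delivery
        System.out.println(dedup.accept("m1", start.plusSeconds(5)));  // inside the window
        System.out.println(dedup.accept("m1", start.plusSeconds(11))); // outside the window
    }
}
```

A wider window catches later duplicates at the cost of keeping more delivered messages in storage.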

Customizing InboxStorage

Delivery is responsible for providing the InboxStorage for every inbox registered. Framework users may configure the storage, taking into account that it is typically multi-tenant. By default, the InboxStorage for the delivery is provided by the environment-specific storage factory and is multi-tenant.

Catch-up

In addition to delivering the messages sent in real time, Delivery dispatches the historical events sent to the catching-up projections. These events are dispatched through the same shards as the live messages. A special CatchUpStation is responsible for handling this use case. See more on that in the respective section.

To control how many historical events are read and put into shards, the end-users may configure the maximum number of messages read from the history at a time. This is helpful to balance the per-shard throughput, so that the live messages are still dispatched through the same shards in a reasonable time.

The statuses of the ongoing catch-up processes are stored in a dedicated CatchUpStorage. The DeliveryBuilder exposes an API for the customization of this storage.

Observers

Once a message is written to the Inbox, the pre-configured shard observers are notified. In this way any third-party environment planners, load balancers, and schedulers may plug into the delivery and perform various routines to enable the further processing of the sharded messages. In a distributed environment a message queue may be used to notify the node cluster of a shard that has some messages pending for the delivery.
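A minimal sketch of this notification flow is given below. The types are hypothetical stand-ins: the Observer interface here takes a shard index and a message ID for brevity, and an in-process BlockingQueue plays the role of the message queue mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** A hypothetical model of shard observers; not the framework's types. */
final class ShardObserverSketch {

    /** Simplified observer callback, assumed for this example. */
    interface Observer {
        void onMessage(int shardIndex, String messageId);
    }

    /** A toy inbox which notifies the observers after each write. */
    static final class Inbox {
        private final List<Observer> observers = new ArrayList<>();
        private final List<String> stored = new ArrayList<>();

        void subscribe(Observer observer) {
            observers.add(observer);
        }

        void write(int shardIndex, String messageId) {
            stored.add(messageId);                 // 1. The message is written.
            for (Observer observer : observers) {  // 2. The observers are notified.
                observer.onMessage(shardIndex, messageId);
            }
        }
    }

    /** An observer announcing the shard with pending messages via a queue. */
    static Observer queueNotifier(BlockingQueue<Integer> queue) {
        return (shardIndex, messageId) -> queue.offer(shardIndex);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> pendingShards = new LinkedBlockingQueue<>();
        Inbox inbox = new Inbox();
        inbox.subscribe(queueNotifier(pendingShards));
        inbox.write(3, "m1");
        // A worker node would poll the queue and start delivery from the shard.
        System.out.println(pendingShards.poll());
    }
}
```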

Work registry

Once an application node picks a shard to deliver the messages from, it registers itself in a ShardedWorkRegistry. The registry serves as a list of per-shard locks and allows only a single node to pick up a given shard at a time. The framework users may configure the implementation of the registry by calling setWorkRegistry.
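The lock-per-shard idea can be modeled with a concurrent map. The sketch below is an assumption-laden illustration and does not reproduce the ShardedWorkRegistry contract: the pickUp method, the string node IDs and the Runnable release handle are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** A hypothetical in-memory model of a per-shard work registry. */
final class WorkRegistrySketch {

    /** Shard index mapped to the ID of the node currently serving the shard. */
    private final Map<Integer, String> owners = new ConcurrentHashMap<>();

    /**
     * Tries to pick up the shard for the node.
     *
     * Returns a handle which releases the shard, or an empty Optional
     * if the shard is already picked up (even by the same node).
     */
    Optional<Runnable> pickUp(int shardIndex, String nodeId) {
        // putIfAbsent is atomic, so only one caller can claim a free shard.
        String previous = owners.putIfAbsent(shardIndex, nodeId);
        if (previous != null) {
            return Optional.empty();
        }
        // Remove the record only if it still belongs to this node.
        return Optional.of(() -> owners.remove(shardIndex, nodeId));
    }

    public static void main(String[] args) {
        WorkRegistrySketch registry = new WorkRegistrySketch();
        Optional<Runnable> first = registry.pickUp(0, "node-A");
        Optional<Runnable> second = registry.pickUp(0, "node-B");
        System.out.println(first.isPresent());   // node-A holds shard 0
        System.out.println(second.isPresent());  // node-B is refused
        first.ifPresent(Runnable::run);          // node-A completes the work
        System.out.println(registry.pickUp(0, "node-B").isPresent());
    }
}
```

In a distributed deployment the map would be replaced by a storage with atomic writes shared by all nodes; the in-memory version only protects against concurrent threads of a single process.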

Dispatching messages

Delivery stages

The delivery process for each shard index is split into DeliveryStages. Within each stage, a certain number of messages is read from the respective shard of the Inbox. The messages are grouped per target and delivered in batches if possible. The maximum number of messages within a DeliveryStage can be configured.

After each DeliveryStage it is possible to stop the delivery by supplying a custom delivery monitor. Please refer to the documentation for the details.
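The staged processing may be pictured with the sketch below. It is a model only: the Message class, the deliver method, and the IntPredicate standing in for the delivery monitor are hypothetical and chosen for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

/** A hypothetical model of delivery stages; not the framework's code. */
final class DeliveryStageSketch {

    /** A toy message: the ID of the target entity and a payload. */
    static final class Message {
        final String targetId;
        final String payload;

        Message(String targetId, String payload) {
            this.targetId = targetId;
            this.payload = payload;
        }
    }

    /**
     * Delivers the messages of one shard stage by stage.
     *
     * Each stage reads at most `pageSize` messages, groups them per target and
     * records one batch per target into `log`. After each stage the `monitor`
     * receives the number of messages delivered so far; returning false stops
     * the delivery. Returns the total number of the delivered messages.
     */
    static int deliver(List<Message> shard, int pageSize,
                       IntPredicate monitor, List<String> log) {
        int delivered = 0;
        while (delivered < shard.size()) {
            int end = Math.min(delivered + pageSize, shard.size());
            Map<String, List<String>> byTarget = new LinkedHashMap<>();
            for (Message message : shard.subList(delivered, end)) {
                byTarget.computeIfAbsent(message.targetId, id -> new ArrayList<>())
                        .add(message.payload);
            }
            byTarget.forEach((target, batch) -> log.add(target + "<-" + batch));
            delivered = end;
            if (!monitor.test(delivered)) {
                break;
            }
        }
        return delivered;
    }
}
```

For example, with a page size of 3 and five messages targeting entities a, b, a, c and a, the first stage produces one batch for a and one for b, and the second stage produces one batch for c and one for a.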

Conveyor and stations

Within a DeliveryStage, the page of InboxMessages is placed onto the Conveyor, which is responsible for tracking the status of each message. The conveyor is run through the pipeline of stations, each modifying the state of the messages. At the end of the pipeline, the changes made to the messages are committed to the underlying InboxStorage in bulk. Such an approach minimizes the number of requests sent to the storage.

Each time a new DeliveryStage is started, a new instance of the Conveyor is created.
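The conveyor approach can be sketched as below. All names are hypothetical: the Storage, Conveyor and Station types model the idea of tracking changes in memory and committing them in one request, and the TO_DELIVER and DELIVERED statuses are assumed for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** A hypothetical model of a conveyor with stations; not the framework's code. */
final class ConveyorSketch {

    /** Assumed message statuses, for illustration only. */
    enum Status { TO_DELIVER, DELIVERED }

    /** A toy storage which counts the bulk write requests it receives. */
    static final class Storage {
        final Map<String, Status> records = new LinkedHashMap<>();
        int writeRequests = 0;

        void writeAll(Map<String, Status> changes) {
            records.putAll(changes);
            writeRequests++;
        }
    }

    /** Holds one page of messages and remembers which of them were changed. */
    static final class Conveyor {
        private final Map<String, Status> messages;
        private final Map<String, Status> changes = new LinkedHashMap<>();

        Conveyor(Map<String, Status> page) {
            this.messages = new LinkedHashMap<>(page);
        }

        List<String> ids() {
            return new ArrayList<>(messages.keySet());
        }

        Status statusOf(String id) {
            return messages.get(id);
        }

        void mark(String id, Status status) {
            messages.put(id, status);
            changes.put(id, status);
        }

        /** Commits all accumulated changes in a single storage request. */
        void flushTo(Storage storage) {
            if (!changes.isEmpty()) {
                storage.writeAll(changes);
                changes.clear();
            }
        }
    }

    /** A station modifies the state of the messages on the conveyor. */
    interface Station {
        void process(Conveyor conveyor);
    }

    /** Runs one stage: a fresh conveyor, every station in order, then one commit. */
    static void runStage(Map<String, Status> page, List<Station> pipeline, Storage storage) {
        Conveyor conveyor = new Conveyor(page);
        for (Station station : pipeline) {
            station.process(conveyor);
        }
        conveyor.flushTo(storage);
    }
}
```

However many stations touch a message during the stage, the storage in this sketch receives at most one write request per stage.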

Below is the list of the conveyor stations in the pipeline.

1. Catch-up station

This station is responsible for dispatching the historical events in TO_CATCH_UP status to the respective targets. Also, while the target entity is catching up, all the live messages headed to it are ignored. Once the catch-up is completed, this station handles the transition period, in which the last batch of the historical events and the live messages are dispatched together. See CatchUpStation for more details.

2. Live delivery station

This station is responsible for dispatching the messages sent in real time. It ignores the messages in TO_CATCH_UP status. Another responsibility of this station is to set for how long the delivered messages should be kept according to the deduplication window settings. See LiveDeliveryStation for more details.

3. Cleanup station

This station removes the messages which are already delivered and are no longer needed for the deduplication. See CleanupStation for the description.

Deduplication

During the dispatching, Conveyor keeps track of the delivered messages. The stations performing the actual message dispatching rely on this knowledge and deduplicate the messages prior to calling the target's endpoint.

Additionally, the Delivery provides a cache of recently delivered messages. Each instance of the Conveyor has access to it and uses it in deduplication procedures.

Local environment

By default, the delivery is configured to run locally. It uses a see-and-dispatch observer, which delivers the messages from the observed shard once a message is passed to its onMessage(InboxMessage) method. This process is synchronous.

To deal with the multi-threaded access in the local mode, an InMemoryShardedWorkRegistry is used. It operates on top of synchronized in-memory data structures and prevents several threads from picking up the same shard.

Shard maintenance

To perform the maintenance procedures, the Delivery requires all the BoundedContexts to register themselves in it. Upon this registration, a special ShardMaintenanceProcess is registered as an event dispatcher in the passed BoundedContext. Such a registration is performed automatically when the context is built.

Functions

public Optional<DeliveryStats> deliverMessagesFrom(ShardIndex index)
Delivers the messages put into the shard with the passed index to their targets.
Returns a listener of the dispatching operations occurring in the MulticastBuses.
public static Delivery local()
Creates a new instance of Delivery suitable for local and development environment.
public static Delivery localAsync()
Creates a new instance of Delivery for local and development environment.
public static DeliveryBuilder newBuilder()
Creates a new instance of the DeliveryBuilder.
public CatchUpProcessBuilder<I> newCatchUpProcess<I>(ProjectionRepository<I, ? extends Object, ? extends Object> repo)
Creates a new instance of the builder for CatchUpProcess.
public Inbox.Builder<I> newInbox<I>(TypeUrl entityType)
Creates an instance of Inbox.Builder for the given entity type.
public void registerDispatchersIn(BoundedContext context)
Registers the internal Delivery message dispatchers in the given BoundedContext.
public void subscribe(ShardObserver observer)
Subscribes to the updates of shard contents.