Delivery
Delivers the messages to the entities.
Splits the incoming messages into shards and delivers the messages to their destinations on a per-shard basis. Guarantees that one and only one application server node serves the messages from a given shard at a time, thus preventing any concurrent modifications of entity state.
Delegates the message dispatching and the low-level handling of message duplicates to the Inboxes of each target entity. The respective Inbox instances should be created in each of the Entity repositories.
Configuration
Delivery Strategy
By default, a shard is assigned according to the identifier of the target entity. The messages heading to a single entity always reside in a single shard. However, framework users may customize this behavior.
The typical customization is to specify the same shard index for related targets. E.g. if there are an OrderAggregate, an OrderItemAggregate, and an OrderItemProjection, they could share the same shard index. In this way the messages headed to these entities are dispatched and processed together. In turn, that reduces the eventual consistency lag between the C side (i.e. aggregate state updates) and the Q side (i.e. the respective updates in projections).
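The idea of routing related targets to one shard can be sketched in plain Java. This is an illustrative model only, not the framework's actual strategy API; the "orderId/itemId" identifier format is an assumption made for the example.

```java
import java.util.Objects;

public class ShardRouting {

    /** Derives a shard index from an identifier, as the default strategy does conceptually. */
    public static int shardOf(Object id, int shardCount) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(Objects.hashCode(id), shardCount);
    }

    /**
     * A custom rule: an order item is routed by the parent order ID, so the order
     * aggregate, the item aggregate, and the item projection land in the same shard.
     * The "orderId/itemId" format is a hypothetical convention for this sketch.
     */
    public static int shardOfOrderItem(String orderItemId, int shardCount) {
        String orderId = orderItemId.split("/")[0];
        return shardOf(orderId, shardCount);
    }

    public static void main(String[] args) {
        int shards = 16;
        // The order and its item resolve to the same shard index.
        System.out.println(shardOf("order-7", shards) == shardOfOrderItem("order-7/item-3", shards));
    }
}
```

With such a rule, all state updates for one order flow through a single shard, so the projection never races with the aggregate it observes.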
Deduplication
As long as the underlying storage and transport mechanisms are restricted by the CAP theorem, there may be duplicates among the messages written, read, or dispatched. The Delivery responds to this by storing some of the already-delivered messages for longer and using them as a source for deduplication.
It provides time-based deduplication capabilities to eliminate the messages which may have already been delivered to their targets. The duplicates are detected among the messages which are not older than now - [deduplication window].
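A minimal sketch of such time-based deduplication, assuming a simple in-memory map of delivered message IDs (the real Delivery keeps this data in the InboxStorage; the class and method names below are illustrative):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: remember delivered message IDs for the duration
    of the window and reject repeats arriving within it. */
public class DeduplicationWindow {

    private final Duration window;
    private final Map<String, Instant> delivered = new HashMap<>();

    public DeduplicationWindow(Duration window) {
        this.window = window;
    }

    /** Returns true if the message should be delivered; false if it is a duplicate. */
    public boolean accept(String messageId, Instant now) {
        // Forget records older than `now - window`; they can no longer cause dedup hits.
        delivered.values().removeIf(t -> t.isBefore(now.minus(window)));
        return delivered.putIfAbsent(messageId, now) == null;
    }
}
```

Note the trade-off the window expresses: a duplicate arriving after the window has passed is no longer detectable, which is why the window size should exceed the realistic re-delivery delay of the transport.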
Customizing InboxStorage
Delivery is responsible for providing the InboxStorage for every registered inbox. Framework users may configure this storage, taking into account that it is typically multi-tenant. By default, the InboxStorage for the delivery is provided by the environment-specific storage factory and is multi-tenant.
Catch-up
In addition to delivering the messages sent in real time, Delivery dispatches the historical events sent to the catching-up projections. These events are dispatched through the same shards as the live messages. A special CatchUpStation is responsible for handling this use case. See more on that in the respective section.
To control how many historical events are read and put into shards, the end-users may configure the maximum number of messages read from the history at a time. This is helpful to balance the per-shard throughput, so that the live messages are still dispatched through the same shards in a reasonable time.
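The effect of capping the historical page size can be sketched as follows. The queue-based model and the nextRound name are assumptions made for illustration, not the framework's API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch: cap how many historical events enter a shard per round,
    so that live messages keep flowing through the same shard without starvation. */
public class CatchUpPaging {

    /** Drains at most {@code pageSize} items from {@code history}, then all pending live items. */
    public static List<String> nextRound(Queue<String> history, Queue<String> live, int pageSize) {
        List<String> round = new ArrayList<>();
        for (int i = 0; i < pageSize && !history.isEmpty(); i++) {
            round.add(history.poll());
        }
        while (!live.isEmpty()) {
            round.add(live.poll());
        }
        return round;
    }
}
```

A smaller page size lowers the latency of live messages during a catch-up at the cost of a longer catch-up; a larger one does the opposite.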
The statuses of the ongoing catch-up processes are stored in a dedicated CatchUpStorage. The DeliveryBuilder exposes an API for customizing this storage.
Observers
Once a message is written to the Inbox, the pre-configured shard observers are notified. In this way, any third-party environment planners, load balancers, and schedulers may plug into the delivery and perform various routines to enable further processing of the sharded messages. In a distributed environment, a message queue may be used to notify the node cluster that a shard has some messages pending delivery.
Work registry
Once an application node picks a shard to deliver the messages from, it registers itself in a ShardedWorkRegistry. The registry serves as a list of per-shard locks, allowing each shard to be picked by only a single node at a time. Framework users may configure the implementation of the registry by calling setWorkRegistry.
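The contract of such a registry can be sketched with a plain concurrent map. The pickUp/release names mirror the idea described above, though the actual ShardedWorkRegistry interface may differ:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative sketch of a sharded work registry:
    at most one node holds a given shard at any time. */
public class WorkRegistrySketch {

    private final ConcurrentMap<Integer, String> sessions = new ConcurrentHashMap<>();

    /** Tries to pick the shard for the node; empty if another node already works on it. */
    public Optional<String> pickUp(int shardIndex, String nodeId) {
        // putIfAbsent is atomic, so two nodes cannot both "win" the same shard.
        String current = sessions.putIfAbsent(shardIndex, nodeId);
        return (current == null || current.equals(nodeId))
               ? Optional.of(nodeId)
               : Optional.empty();
    }

    /** Releases the shard once the node finishes delivering its messages. */
    public void release(int shardIndex, String nodeId) {
        // The two-argument remove only releases if this node still owns the shard.
        sessions.remove(shardIndex, nodeId);
    }
}
```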
Dispatching messages
Delivery stages
The delivery process for each shard index is split into DeliveryStages. In the scope of each stage, a certain number of messages is read from the respective shard of the Inbox. The messages are grouped per target and delivered in batches when possible. The maximum number of messages within a DeliveryStage can be configured.
After each DeliveryStage, it is possible to stop the delivery by supplying a custom delivery monitor. Please refer to the documentation for details.
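A stage's read-and-group step can be sketched as follows. This is an illustrative model; InboxMessage here is a simplified stand-in for the framework's type:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of one delivery stage: take a page of messages from a shard
    and group them per target, so each target receives its messages as a batch. */
public class StageSketch {

    /** A simplified stand-in for the framework's inbox message type. */
    record InboxMessage(String targetId, String payload) { }

    public static Map<String, List<InboxMessage>> groupPage(List<InboxMessage> shard, int pageSize) {
        Map<String, List<InboxMessage>> byTarget = new LinkedHashMap<>();
        shard.stream()
             .limit(pageSize)  // a stage reads at most `pageSize` messages
             .forEach(m -> byTarget
                     .computeIfAbsent(m.targetId(), k -> new ArrayList<>())
                     .add(m));
        return byTarget;
    }
}
```

Grouping per target lets a stage load each entity once and apply its whole batch, instead of re-reading entity state per message.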
Conveyor and stations
In the scope of a DeliveryStage, the page of InboxMessages is placed onto the Conveyor, which is responsible for tracking the status of each message. The conveyor is run through a pipeline of stations, each modifying the state of the messages. At the end of the pipeline, the changes made to the messages are committed to the underlying InboxStorage in bulk. Such an approach minimizes the number of requests sent to the storage.
Each time a new DeliveryStage is started, a new instance of the Conveyor is created.
Below is the list of the conveyor stations in the pipeline.

1. Catch-up station. This station is responsible for dispatching the historical events in the TO_CATCH_UP status to the respective targets. Also, while the target entity is under a catch-up, all the live messages headed to it are ignored. Once the catch-up is completed, this station handles the transition period, in which the last batch of the historical events and live messages are dispatched together. See CatchUpStation for more details.
2. Live delivery station. This station is responsible for dispatching the messages sent in real time. It ignores the messages in the TO_CATCH_UP status. Another responsibility of this station is to set for how long the delivered messages should be kept, according to the deduplication window settings. See LiveDeliveryStation for more details.
3. Cleanup station. This station removes the messages which are already delivered and are no longer needed for deduplication. See CleanupStation for the description.

Deduplication
During the dispatching, the Conveyor keeps track of the delivered messages. The stations performing the actual message dispatching rely on this knowledge and deduplicate the messages prior to calling the target's endpoint.
Additionally, the Delivery provides a cache of recently delivered messages. Each instance of the Conveyor has access to it and uses it in the deduplication procedures.
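The pipeline-of-stations idea can be modeled as a sequence of transformations applied to a page of messages. The station behavior below is a placeholder, not the actual logic of CatchUpStation, LiveDeliveryStation, or CleanupStation:

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Illustrative sketch of the conveyor: a page of messages runs through a pipeline
    of stations, each transforming message state; changes commit at the end in bulk. */
public class ConveyorSketch {

    /** A simplified message carrying only an ID and a status. */
    record Msg(String id, String status) { }

    public static List<Msg> run(List<Msg> page, List<UnaryOperator<Msg>> stations) {
        List<Msg> current = page;
        for (UnaryOperator<Msg> station : stations) {
            current = current.stream().map(station).toList();
        }
        return current; // a real conveyor would now write all changes in one bulk request
    }
}
```

Keeping the status changes in memory until the end of the pipeline is what makes the single bulk commit to the storage possible.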
Local environment
By default, the delivery is configured to run locally. It uses a see-and-dispatch observer, which delivers the messages from the observed shard once a message is passed to its onMessage(InboxMessage) method. This process is synchronous.
To deal with multi-threaded access in the local mode, an InMemoryShardedWorkRegistry is used. It operates on top of synchronized in-memory data structures and prevents several threads from picking up the same shard.
Shard maintenance
To perform the maintenance procedures, the Delivery requires all the BoundedContexts to register themselves in it. Upon this registration, a special ShardMaintenanceProcess is registered as an event dispatcher in the passed BoundedContext. Such a registration is performed automatically when the context is built.
Functions
- Delivery for local and development environment.
- Builder of Delivery.
- Delivery message dispatchers in the given BoundedContext.