public final class Delivery
extends java.lang.Object
implements io.spine.logging.Logging
Splits the incoming messages into shards and allows delivering the messages to their destinations on a per-shard basis. Guarantees that exactly one application server node serves the messages from a given shard at a time, thus preventing concurrent modifications of entity state.
Delegates the message dispatching and low-level handling of message duplicates to the Inboxes of each target entity. The respective Inbox instances should be created in each of the Entity repositories.
By default, a shard is assigned according to the identifier of the target entity. The messages heading to a single entity will always reside in a single shard. However, the framework users may customize this behavior.
The typical customization would be to specify the same shard index for the related targets.
E.g. if there are an OrderAggregate, an OrderItemAggregate and an OrderItemProjection, they could share the same shard index. In this way the messages headed to these entities will be dispatched and processed together. In turn, that will reduce the eventual consistency lag between the C side (i.e. aggregate state updates) and the Q side (i.e. the respective updates in projections).
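The two assignment policies described above can be sketched as follows. This is a minimal, self-contained model and not the Spine API: the names `ShardPicker`, `ByIdPicker` and `PinnedTypesPicker` are hypothetical, and entity types are represented by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical strategy: picks a shard index for a target entity. */
interface ShardPicker {
    int pick(String entityType, Object entityId);
}

/** Default-like behavior: the shard depends only on the entity identifier. */
final class ByIdPicker implements ShardPicker {
    private final int totalShards;

    ByIdPicker(int totalShards) {
        this.totalShards = totalShards;
    }

    @Override
    public int pick(String entityType, Object entityId) {
        // floorMod keeps the index in [0, totalShards) even for negative hashes.
        return Math.floorMod(entityId.hashCode(), totalShards);
    }
}

/** Customization: related entity types are pinned to one shared shard. */
final class PinnedTypesPicker implements ShardPicker {
    private final Map<String, Integer> pinned;
    private final ShardPicker fallback;

    PinnedTypesPicker(Map<String, Integer> pinned, ShardPicker fallback) {
        this.pinned = new HashMap<>(pinned);
        this.fallback = fallback;
    }

    @Override
    public int pick(String entityType, Object entityId) {
        Integer shared = pinned.get(entityType);
        // Types that are not pinned fall back to the per-identifier policy.
        return shared != null ? shared : fallback.pick(entityType, entityId);
    }
}
```

Pinning `OrderAggregate`, `OrderItemAggregate` and `OrderItemProjection` to the same index makes their messages land in one shard, so a single node processes them together.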
Since the underlying storage and transport mechanisms are restricted by the CAP theorem, there may be duplicates in the messages written, read or dispatched. The Delivery responds to it by storing some of the already delivered messages for longer and using them as a source for deduplication.
Provides the time-based deduplication capabilities to eliminate the messages which may have been already delivered to their targets. The duplicates will be detected among the messages which are not older than now - [deduplication window].
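As an illustration of the `now - [deduplication window]` rule, here is a minimal sketch of a time-based deduplicator. It is an assumption-laden model rather than the framework's implementation: `WindowedDeduplicator` and `tryAccept` are hypothetical names, and messages are identified by plain string IDs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical time-based deduplicator; not the Spine implementation. */
final class WindowedDeduplicator {
    private final Duration window;
    // Message ID -> the time at which the message was delivered.
    private final Map<String, Instant> delivered = new HashMap<>();

    WindowedDeduplicator(Duration window) {
        this.window = window;
    }

    /** Returns true if the message should be dispatched, false if it is a duplicate. */
    boolean tryAccept(String messageId, Instant now) {
        Instant threshold = now.minus(window);
        // Forget the deliveries that are older than `now - window`.
        delivered.values().removeIf(when -> when.isBefore(threshold));
        if (delivered.containsKey(messageId)) {
            return false;
        }
        delivered.put(messageId, now);
        return true;
    }
}
```

A duplicate arriving after the window has passed is not detected in this sketch, which mirrors the trade-off of any time-based scheme: a longer window catches more duplicates at the cost of keeping more delivered messages.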
InboxStorage
Delivery is responsible for providing the InboxStorage for every inbox registered. Framework users may configure the storage, taking into account that it is typically multi-tenant. By default, the InboxStorage for the delivery is provided by the environment-specific storage factory and is multi-tenant.
In addition to delivering the messages sent in real time, Delivery dispatches the historical events sent to the catching-up projections. These events are dispatched through the same shards as the live messages. A special CatchUpStation is responsible for handling this use case. See more on that in the respective section.
To control how many historical events are read and put into shards, the end-users may configure the maximum number of messages read from the history at a time. This is helpful to balance the per-shard throughput, so that the live messages are still dispatched through the same shards in a reasonable time.
The statuses of the ongoing catch-up processes are stored in a dedicated CatchUpStorage. The DeliveryBuilder exposes an API for the customization of this storage.
Once a message is written to the Inbox, the pre-configured shard observers are notified. In this way any third-party environment planners, load balancers, and schedulers may plug into the delivery and perform various routines to enable the further processing of the sharded messages. In a distributed environment a message queue may be used to notify the node cluster of a shard that has some messages pending delivery.
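The write-then-notify flow can be modelled in a few lines. The sketch below is hypothetical: `ShardListener` and `ObservableInbox` are illustrative stand-ins rather than the framework's `ShardObserver` and `Inbox`, and messages are plain string IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical observer, notified when a message lands in a shard. */
interface ShardListener {
    void onMessage(int shardIndex, String messageId);
}

/** Hypothetical inbox that stores a message first and notifies observers second. */
final class ObservableInbox {
    private final List<ShardListener> listeners = new CopyOnWriteArrayList<>();
    private final List<String> stored = new ArrayList<>();

    void subscribe(ShardListener listener) {
        listeners.add(listener);
    }

    void write(int shardIndex, String messageId) {
        // 1. Persist the message.
        stored.add(messageId);
        // 2. Tell every observer which shard has pending messages.
        for (ShardListener listener : listeners) {
            listener.onMessage(shardIndex, messageId);
        }
    }

    int size() {
        return stored.size();
    }
}
```

In a distributed setup an observer would typically publish the shard index to a message queue instead of delivering anything itself, leaving the actual delivery to whichever node picks the shard up.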
Once an application node picks the shard to deliver the messages from, it registers itself in a ShardedWorkRegistry. The registry serves as a list of per-shard locks that allows only a single node to pick up a given shard at a time. The framework users may configure the implementation of the registry by calling DeliveryBuilder.setWorkRegistry(ShardedWorkRegistry).
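To make the "list of locks-per-shard" idea concrete, here is a minimal in-memory sketch. It does not implement the real `ShardedWorkRegistry` interface; `ShardLocks` and `pickUp` are hypothetical names, and node identity is a plain string.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-shard lock table; not the ShardedWorkRegistry API. */
final class ShardLocks {
    // Shard index -> ID of the node that currently serves the shard.
    private final ConcurrentMap<Integer, String> owners = new ConcurrentHashMap<>();

    /**
     * Tries to pick up the shard for the node.
     *
     * @return a handle releasing the shard, or empty if the shard is already taken
     */
    Optional<Runnable> pickUp(int shardIndex, String nodeId) {
        // putIfAbsent is atomic, so only one caller can win the shard.
        String current = owners.putIfAbsent(shardIndex, nodeId);
        if (current != null) {
            return Optional.empty();
        }
        // remove(key, value) releases the shard only if this node still owns it.
        return Optional.of(() -> owners.remove(shardIndex, nodeId));
    }
}
```

A production registry shared between nodes would need the same atomic "insert if absent" guarantee from its storage, for example a conditional write or a transaction.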
The delivery process for each shard index is split into DeliveryStages. In scope of each stage, a certain number of messages is read from the respective shard of the Inbox. The messages are grouped per-target and delivered in batches if possible. The maximum number of the messages within a DeliveryStage can be configured.
After each DeliveryStage it is possible to stop the delivery by supplying a custom delivery monitor. Please refer to the documentation for the details.
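The staged process described above can be sketched as a loop that reads one page, groups it by target, delivers the batches, and then consults a monitor. Everything here is a hypothetical model: `Msg`, `StagedDelivery` and the `IntPredicate` monitor are illustrative and do not reproduce the framework's `DeliveryStage` or monitor types.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.IntPredicate;

/** Hypothetical inbox message: a target entity ID plus a payload. */
final class Msg {
    final String target;
    final String payload;

    Msg(String target, String payload) {
        this.target = target;
        this.payload = payload;
    }
}

final class StagedDelivery {
    /**
     * Delivers the shard contents stage by stage.
     *
     * @param shard          pending messages of one shard
     * @param pageSize       maximum number of messages per stage
     * @param endpoint       receives one batch of payloads per target
     * @param shouldContinue hypothetical monitor, given the size of the finished stage
     * @return the number of stages performed
     */
    static int run(Deque<Msg> shard, int pageSize,
                   BiConsumer<String, List<String>> endpoint,
                   IntPredicate shouldContinue) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int stages = 0;
        while (!shard.isEmpty()) {
            // Read one page and group it by target, preserving the arrival order.
            Map<String, List<String>> byTarget = new LinkedHashMap<>();
            int read = 0;
            while (read < pageSize && !shard.isEmpty()) {
                Msg message = shard.poll();
                byTarget.computeIfAbsent(message.target, k -> new ArrayList<>())
                        .add(message.payload);
                read++;
            }
            // Deliver one batch per target.
            byTarget.forEach(endpoint);
            stages++;
            // The monitor may stop the delivery after any stage.
            if (!shouldContinue.test(read)) {
                break;
            }
        }
        return stages;
    }
}
```

Grouping per target means an entity is loaded once per batch rather than once per message, which is the main benefit of delivering in batches.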
In the scope of a DeliveryStage, the page of InboxMessages is placed onto the Conveyor, which is responsible for tracking the status of each message. The conveyor is run through the pipeline of stations, each modifying the state of the messages. At the end of the pipeline, the changes made to the messages are committed to the underlying InboxStorage in bulk. Such an approach minimizes the number of requests sent to the storage.
Each time a new DeliveryStage starts, a new instance of the Conveyor is created.
Below is the list of the conveyor stations in the pipeline.
1. Catch-up station
This station is responsible for dispatching the historical events in TO_CATCH_UP status to the respective targets. Also, while the target entity is under a catch-up, all the live messages headed to it are ignored. Once the catch-up is completed, this station handles the transition period, in which the last batch of the historical events and live messages are dispatched together. See CatchUpStation for more details.
2. Live delivery station
This station is responsible for dispatching the messages sent in real time. It ignores the messages in TO_CATCH_UP status. Another responsibility of this station is to set for how long the delivered messages should be kept according to the deduplication window settings. See LiveDeliveryStation for more details.
3. Cleanup station
This station removes the messages which are already delivered and are no longer needed for the deduplication. See CleanupStation for the description.
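The pipeline of the three stations, followed by a single bulk commit, can be sketched as below. This is a deliberately simplified model and not the framework's `Conveyor`: the types `Tracked`, `StationSketch` and `ConveyorSketch` are hypothetical, only the `TO_CATCH_UP` status name comes from the text above, and as a simplification the historical events are not retained for deduplication.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Statuses of this sketch; only TO_CATCH_UP is named in the text above. */
enum MsgStatus { TO_CATCH_UP, TO_DELIVER, DELIVERED, REMOVED }

/** A message on the conveyor. */
final class Tracked {
    final String id;
    MsgStatus status;
    long keepUntil;   // Millis; 0 means "not needed for deduplication".
    boolean dirty;    // Changed since it was read from the storage.

    Tracked(String id, MsgStatus status) {
        this.id = id;
        this.status = status;
    }

    void moveTo(MsgStatus next) {
        status = next;
        dirty = true;
    }
}

/** Hypothetical station: modifies the state of the messages on the conveyor. */
interface StationSketch {
    void process(List<Tracked> page, long now);
}

final class ConveyorSketch {
    static final long DEDUP_WINDOW_MS = 1_000;

    // 1. Dispatches the historical events (the dispatching itself is omitted).
    static final StationSketch CATCH_UP = (page, now) -> {
        for (Tracked m : page) {
            if (m.status == MsgStatus.TO_CATCH_UP) {
                m.moveTo(MsgStatus.DELIVERED);
            }
        }
    };

    // 2. Dispatches the live messages and keeps them for the deduplication window.
    static final StationSketch LIVE = (page, now) -> {
        for (Tracked m : page) {
            if (m.status == MsgStatus.TO_DELIVER) {
                m.moveTo(MsgStatus.DELIVERED);
                m.keepUntil = now + DEDUP_WINDOW_MS;
            }
        }
    };

    // 3. Removes the delivered messages no longer needed for deduplication.
    static final StationSketch CLEANUP = (page, now) -> {
        for (Tracked m : page) {
            if (m.status == MsgStatus.DELIVERED && m.keepUntil <= now) {
                m.moveTo(MsgStatus.REMOVED);
            }
        }
    };

    /** Runs the page through all stations and returns the single bulk write. */
    static List<String> run(List<Tracked> page, long now) {
        for (StationSketch station : Arrays.asList(CATCH_UP, LIVE, CLEANUP)) {
            station.process(page, now);
        }
        List<String> bulkWrite = new ArrayList<>();
        for (Tracked m : page) {
            if (m.dirty) {
                bulkWrite.add(m.id + ":" + m.status);
            }
        }
        return bulkWrite;
    }
}
```

Because the stations only mutate in-memory state, the storage sees one write per stage regardless of how many stations touched a message.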
Deduplication
During the dispatching, the Conveyor keeps track of the delivered messages. The stations performing the actual message dispatching rely on this knowledge and deduplicate the messages prior to calling the target's endpoint.
Additionally, the Delivery provides a cache of recently delivered messages. Each instance of the Conveyor has access to it and uses it in deduplication procedures.
By default, the delivery is configured to run locally. It uses a see-and-dispatch observer, which delivers the messages from the observed shard once a message is passed to its onMessage(InboxMessage) method. This process is synchronous.
To deal with the multi-threaded access in a local mode, an InMemoryShardedWorkRegistry is used. It operates on top of the synchronized in-memory data structures and prevents several threads from picking up the same shard.
To perform the maintenance procedures, the Delivery requires all the BoundedContexts to register themselves in it. Upon this registration, a special ShardMaintenanceProcess is registered as an event dispatcher in a passed BoundedContext. Such a registration is performed automatically when the context is built.
Modifier and Type | Method and Description |
---|---|
java.util.Optional<DeliveryStats> | deliverMessagesFrom(io.spine.server.delivery.ShardIndex index) Delivers the messages put into the shard with the passed index to their targets. |
static Delivery | local() Creates a new instance of Delivery suitable for local and development environment. |
static Delivery | localAsync() Creates a new instance of Delivery for local and development environment. |
static DeliveryBuilder | newBuilder() Creates a new instance of the Builder of Delivery. |
<I> CatchUpProcessBuilder<I> | newCatchUpProcess(ProjectionRepository<I,?,?> repo) Creates a new instance of the builder for CatchUpProcess. |
<I> Inbox.Builder<I> | newInbox(io.spine.type.TypeUrl entityType) Creates an instance of Inbox.Builder for the given entity type. |
void | subscribe(ShardObserver observer) Subscribes to the updates of shard contents. |
public static DeliveryBuilder newBuilder()
Creates a new instance of the Builder of Delivery.
public static Delivery local()
Creates a new instance of Delivery suitable for local and development environment.
In this setup, the InboxMessages are delivered to their targets synchronously.
Uses a single-shard splitting.
To construct a Delivery instance, a StorageFactory is needed. If it was not configured in the ServerEnvironment, a new InMemoryStorageFactory is used.
public static Delivery localAsync()
Creates a new instance of Delivery for local and development environment.
The InboxMessages are delivered to their targets asynchronously.
The returned instance of Delivery is configured to use a single shard.
To construct a Delivery instance, a StorageFactory is needed. If it was not configured in the ServerEnvironment, a new InMemoryStorageFactory is used.
public java.util.Optional<DeliveryStats> deliverMessagesFrom(io.spine.server.delivery.ShardIndex index)
Delivers the messages put into the shard with the passed index to their targets.
At any given moment, exactly one application node may serve messages from a particular shard. Therefore, in scope of this delivery, an approach based on pessimistic locking per ShardIndex is applied.
In case the given shard is already being processed by some node, this method does nothing and returns Optional.empty().
The content of the shard is read and delivered on a page-by-page basis. The runtime exceptions occurring while a page is being delivered are accumulated and then the first exception is rethrown, if any.
After all the pages are read, the delivery process is launched again for the same shard. It is required in order to handle the messages that may have been put to the same shard as an outcome of the first-wave messages.
Once the shard has no more messages to deliver, the delivery process ends, releasing the lock for the respective ShardIndex.
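The control flow of this method can be approximated with the sketch below. It is a simplified model under several assumptions: `ShardDeliverySketch` and `deliverFrom` are hypothetical names, the result is a plain message count rather than `DeliveryStats`, the lock is a local in-memory set, and the first accumulated exception is rethrown right after the page in which it occurred, which is one possible reading of the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical model of the lock / deliver / repeat / release cycle. */
final class ShardDeliverySketch {
    // Indexes of the shards which are being served right now.
    private final Set<Integer> locked = ConcurrentHashMap.newKeySet();

    /**
     * @return the number of delivered messages,
     *         or empty if the shard is already being served
     */
    Optional<Integer> deliverFrom(int shardIndex, Queue<String> shard,
                                  int pageSize, Consumer<String> endpoint) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        // Pessimistic lock: give up immediately if the shard is taken.
        if (!locked.add(shardIndex)) {
            return Optional.empty();
        }
        try {
            int delivered = 0;
            // Repeat, since delivering may put new messages into the same shard.
            while (!shard.isEmpty()) {
                List<RuntimeException> errors = new ArrayList<>();
                for (int i = 0; i < pageSize && !shard.isEmpty(); i++) {
                    String message = shard.poll();
                    try {
                        endpoint.accept(message);
                        delivered++;
                    } catch (RuntimeException e) {
                        // Accumulate; the rest of the page is still delivered.
                        errors.add(e);
                    }
                }
                if (!errors.isEmpty()) {
                    throw errors.get(0);
                }
            }
            return Optional.of(delivered);
        } finally {
            // Release the lock both on success and on failure.
            locked.remove(shardIndex);
        }
    }
}
```

Releasing the lock in `finally` matters: without it, a failed delivery would leave the shard unserved until the lock expired or was cleared by hand.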
index - the shard index to deliver the messages from
Optional.empty() if no delivery was performed
public <I> Inbox.Builder<I> newInbox(io.spine.type.TypeUrl entityType)
Creates an instance of Inbox.Builder for the given entity type.
I - the type of entity identifiers
entityType - the type of the entity to which the inbox will belong
Inbox
public <I> CatchUpProcessBuilder<I> newCatchUpProcess(ProjectionRepository<I,?,?> repo)
Creates a new instance of the builder for CatchUpProcess.
I - the type of identifiers of entities managed by the projection repository
repo - projection repository for which the catch-up process will be created
CatchUpProcess
public void subscribe(ShardObserver observer)
Subscribes to the updates of shard contents.
The passed observer will be notified that the contents of a shard with a particular index were changed.
observer - an observer to notify of updates