public final class CatchUpProcess<I> extends AbstractStatefulReactor<io.spine.server.delivery.CatchUpId,io.spine.server.delivery.CatchUp,io.spine.server.delivery.CatchUp.Builder>
Starts the catch-up process by emitting the CatchUpRequested
event.
Has its own Inbox
, so the messages arriving to it are dispatched by the
Delivery
.
While recalling the event history, the process relies on the work of Delivery
,
which follows the status of the process and delivers messages in TO_CATCH_UP
status
instead of live TO_DELIVER
messages to the catching-up targets.
The tricky part is so called "turbulence" period, i.e. the time when the catch-up comes
close to the current time in the event history. In this phase the task is to understand when to
stop reading the history and switch back to delivering the live events dispatched to the
catching-up projections. To do so, a special FINALIZING
status is introduced. Observed
by Delivery
it tells to perform a transition from dispatching TO_CATCH_UP
events
back to delivering those TO_DELIVER
. See more on that below.
In this version of the process implementation, the turbulence period always equals 500 ms
meaning that the catch-up starts its finalization when the event history is read
up until now - 500 ms
.
In its lifecycle, the process moves through the several statuses.
Not started
The process is created in to this status upon receiving CatchUpRequested
event.
The further actions include:
CatchUpStarted
event is emitted. The target projection repository listens to
this event and kills the state of the matching entities.
IN_PROGRESS
.
IN_PROGRESS
When the process is in this status, the event history is read and the matching events are sent
to the Inbox
es of the corresponding projections
The catch-up maintains this status until the history is read till the point in time,
which is very close to the Time.currentTime()
. When this time comes,
the events arriving to the EventStore
are very likely to be just-emitted in a live mode.
Therefore, the process needs to decide whether to decide that the history is fully read, or
to continue the traversal through the history.
At this stage the actions are as follows.
EventStore
respecting
the time range requested and the time of the last read operation performed by this process.
The maximum number of the events read is determined by
one of the Delivery settings.
HistoryEventsRecalled
is emitted, leaving the process in the IN_PROGRESS
status and triggering the next round similar to this one.
If the timestamps of the events read on this step are as close to the current time as
the turbulence period, the HistoryFullyRecalled
is emitted.
FINALIZING
The process moves to this status when the event history has been fully recalled and the
corresponding HistoryFullyRecalled
is received. At this stage the Delivery
stops
the propagation of the events to the catch-up messages, waiting for this process to populate
the inboxes with the messages arriving to be dispatched during the turbulence period.
Potentially, the inboxes will contain the duplicates produced by both the live users
and this process. To deal with it, a deduplication is performed by the Delivery
.
See CatchUpStation
for more details.
The actions are as follows.
EventStore
.
In this operation the read limits set by the Delivery
are NOT used, since
the goal is to read the remainder of the events.
CatchUpCompleted
is emitted and the process
is moved to the COMPLETED
status.
LiveEventsPickedUp
event is emitted.
This allows some time for the posted events to become visible to the Delivery
.
The reacting handler of the LiveEventsPickedUp
completes the process.
COMPLETED
Once the process moves to the COMPLETED
status, the corresponding Delivery
routines deduplicate, reorder and dispatch the "paused" events. Then the normal live delivery
flow is resumed.
To ensure that all the shards in which the "paused" historical events reside are processed,
this process additionally emits the "maintenance" ShardProcessingRequested
events for
each shard involved. By dispatching them, the system guarantees that the Delivery
observes the COMPLETED
status of this process and delivers the remainer of the messages.
BoundedContext
application.Modifier and Type | Class and Description |
---|---|
static interface |
CatchUpProcess.DispatchCatchingUp<I>
A method object dispatching the event to catch-up.
|
Modifier and Type | Method and Description |
---|---|
boolean |
canDispatch(EventEnvelope envelope)
Checks if this dispatcher can dispatch the given message.
|
protected java.util.Optional<io.spine.server.delivery.CatchUp> |
load(io.spine.server.delivery.CatchUpId id)
Loads the state from the storage by ID.
|
protected io.spine.server.delivery.CatchUp.Builder |
newStateBuilderWith(io.spine.server.delivery.CatchUpId id)
Creates a new instance of the respective state
ValidatingBuilder and sets the
passed identifier to it. |
protected com.google.common.collect.ImmutableSet<io.spine.server.delivery.CatchUpId> |
route(EventEnvelope event)
Selects the target to which the event should be dispatched.
|
protected void |
store(io.spine.server.delivery.CatchUp updatedState)
Stores the passed state in the storage.
|
builder, dispatch, flushState
domesticEventClasses, externalEventClasses, isRegistered, messageClasses, producedEvents, producerId, registerWith, version
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
nothing
dispatchesEvents, dispatchesExternalEvents, eventClasses
public boolean canDispatch(EventEnvelope envelope)
This method does not check that the type of the message is one of the dispatched message classes. Instead, it validates the message upon some custom rules of this dispatcher.
Extend this method to forbid messages from being dispatched to this instance.
Ensures that the passed signal is an instance of CatchUpSignal
and hence
is suitable for the upcoming ID extraction
at the routing stage.
envelope
- the message to checktrue
is the given message can be dispatched by this dispatcher,
false
otherwiseprotected com.google.common.collect.ImmutableSet<io.spine.server.delivery.CatchUpId> route(EventEnvelope event)
AbstractStatefulReactor
route
in class AbstractStatefulReactor<io.spine.server.delivery.CatchUpId,io.spine.server.delivery.CatchUp,io.spine.server.delivery.CatchUp.Builder>
event
- the event to dispatchprotected java.util.Optional<io.spine.server.delivery.CatchUp> load(io.spine.server.delivery.CatchUpId id)
AbstractStatefulReactor
Returns Optional.empty()
if there is no such object found.
load
in class AbstractStatefulReactor<io.spine.server.delivery.CatchUpId,io.spine.server.delivery.CatchUp,io.spine.server.delivery.CatchUp.Builder>
protected void store(io.spine.server.delivery.CatchUp updatedState)
AbstractStatefulReactor
store
in class AbstractStatefulReactor<io.spine.server.delivery.CatchUpId,io.spine.server.delivery.CatchUp,io.spine.server.delivery.CatchUp.Builder>
protected io.spine.server.delivery.CatchUp.Builder newStateBuilderWith(io.spine.server.delivery.CatchUpId id)
AbstractStatefulReactor
ValidatingBuilder
and sets the
passed identifier to it.newStateBuilderWith
in class AbstractStatefulReactor<io.spine.server.delivery.CatchUpId,io.spine.server.delivery.CatchUp,io.spine.server.delivery.CatchUp.Builder>