public final class CatchUpProcess<I> extends AbstractStatefulReactor<io.spine.server.delivery.CatchUpId,io.spine.server.delivery.CatchUp,io.spine.server.delivery.CatchUp.Builder>
Starts the catch-up process by emitting the
While recalling the event history, the process relies on the work of the Delivery,
which follows the status of the process and delivers the catch-up messages
instead of the live
TO_DELIVER messages to the catching-up targets.
The tricky part is the so-called "turbulence" period, i.e. the time when the catch-up comes
close to the current time in the event history. In this phase, the task is to understand when to
stop reading the history and switch back to delivering the live events dispatched to the
catching-up projections. To do so, a special
FINALIZING status is introduced. Observed by the
Delivery, it signals a transition from dispatching the catch-up messages
back to delivering those in
TO_DELIVER. See more on that below.
In this version of the process implementation, the turbulence period always equals 500 ms,
meaning that the catch-up starts its finalization once the event history has been read up to
now - 500 ms.
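The threshold described above can be sketched with java.time. This is a minimal illustration under stated assumptions, not the actual implementation: the TurbulenceWindow class and its method names are hypothetical, and only the 500 ms value comes from the text.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper illustrating the turbulence period; not part of the real API. */
final class TurbulenceWindow {

    /** The fixed turbulence period mentioned in the text. */
    static final Duration TURBULENCE_PERIOD = Duration.ofMillis(500);

    private TurbulenceWindow() {
    }

    /** Returns the point in time at which the turbulence period starts: now - 500 ms. */
    static Instant turbulenceStart(Instant now) {
        return now.minus(TURBULENCE_PERIOD);
    }

    /** Tells whether the history has been read far enough to start the finalization. */
    static boolean shouldFinalize(Instant lastReadEventTime, Instant now) {
        // True when the last read event is not older than the start of the turbulence period.
        return !lastReadEventTime.isBefore(turbulenceStart(now));
    }
}
```

An event read 200 ms ago would trigger the finalization in this sketch, while an event read 5 seconds ago would not.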
In its lifecycle, the process moves through several statuses.
The process is created in this status upon receiving
The further actions include:
CatchUpStarted event is emitted. The target projection repository listens to this event and kills the state of the matching entities.
When the process is in this status, the event history is read and the matching events are sent
to the Inboxes of the corresponding projections.
The catch-up maintains this status until the history is read up to a point in time
that is very close to
Time.currentTime(). When this time comes,
the events arriving at the
EventStore are very likely to have been just emitted in the live mode.
Therefore, the process needs to decide whether the history is fully read, or
whether to continue the traversal through the history.
At this stage the actions are as follows.
The event history is read from the EventStore, respecting the time range requested and the time of the last read operation performed by this process. The maximum number of the events read is determined by one of the Delivery settings.
HistoryEventsRecalled is emitted, leaving the process in the
IN_PROGRESS status and triggering the next round similar to this one.
If the timestamps of the events read on this step are within the turbulence period
of the current time,
HistoryFullyRecalled is emitted.
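The read rounds described above can be sketched as a simple loop over an in-memory history. This is only an illustration: the RecallRounds class, the list-based history, and the string event names are hypothetical stand-ins. Ending the traversal on an empty page is an assumption made so that the loop terminates, since the text does not cover that case.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the IN_PROGRESS read rounds; not the real implementation. */
final class RecallRounds {

    private RecallRounds() {
    }

    /**
     * Reads the history page by page and returns the names of the events emitted per round.
     *
     * @param history    event timestamps in ascending order
     * @param readLimit  the maximum number of events read per round
     * @param now        the current time
     * @param turbulence the turbulence period
     */
    static List<String> run(List<Instant> history, int readLimit,
                            Instant now, Duration turbulence) {
        if (readLimit <= 0) {
            throw new IllegalArgumentException("The read limit must be positive.");
        }
        Instant threshold = now.minus(turbulence);
        List<String> emitted = new ArrayList<>();
        int position = 0; // index of the first event not yet read
        while (true) {
            int end = Math.min(position + readLimit, history.size());
            List<Instant> page = history.subList(position, end);
            position = end;
            boolean closeToNow = page.stream()
                                     .anyMatch(time -> !time.isBefore(threshold));
            // Assumption: an empty page also ends the traversal.
            if (closeToNow || page.isEmpty()) {
                emitted.add("HistoryFullyRecalled");
                return emitted;
            }
            emitted.add("HistoryEventsRecalled");
        }
    }
}
```

With a read limit of 2 and five events, the last of which falls into the turbulence period, this sketch emits two HistoryEventsRecalled entries followed by one HistoryFullyRecalled.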
The process moves to this status when the event history has been fully recalled and
HistoryFullyRecalled is received. At this stage the Delivery pauses
the propagation of the events to the catching-up targets, waiting for this process to populate
the inboxes with the messages arriving to be dispatched during the turbulence period.
Potentially, the inboxes will contain duplicates produced by both the live users
and this process. To deal with them, a deduplication is performed by the Delivery. See
CatchUpStation for more details.
The actions are as follows.
The remaining events are read from the EventStore. In this operation, the read limits set by the
Delivery are NOT used, since the goal is to read the remainder of the events.
CatchUpCompleted is emitted and the process is moved to the COMPLETED status.
LiveEventsPickedUp event is emitted. This allows some time for the posted events to become visible to the
Delivery. The reacting handler of
LiveEventsPickedUp completes the process.
Once the process moves to the
COMPLETED status, the corresponding
routines deduplicate, reorder and dispatch the "paused" events. Then the normal live delivery
flow is resumed.
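The deduplicate-and-reorder step mentioned above can be sketched as follows. The PausedMessages class and its Message type are hypothetical simplifications. Treating two messages as duplicates when they carry the same event ID, and ordering them by emission time, are assumptions made for this example; the text does not state the real criteria.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of handling the "paused" messages; not the real implementation. */
final class PausedMessages {

    /** A simplified inbox message: the event ID and the time the event was emitted. */
    static final class Message {
        final String eventId;
        final Instant whenEmitted;

        Message(String eventId, Instant whenEmitted) {
            this.eventId = eventId;
            this.whenEmitted = whenEmitted;
        }
    }

    private PausedMessages() {
    }

    /** Drops the messages with a repeated event ID and sorts the rest by emission time. */
    static List<Message> deduplicateAndReorder(List<Message> paused) {
        Map<String, Message> unique = new LinkedHashMap<>();
        for (Message message : paused) {
            // Keeps the first copy seen for each event ID.
            unique.putIfAbsent(message.eventId, message);
        }
        List<Message> result = new ArrayList<>(unique.values());
        result.sort(Comparator.comparing((Message m) -> m.whenEmitted));
        return result;
    }
}
```

The resulting list would then be dispatched in order before the live delivery flow resumes.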
To ensure that all the shards in which the "paused" historical events reside are processed,
this process additionally emits the "maintenance"
ShardProcessingRequested events for
each shard involved. By dispatching them, the system guarantees that the Delivery observes the
COMPLETED status of this process and delivers the remainder of the messages.
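The lifecycle described above can be summarized as a small state machine. This is a sketch under assumptions, not the real implementation. NOT_STARTED is a hypothetical name for the initial status, which the text does not name. Treating the emission of CatchUpStarted as the trigger for moving to IN_PROGRESS is a simplification. Events are modeled as plain strings.

```java
/** Hypothetical model of the catch-up lifecycle; a sketch, not the real implementation. */
final class CatchUpLifecycle {

    /** NOT_STARTED is an assumed name for the initial status; the rest come from the text. */
    enum Status { NOT_STARTED, IN_PROGRESS, FINALIZING, COMPLETED }

    private CatchUpLifecycle() {
    }

    /** Returns the status after the given event, or throws if the event is unexpected. */
    static Status next(Status current, String event) {
        switch (current) {
            case NOT_STARTED:
                // Assumption: emitting CatchUpStarted moves the process to IN_PROGRESS.
                if ("CatchUpStarted".equals(event)) {
                    return Status.IN_PROGRESS;
                }
                break;
            case IN_PROGRESS:
                if ("HistoryEventsRecalled".equals(event)) {
                    return Status.IN_PROGRESS; // another read round follows
                }
                if ("HistoryFullyRecalled".equals(event)) {
                    return Status.FINALIZING;
                }
                break;
            case FINALIZING:
                if ("CatchUpCompleted".equals(event)
                        || "LiveEventsPickedUp".equals(event)) {
                    return Status.COMPLETED;
                }
                break;
            default:
                break; // COMPLETED is terminal in this sketch
        }
        throw new IllegalStateException("Unexpected " + event + " in status " + current);
    }
}
```

Feeding CatchUpStarted, HistoryEventsRecalled, HistoryFullyRecalled and LiveEventsPickedUp in sequence walks this model from NOT_STARTED to COMPLETED.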
|Modifier and Type||Class and Description|
A method object dispatching the event to catch-up.
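|Modifier and Type||Method and Description|
|boolean||canDispatch(EventEnvelope envelope) Checks if this dispatcher can dispatch the given message.|
|protected Optional<CatchUp>||load(CatchUpId id) Loads the state from the storage by ID.|
|protected CatchUp.Builder||newStateBuilderWith(CatchUpId id) Creates a new instance of the respective state ValidatingBuilder and sets the passed identifier to it.|
|protected ImmutableSet<CatchUpId>||route(EventEnvelope event) Selects the target to which the event should be dispatched.|
|protected void||store(CatchUp updatedState) Stores the passed state in the storage.|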
public boolean canDispatch(EventEnvelope envelope)
This method does not check that the type of the message is one of the dispatched message classes. Instead, it validates the message upon some custom rules of this dispatcher.
Extend this method to forbid messages from being dispatched to this instance.
envelope - the message to check
true if the given message can be dispatched by this dispatcher, false otherwise
protected com.google.common.collect.ImmutableSet<io.spine.server.delivery.CatchUpId> route(EventEnvelope event)
protected java.util.Optional<io.spine.server.delivery.CatchUp> load(io.spine.server.delivery.CatchUpId id)
Loads the state from the storage by ID. Returns Optional.empty() if no such object is found.
protected void store(io.spine.server.delivery.CatchUp updatedState)
protected io.spine.server.delivery.CatchUp.Builder newStateBuilderWith(io.spine.server.delivery.CatchUpId id)
Creates a new instance of the respective state ValidatingBuilder and sets the passed identifier to it.