CatchUpProcess

public final class CatchUpProcess<I> extends AbstractStatefulReactor<I, S, B>

A process that performs a projection catch-up.

Starts the catch-up process by emitting the CatchUpRequested event.

Has its own Inbox, so the messages arriving to it are dispatched by the Delivery.

While recalling the event history, the process relies on the work of Delivery, which follows the status of the process and delivers messages in TO_CATCH_UP status instead of live TO_DELIVER messages to the catching-up targets.

The tricky part is so called "turbulence" period, i.e. the time when the catch-up comes close to the current time in the event history. In this phase the task is to understand when to stop reading the history and switch back to delivering the live events dispatched to the catching-up projections. To do so, a special FINALIZING status is introduced. Observed by Delivery it tells to perform a transition from dispatching TO_CATCH_UP events back to delivering those TO_DELIVER. See more on that below.

In this version of the process implementation, the turbulence period always equals 500 ms meaning that the catch-up starts its finalization when the event history is read up until now - 500 ms.

In its lifecycle, the process moves through the several statuses.

Not started

The process is created in to this status upon receiving CatchUpRequested event. The further actions include:

  • A CatchUpStarted event is emitted. The target projection repository listens to this event and kills the state of the matching entities.
  • The status if the catch-up process is set to IN_PROGRESS.

IN_PROGRESS

When the process is in this status, the event history is read and the matching events are sent to the Inboxes of the corresponding projections

The catch-up maintains this status until the history is read till the point in time, which is very close to the Time.currentTime(). When this time comes, the events arriving to the EventStore are very likely to be just-emitted in a live mode. Therefore, the process needs to decide whether to decide that the history is fully read, or to continue the traversal through the history.

At this stage the actions are as follows.

  • The historical event messages are read from the EventStore respecting the time range requested and the time of the last read operation performed by this process. The maximum number of the events read is determined by one of the Delivery settings.
  • Depending on the targets requested for the catch-up, the events are posted to the corresponding entities.
  • Unless the timestamps of the events are getting close to the current time, an HistoryEventsRecalled is emitted, leaving the process in the IN_PROGRESS status and triggering the next round similar to this one.

    If the timestamps of the events read on this step are as close to the current time as the turbulence period, the HistoryFullyRecalled is emitted.

FINALIZING

The process moves to this status when the event history has been fully recalled and the corresponding HistoryFullyRecalled is received. At this stage the Delivery stops the propagation of the events to the catch-up messages, waiting for this process to populate the inboxes with the messages arriving to be dispatched during the turbulence period. Potentially, the inboxes will contain the duplicates produced by both the live users and this process. To deal with it, a deduplication is performed by the Delivery. See CatchUpStation for more details.

The actions are as follows.

  • All the remaining messages of the matching event types are read EventStore. In this operation the read limits set by the Delivery are NOT used, since the goal is to read the remainder of the events.
  • If there were no events read, the CatchUpCompleted is emitted and the process is moved to the COMPLETED status.
  • If there were some events read, the LiveEventsPickedUp event is emitted. This allows some time for the posted events to become visible to the Delivery. The reacting handler of the LiveEventsPickedUp completes the process.

COMPLETED

Once the process moves to the COMPLETED status, the corresponding Delivery routines deduplicate, reorder and dispatch the "paused" events. Then the normal live delivery flow is resumed.

To ensure that all the shards in which the "paused" historical events reside are processed, this process additionally emits the "maintenance" ShardProcessingRequested events for each shard involved. By dispatching them, the system guarantees that the Delivery observes the COMPLETED status of this process and delivers the remainer of the messages.

Types

Link copied to clipboard
A method object dispatching the event to catch-up.

Functions

Link copied to clipboard
public boolean canDispatch(EventEnvelope envelope)
Checks if this dispatcher can dispatch the given message.
Link copied to clipboard
protected Optional<CatchUp> load(CatchUpId id)
Loads the state from the storage by ID.
Link copied to clipboard
public static CatchUpProcessBuilder<I> newBuilder<I>(ProjectionRepository<I, ? extends Object, ? extends Object> repo)
Link copied to clipboard
protected Builder newStateBuilderWith(CatchUpId id)
Creates a new instance of the respective state ValidatingBuilder and sets the passed identifier to it.
Link copied to clipboard
protected void registerIn(Stand stand)
Does NOT register this process in Stand, as the emitted events should not be available for subscribing.
Link copied to clipboard
public void registerWith(BoundedContext context)
Registers this instance as a part of the given Bounded Context.
Link copied to clipboard
protected ImmutableSet<CatchUpId> route(EventEnvelope event)
Selects the target to which the event should be dispatched.
Link copied to clipboard
public CatchUpId startCatchUp(Timestamp since, Set<I> ids)
Starts the catch-up for the projection instances selecting them by their identifiers.
Link copied to clipboard
protected void store(CatchUp updatedState)
Stores the passed state in the storage.

Inherited functions

Link copied to clipboard
protected B builder()
Returns the current state as a ValidatingBuilder for the respective message.
Link copied to clipboard
public void checkNotRegistered()
Verifies that this instance is NOT registered yet.
Link copied to clipboard
public void checkRegistered()
Verifies that this instance is already registered.
Link copied to clipboard
public abstract DispatchOutcome dispatch(E envelope)
public DispatchOutcome dispatch(EventEnvelope event)
Dispatches the message contained in the passed envelope and returns the outcome.
Link copied to clipboard
public boolean dispatchesEvents()
Verifies if this instance dispatches at least one event.
Link copied to clipboard
public boolean dispatchesExternalEvents()
Verifies if this instance dispatches at least one external event.
Link copied to clipboard
public ImmutableSet<EventClass> domesticEventClasses()
Obtains classes of domestic events processed by this dispatcher.
Link copied to clipboard
public ImmutableSet<EventClass> eventClasses()
Obtains classes of all events processed by this dispatcher.
Link copied to clipboard
public ImmutableSet<EventClass> externalEventClasses()
Obtains classes of external events processed by this dispatcher.
Link copied to clipboard
protected final void flushState()
Immediately writes the changes made to the current builder to the storage.
Link copied to clipboard
public boolean isRegistered()
Determines if this instance is already registered with a Bounded Context.
Link copied to clipboard
public abstract ImmutableSet<C> messageClasses()
public ImmutableSet<EventClass> messageClasses()
Obtains a set of message classes that can be processed by this dispatcher.
Link copied to clipboard
public Nothing nothing()
Obtains the io.spine.server.model.
Link copied to clipboard
public ImmutableSet<EventClass> producedEvents()
Obtains classes of the events produced by this object.
Link copied to clipboard
public Any producerId()
Obtains the name of this reactor, packed to Any.
Link copied to clipboard
public Version version()
Returns a zero version.