@SPI public abstract class AbstractWorkRegistry extends java.lang.Object implements ShardedWorkRegistry
ShardedWorkRegistries
based on a specific
persistence mechanism.Constructor and Description |
---|
AbstractWorkRegistry() |
Modifier and Type | Method and Description |
---|---|
protected abstract java.util.Iterator<io.spine.server.delivery.ShardSessionRecord> |
allRecords()
Obtains all the session records associated with this registry.
|
protected abstract ShardProcessingSession |
asSession(io.spine.server.delivery.ShardSessionRecord record)
Restores a
ShardProcessingSession from the given session record. |
protected void |
clearNode(io.spine.server.delivery.ShardSessionRecord session)
Clears the value of
ShardSessionRecord.worker and stores the session. |
protected abstract io.spine.server.delivery.WorkerId |
currentWorkerFor(io.spine.server.NodeId node)
Returns an identifier of the current worker that is now going to process the shard.
|
protected abstract java.util.Optional<io.spine.server.delivery.ShardSessionRecord> |
find(io.spine.server.delivery.ShardIndex index)
Looks for the session record by the given shard index.
|
java.util.Optional<ShardProcessingSession> |
pickUp(io.spine.server.delivery.ShardIndex index,
io.spine.server.NodeId node)
Picks up the shard at a given index to process.
|
java.lang.Iterable<io.spine.server.delivery.ShardIndex> |
releaseExpiredSessions(com.google.protobuf.Duration inactivityPeriod)
Clears up the recorded
NodeId s from the session records if there was no activity
for longer than passed inactivityPeriod . |
protected abstract void |
write(io.spine.server.delivery.ShardSessionRecord session)
Stores the given session.
|
public java.util.Optional<ShardProcessingSession> pickUp(io.spine.server.delivery.ShardIndex index, io.spine.server.NodeId node)
ShardedWorkRegistry
This action is intended to be exclusive, i.e. a single shard may be served by a single application node at a given moment of time.
In case of a successful operation, an instance of ShardProcessingSession
is returned. The node obtained the session should perform the desired actions with the
sharded messages and then complete
the session.
In case the shard at a given index is already picked up by some node,
an Optional.empty()
is returned.
pickUp
in interface ShardedWorkRegistry
index
- the index of the shard to pick up for processingnode
- the identifier of the node for which to pick the shardOptional.empty()
if the shard is not availableprotected abstract io.spine.server.delivery.WorkerId currentWorkerFor(io.spine.server.NodeId node)
An example of such an identifier could be ID of the thread which performs processing.
node
- the node to which the resulted worker belongspublic java.lang.Iterable<io.spine.server.delivery.ShardIndex> releaseExpiredSessions(com.google.protobuf.Duration inactivityPeriod)
ShardedWorkRegistry
NodeId
s from the session records if there was no activity
for longer than passed inactivityPeriod
.
It may be handy if an application node hangs or gets killed — so that it is not able to complete the session in a conventional way.
releaseExpiredSessions
in interface ShardedWorkRegistry
inactivityPeriod
- the duration of the period after which the session is considered expiredprotected void clearNode(io.spine.server.delivery.ShardSessionRecord session)
ShardSessionRecord.worker
and stores the session.protected abstract java.util.Iterator<io.spine.server.delivery.ShardSessionRecord> allRecords()
protected abstract void write(io.spine.server.delivery.ShardSessionRecord session)
The session may or may be not present in the registry already. After calling this method,
the given session must be reachable via AbstractWorkRegistry.find(ShardIndex)
and AbstractWorkRegistry.allRecords()
.
protected abstract java.util.Optional<io.spine.server.delivery.ShardSessionRecord> find(io.spine.server.delivery.ShardIndex index)
index
- shard index to find a session forOptional.empty()
if the record is not present in
the registryprotected abstract ShardProcessingSession asSession(io.spine.server.delivery.ShardSessionRecord record)
ShardProcessingSession
from the given session record.