Class AbstractMessageGroupStore
java.lang.Object
org.springframework.integration.store.AbstractBatchingMessageGroupStore
org.springframework.integration.store.AbstractMessageGroupStore
- All Implemented Interfaces:
Iterable<MessageGroup>, BasicMessageGroupStore, MessageGroupStore
- Direct Known Subclasses:
AbstractConfigurableMongoDbMessageStore, AbstractKeyValueMessageStore, JdbcMessageStore, MongoDbMessageStore, SimpleMessageStore
@ManagedResource
public abstract class AbstractMessageGroupStore
extends AbstractBatchingMessageGroupStore
implements MessageGroupStore, Iterable<MessageGroup>
- Since:
- 2.0
- Author:
- Dave Syer, Oleg Zhurakousky, Gary Russell, Artem Bilan, Christian Tzolov, Youbin Wu
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.store.MessageGroupStore
MessageGroupStore.MessageGroupCallback
-
Field Summary
-
Constructor Summary
Modifier, Constructor
protected AbstractMessageGroupStore()
protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups)
-
Method Summary
Modifier and Type, Method, Description
void addMessagesToGroup(Object groupId, Message<?>... messages)
    Store messages with an association to a group id.
MessageGroup addMessageToGroup(Object groupId, Message<?> message)
    Store a message with an association to a group id.
void completeGroup(Object groupId)
    Completes this MessageGroup.
protected MessageGroup copy(MessageGroup group)
    Used by expireMessageGroups.
protected abstract void doAddMessagesToGroup(Object groupId, Message<?>... messages)
protected abstract void doCompleteGroup(Object groupId)
protected abstract Message<?> doPollMessageFromGroup(Object groupId)
protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId)
protected abstract void doRemoveMessageGroup(Object groupId)
protected abstract void doRemoveMessagesFromGroup(Object key, Collection<Message<?>> messages)
protected abstract void doSetGroupCondition(Object groupId, String condition)
protected abstract void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber)
protected <T, E extends RuntimeException> T executeLocked(Object groupId, CheckedCallable<T, E> runnable)
protected <E extends RuntimeException> void executeLocked(Object groupId, CheckedRunnable<E> runnable)
int expireMessageGroups(long timeout)
    Extract all expired groups (whose timestamp is older than the current time less the threshold provided) and call each of the registered callbacks on them in turn.
MessageGroupMetadata getGroupMetadata(Object groupId)
    Obtain the group metadata without fetching any messages; must supply all other group properties; may include the id of the first message.
protected LockRegistry getLockRegistry()
int getMessageCountForAllMessageGroups()
    Optional attribute giving the number of messages in the store over all groups.
int getMessageGroupCount()
    Optional attribute giving the number of message groups.
protected MessageGroupFactory getMessageGroupFactory()
boolean isTimeoutOnIdle()
Message<?> pollMessageFromGroup(Object groupId)
    Poll a Message from this MessageGroup (in FIFO style if supported by the implementation) while also removing the polled Message.
void registerMessageGroupExpiryCallback(MessageGroupStore.MessageGroupCallback callback)
    Register a callback for when a message group is expired through MessageGroupStore.expireMessageGroups(long).
boolean removeMessageFromGroupById(Object groupId, UUID messageId)
    Delete the message from the group.
void removeMessageGroup(Object groupId)
    Remove the message group with this id.
void removeMessagesFromGroup(Object key, Collection<Message<?>> messages)
    Persist the deletion of messages from the group.
void removeMessagesFromGroup(Object key, Message<?>... messages)
    Persist the deletion of messages from the group.
void setExpiryCallbacks(Collection<MessageGroupStore.MessageGroupCallback> expiryCallbacks)
    Convenient injection point for expiry callbacks in the message store.
void setGroupCondition(Object groupId, String condition)
    Add a condition sentence into the group.
void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber)
    Allows you to set the sequence number of the last released Message.
void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups)
    Specify whether the result of BasicMessageGroupStore.getMessageGroup(Object) should be wrapped in a PersistentMessageGroup, a lazy-load proxy for the messages in the group. Defaults to true.
final void setLockRegistry(LockRegistry lockRegistry)
    Specify the type of the LockRegistry to ensure atomic operations.
void setTimeoutOnIdle(boolean timeoutOnIdle)
    Allows you to override the rule for the timeout calculation.
Methods inherited from class org.springframework.integration.store.AbstractBatchingMessageGroupStore
getRemoveBatchSize, setMessageGroupFactory, setRemoveBatchSize
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.integration.store.BasicMessageGroupStore
getMessageGroup, messageGroupSize
Methods inherited from interface java.lang.Iterable
forEach, iterator, spliterator
Methods inherited from interface org.springframework.integration.store.MessageGroupStore
getMessageFromGroup, getMessagesForGroup, getOneMessageFromGroup, iterator, streamMessagesForGroup
-
Field Details
-
INTERRUPTED_WHILE_OBTAINING_LOCK
-
GROUP_ID_MUST_NOT_BE_NULL
-
logger
-
-
Constructor Details
-
AbstractMessageGroupStore
protected AbstractMessageGroupStore()
-
AbstractMessageGroupStore
protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups)
-
-
Method Details
-
getMessageGroupFactory
- Overrides:
getMessageGroupFactory in class AbstractBatchingMessageGroupStore
-
setExpiryCallbacks
Convenient injection point for expiry callbacks in the message store. Each of the callbacks provided will simply be registered with the store using registerMessageGroupExpiryCallback(MessageGroupCallback).
- Parameters:
expiryCallbacks - the expiry callbacks to add
-
isTimeoutOnIdle
public boolean isTimeoutOnIdle()
-
setTimeoutOnIdle
public void setTimeoutOnIdle(boolean timeoutOnIdle)
Allows you to override the rule for the timeout calculation. Typically, the timeout is measured from the time the MessageGroup was created. If you want the timeout to be measured from the time the MessageGroup became idle (that is, from its last update), invoke this method with 'true'. Default is 'false'.
- Parameters:
timeoutOnIdle - true to measure the timeout from the last update instead of from the creation time.
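The timeout rule described above can be illustrated with a minimal sketch. It does not use Spring Integration classes: GroupTimestamps and ExpirySketch are hypothetical stand-ins, and the strict "older than" comparison is an assumption about the boundary case, not this class's actual code.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical holder for the two timestamps a store tracks per group.
final class GroupTimestamps {
    final Instant created;
    final Instant lastModified;

    GroupTimestamps(Instant created, Instant lastModified) {
        this.created = created;
        this.lastModified = lastModified;
    }
}

final class ExpirySketch {

    // Reference time: the last update when timeoutOnIdle is true, the creation time otherwise.
    static Instant referenceTime(GroupTimestamps group, boolean timeoutOnIdle) {
        return timeoutOnIdle ? group.lastModified : group.created;
    }

    // A group counts as expired when its reference time is older than (now - timeout).
    static boolean isExpired(GroupTimestamps group, boolean timeoutOnIdle, Instant now, Duration timeout) {
        return referenceTime(group, timeoutOnIdle).isBefore(now.minus(timeout));
    }
}
```

With a 5 second timeout, a group created 10 seconds ago and last updated 1 second ago is expired under the default rule but not when timeoutOnIdle is true.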
-
setLazyLoadMessageGroups
public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups)
Specify whether the result of BasicMessageGroupStore.getMessageGroup(Object) should be wrapped in a PersistentMessageGroup, a lazy-load proxy for the messages in the group. Defaults to true.
The target logic is based on SimpleMessageGroupFactory.GroupType.PERSISTENT.
- Parameters:
lazyLoadMessageGroups - the boolean flag to use.
- Since:
- 4.3
-
setLockRegistry
Specify the type of the LockRegistry used to ensure atomic operations.
- Parameters:
lockRegistry - the LockRegistry to use
- Since:
- 6.5
-
getLockRegistry
-
registerMessageGroupExpiryCallback
Description copied from interface: MessageGroupStore
Register a callback for when a message group is expired through MessageGroupStore.expireMessageGroups(long).
- Specified by:
registerMessageGroupExpiryCallback in interface MessageGroupStore
- Parameters:
callback - A callback to execute when a message group is cleaned up.
-
expireMessageGroups
Description copied from interface: MessageGroupStore
Extract all expired groups (whose timestamp is older than the current time less the threshold provided) and call each of the registered callbacks on them in turn. For example: call with a timeout of 100 to expire all groups that were created more than 100 milliseconds ago, and are not yet complete. Use a timeout of 0 (or negative to be on the safe side) to expire all message groups.
- Specified by:
expireMessageGroups in interface MessageGroupStore
- Parameters:
timeout - the timeout threshold to use
- Returns:
- the number of message groups expired
-
copy
Used by expireMessageGroups. We need to return a snapshot of the group at the time the reaper runs, so we can properly detect if the group changed between now and the attempt to expire the group. Not necessary for persistent stores, so the default behavior is to just return the group.
- Parameters:
group - The group.
- Returns:
- The group, or a copy.
- Since:
- 4.0.1
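The snapshot idea described for copy can be sketched in plain Java. LiveGroup and SnapshotSketch are hypothetical names used only for illustration, and comparing last-modified timestamps is an assumed way of detecting a change, not necessarily what any concrete store does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable in-memory group; not a Spring Integration type.
final class LiveGroup {
    final List<String> messages = new ArrayList<>();
    long lastModified;

    void add(String message, long now) {
        messages.add(message);
        lastModified = now;
    }

    // Defensive copy: later changes to this group do not affect the snapshot.
    LiveGroup snapshot() {
        LiveGroup copy = new LiveGroup();
        copy.messages.addAll(messages);
        copy.lastModified = lastModified;
        return copy;
    }
}

final class SnapshotSketch {

    // Expire only if the live group has not changed since the snapshot was taken.
    static boolean safeToExpire(LiveGroup snapshot, LiveGroup live) {
        return snapshot.lastModified == live.lastModified;
    }
}
```

An in-memory store hands out references to live objects, which is why a snapshot matters there; a persistent store reloads state on each read, so returning the group itself is enough.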
-
getMessageCountForAllMessageGroups
Description copied from interface: MessageGroupStore
Optional attribute giving the number of messages in the store over all groups. Implementations may decline to respond by throwing an exception.
- Specified by:
getMessageCountForAllMessageGroups in interface MessageGroupStore
- Returns:
- the number of messages
-
getMessageGroupCount
Description copied from interface: MessageGroupStore
Optional attribute giving the number of message groups. Implementations may decline to respond by throwing an exception.
- Specified by:
getMessageGroupCount in interface MessageGroupStore
- Returns:
- the number of message groups
-
getGroupMetadata
Description copied from interface: MessageGroupStore
Obtain the group metadata without fetching any messages; must supply all other group properties; may include the id of the first message.
- Specified by:
getGroupMetadata in interface MessageGroupStore
- Parameters:
groupId - The group id.
- Returns:
- The metadata.
-
removeMessagesFromGroup
Description copied from interface: MessageGroupStore
Persist the deletion of messages from the group.
- Specified by:
removeMessagesFromGroup in interface MessageGroupStore
- Parameters:
key - The groupId for the group containing the message(s).
messages - The messages to be removed.
-
removeMessagesFromGroup
Description copied from interface: MessageGroupStore
Persist the deletion of messages from the group.
- Specified by:
removeMessagesFromGroup in interface MessageGroupStore
- Parameters:
key - The groupId for the group containing the message(s).
messages - The messages to be removed.
-
doRemoveMessagesFromGroup
-
addMessagesToGroup
Description copied from interface: MessageGroupStore
Store messages with an association to a group id. This can be used to group messages together.
- Specified by:
addMessagesToGroup in interface MessageGroupStore
- Parameters:
groupId - The group id to store messages under.
messages - The messages to add.
-
doAddMessagesToGroup
-
addMessageToGroup
Description copied from interface: BasicMessageGroupStore
Store a message with an association to a group id. This can be used to group messages together.
- Specified by:
addMessageToGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The group id to store the message under.
message - A message.
- Returns:
- The message group.
-
removeMessageGroup
Description copied from interface: BasicMessageGroupStore
Remove the message group with this id.
- Specified by:
removeMessageGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The id of the group to remove.
-
doRemoveMessageGroup
-
removeMessageFromGroupById
Description copied from interface: MessageGroupStore
Delete the message from the group.
- Specified by:
removeMessageFromGroupById in interface MessageGroupStore
- Parameters:
groupId - The groupId for the group containing the message.
messageId - The id of the message to be removed.
- Returns:
- true if the message has been removed.
-
doRemoveMessageFromGroupById
-
setLastReleasedSequenceNumberForGroup
Description copied from interface: MessageGroupStore
Allows you to set the sequence number of the last released Message. Used for resequencing use cases.
- Specified by:
setLastReleasedSequenceNumberForGroup in interface MessageGroupStore
- Parameters:
groupId - The group identifier.
sequenceNumber - The sequence number.
-
doSetLastReleasedSequenceNumberForGroup
-
completeGroup
Description copied from interface: MessageGroupStore
Completes this MessageGroup. Completion of the MessageGroup generally means that no further mutating operations should be performed on it. For example, any attempt to add a Message to the group or remove one from it should not be allowed.
- Specified by:
completeGroup in interface MessageGroupStore
- Parameters:
groupId - The group identifier.
-
doCompleteGroup
-
setGroupCondition
Description copied from interface: MessageGroupStore
Add a condition sentence into the group. It can be used later to make decisions about the group; for example, the release strategy of a correlation handler can consult this condition instead of iterating over all the messages in the group.
- Specified by:
setGroupCondition in interface MessageGroupStore
- Parameters:
groupId - The group identifier.
condition - The condition to store into the group.
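How a release decision might consult a stored condition instead of iterating over messages can be sketched as follows. ConditionSketch is a hypothetical stand-in rather than a Spring Integration type, and encoding the expected message count as a decimal string is purely an assumption made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-group bookkeeping; not a Spring Integration type.
final class ConditionSketch {
    private final Map<Object, String> conditions = new HashMap<>();
    private final Map<Object, Integer> sizes = new HashMap<>();

    void setGroupCondition(Object groupId, String condition) {
        conditions.put(groupId, condition);
    }

    // Track only the group size; message payloads are irrelevant to this sketch.
    void addMessage(Object groupId) {
        sizes.merge(groupId, 1, Integer::sum);
    }

    // Assumption: the condition holds the expected message count as a decimal string.
    boolean canRelease(Object groupId) {
        String condition = conditions.get(groupId);
        if (condition == null) {
            return false; // no condition stored yet, so nothing to decide on
        }
        return sizes.getOrDefault(groupId, 0) >= Integer.parseInt(condition);
    }
}
```

The decision reads one stored string and one counter, so its cost does not grow with the number of messages in the group.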
-
doSetGroupCondition
-
pollMessageFromGroup
Description copied from interface: BasicMessageGroupStore
Poll a Message from this MessageGroup (in FIFO style if supported by the implementation) while also removing the polled Message.
- Specified by:
pollMessageFromGroup in interface BasicMessageGroupStore
- Parameters:
groupId - The group identifier.
- Returns:
- The message.
-
doPollMessageFromGroup
-
executeLocked
protected <T, E extends RuntimeException> T executeLocked(Object groupId, CheckedCallable<T, E> runnable)
-
executeLocked
protected <E extends RuntimeException> void executeLocked(Object groupId, CheckedRunnable<E> runnable)
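The executeLocked methods carry no description here. Judging from their signatures and from the GROUP_ID_MUST_NOT_BE_NULL and INTERRUPTED_WHILE_OBTAINING_LOCK constants, they presumably run an action while holding a lock obtained for the group id. The sketch below shows that general pattern with JDK classes only; GroupLockSketch, its lock map, and the exception types are assumptions, not this class's implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical per-group locking helper; not a Spring Integration type.
final class GroupLockSketch {
    // One lock per group id, created on first use.
    private final ConcurrentMap<Object, Lock> locks = new ConcurrentHashMap<>();

    <T> T executeLocked(Object groupId, Supplier<T> action) {
        if (groupId == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
        Lock lock = locks.computeIfAbsent(groupId, id -> new ReentrantLock());
        try {
            lock.lockInterruptibly();
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while obtaining lock", ex);
        }
        try {
            return action.get();
        }
        finally {
            lock.unlock(); // always release, even if the action throws
        }
    }
}
```

Keying the lock by group id lets operations on different groups proceed concurrently while operations on the same group are serialized.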
-