Class AbstractCorrelatingMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler
- All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>
,Aware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,ApplicationContextAware
,ApplicationEventPublisherAware
,Lifecycle
,Ordered
,ComponentSourceAware
,ExpressionCapable
,Orderable
,MessageProducer
,DiscardingMessageHandler
,HeaderPropagationAware
,IntegrationPattern
,NamedComponent
,IntegrationManagement
,ManageableLifecycle
,TrackableComponent
,MessageHandler
,reactor.core.CoreSubscriber<Message<?>>
- Direct Known Subclasses:
AggregatingMessageHandler
,ResequencingMessageHandler
public abstract class AbstractCorrelatingMessageHandler
extends AbstractMessageProducingHandler
implements DiscardingMessageHandler, ApplicationEventPublisherAware, ManageableLifecycle
Abstract Message handler that holds a buffer of correlated messages in a
MessageStore
.
This class takes care of correlated groups of messages
that can be completed in batches. It is useful for custom implementation of
MessageHandlers that require correlation and is used as a base class for Aggregator -
AggregatingMessageHandler
and Resequencer - ResequencingMessageHandler
,
or custom implementations requiring correlation.
To customize this handler inject CorrelationStrategy
,
ReleaseStrategy
, and MessageGroupProcessor
implementations as
you require.
By default the CorrelationStrategy
will be a
HeaderAttributeCorrelationStrategy
and the ReleaseStrategy
will be a
SequenceSizeReleaseStrategy
.
Use proper CorrelationStrategy
for cases when same
MessageStore
is used
for multiple handlers to ensure uniqueness of message groups across handlers.
When the expireTimeout
is greater than 0, groups which are older than this timeout
are purged from the store on start up (or when purgeOrphanedGroups()
is called).
If expireDuration
is provided, the task is scheduled to perform
purgeOrphanedGroups()
periodically.
- Since:
- 2.0
- Author:
- Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell, Artem Bilan, David Liu, Enrique Rodriguez, Meherzad Lahewala, Jayadev Sirimamilla, Ngoc Nhan
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static class
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
ConstructorsConstructorDescriptionAbstractCorrelatingMessageHandler
(MessageGroupProcessor processor, MessageGroupStore store) AbstractCorrelatingMessageHandler
(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) -
Method Summary
Modifier and TypeMethodDescriptionprotected abstract void
afterRelease
(MessageGroup group, Collection<Message<?>> completedMessages) Allows you to provide additional logic that needs to be performed after the MessageGroup was released.protected void
afterRelease
(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) Subclasses may override if special action is needed because the group was released or discarded due to a timeout.protected void
completeGroup
(Object correlationKey, MessageGroup group, Lock lock) protected Collection
<Message<?>> completeGroup
(Message<?> message, Object correlationKey, MessageGroup group, Lock lock) void
destroy()
protected void
expireGroup
(Object correlationKey, MessageGroup group, Lock lock) protected int
findLastReleasedSequenceNumber
(Object groupId, Collection<Message<?>> partialSequence) protected void
forceComplete
(MessageGroup group) Subclasses may implement this method to provide component type information.protected CorrelationStrategy
Return the discard channel.protected String
protected EvaluationContext
protected Map
<UUID, ScheduledFuture<?>> protected BiFunction
<Message<?>, String, String> protected Expression
protected LockRegistry
protected long
Return a configuredMessageGroupProcessor
.protected ReleaseStrategy
protected void
handleMessageInternal
(Message<?> message) protected boolean
protected boolean
protected boolean
protected boolean
boolean
protected boolean
protected boolean
protected Object
obtainGroupTimeout
(MessageGroup group) protected void
onInit()
Subclasses may implement this for initialization logic.void
Perform aMessageGroupStore.expireMessageGroups(long)
with the providedexpireTimeout
.protected void
remove
(MessageGroup group) void
setApplicationEventPublisher
(ApplicationEventPublisher applicationEventPublisher) void
setCorrelationStrategy
(CorrelationStrategy correlationStrategy) void
setDiscardChannel
(MessageChannel discardChannel) void
setDiscardChannelName
(String discardChannelName) void
setExpireDuration
(Duration expireDuration) Configure aDuration
how often to clean up old orphaned groups from the store.void
setExpireDurationMillis
(long expireDuration) Configure aDuration
(in millis) how often to clean up old orphaned groups from the store.void
setExpireGroupsUponTimeout
(boolean expireGroupsUponTimeout) Expire (completely remove) a group if it is completed due to timeout.void
setExpireTimeout
(long expireTimeout) Configure a timeout in milliseconds for purging old orphaned groups from the store.void
setForceReleaseAdviceChain
(List<Advice> forceReleaseAdviceChain) void
setGroupConditionSupplier
(BiFunction<Message<?>, String, String> conditionSupplier) Configure aBiFunction
to supply a group condition from a message to be added to the group.void
setGroupTimeoutExpression
(Expression groupTimeoutExpression) void
setLockRegistry
(LockRegistry lockRegistry) final void
setMessageStore
(MessageGroupStore store) void
setMinimumTimeoutForEmptyGroups
(long minimumTimeoutForEmptyGroups) By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.void
setOutputProcessor
(MessageGroupProcessor outputProcessor) Specify aMessageGroupProcessor
for the output function.void
setPopSequence
(boolean popSequence) Perform aBaseMessageBuilder.popSequenceDetails()
for output message or not.void
setReleaseLockBeforeSend
(boolean releaseLockBeforeSend) Set to true to release the message group lock before sending any output.void
setReleasePartialSequences
(boolean releasePartialSequences) SetreleasePartialSequences
on an underlying defaultSequenceSizeReleaseStrategy
.void
setReleaseStrategy
(ReleaseStrategy releaseStrategy) void
setSendPartialResultOnExpiry
(boolean sendPartialResultOnExpiry) void
start()
void
stop()
protected MessageGroup
protected void
verifyResultCollectionConsistsOfMessages
(Collection<?> elements) Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe, setObservationConvention
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, getIntegrationPatternType, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface reactor.core.CoreSubscriber
currentContext
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.messaging.MessageHandler
handleMessage
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
AbstractCorrelatingMessageHandler
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) -
AbstractCorrelatingMessageHandler
-
AbstractCorrelatingMessageHandler
-
-
Method Details
-
setLockRegistry
-
setMessageStore
-
setCorrelationStrategy
-
setReleaseStrategy
-
setGroupTimeoutExpression
-
setForceReleaseAdviceChain
-
setOutputProcessor
Specify aMessageGroupProcessor
for the output function.- Parameters:
outputProcessor
- theMessageGroupProcessor
to use- Since:
- 5.0
-
getOutputProcessor
Return a configuredMessageGroupProcessor
.- Returns:
- the configured
MessageGroupProcessor
- Since:
- 5.2
-
setDiscardChannel
-
setDiscardChannelName
-
setSendPartialResultOnExpiry
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) -
setMinimumTimeoutForEmptyGroups
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds.- Parameters:
minimumTimeoutForEmptyGroups
- The minimum timeout.
-
setReleasePartialSequences
public void setReleasePartialSequences(boolean releasePartialSequences) SetreleasePartialSequences
on an underlying defaultSequenceSizeReleaseStrategy
. Ignored for other release strategies.- Parameters:
releasePartialSequences
- true to allow release.
-
setExpireGroupsUponTimeout
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) Expire (completely remove) a group if it is completed due to timeout. Default true- Parameters:
expireGroupsUponTimeout
- the expireGroupsUponTimeout to set- Since:
- 4.1
-
setPopSequence
public void setPopSequence(boolean popSequence) Perform aBaseMessageBuilder.popSequenceDetails()
for output message or not. Default to true. This option removes the sequence information added by the nearest upstream component withapplySequence=true
(for example splitter).- Parameters:
popSequence
- the boolean flag to use.- Since:
- 5.1
-
isReleaseLockBeforeSend
protected boolean isReleaseLockBeforeSend() -
setReleaseLockBeforeSend
public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend) Set to true to release the message group lock before sending any output. See "Avoiding Deadlocks" in the Aggregator section of the reference manual for more information as to why this might be needed.- Parameters:
releaseLockBeforeSend
- true to release the lock.- Since:
- 5.1.1
-
setExpireTimeout
public void setExpireTimeout(long expireTimeout) Configure a timeout in milliseconds for purging old orphaned groups from the store. Used on startup and when anexpireDuration
is provided, the task for runningpurgeOrphanedGroups()
is scheduled with that period. TheforceReleaseProcessor
is used to process those expired groups according the "force complete" options. A group can be orphaned if a persistent message group store is used and no new messages arrive for that group after a restart.- Parameters:
expireTimeout
- the number of milliseconds to determine old orphaned groups in the store to purge.- Since:
- 5.4
- See Also:
-
setExpireDurationMillis
public void setExpireDurationMillis(long expireDuration) Configure aDuration
(in millis) how often to clean up old orphaned groups from the store.- Parameters:
expireDuration
- the delay how often to callpurgeOrphanedGroups()
.- Since:
- 5.4
- See Also:
-
setExpireDuration
Configure aDuration
how often to clean up old orphaned groups from the store.- Parameters:
expireDuration
- the delay how often to callpurgeOrphanedGroups()
.- Since:
- 5.4
- See Also:
-
setGroupConditionSupplier
Configure aBiFunction
to supply a group condition from a message to be added to the group. Thenull
result from the function will reset a condition set before.- Parameters:
conditionSupplier
- the function to supply a group condition from a message to be added to the group.- Since:
- 5.5
- See Also:
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisher
in interfaceApplicationEventPublisherAware
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupport
Subclasses may implement this for initialization logic.- Overrides:
onInit
in classAbstractMessageProducingHandler
-
getComponentType
Description copied from class:IntegrationObjectSupport
Subclasses may implement this method to provide component type information.- Specified by:
getComponentType
in interfaceNamedComponent
- Overrides:
getComponentType
in classMessageHandlerSupport
-
getMessageStore
-
getExpireGroupScheduledFutures
-
getCorrelationStrategy
-
getReleaseStrategy
-
getGroupConditionSupplier
-
getDiscardChannel
Description copied from interface:DiscardingMessageHandler
Return the discard channel.- Specified by:
getDiscardChannel
in interfaceDiscardingMessageHandler
- Returns:
- the channel.
-
getDiscardChannelName
-
isSendPartialResultOnExpiry
protected boolean isSendPartialResultOnExpiry() -
isSequenceAware
protected boolean isSequenceAware() -
getLockRegistry
-
isLockRegistrySet
protected boolean isLockRegistrySet() -
getMinimumTimeoutForEmptyGroups
protected long getMinimumTimeoutForEmptyGroups() -
isReleasePartialSequences
protected boolean isReleasePartialSequences() -
getGroupTimeoutExpression
-
getEvaluationContext
-
handleMessageInternal
- Specified by:
handleMessageInternal
in classAbstractMessageHandler
-
isExpireGroupsUponCompletion
protected boolean isExpireGroupsUponCompletion() -
afterRelease
Allows you to provide additional logic that needs to be performed after the MessageGroup was released.- Parameters:
group
- The group.completedMessages
- The completed messages.
-
afterRelease
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default,afterRelease(MessageGroup, Collection)
is invoked.- Parameters:
group
- The group.completedMessages
- The completed messages.timeout
- True if the release/discard was due to a timeout.
-
forceComplete
-
remove
-
findLastReleasedSequenceNumber
protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) -
store
-
expireGroup
-
completeGroup
-
completeGroup
protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group, Lock lock) -
verifyResultCollectionConsistsOfMessages
-
obtainGroupTimeout
-
destroy
public void destroy()- Specified by:
destroy
in interfaceDisposableBean
- Specified by:
destroy
in interfaceIntegrationManagement
- Overrides:
destroy
in classMessageHandlerSupport
-
start
public void start()- Specified by:
start
in interfaceLifecycle
- Specified by:
start
in interfaceManageableLifecycle
-
stop
public void stop()- Specified by:
stop
in interfaceLifecycle
- Specified by:
stop
in interfaceManageableLifecycle
-
isRunning
public boolean isRunning()- Specified by:
isRunning
in interfaceLifecycle
- Specified by:
isRunning
in interfaceManageableLifecycle
-
purgeOrphanedGroups
public void purgeOrphanedGroups()Perform aMessageGroupStore.expireMessageGroups(long)
with the providedexpireTimeout
. Can be called externally at any time. Internally it is called from the scheduled task with the configuredexpireDuration
.- Since:
- 5.4
-