public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler implements DiscardingMessageHandler, org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.Lifecycle
Abstract message handler that holds a buffer of correlated messages in a MessageStore. This class takes care of correlated groups of messages that can be completed in batches. It is useful for custom implementations of MessageHandlers that require correlation and is used as a base class for the Aggregator (AggregatingMessageHandler) and the Resequencer (ResequencingMessageHandler), or for custom implementations requiring correlation.
To customize this handler, inject CorrelationStrategy, ReleaseStrategy, and MessageGroupProcessor implementations as you require.
By default, the CorrelationStrategy will be a HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a SequenceSizeReleaseStrategy.
Use a proper CorrelationStrategy for cases when the same MessageStore is used for multiple handlers, to ensure uniqueness of message groups across handlers.
Modifier and Type | Class and Description |
---|---|
protected static class |
AbstractCorrelatingMessageHandler.SequenceAwareMessageGroup |
IntegrationManagement.ManagementOverrides
Modifier and Type | Field and Description |
---|---|
protected org.apache.commons.logging.Log |
logger |
messagingTemplate
EXPRESSION_PARSER
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor and Description |
---|
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) |
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store) |
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store,
CorrelationStrategy correlationStrategy,
ReleaseStrategy releaseStrategy) |
Modifier and Type | Method and Description |
---|---|
protected abstract void |
afterRelease(MessageGroup group,
java.util.Collection<org.springframework.messaging.Message<?>> completedMessages)
Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
|
protected void |
afterRelease(MessageGroup group,
java.util.Collection<org.springframework.messaging.Message<?>> completedMessages,
boolean timeout)
Subclasses may override if special action is needed because the group was released or discarded
due to a timeout.
|
protected java.util.Collection<org.springframework.messaging.Message<?>> |
completeGroup(org.springframework.messaging.Message<?> message,
java.lang.Object correlationKey,
MessageGroup group) |
protected void |
completeGroup(java.lang.Object correlationKey,
MessageGroup group) |
void |
destroy() |
protected void |
expireGroup(java.lang.Object correlationKey,
MessageGroup group) |
protected int |
findLastReleasedSequenceNumber(java.lang.Object groupId,
java.util.Collection<org.springframework.messaging.Message<?>> partialSequence) |
protected void |
forceComplete(MessageGroup group) |
java.lang.String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
protected CorrelationStrategy |
getCorrelationStrategy() |
org.springframework.messaging.MessageChannel |
getDiscardChannel()
Return the discard channel.
|
protected java.lang.String |
getDiscardChannelName() |
protected org.springframework.expression.EvaluationContext |
getEvaluationContext() |
protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> |
getExpireGroupScheduledFutures() |
protected org.springframework.expression.Expression |
getGroupTimeoutExpression() |
protected LockRegistry |
getLockRegistry() |
MessageGroupStore |
getMessageStore() |
protected long |
getMinimumTimeoutForEmptyGroups() |
protected MessageGroupProcessor |
getOutputProcessor() |
protected ReleaseStrategy |
getReleaseStrategy() |
protected void |
handleMessageInternal(org.springframework.messaging.Message<?> message) |
protected boolean |
isExpireGroupsUponCompletion() |
protected boolean |
isLockRegistrySet() |
protected boolean |
isReleasePartialSequences() |
boolean |
isRunning() |
protected boolean |
isSendPartialResultOnExpiry() |
protected boolean |
isSequenceAware() |
protected java.lang.Long |
obtainGroupTimeout(MessageGroup group) |
protected void |
onInit()
Subclasses may implement this for initialization logic.
|
protected void |
remove(MessageGroup group) |
void |
setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) |
void |
setCorrelationStrategy(CorrelationStrategy correlationStrategy) |
void |
setDiscardChannel(org.springframework.messaging.MessageChannel discardChannel) |
void |
setDiscardChannelName(java.lang.String discardChannelName) |
void |
setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
Expire (completely remove) a group if it is completed due to timeout.
|
void |
setForceReleaseAdviceChain(java.util.List<org.aopalliance.aop.Advice> forceReleaseAdviceChain) |
void |
setGroupTimeoutExpression(org.springframework.expression.Expression groupTimeoutExpression) |
void |
setLockRegistry(LockRegistry lockRegistry) |
void |
setMessageStore(MessageGroupStore store) |
void |
setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
By default, when a MessageGroupStoreReaper is configured to expire partial
groups, empty groups are also removed.
|
void |
setOutputProcessor(MessageGroupProcessor outputProcessor)
Specify a
MessageGroupProcessor for the output function. |
void |
setReleasePartialSequences(boolean releasePartialSequences)
Set
releasePartialSequences on an underlying default
SequenceSizeReleaseStrategy . |
void |
setReleaseStrategy(ReleaseStrategy releaseStrategy) |
void |
setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) |
void |
setTaskScheduler(org.springframework.scheduling.TaskScheduler taskScheduler) |
void |
start() |
void |
stop() |
protected MessageGroup |
store(java.lang.Object correlationKey,
org.springframework.messaging.Message<?> message) |
protected void |
verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements) |
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
configureMetrics, getActiveCount, getActiveCountLong, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onComplete, onError, onNext, onSubscribe, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getComponentName
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy)
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store)
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)
public void setLockRegistry(LockRegistry lockRegistry)
public final void setMessageStore(MessageGroupStore store)
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
public void setGroupTimeoutExpression(org.springframework.expression.Expression groupTimeoutExpression)
public void setForceReleaseAdviceChain(java.util.List<org.aopalliance.aop.Advice> forceReleaseAdviceChain)
public void setOutputProcessor(MessageGroupProcessor outputProcessor)
Specify a MessageGroupProcessor for the output function.
outputProcessor - the MessageGroupProcessor to use
public void setTaskScheduler(org.springframework.scheduling.TaskScheduler taskScheduler)
setTaskScheduler
in class IntegrationObjectSupport
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
setApplicationEventPublisher
in interface org.springframework.context.ApplicationEventPublisherAware
protected void onInit() throws java.lang.Exception
IntegrationObjectSupport
onInit
in class AbstractMessageProducingHandler
java.lang.Exception - Any exception.
public void setDiscardChannel(org.springframework.messaging.MessageChannel discardChannel)
public void setDiscardChannelName(java.lang.String discardChannelName)
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
minimumTimeoutForEmptyGroups - The minimum timeout.
public void setReleasePartialSequences(boolean releasePartialSequences)
Set releasePartialSequences on an underlying default SequenceSizeReleaseStrategy. Ignored for other release strategies.
releasePartialSequences - true to allow release.
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
expireGroupsUponTimeout - the expireGroupsUponTimeout to set
public java.lang.String getComponentType()
IntegrationObjectSupport
getComponentType
in interface NamedComponent
getComponentType
in class AbstractMessageHandler
public MessageGroupStore getMessageStore()
protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> getExpireGroupScheduledFutures()
protected MessageGroupProcessor getOutputProcessor()
protected CorrelationStrategy getCorrelationStrategy()
protected ReleaseStrategy getReleaseStrategy()
public org.springframework.messaging.MessageChannel getDiscardChannel()
DiscardingMessageHandler
getDiscardChannel
in interface DiscardingMessageHandler
protected java.lang.String getDiscardChannelName()
protected boolean isSendPartialResultOnExpiry()
protected boolean isSequenceAware()
protected LockRegistry getLockRegistry()
protected boolean isLockRegistrySet()
protected long getMinimumTimeoutForEmptyGroups()
protected boolean isReleasePartialSequences()
protected org.springframework.expression.Expression getGroupTimeoutExpression()
protected org.springframework.expression.EvaluationContext getEvaluationContext()
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws java.lang.Exception
handleMessageInternal
in class AbstractMessageHandler
java.lang.Exception
protected boolean isExpireGroupsUponCompletion()
protected abstract void afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages)
group - The group.
completedMessages - The completed messages.
protected void afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages, boolean timeout)
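Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default, afterRelease(MessageGroup, Collection) is invoked.
group - The group.
completedMessages - The completed messages.
timeout - True if the release/discard was due to a timeout.
protected void forceComplete(MessageGroup group)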
protected void remove(MessageGroup group)
protected int findLastReleasedSequenceNumber(java.lang.Object groupId, java.util.Collection<org.springframework.messaging.Message<?>> partialSequence)
protected MessageGroup store(java.lang.Object correlationKey, org.springframework.messaging.Message<?> message)
protected void expireGroup(java.lang.Object correlationKey, MessageGroup group)
protected void completeGroup(java.lang.Object correlationKey, MessageGroup group)
protected java.util.Collection<org.springframework.messaging.Message<?>> completeGroup(org.springframework.messaging.Message<?> message, java.lang.Object correlationKey, MessageGroup group)
protected void verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements)
protected java.lang.Long obtainGroupTimeout(MessageGroup group)
public void destroy() throws java.lang.Exception
destroy
in interface org.springframework.beans.factory.DisposableBean
java.lang.Exception
public void start()
start
in interface org.springframework.context.Lifecycle
public void stop()
stop
in interface org.springframework.context.Lifecycle
public boolean isRunning()
isRunning
in interface org.springframework.context.Lifecycle