org.springframework.integration.aggregator
Class AbstractCorrelatingMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler
- All Implemented Interfaces:
- org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.InitializingBean, org.springframework.core.Ordered, NamedComponent, Orderable, MessageHandler, MessageProducer, TrackableComponent
- Direct Known Subclasses:
- AggregatingMessageHandler, ResequencingMessageHandler
public abstract class AbstractCorrelatingMessageHandler
- extends AbstractMessageHandler
- implements MessageProducer
Abstract message handler that holds a buffer of correlated messages in a MessageStore. This class takes care of correlated groups of messages that can be completed in batches. It is the base class for the Aggregator (AggregatingMessageHandler) and the Resequencer (ResequencingMessageHandler), and is also useful for custom MessageHandler implementations that require correlation.
To customize this handler, inject CorrelationStrategy, ReleaseStrategy, and MessageGroupProcessor implementations as required.
By default, the CorrelationStrategy is a HeaderAttributeCorrelationStrategy and the ReleaseStrategy is a SequenceSizeReleaseStrategy.
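The correlate-then-release idea described above can be sketched in plain Java, without Spring on the classpath. Everything in this block is hypothetical (the class `CorrelationSketch`, its `handle` method, the use of `Function` and `Predicate` as stand-ins for the strategies); it only models the concept and is not the Spring Integration API, which works with Message, MessageGroup and MessageGroupStore types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified model of correlate-then-release; NOT the Spring Integration API.
public class CorrelationSketch<T> {

    // Stands in for a message group store: correlation key -> buffered items.
    private final Map<Object, List<T>> groups = new HashMap<>();

    // Stand-in for a correlation strategy: derives the correlation key from an item.
    private final Function<T, Object> correlationStrategy;

    // Stand-in for a release strategy: decides whether a group may be released.
    private final Predicate<List<T>> releaseStrategy;

    public CorrelationSketch(Function<T, Object> correlationStrategy,
                             Predicate<List<T>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Buffers the item in its group. Returns the released group,
    // or null if the group is not yet ready for release.
    public List<T> handle(T item) {
        Object key = correlationStrategy.apply(item);
        List<T> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(item);
        if (releaseStrategy.test(group)) {
            // Simplification: this sketch forgets the group entirely once released.
            groups.remove(key);
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        // Correlate on the first character; release once a group holds two items.
        CorrelationSketch<String> sketch =
                new CorrelationSketch<>(s -> s.substring(0, 1), g -> g.size() == 2);
        System.out.println(sketch.handle("a1")); // prints "null"
        System.out.println(sketch.handle("b1")); // prints "null"
        System.out.println(sketch.handle("a2")); // prints "[a1, a2]"
    }
}
```

The sketch leaves out locking, timeouts, discard handling and the group processor step, all of which the real handler exposes through the setters listed further down this page.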
- Since:
- 2.0
- Author:
- Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, getBeanFactory, getComponentName, getConversionService, getTaskScheduler, setBeanFactory, setBeanName, setComponentName, setConversionService, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
DEFAULT_SEND_TIMEOUT
public static final long DEFAULT_SEND_TIMEOUT
- See Also:
- Constant Field Values
messageStore
protected volatile MessageGroupStore messageStore
AbstractCorrelatingMessageHandler
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store,
CorrelationStrategy correlationStrategy,
ReleaseStrategy releaseStrategy)
AbstractCorrelatingMessageHandler
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store)
AbstractCorrelatingMessageHandler
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)
setLockRegistry
public void setLockRegistry(LockRegistry lockRegistry)
setMessageStore
public void setMessageStore(MessageGroupStore store)
setCorrelationStrategy
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
setReleaseStrategy
public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
setOutputChannel
public void setOutputChannel(MessageChannel outputChannel)
- Description copied from interface:
MessageProducer
- Specify the MessageChannel to which produced Messages should be sent.
- Specified by:
setOutputChannel
in interface MessageProducer
onInit
protected void onInit()
throws java.lang.Exception
- Description copied from class:
IntegrationObjectSupport
- Subclasses may implement this for initialization logic.
- Overrides:
onInit
in class IntegrationObjectSupport
- Throws:
java.lang.Exception
setDiscardChannel
public void setDiscardChannel(MessageChannel discardChannel)
setSendTimeout
public void setSendTimeout(long sendTimeout)
setSendPartialResultOnExpiry
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
setMinimumTimeoutForEmptyGroups
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
- By default, when a MessageGroupStoreReaper is configured to expire partial
groups, empty groups are also removed. Empty groups exist after a group
is released normally. This is to enable the detection and discarding of
late-arriving messages. If you wish to run empty group deletion on a longer
schedule than expiring partial groups, set this property. Empty groups will
then not be removed from the MessageStore until they have not been modified
for at least this number of milliseconds.
- Parameters:
minimumTimeoutForEmptyGroups
- The minimum timeout, in milliseconds.
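The rule described for empty groups can be expressed as a single comparison. The block below is a minimal sketch under stated assumptions: the class `EmptyGroupExpirySketch` and the method `emptyGroupIsRemovable` are hypothetical names, timestamps are plain epoch milliseconds, and a value of 0 is assumed to mean that no extra delay is applied. It models only the documented behaviour, not the handler's actual implementation.

```java
// Hypothetical model of the empty-group rule described above; NOT the Spring Integration API.
public class EmptyGroupExpirySketch {

    // Returns true if an empty group may be removed at time 'now'.
    // The group qualifies once it has gone unmodified for at least
    // 'minimumTimeoutForEmptyGroups' milliseconds.
    public static boolean emptyGroupIsRemovable(long lastModified, long now,
                                                long minimumTimeoutForEmptyGroups) {
        return now - lastModified >= minimumTimeoutForEmptyGroups;
    }

    public static void main(String[] args) {
        long lastModified = 10_000L;
        // Only 5 seconds have passed, but 60 seconds are required: the group is kept.
        System.out.println(emptyGroupIsRemovable(lastModified, 15_000L, 60_000L)); // prints "false"
        // 60 seconds have passed: the group may be removed.
        System.out.println(emptyGroupIsRemovable(lastModified, 70_000L, 60_000L)); // prints "true"
    }
}
```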
setReleasePartialSequences
public void setReleasePartialSequences(boolean releasePartialSequences)
getComponentType
public java.lang.String getComponentType()
- Description copied from class:
IntegrationObjectSupport
- Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType
in interface NamedComponent
- Overrides:
getComponentType
in class AbstractMessageHandler
getMessageStore
protected MessageGroupStore getMessageStore()
handleMessageInternal
protected void handleMessageInternal(Message<?> message)
throws java.lang.Exception
- Specified by:
handleMessageInternal
in class AbstractMessageHandler
- Throws:
java.lang.Exception
afterRelease
protected abstract void afterRelease(MessageGroup group,
java.util.Collection<Message<?>> completedMessages)
- Allows subclasses to provide additional logic to be performed after the MessageGroup has been released.
- Parameters:
group
- The MessageGroup that was released.
completedMessages
- The completed messages.
findLastReleasedSequenceNumber
protected int findLastReleasedSequenceNumber(java.lang.Object groupId,
java.util.Collection<Message<?>> partialSequence)