Spring Integration

org.springframework.integration.aggregator
Class AbstractCorrelatingMessageHandler

java.lang.Object
  extended by org.springframework.integration.context.IntegrationObjectSupport
      extended by org.springframework.integration.handler.AbstractMessageHandler
          extended by org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler
All Implemented Interfaces:
org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.InitializingBean, org.springframework.core.Ordered, NamedComponent, Orderable, MessageHandler, MessageProducer, TrackableComponent
Direct Known Subclasses:
AggregatingMessageHandler, ResequencingMessageHandler

public abstract class AbstractCorrelatingMessageHandler
extends AbstractMessageHandler
implements MessageProducer

Abstract Message handler that holds a buffer of correlated messages in a MessageStore. This class takes care of correlated groups of messages that can be completed in batches. It is useful for custom implementation of MessageHandlers that require correlation and is used as a base class for Aggregator - AggregatingMessageHandler and Resequencer - ResequencingMessageHandler, or custom implementations requiring correlation.

To customize this handler inject CorrelationStrategy, ReleaseStrategy, and MessageGroupProcessor implementations as you require.

By default the CorrelationStrategy will be a HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a SequenceSizeReleaseStrategy.

Since:
2.0
Author:
Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell

Field Summary
static long DEFAULT_SEND_TIMEOUT
           
protected  MessageGroupStore messageStore
           
 
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
 
Constructor Summary
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)
           
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store)
           
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy)
           
 
Method Summary
protected abstract  void afterRelease(MessageGroup group, java.util.Collection<Message<?>> completedMessages)
          Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
protected  int findLastReleasedSequenceNumber(java.lang.Object groupId, java.util.Collection<Message<?>> partialSequence)
           
 java.lang.String getComponentType()
          Subclasses may implement this method to provide component type information.
protected  MessageGroupStore getMessageStore()
           
protected  void handleMessageInternal(Message<?> message)
           
protected  void onInit()
          Subclasses may implement this for initialization logic.
 void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
           
 void setDiscardChannel(MessageChannel discardChannel)
           
 void setLockRegistry(LockRegistry lockRegistry)
           
 void setMessageStore(MessageGroupStore store)
           
 void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
          By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
 void setOutputChannel(MessageChannel outputChannel)
          Specify the MessageChannel to which produced Messages should be sent.
 void setReleasePartialSequences(boolean releasePartialSequences)
           
 void setReleaseStrategy(ReleaseStrategy releaseStrategy)
           
 void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
           
 void setSendTimeout(long sendTimeout)
           
 
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
getOrder, handleMessage, setOrder, setShouldTrack
 
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, getBeanFactory, getComponentName, getConversionService, getTaskScheduler, setBeanFactory, setBeanName, setComponentName, setConversionService, setTaskScheduler, toString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.springframework.integration.context.NamedComponent
getComponentName
 

Field Detail

DEFAULT_SEND_TIMEOUT

public static final long DEFAULT_SEND_TIMEOUT
See Also:
Constant Field Values

messageStore

protected volatile MessageGroupStore messageStore
Constructor Detail

AbstractCorrelatingMessageHandler

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
                                         MessageGroupStore store,
                                         CorrelationStrategy correlationStrategy,
                                         ReleaseStrategy releaseStrategy)

AbstractCorrelatingMessageHandler

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
                                         MessageGroupStore store)

AbstractCorrelatingMessageHandler

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)
Method Detail

setLockRegistry

public void setLockRegistry(LockRegistry lockRegistry)

setMessageStore

public void setMessageStore(MessageGroupStore store)

setCorrelationStrategy

public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)

setReleaseStrategy

public void setReleaseStrategy(ReleaseStrategy releaseStrategy)

setOutputChannel

public void setOutputChannel(MessageChannel outputChannel)
Description copied from interface: MessageProducer
Specify the MessageChannel to which produced Messages should be sent.

Specified by:
setOutputChannel in interface MessageProducer

onInit

protected void onInit()
               throws java.lang.Exception
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.

Overrides:
onInit in class IntegrationObjectSupport
Throws:
java.lang.Exception

setDiscardChannel

public void setDiscardChannel(MessageChannel discardChannel)

setSendTimeout

public void setSendTimeout(long sendTimeout)

setSendPartialResultOnExpiry

public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)

setMinimumTimeoutForEmptyGroups

public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to run empty group deletion on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds.

Parameters:
minimumTimeoutForEmptyGroups - The minimum timeout.

setReleasePartialSequences

public void setReleasePartialSequences(boolean releasePartialSequences)

getComponentType

public java.lang.String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.

Specified by:
getComponentType in interface NamedComponent
Overrides:
getComponentType in class AbstractMessageHandler

getMessageStore

protected MessageGroupStore getMessageStore()

handleMessageInternal

protected void handleMessageInternal(Message<?> message)
                              throws java.lang.Exception
Specified by:
handleMessageInternal in class AbstractMessageHandler
Throws:
java.lang.Exception

afterRelease

protected abstract void afterRelease(MessageGroup group,
                                     java.util.Collection<Message<?>> completedMessages)
Allows you to provide additional logic that needs to be performed after the MessageGroup was released.

Parameters:
group -
completedMessages -

findLastReleasedSequenceNumber

protected int findLastReleasedSequenceNumber(java.lang.Object groupId,
                                             java.util.Collection<Message<?>> partialSequence)

Spring Integration