Class AbstractCorrelatingMessageHandler

All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Ordered, ExpressionCapable, Orderable, MessageProducer, DiscardingMessageHandler, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
Direct Known Subclasses:
AggregatingMessageHandler, ResequencingMessageHandler

public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler implements DiscardingMessageHandler, ApplicationEventPublisherAware, ManageableLifecycle
Abstract Message handler that holds a buffer of correlated messages in a MessageStore. This class takes care of correlated groups of messages that can be completed in batches. It is useful for custom implementation of MessageHandlers that require correlation and is used as a base class for Aggregator - AggregatingMessageHandler and Resequencer - ResequencingMessageHandler, or custom implementations requiring correlation.

To customize this handler inject CorrelationStrategy, ReleaseStrategy, and MessageGroupProcessor implementations as you require.

By default the CorrelationStrategy will be a HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a SequenceSizeReleaseStrategy.

Use proper CorrelationStrategy for cases when same MessageStore is used for multiple handlers to ensure uniqueness of message groups across handlers.

When the expireTimeout is greater than 0, groups which are older than this timeout are purged from the store on start up (or when purgeOrphanedGroups() is called). If expireDuration is provided, the task is scheduled to perform purgeOrphanedGroups() periodically.

Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell, Artem Bilan, David Liu, Enrique Rodriguez, Meherzad Lahewala, Jayadev Sirimamilla
  • Constructor Details

  • Method Details

    • setLockRegistry

      public void setLockRegistry(LockRegistry lockRegistry)
    • setMessageStore

      public final void setMessageStore(MessageGroupStore store)
    • setCorrelationStrategy

      public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
    • setReleaseStrategy

      public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
    • setGroupTimeoutExpression

      public void setGroupTimeoutExpression(Expression groupTimeoutExpression)
    • setForceReleaseAdviceChain

      public void setForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain)
    • setOutputProcessor

      public void setOutputProcessor(MessageGroupProcessor outputProcessor)
      Specify a MessageGroupProcessor for the output function.
      outputProcessor - the MessageGroupProcessor to use
    • getOutputProcessor

      public MessageGroupProcessor getOutputProcessor()
      Return a configured MessageGroupProcessor.
      the configured MessageGroupProcessor
    • setDiscardChannel

      public void setDiscardChannel(MessageChannel discardChannel)
    • setDiscardChannelName

      public void setDiscardChannelName(String discardChannelName)
    • setSendPartialResultOnExpiry

      public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
    • setMinimumTimeoutForEmptyGroups

      public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
      By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds.
      minimumTimeoutForEmptyGroups - The minimum timeout.
    • setReleasePartialSequences

      public void setReleasePartialSequences(boolean releasePartialSequences)
      Set releasePartialSequences on an underlying default SequenceSizeReleaseStrategy. Ignored for other release strategies.
      releasePartialSequences - true to allow release.
    • setExpireGroupsUponTimeout

      public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
      Expire (completely remove) a group if it is completed due to timeout. Default true
      expireGroupsUponTimeout - the expireGroupsUponTimeout to set
    • setPopSequence

      public void setPopSequence(boolean popSequence)
      Perform a MessageBuilder.popSequenceDetails() for output message or not. Default to true. This option removes the sequence information added by the nearest upstream component with applySequence=true (for example splitter).
      popSequence - the boolean flag to use.
    • isReleaseLockBeforeSend

      protected boolean isReleaseLockBeforeSend()
    • setReleaseLockBeforeSend

      public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend)
      Set to true to release the message group lock before sending any output. See "Avoiding Deadlocks" in the Aggregator section of the reference manual for more information as to why this might be needed.
      releaseLockBeforeSend - true to release the lock.
    • setExpireTimeout

      public void setExpireTimeout(long expireTimeout)
      Configure a timeout in milliseconds for purging old orphaned groups from the store. Used on startup and when an expireDuration is provided, the task for running purgeOrphanedGroups() is scheduled with that period. The forceReleaseProcessor is used to process those expired groups according the "force complete" options. A group can be orphaned if a persistent message group store is used and no new messages arrive for that group after a restart.
      expireTimeout - the number of milliseconds to determine old orphaned groups in the store to purge.
      See Also:
    • setExpireDurationMillis

      public void setExpireDurationMillis(long expireDuration)
      Configure a Duration (in millis) how often to clean up old orphaned groups from the store.
      expireDuration - the delay how often to call purgeOrphanedGroups().
      See Also:
    • setExpireDuration

      public void setExpireDuration(@Nullable Duration expireDuration)
      Configure a Duration how often to clean up old orphaned groups from the store.
      expireDuration - the delay how often to call purgeOrphanedGroups().
      See Also:
    • setGroupConditionSupplier

      public void setGroupConditionSupplier(BiFunction<Message<?>,String,String> conditionSupplier)
      Configure a BiFunction to supply a group condition from a message to be added to the group. The null result from the function will reset a condition set before.
      conditionSupplier - the function to supply a group condition from a message to be added to the group.
      See Also:
    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface ApplicationEventPublisherAware
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      onInit in class AbstractMessageProducingHandler
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      getComponentType in class MessageHandlerSupport
    • getMessageStore

      public MessageGroupStore getMessageStore()
    • getExpireGroupScheduledFutures

      protected Map<UUID,ScheduledFuture<?>> getExpireGroupScheduledFutures()
    • getCorrelationStrategy

      protected CorrelationStrategy getCorrelationStrategy()
    • getReleaseStrategy

      protected ReleaseStrategy getReleaseStrategy()
    • getGroupConditionSupplier

      @Nullable protected BiFunction<Message<?>,String,String> getGroupConditionSupplier()
    • getDiscardChannel

      public MessageChannel getDiscardChannel()
      Description copied from interface: DiscardingMessageHandler
      Return the discard channel.
      Specified by:
      getDiscardChannel in interface DiscardingMessageHandler
      the channel.
    • getDiscardChannelName

      protected String getDiscardChannelName()
    • isSendPartialResultOnExpiry

      protected boolean isSendPartialResultOnExpiry()
    • isSequenceAware

      protected boolean isSequenceAware()
    • getLockRegistry

      protected LockRegistry getLockRegistry()
    • isLockRegistrySet

      protected boolean isLockRegistrySet()
    • getMinimumTimeoutForEmptyGroups

      protected long getMinimumTimeoutForEmptyGroups()
    • isReleasePartialSequences

      protected boolean isReleasePartialSequences()
    • getGroupTimeoutExpression

      protected Expression getGroupTimeoutExpression()
    • getEvaluationContext

      protected EvaluationContext getEvaluationContext()
    • handleMessageInternal

      protected void handleMessageInternal(Message<?> message)
      Specified by:
      handleMessageInternal in class AbstractMessageHandler
    • isExpireGroupsUponCompletion

      protected boolean isExpireGroupsUponCompletion()
    • afterRelease

      protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages)
      Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
      group - The group.
      completedMessages - The completed messages.
    • afterRelease

      protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout)
      Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default, afterRelease(MessageGroup, Collection) is invoked.
      group - The group.
      completedMessages - The completed messages.
      timeout - True if the release/discard was due to a timeout.
    • forceComplete

      protected void forceComplete(MessageGroup group)
    • remove

      protected void remove(MessageGroup group)
    • findLastReleasedSequenceNumber

      protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence)
    • store

      protected MessageGroup store(Object correlationKey, Message<?> message)
    • expireGroup

      protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock)
    • completeGroup

      protected void completeGroup(Object correlationKey, MessageGroup group, Lock lock)
    • completeGroup

      protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group, Lock lock)
    • verifyResultCollectionConsistsOfMessages

      protected void verifyResultCollectionConsistsOfMessages(Collection<?> elements)
    • obtainGroupTimeout

      protected Object obtainGroupTimeout(MessageGroup group)
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface IntegrationManagement
      destroy in class MessageHandlerSupport
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
    • purgeOrphanedGroups

      public void purgeOrphanedGroups()
      Perform a MessageGroupStore.expireMessageGroups(long) with the provided expireTimeout. Can be called externally at any time. Internally it is called from the scheduled task with the configured expireDuration.