Class CorrelatingMessageBarrier

All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, MessageSource<Object>, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>

public class CorrelatingMessageBarrier extends AbstractMessageHandler implements MessageSource<Object>
This endpoint serves as a barrier for messages that should not be processed yet. The decision about when a message can be processed is delegated to a ReleaseStrategy. Once a message can be processed, the client is responsible for any locking (potentially from within the ReleaseStrategy's canRelease(..) method).

This class differs from AbstractCorrelatingMessageHandler in that it completely decouples the receiver and the sender. It can be applied in scenarios where completion of a message group is not well defined, but only a certain number of messages for any given correlation key may be processed at a time.
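
As a rough illustration of client-side locking performed from a canRelease-style check, the sketch below allows at most one in-flight message per correlation key. OneAtATimeGate, its canRelease(String) and done(String) methods, and the demo class are hypothetical names invented for this example. They are not Spring Integration types, and the real ReleaseStrategy contract is not reproduced here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, NOT a Spring Integration type. It only mirrors the idea
// of taking a per-key lock inside a canRelease-style check.
final class OneAtATimeGate {
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    // Returns true and marks the key busy if nothing for that key is in flight.
    boolean canRelease(String correlationKey) {
        return inFlight.add(correlationKey);
    }

    // The client calls this once it has finished processing the released message.
    void done(String correlationKey) {
        inFlight.remove(correlationKey);
    }
}

final class OneAtATimeGateDemo {
    public static void main(String[] args) {
        OneAtATimeGate gate = new OneAtATimeGate();
        System.out.println(gate.canRelease("order-1")); // first message for the key may proceed
        System.out.println(gate.canRelease("order-1")); // refused while the first is in flight
        gate.done("order-1");                           // client releases the lock
        System.out.println(gate.canRelease("order-1")); // the key is available again
    }
}
```

In this sketch, forgetting to call done(..) would keep the key blocked indefinitely, which is why the locking is described as the client's responsibility.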

The messages will be stored in a MessageStore for each correlation key.
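
The decoupling of sender and receiver around a per-key store can be pictured with a small toy model. This is a sketch under simplifying assumptions, not the class's actual implementation: ToyBarrier, its handle and receive methods, and the use of Function and Predicate in place of CorrelationStrategy and ReleaseStrategy are all invented for illustration, and an in-memory map stands in for the MessageStore.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model only: none of these types are Spring Integration classes.
final class ToyBarrier<K, M> {
    private final Function<M, K> correlation;   // stands in for a CorrelationStrategy
    private final Predicate<List<M>> release;   // stands in for a ReleaseStrategy
    private final Map<K, Deque<M>> store = new LinkedHashMap<>(); // stands in for a MessageStore

    ToyBarrier(Function<M, K> correlation, Predicate<List<M>> release) {
        this.correlation = correlation;
        this.release = release;
    }

    // Sender side: park the message under its correlation key.
    synchronized void handle(M message) {
        store.computeIfAbsent(correlation.apply(message), k -> new ArrayDeque<>())
             .addLast(message);
    }

    // Receiver side: return one message from the first releasable group, or null.
    synchronized M receive() {
        Iterator<Map.Entry<K, Deque<M>>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Deque<M> group = it.next().getValue();
            if (release.test(List.copyOf(group))) {
                M next = group.pollFirst();
                if (group.isEmpty()) {
                    it.remove(); // drop groups that have been fully consumed
                }
                return next;
            }
        }
        return null; // nothing may be released right now
    }
}

final class ToyBarrierDemo {
    public static void main(String[] args) {
        // Correlate on the first character; release a group once it holds two messages.
        ToyBarrier<Character, String> barrier =
                new ToyBarrier<>(s -> s.charAt(0), group -> group.size() >= 2);
        barrier.handle("a1");
        System.out.println(barrier.receive()); // group "a" is too small to release
        barrier.handle("a2");
        System.out.println(barrier.receive()); // group "a" now qualifies
    }
}
```

The sender only ever calls handle and the receiver only ever calls receive, so neither side needs to know when or whether the other runs.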

Author:
Iwein Fuld, Oleg Zhurakousky, Gary Russell, Artem Bilan, Trung Pham
  • Constructor Details

    • CorrelatingMessageBarrier

      public CorrelatingMessageBarrier()
    • CorrelatingMessageBarrier

      public CorrelatingMessageBarrier(MessageGroupStore store)
  • Method Details

    • setCorrelationStrategy

      public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
      Set the CorrelationStrategy to be used to determine the correlation key for incoming messages.
      Parameters:
      correlationStrategy - The correlation strategy.
    • setReleaseStrategy

      public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
      Set the ReleaseStrategy that should be used when deciding if a group in this barrier may be released.
      Parameters:
      releaseStrategy - The release strategy.
    • handleMessageInternal

      protected void handleMessageInternal(Message<?> message)
      Specified by:
      handleMessageInternal in class AbstractMessageHandler
    • receive

      public Message<Object> receive()
      Description copied from interface: MessageSource
      Retrieve the next available message from this source. Returns null if no message is available.
      Specified by:
      receive in interface MessageSource<Object>
      Returns:
      The message or null.
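
Because receive() signals "nothing available" by returning null, callers typically poll in a loop that stops on the first null. The sketch below shows that pattern against a plain java.util.function.Supplier. DrainSketch and drain are hypothetical names, and the Supplier merely stands in for a message source; no Spring Integration API is used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical helper for illustration; the Supplier stands in for a source
// whose receive-style method returns null when no message is available.
final class DrainSketch {
    // Polls until the source returns null and collects everything received.
    static <T> List<T> drain(Supplier<T> source) {
        List<T> received = new ArrayList<>();
        T next;
        while ((next = source.get()) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) {
        // Queue.poll() also returns null when empty, so it mimics the contract.
        Queue<String> backing = new ArrayDeque<>(List.of("m1", "m2"));
        System.out.println(drain(backing::poll));
    }
}
```

A null result here means only that nothing can be handed out at the moment; a later poll may succeed once more messages arrive or become releasable.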