Class BarrierMessageHandler

All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, MessageProducer, DiscardingMessageHandler, HeaderPropagationAware, MessageTriggerAction, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>

public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler implements MessageTriggerAction, DiscardingMessageHandler
A message handler that suspends the thread until a message with corresponding correlation is passed into the trigger method or the timeout occurs. Only one thread with a particular correlation (result of invoking the CorrelationStrategy) can be suspended at a time. If the inbound thread does not arrive before the trigger thread, the latter is suspended until it does, or the timeout occurs. Separate timeouts may be configured for request and trigger messages.

The default CorrelationStrategy is a HeaderAttributeCorrelationStrategy.

The default output processor is a DefaultAggregatingMessageGroupProcessor.
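
The suspension behaviour described above can be pictured with a small JDK-only model. This is a sketch under stated assumptions, not the Spring Integration implementation: `BarrierSketch`, its string payloads, and the explicit `key` argument (standing in for the result of the CorrelationStrategy) are hypothetical names, and returning both payloads as a list is assumed to approximate the aggregating default output processor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a correlation-keyed barrier; not library code. */
class BarrierSketch {

    // One hand-off queue per correlation key that currently has a waiting party.
    private final Map<Object, SynchronousQueue<String>> suspensions = new ConcurrentHashMap<>();

    // Keys that already have a suspended request thread.
    private final Set<Object> inProcess = ConcurrentHashMap.newKeySet();

    private final long requestTimeoutMs;

    private final long triggerTimeoutMs;

    BarrierSketch(long requestTimeoutMs, long triggerTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.triggerTimeoutMs = triggerTimeoutMs;
    }

    /** Suspends the caller until a trigger for the key arrives; returns null on timeout. */
    List<String> handleRequest(Object key, String request) {
        // Only one thread per correlation key may be suspended at a time.
        if (!inProcess.add(key)) {
            throw new IllegalStateException("A request with key " + key + " is already suspended");
        }
        SynchronousQueue<String> queue =
                suspensions.computeIfAbsent(key, k -> new SynchronousQueue<>());
        try {
            String release = queue.poll(requestTimeoutMs, TimeUnit.MILLISECONDS);
            return release == null ? null : Arrays.asList(request, release);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            suspensions.remove(key);
            inProcess.remove(key);
        }
    }

    /** Hands the trigger to the request thread; returns false if none claims it in time. */
    boolean trigger(Object key, String release) {
        SynchronousQueue<String> queue =
                suspensions.computeIfAbsent(key, k -> new SynchronousQueue<>());
        boolean claimed = false;
        try {
            // If the request has not arrived yet, the trigger thread waits here instead.
            claimed = queue.offer(release, triggerTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!claimed) {
            suspensions.remove(key, queue);
        }
        return claimed;
    }
}
```

In this model either party may arrive first: whichever thread reaches the barrier creates the hand-off queue and waits on it for its own timeout, which is why the request and trigger timeouts can differ.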

Since:
4.2
Author:
Gary Russell, Artem Bilan, Michel Jung
  • Constructor Details

    • BarrierMessageHandler

      public BarrierMessageHandler(long timeout)
      Construct an instance with the provided timeout and default correlation and output strategies.
      Parameters:
      timeout - the timeout in milliseconds for both request and trigger messages.
    • BarrierMessageHandler

      public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor)
      Construct an instance with the provided timeout and output processor, and default correlation strategy.
      Parameters:
      timeout - the timeout in milliseconds for both request and trigger messages.
      outputProcessor - the output MessageGroupProcessor.
    • BarrierMessageHandler

      public BarrierMessageHandler(long timeout, CorrelationStrategy correlationStrategy)
      Construct an instance with the provided timeout and correlation strategy, and default output processor.
      Parameters:
      timeout - the timeout in milliseconds for both request and trigger messages.
      correlationStrategy - the correlation strategy.
    • BarrierMessageHandler

      public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor, CorrelationStrategy correlationStrategy)
      Construct an instance with the provided timeout, output processor, and correlation strategy.
      Parameters:
      timeout - the timeout in milliseconds for both request and trigger messages.
      outputProcessor - the output MessageGroupProcessor.
      correlationStrategy - the correlation strategy.
    • BarrierMessageHandler

      public BarrierMessageHandler(long requestTimeout, long triggerTimeout)
      Construct an instance with the provided timeouts and default correlation and output strategies.
      Parameters:
      requestTimeout - the timeout in milliseconds when waiting for a trigger message.
      triggerTimeout - the timeout in milliseconds when waiting for a request message.
      Since:
      5.4
    • BarrierMessageHandler

      public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor)
      Construct an instance with the provided timeouts and output processor, and default correlation strategy.
      Parameters:
      requestTimeout - the timeout in milliseconds when waiting for a trigger message.
      triggerTimeout - the timeout in milliseconds when waiting for a request message.
      outputProcessor - the output MessageGroupProcessor.
      Since:
      5.4
    • BarrierMessageHandler

      public BarrierMessageHandler(long requestTimeout, long triggerTimeout, CorrelationStrategy correlationStrategy)
      Construct an instance with the provided timeouts and correlation strategy, and default output processor.
      Parameters:
      requestTimeout - the timeout in milliseconds when waiting for a trigger message.
      triggerTimeout - the timeout in milliseconds when waiting for a request message.
      correlationStrategy - the correlation strategy.
      Since:
      5.4
    • BarrierMessageHandler

      public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor, CorrelationStrategy correlationStrategy)
      Construct an instance with the provided timeouts, output processor, and correlation strategy.
      Parameters:
      requestTimeout - the timeout in milliseconds when waiting for a trigger message.
      triggerTimeout - the timeout in milliseconds when waiting for a request message.
      outputProcessor - the output MessageGroupProcessor.
      correlationStrategy - the correlation strategy.
      Since:
      5.4
  • Method Details

    • setDiscardChannelName

      public void setDiscardChannelName(String discardChannelName)
      Set the name of the channel to which trigger messages are sent when they arrive late or when the request message does not arrive in time.
      Parameters:
      discardChannelName - the discard channel.
      Since:
      5.0
    • setDiscardChannel

      public void setDiscardChannel(MessageChannel discardChannel)
      Set the channel to which trigger messages are sent when they arrive late or when the request message does not arrive in time.
      Parameters:
      discardChannel - the discard channel.
      Since:
      5.0
    • getDiscardChannel

      @Nullable public MessageChannel getDiscardChannel()
      Return the discard channel for trigger messages.
      Specified by:
      getDiscardChannel in interface DiscardingMessageHandler
      Returns:
      a discard message channel.
      Since:
      5.0
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessageHandlerSupport
    • getIntegrationPatternType

      public IntegrationPatternType getIntegrationPatternType()
      Description copied from interface: IntegrationPattern
      Return a pattern type this component implements.
      Specified by:
      getIntegrationPatternType in interface IntegrationPattern
      Overrides:
      getIntegrationPatternType in class AbstractReplyProducingMessageHandler
      Returns:
      the IntegrationPatternType this component implements.
    • handleRequestMessage

      protected Object handleRequestMessage(Message<?> requestMessage)
      Description copied from class: AbstractReplyProducingMessageHandler
      Subclasses must implement this method to handle the request Message. The return value may be a Message, a MessageBuilder, or any plain Object. The base class will handle the final creation of a reply Message from any of those starting points. If the return value is null, the Message flow will end here.
      Specified by:
      handleRequestMessage in class AbstractReplyProducingMessageHandler
      Parameters:
      requestMessage - The request message.
      Returns:
      The result of handling the message, or null.
    • buildResult

      protected Object buildResult(Object key, Message<?> requestMessage, Message<?> releaseMessage)
      Override to change the default mechanism by which the inbound and release messages are returned as a result.
      Parameters:
      key - The correlation key.
      requestMessage - the inbound message.
      releaseMessage - the release message.
      Returns:
      the result.
    • trigger

      public void trigger(Message<?> message)
      Description copied from interface: MessageTriggerAction
      Take some action based on the message.
      Specified by:
      trigger in interface MessageTriggerAction
      Parameters:
      message - the message.
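
The discard and result-building methods above can be illustrated with a JDK-only model. It is a sketch under stated assumptions rather than the library's code: `DiscardingBarrierSketch`, the `discarded` list (a stand-in for the discard channel), the string payloads, and the two-argument `buildResult` are all hypothetical, and the default result (both payloads in a list) is assumed to approximate the aggregating default.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-correlation model of discarding and result building; not library code. */
class DiscardingBarrierSketch {

    /** Stand-in for the discard channel: collects triggers that no request claimed. */
    final List<String> discarded = new CopyOnWriteArrayList<>();

    private final SynchronousQueue<String> handOff = new SynchronousQueue<>();

    /** Request side: waits for a trigger, then delegates to buildResult; null on timeout. */
    Object handleRequest(String request, long requestTimeoutMs) {
        try {
            String release = handOff.poll(requestTimeoutMs, TimeUnit.MILLISECONDS);
            return release == null ? null : buildResult(request, release);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Trigger side: a trigger that no request claims in time goes to the discard sink. */
    void trigger(String release, long triggerTimeoutMs) {
        boolean claimed = false;
        try {
            claimed = handOff.offer(release, triggerTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!claimed) {
            discarded.add(release);
        }
    }

    /** Default: return both payloads together (assumed to mirror the aggregating default). */
    protected Object buildResult(String request, String release) {
        return Arrays.asList(request, release);
    }

    /** Example override: return only the release payload. */
    static class ReleaseOnly extends DiscardingBarrierSketch {

        @Override
        protected Object buildResult(String request, String release) {
            return release;
        }
    }
}
```

Overriding `buildResult` in a subclass, as `ReleaseOnly` does here, changes what the suspended request thread returns without touching the waiting logic.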