Class DelayHandler

All Implemented Interfaces:
EventListener, org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, Ordered, ExpressionCapable, Orderable, MessageProducer, DelayHandlerManagement, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>

A MessageHandler that is capable of delaying the continuation of a Message flow based on the result of evaluating the delayExpression on an inbound Message, or on a default delay value configured on this handler. Note that the continuation of the flow is delegated to a TaskScheduler, and therefore the calling thread does not block. The advantage of this approach is that many delays can be managed concurrently, even very long delays, without producing a buildup of blocked Threads.
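The non-blocking behavior described above can be illustrated with plain JDK classes. This is only a minimal sketch of the idea, not the DelayHandler implementation: the class NonBlockingDelaySketch and its delay method are hypothetical, and a ScheduledExecutorService stands in for the TaskScheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for the scheduling idea; not the DelayHandler implementation. */
class NonBlockingDelaySketch {

    private final ScheduledExecutorService scheduler;

    NonBlockingDelaySketch(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Hands the payload to the scheduler and returns at once; the caller never sleeps. */
    <T> void delay(T payload, long delayMillis, Consumer<T> downstream) {
        scheduler.schedule(() -> downstream.accept(payload), delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        NonBlockingDelaySketch sketch = new NonBlockingDelaySketch(scheduler);
        CountDownLatch released = new CountDownLatch(1);
        String caller = Thread.currentThread().getName();

        sketch.delay("hello", 200, payload -> {
            // Runs on a scheduler thread, so thread-bound context of the caller is not available here.
            System.out.println(payload + " released on " + Thread.currentThread().getName()
                    + " (caller was " + caller + ")");
            released.countDown();
        });
        System.out.println("caller returned without waiting for the delay");

        released.await(5, TimeUnit.SECONDS);
        scheduler.shutdown();
    }
}
```

Because the downstream callback runs on a scheduler thread, anything bound to the calling thread, such as a transaction, is not visible when the payload is finally released.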

One thing to keep in mind, however, is that any active transactional context will not propagate from the original sender to the eventual recipient. This is a side effect of passing the Message to the output channel after the delay with a different Thread in control.

When this handler's delayExpression property is configured, that evaluation result value will take precedence over the handler's defaultDelay value. The actual evaluation result value may be a long, a String that can be parsed as a long, or a Date. If it is a long, it will be interpreted as the length of time to delay in milliseconds counting from the current time (e.g. a value of 5000 indicates that the Message can be released as soon as five seconds from the current time). If the value is a Date, it will be delayed at least until that Date occurs (i.e. the delay in that case is equivalent to headerDate.getTime() - new Date().getTime()).
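The resolution rules above can be sketched with plain JDK types. The helper below is hypothetical and is not DelayHandler's own code: the name resolveDelayMillis, the explicit nowMillis parameter (used to keep the result deterministic), and the exception thrown for unsupported types are all assumptions made for illustration.

```java
import java.util.Date;

/** Hypothetical helper mirroring the documented rules; not DelayHandler's own code. */
class DelayResolutionSketch {

    /**
     * @param value        the expression evaluation result (may be null)
     * @param defaultDelay the fallback delay in milliseconds
     * @param nowMillis    the current time in epoch milliseconds
     * @return the delay in milliseconds, counted from nowMillis
     */
    static long resolveDelayMillis(Object value, long defaultDelay, long nowMillis) {
        if (value == null) {
            return defaultDelay; // no evaluation result: the default delay applies
        }
        if (value instanceof Date) {
            // Delay until the Date: target time minus current time.
            return ((Date) value).getTime() - nowMillis;
        }
        if (value instanceof Long) {
            return (Long) value; // already a length of time in milliseconds
        }
        if (value instanceof String) {
            // Assumption: an unparseable String simply raises NumberFormatException here.
            return Long.parseLong(((String) value).trim());
        }
        // Assumption: the sketch rejects any other type.
        throw new IllegalArgumentException("Unsupported delay value: " + value.getClass());
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(resolveDelayMillis(5000L, 100L, now));                // 5000
        System.out.println(resolveDelayMillis("250", 100L, now));                // 250
        System.out.println(resolveDelayMillis(new Date(now + 3000), 100L, now)); // 3000
        System.out.println(resolveDelayMillis(null, 100L, now));                 // 100
    }
}
```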

Delayed messages are stored in the MessageGroupStore as a dedicated group. If an external persistent store is provided, those delayed messages will be rescheduled after application startup. The messageGroupId is a required option and must be unique for each delayer configuration to avoid work-stealing from the store and unexpected releases. Different instances of the same delayer can point to the same message group in the store. The messageGroupId must not rely on a bean name, which might be generated: after an application restart the bean may get a different generated name, and its delayed messages might never be rescheduled because their group is no longer managed by the application.

Since:
1.0.3
Author:
Mark Fisher, Artem Bilan, Gary Russell, Christian Tzolov
  • Field Details

  • Constructor Details

    • DelayHandler

      public DelayHandler()
      Construct an instance with default options. The messageGroupId must then be provided via the setter.
      Since:
      6.2
    • DelayHandler

      public DelayHandler(String messageGroupId)
      Create a DelayHandler with the given 'messageGroupId' that is used as the 'key' for the MessageGroup that stores delayed Messages in the MessageGroupStore. The sending of Messages after the delay will be handled by the default ThreadPoolTaskScheduler registered in the ApplicationContext.
      Parameters:
      messageGroupId - The message group identifier.
    • DelayHandler

      public DelayHandler(String messageGroupId, TaskScheduler taskScheduler)
      Create a DelayHandler with the given 'messageGroupId' and TaskScheduler. The sending of Messages after the delay will be handled by the provided TaskScheduler.
      Parameters:
      messageGroupId - The message group identifier.
      taskScheduler - A task scheduler.
  • Method Details

    • setMessageGroupId

      public void setMessageGroupId(String messageGroupId)
      Set the group id under which this handler manages delayed messages. Required.
      Parameters:
      messageGroupId - the group id for delayed messages.
      Since:
      6.2
    • setDefaultDelay

      public void setDefaultDelay(long defaultDelay)
      Set the default delay in milliseconds. If no delayExpression property has been provided, the default delay will be applied to all Messages. If a delay should be applied only to Messages for which the delayExpression produces a result, set this value to 0.
      Parameters:
      defaultDelay - The default delay in milliseconds.
    • setDelayExpression

      public void setDelayExpression(Expression delayExpression)
      Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until. If this property is set, the result of the expression evaluation (if not null) will take precedence over this handler's default delay.
      Parameters:
      delayExpression - The delay expression.
    • setDelayExpressionString

      public void setDelayExpressionString(String delayExpression)
      Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until. If this property is set, the result of the expression evaluation (if not null) will take precedence over this handler's default delay.
      Parameters:
      delayExpression - The delay expression.
      Since:
      5.0
    • setIgnoreExpressionFailures

      public void setIgnoreExpressionFailures(boolean ignoreExpressionFailures)
      Specify whether Exceptions thrown by delayExpression evaluation should be ignored (only logged). In this case the delayer will fall back to the defaultDelay. If this property is specified as false, any delayExpression evaluation Exception will be thrown to the caller without falling back to the defaultDelay. Default is true.
      Parameters:
      ignoreExpressionFailures - true if expression evaluation failures should be ignored.
      See Also:
      • determineDelayForMessage(org.springframework.messaging.Message<?>)
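      The fallback rule for ignoreExpressionFailures can be sketched as follows. This is a hedged illustration only: the class, the delayFor method, and the use of a Supplier in place of real expression evaluation are assumptions, and the sketch rethrows the original exception unchanged, which may differ from how the handler reports it.

```java
import java.util.function.Supplier;

/** Hypothetical illustration of the fallback rule; not DelayHandler's own code. */
class ExpressionFailureSketch {

    /**
     * @param evaluation               stands in for evaluating the delayExpression
     * @param defaultDelay             the fallback delay in milliseconds
     * @param ignoreExpressionFailures whether evaluation failures fall back to the default
     */
    static long delayFor(Supplier<Long> evaluation, long defaultDelay, boolean ignoreExpressionFailures) {
        try {
            Long result = evaluation.get();
            // A non-null result takes precedence over the default delay.
            return result != null ? result : defaultDelay;
        } catch (RuntimeException ex) {
            if (ignoreExpressionFailures) {
                // Only logged; the default delay is used instead.
                System.err.println("Delay evaluation failed, using default: " + ex.getMessage());
                return defaultDelay;
            }
            throw ex; // surfaced to the caller, no fallback
        }
    }

    public static void main(String[] args) {
        Supplier<Long> failing = () -> {
            throw new IllegalStateException("no such header");
        };
        System.out.println(delayFor(() -> 42L, 100L, true)); // 42
        System.out.println(delayFor(failing, 100L, true));   // 100
        try {
            delayFor(failing, 100L, false);
        } catch (IllegalStateException ex) {
            System.out.println("propagated: " + ex.getMessage());
        }
    }
}
```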
    • setMessageStore

      public void setMessageStore(MessageGroupStore messageStore)
      Specify the MessageGroupStore that should be used to store Messages while awaiting the delay.
      Parameters:
      messageStore - The message store.
    • setDelayedAdviceChain

      public void setDelayedAdviceChain(List<Advice> delayedAdviceChain)
      Specify the List<Advice> used to advise the DelayHandler.ReleaseMessageHandler proxy. Usually used to add transactions to delayed messages retrieved from a transactional message store.
      Parameters:
      delayedAdviceChain - The advice chain.
      See Also:
      • createReleaseMessageTask()
    • setDelayedMessageErrorChannel

      public void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel)
      Set a message channel to which an ErrorMessage will be sent if sending the released message fails. If the error flow returns normally, the release is complete. If the error flow throws an exception, the release will be re-attempted. If there is a transaction advice on the release task, the error flow is called within the transaction.
      Parameters:
      delayedMessageErrorChannel - the channel.
      Since:
      5.0.8
    • setDelayedMessageErrorChannelName

      public void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName)
      Set a message channel name to which an ErrorMessage will be sent if sending the released message fails. If the error flow returns normally, the release is complete. If the error flow throws an exception, the release will be re-attempted. If there is a transaction advice on the release task, the error flow is called within the transaction.
      Parameters:
      delayedMessageErrorChannelName - the channel name.
      Since:
      5.0.8
    • setMaxAttempts

      public void setMaxAttempts(int maxAttempts)
      Set the maximum number of release attempts for when message release fails. Default 5.
      Parameters:
      maxAttempts - the max attempts.
      Since:
      5.0.8
    • setRetryDelay

      public void setRetryDelay(long retryDelay)
      Set an additional delay to apply when retrying after a release failure. Default 1000L.
      Parameters:
      retryDelay - the retry delay.
      Since:
      5.0.8
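      The interplay of the error flow, maxAttempts and retryDelay described in the setters above can be sketched with JDK classes only. Everything here is hypothetical: the class and method names, the use of a Consumer as the error flow, and in particular the outcome once attempts are exhausted (the sketch completes with -1), which the text above does not specify.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical retry loop; names and the exhausted-attempts outcome are assumptions. */
class ReleaseRetrySketch {

    private final ScheduledExecutorService scheduler;
    private final int maxAttempts;
    private final long retryDelayMillis;

    ReleaseRetrySketch(ScheduledExecutorService scheduler, int maxAttempts, long retryDelayMillis) {
        this.scheduler = scheduler;
        this.maxAttempts = maxAttempts;
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Completes with the number of attempts used, or with -1 once maxAttempts is exhausted. */
    CompletableFuture<Integer> release(Runnable send, Consumer<RuntimeException> errorFlow) {
        CompletableFuture<Integer> outcome = new CompletableFuture<>();
        tryRelease(send, errorFlow, 1, outcome);
        return outcome;
    }

    private void tryRelease(Runnable send, Consumer<RuntimeException> errorFlow,
            int attemptNumber, CompletableFuture<Integer> outcome) {
        try {
            try {
                send.run();
            } catch (RuntimeException sendFailure) {
                // If the error flow returns normally, the release counts as complete.
                errorFlow.accept(sendFailure);
            }
            outcome.complete(attemptNumber);
        } catch (RuntimeException errorFlowFailure) {
            if (attemptNumber >= maxAttempts) {
                outcome.complete(-1); // assumption: give up after the last attempt
            } else {
                // The error flow threw: re-attempt after the retry delay.
                scheduler.schedule(() -> tryRelease(send, errorFlow, attemptNumber + 1, outcome),
                        retryDelayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        Runnable flakySend = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("downstream unavailable");
            }
        };
        // The error flow rethrows, so every failure triggers another attempt.
        int used = new ReleaseRetrySketch(scheduler, 5, 10).release(flakySend, e -> { throw e; }).join();
        System.out.println("released after " + used + " attempts"); // released after 3 attempts
        scheduler.shutdown();
    }
}
```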
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessageHandlerSupport
    • getIntegrationPatternType

      public IntegrationPatternType getIntegrationPatternType()
      Description copied from interface: IntegrationPattern
      Return a pattern type this component implements.
      Specified by:
      getIntegrationPatternType in interface IntegrationPattern
      Overrides:
      getIntegrationPatternType in class AbstractReplyProducingMessageHandler
      Returns:
      the IntegrationPatternType this component implements.
    • doInit

      protected void doInit()
      Overrides:
      doInit in class AbstractReplyProducingMessageHandler
    • shouldCopyRequestHeaders

      protected boolean shouldCopyRequestHeaders()
      Description copied from class: AbstractMessageProducingHandler
      Subclasses may override this. True by default.
      Overrides:
      shouldCopyRequestHeaders in class AbstractMessageProducingHandler
      Returns:
      true if the request headers should be copied.
    • handleRequestMessage

      protected Object handleRequestMessage(Message<?> requestMessage)
      Check that 'requestMessage' wasn't delayed before (releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long) and DelayHandler.DelayedMessageWrapper). Then determine the 'delay' for 'requestMessage' (determineDelayForMessage(org.springframework.messaging.Message<?>)) and, if delay > 0, schedule the 'releaseMessage' task after 'delay'.
      Specified by:
      handleRequestMessage in class AbstractReplyProducingMessageHandler
      Parameters:
      requestMessage - the Message which may be delayed.
      Returns:
      null if 'requestMessage' is delayed, otherwise the 'payload' from 'requestMessage'.
      See Also:
      • releaseMessage(org.springframework.messaging.Message<?>)
    • rescheduleAt

      protected void rescheduleAt(Message<?> message, Date startTime)
    • getDelayedMessageCount

      public int getDelayedMessageCount()
      Specified by:
      getDelayedMessageCount in interface DelayHandlerManagement
    • reschedulePersistedMessages

      public void reschedulePersistedMessages()
      Used for reading persisted Messages in the 'messageStore' to reschedule them, e.g. upon application restart. The logic iterates over messageGroup.getMessages() and schedules a task for the 'delay' logic of each message. Scheduling tasks instead of processing the messages inline avoids overloading the invoking thread.
      Specified by:
      reschedulePersistedMessages in interface DelayHandlerManagement
    • onApplicationEvent

      public void onApplicationEvent(ContextRefreshedEvent event)
      Handle ContextRefreshedEvent to invoke reschedulePersistedMessages() as late as possible after application context startup. It also checks 'initialized' in order to ignore other ContextRefreshedEvents which may be published in 'parent-child' contexts, e.g. in Spring MVC applications.
      Specified by:
      onApplicationEvent in interface ApplicationListener<ContextRefreshedEvent>
      Parameters:
      event - the ContextRefreshedEvent which occurs after the application context is completely initialized.