Class DelayHandler
- All Implemented Interfaces:
EventListener, org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, Ordered, ExpressionCapable, Orderable, MessageProducer, DelayHandlerManagement, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
A MessageHandler that is capable of delaying the continuation of a Message flow based on the result of evaluating the delayExpression on an inbound Message, or on a default delay value configured on this handler. The continuation of the flow is delegated to a TaskScheduler, so the calling thread does not block. The advantage of this approach is that many delays, even very long ones, can be managed concurrently without producing a buildup of blocked Threads.
One thing to keep in mind, however, is that any active transactional context will not propagate from the original sender to the eventual recipient. This is a side effect of passing the Message to the output channel after the delay with a different Thread in control.
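The hand-off to a scheduler can be illustrated with plain JDK classes. The sketch below is a model of the idea only, not the handler's implementation: `NonBlockingDelaySketch` and its `delay` method are hypothetical names, and a `ScheduledExecutorService` stands in for the `TaskScheduler`. It shows that the caller returns immediately and that the release happens on a different thread, which is also why a thread-bound transaction would not carry over.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrative model only; the names here are hypothetical and not part of Spring Integration. */
final class NonBlockingDelaySketch {

    /**
     * Hands the payload to the scheduler and returns immediately.
     * The future completes with "payload@threadName" once the delay has elapsed.
     */
    static CompletableFuture<String> delay(ScheduledExecutorService scheduler, String payload, long delayMillis) {
        CompletableFuture<String> released = new CompletableFuture<>();
        scheduler.schedule(() -> {
            // Runs on a scheduler thread, not on the thread that called delay().
            released.complete(payload + "@" + Thread.currentThread().getName());
        }, delayMillis, TimeUnit.MILLISECONDS);
        return released;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> released = delay(scheduler, "order-42", 200);
            // The caller is free to continue while the delay is pending.
            System.out.println("caller thread: " + Thread.currentThread().getName());
            System.out.println("released: " + released.get(5, TimeUnit.SECONDS));
        } finally {
            scheduler.shutdown();
        }
    }
}
```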
When this handler's delayExpression property is configured, its evaluation result takes precedence over the handler's defaultDelay value. The evaluation result may be a long, a String that can be parsed as a long, or a Date. If it is a long, it is interpreted as the length of time to delay in milliseconds, counting from the current time (e.g. a value of 5000 indicates that the Message can be released as soon as five seconds from the current time). If the value is a Date, the Message is delayed at least until that Date occurs (i.e. the delay in that case is equivalent to headerDate.getTime() - new Date().getTime()).
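The three accepted result types can be modelled in a few lines of plain Java. This is a sketch of the rules described above, not the handler's source: `DelayResolutionSketch` and `resolveDelayMillis` are hypothetical names, and falling back to a default for an unparseable String is an assumption made for the illustration.

```java
import java.util.Date;

/** Illustrative model only; not the DelayHandler source. */
final class DelayResolutionSketch {

    /**
     * Converts an expression result into a delay in milliseconds.
     * Returns fallbackMillis when the result is null or of an unsupported type.
     */
    static long resolveDelayMillis(Object result, long nowMillis, long fallbackMillis) {
        if (result instanceof Date) {
            // A Date means "delay until then": target time minus current time.
            return ((Date) result).getTime() - nowMillis;
        }
        if (result instanceof Number) {
            // A numeric result is already a duration in milliseconds.
            return ((Number) result).longValue();
        }
        if (result instanceof String) {
            try {
                return Long.parseLong(((String) result).trim());
            } catch (NumberFormatException ex) {
                // Assumption for this sketch: an unparseable String uses the fallback.
                return fallbackMillis;
            }
        }
        return fallbackMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(resolveDelayMillis(5000L, now, 0L));
        System.out.println(resolveDelayMillis("2500", now, 0L));
        System.out.println(resolveDelayMillis(new Date(now + 60_000L), now, 0L));
        System.out.println(resolveDelayMillis(null, now, 100L));
    }
}
```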
Delayed messages are stored in the MessageGroupStore as a dedicated group. If an external persistent store is provided, those delayed messages will be rescheduled after application startup. The messageGroupId is a required option and must be unique for each delayer configuration to avoid work-stealing from the store and unexpected releases. Different instances of the same delayer can point to the same message group in the store. The messageGroupId cannot rely on a bean name, which might be generated: after an application restart the bean may get a different generated name, and its delayed messages might never be rescheduled because their group is no longer managed by the application.
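A configuration along the following lines is one way to give the delayer an explicit, stable group id. It is a configuration fragment rather than a complete application, and it needs Spring Integration on the classpath. The class name `DelayerConfig`, the bean name, the group id "orders.delayer" and the channel names are hypothetical, and a persistent MessageGroupStore bean is assumed to be defined elsewhere.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.DelayHandler;
import org.springframework.integration.store.MessageGroupStore;

@Configuration
@EnableIntegration
public class DelayerConfig {

    // Assumption: a persistent MessageGroupStore bean is defined elsewhere in the context.
    @Bean
    @ServiceActivator(inputChannel = "delayInput") // hypothetical channel name
    public DelayHandler orderDelayer(MessageGroupStore messageStore) {
        // Explicit group id, so it does not depend on a generated bean name.
        DelayHandler handler = new DelayHandler("orders.delayer");
        handler.setMessageStore(messageStore);
        // Fallback delay in milliseconds when the expression yields null.
        handler.setDefaultDelay(5_000L);
        // A non-null expression result takes precedence over the default delay.
        handler.setDelayExpressionString("headers['delay']");
        handler.setOutputChannelName("delayOutput"); // hypothetical channel name
        return handler;
    }
}
```

Because the group id is a fixed string, a restarted application looks up the same group in the store and can reschedule whatever was still pending.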
- Since:
- 1.0.3
- Author:
- Mark Fisher, Artem Bilan, Gary Russell, Christian Tzolov
-
Nested Class Summary
Nested classes/interfaces inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
AbstractReplyProducingMessageHandler.RequestHandler
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Modifier and Type, Field, Description
static final int DEFAULT_MAX_ATTEMPTS
static final long DEFAULT_RETRY_DELAY
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
Constructor: Description
DelayHandler(): Construct an instance with default options.
DelayHandler(String messageGroupId): Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for the MessageGroup to store delayed Messages in the MessageGroupStore.
DelayHandler(String messageGroupId, TaskScheduler taskScheduler): Create a DelayHandler with the given 'messageGroupId' and TaskScheduler.
-
Method Summary
Modifier and Type, Method: Description
protected void doInit()
String getComponentType(): Subclasses may implement this method to provide component type information.
int getDelayedMessageCount()
IntegrationPatternType getIntegrationPatternType(): Return a pattern type this component implements.
protected Object handleRequestMessage(Message<?> requestMessage): Check if 'requestMessage' wasn't delayed before (releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long) and DelayHandler.DelayedMessageWrapper).
void onApplicationEvent(ContextRefreshedEvent event): Handle ContextRefreshedEvent to invoke reschedulePersistedMessages() as late as possible after application context startup.
protected void rescheduleAt(Message<?> message, Date startTime)
void reschedulePersistedMessages(): Used for reading persisted Messages in the 'messageStore' to reschedule them e.g. upon application restart.
void setDefaultDelay(long defaultDelay): Set the default delay in milliseconds.
void setDelayedAdviceChain(List<Advice> delayedAdviceChain): Specify the List<Advice> to advise the DelayHandler.ReleaseMessageHandler proxy.
void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel): Set a message channel to which an ErrorMessage will be sent if sending the released message fails.
void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName): Set a message channel name to which an ErrorMessage will be sent if sending the released message fails.
void setDelayExpression(Expression delayExpression): Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until.
void setDelayExpressionString(String delayExpression): Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until.
void setIgnoreExpressionFailures(boolean ignoreExpressionFailures): Specify whether Exceptions thrown by delayExpression evaluation should be ignored (only logged).
void setMaxAttempts(int maxAttempts): Set the maximum number of release attempts for when message release fails.
void setMessageGroupId(String messageGroupId): Set a group id to manage delayed messages by this handler.
void setMessageStore(MessageGroupStore messageStore): Specify the MessageGroupStore that should be used to store Messages while awaiting the delay.
void setRetryDelay(long retryDelay): Set an additional delay to apply when retrying after a release failure.
protected boolean shouldCopyRequestHeaders(): Subclasses may override this.
Methods inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
doInvokeAdvisedRequestHandler, getBeanClassLoader, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe, setObservationConvention
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
Methods inherited from interface reactor.core.CoreSubscriber
currentContext
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Field Details
-
DEFAULT_MAX_ATTEMPTS
public static final int DEFAULT_MAX_ATTEMPTS
- See Also:
-
DEFAULT_RETRY_DELAY
public static final long DEFAULT_RETRY_DELAY
- See Also:
-
-
Constructor Details
-
DelayHandler
public DelayHandler()
Construct an instance with default options. The messageGroupId must then be provided via the setter.
- Since:
- 6.2
-
DelayHandler
public DelayHandler(String messageGroupId)
Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for the MessageGroup to store delayed Messages in the MessageGroupStore. The sending of Messages after the delay will be handled by the default ThreadPoolTaskScheduler registered in the ApplicationContext.
- Parameters:
messageGroupId - The message group identifier.
- See Also:
-
DelayHandler
public DelayHandler(String messageGroupId, TaskScheduler taskScheduler)
Create a DelayHandler with the given 'messageGroupId' and TaskScheduler. The sending of Messages after the delay will be handled by the provided TaskScheduler.
- Parameters:
messageGroupId - The message group identifier.
taskScheduler - A task scheduler.
-
-
Method Details
-
setMessageGroupId
public void setMessageGroupId(String messageGroupId)
Set a group id to manage delayed messages by this handler. Required.
- Parameters:
messageGroupId - the group id for delayed messages.
- Since:
- 6.2
-
setDefaultDelay
public void setDefaultDelay(long defaultDelay)
Set the default delay in milliseconds. If no delayExpression property has been provided, the default delay will be applied to all Messages. If a delay should only be applied to Messages for which the delayExpression yields a result, set this value to 0.
- Parameters:
defaultDelay - The default delay in milliseconds.
-
setDelayExpression
public void setDelayExpression(Expression delayExpression)
Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until. If this property is set, the result of the expression evaluation (if not null) will take precedence over this handler's default delay.
- Parameters:
delayExpression - The delay expression.
-
setDelayExpressionString
public void setDelayExpressionString(String delayExpression)
Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until. If this property is set, the result of the expression evaluation (if not null) will take precedence over this handler's default delay.
- Parameters:
delayExpression - The delay expression.
- Since:
- 5.0
-
setIgnoreExpressionFailures
public void setIgnoreExpressionFailures(boolean ignoreExpressionFailures)
Specify whether Exceptions thrown by delayExpression evaluation should be ignored (only logged). In this case the delayer will fall back to the defaultDelay. If this property is specified as false, any delayExpression evaluation Exception will be thrown to the caller without falling back to the defaultDelay. Default is true.
- Parameters:
ignoreExpressionFailures - true if expression evaluation failures should be ignored.
- See Also:
- determineDelayForMessage(org.springframework.messaging.Message<?>)
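The two settings of this flag can be modelled in plain Java. The sketch below illustrates the documented behaviour under stated assumptions and is not the handler's source: `ExpressionFailureSketch` and its `determineDelay` method are hypothetical, and a `Function` stands in for the evaluated delayExpression.

```java
import java.util.function.Function;

/** Illustrative model only; not the DelayHandler source. */
final class ExpressionFailureSketch {

    static long determineDelay(Function<String, Long> delayExpression, String payload,
            long defaultDelay, boolean ignoreExpressionFailures) {
        try {
            Long result = delayExpression.apply(payload);
            // A null result does not override the default delay.
            return result != null ? result : defaultDelay;
        } catch (RuntimeException ex) {
            if (ignoreExpressionFailures) {
                // Failure is only logged; the default delay is used instead.
                System.err.println("Delay expression failed, using default delay: " + ex);
                return defaultDelay;
            }
            // With the flag set to false the failure reaches the caller.
            throw ex;
        }
    }

    public static void main(String[] args) {
        // Stand-in expression: parse the payload as a long, failing on bad input.
        Function<String, Long> parsePayload = Long::parseLong;
        System.out.println(determineDelay(parsePayload, "250", 1000L, true));
        System.out.println(determineDelay(parsePayload, "oops", 1000L, true));
    }
}
```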
-
setMessageStore
public void setMessageStore(MessageGroupStore messageStore)
Specify the MessageGroupStore that should be used to store Messages while awaiting the delay.
- Parameters:
messageStore - The message store.
-
setDelayedAdviceChain
public void setDelayedAdviceChain(List<Advice> delayedAdviceChain)
Specify the List<Advice> to advise the DelayHandler.ReleaseMessageHandler proxy. Usually used to add transactions to delayed messages retrieved from a transactional message store.
- Parameters:
delayedAdviceChain - The advice chain.
- See Also:
- createReleaseMessageTask()
-
setDelayedMessageErrorChannel
public void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel)
Set a message channel to which an ErrorMessage will be sent if sending the released message fails. If the error flow returns normally, the release is complete. If the error flow throws an exception, the release will be re-attempted. If there is a transaction advice on the release task, the error flow is called within the transaction.
- Parameters:
delayedMessageErrorChannel - the channel.
- Since:
- 5.0.8
- See Also:
-
setDelayedMessageErrorChannelName
public void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName)
Set a message channel name to which an ErrorMessage will be sent if sending the released message fails. If the error flow returns normally, the release is complete. If the error flow throws an exception, the release will be re-attempted. If there is a transaction advice on the release task, the error flow is called within the transaction.
- Parameters:
delayedMessageErrorChannelName - the channel name.
- Since:
- 5.0.8
- See Also:
-
setMaxAttempts
public void setMaxAttempts(int maxAttempts)
Set the maximum number of release attempts for when message release fails. Default 5.
- Parameters:
maxAttempts - the max attempts.
- Since:
- 5.0.8
- See Also:
-
setRetryDelay
public void setRetryDelay(long retryDelay)
Set an additional delay to apply when retrying after a release failure. Default 1000L.
- Parameters:
retryDelay - the retry delay.
- Since:
- 5.0.8
- See Also:
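To see how maxAttempts and retryDelay interact, the following plain-Java model lists when release attempts would be made. It is a sketch under assumptions and not the handler's implementation: `ReleaseRetrySketch` and `attemptOffsets` are hypothetical names, retries are assumed to be spaced by exactly one retryDelay, and the first release is assumed to count as attempt 1. The two constants mirror the defaults stated above (5 and 1000L).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Illustrative model only; not the DelayHandler source. */
final class ReleaseRetrySketch {

    static final int DEFAULT_MAX_ATTEMPTS = 5;
    static final long DEFAULT_RETRY_DELAY = 1000L;

    /**
     * Returns the offsets, in milliseconds after the first release attempt,
     * at which attempts are made. attemptSucceeds receives the 1-based attempt number.
     */
    static List<Long> attemptOffsets(IntPredicate attemptSucceeds, int maxAttempts, long retryDelay) {
        List<Long> offsets = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Assumption: each retry is scheduled one retryDelay after the previous attempt.
            offsets.add((attempt - 1) * retryDelay);
            if (attemptSucceeds.test(attempt)) {
                break; // release completed, no further attempts
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Release succeeds on the third attempt.
        System.out.println(attemptOffsets(n -> n == 3, DEFAULT_MAX_ATTEMPTS, DEFAULT_RETRY_DELAY));
        // Release never succeeds, so attempts stop at maxAttempts.
        System.out.println(attemptOffsets(n -> false, DEFAULT_MAX_ATTEMPTS, DEFAULT_RETRY_DELAY));
    }
}
```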
-
getComponentType
public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class MessageHandlerSupport
-
getIntegrationPatternType
public IntegrationPatternType getIntegrationPatternType()
Description copied from interface: IntegrationPattern
Return a pattern type this component implements.
- Specified by:
getIntegrationPatternType in interface IntegrationPattern
- Overrides:
getIntegrationPatternType in class AbstractReplyProducingMessageHandler
- Returns:
- the IntegrationPatternType this component implements.
-
doInit
protected void doInit()
- Overrides:
doInit in class AbstractReplyProducingMessageHandler
-
shouldCopyRequestHeaders
protected boolean shouldCopyRequestHeaders()
Description copied from class: AbstractMessageProducingHandler
Subclasses may override this. True by default.
- Overrides:
shouldCopyRequestHeaders in class AbstractMessageProducingHandler
- Returns:
- true if the request headers should be copied.
-
handleRequestMessage
protected Object handleRequestMessage(Message<?> requestMessage)
Check if 'requestMessage' wasn't delayed before (releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long) and DelayHandler.DelayedMessageWrapper). Then determine the 'delay' for 'requestMessage' (determineDelayForMessage(org.springframework.messaging.Message<?>)) and, if delay > 0, schedule the 'releaseMessage' task after 'delay'.
- Specified by:
handleRequestMessage in class AbstractReplyProducingMessageHandler
- Parameters:
requestMessage - the Message which may be delayed.
- Returns:
- null if 'requestMessage' is delayed, otherwise the 'payload' from 'requestMessage'.
- See Also:
- releaseMessage(org.springframework.messaging.Message<?>)
-
rescheduleAt
-
getDelayedMessageCount
public int getDelayedMessageCount()
- Specified by:
getDelayedMessageCount in interface DelayHandlerManagement
-
reschedulePersistedMessages
public void reschedulePersistedMessages()
Used for reading persisted Messages in the 'messageStore' to reschedule them, e.g. upon application restart. The logic iterates over messageGroup.getMessages() and schedules a task for the 'delay' logic. This is done to avoid overloading the invoking thread.
- Specified by:
reschedulePersistedMessages in interface DelayHandlerManagement
-
onApplicationEvent
public void onApplicationEvent(ContextRefreshedEvent event)
Handle ContextRefreshedEvent to invoke reschedulePersistedMessages() as late as possible after application context startup. It also checks the initialized flag to ignore other ContextRefreshedEvents which may be published in 'parent-child' contexts, e.g. in Spring MVC applications.
- Specified by:
onApplicationEvent in interface ApplicationListener<ContextRefreshedEvent>
- Parameters:
event - the ContextRefreshedEvent which occurs after the application context is completely initialized.
- See Also:
-