@ManagedResource @IntegrationManagedResource public class DelayHandler extends AbstractReplyProducingMessageHandler implements DelayHandlerManagement, ApplicationListener<ContextRefreshedEvent>
MessageHandler
that is capable of delaying the continuation of a Message flow
based on the result of evaluation delayExpression
on an inbound Message
or a default delay value configured on this handler. Note that the continuation of the
flow is delegated to a TaskScheduler
, and therefore, the calling thread does
not block. The advantage of this approach is that many delays can be managed
concurrently, even very long delays, without producing a buildup of blocked Threads.
One thing to keep in mind, however, is that any active transactional context will not propagate from the original sender to the eventual recipient. This is a side-effect of passing the Message to the output channel after the delay with a different Thread in control.
When this handler's delayExpression
property is configured, that evaluation
result value will take precedence over the handler's defaultDelay
value. The
actual evaluation result value may be a long, a String that can be parsed as a long, or
a Date. If it is a long, it will be interpreted as the length of time to delay in
milliseconds counting from the current time (e.g. a value of 5000 indicates that the
Message can be released as soon as five seconds from the current time). If the value is
a Date, it will be delayed at least until that Date occurs (i.e. the delay in that case
is equivalent to headerDate.getTime() - new Date().getTime()
).
Modifier and Type | Class and Description |
---|---|
static class |
DelayHandler.DelayedMessageWrapper |
AbstractReplyProducingMessageHandler.RequestHandler
IntegrationManagement.ManagementOverrides
Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_MAX_ATTEMPTS |
static long |
DEFAULT_RETRY_DELAY |
messagingTemplate
EXPRESSION_PARSER, logger
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor and Description |
---|
DelayHandler(String messageGroupId)
Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for
MessageGroup to store delayed Messages in the MessageGroupStore . |
DelayHandler(String messageGroupId,
TaskScheduler taskScheduler)
Create a DelayHandler with the given default delay.
|
Modifier and Type | Method and Description |
---|---|
protected void |
doInit() |
String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
int |
getDelayedMessageCount() |
IntegrationPatternType |
getIntegrationPatternType()
Return a pattern type this component implements.
|
protected Object |
handleRequestMessage(Message<?> requestMessage)
Checks if 'requestMessage' wasn't delayed before (
releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long)
and DelayHandler.DelayedMessageWrapper ). |
void |
onApplicationEvent(ContextRefreshedEvent event)
Handles
ContextRefreshedEvent to invoke
reschedulePersistedMessages() as late as possible after application context
startup. |
protected void |
rescheduleAt(Message<?> message,
Date startTime) |
void |
reschedulePersistedMessages()
Used for reading persisted Messages in the 'messageStore' to reschedule them e.g.
|
void |
setDefaultDelay(long defaultDelay)
Set the default delay in milliseconds.
|
void |
setDelayedAdviceChain(List<Advice> delayedAdviceChain)
Specify the
List<Advice> to advise
DelayHandler.ReleaseMessageHandler proxy. |
void |
setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel)
Set a message channel to which an
ErrorMessage will be sent if sending the
released message fails. |
void |
setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName)
Set a message channel name to which an
ErrorMessage will be sent if sending
the released message fails. |
void |
setDelayExpression(Expression delayExpression)
Specify the
Expression that should be checked for a delay period (in
milliseconds) or a Date to delay until. |
void |
setDelayExpressionString(String delayExpression)
Specify the
Expression that should be checked for a delay period (in
milliseconds) or a Date to delay until. |
void |
setIgnoreExpressionFailures(boolean ignoreExpressionFailures)
Specify whether
Exceptions thrown by delayExpression evaluation
should be ignored (only logged). |
void |
setMaxAttempts(int maxAttempts)
Set the maximum number of release attempts for when message release fails.
|
void |
setMessageStore(MessageGroupStore messageStore)
Specify the
MessageGroupStore that should be used to store Messages while
awaiting the delay. |
void |
setRetryDelay(long retryDelay)
Set an additional delay to apply when retrying after a release failure.
|
protected boolean |
shouldCopyRequestHeaders()
Subclasses may override this.
|
doInvokeAdvisedRequestHandler, getBeanClassLoader, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldSplitOutput, updateNotPropagatedHeaders
handleMessage, onComplete, onError, onNext, onSubscribe
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
forPayload
getThisAs
getBeanName, getComponentName
public static final int DEFAULT_MAX_ATTEMPTS
public static final long DEFAULT_RETRY_DELAY
public DelayHandler(String messageGroupId)
MessageGroup
to store delayed Messages in the MessageGroupStore
.
The sending of Messages after the delay will be handled by registered in the
ApplicationContext default
ThreadPoolTaskScheduler
.messageGroupId
- The message group identifier.IntegrationObjectSupport.getTaskScheduler()
public DelayHandler(String messageGroupId, TaskScheduler taskScheduler)
TaskScheduler
.messageGroupId
- The message group identifier.taskScheduler
- A task scheduler.public void setDefaultDelay(long defaultDelay)
delayExpression
property has
been provided, the default delay will be applied to all Messages. If a delay should
only be applied to Messages with evaluation result from
delayExpression
, then set this value to 0.defaultDelay
- The default delay in milliseconds.public void setDelayExpression(Expression delayExpression)
Expression
that should be checked for a delay period (in
milliseconds) or a Date to delay until. If this property is set, the result of the
expression evaluation (if not null) will take precedence over this handler's
default delay.delayExpression
- The delay expression.public void setDelayExpressionString(String delayExpression)
Expression
that should be checked for a delay period (in
milliseconds) or a Date to delay until. If this property is set, the result of the
expression evaluation (if not null) will take precedence over this handler's
default delay.delayExpression
- The delay expression.public void setIgnoreExpressionFailures(boolean ignoreExpressionFailures)
Exceptions
thrown by delayExpression
evaluation
should be ignored (only logged). In this case case the delayer will fall back to
the to the defaultDelay
. If this property is specified as false
,
any delayExpression
evaluation Exception
will be thrown to the
caller without falling back to the to the defaultDelay
. Default is
true
.ignoreExpressionFailures
- true if expression evaluation failures should be
ignored.determineDelayForMessage(org.springframework.messaging.Message<?>)
public void setMessageStore(MessageGroupStore messageStore)
MessageGroupStore
that should be used to store Messages while
awaiting the delay.messageStore
- The message store.public void setDelayedAdviceChain(List<Advice> delayedAdviceChain)
List<Advice>
to advise
DelayHandler.ReleaseMessageHandler
proxy. Usually used to add transactions
to delayed messages retrieved from a transactional message store.delayedAdviceChain
- The advice chain.createReleaseMessageTask()
public void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel)
ErrorMessage
will be sent if sending the
released message fails. If the error flow returns normally, the release is
complete. If the error flow throws an exception, the release will be re-attempted.
If there is a transaction advice on the release task, the error flow is called
within the transaction.delayedMessageErrorChannel
- the channel.setMaxAttempts(int)
,
setRetryDelay(long)
public void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName)
ErrorMessage
will be sent if sending
the released message fails. If the error flow returns normally, the release is
complete. If the error flow throws an exception, the release will be re-attempted.
If there is a transaction advice on the release task, the error flow is called
within the transaction.delayedMessageErrorChannelName
- the channel name.setMaxAttempts(int)
,
setRetryDelay(long)
public void setMaxAttempts(int maxAttempts)
maxAttempts
- the max attempts.setRetryDelay(long)
public void setRetryDelay(long retryDelay)
retryDelay
- the retry delay.setMaxAttempts(int)
public String getComponentType()
IntegrationObjectSupport
getComponentType
in interface NamedComponent
getComponentType
in class MessageHandlerSupport
public IntegrationPatternType getIntegrationPatternType()
IntegrationPattern
getIntegrationPatternType
in interface IntegrationPattern
getIntegrationPatternType
in class AbstractReplyProducingMessageHandler
IntegrationPatternType
this component implements.protected void doInit()
doInit
in class AbstractReplyProducingMessageHandler
protected boolean shouldCopyRequestHeaders()
AbstractMessageProducingHandler
shouldCopyRequestHeaders
in class AbstractMessageProducingHandler
protected Object handleRequestMessage(Message<?> requestMessage)
releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long)
and DelayHandler.DelayedMessageWrapper
). Than determine 'delay' for
'requestMessage' (determineDelayForMessage(org.springframework.messaging.Message<?>)
) and if delay > 0
schedules 'releaseMessage' task after 'delay'.handleRequestMessage
in class AbstractReplyProducingMessageHandler
requestMessage
- - the Message which may be delayed.null
if 'requestMessage' is delayed, otherwise - 'payload' from
'requestMessage'.releaseMessage(org.springframework.messaging.Message<?>)
public int getDelayedMessageCount()
getDelayedMessageCount
in interface DelayHandlerManagement
public void reschedulePersistedMessages()
messageGroup.getMessages()
and schedules task for 'delay' logic. This
behavior is dictated by the avoidance of invocation thread overload.reschedulePersistedMessages
in interface DelayHandlerManagement
public void onApplicationEvent(ContextRefreshedEvent event)
ContextRefreshedEvent
to invoke
reschedulePersistedMessages()
as late as possible after application context
startup. Also it checks initialized
to ignore other
ContextRefreshedEvent
s which may be published in the 'parent-child'
contexts, e.g. in the Spring-MVC applications.onApplicationEvent
in interface ApplicationListener<ContextRefreshedEvent>
event
- - ContextRefreshedEvent
which occurs after Application context
is completely initialized.reschedulePersistedMessages()