Delayer
A delayer is a simple endpoint that lets a message flow be delayed by a certain interval.
When a message is delayed, the original sender does not block.
Instead, the delayed messages are scheduled with an instance of org.springframework.scheduling.TaskScheduler
to be sent to the output channel after the delay has passed.
This approach is scalable even for rather long delays, since it does not result in a large number of blocked sender threads.
Instead, in the typical case, a thread pool handles the actual release of the messages.
This section contains several examples of configuring a delayer.
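The non-blocking behavior described above can be illustrated with plain JDK classes. The following is a minimal sketch, not the DelayHandler implementation: SimpleDelayer, its send and shutdownAndAwait methods, and the Consumer standing in for the output channel are hypothetical names, and a ScheduledExecutorService plays the role of the TaskScheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a delayer; this is NOT Spring Integration's DelayHandler.
class SimpleDelayer {

    // A small thread pool releases the messages, so senders never block.
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    // Stand-in for the output channel.
    private final Consumer<String> outputChannel;

    SimpleDelayer(Consumer<String> outputChannel) {
        this.outputChannel = outputChannel;
    }

    // Returns immediately; a positive delay is released later on a pool thread.
    void send(String message, long delayMillis) {
        if (delayMillis <= 0) {
            // No delay: deliver on the caller's thread.
            outputChannel.accept(message);
            return;
        }
        scheduler.schedule(() -> outputChannel.accept(message), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Stops accepting new work and waits for already scheduled releases to finish.
    boolean shutdownAndAwait(long timeoutMillis) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling send("delayed", 300) returns at once, and the message reaches the consumer roughly 300 milliseconds later on a scheduler thread. However many messages are waiting, the number of threads stays fixed at the pool size.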
Configuring a Delayer
The <delayer> element is used to delay the message flow between two message channels.
As with the other endpoints, you can provide the 'input-channel' and 'output-channel' attributes, but the delayer also has 'default-delay' and 'expression' attributes (and the 'expression' element) that determine the number of milliseconds by which each message should be delayed.
The following example delays all messages by three seconds:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
If you need to determine the delay for each message, you can also provide a SpEL expression by using the 'expression' attribute, as the following examples (Java DSL, Kotlin DSL, Java configuration, and XML, in that order) show:
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from("input")
            .delay("delayer.messageGroupId", d -> d
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}

@Bean
fun flow() =
    integrationFlow("input") {
        delay("delayer.messageGroupId") {
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }

@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}

<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>
In the preceding example, the three-second delay applies only when the expression evaluates to null for a given inbound message.
If you want to apply a delay only to messages that have a valid result of the expression evaluation, you can use a 'default-delay' of 0 (the default).
For any message that has a delay of 0 (or less), the message is sent immediately, on the calling thread.
The XML parser uses a message group ID of <beanName>.messageGroupId.
The delay handler supports expression evaluation results that represent an interval in milliseconds (any Object whose toString() method produces a value that can be parsed into a Long) as well as java.util.Date instances representing an absolute time.
In the first case, the milliseconds are counted from the current time (for example, a value of 5000 would delay the message for at least five seconds from the time it is received by the delayer).
With a Date instance, the message is not released until the time represented by that Date object.
A value that equates to a non-positive delay or a Date in the past results in no delay.
Instead, it is sent directly to the output channel on the original sender’s thread.
If the expression evaluation result is not a Date and cannot be parsed as a Long, the default delay (if any; the default is 0) is applied.
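The resolution rules above can be summarized in a few lines of plain Java. This is a sketch of the documented behavior under stated assumptions, not the DelayHandler source: DelayResolver and resolveDelayMillis are hypothetical names, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.util.Date;

// Hypothetical helper that models the documented rules; not part of Spring Integration.
class DelayResolver {

    // Returns the number of milliseconds to wait before release; 0 means "send now".
    static long resolveDelayMillis(Object expressionResult, long defaultDelayMillis, long nowMillis) {
        long delay = defaultDelayMillis;
        if (expressionResult instanceof Date) {
            // Absolute time: wait until the instant the Date represents.
            delay = ((Date) expressionResult).getTime() - nowMillis;
        } else if (expressionResult != null) {
            try {
                // Interval: anything whose toString() parses as a long.
                delay = Long.parseLong(expressionResult.toString());
            } catch (NumberFormatException ex) {
                // Not a Date and not numeric: keep the default delay.
            }
        }
        // A non-positive delay or a Date in the past results in no delay.
        return Math.max(0L, delay);
    }
}
```

With a default delay of 3000 and a current time of 1000, the string "5000" resolves to 5000, a Date of 11000 resolves to 10000, and an unparsable value such as "soon" resolves to the default of 3000.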
The expression evaluation may throw an evaluation exception for various reasons, including an invalid expression or other conditions.
By default, such exceptions are ignored (though logged at the DEBUG level) and the delayer falls back to the default delay (if any).
You can modify this behavior by setting the ignore-expression-failures attribute.
By default, this attribute is set to true and the delayer behavior is as described earlier.
However, if you do not want to ignore expression evaluation exceptions and instead want them thrown to the delayer’s caller, set the ignore-expression-failures attribute to false.
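The effect of the ignore-expression-failures flag can be modeled as follows. This is an illustrative sketch only: ExpressionFailurePolicy and evaluate are hypothetical names, and a java.util.function.Function stands in for the SpEL expression.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the flag; the real evaluation is done by SpEL inside the delayer.
class ExpressionFailurePolicy {

    // Returns the expression result, or null when a failure is ignored.
    // A null result means the caller applies the default delay.
    static Object evaluate(Function<Map<String, Object>, Object> expression,
                           Map<String, Object> headers,
                           boolean ignoreExpressionFailures) {
        try {
            return expression.apply(headers);
        } catch (RuntimeException ex) {
            if (ignoreExpressionFailures) {
                // The delayer logs the failure at DEBUG level at this point.
                return null;
            }
            // Flag set to false: propagate the failure to the caller.
            throw ex;
        }
    }
}
```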
In the preceding example, the delay expression is specified as headers['delay'].
With this indexer syntax, a missing header evaluates to null, whereas the dot property accessor syntax (headers.delay) throws an evaluation exception.
Consequently, if there is a possibility of the header being omitted and you want to fall back to the default delay, it is generally more efficient (and recommended) to use the indexer syntax instead of the dot property accessor syntax, because detecting the null is faster than catching an exception.
The delayer delegates to an instance of Spring’s TaskScheduler abstraction.
The default scheduler used by the delayer is the ThreadPoolTaskScheduler instance provided by Spring Integration on startup.
See Configuring the Task Scheduler.
If you want to delegate to a different scheduler, you can provide a reference through the delayer element’s 'scheduler' attribute, as the following example shows:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
If you configure an external ThreadPoolTaskScheduler, you can set its waitForTasksToCompleteOnShutdown property to true.
It allows successful completion of 'delay' tasks that are already in the execution state (releasing the message) when the application is shut down.
Before Spring Integration 2.2, this property was available on the <delayer> element, because DelayHandler could create its own scheduler in the background.
Since 2.2, the delayer requires an external scheduler instance and waitForTasksToCompleteOnShutdown was removed from the element.
You should use the scheduler’s own configuration.
ThreadPoolTaskScheduler has an errorHandler property, which can be injected with an implementation of org.springframework.util.ErrorHandler.
This handler allows processing an Exception from the thread of the scheduled task sending the delayed message.
By default, it uses an org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler, and you can see a stack trace in the logs.
You might want to consider using an org.springframework.integration.channel.MessagePublishingErrorHandler, which sends an ErrorMessage to an error channel, either the one from the failed message’s header or the default error channel.
This error handling is performed after a transaction rolls back (if present).
See Release Failures.
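The two scheduler notes above could be combined in Java configuration roughly as follows. This is a configuration sketch under stated assumptions rather than a recommended setup: the class name DelayerSchedulerConfig and the bean name delayerErrorHandler are hypothetical, the pool size of 3 mirrors the XML example, and "errorChannel" is assumed to be the default error channel in the application context.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class DelayerSchedulerConfig {

    // Declared as a bean so that the container can supply what the handler
    // needs to resolve channel names.
    @Bean
    public MessagePublishingErrorHandler delayerErrorHandler() {
        MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler();
        // Assumption: failures without an error channel header go to "errorChannel".
        errorHandler.setDefaultErrorChannelName("errorChannel");
        return errorHandler;
    }

    // Same bean name as the 'scheduler' reference in the XML example.
    @Bean
    public ThreadPoolTaskScheduler exampleTaskScheduler(MessagePublishingErrorHandler delayerErrorHandler) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(3);
        // Let release tasks that are already running finish on shutdown.
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        // Publish release failures as ErrorMessage instances instead of only logging them.
        scheduler.setErrorHandler(delayerErrorHandler);
        return scheduler;
    }
}
```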
Delayer and a Message Store
The DelayHandler persists delayed messages into the message group in the provided MessageStore.
(The 'groupId' is based on the required 'id' attribute of the <delayer> element.)
A delayed message is removed from the MessageStore by the scheduled task immediately before the DelayHandler sends the message to the output-channel.
If the provided MessageStore is persistent (such as JdbcMessageStore), messages are not lost when the application shuts down.
After application startup, the DelayHandler reads messages from its message group in the MessageStore and reschedules them with a delay based on the original arrival time of the message (if the delay is numeric).
For messages where the delay header was a Date, that Date is used when rescheduling.
If a delayed message has remained in the MessageStore longer than its 'delay', it is sent immediately after startup.
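The rescheduling arithmetic described above can be worked through with a short example. This sketch assumes that the arrival time and the requested delay are both known in milliseconds; RescheduleCalculator and its methods are hypothetical names and do not exist in Spring Integration.

```java
import java.util.Date;

// Hypothetical helper showing how much of a delay is left after a restart.
class RescheduleCalculator {

    // Numeric delay: the delay is counted from the original arrival time.
    static long remainingForNumericDelay(long arrivalMillis, long delayMillis, long nowMillis) {
        return Math.max(0L, arrivalMillis + delayMillis - nowMillis);
    }

    // Date delay: the Date itself is the release time.
    static long remainingForDate(Date releaseDate, long nowMillis) {
        return Math.max(0L, releaseDate.getTime() - nowMillis);
    }
}
```

For a message that arrived at time 1000 with a delay of 10000, a restart at time 4000 leaves 7000 milliseconds to wait. A restart at time 20000 leaves 0, so the message is sent immediately.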
The <delayer> can be enriched with either of two mutually exclusive elements: <transactional> and <advice-chain>.
The List of these AOP advices is applied to the proxied internal DelayHandler.ReleaseMessageHandler, which has the responsibility to release the message, after the delay, on a Thread of the scheduled task.
It might be used, for example, when the downstream message flow throws an exception and the transaction of the ReleaseMessageHandler is rolled back.
In this case, the delayed message remains in the persistent MessageStore.
You can use any custom org.aopalliance.aop.Advice implementation within the <advice-chain>.
The <transactional> element defines a simple advice chain that has only the transactional advice.
The following example shows an advice-chain within a <delayer>:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
The DelayHandler can be exported as a JMX MBean with managed operations (getDelayedMessageCount and reschedulePersistedMessages), which allows the rescheduling of delayed persisted messages at runtime (for example, if the TaskScheduler has previously been stopped).
These operations can be invoked through a Control Bus command, as the following example shows:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
For more information regarding the message store, JMX, and the control bus, see System Management.
Starting with version 5.3.7, if a transaction is active when a message is stored into a MessageStore, the release task is scheduled in a TransactionSynchronization.afterCommit() callback.
This is necessary to prevent a race condition, where the scheduled release could run before the transaction has committed, and the message is not found.
In this case, the message will be released after the delay, or after the transaction commits, whichever is later.
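The ordering guarantee above can be pictured with a toy model. Everything in this sketch is a hypothetical stand-in: MiniTransaction imitates a transaction with after-commit callbacks (Spring uses TransactionSynchronization.afterCommit() for this), and the two lists imitate the MessageStore and the tasks handed to the scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "schedule the release only after commit"; not Spring code.
class AfterCommitSketch {

    // Minimal transaction that runs registered callbacks once it commits.
    static class MiniTransaction {
        private final List<Runnable> afterCommit = new ArrayList<>();

        void registerAfterCommit(Runnable callback) {
            afterCommit.add(callback);
        }

        void commit() {
            afterCommit.forEach(Runnable::run);
            afterCommit.clear();
        }
    }

    final List<String> store = new ArrayList<>();     // stand-in for the MessageStore
    final List<String> scheduled = new ArrayList<>(); // stand-in for scheduled release tasks

    // tx may be null, meaning that no transaction is active.
    void delay(String message, MiniTransaction tx) {
        store.add(message);
        if (tx != null) {
            // Defer scheduling so the release cannot run before the insert is committed.
            tx.registerAfterCommit(() -> scheduled.add(message));
        } else {
            scheduled.add(message);
        }
    }
}
```

In this model, a message delayed inside a transaction sits in the store with no release task until commit() runs, so a release can never look for a message that is not yet visible.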
Release Failures
Starting with version 5.0.8, there are two new properties on the delayer:
- maxAttempts (default 5)
- retryDelay (default 1 second)
When a message is released, if the downstream flow fails, the release is attempted again after the retryDelay.
If maxAttempts is reached, the message is discarded (unless the release is transactional, in which case the message remains in the store but is no longer scheduled for release until the application is restarted or the reschedulePersistedMessages() method is invoked, as discussed above).
In addition, you can configure a delayedMessageErrorChannel; when a release fails, an ErrorMessage is sent to that channel with the exception as the payload and the originalMessage property set.
The ErrorMessage contains a header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT containing the current count.
If the error flow consumes the error message and exits normally, no further action is taken; if the release is transactional, the transaction commits and the message is deleted from the store.
If the error flow throws an exception, the release is retried up to maxAttempts, as discussed above.
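The retry rules above can be traced with a small synchronous model. This is a sketch under assumptions, not the delayer's code: ReleaseRetrySketch, Outcome, and release are hypothetical names, maxAttempts is assumed to count all attempts including the first, and the wait for retryDelay between attempts is left out so that the example runs instantly.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical model of release retries; the real delayer reschedules each retry.
class ReleaseRetrySketch {

    enum Outcome { RELEASED, HANDLED_BY_ERROR_FLOW, GAVE_UP }

    // downstream: throws a RuntimeException when the release fails.
    // errorFlow: may be null; receives the exception and the attempt number.
    //            Returning normally consumes the error; throwing requests a retry.
    static Outcome release(String message,
                           Consumer<String> downstream,
                           BiConsumer<RuntimeException, Integer> errorFlow,
                           int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                downstream.accept(message);
                return Outcome.RELEASED;
            } catch (RuntimeException ex) {
                if (errorFlow != null) {
                    try {
                        errorFlow.accept(ex, attempt);
                        // Error flow exited normally: no further action is taken.
                        return Outcome.HANDLED_BY_ERROR_FLOW;
                    } catch (RuntimeException errorFlowFailure) {
                        // Error flow failed as well: fall through and retry.
                    }
                }
                // A real delayer would wait for retryDelay before the next attempt.
            }
        }
        // maxAttempts reached: the message is no longer scheduled for release.
        return Outcome.GAVE_UP;
    }
}
```

A downstream flow that fails twice and then succeeds is released on the third attempt. A flow that always fails, with no error flow configured, stops after maxAttempts attempts.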