Thread Barrier
Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs. For example, consider an HTTP request that publishes a message to RabbitMQ. We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.
In version 4.2, Spring Integration introduced the <barrier/>
component for this purpose.
The underlying MessageHandler
is the BarrierMessageHandler
.
This class also implements MessageTriggerAction
, in which a message passed to the trigger()
method releases a corresponding thread in the handleRequestMessage()
method (if present).
The suspended thread and trigger thread are correlated by invoking a CorrelationStrategy
on the messages.
When a message is sent to the input-channel
, the thread is suspended for up to requestTimeout
milliseconds, waiting for a corresponding trigger message.
The default correlation strategy uses the IntegrationMessageHeaderAccessor.CORRELATION_ID
header.
When a trigger message arrives with the same correlation, the thread is released.
The message sent to the output-channel
after release is constructed by using a MessageGroupProcessor
.
By default, the message is a Collection<?>
of the two payloads, and the headers are merged by using a DefaultAggregatingMessageGroupProcessor
.
If the trigger() method is invoked first (or after the main thread times out), it is suspended for up to triggerTimeout waiting for the suspending message to arrive.
If you do not want to suspend the trigger thread, consider handing off to a TaskExecutor instead so that its thread is suspended instead.
|
Prior version 5.4, there was only one timeout option for both request and trigger messages, but in some use-case it is better to have different timeouts for those actions.
Therefore requestTimeout and triggerTimeout options have been introduced.
|
The requires-reply
property determines the action to take if the suspended thread times out before the trigger message arrives.
By default, it is false
, which means the endpoint returns null
, the flow ends, and the thread returns to the caller.
When true
, a ReplyRequiredException
is thrown.
You can call the trigger()
method programmatically (obtain the bean reference by using the name, barrier.handler
— where barrier
is the bean name of the barrier endpoint).
Alternatively, you can configure an <outbound-channel-adapter/>
to trigger the release.
Only one thread can be suspended with the same correlation. The same correlation can be used multiple times but only once concurrently. An exception is thrown if a second thread arrives with the same correlation. |
The following example shows how to use a custom header for correlation:
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
Depending on which one has a message arrive first, either the thread sending a message to in
or the thread sending a message to release
waits for up to ten seconds until the other message arrives.
When the message is released, the out
channel is sent a message that combines the result of invoking the custom MessageGroupProcessor
bean, named myOutputProcessor
.
If the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.
For an example of this component, see the barrier sample application.