Strict Message Ordering
This section describes message ordering for inbound and outbound messages.
Inbound
If you require strict ordering of inbound messages, you must set the inbound listener container’s prefetchCount property to 1.
This is because, if a message fails and is redelivered, it arrives after existing prefetched messages.
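The redelivery effect described above can be illustrated with a toy model. The following sketch uses plain Java collections only; it does not talk to RabbitMQ, and the class name PrefetchOrderingDemo, the consume method, and the assumption that a rejected message is requeued at the head of the queue are illustrative simplifications rather than a description of the broker's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PrefetchOrderingDemo {

    /**
     * Simulates a single consumer and returns the order in which messages
     * are successfully processed. The message equal to failsOnce is rejected
     * on its first delivery and requeued (toy assumption: at the queue head).
     */
    public static List<String> consume(List<String> published, int prefetch, String failsOnce) {
        Deque<String> brokerQueue = new ArrayDeque<>(published);
        Deque<String> prefetched = new ArrayDeque<>();
        List<String> processed = new ArrayList<>();
        boolean alreadyFailed = false;

        while (!brokerQueue.isEmpty() || !prefetched.isEmpty()) {
            // The broker tops up the consumer's local buffer to the prefetch limit.
            while (prefetched.size() < prefetch && !brokerQueue.isEmpty()) {
                prefetched.addLast(brokerQueue.pollFirst());
            }
            String message = prefetched.pollFirst();
            if (message.equals(failsOnce) && !alreadyFailed) {
                alreadyFailed = true;
                // Redelivery lands behind whatever is already prefetched.
                brokerQueue.addFirst(message);
            } else {
                processed.add(message);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> published = List.of("A", "B", "C", "D");
        System.out.println(consume(published, 1, "A")); // prints [A, B, C, D]
        System.out.println(consume(published, 3, "A")); // prints [B, C, A, D]
    }
}
```

With a prefetch of 1 the failed message is the only one held by the consumer, so it is retried before anything else; with a prefetch of 3 the messages B and C are already buffered and overtake it.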
Since Spring AMQP version 2.0, the prefetchCount defaults to 250 for improved performance.
Strict ordering requirements come at the cost of decreased performance.
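As a minimal sketch of the inbound setting, the following configuration sets the prefetch count through the Java DSL. The class name, bean name, and the queue name ordered.queue are hypothetical, and the flow assumes a ConnectionFactory bean is defined elsewhere in the application context.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class StrictOrderInboundConfig {

    @Bean
    public IntegrationFlow strictOrderInboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlow
                // "ordered.queue" is a hypothetical queue name used for illustration.
                .from(Amqp.inboundAdapter(connectionFactory, "ordered.queue")
                        // Hold at most one unacknowledged message at the consumer.
                        .configureContainer(container -> container.prefetchCount(1)))
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }
}
```

Strict ordering also presumes a single consumer on the queue; this sketch leaves the container's concurrency at its default and does not change it.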
Outbound
Consider the following integration flow:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}
Suppose we send messages A, B and C to the gateway.
While it is likely that messages A, B and C are sent in order, there is no guarantee.
This is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message.
One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred-fold.
To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the BoundRabbitChannelAdvice, which is a HandleMessageAdvice.
See Handling Message Advice.
When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations).
The following example shows how to use BoundRabbitChannelAdvice:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}
Notice that the same RabbitTemplate (which implements RabbitOperations) is used in the advice and the outbound adapter.
The advice runs the downstream flow within the template’s invoke method so that all operations run on the same channel.
If the optional timeout is provided, when the flow completes, the advice calls the waitForConfirmsOrDie method, which throws an exception if the confirmations are not received within the specified time.
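There must be no thread handoffs in the downstream flow (for example, through a QueueChannel or an ExecutorChannel); otherwise the sends run on another thread, outside the scope of the invoke call, and are no longer bound to the same channel.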
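To make the mechanism more concrete, the following sketch shows roughly what running sends inside the template's invoke method looks like when written by hand. It is an illustration of the idea, not the advice's actual source; the class and method names are hypothetical, the default exchange ("") is an assumption, and waitForConfirmsOrDie only works if the connection factory is configured for publisher confirmations.

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class SameChannelSendSketch {

    /**
     * Sends all payloads on one channel and then waits for their confirmations.
     * Hypothetical helper for illustration only.
     */
    public static void sendInOrder(RabbitTemplate template, String... payloads) {
        template.invoke(operations -> {
            // Every operation inside this callback uses the same dedicated channel.
            for (String payload : payloads) {
                operations.convertAndSend("", "rk", payload);
            }
            // Throws an exception if confirmations do not arrive within 10 seconds.
            operations.waitForConfirmsOrDie(10_000);
            return null;
        });
    }
}
```

The callback runs on the calling thread, which is why handing the message off to another thread inside the downstream flow would move the send outside the bound channel.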