Strict Message Ordering

This section describes message ordering for inbound and outbound messages.

Inbound

If you require strict ordering of inbound messages, you must configure the inbound listener container’s prefetchCount property to 1. This is because, if a message fails and is redelivered, it arrives after existing prefetched messages. Since Spring AMQP version 2.0, the prefetchCount defaults to 250 for improved performance. Strict ordering requirements come at the cost of decreased performance.
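
As an illustration, the following sketch sets prefetchCount to 1 on a listener container and passes that container to an inbound channel adapter. The class name, bean name, and queue name (ordered.queue) are hypothetical, and a single consumer (the container default) is assumed. Treat it as a starting point rather than a definitive configuration:

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class OrderedInboundConfig {

    @Bean
    public IntegrationFlow orderedInboundFlow(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("ordered.queue"); // hypothetical queue name
        // Only one unacknowledged message is outstanding at a time, so a
        // redelivered message cannot arrive behind already prefetched ones.
        container.setPrefetchCount(1);
        return IntegrationFlow.from(Amqp.inboundAdapter(container))
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }
}
```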

Outbound

Consider the following integration flow:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

Suppose we send messages A, B and C to the gateway. While it is likely that messages A, B, C are sent in order, there is no guarantee. This is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message. One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred-fold.

To solve this problem more efficiently, starting with version 5.1, Spring Integration provides the BoundRabbitChannelAdvice, which is a HandleMessageAdvice (see Handling Message Advice). When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations). The following example shows how to use BoundRabbitChannelAdvice:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

Notice that the same RabbitTemplate (which implements RabbitOperations) is used in the advice and the outbound adapter. The advice runs the downstream flow within the template’s invoke method so that all operations run on the same channel. If the optional timeout is provided, when the flow completes, the advice calls the waitForConfirmsOrDie method, which throws an exception if the confirmations are not received within the specified time.
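
The mechanism described above can be sketched directly against RabbitTemplate, without the advice. This is not the advice's actual source. It only shows the scoped invoke call and the confirmation wait, and it assumes a template whose connection factory has publisher confirmations enabled. The class and method names (ScopedSendSketch, sendInOrder) are hypothetical:

```java
import java.util.List;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

class ScopedSendSketch {

    // Sends all payloads on one dedicated channel, then waits for confirmations.
    static void sendInOrder(RabbitTemplate template, List<String> payloads) {
        template.invoke(operations -> {
            // Every call made on 'operations' inside this callback uses the same channel.
            payloads.forEach(payload -> operations.convertAndSend("rk", payload));
            // Throws an exception if confirmations do not arrive within 10 seconds.
            operations.waitForConfirmsOrDie(10_000);
            return null;
        });
    }
}
```

Sends made through the template outside the callback are not bound to that channel, which is why the advice wraps the whole downstream flow.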

There must be no thread hand-offs in the downstream flow (such as those introduced by a QueueChannel, an ExecutorChannel, and others).
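
The restriction can be illustrated with a JDK-only toy model, assuming the dedicated channel is bound to the calling thread. No Spring or RabbitMQ classes are involved, and every name below (ThreadBoundChannelDemo, BOUND_CHANNEL, "channel-1") is hypothetical:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only: a ThreadLocal stands in for a thread-bound channel.
class ThreadBoundChannelDemo {

    private static final ThreadLocal<String> BOUND_CHANNEL = new ThreadLocal<>();

    // Binds a channel and reads it back on the same thread.
    static String lookupOnCallingThread() {
        BOUND_CHANNEL.set("channel-1");
        try {
            return BOUND_CHANNEL.get();
        } finally {
            BOUND_CHANNEL.remove();
        }
    }

    // Binds a channel, then reads it from another thread,
    // as an executor-based hand-off would.
    static String lookupAfterHandOff() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BOUND_CHANNEL.set("channel-1");
        try {
            Callable<String> task = BOUND_CHANNEL::get;
            return executor.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            BOUND_CHANNEL.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same thread: " + lookupOnCallingThread());
        System.out.println("after hand-off: " + lookupAfterHandOff());
    }
}
```

On the calling thread the lookup returns channel-1. After the hand-off it returns null, because the worker thread never saw the binding. In a real flow, the equivalent effect is that sends made after a hand-off no longer use the channel that the advice bound.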