@RabbitListener with Batching
When receiving a batch of messages, the de-batching is normally performed by the container and the listener is invoked with one message at a time.
Starting with version 2.2, you can configure the listener container factory and listener to receive the entire batch in one call. To do so, set the factory’s batchListener property and make the method payload parameter a List or Collection:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}
Setting the batchListener property to true automatically turns off the deBatchingEnabled container property in containers that the factory creates (unless consumerBatchEnabled is true; see below). Effectively, the debatching is moved from the container to the listener adapter, and the adapter creates the list that is passed to the listener.
A batch-enabled factory cannot be used with a multi-method listener.
Also starting with version 2.2, when receiving batched messages one at a time, the last message contains a boolean header set to true.
This header can be obtained by adding the @Header(AmqpHeaders.LAST_IN_BATCH) boolean last parameter to your listener method.
The header is mapped from MessageProperties.isLastInBatch().
In addition, AmqpHeaders.BATCH_SIZE is populated with the size of the batch in every message fragment.
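One use of the last-in-batch header is to detect the end of a producer-created batch while still handling messages one at a time. The following is a minimal plain-Java sketch of that idea, not Spring AMQP code: the class LastInBatchSketch and its methods are hypothetical, and the boolean last argument stands in for the value a real listener method would receive through @Header(AmqpHeaders.LAST_IN_BATCH).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a listener that receives debatched messages one at a time.
class LastInBatchSketch {

    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> completed = new ArrayList<>();

    // In a real listener, 'last' would be bound with @Header(AmqpHeaders.LAST_IN_BATCH).
    void onMessage(String payload, boolean last) {
        pending.add(payload);
        if (last) {
            // The final fragment of the batch has arrived: hand off a copy and reset.
            completed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    List<List<String>> completedBatches() {
        return completed;
    }

    public static void main(String[] args) {
        LastInBatchSketch listener = new LastInBatchSketch();
        listener.onMessage("a", false);
        listener.onMessage("b", false);
        listener.onMessage("c", true);
        System.out.println(listener.completedBatches()); // prints [[a, b, c]]
    }
}
```

The sketch keeps state between calls, so in a real application it would only be safe where a single consumer thread delivers the fragments of a batch in order.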
In addition, a new property consumerBatchEnabled has been added to the SimpleMessageListenerContainer.
When this is true, the container will create a batch of messages, up to batchSize; a partial batch is delivered if receiveTimeout elapses with no new messages arriving.
If a producer-created batch is received, it is debatched and added to the consumer-side batch; therefore the actual number of messages delivered may exceed batchSize, which represents the number of messages received from the broker.
deBatchingEnabled must be true when consumerBatchEnabled is true; the container factory will enforce this requirement.
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}
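To illustrate why a consumer-side batch can hold more messages than batchSize, the following plain-Java model may help. It is a simplified sketch and not the container's actual implementation: the class ConsumerBatchSketch and the method assemble are hypothetical, each inner list stands for one delivery from the broker (a list with several payloads represents a producer-created batch), and the trailing partial batch stands in for what would be delivered once receiveTimeout elapses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of consumer-side batch assembly; not Spring AMQP code.
class ConsumerBatchSketch {

    /**
     * Groups broker deliveries into consumer-side batches.
     * batchSize counts deliveries from the broker, not individual payloads,
     * so a debatched producer batch can push a consumer batch past batchSize.
     */
    static List<List<String>> assemble(List<List<String>> deliveries, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int received = 0;
        for (List<String> delivery : deliveries) {
            current.addAll(delivery); // debatch a producer-created batch into the consumer batch
            received++;
            if (received == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
                received = 0;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // partial batch, as if receiveTimeout had elapsed
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> deliveries = List.of(
                List.of("a"),
                List.of("b1", "b2", "b3"), // one delivery carrying a producer-created batch
                List.of("c"));
        System.out.println(assemble(deliveries, 2)); // prints [[a, b1, b2, b3], [c]]
    }
}
```

With a batchSize of 2, as in the factory above, the first consumer batch in this model contains four payloads because the second delivery was itself a batch of three.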
When using consumerBatchEnabled with @RabbitListener:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> invoices) {
    ...
}
- the first is called with the raw, unconverted org.springframework.amqp.core.Message instances received.
- the second is called with org.springframework.messaging.Message<?> instances with converted payloads and mapped headers/properties.
- the third is called with the converted payloads, with no access to headers/properties.
You can also add a Channel parameter, which is often used with MANUAL ack mode.
This is not very useful with the third example because you don’t have access to the delivery_tag property.
Spring Boot provides a configuration property for consumerBatchEnabled and batchSize, but not for batchListener.
Starting with version 3.0, setting consumerBatchEnabled to true on the container factory also sets batchListener to true.
When consumerBatchEnabled is true, the listener must be a batch listener.
Starting with version 3.0, listener methods can consume Collection<?> or List<?>.