@RabbitListener with Batching

When receiving a batch of messages, the container normally performs the de-batching and invokes the listener with one message at a time. Starting with version 2.2, you can configure the listener container factory and listener to receive the entire batch in one call. To do so, set the factory’s batchListener property and make the method payload parameter a List or Collection:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

Setting the batchListener property to true automatically turns off the deBatchingEnabled container property in containers that the factory creates (unless consumerBatchEnabled is true - see below). Effectively, the debatching is moved from the container to the listener adapter and the adapter creates the list that is passed to the listener.

A batch-enabled factory cannot be used with a multi-method listener.

Also starting with version 2.2, when receiving batched messages one at a time, the last message in the batch carries a boolean header set to true. You can obtain this header by adding a @Header(AmqpHeaders.LAST_IN_BATCH) boolean last parameter to your listener method. The header is mapped from MessageProperties.isLastInBatch(). In addition, AmqpHeaders.BATCH_SIZE is populated with the size of the batch in every message fragment.
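One way a listener might use the last-in-batch flag is to buffer fragments and process them together once the final one arrives. The following is a minimal plain-Java sketch of that idea, not Spring AMQP code: the class LastInBatchCollector and its onMessage method are hypothetical names, and the lastInBatch argument stands in for the value of the AmqpHeaders.LAST_IN_BATCH header. It assumes that all fragments of a batch reach the same collector in order on a single consumer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not part of Spring AMQP): buffers debatched payloads
// until the fragment flagged as last-in-batch arrives.
final class LastInBatchCollector {

    private final List<String> pending = new ArrayList<>();

    // 'lastInBatch' models the AmqpHeaders.LAST_IN_BATCH header value.
    // Returns the complete batch on the last fragment, otherwise empty.
    Optional<List<String>> onMessage(String payload, boolean lastInBatch) {
        pending.add(payload);
        if (!lastInBatch) {
            return Optional.empty();
        }
        List<String> complete = new ArrayList<>(pending);
        pending.clear(); // start fresh for the next batch
        return Optional.of(complete);
    }
}
```

With this sketch, calling onMessage("a", false), onMessage("b", false), and then onMessage("c", true) returns an empty Optional twice and then the list [a, b, c]. If you need the whole batch anyway, the batchListener approach described above is likely simpler than buffering by hand.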

In addition, a new property consumerBatchEnabled has been added to the SimpleMessageListenerContainer. When this is true, the container will create a batch of messages, up to batchSize; a partial batch is delivered if receiveTimeout elapses with no new messages arriving. If a producer-created batch is received, it is debatched and added to the consumer-side batch; therefore the actual number of messages delivered may exceed batchSize, which represents the number of messages received from the broker. deBatchingEnabled must be true when consumerBatchEnabled is true; the container factory will enforce this requirement.

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}
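To make the batchSize accounting concrete, here is a small plain-Java model of how a consumer-side batch could be assembled. It is only an illustration of the counting rule described above, not the container's actual implementation: ConsumerBatchSketch and assemble are hypothetical names, each inner list represents one delivery from the broker (a single message, or a producer-created batch with several), and timing is ignored, so the trailing partial batch stands in for receiveTimeout elapsing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Spring AMQP code) of consumer-side batch assembly.
final class ConsumerBatchSketch {

    // Each element of 'deliveries' is one broker delivery; a delivery with
    // more than one payload models a producer-created batch.
    static List<List<String>> assemble(List<List<String>> deliveries, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int received = 0; // counts broker deliveries, not debatched payloads
        for (List<String> delivery : deliveries) {
            current.addAll(delivery); // debatch into the consumer-side batch
            received++;
            if (received == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
                received = 0;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // partial batch, as if receiveTimeout elapsed
        }
        return batches;
    }
}
```

For example, with batchSize 2 and three deliveries [a], [b, c, d], and [e], the model produces the batches [a, b, c, d] and [e]: the first batch holds four messages even though only two deliveries were received from the broker.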

When using consumerBatchEnabled with @RabbitListener:

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> invoices) {
    ...
}

  • The first is called with the raw, unconverted org.springframework.amqp.core.Message objects received.

  • The second is called with org.springframework.messaging.Message<?> objects with converted payloads and mapped headers/properties.

  • The third is called with the converted payloads, with no access to headers/properties.

You can also add a Channel parameter, which is often needed in MANUAL ack mode. This is not very useful with the third example because you don’t have access to the delivery_tag property.

Spring Boot provides configuration properties for consumerBatchEnabled and batchSize, but not for batchListener. Starting with version 3.0, setting consumerBatchEnabled to true on the container factory also sets batchListener to true. When consumerBatchEnabled is true, the listener must be a batch listener.

Starting with version 3.0, listener methods can consume Collection<?> or List<?>.