Enable Listener Endpoint Annotations
To enable support for @RabbitListener annotations, you can add @EnableRabbit to one of your @Configuration classes.
The following example shows how to do so:
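import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class AppConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // connectionFactory() is assumed to be a ConnectionFactory @Bean method defined elsewhere in this class
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setContainerCustomizer(container -> { /* customize the container */ });
        return factory;
    }

}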
Since version 2.0, a DirectRabbitListenerContainerFactory is also available.
It creates DirectMessageListenerContainer instances.
For information to help you choose between SimpleRabbitListenerContainerFactory and DirectRabbitListenerContainerFactory, see Choosing a Container.
Starting with version 2.2.2, you can provide a ContainerCustomizer implementation (as shown above).
This can be used to further configure the container after it has been created and configured; you can use this, for example, to set properties that are not exposed by the container factory.
Version 2.4.8 provides the CompositeContainerCustomizer for situations where you wish to apply multiple customizers.
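As a rough illustration of combining customizers, the following sketch wraps two ContainerCustomizer lambdas in a CompositeContainerCustomizer. The class name CustomizerConfig, the injected ConnectionFactory parameter, and the particular setters and timeout values are assumptions chosen for the example; they are not recommendations, and the setters are not necessarily ones the factory fails to expose.

```java
import java.util.List;

import org.springframework.amqp.rabbit.config.CompositeContainerCustomizer;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CustomizerConfig { // hypothetical class name

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Each customizer receives the container after the factory has created and configured it.
        ContainerCustomizer<SimpleMessageListenerContainer> shutdown =
                container -> container.setShutdownTimeout(10_000L);      // illustrative value
        ContainerCustomizer<SimpleMessageListenerContainer> startup =
                container -> container.setConsumerStartTimeout(30_000L); // illustrative value

        // The composite applies the customizers in list order.
        factory.setContainerCustomizer(
                new CompositeContainerCustomizer<>(List.of(shutdown, startup)));
        return factory;
    }

}
```

Keeping each concern in its own customizer lets independent pieces of configuration be developed and reused separately.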
By default, the infrastructure looks for a bean named rabbitListenerContainerFactory as the source for the factory to use to create message listener containers.
In this case, and ignoring the RabbitMQ infrastructure setup, the processOrder method can be invoked with a core pool size of three threads and a maximum pool size of ten threads.
You can customize the listener container factory to use for each annotation, or you can configure an explicit default by implementing the RabbitListenerConfigurer interface.
The default is required only if at least one endpoint is registered without a specific container factory.
See the Javadoc for full details and examples.
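The two options described above might look roughly like the following sketch. The class names, the directFactory bean name, and the orders and audit queue names are assumptions made for illustration; only the containerFactory annotation attribute and the RabbitListenerConfigurer callback come from the framework.

```java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
@EnableRabbit
public class ListenerFactoryConfig implements RabbitListenerConfigurer { // hypothetical name

    private final ConnectionFactory connectionFactory;

    public ListenerFactoryConfig(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    // A non-default factory that listeners select by bean name.
    @Bean
    public DirectRabbitListenerContainerFactory directFactory() {
        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
        factory.setConnectionFactory(this.connectionFactory);
        return factory;
    }

    // Explicit default, used by endpoints that do not name a container factory.
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerContainerFactory defaultFactory = new SimpleRabbitListenerContainerFactory();
        defaultFactory.setConnectionFactory(this.connectionFactory);
        registrar.setContainerFactory(defaultFactory);
    }

}

@Component
class OrderListeners { // hypothetical listener bean

    @RabbitListener(queues = "orders") // no containerFactory: falls back to the explicit default
    public void processOrder(String order) {
        // handle the order
    }

    @RabbitListener(queues = "audit", containerFactory = "directFactory")
    public void audit(String event) {
        // handle the audit event
    }

}
```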
The container factories provide methods for adding MessagePostProcessor instances that are applied after receiving messages (before invoking the listener) and before sending replies.
See Reply Management for information about replies.
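A minimal sketch of registering such post processors follows, assuming a factory bean that receives its ConnectionFactory as a parameter. The class name and the x-received-at and x-replied-by header names are hypothetical and chosen only to make the effect visible.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PostProcessorConfig { // hypothetical class name

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Applied to each inbound message before the listener method is invoked.
        factory.setAfterReceivePostProcessors(message -> {
            message.getMessageProperties().setHeader("x-received-at", System.currentTimeMillis());
            return message;
        });

        // Applied to each reply just before it is sent.
        factory.setBeforeSendReplyPostProcessors(message -> {
            message.getMessageProperties().setHeader("x-replied-by", "order-service");
            return message;
        });
        return factory;
    }

}
```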
Starting with version 2.0.6, you can add a RetryTemplate and RecoveryCallback to the listener container factory.
The RetryTemplate is used when sending replies.
The RecoveryCallback is invoked when retries are exhausted.
You can use a SendRetryContextAccessor to get information from the context.
The following example shows how to do so:
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
    Message failed = SendRetryContextAccessor.getMessage(ctx);
    Address replyTo = SendRetryContextAccessor.getAddress(ctx);
    Throwable t = ctx.getLastThrowable();
    ...
    return null;
});
If you prefer XML configuration, you can use the <rabbit:annotation-driven> element.
Any beans annotated with @RabbitListener are detected.
For SimpleMessageListenerContainer instances, you can use XML similar to the following:
<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="10"/>
</bean>
For DirectMessageListenerContainer instances, you can use XML similar to the following:
<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="consumersPerQueue" value="3"/>
</bean>
Starting with version 2.0, the @RabbitListener annotation has a concurrency property.
It supports SpEL expressions (#{…}) and property placeholders (${…}).
Its meaning and allowed values depend on the container type, as follows:
- For the DirectMessageListenerContainer, the value must be a single integer value, which sets the consumersPerQueue property on the container.
- For the SimpleMessageListenerContainer, the value can be a single integer value, which sets the concurrentConsumers property on the container, or it can have the form m-n, where m is the concurrentConsumers property and n is the maxConcurrentConsumers property.
In either case, this setting overrides the settings on the factory. Previously you had to define different container factories if you had listeners that required different concurrency.
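To make the two value formats concrete, the following standalone sketch mirrors the rule described above. It is an illustration only, not Spring AMQP's own parsing code, and the names ConcurrencySketch, Range, parseSimple, and parseDirect are hypothetical.

```java
// Illustrative only: mirrors the documented rule, not the library's implementation.
final class ConcurrencySketch {

    /** The values a SimpleMessageListenerContainer would receive. */
    static final class Range {
        final int concurrentConsumers;
        final Integer maxConcurrentConsumers; // null when only "m" is given

        Range(int concurrentConsumers, Integer maxConcurrentConsumers) {
            this.concurrentConsumers = concurrentConsumers;
            this.maxConcurrentConsumers = maxConcurrentConsumers;
        }
    }

    /** Simple container: accepts "m" or "m-n". */
    static Range parseSimple(String value) {
        String trimmed = value.trim();
        int dash = trimmed.indexOf('-');
        if (dash < 0) {
            return new Range(Integer.parseInt(trimmed), null);
        }
        int m = Integer.parseInt(trimmed.substring(0, dash).trim());
        int n = Integer.parseInt(trimmed.substring(dash + 1).trim());
        return new Range(m, n);
    }

    /** Direct container: accepts a single integer only (consumersPerQueue). */
    static int parseDirect(String value) {
        // A range such as "3-10" is rejected with a NumberFormatException.
        return Integer.parseInt(value.trim());
    }
}
```

Under these assumptions, parseSimple("3-10") yields concurrentConsumers 3 and maxConcurrentConsumers 10, while parseDirect accepts only a plain integer such as "4".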
The annotation also allows overriding the factory autoStartup and taskExecutor properties via the autoStartup and executor (since 2.2) annotation properties.
Using a different executor for each listener might help with identifying threads associated with each listener in logs and thread dumps.
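The following sketch shows how these annotation properties might be combined on one listener. The class names, the reports queue and listener id, the reportsExecutor bean name, and the reports- thread name prefix are all assumptions made for the example.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

@Configuration
class ReportExecutorConfig { // hypothetical class name

    // Threads created by this executor are named reports-1, reports-2, and so on,
    // which makes them easy to spot in logs and thread dumps.
    @Bean
    public TaskExecutor reportsExecutor() {
        return new SimpleAsyncTaskExecutor("reports-");
    }

}

@Component
class ReportListener { // hypothetical listener bean

    @RabbitListener(id = "reports", queues = "reports",
            concurrency = "2-5",          // overrides the factory's consumer counts
            autoStartup = "false",        // overrides the factory's autoStartup
            executor = "reportsExecutor") // bean name of the TaskExecutor to use
    public void handle(String report) {
        // process the report
    }

}
```

Because autoStartup is false here, the container stays stopped until it is started explicitly, for example by looking it up by id in the RabbitListenerEndpointRegistry and calling start() on it.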
Version 2.2 also added the ackMode property, which allows you to override the container factory’s acknowledgeMode property.
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

    ...
    channel.basicAck(tag, false);
}