Asynchronous Consumer
Spring AMQP also supports annotated listener endpoints through the use of the @RabbitListener annotation and provides an open infrastructure to register endpoints programmatically.
This is by far the most convenient way to setup an asynchronous consumer.
See Annotation-driven Listener Endpoints for more details.
|
The prefetch default value used to be 1, which could lead to under-utilization of efficient consumers. Starting with version 2.0, the default prefetch value is now 250, which should keep consumers busy in most common scenarios and thus improve throughput. There are, nevertheless, scenarios where the prefetch value should be low:
Also, with low-volume messaging and multiple consumers (including concurrency within a single listener container instance), you may wish to reduce the prefetch to get a more even distribution of messages across consumers. For more background about prefetch, see this post about consumer utilization in RabbitMQ and this post about queuing theory. |
Message Listener
For asynchronous Message
reception, a dedicated component (not the AmqpTemplate
) is involved.
That component is a container for a Message
-consuming callback.
We consider the container and its properties later in this section.
First, though, we should look at the callback, since that is where your application code is integrated with the messaging system.
There are a few options for the callback, starting with an implementation of the MessageListener
interface, which the following listing shows:
public interface MessageListener {
void onMessage(Message message);
}
If your callback logic depends on the AMQP Channel instance for any reason, you may instead use the ChannelAwareMessageListener
.
It looks similar but has an extra parameter.
The following listing shows the ChannelAwareMessageListener
interface definition:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
In version 2.1, this interface moved from package o.s.amqp.rabbit.core to o.s.amqp.rabbit.listener.api .
|
MessageListenerAdapter
If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework. This is often referred to as “Message-driven POJO” support.
Version 1.5 introduced a more flexible mechanism for POJO messaging, the @RabbitListener annotation.
See Annotation-driven Listener Endpoints for more information.
|
When using the adapter, you need to provide only a reference to the instance that the adapter itself should invoke. The following example shows how to do so:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
You can subclass the adapter and provide an implementation of getListenerMethodName()
to dynamically select different methods based on the message.
This method has two parameters, originalMessage
and extractedMessage
, the latter being the result of any conversion.
By default, a SimpleMessageConverter
is configured.
See SimpleMessageConverter
for more information and information about other converters available.
Starting with version 1.4.2, the original message has consumerQueue
and consumerTag
properties, which can be used to determine the queue from which a message was received.
Starting with version 1.5, you can configure a map of consumer queue or tag to method name, to dynamically select the method to call.
If no entry is in the map, we fall back to the default listener method.
The default listener method (if not set) is handleMessage
.
Starting with version 2.0, a convenient FunctionalInterface
has been provided.
The following listing shows the definition of FunctionalInterface
:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
This interface facilitates convenient configuration of the adapter by using Java 8 lambdas, as the following example shows:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
Starting with version 2.2, the buildListenerArguments(Object)
has been deprecated and new buildListenerArguments(Object, Channel, Message)
one has been introduced instead.
The new method helps listener to get Channel
and Message
arguments to do more, such as calling channel.basicReject(long, boolean)
in manual acknowledge mode.
The following listing shows the most basic example:
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
Now you could configure ExtendedListenerAdapter
as same as MessageListenerAdapter
if you need to receive “channel” and “message”.
Parameters of listener should be set as buildListenerArguments(Object, Channel, Message)
returned, as the following example of listener shows:
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
Container
Now that you have seen the various options for the Message
-listening callback, we can turn our attention to the container.
Basically, the container handles the “active” responsibilities so that the listener callback can remain passive.
The container is an example of a “lifecycle” component.
It provides methods for starting and stopping.
When configuring the container, you essentially bridge the gap between an AMQP Queue and the MessageListener
instance.
You must provide a reference to the ConnectionFactory
and the queue names or Queue instances from which that listener should consume messages.
Prior to version 2.0, there was one listener container, the SimpleMessageListenerContainer
.
There is now a second container, the DirectMessageListenerContainer
.
The differences between the containers and criteria you might apply when choosing which to use are described in Choosing a Container.
The following listing shows the most basic example, which works by using the, SimpleMessageListenerContainer
:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
As an “active” component, it is most common to create the listener container with a bean definition so that it can run in the background. The following example shows one way to do so with XML:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
The following listing shows another way to do so with XML:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
Both of the preceding examples create a DirectMessageListenerContainer
(notice the type
attribute — it defaults to simple
).
Alternately, you may prefer to use Java configuration, which looks similar to the preceding code snippet:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
Consumer Priority
Starting with RabbitMQ Version 3.2, the broker now supports consumer priority (see Using Consumer Priorities with RabbitMQ).
This is enabled by setting the x-priority
argument on the consumer.
The SimpleMessageListenerContainer
now supports setting consumer arguments, as the following example shows:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
For convenience, the namespace provides the priority
attribute on the listener
element, as the following example shows:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
Starting with version 1.3, you can modify the queues on which the container listens at runtime. See Listener Container Queues.
auto-delete
Queues
When a container is configured to listen to auto-delete
queues, the queue has an x-expires
option, or the Time-To-Live policy is configured on the Broker, the queue is removed by the broker when the container is stopped (that is, when the last consumer is cancelled).
Before version 1.3, the container could not be restarted because the queue was missing.
The RabbitAdmin
only automatically redeclares queues and so on when the connection is closed or when it opens, which does not happen when the container is stopped and started.
Starting with version 1.3, the container uses a RabbitAdmin
to redeclare any missing queues during startup.
You can also use conditional declaration (see Conditional Declaration) together with an auto-startup="false"
admin to defer queue declaration until the container is started.
The following example shows how to do so:
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
In this case, the queue and exchange are declared by containerAdmin
, which has auto-startup="false"
so that the elements are not declared during context initialization.
Also, the container is not started for the same reason.
When the container is later started, it uses its reference to containerAdmin
to declare the elements.