Asynchronous Consumer

Spring AMQP also supports annotated listener endpoints through the use of the @RabbitListener annotation and provides an open infrastructure to register endpoints programmatically. This is by far the most convenient way to set up an asynchronous consumer. See Annotation-driven Listener Endpoints for more details.

The prefetch default value used to be 1, which could lead to under-utilization of efficient consumers. Starting with version 2.0, the default prefetch value is now 250, which should keep consumers busy in most common scenarios and thus improve throughput.

There are, nevertheless, scenarios where the prefetch value should be low:

  • For large messages, especially if the processing is slow (messages could add up to a large amount of memory in the client process)

  • When strict message ordering is necessary (the prefetch value should be set back to 1 in this case)

  • Other special cases

Also, with low-volume messaging and multiple consumers (including concurrency within a single listener container instance), you may wish to reduce the prefetch to get a more even distribution of messages across consumers.
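
To make the memory concern for large messages concrete, the following back-of-the-envelope sketch computes an upper bound on the bytes buffered in the client when every consumer's prefetch window is full. The class name, the consumer count, and the message size are illustrative assumptions, not values taken from the framework:

```java
// Back-of-the-envelope estimate; all figures used here are illustrative assumptions.
final class PrefetchMemoryEstimate {

    private PrefetchMemoryEstimate() {
    }

    /**
     * Upper bound on unacknowledged message bytes held in the client,
     * assuming every consumer's prefetch window is completely full.
     */
    static long worstCaseBufferedBytes(int prefetch, int consumers, long avgMessageBytes) {
        long bufferedMessages = Math.multiplyExact((long) prefetch, (long) consumers);
        return Math.multiplyExact(bufferedMessages, avgMessageBytes);
    }

    public static void main(String[] args) {
        long twoMiB = 2L * 1024 * 1024;
        // Default prefetch of 250, 4 consumers, 2 MiB messages.
        System.out.println(worstCaseBufferedBytes(250, 4, twoMiB)); // prints 2097152000
        // Same setup with the prefetch lowered to 1.
        System.out.println(worstCaseBufferedBytes(1, 4, twoMiB));   // prints 8388608
    }
}
```

Under these assumed figures, the default prefetch could hold roughly 2 GB of undelivered payload in the client, while a prefetch of 1 bounds it at 8 MiB.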

For more background about prefetch, see this post about consumer utilization in RabbitMQ and this post about queuing theory.

Message Listener

For asynchronous Message reception, a dedicated component (not the AmqpTemplate) is involved. That component is a container for a Message-consuming callback. We consider the container and its properties later in this section. First, though, we should look at the callback, since that is where your application code is integrated with the messaging system. There are a few options for the callback, starting with an implementation of the MessageListener interface, which the following listing shows:

public interface MessageListener {
    void onMessage(Message message);
}

If your callback logic depends on the AMQP Channel instance for any reason, you may instead use the ChannelAwareMessageListener. It looks similar but has an extra parameter. The following listing shows the ChannelAwareMessageListener interface definition:

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}

In version 2.1, this interface moved from package o.s.amqp.rabbit.core to o.s.amqp.rabbit.listener.api.
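
As a sketch of why a callback might need the Channel, the following example acknowledges or rejects each delivery itself. It assumes version 2.1 or later (for the package name), Spring AMQP and the RabbitMQ client on the classpath, a container switched to AcknowledgeMode.MANUAL, and a queue named some.queue. The ManualAckExample class, its manualAckContainer method, and the process method are hypothetical names introduced for illustration:

```java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

// Hypothetical example class; not part of the framework.
public class ManualAckExample {

    public static SimpleMessageListenerContainer manualAckContainer(ConnectionFactory connectionFactory) {
        ChannelAwareMessageListener listener = new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                try {
                    process(message);
                }
                catch (RuntimeException ex) {
                    // Processing failed: reject this delivery without requeueing it.
                    channel.basicReject(deliveryTag, false);
                    return;
                }
                // Processing succeeded: acknowledge this single delivery.
                channel.basicAck(deliveryTag, false);
            }
        };

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("some.queue");
        // The listener acks or rejects on its own, so the container must not do it automatically.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(listener);
        return container;
    }

    // Placeholder for application logic (an assumption made for this sketch).
    private static void process(Message message) {
        System.out.println("received: " + message);
    }
}
```

Whether a failed message should be rejected, requeued, or dead-lettered depends on the application; rejecting without requeue is only one possible choice.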

MessageListenerAdapter

If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework. This is often referred to as “Message-driven POJO” support.

Version 1.5 introduced a more flexible mechanism for POJO messaging, the @RabbitListener annotation. See Annotation-driven Listener Endpoints for more information.

When using the adapter, you need to provide only a reference to the instance that the adapter itself should invoke. The following example shows how to do so:

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

You can subclass the adapter and provide an implementation of getListenerMethodName() to dynamically select different methods based on the message. This method has two parameters, originalMessage and extractedMessage, the latter being the result of any conversion. By default, a SimpleMessageConverter is configured. See SimpleMessageConverter for more information, including details of the other available converters.

Starting with version 1.4.2, the original message has consumerQueue and consumerTag properties, which can be used to determine the queue from which a message was received.

Starting with version 1.5, you can configure a map of consumer queue or tag to method name, to dynamically select the method to call. If no entry is in the map, we fall back to the default listener method. The default listener method (if not set) is handleMessage.
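
The lookup-with-fallback behavior described above can be modeled in a few lines of plain Java. This is a simplified stand-alone model, not the adapter's source code: the ListenerMethodResolver class is hypothetical, and checking the queue name before the consumer tag is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-alone model of the method selection; not the framework's implementation.
final class ListenerMethodResolver {

    private final Map<String, String> queueOrTagToMethodName = new HashMap<>();

    // Name used when no mapping matches; handleMessage is the documented default.
    private final String defaultListenerMethod;

    ListenerMethodResolver(Map<String, String> mappings) {
        this(mappings, "handleMessage");
    }

    ListenerMethodResolver(Map<String, String> mappings, String defaultListenerMethod) {
        this.queueOrTagToMethodName.putAll(mappings);
        this.defaultListenerMethod = defaultListenerMethod;
    }

    /** Returns the mapped method name for the queue or tag, else the default. */
    String resolve(String consumerQueue, String consumerTag) {
        // Assumption of this sketch: the queue name is checked before the consumer tag.
        String methodName = queueOrTagToMethodName.get(consumerQueue);
        if (methodName == null) {
            methodName = queueOrTagToMethodName.get(consumerTag);
        }
        return methodName != null ? methodName : defaultListenerMethod;
    }

    public static void main(String[] args) {
        Map<String, String> mappings = new HashMap<>();
        mappings.put("orders.queue", "handleOrder"); // hypothetical queue and method names
        ListenerMethodResolver resolver = new ListenerMethodResolver(mappings);
        System.out.println(resolver.resolve("orders.queue", "tag-1")); // prints handleOrder
        System.out.println(resolver.resolve("other.queue", "tag-2"));  // prints handleMessage
    }
}
```

With the real adapter, the map itself is supplied through setQueueOrTagToMethodName(Map<String, String>), and the fallback name through setDefaultListenerMethod(String).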

Starting with version 2.0, a convenient functional interface, ReplyingMessageListener, has been provided. The following listing shows its definition:

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

This interface facilitates convenient configuration of the adapter by using Java 8 lambdas, as the following example shows:

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
});

Starting with version 2.2, buildListenerArguments(Object) has been deprecated and a new buildListenerArguments(Object, Channel, Message) method has been introduced instead. The new method gives the listener access to the Channel and Message arguments so that it can do more, such as calling channel.basicReject(long, boolean) in manual acknowledge mode. The following listing shows the most basic example:

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

You can now configure ExtendedListenerAdapter in the same way as MessageListenerAdapter if you need to receive the channel and the message. The parameters of the listener method must match the array returned by buildListenerArguments(Object, Channel, Message), as the following listener example shows:

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

Container

Now that you have seen the various options for the Message-listening callback, we can turn our attention to the container. Basically, the container handles the “active” responsibilities so that the listener callback can remain passive. The container is an example of a “lifecycle” component. It provides methods for starting and stopping. When configuring the container, you essentially bridge the gap between an AMQP Queue and the MessageListener instance. You must provide a reference to the ConnectionFactory and the queue names or Queue instances from which that listener should consume messages.

Prior to version 2.0, there was one listener container, the SimpleMessageListenerContainer. There is now a second container, the DirectMessageListenerContainer. The differences between the containers and criteria you might apply when choosing which to use are described in Choosing a Container.

The following listing shows the most basic example, which uses the SimpleMessageListenerContainer:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

Because the listener container is an “active” component, it is most commonly created with a bean definition so that it can run in the background. The following example shows one way to do so with XML:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

The following listing shows another way to do so with XML:

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

The first example creates a SimpleMessageListenerContainer, because the type attribute defaults to simple. The second example, which sets type="direct", creates a DirectMessageListenerContainer.

Alternatively, you may prefer to use Java configuration, which looks similar to the preceding code snippet:

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}
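
The type="direct" XML variant can be expressed in Java configuration in the same way. The following is a minimal sketch, assuming Spring AMQP 2.0 or later on the classpath and the rabbitConnectionFactory and exampleListener beans from the preceding configuration class; DirectContainerConfiguration is a hypothetical class name:

```java
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the injected beans are assumed to be defined elsewhere.
@Configuration
public class DirectContainerConfiguration {

    @Bean
    public DirectMessageListenerContainer directMessageListenerContainer(
            ConnectionFactory rabbitConnectionFactory, MessageListener exampleListener) {
        DirectMessageListenerContainer container =
            new DirectMessageListenerContainer(rabbitConnectionFactory);
        // Same queue and listener wiring as the SimpleMessageListenerContainer example.
        container.setQueueNames("some.queue");
        container.setMessageListener(exampleListener);
        return container;
    }
}
```

Only the container class changes here; which of the two containers suits a given workload is covered in Choosing a Container.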

Consumer Priority

Starting with RabbitMQ version 3.2, the broker supports consumer priority (see Using Consumer Priorities with RabbitMQ). This is enabled by setting the x-priority argument on the consumer. The SimpleMessageListenerContainer supports setting consumer arguments, as the following example shows:

container.setConsumerArguments(
    Collections.<String, Object>singletonMap("x-priority", Integer.valueOf(10)));

For convenience, the namespace provides the priority attribute on the listener element, as the following example shows:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

Starting with version 1.3, you can modify the queues on which the container listens at runtime. See Listener Container Queues.

auto-delete Queues

When a container is configured to listen to auto-delete queues, when the queue has an x-expires option, or when the Time-To-Live policy is configured on the broker, the queue is removed by the broker when the container is stopped (that is, when the last consumer is cancelled). Before version 1.3, the container could not be restarted because the queue was missing. The RabbitAdmin automatically redeclares queues and other elements only when the connection is closed or when it opens, which does not happen when the container is stopped and started.

Starting with version 1.3, the container uses a RabbitAdmin to redeclare any missing queues during startup.

You can also use conditional declaration (see Conditional Declaration) together with an auto-startup="false" admin to defer queue declaration until the container is started. The following example shows how to do so:

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

In this case, the queue and exchange are declared by containerAdmin, which has auto-startup="false" so that the elements are not declared during context initialization. Also, the container is not started for the same reason. When the container is later started, it uses its reference to containerAdmin to declare the elements.