Request/Reply Messaging

The AmqpTemplate also provides a variety of sendAndReceive methods that accept the same argument options that were described earlier for the one-way send operations (exchange, routingKey, and Message). Those methods are quite useful for request-reply scenarios, since they handle the configuration of the necessary reply-to property before sending and can listen for the reply message on an exclusive queue that is created internally for that purpose.

Similar request-reply methods are also available where the MessageConverter is applied to both the request and reply. Those methods are named convertSendAndReceive. See the Javadoc of AmqpTemplate for more detail.

Starting with version 1.5.0, each of the sendAndReceive method variants has an overloaded version that takes CorrelationData. Together with a properly configured connection factory, this enables the receipt of publisher confirms for the send side of the operation. See Correlated Publisher Confirms and Returns and the Javadoc for RabbitOperations for more information.

Starting with version 2.0, there are variants of these methods (convertSendAndReceiveAsType) that take an additional ParameterizedTypeReference argument to convert complex returned types. The template must be configured with a SmartMessageConverter. See Converting From a Message With RabbitTemplate for more information.
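As a rough sketch of the typed variant, the following assumes a JSON reply containing a list of strings and uses Jackson2JsonMessageConverter as the SmartMessageConverter. The class name, exchange name, routing key, and payload are placeholders invented for this illustration, and a reachable broker with a responding listener is assumed.

```java
import java.util.List;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.core.ParameterizedTypeReference;

// Illustrative only; names below are placeholders, not part of the framework.
final class TypedReplySketch {

    static List<String> fetchNames(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // The JSON converter is a SmartMessageConverter, so it can honor the requested type.
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        // Returns null if no reply arrives within the reply timeout.
        return template.convertSendAndReceiveAsType(
                "names.exchange",   // placeholder exchange
                "names.request",    // placeholder routing key
                "all",              // placeholder request payload
                new ParameterizedTypeReference<List<String>>() { });
    }
}
```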

Starting with version 2.1, you can configure the RabbitTemplate with the noLocalReplyConsumer option to control a noLocal flag for reply consumers. This is false by default.

Reply Timeout

By default, the send and receive methods time out after five seconds and return null. You can modify this behavior by setting the replyTimeout property. Starting with version 1.5, if you set the mandatory property to true (or the mandatory-expression evaluates to true for a particular message) and the message cannot be delivered to a queue, an AmqpMessageReturnedException is thrown. This exception has returnedMessage, replyCode, and replyText properties, as well as the exchange and routingKey used for the send.

This feature uses publisher returns. You can enable it by setting publisherReturns to true on the CachingConnectionFactory (see Publisher Confirms and Returns). Also, you must not have registered your own ReturnCallback with the RabbitTemplate.
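The following minimal sketch shows how the timeout and the mandatory flag might be combined. The class name, exchange name, routing key, and payload are placeholders, and the example assumes a running broker; it is one possible arrangement rather than a recommended configuration.

```java
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Illustrative only; names below are placeholders, not part of the framework.
final class ReplyTimeoutSketch {

    // Returns the converted reply, or null if the request timed out or was returned.
    static Object request(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherReturns(true); // needed for the mandatory check
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyTimeout(10_000);            // milliseconds; the default is 5000
        template.setMandatory(true);
        try {
            // null here means that no reply arrived within the timeout
            return template.convertSendAndReceive("orders", "order.lookup", "order-42");
        }
        catch (AmqpMessageReturnedException e) {
            // the request could not be routed to any queue
            System.err.println("Request returned: " + e.getReplyText());
            return null;
        }
    }
}
```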

Starting with version 2.1.2, a replyTimedOut method has been added, letting subclasses be informed of the timeout so that they can clean up any retained state.

Starting with versions 2.0.11 and 2.1.3, when you use the default DirectReplyToMessageListenerContainer, you can add an error handler by setting the template’s replyErrorHandler property. This error handler is invoked for any failed deliveries, such as late replies and messages received without a correlation header. The exception passed in is a ListenerExecutionFailedException, which has a failedMessage property.
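A reply error handler could be registered along the following lines. This is a sketch that only logs the failed message; the class name is a placeholder, and what to do with a late or uncorrelated reply is left to the application.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;

// Illustrative only; the class name is a placeholder.
final class ReplyErrorHandlerSketch {

    static RabbitTemplate template(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyErrorHandler(t -> {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException failed = (ListenerExecutionFailedException) t;
                // failedMessage holds the reply that could not be delivered to a caller
                System.err.println("Undeliverable reply: " + failed.getFailedMessage());
            }
        });
        return template;
    }
}
```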

RabbitMQ Direct reply-to

Starting with version 3.4.0, the RabbitMQ server supports direct reply-to. This eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1, direct reply-to is used by default (if supported by the server) instead of creating temporary reply queues. When no replyQueue is provided (or it is set with a name of amq.rabbitmq.reply-to), the RabbitTemplate automatically detects whether direct reply-to is supported and either uses it or falls back to using a temporary reply queue. When using direct reply-to, a reply-listener is not required and should not be configured.

Reply listeners are still supported with named queues (other than amq.rabbitmq.reply-to), allowing control of reply concurrency and so on.

Starting with version 1.6, if you wish to use a temporary, exclusive, auto-delete queue for each reply, set the useTemporaryReplyQueues property to true. This property is ignored if you set a replyAddress.

You can change the criteria that dictate whether to use direct reply-to by subclassing RabbitTemplate and overriding useDirectReplyTo() to check different criteria. The method is called once only, when the first request is sent.

Prior to version 2.0, the RabbitTemplate created a new consumer for each request and canceled the consumer when the reply was received (or timed out). Now the template uses a DirectReplyToMessageListenerContainer instead, letting the consumers be reused. The template still takes care of correlating the replies, so there is no danger of a late reply going to a different sender. If you want to revert to the previous behavior, set the useDirectReplyToContainer (direct-reply-to-container when using XML configuration) property to false.

The AsyncRabbitTemplate has no such option. It always uses a DirectReplyToMessageListenerContainer for replies when direct reply-to is used.

Starting with version 2.3.7, the template has a new property useChannelForCorrelation. When this is true, the server does not have to copy the correlation id from the request message headers to the reply message. Instead, the channel used to send the request is used to correlate the reply to the request.

Message Correlation With A Reply Queue

When using a fixed reply queue (other than amq.rabbitmq.reply-to), you must provide correlation data so that replies can be correlated to requests. See RabbitMQ Remote Procedure Call (RPC). By default, the standard correlationId property is used to hold the correlation data. However, if you wish to use a custom property to hold correlation data, you can set the correlation-key attribute on the <rabbit-template/>. Explicitly setting the attribute to correlationId is the same as omitting the attribute. The client and server must use the same header for correlation data.

Spring AMQP version 1.1 used a custom property called spring_reply_correlation for this data. If you wish to revert to this behavior with the current version (perhaps to maintain compatibility with another application using 1.1), you must set the attribute to spring_reply_correlation.

By default, the template generates its own correlation ID (ignoring any user-supplied value). If you wish to use your own correlation ID, set the RabbitTemplate instance’s userCorrelationId property to true.

The correlation ID must be unique to avoid the possibility of a wrong reply being returned for a request.
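To make the uniqueness requirement concrete, the following self-contained sketch models reply correlation using only JDK classes. It is not how RabbitTemplate is implemented; the ReplyCorrelator class and its methods are hypothetical and exist only for this illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not part of Spring AMQP.
final class ReplyCorrelator {

    // Pending requests, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a pending request under a freshly generated, unique correlation ID.
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        this.pending.put(correlationId, future);
        return correlationId;
    }

    // Completes the matching request; returns false for late or unknown replies.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = this.pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        return future.complete(body);
    }

    // Forgets a request that timed out, so that a late reply is not delivered to it.
    void expire(String correlationId) {
        this.pending.remove(correlationId);
    }
}
```

If two in-flight requests shared an ID, the second registration would overwrite the first in the map, and the first caller would never see its reply. That is the failure mode the uniqueness rule guards against.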

Reply Listener Container

When using RabbitMQ versions prior to 3.4.0, a new temporary queue is used for each reply. However, a single reply queue can be configured on the template, which can be more efficient and also lets you set arguments on that queue. In this case, however, you must also provide a <reply-listener/> sub element. This element provides a listener container for the reply queue, with the template being the listener. All of the Message Listener Container Configuration attributes allowed on a <listener-container/> are allowed on the element, except for connection-factory and message-converter, which are inherited from the template’s configuration.

If you run multiple instances of your application or use multiple RabbitTemplate instances, you MUST use a unique reply queue for each. RabbitMQ has no ability to select messages from a queue, so, if they all use the same queue, each instance would compete for replies and not necessarily receive their own.

The following example defines a rabbit template with a connection factory, a fixed reply queue, and a reply listener:

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

While the container and template share a connection factory, they do not share a channel. Therefore, requests and replies are not performed within the same transaction (if transactional).

Prior to version 1.5.0, the reply-address attribute was not available. Replies were always routed by using the default exchange and the reply-queue name as the routing key. This is still the default, but you can now specify the new reply-address attribute. The reply-address can contain an address with the form <exchange>/<routingKey>, in which case the reply is sent to the specified exchange and routed to a queue bound with that routing key. The reply-address has precedence over reply-queue. When only reply-address is in use, the <reply-listener> must be configured as a separate <listener-container> component. The reply-address and reply-queue (or queues attribute on the <listener-container>) must refer to the same queue logically.
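The two address forms can be pictured with a small JDK-only sketch. The ReplyAddressSketch class is hypothetical and is not the framework's own address handling; it only illustrates that a plain name is treated as a queue name on the default exchange, while the <exchange>/<routingKey> form names both parts.

```java
// Hypothetical illustration only; not the framework's address parsing.
final class ReplyAddressSketch {

    final String exchange;
    final String routingKey;

    private ReplyAddressSketch(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    // Splits "<exchange>/<routingKey>" at the first slash. A plain name is treated
    // as a queue name routed through the default (empty-named) exchange.
    static ReplyAddressSketch parse(String address) {
        int slash = address.indexOf('/');
        if (slash < 0) {
            return new ReplyAddressSketch("", address);
        }
        return new ReplyAddressSketch(address.substring(0, slash), address.substring(slash + 1));
    }
}
```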

With this configuration, a SimpleMessageListenerContainer is used to receive the replies, with the RabbitTemplate being the MessageListener. When defining a template with the <rabbit:template/> namespace element, as shown in the preceding example, the parser defines the container and wires in the template as the listener.

When the template does not use a fixed replyQueue (or is using direct reply-to — see RabbitMQ Direct reply-to), a listener container is not needed. Direct reply-to is the preferred mechanism when using RabbitMQ 3.4.0 or later.

If you define your RabbitTemplate as a <bean/> or use an @Configuration class to define it as an @Bean or when you create the template programmatically, you need to define and wire up the reply listener container yourself. If you fail to do this, the template never receives the replies and eventually times out and returns null as the reply to a call to a sendAndReceive method.

Starting with version 1.5, the RabbitTemplate detects if it has been configured as a MessageListener to receive replies. If not, attempts to send and receive messages with a reply address fail with an IllegalStateException (because the replies are never received).

Further, if a simple replyAddress (queue name) is used, the reply listener container verifies that it is listening to a queue with the same name. This check cannot be performed if the reply address is an exchange and routing key; in that case, a debug log message is written.

When wiring the reply listener and template yourself, it is important to ensure that the template’s replyAddress and the container’s queues (or queueNames) properties refer to the same queue. The template inserts the reply address into the outbound message replyTo property.

The following listing shows examples of how to manually wire up the beans:

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />

    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

A complete example of a RabbitTemplate wired with a fixed reply queue, together with a “remote” listener container that handles the request and returns the reply is shown in this test case.

When the reply times out (replyTimeout), the sendAndReceive() methods return null.

Prior to version 1.3.6, late replies for timed out messages were only logged. Now, if a late reply is received, it is rejected (the template throws an AmqpRejectAndDontRequeueException). If the reply queue is configured to send rejected messages to a dead letter exchange, the reply can be retrieved for later analysis. To do so, bind a queue to the configured dead letter exchange with a routing key equal to the reply queue’s name.

See the RabbitMQ Dead Letter Documentation for more information about configuring dead lettering. You can also take a look at the FixedReplyQueueDeadLetterTests test case for an example.
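As a sketch of the dead-letter arrangement described above, the following configuration declares the reply queue with a dead letter exchange and binds a second queue to that exchange using the reply queue's name as the routing key. The names reply.dlx and late.replies are placeholders chosen for this illustration.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Illustrative only; "reply.dlx" and "late.replies" are placeholder names.
@Configuration
class ReplyDeadLetterConfig {

    // Reply queue whose rejected (late) replies are sent to the dead letter exchange.
    @Bean
    Queue replyQueue() {
        return QueueBuilder.durable("my.reply.queue")
                .withArgument("x-dead-letter-exchange", "reply.dlx")
                .build();
    }

    @Bean
    DirectExchange replyDeadLetterExchange() {
        return new DirectExchange("reply.dlx");
    }

    // Queue that collects late replies for later analysis.
    @Bean
    Queue lateReplies() {
        return new Queue("late.replies");
    }

    // The routing key equals the reply queue's name, as described above.
    @Bean
    Binding lateRepliesBinding() {
        return BindingBuilder.bind(lateReplies())
                .to(replyDeadLetterExchange())
                .with("my.reply.queue");
    }
}
```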

Async Rabbit Template

Version 1.6 introduced the AsyncRabbitTemplate. This has similar sendAndReceive (and convertSendAndReceive) methods to those on the AmqpTemplate. However, instead of blocking, they return a CompletableFuture.

The sendAndReceive methods return a RabbitMessageFuture. The convertSendAndReceive methods return a RabbitConverterFuture.

You can either synchronously retrieve the result later, by invoking get() on the future, or you can register a callback that is called asynchronously with the result. The following listing shows both approaches:

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
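    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }
    catch (InterruptedException | TimeoutException e) {
        // get(timeout, unit) also declares these checked exceptions
        ...
    }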

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

If mandatory is set and the message cannot be delivered, the future throws an ExecutionException with a cause of AmqpMessageReturnedException, which encapsulates the returned message and information about the return.

If enableConfirms is set, the future has a property called confirm, which is itself a CompletableFuture<Boolean>, with true indicating a successful publish. If the confirm future completes with false, the RabbitFuture has a further property called nackCause, which contains the reason for the failure, if available.

The publisher confirm is discarded if it is received after the reply, since the reply implies a successful publish.
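A minimal sketch of inspecting the confirm might look as follows. It assumes the Spring AMQP 3.x package layout, a template already configured with enableConfirms, and a connection factory configured for publisher confirms; the class name is a placeholder.

```java
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitConverterFuture;

// Illustrative only; assumes enableConfirms is already set on the template.
final class AsyncConfirmSketch {

    static void send(AsyncRabbitTemplate template) {
        RabbitConverterFuture<String> future = template.convertSendAndReceive("foo");
        // The confirm is a separate future from the reply itself.
        future.getConfirm().whenComplete((ack, ex) -> {
            if (ex == null && Boolean.FALSE.equals(ack)) {
                // nackCause may be null when the broker gives no reason
                System.err.println("Publish was nacked: " + future.getNackCause());
            }
        });
    }
}
```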

You can set the receiveTimeout property on the template to time out replies (it defaults to 30000 milliseconds, that is, 30 seconds). If a timeout occurs, the future is completed with an AmqpReplyTimeoutException.

The template implements SmartLifecycle. Stopping the template while there are pending replies causes the pending Future instances to be canceled.

Starting with version 2.0, the asynchronous template now supports direct reply-to instead of a configured reply queue. To enable this feature, use one of the following constructors:

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

See RabbitMQ Direct reply-to to use direct reply-to with the synchronous RabbitTemplate.

Version 2.0 introduced variants of these methods (convertSendAndReceiveAsType) that take an additional ParameterizedTypeReference argument to convert complex returned types. You must configure the underlying RabbitTemplate with a SmartMessageConverter. See Converting From a Message With RabbitTemplate for more information.

Starting with version 3.0, the AsyncRabbitTemplate methods now return CompletableFuture instances instead of ListenableFuture instances.

Spring Remoting with AMQP

Spring remoting is no longer supported because the functionality has been removed from Spring Framework.

Use sendAndReceive operations with the RabbitTemplate (client side) and @RabbitListener (server side) instead.