Request/Reply Messaging
The AmqpTemplate also provides a variety of sendAndReceive methods that accept the same argument options that were described earlier for the one-way send operations (exchange, routingKey, and Message).
Those methods are quite useful for request-reply scenarios, since they handle the configuration of the necessary reply-to property before sending and can listen for the reply message on an exclusive queue that is created internally for that purpose.
Similar request-reply methods are also available where the MessageConverter is applied to both the request and reply.
Those methods are named convertSendAndReceive.
See the Javadoc of AmqpTemplate for more detail.
Starting with version 1.5.0, each of the sendAndReceive method variants has an overloaded version that takes CorrelationData.
Together with a properly configured connection factory, this enables the receipt of publisher confirms for the send side of the operation.
See Correlated Publisher Confirms and Returns and the Javadoc for RabbitOperations for more information.
Starting with version 2.0, there are variants of these methods (convertSendAndReceiveAsType) that take an additional ParameterizedTypeReference argument to convert complex returned types.
The template must be configured with a SmartMessageConverter.
See Converting From a Message With RabbitTemplate for more information.
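As a rough illustration of the typed variant, the sketch below asks for a List<String> reply. The exchange name, routing key, payload, and the TypedReplyExample class are made up for the example, and Jackson2JsonMessageConverter is only one SmartMessageConverter that could be used. It assumes a service on the other side that replies with a JSON array of strings.

```java
import java.util.List;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.core.ParameterizedTypeReference;

// Hypothetical helper class; all names and destinations are illustrative only.
class TypedReplyExample {

    static List<String> fetchNames(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // A SmartMessageConverter is required so the generic type can be honored.
        template.setMessageConverter(new Jackson2JsonMessageConverter());

        // The anonymous subclass captures List<String> as the expected reply type.
        // The call returns null if no reply arrives within the reply timeout.
        return template.convertSendAndReceiveAsType(
                "names.exchange", "names.list", "all",
                new ParameterizedTypeReference<List<String>>() { });
    }
}
```

In an application the template would normally be a shared bean rather than being created per call; it is built inline here only to keep the sketch short.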
Starting with version 2.1, you can configure the RabbitTemplate with the noLocalReplyConsumer option to control a noLocal flag for reply consumers.
This is false by default.
Reply Timeout
By default, the send and receive methods time out after five seconds and return null.
You can modify this behavior by setting the replyTimeout property.
Starting with version 1.5, if you set the mandatory property to true (or the mandatory-expression evaluates to true for a particular message) and the message cannot be delivered to a queue, an AmqpMessageReturnedException is thrown.
This exception has returnedMessage, replyCode, and replyText properties, as well as the exchange and routingKey used for the send.
This feature uses publisher returns.
You can enable it by setting publisherReturns to true on the CachingConnectionFactory (see Publisher Confirms and Returns).
Also, you must not have registered your own ReturnCallback with the RabbitTemplate.
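The following sketch shows one way the reply timeout and the mandatory flag might be combined. The MandatoryRequestExample class, the destination names, and the ten-second timeout are assumptions made for illustration; the sketch also assumes a broker is reachable through the supplied connection factory.

```java
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Hypothetical helper class; exchange and routing key names are illustrative only.
class MandatoryRequestExample {

    static RabbitTemplate configure(CachingConnectionFactory connectionFactory) {
        // Publisher returns must be enabled for undeliverable requests to be reported.
        connectionFactory.setPublisherReturns(true);

        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyTimeout(10_000); // wait up to ten seconds instead of the default five
        template.setMandatory(true);      // fail fast when the request cannot be routed
        return template;
    }

    static Object request(RabbitTemplate template, Object payload) {
        try {
            // Returns null if no reply arrives within the reply timeout.
            return template.convertSendAndReceive("orders.exchange", "orders.status", payload);
        }
        catch (AmqpMessageReturnedException e) {
            // The request was not routed to any queue.
            System.err.println("Request returned: " + e.getReplyCode() + " " + e.getReplyText()
                    + " (exchange=" + e.getExchange() + ", routingKey=" + e.getRoutingKey() + ")");
            return null;
        }
    }
}
```

A caller therefore has two distinct failure signals: the exception means the request never reached a queue, whereas a null return means it was sent but no reply arrived in time.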
Starting with version 2.1.2, a replyTimedOut method has been added, letting subclasses be informed of the timeout so that they can clean up any retained state.
Starting with versions 2.0.11 and 2.1.3, when you use the default DirectReplyToMessageListenerContainer, you can add an error handler by setting the template’s replyErrorHandler property.
This error handler is invoked for any failed deliveries, such as late replies and messages received without a correlation header.
The exception passed in is a ListenerExecutionFailedException, which has a failedMessage property.
RabbitMQ Direct reply-to
Starting with version 3.4.0, the RabbitMQ server supports direct reply-to.
This eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request).
Starting with Spring AMQP version 1.4.1, direct reply-to is used by default (if supported by the server) instead of creating temporary reply queues.
When no replyQueue is provided (or it is set with a name of amq.rabbitmq.reply-to), the RabbitTemplate automatically detects whether direct reply-to is supported and either uses it or falls back to using a temporary reply queue.
When using direct reply-to, a reply-listener is not required and should not be configured.
Reply listeners are still supported with named queues (other than amq.rabbitmq.reply-to), allowing control of reply concurrency and so on.
Starting with version 1.6, if you wish to use a temporary, exclusive, auto-delete queue for each reply, set the useTemporaryReplyQueues property to true.
This property is ignored if you set a replyAddress.
You can change the criteria that dictate whether to use direct reply-to by subclassing RabbitTemplate and overriding useDirectReplyTo() to check different criteria.
The method is called only once, when the first request is sent.
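A subclass along the following lines is one way to do this. The class name and the app.rabbit.disable-direct-reply-to system property are invented for the example; only the overridden useDirectReplyTo() method comes from RabbitTemplate.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Hypothetical subclass; the system property below is an assumption, not a Spring AMQP setting.
class ConfigurableReplyTemplate extends RabbitTemplate {

    ConfigurableReplyTemplate(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }

    @Override
    protected boolean useDirectReplyTo() {
        // Let an operator switch force the fallback to temporary reply queues...
        if (Boolean.getBoolean("app.rabbit.disable-direct-reply-to")) {
            return false;
        }
        // ...otherwise keep the template's own detection of server support.
        return super.useDirectReplyTo();
    }
}
```

Because the method is evaluated when the first request is sent, changing the property afterwards would have no effect on an already-used template.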
Prior to version 2.0, the RabbitTemplate created a new consumer for each request and canceled the consumer when the reply was received (or timed out).
Now the template uses a DirectReplyToMessageListenerContainer instead, letting the consumers be reused.
The template still takes care of correlating the replies, so there is no danger of a late reply going to a different sender.
If you want to revert to the previous behavior, set the useDirectReplyToContainer (direct-reply-to-container when using XML configuration) property to false.
The AsyncRabbitTemplate has no such option.
It always uses a DirectReplyToMessageListenerContainer for replies when direct reply-to is used.
Starting with version 2.3.7, the template has a new property useChannelForCorrelation.
When this is true, the server does not have to copy the correlation id from the request message headers to the reply message.
Instead, the channel used to send the request is used to correlate the reply to the request.
Message Correlation With A Reply Queue
When using a fixed reply queue (other than amq.rabbitmq.reply-to), you must provide correlation data so that replies can be correlated to requests.
See RabbitMQ Remote Procedure Call (RPC).
By default, the standard correlationId property is used to hold the correlation data.
However, if you wish to use a custom property to hold correlation data, you can set the correlation-key attribute on the <rabbit-template/>.
Explicitly setting the attribute to correlationId is the same as omitting the attribute.
The client and server must use the same header for correlation data.
Spring AMQP version 1.1 used a custom property called spring_reply_correlation for this data.
If you wish to revert to this behavior with the current version (perhaps to maintain compatibility with another application using 1.1), you must set the attribute to spring_reply_correlation.
By default, the template generates its own correlation ID (ignoring any user-supplied value).
If you wish to use your own correlation ID, set the RabbitTemplate instance’s userCorrelationId property to true.
The correlation ID must be unique to avoid the possibility of a wrong reply being returned for a request.
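To see why uniqueness matters, it can help to model correlation with plain JDK classes. The ReplyCorrelator class below is a toy written for this explanation and is not how Spring AMQP implements it: each request registers a future under a freshly generated ID, a reply completes the future stored under its ID, and a reply whose ID is no longer registered (for example, because the sender already timed out) is refused.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of reply correlation; a hypothetical class, not part of Spring AMQP.
class ReplyCorrelator {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns its unique correlation ID. */
    String register() {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, new CompletableFuture<>());
        return correlationId;
    }

    /** Delivers a reply; returns false when no sender is waiting for that ID. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            return false; // unknown or late reply: a real client would reject it
        }
        future.complete(body);
        return true;
    }

    /** Waits for the reply and returns null on timeout, then forgets the request. */
    String awaitReply(String correlationId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException | ExecutionException e) {
            return null;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            pending.remove(correlationId);
        }
    }
}
```

If two in-flight requests shared an ID, the second registration would overwrite the first in the map and one sender would receive the other's reply, which is the failure the uniqueness requirement guards against.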
Reply Listener Container
When using RabbitMQ versions prior to 3.4.0, a new temporary queue is used for each reply.
However, a single reply queue can be configured on the template, which can be more efficient and also lets you set arguments on that queue.
In this case, however, you must also provide a <reply-listener/> sub element.
This element provides a listener container for the reply queue, with the template being the listener.
All of the Message Listener Container Configuration attributes allowed on a <listener-container/> are allowed on the element, except for connection-factory and message-converter, which are inherited from the template’s configuration.
If you run multiple instances of your application or use multiple RabbitTemplate instances, you MUST use a unique reply queue for each.
RabbitMQ has no ability to select messages from a queue, so, if they all use the same queue, each instance would compete for replies and not necessarily receive their own.
The following example defines a rabbit template with a connection factory:
<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>
While the container and template share a connection factory, they do not share a channel. Therefore, requests and replies are not performed within the same transaction (if transactional).
Prior to version 1.5.0, the reply-address attribute was not available.
Replies were always routed by using the default exchange and the reply-queue name as the routing key.
This is still the default, but you can now specify the new reply-address attribute.
The reply-address can contain an address of the form <exchange>/<routingKey>, in which case the reply is routed to the specified exchange and then to a queue bound with that routing key.
The reply-address has precedence over reply-queue.
When only reply-address is in use, the <reply-listener> must be configured as a separate <listener-container> component.
The reply-address and reply-queue (or queues attribute on the <listener-container>) must refer to the same queue logically.
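To make the two address forms concrete, the sketch below splits a reply address into an exchange and a routing key using only the JDK. The ReplyAddress class is a hypothetical stand-in written for this explanation (Spring AMQP has its own Address type for this purpose), and it only models the two cases described above: a bare queue name, which goes through the default exchange, and the <exchange>/<routingKey> form.

```java
// Hypothetical stand-in for illustration; not the Spring AMQP Address class.
final class ReplyAddress {

    final String exchange;
    final String routingKey;

    private ReplyAddress(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    /** Splits "exchange/routingKey"; a bare name means default exchange plus queue name. */
    static ReplyAddress parse(String address) {
        int slash = address.indexOf('/');
        if (slash < 0) {
            // No exchange given: the default (nameless) exchange routes by queue name.
            return new ReplyAddress("", address);
        }
        return new ReplyAddress(address.substring(0, slash), address.substring(slash + 1));
    }
}
```

With the earlier XML example, "replyEx/routeReply" yields the exchange replyEx and the routing key routeReply, so the replies queue must be bound to replyEx with that key for the listener to see any replies.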
With this configuration, a SimpleMessageListenerContainer is used to receive the replies, with the RabbitTemplate being the MessageListener.
When defining a template with the <rabbit:template/> namespace element, as shown in the preceding example, the parser defines the container and wires in the template as the listener.
When the template does not use a fixed replyQueue (or is using direct reply-to; see RabbitMQ Direct reply-to), a listener container is not needed.
Direct reply-to is the preferred mechanism when using RabbitMQ 3.4.0 or later.
If you define your RabbitTemplate as a <bean/>, use an @Configuration class to define it as an @Bean, or create the template programmatically, you need to define and wire up the reply listener container yourself.
If you fail to do this, the template never receives the replies and eventually times out and returns null as the reply to a call to a sendAndReceive method.
Starting with version 1.5, the RabbitTemplate detects if it has been configured as a MessageListener to receive replies.
If not, attempts to send and receive messages with a reply address fail with an IllegalStateException (because the replies are never received).
Further, if a simple replyAddress (queue name) is used, the reply listener container verifies that it is listening to a queue with the same name.
This check cannot be performed if the reply address is an exchange and routing key; in that case, a debug log message is written.
When wiring the reply listener and template yourself, it is important to ensure that the template’s replyAddress and the container’s queues (or queueNames) properties refer to the same queue.
The template inserts the reply address into the outbound message replyTo property.
The following listing shows examples of how to manually wire up the beans:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(msgConv());
    rabbitTemplate.setReplyAddress(replyQueue().getName());
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(amqpTemplate());
    return container;
}
@Bean
public Queue replyQueue() {
    return new Queue("my.reply.queue");
}
A complete example of a RabbitTemplate wired with a fixed reply queue, together with a “remote” listener container that handles the request and returns the reply, is shown in this test case.
When the reply times out (replyTimeout), the sendAndReceive() methods return null.
Prior to version 1.3.6, late replies for timed out messages were only logged.
Now, if a late reply is received, it is rejected (the template throws an AmqpRejectAndDontRequeueException).
If the reply queue is configured to send rejected messages to a dead letter exchange, the reply can be retrieved for later analysis.
To do so, bind a queue to the configured dead letter exchange with a routing key equal to the reply queue’s name.
See the RabbitMQ Dead Letter Documentation for more information about configuring dead lettering.
You can also take a look at the FixedReplyQueueDeadLetterTests test case for an example.
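A configuration along these lines is one way to set that up. The reply.dlx exchange, the late.replies queue, and the ReplyDeadLetterConfig class are names invented for this sketch; it also assumes a RabbitAdmin is present in the application context so that the queues, exchange, and binding are declared on the broker.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration; the exchange and late-reply queue names are illustrative only.
@Configuration
class ReplyDeadLetterConfig {

    @Bean
    public Queue replyQueue() {
        // Rejected (late) replies are forwarded to the dead letter exchange.
        return QueueBuilder.durable("my.reply.queue")
                .withArgument("x-dead-letter-exchange", "reply.dlx")
                .build();
    }

    @Bean
    public DirectExchange replyDeadLetterExchange() {
        return new DirectExchange("reply.dlx");
    }

    @Bean
    public Queue lateReplies() {
        return new Queue("late.replies");
    }

    @Bean
    public Binding lateRepliesBinding() {
        // No dead letter routing key is set, so the original routing key
        // (the reply queue's name) is kept and used as the binding key here.
        return BindingBuilder.bind(lateReplies())
                .to(replyDeadLetterExchange())
                .with("my.reply.queue");
    }
}
```

Late replies can then be inspected by consuming from late.replies at leisure, without affecting the live reply queue.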
Async Rabbit Template
Version 1.6 introduced the AsyncRabbitTemplate.
This has similar sendAndReceive (and convertSendAndReceive) methods to those on the AmqpTemplate.
However, instead of blocking, they return a CompletableFuture.
The sendAndReceive methods return a RabbitMessageFuture.
The convertSendAndReceive methods return a RabbitConverterFuture.
You can either synchronously retrieve the result later, by invoking get() on the future, or you can register a callback that is called asynchronously with the result.
The following listing shows both approaches:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
    ...
    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
    // do some more work
    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }
    ...
}
public void doSomeWorkAndGetResultAsync() {
    ...
    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });
    ...
}
If mandatory is set and the message cannot be delivered, the future throws an ExecutionException with a cause of AmqpMessageReturnedException, which encapsulates the returned message and information about the return.
If enableConfirms is set, the future has a property called confirm, which is itself a CompletableFuture<Boolean> with true indicating a successful publish.
If the confirm future is false, the RabbitFuture has a further property called nackCause, which contains the reason for the failure, if available.
The publisher confirm is discarded if it is received after the reply, since the reply implies a successful publish.
You can set the receiveTimeout property on the template to time out replies (it defaults to 30000 milliseconds, that is, 30 seconds).
If a timeout occurs, the future is completed with an AmqpReplyTimeoutException.
The template implements SmartLifecycle.
Stopping the template while there are pending replies causes the pending Future instances to be canceled.
Starting with version 2.0, the asynchronous template now supports direct reply-to instead of a configured reply queue. To enable this feature, use one of the following constructors:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
See RabbitMQ Direct reply-to to use direct reply-to with the synchronous RabbitTemplate.
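As a minimal sketch of the first constructor, the configuration below creates an asynchronous template that uses direct reply-to and shortens the receive timeout, and a small client reacts to a timed-out reply. The class names, the orders.exchange and orders.status destinations, and the ten-second timeout are assumptions for illustration, and a ConnectionFactory bean is assumed to be defined elsewhere.

```java
import java.util.concurrent.CompletableFuture;

import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration; destinations and timeout are illustrative only.
@Configuration
class AsyncDirectReplyToConfig {

    @Bean
    public AsyncRabbitTemplate asyncTemplate(ConnectionFactory connectionFactory) {
        // No reply queue or reply container is supplied, so direct reply-to is used.
        AsyncRabbitTemplate template =
                new AsyncRabbitTemplate(connectionFactory, "orders.exchange", "orders.status");
        template.setReceiveTimeout(10_000); // ten seconds instead of the default thirty
        return template;
    }
}

// Hypothetical caller showing how a timed-out reply can be told apart from other failures.
class OrderStatusClient {

    private final AsyncRabbitTemplate asyncTemplate;

    OrderStatusClient(AsyncRabbitTemplate asyncTemplate) {
        this.asyncTemplate = asyncTemplate;
    }

    void requestStatus(String orderId) {
        CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive(orderId);
        future.whenComplete((reply, ex) -> {
            if (ex instanceof AmqpReplyTimeoutException) {
                System.err.println("No reply for order " + orderId + " within the receive timeout");
            }
            else if (ex != null) {
                System.err.println("Request for order " + orderId + " failed: " + ex);
            }
            else {
                System.out.println("Status of order " + orderId + ": " + reply);
            }
        });
    }
}
```

Because the template is registered as a bean, the application context drives its SmartLifecycle start and stop; when it is created by hand, it needs to be started before use.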
Version 2.0 introduced variants of these methods (convertSendAndReceiveAsType) that take an additional ParameterizedTypeReference argument to convert complex returned types.
You must configure the underlying RabbitTemplate with a SmartMessageConverter.
See Converting From a Message With RabbitTemplate for more information.