Reply Management
The existing support in MessageListenerAdapter
already lets your method have a non-void return type.
When that is the case, the result of the invocation is encapsulated in a message sent to the address specified in the ReplyToAddress
header of the original message, or to the default address configured on the listener.
You can set that default address by using the @SendTo
annotation of the messaging abstraction.
Assuming our processOrder method should now return an OrderStatus, we can write it as follows to automatically send a reply:
@RabbitListener(queues = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
If you need to set additional headers in a transport-independent manner, you can return a Message instead, as in the following example:
@RabbitListener(queues = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
Alternatively, you can use a MessagePostProcessor
in the beforeSendReplyPostProcessors
container factory property to add more headers.
Starting with version 2.2.3, the called bean/method is made available in the reply message, which can be used in a message post processor to communicate the information back to the caller:
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return msg;
});
Starting with version 2.2.5, you can configure a ReplyPostProcessor
to modify the reply message before it is sent; it is called after the correlationId
header has been set up to match the request.
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
Starting with version 3.0, you can configure the post processor on the container factory instead of on the annotation.
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
The id
parameter is the listener id.
A setting on the annotation will supersede the factory setting.
The @SendTo value is interpreted as a reply exchange and routingKey pair that follows the exchange/routingKey pattern, where either part can be omitted.
The valid values are as follows:
- thing1/thing2: The replyTo exchange and the routingKey.
- thing1/: The replyTo exchange and the default (empty) routingKey.
- thing2 or /thing2: The replyTo routingKey and the default (empty) exchange.
- / or empty: The replyTo default exchange and the default routingKey.
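As an illustration of the pattern rules above, the following plain-Java sketch splits a @SendTo value into an exchange and a routing key. It is not the framework's implementation: the SendToPatternSketch class, its ReplyAddress holder, and the parse method are hypothetical names introduced only for this example, and the sketch assumes that the value is split at the first / it contains.

```java
public class SendToPatternSketch {

    /** Hypothetical holder for the two parts of a reply address. */
    public static final class ReplyAddress {
        public final String exchange;
        public final String routingKey;

        public ReplyAddress(String exchange, String routingKey) {
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    /** Splits an exchange/routingKey pattern; a missing part becomes the empty string. */
    public static ReplyAddress parse(String sendTo) {
        if (sendTo == null || sendTo.trim().isEmpty()) {
            // Empty pattern: default exchange and default routing key.
            return new ReplyAddress("", "");
        }
        int slash = sendTo.indexOf('/');
        if (slash < 0) {
            // No separator: the whole value is the routing key on the default exchange.
            return new ReplyAddress("", sendTo);
        }
        // Assumption of this sketch: split at the first '/' only.
        return new ReplyAddress(sendTo.substring(0, slash), sendTo.substring(slash + 1));
    }

    public static void main(String[] args) {
        String[] samples = {"thing1/thing2", "thing1/", "thing2", "/thing2", "/", ""};
        for (String sample : samples) {
            ReplyAddress address = parse(sample);
            System.out.println("'" + sample + "' -> exchange='" + address.exchange
                    + "', routingKey='" + address.routingKey + "'");
        }
    }
}
```

Running main prints one line per valid form from the list, which makes it easy to compare each input with the exchange and routing key it is expected to produce.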
You can also use @SendTo without a value attribute, which is equivalent to an empty sendTo pattern.
@SendTo
is used only if the inbound message does not have a replyToAddress
property.
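The precedence described here can be sketched in plain Java as follows. This is only an illustration of the rule, not framework code: the ReplyAddressPrecedenceSketch class and its resolve method are hypothetical, and throwing an exception when neither address is available is an assumption of the sketch.

```java
public class ReplyAddressPrecedenceSketch {

    /**
     * Picks the address a reply would be sent to. The reply address carried by
     * the request wins; the @SendTo default is used only as a fallback.
     *
     * @param requestReplyTo the reply address from the inbound message, or null if absent
     * @param sendToDefault  the @SendTo value, or null if the annotation is not present
     */
    public static String resolve(String requestReplyTo, String sendToDefault) {
        if (requestReplyTo != null && !requestReplyTo.isEmpty()) {
            return requestReplyTo;
        }
        if (sendToDefault != null) {
            // An empty default is still valid: default exchange, default routing key.
            return sendToDefault;
        }
        // Assumption of this sketch: with neither value present the reply cannot be routed.
        throw new IllegalStateException("No reply address on the request and no default configured");
    }

    public static void main(String[] args) {
        System.out.println(resolve("client.replies", "status")); // prints "client.replies"
        System.out.println(resolve(null, "status"));             // prints "status"
    }
}
```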
Starting with version 1.5, the @SendTo
value can be a bean initialization SpEL Expression, as shown in the following example:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
The expression must evaluate to a String, which can be either a simple queue name (sent to the default exchange) or a value of the form exchange/routingKey, as described earlier in this section.
The #{…} expression is evaluated once, during initialization.
For dynamic reply routing, the message sender should include a reply_to
message property or use the alternate
runtime SpEL expression (described after the next example).
Starting with version 1.6, the @SendTo value can be a SpEL expression that is evaluated at runtime against the request and reply, as the following example shows:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
The runtime nature of the SpEL expression is indicated with !{…}
delimiters.
The evaluation context #root
object for the expression has three properties:
- request: The o.s.amqp.core.Message request object.
- source: The o.s.messaging.Message<?> after conversion.
- result: The method result.
The context has a map property accessor, a standard type converter, and a bean resolver, which lets other beans in the
context be referenced (for example, @someBeanName.determineReplyQ(request, result)
).
In summary, #{…}
is evaluated once during initialization, with the #root
object being the application context.
Beans are referenced by their names.
!{…}
is evaluated at runtime for each message, with the root object having the properties listed earlier.
Beans are referenced with their names, prefixed by @
.
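To make the distinction between the two delimiters concrete, the following plain-Java sketch classifies a @SendTo value by how it would be treated. It is a simplified illustration and not the framework's parsing logic: the SendToExpressionKindSketch class, the Kind enum, and the classify method are hypothetical names, and the sketch assumes that an expression occupies the whole value.

```java
public class SendToExpressionKindSketch {

    /** How a @SendTo value is treated in this sketch. */
    public enum Kind {
        /** #{...}: evaluated once, during initialization. */
        INITIALIZATION,
        /** !{...}: evaluated at runtime, for each message. */
        RUNTIME,
        /** Anything else: used as a literal exchange/routingKey pattern. */
        LITERAL
    }

    /** Classifies the value by its delimiters (whole-value expressions only). */
    public static Kind classify(String value) {
        if (value.startsWith("#{") && value.endsWith("}")) {
            return Kind.INITIALIZATION;
        }
        if (value.startsWith("!{") && value.endsWith("}")) {
            return Kind.RUNTIME;
        }
        return Kind.LITERAL;
    }

    public static void main(String[] args) {
        System.out.println(classify("#{spelReplyTo}"));                                   // prints "INITIALIZATION"
        System.out.println(classify("!{'some.reply.queue.with.' + result.queueName}"));   // prints "RUNTIME"
        System.out.println(classify("status"));                                           // prints "LITERAL"
    }
}
```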
Starting with version 2.1, simple property placeholders are also supported (for example, ${some.reply.to}
).
With earlier versions, you can use the following workaround:
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}