Reply Management

The existing support in MessageListenerAdapter already lets your method have a non-void return type. When that is the case, the result of the invocation is encapsulated in a message sent to the address specified in the ReplyToAddress header of the original message, or to the default address configured on the listener. You can set that default address by using the @SendTo annotation of the messaging abstraction.

Assuming our processOrder method should now return an OrderStatus, we can write it as follows to automatically send a reply:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

If you need to set additional headers in a transport-independent manner, you could return a Message instead, something like the following:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

Alternatively, you can use a MessagePostProcessor in the beforeSendReplyMessagePostProcessors container factory property to add more headers. Starting with version 2.2.3, the called bean/method is made available in the reply message, which can be used in a message post processor to communicate the information back to the caller:

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

Starting with version 2.2.5, you can configure a ReplyPostProcessor to modify the reply message before it is sent; it is called after the correlationId header has been set up to match the request.

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

Starting with version 3.0, you can configure the post processor on the container factory instead of on the annotation.

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

The id parameter is the listener id.

A setting on the annotation will supersede the factory setting.

The @SendTo value is assumed as a reply exchange and routingKey pair that follows the exchange/routingKey pattern, where one of those parts can be omitted. The valid values are as follows:

  • thing1/thing2: The replyTo exchange and the routingKey. thing1/: The replyTo exchange and the default (empty) routingKey. thing2 or /thing2: The replyTo routingKey and the default (empty) exchange. / or empty: The replyTo default exchange and the default routingKey.

Also, you can use @SendTo without a value attribute. This case is equal to an empty sendTo pattern. @SendTo is used only if the inbound message does not have a replyToAddress property.

Starting with version 1.5, the @SendTo value can be a bean initialization SpEL Expression, as shown in the following example:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

The expression must evaluate to a String, which can be a simple queue name (sent to the default exchange) or with the form exchange/routingKey as discussed prior to the preceding example.

The #{…​} expression is evaluated once, during initialization.

For dynamic reply routing, the message sender should include a reply_to message property or use the alternate runtime SpEL expression (described after the next example).

Starting with version 1.6, the @SendTo can be a SpEL expression that is evaluated at runtime against the request and reply, as the following example shows:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

The runtime nature of the SpEL expression is indicated with !{…​} delimiters. The evaluation context #root object for the expression has three properties:

  • request: The o.s.amqp.core.Message request object.

  • source: The o.s.messaging.Message<?> after conversion.

  • result: The method result.

The context has a map property accessor, a standard type converter, and a bean resolver, which lets other beans in the context be referenced (for example, @someBeanName.determineReplyQ(request, result)).

In summary, #{…​} is evaluated once during initialization, with the #root object being the application context. Beans are referenced by their names. !{…​} is evaluated at runtime for each message, with the root object having the properties listed earlier. Beans are referenced with their names, prefixed by @.

Starting with version 2.1, simple property placeholders are also supported (for example, ${some.reply.to}). With earlier versions, the following can be used as a work around, as the following example shows:

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}