Sending Messages

When sending a message, you can use any of the following methods:

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

We begin with the last method in the preceding listing, since it is the most explicit. It lets an AMQP exchange name (along with a routing key) be provided at runtime. The last parameter is the Message instance to send. The following example shows how to use this method to send a message:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
    new Message("12.34".getBytes(), someProperties));

You can set the exchange property on the template itself if you plan to use that template instance to send to the same exchange most or all of the time. In such cases, you can use the second method in the preceding listing. The following example is functionally equivalent to the previous example:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

If both the exchange and routingKey properties are set on the template, you can use the method that accepts only the Message. The following example shows how to do so:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

A better way of thinking about the exchange and routing key properties is that explicit method parameters always override the template’s default values. Even if you do not explicitly set those properties on the template, default values are always in place. In both cases, the default is an empty String, which is a sensible default.

As far as the routing key is concerned, it is not always necessary in the first place (for example, for a Fanout exchange). Furthermore, a queue may be bound to an exchange with an empty String. Both are legitimate scenarios for relying on the default empty String value for the template’s routing key property.

As far as the exchange name is concerned, the empty String is commonly used because the AMQP specification defines the “default exchange” as having no name. Since all queues are automatically bound to that default exchange (which is a direct exchange), using their name as the binding value, the second method in the preceding listing can be used for simple point-to-point messaging to any queue through the default exchange. You can provide the queue name as the routingKey by passing it as the method parameter at runtime. The following example shows how to do so:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

Alternatively, you can create a template that is used for publishing primarily or exclusively to a single Queue. The following example shows how to do so:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
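
The override rule described above can be modelled without a broker. The following is a minimal stand-alone sketch, not the RabbitTemplate source: the class TemplateDefaultsSketch and its sent list are hypothetical names introduced for illustration, and the message is reduced to a byte array. It only shows how the narrower send overloads fall back to template defaults that start out as empty strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a template; NOT the Spring AMQP RabbitTemplate.
class TemplateDefaultsSketch {

    // Both defaults start as the empty string, as described in the text.
    private String exchange = "";
    private String routingKey = "";

    // Records "exchange|routingKey" for every send so the resolution is visible.
    final List<String> sent = new ArrayList<>();

    void setExchange(String exchange) { this.exchange = exchange; }

    void setRoutingKey(String routingKey) { this.routingKey = routingKey; }

    // No explicit arguments: both template defaults apply.
    void send(byte[] body) { send(this.exchange, this.routingKey, body); }

    // An explicit routing key overrides only the template's routing key.
    void send(String routingKey, byte[] body) { send(this.exchange, routingKey, body); }

    // Fully explicit: neither template default is consulted.
    void send(String exchange, String routingKey, byte[] body) {
        sent.add(exchange + "|" + routingKey);
    }

    public static void main(String[] args) {
        byte[] body = "12.34".getBytes(StandardCharsets.UTF_8);
        TemplateDefaultsSketch template = new TemplateDefaultsSketch();
        template.send("queue.helloWorld", body);            // default (no-name) exchange
        template.setExchange("marketData.topic");
        template.setRoutingKey("quotes.nasdaq.FOO");
        template.send(body);                                 // both template defaults
        template.send("other.exchange", "other.key", body);  // both overridden
        template.sent.forEach(System.out::println);
    }
}
```

In this model the first send resolves to the empty exchange name with the queue name as routing key, which corresponds to point-to-point delivery through the default exchange.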

Message Builder API

Starting with version 1.3, a message builder API is provided by the MessageBuilder and MessagePropertiesBuilder classes. These classes provide a convenient “fluent” means of creating a message or message properties. The following examples show the fluent API in action:

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();

MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

Each of the properties defined on the MessageProperties can be set. Other methods include setHeader(String key, Object value), removeHeader(String key), removeHeaders(), and copyProperties(MessageProperties properties). Each property setting method has a set*IfAbsent() variant. In the cases where a default initial value exists, the method is named set*IfAbsentOrDefault().
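
The difference between the plain setters and the set*IfAbsent() and set*IfAbsentOrDefault() variants can be illustrated with a small model. This is a sketch under stated assumptions, not the MessageBuilder source: the class IfAbsentSketch is hypothetical, "absent" is assumed to mean null, and "default" is assumed to mean that the property still holds its initial value (the content type "application/octet-stream" is used here as an assumed default).

```java
// Hypothetical model of a fluent builder; NOT the Spring AMQP MessageBuilder.
class IfAbsentSketch {

    // Assumed initial value for a property that has a default.
    static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";

    private String contentType = DEFAULT_CONTENT_TYPE;
    private String messageId; // no default: starts out absent (null)

    // Plain setter: always overwrites.
    IfAbsentSketch setMessageId(String messageId) {
        this.messageId = messageId;
        return this;
    }

    // IfAbsent variant: writes only when nothing has been set yet.
    IfAbsentSketch setMessageIdIfAbsent(String messageId) {
        if (this.messageId == null) {
            this.messageId = messageId;
        }
        return this;
    }

    // IfAbsentOrDefault variant: also writes when the value is still the default.
    IfAbsentSketch setContentTypeIfAbsentOrDefault(String contentType) {
        if (this.contentType == null || this.contentType.equals(DEFAULT_CONTENT_TYPE)) {
            this.contentType = contentType;
        }
        return this;
    }

    String getMessageId() { return this.messageId; }

    String getContentType() { return this.contentType; }
}
```

With this model, calling setMessageIdIfAbsent("123") and then setMessageIdIfAbsent("456") leaves the message id at "123", because the second call finds a value already present.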

Five static methods are provided to create an initial message builder:

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)

(1) The message created by the builder has a body that is a direct reference to the argument.
(2) The message created by the builder has a body that is a new array containing a copy of bytes in the argument.
(3) The message created by the builder has a body that is a new array containing the range of bytes from the argument. See Arrays.copyOfRange() for more details.
(4) The message created by the builder has a body that is a direct reference to the body of the argument. The argument’s properties are copied to a new MessageProperties object.
(5) The message created by the builder has a body that is a new array containing a copy of the argument’s body. The argument’s properties are copied to a new MessageProperties object.
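
The practical consequence of "direct reference" versus "new array" shows up when the caller mutates the original byte array after building. The following stand-alone sketch uses plain arrays rather than the builder itself; the class BodyCopySketch and its method names are hypothetical and only mirror the three body-handling behaviours described in the callouts above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper that mirrors the three body-handling behaviours;
// NOT part of Spring AMQP.
class BodyCopySketch {

    // Like a direct reference: the same array instance is kept.
    static byte[] direct(byte[] body) {
        return body;
    }

    // Like a cloned body: a new array holding a copy of all bytes.
    static byte[] cloned(byte[] body) {
        return Arrays.copyOf(body, body.length);
    }

    // Like the (body, from, to) form: a new array with the range [from, to).
    static byte[] range(byte[] body, int from, int to) {
        return Arrays.copyOfRange(body, from, to);
    }

    public static void main(String[] args) {
        byte[] original = "foo".getBytes(StandardCharsets.US_ASCII);
        byte[] shared = direct(original);
        byte[] copy = cloned(original);

        original[0] = (byte) 'b'; // mutate the caller's array afterwards

        System.out.println(new String(shared, StandardCharsets.US_ASCII)); // sees the change
        System.out.println(new String(copy, StandardCharsets.US_ASCII));   // unaffected
    }
}
```

A cloned body costs one extra array copy per message, so the reference-keeping form is the cheaper choice when the caller does not reuse the array.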

Three static methods are provided to create a MessagePropertiesBuilder instance:

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)

(1) A new message properties object is initialized with default values.
(2) The builder is initialized with, and build() will return, the provided properties object.
(3) The argument’s properties are copied to a new MessageProperties object.

With the RabbitTemplate implementation of AmqpTemplate, each of the send() methods has an overloaded version that takes an additional CorrelationData object. When publisher confirms are enabled, this object is returned in the callback described in AmqpTemplate. This lets the sender correlate a confirm (ack or nack) with the sent message.

Starting with version 1.6.7, the CorrelationAwareMessagePostProcessor interface was introduced, allowing the correlation data to be modified after the message has been converted. The interface declares the following method:

Message postProcessMessage(Message message, Correlation correlation);

In version 2.0, this interface is deprecated. The method has been moved to MessagePostProcessor with a default implementation that delegates to postProcessMessage(Message message).

Also starting with version 1.6.7, a new callback interface called CorrelationDataPostProcessor is provided. It is invoked after all MessagePostProcessor instances (those provided in the send() method as well as those provided in setBeforePublishPostProcessors()). Implementations can update or replace the correlation data supplied in the send() method (if any). The Message and original CorrelationData (if any) are provided as arguments. The interface declares the following method:

CorrelationData postProcess(Message message, CorrelationData correlationData);
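
One possible use of this callback is to guarantee that every published message carries correlation data, even when the caller supplied none. The sketch below is stand-alone and hedged: Message, CorrelationData, and CorrelationDataPostProcessor are local stand-ins with the same method shape as the signature above, not the library types, and the ENSURE_ID constant is a hypothetical name.

```java
import java.util.UUID;

// Stand-alone illustration; the nested types are local stand-ins,
// NOT the Spring AMQP classes of the same name.
class CorrelationSketch {

    static final class Message {
        final byte[] body;

        Message(byte[] body) { this.body = body; }
    }

    static final class CorrelationData {
        final String id;

        CorrelationData(String id) { this.id = id; }
    }

    // Same method shape as the callback signature shown above.
    interface CorrelationDataPostProcessor {
        CorrelationData postProcess(Message message, CorrelationData correlationData);
    }

    // Keeps correlation data that was supplied; otherwise creates one
    // with a random id so that a later confirm can still be correlated.
    static final CorrelationDataPostProcessor ENSURE_ID = (message, correlationData) ->
            correlationData != null
                    ? correlationData
                    : new CorrelationData(UUID.randomUUID().toString());
}
```

Returning the supplied object unchanged when it exists keeps the caller's own identifier intact, so only the "none supplied" case is affected.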

Publisher Returns

When the template’s mandatory property is true, returned messages are provided by the callback described in AmqpTemplate.

Starting with version 1.4, the RabbitTemplate supports the SpEL mandatoryExpression property, which is evaluated against each request message as the root evaluation object, resolving to a boolean value. Bean references, such as @myBean.isMandatory(#root), can be used in the expression.

Publisher returns can also be used internally by the RabbitTemplate in send and receive operations. See Reply Timeout for more information.

Batching

Version 1.4.2 introduced the BatchingRabbitTemplate. This is a subclass of RabbitTemplate with an overridden send method that batches messages according to the BatchingStrategy. Only when a batch is complete is the message sent to RabbitMQ. The following listing shows the BatchingStrategy interface definition:

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}

Batched data is held in memory. Unsent messages can be lost in the event of a system failure.

A SimpleBatchingStrategy is provided. It supports sending messages to a single exchange and routing key combination. It has the following properties:

  • batchSize: The number of messages in a batch before it is sent.

  • bufferLimit: The maximum size of the batched message. This preempts the batchSize, if exceeded, and causes a partial batch to be sent.

  • timeout: A time after which a partial batch is sent when there is no new activity adding messages to the batch.

The SimpleBatchingStrategy formats the batch by preceding each embedded message with a four-byte binary length. This is communicated to the receiving system by setting the springBatchFormat message property to lengthHeader4.
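
The length-prefixed layout described above can be sketched with java.nio.ByteBuffer. This is an illustration of the framing idea only, not the SimpleBatchingStrategy source: the class LengthHeader4Sketch and its encode and decode methods are hypothetical, and big-endian byte order (the ByteBuffer default) is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical encoder/decoder for a "four-byte length, then body" layout;
// NOT the Spring AMQP implementation. Byte order is assumed to be big-endian.
class LengthHeader4Sketch {

    // Concatenates the bodies, preceding each with its length as a 4-byte int.
    static byte[] encode(List<byte[]> bodies) {
        int size = 0;
        for (byte[] body : bodies) {
            size += 4 + body.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (byte[] body : bodies) {
            buffer.putInt(body.length);
            buffer.put(body);
        }
        return buffer.array();
    }

    // Splits a batch back into the individual bodies by reading each length header.
    static List<byte[]> decode(byte[] batch) {
        ByteBuffer buffer = ByteBuffer.wrap(batch);
        List<byte[]> bodies = new ArrayList<>();
        while (buffer.hasRemaining()) {
            int length = buffer.getInt();
            byte[] body = new byte[length];
            buffer.get(body);
            bodies.add(body);
        }
        return bodies;
    }
}
```

Because each embedded message carries its own length, the receiver can split the batch without any delimiter scanning, and bodies may contain arbitrary bytes.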

Batched messages are automatically de-batched by listener containers by default (by using the springBatchFormat message header). Rejecting any message from a batch causes the entire batch to be rejected.

See @RabbitListener with Batching for more information on consuming batches.