Sending Messages
When sending a message, you can use any of the following methods:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
We can begin our discussion with the last method in the preceding listing, since it is actually the most explicit.
It lets an AMQP exchange name (along with a routing key) be provided at runtime.
The last parameter is the callback that is responsible for actual creating the message instance.
An example of using this method to send a message might look like this:
The following example shows how to use the send
method to send a message:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
You can set the exchange
property on the template itself if you plan to use that template instance to send to the same exchange most or all of the time.
In such cases, you can use the second method in the preceding listing.
The following example is functionally equivalent to the previous example:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
If both the exchange
and routingKey
properties are set on the template, you can use the method that accepts only the Message
.
The following example shows how to do so:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
A better way of thinking about the exchange and routing key properties is that the explicit method parameters always override the template’s default values.
In fact, even if you do not explicitly set those properties on the template, there are always default values in place.
In both cases, the default is an empty String
, but that is actually a sensible default.
As far as the routing key is concerned, it is not always necessary in the first place (for example, for
a Fanout
exchange).
Furthermore, a queue may be bound to an exchange with an empty String
.
Those are both legitimate scenarios for reliance on the default empty String
value for the routing key property of the template.
As far as the exchange name is concerned, the empty String
is commonly used because the AMQP specification defines the “default exchange” as having no name.
Since all queues are automatically bound to that default exchange (which is a direct exchange), using their name as the binding value, the second method in the preceding listing can be used for simple point-to-point messaging to any queue through the default exchange.
You can provide the queue name as the routingKey
, either by providing the method parameter at runtime.
The following example shows how to do so:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
Alternately, you can create a template that can be used for publishing primarily or exclusively to a single Queue. The following example shows how to do so:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
Message Builder API
Starting with version 1.3, a message builder API is provided by the MessageBuilder
and MessagePropertiesBuilder
.
These methods provide a convenient “fluent” means of creating a message or message properties.
The following examples show the fluent API in action:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
Each of the properties defined on the MessageProperties
can be set.
Other methods include setHeader(String key, String value)
, removeHeader(String key)
, removeHeaders()
, and copyProperties(MessageProperties properties)
.
Each property setting method has a set*IfAbsent()
variant.
In the cases where a default initial value exists, the method is named set*IfAbsentOrDefault()
.
Five static methods are provided to create an initial message builder:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | The message created by the builder has a body that is a direct reference to the argument. |
2 | The message created by the builder has a body that is a new array containing a copy of bytes in the argument. |
3 | The message created by the builder has a body that is a new array containing the range of bytes from the argument.
See Arrays.copyOfRange() for more details. |
4 | The message created by the builder has a body that is a direct reference to the body of the argument.
The argument’s properties are copied to a new MessageProperties object. |
5 | The message created by the builder has a body that is a new array containing a copy of the argument’s body.
The argument’s properties are copied to a new MessageProperties object. |
Three static methods are provided to create a MessagePropertiesBuilder
instance:
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | A new message properties object is initialized with default values. |
2 | The builder is initialized with, and build() will return, the provided properties object., |
3 | The argument’s properties are copied to a new MessageProperties object. |
With the RabbitTemplate
implementation of AmqpTemplate
, each of the send()
methods has an overloaded version that takes an additional CorrelationData
object.
When publisher confirms are enabled, this object is returned in the callback described in AmqpTemplate
.
This lets the sender correlate a confirm (ack
or nack
) with the sent message.
Starting with version 1.6.7, the CorrelationAwareMessagePostProcessor
interface was introduced, allowing the correlation data to be modified after the message has been converted.
The following example shows how to use it:
Message postProcessMessage(Message message, Correlation correlation);
In version 2.0, this interface is deprecated.
The method has been moved to MessagePostProcessor
with a default implementation that delegates to postProcessMessage(Message message)
.
Also starting with version 1.6.7, a new callback interface called CorrelationDataPostProcessor
is provided.
This is invoked after all MessagePostProcessor
instances (provided in the send()
method as well as those provided in setBeforePublishPostProcessors()
).
Implementations can update or replace the correlation data supplied in the send()
method (if any).
The Message
and original CorrelationData
(if any) are provided as arguments.
The following example shows how to use the postProcess
method:
CorrelationData postProcess(Message message, CorrelationData correlationData);
Publisher Returns
When the template’s mandatory
property is true
, returned messages are provided by the callback described in AmqpTemplate
.
Starting with version 1.4, the RabbitTemplate
supports the SpEL mandatoryExpression
property, which is evaluated against each request message as the root evaluation object, resolving to a boolean
value.
Bean references, such as @myBean.isMandatory(#root)
, can be used in the expression.
Publisher returns can also be used internally by the RabbitTemplate
in send and receive operations.
See Reply Timeout for more information.
Batching
Version 1.4.2 introduced the BatchingRabbitTemplate
.
This is a subclass of RabbitTemplate
with an overridden send
method that batches messages according to the BatchingStrategy
.
Only when a batch is complete is the message sent to RabbitMQ.
The following listing shows the BatchingStrategy
interface definition:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
Batched data is held in memory. Unsent messages can be lost in the event of a system failure. |
A SimpleBatchingStrategy
is provided.
It supports sending messages to a single exchange or routing key.
It has the following properties:
-
batchSize
: The number of messages in a batch before it is sent. -
bufferLimit
: The maximum size of the batched message. This preempts thebatchSize
, if exceeded, and causes a partial batch to be sent. -
timeout
: A time after which a partial batch is sent when there is no new activity adding messages to the batch.
The SimpleBatchingStrategy
formats the batch by preceding each embedded message with a four-byte binary length.
This is communicated to the receiving system by setting the springBatchFormat
message property to lengthHeader4
.
Batched messages are automatically de-batched by listener containers by default (by using the springBatchFormat message header).
Rejecting any message from a batch causes the entire batch to be rejected.
|
However, see @RabbitListener with Batching for more information.