In this chapter, we will explore the interfaces and classes that are the essential components for developing applications with Spring AMQP.
Spring AMQP consists of a handful of modules, each represented by a
JAR in the distribution. These modules are: spring-amqp, spring-rabbit and
spring-erlang. The 'spring-amqp' module contains the
org.springframework.amqp.core
package. Within that
package, you will find the classes that represent the core AMQP "model".
Our intention is to provide generic abstractions that do not rely on any
particular AMQP broker implementation or client library. End user code
will be more portable across vendor implementations as it can be developed
against the abstraction layer only. These abstractions are then used
implemented by broker-specific modules, such as 'spring-rabbit'. For the
1.0 release there is only a RabbitMQ implementation however the
abstractions have been validated in .NET using Apache Qpid in addition to
RabbitMQ. Since AMQP operates at the protocol level in principle the
RabbitMQ client can be used with any broker that supports the same
protocol version, but we do not test any other brokers at present.
The overview here assumes that you are already familiar with the basics of the AMQP specification already. If you are not, then have a look at the resources listed in Part IV, “Other Resources”
The 0-8 and 0-9-1 AMQP specifications do not define a Message
class or interface. Instead, when performing an operation such as
'
', the content is passed as a byte-array argument and
additional properties are passed in as separate arguments. Spring AMQP
defines a Message class as part of a more general AMQP domain model
representation. The purpose of the Message class is to simply
encapsulate the body and properties within a single instance so that the
API can in turn be simpler. The Message class definition is quite
straightforward.basicPublish
public class Message { private final MessageProperties messageProperties; private final byte[] body; public Message(byte[] body, MessageProperties messageProperties) { this.body = body; this.messageProperties = messageProperties; } public byte[] getBody() { return this.body; } public MessageProperties getMessageProperties() { return this.messageProperties; } }
The MessageProperties
interface
defines several common properties such as 'messageId', 'timestamp',
'contentType', and several more. Those properties can also be extended
with user-defined 'headers' by calling the setHeader(String
key, Object value)
method.
The Exchange
interface represents
an AMQP Exchange, which is what a Message Producer sends to. Each
Exchange within a virtual host of a broker will have a unique name as
well as a few other properties:
public interface Exchange { String getName(); String getExchangeType(); boolean isDurable(); boolean isAutoDelete(); Map<String, Object> getArguments(); }
As you can see, an Exchange also has a 'type' represented by constants
defined in ExchangeTypes
. The basic types are:
Direct
, Topic
,
Fanout
, and Headers
.
In the core package you will find
implementations of the Exchange
interface
for each of those types. The behavior varies across these Exchange types
in terms of how they handle bindings to Queues. For example, a Direct exchange allows
for a Queue to be bound by a fixed routing key (often the Queue's name).
A Topic exchange supports bindings with routing patterns that may
include the '*' and '#' wildcards for 'exactly-one' and 'zero-or-more',
respectively. The Fanout exchange publishes to all Queues that are bound
to it without taking any routing key into consideration. For much more
information about these and the other Exchange types, check out Part IV, “Other Resources”.
Note | |
---|---|
The AMQP specification also requires that any broker provide a "default" Direct Exchange that has no name. All Queues that are declared will be bound to that default Exchange with their names as routing keys. You will learn more about the default Exchange's usage within Spring AMQP in Section 3.3, “AmqpTemplate”. |
The Queue
class represents the component
from which a Message Consumer receives Messages. Like the various
Exchange classes, our implementation is intended to be an abstract
representation of this core AMQP type.
public class Queue { private final String name; private volatile boolean durable; private volatile boolean exclusive; private volatile boolean autoDelete; private volatile Map<String, Object> arguments; /** * The queue is durable, non-exclusive and non auto-delete. * * @param name the name of the queue. */ public Queue(String name) { this(name, true, false, false); } // Getters and Setters omitted for brevity
Notice that the constructor takes the Queue name. Depending on the implementation, the admin template may provide methods for generating a uniquely named Queue. Such Queues can be useful as a "reply-to" address or other temporary situations. For that reason, the 'exclusive' and 'autoDelete' properties of an auto-generated Queue would both be set to 'true'.
Note | |
---|---|
See the section on queues in Section 3.8, “Configuring the broker” for information about declaring queues using namespace support, including queue arguments. |
Given that a producer sends to an Exchange and a consumer receives
from a Queue, the bindings that connect Queues to Exchanges are critical
for connecting those producers and consumers via messaging. In Spring
AMQP, we define a Binding
class to represent
those connections. Let's review the basic options for binding Queues to
Exchanges.
You can bind a Queue to a DirectExchange with a fixed routing key.
new Binding(someQueue, someDirectExchange, "foo.bar")
You can bind a Queue to a TopicExchange with a routing pattern.
new Binding(someQueue, someTopicExchange, "foo.*")
You can bind a Queue to a FanoutExchange with no routing key.
new Binding(someQueue, someFanoutExchange)
We also provide a BindingBuilder
to
facilitate a "fluent API" style.
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
Note | |
---|---|
The BindingBuilder class is shown above for clarity, but this style works well when using a static import for the 'bind()' method. |
By itself, an instance of the Binding class is just holding the
data about a connection. In other words, it is not an "active"
component. However, as you will see later in Section 3.8, “Configuring the broker”, Binding instances can be used by the
AmqpAdmin
class to actually trigger the
binding actions on the broker. Also, as you will see in that same
section, the Binding instances can be defined using Spring's
@Bean
-style within @Configuration
classes. There is also a convenient base class which further simplifies
that approach for generating AMQP-related bean definitions and
recognizes the Queues, Exchanges, and Bindings so that they will all be
declared on the AMQP broker upon application startup.
The AmqpTemplate
is also defined
within the core package. As one of the main components involved in actual
AMQP messaging, it is discussed in detail in its own section (see Section 3.3, “AmqpTemplate”).
Whereas the AMQP model we described in the previous section is generic and applicable to all implementations, when we get into the management of resources, the details are specific to the broker implementation. Therefore, in this section, we will be focusing on code that exists only within our "spring-rabbit" module since at this point, RabbitMQ is the only supported implementation.
The central component for managing a connection to the
RabbitMQ broker is the
ConnectionFactory
interface. The
responsibility of a
ConnectionFactory
implementation is
to provide an instance of
org.springframework.amqp.rabbit.connection.Connection
which is a wrapper for
com.rabbitmq.client.Connection
. The
only concrete implementation we provide is
CachingConnectionFactory
which, by default, establishes a
single connection proxy that can be shared by the application. Sharing
of the connection is possible since the "unit of work" for
messaging with AMQP is actually a "channel" (in some ways, this is
similar to the relationship between a Connection and a Session in
JMS). As you can imagine, the connection instance provides a
createChannel
method. The
CachingConnectionFactory
implementation
supports caching of those channels, and it maintains separate
caches for channels based on whether they are transactional or not.
When creating an instance of CachingConnectionFactory
,
the 'hostname' can be provided via the constructor. The 'username' and
'password' properties should be provided as well. If you would like to
configure the size of the channel cache (the default is 1), you could
call the setChannelCacheSize()
method here as well.
Starting with version 1.3, the CachingConnectionFactory
can be configured to cache connections as well as just channels. In this case, each call to
createConnection()
creates a new connection (or retrieves an idle one from the cache).
Closing a connection returns it to the cache (if the cache size has not been reached).
Channels created on such connections are cached too. The use of separate connections might be
useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to
connect to different cluster members.
Important | |
---|---|
When the cache mode is
Also, at the time of writing, the |
It is important to understand that the cache size is (by default) not a limit, but merely the number of channels that can be cached. With a cache size of, say, 10, any number of channels can actually be in use. If more than 10 channels are being used and they are all returned to the cache, 10 will go in the cache; the remainder will be physically closed.
Starting with version 1.4.2, the CachingConnectionFactory
has a property channelCheckoutTimeout
. When this property is greater than zero,
the channelCacheSize
becomes a limit on the number of channels that can be created
on a connection. If the limit is reached, calling threads will block until a channel is available
or this timeout is reached, in which case a AmqpTimeoutException
is thrown.
Caution | |
---|---|
Channels used within the framework (e.g. RabbitTemplate ) will be reliably
returned to the cache. If you create channels outside of the framework, (e.g. by accessing
the connection(s) directly and invoking createChannel() ), you must return them (by closing)
reliably, perhaps in a finally block, to avoid running out of channels.
|
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.createConnection();
When using XML, the configuration might look like this:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="somehost"/> <property name="username" value="guest"/> <property name="password" value="guest"/> </bean>
Note | |
---|---|
There is also a SingleConnectionFactory
implementation which is only available in the unit test code of the framework.
It is simpler than CachingConnectionFactory since it does
not cache channels, but it is not intended for practical usage
outside of simple tests due to its lack of performance and resilience.
If you find a need to implement your own ConnectionFactory
for some reason, the AbstractConnectionFactory
base class may provide a nice starting point. |
A ConnectionFactory
can
be created quickly and conveniently using the rabbit namespace:
<rabbit:connection-factory id="connectionFactory"/>
In most cases this
will be preferable since the framework can choose the best
defaults for you. The created instance will be a
CachingConnectionFactory
.
Keep in mind that the default cache size for channels is 1.
If you want more channels to be cached set a larger value via the
'channelCacheSize' property. In XML it would look like this:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="somehost"/> <property name="username" value="guest"/> <property name="password" value="guest"/> <property name="channelCacheSize" value="25"/> </bean>
And with the namespace you can just add the 'channel-cache-size' attribute:
<rabbit:connection-factory id="connectionFactory" channel-cache-size="25"/>
The default cache mode is CHANNEL, but you can configure it to cache
connections instead; in this case, we use connection-cache-size
:
<rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
Host and port attributes can be provided using the namespace
<rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>
Alternatively, if running in a clustered environment, use the addresses attribute.
<rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>
The CachingConnectionFactory
uses an instance of the Rabbit
client ConnectionFactory
; a number of configuration properties
are passed through (host, port, userName, password, requestedHeartBeat,
connectionTimeout
for
example) when setting the equivalent property on the CachingConnectionFactory
. To set other properties (clientProperties
for example),
define an instance of the rabbit factory and provide a reference to it using
the appropriate constructor of the CachingConnectionFactory
.
When using the namespace as described above, provide a reference to the configured
factory in the connection-factory
attribute. For convenience, a
factory bean is provided to assist in configuring the connection factory in a
Spring application context, as discussed in the next section.
<rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
Starting with version 1.4, a convenient RabbitConnectionFactoryBean
is provided to enable convenient configuration
of SSL properties on the underlying client connection factory, using dependency injection.
Other setters simply delegate to the underlying
factory. Previously you had to configure the SSL options programmatically.
<rabbit:connection-factory id="rabbitConnectionFactory" connection-factory="clientConnectionFactory" host="${host}" port="${port}" virtual-host="${vhost}" username="${username}" password="${password}" /> <bean id="clientConnectionFactory" class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean"> <property name="useSSL" value="true" /> <property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/> </bean>
Refer to the RabbitMQ Documentation
for information about configuring SSL. Omit the sslPropertiesLocation
property
to connect over SSL without certificate validation. When using certificate validation, the property
is a Spring Resource
pointing to a properties file containing the following
keys:
keyStore=file:/secret/keycert.p12 trustStore=file:/secret/trustStore keyStore.passPhrase=secret trustStore.passPhrase=secret
The keyStore
and truststore
are Spring Resources
pointing to the stores. Typically this properties file will be secured by the operating system
with the application having read access.
Starting with version 1.3, the AbstractRoutingConnectionFactory
has been introduced. This provides a mechanism to configure mappings for several
ConnectionFactories
and determine a target ConnectionFactory
by some lookupKey
at runtime. Typically, the implementation checks a thread-bound context.
For convenience, Spring AMQP provides the SimpleRoutingConnectionFactory
,
which gets the current thread-bound lookupKey
from the SimpleResourceHolder
:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory"> <property name="targetConnectionFactories"> <map> <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/> <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/> </map> </property> </bean> <rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService { @Autowired private RabbitTemplate rabbitTemplate; public void service(String vHost, String payload) { SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost); rabbitTemplate.convertAndSend(payload); SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory()); } }
It is important to unbind the resource after use.
For more information see the JavaDocs of AbstractRoutingConnectionFactory
.
Starting with version 1.4, the RabbitTemplate
supports
the SpEL sendConnectionFactorySelectorExpression
and
receiveConnectionFactorySelectorExpression
properties, which are evaluated on each AMQP
protocol interaction operation (send
, sendAndReceive
, receive
or
receiveAndReply
), resolving to a lookupKey
value for the provided
AbstractRoutingConnectionFactory
. Bean references, such as
"@vHostResolver.getVHost(#root)"
can be used in the expression. For send
operations, the Message to be sent is the root evaluation object; for receive
operations, the
queueName is the root evaluation object.
The routing algorithm is: If the selector expression
is null
, or is evaluated to null
, or the provided
ConnectionFactory
isn't an instance of
AbstractRoutingConnectionFactory
, everything works as before, relying on the provided
ConnectionFactory
implementation. The same occurs if the
evaluation result isn't null
, but there is no target
ConnectionFactory
for that lookupKey
and the
AbstractRoutingConnectionFactory
is configured with
lenientFallback = true
. Of course, in the case of an
AbstractRoutingConnectionFactory
it does fallback to its routing
implementation based on determineCurrentLookupKey()
. But, if
lenientFallback = false
, an IllegalStateException
is thrown.
The Namespace support also provides the send-connection-factory-selector-expression
and receive-connection-factory-selector-expression
attributes
on the <rabbit:template>
component.
Also starting with version 1.4, you can configure a routing connection factory
in a SimpleMessageListenerContainer
. In that case, the list of queue names
is used as the lookup key. For example, if you configure the container with
setQueueNames("foo, bar")
, the lookup key will be "[foo,bar]"
(no spaces).
Confirmed and returned messages are supported by setting the
CachingConnectionFactory
's publisherConfirms
and publisherReturns
properties to 'true' respectively.
Channel
s created by
the factory are wrapped in an PublisherCallbackChannel
which is used to facilitate the callbacks. When such a channel is obtained, the
client can register a PublisherCallbackChannel.Listener
with the Channel
. The
PublisherCallbackChannel
implementation contains logic
to route a confirm/return to the appropriate listener.
These features are explained further in the following sections.
Tip | |
---|---|
For some more background information, please see the following blog post by the RabbitMQ team titled Introducing Publisher Confirms. |
As with many other high-level abstractions provided by the Spring
Framework and related projects, Spring AMQP provides a "template" that
plays a central role. The interface that defines the main operations is
called AmqpTemplate
. Those operations cover
the general behavior for sending and receiving Messages. In other words,
they are not unique to any implementation, hence the "AMQP" in the name.
On the other hand, there are implementations of that interface that are
tied to implementations of the AMQP protocol. Unlike JMS, which is an
interface-level API itself, AMQP is a wire-level protocol. The
implementations of that protocol provide their own client libraries, so
each implementation of the template interface will depend on a particular
client library. Currently, there is only a single implementation:
RabbitTemplate
. In the examples that follow, you
will often see usage of an "AmqpTemplate", but when you look at the
configuration examples, or any code excerpts where the template is
instantiated and/or setters are invoked, you will see the implementation
type (e.g. "RabbitTemplate").
As mentioned above, the AmqpTemplate
interface defines all of the basic operations for sending and receiving
Messages. We will explore Message sending and reception, respectively, in
the two sections that follow.
Starting with version 1.3 you can now configure the
RabbitTemplate
to use a RetryTemplate
to help with handling problems with broker connectivity. Refer to the
spring-retry
project for complete information; the following is just one example that uses
an exponential back off policy and the default SimpleRetryPolicy
which will make three attempts before throwing the exception to the caller.
Using the XML namespace:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="500" /> <property name="multiplier" value="10.0" /> <property name="maxInterval" value="10000" /> </bean> </property> </bean>
Using @Configuration
:
@Bean public AmqpTemplate rabbitTemplate(); RabbitTemplate template = new RabbitTemplate(connectionFactory()); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; }
Starting with version 1.4, in addition to the retryTemplate
property,
the recoveryCallback
option is supported on the RabbitTemplate
.
It is used as a second argument for the
RetryTemplate.execute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T>recoveryCallback)
.
Note | |
---|---|
The RecoveryCallback is somewhat limited in that the retry context
only contains the lastThrowable field. For more sophisticated use cases, you should
use an external RetryTemplate so that you can convey additional information
to the RecoveryCallback via the context's attributes:
|
retryTemplate.execute( new RetryCallback<Object, Exception>() { @Override public Object doWithRetry(RetryContext context) throws Exception { context.setAttribute("message", message); return rabbitTemplate.convertAndSend(exchange, routingKey, message); } }, new RecoveryCallback<Object>() { @Override public Object recover(RetryContext context) throws Exception { Object message = context.getAttribute("message"); Throwable t = context.getLastThrowable(); // Do something with message return null; } }); }
In this case, you would not inject a RetryTemplate
into the RabbitTemplate
.
The RabbitTemplate
implementation of AmqpTemplate
supports Publisher Confirms and Returns.
For returned messages, the template's
mandatory
property must be set to 'true', and it requires
a CachingConnectionFactory
that has its
publisherReturns
property set to true
(see Section 3.2.4, “Publisher Confirms and Returns”).
Returns are sent to
to the client by it registering a RabbitTemplate.ReturnCallback
by calling setReturnCallback(ReturnCallback callback)
. The callback
must implement this method:
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
Only one ReturnCallback
is supported by each
RabbitTemplate
.
For Publisher Confirms (aka Publisher Acknowledgements), the template requires
a CachingConnectionFactory
that has its
publisherConfirms
property set to true. Confirms are sent to
to the client by it registering a RabbitTemplate.ConfirmCallback
by calling setConfirmCallback(ConfirmCallback callback)
. The callback
must implement this method:
Note | |
---|---|
When a rabbit template send operation completes, the channel is closed; this would preclude the reception
of confirms or returns in the case when the connection factory cache is full (when there is space in
the cache, the channel is not physically closed and the returns/confirms will proceed as normal).
When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time
for the confirms/returns to be received. When using confirms, the channel will be closed when the
last confirm is received. When using only returns, the channel will remain open for the full
5 seconds. It is generally recommended to set the
connection factory's channelCacheSize to a large enough value so that the channel on which a
message is published is returned to the cache instead of being closed.
|
void confirm(CorrelationData correlationData, boolean ack, String cause);
The CorrelationData
is an object supplied by the client when sending the
original message. This is described further in the next section. The ack
is true for
an ack
and false for a nack
. For nack
s, the cause may
contain a reason for the nack, if it is available when the nack
is generated. An example
is when sending a message to a non-existent exchange. In that case the broker closes the channel; the
reason for the closure is included in the cause
. cause
was added in
version 1.4.
Only one ConfirmCallback
is supported by a
RabbitTemplate
.
When sending a Message, one can use any of the following methods:
void send(Message message) throws AmqpException; void send(String routingKey, Message message) throws AmqpException; void send(String exchange, String routingKey, Message message) throws AmqpException;
We can begin our discussion with the last method listed above since it is actually the most explicit. It allows an AMQP Exchange name to be provided at runtime along with a routing key. The last parameter is the callback that is responsible for actual creating of the Message instance. An example of using this method to send a Message might look this this:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
The "exchange" property can be set on the template itself if you plan to use that template instance to send to the same exchange most or all of the time. In such cases, the second method listed above may be used instead. The following example is functionally equivalent to the previous one:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
If both the "exchange" and "routingKey" properties are set on the
template, then the method accepting only the
Message
may be used:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.setRoutingKey("quotes.nasdaq.FOO"); amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
A better way of thinking about the exchange and routing key properties is that the explicit method parameters will always override the template's default values. In fact, even if you do not explicitly set those properties on the template, there are always default values in place. In both cases, the default is an empty String, but that is actually a sensible default. As far as the routing key is concerned, it's not always necessary in the first place (e.g. a Fanout Exchange). Furthermore, a Queue may be bound to an Exchange with an empty String. Those are both legitimate scenarios for reliance on the default empty String value for the routing key property of the template. As far as the Exchange name is concerned, the empty String is quite commonly used because the AMQP specification defines the "default Exchange" as having no name. Since all Queues are automatically bound to that default Exchange (which is a Direct Exchange) using their name as the binding value, that second method above can be used for simple point-to-point Messaging to any Queue through the default Exchange. Simply provide the queue name as the "routingKey" - either by providing the method parameter at runtime:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
Or, if you prefer to create a template that will be used for publishing primarily or exclusively to a single Queue, the following is perfectly reasonable:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue template.send(new Message("Hello World".getBytes(), someProperties));
Starting with version 1.3, a message builder API is
provided by the MessageBuilder
and MessagePropertiesBuilder
;
they provides a convenient "fluent" means of creating a message or message properties:
Message message = MessageBuilder.withBody("foo".getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build();
or
MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build(); Message message = MessageBuilder.withBody("foo".getBytes()) .andProperties(props) .build();
Each of the properties defined on the
MessageProperies can be set. Other methods include setHeader(String key, String value)
,
removeHeader(String key)
, removeHeaders()
, and
copyProperties(MessageProperties properties)
. Each property setting method has a
set*IfAbsent()
variant. In the cases where a default initial value exists, the method is named
set*IfAbsentOrDefault()
.
Five static methods are provided to create an initial message builder:
public static MessageBuilder withBody(byte[] body) public static MessageBuilder withClonedBody(byte[] body) public static MessageBuilder withBody(byte[] body, int from, int to) public static MessageBuilder fromMessage(Message message) public static MessageBuilder fromClonedMessage(Message message)
The message created by the builder will have a body that is a direct reference to the argument. | |
The message created by the builder will have a body that is a new array containing a copy of bytes in the argument. | |
The message created by the builder will have a body that is a new array containing the range
of bytes from the argument. See | |
The message created by the builder will have a body that is a direct reference to the
body of the argument.
The argument's properties are copied to a new a | |
The message created by the builder will have a body that is a new array containing a copy of
the argument's body.
The argument's properties are copied to a new a |
Three static methods are provided to create an initial message properties builder:
public static MessagePropertiesBuilder newInstance() public static MessagePropertiesBuilder fromProperties(MessageProperties properties) public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)
With the RabbitTemplate
implementation of AmqpTemplate
,
each of the send()
methods has an overloaded version that takes an
additional CorrelationData
object. When publisher confirms
are enabled, this object is returned in the callback described in
Section 3.3, “AmqpTemplate”. This allows the sender to correlate
a confirm (ack or nack) with the sent message.
When the template's mandatory
property is 'true' returned messages are provided
by the callback described in Section 3.3, “AmqpTemplate”.
Starting with version 1.4 the RabbitTemplate
supports
the SpEL mandatoryExpression
property, which is evaluated against each request message, as the root
evaluation object, resolving to a boolean
value. Bean references, such as
"@myBean.isMandatory(#root)"
can be used in the expression.
Message reception is always a bit more complicated than sending. The
reason is that there are two ways to receive a
Message
. The simpler option is to poll for a single
Message
at a time with a polling
method call. The more complicated yet more common approach is to register
a listener that will receive Messages
on-demand,
asynchronously. We will look at an example of each approach in the next
two sub-sections.
The AmqpTemplate
itself can be used
for polled Message reception. If no message is available, null
is returned
immediately; there is no blocking. There are two simple 'receive' methods
available. As with the Exchange on the sending side, there is a method
that requires a default queue property having been set directly on the template
itself, and there is a method that accepts a queue parameter at
runtime.
Message receive() throws AmqpException; Message receive(String queueName) throws AmqpException;
Just like in the case of sending messages, the
AmqpTemplate
has some convenience methods for
receiving POJOs instead of Message
instances, and
implementations will provide a way to customize the
MessageConverter
used to create the
Object
returned:
Object receiveAndConvert() throws AmqpException; Object receiveAndConvert(String queueName) throws AmqpException;
Similar to sendAndReceive
methods, beginning with
version 1.3, the AmqpTemplate
has several convenience receiveAndReply
methods for synchronously
receiving, processing and replying to messages:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
The AmqpTemplate
implementation takes care of the 'receive' and 'reply' phases.
In most cases you should provide only an implementation of ReceiveAndReplyCallback
to perform some business logic for the received message and build a reply object or message, if needed.
Note, a ReceiveAndReplyCallback
may return null
. In this case
no reply is sent and receiveAndReply
works like the receive
method. This allows
the same queue to be used for a mixture of messages, some of which may not need a reply.
Automatic message (request and reply) conversion is applied only if the provided callback is not an instance of
ReceiveAndReplyMessageCallback
- which provides a raw message exchange contract.
The ReplyToAddressCallback
is useful for cases requiring custom logic
to determine the replyTo
address at runtime against the received message
and reply from the ReceiveAndReplyCallback
.
By default, replyTo
information in the request message is used to route the reply.
The following is an example of POJO-based receive and reply...
boolean received = this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() { public Invoice handle(Order order) { return processOrder(order); } }); if (received) { log.info("We received an order!"); }
Important | |
---|---|
Spring AMQP also supports annotated-listener endpoints through the use of the
|
For asynchronous Message
reception, a dedicated component (not the
AmqpTemplate
) is involved. That component
is a container for a Message consuming callback. We will look at the
container and its properties in just a moment, but first we should look at
the callback since that is where your application code will be integrated
with the messaging system. There are a few options for the callback
starting with an implementation of the
MessageListener
interface:
public interface MessageListener { void onMessage(Message message); }
If your callback logic depends upon the AMQP
Channel instance for any reason, you may instead use the
ChannelAwareMessageListener
. It looks
similar but with an extra parameter:
public interface ChannelAwareMessageListener { void onMessage(Message message, Channel channel) throws Exception; }
If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework. This is often referred to as "Message-driven POJO" support. When using the adapter, you only need to provide a reference to the instance that the adapter itself should invoke.
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo); listener.setDefaultListenerMethod("myMethod");
You can subclass the adapter and provide an implementation of getListenerMethodName()
to dynamically select different methods based on the message.
Now
that you've seen the various options for the Message-listening callback,
we can turn our attention to the container. Basically, the container
handles the "active" responsibilities so that the listener callback can
remain passive. The container is an example of a "lifecycle" component. It
provides methods for starting and stopping. When configuring the
container, you are essentially bridging the gap between an AMQP Queue and
the MessageListener
instance. You must
provide a reference to the
ConnectionFactory
and the queue name or
Queue instance(s) from which that listener should consume Messages. Here
is the most basic example using the default implementation,
SimpleMessageListenerContainer
:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory); container.setQueueNames("some.queue"); container.setMessageListener(new MessageListenerAdapter(somePojo));
As an "active" component, it's most common to create the listener container with a bean definition so that it can simply run in the background. This can be done via XML:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/> </rabbit:listener-container>
Or, you may prefer to use the @Configuration style which will look very similar to the actual code snippet above:
@Configuration public class ExampleAmqpConfiguration { @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } @Bean public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public MessageListener exampleListener() { return new MessageListener() { public void onMessage(Message message) { System.out.println("received: " + message); } }; } }
Starting with RabbitMQ Version 3.2, the broker now supports consumer priority (see
http://www.rabbitmq.com/blog/2013/12/16/using-consumer-priorities-with-rabbitmq/).
This is enabled by setting the x-priority
argument on the consumer.
The SimpleMessageListenerContainer
now supports setting consumer arguments:
container.setConsumerArguments(Collections. <String, Object> singletonMap("x-priority", Integer.valueOf(10)));
For convenience, the namespace provides the priority
attribute on the
listener
element:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" /> </rabbit:listener-container>
Starting with version 1.3 the queue(s) on which the container is listening can be modified at runtime; see Section 3.14, “Listener Container Queues”.
When a container is configured to listen to auto-delete
queue(s), or the queue has an
x-expires
option or the Time-To-Live
policy is configured on the Broker, the queue is removed by the broker when the container is
stopped (last consumer is cancelled).
Before version 1.3, the container could not be restarted because
the queue was missing; the RabbitAdmin
only automatically
redeclares queues etc, when the connection is closed/opens, which does not happen
when the container is stopped/started.
Starting with version 1.3, the container will now use a
RabbitAdmin
to redeclare any missing queues during startup.
You can also use conditional declaration (Section 3.8.1, “Conditional Declaration”)
together with an auto-startup="false"
admin to defer queue declaration
until the container is started.
<rabbit:queue id="otherAnon" declared-by="containerAdmin" /> <rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin"> <rabbit:bindings> <rabbit:binding queue="otherAnon" key="otherAnon" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:listener-container id="container2" auto-startup="false"> <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" /> </rabbit:listener-container> <rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
In this case, the queue and exchange are declared by containerAdmin
which has auto-startup="false"
so the elements are not declared
during context initialization. Also, the container is not started for the
same reason. When the container is later started, it uses it's reference to
containerAdmin
to declare the elements.
Starting with version 1.4, the easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it allows you to expose a method of a managed bean as a Rabbit listener endpoint.
@Component public class MyService { @RabbitListener(queues = "myQueue") public void processOrder(String data) { ... } }
The idea of the example above is that, whenever a message is available on the
org.springframework.amqp.core.Queue
"myQueue", the processOrder
method is invoked accordingly (in this case, with the payload of the message).
The annotated endpoint infrastructure creates a message listener container behind the scenes
for each annotated method, using a RabbitListenerContainerFactory
.
To enable support for @RabbitListener
annotations add
@EnableRabbit
to one of your @Configuration
classes.
@Configuration @EnableRabbit public class AppConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); return factory; } }
By default, the infrastructure looks for a bean named rabbitListenerContainerFactory
as the source for the factory to use to create message listener containers. In this case, and ignoring the
RabbitMQ infrastructure setup, the processOrder
method can be invoked with a core
poll size of 3 threads and a maximum pool size of 10 threads.
It is possible to customize the listener container factory to use per annotation or an explicit
default can be configured by implementing the RabbitListenerConfigurer
interface. The default is only required if at least one endpoint is registered without a specific container
factory. See the javadoc for full details and examples.
If you prefer XML configuration, use the <rabbit:annotation-driven>
element.
<rabbit:annotation-driven/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="concurrentConsumers" value="3"/> <property name="maxConcurrentConsumers" value="10"/> </bean>
RabbitListenerEndpoint
provides a model of a Rabbit endpoint and is
responsible for configuring the container for that model. The infrastructure allows you to
configure endpoints programmatically in addition to the ones that are detected by the
RabbitListener
annotation.
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint(); endpoint.setQueueNames("anotherQueue"); endpoint.setMessageListener(message -> { // processing }); registrar.registerEndpoint(endpoint); } }
In the example above, we used SimpleRabbitListenerEndpoint
which provides
the actual MessageListener
to invoke but you could just as well build your own
endpoint variant describing a custom invocation mechanism.
It should be noted that you could just as well skip the use of @RabbitListener
altogether and only register your endpoints programmatically through RabbitListenerConfigurer.
So far, we have been injecting a simple String in our endpoint but it can actually have a very
flexible method signature. Let’s rewrite it to inject the Order
with a custom header:
@Component public class MyService { @RabbitListener(queues = "myQueue") public void processOrder(Order order, @Header("order_type") String orderType) { ... } }
These are the main elements you can inject in listener endpoints:
org.springframework.amqp.core.Message
.
com.rabbitmq.client.Channel
on which the message was received
org.springframework.messaging.Message
representing the incoming AMQP
message. Note that this message holds both the custom and the standard headers (as defined
by AmqpHeaders
).
@Header
-annotated method arguments to extract a specific header value,
including standard AMQP headers.
@Headers
-annotated argument that must also be assignable to
java.util.Map
for getting access to all headers.
Message
and Channel
) is considered to be the payload. You can make that explicit by
annotating the parameter with @Payload
. You can also turn on validation by
adding an extra @Valid
.
The ability to inject Spring’s Message abstraction is particularly useful to benefit from all the information stored in the transport-specific message without relying on transport-specific API.
@RabbitListener(queues = "myQueue") public void processOrder(Message<Order> order) { ... }
Handling of method arguments is provided by DefaultMessageHandlerMethodFactory
which can be further customized to support additional method arguments. The conversion and validation
support can be customized there as well.
For instance, if we want to make sure our Order is valid before processing it, we can annotate
the payload with @Valid
and configure the necessary validator as follows:
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setValidator(myValidator()); return factory; } }
The existing support in MessageListenerAdapter
already allows your method to
have a non-void return type. When that’s the case, the result of the invocation is encapsulated in a
message sent either in the address specified in the ReplyToAddress
header of
the original message or in the default address configured on the listener. That default address can
now be set using the @SendTo
annotation of the messaging abstraction.
Assuming our processOrder
method should now return an
OrderStatus
, it is possible to write it as follow to automatically send a reply:
@RabbitListener(destination = "myQueue") @SendTo("status") public OrderStatus processOrder(Order order) { // order processing return status; }
If you need to set additional headers in a transport-independent manner, you could return a
Message
instead, something like:
@RabbitListener(destination = "myQueue") @SendTo("status") public Message<OrderStatus> processOrder(Order order) { // order processing return MessageBuilder .withPayload(status) .setHeader("code", 1234) .build(); }
The @SendTo
value is assumed as a reply exchange
and
routingKey
pair following the pattern exchange/routingKey
, where one of those
parts can be omitted. The valid values are:
foo/bar
- the replyTo exchange and routingKey.
foo/
- the replyTo exchange and default (empty) routingKey.
bar
or /bar
- the replyTo routingKey and default (empty) exchange.
/
or empty - the replyTo default exchange and default routingKey.
Also @SendTo
can be used without a value
attribute. This case is equal to an empty
sendTo pattern. @SendTo
is only used if the inbound message does not have a
replyToAddress
property.
A number of different threads are involved with asynchronous consumers.
Threads from the
TaskExecutor
configured in the SimpleMessageListener
are used to invoke the MessageListener
when a new message is
delivered by RabbitMQ Client
. If not configured, a SimpleAsyncTaskExecutor
is used. If a pooled executor is used, ensure the pool size is sufficient to handle the
configured concurrency.
The Executor
configured in the CachingConnectionFactory
is passed into the RabbitMQ Client
when creating the connection, and its threads are used to
deliver new messages to the listener container. At the time of writing, if this is not configured,
the client uses an internal thread pool executor with a pool size of 5.
The RabbitMQ client
uses a ThreadFactory
to create threads
for low-level I/O (socket) operations. To modify this factory, you need to configure the underlying
RabbitMQ ConnectionFactory
, as discussed in Section 3.2.1, “Configuring the Underlying Client Connection Factory”.
The AmqpTemplate
also defines several
methods for sending and receiving Messages that will delegate to a
MessageConverter
. The
MessageConverter
itself is quite
straightforward. It provides a single method for each direction: one for
converting to a Message and another for converting
from a Message. Notice that when converting to a
Message, you may also provide properties in addition to the object. The
"object" parameter typically corresponds to the Message body.
public interface MessageConverter { Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException; Object fromMessage(Message message) throws MessageConversionException; }
The relevant Message-sending methods on the
AmqpTemplate
are listed below. They
are simpler than the methods we discussed previously because they
do not require the Message
instance.
Instead, the MessageConverter
is
responsible for "creating" each Message
by
converting the provided object to the byte array for
the Message
body and then adding any
provided MessageProperties
.
void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
On the receiving side, there are only two methods: one that accepts the queue name and one that relies on the template's "queue" property having been set.
Object receiveAndConvert() throws AmqpException; Object receiveAndConvert(String queueName) throws AmqpException;
Note | |
---|---|
The MessageListenerAdapter mentioned in Section 3.5.2, “Asynchronous Consumer”
also uses a MessageConverter .
|
The default implementation of the
MessageConverter
strategy is called
SimpleMessageConverter
. This is the converter
that will be used by an instance of RabbitTemplate if you do not
explicitly configure an alternative. It handles text-based content,
serialized Java objects, and simple byte arrays.
If the content type of the input Message begins with "text"
(e.g. "text/plain"), it will also check for the content-encoding
property to determine the charset to be used when converting the
Message body byte array to a Java String. If no content-encoding
property had been set on the input Message, it will use the "UTF-8"
charset by default. If you need to override that default setting, you
can configure an instance of
SimpleMessageConverter
, set its
"defaultCharset" property and then inject that into a
RabbitTemplate
instance.
If the content-type property value of the input Message is set
to "application/x-java-serialized-object", the
SimpleMessageConverter
will attempt to
deserialize (rehydrate) the byte array into a Java object. While that
might be useful for simple prototyping, it's generally not recommended
to rely on Java serialization since it leads to tight coupling between
the producer and consumer. Of course, it also rules out usage of
non-Java systems on either side. With AMQP being a wire-level
protocol, it would be unfortunate to lose much of that advantage with
such restrictions. In the next two sections, we'll explore some
alternatives for passing rich domain object content without relying on
Java serialization.
For all other content-types, the
SimpleMessageConverter
will return the Message
body content directly as a byte array.
When converting to a Message from an arbitrary Java Object, the
SimpleMessageConverter
likewise deals with byte
arrays, Strings, and Serializable instances. It will convert each of
these to bytes (in the case of byte arrays, there is nothing to
convert), and it will set the content-type property accordingly. If
the Object to be converted does not match one of those types, the
Message body will be null.
As mentioned in the previous section, relying on Java
serialization is generally not recommended. One rather common
alternative that is more flexible and portable across different
languages and platforms is JSON (JavaScript Object Notation). Two
implementations are available and can be configured on any
RabbitTemplate
instance to override its usage of
the SimpleMessageConverter
default.
The JsonMessageConverter
which uses the
org.codehaus.jackson
1.x library
and Jackson2JsonMessageConverter
which uses the
com.fasterxml.jackson
2.x library.
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <property name="connectionFactory" ref="rabbitConnectionFactory"/> <property name="messageConverter"> <bean class="org.springframework.amqp.support.converter.JsonMessageConverter"> <!-- if necessary, override the DefaultClassMapper --> <property name="classMapper" ref="customClassMapper"/> </bean> </property> </bean>
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <property name="connectionFactory" ref="rabbitConnectionFactory"/> <property name="messageConverter"> <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"> <!-- if necessary, override the DefaultClassMapper --> <property name="classMapper" ref="customClassMapper"/> </bean> </property> </bean>
As shown above, the JsonMessageConverter
and Jackson2JsonMessageConverter
uses a
DefaultClassMapper
by default. Type information is
added to (and retrieved from) the MessageProperties
.
If an inbound message does not contain type information in the
MessageProperties
, but you know the expected type,
you can configure a static type using the defaultType
property
<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.JsonMessageConverter"> <property name="classMapper"> <bean class="org.springframework.amqp.support.converter.DefaultClassMapper"> <property name="defaultType" value="foo.PurchaseOrder"/> </bean> </property> </bean>
<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter"> <property name="classMapper"> <bean class="org.springframework.amqp.support.converter.DefaultClassMapper"> <property name="defaultType" value="foo.PurchaseOrder"/> </bean> </property> </bean>
Yet another option is the
MarshallingMessageConverter
. It delegates to the
Spring OXM library's implementations of the
Marshaller
and
Unmarshaller
strategy interfaces. You can
read more about that library here.
In terms of configuration, it's most common to provide the constructor
argument only since most implementations of
Marshaller
will also implement
Unmarshaller
.
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <property name="connectionFactory" ref="rabbitConnectionFactory"/> <property name="messageConverter"> <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter"> <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/> </bean> </property> </bean>
This class was introduced in version 1.4.2 and allows delegation
to a specific MessageConverter
based on the content type
property in the MessageProperties
. By default, it will delegate
to a SimpleMessageConverter
if there is no contentType
property, or a value that matches none of the configured converters.
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter"> <property name="delegates"> <map> <entry key="application/json" value-ref="jsonMessageConverter" /> <entry key="application/xml" value-ref="xmlMessageConverter" /> </map> </property> </bean>
The AmqpTemplate
also provides a variety of
sendAndReceive
methods that accept the same argument options
that you have seen above for the one-way send operations (exchange, routingKey, and Message).
Those methods are quite useful for request/reply scenarios since they handle the configuration
of the necessary "reply-to" property before sending and can listen for the reply message on an
exclusive Queue that is created internally for that purpose.
Similar request/reply methods are also available where the MessageConverter
is applied to both the request and reply. Those methods are named convertSendAndReceive
.
See the Javadoc of AmqpTemplate
for more detail.
By default, a new temporary queue is used for each reply (but see Section 3.7.1, “RabbitMQ Direct reply-to”). However, a single reply queue can be configured on the template, which can be more efficient, and also allows you to set arguments on that queue. In this case, however, you must also provide a <reply-listener/> sub element. This element provides a listener container for the reply queue, with the template being the listener. All of the Section 3.11, “Message Listener Container Configuration” attributes allowed on a <listener-container/> are allowed on the element, except for connection-factory and message-converter, which are inherited from the template's configuration.
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-queue="replies"> <rabbit:reply-listener/> </rabbit:template>
While the container and template share a connection factory, they do not share a channel and therefore requests and replies are not performed within the same transaction (if transactional).
Important | |
---|---|
Starting with version 3.4.0, the RabbitMQ server now supports
Direct reply-to; this eliminates the main reason
for a fixed reply queue (to avoid the need to create a temporary queue for each request).
Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default
(if supported by the server) instead of creating temporary reply queues.
When no replyQueue is provided (or it is set with the name amq.rabbitmq.reply-to ),
the RabbitTemplate will automatically
detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue.
When using Direct reply-to, a reply-listener is not required and should not be configured.
|
Reply listeners are still supported with named queues (other than amq.rabbitmq.reply-to
),
allowing control of reply concurrency etc.
When using a fixed reply queue (other than amq.rabbitmq.reply-to
),
it is necessary to provide correlation data so that replies can be correlated
to requests. See
RabbitMQ Remote Procedure Call (RPC). By default, the standard correlationId
property will
be used to hold the correlation data. However, if you wish to use a custom propertry to hold correlation
data, you can set the correlation-key
attribute on the <rabbit-template/>. Explicitly
setting the attribute to correlationId
is the same as omitting the attribute.
Of course, the client and server must use the same header for correlation data.
Note | |
---|---|
Spring AMQP version 1.1 used a custom property spring_reply_correlation for this data. If you
wish to revert to this behavior with the current version, perhaps to maintain compatibility with another
application using 1.1, you must set the attribute to spring_reply_correlation .
|
When using a fixed reply queue, a SimpleListenerContainer
is used to receive the
replies; with the RabbitTemplate
being the MessageListener
.
When defining a template with the <rabbit:template/>
namespace element, as shown above,
the parser defines the container and wires in the template as the listener.
Note | |
---|---|
When the template does not use a fixed replyQueue (or is using Direct reply-to - see
Section 3.7.1, “RabbitMQ Direct reply-to”) a listener container is not needed.
|
If you define your RabbitTemplate
as a <bean/>
, or using an
@Configuration
class to define it as an @Bean
, or when creating
the template programmatically, you will need to define and
wire up the reply listener container yourself. If you fail to do this, the template will never receive
the replies and will eventually time out and return null as the reply to a call to a
sendAndReceive
method.
Important | |
---|---|
When wiring the reply listener and template yourself, it is important to ensure that the template's
replyQueue and the container's queues (or queueNames ) properties
refer to the same queue. The template inserts the reply queue into the outbound message
replyTo property.
|
The following are examples of how to manually wire up the beans.
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="connectionFactory" /> <property name="exchange" value="foo.exchange" /> <property name="routingKey" value="foo" /> <property name="replyQueue" ref="replyQ" /> <property name="replyTimeout" value="600000" /> </bean> <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <constructor-arg ref="connectionFactory" /> <property name="queues" ref="replyQ" /> <property name="messageListener" ref="amqpTemplate" /> </bean> <rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean public RabbitTemplate amqpTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMessageConverter(msgConv()); rabbitTemplate.setReplyQueue(replyQueue()); rabbitTemplate.setReplyTimeout(60000); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer replyListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueues(replyQueue()); container.setMessageListener(amqpTemplate()); return container; } @Bean public Queue replyQueue() { return new Queue("my.reply.queue"); }
A complete example of a RabbitTemplate
wired with a fixed reply queue, together
with a "remote" listener container that handles the request and returns the reply is shown in
this test case.
Important | |
---|---|
When the reply times out (
Prior to version 1.3.6, late replies for timed out messages were simply logged. Now, if
a late reply is received, it is rejected (the template throws an
Refer to the RabbitMQ Dead Letter Documentation
for more information about configuring dead lettering. You can also take a look at the
|
The Spring Framework has a general remoting capability, allowing
Remote Procedure Calls (RPC) using various transports.
Spring-AMQP supports a similar mechanism with a AmqpProxyFactoryBean
on the client
and a AmqpInvokerServiceExporter
on the server. This provides RPC over AMQP.
On the client side, a RabbitTemplate
is used as described above; on the server side,
the invoker (configured as a MessageListener
) receives the message, invokes
the configured service, and returns the reply using the inbound message's replyTo
information.
The client factory bean can be injected into any bean (using its serviceInterface
); the client
can then invoke methods on the proxy, resulting in remote execution over AMQP.
Note | |
---|---|
With the default
On the server side, the |
Sample client and server configurations are shown below.
<bean id="client" class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean"> <property name="amqpTemplate" ref="template" /> <property name="serviceInterface" value="foo.ServiceInterface" /> </bean> <rabbit:connection-factory id="connectionFactory" /> <rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000" routing-key="remoting.binding" exchange="remoting.exchange" /> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue name="remoting.queue" /> <rabbit:direct-exchange name="remoting.exchange"> <rabbit:bindings> <rabbit:binding queue="remoting.queue" key="remoting.binding" /> </rabbit:bindings> </rabbit:direct-exchange>
<bean id="listener" class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter"> <property name="serviceInterface" value="foo.ServiceInterface" /> <property name="service" ref="service" /> <property name="amqpTemplate" ref="template" /> </bean> <bean id="service" class="foo.ServiceImpl" /> <rabbit:connection-factory id="connectionFactory" /> <rabbit:template id="template" connection-factory="connectionFactory" /> <rabbit:queue name="remoting.queue" /> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="listener" queue-names="remoting.queue" /> </rabbit:listener-container>
Important | |
---|---|
The AmqpInvokerServiceExporter can only process properly
formed messages, such as those sent from the AmqpProxyFactoryBean .
If it receives a message that it cannot interpret, a serialized
RuntimeException will be sent as a reply. If the message has
no replyToAddress property, the message will be rejected and permanently lost if no
Dead Letter Exchange has been configured.
|
The AMQP specification describes how the protocol can be used to configure Queues, Exchanges and Bindings on the broker. These operations which are portable from the 0.8 specification and higher are present in the AmqpAdmin interface in the org.springframework.amqp.core package. The RabbitMQ implementation of that class is RabbitAdmin located in the org.springframework.amqp.rabbit.core package.
The AmqpAdmin interface is based on using the Spring AMQP domain abstractions and is shown below:
public interface AmqpAdmin { // Exchange Operations void declareExchange(Exchange exchange); void deleteExchange(String exchangeName); // Queue Operations Queue declareQueue(); String declareQueue(Queue queue); void deleteQueue(String queueName); void deleteQueue(String queueName, boolean unused, boolean empty); void purgeQueue(String queueName, boolean noWait); // Binding Operations void declareBinding(Binding binding); void removeBinding(Binding binding); Properties getQueueProperties(String queueName); }
The no-arg declareQueue() method defines a queue on the broker whose name
is automatically generated. The additional properties of this auto-generated
queue are exclusive=true
, autoDelete=true
, and durable=false
.
The declareQueue(Queue queue)
method takes a Queue
object
and returns the name of the declared queue. This is useful if you wish the broker to generate the
queue's name. This is in contrast to an AnonymousQueue
where the
framework generates a unique (UUID
) name and sets durable
to
false
and exlusive, autoDelete
to true
.
If the provided Queue
's name
property
is an empty String,
the Broker declares the queue with a generated name and that name is returned to the caller.
The Queue
object itself is not changed.
This functionality
can only be used programmatically by invoking the RabbitAdmin
directly.
It is not supported for auto-declaration by the admin by defining a queue declaratively in the
application context.
A <rabbit:queue/>
with an empty, or missing, name
will
always create an AnonymousQueue
. This is because the name will
change if redeclared due to a connection failure. Declarative queues must have fixed names because they
might be referenced elsewhere in the context, for example, in a listener:
<rabbit:listener-container> <rabbit:listener ref="listener" queue-names="#{someQueue.name}" /> </rabbit:listener-container>
See Section 3.15.1, “Automatic Declaration of Exchanges, Queues and Bindings”.
The RabbitMQ implementation of this interface is RabbitAdmin which when configured using Spring XML would look like this:
<rabbit:connection-factory id="connectionFactory"/> <rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
When the CachingConnectionFactory
cache mode
is CHANNEL
(the default),
the RabbitAdmin
implementation does automatic
lazy declaration of Queues
,
Exchanges
and Bindings
declared in the same ApplicationContext
. These
components will be declared as son as a Connection
is opened to the broker. There are some namespace features that make this
very convenient, e.g. in the Stocks sample application we have:
<rabbit:queue id="tradeQueue"/> <rabbit:queue id="marketDataQueue"/> <fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit"> <bindings> <binding queue="tradeQueue"/> </bindings> </fanout-exchange> <topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit"> <bindings> <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/> </bindings> </topic-exchange>
In the example above we are using anonymous Queues (actually internally just Queues with names generated by the framework, not by the broker) and refer to them by ID. We can also declare Queues with explicit names, which also serve as identifiers for their bean definitions in the context. E.g.
<rabbit:queue name="stocks.trade.queue"/>
Tip | |
---|---|
You can provide both an id and a name attribute. This allows you to refer to the queue (for example in a binding) by an id that is independent of the queue name. It also allows standard Spring features such as property placeholders, and SpEL expressions for the queue name; these features are not available when using the name as the bean identifier. |
Queues can be configured with additional arguments, for example, 'x-message-ttl' or 'x-ha-policy'. Using the namespace support, they are provided in the form of a Map of argument name/argument value pairs, using the <rabbit:queue-arguments> element.
<rabbit:queue name="withArguments"> <rabbit:queue-arguments> <entry key="x-ha-policy" value="all"/> </rabbit:queue-arguments> </rabbit:queue>
By default, the arguments are assumed to be strings. For arguments of other types, the type needs to be provided.
<rabbit:queue name="withArguments"> <rabbit:queue-arguments value-type="java.lang.Long"> <entry key="x-message-ttl" value="100"/> </rabbit:queue-arguments> </rabbit:queue>
When providing arguments of mixed types, the type is provided for each entry element:
<rabbit:queue name="withArguments"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">100</value> </entry> <entry key="x-ha-policy" value="all"/> </rabbit:queue-arguments> </rabbit:queue>
With Spring Framework 3.2 and later, this can be declared a little more succinctly:
<rabbit:queue name="withArguments"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/> <entry key="x-ha-policy" value="all"/> </rabbit:queue-arguments> </rabbit:queue>
Important | |
---|---|
The RabbitMQ broker will not allow declaration of a queue with mismatched
arguments. For example, if a
By default, the
This behavior can be modified by setting the |
Starting with version 1.3 the HeadersExchange can be configured to match on multiple headers; you can also specify whether any or all headers must match:
<rabbit:headers-exchange name="headers-test"> <rabbit:bindings> <rabbit:binding queue="bucket"> <rabbit:binding-arguments> <entry key="foo" value="bar"/> <entry key="baz" value="qux"/> <entry key="x-match" value="all"/> </rabbit:binding-arguments> </rabbit:binding> </rabbit:bindings> </rabbit:headers-exchange>
To see how to use Java to configure the AMQP infrastructure, look at
the Stock sample application, where there is the @Configuration
class AbstractStockRabbitConfiguration
which in
turn has RabbitClientConfiguration and RabbitServerConfiguration
subclasses. The code for AbstractStockRabbitConfiguration is shown
below
@Configuration public abstract class AbstractStockAppRabbitConfiguration { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMessageConverter(jsonMessageConverter()); configureRabbitTemplate(template); return template; } @Bean public MessageConverter jsonMessageConverter() { return new JsonMessageConverter(); } @Bean public TopicExchange marketDataExchange() { return new TopicExchange("app.stock.marketdata"); } // additional code omitted for brevity }
In the Stock application, the server is configured using the following @Configuration class:
@Configuration public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration { @Bean public Queue stockRequestQueue() { return new Queue("app.stock.request"); } }
This is the end of the whole inheritance chain of @Configuration classes. The end result is the the TopicExchange and Queue will be declared to the broker upon application startup. There is no binding of the TopicExchange to a queue in the server configuration, as that is done in the client application. The stock request queue however is automatically bound to the AMQP default exchange - this behavior is defined by the specification.
The client @Configuration class is a little more interesting and is shown below.
@Configuration public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration { @Value("${stocks.quote.pattern}") private String marketDataRoutingKey; @Bean public Queue marketDataQueue() { return amqpAdmin().declareQueue(); } /** * Binds to the market data exchange. Interested in any stock quotes * that match its routing key. */ @Bean public Binding marketDataBinding() { return BindingBuilder.bind( marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey); } // additional code omitted for brevity }
The client is declaring another queue via the declareQueue() method on the AmqpAdmin, and it binds that queue to the market data exchange with a routing pattern that is externalized in a properties file.
By default, all queues, exchanges, and bindings are declared by all
RabbitAdmin
instances (that have auto-startup="true"
)
in the application context.
Note | |
---|---|
Starting with the 1.2 release, it is possible to conditionally declare these elements. This is particularly useful when an application connects to multiple brokers and needs to specify with which broker(s) a particular element should be declared. |
The classes representing these elements
implement Declarable
which
has two methods: shouldDeclare()
and getDeclaringAdmins()
.
The RabbitAdmin
uses these methods to determine whether
a particular instance should actually process the declarations on its
Connection
.
The properties are available as attributes in the namespace, as shown in the following examples.
<rabbit:admin id="admin1" connection-factory="CF1" /> <rabbit:admin id="admin2" connection-factory="CF2" /> <rabbit:queue id="declaredByBothAdminsImplicitly" /> <rabbit:queue id="declaredByBothAdmins" declared-by="admin1, admin2" /> <rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" /> <rabbit:queue id="notDeclaredByAny" auto-declare="false" /> <rabbit:direct-exchange name="direct" declared-by="admin1, admin2"> <rabbit:bindings> <rabbit:binding key="foo" queue="bar"/> </rabbit:bindings> </rabbit:direct-exchange>
Note | |
---|---|
The auto-declare attribute is true by default and if the declared-by
is not supplied (or is empty) then all RabbitAdmin s will declare the
object (as long as the admin's auto-startup attribute is true; the default).
|
Similarly, you can use Java-based @Configuration
to achieve the same
effect. In this example, the components will be declared by admin1
but not
admin2
:
@Bean public RabbitAdmin admin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1()); rabbitAdmin.afterPropertiesSet(); return rabbitAdmin; } @Bean public RabbitAdmin admin2() { RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2()); rabbitAdmin.afterPropertiesSet(); return rabbitAdmin; } @Bean public Queue queue() { Queue queue = new Queue("foo"); queue.setAdminsThatShouldDeclare(admin()); return queue; } @Bean public Exchange exchange() { DirectExchange exchange = new DirectExchange("bar"); exchange.setAdminsThatShouldDeclare(admin()); return exchange; } @Bean public Binding binding() { Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null); binding.setAdminsThatShouldDeclare(admin()); return binding; }
Many operations with the RabbitMQ Java client can throw checked Exceptions. For example, there are a lot of cases where IOExceptions may be thrown. The RabbitTemplate, SimpleMessageListenerContainer, and other Spring AMQP components will catch those Exceptions and convert into one of the Exceptions within our runtime hierarchy. Those are defined in the 'org.springframework.amqp' package, and AmqpException is the base of the hierarchy.
When a listener throws an exception, it is wrapped in a
ListenerExecutionFailedException
and, normally the message
is rejected and requeued by the broker. Setting defaultRequeueRejected
to false will cause messages to be discarded (or routed to a dead letter exchange).
As discussed in Section 3.15.3, “Message Listeners and the Asynchronous Case”, the listener can throw an
AmqpRejectAndDontRequeueException
to conditionally control
this behavior.
However, there is a class of errors where the listener cannot control the behavior.
When a message that cannot be converted is encountered (for example an invalid
content_encoding
header), the MessageConversionException
is thrown before the message reaches user code. With defaultRequeueRejected
set to true
(default), such messages would be redelivered over and over.
Before version 1.3.2,
users needed to write a custom ErrorHandler, as discussed
in Section 3.9, “Exception Handling” to avoid this situation.
Starting with version 1.3.2, the default
ErrorHandler
is now a
ConditionalRejectingErrorHandler
which will reject (and not
requeue) messages that fail with a MessageConversionException
.
An instance of this error handler can be configured with a
FatalExceptionStrategy
so users can provide
their own rules for conditional message rejection, e.g. a delegate implementation to the
BinaryExceptionClassifier
from Spring Retry (Section 3.15.3, “Message Listeners and the Asynchronous Case”).
In addition, the ListenerExecutionFailedException
now has a
failedMessage
property which can be used in the decision.
If the FatalExceptionStrategy.isFatal()
method returns true
,
the error handler throws an AmqpRejectAndDontRequeueException
.
The default FatalExceptionStrategy logs a warning message.
The Spring Rabbit framework has support for automatic transaction management in the synchronous and asynchronous use cases with a number of different semantics that can be selected declaratively, as is familiar to existing users of Spring transactions. This makes many if not most common messaging patterns very easy to implement.
There are two ways to signal the desired transaction semantics to
the framework. In both the RabbitTemplate
and
SimpleMessageListenerContainer
there is a flag
channelTransacted
which, if true, tells the framework to use
a transactional channel and to end all operations (send or receive) with a
commit or rollback depending on the outcome, with an exception signaling
a rollback. Another signal is to provide an external transaction with one
of Spring's PlatformTransactionManager
implementations as a context for the ongoing operation. If there is
already a transaction in progress when the framework is sending or
receiving a message, and the channelTransacted
flag is true,
then the commit or rollback of the messaging transaction will be deferred
until the end of the current transaction. If the
channelTransacted
flag is false, then no transaction
semantics apply to the messaging operation (it is auto-acked).
The channelTransacted
flag is a configuration
time setting: it is declared and processed once when the AMQP components
are created, usually at application startup. The external transaction is
more dynamic in principle because the system responds to the current
Thread state at runtime, but in practice is often also a configuration
setting, when the transactions are layered onto an application
declaratively.
For synchronous use cases with RabbitTemplate
the external transaction is provided by the caller, either declaratively
or imperatively according to taste (the usual Spring transaction model).
An example of a declarative approach (usually preferred because it is
non-invasive), where the template has been configured with
channelTransacted=true
:
@Transactional public void doSomething() { String incoming = rabbitTemplate.receiveAndConvert(); // do some more database processing... String outgoing = processInDatabaseAndExtractReply(incoming); rabbitTemplate.convertAndSend(outgoing); }
A String payload is received, converted and sent as a message body
inside a method marked as @Transactional, so if the database processing
fails with an exception, the incoming message will be returned to the
broker, and the outgoing message will not be sent. This applies to any
operations with the RabbitTemplate
inside a chain
of transactional methods (unless the Channel
is
directly manipulated to commit the transaction early for instance).
For asynchronous use cases with
SimpleMessageListenerContainer
if an external
transaction is needed it has to be requested by the container when it sets
up the listener. To signal that an external transaction is required the
user provides an implementation of
PlatformTransactionManager
to the container when it
is configured. For example:
@Configuration public class ExampleExternalTransactionAmqpConfiguration { @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setTransactionManager(transactionManager()); container.setChannelTransacted(true); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } }
In the example above, the transaction manager is added as a
dependency injected from another bean definition (not shown), and the
channelTransacted
flag is also set to true. The effect is
that if the listener fails with an exception the transaction will be
rolled back, and the message will also be returned to the broker.
Significantly, if the transaction fails to commit (e.g. a database
constraint error, or connectivity problem), then the AMQP transaction will
also be rolled back, and the message will be returned to the broker. This is
sometimes known as a Best Efforts 1 Phase Commit, and is a very powerful
pattern for reliable messaging. If the channelTransacted
flag
was set to false in the example above, which is the default, then the
external transaction would still be provided for the listener, but all
messaging operations would be auto-acked, so the effect is to commit the
messaging operations even on a rollback of the business operation.
AMQP transactions only apply to messages and acks sent to the
broker, so when there is a rollback of a Spring transaction and a
message has been received, what Spring AMQP has to do is not just
rollback the transaction, but also manually reject the message (sort of
a nack, but that's not what the specification calls it). The action
taken on message rejection is independent of transactions and
depends on the defaultRequeueRejected
property (default true
). For more information about
rejecting failed messages, see Section 3.15.3, “Message Listeners and the Asynchronous Case”.
For more information about RabbitMQ transactions, and their limitations, refer to RabbitMQ Broker Semantics.
Note | |
---|---|
Prior to RabbitMQ 2.7.0, such messages (and any that are unacked when a channel is closed or aborts) went to the back of the queue on a Rabbit broker, since 2.7.0, rejected messages go to the front of the queue, in a similar manner to JMS rolled back messages. |
The RabbitTransactionManager is an alternative to executing Rabbit operations within, and synchronized with, external transactions. This Transaction Manager is an implementation of the PlatformTransactionManager interface and should be used with a single Rabbit ConnectionFactory.
Important | |
---|---|
This strategy is not able to provide XA transactions, for example in order to share transactions between messaging and database access. |
Application code is required to retrieve the transactional Rabbit resources
via ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
instead of a standard Connection.createChannel()
call with
subsequent Channel creation. When using Spring's
RabbitTemplate
,
it will autodetect a thread-bound Channel and automatically participate
in it.
With Java Configuration you can setup a new RabbitTransactionManager using:
@Bean public RabbitTransactionManager rabbitTransactionManager() { return new RabbitTransactionManager(connectionFactory); }
If you prefer using XML configuration, declare the following bean in your XML Application Context file:
<bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <property name="connectionFactory" ref="connectionFactory"/> </bean>
There are quite a few options for configuring a
SimpleMessageListenerContainer
related to
transactions and quality of service, and some of them interact with each
other.
The table below shows the container property names and their
equivalent attribute names (in parentheses) when using the namespace to configure a
<rabbit:message-listener-container/>
N/A
for
the attribute.
Table 3.1. Configuration options for a message listener container
Property | Description |
---|---|
channelTransacted | Boolean flag to signal that all messages should be acknowledged in a transaction (either manually or automatically) |
acknowledgeMode | NONE = no acks will be sent
(incompatible with
channelTransacted=true ). RabbitMQ calls this
"autoack" because the broker assumes all messages are acked
without any action from the consumer. MANUAL = the listener must
acknowledge all messages by calling
Channel.basicAck() . AUTO = the container will
acknowledge the message automatically, unless the
MessageListener throws an
exception. Note that
acknowledgeMode is complementary to
channelTransacted - if the channel is transacted then the broker
requires a commit notification in addition to the ack.
This is the default mode.
See also txSize . |
transactionManager | External transaction manager for the operation of the
listener. Also complementary to channelTransacted - if the
Channel is transacted then its
transaction will be synchronized with the external
transaction. |
prefetchCount | The number of messages to accept from the broker in one
socket frame. The higher this is the faster the messages can be
delivered, but the higher the risk of non-sequential processing.
Ignored if the acknowledgeMode is
NONE. This will be increased, if necessary, to match the
txSize . |
shutdownTimeout | When a container shuts down (e.g. if its enclosing
ApplicationContext is closed) it waits
for in-flight messages to be processed up to this limit.
Defaults to 5 seconds. After the limit is reached, if the
channel is not transacted messages will be discarded. |
txSize | When used with acknowledgeMode AUTO,
the container will attempt to process up to
this number of messages before sending an ack (waiting for each one up
to the receive timeout setting). This is also when a
transactional channel is committed. If the prefetchCount
is less than the txSize , it will be increased
to match the txSize . |
receiveTimeout | The maximum time to wait for each message. If
acknowledgeMode=NONE this has very little effect -
the container just spins round and asks for another message. It
has the biggest effect for a transactional
Channel with txSize > 1 ,
since it can cause messages already consumed not to be
acknowledged until the timeout expires. |
autoStartup | Flag to indicate that the container should start when the
ApplicationContext does (as part of the
SmartLifecycle callbacks which happen
after all beans are initialized). Defaults to true, but set it
to false if your broker might not be available on startup, and
then call start() later manually when you know the
broker is ready. |
phase | When autoStartup is true, the lifecycle phase within which this container should start and stop. The lower the value the earlier this container will start and the later it will stop. The default is Integer.MAX_VALUE meaning the container will start as late as possible and stop as soon as possible. |
adviceChain | An array of AOP Advice to apply to the listener
execution. This can be used to apply additional cross cutting
concerns such as automatic retry in the event of broker death.
Note that simple re-connection after an AMQP error is handled by
the CachingConnectionFactory , as long as
the broker is still alive. |
taskExecutor | A reference to a Spring TaskExecutor (or standard JDK 1.5+ Executor) for executing listener invokers. Default is a SimpleAsyncTaskExecutor, using internally managed threads. |
errorHandler | A reference to an ErrorHandler strategy for handling any
uncaught Exceptions that may occur during the execution of the
MessageListener.
Default: ConditionalRejectingErrorHandler |
concurrentConsumers | The number of concurrent consumers to initially start for each listener. See Section 3.12, “Listener Concurrency”. |
maxConcurrentConsumers | The maximum number of concurrent consumers to start, if needed, on demand. Must be greater than or equal to 'concurrentConsumers'. See Section 3.12, “Listener Concurrency”. |
startConsumerMinInterval | The time in milliseconds which must elapse before each new consumer is started on demand. See Section 3.12, “Listener Concurrency”. Default 10000 (10 seconds). |
stopConsumerMinInterval | The time in milliseconds which must elapse before a consumer is stopped, since the last consumer was stopped, when an idle consumer is detected. See Section 3.12, “Listener Concurrency”. Default 60000 (1 minute). |
consecutiveActiveTrigger | The minimum number of consecutive messages received by a consumer, without a receive timeout occurring, when considering starting a new consumer. Also impacted by 'txSize'. See Section 3.12, “Listener Concurrency”. Default 10. |
consecutiveIdleTrigger | The minimum number of receive timeouts a consumer must experience before considering stopping a consumer. Also impacted by 'txSize'. See Section 3.12, “Listener Concurrency”. Default 10. . |
connectionFactory | A reference to the connectionFactory; when configuring using the XML namespace, the default referenced bean name is "rabbitConnectionFactory". |
defaultRequeueRejected | Determines whether messages that are rejected because the listener threw an exception should be requeued or not. Default 'true'. |
recoveryInterval | Determines the time in milliseconds between attempts to start a consumer if it fails to start for non-fatal reasons. Default '5000'. |
exclusive | Determines whether the single consumer in this container has
exclusive access to the queue(s). The concurrency of the container
must be 1 when this is true. If another consumer has exclusive access,
the container will attempt to recover the consumer, according to the
recovery-interval . When using the namespace, this attribute
appears on the <rabbit:listener/> element along with the queue names.
Default 'false'. |
rabbitAdmin | When a listener container listens to at least one auto-delete
queue and it is found to be missing during startup, the container uses
a RabbitAdmin to declare the queue and any
related bindings and exchanges. If such elements are configured to
use conditional declaration (see Section 3.8.1, “Conditional Declaration”),
the container must use the admin that was configured to declare those
elements. Specify that admin here; only required when using auto-delete
queues with conditional declaration. If you do not wish the auto-delete
queue(s) to be declared until the container is started, set
auto-startup to false on the admin.
Defaults to a RabbitAdmin that
will declare all non-conditional elements. |
missingQueuesFatal | Starting with version 1.3.5,
When set to This was not configurable in previous versions. When set to You can also use a properties bean to set the property globally for all containers, as follows: <util:properties id="spring.amqp.global.properties"> <prop key="smlc.missing.queues.fatal">false</prop> </util:properties> This global property will not be applied to any containers that have an explicit
|
autoDeclare (auto-declare) | Starting with version 1.4,
When set to |
By default, the listener container will start a single consumer which will receive messages from the queue(s).
When examining the table in the previous section, you will see a number
of properties/attributes that control concurrency. The simplest is
concurrentConsumers
, which simply creates that (fixed) number
of consumers which will concurrently process messages.
Prior to version 1.3.0, this was the only setting available and the container had to be stopped and started again to change the setting.
Since version 1.3.0, you can now dynamically
adjust the concurrentConsumers
property. If it is
changed while the container is running, consumers will be added or
removed as necessary to adjust to the new setting.
In addition, a new property maxConcurrentConsumers
has been
added and the container will dynamically adjust the concurrency based on
workload. This works in conjunction with four additional properties:
consecutiveActiveTrigger
, startConsumerMinInterval
,
consecutiveIdleTrigger
, stopConsumerMinInterval
. With
the default settings, the algorithm to increase consumers works as follows:
If the maxConcurrentConsumers
has not been reached and an existing
consumer is active for 10 consecutive cycles AND at least 10 seconds
has elapsed since the last consumer was started, a new consumer is started.
A consumer is considered active if it received at least one message in
txSize
* receiveTimeout
milliseconds.
With the default settings, the algorithm to decrease consumers works as follows:
If there are more than concurrentConsumers
running and a consumer
detects 10 consecutive timeouts (idle) AND the last consumer was stopped at
least 60 seconds ago, a consumer will be stopped. The timeout depends on the
receiveTimeout
and the txSize
properties. A consumer
is considered idle if it receives no messages in txSize
*
receiveTimeout
milliseconds. So, with the
default timeout (1 second) and a txSize
of 4, stopping a consumer
will be considered after 40 seconds of idle time (4 timeouts correspond to 1
idle detection).
Note | |
---|---|
Practically, consumers will only be stopped if the whole container is idle for some time. This is because the broker will share its work across all the active consumers. |
Also starting with version 1.3, the listener container can be configured with a single exclusive consumer; this prevents other containers from consuming from the queue(s) until the current consumer is cancelled. The concurrency of such a container must be 1.
When using exclusive consumers, other containers will attempt to consume from
the queue(s) according to the recoveryInterval
property, and log
a WARNing if the attempt fails.
Version 1.3 introduced a number of improvements for handling multiple queues in a listener container.
The container must be configured to listen on at least one queue; this
was the case previously too, but now
queues can be added and removed at runtime.
The container will recycle (cancel and re-create) the consumers when any
pre-fetched messages have been processed. See methods addQueues
,
addQueueNames
, removeQueues
and
removeQueueNames
. When removing queues,
at least one queue must remain.
A consumer will now start if any of its queues are available - previously the container would stop if any queues were unavailable. Now, this is only the case if none of the queues are available. If not all queues are available, the container will attempt to passively declare (and consume from) the missing queue(s) every 60 seconds.
Also, if a consumer receives a cancel from the broker (for example if a queue is deleted) the consumer will attempt to recover and the recovered consumer will continue to process messages from any other configured queues. Previously a cancel on one queue cancelled the entire consumer and eventually the container would stop due to the missing queue.
If you wish to permanently remove a queue, you should update the container before or after deleting to queue, to avoid future attempts to consume from it.
Some of the key (and most popular) high-level features that Spring AMQP provides are to do with recovery and automatic re-connection in the event of a protocol error or broker failure. We have seen all the relevant components already in this guide, but it should help to bring them all together here and call out the features and recovery scenarios individually.
The primary reconnection features are enabled by the
CachingConnectionFactory
itself.
It is also often beneficial to use the RabbitAdmin
auto-declaration features. In addition, if you care about
guaranteed delivery, you probably also need to use the
channelTransacted
flag in
RabbitTemplate
and
SimpleMessageListenerContainer
and also the
AcknowledgeMode.AUTO
(or manual if you do the acks
yourself) in the
SimpleMessageListenerContainer
.
The RabbitAdmin
component can
declare exchanges, queues and bindings on startup. It does this
lazily, through a ConnectionListener
, so
if the broker is not present on startup it doesn't matter. The
first time a Connection
is used (e.g. by
sending a message) the listener will fire and the admin features
will be applied. A further benefit of doing the auto
declarations in a listener is that if the connection is dropped
for any reason (e.g. broker death, network glitch, etc.) they
will be applied again the next time they are needed.
Note | |
---|---|
Queues declared this way must have fixed names; either explicitly
declared, or generated by the framework for AnonymousQueue s.
Anonymous queues are non-durable, exclusive, and auto-delete.
|
Important | |
---|---|
Automatic declaration is only performed when the
CachingConnectionFactory cache mode
is CHANNEL (the default). This limitation exists
because exlusive and auto-delete queues are bound to the
connection.
|
If you lose your connection to the broker in a synchronous
sequence using RabbitTemplate
(for
instance), then Spring AMQP will throw an
AmqpException
(usually but not always
AmqpIOException
). We don't try to hide
the fact that there was a problem, so you have to be able to
catch and respond to the exception. The easiest thing to do if
you suspect that the connection was lost, and it wasn't your
fault, is to simply try the operation again. You can do this
manually, or you could look at using Spring Retry to handle the
retry (imperatively or declaratively).
Spring Retry provides a couple of AOP interceptors and a
great deal of flexibility to specify the parameters of the retry
(number of attempts, exception types, backoff algorithm etc.).
Spring AMQP also provides some convenience factory beans for
creating Spring Retry interceptors in a convenient form for AMQP
use cases, with strongly typed callback interfaces for you to
implement custom recovery logic. See the Javadocs and
properties of
StatefulRetryOperationsInterceptor
and
StatelessRetryOperationsInterceptor
for
more detail. Stateless retry is appropriate if there is no
transaction or if a transaction is started inside the retry
callback. Note that stateless retry is simpler to configure and
analyse than stateful retry, but it is not usually appropriate
if there is an ongoing transaction which must be rolled back or
definitely is going to roll back. A dropped connection in the
middle of a transaction should have the same effect as a
rollback, so for reconnection where the transaction is started
higher up the stack, stateful retry is usually the best
choice.
Starting with version 1.3, a builder API
is provided to aid in assembling these interceptors using
Java (or in @Configuration
classes), for example:
@Bean public StatefulRetryOperationsInterceptor interceptor() { return RetryInterceptorBuilder.stateful() .maxAttempts(5) .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval .build(); }
Only a subset of retry capabilities can be configured this way; more advanced
features would need the configuration of a RetryTemplate
as a Spring bean.
See the Spring Retry Javadocs for complete information about available policies
and their configuration.
If a MessageListener
fails because
of a business exception, the exception is handled by the message
listener container and then it goes back to listening for
another message. If the failure is caused by a dropped
connection (not a business exception), then the consumer that is
collecting messages for the listener has to be cancelled and
restarted. The
SimpleMessageListenerContainer
handles
this seamlessly, and it leaves a log to say that the listener is
being restarted. In fact it loops endlessly trying to restart
the consumer, and only if the consumer is very badly behaved
indeed will it give up. One side effect is that if the broker
is down when the container starts, it will just keep trying
until a connection can be established.
Business exception handling, as opposed to protocol errors
and dropped connections, might need more thought and some custom
configuration, especially if transactions and/or container acks
are in use. Prior to 2.8.x, RabbitMQ had no definition of dead letter behaviour, so
by default a message that is rejected or rolled back because of
a business exception can be redelivered ad infinitum. To put a
limit in the client on the number of re-deliveries, one
choice is a
StatefulRetryOperationsInterceptor
in the
advice chain of the listener. The interceptor can have a
recovery callback that implements a custom dead letter action:
whatever is appropriate for your particular environment.
Another alternative is to set the container's rejectRequeued property to false. This causes all failed messages to be discarded. When using RabbitMQ 2.8.x or higher, this also facilitates delivering the message to a Dead Letter Exchange.
Or, you can throw a AmqpRejectAndDontRequeueException
;
this prevents message requeuing, regardless of the setting of the
defaultRequeueRejected
property.
Often, a combination of both techniques will be used. Use a
StatefulRetryOperationsInterceptor
in the
advice chain, where it's MessageRecover
throws an AmqpRejectAndDontRequeueException
.
The MessageRecover
is called when all
retries have been exhausted. The default
MessageRecoverer
simply consumes the
errant message and emits a WARN message. In which case,
the message is ACK'd and won't be sent to the Dead Letter
Exchange, if any.
Starting with version 1.3, a new
RepublishMessageRecoverer
is provided,
to allow publishing of failed messages after retries are exhausted:
@Bean RetryOperationsInterceptor interceptor() { return RetryInterceptorBuilder.stateless() .withMaxAttempts(5) .setRecoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz")) .build(); }
Spring Retry has a great deal of flexibility for determining which
exceptions can invoke retry. The default configuration will retry
for all exceptions. Given that user exceptions will be wrapped in
a ListenerExecutionFailedException
we need
to ensure that the classification examines the exception causes.
The default classifier just looks at the top level exception.
Since Spring Retry 1.0.3, the
BinaryExceptionClassifier
has a property
traverseCauses
(default false
). When
true
it will traverse exception causes until it
finds a match or there is no cause.
To use this classifier for retry, use a SimpleRetryPolicy
created with the constructor that takes the max attempts, the
Map
of Exception
s
and the boolean (traverseCauses), and inject this policy into the
RetryTemplate
.
Spring AMQP provides extensive logging, especially at DEBUG
level.
If you wish to monitor the AMQP protocol between the application and broker, you could
use a tool such as WireShark, which has a plugin to decode the protocol. Alternatively
the RabbitMQ java client comes with a very useful class Tracer
.
When run as a main
, by default, it listens on port 5673 and connects
to port 5672 on localhost. Simply run it, and change your connection factory
configuration to connect to port 5673 on localhost. It displays the decoded protocol
on the console. Refer to the Tracer
javadocs for more information.