This version is still in development and is not considered stable yet. For the latest stable version, please use Spring AMQP 3.2.1! |
Configuring the Broker
The AMQP specification describes how the protocol can be used to configure queues, exchanges, and bindings on the broker.
These operations (which are portable from the 0.8 specification and higher) are present in the AmqpAdmin
interface in the org.springframework.amqp.core
package.
The RabbitMQ implementation of that class is RabbitAdmin
located in the org.springframework.amqp.rabbit.core
package.
The AmqpAdmin
interface is based on using the Spring AMQP domain abstractions and is shown in the following listing:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
See also Scoped Operations.
The getQueueProperties()
method returns some limited information about the queue (message count and consumer count).
The keys for the properties returned are available as constants in the RabbitTemplate
(QUEUE_NAME
,
QUEUE_MESSAGE_COUNT
, and QUEUE_CONSUMER_COUNT
).
The RabbitMQ REST API provides much more information in the QueueInfo
object.
The no-arg declareQueue()
method defines a queue on the broker with a name that is automatically generated.
The additional properties of this auto-generated queue are exclusive=true
, autoDelete=true
, and durable=false
.
The declareQueue(Queue queue)
method takes a Queue
object and returns the name of the declared queue.
If the name
property of the provided Queue
is an empty String
, the broker declares the queue with a generated name.
That name is returned to the caller.
That name is also added to the actualName
property of the Queue
.
You can use this functionality programmatically only by invoking the RabbitAdmin
directly.
When using auto-declaration by the admin when defining a queue declaratively in the application context, you can set the name property to ""
(the empty string).
The broker then creates the name.
Starting with version 2.1, listener containers can use queues of this type.
See Containers and Broker-Named queues for more information.
This is in contrast to an AnonymousQueue
where the framework generates a unique (UUID
) name and sets durable
to
false
and exclusive
, autoDelete
to true
.
A <rabbit:queue/>
with an empty (or missing) name
attribute always creates an AnonymousQueue
.
See AnonymousQueue
to understand why AnonymousQueue
is preferred over broker-generated queue names as well as
how to control the format of the name.
Starting with version 2.1, anonymous queues are declared with argument Queue.X_QUEUE_LEADER_LOCATOR
set to client-local
by default.
This ensures that the queue is declared on the node to which the application is connected.
Declarative queues must have fixed names because they might be referenced elsewhere in the context — such as in the
listener shown in the following example:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
The RabbitMQ implementation of this interface is RabbitAdmin
, which, when configured by using Spring XML, resembles the following example:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
When the CachingConnectionFactory
cache mode is CHANNEL
(the default), the RabbitAdmin
implementation does automatic lazy declaration of queues, exchanges, and bindings declared in the same ApplicationContext
.
These components are declared as soon as a Connection
is opened to the broker.
There are some namespace features that make this very convenient — for example,
in the Stocks sample application, we have the following:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
In the preceding example, we use anonymous queues (actually, internally, just queues with names generated by the framework, not by the broker) and refer to them by ID. We can also declare queues with explicit names, which also serve as identifiers for their bean definitions in the context. The following example configures a queue with an explicit name:
<rabbit:queue name="stocks.trade.queue"/>
You can provide both id and name attributes.
This lets you refer to the queue (for example, in a binding) by an ID that is independent of the queue name.
It also allows standard Spring features (such as property placeholders and SpEL expressions for the queue name).
These features are not available when you use the name as the bean identifier.
|
Queues can be configured with additional arguments — for example, x-message-ttl
.
When you use the namespace support, they are provided in the form of a Map
of argument-name/argument-value pairs, which are defined by using the <rabbit:queue-arguments>
element.
The following example shows how to do so:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
By default, the arguments are assumed to be strings. For arguments of other types, you must provide the type. The following example shows how to specify the type:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
When providing arguments of mixed types, you must provide the type for each entry element. The following example shows how to do so:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
With Spring Framework 3.2 and later, this can be declared a little more succinctly, as follows:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
When you use Java configuration, the Queue.X_QUEUE_LEADER_LOCATOR
argument is supported as a first class property through the setLeaderLocator()
method on the Queue
class.
Starting with version 2.1, anonymous queues are declared with this property set to client-local
by default.
This ensures that the queue is declared on the node the application is connected to.
The RabbitMQ broker does not allow declaration of a queue with mismatched arguments.
For example, if a queue already exists with no time to live argument, and you attempt to declare it with (for example) key="x-message-ttl" value="100" , an exception is thrown.
|
By default, the RabbitAdmin
immediately stops processing all declarations when any exception occurs.
This could cause downstream issues, such as a listener container failing to initialize because another queue (defined after the one in error) is not declared.
This behavior can be modified by setting the ignore-declaration-exceptions
attribute to true
on the RabbitAdmin
instance.
This option instructs the RabbitAdmin
to log the exception and continue declaring other elements.
When configuring the RabbitAdmin
using Java, this property is called ignoreDeclarationExceptions
.
This is a global setting that applies to all elements.
Queues, exchanges, and bindings have a similar property that applies to just those elements.
Prior to version 1.6, this property took effect only if an IOException
occurred on the channel, such as when there is a mismatch between current and desired properties.
Now, this property takes effect on any exception, including TimeoutException
and others.
In addition, any declaration exceptions result in the publishing of a DeclarationExceptionEvent
, which is an ApplicationEvent
that can be consumed by any ApplicationListener
in the context.
The event contains a reference to the admin, the element that was being declared, and the Throwable
.
Headers Exchange
Starting with version 1.3, you can configure the HeadersExchange
to match on multiple headers.
You can also specify whether any or all headers must match.
The following example shows how to do so:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
Starting with version 1.6, you can configure Exchanges
with an internal
flag (defaults to false
) and such an
Exchange
is properly configured on the Broker through a RabbitAdmin
(if one is present in the application context).
If the internal
flag is true
for an exchange, RabbitMQ does not let clients use the exchange.
This is useful for a dead letter exchange or exchange-to-exchange binding, where you do not wish the exchange to be used
directly by publishers.
To see how to use Java to configure the AMQP infrastructure, look at the Stock sample application,
where there is the @Configuration
class AbstractStockRabbitConfiguration
, which ,in turn has
RabbitClientConfiguration
and RabbitServerConfiguration
subclasses.
The following listing shows the code for AbstractStockRabbitConfiguration
:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
In the Stock application, the server is configured by using the following @Configuration
class:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
This is the end of the whole inheritance chain of @Configuration
classes.
The end result is that TopicExchange
and Queue
are declared to the broker upon application startup.
There is no binding of TopicExchange
to a queue in the server configuration, as that is done in the client application.
The stock request queue, however, is automatically bound to the AMQP default exchange.
This behavior is defined by the specification.
The client @Configuration
class is a little more interesting.
Its declaration follows:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
The client declares another queue through the declareQueue()
method on the AmqpAdmin
.
It binds that queue to the market data exchange with a routing pattern that is externalized in a properties file.
Builder API for Queues and Exchanges
Version 1.6 introduces a convenient fluent API for configuring Queue
and Exchange
objects when using Java configuration.
The following example shows how to use it:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
See the Javadoc for org.springframework.amqp.core.QueueBuilder
and org.springframework.amqp.core.ExchangeBuilder
for more information.
Starting with version 2.0, the ExchangeBuilder
now creates durable exchanges by default, to be consistent with the simple constructors on the individual AbstractExchange
classes.
To make a non-durable exchange with the builder, use .durable(false)
before invoking .build()
.
The durable()
method with no parameter is no longer provided.
Version 2.2 introduced fluent APIs to add "well known" exchange and queue arguments…
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
Declaring Collections of Exchanges, Queues, and Bindings
You can wrap collections of Declarable
objects (Queue
, Exchange
, and Binding
) in Declarables
objects.
The RabbitAdmin
detects such beans (as well as discrete Declarable
beans) in the application context, and declares the contained objects on the broker whenever a connection is established (initially and after a connection failure).
The following example shows how to do so:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
In versions prior to 2.1, you could declare multiple Declarable instances by defining beans of type Collection<Declarable> .
This can cause undesirable side effects in some cases, because the admin has to iterate over all Collection<?> beans.
|
Version 2.2 added the getDeclarablesByType
method to Declarables
; this can be used as a convenience, for example, when declaring the listener container bean(s).
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
Conditional Declaration
By default, all queues, exchanges, and bindings are declared by all RabbitAdmin
instances (assuming they have auto-startup="true"
) in the application context.
Starting with version 2.1.9, the RabbitAdmin
has a new property explicitDeclarationsOnly
(which is false
by default); when this is set to true
, the admin will only declare beans that are explicitly configured to be declared by that admin.
Starting with the 1.2 release, you can conditionally declare these elements. This is particularly useful when an application connects to multiple brokers and needs to specify with which brokers a particular element should be declared. |
The classes representing these elements implement Declarable
, which has two methods: shouldDeclare()
and getDeclaringAdmins()
.
The RabbitAdmin
uses these methods to determine whether a particular instance should actually process the declarations on its Connection
.
The properties are available as attributes in the namespace, as shown in the following examples:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
By default, the auto-declare attribute is true and, if the declared-by is not supplied (or is empty), then all RabbitAdmin instances declare the object (as long as the admin’s auto-startup attribute is true , the default, and the admin’s explicit-declarations-only attribute is false).
|
Similarly, you can use Java-based @Configuration
to achieve the same effect.
In the following example, the components are declared by admin1
but not by admin2
:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
A Note On the id
and name
Attributes
The name
attribute on <rabbit:queue/>
and <rabbit:exchange/>
elements reflects the name of the entity in the broker.
For queues, if the name
is omitted, an anonymous queue is created (see AnonymousQueue
).
In versions prior to 2.0, the name
was also registered as a bean name alias (similar to name
on <bean/>
elements).
This caused two problems:
-
It prevented the declaration of a queue and exchange with the same name.
-
The alias was not resolved if it contained a SpEL expression (
#{…}
).
Starting with version 2.0, if you declare one of these elements with both an id
and a name
attribute, the name is no longer declared as a bean name alias.
If you wish to declare a queue and exchange with the same name
, you must provide an id
.
There is no change if the element has only a name
attribute.
The bean can still be referenced by the name
— for example, in binding declarations.
However, you still cannot reference it if the name contains SpEL — you must provide an id
for reference purposes.
AnonymousQueue
In general, when you need a uniquely-named, exclusive, auto-delete queue, we recommend that you use the AnonymousQueue
instead of broker-defined queue names (using ""
as a Queue
name causes the broker to generate the queue
name).
This is because:
-
The queues are actually declared when the connection to the broker is established. This is long after the beans are created and wired together. Beans that use the queue need to know its name. In fact, the broker might not even be running when the application is started.
-
If the connection to the broker is lost for some reason, the admin re-declares the
AnonymousQueue
with the same name. If we used broker-declared queues, the queue name would change.
You can control the format of the queue name used by AnonymousQueue
instances.
By default, the queue name is prefixed by spring.gen-
followed by a base64 representation of the UUID
— for example: spring.gen-MRBv9sqISkuCiPfOYfpo4g
.
You can provide an AnonymousQueue.NamingStrategy
implementation in a constructor argument.
The following example shows how to do so:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
The first bean generates a queue name prefixed by spring.gen-
followed by a base64 representation of the UUID
— for
example: spring.gen-MRBv9sqISkuCiPfOYfpo4g
.
The second bean generates a queue name prefixed by something-
followed by a base64 representation of the UUID
.
The third bean generates a name by using only the UUID (no base64 conversion) — for example, f20c818a-006b-4416-bf91-643590fedb0e
.
The base64 encoding uses the “URL and Filename Safe Alphabet” from RFC 4648.
Trailing padding characters (=
) are removed.
You can provide your own naming strategy, whereby you can include other information (such as the application name or client host) in the queue name.
You can specify the naming strategy when you use XML configuration.
The naming-strategy
attribute is present on the <rabbit:queue>
element
for a bean reference that implements AnonymousQueue.NamingStrategy
.
The following examples show how to specify the naming strategy in various ways:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
The first example creates names such as spring.gen-MRBv9sqISkuCiPfOYfpo4g
.
The second example creates names with a String representation of a UUID.
The third example creates names such as custom.gen-MRBv9sqISkuCiPfOYfpo4g
.
You can also provide your own naming strategy bean.
Starting with version 2.1, anonymous queues are declared with argument Queue.X_QUEUE_LEADER_LOCATOR
set to client-local
by default.
This ensures that the queue is declared on the node to which the application is connected.
You can revert to the previous behavior by calling queue.setLeaderLocator(null)
after constructing the instance.
Recovering Auto-Delete Declarations
Normally, the RabbitAdmin
(s) only recover queues/exchanges/bindings that are declared as beans in the application context; if any such declarations are auto-delete, they will be removed by the broker if the connection is lost.
When the connection is re-established, the admin will redeclare the entities.
Normally, entities created by calling admin.declareQueue(…)
, admin.declareExchange(…)
and admin.declareBinding(…)
will not be recovered.
Starting with version 2.4, the admin has a new property redeclareManualDeclarations
; when true
, the admin will recover these entities in addition to the beans in the application context.
Recovery of individual declarations will not be performed if deleteQueue(…)
, deleteExchange(…)
or removeBinding(…)
is called.
Associated bindings are removed from the recoverable entities when queues and exchanges are deleted.
Finally, calling resetAllManualDeclarations()
will prevent the recovery of any previously declared entities.