This first part of the reference documentation is a high-level overview of Spring AMQP and its underlying concepts, with code snippets that will get you up and running as quickly as possible.
This is the 5 minute tour to get started with Spring AMQP.
Prerequisites: install and run the RabbitMQ broker (http://www.rabbitmq.com/download.html). Then grab the spring-rabbit JAR and all its dependencies; the easiest way to do that is to declare a dependency in your build tool, e.g. for Maven:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.5.0.RC1</version>
</dependency>
And for Gradle:
compile 'org.springframework.amqp:spring-rabbit:1.5.0.RC1'
While the default Spring Framework version dependency is 4.1.x, Spring AMQP is generally compatible with earlier versions of Spring Framework.
Annotation-based listeners and the RabbitMessagingTemplate require Spring Framework 4.1, however.
Similarly, the default amqp-client version is 3.4.x but the framework is generally compatible with earlier versions.
However, of course, features that rely on newer client versions will not be available.
Using plain, imperative Java to send and receive a message:
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
Note that there is a ConnectionFactory in the native Java Rabbit client as well.
We are using the Spring abstraction in the code above.
We are relying on the default exchange in the broker (since none is specified in the send), and the default binding of all queues to the default exchange by their name (hence we can use the queue name as a routing key in the send).
Those behaviours are defined in the AMQP specification.
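To make the routing rule concrete, here is a toy in-memory model in plain Java. It is not the RabbitMQ client or Spring AMQP: the class DefaultExchangeSketch and its methods are hypothetical names invented for this illustration. It only mimics the rule described above, that the default exchange delivers a message to the queue whose name equals the routing key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only: every name here is an assumption made for illustration.
final class DefaultExchangeSketch {

    // Declared queues, keyed by queue name.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    // Declaring a queue implicitly "binds" it to the default exchange by name.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Publishing to the default exchange: the routing key is matched
    // against the queue names. Returns false if no queue matches.
    boolean publishToDefaultExchange(String routingKey, String body) {
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.addLast(body);
        return true;
    }

    // Returns the oldest message in the queue, or null if there is none.
    String receive(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("myqueue");
        broker.publishToDefaultExchange("myqueue", "foo");
        System.out.println(broker.receive("myqueue"));
    }
}
```

In the real example, convertAndSend("myqueue", "foo") plays the role of publishToDefaultExchange: the first argument is a routing key that happens to be the queue name.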
The same example as above, but externalizing the resource configuration to XML:
ApplicationContext context = new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">
    <rabbit:connection-factory id="connectionFactory"/>
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <rabbit:queue name="myqueue"/>
</beans>
The <rabbit:admin/> declaration by default automatically looks for beans of type Queue, Exchange and Binding and declares them to the broker on behalf of the user, hence there is no need to use that bean explicitly in the simple Java driver.
There are plenty of options to configure the properties of the components in the XML schema - you can use auto-complete features of your XML editor to explore them and look at their documentation.
The same example again with the external configuration in Java:
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

@Configuration
public class RabbitConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }
}
The spring-erlang jar is no longer included in the distribution.
Use Section 3.1.11, “RabbitMQ REST API” instead.
Previously, if the connection factory was configured with a host/port, but an empty String was also supplied for addresses, the host and port were ignored.
Now, an empty addresses String is treated the same as a null, and the host/port will be used.
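The new rule can be sketched in a few lines of plain Java. This is an illustration of the described behavior, not the CachingConnectionFactory source: the class AddressResolutionSketch, its resolve method, and the comma-separated "host:port" format handling are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names and parsing details are assumptions.
final class AddressResolutionSketch {

    // Returns the broker endpoints to try: the parsed addresses when any are
    // supplied, otherwise the single configured host and port.
    static List<String> resolve(String addresses, String host, int port) {
        List<String> endpoints = new ArrayList<>();
        if (addresses != null && !addresses.isEmpty()) {
            for (String address : addresses.split(",")) {
                endpoints.add(address.trim());
            }
        } else {
            // null and "" are treated alike: fall back to host and port
            endpoints.add(host + ":" + port);
        }
        return endpoints;
    }

    public static void main(String[] args) {
        System.out.println(resolve("", "localhost", 5672));
        System.out.println(resolve("host1:5672, host2:5672", "localhost", 5672));
    }
}
```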
The CachingConnectionFactory has an additional constructor, with a URI parameter, to configure the broker connection.
A new method resetConnection() has been added to allow users to reset the connection (or connections).
This might be used, for example, to reconnect to the primary broker after failing over to the secondary broker.
This will impact in-process operations.
The existing destroy() method does exactly the same, but the new method has a less daunting name.
When the listener container consumers start, they attempt to passively declare the queues to ensure they are available on the broker.
Previously, if these declarations failed, for example because the queues didn’t exist, or when an HA queue was being moved, the retry logic was fixed at 3 retry attempts at 5 second intervals.
If the queue(s) still do not exist, the behavior is controlled by the missingQueuesFatal property (default true).
Also, for containers configured to listen from multiple queues, if only a subset of queues are available, the consumer retried the missing queues on a fixed interval of 60 seconds.
These 3 properties (declarationRetries, failedDeclarationRetryInterval, retryDeclarationInterval) are now configurable.
See Section 3.1.14, “Message Listener Container Configuration” for more information.
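The retry behavior described above can be pictured with a small stand-alone loop. This is a sketch under stated assumptions, not the container's code: DeclarationRetrySketch and declareWithRetries are hypothetical names, the passive declaration is replaced by a BooleanSupplier, and declarationRetries is interpreted here as the total number of attempts.

```java
import java.util.function.BooleanSupplier;

// Illustrative sketch only; not the listener container implementation.
final class DeclarationRetrySketch {

    // Tries the passive declaration up to declarationRetries times, sleeping
    // failedDeclarationRetryIntervalMillis between failed attempts.
    // Returns the number of attempts used, or -1 if every attempt failed
    // (or the thread was interrupted while waiting).
    static int declareWithRetries(BooleanSupplier passiveDeclare,
                                  int declarationRetries,
                                  long failedDeclarationRetryIntervalMillis) {
        for (int attempt = 1; attempt <= declarationRetries; attempt++) {
            if (passiveDeclare.getAsBoolean()) {
                return attempt;
            }
            if (attempt < declarationRetries) {
                try {
                    Thread.sleep(failedDeclarationRetryIntervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical broker check: the queue appears on the third attempt.
        BooleanSupplier queueAppearsOnThirdTry = () -> ++calls[0] >= 3;
        // The former fixed values were 3 attempts at 5000 ms; a short
        // interval keeps this demo fast.
        int attempts = declareWithRetries(queueAppearsOnThirdTry, 3, 10L);
        System.out.println("declared after " + attempts + " attempt(s)");
    }
}
```

When the result is -1, the real container would consult missingQueuesFatal to decide whether to stop; that decision is left out of the sketch.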
The RabbitGatewaySupport class has been moved from o.s.amqp.rabbit.core.support to o.s.amqp.rabbit.core.
The DefaultMessagePropertiesConverter can now be configured to determine the maximum length of a LongString that will be converted to a String rather than a DataInputStream.
The converter has an alternative constructor that takes the value as a limit.
Previously, this limit was hard-coded at 1024 bytes.
(Also available in 1.4.4).
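The effect of the limit can be illustrated with plain Java. This is a sketch of the idea, not the DefaultMessagePropertiesConverter source: LongStringLimitSketch and convert are hypothetical names, the LongString is represented by a raw byte array, and both the UTF-8 charset and the inclusive comparison are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; names, charset and boundary are assumptions.
final class LongStringLimitSketch {

    // The formerly hard-coded limit mentioned in the text.
    static final int DEFAULT_LIMIT = 1024;

    // Values up to the limit become a String; longer values are exposed
    // as a stream so they are not fully decoded up front.
    static Object convert(byte[] longStringBytes, int limit) {
        if (longStringBytes.length <= limit) {
            return new String(longStringBytes, StandardCharsets.UTF_8);
        }
        return new DataInputStream(new ByteArrayInputStream(longStringBytes));
    }

    public static void main(String[] args) {
        byte[] large = new byte[2000];
        System.out.println(convert(large, DEFAULT_LIMIT).getClass().getSimpleName());
        System.out.println(convert(large, 4096).getClass().getSimpleName());
    }
}
```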
The bindings attribute has been added to the @RabbitListener annotation as mutually exclusive with the queues attribute to allow the specification of the queue, its exchange and binding for declaration by a RabbitAdmin on the Broker.
The default reply address (@SendTo) for a @RabbitListener can now be a SpEL expression.
See the section called “Annotation-driven Listener Endpoints” for more information.
It is now possible to declare beans that define a collection of these entities (exchanges, queues, bindings) and the RabbitAdmin will add the contents to the list of entities that it will declare when a connection is established.
See the section called “Declaring Collections of Exchanges, Queues, Bindings” for more information.
The reply-address attribute has been added to the <rabbit-template> component as an alternative to reply-queue.
See Section 3.1.9, “Request/Reply Messaging” for more information.
(Also available in 1.4.4 as a setter on the RabbitTemplate).
The RabbitTemplate now supports blocking in receive and receiveAndConvert methods.
See the section called “Polling Consumer” for more information.
When the mandatory flag is set when using sendAndReceive and convertSendAndReceive methods, the calling thread will throw an AmqpMessageReturnedException if the request message can’t be delivered.
See the section called “Reply Timeout” for more information.
The framework will attempt to verify proper configuration of a reply listener container when using a named reply queue.
See the section called “Reply Listener Container” for more information.
The RabbitManagementTemplate has been introduced to monitor and configure the RabbitMQ Broker using the REST API provided by its Management Plugin.
See Section 3.1.11, “RabbitMQ REST API” for more information.
Important | |
---|---|
The Normal Spring bean name overrides are applied; if a later When migrating to this release, if you have |
The @RabbitListener annotation can now be applied at the class level.
Together with the new @RabbitHandler method annotation, this allows the handler method to be selected based on payload type.
See the section called “Multi-Method Listeners” for more information.
The SimpleMessageListenerContainer can now be supplied with a BackOff instance for consumer startup recovery.
See Section 3.1.14, “Message Listener Container Configuration” for more information.
A mechanism to control the log levels of channel closure has been introduced. See the section called “Logging Channel Close Events”.
The SimpleMessageListenerContainer now emits application events when consumers fail.
See the section called “Consumer Failure Events” for more information.
Previously, the consumer tags for asynchronous consumers were generated by the broker. With this release, it is now possible to supply a naming strategy to the listener container. See Section 3.1.6, “Consumer Tags”.
The MessageListenerAdapter now supports a map of queue names (or consumer tags) to method names, to determine which delegate method to call based on the queue the message was received from.
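The idea of choosing a delegate method from the source queue can be sketched with a map and reflection. This is not the MessageListenerAdapter implementation: QueueToMethodSketch, its map and dispatch methods, and the Delegate class with its handler names are all hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; every name here is an assumption.
final class QueueToMethodSketch {

    // Hypothetical delegate with one handler for a specific queue and a default.
    public static final class Delegate {
        public String handleOrder(String body) { return "order:" + body; }
        public String handleMessage(String body) { return "default:" + body; }
    }

    private final Object delegate;
    private final String defaultMethodName;
    private final Map<String, String> queueToMethodName = new HashMap<>();

    QueueToMethodSketch(Object delegate, String defaultMethodName) {
        this.delegate = delegate;
        this.defaultMethodName = defaultMethodName;
    }

    // Registers which delegate method handles messages from the given queue.
    void map(String queueName, String methodName) {
        queueToMethodName.put(queueName, methodName);
    }

    // Looks up the method for the queue (falling back to the default)
    // and invokes it reflectively with the message body.
    Object dispatch(String queueName, String body) {
        String methodName = queueToMethodName.getOrDefault(queueName, defaultMethodName);
        try {
            Method method = delegate.getClass().getMethod(methodName, String.class);
            return method.invoke(delegate, body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        QueueToMethodSketch adapter = new QueueToMethodSketch(new Delegate(), "handleMessage");
        adapter.map("orders", "handleOrder");
        System.out.println(adapter.dispatch("orders", "42"));
        System.out.println(adapter.dispatch("other", "hi"));
    }
}
```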
A new connection factory, the LocalizedQueueConnectionFactory, has been introduced; it connects to the node in a cluster where a mirrored queue actually resides.
See the section called “Queue Affinity and the LocalizedQueueConnectionFactory”.
POJO listeners can be annotated with @RabbitListener, enabled by @EnableRabbit or <rabbit:annotation-driven />.
Spring Framework 4.1 is required for this feature.
See the section called “Annotation-driven Listener Endpoints” for more information.
A new RabbitMessagingTemplate is provided to allow users to interact with RabbitMQ using spring-messaging Messages.
It uses the RabbitTemplate internally, which can be configured as normal.
Spring Framework 4.1 is required for this feature.
See the section called “Messaging integration” for more information.
1.3.5 introduced the missingQueuesFatal property on the SimpleMessageListenerContainer.
This is now available on the listener container namespace element.
See Section 3.1.14, “Message Listener Container Configuration”.
The confirm method on this interface has an additional parameter cause.
When available, this parameter will contain the reason for a negative acknowledgement (nack).
See the section called “Publisher Confirms and Returns”.
A factory bean is now provided to create the underlying RabbitMQ ConnectionFactory used by the CachingConnectionFactory.
This enables configuration of SSL options using Spring’s dependency injection.
See the section called “Configuring the Underlying Client Connection Factory”.
The CachingConnectionFactory now allows the connectionTimeout to be set as a property or as an attribute in the namespace.
It sets the property on the underlying RabbitMQ ConnectionFactory.
See the section called “Configuring the Underlying Client Connection Factory”.
The Logback org.springframework.amqp.rabbit.logback.AmqpAppender has been introduced.
It provides options similar to those of org.springframework.amqp.rabbit.log4j.AmqpAppender.
For more information, see the JavaDocs of these classes.
The Log4j AmqpAppender now supports the deliveryMode property (PERSISTENT or NON_PERSISTENT, default: PERSISTENT).
Previously, all log4j messages were PERSISTENT.
The appender also supports modification of the Message before sending - allowing, for example, the addition of custom headers.
Subclasses should override the postProcessMessageBeforeSend() method.
The listener container now, by default, redeclares any missing queues during startup.
A new auto-declare attribute has been added to the <rabbit:listener-container> to prevent these redeclarations.
See the section called “auto-delete Queues”.
The mandatoryExpression, sendConnectionFactorySelectorExpression and receiveConnectionFactorySelectorExpression SpEL Expression properties have been added to the RabbitTemplate.
The mandatoryExpression is used to evaluate a mandatory boolean value against each request message, when a ReturnCallback is in use.
See the section called “Publisher Confirms and Returns”.
The sendConnectionFactorySelectorExpression and receiveConnectionFactorySelectorExpression are used when an AbstractRoutingConnectionFactory is provided, to determine the lookupKey for the target ConnectionFactory at runtime on each AMQP protocol interaction operation.
See the section called “Routing Connection Factory”.
A SimpleMessageListenerContainer can be configured with a routing connection factory to enable connection selection based on the queue names.
See the section called “Routing Connection Factory”.
The recoveryCallback property has been added to be used in the retryTemplate.execute().
See the section called “Adding Retry Capabilities”.
This exception is now a subclass of AmqpException; if you have code like the following:
try {
    template.convertAndSend("foo", "bar", "baz");
}
catch (AmqpException e) {
    ...
}
catch (MessageConversionException e) {
    ...
}
The second catch block will no longer be reachable and needs to be moved above the catch-all AmqpException catch block.
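The corrected ordering can be tried out without the library on the classpath. In the sketch below, AmqpExceptionStandIn and MessageConversionExceptionStandIn are hypothetical stand-ins that only reproduce the subclass relationship described above; CatchOrderSketch, send and handle are likewise invented for the example.

```java
// Hypothetical stand-in for AmqpException, so the sketch compiles on its own.
class AmqpExceptionStandIn extends RuntimeException {
    AmqpExceptionStandIn(String message) { super(message); }
}

// Mirrors the new hierarchy: the conversion exception is now a subclass.
class MessageConversionExceptionStandIn extends AmqpExceptionStandIn {
    MessageConversionExceptionStandIn(String message) { super(message); }
}

final class CatchOrderSketch {

    // Simulates a send that fails either during conversion or at the broker.
    static void send(boolean conversionFails) {
        if (conversionFails) {
            throw new MessageConversionExceptionStandIn("cannot convert payload");
        }
        throw new AmqpExceptionStandIn("broker unavailable");
    }

    // The more specific catch block comes first. With the blocks in the
    // opposite order, the Java compiler rejects the subclass catch because
    // the exception has already been caught by the superclass block.
    static String handle(boolean conversionFails) {
        try {
            send(conversionFails);
            return "sent";
        } catch (MessageConversionExceptionStandIn e) {
            return "conversion problem: " + e.getMessage();
        } catch (AmqpExceptionStandIn e) {
            return "AMQP problem: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(true));
        System.out.println(handle(false));
    }
}
```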
Spring AMQP is now compatible with RabbitMQ 3.4, including direct reply-to; see the section called “Compatibility” and the section called “RabbitMQ Direct reply-to” for more information.
The ContentTypeDelegatingMessageConverter has been introduced to select the MessageConverter to use, based on the contentType property in the MessageProperties.
See Section 3.1.7, “Message Converters” for more information.
The listener container now supports dynamic scaling of the number of consumers based on workload, or the concurrency can be programmatically changed without stopping the container. See Section 3.1.15, “Listener Concurrency”.
The listener container now permits the queue(s) on which it is listening to be modified at runtime. Also, the container will now start if at least one of its configured queues is available for use. See Section 3.1.17, “Listener Container Queues”.
This listener container will now redeclare any auto-delete queues during startup. See the section called “auto-delete Queues”.
The listener container now supports consumer arguments, allowing the x-priority argument to be set.
See the section called “Container”.
The SimpleMessageListenerContainer can now be configured with a single exclusive consumer, preventing other consumers from listening to the queue.
See Section 3.1.16, “Exclusive Consumer”.
It is now possible to have the Broker generate the queue name, regardless of durable, autoDelete and exclusive settings. See Section 3.1.10, “Configuring the broker”.
Previously, omitting the key attribute from a binding element of a direct-exchange configuration caused the queue or exchange to be bound with an empty string as the routing key.
Now it is bound with the name of the provided Queue or Exchange.
Users wishing to bind with an empty string routing key need to specify key="".
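The new defaulting rule fits in a one-line function. The sketch below is illustrative only and is not the namespace parser's code: BindingKeySketch and effectiveRoutingKey are hypothetical names, and a null argument stands for an omitted key attribute.

```java
// Illustrative sketch only; names are assumptions.
final class BindingKeySketch {

    // keyAttribute == null models an omitted key attribute; an explicit ""
    // is kept as-is so that an empty routing key remains possible.
    static String effectiveRoutingKey(String keyAttribute, String destinationName) {
        return keyAttribute != null ? keyAttribute : destinationName;
    }

    public static void main(String[] args) {
        System.out.println("[" + effectiveRoutingKey(null, "myqueue") + "]");
        System.out.println("[" + effectiveRoutingKey("", "myqueue") + "]");
        System.out.println("[" + effectiveRoutingKey("orders", "myqueue") + "]");
    }
}
```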
The AmqpTemplate now provides several synchronous receiveAndReply methods.
These are implemented by the RabbitTemplate.
For more information see Section 3.1.5, “Receiving messages”.
The RabbitTemplate now supports configuring a RetryTemplate to attempt retries (with optional back off policy) when the broker is not available.
For more information see the section called “Adding Retry Capabilities”.
The caching connection factory can now be configured to cache Connections and their Channels instead of using a single connection and caching just Channels. See Section 3.1.2, “Connection and Resource Management”.
The <exchange>'s <binding> now supports parsing of the <binding-arguments> sub-element.
The <headers-exchange>'s <binding> now can be configured with a key/value attribute pair (to match on a single header) or with a <binding-arguments> sub-element, allowing matching on multiple headers; these options are mutually exclusive.
See the section called “Introduction”.
A new SimpleRoutingConnectionFactory has been introduced, to allow configuration of ConnectionFactories mapping to determine the target ConnectionFactory to use at runtime.
See the section called “Routing Connection Factory”.
"Fluent APIs" for building messages and/or message properties are now provided. See the section called “Message Builder API”.
A "Fluent API" for building listener container retry interceptors is now provided. See the section called “Failures in Synchronous Operations and Options for Retry”.
This new MessageRecoverer is provided to allow publishing a failed message to another queue (including stack trace information in the header) when retries are exhausted.
See the section called “Message Listeners and the Asynchronous Case”.
A default ConditionalRejectingErrorHandler has been added to the listener container.
This error handler detects message conversion problems (which are fatal) and instructs the container to reject the message to prevent the broker from continually redelivering the unconvertible message.
See Section 3.1.12, “Exception Handling”.
The SimpleMessageListenerContainer now has a property missingQueuesFatal (default true).
Previously, missing queues were always fatal.
See Section 3.1.14, “Message Listener Container Configuration”.
Spring AMQP now uses RabbitMQ 3.1.x by default (but retains compatibility with earlier versions).
Certain deprecations have been added for features no longer supported by RabbitMQ 3.1.x - federated exchanges and the immediate property on the RabbitTemplate.
The RabbitAdmin now provides an option to allow exchange, queue, and binding declarations to continue when a declaration fails.
Previously, all declarations stopped on a failure.
By setting ignore-declaration-exceptions, such exceptions are logged (WARN), but further declarations continue.
An example where this might be useful is when a queue declaration fails because of a slightly different ttl setting, which would normally stop other declarations from proceeding.
The RabbitAdmin now provides an additional method getQueueProperties().
This can be used to determine if a queue exists on the broker (returns null for a non-existent queue).
In addition, the current number of messages in the queue, as well as the current number of consumers is returned.
Previously, when the ...sendAndReceive() methods were used with a fixed reply queue, two custom headers were used for correlation data and to retain/restore reply queue information.
With this release, the standard message property correlationId is used by default, although the user can specify a custom property to use instead.
In addition, nested replyTo information is now retained internally in the template, instead of using a custom header.
The immediate property is deprecated; users must not set this property when using RabbitMQ 3.0.x or greater.
A Jackson 2.x MessageConverter is now provided, along with the existing converter that uses Jackson 1.x.
Previously, when declaring queues, exchanges and bindings, it was not possible to define which connection factory was used for the declarations; each RabbitAdmin would declare all components using its connection.
Starting with this release, it is now possible to limit declarations to specific RabbitAdmin instances.
See the section called “Conditional Declaration”.
Facilities are now provided for using Spring Remoting techniques, using AMQP as the transport for the RPC calls. For more information see the section called “Spring Remoting with AMQP”.
Several users have asked for the underlying client connection factory’s requestedHeartBeats property to be exposed on the Spring AMQP CachingConnectionFactory.
This is now available; previously, it was necessary to configure the AMQP client factory as a separate bean and provide a reference to it in the CachingConnectionFactory.
Spring AMQP is now built using Gradle.
Adds support for publisher confirms and returns.
Adds support for HA queues, and broker failover.
Adds support for Dead Letter Exchanges/Dead Letter Queues.