Spring Integration provides Channel Adapters for receiving and sending messages using the Advanced Message Queuing Protocol (AMQP). The following adapters are available:
Spring Integration also provides a point-to-point Message Channel as well as a publish/subscribe Message Channel backed by AMQP Exchanges and Queues.
In order to provide AMQP support, Spring Integration relies on Spring AMQP (http://www.springsource.org/spring-amqp), which "applies core Spring concepts to the development of AMQP-based messaging solutions". Spring AMQP provides semantics similar to those of Spring JMS (http://static.springsource.org/spring/docs/current/spring-framework-reference/html/jms.html).
Whereas the provided AMQP Channel Adapters are intended for unidirectional Messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP Gateways for request/reply operations.
Tip | |
---|---|
Please familiarize yourself with the reference documentation of the Spring AMQP project as well. It provides much more in-depth information regarding Spring's integration with AMQP in general and RabbitMQ in particular. You can find the documentation at: http://static.springsource.org/spring-amqp/reference/html/ |
A configuration sample for an AMQP Inbound Channel Adapter is shown below.
<int-amqp:inbound-channel-adapter
    id="inboundAmqp"
    channel="inboundChannel"
    queue-names="si.test.queue"
    acknowledge-mode="AUTO"
    advice-chain=""
    channel-transacted=""
    concurrent-consumers=""
    connection-factory=""
    error-channel=""
    expose-listener-channel=""
    header-mapper=""
    mapped-request-headers=""
    listener-container=""
    message-converter=""
    message-properties-converter=""
    phase=""
    prefetch-count=""
    receive-timeout=""
    recovery-interval=""
    missing-queues-fatal=""
    shutdown-timeout=""
    task-executor=""
    transaction-attribute=""
    transaction-manager=""
    tx-size=""/>
Attribute | Description |
---|---|
id | Unique ID for this adapter. Optional. |
channel | Message Channel to which converted Messages should be sent. Required. |
queue-names | Names of the AMQP Queues from which Messages should be consumed (comma-separated list). Required. |
acknowledge-mode | Acknowledge Mode for the MessageListenerContainer. When set to MANUAL, the delivery tag and channel are provided in message headers |
advice-chain | Extra AOP Advice(s) to handle cross-cutting behavior associated with this Inbound Channel Adapter. Optional. |
channel-transacted | Flag to indicate that channels created by this component will be transactional. If true, tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback depending on the outcome, with an exception signalling a rollback. Optional (Defaults to false). |
concurrent-consumers | Specify the number of concurrent consumers to create. Default is 1. Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. Optional. |
connection-factory | Bean reference to the RabbitMQ ConnectionFactory. Optional (Defaults to 'connectionFactory'). |
error-channel | Message Channel to which error Messages should be sent. Optional. |
expose-listener-channel | Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered ChannelAwareMessageListener. Optional (Defaults to true). |
header-mapper | A reference to an |
mapped-request-headers | Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders. This can only be provided if the 'header-mapper' reference is not provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "foo*, bar" or "*foo"). |
listener-container | Reference to the |
message-converter | The MessageConverter to use when receiving AMQP Messages. Optional. |
message-properties-converter | The MessagePropertiesConverter to use when receiving AMQP Messages. Optional. |
phase | Specify the phase in which the underlying SimpleMessageListenerContainer should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE, meaning that this container starts as late as possible and stops as soon as possible. Optional. |
prefetch-count | Tells the AMQP broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. It should be greater than or equal to the transaction size (see attribute "tx-size"). Optional (Defaults to 1). |
receive-timeout | Receive timeout in milliseconds. Optional (Defaults to 1000). |
recovery-interval | Specifies the interval between recovery attempts of the underlying SimpleMessageListenerContainer (in milliseconds). Optional (Defaults to 5000). |
missing-queues-fatal | If 'true', and none of the queues are available on the broker, the container will throw a fatal exception during startup and will stop if the queues are deleted when the container is running (after making 3 attempts to passively declare the queues). If false, the container will not throw an exception and go into recovery mode, attempting to restart according to the |
shutdown-timeout | The time to wait for workers in milliseconds after the underlying SimpleMessageListenerContainer is stopped, and before the AMQP connection is forced closed. If any workers are active when the shutdown signal comes, they will be allowed to finish processing as long as they can finish within this timeout. Otherwise the connection is closed and messages remain unacked (if the channel is transactional). Optional (Defaults to 5000). |
task-executor | By default, the underlying SimpleMessageListenerContainer uses a SimpleAsyncTaskExecutor implementation, which fires up a new Thread for each task, executing it asynchronously. By default, the number of concurrent threads is unlimited. NOTE: This implementation does not reuse threads. Consider a thread-pooling TaskExecutor implementation as an alternative. Optional (Defaults to SimpleAsyncTaskExecutor). |
transaction-attribute | By default the underlying SimpleMessageListenerContainer creates a new instance of the DefaultTransactionAttribute (takes the EJB approach to rolling back on runtime, but not checked exceptions). Optional (Defaults to DefaultTransactionAttribute). |
transaction-manager | Sets a Bean reference to an external PlatformTransactionManager on the underlying SimpleMessageListenerContainer. The transaction manager works in conjunction with the "channel-transacted" attribute. If there is already a transaction in progress when the framework is sending or receiving a message, and the channelTransacted flag is true, then the commit or rollback of the messaging transaction will be deferred until the end of the current transaction. If the channelTransacted flag is false, then no transaction semantics apply to the messaging operation (it is auto-acked). For further information see chapter 1.9 of the Spring AMQP reference guide: http://static.springsource.org/spring-amqp/docs/1.0.x/reference/html/#d0e525 Optional. |
tx-size | Tells the SimpleMessageListenerContainer how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the set "prefetch-count". Optional (Defaults to 1). |
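The task-executor description above suggests a thread-pooling TaskExecutor instead of the default, which starts a new thread per task. As a rough illustration of that difference, the following sketch uses only JDK classes (no Spring types) to show that a fixed pool runs many tasks on a bounded, reused set of threads. The names PooledExecutorSketch and runOnFixedPool are hypothetical and not part of Spring Integration, and the pool size of 4 is an arbitrary assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative only: hypothetical class, not part of Spring Integration or Spring AMQP. */
final class PooledExecutorSketch {

    /** Runs taskCount trivial tasks on a fixed pool and returns the names of the threads that ran them. */
    static Set<String> runOnFixedPool(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records which worker thread executed it.
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runOnFixedPool(4, 100);
        // 100 tasks were executed, but never on more than 4 distinct threads.
        System.out.println("distinct threads used: " + names.size());
    }
}
```

In an actual configuration you would more likely define a pooled TaskExecutor bean, for example Spring's ThreadPoolTaskExecutor, and reference it from the task-executor attribute.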
Important | |
---|---|
Even though the Spring Integration JMS and AMQP support is very similar, important differences exist. The JMS Inbound Channel Adapter uses a JmsDestinationPollingSource under the covers and expects a configured Poller. The AMQP Inbound Channel Adapter, on the other hand, uses a SimpleMessageListenerContainer and is message-driven. In that regard it is more similar to the JMS Message Driven Channel Adapter. |
The inbound gateway supports all the attributes on the inbound channel adapter (except 'channel' is replaced by 'request-channel'), plus some additional attributes:
<int-amqp:inbound-gateway
    id="inboundGateway"
    request-channel="myRequestChannel"
    header-mapper=""
    mapped-request-headers=""
    mapped-reply-headers=""
    reply-channel="myReplyChannel"
    reply-timeout="1000"/>
Attribute | Description |
---|---|
id | Unique ID for this adapter. Optional. |
request-channel | Message Channel to which converted Messages should be sent. Required. |
header-mapper | A reference to an |
mapped-request-headers | Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the |
mapped-reply-headers | Comma-separated list of names of |
reply-channel | Message Channel where reply Messages will be expected. Optional. |
reply-timeout | Used to set the |
See the note in Section 10.2, “Inbound Channel Adapter” about configuring the listener-container attribute.
By default the inbound endpoints use acknowledge mode AUTO, which means the container automatically acks the message when the downstream integration flow completes (or a message is handed off to another thread using a QueueChannel or ExecutorChannel). Setting the mode to NONE configures the consumer such that acks are not used at all (the broker automatically acks the message as soon as it is sent). Setting the mode to MANUAL allows user code to ack the message at some other point during processing. To support this, with this mode, the endpoints provide the Channel and deliveryTag in the amqp_channel and amqp_deliveryTag headers respectively.
You can perform any valid rabbit command on the Channel but, generally, only basicAck and basicNack (or basicReject) would be used. In order to not interfere with the operation of the container, you should not retain a reference to the channel and just use it in the context of the current message.
Note | |
---|---|
Since the Channel is a reference to a "live" object, it cannot be serialized and will be lost if a message is persisted. |
This is an example of how you might use MANUAL acknowledgement:
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload,
        @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        // Acknowledge only this delivery (multiple = false)
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing
    }
    else {
        // Negatively acknowledge only this delivery and requeue it (multiple = false, requeue = true)
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}
A configuration sample for an AMQP Outbound Channel Adapter is shown below.
<int-amqp:outbound-channel-adapter
    id="outboundAmqp"
    channel="outboundChannel"
    amqp-template="myAmqpTemplate"
    exchange-name=""
    exchange-name-expression=""
    order="1"
    routing-key=""
    routing-key-expression=""
    default-delivery-mode=""
    confirm-correlation-expression=""
    confirm-ack-channel=""
    confirm-nack-channel=""
    return-channel=""
    header-mapper=""
    mapped-request-headers=""
    lazy-connect="true"/>
Attribute | Description |
---|---|
id | Unique ID for this adapter. Optional. |
channel | Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required. |
amqp-template | Bean Reference to the configured AMQP Template. Optional (Defaults to "amqpTemplate"). |
exchange-name | The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with 'exchange-name-expression'. Optional. |
exchange-name-expression | A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with 'exchange-name'. Optional. |
order | The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]). |
routing-key | The fixed routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with 'routing-key-expression'. Optional. |
routing-key-expression | A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. 'payload.key'). By default, this will be an empty String. Mutually exclusive with 'routing-key'. Optional. |
default-delivery-mode | The default delivery mode for messages; 'PERSISTENT' or 'NON_PERSISTENT'. Overridden if the 'header-mapper' sets the delivery mode. The 'DefaultHeaderMapper' sets the value if the Spring Integration message header |
confirm-correlation-expression | An expression defining correlation data. When provided, this configures the underlying amqp template to receive publisher confirms. Requires a dedicated Starting with version 4.1 the |
confirm-ack-channel | The channel to which positive (ack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Optional, default=nullChannel. |
confirm-nack-channel | The channel to which negative (nack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Optional, default=nullChannel. |
return-channel | The channel to which returned messages are sent. When provided, the underlying amqp template is configured to return undeliverable messages to the adapter. The message will be constructed from the data received from amqp, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey. Optional. |
header-mapper | A reference to an |
mapped-request-headers | Comma-separated list of names of AMQP Headers to be mapped from the |
lazy-connect | When set to |
A configuration sample for an AMQP Outbound Gateway is shown below.
<int-amqp:outbound-gateway
    id="inboundGateway"
    request-channel="myRequestChannel"
    amqp-template=""
    exchange-name=""
    exchange-name-expression=""
    order="1"
    reply-channel=""
    reply-timeout=""
    requires-reply=""
    routing-key=""
    routing-key-expression=""
    default-delivery-mode=""
    return-channel=""
    lazy-connect="true"/>
Attribute | Description |
---|---|
id | Unique ID for this adapter. Optional. |
request-channel | Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required. |
amqp-template | Bean Reference to the configured AMQP Template. Optional (Defaults to "amqpTemplate"). |
exchange-name | The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with 'exchange-name-expression'. Optional. |
exchange-name-expression | A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with 'exchange-name'. Optional. |
order | The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]). |
reply-channel | Message Channel to which replies should be sent after being received from an AMQP Queue and converted. Optional. |
reply-timeout | The time the gateway will wait when sending the reply message to the |
requires-reply | When |
routing-key | The routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with 'routing-key-expression'. Optional. |
routing-key-expression | A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. 'payload.key'). By default, this will be an empty String. Mutually exclusive with 'routing-key'. Optional. |
default-delivery-mode | The default delivery mode for messages; 'PERSISTENT' or 'NON_PERSISTENT'. Overridden if the 'header-mapper' sets the delivery mode. The 'DefaultHeaderMapper' sets the value if the Spring Integration message header |
return-channel | The channel to which returned messages are sent. When provided, the underlying amqp template is configured to return undeliverable messages to the gateway. The message will be constructed from the data received from amqp, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey. Optional. |
lazy-connect | When set to |
Note | |
---|---|
Prior to Spring Integration 2.2, and Spring AMQP 1.1, the outbound gateway used a new, temporary, reply queue for each request. This is still the default, but now the RabbitTemplate can be configured with a specific queue for replies; headers are added to the outbound message for request/reply correlation. It is important that the consuming application returns these headers unchanged. The headers are |
Important | |
---|---|
The underlying AmqpTemplate has a default replyTimeout of 5 seconds. If you require a longer timeout, it must be configured on the template. |
There are two Message Channel implementations available. One is point-to-point, and the other is publish/subscribe. Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer as you have seen on the Channel Adapters and Gateways. However, the examples we'll show here are going to have minimal configuration. Explore the XML schema to view the available attributes.
A point-to-point channel would look like this:
<int-amqp:channel id="p2pChannel"/>
Under the covers, a Queue named "si.p2pChannel" is declared, and this channel sends to that Queue (technically by sending to the no-name Direct Exchange with a routing key that matches this Queue's name). This channel also registers a consumer on that Queue. If, for some reason, you want the Queue to be "pollable" instead of message-driven, provide the "message-driven" flag with a value of false:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
A publish/subscribe channel would look like this:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
Under the covers, a Fanout Exchange named "si.fanout.pubSubChannel" is declared, and this channel sends to that Fanout Exchange. This channel also declares a server-named, exclusive, auto-delete, non-durable Queue and binds it to the Fanout Exchange while registering a consumer on that Queue to receive Messages. There is no "pollable" option for a publish-subscribe-channel; it must be message-driven.
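Judging from the two examples above, the default broker object names appear to be derived from the channel id by adding a fixed prefix. The following sketch simply restates that convention in code, which can help when looking for the corresponding Queue or Exchange on a broker. AmqpChannelNames and its methods are hypothetical helpers, not Spring Integration API, and the generalization from the two documented names is an assumption.

```java
/** Illustrative only: hypothetical helper, not part of Spring Integration. */
final class AmqpChannelNames {

    /** Queue name assumed for a point-to-point channel, following the "si.p2pChannel" example. */
    static String queueNameFor(String channelId) {
        return "si." + channelId;
    }

    /** Fanout Exchange name assumed for a publish/subscribe channel, following the "si.fanout.pubSubChannel" example. */
    static String fanoutExchangeNameFor(String channelId) {
        return "si.fanout." + channelId;
    }

    public static void main(String[] args) {
        System.out.println(queueNameFor("p2pChannel"));             // prints "si.p2pChannel"
        System.out.println(fanoutExchangeNameFor("pubSubChannel")); // prints "si.fanout.pubSubChannel"
    }
}
```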
Starting with version 4.1, AMQP-backed Message Channels support template-channel-transacted, alongside channel-transacted, to separate the transactional configuration for the AbstractMessageListenerContainer and for the RabbitTemplate. Note that channel-transacted was previously true by default; it is now false, the standard default value for the AbstractMessageListenerContainer.
The Spring Integration AMQP Adapters will map standard AMQP properties automatically. These properties will be copied by default to and from Spring Integration MessageHeaders using the DefaultAmqpHeaderMapper.
Of course, you can pass in your own implementation of AMQP specific header mappers, as the adapters have respective properties to support that.
Any user-defined headers within the AMQP MessageProperties will NOT be copied to or from an AMQP Message, unless explicitly specified by the requestHeaderNames and/or replyHeaderNames properties of the DefaultAmqpHeaderMapper.
Tip | |
---|---|
When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo") to be matched. For example, if you need to copy all user-defined headers simply use the wild-card character '*'. |
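To make the pattern behaviour in the tip more concrete, here is a small hand-written matcher for comma-separated header-name patterns in which '*' stands for any run of characters. It is only a sketch of the matching rule as described in this section; HeaderPatterns, matches and isMapped are hypothetical names, and the code is not the matcher that Spring Integration itself uses.

```java
/** Illustrative only: hypothetical class, not the framework's own pattern matcher. */
final class HeaderPatterns {

    /** Returns true if name matches the pattern, where '*' matches any (possibly empty) run of characters. */
    static boolean matches(String pattern, String name) {
        int star = pattern.indexOf('*');
        if (star < 0) {
            return pattern.equals(name); // no wildcard: require an exact match
        }
        String prefix = pattern.substring(0, star);
        if (!name.startsWith(prefix)) {
            return false;
        }
        String restPattern = pattern.substring(star + 1);
        String restName = name.substring(prefix.length());
        // Let '*' absorb 0..n characters and try to match what follows it.
        for (int i = 0; i <= restName.length(); i++) {
            if (matches(restPattern, restName.substring(i))) {
                return true;
            }
        }
        return false;
    }

    /** Returns true if any pattern in the comma-separated list matches the header name. */
    static boolean isMapped(String commaSeparatedPatterns, String headerName) {
        for (String pattern : commaSeparatedPatterns.split(",")) {
            if (matches(pattern.trim(), headerName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isMapped("foo*, bar", "fooId")); // true: matches "foo*"
        System.out.println(isMapped("foo*, bar", "bar"));   // true: exact match on "bar"
        System.out.println(isMapped("*foo", "myfoo"));      // true: matches "*foo"
        System.out.println(isMapped("foo*, bar", "other")); // false: no pattern matches
        System.out.println(isMapped("*", "anything"));      // true: '*' matches every name
    }
}
```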
Starting with version 4.1, the AbstractHeaderMapper (a DefaultAmqpHeaderMapper superclass) allows the NON_STANDARD_HEADERS token to be configured for the requestHeaderNames and/or replyHeaderNames properties (in addition to the existing STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS tokens) to map all user-defined headers. Note that a combination such as STANDARD_REPLY_HEADERS, NON_STANDARD_HEADERS is recommended instead of the generic *, to avoid mapping request headers to the reply.
Class org.springframework.amqp.support.AmqpHeaders identifies the default headers that will be used by the DefaultAmqpHeaderMapper:
To experiment with the AMQP adapters, check out the samples available in the Spring Integration Samples Git repository at:
Currently there is one sample available that demonstrates the basic functionality of the Spring Integration AMQP Adapter using an Outbound Channel Adapter and an Inbound Channel Adapter. The sample uses RabbitMQ (http://www.rabbitmq.com/) as its AMQP broker implementation.
Note | |
---|---|
In order to run the example you will need a running instance of RabbitMQ. A local installation with just the basic defaults will be sufficient. For detailed RabbitMQ installation procedures please visit: http://www.rabbitmq.com/install.html |
Once the sample application is started, you enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue. In return that message is retrieved via Spring Integration and then printed to the console.
The image below illustrates the basic set of Spring Integration components used in this sample.