Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).
You need to include this dependency in your project:
Maven.
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>5.1.0.RELEASE</version>
</dependency>
Gradle.
compile "org.springframework.integration:spring-integration-amqp:5.1.0.RELEASE"
The following adapters are available:
Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.
To provide AMQP support, Spring Integration relies on Spring AMQP, which applies core Spring concepts to the development of AMQP-based messaging solutions. Spring AMQP provides similar semantics to Spring JMS.
Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.
TIP: You should familiarize yourself with the reference documentation of the Spring AMQP project. It provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.
The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:
<int-amqp:inbound-channel-adapter
    id="inboundAmqp"
    channel="inboundChannel"
    queue-names="si.test.queue"
    acknowledge-mode="AUTO"
    advice-chain=""
    channel-transacted=""
    concurrent-consumers=""
    connection-factory=""
    error-channel=""
    expose-listener-channel=""
    header-mapper=""
    mapped-request-headers=""
    listener-container=""
    message-converter=""
    message-properties-converter=""
    phase=""
    prefetch-count=""
    receive-timeout=""
    recovery-interval=""
    missing-queues-fatal=""
    shutdown-timeout=""
    task-executor=""
    transaction-attribute=""
    transaction-manager=""
    tx-size=""
    consumers-per-queue="" />
The unique ID for this adapter. Optional. | |
Message channel to which converted messages should be sent. Required. | |
Names of the AMQP queues (comma-separated list) from which messages should be consumed. Required. | |
Acknowledge mode for the | |
Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter. Optional. | |
Flag to indicate that channels created by this component are transactional. If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback. Optional (Defaults to false). | |
Specify the number of concurrent consumers to create.
The default is | |
Bean reference to the RabbitMQ | |
Message channel to which error messages should be sent. Optional. | |
Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered | |
A reference to an | |
Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the | |
Reference to the | |
The | |
The | |
Specifies the phase in which the underlying | |
Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the | |
Receive timeout in milliseconds.
Optional (defaults to | |
Specifies the interval between recovery attempts of the underlying | |
If true and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If | |
The time to wait for workers (in milliseconds) after the underlying | |
By default, the underlying | |
By default, the underlying | |
Sets a bean reference to an external | |
Tells the | |
Indicates that the underlying listener container should be a |
container | |
---|---|
Note that when configuring an external container, you cannot use the Spring AMQP namespace to define the container.
This is because the namespace requires at least one
<bean id="container"
      class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean> |
Important | |
---|---|
Even though the Spring Integration JMS and AMQP support is similar, important differences exist.
The JMS inbound channel adapter is using a |
The following Spring Boot application shows an example of configuring the inbound adapter with Java configuration:
@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
            @Qualifier("amqpInputChannel") MessageChannel channel) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(channel);
        return adapter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("aName");
        container.setConcurrentConsumers(2);
        // ...
        return container;
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}
The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL:
@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}
Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand (for example, with a MessageSourcePollingTemplate or a poller).
See Section 6.2.3, “Deferred Acknowledgment Pollable Message Source” for more information.
It does not currently support XML configuration.
The following example shows how to configure an AmqpMessageSource with Java configuration:
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
    return new AmqpMessageSource(connectionFactory, "someQueue");
}
See the Javadoc for configuration properties.
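As a rough sketch of fetching a single message on demand, the following wraps the AmqpMessageSource bean in a MessageSourcePollingTemplate. The OnDemandReceiver class and the printing handler are hypothetical and serve only as illustration; consult the referenced section for how acknowledgment is handled.

```java
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.integration.endpoint.MessageSourcePollingTemplate;

// Hypothetical helper class; not part of Spring Integration.
public class OnDemandReceiver {

    private final MessageSourcePollingTemplate template;

    public OnDemandReceiver(AmqpMessageSource source) {
        // Wrap the source so that each poll() call tries to fetch one message.
        this.template = new MessageSourcePollingTemplate(source);
    }

    // Returns true if a message was available and was passed to the handler.
    public boolean receiveOne() {
        return this.template.poll(message -> System.out.println(message.getPayload()));
    }

}
```

Calling receiveOne() from application code (for example, from a scheduled task or a REST endpoint) fetches at most one message per call.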
The following example shows how to configure an inboundPolledAdapter with the Java DSL:
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
            .handle(p -> {
                ...
            })
            .get();
}
The inbound gateway supports all the attributes on the inbound channel adapter (except that channel is replaced by request-channel), plus some additional attributes. The following listing shows the available attributes:
<int-amqp:inbound-gateway
    id="inboundGateway"
    request-channel="myRequestChannel"
    header-mapper=""
    mapped-request-headers=""
    mapped-reply-headers=""
    reply-channel="myReplyChannel"
    reply-timeout="1000"
    amqp-template=""
    default-reply-to="" />
The unique ID for this adapter. Optional. | |
Message channel to which converted messages are sent. Required. | |
A reference to an | |
Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the | |
Comma-separated list of names of | |
Message Channel where reply Messages are expected. Optional. | |
Sets the | |
The customized | |
The |
See the note in Section 14.1, “Inbound Channel Adapter” about configuring the listener-container attribute.
The following Spring Boot application shows an example of how to configure the inbound gateway with Java configuration:
@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
            @Qualifier("amqpInputChannel") MessageChannel channel) {
        AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
        gateway.setRequestChannel(channel);
        gateway.setDefaultReplyTo("bar");
        return gateway;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        container.setConcurrentConsumers(2);
        // ...
        return container;
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpInputChannel")
    public MessageHandler handler() {
        return new AbstractReplyProducingMessageHandler() {

            @Override
            protected Object handleRequestMessage(Message<?> requestMessage) {
                return "reply to " + requestMessage.getPayload();
            }

        };
    }

}
The following Spring Boot application shows an example of how to configure the inbound gateway with the Java DSL:
@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean // return the upper cased payload
    public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))
                .transform(String.class, String::toUpperCase)
                .get();
    }

}
By default, the inbound endpoints use the AUTO acknowledge mode, which means the container automatically acknowledges the message when the downstream integration flow completes (or a message is handed off to another thread by using a QueueChannel or ExecutorChannel).
Setting the mode to NONE configures the consumer such that acknowledgments are not used at all (the broker automatically acknowledges the message as soon as it is sent).
Setting the mode to MANUAL lets user code acknowledge the message at some other point during processing.
To support this, with this mode, the endpoints provide the Channel and deliveryTag in the amqp_channel and amqp_deliveryTag headers, respectively.
You can perform any valid Rabbit command on the Channel but, generally, only basicAck and basicNack (or basicReject) are used.
To avoid interfering with the operation of the container, do not retain a reference to the channel; use it only in the context of the current message.
Note | |
---|---|
Since the |
The following example shows how you might use MANUAL acknowledgment:
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload,
        @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        // Acknowledge this delivery only (multiple = false)
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing
    }
    else {
        // Reject this delivery only and ask the broker to requeue it
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}
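The handler above receives the channel and delivery tag headers only when the endpoint runs in MANUAL mode. The following is a minimal sketch of a listener container configured that way, assuming it is then passed to an AmqpInboundChannelAdapter as in the earlier Java configuration example. The ManualAckConfig class name and bean name are hypothetical.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class used only for illustration.
@Configuration
public class ManualAckConfig {

    @Bean
    public SimpleMessageListenerContainer manualAckContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("aName"); // queue name reused from the earlier examples
        // With MANUAL, the container does not acknowledge deliveries itself;
        // user code must call basicAck, basicNack, or basicReject.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }

}
```

With XML configuration, the equivalent setting is the acknowledge-mode attribute shown in the inbound channel adapter listing.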
The following example shows the available properties for an AMQP outbound channel adapter:
<int-amqp:outbound-channel-adapter
    id="outboundAmqp"
    channel="outboundChannel"
    amqp-template="myAmqpTemplate"
    exchange-name=""
    exchange-name-expression=""
    order="1"
    routing-key=""
    routing-key-expression=""
    default-delivery-mode=""
    confirm-correlation-expression=""
    confirm-ack-channel=""
    confirm-nack-channel=""
    return-channel=""
    error-message-strategy=""
    header-mapper=""
    mapped-request-headers=""
    lazy-connect="true" />
The unique ID for this adapter. Optional. | |
Message channel to which messages should be sent to have them converted and published to an AMQP exchange. Required. | |
Bean reference to the configured AMQP template.
Optional (defaults to | |
The name of the AMQP exchange to which messages are sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with exchange-name-expression. Optional. | |
A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with exchange-name. Optional. | |
The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.
Optional (defaults to | |
The fixed routing-key to use when sending messages.
By default, this is an empty | |
A SpEL expression that is evaluated to determine the routing key to use when sending messages, with the message as the root object (for example, payload.key).
By default, this is an empty | |
The default delivery mode for messages: | |
An expression that defines correlation data.
When provided, this configures the underlying AMQP template to receive publisher confirmations.
Requires a dedicated | |
The channel to which positive ( | |
The channel to which negative ( | |
The channel to which returned messages are sent.
When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.
When there is no | |
A reference to an | |
A reference to an | |
Comma-separated list of names of AMQP Headers to be mapped from the | |
When set to |
return-channel | |
---|---|
Using a |
The following Spring Boot application shows an example of how to configure the outbound adapter with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(AmqpJavaApplication.class)
                    .web(false)
                    .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToRabbit("foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        void sendToRabbit(String data);

    }

}
The following Spring Boot application shows an example of how to configure the outbound adapter with the Java DSL:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(AmqpJavaApplication.class)
                    .web(false)
                    .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToRabbit("foo");
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(amqpOutboundChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate)
                            .routingKey("foo")) // default exchange - route to queue 'foo'
                .get();
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        void sendToRabbit(String data);

    }

}
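The confirm-correlation-expression, confirm-ack-channel, and confirm-nack-channel attributes described earlier have setter counterparts on AmqpOutboundEndpoint. The following is a sketch only: the ConfirmConfig class, the bean names, and the use of the payload as correlation data are illustrative choices, and it assumes the connection factory behind the RabbitTemplate has publisher confirms enabled.

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;

// Hypothetical configuration class used only for illustration.
@Configuration
public class ConfirmConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AmqpOutboundEndpoint confirmingOutbound(RabbitTemplate rabbitTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(rabbitTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        // Illustrative choice: use the message payload as the correlation data.
        outbound.setConfirmCorrelationExpressionString("payload");
        outbound.setConfirmAckChannel(ackChannel());
        outbound.setConfirmNackChannel(nackChannel());
        return outbound;
    }

    @Bean
    public QueueChannel ackChannel() {
        return new QueueChannel(); // receives positive confirmations
    }

    @Bean
    public QueueChannel nackChannel() {
        return new QueueChannel(); // receives negative confirmations
    }

}
```

QueueChannel is used here only so that confirmations can be inspected by polling; any message channel type works.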
The following listing shows the possible properties for an AMQP Outbound Gateway:
<int-amqp:outbound-gateway
    id="inboundGateway"
    request-channel="myRequestChannel"
    amqp-template=""
    exchange-name=""
    exchange-name-expression=""
    order="1"
    reply-channel=""
    reply-timeout=""
    requires-reply=""
    routing-key=""
    routing-key-expression=""
    default-delivery-mode=""
    confirm-correlation-expression=""
    confirm-ack-channel=""
    confirm-nack-channel=""
    return-channel=""
    error-message-strategy=""
    lazy-connect="true" />
The unique ID for this adapter. Optional. | |
Message channel to which messages are sent to have them converted and published to an AMQP exchange. Required. | |
Bean reference to the configured AMQP template.
Optional (defaults to | |
The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with exchange-name-expression. Optional. | |
A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages should be sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with exchange-name. Optional. | |
The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.
Optional (defaults to | |
Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional. | |
The time the gateway waits when sending the reply message to the | |
When | |
The | |
A SpEL expression that is evaluated to determine the | |
The default delivery mode for messages: | |
Since version 4.2.
An expression defining correlation data.
When provided, this configures the underlying AMQP template to receive publisher confirms.
Requires a dedicated | |
The channel to which positive ( | |
The channel to which negative ( | |
The channel to which returned messages are sent.
When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.
When there is no | |
A reference to an | |
When set to |
return-channel | |
---|---|
Using a |
Important | |
---|---|
The underlying |
The following Spring Boot application shows an example of how to configure the outbound gateway with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(AmqpJavaApplication.class)
                    .web(false)
                    .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        String reply = gateway.sendToRabbit("foo");
        System.out.println(reply);
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
        outbound.setExpectReply(true);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
Note that the only difference between the outbound adapter and outbound gateway configuration is the setting of the expectReply property.
The following Spring Boot application shows an example of how to configure the outbound gateway with the Java DSL:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(AmqpJavaApplication.class)
                    .web(false)
                    .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        String reply = gateway.sendToRabbit("foo");
        System.out.println(reply);
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(amqpOutboundChannel())
                .handle(Amqp.outboundGateway(amqpTemplate)
                            .routingKey("foo")) // default exchange - route to queue 'foo'
                .get();
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a reply is received (or a timeout occurs).
Spring Integration version 4.3 added an asynchronous gateway, which uses the AsyncRabbitTemplate from Spring AMQP.
When a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread.
This can be useful when the gateway is invoked on a poller thread.
The thread is released and is available for other tasks in the framework.
The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:
<int-amqp:outbound-gateway
    id="inboundGateway"
    request-channel="myRequestChannel"
    async-template=""
    exchange-name=""
    exchange-name-expression=""
    order="1"
    reply-channel=""
    reply-timeout=""
    requires-reply=""
    routing-key=""
    routing-key-expression=""
    default-delivery-mode=""
    confirm-correlation-expression=""
    confirm-ack-channel=""
    confirm-nack-channel=""
    return-channel=""
    lazy-connect="true" />
The unique ID for this adapter. Optional. | |
Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange. Required. | |
Bean reference to the configured | |
The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with exchange-name-expression. Optional. | |
A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with exchange-name. Optional. | |
The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.
Optional (it defaults to | |
Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional. | |
The time the gateway waits when sending the reply message to the | |
When no reply message is received within the | |
The routing-key to use when sending Messages.
By default, this is an empty | |
A SpEL expression that is evaluated to determine the routing-key to use when sending messages,
with the message as the root object (for example, payload.key).
By default, this is an empty | |
The default delivery mode for messages: | |
An expression that defines correlation data.
When provided, this configures the underlying AMQP template to receive publisher confirmations.
Requires a dedicated | |
The channel to which positive ( | |
Since version 4.2. The channel to which negative ( | |
The channel to which returned messages are sent.
When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway.
The message is constructed from the data received from AMQP, with the following additional headers:
| |
When set to |
See also Section 10.5.2, “Asynchronous Service Activator” for more information.
RabbitTemplate | |
---|---|
When you use confirmations and returns, we recommend that the |
The following configuration shows an example of how to configure the asynchronous outbound gateway with Java configuration:
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
            SimpleMessageListenerContainer replyContainer) {
        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
The following Spring Boot application shows an example of how to configure the asynchronous outbound gateway with the Java DSL:
@SpringBootApplication
public class AmqpAsyncApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(AmqpAsyncApplication.class)
                    .web(false)
                    .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        String reply = gateway.sendToRabbit("foo");
        System.out.println(reply);
    }

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("foo")); // default exchange - route to queue 'foo'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
Spring AMQP 1.4 introduced the ContentTypeDelegatingMessageConverter, where the actual converter is selected based on the incoming content type message property.
This can be used by inbound endpoints.
As of Spring Integration version 4.3, you can use the ContentTypeDelegatingMessageConverter on outbound endpoints as well, with the contentType header specifying which converter is used.
The following example configures a ContentTypeDelegatingMessageConverter, with the default converter being the SimpleMessageConverter (which handles Java serialization and plain text), together with a JSON converter:
<int-amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                                   exchange-name="someExchange"
                                   routing-key="someKey"
                                   amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>
Sending a message to ctRequestChannel with the contentType header set to application/json causes the JSON converter to be selected.
This applies to both the outbound channel adapter and gateway.
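For readers who prefer Java configuration, the same converter setup might be sketched as follows. The ContentTypeConverterConfig class name is hypothetical, the bean names mirror the XML above, and the sketch assumes Jackson is on the classpath for the JSON delegate.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class used only for illustration.
@Configuration
public class ContentTypeConverterConfig {

    @Bean
    public ContentTypeDelegatingMessageConverter ctConverter() {
        // The constructor argument is the fallback used when no delegate matches.
        ContentTypeDelegatingMessageConverter converter =
                new ContentTypeDelegatingMessageConverter(new SimpleMessageConverter());
        // Messages whose content type is application/json go to the JSON converter.
        converter.addDelegate("application/json", new Jackson2JsonMessageConverter());
        return converter;
    }

    @Bean
    public RabbitTemplate amqpTemplateContentTypeConverter(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(ctConverter());
        return template;
    }

}
```

The resulting template can then be passed to an outbound adapter or gateway in the same way as in the earlier Java examples.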
Note | |
---|---|
Starting with version 5.0, headers that are added to the There are, however, cases where the previous behavior is desired — for example, when a There is now a property called |
Spring AMQP version 1.6 introduced a mechanism to allow the specification of a default user ID for outbound messages.
It has always been possible to set the AmqpHeaders.USER_ID header, which now takes precedence over the default.
This might be useful to message recipients.
For inbound messages, if the message publisher sets the property, it is made available in the AmqpHeaders.RECEIVED_USER_ID header.
Note that RabbitMQ validates that the user ID is the actual user ID for the connection or that the connection allows impersonation.
To configure a default user ID for outbound messages, configure it on a RabbitTemplate and configure the outbound adapter or gateway to use that template.
Similarly, to set the user ID property on replies, inject an appropriately configured template into the inbound gateway.
See the Spring AMQP documentation for more information.
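A minimal sketch of such a template follows. It assumes the setUserIdExpressionString setter on RabbitTemplate (check the Spring AMQP documentation for your version), and the UserIdConfig class, bean name, and 'guest' user are placeholders.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class used only for illustration.
@Configuration
public class UserIdConfig {

    @Bean
    public RabbitTemplate userIdTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // SpEL string literal; 'guest' is a placeholder and must match the
        // connection's user unless the broker allows impersonation.
        template.setUserIdExpressionString("'guest'");
        return template;
    }

}
```

Passing this template to an outbound adapter or gateway applies the default; a message that already carries the AmqpHeaders.USER_ID header keeps its own value, as described above.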
Spring AMQP supports the RabbitMQ Delayed Message Exchange Plugin.
For inbound messages, the x-delay header is mapped to the AmqpHeaders.RECEIVED_DELAY header.
Setting the AmqpHeaders.DELAY header causes the corresponding x-delay header to be set in outbound messages.
You can also specify the delay and delayExpression properties on outbound endpoints (delay-expression when using XML configuration).
These properties take precedence over the AmqpHeaders.DELAY header.
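The two ways of requesting a delay might look like the following sketch. The DelayConfig class, the channel name, and the exchange name my.delayed.exchange are hypothetical, and the exchange is assumed to have been declared as a delayed exchange on a broker with the plugin enabled.

```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical configuration class used only for illustration.
@Configuration
public class DelayConfig {

    @Bean
    @ServiceActivator(inputChannel = "delayedOutboundChannel")
    public AmqpOutboundEndpoint delayedOutbound(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
        outbound.setExchangeName("my.delayed.exchange"); // hypothetical exchange name
        outbound.setRoutingKey("foo");
        // Endpoint-level delay in milliseconds; wins over the header below.
        outbound.setDelay(5000);
        return outbound;
    }

    // Alternative: request the delay per message through the header.
    public static Message<String> withDelayHeader(String payload) {
        return MessageBuilder.withPayload(payload)
                .setHeader(AmqpHeaders.DELAY, 1000)
                .build();
    }

}
```

Because the endpoint property takes precedence, a message built by withDelayHeader and sent through delayedOutbound would be published with the endpoint's delay rather than the header value.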
There are two message channel implementations available.
One is point-to-point, and the other is publish-subscribe.
Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer (as shown earlier in this chapter for the channel adapters and gateways).
However, the examples we show here have minimal configuration.
Explore the XML schema to view the available attributes.
A point-to-point channel might look like the following example:
<int-amqp:channel id="p2pChannel"/>
Under the covers, the preceding example causes a Queue named si.p2pChannel to be declared, and this channel sends to that Queue (technically, by sending to the no-name direct exchange with a routing key that matches the name of this Queue).
This channel also registers a consumer on that Queue.
If you want the channel to be "pollable" instead of message-driven, provide the message-driven flag with a value of false, as the following example shows:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
A publish-subscribe channel might look like the following:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
Under the covers, the preceding example causes a fanout exchange named si.fanout.pubSubChannel to be declared, and this channel sends to that fanout exchange.
This channel also declares a server-named exclusive, auto-delete, non-durable Queue and binds that to the fanout exchange while registering a consumer on that Queue to receive messages.
There is no "pollable" option for a publish-subscribe-channel.
It must be message-driven.
Starting with version 4.1, AMQP-backed message channels (in conjunction with channel-transacted) support template-channel-transacted to separate transactional configuration for the AbstractMessageListenerContainer and for the RabbitTemplate.
Note that, previously, channel-transacted was true by default.
Now, by default, it is false for the AbstractMessageListenerContainer.
Prior to version 4.3, AMQP-backed channels only supported messages with Serializable payloads and headers.
The entire message was converted (serialized) and sent to RabbitMQ.
Now, you can set the extract-payload attribute (or setExtractPayload() when using Java configuration) to true.
When this flag is true, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters.
This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the Jackson2JsonMessageConverter).
See Section 14.12, “AMQP Message Headers” for more about the default mapped headers.
You can modify the mapping by providing custom mappers that use the outbound-header-mapper and inbound-header-mapper attributes.
You can now also specify a default-delivery-mode, which is used to set the delivery mode when there is no amqp_deliveryMode header.
By default, Spring AMQP MessageProperties uses PERSISTENT delivery mode.
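The following sketch shows how these settings might be combined on a channel defined with Java configuration. Only setExtractPayload() is named in the text above; the setMessageConverter and setDefaultDeliveryMode setters are assumed counterparts of the XML attributes, and the class, bean, and queue names are hypothetical.

```java
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.config.AmqpChannelFactoryBean;

// Hypothetical configuration class used only for illustration.
@Configuration
public class ExtractPayloadChannelConfig {

    @Bean
    public AmqpChannelFactoryBean jsonChannel(ConnectionFactory connectionFactory) {
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); // message-driven
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("json.queue"); // hypothetical queue name
        // Convert only the payload and map headers, instead of serializing the whole message.
        factoryBean.setExtractPayload(true);
        // Assumed setter: lets non-serializable payloads travel as JSON.
        factoryBean.setMessageConverter(new Jackson2JsonMessageConverter());
        // Assumed setter: used when a message has no amqp_deliveryMode header.
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        return factoryBean;
    }

}
```

Choosing NON_PERSISTENT here is purely illustrative; it trades durability for throughput and runs against the persistence purpose noted for these channels.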
Important | |
---|---|
As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications. For that purpose, use channel adapters instead. |
Important | |
---|---|
Starting with version 5.0, the pollable channel now blocks the poller thread for the specified |
The following example shows how to configure the channels with Java configuration:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    // false = not message-driven, so the resulting channel is pollable
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(false);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    // true = publish-subscribe channel backed by a fanout exchange
    factoryBean.setPubSub(true);
    return factoryBean;
}
The following example shows how to configure the channels with the Java DSL:
    @Bean
    public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(...)
                ...
                .channel(Amqp.pollableChannel(connectionFactory)
                        .queueName("foo"))
                ...
                .get();
    }

    @Bean
    public IntegrationFlow messageDrivenInFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(...)
                ...
                .channel(Amqp.channel(connectionFactory)
                        .queueName("bar"))
                ...
                .get();
    }

    @Bean
    public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(...)
                ...
                .channel(Amqp.publishSubscribeChannel(connectionFactory)
                        .queueName("baz"))
                ...
                .get();
    }
The Spring Integration AMQP Adapters automatically map all AMQP properties and headers.
(This behavior changed in version 4.3. Previously, only standard headers were mapped.)
By default, these properties are copied to and from Spring Integration `MessageHeaders` by using the `DefaultAmqpHeaderMapper`.
You can pass in your own implementation of AMQP-specific header mappers, as the adapters have properties to support doing so.
Any user-defined headers within the AMQP `MessageProperties` are copied to or from an AMQP message, unless explicitly negated by the `requestHeaderNames` or `replyHeaderNames` properties of the `DefaultAmqpHeaderMapper`.
By default, for an outbound mapper, no `x-*` headers are mapped.
See the caution that appears later in this section for why.
To override the default and revert to the pre-4.3 behavior, use `STANDARD_REQUEST_HEADERS` and `STANDARD_REPLY_HEADERS` in the properties.
Tip | |
---|---|
When mapping user-defined headers, the values can also contain simple wildcard patterns (such as thing* or |
Starting with version 4.1, the `AbstractHeaderMapper` (a `DefaultAmqpHeaderMapper` superclass) lets the `NON_STANDARD_HEADERS` token be configured for the `requestHeaderNames` and `replyHeaderNames` properties (in addition to the existing `STANDARD_REQUEST_HEADERS` and `STANDARD_REPLY_HEADERS`) to map all user-defined headers.
The `org.springframework.amqp.support.AmqpHeaders` class identifies the default headers that are used by the `DefaultAmqpHeaderMapper`:
amqp_appId
amqp_clusterId
amqp_contentEncoding
amqp_contentLength
contentType (see Section 14.12.2, “contentType Header”)
amqp_correlationId
amqp_delay
amqp_deliveryMode
amqp_deliveryTag
amqp_expiration
amqp_messageCount
amqp_messageId
amqp_receivedDelay
amqp_receivedDeliveryMode
amqp_receivedExchange
amqp_receivedRoutingKey
amqp_redelivered
amqp_replyTo
amqp_timestamp
amqp_type
amqp_userId
amqp_publishConfirm
amqp_publishConfirmNackCause
amqp_returnReplyCode
amqp_returnReplyText
amqp_returnExchange
amqp_returnRoutingKey
amqp_channel
amqp_consumerTag
amqp_consumerQueue
Caution | |
---|---|
As mentioned earlier in this section, using a header mapping pattern of `*` is a common way to copy all headers.
However, this can have some unexpected side effects, because certain RabbitMQ proprietary properties/headers are also copied.
For example, when you use federation, the received message may have
a property named |
Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with `!`.
Negated patterns get priority, so a list such as `STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1` does not map `thing1` (nor `thing2` nor `thing3`).
The standard headers plus `bad` and `qux` are mapped.
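The priority rule for negated patterns can be sketched in plain Java as follows. This is a simplified model, not the `DefaultAmqpHeaderMapper` code: `HeaderPatternSketch` is a hypothetical class, it handles only literal names, `*` wildcards, and `!` negation, and it does not expand tokens such as `STANDARD_REQUEST_HEADERS`.

```java
import java.util.regex.Pattern;

// Hypothetical, simplified model of header-name matching with negation.
final class HeaderPatternSketch {

    private HeaderPatternSketch() {
    }

    /** Returns true when the header should be mapped for the given comma-separated pattern list. */
    static boolean isMapped(String headerName, String patternList) {
        String[] patterns = patternList.trim().split("\\s*,\\s*");
        // Pass 1: a matching negated pattern wins, wherever it appears in the list.
        for (String pattern : patterns) {
            if (pattern.startsWith("!") && matches(pattern.substring(1), headerName)) {
                return false;
            }
        }
        // Pass 2: otherwise, any matching positive pattern maps the header.
        for (String pattern : patterns) {
            if (!pattern.startsWith("!") && matches(pattern, headerName)) {
                return true;
            }
        }
        return false;
    }

    /** Simple wildcard match: '*' stands for any run of characters. */
    private static boolean matches(String pattern, String headerName) {
        String[] literals = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.matches(regex.toString(), headerName);
    }
}
```

With the list `thing1,ba*,!thing2,!thing3,qux,!thing1`, this model maps `bad` and `qux` but not `thing1`, `thing2`, or `thing3`, because the trailing `!thing1` overrides the earlier positive `thing1` entry.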
Important | |
---|---|
If you have a user-defined header that begins with |
Note | |
---|---|
Starting with version 5.1, the |
Unlike other headers, the `AmqpHeaders.CONTENT_TYPE` is not prefixed with `amqp_`; this allows transparent passing of the contentType header across different technologies, for example, when an inbound HTTP message is sent to a RabbitMQ queue.
The `contentType` header is mapped to Spring AMQP’s `MessageProperties.contentType` property and that is subsequently mapped to RabbitMQ’s `content_type` property.
Prior to version 5.1, this header was also mapped as an entry in the `MessageProperties.headers` map; this was incorrect and, furthermore, the value could be wrong since the underlying Spring AMQP message converter might have changed the content type.
Such a change would be reflected in the first-class `content_type` property, but not in the RabbitMQ headers map.
Inbound mapping ignored the headers map value.
`contentType` is no longer mapped to an entry in the headers map.
This section describes message ordering for inbound and outbound messages.
If you require strict ordering of inbound messages, you must configure the inbound listener container’s `prefetchCount` property to `1`.
This is because, if a message fails and is redelivered, it arrives after existing prefetched messages.
Since Spring AMQP version 2.0, the `prefetchCount` defaults to `250` for improved performance.
Strict ordering requirements come at the cost of decreased performance.
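The effect of the prefetch window on ordering can be illustrated with a toy simulation in plain Java. This is a sketch under stated assumptions, not RabbitMQ or Spring AMQP code: `PrefetchOrderingSketch` is a hypothetical class, there is a single consumer, and a rejected message is assumed to be requeued at the head of the broker queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a broker queue and one consumer with a prefetch window.
final class PrefetchOrderingSketch {

    private PrefetchOrderingSketch() {
    }

    /**
     * Delivers the messages to a single consumer and returns the order in which
     * they are processed successfully. Each message in failOnce is rejected on
     * its first delivery and requeued at the head of the broker queue.
     */
    static List<String> process(List<String> messages, int prefetchCount, Set<String> failOnce) {
        if (prefetchCount < 1) {
            throw new IllegalArgumentException("prefetchCount must be at least 1");
        }
        Deque<String> brokerQueue = new ArrayDeque<>(messages);
        Deque<String> prefetched = new ArrayDeque<>();
        Set<String> pendingFailures = new HashSet<>(failOnce);
        List<String> processed = new ArrayList<>();

        while (!brokerQueue.isEmpty() || !prefetched.isEmpty()) {
            // The broker pushes messages until the consumer holds prefetchCount of them.
            while (prefetched.size() < prefetchCount && !brokerQueue.isEmpty()) {
                prefetched.addLast(brokerQueue.pollFirst());
            }
            String current = prefetched.pollFirst();
            if (pendingFailures.remove(current)) {
                // Rejected: the message is requeued, but it can only be redelivered
                // behind whatever the consumer has already prefetched.
                brokerQueue.addFirst(current);
            }
            else {
                processed.add(current);
            }
        }
        return processed;
    }
}
```

In this model, processing `A`, `B`, `C`, `D` with a prefetch of 3 and a single failure of `A` yields the order `B`, `C`, `A`, `D`, whereas a prefetch of 1 preserves `A`, `B`, `C`, `D`.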
Consider the following integration flow:
    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gateway.class)
                .split(s -> s.delimiters(","))
                .<String, String>transform(String::toUpperCase)
                .handle(Amqp.outboundAdapter(template).routingKey("rk"))
                .get();
    }
Suppose we send messages `A`, `B` and `C` to the gateway.
While it is likely that messages `A`, `B`, `C` are sent in order, there is no guarantee.
This is because the template "borrows" a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message.
One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred-fold.
To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the `BoundRabbitChannelAdvice`, which is a `HandleMessageAdvice`.
See Section 10.9.4, “Handling Message Advice”.
When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations).
The following example shows how to use `BoundRabbitChannelAdvice`:
    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gateway.class)
                .split(s -> s.delimiters(",")
                        .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
                .<String, String>transform(String::toUpperCase)
                .handle(Amqp.outboundAdapter(template).routingKey("rk"))
                .get();
    }
Notice that the same `RabbitTemplate` (which implements `RabbitOperations`) is used in the advice and the outbound adapter.
The advice runs the downstream flow within the template’s `invoke` method so that all operations run on the same channel.
If the optional timeout is provided, when the flow completes, the advice calls the `waitForConfirmsOrDie` method, which throws an exception if the confirmations are not received within the specified time.
Important | |
---|---|
There must be no thread handoffs in the downstream flow ( |
To experiment with the AMQP adapters, check out the samples available in the Spring Integration samples git repository at https://github.com/SpringSource/spring-integration-samples.
Currently, one sample demonstrates the basic functionality of the Spring Integration AMQP adapter by using an outbound channel adapter and an inbound channel adapter. The sample uses RabbitMQ as the AMQP broker implementation.
Note | |
---|---|
In order to run the example, you need a running instance of RabbitMQ. A local installation with just the basic defaults suffices. For detailed RabbitMQ installation procedures, see http://www.rabbitmq.com/install.html |
Once the sample application is started, enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue. In return, that message is retrieved by Spring Integration and printed to the console.
The following image illustrates the basic set of Spring Integration components used in this sample.