Spring Integration provides Channel Adapters for receiving and sending messages using the Advanced Message Queuing Protocol (AMQP). The following adapters are available:
Spring Integration also provides a point-to-point Message Channel as well as a publish/subscribe Message Channel backed by AMQP Exchanges and Queues.
In order to provide AMQP support, Spring Integration relies on (Spring AMQP) which "applies core Spring concepts to the development of AMQP-based messaging solutions". Spring AMQP provides similar semantics to (Spring JMS).
Whereas the provided AMQP Channel Adapters are intended for unidirectional Messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP Gateways for request/reply operations.
Tip | |
---|---|
Please familiarize yourself with the reference documentation of the Spring AMQP project as well. It provides much more in-depth information regarding Spring’s integration with AMQP in general and RabbitMQ in particular. |
A configuration sample for an AMQP Inbound Channel Adapter is shown below.
<int-amqp:inbound-channel-adapter id="inboundAmqp" channel="inboundChannel" queue-names="si.test.queue" acknowledge-mode="AUTO" advice-chain="" channel-transacted="" concurrent-consumers="" connection-factory="" error-channel="" expose-listener-channel="" header-mapper="" mapped-request-headers="" listener-container="" message-converter="" message-properties-converter="" phase="" (16) prefetch-count="" (17) receive-timeout="" (18) recovery-interval="" (19) missing-queues-fatal="" (20) shutdown-timeout="" (21) task-executor="" (22) transaction-attribute="" (23) transaction-manager="" (24) tx-size="" (25) consumers-per-queue /> (26)
Unique ID for this adapter. Optional. | |
Message Channel to which converted Messages should be sent. Required. | |
Names of the AMQP Queues from which Messages should be consumed (comma-separated list).Required. | |
Acknowledge Mode for the | |
Extra AOP Advice(s) to handle cross cutting behavior associated with this Inbound Channel Adapter. Optional. | |
Flag to indicate that channels created by this component will be transactional. If true, tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback depending on the outcome, with an exception signalling a rollback. Optional (Defaults to false). | |
Specify the number of concurrent consumers to create. Default is 1. Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, use 1 consumer for low-volume queues. Not allowed when consumers-per-queue is set. Optional. | |
Bean reference to the RabbitMQ ConnectionFactory. Optional (Defaults to connectionFactory). | |
Message Channel to which error Messages should be sent. Optional. | |
Shall the listener channel (com.rabbitmq.client.Channel) be exposed to a registered | |
A reference to an | |
Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders. This can only be provided if the header-mapper reference is not provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "foo*, bar" or "*foo"). | |
Reference to the | |
The MessageConverter to use when receiving AMQP Messages. Optional. | |
The MessagePropertiesConverter to use when receiving AMQP Messages. Optional. | |
Specify the phase in which the underlying | |
Tells the AMQP broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. It should be greater than or equal to the transaction size (see attribute "tx-size"). Optional (Defaults to 1). | |
Receive timeout in milliseconds. Optional (Defaults to 1000). | |
Specifies the interval between recovery attempts of the underlying | |
If true, and none of the queues are available on the broker, the container will throw a fatal exception during startup and will stop if the queues are deleted when the container is running (after making 3 attempts to passively declare the queues).
If false, the container will not throw an exception and go into recovery mode, attempting to restart according to the | |
The time to wait for workers in milliseconds after the underlying | |
By default, the underlying | |
By default the underlying | |
Sets a Bean reference to an external | |
Tells the | |
Indicates that the underlying listener container should be a |
container | |
---|---|
Note that when configuring an external container, you cannot use the Spring AMQP namespace to define the container.
This is because the namespace requires at least one <bean id="container" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="queueNames" value="foo.queue" /> <property name="defaultRequeueRejected" value="false"/> </bean> |
Important | |
---|---|
Even though the Spring Integration JMS and AMQP support is very similar, important differences exist.
The JMS Inbound Channel Adapter is using a |
The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:
@SpringBootApplication public class AmqpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel amqpInputChannel() { return new DirectChannel(); } @Bean public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer, @Qualifier("amqpInputChannel") MessageChannel channel) { AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(channel); return adapter; } @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("foo"); container.setConcurrentConsumers(2); // ... return container; } @Bean @ServiceActivator(inputChannel = "amqpInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); } }; } }
The following Spring Boot application provides an example of configuring the inbound adapter using the Java DSL:
@SpringBootApplication public class AmqpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) { return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "foo")) .handle(m -> System.out.println(m.getPayload())) .get(); } }
Starting with version 5.0.1, a polled channel adapter is provided, allowing fetching individual messages on-demand, for example with a MessageSourcePollingTemplate
or a poller.
See Section 4.2.3, “Deferred Acknowledgment Pollable Message Source” for more information.
It does not currently have XML configuration.
@Bean public AmqpMessageSource source(ConnectionFactory connectionFactory) { return new AmpqpMessageSource(connectionFactory, "someQueue"); }
Refer to the javadocs for configuration properties.
With the Java DSL:
@Bean public IntegrationFlow flow() { return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE), e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false)) .handle(p -> { ... }) .get(); }
The inbound gateway supports all the attributes on the inbound channel adapter (except channel is replaced by request-channel), plus some additional attributes:
<int-amqp:inbound-gateway id="inboundGateway" request-channel="myRequestChannel" header-mapper="" mapped-request-headers="" mapped-reply-headers="" reply-channel="myReplyChannel" reply-timeout="1000" amqp-template="" default-reply-to="" />
Unique ID for this adapter. Optional. | |
Message Channel to which converted Messages should be sent. Required. | |
A reference to an | |
Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the | |
Comma-separated list of names of | |
Message Channel where reply Messages will be expected. Optional. | |
Used to set the | |
The customized | |
The |
See the note in Section 12.2, “Inbound Channel Adapter” about configuring the listener-container
attribute.
The following Spring Boot application provides an example of configuring the inbound gateway using Java configuration:
@SpringBootApplication public class AmqpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel amqpInputChannel() { return new DirectChannel(); } @Bean public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer, @Qualifier("amqpInputChannel") MessageChannel channel) { AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer); gateway.setRequestChannel(channel); gateway.setDefaultReplyTo("bar"); return gateway; } @Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("foo"); container.setConcurrentConsumers(2); // ... return container; } @Bean @ServiceActivator(inputChannel = "amqpInputChannel") public MessageHandler handler() { return new AbstractReplyProducingMessageHandler() { @Override protected Object handleRequestMessage(Message<?> requestMessage) { return "reply to " + requestMessage.getPayload(); } }; } }
The following Spring Boot application provides an example of configuring the inbound gateway using the Java DSL:
@SpringBootApplication public class AmqpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); } @Bean // return the upper cased payload public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) { return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo")) .transform(String.class, String::toUpperCase) .get(); } }
By default the inbound endpoints use acknowledge mode AUTO
, which means the container automatically acks the message when the downstream integration flow completes (or a message is handed off to another thread using a QueueChannel
or ExecutorChannel
).
Setting the mode to NONE
configures the consumer such that acks are not used at all (the broker automatically acks the message as soon as it is sent).
Setting the mode to MANUAL
allows user code to ack the message at some other point during processing.
To support this, with this mode, the endpoints provide the Channel
and deliveryTag
in the amqp_channel
and amqp_deliveryTag
headers respectively.
You can perform any valid rabbit command on the Channel
but, generally, only basicAck
and basicNack
(or basicReject
) would be used.
In order to not interfere with the operation of the container, you should not retain a reference to the channel and just use it in the context of the current message.
Note | |
---|---|
Since the |
This is an example of how you might use MANUAL
acknowledgement:
@ServiceActivator(inputChannel = "foo", outputChannel = "bar") public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception { // Do some processing if (allOK) { channel.basicAck(deliveryTag, false); // perhaps do some more processing } else { channel.basicNack(deliveryTag, false, true); } return someResultForDownStreamProcessing; }
A configuration sample for an AMQP Outbound Channel Adapter is shown below.
<int-amqp:outbound-channel-adapter id="outboundAmqp" channel="outboundChannel" amqp-template="myAmqpTemplate" exchange-name="" exchange-name-expression="" order="1" routing-key="" routing-key-expression="" default-delivery-mode"" confirm-correlation-expression="" confirm-ack-channel="" confirm-nack-channel="" return-channel="" error-message-strategy="" header-mapper="" mapped-request-headers="" (16) lazy-connect="true" /> (17)
Unique ID for this adapter. Optional. | |
Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required. | |
Bean Reference to the configured AMQP Template Optional (Defaults to "amqpTemplate"). | |
The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name-expression. Optional. | |
A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name. Optional. | |
The order for this consumer when multiple consumers are registered thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]). | |
The fixed routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with routing-key-expression.Optional. | |
A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key). By default, this will be an empty String. Mutually exclusive with routing-key. Optional. | |
The default delivery mode for messages; | |
An expression defining correlation data.
When provided, this configures the underlying amqp template to receive publisher confirms.
Requires a dedicated | |
The channel to which positive (ack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression.
If the expression is | |
The channel to which negative (nack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression (if there is no | |
The channel to which returned messages are sent.
When provided, the underlying amqp template is configured to return undeliverable messages to the adapter.
When there is no | |
A reference to an | |
A reference to an | |
Comma-separated list of names of AMQP Headers to be mapped from the | |
When set to |
return-channel | |
---|---|
Using a |
The following Spring Boot application provides an example of configuring the outbound adapter using Java configuration:
@SpringBootApplication @IntegrationComponentScan public class AmqpJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToRabbit("foo"); } @Bean @ServiceActivator(inputChannel = "amqpOutboundChannel") public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' return outbound; } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") public interface MyGateway { void sendToRabbit(String data); } }
The following Spring Boot application provides an example of configuring the outbound adapter using the Java DSL:
@SpringBootApplication @IntegrationComponentScan public class AmqpJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToRabbit("foo"); } @Bean public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) { return IntegrationFlows.from(amqpOutboundChannel()) .handle(Amqp.outboundAdapter(amqpTemplate) .routingKey("foo")) // default exchange - route to queue 'foo' .get(); } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") public interface MyGateway { void sendToRabbit(String data); } }
Configuration for an AMQP Outbound Gateway is shown below.
<int-amqp:outbound-gateway id="inboundGateway" request-channel="myRequestChannel" amqp-template="" exchange-name="" exchange-name-expression="" order="1" reply-channel="" reply-timeout="" requires-reply="" routing-key="" routing-key-expression="" default-delivery-mode"" confirm-correlation-expression="" confirm-ack-channel="" confirm-nack-channel="" return-channel="" (16) error-message-strategy="" (17) lazy-connect="true" /> (18)
Unique ID for this adapter. Optional. | |
Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required. | |
Bean Reference to the configured AMQP Template Optional (Defaults to "amqpTemplate"). | |
The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name-expression. Optional. | |
A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name. Optional. | |
The order for this consumer when multiple consumers are registered thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]). | |
Message Channel to which replies should be sent after being received from an AMQP Queue and converted.Optional. | |
The time the gateway will wait when sending the reply message to the | |
When | |
The routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with routing-key-expression. Optional. | |
A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key). By default, this will be an empty String. Mutually exclusive with routing-key. Optional. | |
The default delivery mode for messages; | |
Since version 4.2. An expression defining correlation data.
When provided, this configures the underlying amqp template to receive publisher confirms.
Requires a dedicated | |
The channel to which positive (ack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression.
If the expression is | |
The channel to which negative (nack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression (if there is no | |
The channel to which returned messages are sent.
When provided, the underlying amqp template is configured to return undeliverable messages to the adapter.
When there is no | |
A reference to an | |
When set to |
return-channel | |
---|---|
Using a |
Important | |
---|---|
The underlying |
The following Spring Boot application provides an example of configuring the outbound gateway using Java configuration:
@SpringBootApplication @IntegrationComponentScan public class AmqpJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); String reply = gateway.sendToRabbit("foo"); System.out.println(reply); } @Bean @ServiceActivator(inputChannel = "amqpOutboundChannel") public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); outbound.setExpectReply(true); outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' return outbound; } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") public interface MyGateway { String sendToRabbit(String data); } }
Notice that the only difference between the outbound adapter and outbound gateway configuration is the setting of the
expectReply
property.
The following Spring Boot application provides an example of configuring the outbound adapter using the Java DSL:
@SpringBootApplication @IntegrationComponentScan public class AmqpJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(AmqpJavaApplication.class) .web(false) .run(args); RabbitTemplate template = context.getBean(RabbitTemplate.class); MyGateway gateway = context.getBean(MyGateway.class); String reply = gateway.sendToRabbit("foo"); System.out.println(reply); } @Bean public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) { return IntegrationFlows.from(amqpOutboundChannel()) .handle(Amqp.outboundGateway(amqpTemplate) .routingKey("foo")) // default exchange - route to queue 'foo' .get(); } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") public interface MyGateway { String sendToRabbit(String data); } }
The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a
reply is received (or a timeout occurs).
Spring Integration version 4.3 added this asynchronous gateway, which uses the AsyncRabbitTemplate
from Spring AMQP.
When a message is sent, the thread returns immediately after the send completes, and the reply is sent on the template’s
listener container thread when it is received.
This can be useful when the gateway is invoked on a poller thread; the thread is released and is available for other
tasks in the framework.
Configuration for an AMQP Async Outbound Gateway is shown below.
<int-amqp:outbound-gateway id="inboundGateway" request-channel="myRequestChannel" async-template="" exchange-name="" exchange-name-expression="" order="1" reply-channel="" reply-timeout="" requires-reply="" routing-key="" routing-key-expression="" default-delivery-mode"" confirm-correlation-expression="" confirm-ack-channel="" confirm-nack-channel="" return-channel="" (16) lazy-connect="true" /> (17)
Unique ID for this adapter. Optional. | |
Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required. | |
Bean Reference to the configured | |
The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name-expression. Optional. | |
A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name. Optional. | |
The order for this consumer when multiple consumers are registered thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]). | |
Message Channel to which replies should be sent after being received from an AMQP Queue and converted. Optional. | |
The time the gateway will wait when sending the reply message to the | |
When | |
The routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with routing-key-expression. Optional. | |
A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key). By default, this will be an empty String. Mutually exclusive with routing-key. Optional. | |
The default delivery mode for messages; | |
An expression defining correlation data.
When provided, this configures the underlying amqp template to receive publisher confirms.
Requires a dedicated | |
The channel to which positive (ack) publisher confirms are sent; payload is the correlation
data defined by the confirm-correlation-expression.
Requires the underlying | |
Since version 4.2. The channel to which negative (nack) publisher confirms are sent; payload is the correlation
data defined by the confirm-correlation-expression.
Requires the underlying | |
The channel to which returned messages are sent.
When provided, the underlying amqp template is configured to return undeliverable messages to the gateway.
The message will be constructed from the data received from amqp, with the following additional headers:
amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey.
Requires the underlying | |
When set to |
Also see Section 8.5.3, “Asynchronous Service Activator” for more information.
RabbitTemplate | |
---|---|
When using confirms and returns, it is recommended that the |
The following configuration provides an example of configuring the outbound gateway using Java configuration:
@Configuration public class AmqpAsyncConfig { @Bean @ServiceActivator(inputChannel = "amqpOutboundChannel") public AsyncAmqpOutboundGateway amqpOutbound(AmqpTemplate asyncTemplate) { AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate); outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' return outbound; } @Bean public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyContainer) { return new AsyncRabbitTemplate(rabbitTemplate, replyContainer); } @Bean public SimpleMessageListenerContainer replyContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf); container.setQueueNames("asyncRQ1"); return container; } @Bean public MessageChannel amqpOutboundChannel() { return new DirectChannel(); } }
The following Spring Boot application provides an example of configuring the outbound adapter using the Java DSL:
@SpringBootApplication public class AmqpAsyncApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(AmqpAsyncApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); String reply = gateway.sendToRabbit("foo"); System.out.println(reply); } @Bean public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) { return f -> f .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate) .routingKey("foo")); // default exchange - route to queue 'foo' } @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input") public interface MyGateway { String sendToRabbit(String data); } }
Spring AMQP 1.4 introduced the ContentTypeDelegatingMessageConverter
where the actual converter is selected based
on the incoming content type message property.
This could be used by inbound endpoints.
Spring Integration version 4.3 now allows the ContentTypeDelegatingMessageConverter
to be used on outbound
endpoints as well - with the contentType
header specifiying which converter will be used.
The following configures a ContentTypeDelegatingMessageConverter
with the default converter being the
SimpleMessageConverter
(which handles java serialization and plain text), together with a JSON converter:
<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel" exchange-name="someExchange" routing-key="someKey" amqp-template="amqpTemplateContentTypeConverter" /> <int:channel id="ctRequestChannel"/> <rabbit:template id="amqpTemplateContentTypeConverter" connection-factory="connectionFactory" message-converter="ctConverter" /> <bean id="ctConverter" class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter"> <property name="delegates"> <map> <entry key="application/json"> <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" /> </entry> </map> </property> </bean>
Sending a message to ctRequestChannel
with the contentType
header set to application/json
will cause the
JSON converter to be selected.
This applies to both the outbound channel adapter and gateway.
Note | |
---|---|
Starting with version 5.0, headers that are added to the There are, however, cases where the previous behavior is desired.
For example, with a There is now a property on the outbound channel adapter and gateway (as well as AMQP-backed channels) |
Spring AMQP version 1.6 introduced a mechanism to allow the specification of a default user id for outbound messages.
It has always been possible to set the AmqpHeaders.USER_ID
header which will now take precedence over the default.
This might be useful to message recipients; for inbound messages, if the message publisher sets the property, it is made available in the AmqpHeaders.RECEIVED_USER_ID
header.
Note that RabbitMQ validates that the user id is the actual user id for the connection or one for which impersonation is allowed.
To configure a default user id for outbound messages, configure it on a RabbitTemplate
and configure the outbound adapter or gateway to use that template.
Similarly, to set the user id property on replies, inject an appropriately configured template into the inbound gateway.
See the Spring AMQP documentation for more information.
Spring AMQP supports the RabbitMQ Delayed Message Exchange Plugin.
For inbound messages, the x-delay
header is mapped to the AmqpHeaders.RECEIVED_DELAY
header.
Setting the AMQPHeaders.DELAY
header will cause the corresponding x-delay
header to be set in outbound messages.
You can also specify the delay
and delayExpression
properties on outbound endpoints (delay-expression
when using XML configuration).
This takes precedence over the AmqpHeaders.DELAY
header.
There are two Message Channel implementations available. One is point-to-point, and the other is publish/subscribe. Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer as you have seen on the Channel Adapters and Gateways. However, the examples we’ll show here are going to have minimal configuration. Explore the XML schema to view the available attributes.
A point-to-point channel would look like this:
<int-amqp:channel id="p2pChannel"/>
Under the covers a Queue named "si.p2pChannel" would be declared, and this channel will send to that Queue (technically by sending to the no-name Direct Exchange with a routing key that matches this Queue’s name).
This channel will also register a consumer on that Queue.
If you want the channel to be "pollable" instead of message-driven, then simply provide the "message-driven" flag with
a value of false
:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
A publish/subscribe channel would look like this:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
Under the covers a Fanout Exchange named "si.fanout.pubSubChannel" would be declared, and this channel will send to that Fanout Exchange. This channel will also declare a server-named exclusive, auto-delete, non-durable Queue and bind that to the Fanout Exchange while registering a consumer on that Queue to receive Messages. There is no "pollable" option for a publish-subscribe-channel; it must be message-driven.
Starting with version 4.1 AMQP Backed Message Channels, alongside with channel-transacted
, support
template-channel-transacted
to separate transactional
configuration for the AbstractMessageListenerContainer
and
for the RabbitTemplate
.
Note, previously, the channel-transacted
was true
by default, now it changed to false
as standard default value
for the AbstractMessageListenerContainer
.
Prior to version 4.3, AMQP-backed channels only supported messages with Serializable
payloads and headers.
The entire message was converted (serialized) and sent to RabbitMQ.
Now, you can set the extract-payload
attribute (or setExtractPayload()
when using Java configuration) to true.
When this flag is true
, the message payload is converted and the headers mapped, in a similar manner to when using
channel adapters.
This allows AMQP-backed channels to be used with non-serializable payloads (perhaps with another message converter
such as the Jackson2JsonMessageConverter
).
The default mapped headers are discussed in Section 12.13, “AMQP Message Headers”.
You can modify the mapping by providing custom mappers using the outbound-header-mapper
and inbound-header-mapper
attributes.
You can now also specify a default-delivery-mode
, used to set the delivery mode when there is no
amqp_deliveryMode
header.
By default, Spring AMQP MessageProperties
uses PERSISTENT
delivery mode.
Important | |
---|---|
Just as with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications; for that purpose, use channel adapters instead. |
Important | |
---|---|
Starting with version 5.0, the pollable channel now blocks the poller thread for the specified |
The following provides an example of configuring the channels using Java configuration:
@Bean public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("foo"); factoryBean.setPubSub(false); return factoryBean; } @Bean public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("bar"); factoryBean.setPubSub(false); return factoryBean; } @Bean public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("baz"); factoryBean.setPubSub(false); return factoryBean; }
The following provides an example of configuring the channels using the Java DSL:
@Bean public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(...) ... .channel(Amqp.pollableChannel(connectionFactory) .queueName("foo")) ... .get(); } @Bean public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(...) ... .channel(Amqp.channel(connectionFactory) .queueName("bar")) ... .get(); } @Bean public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) { return IntegrationFlows.from(...) ... .channel(Amqp.publisSubscribeChannel(connectionFactory) .queueName("baz")) ... .get(); }
The Spring Integration AMQP Adapters will map all AMQP properties and headers automatically.
(This is a change in 4.3 - previously, only standard headers were mapped).
These properties will be copied by default to and from Spring Integration MessageHeaders
using the
DefaultAmqpHeaderMapper.
Of course, you can pass in your own implementation of AMQP specific header mappers, as the adapters have respective properties to support that.
Any user-defined headers within the AMQP MessageProperties WILL
be copied to or from an AMQP Message, unless explicitly negated by the requestHeaderNames and/or
replyHeaderNames properties of the DefaultAmqpHeaderMapper
.
For an outbound mapper, no x-*
headers are mapped by default; see the caution below for the reason why.
To override the default, and revert to the pre-4.3 behavior, use STANDARD_REQUEST_HEADERS
and
STANDARD_REPLY_HEADERS
in the properties.
Tip | |
---|---|
When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo")
to be matched.
|
Starting with version 4.1, the AbstractHeaderMapper
(a DefaultAmqpHeaderMapper
superclass) allows the
NON_STANDARD_HEADERS
token to be configured for the requestHeaderNames and/or replyHeaderNames properties
(in addition to the existing STANDARD_REQUEST_HEADERS
and STANDARD_REPLY_HEADERS
) to map all user-defined headers.
Class org.springframework.amqp.support.AmqpHeaders
identifies the default headers that will be used by the
DefaultAmqpHeaderMapper
:
Caution | |
---|---|
As mentioned above, using a header mapping pattern |
Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !
.
Negated patterns get priority, so a list such as
STANDARD_REQUEST_HEADERS,foo,ba*,!bar,!baz,qux,!foo
will NOT map foo
(nor bar
nor baz
); the standard headers plus bad
, qux
will be mapped.
Important | |
---|---|
If you have a user defined header that begins with |
To experiment with the AMQP adapters, check out the samples available in the Spring Integration Samples Git repository at:
Currently there is one sample available that demonstrates the basic functionality of the Spring Integration AMQP Adapter using an Outbound Channel Adapter and an Inbound Channel Adapter. As AMQP Broker implementation the sample uses RabbitMQ (http://www.rabbitmq.com/).
Note | |
---|---|
In order to run the example you will need a running instance of RabbitMQ. A local installation with just the basic defaults will be sufficient. For detailed RabbitMQ installation procedures please visit: http://www.rabbitmq.com/install.html |
Once the sample application is started, you enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue. In return that message is retrieved via Spring Integration and then printed to the console.
The image belows illustrates the basic set of Spring Integration components used in this sample.