AMQP Support
Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).
You need to include this dependency into your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>5.2.12.BUILD-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-amqp:5.2.12.BUILD-SNAPSHOT"
The following adapters are available:
Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.
To provide AMQP support, Spring Integration relies on (Spring AMQP), which applies core Spring concepts to the development of AMQP-based messaging solutions. Spring AMQP provides similar semantics to (Spring JMS).
Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.
TIP: You should familiarize yourself with the reference documentation of the Spring AMQP project. It provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.
Inbound Channel Adapter
The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:
<int-amqp:inbound-channel-adapter
id="inboundAmqp" (1)
channel="inboundChannel" (2)
queue-names="si.test.queue" (3)
acknowledge-mode="AUTO" (4)
advice-chain="" (5)
channel-transacted="" (6)
concurrent-consumers="" (7)
connection-factory="" (8)
error-channel="" (9)
expose-listener-channel="" (10)
header-mapper="" (11)
mapped-request-headers="" (12)
listener-container="" (13)
message-converter="" (14)
message-properties-converter="" (15)
phase="" (16)
prefetch-count="" (17)
receive-timeout="" (18)
recovery-interval="" (19)
missing-queues-fatal="" (20)
shutdown-timeout="" (21)
task-executor="" (22)
transaction-attribute="" (23)
transaction-manager="" (24)
tx-size="" (25)
consumers-per-queue /> (26)
1 | The unique ID for this adapter. Optional. |
2 | Message channel to which converted messages should be sent. Required. |
3 | Names of the AMQP queues (comma-separated list) from which messages should be consumed. Required. |
4 | Acknowledge mode for the MessageListenerContainer .
When set to MANUAL , the delivery tag and channel are provided in message headers amqp_deliveryTag and amqp_channel , respectively.
The user application is responsible for acknowledgement.
NONE means no acknowledgements (autoAck ).
AUTO means the adapter’s container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See Inbound Endpoint Acknowledge Mode. |
5 | Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter. Optional. |
6 | Flag to indicate that channels created by this component are transactional. If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback. Optional (Defaults to false). |
7 | Specify the number of concurrent consumers to create.
The default is 1 .
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional. |
8 | Bean reference to the RabbitMQ ConnectionFactory .
Optional (defaults to 'connectionFactory'). |
9 | Message channel to which error messages should be sent. Optional. |
10 | Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered ChannelAwareMessageListener .
Optional (defaults to true). |
11 | A reference to an AmqpHeaderMapper to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as contentType ) are copied to Spring Integration MessageHeaders .
Any user-defined headers within the AMQP MessageProperties are NOT copied to the message by the default DefaultAmqpHeaderMapper .
Not allowed if 'request-header-names' is provided. |
12 | Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders .
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "*" or "thing1*, thing2" or "*something"). |
13 | Reference to the AbstractMessageListenerContainer to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the MessageListener itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own MessageListener .
Optional. |
14 | The MessageConverter to use when receiving AMQP messages.
Optional. |
15 | The MessagePropertiesConverter to use when receiving AMQP messages.
Optional. |
16 | Specifies the phase in which the underlying AbstractMessageListenerContainer should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is Integer.MAX_VALUE , meaning that this container starts as late as possible and stops as soon as possible.
Optional. |
17 | Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the tx-size attribute, later in this list).
Optional (defaults to 1 ). |
18 | Receive timeout in milliseconds.
Optional (defaults to 1000 ). |
19 | Specifies the interval between recovery attempts of the underlying AbstractMessageListenerContainer (in milliseconds).
Optional (defaults to 5000 ). |
20 | If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If false , the container does not throw an exception and goes into recovery mode, attempting to restart according to the recovery-interval .
Optional (defaults to true ). |
21 | The time to wait for workers (in milliseconds) after the underlying AbstractMessageListenerContainer is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to 5000 ). |
22 | By default, the underlying AbstractMessageListenerContainer uses a SimpleAsyncTaskExecutor implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling TaskExecutor implementation as an alternative.
Optional (defaults to SimpleAsyncTaskExecutor ). |
23 | By default, the underlying AbstractMessageListenerContainer creates a new instance of the DefaultTransactionAttribute (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to DefaultTransactionAttribute ). |
24 | Sets a bean reference to an external PlatformTransactionManager on the underlying AbstractMessageListenerContainer .
The transaction manager works in conjunction with the channel-transacted attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the channelTransacted flag is true , the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the channelTransacted flag is false , no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
Transactions with Spring AMQP.
Optional. |
25 | Tells the SimpleMessageListenerContainer how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in prefetch-count .
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to 1 ). |
26 | Indicates that the underlying listener container should be a DirectMessageListenerContainer instead of the default SimpleMessageListenerContainer .
See the Spring AMQP Reference Manual for more information. |
container
Note that when configuring an external container, you cannot use the Spring AMQP namespace to define the container.
This is because the namespace requires at least one
|
Even though the Spring Integration JMS and AMQP support is similar, important differences exist.
The JMS inbound channel adapter is using a JmsDestinationPollingSource under the covers and expects a configured poller.
The AMQP inbound channel adapter uses an AbstractMessageListenerContainer and is message driven.
In that regard, it is more similar to the JMS message-driven channel adapter.
|
Configuring with Java Configuration
The following Spring Boot application shows an example of configuring the inbound adapter with Java configuration:
@SpringBootApplication
public class AmqpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL:
@SpringBootApplication
public class AmqpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
Batched Messages
See the Spring AMQP Documentation for more information about batched messages.
To produce batched messages with Spring Integration, simply configure the outbound endpoint with a BatchingRabbitTemplate
.
When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a Message<?>
for each fragment.
Starting with version 5.2, if the container’s deBatchingEnabled
property is set to false
, the de-batching is performed by the adapter instead, and a single Message<List<?>>
is produced with the payload being a list of the fragment payloads (after conversion if appropriate).
The default BatchingStrategy
is the SimpleBatchingStrategy
, but this can be overridden on the adapter.
Polled Inbound Channel Adapter
Overview
Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand — for example, with a MessageSourcePollingTemplate
or a poller.
See Deferred Acknowledgment Pollable Message Source for more information.
It does not currently support XML configuration.
The following example shows how to configure an AmqpMessageSource
with Java configuration:
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
return new AmpqpMessageSource(connectionFactory, "someQueue");
}
See the Javadoc for configuration properties.
The following example shows how to configure an inboundPolledAdapter
with the Java DSL:
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
.handle(p -> {
...
})
.get();
}
Batched Messages
See Batched Messages.
For the polled adapter, there is no listener container, batched messages are always debatched (if the BatchingStrategy
supports doing so).
Inbound Gateway
The inbound gateway supports all the attributes on the inbound channel adapter (except that 'channel' is replaced by 'request-channel'), plus some additional attributes. The following listing shows the available attributes:
<int-amqp:inbound-gateway
id="inboundGateway" (1)
request-channel="myRequestChannel" (2)
header-mapper="" (3)
mapped-request-headers="" (4)
mapped-reply-headers="" (5)
reply-channel="myReplyChannel" (6)
reply-timeout="1000" (7)
amqp-template="" (8)
default-reply-to="" /> (9)
1 | The Unique ID for this adapter. Optional. |
2 | Message channel to which converted messages are sent. Required. |
3 | A reference to an AmqpHeaderMapper to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as contentType ) are copied to and from Spring Integration MessageHeaders .
Any user-defined headers within the AMQP MessageProperties are not copied to or from an AMQP message by the default DefaultAmqpHeaderMapper .
Not allowed if 'request-header-names' or 'reply-header-names' is provided. |
4 | Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders .
This attribute can be provided only if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "thing1*, thing2" or "*thing1" ). |
5 | Comma-separated list of names of MessageHeaders to be mapped into the AMQP message properties of the AMQP reply message.
All standard Headers (such as contentType ) are mapped to AMQP Message Properties, while user-defined headers are mapped to the 'headers' property.
This attribute can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (for example, "*" or "foo*, bar" or "*foo" ). |
6 | Message Channel where reply Messages are expected. Optional. |
7 | Sets the receiveTimeout on the underlying o.s.i.core.MessagingTemplate for receiving messages from the reply channel.
If not specified, this property defaults to 1000 (1 second).
Only applies if the container thread hands off to another thread before the reply is sent. |
8 | The customized AmqpTemplate bean reference (to have more control over the reply messages to send).
You can provide an alternative implementation to the RabbitTemplate . |
9 | The replyTo o.s.amqp.core.Address to be used when the requestMessage does not have a replyTo
property.
If this option is not specified, no amqp-template is provided, no replyTo property exists in the request message, and
an IllegalStateException is thrown because the reply cannot be routed.
If this option is not specified and an external amqp-template is provided, no exception is thrown.
You must either specify this option or configure a default exchange and routingKey on that template,
if you anticipate cases when no replyTo property exists in the request message. |
See the note in Inbound Channel Adapter about configuring the listener-container
attribute.
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound gateway with Java configuration:
@SpringBootApplication
public class AmqpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
gateway.setRequestChannel(channel);
gateway.setDefaultReplyTo("bar");
return gateway;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new AbstractReplyProducingMessageHandler() {
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
return "reply to " + requestMessage.getPayload();
}
};
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound gateway with the Java DSL:
@SpringBootApplication
public class AmqpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
}
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))
.transform(String.class, String::toUpperCase)
.get();
}
}
Batched Messages
See Batched Messages.
Inbound Endpoint Acknowledge Mode
By default, the inbound endpoints use the AUTO
acknowledge mode, which means the container automatically acknowledges the message when the downstream integration flow completes (or a message is handed off to another thread by using a QueueChannel
or ExecutorChannel
).
Setting the mode to NONE
configures the consumer such that acknowledgments are not used at all (the broker automatically acknowledges the message as soon as it is sent).
Setting the mode to MANUAL
lets user code acknowledge the message at some other point during processing.
To support this, with this mode, the endpoints provide the Channel
and deliveryTag
in the amqp_channel
and amqp_deliveryTag
headers, respectively.
You can perform any valid Rabbit command on the Channel
but, generally, only basicAck
and basicNack
(or basicReject
) are used.
In order to not interfere with the operation of the container, you should not retain a reference to the channel and use it only in the context of the current message.
Since the Channel is a reference to a “live” object, it cannot be serialized and is lost if a message is persisted.
|
The following example shows how you might use MANUAL
acknowledgement:
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {
// Do some processing
if (allOK) {
channel.basicAck(deliveryTag, false);
// perhaps do some more processing
}
else {
channel.basicNack(deliveryTag, false, true);
}
return someResultForDownStreamProcessing;
}
Outbound Channel Adapter
The following outbound endpoints have many similar configuration options.
Starting with version 5.2, the confirm-timeout
has been added.
Normally, when publisher confirms are enabled, the broker will quickly return an ack (or nack) which will be sent to the appropriate channel.
If a channel is closed before the confirm is received, the Spring AMQP framework will synthesize a nack.
"Missing" acks should never occur but, if you set this property, the endpoint will periodically check for them and synthesize a nack if the time elapses without a confirm being received.
Outbound Channel Adapter
The following example shows the available properties for an AMQP outbound channel adapter:
<int-amqp:outbound-channel-adapter id="outboundAmqp" (1)
channel="outboundChannel" (2)
amqp-template="myAmqpTemplate" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
routing-key="" (7)
routing-key-expression="" (8)
default-delivery-mode"" (9)
confirm-correlation-expression="" (10)
confirm-ack-channel="" (11)
confirm-nack-channel="" (12)
confirm-timeout="" (13)
wait-for-confirm="" (14)
return-channel="" (15)
error-message-strategy="" (16)
header-mapper="" (17)
mapped-request-headers="" (18)
lazy-connect="true" /> (19)
1 | The unique ID for this adapter. Optional. |
2 | Message channel to which messages should be sent to have them converted and published to an AMQP exchange. Required. |
3 | Bean reference to the configured AMQP template.
Optional (defaults to amqpTemplate ). |
4 | The name of the AMQP exchange to which messages are sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name-expression'. Optional. |
5 | A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional. |
6 | The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.
Optional (defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] ). |
7 | The fixed routing-key to use when sending messages.
By default, this is an empty String .
Mutually exclusive with 'routing-key-expression'.
Optional. |
8 | A SpEL expression that is evaluated to determine the routing key to use when sending messages, with the message as the root object (for example, 'payload.key').
By default, this is an empty String .
Mutually exclusive with 'routing-key'.
Optional. |
9 | The default delivery mode for messages: PERSISTENT or NON_PERSISTENT .
Overridden if the header-mapper sets the delivery mode.
If the Spring Integration message header amqp_deliveryMode is present, the DefaultHeaderMapper sets the value.
If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate .
If that is not customized at all, the default is PERSISTENT .
Optional. |
10 | An expression that defines correlation data.
When provided, this configures the underlying AMQP template to receive publisher confirmations.
Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true .
When a publisher confirmation is received and correlation data is supplied, it is written to either the confirm-ack-channel or the confirm-nack-channel , depending on the confirmation type.
The payload of the confirmation is the correlation data, as defined by this expression.
The message has an 'amqp_publishConfirm' header set to true (ack ) or false (nack ).
Examples: headers['myCorrelationData'] and payload .
Version 4.1 introduced the amqp_publishConfirmNackCause message header.
It contains the cause of a 'nack' for a publisher confirmation.
Starting with version 4.2, if the expression resolves to a Message<?> instance (such as #this ), the message emitted on the ack /nack channel is based on that message, with the additional header(s) added.
Previously, a new message was created with the correlation data as its payload, regardless of type.
Optional. |
11 | The channel to which positive (ack ) publisher confirms are sent.
The payload is the correlation data defined by the confirm-correlation-expression .
If the expression is #root or #this , the message is built from the original message, with the amqp_publishConfirm header set to true .
Optional (the default is nullChannel ). |
12 | The channel to which negative (nack ) publisher confirmations are sent.
The payload is the correlation data defined by the confirm-correlation-expression (if there is no ErrorMessageStrategy configured).
If the expression is #root or #this , the message is built from the original message, with the amqp_publishConfirm header set to false .
When there is an ErrorMessageStrategy , the message is an ErrorMessage with a NackedAmqpMessageException payload.
Optional (the default is nullChannel ). |
13 | When set, the adapter will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Default none (nacks will not be generated). |
14 | When set to true, the calling thread will block, waiting for a publisher confirmation.
This requires a RabbitTemplate configured for confirms as well as a confirm-correlation-expression .
The thread will block for up to confirm-timeout (or 5 seconds by default).
If a timeout occurs, a MessageTimeoutException will be thrown.
If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirm, a MessageHandlingException will be thrown, with an appropriate message. |
15 | The channel to which returned messages are sent.
When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.
When there is no ErrorMessageStrategy configured, the message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode , amqp_returnReplyText , amqp_returnExchange , amqp_returnRoutingKey .
When there is an ErrorMessageStrategy , the message is an ErrorMessage with a ReturnedAmqpMessageException payload.
Optional. |
16 | A reference to an ErrorMessageStrategy implementation used to build ErrorMessage instances when sending returned or negatively acknowledged messages. |
17 | A reference to an AmqpHeaderMapper to use when sending AMQP Messages.
By default, only standard AMQP properties (such as contentType ) are copied to the Spring Integration MessageHeaders .
Any user-defined headers is not copied to the message by the default`DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
Optional. |
18 | Comma-separated list of names of AMQP Headers to be mapped from the MessageHeaders to the AMQP Message.
Not allowed if the 'header-mapper' reference is provided.
The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "thing1*, thing2" or "*thing1" ). |
19 | When set to false , the endpoint attempts to connect to the broker during application context initialization.
This allows “fail fast” detection of bad configuration but also causes initialization to fail if the broker is down.
When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent. |
return-channel
Using a |
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound adapter with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToRabbit("foo");
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
void sendToRabbit(String data);
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound adapter with the Java DSL:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToRabbit("foo");
}
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(amqpOutboundChannel())
.handle(Amqp.outboundAdapter(amqpTemplate)
.routingKey("foo")) // default exchange - route to queue 'foo'
.get();
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
void sendToRabbit(String data);
}
}
Outbound Gateway
The following listing shows the possible properties for an AMQP Outbound Gateway:
<int-amqp:outbound-gateway id="outboundGateway" (1)
request-channel="myRequestChannel" (2)
amqp-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
error-message-strategy="" (18)
lazy-connect="true" /> (19)
1 | The unique ID for this adapter. Optional. |
2 | Message channel to which messages are sent to have them converted and published to an AMQP exchange. Required. |
3 | Bean reference to the configured AMQP template.
Optional (defaults to amqpTemplate ). |
4 | The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name cxchange. Mutually exclusive with 'exchange-name-expression'. Optional. |
5 | A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages should be sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional. |
6 | The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.
Optional (defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] ). |
7 | Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional. |
8 | The time the gateway waits when sending the reply message to the reply-channel .
This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full.
Defaults to infinity. |
9 | When true , the gateway throws an exception if no reply message is received within the AmqpTemplate’s `replyTimeout property.
Defaults to true . |
10 | The routing-key to use when sending messages.
By default, this is an empty String .
Mutually exclusive with 'routing-key-expression'.
Optional. |
11 | A SpEL expression that is evaluated to determine the routing-key to use when sending messages, with the message as the root object (for example, 'payload.key').
By default, this is an empty String .
Mutually exclusive with 'routing-key'.
Optional. |
12 | The default delivery mode for messages: PERSISTENT or NON_PERSISTENT .
Overridden if the header-mapper sets the delivery mode.
If the Spring Integration message header amqp_deliveryMode is present, the DefaultHeaderMapper sets the value.
If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate .
If that is not customized at all, the default is PERSISTENT .
Optional. |
13 | Since version 4.2.
An expression defining correlation data.
When provided, this configures the underlying AMQP template to receive publisher confirms.
Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true .
When a publisher confirm is received and correlation data is supplied, it is written to either the confirm-ack-channel or the confirm-nack-channel , depending on the confirmation type.
The payload of the confirm is the correlation data, as defined by this expression.
The message has a header 'amqp_publishConfirm' set to true (ack ) or false (nack ).
For nack confirmations, Spring Integration provides an additional header amqp_publishConfirmNackCause .
Examples: headers['myCorrelationData'] and payload .
If the expression resolves to a Message<?> instance (such as #this ), the message
emitted on the ack /nack channel is based on that message, with the additional headers added.
Previously, a new message was created with the correlation data as its payload, regardless of type.
Optional. |
14 | The channel to which positive (ack ) publisher confirmations are sent.
The payload is the correlation data defined by confirm-correlation-expression .
If the expression is #root or #this , the message is built from the original message, with the amqp_publishConfirm header set to true .
Optional (the default is nullChannel ). |
15 | The channel to which negative (nack ) publisher confirmations are sent.
The payload is the correlation data defined by confirm-correlation-expression (if there is no ErrorMessageStrategy configured).
If the expression is #root or #this , the message is built from the original message, with the amqp_publishConfirm header set to false .
When there is an ErrorMessageStrategy , the message is an ErrorMessage with a NackedAmqpMessageException payload.
Optional (the default is nullChannel ). |
16 | When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Default none (nacks will not be generated). |
17 | The channel to which returned messages are sent.
When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.
When there is no ErrorMessageStrategy configured, the message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode , amqp_returnReplyText , amqp_returnExchange , and amqp_returnRoutingKey .
When there is an ErrorMessageStrategy , the message is an ErrorMessage with a ReturnedAmqpMessageException payload.
Optional. |
18 | A reference to an ErrorMessageStrategy implementation used to build ErrorMessage instances when sending returned or negatively acknowledged messages. |
19 | When set to false , the endpoint attempts to connect to the broker during application context initialization.
This allows “fail fast” detection of bad configuration by logging an error message if the broker is down.
When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent. |
return-channel
Using a |
The underlying AmqpTemplate has a default replyTimeout of five seconds.
If you require a longer timeout, you must configure it on the template .
|
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the outbound gateway with Java configuration:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
String reply = gateway.sendToRabbit("foo");
System.out.println(reply);
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExpectReply(true);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
String sendToRabbit(String data);
}
}
Note that the only difference between the outbound adapter and outbound gateway configuration is the setting of the
expectReply
property.
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound adapter with the Java DSL:
@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(AmqpJavaApplication.class)
.web(false)
.run(args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
MyGateway gateway = context.getBean(MyGateway.class);
String reply = gateway.sendToRabbit("foo");
System.out.println(reply);
}
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(amqpOutboundChannel())
.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey("foo")) // default exchange - route to queue 'foo'
.get();
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
String sendToRabbit(String data);
}
}
Asynchronous Outbound Gateway
The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a
reply is received (or a timeout occurs).
Spring Integration version 4.3 added an asynchronous gateway, which uses the AsyncRabbitTemplate
from Spring AMQP.
When a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread.
This can be useful when the gateway is invoked on a poller thread.
The thread is released and is available for other tasks in the framework.
The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
1 | The unique ID for this adapter. Optional. |
2 | Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange. Required. |
3 | Bean reference to the configured AsyncRabbitTemplate .
Optional (it defaults to asyncRabbitTemplate ). |
4 | The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name-expression'. Optional. |
5 | A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional. |
6 | The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover.
Optional (it defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] ). |
7 | Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional. |
8 | The time the gateway waits when sending the reply message to the reply-channel .
This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full.
The default is infinity. |
9 | When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is true , the gateway sends an error message to the inbound message’s errorChannel header.
When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is false , the gateway sends an error message to the default errorChannel (if available).
It defaults to true . |
10 | The routing-key to use when sending Messages.
By default, this is an empty String .
Mutually exclusive with 'routing-key-expression'.
Optional. |
11 | A SpEL expression that is evaluated to determine the routing-key to use when sending messages,
with the message as the root object (for example, 'payload.key').
By default, this is an empty String .
Mutually exclusive with 'routing-key'.
Optional. |
12 | The default delivery mode for messages: PERSISTENT or NON_PERSISTENT .
Overridden if the header-mapper sets the delivery mode.
If the Spring Integration message header (amqp_deliveryMode ) is present, the DefaultHeaderMapper sets the value.
If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate .
If that is not customized, the default is PERSISTENT .
Optional. |
13 | An expression that defines correlation data.
When provided, this configures the underlying AMQP template to receive publisher confirmations.
Requires a dedicated RabbitTemplate and a CachingConnectionFactory with its publisherConfirms property set to true .
When a publisher confirmation is received and correlation data is supplied, the confirmation is written to either the confirm-ack-channel or the confirm-nack-channel , depending on the confirmation type.
The payload of the confirmation is the correlation data as defined by this expression, and the message has its 'amqp_publishConfirm' header set to true (ack ) or false (nack ).
For nack instances, an additional header (amqp_publishConfirmNackCause ) is provided.
Examples: headers['myCorrelationData'] , payload .
If the expression resolves to a Message<?> instance (such as “#this”), the message emitted on the ack /nack channel is based on that message, with the additional headers added.
Optional. |
14 | The channel to which positive (ack ) publisher confirmations are sent.
The payload is the correlation data defined by the confirm-correlation-expression .
Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true .
Optional (the default is nullChannel ). |
15 | Since version 4.2.
The channel to which negative (nack ) publisher confirmations are sent.
The payload is the correlation data defined by the confirm-correlation-expression .
Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true .
Optional (the default is nullChannel ). |
16 | When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Default none (nacks will not be generated). |
17 | The channel to which returned messages are sent.
When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway.
The message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode , amqp_returnReplyText , amqp_returnExchange , and amqp_returnRoutingKey .
Requires the underlying AsyncRabbitTemplate to have its mandatory property set to true .
Optional. |
18 | When set to false , the endpoint tries to connect to the broker during application context initialization.
Doing so allows “fail fast” detection of bad configuration, by logging an error message if the broker is down.
When true (the default), the connection is established (if it does not already exist because some other component established
it) when the first message is sent. |
See also Asynchronous Service Activator for more information.
RabbitTemplate
When you use confirmations and returns, we recommend that the |
Configuring with Java Configuration
The following configuration shows an example of how to configure the outbound gateway with Java configuration:
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AmqpTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the outbound adapter with the Java DSL:
@SpringBootApplication
public class AmqpAsyncApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(AmqpAsyncApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
String reply = gateway.sendToRabbit("foo");
System.out.println(reply);
}
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("foo")); // default exchange - route to queue 'foo'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
Inbound Message Conversion
Inbound messages, arriving at the channel adapter or gateway, are converted to the spring-messaging
Message<?>
payload using a message converter.
By default, a SimpleMessageConverter
is used, which handles java serialization and text.
Headers are mapped using the DefaultHeaderMapper.inboundMapper()
by default.
If a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container’s error handler.
The default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured).
If an error channel is defined, the ErrorMessage
payload is a ListenerExecutionFailedException
with properties failedMessage
(the Spring AMQP message that could not be converted) and the cause
.
If the container AcknowledgeMode
is AUTO
(the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged.
If the error flow throws an exception, the exception type, in conjunction with the container’s error handler, will determine whether or not the message is requeued.
If the container is configured with AcknowledgeMode.MANUAL
, the payload is a ManualAckListenerExecutionFailedException
with additional properties channel
and deliveryTag
.
This enables the error flow to call basicAck
or basicNack
(or basicReject
) for the message, to control its disposition.
Outbound Message Conversion
Spring AMQP 1.4 introduced the ContentTypeDelegatingMessageConverter
, where the actual converter is selected based
on the incoming content type message property.
This can be used by inbound endpoints.
As of Spring Integration version 4.3, you can use the ContentTypeDelegatingMessageConverter
on outbound endpoints as well, with the contentType
header specifying which converter is used.
The following example configures a ContentTypeDelegatingMessageConverter
, with the default converter being the SimpleMessageConverter
(which handles Java serialization and plain text), together with a JSON converter:
<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
exchange-name="someExchange"
routing-key="someKey"
amqp-template="amqpTemplateContentTypeConverter" />
<int:channel id="ctRequestChannel"/>
<rabbit:template id="amqpTemplateContentTypeConverter"
connection-factory="connectionFactory" message-converter="ctConverter" />
<bean id="ctConverter"
class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json">
<bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
</entry>
</map>
</property>
</bean>
Sending a message to ctRequestChannel
with the contentType
header set to application/json
causes the JSON converter to be selected.
This applies to both the outbound channel adapter and gateway.
Starting with version 5.0, headers that are added to the There are, however, cases where the previous behavior is desired — for example, when a There is now a property called Starting with version 5.1.9, a similar |
Outbound User ID
Spring AMQP version 1.6 introduced a mechanism to allow the specification of a default user ID for outbound messages.
It has always been possible to set the AmqpHeaders.USER_ID
header, which now takes precedence over the default.
This might be useful to message recipients.
For inbound messages, if the message publisher sets the property, it is made available in the AmqpHeaders.RECEIVED_USER_ID
header.
Note that RabbitMQ validates that the user ID is the actual user ID for the connection or that the connection allows impersonation.
To configure a default user ID for outbound messages, configure it on a RabbitTemplate
and configure the outbound adapter or gateway to use that template.
Similarly, to set the user ID property on replies, inject an appropriately configured template into the inbound gateway.
See the Spring AMQP documentation for more information.
Delayed Message Exchange
Spring AMQP supports the RabbitMQ Delayed Message Exchange Plugin.
For inbound messages, the x-delay
header is mapped to the AmqpHeaders.RECEIVED_DELAY
header.
Setting the AMQPHeaders.DELAY
header causes the corresponding x-delay
header to be set in outbound messages.
You can also specify the delay
and delayExpression
properties on outbound endpoints (delay-expression
when using XML configuration).
These properties take precedence over the AmqpHeaders.DELAY
header.
AMQP-backed Message Channels
There are two message channel implementations available.
One is point-to-point, and the other is publish-subscribe.
Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate
and
SimpleMessageListenerContainer
(as shown earlier in this chapter for the channel adapters and gateways).
However, the examples we show here have minimal configuration.
Explore the XML schema to view the available attributes.
A point-to-point channel might look like the following example:
<int-amqp:channel id="p2pChannel"/>
Under the covers, the preceding example causes a Queue
named si.p2pChannel
to be declared, and this channel sends to that Queue
(technically, by sending to the no-name direct exchange with a routing key that matches the name of this Queue
).
This channel also registers a consumer on that Queue
.
If you want the channel to be “pollable” instead of message-driven, provide the message-driven
flag with a value of false
, as the following example shows:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
A publish-subscribe channel might look like the following:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
Under the covers, the preceding example causes a fanout exchange named si.fanout.pubSubChannel
to be declared, and this channel sends to that fanout exchange.
This channel also declares a server-named exclusive, auto-delete, non-durable Queue
and binds that to the fanout exchange while registering a consumer on that Queue
to receive messages.
There is no “pollable” option for a publish-subscribe-channel.
It must be message-driven.
Starting with version 4.1, AMQP-backed message channels (in conjunction with channel-transacted
) support
template-channel-transacted
to separate transactional
configuration for the AbstractMessageListenerContainer
and
for the RabbitTemplate
.
Note that, previously, channel-transacted
was true
by default.
Now, by default, it is false
for the AbstractMessageListenerContainer
.
Prior to version 4.3, AMQP-backed channels only supported messages with Serializable
payloads and headers.
The entire message was converted (serialized) and sent to RabbitMQ.
Now, you can set the extract-payload
attribute (or setExtractPayload()
when using Java configuration) to true
.
When this flag is true
, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters.
This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the Jackson2JsonMessageConverter
).
See AMQP Message Headers for more about the default mapped headers.
You can modify the mapping by providing custom mappers that use the outbound-header-mapper
and inbound-header-mapper
attributes.
You can now also specify a default-delivery-mode
, which is used to set the delivery mode when there is no amqp_deliveryMode
header.
By default, Spring AMQP MessageProperties
uses PERSISTENT
delivery mode.
As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications. For that purpose, use channel adapters instead. |
Starting with version 5.0, the pollable channel now blocks the poller thread for the specified receiveTimeout (the default is 1 second).
Previously, unlike other PollableChannel implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout.
Blocking is a little more expensive than using a basicGet() to retrieve a message (with no timeout), because a consumer has to be created to receive each message.
To restore the previous behavior, set the poller’s receiveTimeout to 0.
|
Configuring with Java Configuration
The following example shows how to configure the channels with Java configuration:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
Configuring with the Java DSL
The following example shows how to configure the channels with the Java DSL:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(...)
...
.channel(Amqp.publisSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}
AMQP Message Headers
Overview
The Spring Integration AMQP Adapters automatically map all AMQP properties and headers.
(This is a change from 4.3 - previously, only standard headers were mapped).
By default, these properties are copied to and from Spring Integration MessageHeaders
by using the
DefaultAmqpHeaderMapper
.
You can pass in your own implementation of AMQP-specific header mappers, as the adapters have properties to support doing so.
Any user-defined headers within the AMQP MessageProperties
are copied to or from an AMQP message, unless explicitly negated by the requestHeaderNames
or replyHeaderNames
properties of the DefaultAmqpHeaderMapper
.
By default, for an outbound mapper, no x-*
headers are mapped.
See the caution that appears later in this section for why.
To override the default and revert to the pre-4.3 behavior, use STANDARD_REQUEST_HEADERS
and
STANDARD_REPLY_HEADERS
in the properties.
When mapping user-defined headers, the values can also contain simple wildcard patterns (such as thing* or thing ) to be matched.
matches all headers.
|
Starting with version 4.1, the AbstractHeaderMapper
(a DefaultAmqpHeaderMapper
superclass) lets the NON_STANDARD_HEADERS
token be configured for the requestHeaderNames
and replyHeaderNames
properties (in addition to the existing STANDARD_REQUEST_HEADERS
and STANDARD_REPLY_HEADERS
) to map all user-defined headers.
The org.springframework.amqp.support.AmqpHeaders
class identifies the default headers that are used by the
DefaultAmqpHeaderMapper
:
-
amqp_appId
-
amqp_clusterId
-
amqp_contentEncoding
-
amqp_contentLength
-
content-type
(see contentType Header) -
amqp_correlationId
-
amqp_delay
-
amqp_deliveryMode
-
amqp_deliveryTag
-
amqp_expiration
-
amqp_messageCount
-
amqp_messageId
-
amqp_receivedDelay
-
amqp_receivedDeliveryMode
-
amqp_receivedExchange
-
amqp_receivedRoutingKey
-
amqp_redelivered
-
amqp_replyTo
-
amqp_timestamp
-
amqp_type
-
amqp_userId
-
amqp_publishConfirm
-
amqp_publishConfirmNackCause
-
amqp_returnReplyCode
-
amqp_returnReplyText
-
amqp_returnExchange
-
amqp_returnRoutingKey
-
amqp_channel
-
amqp_consumerTag
-
amqp_consumerQueue
As mentioned earlier in this section, using a header mapping pattern of`` is a common way to copy all headers.
However, this can have some unexpected side effects, because certain RabbitMQ proprietary properties/headers are also copied.
For example, when you use federation, the received message may have a property named x-received-from , which contains the node that sent the message.
If you use the wildcard character for the request and reply header mapping on the inbound gateway, this header is copied, which may cause some issues with federation.
This reply message may be federated back to the sending broker, which may think that a message is looping and, as a result, silently drop it.
If you wish to use the convenience of wildcard header mapping, you may need to filter out some headers in the downstream flow.
For example, to avoid copying the x-received-from header back to the reply you can use <int:header-filter … header-names="x-received-from"> before sending the reply to the AMQP inbound gateway.
Alternatively, you can explicitly list those properties that you actually want mapped, instead of using wildcards.
For these reasons, for inbound messages, the mapper (by default) does not map any x-* headers.
It also does not map the deliveryMode to the amqp_deliveryMode header, to avoid propagation of that header from an inbound message to an outbound message.
Instead, this header is mapped to amqp_receivedDeliveryMode , which is not mapped on output.
|
Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !
.
Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1
does not map thing1
(nor thing2
nor thing3
).
The standard headers plus bad
and qux
are mapped.
The negation technique can be useful for example to not map JSON type headers for incoming messages when a JSON deserialization logic is done in the receiver downstream different way.
For this purpose a !json_*
pattern should be configured for header mapper of the inbound channel adapter/gateway.
If you have a user-defined header that begins with ! that you do wish to map, you need to escape it with \ , as follows: STANDARD_REQUEST_HEADERS,\!myBangHeader .
The header named !myBangHeader is now mapped.
|
Starting with version 5.1, the DefaultAmqpHeaderMapper will fall back to mapping MessageHeaders.ID and MessageHeaders.TIMESTAMP to MessageProperties.messageId and MessageProperties.timestamp respectively, if the corresponding amqp_messageId or amqp_timestamp headers are not present on outbound messages.
Inbound properties will be mapped to the amqp_* headers as before.
It is useful to populate the messageId property when message consumers are using stateful retry.
|
contentType Header
Unlike other headers, the AmqpHeaders.CONTENT_TYPE
is not prefixed with amqp_
; this allows transparent passing of the contentType header across different technologies.
For example an inbound HTTP message sent to a RabbitMQ queue.
The contentType
header is mapped to Spring AMQP’s MessageProperties.contentType
property and that is subsequently mapped to RabbitMQ’s content_type
property.
Prior to version 5.1, this header was also mapped as an entry in the MessageProperties.headers
map; this was incorrect and, furthermore, the value could be wrong since the underlying Spring AMQP message converter might have changed the content type.
Such a change would be reflected in the first-class content_type
property, but not in the RabbitMQ headers map.
Inbound mapping ignored the headers map value.
contentType
is no longer mapped to an entry in the headers map.
Strict Message Ordering
This section describes message ordering for inbound and outbound messages.
Inbound
If you require strict ordering of inbound messages, you must configure the inbound listener container’s prefetchCount
property to 1
.
This is because, if a message fails and is redelivered, it arrives after existing prefetched messages.
Since Spring AMQP version 2.0, the prefetchCount
defaults to 250
for improved performance.
Strict ordering requirements come at the cost of decreased performance.
Outbound
Consider the following integration flow:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gateway.class)
.split(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
Suppose we send messages A
, B
and C
to the gateway.
While it is likely that messages A
, B
, C
are sent in order, there is no guarantee.
This is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message.
One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred fold.
To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the BoundRabbitChannelAdvice
which is a HandleMessageAdvice
.
See Handling Message Advice.
When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations).
The following example shows how to use BoundRabbitChannelAdvice
:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gateway.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
Notice that the same RabbitTemplate
(which implements RabbitOperations
) is used in the advice and the outbound adapter.
The advice runs the downstream flow within the template’s invoke
method so that all operations run on the same channel.
If the optional timeout is provided, when the flow completes, the advice calls the waitForConfirmsOrDie
method, which throws an exception if the confirmations are not received within the specified time.
There must be no thread handoffs in the downstream flow (QueueChannel , ExecutorChannel , and others).
|
AMQP Samples
To experiment with the AMQP adapters, check out the samples available in the Spring Integration samples git repository at https://github.com/SpringSource/spring-integration-samples
Currently, one sample demonstrates the basic functionality of the Spring Integration AMQP adapter by using an outbound channel adapter and an inbound channel adapter. As AMQP broker implementation in the sample uses RabbitMQ.
In order to run the example, you need a running instance of RabbitMQ. A local installation with just the basic defaults suffices. For detailed RabbitMQ installation procedures, see https://www.rabbitmq.com/install.html |
Once the sample application is started, enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue. In return, that message is retrieved by Spring Integration and printed to the console.
The following image illustrates the basic set of Spring Integration components used in this sample.