AMQP (RabbitMQ) Support

Spring Integration provides channel adapters for receiving and sending messages by using the Advanced Message Queuing Protocol (AMQP).

You need to include this dependency into your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-amqp:6.0.9"

The following adapters are available:

Spring Integration also provides a point-to-point message channel and a publish-subscribe message channel backed by AMQP Exchanges and Queues.

To provide AMQP support, Spring Integration relies on (Spring AMQP), which applies core Spring concepts to the development of AMQP-based messaging solutions. Spring AMQP provides similar semantics to (Spring JMS).

Whereas the provided AMQP Channel Adapters are intended for unidirectional messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP gateways for request-reply operations.

TIP: You should familiarize yourself with the reference documentation of the Spring AMQP project. It provides much more in-depth information about Spring’s integration with AMQP in general and RabbitMQ in particular.

Inbound Channel Adapter

The following listing shows the possible configuration options for an AMQP Inbound Channel Adapter:

Java DSL
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
Java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
XML
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  tx-size=""                      (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)

<1> The unique ID for this adapter.
Optional.
<2> Message channel to which converted messages should be sent.
Required.
<3> Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required.
<4> Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.
`NONE` means no acknowledgements (`autoAck`).
`AUTO` means the adapter's container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See <<amqp-inbound-ack>>.
<5> Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional.
<6> Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false).
<7> Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional.
<8> Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`).
<9> Message channel to which error messages should be sent.
Optional.
<10> Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true).
<11> A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
<12> Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1*, thing2" or "*something").
<13> Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional.
<14> The `MessageConverter` to use when receiving AMQP messages.
Optional.
<15> The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional.
<16> Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
<19> Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`).
<20> If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`).
<21> The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`).
<22> By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`).
<23> By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`).
<24> Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
<26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information.
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
container

Note that when configuring an external container with XML, you cannot use the Spring AMQP namespace to define the container. This is because the namespace requires at least one <listener/> element. In this environment, the listener is internal to the adapter. For this reason, you must define the container by using a normal Spring <bean/> definition, as the following example shows:

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
Even though the Spring Integration JMS and AMQP support is similar, important differences exist. The JMS inbound channel adapter is using a JmsDestinationPollingSource under the covers and expects a configured poller. The AMQP inbound channel adapter uses an AbstractMessageListenerContainer and is message driven. In that regard, it is more similar to the JMS message-driven channel adapter.

Starting with version 5.5, the AmqpInboundChannelAdapter can be configured with an org.springframework.amqp.rabbit.retry.MessageRecoverer strategy which is used in the RecoveryCallback when the retry operation is called internally. See setMessageRecoverer() JavaDocs for more information.

When using exclusive or single-active consumers in the listener container, it is recommended that you set the container property forceStop to true. This will prevent a race condition where, after stopping the container, another consumer could start consuming messages before this instance has fully stopped.

Batched Messages

See the Spring AMQP Documentation for more information about batched messages.

To produce batched messages with Spring Integration, simply configure the outbound endpoint with a BatchingRabbitTemplate.

When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a Message<?> for each fragment. Starting with version 5.2, if the container’s deBatchingEnabled property is set to false, the de-batching is performed by the adapter instead, and a single Message<List<?>> is produced with the payload being a list of the fragment payloads (after conversion if appropriate).

The default BatchingStrategy is the SimpleBatchingStrategy, but this can be overridden on the adapter.

The org.springframework.amqp.rabbit.retry.MessageBatchRecoverer must be used with batches when recovery is required for retry operations.

Polled Inbound Channel Adapter

Overview

Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand — for example, with a MessageSourcePollingTemplate or a poller. See Deferred Acknowledgment Pollable Message Source for more information.

It does not currently support XML configuration.

The following example shows how to configure an AmqpMessageSource:

Java DSL
@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
            .handle(p -> {
                ...
            })
            .get();
}
Java
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
    return new AmqpMessageSource(connectionFactory, "someQueue");
}

See the Javadoc for configuration properties.

XML
This adapter currently does not have XML configuration support.

Batched Messages

For the polled adapter, there is no listener container, batched messages are always debatched (if the BatchingStrategy supports doing so).

Inbound Gateway

The inbound gateway supports all the attributes on the inbound channel adapter (except that 'channel' is replaced by 'request-channel'), plus some additional attributes. The following listing shows the available attributes:

Java DSL
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
            .transform(String.class, String::toUpperCase)
            .get();
}
Java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
    gateway.setRequestChannel(channel);
    gateway.setDefaultReplyTo("bar");
    return gateway;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                    new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("foo");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new AbstractReplyProducingMessageHandler() {

        @Override
        protected Object handleRequestMessage(Message<?> requestMessage) {
            return "reply to " + requestMessage.getPayload();
        }

    };
}
XML
<int-amqp:inbound-gateway
                          id="inboundGateway"                (1)
                          request-channel="myRequestChannel" (2)
                          header-mapper=""                   (3)
                          mapped-request-headers=""          (4)
                          mapped-reply-headers=""            (5)
                          reply-channel="myReplyChannel"     (6)
                          reply-timeout="1000"               (7)
                          amqp-template=""                   (8)
                          default-reply-to="" />             (9)
1 The Unique ID for this adapter. Optional.
2 Message channel to which converted messages are sent. Required.
3 A reference to an AmqpHeaderMapper to use when receiving AMQP Messages. Optional. By default, only standard AMQP properties (such as contentType) are copied to and from Spring Integration MessageHeaders. Any user-defined headers within the AMQP MessageProperties are not copied to or from an AMQP message by the default DefaultAmqpHeaderMapper. Not allowed if 'request-header-names' or 'reply-header-names' is provided.
4 Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders. This attribute can be provided only if the 'header-mapper' reference is not provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "thing1*, thing2" or "*thing1").
5 Comma-separated list of names of MessageHeaders to be mapped into the AMQP message properties of the AMQP reply message. All standard Headers (such as contentType) are mapped to AMQP Message Properties, while user-defined headers are mapped to the 'headers' property. This attribute can only be provided if the 'header-mapper' reference is not provided. The values in this list can also be simple patterns to be matched against the header names (for example, "*" or "foo*, bar" or "*foo").
6 Message Channel where reply Messages are expected. Optional.
7 Sets the receiveTimeout on the underlying o.s.i.core.MessagingTemplate for receiving messages from the reply channel. If not specified, this property defaults to 1000 (1 second). Only applies if the container thread hands off to another thread before the reply is sent.
8 The customized AmqpTemplate bean reference (to have more control over the reply messages to send). You can provide an alternative implementation to the RabbitTemplate.
9 The replyTo o.s.amqp.core.Address to be used when the requestMessage does not have a replyTo property. If this option is not specified, no amqp-template is provided, no replyTo property exists in the request message, and an IllegalStateException is thrown because the reply cannot be routed. If this option is not specified and an external amqp-template is provided, no exception is thrown. You must either specify this option or configure a default exchange and routingKey on that template, if you anticipate cases when no replyTo property exists in the request message.

See the note in Inbound Channel Adapter about configuring the listener-container attribute.

Starting with version 5.5, the AmqpInboundChannelAdapter can be configured with an org.springframework.amqp.rabbit.retry.MessageRecoverer strategy which is used in the RecoveryCallback when the retry operation is called internally. See setMessageRecoverer() JavaDocs for more information.

Batched Messages

Inbound Endpoint Acknowledge Mode

By default, the inbound endpoints use the AUTO acknowledge mode, which means the container automatically acknowledges the message when the downstream integration flow completes (or a message is handed off to another thread by using a QueueChannel or ExecutorChannel). Setting the mode to NONE configures the consumer such that acknowledgments are not used at all (the broker automatically acknowledges the message as soon as it is sent). Setting the mode to MANUAL lets user code acknowledge the message at some other point during processing. To support this, with this mode, the endpoints provide the Channel and deliveryTag in the amqp_channel and amqp_deliveryTag headers, respectively.

You can perform any valid Rabbit command on the Channel but, generally, only basicAck and basicNack (or basicReject) are used. In order to not interfere with the operation of the container, you should not retain a reference to the channel and use it only in the context of the current message.

Since the Channel is a reference to a “live” object, it cannot be serialized and is lost if a message is persisted.

The following example shows how you might use MANUAL acknowledgement:

@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing

    }
    else {
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}

Outbound Endpoints

The following outbound endpoints have many similar configuration options. Starting with version 5.2, the confirm-timeout has been added. Normally, when publisher confirms are enabled, the broker will quickly return an ack (or nack) which will be sent to the appropriate channel. If a channel is closed before the confirm is received, the Spring AMQP framework will synthesize a nack. "Missing" acks should never occur but, if you set this property, the endpoint will periodically check for them and synthesize a nack if the time elapses without a confirm being received.

Outbound Channel Adapter

The following example shows the available properties for an AMQP outbound channel adapter:

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlow.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
Java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
XML
<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)
                               channel="outboundChannel"         (2)
                               amqp-template="myAmqpTemplate"    (3)
                               exchange-name=""                  (4)
                               exchange-name-expression=""       (5)
                               order="1"                         (6)
                               routing-key=""                    (7)
                               routing-key-expression=""         (8)
                               default-delivery-mode""           (9)
                               confirm-correlation-expression="" (10)
                               confirm-ack-channel=""            (11)
                               confirm-nack-channel=""           (12)
                               confirm-timeout=""                (13)
                               wait-for-confirm=""               (14)
                               return-channel=""                 (15)
                               error-message-strategy=""         (16)
                               header-mapper=""                  (17)
                               mapped-request-headers=""         (18)
                               lazy-connect="true"               (19)
                               multi-send="false"/>              (20)
1 The unique ID for this adapter. Optional.
2 Message channel to which messages should be sent to have them converted and published to an AMQP exchange. Required.
3 Bean reference to the configured AMQP template. Optional (defaults to amqpTemplate).
4 The name of the AMQP exchange to which messages are sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name-expression'. Optional.
5 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional.
6 The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover. Optional (defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 The fixed routing-key to use when sending messages. By default, this is an empty String. Mutually exclusive with 'routing-key-expression'. Optional.
8 A SpEL expression that is evaluated to determine the routing key to use when sending messages, with the message as the root object (for example, 'payload.key'). By default, this is an empty String. Mutually exclusive with 'routing-key'. Optional.
9 The default delivery mode for messages: PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. If the Spring Integration message header amqp_deliveryMode is present, the DefaultHeaderMapper sets the value. If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate. If that is not customized at all, the default is PERSISTENT. Optional.
10 An expression that defines correlation data. When provided, this configures the underlying AMQP template to receive publisher confirmations. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true. When a publisher confirmation is received and correlation data is supplied, it is written to either the confirm-ack-channel or the confirm-nack-channel, depending on the confirmation type. The payload of the confirmation is the correlation data, as defined by this expression. The message has an 'amqp_publishConfirm' header set to true (ack) or false (nack). Examples: headers['myCorrelationData'] and payload. Version 4.1 introduced the amqp_publishConfirmNackCause message header. It contains the cause of a 'nack' for a publisher confirmation. Starting with version 4.2, if the expression resolves to a Message<?> instance (such as #this), the message emitted on the ack/nack channel is based on that message, with the additional header(s) added. Previously, a new message was created with the correlation data as its payload, regardless of type. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
11 The channel to which positive (ack) publisher confirms are sent. The payload is the correlation data defined by the confirm-correlation-expression. If the expression is #root or #this, the message is built from the original message, with the amqp_publishConfirm header set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
12 The channel to which negative (nack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression (if there is no ErrorMessageStrategy configured). If the expression is #root or #this, the message is built from the original message, with the amqp_publishConfirm header set to false. When there is an ErrorMessageStrategy, the message is an ErrorMessage with a NackedAmqpMessageException payload. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
13 When set, the adapter will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Also see Alternative Mechanism for Publisher Confirms and Returns. Default none (nacks will not be generated).
14 When set to true, the calling thread will block, waiting for a publisher confirmation. This requires a RabbitTemplate configured for confirms as well as a confirm-correlation-expression. The thread will block for up to confirm-timeout (or 5 seconds by default). If a timeout occurs, a MessageTimeoutException will be thrown. If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirmation, a MessageHandlingException will be thrown, with an appropriate message.
15 The channel to which returned messages are sent. When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter. When there is no ErrorMessageStrategy configured, the message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey. When there is an ErrorMessageStrategy, the message is an ErrorMessage with a ReturnedAmqpMessageException payload. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
16 A reference to an ErrorMessageStrategy implementation used to build ErrorMessage instances when sending returned or negatively acknowledged messages.
17 A reference to an AmqpHeaderMapper to use when sending AMQP Messages. By default, only standard AMQP properties (such as contentType) are copied to the Spring Integration MessageHeaders. Any user-defined headers is not copied to the message by the default`DefaultAmqpHeaderMapper`. Not allowed if 'request-header-names' is provided. Optional.
18 Comma-separated list of names of AMQP Headers to be mapped from the MessageHeaders to the AMQP Message. Not allowed if the 'header-mapper' reference is provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "thing1*, thing2" or "*thing1").
19 When set to false, the endpoint attempts to connect to the broker during application context initialization. This allows “fail fast” detection of bad configuration but also causes initialization to fail if the broker is down. When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.
20 When set to true, payloads of type Iterable<Message<?>> will be sent as discrete messages on the same channel within the scope of a single RabbitTemplate invocation. Requires a RabbitTemplate. When wait-for-confirms is true, RabbitTemplate.waitForConfirmsOrDie() is invoked after the messages have been sent. With a transactional template, the sends will be performed in either a new transaction or one that has already been started (if present).
return-channel

Using a return-channel requires a RabbitTemplate with the mandatory property set to true and a CachingConnectionFactory with the publisherReturns property set to true. When using multiple outbound endpoints with returns, a separate RabbitTemplate is needed for each endpoint.

Outbound Gateway

The following listing shows the possible properties for an AMQP Outbound Gateway:

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
                    .routingKey("foo")) // default exchange - route to queue 'foo'
            .get();
}

@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {

    String sendToRabbit(String data);

}
Java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setExpectReply(true);
    outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {

    String sendToRabbit(String data);

}
XML
<int-amqp:outbound-gateway id="outboundGateway"               (1)
                           request-channel="myRequestChannel" (2)
                           amqp-template=""                   (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           error-message-strategy=""          (18)
                           lazy-connect="true" />             (19)
1 The unique ID for this adapter. Optional.
2 Message channel to which messages are sent to have them converted and published to an AMQP exchange. Required.
3 Bean reference to the configured AMQP template. Optional (defaults to amqpTemplate).
4 The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name cxchange. Mutually exclusive with 'exchange-name-expression'. Optional.
5 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages should be sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional.
6 The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover. Optional (defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional.
8 The time the gateway waits when sending the reply message to the reply-channel. This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. Defaults to infinity.
9 When true, the gateway throws an exception if no reply message is received within the AmqpTemplate’s `replyTimeout property. Defaults to true.
10 The routing-key to use when sending messages. By default, this is an empty String. Mutually exclusive with 'routing-key-expression'. Optional.
11 A SpEL expression that is evaluated to determine the routing-key to use when sending messages, with the message as the root object (for example, 'payload.key'). By default, this is an empty String. Mutually exclusive with 'routing-key'. Optional.
12 The default delivery mode for messages: PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. If the Spring Integration message header amqp_deliveryMode is present, the DefaultHeaderMapper sets the value. If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate. If that is not customized at all, the default is PERSISTENT. Optional.
13 Since version 4.2. An expression defining correlation data. When provided, this configures the underlying AMQP template to receive publisher confirms. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true. When a publisher confirm is received and correlation data is supplied, it is written to either the confirm-ack-channel or the confirm-nack-channel, depending on the confirmation type. The payload of the confirm is the correlation data, as defined by this expression. The message has a header 'amqp_publishConfirm' set to true (ack) or false (nack). For nack confirmations, Spring Integration provides an additional header amqp_publishConfirmNackCause. Examples: headers['myCorrelationData'] and payload. If the expression resolves to a Message<?> instance (such as #this), the message emitted on the ack/nack channel is based on that message, with the additional headers added. Previously, a new message was created with the correlation data as its payload, regardless of type. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
14 The channel to which positive (ack) publisher confirmations are sent. The payload is the correlation data defined by confirm-correlation-expression. If the expression is #root or #this, the message is built from the original message, with the amqp_publishConfirm header set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
15 The channel to which negative (nack) publisher confirmations are sent. The payload is the correlation data defined by confirm-correlation-expression (if there is no ErrorMessageStrategy configured). If the expression is #root or #this, the message is built from the original message, with the amqp_publishConfirm header set to false. When there is an ErrorMessageStrategy, the message is an ErrorMessage with a NackedAmqpMessageException payload. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
16 When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Default none (nacks will not be generated).
17 The channel to which returned messages are sent. When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter. When there is no ErrorMessageStrategy configured, the message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, and amqp_returnRoutingKey. When there is an ErrorMessageStrategy, the message is an ErrorMessage with a ReturnedAmqpMessageException payload. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
18 A reference to an ErrorMessageStrategy implementation used to build ErrorMessage instances when sending returned or negatively acknowledged messages.
19 When set to false, the endpoint attempts to connect to the broker during application context initialization. This allows “fail fast” detection of bad configuration by logging an error message if the broker is down. When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.
return-channel

Using a return-channel requires a RabbitTemplate with the mandatory property set to true and a CachingConnectionFactory with the publisherReturns property set to true. When using multiple outbound endpoints with returns, a separate RabbitTemplate is needed for each endpoint.

The underlying AmqpTemplate has a default replyTimeout of five seconds. If you require a longer timeout, you must configure it on the template.

Note that the only difference between the outbound adapter and outbound gateway configuration is the setting of the expectReply property.

Asynchronous Outbound Gateway

The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a reply is received (or a timeout occurs). Spring Integration version 4.3 added an asynchronous gateway, which uses the AsyncRabbitTemplate from Spring AMQP. When a message is sent, the thread returns immediately after the send operation completes, and, when the message is received, the reply is sent on the template’s listener container thread. This can be useful when the gateway is invoked on a poller thread. The thread is released and is available for other tasks in the framework.

The following listing shows the possible configuration options for an AMQP asynchronous outbound gateway:

Java DSL
@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
Java
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
XML
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 The unique ID for this adapter. Optional.
2 Message channel to which messages should be sent in order to have them converted and published to an AMQP exchange. Required.
3 Bean reference to the configured AsyncRabbitTemplate. Optional (it defaults to asyncRabbitTemplate).
4 The name of the AMQP exchange to which messages should be sent. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name-expression'. Optional.
5 A SpEL expression that is evaluated to determine the name of the AMQP exchange to which messages are sent, with the message as the root object. If not provided, messages are sent to the default, no-name exchange. Mutually exclusive with 'exchange-name'. Optional.
6 The order for this consumer when multiple consumers are registered, thereby enabling load-balancing and failover. Optional (it defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).
7 Message channel to which replies should be sent after being received from an AMQP queue and converted. Optional.
8 The time the gateway waits when sending the reply message to the reply-channel. This only applies if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. The default is infinity.
9 When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is true, the gateway sends an error message to the inbound message’s errorChannel header. When no reply message is received within the AsyncRabbitTemplate’s `receiveTimeout property and this setting is false, the gateway sends an error message to the default errorChannel (if available). It defaults to true.
10 The routing-key to use when sending Messages. By default, this is an empty String. Mutually exclusive with 'routing-key-expression'. Optional.
11 A SpEL expression that is evaluated to determine the routing-key to use when sending messages, with the message as the root object (for example, 'payload.key'). By default, this is an empty String. Mutually exclusive with 'routing-key'. Optional.
12 The default delivery mode for messages: PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. If the Spring Integration message header (amqp_deliveryMode) is present, the DefaultHeaderMapper sets the value. If this attribute is not supplied and the header mapper does not set it, the default depends on the underlying Spring AMQP MessagePropertiesConverter used by the RabbitTemplate. If that is not customized, the default is PERSISTENT. Optional.
13 An expression that defines correlation data. When provided, this configures the underlying AMQP template to receive publisher confirmations. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with its publisherConfirms property set to true. When a publisher confirmation is received and correlation data is supplied, the confirmation is written to either the confirm-ack-channel or the confirm-nack-channel, depending on the confirmation type. The payload of the confirmation is the correlation data as defined by this expression, and the message has its 'amqp_publishConfirm' header set to true (ack) or false (nack). For nack instances, an additional header (amqp_publishConfirmNackCause) is provided. Examples: headers['myCorrelationData'], payload. If the expression resolves to a Message<?> instance (such as “#this”), the message emitted on the ack/nack channel is based on that message, with the additional headers added. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
14 The channel to which positive (ack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
15 Since version 4.2. The channel to which negative (nack) publisher confirmations are sent. The payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional (the default is nullChannel).
16 When set, the gateway will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds. Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value. Also see Alternative Mechanism for Publisher Confirms and Returns. Default none (nacks will not be generated).
17 The channel to which returned messages are sent. When provided, the underlying AMQP template is configured to return undeliverable messages to the gateway. The message is constructed from the data received from AMQP, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, and amqp_returnRoutingKey. Requires the underlying AsyncRabbitTemplate to have its mandatory property set to true. Also see Alternative Mechanism for Publisher Confirms and Returns. Optional.
18 When set to false, the endpoint tries to connect to the broker during application context initialization. Doing so allows “fail fast” detection of bad configuration, by logging an error message if the broker is down. When true (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.

See also Asynchronous Service Activator for more information.

RabbitTemplate

When you use confirmations and returns, we recommend that the RabbitTemplate wired into the AsyncRabbitTemplate be dedicated. Otherwise, unexpected side effects may be encountered.

Alternative Mechanism for Publisher Confirms and Returns

When the connection factory is configured for publisher confirms and returns, the sections above discuss the configuration of message channels to receive the confirms and returns asynchronously. Starting with version 5.4, there is an additional mechanism which is generally easier to use.

In this case, do not configure a confirm-correlation-expression or the confirm and return channels. Instead, add a CorrelationData instance in the AmqpHeaders.PUBLISH_CONFIRM_CORRELATION header; you can then wait for the result(s) later, by checking the state of the future in the CorrelationData instances for which you have sent messages. The returnedMessage field will always be populated (if a message is returned) before the future is completed.

CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
        .setHeader("rk", "someKeyThatWontRoute")
        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
        .build());
...
try {
    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
    Message returned = corr.getReturnedMessage();
    if (returned !- null) {
        // message could not be routed
    }
}
catch { ... }

To improve performance, you may wish to send multiple messages and wait for the confirmations later, rather than one-at-a-time. The returned message is the raw message after conversion; you can sub-class a CorrelationData with whatever additional data you need.

Inbound Message Conversion

Inbound messages, arriving at the channel adapter or gateway, are converted to the spring-messaging Message<?> payload using a message converter. By default, a SimpleMessageConverter is used, which handles java serialization and text. Headers are mapped using the DefaultHeaderMapper.inboundMapper() by default. If a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container’s error handler. The default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured). If an error channel is defined, the ErrorMessage payload is a ListenerExecutionFailedException with properties failedMessage (the Spring AMQP message that could not be converted) and the cause. If the container AcknowledgeMode is AUTO (the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged. If the error flow throws an exception, the exception type, in conjunction with the container’s error handler, will determine whether the message is requeued. If the container is configured with AcknowledgeMode.MANUAL, the payload is a ManualAckListenerExecutionFailedException with additional properties channel and deliveryTag. This enables the error flow to call basicAck or basicNack (or basicReject) for the message, to control its disposition.

Outbound Message Conversion

Spring AMQP 1.4 introduced the ContentTypeDelegatingMessageConverter, where the actual converter is selected based on the incoming content type message property. This can be used by inbound endpoints.

As of Spring Integration version 4.3, you can use the ContentTypeDelegatingMessageConverter on outbound endpoints as well, with the contentType header specifying which converter is used.

The following example configures a ContentTypeDelegatingMessageConverter, with the default converter being the SimpleMessageConverter (which handles Java serialization and plain text), together with a JSON converter:

<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                               exchange-name="someExchange"
                               routing-key="someKey"
                               amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>

Sending a message to ctRequestChannel with the contentType header set to application/json causes the JSON converter to be selected.

This applies to both the outbound channel adapter and gateway.

Starting with version 5.0, headers that are added to the MessageProperties of the outbound message are never overwritten by mapped headers (by default). Previously, this was only the case if the message converter was a ContentTypeDelegatingMessageConverter (in that case, the header was mapped first so that the proper converter could be selected). For other converters, such as the SimpleMessageConverter, mapped headers overwrote any headers added by the converter. This caused problems when an outbound message had some leftover contentType headers (perhaps from an inbound channel adapter) and the correct outbound contentType was incorrectly overwritten. The work-around was to use a header filter to remove the header before sending the message to the outbound endpoint.

There are, however, cases where the previous behavior is desired — for example, when a String payload that contains JSON, the SimpleMessageConverter is not aware of the content and sets the contentType message property to text/plain but your application would like to override that to application/json by setting the contentType header of the message sent to the outbound endpoint. The ObjectToJsonTransformer does exactly that (by default).

There is now a property called headersMappedLast on the outbound channel adapter and gateway (as well as on AMQP-backed channels). Setting this to true restores the behavior of overwriting the property added by the converter.

Starting with version 5.1.9, a similar replyHeadersMappedLast is provided for the AmqpInboundGateway when we produce a reply and would like to override headers populated by the converter. See its JavaDocs for more information.

Outbound User ID

Spring AMQP version 1.6 introduced a mechanism to allow the specification of a default user ID for outbound messages. It has always been possible to set the AmqpHeaders.USER_ID header, which now takes precedence over the default. This might be useful to message recipients. For inbound messages, if the message publisher sets the property, it is made available in the AmqpHeaders.RECEIVED_USER_ID header. Note that RabbitMQ validates that the user ID is the actual user ID for the connection or that the connection allows impersonation.

To configure a default user ID for outbound messages, configure it on a RabbitTemplate and configure the outbound adapter or gateway to use that template. Similarly, to set the user ID property on replies, inject an appropriately configured template into the inbound gateway. See the Spring AMQP documentation for more information.

Delayed Message Exchange

Spring AMQP supports the RabbitMQ Delayed Message Exchange Plugin. For inbound messages, the x-delay header is mapped to the AmqpHeaders.RECEIVED_DELAY header. Setting the AMQPHeaders.DELAY header causes the corresponding x-delay header to be set in outbound messages. You can also specify the delay and delayExpression properties on outbound endpoints (delay-expression when using XML configuration). These properties take precedence over the AmqpHeaders.DELAY header.

AMQP-backed Message Channels

There are two message channel implementations available. One is point-to-point, and the other is publish-subscribe. Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer (as shown earlier in this chapter for the channel adapters and gateways). However, the examples we show here have minimal configuration. Explore the XML schema to view the available attributes.

A point-to-point channel might look like the following example:

<int-amqp:channel id="p2pChannel"/>

Under the covers, the preceding example causes a Queue named si.p2pChannel to be declared, and this channel sends to that Queue (technically, by sending to the no-name direct exchange with a routing key that matches the name of this Queue). This channel also registers a consumer on that Queue. If you want the channel to be “pollable” instead of message-driven, provide the message-driven flag with a value of false, as the following example shows:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

A publish-subscribe channel might look like the following:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

Under the covers, the preceding example causes a fanout exchange named si.fanout.pubSubChannel to be declared, and this channel sends to that fanout exchange. This channel also declares a server-named exclusive, auto-delete, non-durable Queue and binds that to the fanout exchange while registering a consumer on that Queue to receive messages. There is no “pollable” option for a publish-subscribe-channel. It must be message-driven.

Starting with version 4.1, AMQP-backed message channels (in conjunction with channel-transacted) support template-channel-transacted to separate transactional configuration for the AbstractMessageListenerContainer and for the RabbitTemplate. Note that, previously, channel-transacted was true by default. Now, by default, it is false for the AbstractMessageListenerContainer.

Prior to version 4.3, AMQP-backed channels only supported messages with Serializable payloads and headers. The entire message was converted (serialized) and sent to RabbitMQ. Now, you can set the extract-payload attribute (or setExtractPayload() when using Java configuration) to true. When this flag is true, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters. This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the Jackson2JsonMessageConverter). See AMQP Message Headers for more about the default mapped headers. You can modify the mapping by providing custom mappers that use the outbound-header-mapper and inbound-header-mapper attributes. You can now also specify a default-delivery-mode, which is used to set the delivery mode when there is no amqp_deliveryMode header. By default, Spring AMQP MessageProperties uses PERSISTENT delivery mode.

As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications. For that purpose, use channel adapters instead.
Starting with version 5.0, the pollable channel now blocks the poller thread for the specified receiveTimeout (the default is 1 second). Previously, unlike other PollableChannel implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout. Blocking is a little more expensive than using a basicGet() to retrieve a message (with no timeout), because a consumer has to be created to receive each message. To restore the previous behavior, set the poller’s receiveTimeout to 0.

Configuring with Java Configuration

The following example shows how to configure the channels with Java configuration:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

Configuring with the Java DSL

The following example shows how to configure the channels with the Java DSL:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}

AMQP Message Headers

Overview

The Spring Integration AMQP Adapters automatically map all AMQP properties and headers. (This is a change from 4.3 - previously, only standard headers were mapped). By default, these properties are copied to and from Spring Integration MessageHeaders by using the DefaultAmqpHeaderMapper.

You can pass in your own implementation of AMQP-specific header mappers, as the adapters have properties to support doing so.

Any user-defined headers within the AMQP MessageProperties are copied to or from an AMQP message, unless explicitly negated by the requestHeaderNames or replyHeaderNames properties of the DefaultAmqpHeaderMapper. By default, for an outbound mapper, no x-* headers are mapped. See the caution that appears later in this section for why.

To override the default and revert to the pre-4.3 behavior, use STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS in the properties.

When mapping user-defined headers, the values can also contain simple wildcard patterns (such as thing* or *thing) to be matched. The * matches all headers.

Starting with version 4.1, the AbstractHeaderMapper (a DefaultAmqpHeaderMapper superclass) lets the NON_STANDARD_HEADERS token be configured for the requestHeaderNames and replyHeaderNames properties (in addition to the existing STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS) to map all user-defined headers.

The org.springframework.amqp.support.AmqpHeaders class identifies the default headers that are used by the DefaultAmqpHeaderMapper:

  • amqp_appId

  • amqp_clusterId

  • amqp_contentEncoding

  • amqp_contentLength

  • content-type (see The contentType Header)

  • amqp_correlationId

  • amqp_delay

  • amqp_deliveryMode

  • amqp_deliveryTag

  • amqp_expiration

  • amqp_messageCount

  • amqp_messageId

  • amqp_receivedDelay

  • amqp_receivedDeliveryMode

  • amqp_receivedExchange

  • amqp_receivedRoutingKey

  • amqp_redelivered

  • amqp_replyTo

  • amqp_timestamp

  • amqp_type

  • amqp_userId

  • amqp_publishConfirm

  • amqp_publishConfirmNackCause

  • amqp_returnReplyCode

  • amqp_returnReplyText

  • amqp_returnExchange

  • amqp_returnRoutingKey

  • amqp_channel

  • amqp_consumerTag

  • amqp_consumerQueue

As mentioned earlier in this section, using a header mapping pattern of * is a common way to copy all headers. However, this can have some unexpected side effects, because certain RabbitMQ proprietary properties/headers are also copied. For example, when you use federation, the received message may have a property named x-received-from, which contains the node that sent the message. If you use the wildcard character * for the request and reply header mapping on the inbound gateway, this header is copied, which may cause some issues with federation. This reply message may be federated back to the sending broker, which may think that a message is looping and, as a result, silently drop it. If you wish to use the convenience of wildcard header mapping, you may need to filter out some headers in the downstream flow. For example, to avoid copying the x-received-from header back to the reply you can use <int:header-filter …​ header-names="x-received-from"> before sending the reply to the AMQP inbound gateway. Alternatively, you can explicitly list those properties that you actually want mapped, instead of using wildcards. For these reasons, for inbound messages, the mapper (by default) does not map any x-* headers. It also does not map the deliveryMode to the amqp_deliveryMode header, to avoid propagation of that header from an inbound message to an outbound message. Instead, this header is mapped to amqp_receivedDeliveryMode, which is not mapped on output.

Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !. Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1 does not map thing1 (nor thing2 nor thing3). The standard headers plus bad and qux are mapped. The negation technique can be useful for example to not map JSON type headers for incoming messages when a JSON deserialization logic is done in the receiver downstream different way. For this purpose a !json_* pattern should be configured for header mapper of the inbound channel adapter/gateway.

If you have a user-defined header that begins with ! that you do wish to map, you need to escape it with \, as follows: STANDARD_REQUEST_HEADERS,\!myBangHeader. The header named !myBangHeader is now mapped.
Starting with version 5.1, the DefaultAmqpHeaderMapper will fall back to mapping MessageHeaders.ID and MessageHeaders.TIMESTAMP to MessageProperties.messageId and MessageProperties.timestamp respectively, if the corresponding amqp_messageId or amqp_timestamp headers are not present on outbound messages. Inbound properties will be mapped to the amqp_* headers as before. It is useful to populate the messageId property when message consumers are using stateful retry.

The contentType Header

Unlike other headers, the AmqpHeaders.CONTENT_TYPE is not prefixed with amqp_; this allows transparent passing of the contentType header across different technologies. For example an inbound HTTP message sent to a RabbitMQ queue.

The contentType header is mapped to Spring AMQP’s MessageProperties.contentType property and that is subsequently mapped to RabbitMQ’s content_type property.

Prior to version 5.1, this header was also mapped as an entry in the MessageProperties.headers map; this was incorrect and, furthermore, the value could be wrong since the underlying Spring AMQP message converter might have changed the content type. Such a change would be reflected in the first-class content_type property, but not in the RabbitMQ headers map. Inbound mapping ignored the headers map value. contentType is no longer mapped to an entry in the headers map.

Strict Message Ordering

This section describes message ordering for inbound and outbound messages.

Inbound

If you require strict ordering of inbound messages, you must configure the inbound listener container’s prefetchCount property to 1. This is because, if a message fails and is redelivered, it arrives after existing prefetched messages. Since Spring AMQP version 2.0, the prefetchCount defaults to 250 for improved performance. Strict ordering requirements come at the cost of decreased performance.

Outbound

Consider the following integration flow:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .split(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

Suppose we send messages A, B and C to the gateway. While it is likely that messages A, B, C are sent in order, there is no guarantee. This is because the template “borrows” a channel from the cache for each send operation, and there is no guarantee that the same channel is used for each message. One solution is to start a transaction before the splitter, but transactions are expensive in RabbitMQ and can reduce performance several hundred-fold.

To solve this problem in a more efficient manner, starting with version 5.1, Spring Integration provides the BoundRabbitChannelAdvice which is a HandleMessageAdvice. See Handling Message Advice. When applied before the splitter, it ensures that all downstream operations are performed on the same channel and, optionally, can wait until publisher confirmations for all sent messages are received (if the connection factory is configured for confirmations). The following example shows how to use BoundRabbitChannelAdvice:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .split(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

Notice that the same RabbitTemplate (which implements RabbitOperations) is used in the advice and the outbound adapter. The advice runs the downstream flow within the template’s invoke method so that all operations run on the same channel. If the optional timeout is provided, when the flow completes, the advice calls the waitForConfirmsOrDie method, which throws an exception if the confirmations are not received within the specified time.

There must be no thread handoffs in the downstream flow (QueueChannel, ExecutorChannel, and others).

AMQP Samples

To experiment with the AMQP adapters, check out the samples available in the Spring Integration samples git repository at https://github.com/SpringSource/spring-integration-samples

Currently, one sample demonstrates the basic functionality of the Spring Integration AMQP adapter by using an outbound channel adapter and an inbound channel adapter. As AMQP broker implementation in the sample uses RabbitMQ.

In order to run the example, you need a running instance of RabbitMQ. A local installation with just the basic defaults suffices. For detailed RabbitMQ installation procedures, see https://www.rabbitmq.com/install.html

Once the sample application is started, enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue. In return, that message is retrieved by Spring Integration and printed to the console.

The following image illustrates the basic set of Spring Integration components used in this sample.

RabbitMQ Stream Queue Support

The Spring Integration graph of the AMQP sample image::images/spring-integration-amqp-sample-graph.png[]

Version 6.0 introduced support for RabbitMQ Stream Queues.

The DSL factory class for these endpoints is Rabbit.

RabbitMQ Stream Inbound Channel Adapter

@Bean
IntegrationFlow flow(Environment env) {
    @Bean
	IntegrationFlow simpleStream(Environment env) {
		return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
						.configureContainer(container -> container.queueName("my.stream")))
				// ...
				.get();
	}

	@Bean
	IntegrationFlow superStream(Environment env) {
		return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
						.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
				// ...
				.get();
	}
}

RabbitMQ Stream Outbound Channel Adapter

@Bean
IntegrationFlow outbound(RabbitStreamTemplate template) {
    return f -> f
            // ...
            .handle(RabbitStream.outboundStreamAdapter(template));

}