11. AMQP Support

11.1 Introduction

Spring Integration provides Channel Adapters for receiving and sending messages using the Advanced Message Queuing Protocol (AMQP). The following adapters are available: Inbound Channel Adapter, Outbound Channel Adapter, Inbound Gateway, and Outbound Gateway.

Spring Integration also provides a point-to-point Message Channel as well as a publish/subscribe Message Channel backed by AMQP Exchanges and Queues.

In order to provide AMQP support, Spring Integration relies on Spring AMQP, which "applies core Spring concepts to the development of AMQP-based messaging solutions". Spring AMQP provides similar semantics to Spring JMS.

Whereas the provided AMQP Channel Adapters are intended for unidirectional Messaging (send or receive) only, Spring Integration also provides inbound and outbound AMQP Gateways for request/reply operations.

[Tip]Tip

Please familiarize yourself with the reference documentation of the Spring AMQP project as well. It provides much more in-depth information regarding Spring’s integration with AMQP in general and RabbitMQ in particular.

11.2 Inbound Channel Adapter

A configuration sample for an AMQP Inbound Channel Adapter is shown below.

<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp" 1
                                  channel="inboundChannel" 2
                                  queue-names="si.test.queue" 3
                                  acknowledge-mode="AUTO" 4
                                  advice-chain="" 5
                                  channel-transacted="" 6
                                  concurrent-consumers="" 7
                                  connection-factory="" 8
                                  error-channel="" 9
                                  expose-listener-channel="" 10
                                  header-mapper="" 11
                                  mapped-request-headers="" 12
                                  listener-container="" 13
                                  message-converter="" 14
                                  message-properties-converter="" 15
                                  phase="" (16)
                                  prefetch-count="" (17)
                                  receive-timeout="" (18)
                                  recovery-interval="" (19)
                                  missing-queues-fatal="" (20)
                                  shutdown-timeout="" (21)
                                  task-executor="" (22)
                                  transaction-attribute="" (23)
                                  transaction-manager="" (24)
                                  tx-size="" /> (25)

1

Unique ID for this adapter. Optional.

2

Message Channel to which converted Messages should be sent. Required.

3

Names of the AMQP Queues from which Messages should be consumed (comma-separated list). Required.

4

Acknowledge Mode for the MessageListenerContainer. When set to MANUAL, the delivery tag and channel are provided in message headers amqp_deliveryTag and amqp_channel respectively; the user application is responsible for acknowledgement. NONE means no acknowledgements (autoAck); AUTO means the adapter’s container will acknowledge when the downstream flow completes. Optional (Defaults to AUTO). See Section 11.4, “Inbound Endpoint Acknowledge Mode”.

5

Extra AOP Advice(s) to handle cross cutting behavior associated with this Inbound Channel Adapter. Optional.

6

Flag to indicate that channels created by this component will be transactional. If true, tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback depending on the outcome, with an exception signalling a rollback. Optional (Defaults to false).

7

Specify the number of concurrent consumers to create. Default is 1. Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. Optional.

8

Bean reference to the RabbitMQ ConnectionFactory. Optional (Defaults to connectionFactory).

9

Message Channel to which error Messages should be sent. Optional.

10

Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered ChannelAwareMessageListener. Optional (Defaults to true).

11

A reference to an AmqpHeaderMapper to use when receiving AMQP Messages. Optional. By default only standard AMQP properties (e.g. contentType) will be copied to Spring Integration MessageHeaders. Any user-defined headers within the AMQP MessageProperties will NOT be copied to the Message by the DefaultAmqpHeaderMapper. Not allowed if mapped-request-headers is provided.

12

Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders. This can only be provided if the header-mapper reference is not provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "foo*, bar" or "*foo").

13

Reference to the SimpleMessageListenerContainer to use for receiving AMQP Messages. If this attribute is provided, then no other attribute related to the listener container configuration should be provided. In other words, by setting this reference, you take full responsibility for the listener container configuration. The only exception is the MessageListener itself. Since that is actually the core responsibility of this Channel Adapter implementation, the referenced listener container must NOT already have its own MessageListener configured. Optional.

14

The MessageConverter to use when receiving AMQP Messages. Optional.

15

The MessagePropertiesConverter to use when receiving AMQP Messages. Optional.

(16)

Specify the phase in which the underlying SimpleMessageListenerContainer should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this container starts as late as possible and stops as soon as possible. Optional.

(17)

Tells the AMQP broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. It should be greater than or equal to the transaction size (see attribute "tx-size"). Optional (Defaults to 1).

(18)

Receive timeout in milliseconds. Optional (Defaults to 1000).

(19)

Specifies the interval between recovery attempts of the underlying SimpleMessageListenerContainer (in milliseconds). Optional (Defaults to 5000).

(20)

If true, and none of the queues are available on the broker, the container will throw a fatal exception during startup and will stop if the queues are deleted when the container is running (after making 3 attempts to passively declare the queues). If false, the container will not throw an exception and go into recovery mode, attempting to restart according to the recovery-interval. Optional (Defaults to true).

(21)

The time to wait for workers in milliseconds after the underlying SimpleMessageListenerContainer is stopped, and before the AMQP connection is forced closed. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Otherwise the connection is closed and messages remain unacked (if the channel is transactional). Optional (Defaults to 5000).

(22)

By default, the underlying SimpleMessageListenerContainer uses a SimpleAsyncTaskExecutor implementation, that fires up a new Thread for each task, executing it asynchronously. By default, the number of concurrent threads is unlimited. NOTE: This implementation does not reuse threads. Consider a thread-pooling TaskExecutor implementation as an alternative. Optional (Defaults to SimpleAsyncTaskExecutor).

(23)

By default the underlying SimpleMessageListenerContainer creates a new instance of the DefaultTransactionAttribute (which takes the EJB approach of rolling back on runtime, but not checked, exceptions). Optional (Defaults to DefaultTransactionAttribute).

(24)

Sets a Bean reference to an external PlatformTransactionManager on the underlying SimpleMessageListenerContainer. The transaction manager works in conjunction with the "channel-transacted" attribute. If there is already a transaction in progress when the framework is sending or receiving a message, and the channelTransacted flag is true, then the commit or rollback of the messaging transaction will be deferred until the end of the current transaction. If the channelTransacted flag is false, then no transaction semantics apply to the messaging operation (it is auto-acked). For further information see Transactions with Spring AMQP. Optional.

(25)

Tells the SimpleMessageListenerContainer how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the set "prefetch-count". Optional (Defaults to 1).
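As a rough illustration of the simple patterns accepted by mapped-request-headers (callout 12), the following framework-free sketch treats * as a wildcard for any run of characters and accepts a header when any entry in the comma-separated list matches. The class HeaderPatternSketch and its methods are hypothetical names invented for this example; the sketch is an assumption about the matching behavior, not the header mapper's actual implementation.

```java
import java.util.regex.Pattern;

// Hypothetical, framework-free model of simple header-name patterns.
final class HeaderPatternSketch {

    // True if headerName matches one simple pattern, where '*' stands for any run of characters.
    public static boolean matches(String pattern, String headerName) {
        String[] literals = pattern.split("\\*", -1); // keep empty parts so leading/trailing '*' count
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*"); // each '*' in the pattern becomes "any characters"
            }
            regex.append(Pattern.quote(literals[i])); // everything else is matched literally
        }
        return Pattern.matches(regex.toString(), headerName);
    }

    // True if headerName matches any pattern in a comma-separated list such as "foo*, bar".
    public static boolean isMapped(String mappedHeaders, String headerName) {
        for (String pattern : mappedHeaders.split(",")) {
            if (matches(pattern.trim(), headerName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isMapped("foo*, bar", "fooBar"));
        System.out.println(isMapped("foo*, bar", "baz"));
    }
}
```

Under these assumptions, "foo*, bar" would map a header named fooBar or bar, but not one named baz.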

[Note]container

Note that when configuring an external container, you cannot use the Spring AMQP namespace to define the container. This is because the namespace requires at least one <listener/> element. In this environment, the listener is internal to the adapter. For this reason, you must define the container using a normal Spring <bean/> definition, such as:

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="foo.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>

[Important]Important

Even though the Spring Integration JMS and AMQP support is very similar, important differences exist. The JMS Inbound Channel Adapter uses a JmsDestinationPollingSource under the covers and expects a configured Poller. The AMQP Inbound Channel Adapter, on the other hand, uses a SimpleMessageListenerContainer and is message driven. In that regard it is more similar to the JMS Message Driven Channel Adapter.
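The task-executor attribute described earlier (callout 22) suggests a thread-pooling executor in place of the default, which starts a new thread for each task. The JDK-only sketch below, using hypothetical names, shows only the thread reuse that a fixed-size pool provides; how such an executor is wired into the listener container is not shown and depends on your configuration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration of thread reuse in a fixed-size pool.
final class PooledExecutorSketch {

    // Runs the given number of tasks on a pool and reports how many distinct threads executed them.
    public static int distinctThreadsUsed(int poolSize, int tasks) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run to completion
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // Twenty tasks never use more threads than the pool size of two.
        System.out.println(distinctThreadsUsed(2, 20));
    }
}
```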

11.2.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:

@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
            @Qualifier("amqpInputChannel") MessageChannel channel) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(channel);
        return adapter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                                   new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        container.setConcurrentConsumers(2);
        // ...
        return container;
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

11.2.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the inbound adapter using the Java DSL:

@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "foo"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

11.3 Inbound Gateway

The inbound gateway supports all the attributes on the inbound channel adapter (except channel is replaced by request-channel), plus some additional attributes:

<int-amqp:inbound-gateway
                          id="inboundGateway" 1
                          request-channel="myRequestChannel" 2
                          header-mapper="" 3
                          mapped-request-headers="" 4
                          mapped-reply-headers="" 5
                          reply-channel="myReplyChannel" 6
                          reply-timeout="1000"  7
                          amqp-template="" 8
                          default-reply-to="" /> 9

1

Unique ID for this gateway. Optional.

2

Message Channel to which converted Messages should be sent. Required.

3

A reference to an AmqpHeaderMapper to use when receiving AMQP Messages. Optional. By default only standard AMQP properties (e.g. contentType) will be copied to and from Spring Integration MessageHeaders. Any user-defined headers within the AMQP MessageProperties will NOT be copied to or from an AMQP Message by the DefaultAmqpHeaderMapper. Not allowed if mapped-request-headers or mapped-reply-headers is provided.

4

Comma-separated list of names of AMQP Headers to be mapped from the AMQP request into the MessageHeaders. This can only be provided if the header-mapper reference is not provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "foo*, bar" or "*foo").

5

Comma-separated list of names of MessageHeaders to be mapped into the AMQP Message Properties of the AMQP reply message. All standard Headers (e.g., contentType) will be mapped to AMQP Message Properties while user-defined headers will be mapped to the headers property. This can only be provided if the header-mapper reference is not provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "foo*, bar" or "*foo").

6

Message Channel where reply Messages will be expected. Optional.

7

Used to set the receiveTimeout on the underlying org.springframework.integration.core.MessagingTemplate for receiving messages from the reply channel. If not specified this property will default to "1000" (1 second). Only applies if the container thread hands off to another thread before the reply is sent.

8

A reference to a customized AmqpTemplate bean, which gives more control over the reply messages that are sent. An alternative implementation to the RabbitTemplate can also be provided here.

9

The replyTo org.springframework.amqp.core.Address to be used when the requestMessage doesn’t have replyTo property. If this option isn’t specified, no amqp-template is provided, and no replyTo property exists in the request message, an IllegalStateException is thrown because the reply can’t be routed. If this option isn’t specified, and an external amqp-template is provided, no exception will be thrown. You must either specify this option, or configure a default exchange and routingKey on that template, if you anticipate cases when no replyTo property exists in the request message.

See the note in Section 11.2, “Inbound Channel Adapter” about configuring the listener-container attribute.
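The rules for default-reply-to (callout 9) amount to a short decision chain, sketched below in plain Java. ReplyToSketch and resolveReplyTo are hypothetical names, and addresses are modeled as plain strings rather than org.springframework.amqp.core.Address; this is a reading of the description above, not the gateway's actual code.

```java
import java.util.Optional;

// Hypothetical model of how the reply destination is chosen.
final class ReplyToSketch {

    /**
     * Returns the address the reply would be sent to, or an empty Optional when
     * routing is left to the default exchange and routingKey of an external template.
     */
    public static Optional<String> resolveReplyTo(String requestReplyTo,
                                                  String defaultReplyTo,
                                                  boolean externalTemplateProvided) {
        if (requestReplyTo != null) {
            return Optional.of(requestReplyTo);   // the request's own replyTo property is used
        }
        if (defaultReplyTo != null) {
            return Optional.of(defaultReplyTo);   // fall back to default-reply-to
        }
        if (externalTemplateProvided) {
            return Optional.empty();              // no exception; the template's defaults must route the reply
        }
        throw new IllegalStateException("No replyTo, no default-reply-to and no amqp-template: reply cannot be routed");
    }

    public static void main(String[] args) {
        System.out.println(resolveReplyTo(null, "bar", false));
    }
}
```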

11.3.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound gateway using Java configuration:

@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
            @Qualifier("amqpInputChannel") MessageChannel channel) {
        AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
        gateway.setRequestChannel(channel);
        gateway.setDefaultReplyTo("bar");
        return gateway;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                        new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        container.setConcurrentConsumers(2);
        // ...
        return container;
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpInputChannel")
    public MessageHandler handler() {
        return new AbstractReplyProducingMessageHandler() {

            @Override
            protected Object handleRequestMessage(Message<?> requestMessage) {
                return "reply to " + requestMessage.getPayload();
            }

        };
    }

}

11.3.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the inbound gateway using the Java DSL:

@SpringBootApplication
public class AmqpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(AmqpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean // return the upper cased payload
    public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))
                .transform(String.class, String::toUpperCase)
                .get();
    }

}

11.4 Inbound Endpoint Acknowledge Mode

By default the inbound endpoints use acknowledge mode AUTO, which means the container automatically acks the message when the downstream integration flow completes (or a message is handed off to another thread using a QueueChannel or ExecutorChannel). Setting the mode to NONE configures the consumer such that acks are not used at all (the broker automatically acks the message as soon as it is sent). Setting the mode to MANUAL allows user code to ack the message at some other point during processing. To support this, with this mode, the endpoints provide the Channel and deliveryTag in the amqp_channel and amqp_deliveryTag headers respectively.

You can perform any valid rabbit command on the Channel but, generally, only basicAck and basicNack (or basicReject) would be used. In order to not interfere with the operation of the container, you should not retain a reference to the channel and just use it in the context of the current message.

[Note]Note

Since the Channel is a reference to a "live" object, it cannot be serialized and will be lost if a message is persisted.

This is an example of how you might use MANUAL acknowledgement:

@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing

    }
    else {
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}

11.5 Outbound Channel Adapter

A configuration sample for an AMQP Outbound Channel Adapter is shown below.

<int-amqp:outbound-channel-adapter id="outboundAmqp" 1
                               channel="outboundChannel" 2
                               amqp-template="myAmqpTemplate" 3
                               exchange-name="" 4
                               exchange-name-expression="" 5
                               order="1" 6
                               routing-key="" 7
                               routing-key-expression="" 8
                               default-delivery-mode="" 9
                               confirm-correlation-expression="" 10
                               confirm-ack-channel="" 11
                               confirm-nack-channel="" 12
                               return-channel="" 13
                               header-mapper="" 14
                               mapped-request-headers="" 15
                               lazy-connect="true" /> (16)

1

Unique ID for this adapter. Optional.

2

Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required.

3

Bean Reference to the configured AMQP Template. Optional (Defaults to "amqpTemplate").

4

The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name-expression. Optional.

5

A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name. Optional.

6

The order for this consumer when multiple consumers are registered thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).

7

The fixed routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with routing-key-expression. Optional.

8

A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key). By default, this will be an empty String. Mutually exclusive with routing-key. Optional.

9

The default delivery mode for messages; PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. The DefaultAmqpHeaderMapper sets the value if the Spring Integration message header amqp_deliveryMode is present. If this attribute is not supplied and the header mapper doesn’t set it, the default depends on the underlying spring-amqp MessagePropertiesConverter used by the RabbitTemplate. If that is not customized at all, the default is PERSISTENT. Optional.

10

An expression defining correlation data. When provided, this configures the underlying amqp template to receive publisher confirms. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true. When a publisher confirm is received, and correlation data is supplied, it is written to either the confirm-ack-channel, or the confirm-nack-channel, depending on the confirmation type. The payload of the confirm is the correlation data as defined by this expression and the message will have a header amqp_publishConfirm set to true (ack) or false (nack). Examples: "headers['myCorrelationData']", "payload". Starting with version 4.1 the amqp_publishConfirmNackCause message header has been added. It contains the cause of a nack for publisher confirms. Starting with version 4.2, if the expression resolves to a Message<?> instance (such as "#this"), the message emitted on the ack/nack channel is based on that message, with the additional header(s) added. Previously, a new message was created with the correlation data as its payload, regardless of type. Optional.

11

The channel to which positive (ack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Optional, default=nullChannel.

12

The channel to which negative (nack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Optional, default=nullChannel.

13

The channel to which returned messages are sent. When provided, the underlying amqp template is configured to return undeliverable messages to the adapter. The message will be constructed from the data received from amqp, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey. Optional.

14

A reference to an AmqpHeaderMapper to use when sending AMQP Messages. By default only standard AMQP properties (e.g. contentType) will be copied from the Spring Integration MessageHeaders to the AMQP Message. Any user-defined headers will NOT be copied by the DefaultAmqpHeaderMapper. Not allowed if mapped-request-headers is provided. Optional.

15

Comma-separated list of names of AMQP Headers to be mapped from the MessageHeaders to the AMQP Message. Not allowed if the header-mapper reference is provided. The values in this list can also be simple patterns to be matched against the header names (e.g. "*" or "foo*, bar" or "*foo").

(16)

When set to false, the endpoint will attempt to connect to the broker during application context initialization. This allows "fail fast" detection of bad configuration, but will also cause initialization to fail if the broker is down. When true (default), the connection is established (if it doesn’t already exist because some other component established it) when the first message is sent.
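The precedence described for default-delivery-mode (callout 9) can be summarized in a few lines of plain Java. The class, enum and method below are hypothetical stand-ins written for this example, and the final fallback assumes an uncustomized MessagePropertiesConverter; treat it as a sketch of the rules as described, not the adapter's implementation.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical model of how the effective delivery mode is chosen.
final class DeliveryModeSketch {

    public enum DeliveryMode { PERSISTENT, NON_PERSISTENT }

    public static DeliveryMode effectiveMode(Map<String, ?> headers, DeliveryMode attributeDefault) {
        Object fromHeader = headers.get("amqp_deliveryMode");
        if (fromHeader instanceof DeliveryMode) {
            return (DeliveryMode) fromHeader;     // a value mapped from the message header wins
        }
        if (attributeDefault != null) {
            return attributeDefault;              // otherwise the default-delivery-mode attribute applies
        }
        return DeliveryMode.PERSISTENT;           // assumed default of an uncustomized converter
    }

    public static void main(String[] args) {
        Map<String, Object> noHeaders = Collections.emptyMap();
        System.out.println(effectiveMode(noHeaders, DeliveryMode.NON_PERSISTENT));
    }
}
```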

[Important]return-channel

Using a return-channel requires a RabbitTemplate with the mandatory property set to true, and a CachingConnectionFactory with the publisherReturns property set to true. When using multiple outbound endpoints with returns, a separate RabbitTemplate is needed for each endpoint.

11.5.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the outbound adapter using Java configuration:

@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
         ConfigurableApplicationContext context =
              new SpringApplicationBuilder(AmqpJavaApplication.class)
                       .web(false)
                       .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         gateway.sendToRabbit("foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        void sendToRabbit(String data);

    }

}

11.5.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the outbound adapter using the Java DSL:

@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
         ConfigurableApplicationContext context =
                  new SpringApplicationBuilder(AmqpJavaApplication.class)
                          .web(false)
                          .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         gateway.sendToRabbit("foo");
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(amqpOutboundChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate)
                            .routingKey("foo")) // default exchange - route to queue 'foo'
                .get();
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        void sendToRabbit(String data);

    }
}

11.6 Outbound Gateway

Configuration for an AMQP Outbound Gateway is shown below.

<int-amqp:outbound-gateway id="outboundGateway" 1
                           request-channel="myRequestChannel" 2
                           amqp-template="" 3
                           exchange-name="" 4
                           exchange-name-expression="" 5
                           order="1" 6
                           reply-channel="" 7
                           reply-timeout="" 8
                           requires-reply="" 9
                           routing-key="" 10
                           routing-key-expression="" 11
                           default-delivery-mode="" 12
                           confirm-correlation-expression="" 13
                           confirm-ack-channel="" 14
                           confirm-nack-channel="" 15
                           return-channel="" (16)
                           lazy-connect="true" /> (17)

1

Unique ID for this gateway. Optional.

2

Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required.

3

Bean Reference to the configured AMQP Template. Optional (Defaults to "amqpTemplate").

4

The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name-expression. Optional.

5

A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name. Optional.

6

The order for this consumer when multiple consumers are registered thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).

7

Message Channel to which replies should be sent after being received from an AMQP Queue and converted. Optional.

8

The time the gateway will wait when sending the reply message to the reply-channel. This only applies if the reply-channel can block - such as a QueueChannel with a capacity limit that is currently full. Default: infinity.

9

When true, the gateway will throw an exception if no reply message is received within the AmqpTemplate's replyTimeout property. Default: true.

10

The routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with routing-key-expression. Optional.

11

A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key). By default, this will be an empty String. Mutually exclusive with routing-key. Optional.

12

The default delivery mode for messages; PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. The DefaultAmqpHeaderMapper sets the value if the Spring Integration message header amqp_deliveryMode is present. If this attribute is not supplied and the header mapper doesn’t set it, the default depends on the underlying spring-amqp MessagePropertiesConverter used by the RabbitTemplate. If that is not customized at all, the default is PERSISTENT. Optional.

13

Since version 4.2. An expression defining correlation data. When provided, this configures the underlying amqp template to receive publisher confirms. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true. When a publisher confirm is received, and correlation data is supplied, it is written to either the confirm-ack-channel, or the confirm-nack-channel, depending on the confirmation type. The payload of the confirm is the correlation data as defined by this expression and the message will have a header amqp_publishConfirm set to true (ack) or false (nack). For nacks, an additional header amqp_publishConfirmNackCause is provided. Examples: "headers['myCorrelationData']", "payload". If the expression resolves to a Message<?> instance (such as "#this"), the message emitted on the ack/nack channel is based on that message, with the additional header(s) added. Previously, a new message was created with the correlation data as its payload, regardless of type. Optional.

14

Since version 4.2. The channel to which positive (ack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Optional, default=nullChannel.

15

Since version 4.2. The channel to which negative (nack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Optional, default=nullChannel.

(16)

The channel to which returned messages are sent. When provided, the underlying amqp template is configured to return undeliverable messages to the gateway. The message will be constructed from the data received from amqp, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey. Optional.

(17)

When set to false, the endpoint will attempt to connect to the broker during application context initialization. This allows "fail fast" detection of bad configuration, by logging an error message if the broker is down. When true (default), the connection is established (if it doesn’t already exist because some other component established it) when the first message is sent.
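As a rough Java-configuration counterpart to the confirm and return attributes described above, the following sketch wires a correlation expression plus the ack, nack, and return channels onto an AmqpOutboundEndpoint. The class name, the channel bean names (confirmAcks, confirmNacks, returns), and the dedicatedTemplate bean are made up for illustration, and the setters are assumed to mirror the XML attributes; verify them against the version you are using.

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;

@Configuration
public class ConfirmingGatewayConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AmqpOutboundEndpoint confirmingOutbound(RabbitTemplate dedicatedTemplate) {
        // 'dedicatedTemplate' is a hypothetical RabbitTemplate used by this endpoint only
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(dedicatedTemplate);
        outbound.setExpectReply(true);
        outbound.setRoutingKey("foo");
        // Correlation data: here, simply the payload of the outbound message
        outbound.setConfirmCorrelationExpression(
                new SpelExpressionParser().parseExpression("payload"));
        outbound.setConfirmAckChannel(confirmAcks());
        outbound.setConfirmNackChannel(confirmNacks());
        outbound.setReturnChannel(returns());
        return outbound;
    }

    @Bean
    public QueueChannel confirmAcks() {
        return new QueueChannel(); // receives messages with amqp_publishConfirm = true
    }

    @Bean
    public QueueChannel confirmNacks() {
        return new QueueChannel(); // receives messages with amqp_publishConfirm = false
    }

    @Bean
    public QueueChannel returns() {
        return new QueueChannel(); // receives returned (undeliverable) messages
    }

}
```

Queue channels are used here only so that the confirms and returns can be polled and inspected; any MessageChannel would do.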

[Important]return-channel

Using a return-channel requires a RabbitTemplate with the mandatory property set to true, and a CachingConnectionFactory with the publisherReturns property set to true. When using multiple outbound endpoints with returns, a separate RabbitTemplate is needed for each endpoint.

[Important]Important

The underlying AmqpTemplate has a default replyTimeout of 5 seconds. If you require a longer timeout, it must be configured on the template.
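The template and connection factory requirements in the two notes above can be sketched in Java configuration as follows. This is an illustration under assumptions: the broker host, the bean names, and the 10 second timeout are arbitrary, and setPublisherConfirms reflects the Spring AMQP API of the versions this chapter targets (newer releases may offer a different property for the same purpose).

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GatewayTemplateConfig {

    @Bean
    public CachingConnectionFactory gatewayConnectionFactory() {
        // "localhost" is an assumed broker host
        CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
        ccf.setPublisherConfirms(true); // required when a confirm correlation expression is used
        ccf.setPublisherReturns(true);  // required when a return-channel is used
        return ccf;
    }

    @Bean
    public RabbitTemplate gatewayTemplate(CachingConnectionFactory gatewayConnectionFactory) {
        RabbitTemplate template = new RabbitTemplate(gatewayConnectionFactory);
        template.setMandatory(true);     // required when a return-channel is used
        template.setReplyTimeout(10000); // milliseconds; replaces the 5 second default
        return template;
    }

}
```

When several outbound endpoints use returns, each would get its own template bean along these lines.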

11.6.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the outbound gateway using Java configuration:

@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
         ConfigurableApplicationContext context =
                new SpringApplicationBuilder(AmqpJavaApplication.class)
                       .web(false)
                       .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         String reply = gateway.sendToRabbit("foo");
         System.out.println(reply);
    }

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
        outbound.setExpectReply(true);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}

Notice that the only difference between the outbound adapter and outbound gateway configuration is the setting of the expectReply property.

11.6.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the outbound gateway using the Java DSL:

@SpringBootApplication
@IntegrationComponentScan
public class AmqpJavaApplication {

    public static void main(String[] args) {
         ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(AmqpJavaApplication.class)
                      .web(false)
                      .run(args);
         RabbitTemplate template = context.getBean(RabbitTemplate.class);
         MyGateway gateway = context.getBean(MyGateway.class);
         String reply = gateway.sendToRabbit("foo");
         System.out.println(reply);
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(amqpOutboundChannel())
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .routingKey("foo")) // default exchange - route to queue 'foo'
                .get();
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {

        String sendToRabbit(String data);

    }
}

11.7 Async Outbound Gateway

The gateway discussed in the previous section is synchronous, in that the sending thread is suspended until a reply is received (or a timeout occurs). Spring Integration version 4.3 added this asynchronous gateway, which uses the AsyncRabbitTemplate from Spring AMQP. When a message is sent, the thread returns immediately after the send completes, and the reply is sent on the template’s listener container thread when it is received. This can be useful when the gateway is invoked on a poller thread; the thread is released and is available for other tasks in the framework.

Configuration for an AMQP Async Outbound Gateway is shown below.

<int-amqp:outbound-async-gateway id="asyncOutboundGateway" 1
                           request-channel="myRequestChannel" 2
                           async-template="" 3
                           exchange-name="" 4
                           exchange-name-expression="" 5
                           order="1" 6
                           reply-channel="" 7
                           reply-timeout="" 8
                           requires-reply="" 9
                           routing-key="" 10
                           routing-key-expression="" 11
                           default-delivery-mode="" 12
                           confirm-correlation-expression="" 13
                           confirm-ack-channel="" 14
                           confirm-nack-channel="" 15
                           return-channel="" (16)
                           lazy-connect="true" /> (17)

1

Unique ID for this adapter. Optional.

2

Message Channel to which Messages should be sent in order to have them converted and published to an AMQP Exchange. Required.

3

Bean Reference to the configured AsyncRabbitTemplate. Optional (Defaults to "asyncRabbitTemplate").

4

The name of the AMQP Exchange to which Messages should be sent. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name-expression. Optional.

5

A SpEL expression that is evaluated to determine the name of the AMQP Exchange to which Messages should be sent, with the message as the root object. If not provided, Messages will be sent to the default, no-name Exchange. Mutually exclusive with exchange-name. Optional.

6

The order for this consumer when multiple consumers are registered thereby enabling load-balancing and/or failover. Optional (Defaults to Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE]).

7

Message Channel to which replies should be sent after being received from an AMQP Queue and converted. Optional.

8

The time the gateway will wait when sending the reply message to the reply-channel. This only applies if the reply-channel can block - such as a QueueChannel with a capacity limit that is currently full. Default: infinity.

9

When true, the gateway will send an error message to the inbound message’s errorChannel header, if present or otherwise to the default errorChannel (if available), when no reply message is received within the AsyncRabbitTemplate's receiveTimeout property. Default: true.

10

The routing-key to use when sending Messages. By default, this will be an empty String. Mutually exclusive with routing-key-expression. Optional.

11

A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key). By default, this will be an empty String. Mutually exclusive with routing-key. Optional.

12

The default delivery mode for messages; PERSISTENT or NON_PERSISTENT. Overridden if the header-mapper sets the delivery mode. The DefaultHeaderMapper sets the value if the Spring Integration message header amqp_deliveryMode is present. If this attribute is not supplied and the header mapper doesn’t set it, the default depends on the underlying spring-amqp MessagePropertiesConverter used by the RabbitTemplate. If that is not customized at all, the default is PERSISTENT. Optional.

13

An expression defining correlation data. When provided, this configures the underlying amqp template to receive publisher confirms. Requires a dedicated RabbitTemplate and a CachingConnectionFactory with the publisherConfirms property set to true. When a publisher confirm is received, and correlation data is supplied, it is written to either the confirm-ack-channel, or the confirm-nack-channel, depending on the confirmation type. The payload of the confirm is the correlation data as defined by this expression and the message will have a header amqp_publishConfirm set to true (ack) or false (nack). For nacks, an additional header amqp_publishConfirmNackCause is provided. Examples: "headers[myCorrelationData]", "payload". If the expression resolves to a Message<?> instance (such as "#this"), the message emitted on the ack/nack channel is based on that message, with the additional header(s) added. Optional.

14

The channel to which positive (ack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Optional, default=nullChannel.

15

The channel to which negative (nack) publisher confirms are sent; payload is the correlation data defined by the confirm-correlation-expression. Requires the underlying AsyncRabbitTemplate to have its enableConfirms property set to true. Optional, default=nullChannel.

(16)

The channel to which returned messages are sent. When provided, the underlying amqp template is configured to return undeliverable messages to the gateway. The message will be constructed from the data received from amqp, with the following additional headers: amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey. Requires the underlying AsyncRabbitTemplate to have its mandatory property set to true. Optional.

(17)

When set to false, the endpoint will attempt to connect to the broker during application context initialization. This allows "fail fast" detection of bad configuration, by logging an error message if the broker is down. When true (default), the connection is established (if it doesn’t already exist because some other component established it) when the first message is sent.

Also see Section 8.4.3, “Asynchronous Service Activator” for more information.

[Important]RabbitTemplate

When using confirms and returns, it is recommended that the RabbitTemplate wired into the AsyncRabbitTemplate be dedicated. Otherwise, unexpected side-effects may be encountered.
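A minimal sketch of an AsyncRabbitTemplate prepared for confirms and returns is shown below. It only illustrates the properties named in the attribute descriptions above (enableConfirms, mandatory, receiveTimeout); the class name, the bean names, the dedicatedTemplate bean, and the timeout value are assumptions.

```java
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmingAsyncTemplateConfig {

    @Bean
    public AsyncRabbitTemplate confirmingAsyncTemplate(RabbitTemplate dedicatedTemplate,
            SimpleMessageListenerContainer replyContainer) {
        // 'dedicatedTemplate' is a hypothetical RabbitTemplate that no other component uses
        AsyncRabbitTemplate template = new AsyncRabbitTemplate(dedicatedTemplate, replyContainer);
        template.setEnableConfirms(true);   // needed for confirm-ack-channel / confirm-nack-channel
        template.setMandatory(true);        // needed for return-channel
        template.setReceiveTimeout(10000);  // milliseconds to wait for a reply (assumed value)
        return template;
    }

}
```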

11.7.1 Configuring with Java Configuration

The following configuration provides an example of configuring the outbound gateway using Java configuration:

@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {
        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer(ConnectionFactory ccf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}

11.7.2 Configuring with the Java DSL

A Java DSL example for the async outbound gateway is not yet available:

// To be supplied when the DSL Amqp factory class adds support for the async gateway.

11.8 Outbound Message Conversion

Spring AMQP 1.4 introduced the ContentTypeDelegatingMessageConverter where the actual converter is selected based on the incoming content type message property. This could be used by inbound endpoints.

Spring Integration version 4.3 now allows the ContentTypeDelegatingMessageConverter to be used on outbound endpoints as well, with the contentType header specifying which converter will be used.

The following configures a ContentTypeDelegatingMessageConverter with the default converter being the SimpleMessageConverter (which handles java serialization and plain text), together with a JSON converter:

<int-amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                               exchange-name="someExchange"
                               routing-key="someKey"
                               amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>

Sending a message to ctRequestChannel with the contentType header set to application/json will cause the JSON converter to be selected.

This applies to both the outbound channel adapter and gateway.
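To illustrate the sending side, the following sketch builds a message whose contentType header selects the JSON delegate and sends it to ctRequestChannel. The ContentTypeSender class and its sendAsJson method are hypothetical helpers, not part of the framework; the channel is assumed to be injected from the configuration shown above.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

public class ContentTypeSender {

    private final MessageChannel ctRequestChannel;

    public ContentTypeSender(MessageChannel ctRequestChannel) {
        this.ctRequestChannel = ctRequestChannel;
    }

    /** Sends the payload with contentType=application/json so the JSON delegate is chosen. */
    public boolean sendAsJson(Object payload) {
        Message<Object> message = MessageBuilder.withPayload(payload)
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .build();
        return this.ctRequestChannel.send(message);
    }

}
```

A message sent without the header, or with a content type that has no delegate, would fall through to the default converter.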

11.9 AMQP Backed Message Channels

There are two Message Channel implementations available. One is point-to-point, and the other is publish/subscribe. Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer as you have seen on the Channel Adapters and Gateways. However, the examples we’ll show here are going to have minimal configuration. Explore the XML schema to view the available attributes.

A point-to-point channel would look like this:

<int-amqp:channel id="p2pChannel"/>

Under the covers a Queue named "si.p2pChannel" would be declared, and this channel will send to that Queue (technically by sending to the no-name Direct Exchange with a routing key that matches this Queue’s name). This channel will also register a consumer on that Queue. If you want the channel to be "pollable" instead of message-driven, then simply provide the "message-driven" flag with a value of false:

<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>

A publish/subscribe channel would look like this:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

Under the covers a Fanout Exchange named "si.fanout.pubSubChannel" would be declared, and this channel will send to that Fanout Exchange. This channel will also declare a server-named exclusive, auto-delete, non-durable Queue and bind that to the Fanout Exchange while registering a consumer on that Queue to receive Messages. There is no "pollable" option for a publish-subscribe-channel; it must be message-driven.

Starting with version 4.1, AMQP-backed message channels support template-channel-transacted alongside channel-transacted, so that transactional behavior can be configured separately for the AbstractMessageListenerContainer and for the RabbitTemplate. Note that channel-transacted was previously true by default; it now defaults to false, which is the standard default for the AbstractMessageListenerContainer.

Prior to version 4.3, AMQP-backed channels only supported messages with Serializable payloads and headers. The entire message was converted (serialized) and sent to RabbitMQ. Now, you can set the extract-payload attribute (or setExtractPayload() when using Java configuration) to true. When this flag is true, the message payload is converted and the headers mapped, in a similar manner to when using channel adapters. This allows AMQP-backed channels to be used with non-serializable payloads (perhaps with another message converter such as the Jackson2JsonMessageConverter). The default mapped headers are discussed in Section 11.10, “AMQP Message Headers”. You can modify the mapping by providing custom mappers using the outbound-header-mapper and inbound-header-mapper attributes. You can now also specify a default-delivery-mode, used to set the delivery mode when there is no amqp_deliveryMode header. By default, Spring AMQP MessageProperties uses PERSISTENT delivery mode.
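As an illustration of the options described above, the following sketch configures a message-driven point-to-point channel that extracts the payload, converts it with a JSON converter, and falls back to a non-persistent delivery mode. The class name, bean name, and queue name are hypothetical, and setMessageConverter and setDefaultDeliveryMode are assumed to be the Java counterparts of the corresponding XML attributes.

```java
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.config.AmqpChannelFactoryBean;

@Configuration
public class ExtractPayloadChannelConfig {

    @Bean
    public AmqpChannelFactoryBean jsonBackedChannel(ConnectionFactory connectionFactory) {
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); // message-driven
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("jsonBacked"); // hypothetical queue name
        factoryBean.setPubSub(false);
        // Convert only the payload and map the headers, instead of serializing the whole message
        factoryBean.setExtractPayload(true);
        factoryBean.setMessageConverter(new Jackson2JsonMessageConverter());
        // Used when the message carries no amqp_deliveryMode header
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        return factoryBean;
    }

}
```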

[Important]Important

Just as with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications; for that purpose, use channel adapters instead.

11.9.1 Configuring with Java Configuration

The following provides an example of configuring the channels using Java configuration:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(false); // false: pollable, not message-driven
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(true);
    return factoryBean;
}

11.9.2 Configuring with the Java DSL

The following provides an example of configuring the channels using the Java DSL:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}

11.10 AMQP Message Headers

The Spring Integration AMQP Adapters will map all AMQP properties and headers automatically. (This is a change in 4.3 - previously, only standard headers were mapped). These properties will be copied by default to and from Spring Integration MessageHeaders using the DefaultAmqpHeaderMapper.

Of course, you can pass in your own implementation of AMQP specific header mappers, as the adapters have respective properties to support that.

Any user-defined headers within the AMQP MessageProperties WILL be copied to or from an AMQP Message, unless explicitly negated by the requestHeaderNames and/or replyHeaderNames properties of the DefaultAmqpHeaderMapper. For an outbound mapper, no x-* headers are mapped by default; see the caution below for the reason why.

To override the default, and revert to the pre-4.3 behavior, use STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS in the properties.

[Tip]Tip

When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo") to be matched. * matches all headers.

Starting with version 4.1, the AbstractHeaderMapper (a DefaultAmqpHeaderMapper superclass) allows the NON_STANDARD_HEADERS token to be configured for the requestHeaderNames and/or replyHeaderNames properties (in addition to the existing STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS) to map all user-defined headers.
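A header mapper that reverts to the pre-4.3 behavior might be configured roughly as follows. This is a sketch: the class and bean names are hypothetical, and the inboundMapper() factory method is assumed to be available in the version you use. The resulting bean would then be referenced from the header-mapper attribute of an inbound endpoint.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;

@Configuration
public class HeaderMapperConfig {

    @Bean
    public DefaultAmqpHeaderMapper standardHeadersOnlyMapper() {
        DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
        // Map only the standard AMQP headers, as before version 4.3
        mapper.setRequestHeaderNames("STANDARD_REQUEST_HEADERS");
        mapper.setReplyHeaderNames("STANDARD_REPLY_HEADERS");
        return mapper;
    }

}
```

Adding a pattern such as "foo*" to either list would additionally map the user-defined headers that match it.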

Class org.springframework.amqp.support.AmqpHeaders identifies the default headers that will be used by the DefaultAmqpHeaderMapper:

  • amqp_appId
  • amqp_clusterId
  • amqp_contentEncoding
  • amqp_contentLength
  • contentType
  • amqp_correlationId
  • amqp_delay
  • amqp_deliveryMode
  • amqp_deliveryTag
  • amqp_expiration
  • amqp_messageCount
  • amqp_messageId
  • amqp_receivedDelay
  • amqp_receivedDeliveryMode
  • amqp_receivedExchange
  • amqp_receivedRoutingKey
  • amqp_redelivered
  • amqp_replyTo
  • amqp_timestamp
  • amqp_type
  • amqp_userId
  • amqp_publishConfirm
  • amqp_publishConfirmNackCause
  • amqp_returnReplyCode
  • amqp_returnReplyText
  • amqp_returnExchange
  • amqp_returnRoutingKey

[Caution]Caution

As mentioned above, using a header mapping pattern * is a common way to copy all headers. However, this can have some unexpected side-effects because certain RabbitMQ proprietary properties/headers will be copied as well. For example, when you use Federation, the received message may have a property named x-received-from which contains the node that sent the message. If you use the wildcard character * for the request and reply header mapping on the Inbound Gateway, this header will be copied as well, which may cause some issues with federation; this reply message may be federated back to the sending broker, which will think that a message is looping and is thus silently dropped. If you wish to use the convenience of wildcard header mapping, you may need to filter out some headers in the downstream flow. For example, to avoid copying the x-received-from header back to the reply you can use <int:header-filter ... header-names="x-received-from"> before sending the reply to the AMQP Inbound Gateway. Alternatively, you could explicitly list those properties that you actually want mapped instead of using wildcards. For these reasons, for inbound messages, the mapper by default does not map any x-* headers; it also does not map the deliveryMode to amqp_deliveryMode header, to avoid propagation of that header from an inbound message to an outbound message. Instead, this header is mapped to amqp_receivedDeliveryMode, which is not mapped on output.
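In Java configuration, the header filter mentioned above could be sketched like this. The class name and the channel names (repliesToFilter, filteredReplies) are hypothetical; the filter would sit in the flow just before the reply is handed back to the AMQP Inbound Gateway.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.transformer.HeaderFilter;

@Configuration
public class FederationHeaderFilterConfig {

    @Bean
    @Transformer(inputChannel = "repliesToFilter", outputChannel = "filteredReplies")
    public HeaderFilter dropFederationHeader() {
        // Remove the RabbitMQ federation header so it is not copied onto the reply
        return new HeaderFilter("x-received-from");
    }

}
```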

Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !. Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,foo,ba*,!bar,!baz,qux,!foo will NOT map foo (nor bar nor baz); the standard headers plus bad and qux will be mapped.

[Important]Important

If you have a user defined header that begins with ! that you do wish to map, you need to escape it with \ thus: STANDARD_REQUEST_HEADERS,\!myBangHeader and it WILL be mapped.
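The negation and escaping rules above can be illustrated with a small, framework-independent sketch. This is not the mapper's actual implementation: HeaderPatternSketch and its methods are made-up names, the STANDARD_REQUEST_HEADERS token is left out, and only the simple * wildcard is handled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class HeaderPatternSketch {

    /** Matches a header name against a simple pattern in which '*' stands for any run of characters. */
    static boolean matches(String pattern, String header) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            regex.append(c == '*' ? ".*" : Pattern.quote(String.valueOf(c)));
        }
        return header.matches(regex.toString());
    }

    /** Returns true if the header would be mapped; any matching negated pattern wins. */
    static boolean shouldMap(List<String> patterns, String header) {
        boolean matched = false;
        for (String p : patterns) {
            if (p.startsWith("!")) {
                if (matches(p.substring(1), header)) {
                    return false; // negated patterns get priority, wherever they appear
                }
            }
            else {
                // A leading "\!" is an escaped literal '!' at the start of the header name
                String literal = p.startsWith("\\!") ? p.substring(1) : p;
                if (matches(literal, header)) {
                    matched = true;
                }
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList("foo", "ba*", "!bar", "!baz", "qux", "!foo");
        for (String header : Arrays.asList("foo", "bar", "baz", "bad", "qux")) {
            System.out.println(header + " mapped: " + shouldMap(patterns, header));
        }
    }

}
```

With the pattern list from the text, only bad and qux are reported as mapped, and a pattern of \!myBangHeader matches a header literally named !myBangHeader.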

11.11 AMQP Samples

To experiment with the AMQP adapters, check out the samples available in the Spring Integration Samples Git repository.

Currently there is one sample available that demonstrates the basic functionality of the Spring Integration AMQP Adapter using an Outbound Channel Adapter and an Inbound Channel Adapter. The sample uses RabbitMQ (http://www.rabbitmq.com/) as the AMQP broker implementation.

[Note]Note

In order to run the example you will need a running instance of RabbitMQ. A local installation with just the basic defaults will be sufficient. For detailed RabbitMQ installation procedures please visit: http://www.rabbitmq.com/install.html

Once the sample application is started, you enter some text on the command prompt and a message containing that entered text is dispatched to the AMQP queue. In return that message is retrieved via Spring Integration and then printed to the console.

The image below illustrates the basic set of Spring Integration components used in this sample.

Figure 11.1. The Spring Integration graph of the AMQP sample
