AMQP-backed Message Channels

There are two message channel implementations available. One is point-to-point, and the other is publish-subscribe. Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate and SimpleMessageListenerContainer (as shown earlier in this chapter for the channel adapters and gateways). However, the examples we show here have minimal configuration. Explore the XML schema to view the available attributes.

A point-to-point channel might look like the following example:

<int-amqp:channel id="p2pChannel"/>

Under the covers, the preceding example causes a Queue named si.p2pChannel to be declared, and this channel sends to that Queue (technically, by sending to the no-name direct exchange with a routing key that matches the name of this Queue). This channel also registers a consumer on that Queue. If you want the channel to be “pollable” instead of message-driven, set the message-driven attribute to false, as the following example shows:

<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>

A publish-subscribe channel might look like the following:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

Under the covers, the preceding example causes a fanout exchange named si.fanout.pubSubChannel to be declared, and this channel sends to that fanout exchange. This channel also declares a server-named exclusive, auto-delete, non-durable Queue and binds that to the fanout exchange while registering a consumer on that Queue to receive messages. There is no “pollable” option for a publish-subscribe-channel. It must be message-driven.
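
To make the difference between the two routing styles concrete, here is a minimal in-memory sketch. It is not Spring Integration or RabbitMQ code: the RoutingSketch class and the subscriberA and subscriberB queue names are hypothetical stand-ins (on a real broker, the subscriber queues are server-named). It only illustrates that the no-name exchange delivers to the single queue whose name equals the routing key, while a fanout exchange copies each message to every bound queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model; not Spring Integration or RabbitMQ API.
class RoutingSketch {

    // Queues keyed by name, standing in for broker queues.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();
    // Fanout exchange name -> names of the queues bound to it.
    private final Map<String, List<String>> fanoutBindings = new LinkedHashMap<>();

    public void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    public void bindToFanout(String exchange, String queueName) {
        declareQueue(queueName);
        fanoutBindings.computeIfAbsent(exchange, k -> new ArrayList<>()).add(queueName);
    }

    // No-name exchange: the routing key is interpreted as the queue name.
    public void sendToDefaultExchange(String routingKey, String body) {
        Queue<String> queue = queues.get(routingKey);
        if (queue != null) {
            queue.add(body);
        }
    }

    // Fanout exchange: every bound queue receives its own copy.
    public void sendToFanout(String exchange, String body) {
        List<String> bound = fanoutBindings.getOrDefault(exchange, Collections.<String>emptyList());
        for (String queueName : bound) {
            queues.get(queueName).add(body);
        }
    }

    // Returns the next message, or null when the queue is empty or unknown.
    public String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        RoutingSketch broker = new RoutingSketch();

        // Point-to-point: one queue, each message is consumed once.
        broker.declareQueue("si.p2pChannel");
        broker.sendToDefaultExchange("si.p2pChannel", "order-1");
        System.out.println(broker.receive("si.p2pChannel")); // order-1
        System.out.println(broker.receive("si.p2pChannel")); // null

        // Publish-subscribe: each subscriber queue gets a copy.
        broker.bindToFanout("si.fanout.pubSubChannel", "subscriberA");
        broker.bindToFanout("si.fanout.pubSubChannel", "subscriberB");
        broker.sendToFanout("si.fanout.pubSubChannel", "event-1");
        System.out.println(broker.receive("subscriberA")); // event-1
        System.out.println(broker.receive("subscriberB")); // event-1
    }
}
```

The model ignores durability, exclusivity, and auto-delete. It is meant only to show why every publish-subscribe subscriber sees each message while a point-to-point message is delivered once.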

Starting with version 4.1, AMQP-backed message channels (in conjunction with channel-transacted) support template-channel-transacted to separate transactional configuration for the AbstractMessageListenerContainer and for the RabbitTemplate. Note that, previously, channel-transacted was true by default. Now, by default, it is false for the AbstractMessageListenerContainer.

Prior to version 4.3, AMQP-backed channels only supported messages with Serializable payloads and headers. The entire message was converted (serialized) and sent to RabbitMQ. Now, you can set the extract-payload attribute (or setExtractPayload() when using Java configuration) to true. When this flag is true, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters. This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the Jackson2JsonMessageConverter). See AMQP Message Headers for more about the default mapped headers. You can modify the mapping by providing custom mappers that use the outbound-header-mapper and inbound-header-mapper attributes. You can now also specify a default-delivery-mode, which is used to set the delivery mode when there is no amqp_deliveryMode header. By default, Spring AMQP MessageProperties uses PERSISTENT delivery mode.
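
The following plain-Java sketch illustrates why whole-message serialization restricts payloads to Serializable types and why converting the payload separately lifts that restriction. Nothing in it is Spring API: Envelope, Order, DeliveryMode, and the hand-rolled JSON conversion are hypothetical stand-ins, and resolveDeliveryMode only mirrors the fallback order described above (the amqp_deliveryMode header, then the channel default, then PERSISTENT).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins only; none of these types are Spring classes.
class ExtractPayloadSketch {

    // Local stand-in for a delivery-mode enum.
    enum DeliveryMode { NON_PERSISTENT, PERSISTENT }

    // A payload type that deliberately does not implement Serializable.
    static final class Order {
        final String id;
        Order(String id) { this.id = id; }
    }

    // Stand-in for a whole message (payload plus headers) serialized as one object.
    static final class Envelope implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object payload;
        final HashMap<String, Object> headers;
        Envelope(Object payload, HashMap<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Whole-message approach: Java serialization of the entire envelope.
    // Returns false when the payload or a header value is not Serializable.
    static boolean canSerializeWhole(Envelope envelope) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(envelope);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Extract-payload approach: convert only the payload (hand-rolled JSON here).
    static byte[] convertPayload(Order order) {
        return ("{\"id\":\"" + order.id + "\"}").getBytes(StandardCharsets.UTF_8);
    }

    // Delivery mode fallback: header first, then channel default, then PERSISTENT.
    static DeliveryMode resolveDeliveryMode(Map<String, Object> headers, DeliveryMode channelDefault) {
        Object header = headers.get("amqp_deliveryMode");
        if (header instanceof DeliveryMode) {
            return (DeliveryMode) header;
        }
        return channelDefault != null ? channelDefault : DeliveryMode.PERSISTENT;
    }

    public static void main(String[] args) {
        HashMap<String, Object> headers = new HashMap<>();
        Envelope envelope = new Envelope(new Order("42"), headers);

        System.out.println(canSerializeWhole(envelope)); // false
        System.out.println(new String(convertPayload(new Order("42")), StandardCharsets.UTF_8)); // {"id":"42"}
        System.out.println(resolveDeliveryMode(headers, DeliveryMode.NON_PERSISTENT)); // NON_PERSISTENT
        System.out.println(resolveDeliveryMode(headers, null)); // PERSISTENT
    }
}
```

In a real application, the conversion and header mapping are handled by the configured message converter and header mappers rather than by code like this.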

As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications. For that purpose, use channel adapters instead.

Starting with version 5.0, the pollable channel now blocks the poller thread for the specified receiveTimeout (the default is 1 second). Previously, unlike other PollableChannel implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout. Blocking is a little more expensive than using a basicGet() to retrieve a message (with no timeout), because a consumer has to be created to receive each message. To restore the previous behavior, set the poller’s receiveTimeout to 0.
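
The difference between a blocking receive and an immediate return can be sketched with a plain java.util.concurrent.BlockingQueue. This is an analogy only, not the channel's implementation: ReceiveTimeoutSketch and its receive method are hypothetical names, and the sketch models only a timeout of 0 and positive timeouts.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy; the real channel talks to a broker, not a local queue.
class ReceiveTimeoutSketch {

    // A timeout of 0 returns at once; a positive timeout blocks the caller
    // until a message arrives or the timeout elapses.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        if (timeoutMillis == 0) {
            return queue.poll();
        }
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        long start = System.nanoTime();
        String immediate = receive(queue, 0);
        long immediateMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        String blocked = receive(queue, 200);
        long blockedMillis = (System.nanoTime() - start) / 1_000_000;

        // Both calls return null; only the second one holds the thread.
        System.out.println(immediate + " after ~" + immediateMillis + " ms");
        System.out.println(blocked + " after ~" + blockedMillis + " ms");
    }
}
```

With an empty queue, the second call keeps the calling thread busy for roughly the whole timeout, which is the trade-off to weigh when choosing a receiveTimeout for the poller.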

Configuring with Java Configuration

The following example shows how to configure the channels with Java configuration:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    // Pass false to create a pollable (not message-driven) channel.
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(false);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    // This bean is the publish-subscribe channel, so pubSub must be true.
    factoryBean.setPubSub(true);
    return factoryBean;
}

Configuring with the Java DSL

The following example shows how to configure the channels with the Java DSL:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}