AMQP-backed Message Channels
There are two message channel implementations available.
One is point-to-point, and the other is publish-subscribe.
Both of these channels provide a wide range of configuration attributes for the underlying AmqpTemplate
and
SimpleMessageListenerContainer
(as shown earlier in this chapter for the channel adapters and gateways).
However, the examples we show here have minimal configuration.
Explore the XML schema to view the available attributes.
A point-to-point channel might look like the following example:
<int-amqp:channel id="p2pChannel"/>
Under the covers, the preceding example causes a Queue
named si.p2pChannel
to be declared, and this channel sends to that Queue
(technically, by sending to the no-name direct exchange with a routing key that matches the name of this Queue
).
This channel also registers a consumer on that Queue
.
If you want the channel to be “pollable” instead of message-driven, provide the message-driven
flag with a value of false
, as the following example shows:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
A publish-subscribe channel might look like the following:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
Under the covers, the preceding example causes a fanout exchange named si.fanout.pubSubChannel
to be declared, and this channel sends to that fanout exchange.
This channel also declares a server-named exclusive, auto-delete, non-durable Queue
and binds that to the fanout exchange while registering a consumer on that Queue
to receive messages.
There is no “pollable” option for a publish-subscribe-channel.
It must be message-driven.
Starting with version 4.1, AMQP-backed message channels (in conjunction with channel-transacted
) support
template-channel-transacted
to separate transactional
configuration for the AbstractMessageListenerContainer
and
for the RabbitTemplate
.
Note that, previously, channel-transacted
was true
by default.
Now, by default, it is false
for the AbstractMessageListenerContainer
.
Prior to version 4.3, AMQP-backed channels only supported messages with Serializable
payloads and headers.
The entire message was converted (serialized) and sent to RabbitMQ.
Now, you can set the extract-payload
attribute (or setExtractPayload()
when using Java configuration) to true
.
When this flag is true
, the message payload is converted and the headers are mapped, in a manner similar to when you use channel adapters.
This arrangement lets AMQP-backed channels be used with non-serializable payloads (perhaps with another message converter, such as the Jackson2JsonMessageConverter
).
See AMQP Message Headers for more about the default mapped headers.
You can modify the mapping by providing custom mappers that use the outbound-header-mapper
and inbound-header-mapper
attributes.
You can now also specify a default-delivery-mode
, which is used to set the delivery mode when there is no amqp_deliveryMode
header.
By default, Spring AMQP MessageProperties
uses PERSISTENT
delivery mode.
As with other persistence-backed channels, AMQP-backed channels are intended to provide message persistence to avoid message loss. They are not intended to distribute work to other peer applications. For that purpose, use channel adapters instead. |
Starting with version 5.0, the pollable channel now blocks the poller thread for the specified receiveTimeout (the default is 1 second).
Previously, unlike other PollableChannel implementations, the thread returned immediately to the scheduler if no message was available, regardless of the receive timeout.
Blocking is a little more expensive than using a basicGet() to retrieve a message (with no timeout), because a consumer has to be created to receive each message.
To restore the previous behavior, set the poller’s receiveTimeout to 0.
|
Configuring with Java Configuration
The following example shows how to configure the channels with Java configuration:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
Configuring with the Java DSL
The following example shows how to configure the channels with the Java DSL:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}