Connection and Resource Management

Whereas the AMQP model we described in the previous section is generic and applicable to all implementations, when we get into the management of resources, the details are specific to the broker implementation. Therefore, in this section, we focus on code that exists only within our “spring-rabbit” module since, at this point, RabbitMQ is the only supported implementation.

The central component for managing a connection to the RabbitMQ broker is the ConnectionFactory interface. The responsibility of a ConnectionFactory implementation is to provide an instance of org.springframework.amqp.rabbit.connection.Connection, which is a wrapper for com.rabbitmq.client.Connection.

Choosing a Connection Factory

There are three connection factories to chose from

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

The first two were added in version 2.3.

For most use cases, the CachingConnectionFactory should be used. The ThreadChannelConnectionFactory can be used if you want to ensure strict message ordering without the need to use Scoped Operations. The PooledChannelConnectionFactory is similar to the CachingConnectionFactory in that it uses a single connection and a pool of channels. It’s implementation is simpler but it doesn’t support correlated publisher confirmations.

Simple publisher confirmations are supported by all three factories.

When configuring a RabbitTemplate to use a separate connection, you can now, starting with version 2.3.2, configure the publishing connection factory to be a different type. By default, the publishing factory is the same type and any properties set on the main factory are also propagated to the publishing factory.

Starting with version 3.1, the AbstractConnectionFactory includes the connectionCreatingBackOff property, which supports a backoff policy in the connection module. Currently, there is support in the behavior of createChannel() to handle exceptions that occur when the channelMax limit is reached, implementing a backoff strategy based on attempts and intervals.

PooledChannelConnectionFactory

This factory manages a single connection and two pools of channels, based on the Apache Pool2. One pool is for transactional channels, the other is for non-transactional channels. The pools are GenericObjectPool s with default configuration; a callback is provided to configure the pools; refer to the Apache documentation for more information.

The Apache commons-pool2 jar must be on the class path to use this factory.

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

This factory manages a single connection and two ThreadLocal s, one for transactional channels, the other for non-transactional channels. This factory ensures that all operations on the same thread use the same channel (as long as it remains open). This facilitates strict message ordering without the need for Scoped Operations. To avoid memory leaks, if your application uses many short-lived threads, you must call the factory’s closeThreadChannel() to release the channel resource. Starting with version 2.3.7, a thread can transfer its channel(s) to another thread. See Strict Message Ordering in a Multi-Threaded Environment for more information.

CachingConnectionFactory

The third implementation provided is the CachingConnectionFactory, which, by default, establishes a single connection proxy that can be shared by the application. Sharing of the connection is possible since the “unit of work” for messaging with AMQP is actually a “channel” (in some ways, this is similar to the relationship between a connection and a session in JMS). The connection instance provides a createChannel method. The CachingConnectionFactory implementation supports caching of those channels, and it maintains separate caches for channels based on whether they are transactional. When creating an instance of CachingConnectionFactory, you can provide the 'hostname' through the constructor. You should also provide the 'username' and 'password' properties. To configure the size of the channel cache (the default is 25), you can call the setChannelCacheSize() method.

Starting with version 1.3, you can configure the CachingConnectionFactory to cache connections as well as only channels. In this case, each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache (if the cache size has not been reached). Channels created on such connections are also cached. The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members, and others. To cache connections, set the cacheMode to CacheMode.CONNECTION.

This does not limit the number of connections. Rather, it specifies how many idle open connections are allowed.

Starting with version 1.5.5, a new property called connectionLimit is provided. When this property is set, it limits the total number of connections allowed. When set, if the limit is reached, the channelCheckoutTimeLimit is used to wait for a connection to become idle. If the time is exceeded, an AmqpTimeoutException is thrown.

When the cache mode is CONNECTION, automatic declaration of queues and others (See Automatic Declaration of Exchanges, Queues, and Bindings) is NOT supported.

Also, at the time of this writing, the amqp-client library by default creates a fixed thread pool for each connection (default size: Runtime.getRuntime().availableProcessors() * 2 threads). When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. Then, the same executor can be used by all connections and its threads can be shared. The executor’s thread pool should be unbounded or set appropriately for the expected use (usually, at least one thread per connection). If multiple channels are created on each connection, the pool size affects the concurrency, so a variable (or simple cached) thread pool executor would be most suitable.

It is important to understand that the cache size is (by default) not a limit but is merely the number of channels that can be cached. With a cache size of, say, 10, any number of channels can actually be in use. If more than 10 channels are being used and they are all returned to the cache, 10 go in the cache. The remainder are physically closed.

Starting with version 1.6, the default channel cache size has been increased from 1 to 25. In high volume, multi-threaded environments, a small cache means that channels are created and closed at a high rate. Increasing the default cache size can avoid this overhead. You should monitor the channels in use through the RabbitMQ Admin UI and consider increasing the cache size further if you see many channels being created and closed. The cache grows only on-demand (to suit the concurrency requirements of the application), so this change does not impact existing low-volume applications.

Starting with version 1.4.2, the CachingConnectionFactory has a property called channelCheckoutTimeout. When this property is greater than zero, the channelCacheSize becomes a limit on the number of channels that can be created on a connection. If the limit is reached, calling threads block until a channel is available or this timeout is reached, in which case a AmqpTimeoutException is thrown.

Channels used within the framework (for example, RabbitTemplate) are reliably returned to the cache. If you create channels outside of the framework, (for example, by accessing the connections directly and invoking createChannel()), you must return them (by closing) reliably, perhaps in a finally block, to avoid running out of channels.

The following example shows how to create a new connection:

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

When using XML, the configuration might look like the following example:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
There is also a SingleConnectionFactory implementation that is available only in the unit test code of the framework. It is simpler than CachingConnectionFactory, since it does not cache channels, but it is not intended for practical usage outside of simple tests due to its lack of performance and resilience. If you need to implement your own ConnectionFactory for some reason, the AbstractConnectionFactory base class may provide a nice starting point.

A ConnectionFactory can be created quickly and conveniently by using the rabbit namespace, as follows:

<rabbit:connection-factory id="connectionFactory"/>

In most cases, this approach is preferable, since the framework can choose the best defaults for you. The created instance is a CachingConnectionFactory. Keep in mind that the default cache size for channels is 25. If you want more channels to be cached, set a larger value by setting the 'channelCacheSize' property. In XML it would look like as follows:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

Also, with the namespace, you can add the 'channel-cache-size' attribute, as follows:

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

The default cache mode is CHANNEL, but you can configure it to cache connections instead. In the following example, we use connection-cache-size:

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

You can provide host and port attributes by using the namespace, as follows:

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

Alternatively, if running in a clustered environment, you can use the addresses attribute, as follows:

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

See Connecting to a Cluster for information about address-shuffle-mode.

The following example with a custom thread factory that prefixes thread names with rabbitmq-:

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

AddressResolver

Starting with version 2.1.15, you can now use an AddressResolver to resolve the connection address(es). This will override any settings of the addresses and host/port properties.

Naming Connections

Starting with version 1.7, a ConnectionNameStrategy is provided for the injection into the AbstractionConnectionFactory. The generated name is used for the application-specific identification of the target RabbitMQ connection. The connection name is displayed in the management UI if the RabbitMQ server supports it. This value does not have to be unique and cannot be used as a connection identifier — for example, in HTTP API requests. This value is supposed to be human-readable and is a part of ClientProperties under the connection_name key. You can use a simple Lambda, as follows:

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

The ConnectionFactory argument can be used to distinguish target connection names by some logic. By default, the beanName of the AbstractConnectionFactory, a hex string representing the object, and an internal counter are used to generate the connection_name. The <rabbit:connection-factory> namespace component is also supplied with the connection-name-strategy attribute.

An implementation of SimplePropertyValueConnectionNameStrategy sets the connection name to an application property. You can declare it as a @Bean and inject it into the connection factory, as the following example shows:

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

The property must exist in the application context’s Environment.

When using Spring Boot and its autoconfigured connection factory, you need only declare the ConnectionNameStrategy @Bean. Boot auto-detects the bean and wires it into the factory.

Blocked Connections and Resource Constraints

The connection might be blocked for interaction from the broker that corresponds to the Memory Alarm. Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection can be supplied with com.rabbitmq.client.BlockedListener instances to be notified for connection blocked and unblocked events. In addition, the AbstractConnectionFactory emits a ConnectionBlockedEvent and ConnectionUnblockedEvent, respectively, through its internal BlockedListener implementation. These let you provide application logic to react appropriately to problems on the broker and (for example) take some corrective actions.

When the application is configured with a single CachingConnectionFactory, as it is by default with Spring Boot auto-configuration, the application stops working when the connection is blocked by the Broker. And when it is blocked by the Broker, any of its clients stop to work. If we have producers and consumers in the same application, we may end up with a deadlock when producers are blocking the connection (because there are no resources on the Broker any more) and consumers cannot free them (because the connection is blocked). To mitigate the problem, we suggest having one more separate CachingConnectionFactory instance with the same options — one for producers and one for consumers. A separate CachingConnectionFactory is not possible for transactional producers that execute on a consumer thread, since they should reuse the Channel associated with the consumer transactions.

Starting with version 2.0.2, the RabbitTemplate has a configuration option to automatically use a second connection factory, unless transactions are being used. See Using a Separate Connection for more information. The ConnectionNameStrategy for the publisher connection is the same as the primary strategy with .publisher appended to the result of calling the method.

Starting with version 1.7.7, an AmqpResourceNotAvailableException is provided, which is thrown when SimpleConnection.createChannel() cannot create a Channel (for example, because the channelMax limit is reached and there are no available channels in the cache). You can use this exception in the RetryPolicy to recover the operation after some back-off.

Configuring the Underlying Client Connection Factory

The CachingConnectionFactory uses an instance of the Rabbit client ConnectionFactory. A number of configuration properties are passed through (host, port, userName, password, requestedHeartBeat, and connectionTimeout for example) when setting the equivalent property on the CachingConnectionFactory. To set other properties (clientProperties, for example), you can define an instance of the Rabbit factory and provide a reference to it by using the appropriate constructor of the CachingConnectionFactory. When using the namespace (as described earlier), you need to provide a reference to the configured factory in the connection-factory attribute. For convenience, a factory bean is provided to assist in configuring the connection factory in a Spring application context, as discussed in the next section.

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
The 4.0.x client enables automatic recovery by default. While compatible with this feature, Spring AMQP has its own recovery mechanisms and the client recovery feature generally is not needed. We recommend disabling amqp-client automatic recovery, to avoid getting AutoRecoverConnectionNotCurrentlyOpenException instances when the broker is available but the connection has not yet recovered. You may notice this exception, for example, when a RetryTemplate is configured in a RabbitTemplate, even when failing over to another broker in a cluster. Since the auto-recovering connection recovers on a timer, the connection may be recovered more quickly by using Spring AMQP’s recovery mechanisms. Starting with version 1.7.1, Spring AMQP disables amqp-client automatic recovery unless you explicitly create your own RabbitMQ connection factory and provide it to the CachingConnectionFactory. RabbitMQ ConnectionFactory instances created by the RabbitConnectionFactoryBean also have the option disabled by default.

RabbitConnectionFactoryBean and Configuring SSL

Starting with version 1.4, a convenient RabbitConnectionFactoryBean is provided to enable convenient configuration of SSL properties on the underlying client connection factory by using dependency injection. Other setters delegate to the underlying factory. Previously, you had to configure the SSL options programmatically. The following example shows how to configure a RabbitConnectionFactoryBean:

Java
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
Boot application.properties
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
XML
<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

See the RabbitMQ Documentation for information about configuring SSL. Omit the keyStore and trustStore configuration to connect over SSL without certificate validation. The next example shows how you can provide key and trust store configuration.

The sslPropertiesLocation property is a Spring Resource pointing to a properties file containing the following keys:

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

The keyStore and truststore are Spring Resources pointing to the stores. Typically this properties file is secured by the operating system with the application having read access.

Starting with Spring AMQP version 1.5,you can set these properties directly on the factory bean. If both discrete properties and sslPropertiesLocation is provided, properties in the latter override the discrete values.

Starting with version 2.0, the server certificate is validated by default because it is more secure. If you wish to skip this validation for some reason, set the factory bean’s skipServerCertificateValidation property to true. Starting with version 2.1, the RabbitConnectionFactoryBean now calls enableHostnameVerification() by default. To revert to the previous behavior, set the enableHostnameVerification property to false.
Starting with version 2.2.5, the factory bean will always use TLS v1.2 by default; previously, it used v1.1 in some cases and v1.2 in others (depending on other properties). If you need to use v1.1 for some reason, set the sslAlgorithm property: setSslAlgorithm("TLSv1.1").

Connecting to a Cluster

To connect to a cluster, configure the addresses property on the CachingConnectionFactory:

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

Starting with version 3.0, the underlying connection factory will attempt to connect to a host, by choosing a random address, whenever a new connection is established. To revert to the previous behavior of attempting to connect from first to last, set the addressShuffleMode property to AddressShuffleMode.NONE.

Starting with version 2.3, the INORDER shuffle mode was added, which means the first address is moved to the end after a connection is created. You may wish to use this mode with the RabbitMQ Sharding Plugin with CacheMode.CONNECTION and suitable concurrency if you wish to consume from all shards on all nodes.

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

Routing Connection Factory

Starting with version 1.3, the AbstractRoutingConnectionFactory has been introduced. This factory provides a mechanism to configure mappings for several ConnectionFactories and determine a target ConnectionFactory by some lookupKey at runtime. Typically, the implementation checks a thread-bound context. For convenience, Spring AMQP provides the SimpleRoutingConnectionFactory, which gets the current thread-bound lookupKey from the SimpleResourceHolder. The following examples shows how to configure a SimpleRoutingConnectionFactory in both XML and Java:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

It is important to unbind the resource after use. For more information, see the JavaDoc for AbstractRoutingConnectionFactory.

Starting with version 1.4, RabbitTemplate supports the SpEL sendConnectionFactorySelectorExpression and receiveConnectionFactorySelectorExpression properties, which are evaluated on each AMQP protocol interaction operation (send, sendAndReceive, receive, or receiveAndReply), resolving to a lookupKey value for the provided AbstractRoutingConnectionFactory. You can use bean references, such as @vHostResolver.getVHost(#root) in the expression. For send operations, the message to be sent is the root evaluation object. For receive operations, the queueName is the root evaluation object.

The routing algorithm is as follows: If the selector expression is null or is evaluated to null or the provided ConnectionFactory is not an instance of AbstractRoutingConnectionFactory, everything works as before, relying on the provided ConnectionFactory implementation. The same occurs if the evaluation result is not null, but there is no target ConnectionFactory for that lookupKey and the AbstractRoutingConnectionFactory is configured with lenientFallback = true. In the case of an AbstractRoutingConnectionFactory, it does fallback to its routing implementation based on determineCurrentLookupKey(). However, if lenientFallback = false, an IllegalStateException is thrown.

The namespace support also provides the send-connection-factory-selector-expression and receive-connection-factory-selector-expression attributes on the <rabbit:template> component.

Also, starting with version 1.4, you can configure a routing connection factory in a listener container. In that case, the list of queue names is used as the lookup key. For example, if you configure the container with setQueueNames("thing1", "thing2"), the lookup key is [thing1,thing]" (note that there is no space in the key).

Starting with version 1.6.9, you can add a qualifier to the lookup key by using setLookupKeyQualifier on the listener container. Doing so enables, for example, listening to queues with the same name but in a different virtual host (where you would have a connection factory for each).

For example, with lookup key qualifier thing1 and a container listening to queue thing2, the lookup key you could register the target connection factory with could be thing1[thing2].

The target (and default, if provided) connection factories must have the same settings for publisher confirms and returns. See Publisher Confirms and Returns.

Starting with version 2.4.4, this validation can be disabled. If you have a case that the values between confirms and returns need to be unequal, you can use AbstractRoutingConnectionFactory#setConsistentConfirmsReturns to turn of the validation. Note that the first connection factory added to AbstractRoutingConnectionFactory will determine the general values of confirms and returns.

It may be useful if you have a case that certain messages you would to check confirms/returns and others you don’t. For example:

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

This way messages with the header x-use-publisher-confirms: true will be sent through the caching connection, and you can ensure the message delivery. See Publisher Confirms and Returns for more information about ensuring message delivery.

Queue Affinity and the LocalizedQueueConnectionFactory

When using HA queues in a cluster, for the best performance, you may want to connect to the physical broker where the lead queue resides. The CachingConnectionFactory can be configured with multiple broker addresses. This is to fail over and the client attempts to connect in accordance with the configured AddressShuffleMode order. The LocalizedQueueConnectionFactory uses the REST API provided by the management plugin to determine which node is the lead for the queue. It then creates (or retrieves from a cache) a CachingConnectionFactory that connects to just that node. If the connection fails, the new lead node is determined and the consumer connects to it. The LocalizedQueueConnectionFactory is configured with a default connection factory, in case the physical location of the queue cannot be determined, in which case it connects as normal to the cluster.

The LocalizedQueueConnectionFactory is a RoutingConnectionFactory and the SimpleMessageListenerContainer uses the queue names as the lookup key as discussed in Routing Connection Factory above.

For this reason (the use of the queue name for the lookup), the LocalizedQueueConnectionFactory can only be used if the container is configured to listen to a single queue.
The RabbitMQ management plugin must be enabled on each node.
This connection factory is intended for long-lived connections, such as those used by the SimpleMessageListenerContainer. It is not intended for short connection use, such as with a RabbitTemplate because of the overhead of invoking the REST API before making the connection. Also, for publish operations, the queue is unknown, and the message is published to all cluster members anyway, so the logic of looking up the node has little value.

The following example configuration shows how to configure the factories:

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

Notice that the first three parameters are arrays of addresses, adminUris, and nodes. These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node.

Starting with version 3.0, the RabbitMQ http-client is no longer used to access the Rest API. Instead, by default, the WebClient from Spring Webflux is used if spring-webflux is on the class path; otherwise a RestTemplate is used.

To add WebFlux to the class path:

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

You can also use other REST technology by implementing LocalizedQueueConnectionFactory.NodeLocator and overriding its createClient, ``restCall, and optionally, close methods.

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public HashMap<String, Object> restCall(MyClient client, URI uri) {
        ...
    });

});

The framework provides the WebFluxNodeLocator and RestTemplateNodeLocator, with the default as discussed above.

Publisher Confirms and Returns

Confirmed (with correlation) and returned messages are supported by setting the CachingConnectionFactory property publisherConfirmType to ConfirmType.CORRELATED and the publisherReturns property to 'true'.

When these options are set, Channel instances created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm or return to the appropriate listener. These features are explained further in the following sections.

See also Correlated Publisher Confirms and Returns and simplePublisherConfirms in Scoped Operations.

For some more background information, see the blog post by the RabbitMQ team titled Introducing Publisher Confirms.

Connection and Channel Listeners

The connection factory supports registering ConnectionListener and ChannelListener implementations. This allows you to receive notifications for connection and channel related events. (A ConnectionListener is used by the RabbitAdmin to perform declarations when the connection is established - see Automatic Declaration of Exchanges, Queues, and Bindings for more information). The following listing shows the ConnectionListener interface definition:

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection object can be supplied with com.rabbitmq.client.BlockedListener instances to be notified for connection blocked and unblocked events. The following example shows the ChannelListener interface definition:

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

See Publishing is Asynchronous — How to Detect Successes and Failures for one scenario where you might want to register a ChannelListener.

Logging Channel Close Events

Version 1.5 introduced a mechanism to enable users to control logging levels.

The AbstractConnectionFactory uses a default strategy to log channel closures as follows:

  • Normal channel closes (200 OK) are not logged.

  • If a channel is closed due to a failed passive queue declaration, it is logged at DEBUG level.

  • If a channel is closed because the basic.consume is refused due to an exclusive consumer condition, it is logged at DEBUG level (since 3.1, previously INFO).

  • All others are logged at ERROR level.

To modify this behavior, you can inject a custom ConditionalExceptionLogger into the CachingConnectionFactory in its closeExceptionLogger property.

Also, the AbstractConnectionFactory.DefaultChannelCloseLogger is now public, allowing it to be sub classed.

See also Consumer Events.

Runtime Cache Properties

Staring with version 1.6, the CachingConnectionFactory now provides cache statistics through the getCacheProperties() method. These statistics can be used to tune the cache to optimize it in production. For example, the high water marks can be used to determine whether the cache size should be increased. If it equals the cache size, you might want to consider increasing further. The following table describes the CacheMode.CHANNEL properties:

Table 1. Cache properties for CacheMode.CHANNEL
Property Meaning
connectionName

The name of the connection generated by the ConnectionNameStrategy.

channelCacheSize

The currently configured maximum channels that are allowed to be idle.

localPort

The local port for the connection (if available). This can be used to correlate with connections and channels on the RabbitMQ Admin UI.

idleChannelsTx

The number of transactional channels that are currently idle (cached).

idleChannelsNotTx

The number of non-transactional channels that are currently idle (cached).

idleChannelsTxHighWater

The maximum number of transactional channels that have been concurrently idle (cached).

idleChannelsNotTxHighWater

The maximum number of non-transactional channels have been concurrently idle (cached).

The following table describes the CacheMode.CONNECTION properties:

Table 2. Cache properties for CacheMode.CONNECTION
Property Meaning
connectionName:<localPort>

The name of the connection generated by the ConnectionNameStrategy.

openConnections

The number of connection objects representing connections to brokers.

channelCacheSize

The currently configured maximum channels that are allowed to be idle.

connectionCacheSize

The currently configured maximum connections that are allowed to be idle.

idleConnections

The number of connections that are currently idle.

idleConnectionsHighWater

The maximum number of connections that have been concurrently idle.

idleChannelsTx:<localPort>

The number of transactional channels that are currently idle (cached) for this connection. You can use the localPort part of the property name to correlate with connections and channels on the RabbitMQ Admin UI.

idleChannelsNotTx:<localPort>

The number of non-transactional channels that are currently idle (cached) for this connection. The localPort part of the property name can be used to correlate with connections and channels on the RabbitMQ Admin UI.

idleChannelsTxHighWater:<localPort>

The maximum number of transactional channels that have been concurrently idle (cached). The localPort part of the property name can be used to correlate with connections and channels on the RabbitMQ Admin UI.

idleChannelsNotTxHighWater:<localPort>

The maximum number of non-transactional channels have been concurrently idle (cached). You can use the localPort part of the property name to correlate with connections and channels on the RabbitMQ Admin UI.

The cacheMode property (CHANNEL or CONNECTION) is also included.

cacheStats
Figure 1. JVisualVM Example

RabbitMQ Automatic Connection/Topology recovery

Since the first version of Spring AMQP, the framework has provided its own connection and channel recovery in the event of a broker failure. Also, as discussed in Configuring the Broker, the RabbitAdmin re-declares any infrastructure beans (queues and others) when the connection is re-established. It therefore does not rely on the auto-recovery that is now provided by the amqp-client library. The amqp-client, has auto recovery enabled by default. There are some incompatibilities between the two recovery mechanisms so, by default, Spring sets the automaticRecoveryEnabled property on the underlying RabbitMQ connectionFactory to false. Even if the property is true, Spring effectively disables it, by immediately closing any recovered connections.

By default, only elements (queues, exchanges, bindings) that are defined as beans will be re-declared after a connection failure. See Recovering Auto-Delete Declarations for how to change that behavior.