AmqpTemplate

As with many other high-level abstractions provided by the Spring Framework and related projects, Spring AMQP provides a “template” that plays a central role. The interface that defines the main operations is called AmqpTemplate. Those operations cover the general behavior for sending and receiving messages. In other words, they are not unique to any implementation — hence the “AMQP” in the name. On the other hand, there are implementations of that interface that are tied to implementations of the AMQP protocol. Unlike JMS, which is an interface-level API itself, AMQP is a wire-level protocol. The implementations of that protocol provide their own client libraries, so each implementation of the template interface depends on a particular client library. Currently, there is only a single implementation: RabbitTemplate. In the examples that follow, we often use an AmqpTemplate. However, when you look at the configuration examples or any code excerpts where the template is instantiated or setters are invoked, you can see the implementation type (for example, RabbitTemplate).

As mentioned earlier, the AmqpTemplate interface defines all the basic operations for sending and receiving messages. We will explore message sending and reception, respectively, in Sending Messages and Receiving Messages.

Adding Retry Capabilities

Starting with version 1.3, you can configure the RabbitTemplate to use a RetryTemplate to help handle problems with broker connectivity. See the spring-retry project for complete information. The following is only one example; it uses an exponential back-off policy and the default SimpleRetryPolicy, which makes three attempts before throwing the exception to the caller.

The following example uses the XML namespace:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

The following example uses the @Configuration annotation in Java:

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}
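To make the timing of this configuration concrete, the following plain-Java sketch computes the pauses between attempts. It is a simulation, not a call into spring-retry, and it assumes that the policy sleeps for the initial interval before the first retry, multiplies the interval after each sleep, and caps it at maxInterval. The class and method names (BackOffSchedule, sleeps) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models the pauses an exponential back-off produces.
final class BackOffSchedule {

    // Returns the sleep (in milliseconds) taken before each retry.
    // With maxAttempts tries there are maxAttempts - 1 pauses.
    static List<Long> sleeps(long initialInterval, double multiplier,
            long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        for (int gap = 1; gap < maxAttempts; gap++) {
            // Cap each pause at maxInterval, then grow the interval.
            result.add((long) Math.min(next, maxInterval));
            next = next * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the configuration above, with three attempts.
        System.out.println(sleeps(500, 10.0, 10_000, 3)); // prints [500, 5000]
    }
}
```

Under these assumptions, three attempts give two pauses of 500 ms and 5000 ms, so the 10000 ms cap only takes effect if the retry policy allows more than three attempts.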

Starting with version 1.4, in addition to the retryTemplate property, the recoveryCallback option is supported on the RabbitTemplate. It is used as a second argument for the RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback).
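A minimal sketch of setting the recoveryCallback option in Java configuration might look like the following. The class name, the injected connectionFactory parameter, and the use of System.err for reporting are illustrative assumptions; the RetryTemplate here uses its defaults.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.support.RetryTemplate;

@Configuration
class RecoveringTemplateConfig { // hypothetical class name

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setRetryTemplate(new RetryTemplate());
        // Invoked once the retries are exhausted; only the last throwable
        // is available from the context in this setup.
        template.setRecoveryCallback(context -> {
            Throwable last = context.getLastThrowable();
            System.err.println("Send failed after retries: " + last);
            return null;
        });
        return template;
    }
}
```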

The RecoveryCallback is somewhat limited, in that the retry context contains only the lastThrowable field. For more sophisticated use cases, you should use an external RetryTemplate so that you can convey additional information to the RecoveryCallback through the context’s attributes. The following example shows how to do so:

retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            // Store the message so that the RecoveryCallback can retrieve it
            context.setAttribute("message", message);
            // convertAndSend returns void, so return null to satisfy the callback signature
            rabbitTemplate.convertAndSend(exchange, routingKey, message);
            return null;
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });

In this case, you would not inject a RetryTemplate into the RabbitTemplate.

Publishing is Asynchronous — How to Detect Successes and Failures

Publishing messages is an asynchronous mechanism and, by default, messages that cannot be routed are dropped by RabbitMQ. For successful publishing, you can receive an asynchronous confirm, as described in Correlated Publisher Confirms and Returns. Consider two failure scenarios:

  • Publish to an exchange but there is no matching destination queue.

  • Publish to a non-existent exchange.

The first case is covered by publisher returns, as described in Correlated Publisher Confirms and Returns.

For the second case, the message is dropped and no return is generated. The underlying channel is closed with an exception. By default, this exception is logged, but you can register a ChannelListener with the CachingConnectionFactory to obtain notifications of such events. The following example shows how to add a ConnectionListener:

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

You can examine the signal’s reason property to determine the problem that occurred.

To detect the exception on the sending thread, you can setChannelTransacted(true) on the RabbitTemplate and the exception is detected on the txCommit(). However, transactions significantly impede performance, so consider this carefully before enabling transactions for just this one use case.

Correlated Publisher Confirms and Returns

The RabbitTemplate implementation of AmqpTemplate supports publisher confirms and returns.

For returned messages, the template’s mandatory property must be set to true or the mandatory-expression must evaluate to true for a particular message. This feature requires a CachingConnectionFactory that has its publisherReturns property set to true (see Publisher Confirms and Returns). To receive returns, the client registers a RabbitTemplate.ReturnsCallback by calling setReturnsCallback(ReturnsCallback callback). The callback must implement the following method:

void returnedMessage(ReturnedMessage returned);

The ReturnedMessage has the following properties:

  • message - the returned message itself

  • replyCode - a code indicating the reason for the return

  • replyText - a textual reason for the return - e.g. NO_ROUTE

  • exchange - the exchange to which the message was sent

  • routingKey - the routing key that was used

Only one ReturnsCallback is supported by each RabbitTemplate. See also Reply Timeout.
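The pieces described above can be put together as in the following sketch. The class name, the "localhost" host, and the use of System.err are illustrative assumptions; the callback is written as a lambda because it has a single method to implement.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

class ReturnsExample { // hypothetical class name

    static RabbitTemplate templateWithReturns() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        // Returns must be enabled on the connection factory...
        cf.setPublisherReturns(true);

        RabbitTemplate template = new RabbitTemplate(cf);
        // ...and the messages must be published as mandatory.
        template.setMandatory(true);
        template.setReturnsCallback(returned -> System.err.println(
                "Returned: " + returned.getReplyCode() + " " + returned.getReplyText()
                + ", exchange=" + returned.getExchange()
                + ", routingKey=" + returned.getRoutingKey()));
        return template;
    }
}
```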

For publisher confirms (also known as publisher acknowledgements), the template requires a CachingConnectionFactory that has its publisherConfirmType property set to ConfirmType.CORRELATED. To receive confirms, the client registers a RabbitTemplate.ConfirmCallback by calling setConfirmCallback(ConfirmCallback callback). The callback must implement this method:

void confirm(CorrelationData correlationData, boolean ack, String cause);

The CorrelationData is an object supplied by the client when sending the original message. The ack is true for an ack and false for a nack. For nack instances, the cause may contain a reason for the nack, if it is available when the nack is generated. An example is when sending a message to a non-existent exchange. In that case, the broker closes the channel. The reason for the closure is included in the cause. The cause was added in version 1.4.

Only one ConfirmCallback is supported by a RabbitTemplate.
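As a rough illustration, the following sketch enables correlated confirms and sends one message with a CorrelationData. The class name, host, exchange, routing key, and correlation id are made-up values, and the callback only prints the outcome.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

class ConfirmsExample { // hypothetical class name

    static void sendWithConfirm() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        cf.setPublisherConfirmType(ConfirmType.CORRELATED);

        RabbitTemplate template = new RabbitTemplate(cf);
        template.setConfirmCallback((correlationData, ack, cause) -> {
            // correlationData is whatever was supplied with the send (may be null)
            String id = correlationData != null ? correlationData.getId() : "n/a";
            if (ack) {
                System.out.println("Confirmed: " + id);
            }
            else {
                System.err.println("Nacked: " + id + ", cause: " + cause);
            }
        });

        // The CorrelationData ties the eventual confirm back to this send.
        template.convertAndSend("some.exchange", "some.key", "payload",
                new CorrelationData("order-1"));
    }
}
```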

When a rabbit template send operation completes, the channel is closed. This precludes the reception of confirms or returns when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns and confirms proceed normally). When the cache is full, the framework defers the close for up to five seconds, in order to allow time for the confirms and returns to be received. When using confirms, the channel is closed when the last confirm is received. When using only returns, the channel remains open for the full five seconds. We generally recommend setting the connection factory’s channelCacheSize to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed. You can monitor channel usage by using the RabbitMQ management plugin. If you see channels being opened and closed rapidly, you should consider increasing the cache size to reduce overhead on the server.

Before version 2.1, channels enabled for publisher confirms were returned to the cache before the confirms were received. Some other process could check out the channel and perform some operation that causes the channel to close (such as publishing a message to a non-existent exchange). This could cause the confirm to be lost. Version 2.1 and later no longer return the channel to the cache while confirms are outstanding. The RabbitTemplate performs a logical close() on the channel after each operation. In general, this means that only one confirm is outstanding on a channel at a time.

Starting with version 2.2, the callbacks are invoked on one of the connection factory’s executor threads. This is to avoid a potential deadlock if you perform Rabbit operations from within the callback. With previous versions, the callbacks were invoked directly on the amqp-client connection I/O thread; this would deadlock if you perform some RPC operation (such as opening a new channel) since the I/O thread blocks waiting for the result, but the result needs to be processed by the I/O thread itself. With those versions, it was necessary to hand off work (such as sending a message) to another thread within the callback. This is no longer necessary since the framework now hands off the callback invocation to the executor.

The guarantee of receiving a returned message before the ack is still maintained as long as the return callback executes in 60 seconds or less. The confirm is scheduled to be delivered after the return callback exits or after 60 seconds, whichever comes first.

The CorrelationData object has a CompletableFuture that you can use to get the result, instead of using a ConfirmCallback on the template. The following example shows how to configure a CorrelationData instance:

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage returned = cd1.getReturn();
...

Since it is a CompletableFuture<Confirm>, you can either get() the result when ready or use whenComplete() for an asynchronous callback. The Confirm object is a simple bean with two properties: ack and reason (for nack instances). The reason is not populated for broker-generated nack instances. It is populated for nack instances generated by the framework (for example, closing the connection while ack instances are outstanding).
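The asynchronous variant could be sketched as follows. The class name, exchange, and routing key are illustrative, and the template passed in is assumed to have been built on a connection factory with correlated confirms enabled.

```java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

class FutureConfirmExample { // hypothetical class name

    static void send(RabbitTemplate templateWithConfirmsEnabled) {
        CorrelationData cd = new CorrelationData();
        templateWithConfirmsEnabled.convertAndSend("exchange", "routingKey", "foo", cd);

        // Runs when the confirm arrives, without blocking the sending thread.
        cd.getFuture().whenComplete((confirm, failure) -> {
            if (failure != null) {
                System.err.println("Future completed exceptionally: " + failure);
            }
            else if (confirm.isAck()) {
                System.out.println("Message confirmed");
            }
            else {
                // reason is only populated for framework-generated nacks
                System.err.println("Message nacked: " + confirm.getReason());
            }
        });
    }
}
```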

In addition, when both confirms and returns are enabled, the CorrelationData return property is populated with the returned message if it could not be routed to any queue. It is guaranteed that the returned message property is set before the future is set with the ack. CorrelationData.getReturn() returns a ReturnedMessage with properties:

  • message (the returned message)

  • replyCode

  • replyText

  • exchange

  • routingKey

See also Scoped Operations for a simpler mechanism for waiting for publisher confirms.

Scoped Operations

Normally, when using the template, a Channel is checked out of the cache (or created), used for the operation, and returned to the cache for reuse. In a multi-threaded environment, there is no guarantee that the next operation uses the same channel. There may be times, however, where you want to have more control over the use of a channel and ensure that a number of operations are all performed on the same channel.

Starting with version 2.0, a new method called invoke is provided, with an OperationsCallback. Any operations performed within the scope of the callback and on the provided RabbitOperations argument use the same dedicated Channel, which will be closed at the end (not returned to a cache). If the channel is a PublisherCallbackChannel, it is returned to the cache after all confirms have been received (see Correlated Publisher Confirms and Returns).

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

One example of why you might need this is if you wish to use the waitForConfirms() method on the underlying Channel. This method was not previously exposed by the Spring API because the channel is, generally, cached and shared, as discussed earlier. The RabbitTemplate now provides waitForConfirms(long timeout) and waitForConfirmsOrDie(long timeout), which delegate to the dedicated channel used within the scope of the OperationsCallback. The methods cannot be used outside of that scope, because there is no dedicated channel to delegate to.

Note that a higher-level abstraction that lets you correlate confirms to requests is provided elsewhere (see Correlated Publisher Confirms and Returns). If you want only to wait until the broker has confirmed delivery, you can use the technique shown in the following example:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

If you wish RabbitAdmin operations to be invoked on the same channel within the scope of the OperationsCallback, the admin must have been constructed by using the same RabbitTemplate that was used for the invoke operation.

The preceding discussion is moot if the template operations are already performed within the scope of an existing transaction — for example, when running on a transacted listener container thread and performing operations on a transacted template. In that case, the operations are performed on that channel and committed when the thread returns to the container. It is not necessary to use invoke in that scenario.

When using confirms in this way, much of the infrastructure set up for correlating confirms to requests is not really needed (unless returns are also enabled). Starting with version 2.2, the connection factory supports a new property called publisherConfirmType. When this is set to ConfirmType.SIMPLE, the infrastructure is avoided and the confirm processing can be more efficient.
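In Java configuration, selecting the simple confirm type might look like the following sketch. The class name and host are assumptions; the template built on this factory would then be used with invoke and waitForConfirmsOrDie as shown earlier.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class SimpleConfirmsConfig { // hypothetical class name

    @Bean
    CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        // SIMPLE skips the correlation infrastructure; use it with scoped
        // operations that wait for confirms on the dedicated channel.
        cf.setPublisherConfirmType(ConfirmType.SIMPLE);
        return cf;
    }

    @Bean
    RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}
```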

Furthermore, the RabbitTemplate sets the publisherSequenceNumber property in the sent message MessageProperties. If you wish to check (or log or otherwise use) specific confirms, you can do so with an overloaded invoke method, as the following example shows:

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);

These ConfirmCallback objects (for ack and nack instances) are the Rabbit client callbacks, not the template callback.

The following example logs ack and nack instances:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
});

Scoped operations are bound to a thread. See Strict Message Ordering in a Multi-Threaded Environment for a discussion about strict ordering in a multi-threaded environment.

Strict Message Ordering in a Multi-Threaded Environment

The discussion in Scoped Operations applies only when the operations are performed on the same thread.

Consider the following situation:

  • thread-1 sends a message to a queue and hands off work to thread-2

  • thread-2 sends a message to the same queue

Because of the asynchronous nature of RabbitMQ and the use of cached channels, it is not certain that the same channel will be used, and therefore the order in which the messages arrive in the queue is not guaranteed. (In most cases they will arrive in order, but the probability of out-of-order delivery is not zero.) To solve this use case, you can use a bounded channel cache with size 1 (together with a channelCheckoutTimeout) to ensure the messages are always published on the same channel, and order will be guaranteed. To do this, if you have other uses for the connection factory, such as consumers, you should either use a dedicated connection factory for the template, or configure the template to use the publisher connection factory embedded in the main connection factory (see Using a Separate Connection).

This is best illustrated with a simple Spring Boot Application:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

Even though the publishing is performed on two different threads, they will both use the same channel because the cache is capped at a single channel.

Starting with version 2.3.7, the ThreadChannelConnectionFactory supports transferring a thread’s channel(s) to another thread, using the prepareSwitchContext and switchContext methods. The first method returns a context, which is passed to the second thread; that thread then calls the second method. A thread can have either a non-transactional channel or a transactional channel (or one of each) bound to it; you cannot transfer them individually, unless you use two connection factories. An example follows:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}

Once prepareSwitchContext is called, if the current thread performs any more operations, they will be performed on a new channel. It is important to close the thread-bound channel when it is no longer needed.

Messaging Integration

Starting with version 1.4, RabbitMessagingTemplate (built on top of RabbitTemplate) provides an integration with the Spring Framework messaging abstraction — that is, org.springframework.messaging.Message. This lets you send and receive messages by using the spring-messaging Message<?> abstraction. This abstraction is used by other Spring projects, such as Spring Integration and Spring’s STOMP support. There are two message converters involved: one to convert between a spring-messaging Message<?> and Spring AMQP’s Message abstraction and one to convert between Spring AMQP’s Message abstraction and the format required by the underlying RabbitMQ client library. By default, the message payload is converted by the provided RabbitTemplate instance’s message converter. Alternatively, you can inject a custom MessagingMessageConverter with some other payload converter, as the following example shows:

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
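For context, a send through the messaging abstraction might be sketched as follows. The class name, exchange, routing key, and header are illustrative assumptions; the template is declared as a bean so that the container initializes it.

```java
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
class MessagingConfig { // hypothetical class name

    @Bean
    RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        // Wraps the RabbitTemplate; its message converter handles the payload by default.
        return new RabbitMessagingTemplate(rabbitTemplate);
    }

    // Illustrative helper showing a send with a spring-messaging Message<?>.
    static void sendGreeting(RabbitMessagingTemplate messagingTemplate) {
        Message<String> message = MessageBuilder.withPayload("hello")
                .setHeader("traceId", "abc-123")
                .build();
        messagingTemplate.send("some.exchange", "some.key", message);
    }
}
```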

Validated User Id

Starting with version 1.6, the template supports a user-id-expression (userIdExpression when using Java configuration). When a message is sent, the user id property is set (if not already set) after evaluating this expression. The root object for the evaluation is the message to be sent.

The following examples show how to use the user-id-expression attribute:

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

The first example is a literal expression. The second obtains the username property from a connection factory bean in the application context.
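With Java configuration, the literal variant could be sketched as follows. The class name and the injected connectionFactory parameter are assumptions; the expression string is the same one used in the first XML example.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class UserIdConfig { // hypothetical class name

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Same literal expression as the first XML example above.
        template.setUserIdExpressionString("'guest'");
        return template;
    }
}
```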

Using a Separate Connection

Starting with version 2.0.2, you can set the usePublisherConnection property to true to use a different connection to that used by listener containers, when possible. This is to avoid consumers being blocked when a producer is blocked for any reason. The connection factories maintain a second internal connection factory for this purpose; by default it is the same type as the main factory, but can be set explicitly if you wish to use a different factory type for publishing. If the rabbit template is running in a transaction started by the listener container, the container’s channel is used, regardless of this setting.

In general, you should not use a RabbitAdmin with a template that has this set to true. Use the RabbitAdmin constructor that takes a connection factory. If you use the other constructor that takes a template, ensure the template’s property is false. This is because, often, an admin is used to declare queues for listener containers. Using a template that has the property set to true would mean that exclusive queues (such as AnonymousQueue) would be declared on a different connection to that used by listener containers. In that case, the queues cannot be used by the containers.
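The recommendation above can be sketched in Java configuration as follows. The class and bean names are hypothetical; the point is that the admin is built from the connection factory rather than from the publishing template.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class SeparateConnectionConfig { // hypothetical class name

    @Bean
    RabbitTemplate publishingTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Publish on the factory's separate publisher connection.
        template.setUsePublisherConnection(true);
        return template;
    }

    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        // Built from the connection factory, so declarations (including
        // exclusive queues) use the same connection as listener containers.
        return new RabbitAdmin(connectionFactory);
    }
}
```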