Threading and Asynchronous Consumers

A number of different threads are involved with asynchronous consumers.

Threads from the TaskExecutor configured in the SimpleMessageListenerContainer are used to invoke the MessageListener when a new message is delivered by RabbitMQ Client. If not configured, a SimpleAsyncTaskExecutor is used. If you use a pooled executor, you need to ensure the pool size is sufficient to handle the configured concurrency. With the DirectMessageListenerContainer, the MessageListener is invoked directly on a RabbitMQ Client thread. In this case, the taskExecutor is used for the task that monitors the consumers.

When using the default SimpleAsyncTaskExecutor, for the threads the listener is invoked on, the listener container beanName is used in the threadNamePrefix. This is useful for log analysis. We generally recommend always including the thread name in the logging appender configuration. When a TaskExecutor is specifically provided through the taskExecutor property on the container, it is used as is, without modification. It is recommended that you use a similar technique to name the threads created by a custom TaskExecutor bean definition, to aid with thread identification in log messages.

The Executor configured in the CachingConnectionFactory is passed into the RabbitMQ Client when creating the connection, and its threads are used to deliver new messages to the listener container. If this is not configured, the client uses an internal thread pool executor with (at the time of writing) a pool size of Runtime.getRuntime().availableProcessors() * 2 for each connection.

If you have a large number of factories or are using CacheMode.CONNECTION, you may wish to consider using a shared ThreadPoolTaskExecutor with enough threads to satisfy your workload.

With the DirectMessageListenerContainer, you need to ensure that the connection factory is configured with a task executor that has sufficient threads to support your desired concurrency across all listener containers that use that factory. The default pool size (at the time of writing) is Runtime.getRuntime().availableProcessors() * 2.

The RabbitMQ client uses a ThreadFactory to create threads for low-level I/O (socket) operations. To modify this factory, you need to configure the underlying RabbitMQ ConnectionFactory, as discussed in Configuring the Underlying Client Connection Factory.