Threading and Asynchronous Consumers
A number of different threads are involved with asynchronous consumers.
Threads from the TaskExecutor
configured in the SimpleMessageListenerContainer
are used to invoke the MessageListener
when a new message is delivered by RabbitMQ Client
.
If not configured, a SimpleAsyncTaskExecutor
is used.
If you use a pooled executor, you need to ensure the pool size is sufficient to handle the configured concurrency.
With the DirectMessageListenerContainer
, the MessageListener
is invoked directly on a RabbitMQ Client
thread.
In this case, the taskExecutor
is used for the task that monitors the consumers.
When using the default SimpleAsyncTaskExecutor , for the threads the listener is invoked on, the listener container beanName is used in the threadNamePrefix .
This is useful for log analysis.
We generally recommend always including the thread name in the logging appender configuration.
When a TaskExecutor is specifically provided through the taskExecutor property on the container, it is used as is, without modification.
It is recommended that you use a similar technique to name the threads created by a custom TaskExecutor bean definition, to aid with thread identification in log messages.
|
The Executor
configured in the CachingConnectionFactory
is passed into the RabbitMQ Client
when creating the connection, and its threads are used to deliver new messages to the listener container.
If this is not configured, the client uses an internal thread pool executor with (at the time of writing) a pool size of Runtime.getRuntime().availableProcessors() * 2
for each connection.
If you have a large number of factories or are using CacheMode.CONNECTION
, you may wish to consider using a shared ThreadPoolTaskExecutor
with enough threads to satisfy your workload.
With the DirectMessageListenerContainer , you need to ensure that the connection factory is configured with a task executor that has sufficient threads to support your desired concurrency across all listener containers that use that factory.
The default pool size (at the time of writing) is Runtime.getRuntime().availableProcessors() * 2 .
|
The RabbitMQ client
uses a ThreadFactory
to create threads for low-level I/O (socket) operations.
To modify this factory, you need to configure the underlying RabbitMQ ConnectionFactory
, as discussed in Configuring the Underlying Client Connection Factory.