This version is still in development and is not considered stable yet. For the latest stable version, please use Spring AMQP 3.2.2! |
Listener Concurrency
SimpleMessageListenerContainer
By default, the listener container starts a single consumer that receives messages from the queues.
When examining the table in the previous section, you can see a number of properties and attributes that control concurrency.
The simplest is concurrentConsumers
, which creates that (fixed) number of consumers that concurrently process messages.
Prior to version 1.3.0, this was the only setting available and the container had to be stopped and started again to change the setting.
Since version 1.3.0, you can now dynamically adjust the concurrentConsumers
property.
If it is changed while the container is running, consumers are added or removed as necessary to adjust to the new setting.
In addition, a new property called maxConcurrentConsumers
has been added and the container dynamically adjusts the concurrency based on workload.
This works in conjunction with four additional properties: consecutiveActiveTrigger
, startConsumerMinInterval
, consecutiveIdleTrigger
, and stopConsumerMinInterval
.
With the default settings, the algorithm to increase consumers works as follows:
If the maxConcurrentConsumers
has not been reached and an existing consumer is active for ten consecutive cycles AND at least 10 seconds has elapsed since the last consumer was started, a new consumer is started.
A consumer is considered active if it received at least one message in batchSize
* receiveTimeout
milliseconds.
With the default settings, the algorithm to decrease consumers works as follows:
If there are more than concurrentConsumers
running and a consumer detects ten consecutive timeouts (idle) AND the last consumer was stopped at least 60 seconds ago, a consumer is stopped.
The timeout depends on the receiveTimeout
and the batchSize
properties.
A consumer is considered idle if it receives no messages in batchSize
* receiveTimeout
milliseconds.
So, with the default timeout (one second) and a batchSize
of four, stopping a consumer is considered after 40 seconds of idle time (four timeouts correspond to one idle detection).
Practically, consumers can be stopped only if the whole container is idle for some time. This is because the broker shares its work across all the active consumers. |
Each consumer uses a single channel, regardless of the number of configured queues.
Starting with version 2.0, the concurrentConsumers
and maxConcurrentConsumers
properties can be set with the concurrency
property — for example, 2-4
.
Using DirectMessageListenerContainer
With this container, concurrency is based on the configured queues and consumersPerQueue
.
Each consumer for each queue uses a separate channel, and the concurrency is controlled by the rabbit client library.
By default, at the time of writing, it uses a pool of DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2
threads.
You can configure a taskExecutor
to provide the required maximum concurrency.