Class SimpleMessageListenerContainer
- All Implemented Interfaces:
- MessageListenerContainer, Aware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle
- Since:
- 1.0
- Author:
- Mark Pollack, Mark Fisher, Dave Syer, Gary Russell, Artem Bilan, Alex Panchenko, Mat Jaggard, Yansong Ren, Tim Bourquin, Jeonggi Kim, Java4ye
-
Nested Class Summary
Nested classes/interfaces inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
AbstractMessageListenerContainer.DefaultExclusiveConsumerLogger, AbstractMessageListenerContainer.JavaLangErrorHandler, AbstractMessageListenerContainer.SharedConnectionNotInitializedException, AbstractMessageListenerContainer.WrappedTransactionException
-
Field Summary
Fields inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
consumersLock, DEFAULT_DEBATCHING_ENABLED, DEFAULT_PREFETCH_COUNT, DEFAULT_RECOVERY_INTERVAL, DEFAULT_SHUTDOWN_TIMEOUT, lifecycleLock, stopNow
Fields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
logger
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
Constructor, Description
SimpleMessageListenerContainer()
    Default constructor for convenient dependency injection via setters.
SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
    Create a listener container from the connection factory (mandatory).
-
Method Summary
Modifier and Type, Method, Description
protected void addAndStartConsumers(int delta)
    Start up to delta consumers, limited by setMaxConcurrentConsumers(int).
void addQueueNames(String... queueName)
    Add queue(s) to this container's list of queues.
void addQueues(Queue... queue)
    Add queue(s) to this container's list of queues.
protected void adjustConsumers(int deltaArg)
    Adjust consumers depending on delta.
protected BlockingQueueConsumer createBlockingQueueConsumer()
protected void doInitialize()
    Register any invokers within this container.
protected void doStart()
    Re-initializes this container's Rabbit message consumers, if not initialized already.
int getActiveConsumerCount()
protected void handleStartupFailure(BackOffExecution backOffExecution)
protected int initializeConsumers()
boolean isConsumerBatchEnabled()
    Return true if this container is capable of (and configured to) create batches of consumed messages.
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t)
boolean removeQueueNames(String... queueName)
    Remove queues from this container's list of queues.
boolean removeQueues(Queue... queue)
    Remove queues from this container's list of queues.
void setBatchReceiveTimeout(long batchReceiveTimeout)
    The number of milliseconds of timeout for gathering batch messages.
void setBatchSize(int batchSize)
    This property has several functions.
void setConcurrency(String concurrency)
    Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple upper limit String, e.g. "10" (a fixed number of consumers).
void setConcurrentConsumers(int concurrentConsumers)
    Specify the number of concurrent consumers to create.
final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
    If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer.
final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
    If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data, after which we consider stopping a consumer.
void setConsumerBatchEnabled(boolean consumerBatchEnabled)
    Set to true to present a list of messages based on the setBatchSize(int), if the listener supports it.
void setConsumerStartTimeout(long consumerStartTimeout)
    When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if the taskExecutor has insufficient threads to support the container concurrency.
void setDeclarationRetries(int declarationRetries)
    Set the number of retries after passive queue declaration fails.
void setEnforceImmediateAckForManual(boolean enforceImmediateAckForManual)
    Set to true to enforce Channel.basicAck(long, boolean) for AcknowledgeMode.MANUAL when ImmediateAcknowledgeAmqpException is thrown.
final void setExclusive(boolean exclusive)
    Set to true for an exclusive consumer - if true, the concurrency must be 1.
void setMaxConcurrentConsumers(int maxConcurrentConsumers)
    Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'.
void setMissingQueuesFatal(boolean missingQueuesFatal)
    If none of the configured queues is available on the broker, this setting determines whether the condition is fatal.
void setQueueNames(String... queueName)
    Set the name of the queue(s) to receive messages from.
void setReceiveTimeout(long receiveTimeout)
    The time (in milliseconds) that a consumer should wait for data.
void setRetryDeclarationInterval(long retryDeclarationInterval)
    When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
final void setStartConsumerMinInterval(long startConsumerMinInterval)
    If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand.
final void setStopConsumerMinInterval(long stopConsumerMinInterval)
    If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers.
protected final boolean sharedConnectionEnabled()
    Always use a shared Rabbit Connection.
protected void shutdownAndWaitOrCallback(Runnable callback)
String toString()
protected void validateConfiguration()
    Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
Methods inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
actualInvokeListener, addAfterReceivePostProcessors, afterPropertiesSet, causeChainHasImmediateAcknowledgeAmqpException, checkMessageListener, checkMismatchedQueues, configureAdminIfNeeded, debatch, destroy, doInvokeListener, doInvokeListener, doSetPossibleAuthenticationFailureFatal, doShutdown, doStop, executeListener, executeListenerAndHandleException, getAcknowledgeMode, getAdviceChain, getAfterReceivePostProcessors, getAmqpAdmin, getApplicationEventPublisher, getBatchingStrategy, getConnectionFactory, getConsumeDelay, getConsumerArguments, getConsumerTagStrategy, getExclusiveConsumerExceptionLogger, getFailedDeclarationRetryInterval, getIdleEventInterval, getJavaLangErrorHandler, getLastReceive, getMessageAckListener, getMessageListener, getMessagePropertiesConverter, getPhase, getPrefetchCount, getQueueNames, getQueueNamesAsSet, getQueueNamesToQueues, getRecoveryBackOff, getRoutingConnectionFactory, getRoutingLookupKey, getShutdownTimeout, getTaskExecutor, getTransactionAttribute, getTransactionManager, handleListenerException, initialize, initializeProxy, invokeErrorHandler, invokeListener, isActive, isAlwaysRequeueWithTxManagerRollback, isAsyncReplies, isAutoDeclare, isAutoStartup, isChannelLocallyTransacted, isDeBatchingEnabled, isDefaultRequeueRejected, isExclusive, isExposeListenerChannel, isForceCloseChannel, isForceStop, isGlobalQos, isMismatchedQueuesFatal, isMissingQueuesFatal, isMissingQueuesFatalSet, isNoLocal, isPossibleAuthenticationFailureFatal, isPossibleAuthenticationFailureFatalSet, isRunning, isStatefulRetryFatalWithNullMessageId, lazyLoad, prepareHolderForRollback, publishIdleContainerEvent, publishMissingQueueEvent, redeclareElementsIfNecessary, removeAfterReceivePostProcessor, setAcknowledgeMode, setAdviceChain, setAfterReceivePostProcessors, setAlwaysRequeueWithTxManagerRollback, setAmqpAdmin, setApplicationEventPublisher, setAutoDeclare, setAutoStartup, setBatchingStrategy, setConsumeDelay, setConsumerArguments, setConsumerTagStrategy, 
setDeBatchingEnabled, setDefaultRequeueRejected, setErrorHandler, setErrorHandlerLoggerName, setExclusiveConsumerExceptionLogger, setExposeListenerChannel, setFailedDeclarationRetryInterval, setForceCloseChannel, setForceStop, setGlobalQos, setIdleEventInterval, setjavaLangErrorHandler, setLookupKeyQualifier, setMessageAckListener, setMessageListener, setMessagePropertiesConverter, setMismatchedQueuesFatal, setNoLocal, setNotRunning, setObservationConvention, setPhase, setPossibleAuthenticationFailureFatal, setPrefetchCount, setQueues, setRecoveryBackOff, setRecoveryInterval, setShutdownTimeout, setStatefulRetryFatalWithNullMessageId, setTaskExecutor, setTransactionAttribute, setTransactionManager, setupMessageListener, shutdown, shutdown, start, stop, stop, updateLastReceive, wrapToListenerExecutionFailedExceptionIfNeeded
Methods inherited from class org.springframework.amqp.rabbit.listener.ObservableListenerContainer
checkMicrometer, checkObservation, getApplicationContext, getBeanName, getListenerId, getMicrometerHolder, setApplicationContext, setBeanName, setListenerId, setMicrometerEnabled, setMicrometerTags, setObservationEnabled
Methods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getObservationRegistry, getTransactionalResourceHolder, isChannelTransacted, obtainObservationRegistry, setChannelTransacted, setConnectionFactory
-
Field Details
-
DEFAULT_RECEIVE_TIMEOUT
public static final long DEFAULT_RECEIVE_TIMEOUT
-
-
Constructor Details
-
SimpleMessageListenerContainer
public SimpleMessageListenerContainer()
Default constructor for convenient dependency injection via setters.
-
SimpleMessageListenerContainer
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Create a listener container from the connection factory (mandatory).
- Parameters:
connectionFactory - the ConnectionFactory
-
-
Method Details
-
setConcurrentConsumers
public void setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create. Default is 1.
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. Cannot be more than maxConcurrentConsumers (if set).
- Parameters:
concurrentConsumers - the minimum number of consumers to create.
-
setMaxConcurrentConsumers
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Consumers will be added on demand. Cannot be less than concurrentConsumers.
- Parameters:
maxConcurrentConsumers - the maximum number of consumers.
-
setConcurrency
public void setConcurrency(String concurrency)
Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple upper limit String, e.g. "10" (a fixed number of consumers).
This listener container will always hold on to the minimum number of consumers (setConcurrentConsumers(int)) and will slowly scale up to the maximum number of consumers (setMaxConcurrentConsumers(int)) in case of increasing load.
- Parameters:
concurrency - the concurrency.
- Since:
- 2.0
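The two accepted forms of the concurrency string can be illustrated with a small stand-alone parser. This is a minimal sketch and not the container's own code: the class `ConcurrencyRange` and its `parse` method are hypothetical names, and the sketch assumes that a single number sets both bounds to the same value, following the "fixed number of consumers" wording above.

```java
/** Hypothetical helper: models the "lower-upper" concurrency string described above. */
final class ConcurrencyRange {
    final int lower;
    final int upper;

    ConcurrencyRange(int lower, int upper) {
        if (lower < 1 || upper < lower) {
            throw new IllegalArgumentException("Expected 1 <= lower <= upper, got " + lower + "-" + upper);
        }
        this.lower = lower;
        this.upper = upper;
    }

    /** Parses "5-10" into (5, 10) and "10" into (10, 10). */
    static ConcurrencyRange parse(String concurrency) {
        String value = concurrency.trim();
        int separator = value.indexOf('-');
        if (separator < 0) {
            // Single number: assumed here to mean a fixed number of consumers.
            int fixed = Integer.parseInt(value);
            return new ConcurrencyRange(fixed, fixed);
        }
        int lower = Integer.parseInt(value.substring(0, separator).trim());
        int upper = Integer.parseInt(value.substring(separator + 1).trim());
        return new ConcurrencyRange(lower, upper);
    }
}
```

In this model, `ConcurrencyRange.parse("5-10")` corresponds to a minimum of 5 and a maximum of 10 consumers, while `ConcurrencyRange.parse("10")` corresponds to 10 for both.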
-
setExclusive
public final void setExclusive(boolean exclusive)
Set to true for an exclusive consumer - if true, the concurrency must be 1.
- Overrides:
setExclusive in class AbstractMessageListenerContainer
- Parameters:
exclusive - true for an exclusive consumer.
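The constraints stated for setConcurrentConsumers, setMaxConcurrentConsumers and setExclusive can be read together as one consistency check. The following is a sketch under stated assumptions: `ConcurrencyRules` is a hypothetical class, the messages are illustrative, and a `null` maximum stands for "never set".

```java
/** Hypothetical checker for the documented concurrency constraints; not part of the library. */
final class ConcurrencyRules {

    /**
     * @param concurrentConsumers the minimum number of consumers
     * @param maxConcurrentConsumers the upper limit, or null if it was never set
     * @param exclusive whether an exclusive consumer was requested
     * @return null if the combination is allowed, otherwise a short reason
     */
    static String violation(int concurrentConsumers, Integer maxConcurrentConsumers, boolean exclusive) {
        if (maxConcurrentConsumers != null && concurrentConsumers > maxConcurrentConsumers) {
            return "concurrentConsumers cannot be more than maxConcurrentConsumers";
        }
        if (exclusive && (concurrentConsumers != 1
                || (maxConcurrentConsumers != null && maxConcurrentConsumers != 1))) {
            return "an exclusive consumer requires a concurrency of 1";
        }
        return null;
    }
}
```

For example, 5 minimum and 10 maximum consumers pass the check, whereas an exclusive consumer combined with 2 concurrent consumers does not.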
-
setStartConsumerMinInterval
public final void setStartConsumerMinInterval(long startConsumerMinInterval)
If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. Default is 10000 (10 seconds).
- Parameters:
startConsumerMinInterval - The minimum interval between new consumer starts.
-
setStopConsumerMinInterval
public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. Default is 60000 (1 minute).
- Parameters:
stopConsumerMinInterval - The minimum interval between consumer stops.
-
setConsecutiveActiveTrigger
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. This is impacted by the batchSize. Default is 10 consecutive messages.
- Parameters:
consecutiveActiveTrigger - The number of consecutive receives to trigger a new consumer.
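The interplay between the consecutive-active counter, the minimum start interval and the maximum consumer count can be sketched as a small state machine. This is a simplified model and the real container logic may differ in detail: `ScaleUpTracker` and `onCycle` are hypothetical names, timestamps are passed in explicitly, and the sketch assumes the first start is never delayed by the interval.

```java
/** Hypothetical model of the documented scale-up rule; not the container's implementation. */
final class ScaleUpTracker {
    private final int consecutiveActiveTrigger;    // documented default: 10
    private final long startConsumerMinIntervalMs; // documented default: 10000
    private int consecutiveActive;
    private long lastStartMs;
    private boolean startedBefore;

    ScaleUpTracker(int consecutiveActiveTrigger, long startConsumerMinIntervalMs) {
        this.consecutiveActiveTrigger = consecutiveActiveTrigger;
        this.startConsumerMinIntervalMs = startConsumerMinIntervalMs;
    }

    /** Records one cycle of a consumer and reports whether starting a new consumer should be considered. */
    boolean onCycle(boolean active, long nowMs, int currentConsumers, int maxConcurrentConsumers) {
        if (!active) {
            consecutiveActive = 0; // one idle cycle resets the counter
            return false;
        }
        consecutiveActive++;
        boolean belowMax = currentConsumers < maxConcurrentConsumers;
        boolean intervalElapsed = !startedBefore || nowMs - lastStartMs >= startConsumerMinIntervalMs;
        if (consecutiveActive >= consecutiveActiveTrigger && belowMax && intervalElapsed) {
            consecutiveActive = 0;
            lastStartMs = nowMs;
            startedBefore = true;
            return true;
        }
        return false;
    }
}
```

With a trigger of 3, the third consecutive active cycle is the first one for which `onCycle` returns true, provided the maximum has not been reached.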
-
setConsecutiveIdleTrigger
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data, after which we consider stopping a consumer. The idle time is effectively receiveTimeout * batchSize * this value, because the consumer thread waits for a message for up to receiveTimeout, up to batchSize times. Default is 10 consecutive idles.
- Parameters:
consecutiveIdleTrigger - The number of consecutive timeouts to trigger stopping a consumer.
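The idle-time product described above can be worked through as plain arithmetic. The helper below is a hypothetical illustration only (`IdleTimeEstimate` is not a library class); it uses the defaults documented on this page for receiveTimeout (1000 ms), batchSize (1) and consecutiveIdleTrigger (10).

```java
/** Hypothetical helper that evaluates the idle-time product described above. */
final class IdleTimeEstimate {

    /** Returns receiveTimeout * batchSize * consecutiveIdleTrigger, in milliseconds. */
    static long effectiveIdleMillis(long receiveTimeoutMs, int batchSize, int consecutiveIdleTrigger) {
        long perAttempt = Math.multiplyExact(receiveTimeoutMs, (long) batchSize);
        return Math.multiplyExact(perAttempt, (long) consecutiveIdleTrigger);
    }

    public static void main(String[] args) {
        // Documented defaults: 1000 ms * 1 * 10.
        System.out.println(effectiveIdleMillis(1000, 1, 10));  // prints 10000
        // Raising batchSize to 20 stretches the idle time accordingly.
        System.out.println(effectiveIdleMillis(1000, 20, 10)); // prints 200000
    }
}
```

The second call shows why a large batchSize can make idle consumers linger much longer than the trigger value alone suggests.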
-
setReceiveTimeout
public void setReceiveTimeout(long receiveTimeout)
The time (in milliseconds) that a consumer should wait for data. Default 1000 (1 second).
- Parameters:
receiveTimeout - the timeout.
-
setBatchReceiveTimeout
public void setBatchReceiveTimeout(long batchReceiveTimeout)
The number of milliseconds of timeout for gathering batch messages. It limits the time to wait to fill batchSize. Default is 0 (no timeout).
- Parameters:
batchReceiveTimeout - the timeout for gathering batch messages.
- Since:
- 3.1.2
-
setBatchSize
public void setBatchSize(int batchSize)
This property has several functions.
When the channel is transacted, it determines how many messages to process in a single transaction. It should be less than or equal to the prefetch count.
It also affects how often acks are sent when using AcknowledgeMode.AUTO - one ack per batchSize.
Finally, when setConsumerBatchEnabled(boolean) is true, it determines how many records to include in the batch as long as sufficient messages arrive within setReceiveTimeout(long).
IMPORTANT: The batch size represents the number of physical messages received. If AbstractMessageListenerContainer.setDeBatchingEnabled(boolean) is true and a message is a batch created by a producer, the actual number of messages received by the listener will be larger than this batch size.
Default is 1.
- Parameters:
batchSize - the batch size
- Since:
- 2.2
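How a batch fills "as long as sufficient messages arrive within" the receive timeout can be modelled with a plain in-memory queue. This is a sketch under stated assumptions, not the container's code: `BatchGatherSketch` and `gather` are hypothetical names, the `BlockingQueue` stands in for messages already delivered to the consumer, and batchReceiveTimeout is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of batch gathering; the in-memory queue stands in for delivered messages. */
final class BatchGatherSketch {

    /** Collects up to batchSize messages, stopping early if one poll waits receiveTimeoutMs without data. */
    static List<String> gather(BlockingQueue<String> delivered, int batchSize, long receiveTimeoutMs) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            String message;
            try {
                message = delivered.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop gathering
                break;
            }
            if (message == null) {
                break; // timed out: hand over the partial (possibly empty) batch
            }
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> delivered =
                new LinkedBlockingQueue<>(List.of("m1", "m2", "m3", "m4", "m5"));
        System.out.println(gather(delivered, 3, 50)); // prints [m1, m2, m3]
        System.out.println(gather(delivered, 3, 50)); // prints [m4, m5]
    }
}
```

The second batch is short because the queue runs dry before batchSize is reached, which is the situation the receive timeout bounds.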
-
setConsumerBatchEnabled
public void setConsumerBatchEnabled(boolean consumerBatchEnabled)
Set to true to present a list of messages based on the setBatchSize(int), if the listener supports it. This will coerce deBatchingEnabled to true as well.
- Parameters:
consumerBatchEnabled - true to create message batches in the container.
- Since:
- 2.2
-
isConsumerBatchEnabled
public boolean isConsumerBatchEnabled()
Description copied from interface: MessageListenerContainer
Return true if this container is capable of (and configured to) create batches of consumed messages.
- Returns:
- true if enabled.
-
setMissingQueuesFatal
public void setMissingQueuesFatal(boolean missingQueuesFatal)
If none of the configured queues is available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
When true, if the queues are removed while the container is running, the container is stopped.
Defaults to true for this container.
- Overrides:
setMissingQueuesFatal in class AbstractMessageListenerContainer
- Parameters:
missingQueuesFatal - the missingQueuesFatal to set.
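The three outcomes described for this setting can be summarised as a small decision function. The names below (`MissingQueuesPolicy`, `Outcome`, `whenAllQueuesMissing`) are hypothetical and serve only to restate the text; they are not part of the library.

```java
/** Hypothetical summary of the documented outcomes; names are illustrative only. */
final class MissingQueuesPolicy {

    enum Outcome { CONTEXT_REFRESH_FAILS, CONTAINER_STOPS, KEEPS_TRYING }

    /** Maps the flag and the container state to the outcome described in the text. */
    static Outcome whenAllQueuesMissing(boolean missingQueuesFatal, boolean containerAlreadyRunning) {
        if (!missingQueuesFatal) {
            return Outcome.KEEPS_TRYING; // not fatal: keep attempting to start consumers
        }
        return containerAlreadyRunning
                ? Outcome.CONTAINER_STOPS        // queues removed while running
                : Outcome.CONTEXT_REFRESH_FAILS; // queues missing during startup
    }
}
```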
-
setQueueNames
public void setQueueNames(String... queueName)
Description copied from class: AbstractMessageListenerContainer
Set the name of the queue(s) to receive messages from.
- Specified by:
setQueueNames in interface MessageListenerContainer
- Overrides:
setQueueNames in class AbstractMessageListenerContainer
- Parameters:
queueName - the desired queueName(s) (cannot be null)
-
addQueueNames
public void addQueueNames(String... queueName)
Add queue(s) to this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. The queue must exist to avoid problems when restarting the consumers.
- Overrides:
addQueueNames in class AbstractMessageListenerContainer
- Parameters:
queueName - The queue to add.
-
removeQueueNames
public boolean removeQueueNames(String... queueName)
Remove queues from this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. At least one queue must remain.
- Overrides:
removeQueueNames in class AbstractMessageListenerContainer
- Parameters:
queueName - The queue to remove.
- Returns:
- the boolean result of removal on the target queueNames List.
-
addQueues
public void addQueues(Queue... queue)
Add queue(s) to this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. The queue must exist to avoid problems when restarting the consumers.
- Overrides:
addQueues in class AbstractMessageListenerContainer
- Parameters:
queue - The queue to add.
-
removeQueues
public boolean removeQueues(Queue... queue)
Remove queues from this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. At least one queue must remain.
- Overrides:
removeQueues in class AbstractMessageListenerContainer
- Parameters:
queue - The queue to remove.
- Returns:
- the boolean result of removal on the target queueNames List.
-
setDeclarationRetries
public void setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
- Parameters:
declarationRetries - The number of retries, default 3.
- Since:
- 1.3.9
-
setRetryDeclarationInterval
public void setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
- Parameters:
retryDeclarationInterval - the interval, default 60000.
- Since:
- 1.3.9
-
setConsumerStartTimeout
public void setConsumerStartTimeout(long consumerStartTimeout)
When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if the taskExecutor has insufficient threads to support the container concurrency. Default 60000.
- Parameters:
consumerStartTimeout - the timeout.
- Since:
- 1.7.5
-
setEnforceImmediateAckForManual
public void setEnforceImmediateAckForManual(boolean enforceImmediateAckForManual)
Set to true to enforce Channel.basicAck(long, boolean) for AcknowledgeMode.MANUAL when ImmediateAcknowledgeAmqpException is thrown. This may be an interim solution, intended to avoid breaking behavior in the current minor version.
- Parameters:
enforceImmediateAckForManual - the flag to ack the message for MANUAL mode on ImmediateAcknowledgeAmqpException
- Since:
- 3.1.2
-
validateConfiguration
protected void validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
- Overrides:
validateConfiguration in class AbstractMessageListenerContainer
-
doInitialize
protected void doInitialize()
Description copied from class: AbstractMessageListenerContainer
Register any invokers within this container.
Subclasses need to implement this method for their specific invoker management process.
- Specified by:
doInitialize in class AbstractMessageListenerContainer
-
getActiveConsumerCount
-
doStart
protected void doStart()
Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer to this container's task executor.
- Overrides:
doStart in class AbstractMessageListenerContainer
-
shutdownAndWaitOrCallback
protected void shutdownAndWaitOrCallback(Runnable callback)
- Overrides:
shutdownAndWaitOrCallback in class AbstractMessageListenerContainer
-
initializeConsumers
protected int initializeConsumers()
-
adjustConsumers
protected void adjustConsumers(int deltaArg)
Adjust consumers depending on delta.
- Parameters:
deltaArg - a negative value increases, positive decreases.
- Since:
- 1.7.8
-
addAndStartConsumers
protected void addAndStartConsumers(int delta)
Start up to delta consumers, limited by setMaxConcurrentConsumers(int).
- Parameters:
delta - the consumers to add.
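The sign convention of adjustConsumers and the cap applied by addAndStartConsumers can be illustrated with simple arithmetic on a consumer count. This is a hypothetical model (`ConsumerCountModel` is not a library class) that tracks only a number: a `null` maximum stands for "no limit set", and the decrease branch simply subtracts, which is an assumption because the text does not say what limits apply when stopping consumers.

```java
/** Hypothetical arithmetic model of the two methods above; it tracks only a consumer count. */
final class ConsumerCountModel {

    /** Mirrors "start up to delta consumers, limited by maxConcurrentConsumers". */
    static int afterAdd(int current, int delta, Integer maxConcurrentConsumers) {
        int target = current + Math.max(delta, 0);
        if (maxConcurrentConsumers == null) {
            return target;
        }
        // Never exceed the maximum, and never drop below the current count.
        return Math.max(current, Math.min(target, maxConcurrentConsumers));
    }

    /** Applies the documented sign convention: a negative deltaArg increases, a positive one decreases. */
    static int afterAdjust(int current, int deltaArg, Integer maxConcurrentConsumers) {
        if (deltaArg < 0) {
            return afterAdd(current, -deltaArg, maxConcurrentConsumers);
        }
        return Math.max(current - deltaArg, 0); // assumption: plain subtraction, floored at zero
    }
}
```

For instance, asking to add 5 consumers to 2 running ones with a maximum of 4 yields 4 in this model, and `afterAdjust(3, -2, 10)` yields 5.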
-
createBlockingQueueConsumer
-
handleStartupFailure
-
publishConsumerFailedEvent
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t)
- Overrides:
publishConsumerFailedEvent in class AbstractMessageListenerContainer
-
toString
-