public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
AbstractMessageListenerContainer.SharedConnectionNotInitializedException, AbstractMessageListenerContainer.WrappedTransactionException
Modifier and Type | Field and Description |
---|---|
static long |
DEFAULT_RECEIVE_TIMEOUT |
consumersMonitor, DEFAULT_DEBATCHING_ENABLED, DEFAULT_PREFETCH_COUNT, DEFAULT_RECOVERY_INTERVAL, DEFAULT_SHUTDOWN_TIMEOUT
logger
Constructor and Description |
---|
SimpleMessageListenerContainer()
Default constructor for convenient dependency injection via setters.
|
SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Create a listener container from the connection factory (mandatory).
|
Modifier and Type | Method and Description |
---|---|
protected void |
addAndStartConsumers(int delta)
Start up to delta consumers, limited by
setMaxConcurrentConsumers(int) . |
void |
addQueueNames(java.lang.String... queueName)
Add queue(s) to this container's list of queues.
|
protected void |
adjustConsumers(int delta)
Adjust consumers depending on delta.
|
protected BlockingQueueConsumer |
createBlockingQueueConsumer() |
protected void |
doInitialize()
Register any invokers within this container.
|
protected void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Re-initializes this container's Rabbit message consumers, if not initialized already.
|
int |
getActiveConsumerCount() |
protected void |
handleStartupFailure(org.springframework.util.backoff.BackOffExecution backOffExecution)
Wait for a period determined by the
recoveryInterval
or AbstractMessageListenerContainer.setRecoveryBackOff(BackOff) to give the container a
chance to recover from consumer startup failure, e.g. |
protected int |
initializeConsumers() |
protected void |
publishConsumerFailedEvent(java.lang.String reason,
boolean fatal,
java.lang.Throwable t) |
boolean |
removeQueueNames(java.lang.String... queueName)
Remove queues from this container's list of queues.
|
void |
setConcurrency(java.lang.String concurrency)
Specify concurrency limits via a "lower-upper" String, e.g.
|
void |
setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create.
|
void |
setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
maxConcurrentConsumers has not been reached, specifies the number of
consecutive cycles when a single consumer was active, in order to consider
starting a new consumer. |
void |
setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
the number of consumers exceeds concurrentConsumers , specifies the
number of consecutive receive attempts that return no data; after which we consider
stopping a consumer. |
void |
setConsumerStartTimeout(long consumerStartTimeout)
When starting a consumer, if this time (ms) elapses before the consumer starts, an
error log is written; one possible cause would be if the
taskExecutor has
insufficient threads to support the container concurrency. |
void |
setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
|
void |
setExclusive(boolean exclusive)
Set to true for an exclusive consumer - if true, the concurrency must be 1.
|
void |
setMaxConcurrentConsumers(int maxConcurrentConsumers)
Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'.
|
void |
setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting
determines whether the condition is fatal.
|
void |
setQueueNames(java.lang.String... queueName)
Set the name of the queue(s) to receive messages from.
|
void |
setReceiveTimeout(long receiveTimeout)
The time (in milliseconds) that a consumer should wait for data.
|
void |
setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only
a subset of the queues were available (milliseconds).
|
void |
setStartConsumerMinInterval(long startConsumerMinInterval)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
maxConcurrentConsumers has not been reached, specifies
the minimum time (milliseconds) between starting new consumers on demand. |
void |
setStopConsumerMinInterval(long stopConsumerMinInterval)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
the number of consumers exceeds concurrentConsumers , specifies the
minimum time (milliseconds) between stopping idle consumers. |
void |
setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional).
|
protected boolean |
sharedConnectionEnabled()
Always use a shared Rabbit Connection.
|
java.lang.String |
toString() |
protected void |
validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
consumers.
|
actualInvokeListener, addQueues, afterPropertiesSet, causeChainHasImmediateAcknowledgeAmqpException, checkMessageListener, checkMismatchedQueues, configureAdminIfNeeded, destroy, doInvokeListener, doInvokeListener, doStop, executeListener, getAcknowledgeMode, getAdviceChain, getApplicationContext, getApplicationEventPublisher, getBeanName, getConnectionFactory, getConsumerArguments, getConsumerTagStrategy, getExclusiveConsumerExceptionLogger, getFailedDeclarationRetryInterval, getIdleEventInterval, getLastReceive, getListenerId, getMessageConverter, getMessageListener, getMessagePropertiesConverter, getPhase, getPrefetchCount, getQueueNames, getQueueNamesAsSet, getRabbitAdmin, getRecoveryBackOff, getRoutingConnectionFactory, getRoutingLookupKey, getShutdownTimeout, getTaskExecutor, getTransactionAttribute, getTransactionManager, handleListenerException, initialize, initializeProxy, invokeErrorHandler, invokeListener, isActive, isAlwaysRequeueWithTxManagerRollback, isAutoDeclare, isAutoStartup, isChannelLocallyTransacted, isDefaultRequeueRejected, isExclusive, isExposeListenerChannel, isForceCloseChannel, isMismatchedQueuesFatal, isMissingQueuesFatal, isMissingQueuesFatalSet, isNoLocal, isPossibleAuthenticationFailureFatal, isPossibleAuthenticationFailureFatalSet, isRunning, isStatefulRetryFatalWithNullMessageId, prepareHolderForRollback, publishIdleContainerEvent, redeclareElementsIfNecessary, removeQueues, setAcknowledgeMode, setAdviceChain, setAfterReceivePostProcessors, setAlwaysRequeueWithTxManagerRollback, setApplicationContext, setApplicationEventPublisher, setAutoDeclare, setAutoStartup, setBeanName, setChannelAwareMessageListener, setConsumerArguments, setConsumerTagStrategy, setDeBatchingEnabled, setDefaultRequeueRejected, setErrorHandler, setErrorHandlerLoggerName, setExclusiveConsumerExceptionLogger, setExposeListenerChannel, setFailedDeclarationRetryInterval, setForceCloseChannel, setIdleEventInterval, setListenerId, setLookupKeyQualifier, setMessageConverter, setMessageListener, setMessageListener, setMessagePropertiesConverter, setMismatchedQueuesFatal, setNoLocal, setPhase, setPossibleAuthenticationFailureFatal, setPrefetchCount, setQueues, setRabbitAdmin, setRecoveryBackOff, setRecoveryInterval, setShutdownTimeout, setStatefulRetryFatalWithNullMessageId, setTaskExecutor, setTransactionAttribute, setTransactionManager, setupMessageListener, shutdown, start, stop, stop, updateLastReceive, wrapToListenerExecutionFailedExceptionIfNeeded
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public static final long DEFAULT_RECEIVE_TIMEOUT
public SimpleMessageListenerContainer()
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
connectionFactory
- the ConnectionFactory
public void setConcurrentConsumers(int concurrentConsumers)
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
general, stick with 1 consumer for low-volume queues. Cannot be more than maxConcurrentConsumers
(if set).
concurrentConsumers
- the minimum number of consumers to create.setMaxConcurrentConsumers(int)
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
concurrentConsumers
.maxConcurrentConsumers
- the maximum number of consumers.setConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
,
setStopConsumerMinInterval(long)
,
setConsecutiveActiveTrigger(int)
,
setConsecutiveIdleTrigger(int)
public void setConcurrency(java.lang.String concurrency)
This listener container will always hold on to the minimum number of consumers
(setConcurrentConsumers(int)
) and will slowly scale up to the maximum number
of consumers setMaxConcurrentConsumers(int)
in case of increasing load.
concurrency
- the concurrency.public final void setExclusive(boolean exclusive)
setExclusive
in class AbstractMessageListenerContainer
exclusive
- true for an exclusive consumer.public final void setStartConsumerMinInterval(long startConsumerMinInterval)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
maxConcurrentConsumers
has not been reached, specifies
the minimum time (milliseconds) between starting new consumers on demand. Default is 10000
(10 seconds).startConsumerMinInterval
- The minimum interval between new consumer starts.setMaxConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
the number of consumers exceeds concurrentConsumers
, specifies the
minimum time (milliseconds) between stopping idle consumers. Default is 60000
(1 minute).stopConsumerMinInterval
- The minimum interval between consumer stops.setMaxConcurrentConsumers(int)
,
setStopConsumerMinInterval(long)
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
maxConcurrentConsumers
has not been reached, specifies the number of
consecutive cycles when a single consumer was active, in order to consider
starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
This is impacted by the txSize
.
Default is 10 consecutive messages.consecutiveActiveTrigger
- The number of consecutive receives to trigger a new consumer.setMaxConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
,
setTxSize(int)
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
the number of consumers exceeds concurrentConsumers
, specifies the
number of consecutive receive attempts that return no data; after which we consider
stopping a consumer. The idle time is effectively
receiveTimeout
* txSize
* this value because the consumer thread waits for
a message for up to receiveTimeout
up to txSize
times.
Default is 10 consecutive idles.consecutiveIdleTrigger
- The number of consecutive timeouts to trigger stopping a consumer.setMaxConcurrentConsumers(int)
,
setStopConsumerMinInterval(long)
,
setReceiveTimeout(long)
,
setTxSize(int)
public void setReceiveTimeout(long receiveTimeout)
receiveTimeout
- the timeout.setConsecutiveIdleTrigger(int)
public void setTxSize(int txSize)
the prefetch count
. Also affects
how often acks are sent when using AcknowledgeMode.AUTO
- one ack per txSize. Default is 1.txSize
- the transaction sizepublic void setMissingQueuesFatal(boolean missingQueuesFatal)
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
When true, if the queues are removed while the container is running, the container is stopped.
Defaults to true for this container.
setMissingQueuesFatal
in class AbstractMessageListenerContainer
missingQueuesFatal
- the missingQueuesFatal to set.AbstractMessageListenerContainer.setAutoDeclare(boolean)
public void setQueueNames(java.lang.String... queueName)
AbstractMessageListenerContainer
setQueueNames
in class AbstractMessageListenerContainer
queueName
- the desired queueName(s) (can not be null
)public void addQueueNames(java.lang.String... queueName)
addQueueNames
in class AbstractMessageListenerContainer
queueName
- The queue to add.public boolean removeQueueNames(java.lang.String... queueName)
removeQueueNames
in class AbstractMessageListenerContainer
queueName
- The queue to remove.queueNames
List.public void setDeclarationRetries(int declarationRetries)
declarationRetries
- The number of retries, default 3.AbstractMessageListenerContainer.setFailedDeclarationRetryInterval(long)
public void setRetryDeclarationInterval(long retryDeclarationInterval)
retryDeclarationInterval
- the interval, default 60000.public void setConsumerStartTimeout(long consumerStartTimeout)
taskExecutor
has
insufficient threads to support the container concurrency. Default 60000.consumerStartTimeout
- the timeout.protected void validateConfiguration()
validateConfiguration
in class AbstractMessageListenerContainer
protected final boolean sharedConnectionEnabled()
protected void doInitialize() throws java.lang.Exception
AbstractMessageListenerContainer
Subclasses need to implement this method for their specific invoker management process.
doInitialize
in class AbstractMessageListenerContainer
java.lang.Exception
- Any Exception.@ManagedMetric(metricType=GAUGE) public int getActiveConsumerCount()
protected void doStart() throws java.lang.Exception
doStart
in class AbstractMessageListenerContainer
java.lang.Exception
- Any Exception.protected void doShutdown()
AbstractMessageListenerContainer
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
doShutdown
in class AbstractMessageListenerContainer
AbstractMessageListenerContainer.shutdown()
protected int initializeConsumers()
protected void adjustConsumers(int delta)
delta
- a negative value increases, positive decreases.protected void addAndStartConsumers(int delta)
setMaxConcurrentConsumers(int)
.delta
- the consumers to add.protected BlockingQueueConsumer createBlockingQueueConsumer()
protected void handleStartupFailure(org.springframework.util.backoff.BackOffExecution backOffExecution) throws java.lang.Exception
recoveryInterval
or AbstractMessageListenerContainer.setRecoveryBackOff(BackOff)
to give the container a
chance to recover from consumer startup failure, e.g. if the broker is down.backOffExecution
- the BackOffExecution to get the recoveryInterval
java.lang.Exception
- if the shared connection still can't be establishedprotected void publishConsumerFailedEvent(java.lang.String reason, boolean fatal, java.lang.Throwable t)
publishConsumerFailedEvent
in class AbstractMessageListenerContainer
public java.lang.String toString()
toString
in class java.lang.Object