public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
AbstractMessageListenerContainer.SharedConnectionNotInitializedException, AbstractMessageListenerContainer.WrappedTransactionException
Modifier and Type | Field and Description |
---|---|
static long | DEFAULT_RECEIVE_TIMEOUT |
consumersMonitor, DEFAULT_DEBATCHING_ENABLED, DEFAULT_PREFETCH_COUNT, DEFAULT_RECOVERY_INTERVAL, DEFAULT_SHUTDOWN_TIMEOUT
logger
DEFAULT_PHASE
Constructor and Description |
---|
SimpleMessageListenerContainer() - Default constructor for convenient dependency injection via setters. |
SimpleMessageListenerContainer(ConnectionFactory connectionFactory) - Create a listener container from the connection factory (mandatory). |
Modifier and Type | Method and Description |
---|---|
protected void | addAndStartConsumers(int delta) - Start up to delta consumers, limited by setMaxConcurrentConsumers(int). |
void | addQueueNames(String... queueName) - Add queue(s) to this container's list of queues. |
void | addQueues(Queue... queue) - Add queue(s) to this container's list of queues. |
protected void | adjustConsumers(int deltaArg) - Adjust consumers depending on delta. |
protected BlockingQueueConsumer | createBlockingQueueConsumer() |
protected void | doInitialize() - Register any invokers within this container. |
protected void | doShutdown() - Close the registered invokers. |
protected void | doStart() - Re-initializes this container's Rabbit message consumers, if not initialized already. |
int | getActiveConsumerCount() |
protected void | handleStartupFailure(BackOffExecution backOffExecution) |
protected int | initializeConsumers() |
protected void | publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) |
boolean | removeQueueNames(String... queueName) - Remove queues from this container's list of queues. |
boolean | removeQueues(Queue... queue) - Remove queues from this container's list of queues. |
void | setConcurrency(String concurrency) - Specify concurrency limits via a "lower-upper" String, e.g. |
void | setConcurrentConsumers(int concurrentConsumers) - Specify the number of concurrent consumers to create. |
void | setConsecutiveActiveTrigger(int consecutiveActiveTrigger) - If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. |
void | setConsecutiveIdleTrigger(int consecutiveIdleTrigger) - If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data, after which we consider stopping a consumer. |
void | setConsumerStartTimeout(long consumerStartTimeout) - When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if the taskExecutor has insufficient threads to support the container concurrency. |
void | setDeclarationRetries(int declarationRetries) - Set the number of retries after passive queue declaration fails. |
void | setExclusive(boolean exclusive) - Set to true for an exclusive consumer - if true, the concurrency must be 1. |
void | setMaxConcurrentConsumers(int maxConcurrentConsumers) - Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. |
void | setMissingQueuesFatal(boolean missingQueuesFatal) - If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal. |
void | setQueueNames(String... queueName) - Set the name of the queue(s) to receive messages from. |
void | setReceiveTimeout(long receiveTimeout) - The time (in milliseconds) that a consumer should wait for data. |
void | setRetryDeclarationInterval(long retryDeclarationInterval) - When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds). |
void | setStartConsumerMinInterval(long startConsumerMinInterval) - If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. |
void | setStopConsumerMinInterval(long stopConsumerMinInterval) - If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. |
void | setTxSize(int txSize) - Tells the container how many messages to process in a single transaction (if the channel is transactional). |
protected boolean | sharedConnectionEnabled() - Always use a shared Rabbit Connection. |
String | toString() |
protected void | validateConfiguration() - Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers. |
actualInvokeListener, addAfterReceivePostProcessors, afterPropertiesSet, causeChainHasImmediateAcknowledgeAmqpException, checkMessageListener, checkMismatchedQueues, configureAdminIfNeeded, destroy, doInvokeListener, doInvokeListener, doStop, executeListener, getAcknowledgeMode, getAdviceChain, getAmqpAdmin, getApplicationContext, getApplicationEventPublisher, getBeanName, getConnectionFactory, getConsumerArguments, getConsumerTagStrategy, getExclusiveConsumerExceptionLogger, getFailedDeclarationRetryInterval, getIdleEventInterval, getLastReceive, getListenerId, getMessageConverter, getMessageListener, getMessagePropertiesConverter, getPhase, getPrefetchCount, getQueueNames, getQueueNamesAsSet, getQueueNamesToQueues, getRabbitAdmin, getRecoveryBackOff, getRoutingConnectionFactory, getRoutingLookupKey, getShutdownTimeout, getTaskExecutor, getTransactionAttribute, getTransactionManager, handleListenerException, initialize, initializeProxy, invokeErrorHandler, invokeListener, isActive, isAlwaysRequeueWithTxManagerRollback, isAutoDeclare, isAutoStartup, isChannelLocallyTransacted, isDefaultRequeueRejected, isExclusive, isExposeListenerChannel, isForceCloseChannel, isMismatchedQueuesFatal, isMissingQueuesFatal, isMissingQueuesFatalSet, isNoLocal, isPossibleAuthenticationFailureFatal, isPossibleAuthenticationFailureFatalSet, isRunning, isStatefulRetryFatalWithNullMessageId, lazyLoad, prepareHolderForRollback, publishIdleContainerEvent, redeclareElementsIfNecessary, removeAfterReceivePostProcessor, setAcknowledgeMode, setAdviceChain, setAfterReceivePostProcessors, setAlwaysRequeueWithTxManagerRollback, setAmqpAdmin, setApplicationContext, setApplicationEventPublisher, setAutoDeclare, setAutoStartup, setBeanName, setChannelAwareMessageListener, setConsumerArguments, setConsumerTagStrategy, setDeBatchingEnabled, setDefaultRequeueRejected, setErrorHandler, setErrorHandlerLoggerName, setExclusiveConsumerExceptionLogger, setExposeListenerChannel, 
setFailedDeclarationRetryInterval, setForceCloseChannel, setIdleEventInterval, setListenerId, setLookupKeyQualifier, setMessageConverter, setMessageListener, setMessageListener, setMessagePropertiesConverter, setMismatchedQueuesFatal, setNoLocal, setPhase, setPossibleAuthenticationFailureFatal, setPrefetchCount, setQueues, setRabbitAdmin, setRecoveryBackOff, setRecoveryInterval, setShutdownTimeout, setStatefulRetryFatalWithNullMessageId, setTaskExecutor, setTransactionAttribute, setTransactionManager, setupMessageListener, shutdown, start, stop, updateLastReceive, wrapToListenerExecutionFailedExceptionIfNeeded
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
stop
public static final long DEFAULT_RECEIVE_TIMEOUT
public SimpleMessageListenerContainer()
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
connectionFactory
- the ConnectionFactory
public void setConcurrentConsumers(int concurrentConsumers)
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
general, stick with 1 consumer for low-volume queues. Cannot be more than maxConcurrentConsumers
(if set).
concurrentConsumers
- the minimum number of consumers to create.
See Also: setMaxConcurrentConsumers(int)
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Cannot be less than concurrentConsumers.
maxConcurrentConsumers
- the maximum number of consumers.
See Also: setConcurrentConsumers(int), setStartConsumerMinInterval(long), setStopConsumerMinInterval(long), setConsecutiveActiveTrigger(int), setConsecutiveIdleTrigger(int)
public void setConcurrency(String concurrency)
This listener container will always hold on to the minimum number of consumers (setConcurrentConsumers(int)) and will slowly scale up to the maximum number of consumers (setMaxConcurrentConsumers(int)) in case of increasing load.
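As an illustration of the "lower-upper" format, the following is a minimal sketch of how such a string could be split into the two limits. ConcurrencyRange is a hypothetical helper written for this example, not part of Spring AMQP, and the validation rules (at least one consumer, upper not below lower) are assumptions rather than the container's documented behavior.

```java
/**
 * Hypothetical helper (not part of Spring AMQP) that interprets a
 * "lower-upper" concurrency string as a pair of consumer limits.
 */
final class ConcurrencyRange {

    final int lower; // minimum consumers, cf. setConcurrentConsumers(int)
    final int upper; // maximum consumers, cf. setMaxConcurrentConsumers(int)

    private ConcurrencyRange(int lower, int upper) {
        this.lower = lower;
        this.upper = upper;
    }

    /** Parses strings such as "3-10"; rejects malformed or inverted ranges. */
    static ConcurrencyRange parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator < 0) {
            throw new IllegalArgumentException("Expected \"lower-upper\", got: " + concurrency);
        }
        int lower;
        int upper;
        try {
            lower = Integer.parseInt(concurrency.substring(0, separator).trim());
            upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid concurrency value: " + concurrency, e);
        }
        // Assumed validation: at least one consumer, and upper must not be below lower.
        if (lower < 1 || upper < lower) {
            throw new IllegalArgumentException("Require 1 <= lower <= upper, got: " + concurrency);
        }
        return new ConcurrencyRange(lower, upper);
    }
}
```

With this sketch, ConcurrencyRange.parse("3-10") yields lower = 3 and upper = 10, which would correspond to the values passed to setConcurrentConsumers(int) and setMaxConcurrentConsumers(int).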
concurrency
- the concurrency.
public final void setExclusive(boolean exclusive)
setExclusive in class AbstractMessageListenerContainer
exclusive
- true for an exclusive consumer.
public final void setStartConsumerMinInterval(long startConsumerMinInterval)
If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. Default is 10000 (10 seconds).
startConsumerMinInterval
- The minimum interval between new consumer starts.
See Also: setMaxConcurrentConsumers(int), setStartConsumerMinInterval(long)
public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. Default is 60000 (1 minute).
stopConsumerMinInterval
- The minimum interval between consumer stops.
See Also: setMaxConcurrentConsumers(int), setStopConsumerMinInterval(long)
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. This is impacted by the txSize. Default is 10 consecutive messages.
consecutiveActiveTrigger
- The number of consecutive receives to trigger a new consumer.
See Also: setMaxConcurrentConsumers(int), setStartConsumerMinInterval(long), setTxSize(int)
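The scale-up rule described for setConsecutiveActiveTrigger(int) and setStartConsumerMinInterval(long) can be modelled roughly as follows. This is a simplified sketch, not the container's actual code: ScaleUpTracker and its method names are hypothetical, and the exact comparisons (for example, whether the trigger fires at or after the configured count) are assumptions.

```java
/**
 * Hypothetical model (not Spring AMQP code) of the scale-up rule:
 * consider a new consumer only after enough consecutive active cycles,
 * while below the maximum, and not sooner than the minimum start interval.
 */
final class ScaleUpTracker {

    private final int consecutiveActiveTrigger;  // documented default: 10
    private final long startConsumerMinInterval; // milliseconds, documented default: 10000
    private int consecutiveActive;               // active cycles seen in a row
    private long lastConsumerStarted;            // time of the last consumer start (ms)

    ScaleUpTracker(int consecutiveActiveTrigger, long startConsumerMinInterval, long now) {
        this.consecutiveActiveTrigger = consecutiveActiveTrigger;
        this.startConsumerMinInterval = startConsumerMinInterval;
        this.lastConsumerStarted = now;
    }

    /** Records one receive cycle; returns true if a new consumer should be considered. */
    boolean onCycle(boolean receivedMessage, int currentConsumers, int maxConcurrentConsumers, long now) {
        if (!receivedMessage) {
            this.consecutiveActive = 0; // a single idle cycle resets the counter
            return false;
        }
        this.consecutiveActive++;
        boolean triggerReached = this.consecutiveActive >= this.consecutiveActiveTrigger;
        boolean belowMax = currentConsumers < maxConcurrentConsumers;
        boolean intervalElapsed = now - this.lastConsumerStarted >= this.startConsumerMinInterval;
        if (triggerReached && belowMax && intervalElapsed) {
            this.consecutiveActive = 0;
            this.lastConsumerStarted = now;
            return true;
        }
        return false;
    }
}
```

The timestamps are passed in explicitly so the rule can be exercised without waiting; a caller would typically supply System.currentTimeMillis().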
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data, after which we consider stopping a consumer. The idle time is effectively receiveTimeout * txSize * this value because the consumer thread waits for a message for up to receiveTimeout up to txSize times. Default is 10 consecutive idles.
consecutiveIdleTrigger
- The number of consecutive timeouts to trigger stopping a consumer.
See Also: setMaxConcurrentConsumers(int), setStopConsumerMinInterval(long), setReceiveTimeout(long), setTxSize(int)
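The idle-time formula above (receiveTimeout * txSize * consecutiveIdleTrigger) can be worked through with a small helper. IdleTimeEstimate is a hypothetical class for illustration only. The 1000 ms receive timeout used in the usage note is an assumed value, while txSize 1 and a trigger of 10 are the defaults stated in this page.

```java
import java.time.Duration;

/** Hypothetical helper (not part of Spring AMQP) for the idle-time formula. */
final class IdleTimeEstimate {

    /**
     * Approximate time a consumer must stay idle before it is considered
     * for stopping: receiveTimeout * txSize * consecutiveIdleTrigger.
     */
    static Duration effectiveIdleTime(long receiveTimeoutMillis, int txSize, int consecutiveIdleTrigger) {
        // multiplyExact throws ArithmeticException on overflow instead of wrapping silently.
        long cycles = Math.multiplyExact((long) txSize, (long) consecutiveIdleTrigger);
        return Duration.ofMillis(Math.multiplyExact(receiveTimeoutMillis, cycles));
    }
}
```

For example, effectiveIdleTime(1000, 1, 10) gives 10 seconds, and raising txSize to 5 with the same settings gives 50 seconds, so a larger txSize makes idle consumers linger proportionally longer.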
public void setReceiveTimeout(long receiveTimeout)
The time (in milliseconds) that a consumer should wait for data.
receiveTimeout
- the timeout.
See Also: setConsecutiveIdleTrigger(int)
public void setTxSize(int txSize)
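Tells the container how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the prefetch count. Also affects how often acks are sent when using AcknowledgeMode.AUTO - one ack per txSize. Default is 1.
txSize
- the transaction size
public void setMissingQueuesFatal(boolean missingQueuesFatal)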
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
When true, if the queues are removed while the container is running, the container is stopped.
Defaults to true for this container.
setMissingQueuesFatal in class AbstractMessageListenerContainer
missingQueuesFatal
- the missingQueuesFatal to set.
See Also: AbstractMessageListenerContainer.setAutoDeclare(boolean)
public void setQueueNames(String... queueName)
Description copied from class: AbstractMessageListenerContainer
Set the name of the queue(s) to receive messages from.
setQueueNames in class AbstractMessageListenerContainer
queueName
- the desired queueName(s) (cannot be null)
public void addQueueNames(String... queueName)
addQueueNames in class AbstractMessageListenerContainer
queueName
- The queue to add.
public boolean removeQueueNames(String... queueName)
removeQueueNames in class AbstractMessageListenerContainer
queueName
- The queue to remove.
Returns: the boolean result of removal on the target queueNames List.
public void addQueues(Queue... queue)
addQueues in class AbstractMessageListenerContainer
queue
- The queue to add.
public boolean removeQueues(Queue... queue)
removeQueues in class AbstractMessageListenerContainer
queue
- The queue to remove.
Returns: the boolean result of removal on the target queueNames List.
public void setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
declarationRetries
- The number of retries, default 3.
See Also: AbstractMessageListenerContainer.setFailedDeclarationRetryInterval(long)
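The retry behaviour can be pictured with a minimal sketch. DeclarationRetry is a hypothetical helper, not the container's implementation. Treating the first attempt as separate from the retries (so that 3 retries means up to 4 attempts) is an assumption drawn from the wording "retries after passive queue declaration fails", and the pause between attempts stands in for the interval set via setFailedDeclarationRetryInterval(long).

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not part of Spring AMQP) sketching bounded declaration retries. */
final class DeclarationRetry {

    /**
     * Makes one initial attempt plus up to declarationRetries retries,
     * pausing retryIntervalMillis between attempts.
     *
     * @return true if any attempt succeeded, false otherwise
     */
    static boolean declareWithRetries(BooleanSupplier passiveDeclare, int declarationRetries,
            long retryIntervalMillis) {
        for (int attempt = 0; attempt <= declarationRetries; attempt++) {
            if (passiveDeclare.getAsBoolean()) {
                return true;
            }
            if (attempt < declarationRetries) {
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
        }
        return false;
    }
}
```

The supplier represents a single passive declaration attempt that reports success or failure; in a real consumer this would be a broker call rather than a lambda.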
public void setRetryDeclarationInterval(long retryDeclarationInterval)
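When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
retryDeclarationInterval
- the interval, default 60000.
public void setConsumerStartTimeout(long consumerStartTimeout)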
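When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if the taskExecutor has insufficient threads to support the container concurrency. Default 60000.
consumerStartTimeout
- the timeout.
protected void validateConfiguration()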
validateConfiguration
in class AbstractMessageListenerContainer
protected final boolean sharedConnectionEnabled()
protected void doInitialize()
Description copied from class: AbstractMessageListenerContainer
Register any invokers within this container. Subclasses need to implement this method for their specific invoker management process.
doInitialize in class AbstractMessageListenerContainer
@ManagedMetric(metricType=GAUGE) public int getActiveConsumerCount()
protected void doStart()
doStart
in class AbstractMessageListenerContainer
protected void doShutdown()
Description copied from class: AbstractMessageListenerContainer
Close the registered invokers. Subclasses need to implement this method for their specific invoker management process. A shared Rabbit Connection, if any, will automatically be closed afterwards.
doShutdown in class AbstractMessageListenerContainer
See Also: AbstractMessageListenerContainer.shutdown()
protected int initializeConsumers()
protected void adjustConsumers(int deltaArg)
deltaArg
- a negative value increases, positive decreases.
protected void addAndStartConsumers(int delta)
Start up to delta consumers, limited by setMaxConcurrentConsumers(int).
delta
- the consumers to add.
protected BlockingQueueConsumer createBlockingQueueConsumer()
protected void handleStartupFailure(BackOffExecution backOffExecution)
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t)
publishConsumerFailedEvent
in class AbstractMessageListenerContainer