public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements org.springframework.context.ApplicationEventPublisherAware
Modifier and Type | Class and Description |
---|---|
static interface |
SimpleMessageListenerContainer.ContainerDelegate |
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_PREFETCH_COUNT |
static long |
DEFAULT_RECEIVE_TIMEOUT |
static long |
DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.
|
static long |
DEFAULT_SHUTDOWN_TIMEOUT |
DEFAULT_DEBATCHING_ENABLED
logger
Constructor and Description |
---|
SimpleMessageListenerContainer()
Default constructor for convenient dependency injection via setters.
|
SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Create a listener container from the connection factory (mandatory).
|
Modifier and Type | Method and Description |
---|---|
protected void |
addAndStartConsumers(int delta)
Start up to delta consumers, limited by
setMaxConcurrentConsumers(int) . |
void |
addQueueNames(java.lang.String... queueName)
Add queue(s) to this container's list of queues.
|
void |
addQueues(Queue... queue)
Add queue(s) to this container's list of queues.
|
protected void |
adjustConsumers(int delta)
Adjust consumers depending on delta.
|
protected BlockingQueueConsumer |
createBlockingQueueConsumer() |
protected void |
doInitialize()
Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated
MessageConsumer.
|
protected void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Re-initializes this container's Rabbit message consumers, if not initialized already.
|
protected void |
doStop()
This method is invoked when the container is stopping.
|
int |
getActiveConsumerCount() |
protected RabbitAdmin |
getRabbitAdmin() |
protected void |
handleStartupFailure(org.springframework.util.backoff.BackOffExecution backOffExecution)
Wait for a period determined by the
recoveryInterval
or setRecoveryBackOff(BackOff) to give the container a
chance to recover from consumer startup failure, e.g. |
protected int |
initializeConsumers() |
protected void |
invokeListener(com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
|
protected boolean |
isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
listener container's Channel handling and not by an external transaction coordinator.
|
protected void |
redeclareElementsIfNecessary()
Use
RabbitAdmin.initialize() to redeclare everything if necessary. |
boolean |
removeQueueNames(java.lang.String... queueName)
Remove queues from this container's list of queues.
|
boolean |
removeQueues(Queue... queue)
Remove queue(s) from this container's list of queues.
|
void |
setAdviceChain(org.aopalliance.aop.Advice... adviceChain)
Public setter for the
Advice to apply to listener executions. |
void |
setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
Set to false to avoid always requeuing on transaction rollback with an external
TransactionManager . |
void |
setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) |
void |
setAutoDeclare(boolean autoDeclare) |
void |
setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create.
|
void |
setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
maxConcurrentConsumers has not been reached, specifies the number of
consecutive cycles when a single consumer was active, in order to consider
starting a new consumer. |
void |
setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
the number of consumers exceeds concurrentConsumers , specifies the
number of consecutive receive attempts that return no data; after which we consider
stopping a consumer. |
void |
setConsumerArguments(java.util.Map<java.lang.String,java.lang.Object> args) |
void |
setConsumerStartTimeout(long consumerStartTimeout)
When starting a consumer, if this time (ms) elapses before the consumer starts, an
error log is written; one possible cause would be if the
taskExecutor has
insufficient threads to support the container concurrency. |
void |
setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
Set the implementation of
ConsumerTagStrategy to generate consumer tags. |
void |
setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
|
void |
setDefaultRequeueRejected(boolean defaultRequeueRejected)
Determines the default behavior when a message is rejected, for example because the listener
threw an exception.
|
void |
setExclusive(boolean exclusive)
Set to true for an exclusive consumer - if true, the concurrency must be 1.
|
void |
setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
Set a
ConditionalExceptionLogger for logging exclusive consumer failures. |
void |
setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
|
void |
setIdleEventInterval(long idleEventInterval)
How often to emit
ListenerContainerIdleEvent s in milliseconds. |
void |
setMaxConcurrentConsumers(int maxConcurrentConsumers)
Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'.
|
void |
setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the
MessagePropertiesConverter for this listener container. |
void |
setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
Prevent the container from starting if any of the queues defined in the context have
mismatched arguments (TTL etc).
|
void |
setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting
determines whether the condition is fatal (default true).
|
void |
setNoLocal(boolean noLocal)
Set to true for an no-local consumer.
|
void |
setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
Prevent the container to fail during initialization if a
PossibleAuthenticationFailureException is thrown. |
void |
setPrefetchCount(int prefetchCount)
Tells the broker how many messages to send to each consumer in a single request.
|
void |
setQueueNames(java.lang.String... queueName)
Set the name of the queue(s) to receive messages from.
|
void |
setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
|
void |
setRabbitAdmin(RabbitAdmin rabbitAdmin)
Set the
RabbitAdmin , used to declare any auto-delete queues, bindings
etc when the container is started. |
void |
setReceiveTimeout(long receiveTimeout)
The time (in milliseconds) that a consumer should wait for data.
|
void |
setRecoveryBackOff(org.springframework.util.backoff.BackOff recoveryBackOff)
Specify the
BackOff for interval between recovery attempts. |
void |
setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds.
|
void |
setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only
a subset of the queues were available (milliseconds).
|
void |
setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced
closed.
|
void |
setStartConsumerMinInterval(long startConsumerMinInterval)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
maxConcurrentConsumers has not been reached, specifies
the minimum time (milliseconds) between starting new consumers on demand. |
void |
setStopConsumerMinInterval(long stopConsumerMinInterval)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
the number of consumers exceeds concurrentConsumers , specifies the
minimum time (milliseconds) between stopping idle consumers. |
void |
setTaskExecutor(java.util.concurrent.Executor taskExecutor) |
void |
setTransactionAttribute(org.springframework.transaction.interceptor.TransactionAttribute transactionAttribute) |
void |
setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager) |
void |
setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional).
|
protected boolean |
sharedConnectionEnabled()
Always use a shared Rabbit Connection.
|
java.lang.String |
toString() |
protected void |
validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
consumers.
|
afterPropertiesSet, causeChainHasImmediateAcknowledgeAmqpException, checkMessageListener, destroy, doInvokeListener, doInvokeListener, executeListener, getAcknowledgeMode, getApplicationContext, getBeanName, getConnectionFactory, getListenerId, getMessageConverter, getMessageListener, getPhase, getQueueNames, getQueueNamesAsSet, getRequiredQueueNames, getRoutingConnectionFactory, getRoutingLookupKey, handleListenerException, initialize, invokeErrorHandler, isActive, isAutoStartup, isExposeListenerChannel, isForceCloseChannel, isRunning, setAcknowledgeMode, setAfterReceivePostProcessors, setApplicationContext, setAutoStartup, setBeanName, setDeBatchingEnabled, setErrorHandler, setExposeListenerChannel, setForceCloseChannel, setListenerId, setLookupKeyQualifier, setMessageConverter, setMessageListener, setPhase, setupMessageListener, shutdown, start, stop, stop, wrapToListenerExecutionFailedExceptionIfNeeded
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public static final long DEFAULT_RECEIVE_TIMEOUT
public static final int DEFAULT_PREFETCH_COUNT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_RECOVERY_INTERVAL
public SimpleMessageListenerContainer()
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
connectionFactory
- the ConnectionFactory
public void setAdviceChain(org.aopalliance.aop.Advice... adviceChain)
Advice
to apply to listener executions. If txSize>1
then
multiple listener executions will all be wrapped in the same advice up to that limit.
If a transactionManager
is provided as well, then
separate advice is created for the transaction and applied first in the chain. In that case the advice chain
provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).
adviceChain
- the advice chain to setpublic void setRecoveryInterval(long recoveryInterval)
recoveryInterval
- The recovery interval.public void setRecoveryBackOff(org.springframework.util.backoff.BackOff recoveryBackOff)
BackOff
for interval between recovery attempts.
The default is 5000 ms, that is, 5 seconds.
With the BackOff
you can supply the maxAttempts
for recovery before
the AbstractMessageListenerContainer.stop()
will be performed.recoveryBackOff
- The BackOff to recover.public void setConcurrentConsumers(int concurrentConsumers)
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
general, stick with 1 consumer for low-volume queues. Cannot be more than maxConcurrentConsumers
(if set).
concurrentConsumers
- the minimum number of consumers to create.setMaxConcurrentConsumers(int)
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
concurrentConsumers
.maxConcurrentConsumers
- the maximum number of consumers.setConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
,
setStopConsumerMinInterval(long)
,
setConsecutiveActiveTrigger(int)
,
setConsecutiveIdleTrigger(int)
public final void setNoLocal(boolean noLocal)
noLocal
- true for an no-local consumer.public final void setExclusive(boolean exclusive)
exclusive
- true for an exclusive consumer.public final void setStartConsumerMinInterval(long startConsumerMinInterval)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
maxConcurrentConsumers
has not been reached, specifies
the minimum time (milliseconds) between starting new consumers on demand. Default is 10000
(10 seconds).startConsumerMinInterval
- The minimum interval between new consumer starts.setMaxConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
the number of consumers exceeds concurrentConsumers
, specifies the
minimum time (milliseconds) between stopping idle consumers. Default is 60000
(1 minute).stopConsumerMinInterval
- The minimum interval between consumer stops.setMaxConcurrentConsumers(int)
,
setStopConsumerMinInterval(long)
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
maxConcurrentConsumers
has not been reached, specifies the number of
consecutive cycles when a single consumer was active, in order to consider
starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
This is impacted by the txSize
.
Default is 10 consecutive messages.consecutiveActiveTrigger
- The number of consecutive receives to trigger a new consumer.setMaxConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
,
setTxSize(int)
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
the number of consumers exceeds concurrentConsumers
, specifies the
number of consecutive receive attempts that return no data; after which we consider
stopping a consumer. The idle time is effectively
receiveTimeout
* txSize
* this value because the consumer thread waits for
a message for up to receiveTimeout
up to txSize
times.
Default is 10 consecutive idles.consecutiveIdleTrigger
- The number of consecutive timeouts to trigger stopping a consumer.setMaxConcurrentConsumers(int)
,
setStopConsumerMinInterval(long)
,
setReceiveTimeout(long)
,
setTxSize(int)
public void setReceiveTimeout(long receiveTimeout)
receiveTimeout
- the timeout.setConsecutiveIdleTrigger(int)
public void setShutdownTimeout(long shutdownTimeout)
shutdownTimeout
- the shutdown timeout to setpublic void setTaskExecutor(java.util.concurrent.Executor taskExecutor)
public void setPrefetchCount(int prefetchCount)
the transaction size
.prefetchCount
- the prefetch countpublic void setTxSize(int txSize)
the prefetch count
. Also affects
how often acks are sent when using AcknowledgeMode.AUTO
- one ack per txSize. Default is 1.txSize
- the transaction sizepublic void setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager)
public void setTransactionAttribute(org.springframework.transaction.interceptor.TransactionAttribute transactionAttribute)
transactionAttribute
- the transaction attribute to setpublic void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
MessagePropertiesConverter
for this listener container.messagePropertiesConverter
- The properties converter.public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
AmqpRejectAndDontRequeueException
. Default true.defaultRequeueRejected
- true to reject by default.public void setConsumerArguments(java.util.Map<java.lang.String,java.lang.Object> args)
protected RabbitAdmin getRabbitAdmin()
public void setRabbitAdmin(RabbitAdmin rabbitAdmin)
RabbitAdmin
, used to declare any auto-delete queues, bindings
etc when the container is started. Only needed if those queues use conditional
declaration (have a 'declared-by' attribute). If not specified, an internal
admin will be used which will attempt to declare all elements not having a
'declared-by' attribute.rabbitAdmin
- The admin.public void setMissingQueuesFatal(boolean missingQueuesFatal)
When false, the condition is not considered fatal and the container will
continue to attempt to start the consumers according to the setRecoveryInterval(long)
.
Note that each consumer will make 3 attempts (at 5 second intervals) on each
recovery attempt.
missingQueuesFatal
- the missingQueuesFatal to set.public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
mismatchedQueuesFatal
- true to fail initialization when this condition occurs.public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
PossibleAuthenticationFailureException
is thrown.
Default true.possibleAuthenticationFailureFatal
- false do not fail initialization when this condition occurs.public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
setApplicationEventPublisher
in interface org.springframework.context.ApplicationEventPublisherAware
public void setQueueNames(java.lang.String... queueName)
AbstractMessageListenerContainer
setQueueNames
in class AbstractMessageListenerContainer
queueName
- the desired queueName(s) (can not be null
)public void setQueues(Queue... queues)
AbstractMessageListenerContainer
setQueues
in class AbstractMessageListenerContainer
queues
- the desired queue(s) (can not be null
)public void setAutoDeclare(boolean autoDeclare)
autoDeclare
- the boolean flag to indicate an redeclaration operation.redeclareElementsIfNecessary()
public void addQueueNames(java.lang.String... queueName)
addQueueNames
in class AbstractMessageListenerContainer
queueName
- The queue to add.public void addQueues(Queue... queue)
addQueues
in class AbstractMessageListenerContainer
queue
- The queue to add.public boolean removeQueueNames(java.lang.String... queueName)
removeQueueNames
in class AbstractMessageListenerContainer
queueName
- The queue to remove.queueNames
List.public boolean removeQueues(Queue... queue)
removeQueues
in class AbstractMessageListenerContainer
queue
- The queue to remove.queueNames
List.public void setDeclarationRetries(int declarationRetries)
declarationRetries
- The number of retries, default 3.setFailedDeclarationRetryInterval(long)
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
failedDeclarationRetryInterval
- the interval, default 5000.setDeclarationRetries(int)
public void setRetryDeclarationInterval(long retryDeclarationInterval)
retryDeclarationInterval
- the interval, default 60000.public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
ConsumerTagStrategy
to generate consumer tags.
By default, the RabbitMQ server generates consumer tags.consumerTagStrategy
- the consumerTagStrategy to set.public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
ConditionalExceptionLogger
for logging exclusive consumer failures. The
default is to log such failures at WARN level.exclusiveConsumerExceptionLogger
- the conditional exception logger.public void setIdleEventInterval(long idleEventInterval)
ListenerContainerIdleEvent
s in milliseconds.idleEventInterval
- the interval.public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
TransactionManager
.
By default, when a transaction manager was configured, a transaction
rollback always requeued\s the message. This is inconsistent with local transactions
where the normal defaultRequeueRejected
and AmqpRejectAndDontRequeueException
logic is honored to determine whether
the message is requeued. RabbitMQ does not consider the message delivery to be part
of the transaction.
This boolean was introduced in 1.7.1, set to true by default, to be consistent with
previous behavior. Starting with version 2.0, it will be false by default.alwaysRequeueWithTxManagerRollback
- false to not always requeue on rollback.public void setConsumerStartTimeout(long consumerStartTimeout)
taskExecutor
has
insufficient threads to support the container concurrency. Default 60000.consumerStartTimeout
- the timeout.protected void validateConfiguration()
validateConfiguration
in class AbstractMessageListenerContainer
protected final boolean sharedConnectionEnabled()
protected void doInitialize() throws java.lang.Exception
doInitialize
in class AbstractMessageListenerContainer
java.lang.Exception
- Any Exception.@ManagedMetric(metricType=GAUGE) public int getActiveConsumerCount()
protected void doStart() throws java.lang.Exception
doStart
in class AbstractMessageListenerContainer
java.lang.Exception
- Any Exception.protected void doStop()
AbstractMessageListenerContainer
doStop
in class AbstractMessageListenerContainer
protected void doShutdown()
AbstractMessageListenerContainer
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
doShutdown
in class AbstractMessageListenerContainer
AbstractMessageListenerContainer.shutdown()
protected int initializeConsumers()
protected void adjustConsumers(int delta)
delta
- a negative value increases, positive decreases.protected void addAndStartConsumers(int delta)
setMaxConcurrentConsumers(int)
.delta
- the consumers to add.protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
AbstractMessageListenerContainer
Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
isChannelLocallyTransacted
in class AbstractMessageListenerContainer
channel
- the Channel to checkRabbitAccessor.isChannelTransacted()
protected BlockingQueueConsumer createBlockingQueueConsumer()
protected void redeclareElementsIfNecessary()
RabbitAdmin.initialize()
to redeclare everything if necessary.
Since auto deletion of a queue can cause upstream elements
(bindings, exchanges) to be deleted too, everything needs to be redeclared if
a queue is missing.
Declaration is idempotent so, aside from some network chatter, there is no issue,
and we only will do it if we detect our queue is gone.
In general it makes sense only for the 'auto-delete' or 'expired' queues, but with the server TTL policy we don't have ability to determine 'expiration' option for the queue.
Starting with version 1.6, if
mismatchedQueuesFatal
is true,
the declarations are always attempted during restart so the listener will
fail with a fatal error if mismatches occur.
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws java.lang.Exception
AbstractMessageListenerContainer
invokeListener
in class AbstractMessageListenerContainer
channel
- the Rabbit Channel to operate onmessage
- the received Rabbit Messagejava.lang.Exception
- if thrown by Rabbit API methodsAbstractMessageListenerContainer.setMessageListener(java.lang.Object)
protected void handleStartupFailure(org.springframework.util.backoff.BackOffExecution backOffExecution) throws java.lang.Exception
recoveryInterval
or setRecoveryBackOff(BackOff)
to give the container a
chance to recover from consumer startup failure, e.g. if the broker is down.backOffExecution
- the BackOffExecution to get the recoveryInterval
java.lang.Exception
- if the shared connection still can't be establishedpublic java.lang.String toString()
toString
in class java.lang.Object