public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
Modifier and Type | Class and Description |
---|---|
static interface |
SimpleMessageListenerContainer.ContainerDelegate |
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_PREFETCH_COUNT |
static long |
DEFAULT_RECEIVE_TIMEOUT |
static long |
DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.
|
static long |
DEFAULT_SHUTDOWN_TIMEOUT |
logger
Constructor and Description |
---|
SimpleMessageListenerContainer()
Default constructor for convenient dependency injection via setters.
|
SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Create a listener container from the connection factory (mandatory).
|
Modifier and Type | Method and Description |
---|---|
protected void |
addAndStartConsumers(int delta) |
void |
addQueueNames(String... queueName)
Add queue(s) to this container's list of queues.
|
void |
addQueues(Queue... queue)
Add queue(s) to this container's list of queues.
|
protected BlockingQueueConsumer |
createBlockingQueueConsumer() |
protected void |
doInitialize()
Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated
MessageConsumer.
|
protected void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Re-initializes this container's Rabbit message consumers, if not initialized already.
|
protected void |
doStop()
This method is invoked when the container is stopping.
|
int |
getActiveConsumerCount() |
protected RabbitAdmin |
getRabbitAdmin() |
protected void |
handleStartupFailure(Throwable t)
Wait for a period determined by the
recoveryInterval to give the container a
chance to recover from consumer startup failure, e.g. |
protected int |
initializeConsumers() |
protected void |
invokeListener(com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
|
protected boolean |
isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
listener container's Channel handling and not by an external transaction coordinator.
|
boolean |
removeQueueNames(String... queueName)
Remove queues from this container's list of queues.
|
boolean |
removeQueues(Queue... queue)
Remove queue(s) from this container's list of queues.
|
void |
setAdviceChain(org.aopalliance.aop.Advice[] adviceChain)
Public setter for the
Advice to apply to listener executions. |
void |
setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create.
|
void |
setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
maxConcurrentConsumers has not been reached, specifies the number of
consecutive cycles when a single consumer was active, in order to consider
starting a new consumer. |
void |
setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
the number of consumers exceeds concurrentConsumers , specifies the
number of consecutive receive attempts that return no data; after which we consider
stopping a consumer. |
void |
setConsumerArguments(Map<String,Object> args) |
void |
setDefaultRequeueRejected(boolean defaultRequeueRejected)
Determines the default behavior when a message is rejected, for example because the listener
threw an exception.
|
void |
setExclusive(boolean exclusive)
Set to true for an exclusive consumer - if true, the concurrency must be 1.
|
void |
setMaxConcurrentConsumers(int maxConcurrentConsumers)
Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'.
|
void |
setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the
MessagePropertiesConverter for this listener container. |
void |
setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting
determines whether the condition is fatal (default true).
|
void |
setPrefetchCount(int prefetchCount)
Tells the broker how many messages to send to each consumer in a single request.
|
void |
setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.
|
void |
setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
|
void |
setRabbitAdmin(RabbitAdmin rabbitAdmin)
Set the
RabbitAdmin , used to declare any auto-delete queues, bindings
etc when the container is started. |
void |
setReceiveTimeout(long receiveTimeout)
The time (in milliseconds) that a consumer should wait for data.
|
void |
setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds.
|
void |
setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced
closed.
|
void |
setStartConsumerMinInterval(long startConsumerMinInterval)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
maxConcurrentConsumers has not been reached, specifies
the minimum time (milliseconds) between starting new consumers on demand. |
void |
setStopConsumerMinInterval(long stopConsumerMinInterval)
If
maxConcurrentConsumers is greater then concurrentConsumers , and
the number of consumers exceeds concurrentConsumers , specifies the
minimum time (milliseconds) between stopping idle consumers. |
void |
setTaskExecutor(Executor taskExecutor) |
void |
setTransactionAttribute(org.springframework.transaction.interceptor.TransactionAttribute transactionAttribute) |
void |
setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager) |
void |
setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional).
|
protected boolean |
sharedConnectionEnabled()
Always use a shared Rabbit Connection.
|
protected void |
validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
consumers.
|
afterPropertiesSet, checkMessageListener, destroy, doInvokeListener, doInvokeListener, executeListener, getAcknowledgeMode, getApplicationContext, getBeanName, getMessageListener, getPhase, getQueueNames, getQueueNamesAsSet, getRequiredQueueNames, handleListenerException, initialize, invokeErrorHandler, isActive, isAutoStartup, isExposeListenerChannel, isRunning, setAcknowledgeMode, setApplicationContext, setAutoStartup, setBeanName, setErrorHandler, setExposeListenerChannel, setMessageListener, setPhase, shutdown, start, stop, stop, wrapToListenerExecutionFailedExceptionIfNeeded
convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public static final long DEFAULT_RECEIVE_TIMEOUT
public static final int DEFAULT_PREFETCH_COUNT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_RECOVERY_INTERVAL
public SimpleMessageListenerContainer()
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
connectionFactory
- the ConnectionFactory
public void setAdviceChain(org.aopalliance.aop.Advice[] adviceChain)
Public setter for the Advice
to apply to listener executions. If txSize>1
then
multiple listener executions will all be wrapped in the same advice up to that limit.
If a transactionManager
is provided as well, then
separate advice is created for the transaction and applied first in the chain. In that case the advice chain
provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).
adviceChain
- the advice chain to setpublic void setRecoveryInterval(long recoveryInterval)
recoveryInterval
- The recovery interval.public void setConcurrentConsumers(int concurrentConsumers)
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
general, stick with 1 consumer for low-volume queues. Cannot be less than maxConcurrentConsumers
(if set).
concurrentConsumers
- the minimum number of consumers to create.setMaxConcurrentConsumers(int)
public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
concurrentConsumers
.maxConcurrentConsumers
- the maximum number of consumers.setConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
,
setStopConsumerMinInterval(long)
,
setConsecutiveActiveTrigger(int)
,
setConsecutiveIdleTrigger(int)
public final void setExclusive(boolean exclusive)
exclusive
- true for an exclusive consumer.public final void setStartConsumerMinInterval(long startConsumerMinInterval)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
maxConcurrentConsumers
has not been reached, specifies
the minimum time (milliseconds) between starting new consumers on demand. Default is 10000
(10 seconds).startConsumerMinInterval
- The minimum interval between new consumer starts.setMaxConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
the number of consumers exceeds concurrentConsumers
, specifies the
minimum time (milliseconds) between stopping idle consumers. Default is 60000
(1 minute).stopConsumerMinInterval
- The minimum interval between consumer stops.setMaxConcurrentConsumers(int)
,
setStopConsumerMinInterval(long)
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
maxConcurrentConsumers
has not been reached, specifies the number of
consecutive cycles when a single consumer was active, in order to consider
starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
This is impacted by the txSize
.
Default is 10 consecutive messages.consecutiveActiveTrigger
- The number of consecutive receives to trigger a new consumer.setMaxConcurrentConsumers(int)
,
setStartConsumerMinInterval(long)
,
setTxSize(int)
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
maxConcurrentConsumers
is greater then concurrentConsumers
, and
the number of consumers exceeds concurrentConsumers
, specifies the
number of consecutive receive attempts that return no data; after which we consider
stopping a consumer. The idle time is effectively
receiveTimeout
* txSize
* this value because the consumer thread waits for
a message for up to receiveTimeout
up to txSize
times.
Default is 10 consecutive idles.consecutiveIdleTrigger
- The number of consecutive timeouts to trigger stopping a consumer.setMaxConcurrentConsumers(int)
,
setStopConsumerMinInterval(long)
,
setReceiveTimeout(long)
,
setTxSize(int)
public void setReceiveTimeout(long receiveTimeout)
receiveTimeout
- the timeout.setConsecutiveIdleTrigger(int)
public void setShutdownTimeout(long shutdownTimeout)
shutdownTimeout
- the shutdown timeout to setpublic void setTaskExecutor(Executor taskExecutor)
public void setPrefetchCount(int prefetchCount)
the transaction size
.prefetchCount
- the prefetch countpublic void setTxSize(int txSize)
the prefetch count
. Also affects
how often acks are sent when using AcknowledgeMode.AUTO
- one ack per txSize. Default is 1.txSize
- the transaction sizepublic void setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager)
public void setTransactionAttribute(org.springframework.transaction.interceptor.TransactionAttribute transactionAttribute)
transactionAttribute
- the transaction attribute to setpublic void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
MessagePropertiesConverter
for this listener container.messagePropertiesConverter
- The properties converter.public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
AmqpRejectAndDontRequeueException
. Default true.defaultRequeueRejected
- true to reject by default.protected RabbitAdmin getRabbitAdmin()
public void setRabbitAdmin(RabbitAdmin rabbitAdmin)
RabbitAdmin
, used to declare any auto-delete queues, bindings
etc when the container is started. Only needed if those queues use conditional
declaration (have a 'declared-by' attribute). If not specified, an internal
admin will be used which will attempt to declare all elements not having a
'declared-by' attribute.rabbitAdmin
- The admin.public void setMissingQueuesFatal(boolean missingQueuesFatal)
When false, the condition is not considered fatal and the container will
continue to attempt to start the consumers according to the setRecoveryInterval(long)
.
Note that each consumer will make 3 attempts (at 5 second intervals) on each
recovery attempt.
missingQueuesFatal
- the missingQueuesFatal to set.public void setQueueNames(String... queueName)
AbstractMessageListenerContainer
setQueueNames
in class AbstractMessageListenerContainer
queueName
- the desired queueName(s) (can not be null
)public void setQueues(Queue... queues)
AbstractMessageListenerContainer
setQueues
in class AbstractMessageListenerContainer
queues
- the desired queue(s) (can not be null
)public void addQueueNames(String... queueName)
addQueueNames
in class AbstractMessageListenerContainer
queueName
- The queue to add.public void addQueues(Queue... queue)
addQueues
in class AbstractMessageListenerContainer
queue
- The queue to add.public boolean removeQueueNames(String... queueName)
removeQueueNames
in class AbstractMessageListenerContainer
queueName
- The queue to remove.queueNames
List.public boolean removeQueues(Queue... queue)
removeQueues
in class AbstractMessageListenerContainer
queue
- The queue to remove.queueNames
List.protected void validateConfiguration()
validateConfiguration
in class AbstractMessageListenerContainer
protected final boolean sharedConnectionEnabled()
protected void doInitialize() throws Exception
doInitialize
in class AbstractMessageListenerContainer
Exception
- Any Exception.@ManagedMetric(metricType=GAUGE) public int getActiveConsumerCount()
protected void doStart() throws Exception
doStart
in class AbstractMessageListenerContainer
Exception
- Any Exception.protected void doStop()
AbstractMessageListenerContainer
doStop
in class AbstractMessageListenerContainer
protected void doShutdown()
AbstractMessageListenerContainer
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
doShutdown
in class AbstractMessageListenerContainer
AbstractMessageListenerContainer.shutdown()
protected int initializeConsumers()
protected void addAndStartConsumers(int delta)
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
AbstractMessageListenerContainer
Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
isChannelLocallyTransacted
in class AbstractMessageListenerContainer
channel
- the Channel to checkRabbitAccessor.isChannelTransacted()
protected BlockingQueueConsumer createBlockingQueueConsumer()
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws Exception
AbstractMessageListenerContainer
invokeListener
in class AbstractMessageListenerContainer
channel
- the Rabbit Channel to operate onmessage
- the received Rabbit MessageException
- if thrown by Rabbit API methodsAbstractMessageListenerContainer.setMessageListener(java.lang.Object)
protected void handleStartupFailure(Throwable t) throws Exception
recoveryInterval
to give the container a
chance to recover from consumer startup failure, e.g. if the broker is down.t
- the exception that stopped the startupException
- if the shared connection still can't be established