public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
Modifier and Type | Class and Description |
---|---|
static interface | SimpleMessageListenerContainer.ContainerDelegate |
Nested classes inherited from class AbstractMessageListenerContainer: AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_PREFETCH_COUNT |
static long | DEFAULT_RECEIVE_TIMEOUT |
static long | DEFAULT_RECOVERY_INTERVAL: the default recovery interval, 5000 ms (5 seconds). |
static long | DEFAULT_SHUTDOWN_TIMEOUT |
Inherited fields: logger
Constructor and Description |
---|
SimpleMessageListenerContainer(): Default constructor for convenient dependency injection via setters. |
SimpleMessageListenerContainer(ConnectionFactory connectionFactory): Create a listener container from the connection factory (mandatory). |
Modifier and Type | Method and Description |
---|---|
protected BlockingQueueConsumer | createBlockingQueueConsumer() |
protected void | doInitialize(): Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer. |
protected void | doShutdown(): Close the registered invokers. |
protected void | doStart(): Re-initializes this container's Rabbit message consumers, if not initialized already. |
protected void | doStop(): This method is invoked when the container is stopping. |
int | getActiveConsumerCount() |
protected void | handleStartupFailure(Throwable t): Wait for a period determined by the recoveryInterval to give the container a chance to recover from consumer startup failure, e.g. if the broker is down. |
protected int | initializeConsumers() |
protected void | invokeListener(com.rabbitmq.client.Channel channel, Message message): Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener. |
protected boolean | isChannelLocallyTransacted(com.rabbitmq.client.Channel channel): Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator. |
void | setAdviceChain(org.aopalliance.aop.Advice[] adviceChain): Public setter for the Advice to apply to listener executions. |
void | setConcurrentConsumers(int concurrentConsumers): Specify the number of concurrent consumers to create. |
void | setDefaultRequeueRejected(boolean defaultRequeueRejected): Determines the default behavior when a message is rejected, for example because the listener threw an exception. |
void | setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter): Set the MessagePropertiesConverter for this listener container. |
void | setPrefetchCount(int prefetchCount): Tells the broker how many messages to send to each consumer in a single request. |
void | setReceiveTimeout(long receiveTimeout) |
void | setRecoveryInterval(long recoveryInterval): Specify the interval between recovery attempts, in milliseconds. |
void | setShutdownTimeout(long shutdownTimeout): The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed. |
void | setTaskExecutor(Executor taskExecutor) |
void | setTransactionAttribute(TransactionAttribute transactionAttribute) |
void | setTransactionManager(PlatformTransactionManager transactionManager) |
void | setTxSize(int txSize): Tells the container how many messages to process in a single transaction (if the channel is transactional). |
protected boolean | sharedConnectionEnabled(): Always use a shared Rabbit Connection. |
protected void | validateConfiguration(): Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers. |
Methods inherited from class AbstractMessageListenerContainer: afterPropertiesSet, checkMessageListener, destroy, doInvokeListener, doInvokeListener, executeListener, getAcknowledgeMode, getBeanName, getMessageListener, getPhase, getQueueNames, getRequiredQueueNames, handleListenerException, initialize, invokeErrorHandler, isActive, isAutoStartup, isExposeListenerChannel, isRunning, setAcknowledgeMode, setAutoStartup, setBeanName, setErrorHandler, setExposeListenerChannel, setMessageListener, setPhase, setQueueNames, setQueues, shutdown, start, stop, stop, wrapToListenerExecutionFailedExceptionIfNeeded
Methods inherited from class RabbitAccessor: convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public static final long DEFAULT_RECEIVE_TIMEOUT
public static final int DEFAULT_PREFETCH_COUNT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_RECOVERY_INTERVAL
public SimpleMessageListenerContainer()
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Parameters: connectionFactory - the ConnectionFactory
public void setAdviceChain(org.aopalliance.aop.Advice[] adviceChain)
Public setter for the Advice to apply to listener executions. If txSize>1, multiple listener executions are all wrapped in the same advice, up to that limit. If a transactionManager is provided as well, a separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be applied).
Parameters: adviceChain - the advice chain to set
public void setRecoveryInterval(long recoveryInterval)
public void setConcurrentConsumers(int concurrentConsumers)
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.
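The ordering trade-off can be illustrated without a broker. The following is a minimal plain-Java sketch, not the container's implementation: the class `ConcurrentConsumersSketch` and its `consume` method are hypothetical names, and an in-memory `BlockingQueue` stands in for a Rabbit queue. With one worker the completion order matches the queue order; with several workers every message is still processed exactly once, but the completion order is no longer guaranteed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: an in-memory queue stands in for a Rabbit queue.
final class ConcurrentConsumersSketch {

    /** Drains messageCount numbered messages with the given number of workers; returns them in completion order. */
    static List<Integer> consume(int consumers, int messageCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                Integer message;
                // Each worker takes the next available message until the queue is empty.
                while ((message = queue.poll()) != null) {
                    processed.add(message);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println("1 consumer:  " + consume(1, 5)); // prints "1 consumer:  [0, 1, 2, 3, 4]"
        System.out.println("4 consumers: " + consume(4, 5)); // same elements, order may vary between runs
    }
}
```

If strict per-queue ordering matters, this is the behavior that motivates keeping a single consumer.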
public void setReceiveTimeout(long receiveTimeout)
public void setShutdownTimeout(long shutdownTimeout)
Parameters: shutdownTimeout - the shutdown timeout to set
public void setTaskExecutor(Executor taskExecutor)
public void setPrefetchCount(int prefetchCount)
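Tells the broker how many messages to send to each consumer in a single request. It should be greater than or equal to the transaction size.
Parameters: prefetchCount - the prefetch count
public void setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the prefetch count.
Parameters: txSize - the transaction size
public void setTransactionManager(PlatformTransactionManager transactionManager)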
public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Parameters: transactionAttribute - the transaction attribute to set
public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this listener container.
public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
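Determines the default behavior when a message is rejected, for example because the listener threw an exception. When true, the default can be overridden by the listener throwing an AmqpRejectAndDontRequeueException. Default true.
Parameters: defaultRequeueRejected
protected void validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
validateConfiguration in class AbstractMessageListenerContainer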
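One plausible reading of keeping the connection factory "in sync" with the consumer count is that the channel cache should hold at least one channel per consumer. The sketch below shows that check in isolation. It is an assumption for illustration, not the method's documented behavior: `ChannelCacheValidationSketch` and `alignChannelCacheSize` are hypothetical names, and this page does not say whether the real method raises the cache size, logs a warning, or rejects the configuration.

```java
// Hypothetical illustration only: not the container's validateConfiguration().
final class ChannelCacheValidationSketch {

    /**
     * Returns a channel cache size that leaves room for one cached channel per consumer.
     * A cache that is already large enough is returned unchanged.
     */
    static int alignChannelCacheSize(int channelCacheSize, int concurrentConsumers) {
        if (concurrentConsumers < 1) {
            throw new IllegalArgumentException("concurrentConsumers must be at least 1");
        }
        return Math.max(channelCacheSize, concurrentConsumers);
    }

    public static void main(String[] args) {
        System.out.println(alignChannelCacheSize(1, 5));  // prints 5
        System.out.println(alignChannelCacheSize(10, 5)); // prints 10
    }
}
```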
protected final boolean sharedConnectionEnabled()
protected void doInitialize() throws Exception
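Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer.
doInitialize in class AbstractMessageListenerContainer
Throws: Exception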
@ManagedMetric(metricType=GAUGE) public int getActiveConsumerCount()
protected void doStart() throws Exception
Re-initializes this container's Rabbit message consumers, if not initialized already.
doStart in class AbstractMessageListenerContainer
Throws: Exception
protected void doStop()
Description copied from class: AbstractMessageListenerContainer
This method is invoked when the container is stopping.
doStop in class AbstractMessageListenerContainer
protected void doShutdown()
Description copied from class: AbstractMessageListenerContainer
Close the registered invokers. Subclasses need to implement this method for their specific invoker management process. A shared Rabbit Connection, if any, will automatically be closed afterwards.
doShutdown in class AbstractMessageListenerContainer
See Also: AbstractMessageListenerContainer.shutdown()
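The interplay between stopping the workers and the shutdown timeout (wait for workers for a bounded time, then force the close) follows a common Java pattern. The sketch below shows that pattern with a plain `ExecutorService`. It is a hedged illustration under stated assumptions, not this class's code: `ShutdownTimeoutSketch` and `stopWorkers` are hypothetical names, and interrupting the worker threads stands in for forcing the connection closed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a thread pool stands in for the container's workers.
final class ShutdownTimeoutSketch {

    /**
     * Asks the workers to finish, waits up to shutdownTimeoutMillis, and forces
     * termination if they are still running.
     *
     * @return true if the workers finished within the timeout, false if they were forced
     */
    static boolean stopWorkers(ExecutorService workers, long shutdownTimeoutMillis) {
        workers.shutdown(); // accept no new work, let in-flight tasks complete
        try {
            if (workers.awaitTermination(shutdownTimeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and fall through to the forced stop
        }
        workers.shutdownNow(); // forced stop: interrupts tasks that are still running
        return false;
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        workers.execute(() -> System.out.println("handling a message"));
        System.out.println("clean stop: " + stopWorkers(workers, 5000));
    }
}
```

A longer timeout gives in-flight listener invocations more time to complete; a shorter one bounds how long a stop can block.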
protected int initializeConsumers()
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
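Description copied from class: AbstractMessageListenerContainer
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.
isChannelLocallyTransacted in class AbstractMessageListenerContainer
Parameters: channel - the Channel to check
See Also: RabbitAccessor.isChannelTransacted()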
protected BlockingQueueConsumer createBlockingQueueConsumer()
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws Exception
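Description copied from class: AbstractMessageListenerContainer
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
invokeListener in class AbstractMessageListenerContainer
Parameters: channel - the Rabbit Channel to operate on
message - the received Rabbit Message
Throws: Exception
See Also: AbstractMessageListenerContainer.setMessageListener(java.lang.Object)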
protected void handleStartupFailure(Throwable t) throws Exception
Wait for a period determined by the recoveryInterval to give the container a chance to recover from consumer startup failure, e.g. if the broker is down.
Parameters: t - the exception that stopped the startup
Throws: Exception - if the shared connection still can't be established
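The wait-then-retry idea behind the recovery interval can be sketched in plain Java. This is an illustration under stated assumptions rather than the container's own recovery logic: `RecoveryIntervalSketch`, `startWithRecovery`, and the `maxAttempts` limit are hypothetical, and a `Supplier` stands in for whatever startup step may fail while the broker is down.

```java
import java.util.function.Supplier;

// Hypothetical illustration only: not the container's handleStartupFailure().
final class RecoveryIntervalSketch {

    /**
     * Runs the startup step, sleeping recoveryIntervalMillis after each failure,
     * and rethrows the last failure once maxAttempts (an assumed limit) is reached.
     */
    static <T> T startWithRecovery(Supplier<T> startup, long recoveryIntervalMillis, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return startup.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    pause(recoveryIntervalMillis); // give the broker time to come back
                }
            }
        }
        throw lastFailure;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to recover", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = startWithRecovery(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker down");
            }
            return "started";
        }, 100, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "started after 3 attempts"
    }
}
```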