Class AbstractMessageListenerContainer
- All Implemented Interfaces:
MessageListenerContainer, Aware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle
- Direct Known Subclasses:
DirectMessageListenerContainer, SimpleMessageListenerContainer
- Author:
- Mark Pollack, Mark Fisher, Dave Syer, James Carr, Gary Russell, Alex Panchenko, Johno Crawford, Arnaud Cogoluègnes, Artem Bilan, Mohammad Hewedy, Mat Jaggard
-
Nested Class Summary
-
Field Summary
Modifier and Type, Field, Description
protected final Object
static final boolean
static final int
static final long - The default recovery interval: 5000 ms = 5 seconds.
static final long
Fields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
logger
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
-
Method Summary
Modifier and Type, Method, Description
protected void actualInvokeListener(com.rabbitmq.client.Channel channel, Object data) - Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
void addAfterReceivePostProcessors(MessagePostProcessor... postprocessors) - Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener.
void addQueueNames(String... queueNames) - Add queue(s) to this container's list of queues.
void - Add queue(s) to this container's list of queues.
void - Delegates to validateConfiguration() and initialize().
protected boolean - Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException is found before an AmqpRejectAndDontRequeueException, return true.
protected void checkMessageListener(Object listener) - Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
protected void
protected void
void destroy() - Calls shutdown() when the BeanFactory destroys the container instance.
protected abstract void - Register any invokers within this container.
protected void doInvokeListener(MessageListener listener, Object data) - Invoke the specified listener as Spring Rabbit MessageListener.
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data) - Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded.
protected final void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
protected abstract void - Close the registered invokers.
protected void doStart() - Start this container, and notify all invoker tasks.
protected void doStop() - This method is invoked when the container is stopping.
protected void executeListener(com.rabbitmq.client.Channel channel, Object data) - Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
protected void executeListenerAndHandleException(com.rabbitmq.client.Channel channel, Object data)
protected Advice[]
protected Collection<MessagePostProcessor>
protected AmqpAdmin
protected ApplicationEventPublisher
protected BatchingStrategy
protected long - Get the consumeDelay - a time to wait before consuming in ms.
- Return the consumer arguments.
protected ConsumerTagStrategy - Return the consumer tag strategy to use.
protected ConditionalExceptionLogger
protected long
protected long
protected long - Get the time the last message was received - initialized to container start time.
protected MessageAckListener
- Get the message listener.
protected MessagePropertiesConverter
int getPhase()
protected int - Return the prefetch count.
String[]
- Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.
protected BackOff
protected RoutingConnectionFactory - Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise.
protected String - Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise.
protected long
protected Executor
protected TransactionAttribute
protected PlatformTransactionManager
protected void - Handle the given exception that arose during listener execution.
void - Initialize this container.
protected void initializeProxy(Object delegate)
protected void - Invoke the registered ErrorHandler, if any.
protected void invokeListener(com.rabbitmq.client.Channel channel, Object data)
final boolean isActive()
protected boolean
protected boolean
protected boolean
boolean
protected boolean - Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
protected boolean
protected boolean - Return the default requeue rejected.
protected boolean - Return whether the consumers should be exclusive.
boolean
protected boolean - Force close the channel if the consumer threads don't respond to a shutdown.
protected boolean
protected boolean
protected boolean
protected boolean
protected boolean - Return whether the consumers should be no-local.
boolean
protected boolean
final boolean - Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
protected boolean
void lazyLoad() - Do not check for missing or mismatched queues during startup.
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception) - A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g.
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t)
protected final void publishIdleContainerEvent(long idleTime)
protected void publishMissingQueueEvent(String queue)
protected void - Use AmqpAdmin.initialize() to redeclare everything if necessary.
boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor) - Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
boolean removeQueueNames(String... queueNames) - Remove queue(s) from this container's list of queues.
boolean removeQueues(Queue... queues) - Remove queue(s) from this container's list of queues.
final void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) - Flag controlling the behaviour of the container with respect to message acknowledgement.
void setAdviceChain(Advice... adviceChain) - Public setter for the Advice to apply to listener executions.
void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) - Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener.
void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback) - Set to true to always requeue on transaction rollback with an external TransactionManager.
void setAmqpAdmin(AmqpAdmin amqpAdmin) - Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started.
void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
void setAutoDeclare(boolean autoDeclare) - Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().
void setAutoStartup(boolean autoStartup) - Set whether to automatically start the container after initialization.
void setBatchingStrategy(BatchingStrategy batchingStrategy) - Set a batching strategy to use when de-batching messages.
void setConsumeDelay(long consumeDelay) - Set the consumeDelay - a time to wait before consuming in ms.
void setConsumerArguments(Map<String, Object> args) - Set consumer arguments.
void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy) - Set the implementation of ConsumerTagStrategy to generate consumer tags.
void setDeBatchingEnabled(boolean deBatchingEnabled) - Determine whether the container should de-batch batched messages (true) or call the listener with the batch (false).
void setDefaultRequeueRejected(boolean defaultRequeueRejected) - Set the default behavior when a message is rejected, for example because the listener threw an exception.
void setErrorHandler(ErrorHandler errorHandler) - Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
void setErrorHandlerLoggerName(String errorHandlerLoggerName) - Set the name (category) of the logger used to log exceptions thrown by the error handler.
void setExclusive(boolean exclusive) - Set to true for an exclusive consumer.
void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger) - Set a ConditionalExceptionLogger for logging exclusive consumer failures.
void setExposeListenerChannel(boolean exposeListenerChannel) - Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.
void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) - Set the interval between passive queue declaration attempts in milliseconds.
void setForceCloseChannel(boolean forceCloseChannel) - Set to true to force close the channel if the consumer threads don't respond to a shutdown.
void setGlobalQos(boolean globalQos) - Apply prefetchCount to the entire channel.
void setIdleEventInterval(long idleEventInterval) - How often to emit ListenerContainerIdleEvents in milliseconds.
void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler) - Provide a JavaLangErrorHandler implementation; by default, System.exit(99) is called.
void setLookupKeyQualifier(String lookupKeyQualifier) - Set a qualifier that will prefix the connection factory lookup key; default none.
void setMessageAckListener(MessageAckListener messageAckListener) - Set a MessageAckListener to use when acknowledging a message (or messages) in AcknowledgeMode.AUTO mode.
void setMessageListener(MessageListener messageListener) - Set the MessageListener.
void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) - Set the MessagePropertiesConverter for this listener container.
void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal) - Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc).
void setMissingQueuesFatal(boolean missingQueuesFatal) - If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal.
void setNoLocal(boolean noLocal) - Set to true for a no-local consumer.
protected void
void setObservationConvention(RabbitListenerObservationConvention observationConvention) - Set an observation convention; used to add additional key/values to observations.
void setPhase(int phase) - Specify the phase in which this container should be started and stopped.
void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
void setPrefetchCount(int prefetchCount) - Tell the broker how many messages to send to each consumer in a single request.
void setQueueNames(String... queueName) - Set the name of the queue(s) to receive messages from.
final void - Set the name of the queue(s) to receive messages from.
void setRecoveryBackOff(BackOff recoveryBackOff) - Specify the BackOff for the interval between recovery attempts.
void setRecoveryInterval(long recoveryInterval) - Specify the interval between recovery attempts, in milliseconds.
void setShutdownTimeout(long shutdownTimeout) - The time to wait for workers in milliseconds after the container is stopped.
void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId) - Set whether a message with a null messageId is fatal for the consumer when using stateful retry.
void setTaskExecutor(Executor taskExecutor) - Set a task executor for the container - used to create the consumers not at runtime.
void setTransactionAttribute(TransactionAttribute transactionAttribute) - Set the transaction attribute to use when using an external transaction manager.
void setTransactionManager(PlatformTransactionManager transactionManager) - Set the transaction manager to use.
void setupMessageListener(MessageListener messageListener) - Setup the message listener to use.
void shutdown() - Stop the shared Connection, call doShutdown(), and close this container.
void start() - Start this container.
void stop() - Stop this container.
protected void
protected void - Validate the configuration of this container.
protected ListenerExecutionFailedException
Methods inherited from class org.springframework.amqp.rabbit.listener.ObservableListenerContainer
checkMicrometer, checkObservation, getApplicationContext, getBeanName, getListenerId, getMicrometerHolder, setApplicationContext, setBeanName, setListenerId, setMicrometerEnabled, setMicrometerTags, setObservationEnabled
Methods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getObservationRegistry, getTransactionalResourceHolder, isChannelTransacted, obtainObservationRegistry, setChannelTransacted, setConnectionFactory
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.amqp.rabbit.listener.MessageListenerContainer
isConsumerBatchEnabled
Methods inherited from interface org.springframework.context.SmartLifecycle
stop
-
Field Details
-
DEFAULT_DEBATCHING_ENABLED
public static final boolean DEFAULT_DEBATCHING_ENABLED
-
DEFAULT_PREFETCH_COUNT
public static final int DEFAULT_PREFETCH_COUNT
-
DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.
-
DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
-
consumersMonitor
-
-
Constructor Details
-
AbstractMessageListenerContainer
public AbstractMessageListenerContainer()
-
-
Method Details
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisher in interface ApplicationEventPublisherAware
-
getApplicationEventPublisher
-
setAcknowledgeMode
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to AcknowledgeMode.MANUAL if the listener will send the acknowledgements itself using Channel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel, but if you are doing no other work on the channel at the same time other than receiving a single message then the transaction is probably unnecessary.
Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If AcknowledgeMode.NONE is set, the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).
- Parameters:
acknowledgeMode
- the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
-
getAcknowledgeMode
- Returns:
- the acknowledgeMode
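The constraint described for setAcknowledgeMode (AcknowledgeMode.NONE cannot be combined with a transactional channel) can be pictured with a small plain-Java sketch. It deliberately does not use Spring AMQP classes: the AckMode enum, the isValid and validate helpers, and the choice of IllegalStateException are stand-ins invented for illustration, not the container's actual implementation.

```java
final class AckModeCheckSketch {

    // Stand-in for AcknowledgeMode; only the three modes named in the text are modelled.
    enum AckMode { NONE, MANUAL, AUTO }

    // Per the description above, NONE and a transactional channel are mutually exclusive.
    static boolean isValid(AckMode mode, boolean channelTransacted) {
        return !(mode == AckMode.NONE && channelTransacted);
    }

    // Assumption: the failure is reported at start-up; the exception type is illustrative only.
    static void validate(AckMode mode, boolean channelTransacted) {
        if (!isValid(mode, channelTransacted)) {
            throw new IllegalStateException(
                    "AckMode.NONE cannot be combined with a transactional channel");
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(AckMode.AUTO, true));   // true
        System.out.println(isValid(AckMode.MANUAL, true)); // true
        System.out.println(isValid(AckMode.NONE, true));   // false
    }
}
```

MANUAL and AUTO are accepted with or without a transaction in this model, which matches the statement that manual acks are consistent with either kind of channel.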
-
setQueueNames
Set the name of the queue(s) to receive messages from.
- Specified by:
setQueueNames in interface MessageListenerContainer
- Parameters:
queueName
- the desired queueName(s) (cannot be null)
-
setQueues
Set the name of the queue(s) to receive messages from.
- Parameters:
queues
- the desired queue(s) (cannot be null)
-
getQueueNames
- Returns:
- the name of the queues to receive messages from.
-
getQueueNamesAsSet
-
getQueueNamesToQueues
Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.
- Returns:
- the map.
- Since:
- 2.1
-
addQueueNames
Add queue(s) to this container's list of queues.
- Parameters:
queueNames
- The queue(s) to add.
-
addQueues
Add queue(s) to this container's list of queues.
- Parameters:
queues
- The queue(s) to add.
-
removeQueueNames
Remove queue(s) from this container's list of queues.
- Parameters:
queueNames
- The queue(s) to remove.
- Returns:
- the boolean result of removal on the target queueNames List.
-
removeQueues
Remove queue(s) from this container's list of queues.
- Parameters:
queues
- The queue(s) to remove.
- Returns:
- the boolean result of removal on the target queueNames List.
-
isExposeListenerChannel
public boolean isExposeListenerChannel()
- Returns:
- whether to expose the listener Channel to a registered ChannelAwareMessageListener.
-
setExposeListenerChannel
public void setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.
Default is "true", reusing the listener's Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying Rabbit Connection instead.
Note that Channels managed by an external transaction manager will always get exposed to RabbitTemplate calls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.
- Parameters:
exposeListenerChannel
- true to expose the channel.
-
setMessageListener
Set the MessageListener.
- Parameters:
messageListener
- the listener.
- Since:
- 2.0
-
checkMessageListener
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
Only a Spring MessageListener object will be accepted.
- Parameters:
listener
- the message listener object to check
- Throws:
IllegalArgumentException
- if the supplied listener is not a MessageListener
-
getMessageListener
Description copied from interface: MessageListenerContainer
Get the message listener.
- Specified by:
getMessageListener in interface MessageListenerContainer
- Returns:
- The message listener object.
-
setErrorHandler
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default a ConditionalRejectingErrorHandler with its default list of fatal exceptions will be used.
- Parameters:
errorHandler
- The error handler.
-
setDeBatchingEnabled
public void setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether the container should de-batch batched messages (true) or call the listener with the batch (false). Default: true.
- Parameters:
deBatchingEnabled
- the deBatchingEnabled to set.
-
isDeBatchingEnabled
protected boolean isDeBatchingEnabled()
-
setAdviceChain
Public setter for the Advice to apply to listener executions.
If a transactionManager is provided as well (see setTransactionManager(PlatformTransactionManager)), then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be applied).
- Parameters:
adviceChain
- the advice chain to set
-
getAdviceChain
-
setAfterReceivePostProcessors
Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
- Parameters:
afterReceivePostProcessors
- the post processor.
- Since:
- 1.4.2
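The invocation order described for the after-receive post processors (priority-ordered first, then ordered, then unordered) can be sketched in plain Java. This is an illustration only: the Kind enum and Processor class are hypothetical stand-ins, under the assumption that "PriorityOrder" and "Order" refer to Spring's PriorityOrdered and Ordered contracts with lower order values running first. Keeping unordered processors in registration order is also an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class PostProcessorOrderSketch {

    // Declaration order doubles as the sort order of the three groups.
    enum Kind { PRIORITY_ORDERED, ORDERED, UNORDERED }

    // Hypothetical stand-in for a MessagePostProcessor with ordering metadata.
    static final class Processor {
        final String name;
        final Kind kind;
        final int order;

        Processor(String name, Kind kind, int order) {
            this.name = name;
            this.kind = kind;
            this.order = order;
        }
    }

    // Sort by group first, then by order value inside the two ordered groups.
    // List.sort is stable, so unordered processors keep their registration order.
    static List<String> invocationOrder(List<Processor> processors) {
        List<Processor> sorted = new ArrayList<>(processors);
        sorted.sort(Comparator
                .comparing((Processor p) -> p.kind)
                .thenComparingInt(p -> p.kind == Kind.UNORDERED ? 0 : p.order));
        List<String> names = new ArrayList<>();
        for (Processor p : sorted) {
            names.add(p.name);
        }
        return names;
    }

    static List<Processor> sample() {
        return Arrays.asList(
                new Processor("decompress", Kind.ORDERED, 2),
                new Processor("audit", Kind.UNORDERED, 0),
                new Processor("decrypt", Kind.PRIORITY_ORDERED, 5),
                new Processor("gunzip", Kind.ORDERED, 1));
    }

    public static void main(String[] args) {
        System.out.println(invocationOrder(sample())); // [decrypt, gunzip, decompress, audit]
    }
}
```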
-
addAfterReceivePostProcessors
Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
In contrast to setAfterReceivePostProcessors(MessagePostProcessor...), this method does not override the previously added afterReceivePostProcessors.
- Parameters:
postprocessors
- the post processor.
- Since:
- 2.1.4
-
removeAfterReceivePostProcessor
Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
- Parameters:
afterReceivePostProcessor
- the MessagePostProcessor to remove.
- Returns:
- true if the provided post processor has been removed.
- Since:
- 2.1.4
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
Default is "true"; set this to "false" to allow for manual startup through the start() method.
- Specified by:
setAutoStartup in interface MessageListenerContainer
- Parameters:
autoStartup
- true for auto startup.
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface SmartLifecycle
-
setPhase
public void setPhase(int phase)
Specify the phase in which this container should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this container starts as late as possible and stops as soon as possible.
- Parameters:
phase
- The phase.
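The phase ordering described for setPhase (startup from lowest to highest, shutdown in reverse) can be illustrated with a minimal plain-Java sketch. The Component class and the sample component names are hypothetical; the real ordering is performed by the Spring lifecycle machinery, not by code like this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class PhaseOrderSketch {

    // Hypothetical stand-in for a lifecycle component that reports a phase.
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    // Startup proceeds from the lowest phase to the highest.
    static List<String> startupOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt((Component c) -> c.phase));
        List<String> names = new ArrayList<>();
        for (Component c : sorted) {
            names.add(c.name);
        }
        return names;
    }

    // Shutdown is the reverse of the startup order.
    static List<String> shutdownOrder(List<Component> components) {
        List<String> names = startupOrder(components);
        Collections.reverse(names);
        return names;
    }

    static List<Component> sample() {
        return Arrays.asList(
                new Component("listenerContainer", Integer.MAX_VALUE), // the default phase
                new Component("connectionPool", 0),
                new Component("cacheWarmer", 100));
    }

    public static void main(String[] args) {
        System.out.println(startupOrder(sample()));  // [connectionPool, cacheWarmer, listenerContainer]
        System.out.println(shutdownOrder(sample())); // [listenerContainer, cacheWarmer, connectionPool]
    }
}
```

With the default of Integer.MAX_VALUE the container sorts last on startup and first on shutdown, which is the "starts as late as possible and stops as soon as possible" behaviour.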
-
getPhase
public int getPhase()
- Specified by:
getPhase in interface Phased
- Specified by:
getPhase in interface SmartLifecycle
- Returns:
- The phase in which this container will be started and stopped.
-
getConnectionFactory
- Overrides:
getConnectionFactory in class RabbitAccessor
- Returns:
- The ConnectionFactory that this accessor uses for obtaining RabbitMQ Connections.
-
setLookupKeyQualifier
Set a qualifier that will prefix the connection factory lookup key; default none.
- Parameters:
lookupKeyQualifier
- the qualifier
- Since:
- 1.6.9
-
isForceCloseChannel
protected boolean isForceCloseChannel()
Force close the channel if the consumer threads don't respond to a shutdown.
- Returns:
- true to force close.
- Since:
- 1.7.4
-
setForceCloseChannel
public void setForceCloseChannel(boolean forceCloseChannel)
Set to true to force close the channel if the consumer threads don't respond to a shutdown. Default: true (since 2.0).
- Parameters:
forceCloseChannel
- true to force close.
- Since:
- 1.7.4
-
getRoutingLookupKey
Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise. The routing key is the comma-delimited list of queue names with all spaces removed and bracketed by [...], optionally prefixed by a qualifier, e.g. "foo[...]".
- Returns:
- the key or null.
- Since:
- 1.6.9
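The lookup key format described for getRoutingLookupKey (comma-delimited queue names, spaces removed, bracketed, optional qualifier prefix) can be reproduced with a short plain-Java sketch. The lookupKey helper and the queue names are hypothetical; the sketch also assumes the names appear in the order in which they were configured.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class RoutingLookupKeySketch {

    // Hypothetical helper: strips spaces, joins with commas, brackets the result,
    // and prepends the qualifier when one is given.
    static String lookupKey(String qualifier, String... queueNames) {
        String joined = Arrays.stream(queueNames)
                .map(name -> name.replace(" ", ""))
                .collect(Collectors.joining(","));
        String prefix = (qualifier == null) ? "" : qualifier;
        return prefix + "[" + joined + "]";
    }

    public static void main(String[] args) {
        System.out.println(lookupKey(null, "orders", "invoices"));  // [orders,invoices]
        System.out.println(lookupKey("foo", "orders", "invoices")); // foo[orders,invoices]
    }
}
```

A routing connection factory would then need a target connection factory registered under exactly that string, for example "foo[orders,invoices]".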
-
getRoutingConnectionFactory
Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise.
- Returns:
- the RoutingConnectionFactory or null.
- Since:
- 1.6.9
-
setConsumerTagStrategy
Set the implementation of ConsumerTagStrategy to generate consumer tags. By default, the RabbitMQ server generates consumer tags.
- Parameters:
consumerTagStrategy
- the consumerTagStrategy to set.
- Since:
- 1.4.5
-
getConsumerTagStrategy
Return the consumer tag strategy to use.
- Returns:
- the strategy.
- Since:
- 2.0
-
setConsumerArguments
Set consumer arguments.
- Parameters:
args
- the arguments.
- Since:
- 1.3
-
getConsumerArguments
Return the consumer arguments.
- Returns:
- the arguments.
- Since:
- 2.0
-
setExclusive
public void setExclusive(boolean exclusive)
Set to true for an exclusive consumer.
- Parameters:
exclusive
- true for an exclusive consumer.
-
isExclusive
protected boolean isExclusive()
Return whether the consumers should be exclusive.
- Returns:
- true for exclusive consumers.
-
setNoLocal
public void setNoLocal(boolean noLocal)
Set to true for a no-local consumer.
- Parameters:
noLocal
- true for a no-local consumer.
-
isNoLocal
protected boolean isNoLocal()
Return whether the consumers should be no-local.
- Returns:
- true for no-local consumers.
-
setDefaultRequeueRejected
public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
Set the default behavior when a message is rejected, for example because the listener threw an exception. When true, messages will be requeued; when false, they will not. For versions of Rabbit that support dead-lettering, the message must not be requeued in order to be sent to the dead letter exchange. Setting to false causes all rejections to not be requeued. When true, the default can be overridden by the listener throwing an AmqpRejectAndDontRequeueException. Default true.
- Parameters:
defaultRequeueRejected
- true to reject by default.
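The requeue rule described for setDefaultRequeueRejected can be condensed into a small decision function. This is a plain-Java sketch under stated assumptions: RejectAndDontRequeue is a hypothetical stand-in for AmqpRejectAndDontRequeueException, and walking the whole cause chain is assumed from the summary's mention of traversing the cause chain. It is not the container's actual code.

```java
final class RequeueDecisionSketch {

    // Hypothetical stand-in for AmqpRejectAndDontRequeueException.
    static final class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    // false always wins; when the default is true, a RejectAndDontRequeue
    // anywhere in the cause chain overrides it.
    static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable failure) {
        if (!defaultRequeueRejected) {
            return false;
        }
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectAndDontRequeue) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Throwable plain = new IllegalStateException("listener failed");
        Throwable wrapped = new RuntimeException("wrapper", new RejectAndDontRequeue("bad payload"));
        System.out.println(shouldRequeue(true, plain));    // true
        System.out.println(shouldRequeue(true, wrapped));  // false
        System.out.println(shouldRequeue(false, plain));   // false
    }
}
```

A message that is not requeued is the case in which, per the text above, a broker with dead-lettering support can route it to the dead letter exchange.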
-
isDefaultRequeueRejected
protected boolean isDefaultRequeueRejected()
Return the default requeue rejected.
- Returns:
- the boolean.
- Since:
- 2.0
-
setPrefetchCount
public void setPrefetchCount(int prefetchCount)
Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.
- Parameters:
prefetchCount
- the prefetch count
- See Also:
- Channel.basicQos(int, boolean)
-
getPrefetchCount
protected int getPrefetchCount()
Return the prefetch count.
- Returns:
- the count.
- Since:
- 2.0
-
setGlobalQos
public void setGlobalQos(boolean globalQos)
Apply prefetchCount to the entire channel.
- Parameters:
globalQos
- true for a channel-wide prefetch.
- Since:
- 2.2.17
- See Also:
- Channel.basicQos(int, boolean)
-
isGlobalQos
protected boolean isGlobalQos()
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Defaults to 5 seconds.
- Parameters:
shutdownTimeout
- the shutdown timeout to set
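The "allow workers to finish within the timeout" behaviour described for setShutdownTimeout resembles the standard ExecutorService shutdown idiom, sketched below in plain Java. The stopAndAwait and demo helpers are hypothetical. Interrupting workers that overrun the timeout is an assumption of this sketch; the text above does not say what the container does in that case.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownTimeoutSketch {

    // Stop accepting work, then give active workers up to the timeout to finish.
    // Returns true when everything finished in time.
    static boolean stopAndAwait(ExecutorService workers, long shutdownTimeoutMillis) {
        workers.shutdown();
        try {
            if (workers.awaitTermination(shutdownTimeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdownNow(); // assumption: interrupt workers that overran the timeout
        return false;
    }

    // Runs one worker that is busy for workMillis, then stops the pool.
    static boolean demo(long workMillis, long shutdownTimeoutMillis) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        workers.execute(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return stopAndAwait(workers, shutdownTimeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(demo(50, 5000)); // worker finishes inside the timeout
        System.out.println(demo(5000, 50)); // worker is still busy when the timeout expires
    }
}
```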
-
getShutdownTimeout
protected long getShutdownTimeout()
-
setIdleEventInterval
public void setIdleEventInterval(long idleEventInterval)
How often to emit ListenerContainerIdleEvents in milliseconds.
- Parameters:
idleEventInterval
- the interval.
-
getIdleEventInterval
protected long getIdleEventInterval()
-
getLastReceive
protected long getLastReceive()
Get the time the last message was received - initialized to container start time.
- Returns:
- the time.
-
setTransactionManager
Set the transaction manager to use.
- Parameters:
transactionManager
- the transaction manager.
-
getTransactionManager
-
setTransactionAttribute
Set the transaction attribute to use when using an external transaction manager.
- Parameters:
transactionAttribute
- the transaction attribute to set
-
getTransactionAttribute
-
setTaskExecutor
Set a task executor for the container - used to create the consumers not at runtime.
- Parameters:
taskExecutor
- the task executor.
-
getTaskExecutor
-
setRecoveryInterval
public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
- Parameters:
recoveryInterval
- The recovery interval.
-
setRecoveryBackOff
Specify the BackOff for the interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With the BackOff you can supply the maxAttempts for recovery before stop() is performed.
- Parameters:
recoveryBackOff
- The BackOff to recover.
- Since:
- 1.5
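The idea behind setRecoveryBackOff, a fixed interval between recovery attempts with an upper bound on the number of attempts, can be modelled in a few lines of plain Java. FixedBackOffSketch is a hypothetical stand-in written for this illustration; it mirrors the general shape of a fixed back-off (an interval plus maxAttempts, with a sentinel meaning "stop") but is not the Spring BackOff API itself.

```java
final class FixedBackOffSketch {

    // Sentinel meaning "no further attempts"; the value is an assumption of this sketch.
    static final long STOP = -1L;

    private final long intervalMillis;
    private final long maxAttempts;
    private long attempts;

    FixedBackOffSketch(long intervalMillis, long maxAttempts) {
        this.intervalMillis = intervalMillis;
        this.maxAttempts = maxAttempts;
    }

    // Returns the wait before the next recovery attempt, or STOP once maxAttempts is used up.
    long nextBackOff() {
        if (attempts < maxAttempts) {
            attempts++;
            return intervalMillis;
        }
        return STOP;
    }

    // Counts how many recovery attempts happen before the back-off says stop.
    static int recoveryAttemptsBeforeStop(long intervalMillis, long maxAttempts) {
        FixedBackOffSketch backOff = new FixedBackOffSketch(intervalMillis, maxAttempts);
        int count = 0;
        while (backOff.nextBackOff() != STOP) {
            count++; // a real container would wait for the interval here, then retry
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(recoveryAttemptsBeforeStop(5000L, 3L)); // 3
    }
}
```

Once the back-off is exhausted, the text above indicates that stop() is performed on the container.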
-
getRecoveryBackOff
-
setMessagePropertiesConverter
Set the MessagePropertiesConverter for this listener container.
- Parameters:
messagePropertiesConverter
- The properties converter.
-
getMessagePropertiesConverter
-
getAmqpAdmin
-
setAmqpAdmin
Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.
- Parameters:
amqpAdmin
- the AmqpAdmin to use
- Since:
- 2.1
-
setMissingQueuesFatal
public void setMissingQueuesFatal(boolean missingQueuesFatal)
If none of the configured queue(s) are available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
- Parameters:
missingQueuesFatal
- the missingQueuesFatal to set.
- Since:
- 1.3.5
-
isMissingQueuesFatal
protected boolean isMissingQueuesFatal()
-
isMissingQueuesFatalSet
protected boolean isMissingQueuesFatalSet()
-
setMismatchedQueuesFatal
public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc). Default false.
- Parameters:
mismatchedQueuesFatal
- true to fail initialization when this condition occurs.
- Since:
- 1.6
-
isMismatchedQueuesFatal
protected boolean isMismatchedQueuesFatal()
-
setPossibleAuthenticationFailureFatal
public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
-
doSetPossibleAuthenticationFailureFatal
protected final void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
-
isPossibleAuthenticationFailureFatal
public boolean isPossibleAuthenticationFailureFatal()
-
isPossibleAuthenticationFailureFatalSet
protected boolean isPossibleAuthenticationFailureFatalSet()
-
isAsyncReplies
protected boolean isAsyncReplies()
-
setAutoDeclare
public void setAutoDeclare(boolean autoDeclare)
Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().
- Parameters:
autoDeclare
- the boolean flag to indicate a declaration operation.
- Since:
- 1.4
-
isAutoDeclare
protected boolean isAutoDeclare()
-
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
- Parameters:
failedDeclarationRetryInterval
- the interval, default 5000.
- Since:
- 1.3.9
-
getFailedDeclarationRetryInterval
protected long getFailedDeclarationRetryInterval()
-
isStatefulRetryFatalWithNullMessageId
protected boolean isStatefulRetryFatalWithNullMessageId()
-
setStatefulRetryFatalWithNullMessageId
public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)
Set whether a message with a null messageId is fatal for the consumer when using stateful retry. When false, instead of stopping the consumer, the message is rejected and not requeued - it will be discarded or routed to the dead letter queue, if so configured. Default true.
- Parameters:
statefulRetryFatalWithNullMessageId
- true for fatal.
- Since:
- 2.0
-
setExclusiveConsumerExceptionLogger
public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
Set a ConditionalExceptionLogger for logging exclusive consumer failures. The default is to log such failures at WARN level.
- Parameters:
exclusiveConsumerExceptionLogger
- the conditional exception logger.
- Since:
- 1.5
-
getExclusiveConsumerExceptionLogger
-
setAlwaysRequeueWithTxManagerRollback
public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
Set to true to always requeue on transaction rollback with an external TransactionManager. With earlier releases, when a transaction manager was configured, a transaction rollback always requeued the message. This was inconsistent with local transactions where the normal defaultRequeueRejected and AmqpRejectAndDontRequeueException logic was honored to determine whether the message was requeued. RabbitMQ does not consider the message delivery to be part of the transaction. This boolean was introduced in 1.7.1, set to true by default, to be consistent with previous behavior. Starting with version 2.0, it is false by default.
- Parameters:
alwaysRequeueWithTxManagerRollback
- true to always requeue on rollback.
- Since:
- 1.7.1
-
isAlwaysRequeueWithTxManagerRollback
protected boolean isAlwaysRequeueWithTxManagerRollback()
-
setErrorHandlerLoggerName
Set the name (category) of the logger used to log exceptions thrown by the error handler. It defaults to the container's logger but can be overridden if you want it to log at a different level to the container. Such exceptions are logged at the ERROR level.
- Parameters:
errorHandlerLoggerName
- the logger name.
- Since:
- 2.0.8
-
setBatchingStrategy
Set a batching strategy to use when de-batching messages. Default is SimpleBatchingStrategy
.- Parameters:
batchingStrategy
- the strategy.- Since:
- 2.2
-
getBatchingStrategy
-
getAfterReceivePostProcessors
-
setObservationConvention
Set an observation convention; used to add additional key/values to observations.- Parameters:
observationConvention
- the convention.- Since:
- 3.0
-
getConsumeDelay
protected long getConsumeDelay() Get the consumeDelay - a time to wait before consuming in ms.- Returns:
- the consume delay.
- Since:
- 2.3
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay) Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin with concurrency > 1
, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.- Parameters:
consumeDelay
- the consume delay.- Since:
- 2.3
-
getJavaLangErrorHandler
-
setjavaLangErrorHandler
public void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler) Provide a JavaLangErrorHandler implementation; by default, System.exit(99)
is called.- Parameters:
javaLangErrorHandler
- the handler.- Since:
- 2.2.12
-
setMessageAckListener
Set a MessageAckListener
to use when acknowledging one or more messages in AcknowledgeMode.AUTO
mode.- Parameters:
messageAckListener
- the messageAckListener.- Since:
- 2.4.6
-
getMessageAckListener
-
afterPropertiesSet
public void afterPropertiesSet() Delegates to validateConfiguration()
and initialize()
.- Specified by:
afterPropertiesSet
in interface InitializingBean
- Specified by:
afterPropertiesSet
in interface MessageListenerContainer
- Overrides:
afterPropertiesSet
in class RabbitAccessor
-
setupMessageListener
Description copied from interface: MessageListenerContainer
Set up the message listener to use. Throws an IllegalArgumentException
if that message listener type is not supported.- Specified by:
setupMessageListener
in interface MessageListenerContainer
- Parameters:
messageListener
- the object
to be wrapped in the MessageListener
.
-
validateConfiguration
protected void validateConfiguration() Validate the configuration of this container. The default implementation is empty. To be overridden in subclasses.
-
initializeProxy
-
destroy
public void destroy() Calls shutdown()
when the BeanFactory destroys the container instance.- Specified by:
destroy
in interface DisposableBean
- Overrides:
destroy
in class ObservableListenerContainer
-
initialize
public void initialize() Initialize this container. Creates a Rabbit Connection and calls
doInitialize()
. -
shutdown
public void shutdown() Stop the shared Connection, call doShutdown()
, and close this container. -
setNotRunning
protected void setNotRunning() -
doInitialize
protected abstract void doInitialize() Register any invokers within this container. Subclasses need to implement this method for their specific invoker management process.
-
doShutdown
protected abstract void doShutdown() Close the registered invokers. Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
-
isActive
public final boolean isActive() - Returns:
- Whether this container is currently active, that is, whether it has been set up but not shut down yet.
-
start
public void start() Start this container. -
doStart
protected void doStart() Start this container, and notify all invoker tasks. -
stop
public void stop() Stop this container. -
doStop
protected void doStop() This method is invoked when the container is stopping. -
isRunning
public final boolean isRunning() Determine whether this container is currently running, that is, whether it has been started and not stopped yet. -
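The difference between isActive() (set up but not shut down) and isRunning() (started and not stopped) is easy to blur, so the following toy model may help. It is a sketch only, not the Spring AMQP class: ContainerLifecycleSketch and RecordingContainer are hypothetical names, the two boolean flags and the order in which they are updated are assumptions, and the real container also manages connections, locking and events that are omitted here.

```java
// Toy model only: not the Spring AMQP class; flag handling and ordering are assumptions.
abstract class ContainerLifecycleSketch {
    private boolean active;
    private boolean running;

    public void initialize() {
        doInitialize();
        active = true;      // set up, but not necessarily consuming yet
    }

    public void start() {
        doStart();
        running = true;
    }

    public void stop() {
        doStop();
        running = false;    // stopped, but still active: it can be started again
    }

    public void shutdown() {
        running = false;
        active = false;
        doShutdown();
    }

    public final boolean isActive() {
        return active;
    }

    public final boolean isRunning() {
        return running;
    }

    // Subclass hooks, mirroring the abstract/overridable methods listed above.
    protected abstract void doInitialize();

    protected abstract void doShutdown();

    protected void doStart() {
    }

    protected void doStop() {
    }
}

// Hypothetical subclass that records which hooks were called, in order.
class RecordingContainer extends ContainerLifecycleSketch {
    final StringBuilder hooks = new StringBuilder();

    @Override
    protected void doInitialize() {
        hooks.append("doInitialize;");
    }

    @Override
    protected void doShutdown() {
        hooks.append("doShutdown;");
    }

    @Override
    protected void doStart() {
        hooks.append("doStart;");
    }

    @Override
    protected void doStop() {
        hooks.append("doStop;");
    }

    public static void main(String[] args) {
        RecordingContainer container = new RecordingContainer();
        container.initialize();
        container.start();
        container.stop();
        System.out.println(container.isActive() + " " + container.isRunning());
        container.shutdown();
        System.out.println(container.hooks);
    }
}
```

In this model a stopped container remains active until shutdown() is called, which matches the wording of the two method descriptions.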
invokeErrorHandler
Invoke the registered ErrorHandler, if any. Log at error level otherwise. The default error handler is a ConditionalRejectingErrorHandler
with the default FatalExceptionStrategy
implementation.- Parameters:
ex
- the uncaught error that arose during Rabbit processing.
-
executeListener
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).- Parameters:
channel
- the Rabbit Channel to operate on
data
- the received Rabbit Message
-
executeListenerAndHandleException
-
invokeListener
-
actualInvokeListener
Invoke the specified listener: either as standard MessageListener or (preferably) as ChannelAwareMessageListener.- Parameters:
channel
- the Rabbit Channel to operate on
data
- the received Rabbit Message or List of Message.
-
doInvokeListener
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data) Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded. An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException
.- Parameters:
listener
- the Spring ChannelAwareMessageListener to invoke
channel
- the Rabbit Channel to operate on
data
- the received Rabbit Message or List of Message.
-
doInvokeListener
Invoke the specified listener as Spring Rabbit MessageListener. The default implementation performs a plain invocation of the
onMessage
method. An exception thrown from the listener will be wrapped in a
ListenerExecutionFailedException
.- Parameters:
listener
- the Rabbit MessageListener to invoke
data
- the received Rabbit Message or List of Message.
-
isChannelLocallyTransacted
protected boolean isChannelLocallyTransacted() Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator. Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.
- Returns:
- whether the given Channel is locally transacted
-
handleListenerException
Handle the given exception that arose during listener execution. The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
- Parameters:
ex
- the exception to handle
-
wrapToListenerExecutionFailedExceptionIfNeeded
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data) - Parameters:
e
- The Exception.
data
- The failed message.- Returns:
- If 'e' is of type
ListenerExecutionFailedException
- return 'e' as it is, otherwise wrap it in a ListenerExecutionFailedException
and return.
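The pass-through-or-wrap rule stated in the Returns clause can be shown with a stand-alone sketch. ListenerFailure is a hypothetical stand-in for ListenerExecutionFailedException so that the example compiles with the JDK alone; the message text and the wrapIfNeeded name are illustrative assumptions, not the library's own.

```java
// Hypothetical stand-in for ListenerExecutionFailedException (not the Spring AMQP class).
class ListenerFailure extends RuntimeException {
    private final Object failedData;

    ListenerFailure(String message, Throwable cause, Object failedData) {
        super(message, cause);
        this.failedData = failedData;
    }

    Object getFailedData() {
        return failedData;
    }
}

class WrapIfNeededSketch {

    // Return 'e' unchanged if it already has the wrapper type; otherwise wrap it with the failed data.
    static ListenerFailure wrapIfNeeded(Exception e, Object data) {
        if (e instanceof ListenerFailure) {
            return (ListenerFailure) e;
        }
        return new ListenerFailure("Listener threw exception", e, data);
    }

    public static void main(String[] args) {
        Exception plain = new IllegalStateException("boom");
        ListenerFailure wrapped = wrapIfNeeded(plain, "payload");
        // The original exception is kept as the cause of the wrapper.
        System.out.println(wrapped.getCause() == plain);
        // Wrapping twice does not add a second layer.
        System.out.println(wrapIfNeeded(wrapped, "payload") == wrapped);
    }
}
```

Returning the same instance when no wrapping is needed keeps the cause chain short, so error handlers see one wrapper at most.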
-
publishConsumerFailedEvent
-
publishMissingQueueEvent
-
publishIdleContainerEvent
protected final void publishIdleContainerEvent(long idleTime) -
updateLastReceive
protected void updateLastReceive() -
configureAdminIfNeeded
protected void configureAdminIfNeeded() -
checkMismatchedQueues
protected void checkMismatchedQueues() -
lazyLoad
public void lazyLoad() Description copied from interface: MessageListenerContainer
Do not check for missing or mismatched queues during startup. Used for lazily loaded message listener containers to avoid a deadlock when starting such containers. Applications lazily loading containers should verify the queue configuration before loading the container bean.- Specified by:
lazyLoad
in interface MessageListenerContainer
-
redeclareElementsIfNecessary
protected void redeclareElementsIfNecessary() Use AmqpAdmin.initialize()
to redeclare everything if necessary. Since auto deletion of a queue can cause upstream elements (bindings, exchanges) to be deleted too, everything needs to be redeclared if a queue is missing. Declaration is idempotent so, aside from some network chatter, there is no issue, and we will only do it if we detect that our queue is gone. In general this makes sense only for 'auto-delete' or 'expired' queues, but with a server TTL policy we cannot determine the 'expiration' option for the queue.
Starting with version 1.6, if
mismatchedQueuesFatal
is true, the declarations are always attempted during restart so the listener will fail with a fatal error if mismatches occur. -
causeChainHasImmediateAcknowledgeAmqpException
Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException
is found before an AmqpRejectAndDontRequeueException
, return true. An Error
will take precedence.- Parameters:
ex
- the exception- Returns:
- true if we should ack immediately.
- Since:
- 1.6.6
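The traversal described above can be sketched without the library. ImmediateAck and RejectDontRequeue are hypothetical stand-ins for ImmediateAcknowledgeAmqpException and AmqpRejectAndDontRequeueException, and shouldAckImmediately is an illustrative name. Reading "an Error will take precedence" as "an Error at the top of the chain means no immediate ack" is an assumption of this sketch.

```java
// Hypothetical stand-in for ImmediateAcknowledgeAmqpException.
class ImmediateAck extends RuntimeException {
    ImmediateAck(String message) {
        super(message);
    }
}

// Hypothetical stand-in for AmqpRejectAndDontRequeueException.
class RejectDontRequeue extends RuntimeException {
    RejectDontRequeue(String message, Throwable cause) {
        super(message, cause);
    }
}

class CauseChainSketch {

    // True only if an ImmediateAck appears in the cause chain before any RejectDontRequeue.
    static boolean shouldAckImmediately(Throwable ex) {
        if (ex instanceof Error) {
            // Assumption: an Error takes precedence, so never ack immediately.
            return false;
        }
        Throwable cause = ex.getCause();
        while (cause != null) {
            if (cause instanceof ImmediateAck) {
                return true;
            }
            if (cause instanceof RejectDontRequeue) {
                return false;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable ackFirst = new RuntimeException("outer", new ImmediateAck("ack"));
        Throwable rejectFirst = new RuntimeException("outer",
                new RejectDontRequeue("reject", new ImmediateAck("ack")));
        System.out.println(shouldAckImmediately(ackFirst));
        System.out.println(shouldAckImmediately(rejectFirst));
    }
}
```

The first match decides the result, so the order in which the exceptions were nested matters, not merely their presence.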
-
prepareHolderForRollback
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception) A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g. TransactionDefinition.PROPAGATION_NONE
). In that case the delivery tags will have been processed manually.- Parameters:
resourceHolder
- the bound resource holder (if a transaction is active).
exception
- the exception.
-
debatch
-