public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware
Modifier and Type | Class and Description |
---|---|
static class | AbstractMessageListenerContainer.SharedConnectionNotInitializedException: Exception that indicates that the initial setup of this container's shared Rabbit Connection failed. |
protected static class | AbstractMessageListenerContainer.WrappedTransactionException: A runtime exception to wrap a Throwable. |
Modifier and Type | Field and Description |
---|---|
protected Object | consumersMonitor |
static boolean | DEFAULT_DEBATCHING_ENABLED |
static int | DEFAULT_PREFETCH_COUNT |
static long | DEFAULT_RECOVERY_INTERVAL: The default recovery interval: 5000 ms = 5 seconds. |
static long | DEFAULT_SHUTDOWN_TIMEOUT |
Fields inherited from class RabbitAccessor: logger
Fields inherited from interface SmartLifecycle: DEFAULT_PHASE
Constructor and Description |
---|
AbstractMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
protected void | actualInvokeListener(com.rabbitmq.client.Channel channel, Message message): Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener. |
void | addAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors): Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. |
void | addQueueNames(String... queueNames): Add queue(s) to this container's list of queues. |
void | addQueues(Queue... queues): Add queue(s) to this container's list of queues. |
void | afterPropertiesSet(): Delegates to validateConfiguration() and initialize(). |
protected boolean | causeChainHasImmediateAcknowledgeAmqpException(Throwable ex): Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException is found before an AmqpRejectAndDontRequeueException, return true. |
protected void | checkMessageListener(Object messageListener): Check the given message listener, throwing an exception if it does not correspond to a supported listener type. |
protected void | checkMismatchedQueues() |
protected void | configureAdminIfNeeded() |
void | destroy(): Calls shutdown() when the BeanFactory destroys the container instance. |
protected abstract void | doInitialize(): Register any invokers within this container. |
protected void | doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Message message): Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded. |
protected void | doInvokeListener(MessageListener listener, Message message): Invoke the specified listener as Spring Rabbit MessageListener. |
protected abstract void | doShutdown(): Close the registered invokers. |
protected void | doStart(): Start this container, and notify all invoker tasks. |
protected void | doStop(): This method is invoked when the container is stopping. |
protected void | executeListener(com.rabbitmq.client.Channel channel, Message messageIn): Execute the specified listener, committing or rolling back the transaction afterwards (if necessary). |
AcknowledgeMode | getAcknowledgeMode() |
protected Advice[] | getAdviceChain() |
protected AmqpAdmin | getAmqpAdmin() |
protected ApplicationContext | getApplicationContext() |
protected ApplicationEventPublisher | getApplicationEventPublisher() |
protected String | getBeanName() |
ConnectionFactory | getConnectionFactory() |
protected Map<String,Object> | getConsumerArguments(): Return the consumer arguments. |
protected ConsumerTagStrategy | getConsumerTagStrategy(): Return the consumer tag strategy to use. |
protected ConditionalExceptionLogger | getExclusiveConsumerExceptionLogger() |
protected long | getFailedDeclarationRetryInterval() |
protected long | getIdleEventInterval() |
protected long | getLastReceive(): Get the time the last message was received - initialized to container start time. |
String | getListenerId(): The 'id' attribute of the listener. |
MessageConverter | getMessageConverter(): Deprecated. |
Object | getMessageListener() |
protected MessagePropertiesConverter | getMessagePropertiesConverter() |
int | getPhase() |
protected int | getPrefetchCount(): Return the prefetch count. |
String[] | getQueueNames() |
protected Set<String> | getQueueNamesAsSet() |
protected Map<String,Queue> | getQueueNamesToQueues(): Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue. |
protected AmqpAdmin | getRabbitAdmin(): Deprecated. In favor of getAmqpAdmin(). |
protected BackOff | getRecoveryBackOff() |
protected RoutingConnectionFactory | getRoutingConnectionFactory(): Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise. |
protected String | getRoutingLookupKey(): Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise. |
protected long | getShutdownTimeout() |
protected Executor | getTaskExecutor() |
protected TransactionAttribute | getTransactionAttribute() |
protected PlatformTransactionManager | getTransactionManager() |
protected void | handleListenerException(Throwable ex): Handle the given exception that arose during listener execution. |
void | initialize(): Initialize this container. |
protected void | initializeProxy(Object delegate) |
protected void | invokeErrorHandler(Throwable ex): Invoke the registered ErrorHandler, if any. |
protected void | invokeListener(com.rabbitmq.client.Channel channel, Message message) |
boolean | isActive() |
protected boolean | isAlwaysRequeueWithTxManagerRollback() |
protected boolean | isAutoDeclare() |
boolean | isAutoStartup() |
protected boolean | isChannelLocallyTransacted(): Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator. |
protected boolean | isDefaultRequeueRejected(): Return the default requeue rejected. |
protected boolean | isExclusive(): Return whether the consumers should be exclusive. |
boolean | isExposeListenerChannel() |
protected boolean | isForceCloseChannel(): Force close the channel if the consumer threads don't respond to a shutdown. |
protected boolean | isMismatchedQueuesFatal() |
protected boolean | isMissingQueuesFatal() |
protected boolean | isMissingQueuesFatalSet() |
protected boolean | isNoLocal(): Return whether the consumers should be no-local. |
boolean | isPossibleAuthenticationFailureFatal() |
protected boolean | isPossibleAuthenticationFailureFatalSet() |
boolean | isRunning(): Determine whether this container is currently running, that is, whether it has been started and not stopped yet. |
protected boolean | isStatefulRetryFatalWithNullMessageId() |
void | lazyLoad(): Do not check for missing or mismatched queues during startup. |
protected void | prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception): A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g. TransactionDefinition.PROPAGATION_NONE). |
protected void | publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) |
protected void | publishIdleContainerEvent(long idleTime) |
protected void | redeclareElementsIfNecessary(): Use AmqpAdmin.initialize() to redeclare everything if necessary. |
boolean | removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor): Remove the provided MessagePostProcessor from the afterReceivePostProcessors list. |
boolean | removeQueueNames(String... queueNames): Remove queue(s) from this container's list of queues. |
boolean | removeQueues(Queue... queues): Remove queue(s) from this container's list of queues. |
void | setAcknowledgeMode(AcknowledgeMode acknowledgeMode): Flag controlling the behaviour of the container with respect to message acknowledgement. |
void | setAdviceChain(Advice... adviceChain): Public setter for the Advice to apply to listener executions. |
void | setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors): Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. |
void | setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback): Set to true to always requeue on transaction rollback with an external TransactionManager. |
void | setAmqpAdmin(AmqpAdmin amqpAdmin): Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started. |
void | setApplicationContext(ApplicationContext applicationContext) |
void | setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) |
void | setAutoDeclare(boolean autoDeclare): Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start(). |
void | setAutoStartup(boolean autoStartup): Set whether to automatically start the container after initialization. |
void | setBeanName(String beanName) |
void | setChannelAwareMessageListener(ChannelAwareMessageListener messageListener): Deprecated. Use setMessageListener(MessageListener) since ChannelAwareMessageListener now inherits MessageListener. |
void | setConsumerArguments(Map<String,Object> args): Set consumer arguments. |
void | setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy): Set the implementation of ConsumerTagStrategy to generate consumer tags. |
void | setDeBatchingEnabled(boolean deBatchingEnabled): Determine whether or not the container should de-batch batched messages (true) or call the listener with the batch (false). |
void | setDefaultRequeueRejected(boolean defaultRequeueRejected): Set the default behavior when a message is rejected, for example because the listener threw an exception. |
void | setErrorHandler(ErrorHandler errorHandler): Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. |
void | setErrorHandlerLoggerName(String errorHandlerLoggerName): Set the name (category) of the logger used to log exceptions thrown by the error handler. |
void | setExclusive(boolean exclusive): Set to true for an exclusive consumer. |
void | setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger): Set a ConditionalExceptionLogger for logging exclusive consumer failures. |
void | setExposeListenerChannel(boolean exposeListenerChannel): Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls. |
void | setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval): Set the interval between passive queue declaration attempts in milliseconds. |
void | setForceCloseChannel(boolean forceCloseChannel): Set to true to force close the channel if the consumer threads don't respond to a shutdown. |
void | setIdleEventInterval(long idleEventInterval): How often to emit ListenerContainerIdleEvents in milliseconds. |
void | setListenerId(String listenerId) |
void | setLookupKeyQualifier(String lookupKeyQualifier): Set a qualifier that will prefix the connection factory lookup key; default none. |
void | setMessageConverter(MessageConverter messageConverter): Deprecated. This converter is not used by the container; it was only used to configure the converter for a @RabbitListener adapter. That is now handled differently. If you are manually creating a listener container, the converter must be configured in a listener adapter (if present). |
void | setMessageListener(MessageListener messageListener): Set the MessageListener. |
void | setMessageListener(Object messageListener): Deprecated. |
void | setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter): Set the MessagePropertiesConverter for this listener container. |
void | setMismatchedQueuesFatal(boolean mismatchedQueuesFatal): Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc). |
void | setMissingQueuesFatal(boolean missingQueuesFatal): If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal. |
void | setNoLocal(boolean noLocal): Set to true for a no-local consumer. |
void | setPhase(int phase): Specify the phase in which this container should be started and stopped. |
void | setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal) |
void | setPrefetchCount(int prefetchCount): Tell the broker how many messages to send to each consumer in a single request. |
void | setQueueNames(String... queueName): Set the name of the queue(s) to receive messages from. |
void | setQueues(Queue... queues): Set the name of the queue(s) to receive messages from. |
void | setRabbitAdmin(AmqpAdmin amqpAdmin): Deprecated. In favor of setAmqpAdmin(AmqpAdmin). |
void | setRecoveryBackOff(BackOff recoveryBackOff): Specify the BackOff for the interval between recovery attempts. |
void | setRecoveryInterval(long recoveryInterval): Specify the interval between recovery attempts, in milliseconds. |
void | setShutdownTimeout(long shutdownTimeout): The time to wait for workers in milliseconds after the container is stopped. |
void | setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId): Set whether a message with a null messageId is fatal for the consumer when using stateful retry. |
void | setTaskExecutor(Executor taskExecutor): Set a task executor for the container - used to create the consumers not at runtime. |
void | setTransactionAttribute(TransactionAttribute transactionAttribute): Set the transaction attribute to use when using an external transaction manager. |
void | setTransactionManager(PlatformTransactionManager transactionManager): Set the transaction manager to use. |
void | setupMessageListener(MessageListener messageListener): Setup the message listener to use. |
void | shutdown(): Stop the shared Connection, call doShutdown(), and close this container. |
void | start(): Start this container. |
void | stop(): Stop this container. |
protected void | updateLastReceive() |
protected void | validateConfiguration(): Validate the configuration of this container. |
protected ListenerExecutionFailedException | wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Message message) |
Methods inherited from class RabbitAccessor: convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SmartLifecycle: stop
public static final boolean DEFAULT_DEBATCHING_ENABLED
public static final int DEFAULT_PREFETCH_COUNT
public static final long DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_SHUTDOWN_TIMEOUT
protected final Object consumersMonitor
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
Specified by:
setApplicationEventPublisher in interface ApplicationEventPublisherAware
protected ApplicationEventPublisher getApplicationEventPublisher()
public final void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to AcknowledgeMode.MANUAL if the listener will send the acknowledgements itself using Channel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel, but if you are doing no other work on the channel at the same time other than receiving a single message then the transaction is probably unnecessary.
Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If AcknowledgeMode.NONE then the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).
Parameters:
acknowledgeMode - the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
See Also:
AcknowledgeMode
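The rule stated above, that AcknowledgeMode.NONE cannot be combined with a transactional channel, can be illustrated with a small stand-alone sketch. The AckMode enum and the validate helper are hypothetical stand-ins written for this example; they are not the container's classes or its actual validation code, and only the rule itself comes from the description above.

```java
// Stand-alone sketch: hypothetical stand-ins, not the Spring AMQP classes.
final class AckModeSketch {

    /** Mirrors the three modes named in the description above. */
    enum AckMode { NONE, MANUAL, AUTO }

    /**
     * Rejects the one combination the description calls invalid:
     * NONE ("autoack") together with a transactional channel.
     */
    static void validate(AckMode mode, boolean channelTransacted) {
        if (mode == AckMode.NONE && channelTransacted) {
            throw new IllegalStateException(
                    "AcknowledgeMode.NONE cannot be used with a transactional channel");
        }
    }

    public static void main(String[] args) {
        validate(AckMode.AUTO, true);    // accepted
        validate(AckMode.MANUAL, false); // accepted
        try {
            validate(AckMode.NONE, true); // rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at configuration time, rather than when the first message arrives, matches the "fail on start up" wording above.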
public AcknowledgeMode getAcknowledgeMode()
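public void setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.
Parameters:
queueName - the desired queueName(s) (can not be null)
public final void setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
Parameters:
queues - the desired queue(s) (can not be null)
public String[] getQueueNames()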
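protected Map<String,Queue> getQueueNamesToQueues()
Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.
public void addQueueNames(String... queueNames)
Add queue(s) to this container's list of queues.
Parameters:
queueNames - The queue(s) to add.
public void addQueues(Queue... queues)
Add queue(s) to this container's list of queues.
Parameters:
queues - The queue(s) to add.
public boolean removeQueueNames(String... queueNames)
Remove queue(s) from this container's list of queues.
Parameters:
queueNames - The queue(s) to remove.
Returns:
the boolean result of removal on the target queueNames List.
public boolean removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.
Parameters:
queues - The queue(s) to remove.
Returns:
the boolean result of removal on the target queueNames List.
public boolean isExposeListenerChannel()
Returns:
whether to expose the listener Channel to a registered ChannelAwareMessageListener.
public void setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.
Default is "true", reusing the listener's Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying Rabbit Connection instead.
Note that Channels managed by an external transaction manager will always get exposed to RabbitTemplate calls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.
Parameters:
exposeListenerChannel - true to expose the channel.
See Also:
ChannelAwareMessageListener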
@Deprecated public void setMessageListener(Object messageListener)
Deprecated. Use setMessageListener(MessageListener).
The listener can be either a Spring MessageListener object or a Spring ChannelAwareMessageListener object.
Parameters:
messageListener - The listener.
Throws:
IllegalArgumentException - if the supplied listener is not a MessageListener or a ChannelAwareMessageListener
See Also:
MessageListener, ChannelAwareMessageListener
public void setMessageListener(MessageListener messageListener)
Set the MessageListener.
Parameters:
messageListener - the listener.
@Deprecated public void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener)
Deprecated. Use setMessageListener(MessageListener) since ChannelAwareMessageListener now inherits MessageListener.
Set the ChannelAwareMessageListener.
Parameters:
messageListener - the listener.
protected void checkMessageListener(Object messageListener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
Only a Spring MessageListener object will be accepted.
Parameters:
messageListener - the message listener object to check
Throws:
IllegalArgumentException - if the supplied listener is not a MessageListener
See Also:
MessageListener
public Object getMessageListener()
public void setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
Parameters:
errorHandler - The error handler.
@Deprecated public void setMessageConverter(MessageConverter messageConverter)
Deprecated. This converter is not used by the container; it was only used to configure the converter for a @RabbitListener adapter. That is now handled differently. If you are manually creating a listener container, the converter must be configured in a listener adapter (if present).
Set the MessageConverter strategy for converting AMQP Messages.
Parameters:
messageConverter - the message converter to use
@Deprecated public MessageConverter getMessageConverter()
Deprecated.
Specified by:
getMessageConverter in interface MessageListenerContainer
Returns:
the MessageConverter that can be used to convert Message, if any.
public void setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether or not the container should de-batch batched messages (true) or call the listener with the batch (false).
Parameters:
deBatchingEnabled - the deBatchingEnabled to set.
public void setAdviceChain(Advice... adviceChain)
Public setter for the Advice to apply to listener executions.
If a transactionManager is provided as well (see setTransactionManager(PlatformTransactionManager)), then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be applied).
Parameters:
adviceChain - the advice chain to set
protected Advice[] getAdviceChain()
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order, depending on PriorityOrdered, Ordered and finally unordered.
Parameters:
afterReceivePostProcessors - the post processor.
See Also:
addAfterReceivePostProcessors(MessagePostProcessor...)
public void addAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order, depending on PriorityOrdered, Ordered and finally unordered.
In contrast to setAfterReceivePostProcessors(MessagePostProcessor...), this method does not override the previously added afterReceivePostProcessors.
Parameters:
afterReceivePostProcessors - the post processor.
public boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
Parameters:
afterReceivePostProcessor - the MessagePostProcessor to remove.
See Also:
addAfterReceivePostProcessors(MessagePostProcessor...)
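The invocation order described for the after-receive post processors (priority-ordered first, then ordered, then unordered) can be sketched without any Spring dependency. The Processor, Ordered and PriorityOrdered interfaces below are hypothetical stand-ins reduced to what the example needs, and the sort method is an illustration of the stated ordering, not the container's own sorting code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Stand-alone sketch: hypothetical stand-ins for the ordering contracts.
final class PostProcessorOrderSketch {

    interface Processor { String name(); }
    interface Ordered extends Processor { int getOrder(); }
    interface PriorityOrdered extends Ordered { }

    /** Priority-ordered first, then ordered, then unordered in insertion order. */
    static List<Processor> sort(List<Processor> input) {
        List<Ordered> priority = new ArrayList<>();
        List<Ordered> ordered = new ArrayList<>();
        List<Processor> unordered = new ArrayList<>();
        for (Processor p : input) {
            if (p instanceof PriorityOrdered) {
                priority.add((Ordered) p);
            } else if (p instanceof Ordered) {
                ordered.add((Ordered) p);
            } else {
                unordered.add(p);
            }
        }
        // Within each group, lower order values run first.
        Comparator<Ordered> byOrder = Comparator.comparingInt(Ordered::getOrder);
        priority.sort(byOrder);
        ordered.sort(byOrder);
        List<Processor> result = new ArrayList<>(priority);
        result.addAll(ordered);
        result.addAll(unordered);
        return result;
    }

    static Processor plain(String name) {
        return () -> name;
    }

    static Ordered ordered(String name, int order) {
        return new Ordered() {
            public String name() { return name; }
            public int getOrder() { return order; }
        };
    }

    static PriorityOrdered priority(String name, int order) {
        return new PriorityOrdered() {
            public String name() { return name; }
            public int getOrder() { return order; }
        };
    }

    static List<String> names(List<Processor> processors) {
        List<String> result = new ArrayList<>();
        for (Processor p : processors) {
            result.add(p.name());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Processor> input = new ArrayList<>();
        input.add(plain("gunzip"));
        input.add(ordered("b", 2));
        input.add(priority("p", 5));
        input.add(ordered("a", 1));
        System.out.println(names(sort(input)));
    }
}
```

In this sketch a priority-ordered processor runs before every merely ordered one regardless of its numeric order value, which is the grouping the description implies.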
public void setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
Default is "true"; set this to "false" to allow for manual startup through the start() method.
Parameters:
autoStartup - true for auto startup.
public boolean isAutoStartup()
Specified by:
isAutoStartup in interface SmartLifecycle
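public void setPhase(int phase)
Specify the phase in which this container should be started and stopped.
Parameters:
phase - The phase.
public int getPhase()
Specified by:
getPhase in interface Phased
Specified by:
getPhase in interface SmartLifecycle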
public void setBeanName(String beanName)
Specified by:
setBeanName in interface BeanNameAware
@Nullable protected final String getBeanName()
protected final ApplicationContext getApplicationContext()
public final void setApplicationContext(ApplicationContext applicationContext)
Specified by:
setApplicationContext in interface ApplicationContextAware
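public ConnectionFactory getConnectionFactory()
Overrides:
getConnectionFactory in class RabbitAccessor
Returns:
the ConnectionFactory used for obtaining Connections.
public void setLookupKeyQualifier(String lookupKeyQualifier)
Set a qualifier that will prefix the connection factory lookup key; default none.
Parameters:
lookupKeyQualifier - the qualifier
See Also:
getRoutingLookupKey()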
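protected boolean isForceCloseChannel()
Force close the channel if the consumer threads don't respond to a shutdown.
public void setForceCloseChannel(boolean forceCloseChannel)
Set to true to force close the channel if the consumer threads don't respond to a shutdown.
Parameters:
forceCloseChannel - true to force close.
@Nullable protected String getRoutingLookupKey()
Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise. The routing key is the comma-delimited list of queue names with all spaces removed and bracketed by [...], optionally prefixed by a qualifier, e.g. "foo[...]".
See Also:
setLookupKeyQualifier(String)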
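The key format described for getRoutingLookupKey() can be made concrete with a short sketch. The lookupKey helper and the queue names are hypothetical and chosen for illustration only; the sketch follows the stated format (comma-delimited names, spaces removed, bracketed, optional qualifier prefix) and is not the container's own implementation.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the lookup key format described above.
final class RoutingLookupKeySketch {

    /**
     * Builds "[q1,q2]" from the queue names, strips all spaces, and
     * prepends the qualifier when one is given.
     */
    static String lookupKey(String qualifier, List<String> queueNames) {
        String key = "[" + String.join(",", queueNames) + "]";
        key = key.replace(" ", "");
        return qualifier == null ? key : qualifier + key;
    }

    public static void main(String[] args) {
        // Hypothetical queue names, used only for illustration.
        System.out.println(lookupKey(null, Arrays.asList("orders", "audit")));
        System.out.println(lookupKey("foo", Arrays.asList("orders", "audit")));
    }
}
```

With a routing connection factory, a key of this shape would be what selects the target connection factory for the container's queues, so the qualifier gives a way to distinguish containers that listen on the same queues.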
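@Nullable protected RoutingConnectionFactory getRoutingConnectionFactory()
Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise.
Returns:
the RoutingConnectionFactory or null.
@Nullable public String getListenerId()
The 'id' attribute of the listener.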
public void setListenerId(String listenerId)
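public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
Set the implementation of ConsumerTagStrategy to generate consumer tags. By default, the RabbitMQ server generates consumer tags.
Parameters:
consumerTagStrategy - the consumerTagStrategy to set.
@Nullable protected ConsumerTagStrategy getConsumerTagStrategy()
Return the consumer tag strategy to use.
public void setConsumerArguments(Map<String,Object> args)
Set consumer arguments.
Parameters:
args - the arguments.
protected Map<String,Object> getConsumerArguments()
Return the consumer arguments.
public void setExclusive(boolean exclusive)
Set to true for an exclusive consumer.
Parameters:
exclusive - true for an exclusive consumer.
protected boolean isExclusive()
Return whether the consumers should be exclusive.
public void setNoLocal(boolean noLocal)
Set to true for a no-local consumer.
Parameters:
noLocal - true for a no-local consumer.
protected boolean isNoLocal()
Return whether the consumers should be no-local.
public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
Set the default behavior when a message is rejected, for example because the listener threw an exception. When true, the default can be overridden by the listener throwing an AmqpRejectAndDontRequeueException. Default true.
Parameters:
defaultRequeueRejected - true to reject by default.
protected boolean isDefaultRequeueRejected()
Return the default requeue rejected.
See Also:
setDefaultRequeueRejected(boolean)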
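public void setPrefetchCount(int prefetchCount)
Tell the broker how many messages to send to each consumer in a single request.
Parameters:
prefetchCount - the prefetch count
protected int getPrefetchCount()
Return the prefetch count.
public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped.
Parameters:
shutdownTimeout - the shutdown timeout to set
protected long getShutdownTimeout()
public void setIdleEventInterval(long idleEventInterval)
How often to emit ListenerContainerIdleEvents in milliseconds.
Parameters:
idleEventInterval - the interval.
protected long getIdleEventInterval()
protected long getLastReceive()
Get the time the last message was received - initialized to container start time.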
public void setTransactionManager(PlatformTransactionManager transactionManager)
Set the transaction manager to use.
Parameters:
transactionManager - the transaction manager.
@Nullable protected PlatformTransactionManager getTransactionManager()
public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Set the transaction attribute to use when using an external transaction manager.
Parameters:
transactionAttribute - the transaction attribute to set
protected TransactionAttribute getTransactionAttribute()
public void setTaskExecutor(Executor taskExecutor)
Set a task executor for the container - used to create the consumers not at runtime.
Parameters:
taskExecutor - the task executor.
protected Executor getTaskExecutor()
public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds.
Parameters:
recoveryInterval - The recovery interval.
public void setRecoveryBackOff(BackOff recoveryBackOff)
Specify the BackOff for the interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With the BackOff you can supply the maxAttempts for recovery before stop() is performed.
Parameters:
recoveryBackOff - The BackOff to recover.
protected BackOff getRecoveryBackOff()
public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this listener container.
Parameters:
messagePropertiesConverter - The properties converter.
protected MessagePropertiesConverter getMessagePropertiesConverter()
@Deprecated @Nullable protected AmqpAdmin getRabbitAdmin()
Deprecated. In favor of getAmqpAdmin().
public void setAmqpAdmin(AmqpAdmin amqpAdmin)
Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.
Parameters:
amqpAdmin - the AmqpAdmin to use
@Deprecated public final void setRabbitAdmin(AmqpAdmin amqpAdmin)
Deprecated. In favor of setAmqpAdmin(AmqpAdmin).
Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.
Parameters:
amqpAdmin - The admin.
public void setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal.
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
Parameters:
missingQueuesFatal - the missingQueuesFatal to set.
See Also:
setAutoDeclare(boolean)
protected boolean isMissingQueuesFatal()
protected boolean isMissingQueuesFatalSet()
public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc).
Parameters:
mismatchedQueuesFatal - true to fail initialization when this condition occurs.
protected boolean isMismatchedQueuesFatal()
public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
public boolean isPossibleAuthenticationFailureFatal()
protected boolean isPossibleAuthenticationFailureFatalSet()
public void setAutoDeclare(boolean autoDeclare)
Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().
Parameters:
autoDeclare - the boolean flag to indicate a declaration operation.
See Also:
redeclareElementsIfNecessary()
protected boolean isAutoDeclare()
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
Parameters:
failedDeclarationRetryInterval - the interval, default 5000.
protected long getFailedDeclarationRetryInterval()
protected boolean isStatefulRetryFatalWithNullMessageId()
public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)
Set whether a message with a null messageId is fatal for the consumer when using stateful retry.
Parameters:
statefulRetryFatalWithNullMessageId - true for fatal.
public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
Set a ConditionalExceptionLogger for logging exclusive consumer failures. The default is to log such failures at WARN level.
Parameters:
exclusiveConsumerExceptionLogger - the conditional exception logger.
protected ConditionalExceptionLogger getExclusiveConsumerExceptionLogger()
public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
Set to true to always requeue on transaction rollback with an external TransactionManager.
With earlier releases, when a transaction manager was configured, a transaction rollback always requeued the message. This was inconsistent with local transactions where the normal defaultRequeueRejected and AmqpRejectAndDontRequeueException logic was honored to determine whether the message was requeued. RabbitMQ does not consider the message delivery to be part of the transaction.
This boolean was introduced in 1.7.1, set to true by default, to be consistent with previous behavior. Starting with version 2.0, it is false by default.
Parameters:
alwaysRequeueWithTxManagerRollback - true to always requeue on rollback.
protected boolean isAlwaysRequeueWithTxManagerRollback()
public void setErrorHandlerLoggerName(String errorHandlerLoggerName)
Set the name (category) of the logger used to log exceptions thrown by the error handler.
Parameters:
errorHandlerLoggerName - the logger name.
public final void afterPropertiesSet()
Delegates to validateConfiguration() and initialize().
Specified by:
afterPropertiesSet in interface InitializingBean
Overrides:
afterPropertiesSet in class RabbitAccessor
public void setupMessageListener(MessageListener messageListener)
Description copied from interface: MessageListenerContainer
Setup the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
Specified by:
setupMessageListener in interface MessageListenerContainer
Parameters:
messageListener - the object to be wrapped as the MessageListener.
protected void validateConfiguration()
Validate the configuration of this container.
The default implementation is empty. To be overridden in subclasses.
protected void initializeProxy(Object delegate)
public void destroy()
Calls shutdown() when the BeanFactory destroys the container instance.
Specified by:
destroy in interface DisposableBean
See Also:
shutdown()
public void initialize()
Initialize this container.
Creates a Rabbit Connection and calls doInitialize().
public void shutdown()
Stop the shared Connection, call doShutdown(), and close this container.
protected abstract void doInitialize()
Register any invokers within this container.
Subclasses need to implement this method for their specific invoker management process.
protected abstract void doShutdown()
Close the registered invokers.
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
See Also:
shutdown()
public final boolean isActive()
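public void start()
Start this container.
protected void doStart()
Start this container, and notify all invoker tasks.
public void stop()
Stop this container.
protected void doStop()
This method is invoked when the container is stopping.
public final boolean isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.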
protected void invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any. The default error handler is a ConditionalRejectingErrorHandler with the default FatalExceptionStrategy implementation.
Parameters:
ex - the uncaught error that arose during Rabbit processing.
See Also:
setErrorHandler(org.springframework.util.ErrorHandler)
protected void executeListener(com.rabbitmq.client.Channel channel, Message messageIn)
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
Parameters:
channel - the Rabbit Channel to operate on
messageIn - the received Rabbit Message
See Also:
invokeListener(com.rabbitmq.client.Channel, org.springframework.amqp.core.Message), handleListenerException(java.lang.Throwable)
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message)
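protected void actualInvokeListener(com.rabbitmq.client.Channel channel, Message message)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
Parameters:
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
See Also:
setMessageListener(MessageListener)
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Message message)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded. An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException.
Parameters:
listener - the Spring ChannelAwareMessageListener to invoke
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
See Also:
ChannelAwareMessageListener, setExposeListenerChannel(boolean)
protected void doInvokeListener(MessageListener listener, Message message)
Invoke the specified listener as Spring Rabbit MessageListener.
Default implementation performs a plain invocation of the onMessage method.
An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException.
Parameters:
listener - the Rabbit MessageListener to invoke
message - the received Rabbit Message
See Also:
MessageListener.onMessage(org.springframework.amqp.core.Message)
protected boolean isChannelLocallyTransacted()
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.
See Also:
RabbitAccessor.isChannelTransacted()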
protected void handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.
The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
Parameters:
ex - the exception to handle
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Message message)
Parameters:
e - The Exception.
message - The failed message.
Returns:
If 'e' is already a ListenerExecutionFailedException, return 'e' as it is; otherwise wrap it in a ListenerExecutionFailedException and return.
protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t)
protected final void publishIdleContainerEvent(long idleTime)
protected void updateLastReceive()
protected void configureAdminIfNeeded()
protected void checkMismatchedQueues()
public void lazyLoad()
Description copied from interface: MessageListenerContainer
Do not check for missing or mismatched queues during startup.
Specified by:
lazyLoad in interface MessageListenerContainer
protected void redeclareElementsIfNecessary()
Use AmqpAdmin.initialize() to redeclare everything if necessary. Since auto deletion of a queue can cause upstream elements (bindings, exchanges) to be deleted too, everything needs to be redeclared if a queue is missing. Declaration is idempotent so, aside from some network chatter, there is no issue, and we will only do it if we detect our queue is gone.
In general it makes sense only for the 'auto-delete' or 'expired' queues, but with the server TTL policy we cannot determine the 'expiration' option for the queue.
Starting with version 1.6, if mismatchedQueuesFatal is true, the declarations are always attempted during restart so the listener will fail with a fatal error if mismatches occur.
protected boolean causeChainHasImmediateAcknowledgeAmqpException(Throwable ex)
Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException is found before an AmqpRejectAndDontRequeueException, return true. An Error will take precedence.
Parameters:
ex - the exception
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception)
A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g. TransactionDefinition.PROPAGATION_NONE). In that case the delivery tags will have been processed manually.
Parameters:
resourceHolder - the bound resource holder (if a transaction is active).
exception - the exception.
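The cause-chain check described for causeChainHasImmediateAcknowledgeAmqpException(Throwable) can be sketched in plain Java. The ImmediateAck and RejectDontRequeue classes are hypothetical stand-ins for ImmediateAcknowledgeAmqpException and AmqpRejectAndDontRequeueException. Two details are assumptions of this sketch rather than statements from the text above: the walk starts at the first cause rather than at the exception itself, and "an Error will take precedence" is read as returning false when the top-level throwable is an Error.

```java
// Stand-alone sketch: the two exception classes are hypothetical stand-ins.
final class CauseChainSketch {

    static class ImmediateAck extends RuntimeException {
        ImmediateAck(Throwable cause) { super(cause); }
    }

    static class RejectDontRequeue extends RuntimeException {
        RejectDontRequeue(Throwable cause) { super(cause); }
    }

    /** True if an ImmediateAck appears in the cause chain before a RejectDontRequeue. */
    static boolean hasImmediateAck(Throwable ex) {
        if (ex instanceof Error) {
            return false; // assumption: an Error at the top takes precedence
        }
        // Assumption: traversal starts at the first cause, not at ex itself.
        for (Throwable cause = ex.getCause(); cause != null; cause = cause.getCause()) {
            if (cause instanceof ImmediateAck) {
                return true;
            }
            if (cause instanceof RejectDontRequeue) {
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable ackFirst = new RuntimeException("listener failed", new ImmediateAck(null));
        Throwable rejectFirst = new RuntimeException(new RejectDontRequeue(new ImmediateAck(null)));
        System.out.println(hasImmediateAck(ackFirst));
        System.out.println(hasImmediateAck(rejectFirst));
    }
}
```

The first match in the chain decides the result, so the order in which a listener nests the two exception types matters.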