Class AbstractMessageListenerContainer
- java.lang.Object
-
- org.springframework.amqp.rabbit.connection.RabbitAccessor
-
- org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
-
- All Implemented Interfaces:
MessageListenerContainer, Aware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle
- Direct Known Subclasses:
DirectMessageListenerContainer, SimpleMessageListenerContainer
public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware
- Author:
- Mark Pollack, Mark Fisher, Dave Syer, James Carr, Gary Russell, Alex Panchenko, Johno Crawford, Arnaud Cogoluègnes, Artem Bilan, Mohammad Hewedy
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
static interface AbstractMessageListenerContainer.JavaLangErrorHandler
A handler for Error on the container thread(s).
static class AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Exception that indicates that the initial setup of this container's shared Rabbit Connection failed.
protected static class AbstractMessageListenerContainer.WrappedTransactionException
A runtime exception to wrap a Throwable.
-
Field Summary
Fields (Modifier and Type, Field, Description)
protected Object consumersMonitor
static boolean DEFAULT_DEBATCHING_ENABLED
static int DEFAULT_PREFETCH_COUNT
static long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.
static long DEFAULT_SHUTDOWN_TIMEOUT
-
Fields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
logger
-
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
-
Constructor Summary
Constructors (Constructor, Description)
AbstractMessageListenerContainer()
-
Method Summary
All Methods (Modifier and Type, Method, Description)
protected void actualInvokeListener(com.rabbitmq.client.Channel channel, Object data)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
void addAfterReceivePostProcessors(MessagePostProcessor... postprocessors)
Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener.
void addQueueNames(String... queueNames)
Add queue(s) to this container's list of queues.
void addQueues(Queue... queues)
Add queue(s) to this container's list of queues.
void afterPropertiesSet()
Delegates to validateConfiguration() and initialize().
protected boolean causeChainHasImmediateAcknowledgeAmqpException(Throwable ex)
Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException is found before an AmqpRejectAndDontRequeueException, return true.
protected void checkMessageListener(Object listener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
protected void checkMismatchedQueues()
protected void configureAdminIfNeeded()
protected List<Message> debatch(Message message)
void destroy()
Calls shutdown() when the BeanFactory destroys the container instance.
protected abstract void doInitialize()
Register any invokers within this container.
protected void doInvokeListener(MessageListener listener, Object data)
Invoke the specified listener as Spring Rabbit MessageListener.
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded.
protected void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
protected abstract void doShutdown()
Close the registered invokers.
protected void doStart()
Start this container, and notify all invoker tasks.
protected void doStop()
This method is invoked when the container is stopping.
protected void executeListener(com.rabbitmq.client.Channel channel, Object data)
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
AcknowledgeMode getAcknowledgeMode()
protected Advice[] getAdviceChain()
protected Collection<MessagePostProcessor> getAfterReceivePostProcessors()
protected AmqpAdmin getAmqpAdmin()
protected ApplicationContext getApplicationContext()
protected ApplicationEventPublisher getApplicationEventPublisher()
protected BatchingStrategy getBatchingStrategy()
protected String getBeanName()
ConnectionFactory getConnectionFactory()
protected long getConsumeDelay()
Get the consumeDelay - a time to wait before consuming in ms.
Map<String,Object> getConsumerArguments()
Return the consumer arguments.
protected ConsumerTagStrategy getConsumerTagStrategy()
Return the consumer tag strategy to use.
protected ConditionalExceptionLogger getExclusiveConsumerExceptionLogger()
protected long getFailedDeclarationRetryInterval()
protected long getIdleEventInterval()
protected AbstractMessageListenerContainer.JavaLangErrorHandler getJavaLangErrorHandler()
protected long getLastReceive()
Get the time the last message was received - initialized to container start time.
String getListenerId()
The 'id' attribute of the listener.
Object getMessageListener()
Get the message listener.
protected MessagePropertiesConverter getMessagePropertiesConverter()
int getPhase()
protected int getPrefetchCount()
Return the prefetch count.
String[] getQueueNames()
protected Set<String> getQueueNamesAsSet()
protected Map<String,Queue> getQueueNamesToQueues()
Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.
protected BackOff getRecoveryBackOff()
protected RoutingConnectionFactory getRoutingConnectionFactory()
Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise.
protected String getRoutingLookupKey()
Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise.
protected long getShutdownTimeout()
protected Executor getTaskExecutor()
protected TransactionAttribute getTransactionAttribute()
protected PlatformTransactionManager getTransactionManager()
protected void handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.
void initialize()
Initialize this container.
protected void initializeProxy(Object delegate)
protected void invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any.
protected void invokeListener(com.rabbitmq.client.Channel channel, Object data)
boolean isActive()
protected boolean isAlwaysRequeueWithTxManagerRollback()
protected boolean isAsyncReplies()
protected boolean isAutoDeclare()
boolean isAutoStartup()
protected boolean isChannelLocallyTransacted()
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
protected boolean isDeBatchingEnabled()
protected boolean isDefaultRequeueRejected()
Return the default requeue rejected.
protected boolean isExclusive()
Return whether the consumers should be exclusive.
boolean isExposeListenerChannel()
protected boolean isForceCloseChannel()
Force close the channel if the consumer threads don't respond to a shutdown.
protected boolean isGlobalQos()
protected boolean isMismatchedQueuesFatal()
protected boolean isMissingQueuesFatal()
protected boolean isMissingQueuesFatalSet()
protected boolean isNoLocal()
Return whether the consumers should be no-local.
boolean isPossibleAuthenticationFailureFatal()
protected boolean isPossibleAuthenticationFailureFatalSet()
boolean isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
protected boolean isStatefulRetryFatalWithNullMessageId()
void lazyLoad()
Do not check for missing or mismatched queues during startup.
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception)
A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g.
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t)
protected void publishIdleContainerEvent(long idleTime)
protected void publishMissingQueueEvent(String queue)
protected void redeclareElementsIfNecessary()
Use AmqpAdmin.initialize() to redeclare everything if necessary.
boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
boolean removeQueueNames(String... queueNames)
Remove queue(s) from this container's list of queues.
boolean removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.
void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement.
void setAdviceChain(Advice... adviceChain)
Public setter for the Advice to apply to listener executions.
void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener.
void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
Set to true to always requeue on transaction rollback with an external TransactionManager.
void setAmqpAdmin(AmqpAdmin amqpAdmin)
Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started.
void setApplicationContext(ApplicationContext applicationContext)
void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
void setAutoDeclare(boolean autoDeclare)
Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().
void setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
void setBatchingStrategy(BatchingStrategy batchingStrategy)
Set a batching strategy to use when de-batching messages.
void setBeanName(String beanName)
void setConsumeDelay(long consumeDelay)
Set the consumeDelay - a time to wait before consuming in ms.
void setConsumerArguments(Map<String,Object> args)
Set consumer arguments.
void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
Set the implementation of ConsumerTagStrategy to generate consumer tags.
void setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether the container should de-batch batched messages (true) or call the listener with the batch (false).
void setDefaultRequeueRejected(boolean defaultRequeueRejected)
Set the default behavior when a message is rejected, for example because the listener threw an exception.
void setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
void setErrorHandlerLoggerName(String errorHandlerLoggerName)
Set the name (category) of the logger used to log exceptions thrown by the error handler.
void setExclusive(boolean exclusive)
Set to true for an exclusive consumer.
void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
Set a ConditionalExceptionLogger for logging exclusive consumer failures.
void setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.
void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
void setForceCloseChannel(boolean forceCloseChannel)
Set to true to force close the channel if the consumer threads don't respond to a shutdown.
void setGlobalQos(boolean globalQos)
Apply prefetchCount to the entire channel.
void setIdleEventInterval(long idleEventInterval)
How often to emit ListenerContainerIdleEvents in milliseconds.
void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler)
Provide a JavaLangErrorHandler implementation; by default, System.exit(99) is called.
void setListenerId(String listenerId)
Set the listener id.
void setLookupKeyQualifier(String lookupKeyQualifier)
Set a qualifier that will prefix the connection factory lookup key; default none.
void setMessageListener(MessageListener messageListener)
Set the MessageListener.
void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this listener container.
void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable micrometer listener timers.
void setMicrometerTags(Map<String,String> tags)
Set additional tags for the Micrometer listener timers.
void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc).
void setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal.
void setNoLocal(boolean noLocal)
Set to true for a no-local consumer.
void setPhase(int phase)
Specify the phase in which this container should be started and stopped.
void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
void setPrefetchCount(int prefetchCount)
Tell the broker how many messages to send to each consumer in a single request.
void setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.
void setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
void setRecoveryBackOff(BackOff recoveryBackOff)
Specify the BackOff for interval between recovery attempts.
void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds.
void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped.
void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)
Set whether a message with a null messageId is fatal for the consumer when using stateful retry.
void setTaskExecutor(Executor taskExecutor)
Set a task executor for the container - used to create the consumers not at runtime.
void setTransactionAttribute(TransactionAttribute transactionAttribute)
Set the transaction attribute to use when using an external transaction manager.
void setTransactionManager(PlatformTransactionManager transactionManager)
Set the transaction manager to use.
void setupMessageListener(MessageListener messageListener)
Setup the message listener to use.
void shutdown()
Stop the shared Connection, call doShutdown(), and close this container.
void start()
Start this container.
void stop()
Stop this container.
void stop(Runnable callback)
protected void updateLastReceive()
protected void validateConfiguration()
Validate the configuration of this container.
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data)
-
Methods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.amqp.rabbit.listener.MessageListenerContainer
isConsumerBatchEnabled
-
-
-
-
Field Detail
-
DEFAULT_DEBATCHING_ENABLED
public static final boolean DEFAULT_DEBATCHING_ENABLED
- See Also:
- Constant Field Values
-
DEFAULT_PREFETCH_COUNT
public static final int DEFAULT_PREFETCH_COUNT
- See Also:
- Constant Field Values
-
DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.
- See Also:
- Constant Field Values
-
DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
- See Also:
- Constant Field Values
-
consumersMonitor
protected final Object consumersMonitor
-
-
Method Detail
-
setApplicationEventPublisher
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
- Specified by:
setApplicationEventPublisher in interface ApplicationEventPublisherAware
-
getApplicationEventPublisher
protected ApplicationEventPublisher getApplicationEventPublisher()
-
setAcknowledgeMode
public final void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to AcknowledgeMode.MANUAL if the listener will send the acknowledgements itself using Channel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel, but if you are doing no other work on the channel at the same time other than receiving a single message then the transaction is probably unnecessary.
Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If AcknowledgeMode.NONE is set then the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).
- Parameters:
acknowledgeMode - the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
- See Also:
AcknowledgeMode
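The start-up rule described for setAcknowledgeMode can be pictured with a small JDK-only sketch. It is an illustration of the documented constraint, not the container's own validation code; AckModeSketch and AckModeValidationSketch are hypothetical names that only model the three modes named above.

```java
/** Hypothetical stand-in for the library's AcknowledgeMode; models only the three constants named above. */
enum AckModeSketch { NONE, MANUAL, AUTO }

final class AckModeValidationSketch {

    /**
     * Rejects the one combination the text rules out: NONE ("autoack")
     * together with a transactional channel. All other combinations pass.
     */
    static void validate(AckModeSketch mode, boolean channelTransacted) {
        if (mode == AckModeSketch.NONE && channelTransacted) {
            throw new IllegalStateException(
                    "AcknowledgeMode NONE cannot be combined with a transactional channel");
        }
    }
}
```

In this sketch, validate(AckModeSketch.MANUAL, true) passes, which matches the statement that manual acks work with either kind of channel.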
-
getAcknowledgeMode
public AcknowledgeMode getAcknowledgeMode()
- Returns:
- the acknowledgeMode
-
setQueueNames
public void setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.
- Specified by:
setQueueNames in interface MessageListenerContainer
- Parameters:
queueName - the desired queueName(s) (cannot be null)
-
setQueues
public final void setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
- Parameters:
queues - the desired queue(s) (cannot be null)
-
getQueueNames
public String[] getQueueNames()
- Returns:
- the name of the queues to receive messages from.
-
getQueueNamesToQueues
protected Map<String,Queue> getQueueNamesToQueues()
Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.
- Returns:
- the map.
- Since:
- 2.1
-
addQueueNames
public void addQueueNames(String... queueNames)
Add queue(s) to this container's list of queues.
- Parameters:
queueNames - The queue(s) to add.
-
addQueues
public void addQueues(Queue... queues)
Add queue(s) to this container's list of queues.
- Parameters:
queues - The queue(s) to add.
-
removeQueueNames
public boolean removeQueueNames(String... queueNames)
Remove queue(s) from this container's list of queues.
- Parameters:
queueNames - The queue(s) to remove.
- Returns:
- the boolean result of removal on the target queueNames List.
-
removeQueues
public boolean removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.
- Parameters:
queues - The queue(s) to remove.
- Returns:
- the boolean result of removal on the target queueNames List.
-
isExposeListenerChannel
public boolean isExposeListenerChannel()
- Returns:
- whether to expose the listener Channel to a registered ChannelAwareMessageListener.
-
setExposeListenerChannel
public void setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.
Default is "true", reusing the listener's Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying Rabbit Connection instead.
Note that Channels managed by an external transaction manager will always get exposed to RabbitTemplate calls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.
- Parameters:
exposeListenerChannel - true to expose the channel.
- See Also:
ChannelAwareMessageListener
-
setMessageListener
public void setMessageListener(MessageListener messageListener)
Set the MessageListener.
- Parameters:
messageListener - the listener.
- Since:
- 2.0
-
checkMessageListener
protected void checkMessageListener(Object listener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
Only a Spring MessageListener object will be accepted.
- Parameters:
listener - the message listener object to check
- Throws:
IllegalArgumentException - if the supplied listener is not a MessageListener
- See Also:
MessageListener
-
getMessageListener
@Nullable public Object getMessageListener()
Description copied from interface: MessageListenerContainer
Get the message listener.
- Specified by:
getMessageListener in interface MessageListenerContainer
- Returns:
- The message listener object.
-
setErrorHandler
public void setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default a ConditionalRejectingErrorHandler with its default list of fatal exceptions will be used.
- Parameters:
errorHandler - The error handler.
-
setDeBatchingEnabled
public void setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether the container should de-batch batched messages (true) or call the listener with the batch (false). Default: true.
- Parameters:
deBatchingEnabled - the deBatchingEnabled to set.
- See Also:
setBatchingStrategy(BatchingStrategy)
-
isDeBatchingEnabled
protected boolean isDeBatchingEnabled()
-
setAdviceChain
public void setAdviceChain(Advice... adviceChain)
Public setter for the Advice to apply to listener executions.
If a transactionManager is provided as well (see setTransactionManager(PlatformTransactionManager)), then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be applied).
- Parameters:
adviceChain - the advice chain to set
-
getAdviceChain
protected Advice[] getAdviceChain()
-
setAfterReceivePostProcessors
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
- Parameters:
afterReceivePostProcessors - the post processor.
- Since:
- 1.4.2
- See Also:
addAfterReceivePostProcessors(MessagePostProcessor...)
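The three-tier invocation order described for the post processors can be modeled with plain JDK collections. This is a sketch of the documented ordering only; PostProcessorOrderSketch and its Entry type are hypothetical, and keeping unordered entries in registration order is an assumption of the sketch rather than something this page states.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PostProcessorOrderSketch {

    /** Hypothetical stand-in for a post processor: a name, a tier and an order value. */
    static final class Entry {
        final String name;
        final int tier;   // 0 = priority-ordered, 1 = ordered, 2 = unordered
        final int order;  // lower values run first within a tier

        private Entry(String name, int tier, int order) {
            this.name = name;
            this.tier = tier;
            this.order = order;
        }

        static Entry priority(String name, int order) { return new Entry(name, 0, order); }
        static Entry ordered(String name, int order) { return new Entry(name, 1, order); }
        static Entry unordered(String name) { return new Entry(name, 2, 0); }
    }

    /** Returns the names in the order the sketch would invoke them. */
    static List<String> invocationOrder(List<Entry> entries) {
        List<Entry> sorted = new ArrayList<>(entries);
        // List.sort is stable, so unordered entries keep their registration order (sketch assumption).
        sorted.sort(Comparator.comparingInt((Entry e) -> e.tier)
                .thenComparingInt(e -> e.order));
        List<String> names = new ArrayList<>();
        for (Entry e : sorted) {
            names.add(e.name);
        }
        return names;
    }
}
```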
-
addAfterReceivePostProcessors
public void addAfterReceivePostProcessors(MessagePostProcessor... postprocessors)
Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order, depending on PriorityOrder, Order and finally unordered.
In contrast to setAfterReceivePostProcessors(MessagePostProcessor...), this method does not override the previously added afterReceivePostProcessors.
- Parameters:
postprocessors - the post processor.
- Since:
- 2.1.4
-
removeAfterReceivePostProcessor
public boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
- Parameters:
afterReceivePostProcessor - the MessagePostProcessor to remove.
- Returns:
- true if the provided post processor has been removed.
- Since:
- 2.1.4
- See Also:
addAfterReceivePostProcessors(MessagePostProcessor...)
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
Default is "true"; set this to "false" to allow for manual startup through the start() method.
- Specified by:
setAutoStartup in interface MessageListenerContainer
- Parameters:
autoStartup - true for auto startup.
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface SmartLifecycle
-
setPhase
public void setPhase(int phase)
Specify the phase in which this container should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE, meaning that this container starts as late as possible and stops as soon as possible.
- Parameters:
phase - The phase.
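A JDK-only sketch of the phase rule may help: components start in ascending phase order and stop in the reverse order. PhaseOrderSketch and Component are hypothetical names used for illustration; the real ordering is performed by the application context's lifecycle processing, not by this code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class PhaseOrderSketch {

    /** Hypothetical lifecycle component with a name and a phase. */
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    /** Lowest phase starts first. */
    static List<String> startupOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt((Component c) -> c.phase));
        List<String> names = new ArrayList<>();
        for (Component c : sorted) {
            names.add(c.name);
        }
        return names;
    }

    /** Shutdown is the reverse of startup, so the highest phase stops first. */
    static List<String> shutdownOrder(List<Component> components) {
        List<String> names = startupOrder(components);
        Collections.reverse(names);
        return names;
    }
}
```

With the default phase of Integer.MAX_VALUE, a listener container in this model is last in startupOrder and first in shutdownOrder.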
-
getPhase
public int getPhase()
- Specified by:
getPhase in interface Phased
- Specified by:
getPhase in interface SmartLifecycle
- Returns:
- The phase in which this container will be started and stopped.
-
setBeanName
public void setBeanName(String beanName)
- Specified by:
setBeanName in interface BeanNameAware
-
getBeanName
@Nullable protected final String getBeanName()
- Returns:
- The bean name that this listener container has been assigned in its containing bean factory, if any.
-
getApplicationContext
protected final ApplicationContext getApplicationContext()
-
setApplicationContext
public final void setApplicationContext(ApplicationContext applicationContext)
- Specified by:
setApplicationContext in interface ApplicationContextAware
-
getConnectionFactory
public ConnectionFactory getConnectionFactory()
- Overrides:
getConnectionFactory in class RabbitAccessor
- Returns:
- The ConnectionFactory that this accessor uses for obtaining RabbitMQ Connections.
-
setLookupKeyQualifier
public void setLookupKeyQualifier(String lookupKeyQualifier)
Set a qualifier that will prefix the connection factory lookup key; default none.
- Parameters:
lookupKeyQualifier - the qualifier
- Since:
- 1.6.9
- See Also:
getRoutingLookupKey()
-
isForceCloseChannel
protected boolean isForceCloseChannel()
Force close the channel if the consumer threads don't respond to a shutdown.
- Returns:
- true to force close.
- Since:
- 1.7.4
-
setForceCloseChannel
public void setForceCloseChannel(boolean forceCloseChannel)
Set to true to force close the channel if the consumer threads don't respond to a shutdown. Default: true (since 2.0).
- Parameters:
forceCloseChannel - true to force close.
- Since:
- 1.7.4
-
getRoutingLookupKey
@Nullable protected String getRoutingLookupKey()
Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise. The routing key is the comma-delimited list of queue names with all spaces removed and bracketed by [...], optionally prefixed by a qualifier, e.g. "foo[...]".
- Returns:
- the key or null.
- Since:
- 1.6.9
- See Also:
setLookupKeyQualifier(String)
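The key format described above can be reproduced with a few lines of plain Java. This is a sketch of the documented format, not the container's implementation; RoutingLookupKeySketch is a hypothetical name, and treating a missing qualifier as an empty prefix is an assumption based on "default none".

```java
import java.util.List;

final class RoutingLookupKeySketch {

    /**
     * Builds "qualifier[queue1,queue2]": queue names joined by commas,
     * all spaces removed, bracketed, and prefixed by the optional qualifier.
     */
    static String lookupKey(String qualifier, List<String> queueNames) {
        String joined = String.join(",", queueNames).replace(" ", "");
        String prefix = (qualifier == null) ? "" : qualifier; // assumption: no qualifier means no prefix
        return prefix + "[" + joined + "]";
    }
}
```

For example, a qualifier of "foo" with queues "orders" and "audit" yields "foo[orders,audit]" in this sketch.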
-
getRoutingConnectionFactory
@Nullable protected RoutingConnectionFactory getRoutingConnectionFactory()
Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise.
- Returns:
- the RoutingConnectionFactory or null.
- Since:
- 1.6.9
-
getListenerId
@Nullable public String getListenerId()
The 'id' attribute of the listener.
- Returns:
- the id (or the container bean name if no id set).
-
setListenerId
public void setListenerId(String listenerId)
Description copied from interface: MessageListenerContainer
Set the listener id.
- Specified by:
setListenerId in interface MessageListenerContainer
- Parameters:
listenerId - the id.
-
setConsumerTagStrategy
public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
Set the implementation of ConsumerTagStrategy to generate consumer tags. By default, the RabbitMQ server generates consumer tags.
- Parameters:
consumerTagStrategy - the consumerTagStrategy to set.
- Since:
- 1.4.5
-
getConsumerTagStrategy
@Nullable protected ConsumerTagStrategy getConsumerTagStrategy()
Return the consumer tag strategy to use.
- Returns:
- the strategy.
- Since:
- 2.0
-
setConsumerArguments
public void setConsumerArguments(Map<String,Object> args)
Set consumer arguments.
- Parameters:
args - the arguments.
- Since:
- 1.3
-
getConsumerArguments
public Map<String,Object> getConsumerArguments()
Return the consumer arguments.
- Returns:
- the arguments.
- Since:
- 2.0
-
setExclusive
public void setExclusive(boolean exclusive)
Set to true for an exclusive consumer.
- Parameters:
exclusive - true for an exclusive consumer.
-
isExclusive
protected boolean isExclusive()
Return whether the consumers should be exclusive.
- Returns:
- true for exclusive consumers.
-
setNoLocal
public void setNoLocal(boolean noLocal)
Set to true for a no-local consumer.
- Parameters:
noLocal - true for a no-local consumer.
-
isNoLocal
protected boolean isNoLocal()
Return whether the consumers should be no-local.
- Returns:
- true for no-local consumers.
-
setDefaultRequeueRejected
public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
Set the default behavior when a message is rejected, for example because the listener threw an exception. When true, messages will be requeued; when false, they will not. For versions of Rabbit that support dead-lettering, the message must not be requeued in order to be sent to the dead letter exchange. Setting to false causes all rejections to not be requeued. When true, the default can be overridden by the listener throwing an AmqpRejectAndDontRequeueException. Default true.
- Parameters:
defaultRequeueRejected - true to requeue rejected messages by default.
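The requeue decision described for setDefaultRequeueRejected can be sketched as a small pure function. The class names below are hypothetical stand-ins (RejectAndDontRequeue plays the role of AmqpRejectAndDontRequeueException), and walking the whole cause chain is an assumption of the sketch, made because listener exceptions are often wrapped before they reach the container.

```java
final class RequeueDecisionSketch {

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static final class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /**
     * Returns true if a rejected message should go back on the queue.
     * A false default never requeues; a true default requeues unless the
     * failure (or one of its causes) asks not to.
     */
    static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable failure) {
        if (!defaultRequeueRejected) {
            return false;
        }
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectAndDontRequeue) {
                return false;
            }
        }
        return true;
    }
}
```

When the result is false and the queue has a dead letter exchange configured, the broker can route the rejected message there, which is why the text says such messages must not be requeued.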
-
isDefaultRequeueRejected
protected boolean isDefaultRequeueRejected()
Return the default requeue rejected.
- Returns:
- the boolean.
- Since:
- 2.0
- See Also:
setDefaultRequeueRejected(boolean)
-
setPrefetchCount
public void setPrefetchCount(int prefetchCount)
Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.
- Parameters:
prefetchCount - the prefetch count
- See Also:
Channel.basicQos(int, boolean)
-
getPrefetchCount
protected int getPrefetchCount()
Return the prefetch count.
- Returns:
- the count.
- Since:
- 2.0
-
setGlobalQos
public void setGlobalQos(boolean globalQos)
Apply prefetchCount to the entire channel.
- Parameters:
globalQos - true for a channel-wide prefetch.
- Since:
- 2.2.17
- See Also:
Channel.basicQos(int, boolean)
-
isGlobalQos
protected boolean isGlobalQos()
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Defaults to 5 seconds.
- Parameters:
shutdownTimeout - the shutdown timeout to set
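The "let active workers finish within the timeout" behaviour resembles the usual graceful-shutdown pattern for a JDK ExecutorService. The sketch below shows that general pattern under the assumption that workers run on an executor; ShutdownTimeoutSketch is a hypothetical name and this is not the container's shutdown code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownTimeoutSketch {

    /**
     * Stops accepting new work, then waits up to timeoutMillis for in-flight
     * workers. Returns true if they all finished in time; otherwise the
     * remaining workers are interrupted and false is returned.
     */
    static boolean stopWorkers(ExecutorService workers, long timeoutMillis) {
        workers.shutdown();
        try {
            if (workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and fall through to a forced stop.
            Thread.currentThread().interrupt();
        }
        workers.shutdownNow();
        return false;
    }
}
```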
-
getShutdownTimeout
protected long getShutdownTimeout()
-
setIdleEventInterval
public void setIdleEventInterval(long idleEventInterval)
How often to emit ListenerContainerIdleEvents in milliseconds.
- Parameters:
idleEventInterval - the interval.
-
getIdleEventInterval
protected long getIdleEventInterval()
-
getLastReceive
protected long getLastReceive()
Get the time the last message was received - initialized to container start time.
- Returns:
- the time.
-
setTransactionManager
public void setTransactionManager(PlatformTransactionManager transactionManager)
Set the transaction manager to use.
- Parameters:
transactionManager - the transaction manager.
-
getTransactionManager
@Nullable protected PlatformTransactionManager getTransactionManager()
-
setTransactionAttribute
public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Set the transaction attribute to use when using an external transaction manager.- Parameters:
transactionAttribute
- the transaction attribute to set
-
getTransactionAttribute
protected TransactionAttribute getTransactionAttribute()
-
setTaskExecutor
public void setTaskExecutor(Executor taskExecutor)
Set a task executor for the container - used to create the consumers not at runtime.
- Parameters:
taskExecutor
- the task executor.
-
getTaskExecutor
protected Executor getTaskExecutor()
-
setRecoveryInterval
public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
- Parameters:
recoveryInterval
- The recovery interval.
-
setRecoveryBackOff
public void setRecoveryBackOff(BackOff recoveryBackOff)
Specify the BackOff for the interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With the BackOff you can supply the maxAttempts for recovery before stop() is performed.
- Parameters:
recoveryBackOff
- The BackOff to recover.
- Since:
- 1.5
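As an illustration of supplying maxAttempts through a BackOff, the following sketch uses Spring Framework's FixedBackOff, whose constructor takes an interval and a maximum number of attempts. It assumes Spring AMQP and Spring Framework are on the classpath. The host name, queue name, attempt count and the class name RecoveryBackOffSketch are placeholders for illustration only.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.util.backoff.FixedBackOff;

class RecoveryBackOffSketch {

    // Builds a container that retries recovery a bounded number of times.
    static SimpleMessageListenerContainer container() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("orders"); // hypothetical queue name
        // Wait 5000 ms between recovery attempts and give up after 3 attempts,
        // after which stop() is performed, as described above.
        container.setRecoveryBackOff(new FixedBackOff(5000L, 3L));
        return container;
    }
}
```

Other BackOff implementations, such as an exponential one, could be substituted when a growing interval is preferred.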
-
getRecoveryBackOff
protected BackOff getRecoveryBackOff()
-
setMessagePropertiesConverter
public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this listener container.
- Parameters:
messagePropertiesConverter
- The properties converter.
-
getMessagePropertiesConverter
protected MessagePropertiesConverter getMessagePropertiesConverter()
-
setAmqpAdmin
public void setAmqpAdmin(AmqpAdmin amqpAdmin)
Set the AmqpAdmin, used to declare any auto-delete queues, bindings, etc. when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.
- Parameters:
amqpAdmin
- the AmqpAdmin to use
- Since:
- 2.1
-
setMissingQueuesFatal
public void setMissingQueuesFatal(boolean missingQueuesFatal)
If none of the configured queues are available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail. When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
- Parameters:
missingQueuesFatal
- the missingQueuesFatal to set.
- Since:
- 1.3.5
- See Also:
setAutoDeclare(boolean)
-
isMissingQueuesFatal
protected boolean isMissingQueuesFatal()
-
isMissingQueuesFatalSet
protected boolean isMissingQueuesFatalSet()
-
setMismatchedQueuesFatal
public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc). Default false.
- Parameters:
mismatchedQueuesFatal
- true to fail initialization when this condition occurs.
- Since:
- 1.6
-
isMismatchedQueuesFatal
protected boolean isMismatchedQueuesFatal()
-
setPossibleAuthenticationFailureFatal
public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
-
doSetPossibleAuthenticationFailureFatal
protected final void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
-
isPossibleAuthenticationFailureFatal
public boolean isPossibleAuthenticationFailureFatal()
-
isPossibleAuthenticationFailureFatalSet
protected boolean isPossibleAuthenticationFailureFatalSet()
-
isAsyncReplies
protected boolean isAsyncReplies()
-
setAutoDeclare
public void setAutoDeclare(boolean autoDeclare)
Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().
- Parameters:
autoDeclare
- the boolean flag to indicate a declaration operation.
- Since:
- 1.4
- See Also:
redeclareElementsIfNecessary()
-
isAutoDeclare
protected boolean isAutoDeclare()
-
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
- Parameters:
failedDeclarationRetryInterval
- the interval, default 5000.
- Since:
- 1.3.9
-
getFailedDeclarationRetryInterval
protected long getFailedDeclarationRetryInterval()
-
isStatefulRetryFatalWithNullMessageId
protected boolean isStatefulRetryFatalWithNullMessageId()
-
setStatefulRetryFatalWithNullMessageId
public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)
Set whether a message with a null messageId is fatal for the consumer when using stateful retry. When false, instead of stopping the consumer, the message is rejected and not requeued - it will be discarded or routed to the dead letter queue, if so configured. Default true.
- Parameters:
statefulRetryFatalWithNullMessageId
- true for fatal.
- Since:
- 2.0
-
setExclusiveConsumerExceptionLogger
public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
Set a ConditionalExceptionLogger for logging exclusive consumer failures. The default is to log such failures at WARN level.
- Parameters:
exclusiveConsumerExceptionLogger
- the conditional exception logger.
- Since:
- 1.5
-
getExclusiveConsumerExceptionLogger
protected ConditionalExceptionLogger getExclusiveConsumerExceptionLogger()
-
setAlwaysRequeueWithTxManagerRollback
public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
Set to true to always requeue on transaction rollback with an external TransactionManager. With earlier releases, when a transaction manager was configured, a transaction rollback always requeued the message. This was inconsistent with local transactions, where the normal defaultRequeueRejected and AmqpRejectAndDontRequeueException logic was honored to determine whether the message was requeued. RabbitMQ does not consider the message delivery to be part of the transaction. This boolean was introduced in 1.7.1, set to true by default, to be consistent with previous behavior. Starting with version 2.0, it is false by default.
- Parameters:
alwaysRequeueWithTxManagerRollback
- true to always requeue on rollback.
- Since:
- 1.7.1
-
isAlwaysRequeueWithTxManagerRollback
protected boolean isAlwaysRequeueWithTxManagerRollback()
-
setErrorHandlerLoggerName
public void setErrorHandlerLoggerName(String errorHandlerLoggerName)
Set the name (category) of the logger used to log exceptions thrown by the error handler. It defaults to the container's logger but can be overridden if you want it to log at a different level to the container. Such exceptions are logged at the ERROR level.
- Parameters:
errorHandlerLoggerName
- the logger name.
- Since:
- 2.0.8
-
setBatchingStrategy
public void setBatchingStrategy(BatchingStrategy batchingStrategy)
Set a batching strategy to use when de-batching messages. Default is SimpleBatchingStrategy.
- Parameters:
batchingStrategy
- the strategy.
- Since:
- 2.2
- See Also:
setDeBatchingEnabled(boolean)
-
getBatchingStrategy
protected BatchingStrategy getBatchingStrategy()
-
getAfterReceivePostProcessors
protected Collection<MessagePostProcessor> getAfterReceivePostProcessors()
-
setMicrometerTags
public void setMicrometerTags(Map<String,String> tags)
Set additional tags for the Micrometer listener timers.
- Parameters:
tags
- the tags.
- Since:
- 2.2
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable Micrometer listener timers.
- Parameters:
micrometerEnabled
- false to disable.
- Since:
- 2.2
-
getConsumeDelay
protected long getConsumeDelay()
Get the consumeDelay - a time to wait before consuming in ms.
- Returns:
- the consume delay.
- Since:
- 2.3
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay)
Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin with concurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.
- Parameters:
consumeDelay
- the consume delay.
- Since:
- 2.3
-
getJavaLangErrorHandler
protected AbstractMessageListenerContainer.JavaLangErrorHandler getJavaLangErrorHandler()
-
setjavaLangErrorHandler
public void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler)
Provide a JavaLangErrorHandler implementation; by default, System.exit(99) is called.
- Parameters:
javaLangErrorHandler
- the handler.
- Since:
- 2.2.12
-
afterPropertiesSet
public void afterPropertiesSet()
Delegates to validateConfiguration() and initialize().
- Specified by:
afterPropertiesSet in interface InitializingBean
- Specified by:
afterPropertiesSet in interface MessageListenerContainer
- Overrides:
afterPropertiesSet in class RabbitAccessor
-
setupMessageListener
public void setupMessageListener(MessageListener messageListener)
Description copied from interface: MessageListenerContainer
Set up the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
- Specified by:
setupMessageListener in interface MessageListenerContainer
- Parameters:
messageListener
- the object to be wrapped to the MessageListener.
-
validateConfiguration
protected void validateConfiguration()
Validate the configuration of this container. The default implementation is empty. To be overridden in subclasses.
-
initializeProxy
protected void initializeProxy(Object delegate)
-
destroy
public void destroy()
Calls shutdown() when the BeanFactory destroys the container instance.
- Specified by:
destroy in interface DisposableBean
- See Also:
shutdown()
-
initialize
public void initialize()
Initialize this container. Creates a Rabbit Connection and calls doInitialize().
-
shutdown
public void shutdown()
Stop the shared Connection, call doShutdown(), and close this container.
-
doInitialize
protected abstract void doInitialize()
Register any invokers within this container. Subclasses need to implement this method for their specific invoker management process.
-
doShutdown
protected abstract void doShutdown()
Close the registered invokers. Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
- See Also:
shutdown()
-
isActive
public final boolean isActive()
- Returns:
- Whether this container is currently active, that is, whether it has been set up but not shut down yet.
-
start
public void start()
Start this container.
-
doStart
protected void doStart()
Start this container, and notify all invoker tasks.
-
stop
public void stop()
Stop this container.
-
stop
public void stop(Runnable callback)
- Specified by:
stop in interface SmartLifecycle
-
doStop
protected void doStop()
This method is invoked when the container is stopping.
-
isRunning
public final boolean isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
-
invokeErrorHandler
protected void invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any. Log at error level otherwise. The default error handler is a ConditionalRejectingErrorHandler with the default FatalExceptionStrategy implementation.
- Parameters:
ex
- the uncaught error that arose during Rabbit processing.
- See Also:
setErrorHandler(org.springframework.util.ErrorHandler)
-
executeListener
protected void executeListener(com.rabbitmq.client.Channel channel, Object data)
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
- Parameters:
channel
- the Rabbit Channel to operate on
data
- the received Rabbit Message
- See Also:
invokeListener(com.rabbitmq.client.Channel, java.lang.Object), handleListenerException(java.lang.Throwable)
-
invokeListener
protected void invokeListener(com.rabbitmq.client.Channel channel, Object data)
-
actualInvokeListener
protected void actualInvokeListener(com.rabbitmq.client.Channel channel, Object data)
Invoke the specified listener: either as standard MessageListener or (preferably) as ChannelAwareMessageListener.
- Parameters:
channel
- the Rabbit Channel to operate on
data
- the received Rabbit Message or List of Message.
- See Also:
setMessageListener(MessageListener)
-
doInvokeListener
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Channel (potentially with its own transaction) to the listener if demanded. An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException.
- Parameters:
listener
- the Spring ChannelAwareMessageListener to invoke
channel
- the Rabbit Channel to operate on
data
- the received Rabbit Message or List of Message.
- See Also:
ChannelAwareMessageListener, setExposeListenerChannel(boolean)
-
doInvokeListener
protected void doInvokeListener(MessageListener listener, Object data)
Invoke the specified listener as Spring Rabbit MessageListener. The default implementation performs a plain invocation of the onMessage method. An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException.
- Parameters:
listener
- the Rabbit MessageListener to invoke
data
- the received Rabbit Message or List of Message.
- See Also:
MessageListener.onMessage(org.springframework.amqp.core.Message)
-
isChannelLocallyTransacted
protected boolean isChannelLocallyTransacted()
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator. Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.
- Returns:
- whether the given Channel is locally transacted
- See Also:
RabbitAccessor.isChannelTransacted()
-
handleListenerException
protected void handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution. The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
- Parameters:
ex
- the exception to handle
-
wrapToListenerExecutionFailedExceptionIfNeeded
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data)
- Parameters:
e
- The Exception.
data
- The failed message.
- Returns:
- If 'e' is of type ListenerExecutionFailedException, return 'e' as it is; otherwise wrap it in a ListenerExecutionFailedException and return that.
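The wrap-if-needed contract described for this method can be sketched without Spring AMQP on the classpath. ListenerFailure below is a hypothetical stand-in for ListenerExecutionFailedException, and WrapIfNeededSketch, wrapIfNeeded and getFailedData are names invented for this illustration. The message text is also an assumption, not the container's actual wording.

```java
class WrapIfNeededSketch {

    // Returns e unchanged when it is already the wrapper type, otherwise wraps it
    // together with the data that failed.
    static ListenerFailure wrapIfNeeded(Exception e, Object data) {
        if (e instanceof ListenerFailure) {
            return (ListenerFailure) e;
        }
        return new ListenerFailure("Listener threw exception", e, data);
    }

    public static void main(String[] args) {
        ListenerFailure wrapped = wrapIfNeeded(new IllegalStateException("boom"), "payload");
        System.out.println(wrapped.getCause().getMessage());
        // Passing the wrapper back in does not wrap it a second time.
        System.out.println(wrapIfNeeded(wrapped, "payload") == wrapped);
    }
}

// Hypothetical stand-in for ListenerExecutionFailedException; it only records
// the failed payload alongside the original cause.
class ListenerFailure extends RuntimeException {

    private final Object failedData;

    ListenerFailure(String message, Throwable cause, Object failedData) {
        super(message, cause);
        this.failedData = failedData;
    }

    Object getFailedData() {
        return failedData;
    }
}
```

Returning the existing instance avoids nesting wrappers, so callers always see the original cause one level down.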
-
publishConsumerFailedEvent
protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t)
-
publishMissingQueueEvent
protected void publishMissingQueueEvent(String queue)
-
publishIdleContainerEvent
protected final void publishIdleContainerEvent(long idleTime)
-
updateLastReceive
protected void updateLastReceive()
-
configureAdminIfNeeded
protected void configureAdminIfNeeded()
-
checkMismatchedQueues
protected void checkMismatchedQueues()
-
lazyLoad
public void lazyLoad()
Description copied from interface: MessageListenerContainer
Do not check for missing or mismatched queues during startup. Used for lazily loaded message listener containers to avoid a deadlock when starting such containers. Applications lazily loading containers should verify the queue configuration before loading the container bean.
- Specified by:
lazyLoad in interface MessageListenerContainer
-
redeclareElementsIfNecessary
protected void redeclareElementsIfNecessary()
Use AmqpAdmin.initialize() to redeclare everything if necessary. Since auto deletion of a queue can cause upstream elements (bindings, exchanges) to be deleted too, everything needs to be redeclared if a queue is missing. Declaration is idempotent so, aside from some network chatter, there is no issue, and we will only do it if we detect our queue is gone.
In general it makes sense only for 'auto-delete' or 'expired' queues, but with a server TTL policy we have no way to determine the 'expiration' option for the queue.
Starting with version 1.6, if mismatchedQueuesFatal is true, the declarations are always attempted during restart so the listener will fail with a fatal error if mismatches occur.
-
causeChainHasImmediateAcknowledgeAmqpException
protected boolean causeChainHasImmediateAcknowledgeAmqpException(Throwable ex)
Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException is found before an AmqpRejectAndDontRequeueException, return true. An Error will take precedence.
- Parameters:
ex
- the exception
- Returns:
- true if we should ack immediately.
- Since:
- 1.6.6
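The traversal described above can be sketched with plain Java exceptions. ImmediateAck and RejectDontRequeue are hypothetical stand-ins for ImmediateAcknowledgeAmqpException and AmqpRejectAndDontRequeueException. Two points are assumptions of this sketch rather than statements about the container's code: "An Error will take precedence" is read as "return false when the top-level throwable is an Error", and the walk starts at the first cause.

```java
class CauseChainSketch {

    // Returns true if an immediate-ack exception appears in the cause chain
    // before a reject-and-don't-requeue exception.
    static boolean shouldAckImmediately(Throwable ex) {
        if (ex instanceof Error) {
            return false; // assumption: an Error always wins, so never ack immediately
        }
        for (Throwable cause = ex.getCause(); cause != null; cause = cause.getCause()) {
            if (cause instanceof ImmediateAck) {
                return true;
            }
            if (cause instanceof RejectDontRequeue) {
                return false;
            }
        }
        return false; // neither marker exception was found
    }

    public static void main(String[] args) {
        Throwable ackFirst = new RuntimeException("listener failed", new ImmediateAck("ack now"));
        Throwable rejectFirst = new RuntimeException("listener failed",
                new RejectDontRequeue("reject", new ImmediateAck("ack now")));
        System.out.println(shouldAckImmediately(ackFirst));
        System.out.println(shouldAckImmediately(rejectFirst));
    }
}

// Hypothetical stand-in for ImmediateAcknowledgeAmqpException.
class ImmediateAck extends RuntimeException {
    ImmediateAck(String message) {
        super(message);
    }
}

// Hypothetical stand-in for AmqpRejectAndDontRequeueException.
class RejectDontRequeue extends RuntimeException {
    RejectDontRequeue(String message, Throwable cause) {
        super(message, cause);
    }
}
```

Whichever marker exception is encountered first decides the outcome, which is why nesting order matters when a listener wraps one inside the other.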
-
prepareHolderForRollback
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception)
A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g. TransactionDefinition.PROPAGATION_NONE). In that case the delivery tags will have been processed manually.
- Parameters:
resourceHolder
- the bound resource holder (if a transaction is active).
exception
- the exception.
-
-