Class AbstractMessageListenerContainer

All Implemented Interfaces:
MessageListenerContainer, Aware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle
Direct Known Subclasses:
DirectMessageListenerContainer, SimpleMessageListenerContainer

public abstract class AbstractMessageListenerContainer extends ObservableListenerContainer implements ApplicationEventPublisherAware
Author:
Mark Pollack, Mark Fisher, Dave Syer, James Carr, Gary Russell, Alex Panchenko, Johno Crawford, Arnaud Cogoluègnes, Artem Bilan, Mohammad Hewedy, Mat Jaggard
  • Field Details

    • DEFAULT_DEBATCHING_ENABLED

      public static final boolean DEFAULT_DEBATCHING_ENABLED
    • DEFAULT_PREFETCH_COUNT

      public static final int DEFAULT_PREFETCH_COUNT
    • DEFAULT_RECOVERY_INTERVAL

      public static final long DEFAULT_RECOVERY_INTERVAL
      The default recovery interval: 5000 ms = 5 seconds.
    • DEFAULT_SHUTDOWN_TIMEOUT

      public static final long DEFAULT_SHUTDOWN_TIMEOUT
    • lifecycleLock

      protected final Lock lifecycleLock
    • consumersLock

      protected final Lock consumersLock
    • stopNow

      protected final AtomicBoolean stopNow
  • Constructor Details

    • AbstractMessageListenerContainer

      public AbstractMessageListenerContainer()
  • Method Details

    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface ApplicationEventPublisherAware
    • getApplicationEventPublisher

      @Nullable protected ApplicationEventPublisher getApplicationEventPublisher()
    • setAcknowledgeMode

      public final void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)

      Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).

      Set to AcknowledgeMode.MANUAL if the listener will send the acknowledgements itself using Channel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel, but if you are doing no other work on the channel at the same time other than receiving a single message then the transaction is probably unnecessary.

      Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements; it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If the mode is AcknowledgeMode.NONE, the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).

      Parameters:
      acknowledgeMode - the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
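      The rule above (NONE cannot be combined with a transactional channel) can be sketched as a small check. This is only an illustration under stated assumptions: AckModeSketch is a hypothetical stand-in for AcknowledgeMode, and the exception type and message are assumptions, not the container's actual start-up validation.

```java
// Hypothetical stand-in for the library's AcknowledgeMode enum (illustration only).
enum AckModeSketch { NONE, MANUAL, AUTO }

final class AckModeValidationSketch {

    /**
     * Mirrors the documented rule: NONE ("autoack") cannot be used with a
     * transactional channel. The exception type is an assumption.
     */
    static void validate(AckModeSketch mode, boolean channelTransacted) {
        if (mode == AckModeSketch.NONE && channelTransacted) {
            throw new IllegalStateException(
                    "AcknowledgeMode.NONE cannot be used with a transactional channel");
        }
    }

    /** True when the listener, not the container, is expected to acknowledge. */
    static boolean listenerMustAck(AckModeSketch mode) {
        return mode == AckModeSketch.MANUAL;
    }

    public static void main(String[] args) {
        validate(AckModeSketch.AUTO, true);    // accepted
        validate(AckModeSketch.MANUAL, false); // accepted
        try {
            validate(AckModeSketch.NONE, true); // rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

      With the real container, the equivalent misconfiguration would surface when the container starts, as described above.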
    • getAcknowledgeMode

      public AcknowledgeMode getAcknowledgeMode()
      Returns:
      the acknowledgeMode
    • setQueueNames

      public void setQueueNames(String... queueName)
      Set the name of the queue(s) to receive messages from.
      Specified by:
      setQueueNames in interface MessageListenerContainer
      Parameters:
      queueName - the desired queueName(s) (can not be null)
    • setQueues

      public final void setQueues(Queue... queues)
      Set the queue(s) to receive messages from.
      Parameters:
      queues - the desired queue(s) (can not be null)
    • getQueueNames

      public String[] getQueueNames()
      Returns:
      the name of the queues to receive messages from.
    • getQueueNamesAsSet

      protected Set<String> getQueueNamesAsSet()
    • getQueueNamesToQueues

      protected Map<String,Queue> getQueueNamesToQueues()
      Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.
      Returns:
      the map.
      Since:
      2.1
    • addQueueNames

      public void addQueueNames(String... queueNames)
      Add queue(s) to this container's list of queues.
      Parameters:
      queueNames - The queue(s) to add.
    • addQueues

      public void addQueues(Queue... queues)
      Add queue(s) to this container's list of queues.
      Parameters:
      queues - The queue(s) to add.
    • removeQueueNames

      public boolean removeQueueNames(String... queueNames)
      Remove queue(s) from this container's list of queues.
      Parameters:
      queueNames - The queue(s) to remove.
      Returns:
      the boolean result of removal on the target queueNames List.
    • removeQueues

      public boolean removeQueues(Queue... queues)
      Remove queue(s) from this container's list of queues.
      Parameters:
      queues - The queue(s) to remove.
      Returns:
      the boolean result of removal on the target queueNames List.
    • isExposeListenerChannel

      public boolean isExposeListenerChannel()
      Returns:
      whether to expose the listener Channel to a registered ChannelAwareMessageListener.
    • setExposeListenerChannel

      public void setExposeListenerChannel(boolean exposeListenerChannel)
      Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.

      Default is "true", reusing the listener's Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying Rabbit Connection instead.

      Note that Channels managed by an external transaction manager will always get exposed to RabbitTemplate calls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.

      Parameters:
      exposeListenerChannel - true to expose the channel.
    • setMessageListener

      public void setMessageListener(MessageListener messageListener)
      Parameters:
      messageListener - the listener.
      Since:
      2.0
    • checkMessageListener

      protected void checkMessageListener(Object listener)
      Check the given message listener, throwing an exception if it does not correspond to a supported listener type.

      Only a Spring MessageListener object will be accepted.

      Parameters:
      listener - the message listener object to check
      Throws:
      IllegalArgumentException - if the supplied listener is not a MessageListener
    • getMessageListener

      @Nullable public MessageListener getMessageListener()
      Get a reference to the message listener.
      Specified by:
      getMessageListener in interface MessageListenerContainer
      Returns:
      the message listener.
    • setErrorHandler

      public void setErrorHandler(ErrorHandler errorHandler)
      Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, a ConditionalRejectingErrorHandler with its default list of fatal exceptions will be used.
      Parameters:
      errorHandler - The error handler.
    • setDeBatchingEnabled

      public void setDeBatchingEnabled(boolean deBatchingEnabled)
      Determine whether the container should de-batch batched messages (true) or call the listener with the batch (false). Default: true.
      Parameters:
      deBatchingEnabled - the deBatchingEnabled to set.
    • isDeBatchingEnabled

      protected boolean isDeBatchingEnabled()
    • setAdviceChain

      public void setAdviceChain(Advice... adviceChain)
      Public setter for the Advice to apply to listener executions.

      If a transactionManager is provided as well (see setTransactionManager(PlatformTransactionManager)), then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be applied).

      Parameters:
      adviceChain - the advice chain to set
    • getAdviceChain

      protected Advice[] getAdviceChain()
    • setAfterReceivePostProcessors

      public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
      Set MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order: PriorityOrdered first, then Ordered, and finally unordered.
      Parameters:
      afterReceivePostProcessors - the post processor.
      Since:
      1.4.2
    • addAfterReceivePostProcessors

      public void addAfterReceivePostProcessors(MessagePostProcessor... postprocessors)
      Add MessagePostProcessors that will be applied after message reception, before invoking the MessageListener. Often used to decompress data. Processors are invoked in order: PriorityOrdered first, then Ordered, and finally unordered.

      In contrast to setAfterReceivePostProcessors(MessagePostProcessor...), this method does not override the previously added afterReceivePostProcessors.

      Parameters:
      postprocessors - the post processor.
      Since:
      2.1.4
    • removeAfterReceivePostProcessor

      public boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
      Remove the provided MessagePostProcessor from the afterReceivePostProcessors list.
      Parameters:
      afterReceivePostProcessor - the MessagePostProcessor to remove.
      Returns:
      true if the provided post processor has been removed.
      Since:
      2.1.4
    • setAutoStartup

      public void setAutoStartup(boolean autoStartup)
      Set whether to automatically start the container after initialization.

      Default is "true"; set this to "false" to allow for manual startup through the start() method.

      Specified by:
      setAutoStartup in interface MessageListenerContainer
      Parameters:
      autoStartup - true for auto startup.
    • isAutoStartup

      public boolean isAutoStartup()
      Specified by:
      isAutoStartup in interface SmartLifecycle
    • setPhase

      public void setPhase(int phase)
      Specify the phase in which this container should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this container starts as late as possible and stops as soon as possible.
      Parameters:
      phase - The phase.
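      The ordering rule described above (start from lowest to highest phase, stop in reverse) can be illustrated with a plain sort. This is a minimal sketch, not Spring's lifecycle processor; the component names and the PhaseOrderSketch class are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PhaseOrderSketch {

    /** Component names sorted by ascending phase: the startup order. */
    static List<String> startupOrder(Map<String, Integer> phases) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(phases.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            names.add(entry.getKey());
        }
        return names;
    }

    /** The shutdown order is the reverse of the startup order. */
    static List<String> shutdownOrder(Map<String, Integer> phases) {
        List<String> names = startupOrder(phases);
        Collections.reverse(names);
        return names;
    }

    public static void main(String[] args) {
        Map<String, Integer> phases = new HashMap<>();
        phases.put("listenerContainer", Integer.MAX_VALUE); // the documented default phase
        phases.put("dataSource", 0);                        // hypothetical component
        phases.put("cache", 100);                           // hypothetical component
        System.out.println("start: " + startupOrder(phases));
        System.out.println("stop:  " + shutdownOrder(phases));
    }
}
```

      With the default phase of Integer.MAX_VALUE, the container appears last in the startup list and first in the shutdown list, which matches "starts as late as possible and stops as soon as possible".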
    • getPhase

      public int getPhase()
      Specified by:
      getPhase in interface Phased
      Specified by:
      getPhase in interface SmartLifecycle
      Returns:
      The phase in which this container will be started and stopped.
    • getConnectionFactory

      public ConnectionFactory getConnectionFactory()
      Overrides:
      getConnectionFactory in class RabbitAccessor
      Returns:
      The ConnectionFactory that this accessor uses for obtaining RabbitMQ Connections.
    • setLookupKeyQualifier

      public void setLookupKeyQualifier(String lookupKeyQualifier)
      Set a qualifier that will prefix the connection factory lookup key; default none.
      Parameters:
      lookupKeyQualifier - the qualifier
      Since:
      1.6.9
    • isForceCloseChannel

      protected boolean isForceCloseChannel()
      Force close the channel if the consumer threads don't respond to a shutdown.
      Returns:
      true to force close.
      Since:
      1.7.4
    • setForceCloseChannel

      public void setForceCloseChannel(boolean forceCloseChannel)
      Set to true to force close the channel if the consumer threads don't respond to a shutdown. Default: true (since 2.0).
      Parameters:
      forceCloseChannel - true to force close.
      Since:
      1.7.4
    • getRoutingLookupKey

      @Nullable protected String getRoutingLookupKey()
      Return the lookup key if the connection factory is a RoutingConnectionFactory; null otherwise. The lookup key is the comma-delimited list of queue names with all spaces removed and bracketed by [...], optionally prefixed by a qualifier, e.g. "foo[...]".
      Returns:
      the key or null.
      Since:
      1.6.9
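      The key format described above can be reproduced with a few lines of string handling. This is a sketch of the documented format only, not the container's implementation; the class and method names are hypothetical.

```java
import java.util.List;

final class RoutingLookupKeySketch {

    /**
     * Builds a key in the documented shape: queue names joined by commas,
     * all spaces removed, wrapped in [...], with an optional qualifier prefix.
     */
    static String lookupKey(String qualifier, List<String> queueNames) {
        String bracketed = "[" + String.join(",", queueNames).replace(" ", "") + "]";
        return qualifier == null ? bracketed : qualifier + bracketed;
    }

    public static void main(String[] args) {
        System.out.println(lookupKey("foo", List.of("orders", "audit")));
        System.out.println(lookupKey(null, List.of("orders", "audit")));
    }
}
```

      Under these assumptions, a qualifier of "foo" with queues "orders" and "audit" gives "foo[orders,audit]"; without a qualifier the key is "[orders,audit]".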
    • getRoutingConnectionFactory

      @Nullable protected RoutingConnectionFactory getRoutingConnectionFactory()
      Return the RoutingConnectionFactory if the connection factory is a RoutingConnectionFactory; null otherwise.
      Returns:
      the RoutingConnectionFactory or null.
      Since:
      1.6.9
    • setConsumerTagStrategy

      public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
      Set the implementation of ConsumerTagStrategy to generate consumer tags. By default, the RabbitMQ server generates consumer tags.
      Parameters:
      consumerTagStrategy - the consumerTagStrategy to set.
      Since:
      1.4.5
    • getConsumerTagStrategy

      @Nullable protected ConsumerTagStrategy getConsumerTagStrategy()
      Return the consumer tag strategy to use.
      Returns:
      the strategy.
      Since:
      2.0
    • setConsumerArguments

      public void setConsumerArguments(Map<String,Object> args)
      Set consumer arguments.
      Parameters:
      args - the arguments.
      Since:
      1.3
    • getConsumerArguments

      public Map<String,Object> getConsumerArguments()
      Return the consumer arguments.
      Returns:
      the arguments.
      Since:
      2.0
    • setExclusive

      public void setExclusive(boolean exclusive)
      Set to true for an exclusive consumer.
      Parameters:
      exclusive - true for an exclusive consumer.
    • isExclusive

      protected boolean isExclusive()
      Return whether the consumers should be exclusive.
      Returns:
      true for exclusive consumers.
    • setNoLocal

      public void setNoLocal(boolean noLocal)
      Set to true for a no-local consumer.
      Parameters:
      noLocal - true for a no-local consumer.
    • isNoLocal

      protected boolean isNoLocal()
      Return whether the consumers should be no-local.
      Returns:
      true for no-local consumers.
    • setDefaultRequeueRejected

      public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
      Set the default behavior when a message is rejected, for example because the listener threw an exception. When true, messages will be requeued; when false, they will not. For versions of Rabbit that support dead-lettering, the message must not be requeued in order to be sent to the dead letter exchange. Setting to false causes all rejections to not be requeued. When true, the default can be overridden by the listener throwing an AmqpRejectAndDontRequeueException. Default true.
      Parameters:
      defaultRequeueRejected - true to requeue rejected messages by default.
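      The decision described above can be written as a small predicate. This is a sketch under stated assumptions: RejectAndDontRequeue is a hypothetical stand-in for AmqpRejectAndDontRequeueException, and walking the cause chain is an assumption made for the illustration rather than a statement about the container's code.

```java
final class RequeueDecisionSketch {

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static final class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /**
     * Returns whether a rejected message would be requeued: never when the
     * default is false; otherwise only if no "reject and don't requeue"
     * exception appears in the failure's cause chain (an assumption here).
     */
    static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable failure) {
        if (!defaultRequeueRejected) {
            return false;
        }
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectAndDontRequeue) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Throwable plain = new IllegalStateException("listener failed");
        Throwable explicit = new RuntimeException(new RejectAndDontRequeue("poison message"));
        System.out.println(shouldRequeue(true, plain));
        System.out.println(shouldRequeue(true, explicit));
        System.out.println(shouldRequeue(false, plain));
    }
}
```

      A message that is not requeued is the one that can reach a dead letter exchange, if the broker is configured with one.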
    • isDefaultRequeueRejected

      protected boolean isDefaultRequeueRejected()
      Return the default requeue rejected.
      Returns:
      the boolean.
      Since:
      2.0
    • setPrefetchCount

      public void setPrefetchCount(int prefetchCount)
      Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.
      Parameters:
      prefetchCount - the prefetch count
      See Also:
      • Channel.basicQos(int, boolean)
    • getPrefetchCount

      protected int getPrefetchCount()
      Return the prefetch count.
      Returns:
      the count.
      Since:
      2.0
    • setGlobalQos

      public void setGlobalQos(boolean globalQos)
      Apply prefetchCount to the entire channel.
      Parameters:
      globalQos - true for a channel-wide prefetch.
      Since:
      2.2.17
      See Also:
      • Channel.basicQos(int, boolean)
    • isGlobalQos

      protected boolean isGlobalQos()
    • setShutdownTimeout

      public void setShutdownTimeout(long shutdownTimeout)
      The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Defaults to 5 seconds.
      Parameters:
      shutdownTimeout - the shutdown timeout to set
    • getShutdownTimeout

      protected long getShutdownTimeout()
    • setIdleEventInterval

      public void setIdleEventInterval(long idleEventInterval)
      How often to emit ListenerContainerIdleEvents in milliseconds.
      Parameters:
      idleEventInterval - the interval.
    • getIdleEventInterval

      protected long getIdleEventInterval()
    • getLastReceive

      protected long getLastReceive()
      Get the time the last message was received - initialized to container start time.
      Returns:
      the time.
    • setTransactionManager

      public void setTransactionManager(PlatformTransactionManager transactionManager)
      Set the transaction manager to use.
      Parameters:
      transactionManager - the transaction manager.
    • getTransactionManager

      @Nullable protected PlatformTransactionManager getTransactionManager()
    • setTransactionAttribute

      public void setTransactionAttribute(TransactionAttribute transactionAttribute)
      Set the transaction attribute to use when using an external transaction manager.
      Parameters:
      transactionAttribute - the transaction attribute to set
    • getTransactionAttribute

      protected TransactionAttribute getTransactionAttribute()
    • setTaskExecutor

      public void setTaskExecutor(Executor taskExecutor)
      Set a task executor for the container - used to create the consumers not at runtime.
      Parameters:
      taskExecutor - the task executor.
    • getTaskExecutor

      protected Executor getTaskExecutor()
    • setRecoveryInterval

      public void setRecoveryInterval(long recoveryInterval)
      Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.
      Parameters:
      recoveryInterval - The recovery interval.
    • setRecoveryBackOff

      public void setRecoveryBackOff(BackOff recoveryBackOff)
      Specify the BackOff for the interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With the BackOff you can supply the maxAttempts for recovery before stop() is performed.
      Parameters:
      recoveryBackOff - The BackOff to recover.
      Since:
      1.5
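      How a back-off with a maximum number of attempts bounds recovery can be sketched without the library. Everything below is hypothetical and for illustration only: FixedAttempts imitates a fixed-interval back-off that signals STOP (-1) once its attempts are used up, and recover stands in for the container's recovery loop. The sleeper is injected so the sketch runs without actually waiting.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

final class RecoveryBackOffSketch {

    /** Sentinel meaning "no more attempts" (an assumption of this sketch). */
    static final long STOP = -1L;

    /** Hypothetical fixed-interval back-off limited to maxAttempts waits. */
    static final class FixedAttempts {
        private final long intervalMillis;
        private final int maxAttempts;
        private int used;

        FixedAttempts(long intervalMillis, int maxAttempts) {
            this.intervalMillis = intervalMillis;
            this.maxAttempts = maxAttempts;
        }

        long next() {
            return used++ < maxAttempts ? intervalMillis : STOP;
        }
    }

    /**
     * Retries until the attempt succeeds or the back-off is exhausted.
     * Returns false when exhausted, the point at which a container would stop.
     */
    static boolean recover(BooleanSupplier attempt, FixedAttempts backOff, LongConsumer sleeper) {
        while (!attempt.getAsBoolean()) {
            long wait = backOff.next();
            if (wait == STOP) {
                return false;
            }
            sleeper.accept(wait);
        }
        return true;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean recovered = recover(
                () -> ++calls[0] >= 3,                       // succeeds on the third try
                new FixedAttempts(5000L, 5),                 // 5000 ms matches the documented default interval
                wait -> System.out.println("would wait " + wait + " ms"));
        System.out.println("recovered: " + recovered);
    }
}
```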
    • getRecoveryBackOff

      protected BackOff getRecoveryBackOff()
    • setMessagePropertiesConverter

      public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
      Set the MessagePropertiesConverter for this listener container.
      Parameters:
      messagePropertiesConverter - The properties converter.
    • getMessagePropertiesConverter

      protected MessagePropertiesConverter getMessagePropertiesConverter()
    • getAmqpAdmin

      @Nullable protected AmqpAdmin getAmqpAdmin()
    • setAmqpAdmin

      public void setAmqpAdmin(AmqpAdmin amqpAdmin)
      Set the AmqpAdmin, used to declare any auto-delete queues, bindings etc. when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.
      Parameters:
      amqpAdmin - the AmqpAdmin to use
      Since:
      2.1
    • setMissingQueuesFatal

      public void setMissingQueuesFatal(boolean missingQueuesFatal)
      If none of the configured queue(s) are available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.

      When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.

      Parameters:
      missingQueuesFatal - the missingQueuesFatal to set.
      Since:
      1.3.5
    • isMissingQueuesFatal

      protected boolean isMissingQueuesFatal()
    • isMissingQueuesFatalSet

      protected boolean isMissingQueuesFatalSet()
    • setMismatchedQueuesFatal

      public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
      Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc.). Default false.
      Parameters:
      mismatchedQueuesFatal - true to fail initialization when this condition occurs.
      Since:
      1.6
    • isMismatchedQueuesFatal

      protected boolean isMismatchedQueuesFatal()
    • setPossibleAuthenticationFailureFatal

      public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
    • doSetPossibleAuthenticationFailureFatal

      protected final void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
    • isPossibleAuthenticationFailureFatal

      public boolean isPossibleAuthenticationFailureFatal()
    • isPossibleAuthenticationFailureFatalSet

      protected boolean isPossibleAuthenticationFailureFatalSet()
    • isAsyncReplies

      protected boolean isAsyncReplies()
    • setAutoDeclare

      public void setAutoDeclare(boolean autoDeclare)
      Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().
      Parameters:
      autoDeclare - the boolean flag to indicate a declaration operation.
      Since:
      1.4
    • isAutoDeclare

      protected boolean isAutoDeclare()
    • setFailedDeclarationRetryInterval

      public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
      Set the interval between passive queue declaration attempts in milliseconds.
      Parameters:
      failedDeclarationRetryInterval - the interval, default 5000.
      Since:
      1.3.9
    • getFailedDeclarationRetryInterval

      protected long getFailedDeclarationRetryInterval()
    • isStatefulRetryFatalWithNullMessageId

      protected boolean isStatefulRetryFatalWithNullMessageId()
    • setStatefulRetryFatalWithNullMessageId

      public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)
      Set whether a message with a null messageId is fatal for the consumer when using stateful retry. When false, instead of stopping the consumer, the message is rejected and not requeued - it will be discarded or routed to the dead letter queue, if so configured. Default true.
      Parameters:
      statefulRetryFatalWithNullMessageId - true for fatal.
      Since:
      2.0
    • setExclusiveConsumerExceptionLogger

      public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
      Set a ConditionalExceptionLogger for logging exclusive consumer failures. The default is to log such failures at DEBUG level (since 3.1, previously WARN).
      Parameters:
      exclusiveConsumerExceptionLogger - the conditional exception logger.
      Since:
      1.5
    • getExclusiveConsumerExceptionLogger

      protected ConditionalExceptionLogger getExclusiveConsumerExceptionLogger()
    • setAlwaysRequeueWithTxManagerRollback

      public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
      Set to true to always requeue on transaction rollback with an external TransactionManager. With earlier releases, when a transaction manager was configured, a transaction rollback always requeued the message. This was inconsistent with local transactions where the normal defaultRequeueRejected and AmqpRejectAndDontRequeueException logic was honored to determine whether the message was requeued. RabbitMQ does not consider the message delivery to be part of the transaction. This boolean was introduced in 1.7.1, set to true by default, to be consistent with previous behavior. Starting with version 2.0, it is false by default.
      Parameters:
      alwaysRequeueWithTxManagerRollback - true to always requeue on rollback.
      Since:
      1.7.1
    • isAlwaysRequeueWithTxManagerRollback

      protected boolean isAlwaysRequeueWithTxManagerRollback()
    • setErrorHandlerLoggerName

      public void setErrorHandlerLoggerName(String errorHandlerLoggerName)
      Set the name (category) of the logger used to log exceptions thrown by the error handler. It defaults to the container's logger but can be overridden if you want it to log at a different level to the container. Such exceptions are logged at the ERROR level.
      Parameters:
      errorHandlerLoggerName - the logger name.
      Since:
      2.0.8
    • setBatchingStrategy

      public void setBatchingStrategy(BatchingStrategy batchingStrategy)
      Set a batching strategy to use when de-batching messages. Default is SimpleBatchingStrategy.
      Parameters:
      batchingStrategy - the strategy.
      Since:
      2.2
    • getBatchingStrategy

      protected BatchingStrategy getBatchingStrategy()
    • getAfterReceivePostProcessors

      protected Collection<MessagePostProcessor> getAfterReceivePostProcessors()
    • setObservationConvention

      public void setObservationConvention(RabbitListenerObservationConvention observationConvention)
      Set an observation convention; used to add additional key/values to observations.
      Parameters:
      observationConvention - the convention.
      Since:
      3.0
    • getConsumeDelay

      protected long getConsumeDelay()
      Get the consumeDelay - a time to wait before consuming in ms.
      Returns:
      the consume delay.
      Since:
      2.3
    • setConsumeDelay

      public void setConsumeDelay(long consumeDelay)
      Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin with concurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.
      Parameters:
      consumeDelay - the consume delay.
      Since:
      2.3
    • getJavaLangErrorHandler

      protected AbstractMessageListenerContainer.JavaLangErrorHandler getJavaLangErrorHandler()
    • setjavaLangErrorHandler

      public void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler)
      Provide a JavaLangErrorHandler implementation; by default, System.exit(99) is called.
      Parameters:
      javaLangErrorHandler - the handler.
      Since:
      2.2.12
    • setMessageAckListener

      public void setMessageAckListener(MessageAckListener messageAckListener)
      Set a MessageAckListener to use when acknowledging message(s) in AcknowledgeMode.AUTO mode.
      Parameters:
      messageAckListener - the messageAckListener.
      Since:
      2.4.6
    • getMessageAckListener

      protected MessageAckListener getMessageAckListener()
    • isForceStop

      protected boolean isForceStop()
      Stop container after current message(s) are processed and requeue any prefetched.
      Returns:
      true to stop when current message(s) are processed.
      Since:
      2.4.14
    • setForceStop

      public void setForceStop(boolean forceStop)
      Set to true to stop the container after the current message(s) are processed and requeue any prefetched. Useful when using exclusive or single-active consumers.
      Parameters:
      forceStop - true to stop when current message(s) are processed.
      Since:
      2.4.14
    • afterPropertiesSet

      public void afterPropertiesSet()
      Specified by:
      afterPropertiesSet in interface InitializingBean
      Specified by:
      afterPropertiesSet in interface MessageListenerContainer
      Overrides:
      afterPropertiesSet in class RabbitAccessor
    • setupMessageListener

      public void setupMessageListener(MessageListener messageListener)
      Description copied from interface: MessageListenerContainer
      Setup the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
      Specified by:
      setupMessageListener in interface MessageListenerContainer
      Parameters:
      messageListener - the object to be wrapped as the MessageListener.
    • validateConfiguration

      protected void validateConfiguration()
      Validate the configuration of this container.

      The default implementation is empty. To be overridden in subclasses.

    • initializeProxy

      protected void initializeProxy(Object delegate)
    • destroy

      public void destroy()
      Calls shutdown() when the BeanFactory destroys the container instance.
      Specified by:
      destroy in interface DisposableBean
      Overrides:
      destroy in class ObservableListenerContainer
    • initialize

      public void initialize()
      Initialize this container.

      Creates a Rabbit Connection and calls doInitialize().

    • shutdown

      public void shutdown()
      Stop the shared Connection, call shutdown(Runnable), and close this container.
    • shutdown

      public void shutdown(@Nullable Runnable callback)
      Stop the shared Connection, call shutdownAndWaitOrCallback(Runnable), and close this container.
      Parameters:
      callback - an optional Runnable to call when the stop is complete.
    • setNotRunning

      protected void setNotRunning()
    • doInitialize

      protected abstract void doInitialize()
      Register any invokers within this container.

      Subclasses need to implement this method for their specific invoker management process.

    • doShutdown

      protected void doShutdown()
      Close the registered invokers.

      Subclasses need to implement this method for their specific invoker management process.

      A shared Rabbit Connection, if any, will automatically be closed afterwards.

    • stop

      public void stop(Runnable callback)
      Specified by:
      stop in interface SmartLifecycle
    • shutdownAndWaitOrCallback

      protected void shutdownAndWaitOrCallback(@Nullable Runnable callback)
    • isActive

      public final boolean isActive()
      Returns:
      Whether this container is currently active, that is, whether it has been set up but not shut down yet.
    • start

      public void start()
      Start this container.
      Specified by:
      start in interface Lifecycle
    • doStart

      protected void doStart()
      Start this container, and notify all invoker tasks.
    • stop

      public void stop()
      Stop this container.
      Specified by:
      stop in interface Lifecycle
    • doStop

      protected void doStop()
      This method is invoked when the container is stopping.
    • isRunning

      public final boolean isRunning()
      Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
      Specified by:
      isRunning in interface Lifecycle
    • invokeErrorHandler

      protected void invokeErrorHandler(Throwable ex)
      Invoke the registered ErrorHandler, if any. Log at error level otherwise. The default error handler is a ConditionalRejectingErrorHandler with the default FatalExceptionStrategy implementation.
      Parameters:
      ex - the uncaught error that arose during Rabbit processing.
    • executeListener

      protected void executeListener(com.rabbitmq.client.Channel channel, Object data)
      Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
      Parameters:
      channel - the Rabbit Channel to operate on
      data - the received Rabbit Message
    • executeListenerAndHandleException

      protected void executeListenerAndHandleException(com.rabbitmq.client.Channel channel, Object data)
    • invokeListener

      protected void invokeListener(com.rabbitmq.client.Channel channel, Object data)
    • actualInvokeListener

      protected void actualInvokeListener(com.rabbitmq.client.Channel channel, Object data)
      Invoke the specified listener: either as a standard MessageListener or (preferably) as a ChannelAwareMessageListener.
      Parameters:
      channel - the Rabbit Channel to operate on
      data - the received Rabbit Message or List of Message.
    • doInvokeListener

      protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data)
      Invoke the specified listener as a Spring ChannelAwareMessageListener, exposing a new Rabbit Channel (potentially with its own transaction) to the listener if demanded. An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException.
      Parameters:
      listener - the Spring ChannelAwareMessageListener to invoke
      channel - the Rabbit Channel to operate on
      data - the received Rabbit Message or List of Message.
    • doInvokeListener

      protected void doInvokeListener(MessageListener listener, Object data)
      Invoke the specified listener as a Spring Rabbit MessageListener.

      The default implementation performs a plain invocation of the onMessage method.

      An exception thrown from the listener will be wrapped in a ListenerExecutionFailedException.

      Parameters:
      listener - the Rabbit MessageListener to invoke
      data - the received Rabbit Message or List of Message.
    • isChannelLocallyTransacted

      protected boolean isChannelLocallyTransacted()
      Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.

      Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.

      Returns:
      whether the given Channel is locally transacted
    • handleListenerException

      protected void handleListenerException(Throwable ex)
      Handle the given exception that arose during listener execution.

      The default implementation logs the exception at error level and does not propagate it to the Rabbit provider, on the assumption that all handling of acknowledgment and/or transactions is done by this listener container. Subclasses can override this behaviour.

      Parameters:
      ex - the exception to handle
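As a rough illustration of the "log, do not propagate, overridable in subclasses" behaviour described above, the sketch below uses a hypothetical stand-in Container class rather than the real Spring AMQP container. The execute method and the RecordingContainer subclass are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

class HandleListenerExceptionSketch {

    /** Hypothetical stand-in for a listener container; not the Spring AMQP class. */
    static class Container {

        /** Runs the listener and routes any failure to the handler instead of rethrowing. */
        void execute(Runnable listener) {
            try {
                listener.run();
            }
            catch (RuntimeException ex) {
                handleListenerException(ex);
            }
        }

        /** Default behaviour in this sketch: log at error level and do not propagate. */
        protected void handleListenerException(Throwable ex) {
            System.err.println("ERROR Execution of listener failed: " + ex);
        }
    }

    /** Subclass that records failures in addition to the default logging. */
    static class RecordingContainer extends Container {

        final List<Throwable> failures = new ArrayList<>();

        @Override
        protected void handleListenerException(Throwable ex) {
            failures.add(ex);
            super.handleListenerException(ex);
        }
    }

    public static void main(String[] args) {
        RecordingContainer container = new RecordingContainer();
        container.execute(() -> { throw new IllegalStateException("boom"); });
        System.out.println("recorded failures: " + container.failures.size());
    }
}
```

Because the handler swallows the exception, the caller of execute never sees it; an override is the place to add metrics or alerting on top of the default logging.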
    • wrapToListenerExecutionFailedExceptionIfNeeded

      protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data)
      Parameters:
      e - The Exception.
      data - The failed message.
      Returns:
      'e' itself if it is already a ListenerExecutionFailedException; otherwise a new ListenerExecutionFailedException that wraps 'e'.
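The wrap-only-if-needed contract can be sketched as follows. ListenerFailure is a simplified, hypothetical stand-in for ListenerExecutionFailedException, and the message text is an assumption; the point is only that an already-wrapped exception is returned as the same instance.

```java
class WrapIfNeededSketch {

    /** Simplified stand-in: carries the failed message alongside the cause. */
    static class ListenerFailure extends RuntimeException {

        final Object failedData;

        ListenerFailure(String message, Throwable cause, Object failedData) {
            super(message, cause);
            this.failedData = failedData;
        }
    }

    /** Returns e unchanged if it is already a ListenerFailure, otherwise wraps it. */
    static ListenerFailure wrapIfNeeded(Exception e, Object data) {
        if (e instanceof ListenerFailure) {
            return (ListenerFailure) e;
        }
        return new ListenerFailure("Listener threw exception", e, data);
    }

    public static void main(String[] args) {
        Exception plain = new IllegalStateException("boom");
        ListenerFailure wrapped = wrapIfNeeded(plain, "payload");
        // The original exception is kept as the cause.
        System.out.println(wrapped.getCause() == plain);
        // Wrapping again is a no-op that returns the same instance.
        System.out.println(wrapIfNeeded(wrapped, "other") == wrapped);
    }
}
```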
    • publishConsumerFailedEvent

      protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t)
    • publishMissingQueueEvent

      protected void publishMissingQueueEvent(String queue)
    • publishIdleContainerEvent

      protected final void publishIdleContainerEvent(long idleTime)
    • updateLastReceive

      protected void updateLastReceive()
    • configureAdminIfNeeded

      protected void configureAdminIfNeeded()
    • checkMismatchedQueues

      protected void checkMismatchedQueues()
    • lazyLoad

      public void lazyLoad()
      Description copied from interface: MessageListenerContainer
      Do not check for missing or mismatched queues during startup. Used for lazily loaded message listener containers to avoid a deadlock when starting such containers. Applications lazily loading containers should verify the queue configuration before loading the container bean.
      Specified by:
      lazyLoad in interface MessageListenerContainer
    • redeclareElementsIfNecessary

      protected void redeclareElementsIfNecessary()
      Use AmqpAdmin.initialize() to redeclare everything if necessary. Because auto-deletion of a queue can cause upstream elements (bindings, exchanges) to be deleted too, everything needs to be redeclared if a queue is missing. Declaration is idempotent so, aside from some network chatter, redeclaring does no harm, and it is only done when the container detects that its queue is gone.

      In general, this makes sense only for 'auto-delete' or 'expired' queues, but with a server-side TTL policy there is no way to determine the 'expiration' option for the queue.

      Starting with version 1.6, if mismatchedQueuesFatal is true, the declarations are always attempted during restart so the listener will fail with a fatal error if mismatches occur.
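The decision described above (redeclare when a queue is missing, or always when mismatchedQueuesFatal is set) might be sketched like this. The Admin interface stands in for AmqpAdmin.initialize(), and the set-based queue lookup is an assumption made to keep the example self-contained; the real container inspects the broker and its declared elements.

```java
import java.util.Set;

class RedeclareSketch {

    /** Hypothetical stand-in for the single AmqpAdmin.initialize() call. */
    interface Admin {
        void initialize();
    }

    /**
     * Redeclares everything when one of the container's queues is absent on the
     * broker, or unconditionally when mismatchedQueuesFatal is true.
     * Returns true if a redeclaration was triggered.
     */
    static boolean redeclareIfNecessary(Admin admin, Set<String> brokerQueues,
            Set<String> containerQueues, boolean mismatchedQueuesFatal) {

        boolean queueMissing = !brokerQueues.containsAll(containerQueues);
        if (queueMissing || mismatchedQueuesFatal) {
            admin.initialize(); // idempotent, so repeating it only costs network traffic
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Admin admin = () -> System.out.println("redeclaring queues, exchanges and bindings");
        // "orders" is gone from the broker, so a redeclaration is triggered.
        System.out.println(redeclareIfNecessary(admin, Set.of("audit"), Set.of("orders"), false));
    }
}
```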

    • causeChainHasImmediateAcknowledgeAmqpException

      protected boolean causeChainHasImmediateAcknowledgeAmqpException(Throwable ex)
      Traverse the cause chain and, if an ImmediateAcknowledgeAmqpException is found before an AmqpRejectAndDontRequeueException, return true. An Error will take precedence.
      Parameters:
      ex - the exception
      Returns:
      true if we should ack immediately.
      Since:
      1.6.6
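One plausible reading of the traversal is sketched below with hypothetical stand-in exception types (ImmediateAck and RejectDontRequeue) in place of the Spring AMQP classes. Two assumptions are made: the walk starts at the cause of ex rather than at ex itself, and "an Error will take precedence" is taken to mean that a top-level Error never results in an immediate ack.

```java
class CauseChainSketch {

    /** Stand-in for ImmediateAcknowledgeAmqpException. */
    static class ImmediateAck extends RuntimeException {
        ImmediateAck(String message) {
            super(message);
        }
    }

    /** Stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectDontRequeue extends RuntimeException {
        RejectDontRequeue(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Returns true if an ImmediateAck appears in the cause chain before a RejectDontRequeue. */
    static boolean shouldAckImmediately(Throwable ex) {
        if (ex instanceof Error) {
            return false; // assumption: an Error always wins over an immediate ack
        }
        for (Throwable cause = ex.getCause(); cause != null; cause = cause.getCause()) {
            if (cause instanceof ImmediateAck) {
                return true;
            }
            if (cause instanceof RejectDontRequeue) {
                return false; // a reject found first hides any ack further down the chain
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable ackFirst = new RuntimeException("outer", new ImmediateAck("ack"));
        Throwable rejectFirst = new RuntimeException("outer",
                new RejectDontRequeue("reject", new ImmediateAck("ack")));
        System.out.println(shouldAckImmediately(ackFirst));
        System.out.println(shouldAckImmediately(rejectFirst));
    }
}
```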
    • prepareHolderForRollback

      protected void prepareHolderForRollback(@Nullable RabbitResourceHolder resourceHolder, RuntimeException exception)
      Prepare the resource holder for rollback after the given exception. A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g. TransactionDefinition.PROPAGATION_NOT_SUPPORTED). In that case the delivery tags will have been processed manually.
      Parameters:
      resourceHolder - the bound resource holder (if a transaction is active).
      exception - the exception.
    • debatch

      @Nullable protected List<Message> debatch(Message message)