Class SimpleMessageListenerContainer

All Implemented Interfaces:
MessageListenerContainer, Aware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle

public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
Since:
1.0
Author:
Mark Pollack, Mark Fisher, Dave Syer, Gary Russell, Artem Bilan, Alex Panchenko, Mat Jaggard, Yansong Ren, Tim Bourquin, Jeonggi Kim, Java4ye
  • Field Details

    • DEFAULT_RECEIVE_TIMEOUT

      public static final long DEFAULT_RECEIVE_TIMEOUT
      See Also:
  • Constructor Details

    • SimpleMessageListenerContainer

      public SimpleMessageListenerContainer()
      Default constructor for convenient dependency injection via setters.
    • SimpleMessageListenerContainer

      public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
      Create a listener container from the connection factory (mandatory).
      Parameters:
      connectionFactory - the ConnectionFactory
  • Method Details

    • setConcurrentConsumers

      public void setConcurrentConsumers(int concurrentConsumers)
      Specify the number of concurrent consumers to create. Default is 1.

      Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. Cannot be more than maxConcurrentConsumers (if set).

      Parameters:
      concurrentConsumers - the minimum number of consumers to create.
      See Also:
    • setMaxConcurrentConsumers

      public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
      Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Consumers will be added on demand. Cannot be less than concurrentConsumers.
      Parameters:
      maxConcurrentConsumers - the maximum number of consumers.
      See Also:
    • setConcurrency

      public void setConcurrency(String concurrency)
      Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple upper limit String, e.g. "10" (a fixed number of consumers).

      This listener container will always hold on to the minimum number of consumers (setConcurrentConsumers(int)) and will slowly scale up to the maximum number of consumers setMaxConcurrentConsumers(int) in case of increasing load.

      Parameters:
      concurrency - the concurrency.
      Since:
      2.0
    • setExclusive

      public final void setExclusive(boolean exclusive)
      Set to true for an exclusive consumer - if true, the concurrency must be 1.
      Overrides:
      setExclusive in class AbstractMessageListenerContainer
      Parameters:
      exclusive - true for an exclusive consumer.
    • setStartConsumerMinInterval

      public final void setStartConsumerMinInterval(long startConsumerMinInterval)
      If maxConcurrentConsumers is greater then concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. Default is 10000 (10 seconds).
      Parameters:
      startConsumerMinInterval - The minimum interval between new consumer starts.
      See Also:
    • setStopConsumerMinInterval

      public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
      If maxConcurrentConsumers is greater then concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. Default is 60000 (1 minute).
      Parameters:
      stopConsumerMinInterval - The minimum interval between consumer stops.
      See Also:
    • setConsecutiveActiveTrigger

      public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
      If maxConcurrentConsumers is greater then concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. This is impacted by the batchSize. Default is 10 consecutive messages.
      Parameters:
      consecutiveActiveTrigger - The number of consecutive receives to trigger a new consumer.
      See Also:
    • setConsecutiveIdleTrigger

      public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
      If maxConcurrentConsumers is greater then concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data; after which we consider stopping a consumer. The idle time is effectively receiveTimeout * batchSize * this value because the consumer thread waits for a message for up to receiveTimeout up to batchSize times. Default is 10 consecutive idles.
      Parameters:
      consecutiveIdleTrigger - The number of consecutive timeouts to trigger stopping a consumer.
      See Also:
    • setReceiveTimeout

      public void setReceiveTimeout(long receiveTimeout)
      The time (in milliseconds) that a consumer should wait for data. Default 1000 (1 second).
      Parameters:
      receiveTimeout - the timeout.
      See Also:
    • setBatchReceiveTimeout

      public void setBatchReceiveTimeout(long batchReceiveTimeout)
      The number of milliseconds of timeout for gathering batch messages. It limits the time to wait to fill batchSize. Default is 0 (no timeout).
      Parameters:
      batchReceiveTimeout - the timeout for gathering batch messages.
      Since:
      3.1.2
      See Also:
    • setBatchSize

      public void setBatchSize(int batchSize)
      This property has several functions.

      When the channel is transacted, it determines how many messages to process in a single transaction. It should be less than or equal to the prefetch count.

      It also affects how often acks are sent when using AcknowledgeMode.AUTO - one ack per BatchSize.

      Finally, when setConsumerBatchEnabled(boolean) is true, it determines how many records to include in the batch as long as sufficient messages arrive within setReceiveTimeout(long).

      IMPORTANT The batch size represents the number of physical messages received. If AbstractMessageListenerContainer.setDeBatchingEnabled(boolean) is true and a message is a batch created by a producer, the actual number of messages received by the listener will be larger than this batch size.

      Default is 1.

      Parameters:
      batchSize - the batch size
      Since:
      2.2
      See Also:
    • setConsumerBatchEnabled

      public void setConsumerBatchEnabled(boolean consumerBatchEnabled)
      Set to true to present a list of messages based on the setBatchSize(int), if the listener supports it. This will coerce deBatchingEnabled to true as well.
      Parameters:
      consumerBatchEnabled - true to create message batches in the container.
      Since:
      2.2
      See Also:
    • isConsumerBatchEnabled

      public boolean isConsumerBatchEnabled()
      Description copied from interface: MessageListenerContainer
      Return true if this container is capable of (and configured to) create batches of consumed messages.
      Returns:
      true if enabled.
    • setMissingQueuesFatal

      public void setMissingQueuesFatal(boolean missingQueuesFatal)
      If all the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.

      When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.

      When true, if the queues are removed while the container is running, the container is stopped.

      Defaults to true for this container.

      Overrides:
      setMissingQueuesFatal in class AbstractMessageListenerContainer
      Parameters:
      missingQueuesFatal - the missingQueuesFatal to set.
      See Also:
    • setQueueNames

      public void setQueueNames(String... queueName)
      Description copied from class: AbstractMessageListenerContainer
      Set the name of the queue(s) to receive messages from.
      Specified by:
      setQueueNames in interface MessageListenerContainer
      Overrides:
      setQueueNames in class AbstractMessageListenerContainer
      Parameters:
      queueName - the desired queueName(s) (can not be null)
    • addQueueNames

      public void addQueueNames(String... queueName)
      Add queue(s) to this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. The queue must exist to avoid problems when restarting the consumers.
      Overrides:
      addQueueNames in class AbstractMessageListenerContainer
      Parameters:
      queueName - The queue to add.
    • removeQueueNames

      public boolean removeQueueNames(String... queueName)
      Remove queues from this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. At least one queue must remain.
      Overrides:
      removeQueueNames in class AbstractMessageListenerContainer
      Parameters:
      queueName - The queue to remove.
      Returns:
      the boolean result of removal on the target queueNames List.
    • addQueues

      public void addQueues(Queue... queue)
      Add queue(s) to this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. The queue must exist to avoid problems when restarting the consumers.
      Overrides:
      addQueues in class AbstractMessageListenerContainer
      Parameters:
      queue - The queue to add.
    • removeQueues

      public boolean removeQueues(Queue... queue)
      Remove queues from this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. At least one queue must remain.
      Overrides:
      removeQueues in class AbstractMessageListenerContainer
      Parameters:
      queue - The queue to remove.
      Returns:
      the boolean result of removal on the target queueNames List.
    • setDeclarationRetries

      public void setDeclarationRetries(int declarationRetries)
      Set the number of retries after passive queue declaration fails.
      Parameters:
      declarationRetries - The number of retries, default 3.
      Since:
      1.3.9
      See Also:
    • setRetryDeclarationInterval

      public void setRetryDeclarationInterval(long retryDeclarationInterval)
      When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
      Parameters:
      retryDeclarationInterval - the interval, default 60000.
      Since:
      1.3.9
    • setConsumerStartTimeout

      public void setConsumerStartTimeout(long consumerStartTimeout)
      When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if the taskExecutor has insufficient threads to support the container concurrency. Default 60000.
      Parameters:
      consumerStartTimeout - the timeout.
      Since:
      1.7.5
    • setEnforceImmediateAckForManual

      public void setEnforceImmediateAckForManual(boolean enforceImmediateAckForManual)
      Set to true to enforce Channel.basicAck(long, boolean) for AcknowledgeMode.MANUAL when ImmediateAcknowledgeAmqpException is thrown. This might be a tentative solution to not break behavior for current minor version.
      Parameters:
      enforceImmediateAckForManual - the flag to ack message for MANUAL mode on ImmediateAcknowledgeAmqpException
      Since:
      3.1.2
    • validateConfiguration

      protected void validateConfiguration()
      Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
      Overrides:
      validateConfiguration in class AbstractMessageListenerContainer
    • sharedConnectionEnabled

      protected final boolean sharedConnectionEnabled()
      Always use a shared Rabbit Connection.
      Returns:
      true
    • doInitialize

      protected void doInitialize()
      Description copied from class: AbstractMessageListenerContainer
      Register any invokers within this container.

      Subclasses need to implement this method for their specific invoker management process.

      Specified by:
      doInitialize in class AbstractMessageListenerContainer
    • getActiveConsumerCount

      @ManagedMetric(metricType=GAUGE) public int getActiveConsumerCount()
    • doStart

      protected void doStart()
      Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer to this container's task executor.
      Overrides:
      doStart in class AbstractMessageListenerContainer
    • shutdownAndWaitOrCallback

      protected void shutdownAndWaitOrCallback(@Nullable Runnable callback)
      Overrides:
      shutdownAndWaitOrCallback in class AbstractMessageListenerContainer
    • initializeConsumers

      protected int initializeConsumers()
    • adjustConsumers

      protected void adjustConsumers(int deltaArg)
      Adjust consumers depending on delta.
      Parameters:
      deltaArg - a negative value increases, positive decreases.
      Since:
      1.7.8
    • addAndStartConsumers

      protected void addAndStartConsumers(int delta)
      Start up to delta consumers, limited by setMaxConcurrentConsumers(int).
      Parameters:
      delta - the consumers to add.
    • createBlockingQueueConsumer

      protected BlockingQueueConsumer createBlockingQueueConsumer()
    • handleStartupFailure

      protected void handleStartupFailure(BackOffExecution backOffExecution)
    • publishConsumerFailedEvent

      protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t)
      Overrides:
      publishConsumerFailedEvent in class AbstractMessageListenerContainer
    • toString

      public String toString()
      Overrides:
      toString in class Object