Class BlockingQueueConsumer

java.lang.Object
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer

public class BlockingQueueConsumer extends Object
Specialized consumer encapsulating knowledge of the broker connections and having its own lifecycle (start and stop).
Author:
Mark Pollack, Dave Syer, Gary Russell, Casper Mout, Artem Bilan, Alex Panchenko, Johno Crawford, Ian Roberts, Cao Weibo
  • Constructor Details

    • BlockingQueueConsumer

      public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues)
      Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started. RequeueRejected defaults to true.
      Parameters:
      connectionFactory - The connection factory.
      messagePropertiesConverter - The properties converter.
      activeObjectCounter - The active object counter; used during shutdown.
      acknowledgeMode - The acknowledgemode.
      transactional - Whether the channel is transactional.
      prefetchCount - The prefetch count.
      queues - The queues.
    • BlockingQueueConsumer

      public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues)
      Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
      Parameters:
      connectionFactory - The connection factory.
      messagePropertiesConverter - The properties converter.
      activeObjectCounter - The active object counter; used during shutdown.
      acknowledgeMode - The acknowledge mode.
      transactional - Whether the channel is transactional.
      prefetchCount - The prefetch count.
      defaultRequeueRejected - true to reject requeued messages.
      queues - The queues.
    • BlockingQueueConsumer

      public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, String... queues)
      Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
      Parameters:
      connectionFactory - The connection factory.
      messagePropertiesConverter - The properties converter.
      activeObjectCounter - The active object counter; used during shutdown.
      acknowledgeMode - The acknowledge mode.
      transactional - Whether the channel is transactional.
      prefetchCount - The prefetch count.
      defaultRequeueRejected - true to reject requeued messages.
      consumerArgs - The consumer arguments (e.g. x-priority).
      queues - The queues.
    • BlockingQueueConsumer

      public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, boolean exclusive, String... queues)
      Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
      Parameters:
      connectionFactory - The connection factory.
      messagePropertiesConverter - The properties converter.
      activeObjectCounter - The active object counter; used during shutdown.
      acknowledgeMode - The acknowledge mode.
      transactional - Whether the channel is transactional.
      prefetchCount - The prefetch count.
      defaultRequeueRejected - true to reject requeued messages.
      consumerArgs - The consumer arguments (e.g. x-priority).
      exclusive - true if the consumer is to be exclusive.
      queues - The queues.
    • BlockingQueueConsumer

      public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues)
      Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
      Parameters:
      connectionFactory - The connection factory.
      messagePropertiesConverter - The properties converter.
      activeObjectCounter - The active object counter; used during shutdown.
      acknowledgeMode - The acknowledge mode.
      transactional - Whether the channel is transactional.
      prefetchCount - The prefetch count.
      defaultRequeueRejected - true to reject requeued messages.
      consumerArgs - The consumer arguments (e.g. x-priority).
      noLocal - true if the consumer is to be no-local.
      exclusive - true if the consumer is to be exclusive.
      queues - The queues.
      Since:
      1.7.4
  • Method Details

    • getChannel

      public com.rabbitmq.client.Channel getChannel()
    • getConsumerTags

      public Collection<String> getConsumerTags()
    • setShutdownTimeout

      public void setShutdownTimeout(long shutdownTimeout)
    • setDeclarationRetries

      public void setDeclarationRetries(int declarationRetries)
      Set the number of retries after passive queue declaration fails.
      Parameters:
      declarationRetries - The number of retries, default 3.
      Since:
      1.3.9
      See Also:
    • setFailedDeclarationRetryInterval

      public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
      Set the interval between passive queue declaration attempts in milliseconds.
      Parameters:
      failedDeclarationRetryInterval - the interval, default 5000.
      Since:
      1.3.9
      See Also:
    • setRetryDeclarationInterval

      public void setRetryDeclarationInterval(long retryDeclarationInterval)
      When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
      Parameters:
      retryDeclarationInterval - the interval, default 60000.
      Since:
      1.3.9
    • setTagStrategy

      public void setTagStrategy(ConsumerTagStrategy tagStrategy)
      Set the ConsumerTagStrategy to use when generating consumer tags.
      Parameters:
      tagStrategy - the tagStrategy to set
      Since:
      1.4.5
    • setBackOffExecution

      public void setBackOffExecution(BackOffExecution backOffExecution)
      Set the BackOffExecution to use for the recovery in the SimpleMessageListenerContainer.
      Parameters:
      backOffExecution - the backOffExecution.
      Since:
      1.5
    • getBackOffExecution

      public BackOffExecution getBackOffExecution()
    • setLocallyTransacted

      public void setLocallyTransacted(boolean locallyTransacted)
      True if the channel is locally transacted.
      Parameters:
      locallyTransacted - the locally transacted to set.
      Since:
      1.6.6
    • setApplicationEventPublisher

      public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher applicationEventPublisher)
    • setMissingQueuePublisher

      public void setMissingQueuePublisher(Consumer<String> missingQueuePublisher)
      Set the publisher for a missing queue event.
      Parameters:
      missingQueuePublisher - the publisher.
      Since:
      2.1.18
    • setConsumeDelay

      public void setConsumeDelay(long consumeDelay)
      Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin with concurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.
      Parameters:
      consumeDelay - the consume delay.
      Since:
      2.3
    • setMessageAckListener

      public void setMessageAckListener(MessageAckListener messageAckListener)
      Set a MessageAckListener to use when ack a message(messages) in AcknowledgeMode.AUTO mode.
      Parameters:
      messageAckListener - the messageAckListener.
      Since:
      2.4.6
    • clearDeliveryTags

      public void clearDeliveryTags()
      Clear the delivery tags when rolling back with an external transaction manager.
      Since:
      1.6.6
    • setGlobalQos

      public void setGlobalQos(boolean globalQos)
      Apply prefetch to the entire channel.
      Parameters:
      globalQos - true for a channel-wide prefetch.
      Since:
      2.2.17
      See Also:
      • Channel.basicQos(int, boolean)
    • isNormalCancel

      public boolean isNormalCancel()
      Return true if cancellation is expected.
      Returns:
      true if expected.
    • basicCancel

      protected void basicCancel()
    • basicCancel

      protected void basicCancel(boolean expected)
    • hasDelivery

      protected boolean hasDelivery()
    • cancelled

      protected boolean cancelled()
    • nextMessage

      @Nullable public Message nextMessage() throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
      Main application-side API: wait for the next message delivery and return it.
      Returns:
      the next message
      Throws:
      InterruptedException - if an interrupt is received while waiting
      com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
    • nextMessage

      @Nullable public Message nextMessage(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
      Main application-side API: wait for the next message delivery and return it.
      Parameters:
      timeout - timeout in millisecond
      Returns:
      the next message or null if timed out
      Throws:
      InterruptedException - if an interrupt is received while waiting
      com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
    • start

      public void start() throws AmqpException
      Throws:
      AmqpException
    • stop

      public void stop()
    • forceCloseAndClearQueue

      public void forceCloseAndClearQueue()
    • rollbackOnExceptionIfNecessary

      public void rollbackOnExceptionIfNecessary(Throwable ex)
      Perform a rollback, handling rollback exceptions properly.
      Parameters:
      ex - the thrown application exception or error
    • rollbackOnExceptionIfNecessary

      public void rollbackOnExceptionIfNecessary(Throwable ex, long tag)
      Perform a rollback, handling rollback exceptions properly.
      Parameters:
      ex - the thrown application exception or error
      tag - delivery tag; when specified (greater than or equal to 0) only that message is nacked.
      Since:
      2.2.21.
    • commitIfNecessary

      @Deprecated(forRemoval=true, since="3.1.2") public boolean commitIfNecessary(boolean localTx)
      Deprecated, for removal: This API element is subject to removal in a future version.
      in favor of commitIfNecessary(boolean, boolean)
      Perform a commit or message acknowledgement, as appropriate. NOTE: This method was never been intended tobe public.
      Parameters:
      localTx - Whether the channel is locally transacted.
      Returns:
      true if at least one delivery tag exists.
    • toString

      public String toString()
      Overrides:
      toString in class Object