Class BlockingQueueConsumer


  • public class BlockingQueueConsumer
    extends Object
    Specialized consumer encapsulating knowledge of the broker connections and having its own lifecycle (start and stop).
    Author:
    Mark Pollack, Dave Syer, Gary Russell, Casper Mout, Artem Bilan, Alex Panchenko, Johno Crawford, Ian Roberts, Cao Weibo
    • Constructor Detail

      • BlockingQueueConsumer

        public BlockingQueueConsumer​(ConnectionFactory connectionFactory,
                                     MessagePropertiesConverter messagePropertiesConverter,
                                     ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
                                     AcknowledgeMode acknowledgeMode,
                                     boolean transactional,
                                     int prefetchCount,
                                     String... queues)
        Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started. RequeueRejected defaults to true.
        Parameters:
        connectionFactory - The connection factory.
        messagePropertiesConverter - The properties converter.
        activeObjectCounter - The active object counter; used during shutdown.
        acknowledgeMode - The acknowledgemode.
        transactional - Whether the channel is transactional.
        prefetchCount - The prefetch count.
        queues - The queues.
      • BlockingQueueConsumer

        public BlockingQueueConsumer​(ConnectionFactory connectionFactory,
                                     MessagePropertiesConverter messagePropertiesConverter,
                                     ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
                                     AcknowledgeMode acknowledgeMode,
                                     boolean transactional,
                                     int prefetchCount,
                                     boolean defaultRequeueRejected,
                                     String... queues)
        Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
        Parameters:
        connectionFactory - The connection factory.
        messagePropertiesConverter - The properties converter.
        activeObjectCounter - The active object counter; used during shutdown.
        acknowledgeMode - The acknowledge mode.
        transactional - Whether the channel is transactional.
        prefetchCount - The prefetch count.
        defaultRequeueRejected - true to reject requeued messages.
        queues - The queues.
      • BlockingQueueConsumer

        public BlockingQueueConsumer​(ConnectionFactory connectionFactory,
                                     MessagePropertiesConverter messagePropertiesConverter,
                                     ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
                                     AcknowledgeMode acknowledgeMode,
                                     boolean transactional,
                                     int prefetchCount,
                                     boolean defaultRequeueRejected,
                                     @Nullable
                                     Map<String,​Object> consumerArgs,
                                     String... queues)
        Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
        Parameters:
        connectionFactory - The connection factory.
        messagePropertiesConverter - The properties converter.
        activeObjectCounter - The active object counter; used during shutdown.
        acknowledgeMode - The acknowledge mode.
        transactional - Whether the channel is transactional.
        prefetchCount - The prefetch count.
        defaultRequeueRejected - true to reject requeued messages.
        consumerArgs - The consumer arguments (e.g. x-priority).
        queues - The queues.
      • BlockingQueueConsumer

        public BlockingQueueConsumer​(ConnectionFactory connectionFactory,
                                     MessagePropertiesConverter messagePropertiesConverter,
                                     ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
                                     AcknowledgeMode acknowledgeMode,
                                     boolean transactional,
                                     int prefetchCount,
                                     boolean defaultRequeueRejected,
                                     @Nullable
                                     Map<String,​Object> consumerArgs,
                                     boolean exclusive,
                                     String... queues)
        Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
        Parameters:
        connectionFactory - The connection factory.
        messagePropertiesConverter - The properties converter.
        activeObjectCounter - The active object counter; used during shutdown.
        acknowledgeMode - The acknowledge mode.
        transactional - Whether the channel is transactional.
        prefetchCount - The prefetch count.
        defaultRequeueRejected - true to reject requeued messages.
        consumerArgs - The consumer arguments (e.g. x-priority).
        exclusive - true if the consumer is to be exclusive.
        queues - The queues.
      • BlockingQueueConsumer

        public BlockingQueueConsumer​(ConnectionFactory connectionFactory,
                                     MessagePropertiesConverter messagePropertiesConverter,
                                     ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
                                     AcknowledgeMode acknowledgeMode,
                                     boolean transactional,
                                     int prefetchCount,
                                     boolean defaultRequeueRejected,
                                     @Nullable
                                     Map<String,​Object> consumerArgs,
                                     boolean noLocal,
                                     boolean exclusive,
                                     String... queues)
        Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
        Parameters:
        connectionFactory - The connection factory.
        messagePropertiesConverter - The properties converter.
        activeObjectCounter - The active object counter; used during shutdown.
        acknowledgeMode - The acknowledge mode.
        transactional - Whether the channel is transactional.
        prefetchCount - The prefetch count.
        defaultRequeueRejected - true to reject requeued messages.
        consumerArgs - The consumer arguments (e.g. x-priority).
        noLocal - true if the consumer is to be no-local.
        exclusive - true if the consumer is to be exclusive.
        queues - The queues.
        Since:
        1.7.4
    • Method Detail

      • getChannel

        public com.rabbitmq.client.Channel getChannel()
      • setShutdownTimeout

        public void setShutdownTimeout​(long shutdownTimeout)
      • setDeclarationRetries

        public void setDeclarationRetries​(int declarationRetries)
        Set the number of retries after passive queue declaration fails.
        Parameters:
        declarationRetries - The number of retries, default 3.
        Since:
        1.3.9
        See Also:
        setFailedDeclarationRetryInterval(long)
      • setFailedDeclarationRetryInterval

        public void setFailedDeclarationRetryInterval​(long failedDeclarationRetryInterval)
        Set the interval between passive queue declaration attempts in milliseconds.
        Parameters:
        failedDeclarationRetryInterval - the interval, default 5000.
        Since:
        1.3.9
        See Also:
        setDeclarationRetries(int)
      • setRetryDeclarationInterval

        public void setRetryDeclarationInterval​(long retryDeclarationInterval)
        When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
        Parameters:
        retryDeclarationInterval - the interval, default 60000.
        Since:
        1.3.9
      • setTagStrategy

        public void setTagStrategy​(ConsumerTagStrategy tagStrategy)
        Set the ConsumerTagStrategy to use when generating consumer tags.
        Parameters:
        tagStrategy - the tagStrategy to set
        Since:
        1.4.5
      • setBackOffExecution

        public void setBackOffExecution​(BackOffExecution backOffExecution)
        Set the BackOffExecution to use for the recovery in the SimpleMessageListenerContainer.
        Parameters:
        backOffExecution - the backOffExecution.
        Since:
        1.5
      • setLocallyTransacted

        public void setLocallyTransacted​(boolean locallyTransacted)
        True if the channel is locally transacted.
        Parameters:
        locallyTransacted - the locally transacted to set.
        Since:
        1.6.6
      • setApplicationEventPublisher

        public void setApplicationEventPublisher​(ApplicationEventPublisher applicationEventPublisher)
      • setMissingQueuePublisher

        public void setMissingQueuePublisher​(Consumer<String> missingQueuePublisher)
        Set the publisher for a missing queue event.
        Parameters:
        missingQueuePublisher - the publisher.
        Since:
        2.1.18
      • setConsumeDelay

        public void setConsumeDelay​(long consumeDelay)
        Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin with concurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.
        Parameters:
        consumeDelay - the consume delay.
        Since:
        2.3
      • clearDeliveryTags

        public void clearDeliveryTags()
        Clear the delivery tags when rolling back with an external transaction manager.
        Since:
        1.6.6
      • setGlobalQos

        public void setGlobalQos​(boolean globalQos)
        Apply prefetch to the entire channel.
        Parameters:
        globalQos - true for a channel-wide prefetch.
        Since:
        2.2.17
        See Also:
        Channel.basicQos(int, boolean)
      • isNormalCancel

        public boolean isNormalCancel()
        Return true if cancellation is expected.
        Returns:
        true if expected.
      • basicCancel

        protected void basicCancel()
      • basicCancel

        protected void basicCancel​(boolean expected)
      • hasDelivery

        protected boolean hasDelivery()
      • cancelled

        protected boolean cancelled()
      • nextMessage

        @Nullable
        public Message nextMessage()
                            throws InterruptedException,
                                   com.rabbitmq.client.ShutdownSignalException
        Main application-side API: wait for the next message delivery and return it.
        Returns:
        the next message
        Throws:
        InterruptedException - if an interrupt is received while waiting
        com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
      • nextMessage

        @Nullable
        public Message nextMessage​(long timeout)
                            throws InterruptedException,
                                   com.rabbitmq.client.ShutdownSignalException
        Main application-side API: wait for the next message delivery and return it.
        Parameters:
        timeout - timeout in millisecond
        Returns:
        the next message or null if timed out
        Throws:
        InterruptedException - if an interrupt is received while waiting
        com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
      • stop

        public void stop()
      • rollbackOnExceptionIfNecessary

        public void rollbackOnExceptionIfNecessary​(Throwable ex)
        Perform a rollback, handling rollback exceptions properly.
        Parameters:
        ex - the thrown application exception or error
      • rollbackOnExceptionIfNecessary

        public void rollbackOnExceptionIfNecessary​(Throwable ex,
                                                   long tag)
        Perform a rollback, handling rollback exceptions properly.
        Parameters:
        ex - the thrown application exception or error
        tag - delivery tag; when specified (greater than or equal to 0) only that message is nacked.
        Since:
        2.2.21.
      • commitIfNecessary

        public boolean commitIfNecessary​(boolean localTx)
                                  throws IOException
        Perform a commit or message acknowledgement, as appropriate.
        Parameters:
        localTx - Whether the channel is locally transacted.
        Returns:
        true if at least one delivery tag exists.
        Throws:
        IOException - Any IOException.