Class BlockingQueueConsumer
java.lang.Object
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer
Specialized consumer encapsulating knowledge of the broker
connections and having its own lifecycle (start and stop).
- Author:
- Mark Pollack, Dave Syer, Gary Russell, Casper Mout, Artem Bilan, Alex Panchenko, Johno Crawford, Ian Roberts, Cao Weibo
-
Constructor Summary
Constructor, Description
BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues)
- Create a consumer.
BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues)
- Create a consumer.
BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean exclusive, String... queues)
- Create a consumer.
BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, String... queues)
- Create a consumer.
BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues)
- Create a consumer.
-
Method Summary
Modifier and Type, Method, Description
protected void basicCancel()
protected void basicCancel(boolean expected)
protected boolean cancelled()
void clearDeliveryTags()
- Clear the delivery tags when rolling back with an external transaction manager.
boolean commitIfNecessary(boolean localTx)
- Perform a commit or message acknowledgement, as appropriate.
BackOffExecution getBackOffExecution()
com.rabbitmq.client.Channel getChannel()
Collection<String> getConsumerTags()
protected boolean hasDelivery()
boolean isNormalCancel()
- Return true if cancellation is expected.
Message nextMessage()
- Main application-side API: wait for the next message delivery and return it.
Message nextMessage(long timeout)
- Main application-side API: wait for the next message delivery and return it.
void rollbackOnExceptionIfNecessary(Throwable ex)
- Perform a rollback, handling rollback exceptions properly.
void rollbackOnExceptionIfNecessary(Throwable ex, long tag)
- Perform a rollback, handling rollback exceptions properly.
void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
void setBackOffExecution(BackOffExecution backOffExecution)
- Set the BackOffExecution to use for the recovery in the SimpleMessageListenerContainer.
void setConsumeDelay(long consumeDelay)
- Set the consumeDelay, a time to wait before consuming, in milliseconds.
void setDeclarationRetries(int declarationRetries)
- Set the number of retries after passive queue declaration fails.
void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
- Set the interval between passive queue declaration attempts in milliseconds.
void setGlobalQos(boolean globalQos)
- Apply prefetch to the entire channel.
void setLocallyTransacted(boolean locallyTransacted)
- True if the channel is locally transacted.
void setMessageAckListener(MessageAckListener messageAckListener)
- Set a MessageAckListener to use when acknowledging one or more messages in AcknowledgeMode.AUTO mode.
void setMissingQueuePublisher(Consumer<String> missingQueuePublisher)
- Set the publisher for a missing queue event.
void setRetryDeclarationInterval(long retryDeclarationInterval)
- When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
void setShutdownTimeout(long shutdownTimeout)
void setTagStrategy(ConsumerTagStrategy tagStrategy)
- Set the ConsumerTagStrategy to use when generating consumer tags.
void start()
void stop()
String toString()
-
Constructor Details
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started. defaultRequeueRejected defaults to true.
- Parameters:
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
queues - The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
- Parameters:
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
queues - The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
- Parameters:
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
consumerArgs - The consumer arguments (e.g. x-priority).
queues - The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean exclusive, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
- Parameters:
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
consumerArgs - The consumer arguments (e.g. x-priority).
exclusive - true if the consumer is to be exclusive.
queues - The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.
- Parameters:
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
consumerArgs - The consumer arguments (e.g. x-priority).
noLocal - true if the consumer is to be no-local.
exclusive - true if the consumer is to be exclusive.
queues - The queues.
- Since:
- 1.7.4
-
-
Method Details
-
getChannel
public com.rabbitmq.client.Channel getChannel()
-
getConsumerTags
public Collection<String> getConsumerTags()
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
-
setDeclarationRetries
public void setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
- Parameters:
declarationRetries - The number of retries, default 3.
- Since:
- 1.3.9
-
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
- Parameters:
failedDeclarationRetryInterval - The interval, default 5000.
- Since:
- 1.3.9
-
setRetryDeclarationInterval
public void setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).
- Parameters:
retryDeclarationInterval - The interval, default 60000.
- Since:
- 1.3.9
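The interplay of declarationRetries and failedDeclarationRetryInterval can be pictured with a small JDK-only sketch. It is not the library's implementation: the class name DeclarationRetrySketch, the method declareWithRetries, and the BooleanSupplier standing in for a passive queue declaration are all hypothetical, and the sketch assumes that "retries" means extra attempts after the first failed one.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration only; none of these names come from Spring AMQP.
class DeclarationRetrySketch {

    /**
     * Tries the declaration once, then up to declarationRetries more times,
     * sleeping intervalMillis between failed attempts.
     * Returns true as soon as one attempt succeeds.
     */
    public static boolean declareWithRetries(BooleanSupplier passiveDeclare,
            int declarationRetries, long intervalMillis) {
        for (int attempt = 0; attempt <= declarationRetries; attempt++) {
            if (passiveDeclare.getAsBoolean()) {
                return true;
            }
            if (attempt < declarationRetries) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The simulated declaration succeeds on the third attempt.
        boolean ok = declareWithRetries(() -> ++calls[0] >= 3, 3, 10L);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

With the documented defaults (3 retries, 5000 ms), a loop of this shape would wait roughly 15 seconds before giving up on a queue that never appears.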
-
setTagStrategy
public void setTagStrategy(ConsumerTagStrategy tagStrategy)
Set the ConsumerTagStrategy to use when generating consumer tags.
- Parameters:
tagStrategy - the tagStrategy to set
- Since:
- 1.4.5
-
setBackOffExecution
public void setBackOffExecution(BackOffExecution backOffExecution)
Set the BackOffExecution to use for the recovery in the SimpleMessageListenerContainer.
- Parameters:
backOffExecution - the backOffExecution.
- Since:
- 1.5
-
getBackOffExecution
public BackOffExecution getBackOffExecution()
-
setLocallyTransacted
public void setLocallyTransacted(boolean locallyTransacted)
True if the channel is locally transacted.
- Parameters:
locallyTransacted - true if the channel is locally transacted.
- Since:
- 1.6.6
-
setApplicationEventPublisher
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
-
setMissingQueuePublisher
public void setMissingQueuePublisher(Consumer<String> missingQueuePublisher)
Set the publisher for a missing queue event.
- Parameters:
missingQueuePublisher - the publisher.
- Since:
- 2.1.18
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay)
Set the consumeDelay, a time to wait before consuming, in milliseconds. This is useful when using the sharding plugin with concurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.
- Parameters:
consumeDelay - the consume delay.
- Since:
- 2.3
-
setMessageAckListener
public void setMessageAckListener(MessageAckListener messageAckListener)
Set a MessageAckListener to use when acknowledging one or more messages in AcknowledgeMode.AUTO mode.
- Parameters:
messageAckListener - the messageAckListener.
- Since:
- 2.4.6
-
clearDeliveryTags
public void clearDeliveryTags()
Clear the delivery tags when rolling back with an external transaction manager.
- Since:
- 1.6.6
-
setGlobalQos
public void setGlobalQos(boolean globalQos)
Apply prefetch to the entire channel.
- Parameters:
globalQos - true for a channel-wide prefetch.
- Since:
- 2.2.17
- See Also:
- Channel.basicQos(int, boolean)
-
isNormalCancel
public boolean isNormalCancel()
Return true if cancellation is expected.
- Returns:
- true if expected.
-
basicCancel
protected void basicCancel()
-
basicCancel
protected void basicCancel(boolean expected)
-
hasDelivery
protected boolean hasDelivery()
-
cancelled
protected boolean cancelled()
-
nextMessage
@Nullable public Message nextMessage() throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
Main application-side API: wait for the next message delivery and return it.
- Returns:
- the next message
- Throws:
InterruptedException - if an interrupt is received while waiting
com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
-
nextMessage
@Nullable public Message nextMessage(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
Main application-side API: wait for the next message delivery and return it.
- Parameters:
timeout - timeout in milliseconds
- Returns:
- the next message or null if timed out
- Throws:
InterruptedException - if an interrupt is received while waiting
com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
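The "return the next message or null if timed out" contract resembles a timed poll on a java.util.concurrent.BlockingQueue. The sketch below illustrates only that contract with JDK classes; TimedDeliverySketch and its deliver method are hypothetical names, messages are plain strings, and it makes no claim about how BlockingQueueConsumer is implemented internally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of Spring AMQP.
class TimedDeliverySketch {

    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();

    /** Called by the (simulated) broker side when a message arrives. */
    public void deliver(String body) {
        this.deliveries.add(body);
    }

    /**
     * Waits up to timeoutMillis for a delivery.
     * Returns the message, or null if nothing arrived in time.
     */
    public String nextMessage(long timeoutMillis) throws InterruptedException {
        return this.deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        TimedDeliverySketch consumer = new TimedDeliverySketch();
        consumer.deliver("hello");
        String first = consumer.nextMessage(100);  // already queued, returns at once
        String second = consumer.nextMessage(100); // queue is empty, times out
        System.out.println(first + " / " + second); // prints "hello / null"
    }
}
```

A caller of a method with this contract typically treats null as "nothing yet" and loops, checking a stop condition between polls, rather than treating it as an error.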
-
start
public void start() throws AmqpException
- Throws:
AmqpException
-
stop
public void stop()
-
rollbackOnExceptionIfNecessary
public void rollbackOnExceptionIfNecessary(Throwable ex)
Perform a rollback, handling rollback exceptions properly.
- Parameters:
ex - the thrown application exception or error
-
rollbackOnExceptionIfNecessary
public void rollbackOnExceptionIfNecessary(Throwable ex, long tag)
Perform a rollback, handling rollback exceptions properly.
- Parameters:
ex - the thrown application exception or error
tag - delivery tag; when specified (greater than or equal to 0) only that message is nacked.
- Since:
- 2.2.21
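The tag parameter's rule ("greater than or equal to 0 means only that message is nacked") can be expressed as a small selection function. This is a sketch under assumptions: NackSelectionSketch and tagsToNack are hypothetical names, and the behaviour for a negative tag (nack every outstanding delivery tag) is inferred from the wording above rather than stated by it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of Spring AMQP.
class NackSelectionSketch {

    /**
     * Chooses which delivery tags to nack.
     * A tag >= 0 selects just that message; a negative tag is assumed
     * to mean "all outstanding tags".
     */
    public static List<Long> tagsToNack(Set<Long> outstandingTags, long tag) {
        if (tag >= 0) {
            return Collections.singletonList(tag);
        }
        return new ArrayList<>(outstandingTags);
    }

    public static void main(String[] args) {
        Set<Long> outstanding = new LinkedHashSet<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(tagsToNack(outstanding, 2L));  // prints [2]
        System.out.println(tagsToNack(outstanding, -1L)); // prints [1, 2, 3]
    }
}
```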
-
commitIfNecessary
public boolean commitIfNecessary(boolean localTx) throws IOException
Perform a commit or message acknowledgement, as appropriate.
- Parameters:
localTx - Whether the channel is locally transacted.
- Returns:
- true if at least one delivery tag exists.
- Throws:
IOException - Any IOException.
-
toString
public String toString()
-