Class BlockingQueueConsumer
- java.lang.Object
-
- org.springframework.amqp.rabbit.listener.BlockingQueueConsumer
-
public class BlockingQueueConsumer extends Object
Specialized consumer encapsulating knowledge of the broker connections and having its own lifecycle (start and stop).- Author:
- Mark Pollack, Dave Syer, Gary Russell, Casper Mout, Artem Bilan, Alex Panchenko, Johno Crawford, Ian Roberts
-
-
Constructor Summary
Constructors Constructor Description BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues)
Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String,Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues)
Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String,Object> consumerArgs, boolean exclusive, String... queues)
Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String,Object> consumerArgs, String... queues)
Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues)
Create a consumer.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected void
basicCancel()
protected void
basicCancel(boolean expected)
protected boolean
cancelled()
void
clearDeliveryTags()
Clear the delivery tags when rolling back with an external transaction manager.boolean
commitIfNecessary(boolean localTx)
Perform a commit or message acknowledgement, as appropriate.BackOffExecution
getBackOffExecution()
com.rabbitmq.client.Channel
getChannel()
Collection<String>
getConsumerTags()
protected boolean
hasDelivery()
boolean
isNormalCancel()
Return true if cancellation is expected.Message
nextMessage()
Main application-side API: wait for the next message delivery and return it.Message
nextMessage(long timeout)
Main application-side API: wait for the next message delivery and return it.void
rollbackOnExceptionIfNecessary(Throwable ex)
Perform a rollback, handling rollback exceptions properly.void
rollbackOnExceptionIfNecessary(Throwable ex, long tag)
Perform a rollback, handling rollback exceptions properly.void
setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
void
setBackOffExecution(BackOffExecution backOffExecution)
Set theBackOffExecution
to use for the recovery in theSimpleMessageListenerContainer
.void
setConsumeDelay(long consumeDelay)
Set the consumeDelay - a time to wait before consuming in ms.void
setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.void
setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.void
setGlobalQos(boolean globalQos)
Apply prefetch to the entire channel.void
setLocallyTransacted(boolean locallyTransacted)
True if the channel is locally transacted.void
setMissingQueuePublisher(Consumer<String> missingQueuePublisher)
Set the publisher for a missing queue event.void
setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).void
setShutdownTimeout(long shutdownTimeout)
void
setTagStrategy(ConsumerTagStrategy tagStrategy)
Set theConsumerTagStrategy
to use when generating consumer tags.void
start()
void
stop()
String
toString()
-
-
-
Constructor Detail
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started. RequeueRejected defaults to true.- Parameters:
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledgemode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.queues
- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.queues
- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.consumerArgs
- The consumer arguments (e.g. x-priority).queues
- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, boolean exclusive, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.consumerArgs
- The consumer arguments (e.g. x-priority).exclusive
- true if the consumer is to be exclusive.queues
- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues)
Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.consumerArgs
- The consumer arguments (e.g. x-priority).noLocal
- true if the consumer is to be no-local.exclusive
- true if the consumer is to be exclusive.queues
- The queues.- Since:
- 1.7.4
-
-
Method Detail
-
getChannel
public com.rabbitmq.client.Channel getChannel()
-
getConsumerTags
public Collection<String> getConsumerTags()
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
-
setDeclarationRetries
public void setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.- Parameters:
declarationRetries
- The number of retries, default 3.- Since:
- 1.3.9
- See Also:
setFailedDeclarationRetryInterval(long)
-
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.- Parameters:
failedDeclarationRetryInterval
- the interval, default 5000.- Since:
- 1.3.9
- See Also:
setDeclarationRetries(int)
-
setRetryDeclarationInterval
public void setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).- Parameters:
retryDeclarationInterval
- the interval, default 60000.- Since:
- 1.3.9
-
setTagStrategy
public void setTagStrategy(ConsumerTagStrategy tagStrategy)
Set theConsumerTagStrategy
to use when generating consumer tags.- Parameters:
tagStrategy
- the tagStrategy to set- Since:
- 1.4.5
-
setBackOffExecution
public void setBackOffExecution(BackOffExecution backOffExecution)
Set theBackOffExecution
to use for the recovery in theSimpleMessageListenerContainer
.- Parameters:
backOffExecution
- the backOffExecution.- Since:
- 1.5
-
getBackOffExecution
public BackOffExecution getBackOffExecution()
-
setLocallyTransacted
public void setLocallyTransacted(boolean locallyTransacted)
True if the channel is locally transacted.- Parameters:
locallyTransacted
- the locally transacted to set.- Since:
- 1.6.6
-
setApplicationEventPublisher
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
-
setMissingQueuePublisher
public void setMissingQueuePublisher(Consumer<String> missingQueuePublisher)
Set the publisher for a missing queue event.- Parameters:
missingQueuePublisher
- the publisher.- Since:
- 2.1.18
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay)
Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin withconcurrency > 1
, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.- Parameters:
consumeDelay
- the consume delay.- Since:
- 2.3
-
clearDeliveryTags
public void clearDeliveryTags()
Clear the delivery tags when rolling back with an external transaction manager.- Since:
- 1.6.6
-
setGlobalQos
public void setGlobalQos(boolean globalQos)
Apply prefetch to the entire channel.- Parameters:
globalQos
- true for a channel-wide prefetch.- Since:
- 2.2.17
- See Also:
Channel.basicQos(int, boolean)
-
isNormalCancel
public boolean isNormalCancel()
Return true if cancellation is expected.- Returns:
- true if expected.
-
basicCancel
protected void basicCancel()
-
basicCancel
protected void basicCancel(boolean expected)
-
hasDelivery
protected boolean hasDelivery()
-
cancelled
protected boolean cancelled()
-
nextMessage
@Nullable public Message nextMessage() throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
Main application-side API: wait for the next message delivery and return it.- Returns:
- the next message
- Throws:
InterruptedException
- if an interrupt is received while waitingcom.rabbitmq.client.ShutdownSignalException
- if the connection is shut down while waiting
-
nextMessage
@Nullable public Message nextMessage(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
Main application-side API: wait for the next message delivery and return it.- Parameters:
timeout
- timeout in millisecond- Returns:
- the next message or null if timed out
- Throws:
InterruptedException
- if an interrupt is received while waitingcom.rabbitmq.client.ShutdownSignalException
- if the connection is shut down while waiting
-
start
public void start() throws AmqpException
- Throws:
AmqpException
-
stop
public void stop()
-
rollbackOnExceptionIfNecessary
public void rollbackOnExceptionIfNecessary(Throwable ex)
Perform a rollback, handling rollback exceptions properly.- Parameters:
ex
- the thrown application exception or error
-
rollbackOnExceptionIfNecessary
public void rollbackOnExceptionIfNecessary(Throwable ex, long tag)
Perform a rollback, handling rollback exceptions properly.- Parameters:
ex
- the thrown application exception or errortag
- delivery tag; when specified (greater than or equal to 0) only that message is nacked.- Since:
- 2.2.21.
-
commitIfNecessary
public boolean commitIfNecessary(boolean localTx) throws IOException
Perform a commit or message acknowledgement, as appropriate.- Parameters:
localTx
- Whether the channel is locally transacted.- Returns:
- true if at least one delivery tag exists.
- Throws:
IOException
- Any IOException.
-
-