public class BlockingQueueConsumer extends Object
Constructor and Description |
---|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
Map<String,Object> consumerArgs,
boolean noLocal,
boolean exclusive,
String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
Map<String,Object> consumerArgs,
boolean exclusive,
String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
Map<String,Object> consumerArgs,
String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
String... queues)
Create a consumer.
|
Modifier and Type | Method and Description |
---|---|
protected void |
basicCancel() |
protected void |
basicCancel(boolean expected) |
protected boolean |
cancelled() |
void |
clearDeliveryTags()
Clear the delivery tags when rolling back with an external transaction
manager.
|
boolean |
commitIfNecessary(boolean localTx)
Perform a commit or message acknowledgement, as appropriate.
|
BackOffExecution |
getBackOffExecution() |
com.rabbitmq.client.Channel |
getChannel() |
Collection<String> |
getConsumerTags() |
protected boolean |
hasDelivery() |
boolean |
isNormalCancel()
Return true if cancellation is expected.
|
Message |
nextMessage()
Main application-side API: wait for the next message delivery and return it.
|
Message |
nextMessage(long timeout)
Main application-side API: wait for the next message delivery and return it.
|
void |
rollbackOnExceptionIfNecessary(Throwable ex)
Perform a rollback, handling rollback exceptions properly.
|
void |
setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) |
void |
setBackOffExecution(BackOffExecution backOffExecution)
Set the
BackOffExecution to use for the recovery in the SimpleMessageListenerContainer . |
void |
setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
|
void |
setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
|
void |
setLocallyTransacted(boolean locallyTransacted)
True if the channel is locally transacted.
|
void |
setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only
a subset of the queues were available (milliseconds).
|
void |
setShutdownTimeout(long shutdownTimeout) |
void |
setTagStrategy(ConsumerTagStrategy tagStrategy)
Set the
ConsumerTagStrategy to use when generating consumer tags. |
void |
start() |
void |
stop() |
String |
toString() |
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues)
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledgemode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.queues
- The queues.public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues)
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.queues
- The queues.public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, String... queues)
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.consumerArgs
- The consumer arguments (e.g. x-priority).queues
- The queues.public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, boolean exclusive, String... queues)
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.consumerArgs
- The consumer arguments (e.g. x-priority).exclusive
- true if the consumer is to be exclusive.queues
- The queues.public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String,Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues)
connectionFactory
- The connection factory.messagePropertiesConverter
- The properties converter.activeObjectCounter
- The active object counter; used during shutdown.acknowledgeMode
- The acknowledge mode.transactional
- Whether the channel is transactional.prefetchCount
- The prefetch count.defaultRequeueRejected
- true to reject requeued messages.consumerArgs
- The consumer arguments (e.g. x-priority).noLocal
- true if the consumer is to be no-local.exclusive
- true if the consumer is to be exclusive.queues
- The queues.public com.rabbitmq.client.Channel getChannel()
public Collection<String> getConsumerTags()
public void setShutdownTimeout(long shutdownTimeout)
public void setDeclarationRetries(int declarationRetries)
declarationRetries
- The number of retries, default 3.setFailedDeclarationRetryInterval(long)
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
failedDeclarationRetryInterval
- the interval, default 5000.setDeclarationRetries(int)
public void setRetryDeclarationInterval(long retryDeclarationInterval)
retryDeclarationInterval
- the interval, default 60000.public void setTagStrategy(ConsumerTagStrategy tagStrategy)
ConsumerTagStrategy
to use when generating consumer tags.tagStrategy
- the tagStrategy to setpublic void setBackOffExecution(BackOffExecution backOffExecution)
BackOffExecution
to use for the recovery in the SimpleMessageListenerContainer
.backOffExecution
- the backOffExecution.public BackOffExecution getBackOffExecution()
public void setLocallyTransacted(boolean locallyTransacted)
locallyTransacted
- the locally transacted to set.public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
public void clearDeliveryTags()
public boolean isNormalCancel()
protected void basicCancel()
protected void basicCancel(boolean expected)
protected boolean hasDelivery()
protected boolean cancelled()
@Nullable public Message nextMessage() throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
InterruptedException
- if an interrupt is received while waitingcom.rabbitmq.client.ShutdownSignalException
- if the connection is shut down while waiting@Nullable public Message nextMessage(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException
timeout
- timeout in millisecondInterruptedException
- if an interrupt is received while waitingcom.rabbitmq.client.ShutdownSignalException
- if the connection is shut down while waitingpublic void start() throws AmqpException
AmqpException
public void stop()
public void rollbackOnExceptionIfNecessary(Throwable ex)
ex
- the thrown application exception or errorpublic boolean commitIfNecessary(boolean localTx) throws IOException
localTx
- Whether the channel is locally transacted.IOException
- Any IOException.