public class DirectMessageListenerContainer extends AbstractMessageListenerContainer
SimpleMessageListenerContainer
is not so simple. Recent changes to the
rabbitmq java client has facilitated a much simpler listener container that invokes the
listener directly on the rabbit client consumer thread. There is no txSize property -
each message is acked (or nacked) individually.AbstractMessageListenerContainer.JavaLangErrorHandler, AbstractMessageListenerContainer.SharedConnectionNotInitializedException, AbstractMessageListenerContainer.WrappedTransactionException
Modifier and Type | Field and Description |
---|---|
protected List<org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.SimpleConsumer> |
consumers |
consumersMonitor, DEFAULT_DEBATCHING_ENABLED, DEFAULT_PREFETCH_COUNT, DEFAULT_RECOVERY_INTERVAL, DEFAULT_SHUTDOWN_TIMEOUT
logger
DEFAULT_PHASE
Constructor and Description |
---|
DirectMessageListenerContainer()
Create an instance;
RabbitAccessor.setConnectionFactory(ConnectionFactory) must
be called before starting. |
DirectMessageListenerContainer(ConnectionFactory connectionFactory)
Create an instance with the provided connection factory.
|
Modifier and Type | Method and Description |
---|---|
protected void |
actualStart() |
void |
addQueueNames(String... queueNames)
Add queue(s) to this container's list of queues.
|
void |
addQueues(Queue... queues)
Add queue(s) to this container's list of queues.
|
protected void |
consumerRemoved(org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.SimpleConsumer consumer)
Called whenever a consumer is removed.
|
protected void |
doInitialize()
Register any invokers within this container.
|
protected void |
doRedeclareElementsIfNecessary() |
protected void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Start this container, and notify all invoker tasks.
|
protected void |
doStop()
This method is invoked when the container is stopping.
|
protected int |
findIdleConsumer()
When adjusting down, return a consumer that can be canceled.
|
protected void |
processMonitorTask()
Subclasses can override this to take additional actions when the monitor task runs.
|
boolean |
removeQueueNames(String... queueNames)
Remove queue(s) from this container's list of queues.
|
boolean |
removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.
|
void |
setAckTimeout(long ackTimeout)
An approximate timeout; when
messagesPerAck is
greater than 1, and this time elapses since the last ack, the pending acks will be
sent either when the next message arrives, or a short time later if no additional
messages arrive. |
void |
setConsumersPerQueue(int consumersPerQueue)
Each queue runs in its own consumer; set this property to create multiple
consumers for each queue.
|
void |
setExclusive(boolean exclusive)
Set to true for an exclusive consumer - if true, the
consumers per queue must be 1. |
void |
setMessagesPerAck(int messagesPerAck)
Set the number of messages to receive before acknowledging (success).
|
void |
setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting
determines whether the condition is fatal.
|
void |
setMonitorInterval(long monitorInterval)
Set how often to run a task to check for failed consumers and idle containers.
|
void |
setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.
|
void |
setTaskScheduler(TaskScheduler taskScheduler)
Set the task scheduler to use for the task that monitors idle containers and
failed consumers.
|
actualInvokeListener, addAfterReceivePostProcessors, afterPropertiesSet, causeChainHasImmediateAcknowledgeAmqpException, checkMessageListener, checkMismatchedQueues, configureAdminIfNeeded, debatch, destroy, doInvokeListener, doInvokeListener, executeListener, getAcknowledgeMode, getAdviceChain, getAfterReceivePostProcessors, getAmqpAdmin, getApplicationContext, getApplicationEventPublisher, getBatchingStrategy, getBeanName, getConnectionFactory, getConsumeDelay, getConsumerArguments, getConsumerTagStrategy, getExclusiveConsumerExceptionLogger, getFailedDeclarationRetryInterval, getIdleEventInterval, getJavaLangErrorHandler, getLastReceive, getListenerId, getMessageListener, getMessagePropertiesConverter, getPhase, getPrefetchCount, getQueueNames, getQueueNamesAsSet, getQueueNamesToQueues, getRecoveryBackOff, getRoutingConnectionFactory, getRoutingLookupKey, getShutdownTimeout, getTaskExecutor, getTransactionAttribute, getTransactionManager, handleListenerException, initialize, initializeProxy, invokeErrorHandler, invokeListener, isActive, isAlwaysRequeueWithTxManagerRollback, isAutoDeclare, isAutoStartup, isChannelLocallyTransacted, isDeBatchingEnabled, isDefaultRequeueRejected, isExclusive, isExposeListenerChannel, isForceCloseChannel, isMismatchedQueuesFatal, isMissingQueuesFatal, isMissingQueuesFatalSet, isNoLocal, isPossibleAuthenticationFailureFatal, isPossibleAuthenticationFailureFatalSet, isRunning, isStatefulRetryFatalWithNullMessageId, lazyLoad, prepareHolderForRollback, publishConsumerFailedEvent, publishIdleContainerEvent, redeclareElementsIfNecessary, removeAfterReceivePostProcessor, setAcknowledgeMode, setAdviceChain, setAfterReceivePostProcessors, setAlwaysRequeueWithTxManagerRollback, setAmqpAdmin, setApplicationContext, setApplicationEventPublisher, setAutoDeclare, setAutoStartup, setBatchingStrategy, setBeanName, setConsumeDelay, setConsumerArguments, setConsumerTagStrategy, setDeBatchingEnabled, setDefaultRequeueRejected, setErrorHandler, setErrorHandlerLoggerName, setExclusiveConsumerExceptionLogger, setExposeListenerChannel, setFailedDeclarationRetryInterval, setForceCloseChannel, setIdleEventInterval, setjavaLangErrorHandler, setListenerId, setLookupKeyQualifier, setMessageListener, setMessagePropertiesConverter, setMicrometerEnabled, setMicrometerTags, setMismatchedQueuesFatal, setNoLocal, setPhase, setPossibleAuthenticationFailureFatal, setPrefetchCount, setQueues, setRecoveryBackOff, setRecoveryInterval, setShutdownTimeout, setStatefulRetryFatalWithNullMessageId, setTaskExecutor, setTransactionAttribute, setTransactionManager, setupMessageListener, shutdown, start, stop, stop, updateLastReceive, validateConfiguration, wrapToListenerExecutionFailedExceptionIfNeeded
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
isConsumerBatchEnabled
protected final List<org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.SimpleConsumer> consumers
public DirectMessageListenerContainer()
RabbitAccessor.setConnectionFactory(ConnectionFactory)
must
be called before starting.public DirectMessageListenerContainer(ConnectionFactory connectionFactory)
connectionFactory
- the connection factory.public void setConsumersPerQueue(int consumersPerQueue)
consumersPerQueue
- the consumers per queue.public final void setExclusive(boolean exclusive)
consumers per queue
must be 1.setExclusive
in class AbstractMessageListenerContainer
exclusive
- true for an exclusive consumer.public void setTaskScheduler(TaskScheduler taskScheduler)
taskScheduler
- the scheduler.public void setMonitorInterval(long monitorInterval)
monitorInterval
- the interval; default 10000 but it will be adjusted down
to the smallest of this, idleEventInterval
/ 2
(if configured) or
failedDeclarationRetryInterval
.public void setQueueNames(String... queueName)
AbstractMessageListenerContainer
setQueueNames
in class AbstractMessageListenerContainer
queueName
- the desired queueName(s) (can not be null
)public final void setMissingQueuesFatal(boolean missingQueuesFatal)
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
Defaults to false for this container.
setMissingQueuesFatal
in class AbstractMessageListenerContainer
missingQueuesFatal
- the missingQueuesFatal to set.AbstractMessageListenerContainer.setAutoDeclare(boolean)
public void setMessagesPerAck(int messagesPerAck)
messagesPerAck
- the number of messages.setAckTimeout(long)
public void setAckTimeout(long ackTimeout)
messagesPerAck
is
greater than 1, and this time elapses since the last ack, the pending acks will be
sent either when the next message arrives, or a short time later if no additional
messages arrive. In that case, the actual time depends on the
monitorInterval
.ackTimeout
- the timeout in milliseconds (default 20000);setMessagesPerAck(int)
public void addQueueNames(String... queueNames)
AbstractMessageListenerContainer
addQueueNames
in class AbstractMessageListenerContainer
queueNames
- The queue(s) to add.public void addQueues(Queue... queues)
AbstractMessageListenerContainer
addQueues
in class AbstractMessageListenerContainer
queues
- The queue(s) to add.public boolean removeQueueNames(String... queueNames)
AbstractMessageListenerContainer
removeQueueNames
in class AbstractMessageListenerContainer
queueNames
- The queue(s) to remove.queueNames
List.public boolean removeQueues(Queue... queues)
AbstractMessageListenerContainer
removeQueues
in class AbstractMessageListenerContainer
queues
- The queue(s) to remove.queueNames
List.protected int findIdleConsumer()
protected void doInitialize()
AbstractMessageListenerContainer
Subclasses need to implement this method for their specific invoker management process.
doInitialize
in class AbstractMessageListenerContainer
protected void doStart()
AbstractMessageListenerContainer
doStart
in class AbstractMessageListenerContainer
protected void doStop()
AbstractMessageListenerContainer
doStop
in class AbstractMessageListenerContainer
protected void actualStart()
protected void doRedeclareElementsIfNecessary()
protected void processMonitorTask()
protected void doShutdown()
AbstractMessageListenerContainer
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
doShutdown
in class AbstractMessageListenerContainer
AbstractMessageListenerContainer.shutdown()
protected void consumerRemoved(org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.SimpleConsumer consumer)
consumer
- the consumer.