public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.context.SmartLifecycle
Modifier and Type | Class and Description |
---|---|
static class |
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Exception that indicates that the initial setup of this container's shared Rabbit Connection failed.
|
Modifier and Type | Field and Description |
---|---|
static boolean |
DEFAULT_DEBATCHING_ENABLED |
logger
Constructor and Description |
---|
AbstractMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
void |
addQueueNames(java.lang.String... queueNames)
Add queue(s) to this container's list of queues.
|
void |
addQueues(Queue... queues)
Add queue(s) to this container's list of queues.
|
void |
afterPropertiesSet()
Delegates to
validateConfiguration() and initialize() . |
protected boolean |
causeChainHasImmediateAcknowledgeAmqpException(java.lang.Throwable ex)
Traverse the cause chain and, if an
ImmediateAcknowledgeAmqpException
is found before an AmqpRejectAndDontRequeueException , return true. |
protected void |
checkMessageListener(java.lang.Object messageListener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
|
void |
destroy()
Calls
shutdown() when the BeanFactory destroys the container instance. |
protected abstract void |
doInitialize()
Register any invokers within this container.
|
protected void |
doInvokeListener(ChannelAwareMessageListener listener,
com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially
with its own transaction) to the listener if demanded.
|
protected void |
doInvokeListener(MessageListener listener,
Message message)
Invoke the specified listener as Spring Rabbit MessageListener.
|
protected abstract void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Start this container, and notify all invoker tasks.
|
protected void |
doStop()
This method is invoked when the container is stopping.
|
protected void |
executeListener(com.rabbitmq.client.Channel channel,
Message messageIn)
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
|
AcknowledgeMode |
getAcknowledgeMode() |
protected org.springframework.context.ApplicationContext |
getApplicationContext() |
protected java.lang.String |
getBeanName() |
ConnectionFactory |
getConnectionFactory() |
java.lang.String |
getListenerId()
The 'id' attribute of the listener.
|
MessageConverter |
getMessageConverter() |
java.lang.Object |
getMessageListener() |
int |
getPhase() |
java.lang.String[] |
getQueueNames() |
protected java.util.Set<java.lang.String> |
getQueueNamesAsSet() |
protected java.lang.String[] |
getRequiredQueueNames()
Deprecated.
since 1.7.x; you can now start the container without queues.
|
protected RoutingConnectionFactory |
getRoutingConnectionFactory()
Return the (@link RoutingConnectionFactory} if the connection factory is a
RoutingConnectionFactory ; null otherwise. |
protected java.lang.String |
getRoutingLookupKey()
Return the lookup key if the connection factory is a
RoutingConnectionFactory ; null otherwise. |
protected void |
handleListenerException(java.lang.Throwable ex)
Handle the given exception that arose during listener execution.
|
void |
initialize()
Initialize this container.
|
protected void |
invokeErrorHandler(java.lang.Throwable ex)
Invoke the registered ErrorHandler, if any.
|
protected void |
invokeListener(com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
|
boolean |
isActive() |
boolean |
isAutoStartup() |
protected boolean |
isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
listener container's Channel handling and not by an external transaction coordinator.
|
boolean |
isExposeListenerChannel() |
protected boolean |
isForceCloseChannel()
Force close the channel if the consumer threads don't respond to a shutdown.
|
boolean |
isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
|
boolean |
removeQueueNames(java.lang.String... queueNames)
Remove queue(s) from this container's list of queues.
|
boolean |
removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.
|
void |
setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement.
|
void |
setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set
MessagePostProcessor s that will be applied after message reception, before
invoking the MessageListener . |
void |
setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void |
setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
|
void |
setBeanName(java.lang.String beanName) |
void |
setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether or not the container should de-batch batched
messages (true) or call the listener with the batch (false).
|
void |
setErrorHandler(org.springframework.util.ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
|
void |
setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered
ChannelAwareMessageListener as well as
to RabbitTemplate calls. |
void |
setForceCloseChannel(boolean forceCloseChannel)
Set to true to force close the channel if the consumer threads don't respond to a
shutdown.
|
void |
setListenerId(java.lang.String listenerId) |
void |
setLookupKeyQualifier(java.lang.String lookupKeyQualifier)
Set a qualifier that will prefix the connection factory lookup key; default none.
|
void |
setMessageConverter(MessageConverter messageConverter)
Set the
MessageConverter strategy for converting AMQP Messages. |
void |
setMessageListener(java.lang.Object messageListener)
Set the message listener implementation to register.
|
void |
setPhase(int phase)
Specify the phase in which this container should be started and stopped.
|
void |
setQueueNames(java.lang.String... queueName)
Set the name of the queue(s) to receive messages from.
|
void |
setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
|
void |
setupMessageListener(java.lang.Object messageListener)
Setup the message listener to use.
|
void |
shutdown()
Stop the shared Connection, call
doShutdown() , and close this container. |
void |
start()
Start this container.
|
void |
stop()
Stop this container.
|
void |
stop(java.lang.Runnable callback) |
protected void |
validateConfiguration()
Validate the configuration of this container.
|
protected java.lang.Exception |
wrapToListenerExecutionFailedExceptionIfNeeded(java.lang.Exception e,
Message message) |
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public static final boolean DEFAULT_DEBATCHING_ENABLED
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to AcknowledgeMode.MANUAL
if the listener will send the acknowledgements itself using
Channel.basicAck(long, boolean)
. Manual acks are consistent with either a transactional or
non-transactional channel, but if you are doing no other work on the channel at the same other than receiving a
single message then the transaction is probably unnecessary.
Set to AcknowledgeMode.NONE
to tell the broker not to expect any acknowledgements, and it will assume all
messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If
AcknowledgeMode.NONE
then the channel cannot be transactional (so the container will fail on start up if
that flag is accidentally set).
acknowledgeMode
- the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
AcknowledgeMode
public AcknowledgeMode getAcknowledgeMode()
public void setQueueNames(java.lang.String... queueName)
queueName
- the desired queueName(s) (can not be null
)public void setQueues(Queue... queues)
queues
- the desired queue(s) (can not be null
)public java.lang.String[] getQueueNames()
@Deprecated protected java.lang.String[] getRequiredQueueNames()
java.lang.IllegalStateException
- if no queues are defined.protected java.util.Set<java.lang.String> getQueueNamesAsSet()
public void addQueueNames(java.lang.String... queueNames)
queueNames
- The queue(s) to add.public void addQueues(Queue... queues)
queues
- The queue(s) to add.public boolean removeQueueNames(java.lang.String... queueNames)
queueNames
- The queue(s) to remove.queueNames
List.public boolean removeQueues(Queue... queues)
queues
- The queue(s) to remove.queueNames
List.public boolean isExposeListenerChannel()
Channel
to a registered ChannelAwareMessageListener
.public void setExposeListenerChannel(boolean exposeListenerChannel)
ChannelAwareMessageListener
as well as
to RabbitTemplate
calls.
Default is "true", reusing the listener's Channel
. Turn this off to expose a fresh Rabbit Channel fetched
from the same underlying Rabbit Connection
instead.
Note that Channels managed by an external transaction manager will always get exposed to
RabbitTemplate
calls. So in terms of RabbitTemplate exposure, this
setting only affects locally transacted Channels.
exposeListenerChannel
- true to expose the channel.ChannelAwareMessageListener
public void setMessageListener(java.lang.Object messageListener)
MessageListener
object
or a Spring ChannelAwareMessageListener
object.messageListener
- The listener.java.lang.IllegalArgumentException
- if the supplied listener is not a MessageListener
or a
ChannelAwareMessageListener
MessageListener
,
ChannelAwareMessageListener
protected void checkMessageListener(java.lang.Object messageListener)
By default, only a Spring MessageListener
object or a Spring
ChannelAwareMessageListener
object will be accepted.
messageListener
- the message listener object to checkjava.lang.IllegalArgumentException
- if the supplied listener is not a MessageListener or SessionAwareMessageListenerMessageListener
,
ChannelAwareMessageListener
public java.lang.Object getMessageListener()
public void setErrorHandler(org.springframework.util.ErrorHandler errorHandler)
errorHandler
- The error handler.public void setMessageConverter(MessageConverter messageConverter)
MessageConverter
strategy for converting AMQP Messages.messageConverter
- the message converter to usepublic MessageConverter getMessageConverter()
getMessageConverter
in interface MessageListenerContainer
MessageConverter
that can be used to
convert Message
, if any.public void setDeBatchingEnabled(boolean deBatchingEnabled)
deBatchingEnabled
- the deBatchingEnabled to set.public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
MessagePostProcessor
s that will be applied after message reception, before
invoking the MessageListener
. Often used to decompress data. Processors are invoked in order,
depending on PriorityOrder
, Order
and finally unordered.afterReceivePostProcessors
- the post processor.public void setAutoStartup(boolean autoStartup)
Default is "true"; set this to "false" to allow for manual startup through the start()
method.
autoStartup
- true for auto startup.public boolean isAutoStartup()
isAutoStartup
in interface org.springframework.context.SmartLifecycle
public void setPhase(int phase)
phase
- The phase.public int getPhase()
getPhase
in interface org.springframework.context.Phased
public void setBeanName(java.lang.String beanName)
setBeanName
in interface org.springframework.beans.factory.BeanNameAware
protected final java.lang.String getBeanName()
protected final org.springframework.context.ApplicationContext getApplicationContext()
public final void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
setApplicationContext
in interface org.springframework.context.ApplicationContextAware
public ConnectionFactory getConnectionFactory()
getConnectionFactory
in class RabbitAccessor
Connections
.public void setLookupKeyQualifier(java.lang.String lookupKeyQualifier)
lookupKeyQualifier
- the qualifiergetRoutingLookupKey()
protected boolean isForceCloseChannel()
public void setForceCloseChannel(boolean forceCloseChannel)
forceCloseChannel
- true to force close.protected java.lang.String getRoutingLookupKey()
RoutingConnectionFactory
; null otherwise. The routing key is the
comma-delimited list of queue names with all spaces removed and bracketed by [...],
optionally prefixed by a qualifier, e.g. "foo[...]".setLookupKeyQualifier(String)
protected RoutingConnectionFactory getRoutingConnectionFactory()
RoutingConnectionFactory
; null otherwise.RoutingConnectionFactory
or null.public java.lang.String getListenerId()
public void setListenerId(java.lang.String listenerId)
public final void afterPropertiesSet()
validateConfiguration()
and initialize()
.afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
afterPropertiesSet
in class RabbitAccessor
public void setupMessageListener(java.lang.Object messageListener)
MessageListenerContainer
IllegalArgumentException
if that message listener type is not supported.setupMessageListener
in interface MessageListenerContainer
messageListener
- the object
to wrapped to the MessageListener
.protected void validateConfiguration()
The default implementation is empty. To be overridden in subclasses.
public void destroy()
shutdown()
when the BeanFactory destroys the container instance.destroy
in interface org.springframework.beans.factory.DisposableBean
shutdown()
public void initialize()
Creates a Rabbit Connection and calls doInitialize()
.
public void shutdown()
doShutdown()
, and close this container.protected abstract void doInitialize() throws java.lang.Exception
Subclasses need to implement this method for their specific invoker management process.
java.lang.Exception
- Any Exception.protected abstract void doShutdown()
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
shutdown()
public final boolean isActive()
public void start()
start
in interface org.springframework.context.Lifecycle
doStart()
protected void doStart() throws java.lang.Exception
java.lang.Exception
- if thrown by Rabbit API methodspublic void stop()
public void stop(java.lang.Runnable callback)
stop
in interface org.springframework.context.SmartLifecycle
protected void doStop()
public final boolean isRunning()
protected void invokeErrorHandler(java.lang.Throwable ex)
ConditionalRejectingErrorHandler
with
the default FatalExceptionStrategy
implementation.ex
- the uncaught error that arose during Rabbit processing.setErrorHandler(org.springframework.util.ErrorHandler)
protected void executeListener(com.rabbitmq.client.Channel channel, Message messageIn) throws java.lang.Throwable
channel
- the Rabbit Channel to operate onmessageIn
- the received Rabbit Messagejava.lang.Throwable
- Any Throwable.invokeListener(com.rabbitmq.client.Channel, org.springframework.amqp.core.Message)
,
handleListenerException(java.lang.Throwable)
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws java.lang.Exception
channel
- the Rabbit Channel to operate onmessage
- the received Rabbit Messagejava.lang.Exception
- if thrown by Rabbit API methodssetMessageListener(java.lang.Object)
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Message message) throws java.lang.Exception
listener
- the Spring ChannelAwareMessageListener to invokechannel
- the Rabbit Channel to operate onmessage
- the received Rabbit Messagejava.lang.Exception
- if thrown by Rabbit API methods or listener itself.
Exception thrown from listener will be wrapped to ListenerExecutionFailedException
.
ChannelAwareMessageListener
,
setExposeListenerChannel(boolean)
protected void doInvokeListener(MessageListener listener, Message message) throws java.lang.Exception
Default implementation performs a plain invocation of the onMessage
method.
Exception thrown from listener will be wrapped to ListenerExecutionFailedException
.
listener
- the Rabbit MessageListener to invokemessage
- the received Rabbit Messagejava.lang.Exception
- Any Exception.MessageListener.onMessage(org.springframework.amqp.core.Message)
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
channel
- the Channel to checkRabbitAccessor.isChannelTransacted()
protected void handleListenerException(java.lang.Throwable ex)
The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
ex
- the exception to handleprotected java.lang.Exception wrapToListenerExecutionFailedExceptionIfNeeded(java.lang.Exception e, Message message)
e
- The Exception.message
- The failed message.ListenerExecutionFailedException
- return 'e' as it is, otherwise wrap it to
ListenerExecutionFailedException
and return.protected boolean causeChainHasImmediateAcknowledgeAmqpException(java.lang.Throwable ex)
ImmediateAcknowledgeAmqpException
is found before an AmqpRejectAndDontRequeueException
, return true.
An Error
will take precedence.ex
- the exception