public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, SmartLifecycle
Modifier and Type | Class and Description |
---|---|
static class |
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Exception that indicates that the initial setup of this container's shared Rabbit Connection failed.
|
Modifier and Type | Field and Description |
---|---|
static boolean |
DEFAULT_DEBATCHING_ENABLED |
logger
Constructor and Description |
---|
AbstractMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
void |
addQueueNames(String... queueNames)
Add queue(s) to this container's list of queues.
|
void |
addQueues(Queue... queues)
Add queue(s) to this container's list of queues.
|
void |
afterPropertiesSet()
Delegates to
validateConfiguration() and initialize() . |
protected void |
checkMessageListener(Object messageListener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
|
void |
destroy()
Calls
shutdown() when the BeanFactory destroys the container instance. |
protected abstract void |
doInitialize()
Register any invokers within this container.
|
protected void |
doInvokeListener(ChannelAwareMessageListener listener,
com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially
with its own transaction) to the listener if demanded.
|
protected void |
doInvokeListener(MessageListener listener,
Message message)
Invoke the specified listener as Spring Rabbit MessageListener.
|
protected abstract void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Start this container, and notify all invoker tasks.
|
protected void |
doStop()
This method is invoked when the container is stopping.
|
protected void |
executeListener(com.rabbitmq.client.Channel channel,
Message messageIn)
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
|
AcknowledgeMode |
getAcknowledgeMode() |
protected ApplicationContext |
getApplicationContext() |
protected String |
getBeanName() |
ConnectionFactory |
getConnectionFactory() |
MessageConverter |
getMessageConverter() |
Object |
getMessageListener() |
int |
getPhase() |
String[] |
getQueueNames() |
protected Set<String> |
getQueueNamesAsSet() |
protected String[] |
getRequiredQueueNames() |
protected void |
handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.
|
void |
initialize()
Initialize this container.
|
protected void |
invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any.
|
protected void |
invokeListener(com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
|
boolean |
isActive() |
boolean |
isAutoStartup() |
protected boolean |
isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
listener container's Channel handling and not by an external transaction coordinator.
|
boolean |
isExposeListenerChannel() |
boolean |
isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
|
boolean |
removeQueueNames(String... queueNames)
Remove queue(s) from this container's list of queues.
|
boolean |
removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.
|
void |
setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement.
|
void |
setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
Set
MessagePostProcessor s that will be applied after message reception, before
invoking the MessageListener . |
void |
setApplicationContext(ApplicationContext applicationContext) |
void |
setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
|
void |
setBeanName(String beanName) |
void |
setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether or not the container should de-batch batched
messages (true) or call the listener with the batch (false).
|
void |
setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
|
void |
setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered
ChannelAwareMessageListener as well as
to RabbitTemplate calls. |
void |
setMessageConverter(MessageConverter messageConverter)
Set the
MessageConverter strategy for converting AMQP Messages. |
void |
setMessageListener(Object messageListener)
Set the message listener implementation to register.
|
void |
setPhase(int phase)
Specify the phase in which this container should be started and stopped.
|
void |
setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.
|
void |
setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.
|
void |
setupMessageListener(Object messageListener)
Setup the message listener to use.
|
void |
shutdown()
Stop the shared Connection, call
doShutdown() , and close this container. |
void |
start()
Start this container.
|
void |
stop()
Stop this container.
|
void |
stop(Runnable callback) |
protected void |
validateConfiguration()
Validate the configuration of this container.
|
protected Exception |
wrapToListenerExecutionFailedExceptionIfNeeded(Exception e,
Message message) |
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public static final boolean DEFAULT_DEBATCHING_ENABLED
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to AcknowledgeMode.MANUAL
if the listener will send the acknowledgements itself using
Channel.basicAck(long, boolean)
. Manual acks are consistent with either a transactional or
non-transactional channel, but if you are doing no other work on the channel at the same other than receiving a
single message then the transaction is probably unnecessary.
Set to AcknowledgeMode.NONE
to tell the broker not to expect any acknowledgements, and it will assume all
messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If
AcknowledgeMode.NONE
then the channel cannot be transactional (so the container will fail on start up if
that flag is accidentally set).
acknowledgeMode
- the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
AcknowledgeMode
public AcknowledgeMode getAcknowledgeMode()
public void setQueueNames(String... queueName)
queueName
- the desired queueName(s) (can not be null
)public void setQueues(Queue... queues)
queues
- the desired queue(s) (can not be null
)public String[] getQueueNames()
protected String[] getRequiredQueueNames()
public void addQueueNames(String... queueNames)
queueNames
- The queue(s) to add.public void addQueues(Queue... queues)
queues
- The queue(s) to add.public boolean removeQueueNames(String... queueNames)
queueNames
- The queue(s) to remove.queueNames
List.public boolean removeQueues(Queue... queues)
queues
- The queue(s) to remove.queueNames
List.public boolean isExposeListenerChannel()
Channel
to a registered ChannelAwareMessageListener
.public void setExposeListenerChannel(boolean exposeListenerChannel)
ChannelAwareMessageListener
as well as
to RabbitTemplate
calls.
Default is "true", reusing the listener's Channel
. Turn this off to expose a fresh Rabbit Channel fetched
from the same underlying Rabbit Connection
instead.
Note that Channels managed by an external transaction manager will always get exposed to
RabbitTemplate
calls. So in terms of RabbitTemplate exposure, this
setting only affects locally transacted Channels.
exposeListenerChannel
- true to expose the channel.ChannelAwareMessageListener
public void setMessageListener(Object messageListener)
MessageListener
object
or a Spring ChannelAwareMessageListener
object.messageListener
- The listener.IllegalArgumentException
- if the supplied listener is not a MessageListener
or a
ChannelAwareMessageListener
MessageListener
,
ChannelAwareMessageListener
protected void checkMessageListener(Object messageListener)
By default, only a Spring MessageListener
object or a Spring
ChannelAwareMessageListener
object will be accepted.
messageListener
- the message listener object to checkIllegalArgumentException
- if the supplied listener is not a MessageListener or SessionAwareMessageListenerMessageListener
,
ChannelAwareMessageListener
public Object getMessageListener()
public void setErrorHandler(ErrorHandler errorHandler)
errorHandler
- The error handler.public void setMessageConverter(MessageConverter messageConverter)
MessageConverter
strategy for converting AMQP Messages.messageConverter
- the message converter to usepublic MessageConverter getMessageConverter()
getMessageConverter
in interface MessageListenerContainer
MessageConverter
that can be used to
convert Message
, if any.public void setDeBatchingEnabled(boolean deBatchingEnabled)
deBatchingEnabled
- the deBatchingEnabled to set.public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
MessagePostProcessor
s that will be applied after message reception, before
invoking the MessageListener
. Often used to decompress data. Processors are invoked in order,
depending on PriorityOrder
, Order
and finally unordered.afterReceivePostProcessors
- the post processor.public void setAutoStartup(boolean autoStartup)
Default is "true"; set this to "false" to allow for manual startup through the start()
method.
autoStartup
- true for auto startup.public boolean isAutoStartup()
isAutoStartup
in interface SmartLifecycle
public void setPhase(int phase)
phase
- The phase.public int getPhase()
public void setBeanName(String beanName)
setBeanName
in interface BeanNameAware
protected final String getBeanName()
protected final ApplicationContext getApplicationContext()
public final void setApplicationContext(ApplicationContext applicationContext)
setApplicationContext
in interface ApplicationContextAware
public ConnectionFactory getConnectionFactory()
getConnectionFactory
in class RabbitAccessor
Connections
.public final void afterPropertiesSet()
validateConfiguration()
and initialize()
.afterPropertiesSet
in interface InitializingBean
afterPropertiesSet
in class RabbitAccessor
public void setupMessageListener(Object messageListener)
MessageListenerContainer
IllegalArgumentException
if that message listener type is not supported.setupMessageListener
in interface MessageListenerContainer
messageListener
- the object
to wrapped to the MessageListener
.protected void validateConfiguration()
The default implementation is empty. To be overridden in subclasses.
public void destroy()
shutdown()
when the BeanFactory destroys the container instance.destroy
in interface DisposableBean
shutdown()
public void initialize()
Creates a Rabbit Connection and calls doInitialize()
.
public void shutdown()
doShutdown()
, and close this container.protected abstract void doInitialize() throws Exception
Subclasses need to implement this method for their specific invoker management process.
Exception
- Any Exception.protected abstract void doShutdown()
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
shutdown()
public final boolean isActive()
public void start()
protected void doStart() throws Exception
Exception
- if thrown by Rabbit API methodspublic void stop()
public void stop(Runnable callback)
stop
in interface SmartLifecycle
protected void doStop()
public final boolean isRunning()
protected void invokeErrorHandler(Throwable ex)
ConditionalRejectingErrorHandler
with
the default FatalExceptionStrategy
implementation.ex
- the uncaught error that arose during Rabbit processing.setErrorHandler(org.springframework.util.ErrorHandler)
protected void executeListener(com.rabbitmq.client.Channel channel, Message messageIn) throws Throwable
channel
- the Rabbit Channel to operate onmessageIn
- the received Rabbit MessageThrowable
- Any Throwable.invokeListener(com.rabbitmq.client.Channel, org.springframework.amqp.core.Message)
,
handleListenerException(java.lang.Throwable)
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws Exception
channel
- the Rabbit Channel to operate onmessage
- the received Rabbit MessageException
- if thrown by Rabbit API methodssetMessageListener(java.lang.Object)
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Message message) throws Exception
listener
- the Spring ChannelAwareMessageListener to invokechannel
- the Rabbit Channel to operate onmessage
- the received Rabbit MessageException
- if thrown by Rabbit API methods or listener itself.
Exception thrown from listener will be wrapped to ListenerExecutionFailedException
.
ChannelAwareMessageListener
,
setExposeListenerChannel(boolean)
protected void doInvokeListener(MessageListener listener, Message message) throws Exception
Default implementation performs a plain invocation of the onMessage
method.
Exception thrown from listener will be wrapped to ListenerExecutionFailedException
.
listener
- the Rabbit MessageListener to invokemessage
- the received Rabbit MessageException
- Any Exception.MessageListener.onMessage(org.springframework.amqp.core.Message)
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
channel
- the Channel to checkRabbitAccessor.isChannelTransacted()
protected void handleListenerException(Throwable ex)
The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
ex
- the exception to handleprotected Exception wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Message message)
e
- The Exception.message
- The failed message.ListenerExecutionFailedException
- return 'e' as it is, otherwise wrap it to
ListenerExecutionFailedException
and return.