public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements BeanNameAware, DisposableBean, SmartLifecycle
Modifier and Type | Class and Description |
---|---|
static class |
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Exception that indicates that the initial setup of this container's shared Rabbit Connection failed.
|
logger
Constructor and Description |
---|
AbstractMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet()
Delegates to
validateConfiguration() and initialize() . |
protected void |
checkMessageListener(Object messageListener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
|
void |
destroy()
Calls
shutdown() when the BeanFactory destroys the container instance. |
protected abstract void |
doInitialize()
Register any invokers within this container.
|
protected void |
doInvokeListener(ChannelAwareMessageListener listener,
com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially
with its own transaction) to the listener if demanded.
|
protected void |
doInvokeListener(MessageListener listener,
Message message)
Invoke the specified listener as Spring Rabbit MessageListener.
|
protected abstract void |
doShutdown()
Close the registered invokers.
|
protected void |
doStart()
Start this container, and notify all invoker tasks.
|
protected void |
doStop()
This method is invoked when the container is stopping.
|
protected void |
executeListener(com.rabbitmq.client.Channel channel,
Message message)
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
|
AcknowledgeMode |
getAcknowledgeMode() |
protected String |
getBeanName()
Return the bean name that this listener container has been assigned in its containing bean factory, if any.
|
Object |
getMessageListener()
Return the message listener object to register.
|
int |
getPhase()
Return the phase in which this container will be started and stopped.
|
String[] |
getQueueNames()
Return the name of the queue to receive messages from.
|
protected String[] |
getRequiredQueueNames() |
protected void |
handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.
|
void |
initialize()
Initialize this container.
|
protected void |
invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any.
|
protected void |
invokeListener(com.rabbitmq.client.Channel channel,
Message message)
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
|
boolean |
isActive()
Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
|
boolean |
isAutoStartup() |
protected boolean |
isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
listener container's Channel handling and not by an external transaction coordinator.
|
boolean |
isExposeListenerChannel()
Return whether to expose the listener
Channel to a registered ChannelAwareMessageListener . |
boolean |
isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
|
void |
setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement.
|
void |
setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.
|
void |
setBeanName(String beanName) |
void |
setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
|
void |
setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered
ChannelAwareMessageListener as well as
to RabbitTemplate calls. |
void |
setMessageListener(Object messageListener)
Set the message listener implementation to register.
|
void |
setPhase(int phase)
Specify the phase in which this container should be started and stopped.
|
void |
setQueueNames(String... queueName)
Set the name of the queue to receive messages from.
|
void |
setQueues(Queue... queues) |
void |
shutdown()
Stop the shared Connection, call
doShutdown() , and close this container. |
void |
start()
Start this container.
|
void |
stop()
Stop this container.
|
void |
stop(Runnable callback) |
protected void |
validateConfiguration()
Validate the configuration of this container.
|
protected Exception |
wrapToListenerExecutionFailedExceptionIfNeeded(Exception e) |
convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to AcknowledgeMode.MANUAL
if the listener will send the acknowledgements itself using
Channel.basicAck(long, boolean)
. Manual acks are consistent with either a transactional or
non-transactional channel, but if you are doing no other work on the channel at the same other than receiving a
single message then the transaction is probably unnecessary.
Set to AcknowledgeMode.NONE
to tell the broker not to expect any acknowledgements, and it will assume all
messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If
AcknowledgeMode.NONE
then the channel cannot be transactional (so the container will fail on start up if
that flag is accidentally set).
acknowledgeMode
- the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
AcknowledgeMode
public AcknowledgeMode getAcknowledgeMode()
public void setQueueNames(String... queueName)
queueName
- the desired queue (can not be null
)public void setQueues(Queue... queues)
public String[] getQueueNames()
protected String[] getRequiredQueueNames()
public boolean isExposeListenerChannel()
Channel
to a registered ChannelAwareMessageListener
.public void setExposeListenerChannel(boolean exposeListenerChannel)
ChannelAwareMessageListener
as well as
to RabbitTemplate
calls.
Default is "true", reusing the listener's Channel
. Turn this off to expose a fresh Rabbit Channel fetched
from the same underlying Rabbit Connection
instead.
Note that Channels managed by an external transaction manager will always get exposed to
RabbitTemplate
calls. So in terms of RabbitTemplate exposure, this
setting only affects locally transacted Channels.
ChannelAwareMessageListener
public void setMessageListener(Object messageListener)
MessageListener
object
or a Spring ChannelAwareMessageListener
object.IllegalArgumentException
- if the supplied listener is not a MessageListener
or a
ChannelAwareMessageListener
MessageListener
,
ChannelAwareMessageListener
protected void checkMessageListener(Object messageListener)
By default, only a Spring MessageListener
object or a Spring
ChannelAwareMessageListener
object will be accepted.
messageListener
- the message listener object to checkIllegalArgumentException
- if the supplied listener is not a MessageListener or SessionAwareMessageListenerMessageListener
,
ChannelAwareMessageListener
public Object getMessageListener()
public void setErrorHandler(ErrorHandler errorHandler)
public void setAutoStartup(boolean autoStartup)
Default is "true"; set this to "false" to allow for manual startup through the start()
method.
public boolean isAutoStartup()
isAutoStartup
in interface SmartLifecycle
public void setPhase(int phase)
public int getPhase()
public void setBeanName(String beanName)
setBeanName
in interface BeanNameAware
protected final String getBeanName()
public final void afterPropertiesSet()
validateConfiguration()
and initialize()
.afterPropertiesSet
in interface InitializingBean
afterPropertiesSet
in class RabbitAccessor
protected void validateConfiguration()
The default implementation is empty. To be overridden in subclasses.
public void destroy()
shutdown()
when the BeanFactory destroys the container instance.destroy
in interface DisposableBean
shutdown()
public void initialize()
Creates a Rabbit Connection and calls doInitialize()
.
public void shutdown()
doShutdown()
, and close this container.protected abstract void doInitialize() throws Exception
Subclasses need to implement this method for their specific invoker management process.
Exception
protected abstract void doShutdown()
Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
shutdown()
public final boolean isActive()
public void start()
protected void doStart() throws Exception
Exception
- if thrown by Rabbit API methodspublic void stop()
public void stop(Runnable callback)
stop
in interface SmartLifecycle
protected void doStop()
public final boolean isRunning()
protected void invokeErrorHandler(Throwable ex)
ex
- the uncaught error that arose during Rabbit processing.setErrorHandler(org.springframework.util.ErrorHandler)
protected void executeListener(com.rabbitmq.client.Channel channel, Message message) throws Throwable
channel
- the Rabbit Channel to operate onmessage
- the received Rabbit MessageThrowable
invokeListener(com.rabbitmq.client.Channel, org.springframework.amqp.core.Message)
,
handleListenerException(java.lang.Throwable)
protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws Exception
channel
- the Rabbit Channel to operate onmessage
- the received Rabbit MessageException
Exception
- if thrown by Rabbit API methodssetMessageListener(java.lang.Object)
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Message message) throws Exception
listener
- the Spring ChannelAwareMessageListener to invokechannel
- the Rabbit Channel to operate onmessage
- the received Rabbit MessageException
- if thrown by Rabbit API methods or listener itself.
Exception thrown from listener will be wrapped to ListenerExecutionFailedException
.ChannelAwareMessageListener
,
setExposeListenerChannel(boolean)
protected void doInvokeListener(MessageListener listener, Message message) throws Exception
Default implementation performs a plain invocation of the onMessage
method.
ListenerExecutionFailedException
.listener
- the Rabbit MessageListener to invokemessage
- the received Rabbit MessageException
MessageListener.onMessage(org.springframework.amqp.core.Message)
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
channel
- the Channel to checkRabbitAccessor.isChannelTransacted()
protected void handleListenerException(Throwable ex)
The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
ex
- the exception to handleprotected Exception wrapToListenerExecutionFailedExceptionIfNeeded(Exception e)
e
- ListenerExecutionFailedException
- return 'e' as it is, otherwise wrap it to
ListenerExecutionFailedException
and return.