public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
Message listener container that uses the plain JMS client API's MessageConsumer.setMessageListener() method to create concurrent MessageConsumers for the specified listeners.
This is the simplest form of a message listener container. It creates a fixed number of JMS Sessions to invoke the listener and does not adapt dynamically to runtime demands. Its main advantages are its low complexity and its minimal requirements on the JMS provider: not even the ServerSessionPool facility is required.
See the AbstractMessageListenerContainer javadoc for details on acknowledge modes and transaction options. Note that this container exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode: that is, automatic message acknowledgment after listener execution, with no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution.
For a different style of MessageListener handling, through looped MessageConsumer.receive() calls that also allow for transactional reception of messages (registering them with XA transactions), see DefaultMessageListenerContainer.
See Also:
javax.jms.MessageConsumer#setMessageListener, DefaultMessageListenerContainer, JmsMessageEndpointManager
Nested classes/interfaces inherited from class AbstractJmsListeningContainer:
AbstractJmsListeningContainer.SharedConnectionNotInitializedException
Modifier and Type | Field and Description |
---|---|
private int | concurrentConsumers |
private boolean | connectLazily |
private java.util.Set<MessageConsumer> | consumers |
private java.lang.Object | consumersMonitor |
private java.util.Set<Session> | sessions |
private java.util.concurrent.Executor | taskExecutor |
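Fields inherited from class AbstractJmsListeningContainer: lifecycleMonitor, sharedConnectionMonitor
Fields inherited from class JmsDestinationAccessor: RECEIVE_TIMEOUT_INDEFINITE_WAIT, RECEIVE_TIMEOUT_NO_WAIT
Fields inherited from class JmsAccessor: logger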
Constructor and Description |
---|
SimpleMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
protected MessageConsumer | createListenerConsumer(Session session): Create a MessageConsumer for the given JMS Session, registering a MessageListener for the specified listener. |
protected void | doInitialize(): Creates the specified number of concurrent consumers, in the form of a JMS Session plus associated MessageConsumer. |
protected void | doShutdown(): Destroy the registered JMS Sessions and associated MessageConsumers. |
protected void | doStart(): Re-initializes this container's JMS message consumers, if not initialized already. |
protected void | initializeConsumers(): Initialize the JMS Sessions and MessageConsumers for this container. |
void | onException(JMSException ex): JMS ExceptionListener implementation, invoked by the JMS provider in case of connection failures. |
protected void | prepareSharedConnection(Connection connection): Registers this listener container as JMS ExceptionListener on the shared connection. |
protected void | processMessage(Message message, Session session): Process a message received from the provider. |
void | setConcurrency(java.lang.String concurrency): Specify concurrency limits via a "lower-upper" String. |
void | setConcurrentConsumers(int concurrentConsumers): Specify the number of concurrent consumers to create. |
void | setConnectLazily(boolean connectLazily): Specify whether to connect lazily. |
void | setTaskExecutor(java.util.concurrent.Executor taskExecutor): Set the Spring TaskExecutor to use for executing the listener once a message has been received by the provider. |
protected boolean | sharedConnectionEnabled(): Always use a shared JMS Connection. |
protected void | validateConfiguration(): Validate the configuration of this container. |
Methods inherited from class AbstractMessageListenerContainer: checkMessageListener, commitIfNecessary, createConsumer, doExecuteListener, doInvokeListener, doInvokeListener, executeListener, getDefaultSubscriptionName, getDestination, getDestinationDescription, getDestinationName, getDurableSubscriptionName, getErrorHandler, getExceptionListener, getMessageConverter, getMessageListener, getMessageSelector, getSubscriptionName, handleListenerException, invokeErrorHandler, invokeExceptionListener, invokeListener, isAcceptMessagesWhileStopping, isExposeListenerSession, isPubSubNoLocal, isReplyPubSubDomain, isSessionLocallyTransacted, isSubscriptionDurable, isSubscriptionShared, rollbackIfNecessary, rollbackOnExceptionIfNecessary, setAcceptMessagesWhileStopping, setDestination, setDestinationName, setDurableSubscriptionName, setErrorHandler, setExceptionListener, setExposeListenerSession, setMessageConverter, setMessageListener, setMessageSelector, setPubSubNoLocal, setReplyPubSubDomain, setSubscriptionDurable, setSubscriptionName, setSubscriptionShared, setupMessageListener
Methods inherited from class AbstractJmsListeningContainer: afterPropertiesSet, createSharedConnection, destroy, doRescheduleTask, doStop, establishSharedConnection, getBeanName, getClientId, getPausedTaskCount, getPhase, getSharedConnection, initialize, isActive, isAutoStartup, isRunning, logRejectedTask, refreshSharedConnection, rescheduleTaskIfNecessary, resumePausedTasks, runningAllowed, setAutoStartup, setBeanName, setClientId, setPhase, shutdown, start, startSharedConnection, stop, stop, stopSharedConnection
Methods inherited from class JmsDestinationAccessor: getDestinationResolver, isPubSubDomain, receiveFromConsumer, resolveDestinationName, setDestinationResolver, setPubSubDomain
Methods inherited from class JmsAccessor: convertJmsAccessException, createConnection, createSession, getConnectionFactory, getSessionAcknowledgeMode, isClientAcknowledge, isSessionTransacted, setConnectionFactory, setSessionAcknowledgeMode, setSessionAcknowledgeModeName, setSessionTransacted
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
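Methods inherited from interface MessageListenerContainer: getDestinationResolver, isPubSubDomain
Methods inherited from interface SmartLifecycle: isAutoStartup, stop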
private boolean connectLazily
private int concurrentConsumers
private java.util.concurrent.Executor taskExecutor
private java.util.Set<Session> sessions
private java.util.Set<MessageConsumer> consumers
private final java.lang.Object consumersMonitor
public void setConnectLazily(boolean connectLazily)
Specify whether to connect lazily.
Default is "false": connect early, i.e. during the bean initialization phase. Set this flag to "true" to switch to lazy connecting if your target broker is likely not to have started up yet and you prefer not to even attempt a connection.
public void setConcurrency(java.lang.String concurrency)
Specify concurrency limits via a "lower-upper" String.
This listener container will always hold on to the maximum number of consumers (see setConcurrentConsumers(int)), since it is unable to scale.
This property is primarily supported for configuration compatibility with DefaultMessageListenerContainer. For this local listener container, generally use setConcurrentConsumers(int) instead.
Specified by:
setConcurrency in class AbstractMessageListenerContainer
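The effect described above can be sketched in plain Java: because the container cannot scale, only the upper bound of a "lower-upper" String matters. The class `ConcurrencyLimit` and the method `upperLimit` are hypothetical names introduced for illustration, not part of the container's API, and treating a plain number as the limit itself is an assumption of this sketch.

```java
public class ConcurrencyLimit {

    // Hypothetical helper (not part of the container's API): returns the
    // upper bound of a "lower-upper" String. A String without a '-' separator
    // is treated as the limit itself, which is an assumption of this sketch.
    public static int upperLimit(String concurrency) {
        int separatorIndex = concurrency.indexOf('-');
        String upper = (separatorIndex != -1)
                ? concurrency.substring(separatorIndex + 1)
                : concurrency;
        return Integer.parseInt(upper.trim());
    }

    public static void main(String[] args) {
        // The lower bound is ignored: a fixed-size container keeps the maximum.
        System.out.println(upperLimit("5-10"));
        System.out.println(upperLimit("3"));
    }
}
```

A value obtained this way would then be passed to setConcurrentConsumers(int), which is why calling that setter directly is usually the clearer choice for this container.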
public void setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create.
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.
Do not raise the number of concurrent consumers for a topic. This would lead to concurrent consumption of the same message, which is hardly ever desirable.
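As a configuration sketch, a queue listener with several concurrent consumers might be wired up as follows. It uses only setters listed on this page; the class name `OrderListenerConfig`, the factory method `orderContainer`, and the destination name "orders.queue" are hypothetical, and the fragment assumes spring-jms and the javax.jms API are on the classpath.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.springframework.jms.listener.SimpleMessageListenerContainer;

public class OrderListenerConfig {

    // Hypothetical factory method; "orders.queue" is a made-up queue name.
    public static SimpleMessageListenerContainer orderContainer(
            ConnectionFactory connectionFactory, MessageListener listener) {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("orders.queue");
        container.setMessageListener(listener);

        // Three consumers on a queue: higher throughput, but no ordering guarantee.
        container.setConcurrentConsumers(3);

        // Defer connecting in case the broker is not up during initialization.
        container.setConnectLazily(true);
        return container;
    }
}
```

When the container is not managed as a Spring bean, the caller would presumably also need to invoke afterPropertiesSet() and start(), both of which appear in the inherited method list.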
public void setTaskExecutor(java.util.concurrent.Executor taskExecutor)
Set the Spring TaskExecutor to use for executing the listener once a message has been received by the provider.
Default is none, that is, to run in the JMS provider's own receive thread, blocking the provider's receive endpoint while executing the listener.
Specify a TaskExecutor for executing the listener in a different thread, rather than blocking the JMS provider, usually integrating with an existing thread pool. This makes it possible to keep the number of concurrent consumers low (1) while still processing messages concurrently (decoupled from receiving!).
NOTE: Specifying a TaskExecutor for listener execution affects acknowledgement semantics. Messages will then always get acknowledged before listener execution, with the underlying Session immediately reused for receiving the next message. Using this in combination with a transacted session or with client acknowledgement will lead to unspecified results!
NOTE: Concurrent listener execution via a TaskExecutor will lead to concurrent processing of messages that have been received by the same underlying Session. As a consequence, it is not recommended to use this setting with a SessionAwareMessageListener, at least not if the latter performs actual work on the given Session. A standard javax.jms.MessageListener will work fine, in general.
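The decoupling of receiving from processing can be illustrated without any JMS provider. The following is a simplified simulation, not the container's actual code: the calling thread stands in for the provider's receive thread, and the lambda stands in for listener execution. The class name `ExecutorHandOffSketch` and the method `runsOffReceiveThread` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorHandOffSketch {

    // Hands each simulated message to a thread pool and reports whether every
    // listener invocation ran on a thread other than the "receive" thread.
    public static boolean runsOffReceiveThread(int messageCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Thread receiveThread = Thread.currentThread();
        List<Thread> listenerThreads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messageCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                // The receive thread only submits the work and moves on to the
                // next message; it does not wait for the listener to finish.
                pool.execute(() -> {
                    listenerThreads.add(Thread.currentThread());
                    done.countDown();
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                return false; // listeners did not finish in time
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
        return listenerThreads.size() == messageCount
                && !listenerThreads.contains(receiveThread);
    }

    public static void main(String[] args) {
        System.out.println(runsOffReceiveThread(4));
    }
}
```

Because the hand-off returns before the listener has run, the receiving side cannot know whether processing succeeded, which is consistent with the note above that messages are acknowledged before listener execution in this mode.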
protected void validateConfiguration()
Description copied from class: AbstractJmsListeningContainer
Validate the configuration of this container.
The default implementation is empty. To be overridden in subclasses.
Overrides:
validateConfiguration in class AbstractMessageListenerContainer
protected final boolean sharedConnectionEnabled()
Always use a shared JMS Connection.
Specified by:
sharedConnectionEnabled in class AbstractJmsListeningContainer
See Also:
AbstractJmsListeningContainer.getSharedConnection()
protected void doInitialize() throws JMSException
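Creates the specified number of concurrent consumers, in the form of a JMS Session plus associated MessageConsumer.
Specified by:
doInitialize in class AbstractJmsListeningContainer
Throws:
JMSException - if registration failed
See Also:
createListenerConsumer(Session)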
protected void doStart() throws JMSException
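Re-initializes this container's JMS message consumers, if not initialized already.
Overrides:
doStart in class AbstractJmsListeningContainer
Throws:
JMSException - if thrown by JMS API methods
See Also:
AbstractJmsListeningContainer.startSharedConnection()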
protected void prepareSharedConnection(Connection connection) throws JMSException
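Registers this listener container as JMS ExceptionListener on the shared connection.
Overrides:
prepareSharedConnection in class AbstractJmsListeningContainer
Parameters:
connection - the Connection to prepare
Throws:
JMSException - if the preparation efforts failed
See Also:
AbstractJmsListeningContainer.getClientId()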
public void onException(JMSException ex)
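JMS ExceptionListener implementation, invoked by the JMS provider in case of connection failures.
Parameters:
ex - the reported connection exception
protected void initializeConsumers() throws JMSException
Initialize the JMS Sessions and MessageConsumers for this container.
Throws:
JMSException - in case of setup failure
protected MessageConsumer createListenerConsumer(Session session) throws JMSException
Create a MessageConsumer for the given JMS Session, registering a MessageListener for the specified listener.
Parameters:
session - the JMS Session to work on
Throws:
JMSException - if thrown by JMS methods
See Also:
AbstractMessageListenerContainer.executeListener(Session, Message)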
protected void processMessage(Message message, Session session)
Process a message received from the provider.
Executes the listener, exposing the current JMS Session as thread-bound resource (if "exposeListenerSession" is "true").
Parameters:
message - the received JMS Message
session - the JMS Session to operate on
See Also:
AbstractMessageListenerContainer.executeListener(Session, Message), AbstractMessageListenerContainer.setExposeListenerSession(boolean)
protected void doShutdown() throws JMSException
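Destroy the registered JMS Sessions and associated MessageConsumers.
Specified by:
doShutdown in class AbstractJmsListeningContainer
Throws:
JMSException - if shutdown failed
See Also:
AbstractJmsListeningContainer.shutdown()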