public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer
javax.jms.MessageConsumer
,
optionally participating in externally managed transactions.
This listener container variant is built for repeated polling attempts,
each invoking the receiveAndExecute(java.lang.Object, Session, MessageConsumer)
method. The MessageConsumer used
may be reobtained fo reach attempt or cached in between attempts; this is up
to the concrete implementation. The receive timeout for each attempt can be
configured through the "receiveTimeout"
property.
The underlying mechanism is based on standard JMS MessageConsumer handling,
which is perfectly compatible with both native JMS and JMS in a Java EE environment.
Neither the JMS MessageConsumer.setMessageListener
facility nor the JMS
ServerSessionPool facility is required. A further advantage of this approach is
full control over the listening process, allowing for custom scaling and throttling
and of concurrent message processing (which is up to concrete subclasses).
Message reception and listener execution can automatically be wrapped
in transactions through passing a Spring
PlatformTransactionManager
into the
"transactionManager"
property. This will usually
be a JtaTransactionManager
in a
Java EE enviroment, in combination with a JTA-aware JMS ConnectionFactory
obtained from JNDI (check your application server's documentation).
This base class does not assume any specific mechanism for asynchronous
execution of polling invokers. Check out DefaultMessageListenerContainer
for a concrete implementation which is based on Spring's
TaskExecutor
abstraction,
including dynamic scaling of concurrent consumers and automatic self recovery.
createListenerConsumer(Session)
,
receiveAndExecute(java.lang.Object, Session, MessageConsumer)
,
setTransactionManager(org.springframework.transaction.PlatformTransactionManager)
Modifier and Type | Class and Description |
---|---|
private class |
AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory
ResourceFactory implementation that delegates to this listener container's protected callback methods.
|
AbstractJmsListeningContainer.SharedConnectionNotInitializedException
Modifier and Type | Field and Description |
---|---|
static long |
DEFAULT_RECEIVE_TIMEOUT
The default receive timeout: 1000 ms = 1 second.
|
private long |
receiveTimeout |
private boolean |
sessionTransactedCalled |
private AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory |
transactionalResourceFactory |
private DefaultTransactionDefinition |
transactionDefinition |
private PlatformTransactionManager |
transactionManager |
lifecycleMonitor, sharedConnectionMonitor
RECEIVE_TIMEOUT_INDEFINITE_WAIT, RECEIVE_TIMEOUT_NO_WAIT
logger
Constructor and Description |
---|
AbstractPollingMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
protected MessageConsumer |
createListenerConsumer(Session session)
Create a MessageConsumer for the given JMS Session,
registering a MessageListener for the specified listener.
|
protected boolean |
doReceiveAndExecute(java.lang.Object invoker,
Session session,
MessageConsumer consumer,
TransactionStatus status)
Actually execute the listener for a message received from the given consumer,
fetching all requires resources and invoking the listener.
|
protected Connection |
getConnection(JmsResourceHolder holder)
Fetch an appropriate Connection from the given JmsResourceHolder.
|
protected long |
getReceiveTimeout()
Return the receive timeout (ms) configured for this listener container.
|
protected Session |
getSession(JmsResourceHolder holder)
Fetch an appropriate Session from the given JmsResourceHolder.
|
protected PlatformTransactionManager |
getTransactionManager()
Return the Spring PlatformTransactionManager to use for transactional
wrapping of message reception plus listener execution.
|
void |
initialize()
Initialize this container.
|
protected boolean |
isSessionLocallyTransacted(Session session)
This implementation checks whether the Session is externally synchronized.
|
protected void |
messageReceived(java.lang.Object invoker,
Session session)
Template method that gets called right when a new message has been received,
before attempting to process it.
|
protected void |
noMessageReceived(java.lang.Object invoker,
Session session)
Template method that gets called when no message has been received,
before returning to the receive loop again.
|
protected boolean |
receiveAndExecute(java.lang.Object invoker,
Session session,
MessageConsumer consumer)
Execute the listener for a message received from the given consumer,
wrapping the entire operation in an external transaction if demanded.
|
protected Message |
receiveMessage(MessageConsumer consumer)
Receive a message from the given consumer.
|
private void |
rollbackOnException(TransactionStatus status,
java.lang.Throwable ex)
Perform a rollback, handling rollback exceptions properly.
|
void |
setReceiveTimeout(long receiveTimeout)
Set the timeout to use for receive calls, in milliseconds.
|
void |
setSessionTransacted(boolean sessionTransacted)
Set the transaction mode that is used when creating a JMS
Session . |
void |
setTransactionManager(PlatformTransactionManager transactionManager)
Specify the Spring
PlatformTransactionManager
to use for transactional wrapping of message reception plus listener execution. |
void |
setTransactionName(java.lang.String transactionName)
Specify the transaction name to use for transactional wrapping.
|
void |
setTransactionTimeout(int transactionTimeout)
Specify the transaction timeout to use for transactional wrapping, in seconds.
|
protected boolean |
shouldCommitAfterNoMessageReceived(Session session)
Determine whether to trigger a commit after no message has been received.
|
checkMessageListener, commitIfNecessary, createConsumer, doExecuteListener, doInvokeListener, doInvokeListener, executeListener, getDefaultSubscriptionName, getDestination, getDestinationDescription, getDestinationName, getDurableSubscriptionName, getErrorHandler, getExceptionListener, getMessageConverter, getMessageListener, getMessageSelector, getSubscriptionName, handleListenerException, invokeErrorHandler, invokeExceptionListener, invokeListener, isAcceptMessagesWhileStopping, isExposeListenerSession, isPubSubNoLocal, isReplyPubSubDomain, isSubscriptionDurable, isSubscriptionShared, rollbackIfNecessary, rollbackOnExceptionIfNecessary, setAcceptMessagesWhileStopping, setConcurrency, setDestination, setDestinationName, setDurableSubscriptionName, setErrorHandler, setExceptionListener, setExposeListenerSession, setMessageConverter, setMessageListener, setMessageSelector, setPubSubNoLocal, setReplyPubSubDomain, setSubscriptionDurable, setSubscriptionName, setSubscriptionShared, setupMessageListener, validateConfiguration
afterPropertiesSet, createSharedConnection, destroy, doInitialize, doRescheduleTask, doShutdown, doStart, doStop, establishSharedConnection, getBeanName, getClientId, getPausedTaskCount, getPhase, getSharedConnection, isActive, isAutoStartup, isRunning, logRejectedTask, prepareSharedConnection, refreshSharedConnection, rescheduleTaskIfNecessary, resumePausedTasks, runningAllowed, setAutoStartup, setBeanName, setClientId, setPhase, sharedConnectionEnabled, shutdown, start, startSharedConnection, stop, stop, stopSharedConnection
getDestinationResolver, isPubSubDomain, receiveFromConsumer, resolveDestinationName, setDestinationResolver, setPubSubDomain
convertJmsAccessException, createConnection, createSession, getConnectionFactory, getSessionAcknowledgeMode, isClientAcknowledge, isSessionTransacted, setConnectionFactory, setSessionAcknowledgeMode, setSessionAcknowledgeModeName
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getDestinationResolver, isPubSubDomain
isAutoStartup, stop
public static final long DEFAULT_RECEIVE_TIMEOUT
private final AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory transactionalResourceFactory
private boolean sessionTransactedCalled
private PlatformTransactionManager transactionManager
private DefaultTransactionDefinition transactionDefinition
private long receiveTimeout
public AbstractPollingMessageListenerContainer()
public void setSessionTransacted(boolean sessionTransacted)
JmsAccessor
Session
.
Default is "false".
Note that within a JTA transaction, the parameters passed to
create(Queue/Topic)Session(boolean transacted, int acknowledgeMode)
method are not taken into account. Depending on the Java EE transaction context,
the container makes its own decisions on these values. Analogously, these
parameters are not taken into account within a locally managed transaction
either, since the accessor operates on an existing JMS Session in this case.
Setting this flag to "true" will use a short local JMS transaction when running outside of a managed transaction, and a synchronized local JMS transaction in case of a managed transaction (other than an XA transaction) being present. This has the effect of a local JMS transaction being managed alongside the main transaction (which might be a native JDBC transaction), with the JMS transaction committing right after the main transaction.
setSessionTransacted
in class JmsAccessor
javax.jms.Connection#createSession(boolean, int)
public void setTransactionManager(PlatformTransactionManager transactionManager)
PlatformTransactionManager
to use for transactional wrapping of message reception plus listener execution.
Default is none, not performing any transactional wrapping.
If specified, this will usually be a Spring
JtaTransactionManager
or one
of its subclasses, in combination with a JTA-aware ConnectionFactory that
this message listener container obtains its Connections from.
Note: Consider the use of local JMS transactions instead.
Simply switch the "sessionTransacted"
flag
to "true" in order to use a locally transacted JMS Session for the entire
receive processing, including any Session operations performed by a
SessionAwareMessageListener
(e.g. sending a response message). This
allows for fully synchronized Spring transactions based on local JMS
transactions, similar to what
JmsTransactionManager
provides. Check
AbstractMessageListenerContainer
's javadoc for
a discussion of transaction choices and message redelivery scenarios.
protected final PlatformTransactionManager getTransactionManager()
public void setTransactionName(java.lang.String transactionName)
TransactionDefinition.getName()
public void setTransactionTimeout(int transactionTimeout)
public void setReceiveTimeout(long receiveTimeout)
NOTE: This value needs to be smaller than the transaction timeout used by the transaction manager (in the appropriate unit, of course). 0 indicates no timeout at all; however, this is only feasible if not running within a transaction manager and generally discouraged since such a listener container cannot cleanly shut down. A negative value such as -1 indicates a no-wait receive operation.
JmsDestinationAccessor.receiveFromConsumer(MessageConsumer, long)
,
javax.jms.MessageConsumer#receive(long)
,
javax.jms.MessageConsumer#receiveNoWait()
,
javax.jms.MessageConsumer#receive()
,
setTransactionTimeout(int)
protected long getReceiveTimeout()
public void initialize()
AbstractJmsListeningContainer
Creates a JMS Connection, starts the javax.jms.Connection
(if "autoStartup"
hasn't been turned off),
and calls AbstractJmsListeningContainer.doInitialize()
.
initialize
in class AbstractJmsListeningContainer
protected MessageConsumer createListenerConsumer(Session session) throws JMSException
session
- the JMS Session to work onjavax.jms.JMSException
- if thrown by JMS methodsJMSException
receiveAndExecute(java.lang.Object, Session, MessageConsumer)
protected boolean receiveAndExecute(java.lang.Object invoker, Session session, MessageConsumer consumer) throws JMSException
session
- the JMS Session to work onconsumer
- the MessageConsumer to work onJMSException
- if thrown by JMS methodsdoReceiveAndExecute(java.lang.Object, Session, MessageConsumer, org.springframework.transaction.TransactionStatus)
protected boolean doReceiveAndExecute(java.lang.Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException
session
- the JMS Session to work onconsumer
- the MessageConsumer to work onstatus
- the TransactionStatus (may be null
)JMSException
- if thrown by JMS methods#doExecuteListener(javax.jms.Session, javax.jms.Message)
protected boolean isSessionLocallyTransacted(Session session)
isSessionLocallyTransacted
in class AbstractMessageListenerContainer
session
- the Session to checkJmsResourceHolder
protected boolean shouldCommitAfterNoMessageReceived(Session session)
session
- the current JMS Session which received no messageAbstractMessageListenerContainer.commitIfNecessary(Session, Message)
on the given Sessionprivate void rollbackOnException(TransactionStatus status, java.lang.Throwable ex)
status
- object representing the transactionex
- the thrown listener exception or errorprotected Message receiveMessage(MessageConsumer consumer) throws JMSException
consumer
- the MessageConsumer to usenull
if noneJMSException
- if thrown by JMS methodsprotected void messageReceived(java.lang.Object invoker, Session session)
invoker
- the invoker object (passed through)session
- the receiving JMS Sessionprotected void noMessageReceived(java.lang.Object invoker, Session session)
invoker
- the invoker object (passed through)session
- the receiving JMS Sessionprotected Connection getConnection(JmsResourceHolder holder)
This implementation accepts any JMS 1.1 Connection.
holder
- the JmsResourceHoldernull
if none foundprotected Session getSession(JmsResourceHolder holder)
This implementation accepts any JMS 1.1 Session.
holder
- the JmsResourceHoldernull
if none found