public class RedisMessageListenerContainer extends Object implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle
As oppose to the low level Redis (one connection per subscription), the container uses only one connection that is 'multiplexed' for all registered listeners, the message dispatch being done through the task executor.
Note the container uses the connection in a lazy fashion (the connection is used only if at least one listener is configured).
Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order these methods accordingly.
Modifier and Type | Field and Description |
---|---|
static long |
DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.
|
static long |
DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
The default subscription wait time: 2000 ms = 2 seconds.
|
static String |
DEFAULT_THREAD_NAME_PREFIX
Default thread name prefix: "RedisListeningContainer-".
|
protected org.apache.commons.logging.Log |
logger
Logger available to subclasses
|
Constructor and Description |
---|
RedisMessageListenerContainer() |
Modifier and Type | Method and Description |
---|---|
void |
addMessageListener(MessageListener listener,
Collection<? extends Topic> topics)
Adds a message listener to the (potentially running) container.
|
void |
addMessageListener(MessageListener listener,
Topic topic)
Adds a message listener to the (potentially running) container.
|
void |
afterPropertiesSet() |
protected TaskExecutor |
createDefaultTaskExecutor()
Creates a default TaskExecutor.
|
void |
destroy() |
protected void |
executeListener(MessageListener listener,
Message message,
byte[] pattern)
Execute the specified listener.
|
RedisConnectionFactory |
getConnectionFactory()
Returns the connectionFactory.
|
long |
getMaxSubscriptionRegistrationWaitingTime() |
int |
getPhase() |
protected void |
handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.
|
protected void |
handleSubscriptionException(Throwable ex)
Handle subscription task exception.
|
protected void |
invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any.
|
boolean |
isActive()
Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
|
boolean |
isAutoStartup() |
boolean |
isRunning() |
protected void |
processMessage(MessageListener listener,
Message message,
byte[] pattern)
Process a message received from the provider.
|
void |
removeMessageListener(MessageListener listener)
Removes the given message listener completely (from all topics).
|
void |
removeMessageListener(MessageListener listener,
Collection<? extends Topic> topics)
Removes a message listener from the given topics.
|
void |
removeMessageListener(MessageListener listener,
Topic topic)
Removes a message listener from the from the given topic.
|
void |
setBeanName(String name) |
void |
setConnectionFactory(RedisConnectionFactory connectionFactory) |
void |
setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
|
void |
setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime)
Specify the max time to wait for subscription registrations, in milliseconds.
|
void |
setMessageListeners(Map<? extends MessageListener,Collection<? extends Topic>> listeners)
Attaches the given listeners (and their topics) to the container.
|
void |
setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds.
|
void |
setSubscriptionExecutor(Executor subscriptionExecutor)
Sets the task execution used for subscribing to Redis channels.
|
void |
setTaskExecutor(Executor taskExecutor)
Sets the task executor used for running the message listeners when messages are received.
|
void |
setTopicSerializer(RedisSerializer<String> serializer)
Sets the serializer for converting the
Topic s into low-level channels and patterns. |
protected void |
sleepBeforeRecoveryAttempt()
Sleep according to the specified recovery interval.
|
void |
start() |
void |
stop() |
void |
stop(Runnable callback) |
protected final org.apache.commons.logging.Log logger
public static final String DEFAULT_THREAD_NAME_PREFIX
public static final long DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
public void afterPropertiesSet()
afterPropertiesSet
in interface InitializingBean
protected TaskExecutor createDefaultTaskExecutor()
The default implementation builds a SimpleAsyncTaskExecutor
with the
specified bean name (or the class name, if no bean name specified) as thread name prefix.
public void destroy() throws Exception
destroy
in interface DisposableBean
Exception
public boolean isAutoStartup()
isAutoStartup
in interface SmartLifecycle
public void stop(Runnable callback)
stop
in interface SmartLifecycle
protected void processMessage(MessageListener listener, Message message, byte[] pattern)
message
- pattern
- protected void executeListener(MessageListener listener, Message message, byte[] pattern)
public final boolean isActive()
protected void handleListenerException(Throwable ex)
The default implementation logs the exception at error level. This can be overridden in subclasses.
ex
- the exception to handleprotected void invokeErrorHandler(Throwable ex)
ex
- the uncaught error that arose during message processing.setErrorHandler(org.springframework.util.ErrorHandler)
public RedisConnectionFactory getConnectionFactory()
public void setConnectionFactory(RedisConnectionFactory connectionFactory)
connectionFactory
- The connectionFactory to set.public void setBeanName(String name)
setBeanName
in interface BeanNameAware
public void setTaskExecutor(Executor taskExecutor)
SimpleAsyncTaskExecutor
will be used by default. The task executor can be adjusted
depending on the work done by the listeners and the number of messages coming in.taskExecutor
- The taskExecutor to set.public void setSubscriptionExecutor(Executor subscriptionExecutor)
setTaskExecutor(Executor)
will be used. In some cases, this might be undersired as the listening to the
connection is a long running task.
Note: This implementation uses at most one long running thread (depending on whether there are any listeners registered or not) and up to two threads during the initial registration.
subscriptionExecutor
- The subscriptionExecutor to set.public void setTopicSerializer(RedisSerializer<String> serializer)
Topic
s into low-level channels and patterns. By default,
StringRedisSerializer
is used.serializer
- The serializer to set.public void setErrorHandler(ErrorHandler errorHandler)
public void setMessageListeners(Map<? extends MessageListener,Collection<? extends Topic>> listeners)
Note: it's possible to call this method while the container is running forcing a reinitialization of the container. Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling this method at runtime is considered advanced usage.
listeners
- map of message listeners and their associated topicspublic void addMessageListener(MessageListener listener, Collection<? extends Topic> topics)
listener
- message listenertopics
- message listener topicpublic void addMessageListener(MessageListener listener, Topic topic)
listener
- message listenertopic
- message topicpublic void removeMessageListener(MessageListener listener, Collection<? extends Topic> topics)
Note that this method obeys the Redis (p)unsubscribe semantics - meaning an empty/null collection will remove listener from all channels. Similarly a null listener will unsubscribe all listeners from the given topic.
listener
- message listenertopics
- message listener topicspublic void removeMessageListener(MessageListener listener, Topic topic)
Note that this method obeys the Redis (p)unsubscribe semantics - meaning an empty/null collection will remove listener from all channels. Similarly a null listener will unsubscribe all listeners from the given topic.
listener
- message listenertopic
- message topicpublic void removeMessageListener(MessageListener listener)
listener
- message listenerprotected void handleSubscriptionException(Throwable ex)
ex
- Throwable exceptionprotected void sleepBeforeRecoveryAttempt()
public void setRecoveryInterval(long recoveryInterval)
public long getMaxSubscriptionRegistrationWaitingTime()
public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime)
maxSubscriptionRegistrationWaitingTime
- DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
Copyright © 2011–2019 Pivotal Software, Inc.. All rights reserved.