org.springframework.data.redis.listener
Class RedisMessageListenerContainer

java.lang.Object
  extended by org.springframework.data.redis.listener.RedisMessageListenerContainer
All Implemented Interfaces:
Aware, BeanNameAware, DisposableBean, InitializingBean, Lifecycle, Phased, SmartLifecycle

public class RedisMessageListenerContainer
extends Object
implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle

Container providing asynchronous behaviour for Redis message listeners. Handles the low-level details of listening, converting and message dispatching.

As opposed to low-level Redis usage (one connection per subscription), the container uses a single connection that is 'multiplexed' across all registered listeners; message dispatch is done through the task executor.

Note the container uses the connection in a lazy fashion (the connection is used only if at least one listener is configured).

Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order these methods accordingly.
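
As a rough illustration of the multiplexing described above, the following sketch models a single reader that hands each incoming message to an executor, once per listener registered on the channel. It is not the container's implementation: the class MultiplexedDispatcher, its methods, and the use of String channels with Consumer<String> listeners are assumptions made for this example, and only JDK types are used.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical model of "one connection, many listeners"; not Spring code. */
final class MultiplexedDispatcher {
    // Listeners grouped by the channel they are interested in.
    private final Map<String, List<Consumer<String>>> listenersByChannel = new ConcurrentHashMap<>();
    // Executor on which listener callbacks run.
    private final Executor taskExecutor;

    MultiplexedDispatcher(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    void addListener(String channel, Consumer<String> listener) {
        listenersByChannel.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /**
     * Called by the single reader of the shared connection for every message.
     * Returns the number of listeners the message was handed to.
     */
    int dispatch(String channel, String payload) {
        List<Consumer<String>> listeners =
                listenersByChannel.getOrDefault(channel, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            taskExecutor.execute(() -> listener.accept(payload));
        }
        return listeners.size();
    }
}
```

Passing Runnable::run as the executor runs the callbacks inline on the dispatching thread; a thread pool runs them asynchronously.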


Field Summary
static long DEFAULT_RECOVERY_INTERVAL
          The default recovery interval: 5000 ms = 5 seconds.
static String DEFAULT_THREAD_NAME_PREFIX
          Default thread name prefix: "RedisListeningContainer-".
protected  org.apache.commons.logging.Log logger
          Logger available to subclasses
 
Constructor Summary
RedisMessageListenerContainer()
           
 
Method Summary
 void addMessageListener(MessageListener listener, Collection<? extends Topic> topics)
          Adds a message listener to the (potentially running) container.
 void addMessageListener(MessageListener listener, Topic topic)
          Adds a message listener to the (potentially running) container.
 void afterPropertiesSet()
           
protected  TaskExecutor createDefaultTaskExecutor()
          Creates a default TaskExecutor.
 void destroy()
           
protected  void executeListener(MessageListener listener, Message message, byte[] pattern)
          Execute the specified listener.
 RedisConnectionFactory getConnectionFactory()
          Returns the connectionFactory.
 int getPhase()
           
protected  void handleListenerException(Throwable ex)
          Handle the given exception that arose during listener execution.
protected  void handleSubscriptionException(Throwable ex)
          Handle subscription task exception.
protected  void invokeErrorHandler(Throwable ex)
          Invoke the registered ErrorHandler, if any.
 boolean isActive()
          Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
 boolean isAutoStartup()
           
 boolean isRunning()
           
protected  void processMessage(MessageListener listener, Message message, byte[] pattern)
          Process a message received from the provider.
 void removeMessageListener(MessageListener listener)
          Removes the given message listener completely (from all topics).
 void removeMessageListener(MessageListener listener, Collection<? extends Topic> topics)
          Removes a message listener from the given topics.
 void removeMessageListener(MessageListener listener, Topic topic)
          Removes a message listener from the given topic.
 void setBeanName(String name)
           
 void setConnectionFactory(RedisConnectionFactory connectionFactory)
           
 void setErrorHandler(ErrorHandler errorHandler)
          Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
 void setMessageListeners(Map<? extends MessageListener,Collection<? extends Topic>> listeners)
          Attaches the given listeners (and their topics) to the container.
 void setRecoveryInterval(long recoveryInterval)
          Specify the interval between recovery attempts, in milliseconds.
 void setSubscriptionExecutor(Executor subscriptionExecutor)
          Sets the task executor used for subscribing to Redis channels.
 void setTaskExecutor(Executor taskExecutor)
          Sets the task executor used for running the message listeners when messages are received.
 void setTopicSerializer(RedisSerializer<String> serializer)
          Sets the serializer for converting the Topics into low-level channels and patterns.
protected  void sleepBeforeRecoveryAttempt()
          Sleep according to the specified recovery interval.
 void start()
           
 void stop()
           
 void stop(Runnable callback)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

logger

protected final org.apache.commons.logging.Log logger
Logger available to subclasses


DEFAULT_THREAD_NAME_PREFIX

public static final String DEFAULT_THREAD_NAME_PREFIX
Default thread name prefix: "RedisListeningContainer-".


DEFAULT_RECOVERY_INTERVAL

public static final long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.

See Also:
Constant Field Values
Constructor Detail

RedisMessageListenerContainer

public RedisMessageListenerContainer()
Method Detail

afterPropertiesSet

public void afterPropertiesSet()
Specified by:
afterPropertiesSet in interface InitializingBean

createDefaultTaskExecutor

protected TaskExecutor createDefaultTaskExecutor()
Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified.

The default implementation builds a SimpleAsyncTaskExecutor with the specified bean name (or the class name, if no bean name is specified) as the thread name prefix.

See Also:
SimpleAsyncTaskExecutor.SimpleAsyncTaskExecutor(String)

destroy

public void destroy()
             throws Exception
Specified by:
destroy in interface DisposableBean
Throws:
Exception

isAutoStartup

public boolean isAutoStartup()
Specified by:
isAutoStartup in interface SmartLifecycle

stop

public void stop(Runnable callback)
Specified by:
stop in interface SmartLifecycle

getPhase

public int getPhase()
Specified by:
getPhase in interface Phased

isRunning

public boolean isRunning()
Specified by:
isRunning in interface Lifecycle

start

public void start()
Specified by:
start in interface Lifecycle

stop

public void stop()
Specified by:
stop in interface Lifecycle

processMessage

protected void processMessage(MessageListener listener,
                              Message message,
                              byte[] pattern)
Process a message received from the provider.

Parameters:
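listener - the message listener to invoke
message - the received message
pattern - the pattern that matched the channel, if any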

executeListener

protected void executeListener(MessageListener listener,
                               Message message,
                               byte[] pattern)
Execute the specified listener.

See Also:
handleListenerException(java.lang.Throwable)

isActive

public final boolean isActive()
Return whether this container is currently active, that is, whether it has been set up but not shut down yet.


handleListenerException

protected void handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.

The default implementation logs the exception at error level. This can be overridden in subclasses.

Parameters:
ex - the exception to handle

invokeErrorHandler

protected void invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any. Log at error level otherwise.

Parameters:
ex - the uncaught error that arose during message processing.
See Also:
setErrorHandler(org.springframework.util.ErrorHandler)

getConnectionFactory

public RedisConnectionFactory getConnectionFactory()
Returns the connectionFactory.

Returns:
the connectionFactory

setConnectionFactory

public void setConnectionFactory(RedisConnectionFactory connectionFactory)
Parameters:
connectionFactory - The connectionFactory to set.

setBeanName

public void setBeanName(String name)
Specified by:
setBeanName in interface BeanNameAware

setTaskExecutor

public void setTaskExecutor(Executor taskExecutor)
Sets the task executor used for running the message listeners when messages are received. If no task executor is set, an instance of SimpleAsyncTaskExecutor will be used by default. The task executor can be adjusted depending on the work done by the listeners and the number of messages coming in.

Parameters:
taskExecutor - The taskExecutor to set.
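
Since the parameter is a plain java.util.concurrent.Executor, one way to adjust it to the listeners' workload is a bounded thread pool. The sketch below is only an example under stated assumptions: the helper class ListenerExecutors and its bounded method are hypothetical, and the pool size, queue capacity and rejection policy are arbitrary choices rather than recommendations from this class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of Spring Data Redis. */
final class ListenerExecutors {
    private ListenerExecutors() {
    }

    /**
     * Builds a fixed-size pool with a bounded work queue. When the queue is
     * full, CallerRunsPolicy runs the task on the submitting thread instead
     * of rejecting it.
     */
    static ThreadPoolExecutor bounded(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

The result could then be passed as container.setTaskExecutor(ListenerExecutors.bounded(4, 100)). With CallerRunsPolicy, overflow work runs on whichever thread submits it, which slows intake rather than dropping messages; whether that trade-off is acceptable depends on the application.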

setSubscriptionExecutor

public void setSubscriptionExecutor(Executor subscriptionExecutor)
Sets the task executor used for subscribing to Redis channels. By default, if no executor is set, the executor set through setTaskExecutor(Executor) will be used. In some cases, this might be undesired, as listening to the connection is a long-running task.

Note: This implementation uses at most one long running thread (depending on whether there are any listeners registered or not) and up to two threads during the initial registration.

Parameters:
subscriptionExecutor - The subscriptionExecutor to set.
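
To keep the long-running subscription task away from the listener threads, a small dedicated pool can be supplied. The following is a minimal sketch, assuming a pool of two threads (matching the "up to two threads during the initial registration" note above) and daemon threads; the class SubscriptionExecutors, the method name and the daemon setting are assumptions of this example, not settings prescribed by the container.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper; not part of Spring Data Redis. */
final class SubscriptionExecutors {
    private SubscriptionExecutors() {
    }

    /** Builds a two-thread pool whose threads are named prefix + sequence number. */
    static ThreadPoolExecutor dedicated(String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = task -> {
            Thread thread = new Thread(task, threadNamePrefix + counter.incrementAndGet());
            // Assumption: daemon threads, so the pool does not keep the JVM alive.
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(
                2, 2,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                factory);
    }
}
```

Such a pool could be handed to setSubscriptionExecutor(Executor) while a separate executor serves the listeners.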

setTopicSerializer

public void setTopicSerializer(RedisSerializer<String> serializer)
Sets the serializer for converting the Topics into low-level channels and patterns. By default, StringRedisSerializer is used.

Parameters:
serializer - The serializer to set.

setErrorHandler

public void setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default there will be no ErrorHandler so that error-level logging is the only result.
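
The "handler if present, error-level logging otherwise" behaviour can be modelled as follows. This is a sketch only: ErrorDispatch is a hypothetical class, Consumer<Throwable> stands in for org.springframework.util.ErrorHandler, java.util.logging stands in for the container's logger, and the log message is invented for the example.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical model of the error-handling fallback; not Spring code. */
final class ErrorDispatch {
    private static final Logger LOG = Logger.getLogger(ErrorDispatch.class.getName());

    // Stand-in for the registered ErrorHandler; null means "none registered".
    private volatile Consumer<Throwable> errorHandler;

    void setErrorHandler(Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    /** Returns true if a handler was invoked, false if the error was only logged. */
    boolean invokeErrorHandler(Throwable ex) {
        Consumer<Throwable> handler = errorHandler;
        if (handler != null) {
            handler.accept(ex);
            return true;
        }
        LOG.log(Level.SEVERE, "Listener failed and no error handler is set", ex);
        return false;
    }
}
```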


setMessageListeners

public void setMessageListeners(Map<? extends MessageListener,Collection<? extends Topic>> listeners)
Attaches the given listeners (and their topics) to the container.

Note: this method can be called while the container is running, which forces the container to reinitialize. However, some messages might be lost while the container reinitializes, so calling this method at runtime is considered advanced usage.

Parameters:
listeners - map of message listeners and their associated topics

addMessageListener

public void addMessageListener(MessageListener listener,
                               Collection<? extends Topic> topics)
Adds a message listener to the (potentially running) container. If the container is running, the listener starts receiving (matching) messages as soon as possible.

Parameters:
listener - message listener
topics - message listener topics

addMessageListener

public void addMessageListener(MessageListener listener,
                               Topic topic)
Adds a message listener to the (potentially running) container. If the container is running, the listener starts receiving (matching) messages as soon as possible.

Parameters:
listener - message listener
topic - message topic

removeMessageListener

public void removeMessageListener(MessageListener listener,
                                  Collection<? extends Topic> topics)
Removes a message listener from the given topics. If the container is running, the listener stops receiving (matching) messages as soon as possible.

Note that this method obeys the Redis (p)unsubscribe semantics: an empty or null collection removes the listener from all channels. Similarly, a null listener unsubscribes all listeners from the given topics.

Parameters:
listener - message listener
topics - message listener topics
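
The removal semantics described above can be modelled with a small in-memory registry. This sketch is illustrative only: ListenerRegistry, its methods, and the use of String topics and Object listeners are assumptions, and the combined case (null listener with null or empty topics) clearing everything is a consequence of this model rather than documented container behaviour.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the listener/topic bookkeeping; not Spring code. */
final class ListenerRegistry {
    private final Map<String, Set<Object>> listenersByTopic = new HashMap<>();

    void add(Object listener, String topic) {
        listenersByTopic.computeIfAbsent(topic, t -> new LinkedHashSet<>()).add(listener);
    }

    /**
     * Null or empty topics: apply to every known topic.
     * Null listener: remove all listeners from the affected topics.
     */
    void remove(Object listener, Collection<String> topics) {
        boolean allTopics = (topics == null || topics.isEmpty());
        // Copy the key set so topics can be dropped from the map while iterating.
        Collection<String> targets = allTopics ? new ArrayList<>(listenersByTopic.keySet()) : topics;
        for (String topic : targets) {
            Set<Object> listeners = listenersByTopic.get(topic);
            if (listeners == null) {
                continue;
            }
            if (listener == null) {
                listeners.clear();
            } else {
                listeners.remove(listener);
            }
            if (listeners.isEmpty()) {
                listenersByTopic.remove(topic);
            }
        }
    }

    int count(String topic) {
        Set<Object> listeners = listenersByTopic.get(topic);
        return listeners == null ? 0 : listeners.size();
    }
}
```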

removeMessageListener

public void removeMessageListener(MessageListener listener,
                                  Topic topic)
Removes a message listener from the given topic. If the container is running, the listener stops receiving (matching) messages as soon as possible.

Note that this method obeys the Redis (p)unsubscribe semantics: an empty or null collection removes the listener from all channels. Similarly, a null listener unsubscribes all listeners from the given topic.

Parameters:
listener - message listener
topic - message topic

removeMessageListener

public void removeMessageListener(MessageListener listener)
Removes the given message listener completely (from all topics). If the container is running, the listener stops receiving (matching) messages as soon as possible. Similarly a null listener will unsubscribe all listeners from the given topic.

Parameters:
listener - message listener

handleSubscriptionException

protected void handleSubscriptionException(Throwable ex)
Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection failure (for example, Redis was restarted).

Parameters:
ex - Throwable exception
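
The restart-with-pause behaviour can be pictured as a retry loop that sleeps for the recovery interval between attempts. The sketch below is a simplified model, not the container's code: RecoveryLoop and resubscribe are hypothetical names, the subscription attempt is represented by a BooleanSupplier, and the maxAttempts bound is an assumption added so the example terminates.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of recovery with a fixed interval; not Spring code. */
final class RecoveryLoop {
    private final long recoveryIntervalMillis;

    RecoveryLoop(long recoveryIntervalMillis) {
        this.recoveryIntervalMillis = recoveryIntervalMillis;
    }

    /**
     * Calls subscribe until it reports success.
     * Returns the number of attempts used, or -1 if every attempt failed
     * or the thread was interrupted while waiting.
     */
    int resubscribe(BooleanSupplier subscribe, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (subscribe.getAsBoolean()) {
                return attempt;
            }
            // Pause between attempts, but not after the last one.
            if (attempt < maxAttempts && !sleepBeforeRecoveryAttempt()) {
                return -1;
            }
        }
        return -1;
    }

    /** Sleeps for the recovery interval; returns false if interrupted. */
    private boolean sleepBeforeRecoveryAttempt() {
        try {
            Thread.sleep(recoveryIntervalMillis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A larger interval reduces load on a Redis server that is still restarting, at the cost of a longer gap before messages are received again.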

sleepBeforeRecoveryAttempt

protected void sleepBeforeRecoveryAttempt()
Sleep according to the specified recovery interval. Called between recovery attempts.


setRecoveryInterval

public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.

See Also:
handleSubscriptionException(java.lang.Throwable)