Class RedisMessageListenerContainer
- All Implemented Interfaces:
Aware
,BeanNameAware
,DisposableBean
,InitializingBean
,Lifecycle
,Phased
,SmartLifecycle
As opposed to the low level Redis (one connection per subscription), the container uses only one connection that is
'multiplexed' for all registered listeners, the message dispatch being done through the
task executor
. It is recommended to configure the task executor (and subscription
executor when using a blocking Redis connector) instead of using the default SimpleAsyncTaskExecutor
for
reuse of thread pools.
The container uses a single Redis connection in a lazy fashion (the connection is used only if at least one listener
is configured). Listeners can be registered eagerly before starting
the container to subscribe to
all registered topics upon startup. Listeners are guaranteed to be subscribed after the start()
method
returns.
Subscriptions are retried gracefully using BackOff
that can be configured through
setRecoveryInterval(long)
until reaching the maximum number of attempts. Listener errors are handled through
a ErrorHandler
if configured.
This class can be used concurrently after initializing the container with afterPropertiesSet()
and
start()
allowing concurrent calls to addMessageListener(org.springframework.data.redis.connection.MessageListener, java.util.Collection<? extends org.springframework.data.redis.listener.Topic>)
and removeMessageListener(org.springframework.data.redis.connection.MessageListener, java.util.Collection<? extends org.springframework.data.redis.listener.Topic>)
without
external synchronization.
Listeners
that wish to receive subscription/unsubscription callbacks in response to
subscribe/unsubscribe commands can implement SubscriptionListener
.
- Author:
- Costin Leau, Jennifer Hickey, Way Joke, Thomas Darimont, Mark Paluch, John Blum
- See Also:
-
Field Summary
Modifier and TypeFieldDescriptionstatic final long
The default recovery interval: 5000 ms = 5 seconds.static final long
The default subscription wait time: 2000 ms = 2 seconds.static final String
Default thread name prefix: "RedisListeningContainer-".protected final Log
Logger available to subclassesFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionvoid
addMessageListener
(MessageListener listener, Collection<? extends Topic> topics) Adds a message listener to the (potentially running) container.void
addMessageListener
(MessageListener listener, Topic topic) Adds a message listener to the (potentially running) container.void
protected TaskExecutor
Creates a default TaskExecutor.void
destroy()
Destroy the container and stop it.Returns the connectionFactory.long
protected void
handleListenerException
(Throwable cause) Handle the given exception that arose during listener execution.protected void
handleSubscriptionException
(CompletableFuture<Void> future, BackOffExecution backOffExecution, Throwable cause) Handle subscription task exception.protected void
invokeErrorHandler
(Throwable cause) Invoke the registered ErrorHandler, if any.final boolean
isActive()
Return whether this container is currently active, that is, whether it has been set up but not shut down yet.boolean
boolean
protected void
processMessage
(MessageListener listener, Message message, byte[] source) Process a message received from the provider.void
removeMessageListener
(MessageListener listener) Removes the given message listener completely (from all topics).void
removeMessageListener
(MessageListener listener, Collection<? extends Topic> topics) Removes a message listener from the given topics.void
removeMessageListener
(MessageListener listener, Topic topic) Removes a message listener from the given topic.void
setBeanName
(String name) void
setConnectionFactory
(RedisConnectionFactory connectionFactory) void
setErrorHandler
(ErrorHandler errorHandler) Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.void
setMaxSubscriptionRegistrationWaitingTime
(long maxSubscriptionRegistrationWaitingTime) Specify the max time to wait for subscription registrations, in milliseconds The default is2000ms
, that is, 2 second.void
setMessageListeners
(Map<? extends MessageListener, Collection<? extends Topic>> listeners) Attaches the given listeners (and their topics) to the container.void
setRecoveryBackoff
(BackOff recoveryInterval) Specify the intervalBackOff
recovery attempts.void
setRecoveryInterval
(long recoveryInterval) Specify the interval between recovery attempts, in milliseconds.void
setSubscriptionExecutor
(Executor subscriptionExecutor) Sets the task execution used for subscribing to Redis channels.void
setTaskExecutor
(Executor taskExecutor) Sets the task executor used for running the message listeners when messages are received.void
setTopicSerializer
(RedisSerializer<String> serializer) Sets the serializer for converting theTopic
s into low-level channels and patterns.void
start()
Startup the container and subscribe to topics iflisteners
were registered prior to starting the container.void
stop()
Stop the message listener container and cancel any subscriptions if the container islistening
.void
Stop the message listener container and cancel any subscriptions if the container islistening
.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.SmartLifecycle
getPhase, isAutoStartup
-
Field Details
-
DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_RECOVERY_INTERVALThe default recovery interval: 5000 ms = 5 seconds.- See Also:
-
DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME
public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIMEThe default subscription wait time: 2000 ms = 2 seconds.- See Also:
-
DEFAULT_THREAD_NAME_PREFIX
Default thread name prefix: "RedisListeningContainer-". -
logger
Logger available to subclasses
-
-
Constructor Details
-
RedisMessageListenerContainer
public RedisMessageListenerContainer()
-
-
Method Details
-
setErrorHandler
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, there will be no ErrorHandler so that error-level logging is the only result. -
setSubscriptionExecutor
Sets the task execution used for subscribing to Redis channels. By default, if no executor is set, thesetTaskExecutor(Executor)
will be used. In some cases, this might be undesired as the listening to the connection is a long-running task.Note: This implementation uses at most one long-running thread (depending on whether there are any listeners registered or not) and up to two threads during the initial registration.
- Parameters:
subscriptionExecutor
- the subscriptionExecutor to set.
-
setTaskExecutor
Sets the task executor used for running the message listeners when messages are received. If no task executor is set, an instance ofSimpleAsyncTaskExecutor
will be used by default. The task executor can be adjusted depending on the work done by the listeners and the number of messages coming in.- Parameters:
taskExecutor
- the taskExecutor to set.
-
getConnectionFactory
Returns the connectionFactory.- Returns:
- Returns the connectionFactory
-
setConnectionFactory
- Parameters:
connectionFactory
- The connectionFactory to set.
-
setTopicSerializer
Sets the serializer for converting theTopic
s into low-level channels and patterns. By default,StringRedisSerializer
is used.- Parameters:
serializer
- The serializer to set.
-
getMaxSubscriptionRegistrationWaitingTime
public long getMaxSubscriptionRegistrationWaitingTime() -
setMaxSubscriptionRegistrationWaitingTime
public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) Specify the max time to wait for subscription registrations, in milliseconds The default is2000ms
, that is, 2 second. The timeout applies for awaiting the subscription registration. Note that subscriptions can be created asynchronously and an expired timeout does not cancel the timeout.- Parameters:
maxSubscriptionRegistrationWaitingTime
- the maximum subscription registration wait time- See Also:
-
setRecoveryInterval
public void setRecoveryInterval(long recoveryInterval) Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds. -
setRecoveryBackoff
Specify the intervalBackOff
recovery attempts. -
setMessageListeners
public void setMessageListeners(Map<? extends MessageListener, Collection<? extends Topic>> listeners) Attaches the given listeners (and their topics) to the container.Note: it's possible to call this method while the container is running forcing a reinitialization of the container. Note however that this might cause some messages to be lost (while the container reinitializes) - hence calling this method at runtime is considered advanced usage.
- Parameters:
listeners
- map of message listeners and their associated topics
-
setBeanName
- Specified by:
setBeanName
in interfaceBeanNameAware
-
afterPropertiesSet
public void afterPropertiesSet()- Specified by:
afterPropertiesSet
in interfaceInitializingBean
-
createDefaultTaskExecutor
Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified.The default implementation builds a
SimpleAsyncTaskExecutor
with the specified bean name (or the class name, if no bean name specified) as thread name prefix.- See Also:
-
destroy
Destroy the container and stop it.- Specified by:
destroy
in interfaceDisposableBean
- Throws:
Exception
-
start
public void start()Startup the container and subscribe to topics iflisteners
were registered prior to starting the container.This method is a potentially blocking method that blocks until a previous
stop()
is finished and until all previously registered listeners are successfully subscribed.Multiple calls to this method are ignored if the container is already running. Concurrent calls to this method are synchronized until the container is started up.
-
stop
public void stop()Stop the message listener container and cancel any subscriptions if the container islistening
. Stopping releases any allocated connections.This method is a potentially blocking method that blocks until a previous
start()
is finished and until the connection is closed if the container was listening.Multiple calls to this method are ignored if the container was already stopped. Concurrent calls to this method are synchronized until the container is stopped.
-
stop
Stop the message listener container and cancel any subscriptions if the container islistening
. Stopping releases any allocated connections.This method is a potentially blocking method that blocks until a previous
start()
is finished and until the connection is closed if the container was listening.Multiple calls to this method are ignored if the container was already stopped. Concurrent calls to this method are synchronized until the container is stopped.
- Specified by:
stop
in interfaceSmartLifecycle
- Parameters:
callback
- callback to notify when the container actually stops.
-
isRunning
public boolean isRunning() -
isListening
public boolean isListening() -
isActive
public final boolean isActive()Return whether this container is currently active, that is, whether it has been set up but not shut down yet. -
addMessageListener
Adds a message listener to the (potentially running) container. If the container is running, the listener starts receiving (matching) messages as soon as possible.- Parameters:
listener
- message listenertopics
- message listener topic
-
addMessageListener
Adds a message listener to the (potentially running) container. If the container is running, the listener starts receiving (matching) messages as soon as possible.- Parameters:
listener
- message listenertopic
- message topic
-
removeMessageListener
public void removeMessageListener(@Nullable MessageListener listener, Collection<? extends Topic> topics) Removes a message listener from the given topics. If the container is running, the listener stops receiving (matching) messages as soon as possible.Note that this method obeys the Redis (p)unsubscribe semantics - meaning an empty/null collection will remove listener from all channels.
- Parameters:
listener
- message listenertopics
- message listener topics
-
removeMessageListener
Removes a message listener from the given topic. If the container is running, the listener stops receiving (matching) messages as soon as possible.Note that this method obeys the Redis (p)unsubscribe semantics - meaning an empty/null collection will remove listener from all channels.
- Parameters:
listener
- message listenertopic
- message topic
-
removeMessageListener
Removes the given message listener completely (from all topics). If the container is running, the listener stops receiving (matching) messages as soon as possible.- Parameters:
listener
- message listener
-
processMessage
Process a message received from the provider.- Parameters:
listener
- the message listener to notify.message
- the received message.source
- the source, either the channel or pattern.- See Also:
-
handleListenerException
Handle the given exception that arose during listener execution.The default implementation logs the exception at error level. This can be overridden in subclasses.
- Parameters:
cause
- the exception to handle
-
invokeErrorHandler
Invoke the registered ErrorHandler, if any. Log at error level otherwise.- Parameters:
cause
- the uncaught error that arose during message processing.- See Also:
-
handleSubscriptionException
protected void handleSubscriptionException(CompletableFuture<Void> future, BackOffExecution backOffExecution, Throwable cause) Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection failure (for example, Redis was restarted).- Parameters:
cause
- Throwable exception
-