Class ReactiveRedisMessageListenerContainer
- All Implemented Interfaces:
DisposableBean
ReactiveSubscription.ChannelMessage
for messages received via Redis Pub/Sub listeners. The stream
is infinite and registers Redis subscriptions. Handles the low level details of listening, converting and message
dispatching.
Note the container allocates a single connection when it is created and releases the connection on
destroy()
. Connections are allocated eagerly to not interfere with non-blocking use during application
operations. Using reactive infrastructure allows usage of a single connection due to channel multiplexing.
This class is thread-safe and allows subscription by multiple concurrent threads.
- Since:
- 2.1
- Author:
- Mark Paluch, Christoph Strobl
- See Also:
-
Constructor Summary
ConstructorDescriptionReactiveRedisMessageListenerContainer
(ReactiveRedisConnectionFactory connectionFactory) Create a newReactiveRedisMessageListenerContainer
givenReactiveRedisConnectionFactory
. -
Method Summary
Modifier and TypeMethodDescriptionvoid
destroy()
reactor.core.publisher.Mono<Void>
Return the currently activesubscriptions
.reactor.core.publisher.Flux<ReactiveSubscription.Message<String,
String>> receive
(Iterable<? extends Topic> topics, SubscriptionListener subscriptionListener) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
.<C,
B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C, B>> receive
(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
.<C,
B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C, B>> receive
(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, SubscriptionListener subscriptionListener) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
.reactor.core.publisher.Flux<ReactiveSubscription.Message<String,
String>> receive
(ChannelTopic... channelTopics) Subscribe to one or moreChannelTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
.reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,
String, String>> receive
(PatternTopic... patternTopics) Subscribe to one or morePatternTopic
s and receive a stream ofReactiveSubscription.PatternMessage
.<C,
B> reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<C, B>>> receiveLater
(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
.reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<String,
String>>> receiveLater
(ChannelTopic... channelTopics) Subscribe to one or moreChannelTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
once the returnedMono
completes.reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,
String, String>>> receiveLater
(PatternTopic... patternTopics) Subscribe to one or morePatternTopic
s and receive a stream ofReactiveSubscription.PatternMessage
once the returnedMono
completes.
-
Constructor Details
-
ReactiveRedisMessageListenerContainer
Create a newReactiveRedisMessageListenerContainer
givenReactiveRedisConnectionFactory
.- Parameters:
connectionFactory
- must not be null.
-
-
Method Details
-
destroy
public void destroy()- Specified by:
destroy
in interfaceDisposableBean
-
destroyLater
- Returns:
- the
Mono
signalling container termination.
-
getActiveSubscriptions
Return the currently activesubscriptions
.- Returns:
Set
of activeReactiveSubscription
-
receive
public reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> receive(ChannelTopic... channelTopics) Subscribe to one or moreChannelTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
. Messages and channel names are treated asString
. The message stream subscribes lazily to the Redis channels and unsubscribes if theSubscription
iscancelled
.- Parameters:
channelTopics
- the channels to subscribe.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- ifchannelTopics
is empty.- See Also:
-
receiveLater
public reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>>> receiveLater(ChannelTopic... channelTopics) Subscribe to one or moreChannelTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
once the returnedMono
completes. Messages and channel names are treated asString
. The message stream subscribes lazily to the Redis channels and unsubscribes if the innerSubscription
iscancelled
.The returned
Mono
completes once the connection has been subscribed to the giventopics
. Note that cancelling the returnedMono
can leave the connection in a subscribed state.- Parameters:
channelTopics
- the channels to subscribe.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- ifchannelTopics
is empty.- Since:
- 2.6
-
receive
public reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String, receiveString>> (PatternTopic... patternTopics) Subscribe to one or morePatternTopic
s and receive a stream ofReactiveSubscription.PatternMessage
. Messages, pattern, and channel names are treated asString
. The message stream subscribes lazily to the Redis channels and unsubscribes if theSubscription
iscancelled
.- Parameters:
patternTopics
- the channels to subscribe.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- ifpatternTopics
is empty.- See Also:
-
receiveLater
public reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String, receiveLaterString>>> (PatternTopic... patternTopics) Subscribe to one or morePatternTopic
s and receive a stream ofReactiveSubscription.PatternMessage
once the returnedMono
completes. Messages, pattern, and channel names are treated asString
. The message stream subscribes lazily to the Redis channels and unsubscribes if the innerSubscription
iscancelled
.The returned
Mono
completes once the connection has been subscribed to the giventopics
. Note that cancelling the returnedMono
can leave the connection in a subscribed state.- Parameters:
patternTopics
- the channels to subscribe.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- ifpatternTopics
is empty.- Since:
- 2.6
-
receive
public reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> receive(Iterable<? extends Topic> topics, SubscriptionListener subscriptionListener) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
. The stream may containReactiveSubscription.PatternMessage
if subscribed to patterns. Messages, and channel names are serialized/deserialized using the givenchannelSerializer
andmessageSerializer
. The message stream subscribes lazily to the Redis channels and unsubscribes if theSubscription
iscancelled
.- Parameters:
topics
- the channels/patterns to subscribe.subscriptionListener
- listener to receive subscription/unsubscription notifications.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- ifpatternTopics
is empty.- Since:
- 2.6
- See Also:
-
receive
public <C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
. The stream may containReactiveSubscription.PatternMessage
if subscribed to patterns. Messages, and channel names are serialized/deserialized using the givenchannelSerializer
andmessageSerializer
. The message stream subscribes lazily to the Redis channels and unsubscribes if theSubscription
iscancelled
.- Parameters:
topics
- the channels/patterns to subscribe.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- iftopics
is empty.- See Also:
-
receive
public <C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, SubscriptionListener subscriptionListener) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
. The stream may containReactiveSubscription.PatternMessage
if subscribed to patterns. Messages, and channel names are serialized/deserialized using the givenchannelSerializer
andmessageSerializer
. The message stream subscribes lazily to the Redis channels and unsubscribes if theSubscription
iscancelled
.SubscriptionListener
is notified upon subscription/unsubscription and can be used for synchronization.- Parameters:
topics
- the channels to subscribe.channelSerializer
- serialization pair to decode the channel/pattern name.messageSerializer
- serialization pair to decode the message body.subscriptionListener
- listener to receive subscription/unsubscription notifications.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- iftopics
is empty.- Since:
- 2.6
- See Also:
-
receiveLater
public <C,B> reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>>> receiveLater(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) Subscribe to one or moreTopic
s and receive a stream ofReactiveSubscription.ChannelMessage
. The returnedMono
completes once the connection has been subscribed to the giventopics
. Note that cancelling the returnedMono
can leave the connection in a subscribed state.- Parameters:
topics
- the channels to subscribe.channelSerializer
- serialization pair to decode the channel/pattern name.messageSerializer
- serialization pair to decode the message body.- Returns:
- the message stream.
- Throws:
InvalidDataAccessApiUsageException
- iftopics
is empty.- Since:
- 2.6
-