public class ReactiveRedisMessageListenerContainer extends Object implements DisposableBean
Container providing a stream of ReactiveSubscription.ChannelMessage for messages received via Redis Pub/Sub
listeners. The stream is infinite and registers Redis subscriptions. The container handles the low-level details
of listening, converting, and dispatching messages.
Note that the container allocates a single connection when it is created and releases that connection on destroy().
Connections are allocated eagerly so that allocation does not interfere with non-blocking use during application
operations. Reactive infrastructure allows the use of a single connection thanks to channel multiplexing.
This class is thread-safe and allows subscription by multiple concurrent threads.
See Also: ReactiveSubscription, ReactivePubSubCommands
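The multiplexing and thread-safety notes above can be pictured with a small stand-alone model. The sketch below does not use Spring Data Redis or Reactor: `SharedConnectionSketch` and its methods are hypothetical names, and the single dispatcher object merely stands in for the one connection the container allocates. It shows several callers registering on one shared object while each incoming message is routed only to the listeners of its channel.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical model: one shared "connection" object serving many channel listeners. */
class SharedConnectionSketch {

    // channel name -> listeners; concurrent collections keep registration thread-safe
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    /** Registers a listener atomically and returns a handle that removes it again. */
    Runnable subscribe(String channel, Consumer<String> listener) {
        listeners.compute(channel, (key, existing) -> {
            List<Consumer<String>> list = existing != null ? existing : new CopyOnWriteArrayList<>();
            list.add(listener);
            return list;
        });
        return () -> listeners.computeIfPresent(channel, (key, list) -> {
            list.remove(listener);
            return list.isEmpty() ? null : list; // drop the channel once nobody listens
        });
    }

    /** Routes one incoming message to the listeners of its channel only. */
    void dispatch(String channel, String message) {
        listeners.getOrDefault(channel, List.of()).forEach(listener -> listener.accept(message));
    }

    /** Channels that currently have at least one listener. */
    Collection<String> activeChannels() {
        return List.copyOf(listeners.keySet());
    }
}
```

In this model the returned `Runnable` plays the role of cancelling a subscription; the real container exposes its active subscriptions through getActiveSubscriptions() instead.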
Constructor and Description |
---|
ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory connectionFactory) Create a new ReactiveRedisMessageListenerContainer given ReactiveRedisConnectionFactory. |
Modifier and Type | Method and Description |
---|---|
void | destroy() |
reactor.core.publisher.Mono<Void> | destroyLater() |
Collection<ReactiveSubscription> | getActiveSubscriptions() Return the currently active subscriptions. |
reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> | receive(ChannelTopic... channelTopics) Subscribe to one or more ChannelTopics and receive a stream of ReactiveSubscription.ChannelMessage. |
<C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> | receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. |
<C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> | receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, SubscriptionListener subscriptionListener) Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. |
reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> | receive(Iterable<? extends Topic> topics, SubscriptionListener subscriptionListener) Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. |
reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String,String>> | receive(PatternTopic... patternTopics) Subscribe to one or more PatternTopics and receive a stream of ReactiveSubscription.PatternMessage. |
reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>>> | receiveLater(ChannelTopic... channelTopics) Subscribe to one or more ChannelTopics and receive a stream of ReactiveSubscription.ChannelMessage once the returned Mono completes. |
<C,B> reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>>> | receiveLater(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. |
reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String,String>>> | receiveLater(PatternTopic... patternTopics) Subscribe to one or more PatternTopics and receive a stream of ReactiveSubscription.PatternMessage once the returned Mono completes. |

public ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory connectionFactory)
Create a new ReactiveRedisMessageListenerContainer given ReactiveRedisConnectionFactory.
Parameters: connectionFactory - must not be null.

public void destroy()
Specified by: destroy in interface DisposableBean

public reactor.core.publisher.Mono<Void> destroyLater()
Returns: the Mono signalling container termination.

public Collection<ReactiveSubscription> getActiveSubscriptions()
Return the currently active subscriptions.
Returns: the Collection of active ReactiveSubscription.

public reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> receive(ChannelTopic... channelTopics)
Subscribe to one or more ChannelTopics and receive a stream of ReactiveSubscription.ChannelMessage. Messages and
channel names are treated as String. The message stream subscribes lazily to the Redis channels and unsubscribes
if the Subscription is cancelled.
Parameters: channelTopics - the channels to subscribe.
Throws: InvalidDataAccessApiUsageException - if channelTopics is empty.
See Also: receive(Iterable, SerializationPair, SerializationPair)
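
The lazy-subscribe and unsubscribe-on-cancel behaviour described for receive(ChannelTopic...) can be illustrated without a Redis server. The following is a minimal sketch built only on `java.util.concurrent.Flow`; `LazyChannelStreamSketch`, `publish`, and `subscribedChannels` are hypothetical names and are not part of Spring Data Redis. It assumes at most one subscriber per channel and ignores back-pressure to stay short.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;

/** Hypothetical model of a lazily subscribing message stream; not Spring Data Redis code. */
class LazyChannelStreamSketch {

    // channel name -> the single subscriber currently attached to it
    private final Map<String, Flow.Subscriber<? super String>> sinks = new ConcurrentHashMap<>();

    /** Channels this fake connection is currently subscribed to. */
    Set<String> subscribedChannels() {
        return Set.copyOf(sinks.keySet());
    }

    /** Returns a cold publisher: no channel is subscribed until subscribe(...) is called. */
    Flow.Publisher<String> receive(String channel) {
        return subscriber -> {
            sinks.put(channel, subscriber); // the "SUBSCRIBE" happens here, not in receive(...)
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // demand is ignored to keep the sketch short
                }

                @Override
                public void cancel() {
                    sinks.remove(channel); // the "UNSUBSCRIBE" happens on cancellation
                }
            });
        };
    }

    /** Simulates a message arriving from the server. */
    void publish(String channel, String message) {
        Flow.Subscriber<? super String> sink = sinks.get(channel);
        if (sink != null) {
            sink.onNext(message);
        }
    }
}
```

Calling `receive("news")` alone leaves `subscribedChannels()` empty; the channel appears only after a subscriber attaches and disappears again once that subscriber cancels.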

public reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>>> receiveLater(ChannelTopic... channelTopics)
Subscribe to one or more ChannelTopics and receive a stream of ReactiveSubscription.ChannelMessage once the
returned Mono completes. Messages and channel names are treated as String. The message stream subscribes lazily
to the Redis channels and unsubscribes if the inner Subscription is cancelled.
The returned Mono completes once the connection has been subscribed to the given topics. Note that cancelling
the returned Mono can leave the connection in a subscribed state.
Parameters: channelTopics - the channels to subscribe.
Throws: InvalidDataAccessApiUsageException - if channelTopics is empty.

public reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String,String>> receive(PatternTopic... patternTopics)
Subscribe to one or more PatternTopics and receive a stream of ReactiveSubscription.PatternMessage. Messages,
patterns, and channel names are treated as String. The message stream subscribes lazily to the Redis channels
and unsubscribes if the Subscription is cancelled.
Parameters: patternTopics - the patterns to subscribe.
Throws: InvalidDataAccessApiUsageException - if patternTopics is empty.
See Also: receive(Iterable, SerializationPair, SerializationPair)
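
A pattern subscription delivers three pieces of information per message: the pattern that matched, the concrete channel, and the message body, which is why PatternMessage carries three type parameters. The sketch below approximates that with plain Java. `PatternTopicSketch` is a hypothetical helper, and its matcher handles only the `*` and `?` wildcards of Redis glob-style patterns; character classes and escapes are deliberately left out.

```java
import java.util.regex.Pattern;

/** Hypothetical helper approximating glob-style pattern matching; not Spring Data Redis code. */
class PatternTopicSketch {

    /** Translates '*' and '?' wildcards into a regular expression; other characters match literally. */
    static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*"); // any run of characters, including none
            } else if (c == '?') {
                regex.append('.');  // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns {pattern, channel, message} if the channel matches the pattern, or null otherwise. */
    static String[] match(String pattern, String channel, String message) {
        return toRegex(pattern).matcher(channel).matches()
                ? new String[] {pattern, channel, message}
                : null;
    }
}
```

With this model, a message published to `news.sports` matches the pattern `news.*` and is reported together with both names, while a message on `weather.today` is not delivered to that pattern.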

public reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String,String>>> receiveLater(PatternTopic... patternTopics)
Subscribe to one or more PatternTopics and receive a stream of ReactiveSubscription.PatternMessage once the
returned Mono completes. Messages, patterns, and channel names are treated as String. The message stream
subscribes lazily to the Redis channels and unsubscribes if the inner Subscription is cancelled.
The returned Mono completes once the connection has been subscribed to the given topics. Note that cancelling
the returned Mono can leave the connection in a subscribed state.
Parameters: patternTopics - the patterns to subscribe.
Throws: InvalidDataAccessApiUsageException - if patternTopics is empty.

public reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> receive(Iterable<? extends Topic> topics, SubscriptionListener subscriptionListener)
Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. The stream may
contain ReactiveSubscription.PatternMessage if subscribed to patterns. Messages and channel names are treated as
String. The message stream subscribes lazily to the Redis channels and unsubscribes if the Subscription is
cancelled.
Parameters: topics - the channels/patterns to subscribe.
subscriptionListener - listener to receive subscription/unsubscription notifications.
Throws: InvalidDataAccessApiUsageException - if topics is empty.
See Also: receive(Iterable, SerializationPair, SerializationPair)

public <C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer)
Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. The stream may
contain ReactiveSubscription.PatternMessage if subscribed to patterns. Messages and channel names are
serialized/deserialized using the given channelSerializer and messageSerializer. The message stream subscribes
lazily to the Redis channels and unsubscribes if the Subscription is cancelled.
Parameters: topics - the channels/patterns to subscribe.
channelSerializer - serialization pair to decode the channel/pattern name.
messageSerializer - serialization pair to decode the message body.
Throws: InvalidDataAccessApiUsageException - if topics is empty.
See Also: receive(Iterable, SerializationPair, SerializationPair)

public <C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, SubscriptionListener subscriptionListener)
Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. The stream may
contain ReactiveSubscription.PatternMessage if subscribed to patterns. Messages and channel names are
serialized/deserialized using the given channelSerializer and messageSerializer. The message stream subscribes
lazily to the Redis channels and unsubscribes if the Subscription is cancelled. The SubscriptionListener is
notified upon subscription/unsubscription and can be used for synchronization.
Parameters: topics - the channels/patterns to subscribe.
channelSerializer - serialization pair to decode the channel/pattern name.
messageSerializer - serialization pair to decode the message body.
subscriptionListener - listener to receive subscription/unsubscription notifications.
Throws: InvalidDataAccessApiUsageException - if topics is empty.
See Also: receive(Iterable, SerializationPair, SerializationPair)
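
Two ideas in this overload lend themselves to a small illustration: decoding the channel name and the message body with separate readers, and using a subscription notification as a synchronization point. The sketch below models both with the JDK only. `TypedReceiveSketch`, `Decoded`, `Confirmation`, and every method on them are hypothetical stand-ins; the real SerializationPair and SubscriptionListener types are not reproduced here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical model of typed decoding plus a subscription callback; not Spring Data Redis code. */
class TypedReceiveSketch {

    /** Stand-in for a subscription notification callback. */
    interface Confirmation {
        void onSubscribed(String channel);
    }

    /** Decoded channel/body pair, playing the role of Message<C,B>. */
    static final class Decoded<C, B> {
        final C channel;
        final B body;

        Decoded(C channel, B body) {
            this.channel = channel;
            this.body = body;
        }
    }

    /** Applies one reader to the raw channel name and another to the raw message body. */
    static <C, B> Decoded<C, B> decode(ByteBuffer rawChannel, ByteBuffer rawBody,
            Function<ByteBuffer, C> channelReader, Function<ByteBuffer, B> bodyReader) {
        return new Decoded<>(channelReader.apply(rawChannel), bodyReader.apply(rawBody));
    }

    /** Reads the remaining bytes of a buffer as UTF-8 text without moving its position. */
    static String utf8(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    /** Pretends to subscribe asynchronously, then notifies the listener. */
    static void subscribe(String channel, Confirmation listener) {
        new Thread(() -> listener.onSubscribed(channel)).start();
    }

    /** Blocks until the subscription is confirmed or the timeout elapses. */
    static boolean subscribeAndAwait(String channel, long timeoutMillis) throws InterruptedException {
        CountDownLatch confirmed = new CountDownLatch(1);
        subscribe(channel, name -> confirmed.countDown());
        return confirmed.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Waiting on the latch before publishing is one way a test could avoid sending messages before the subscription is active, which is the kind of synchronization the listener parameter is described as enabling.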
public <C,B> reactor.core.publisher.Mono<reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>>> receiveLater(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer)
Subscribe to one or more Topics and receive a stream of ReactiveSubscription.ChannelMessage. The returned Mono
completes once the connection has been subscribed to the given topics. Note that cancelling the returned Mono
can leave the connection in a subscribed state.
Parameters: topics - the channels to subscribe.
channelSerializer - serialization pair to decode the channel/pattern name.
messageSerializer - serialization pair to decode the message body.
Throws: InvalidDataAccessApiUsageException - if topics is empty.

Copyright © 2011–2022 Pivotal Software, Inc. All rights reserved.