public class ReactiveRedisMessageListenerContainer extends Object implements DisposableBean
ReactiveSubscription.ChannelMessage
for messages received via Redis Pub/Sub listeners. The stream
is infinite and registers Redis subscriptions. Handles the low level details of listening, converting and message
dispatching.
Note the container allocates a single connection when it is created and releases the connection on
destroy()
. Connections are allocated eagerly to not interfere with non-blocking use during application
operations. Using reactive infrastructure allows usage of a single connection due to channel multiplexing.
This class is thread-safe and allows subscription by multiple concurrent threads.ReactiveSubscription
,
ReactivePubSubCommands
Constructor and Description |
---|
ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory connectionFactory)
Create a new
ReactiveRedisMessageListenerContainer given ReactiveRedisConnectionFactory . |
Modifier and Type | Method and Description |
---|---|
void |
destroy() |
reactor.core.publisher.Mono<Void> |
destroyLater() |
Collection<ReactiveSubscription> |
getActiveSubscriptions()
Return the currently active
subscriptions . |
reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> |
receive(ChannelTopic... channelTopics)
Subscribe to one or more
ChannelTopic s and receive a stream of ReactiveSubscription.ChannelMessage . |
<C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> |
receive(Iterable<? extends Topic> topics,
RedisSerializationContext.SerializationPair<C> channelSerializer,
RedisSerializationContext.SerializationPair<B> messageSerializer)
Subscribe to one or more
Topic s and receive a stream of ReactiveSubscription.ChannelMessage The stream may contain
ReactiveSubscription.PatternMessage if subscribed to patterns. |
reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String,String>> |
receive(PatternTopic... patternTopics)
Subscribe to one or more
PatternTopic s and receive a stream of ReactiveSubscription.PatternMessage . |
public ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory connectionFactory)
ReactiveRedisMessageListenerContainer
given ReactiveRedisConnectionFactory
.connectionFactory
- must not be null.public void destroy()
destroy
in interface DisposableBean
public reactor.core.publisher.Mono<Void> destroyLater()
Mono
signalling container termination.public Collection<ReactiveSubscription> getActiveSubscriptions()
subscriptions
.Set
of active ReactiveSubscription
public reactor.core.publisher.Flux<ReactiveSubscription.Message<String,String>> receive(ChannelTopic... channelTopics)
ChannelTopic
s and receive a stream of ReactiveSubscription.ChannelMessage
. Messages and channel
names are treated as String
. The message stream subscribes lazily to the Redis channels and unsubscribes if
the Subscription
is cancelled
.channelTopics
- the channels to subscribe.InvalidDataAccessApiUsageException
- if patternTopics
is empty.#receive(Iterable, SerializationPair, SerializationPair)
public reactor.core.publisher.Flux<ReactiveSubscription.PatternMessage<String,String,String>> receive(PatternTopic... patternTopics)
PatternTopic
s and receive a stream of ReactiveSubscription.PatternMessage
. Messages, pattern,
and channel names are treated as String
. The message stream subscribes lazily to the Redis channels and
unsubscribes if the Subscription
is cancelled
.patternTopics
- the channels to subscribe.InvalidDataAccessApiUsageException
- if patternTopics
is empty.#receive(Iterable, SerializationPair, SerializationPair)
public <C,B> reactor.core.publisher.Flux<ReactiveSubscription.Message<C,B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer)
Topic
s and receive a stream of ReactiveSubscription.ChannelMessage
The stream may contain
ReactiveSubscription.PatternMessage
if subscribed to patterns. Messages, and channel names are serialized/deserialized using the
given channelSerializer
and messageSerializer
. The message stream subscribes lazily to the Redis
channels and unsubscribes if the Subscription
is
cancelled
.topics
- the channels to subscribe.InvalidDataAccessApiUsageException
- if topics
is empty.#receive(Iterable, SerializationPair, SerializationPair)
Copyright © 2011–2020 Pivotal Software, Inc.. All rights reserved.