Interface StreamMessageListenerContainer<K,V extends Record<K,?>>
- Type Parameters:
K
- Stream key and Stream field type.V
- Stream value type.
- All Superinterfaces:
Lifecycle
,Phased
,SmartLifecycle
Once created, a StreamMessageListenerContainer
can subscribe to a Redis Stream and consume incoming
messages
. StreamMessageListenerContainer
allows multiple stream read requests and returns a
Subscription
handle per read request. Cancelling the Subscription
terminates eventually background
polling. Messages are converted using key and value serializers
to support various
serialization strategies.
StreamMessageListenerContainer
supports multiple modes of stream consumption:
- Standalone
- Using a
Consumer
with externalStreamOperations.acknowledge(Object, String, String...)
acknowledge} - Using a
Consumer
with auto-acknowledge
ReadOffset
, StreamMessageListenerContainer
applies an individual strategy to obtain the next
ReadOffset
: Standalone
ReadOffset.from(String)
Offset using a particular message Id: Start with the given offset and use the last seenmessage Id
.ReadOffset.lastConsumed()
Last consumed: Start with the latest offset ($
) and use the last seenmessage Id
.ReadOffset.latest()
Last consumed: Start with the latest offset ($
) and use latest offset ($
) for subsequent reads.
Using
Consumer
ReadOffset.from(String)
Offset using a particular message Id: Start with the given offset and use the last seenmessage Id
.ReadOffset.lastConsumed()
Last consumed: Start with the last consumed message by the consumer (>
) and use the last consumed message by the consumer (>
) for subsequent reads.ReadOffset.latest()
Last consumed: Start with the latest offset ($
) and use latest offset ($
) for subsequent reads.
ReadOffset.latest()
bears the chance of dropped messages as messages can arrive in the
time during polling is suspended. Use messagedId's as offset or ReadOffset.lastConsumed()
to minimize the
chance of message loss.
StreamMessageListenerContainer
requires a Executor
to fork long-running polling tasks on a different
Thread
. This thread is used as event loop to poll for stream messages and invoke the
listener callback
.
StreamMessageListenerContainer
tasks propagate errors during stream reads and
listener notification
to a configurable ErrorHandler
. Errors stop a
Subscription
by default. Configuring a Predicate
for a StreamMessageListenerContainer.StreamReadRequest
allows conditional
subscription cancelling or continuing on all errors.
See the following example code how to use StreamMessageListenerContainer
:
RedisConnectionFactory factory = …; StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(factory); Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), message -> …); container.start(); // later container.stop();
- Since:
- 2.2
- Author:
- Mark Paluch, Christoph Strobl, Christian Rest
- See Also:
-
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
StreamListener
StreamMessageListenerContainer.StreamReadRequest
StreamMessageListenerContainer.ConsumerStreamReadRequest
StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder.executor(Executor)
ErrorHandler
StreamOperations
RedisConnectionFactory
StreamReceiver
-
Nested Class Summary
Modifier and TypeInterfaceDescriptionstatic class
Request to read a Redis Stream with aConsumer
.static class
Builder to build aStreamMessageListenerContainer.ConsumerStreamReadRequest
.static class
Options forStreamMessageListenerContainer
.static class
StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<K,
V extends Record<K, ?>> static class
Request to read a Redis Stream.static class
Builder to build aStreamMessageListenerContainer.StreamReadRequest
. -
Field Summary
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Method Summary
Modifier and TypeMethodDescriptioncreate
(RedisConnectionFactory connectionFactory) static <K,
V extends Record<K, ?>>
StreamMessageListenerContainer<K,V> create
(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) default Subscription
receive
(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Register a new subscription for a Redis Stream.default Subscription
receive
(StreamOffset<K> streamOffset, StreamListener<K, V> listener) Register a new subscription for a Redis Stream.default Subscription
receiveAutoAck
(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Register a new subscription for a Redis Stream.register
(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) Register a new subscription for a Redis Stream.void
remove
(Subscription subscription) Unregister a givenSubscription
from the container.Methods inherited from interface org.springframework.context.SmartLifecycle
getPhase, isAutoStartup, stop
-
Method Details
-
create
static StreamMessageListenerContainer<String,MapRecord<String, createString, String>> (RedisConnectionFactory connectionFactory) - Parameters:
connectionFactory
- must not be null.- Returns:
- the new
StreamMessageListenerContainer
.
-
create
static <K,V extends Record<K, StreamMessageListenerContainer<K,?>> V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) Create a newStreamMessageListenerContainer
givenRedisConnectionFactory
andStreamMessageListenerContainer.StreamMessageListenerContainerOptions
.- Parameters:
connectionFactory
- must not be null.options
- must not be null.- Returns:
- the new
StreamMessageListenerContainer
.
-
receive
Register a new subscription for a Redis Stream. If theis already running
theSubscription
will be added and run immediately, otherwise it'll be scheduled and started once the container is actuallystarted
.Errors during
Record
retrieval lead tocancellation
of the underlying task.On
Lifecycle.stop()
allsubscriptions
are cancelled prior to shutting down the container itself.- Parameters:
streamOffset
- the stream along its offset.listener
- must not be null.- Returns:
- the subscription handle.
- See Also:
-
receive
default Subscription receive(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Register a new subscription for a Redis Stream. If theis already running
theSubscription
will be added and run immediately, otherwise it'll be scheduled and started once the container is actuallystarted
.Every message must be acknowledged using
StreamOperations.acknowledge(Object, String, String...)
after processing.Errors during
Record
retrieval lead tocancellation
of the underlying task.On
Lifecycle.stop()
allsubscriptions
are cancelled prior to shutting down the container itself.- Parameters:
consumer
- consumer group, must not be null.streamOffset
- the stream along its offset.listener
- must not be null.- Returns:
- the subscription handle.
- See Also:
-
receiveAutoAck
default Subscription receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Register a new subscription for a Redis Stream. If theis already running
theSubscription
will be added and run immediately, otherwise it'll be scheduled and started once the container is actuallystarted
.Every message is acknowledged when received.
Errors during
Record
retrieval lead tocancellation
of the underlying task.On
Lifecycle.stop()
allsubscriptions
are cancelled prior to shutting down the container itself.- Parameters:
consumer
- consumer group, must not be null.streamOffset
- the stream along its offset.listener
- must not be null.- Returns:
- the subscription handle.
- See Also:
-
register
Subscription register(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) Register a new subscription for a Redis Stream. If theis already running
theSubscription
will be added and run immediately, otherwise it'll be scheduled and started once the container is actuallystarted
.Errors during
Record
are tested against testcancellation predicate
whether to cancel the underlying task.On
Lifecycle.stop()
allsubscriptions
are cancelled prior to shutting down the container itself.Errors during
Record
retrieval are delegated to the givenStreamMessageListenerContainer.StreamReadRequest.getErrorHandler()
.- Parameters:
streamRequest
- must not be null.listener
- must not be null.- Returns:
- the subscription handle.
- See Also:
-
remove
Unregister a givenSubscription
from the container. This prevents theSubscription
to be restarted in a potentialstop
/start
scenario.
Anactive
subcription
iscancelled
prior to removal.- Parameters:
subscription
- must not be null.
-