Type Parameters:
K - Stream key and Stream field type.
V - Stream value type.

public interface StreamMessageListenerContainer<K,V extends Record<K,?>> extends SmartLifecycle
StreamMessageListenerContainer can subscribe to a Redis Stream and consume incoming messages. It allows multiple stream read requests and returns a Subscription handle per read request. Cancelling the Subscription eventually terminates background polling. Messages are converted using key and value serializers to support various serialization strategies.

StreamMessageListenerContainer supports multiple modes of stream consumption:
- Standalone
- Using a Consumer with external acknowledgement through StreamOperations.acknowledge(Object, String, String...)
- Using a Consumer with auto-acknowledge

Reading from a stream requires polling and a strategy to advance stream offsets. Depending on the initial ReadOffset, StreamMessageListenerContainer applies an individual strategy to obtain the next ReadOffset:

Standalone
- ReadOffset.from(String): Offset using a particular message Id. Start with the given offset and use the last seen message Id.
- ReadOffset.lastConsumed(): Last consumed. Start with the latest offset ($) and use the last seen message Id.
- ReadOffset.latest(): Latest. Start with the latest offset ($) and use the latest offset ($) for subsequent reads.

Using Consumer
- ReadOffset.from(String): Offset using a particular message Id. Start with the given offset and use the last seen message Id.
- ReadOffset.lastConsumed(): Last consumed. Start with the last consumed message by the consumer (>) and use the last consumed message by the consumer (>) for subsequent reads.
- ReadOffset.latest(): Latest. Start with the latest offset ($) and use the latest offset ($) for subsequent reads.

Using ReadOffset.latest() bears the chance of dropped messages, because messages can arrive while polling is suspended. Use message Ids as offset or ReadOffset.lastConsumed() to minimize the chance of message loss.
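The offset strategies listed above can be sketched as a small plain-Java model. This is only an illustration of the rules as described in this text: the class `OffsetStrategySketch`, its enums, and its methods are hypothetical names and are not part of Spring Data Redis.

```java
import java.util.Objects;

/**
 * Plain-Java model of the offset strategies described above.
 * Hypothetical types for illustration; NOT Spring Data Redis classes.
 */
final class OffsetStrategySketch {

    /** How the initial ReadOffset was created. */
    enum Initial { FROM_ID, LAST_CONSUMED, LATEST }

    /** Standalone reads versus reads on behalf of a Consumer. */
    enum Mode { STANDALONE, CONSUMER }

    /** Offset used for the very first read. */
    static String firstOffset(Mode mode, Initial initial, String givenId) {
        switch (initial) {
            case FROM_ID:
                return Objects.requireNonNull(givenId, "givenId");
            case LAST_CONSUMED:
                // Standalone starts at the latest offset ($); a Consumer uses ">".
                return mode == Mode.CONSUMER ? ">" : "$";
            default: // LATEST
                return "$";
        }
    }

    /** Offset used after a read whose last record had the Id lastSeenId. */
    static String nextOffset(Mode mode, Initial initial, String lastSeenId) {
        switch (initial) {
            case FROM_ID:
                return lastSeenId;
            case LAST_CONSUMED:
                // A Consumer keeps asking for ">"; standalone follows the last seen Id.
                return mode == Mode.CONSUMER ? ">" : lastSeenId;
            default: // LATEST
                return "$";
        }
    }
}
```

In this model, only the LATEST strategy ignores the last seen Id, which is why it is the one that can skip records arriving between two polls.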
StreamMessageListenerContainer requires an Executor to fork long-running polling tasks on a different Thread. This thread is used as an event loop to poll for stream messages and invoke the listener callback.
StreamMessageListenerContainer tasks propagate errors during stream reads and listener notification to a configurable ErrorHandler. Errors stop a Subscription by default. Configuring a Predicate for a StreamMessageListenerContainer.StreamReadRequest allows cancelling the subscription conditionally, or continuing on all errors.
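The interplay between the error handler and the cancellation predicate can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not the container's implementation: `PollLoopSketch` and `poll` are hypothetical names, the reader is a plain `Supplier`, and the loop is bounded by `maxPolls` so that it terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical model of a polling task; NOT a Spring Data Redis class. */
final class PollLoopSketch {

    /**
     * Polls at most maxPolls times. Every failure is passed to errorHandler;
     * the loop ends as soon as cancelOnError accepts the failure.
     *
     * @return the messages delivered before the loop ended
     */
    static List<String> poll(Supplier<String> reader, Consumer<Throwable> errorHandler,
            Predicate<Throwable> cancelOnError, int maxPolls) {
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            try {
                delivered.add(reader.get());
            } catch (RuntimeException e) {
                errorHandler.accept(e);
                if (cancelOnError.test(e)) {
                    break; // models cancelling the subscription
                }
            }
        }
        return delivered;
    }
}
```

Passing `t -> true` models the default of stopping on any error, while `t -> false` models continuing on all errors.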
The following example shows how to use StreamMessageListenerContainer:

    RedisConnectionFactory factory = …;

    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(factory);
    Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), message -> …);

    container.start();

    // later
    container.stop();
See Also:
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder(), StreamListener, StreamMessageListenerContainer.StreamReadRequest, StreamMessageListenerContainer.ConsumerStreamReadRequest, StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder.executor(Executor), ErrorHandler, StreamOperations, RedisConnectionFactory, StreamReceiver
Modifier and Type | Interface and Description |
---|---|
static class | StreamMessageListenerContainer.ConsumerStreamReadRequest<K> - Request to read a Redis Stream with a Consumer. |
static class | StreamMessageListenerContainer.ConsumerStreamReadRequestBuilder<K> - Builder to build a StreamMessageListenerContainer.ConsumerStreamReadRequest. |
static class | StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K,V extends Record<K,?>> - Options for StreamMessageListenerContainer. |
static class | StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<K,V extends Record<K,?>> |
static class | StreamMessageListenerContainer.StreamReadRequest<K> - Request to read a Redis Stream. |
static class | StreamMessageListenerContainer.StreamReadRequestBuilder<K> - Builder to build a StreamMessageListenerContainer.StreamReadRequest. |
Fields inherited from interface SmartLifecycle: DEFAULT_PHASE
Modifier and Type | Method and Description |
---|---|
static StreamMessageListenerContainer<String,MapRecord<String,String,String>> | create(RedisConnectionFactory connectionFactory) |
static <K,V extends Record<K,?>> StreamMessageListenerContainer<K,V> | create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K,V> options) |
default Subscription | receive(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K,V> listener) - Register a new subscription for a Redis Stream. |
default Subscription | receive(StreamOffset<K> streamOffset, StreamListener<K,V> listener) - Register a new subscription for a Redis Stream. |
default Subscription | receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K,V> listener) - Register a new subscription for a Redis Stream. |
Subscription | register(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K,V> listener) - Register a new subscription for a Redis Stream. |
void | remove(Subscription subscription) - Unregister a given Subscription from the container. |
Methods inherited from interface SmartLifecycle: getPhase, isAutoStartup, stop
static StreamMessageListenerContainer<String,MapRecord<String,String,String>> create(RedisConnectionFactory connectionFactory)

Parameters:
connectionFactory - must not be null.
Returns:
the new StreamMessageListenerContainer.

static <K,V extends Record<K,?>> StreamMessageListenerContainer<K,V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K,V> options)

Create a new StreamMessageListenerContainer given RedisConnectionFactory and StreamMessageListenerContainer.StreamMessageListenerContainerOptions.

Parameters:
connectionFactory - must not be null.
options - must not be null.
Returns:
the new StreamMessageListenerContainer.
default Subscription receive(StreamOffset<K> streamOffset, StreamListener<K,V> listener)

Register a new subscription for a Redis Stream. If the container is already running, the Subscription will be added and run immediately; otherwise it will be scheduled and started once the container is actually started.

Errors during org.springframework.data.redis.connection.RedisStreamCommands.StreamMessage retrieval lead to cancellation of the underlying task.

On Lifecycle.stop() all subscriptions are cancelled prior to shutting down the container itself.

Parameters:
streamOffset - the stream along with its offset.
listener - must not be null.
See Also:
StreamOffset.create(Object, ReadOffset)
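The scheduling behaviour described for receive(…) can be sketched with a small plain-Java model: a subscription registered before the container starts stays inactive until start(), one registered afterwards runs immediately, and stop() cancels all of them. `ContainerLifecycleSketch` and `Sub` are hypothetical names used only for this illustration; they are not Spring Data Redis types, and no polling takes place here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the container lifecycle; NOT a Spring Data Redis class. */
final class ContainerLifecycleSketch {

    /** Hypothetical stand-in for a Subscription handle. */
    static final class Sub {
        private boolean active;

        boolean isActive() {
            return active;
        }
    }

    private final List<Sub> subscriptions = new ArrayList<>();
    private boolean running;

    /** Registers a subscription; it runs immediately only if the container is running. */
    Sub register() {
        Sub sub = new Sub();
        subscriptions.add(sub);
        if (running) {
            sub.active = true;
        }
        return sub;
    }

    /** Starts the container and every subscription scheduled so far. */
    void start() {
        running = true;
        for (Sub sub : subscriptions) {
            sub.active = true;
        }
    }

    /** Cancels all subscriptions, then marks the container as stopped. */
    void stop() {
        for (Sub sub : subscriptions) {
            sub.active = false;
        }
        running = false;
    }
}
```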
default Subscription receive(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K,V> listener)

Register a new subscription for a Redis Stream. If the container is already running, the Subscription will be added and run immediately; otherwise it will be scheduled and started once the container is actually started.

Every message must be acknowledged using StreamOperations.acknowledge(Object, String, String...) after processing.

Errors during Record retrieval lead to cancellation of the underlying task.

On Lifecycle.stop() all subscriptions are cancelled prior to shutting down the container itself.

Parameters:
consumer - consumer group, must not be null.
streamOffset - the stream along with its offset.
listener - must not be null.
See Also:
StreamOffset.create(Object, ReadOffset), ReadOffset.lastConsumed()
default Subscription receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K,V> listener)

Register a new subscription for a Redis Stream. If the container is already running, the Subscription will be added and run immediately; otherwise it will be scheduled and started once the container is actually started.

Every message is acknowledged when received.

Errors during Record retrieval lead to cancellation of the underlying task.

On Lifecycle.stop() all subscriptions are cancelled prior to shutting down the container itself.

Parameters:
consumer - consumer group, must not be null.
streamOffset - the stream along with its offset.
listener - must not be null.
See Also:
StreamOffset.create(Object, ReadOffset), ReadOffset.lastConsumed()
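The difference between receive(Consumer, …) and receiveAutoAck(Consumer, …) described above can be sketched as simple bookkeeping of unacknowledged record Ids. This is a conceptual model only: `AckModeSketch` and its methods are hypothetical, nothing here talks to Redis, and the set of unacknowledged Ids is an assumption made for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical model of the two acknowledgement modes; NOT a Spring Data Redis class. */
final class AckModeSketch {

    /** Ids delivered to a listener but not yet acknowledged. */
    private final Set<String> unacknowledged = new LinkedHashSet<>();

    /** Models receive(Consumer, ...): the record stays unacknowledged until acknowledge(id). */
    void deliver(String id, Consumer<String> listener) {
        unacknowledged.add(id);
        listener.accept(id);
    }

    /** Models receiveAutoAck(Consumer, ...): the record counts as acknowledged on receipt. */
    void deliverAutoAck(String id, Consumer<String> listener) {
        listener.accept(id);
    }

    /** Stand-in for an explicit acknowledgement after processing. */
    boolean acknowledge(String id) {
        return unacknowledged.remove(id);
    }

    Set<String> unacknowledgedIds() {
        return unacknowledged;
    }
}
```

In the external-acknowledge mode, a listener that fails before acknowledging leaves the Id in the unacknowledged set, whereas the auto-acknowledge mode never tracks it.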
Subscription register(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K,V> listener)

Register a new subscription for a Redis Stream. If the container is already running, the Subscription will be added and run immediately; otherwise it will be scheduled and started once the container is actually started.

Errors during Record retrieval are tested against the cancellation predicate to decide whether to cancel the underlying task.

On Lifecycle.stop() all subscriptions are cancelled prior to shutting down the container itself.

Errors during Record retrieval are delegated to the given StreamMessageListenerContainer.StreamReadRequest.getErrorHandler().

Parameters:
streamRequest - must not be null.
listener - must not be null.
See Also:
StreamMessageListenerContainer.StreamReadRequest, StreamMessageListenerContainer.ConsumerStreamReadRequest
void remove(Subscription subscription)

Unregister a given Subscription from the container. This prevents the Subscription from being restarted in a potential stop/start scenario. An active subscription is cancelled prior to removal.

Parameters:
subscription - must not be null.

Copyright © 2011–2020 Pivotal Software, Inc. All rights reserved.