Type Parameters:
K - Stream key and Stream field type.
V - Stream value type.

public interface StreamReceiver<K,V extends Record<K,?>>
Once created, a StreamReceiver can subscribe to a Redis Stream and consume incoming records.
Treat the Flux of Records as infinite. Cancelling the Subscription eventually terminates background polling. Records are converted using key and value serializers to support various serialization strategies.
StreamReceiver supports three modes of stream consumption:
- Standalone
- Using a Consumer with external acknowledge
- Using a Consumer with auto-acknowledge

Depending on the initial ReadOffset, StreamReceiver applies an individual strategy to obtain the next ReadOffset:
Standalone
- ReadOffset.from(String) - Offset using a particular record Id: Start with the given offset and use the last seen record Id.
- ReadOffset.lastConsumed() - Last consumed: Start with the latest offset ($) and use the last seen record Id.
- ReadOffset.latest() - Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.

Using a Consumer
- ReadOffset.from(String) - Offset using a particular record Id: Start with the given offset and use the last seen record Id.
- ReadOffset.lastConsumed() - Last consumed: Start with the last record consumed by the consumer (>) and use the last record consumed by the consumer (>) for subsequent reads.
- ReadOffset.latest() - Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.

Note: Using ReadOffset.latest() risks dropping records, because records can arrive while polling is suspended. Use a record Id as the offset or ReadOffset.lastConsumed() to minimize the chance of record loss.
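The offset rules above can be summarized in a small plain-Java model. This is only a sketch of the documented behavior, not the library's implementation: the class `NextOffsetSketch`, the enum `Initial`, and the method `nextOffset` are hypothetical names introduced for illustration, and offsets are represented as plain strings.

```java
/** Simplified model of the documented offset rules; not the library's implementation. */
final class NextOffsetSketch {

    /** How the initial ReadOffset was specified (hypothetical enum for this sketch). */
    enum Initial { FROM_ID, LAST_CONSUMED, LATEST }

    /**
     * Returns the offset to use for the next poll.
     *
     * @param initial       the kind of initial ReadOffset
     * @param initialOffset the offset originally requested, e.g. "0-0", "$" or ">"
     * @param usingConsumer true when reading through a Consumer, false for standalone
     * @param lastSeenId    Id of the last record received so far, or null if none yet
     */
    static String nextOffset(Initial initial, String initialOffset,
                             boolean usingConsumer, String lastSeenId) {
        switch (initial) {
            case LATEST:
                // Always read from the latest offset; records arriving between polls are missed.
                return "$";
            case LAST_CONSUMED:
                if (usingConsumer) {
                    // The server tracks the last record consumed by the consumer.
                    return ">";
                }
                // Standalone: start at "$", then continue from the last seen record Id.
                return lastSeenId != null ? lastSeenId : "$";
            case FROM_ID:
            default:
                // Start at the given offset, then continue from the last seen record Id.
                return lastSeenId != null ? lastSeenId : initialOffset;
        }
    }
}
```

In this model only the standalone `LAST_CONSUMED` and the `FROM_ID` cases depend on the last seen record Id, which is why those are the options the note recommends for minimizing record loss.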
StreamReceiver propagates errors during stream reads and deserialization as a terminal error signal by default. Configuring a resume function allows conditional resumption by dropping the record or by propagating the error to terminate the subscription.
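The difference between the default behavior and a resume function can be illustrated with a plain-Java model. This sketch does not use the StreamReceiver API: `ResumeSketch`, `Outcome`, `drain`, and the `shouldResume` predicate are hypothetical stand-ins, and `Integer.parseInt` merely plays the role of a deserializer that can fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of "drop and resume" versus "propagate and terminate". */
final class ResumeSketch {

    /** Records delivered downstream plus the terminal error, if any. */
    static final class Outcome {
        final List<Integer> delivered = new ArrayList<>();
        RuntimeException terminalError; // stays null when no error terminated the run
    }

    /**
     * Deserializes each raw value in order. When deserialization fails,
     * shouldResume decides whether to drop the record and continue (true)
     * or to stop and report the error (false).
     */
    static Outcome drain(List<String> rawValues, Predicate<Throwable> shouldResume) {
        Outcome outcome = new Outcome();
        for (String raw : rawValues) {
            try {
                outcome.delivered.add(Integer.parseInt(raw)); // stand-in for deserialization
            } catch (RuntimeException e) {
                if (shouldResume.test(e)) {
                    continue; // drop the offending record and keep consuming
                }
                outcome.terminalError = e; // propagate: consumption ends here
                break;
            }
        }
        return outcome;
    }
}
```

Passing `e -> false` models the default, where the first failure ends consumption. Passing a predicate such as `e -> e instanceof NumberFormatException` models conditional resumption, where matching failures are dropped and any other error still terminates.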
See the following example of how to use StreamReceiver:

    ReactiveRedisConnectionFactory factory = …;
    StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
    Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream"));
    records.doOnNext(record -> …);
See Also: StreamReceiver.StreamReceiverOptions.builder(), ReactiveStreamOperations, ReactiveRedisConnectionFactory, StreamMessageListenerContainer

Modifier and Type | Interface and Description |
---|---|
static class | StreamReceiver.StreamReceiverOptions<K,V extends Record<K,?>> Options for StreamReceiver. |
static class | StreamReceiver.StreamReceiverOptionsBuilder<K,V extends Record<K,?>> Builder for StreamReceiver.StreamReceiverOptions. |

Modifier and Type | Method and Description |
---|---|
static StreamReceiver<String,MapRecord<String,String,String>> | create(ReactiveRedisConnectionFactory connectionFactory) |
static <K,V extends Record<K,?>> StreamReceiver<K,V> | create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K,V> options) Create a new StreamReceiver given ReactiveRedisConnectionFactory and StreamReceiver.StreamReceiverOptions. |
reactor.core.publisher.Flux<V> | receive(Consumer consumer, StreamOffset<K> streamOffset) |
reactor.core.publisher.Flux<V> | receive(StreamOffset<K> streamOffset) |
reactor.core.publisher.Flux<V> | receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset) |

static StreamReceiver<String,MapRecord<String,String,String>> create(ReactiveRedisConnectionFactory connectionFactory)
Parameters: connectionFactory - must not be null.
Returns: the new StreamReceiver.

static <K,V extends Record<K,?>> StreamReceiver<K,V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K,V> options)
Create a new StreamReceiver given ReactiveRedisConnectionFactory and StreamReceiver.StreamReceiverOptions.
Parameters: connectionFactory - must not be null. options - must not be null.
Returns: the new StreamReceiver.

reactor.core.publisher.Flux<V> receive(StreamOffset<K> streamOffset)
Starts a Redis Stream consumer that consumes records from the stream. Records are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver is closed when the returned Flux terminates.
Every record must be acknowledged using ReactiveStreamCommands.xAck(ByteBuffer, String, String...).
Parameters: streamOffset - the stream along with its offset.
Returns: Flux of Records.
See Also: StreamOffset.create(Object, ReadOffset)

reactor.core.publisher.Flux<V> receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset)
Starts a Redis Stream consumer that consumes records from the stream. Records are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver is closed when the returned Flux terminates.
Every record is acknowledged when received.
Parameters: consumer - consumer group, must not be null. streamOffset - the stream along with its offset.
Returns: Flux of Records.
See Also: StreamOffset.create(Object, ReadOffset), ReadOffset.lastConsumed()

reactor.core.publisher.Flux<V> receive(Consumer consumer, StreamOffset<K> streamOffset)
Starts a Redis Stream consumer that consumes records from the stream. Records are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver is closed when the returned Flux terminates.
Every record must be acknowledged using ReactiveStreamOperations.acknowledge(Object, String, String...) after processing.
Parameters: consumer - consumer group, must not be null. streamOffset - the stream along with its offset.
Returns: Flux of Records.
See Also: StreamOffset.create(Object, ReadOffset), ReadOffset.lastConsumed()
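
The practical difference between external acknowledge and auto-acknowledge is which records remain pending when processing fails. The following plain-Java model sketches that difference under the usual Redis consumer-group semantics, where a delivered record stays pending until it is acknowledged. It does not call Redis or StreamReceiver; `AckSketch` and its methods are hypothetical names, and the `process` predicate returns false to signal a processing failure.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Plain-Java model of pending records under the two acknowledge modes. */
final class AckSketch {

    /** Record Ids that were delivered but not yet acknowledged. */
    private final Set<String> pending = new LinkedHashSet<>();

    /** External acknowledge: a record is acknowledged only after successful processing. */
    void consumeWithExternalAck(List<String> recordIds, Predicate<String> process) {
        for (String id : recordIds) {
            pending.add(id);        // delivery makes the record pending
            if (process.test(id)) {
                pending.remove(id); // explicit acknowledge after processing
            }
        }
    }

    /** Auto-acknowledge: a record is acknowledged as soon as it is received. */
    void consumeWithAutoAck(List<String> recordIds, Predicate<String> process) {
        for (String id : recordIds) {
            pending.add(id);
            pending.remove(id);     // acknowledged on receipt, before processing
            process.test(id);       // a failure here does not leave the record pending
        }
    }

    /** Returns the record Ids that are still pending. */
    Set<String> pending() {
        return pending;
    }
}
```

With external acknowledge, a record whose processing fails stays pending and can be inspected or retried later. With auto-acknowledge, nothing stays pending, so a failed record is not redelivered.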
Copyright © 2011–2023 Pivotal Software, Inc. All rights reserved.