K - Stream key and Stream field type.V - Stream value type.public interface StreamReceiver<K,V extends Record<K,?>>
StreamReceiver can subscribe to a Redis Stream and consume incoming records.
Consider a Flux of Record infinite. Cancelling the Subscription
terminates eventually background polling. Records are converted using key and value
serializers to support various serialization strategies. StreamReceiver supports three modes of stream consumption:
Consumer with external
acknowledgeConsumer with auto-acknowledgeReadOffset, StreamReceiver applies an individual strategy to obtain the next ReadOffset:
ReadOffset.from(String) Offset using a particular record Id: Start with the given offset and use the last
seen record Id.ReadOffset.lastConsumed() Last consumed: Start with the latest offset ($) and use the last seen
record Id.ReadOffset.latest() Last consumed: Start with the latest offset ($) and use latest offset
($) for subsequent reads.Consumer
ReadOffset.from(String) Offset using a particular record Id: Start with the given offset and use the last
seen record Id.ReadOffset.lastConsumed() Last consumed: Start with the last consumed record by the consumer (>)
and use the last consumed record by the consumer (>) for subsequent reads.ReadOffset.latest() Last consumed: Start with the latest offset ($) and use latest offset
($) for subsequent reads.ReadOffset.latest() bears the chance of dropped records as records can arrive in the time
during polling is suspended. Use recordId's as offset or ReadOffset.lastConsumed() to minimize the chance of
record loss.
StreamReceiver propagates errors during stream reads and deserialization as terminal error signal by default.
Configuring a resume function allows conditional resumption by
dropping the record or by propagating the error to terminate the subscription.
StreamReceiver:
ReactiveRedisConnectionFactory factory = …; StreamReceiverreceiver = StreamReceiver.create(factory); Flux > records = receiver.receive(StreamOffset.fromStart("my-stream")); recordFlux.doOnNext(record -> …);
StreamReceiver.StreamReceiverOptions.builder(),
ReactiveStreamOperations,
ReactiveRedisConnectionFactory,
StreamMessageListenerContainer| Modifier and Type | Interface and Description |
|---|---|
static class |
StreamReceiver.StreamReceiverOptions<K,V extends Record<K,?>>
Options for
StreamReceiver. |
static class |
StreamReceiver.StreamReceiverOptionsBuilder<K,V extends Record<K,?>>
Builder for
StreamReceiver.StreamReceiverOptions. |
| Modifier and Type | Method and Description |
|---|---|
static StreamReceiver<String,MapRecord<String,String,String>> |
create(ReactiveRedisConnectionFactory connectionFactory)
|
static <K,V extends Record<K,?>> |
create(ReactiveRedisConnectionFactory connectionFactory,
StreamReceiver.StreamReceiverOptions<K,V> options)
Create a new
StreamReceiver given ReactiveRedisConnectionFactory and StreamReceiver.StreamReceiverOptions. |
reactor.core.publisher.Flux<V> |
receive(Consumer consumer,
StreamOffset<K> streamOffset)
|
reactor.core.publisher.Flux<V> |
receive(StreamOffset<K> streamOffset)
|
reactor.core.publisher.Flux<V> |
receiveAutoAck(Consumer consumer,
StreamOffset<K> streamOffset)
|
static StreamReceiver<String,MapRecord<String,String,String>> create(ReactiveRedisConnectionFactory connectionFactory)
connectionFactory - must not be null.StreamReceiver.static <K,V extends Record<K,?>> StreamReceiver<K,V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K,V> options)
StreamReceiver given ReactiveRedisConnectionFactory and StreamReceiver.StreamReceiverOptions.connectionFactory - must not be null.options - must not be null.StreamReceiver.reactor.core.publisher.Flux<V> receive(StreamOffset<K> streamOffset)
records from the stream. Records
are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver
is closed when the returned Flux terminates.
Every record must be acknowledged using
ReactiveStreamCommands.xAck(ByteBuffer, String, String...)streamOffset - the stream along its offset.Records.StreamOffset.create(Object, ReadOffset)reactor.core.publisher.Flux<V> receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset)
records from the stream. Records
are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver
is closed when the returned Flux terminates.
Every record is acknowledged when received.consumer - consumer group, must not be null.streamOffset - the stream along its offset.Records.StreamOffset.create(Object, ReadOffset),
ReadOffset.lastConsumed()reactor.core.publisher.Flux<V> receive(Consumer consumer, StreamOffset<K> streamOffset)
records from the stream. Records
are consumed from Redis and delivered on the returned Flux when requests are made on the Flux. The receiver
is closed when the returned Flux terminates.
Every record must be acknowledged using
ReactiveStreamOperations.acknowledge(Object, String, String...) after
processing.consumer - consumer group, must not be null.streamOffset - the stream along its offset.Records.StreamOffset.create(Object, ReadOffset),
ReadOffset.lastConsumed()Copyright © 2011–2021 Pivotal Software, Inc.. All rights reserved.