Interface StreamReceiver<K,V extends Record<K,?>>

Type Parameters:
K - Stream key and Stream field type.
V - Stream value type.

public interface StreamReceiver<K,V extends Record<K,?>>
A receiver to consume Redis Streams using reactive infrastructure.

Once created, a StreamReceiver can subscribe to a Redis Stream and consume incoming records. Treat a Flux of Record as infinite. Cancelling the Subscription eventually terminates background polling. Records are converted using key and value serializers to support various serialization strategies.
StreamReceiver supports three modes of stream consumption:
  • Standalone
  • Using a Consumer with external acknowledge
  • Using a Consumer with auto-acknowledge

Reading from a stream requires polling and a strategy to advance stream offsets. Depending on the initial ReadOffset, StreamReceiver applies an individual strategy to obtain the next ReadOffset:
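Standalone
  • ReadOffset.from(String) Offset using a particular record Id: Start with the given offset and use the last seen record Id.
  • ReadOffset.lastConsumed() Last consumed: Start with the latest offset ($) and use the last seen record Id.
  • ReadOffset.latest() Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.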
Using Consumer
  • ReadOffset.from(String) Offset using a particular record Id: Start with the given offset and use the last seen record Id.
  • ReadOffset.lastConsumed() Last consumed: Start with the last consumed record by the consumer (>) and use the last consumed record by the consumer (>) for subsequent reads.
  • ReadOffset.latest() Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.
Note: Using ReadOffset.latest() carries a risk of dropped records, because records can arrive while polling is suspended. Use record Ids as the offset or ReadOffset.lastConsumed() to minimize the chance of record loss.
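
The consumer-mode strategy listed above can be sketched in plain Java. This is an illustrative model only, not Spring Data Redis code: the Kind enum, the nextOffset method, and the sample record Ids are hypothetical names and values introduced for this sketch.

```java
public class NextOffsetSketch {

    // Hypothetical stand-ins for ReadOffset.from(String), lastConsumed() and latest().
    enum Kind { FROM_ID, LAST_CONSUMED, LATEST }

    /**
     * Returns the offset to use for the next poll when reading with a consumer.
     *
     * @param kind       how the initial offset was specified
     * @param startId    the record Id given to FROM_ID (ignored otherwise)
     * @param lastSeenId Id of the last record received so far, or null if none yet
     */
    static String nextOffset(Kind kind, String startId, String lastSeenId) {
        switch (kind) {
            case FROM_ID:
                // Start with the given offset, then follow the last seen record Id.
                return lastSeenId != null ? lastSeenId : startId;
            case LAST_CONSUMED:
                // ">" is used for the first read and for every subsequent read.
                return ">";
            case LATEST:
                // "$" is used for every read, so records that arrive
                // between two polls are never requested.
                return "$";
            default:
                throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(nextOffset(Kind.FROM_ID, "0-0", null));              // 0-0
        System.out.println(nextOffset(Kind.FROM_ID, "0-0", "1700000000000-0")); // 1700000000000-0
        System.out.println(nextOffset(Kind.LAST_CONSUMED, null, "1700000000000-0")); // >
        System.out.println(nextOffset(Kind.LATEST, null, "1700000000000-0"));   // $
    }
}
```

The LATEST branch shows why the note above warns about record loss: the last seen record Id is never fed back into the next read.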

StreamReceiver propagates errors during stream reads and deserialization as a terminal error signal by default. Configuring a resume function allows conditional resumption: the function either drops the failed record and continues, or propagates the error to terminate the subscription.
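
The drop-or-propagate choice can be illustrated with a plain-Java analogy. This is a sketch under stated assumptions, not the reactive API: the consume method and its shouldDrop predicate are hypothetical, and Integer.parseInt stands in for record deserialization.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class ResumeSketch {

    /**
     * Deserializes raw values in order. When a value fails to deserialize,
     * shouldDrop decides what happens: true drops the record and continues,
     * false rethrows the error and ends consumption (the default behaviour
     * described above corresponds to always returning false).
     */
    static List<Integer> consume(List<String> raw, Predicate<RuntimeException> shouldDrop) {
        List<Integer> records = new ArrayList<>();
        for (String value : raw) {
            try {
                records.add(Integer.parseInt(value)); // stand-in for deserialization
            } catch (NumberFormatException e) {
                if (!shouldDrop.test(e)) {
                    throw e; // propagate: terminates consumption
                }
                // otherwise: drop the broken record and keep going
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("1", "x", "3");

        // Resume by dropping the broken record.
        System.out.println(consume(raw, e -> true)); // [1, 3]

        // Propagate the error instead.
        try {
            consume(raw, e -> false);
        } catch (NumberFormatException e) {
            System.out.println("terminated");
        }
    }
}
```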

The following example shows how to use StreamReceiver:

 ReactiveRedisConnectionFactory factory = …;

 StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
 Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream"));

 // Nothing is consumed until the Flux is subscribed.
 records.doOnNext(record -> …);
 
Since:
2.2
Author:
Mark Paluch, Eddie McDaniel
See Also: