Interface StreamMessageListenerContainer<K,V extends Record<K,?>>

Type Parameters:
K - Stream key and Stream field type.
V - Stream value type.
All Superinterfaces:
Lifecycle, Phased, SmartLifecycle

public interface StreamMessageListenerContainer<K,V extends Record<K,?>> extends SmartLifecycle
Abstraction used by the framework representing a message listener container. Not meant to be implemented externally.

Once created, a StreamMessageListenerContainer can subscribe to a Redis Stream and consume incoming messages. StreamMessageListenerContainer allows multiple stream read requests and returns a Subscription handle per read request. Cancelling the Subscription eventually terminates background polling. Messages are converted using key and value serializers to support various serialization strategies.
StreamMessageListenerContainer supports multiple modes of stream consumption, including standalone reads and reads using a Consumer.

Reading from a stream requires polling and a strategy to advance stream offsets. Depending on the initial ReadOffset, StreamMessageListenerContainer applies an individual strategy to obtain the next ReadOffset:
Standalone
Using Consumer
  • ReadOffset.from(String) Offset using a particular message Id: Start with the given offset and use the last seen message Id.
  • ReadOffset.lastConsumed() Last consumed: Start with the last consumed message by the consumer (>) and use the last consumed message by the consumer (>) for subsequent reads.
  • ReadOffset.latest() Latest: Start with the latest offset ($) and use the latest offset ($) for subsequent reads.
Note: Using ReadOffset.latest() carries a risk of dropped messages because messages can arrive while polling is suspended. Use message Ids as offsets or ReadOffset.lastConsumed() to minimize the chance of message loss.
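
The offset strategies listed above for a Consumer can be pictured with a small plain-Java model. This is only a sketch of the documented behavior, not Spring Data Redis code: the class OffsetStrategySketch, the enum Mode and the method nextOffset are hypothetical names introduced for illustration.

```java
/** Toy model of the documented offset strategies; not the library's implementation. */
final class OffsetStrategySketch {

    /** Hypothetical marker for how the initial ReadOffset was specified. */
    enum Mode { FROM_ID, LAST_CONSUMED, LATEST }

    /**
     * Returns the offset to use for the next read.
     *
     * @param mode          how the initial offset was specified
     * @param initialOffset the message Id given for FROM_ID (ignored otherwise)
     * @param lastSeenId    Id of the last record delivered so far, or null if none yet
     */
    static String nextOffset(Mode mode, String initialOffset, String lastSeenId) {
        switch (mode) {
            case FROM_ID:
                // Start with the given offset, then follow the last seen message Id.
                return lastSeenId != null ? lastSeenId : initialOffset;
            case LAST_CONSUMED:
                // ">" (last consumed by the consumer) for the first and every later read.
                return ">";
            case LATEST:
            default:
                // "$" for every read, so records arriving between reads can be missed.
                return "$";
        }
    }

    public static void main(String[] args) {
        String lastSeen = null;
        for (String deliveredId : new String[] {"1-0", "2-0"}) {
            System.out.println("read from " + nextOffset(Mode.FROM_ID, "0-0", lastSeen));
            lastSeen = deliveredId; // pretend this record was just delivered
        }
    }
}
```

In this model only FROM_ID depends on what has been delivered so far. LATEST never does, which is why it is the mode most exposed to message loss.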

StreamMessageListenerContainer requires an Executor to fork long-running polling tasks on a different Thread. This thread is used as an event loop to poll for stream messages and invoke the listener callback.
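
As a rough illustration of this threading model, the following plain-Java sketch forks a polling loop onto an Executor and returns a handle whose cancellation stops the loop on its next iteration. It assumes an in-memory BlockingQueue in place of a Redis Stream. PollingLoopSketch, Handle, subscribe and demo are hypothetical names, not part of the Spring Data Redis API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Toy event loop on an Executor; a BlockingQueue stands in for the stream. */
final class PollingLoopSketch {

    /** Hypothetical stand-in for a subscription handle. */
    static final class Handle {
        private final AtomicBoolean active = new AtomicBoolean(true);
        private final CountDownLatch stopped = new CountDownLatch(1);

        /** Requests termination; the loop notices on its next iteration. */
        void cancel() { active.set(false); }

        boolean isActive() { return active.get(); }

        /** Waits until the polling task has actually finished. */
        boolean awaitStopped(long timeout, TimeUnit unit) throws InterruptedException {
            return stopped.await(timeout, unit);
        }
    }

    /** Forks a polling task onto the executor and returns its handle. */
    static Handle subscribe(Executor executor, BlockingQueue<String> source, Consumer<String> listener) {
        Handle handle = new Handle();
        executor.execute(() -> {
            try {
                while (handle.isActive()) {
                    // Poll with a timeout so cancellation is observed promptly.
                    String message = source.poll(50, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        listener.accept(message); // listener runs on the polling thread
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                handle.stopped.countDown();
            }
        });
        return handle;
    }

    /** Consumes two queued messages, cancels, and returns what the listener saw. */
    static List<String> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<String> source = new LinkedBlockingQueue<>(Arrays.asList("a", "b"));
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch bothSeen = new CountDownLatch(2);
        Handle handle = subscribe(executor, source, m -> { received.add(m); bothSeen.countDown(); });
        try {
            bothSeen.await(5, TimeUnit.SECONDS);
            handle.cancel();
            handle.awaitStopped(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the listener is invoked on the polling thread in this sketch, a slow listener delays the next poll. The same consideration plausibly applies to any event-loop design of this kind.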

StreamMessageListenerContainer tasks propagate errors that occur during stream reads and listener notification to a configurable ErrorHandler. Errors stop a Subscription by default. Configuring a Predicate for a StreamMessageListenerContainer.StreamReadRequest allows cancelling the subscription conditionally or continuing on all errors.
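
The interplay between the error handler and the cancellation predicate can be sketched in plain Java as follows. This is a simplified, single-threaded model under assumed names (ErrorPolicySketch, poll, maxPolls are hypothetical) and does not reproduce the container's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Toy model of "report every error, cancel only if the predicate says so". */
final class ErrorPolicySketch {

    /**
     * Attempts up to maxPolls reads. Every failure from the read or the listener
     * is passed to errorHandler; polling stops when cancelOnError accepts it.
     *
     * @return the number of reads that were attempted
     */
    static int poll(Supplier<String> read, Consumer<String> listener,
                    Consumer<Throwable> errorHandler, Predicate<Throwable> cancelOnError,
                    int maxPolls) {
        int attempts = 0;
        while (attempts < maxPolls) {
            attempts++;
            try {
                listener.accept(read.get());
            } catch (RuntimeException e) {
                errorHandler.accept(e);      // errors are always reported
                if (cancelOnError.test(e)) { // the predicate decides whether to stop
                    break;
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        int[] calls = {0};
        // Hypothetical source that fails on its second read.
        Supplier<String> flaky = () -> {
            calls[0]++;
            if (calls[0] == 2) {
                throw new IllegalStateException("read failed");
            }
            return "m" + calls[0];
        };
        // t -> false models "continue on all errors"; t -> true models the default.
        int attempts = poll(flaky, received::add,
                e -> System.err.println("error: " + e.getMessage()), t -> false, 4);
        System.out.println(attempts + " attempts, received " + received);
    }
}
```

A predicate that always returns true corresponds to the default described above, where the first error ends the subscription.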

The following example shows how to use StreamMessageListenerContainer:

 RedisConnectionFactory factory = …;

 StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(factory);
 Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), message -> …);

 container.start();

 // later
 container.stop();
 
Since:
2.2
Author:
Mark Paluch, Christoph Strobl, Christian Rest
See Also: