Redis Streams
Redis Streams model a log data structure in an abstract way. Typically, logs are append-only data structures that are consumed from the beginning, from a random position, or by streaming new messages.
Learn more about Redis Streams in the Redis reference documentation.
Redis Streams can be roughly divided into two areas of functionality:
- Appending records
- Consuming records
Although this pattern has similarities to Pub/Sub, the main difference lies in the persistence of messages and how they are consumed.
While Pub/Sub relies on the broadcasting of transient messages (i.e. if you don’t listen, you miss a message), Redis Streams use a persistent, append-only data type that retains messages until the stream is trimmed. Another difference lies in consumption: Pub/Sub registers a server-side subscription and Redis pushes arriving messages to the client, while Redis Streams require active polling.
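The difference between transient broadcast and a retained log can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the Channel and Log classes are hypothetical, use plain JDK collections, and do not talk to Redis or use any Spring Data API.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model only; hypothetical classes that do not talk to Redis.
final class TransientVsRetained {

    // Pub/Sub-like channel: a message reaches only the listeners registered at publish time.
    static final class Channel {
        private final List<List<String>> listeners = new ArrayList<>();

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            listeners.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : listeners) {
                inbox.add(message);
            }
        }
    }

    // Stream-like log: entries are retained and can be read from any position.
    static final class Log {
        private final List<String> entries = new ArrayList<>();

        void append(String message) {
            entries.add(message);
        }

        List<String> readFrom(int position) {
            return new ArrayList<>(entries.subList(position, entries.size()));
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.publish("first");              // nobody is listening, so the message is lost
        List<String> inbox = channel.subscribe();
        channel.publish("second");
        System.out.println(inbox);             // [second]

        Log log = new Log();
        log.append("first");
        log.append("second");
        System.out.println(log.readFrom(0));   // [first, second]
    }
}
```

A late subscriber to the channel never sees "first", whereas a late reader of the log can still start at position 0.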
The org.springframework.data.redis.connection and org.springframework.data.redis.stream packages provide the core functionality for Redis Streams.
Appending
To send a record, you can use, as with the other operations, either the low-level RedisConnection or the high-level StreamOperations. Both entities offer the add (xAdd) method, which accepts the record and the destination stream as arguments. While RedisConnection requires raw data (array of bytes), the StreamOperations lets arbitrary objects be passed in as records, as shown in the following example:
// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord byteRecord = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(byteRecord);
// append message through RedisTemplate
RedisTemplate template = …
StringRecord stringRecord = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(stringRecord);
Stream records carry a Map, key-value tuples, as their payload. Appending a record to a stream returns the RecordId that can be used as further reference.
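Redis stream entry ids have the textual form `<millisecondsTime>-<sequenceNumber>`, and ids grow as records are appended. The sketch below parses and orders such ids with plain JDK code. The StreamIdSketch class is hypothetical and is not the Spring Data RecordId type. It also assumes both parts fit into a signed long.

```java
// Hypothetical helper for illustration; not the Spring Data RecordId class.
final class StreamIdSketch implements Comparable<StreamIdSketch> {

    final long timestamp; // milliseconds part of the id
    final long sequence;  // sequence part, distinguishes ids created within the same millisecond

    private StreamIdSketch(long timestamp, long sequence) {
        this.timestamp = timestamp;
        this.sequence = sequence;
    }

    // Parses ids of the form "<millisecondsTime>-<sequenceNumber>".
    static StreamIdSketch parse(String raw) {
        int dash = raw.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("Expected <ms>-<seq> but got: " + raw);
        }
        long timestamp = Long.parseLong(raw.substring(0, dash));
        long sequence = Long.parseLong(raw.substring(dash + 1));
        return new StreamIdSketch(timestamp, sequence);
    }

    // Orders by timestamp first, then by sequence number.
    @Override
    public int compareTo(StreamIdSketch other) {
        int byTime = Long.compare(timestamp, other.timestamp);
        return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
    }

    @Override
    public String toString() {
        return timestamp + "-" + sequence;
    }

    public static void main(String[] args) {
        StreamIdSketch earlier = parse("1526919030474-55");
        StreamIdSketch later = parse("1526919030474-56");
        System.out.println(earlier.compareTo(later) < 0); // true
    }
}
```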
Consuming
On the consuming side, one can consume one or multiple streams. Redis Streams provide read commands that allow consumption of the stream from an arbitrary position (random access) within the known stream content and beyond the stream end to consume new stream records.
At the low level, RedisConnection offers the xRead and xReadGroup methods, which map to the Redis commands for reading and reading within a consumer group, respectively. Note that multiple streams can be used as arguments.
Subscription commands in Redis can be blocking. That is, calling xRead on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the read command times out or receives a message.
To consume stream messages, one can either poll for messages in application code or use one of the two message listener containers described under Asynchronous reception through Message Listener Containers, the imperative or the reactive one. Each time a new record arrives, the container notifies the application code.
Synchronous reception
While stream consumption is typically associated with asynchronous processing, it is possible to consume messages synchronously. The overloaded StreamOperations.read(…) methods provide this functionality. During a synchronous receive, the calling thread potentially blocks until a message becomes available. The property StreamReadOptions.block specifies how long the receiver should wait before giving up waiting for a message.
// Read message through RedisTemplate
RedisTemplate template = …
// Standalone read from the latest offset
List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
StreamOffset.latest("my-stream"));
// Read as my-consumer within consumer group my-group
List<MapRecord<K, HK, HV>> groupMessages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
StreamReadOptions.empty().count(2),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
Asynchronous reception through Message Listener Containers
Due to its blocking nature, low-level polling is not attractive, as it requires connection and thread management for every single consumer. To alleviate this problem, Spring Data offers message listeners, which do all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as the support is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).
Spring Data ships with two implementations tailored to the programming model used:
- StreamMessageListenerContainer acts as message listener container for imperative programming models. It is used to consume records from a Redis Stream and drive the StreamListener instances that are injected into it.
- StreamReceiver provides a reactive variant of a message listener. It is used to consume messages from a Redis Stream as a potentially infinite stream and emit stream messages through a Flux.
StreamMessageListenerContainer and StreamReceiver are responsible for all threading of message reception and dispatch into the listener for processing. A message listener container/receiver is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.
Both containers allow runtime configuration changes so that you can add or remove subscriptions while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a RedisConnection only when needed. If all the listeners are unsubscribed, it automatically performs a cleanup, and the thread is released.
Imperative StreamMessageListenerContainer
In a fashion similar to a Message-Driven Bean (MDB) in the EJB world, the Stream-Driven POJO (SDP) acts as a receiver for Stream messages. The one restriction on an SDP is that it must implement the StreamListener interface. Please also be aware that in the case where your POJO receives messages on multiple threads, it is important to ensure that your implementation is thread-safe.
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
StreamListener represents a functional interface, so implementations can be rewritten using their lambda form:
message -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
};
Once you’ve implemented your StreamListener, it’s time to create a message listener container and register a subscription:
RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
containerOptions);
Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);
Please refer to the Javadoc of the various message listener containers for a full description of the features supported by each implementation.
Reactive StreamReceiver
Reactive consumption of streaming data sources typically happens through a Flux of events or messages. The reactive receiver implementation is provided with StreamReceiver and its overloaded receive(…) methods. The reactive approach requires fewer infrastructure resources, such as threads, in comparison to StreamMessageListenerContainer, as it leverages threading resources provided by the driver. The receiving stream is a demand-driven publisher of StreamMessage:
Flux<MapRecord<String, String, String>> messages = …
return messages.doOnNext(message -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
});
Now we need to create the StreamReceiver and register a subscription to consume stream messages:
ReactiveRedisConnectionFactory connectionFactory = …
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);
Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));
Please refer to the Javadoc of the various message listener containers for a full description of the features supported by each implementation.
Demand-driven consumption uses backpressure signals to activate and deactivate polling. StreamReceiver subscriptions pause polling if the demand is satisfied until subscribers signal further demand. Depending on the ReadOffset strategy, this can cause messages to be skipped.
Acknowledge strategies
When you read messages via a Consumer Group, the server remembers that a given message was delivered and adds it to the Pending Entries List (PEL), a list of messages that were delivered but not yet acknowledged.
Messages have to be acknowledged via StreamOperations.acknowledge in order to be removed from the Pending Entries List, as shown in the snippet below.
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...
container.receive(Consumer.from("my-group", "my-consumer"), (1)
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
msg -> {
// ...
redisTemplate.opsForStream().acknowledge("my-group", msg); (2)
});
1 | Read as my-consumer from group my-group. Received messages are not acknowledged. |
2 | Acknowledged the message after processing. |
To auto-acknowledge messages on receive, use receiveAutoAck instead of receive.
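The bookkeeping behind the Pending Entries List can be pictured with a small in-memory model: delivery adds an entry to the pending set, and acknowledgement removes it. This is a sketch under stated assumptions. The PendingEntriesModel class and its methods are hypothetical, entries are identified by their list index rather than real stream ids, and nothing here calls Redis or Spring Data.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// In-memory model of one consumer group; hypothetical, not a Redis or Spring Data API.
final class PendingEntriesModel {

    private final List<String> stream = new ArrayList<>();
    private final Set<Integer> pending = new LinkedHashSet<>(); // delivered but not yet acknowledged
    private int nextUndelivered = 0;

    void append(String message) {
        stream.add(message);
    }

    // Delivers the next never-delivered entry and records it as pending.
    // Returns the entry's index, or -1 if there is nothing new to deliver.
    int deliverNext() {
        if (nextUndelivered >= stream.size()) {
            return -1;
        }
        int id = nextUndelivered++;
        pending.add(id);
        return id;
    }

    // Removes the entry from the pending set; returns false if it was not pending.
    boolean acknowledge(int id) {
        return pending.remove(id);
    }

    Set<Integer> pending() {
        return new LinkedHashSet<>(pending);
    }

    public static void main(String[] args) {
        PendingEntriesModel group = new PendingEntriesModel();
        group.append("login");
        group.append("logout");

        int first = group.deliverNext();
        group.deliverNext();
        System.out.println(group.pending()); // [0, 1]

        group.acknowledge(first);
        System.out.println(group.pending()); // [1]
    }
}
```

An entry that is delivered but never acknowledged stays in the pending set, which is why a listener that skips the acknowledge call leaves its messages in the list.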
ReadOffset strategies
Stream read operations accept a read offset specification to consume messages from the given offset on. ReadOffset represents the read offset specification. Redis supports 3 variants of offsets, depending on whether you consume the stream standalone or within a consumer group:
- ReadOffset.latest() – Read the latest message.
- ReadOffset.from(…) – Read after a specific message Id.
- ReadOffset.lastConsumed() – Read after the last consumed message Id (consumer-group only).
In the context of a message container-based consumption, we need to advance (or increment) the read offset when consuming a message. Advancing depends on the requested ReadOffset and consumption mode (with/without consumer groups). The following matrix explains how containers advance ReadOffset:
Read offset | Standalone | Consumer Group |
---|---|---|
Latest | Read latest message | Read latest message |
Specific Message Id | Use last seen message as the next MessageId | Use last seen message as the next MessageId |
Last Consumed | Use last seen message as the next MessageId | Last consumed message as per consumer group |
Reading from a specific message id and the last consumed message can be considered safe operations that ensure consumption of all messages that were appended to the stream.
Using the latest message as the read offset can skip messages that were added to the stream during the dead time between polls. Polling introduces a dead time in which messages can arrive between individual polling commands. Stream consumption is not a linear, contiguous read but is split into repeating XREAD calls.
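The effect of dead time can be sketched with a single-threaded in-memory model that runs two polls and appends one entry between them. The model is an assumption-laden simplification: the DeadTimeModel class, its positions (list indexes instead of stream ids), and the trackLastSeen flag are hypothetical and only mimic the difference between restarting at the end of the stream and continuing after the last seen entry.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded model of two consecutive polls; hypothetical, does not call Redis.
final class DeadTimeModel {

    static final class Log {
        private final List<String> entries = new ArrayList<>();

        void append(String entry) {
            entries.add(entry);
        }

        int size() {
            return entries.size();
        }

        List<String> readAfter(int position) {
            return new ArrayList<>(entries.subList(position, entries.size()));
        }
    }

    // trackLastSeen = true:  poll 2 continues after the last entry seen by poll 1.
    // trackLastSeen = false: poll 2 starts again at the current end of the log ("latest").
    static List<String> consume(boolean trackLastSeen) {
        Log log = new Log();
        List<String> received = new ArrayList<>();

        // Poll 1 starts at the current end of the log.
        int offset = log.size();
        log.append("a");                      // arrives while poll 1 is waiting
        List<String> batch = log.readAfter(offset);
        received.addAll(batch);
        int lastSeen = offset + batch.size();

        log.append("b");                      // arrives in the dead time between the polls

        // Poll 2 picks its starting position according to the strategy.
        offset = trackLastSeen ? lastSeen : log.size();
        log.append("c");                      // arrives while poll 2 is waiting
        received.addAll(log.readAfter(offset));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consume(true));    // [a, b, c]
        System.out.println(consume(false));   // [a, c]
    }
}
```

In this model the entry "b" is lost only when the second poll restarts at the end of the log, which matches the matrix: offsets based on the last seen or last consumed message do not skip entries.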
Serialization
Any Record sent to the stream needs to be serialized to its binary format. Because streams are close to the hash data structure, the stream key, field names, and values use the corresponding serializers configured on the RedisTemplate.
Stream Property | Serializer | Description |
---|---|---|
key | keySerializer | used for the stream key |
field | hashKeySerializer | used for each map key in the payload |
value | hashValueSerializer | used for each map value in the payload |
Please make sure to review the RedisSerializers in use and note that, if you decide not to use any serializer, you need to make sure those values are already binary.
Object Mapping
Simple Values
StreamOperations allows appending simple values, via ObjectRecord, directly to the stream without having to put those values into a Map structure.
The value is then assigned to a payload field and can be extracted when reading back the value.
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in("my-stream")
.ofObject("my-value");
redisTemplate()
.opsForStream()
.add(record); (1)
List<ObjectRecord<String, String>> records = redisTemplate()
.opsForStream()
.read(String.class, StreamOffset.fromStart("my-stream"));
1 | XADD my-stream * "_class" "java.lang.String" "_raw" "my-value" |
ObjectRecords pass through the very same serialization process as all other records, so the Record can also be obtained using the untyped read operation returning a MapRecord.
Complex Values
Adding a complex value to the stream can be done in 3 ways:
- Convert to a simple value using, e.g., a String JSON representation.
- Serialize the value with a suitable RedisSerializer.
- Convert the value into a Map suitable for serialization using a HashMapper.
The first variant is the most straightforward one but neglects the field value capabilities offered by the stream structure. Still, the values in the stream remain readable for other consumers.
The second option holds the same benefits as the first one but may lead to very specific consumer limitations, as all consumers must implement the very same serialization mechanism.
The HashMapper approach is a bit more complex: it makes use of the stream's hash structure but flattens the source. Other consumers remain able to read the records as long as suitable serializer combinations are chosen.
HashMappers convert the payload to a Map with specific types. Make sure to use Hash-Key and Hash-Value serializers that are capable of (de-)serializing the hash.
ObjectRecord<String, User> record = StreamRecords.newRecord()
.in("user-logon")
.ofObject(new User("night", "angel"));
redisTemplate()
.opsForStream()
.add(record); (1)
List<ObjectRecord<String, User>> records = redisTemplate()
.opsForStream()
.read(User.class, StreamOffset.fromStart("user-logon"));
1 | XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel" |
StreamOperations use ObjectHashMapper by default.
You may provide a HashMapper suitable for your requirements when obtaining StreamOperations.
redisTemplate()
.opsForStream(new Jackson2HashMapper(true))
.add(record); (1)
1 | XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel" |
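The flattening that a hash mapper performs can be pictured with a hand-written mapping between an object and a field/value map. This is only a sketch: the User class and the toHash/fromHash helpers are hypothetical, they do not implement the Spring Data HashMapper interface, and they omit the type hint field that the mappers shown above add.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hand-written flattening for illustration; hypothetical, not the Spring Data HashMapper interface.
final class UserHashSketch {

    static final class User {
        final String firstname;
        final String lastname;

        User(String firstname, String lastname) {
            this.firstname = firstname;
            this.lastname = lastname;
        }
    }

    // Flattens the object into field/value pairs, one pair per property.
    static Map<String, String> toHash(User user) {
        Map<String, String> hash = new LinkedHashMap<>();
        hash.put("firstname", user.firstname);
        hash.put("lastname", user.lastname);
        return hash;
    }

    // Rebuilds the object from the field/value pairs read back from the stream.
    static User fromHash(Map<String, String> hash) {
        return new User(hash.get("firstname"), hash.get("lastname"));
    }

    public static void main(String[] args) {
        Map<String, String> hash = toHash(new User("night", "angel"));
        System.out.println(hash); // {firstname=night, lastname=angel}

        User user = fromHash(hash);
        System.out.println(user.firstname + " " + user.lastname); // night angel
    }
}
```

Because every property becomes its own field, a consumer that knows nothing about the User class can still read individual values, provided the key and value serializers agree.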
A
|