Redis Streams

Redis Streams model a log data structure in an abstract approach. Typically, logs are append-only data structures and are consumed from the beginning on, at a random position, or by streaming new messages.

Learn more about Redis Streams in the Redis reference documentation.

Redis Streams can be roughly divided into two areas of functionality:

  • Appending records

  • Consuming records

Although this pattern has similarities to Pub/Sub, the main difference lies in the persistence of messages and how they are consumed.

While Pub/Sub relies on the broadcasting of transient messages (i.e. if you don’t listen, you miss a message), Redis Stream use a persistent, append-only data type that retains messages until the stream is trimmed. Another difference in consumption is that Pub/Sub registers a server-side subscription. Redis pushes arriving messages to the client while Redis Streams require active polling.

The org.springframework.data.redis.connection and org.springframework.data.redis.stream packages provide the core functionality for Redis Streams.

Appending

To send a record, you can use, as with the other operations, either the low-level RedisConnection or the high-level StreamOperations. Both entities offer the add (xAdd) method, which accepts the record and the destination stream as arguments. While RedisConnection requires raw data (array of bytes), the StreamOperations lets arbitrary objects be passed in as records, as shown in the following example:

// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);

Stream records carry a Map, key-value tuples, as their payload. Appending a record to a stream returns the RecordId that can be used as further reference.

Consuming

On the consuming side, one can consume one or multiple streams. Redis Streams provide read commands that allow consumption of the stream from an arbitrary position (random access) within the known stream content and beyond the stream end to consume new stream record.

At the low-level, RedisConnection offers the xRead and xReadGroup methods that map the Redis commands for reading and reading within a consumer group, respectively. Note that multiple streams can be used as arguments.

Subscription commands in Redis can be blocking. That is, calling xRead on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the read command times out or receives a message.

To consume stream messages, one can either poll for messages in application code, or use one of the two Asynchronous reception through Message Listener Containers, the imperative or the reactive one. Each time a new records arrives, the container notifies the application code.

Synchronous reception

While stream consumption is typically associated with asynchronous processing, it is possible to consume messages synchronously. The overloaded StreamOperations.read(…) methods provide this functionality. During a synchronous receive, the calling thread potentially blocks until a message becomes available. The property StreamReadOptions.block specifies how long the receiver should wait before giving up waiting for a message.

// Read message through RedisTemplate
RedisTemplate template = …

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

Asynchronous reception through Message Listener Containers

Due to its blocking nature, low-level polling is not attractive, as it requires connection and thread management for every single consumer. To alleviate this problem, Spring Data offers message listeners, which do all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as it is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).

Spring Data ships with two implementations tailored to the used programming model:

  • StreamMessageListenerContainer acts as message listener container for imperative programming models. It is used to consume records from a Redis Stream and drive the StreamListener instances that are injected into it.

  • StreamReceiver provides a reactive variant of a message listener. It is used to consume messages from a Redis Stream as potentially infinite stream and emit stream messages through a Flux.

StreamMessageListenerContainer and StreamReceiver are responsible for all threading of message reception and dispatch into the listener for processing. A message listener container/receiver is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.

Both containers allow runtime configuration changes so that you can add or remove subscriptions while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a RedisConnection only when needed. If all the listeners are unsubscribed, it automatically performs a cleanup, and the thread is released.

Imperative StreamMessageListenerContainer

In a fashion similar to a Message-Driven Bean (MDB) in the EJB world, the Stream-Driven POJO (SDP) acts as a receiver for Stream messages. The one restriction on an SDP is that it must implement the org.springframework.data.redis.stream.StreamListener interface. Please also be aware that in the case where your POJO receives messages on multiple threads, it is important to ensure that your implementation is thread-safe.

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

	@Override
	public void onMessage(MapRecord<String, String, String> message) {

		System.out.println("MessageId: " + message.getId());
		System.out.println("Stream: " + message.getStream());
		System.out.println("Body: " + message.getValue());
	}
}

StreamListener represents a functional interface so implementations can be rewritten using their Lambda form:

message -> {

    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
};

Once you’ve implemented your StreamListener, it’s time to create a message listener container and register a subscription:

RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
			.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
				containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);

Please refer to the Javadoc of the various message listener containers for a full description of the features supported by each implementation.

Reactive StreamReceiver

Reactive consumption of streaming data sources typically happens through a Flux of events or messages. The reactive receiver implementation is provided with StreamReceiver and its overloaded receive(…) messages. The reactive approach requires fewer infrastructure resources such as threads in comparison to StreamMessageListenerContainer as it is leveraging threading resources provided by the driver. The receiving stream is a demand-driven publisher of StreamMessage:

Flux<MapRecord<String, String, String>> messages = …

return messages.doOnNext(it -> {
    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
});

Now we need to create the StreamReceiver and register a subscription to consume stream messages:

ReactiveRedisConnectionFactory connectionFactory = …

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
				.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);

Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

Please refer to the Javadoc of the various message listener containers for a full description of the features supported by each implementation.

Demand-driven consumption uses backpressure signals to activate and deactivate polling. StreamReceiver subscriptions pause polling if the demand is satisfied until subscribers signal further demand. Depending on the ReadOffset strategy, this can cause messages to be skipped.

Acknowledge strategies

When you read with messages via a Consumer Group, the server will remember that a given message was delivered and add it to the Pending Entries List (PEL). A list of messages delivered but not yet acknowledged.
Messages have to be acknowledged via StreamOperations.acknowledge in order to be removed from the Pending Entries List as shown in the snippet below.

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...

container.receive(Consumer.from("my-group", "my-consumer"), (1)
	StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
    msg -> {

	    // ...
	    redisTemplate.opsForStream().acknowledge("my-group", msg); (2)
    });
1 Read as my-consumer from group my-group. Received messages are not acknowledged.
2 Acknowledged the message after processing.
To auto acknowledge messages on receive use receiveAutoAck instead of receive.

ReadOffset strategies

Stream read operations accept a read offset specification to consume messages from the given offset on. ReadOffset represents the read offset specification. Redis supports 3 variants of offsets, depending on whether you consume the stream standalone or within a consumer group:

  • ReadOffset.latest() – Read the latest message.

  • ReadOffset.from(…) – Read after a specific message Id.

  • ReadOffset.lastConsumed() – Read after the last consumed message Id (consumer-group only).

In the context of a message container-based consumption, we need to advance (or increment) the read offset when consuming a message. Advancing depends on the requested ReadOffset and consumption mode (with/without consumer groups). The following matrix explains how containers advance ReadOffset:

Table 1. ReadOffset Advancing
Read offset Standalone Consumer Group

Latest

Read latest message

Read latest message

Specific Message Id

Use last seen message as the next MessageId

Use last seen message as the next MessageId

Last Consumed

Use last seen message as the next MessageId

Last consumed message as per consumer group

Reading from a specific message id and the last consumed message can be considered safe operations that ensure consumption of all messages that were appended to the stream. Using the latest message for read can skip messages that were added to the stream while the poll operation was in the state of dead time. Polling introduces a dead time in which messages can arrive between individual polling commands. Stream consumption is not a linear contiguous read but split into repeating XREAD calls.

Serialization

Any Record sent to the stream needs to be serialized to its binary format. Due to the streams closeness to the hash data structure the stream key, field names and values use the according serializers configured on the RedisTemplate.

Table 2. Stream Serialization
Stream Property Serializer Description

key

keySerializer

used for Record#getStream()

field

hashKeySerializer

used for each map key in the payload

value

hashValueSerializer

used for each map value in the payload

Please make sure to review RedisSerializers in use and note that if you decide to not use any serializer you need to make sure those values are binary already.

Object Mapping

Simple Values

StreamOperations allows to append simple values, via ObjectRecord, directly to the stream without having to put those values into a Map structure. The value will then be assigned to an payload field and can be extracted when reading back the value.

ObjectRecord<String, String> record = StreamRecords.newRecord()
    .in("my-stream")
    .ofObject("my-value");

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, String>> records = redisTemplate()
    .opsForStream()
    .read(String.class, StreamOffset.fromStart("my-stream"));
1 XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"

ObjectRecords pass through the very same serialization process as the all other records, thus the Record can also obtained using the untyped read operation returning a MapRecord.

Complex Values

Adding a complex value to the stream can be done in 3 ways:

  • Convert to simple value using e. g. a String JSON representation.

  • Serialize the value with a suitable RedisSerializer.

  • Convert the value into a Map suitable for serialization using a HashMapper.

The first variant is the most straight forward one but neglects the field value capabilities offered by the stream structure, still the values in the stream will be readable for other consumers. The 2nd option holds the same benefits as the first one, but may lead to a very specific consumer limitations as the all consumers must implement the very same serialization mechanism. The HashMapper approach is the a bit more complex one making use of the steams hash structure, but flattening the source. Still other consumers remain able to read the records as long as suitable serializer combinations are chosen.

HashMappers convert the payload to a Map with specific types. Make sure to use Hash-Key and Hash-Value serializers that are capable of (de-)serializing the hash.
ObjectRecord<String, User> record = StreamRecords.newRecord()
    .in("user-logon")
    .ofObject(new User("night", "angel"));

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, User>> records = redisTemplate()
    .opsForStream()
    .read(User.class, StreamOffset.fromStart("user-logon"));
1 XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel"

StreamOperations use by default ObjectHashMapper. You may provide a HashMapper suitable for your requirements when obtaining StreamOperations.

redisTemplate()
    .opsForStream(new Jackson2HashMapper(true))
    .add(record); (1)
1 XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel"

A StreamMessageListenerContainer may not be aware of any @TypeAlias used on domain types as those need to be resolved through a MappingContext. Make sure to initialize RedisMappingContext with a initialEntitySet.

@Bean
RedisMappingContext redisMappingContext() {
    RedisMappingContext ctx = new RedisMappingContext();
    ctx.setInitialEntitySet(Collections.singleton(Person.class));
    return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
    return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
    return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
    StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
            .objectMapper(hashMapper)
            .build();

    return StreamMessageListenerContainer.create(connectionFactory, options);
}