Interface ReactiveStreamOperations<K,HK,HV>
- All Superinterfaces:
HashMapperProvider<HK,
HV>
Reactive Redis operations for Stream Commands.
- Since:
- 2.2
- Author:
- Mark Paluch, Christoph Strobl, Dengliming, Marcin Zielinski, John Blum
-
Method Summary
Modifier and TypeMethodDescriptiondefault reactor.core.publisher.Mono<Long>
acknowledge
(String group, Record<K, ?> record) Acknowledge the given record as processed.default reactor.core.publisher.Mono<Long>
acknowledge
(K key, String group, String... recordIds) Acknowledge one or more records as processed.reactor.core.publisher.Mono<Long>
acknowledge
(K key, String group, RecordId... recordIds) Acknowledge one or more records as processed.default reactor.core.publisher.Mono<RecordId>
Append a record to the streamkey
.default reactor.core.publisher.Flux<RecordId>
Append one or more records to the streamkey
.default reactor.core.publisher.Mono<RecordId>
Append a record, backed by aMap
holding the field/value pairs, to the stream.reactor.core.publisher.Mono<RecordId>
Append the record, backed by the given value, to the stream.Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument.claim
(K key, String consumerGroup, String newOwner, RedisStreamCommands.XClaimOptions xClaimOptions) Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument.reactor.core.publisher.Flux<StreamInfo.XInfoConsumer>
Obtain information about every consumer in a specific consumer group for the stream stored at the specified key.default reactor.core.publisher.Mono<String>
createGroup
(K key, String group) Create a consumer group at thelatest offset
.reactor.core.publisher.Mono<String>
createGroup
(K key, ReadOffset readOffset, String group) Create a consumer group.default reactor.core.publisher.Mono<Long>
Removes the specified records from the stream.reactor.core.publisher.Mono<Long>
Removes the specified records from the stream.default reactor.core.publisher.Mono<Long>
Removes a givenRecord
from the stream.reactor.core.publisher.Mono<String>
deleteConsumer
(K key, Consumer consumer) Delete a consumer from a consumer group.deserializeRecord
(ByteBufferRecord record) Deserialize aByteBufferRecord
using the configured serialization context into aMapRecord
.reactor.core.publisher.Mono<String>
destroyGroup
(K key, String group) Destroy a consumer group.<V> HashMapper<V,
HK, HV> getHashMapper
(Class<V> targetType) Get theHashMapper
for a specific type.reactor.core.publisher.Flux<StreamInfo.XInfoGroup>
Obtain information about consumer groups associated with the stream stored at the specified key.reactor.core.publisher.Mono<StreamInfo.XInfoStream>
Obtain general information about the stream stored at the specified key.default <V> ObjectRecord<K,
V> Map records fromMapRecord
toObjectRecord
.reactor.core.publisher.Mono<PendingMessagesSummary>
Obtain thePendingMessagesSummary
for a given consumer group.reactor.core.publisher.Mono<PendingMessages>
default reactor.core.publisher.Mono<PendingMessages>
Obtained detailed information about all pending messages for a givenConsumer
.reactor.core.publisher.Mono<PendingMessages>
default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> Read all records from a stream within a specificRange
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> Read records from a stream within a specificRange
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> read
(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group asObjectRecord
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> read
(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group asObjectRecord
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> read
(Class<V> targetType, StreamOffset<K> stream) Read records from aStreamOffset
asObjectRecord
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> read
(Class<V> targetType, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s asObjectRecord
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> read
(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s asObjectRecord
.read
(Consumer consumer, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group.read
(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group.read
(StreamOffset<K> stream) Read records from aStreamOffset
asObjectRecord
.read
(StreamOffset<K>... streams) Read records from one or moreStreamOffset
s.read
(StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> reverseRange
(Class<V> targetType, K key, Range<String> range) Read records from a stream within a specificRange
in reverse order asObjectRecord
.default <V> reactor.core.publisher.Flux<ObjectRecord<K,
V>> Read records from a stream within a specificRange
applying aLimit
in reverse order asObjectRecord
.reverseRange
(K key, Range<String> range) Read records from a stream within a specificRange
in reverse order.reverseRange
(K key, Range<String> range, Limit limit) reactor.core.publisher.Mono<Long>
Get the length of a stream.reactor.core.publisher.Mono<Long>
Trims the stream tocount
elements.reactor.core.publisher.Mono<Long>
Trims the stream tocount
elements.
-
Method Details
-
acknowledge
Acknowledge one or more records as processed.- Parameters:
key
- the stream key.group
- name of the consumer group.recordIds
- record Id's to acknowledge.- Returns:
- the
Mono
emitting the length of acknowledged records. - See Also:
-
acknowledge
Acknowledge one or more records as processed.- Parameters:
key
- the stream key.group
- name of the consumer group.recordIds
- record Id's to acknowledge.- Returns:
- the
Mono
emitting the length of acknowledged records. - See Also:
-
acknowledge
Acknowledge the given record as processed.- Parameters:
group
- name of the consumer group.record
- theRecord
to acknowledge.- Returns:
- the
Mono
emitting the length of acknowledged records. - See Also:
-
add
default reactor.core.publisher.Flux<RecordId> add(K key, org.reactivestreams.Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher) Append one or more records to the streamkey
.- Parameters:
key
- the stream key.bodyPublisher
- record bodyPublisher
.- Returns:
- the record Ids.
- See Also:
-
add
Append a record to the streamkey
.- Parameters:
key
- the stream key.content
- record content as Map.- Returns:
- the
Mono
emitting theRecordId
. - See Also:
-
add
Append a record, backed by aMap
holding the field/value pairs, to the stream.- Parameters:
record
- the record to append.- Returns:
- the
Mono
emitting theRecordId
. - See Also:
-
add
Append the record, backed by the given value, to the stream. The value will be hashed and serialized.- Parameters:
record
- must not be null.- Returns:
- the
Mono
emitting theRecordId
. - See Also:
-
claim
default reactor.core.publisher.Flux<MapRecord<K,HK, claimHV>> (K key, String consumerGroup, String newOwner, Duration minIdleTime, RecordId... recordIds) Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument. The message is claimed only if its idle time (ms) is greater than theminimum idle time
specified when calling XCLAIM.- Parameters:
key
-key
to the steam.consumerGroup
-name
of the consumer group.newOwner
-name
of the consumer claiming the message.minIdleTime
-minimum idle time
required for a message to be claimed.recordIds
-record IDs
to be claimed.- Returns:
Flux
of claimedMapRecords
.- See Also:
-
claim
reactor.core.publisher.Flux<MapRecord<K,HK, claimHV>> (K key, String consumerGroup, String newOwner, RedisStreamCommands.XClaimOptions xClaimOptions) Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument. The message is claimed only if its idle time (ms) is greater than the givenminimum idle time
specified when calling XCLAIM.- Parameters:
key
-key
to the steam.consumerGroup
-name
of the consumer group.newOwner
-name
of the consumer claiming the message.xClaimOptions
- additional parameters for the CLAIM call.- Returns:
- a
Flux
of claimedMapRecords
. - See Also:
-
delete
Removes the specified records from the stream. Returns the number of records deleted, that may be different from the number of IDs passed in case certain IDs do not exist.- Parameters:
key
- the stream key.recordIds
- stream record Id's.- Returns:
- the
Mono
emitting the number of removed records. - See Also:
-
delete
Removes a givenRecord
from the stream.- Parameters:
record
- must not be null.- Returns:
- he
Mono
emitting the number of removed records.
-
delete
Removes the specified records from the stream. Returns the number of records deleted, that may be different from the number of IDs passed in case certain IDs do not exist.- Parameters:
key
- the stream key.recordIds
- stream record Id's.- Returns:
- the
Mono
emitting the number of removed records. - See Also:
-
createGroup
Create a consumer group at thelatest offset
. This command creates the stream if it does not already exist.- Parameters:
key
- the key the stream is stored at.group
- name of the consumer group.- Returns:
- the
Mono
emitting OK if successful.. null when used in pipeline / transaction.
-
createGroup
Create a consumer group. This command creates the stream if it does not already exist.- Parameters:
key
- the key the stream is stored at.readOffset
- theReadOffset
to apply.group
- name of the consumer group.- Returns:
- the
Mono
emitting OK if successful.
-
deleteConsumer
Delete a consumer from a consumer group.- Parameters:
key
- the stream key.consumer
- consumer identified by group name and consumer key.- Returns:
- the
Mono
OK if successful. null when used in pipeline / transaction.
-
destroyGroup
Destroy a consumer group.- Parameters:
key
- the stream key.group
- name of the consumer group.- Returns:
- the
Mono
OK if successful. null when used in pipeline / transaction.
-
consumers
Obtain information about every consumer in a specific consumer group for the stream stored at the specified key.- Parameters:
key
- the key the stream is stored at.group
- name of the consumer group.- Returns:
- null when used in pipeline / transaction.
- Since:
- 2.3
-
groups
Obtain information about consumer groups associated with the stream stored at the specified key.- Parameters:
key
- the key the stream is stored at.- Returns:
- null when used in pipeline / transaction.
- Since:
- 2.3
-
info
Obtain general information about the stream stored at the specified key.- Parameters:
key
- the key the stream is stored at.- Returns:
- null when used in pipeline / transaction.
- Since:
- 2.3
-
pending
Obtain thePendingMessagesSummary
for a given consumer group.- Parameters:
key
- the key the stream is stored at. Must not be null.group
- the name of the consumer group. Must not be null.- Returns:
- a summary of pending messages within the given consumer group or null when used in pipeline / transaction.
- Since:
- 2.3
- See Also:
-
pending
Obtained detailed information about all pending messages for a givenConsumer
.- Parameters:
key
- the key the stream is stored at. Must not be null.consumer
- the consumer to fetchPendingMessages
for. Must not be null.- Returns:
- pending messages for the given
Consumer
or null when used in pipeline / transaction. - Since:
- 2.3
- See Also:
-
pending
reactor.core.publisher.Mono<PendingMessages> pending(K key, String group, Range<?> range, long count) - Parameters:
key
- the key the stream is stored at. Must not be null.group
- the name of the consumer group. Must not be null.range
- the range of messages ids to search within. Must not be null.count
- limit the number of results.- Returns:
- pending messages for the given consumer group or null when used in pipeline / transaction.
- Since:
- 2.3
- See Also:
-
pending
reactor.core.publisher.Mono<PendingMessages> pending(K key, Consumer consumer, Range<?> range, long count) Obtain detailed information about pendingmessages
for a givenRange
andConsumer
within a consumer group.- Parameters:
key
- the key the stream is stored at. Must not be null.consumer
- the name of theConsumer
. Must not be null.range
- the range of messages ids to search within. Must not be null.count
- limit the number of results.- Returns:
- pending messages for the given
Consumer
or null when used in pipeline / transaction. - Since:
- 2.3
- See Also:
-
size
Get the length of a stream.- Parameters:
key
- the stream key.- Returns:
- the
Mono
emitting the length of the stream. - See Also:
-
range
Read records from a stream within a specificRange
.- Parameters:
key
- the stream key.range
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
range
- Parameters:
key
- the stream key.range
- must not be null.limit
- must not be null.- Returns:
- lthe
Flux
emitting records one by one. - See Also:
-
range
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> range(Class<V> targetType, K key, Range<String> range) Read all records from a stream within a specificRange
.- Parameters:
targetType
- the target type of the payload.key
- the stream key.range
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
range
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> range(Class<V> targetType, K key, Range<String> range, Limit limit) - Parameters:
targetType
- the target type of the payload.key
- the stream key.range
- must not be null.limit
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
Read records from aStreamOffset
asObjectRecord
.- Parameters:
stream
- the stream to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, StreamOffset<K> stream) Read records from aStreamOffset
asObjectRecord
.- Parameters:
targetType
- the target type of the payload.stream
- the stream to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
Read records from one or moreStreamOffset
s.- Parameters:
streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s asObjectRecord
.- Parameters:
targetType
- the target type of the payload.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
reactor.core.publisher.Flux<MapRecord<K,HK, readHV>> (StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s.- Parameters:
readOptions
- read arguments.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s asObjectRecord
.- Parameters:
targetType
- the target type of the payload.readOptions
- read arguments.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
default reactor.core.publisher.Flux<MapRecord<K,HK, readHV>> (Consumer consumer, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group.- Parameters:
consumer
- consumer/group.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group asObjectRecord
.- Parameters:
targetType
- the target type of the payload.consumer
- consumer/group.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
reactor.core.publisher.Flux<MapRecord<K,HK, readHV>> (Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group.- Parameters:
consumer
- consumer/group.readOptions
- read arguments.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
read
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams) Read records from one or moreStreamOffset
s using a consumer group asObjectRecord
.- Parameters:
targetType
- the target type of the payload.consumer
- consumer/group.readOptions
- read arguments.streams
- the streams to read from.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
reverseRange
Read records from a stream within a specificRange
in reverse order.- Parameters:
key
- the stream key.range
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
reverseRange
reactor.core.publisher.Flux<MapRecord<K,HK, reverseRangeHV>> (K key, Range<String> range, Limit limit) - Parameters:
key
- the stream key.range
- must not be null.limit
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
reverseRange
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> reverseRange(Class<V> targetType, K key, Range<String> range) Read records from a stream within a specificRange
in reverse order asObjectRecord
.- Parameters:
targetType
- the target type of the payload.key
- the stream key.range
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
reverseRange
default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> reverseRange(Class<V> targetType, K key, Range<String> range, Limit limit) Read records from a stream within a specificRange
applying aLimit
in reverse order asObjectRecord
.- Parameters:
targetType
- the target type of the payload.key
- the stream key.range
- must not be null.limit
- must not be null.- Returns:
- the
Flux
emitting records one by one. - See Also:
-
trim
Trims the stream tocount
elements.- Parameters:
key
- the stream key.count
- length of the stream.- Returns:
- number of removed entries.
- See Also:
-
trim
Trims the stream tocount
elements.- Parameters:
key
- the stream key.count
- length of the stream.approximateTrimming
- the trimming must be performed in a approximated way in order to maximize performances.- Returns:
- number of removed entries.
- Since:
- 2.4
- See Also:
-
getHashMapper
Get theHashMapper
for a specific type.- Specified by:
getHashMapper
in interfaceHashMapperProvider<K,
HK> - Type Parameters:
V
-- Parameters:
targetType
- must not be null.- Returns:
- the
HashMapper
suitable for a given type;
-
map
Map records fromMapRecord
toObjectRecord
.- Parameters:
record
- the stream records to map.targetType
- the target type of the payload.- Returns:
- the mapped
ObjectRecord
. - Since:
- 2.x
-
deserializeRecord
Deserialize aByteBufferRecord
using the configured serialization context into aMapRecord
.- Parameters:
record
- the stream record to map.- Returns:
- deserialized
MapRecord
. - Since:
- 2.x
-