Interface ReactiveStreamOperations<K,HK,HV>
- All Superinterfaces:
HashMapperProvider<HK,HV>
Reactive Redis operations for Stream Commands.
- Since:
- 2.2
- Author:
- Mark Paluch, Christoph Strobl, Dengliming, Marcin Zielinski, John Blum, jinkshower
-
Method Summary
Modifier and Type, Method: Description
- acknowledge(@NonNull K key, @NonNull String group, @NonNull String @NonNull ... recordIds): Acknowledge one or more records as processed.
- acknowledge(@NonNull K key, @NonNull String group, @NonNull RecordId @NonNull ... recordIds): Acknowledge one or more records as processed.
- acknowledge(@NonNull String group, @NonNull Record<K, ?> record): Acknowledge the given record as processed.
- Append a record to the stream key.
- add(@NonNull K key, @NonNull Map<? extends HK, ? extends HV> content, @NonNull RedisStreamCommands.XAddOptions xAddOptions): Append a record to the stream key with the specified options.
- Append one or more records to the stream key.
- Append a record, backed by a Map holding the field/value pairs, to the stream.
- add(@NonNull MapRecord<K, ? extends HK, ? extends HV> record, @NonNull RedisStreamCommands.XAddOptions xAddOptions): Append a record, backed by a Map holding the field/value pairs, to the stream with the specified options.
- Append the record, backed by the given value, to the stream.
- add(@NonNull Record<K, ?> record, @NonNull RedisStreamCommands.XAddOptions xAddOptions): Append the record, backed by the given value, to the stream with the specified options.
- claim(@NonNull K key, @NonNull String consumerGroup, @NonNull String newOwner, @NonNull Duration minIdleTime, @NonNull RecordId @NonNull ... recordIds): Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument.
- claim(@NonNull K key, @NonNull String consumerGroup, @NonNull String newOwner, @NonNull RedisStreamCommands.XClaimOptions xClaimOptions): Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument.
- Obtain information about every consumer in a specific consumer group for the stream stored at the specified key.
- createGroup(@NonNull K key, @NonNull String group): Create a consumer group at the latest offset.
- createGroup(@NonNull K key, @NonNull ReadOffset readOffset, @NonNull String group): Create a consumer group.
- Removes the specified records from the stream.
- Removes the specified records from the stream.
- Removes a given Record from the stream.
- deleteConsumer(@NonNull K key, @NonNull Consumer consumer): Delete a consumer from a consumer group.
- deserializeRecord(@NonNull ByteBufferRecord record): Deserialize a ByteBufferRecord using the configured serialization context into a MapRecord.
- destroyGroup(@NonNull K key, @NonNull String group): Destroy a consumer group.
- <V> HashMapper<V, HK, HV> getHashMapper(@NonNull Class<V> targetType): Get the HashMapper for a specific type.
- Obtain information about consumer groups associated with the stream stored at the specified key.
- Obtain general information about the stream stored at the specified key.
- default <V> ObjectRecord<K, V>: Map records from MapRecord to ObjectRecord.
- Obtain the PendingMessagesSummary for a given consumer group.
- default Mono<PendingMessages>: Obtain detailed information about all pending messages for a given Consumer.
- Read records from a stream within a specific Range.
- default <V> Flux<ObjectRecord<K, V>>: Read all records from a stream within a specific Range.
- default <V> Flux<ObjectRecord<K, V>> range(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range, @NonNull Limit limit)
- default <V> Flux<ObjectRecord<K, V>> read(@NonNull Class<V> targetType, @NonNull Consumer consumer, @NonNull StreamOffset<K> @NonNull ... streams): Read records from one or more StreamOffsets using a consumer group as ObjectRecord.
- default <V> Flux<ObjectRecord<K, V>> read(@NonNull Class<V> targetType, @NonNull Consumer consumer, @NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K> @NonNull ... streams): Read records from one or more StreamOffsets using a consumer group as ObjectRecord.
- default <V> Flux<ObjectRecord<K, V>> read(@NonNull Class<V> targetType, @NonNull StreamOffset<K> stream): Read records from a StreamOffset as ObjectRecord.
- default <V> Flux<ObjectRecord<K, V>> read(@NonNull Class<V> targetType, @NonNull StreamOffset<K>... streams): Read records from one or more StreamOffsets as ObjectRecord.
- default <V> Flux<ObjectRecord<K, V>> read(@NonNull Class<V> targetType, @NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K> @NonNull ... streams): Read records from one or more StreamOffsets as ObjectRecord.
- read(@NonNull Consumer consumer, @NonNull StreamOffset<K> @NonNull ... streams): Read records from one or more StreamOffsets using a consumer group.
- read(@NonNull Consumer consumer, @NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K>... streams): Read records from one or more StreamOffsets using a consumer group.
- read(@NonNull StreamOffset<K> stream): Read records from a StreamOffset as ObjectRecord.
- read(@NonNull StreamOffset<K>... streams): Read records from one or more StreamOffsets.
- read(@NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K> @NonNull ... streams): Read records from one or more StreamOffsets.
- reverseRange(@NonNull K key, @NonNull Range<String> range): Read records from a stream within a specific Range in reverse order.
- reverseRange(@NonNull K key, @NonNull Range<String> range, @NonNull Limit limit)
- default <V> Flux<ObjectRecord<K, V>> reverseRange(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range): Read records from a stream within a specific Range in reverse order as ObjectRecord.
- default <V> Flux<ObjectRecord<K, V>> reverseRange(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range, @NonNull Limit limit): Read records from a stream within a specific Range applying a Limit in reverse order as ObjectRecord.
- Get the length of a stream.
- Trims the stream to count elements.
- Trims the stream to count elements.
-
Method Details
-
acknowledge
default Mono<Long> acknowledge(@NonNull K key, @NonNull String group, @NonNull String @NonNull ... recordIds)
Acknowledge one or more records as processed.
- Parameters:
- key - the stream key.
- group - name of the consumer group.
- recordIds - record IDs to acknowledge.
- Returns:
- the Mono emitting the number of acknowledged records.
-
acknowledge
Mono<Long> acknowledge(@NonNull K key, @NonNull String group, @NonNull RecordId @NonNull ... recordIds)
Acknowledge one or more records as processed.
- Parameters:
- key - the stream key.
- group - name of the consumer group.
- recordIds - record IDs to acknowledge.
- Returns:
- the Mono emitting the number of acknowledged records.
-
acknowledge
-
add
-
add
-
add
Mono<RecordId> add(@NonNull Record<K, ?> record, @NonNull RedisStreamCommands.XAddOptions xAddOptions)
Append the record, backed by the given value, to the stream with the specified options. The value will be hashed and serialized.
add
-
add
-
add
-
add
-
claim
default Flux<MapRecord<K,HK,HV>> claim(@NonNull K key, @NonNull String consumerGroup, @NonNull String newOwner, @NonNull Duration minIdleTime, @NonNull RecordId @NonNull ... recordIds)
Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument. The message is claimed only if its idle time (ms) is greater than the minimum idle time specified when calling XCLAIM.
- Parameters:
- key - key to the stream.
- consumerGroup - name of the consumer group.
- newOwner - name of the consumer claiming the message.
- minIdleTime - minimum idle time required for a message to be claimed.
- recordIds - record IDs to be claimed.
- Returns:
- Flux of claimed MapRecords.
-
claim
Flux<MapRecord<K,HK,HV>> claim(@NonNull K key, @NonNull String consumerGroup, @NonNull String newOwner, @NonNull RedisStreamCommands.XClaimOptions xClaimOptions)
Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument. The message is claimed only if its idle time (ms) is greater than the minimum idle time specified when calling XCLAIM.
- Parameters:
- key - key to the stream.
- consumerGroup - name of the consumer group.
- newOwner - name of the consumer claiming the message.
- xClaimOptions - additional parameters for the CLAIM call.
- Returns:
- a Flux of claimed MapRecords.
-
delete
Removes the specified records from the stream. Returns the number of records deleted, which may differ from the number of IDs passed if some IDs do not exist.
- Parameters:
- key - the stream key.
- recordIds - stream record IDs.
- Returns:
- the Mono emitting the number of removed records.
-
delete
-
delete
Removes the specified records from the stream. Returns the number of records deleted, which may differ from the number of IDs passed if some IDs do not exist.
- Parameters:
- key - the stream key.
- recordIds - stream record IDs.
- Returns:
- the Mono emitting the number of removed records.
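The return value described for delete can be pictured with a small in-memory model. This is only a sketch under stated assumptions: DeleteCountSketch and its methods are hypothetical names, it uses plain JDK collections rather than Spring Data Redis or Redis itself, and it only illustrates why the emitted count can be lower than the number of IDs passed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a stream; not Spring Data Redis code.
final class DeleteCountSketch {

    // Record IDs mapped to a payload, kept in insertion order.
    private final Map<String, String> records = new LinkedHashMap<>();

    void add(String recordId, String payload) {
        records.put(recordId, payload);
    }

    // Removes the given IDs and returns how many were actually present.
    long delete(String... recordIds) {
        long removed = 0;
        for (String id : recordIds) {
            if (records.remove(id) != null) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        DeleteCountSketch stream = new DeleteCountSketch();
        stream.add("1-0", "a");
        stream.add("2-0", "b");
        // "9-9" was never added, so only one record is removed.
        System.out.println(stream.delete("1-0", "9-9")); // prints 1
    }
}
```

In the reactive API the same count would arrive through the returned Mono rather than as a direct return value.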
-
createGroup
Create a consumer group at the latest offset. This command creates the stream if it does not already exist.
- Parameters:
- key - the key the stream is stored at.
- group - name of the consumer group.
- Returns:
- the Mono emitting OK if successful. null when used in pipeline / transaction.
-
createGroup
Create a consumer group. This command creates the stream if it does not already exist.
- Parameters:
- key - the key the stream is stored at.
- readOffset - the ReadOffset to apply.
- group - name of the consumer group.
- Returns:
- the Mono emitting OK if successful.
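The difference between creating a group at the latest offset and creating it with an explicit ReadOffset can be sketched with an in-memory model. The class GroupOffsetSketch and its methods are hypothetical and use only JDK collections. The sketch assumes that a group created at the latest offset receives only records appended afterwards, while a group created at the start of the stream also receives existing records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of consumer group offsets; not Spring Data Redis code.
final class GroupOffsetSketch {

    private final List<String> records = new ArrayList<>();
    // For each group, the index of the next record to deliver.
    private final Map<String, Integer> nextIndexByGroup = new HashMap<>();

    void add(String payload) {
        records.add(payload);
    }

    // The group starts after the current last record, so it only sees later appends.
    void createGroupAtLatest(String group) {
        nextIndexByGroup.put(group, records.size());
    }

    // The group starts at the beginning, so it also sees existing records.
    void createGroupFromStart(String group) {
        nextIndexByGroup.put(group, 0);
    }

    // Delivers every record the group has not seen yet and advances its offset.
    List<String> readNew(String group) {
        int from = nextIndexByGroup.get(group);
        List<String> delivered = new ArrayList<>(records.subList(from, records.size()));
        nextIndexByGroup.put(group, records.size());
        return delivered;
    }

    public static void main(String[] args) {
        GroupOffsetSketch stream = new GroupOffsetSketch();
        stream.add("old");
        stream.createGroupAtLatest("latest-group");
        stream.createGroupFromStart("replay-group");
        stream.add("new");
        System.out.println(stream.readNew("latest-group")); // prints [new]
        System.out.println(stream.readNew("replay-group")); // prints [old, new]
    }
}
```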
-
deleteConsumer
-
destroyGroup
-
consumers
Obtain information about every consumer in a specific consumer group for the stream stored at the specified key.
- Parameters:
- key - the key the stream is stored at.
- group - name of the consumer group.
- Returns:
- null when used in pipeline / transaction.
- Since:
- 2.3
-
groups
Obtain information about consumer groups associated with the stream stored at the specified key.
- Parameters:
- key - the key the stream is stored at.
- Returns:
- null when used in pipeline / transaction.
- Since:
- 2.3
-
info
Obtain general information about the stream stored at the specified key.
- Parameters:
- key - the key the stream is stored at.
- Returns:
- null when used in pipeline / transaction.
- Since:
- 2.3
-
pending
Obtain the PendingMessagesSummary for a given consumer group.
- Parameters:
- key - the key the stream is stored at. Must not be null.
- group - the name of the consumer group. Must not be null.
- Returns:
- a summary of pending messages within the given consumer group or null when used in pipeline / transaction.
- Since:
- 2.3
-
pending
Obtain detailed information about all pending messages for a given Consumer.
- Parameters:
- key - the key the stream is stored at. Must not be null.
- consumer - the consumer to fetch PendingMessages for. Must not be null.
- Returns:
- pending messages for the given Consumer or null when used in pipeline / transaction.
- Since:
- 2.3
-
pending
Mono<PendingMessages> pending(@NonNull K key, @NonNull String group, @NonNull Range<?> range, long count)
- Parameters:
- key - the key the stream is stored at. Must not be null.
- group - the name of the consumer group. Must not be null.
- range - the range of message IDs to search within. Must not be null.
- count - limit the number of results.
- Returns:
- pending messages for the given consumer group or null when used in pipeline / transaction.
- Since:
- 2.3
-
pending
Mono<PendingMessages> pending(@NonNull K key, @NonNull Consumer consumer, @NonNull Range<?> range, long count)
Obtain detailed information about pending messages for a given Range and Consumer within a consumer group.
- Parameters:
- key - the key the stream is stored at. Must not be null.
- consumer - the name of the Consumer. Must not be null.
- range - the range of message IDs to search within. Must not be null.
- count - limit the number of results.
- Returns:
- pending messages for the given Consumer or null when used in pipeline / transaction.
- Since:
- 2.3
-
size
-
range
-
range
- Parameters:
- key - the stream key.
- range - must not be null.
- limit - must not be null.
- Returns:
- the Flux emitting records one by one.
-
range
default <V> Flux<ObjectRecord<K,V>> range(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range)
Read all records from a stream within a specific Range.
- Parameters:
- targetType - the target type of the payload.
- key - the stream key.
- range - must not be null.
- Returns:
- the Flux emitting records one by one.
-
range
default <V> Flux<ObjectRecord<K,V>> range(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range, @NonNull Limit limit)
- Parameters:
- targetType - the target type of the payload.
- key - the stream key.
- range - must not be null.
- limit - must not be null.
- Returns:
- the Flux emitting records one by one.
-
read
Read records from a StreamOffset as ObjectRecord.
- Parameters:
- stream - the stream to read from.
- Returns:
- the Flux emitting records one by one.
-
read
default <V> Flux<ObjectRecord<K,V>> read(@NonNull Class<V> targetType, @NonNull StreamOffset<K> stream)
Read records from a StreamOffset as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- stream - the stream to read from.
- Returns:
- the Flux emitting records one by one.
-
read
Read records from one or more StreamOffsets.
- Parameters:
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
default <V> Flux<ObjectRecord<K,V>> read(@NonNull Class<V> targetType, @NonNull StreamOffset<K>... streams)
Read records from one or more StreamOffsets as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
Flux<MapRecord<K,HK,HV>> read(@NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K> @NonNull ... streams)
Read records from one or more StreamOffsets.
- Parameters:
- readOptions - read arguments.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
default <V> Flux<ObjectRecord<K,V>> read(@NonNull Class<V> targetType, @NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K> @NonNull ... streams)
Read records from one or more StreamOffsets as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- readOptions - read arguments.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
default Flux<MapRecord<K,HK,HV>> read(@NonNull Consumer consumer, @NonNull StreamOffset<K> @NonNull ... streams)
Read records from one or more StreamOffsets using a consumer group.
- Parameters:
- consumer - consumer/group.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
default <V> Flux<ObjectRecord<K,V>> read(@NonNull Class<V> targetType, @NonNull Consumer consumer, @NonNull StreamOffset<K> @NonNull ... streams)
Read records from one or more StreamOffsets using a consumer group as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- consumer - consumer/group.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
Flux<MapRecord<K,HK,HV>> read(@NonNull Consumer consumer, @NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K>... streams)
Read records from one or more StreamOffsets using a consumer group.
- Parameters:
- consumer - consumer/group.
- readOptions - read arguments.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
read
default <V> Flux<ObjectRecord<K,V>> read(@NonNull Class<V> targetType, @NonNull Consumer consumer, @NonNull StreamReadOptions readOptions, @NonNull StreamOffset<K> @NonNull ... streams)
Read records from one or more StreamOffsets using a consumer group as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- consumer - consumer/group.
- readOptions - read arguments.
- streams - the streams to read from.
- Returns:
- the Flux emitting records one by one.
-
reverseRange
-
reverseRange
-
reverseRange
default <V> Flux<ObjectRecord<K,V>> reverseRange(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range)
Read records from a stream within a specific Range in reverse order as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- key - the stream key.
- range - must not be null.
- Returns:
- the Flux emitting records one by one.
-
reverseRange
default <V> Flux<ObjectRecord<K,V>> reverseRange(@NonNull Class<V> targetType, @NonNull K key, @NonNull Range<String> range, @NonNull Limit limit)
Read records from a stream within a specific Range applying a Limit in reverse order as ObjectRecord.
- Parameters:
- targetType - the target type of the payload.
- key - the stream key.
- range - must not be null.
- limit - must not be null.
- Returns:
- the Flux emitting records one by one.
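How a Range and a Limit interact for range and reverseRange can be sketched with a sorted in-memory map. RangeLimitSketch and its methods are hypothetical names built on JDK collections only, not Spring Data Redis code. The sketch assumes record IDs of the form millis-sequence, inclusive range bounds, and a limit that simply caps the number of results.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of range reads; not Spring Data Redis code.
final class RangeLimitSketch {

    // Orders IDs of the form "<millis>-<sequence>" numerically, not lexically.
    static final Comparator<String> BY_ID = Comparator
            .comparingLong((String id) -> Long.parseLong(id.substring(0, id.indexOf('-'))))
            .thenComparingLong(id -> Long.parseLong(id.substring(id.indexOf('-') + 1)));

    private final TreeMap<String, String> records = new TreeMap<>(BY_ID);

    void add(String id, String payload) {
        records.put(id, payload);
    }

    // IDs between from and to (both inclusive), oldest first, capped at limit.
    List<String> range(String from, String to, int limit) {
        return take(records.subMap(from, true, to, true).keySet(), limit);
    }

    // Same bounds, newest first, capped at limit.
    List<String> reverseRange(String from, String to, int limit) {
        return take(records.subMap(from, true, to, true).descendingKeySet(), limit);
    }

    private static List<String> take(Iterable<String> ids, int limit) {
        List<String> out = new ArrayList<>();
        for (String id : ids) {
            if (out.size() == limit) {
                break;
            }
            out.add(id);
        }
        return out;
    }

    public static void main(String[] args) {
        RangeLimitSketch stream = new RangeLimitSketch();
        stream.add("1-0", "a");
        stream.add("1-1", "b");
        stream.add("2-0", "c");
        stream.add("10-0", "d");
        System.out.println(stream.range("1-0", "10-0", 3));        // prints [1-0, 1-1, 2-0]
        System.out.println(stream.reverseRange("1-0", "10-0", 2)); // prints [10-0, 2-0]
    }
}
```

The numeric comparator matters because a plain string comparison would sort "10-0" before "2-0".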
-
trim
-
trim
Trims the stream to count elements.
- Parameters:
- key - the stream key.
- count - length of the stream.
- approximateTrimming - the trimming must be performed in an approximated way in order to maximize performance.
- Returns:
- number of removed entries.
- Since:
- 2.4
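The relationship between count and the returned number of removed entries can be sketched with a simple queue. TrimSketch is a hypothetical JDK-only model, not Spring Data Redis code. It assumes that exact trimming evicts the oldest entries first, and it does not model approximateTrimming, which may leave more than count entries in place.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of exact trimming; not Spring Data Redis code.
final class TrimSketch {

    // Oldest record at the head, newest at the tail.
    private final Deque<String> records = new ArrayDeque<>();

    void add(String payload) {
        records.addLast(payload);
    }

    long size() {
        return records.size();
    }

    // Evicts the oldest entries until at most count remain; returns the number removed.
    long trim(long count) {
        long removed = 0;
        while (records.size() > count) {
            records.removeFirst();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        TrimSketch stream = new TrimSketch();
        for (int i = 0; i < 5; i++) {
            stream.add("record-" + i);
        }
        System.out.println(stream.trim(3)); // prints 2
        System.out.println(stream.size());  // prints 3
    }
}
```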
-
getHashMapper
Get the HashMapper for a specific type.
- Specified by:
- getHashMapper in interface HashMapperProvider<K,HK>
- Type Parameters:
- V -
- Parameters:
- targetType - must not be null.
- Returns:
- the HashMapper suitable for a given type.
-
map
Map records from MapRecord to ObjectRecord.
- Parameters:
- record - the stream records to map.
- targetType - the target type of the payload.
- Returns:
- the mapped ObjectRecord.
- Since:
- 2.x
-
deserializeRecord
Deserialize a ByteBufferRecord using the configured serialization context into a MapRecord.
- Parameters:
- record - the stream record to map.
- Returns:
- deserialized MapRecord.
- Since:
- 2.x
-