Interface ReactiveStreamOperations<K,HK,HV>

All Superinterfaces:
HashMapperProvider<HK,HV>

public interface ReactiveStreamOperations<K,HK,HV> extends HashMapperProvider<HK,HV>
Reactive Redis operations for Stream Commands.
Since:
2.2
Author:
Mark Paluch, Christoph Strobl, Dengliming, Marcin Zielinski, John Blum
  • Method Details

    • acknowledge

      default reactor.core.publisher.Mono<Long> acknowledge(K key, String group, String... recordIds)
      Acknowledge one or more records as processed.
      Parameters:
      key - the stream key.
      group - name of the consumer group.
      recordIds - record IDs to acknowledge.
      Returns:
      the Mono emitting the number of acknowledged records.
    • acknowledge

      reactor.core.publisher.Mono<Long> acknowledge(K key, String group, RecordId... recordIds)
      Acknowledge one or more records as processed.
      Parameters:
      key - the stream key.
      group - name of the consumer group.
      recordIds - record IDs to acknowledge.
      Returns:
      the Mono emitting the number of acknowledged records.
    • acknowledge

      default reactor.core.publisher.Mono<Long> acknowledge(String group, Record<K,?> record)
      Acknowledge the given record as processed.
      Parameters:
      group - name of the consumer group.
      record - the Record to acknowledge.
      Returns:
      the Mono emitting the number of acknowledged records.
    • add

      default reactor.core.publisher.Flux<RecordId> add(K key, org.reactivestreams.Publisher<? extends Map<? extends HK,? extends HV>> bodyPublisher)
      Append one or more records to the stream key.
      Parameters:
      key - the stream key.
      bodyPublisher - record body Publisher.
      Returns:
      the Flux emitting the RecordIds.
    • add

      default reactor.core.publisher.Mono<RecordId> add(K key, Map<? extends HK,? extends HV> content)
      Append a record to the stream key.
      Parameters:
      key - the stream key.
      content - record content as Map.
      Returns:
      the Mono emitting the RecordId.
    • add

      default reactor.core.publisher.Mono<RecordId> add(MapRecord<K,? extends HK,? extends HV> record)
      Append a record, backed by a Map holding the field/value pairs, to the stream.
      Parameters:
      record - the record to append.
      Returns:
      the Mono emitting the RecordId.
    • add

      reactor.core.publisher.Mono<RecordId> add(Record<K,?> record)
      Append the record, backed by the given value, to the stream. The value is mapped to a hash and serialized.
      Parameters:
      record - must not be null.
      Returns:
      the Mono emitting the RecordId.
    • claim

      default reactor.core.publisher.Flux<MapRecord<K,HK,HV>> claim(K key, String consumerGroup, String newOwner, Duration minIdleTime, RecordId... recordIds)
      Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument. The message is claimed only if its idle time (ms) is greater than the minimum idle time specified when calling XCLAIM.
      Parameters:
      key - the stream key.
      consumerGroup - name of the consumer group.
      newOwner - name of the consumer claiming the message.
      minIdleTime - minimum idle time required for a message to be claimed.
      recordIds - record IDs to be claimed.
      Returns:
      Flux of claimed MapRecords.
    • claim

      reactor.core.publisher.Flux<MapRecord<K,HK,HV>> claim(K key, String consumerGroup, String newOwner, RedisStreamCommands.XClaimOptions xClaimOptions)
      Changes the ownership of a pending message so that the new owner is the consumer specified as the command argument. The message is claimed only if its idle time (ms) is greater than the minimum idle time specified when calling XCLAIM.
      Parameters:
      key - the stream key.
      consumerGroup - name of the consumer group.
      newOwner - name of the consumer claiming the message.
      xClaimOptions - additional parameters for the XCLAIM call.
      Returns:
      a Flux of claimed MapRecords.
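      The idle-time rule described for claim can be illustrated with a small in-memory model. This is only a sketch and does not call Spring Data Redis or Redis: the ClaimSketch class, its Pending type, and the sample ids are hypothetical names introduced for illustration, and it assumes the comparison is strictly "greater than", as the description above states.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the claim rule; hypothetical, not Spring Data Redis code. */
final class ClaimSketch {

    /** One pending entry: its current owner and how long it has been idle. */
    static final class Pending {
        String owner;
        final Duration idle;

        Pending(String owner, Duration idle) {
            this.owner = owner;
            this.idle = idle;
        }
    }

    /**
     * Hands each requested record to newOwner when its idle time is strictly
     * greater than minIdleTime. Returns the claimed ids in request order.
     */
    static List<String> claim(Map<String, Pending> pending, String newOwner,
                              Duration minIdleTime, String... recordIds) {
        List<String> claimed = new ArrayList<>();
        for (String id : recordIds) {
            Pending entry = pending.get(id);
            // Unknown ids and entries that have not been idle long enough are skipped.
            if (entry != null && entry.idle.compareTo(minIdleTime) > 0) {
                entry.owner = newOwner;
                claimed.add(id);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        Map<String, Pending> pending = new LinkedHashMap<>();
        pending.put("1-0", new Pending("alice", Duration.ofSeconds(90)));
        pending.put("2-0", new Pending("alice", Duration.ofSeconds(5)));

        List<String> claimed = claim(pending, "bob", Duration.ofSeconds(30), "1-0", "2-0", "9-9");
        System.out.println(claimed);                  // prints [1-0]
        System.out.println(pending.get("2-0").owner); // prints alice
    }
}
```

      In this model only "1-0" changes owner: "2-0" has been idle for less than the minimum, and "9-9" is not pending at all.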
    • delete

      default reactor.core.publisher.Mono<Long> delete(K key, String... recordIds)
      Removes the specified records from the stream. Returns the number of records deleted, which may differ from the number of IDs passed if some IDs do not exist.
      Parameters:
      key - the stream key.
      recordIds - stream record IDs.
      Returns:
      the Mono emitting the number of removed records.
    • delete

      default reactor.core.publisher.Mono<Long> delete(Record<K,?> record)
      Removes a given Record from the stream.
      Parameters:
      record - must not be null.
      Returns:
      the Mono emitting the number of removed records.
    • delete

      reactor.core.publisher.Mono<Long> delete(K key, RecordId... recordIds)
      Removes the specified records from the stream. Returns the number of records deleted, which may differ from the number of IDs passed if some IDs do not exist.
      Parameters:
      key - the stream key.
      recordIds - stream record IDs.
      Returns:
      the Mono emitting the number of removed records.
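      The note that the returned count may differ from the number of IDs passed can be shown with a plain-Java model. This is a sketch only: DeleteSketch and its map-backed "stream" are hypothetical stand-ins, not part of Spring Data Redis.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory model of the delete count; hypothetical, not Spring Data Redis code. */
final class DeleteSketch {

    /** Removes the given ids and returns how many were actually present. */
    static long delete(Map<String, String> stream, String... recordIds) {
        long removed = 0;
        for (String id : recordIds) {
            if (stream.containsKey(id)) {
                stream.remove(id);
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, String> stream = new LinkedHashMap<>();
        stream.put("1-0", "first");
        stream.put("2-0", "second");

        // Three ids are passed, but only two of them exist in the stream.
        System.out.println(delete(stream, "1-0", "2-0", "7-7")); // prints 2
    }
}
```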
    • createGroup

      default reactor.core.publisher.Mono<String> createGroup(K key, String group)
      Create a consumer group at the latest offset. This command creates the stream if it does not already exist.
      Parameters:
      key - the key the stream is stored at.
      group - name of the consumer group.
      Returns:
      the Mono emitting OK if successful. null when used in pipeline / transaction.
    • createGroup

      reactor.core.publisher.Mono<String> createGroup(K key, ReadOffset readOffset, String group)
      Create a consumer group. This command creates the stream if it does not already exist.
      Parameters:
      key - the key the stream is stored at.
      readOffset - the ReadOffset to apply.
      group - name of the consumer group.
      Returns:
      the Mono emitting OK if successful.
    • deleteConsumer

      reactor.core.publisher.Mono<String> deleteConsumer(K key, Consumer consumer)
      Delete a consumer from a consumer group.
      Parameters:
      key - the stream key.
      consumer - consumer identified by group name and consumer key.
      Returns:
      the Mono emitting OK if successful. null when used in pipeline / transaction.
    • destroyGroup

      reactor.core.publisher.Mono<String> destroyGroup(K key, String group)
      Destroy a consumer group.
      Parameters:
      key - the stream key.
      group - name of the consumer group.
      Returns:
      the Mono emitting OK if successful. null when used in pipeline / transaction.
    • consumers

      reactor.core.publisher.Flux<StreamInfo.XInfoConsumer> consumers(K key, String group)
      Obtain information about every consumer in a specific consumer group for the stream stored at the specified key.
      Parameters:
      key - the key the stream is stored at.
      group - name of the consumer group.
      Returns:
      the Flux emitting information about each consumer, or null when used in pipeline / transaction.
      Since:
      2.3
    • groups

      reactor.core.publisher.Flux<StreamInfo.XInfoGroup> groups(K key)
      Obtain information about consumer groups associated with the stream stored at the specified key.
      Parameters:
      key - the key the stream is stored at.
      Returns:
      the Flux emitting information about each consumer group, or null when used in pipeline / transaction.
      Since:
      2.3
    • info

      reactor.core.publisher.Mono<StreamInfo.XInfoStream> info(K key)
      Obtain general information about the stream stored at the specified key.
      Parameters:
      key - the key the stream is stored at.
      Returns:
      the Mono emitting general information about the stream, or null when used in pipeline / transaction.
      Since:
      2.3
    • pending

      @Nullable reactor.core.publisher.Mono<PendingMessagesSummary> pending(K key, String group)
      Obtain the PendingMessagesSummary for a given consumer group.
      Parameters:
      key - the key the stream is stored at. Must not be null.
      group - the name of the consumer group. Must not be null.
      Returns:
      a summary of pending messages within the given consumer group or null when used in pipeline / transaction.
      Since:
      2.3
    • pending

      default reactor.core.publisher.Mono<PendingMessages> pending(K key, Consumer consumer)
      Obtain detailed information about all pending messages for a given Consumer.
      Parameters:
      key - the key the stream is stored at. Must not be null.
      consumer - the consumer to fetch PendingMessages for. Must not be null.
      Returns:
      pending messages for the given Consumer or null when used in pipeline / transaction.
      Since:
      2.3
    • pending

      reactor.core.publisher.Mono<PendingMessages> pending(K key, String group, Range<?> range, long count)
      Obtain detailed information about pending messages for a given Range within a consumer group.
      Parameters:
      key - the key the stream is stored at. Must not be null.
      group - the name of the consumer group. Must not be null.
      range - the range of message IDs to search within. Must not be null.
      count - limit the number of results.
      Returns:
      pending messages for the given consumer group or null when used in pipeline / transaction.
      Since:
      2.3
    • pending

      reactor.core.publisher.Mono<PendingMessages> pending(K key, Consumer consumer, Range<?> range, long count)
      Obtain detailed information about pending messages for a given Range and Consumer within a consumer group.
      Parameters:
      key - the key the stream is stored at. Must not be null.
      consumer - the name of the Consumer. Must not be null.
      range - the range of message IDs to search within. Must not be null.
      count - limit the number of results.
      Returns:
      pending messages for the given Consumer or null when used in pipeline / transaction.
      Since:
      2.3
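      How acknowledge and pending relate can be sketched with an in-memory model of a consumer group's pending entries. This is an illustration under stated assumptions, not Spring Data Redis code: PendingSketch, deliver, and pendingPerConsumer are hypothetical names, and the model assumes that a record becomes pending when it is delivered to a consumer and stops being pending once acknowledged.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of a group's pending entries; hypothetical, not Spring Data Redis code. */
final class PendingSketch {

    // record id -> consumer the record was last delivered to
    private final Map<String, String> pendingEntries = new LinkedHashMap<>();

    /** Models a group read: the delivered record becomes pending for the consumer. */
    void deliver(String consumer, String recordId) {
        pendingEntries.put(recordId, consumer);
    }

    /** Removes the given ids from the pending entries and returns how many were pending. */
    long acknowledge(String... recordIds) {
        long acknowledged = 0;
        for (String id : recordIds) {
            if (pendingEntries.remove(id) != null) {
                acknowledged++;
            }
        }
        return acknowledged;
    }

    /** Summarises the pending entries as a count per consumer. */
    Map<String, Long> pendingPerConsumer() {
        Map<String, Long> counts = new TreeMap<>();
        for (String consumer : pendingEntries.values()) {
            counts.merge(consumer, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        PendingSketch group = new PendingSketch();
        group.deliver("alice", "1-0");
        group.deliver("alice", "2-0");
        group.deliver("bob", "3-0");

        System.out.println(group.acknowledge("1-0", "7-7")); // prints 1
        System.out.println(group.pendingPerConsumer());      // prints {alice=1, bob=1}
    }
}
```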
    • size

      reactor.core.publisher.Mono<Long> size(K key)
      Get the length of a stream.
      Parameters:
      key - the stream key.
      Returns:
      the Mono emitting the length of the stream.
    • range

      default reactor.core.publisher.Flux<MapRecord<K,HK,HV>> range(K key, Range<String> range)
      Read records from a stream within a specific Range.
      Parameters:
      key - the stream key.
      range - must not be null.
      Returns:
      the Flux emitting records one by one.
    • range

      reactor.core.publisher.Flux<MapRecord<K,HK,HV>> range(K key, Range<String> range, Limit limit)
      Read records from a stream within a specific Range applying a Limit.
      Parameters:
      key - the stream key.
      range - must not be null.
      limit - must not be null.
      Returns:
      the Flux emitting records one by one.
    • range

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> range(Class<V> targetType, K key, Range<String> range)
      Read all records from a stream within a specific Range as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      key - the stream key.
      range - must not be null.
      Returns:
      the Flux emitting records one by one.
    • range

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> range(Class<V> targetType, K key, Range<String> range, Limit limit)
      Read records from a stream within a specific Range applying a Limit as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      key - the stream key.
      range - must not be null.
      limit - must not be null.
      Returns:
      the Flux emitting records one by one.
    • read

      default reactor.core.publisher.Flux<MapRecord<K,HK,HV>> read(StreamOffset<K> stream)
      Read records from a StreamOffset.
      Parameters:
      stream - the stream to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, StreamOffset<K> stream)
      Read records from a StreamOffset as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      stream - the stream to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default reactor.core.publisher.Flux<MapRecord<K,HK,HV>> read(StreamOffset<K>... streams)
      Read records from one or more StreamOffsets.
      Parameters:
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      reactor.core.publisher.Flux<MapRecord<K,HK,HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets.
      Parameters:
      readOptions - read arguments.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      readOptions - read arguments.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default reactor.core.publisher.Flux<MapRecord<K,HK,HV>> read(Consumer consumer, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets using a consumer group.
      Parameters:
      consumer - consumer/group.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets using a consumer group as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      consumer - consumer/group.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      reactor.core.publisher.Flux<MapRecord<K,HK,HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets using a consumer group.
      Parameters:
      consumer - consumer/group.
      readOptions - read arguments.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • read

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> read(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)
      Read records from one or more StreamOffsets using a consumer group as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      consumer - consumer/group.
      readOptions - read arguments.
      streams - the streams to read from.
      Returns:
      the Flux emitting records one by one.
    • reverseRange

      default reactor.core.publisher.Flux<MapRecord<K,HK,HV>> reverseRange(K key, Range<String> range)
      Read records from a stream within a specific Range in reverse order.
      Parameters:
      key - the stream key.
      range - must not be null.
      Returns:
      the Flux emitting records one by one.
    • reverseRange

      reactor.core.publisher.Flux<MapRecord<K,HK,HV>> reverseRange(K key, Range<String> range, Limit limit)
      Read records from a stream within a specific Range applying a Limit in reverse order.
      Parameters:
      key - the stream key.
      range - must not be null.
      limit - must not be null.
      Returns:
      the Flux emitting records one by one.
    • reverseRange

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> reverseRange(Class<V> targetType, K key, Range<String> range)
      Read records from a stream within a specific Range in reverse order as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      key - the stream key.
      range - must not be null.
      Returns:
      the Flux emitting records one by one.
    • reverseRange

      default <V> reactor.core.publisher.Flux<ObjectRecord<K,V>> reverseRange(Class<V> targetType, K key, Range<String> range, Limit limit)
      Read records from a stream within a specific Range applying a Limit in reverse order as ObjectRecord.
      Parameters:
      targetType - the target type of the payload.
      key - the stream key.
      range - must not be null.
      limit - must not be null.
      Returns:
      the Flux emitting records one by one.
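      The interplay of Range, Limit, and reverse order in range and reverseRange can be sketched with a sorted map. This is a plain-Java model, not Spring Data Redis code: RangeSketch and BY_RECORD_ID are hypothetical names, both range bounds are assumed to be inclusive, and record ids are assumed to follow the Redis "milliseconds-sequence" form, so they are compared numerically rather than as plain strings.

```java
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** In-memory model of range/reverseRange; hypothetical, not Spring Data Redis code. */
final class RangeSketch {

    /** Orders ids of the form "millis-sequence" by millis, then by sequence. */
    static final Comparator<String> BY_RECORD_ID = Comparator
            .comparingLong((String id) -> Long.parseLong(id.substring(0, id.indexOf('-'))))
            .thenComparingLong(id -> Long.parseLong(id.substring(id.indexOf('-') + 1)));

    /** Returns at most limit ids between from and to (both inclusive). */
    static List<String> range(NavigableMap<String, String> stream, String from, String to,
                              int limit, boolean reverse) {
        NavigableMap<String, String> window = stream.subMap(from, true, to, true);
        if (reverse) {
            // Reverse order starts at the upper bound and walks back.
            window = window.descendingMap();
        }
        return window.keySet().stream().limit(limit).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        NavigableMap<String, String> stream = new TreeMap<>(BY_RECORD_ID);
        stream.put("5-0", "a");
        stream.put("5-1", "b");
        stream.put("10-0", "c");
        stream.put("100-0", "d");

        System.out.println(range(stream, "5-1", "100-0", 2, false)); // prints [5-1, 10-0]
        System.out.println(range(stream, "5-1", "100-0", 2, true));  // prints [100-0, 10-0]
    }
}
```

      The limit is applied after the direction is chosen, which is why the forward and reverse calls return different records for the same range.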
    • trim

      reactor.core.publisher.Mono<Long> trim(K key, long count)
      Trims the stream to count elements.
      Parameters:
      key - the stream key.
      count - length of the stream.
      Returns:
      number of removed entries.
    • trim

      reactor.core.publisher.Mono<Long> trim(K key, long count, boolean approximateTrimming)
      Trims the stream to count elements.
      Parameters:
      key - the stream key.
      count - length of the stream.
      approximateTrimming - whether trimming is performed in an approximate way to maximize performance.
      Returns:
      number of removed entries.
      Since:
      2.4
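      The relationship between count and the returned number of removed entries can be sketched as follows. This is an in-memory model only: TrimSketch is a hypothetical name, the model assumes exact trimming that evicts the oldest entries first, and approximate trimming is not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** In-memory model of exact trimming; hypothetical, not Spring Data Redis code. */
final class TrimSketch {

    /** Evicts the oldest entries until at most count remain; returns how many were removed. */
    static long trim(Deque<String> stream, long count) {
        long removed = 0;
        while (stream.size() > count) {
            stream.pollFirst();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<String> stream = new ArrayDeque<>(List.of("1-0", "2-0", "3-0", "4-0", "5-0"));

        System.out.println(trim(stream, 3)); // prints 2
        System.out.println(stream);          // prints [3-0, 4-0, 5-0]
    }
}
```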
    • getHashMapper

      <V> HashMapper<V,HK,HV> getHashMapper(Class<V> targetType)
      Get the HashMapper for a specific type.
      Specified by:
      getHashMapper in interface HashMapperProvider<HK,HV>
      Type Parameters:
      V - the target type.
      Parameters:
      targetType - must not be null.
      Returns:
      the HashMapper suitable for a given type.
    • map

      default <V> ObjectRecord<K,V> map(MapRecord<K,HK,HV> record, Class<V> targetType)
      Map records from MapRecord to ObjectRecord.
      Parameters:
      record - the stream record to map.
      targetType - the target type of the payload.
      Returns:
      the mapped ObjectRecord.
      Since:
      2.x
    • deserializeRecord

      MapRecord<K,HK,HV> deserializeRecord(ByteBufferRecord record)
      Deserialize a ByteBufferRecord using the configured serialization context into a MapRecord.
      Parameters:
      record - the stream record to map.
      Returns:
      deserialized MapRecord.
      Since:
      2.x