Class ReactiveKafkaConsumerTemplate<K,V>

java.lang.Object
org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate<K,V>
Type Parameters:
K - the key type.
V - the value type.

public class ReactiveKafkaConsumerTemplate<K,V> extends Object
Reactive Kafka consumer operations implementation.
Since:
2.3.0
Author:
Mark Norkin, Adrian Chlebosz, Marcus Voltolim
  • Constructor Summary

    Constructors
    Constructor
    Description
    ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.KafkaReceiver<K,V> kafkaReceiver)
     
    ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.ReceiverOptions<K,V> receiverOptions)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
    assignment()

    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,Long>>
    beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)

    reactor.core.publisher.Mono<Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>>
    committed(Set<org.apache.kafka.common.TopicPartition> partitions)

    <T> reactor.core.publisher.Mono<T>
    doOnConsumer(Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)

    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,Long>>
    endOffsets(org.apache.kafka.common.TopicPartition... partitions)

    reactor.core.publisher.Flux<reactor.util.function.Tuple2<String,List<org.apache.kafka.common.PartitionInfo>>>
    listTopics()

    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>>
    metricsFromConsumer()

    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>>
    offsetsForTimes(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch)

    reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo>
    partitionsFromConsumerFor(String topic)

    reactor.core.publisher.Mono<Void>
    pause(org.apache.kafka.common.TopicPartition... partitions)

    reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
    paused()

    reactor.core.publisher.Mono<Long>
    position(org.apache.kafka.common.TopicPartition partition)

    reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>
    receive()

    reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>
    receiveAtMostOnce()

    reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>
    receiveAutoAck()

    reactor.core.publisher.Flux<reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>>
    receiveBatch()

    reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>>
    receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
    Returns a Flux of consumer record batches that may be used for exactly once delivery semantics.

    reactor.core.publisher.Mono<Void>
    resume(org.apache.kafka.common.TopicPartition... partitions)

    reactor.core.publisher.Mono<Void>
    seek(org.apache.kafka.common.TopicPartition partition, long offset)

    reactor.core.publisher.Mono<Void>
    seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)

    reactor.core.publisher.Mono<Void>
    seekToEnd(org.apache.kafka.common.TopicPartition... partitions)

    reactor.core.publisher.Flux<String>
    subscription()

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • ReactiveKafkaConsumerTemplate

      public ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.ReceiverOptions<K,V> receiverOptions)
    • ReactiveKafkaConsumerTemplate

      public ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.KafkaReceiver<K,V> kafkaReceiver)
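A template is typically constructed from ReceiverOptions. A minimal sketch, in which the bootstrap server address, group id, topic name, and String deserializers are illustrative assumptions rather than values from this documentation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import reactor.kafka.receiver.ReceiverOptions;

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// Subscribe to an assumed topic and build the template from the options.
ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
        .subscription(Collections.singleton("demo-topic"));

ReactiveKafkaConsumerTemplate<String, String> template =
        new ReactiveKafkaConsumerTemplate<>(options);
```

The second constructor accepts a pre-built reactor.kafka.receiver.KafkaReceiver instead, for cases where the receiver is configured elsewhere.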
  • Method Details

    • receive

      public reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>> receive()
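receive() emits ReceiverRecord instances whose offsets the application acknowledges (or commits) explicitly. A minimal sketch, assuming a ReactiveKafkaConsumerTemplate<String, String> named template has already been constructed:

```java
import reactor.kafka.receiver.ReceiverRecord;

template.receive()
        .doOnNext((ReceiverRecord<String, String> record) -> {
            System.out.println(record.key() + " -> " + record.value());
            // Mark this record's offset as processed so it becomes eligible for commit.
            record.receiverOffset().acknowledge();
        })
        .subscribe();
```

If explicit acknowledgement is not needed, receiveAutoAck() below delivers plain ConsumerRecords with offsets committed automatically.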
    • receiveBatch

      public reactor.core.publisher.Flux<reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>> receiveBatch()
    • receiveAutoAck

      public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAutoAck()
    • receiveAtMostOnce

      public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAtMostOnce()
    • receiveExactlyOnce

      public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
      Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. A new transaction is started for each inner Flux, and it is the responsibility of the consuming application to commit or abort the transaction using TransactionManager.commit() or TransactionManager.abort() after processing the Flux. The next batch of consumer records is delivered only after the previous Flux terminates. Offsets of records dispatched on each inner Flux are committed using the provided transactionManager within the transaction started for that Flux.

      Example usage:

       
       KafkaSender<Integer, Person> sender = sender(senderOptions());
       ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic));
       KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions);
       receiver.receiveExactlyOnce(sender.transactionManager())
               .concatMap(f -> sendAndCommit(f))
               .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e)))
               .doOnCancel(() -> close());

       Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) {
               return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key())))
                       .concatWith(sender.transactionManager().commit());
       }
       
      Parameters:
      transactionManager - Transaction manager used to begin new transaction for each inner Flux and commit offsets within that transaction
      Returns:
      Flux of consumer record batches processed within a transaction
    • doOnConsumer

      public <T> reactor.core.publisher.Mono<T> doOnConsumer(Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
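doOnConsumer is the escape hatch for Consumer operations the template does not wrap: the given function is applied to the underlying Consumer and its result is returned as a Mono. A sketch, assuming an existing template instance:

```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

import reactor.core.publisher.Mono;

// Apply an arbitrary function to the underlying Consumer; here, read its assignment.
Mono<Set<TopicPartition>> assigned =
        template.doOnConsumer(consumer -> consumer.assignment());

assigned.subscribe(parts -> System.out.println("assigned: " + parts));
```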
    • assignment

      public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()
    • subscription

      public reactor.core.publisher.Flux<String> subscription()
    • seek

      public reactor.core.publisher.Mono<Void> seek(org.apache.kafka.common.TopicPartition partition, long offset)
    • seekToBeginning

      public reactor.core.publisher.Mono<Void> seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)
    • seekToEnd

      public reactor.core.publisher.Mono<Void> seekToEnd(org.apache.kafka.common.TopicPartition... partitions)
    • position

      public reactor.core.publisher.Mono<Long> position(org.apache.kafka.common.TopicPartition partition)
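The seek family and position compose naturally in a reactive chain. A sketch, assuming an existing template and an assumed topic/partition:

```java
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("demo-topic", 0); // assumed topic and partition

template.seek(tp, 42L)                  // move the consumer to offset 42
        .then(template.position(tp))    // then read back the current position
        .subscribe(pos -> System.out.println("position: " + pos));
```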
    • committed

      public reactor.core.publisher.Mono<Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>> committed(Set<org.apache.kafka.common.TopicPartition> partitions)
    • partitionsFromConsumerFor

      public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor(String topic)
    • paused

      public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()
    • pause

      public reactor.core.publisher.Mono<Void> pause(org.apache.kafka.common.TopicPartition... partitions)
    • resume

      public reactor.core.publisher.Mono<Void> resume(org.apache.kafka.common.TopicPartition... partitions)
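pause and resume can be used to throttle consumption from specific partitions without leaving the consumer group. A sketch, assuming an existing template and an assumed topic/partition:

```java
import java.time.Duration;

import org.apache.kafka.common.TopicPartition;

import reactor.core.publisher.Mono;

TopicPartition tp = new TopicPartition("demo-topic", 0); // assumed topic and partition

template.pause(tp)                            // stop fetching from this partition
        .then(Mono.delay(Duration.ofSeconds(5)))
        .then(template.resume(tp))            // resume fetching after the pause
        .subscribe();
```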
    • metricsFromConsumer

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metricsFromConsumer()
    • listTopics

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<String,List<org.apache.kafka.common.PartitionInfo>>> listTopics()
    • offsetsForTimes

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch)
    • beginningOffsets

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,Long>> beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)
    • endOffsets

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,Long>> endOffsets(org.apache.kafka.common.TopicPartition... partitions)
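endOffsets combines with position to estimate consumer lag for a partition. A sketch, assuming an existing template and an assumed topic/partition:

```java
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("demo-topic", 0); // assumed topic and partition

// Lag = latest available offset minus the consumer's current position.
template.endOffsets(tp)
        .next()                                   // Tuple2<TopicPartition, endOffset>
        .zipWith(template.position(tp))           // pair with the current position
        .map(t -> t.getT1().getT2() - t.getT2())  // endOffset - position
        .subscribe(lag -> System.out.println("lag: " + lag));
```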