Class ReactiveKafkaConsumerTemplate<K,​V>

  • Type Parameters:
    K - the key type.
    V - the value type.

    public class ReactiveKafkaConsumerTemplate<K,​V>
    extends java.lang.Object
    Reactive kafka consumer operations implementation.
    Since:
    2.3.0
    Author:
    Mark Norkin
    • Constructor Summary

      Constructors 
      Constructor Description
      ReactiveKafkaConsumerTemplate​(reactor.kafka.receiver.ReceiverOptions<K,​V> receiverOptions)  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()  
      reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,​java.lang.Long>> beginningOffsets​(org.apache.kafka.common.TopicPartition... partitions)  
      reactor.core.publisher.Mono<java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata>> committed​(java.util.Set<org.apache.kafka.common.TopicPartition> partitions)  
      <T> reactor.core.publisher.Mono<T> doOnConsumer​(java.util.function.Function<org.apache.kafka.clients.consumer.Consumer<K,​V>,​? extends T> function)  
      reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,​java.lang.Long>> endOffsets​(org.apache.kafka.common.TopicPartition... partitions)  
      reactor.core.publisher.Flux<reactor.util.function.Tuple2<java.lang.String,​java.util.List<org.apache.kafka.common.PartitionInfo>>> listTopics()  
      reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,​? extends org.apache.kafka.common.Metric>> metricsFromConsumer()  
      reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes​(java.util.Map<org.apache.kafka.common.TopicPartition,​java.lang.Long> timestampsToSearch)  
      reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor​(java.lang.String topic)  
      reactor.core.publisher.Mono<java.lang.Void> pause​(org.apache.kafka.common.TopicPartition... partitions)  
      reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()  
      reactor.core.publisher.Mono<java.lang.Long> position​(org.apache.kafka.common.TopicPartition partition)  
      reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,​V>> receive()  
      reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,​V>> receiveAtMostOnce()  
      reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,​V>> receiveAutoAck()  
      reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,​V>>> receiveExactlyOnce​(reactor.kafka.sender.TransactionManager transactionManager)
      Returns a Flux of consumer record batches that may be used for exactly once delivery semantics.
      reactor.core.publisher.Mono<java.lang.Void> resume​(org.apache.kafka.common.TopicPartition... partitions)  
      reactor.core.publisher.Mono<java.lang.Void> seek​(org.apache.kafka.common.TopicPartition partition, long offset)  
      reactor.core.publisher.Mono<java.lang.Void> seekToBeginning​(org.apache.kafka.common.TopicPartition... partitions)  
      reactor.core.publisher.Mono<java.lang.Void> seekToEnd​(org.apache.kafka.common.TopicPartition... partitions)  
      reactor.core.publisher.Flux<java.lang.String> subscription()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • ReactiveKafkaConsumerTemplate

        public ReactiveKafkaConsumerTemplate​(reactor.kafka.receiver.ReceiverOptions<K,​V> receiverOptions)
    • Method Detail

      • receive

        public reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,​V>> receive()
      • receiveAutoAck

        public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,​V>> receiveAutoAck()
      • receiveAtMostOnce

        public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,​V>> receiveAtMostOnce()
      • receiveExactlyOnce

        public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,​V>>> receiveExactlyOnce​(reactor.kafka.sender.TransactionManager transactionManager)
        Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. A new transaction is started for each inner Flux and it is the responsibility of the consuming application to commit or abort the transaction using TransactionManager.commit() or TransactionManager.abort() after processing the Flux. The next batch of consumer records will be delivered only after the previous flux terminates. Offsets of records dispatched on each inner Flux are committed using the provided transactionManager within the transaction started for that Flux.

        Example usage:

         
         KafkaSender<Integer, Person> sender = sender(senderOptions());
         ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic));
         KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions);
         receiver.receiveExactlyOnce(sender.transactionManager())
         	 .concatMap(f -> sendAndCommit(f))
        	 .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e)))
        	 .doOnCancel(() -> close());
        
         Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) {
         	return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key())))
        			.concatWith(sender.transactionManager().commit());
         }
         
         
        Parameters:
        transactionManager - Transaction manager used to begin new transaction for each inner Flux and commit offsets within that transaction
        Returns:
        Flux of consumer record batches processed within a transaction
      • doOnConsumer

        public <T> reactor.core.publisher.Mono<T> doOnConsumer​(java.util.function.Function<org.apache.kafka.clients.consumer.Consumer<K,​V>,​? extends T> function)
      • assignment

        public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()
      • subscription

        public reactor.core.publisher.Flux<java.lang.String> subscription()
      • seek

        public reactor.core.publisher.Mono<java.lang.Void> seek​(org.apache.kafka.common.TopicPartition partition,
                                                                long offset)
      • seekToBeginning

        public reactor.core.publisher.Mono<java.lang.Void> seekToBeginning​(org.apache.kafka.common.TopicPartition... partitions)
      • seekToEnd

        public reactor.core.publisher.Mono<java.lang.Void> seekToEnd​(org.apache.kafka.common.TopicPartition... partitions)
      • position

        public reactor.core.publisher.Mono<java.lang.Long> position​(org.apache.kafka.common.TopicPartition partition)
      • committed

        public reactor.core.publisher.Mono<java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata>> committed​(java.util.Set<org.apache.kafka.common.TopicPartition> partitions)
      • partitionsFromConsumerFor

        public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor​(java.lang.String topic)
      • paused

        public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()
      • pause

        public reactor.core.publisher.Mono<java.lang.Void> pause​(org.apache.kafka.common.TopicPartition... partitions)
      • resume

        public reactor.core.publisher.Mono<java.lang.Void> resume​(org.apache.kafka.common.TopicPartition... partitions)
      • metricsFromConsumer

        public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,​? extends org.apache.kafka.common.Metric>> metricsFromConsumer()
      • listTopics

        public reactor.core.publisher.Flux<reactor.util.function.Tuple2<java.lang.String,​java.util.List<org.apache.kafka.common.PartitionInfo>>> listTopics()
      • offsetsForTimes

        public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes​(java.util.Map<org.apache.kafka.common.TopicPartition,​java.lang.Long> timestampsToSearch)
      • beginningOffsets

        public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,​java.lang.Long>> beginningOffsets​(org.apache.kafka.common.TopicPartition... partitions)
      • endOffsets

        public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,​java.lang.Long>> endOffsets​(org.apache.kafka.common.TopicPartition... partitions)