Class ReactiveKafkaConsumerTemplate<K,V>

java.lang.Object
  org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate<K,V>

Type Parameters:
K - the key type.
V - the value type.

public class ReactiveKafkaConsumerTemplate<K,V> extends java.lang.Object

Reactive Kafka consumer operations implementation.

Since:
2.3.0
Author:
Mark Norkin
Constructor Summary

Constructors
ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.ReceiverOptions<K,V> receiverOptions)
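The constructor takes fully configured reactor-kafka ReceiverOptions. A minimal construction sketch, assuming String keys and values; the bootstrap address, group id, and topic name below are placeholders, not values from this documentation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import reactor.kafka.receiver.ReceiverOptions;

// Placeholder configuration; adjust for your environment.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// The subscription must be set on the ReceiverOptions before constructing
// the template; the template itself exposes no subscribe method.
ReceiverOptions<String, String> receiverOptions =
        ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("my-topic"));

ReactiveKafkaConsumerTemplate<String, String> template =
        new ReactiveKafkaConsumerTemplate<>(receiverOptions);
```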
-
Method Summary

All Methods | Instance Methods | Concrete Methods

Modifier and Type / Method / Description

reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
assignment()
reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>>
beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)
reactor.core.publisher.Mono<java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>>
committed(java.util.Set<org.apache.kafka.common.TopicPartition> partitions)
<T> reactor.core.publisher.Mono<T>
doOnConsumer(java.util.function.Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>>
endOffsets(org.apache.kafka.common.TopicPartition... partitions)
reactor.core.publisher.Flux<reactor.util.function.Tuple2<java.lang.String,java.util.List<org.apache.kafka.common.PartitionInfo>>>
listTopics()
reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>>
metricsFromConsumer()
reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>>
offsetsForTimes(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> timestampsToSearch)
reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo>
partitionsFromConsumerFor(java.lang.String topic)
reactor.core.publisher.Mono<java.lang.Void>
pause(org.apache.kafka.common.TopicPartition... partitions)
reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
paused()
reactor.core.publisher.Mono<java.lang.Long>
position(org.apache.kafka.common.TopicPartition partition)
reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>
receive()
reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>
receiveAtMostOnce()
reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>
receiveAutoAck()
reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>>
receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
Returns a Flux of consumer record batches that may be used for exactly-once delivery semantics.
reactor.core.publisher.Mono<java.lang.Void>
resume(org.apache.kafka.common.TopicPartition... partitions)
reactor.core.publisher.Mono<java.lang.Void>
seek(org.apache.kafka.common.TopicPartition partition, long offset)
reactor.core.publisher.Mono<java.lang.Void>
seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)
reactor.core.publisher.Mono<java.lang.Void>
seekToEnd(org.apache.kafka.common.TopicPartition... partitions)
reactor.core.publisher.Flux<java.lang.String>
subscription()
-
-
-
Method Detail
-
receiveAutoAck
public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAutoAck()
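A minimal consumption sketch, assuming "template" is an already constructed ReactiveKafkaConsumerTemplate<String, String> whose ReceiverOptions carry a topic subscription. With receiveAutoAck(), record offsets are acknowledged automatically by the underlying reactor-kafka receiver, so no manual offset handling appears in the pipeline:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import reactor.core.Disposable;

// Names here ("template") are illustrative assumptions, not part of the API.
Disposable subscription = template.receiveAutoAck()
        .doOnNext((ConsumerRecord<String, String> record) ->
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        record.partition(), record.offset(),
                        record.key(), record.value()))
        .doOnError(e -> System.err.println("consume failed: " + e.getMessage()))
        .subscribe();

// Later, stop consuming:
// subscription.dispose();
```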
-
receiveAtMostOnce
public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAtMostOnce()
-
receiveExactlyOnce
public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
Returns a Flux of consumer record batches that may be used for exactly-once delivery semantics. A new transaction is started for each inner Flux, and it is the responsibility of the consuming application to commit or abort the transaction using TransactionManager.commit() or TransactionManager.abort() after processing the Flux. The next batch of consumer records will be delivered only after the previous Flux terminates. Offsets of records dispatched on each inner Flux are committed using the provided transactionManager within the transaction started for that Flux.

Example usage:

 KafkaSender<Integer, Person> sender = sender(senderOptions());
 ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic));
 KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions);
 receiver.receiveExactlyOnce(sender.transactionManager())
         .concatMap(f -> sendAndCommit(f))
         .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e)))
         .doOnCancel(() -> close());

 Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) {
     return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key())))
                  .concatWith(sender.transactionManager().commit());
 }
Parameters:
transactionManager - Transaction manager used to begin a new transaction for each inner Flux and to commit offsets within that transaction
Returns:
Flux of consumer record batches processed within a transaction
-
doOnConsumer
public <T> reactor.core.publisher.Mono<T> doOnConsumer(java.util.function.Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
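doOnConsumer(...) gives reactive access to the underlying Consumer for operations not wrapped by a dedicated template method. A hedged sketch, assuming an already constructed instance named "template"; it reads the locally assigned partition count, which needs no broker round-trip:

```java
import reactor.core.publisher.Mono;

// The function runs against the underlying KafkaConsumer. Consumer.assignment()
// is a purely local call, so this particular example does not contact the broker.
Mono<Integer> assignedPartitions =
        template.doOnConsumer(consumer -> consumer.assignment().size());

assignedPartitions.subscribe(count ->
        System.out.println("currently assigned partitions: " + count));
```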
-
assignment
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()
-
subscription
public reactor.core.publisher.Flux<java.lang.String> subscription()
-
seek
public reactor.core.publisher.Mono<java.lang.Void> seek(org.apache.kafka.common.TopicPartition partition, long offset)
-
seekToBeginning
public reactor.core.publisher.Mono<java.lang.Void> seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)
-
seekToEnd
public reactor.core.publisher.Mono<java.lang.Void> seekToEnd(org.apache.kafka.common.TopicPartition... partitions)
-
position
public reactor.core.publisher.Mono<java.lang.Long> position(org.apache.kafka.common.TopicPartition partition)
-
committed
public reactor.core.publisher.Mono<java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>> committed(java.util.Set<org.apache.kafka.common.TopicPartition> partitions)
-
partitionsFromConsumerFor
public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor(java.lang.String topic)
-
paused
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()
-
pause
public reactor.core.publisher.Mono<java.lang.Void> pause(org.apache.kafka.common.TopicPartition... partitions)
-
resume
public reactor.core.publisher.Mono<java.lang.Void> resume(org.apache.kafka.common.TopicPartition... partitions)
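pause(...) and resume(...) can be combined for simple flow control. A sketch that pauses fetching from one partition and resumes it after a delay; the topic, partition, and delay are illustrative, and "template" is an assumed, already constructed instance:

```java
import java.time.Duration;

import org.apache.kafka.common.TopicPartition;

import reactor.core.publisher.Mono;

TopicPartition tp = new TopicPartition("my-topic", 0);

// Stop fetching from the partition, wait 30 seconds, then resume fetching.
// The partition stays assigned while paused; only record delivery is suspended.
template.pause(tp)
        .then(Mono.delay(Duration.ofSeconds(30)))
        .then(template.resume(tp))
        .subscribe();
```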
-
metricsFromConsumer
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metricsFromConsumer()
-
listTopics
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<java.lang.String,java.util.List<org.apache.kafka.common.PartitionInfo>>> listTopics()
-
offsetsForTimes
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> timestampsToSearch)
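offsetsForTimes(...) pairs naturally with seek(...) for timestamp-based replay. A sketch that rewinds one partition to the earliest offset at or after a timestamp one hour back, assuming a matching offset exists; the topic and timestamp are illustrative, and "template" is an assumed, already constructed instance:

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("my-topic", 0);
long oneHourAgo = System.currentTimeMillis() - Duration.ofHours(1).toMillis();

// For each (partition, OffsetAndTimestamp) tuple emitted, seek that partition
// to the returned offset, so consumption replays from roughly one hour back.
template.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo))
        .concatMap(t -> template.seek(t.getT1(), t.getT2().offset()))
        .subscribe();
```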
-
beginningOffsets
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>> beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)
-
endOffsets
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>> endOffsets(org.apache.kafka.common.TopicPartition... partitions)
-
-