Class ReactiveKafkaConsumerTemplate<K,V>
java.lang.Object
org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate<K,V>
- Type Parameters:
K
- the key type.V
- the value type.
Reactive kafka consumer operations implementation.
- Since:
- 2.3.0
- Author:
- Mark Norkin, Adrian Chlebosz, Marcus Voltolim
-
Constructor Summary
ConstructorDescriptionReactiveKafkaConsumerTemplate
(reactor.kafka.receiver.KafkaReceiver<K, V> kafkaReceiver) ReactiveKafkaConsumerTemplate
(reactor.kafka.receiver.ReceiverOptions<K, V> receiverOptions) -
Method Summary
Modifier and TypeMethodDescriptionreactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,
Long>> beginningOffsets
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Mono<Map<org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndMetadata>> <T> reactor.core.publisher.Mono<T>
doOnConsumer
(Function<org.apache.kafka.clients.consumer.Consumer<K, V>, ? extends T> function) reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,
Long>> endOffsets
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Flux<reactor.util.function.Tuple2<String,
List<org.apache.kafka.common.PartitionInfo>>> reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,
? extends org.apache.kafka.common.Metric>> reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes
(Map<org.apache.kafka.common.TopicPartition, Long> timestampsToSearch) reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo>
partitionsFromConsumerFor
(String topic) reactor.core.publisher.Mono<Void>
pause
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
paused()
reactor.core.publisher.Mono<Long>
position
(org.apache.kafka.common.TopicPartition partition) receive()
reactor.core.publisher.Flux<reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,
V>>> reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,
V>>> receiveExactlyOnce
(reactor.kafka.sender.TransactionManager transactionManager) Returns aFlux
of consumer record batches that may be used for exactly once delivery semantics.reactor.core.publisher.Mono<Void>
resume
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Mono<Void>
seek
(org.apache.kafka.common.TopicPartition partition, long offset) reactor.core.publisher.Mono<Void>
seekToBeginning
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Mono<Void>
seekToEnd
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Flux<String>
-
Constructor Details
-
ReactiveKafkaConsumerTemplate
-
ReactiveKafkaConsumerTemplate
-
-
Method Details
-
receive
-
receiveBatch
-
receiveAutoAck
-
receiveAtMostOnce
-
receiveExactlyOnce
public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager) Returns aFlux
of consumer record batches that may be used for exactly once delivery semantics. A new transaction is started for each inner Flux and it is the responsibility of the consuming application to commit or abort the transaction usingTransactionManager.commit()
orTransactionManager.abort()
after processing the Flux. The next batch of consumer records will be delivered only after the previous flux terminates. Offsets of records dispatched on each inner Flux are committed using the providedtransactionManager
within the transaction started for that Flux.Example usage:
KafkaSender<Integer, Person> sender = sender(senderOptions()); ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic)); KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions); receiver.receiveExactlyOnce(sender.transactionManager()) .concatMap(f -> sendAndCommit(f)) .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e))) .doOnCancel(() -> close()); Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) { return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key()))) .concatWith(sender.transactionManager().commit()); }
- Parameters:
transactionManager
- Transaction manager used to begin new transaction for each inner Flux and commit offsets within that transaction- Returns:
- Flux of consumer record batches processed within a transaction
-
doOnConsumer
-
assignment
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment() -
subscription
-
seek
public reactor.core.publisher.Mono<Void> seek(org.apache.kafka.common.TopicPartition partition, long offset) -
seekToBeginning
public reactor.core.publisher.Mono<Void> seekToBeginning(org.apache.kafka.common.TopicPartition... partitions) -
seekToEnd
public reactor.core.publisher.Mono<Void> seekToEnd(org.apache.kafka.common.TopicPartition... partitions) -
position
-
committed
-
partitionsFromConsumerFor
public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor(String topic) -
paused
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused() -
pause
public reactor.core.publisher.Mono<Void> pause(org.apache.kafka.common.TopicPartition... partitions) -
resume
public reactor.core.publisher.Mono<Void> resume(org.apache.kafka.common.TopicPartition... partitions) -
metricsFromConsumer
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metricsFromConsumer() -
listTopics
-
offsetsForTimes
-
beginningOffsets
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,Long>> beginningOffsets(org.apache.kafka.common.TopicPartition... partitions) -
endOffsets
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,Long>> endOffsets(org.apache.kafka.common.TopicPartition... partitions)
-