Type Parameters:
K - the key type.
V - the value type.

public class ReactiveKafkaConsumerTemplate<K,V>
extends java.lang.Object
| Constructor and Description |
|---|
| ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.ReceiverOptions<K,V> receiverOptions) |
| Modifier and Type | Method and Description |
|---|---|
| reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> | assignment() |
| reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>> | beginningOffsets(org.apache.kafka.common.TopicPartition... partitions) |
| reactor.core.publisher.Mono<org.apache.kafka.clients.consumer.OffsetAndMetadata> | committed(org.apache.kafka.common.TopicPartition partition) |
| <T> reactor.core.publisher.Mono<T> | doOnConsumer(java.util.function.Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function) |
| reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>> | endOffsets(org.apache.kafka.common.TopicPartition... partitions) |
| reactor.core.publisher.Flux<reactor.util.function.Tuple2<java.lang.String,java.util.List<org.apache.kafka.common.PartitionInfo>>> | listTopics() |
| reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> | metricsFromConsumer() |
| reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>> | offsetsForTimes(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> timestampsToSearch) |
| reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> | partitionsFromConsumerFor(java.lang.String topic) |
| reactor.core.publisher.Mono<java.lang.Void> | pause(org.apache.kafka.common.TopicPartition... partitions) |
| reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> | paused() |
| reactor.core.publisher.Mono<java.lang.Long> | position(org.apache.kafka.common.TopicPartition partition) |
| reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>> | receive() |
| reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> | receiveAtMostOnce() |
| reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> | receiveAutoAck() |
| reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager) - Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. |
| reactor.core.publisher.Mono<java.lang.Void> | resume(org.apache.kafka.common.TopicPartition... partitions) |
| reactor.core.publisher.Mono<java.lang.Void> | seek(org.apache.kafka.common.TopicPartition partition, long offset) |
| reactor.core.publisher.Mono<java.lang.Void> | seekToBeginning(org.apache.kafka.common.TopicPartition... partitions) |
| reactor.core.publisher.Mono<java.lang.Void> | seekToEnd(org.apache.kafka.common.TopicPartition... partitions) |
| reactor.core.publisher.Flux<java.lang.String> | subscription() |
public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAutoAck()
public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAtMostOnce()
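For context, a minimal end-to-end consumption sketch. The broker address, group id, and topic name are hypothetical placeholders; adjust them to your environment. With receiveAutoAck() records are acknowledged automatically as batches are emitted, whereas receiveAtMostOnce() commits each offset before emitting the record, so a failure can skip records but never redelivers them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

public class AutoAckExample {
    public static void main(String[] args) {
        // Hypothetical connection settings; adjust to your environment.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("demo-topic"));

        ReactiveKafkaConsumerTemplate<String, String> template =
                new ReactiveKafkaConsumerTemplate<>(options);

        // Each ConsumerRecord is acknowledged automatically once its batch
        // has been emitted downstream; no explicit commit call is needed.
        template.receiveAutoAck()
                .doOnNext(r -> System.out.println(r.key() + " -> " + r.value()))
                .subscribe();
    }
}
```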
public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
Returns a Flux of consumer record batches that may be used for exactly once delivery semantics. A new transaction is started for each inner Flux, and it is the responsibility of the consuming application to commit or abort the transaction using TransactionManager.commit() or TransactionManager.abort() after processing the Flux. The next batch of consumer records will be delivered only after the previous Flux terminates. Offsets of records dispatched on each inner Flux are committed using the provided transactionManager within the transaction started for that Flux.
Example usage:

```java
KafkaSender<Integer, Person> sender = sender(senderOptions());
ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic));
KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions);
receiver.receiveExactlyOnce(sender.transactionManager())
        .concatMap(f -> sendAndCommit(f))
        .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e)))
        .doOnCancel(() -> close());

Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) {
    return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key())))
            .concatWith(sender.transactionManager().commit());
}
```
Parameters:
transactionManager - Transaction manager used to begin a new transaction for each inner Flux and commit offsets within that transaction

public <T> reactor.core.publisher.Mono<T> doOnConsumer(java.util.function.Function<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
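doOnConsumer(...) runs an arbitrary function on the underlying Consumer on its polling thread and wraps the result in a Mono. A hedged sketch, assuming a `template` instance built elsewhere and an active consumer (Consumer.groupMetadata() requires kafka-clients 2.5+):

```java
// Illustrative: read the consumer group id from the live Consumer.
// Any Consumer method may be invoked here; the function's return value
// becomes the Mono's emitted item.
Mono<String> groupId = template.doOnConsumer(consumer -> consumer.groupMetadata().groupId());
groupId.subscribe(id -> System.out.println("group: " + id));
```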
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()
public reactor.core.publisher.Flux<java.lang.String> subscription()
public reactor.core.publisher.Mono<java.lang.Void> seek(org.apache.kafka.common.TopicPartition partition, long offset)
public reactor.core.publisher.Mono<java.lang.Void> seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)
public reactor.core.publisher.Mono<java.lang.Void> seekToEnd(org.apache.kafka.common.TopicPartition... partitions)
public reactor.core.publisher.Mono<java.lang.Long> position(org.apache.kafka.common.TopicPartition partition)
public reactor.core.publisher.Mono<org.apache.kafka.clients.consumer.OffsetAndMetadata> committed(org.apache.kafka.common.TopicPartition partition)
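The seek/position operations act on the live consumer, so they only take effect for partitions that are currently assigned (i.e. a receive* Flux is already subscribed). A hypothetical rewind of partition 0 of demo-topic, reading back the new position:

```java
// Rewind one assigned partition to its beginning, then confirm the position.
// Topic name and partition number are illustrative.
TopicPartition tp = new TopicPartition("demo-topic", 0);
template.seekToBeginning(tp)
        .then(template.position(tp))
        .subscribe(offset -> System.out.println("now at offset " + offset));
```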
public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor(java.lang.String topic)
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()
public reactor.core.publisher.Mono<java.lang.Void> pause(org.apache.kafka.common.TopicPartition... partitions)
public reactor.core.publisher.Mono<java.lang.Void> resume(org.apache.kafka.common.TopicPartition... partitions)
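pause() stops fetching from the given partitions without giving up the assignment, and resume() restarts fetching; paused() reports which partitions are currently paused. A sketch with an illustrative 30-second backpressure window:

```java
// Temporarily stop fetching from one partition (assignment is kept),
// then resume after a delay. Duration is illustrative.
TopicPartition tp = new TopicPartition("demo-topic", 0);
template.pause(tp)
        .then(Mono.delay(Duration.ofSeconds(30)))
        .then(template.resume(tp))
        .subscribe();
```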
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metricsFromConsumer()
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<java.lang.String,java.util.List<org.apache.kafka.common.PartitionInfo>>> listTopics()
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> timestampsToSearch)
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>> beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,java.lang.Long>> endOffsets(org.apache.kafka.common.TopicPartition... partitions)
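offsetsForTimes() maps each partition to the first offset whose record timestamp is at or after the given time, which is useful as a starting point for replay. A sketch that looks up offsets from roughly one hour ago (topic, partition, and window are illustrative):

```java
// Each emitted Tuple2 pairs a TopicPartition with its OffsetAndTimestamp.
TopicPartition tp = new TopicPartition("demo-topic", 0);
long oneHourAgo = System.currentTimeMillis() - Duration.ofHours(1).toMillis();
template.offsetsForTimes(Map.of(tp, oneHourAgo))
        .doOnNext(t -> System.out.println(t.getT1() + " starts at " + t.getT2().offset()))
        .subscribe();
```

beginningOffsets() and endOffsets() follow the same Tuple2 shape, pairing each partition with its earliest or latest offset respectively.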