Type Parameters:
K - the key type.
V - the outbound data type.
R - the reply data type.

public class AggregatingReplyingKafkaTemplate<K,V,R> extends ReplyingKafkaTemplate<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> implements BatchConsumerAwareMessageListener<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>

KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>

| Modifier and Type | Field and Description |
|---|---|
| static java.lang.String | AGGREGATED_RESULTS_TOPIC<br>Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a normal release by the release strategy. |
| static java.lang.String | PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC<br>Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a timeout. |

logger

| Constructor and Description |
|---|
| AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, java.util.function.BiPredicate<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>,java.lang.Boolean> releaseStrategy)<br>Construct an instance using the provided parameter arguments. |
| AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, java.util.function.Predicate<java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> releaseStrategy)<br>Deprecated. |

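
The release strategy taken by the non-deprecated constructor is a BiPredicate over the current list of replies and a Boolean flag. The following is a minimal sketch of what such a predicate might look like, not the library's own code. It uses only JDK types: the element type T stands in for ConsumerRecord<K,R>, the class and method names are hypothetical, and it assumes the flag is true when the predicate is invoked because of a timeout.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical helper class; not part of Spring for Apache Kafka.
final class ReleaseStrategySketch {

    // Release once 'expected' replies are present; on timeout, release whatever has arrived.
    // Assumption: the Boolean argument is true when the call is made because of a timeout.
    public static <T> BiPredicate<List<T>, Boolean> releaseAfter(int expected) {
        return (replies, timedOut) -> timedOut || replies.size() >= expected;
    }

    // Same rule, but first removes duplicate entries from the list in place,
    // illustrating that the predicate may modify the list it is given.
    public static <T> BiPredicate<List<T>, Boolean> releaseAfterDistinct(int expected) {
        return (replies, timedOut) -> {
            List<T> distinct = new ArrayList<>(new LinkedHashSet<>(replies));
            replies.clear();
            replies.addAll(distinct);
            return timedOut || replies.size() >= expected;
        };
    }

    public static void main(String[] args) {
        BiPredicate<List<String>, Boolean> strategy = releaseAfterDistinct(2);
        List<String> replies = new ArrayList<>();
        replies.add("reply-1");
        replies.add("reply-1"); // duplicate, dropped by the strategy
        System.out.println(strategy.test(replies, false)); // only one distinct reply so far
        replies.add("reply-2");
        System.out.println(strategy.test(replies, false)); // two distinct replies
        System.out.println(strategy.test(new ArrayList<>(), true)); // timeout call
    }
}
```

Duplicate detection in this sketch relies on the elements' equals method, so a strategy operating on real records would likely need its own notion of equality.
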
| Modifier and Type | Method and Description |
|---|---|
| protected boolean | handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> future)<br>Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future. |
| void | onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)<br>Invoked with data from Kafka and provides access to the Consumer. |
| void | setCommitTimeout(java.time.Duration commitTimeout)<br>Set the timeout to use when committing offsets. |
| void | setReturnPartialOnTimeout(boolean returnPartialOnTimeout)<br>Set to true to return a partial result when a request times out. |

afterPropertiesSet, createCorrelationId, destroy, getAssignedReplyTopicPartitions, getDefaultReplyTimeout, getPhase, getReplyTimeout, isAutoStartup, isPending, isRunning, logLateArrival, onMessage, sendAndReceive, sendAndReceive, setAutoStartup, setCorrelationHeaderName, setCorrelationIdStrategy, setDefaultReplyTimeout, setPhase, setReplyPartitionHeaderName, setReplyTimeout, setReplyTopicHeaderName, setSharedReplyTopic, setTaskScheduler, start, stop, stop

closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getProducerFactory, getTransactionIdPrefix, inTransaction, isTransactional, metrics, partitionsFor, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, setCloseTimeout, setDefaultTopic, setMessageConverter, setProducerListener, setTransactionIdPrefix

clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

onMessage

onMessage, wantsPollResult

onMessage, onMessage

public static final java.lang.String AGGREGATED_RESULTS_TOPIC
Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a normal release by the release strategy.

public static final java.lang.String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a timeout.

@Deprecated public AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, java.util.function.Predicate<java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> releaseStrategy)
Deprecated in favor of AggregatingReplyingKafkaTemplate(ProducerFactory, GenericMessageListenerContainer, BiPredicate).
Parameters:
producerFactory - the producer factory.
replyContainer - the reply container.
releaseStrategy - the release strategy.

public AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, java.util.function.BiPredicate<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>,java.lang.Boolean> releaseStrategy)
Construct an instance using the provided parameter arguments.
Parameters:
producerFactory - the producer factory.
replyContainer - the reply container.
releaseStrategy - the release strategy, a BiPredicate that is passed the current list and a boolean indicating whether this is a normal delivery or a timeout (when setReturnPartialOnTimeout(boolean) is true). The predicate may modify the list of records.

public void setCommitTimeout(java.time.Duration commitTimeout)
Set the timeout to use when committing offsets.
Parameters:
commitTimeout - the timeout.

public void setReturnPartialOnTimeout(boolean returnPartialOnTimeout)
Set to true to return a partial result when a request times out.
Parameters:
returnPartialOnTimeout - true to return a partial result.

public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Description copied from interface: GenericMessageListener
Invoked with data from Kafka and provides access to the Consumer. The default implementation throws UnsupportedOperationException.
Specified by:
onMessage in interface BatchConsumerAwareMessageListener<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
onMessage in interface GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>>>
Parameters:
data - the data to be processed.
consumer - the consumer.

protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> future)
Description copied from class: ReplyingKafkaTemplate
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
Overrides:
handleTimeout in class ReplyingKafkaTemplate<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
Parameters:
correlationId - the correlation id.
future - the future.
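
The handleTimeout contract (clean up state and, optionally, complete the future) can be illustrated with a JDK-only sketch. This is not the template's implementation: the class name, the String correlation id, the String replies, and the use of CompletableFuture in place of RequestReplyFuture are all hypothetical stand-ins. It also assumes that a return value of true means the method completed the future itself, which the text above does not state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not part of Spring for Apache Kafka.
final class TimeoutHandlingSketch {

    // Replies collected so far, keyed by a stand-in String correlation id.
    private final Map<String, List<String>> pending = new HashMap<>();

    private final boolean returnPartialOnTimeout;

    TimeoutHandlingSketch(boolean returnPartialOnTimeout) {
        this.returnPartialOnTimeout = returnPartialOnTimeout;
    }

    public synchronized void addReply(String correlationId, String reply) {
        pending.computeIfAbsent(correlationId, id -> new ArrayList<>()).add(reply);
    }

    public synchronized boolean isPending(String correlationId) {
        return pending.containsKey(correlationId);
    }

    // Assumed contract: return true only if this method completed the future.
    public synchronized boolean handleTimeout(String correlationId,
            CompletableFuture<List<String>> future) {
        List<String> partial = pending.remove(correlationId); // always clean up state
        if (returnPartialOnTimeout && partial != null && !partial.isEmpty()) {
            future.complete(partial); // hand back the partial result
            return true;
        }
        return false; // the caller decides how to fail the future
    }

    public static void main(String[] args) {
        TimeoutHandlingSketch sketch = new TimeoutHandlingSketch(true);
        sketch.addReply("request-1", "reply-1");
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        boolean completed = sketch.handleTimeout("request-1", future);
        System.out.println(completed + " " + future.join());
    }
}
```

With returnPartialOnTimeout set to false, the same call still removes the pending state but leaves the future untouched and returns false.
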