Class AggregatingReplyingKafkaTemplate<K,V,R>
- java.lang.Object
-
- org.springframework.kafka.core.KafkaTemplate<K,V>
-
- org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
-
- org.springframework.kafka.requestreply.AggregatingReplyingKafkaTemplate<K,V,R>
-
- Type Parameters:
K
- the key type.V
- the outbound data type.R
- the reply data type.
- All Implemented Interfaces:
java.util.EventListener
,org.springframework.beans.factory.Aware
,org.springframework.beans.factory.BeanNameAware
,org.springframework.beans.factory.DisposableBean
,org.springframework.beans.factory.InitializingBean
,org.springframework.context.ApplicationContextAware
,org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>
,org.springframework.context.Lifecycle
,org.springframework.context.Phased
,org.springframework.context.SmartLifecycle
,KafkaOperations<K,V>
,BatchConsumerAwareMessageListener<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
,BatchMessageListener<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
,ConsumerSeekAware
,GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>>>
,ReplyingKafkaOperations<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
public class AggregatingReplyingKafkaTemplate<K,V,R> extends ReplyingKafkaTemplate<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> implements BatchConsumerAwareMessageListener<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
A replying template that aggregates multiple replies with the same correlation id.- Since:
- 2.3
- Author:
- Gary Russell
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
ConsumerSeekAware.ConsumerSeekCallback
-
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
-
-
Field Summary
Fields Modifier and Type Field Description static java.lang.String
AGGREGATED_RESULTS_TOPIC
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a normal release by the release strategy.static java.lang.String
PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a timeout.-
Fields inherited from class org.springframework.kafka.core.KafkaTemplate
logger
-
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT
-
-
Constructor Summary
Constructors Constructor Description AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, java.util.function.BiPredicate<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>,java.lang.Boolean> releaseStrategy)
Construct an instance using the provided parameter arguments.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected boolean
handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> future)
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.void
onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Invoked with data from kafka and provides access to theConsumer
.void
setCommitTimeout(java.time.Duration commitTimeout)
Set the timeout to use when committing offsets.void
setReturnPartialOnTimeout(boolean returnPartialOnTimeout)
Set to true to return a partial result when a request times out.-
Methods inherited from class org.springframework.kafka.requestreply.ReplyingKafkaTemplate
afterPropertiesSet, checkDeserialization, checkForErrors, destroy, getAssignedReplyTopicPartitions, getCorrelationHeaderName, getDefaultReplyTimeout, getPhase, isAutoStartup, isPending, isRunning, logLateArrival, onFirstPoll, onMessage, sendAndReceive, sendAndReceive, sendAndReceive, sendAndReceive, sendAndReceive, sendAndReceive, setAutoStartup, setCorrelationHeaderName, setCorrelationIdStrategy, setDefaultReplyTimeout, setPhase, setReplyErrorChecker, setReplyPartitionHeaderName, setReplyTopicHeaderName, setSharedReplyTopic, setTaskScheduler, start, stop, stop, waitForAssignment
-
Methods inherited from class org.springframework.kafka.core.KafkaTemplate
closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getMicrometerTagsProvider, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, receive, receive, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setConsumerFactory, setDefaultTopic, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setMicrometerTagsProvider, setProducerListener, setTransactionIdPrefix
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.BatchConsumerAwareMessageListener
onMessage
-
Methods inherited from interface org.springframework.kafka.listener.BatchMessageListener
onMessage, wantsPollResult
-
Methods inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
onIdleContainer, onPartitionsAssigned, onPartitionsRevoked, registerSeekCallback, unregisterSeekCallback
-
Methods inherited from interface org.springframework.kafka.listener.GenericMessageListener
onMessage, onMessage
-
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive, usingCompletableFuture
-
-
-
-
Field Detail
-
AGGREGATED_RESULTS_TOPIC
public static final java.lang.String AGGREGATED_RESULTS_TOPIC
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a normal release by the release strategy.- See Also:
- Constant Field Values
-
PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
public static final java.lang.String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a timeout.- See Also:
- Constant Field Values
-
-
Constructor Detail
-
AggregatingReplyingKafkaTemplate
public AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, java.util.function.BiPredicate<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>,java.lang.Boolean> releaseStrategy)
Construct an instance using the provided parameter arguments. The releaseStrategy is consulted to determine when a collection is "complete".- Parameters:
producerFactory
- the producer factory.replyContainer
- the reply container.releaseStrategy
- the release strategy which is aBiPredicate
which is passed the current list and a boolean to indicate if this is for a normal delivery or a timeout (whensetReturnPartialOnTimeout(boolean)
is true. The predicate may modify the list of records.- Since:
- 2.3.5
-
-
Method Detail
-
setCommitTimeout
public void setCommitTimeout(java.time.Duration commitTimeout)
Set the timeout to use when committing offsets.- Parameters:
commitTimeout
- the timeout.
-
setReturnPartialOnTimeout
public void setReturnPartialOnTimeout(boolean returnPartialOnTimeout)
Set to true to return a partial result when a request times out.- Parameters:
returnPartialOnTimeout
- true to return a partial result.
-
onMessage
public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Description copied from interface:GenericMessageListener
Invoked with data from kafka and provides access to theConsumer
. The default implementation throwsUnsupportedOperationException
.- Specified by:
onMessage
in interfaceBatchConsumerAwareMessageListener<K,V>
- Specified by:
onMessage
in interfaceGenericMessageListener<K>
- Parameters:
data
- the data to be processed.consumer
- the consumer.
-
handleTimeout
protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> future)
Description copied from class:ReplyingKafkaTemplate
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.- Overrides:
handleTimeout
in classReplyingKafkaTemplate<K,V,java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
- Parameters:
correlationId
- the correlation id.future
- the future.- Returns:
- true to indicate the future has been completed.
-
-