Class AggregatingReplyingKafkaTemplate<K,V,R>
java.lang.Object
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
org.springframework.kafka.requestreply.AggregatingReplyingKafkaTemplate<K,V,R>
- Type Parameters:
K
- the key type.V
- the outbound data type.R
- the reply data type.
- All Implemented Interfaces:
EventListener
,Aware
,BeanNameAware
,DisposableBean
,InitializingBean
,ApplicationContextAware
,ApplicationListener<ContextStoppedEvent>
,Lifecycle
,Phased
,SmartLifecycle
,KafkaOperations<K,
,V> BatchConsumerAwareMessageListener<K,
,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> BatchMessageListener<K,
,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> GenericMessageListener<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,
,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>>>> ReplyingKafkaOperations<K,
V, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>>
public class AggregatingReplyingKafkaTemplate<K,V,R>
extends ReplyingKafkaTemplate<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
implements BatchConsumerAwareMessageListener<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
A replying template that aggregates multiple replies with the same correlation id.
- Since:
- 2.3
- Author:
- Gary Russell
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,
V, T>, KafkaOperations.ProducerCallback<K, V, T> -
Field Summary
Modifier and TypeFieldDescriptionstatic final String
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a normal release by the release strategy.static final String
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a timeout.Fields inherited from class org.springframework.kafka.core.KafkaTemplate
logger
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
ConstructorDescriptionAggregatingReplyingKafkaTemplate
(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> replyContainer, BiPredicate<List<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>, Boolean> releaseStrategy) Construct an instance using the provided parameter arguments. -
Method Summary
Modifier and TypeMethodDescriptionprotected boolean
handleTimeout
(CorrelationKey correlationId, RequestReplyFuture<K, V, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> future) Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.void
onMessage
(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>>> data, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) Invoked with data from kafka and provides access to theConsumer
.void
setCommitTimeout
(Duration commitTimeout) Set the timeout to use when committing offsets.void
setReturnPartialOnTimeout
(boolean returnPartialOnTimeout) Set to true to return a partial result when a request times out.Methods inherited from class org.springframework.kafka.requestreply.ReplyingKafkaTemplate
afterPropertiesSet, checkDeserialization, checkForErrors, destroy, getAssignedReplyTopicPartitions, getDefaultReplyTimeout, getPhase, isAutoStartup, isPending, isRunning, logLateArrival, onMessage, sendAndReceive, sendAndReceive, sendAndReceive, sendAndReceive, sendAndReceive, sendAndReceive, setAutoStartup, setCorrelationHeaderName, setCorrelationIdStrategy, setDefaultReplyTimeout, setPhase, setReplyErrorChecker, setReplyPartitionHeaderName, setReplyTopicHeaderName, setSharedReplyTopic, setTaskScheduler, start, stop, stop
Methods inherited from class org.springframework.kafka.core.KafkaTemplate
closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, receive, receive, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setConsumerFactory, setDefaultTopic, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setProducerInterceptor, setProducerListener, setTransactionIdPrefix
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.listener.BatchConsumerAwareMessageListener
onMessage
Methods inherited from interface org.springframework.kafka.listener.BatchMessageListener
onMessage, wantsPollResult
Methods inherited from interface org.springframework.kafka.listener.GenericMessageListener
onMessage, onMessage
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive
-
Field Details
-
AGGREGATED_RESULTS_TOPIC
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a normal release by the release strategy.- See Also:
-
PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
Pseudo topic name for the "outer"ConsumerRecord
that has the aggregated results in its value after a timeout.- See Also:
-
-
Constructor Details
-
AggregatingReplyingKafkaTemplate
public AggregatingReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> replyContainer, BiPredicate<List<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>, Boolean> releaseStrategy) Construct an instance using the provided parameter arguments. The releaseStrategy is consulted to determine when a collection is "complete".- Parameters:
producerFactory
- the producer factory.replyContainer
- the reply container.releaseStrategy
- the release strategy which is aBiPredicate
which is passed the current list and a boolean to indicate if this is for a normal delivery or a timeout (whensetReturnPartialOnTimeout(boolean)
is true. The predicate may modify the list of records.- Since:
- 2.3.5
-
-
Method Details
-
setCommitTimeout
Set the timeout to use when committing offsets.- Parameters:
commitTimeout
- the timeout.
-
setReturnPartialOnTimeout
public void setReturnPartialOnTimeout(boolean returnPartialOnTimeout) Set to true to return a partial result when a request times out.- Parameters:
returnPartialOnTimeout
- true to return a partial result.
-
onMessage
public void onMessage(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>>> data, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) Description copied from interface:GenericMessageListener
Invoked with data from kafka and provides access to theConsumer
. The default implementation throwsUnsupportedOperationException
.- Specified by:
onMessage
in interfaceBatchConsumerAwareMessageListener<K,
V> - Specified by:
onMessage
in interfaceGenericMessageListener<K>
- Parameters:
data
- the data to be processed.consumer
- the consumer.
-
handleTimeout
protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K, V, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> future) Description copied from class:ReplyingKafkaTemplate
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.- Overrides:
handleTimeout
in classReplyingKafkaTemplate<K,
V, Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>>> - Parameters:
correlationId
- the correlation id.future
- the future.- Returns:
- true to indicate the future has been completed.
-