Class ReplyingKafkaTemplate<K,V,R>
- java.lang.Object
  - org.springframework.kafka.core.KafkaTemplate<K,V>
    - org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>

- Type Parameters:
- K - the key type.
- V - the outbound data type.
- R - the reply data type.
- All Implemented Interfaces:
- java.util.EventListener, org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, KafkaOperations<K,V>, BatchMessageListener<K,R>, GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>, ReplyingKafkaOperations<K,V,R>
- Direct Known Subclasses:
- AggregatingReplyingKafkaTemplate
 
public class ReplyingKafkaTemplate<K,V,R>
extends KafkaTemplate<K,V>
implements BatchMessageListener<K,R>, org.springframework.beans.factory.InitializingBean, org.springframework.context.SmartLifecycle, org.springframework.beans.factory.DisposableBean, ReplyingKafkaOperations<K,V,R>

A KafkaTemplate that implements request/reply semantics.
- Since:
- 2.1.3
- Author:
- Gary Russell, Artem Bilan
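
The request/reply idea can be illustrated without Kafka at all. The following is a minimal sketch using only the JDK; `RequestReplySketch`, `register`, and `onReply` are hypothetical names invented for this illustration and are not part of Spring for Apache Kafka. It shows only the general technique of matching an incoming reply to a pending request by correlation id, not the template's actual implementation.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in, not a Spring Kafka class: it only illustrates
// correlating replies with pending requests by id.
class RequestReplySketch {

    // Futures waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register a pending request under a fresh correlation id.
    // A real template would also publish the request with that id in a header.
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Called when a reply arrives; returns false for an unknown correlation id
    // (for example a late reply, or a reply meant for another requester).
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }

    // Small end-to-end walk-through: register, reply, read the result.
    static String demo() {
        RequestReplySketch sketch = new RequestReplySketch();
        CompletableFuture<String> future = new CompletableFuture<>();
        String correlationId = sketch.register(future);
        sketch.onReply(correlationId, "pong");
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch an unknown correlation id is simply reported to the caller; how the real template treats such replies is described under setSharedReplyTopic and logLateArrival.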
 
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
- KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>

Field Summary
Fields inherited from class org.springframework.kafka.core.KafkaTemplate
- logger

Constructor Summary
- ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)
- ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)

Method Summary
- void afterPropertiesSet()
- static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.springframework.core.log.LogAccessor logger)
  Return a DeserializationException if either the key or value failed deserialization; null otherwise.
- protected java.lang.Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record)
  Check for errors in a reply.
- void destroy()
- java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
  Return the topics/partitions assigned to the replying listener container.
- protected java.time.Duration getDefaultReplyTimeout()
  Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- int getPhase()
- protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
  Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
- boolean isAutoStartup()
- protected boolean isPending(CorrelationKey correlationId)
  Return true if this correlation id is still active.
- boolean isRunning()
- protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)
- void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
  Invoked with data from Kafka.
- RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
  Send a request and receive a reply with the default timeout.
- RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, java.time.Duration replyTimeout)
  Send a request and receive a reply.
- RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message)
  Send a request message and receive a reply message with the default timeout.
- RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message, java.time.Duration replyTimeout)
  Send a request message and receive a reply message.
- <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, java.time.Duration replyTimeout, org.springframework.core.ParameterizedTypeReference<P> returnType)
  Send a request message and receive a reply message.
- <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, org.springframework.core.ParameterizedTypeReference<P> returnType)
  Send a request message and receive a reply message.
- void setAutoStartup(boolean autoStartup)
- void setCorrelationHeaderName(java.lang.String correlationHeaderName)
  Set a custom header name for the correlation id.
- void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
  Set a function to be called to establish a unique correlation key for each request record.
- void setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
  Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- void setPhase(int phase)
- void setReplyErrorChecker(java.util.function.Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> replyErrorChecker)
  Set a function to examine replies for an error returned by the server.
- void setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
  Set a custom header name for the reply partition.
- void setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
  Set a custom header name for the reply topic.
- void setSharedReplyTopic(boolean sharedReplyTopic)
  Set to true when multiple templates are using the same topic for replies.
- void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)
- void start()
- void stop()
- void stop(java.lang.Runnable callback)

Methods inherited from class org.springframework.kafka.core.KafkaTemplate
- closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setDefaultTopic, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setProducerListener, setTransactionIdPrefix

Methods inherited from class java.lang.Object
- clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface org.springframework.kafka.listener.BatchMessageListener
- onMessage, wantsPollResult

Methods inherited from interface org.springframework.kafka.listener.GenericMessageListener
- onMessage, onMessage, onMessage
 
Constructor Detail

ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)

ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)
 
Method Detail

setTaskScheduler
public void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)

getDefaultReplyTimeout
protected java.time.Duration getDefaultReplyTimeout()
Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- Returns:
- the timeout.
- Since:
- 2.3

setDefaultReplyTimeout
public void setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- Parameters:
- defaultReplyTimeout - the timeout.
- Since:
- 2.3
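
The fallback rule described above (use the per-call timeout if one is given, otherwise the configured default) can be sketched in plain Java. `ReplyTimeoutSketch` and `effectiveTimeout` are hypothetical names, and the initial five-second value is an assumption made only for this illustration, not a statement about the library's default.

```java
import java.time.Duration;

// Hypothetical stand-in that mirrors only the "null means use the default" rule.
class ReplyTimeoutSketch {

    // Assumed initial value for this sketch only.
    private Duration defaultReplyTimeout = Duration.ofSeconds(5);

    void setDefaultReplyTimeout(Duration defaultReplyTimeout) {
        this.defaultReplyTimeout = defaultReplyTimeout;
    }

    Duration getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    // Resolve the timeout for one request: an explicit value wins,
    // a null value falls back to the configured default.
    Duration effectiveTimeout(Duration replyTimeout) {
        return replyTimeout != null ? replyTimeout : this.defaultReplyTimeout;
    }

    public static void main(String[] args) {
        ReplyTimeoutSketch sketch = new ReplyTimeoutSketch();
        sketch.setDefaultReplyTimeout(Duration.ofSeconds(10));
        System.out.println(sketch.effectiveTimeout(null));
        System.out.println(sketch.effectiveTimeout(Duration.ofSeconds(2)));
    }
}
```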
 
isRunning
public boolean isRunning()
- Specified by:
- isRunning in interface org.springframework.context.Lifecycle

getPhase
public int getPhase()
- Specified by:
- getPhase in interface org.springframework.context.Phased
- Specified by:
- getPhase in interface org.springframework.context.SmartLifecycle

setPhase
public void setPhase(int phase)

isAutoStartup
public boolean isAutoStartup()
- Specified by:
- isAutoStartup in interface org.springframework.context.SmartLifecycle

setAutoStartup
public void setAutoStartup(boolean autoStartup)

getAssignedReplyTopicPartitions
public java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
Return the topics/partitions assigned to the replying listener container.
- Returns:
- the topics/partitions.

setSharedReplyTopic
public void setSharedReplyTopic(boolean sharedReplyTopic)
Set to true when multiple templates are using the same topic for replies. This only changes the log level for unexpected replies from error to debug.
- Parameters:
- sharedReplyTopic - true if using a shared topic.
- Since:
- 2.2

setCorrelationIdStrategy
public void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
Set a function to be called to establish a unique correlation key for each request record.
- Parameters:
- correlationStrategy - the function.
- Since:
- 2.3
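
As an illustration of what such a strategy function might compute, the sketch below derives a unique 16-byte key from a random UUID using only the JDK. `CorrelationStrategySketch` is a hypothetical name; `String` stands in for `ProducerRecord<K,V>` and `byte[]` stands in for `CorrelationKey`, so this is not the library's own strategy.

```java
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical sketch: String stands in for the request record type and
// byte[] stands in for the correlation key type.
class CorrelationStrategySketch {

    // Encode a UUID as its 16 raw bytes (most significant bits first).
    static byte[] toBytes(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    // A strategy that ignores the request content and returns a fresh random key.
    static final Function<String, byte[]> RANDOM_UUID =
            request -> toBytes(UUID.randomUUID());

    public static void main(String[] args) {
        byte[] key = RANDOM_UUID.apply("some request");
        System.out.println(key.length);
    }
}
```

A strategy could equally derive the key from the request itself (for example an existing business id), provided the result is unique among requests that are pending at the same time.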
 
setCorrelationHeaderName
public void setCorrelationHeaderName(java.lang.String correlationHeaderName)
Set a custom header name for the correlation id. Default: KafkaHeaders.CORRELATION_ID.
- Parameters:
- correlationHeaderName - the header name.
- Since:
- 2.3

setReplyTopicHeaderName
public void setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
Set a custom header name for the reply topic. Default: KafkaHeaders.REPLY_TOPIC.
- Parameters:
- replyTopicHeaderName - the header name.
- Since:
- 2.3

setReplyPartitionHeaderName
public void setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
Set a custom header name for the reply partition. Default: KafkaHeaders.REPLY_PARTITION.
- Parameters:
- replyPartitionHeaderName - the reply partition header name.
- Since:
- 2.3

setReplyErrorChecker
public void setReplyErrorChecker(java.util.function.Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> replyErrorChecker)
Set a function to examine replies for an error returned by the server.
- Parameters:
- replyErrorChecker - the error checker function.
- Since:
- 2.6.7
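
The shape of an error checker (reply in, exception or null out) can be sketched with JDK types only. In the example below, `ReplyErrorCheckerSketch` and the header name `serverSentAnError` are hypothetical, and a `Map<String, String>` of headers stands in for `ConsumerRecord<?,?>`; it assumes the server signals failure by setting that header.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a header map stands in for the reply record.
class ReplyErrorCheckerSketch {

    // Assumed header name, chosen only for this illustration.
    static final String ERROR_HEADER = "serverSentAnError";

    // Returns an exception when the reply carries the error header, null otherwise.
    static final Function<Map<String, String>, Exception> CHECKER = headers -> {
        String error = headers.get(ERROR_HEADER);
        return error == null ? null : new IllegalStateException(error);
    };

    public static void main(String[] args) {
        System.out.println(CHECKER.apply(Map.of()));
        System.out.println(CHECKER.apply(Map.of(ERROR_HEADER, "boom")).getMessage());
    }
}
```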
 
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
- afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

start
public void start()
- Specified by:
- start in interface org.springframework.context.Lifecycle

stop
public void stop()
- Specified by:
- stop in interface org.springframework.context.Lifecycle

stop
public void stop(java.lang.Runnable callback)
- Specified by:
- stop in interface org.springframework.context.SmartLifecycle

sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message with the default timeout.
- Specified by:
- sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
- message - the message to send.
- Returns:
- a RequestReplyMessageFuture.

sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message, java.time.Duration replyTimeout)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
- sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
- message - the message to send.
- replyTimeout - the reply timeout; if null, the default will be used.
- Returns:
- a RequestReplyMessageFuture.

sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, @Nullable org.springframework.core.ParameterizedTypeReference<P> returnType)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
- sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Type Parameters:
- P - the reply payload type.
- Parameters:
- message - the message to send.
- returnType - a hint to the message converter for the reply payload type.
- Returns:
- a RequestReplyTypedMessageFuture.

sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, @Nullable java.time.Duration replyTimeout, @Nullable org.springframework.core.ParameterizedTypeReference<P> returnType)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
- sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Type Parameters:
- P - the reply payload type.
- Parameters:
- message - the message to send.
- replyTimeout - the reply timeout; if null, the default will be used.
- returnType - a hint to the message converter for the reply payload type.
- Returns:
- a RequestReplyTypedMessageFuture.

sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: ReplyingKafkaOperations
Send a request and receive a reply with the default timeout.
- Specified by:
- sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
- record - the record to send.
- Returns:
- a RequestReplyFuture.

sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, @Nullable java.time.Duration replyTimeout)
Description copied from interface: ReplyingKafkaOperations
Send a request and receive a reply.
- Specified by:
- sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
- record - the record to send.
- replyTimeout - the reply timeout; if null, the default will be used.
- Returns:
- a RequestReplyFuture.

handleTimeout
protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
- Parameters:
- correlationId - the correlation id.
- future - the future.
- Returns:
- true to indicate the future has been completed.
- Since:
- 2.3
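
The contract of the timeout hook (return true if the subclass completed the future itself) can be sketched as a template-method pattern in plain Java. All names below (`TimeoutHookSketch`, `PartialResultSketch`, `register`, `onTimeout`) are hypothetical, `String` stands in for `CorrelationKey`, and `CompletableFuture<String>` stands in for `RequestReplyFuture<K,V,R>`; what the real template does when the hook returns false is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical base class: shows one way a timeout hook could be wired in.
class TimeoutHookSketch {

    protected final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Track a new pending request.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Hook for subclasses: return true if the future was completed here.
    protected boolean handleTimeout(String correlationId, CompletableFuture<String> future) {
        return false;
    }

    // Would be invoked by a timer when the reply timeout expires.
    // Assumption of this sketch: if the hook did not complete the future,
    // it is failed with a TimeoutException.
    void onTimeout(String correlationId) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null && !handleTimeout(correlationId, future)) {
            future.completeExceptionally(new TimeoutException("No reply for " + correlationId));
        }
    }
}

// Hypothetical subclass that completes the future with whatever it has collected.
class PartialResultSketch extends TimeoutHookSketch {

    @Override
    protected boolean handleTimeout(String correlationId, CompletableFuture<String> future) {
        future.complete("partial");
        return true;
    }
}
```

A subclass that gathers several replies per request is the typical reason to return true here: on timeout it can release the partial set instead of failing the caller.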
 
isPending
protected boolean isPending(CorrelationKey correlationId)
Return true if this correlation id is still active.
- Parameters:
- correlationId - the correlation id.
- Returns:
- true if pending.
- Since:
- 2.3

destroy
public void destroy()
- Specified by:
- destroy in interface org.springframework.beans.factory.DisposableBean
- Overrides:
- destroy in class KafkaTemplate<K,V>

onMessage
public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
Description copied from interface: GenericMessageListener
Invoked with data from Kafka.
- Specified by:
- onMessage in interface GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
- Parameters:
- data - the data to be processed.

checkForErrors
@Nullable
protected java.lang.Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record)
Check for errors in a reply. The default implementation checks for DeserializationExceptions and invokes the replyErrorChecker function.
- Parameters:
- record - the record.
- Returns:
- the exception, or null if none.
- Since:
- 2.6.7

checkDeserialization
@Nullable
public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.springframework.core.log.LogAccessor logger)
Return a DeserializationException if either the key or value failed deserialization; null otherwise. If you need to determine whether it was the key or value, call ListenerUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor) with ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER and ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER instead.
- Parameters:
- record - the record.
- logger - a LogAccessor.
- Returns:
- the DeserializationException or null.
- Since:
- 2.2.15

logLateArrival
protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)