Class ReplyingKafkaTemplate<K,V,R>
- java.lang.Object
-
- org.springframework.kafka.core.KafkaTemplate<K,V>
-
- org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
-
- Type Parameters:
K - the key type.
V - the outbound data type.
R - the reply data type.
- All Implemented Interfaces:
java.util.EventListener, org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, KafkaOperations<K,V>, BatchMessageListener<K,R>, ConsumerSeekAware, GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>, ReplyingKafkaOperations<K,V,R>
- Direct Known Subclasses:
AggregatingReplyingKafkaTemplate
public class ReplyingKafkaTemplate<K,V,R> extends KafkaTemplate<K,V> implements BatchMessageListener<K,R>, org.springframework.beans.factory.InitializingBean, org.springframework.context.SmartLifecycle, org.springframework.beans.factory.DisposableBean, ReplyingKafkaOperations<K,V,R>, ConsumerSeekAware
A KafkaTemplate that implements request/reply semantics.
- Since:
- 2.1.3
- Author:
- Gary Russell, Artem Bilan
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
ConsumerSeekAware.ConsumerSeekCallback
-
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
-
-
Field Summary
-
Fields inherited from class org.springframework.kafka.core.KafkaTemplate
logger
-
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT
-
-
Constructor Summary
Constructors
ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)
ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)
-
Method Summary
Modifier and Type Method: Description
void afterPropertiesSet()
static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.springframework.core.log.LogAccessor logger): Return a DeserializationException if either the key or value failed deserialization; null otherwise.
protected java.lang.Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record): Check for errors in a reply.
void destroy()
java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions(): Return the topics/partitions assigned to the replying listener container.
protected java.lang.String getCorrelationHeaderName(): Return the correlation header name.
protected java.time.Duration getDefaultReplyTimeout(): Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
int getPhase()
protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future): Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
boolean isAutoStartup()
protected boolean isPending(CorrelationKey correlationId): Return true if this correlation id is still active.
boolean isRunning()
protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)
void onFirstPoll(): When using manual partition assignment, called when the first poll has completed; useful when using auto.offset.reset=latest and you need to wait until the initial position has been established.
void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data): Invoked with data from Kafka.
RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record): Send a request and receive a reply with the default timeout.
RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, java.time.Duration replyTimeout): Send a request and receive a reply.
RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message): Send a request message and receive a reply message with the default timeout.
RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message, java.time.Duration replyTimeout): Send a request message and receive a reply message.
<P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, java.time.Duration replyTimeout, org.springframework.core.ParameterizedTypeReference<P> returnType): Send a request message and receive a reply message.
<P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, org.springframework.core.ParameterizedTypeReference<P> returnType): Send a request message and receive a reply message.
void setAutoStartup(boolean autoStartup)
void setCorrelationHeaderName(java.lang.String correlationHeaderName): Set a custom header name for the correlation id.
void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy): Set a function to be called to establish a unique correlation key for each request record.
void setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout): Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
void setPhase(int phase)
void setReplyErrorChecker(java.util.function.Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> replyErrorChecker): Set a function to examine replies for an error returned by the server.
void setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName): Set a custom header name for the reply partition.
void setReplyTopicHeaderName(java.lang.String replyTopicHeaderName): Set a custom header name for the reply topic.
void setSharedReplyTopic(boolean sharedReplyTopic): Set to true when multiple templates are using the same topic for replies.
void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)
void start()
void stop()
void stop(java.lang.Runnable callback)
boolean waitForAssignment(java.time.Duration duration): Wait until partitions are assigned, e.g. when auto.offset.reset=latest.
-
Methods inherited from class org.springframework.kafka.core.KafkaTemplate
closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getMicrometerTagsProvider, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, receive, receive, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setConsumerFactory, setDefaultTopic, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setMicrometerTagsProvider, setProducerListener, setTransactionIdPrefix
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.BatchMessageListener
onMessage, wantsPollResult
-
Methods inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
onIdleContainer, onPartitionsAssigned, onPartitionsRevoked, registerSeekCallback, unregisterSeekCallback
-
Methods inherited from interface org.springframework.kafka.listener.GenericMessageListener
onMessage, onMessage, onMessage
-
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive, usingCompletableFuture
-
-
-
-
Constructor Detail
-
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)
-
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)
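Both constructors take a ProducerFactory for outbound requests and a listener container that consumes replies. The following is a minimal wiring sketch under stated assumptions, not a definitive configuration: it needs Spring for Apache Kafka on the classpath, uses String keys and values, and expects a ConcurrentKafkaListenerContainerFactory bean to be configured elsewhere. The topic names "requests" and "replies", the group id "repliesGroup", and the RequestReplyClient class are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;

@Configuration
class RequestReplyConfig {

    // Reply container consuming the hypothetical "replies" topic.
    @Bean
    ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container =
                containerFactory.createContainer("replies");
        container.getContainerProperties().setGroupId("repliesGroup");
        // Assumption: the template bean drives this container's lifecycle,
        // so the container is not started on its own.
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> producerFactory,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    }
}

// Hypothetical caller showing one request/reply round trip.
class RequestReplyClient {

    private final ReplyingKafkaTemplate<String, String, String> template;

    RequestReplyClient(ReplyingKafkaTemplate<String, String, String> template) {
        this.template = template;
    }

    String request(String payload) throws Exception {
        // Make sure the reply consumer has its partitions before sending.
        if (!template.waitForAssignment(Duration.ofSeconds(10))) {
            throw new IllegalStateException("Reply container was not assigned in time");
        }
        ProducerRecord<String, String> record = new ProducerRecord<>("requests", payload);
        RequestReplyFuture<String, String, String> future =
                template.sendAndReceive(record, Duration.ofSeconds(5));
        ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
        return reply.value();
    }
}
```

The server side, which reads the request and publishes to the reply topic with the correlation header copied over, is not shown here.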
-
-
Method Detail
-
setTaskScheduler
public void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)
-
getDefaultReplyTimeout
protected java.time.Duration getDefaultReplyTimeout()
Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- Returns:
- the timeout.
- Since:
- 2.3
-
setDefaultReplyTimeout
public void setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- Parameters:
defaultReplyTimeout - the timeout.
- Since:
- 2.3
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface org.springframework.context.Lifecycle
-
getPhase
public int getPhase()
- Specified by:
getPhase in interface org.springframework.context.Phased
- Specified by:
getPhase in interface org.springframework.context.SmartLifecycle
-
setPhase
public void setPhase(int phase)
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface org.springframework.context.SmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
-
getAssignedReplyTopicPartitions
public java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
Return the topics/partitions assigned to the replying listener container.
- Returns:
- the topics/partitions.
-
setSharedReplyTopic
public void setSharedReplyTopic(boolean sharedReplyTopic)
Set to true when multiple templates are using the same topic for replies. This only changes the log level for unexpected replies from error to debug.
- Parameters:
sharedReplyTopic - true if using a shared topic.
- Since:
- 2.2
-
setCorrelationIdStrategy
public void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
Set a function to be called to establish a unique correlation key for each request record.
- Parameters:
correlationStrategy - the function.
- Since:
- 2.3
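As an illustration of what a correlation strategy might produce, the sketch below packs a random UUID into 16 bytes. It uses only the JDK so it runs without Kafka on the classpath; the class and method names are hypothetical. With the real API, the resulting bytes would presumably be wrapped in a CorrelationKey and returned from the function passed to setCorrelationIdStrategy, for example record -> new CorrelationKey(CorrelationIdSketch.toBytes(UUID.randomUUID())).

```java
import java.nio.ByteBuffer;
import java.util.UUID;

final class CorrelationIdSketch {

    private CorrelationIdSketch() {
    }

    /** Pack a UUID into 16 big-endian bytes: most significant long first. */
    static byte[] toBytes(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    /** Inverse of toBytes, handy when logging a correlation id in readable form. */
    static UUID fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long mostSignificant = buffer.getLong();
        long leastSignificant = buffer.getLong();
        return new UUID(mostSignificant, leastSignificant);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        byte[] key = toBytes(id);
        System.out.println(key.length + " bytes, round trip ok: " + id.equals(fromBytes(key)));
    }
}
```

Any strategy works as long as each in-flight request gets a key that no other pending request shares, since the key is what matches a reply to its request.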
-
setCorrelationHeaderName
public void setCorrelationHeaderName(java.lang.String correlationHeaderName)
Set a custom header name for the correlation id. The default is KafkaHeaders.CORRELATION_ID.
- Parameters:
correlationHeaderName - the header name.
- Since:
- 2.3
-
getCorrelationHeaderName
protected java.lang.String getCorrelationHeaderName()
Return the correlation header name.
- Returns:
- the header name.
- Since:
- 2.8.8
-
setReplyTopicHeaderName
public void setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
Set a custom header name for the reply topic. The default is KafkaHeaders.REPLY_TOPIC.
- Parameters:
replyTopicHeaderName - the header name.
- Since:
- 2.3
-
setReplyPartitionHeaderName
public void setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
Set a custom header name for the reply partition. The default is KafkaHeaders.REPLY_PARTITION.
- Parameters:
replyPartitionHeaderName - the reply partition header name.
- Since:
- 2.3
-
setReplyErrorChecker
public void setReplyErrorChecker(java.util.function.Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> replyErrorChecker)
Set a function to examine replies for an error returned by the server.
- Parameters:
replyErrorChecker - the error checker function.
- Since:
- 2.6.7
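The shape of such an error checker can be sketched as follows. The real function receives a ConsumerRecord<?,?>; to keep the example runnable with the JDK alone, the record's headers are modeled here as a plain Map<String, byte[]>, which is a simplification. The header name "serverSentAnError" and the class name are hypothetical: the server would have to set that header itself when it fails to process a request.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

final class ReplyErrorCheckerSketch {

    // Hypothetical header a server could add to a reply when processing failed.
    static final String ERROR_HEADER = "serverSentAnError";

    /** Returns an exception carrying the server's message, or null when the reply is clean. */
    static final Function<Map<String, byte[]>, Exception> CHECKER = headers -> {
        byte[] error = headers.get(ERROR_HEADER);
        return error == null
                ? null
                : new IllegalStateException(new String(error, StandardCharsets.UTF_8));
    };

    private ReplyErrorCheckerSketch() {
    }

    public static void main(String[] args) {
        Map<String, byte[]> failed =
                Map.of(ERROR_HEADER, "boom".getBytes(StandardCharsets.UTF_8));
        System.out.println(CHECKER.apply(Map.of()));
        System.out.println(CHECKER.apply(failed).getMessage());
    }
}
```

Returning null signals that the reply carries no server error; a non-null exception is what the caller would see instead of a normal reply.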
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
-
start
public void start()
- Specified by:
start in interface org.springframework.context.Lifecycle
-
stop
public void stop()
- Specified by:
stop in interface org.springframework.context.Lifecycle
-
stop
public void stop(java.lang.Runnable callback)
- Specified by:
stop in interface org.springframework.context.SmartLifecycle
-
onFirstPoll
public void onFirstPoll()
Description copied from interface: ConsumerSeekAware
When using manual partition assignment, called when the first poll has completed; useful when using auto.offset.reset=latest and you need to wait until the initial position has been established.
- Specified by:
onFirstPoll in interface ConsumerSeekAware
-
waitForAssignment
public boolean waitForAssignment(java.time.Duration duration) throws java.lang.InterruptedException
Description copied from interface: ReplyingKafkaOperations
Wait until partitions are assigned, e.g. when auto.offset.reset=latest. When using manual assignment, the duration must be greater than the container's pollTimeout property.
- Specified by:
waitForAssignment in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
duration - how long to wait.
- Returns:
- true if the partitions have been assigned.
- Throws:
java.lang.InterruptedException - if the thread is interrupted while waiting.
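One way to picture this contract is a latch that is released once the reply consumer has its partitions. The sketch below is a JDK-only model, not the template's actual implementation; the class name and the partitionsAssigned method are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AssignmentGateSketch {

    private final CountDownLatch assigned = new CountDownLatch(1);

    /** In this model, called once the reply consumer has received its partitions. */
    void partitionsAssigned() {
        assigned.countDown();
    }

    /** Blocks for at most the given duration; true only if assignment happened in time. */
    boolean waitForAssignment(Duration duration) throws InterruptedException {
        return assigned.await(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AssignmentGateSketch gate = new AssignmentGateSketch();
        System.out.println(gate.waitForAssignment(Duration.ofMillis(50)));
        gate.partitionsAssigned();
        System.out.println(gate.waitForAssignment(Duration.ofMillis(50)));
    }
}
```

A false return means the wait elapsed first, so a caller sending a request at that point risks missing a reply published before the consumer's position is established.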
-
sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message with the default timeout.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
message - the message to send.
- Returns:
- a RequestReplyMessageFuture.
-
sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(org.springframework.messaging.Message<?> message, java.time.Duration replyTimeout)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
message - the message to send.
replyTimeout - the reply timeout; if null, the default will be used.
- Returns:
- a RequestReplyMessageFuture.
-
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, @Nullable org.springframework.core.ParameterizedTypeReference<P> returnType)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Type Parameters:
P - the reply payload type.
- Parameters:
message - the message to send.
returnType - a hint to the message converter for the reply payload type.
- Returns:
- a RequestReplyTypedMessageFuture.
-
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(org.springframework.messaging.Message<?> message, @Nullable java.time.Duration replyTimeout, @Nullable org.springframework.core.ParameterizedTypeReference<P> returnType)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Type Parameters:
P - the reply payload type.
- Parameters:
message - the message to send.
replyTimeout - the reply timeout; if null, the default will be used.
returnType - a hint to the message converter for the reply payload type.
- Returns:
- a RequestReplyTypedMessageFuture.
-
sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: ReplyingKafkaOperations
Send a request and receive a reply with the default timeout.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
record - the record to send.
- Returns:
- a RequestReplyFuture.
-
sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, @Nullable java.time.Duration replyTimeout)
Description copied from interface: ReplyingKafkaOperations
Send a request and receive a reply.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
record - the record to send.
replyTimeout - the reply timeout; if null, the default will be used.
- Returns:
- a RequestReplyFuture.
-
handleTimeout
protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
- Parameters:
correlationId - the correlation id.
future - the future.
- Returns:
- true to indicate the future has been completed.
- Since:
- 2.3
-
isPending
protected boolean isPending(CorrelationKey correlationId)
Return true if this correlation id is still active.
- Parameters:
correlationId - the correlation id.
- Returns:
- true if pending.
- Since:
- 2.3
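The interplay between pending requests, timeouts, and late replies can be pictured with a small JDK-only model. This is a simplified sketch, not the template's implementation: correlation ids are plain strings, replies are strings, and a java.util.concurrent.TimeoutException stands in for whatever exception the template uses. The class name and the register and onReply methods are hypothetical; handleTimeout and isPending mirror the hooks documented above.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingRepliesSketch implements AutoCloseable {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Register a request and schedule its timeout. */
    CompletableFuture<String> register(String correlationId, Duration replyTimeout) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        scheduler.schedule(() -> {
            CompletableFuture<String> removed = pending.remove(correlationId);
            // Only fail the future if the hook did not complete it already.
            if (removed != null && !handleTimeout(correlationId, removed)) {
                removed.completeExceptionally(
                        new TimeoutException("No reply for " + correlationId));
            }
        }, replyTimeout.toMillis(), TimeUnit.MILLISECONDS);
        return future;
    }

    /** Hook for subclasses: return true if the future was completed here. */
    protected boolean handleTimeout(String correlationId, CompletableFuture<String> future) {
        return false;
    }

    /** True while a request with this correlation id is still waiting for a reply. */
    boolean isPending(String correlationId) {
        return pending.containsKey(correlationId);
    }

    /** Deliver a reply; returns false for a late or unknown correlation id. */
    boolean onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // late arrival: the request timed out or was never registered
        }
        return future.complete(reply);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        try (PendingRepliesSketch registry = new PendingRepliesSketch()) {
            CompletableFuture<String> reply = registry.register("req-1", Duration.ofSeconds(1));
            registry.onReply("req-1", "pong");
            System.out.println(reply.get());
            System.out.println(registry.onReply("req-1", "duplicate"));
        }
    }
}
```

In this model a subclass that aggregates partial results could override handleTimeout, complete the future with whatever it has collected, and return true so the timeout exception is not applied.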
-
destroy
public void destroy()
- Specified by:
destroy in interface org.springframework.beans.factory.DisposableBean
- Overrides:
destroy in class KafkaTemplate<K,V>
-
onMessage
public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
Description copied from interface: GenericMessageListener
Invoked with data from Kafka.
- Specified by:
onMessage in interface GenericMessageListener<K>
- Parameters:
data - the data to be processed.
-
checkForErrors
@Nullable protected java.lang.Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record)
Check for errors in a reply. The default implementation checks for DeserializationExceptions and invokes the replyErrorChecker function.
- Parameters:
record - the record.
- Returns:
- the exception, or null if none.
- Since:
- 2.6.7
-
checkDeserialization
@Nullable public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.springframework.core.log.LogAccessor logger)
Return a DeserializationException if either the key or value failed deserialization; null otherwise. If you need to determine whether it was the key or value, call SerializationUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor) with SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER and SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER instead.
- Parameters:
record - the record.
logger - a LogAccessor.
- Returns:
- the DeserializationException or null.
- Since:
- 2.2.15
-
logLateArrival
protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)
-
-