Class ReplyingKafkaTemplate<K,V,R>
- java.lang.Object
-
- org.springframework.kafka.core.KafkaTemplate<K,V>
-
- org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
-
- Type Parameters:
K
- the key type.V
- the outbound data type.R
- the reply data type.
- All Implemented Interfaces:
java.util.EventListener
,org.springframework.beans.factory.Aware
,org.springframework.beans.factory.BeanNameAware
,org.springframework.beans.factory.DisposableBean
,org.springframework.beans.factory.InitializingBean
,org.springframework.context.ApplicationContextAware
,org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>
,org.springframework.context.Lifecycle
,org.springframework.context.Phased
,org.springframework.context.SmartLifecycle
,KafkaOperations<K,V>
,BatchMessageListener<K,R>
,GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
,ReplyingKafkaOperations<K,V,R>
- Direct Known Subclasses:
AggregatingReplyingKafkaTemplate
public class ReplyingKafkaTemplate<K,V,R> extends KafkaTemplate<K,V> implements BatchMessageListener<K,R>, org.springframework.beans.factory.InitializingBean, org.springframework.context.SmartLifecycle, org.springframework.beans.factory.DisposableBean, ReplyingKafkaOperations<K,V,R>
A KafkaTemplate that implements request/reply semantics.- Since:
- 2.1.3
- Author:
- Gary Russell, Artem Bilan
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
-
-
Field Summary
-
Fields inherited from class org.springframework.kafka.core.KafkaTemplate
logger
-
-
Constructor Summary
Constructors Constructor Description ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)
ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description void
afterPropertiesSet()
static DeserializationException
checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.springframework.core.log.LogAccessor logger)
Return aDeserializationException
if either the key or value failed deserialization; null otherwise.void
destroy()
java.util.Collection<org.apache.kafka.common.TopicPartition>
getAssignedReplyTopicPartitions()
Return the topics/partitions assigned to the replying listener container.protected java.time.Duration
getDefaultReplyTimeout()
Return the reply timeout used if no replyTimeout is provided in thesendAndReceive(ProducerRecord, Duration)
call.int
getPhase()
protected boolean
handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.boolean
isAutoStartup()
protected boolean
isPending(CorrelationKey correlationId)
Return true if this correlation id is still active.boolean
isRunning()
protected void
logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)
void
onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
Invoked with data from kafka.RequestReplyFuture<K,V,R>
sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Send a request and receive a reply with the default timeout.RequestReplyFuture<K,V,R>
sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, java.time.Duration replyTimeout)
Send a request and receive a reply.void
setAutoStartup(boolean autoStartup)
void
setCorrelationHeaderName(java.lang.String correlationHeaderName)
Set a custom header name for the correlation id.void
setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
Set a function to be called to establish a unique correlation key for each request record.void
setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
Set the reply timeout used if no replyTimeout is provided in thesendAndReceive(ProducerRecord, Duration)
call.void
setPhase(int phase)
void
setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
Set a custom header name for the reply partition.void
setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
Set a custom header name for the reply topic.void
setSharedReplyTopic(boolean sharedReplyTopic)
Set to true when multiple templates are using the same topic for replies.void
setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)
void
start()
void
stop()
void
stop(java.lang.Runnable callback)
-
Methods inherited from class org.springframework.kafka.core.KafkaTemplate
closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setDefaultTopic, setMessageConverter, setMicrometerEnabled, setMicrometerTags, setProducerListener, setTransactionIdPrefix
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.BatchMessageListener
onMessage, wantsPollResult
-
Methods inherited from interface org.springframework.kafka.listener.GenericMessageListener
onMessage, onMessage, onMessage
-
-
-
-
Constructor Detail
-
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)
-
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)
-
-
Method Detail
-
setTaskScheduler
public void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)
-
getDefaultReplyTimeout
protected java.time.Duration getDefaultReplyTimeout()
Return the reply timeout used if no replyTimeout is provided in thesendAndReceive(ProducerRecord, Duration)
call.- Returns:
- the timeout.
- Since:
- 2.3
-
setDefaultReplyTimeout
public void setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
Set the reply timeout used if no replyTimeout is provided in thesendAndReceive(ProducerRecord, Duration)
call.- Parameters:
defaultReplyTimeout
- the timeout.- Since:
- 2.3
-
isRunning
public boolean isRunning()
- Specified by:
isRunning
in interfaceorg.springframework.context.Lifecycle
-
getPhase
public int getPhase()
- Specified by:
getPhase
in interfaceorg.springframework.context.Phased
- Specified by:
getPhase
in interfaceorg.springframework.context.SmartLifecycle
-
setPhase
public void setPhase(int phase)
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup
in interfaceorg.springframework.context.SmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
-
getAssignedReplyTopicPartitions
public java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
Return the topics/partitions assigned to the replying listener container.- Returns:
- the topics/partitions.
-
setSharedReplyTopic
public void setSharedReplyTopic(boolean sharedReplyTopic)
Set to true when multiple templates are using the same topic for replies. This simply changes logs for unexpected replies to debug instead of error.- Parameters:
sharedReplyTopic
- true if using a shared topic.- Since:
- 2.2
-
setCorrelationIdStrategy
public void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
Set a function to be called to establish a unique correlation key for each request record.- Parameters:
correlationStrategy
- the function.- Since:
- 2.3
-
setCorrelationHeaderName
public void setCorrelationHeaderName(java.lang.String correlationHeaderName)
Set a custom header name for the correlation id. DefaultKafkaHeaders.CORRELATION_ID
.- Parameters:
correlationHeaderName
- the header name.- Since:
- 2.3
-
setReplyTopicHeaderName
public void setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
Set a custom header name for the reply topic. DefaultKafkaHeaders.REPLY_TOPIC
.- Parameters:
replyTopicHeaderName
- the header name.- Since:
- 2.3
-
setReplyPartitionHeaderName
public void setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
Set a custom header name for the reply partition. DefaultKafkaHeaders.REPLY_PARTITION
.- Parameters:
replyPartitionHeaderName
- the reply partition header name.- Since:
- 2.3
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet
in interfaceorg.springframework.beans.factory.InitializingBean
-
start
public void start()
- Specified by:
start
in interfaceorg.springframework.context.Lifecycle
-
stop
public void stop()
- Specified by:
stop
in interfaceorg.springframework.context.Lifecycle
-
stop
public void stop(java.lang.Runnable callback)
- Specified by:
stop
in interfaceorg.springframework.context.SmartLifecycle
-
sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface:ReplyingKafkaOperations
Send a request and receive a reply with the default timeout.- Specified by:
sendAndReceive
in interfaceReplyingKafkaOperations<K,V,R>
- Parameters:
record
- the record to send.- Returns:
- a RequestReplyFuture.
-
sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, @Nullable java.time.Duration replyTimeout)
Description copied from interface:ReplyingKafkaOperations
Send a request and receive a reply.- Specified by:
sendAndReceive
in interfaceReplyingKafkaOperations<K,V,R>
- Parameters:
record
- the record to send.replyTimeout
- the reply timeout; if null, the default will be used.- Returns:
- a RequestReplyFuture.
-
handleTimeout
protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.- Parameters:
correlationId
- the correlation id.future
- the future.- Returns:
- true to indicate the future has been completed.
- Since:
- 2.3
-
isPending
protected boolean isPending(CorrelationKey correlationId)
Return true if this correlation id is still active.- Parameters:
correlationId
- the correlation id.- Returns:
- true if pending.
- Since:
- 2.3
-
destroy
public void destroy()
- Specified by:
destroy
in interfaceorg.springframework.beans.factory.DisposableBean
- Overrides:
destroy
in classKafkaTemplate<K,V>
-
onMessage
public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
Description copied from interface:GenericMessageListener
Invoked with data from kafka.- Specified by:
onMessage
in interfaceGenericMessageListener<K>
- Parameters:
data
- the data to be processed.
-
checkDeserialization
@Nullable public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.springframework.core.log.LogAccessor logger)
Return aDeserializationException
if either the key or value failed deserialization; null otherwise. If you need to determine whether it was the key or value, callListenerUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor)
withErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
andErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
instead.- Parameters:
record
- the record.logger
- aLogAccessor
.- Returns:
- the
DeserializationException
ornull
. - Since:
- 2.2.15
-
logLateArrival
protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)
-
-