Class ReplyingKafkaTemplate<K,V,R>
java.lang.Object
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
- Type Parameters:
K
- the key type.
V
- the outbound data type.
R
- the reply data type.
- All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, Lifecycle, Phased, SmartLifecycle, KafkaOperations<K,V>, BatchMessageListener<K,R>, ConsumerSeekAware, GenericMessageListener<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>, ReplyingKafkaOperations<K,V,R>
- Direct Known Subclasses:
AggregatingReplyingKafkaTemplate
public class ReplyingKafkaTemplate<K,V,R>
extends KafkaTemplate<K,V>
implements BatchMessageListener<K,R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K,V,R>, ConsumerSeekAware
A KafkaTemplate that implements request/reply semantics.
- Since:
- 2.1.3
- Author:
- Gary Russell, Artem Bilan
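One way to picture request/reply semantics: each outbound request carries a correlation id, a future is remembered under that id, and a reply carrying the same id completes the future. The sketch below models only that bookkeeping with JDK classes. `PendingReplies`, `register`, and `onReply` are hypothetical names chosen for illustration; this is not the library's implementation and no broker is involved.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for request/reply bookkeeping; not a Spring for Apache Kafka class.
class PendingReplies<R> {

    // Correlation id -> future awaiting the matching reply.
    private final Map<String, CompletableFuture<R>> futures = new ConcurrentHashMap<>();

    // Called when a request is sent: remember the future under a fresh correlation id.
    String register(CompletableFuture<R> future) {
        String correlationId = UUID.randomUUID().toString();
        this.futures.put(correlationId, future);
        return correlationId;
    }

    // Called when a reply arrives: complete and forget the matching future.
    // Returns false for a late or unknown reply.
    boolean onReply(String correlationId, R reply) {
        CompletableFuture<R> future = this.futures.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(reply);
        return true;
    }

    // True while a request with this correlation id is still waiting.
    boolean isPending(String correlationId) {
        return this.futures.containsKey(correlationId);
    }
}
```

In this model a reply whose id is no longer registered is simply reported as unmatched, which corresponds loosely to the late-arrival case the class exposes through logLateArrival.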
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
ConsumerSeekAware.ConsumerSeekCallback
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K, V, T>, KafkaOperations.ProducerCallback<K, V, T>
-
Field Summary
Fields inherited from class org.springframework.kafka.core.KafkaTemplate
logger
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
Constructor, Description
ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer)
ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush)
-
Method Summary
Modifier and Type, Method, Description
void afterPropertiesSet()
static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, LogAccessor logger)
- Return a DeserializationException if either the key or value failed deserialization; null otherwise.
protected Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record)
- Check for errors in a reply.
void destroy()
Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
- Return the topics/partitions assigned to the replying listener container.
protected String getCorrelationHeaderName()
- Return the correlation header name.
protected Duration getDefaultReplyTimeout()
- Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
int getPhase()
protected boolean handleTimeout(Object correlationId, RequestReplyFuture<K, V, R> future)
- Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
boolean isAutoStartup()
protected boolean isBinaryCorrelation()
protected boolean isPending(Object correlationId)
- Return true if this correlation id is still active.
boolean isRunning()
protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record, Object correlationId)
void onFirstPoll()
- When using manual partition assignment, called when the first poll has completed; useful when using auto.offset.reset=latest and you need to wait until the initial position has been established.
void onMessage(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>> data)
- Invoked with data from kafka.
RequestReplyFuture<K, V, R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K, V> record)
- Send a request and receive a reply with the default timeout.
RequestReplyFuture<K, V, R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, Duration replyTimeout)
- Send a request and receive a reply.
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message)
- Send a request message and receive a reply message with the default timeout.
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message, Duration replyTimeout)
- Send a request message and receive a reply message.
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, Duration replyTimeout, ParameterizedTypeReference<P> returnType)
- Send a request message and receive a reply message.
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, ParameterizedTypeReference<P> returnType)
- Send a request message and receive a reply message.
void setAutoStartup(boolean autoStartup)
void setBinaryCorrelation(boolean binaryCorrelation)
- Set to false to use the String representation of the correlation as the correlationId rather than the binary representation.
void setCorrelationHeaderName(String correlationHeaderName)
- Set a custom header name for the correlation id.
void setCorrelationIdStrategy(Function<org.apache.kafka.clients.producer.ProducerRecord<K, V>, CorrelationKey> correlationStrategy)
- Set a function to be called to establish a unique correlation key for each request record.
void setDefaultReplyTimeout(Duration defaultReplyTimeout)
- Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
void setPhase(int phase)
void setReplyErrorChecker(Function<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception> replyErrorChecker)
- Set a function to examine replies for an error returned by the server.
void setReplyPartitionHeaderName(String replyPartitionHeaderName)
- Set a custom header name for the reply partition.
void setReplyTopicHeaderName(String replyTopicHeaderName)
- Set a custom header name for the reply topic.
void setSharedReplyTopic(boolean sharedReplyTopic)
- Set to true when multiple templates are using the same topic for replies.
void setTaskScheduler(TaskScheduler scheduler)
void start()
void stop()
void stop(Runnable callback)
boolean waitForAssignment(Duration duration)
- Wait until partitions are assigned, e.g. when auto.offset.reset=latest.
Methods inherited from class org.springframework.kafka.core.KafkaTemplate
afterSingletonsInstantiated, closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getKafkaAdmin, getMessageConverter, getMicrometerTagsProvider, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, receive, receive, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setConsumerFactory, setDefaultTopic, setKafkaAdmin, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setMicrometerTagsProvider, setObservationConvention, setObservationEnabled, setProducerInterceptor, setProducerListener, setTransactionIdPrefix
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
Methods inherited from interface org.springframework.kafka.listener.BatchMessageListener
onMessage, wantsPollResult
Methods inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
onIdleContainer, onPartitionsAssigned, onPartitionsRevoked, registerSeekCallback, unregisterSeekCallback
Methods inherited from interface org.springframework.kafka.listener.GenericMessageListener
onMessage, onMessage, onMessage
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive
-
Constructor Details
-
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer)
-
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush)
-
-
Method Details
-
setTaskScheduler
public void setTaskScheduler(TaskScheduler scheduler)
-
getDefaultReplyTimeout
protected Duration getDefaultReplyTimeout()
Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- Returns:
- the timeout.
- Since:
- 2.3
-
setDefaultReplyTimeout
public void setDefaultReplyTimeout(Duration defaultReplyTimeout)
Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
- Parameters:
defaultReplyTimeout
- the timeout.
- Since:
- 2.3
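The fallback rule described for the default reply timeout (a per-call replyTimeout of null means "use the default") can be sketched as below. `ReplyTimeouts` and `effectiveTimeout` are hypothetical names, and the initial five-second value is an assumption made for illustration, not a statement of the library's default.

```java
import java.time.Duration;

// Hypothetical helper illustrating the fallback rule; not a Spring for Apache Kafka class.
class ReplyTimeouts {

    // Assumed starting value, for illustration only.
    private Duration defaultReplyTimeout = Duration.ofSeconds(5);

    void setDefaultReplyTimeout(Duration defaultReplyTimeout) {
        this.defaultReplyTimeout = defaultReplyTimeout;
    }

    Duration getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    // A null per-call timeout falls back to the configured default.
    Duration effectiveTimeout(Duration replyTimeout) {
        return replyTimeout != null ? replyTimeout : this.defaultReplyTimeout;
    }
}
```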
-
isRunning
public boolean isRunning()
-
getPhase
public int getPhase()
- Specified by:
getPhase in interface Phased
- Specified by:
getPhase in interface SmartLifecycle
-
setPhase
public void setPhase(int phase)
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface SmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
-
getAssignedReplyTopicPartitions
public Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
Return the topics/partitions assigned to the replying listener container.
- Returns:
- the topics/partitions.
-
setCorrelationIdStrategy
public void setCorrelationIdStrategy(Function<org.apache.kafka.clients.producer.ProducerRecord<K, V>, CorrelationKey> correlationStrategy)
Set a function to be called to establish a unique correlation key for each request record.
- Parameters:
correlationStrategy
- the function.
- Since:
- 2.3
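A correlation strategy only has to return a key that is unique per request. The sketch below shows one plausible approach, a random UUID packed into 16 bytes. It uses stand-in types so that it runs with the JDK alone: the request is modelled as a String and the key as a raw byte array, whereas the real function takes a ProducerRecord<K, V> and returns a CorrelationKey. `UuidCorrelationStrategy` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.Function;

// Stand-in sketch: String models the request record, byte[] models the correlation key.
class UuidCorrelationStrategy implements Function<String, byte[]> {

    @Override
    public byte[] apply(String request) {
        // A random UUID yields a distinct 16-byte key per call, independent of the payload.
        UUID uuid = UUID.randomUUID();
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }
}
```

Deriving the key from the payload instead would be risky, because two identical requests in flight at the same time would then share a correlation key.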
-
setCorrelationHeaderName
public void setCorrelationHeaderName(String correlationHeaderName)
Set a custom header name for the correlation id. Default KafkaHeaders.CORRELATION_ID.
- Parameters:
correlationHeaderName
- the header name.
- Since:
- 2.3
-
getCorrelationHeaderName
protected String getCorrelationHeaderName()
Return the correlation header name.
- Returns:
- the header name.
- Since:
- 2.8.8
-
setReplyTopicHeaderName
public void setReplyTopicHeaderName(String replyTopicHeaderName)
Set a custom header name for the reply topic. Default KafkaHeaders.REPLY_TOPIC.
- Parameters:
replyTopicHeaderName
- the header name.
- Since:
- 2.3
-
setReplyPartitionHeaderName
public void setReplyPartitionHeaderName(String replyPartitionHeaderName)
Set a custom header name for the reply partition. Default KafkaHeaders.REPLY_PARTITION.
- Parameters:
replyPartitionHeaderName
- the reply partition header name.
- Since:
- 2.3
-
setReplyErrorChecker
public void setReplyErrorChecker(Function<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception> replyErrorChecker)
Set a function to examine replies for an error returned by the server.
- Parameters:
replyErrorChecker
- the error checker function.
- Since:
- 2.6.7
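A reply error checker is a function from a reply to an exception, or null when the reply is fine. The sketch below assumes the server flags failures in a header. The header name `serverSentAnError` and the class `HeaderErrorChecker` are hypothetical, and the reply headers are modelled as a Map so the example runs without Kafka on the classpath; the real function receives a ConsumerRecord<?, ?>.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

// Stand-in sketch: reply headers are modelled as a Map rather than a ConsumerRecord.
class HeaderErrorChecker implements Function<Map<String, byte[]>, Exception> {

    // Hypothetical header name that the server and client would have to agree on.
    static final String ERROR_HEADER = "serverSentAnError";

    @Override
    public Exception apply(Map<String, byte[]> replyHeaders) {
        byte[] error = replyHeaders.get(ERROR_HEADER);
        if (error == null) {
            return null; // no error header present
        }
        // Wrap the server's message in an exception for the caller to handle.
        return new IllegalStateException(new String(error, StandardCharsets.UTF_8));
    }
}
```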
-
setBinaryCorrelation
public void setBinaryCorrelation(boolean binaryCorrelation)
Set to false to use the String representation of the correlation as the correlationId rather than the binary representation. Default true.
- Parameters:
binaryCorrelation
- false for String.
- Since:
- 3.0
-
isBinaryCorrelation
protected boolean isBinaryCorrelation()
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface InitializingBean
-
start
public void start()
-
stop
public void stop()
-
stop
public void stop(Runnable callback)
- Specified by:
stop in interface SmartLifecycle
-
onFirstPoll
public void onFirstPoll()
Description copied from interface: ConsumerSeekAware
When using manual partition assignment, called when the first poll has completed; useful when using auto.offset.reset=latest and you need to wait until the initial position has been established.
- Specified by:
onFirstPoll in interface ConsumerSeekAware
-
waitForAssignment
public boolean waitForAssignment(Duration duration) throws InterruptedException
Description copied from interface: ReplyingKafkaOperations
Wait until partitions are assigned, e.g. when auto.offset.reset=latest. When using manual assignment, the duration must be greater than the container's pollTimeout property.
- Specified by:
waitForAssignment in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
duration
- how long to wait.
- Returns:
- true if the partitions have been assigned.
- Throws:
InterruptedException
- if the thread is interrupted while waiting.
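The contract above (block for up to a duration, return true once assignment has happened, propagate interruption) resembles a latch that is released when partitions are assigned. The sketch below models that contract with a CountDownLatch. `AssignmentGate` and `partitionsAssigned` are hypothetical names, and this is an assumed model rather than the library's implementation.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "wait until partitions are assigned".
class AssignmentGate {

    private final CountDownLatch assigned = new CountDownLatch(1);

    // Would be invoked from an assignment (or first-poll) callback.
    void partitionsAssigned() {
        this.assigned.countDown();
    }

    // Blocks up to the given duration; returns true if assignment happened in time.
    boolean waitForAssignment(Duration duration) throws InterruptedException {
        return this.assigned.await(duration.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Under this model, a caller that sends a request before the gate opens may miss a reply when the consumer starts from the latest offset, which is why waiting first is useful with auto.offset.reset=latest.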
-
sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message with the default timeout.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
message
- the message to send.
- Returns:
- a RequestReplyMessageFuture.
-
sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message, Duration replyTimeout)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
message
- the message to send.
replyTimeout
- the reply timeout; if null, the default will be used.
- Returns:
- a RequestReplyMessageFuture.
-
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(Message<?> message, @Nullable ParameterizedTypeReference<P> returnType)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Type Parameters:
P
- the reply payload type.
- Parameters:
message
- the message to send.
returnType
- a hint to the message converter for the reply payload type.
- Returns:
- a RequestReplyMessageFuture.
-
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(Message<?> message, @Nullable Duration replyTimeout, @Nullable ParameterizedTypeReference<P> returnType)
Description copied from interface: ReplyingKafkaOperations
Send a request message and receive a reply message.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Type Parameters:
P
- the reply payload type.
- Parameters:
message
- the message to send.
replyTimeout
- the reply timeout; if null, the default will be used.
returnType
- a hint to the message converter for the reply payload type.
- Returns:
- a RequestReplyMessageFuture.
-
sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K, V> record)
Description copied from interface: ReplyingKafkaOperations
Send a request and receive a reply with the default timeout.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
record
- the record to send.
- Returns:
- a RequestReplyFuture.
-
sendAndReceive
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, @Nullable Duration replyTimeout)
Description copied from interface: ReplyingKafkaOperations
Send a request and receive a reply.
- Specified by:
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
- Parameters:
record
- the record to send.
replyTimeout
- the reply timeout; if null, the default will be used.
- Returns:
- a RequestReplyFuture.
-
handleTimeout
protected boolean handleTimeout(Object correlationId, RequestReplyFuture<K, V, R> future)
Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
- Parameters:
correlationId
- the correlation id.
future
- the future.
- Returns:
- true to indicate the future has been completed.
- Since:
- 2.3
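The return value of handleTimeout tells the caller whether the subclass has already completed the future. The sketch below models one reading of that contract: if the hook returns false, the caller fails the future itself. `TimeoutModel`, `onTimeout`, and `FallbackOnTimeout` are hypothetical, a plain CompletableFuture<String> stands in for RequestReplyFuture, and the caller behaviour shown is an assumption rather than the library's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the timeout hook; not the library's implementation.
class TimeoutModel {

    // Hook for subclasses: return true if the future has been completed here.
    protected boolean handleTimeout(Object correlationId, CompletableFuture<String> future) {
        return false;
    }

    // Assumed caller behaviour: fail the future only if the hook did not complete it.
    final void onTimeout(Object correlationId, CompletableFuture<String> future) {
        if (!handleTimeout(correlationId, future)) {
            future.completeExceptionally(new TimeoutException("no reply for " + correlationId));
        }
    }
}

// A subclass that completes the future with a fallback value instead of letting it fail.
class FallbackOnTimeout extends TimeoutModel {

    @Override
    protected boolean handleTimeout(Object correlationId, CompletableFuture<String> future) {
        future.complete("fallback");
        return true;
    }
}
```

A subclass that collects several replies per request could use the same hook to release whatever it has gathered so far when the timeout fires.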
-
isPending
protected boolean isPending(Object correlationId)
Return true if this correlation id is still active.
- Parameters:
correlationId
- the correlation id.
- Returns:
- true if pending.
- Since:
- 2.3
-
destroy
public void destroy()
- Specified by:
destroy in interface DisposableBean
- Overrides:
destroy in class KafkaTemplate<K,V>
-
onMessage
public void onMessage(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, R>> data)
Description copied from interface: GenericMessageListener
Invoked with data from kafka.
- Specified by:
onMessage in interface GenericMessageListener<K>
- Parameters:
data
- the data to be processed.
-
checkForErrors
@Nullable protected Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record)
Check for errors in a reply. The default implementation checks for DeserializationExceptions and invokes the replyErrorChecker function.
- Parameters:
record
- the record.
- Returns:
- the exception, or null if none.
- Since:
- 2.6.7
-
checkDeserialization
@Nullable public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, LogAccessor logger)
Return a DeserializationException if either the key or value failed deserialization; null otherwise. If you need to determine whether it was the key or value, call SerializationUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor) with SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER and SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER instead.
- Parameters:
record
- the record.
logger
- a LogAccessor.
- Returns:
- the DeserializationException or null.
- Since:
- 2.2.15
-
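logLateArrival
protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record, Object correlationId)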
-