K
- the key type.V
- the outbound data type.R
- the reply data type.public class ReplyingKafkaTemplate<K,V,R> extends KafkaTemplate<K,V> implements BatchMessageListener<K,R>, org.springframework.beans.factory.InitializingBean, org.springframework.context.SmartLifecycle, org.springframework.beans.factory.DisposableBean, ReplyingKafkaOperations<K,V,R>
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
logger
Constructor and Description |
---|
ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory,
GenericMessageListenerContainer<K,R> replyContainer) |
ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory,
GenericMessageListenerContainer<K,R> replyContainer,
boolean autoFlush) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
protected CorrelationKey |
createCorrelationId(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Deprecated.
in favor of
setCorrelationIdStrategy(Function) . |
void |
destroy() |
java.util.Collection<org.apache.kafka.common.TopicPartition> |
getAssignedReplyTopicPartitions()
Return the topics/partitions assigned to the replying listener container.
|
protected java.time.Duration |
getDefaultReplyTimeout()
Return the reply timeout used if no replyTimeout is provided in the
sendAndReceive(ProducerRecord, Duration) call. |
int |
getPhase() |
protected long |
getReplyTimeout()
Deprecated.
in favor of
getDefaultReplyTimeout() . |
protected boolean |
handleTimeout(CorrelationKey correlationId,
RequestReplyFuture<K,V,R> future)
Used to inform subclasses that a request has timed out so they can clean up state
and, optionally, complete the future.
|
boolean |
isAutoStartup() |
protected boolean |
isPending(CorrelationKey correlationId)
Return true if this correlation id is still active.
|
boolean |
isRunning() |
protected void |
logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record,
CorrelationKey correlationId) |
void |
onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
Invoked with data from kafka.
|
RequestReplyFuture<K,V,R> |
sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Send a request and receive a reply with the default timeout.
|
RequestReplyFuture<K,V,R> |
sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record,
java.time.Duration replyTimeout)
Send a request and receive a reply.
|
void |
setAutoStartup(boolean autoStartup) |
void |
setCorrelationHeaderName(java.lang.String correlationHeaderName)
Set a custom header name for the correlation id.
|
void |
setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
Set a function to be called to establish a unique correlation key for each request
record.
|
void |
setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
Set the reply timeout used if no replyTimeout is provided in the
sendAndReceive(ProducerRecord, Duration) call. |
void |
setPhase(int phase) |
void |
setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
Set a custom header name for the reply partition.
|
void |
setReplyTimeout(long replyTimeout)
Deprecated.
in favor of
setDefaultReplyTimeout(Duration) . |
void |
setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
Set a custom header name for the reply topic.
|
void |
setSharedReplyTopic(boolean sharedReplyTopic)
Set to true when multiple templates are using the same topic for replies.
|
void |
setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler) |
void |
start() |
void |
stop() |
void |
stop(java.lang.Runnable callback) |
closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getProducerFactory, getTransactionIdPrefix, inTransaction, isTransactional, metrics, partitionsFor, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, setCloseTimeout, setDefaultTopic, setMessageConverter, setProducerListener, setTransactionIdPrefix
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
onMessage, wantsPollResult
onMessage, onMessage, onMessage
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)
public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)
public void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)
@Deprecated protected long getReplyTimeout()
getDefaultReplyTimeout()
.sendAndReceive(ProducerRecord, Duration)
call.@Deprecated public void setReplyTimeout(long replyTimeout)
setDefaultReplyTimeout(Duration)
.sendAndReceive(ProducerRecord, Duration)
call.replyTimeout
- the timeout.protected java.time.Duration getDefaultReplyTimeout()
sendAndReceive(ProducerRecord, Duration)
call.public void setDefaultReplyTimeout(java.time.Duration defaultReplyTimeout)
sendAndReceive(ProducerRecord, Duration)
call.defaultReplyTimeout
- the timeout.public boolean isRunning()
isRunning
in interface org.springframework.context.Lifecycle
public int getPhase()
getPhase
in interface org.springframework.context.Phased
getPhase
in interface org.springframework.context.SmartLifecycle
public void setPhase(int phase)
public boolean isAutoStartup()
isAutoStartup
in interface org.springframework.context.SmartLifecycle
public void setAutoStartup(boolean autoStartup)
public java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
public void setSharedReplyTopic(boolean sharedReplyTopic)
sharedReplyTopic
- true if using a shared topic.public void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
correlationStrategy
- the function.public void setCorrelationHeaderName(java.lang.String correlationHeaderName)
KafkaHeaders.CORRELATION_ID
.correlationHeaderName
- the header name.public void setReplyTopicHeaderName(java.lang.String replyTopicHeaderName)
KafkaHeaders.REPLY_TOPIC
.replyTopicHeaderName
- the header name.public void setReplyPartitionHeaderName(java.lang.String replyPartitionHeaderName)
KafkaHeaders.REPLY_PARTITION
.replyPartitionHeaderName
- the reply partition header name.public void afterPropertiesSet()
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
public void start()
start
in interface org.springframework.context.Lifecycle
public void stop()
stop
in interface org.springframework.context.Lifecycle
public void stop(java.lang.Runnable callback)
stop
in interface org.springframework.context.SmartLifecycle
public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
ReplyingKafkaOperations
sendAndReceive
in interface ReplyingKafkaOperations<K,V,R>
record
- the record to send.public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, @Nullable java.time.Duration replyTimeout)
ReplyingKafkaOperations
sendAndReceive
in interface ReplyingKafkaOperations<K,V,R>
record
- the record to send.replyTimeout
- the reply timeout; if null, the default will be used.protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
correlationId
- the correlation id.future
- the future.protected boolean isPending(CorrelationKey correlationId)
correlationId
- the correlation id.public void destroy()
destroy
in interface org.springframework.beans.factory.DisposableBean
@Deprecated protected CorrelationKey createCorrelationId(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
setCorrelationIdStrategy(Function)
.record
- the record.public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
GenericMessageListener
onMessage
in interface GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
data
- the data to be processed.protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)