Class ReplyingKafkaTemplate<K,V,R>

java.lang.Object
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
Type Parameters:
K - the key type.
V - the outbound data type.
R - the reply data type.
All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, Lifecycle, Phased, SmartLifecycle, KafkaOperations<K,V>, BatchMessageListener<K,R>, ConsumerSeekAware, GenericMessageListener<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>, ReplyingKafkaOperations<K,V,R>
Direct Known Subclasses:
AggregatingReplyingKafkaTemplate

public class ReplyingKafkaTemplate<K,V,R> extends KafkaTemplate<K,V> implements BatchMessageListener<K,R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K,V,R>, ConsumerSeekAware
A KafkaTemplate that implements request/reply semantics.
Since:
2.1.3
Author:
Gary Russell, Artem Bilan
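The request/reply idea can be illustrated with a minimal, JDK-only sketch. It is not the ReplyingKafkaTemplate implementation: the class `PendingReplies` and its methods are hypothetical names, and it assumes that each request is tracked by a correlation id until a matching reply arrives or a timeout elapses.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Conceptual sketch only: NOT the ReplyingKafkaTemplate implementation.
// PendingReplies and its methods are hypothetical names used for illustration.
class PendingReplies<R> {

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Register an outbound request; the future fails if no reply arrives in time.
    CompletableFuture<R> register(String correlationId, Duration replyTimeout) {
        CompletableFuture<R> future = new CompletableFuture<R>()
                .orTimeout(replyTimeout.toMillis(), TimeUnit.MILLISECONDS);
        this.pending.put(correlationId, future);
        // Forget the request once it completes, whether by reply or by timeout.
        future.whenComplete((reply, error) -> this.pending.remove(correlationId));
        return future;
    }

    // Called for each inbound reply; returns false for an unknown or late correlation id.
    boolean onReply(String correlationId, R reply) {
        CompletableFuture<R> future = this.pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    // True while the request is still waiting for its reply.
    boolean isPending(String correlationId) {
        return this.pending.containsKey(correlationId);
    }
}
```

A caller would register a correlation id before sending the request, then hand every inbound reply to `onReply`; replies that arrive after the timeout find no pending entry and are rejected.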
  • Method Details

    • setTaskScheduler

      public void setTaskScheduler(TaskScheduler scheduler)
    • getDefaultReplyTimeout

      protected Duration getDefaultReplyTimeout()
      Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
      Returns:
      the timeout.
      Since:
      2.3
    • setDefaultReplyTimeout

      public void setDefaultReplyTimeout(Duration defaultReplyTimeout)
      Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
      Parameters:
      defaultReplyTimeout - the timeout.
      Since:
      2.3
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
    • getPhase

      public int getPhase()
      Specified by:
      getPhase in interface Phased
      Specified by:
      getPhase in interface SmartLifecycle
    • setPhase

      public void setPhase(int phase)
    • isAutoStartup

      public boolean isAutoStartup()
      Specified by:
      isAutoStartup in interface SmartLifecycle
    • setAutoStartup

      public void setAutoStartup(boolean autoStartup)
    • getAssignedReplyTopicPartitions

      public Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
      Return the topics/partitions assigned to the replying listener container.
      Returns:
      the topics/partitions.
    • setSharedReplyTopic

      public void setSharedReplyTopic(boolean sharedReplyTopic)
      Set to true when multiple templates use the same topic for replies. This only changes the log level for unexpected replies from error to debug.
      Parameters:
      sharedReplyTopic - true if using a shared topic.
      Since:
      2.2
    • setCorrelationIdStrategy

      public void setCorrelationIdStrategy(Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
      Set a function to be called to establish a unique correlation key for each request record.
      Parameters:
      correlationStrategy - the function.
      Since:
      2.3
    • setCorrelationHeaderName

      public void setCorrelationHeaderName(String correlationHeaderName)
      Set a custom header name for the correlation id. Defaults to KafkaHeaders.CORRELATION_ID.
      Parameters:
      correlationHeaderName - the header name.
      Since:
      2.3
    • getCorrelationHeaderName

      protected String getCorrelationHeaderName()
      Return the correlation header name.
      Returns:
      the header name.
      Since:
      2.8.8
    • setReplyTopicHeaderName

      public void setReplyTopicHeaderName(String replyTopicHeaderName)
      Set a custom header name for the reply topic. Defaults to KafkaHeaders.REPLY_TOPIC.
      Parameters:
      replyTopicHeaderName - the header name.
      Since:
      2.3
    • setReplyPartitionHeaderName

      public void setReplyPartitionHeaderName(String replyPartitionHeaderName)
      Set a custom header name for the reply partition. Defaults to KafkaHeaders.REPLY_PARTITION.
      Parameters:
      replyPartitionHeaderName - the reply partition header name.
      Since:
      2.3
    • setReplyErrorChecker

      public void setReplyErrorChecker(Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,Exception> replyErrorChecker)
      Set a function to examine replies for an error returned by the server.
      Parameters:
      replyErrorChecker - the error checker function.
      Since:
      2.6.7
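A configuration fragment along the following lines could install a reply error checker. It is a sketch under assumptions: the header name `serverSentAnError`, the choice of `IllegalStateException`, and the class and bean names are illustrative, and the producer factory and reply container beans are assumed to be defined elsewhere.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;

import org.apache.kafka.common.header.Header;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
class ReplyingTemplateConfig {

    @Bean
    ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> producerFactory,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        ReplyingKafkaTemplate<String, String, String> template =
                new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
        // Used when a sendAndReceive call supplies no reply timeout.
        template.setDefaultReplyTimeout(Duration.ofSeconds(10));
        // Return an exception when the server flagged an error, or null for a normal reply.
        // "serverSentAnError" is a hypothetical header agreed between client and server.
        template.setReplyErrorChecker(record -> {
            Header error = record.headers().lastHeader("serverSentAnError");
            return error == null
                    ? null
                    : new IllegalStateException(new String(error.value(), StandardCharsets.UTF_8));
        });
        return template;
    }
}
```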
    • setBinaryCorrelation

      public void setBinaryCorrelation(boolean binaryCorrelation)
      Set to false to use the String representation of the correlation as the correlationId rather than the binary representation. Defaults to true.
      Parameters:
      binaryCorrelation - false for String.
      Since:
      3.0
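To make the difference between a binary and a String correlation id concrete, here is a small JDK-only sketch. `CorrelationForms` is a hypothetical helper, not part of Spring Kafka. It assumes the correlation id is a UUID and uses the UUID's canonical text as the String form; the exact String encoding the template uses is not stated here and may differ.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Illustration only; CorrelationForms is a hypothetical helper, not part of Spring Kafka.
class CorrelationForms {

    // Binary form: the 16 raw bytes of a UUID.
    static byte[] toBinary(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // String form (assumed encoding): the canonical 36-character text of the same UUID.
    static String toText(UUID id) {
        return id.toString();
    }

    // Rebuild the UUID from its 16-byte binary form.
    static UUID fromBinary(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new UUID(buffer.getLong(), buffer.getLong());
    }
}
```

The binary form is more compact, while a String form is easier to read in logs and to handle in clients that treat header values as text.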
    • isBinaryCorrelation

      protected boolean isBinaryCorrelation()
    • afterPropertiesSet

      public void afterPropertiesSet()
      Specified by:
      afterPropertiesSet in interface InitializingBean
    • start

      public void start()
      Specified by:
      start in interface Lifecycle
    • stop

      public void stop()
      Specified by:
      stop in interface Lifecycle
    • stop

      public void stop(Runnable callback)
      Specified by:
      stop in interface SmartLifecycle
    • onFirstPoll

      public void onFirstPoll()
      Description copied from interface: ConsumerSeekAware
      When using manual partition assignment, called when the first poll has completed; useful when using auto.offset.reset=latest and you need to wait until the initial position has been established.
      Specified by:
      onFirstPoll in interface ConsumerSeekAware
    • waitForAssignment

      public boolean waitForAssignment(Duration duration) throws InterruptedException
      Description copied from interface: ReplyingKafkaOperations
      Wait until partitions are assigned, e.g. when auto.offset.reset=latest. When using manual assignment, the duration must be greater than the container's pollTimeout property.
      Specified by:
      waitForAssignment in interface ReplyingKafkaOperations<K,V,R>
      Parameters:
      duration - how long to wait.
      Returns:
      true if the partitions have been assigned.
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
    • sendAndReceive

      public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message)
      Description copied from interface: ReplyingKafkaOperations
      Send a request message and receive a reply message with the default timeout.
      Specified by:
      sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
      Parameters:
      message - the message to send.
      Returns:
      a RequestReplyMessageFuture.
    • sendAndReceive

      public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message, Duration replyTimeout)
      Description copied from interface: ReplyingKafkaOperations
      Send a request message and receive a reply message.
      Specified by:
      sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
      Parameters:
      message - the message to send.
      replyTimeout - the reply timeout; if null, the default will be used.
      Returns:
      a RequestReplyMessageFuture.
    • sendAndReceive

      public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(Message<?> message, @Nullable ParameterizedTypeReference<P> returnType)
      Description copied from interface: ReplyingKafkaOperations
      Send a request message and receive a reply message.
      Specified by:
      sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
      Type Parameters:
      P - the reply payload type.
      Parameters:
      message - the message to send.
      returnType - a hint to the message converter for the reply payload type.
      Returns:
      a RequestReplyTypedMessageFuture.
    • sendAndReceive

      public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(Message<?> message, @Nullable Duration replyTimeout, @Nullable ParameterizedTypeReference<P> returnType)
      Description copied from interface: ReplyingKafkaOperations
      Send a request message and receive a reply message.
      Specified by:
      sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
      Type Parameters:
      P - the reply payload type.
      Parameters:
      message - the message to send.
      replyTimeout - the reply timeout; if null, the default will be used.
      returnType - a hint to the message converter for the reply payload type.
      Returns:
      a RequestReplyTypedMessageFuture.
    • sendAndReceive

      public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
      Description copied from interface: ReplyingKafkaOperations
      Send a request and receive a reply with the default timeout.
      Specified by:
      sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
      Parameters:
      record - the record to send.
      Returns:
      a RequestReplyFuture.
    • sendAndReceive

      public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, @Nullable Duration replyTimeout)
      Description copied from interface: ReplyingKafkaOperations
      Send a request and receive a reply.
      Specified by:
      sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
      Parameters:
      record - the record to send.
      replyTimeout - the reply timeout; if null, the default will be used.
      Returns:
      a RequestReplyFuture.
    • handleTimeout

      protected boolean handleTimeout(Object correlationId, RequestReplyFuture<K,V,R> future)
      Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
      Parameters:
      correlationId - the correlation id.
      future - the future.
      Returns:
      true to indicate the future has been completed.
      Since:
      2.3
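The contract of this hook can be sketched without the library. The classes below are hypothetical and use `CompletableFuture` in place of `RequestReplyFuture`; the assumption is that a caller fails the future with a timeout exception only when the hook returns false.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical classes showing the hook contract; not the library's code.
class TimeoutHookBase<R> {

    // Subclasses return true if they completed the future themselves.
    protected boolean handleTimeout(Object correlationId, CompletableFuture<R> future) {
        return false;
    }

    // Called when the reply timeout elapses for a request.
    final void onTimeout(Object correlationId, CompletableFuture<R> future) {
        if (!handleTimeout(correlationId, future)) {
            // Assumed default behavior: fail the future because nobody completed it.
            future.completeExceptionally(new TimeoutException("No reply for " + correlationId));
        }
    }
}

// A subclass that completes the future with whatever it has collected so far.
class PartialResultOnTimeout extends TimeoutHookBase<String> {

    @Override
    protected boolean handleTimeout(Object correlationId, CompletableFuture<String> future) {
        return future.complete("partial result for " + correlationId);
    }
}
```

Returning true tells the caller that the future already holds a result, so no timeout failure is applied on top of it.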
    • isPending

      protected boolean isPending(Object correlationId)
      Return true if this correlation id is still active.
      Parameters:
      correlationId - the correlation id.
      Returns:
      true if pending.
      Since:
      2.3
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Overrides:
      destroy in class KafkaTemplate<K,V>
    • onMessage

      public void onMessage(List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
      Description copied from interface: GenericMessageListener
      Invoked with data from Kafka.
      Specified by:
      onMessage in interface GenericMessageListener<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
      Parameters:
      data - the data to be processed.
    • checkForErrors

      @Nullable protected Exception checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record)
      Check for errors in a reply. The default implementation checks for DeserializationExceptions and invokes the replyErrorChecker function.
      Parameters:
      record - the record.
      Returns:
      the exception, or null if none.
      Since:
      2.6.7
    • checkDeserialization

      @Nullable public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, LogAccessor logger)
      Return a DeserializationException if either the key or value failed deserialization; null otherwise. If you need to determine whether it was the key or value, call SerializationUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor) with SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER and SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER instead.
      Parameters:
      record - the record.
      logger - a LogAccessor.
      Returns:
      the DeserializationException or null.
      Since:
      2.2.15
    • logLateArrival

      protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, Object correlationId)