Class ReplyingKafkaTemplate<K,V,R>

  • Type Parameters:
    K - the key type.
    V - the outbound data type.
    R - the reply data type.
    All Implemented Interfaces:
    java.util.EventListener, org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, KafkaOperations<K,​V>, BatchMessageListener<K,​R>, GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>, ReplyingKafkaOperations<K,​V,​R>
    Direct Known Subclasses:
    AggregatingReplyingKafkaTemplate

    public class ReplyingKafkaTemplate<K,​V,​R>
    extends KafkaTemplate<K,​V>
    implements BatchMessageListener<K,​R>, org.springframework.beans.factory.InitializingBean, org.springframework.context.SmartLifecycle, org.springframework.beans.factory.DisposableBean, ReplyingKafkaOperations<K,​V,​R>
    A KafkaTemplate that implements request/reply semantics.
    Since:
    2.1.3
    Author:
    Gary Russell, Artem Bilan
    • Method Detail

      • setTaskScheduler

        public void setTaskScheduler​(org.springframework.scheduling.TaskScheduler scheduler)
      • getDefaultReplyTimeout

        protected java.time.Duration getDefaultReplyTimeout()
        Return the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
        Returns:
        the timeout.
        Since:
        2.3
      • setDefaultReplyTimeout

        public void setDefaultReplyTimeout​(java.time.Duration defaultReplyTimeout)
        Set the reply timeout used if no replyTimeout is provided in the sendAndReceive(ProducerRecord, Duration) call.
        Parameters:
        defaultReplyTimeout - the timeout.
        Since:
        2.3
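The fallback between a per-call replyTimeout and the default reply timeout can be pictured with a small JDK-only sketch. The class ReplyTimeoutSketch and its resolve method are hypothetical names used for illustration, not part of the library, and the 5-second starting value is an assumption made for the example.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in that models only the timeout fallback.
class ReplyTimeoutSketch {

    // Assumed starting value, for illustration only.
    private Duration defaultReplyTimeout = Duration.ofSeconds(5);

    void setDefaultReplyTimeout(Duration defaultReplyTimeout) {
        this.defaultReplyTimeout = Objects.requireNonNull(defaultReplyTimeout);
    }

    Duration getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    // A null per-call timeout falls back to the configured default.
    Duration resolve(Duration replyTimeout) {
        return replyTimeout != null ? replyTimeout : this.defaultReplyTimeout;
    }
}
```

In this sketch, resolve(null) yields whatever was last passed to setDefaultReplyTimeout, while a non-null argument is returned unchanged.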
      • isRunning

        public boolean isRunning()
        Specified by:
        isRunning in interface org.springframework.context.Lifecycle
      • getPhase

        public int getPhase()
        Specified by:
        getPhase in interface org.springframework.context.Phased
        Specified by:
        getPhase in interface org.springframework.context.SmartLifecycle
      • setPhase

        public void setPhase​(int phase)
      • isAutoStartup

        public boolean isAutoStartup()
        Specified by:
        isAutoStartup in interface org.springframework.context.SmartLifecycle
      • setAutoStartup

        public void setAutoStartup​(boolean autoStartup)
      • getAssignedReplyTopicPartitions

        public java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
        Return the topics/partitions assigned to the replying listener container.
        Returns:
        the topics/partitions.
      • setSharedReplyTopic

        public void setSharedReplyTopic​(boolean sharedReplyTopic)
        Set to true when multiple templates use the same topic for replies. The only effect is that unexpected replies are logged at debug level instead of error level.
        Parameters:
        sharedReplyTopic - true if using a shared topic.
        Since:
        2.2
      • setCorrelationIdStrategy

        public void setCorrelationIdStrategy​(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,​V>,​CorrelationKey> correlationStrategy)
        Set a function to be called to establish a unique correlation key for each request record.
        Parameters:
        correlationStrategy - the function.
        Since:
        2.3
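One way to produce a unique correlation key per request is to derive it from a random UUID. The sketch below is JDK-only and uses stand-in types: a String plays the role of the request record and a raw byte[] plays the role of the key, so CorrelationSketch and UUID_STRATEGY are hypothetical names rather than library API. A real strategy would have the Function signature shown for setCorrelationIdStrategy.

```java
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical stand-in types: String for the request record, byte[] for the key.
class CorrelationSketch {

    // Builds a 16-byte key from a random UUID; the request itself is not inspected.
    static final Function<String, byte[]> UUID_STRATEGY = request -> {
        UUID uuid = UUID.randomUUID();
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    };
}
```

A strategy could also derive the key from a field of the request, provided the result stays unique among in-flight requests.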
      • setCorrelationHeaderName

        public void setCorrelationHeaderName​(java.lang.String correlationHeaderName)
        Set a custom header name for the correlation id. Defaults to KafkaHeaders.CORRELATION_ID.
        Parameters:
        correlationHeaderName - the header name.
        Since:
        2.3
      • setReplyTopicHeaderName

        public void setReplyTopicHeaderName​(java.lang.String replyTopicHeaderName)
        Set a custom header name for the reply topic. Defaults to KafkaHeaders.REPLY_TOPIC.
        Parameters:
        replyTopicHeaderName - the header name.
        Since:
        2.3
      • setReplyPartitionHeaderName

        public void setReplyPartitionHeaderName​(java.lang.String replyPartitionHeaderName)
        Set a custom header name for the reply partition. Defaults to KafkaHeaders.REPLY_PARTITION.
        Parameters:
        replyPartitionHeaderName - the reply partition header name.
        Since:
        2.3
      • setReplyErrorChecker

        public void setReplyErrorChecker​(java.util.function.Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,​?>,​java.lang.Exception> replyErrorChecker)
        Set a function to examine replies for an error returned by the server.
        Parameters:
        replyErrorChecker - the error checker function.
        Since:
        2.6.7
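A reply error checker is a function that returns an exception when a reply signals a server-side error and null otherwise. The JDK-only sketch below models the reply's headers as a Map<String, String> instead of a ConsumerRecord. The class name, the header name "serverSentAnError", and the choice of IllegalStateException are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in: a Map models the headers of a reply record.
class ReplyErrorCheckerSketch {

    // Assumed header name that the server would set on a failed request.
    static final String ERROR_HEADER = "serverSentAnError";

    // Returns an exception when the error header is present, otherwise null.
    static final Function<Map<String, String>, Exception> CHECKER = headers -> {
        String error = headers.get(ERROR_HEADER);
        return error != null ? new IllegalStateException(error) : null;
    };
}
```

Returning null for a clean reply matches the contract described for checkForErrors, which returns the exception, or null if there is none.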
      • afterPropertiesSet

        public void afterPropertiesSet()
        Specified by:
        afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
      • start

        public void start()
        Specified by:
        start in interface org.springframework.context.Lifecycle
      • stop

        public void stop()
        Specified by:
        stop in interface org.springframework.context.Lifecycle
      • stop

        public void stop​(java.lang.Runnable callback)
        Specified by:
        stop in interface org.springframework.context.SmartLifecycle
      • sendAndReceive

        public RequestReplyFuture<K,​V,​R> sendAndReceive​(org.apache.kafka.clients.producer.ProducerRecord<K,​V> record,
                                                                    @Nullable
                                                                    java.time.Duration replyTimeout)
        Description copied from interface: ReplyingKafkaOperations
        Send a request and receive a reply.
        Specified by:
        sendAndReceive in interface ReplyingKafkaOperations<K,​V,​R>
        Parameters:
        record - the record to send.
        replyTimeout - the reply timeout; if null, the default will be used.
        Returns:
        a RequestReplyFuture.
      • handleTimeout

        protected boolean handleTimeout​(CorrelationKey correlationId,
                                        RequestReplyFuture<K,​V,​R> future)
        Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
        Parameters:
        correlationId - the correlation id.
        future - the future.
        Returns:
        true to indicate the future has been completed.
        Since:
        2.3
      • isPending

        protected boolean isPending​(CorrelationKey correlationId)
        Return true if this correlation id is still active.
        Parameters:
        correlationId - the correlation id.
        Returns:
        true if pending.
        Since:
        2.3
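The relationship between handleTimeout and isPending can be sketched with a map of in-flight requests. This is a simplified JDK-only model, not the library's implementation: String stands in for CorrelationKey, CompletableFuture<String> stands in for RequestReplyFuture, and register, onTimeout, and onReply are hypothetical method names. Completing the future with TimeoutException is likewise an assumption.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical model of pending-request bookkeeping.
class PendingRepliesSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Track a new request until it is answered or times out.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        this.pending.put(correlationId, future);
        return future;
    }

    // True while the request has neither been answered nor timed out.
    boolean isPending(String correlationId) {
        return this.pending.containsKey(correlationId);
    }

    // Subclass hook: return true if this method completed the future itself.
    protected boolean handleTimeout(String correlationId, CompletableFuture<String> future) {
        return false;
    }

    // Invoked when the reply timeout for a request elapses.
    void onTimeout(String correlationId) {
        CompletableFuture<String> future = this.pending.remove(correlationId);
        if (future != null && !handleTimeout(correlationId, future)) {
            future.completeExceptionally(new TimeoutException("No reply for " + correlationId));
        }
    }

    // Invoked when a reply arrives; returns false for a late or unknown reply.
    boolean onReply(String correlationId, String reply) {
        CompletableFuture<String> future = this.pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(reply);
        return true;
    }
}
```

In this model a subclass that overrides handleTimeout and returns true takes responsibility for completing the future, so the default exceptional completion is skipped.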
      • destroy

        public void destroy()
        Specified by:
        destroy in interface org.springframework.beans.factory.DisposableBean
        Overrides:
        destroy in class KafkaTemplate<K,​V>
      • onMessage

        public void onMessage​(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>> data)
        Description copied from interface: GenericMessageListener
        Invoked with data from Kafka.
        Specified by:
        onMessage in interface GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
        Parameters:
        data - the data to be processed.
      • checkForErrors

        @Nullable
        protected java.lang.Exception checkForErrors​(org.apache.kafka.clients.consumer.ConsumerRecord<K,​R> record)
        Check for errors in a reply. The default implementation checks for DeserializationExceptions and invokes the replyErrorChecker function.
        Parameters:
        record - the record.
        Returns:
        the exception, or null if none.
        Since:
        2.6.7
      • logLateArrival

        protected void logLateArrival​(org.apache.kafka.clients.consumer.ConsumerRecord<K,​R> record,
                                      CorrelationKey correlationId)