Class KafkaTemplate<K,​V>

  • Type Parameters:
    K - the key type.
    V - the value type.
    All Implemented Interfaces:
    java.util.EventListener, org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, KafkaOperations<K,​V>
    Direct Known Subclasses:
    ReplyingKafkaTemplate, RoutingKafkaTemplate

    public class KafkaTemplate<K,​V>
    extends java.lang.Object
    implements KafkaOperations<K,​V>, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean
    A template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
    Author:
    Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutiérrez
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected org.springframework.core.log.LogAccessor logger  
    • Constructor Summary

      Constructors 
      Constructor Description
      KafkaTemplate​(ProducerFactory<K,​V> producerFactory)
      Create an instance using the supplied producer factory and autoFlush false.
      KafkaTemplate​(ProducerFactory<K,​V> producerFactory, boolean autoFlush)
      Create an instance using the supplied producer factory and autoFlush setting.
      KafkaTemplate​(ProducerFactory<K,​V> producerFactory, boolean autoFlush, java.util.Map<java.lang.String,​java.lang.Object> configOverrides)
      Create an instance using the supplied producer factory and autoFlush setting.
      KafkaTemplate​(ProducerFactory<K,​V> producerFactory, java.util.Map<java.lang.String,​java.lang.Object> configOverrides)
      Create an instance using the supplied producer factory and properties, with autoFlush false.
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      protected void closeProducer​(org.apache.kafka.clients.producer.Producer<K,​V> producer, boolean inTx)  
      void destroy()  
      protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> doSend​(org.apache.kafka.clients.producer.ProducerRecord<K,​V> producerRecord)
      Send the producer record.
      <T> T execute​(KafkaOperations.ProducerCallback<K,​V,​T> callback)
      Execute some arbitrary operation(s) on the producer and return the result.
      <T> T executeInTransaction​(KafkaOperations.OperationsCallback<K,​V,​T> callback)
      Execute some arbitrary operation(s) on the operations and return the result.
      void flush()
      Flush the producer.
      java.lang.String getDefaultTopic()
      The default topic for send methods where a topic is not provided.
      MessageConverter getMessageConverter()
      Return the message converter.
      ProducerFactory<K,​V> getProducerFactory()
      Return the producer factory used by this template.
      protected ProducerFactory<K,​V> getProducerFactory​(java.lang.String topic)
      Return the producer factory used by this template based on the topic.
      protected org.apache.kafka.clients.producer.Producer<K,​V> getTheProducer​(java.lang.String topic)  
      java.lang.String getTransactionIdPrefix()  
      boolean inTransaction()
      Return true if the template is currently running in a transaction on the calling thread.
      boolean isAllowNonTransactional()
      Return true if this template, when transactional, allows non-transactional operations.
      boolean isTransactional()
      Return true if the implementation supports transactions (has a transaction-capable producer factory).
      java.util.Map<org.apache.kafka.common.MetricName,​? extends org.apache.kafka.common.Metric> metrics()
      See Producer.metrics().
      void onApplicationEvent​(org.springframework.context.event.ContextStoppedEvent event)  
      java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor​(java.lang.String topic)
      See Producer.partitionsFor(String).
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data)
      Send the data to the provided topic with the provided key, partition, and timestamp.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic, java.lang.Integer partition, K key, V data)
      Send the data to the provided topic with the provided key and partition.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic, K key, V data)
      Send the data to the provided topic with the provided key and no partition.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic, V data)
      Send the data to the provided topic with no key or partition.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(org.apache.kafka.clients.producer.ProducerRecord<K,​V> record)
      Send the provided ProducerRecord.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(org.springframework.messaging.Message<?> message)
      Send a message with routing information in message headers.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(java.lang.Integer partition, java.lang.Long timestamp, K key, V data)
      Send the data to the default topic with the provided key, partition, and timestamp.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(java.lang.Integer partition, K key, V data)
      Send the data to the default topic with the provided key and partition.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(K key, V data)
      Send the data to the default topic with the provided key and no partition.
      org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(V data)
      Send the data to the default topic with no key or partition.
      void sendOffsetsToTransaction​(java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
      When running in a transaction, send the consumer offset(s) to the transaction.
      void sendOffsetsToTransaction​(java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
      When running in a transaction, send the consumer offset(s) to the transaction.
      void sendOffsetsToTransaction​(java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
      When running in a transaction, send the consumer offset(s) to the transaction.
      void setAllowNonTransactional​(boolean allowNonTransactional)
      Set to true to allow a non-transactional send when the template is transactional.
      void setApplicationContext​(org.springframework.context.ApplicationContext applicationContext)  
      void setBeanName​(java.lang.String name)  
      void setCloseTimeout​(java.time.Duration closeTimeout)
      Set the maximum time to wait when closing a producer; default 5 seconds.
      void setDefaultTopic​(java.lang.String defaultTopic)
      Set the default topic for send methods where a topic is not provided.
      void setMessageConverter​(RecordMessageConverter messageConverter)
      Set the message converter to use.
      void setMicrometerEnabled​(boolean micrometerEnabled)
      Set to false to disable micrometer timers, if micrometer is on the class path.
      void setMicrometerTags​(java.util.Map<java.lang.String,​java.lang.String> tags)
      Set additional tags for the Micrometer listener timers.
      void setProducerListener​(ProducerListener<K,​V> producerListener)
      Set a ProducerListener which will be invoked when Kafka acknowledges a send operation.
      void setTransactionIdPrefix​(java.lang.String transactionIdPrefix)
      Set a transaction id prefix to override the prefix in the producer factory.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • logger

        protected final org.springframework.core.log.LogAccessor logger
    • Constructor Detail

      • KafkaTemplate

        public KafkaTemplate​(ProducerFactory<K,​V> producerFactory)
        Create an instance using the supplied producer factory and autoFlush false.
        Parameters:
        producerFactory - the producer factory.
      • KafkaTemplate

        public KafkaTemplate​(ProducerFactory<K,​V> producerFactory,
                             @Nullable
                             java.util.Map<java.lang.String,​java.lang.Object> configOverrides)
        Create an instance using the supplied producer factory and properties, with autoFlush false. If configOverrides is neither null nor empty, a new DefaultKafkaProducerFactory is created from the supplied factory's properties merged with the overrides; the overrides are applied last, so they take precedence.
        Parameters:
        producerFactory - the producer factory.
        configOverrides - producer configuration properties to override.
        Since:
        2.5
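        The merge order can be modeled with plain maps. The sketch below is not Spring Kafka code: ConfigOverrideMerge and its merge method are hypothetical names, and the property values are made up. It only illustrates that an override replaces the factory's value for the same key while other keys are carried over.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the merge order; it does not call Spring Kafka.
class ConfigOverrideMerge {

    // Copies the factory's properties, then applies the overrides on top,
    // so an override wins whenever both maps define the same key.
    static Map<String, Object> merge(Map<String, Object> factoryProps,
                                     Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(factoryProps);
        if (overrides != null) {
            merged.putAll(overrides);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryProps = new HashMap<>();
        factoryProps.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        factoryProps.put("acks", "1");

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("acks", "all");   // replaces the factory's value
        overrides.put("linger.ms", 20); // added; not present in the factory's properties

        Map<String, Object> merged = merge(factoryProps, overrides);
        System.out.println(merged.get("acks"));
        System.out.println(merged.size());
    }
}
```

        Running main prints all and then 3: the acks override wins, and the other two keys are kept.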
      • KafkaTemplate

        public KafkaTemplate​(ProducerFactory<K,​V> producerFactory,
                             boolean autoFlush)
        Create an instance using the supplied producer factory and autoFlush setting.

        Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property.

        Parameters:
        producerFactory - the producer factory.
        autoFlush - true to flush after each send.
        See Also:
        Producer.flush()
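        A minimal sketch of this constructor with a non-default linger.ms, assuming String keys and values. It needs spring-kafka and kafka-clients on the class path; the class name, method name, broker address, and the 50 ms value are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

class AutoFlushTemplateSketch {

    static KafkaTemplate<String, String> autoFlushingTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50); // non-default: allow batching for up to 50 ms

        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);

        // autoFlush = true: the template flushes the producer after each send.
        return new KafkaTemplate<>(factory, true);
    }
}
```

        Producer.flush() makes buffered records immediately available to send even when linger.ms is greater than 0, and blocks until those sends complete, so this setting trades throughput for lower latency per send.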
      • KafkaTemplate

        public KafkaTemplate​(ProducerFactory<K,​V> producerFactory,
                             boolean autoFlush,
                             @Nullable
                             java.util.Map<java.lang.String,​java.lang.Object> configOverrides)
        Create an instance using the supplied producer factory and autoFlush setting.

        Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property. If configOverrides is neither null nor empty, a new DefaultKafkaProducerFactory is created from the supplied factory's properties merged with the overrides; the overrides are applied last, so they take precedence.

        Parameters:
        producerFactory - the producer factory.
        autoFlush - true to flush after each send.
        configOverrides - producer configuration properties to override.
        Since:
        2.5
        See Also:
        Producer.flush()
    • Method Detail

      • setBeanName

        public void setBeanName​(java.lang.String name)
        Specified by:
        setBeanName in interface org.springframework.beans.factory.BeanNameAware
      • setApplicationContext

        public void setApplicationContext​(org.springframework.context.ApplicationContext applicationContext)
        Specified by:
        setApplicationContext in interface org.springframework.context.ApplicationContextAware
      • getDefaultTopic

        public java.lang.String getDefaultTopic()
        The default topic for send methods where a topic is not provided.
        Returns:
        the topic.
      • setDefaultTopic

        public void setDefaultTopic​(java.lang.String defaultTopic)
        Set the default topic for send methods where a topic is not provided.
        Parameters:
        defaultTopic - the topic.
      • setProducerListener

        public void setProducerListener​(@Nullable
                                        ProducerListener<K,​V> producerListener)
        Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
        Parameters:
        producerListener - the listener; may be null.
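        A hedged sketch of a custom listener, assuming this version of ProducerListener offers the onSuccess(ProducerRecord, RecordMetadata) callback and that String keys and values are used. It extends LoggingProducerListener so that errors are still logged as they are by default. AckLoggingListenerSketch and install are hypothetical names; spring-kafka and kafka-clients must be on the class path.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.LoggingProducerListener;

class AckLoggingListenerSketch {

    // Keeps the inherited error logging and additionally reports each acknowledged record.
    static void install(KafkaTemplate<String, String> template) {
        template.setProducerListener(new LoggingProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
                System.out.println("acknowledged " + record.topic()
                        + "-" + metadata.partition() + "@" + metadata.offset());
            }
        });
    }
}
```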
      • getMessageConverter

        public MessageConverter getMessageConverter()
        Return the message converter.
        Returns:
        the message converter.
      • setMessageConverter

        public void setMessageConverter​(RecordMessageConverter messageConverter)
        Set the message converter to use.
        Parameters:
        messageConverter - the message converter.
      • isTransactional

        public boolean isTransactional()
        Description copied from interface: KafkaOperations
        Return true if the implementation supports transactions (has a transaction-capable producer factory).
        Specified by:
        isTransactional in interface KafkaOperations<K,​V>
        Returns:
        true or false.
      • getTransactionIdPrefix

        public java.lang.String getTransactionIdPrefix()
      • setTransactionIdPrefix

        public void setTransactionIdPrefix​(java.lang.String transactionIdPrefix)
        Set a transaction id prefix to override the prefix in the producer factory.
        Parameters:
        transactionIdPrefix - the prefix.
        Since:
        2.3
      • setCloseTimeout

        public void setCloseTimeout​(java.time.Duration closeTimeout)
        Set the maximum time to wait when closing a producer; default 5 seconds.
        Parameters:
        closeTimeout - the close timeout.
        Since:
        2.1.14
      • setAllowNonTransactional

        public void setAllowNonTransactional​(boolean allowNonTransactional)
        Set to true to allow a non-transactional send when the template is transactional.
        Parameters:
        allowNonTransactional - true to allow.
        Since:
        2.4.3
      • isAllowNonTransactional

        public boolean isAllowNonTransactional()
        Description copied from interface: KafkaOperations
        Return true if this template, when transactional, allows non-transactional operations.
        Specified by:
        isAllowNonTransactional in interface KafkaOperations<K,​V>
        Returns:
        true if non-transactional operations are allowed.
      • setMicrometerEnabled

        public void setMicrometerEnabled​(boolean micrometerEnabled)
        Set to false to disable micrometer timers, if micrometer is on the class path.
        Parameters:
        micrometerEnabled - false to disable.
        Since:
        2.5
      • setMicrometerTags

        public void setMicrometerTags​(java.util.Map<java.lang.String,​java.lang.String> tags)
        Set additional tags for the Micrometer listener timers.
        Parameters:
        tags - the tags.
        Since:
        2.5
      • getProducerFactory

        protected ProducerFactory<K,​V> getProducerFactory​(java.lang.String topic)
        Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.
        Parameters:
        topic - the topic.
        Returns:
        the factory.
        Since:
        2.5
      • onApplicationEvent

        public void onApplicationEvent​(org.springframework.context.event.ContextStoppedEvent event)
        Specified by:
        onApplicationEvent in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>
      • sendDefault

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(@Nullable
                                                                                                       V data)
        Description copied from interface: KafkaOperations
        Send the data to the default topic with no key or partition.
        Specified by:
        sendDefault in interface KafkaOperations<K,​V>
        Parameters:
        data - the data.
        Returns:
        a Future for the SendResult.
      • sendDefault

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(K key,
                                                                                                       @Nullable
                                                                                                       V data)
        Description copied from interface: KafkaOperations
        Send the data to the default topic with the provided key and no partition.
        Specified by:
        sendDefault in interface KafkaOperations<K,​V>
        Parameters:
        key - the key.
        data - the data.
        Returns:
        a Future for the SendResult.
      • sendDefault

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(java.lang.Integer partition,
                                                                                                       K key,
                                                                                                       @Nullable
                                                                                                       V data)
        Description copied from interface: KafkaOperations
        Send the data to the default topic with the provided key and partition.
        Specified by:
        sendDefault in interface KafkaOperations<K,​V>
        Parameters:
        partition - the partition.
        key - the key.
        data - the data.
        Returns:
        a Future for the SendResult.
      • sendDefault

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> sendDefault​(java.lang.Integer partition,
                                                                                                       java.lang.Long timestamp,
                                                                                                       K key,
                                                                                                       @Nullable
                                                                                                       V data)
        Description copied from interface: KafkaOperations
        Send the data to the default topic with the provided key, partition, and timestamp.
        Specified by:
        sendDefault in interface KafkaOperations<K,​V>
        Parameters:
        partition - the partition.
        timestamp - the timestamp of the record.
        key - the key.
        data - the data.
        Returns:
        a Future for the SendResult.
      • send

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic,
                                                                                                @Nullable
                                                                                                V data)
        Description copied from interface: KafkaOperations
        Send the data to the provided topic with no key or partition.
        Specified by:
        send in interface KafkaOperations<K,​V>
        Parameters:
        topic - the topic.
        data - the data.
        Returns:
        a Future for the SendResult.
      • send

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic,
                                                                                                K key,
                                                                                                @Nullable
                                                                                                V data)
        Description copied from interface: KafkaOperations
        Send the data to the provided topic with the provided key and no partition.
        Specified by:
        send in interface KafkaOperations<K,​V>
        Parameters:
        topic - the topic.
        key - the key.
        data - the data.
        Returns:
        a Future for the SendResult.
      • send

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic,
                                                                                                java.lang.Integer partition,
                                                                                                K key,
                                                                                                @Nullable
                                                                                                V data)
        Description copied from interface: KafkaOperations
        Send the data to the provided topic with the provided key and partition.
        Specified by:
        send in interface KafkaOperations<K,​V>
        Parameters:
        topic - the topic.
        partition - the partition.
        key - the key.
        data - the data.
        Returns:
        a Future for the SendResult.
      • send

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(java.lang.String topic,
                                                                                                java.lang.Integer partition,
                                                                                                java.lang.Long timestamp,
                                                                                                K key,
                                                                                                @Nullable
                                                                                                V data)
        Description copied from interface: KafkaOperations
        Send the data to the provided topic with the provided key, partition, and timestamp.
        Specified by:
        send in interface KafkaOperations<K,​V>
        Parameters:
        topic - the topic.
        partition - the partition.
        timestamp - the timestamp of the record.
        key - the key.
        data - the data.
        Returns:
        a Future for the SendResult.
      • send

        public org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> send​(org.apache.kafka.clients.producer.ProducerRecord<K,​V> record)
        Description copied from interface: KafkaOperations
        Send the provided ProducerRecord.
        Specified by:
        send in interface KafkaOperations<K,​V>
        Parameters:
        record - the record.
        Returns:
        a Future for the SendResult.
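        Each send variant returns a ListenableFuture, which extends java.util.concurrent.Future, so a caller that needs the outcome before continuing can block on it with a timeout. The helper below is a sketch using only the JDK; SendResultAwait and await are hypothetical names, and the CompletableFuture in main merely stands in for the future returned by a send call so the code runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendResultAwait {

    // Blocks until the future completes or the timeout elapses,
    // translating the checked exceptions into IllegalStateException.
    static <T> T await(Future<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new IllegalStateException("Interrupted while waiting for the send result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No send result within " + timeoutMillis + " ms", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by a send call.
        Future<String> alreadyDone = CompletableFuture.completedFuture("sent");
        System.out.println(await(alreadyDone, 1000));
    }
}
```

        With a real template the call would look like SendResultAwait.await(template.send(topic, data), 10_000). Blocking on every send serializes the producer, so this suits low-volume or must-confirm paths rather than bulk publishing.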
      • partitionsFor

        public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor​(java.lang.String topic)
        Description copied from interface: KafkaOperations
        See Producer.partitionsFor(String).
        Specified by:
        partitionsFor in interface KafkaOperations<K,​V>
        Parameters:
        topic - the topic.
        Returns:
        the partition info.
      • metrics

        public java.util.Map<org.apache.kafka.common.MetricName,​? extends org.apache.kafka.common.Metric> metrics()
        Description copied from interface: KafkaOperations
        See Producer.metrics().
        Specified by:
        metrics in interface KafkaOperations<K,​V>
        Returns:
        the metrics.
      • execute

        public <T> T execute​(KafkaOperations.ProducerCallback<K,​V,​T> callback)
        Description copied from interface: KafkaOperations
        Execute some arbitrary operation(s) on the producer and return the result.
        Specified by:
        execute in interface KafkaOperations<K,​V>
        Type Parameters:
        T - the result type.
        Parameters:
        callback - the callback.
        Returns:
        the result.
      • executeInTransaction

        public <T> T executeInTransaction​(KafkaOperations.OperationsCallback<K,​V,​T> callback)
        Description copied from interface: KafkaOperations
        Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
        Specified by:
        executeInTransaction in interface KafkaOperations<K,​V>
        Type Parameters:
        T - the result type.
        Parameters:
        callback - the callback.
        Returns:
        the result.
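        A sketch of a local transaction, assuming the template's producer factory is transaction-capable (for example, a DefaultKafkaProducerFactory with a transaction id prefix set) and that String keys and values are used. The class name, method name, topic, keys, and values are hypothetical; spring-kafka must be on the class path and a broker must be reachable.

```java
import org.springframework.kafka.core.KafkaTemplate;

class LocalTransactionSketch {

    // Sends two records in one local transaction: it is committed when the
    // callback returns normally and aborted if the callback throws.
    static Boolean sendPair(KafkaTemplate<String, String> template) {
        return template.executeInTransaction(ops -> {
            ops.send("orders", "order-1", "created"); // hypothetical topic, key, and value
            ops.send("orders", "order-1", "paid");
            return true;
        });
    }
}
```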
      • sendOffsetsToTransaction

        public void sendOffsetsToTransaction​(java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
        Description copied from interface: KafkaOperations
        When running in a transaction, send the consumer offset(s) to the transaction. The group id is obtained from KafkaUtils.getConsumerGroupId(). It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
        Specified by:
        sendOffsetsToTransaction in interface KafkaOperations<K,​V>
        Parameters:
        offsets - the offsets.
      • sendOffsetsToTransaction

        public void sendOffsetsToTransaction​(java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets,
                                             java.lang.String consumerGroupId)
        Description copied from interface: KafkaOperations
        When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
        Specified by:
        sendOffsetsToTransaction in interface KafkaOperations<K,​V>
        Parameters:
        offsets - the offsets.
        consumerGroupId - the consumer's group.id.
      • sendOffsetsToTransaction

        public void sendOffsetsToTransaction​(java.util.Map<org.apache.kafka.common.TopicPartition,​org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets,
                                             org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
        Description copied from interface: KafkaOperations
        When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
        Specified by:
        sendOffsetsToTransaction in interface KafkaOperations<K,​V>
        Parameters:
        offsets - the offsets.
        groupMetadata - the consumer group metadata.
        See Also:
        Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
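        A sketch of a consume-transform-produce step that runs outside a listener container, where the offsets must be sent explicitly. It assumes a transactional template, String keys and values, and a consumer that is polled on the calling thread. ConsumeTransformProduceSketch, forward, and the output topic are hypothetical; spring-kafka and kafka-clients must be on the class path.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;

class ConsumeTransformProduceSketch {

    static void forward(KafkaTemplate<String, String> template,
                        Consumer<String, String> consumer,
                        ConsumerRecord<String, String> record) {
        // The committed offset is the position of the next record to read, hence + 1.
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));

        template.executeInTransaction(ops -> {
            ops.send("output-topic", record.key(), record.value()); // hypothetical topic
            // The outgoing record and the consumed offset commit or abort together.
            ops.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            return null;
        });
    }
}
```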
      • closeProducer

        protected void closeProducer​(org.apache.kafka.clients.producer.Producer<K,​V> producer,
                                     boolean inTx)
      • doSend

        protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,​V>> doSend​(org.apache.kafka.clients.producer.ProducerRecord<K,​V> producerRecord)
        Send the producer record.
        Parameters:
        producerRecord - the producer record.
        Returns:
        a Future for the SendResult.
      • inTransaction

        public boolean inTransaction()
        Return true if the template is currently running in a transaction on the calling thread.
        Specified by:
        inTransaction in interface KafkaOperations<K,​V>
        Returns:
        true if a transaction is running.
        Since:
        2.2.1
      • getTheProducer

        protected org.apache.kafka.clients.producer.Producer<K,​V> getTheProducer​(@Nullable
                                                                                       java.lang.String topic)
      • destroy

        public void destroy()
        Specified by:
        destroy in interface org.springframework.beans.factory.DisposableBean