Class KafkaTemplate<K,V>

java.lang.Object
org.springframework.kafka.core.KafkaTemplate<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, KafkaOperations<K,V>
Direct Known Subclasses:
ReplyingKafkaTemplate, RoutingKafkaTemplate

A template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
Author:
Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Soby Chacko, Gurps Bassi, Valentina Armenise, Christian Fredriksson
  • Constructor Details

    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory)
      Create an instance using the supplied producer factory and autoFlush false.
      Parameters:
      producerFactory - the producer factory.
    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory, @Nullable Map<String,Object> configOverrides)
      Create an instance using the supplied producer factory and properties, with autoFlush false. If configOverrides is not null or empty, a new DefaultKafkaProducerFactory is created with merged producer properties, the overrides being applied after the supplied factory's properties.
      Parameters:
      producerFactory - the producer factory.
      configOverrides - producer configuration properties to override.
      Since:
      2.5
    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
      Create an instance using the supplied producer factory and autoFlush setting.

      Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property.

      Parameters:
      producerFactory - the producer factory.
      autoFlush - true to flush after each send.
      See Also:
      • Producer.flush()
    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush, @Nullable Map<String,Object> configOverrides)
      Create an instance using the supplied producer factory and autoFlush setting.

      Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property. If configOverrides is not null or empty, a new ProducerFactory is created using ProducerFactory.copyWithConfigurationOverride(java.util.Map). The factory applies the overrides after the supplied factory's properties. The ProducerPostProcessors from the original factory are copied over to keep instrumentation alive. Registered ProducerFactory.Listeners are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created, which will be of type DefaultKafkaProducerFactory.

      Parameters:
      producerFactory - the producer factory.
      autoFlush - true to flush after each send.
      configOverrides - producer configuration properties to override.
      Since:
      2.5
      See Also:
      • Producer.flush()
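
      As an illustration of the constructors above, the following sketch builds one template with the defaults and one with autoFlush enabled plus a configuration override. The broker address localhost:9092, the String serializers, and the class and method names are assumptions made for the example; they are not part of the API described here.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class TemplateConstruction {

    // Base producer factory; the broker address is an assumption for this sketch.
    static ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // Single-argument constructor: autoFlush is false, no overrides.
    static KafkaTemplate<String, String> defaultTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Three-argument constructor: flush after each send and override linger.ms.
    static KafkaTemplate<String, String> lowLatencyTemplate() {
        Map<String, Object> overrides = Map.of(ProducerConfig.LINGER_MS_CONFIG, 0);
        return new KafkaTemplate<>(producerFactory(), true, overrides);
    }
}
```

      Because the overrides are non-empty, the second template works against a copy of the supplied factory rather than the factory instance passed in.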
  • Method Details

    • setBeanName

      public void setBeanName(String name)
      Specified by:
      setBeanName in interface BeanNameAware
    • setApplicationContext

      public void setApplicationContext(ApplicationContext applicationContext)
      Specified by:
      setApplicationContext in interface ApplicationContextAware
    • getDefaultTopic

      public String getDefaultTopic()
      The default topic for send methods where a topic is not provided.
      Returns:
      the topic.
    • setDefaultTopic

      public void setDefaultTopic(String defaultTopic)
      Set the default topic for send methods where a topic is not provided.
      Parameters:
      defaultTopic - the topic.
    • setProducerListener

      public void setProducerListener(@Nullable ProducerListener<K,V> producerListener)
      Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
      Parameters:
      producerListener - the listener; may be null.
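
      A custom listener can replace the default LoggingProducerListener. The sketch below counts outcomes instead of logging them; the class name and the counter accessors are hypothetical and exist only for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;

public class CountingProducerListener implements ProducerListener<String, String> {

    private final AtomicInteger successes = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();

    // Invoked after a send has been acknowledged.
    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
        successes.incrementAndGet();
    }

    // Invoked when a send fails; the metadata may be absent, so it is not dereferenced here.
    @Override
    public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata,
            Exception exception) {
        failures.incrementAndGet();
    }

    public int successCount() {
        return successes.get();
    }

    public int failureCount() {
        return failures.get();
    }
}
```

      It would be registered with template.setProducerListener(new CountingProducerListener()).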
    • getMessageConverter

      public RecordMessageConverter getMessageConverter()
      Return the message converter.
      Returns:
      the message converter.
    • setMessageConverter

      public void setMessageConverter(RecordMessageConverter messageConverter)
      Set the message converter to use.
      Parameters:
      messageConverter - the message converter.
    • setMessagingConverter

      public void setMessagingConverter(SmartMessageConverter messageConverter)
      Set the SmartMessageConverter to use with the default MessagingMessageConverter. Not allowed when a custom messageConverter is provided.
      Parameters:
      messageConverter - the converter.
      Since:
      2.7.1
    • isTransactional

      public boolean isTransactional()
      Description copied from interface: KafkaOperations
      Return true if the implementation supports transactions (has a transaction-capable producer factory).
      Specified by:
      isTransactional in interface KafkaOperations<K,V>
      Returns:
      true or false.
    • getTransactionIdPrefix

      public String getTransactionIdPrefix()
    • setTransactionIdPrefix

      public void setTransactionIdPrefix(String transactionIdPrefix)
      Set a transaction id prefix to override the prefix in the producer factory.
      Parameters:
      transactionIdPrefix - the prefix.
      Since:
      2.3
    • setCloseTimeout

      public void setCloseTimeout(Duration closeTimeout)
      Set the maximum time to wait when closing a producer; default 5 seconds.
      Parameters:
      closeTimeout - the close timeout.
      Since:
      2.1.14
    • setAllowNonTransactional

      public void setAllowNonTransactional(boolean allowNonTransactional)
      Set to true to allow a non-transactional send when the template is transactional.
      Parameters:
      allowNonTransactional - true to allow.
      Since:
      2.4.3
    • isAllowNonTransactional

      public boolean isAllowNonTransactional()
      Description copied from interface: KafkaOperations
      Return true if this template, when transactional, allows non-transactional operations.
      Specified by:
      isAllowNonTransactional in interface KafkaOperations<K,V>
      Returns:
      true to allow.
    • setMicrometerEnabled

      public void setMicrometerEnabled(boolean micrometerEnabled)
      Set to false to disable Micrometer timers, if Micrometer is on the class path.
      Parameters:
      micrometerEnabled - false to disable.
      Since:
      2.5
    • setMicrometerTags

      public void setMicrometerTags(@Nullable Map<String,String> tags)
      Set additional tags for the Micrometer listener timers.
      Parameters:
      tags - the tags.
      Since:
      2.5
    • setMicrometerTagsProvider

      public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,Map<String,String>> micrometerTagsProvider)
      Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided in micrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
      Parameters:
      micrometerTagsProvider - the micrometerTagsProvider.
      Since:
      2.9.8
    • getMicrometerTagsProvider

      @Nullable public Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,Map<String,String>> getMicrometerTagsProvider()
      Return the Micrometer tags provider.
      Returns:
      the micrometerTagsProvider.
      Since:
      2.9.8
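
      The static and dynamic tag setters can be combined. The following is a minimal sketch; the tag names, the tag values, and the class name are illustrative assumptions.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;

public class MicrometerTagging {

    static void applyTags(KafkaTemplate<String, String> template) {
        // Static tags, added to every timer.
        template.setMicrometerTags(Map.of("application", "orders-service"));
        // Dynamic tags, computed from each ProducerRecord.
        template.setMicrometerTagsProvider(record -> Map.of("record.topic", record.topic()));
    }
}
```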
    • getProducerFactory

      public ProducerFactory<K,V> getProducerFactory()
      Return the producer factory used by this template.
      Specified by:
      getProducerFactory in interface KafkaOperations<K,V>
      Returns:
      the factory.
      Since:
      2.2.5
    • getProducerFactory

      protected ProducerFactory<K,V> getProducerFactory(String topic)
      Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.
      Parameters:
      topic - the topic.
      Returns:
      the factory.
      Since:
      2.5
    • setConsumerFactory

      public void setConsumerFactory(ConsumerFactory<K,V> consumerFactory)
      Set a consumer factory for receive operations.
      Parameters:
      consumerFactory - the consumer factory.
      Since:
      2.8
    • setProducerInterceptor

      public void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K,V> producerInterceptor)
      Set a producer interceptor on this template.
      Parameters:
      producerInterceptor - the producer interceptor
      Since:
      3.0
    • setObservationEnabled

      public void setObservationEnabled(boolean observationEnabled)
      Set to true to enable observation via Micrometer.
      Parameters:
      observationEnabled - true to enable.
      Since:
      3.0
    • setObservationConvention

      public void setObservationConvention(KafkaTemplateObservationConvention observationConvention)
      Set a custom KafkaTemplateObservationConvention.
      Parameters:
      observationConvention - the convention.
      Since:
      3.0
    • setObservationRegistry

      public void setObservationRegistry(io.micrometer.observation.ObservationRegistry observationRegistry)
      Configure the ObservationRegistry to use for recording observations.
      Parameters:
      observationRegistry - the observation registry to use.
      Since:
      3.3.1
    • getKafkaAdmin

      @Nullable public KafkaAdmin getKafkaAdmin()
      Return the KafkaAdmin, used to find the cluster id for observation, if present.
      Returns:
      the kafkaAdmin
      Since:
      3.0.5
    • setKafkaAdmin

      public void setKafkaAdmin(KafkaAdmin kafkaAdmin)
      Set the KafkaAdmin, used to find the cluster id for observation, if present.
      Parameters:
      kafkaAdmin - the admin.
    • afterSingletonsInstantiated

      public void afterSingletonsInstantiated()
      Specified by:
      afterSingletonsInstantiated in interface SmartInitializingSingleton
    • onApplicationEvent

      public void onApplicationEvent(ContextStoppedEvent event)
      Specified by:
      onApplicationEvent in interface ApplicationListener<ContextStoppedEvent>
    • sendDefault

      public CompletableFuture<SendResult<K,V>> sendDefault(@Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the default topic with no key or partition.
      Specified by:
      sendDefault in interface KafkaOperations<K,V>
      Parameters:
      data - the data.
      Returns:
      a Future for the SendResult.
    • sendDefault

      public CompletableFuture<SendResult<K,V>> sendDefault(K key, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the default topic with the provided key and no partition.
      Specified by:
      sendDefault in interface KafkaOperations<K,V>
      Parameters:
      key - the key.
      data - the data.
      Returns:
      a Future for the SendResult.
    • sendDefault

      public CompletableFuture<SendResult<K,V>> sendDefault(Integer partition, K key, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the default topic with the provided key and partition.
      Specified by:
      sendDefault in interface KafkaOperations<K,V>
      Parameters:
      partition - the partition.
      key - the key.
      data - the data.
      Returns:
      a Future for the SendResult.
    • sendDefault

      public CompletableFuture<SendResult<K,V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the default topic with the provided key, partition, and timestamp.
      Specified by:
      sendDefault in interface KafkaOperations<K,V>
      Parameters:
      partition - the partition.
      timestamp - the timestamp of the record.
      key - the key.
      data - the data.
      Returns:
      a Future for the SendResult.
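
      The sendDefault variants rely on a default topic having been set on the template. A brief sketch, assuming a hypothetical topic named greetings and String keys and values:

```java
import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class DefaultTopicSender {

    // Configure the topic used by all sendDefault(...) calls; the name is an assumption.
    static void configure(KafkaTemplate<String, String> template) {
        template.setDefaultTopic("greetings");
    }

    // Key and value only; no explicit partition or timestamp.
    static CompletableFuture<SendResult<String, String>> greet(
            KafkaTemplate<String, String> template, String user) {
        return template.sendDefault(user, "hello " + user);
    }

    // Explicit partition and record timestamp (the int and long arguments are boxed).
    static CompletableFuture<SendResult<String, String>> greetAt(
            KafkaTemplate<String, String> template, int partition, long timestamp, String user) {
        return template.sendDefault(partition, timestamp, user, "hello " + user);
    }
}
```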
    • send

      public CompletableFuture<SendResult<K,V>> send(String topic, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the provided topic with no key or partition.
      Specified by:
      send in interface KafkaOperations<K,V>
      Parameters:
      topic - the topic.
      data - the data.
      Returns:
      a Future for the SendResult.
    • send

      public CompletableFuture<SendResult<K,V>> send(String topic, K key, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the provided topic with the provided key and no partition.
      Specified by:
      send in interface KafkaOperations<K,V>
      Parameters:
      topic - the topic.
      key - the key.
      data - the data.
      Returns:
      a Future for the SendResult.
    • send

      public CompletableFuture<SendResult<K,V>> send(String topic, Integer partition, K key, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the provided topic with the provided key and partition.
      Specified by:
      send in interface KafkaOperations<K,V>
      Parameters:
      topic - the topic.
      partition - the partition.
      key - the key.
      data - the data.
      Returns:
      a Future for the SendResult.
    • send

      public CompletableFuture<SendResult<K,V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
      Description copied from interface: KafkaOperations
      Send the data to the provided topic with the provided key, partition, and timestamp.
      Specified by:
      send in interface KafkaOperations<K,V>
      Parameters:
      topic - the topic.
      partition - the partition.
      timestamp - the timestamp of the record.
      key - the key.
      data - the data.
      Returns:
      a Future for the SendResult.
    • send

      public CompletableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
      Description copied from interface: KafkaOperations
      Send the provided ProducerRecord.
      Specified by:
      send in interface KafkaOperations<K,V>
      Parameters:
      record - the record.
      Returns:
      a Future for the SendResult.
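
      Because the send methods return a CompletableFuture, the result can be handled asynchronously or awaited. The sketch below shows both styles; the topic name, keys, payloads, the 10 second wait, and the describe helper are assumptions for illustration, and a reachable broker is required to actually run the send methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class SendExamples {

    // Turn the outcome of a send into a log-friendly string.
    static String describe(SendResult<String, String> result, Throwable failure) {
        if (failure != null) {
            return "send failed: " + failure.getMessage();
        }
        return "sent to " + result.getRecordMetadata().topic()
                + "-" + result.getRecordMetadata().partition()
                + "@" + result.getRecordMetadata().offset();
    }

    // Non-blocking: react when the future completes.
    static void sendAsync(KafkaTemplate<String, String> template) {
        CompletableFuture<SendResult<String, String>> future =
                template.send("orders", "order-42", "created");
        future.whenComplete((result, failure) -> System.out.println(describe(result, failure)));
    }

    // Blocking: wait a bounded time for the result of sending a ProducerRecord.
    static SendResult<String, String> sendAndWait(KafkaTemplate<String, String> template)
            throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("orders", "order-42", "shipped");
        return template.send(record).get(10, TimeUnit.SECONDS);
    }
}
```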
    • send

      public CompletableFuture<SendResult<K,V>> send(Message<?> message)
      Description copied from interface: KafkaOperations
      Send a message with routing information in message headers. The message payload may be converted before sending.
      Specified by:
      send in interface KafkaOperations<K,V>
      Parameters:
      message - the message to send.
      Returns:
      a Future for the SendResult.
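
      For send(Message<?>), the routing information travels in the message headers. A minimal sketch, assuming the Spring Kafka 3.x header constants KafkaHeaders.TOPIC and KafkaHeaders.KEY and a hypothetical orders topic:

```java
import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class MessageSendExample {

    // Build a message whose headers carry the destination topic and the record key.
    static Message<String> orderCreated(String orderId) {
        return MessageBuilder.withPayload("created")
                .setHeader(KafkaHeaders.TOPIC, "orders")
                .setHeader(KafkaHeaders.KEY, orderId)
                .build();
    }

    // The template reads the headers to route the record.
    static CompletableFuture<SendResult<String, String>> send(
            KafkaTemplate<String, String> template, String orderId) {
        return template.send(orderCreated(orderId));
    }
}
```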
    • partitionsFor

      public List<org.apache.kafka.common.PartitionInfo> partitionsFor(String topic)
      Description copied from interface: KafkaOperations
      See Producer.partitionsFor(String).
      Specified by:
      partitionsFor in interface KafkaOperations<K,V>
      Parameters:
      topic - the topic.
      Returns:
      the partition info.
    • metrics

      public Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
      Description copied from interface: KafkaOperations
      See Producer.metrics().
      Specified by:
      metrics in interface KafkaOperations<K,V>
      Returns:
      the metrics.
    • execute

      public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
      Description copied from interface: KafkaOperations
      Execute some arbitrary operation(s) on the producer and return the result.
      Specified by:
      execute in interface KafkaOperations<K,V>
      Type Parameters:
      T - the result type.
      Parameters:
      callback - the callback.
      Returns:
      the result.
    • executeInTransaction

      public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
      Description copied from interface: KafkaOperations
      Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
      Specified by:
      executeInTransaction in interface KafkaOperations<K,V>
      Type Parameters:
      T - the result type.
      Parameters:
      callback - the callback.
      Returns:
      the result.
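
      A local transaction requires a transaction-capable producer factory. The following sketch sets a transaction id prefix on a DefaultKafkaProducerFactory and sends two records in one local transaction; the broker address, the prefix, the topic names, and the class name are assumptions for the example.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class LocalTransactionExample {

    static KafkaTemplate<String, String> transactionalTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        // Setting a prefix makes the factory transaction-capable.
        factory.setTransactionIdPrefix("demo-tx-");
        return new KafkaTemplate<>(factory);
    }

    // Both sends are issued inside the same local transaction.
    static boolean transfer(KafkaTemplate<String, String> template) {
        Boolean done = template.executeInTransaction(ops -> {
            ops.send("debits", "account-1", "-10");
            ops.send("credits", "account-2", "+10");
            return Boolean.TRUE;
        });
        return Boolean.TRUE.equals(done);
    }
}
```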
    • flush

      public void flush()
      Flush the producer.

      Note: it only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).

      Specified by:
      flush in interface KafkaOperations<K,V>
    • sendOffsetsToTransaction

      public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
      Description copied from interface: KafkaOperations
      When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
      Specified by:
      sendOffsetsToTransaction in interface KafkaOperations<K,V>
      Parameters:
      offsets - the offsets.
      groupMetadata - the consumer group metadata.
      See Also:
      • Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
    • receive

      @Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K,V> receive(String topic, int partition, long offset, Duration pollTimeout)
      Description copied from interface: KafkaOperations
      Receive a single record.
      Specified by:
      receive in interface KafkaOperations<K,V>
      Parameters:
      topic - the topic.
      partition - the partition.
      offset - the offset.
      pollTimeout - the timeout.
      Returns:
      the record or null.
    • receive

      public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout)
      Description copied from interface: KafkaOperations
      Receive multiple records. Only absolute, positive offsets are supported.
      Specified by:
      receive in interface KafkaOperations<K,V>
      Parameters:
      requested - a collection of record requests (topic/partition/offset).
      pollTimeout - the timeout.
      Returns:
      the records.
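
      The receive operations need a consumer factory, set through setConsumerFactory. The sketch below configures one and reads specific offsets; the broker address, the orders topic, the 5 second poll timeout, and the class name are assumptions for the example, and a reachable broker is required to run the read methods.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.TopicPartitionOffset;

public class ReceiveExample {

    // Give the template a consumer factory so that receive(...) can be used.
    static void enableReceive(KafkaTemplate<String, String> template) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        template.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    }

    // Absolute offsets only: offset 0 of partitions 0 and 1.
    static List<TopicPartitionOffset> firstRecordOfTwoPartitions() {
        return List.of(
                new TopicPartitionOffset("orders", 0, 0L),
                new TopicPartitionOffset("orders", 1, 0L));
    }

    // Single record at a known topic/partition/offset; may return null.
    static ConsumerRecord<String, String> readOne(KafkaTemplate<String, String> template) {
        return template.receive("orders", 0, 0L, Duration.ofSeconds(5));
    }

    // Several records, one per requested topic/partition/offset.
    static ConsumerRecords<String, String> readMany(KafkaTemplate<String, String> template) {
        return template.receive(firstRecordOfTwoPartitions(), Duration.ofSeconds(5));
    }
}
```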
    • closeProducer

      protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
    • doSend

      protected CompletableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, io.micrometer.observation.Observation observation)
      Send the producer record.
      Parameters:
      producerRecord - the producer record.
      observation - the observation.
      Returns:
      a Future for the SendResult.
    • inTransaction

      public boolean inTransaction()
      Return true if the template is currently running in a transaction on the calling thread.
      Specified by:
      inTransaction in interface KafkaOperations<K,V>
      Returns:
      true if a transaction is running.
      Since:
      2.2.1
    • getTheProducer

      protected org.apache.kafka.clients.producer.Producer<K,V> getTheProducer(@Nullable String topic)
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean