Class KafkaTemplate<K,V> 
- Type Parameters:
- K - the key type.
- V - the value type.
- All Implemented Interfaces:
- EventListener, Aware, BeanNameAware, DisposableBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, KafkaOperations<K, V>
- Direct Known Subclasses:
- ReplyingKafkaTemplate, RoutingKafkaTemplate
When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
- Author:
- Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Soby Chacko, Gurps Bassi
- 
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
- KafkaOperations.OperationsCallback<K, V, T>, KafkaOperations.ProducerCallback<K, V, T>
- 
Field Summary
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
- DEFAULT_POLL_TIMEOUT
- 
Constructor Summary
- KafkaTemplate(ProducerFactory<K, V> producerFactory): Create an instance using the supplied producer factory and autoFlush false.
- KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush): Create an instance using the supplied producer factory and autoFlush setting.
- KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides): Create an instance using the supplied producer factory and autoFlush setting.
- KafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides): Create an instance using the supplied producer factory and properties, with autoFlush false.
- 
Method Summary
- void afterSingletonsInstantiated()
- protected void closeProducer(org.apache.kafka.clients.producer.Producer<K, V> producer, boolean inTx)
- void destroy()
- protected CompletableFuture<SendResult<K, V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, io.micrometer.observation.Observation observation): Send the producer record.
- <T> T execute(KafkaOperations.ProducerCallback<K, V, T> callback): Execute some arbitrary operation(s) on the producer and return the result.
- <T> T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback): Execute some arbitrary operation(s) on the operations and return the result.
- void flush(): Flush the producer.
- getDefaultTopic(): The default topic for send methods where a topic is not provided.
- getKafkaAdmin(): Return the KafkaAdmin, used to find the cluster id for observation, if present.
- getMessageConverter(): Return the message converter.
- Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider(): Return the Micrometer tags provider.
- getProducerFactory(): Return the producer factory used by this template.
- protected ProducerFactory<K, V> getProducerFactory(String topic): Return the producer factory used by this template based on the topic.
- getTheProducer(String topic)
- getTransactionIdPrefix()
- boolean inTransaction(): Return true if the template is currently running in a transaction on the calling thread.
- boolean isAllowNonTransactional(): Return true if this template, when transactional, allows non-transactional operations.
- boolean isTransactional(): Return true if the implementation supports transactions (has a transaction-capable producer factory).
- Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics(): See Producer.metrics().
- void onApplicationEvent(ContextStoppedEvent event)
- List<org.apache.kafka.common.PartitionInfo> partitionsFor(String topic): See Producer.partitionsFor(String).
- org.apache.kafka.clients.consumer.ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout): Receive a single record.
- org.apache.kafka.clients.consumer.ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout): Receive multiple records.
- CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data): Send the data to the provided topic with the provided key and partition.
- CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data): Send the data to the provided topic with the provided key and partition.
- CompletableFuture<SendResult<K, V>> send(String topic, K key, V data): Send the data to the provided topic with the provided key and no partition.
- CompletableFuture<SendResult<K, V>> send(String topic, V data): Send the data to the provided topic with no key or partition.
- CompletableFuture<SendResult<K, V>> send(org.apache.kafka.clients.producer.ProducerRecord<K, V> record): Send the provided ProducerRecord.
- CompletableFuture<SendResult<K, V>> send(Message<?> message): Send a message with routing information in message headers.
- CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data): Send the data to the default topic with the provided key and partition.
- CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data): Send the data to the default topic with the provided key and partition.
- CompletableFuture<SendResult<K, V>> sendDefault(K key, V data): Send the data to the default topic with the provided key and no partition.
- CompletableFuture<SendResult<K, V>> sendDefault(V data): Send the data to the default topic with no key or partition.
- void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata): When running in a transaction, send the consumer offset(s) to the transaction.
- void setAllowNonTransactional(boolean allowNonTransactional): Set to true to allow a non-transactional send when the template is transactional.
- void setApplicationContext(ApplicationContext applicationContext)
- void setBeanName(String name)
- void setCloseTimeout(Duration closeTimeout): Set the maximum time to wait when closing a producer; default 5 seconds.
- void setConsumerFactory(ConsumerFactory<K, V> consumerFactory): Set a consumer factory for receive operations.
- void setDefaultTopic(String defaultTopic): Set the default topic for send methods where a topic is not provided.
- void setKafkaAdmin(KafkaAdmin kafkaAdmin): Set the KafkaAdmin, used to find the cluster id for observation, if present.
- void setMessageConverter(RecordMessageConverter messageConverter): Set the message converter to use.
- void setMessagingConverter(SmartMessageConverter messageConverter): Set the SmartMessageConverter to use with the default MessagingMessageConverter.
- void setMicrometerEnabled(boolean micrometerEnabled): Set to false to disable micrometer timers, if micrometer is on the class path.
- void setMicrometerTags(Map<String, String> tags): Set additional tags for the Micrometer listener timers.
- void setMicrometerTagsProvider(Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider): Set a function to provide dynamic tags based on the producer record.
- void setObservationConvention(KafkaTemplateObservationConvention observationConvention): Set a custom KafkaTemplateObservationConvention.
- void setObservationEnabled(boolean observationEnabled): Set to true to enable observation via Micrometer.
- void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor): Set a producer interceptor on this template.
- void setProducerListener(ProducerListener<K, V> producerListener): Set a ProducerListener which will be invoked when Kafka acknowledges a send operation.
- void setTransactionIdPrefix(String transactionIdPrefix): Set a transaction id prefix to override the prefix in the producer factory.
Methods inherited from class java.lang.Object
- clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
- supportsAsyncExecution
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
- receive, receive
- 
Field Details
logger
 
- 
- 
Constructor Details
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush false.
- Parameters:
- producerFactory - the producer factory.
 
- 
KafkaTemplate
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false. If the configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with merged producer properties, with the overrides being applied after the supplied factory's properties.
- Parameters:
- producerFactory - the producer factory.
- configOverrides - producer configuration properties to override.
- Since:
- 2.5
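The merge order described for configOverrides (supplied factory properties first, overrides applied afterwards) can be illustrated with plain maps. This is a minimal sketch using only the JDK: `ConfigOverrideSketch` and `merge` are hypothetical names that are not part of Spring Kafka, and the property values are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigOverrideSketch {

    // Hypothetical helper: copies the factory properties, then applies the overrides on top.
    public static Map<String, Object> merge(Map<String, Object> factoryProps,
            Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(factoryProps);
        if (overrides != null) {
            // Entries put later replace earlier ones, so overrides win on key collisions.
            merged.putAll(overrides);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryProps = new HashMap<>();
        factoryProps.put("bootstrap.servers", "localhost:9092"); // illustrative value
        factoryProps.put("linger.ms", 5);

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("linger.ms", 0);

        Map<String, Object> merged = merge(factoryProps, overrides);
        System.out.println(merged.get("linger.ms"));         // prints 0
        System.out.println(merged.get("bootstrap.servers")); // prints localhost:9092
    }
}
```

Keys that appear only in the supplied factory's properties are kept unchanged; only the keys present in the overrides are replaced.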
 
- 
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property.
- Parameters:
- producerFactory - the producer factory.
- autoFlush - true to flush after each send.
- See Also:
- Producer.flush()
 
 
- 
KafkaTemplate
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, @Nullable Map<String, Object> configOverrides)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property.
If the configOverrides is not null or empty, a new ProducerFactory will be created using ProducerFactory.copyWithConfigurationOverride(java.util.Map). The factory shall apply the overrides after the supplied factory's properties. The ProducerPostProcessors from the original factory are copied over to keep instrumentation alive. Registered ProducerFactory.Listeners are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created which will be of type DefaultKafkaProducerFactory.
- Parameters:
- producerFactory - the producer factory.
- autoFlush - true to flush after each send.
- configOverrides - producer configuration properties to override.
- Since:
- 2.5
- See Also:
- Producer.flush()
 
 
 
- 
- 
Method Details
setBeanName
- Specified by:
- setBeanName in interface BeanNameAware
 
- 
setApplicationContext
- Specified by:
- setApplicationContext in interface ApplicationContextAware
 
- 
getDefaultTopic
The default topic for send methods where a topic is not provided.
- Returns:
- the topic.
 
- 
setDefaultTopic
Set the default topic for send methods where a topic is not provided.
- Parameters:
- defaultTopic - the topic.
 
- 
setProducerListener
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
- Parameters:
- producerListener - the listener; may be null.
 
- 
getMessageConverter
Return the message converter.
- Returns:
- the message converter.
 
- 
setMessageConverter
Set the message converter to use.
- Parameters:
- messageConverter - the message converter.
 
- 
setMessagingConverter
Set the SmartMessageConverter to use with the default MessagingMessageConverter. Not allowed when a custom messageConverter is provided.
- Parameters:
- messageConverter - the converter.
- Since:
- 2.7.1
 
- 
isTransactional
public boolean isTransactional()
Description copied from interface: KafkaOperations
Return true if the implementation supports transactions (has a transaction-capable producer factory).
- Specified by:
- isTransactional in interface KafkaOperations<K, V>
- Returns:
- true or false.
 
- 
getTransactionIdPrefix
- 
setTransactionIdPrefix
Set a transaction id prefix to override the prefix in the producer factory.
- Parameters:
- transactionIdPrefix - the prefix.
- Since:
- 2.3
 
- 
setCloseTimeout
Set the maximum time to wait when closing a producer; default 5 seconds.
- Parameters:
- closeTimeout - the close timeout.
- Since:
- 2.1.14
 
- 
setAllowNonTransactional
public void setAllowNonTransactional(boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
- Parameters:
- allowNonTransactional - true to allow.
- Since:
- 2.4.3
 
- 
isAllowNonTransactional
public boolean isAllowNonTransactional()
Description copied from interface: KafkaOperations
Return true if this template, when transactional, allows non-transactional operations.
- Specified by:
- isAllowNonTransactional in interface KafkaOperations<K, V>
- Returns:
- true to allow.
 
- 
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable micrometer timers, if micrometer is on the class path.
- Parameters:
- micrometerEnabled - false to disable.
- Since:
- 2.5
 
- 
setMicrometerTags
Set additional tags for the Micrometer listener timers.
- Parameters:
- tags - the tags.
- Since:
- 2.5
 
- 
setMicrometerTagsProvider
public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider)
Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided in micrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
- Parameters:
- micrometerTagsProvider - the micrometerTagsProvider.
- Since:
- 2.9.8
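How dynamic tags from a provider function might combine with static tags can be sketched with JDK types only. Everything named here is hypothetical: `RecordStub` stands in for ProducerRecord<?, ?>, and `tagsFor` is an illustrative helper, not the template's internal logic. The documentation does not say which side wins on a key collision, so letting the dynamic tags overwrite is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class TagsProviderSketch {

    // Hypothetical stand-in for ProducerRecord<?, ?>; only the topic is modelled.
    public static final class RecordStub {
        private final String topic;

        public RecordStub(String topic) {
            this.topic = topic;
        }

        public String topic() {
            return this.topic;
        }
    }

    // Starts from the static tags and adds the tags computed for this record.
    // Assumption: on a key collision the dynamic tag overwrites the static one.
    public static Map<String, String> tagsFor(Map<String, String> staticTags,
            Function<RecordStub, Map<String, String>> provider, RecordStub record) {
        Map<String, String> tags = new LinkedHashMap<>(staticTags);
        if (provider != null) {
            tags.putAll(provider.apply(record));
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> staticTags = new LinkedHashMap<>();
        staticTags.put("app", "orders");

        Function<RecordStub, Map<String, String>> provider = rec -> {
            Map<String, String> dynamic = new LinkedHashMap<>();
            dynamic.put("topic", rec.topic());
            return dynamic;
        };

        System.out.println(tagsFor(staticTags, provider, new RecordStub("payments")));
        // prints {app=orders, topic=payments}
    }
}
```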
 
- 
getMicrometerTagsProvider
@Nullable public Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider()
Return the Micrometer tags provider.
- Returns:
- the micrometerTagsProvider.
- Since:
- 2.9.8
 
- 
getProducerFactory
Return the producer factory used by this template.
- Specified by:
- getProducerFactory in interface KafkaOperations<K, V>
- Returns:
- the factory.
- Since:
- 2.2.5
 
- 
getProducerFactory
Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.
- Parameters:
- topic - the topic.
- Returns:
- the factory.
- Since:
- 2.5
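Because this method is protected and takes the topic, a subclass can choose a factory per topic. The sketch below shows one way such a selection could work, using an ordered map of regular expressions. It is an illustration only: `TopicRoutingSketch`, `factoryFor`, and the factory names are hypothetical, plain strings stand in for ProducerFactory instances, and this is not presented as the implementation of any existing subclass.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicRoutingSketch {

    // Returns the value of the first route whose pattern matches the whole topic name.
    public static String factoryFor(Map<Pattern, String> routes, String topic) {
        for (Map.Entry<Pattern, String> route : routes.entrySet()) {
            if (route.getKey().matcher(topic).matches()) {
                return route.getValue();
            }
        }
        throw new IllegalStateException("No factory configured for topic " + topic);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so more specific patterns go first.
        Map<Pattern, String> routes = new LinkedHashMap<>();
        routes.put(Pattern.compile("audit\\..*"), "bytesFactory");
        routes.put(Pattern.compile(".*"), "defaultFactory");

        System.out.println(factoryFor(routes, "audit.login")); // prints bytesFactory
        System.out.println(factoryFor(routes, "orders"));      // prints defaultFactory
    }
}
```

The catch-all pattern is registered last so that it only applies when no earlier route matches.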
 
- 
setConsumerFactory
Set a consumer factory for receive operations.
- Parameters:
- consumerFactory - the consumer factory.
- Since:
- 2.8
 
- 
setProducerInterceptor
public void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor)
Set a producer interceptor on this template.
- Parameters:
- producerInterceptor - the producer interceptor.
- Since:
- 3.0
 
- 
setObservationEnabled
public void setObservationEnabled(boolean observationEnabled)
Set to true to enable observation via Micrometer.
- Parameters:
- observationEnabled - true to enable.
- Since:
- 3.0
 
- 
setObservationConvention
Set a custom KafkaTemplateObservationConvention.
- Parameters:
- observationConvention - the convention.
- Since:
- 3.0
 
- 
getKafkaAdmin
Return the KafkaAdmin, used to find the cluster id for observation, if present.
- Returns:
- the kafkaAdmin
- Since:
- 3.0.5
 
- 
setKafkaAdmin
Set the KafkaAdmin, used to find the cluster id for observation, if present.
- Parameters:
- kafkaAdmin - the admin.
 
- 
afterSingletonsInstantiated
public void afterSingletonsInstantiated()
- Specified by:
- afterSingletonsInstantiated in interface SmartInitializingSingleton
 
- 
onApplicationEvent
- Specified by:
- onApplicationEvent in interface ApplicationListener<ContextStoppedEvent>
 
- 
sendDefault
Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
- Specified by:
- sendDefault in interface KafkaOperations<K, V>
- Parameters:
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
sendDefault
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
- Specified by:
- sendDefault in interface KafkaOperations<K, V>
- Parameters:
- key - the key.
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
sendDefault
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
- Specified by:
- sendDefault in interface KafkaOperations<K, V>
- Parameters:
- partition - the partition.
- key - the key.
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
sendDefault
public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
- Specified by:
- sendDefault in interface KafkaOperations<K, V>
- Parameters:
- partition - the partition.
- timestamp - the timestamp of the record.
- key - the key.
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
send
Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
- Specified by:
- send in interface KafkaOperations<K, V>
- Parameters:
- topic - the topic.
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
send
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
- Specified by:
- send in interface KafkaOperations<K, V>
- Parameters:
- topic - the topic.
- key - the key.
- data - the data.
- Returns:
- a Future for the SendResult.
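Since the send methods return a future, callers can react to the outcome without blocking. The following is a minimal sketch of that pattern using only java.util.concurrent: the `String` payload is a hypothetical stand-in for SendResult<K, V>, and `SendFutureSketch` and `describe` are illustrative names. With a real template the future would come from a call such as template.send(topic, key, data).

```java
import java.util.concurrent.CompletableFuture;

public class SendFutureSketch {

    // Maps the outcome of a send-style future to a message without blocking the caller.
    // The String result is a hypothetical stand-in for SendResult<K, V>.
    public static CompletableFuture<String> describe(CompletableFuture<String> sendFuture) {
        return sendFuture.handle((result, ex) ->
                ex == null ? "sent: " + result : "failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // Simulated successful send.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("orders-0@42");

        // Simulated failed send.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        // join() is used here only so the demo prints before exiting.
        System.out.println(describe(ok).join());     // prints sent: orders-0@42
        System.out.println(describe(failed).join()); // prints failed: broker unavailable
    }
}
```

Attaching the callback with handle keeps success and failure handling in one place; in application code the join() calls would normally be dropped.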
 
- 
send
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
- Specified by:
- send in interface KafkaOperations<K, V>
- Parameters:
- topic - the topic.
- partition - the partition.
- key - the key.
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
send
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
- Specified by:
- send in interface KafkaOperations<K, V>
- Parameters:
- topic - the topic.
- partition - the partition.
- timestamp - the timestamp of the record.
- key - the key.
- data - the data.
- Returns:
- a Future for the SendResult.
 
- 
send
public CompletableFuture<SendResult<K, V>> send(org.apache.kafka.clients.producer.ProducerRecord<K, V> record)
Description copied from interface: KafkaOperations
Send the provided ProducerRecord.
- Specified by:
- send in interface KafkaOperations<K, V>
- Parameters:
- record - the record.
- Returns:
- a Future for the SendResult.
 
- 
send
Description copied from interface: KafkaOperations
Send a message with routing information in message headers. The message payload may be converted before sending.
- Specified by:
- send in interface KafkaOperations<K, V>
- Parameters:
- message - the message to send.
- Returns:
- a Future for the SendResult.
 
- 
partitionsFor
Description copied from interface: KafkaOperations
See Producer.partitionsFor(String).
- Specified by:
- partitionsFor in interface KafkaOperations<K, V>
- Parameters:
- topic - the topic.
- Returns:
- the partition info.
 
- 
metrics
Description copied from interface: KafkaOperations
See Producer.metrics().
- Specified by:
- metrics in interface KafkaOperations<K, V>
- Returns:
- the metrics.
 
- 
execute
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
- Specified by:
- execute in interface KafkaOperations<K, V>
- Type Parameters:
- T - the result type.
- Parameters:
- callback - the callback.
- Returns:
- the result.
 
- 
executeInTransaction
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
- Specified by:
- executeInTransaction in interface KafkaOperations<K, V>
- Type Parameters:
- T - the result type.
- Parameters:
- callback - the callback.
- Returns:
- the result.
 
- 
flush
public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).
- Specified by:
- flush in interface KafkaOperations<K, V>
 
- 
sendOffsetsToTransaction
public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
- Specified by:
- sendOffsetsToTransaction in interface KafkaOperations<K, V>
- Parameters:
- offsets - the offsets.
- groupMetadata - the consumer group metadata.
- See Also:
- Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
 
 
- 
receive
@Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout)
Description copied from interface: KafkaOperations
Receive a single record.
- Specified by:
- receive in interface KafkaOperations<K, V>
- Parameters:
- topic - the topic.
- partition - the partition.
- offset - the offset.
- pollTimeout - the timeout.
- Returns:
- the record or null.
 
- 
receive
public org.apache.kafka.clients.consumer.ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout)
Description copied from interface: KafkaOperations
Receive multiple records. Only absolute, positive offsets are supported.
- Specified by:
- receive in interface KafkaOperations<K, V>
- Parameters:
- requested - a collection of record requests (topic/partition/offset).
- pollTimeout - the timeout.
- Returns:
- the records.
 
- 
closeProducer
- 
doSend
protected CompletableFuture<SendResult<K, V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, io.micrometer.observation.Observation observation)
Send the producer record.
- Parameters:
- producerRecord - the producer record.
- observation - the observation.
- Returns:
- a Future for the SendResult (which carries the RecordMetadata).
 
- 
inTransaction
public boolean inTransaction()
Return true if the template is currently running in a transaction on the calling thread.
- Specified by:
- inTransaction in interface KafkaOperations<K, V>
- Returns:
- true if a transaction is running.
- Since:
- 2.2.1
 
- 
getTheProducer
- 
destroy
public void destroy()
- Specified by:
- destroy in interface DisposableBean
 
 
-