Class KafkaTemplate<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
EventListener,Aware,BeanNameAware,DisposableBean,SmartInitializingSingleton,ApplicationContextAware,ApplicationListener<ContextStoppedEvent>,KafkaOperations<K,V>
- Direct Known Subclasses:
ReplyingKafkaTemplate,RoutingKafkaTemplate
DefaultKafkaProducerFactory, the template is thread-safe. The producer factory
and KafkaProducer ensure this; refer to their
respective javadocs.- Author:
- Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Soby Chacko, Gurps Bassi
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,V, T>, KafkaOperations.ProducerCallback<K, V, T> -
Field Summary
FieldsFields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT -
Constructor Summary
ConstructorsConstructorDescriptionKafkaTemplate(ProducerFactory<K, V> producerFactory) Create an instance using the supplied producer factory and autoFlush false.KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) Create an instance using the supplied producer factory and autoFlush setting.KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides) Create an instance using the supplied producer factory and autoFlush setting.KafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides) Create an instance using the supplied producer factory and properties, with autoFlush false. -
Method Summary
Modifier and TypeMethodDescriptionvoidprotected voidcloseProducer(org.apache.kafka.clients.producer.Producer<K, V> producer, boolean inTx) voiddestroy()protected CompletableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, io.micrometer.observation.Observation observation) Send the producer record.<T> Texecute(KafkaOperations.ProducerCallback<K, V, T> callback) Execute some arbitrary operation(s) on the producer and return the result.<T> TexecuteInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback) Execute some arbitrary operation(s) on the operations and return the result.voidflush()Flush the producer.The default topic for send methods where a topic is not provided.Return theKafkaAdmin, used to find the cluster id for observation, if present.Return the message converter.Return the Micrometer tags provider.Return the producer factory used by this template.protected ProducerFactory<K,V> getProducerFactory(String topic) Return the producer factory used by this template based on the topic.getTheProducer(String topic) booleanReturn true if the template is currently running in a transaction on the calling thread.booleanReturn true if this template, when transactional, allows non-transactional operations.booleanReturn true if the implementation supports transactions (has a transaction-capable producer factory).Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()SeeProducer.metrics().voidList<org.apache.kafka.common.PartitionInfo>partitionsFor(String topic) SeeProducer.partitionsFor(String).Receive a single record.receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) Receive multiple records.Send the data to the provided topic with the provided key and partition.Send the data to the provided topic with the provided key and partition.Send the data to the provided topic with the provided key and no partition.Send the data to the provided topic with no key or partition.Send the providedProducerRecord.Send a message with routing information in message headers.sendDefault(Integer partition, Long timestamp, K key, V data) Send the data to the default topic with the provided key and partition.sendDefault(Integer partition, K key, V data) Send the data to the default topic with the provided key and partition.sendDefault(K key, V data) Send the data to the default topic with the provided key and no partition.sendDefault(V data) Send the data to the default topic with no key or partition.voidsendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata) When running in a transaction, send the consumer offset(s) to the transaction.voidsetAllowNonTransactional(boolean allowNonTransactional) Set to true to allow a non-transactional send when the template is transactional.voidsetApplicationContext(ApplicationContext applicationContext) voidsetBeanName(String name) voidsetCloseTimeout(Duration closeTimeout) Set the maximum time to wait when closing a producer; default 5 seconds.voidsetConsumerFactory(ConsumerFactory<K, V> consumerFactory) Set a consumer factory for receive operations.voidsetDefaultTopic(String defaultTopic) Set the default topic for send methods where a topic is not provided.voidsetKafkaAdmin(KafkaAdmin kafkaAdmin) Set theKafkaAdmin, used to find the cluster id for observation, if present.voidsetMessageConverter(RecordMessageConverter messageConverter) Set the message converter to use.voidsetMessagingConverter(SmartMessageConverter messageConverter) Set theSmartMessageConverterto use with the defaultMessagingMessageConverter.voidsetMicrometerEnabled(boolean micrometerEnabled) Set tofalseto disable micrometer timers, if micrometer is on the class path.voidsetMicrometerTags(Map<String, String> tags) Set additional tags for the Micrometer listener timers.voidsetMicrometerTagsProvider(Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) Set a function to provide dynamic tags based on the producer record.voidsetObservationConvention(KafkaTemplateObservationConvention observationConvention) Set a customKafkaTemplateObservationConvention.voidsetObservationEnabled(boolean observationEnabled) Set to true to enable observation via Micrometer.voidsetProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor) Set a producer interceptor on this template.voidsetProducerListener(ProducerListener<K, V> producerListener) Set aProducerListenerwhich will be invoked when Kafka acknowledges a send operation.voidsetTransactionIdPrefix(String transactionIdPrefix) Set a transaction id prefix to override the prefix in the producer factory.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecutionMethods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive
-
Field Details
-
logger
-
-
Constructor Details
-
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush false.- Parameters:
producerFactory- the producer factory.
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) Create an instance using the supplied producer factory and properties, with autoFlush false. If the configOverrides is not null or empty, a newDefaultKafkaProducerFactorywill be created with merged producer properties with the overrides being applied after the supplied factory's properties.- Parameters:
producerFactory- the producer factory.configOverrides- producer configuration properties to override.- Since:
- 2.5
-
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush setting.Set autoFlush to
trueif you wish for the send operations on this template to occur immediately, regardless of thelinger.msorbatch.sizeproperty values. This will also block until the broker has acknowledged receipt according to the producer'sacksproperty.- Parameters:
producerFactory- the producer factory.autoFlush- true to flush after each send.- See Also:
-
Producer.flush()
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, @Nullable Map<String, Object> configOverrides) Create an instance using the supplied producer factory and autoFlush setting.Set autoFlush to
trueif you wish for the send operations on this template to occur immediately, regardless of thelinger.msorbatch.sizeproperty values. This will also block until the broker has acknowledged receipt according to the producer'sacksproperty. If the configOverrides is not null or empty, a newProducerFactorywill be created usingProducerFactory.copyWithConfigurationOverride(java.util.Map)The factory shall apply the overrides after the supplied factory's properties. TheProducerPostProcessors from the original factory are copied over to keep instrumentation alive. RegisteredProducerFactory.Listeners are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created which will be of type DefaultKafkaProducerFactory.- Parameters:
producerFactory- the producer factory.autoFlush- true to flush after each send.configOverrides- producer configuration properties to override.- Since:
- 2.5
- See Also:
-
Producer.flush()
-
-
Method Details
-
setBeanName
- Specified by:
setBeanNamein interfaceBeanNameAware
-
setApplicationContext
- Specified by:
setApplicationContextin interfaceApplicationContextAware
-
getDefaultTopic
The default topic for send methods where a topic is not provided.- Returns:
- the topic.
-
setDefaultTopic
Set the default topic for send methods where a topic is not provided.- Parameters:
defaultTopic- the topic.
-
setProducerListener
Set aProducerListenerwhich will be invoked when Kafka acknowledges a send operation. By default aLoggingProducerListeneris configured which logs errors only.- Parameters:
producerListener- the listener; may benull.
-
getMessageConverter
Return the message converter.- Returns:
- the message converter.
-
setMessageConverter
Set the message converter to use.- Parameters:
messageConverter- the message converter.
-
setMessagingConverter
Set theSmartMessageConverterto use with the defaultMessagingMessageConverter. Not allowed when a custommessageConverteris provided.- Parameters:
messageConverter- the converter.- Since:
- 2.7.1
-
isTransactional
public boolean isTransactional()Description copied from interface:KafkaOperationsReturn true if the implementation supports transactions (has a transaction-capable producer factory).- Specified by:
isTransactionalin interfaceKafkaOperations<K,V> - Returns:
- true or false.
-
getTransactionIdPrefix
-
setTransactionIdPrefix
Set a transaction id prefix to override the prefix in the producer factory.- Parameters:
transactionIdPrefix- the prefix.- Since:
- 2.3
-
setCloseTimeout
Set the maximum time to wait when closing a producer; default 5 seconds.- Parameters:
closeTimeout- the close timeout.- Since:
- 2.1.14
-
setAllowNonTransactional
public void setAllowNonTransactional(boolean allowNonTransactional) Set to true to allow a non-transactional send when the template is transactional.- Parameters:
allowNonTransactional- true to allow.- Since:
- 2.4.3
-
isAllowNonTransactional
public boolean isAllowNonTransactional()Description copied from interface:KafkaOperationsReturn true if this template, when transactional, allows non-transactional operations.- Specified by:
isAllowNonTransactionalin interfaceKafkaOperations<K,V> - Returns:
- true to allow.
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled) Set tofalseto disable micrometer timers, if micrometer is on the class path.- Parameters:
micrometerEnabled- false to disable.- Since:
- 2.5
-
setMicrometerTags
Set additional tags for the Micrometer listener timers.- Parameters:
tags- the tags.- Since:
- 2.5
-
setMicrometerTagsProvider
public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided inmicrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.- Parameters:
micrometerTagsProvider- the micrometerTagsProvider.- Since:
- 2.9.8
- See Also:
-
getMicrometerTagsProvider
@Nullable public Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>, getMicrometerTagsProvider()Map<String, String>> Return the Micrometer tags provider.- Returns:
- the micrometerTagsProvider.
- Since:
- 2.9.8
-
getProducerFactory
Return the producer factory used by this template.- Specified by:
getProducerFactoryin interfaceKafkaOperations<K,V> - Returns:
- the factory.
- Since:
- 2.2.5
-
getProducerFactory
Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.- Parameters:
topic- the topic.- Returns:
- the factory.
- Since:
- 2.5
-
setConsumerFactory
Set a consumer factory for receive operations.- Parameters:
consumerFactory- the consumer factory.- Since:
- 2.8
-
setProducerInterceptor
public void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor) Set a producer interceptor on this template.- Parameters:
producerInterceptor- the producer interceptor- Since:
- 3.0
-
setObservationEnabled
public void setObservationEnabled(boolean observationEnabled) Set to true to enable observation via Micrometer.- Parameters:
observationEnabled- true to enable.- Since:
- 3.0
- See Also:
-
setObservationConvention
Set a customKafkaTemplateObservationConvention.- Parameters:
observationConvention- the convention.- Since:
- 3.0
-
getKafkaAdmin
Return theKafkaAdmin, used to find the cluster id for observation, if present.- Returns:
- the kafkaAdmin
- Since:
- 3.0.5
-
setKafkaAdmin
Set theKafkaAdmin, used to find the cluster id for observation, if present.- Parameters:
kafkaAdmin- the admin.
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()- Specified by:
afterSingletonsInstantiatedin interfaceSmartInitializingSingleton
-
onApplicationEvent
- Specified by:
onApplicationEventin interfaceApplicationListener<K>
-
sendDefault
Description copied from interface:KafkaOperationsSend the data to the default topic with no key or partition.- Specified by:
sendDefaultin interfaceKafkaOperations<K,V> - Parameters:
data- The data.- Returns:
- a Future for the
SendResult.
-
sendDefault
Description copied from interface:KafkaOperationsSend the data to the default topic with the provided key and no partition.- Specified by:
sendDefaultin interfaceKafkaOperations<K,V> - Parameters:
key- the key.data- The data.- Returns:
- a Future for the
SendResult.
-
sendDefault
Description copied from interface:KafkaOperationsSend the data to the default topic with the provided key and partition.- Specified by:
sendDefaultin interfaceKafkaOperations<K,V> - Parameters:
partition- the partition.key- the key.data- the data.- Returns:
- a Future for the
SendResult.
-
sendDefault
public CompletableFuture<SendResult<K,V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) Description copied from interface:KafkaOperationsSend the data to the default topic with the provided key and partition.- Specified by:
sendDefaultin interfaceKafkaOperations<K,V> - Parameters:
partition- the partition.timestamp- the timestamp of the record.key- the key.data- the data.- Returns:
- a Future for the
SendResult.
-
send
Description copied from interface:KafkaOperationsSend the data to the provided topic with no key or partition.- Specified by:
sendin interfaceKafkaOperations<K,V> - Parameters:
topic- the topic.data- The data.- Returns:
- a Future for the
SendResult.
-
send
Description copied from interface:KafkaOperationsSend the data to the provided topic with the provided key and no partition.- Specified by:
sendin interfaceKafkaOperations<K,V> - Parameters:
topic- the topic.key- the key.data- The data.- Returns:
- a Future for the
SendResult.
-
send
public CompletableFuture<SendResult<K,V>> send(String topic, Integer partition, K key, @Nullable V data) Description copied from interface:KafkaOperationsSend the data to the provided topic with the provided key and partition.- Specified by:
sendin interfaceKafkaOperations<K,V> - Parameters:
topic- the topic.partition- the partition.key- the key.data- the data.- Returns:
- a Future for the
SendResult.
-
send
public CompletableFuture<SendResult<K,V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) Description copied from interface:KafkaOperationsSend the data to the provided topic with the provided key and partition.- Specified by:
sendin interfaceKafkaOperations<K,V> - Parameters:
topic- the topic.partition- the partition.timestamp- the timestamp of the record.key- the key.data- the data.- Returns:
- a Future for the
SendResult.
-
send
public CompletableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K, V> record) Description copied from interface:KafkaOperationsSend the providedProducerRecord.- Specified by:
sendin interfaceKafkaOperations<K,V> - Parameters:
record- the record.- Returns:
- a Future for the
SendResult.
-
send
Description copied from interface:KafkaOperationsSend a message with routing information in message headers. The message payload may be converted before sending.- Specified by:
sendin interfaceKafkaOperations<K,V> - Parameters:
message- the message to send.- Returns:
- a Future for the
SendResult. - See Also:
-
partitionsFor
Description copied from interface:KafkaOperationsSeeProducer.partitionsFor(String).- Specified by:
partitionsForin interfaceKafkaOperations<K,V> - Parameters:
topic- the topic.- Returns:
- the partition info.
-
metrics
Description copied from interface:KafkaOperationsSeeProducer.metrics().- Specified by:
metricsin interfaceKafkaOperations<K,V> - Returns:
- the metrics.
-
execute
Description copied from interface:KafkaOperationsExecute some arbitrary operation(s) on the producer and return the result.- Specified by:
executein interfaceKafkaOperations<K,V> - Type Parameters:
T- the result type.- Parameters:
callback- the callback.- Returns:
- the result.
-
executeInTransaction
Description copied from interface:KafkaOperationsExecute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).- Specified by:
executeInTransactionin interfaceKafkaOperations<K,V> - Type Parameters:
T- the result type.- Parameters:
callback- the callback.- Returns:
- the result.
-
flush
public void flush()Flush the producer.Note It only makes sense to invoke this method if the
ProducerFactoryserves up a singleton producer (such as theDefaultKafkaProducerFactory).- Specified by:
flushin interfaceKafkaOperations<K,V>
-
sendOffsetsToTransaction
public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata) Description copied from interface:KafkaOperationsWhen running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with aKafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.- Specified by:
sendOffsetsToTransactionin interfaceKafkaOperations<K,V> - Parameters:
offsets- The offsets.groupMetadata- the consumer group metadata.- See Also:
-
Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
-
receive
@Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K,V> receive(String topic, int partition, long offset, Duration pollTimeout) Description copied from interface:KafkaOperationsReceive a single record.- Specified by:
receivein interfaceKafkaOperations<K,V> - Parameters:
topic- the topic.partition- the partition.offset- the offset.pollTimeout- the timeout.- Returns:
- the record or null.
-
receive
public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) Description copied from interface:KafkaOperationsReceive multiple records. Only absolute, positive offsets are supported.- Specified by:
receivein interfaceKafkaOperations<K,V> - Parameters:
requested- a collection of record requests (topic/partition/offset).pollTimeout- the timeout.- Returns:
- the record or null.
-
closeProducer
-
doSend
protected CompletableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, io.micrometer.observation.Observation observation) Send the producer record.- Parameters:
producerRecord- the producer record.observation- the observation.- Returns:
- a Future for the
RecordMetadata.
-
inTransaction
public boolean inTransaction()Return true if the template is currently running in a transaction on the calling thread.- Specified by:
inTransactionin interfaceKafkaOperations<K,V> - Returns:
- true if a transaction is running.
- Since:
- 2.2.1
-
getTheProducer
-
destroy
public void destroy()- Specified by:
destroyin interfaceDisposableBean
-