Class KafkaTemplate<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, KafkaOperations<K,V>
- Direct Known Subclasses:
ReplyingKafkaTemplate, RoutingKafkaTemplate
When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
- Author:
Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Soby Chacko, Gurps Bassi, Valentina Armenise
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K, V, T>, KafkaOperations.ProducerCallback<K, V, T>
-
Field Summary
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT
-
Constructor Summary
Constructor | Description
KafkaTemplate(ProducerFactory<K, V> producerFactory) | Create an instance using the supplied producer factory and autoFlush false.
KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) | Create an instance using the supplied producer factory and autoFlush setting.
KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides) | Create an instance using the supplied producer factory and autoFlush setting.
KafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides) | Create an instance using the supplied producer factory and properties, with autoFlush false.
-
Method Summary
Modifier and Type | Method | Description
void | afterSingletonsInstantiated() |
protected void | closeProducer(org.apache.kafka.clients.producer.Producer<K, V> producer, boolean inTx) |
void | destroy() |
protected CompletableFuture<SendResult<K, V>> | doSend(org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, io.micrometer.observation.Observation observation) | Send the producer record.
<T> T | execute(KafkaOperations.ProducerCallback<K, V, T> callback) | Execute some arbitrary operation(s) on the producer and return the result.
<T> T | executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback) | Execute some arbitrary operation(s) on the operations and return the result.
void | flush() | Flush the producer.
String | getDefaultTopic() | The default topic for send methods where a topic is not provided.
KafkaAdmin | getKafkaAdmin() | Return the KafkaAdmin, used to find the cluster id for observation, if present.
MessageConverter | getMessageConverter() | Return the message converter.
Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> | getMicrometerTagsProvider() | Return the Micrometer tags provider.
ProducerFactory<K, V> | getProducerFactory() | Return the producer factory used by this template.
protected ProducerFactory<K, V> | getProducerFactory(String topic) | Return the producer factory used by this template based on the topic.
protected org.apache.kafka.clients.producer.Producer<K, V> | getTheProducer(String topic) |
String | getTransactionIdPrefix() |
boolean | inTransaction() | Return true if the template is currently running in a transaction on the calling thread.
boolean | isAllowNonTransactional() | Return true if this template, when transactional, allows non-transactional operations.
boolean | isTransactional() | Return true if the implementation supports transactions (has a transaction-capable producer factory).
Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> | metrics() | See Producer.metrics().
void | onApplicationEvent(ContextStoppedEvent event) |
List<org.apache.kafka.common.PartitionInfo> | partitionsFor(String topic) | See Producer.partitionsFor(String).
org.apache.kafka.clients.consumer.ConsumerRecord<K, V> | receive(String topic, int partition, long offset, Duration pollTimeout) | Receive a single record.
org.apache.kafka.clients.consumer.ConsumerRecords<K, V> | receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) | Receive multiple records.
CompletableFuture<SendResult<K, V>> | send(String topic, Integer partition, Long timestamp, K key, V data) | Send the data to the provided topic with the provided key and partition.
CompletableFuture<SendResult<K, V>> | send(String topic, Integer partition, K key, V data) | Send the data to the provided topic with the provided key and partition.
CompletableFuture<SendResult<K, V>> | send(String topic, K key, V data) | Send the data to the provided topic with the provided key and no partition.
CompletableFuture<SendResult<K, V>> | send(String topic, V data) | Send the data to the provided topic with no key or partition.
CompletableFuture<SendResult<K, V>> | send(org.apache.kafka.clients.producer.ProducerRecord<K, V> record) | Send the provided ProducerRecord.
CompletableFuture<SendResult<K, V>> | send(Message<?> message) | Send a message with routing information in message headers.
CompletableFuture<SendResult<K, V>> | sendDefault(Integer partition, Long timestamp, K key, V data) | Send the data to the default topic with the provided key and partition.
CompletableFuture<SendResult<K, V>> | sendDefault(Integer partition, K key, V data) | Send the data to the default topic with the provided key and partition.
CompletableFuture<SendResult<K, V>> | sendDefault(K key, V data) | Send the data to the default topic with the provided key and no partition.
CompletableFuture<SendResult<K, V>> | sendDefault(V data) | Send the data to the default topic with no key or partition.
void | sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata) | When running in a transaction, send the consumer offset(s) to the transaction.
void | setAllowNonTransactional(boolean allowNonTransactional) | Set to true to allow a non-transactional send when the template is transactional.
void | setApplicationContext(ApplicationContext applicationContext) |
void | setBeanName(String name) |
void | setCloseTimeout(Duration closeTimeout) | Set the maximum time to wait when closing a producer; default 5 seconds.
void | setConsumerFactory(ConsumerFactory<K, V> consumerFactory) | Set a consumer factory for receive operations.
void | setDefaultTopic(String defaultTopic) | Set the default topic for send methods where a topic is not provided.
void | setKafkaAdmin(KafkaAdmin kafkaAdmin) | Set the KafkaAdmin, used to find the cluster id for observation, if present.
void | setMessageConverter(RecordMessageConverter messageConverter) | Set the message converter to use.
void | setMessagingConverter(SmartMessageConverter messageConverter) | Set the SmartMessageConverter to use with the default MessagingMessageConverter.
void | setMicrometerEnabled(boolean micrometerEnabled) | Set to false to disable Micrometer timers, if Micrometer is on the class path.
void | setMicrometerTags(Map<String, String> tags) | Set additional tags for the Micrometer listener timers.
void | setMicrometerTagsProvider(Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) | Set a function to provide dynamic tags based on the producer record.
void | setObservationConvention(KafkaTemplateObservationConvention observationConvention) | Set a custom KafkaTemplateObservationConvention.
void | setObservationEnabled(boolean observationEnabled) | Set to true to enable observation via Micrometer.
void | setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor) | Set a producer interceptor on this template.
void | setProducerListener(ProducerListener<K, V> producerListener) | Set a ProducerListener which will be invoked when Kafka acknowledges a send operation.
void | setTransactionIdPrefix(String transactionIdPrefix) | Set a transaction id prefix to override the prefix in the producer factory.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive
-
Field Details
-
logger
-
-
Constructor Details
-
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush false.
- Parameters:
producerFactory - the producer factory.
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false. If the configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with merged producer properties, with the overrides being applied after the supplied factory's properties.
- Parameters:
producerFactory - the producer factory.
configOverrides - producer configuration properties to override.
- Since:
2.5
-
KafkaTemplate
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property.
- Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
- See Also:
Producer.flush()
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, @Nullable Map<String, Object> configOverrides)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you wish for the send operations on this template to occur immediately, regardless of the linger.ms or batch.size property values. This will also block until the broker has acknowledged receipt according to the producer's acks property. If the configOverrides is not null or empty, a new ProducerFactory will be created using ProducerFactory.copyWithConfigurationOverride(java.util.Map). The factory shall apply the overrides after the supplied factory's properties. The ProducerPostProcessors from the original factory are copied over to keep instrumentation alive. Registered ProducerFactory.Listeners are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created which will be of type DefaultKafkaProducerFactory.
- Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
configOverrides - producer configuration properties to override.
- Since:
2.5
- See Also:
Producer.flush()
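The rule that overrides are applied after the supplied factory's properties amounts to a last-write-wins merge of two property maps. The following is a minimal JDK-only sketch of that rule, not the factory's actual code; the class ConfigOverrideSketch and its merge method are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Spring Kafka): illustrates "overrides applied last". */
final class ConfigOverrideSketch {

    /** Returns a new map holding the factory properties, then the overrides (last write wins). */
    static Map<String, Object> merge(Map<String, Object> factoryProps, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(factoryProps);
        if (overrides != null) {
            // Keys present in both maps end up with the override value.
            merged.putAll(overrides);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryProps = Map.of("acks", "all", "linger.ms", 5);
        Map<String, Object> overrides = Map.of("linger.ms", 0);
        Map<String, Object> merged = merge(factoryProps, overrides);
        System.out.println("acks=" + merged.get("acks"));
        System.out.println("linger.ms=" + merged.get("linger.ms"));
    }
}
```

In this sketch the supplied properties are left untouched and a new map is produced, which mirrors the description above of a new factory being created rather than the original being modified.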
-
-
Method Details
-
setBeanName
- Specified by:
setBeanName in interface BeanNameAware
-
setApplicationContext
- Specified by:
setApplicationContext in interface ApplicationContextAware
-
getDefaultTopic
The default topic for send methods where a topic is not provided.
- Returns:
the topic.
-
setDefaultTopic
Set the default topic for send methods where a topic is not provided.
- Parameters:
defaultTopic - the topic.
-
setProducerListener
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
- Parameters:
producerListener - the listener; may be null.
-
getMessageConverter
Return the message converter.
- Returns:
the message converter.
-
setMessageConverter
Set the message converter to use.
- Parameters:
messageConverter - the message converter.
-
setMessagingConverter
Set the SmartMessageConverter to use with the default MessagingMessageConverter. Not allowed when a custom messageConverter is provided.
- Parameters:
messageConverter - the converter.
- Since:
2.7.1
-
isTransactional
public boolean isTransactional()
Description copied from interface: KafkaOperations
Return true if the implementation supports transactions (has a transaction-capable producer factory).
- Specified by:
isTransactional in interface KafkaOperations<K,V>
- Returns:
true or false.
-
getTransactionIdPrefix
-
setTransactionIdPrefix
Set a transaction id prefix to override the prefix in the producer factory.
- Parameters:
transactionIdPrefix - the prefix.
- Since:
2.3
-
setCloseTimeout
Set the maximum time to wait when closing a producer; default 5 seconds.
- Parameters:
closeTimeout - the close timeout.
- Since:
2.1.14
-
setAllowNonTransactional
public void setAllowNonTransactional(boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
- Parameters:
allowNonTransactional - true to allow.
- Since:
2.4.3
-
isAllowNonTransactional
public boolean isAllowNonTransactional()
Description copied from interface: KafkaOperations
Return true if this template, when transactional, allows non-transactional operations.
- Specified by:
isAllowNonTransactional in interface KafkaOperations<K,V>
- Returns:
true to allow.
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable Micrometer timers, if Micrometer is on the class path.
- Parameters:
micrometerEnabled - false to disable.
- Since:
2.5
-
setMicrometerTags
Set additional tags for the Micrometer listener timers.
- Parameters:
tags - the tags.
- Since:
2.5
-
setMicrometerTagsProvider
public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider)
Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided in micrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
- Parameters:
micrometerTagsProvider - the micrometerTagsProvider.
- Since:
2.9.8
- See Also:
-
getMicrometerTagsProvider
@Nullable public Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>, Map<String, String>> getMicrometerTagsProvider()
Return the Micrometer tags provider.
- Returns:
the micrometerTagsProvider.
- Since:
2.9.8
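As a rough illustration of how per-record tags from a provider function can be combined with static tags, here is a JDK-only sketch. It is not the template's implementation: the class MicrometerTagsSketch, the Rec stand-in for ProducerRecord<?, ?>, and the choice to let dynamic tags overwrite static tags on a key collision are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch (not Spring Kafka code): static tags plus per-record dynamic tags. */
final class MicrometerTagsSketch {

    /** Hypothetical stand-in for ProducerRecord<?, ?>; only the topic is modelled. */
    static final class Rec {
        final String topic;

        Rec(String topic) {
            this.topic = topic;
        }
    }

    /** Copies the static tags, then adds the provider's tags for this record (if a provider is set). */
    static Map<String, String> tagsFor(Rec rec, Map<String, String> staticTags,
            Function<Rec, Map<String, String>> provider) {
        Map<String, String> tags = new LinkedHashMap<>(staticTags);
        if (provider != null) {
            // Assumption: on a key collision the dynamic tag replaces the static one.
            tags.putAll(provider.apply(rec));
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> tags = tagsFor(new Rec("orders"), Map.of("app", "billing"),
                rec -> Map.of("topic", rec.topic));
        System.out.println(tags);
    }
}
```

With the real template, a function of the documented type Function<ProducerRecord<?, ?>, Map<String, String>> would be passed to setMicrometerTagsProvider, and the static tags to setMicrometerTags.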
-
getProducerFactory
Return the producer factory used by this template.
- Specified by:
getProducerFactory in interface KafkaOperations<K,V>
- Returns:
the factory.
- Since:
2.2.5
-
getProducerFactory
Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.
- Parameters:
topic - the topic.
- Returns:
the factory.
- Since:
2.5
-
setConsumerFactory
Set a consumer factory for receive operations.
- Parameters:
consumerFactory - the consumer factory.
- Since:
2.8
-
setProducerInterceptor
public void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor)
Set a producer interceptor on this template.
- Parameters:
producerInterceptor - the producer interceptor.
- Since:
3.0
-
setObservationEnabled
public void setObservationEnabled(boolean observationEnabled)
Set to true to enable observation via Micrometer.
- Parameters:
observationEnabled - true to enable.
- Since:
3.0
- See Also:
-
setObservationConvention
Set a custom KafkaTemplateObservationConvention.
- Parameters:
observationConvention - the convention.
- Since:
3.0
-
getKafkaAdmin
Return the KafkaAdmin, used to find the cluster id for observation, if present.
- Returns:
the kafkaAdmin.
- Since:
3.0.5
-
setKafkaAdmin
Set the KafkaAdmin, used to find the cluster id for observation, if present.
- Parameters:
kafkaAdmin - the admin.
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()
- Specified by:
afterSingletonsInstantiated in interface SmartInitializingSingleton
-
onApplicationEvent
- Specified by:
onApplicationEvent in interface ApplicationListener<ContextStoppedEvent>
-
sendDefault
Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
- Specified by:
sendDefault in interface KafkaOperations<K,V>
- Parameters:
data - the data.
- Returns:
a Future for the SendResult.
-
sendDefault
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
- Specified by:
sendDefault in interface KafkaOperations<K,V>
- Parameters:
key - the key.
data - the data.
- Returns:
a Future for the SendResult.
-
sendDefault
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
- Specified by:
sendDefault in interface KafkaOperations<K,V>
- Parameters:
partition - the partition.
key - the key.
data - the data.
- Returns:
a Future for the SendResult.
-
sendDefault
public CompletableFuture<SendResult<K,V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
- Specified by:
sendDefault in interface KafkaOperations<K,V>
- Parameters:
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
- Returns:
a Future for the SendResult.
-
send
Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
- Specified by:
send in interface KafkaOperations<K,V>
- Parameters:
topic - the topic.
data - the data.
- Returns:
a Future for the SendResult.
-
send
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
- Specified by:
send in interface KafkaOperations<K,V>
- Parameters:
topic - the topic.
key - the key.
data - the data.
- Returns:
a Future for the SendResult.
-
send
public CompletableFuture<SendResult<K,V>> send(String topic, Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
- Specified by:
send in interface KafkaOperations<K,V>
- Parameters:
topic - the topic.
partition - the partition.
key - the key.
data - the data.
- Returns:
a Future for the SendResult.
-
send
public CompletableFuture<SendResult<K,V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
- Specified by:
send in interface KafkaOperations<K,V>
- Parameters:
topic - the topic.
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
- Returns:
a Future for the SendResult.
-
send
public CompletableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K, V> record)
Description copied from interface: KafkaOperations
Send the provided ProducerRecord.
- Specified by:
send in interface KafkaOperations<K,V>
- Parameters:
record - the record.
- Returns:
a Future for the SendResult.
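Because the send methods return a CompletableFuture, the outcome can be handled with a completion callback instead of blocking the calling thread. The following JDK-only sketch shows that pattern so it can run without a broker; the String payload is a stand-in for SendResult<K,V>, and the class SendFutureSketch and its observe method are hypothetical names, not part of the library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: reacting to a send future without blocking the caller. */
final class SendFutureSketch {

    /** Attaches a completion callback and records a readable outcome once the future completes. */
    static AtomicReference<String> observe(CompletableFuture<String> sendFuture) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        sendFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // The send failed; a real application might log, retry, or route elsewhere.
                outcome.set("failed: " + ex.getMessage());
            } else {
                outcome.set("sent: " + result);
            }
        });
        return outcome;
    }

    public static void main(String[] args) {
        // The future is completed by hand here; with a template it would come from a send call.
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> outcome = observe(future);
        future.complete("orders-0@42");
        System.out.println(outcome.get());
    }
}
```

With a real template, the same whenComplete callback would be attached to the future returned by a send or sendDefault call, with SendResult<K,V> in place of the String payload.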
-
send
Description copied from interface: KafkaOperations
Send a message with routing information in message headers. The message payload may be converted before sending.
- Specified by:
send in interface KafkaOperations<K,V>
- Parameters:
message - the message to send.
- Returns:
a Future for the SendResult.
- See Also:
-
partitionsFor
Description copied from interface: KafkaOperations
See Producer.partitionsFor(String).
- Specified by:
partitionsFor in interface KafkaOperations<K,V>
- Parameters:
topic - the topic.
- Returns:
the partition info.
-
metrics
Description copied from interface: KafkaOperations
See Producer.metrics().
- Specified by:
metrics in interface KafkaOperations<K,V>
- Returns:
the metrics.
-
execute
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
- Specified by:
execute in interface KafkaOperations<K,V>
- Type Parameters:
T - the result type.
- Parameters:
callback - the callback.
- Returns:
the result.
-
executeInTransaction
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
- Specified by:
executeInTransaction in interface KafkaOperations<K,V>
- Type Parameters:
T - the result type.
- Parameters:
callback - the callback.
- Returns:
the result.
-
flush
public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).
- Specified by:
flush in interface KafkaOperations<K,V>
-
sendOffsetsToTransaction
public void sendOffsetsToTransaction(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
- Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
- Parameters:
offsets - the offsets.
groupMetadata - the consumer group metadata.
- See Also:
Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
-
receive
@Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K,V> receive(String topic, int partition, long offset, Duration pollTimeout)
Description copied from interface: KafkaOperations
Receive a single record.
- Specified by:
receive in interface KafkaOperations<K,V>
- Parameters:
topic - the topic.
partition - the partition.
offset - the offset.
pollTimeout - the timeout.
- Returns:
the record or null.
-
receive
public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout)
Description copied from interface: KafkaOperations
Receive multiple records. Only absolute, positive offsets are supported.
- Specified by:
receive in interface KafkaOperations<K,V>
- Parameters:
requested - a collection of record requests (topic/partition/offset).
pollTimeout - the timeout.
- Returns:
the records.
-
closeProducer
-
doSend
protected CompletableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, io.micrometer.observation.Observation observation)
Send the producer record.
- Parameters:
producerRecord - the producer record.
observation - the observation.
- Returns:
a Future for the RecordMetadata.
-
inTransaction
public boolean inTransaction()
Return true if the template is currently running in a transaction on the calling thread.
- Specified by:
inTransaction in interface KafkaOperations<K,V>
- Returns:
true if a transaction is running.
- Since:
2.2.1
-
getTheProducer
-
destroy
public void destroy()
- Specified by:
destroy in interface DisposableBean
-