Type Parameters:
K - the key type.
V - the value type.

public class KafkaTemplate<K,V> extends java.lang.Object implements KafkaOperations<K,V>, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean

A template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.

Nested classes/interfaces inherited from interface KafkaOperations:
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>

Modifier and Type | Field and Description |
---|---|
protected org.springframework.core.log.LogAccessor | logger |

Constructor and Description |
---|
KafkaTemplate(ProducerFactory<K,V> producerFactory) Create an instance using the supplied producer factory and autoFlush false. |
KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush) Create an instance using the supplied producer factory and autoFlush setting. |
KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush, java.util.Map<java.lang.String,java.lang.Object> configOverrides) Create an instance using the supplied producer factory and autoFlush setting. |
KafkaTemplate(ProducerFactory<K,V> producerFactory, java.util.Map<java.lang.String,java.lang.Object> configOverrides) Create an instance using the supplied producer factory and properties, with autoFlush false. |

Modifier and Type | Method and Description |
---|---|
protected void | closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx) |
void | destroy() |
protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord) Send the producer record. |
<T> T | execute(KafkaOperations.ProducerCallback<K,V,T> callback) Execute some arbitrary operation(s) on the producer and return the result. |
<T> T | executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback) Execute some arbitrary operation(s) on the operations and return the result. |
void | flush() Flush the producer. |
java.lang.String | getDefaultTopic() The default topic for send methods where a topic is not provided. |
MessageConverter | getMessageConverter() Return the message converter. |
ProducerFactory<K,V> | getProducerFactory() Return the producer factory used by this template. |
protected ProducerFactory<K,V> | getProducerFactory(java.lang.String topic) Return the producer factory used by this template based on the topic. |
protected org.apache.kafka.clients.producer.Producer<K,V> | getTheProducer(java.lang.String topic) |
java.lang.String | getTransactionIdPrefix() |
boolean | inTransaction() Return true if the template is currently running in a transaction on the calling thread. |
boolean | isAllowNonTransactional() Return true if this template, when transactional, allows non-transactional operations. |
boolean | isTransactional() Return true if the implementation supports transactions (has a transaction-capable producer factory). |
java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> | metrics() See Producer.metrics(). |
void | onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event) |
java.util.List<org.apache.kafka.common.PartitionInfo> | partitionsFor(java.lang.String topic) See Producer.partitionsFor(String). |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(org.springframework.messaging.Message<?> message) Send a message with routing information in message headers. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) Send the provided ProducerRecord. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, java.lang.Integer partition, K key, V data) Send the data to the provided topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data) Send the data to the provided topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, K key, V data) Send the data to the provided topic with the provided key and no partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, V data) Send the data to the provided topic with no key or partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(java.lang.Integer partition, K key, V data) Send the data to the default topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data) Send the data to the default topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(K key, V data) Send the data to the default topic with the provided key and no partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(V data) Send the data to the default topic with no key or partition. |
void | sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets) When running in a transaction, send the consumer offset(s) to the transaction. |
void | sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata) When running in a transaction, send the consumer offset(s) to the transaction. |
void | sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId) When running in a transaction, send the consumer offset(s) to the transaction. |
void | setAllowNonTransactional(boolean allowNonTransactional) Set to true to allow a non-transactional send when the template is transactional. |
void | setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void | setBeanName(java.lang.String name) |
void | setCloseTimeout(java.time.Duration closeTimeout) Set the maximum time to wait when closing a producer; default 5 seconds. |
void | setDefaultTopic(java.lang.String defaultTopic) Set the default topic for send methods where a topic is not provided. |
void | setMessageConverter(RecordMessageConverter messageConverter) Set the message converter to use. |
void | setMicrometerEnabled(boolean micrometerEnabled) Set to false to disable micrometer timers, if micrometer is on the class path. |
void | setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags) Set additional tags for the Micrometer listener timers. |
void | setProducerListener(ProducerListener<K,V> producerListener) Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. |
void | setTransactionIdPrefix(java.lang.String transactionIdPrefix) Set a transaction id prefix to override the prefix in the producer factory. |

public KafkaTemplate(ProducerFactory<K,V> producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
Parameters:
producerFactory - the producer factory.

public KafkaTemplate(ProducerFactory<K,V> producerFactory, @Nullable java.util.Map<java.lang.String,java.lang.Object> configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false. If the configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with merged producer properties, with the overrides being applied after the supplied factory's properties.
Parameters:
producerFactory - the producer factory.
configOverrides - producer configuration properties to override.

public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
See Also:
Producer.flush()

public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush, @Nullable java.util.Map<java.lang.String,java.lang.Object> configOverrides)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property.
If the configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with merged producer properties, with the overrides being applied after the supplied factory's properties.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
configOverrides - producer configuration properties to override.
See Also:
Producer.flush()

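The merge order described for configOverrides can be pictured with plain maps. The following is only a minimal sketch of "overrides applied after the supplied factory's properties"; it does not use Spring Kafka and is not the library's implementation. The class name ConfigOverridesSketch, the merge helper, and the broker address are hypothetical; linger.ms and acks are the producer properties named above.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone model of the described merge; it does not touch Spring Kafka.
class ConfigOverridesSketch {

    // Copies the factory's properties and, when overrides are present,
    // applies them last so an override replaces a factory value with the same key.
    // With null or empty overrides the result equals the factory's properties.
    static Map<String, Object> merge(Map<String, Object> factoryProps,
                                     Map<String, Object> configOverrides) {
        Map<String, Object> merged = new HashMap<>(factoryProps);
        if (configOverrides != null && !configOverrides.isEmpty()) {
            merged.putAll(configOverrides);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryProps = new HashMap<>();
        factoryProps.put("bootstrap.servers", "localhost:9092"); // hypothetical address
        factoryProps.put("linger.ms", 0);
        factoryProps.put("acks", "1");

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("linger.ms", 50);   // non-default linger for this template only
        overrides.put("acks", "all");

        Map<String, Object> merged = merge(factoryProps, overrides);
        System.out.println("linger.ms = " + merged.get("linger.ms"));
        System.out.println("acks = " + merged.get("acks"));
        System.out.println("bootstrap.servers = " + merged.get("bootstrap.servers"));
    }
}
```

With the real class, a map like overrides would be passed as the configOverrides argument, for example new KafkaTemplate<>(producerFactory, true, overrides), where producerFactory is an existing ProducerFactory and autoFlush is true because linger.ms has been set to a non-default value.
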
public void setBeanName(java.lang.String name)
Specified by:
setBeanName in interface org.springframework.beans.factory.BeanNameAware

public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
Specified by:
setApplicationContext in interface org.springframework.context.ApplicationContextAware

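public java.lang.String getDefaultTopic()
The default topic for send methods where a topic is not provided.

public void setDefaultTopic(java.lang.String defaultTopic)
Set the default topic for send methods where a topic is not provided.
Parameters:
defaultTopic - the topic.

public void setProducerListener(@Nullable ProducerListener<K,V> producerListener)
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
Parameters:
producerListener - the listener; may be null.

public MessageConverter getMessageConverter()
Return the message converter.

public void setMessageConverter(RecordMessageConverter messageConverter)
Set the message converter to use.
Parameters:
messageConverter - the message converter.

public boolean isTransactional()
Description copied from interface: KafkaOperations
Return true if the implementation supports transactions (has a transaction-capable producer factory).
Specified by:
isTransactional in interface KafkaOperations<K,V>
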
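public java.lang.String getTransactionIdPrefix()

public void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set a transaction id prefix to override the prefix in the producer factory.
Parameters:
transactionIdPrefix - the prefix.

public void setCloseTimeout(java.time.Duration closeTimeout)
Set the maximum time to wait when closing a producer; default 5 seconds.
Parameters:
closeTimeout - the close timeout.

public void setAllowNonTransactional(boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
Parameters:
allowNonTransactional - true to allow.

public boolean isAllowNonTransactional()
Description copied from interface: KafkaOperations
Return true if this template, when transactional, allows non-transactional operations.
Specified by:
isAllowNonTransactional in interface KafkaOperations<K,V>
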
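public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable micrometer timers, if micrometer is on the class path.
Parameters:
micrometerEnabled - false to disable.

public void setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
Set additional tags for the Micrometer listener timers.
Parameters:
tags - the tags.

public ProducerFactory<K,V> getProducerFactory()
Return the producer factory used by this template.
Specified by:
getProducerFactory in interface KafkaOperations<K,V>
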
protected ProducerFactory<K,V> getProducerFactory(java.lang.String topic)
Return the producer factory used by this template based on the topic.
Parameters:
topic - the topic.

public void onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event)
Specified by:
onApplicationEvent in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>

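public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(@Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: KafkaOperations
Send the provided ProducerRecord.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
record - the record.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.springframework.messaging.Message<?> message)
Description copied from interface: KafkaOperations
Send a message with routing information in message headers.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
message - the message to send.
Returns:
a Future for the SendResult.
See Also:
KafkaHeaders.TOPIC, KafkaHeaders.PARTITION_ID, KafkaHeaders.MESSAGE_KEY
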
public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor(java.lang.String topic)
Description copied from interface: KafkaOperations
See Producer.partitionsFor(String).
Specified by:
partitionsFor in interface KafkaOperations<K,V>
Parameters:
topic - the topic.

public java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
Description copied from interface: KafkaOperations
See Producer.metrics().
Specified by:
metrics in interface KafkaOperations<K,V>

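public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
Specified by:
execute in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.

public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the operations and return the result.
Specified by:
executeInTransaction in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.

public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).
Specified by:
flush in interface KafkaOperations<K,V>
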
public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. The group id is obtained from KafkaUtils.getConsumerGroupId(). It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.

public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.
consumerGroupId - the consumer's group.id.

public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.
groupMetadata - the consumer group metadata.
See Also:
Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)

protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)

protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)
Send the producer record.
Parameters:
producerRecord - the producer record.
Returns:
a Future for the RecordMetadata.

public boolean inTransaction()
Return true if the template is currently running in a transaction on the calling thread.
Specified by:
inTransaction in interface KafkaOperations<K,V>

protected org.apache.kafka.clients.producer.Producer<K,V> getTheProducer(@Nullable java.lang.String topic)

public void destroy()
Specified by:
destroy in interface org.springframework.beans.factory.DisposableBean