Type Parameters:
K - the key type.
V - the value type.
public class KafkaTemplate<K,V> extends java.lang.Object implements KafkaOperations<K,V>
Nested classes/interfaces inherited from interface KafkaOperations:
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
Modifier and Type | Field and Description |
---|---|
protected org.springframework.core.log.LogAccessor | logger |
Constructor and Description |
---|
KafkaTemplate(ProducerFactory<K,V> producerFactory) - Create an instance using the supplied producer factory and autoFlush false. |
KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush) - Create an instance using the supplied producer factory and autoFlush setting. |
Modifier and Type | Method and Description |
---|---|
protected void | closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx) |
protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord) - Send the producer record. |
<T> T | execute(KafkaOperations.ProducerCallback<K,V,T> callback) - Execute some arbitrary operation(s) on the producer and return the result. |
<T> T | executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback) - Execute some arbitrary operation(s) on the operations and return the result. |
void | flush() - Flush the producer. |
java.lang.String | getDefaultTopic() - The default topic for send methods where a topic is not provided. |
MessageConverter | getMessageConverter() - Return the message converter. |
ProducerFactory<K,V> | getProducerFactory() - Return the producer factory used by this template. |
java.lang.String | getTransactionIdPrefix() |
boolean | inTransaction() - Return true if the template is currently running in a transaction on the calling thread. |
boolean | isTransactional() - Return true if the implementation supports transactions (has a transaction-capable producer factory). |
java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> | metrics() - See Producer.metrics(). |
java.util.List<org.apache.kafka.common.PartitionInfo> | partitionsFor(java.lang.String topic) - See Producer.partitionsFor(String). |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(org.springframework.messaging.Message<?> message) - Send a message with routing information in message headers. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) - Send the provided ProducerRecord. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, java.lang.Integer partition, K key, V data) - Send the data to the provided topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data) - Send the data to the provided topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, K key, V data) - Send the data to the provided topic with the provided key and no partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, V data) - Send the data to the provided topic with no key or partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(java.lang.Integer partition, K key, V data) - Send the data to the default topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data) - Send the data to the default topic with the provided key and partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(K key, V data) - Send the data to the default topic with the provided key and no partition. |
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(V data) - Send the data to the default topic with no key or partition. |
void | sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets) - When running in a transaction, send the consumer offset(s) to the transaction. |
void | sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId) - When running in a transaction, send the consumer offset(s) to the transaction. |
void | setCloseTimeout(java.time.Duration closeTimeout) - Set the maximum time to wait when closing a producer; default 5 seconds. |
void | setDefaultTopic(java.lang.String defaultTopic) - Set the default topic for send methods where a topic is not provided. |
void | setMessageConverter(RecordMessageConverter messageConverter) - Set the message converter to use. |
void | setProducerListener(ProducerListener<K,V> producerListener) - Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. |
void | setTransactionIdPrefix(java.lang.String transactionIdPrefix) - Set a transaction id prefix to override the prefix in the producer factory. |
public KafkaTemplate(ProducerFactory<K,V> producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
Parameters:
producerFactory - the producer factory.
public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.
See Also:
Producer.flush()
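The two constructors above can be exercised with a minimal sketch like the following. It assumes the spring-kafka and kafka-clients libraries are on the classpath in a release whose API matches this page; the class name `TemplateSetupSketch`, the helper `newTemplate`, the `linger.ms` value, and the broker address are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical helper class, not part of Spring Kafka.
final class TemplateSetupSketch {

    private TemplateSetupSketch() {
    }

    // Builds a template for String keys and values.
    // autoFlush = true asks the template to flush after each send, which
    // overrides the batching delay introduced by a non-default linger.ms.
    static KafkaTemplate<String, String> newTemplate(String bootstrapServers, boolean autoFlush) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Assumed non-default batching delay, in milliseconds.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);

        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory, autoFlush);
    }
}
```

Calling `TemplateSetupSketch.newTemplate("localhost:9092", true)` would give a template that flushes after every send; passing `false` is equivalent to using the single-argument constructor.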
public java.lang.String getDefaultTopic()
The default topic for send methods where a topic is not provided.
public void setDefaultTopic(java.lang.String defaultTopic)
Set the default topic for send methods where a topic is not provided.
Parameters:
defaultTopic - the topic.
public void setProducerListener(@Nullable ProducerListener<K,V> producerListener)
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
Parameters:
producerListener - the listener; may be null.
public MessageConverter getMessageConverter()
Return the message converter.
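public void setMessageConverter(RecordMessageConverter messageConverter)
Set the message converter to use.
Parameters:
messageConverter - the message converter.
public boolean isTransactional()
Description copied from interface: KafkaOperations
Return true if the implementation supports transactions (has a transaction-capable producer factory).
Specified by:
isTransactional in interface KafkaOperations<K,V>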
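public java.lang.String getTransactionIdPrefix()
public void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set a transaction id prefix to override the prefix in the producer factory.
Parameters:
transactionIdPrefix - the prefix.
public void setCloseTimeout(java.time.Duration closeTimeout)
Set the maximum time to wait when closing a producer; default 5 seconds.
Parameters:
closeTimeout - the close timeout.
public ProducerFactory<K,V> getProducerFactory()
Return the producer factory used by this template.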
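As an illustration of the setters above, the following sketch configures a default topic and a longer close timeout on a new template. It assumes spring-kafka is on the classpath; `TemplateConfigSketch`, `configure`, and the 10-second timeout are hypothetical choices, not values taken from this page.

```java
import java.time.Duration;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical helper class, not part of Spring Kafka.
final class TemplateConfigSketch {

    private TemplateConfigSketch() {
    }

    // Creates a template (autoFlush false) and applies two optional settings.
    static KafkaTemplate<String, String> configure(ProducerFactory<String, String> factory,
            String defaultTopic) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);
        // Topic used by the sendDefault(...) methods.
        template.setDefaultTopic(defaultTopic);
        // Assumed value; the documented default is 5 seconds.
        template.setCloseTimeout(Duration.ofSeconds(10));
        return template;
    }
}
```

With a default topic configured this way, a call such as `template.sendDefault("key", "value")` targets that topic without naming it at each call site.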
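public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(@Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
key - the key.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: KafkaOperations
Send the provided ProducerRecord.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
record - the record.
Returns:
a Future for the SendResult.
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.springframework.messaging.Message<?> message)
Description copied from interface: KafkaOperations
Send a message with routing information in message headers.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
message - the message to send.
Returns:
a Future for the SendResult.
See Also:
KafkaHeaders.TOPIC, KafkaHeaders.PARTITION_ID, KafkaHeaders.MESSAGE_KEY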
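To illustrate the `send(Message<?>)` variant and the routing headers listed above, here is a minimal sketch. It assumes a Spring Kafka release matching this page, where send methods return `ListenableFuture` and the header constants are named `KafkaHeaders.TOPIC`, `KafkaHeaders.PARTITION_ID`, and `KafkaHeaders.MESSAGE_KEY`. The class `MessageSendSketch` and its methods are hypothetical, and the logging shown is only one possible way to consume the result.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical helper class, not part of Spring Kafka.
final class MessageSendSketch {

    private MessageSendSketch() {
    }

    // Builds a message whose headers carry the topic, partition, and key.
    static Message<String> routedMessage(String topic, int partition, String key, String payload) {
        return MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.PARTITION_ID, partition)
                .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .build();
    }

    // Sends the message and reports the outcome asynchronously.
    // Requires a reachable broker when actually invoked.
    static void sendAndLog(KafkaTemplate<String, String> template, Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = template.send(message);
        future.addCallback(
                result -> System.out.println("Sent to " + result.getRecordMetadata().topic()
                        + "-" + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset()),
                ex -> System.err.println("Send failed: " + ex.getMessage()));
    }
}
```

The same callback pattern should apply to the other `send` and `sendDefault` overloads, since they share the `ListenableFuture<SendResult<K,V>>` return type.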
public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor(java.lang.String topic)
Description copied from interface: KafkaOperations
See Producer.partitionsFor(String).
Specified by:
partitionsFor in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
public java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
Description copied from interface: KafkaOperations
See Producer.metrics().
Specified by:
metrics in interface KafkaOperations<K,V>
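public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
Specified by:
execute in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.
public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the operations and return the result.
Specified by:
executeInTransaction in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.
public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).
Specified by:
flush in interface KafkaOperations<K,V>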
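The `executeInTransaction` method described above can be sketched as follows. This assumes spring-kafka and kafka-clients on the classpath and a producer factory made transaction-capable through `DefaultKafkaProducerFactory.setTransactionIdPrefix`; the class `TransactionSketch`, its methods, and the topic names are hypothetical. To my understanding the template commits when the callback returns normally and aborts when it throws, but treat that as an assumption to verify against the release you use.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical helper class, not part of Spring Kafka.
final class TransactionSketch {

    private TransactionSketch() {
    }

    // Builds a template backed by a transaction-capable producer factory.
    static KafkaTemplate<String, String> transactionalTemplate(String bootstrapServers,
            String transactionIdPrefix) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        // Set the prefix before creating the template so the template sees a
        // transaction-capable factory.
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return new KafkaTemplate<>(factory);
    }

    // Sends two records in one local transaction.
    // Requires a reachable broker when actually invoked.
    static Boolean sendBoth(KafkaTemplate<String, String> template) {
        return template.executeInTransaction(ops -> {
            ops.send("topic-a", "key", "first");  // hypothetical topic
            ops.send("topic-b", "key", "second"); // hypothetical topic
            return true;
        });
    }
}
```

The callback receives a `KafkaOperations` instance, so sends inside it use the same method names as the template itself.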
public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. The group id is obtained from KafkaUtils.getConsumerGroupId(). It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.
public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.
consumerGroupId - the consumer's group.id.
protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
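For the `sendOffsetsToTransaction` overloads documented above, the following sketch shows one way to build the offsets map and pass it with an explicit group id from code that is not running on a listener container thread. It assumes spring-kafka and kafka-clients on the classpath and a transactional template; `OffsetsInTransactionSketch` and its methods are hypothetical. The `+ 1` reflects the usual Kafka convention that a committed offset names the next record to read.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical helper class, not part of Spring Kafka.
final class OffsetsInTransactionSketch {

    private OffsetsInTransactionSketch() {
    }

    // Maps a partition to the offset of the next record to consume.
    static Map<TopicPartition, OffsetAndMetadata> nextOffset(String topic, int partition,
            long lastProcessedOffset) {
        return Collections.singletonMap(
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(lastProcessedOffset + 1));
    }

    // Forwards a consumed record and sends its offset within the same transaction.
    // Requires a reachable broker and a transactional template when invoked.
    static void forwardAndCommit(KafkaTemplate<String, String> template,
            ConsumerRecord<String, String> consumed, String outputTopic, String consumerGroupId) {
        Map<TopicPartition, OffsetAndMetadata> offsets =
                nextOffset(consumed.topic(), consumed.partition(), consumed.offset());
        template.executeInTransaction(ops -> {
            ops.send(outputTopic, consumed.key(), consumed.value());
            ops.sendOffsetsToTransaction(offsets, consumerGroupId);
            return null;
        });
    }
}
```

On a listener container thread configured with a KafkaAwareTransactionManager this step is unnecessary, as the description above notes.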
protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)
Send the producer record.
Parameters:
producerRecord - the producer record.
Returns:
a Future for the RecordMetadata.
public boolean inTransaction()