Type Parameters: `K` - the key type; `V` - the value type.

`public class KafkaTemplate<K,V> extends java.lang.Object implements KafkaOperations<K,V>`

Nested classes/interfaces inherited from interface `KafkaOperations`: `KafkaOperations.OperationsCallback<K,V,T>`, `KafkaOperations.ProducerCallback<K,V,T>`

| Modifier and Type | Field and Description |
|---|---|
| `protected org.apache.commons.logging.Log` | `logger` |

| Constructor and Description |
|---|
| `KafkaTemplate(ProducerFactory<K,V> producerFactory)` Create an instance using the supplied producer factory and autoFlush false. |
| `KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)` Create an instance using the supplied producer factory and autoFlush setting. |

| Modifier and Type | Method and Description |
|---|---|
| `protected void` | `closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inLocalTx)` |
| `protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)` Send the producer record. |
| `<T> T` | `execute(KafkaOperations.ProducerCallback<K,V,T> callback)` Execute some arbitrary operation(s) on the producer and return the result. |
| `<T> T` | `executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)` Execute some arbitrary operation(s) on the operations and return the result. |
| `void` | `flush()` Flush the producer. |
| `java.lang.String` | `getDefaultTopic()` The default topic for send methods where a topic is not provided. |
| `MessageConverter` | `getMessageConverter()` Return the message converter. |
| `protected boolean` | `inTransaction()` |
| `boolean` | `isTransactional()` Return true if this template supports transactions (has a transaction-capable producer factory). |
| `java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>` | `metrics()` See `Producer.metrics()`. |
| `java.util.List<org.apache.kafka.common.PartitionInfo>` | `partitionsFor(java.lang.String topic)` See `Producer.partitionsFor(String)`. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(org.springframework.messaging.Message<?> message)` Send a message with routing information in message headers. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)` Send the provided `ProducerRecord`. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, java.lang.Integer partition, K key, V data)` Send the data to the provided topic with the provided key and partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data)` Send the data to the provided topic with the provided key and partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, K key, V data)` Send the data to the provided topic with the provided key and no partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, V data)` Send the data to the provided topic with no key or partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(java.lang.Integer partition, K key, V data)` Send the data to the default topic with the provided key and partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data)` Send the data to the default topic with the provided key and partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(K key, V data)` Send the data to the default topic with the provided key and no partition. |
| `org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(V data)` Send the data to the default topic with no key or partition. |
| `void` | `sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)` When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction. |
| `void` | `sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)` When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction. |
| `void` | `setDefaultTopic(java.lang.String defaultTopic)` Set the default topic for send methods where a topic is not provided. |
| `void` | `setMessageConverter(RecordMessageConverter messageConverter)` Set the message converter to use. |
| `void` | `setProducerListener(ProducerListener<K,V> producerListener)` Set a `ProducerListener` which will be invoked when Kafka acknowledges a send operation. |

`public KafkaTemplate(ProducerFactory<K,V> producerFactory)`

Create an instance using the supplied producer factory and autoFlush false.

Parameters: `producerFactory` - the producer factory.

`public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)`

Create an instance using the supplied producer factory and autoFlush setting. Set autoFlush to true if you have configured the producer's `linger.ms` to a non-default value and want send operations on this template to occur immediately, regardless of that setting, or if you want to block until the broker has acknowledged receipt according to the producer's `acks` property.

Parameters: `producerFactory` - the producer factory; `autoFlush` - true to flush after each send.

See Also: `Producer.flush()`

`public java.lang.String getDefaultTopic()`

The default topic for send methods where a topic is not provided.

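As a rough illustration of the `autoFlush` constructor argument described above, the sketch below builds a template whose producer batches for up to 50 ms (`linger.ms`) while the template flushes after every send. The class name `AutoFlushTemplateSketch`, the broker address `localhost:9092`, the topic `example-topic`, and the 50 ms value are assumptions made for the example; `DefaultKafkaProducerFactory` and the `ProducerConfig` constants are assumed to be available from spring-kafka and kafka-clients on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class AutoFlushTemplateSketch {

    /** Builds a template that flushes after each send despite a non-default linger.ms. */
    static KafkaTemplate<String, String> buildTemplate(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Hypothetical non-default batching delay; without autoFlush, sends may wait this long.
        config.put(ProducerConfig.LINGER_MS_CONFIG, 50);

        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config);
        // Second argument is autoFlush: true asks the template to flush after each send.
        return new KafkaTemplate<>(factory, true);
    }

    public static void main(String[] args) {
        // Building the template does not contact the broker; sending does.
        KafkaTemplate<String, String> template = buildTemplate("localhost:9092");
        template.send("example-topic", "key-1", "hello");
    }
}
```

Flushing after every send trades batching throughput for immediacy, so this setting is probably best reserved for low-volume or latency-sensitive templates.
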
`public void setDefaultTopic(java.lang.String defaultTopic)`

Set the default topic for send methods where a topic is not provided.

Parameters: `defaultTopic` - the topic.

`public void setProducerListener(ProducerListener<K,V> producerListener)`

Set a `ProducerListener` which will be invoked when Kafka acknowledges a send operation. By default a `LoggingProducerListener` is configured which logs errors only.

Parameters: `producerListener` - the listener; may be null.

`public MessageConverter getMessageConverter()`

Return the message converter.

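The default topic set with `setDefaultTopic` is what the `sendDefault` methods write to. The sketch below shows the pairing without needing a broker: it assumes kafka-clients' `MockProducer` test double as the producer and a lambda as the `ProducerFactory`, which is a shortcut for illustration rather than a production setup. The class name `DefaultTopicSketch`, the topic `orders`, and the key and value are hypothetical, and only one record is sent because the template may close a producer it obtained from such a bare factory.

```java
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.SendResult;

public class DefaultTopicSketch {

    /** Sends one record with sendDefault and returns what the mock producer recorded. */
    static List<ProducerRecord<String, String>> sendToDefaultTopic() throws Exception {
        // Assumption: MockProducer stands in for a real producer; 'true' auto-completes sends.
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        ProducerFactory<String, String> factory = () -> producer;

        KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);
        template.setDefaultTopic("orders"); // hypothetical topic name

        // No topic argument: the record is addressed to the default topic.
        Future<SendResult<String, String>> future = template.sendDefault("order-1", "created");
        future.get(); // wait for the (mock) acknowledgement

        return producer.history();
    }

    public static void main(String[] args) throws Exception {
        for (ProducerRecord<String, String> record : sendToDefaultTopic()) {
            System.out.println(record.topic() + " " + record.key() + " " + record.value());
        }
    }
}
```
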
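`public void setMessageConverter(RecordMessageConverter messageConverter)`

Set the message converter to use.

Parameters: `messageConverter` - the message converter.

`public boolean isTransactional()`

Return true if this template supports transactions (has a transaction-capable producer factory).
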
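`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(V data)`

Description copied from interface `KafkaOperations`: Send the data to the default topic with no key or partition.

Specified by: `sendDefault` in interface `KafkaOperations<K,V>`

Parameters: `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(K key, V data)`

Description copied from interface `KafkaOperations`: Send the data to the default topic with the provided key and no partition.

Specified by: `sendDefault` in interface `KafkaOperations<K,V>`

Parameters: `key` - the key; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, K key, V data)`

Description copied from interface `KafkaOperations`: Send the data to the default topic with the provided key and partition.

Specified by: `sendDefault` in interface `KafkaOperations<K,V>`

Parameters: `partition` - the partition; `key` - the key; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data)`

Description copied from interface `KafkaOperations`: Send the data to the default topic with the provided key and partition.

Specified by: `sendDefault` in interface `KafkaOperations<K,V>`

Parameters: `partition` - the partition; `timestamp` - the timestamp of the record; `key` - the key; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, V data)`

Description copied from interface `KafkaOperations`: Send the data to the provided topic with no key or partition.

Specified by: `send` in interface `KafkaOperations<K,V>`

Parameters: `topic` - the topic; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, K key, V data)`

Description copied from interface `KafkaOperations`: Send the data to the provided topic with the provided key and no partition.

Specified by: `send` in interface `KafkaOperations<K,V>`

Parameters: `topic` - the topic; `key` - the key; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, K key, V data)`

Description copied from interface `KafkaOperations`: Send the data to the provided topic with the provided key and partition.

Specified by: `send` in interface `KafkaOperations<K,V>`

Parameters: `topic` - the topic; `partition` - the partition; `key` - the key; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data)`

Description copied from interface `KafkaOperations`: Send the data to the provided topic with the provided key and partition.

Specified by: `send` in interface `KafkaOperations<K,V>`

Parameters: `topic` - the topic; `partition` - the partition; `timestamp` - the timestamp of the record; `key` - the key; `data` - the data.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)`

Description copied from interface `KafkaOperations`: Send the provided `ProducerRecord`.

Specified by: `send` in interface `KafkaOperations<K,V>`

Parameters: `record` - the record.

Returns: a future for the `SendResult`.

`public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.springframework.messaging.Message<?> message)`

Description copied from interface `KafkaOperations`: Send a message with routing information in message headers.

Specified by: `send` in interface `KafkaOperations<K,V>`

Parameters: `message` - the message to send.

Returns: a future for the `SendResult`.

See Also: `KafkaHeaders.TOPIC`, `KafkaHeaders.PARTITION_ID`, `KafkaHeaders.MESSAGE_KEY`

`public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor(java.lang.String topic)`

Description copied from interface `KafkaOperations`: See `Producer.partitionsFor(String)`.

Specified by: `partitionsFor` in interface `KafkaOperations<K,V>`

Parameters: `topic` - the topic.

`public java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()`

Description copied from interface `KafkaOperations`: See `Producer.metrics()`.

Specified by: `metrics` in interface `KafkaOperations<K,V>`

`public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)`

Description copied from interface `KafkaOperations`: Execute some arbitrary operation(s) on the producer and return the result.

Specified by: `execute` in interface `KafkaOperations<K,V>`

Type Parameters: `T` - the result type.

Parameters: `callback` - the callback.

`public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)`

Description copied from interface `KafkaOperations`: Execute some arbitrary operation(s) on the operations and return the result.

Specified by: `executeInTransaction` in interface `KafkaOperations<K,V>`

Type Parameters: `T` - the result type.

Parameters: `callback` - the callback.

`public void flush()`

Flush the producer. Note: it only makes sense to invoke this method if the `ProducerFactory` serves up a singleton producer (such as the `DefaultKafkaProducerFactory`).

Specified by: `flush` in interface `KafkaOperations<K,V>`

`public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)`

Description copied from interface `KafkaOperations`: When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction. The consumer group id is obtained from `ProducerFactoryUtils.getConsumerGroupId()`. It is not necessary to call this method if the operations are invoked on a listener container thread, since the container will take care of sending the offsets to the transaction.

Specified by: `sendOffsetsToTransaction` in interface `KafkaOperations<K,V>`

Parameters: `offsets` - the offsets.

`public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)`

Description copied from interface `KafkaOperations`: When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction.

Specified by: `sendOffsetsToTransaction` in interface `KafkaOperations<K,V>`

Parameters: `offsets` - the offsets; `consumerGroupId` - the consumer's `group.id`.

`protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inLocalTx)`
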
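The `offsets` argument of `sendOffsetsToTransaction` documented above is a plain map from partition to position. A minimal sketch of building it follows, using only kafka-clients types; the class name `TransactionOffsetsSketch`, the helper `nextOffsets`, and the topic, partition, and offset values are hypothetical. It relies on the usual Kafka convention that a committed offset names the next record to read, that is, the last processed offset plus one.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TransactionOffsetsSketch {

    /** Builds the offsets map for one partition from the last record that was processed. */
    static Map<TopicPartition, OffsetAndMetadata> nextOffsets(
            String topic, int partition, long lastProcessedOffset) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        // Commit position = last processed offset + 1 (the next record to consume).
        offsets.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(lastProcessedOffset + 1));
        return offsets;
    }

    public static void main(String[] args) {
        Map<TopicPartition, OffsetAndMetadata> offsets = nextOffsets("orders", 0, 42L);
        // Inside a transaction, this map would be handed to the template, for example:
        //   template.sendOffsetsToTransaction(offsets, "my-consumer-group");
        // where "my-consumer-group" is a placeholder for the consumer's group.id.
        System.out.println(offsets);
    }
}
```
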
`protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)`

Send the producer record.

Parameters: `producerRecord` - the producer record.

Returns: a future for the `RecordMetadata`.

`protected boolean inTransaction()`

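To illustrate the `send(Message<?>)` overload documented on this page, which takes its routing from message headers, the sketch below builds a Spring `Message` carrying `KafkaHeaders.TOPIC` and `KafkaHeaders.MESSAGE_KEY` and sends it. It assumes the header constants exactly as named on this page (other releases may name them differently), spring-messaging's `MessageBuilder`, and kafka-clients' `MockProducer` as a broker-free stand-in. The class name `HeaderRoutedSendSketch` and the topic, key, and payload are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class HeaderRoutedSendSketch {

    /** Sends one header-routed message and returns what the mock producer recorded. */
    static List<ProducerRecord<String, String>> sendWithHeaders() throws Exception {
        // Assumption: MockProducer stands in for a real producer; 'true' auto-completes sends.
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        ProducerFactory<String, String> factory = () -> producer;
        KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);

        // Topic and key travel as headers; the payload becomes the record value.
        Message<String> message = MessageBuilder.withPayload("created")
                .setHeader(KafkaHeaders.TOPIC, "orders")
                .setHeader(KafkaHeaders.MESSAGE_KEY, "order-1")
                .build();

        Future<SendResult<String, String>> future = template.send(message);
        future.get(); // wait for the (mock) acknowledgement

        return producer.history();
    }

    public static void main(String[] args) throws Exception {
        for (ProducerRecord<String, String> record : sendWithHeaders()) {
            System.out.println(record.topic() + " " + record.key() + " " + record.value());
        }
    }
}
```

If the topic header is omitted, the template's default topic, when one has been set, would be the natural fallback for this overload.
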