Type Parameters:
K - the key type.
V - the value type.

public class KafkaTemplate<K,V> extends java.lang.Object implements KafkaOperations<K,V>

Nested classes/interfaces inherited from interface KafkaOperations:
KafkaOperations.ProducerCallback<K,V,T>

| Modifier and Type | Field and Description |
|---|---|
| protected org.apache.commons.logging.Log | logger |

| Constructor and Description |
|---|
| KafkaTemplate(ProducerFactory<K,V> producerFactory): Create an instance using the supplied producer factory and autoFlush false. |
| KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush): Create an instance using the supplied producer factory and autoFlush setting. |

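
To make the autoFlush flag concrete, here is a minimal stand-alone model in plain Java. It does not use Spring Kafka or a broker: `ToyTemplate`, its pending queue, and the use of `CompletableFuture` are hypothetical stand-ins chosen for illustration. The only behaviour it tries to mirror is that, with autoFlush set to true, a flush follows each send, so the returned future is already complete when send returns.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, NOT the real KafkaTemplate: it only models the autoFlush flag.
class ToyTemplate {
    private final boolean autoFlush;
    // Completions for records that were accepted but not yet "acknowledged".
    private final Deque<Runnable> pending = new ArrayDeque<>();

    ToyTemplate(boolean autoFlush) {
        this.autoFlush = autoFlush;
    }

    // Buffer the record and return a future for its (toy) acknowledgement.
    CompletableFuture<String> send(String topic, String data) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(() -> future.complete("acked " + topic + ":" + data));
        if (autoFlush) {
            flush(); // flush after each send, as the autoFlush parameter describes
        }
        return future;
    }

    // Complete every buffered send. A real producer also has a background
    // sender thread that completes sends without an explicit flush; this toy does not.
    void flush() {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
    }

    public static void main(String[] args) {
        ToyTemplate lazy = new ToyTemplate(false);
        CompletableFuture<String> first = lazy.send("orders", "a");
        System.out.println(first.isDone()); // false: nothing has been flushed yet
        lazy.flush();
        System.out.println(first.join());   // acked orders:a

        ToyTemplate eager = new ToyTemplate(true);
        System.out.println(eager.send("orders", "b").isDone()); // true
    }
}
```

In this model the trade-off is visible directly: flushing on every send makes the result available immediately, at the cost of giving up batching across sends.
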
| Modifier and Type | Method and Description |
|---|---|
| protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord): Send the producer record. |
| <T> T | execute(KafkaOperations.ProducerCallback<K,V,T> callback): Execute some arbitrary operation(s) on the producer and return the result. |
| void | flush(): Flush the producer. |
| java.lang.String | getDefaultTopic(): The default topic for send methods where a topic is not provided. |
| MessageConverter | getMessageConverter(): Return the message converter. |
| java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> | metrics(): See Producer.metrics(). |
| java.util.List<org.apache.kafka.common.PartitionInfo> | partitionsFor(java.lang.String topic): See Producer.partitionsFor(String). |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(org.springframework.messaging.Message<?> message): Send a message with routing information in message headers. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, int partition, K key, V data): Send the data to the provided topic with the provided key and partition. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, int partition, V data): Send the data to the provided topic with the provided partition and no key. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, K key, V data): Send the data to the provided topic with the provided key and no partition. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | send(java.lang.String topic, V data): Send the data to the provided topic with no key or partition. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(int partition, K key, V data): Send the data to the default topic with the provided key and partition. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(K key, V data): Send the data to the default topic with the provided key and no partition. |
| org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> | sendDefault(V data): Send the data to the default topic with no key or partition. |
| void | setDefaultTopic(java.lang.String defaultTopic): Set the default topic for send methods where a topic is not provided. |
| void | setMessageConverter(RecordMessageConverter messageConverter): Set the message converter to use. |
| void | setProducerListener(ProducerListener<K,V> producerListener): Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. |

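
The send(Message<?>) method takes its routing information from message headers (KafkaHeaders.TOPIC, KafkaHeaders.PARTITION_ID, KafkaHeaders.MESSAGE_KEY). The sketch below models that lookup with a plain `Map` so it runs on the JDK alone. Everything in it is illustrative: `HeaderRouting` and `Route` are hypothetical names, the three header strings are assumed values of the KafkaHeaders constants and should be checked against the library version in use, and the fallback to the default topic is an assumption about how a missing topic header is handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of header-based routing; it does not call Spring Kafka.
class HeaderRouting {
    // Assumed string values of KafkaHeaders.TOPIC, PARTITION_ID and MESSAGE_KEY.
    static final String TOPIC = "kafka_topic";
    static final String PARTITION_ID = "kafka_partitionId";
    static final String MESSAGE_KEY = "kafka_messageKey";

    // Where a record would be sent; partition and key may be null.
    static final class Route {
        final String topic;
        final Integer partition;
        final Object key;

        Route(String topic, Integer partition, Object key) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
        }

        @Override
        public String toString() {
            return topic + "/" + partition + "/" + key;
        }
    }

    // Read the routing headers, falling back to defaultTopic when no topic header is set.
    static Route resolve(Map<String, Object> headers, String defaultTopic) {
        Object topic = headers.get(TOPIC);
        return new Route(
                topic != null ? (String) topic : defaultTopic,
                (Integer) headers.get(PARTITION_ID),
                headers.get(MESSAGE_KEY));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC, "orders");
        headers.put(MESSAGE_KEY, "order-42");
        System.out.println(resolve(headers, "fallback"));
        // orders/null/order-42
        System.out.println(resolve(new HashMap<String, Object>(), "fallback"));
        // fallback/null/null
    }
}
```
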
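public KafkaTemplate(ProducerFactory<K,V> producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
Parameters:
producerFactory - the producer factory.

public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting. Set autoFlush to true if you wish to synchronously interact with Kafka, calling Future.get() on the result.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.

public java.lang.String getDefaultTopic()
The default topic for send methods where a topic is not provided.
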
public void setDefaultTopic(java.lang.String defaultTopic)
Set the default topic for send methods where a topic is not provided.
Parameters:
defaultTopic - the topic.

public void setProducerListener(ProducerListener<K,V> producerListener)
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
Parameters:
producerListener - the listener; may be null.

public MessageConverter getMessageConverter()
Return the message converter.

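public void setMessageConverter(RecordMessageConverter messageConverter)
Set the message converter to use.
Parameters:
messageConverter - the message converter.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(K key, V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(int partition, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, int partition, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided partition and no key.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, int partition, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the SendResult.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
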
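
The send and sendDefault overloads differ only in which of topic, partition, and key the caller supplies. The following JDK-only sketch shows one way such an overload set can reduce to a single record type, with null standing for an omitted field. `ToyRecord` and `OverloadSketch` are hypothetical stand-ins (the real record type is ProducerRecord), and routing every overload through one private method is an assumption made for illustration, loosely modelled on the role of doSend.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProducerRecord<K,V>: omitted fields are null.
final class ToyRecord<K, V> {
    final String topic;
    final Integer partition;
    final K key;
    final V value;

    ToyRecord(String topic, Integer partition, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return "topic=" + topic + " partition=" + partition + " key=" + key + " value=" + value;
    }
}

// Hypothetical model of the overload set; it records instead of sending.
class OverloadSketch<K, V> {
    private final String defaultTopic;
    final List<ToyRecord<K, V>> sent = new ArrayList<>();

    OverloadSketch(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    ToyRecord<K, V> send(String topic, V data) {
        return doSend(new ToyRecord<K, V>(topic, null, null, data));
    }

    ToyRecord<K, V> send(String topic, K key, V data) {
        return doSend(new ToyRecord<K, V>(topic, null, key, data));
    }

    ToyRecord<K, V> send(String topic, int partition, V data) {
        return doSend(new ToyRecord<K, V>(topic, partition, null, data));
    }

    ToyRecord<K, V> send(String topic, int partition, K key, V data) {
        return doSend(new ToyRecord<K, V>(topic, partition, key, data));
    }

    // The sendDefault variants only substitute the default topic.
    ToyRecord<K, V> sendDefault(V data) {
        return send(defaultTopic, data);
    }

    ToyRecord<K, V> sendDefault(K key, V data) {
        return send(defaultTopic, key, data);
    }

    ToyRecord<K, V> sendDefault(int partition, K key, V data) {
        return send(defaultTopic, partition, key, data);
    }

    // Single funnel point for every overload.
    private ToyRecord<K, V> doSend(ToyRecord<K, V> record) {
        sent.add(record);
        return record;
    }

    public static void main(String[] args) {
        OverloadSketch<String, String> template = new OverloadSketch<>("events");
        System.out.println(template.send("orders", "payload"));
        // topic=orders partition=null key=null value=payload
        System.out.println(template.send("orders", 0, "k1", "payload"));
        // topic=orders partition=0 key=k1 value=payload
        System.out.println(template.sendDefault("k2", "payload"));
        // topic=events partition=null key=k2 value=payload
    }
}
```
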
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.springframework.messaging.Message<?> message)
Description copied from interface: KafkaOperations
Send a message with routing information in message headers.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
message - the message to send.
Returns:
a Future for the SendResult.
See Also:
KafkaHeaders.TOPIC, KafkaHeaders.PARTITION_ID, KafkaHeaders.MESSAGE_KEY

public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor(java.lang.String topic)
Description copied from interface: KafkaOperations
See Producer.partitionsFor(String).
Specified by:
partitionsFor in interface KafkaOperations<K,V>
Parameters:
topic - the topic.

public java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
Description copied from interface: KafkaOperations
See Producer.metrics().
Specified by:
metrics in interface KafkaOperations<K,V>

public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
Specified by:
execute in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.

public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).
Specified by:
flush in interface KafkaOperations<K,V>

protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)
Send the producer record.
Parameters:
producerRecord - the producer record.
Returns:
a Future for the RecordMetadata.
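
Because the send methods return a future rather than the result itself, a caller that needs a synchronous outcome has to wait on it. The sketch below shows one way to do that with a bounded wait. It is an assumption-laden illustration: `CompletableFuture<String>` stands in for the ListenableFuture<SendResult<K,V>> so the code runs on the JDK alone, and `AwaitSendSketch` and its status strings are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: CompletableFuture stands in for the future returned by
// the send methods; no Spring or Kafka classes are used.
class AwaitSendSketch {

    // Block for at most timeoutMillis and turn the outcome into a short status string.
    static String await(CompletableFuture<String> sendFuture, long timeoutMillis) {
        try {
            return "sent: " + sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            // The original failure is wrapped; unwrap it for reporting.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(CompletableFuture.completedFuture("orders-0@17"), 100));
        // sent: orders-0@17

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(await(failed, 100));
        // failed: broker unavailable

        System.out.println(await(new CompletableFuture<String>(), 10));
        // timed out
    }
}
```

A bounded `get` avoids blocking the calling thread indefinitely when no acknowledgement arrives; the timeout values here are arbitrary.
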