Type Parameters:
K - the key type.
V - the value type.

public class KafkaTemplate<K,V> extends java.lang.Object implements KafkaOperations<K,V>

Nested classes/interfaces inherited from interface KafkaOperations:
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
Modifier and Type | Field and Description |
---|---|
`protected org.apache.commons.logging.Log` | `logger` |
Constructor and Description |
---|
`KafkaTemplate(ProducerFactory<K,V> producerFactory)` Create an instance using the supplied producer factory and autoFlush false. |
`KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)` Create an instance using the supplied producer factory and autoFlush setting. |
Modifier and Type | Method and Description |
---|---|
`protected void` | `closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inLocalTx)` |
`protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)` Send the producer record. |
`<T> T` | `execute(KafkaOperations.ProducerCallback<K,V,T> callback)` Execute some arbitrary operation(s) on the producer and return the result. |
`<T> T` | `executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)` Execute some arbitrary operation(s) on the operations and return the result. |
`void` | `flush()` Flush the producer. |
`java.lang.String` | `getDefaultTopic()` The default topic for send methods where a topic is not provided. |
`MessageConverter` | `getMessageConverter()` Return the message converter. |
`protected boolean` | `inTransaction()` |
`java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>` | `metrics()` See `Producer.metrics()`. |
`java.util.List<org.apache.kafka.common.PartitionInfo>` | `partitionsFor(java.lang.String topic)` See `Producer.partitionsFor(String)`. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(org.springframework.messaging.Message<?> message)` Send a message with routing information in message headers. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)` Send the provided `ProducerRecord`. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, java.lang.Integer partition, K key, V data)` Send the data to the provided topic with the provided key and partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data)` Send the data to the provided topic with the provided key and partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, K key, V data)` Send the data to the provided topic with the provided key and no partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `send(java.lang.String topic, V data)` Send the data to the provided topic with no key or partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(java.lang.Integer partition, K key, V data)` Send the data to the default topic with the provided key and partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data)` Send the data to the default topic with the provided key and partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(K key, V data)` Send the data to the default topic with the provided key and no partition. |
`org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>` | `sendDefault(V data)` Send the data to the default topic with no key or partition. |
`void` | `sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)` When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction. |
`void` | `sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)` When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction. |
`void` | `setDefaultTopic(java.lang.String defaultTopic)` Set the default topic for send methods where a topic is not provided. |
`void` | `setMessageConverter(RecordMessageConverter messageConverter)` Set the message converter to use. |
`void` | `setProducerListener(ProducerListener<K,V> producerListener)` Set a `ProducerListener` which will be invoked when Kafka acknowledges a send operation. |
public KafkaTemplate(ProducerFactory<K,V> producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
Parameters:
producerFactory - the producer factory.

public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting. [...] `Future.get()` on the result.
Parameters:
producerFactory - the producer factory.
autoFlush - true to flush after each send.

public java.lang.String getDefaultTopic()
The default topic for send methods where a topic is not provided.

public void setDefaultTopic(java.lang.String defaultTopic)
Set the default topic for send methods where a topic is not provided.
Parameters:
defaultTopic - the topic.

public void setProducerListener(ProducerListener<K,V> producerListener)
Set a `ProducerListener` which will be invoked when Kafka acknowledges a send operation. By default a `LoggingProducerListener` is configured which logs errors only.
Parameters:
producerListener - the listener; may be `null`.

public MessageConverter getMessageConverter()
Return the message converter.

public void setMessageConverter(RecordMessageConverter messageConverter)
Set the message converter to use.
Parameters:
messageConverter - the message converter.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(V data)
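Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(K key, V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
key - the key.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
Specified by:
sendDefault in interface KafkaOperations<K,V>
Parameters:
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, V data)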
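Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
key - the key.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
key - the key.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
topic - the topic.
partition - the partition.
timestamp - the timestamp of the record.
key - the key.
data - the data.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: KafkaOperations
Send the provided `ProducerRecord`.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
record - the record.
Returns:
a Future for the `SendResult`.

public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.springframework.messaging.Message<?> message)
Description copied from interface: KafkaOperations
Send a message with routing information in message headers.
Specified by:
send in interface KafkaOperations<K,V>
Parameters:
message - the message to send.
Returns:
a Future for the `SendResult`.
See Also:
KafkaHeaders.TOPIC, KafkaHeaders.PARTITION_ID, KafkaHeaders.MESSAGE_KEY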
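
The header-based `send(Message<?>)` overload takes its routing information from the three headers listed above. The following is a minimal stand-alone model of that idea, not Spring Kafka code: `HeaderRouting`, `Route` and `resolve` are hypothetical names, the header keys are assumed to be the string values behind `KafkaHeaders.TOPIC`, `KafkaHeaders.PARTITION_ID` and `KafkaHeaders.MESSAGE_KEY`, and the fallback to a default topic when no topic header is present is likewise an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for illustration; not part of Spring Kafka. */
final class HeaderRouting {

    // Assumed string values of KafkaHeaders.TOPIC, PARTITION_ID and MESSAGE_KEY.
    static final String TOPIC = "kafka_topic";
    static final String PARTITION_ID = "kafka_partitionId";
    static final String MESSAGE_KEY = "kafka_messageKey";

    /** Where a record would be routed: topic, optional partition, optional key. */
    static final class Route {
        final String topic;
        final Integer partition;
        final Object key;

        Route(String topic, Integer partition, Object key) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
        }
    }

    /** Reads routing information from the headers; the topic falls back to defaultTopic. */
    static Route resolve(Map<String, Object> headers, String defaultTopic) {
        Object topic = headers.get(TOPIC);
        String resolved = topic != null ? topic.toString() : defaultTopic;
        if (resolved == null) {
            // This sketch chooses to fail fast when no topic can be determined.
            throw new IllegalStateException("No topic header and no default topic");
        }
        return new Route(resolved, (Integer) headers.get(PARTITION_ID), headers.get(MESSAGE_KEY));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC, "orders");
        headers.put(PARTITION_ID, 0);
        headers.put(MESSAGE_KEY, "order-42");
        Route route = resolve(headers, "fallback");
        System.out.println(route.topic + "/" + route.partition + "/" + route.key);
    }
}
```

Partition and key are optional in this model, mirroring the `send` overloads that omit them.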
public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor(java.lang.String topic)
Description copied from interface: KafkaOperations
See `Producer.partitionsFor(String)`.
Specified by:
partitionsFor in interface KafkaOperations<K,V>
Parameters:
topic - the topic.

public java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
Description copied from interface: KafkaOperations
See `Producer.metrics()`.
Specified by:
metrics in interface KafkaOperations<K,V>
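public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
Specified by:
execute in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.

public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the operations and return the result.
Specified by:
executeInTransaction in interface KafkaOperations<K,V>
Type Parameters:
T - the result type.
Parameters:
callback - the callback.

public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the `ProducerFactory` serves up a singleton producer (such as the `DefaultKafkaProducerFactory`).
Specified by:
flush in interface KafkaOperations<K,V>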
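
The send methods return a future, and the `autoFlush` constructor notes mention calling `Future.get()` on the result. As a hedged illustration of waiting on such a future with a timeout, the sketch below uses a `java.util.concurrent.CompletableFuture` as a stand-in for the `ListenableFuture<SendResult<K,V>>` returned by `send`. `BlockingSendSketch`, `fakeSend` and `sendAndWait` are hypothetical names, and no broker or Spring Kafka class is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in; with a real template the future would come from template.send(...). */
final class BlockingSendSketch {

    /** Pretends to send asynchronously and completes with a description of the record. */
    static Future<String> fakeSend(String topic, String key, String value) {
        return CompletableFuture.supplyAsync(() -> topic + ":" + key + "=" + value);
    }

    /** Blocks for at most timeoutMillis, turning failures into an unchecked exception. */
    static String sendAndWait(String topic, String key, String value, long timeoutMillis) {
        try {
            return fakeSend(topic, key, value).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new IllegalStateException("Interrupted while waiting for the send", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Send failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("orders", "order-42", "created", 5000));
    }
}
```

A bounded `get(timeout, unit)` is used here rather than the unbounded `get()` so that a caller is not blocked indefinitely if the result never arrives.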
public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
Description copied from interface: KafkaOperations
When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction. The group id is obtained from `ProducerFactoryUtils.getConsumerGroupId()`. It is not necessary to call this method if the operations are invoked on a listener container thread since the container will take care of sending the offsets to the transaction.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.

public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
Description copied from interface: KafkaOperations
When running in a transaction (usually synchronized with some other transaction), send the consumer offset(s) to the transaction.
Specified by:
sendOffsetsToTransaction in interface KafkaOperations<K,V>
Parameters:
offsets - the offsets.
consumerGroupId - the consumer's group.id.

protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inLocalTx)

protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)
Send the producer record.
Parameters:
producerRecord - the producer record.
Returns:
a Future for the `RecordMetadata`.

protected boolean inTransaction()