Class KafkaTemplate<K,V>
- java.lang.Object
-
- org.springframework.kafka.core.KafkaTemplate<K,V>
-
- Type Parameters:
K
- the key type.
V
- the value type.
- All Implemented Interfaces:
java.util.EventListener,
org.springframework.beans.factory.Aware,
org.springframework.beans.factory.BeanNameAware,
org.springframework.beans.factory.DisposableBean,
org.springframework.context.ApplicationContextAware,
org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>,
KafkaOperations<K,V>
- Direct Known Subclasses:
ReplyingKafkaTemplate, RoutingKafkaTemplate
public class KafkaTemplate<K,V> extends java.lang.Object implements KafkaOperations<K,V>, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean
A template for executing high-level operations. When used with a DefaultKafkaProducerFactory, the template is thread-safe. The producer factory and KafkaProducer ensure this; refer to their respective javadocs.
- Author:
- Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strauß, Gurps Bassi
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.springframework.kafka.core.KafkaOperations
KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>
-
-
Field Summary
Fields
Modifier and Type, Field, Description
protected org.springframework.core.log.LogAccessor
logger
-
Fields inherited from interface org.springframework.kafka.core.KafkaOperations
DEFAULT_POLL_TIMEOUT
-
-
Constructor Summary
Constructors
Constructor, Description
KafkaTemplate(ProducerFactory<K,V> producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting.
KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush, java.util.Map<java.lang.String,java.lang.Object> configOverrides)
Create an instance using the supplied producer factory and autoFlush setting.
KafkaTemplate(ProducerFactory<K,V> producerFactory, java.util.Map<java.lang.String,java.lang.Object> configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false.
-
Method Summary
Modifier and Type, Method, Description
protected void
closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
void
destroy()
protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)
Send the producer record.
<T> T
execute(KafkaOperations.ProducerCallback<K,V,T> callback)
Execute some arbitrary operation(s) on the producer and return the result.
<T> T
executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
Execute some arbitrary operation(s) on the operations and return the result.
void
flush()
Flush the producer.
java.lang.String
getDefaultTopic()
The default topic for send methods where a topic is not provided.
RecordMessageConverter
getMessageConverter()
Return the message converter.
java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,java.util.Map<java.lang.String,java.lang.String>>
getMicrometerTagsProvider()
Return the Micrometer tags provider.
ProducerFactory<K,V>
getProducerFactory()
Return the producer factory used by this template.
protected ProducerFactory<K,V>
getProducerFactory(java.lang.String topic)
Return the producer factory used by this template based on the topic.
protected org.apache.kafka.clients.producer.Producer<K,V>
getTheProducer(java.lang.String topic)
java.lang.String
getTransactionIdPrefix()
boolean
inTransaction()
Return true if the template is currently running in a transaction on the calling thread.
boolean
isAllowNonTransactional()
Return true if this template, when transactional, allows non-transactional operations.
boolean
isTransactional()
Return true if the implementation supports transactions (has a transaction-capable producer factory).
java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>
metrics()
See Producer.metrics().
void
onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event)
java.util.List<org.apache.kafka.common.PartitionInfo>
partitionsFor(java.lang.String topic)
See Producer.partitionsFor(String).
org.apache.kafka.clients.consumer.ConsumerRecord<K,V>
receive(java.lang.String topic, int partition, long offset, java.time.Duration pollTimeout)
Receive a single record.
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>
receive(java.util.Collection<TopicPartitionOffset> requested, java.time.Duration pollTimeout)
Receive multiple records.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V data)
Send the data to the provided topic with the provided key and partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
send(java.lang.String topic, java.lang.Integer partition, K key, V data)
Send the data to the provided topic with the provided key and partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
send(java.lang.String topic, K key, V data)
Send the data to the provided topic with the provided key and no partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
send(java.lang.String topic, V data)
Send the data to the provided topic with no key or partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Send the provided ProducerRecord.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
send(org.springframework.messaging.Message<?> message)
Send a message with routing information in message headers.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, V data)
Send the data to the default topic with the provided key and partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
sendDefault(java.lang.Integer partition, K key, V data)
Send the data to the default topic with the provided key and partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
sendDefault(K key, V data)
Send the data to the default topic with the provided key and no partition.
org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>>
sendDefault(V data)
Send the data to the default topic with no key or partition.
void
sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
Deprecated.
void
sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
Deprecated.
void
sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
When running in a transaction, send the consumer offset(s) to the transaction.
void
setAllowNonTransactional(boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
void
setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
void
setBeanName(java.lang.String name)
void
setCloseTimeout(java.time.Duration closeTimeout)
Set the maximum time to wait when closing a producer; default 5 seconds.
void
setConsumerFactory(ConsumerFactory<K,V> consumerFactory)
Set a consumer factory for receive operations.
void
setDefaultTopic(java.lang.String defaultTopic)
Set the default topic for send methods where a topic is not provided.
void
setMessageConverter(RecordMessageConverter messageConverter)
Set the message converter to use.
void
setMessagingConverter(org.springframework.messaging.converter.SmartMessageConverter messageConverter)
Set the SmartMessageConverter to use with the default MessagingMessageConverter.
void
setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable Micrometer timers, if Micrometer is on the class path.
void
setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
Set additional tags for the Micrometer listener timers.
void
setMicrometerTagsProvider(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,java.util.Map<java.lang.String,java.lang.String>> micrometerTagsProvider)
Set a function to provide dynamic tags based on the producer record.
void
setProducerListener(ProducerListener<K,V> producerListener)
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation.
void
setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set a transaction id prefix to override the prefix in the producer factory.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.core.KafkaOperations
receive, receive, usingCompletableFuture
-
-
-
-
Constructor Detail
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K,V> producerFactory)
Create an instance using the supplied producer factory and autoFlush false.
- Parameters:
producerFactory
- the producer factory.
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K,V> producerFactory, @Nullable java.util.Map<java.lang.String,java.lang.Object> configOverrides)
Create an instance using the supplied producer factory and properties, with autoFlush false. If configOverrides is not null or empty, a new DefaultKafkaProducerFactory will be created with the merged producer properties, the overrides being applied after the supplied factory's properties.
- Parameters:
producerFactory
- the producer factory.
configOverrides
- producer configuration properties to override.
- Since:
- 2.5
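The merge order described for configOverrides (overrides applied after the supplied factory's properties) can be pictured with plain maps. The following is a minimal sketch in plain Java, not the Spring Kafka implementation: the class name ConfigOverrideSketch, the helper merge, and the broker address are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: models "overrides applied after the
// factory's properties" with plain maps; it does not call Spring Kafka.
public class ConfigOverrideSketch {

    // Returns a new map: factory properties first, then overrides on top.
    static Map<String, Object> merge(Map<String, Object> factoryProps,
                                     Map<String, Object> configOverrides) {
        Map<String, Object> merged = new HashMap<>(factoryProps);
        if (configOverrides != null && !configOverrides.isEmpty()) {
            merged.putAll(configOverrides); // later entries win on key collisions
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryProps = new HashMap<>();
        factoryProps.put("bootstrap.servers", "localhost:9092"); // assumed address
        factoryProps.put("linger.ms", 5);

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("linger.ms", 0);

        Map<String, Object> merged = merge(factoryProps, overrides);
        System.out.println(merged.get("linger.ms"));         // the overridden value
        System.out.println(merged.get("bootstrap.servers")); // the inherited value
    }
}
```

In this sketch the supplied map is copied rather than modified, which mirrors the documented behavior that a new factory is created instead of the original being changed.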
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property.
- Parameters:
producerFactory
- the producer factory.
autoFlush
- true to flush after each send.
- See Also:
Producer.flush()
-
KafkaTemplate
public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush, @Nullable java.util.Map<java.lang.String,java.lang.Object> configOverrides)
Create an instance using the supplied producer factory and autoFlush setting.
Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property. If configOverrides is not null or empty, a new ProducerFactory will be created using ProducerFactory.copyWithConfigurationOverride(java.util.Map). The factory applies the overrides after the supplied factory's properties. The ProducerPostProcessors from the original factory are copied over to keep instrumentation alive. Registered ProducerFactory.Listeners are also added to the new factory. If the factory implementation does not support the copy operation, a generic copy of the ProducerFactory is created, which will be of type DefaultKafkaProducerFactory.
- Parameters:
producerFactory
- the producer factory.
autoFlush
- true to flush after each send.
configOverrides
- producer configuration properties to override.
- Since:
- 2.5
- See Also:
Producer.flush()
-
-
Method Detail
-
setBeanName
public void setBeanName(java.lang.String name)
- Specified by:
setBeanName
in interface org.springframework.beans.factory.BeanNameAware
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
- Specified by:
setApplicationContext
in interface org.springframework.context.ApplicationContextAware
-
getDefaultTopic
public java.lang.String getDefaultTopic()
The default topic for send methods where a topic is not provided.
- Returns:
- the topic.
-
setDefaultTopic
public void setDefaultTopic(java.lang.String defaultTopic)
Set the default topic for send methods where a topic is not provided.
- Parameters:
defaultTopic
- the topic.
-
setProducerListener
public void setProducerListener(@Nullable ProducerListener<K,V> producerListener)
Set a ProducerListener which will be invoked when Kafka acknowledges a send operation. By default a LoggingProducerListener is configured which logs errors only.
- Parameters:
producerListener
- the listener; may be null.
-
getMessageConverter
public RecordMessageConverter getMessageConverter()
Return the message converter.
- Returns:
- the message converter.
-
setMessageConverter
public void setMessageConverter(RecordMessageConverter messageConverter)
Set the message converter to use.
- Parameters:
messageConverter
- the message converter.
-
setMessagingConverter
public void setMessagingConverter(org.springframework.messaging.converter.SmartMessageConverter messageConverter)
Set the SmartMessageConverter to use with the default MessagingMessageConverter. Not allowed when a custom messageConverter is provided.
- Parameters:
messageConverter
- the converter.
- Since:
- 2.7.1
-
isTransactional
public boolean isTransactional()
Description copied from interface: KafkaOperations
Return true if the implementation supports transactions (has a transaction-capable producer factory).
- Specified by:
isTransactional
in interface KafkaOperations<K,V>
- Returns:
- true or false.
-
getTransactionIdPrefix
public java.lang.String getTransactionIdPrefix()
-
setTransactionIdPrefix
public void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set a transaction id prefix to override the prefix in the producer factory.
- Parameters:
transactionIdPrefix
- the prefix.
- Since:
- 2.3
-
setCloseTimeout
public void setCloseTimeout(java.time.Duration closeTimeout)
Set the maximum time to wait when closing a producer; default 5 seconds.
- Parameters:
closeTimeout
- the close timeout.
- Since:
- 2.1.14
-
setAllowNonTransactional
public void setAllowNonTransactional(boolean allowNonTransactional)
Set to true to allow a non-transactional send when the template is transactional.
- Parameters:
allowNonTransactional
- true to allow.
- Since:
- 2.4.3
-
isAllowNonTransactional
public boolean isAllowNonTransactional()
Description copied from interface: KafkaOperations
Return true if this template, when transactional, allows non-transactional operations.
- Specified by:
isAllowNonTransactional
in interface KafkaOperations<K,V>
- Returns:
- true if non-transactional operations are allowed.
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable Micrometer timers, if Micrometer is on the class path.
- Parameters:
micrometerEnabled
- false to disable.
- Since:
- 2.5
-
setMicrometerTags
public void setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
Set additional tags for the Micrometer listener timers.
- Parameters:
tags
- the tags.
- Since:
- 2.5
-
setMicrometerTagsProvider
public void setMicrometerTagsProvider(@Nullable java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,java.util.Map<java.lang.String,java.lang.String>> micrometerTagsProvider)
Set a function to provide dynamic tags based on the producer record. These tags will be added to any static tags provided in micrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
- Parameters:
micrometerTagsProvider
- the micrometerTagsProvider.
- Since:
- 2.9.8
- See Also:
setMicrometerEnabled(boolean), setMicrometerTags(Map)
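As a rough picture of how a tags provider can complement static tags, the sketch below combines a fixed tag map with tags computed per record. It is plain Java and does not use the Spring Kafka or Micrometer APIs: TagsSketch, SimpleRecord, and combine are hypothetical stand-ins (the real provider receives a ProducerRecord<?,?>), and letting dynamic tags replace static ones on a key collision is an assumption of this sketch, not documented behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; not the Spring Kafka implementation.
public class TagsSketch {

    // Stand-in for a producer record; carries only the field the sketch needs.
    static final class SimpleRecord {
        final String topic;

        SimpleRecord(String topic) {
            this.topic = topic;
        }
    }

    // Static tags first, then tags derived from the record (provider may be null).
    static Map<String, String> combine(Map<String, String> staticTags,
                                       Function<SimpleRecord, Map<String, String>> provider,
                                       SimpleRecord record) {
        Map<String, String> tags = new LinkedHashMap<>(staticTags);
        if (provider != null) {
            tags.putAll(provider.apply(record)); // assumption: dynamic wins on collision
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> staticTags = new LinkedHashMap<>();
        staticTags.put("app", "orders");

        // A provider that derives one tag from the record's topic.
        Function<SimpleRecord, Map<String, String>> provider =
                rec -> Map.of("topic", rec.topic);

        System.out.println(combine(staticTags, provider, new SimpleRecord("orders.created")));
    }
}
```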
-
getMicrometerTagsProvider
@Nullable public java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,java.util.Map<java.lang.String,java.lang.String>> getMicrometerTagsProvider()
Return the Micrometer tags provider.
- Returns:
- the micrometerTagsProvider.
- Since:
- 2.9.8
-
getProducerFactory
public ProducerFactory<K,V> getProducerFactory()
Return the producer factory used by this template.
- Specified by:
getProducerFactory
in interface KafkaOperations<K,V>
- Returns:
- the factory.
- Since:
- 2.2.5
-
getProducerFactory
protected ProducerFactory<K,V> getProducerFactory(java.lang.String topic)
Return the producer factory used by this template based on the topic. The default implementation returns the only producer factory.
- Parameters:
topic
- the topic.
- Returns:
- the factory.
- Since:
- 2.5
-
setConsumerFactory
public void setConsumerFactory(ConsumerFactory<K,V> consumerFactory)
Set a consumer factory for receive operations.
- Parameters:
consumerFactory
- the consumer factory.
- Since:
- 2.8
-
onApplicationEvent
public void onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event)
- Specified by:
onApplicationEvent
in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>
-
sendDefault
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(@Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with no key or partition.
- Specified by:
sendDefault
in interface KafkaOperations<K,V>
- Parameters:
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
sendDefault
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and no partition.
- Specified by:
sendDefault
in interface KafkaOperations<K,V>
- Parameters:
key
- the key.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
sendDefault
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key and partition.
- Specified by:
sendDefault
in interface KafkaOperations<K,V>
- Parameters:
partition
- the partition.
key
- the key.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
sendDefault
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> sendDefault(java.lang.Integer partition, java.lang.Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the default topic with the provided key, partition, and timestamp.
- Specified by:
sendDefault
in interface KafkaOperations<K,V>
- Parameters:
partition
- the partition.
timestamp
- the timestamp of the record.
key
- the key.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
send
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with no key or partition.
- Specified by:
send
in interface KafkaOperations<K,V>
- Parameters:
topic
- the topic.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
send
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and no partition.
- Specified by:
send
in interface KafkaOperations<K,V>
- Parameters:
topic
- the topic.
key
- the key.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
send
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key and partition.
- Specified by:
send
in interface KafkaOperations<K,V>
- Parameters:
topic
- the topic.
partition
- the partition.
key
- the key.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
send
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, @Nullable V data)
Description copied from interface: KafkaOperations
Send the data to the provided topic with the provided key, partition, and timestamp.
- Specified by:
send
in interface KafkaOperations<K,V>
- Parameters:
topic
- the topic.
partition
- the partition.
timestamp
- the timestamp of the record.
key
- the key.
data
- the data.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
send
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: KafkaOperations
Send the provided ProducerRecord.
- Specified by:
send
in interface KafkaOperations<K,V>
- Parameters:
record
- the record.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaOperations.usingCompletableFuture()
-
send
public org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> send(org.springframework.messaging.Message<?> message)
Description copied from interface: KafkaOperations
Send a message with routing information in message headers. The message payload may be converted before sending.
- Specified by:
send
in interface KafkaOperations<K,V>
- Parameters:
message
- the message to send.
- Returns:
- a Future for the SendResult.
- See Also:
KafkaHeaders.TOPIC, KafkaHeaders.PARTITION, KafkaHeaders.KEY, KafkaOperations.usingCompletableFuture()
-
partitionsFor
public java.util.List<org.apache.kafka.common.PartitionInfo> partitionsFor(java.lang.String topic)
Description copied from interface: KafkaOperations
See Producer.partitionsFor(String).
- Specified by:
partitionsFor
in interface KafkaOperations<K,V>
- Parameters:
topic
- the topic.
- Returns:
- the partition info.
-
metrics
public java.util.Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
Description copied from interface: KafkaOperations
See Producer.metrics().
- Specified by:
metrics
in interface KafkaOperations<K,V>
- Returns:
- the metrics.
-
execute
public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the producer and return the result.
- Specified by:
execute
in interface KafkaOperations<K,V>
- Type Parameters:
T
- the result type.
- Parameters:
callback
- the callback.
- Returns:
- the result.
-
executeInTransaction
public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
Description copied from interface: KafkaOperations
Execute some arbitrary operation(s) on the operations and return the result. The operations are invoked within a local transaction and do not participate in a global transaction (if present).
- Specified by:
executeInTransaction
in interface KafkaOperations<K,V>
- Type Parameters:
T
- the result type.
- Parameters:
callback
- the callback.
- Returns:
- the result.
-
flush
public void flush()
Flush the producer.
Note: It only makes sense to invoke this method if the ProducerFactory serves up a singleton producer (such as the DefaultKafkaProducerFactory).
- Specified by:
flush
in interface KafkaOperations<K,V>
-
sendOffsetsToTransaction
@Deprecated public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
Deprecated.
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. The group id is obtained from KafkaUtils.getConsumerGroupId(). It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
- Specified by:
sendOffsetsToTransaction
in interface KafkaOperations<K,V>
- Parameters:
offsets
- The offsets.
-
sendOffsetsToTransaction
@Deprecated public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
Deprecated.
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction.
- Specified by:
sendOffsetsToTransaction
in interface KafkaOperations<K,V>
- Parameters:
offsets
- The offsets.
consumerGroupId
- the consumer's group.id.
-
sendOffsetsToTransaction
public void sendOffsetsToTransaction(java.util.Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
Description copied from interface: KafkaOperations
When running in a transaction, send the consumer offset(s) to the transaction. It is not necessary to call this method if the operations are invoked on a listener container thread (and the listener container is configured with a KafkaAwareTransactionManager) since the container will take care of sending the offsets to the transaction. Use with 2.5 brokers or later.
- Specified by:
sendOffsetsToTransaction
in interface KafkaOperations<K,V>
- Parameters:
offsets
- The offsets.
groupMetadata
- the consumer group metadata.
- See Also:
Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
-
receive
@Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K,V> receive(java.lang.String topic, int partition, long offset, java.time.Duration pollTimeout)
Description copied from interface: KafkaOperations
Receive a single record.
- Specified by:
receive
in interface KafkaOperations<K,V>
- Parameters:
topic
- the topic.
partition
- the partition.
offset
- the offset.
pollTimeout
- the timeout.
- Returns:
- the record or null.
-
receive
public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> receive(java.util.Collection<TopicPartitionOffset> requested, java.time.Duration pollTimeout)
Description copied from interface: KafkaOperations
Receive multiple records. Only absolute, positive offsets are supported.
- Specified by:
receive
in interface KafkaOperations<K,V>
- Parameters:
requested
- a collection of record requests (topic/partition/offset).
pollTimeout
- the timeout.
- Returns:
- the records.
-
closeProducer
protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
-
doSend
protected org.springframework.util.concurrent.ListenableFuture<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord)
Send the producer record.
- Parameters:
producerRecord
- the producer record.
- Returns:
- a Future for the SendResult.
-
inTransaction
public boolean inTransaction()
Return true if the template is currently running in a transaction on the calling thread.
- Specified by:
inTransaction
in interface KafkaOperations<K,V>
- Returns:
- true if a transaction is running.
- Since:
- 2.2.1
-
getTheProducer
protected org.apache.kafka.clients.producer.Producer<K,V> getTheProducer(@Nullable java.lang.String topic)
-
destroy
public void destroy()
- Specified by:
destroy
in interface org.springframework.beans.factory.DisposableBean
-
-