K
- the key type.V
- the value type.public class DefaultKafkaProducerFactory<K,V> extends java.lang.Object implements ProducerFactory<K,V>, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean
ProducerFactory
implementation for a singleton
shared Producer
instance.
This implementation will return the same Producer
instance (if transactions are
not enabled) for the provided Map
configs
and optional Serializer
implementations on each createProducer()
invocation.
If you are using Serializer
s that have no-arg constructors and require no setup, then simplest to
specify Serializer
classes against ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
and
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
keys in the configs
passed to the
DefaultKafkaProducerFactory
constructor.
If that is not possible, but you are sure that at least one of the following is true:
Producer
will use the Serializer
sSerializer
s that may be shared between Producer
instances (and specifically
that their close() method is a no-op)Producer
being closed while other
Producer
instances with the same Serializer
s are in useSerializer
instances for one or both of the key and value serializers.
If none of the above is true then you may provide a Supplier
function for one or both Serializer
s
which will be used to obtain Serializer
(s) each time a Producer
is created by the factory.
The Producer
is wrapped and the underlying KafkaProducer
instance is
not actually closed when Producer.close()
is invoked. The KafkaProducer
is physically closed when DisposableBean.destroy()
is invoked or when the
application context publishes a ContextStoppedEvent
. You can also invoke
reset()
.
Setting setTransactionIdPrefix(String)
enables transactions; in which case, a
cache of producers is maintained; closing a producer returns it to the cache. The
producers are closed and the cache is cleared when the factory is destroyed, the
application context stopped, or the reset()
method is called.
Modifier and Type | Class and Description |
---|---|
protected static class |
DefaultKafkaProducerFactory.CloseSafeProducer<K,V>
A wrapper class for the delegate.
|
Modifier and Type | Field and Description |
---|---|
static java.time.Duration |
DEFAULT_PHYSICAL_CLOSE_TIMEOUT
The default close timeout duration as 30 seconds.
|
Constructor and Description |
---|
DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
|
DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs,
org.apache.kafka.common.serialization.Serializer<K> keySerializer,
org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Construct a factory with the provided configuration and
Serializer s. |
DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs,
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier,
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
Construct a factory with the provided configuration and
Serializer Suppliers. |
Modifier and Type | Method and Description |
---|---|
void |
closeProducerFor(java.lang.String suffix)
Remove the specified producer from the cache and close it.
|
void |
closeThreadBoundProducer()
When using
setProducerPerThread(boolean) (true), call this method to close
and release this thread's producer. |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createKafkaProducer()
Subclasses must return a raw producer which will be wrapped in a
DefaultKafkaProducerFactory.CloseSafeProducer . |
org.apache.kafka.clients.producer.Producer<K,V> |
createProducer()
Create a producer.
|
org.apache.kafka.clients.producer.Producer<K,V> |
createProducer(java.lang.String txIdPrefixArg)
Create a producer with an overridden transaction id prefix.
|
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducer()
Subclasses must return a producer from the
getCache() or a
new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer . |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducer(java.lang.String txIdPrefix) |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducerForPartition() |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducerForPartition(java.lang.String txIdPrefix) |
void |
destroy() |
protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> |
getCache() |
protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> |
getCache(java.lang.String txIdPrefix) |
java.util.Map<java.lang.String,java.lang.Object> |
getConfigurationProperties()
Return an unmodifiable reference to the configuration map for this factory.
|
protected java.lang.String |
getTransactionIdPrefix() |
boolean |
isProducerPerConsumerPartition()
Return the producerPerConsumerPartition.
|
boolean |
isRunning()
Deprecated.
Lifecycle is no longer implemented. |
void |
onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event) |
void |
reset()
Close the
Producer (s) and clear the cache of transactional
Producer (s). |
void |
setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void |
setKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer) |
void |
setPhysicalCloseTimeout(int physicalCloseTimeout)
The time to wait when physically closing the producer via the factory rather than
closing the producer itself (when
reset() , #closeProducerFor(String) , or closeThreadBoundProducer() are invoked). |
void |
setProducerPerConsumerPartition(boolean producerPerConsumerPartition)
Set to false to revert to the previous behavior of a simple incrementing
transactional.id suffix for each producer instead of maintaining a producer
for each group/topic/partition.
|
void |
setProducerPerThread(boolean producerPerThread)
Set to true to create a producer per thread instead of singleton that is shared by
all clients.
|
void |
setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set a prefix for the
ProducerConfig.TRANSACTIONAL_ID_CONFIG config. |
void |
setValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer) |
boolean |
transactionCapable()
Return true if the factory supports transactions.
|
public static final java.time.Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT
public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
configs
- the configuration.public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Serializer
s.
Also configures a transactionIdPrefix
as a value from the
ProducerConfig.TRANSACTIONAL_ID_CONFIG
if provided.
This config is going to be overridden with a suffix for target Producer
instance.configs
- the configuration.keySerializer
- the key Serializer
.valueSerializer
- the value Serializer
.public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
Serializer
Suppliers.
Also configures a transactionIdPrefix
as a value from the
ProducerConfig.TRANSACTIONAL_ID_CONFIG
if provided.
This config is going to be overridden with a suffix for target Producer
instance.configs
- the configuration.keySerializerSupplier
- the key Serializer
supplier function.valueSerializerSupplier
- the value Serializer
supplier function.public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
setApplicationContext
in interface org.springframework.context.ApplicationContextAware
org.springframework.beans.BeansException
public void setKeySerializer(@Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer)
public void setValueSerializer(@Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
public void setPhysicalCloseTimeout(int physicalCloseTimeout)
reset()
, #closeProducerFor(String)
, or closeThreadBoundProducer()
are invoked).
Specified in seconds; default DEFAULT_PHYSICAL_CLOSE_TIMEOUT
.physicalCloseTimeout
- the timeout in seconds.public final void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
ProducerConfig.TRANSACTIONAL_ID_CONFIG
config. By
default a ProducerConfig.TRANSACTIONAL_ID_CONFIG
value from configs is used
as a prefix in the target producer configs.transactionIdPrefix
- the prefix.protected java.lang.String getTransactionIdPrefix()
public void setProducerPerThread(boolean producerPerThread)
closeThreadBoundProducer()
to
physically close the producer when it is no longer needed. These producers will not
be closed by destroy()
or reset()
.producerPerThread
- true for a producer per thread.closeThreadBoundProducer()
public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition)
producerPerConsumerPartition
- false to revert.public boolean isProducerPerConsumerPartition()
isProducerPerConsumerPartition
in interface ProducerFactory<K,V>
public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
public boolean transactionCapable()
ProducerFactory
transactionCapable
in interface ProducerFactory<K,V>
public void destroy()
destroy
in interface org.springframework.beans.factory.DisposableBean
public void onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event)
onApplicationEvent
in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>
public void reset()
Producer
(s) and clear the cache of transactional
Producer
(s).@Deprecated public boolean isRunning()
Lifecycle
is no longer implemented.public org.apache.kafka.clients.producer.Producer<K,V> createProducer()
ProducerFactory
createProducer
in interface ProducerFactory<K,V>
public org.apache.kafka.clients.producer.Producer<K,V> createProducer(@Nullable java.lang.String txIdPrefixArg)
ProducerFactory
createProducer
in interface ProducerFactory<K,V>
txIdPrefixArg
- the transaction id prefix.protected org.apache.kafka.clients.producer.Producer<K,V> createKafkaProducer()
DefaultKafkaProducerFactory.CloseSafeProducer
.protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducerForPartition()
protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducerForPartition(java.lang.String txIdPrefix)
protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer()
getCache()
or a
new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer
.protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer(java.lang.String txIdPrefix)
@Nullable protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache()
@Nullable protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache(java.lang.String txIdPrefix)
public void closeProducerFor(java.lang.String suffix)
ProducerFactory
closeProducerFor
in interface ProducerFactory<K,V>
suffix
- the producer's transaction id suffix.public void closeThreadBoundProducer()
setProducerPerThread(boolean)
(true), call this method to close
and release this thread's producer. Thread bound producers are not closed by
destroy()
or reset()
methods.closeThreadBoundProducer
in interface ProducerFactory<K,V>
setProducerPerThread(boolean)