Class DefaultKafkaProducerFactory<K,V>
- Type Parameters:
K
- the key type.V
- the value type.
- All Implemented Interfaces:
EventListener
,Aware
,BeanNameAware
,DisposableBean
,ApplicationContextAware
,ApplicationListener<ContextStoppedEvent>
,Lifecycle
,Phased
,SmartLifecycle
,ProducerFactory<K,
V>
ProducerFactory
implementation for a singleton
shared Producer
instance.
This implementation will return the same Producer
instance (if transactions are
not enabled) for the provided Map
configs
and optional Serializer
implementations on each createProducer()
invocation.
If you are using Serializer
s that have no-arg constructors and require no setup, then simplest to
specify Serializer
classes against ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
and
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
keys in the configs
passed to the
DefaultKafkaProducerFactory
constructor.
If that is not possible, but you are sure that at least one of the following is true:
- only one
Producer
will use theSerializer
s - you are using
Serializer
s that may be shared betweenProducer
instances (and specifically that their close() method is a no-op) - you are certain that there is no risk of any single
Producer
being closed while otherProducer
instances with the sameSerializer
s are in use
Serializer
instances for one or both of the key and value serializers.
If none of the above is true then you may provide a Supplier
function for one or both Serializer
s
which will be used to obtain Serializer
(s) each time a Producer
is created by the factory.
The Producer
is wrapped and the underlying KafkaProducer
instance is
not actually closed when Producer.close()
is invoked. The KafkaProducer
is physically closed when DisposableBean.destroy()
is invoked or when the
application context publishes a ContextStoppedEvent
. You can also invoke
reset()
.
Setting setTransactionIdPrefix(String)
enables transactions; in which case, a
cache of producers is maintained; closing a producer returns it to the cache. The
producers are closed and the cache is cleared when the factory is destroyed, the
application context stopped, or the reset()
method is called.
- Author:
- Gary Russell, Murali Reddy, Nakul Mishra, Artem Bilan, Chris Gilbert, Thomas Strauß, Adrian Gygax, Soby Chacko
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static class
A wrapper class for the delegate.Nested classes/interfaces inherited from interface org.springframework.kafka.core.ProducerFactory
ProducerFactory.Listener<K,
V> -
Field Summary
Fields inherited from interface org.springframework.kafka.core.ProducerFactory
DEFAULT_PHYSICAL_CLOSE_TIMEOUT, FACTORY_DOES_NOT_SUPPORT_METHOD
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
ConstructorsConstructorDescriptionDefaultKafkaProducerFactory
(Map<String, Object> configs) Construct a factory with the provided configuration.DefaultKafkaProducerFactory
(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Construct a factory with the provided configuration andSerializer
Suppliers.DefaultKafkaProducerFactory
(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier, boolean configureSerializers) Construct a factory with the provided configuration andSerializer
Suppliers.DefaultKafkaProducerFactory
(Map<String, Object> configs, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Construct a factory with the provided configuration andSerializer
s.DefaultKafkaProducerFactory
(Map<String, Object> configs, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, boolean configureSerializers) Construct a factory with the provided configuration andSerializer
s. -
Method Summary
Modifier and TypeMethodDescriptionvoid
addListener
(int index, ProducerFactory.Listener<K, V> listener) Add a listener at a specific index.void
addListener
(ProducerFactory.Listener<K, V> listener) Add a listener.void
addPostProcessor
(ProducerPostProcessor<K, V> postProcessor) Add a post processor.void
When usingsetProducerPerThread(boolean)
(true), call this method to close and release this thread's producer.copyWithConfigurationOverride
(Map<String, Object> overrideProperties) Copy properties of the instance and the given properties to create a new producer factory.Subclasses must return a raw producer which will be wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer
.Create a non-transactional producer.Create a producer which will be transactional if the factory is so configured.createProducer
(String txIdPrefixArg) Create a producer with an overridden transaction id prefix.createRawProducer
(Map<String, Object> rawConfigs) Subclasses must return a producer from thegetCache()
or a new raw producer wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer
.createTransactionalProducer
(String txIdPrefix) void
destroy()
protected BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,
V>> getCache()
protected BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,
V>> Return an unmodifiable reference to the configuration map for this factory.org.apache.kafka.common.serialization.Serializer<K>
Return the configured key serializer (if provided as an object instead of a class name in the properties).Return a supplier for a key serializer.Get the current list of listeners.int
getPhase()
Get the physical close timeout.Get the current list of post processors.Return the configuration of a producer.Return the transaction id prefix.getTxProducerConfigs
(String transactionId) Return the configuration of a transactional producer.org.apache.kafka.common.serialization.Serializer<V>
Return the configured value serializer (if provided as an object instead of a class name in the properties).Return a supplier for a value serializer.boolean
If true (default), programmatically provided serializers (via constructor or setters) will be configured using the producer configuration.boolean
Return true when there is a producer per thread.boolean
void
void
removeConfig
(String configKey) Remove the specified key from the configuration map.boolean
removeListener
(ProducerFactory.Listener<K, V> listener) Remove a listener.boolean
removePostProcessor
(ProducerPostProcessor<K, V> postProcessor) Remove a post processor.protected final boolean
removeProducer
(DefaultKafkaProducerFactory.CloseSafeProducer<K, V> producerToRemove, Duration timeout) Remove the single shared producer and a thread-bound instance if present.void
reset()
Close theProducer
(s) and clear the cache of transactionalProducer
(s).void
setApplicationContext
(ApplicationContext applicationContext) void
setBeanName
(String name) void
setConfigureSerializers
(boolean configureSerializers) Set to false (default true) to prevent programmatically provided serializers (via constructor or setters) from being configured using the producer configuration, e.g.void
setKeySerializer
(org.apache.kafka.common.serialization.Serializer<K> keySerializer) Set a key serializer.void
setKeySerializerSupplier
(Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier) Set a supplier to supply instances of the key serializer.void
Set the maximum age for a producer; useful when using transactions and the broker might expire atransactional.id
due to inactivity.void
setPhysicalCloseTimeout
(int physicalCloseTimeout) The time to wait when physically closing the producer via the factory rather than closing the producer itself (whenreset()
,#closeProducerFor(String)
, orcloseThreadBoundProducer()
are invoked).void
setProducerPerThread
(boolean producerPerThread) Set to true to create a producer per thread instead of singleton that is shared by all clients.final void
setTransactionIdPrefix
(String transactionIdPrefix) Set a prefix for theProducerConfig.TRANSACTIONAL_ID_CONFIG
config.void
setTransactionIdSuffixStrategy
(TransactionIdSuffixStrategy transactionIdSuffixStrategy) Set the transaction suffix strategy.void
setValueSerializer
(org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Set a value serializer.void
setValueSerializerSupplier
(Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Set a supplier to supply instances of the value serializer.void
start()
void
stop()
boolean
Return true if the factory supports transactions.void
updateConfigs
(Map<String, Object> updates) Update the producer configuration map; useful for situations such as credential rotation.Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
Methods inherited from interface org.springframework.context.SmartLifecycle
isAutoStartup, stop
-
Constructor Details
-
DefaultKafkaProducerFactory
Construct a factory with the provided configuration.- Parameters:
configs
- the configuration.
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Construct a factory with the provided configuration andSerializer
s. Also configures atransactionIdPrefix
as a value from theProducerConfig.TRANSACTIONAL_ID_CONFIG
if provided. This config is going to be overridden with a suffix for targetProducer
instance. The serializers'configure()
methods will be called with the configuration map.- Parameters:
configs
- the configuration.keySerializer
- the keySerializer
.valueSerializer
- the valueSerializer
.
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer, boolean configureSerializers) Construct a factory with the provided configuration andSerializer
s. Also configures atransactionIdPrefix
as a value from theProducerConfig.TRANSACTIONAL_ID_CONFIG
if provided. This config is going to be overridden with a suffix for targetProducer
instance. The serializers'configure()
methods will be called with the configuration map unlessconfigureSerializers
is false..- Parameters:
configs
- the configuration.keySerializer
- the keySerializer
.valueSerializer
- the valueSerializer
.configureSerializers
- set to false if serializers are already fully configured.- Since:
- 2.8.7
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Construct a factory with the provided configuration andSerializer
Suppliers. Also configures atransactionIdPrefix
as a value from theProducerConfig.TRANSACTIONAL_ID_CONFIG
if provided. This config is going to be overridden with a suffix for targetProducer
instance. When the suppliers are invoked to get an instance, the serializers'configure()
methods will be called with the configuration map.- Parameters:
configs
- the configuration.keySerializerSupplier
- the keySerializer
supplier function.valueSerializerSupplier
- the valueSerializer
supplier function.- Since:
- 2.3
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier, boolean configureSerializers) Construct a factory with the provided configuration andSerializer
Suppliers. Also configures atransactionIdPrefix
as a value from theProducerConfig.TRANSACTIONAL_ID_CONFIG
if provided. This config is going to be overridden with a suffix for targetProducer
instance. When the suppliers are invoked to get an instance, the serializers'configure()
methods will be called with the configuration map unlessconfigureSerializers
is false.- Parameters:
configs
- the configuration.keySerializerSupplier
- the keySerializer
supplier function.valueSerializerSupplier
- the valueSerializer
supplier function.configureSerializers
- set to false if serializers are already fully configured.- Since:
- 2.8.7
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContext
in interfaceApplicationContextAware
- Throws:
BeansException
-
setBeanName
- Specified by:
setBeanName
in interfaceBeanNameAware
-
setKeySerializer
public void setKeySerializer(@Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer) Set a key serializer. The serializer will be configured using the producer configuration, unlessconfigureSerializers
is false.- Parameters:
keySerializer
- the key serializer.- See Also:
-
setValueSerializer
public void setValueSerializer(@Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Set a value serializer. The serializer will be configured using the producer configuration, unlessconfigureSerializers
is false.- Parameters:
valueSerializer
- the value serializer.- See Also:
-
setKeySerializerSupplier
public void setKeySerializerSupplier(Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier) Set a supplier to supply instances of the key serializer. The serializer will be configured using the producer configuration, unlessconfigureSerializers
is false.- Parameters:
keySerializerSupplier
- the supplier.- Since:
- 2.8
- See Also:
-
setValueSerializerSupplier
public void setValueSerializerSupplier(Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Set a supplier to supply instances of the value serializer.- Parameters:
valueSerializerSupplier
- the supplier. The serializer will be configured using the producer configuration, unlessconfigureSerializers
is false.- Since:
- 2.8
- See Also:
-
setTransactionIdSuffixStrategy
Set the transaction suffix strategy.- Parameters:
transactionIdSuffixStrategy
- the strategy.- Since:
- 3.2
-
isConfigureSerializers
public boolean isConfigureSerializers()If true (default), programmatically provided serializers (via constructor or setters) will be configured using the producer configuration. Set to false if the serializers are already fully configured.- Returns:
- true to configure.
- Since:
- 2.8.7
- See Also:
-
setConfigureSerializers
public void setConfigureSerializers(boolean configureSerializers) Set to false (default true) to prevent programmatically provided serializers (via constructor or setters) from being configured using the producer configuration, e.g. if the serializers are already fully configured.- Parameters:
configureSerializers
- false to not configure.- Since:
- 2.8.7
- See Also:
-
setPhysicalCloseTimeout
public void setPhysicalCloseTimeout(int physicalCloseTimeout) The time to wait when physically closing the producer via the factory rather than closing the producer itself (whenreset()
,#closeProducerFor(String)
, orcloseThreadBoundProducer()
are invoked). Specified in seconds; defaultProducerFactory.DEFAULT_PHYSICAL_CLOSE_TIMEOUT
.- Parameters:
physicalCloseTimeout
- the timeout in seconds.- Since:
- 1.0.7
-
getPhysicalCloseTimeout
Get the physical close timeout.- Specified by:
getPhysicalCloseTimeout
in interfaceProducerFactory<K,
V> - Returns:
- the timeout.
- Since:
- 2.5
-
setTransactionIdPrefix
Set a prefix for theProducerConfig.TRANSACTIONAL_ID_CONFIG
config. By default, aProducerConfig.TRANSACTIONAL_ID_CONFIG
value from configs is used as a prefix in the target producer configs.- Parameters:
transactionIdPrefix
- the prefix.- Since:
- 1.3
-
getTransactionIdPrefix
Description copied from interface:ProducerFactory
Return the transaction id prefix.- Specified by:
getTransactionIdPrefix
in interfaceProducerFactory<K,
V> - Returns:
- the prefix or null if not configured.
-
setProducerPerThread
public void setProducerPerThread(boolean producerPerThread) Set to true to create a producer per thread instead of singleton that is shared by all clients. Clients must callcloseThreadBoundProducer()
to physically close the producer when it is no longer needed. These producers will not be closed bydestroy()
orreset()
.- Parameters:
producerPerThread
- true for a producer per thread.- Since:
- 2.3
- See Also:
-
isProducerPerThread
public boolean isProducerPerThread()Description copied from interface:ProducerFactory
Return true when there is a producer per thread.- Specified by:
isProducerPerThread
in interfaceProducerFactory<K,
V> - Returns:
- the producer per thread.
-
getKeySerializer
Description copied from interface:ProducerFactory
Return the configured key serializer (if provided as an object instead of a class name in the properties).- Specified by:
getKeySerializer
in interfaceProducerFactory<K,
V> - Returns:
- the serializer.
-
getValueSerializer
Description copied from interface:ProducerFactory
Return the configured value serializer (if provided as an object instead of a class name in the properties).- Specified by:
getValueSerializer
in interfaceProducerFactory<K,
V> - Returns:
- the serializer.
-
getKeySerializerSupplier
Description copied from interface:ProducerFactory
Return a supplier for a key serializer. Useful for cloning to make a similar factory.- Specified by:
getKeySerializerSupplier
in interfaceProducerFactory<K,
V> - Returns:
- the supplier.
-
getValueSerializerSupplier
Description copied from interface:ProducerFactory
Return a supplier for a value serializer. Useful for cloning to make a similar factory.- Specified by:
getValueSerializerSupplier
in interfaceProducerFactory<K,
V> - Returns:
- the supplier.
-
getConfigurationProperties
Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.- Specified by:
getConfigurationProperties
in interfaceProducerFactory<K,
V> - Returns:
- the configs.
- Since:
- 1.3
-
getListeners
Get the current list of listeners.- Specified by:
getListeners
in interfaceProducerFactory<K,
V> - Returns:
- the listeners.
- Since:
- 2.5
-
getPostProcessors
Description copied from interface:ProducerFactory
Get the current list of post processors.- Specified by:
getPostProcessors
in interfaceProducerFactory<K,
V> - Returns:
- the post processors.
-
setMaxAge
Set the maximum age for a producer; useful when using transactions and the broker might expire atransactional.id
due to inactivity.- Parameters:
maxAge
- the maxAge to set- Since:
- 2.5.8
-
start
public void start() -
stop
public void stop() -
isRunning
public boolean isRunning() -
getPhase
public int getPhase()- Specified by:
getPhase
in interfacePhased
- Specified by:
getPhase
in interfaceSmartLifecycle
-
copyWithConfigurationOverride
Copy properties of the instance and the given properties to create a new producer factory.If the
DefaultKafkaProducerFactory
makes a copy of itself, the transaction id prefix is recovered from the properties. If you want to change the ID config, add a newProducerConfig.TRANSACTIONAL_ID_CONFIG
key to the override config.- Specified by:
copyWithConfigurationOverride
in interfaceProducerFactory<K,
V> - Parameters:
overrideProperties
- the properties to be applied to the new factory- Returns:
DefaultKafkaProducerFactory
with properties applied- See Also:
-
addListener
Add a listener.- Specified by:
addListener
in interfaceProducerFactory<K,
V> - Parameters:
listener
- the listener.- Since:
- 2.5
-
addListener
Add a listener at a specific index.- Specified by:
addListener
in interfaceProducerFactory<K,
V> - Parameters:
index
- the index (list position).listener
- the listener.- Since:
- 2.5
-
removeListener
Remove a listener.- Specified by:
removeListener
in interfaceProducerFactory<K,
V> - Parameters:
listener
- the listener.- Returns:
- true if removed.
- Since:
- 2.5
-
addPostProcessor
Description copied from interface:ProducerFactory
Add a post processor.- Specified by:
addPostProcessor
in interfaceProducerFactory<K,
V> - Parameters:
postProcessor
- the post processor.
-
removePostProcessor
Description copied from interface:ProducerFactory
Remove a post processor.- Specified by:
removePostProcessor
in interfaceProducerFactory<K,
V> - Parameters:
postProcessor
- the post processor.- Returns:
- true if removed.
-
updateConfigs
Description copied from interface:ProducerFactory
Update the producer configuration map; useful for situations such as credential rotation.- Specified by:
updateConfigs
in interfaceProducerFactory<K,
V> - Parameters:
updates
- the configuration properties to update.
-
removeConfig
Description copied from interface:ProducerFactory
Remove the specified key from the configuration map.- Specified by:
removeConfig
in interfaceProducerFactory<K,
V> - Parameters:
configKey
- the key to remove.
-
transactionCapable
public boolean transactionCapable()Description copied from interface:ProducerFactory
Return true if the factory supports transactions.- Specified by:
transactionCapable
in interfaceProducerFactory<K,
V> - Returns:
- true if transactional.
-
destroy
public void destroy()- Specified by:
destroy
in interfaceDisposableBean
-
onApplicationEvent
- Specified by:
onApplicationEvent
in interfaceApplicationListener<K>
-
reset
public void reset()Close theProducer
(s) and clear the cache of transactionalProducer
(s).- Specified by:
reset
in interfaceProducerFactory<K,
V> - Since:
- 2.2
-
createProducer
Description copied from interface:ProducerFactory
Create a producer which will be transactional if the factory is so configured.- Specified by:
createProducer
in interfaceProducerFactory<K,
V> - Returns:
- the producer.
- See Also:
-
createProducer
public org.apache.kafka.clients.producer.Producer<K,V> createProducer(@Nullable String txIdPrefixArg) Description copied from interface:ProducerFactory
Create a producer with an overridden transaction id prefix.- Specified by:
createProducer
in interfaceProducerFactory<K,
V> - Parameters:
txIdPrefixArg
- the transaction id prefix.- Returns:
- the producer.
-
createNonTransactionalProducer
Description copied from interface:ProducerFactory
Create a non-transactional producer.- Specified by:
createNonTransactionalProducer
in interfaceProducerFactory<K,
V> - Returns:
- the producer.
- See Also:
-
createKafkaProducer
Subclasses must return a raw producer which will be wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer
.- Returns:
- the producer.
-
removeProducer
protected final boolean removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K, V> producerToRemove, Duration timeout) Remove the single shared producer and a thread-bound instance if present.- Parameters:
producerToRemove
- the producer.timeout
- the close timeout.- Returns:
- true if the producer was closed.
- Since:
- 2.2.13
-
createTransactionalProducer
Subclasses must return a producer from thegetCache()
or a new raw producer wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer
.- Returns:
- the producer - cannot be null.
- Since:
- 1.3
-
createTransactionalProducer
-
createRawProducer
-
getCache
-
getCache
@Nullable protected BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache(@Nullable String txIdPrefix) -
closeThreadBoundProducer
public void closeThreadBoundProducer()When usingsetProducerPerThread(boolean)
(true), call this method to close and release this thread's producer. Thread bound producers are not closed bydestroy()
orreset()
methods.- Specified by:
closeThreadBoundProducer
in interfaceProducerFactory<K,
V> - Since:
- 2.3
- See Also:
-
getProducerConfigs
Return the configuration of a producer.- Returns:
- the configuration of a producer.
- Since:
- 2.8.3
- See Also:
-
getTxProducerConfigs
Return the configuration of a transactional producer.- Parameters:
transactionId
- the transactionId.- Returns:
- the configuration of a transactional producer.
- Since:
- 2.8.3
- See Also:
-
doCreateTxProducer(String, String, BiPredicate)
-