Class DefaultKafkaProducerFactory<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
EventListener,Aware,BeanNameAware,DisposableBean,ApplicationContextAware,ApplicationListener<ContextStoppedEvent>,Lifecycle,Phased,SmartLifecycle,ProducerFactory<K,V>
ProducerFactory implementation for a singleton shared Producer instance.
This implementation will return the same Producer instance (if transactions are
not enabled) for the provided Map configs and optional Serializer
implementations on each createProducer() invocation.
If you are using Serializers that have no-arg constructors and require no setup, then simplest to
specify Serializer classes against ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG keys in the configs passed to the
DefaultKafkaProducerFactory constructor.
If that is not possible, but you are sure that at least one of the following is true:
- only one
Producerwill use theSerializers - you are using
Serializers that may be shared betweenProducerinstances (and specifically that their close() method is a no-op) - you are certain that there is no risk of any single
Producerbeing closed while otherProducerinstances with the sameSerializers are in use
Serializer instances for one or both of the key and value serializers.
If none of the above is true then you may provide a Supplier function for one or both Serializers
which will be used to obtain Serializer(s) each time a Producer is created by the factory.
The Producer is wrapped and the underlying KafkaProducer instance is
not actually closed when Producer.close() is invoked. The KafkaProducer
is physically closed when DisposableBean.destroy() is invoked or when the
application context publishes a ContextStoppedEvent. You can also invoke
reset().
Setting setTransactionIdPrefix(String) enables transactions; in which case, a
cache of producers is maintained; closing a producer returns it to the cache. The
producers are closed and the cache is cleared when the factory is destroyed, the
application context stopped, or the reset() method is called.
- Author:
- Gary Russell, Murali Reddy, Nakul Mishra, Artem Bilan, Chris Gilbert, Thomas Strauß, Adrian Gygax, Soby Chacko
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static classA wrapper class for the delegate.Nested classes/interfaces inherited from interface org.springframework.kafka.core.ProducerFactory
ProducerFactory.Listener<K,V> -
Field Summary
Fields inherited from interface org.springframework.kafka.core.ProducerFactory
DEFAULT_PHYSICAL_CLOSE_TIMEOUT, FACTORY_DOES_NOT_SUPPORT_METHODFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
ConstructorsConstructorDescriptionDefaultKafkaProducerFactory(Map<String, Object> configs) Construct a factory with the provided configuration.DefaultKafkaProducerFactory(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Construct a factory with the provided configuration andSerializerSuppliers.DefaultKafkaProducerFactory(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier, boolean configureSerializers) Construct a factory with the provided configuration andSerializerSuppliers.DefaultKafkaProducerFactory(Map<String, Object> configs, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Construct a factory with the provided configuration andSerializers.DefaultKafkaProducerFactory(Map<String, Object> configs, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer, boolean configureSerializers) Construct a factory with the provided configuration andSerializers. -
Method Summary
Modifier and TypeMethodDescriptionvoidaddListener(int index, ProducerFactory.Listener<K, V> listener) Add a listener at a specific index.voidaddListener(ProducerFactory.Listener<K, V> listener) Add a listener.voidaddPostProcessor(ProducerPostProcessor<K, V> postProcessor) Add a post processor.voidWhen usingsetProducerPerThread(boolean)(true), call this method to close and release this thread's producer.copyWithConfigurationOverride(Map<String, Object> overrideProperties) Copy properties of the instance and the given properties to create a new producer factory.Subclasses must return a raw producer which will be wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer.Create a non-transactional producer.Create a producer which will be transactional if the factory is so configured.createProducer(String txIdPrefixArg) Create a producer with an overridden transaction id prefix.createRawProducer(Map<String, Object> rawConfigs) Subclasses must return a producer from thegetCache()or a new raw producer wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer.createTransactionalProducer(String txIdPrefix) voiddestroy()protected BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K, V>> getCache()protected BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K, V>> Return an unmodifiable reference to the configuration map for this factory.org.apache.kafka.common.serialization.Serializer<K> Return the configured key serializer (if provided as an object instead of a class name in the properties).Return a supplier for a key serializer.Get the current list of listeners.intgetPhase()Get the physical close timeout.Get the current list of post processors.Return the configuration of a producer.Return the transaction id prefix.getTxProducerConfigs(String transactionId) Return the configuration of a transactional producer.org.apache.kafka.common.serialization.Serializer<V> Return the configured value serializer (if provided as an object instead of a class name in the properties).Return a supplier for a value serializer.booleanIf true (default), programmatically provided serializers (via constructor or setters) will be configured using the producer configuration.booleanReturn true when there is a producer per thread.booleanvoidvoidremoveConfig(String configKey) Remove the specified key from the configuration map.booleanremoveListener(ProducerFactory.Listener<K, V> listener) Remove a listener.booleanremovePostProcessor(ProducerPostProcessor<K, V> postProcessor) Remove a post processor.protected final booleanremoveProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K, V> producerToRemove, Duration timeout) Remove the single shared producer and a thread-bound instance if present.voidreset()Close theProducer(s) and clear the cache of transactionalProducer(s).voidsetApplicationContext(ApplicationContext applicationContext) voidsetBeanName(String name) voidsetConfigureSerializers(boolean configureSerializers) Set to false (default true) to prevent programmatically provided serializers (via constructor or setters) from being configured using the producer configuration, e.g.voidsetKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer) Set a key serializer.voidsetKeySerializerSupplier(Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier) Set a supplier to supply instances of the key serializer.voidSet the maximum age for a producer; useful when using transactions and the broker might expire atransactional.iddue to inactivity.voidsetPhysicalCloseTimeout(int physicalCloseTimeout) The time to wait when physically closing the producer via the factory rather than closing the producer itself (whenreset(),#closeProducerFor(String), orcloseThreadBoundProducer()are invoked).voidsetProducerPerThread(boolean producerPerThread) Set to true to create a producer per thread instead of singleton that is shared by all clients.final voidsetTransactionIdPrefix(String transactionIdPrefix) Set a prefix for theProducerConfig.TRANSACTIONAL_ID_CONFIGconfig.voidsetTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy) Set the transaction suffix strategy.voidsetValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Set a value serializer.voidsetValueSerializerSupplier(Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Set a supplier to supply instances of the value serializer.voidstart()voidstop()booleanReturn true if the factory supports transactions.voidupdateConfigs(Map<String, Object> updates) Update the producer configuration map; useful for situations such as credential rotation.Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplierMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecutionMethods inherited from interface org.springframework.context.SmartLifecycle
isAutoStartup, stop
-
Constructor Details
-
DefaultKafkaProducerFactory
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Construct a factory with the provided configuration andSerializers. Also configures atransactionIdPrefixas a value from theProducerConfig.TRANSACTIONAL_ID_CONFIGif provided. This config is going to be overridden with a suffix for targetProducerinstance. The serializers'configure()methods will be called with the configuration map.- Parameters:
configs- the configuration.keySerializer- the keySerializer.valueSerializer- the valueSerializer.
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer, boolean configureSerializers) Construct a factory with the provided configuration andSerializers. Also configures atransactionIdPrefixas a value from theProducerConfig.TRANSACTIONAL_ID_CONFIGif provided. This config is going to be overridden with a suffix for targetProducerinstance. The serializers'configure()methods will be called with the configuration map unlessconfigureSerializersis false..- Parameters:
configs- the configuration.keySerializer- the keySerializer.valueSerializer- the valueSerializer.configureSerializers- set to false if serializers are already fully configured.- Since:
- 2.8.7
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Construct a factory with the provided configuration andSerializerSuppliers. Also configures atransactionIdPrefixas a value from theProducerConfig.TRANSACTIONAL_ID_CONFIGif provided. This config is going to be overridden with a suffix for targetProducerinstance. When the suppliers are invoked to get an instance, the serializers'configure()methods will be called with the configuration map.- Parameters:
configs- the configuration.keySerializerSupplier- the keySerializersupplier function.valueSerializerSupplier- the valueSerializersupplier function.- Since:
- 2.3
-
DefaultKafkaProducerFactory
public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier, boolean configureSerializers) Construct a factory with the provided configuration andSerializerSuppliers. Also configures atransactionIdPrefixas a value from theProducerConfig.TRANSACTIONAL_ID_CONFIGif provided. This config is going to be overridden with a suffix for targetProducerinstance. When the suppliers are invoked to get an instance, the serializers'configure()methods will be called with the configuration map unlessconfigureSerializersis false.- Parameters:
configs- the configuration.keySerializerSupplier- the keySerializersupplier function.valueSerializerSupplier- the valueSerializersupplier function.configureSerializers- set to false if serializers are already fully configured.- Since:
- 2.8.7
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContextin interfaceApplicationContextAware- Throws:
BeansException
-
setBeanName
- Specified by:
setBeanNamein interfaceBeanNameAware
-
setKeySerializer
public void setKeySerializer(@Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer) Set a key serializer. The serializer will be configured using the producer configuration, unlessconfigureSerializersis false.- Parameters:
keySerializer- the key serializer.- See Also:
-
setValueSerializer
public void setValueSerializer(@Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer) Set a value serializer. The serializer will be configured using the producer configuration, unlessconfigureSerializersis false.- Parameters:
valueSerializer- the value serializer.- See Also:
-
setKeySerializerSupplier
public void setKeySerializerSupplier(Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier) Set a supplier to supply instances of the key serializer. The serializer will be configured using the producer configuration, unlessconfigureSerializersis false.- Parameters:
keySerializerSupplier- the supplier.- Since:
- 2.8
- See Also:
-
setValueSerializerSupplier
public void setValueSerializerSupplier(Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier) Set a supplier to supply instances of the value serializer.- Parameters:
valueSerializerSupplier- the supplier. The serializer will be configured using the producer configuration, unlessconfigureSerializersis false.- Since:
- 2.8
- See Also:
-
setTransactionIdSuffixStrategy
Set the transaction suffix strategy.- Parameters:
transactionIdSuffixStrategy- the strategy.- Since:
- 3.2
-
isConfigureSerializers
public boolean isConfigureSerializers()If true (default), programmatically provided serializers (via constructor or setters) will be configured using the producer configuration. Set to false if the serializers are already fully configured.- Returns:
- true to configure.
- Since:
- 2.8.7
- See Also:
-
setConfigureSerializers
public void setConfigureSerializers(boolean configureSerializers) Set to false (default true) to prevent programmatically provided serializers (via constructor or setters) from being configured using the producer configuration, e.g. if the serializers are already fully configured.- Parameters:
configureSerializers- false to not configure.- Since:
- 2.8.7
- See Also:
-
setPhysicalCloseTimeout
public void setPhysicalCloseTimeout(int physicalCloseTimeout) The time to wait when physically closing the producer via the factory rather than closing the producer itself (whenreset(),#closeProducerFor(String), orcloseThreadBoundProducer()are invoked). Specified in seconds; defaultProducerFactory.DEFAULT_PHYSICAL_CLOSE_TIMEOUT.- Parameters:
physicalCloseTimeout- the timeout in seconds.- Since:
- 1.0.7
-
getPhysicalCloseTimeout
Get the physical close timeout.- Specified by:
getPhysicalCloseTimeoutin interfaceProducerFactory<K,V> - Returns:
- the timeout.
- Since:
- 2.5
-
setTransactionIdPrefix
Set a prefix for theProducerConfig.TRANSACTIONAL_ID_CONFIGconfig. By default, aProducerConfig.TRANSACTIONAL_ID_CONFIGvalue from configs is used as a prefix in the target producer configs.- Parameters:
transactionIdPrefix- the prefix.- Since:
- 1.3
-
getTransactionIdPrefix
Description copied from interface:ProducerFactoryReturn the transaction id prefix.- Specified by:
getTransactionIdPrefixin interfaceProducerFactory<K,V> - Returns:
- the prefix or null if not configured.
-
setProducerPerThread
public void setProducerPerThread(boolean producerPerThread) Set to true to create a producer per thread instead of singleton that is shared by all clients. Clients must callcloseThreadBoundProducer()to physically close the producer when it is no longer needed. These producers will not be closed bydestroy()orreset().- Parameters:
producerPerThread- true for a producer per thread.- Since:
- 2.3
- See Also:
-
isProducerPerThread
public boolean isProducerPerThread()Description copied from interface:ProducerFactoryReturn true when there is a producer per thread.- Specified by:
isProducerPerThreadin interfaceProducerFactory<K,V> - Returns:
- the producer per thread.
-
getKeySerializer
Description copied from interface:ProducerFactoryReturn the configured key serializer (if provided as an object instead of a class name in the properties).- Specified by:
getKeySerializerin interfaceProducerFactory<K,V> - Returns:
- the serializer.
-
getValueSerializer
Description copied from interface:ProducerFactoryReturn the configured value serializer (if provided as an object instead of a class name in the properties).- Specified by:
getValueSerializerin interfaceProducerFactory<K,V> - Returns:
- the serializer.
-
getKeySerializerSupplier
Description copied from interface:ProducerFactoryReturn a supplier for a key serializer. Useful for cloning to make a similar factory.- Specified by:
getKeySerializerSupplierin interfaceProducerFactory<K,V> - Returns:
- the supplier.
-
getValueSerializerSupplier
Description copied from interface:ProducerFactoryReturn a supplier for a value serializer. Useful for cloning to make a similar factory.- Specified by:
getValueSerializerSupplierin interfaceProducerFactory<K,V> - Returns:
- the supplier.
-
getConfigurationProperties
Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.- Specified by:
getConfigurationPropertiesin interfaceProducerFactory<K,V> - Returns:
- the configs.
- Since:
- 1.3
-
getListeners
Get the current list of listeners.- Specified by:
getListenersin interfaceProducerFactory<K,V> - Returns:
- the listeners.
- Since:
- 2.5
-
getPostProcessors
Description copied from interface:ProducerFactoryGet the current list of post processors.- Specified by:
getPostProcessorsin interfaceProducerFactory<K,V> - Returns:
- the post processors.
-
setMaxAge
Set the maximum age for a producer; useful when using transactions and the broker might expire atransactional.iddue to inactivity.- Parameters:
maxAge- the maxAge to set- Since:
- 2.5.8
-
start
-
stop
-
isRunning
-
getPhase
public int getPhase()- Specified by:
getPhasein interfacePhased- Specified by:
getPhasein interfaceSmartLifecycle
-
copyWithConfigurationOverride
Copy properties of the instance and the given properties to create a new producer factory.If the
DefaultKafkaProducerFactorymakes a copy of itself, the transaction id prefix is recovered from the properties. If you want to change the ID config, add a newProducerConfig.TRANSACTIONAL_ID_CONFIGkey to the override config.- Specified by:
copyWithConfigurationOverridein interfaceProducerFactory<K,V> - Parameters:
overrideProperties- the properties to be applied to the new factory- Returns:
DefaultKafkaProducerFactorywith properties applied- See Also:
-
addListener
Add a listener.- Specified by:
addListenerin interfaceProducerFactory<K,V> - Parameters:
listener- the listener.- Since:
- 2.5
-
addListener
Add a listener at a specific index.- Specified by:
addListenerin interfaceProducerFactory<K,V> - Parameters:
index- the index (list position).listener- the listener.- Since:
- 2.5
-
removeListener
Remove a listener.- Specified by:
removeListenerin interfaceProducerFactory<K,V> - Parameters:
listener- the listener.- Returns:
- true if removed.
- Since:
- 2.5
-
addPostProcessor
Description copied from interface:ProducerFactoryAdd a post processor.- Specified by:
addPostProcessorin interfaceProducerFactory<K,V> - Parameters:
postProcessor- the post processor.
-
removePostProcessor
Description copied from interface:ProducerFactoryRemove a post processor.- Specified by:
removePostProcessorin interfaceProducerFactory<K,V> - Parameters:
postProcessor- the post processor.- Returns:
- true if removed.
-
updateConfigs
Description copied from interface:ProducerFactoryUpdate the producer configuration map; useful for situations such as credential rotation.- Specified by:
updateConfigsin interfaceProducerFactory<K,V> - Parameters:
updates- the configuration properties to update.
-
removeConfig
Description copied from interface:ProducerFactoryRemove the specified key from the configuration map.- Specified by:
removeConfigin interfaceProducerFactory<K,V> - Parameters:
configKey- the key to remove.
-
transactionCapable
public boolean transactionCapable()Description copied from interface:ProducerFactoryReturn true if the factory supports transactions.- Specified by:
transactionCapablein interfaceProducerFactory<K,V> - Returns:
- true if transactional.
-
destroy
public void destroy()- Specified by:
destroyin interfaceDisposableBean
-
onApplicationEvent
- Specified by:
onApplicationEventin interfaceApplicationListener<K>
-
reset
public void reset()Close theProducer(s) and clear the cache of transactionalProducer(s).- Specified by:
resetin interfaceProducerFactory<K,V> - Since:
- 2.2
-
createProducer
Description copied from interface:ProducerFactoryCreate a producer which will be transactional if the factory is so configured.- Specified by:
createProducerin interfaceProducerFactory<K,V> - Returns:
- the producer.
- See Also:
-
createProducer
public org.apache.kafka.clients.producer.Producer<K,V> createProducer(@Nullable String txIdPrefixArg) Description copied from interface:ProducerFactoryCreate a producer with an overridden transaction id prefix.- Specified by:
createProducerin interfaceProducerFactory<K,V> - Parameters:
txIdPrefixArg- the transaction id prefix.- Returns:
- the producer.
-
createNonTransactionalProducer
Description copied from interface:ProducerFactoryCreate a non-transactional producer.- Specified by:
createNonTransactionalProducerin interfaceProducerFactory<K,V> - Returns:
- the producer.
- See Also:
-
createKafkaProducer
Subclasses must return a raw producer which will be wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer.- Returns:
- the producer.
-
removeProducer
protected final boolean removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K, V> producerToRemove, Duration timeout) Remove the single shared producer and a thread-bound instance if present.- Parameters:
producerToRemove- the producer.timeout- the close timeout.- Returns:
- true if the producer was closed.
- Since:
- 2.2.13
-
createTransactionalProducer
Subclasses must return a producer from thegetCache()or a new raw producer wrapped in aDefaultKafkaProducerFactory.CloseSafeProducer.- Returns:
- the producer - cannot be null.
- Since:
- 1.3
-
createTransactionalProducer
-
createRawProducer
-
getCache
-
getCache
@Nullable protected BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache(@Nullable String txIdPrefix) -
closeThreadBoundProducer
public void closeThreadBoundProducer()When usingsetProducerPerThread(boolean)(true), call this method to close and release this thread's producer. Thread bound producers are not closed bydestroy()orreset()methods.- Specified by:
closeThreadBoundProducerin interfaceProducerFactory<K,V> - Since:
- 2.3
- See Also:
-
getProducerConfigs
-
getTxProducerConfigs
-