Class DefaultKafkaProducerFactory<K,V>

java.lang.Object
org.springframework.kafka.core.KafkaResourceFactory
org.springframework.kafka.core.DefaultKafkaProducerFactory<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, Lifecycle, Phased, SmartLifecycle, ProducerFactory<K,V>

The ProducerFactory implementation for a singleton shared Producer instance.

This implementation will return the same Producer instance (if transactions are not enabled) for the provided Map configs and optional Serializer implementations on each createProducer() invocation.

If you are using Serializers that have no-arg constructors and require no setup, then it is simplest to specify the Serializer classes against the ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG keys in the configs passed to the DefaultKafkaProducerFactory constructor.

If that is not possible, but you are sure that at least one of the following is true:

  • only one Producer will use the Serializers
  • you are using Serializers that may be shared between Producer instances (and specifically that their close() method is a no-op)
  • you are certain that there is no risk of any single Producer being closed while other Producer instances with the same Serializers are in use
then you can pass in Serializer instances for one or both of the key and value serializers.

If none of the above is true then you may provide a Supplier function for one or both Serializers which will be used to obtain Serializer(s) each time a Producer is created by the factory.
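
The difference between passing a shared Serializer instance and passing a Supplier can be illustrated without Kafka on the classpath. The following is a minimal stand-alone model, not Spring Kafka code: SerializerSharingSketch and TrackingSerializer are hypothetical names, and a "producer" is reduced here to something that closes its serializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

class SerializerSharingSketch {

    // Hypothetical stand-in for a Serializer whose close() is NOT a no-op.
    static class TrackingSerializer implements AutoCloseable {
        private boolean closed;

        byte[] serialize(String data) {
            if (closed) {
                throw new IllegalStateException("serializer already closed");
            }
            return data.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Two "producers" hold the same instance: closing one breaks the other.
    static boolean sharedInstanceBreaksSecondProducer() {
        TrackingSerializer shared = new TrackingSerializer();
        TrackingSerializer usedByFirst = shared;
        TrackingSerializer usedBySecond = shared;
        usedByFirst.close();
        try {
            usedBySecond.serialize("payload");
            return false;
        } catch (IllegalStateException ex) {
            return true;
        }
    }

    // With a Supplier, each "producer" obtains its own instance.
    static boolean supplierIsolatesProducers() {
        Supplier<TrackingSerializer> supplier = TrackingSerializer::new;
        TrackingSerializer usedByFirst = supplier.get();
        TrackingSerializer usedBySecond = supplier.get();
        usedByFirst.close();
        return usedBySecond.serialize("payload").length == 7;
    }

    public static void main(String[] args) {
        System.out.println(sharedInstanceBreaksSecondProducer());
        System.out.println(supplierIsolatesProducers());
    }
}
```

In this model the shared instance fails only because its close() has an effect, which is the situation the three conditions above are meant to rule out.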

The Producer is wrapped and the underlying KafkaProducer instance is not actually closed when Producer.close() is invoked. The KafkaProducer is physically closed when DisposableBean.destroy() is invoked or when the application context publishes a ContextStoppedEvent. You can also invoke reset().
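
The close-ignoring wrapper described above can be pictured with a small stand-alone model. This is a sketch under assumptions, not the factory's actual implementation: CloseIgnoringFactorySketch, PhysicalProducer and SharedProducerHandle are hypothetical names, and only the close and reset behavior is modeled.

```java
import java.util.Arrays;

class CloseIgnoringFactorySketch {

    // Hypothetical stand-in for the underlying KafkaProducer.
    static class PhysicalProducer {
        private boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Wrapper handed to callers; close() deliberately leaves the delegate open.
    static class SharedProducerHandle implements AutoCloseable {
        final PhysicalProducer delegate = new PhysicalProducer();

        @Override
        public void close() {
            // intentionally a no-op in this model
        }
    }

    private SharedProducerHandle handle;

    // Returns the same handle on every call until reset() is invoked.
    synchronized SharedProducerHandle createProducer() {
        if (handle == null) {
            handle = new SharedProducerHandle();
        }
        return handle;
    }

    // Physically closes the delegate; the next createProducer() builds a new one.
    synchronized void reset() {
        if (handle != null) {
            handle.delegate.close();
            handle = null;
        }
    }

    static boolean[] demo() {
        CloseIgnoringFactorySketch factory = new CloseIgnoringFactorySketch();
        SharedProducerHandle a = factory.createProducer();
        SharedProducerHandle b = factory.createProducer();
        boolean sameInstance = a == b;
        a.close();
        boolean openAfterClose = !a.delegate.isClosed();
        factory.reset();
        boolean closedAfterReset = a.delegate.isClosed();
        boolean freshAfterReset = factory.createProducer() != a;
        return new boolean[] {sameInstance, openAfterClose, closedAfterReset, freshAfterReset};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```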

Calling setTransactionIdPrefix(String) enables transactions, in which case a cache of producers is maintained and closing a producer returns it to the cache. The producers are closed and the cache is cleared when the factory is destroyed, the application context is stopped, or the reset() method is called.
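
The cache behavior for transactional producers can be sketched in the same stand-alone style. Everything here is an assumption made for illustration: TransactionalCacheSketch and PooledProducer are hypothetical names, the cache is a plain Deque, and the model is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class TransactionalCacheSketch {

    // Hypothetical handle; close() returns it to the cache instead of closing it.
    class PooledProducer implements AutoCloseable {
        private boolean physicallyClosed;

        @Override
        public void close() {
            cache.push(this);
        }

        boolean isPhysicallyClosed() {
            return physicallyClosed;
        }
    }

    private final Deque<PooledProducer> cache = new ArrayDeque<>();

    // Reuses a cached producer when one is available, otherwise creates one.
    PooledProducer createProducer() {
        PooledProducer cached = cache.poll();
        return cached != null ? cached : new PooledProducer();
    }

    // Physically closes every cached producer and clears the cache.
    void reset() {
        for (PooledProducer producer : cache) {
            producer.physicallyClosed = true;
        }
        cache.clear();
    }

    int cacheSize() {
        return cache.size();
    }

    static boolean[] demo() {
        TransactionalCacheSketch factory = new TransactionalCacheSketch();
        PooledProducer first = factory.createProducer();
        first.close(); // returned to the cache
        PooledProducer second = factory.createProducer();
        boolean reused = first == second;
        PooledProducer third = factory.createProducer(); // cache is empty, so a new one
        boolean distinct = third != second;
        second.close();
        third.close();
        boolean cachedTwo = factory.cacheSize() == 2;
        factory.reset();
        boolean cleared = factory.cacheSize() == 0
                && second.isPhysicallyClosed() && third.isPhysicallyClosed();
        return new boolean[] {reused, distinct, cachedTwo, cleared};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```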

Author:
Gary Russell, Murali Reddy, Nakul Mishra, Artem Bilan, Chris Gilbert, Thomas Strauß
  • Constructor Details

    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(Map<String,Object> configs)
      Construct a factory with the provided configuration.
      Parameters:
      configs - the configuration.
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(Map<String,Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
      Construct a factory with the provided configuration and Serializers. Also configures a transactionIdPrefix from the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG, if provided. For each target Producer instance, this config is overridden with the prefix plus a suffix. The serializers' configure() methods will be called with the configuration map.
      Parameters:
      configs - the configuration.
      keySerializer - the key Serializer.
      valueSerializer - the value Serializer.
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(Map<String,Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer, boolean configureSerializers)
      Construct a factory with the provided configuration and Serializers. Also configures a transactionIdPrefix from the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG, if provided. For each target Producer instance, this config is overridden with the prefix plus a suffix. The serializers' configure() methods will be called with the configuration map unless configureSerializers is false.
      Parameters:
      configs - the configuration.
      keySerializer - the key Serializer.
      valueSerializer - the value Serializer.
      configureSerializers - set to false if serializers are already fully configured.
      Since:
      2.8.7
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(Map<String,Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
      Construct a factory with the provided configuration and Serializer Suppliers. Also configures a transactionIdPrefix from the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG, if provided. For each target Producer instance, this config is overridden with the prefix plus a suffix. When the suppliers are invoked to get an instance, the serializers' configure() methods will be called with the configuration map.
      Parameters:
      configs - the configuration.
      keySerializerSupplier - the key Serializer supplier function.
      valueSerializerSupplier - the value Serializer supplier function.
      Since:
      2.3
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(Map<String,Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier, boolean configureSerializers)
      Construct a factory with the provided configuration and Serializer Suppliers. Also configures a transactionIdPrefix from the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG, if provided. For each target Producer instance, this config is overridden with the prefix plus a suffix. When the suppliers are invoked to get an instance, the serializers' configure() methods will be called with the configuration map unless configureSerializers is false.
      Parameters:
      configs - the configuration.
      keySerializerSupplier - the key Serializer supplier function.
      valueSerializerSupplier - the value Serializer supplier function.
      configureSerializers - set to false if serializers are already fully configured.
      Since:
      2.8.7
  • Method Details