Class DefaultKafkaProducerFactory<K,​V>

  • Type Parameters:
    K - the key type.
    V - the value type.
    All Implemented Interfaces:
    java.util.EventListener, org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, ProducerFactory<K,​V>

    public class DefaultKafkaProducerFactory<K,​V>
    extends KafkaResourceFactory
    implements ProducerFactory<K,​V>, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean
    The ProducerFactory implementation for a singleton shared Producer instance.

    This implementation will return the same Producer instance (if transactions are not enabled) for the provided Map configs and optional Serializer implementations on each createProducer() invocation.

    If you are using Serializers that have no-arg constructors and require no setup, then it is simplest to specify the Serializer classes against the ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG keys in the configs passed to the DefaultKafkaProducerFactory constructor.
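    A minimal sketch of such a configs map, written against the JDK only so that it compiles on its own. The literal keys are the string values of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG; the broker address and the ProducerConfigsSketch class name are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerConfigsSketch {

    // Builds the map that would be passed to the single-argument constructor.
    static Map<String, Object> producerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        // Assumed broker address; replace with your own.
        configs.put("bootstrap.servers", "localhost:9092");
        // Serializer classes are named here, so Kafka creates them through their no-arg constructors.
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = producerConfigs();
        // With spring-kafka on the classpath, the map would be used like this:
        // ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        System.out.println(configs.keySet());
    }
}
```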

    If that is not possible, but you are sure that at least one of the following is true:

    • only one Producer will use the Serializers
    • you are using Serializers that may be shared between Producer instances (and specifically that their close() method is a no-op)
    • you are certain that there is no risk of any single Producer being closed while other Producer instances with the same Serializers are in use
    then you can pass in Serializer instances for one or both of the key and value serializers.

    If none of the above is true then you may provide a Supplier function for one or both Serializers which will be used to obtain Serializer(s) each time a Producer is created by the factory.
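    The difference between passing a Serializer instance and passing a Supplier can be sketched with stand-in types. StubSerializer and StubProducer are hypothetical and are not part of Kafka or this library; they only model a producer that closes its serializer when it is itself closed.

```java
import java.util.function.Supplier;

final class SerializerSharingSketch {

    // Hypothetical stand-in for a Serializer whose close() is NOT a no-op.
    static final class StubSerializer {
        private boolean closed;
        void close() { this.closed = true; }
        boolean isClosed() { return this.closed; }
    }

    // Hypothetical stand-in for a producer that closes its serializer when closed.
    static final class StubProducer {
        final StubSerializer serializer;
        StubProducer(StubSerializer serializer) { this.serializer = serializer; }
        void close() { this.serializer.close(); }
    }

    // Shared instance: closing one producer closes the serializer the other still uses.
    static boolean sharedInstanceBreaksSecondProducer() {
        StubSerializer shared = new StubSerializer();
        StubProducer first = new StubProducer(shared);
        StubProducer second = new StubProducer(shared);
        first.close();
        return second.serializer.isClosed();
    }

    // Supplier: each producer gets its own serializer, so closing one leaves the other intact.
    static boolean supplierBreaksSecondProducer() {
        Supplier<StubSerializer> supplier = StubSerializer::new;
        StubProducer first = new StubProducer(supplier.get());
        StubProducer second = new StubProducer(supplier.get());
        first.close();
        return second.serializer.isClosed();
    }

    public static void main(String[] args) {
        System.out.println(sharedInstanceBreaksSecondProducer()); // true
        System.out.println(supplierBreaksSecondProducer());       // false
    }
}
```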

    The Producer is wrapped and the underlying KafkaProducer instance is not actually closed when Producer.close() is invoked. The KafkaProducer is physically closed when DisposableBean.destroy() is invoked or when the application context publishes a ContextStoppedEvent. You can also invoke reset() to close it explicitly.
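    The wrapping behavior described above can be modelled in a few lines. This is a simplified sketch with hypothetical stand-in types (RawProducer, WrappedProducer, Factory), not the actual CloseSafeProducer implementation.

```java
final class CloseSafeSketch {

    // Hypothetical stand-in for the underlying KafkaProducer.
    static final class RawProducer {
        private boolean closed;
        void close() { this.closed = true; }
        boolean isClosed() { return this.closed; }
    }

    // Wrapper handed to callers; its close() deliberately leaves the delegate open.
    static final class WrappedProducer {
        private final RawProducer delegate;
        WrappedProducer(RawProducer delegate) { this.delegate = delegate; }
        RawProducer delegate() { return this.delegate; }
        void close() {
            // Logical close only: nothing is released here.
        }
    }

    // Simplified factory holding one shared producer.
    static final class Factory {
        private WrappedProducer shared;

        synchronized WrappedProducer createProducer() {
            if (this.shared == null) {
                this.shared = new WrappedProducer(new RawProducer());
            }
            return this.shared;
        }

        // Models reset() and destroy(): physically close and forget the shared producer.
        synchronized void reset() {
            if (this.shared != null) {
                this.shared.delegate().close();
                this.shared = null;
            }
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        WrappedProducer producer = factory.createProducer();
        producer.close();
        System.out.println(producer.delegate().isClosed()); // false
        factory.reset();
        System.out.println(producer.delegate().isClosed()); // true
    }
}
```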

    Calling setTransactionIdPrefix(String) enables transactions, in which case a cache of producers is maintained and closing a producer returns it to the cache. The producers are closed and the cache is cleared when the factory is destroyed, the application context is stopped, or the reset() method is called.
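    A rough model of the transactional cache, assuming a simple queue-backed design. CachedProducer and Factory are hypothetical stand-ins; the real factory's cache structure may differ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class TransactionalCacheSketch {

    // Hypothetical stand-in for a cached transactional producer.
    static final class CachedProducer {
        private final Factory owner;
        private volatile boolean physicallyClosed;
        CachedProducer(Factory owner) { this.owner = owner; }
        // Logical close: hand the producer back to the factory's cache.
        void close() { this.owner.release(this); }
        void physicalClose() { this.physicallyClosed = true; }
        boolean isPhysicallyClosed() { return this.physicallyClosed; }
    }

    static final class Factory {
        private final BlockingQueue<CachedProducer> cache = new LinkedBlockingQueue<>();
        private final AtomicInteger created = new AtomicInteger();

        // Reuse a cached producer when one is available; otherwise create a new one.
        CachedProducer createProducer() {
            CachedProducer cached = this.cache.poll();
            if (cached != null) {
                return cached;
            }
            this.created.incrementAndGet();
            return new CachedProducer(this);
        }

        void release(CachedProducer producer) { this.cache.offer(producer); }

        // Models reset(): physically close every cached producer and empty the cache.
        void reset() {
            CachedProducer producer;
            while ((producer = this.cache.poll()) != null) {
                producer.physicalClose();
            }
        }

        int createdCount() { return this.created.get(); }
        int cacheSize() { return this.cache.size(); }
    }
}
```

    In this model, a create/close/create sequence yields the same producer twice and only one producer is ever constructed.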

    Author:
    Gary Russell, Murali Reddy, Nakul Mishra, Artem Bilan, Chris Gilbert
    • Constructor Detail

      • DefaultKafkaProducerFactory

        public DefaultKafkaProducerFactory​(java.util.Map<java.lang.String,​java.lang.Object> configs)
        Construct a factory with the provided configuration.
        Parameters:
        configs - the configuration.
      • DefaultKafkaProducerFactory

        public DefaultKafkaProducerFactory​(java.util.Map<java.lang.String,​java.lang.Object> configs,
                                           @Nullable
                                           org.apache.kafka.common.serialization.Serializer<K> keySerializer,
                                           @Nullable
                                           org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
        Construct a factory with the provided configuration and Serializers. Also configures a transactionIdPrefix from the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG, if provided. For each target Producer instance, this config is overridden with the prefix followed by a suffix.
        Parameters:
        configs - the configuration.
        keySerializer - the key Serializer.
        valueSerializer - the value Serializer.
      • DefaultKafkaProducerFactory

        public DefaultKafkaProducerFactory​(java.util.Map<java.lang.String,​java.lang.Object> configs,
                                           @Nullable
                                           java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier,
                                           @Nullable
                                           java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
        Construct a factory with the provided configuration and Serializer Suppliers. Also configures a transactionIdPrefix from the value of ProducerConfig.TRANSACTIONAL_ID_CONFIG, if provided. For each target Producer instance, this config is overridden with the prefix followed by a suffix.
        Parameters:
        configs - the configuration.
        keySerializerSupplier - the key Serializer supplier function.
        valueSerializerSupplier - the value Serializer supplier function.
        Since:
        2.3
    • Method Detail

      • setApplicationContext

        public void setApplicationContext​(org.springframework.context.ApplicationContext applicationContext)
                                   throws org.springframework.beans.BeansException
        Specified by:
        setApplicationContext in interface org.springframework.context.ApplicationContextAware
        Throws:
        org.springframework.beans.BeansException
      • setBeanName

        public void setBeanName​(java.lang.String name)
        Specified by:
        setBeanName in interface org.springframework.beans.factory.BeanNameAware
      • setKeySerializer

        public void setKeySerializer​(@Nullable
                                     org.apache.kafka.common.serialization.Serializer<K> keySerializer)
        Set a key serializer.
        Parameters:
        keySerializer - the key serializer.
      • setValueSerializer

        public void setValueSerializer​(@Nullable
                                       org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
        Set a value serializer.
        Parameters:
        valueSerializer - the value serializer.
      • getPhysicalCloseTimeout

        public java.time.Duration getPhysicalCloseTimeout()
        Get the physical close timeout.
        Specified by:
        getPhysicalCloseTimeout in interface ProducerFactory<K,​V>
        Returns:
        the timeout.
        Since:
        2.5
      • setTransactionIdPrefix

        public final void setTransactionIdPrefix​(java.lang.String transactionIdPrefix)
        Set a prefix for the ProducerConfig.TRANSACTIONAL_ID_CONFIG config. By default, the ProducerConfig.TRANSACTIONAL_ID_CONFIG value from configs is used as the prefix in the target producer configs.
        Parameters:
        transactionIdPrefix - the prefix.
        Since:
        1.3
      • getTransactionIdPrefix

        @Nullable
        public java.lang.String getTransactionIdPrefix()
        Description copied from interface: ProducerFactory
        Return the transaction id prefix.
        Specified by:
        getTransactionIdPrefix in interface ProducerFactory<K,​V>
        Returns:
        the prefix or null if not configured.
      • setProducerPerThread

        public void setProducerPerThread​(boolean producerPerThread)
        Set to true to create a producer per thread instead of a singleton that is shared by all clients. Clients must call closeThreadBoundProducer() to physically close the producer when it is no longer needed. These producers will not be closed by destroy() or reset().
        Parameters:
        producerPerThread - true for a producer per thread.
        Since:
        2.3
        See Also:
        closeThreadBoundProducer()
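        The per-thread mode can be pictured as a ThreadLocal lookup. The sketch below uses hypothetical stand-in types and is only an assumption about the general shape, not the factory's actual code; it shows why each thread must call closeThreadBoundProducer() itself.

```java
final class ThreadBoundSketch {

    // Hypothetical stand-in for a physical producer.
    static final class RawProducer {
        private volatile boolean closed;
        void close() { this.closed = true; }
        boolean isClosed() { return this.closed; }
    }

    static final class Factory {
        private final ThreadLocal<RawProducer> threadBound = new ThreadLocal<>();

        // Each calling thread sees its own producer, created on first use.
        RawProducer createProducer() {
            RawProducer producer = this.threadBound.get();
            if (producer == null) {
                producer = new RawProducer();
                this.threadBound.set(producer);
            }
            return producer;
        }

        // Only affects the producer bound to the calling thread.
        void closeThreadBoundProducer() {
            RawProducer producer = this.threadBound.get();
            if (producer != null) {
                this.threadBound.remove();
                producer.close();
            }
        }
    }

    // Returns the producer created (and closed) on a separate worker thread.
    static RawProducer producerFromWorker(Factory factory) {
        RawProducer[] holder = new RawProducer[1];
        Thread worker = new Thread(() -> {
            holder[0] = factory.createProducer();
            factory.closeThreadBoundProducer(); // the worker cleans up after itself
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder[0];
    }
}
```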
      • isProducerPerThread

        public boolean isProducerPerThread()
        Description copied from interface: ProducerFactory
        Return true when there is a producer per thread.
        Specified by:
        isProducerPerThread in interface ProducerFactory<K,​V>
        Returns:
        true if there is a producer per thread.
      • setProducerPerConsumerPartition

        public void setProducerPerConsumerPartition​(boolean producerPerConsumerPartition)
        Set to false to revert to the previous behavior of a simple incrementing transactional.id suffix for each producer instead of maintaining a producer for each group/topic/partition.
        Parameters:
        producerPerConsumerPartition - false to revert.
        Since:
        1.3.7
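        The two suffix strategies can be compared with a small sketch. The exact layout of the per-partition suffix (group, topic and partition joined by dots) is an assumption here, and TransactionalIdSketch is a hypothetical helper, not part of the library.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class TransactionalIdSketch {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    TransactionalIdSketch(String prefix) {
        this.prefix = prefix;
    }

    // producerPerConsumerPartition = false: a simple incrementing suffix.
    String nextIncrementingId() {
        return this.prefix + this.counter.getAndIncrement();
    }

    // producerPerConsumerPartition = true: one id per group/topic/partition (assumed layout).
    String idForPartition(String group, String topic, int partition) {
        return this.prefix + group + "." + topic + "." + partition;
    }

    public static void main(String[] args) {
        TransactionalIdSketch ids = new TransactionalIdSketch("tx-");
        System.out.println(ids.nextIncrementingId());                     // tx-0
        System.out.println(ids.nextIncrementingId());                     // tx-1
        System.out.println(ids.idForPartition("billing", "orders", 2));   // tx-billing.orders.2
    }
}
```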
      • isProducerPerConsumerPartition

        public boolean isProducerPerConsumerPartition()
        Return the producerPerConsumerPartition.
        Specified by:
        isProducerPerConsumerPartition in interface ProducerFactory<K,​V>
        Returns:
        the producerPerConsumerPartition.
        Since:
        1.3.8
      • getKeySerializerSupplier

        public java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> getKeySerializerSupplier()
        Description copied from interface: ProducerFactory
        Return a supplier for a key serializer. Useful for cloning to make a similar factory.
        Specified by:
        getKeySerializerSupplier in interface ProducerFactory<K,​V>
        Returns:
        the supplier.
      • getValueSerializerSupplier

        public java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> getValueSerializerSupplier()
        Description copied from interface: ProducerFactory
        Return a supplier for a value serializer. Useful for cloning to make a similar factory.
        Specified by:
        getValueSerializerSupplier in interface ProducerFactory<K,​V>
        Returns:
        the supplier.
      • getConfigurationProperties

        public java.util.Map<java.lang.String,​java.lang.Object> getConfigurationProperties()
        Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.
        Specified by:
        getConfigurationProperties in interface ProducerFactory<K,​V>
        Returns:
        the configs.
        Since:
        1.3
      • setMaxAge

        public void setMaxAge​(java.time.Duration maxAge)
        Set the maximum age for a producer; useful when using transactions and the broker might expire a transactional.id due to inactivity.
        Parameters:
        maxAge - the maximum age to set.
        Since:
        2.5.8
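        A minimal sketch of an age check of this kind, assuming the factory compares a producer's creation time with the current time. MaxAgeSketch and its expired method are hypothetical names. In practice the value would be chosen below the broker's transactional.id.expiration.ms setting.

```java
import java.time.Duration;
import java.time.Instant;

final class MaxAgeSketch {

    // True when the producer has lived longer than maxAge and should be replaced.
    // A zero or negative maxAge is treated here as "never expire" (an assumption of this sketch).
    static boolean expired(Instant createdAt, Instant now, Duration maxAge) {
        if (maxAge.isZero() || maxAge.isNegative()) {
            return false;
        }
        return Duration.between(createdAt, now).compareTo(maxAge) > 0;
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2024-01-01T00:00:00Z");
        Duration maxAge = Duration.ofDays(3);
        System.out.println(expired(created, created.plus(Duration.ofDays(2)), maxAge)); // false
        System.out.println(expired(created, created.plus(Duration.ofDays(4)), maxAge)); // true
    }
}
```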
      • addListener

        public void addListener​(int index,
                                ProducerFactory.Listener<K,​V> listener)
        Add a listener at a specific index.
        Specified by:
        addListener in interface ProducerFactory<K,​V>
        Parameters:
        index - the index (list position).
        listener - the listener.
        Since:
        2.5
      • updateConfigs

        public void updateConfigs​(java.util.Map<java.lang.String,​java.lang.Object> updates)
        Description copied from interface: ProducerFactory
        Update the producer configuration map; useful for situations such as credential rotation.
        Specified by:
        updateConfigs in interface ProducerFactory<K,​V>
        Parameters:
        updates - the configuration properties to update.
      • removeConfig

        public void removeConfig​(java.lang.String configKey)
        Description copied from interface: ProducerFactory
        Remove the specified key from the configuration map.
        Specified by:
        removeConfig in interface ProducerFactory<K,​V>
        Parameters:
        configKey - the key to remove.
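        How updateConfigs, removeConfig and the read-only view from getConfigurationProperties relate can be modelled with plain maps. ConfigUpdateSketch is a hypothetical stand-in that mirrors only the method names; it says nothing about when existing producers pick up the changes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ConfigUpdateSketch {

    private final Map<String, Object> configs;

    ConfigUpdateSketch(Map<String, Object> initial) {
        // Copy so that later changes to the caller's map do not leak in.
        this.configs = new ConcurrentHashMap<>(initial);
    }

    // Merge new or changed properties, for example rotated credentials.
    void updateConfigs(Map<String, Object> updates) {
        this.configs.putAll(updates);
    }

    void removeConfig(String configKey) {
        this.configs.remove(configKey);
    }

    // Read-only view; callers cannot modify the factory's configuration through it.
    Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }

    public static void main(String[] args) {
        Map<String, Object> initial = new HashMap<>();
        initial.put("ssl.keystore.password", "old-secret");
        ConfigUpdateSketch factory = new ConfigUpdateSketch(initial);
        factory.updateConfigs(Collections.singletonMap("ssl.keystore.password", "new-secret"));
        System.out.println(factory.getConfigurationProperties().get("ssl.keystore.password")); // new-secret
    }
}
```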
      • transactionCapable

        public boolean transactionCapable()
        Description copied from interface: ProducerFactory
        Return true if the factory supports transactions.
        Specified by:
        transactionCapable in interface ProducerFactory<K,​V>
        Returns:
        true if transactional.
      • destroy

        public void destroy()
        Specified by:
        destroy in interface org.springframework.beans.factory.DisposableBean
      • onApplicationEvent

        public void onApplicationEvent​(org.springframework.context.event.ContextStoppedEvent event)
        Specified by:
        onApplicationEvent in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>
      • reset

        public void reset()
        Close the Producer(s) and clear the cache of transactional Producer(s).
        Specified by:
        reset in interface ProducerFactory<K,​V>
        Since:
        2.2
      • createProducer

        public org.apache.kafka.clients.producer.Producer<K,​V> createProducer​(@Nullable
                                                                                    java.lang.String txIdPrefixArg)
        Description copied from interface: ProducerFactory
        Create a producer with an overridden transaction id prefix.
        Specified by:
        createProducer in interface ProducerFactory<K,​V>
        Parameters:
        txIdPrefixArg - the transaction id prefix.
        Returns:
        the producer.
      • createKafkaProducer

        protected org.apache.kafka.clients.producer.Producer<K,​V> createKafkaProducer()
        Subclasses must return a raw producer which will be wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.
        Returns:
        the producer.
      • createTransactionalProducerForPartition

        protected org.apache.kafka.clients.producer.Producer<K,​V> createTransactionalProducerForPartition()
      • createTransactionalProducerForPartition

        protected org.apache.kafka.clients.producer.Producer<K,​V> createTransactionalProducerForPartition​(java.lang.String txIdPrefix)
      • removeProducer

        protected final boolean removeProducer​(DefaultKafkaProducerFactory.CloseSafeProducer<K,​V> producerToRemove,
                                               java.time.Duration timeout)
        Remove the single shared producer and a thread-bound instance if present.
        Parameters:
        producerToRemove - the producer.
        timeout - the close timeout.
        Returns:
        always true.
        Since:
        2.2.13
      • createTransactionalProducer

        protected org.apache.kafka.clients.producer.Producer<K,​V> createTransactionalProducer()
        Subclasses must return a producer from getCache() or a new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.
        Returns:
        the producer - cannot be null.
        Since:
        1.3
      • createTransactionalProducer

        protected org.apache.kafka.clients.producer.Producer<K,​V> createTransactionalProducer​(java.lang.String txIdPrefix)
      • createRawProducer

        protected org.apache.kafka.clients.producer.Producer<K,​V> createRawProducer​(java.util.Map<java.lang.String,​java.lang.Object> rawConfigs)
      • closeProducerFor

        public void closeProducerFor​(java.lang.String suffix)
        Description copied from interface: ProducerFactory
        Remove the specified producer from the cache and close it.
        Specified by:
        closeProducerFor in interface ProducerFactory<K,​V>
        Parameters:
        suffix - the producer's transaction id suffix.