Class DefaultKafkaConsumerFactory<K,V>

java.lang.Object
org.springframework.kafka.core.KafkaResourceFactory
org.springframework.kafka.core.DefaultKafkaConsumerFactory<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanNameAware, ConsumerFactory<K,V>

public class DefaultKafkaConsumerFactory<K,V> extends KafkaResourceFactory implements ConsumerFactory<K,V>, BeanNameAware
The ConsumerFactory implementation to produce new Consumer instances for provided Map configs and optional Deserializers on each ConsumerFactory.createConsumer() invocation.

If you are using Deserializers that have no-arg constructors and require no setup, then simplest to specify Deserializer classes against ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG keys in the configs passed to the DefaultKafkaConsumerFactory constructor.

If that is not possible, but you are using Deserializers that may be shared between all Consumer instances (and specifically that their close() method is a no-op), then you can pass in Deserializer instances for one or both of the key and value deserializers.

If neither of the above is true then you may provide a Supplier for one or both Deserializers which will be used to obtain Deserializer(s) each time a Consumer is created by the factory.

Author:
Gary Russell, Murali Reddy, Artem Bilan, Chris Gilbert
  • Constructor Details

    • DefaultKafkaConsumerFactory

      public DefaultKafkaConsumerFactory(Map<String,Object> configs)
      Construct a factory with the provided configuration.
      Parameters:
      configs - the configuration.
    • DefaultKafkaConsumerFactory

      public DefaultKafkaConsumerFactory(Map<String,Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
      Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map.
      Parameters:
      configs - the configuration.
      keyDeserializer - the key Deserializer.
      valueDeserializer - the value Deserializer.
    • DefaultKafkaConsumerFactory

      public DefaultKafkaConsumerFactory(Map<String,Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers)
      Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
      Parameters:
      configs - the configuration.
      keyDeserializer - the key Deserializer.
      valueDeserializer - the value Deserializer.
      configureDeserializers - false to not configure the deserializers.
      Since:
      2.8.7
    • DefaultKafkaConsumerFactory

      public DefaultKafkaConsumerFactory(Map<String,Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
      Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map.
      Parameters:
      configs - the configuration.
      keyDeserializerSupplier - the key Deserializer supplier function.
      valueDeserializerSupplier - the value Deserializer supplier function.
      Since:
      2.3
    • DefaultKafkaConsumerFactory

      public DefaultKafkaConsumerFactory(Map<String,Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers)
      Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
      Parameters:
      configs - the configuration.
      keyDeserializerSupplier - the key Deserializer supplier function.
      valueDeserializerSupplier - the value Deserializer supplier function.
      configureDeserializers - false to not configure the deserializers.
      Since:
      2.8.7
  • Method Details

    • setBeanName

      public void setBeanName(String name)
      Specified by:
      setBeanName in interface BeanNameAware
    • setKeyDeserializer

      public void setKeyDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
      Set the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      keyDeserializer - the deserializer.
    • setValueDeserializer

      public void setValueDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
      Set the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      valueDeserializer - the value deserializer.
    • setKeyDeserializerSupplier

      public void setKeyDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier)
      Set a supplier to supply instances of the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      keyDeserializerSupplier - the supplier.
      Since:
      2.8
    • setValueDeserializerSupplier

      public void setValueDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
      Set a supplier to supply instances of the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      valueDeserializerSupplier - the supplier.
      Since:
      2.8
    • setConfigureDeserializers

      public void setConfigureDeserializers(boolean configureDeserializers)
      Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the producer configuration, e.g. if the deserializers are already fully configured.
      Parameters:
      configureDeserializers - false to not configure.
      Since:
      2.8.7
      See Also:
    • getConfigurationProperties

      public Map<String,Object> getConfigurationProperties()
      Description copied from interface: ConsumerFactory
      Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.
      Specified by:
      getConfigurationProperties in interface ConsumerFactory<K,V>
      Returns:
      the configs.
    • getKeyDeserializer

      public org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer()
      Description copied from interface: ConsumerFactory
      Return the configured key deserializer (if provided as an object instead of a class name in the properties).
      Specified by:
      getKeyDeserializer in interface ConsumerFactory<K,V>
      Returns:
      the deserializer.
    • getValueDeserializer

      public org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer()
      Description copied from interface: ConsumerFactory
      Return the configured value deserializer (if provided as an object instead of a class name in the properties).
      Specified by:
      getValueDeserializer in interface ConsumerFactory<K,V>
      Returns:
      the deserializer.
    • getListeners

      public List<ConsumerFactory.Listener<K,V>> getListeners()
      Get the current list of listeners.
      Specified by:
      getListeners in interface ConsumerFactory<K,V>
      Returns:
      the listeners.
      Since:
      2.5
    • getPostProcessors

      public List<ConsumerPostProcessor<K,V>> getPostProcessors()
      Description copied from interface: ConsumerFactory
      Get the current list of post processors.
      Specified by:
      getPostProcessors in interface ConsumerFactory<K,V>
      Returns:
      the post processor.
    • addListener

      public void addListener(ConsumerFactory.Listener<K,V> listener)
      Add a listener.
      Specified by:
      addListener in interface ConsumerFactory<K,V>
      Parameters:
      listener - the listener.
      Since:
      2.5
    • addListener

      public void addListener(int index, ConsumerFactory.Listener<K,V> listener)
      Add a listener at a specific index.
      Specified by:
      addListener in interface ConsumerFactory<K,V>
      Parameters:
      index - the index (list position).
      listener - the listener.
      Since:
      2.5
    • addPostProcessor

      public void addPostProcessor(ConsumerPostProcessor<K,V> postProcessor)
      Description copied from interface: ConsumerFactory
      Add a post processor.
      Specified by:
      addPostProcessor in interface ConsumerFactory<K,V>
      Parameters:
      postProcessor - the post processor.
    • removePostProcessor

      public boolean removePostProcessor(ConsumerPostProcessor<K,V> postProcessor)
      Description copied from interface: ConsumerFactory
      Remove a post processor.
      Specified by:
      removePostProcessor in interface ConsumerFactory<K,V>
      Parameters:
      postProcessor - the post processor.
      Returns:
      true if removed.
    • removeListener

      public boolean removeListener(ConsumerFactory.Listener<K,V> listener)
      Remove a listener.
      Specified by:
      removeListener in interface ConsumerFactory<K,V>
      Parameters:
      listener - the listener.
      Returns:
      true if removed.
      Since:
      2.5
    • updateConfigs

      public void updateConfigs(Map<String,Object> updates)
      Description copied from interface: ConsumerFactory
      Update the consumer configuration map; useful for situations such as credential rotation.
      Specified by:
      updateConfigs in interface ConsumerFactory<K,V>
      Parameters:
      updates - the configuration properties to update.
    • removeConfig

      public void removeConfig(String configKey)
      Description copied from interface: ConsumerFactory
      Remove the specified key from the configuration map.
      Specified by:
      removeConfig in interface ConsumerFactory<K,V>
      Parameters:
      configKey - the key to remove.
    • createConsumer

      public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, @Nullable String clientIdSuffixArg, @Nullable Properties properties)
      Description copied from interface: ConsumerFactory
      Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present. In addition, consumer properties can be overridden if the factory implementation supports it.
      Specified by:
      createConsumer in interface ConsumerFactory<K,V>
      Parameters:
      groupId - the group id.
      clientIdPrefix - the prefix.
      clientIdSuffixArg - the suffix.
      properties - the properties to override.
      Returns:
      the consumer.
    • createKafkaConsumer

      protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefixArg, @Nullable String clientIdSuffixArg, @Nullable Properties properties)
    • createKafkaConsumer

      protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(Map<String,Object> configProps)
    • createRawConsumer

      protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(Map<String,Object> configProps)
      Create a Consumer. By default, this method returns an internal DefaultKafkaConsumerFactory<K,V>.ExtendedKafkaConsumer which is aware of provided into this listeners, therefore it is recommended to extend that class if listeners are still involved for a custom Consumer.
      Parameters:
      configProps - the configuration properties.
      Returns:
      the consumer.
      Since:
      2.5
    • isAutoCommit

      public boolean isAutoCommit()
      Description copied from interface: ConsumerFactory
      Return true if consumers created by this factory use auto commit.
      Specified by:
      isAutoCommit in interface ConsumerFactory<K,V>
      Returns:
      true if auto commit.