Class DefaultShareConsumerFactory<K,V>

java.lang.Object
org.springframework.kafka.core.KafkaResourceFactory
org.springframework.kafka.core.DefaultShareConsumerFactory<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanNameAware, ShareConsumerFactory<K,V>

public class DefaultShareConsumerFactory<K,V> extends KafkaResourceFactory implements ShareConsumerFactory<K,V>, BeanNameAware
The ShareConsumerFactory implementation to produce new ShareConsumer instances for provided Map configs and optional Deserializers on each createShareConsumer(String, String) invocation.

If you are using Deserializers that have no-arg constructors and require no setup, then simplest to specify Deserializer classes in the configs passed to the DefaultShareConsumerFactory constructor.

If that is not possible, but you are using Deserializers that may be shared between all ShareConsumer instances (and specifically that their close() method is a no-op), then you can pass in Deserializer instances for one or both of the key and value deserializers.

If neither of the above is true then you may provide a Supplier for one or both Deserializers which will be used to obtain Deserializer(s) each time a ShareConsumer is created by the factory.

Since:
4.0
Author:
Soby Chacko
  • Constructor Details

    • DefaultShareConsumerFactory

      public DefaultShareConsumerFactory(Map<String,Object> configs)
      Construct a factory with the provided configuration.
      Parameters:
      configs - the configuration.
    • DefaultShareConsumerFactory

      public DefaultShareConsumerFactory(Map<String,Object> configs, @Nullable Supplier<@Nullable org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<@Nullable org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
      Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map.
      Parameters:
      configs - the configuration.
      keyDeserializerSupplier - the key Deserializer supplier function (nullable).
      valueDeserializerSupplier - the value Deserializer supplier function (nullable).
    • DefaultShareConsumerFactory

      public DefaultShareConsumerFactory(Map<String,Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers)
      Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
      Parameters:
      configs - the configuration.
      keyDeserializer - the key Deserializer.
      valueDeserializer - the value Deserializer.
      configureDeserializers - false to not configure the deserializers.
    • DefaultShareConsumerFactory

      public DefaultShareConsumerFactory(Map<String,Object> configs, @Nullable Supplier<@Nullable org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<@Nullable org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers)
      Construct a factory with the provided configuration, deserializer suppliers, and deserializer config flag. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
      Parameters:
      configs - the configuration.
      keyDeserializerSupplier - the key Deserializer supplier function (nullable).
      valueDeserializerSupplier - the value Deserializer supplier function (nullable).
      configureDeserializers - whether to configure deserializers.
  • Method Details

    • createShareConsumer

      public org.apache.kafka.clients.consumer.ShareConsumer<K,V> createShareConsumer(@Nullable String groupId, @Nullable String clientId)
      Create a share consumer with the provided group id and client id.
      Specified by:
      createShareConsumer in interface ShareConsumerFactory<K,V>
      Parameters:
      groupId - the group id (maybe null).
      clientId - the client id.
      Returns:
      the share consumer.
    • createRawConsumer

      protected org.apache.kafka.clients.consumer.ShareConsumer<K,V> createRawConsumer(@Nullable String groupId, @Nullable String clientId)
      Actually create the consumer.
      Parameters:
      groupId - the group id (maybe null).
      clientId - the client id.
      Returns:
      the share consumer.
    • setBeanName

      public void setBeanName(String name)
      Specified by:
      setBeanName in interface BeanNameAware
    • setKeyDeserializer

      public void setKeyDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
      Set the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      keyDeserializer - the deserializer.
    • setValueDeserializer

      public void setValueDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
      Set the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      valueDeserializer - the value deserializer.
    • getKeyDeserializer

      public @Nullable org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer()
      Description copied from interface: ShareConsumerFactory
      Return the configured key deserializer (if provided as an object instead of a class name in the properties).
      Specified by:
      getKeyDeserializer in interface ShareConsumerFactory<K,V>
      Returns:
      the deserializer.
    • getValueDeserializer

      public @Nullable org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer()
      Description copied from interface: ShareConsumerFactory
      Return the configured value deserializer (if provided as an object instead of a class name in the properties).
      Specified by:
      getValueDeserializer in interface ShareConsumerFactory<K,V>
      Returns:
      the deserializer.
    • setKeyDeserializerSupplier

      public void setKeyDeserializerSupplier(@Nullable Supplier<@Nullable org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier)
      Set a supplier to supply instances of the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      keyDeserializerSupplier - the supplier (nullable).
    • setValueDeserializerSupplier

      public void setValueDeserializerSupplier(@Nullable Supplier<@Nullable org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
      Set a supplier to supply instances of the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
      Parameters:
      valueDeserializerSupplier - the supplier (nullable).
    • setConfigureDeserializers

      public void setConfigureDeserializers(boolean configureDeserializers)
      Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g. if the deserializers are already fully configured.
      Parameters:
      configureDeserializers - false to not configure.
      See Also:
    • getListeners

      public List<ShareConsumerFactory.Listener<K,V>> getListeners()
      Get the current list of listeners.
      Specified by:
      getListeners in interface ShareConsumerFactory<K,V>
      Returns:
      the listeners.
    • addListener

      public void addListener(ShareConsumerFactory.Listener<K,V> listener)
      Add a listener.
      Specified by:
      addListener in interface ShareConsumerFactory<K,V>
      Parameters:
      listener - the listener.
    • addListener

      public void addListener(int index, ShareConsumerFactory.Listener<K,V> listener)
      Add a listener at a specific index.

      This method allows insertion of a listener at a particular position in the internal listener list. While this enables ordering of listener callbacks (which can be important for certain monitoring or extension scenarios), there is intentionally no corresponding removeListener(int index) contract. Removing listeners by index is discouraged because the position of a listener can change if others are added or removed, making it easy to accidentally remove the wrong one. Managing listeners by their reference (object) is safer and less error-prone, especially as listeners are usually set up once during initialization.

      Specified by:
      addListener in interface ShareConsumerFactory<K,V>
      Parameters:
      index - the index (list position).
      listener - the listener to add.
    • removeListener

      public boolean removeListener(ShareConsumerFactory.Listener<K,V> listener)
      Remove a listener.
      Specified by:
      removeListener in interface ShareConsumerFactory<K,V>
      Parameters:
      listener - the listener.
      Returns:
      true if removed.
    • getConfigurationProperties

      public Map<String,Object> getConfigurationProperties()
      Description copied from interface: ShareConsumerFactory
      Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.
      Specified by:
      getConfigurationProperties in interface ShareConsumerFactory<K,V>
      Returns:
      the configs.