Class DefaultKafkaConsumerFactory<K,V>
- Type Parameters:
K
- the key type.V
- the value type.
- All Implemented Interfaces:
Aware
,BeanNameAware
,ApplicationContextAware
,ConsumerFactory<K,
V>
ConsumerFactory
implementation to produce new Consumer
instances
for provided Map
configs
and optional Deserializer
s on each ConsumerFactory.createConsumer()
invocation.
If you are using Deserializer
s that have no-arg constructors and require no setup, then simplest to
specify Deserializer
classes against ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
and
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
keys in the configs
passed to the
DefaultKafkaConsumerFactory
constructor.
If that is not possible, but you are using Deserializer
s that may be shared between all Consumer
instances (and specifically that their close() method is a no-op), then you can pass in Deserializer
instances for one or both of the key and value deserializers.
If neither of the above is true then you may provide a Supplier
for one or both Deserializer
s
which will be used to obtain Deserializer
(s) each time a Consumer
is created by the factory.
- Author:
- Gary Russell, Murali Reddy, Artem Bilan, Chris Gilbert, Adrian Gygax, Yaniv Nahoum, Sanghyeok An, Borahm Lee
-
Nested Class Summary
Modifier and TypeClassDescriptionprotected class
Nested classes/interfaces inherited from interface org.springframework.kafka.core.ConsumerFactory
ConsumerFactory.Listener<K,
V> -
Constructor Summary
ConstructorDescriptionDefaultKafkaConsumerFactory
(Map<String, Object> configs) Construct a factory with the provided configuration.DefaultKafkaConsumerFactory
(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier) Construct a factory with the provided configuration and deserializer suppliers.DefaultKafkaConsumerFactory
(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers) Construct a factory with the provided configuration and deserializer suppliers.DefaultKafkaConsumerFactory
(Map<String, Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) Construct a factory with the provided configuration and deserializers.DefaultKafkaConsumerFactory
(Map<String, Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers) Construct a factory with the provided configuration and deserializers. -
Method Summary
Modifier and TypeMethodDescriptionvoid
addListener
(int index, ConsumerFactory.Listener<K, V> listener) Add a listener at a specific index.void
addListener
(ConsumerFactory.Listener<K, V> listener) Add a listener.void
addPostProcessor
(ConsumerPostProcessor<K, V> postProcessor) Add a post processor.createConsumer
(String groupId, String clientIdPrefix, String clientIdSuffixArg, Properties properties) Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides theclient.id
property, if present.createKafkaConsumer
(String groupId, String clientIdPrefixArg, String clientIdSuffixArg, Properties properties) createKafkaConsumer
(Map<String, Object> configProps) createRawConsumer
(Map<String, Object> configProps) Create aConsumer
.Return an unmodifiable reference to the configuration map for this factory.org.apache.kafka.common.serialization.Deserializer<K>
Return the configured key deserializer (if provided as an object instead of a class name in the properties).Get the current list of listeners.Get the current list of post processors.org.apache.kafka.common.serialization.Deserializer<V>
Return the configured value deserializer (if provided as an object instead of a class name in the properties).boolean
Return true if consumers created by this factory use auto commit.void
removeConfig
(String configKey) Remove the specified key from the configuration map.boolean
removeListener
(ConsumerFactory.Listener<K, V> listener) Remove a listener.boolean
removePostProcessor
(ConsumerPostProcessor<K, V> postProcessor) Remove a post processor.void
setApplicationContext
(ApplicationContext applicationContext) void
setBeanName
(String name) void
setConfigureDeserializers
(boolean configureDeserializers) Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g.void
setKeyDeserializer
(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer) Set the key deserializer.void
setKeyDeserializerSupplier
(Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier) Set a supplier to supply instances of the key deserializer.void
setValueDeserializer
(org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) Set the value deserializer.void
setValueDeserializerSupplier
(Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier) Set a supplier to supply instances of the value deserializer.void
updateConfigs
(Map<String, Object> updates) Update the consumer configuration map; useful for situations such as credential rotation.Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.core.ConsumerFactory
createConsumer, createConsumer, createConsumer, createConsumer
-
Constructor Details
-
DefaultKafkaConsumerFactory
Construct a factory with the provided configuration.- Parameters:
configs
- the configuration.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) Construct a factory with the provided configuration and deserializers. The deserializers'configure()
methods will be called with the configuration map.- Parameters:
configs
- the configuration.keyDeserializer
- the keyDeserializer
.valueDeserializer
- the valueDeserializer
.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers) Construct a factory with the provided configuration and deserializers. The deserializers'configure()
methods will be called with the configuration map unlessconfigureDeserializers
is false.- Parameters:
configs
- the configuration.keyDeserializer
- the keyDeserializer
.valueDeserializer
- the valueDeserializer
.configureDeserializers
- false to not configure the deserializers.- Since:
- 2.8.7
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier) Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers'configure()
methods will be called with the configuration map.- Parameters:
configs
- the configuration.keyDeserializerSupplier
- the keyDeserializer
supplier function.valueDeserializerSupplier
- the valueDeserializer
supplier function.- Since:
- 2.3
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers) Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers'configure()
methods will be called with the configuration map unlessconfigureDeserializers
is false.- Parameters:
configs
- the configuration.keyDeserializerSupplier
- the keyDeserializer
supplier function.valueDeserializerSupplier
- the valueDeserializer
supplier function.configureDeserializers
- false to not configure the deserializers.- Since:
- 2.8.7
-
-
Method Details
-
setBeanName
- Specified by:
setBeanName
in interfaceBeanNameAware
-
setKeyDeserializer
public void setKeyDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer) Set the key deserializer. The deserializer will be configured using the consumer configuration, unlessconfigureDeserializers
is false.- Parameters:
keyDeserializer
- the deserializer.
-
setValueDeserializer
public void setValueDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) Set the value deserializer. The deserializer will be configured using the consumer configuration, unlessconfigureDeserializers
is false.- Parameters:
valueDeserializer
- the value deserializer.
-
setKeyDeserializerSupplier
public void setKeyDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier) Set a supplier to supply instances of the key deserializer. The deserializer will be configured using the consumer configuration, unlessconfigureDeserializers
is false.- Parameters:
keyDeserializerSupplier
- the supplier.- Since:
- 2.8
-
setValueDeserializerSupplier
public void setValueDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier) Set a supplier to supply instances of the value deserializer. The deserializer will be configured using the consumer configuration, unlessconfigureDeserializers
is false.- Parameters:
valueDeserializerSupplier
- the supplier.- Since:
- 2.8
-
setConfigureDeserializers
public void setConfigureDeserializers(boolean configureDeserializers) Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g. if the deserializers are already fully configured.- Parameters:
configureDeserializers
- false to not configure.- Since:
- 2.8.7
- See Also:
-
getConfigurationProperties
Description copied from interface:ConsumerFactory
Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.- Specified by:
getConfigurationProperties
in interfaceConsumerFactory<K,
V> - Returns:
- the configs.
-
getKeyDeserializer
Description copied from interface:ConsumerFactory
Return the configured key deserializer (if provided as an object instead of a class name in the properties).- Specified by:
getKeyDeserializer
in interfaceConsumerFactory<K,
V> - Returns:
- the deserializer.
-
getValueDeserializer
Description copied from interface:ConsumerFactory
Return the configured value deserializer (if provided as an object instead of a class name in the properties).- Specified by:
getValueDeserializer
in interfaceConsumerFactory<K,
V> - Returns:
- the deserializer.
-
getListeners
Get the current list of listeners.- Specified by:
getListeners
in interfaceConsumerFactory<K,
V> - Returns:
- the listeners.
- Since:
- 2.5
-
getPostProcessors
Description copied from interface:ConsumerFactory
Get the current list of post processors.- Specified by:
getPostProcessors
in interfaceConsumerFactory<K,
V> - Returns:
- the post processor.
-
addListener
Add a listener.- Specified by:
addListener
in interfaceConsumerFactory<K,
V> - Parameters:
listener
- the listener.- Since:
- 2.5
-
addListener
Add a listener at a specific index.- Specified by:
addListener
in interfaceConsumerFactory<K,
V> - Parameters:
index
- the index (list position).listener
- the listener.- Since:
- 2.5
-
addPostProcessor
Description copied from interface:ConsumerFactory
Add a post processor.- Specified by:
addPostProcessor
in interfaceConsumerFactory<K,
V> - Parameters:
postProcessor
- the post processor.
-
removePostProcessor
Description copied from interface:ConsumerFactory
Remove a post processor.- Specified by:
removePostProcessor
in interfaceConsumerFactory<K,
V> - Parameters:
postProcessor
- the post processor.- Returns:
- true if removed.
-
removeListener
Remove a listener.- Specified by:
removeListener
in interfaceConsumerFactory<K,
V> - Parameters:
listener
- the listener.- Returns:
- true if removed.
- Since:
- 2.5
-
updateConfigs
Description copied from interface:ConsumerFactory
Update the consumer configuration map; useful for situations such as credential rotation.- Specified by:
updateConfigs
in interfaceConsumerFactory<K,
V> - Parameters:
updates
- the configuration properties to update.
-
removeConfig
Description copied from interface:ConsumerFactory
Remove the specified key from the configuration map.- Specified by:
removeConfig
in interfaceConsumerFactory<K,
V> - Parameters:
configKey
- the key to remove.
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, @Nullable String clientIdSuffixArg, @Nullable Properties properties) Description copied from interface:ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides theclient.id
property, if present. In addition, consumer properties can be overridden if the factory implementation supports it.- Specified by:
createConsumer
in interfaceConsumerFactory<K,
V> - Parameters:
groupId
- the group id.clientIdPrefix
- the prefix.clientIdSuffixArg
- the suffix.properties
- the properties to override.- Returns:
- the consumer.
-
createKafkaConsumer
-
createKafkaConsumer
-
createRawConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(Map<String, Object> configProps) Create aConsumer
. By default, this method returns an internalDefaultKafkaConsumerFactory<K,
which is aware of provided into thisV>.ExtendedKafkaConsumer listeners
, therefore it is recommended to extend that class iflisteners
are still involved for a customConsumer
.- Parameters:
configProps
- the configuration properties.- Returns:
- the consumer.
- Since:
- 2.5
-
isAutoCommit
public boolean isAutoCommit()Description copied from interface:ConsumerFactory
Return true if consumers created by this factory use auto commit.- Specified by:
isAutoCommit
in interfaceConsumerFactory<K,
V> - Returns:
- true if auto commit.
-
setApplicationContext
- Specified by:
setApplicationContext
in interfaceApplicationContextAware
- Throws:
BeansException
-