Class DefaultKafkaConsumerFactory<K,V>
- java.lang.Object
-
- org.springframework.kafka.core.KafkaResourceFactory
-
- org.springframework.kafka.core.DefaultKafkaConsumerFactory<K,V>
-
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, ConsumerFactory<K,V>
public class DefaultKafkaConsumerFactory<K,V> extends KafkaResourceFactory implements ConsumerFactory<K,V>, org.springframework.beans.factory.BeanNameAware
The ConsumerFactory implementation to produce new Consumer instances for provided Map configs and optional Deserializers on each ConsumerFactory.createConsumer() invocation.

If you are using Deserializers that have no-arg constructors and require no setup, then it is simplest to specify Deserializer classes against the ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG keys in the configs passed to the DefaultKafkaConsumerFactory constructor.

If that is not possible, but you are using Deserializers that may be shared between all Consumer instances (and specifically that their close() method is a no-op), then you can pass in Deserializer instances for one or both of the key and value deserializers.

If neither of the above is true, then you may provide a Supplier for one or both Deserializers, which will be used to obtain Deserializer(s) each time a Consumer is created by the factory.
- Author:
- Gary Russell, Murali Reddy, Artem Bilan, Chris Gilbert
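As a minimal, library-free sketch of the first option described above, the configuration map can name the deserializer classes directly. The string keys are the literal values of the corresponding ConsumerConfig constants; the broker address, group id, and the class name ConsumerConfigSketch are placeholders chosen for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerConfigSketch {

    // Builds the kind of Map<String, Object> that could be passed to the
    // DefaultKafkaConsumerFactory(Map) constructor.
    // Broker address and group id are placeholder values (assumptions).
    static Map<String, Object> baseConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        configs.put("group.id", "example-group");           // ConsumerConfig.GROUP_ID_CONFIG
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = baseConfigs();
        // With spring-kafka on the classpath, the map would then be used as:
        //   new DefaultKafkaConsumerFactory<String, String>(configs);
        System.out.println(configs.get("key.deserializer"));
    }
}
```

Because the deserializers are named in the map, no Deserializer instance or Supplier needs to be passed to the factory in this case.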
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.springframework.kafka.core.ConsumerFactory
ConsumerFactory.Listener<K,V>
-
-
Constructor Summary
Constructors
Constructor / Description
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Construct a factory with the provided configuration and deserializer suppliers.
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Construct a factory with the provided configuration and deserializers.
-
Method Summary
All Methods / Instance Methods / Concrete Methods / Deprecated Methods
Modifier and Type, Method / Description
void addListener(int index, ConsumerFactory.Listener<K,V> listener)
Add a listener at a specific index.
void addListener(ConsumerFactory.Listener<K,V> listener)
Add a listener.
void addPostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Add a post processor.
org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(java.lang.String groupId, java.lang.String clientIdPrefix, java.lang.String clientIdSuffix)
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(java.lang.String groupId, java.lang.String clientIdPrefix, java.lang.String clientIdSuffixArg, java.util.Properties properties)
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.lang.String groupId, java.lang.String clientIdPrefix, java.lang.String clientIdSuffixArg)
Deprecated.
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.lang.String groupId, java.lang.String clientIdPrefix, java.lang.String clientIdSuffixArg, java.util.Properties properties)
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
Create a Consumer.
java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Return an unmodifiable reference to the configuration map for this factory.
org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer()
Return the configured key deserializer (if provided as an object instead of a class name in the properties).
java.util.List<ConsumerFactory.Listener<K,V>> getListeners()
Get the current list of listeners.
java.util.List<ConsumerPostProcessor<K,V>> getPostProcessors()
Get the current list of post processors.
org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer()
Return the configured value deserializer (if provided as an object instead of a class name in the properties).
boolean isAutoCommit()
Return true if consumers created by this factory use auto commit.
boolean removeListener(ConsumerFactory.Listener<K,V> listener)
Remove a listener.
boolean removePostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Remove a post processor.
void setBeanName(java.lang.String name)
void setKeyDeserializer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
Set the key deserializer.
void setValueDeserializer(org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Set the value deserializer.
-
Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.core.ConsumerFactory
createConsumer, createConsumer, createConsumer
-
-
-
-
Constructor Detail
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
- Parameters:
configs - the configuration.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Construct a factory with the provided configuration and deserializers.
- Parameters:
configs - the configuration.
keyDeserializer - the key Deserializer.
valueDeserializer - the value Deserializer.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Construct a factory with the provided configuration and deserializer suppliers.
- Parameters:
configs - the configuration.
keyDeserializerSupplier - the key Deserializer supplier function.
valueDeserializerSupplier - the value Deserializer supplier function.
- Since:
- 2.3
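The difference between passing a shared instance and passing a Supplier can be illustrated without any Kafka dependency. In the sketch below, StatefulDecoder is a hypothetical stand-in for a Deserializer whose close() is not a no-op; it is not part of spring-kafka or kafka-clients.

```java
import java.util.function.Supplier;

final class DeserializerSupplierSketch {

    // Hypothetical stand-in for a deserializer that holds per-consumer state
    // and therefore should not be shared between consumers.
    static final class StatefulDecoder {
        private boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Obtains two instances from the supplier (as if two consumers were created),
    // closes the first, and reports whether the second is still usable.
    static boolean secondUsableAfterFirstClosed(Supplier<StatefulDecoder> supplier) {
        StatefulDecoder first = supplier.get();
        StatefulDecoder second = supplier.get();
        first.close();
        return !second.isClosed();
    }

    public static void main(String[] args) {
        // Fresh instance per call: closing one does not affect the other.
        System.out.println(secondUsableAfterFirstClosed(StatefulDecoder::new));

        // Same instance every time: closing it once closes it for everyone.
        StatefulDecoder shared = new StatefulDecoder();
        System.out.println(secondUsableAfterFirstClosed(() -> shared));
    }
}
```

This is why the supplier-based constructor suits deserializers that carry state or release resources on close(), while the instance-based constructor suits deserializers that are safe to share.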
-
-
Method Detail
-
setBeanName
public void setBeanName(java.lang.String name)
- Specified by:
setBeanName in interface org.springframework.beans.factory.BeanNameAware
-
setKeyDeserializer
public void setKeyDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
Set the key deserializer.
- Parameters:
keyDeserializer - the deserializer.
-
setValueDeserializer
public void setValueDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Set the value deserializer.
- Parameters:
valueDeserializer - the value deserializer.
-
getConfigurationProperties
public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Description copied from interface: ConsumerFactory
Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.
- Specified by:
getConfigurationProperties in interface ConsumerFactory<K,V>
- Returns:
- the configs.
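Since the returned map is unmodifiable, making a similar factory means copying it first. The following is a minimal sketch using only the JDK; Collections.unmodifiableMap stands in for the map returned by getConfigurationProperties(), and the helper name copyWithOverride is hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CloneConfigSketch {

    // Copies a read-only config view into a fresh mutable map and applies one override.
    static Map<String, Object> copyWithOverride(Map<String, Object> original, String key, Object value) {
        Map<String, Object> copy = new HashMap<>(original);
        copy.put(key, value);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("group.id", "group-a");
        // Stand-in for the map returned by getConfigurationProperties().
        Map<String, Object> readOnly = Collections.unmodifiableMap(source);

        Map<String, Object> forSimilarFactory = copyWithOverride(readOnly, "group.id", "group-b");
        System.out.println(forSimilarFactory.get("group.id"));
        System.out.println(readOnly.get("group.id"));
    }
}
```

The copied map could then be handed to a new DefaultKafkaConsumerFactory, leaving the original factory's configuration untouched.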
-
getKeyDeserializer
public org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer()
Description copied from interface: ConsumerFactory
Return the configured key deserializer (if provided as an object instead of a class name in the properties).
- Specified by:
getKeyDeserializer in interface ConsumerFactory<K,V>
- Returns:
- the deserializer.
-
getValueDeserializer
public org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer()
Description copied from interface: ConsumerFactory
Return the configured value deserializer (if provided as an object instead of a class name in the properties).
- Specified by:
getValueDeserializer in interface ConsumerFactory<K,V>
- Returns:
- the deserializer.
-
getListeners
public java.util.List<ConsumerFactory.Listener<K,V>> getListeners()
Get the current list of listeners.
- Specified by:
getListeners in interface ConsumerFactory<K,V>
- Returns:
- the listeners.
- Since:
- 2.5
-
getPostProcessors
public java.util.List<ConsumerPostProcessor<K,V>> getPostProcessors()
Description copied from interface: ConsumerFactory
Get the current list of post processors.
- Specified by:
getPostProcessors in interface ConsumerFactory<K,V>
- Returns:
- the post processors.
-
addListener
public void addListener(ConsumerFactory.Listener<K,V> listener)
Add a listener.
- Specified by:
addListener in interface ConsumerFactory<K,V>
- Parameters:
listener - the listener.
- Since:
- 2.5
-
addListener
public void addListener(int index, ConsumerFactory.Listener<K,V> listener)
Add a listener at a specific index.
- Specified by:
addListener in interface ConsumerFactory<K,V>
- Parameters:
index - the index (list position).
listener - the listener.
- Since:
- 2.5
-
addPostProcessor
public void addPostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Description copied from interface: ConsumerFactory
Add a post processor.
- Specified by:
addPostProcessor in interface ConsumerFactory<K,V>
- Parameters:
postProcessor - the post processor.
-
removePostProcessor
public boolean removePostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Description copied from interface: ConsumerFactory
Remove a post processor.
- Specified by:
removePostProcessor in interface ConsumerFactory<K,V>
- Parameters:
postProcessor - the post processor.
- Returns:
- true if removed.
-
removeListener
public boolean removeListener(ConsumerFactory.Listener<K,V> listener)
Remove a listener.
- Specified by:
removeListener in interface ConsumerFactory<K,V>
- Parameters:
listener - the listener.
- Returns:
- true if removed.
- Since:
- 2.5
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefix, @Nullable java.lang.String clientIdSuffix)
Description copied from interface: ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
- Specified by:
createConsumer in interface ConsumerFactory<K,V>
- Parameters:
groupId - the group id.
clientIdPrefix - the prefix.
clientIdSuffix - the suffix.
- Returns:
- the consumer.
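The client id rule described above can be modelled with a small helper. This is a simplified sketch, not the factory's actual implementation: the method name resolveClientId is hypothetical, and falling back to the configured client.id when no prefix is given is an assumption that goes beyond what this page states.

```java
final class ClientIdSketch {

    // Simplified model: a non-empty prefix replaces the configured client.id,
    // and a non-null suffix is appended to whichever base is used.
    // The fallback to the configured value is an assumption, not documented behavior.
    static String resolveClientId(String configuredClientId, String clientIdPrefix, String clientIdSuffix) {
        String base = (clientIdPrefix != null && !clientIdPrefix.isEmpty())
                ? clientIdPrefix
                : configuredClientId;
        if (base == null) {
            return null; // nothing to build on; client.id stays unset in this model
        }
        return base + (clientIdSuffix != null ? clientIdSuffix : "");
    }

    public static void main(String[] args) {
        System.out.println(resolveClientId("configured", "orders", "-0"));
        System.out.println(resolveClientId("configured", null, "-1"));
    }
}
```

Under this model, a distinct suffix per consumer yields a distinct client id for each consumer created from the same factory.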
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefix, @Nullable java.lang.String clientIdSuffixArg, @Nullable java.util.Properties properties)
Description copied from interface: ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present. In addition, consumer properties can be overridden if the factory implementation supports it.
- Specified by:
createConsumer in interface ConsumerFactory<K,V>
- Parameters:
groupId - the group id.
clientIdPrefix - the prefix.
clientIdSuffixArg - the suffix.
properties - the properties to override.
- Returns:
- the consumer.
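One way to picture the property override is as a merge of the supplied Properties over a copy of the factory configuration. The sketch below is library-free and only illustrates that idea; the helper name applyOverrides is hypothetical and the merge strategy is an assumption about how an implementation might apply the overrides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class PropertyOverrideSketch {

    // Returns a new map in which every entry of 'overrides' replaces the
    // same-named entry of 'configs'; the input map is left unchanged.
    static Map<String, Object> applyOverrides(Map<String, Object> configs, Properties overrides) {
        Map<String, Object> merged = new HashMap<>(configs);
        if (overrides != null) {
            for (String name : overrides.stringPropertyNames()) {
                merged.put(name, overrides.getProperty(name));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("max.poll.records", "500");
        configs.put("group.id", "example-group");

        Properties overrides = new Properties();
        overrides.setProperty("max.poll.records", "50");

        Map<String, Object> merged = applyOverrides(configs, overrides);
        System.out.println(merged.get("max.poll.records"));
        System.out.println(merged.get("group.id"));
    }
}
```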
-
createKafkaConsumer
@Deprecated protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefix, @Nullable java.lang.String clientIdSuffixArg)
Deprecated.
-
createKafkaConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefix, @Nullable java.lang.String clientIdSuffixArg, @Nullable java.util.Properties properties)
-
createKafkaConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
-
createRawConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
Create a Consumer.
- Parameters:
configProps - the configuration properties.
- Returns:
- the consumer.
- Since:
- 2.5
-
isAutoCommit
public boolean isAutoCommit()
Description copied from interface: ConsumerFactory
Return true if consumers created by this factory use auto commit.
- Specified by:
isAutoCommit in interface ConsumerFactory<K,V>
- Returns:
- true if auto commit.
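A check of this kind could plausibly be derived from the enable.auto.commit entry of the configuration map. The sketch below assumes that approach and is not taken from the factory's source; it treats a missing entry as true because that is the Kafka consumer's documented default for enable.auto.commit.

```java
import java.util.HashMap;
import java.util.Map;

final class AutoCommitSketch {

    // Reads "enable.auto.commit" (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) from the map,
    // accepting either a Boolean or a String value.
    static boolean isAutoCommit(Map<String, Object> configs) {
        Object value = configs.get("enable.auto.commit");
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return true; // Kafka's default when the property is not set
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(isAutoCommit(configs));
        configs.put("enable.auto.commit", "false");
        System.out.println(isAutoCommit(configs));
    }
}
```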
-
-