Package org.springframework.kafka.core
Class KafkaAdmin
- java.lang.Object
-
- org.springframework.kafka.core.KafkaResourceFactory
-
- org.springframework.kafka.core.KafkaAdmin
-
- All Implemented Interfaces:
org.springframework.beans.factory.Aware,org.springframework.beans.factory.SmartInitializingSingleton,org.springframework.context.ApplicationContextAware,KafkaAdminOperations
public class KafkaAdmin extends KafkaResourceFactory implements org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.SmartInitializingSingleton, KafkaAdminOperations
An admin that delegates to anAdminClientto create topics defined in the application context.- Since:
- 1.3
- Author:
- Gary Russell, Artem Bilan
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static classKafkaAdmin.NewTopicsWrapper for a collection ofNewTopicto facilitate declaring multiple topics as a single bean.
-
Field Summary
Fields Modifier and Type Field Description static java.time.DurationDEFAULT_CLOSE_TIMEOUTThe default close timeout duration as 10 seconds.
-
Constructor Summary
Constructors Constructor Description KafkaAdmin(java.util.Map<java.lang.String,java.lang.Object> config)Create an instance with anAdminClientbased on the supplied configuration.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description voidafterSingletonsInstantiated()voidcreateOrModifyTopics(org.apache.kafka.clients.admin.NewTopic... topics)Create topics if they don't exist or increase the number of partitions if needed.java.util.Map<java.lang.String,org.apache.kafka.clients.admin.TopicDescription>describeTopics(java.lang.String... topicNames)ObtainTopicDescriptions for these topics.java.util.Map<java.lang.String,java.lang.Object>getConfigurationProperties()Get an unmodifiable copy of this admin's configuration.protected java.util.function.Predicate<org.apache.kafka.clients.admin.NewTopic>getCreateOrModifyTopic()Return the predicate used to determine whether aNewTopicshould be considered for creation or modification.booleaninitialize()Call this method to check/add topics; this might be needed if the broker was not available when the application context was initialized, andfatalIfBrokerNotAvailableis false, orautoCreatewas set to false.protected java.util.Collection<org.apache.kafka.clients.admin.NewTopic>newTopics()Return a collection ofNewTopics to create or modify.voidsetApplicationContext(org.springframework.context.ApplicationContext applicationContext)voidsetAutoCreate(boolean autoCreate)Set to false to suppress auto creation of topics during context initialization.voidsetCloseTimeout(int closeTimeout)Set the close timeout in seconds.voidsetCreateOrModifyTopic(java.util.function.Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)Set a predicate that returns true if a discoveredNewTopicbean should be considered for creation or modification by this admin instance.voidsetFatalIfBrokerNotAvailable(boolean fatalIfBrokerNotAvailable)Set to true if you want the application context to fail to load if we are unable to connect to the broker during initialization, to check/add topics.voidsetModifyTopicConfigs(boolean modifyTopicConfigs)Set to true to compare the current topic configuration properties with those in theNewTopicbean, and update if different.voidsetOperationTimeout(int operationTimeout)Set the operation timeout in seconds.-
Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
-
-
-
-
Method Detail
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException- Specified by:
setApplicationContextin interfaceorg.springframework.context.ApplicationContextAware- Throws:
org.springframework.beans.BeansException
-
setCloseTimeout
public void setCloseTimeout(int closeTimeout)
Set the close timeout in seconds. Defaults toDEFAULT_CLOSE_TIMEOUTseconds.- Parameters:
closeTimeout- the timeout.
-
setOperationTimeout
public void setOperationTimeout(int operationTimeout)
Set the operation timeout in seconds. Defaults to 30 seconds.- Parameters:
operationTimeout- the timeout.
-
setFatalIfBrokerNotAvailable
public void setFatalIfBrokerNotAvailable(boolean fatalIfBrokerNotAvailable)
Set to true if you want the application context to fail to load if we are unable to connect to the broker during initialization, to check/add topics.- Parameters:
fatalIfBrokerNotAvailable- true to fail.
-
setAutoCreate
public void setAutoCreate(boolean autoCreate)
Set to false to suppress auto creation of topics during context initialization.- Parameters:
autoCreate- boolean flag to indicate creating topics or not during context initialization- See Also:
initialize()
-
setModifyTopicConfigs
public void setModifyTopicConfigs(boolean modifyTopicConfigs)
Set to true to compare the current topic configuration properties with those in theNewTopicbean, and update if different.- Parameters:
modifyTopicConfigs- true to check and update configs if necessary.- Since:
- 2.8.7
-
setCreateOrModifyTopic
public void setCreateOrModifyTopic(java.util.function.Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)
Set a predicate that returns true if a discoveredNewTopicbean should be considered for creation or modification by this admin instance. The default predicate returns true for allNewTopics. Used by the default implementation ofnewTopics().- Parameters:
createOrModifyTopic- the predicate.- Since:
- 2.9.10
- See Also:
newTopics()
-
getCreateOrModifyTopic
protected java.util.function.Predicate<org.apache.kafka.clients.admin.NewTopic> getCreateOrModifyTopic()
Return the predicate used to determine whether aNewTopicshould be considered for creation or modification.- Returns:
- the predicate.
- Since:
- 2.9.10
- See Also:
newTopics()
-
getConfigurationProperties
public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Description copied from interface:KafkaAdminOperationsGet an unmodifiable copy of this admin's configuration.- Specified by:
getConfigurationPropertiesin interfaceKafkaAdminOperations- Returns:
- the configuration map.
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()
- Specified by:
afterSingletonsInstantiatedin interfaceorg.springframework.beans.factory.SmartInitializingSingleton
-
initialize
public final boolean initialize()
Call this method to check/add topics; this might be needed if the broker was not available when the application context was initialized, andfatalIfBrokerNotAvailableis false, orautoCreatewas set to false.- Returns:
- true if successful.
- See Also:
setFatalIfBrokerNotAvailable(boolean),setAutoCreate(boolean)
-
newTopics
protected java.util.Collection<org.apache.kafka.clients.admin.NewTopic> newTopics()
Return a collection ofNewTopics to create or modify. The default implementation retrieves allNewTopicbeans in the application context and applies thesetCreateOrModifyTopic(Predicate)predicate to each one. It also removes anyTopicForRetryablebean if there is also a NewTopic with the same topic name. This is performed before calling the predicate.- Returns:
- the collection of
NewTopics. - Since:
- 2.9.10
- See Also:
setCreateOrModifyTopic(Predicate)
-
createOrModifyTopics
public void createOrModifyTopics(org.apache.kafka.clients.admin.NewTopic... topics)
Description copied from interface:KafkaAdminOperationsCreate topics if they don't exist or increase the number of partitions if needed.- Specified by:
createOrModifyTopicsin interfaceKafkaAdminOperations- Parameters:
topics- the topics.
-
describeTopics
public java.util.Map<java.lang.String,org.apache.kafka.clients.admin.TopicDescription> describeTopics(java.lang.String... topicNames)
Description copied from interface:KafkaAdminOperationsObtainTopicDescriptions for these topics.- Specified by:
describeTopicsin interfaceKafkaAdminOperations- Parameters:
topicNames- the topic names.- Returns:
- a map of name:topicDescription.
-
-