Package org.springframework.kafka.core
Class KafkaAdmin
java.lang.Object
org.springframework.kafka.core.KafkaResourceFactory
org.springframework.kafka.core.KafkaAdmin
- All Implemented Interfaces:
Aware
,SmartInitializingSingleton
,ApplicationContextAware
,KafkaAdminOperations
public class KafkaAdmin
extends KafkaResourceFactory
implements ApplicationContextAware, SmartInitializingSingleton, KafkaAdminOperations
An admin that delegates to an
AdminClient
to create topics defined
in the application context.- Since:
- 1.3
- Author:
- Gary Russell, Artem Bilan, Adrian Gygax, Sanghyeok An, Valentina Armenise, Anders Swanson, Omer Celik
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic class
Wrapper for a collection ofNewTopic
to facilitate declaring multiple topics as a single bean. -
Field Summary
Modifier and TypeFieldDescriptionstatic final Duration
The default close timeout duration as 10 seconds. -
Constructor Summary
ConstructorDescriptionKafkaAdmin
(Map<String, Object> config) Create an instance with anAdminClient
based on the supplied configuration. -
Method Summary
Modifier and TypeMethodDescriptionvoid
Return the cluster id, if available.void
createOrModifyTopics
(org.apache.kafka.clients.admin.NewTopic... topics) Create topics if they don't exist or increase the number of partitions if needed.describeTopics
(String... topicNames) ObtainTopicDescription
s for these topics.Get the clusterId property.Get an unmodifiable copy of this admin's configuration.protected Predicate<org.apache.kafka.clients.admin.NewTopic>
Return the predicate used to determine whether aNewTopic
should be considered for creation or modification.int
Return the operation timeout in seconds.final boolean
Call this method to check/add topics; this might be needed if the broker was not available when the application context was initialized, andfatalIfBrokerNotAvailable
is false, orautoCreate
was set to false.protected Collection<org.apache.kafka.clients.admin.NewTopic>
Return a collection ofNewTopic
s to create or modify.void
setApplicationContext
(ApplicationContext applicationContext) void
setAutoCreate
(boolean autoCreate) Set to false to suppress auto creation of topics during context initialization.void
setCloseTimeout
(int closeTimeout) Set the close timeout in seconds.void
setClusterId
(String clusterId) Set the cluster id.void
setCreateOrModifyTopic
(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic) Set a predicate that returns true if a discoveredNewTopic
bean should be considered for creation or modification by this admin instance.void
setFatalIfBrokerNotAvailable
(boolean fatalIfBrokerNotAvailable) Set to true if you want the application context to fail to load if we are unable to connect to the broker during initialization, to check/add topics.void
setModifyTopicConfigs
(boolean modifyTopicConfigs) Set to true to compare the current topic configuration properties with those in theNewTopic
bean, and update if different.void
setOperationTimeout
(int operationTimeout) Set the operation timeout in seconds.Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
-
Field Details
-
DEFAULT_CLOSE_TIMEOUT
The default close timeout duration as 10 seconds.
-
-
Constructor Details
-
KafkaAdmin
Create an instance with anAdminClient
based on the supplied configuration.- Parameters:
config
- the configuration for theAdminClient
.
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContext
in interfaceApplicationContextAware
- Throws:
BeansException
-
setCloseTimeout
public void setCloseTimeout(int closeTimeout) Set the close timeout in seconds. Defaults toDEFAULT_CLOSE_TIMEOUT
seconds.- Parameters:
closeTimeout
- the timeout.
-
setOperationTimeout
public void setOperationTimeout(int operationTimeout) Set the operation timeout in seconds. Defaults to 30 seconds.- Parameters:
operationTimeout
- the timeout.
-
getOperationTimeout
public int getOperationTimeout()Return the operation timeout in seconds.- Returns:
- the timeout.
- Since:
- 3.0.2
-
setFatalIfBrokerNotAvailable
public void setFatalIfBrokerNotAvailable(boolean fatalIfBrokerNotAvailable) Set to true if you want the application context to fail to load if we are unable to connect to the broker during initialization, to check/add topics.- Parameters:
fatalIfBrokerNotAvailable
- true to fail.
-
setAutoCreate
public void setAutoCreate(boolean autoCreate) Set to false to suppress auto creation of topics during context initialization.- Parameters:
autoCreate
- boolean flag to indicate creating topics or not during context initialization- See Also:
-
setModifyTopicConfigs
public void setModifyTopicConfigs(boolean modifyTopicConfigs) Set to true to compare the current topic configuration properties with those in theNewTopic
bean, and update if different.- Parameters:
modifyTopicConfigs
- true to check and update configs if necessary.- Since:
- 2.8.7
-
setCreateOrModifyTopic
public void setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic) Set a predicate that returns true if a discoveredNewTopic
bean should be considered for creation or modification by this admin instance. The default predicate returns true for allNewTopic
s. Used by the default implementation ofnewTopics()
.- Parameters:
createOrModifyTopic
- the predicate.- Since:
- 2.9.10
- See Also:
-
getCreateOrModifyTopic
Return the predicate used to determine whether aNewTopic
should be considered for creation or modification.- Returns:
- the predicate.
- Since:
- 2.9.10
- See Also:
-
setClusterId
Set the cluster id. Use this to prevent attempting to fetch the cluster id from the broker, perhaps if the user does not have admin permissions.- Parameters:
clusterId
- the clusterId to set- Since:
- 3.1
-
getClusterId
Get the clusterId property.- Returns:
- the cluster id.
- Since:
- 3.1.8
-
getConfigurationProperties
Description copied from interface:KafkaAdminOperations
Get an unmodifiable copy of this admin's configuration.- Specified by:
getConfigurationProperties
in interfaceKafkaAdminOperations
- Returns:
- the configuration map.
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()- Specified by:
afterSingletonsInstantiated
in interfaceSmartInitializingSingleton
-
initialize
public final boolean initialize()Call this method to check/add topics; this might be needed if the broker was not available when the application context was initialized, andfatalIfBrokerNotAvailable
is false, orautoCreate
was set to false.- Returns:
- true if successful.
- See Also:
-
newTopics
Return a collection ofNewTopic
s to create or modify. The default implementation retrieves allNewTopic
beans in the application context and applies thesetCreateOrModifyTopic(Predicate)
predicate to each one. It also removes anyTopicForRetryable
bean if there is also a NewTopic with the same topic name. This is performed before calling the predicate.- Returns:
- the collection of
NewTopic
s. - Since:
- 2.9.10
- See Also:
-
clusterId
Description copied from interface:KafkaAdminOperations
Return the cluster id, if available.- Specified by:
clusterId
in interfaceKafkaAdminOperations
- Returns:
- the describe cluster id.
-
createOrModifyTopics
public void createOrModifyTopics(org.apache.kafka.clients.admin.NewTopic... topics) Description copied from interface:KafkaAdminOperations
Create topics if they don't exist or increase the number of partitions if needed.- Specified by:
createOrModifyTopics
in interfaceKafkaAdminOperations
- Parameters:
topics
- the topics.
-
describeTopics
public Map<String,org.apache.kafka.clients.admin.TopicDescription> describeTopics(String... topicNames) Description copied from interface:KafkaAdminOperations
ObtainTopicDescription
s for these topics.- Specified by:
describeTopics
in interfaceKafkaAdminOperations
- Parameters:
topicNames
- the topic names.- Returns:
- a map of name:topicDescription.
-
getAdminConfig
-