Class KafkaAdmin

All Implemented Interfaces:
Aware, SmartInitializingSingleton, ApplicationContextAware, KafkaAdminOperations

An admin that delegates to an AdminClient to create topics defined in the application context.
Since:
1.3
Author:
Gary Russell, Artem Bilan
  • Field Details

    • DEFAULT_CLOSE_TIMEOUT

      public static final Duration DEFAULT_CLOSE_TIMEOUT
      The default close timeout: 10 seconds.
  • Constructor Details

    • KafkaAdmin

      public KafkaAdmin(Map<String,Object> config)
      Create an instance with an AdminClient based on the supplied configuration.
      Parameters:
      config - the configuration for the AdminClient.
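A minimal configuration sketch showing the constructor in use. The class name, the bean method names, the `orders` topic, and the `localhost:9092` broker address are illustrative assumptions. `TopicBuilder` is one convenient way to declare a NewTopic bean and is not required by this class.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public KafkaAdmin admin() {
        // Assumption: a broker is reachable at localhost:9092.
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic ordersTopic() {
        // Hypothetical topic; the admin discovers NewTopic beans in the application context.
        return TopicBuilder.name("orders").partitions(3).replicas(1).build();
    }
}
```

With auto creation left enabled, the admin would attempt to create the `orders` topic during context initialization.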
  • Method Details

    • setApplicationContext

      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
      Specified by:
      setApplicationContext in interface ApplicationContextAware
      Throws:
      BeansException
    • setCloseTimeout

      public void setCloseTimeout(int closeTimeout)
      Set the close timeout in seconds. Defaults to DEFAULT_CLOSE_TIMEOUT (10 seconds).
      Parameters:
      closeTimeout - the timeout.
    • setOperationTimeout

      public void setOperationTimeout(int operationTimeout)
      Set the operation timeout in seconds. Defaults to 30 seconds.
      Parameters:
      operationTimeout - the timeout.
    • getOperationTimeout

      public int getOperationTimeout()
      Return the operation timeout in seconds.
      Returns:
      the timeout.
      Since:
      3.0.2
    • setFatalIfBrokerNotAvailable

      public void setFatalIfBrokerNotAvailable(boolean fatalIfBrokerNotAvailable)
      Set to true to make the application context fail to load if the admin cannot connect to the broker during initialization to check or add topics.
      Parameters:
      fatalIfBrokerNotAvailable - true to fail.
    • setAutoCreate

      public void setAutoCreate(boolean autoCreate)
      Set to false to suppress auto creation of topics during context initialization.
      Parameters:
      autoCreate - false to suppress topic creation during context initialization.
    • setModifyTopicConfigs

      public void setModifyTopicConfigs(boolean modifyTopicConfigs)
      Set to true to compare the current topic configuration properties with those in the NewTopic bean, and update if different.
      Parameters:
      modifyTopicConfigs - true to check and update configs if necessary.
      Since:
      2.8.7
    • setCreateOrModifyTopic

      public void setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)
      Set a predicate that returns true if a discovered NewTopic bean should be considered for creation or modification by this admin instance. The default predicate returns true for all NewTopics. Used by the default implementation of newTopics().
      Parameters:
      createOrModifyTopic - the predicate.
      Since:
      2.9.10
    • getCreateOrModifyTopic

      protected Predicate<org.apache.kafka.clients.admin.NewTopic> getCreateOrModifyTopic()
      Return the predicate used to determine whether a NewTopic should be considered for creation or modification.
      Returns:
      the predicate.
      Since:
      2.9.10
    • setClusterId

      public void setClusterId(String clusterId)
      Set the cluster id. Use this to avoid fetching the cluster id from the broker, for example when the user does not have admin permissions.
      Parameters:
      clusterId - the clusterId to set
      Since:
      3.1
    • getConfigurationProperties

      public Map<String,Object> getConfigurationProperties()
      Description copied from interface: KafkaAdminOperations
      Get an unmodifiable copy of this admin's configuration.
      Specified by:
      getConfigurationProperties in interface KafkaAdminOperations
      Returns:
      the configuration map.
    • afterSingletonsInstantiated

      public void afterSingletonsInstantiated()
      Specified by:
      afterSingletonsInstantiated in interface SmartInitializingSingleton
    • initialize

      public final boolean initialize()
      Call this method to check/add topics. This might be needed if the broker was not available when the application context was initialized and fatalIfBrokerNotAvailable is false, or if autoCreate was set to false.
      Returns:
      true if successful.
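One way to use the boolean result is to retry until the broker becomes reachable. The following is a sketch only: `InitializeRetrySketch` and `retry` are hypothetical names, and the supplier stands in for a method reference such as `kafkaAdmin::initialize`. It assumes the call reports failure by returning false, as it would when fatalIfBrokerNotAvailable is false.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class InitializeRetrySketch {

    /**
     * Calls the supplier up to maxAttempts times, pausing between attempts,
     * and returns true as soon as one attempt succeeds.
     */
    static boolean retry(BooleanSupplier initialize, int maxAttempts, Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (initialize.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                }
                catch (InterruptedException ex) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in supplier that succeeds on the third call; a real caller
        // would pass kafkaAdmin::initialize instead.
        int[] calls = {0};
        boolean ok = retry(() -> ++calls[0] >= 3, 5, Duration.ofMillis(10));
        System.out.println("initialized=" + ok + " after " + calls[0] + " attempts");
    }
}
```

In an application, a call such as `retry(kafkaAdmin::initialize, 5, Duration.ofSeconds(2))` could be scheduled after startup; the attempt count and delay shown here are arbitrary.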
    • newTopics

      protected Collection<org.apache.kafka.clients.admin.NewTopic> newTopics()
      Return a collection of NewTopics to create or modify. The default implementation retrieves all NewTopic beans in the application context and applies the setCreateOrModifyTopic(Predicate) predicate to each one. Before applying the predicate, it removes any TopicForRetryable bean for which a NewTopic with the same topic name also exists.
      Returns:
      the collection of NewTopics.
      Since:
      2.9.10
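The two-step selection described for newTopics() can be modeled without any Kafka classes. This is an illustrative sketch, not the library's implementation: `TopicSpec` is a hypothetical stand-in for a NewTopic bean, and its `retryable` flag marks what would be a TopicForRetryable.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class NewTopicsSketch {

    // Hypothetical stand-in for a NewTopic bean; retryable == true models a TopicForRetryable.
    record TopicSpec(String name, boolean retryable) { }

    static List<TopicSpec> select(List<TopicSpec> beans, Predicate<TopicSpec> createOrModifyTopic) {
        // Names that have an explicit (non-retryable) definition.
        Set<String> explicitNames = beans.stream()
                .filter(t -> !t.retryable())
                .map(TopicSpec::name)
                .collect(Collectors.toSet());
        return beans.stream()
                // Step 1: drop retryable entries shadowed by an explicit topic with the same name.
                .filter(t -> !(t.retryable() && explicitNames.contains(t.name())))
                // Step 2: keep only the topics accepted by the predicate.
                .filter(createOrModifyTopic)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TopicSpec> beans = List.of(
                new TopicSpec("orders", false),
                new TopicSpec("orders", true),
                new TopicSpec("orders-retry", true),
                new TopicSpec("legacy.audit", false));
        // Hypothetical predicate: this admin ignores topics whose names start with "legacy.".
        System.out.println(select(beans, t -> !t.name().startsWith("legacy.")));
    }
}
```

Because the duplicate removal runs first, a predicate never sees a retryable entry that an explicit definition has already replaced.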
    • clusterId

      @Nullable public String clusterId()
      Description copied from interface: KafkaAdminOperations
      Return the cluster id, if available.
      Specified by:
      clusterId in interface KafkaAdminOperations
      Returns:
      the cluster id, or null if not available.
    • createOrModifyTopics

      public void createOrModifyTopics(org.apache.kafka.clients.admin.NewTopic... topics)
      Description copied from interface: KafkaAdminOperations
      Create topics if they don't exist or increase the number of partitions if needed.
      Specified by:
      createOrModifyTopics in interface KafkaAdminOperations
      Parameters:
      topics - the topics.
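The create-or-increase rule stated above can be expressed as a small decision function. This is a sketch under stated assumptions: `CreateOrModifySketch`, `Action`, and `decide` are hypothetical names, the map of existing partition counts stands in for what the broker would report, and treating a smaller or equal requested count as a no-op is an assumption of this sketch.

```java
import java.util.Map;

final class CreateOrModifySketch {

    enum Action { CREATE, INCREASE_PARTITIONS, NONE }

    /**
     * Decides what to do for one requested topic, given the partition counts
     * of the topics that already exist (topic name -> partition count).
     */
    static Action decide(Map<String, Integer> existingPartitions, String topic, int desiredPartitions) {
        Integer current = existingPartitions.get(topic);
        if (current == null) {
            // The topic does not exist yet.
            return Action.CREATE;
        }
        if (desiredPartitions > current) {
            // The topic exists with fewer partitions than requested.
            return Action.INCREASE_PARTITIONS;
        }
        // Assumption: equal or smaller requested counts leave the topic unchanged.
        return Action.NONE;
    }

    public static void main(String[] args) {
        Map<String, Integer> existing = Map.of("orders", 3);
        System.out.println(decide(existing, "payments", 1));
        System.out.println(decide(existing, "orders", 6));
        System.out.println(decide(existing, "orders", 3));
    }
}
```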
    • describeTopics

      public Map<String,org.apache.kafka.clients.admin.TopicDescription> describeTopics(String... topicNames)
      Description copied from interface: KafkaAdminOperations
      Obtain TopicDescriptions for these topics.
      Specified by:
      describeTopics in interface KafkaAdminOperations
      Parameters:
      topicNames - the topic names.
      Returns:
      a map of name:topicDescription.