Class EmbeddedKafkaZKBroker

java.lang.Object
org.springframework.kafka.test.EmbeddedKafkaZKBroker
All Implemented Interfaces:
DisposableBean, InitializingBean, EmbeddedKafkaBroker

public class EmbeddedKafkaZKBroker extends Object implements EmbeddedKafkaBroker
Manages one or more embedded Kafka brokers together with an embedded Zookeeper instance. This class is intended for use in unit tests.
Since:
2.2
Author:
Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski, Adrian Chlebosz, Soby Chacko, Sanghyeok An, Borahm Lee, Wouter Coekaerts
  • Field Details

    • SPRING_EMBEDDED_ZOOKEEPER_CONNECT

      public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT
    • DEFAULT_ZK_SESSION_TIMEOUT

      public static final int DEFAULT_ZK_SESSION_TIMEOUT
    • DEFAULT_ZK_CONNECTION_TIMEOUT

      public static final int DEFAULT_ZK_CONNECTION_TIMEOUT
  • Constructor Details

    • EmbeddedKafkaZKBroker

      public EmbeddedKafkaZKBroker(int count)
    • EmbeddedKafkaZKBroker

      public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics)
      Create embedded Kafka brokers.
      Parameters:
      count - the number of brokers.
      controlledShutdown - passed into TestUtils.createBrokerConfig.
      topics - the topics to create (2 partitions per topic).
    • EmbeddedKafkaZKBroker

      public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics)
      Create embedded Kafka brokers listening on random ports.
      Parameters:
      count - the number of brokers.
      controlledShutdown - passed into TestUtils.createBrokerConfig.
      partitions - partitions per topic.
      topics - the topics to create.
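As a rough illustration of the constructors above, the sketch below starts a single broker with one topic, prints its address, and shuts it down. It assumes that `spring-kafka-test` and a Zookeeper-capable Kafka version are on the test classpath. The class name `EmbeddedBrokerLifecycle` and the topic name `orders` are made up for the example.

```java
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;

class EmbeddedBrokerLifecycle {

    public static void main(String[] args) {
        // One broker, controlled shutdown enabled, 2 partitions for the hypothetical "orders" topic.
        EmbeddedKafkaZKBroker broker = new EmbeddedKafkaZKBroker(1, true, 2, "orders");
        // Outside a Spring context nobody calls the InitializingBean callback for us,
        // so invoke it directly to start the embedded Zookeeper and broker.
        broker.afterPropertiesSet();
        try {
            System.out.println("Bootstrap servers: " + broker.getBrokersAsString());
        }
        finally {
            // Always release ports and temporary state, even if the test body fails.
            broker.destroy();
        }
    }
}
```

Inside a Spring test context the container normally invokes `afterPropertiesSet()` and `destroy()` itself, so the explicit calls are only needed when the broker is created by hand.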
  • Method Details

    • brokerProperties

      public EmbeddedKafkaBroker brokerProperties(Map<String,String> properties)
      Specify the properties used to configure the Kafka broker(s) before startup, e.g. auto.create.topics.enable or transaction.state.log.replication.factor.
      Specified by:
      brokerProperties in interface EmbeddedKafkaBroker
      Parameters:
      properties - the properties to use for configuring Kafka Broker(s).
      Returns:
      this for chaining configuration.
      See Also:
      • KafkaConfig
    • brokerProperty

      public EmbeddedKafkaBroker brokerProperty(String property, Object value)
      Specify a broker property.
      Parameters:
      property - the property name.
      value - the value.
      Returns:
      the EmbeddedKafkaBroker.
    • kafkaPorts

      public EmbeddedKafkaZKBroker kafkaPorts(int... ports)
      Set explicit ports on which the Kafka brokers will listen. Useful when running an embedded broker that you want to access from other processes.
      Specified by:
      kafkaPorts in interface EmbeddedKafkaBroker
      Parameters:
      ports - the ports.
      Returns:
      the EmbeddedKafkaBroker.
    • brokerListProperty

      public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty)
      Set the system property with this name to the list of broker addresses. Since 3.0.10, defaults to spring.kafka.bootstrap-servers for Spring Boot compatibility.
      Specified by:
      brokerListProperty in interface EmbeddedKafkaBroker
      Parameters:
      brokerListProperty - the brokerListProperty to set
      Returns:
      this broker.
      Since:
      2.3
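Client code running in the same JVM can read the broker list back from that system property. The sketch below shows only the reading side, using plain `System.getProperty`. The class name, the method name, and the `localhost:9092` fallback are illustrative assumptions, not part of the library.

```java
class BootstrapServersLookup {

    // Default property name, as stated in the description above.
    static final String DEFAULT_PROPERTY = "spring.kafka.bootstrap-servers";

    // Returns the broker list stored under the given property, or the fallback if it is unset.
    static String resolve(String propertyName, String fallback) {
        return System.getProperty(propertyName, fallback);
    }

    public static void main(String[] args) {
        // Hypothetical fallback used when no embedded broker has published its addresses.
        System.out.println(resolve(DEFAULT_PROPERTY, "localhost:9092"));
    }
}
```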
    • zkPort

      public EmbeddedKafkaZKBroker zkPort(int port)
      Set an explicit port for the embedded Zookeeper.
      Parameters:
      port - the port.
      Returns:
      the EmbeddedKafkaBroker.
      Since:
      2.3
    • getZkPort

      public int getZkPort()
      Get the port that the embedded Zookeeper is running on or will run on.
      Returns:
      the port.
      Since:
      2.3
    • setZkPort

      public void setZkPort(int zkPort)
      Set the port to run the embedded Zookeeper on (default: a random port).
      Parameters:
      zkPort - the port.
      Since:
      2.3
    • adminTimeout

      public EmbeddedKafkaBroker adminTimeout(int adminTimeout)
      Description copied from interface: EmbeddedKafkaBroker
      Set the timeout in seconds for admin operations (e.g. topic creation, close).
      Specified by:
      adminTimeout in interface EmbeddedKafkaBroker
      Parameters:
      adminTimeout - the timeout.
      Returns:
      the EmbeddedKafkaBroker.
    • setAdminTimeout

      public void setAdminTimeout(int adminTimeout)
      Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 10 seconds.
      Parameters:
      adminTimeout - the timeout.
      Since:
      2.2
    • zkConnectionTimeout

      public EmbeddedKafkaZKBroker zkConnectionTimeout(int zkConnectionTimeout)
      Set connection timeout for the client to the embedded Zookeeper.
      Parameters:
      zkConnectionTimeout - the connection timeout.
      Returns:
      the EmbeddedKafkaBroker.
      Since:
      2.4
    • zkSessionTimeout

      public EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout)
      Set session timeout for the client to the embedded Zookeeper.
      Parameters:
      zkSessionTimeout - the session timeout.
      Returns:
      the EmbeddedKafkaBroker.
      Since:
      2.4
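The fluent setters documented above do not all return the same type: `kafkaPorts`, `zkPort`, `zkConnectionTimeout` and `zkSessionTimeout` return `EmbeddedKafkaZKBroker`, while `brokerProperties`, `brokerProperty`, `brokerListProperty` and `adminTimeout` return the `EmbeddedKafkaBroker` interface. A chain therefore compiles most easily when the Zookeeper-specific calls come first. The following is a minimal sketch under that observation. The port numbers, timeout values, property name `my.test.brokers`, and class name are arbitrary choices for illustration, and the Zookeeper timeouts are assumed to be in milliseconds.

```java
import java.util.Map;

import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;

class FluentBrokerConfiguration {

    // Builds a configured, not yet started, broker.
    static EmbeddedKafkaBroker configure() {
        return new EmbeddedKafkaZKBroker(1, true, "orders")
                // Zookeeper-specific setters first: they return EmbeddedKafkaZKBroker.
                .zkPort(2181)
                .zkConnectionTimeout(10_000) // assumed to be milliseconds
                .zkSessionTimeout(10_000)    // assumed to be milliseconds
                .kafkaPorts(9092)
                // From here on the chain is typed as the EmbeddedKafkaBroker interface.
                .brokerProperties(Map.of("auto.create.topics.enable", "false"))
                .brokerListProperty("my.test.brokers")
                .adminTimeout(20); // seconds, per the adminTimeout description
    }
}
```

The returned broker still has to be started, for example by calling `afterPropertiesSet()`, before clients can connect.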
    • afterPropertiesSet

      public void afterPropertiesSet()
      Specified by:
      afterPropertiesSet in interface EmbeddedKafkaBroker
      Specified by:
      afterPropertiesSet in interface InitializingBean
    • addTopics

      public void addTopics(String... topicsToAdd)
      Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
      Specified by:
      addTopics in interface EmbeddedKafkaBroker
      Parameters:
      topicsToAdd - the topics.
    • addTopics

      public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
      Add topics to the existing broker(s). The broker(s) must be running.
      Specified by:
      addTopics in interface EmbeddedKafkaBroker
      Parameters:
      topicsToAdd - the topics.
      Since:
      2.2
    • addTopicsWithResults

      public Map<String,Exception> addTopicsWithResults(String... topicsToAdd)
      Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
      Specified by:
      addTopicsWithResults in interface EmbeddedKafkaBroker
      Parameters:
      topicsToAdd - the topics.
      Returns:
      the results; null values indicate success.
      Since:
      2.5.4
    • addTopicsWithResults

      public Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
      Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
      Specified by:
      addTopicsWithResults in interface EmbeddedKafkaBroker
      Parameters:
      topicsToAdd - the topics.
      Returns:
      the results; null values indicate success.
      Since:
      2.5.4
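Because a null value in the returned map means success, callers usually want to filter for the non-null entries. The sketch below does that on a hand-built map that stands in for the result of `addTopicsWithResults`. The class name, method name, and sample topics are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class TopicResults {

    // Keeps only the topics whose creation failed, i.e. those mapped to a non-null exception.
    static Map<String, Exception> failures(Map<String, Exception> results) {
        Map<String, Exception> failed = new TreeMap<>();
        results.forEach((topic, error) -> {
            if (error != null) {
                failed.put(topic, error);
            }
        });
        return failed;
    }

    public static void main(String[] args) {
        // Stand-in for a map returned by addTopicsWithResults; HashMap permits null values.
        Map<String, Exception> results = new HashMap<>();
        results.put("orders", null); // created successfully
        results.put("audit", new IllegalStateException("already exists"));
        System.out.println(failures(results).keySet()); // prints [audit]
    }
}
```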
    • doWithAdmin

      public void doWithAdmin(Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
      Create an AdminClient; invoke the callback and reliably close the admin.
      Parameters:
      callback - the callback.
    • doWithAdminFunction

      public <T> T doWithAdminFunction(Function<org.apache.kafka.clients.admin.AdminClient,T> callback)
      Create an AdminClient; invoke the callback and reliably close the admin.
      Type Parameters:
      T - the function return type.
      Parameters:
      callback - the callback.
      Returns:
      the result returned by the callback function.
      Since:
      2.5.4
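One possible use of `doWithAdminFunction` is to query the running broker and hand a value back to the test. The sketch below lists topic names through the `AdminClient` passed to the callback. The class name, the 10-second wait, and the exception wrapping are choices made for the example, not behavior prescribed by the library.

```java
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.kafka.test.EmbeddedKafkaZKBroker;

class TopicLister {

    // Returns the topic names reported by the running embedded broker.
    static Set<String> topicNames(EmbeddedKafkaZKBroker broker) {
        return broker.doWithAdminFunction(admin -> {
            try {
                return admin.listTopics().names().get(10, TimeUnit.SECONDS);
            }
            catch (InterruptedException ex) {
                // Restore the interrupt flag before converting to an unchecked exception.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while listing topics", ex);
            }
            catch (ExecutionException | TimeoutException ex) {
                // Function.apply cannot throw checked exceptions, so wrap them.
                throw new IllegalStateException("Could not list topics", ex);
            }
        });
    }
}
```

The callback does not close the `AdminClient` itself, since the method description states that the admin is closed once the callback returns.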
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface EmbeddedKafkaBroker
    • getTopics

      public Set<String> getTopics()
      Description copied from interface: EmbeddedKafkaBroker
      Get the topics.
      Specified by:
      getTopics in interface EmbeddedKafkaBroker
      Returns:
      the topics.
    • getKafkaServers

      public List<kafka.server.KafkaServer> getKafkaServers()
    • getKafkaServer

      public kafka.server.KafkaServer getKafkaServer(int id)
    • getZookeeper

    • getZooKeeperClient

      public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
      Return the ZooKeeperClient.
      Returns:
      the client.
      Since:
      2.3.2
    • getZookeeperConnectionString

      public String getZookeeperConnectionString()
    • getBrokerAddress

      public BrokerAddress getBrokerAddress(int i)
    • getBrokerAddresses

      public BrokerAddress[] getBrokerAddresses()
    • getPartitionsPerTopic

      public int getPartitionsPerTopic()
      Description copied from interface: EmbeddedKafkaBroker
      Get the configured number of partitions per topic.
      Specified by:
      getPartitionsPerTopic in interface EmbeddedKafkaBroker
      Returns:
      the partition count.
    • bounce

      public void bounce(BrokerAddress brokerAddress)
    • restart

      public void restart(int index) throws Exception
      Throws:
      Exception
    • getBrokersAsString

      public String getBrokersAsString()
      Description copied from interface: EmbeddedKafkaBroker
      Get the bootstrap server addresses as a String.
      Specified by:
      getBrokersAsString in interface EmbeddedKafkaBroker
      Returns:
      the bootstrap servers.
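The returned string is typically passed straight into client configuration under the standard Kafka key `bootstrap.servers`. The sketch below builds minimal consumer properties from such a string. The address `127.0.0.1:50123`, the group id, and the class and method names are placeholders, and the hard-coded address stands in for a real `getBrokersAsString()` call.

```java
import java.util.Properties;

class ClientProperties {

    // Builds minimal consumer properties from a bootstrap-servers string.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Read from the start of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address; in a test this would come from getBrokersAsString().
        Properties props = consumerProps("127.0.0.1:50123", "example-group");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```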
    • consumeFromAllEmbeddedTopics

      public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
      Subscribe a consumer to all the embedded topics.
      Specified by:
      consumeFromAllEmbeddedTopics in interface EmbeddedKafkaBroker
      Parameters:
      consumer - the consumer.
    • consumeFromAllEmbeddedTopics

      public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd)
      Subscribe a consumer to all the embedded topics.
      Specified by:
      consumeFromAllEmbeddedTopics in interface EmbeddedKafkaBroker
      Parameters:
      consumer - the consumer.
      seekToEnd - true to seek to the end instead of the beginning.
      Since:
      2.8.2
    • consumeFromAnEmbeddedTopic

      public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, String topic)
      Subscribe a consumer to one of the embedded topics.
      Specified by:
      consumeFromAnEmbeddedTopic in interface EmbeddedKafkaBroker
      Parameters:
      consumer - the consumer.
      topic - the topic.
    • consumeFromAnEmbeddedTopic

      public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, String topic)
      Subscribe a consumer to one of the embedded topics.
      Specified by:
      consumeFromAnEmbeddedTopic in interface EmbeddedKafkaBroker
      Parameters:
      consumer - the consumer.
      seekToEnd - true to seek to the end instead of the beginning.
      topic - the topic.
      Since:
      2.8.2
    • consumeFromEmbeddedTopics

      public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, String... topicsToConsume)
      Subscribe a consumer to one or more of the embedded topics.
      Specified by:
      consumeFromEmbeddedTopics in interface EmbeddedKafkaBroker
      Parameters:
      consumer - the consumer.
      topicsToConsume - the topics.
      Throws:
      IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
    • consumeFromEmbeddedTopics

      public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, String... topicsToConsume)
      Subscribe a consumer to one or more of the embedded topics.
      Specified by:
      consumeFromEmbeddedTopics in interface EmbeddedKafkaBroker
      Parameters:
      consumer - the consumer.
      seekToEnd - true to seek to the end instead of the beginning.
      topicsToConsume - the topics.
      Throws:
      IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics.
      Since:
      2.8.2
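To show how the consume methods above fit into a test, the sketch below creates a plain `KafkaConsumer`, subscribes it to one embedded topic, and polls once. It assumes the broker is already running and that the topic is one of its embedded topics. The group id `example-group`, the String deserializers, the 5-second poll, and the class name are illustrative choices.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

class EmbeddedTopicReader {

    // Polls one batch of records from an embedded topic.
    static ConsumerRecords<String, String> pollOnce(EmbeddedKafkaBroker broker, String topic) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // hypothetical group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // try-with-resources closes the consumer even if subscription or polling fails.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // seekToEnd = false: read from the beginning rather than the end.
            broker.consumeFromAnEmbeddedTopic(consumer, false, topic);
            return consumer.poll(Duration.ofSeconds(5));
        }
    }
}
```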