Class EmbeddedKafkaBroker

  • All Implemented Interfaces:
    org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean

    public class EmbeddedKafkaBroker
    extends java.lang.Object
    implements org.springframework.beans.factory.InitializingBean, org.springframework.beans.factory.DisposableBean
    A manager for one or more embedded Kafka brokers and an embedded Zookeeper instance. This class is intended for use in unit tests.
    Since:
    2.2
    Author:
    Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski, Adrian Chlebosz
    • Field Detail

      • SPRING_EMBEDDED_KAFKA_BROKERS

        public static final java.lang.String SPRING_EMBEDDED_KAFKA_BROKERS
        See Also:
        Constant Field Values
      • SPRING_EMBEDDED_ZOOKEEPER_CONNECT

        public static final java.lang.String SPRING_EMBEDDED_ZOOKEEPER_CONNECT
        See Also:
        Constant Field Values
      • BROKER_LIST_PROPERTY

        public static final java.lang.String BROKER_LIST_PROPERTY
        Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
        See Also:
        Constant Field Values
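        As a rough illustration of how test code might consume the broker list, the sketch below reads the default "spring.embedded.kafka.brokers" system property and splits it into individual addresses. It assumes the value is a comma-separated list of host:port pairs; the class name BrokerListReader, the sample addresses, and the parsing logic are hypothetical and not part of this API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of spring-kafka-test.
class BrokerListReader {

    // Default property name, as quoted in the BROKER_LIST_PROPERTY description.
    static final String DEFAULT_PROPERTY = "spring.embedded.kafka.brokers";

    // Splits the property value into addresses.
    // Assumption: the value is a comma-separated list of host:port pairs.
    static List<String> readBrokers(String propertyName) {
        String value = System.getProperty(propertyName);
        List<String> brokers = new ArrayList<>();
        if (value == null || value.trim().isEmpty()) {
            return brokers; // property not set, e.g. no embedded broker is running
        }
        for (String address : value.split(",")) {
            String trimmed = address.trim();
            if (!trimmed.isEmpty()) {
                brokers.add(trimmed);
            }
        }
        return brokers;
    }

    public static void main(String[] args) {
        // Simulate what a running embedded broker might publish (sample values only).
        System.setProperty(DEFAULT_PROPERTY, "127.0.0.1:50001,127.0.0.1:50002");
        System.out.println(readBrokers(DEFAULT_PROPERTY));
    }
}
```

        If a different property name is configured through BROKER_LIST_PROPERTY or brokerListProperty, the same helper could be called with that name instead.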
      • DEFAULT_ZK_SESSION_TIMEOUT

        public static final int DEFAULT_ZK_SESSION_TIMEOUT
        See Also:
        Constant Field Values
      • DEFAULT_ZK_CONNECTION_TIMEOUT

        public static final int DEFAULT_ZK_CONNECTION_TIMEOUT
        See Also:
        Constant Field Values
    • Constructor Detail

      • EmbeddedKafkaBroker

        public EmbeddedKafkaBroker​(int count)
      • EmbeddedKafkaBroker

        public EmbeddedKafkaBroker​(int count,
                                   boolean controlledShutdown,
                                   java.lang.String... topics)
        Create embedded Kafka brokers.
        Parameters:
        count - the number of brokers.
        controlledShutdown - passed into TestUtils.createBrokerConfig.
        topics - the topics to create (2 partitions per topic).
      • EmbeddedKafkaBroker

        public EmbeddedKafkaBroker​(int count,
                                   boolean controlledShutdown,
                                   int partitions,
                                   java.lang.String... topics)
        Create embedded Kafka brokers listening on random ports.
        Parameters:
        count - the number of brokers.
        controlledShutdown - passed into TestUtils.createBrokerConfig.
        partitions - partitions per topic.
        topics - the topics to create.
    • Method Detail

      • brokerProperties

        public EmbeddedKafkaBroker brokerProperties​(java.util.Map<java.lang.String,​java.lang.String> properties)
        Specify properties used to configure the Kafka broker(s) before startup, e.g. auto.create.topics.enable, transaction.state.log.replication.factor, etc.
        Parameters:
        properties - the properties to use for configuring Kafka Broker(s).
        Returns:
        this for chaining configuration.
        See Also:
        KafkaConfig
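        A minimal sketch of preparing such a property map follows. The two property names are taken from the description above; the values, the class name BrokerPropertiesExample, and the commented-out call are illustrative assumptions rather than recommended settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for test broker settings; not part of spring-kafka-test.
class BrokerPropertiesExample {

    // Builds a String-to-String map of the shape that brokerProperties(...) accepts.
    static Map<String, String> testBrokerProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Property names come from the method description; the values are illustrative.
        props.put("auto.create.topics.enable", "false");
        props.put("transaction.state.log.replication.factor", "1");
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = testBrokerProperties();
        // With spring-kafka-test on the classpath, the map could be applied before startup:
        //   new EmbeddedKafkaBroker(1).brokerProperties(props);
        props.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

        Because brokerProperties returns this, the call can be chained with other configuration methods such as kafkaPorts or brokerListProperty.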
      • brokerProperty

        public EmbeddedKafkaBroker brokerProperty​(java.lang.String property,
                                                  java.lang.Object value)
        Specify a broker property.
        Parameters:
        property - the property name.
        value - the value.
        Returns:
        the EmbeddedKafkaBroker.
      • kafkaPorts

        public EmbeddedKafkaBroker kafkaPorts​(int... ports)
        Set explicit ports on which the Kafka brokers will listen. Useful when running an embedded broker that you want to access from other processes.
        Parameters:
        ports - the ports.
        Returns:
        the EmbeddedKafkaBroker.
      • brokerListProperty

        public EmbeddedKafkaBroker brokerListProperty​(java.lang.String brokerListProperty)
        Set the system property with this name to the list of broker addresses.
        Parameters:
        brokerListProperty - the name of the system property to set.
        Returns:
        this broker.
        Since:
        2.3
      • getZkPort

        public int getZkPort()
        Get the port that the embedded Zookeeper is running on or will run on.
        Returns:
        the port.
        Since:
        2.3
      • setZkPort

        public void setZkPort​(int zkPort)
        Set the port to run the embedded Zookeeper on (default random).
        Parameters:
        zkPort - the port.
        Since:
        2.3
      • adminTimeout

        public EmbeddedKafkaBroker adminTimeout​(int adminTimeout)
        Set the timeout in seconds for admin operations (e.g. topic creation, close).
        Parameters:
        adminTimeout - the timeout.
        Returns:
        the EmbeddedKafkaBroker.
        Since:
        2.8.5
      • setAdminTimeout

        public void setAdminTimeout​(int adminTimeout)
        Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 10 seconds.
        Parameters:
        adminTimeout - the timeout.
        Since:
        2.2
      • zkConnectionTimeout

        public EmbeddedKafkaBroker zkConnectionTimeout​(int zkConnectionTimeout)
        Set the connection timeout for the client to the embedded Zookeeper.
        Parameters:
        zkConnectionTimeout - the connection timeout.
        Returns:
        the EmbeddedKafkaBroker.
        Since:
        2.4
      • zkSessionTimeout

        public EmbeddedKafkaBroker zkSessionTimeout​(int zkSessionTimeout)
        Set the session timeout for the client to the embedded Zookeeper.
        Parameters:
        zkSessionTimeout - the session timeout.
        Returns:
        the EmbeddedKafkaBroker.
        Since:
        2.4
      • afterPropertiesSet

        public void afterPropertiesSet()
        Specified by:
        afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
      • addTopics

        public void addTopics​(java.lang.String... topicsToAdd)
        Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
        Parameters:
        topicsToAdd - the topics.
      • addTopics

        public void addTopics​(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
        Add topics to the existing broker(s). The broker(s) must be running.
        Parameters:
        topicsToAdd - the topics.
        Since:
        2.2
      • addTopicsWithResults

        public java.util.Map<java.lang.String,​java.lang.Exception> addTopicsWithResults​(java.lang.String... topicsToAdd)
        Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
        Parameters:
        topicsToAdd - the topics.
        Returns:
        the results; null values indicate success.
        Since:
        2.5.4
      • addTopicsWithResults

        public java.util.Map<java.lang.String,​java.lang.Exception> addTopicsWithResults​(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
        Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
        Parameters:
        topicsToAdd - the topics.
        Returns:
        the results; null values indicate success.
        Since:
        2.5.4
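        Since null values in the returned map indicate success, a caller typically filters for non-null entries to find the topics that failed. The sketch below shows one way to do that on a hand-built map; the class name TopicResults, the topic names, and the simulated exception are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for inspecting a results map; not part of spring-kafka-test.
class TopicResults {

    // Returns only the failed entries: topics whose mapped value is a non-null exception.
    static Map<String, Exception> failures(Map<String, Exception> results) {
        Map<String, Exception> failed = new TreeMap<>();
        for (Map.Entry<String, Exception> entry : results.entrySet()) {
            if (entry.getValue() != null) {
                failed.put(entry.getKey(), entry.getValue());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Stand-in for the map returned by addTopicsWithResults(...); HashMap permits null values.
        Map<String, Exception> results = new HashMap<>();
        results.put("orders", null); // null value: topic was created
        results.put("payments", new IllegalStateException("simulated failure"));
        System.out.println("Failed topics: " + failures(results).keySet());
    }
}
```

        Note that containsKey, not get, distinguishes a successful topic (present with a null value) from a topic that was never requested.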
      • doWithAdmin

        public void doWithAdmin​(java.util.function.Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
        Create an AdminClient; invoke the callback and reliably close the admin.
        Parameters:
        callback - the callback.
      • doWithAdminFunction

        public <T> T doWithAdminFunction​(java.util.function.Function<org.apache.kafka.clients.admin.AdminClient,​T> callback)
        Create an AdminClient; invoke the callback and reliably close the admin.
        Type Parameters:
        T - the function return type.
        Parameters:
        callback - the callback.
        Returns:
        the result returned by the callback.
        Since:
        2.5.4
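        The create, invoke, and reliably close pattern described for doWithAdmin and doWithAdminFunction can be sketched generically as below. This is not the library's implementation: ResourceCallbacks, doWithResource, and FakeAdmin are hypothetical names, and FakeAdmin merely stands in for an AdminClient so the example runs without Kafka.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of a "create, invoke callback, always close" helper.
class ResourceCallbacks {

    // Creates a resource, applies the callback, and closes the resource even if the callback throws.
    static <R extends AutoCloseable, T> T doWithResource(Supplier<R> factory, Function<R, T> callback) {
        try (R resource = factory.get()) {
            return callback.apply(resource);
        } catch (RuntimeException e) {
            throw e; // propagate unchecked exceptions unchanged
        } catch (Exception e) {
            // Only close() can raise a checked exception here.
            throw new IllegalStateException("Failed to close resource", e);
        }
    }

    // Stand-in for an admin client; records whether close() was called.
    static class FakeAdmin implements AutoCloseable {
        boolean closed;

        int topicCount() {
            return 3; // fixed sample value
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        Integer count = doWithResource(() -> admin, FakeAdmin::topicCount);
        System.out.println(count + " topics, closed=" + admin.closed);
    }
}
```

        The generic return type mirrors doWithAdminFunction, where the value produced by the callback is handed back to the caller after the admin has been closed.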
      • destroy

        public void destroy()
        Specified by:
        destroy in interface org.springframework.beans.factory.DisposableBean
      • getTopics

        public java.util.Set<java.lang.String> getTopics()
      • getKafkaServers

        public java.util.List<kafka.server.KafkaServer> getKafkaServers()
      • getKafkaServer

        public kafka.server.KafkaServer getKafkaServer​(int id)
      • getZooKeeperClient

        public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
        Return the ZooKeeperClient.
        Returns:
        the client.
        Since:
        2.3.2
      • getZookeeperConnectionString

        public java.lang.String getZookeeperConnectionString()
      • getBrokerAddress

        public BrokerAddress getBrokerAddress​(int i)
      • getBrokerAddresses

        public BrokerAddress[] getBrokerAddresses()
      • getPartitionsPerTopic

        public int getPartitionsPerTopic()
      • restart

        public void restart​(int index)
                     throws java.lang.Exception
        Throws:
        java.lang.Exception
      • getBrokersAsString

        public java.lang.String getBrokersAsString()
      • consumeFromAllEmbeddedTopics

        public void consumeFromAllEmbeddedTopics​(org.apache.kafka.clients.consumer.Consumer<?,​?> consumer)
        Subscribe a consumer to all the embedded topics.
        Parameters:
        consumer - the consumer.
      • consumeFromAllEmbeddedTopics

        public void consumeFromAllEmbeddedTopics​(org.apache.kafka.clients.consumer.Consumer<?,​?> consumer,
                                                 boolean seekToEnd)
        Subscribe a consumer to all the embedded topics.
        Parameters:
        consumer - the consumer.
        seekToEnd - true to seek to the end instead of the beginning.
        Since:
        2.8.2
      • consumeFromAnEmbeddedTopic

        public void consumeFromAnEmbeddedTopic​(org.apache.kafka.clients.consumer.Consumer<?,​?> consumer,
                                               java.lang.String topic)
        Subscribe a consumer to one of the embedded topics.
        Parameters:
        consumer - the consumer.
        topic - the topic.
      • consumeFromAnEmbeddedTopic

        public void consumeFromAnEmbeddedTopic​(org.apache.kafka.clients.consumer.Consumer<?,​?> consumer,
                                               boolean seekToEnd,
                                               java.lang.String topic)
        Subscribe a consumer to one of the embedded topics.
        Parameters:
        consumer - the consumer.
        seekToEnd - true to seek to the end instead of the beginning.
        topic - the topic.
        Since:
        2.8.2
      • consumeFromEmbeddedTopics

        public void consumeFromEmbeddedTopics​(org.apache.kafka.clients.consumer.Consumer<?,​?> consumer,
                                              java.lang.String... topicsToConsume)
        Subscribe a consumer to one or more of the embedded topics.
        Parameters:
        consumer - the consumer.
        topicsToConsume - the topics.
        Throws:
        java.lang.IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
      • consumeFromEmbeddedTopics

        public void consumeFromEmbeddedTopics​(org.apache.kafka.clients.consumer.Consumer<?,​?> consumer,
                                              boolean seekToEnd,
                                              java.lang.String... topicsToConsume)
        Subscribe a consumer to one or more of the embedded topics.
        Parameters:
        consumer - the consumer.
        seekToEnd - true to seek to the end instead of the beginning.
        topicsToConsume - the topics.
        Throws:
        java.lang.IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics.
        Since:
        2.8.2
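        The IllegalStateException contract documented for consumeFromEmbeddedTopics amounts to a membership check against the embedded topic set. The sketch below illustrates that contract only; the class name EmbeddedTopicCheck, the method assertEmbedded, the topic names, and the exception message are hypothetical and do not reproduce the library's code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration of the documented check; not part of spring-kafka-test.
class EmbeddedTopicCheck {

    // Throws IllegalStateException when any requested topic is not among the embedded topics.
    static void assertEmbedded(Set<String> embeddedTopics, String... topicsToConsume) {
        for (String topic : topicsToConsume) {
            if (!embeddedTopics.contains(topic)) {
                throw new IllegalStateException("Topic '" + topic + "' is not an embedded topic");
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the set returned by getTopics().
        Set<String> embedded = new LinkedHashSet<>(Arrays.asList("orders", "payments"));
        assertEmbedded(embedded, "orders"); // passes silently
        try {
            assertEmbedded(embedded, "orders", "refunds");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

        In a test, topics that are needed but were not passed to the constructor can be registered first with addTopics so that the subscription does not fail.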