Package org.springframework.kafka.test
Class EmbeddedKafkaZKBroker
java.lang.Object
org.springframework.kafka.test.EmbeddedKafkaZKBroker
- All Implemented Interfaces:
- DisposableBean, InitializingBean, EmbeddedKafkaBroker
A manager for one or more embedded Kafka brokers and an embedded Zookeeper.
This class is intended for use in unit tests.
- Since:
- 2.2
- Author:
- Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski, Adrian Chlebosz
- 
Nested Class Summary
- static final class: Ported from Scala to allow setting the port.
- 
Field Summary
Fields inherited from interface org.springframework.kafka.test.EmbeddedKafkaBroker:
BEAN_NAME, BROKER_LIST_PROPERTY, BROKER_NEEDED, DEFAULT_ADMIN_TIMEOUT, LOOPBACK, SPRING_EMBEDDED_KAFKA_BROKERS
- 
Constructor Summary
- EmbeddedKafkaZKBroker(int count)
- EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics): Create embedded Kafka brokers listening on random ports.
- EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics): Create embedded Kafka brokers.
- 
Method Summary
- void addTopics(String... topicsToAdd): Add topics to the existing broker(s) using the configured number of partitions.
- void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd): Add topics to the existing broker(s).
- addTopicsWithResults(String... topicsToAdd): Add topics to the existing broker(s) using the configured number of partitions.
- addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd): Add topics to the existing broker(s) and return a map of results.
- adminTimeout(int adminTimeout): Set the timeout in seconds for admin operations (e.g. topic creation, close).
- void afterPropertiesSet()
- void bounce(BrokerAddress brokerAddress)
- brokerListProperty(String brokerListProperty): Set the system property with this name to the list of broker addresses.
- brokerProperties(Map<String, String> properties): Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor, etc.
- brokerProperty(String property, Object value): Specify a broker property.
- void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer): Subscribe a consumer to all the embedded topics.
- void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd): Subscribe a consumer to all the embedded topics.
- void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String topic): Subscribe a consumer to one of the embedded topics.
- void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String topic): Subscribe a consumer to one of the embedded topics.
- void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume): Subscribe a consumer to one or more of the embedded topics.
- void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... topicsToConsume): Subscribe a consumer to one or more of the embedded topics.
- void destroy()
- void doWithAdmin(Consumer<org.apache.kafka.clients.admin.AdminClient> callback): Create an AdminClient; invoke the callback and reliably close the admin.
- <T> T doWithAdminFunction(Function<org.apache.kafka.clients.admin.AdminClient, T> callback): Create an AdminClient; invoke the callback and reliably close the admin.
- getBrokerAddress(int i)
- getBrokerAddresses()
- getBrokersAsString(): Get the bootstrap server addresses as a String.
- kafka.server.KafkaServer getKafkaServer(int id)
- List<kafka.server.KafkaServer> getKafkaServers()
- int getPartitionsPerTopic(): Get the configured number of partitions per topic.
- getTopics(): Get the topics.
- int getZkPort(): Get the port that the embedded Zookeeper is running on or will run on.
- getZookeeper()
- kafka.zookeeper.ZooKeeperClient getZooKeeperClient(): Return the ZooKeeperClient.
- getZookeeperConnectionString()
- kafkaPorts(int... ports): Set explicit ports on which the kafka brokers will listen.
- void restart(int index)
- void setAdminTimeout(int adminTimeout): Set the timeout in seconds for admin operations (e.g. topic creation, close).
- void setZkPort(int zkPort): Set the port to run the embedded Zookeeper on (default random).
- zkConnectionTimeout(int zkConnectionTimeout): Set connection timeout for the client to the embedded Zookeeper.
- zkPort(int port): Set an explicit port for the embedded Zookeeper.
- zkSessionTimeout(int zkSessionTimeout): Set session timeout for the client to the embedded Zookeeper.
- 
Field Details
SPRING_EMBEDDED_ZOOKEEPER_CONNECT
 
- 
DEFAULT_ZK_SESSION_TIMEOUT
public static final int DEFAULT_ZK_SESSION_TIMEOUT
 
- 
DEFAULT_ZK_CONNECTION_TIMEOUT
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT
 
 
- 
- 
Constructor Details
EmbeddedKafkaZKBroker
public EmbeddedKafkaZKBroker(int count)
- 
EmbeddedKafkaZKBroker
Create embedded Kafka brokers.
- Parameters:
- count - the number of brokers.
- controlledShutdown - passed into TestUtils.createBrokerConfig.
- topics - the topics to create (2 partitions per topic).
 
- 
EmbeddedKafkaZKBroker
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics)
Create embedded Kafka brokers listening on random ports.
- Parameters:
- count - the number of brokers.
- controlledShutdown - passed into TestUtils.createBrokerConfig.
- partitions - partitions per topic.
- topics - the topics to create.
 
 
- 
- 
Method Details
brokerProperties
Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor, etc.
- Specified by:
- brokerProperties in interface EmbeddedKafkaBroker
- Parameters:
- properties - the properties to use for configuring Kafka Broker(s).
- Returns:
- this for chaining configuration.
- See Also:
- KafkaConfig
 
 
- 
brokerProperty
Specify a broker property.
- Parameters:
- property - the property name.
- value - the value.
- Returns:
- the EmbeddedKafkaBroker.
 
- 
kafkaPorts
Set explicit ports on which the Kafka brokers will listen. Useful when running an embedded broker that you want to access from other processes.
- Specified by:
- kafkaPorts in interface EmbeddedKafkaBroker
- Parameters:
- ports - the ports.
- Returns:
- the EmbeddedKafkaBroker.
 
- 
brokerListProperty
Set the system property with this name to the list of broker addresses. Defaults to spring.kafka.bootstrap-servers for Spring Boot compatibility, since 3.0.10.
- Specified by:
- brokerListProperty in interface EmbeddedKafkaBroker
- Parameters:
- brokerListProperty - the brokerListProperty to set.
- Returns:
- this broker.
- Since:
- 2.3
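As an illustration of how test code might consume the broker list published under that system property, here is a minimal sketch. It assumes the value is a comma-separated list of host:port entries (the usual Kafka bootstrap-servers format); the class BrokerListReader, its methods, and the sample addresses are hypothetical and are not part of this API. The sketch sets the property itself rather than starting a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of spring-kafka-test.
final class BrokerListReader {

    // Default property name quoted in the description above.
    static final String DEFAULT_PROPERTY = "spring.kafka.bootstrap-servers";

    // Splits "host1:port1,host2:port2" into trimmed entries, skipping blanks.
    static List<String> parse(String brokerList) {
        List<String> addresses = new ArrayList<>();
        if (brokerList == null) {
            return addresses;
        }
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                addresses.add(trimmed);
            }
        }
        return addresses;
    }

    // Extracts the port from a single "host:port" entry.
    static int portOf(String address) {
        return Integer.parseInt(address.substring(address.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        // Assumption: in a real test the embedded broker would have set this property.
        System.setProperty(DEFAULT_PROPERTY, "127.0.0.1:9092,127.0.0.1:9093");
        for (String address : parse(System.getProperty(DEFAULT_PROPERTY))) {
            System.out.println(address + " -> port " + portOf(address));
        }
    }
}
```

If a different property name is passed to brokerListProperty, the same name would have to be used when reading the value back.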
 
- 
zkPort
Set an explicit port for the embedded Zookeeper.
- Parameters:
- port - the port.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.3
 
- 
getZkPort
public int getZkPort()
Get the port that the embedded Zookeeper is running on or will run on.
- Returns:
- the port.
- Since:
- 2.3
 
- 
setZkPort
public void setZkPort(int zkPort)
Set the port to run the embedded Zookeeper on (default random).
- Parameters:
- zkPort - the port.
- Since:
- 2.3
 
- 
adminTimeout
Set the timeout in seconds for admin operations (e.g. topic creation, close).
- Parameters:
- adminTimeout - the timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.8.5
 
- 
setAdminTimeout
public void setAdminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 10 seconds.
- Parameters:
- adminTimeout - the timeout.
- Since:
- 2.2
 
- 
zkConnectionTimeout
Set connection timeout for the client to the embedded Zookeeper.
- Parameters:
- zkConnectionTimeout - the connection timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.4
 
- 
zkSessionTimeout
Set session timeout for the client to the embedded Zookeeper.
- Parameters:
- zkSessionTimeout - the session timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.4
 
- 
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
- afterPropertiesSet in interface EmbeddedKafkaBroker
- Specified by:
- afterPropertiesSet in interface InitializingBean
 
- 
addTopics
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Specified by:
- addTopics in interface EmbeddedKafkaBroker
- Parameters:
- topicsToAdd - the topics.
 
- 
addTopics
public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s). The broker(s) must be running.
- Specified by:
- addTopics in interface EmbeddedKafkaBroker
- Parameters:
- topicsToAdd - the topics.
- Since:
- 2.2
 
- 
addTopicsWithResults
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Specified by:
- addTopicsWithResults in interface EmbeddedKafkaBroker
- Parameters:
- topicsToAdd - the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
 
- 
addTopicsWithResults
public Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
- Specified by:
- addTopicsWithResults in interface EmbeddedKafkaBroker
- Parameters:
- topicsToAdd - the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
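Because a null value in the returned map indicates success, callers need to check values rather than key presence. The following is a minimal sketch of that check against a hand-built Map<String,Exception>; the class TopicResults, the topic names, and the simulated exception are hypothetical, and the sketch does not call the broker.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of spring-kafka-test.
final class TopicResults {

    // Returns the topics whose entry carries an exception (a non-null value).
    static List<String> failedTopics(Map<String, Exception> results) {
        return results.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hand-built stand-in for a result map; LinkedHashMap permits null values.
        Map<String, Exception> results = new LinkedHashMap<>();
        results.put("orders", null); // null value: topic was created
        results.put("payments", new IllegalStateException("simulated failure"));
        System.out.println(failedTopics(results)); // prints [payments]
    }
}
```

Note that Map.of and similar immutable factories reject null values, so a test that builds such a map by hand needs a HashMap or LinkedHashMap.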
 
- 
doWithAdmin
Create an AdminClient; invoke the callback and reliably close the admin.
- Parameters:
- callback - the callback.
 
- 
doWithAdminFunction
Create an AdminClient; invoke the callback and reliably close the admin.
- Type Parameters:
- T - the function return type.
- Parameters:
- callback - the callback.
- Returns:
- the value returned by the callback.
- Since:
- 2.5.4
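The create, invoke, and reliably close sequence described for doWithAdmin and doWithAdminFunction can be sketched with try-with-resources. This is an illustration of the pattern only, not the class's actual implementation: the Admin interface, FakeAdmin, and the factory parameter are hypothetical stand-ins so that the sketch runs without Kafka on the classpath.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the pattern; not the spring-kafka-test implementation.
final class AdminCallbackSketch {

    // Stand-in for an admin client; close() is narrowed to throw no checked exception.
    interface Admin extends AutoCloseable {
        String clusterId();

        @Override
        void close();
    }

    // Fake admin that records whether it was closed.
    static final class FakeAdmin implements Admin {
        private boolean closed;

        @Override
        public String clusterId() {
            return "fake-cluster";
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Creates the admin, applies the callback, and closes the admin even if the callback throws.
    static <T> T doWithAdminFunction(Supplier<? extends Admin> factory, Function<Admin, T> callback) {
        try (Admin admin = factory.get()) {
            return callback.apply(admin);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        String id = doWithAdminFunction(() -> admin, Admin::clusterId);
        System.out.println(id + " closed=" + admin.isClosed()); // prints fake-cluster closed=true
    }
}
```

With the real method, the callback receives an org.apache.kafka.clients.admin.AdminClient, so the admin should not be retained or used after the callback returns.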
 
- 
destroy
public void destroy()
- Specified by:
- destroy in interface DisposableBean
- Specified by:
- destroy in interface EmbeddedKafkaBroker
 
- 
getTopics
Description copied from interface: EmbeddedKafkaBroker
Get the topics.
- Specified by:
- getTopics in interface EmbeddedKafkaBroker
- Returns:
- the topics.
 
- 
getKafkaServers
- 
getKafkaServer
public kafka.server.KafkaServer getKafkaServer(int id)
- 
getZookeeper
- 
getZooKeeperClient
public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
Return the ZooKeeperClient.
- Returns:
- the client.
- Since:
- 2.3.2
 
- 
getZookeeperConnectionString
- 
getBrokerAddress
- 
getBrokerAddresses
- 
getPartitionsPerTopic
public int getPartitionsPerTopic()
Description copied from interface: EmbeddedKafkaBroker
Get the configured number of partitions per topic.
- Specified by:
- getPartitionsPerTopic in interface EmbeddedKafkaBroker
- Returns:
- the partition count.
 
- 
bounce
- 
restart
- Throws:
- Exception
 
- 
getBrokersAsString
Description copied from interface: EmbeddedKafkaBroker
Get the bootstrap server addresses as a String.
- Specified by:
- getBrokersAsString in interface EmbeddedKafkaBroker
- Returns:
- the bootstrap servers.
 
- 
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer)
Subscribe a consumer to all the embedded topics.
- Specified by:
- consumeFromAllEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
- consumer - the consumer.
 
- 
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd)
Subscribe a consumer to all the embedded topics.
- Specified by:
- consumeFromAllEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
- consumer - the consumer.
- seekToEnd - true to seek to the end instead of the beginning.
- Since:
- 2.8.2
 
- 
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String topic)
Subscribe a consumer to one of the embedded topics.
- Specified by:
- consumeFromAnEmbeddedTopic in interface EmbeddedKafkaBroker
- Parameters:
- consumer - the consumer.
- topic - the topic.
 
- 
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String topic)
Subscribe a consumer to one of the embedded topics.
- Specified by:
- consumeFromAnEmbeddedTopic in interface EmbeddedKafkaBroker
- Parameters:
- consumer - the consumer.
- seekToEnd - true to seek to the end instead of the beginning.
- topic - the topic.
- Since:
- 2.8.2
 
- 
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Specified by:
- consumeFromEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
- consumer - the consumer.
- topicsToConsume - the topics.
- Throws:
- IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
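The documented rule, that requesting a topic outside the embedded topic list raises IllegalStateException, can be mirrored in test helpers that want to fail fast with the same contract. The sketch below is an assumption-laden illustration, not the library's implementation: the class EmbeddedTopicCheck, the method requireEmbedded, the topic names, and the message wording are all hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating the documented contract; not library code.
final class EmbeddedTopicCheck {

    // Throws IllegalStateException for the first requested topic that is not embedded.
    static void requireEmbedded(Set<String> embeddedTopics, String... topicsToConsume) {
        for (String topic : topicsToConsume) {
            if (!embeddedTopics.contains(topic)) {
                throw new IllegalStateException(
                        "topic '" + topic + "' is not in the embedded topic list");
            }
        }
    }

    public static void main(String[] args) {
        // Assumption: these names stand in for whatever getTopics() would report.
        Set<String> embedded = new LinkedHashSet<>(List.of("orders", "payments"));
        requireEmbedded(embedded, "orders"); // passes silently
        try {
            requireEmbedded(embedded, "refunds");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```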
 
- 
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Specified by:
- consumeFromEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
- consumer - the consumer.
- seekToEnd - true to seek to the end instead of the beginning.
- topicsToConsume - the topics.
- Throws:
- IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics.
- Since:
- 2.8.2
 
 
-