Package org.springframework.kafka.test
Class EmbeddedKafkaBroker
java.lang.Object
org.springframework.kafka.test.EmbeddedKafkaBroker
- All Implemented Interfaces:
DisposableBean, InitializingBean
A manager for embedded Kafka broker(s) and Zookeeper.
This class is intended for use in unit tests.
- Since:
- 2.2
- Author:
- Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
- static final class. Ported from Scala to allow setting the port.
-
Field Summary
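Fields (Modifier and Type, Field, Description)
- static final String BEAN_NAME
- static final String BROKER_LIST_PROPERTY. Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
- static final int DEFAULT_ZK_CONNECTION_TIMEOUT
- static final int DEFAULT_ZK_SESSION_TIMEOUT
- static final String SPRING_EMBEDDED_KAFKA_BROKERS
- static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT
-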
Constructor Summary
Constructors (Constructor, Description)
- EmbeddedKafkaBroker(int count)
- EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, String... topics). Create embedded Kafka brokers listening on random ports.
- EmbeddedKafkaBroker(int count, boolean controlledShutdown, String... topics). Create embedded Kafka brokers.
-
Method Summary
Methods (Modifier and Type, Method, Description)
- void addTopics(String... topicsToAdd). Add topics to the existing broker(s) using the configured number of partitions.
- void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd). Add topics to the existing broker(s).
- Map<String,Exception> addTopicsWithResults(String... topicsToAdd). Add topics to the existing broker(s) using the configured number of partitions.
- Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd). Add topics to the existing broker(s) and return a map of results.
- void afterPropertiesSet()
- void bounce(BrokerAddress brokerAddress)
- EmbeddedKafkaBroker brokerListProperty(String brokerListProperty). Set the system property with this name to the list of broker addresses.
- EmbeddedKafkaBroker brokerProperties(Map<String, String> properties). Specify the properties to configure the Kafka broker(s) before start.
- EmbeddedKafkaBroker brokerProperty(String property, Object value). Specify a broker property.
- void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer). Subscribe a consumer to all the embedded topics.
- void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd). Subscribe a consumer to all the embedded topics.
- void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String topic). Subscribe a consumer to one of the embedded topics.
- void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String topic). Subscribe a consumer to one of the embedded topics.
- void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume). Subscribe a consumer to one or more of the embedded topics.
- void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... topicsToConsume). Subscribe a consumer to one or more of the embedded topics.
- void destroy()
- void doWithAdmin(Consumer<org.apache.kafka.clients.admin.AdminClient> callback). Create an AdminClient; invoke the callback and reliably close the admin.
- <T> T doWithAdminFunction(Function<org.apache.kafka.clients.admin.AdminClient, T> callback). Create an AdminClient; invoke the callback and reliably close the admin.
- getBrokerAddress(int i)
- getBrokerAddresses()
- getBrokersAsString()
- kafka.server.KafkaServer getKafkaServer(int id)
- List<kafka.server.KafkaServer> getKafkaServers()
- int getPartitionsPerTopic()
- getTopics()
- int getZkPort(). Get the port that the embedded Zookeeper is running on or will run on.
- kafka.zookeeper.ZooKeeperClient getZooKeeperClient(). Return the ZooKeeperClient.
- getZookeeper()
- getZookeeperConnectionString()
- EmbeddedKafkaBroker kafkaPorts(int... ports). Set explicit ports on which the Kafka brokers will listen.
- void restart(int index)
- void setAdminTimeout(int adminTimeout). Set the timeout in seconds for admin operations (e.g. topic creation, close).
- void setZkPort(int zkPort). Set the port to run the embedded Zookeeper on (default random).
- EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout). Set connection timeout for the client to the embedded Zookeeper.
- EmbeddedKafkaBroker zkPort(int port). Set an explicit port for the embedded Zookeeper.
- EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout). Set session timeout for the client to the embedded Zookeeper.
-
Field Details
-
BEAN_NAME
-
SPRING_EMBEDDED_KAFKA_BROKERS
-
SPRING_EMBEDDED_ZOOKEEPER_CONNECT
-
BROKER_LIST_PROPERTY
Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
-
DEFAULT_ZK_SESSION_TIMEOUT
public static final int DEFAULT_ZK_SESSION_TIMEOUT
-
DEFAULT_ZK_CONNECTION_TIMEOUT
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT
-
-
Constructor Details
-
EmbeddedKafkaBroker
public EmbeddedKafkaBroker(int count) -
EmbeddedKafkaBroker
Create embedded Kafka brokers.
- Parameters:
count - the number of brokers.
controlledShutdown - passed into TestUtils.createBrokerConfig.
topics - the topics to create (2 partitions per topic).
-
EmbeddedKafkaBroker
Create embedded Kafka brokers listening on random ports.
- Parameters:
count - the number of brokers.
controlledShutdown - passed into TestUtils.createBrokerConfig.
partitions - partitions per topic.
topics - the topics to create.
-
-
Method Details
-
brokerProperties
Specify the properties used to configure the Kafka broker(s) before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor, etc.
- Parameters:
properties - the properties to use for configuring the Kafka broker(s).
- Returns:
- this, for chaining configuration.
- See Also:
-
KafkaConfig
-
brokerProperty
Specify a broker property.
- Parameters:
property - the property name.
value - the value.
- Returns:
- the EmbeddedKafkaBroker.
-
kafkaPorts
Set explicit ports on which the Kafka brokers will listen. Useful when running an embedded broker that you want to access from other processes.
- Parameters:
ports - the ports.
- Returns:
- the EmbeddedKafkaBroker.
-
zkPort
Set an explicit port for the embedded Zookeeper.
- Parameters:
port - the port.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.3
-
setAdminTimeout
public void setAdminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 30 seconds.
- Parameters:
adminTimeout - the timeout.
- Since:
- 2.2
-
brokerListProperty
Set the system property with this name to the list of broker addresses.
- Parameters:
brokerListProperty - the brokerListProperty to set.
- Returns:
- this broker.
- Since:
- 2.3
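As an illustration of how test code might consume the broker list once it has been published to a system property, the sketch below reads the property and copies it into client properties. It uses only the JDK; the class name `BrokerListLookup`, the property name `my.test.brokers`, and the sample addresses are hypothetical, and `bootstrap.servers` is assumed to be the client setting the test wants to populate.

```java
import java.util.Properties;

// Illustrative sketch only; not part of EmbeddedKafkaBroker.
class BrokerListLookup {

    /** Default property name, as quoted in the field documentation. */
    static final String DEFAULT_PROPERTY = "spring.embedded.kafka.brokers";

    /** Builds client properties whose bootstrap.servers value comes from the named system property. */
    static Properties clientProperties(String propertyName) {
        String brokers = System.getProperty(propertyName);
        if (brokers == null) {
            throw new IllegalStateException("System property '" + propertyName + "' is not set");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical: pretend the embedded broker published its addresses under a custom name.
        System.setProperty("my.test.brokers", "127.0.0.1:9092,127.0.0.1:9093");
        System.out.println(clientProperties("my.test.brokers").getProperty("bootstrap.servers"));
    }
}
```

Failing fast when the property is absent makes a misconfigured property name show up as a clear error rather than as a connection timeout.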
-
getZkPort
public int getZkPort()
Get the port that the embedded Zookeeper is running on or will run on.
- Returns:
- the port.
- Since:
- 2.3
-
setZkPort
public void setZkPort(int zkPort)
Set the port to run the embedded Zookeeper on (default random).
- Parameters:
zkPort - the port.
- Since:
- 2.3
-
zkConnectionTimeout
Set the connection timeout for the client to the embedded Zookeeper.
- Parameters:
zkConnectionTimeout - the connection timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.4
-
zkSessionTimeout
Set the session timeout for the client to the embedded Zookeeper.
- Parameters:
zkSessionTimeout - the session timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.4
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface InitializingBean
-
addTopics
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
-
addTopics
public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s). The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
- Since:
- 2.2
-
addTopicsWithResults
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
-
addTopicsWithResults
public Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
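Because a null value in the returned map indicates success, callers usually need to separate the failed topics from the successful ones. The following is a minimal JDK-only sketch of that filtering step; the class `TopicResults`, the helper `failures`, and the sample topic names are hypothetical, and the map is built by hand rather than obtained from a running broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; operates on a map shaped like the documented return value.
class TopicResults {

    /** Returns only the entries whose value is non-null, i.e. the topics that failed. */
    static Map<String, Exception> failures(Map<String, Exception> results) {
        Map<String, Exception> failed = new TreeMap<>();
        results.forEach((topic, error) -> {
            if (error != null) {
                failed.put(topic, error);
            }
        });
        return failed;
    }

    public static void main(String[] args) {
        // HashMap is used because it permits null values; Map.of(...) does not.
        Map<String, Exception> results = new HashMap<>();
        results.put("orders", null); // null value: topic was created successfully
        results.put("payments", new IllegalStateException("topic already exists"));
        System.out.println(failures(results).keySet()); // prints [payments]
    }
}
```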
-
doWithAdmin
Create an AdminClient; invoke the callback and reliably close the admin.
- Parameters:
callback - the callback.
-
doWithAdminFunction
Create an AdminClient; invoke the callback and reliably close the admin.
- Type Parameters:
T - the function return type.
- Parameters:
callback - the callback.
- Returns:
- the result of the callback.
- Since:
- 2.5.4
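The "create, invoke the callback, reliably close" contract described for doWithAdmin and doWithAdminFunction can be pictured with try-with-resources. The sketch below is not the library's implementation: `FakeAdmin`, `QuietCloseable`, and `doWith` are hypothetical stand-ins so the pattern can run with the JDK alone.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative sketch only; FakeAdmin is a hypothetical stand-in for an admin client.
class WithAdminSketch {

    /** AutoCloseable whose close() declares no checked exception. */
    interface QuietCloseable extends AutoCloseable {
        @Override
        void close();
    }

    /** Records whether close() ran so the behaviour can be observed. */
    static final class FakeAdmin implements QuietCloseable {
        boolean closed;

        String clusterId() {
            return "cluster-1";
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Creates the resource, applies the callback, and closes the resource even if the callback throws. */
    static <R extends QuietCloseable, T> T doWith(Supplier<R> factory, Function<R, T> callback) {
        try (R resource = factory.get()) {
            return callback.apply(resource);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        String id = doWith(() -> admin, FakeAdmin::clusterId);
        System.out.println(id + " closed=" + admin.closed); // prints cluster-1 closed=true
    }
}
```

The function-style variant lets a test return a value computed from the admin without holding on to the client after it has been closed.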
-
destroy
public void destroy()
- Specified by:
destroy in interface DisposableBean
-
getTopics
-
getKafkaServers
-
getKafkaServer
public kafka.server.KafkaServer getKafkaServer(int id) -
getZookeeper
-
getZooKeeperClient
public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
Return the ZooKeeperClient.
- Returns:
- the client.
- Since:
- 2.3.2
-
getZookeeperConnectionString
-
getBrokerAddress
-
getBrokerAddresses
-
getPartitionsPerTopic
public int getPartitionsPerTopic() -
bounce
-
restart
- Throws:
Exception
-
getBrokersAsString
-
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer)
Subscribe a consumer to all the embedded topics.
- Parameters:
consumer - the consumer.
-
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd)
Subscribe a consumer to all the embedded topics.
- Parameters:
consumer - the consumer.
seekToEnd - true to seek to the end instead of the beginning.
- Since:
- 2.8.2
-
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String topic)
Subscribe a consumer to one of the embedded topics.
- Parameters:
consumer - the consumer.
topic - the topic.
-
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String topic)
Subscribe a consumer to one of the embedded topics.
- Parameters:
consumer - the consumer.
seekToEnd - true to seek to the end instead of the beginning.
topic - the topic.
- Since:
- 2.8.2
-
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Parameters:
consumer - the consumer.
topicsToConsume - the topics.
- Throws:
IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
-
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Parameters:
consumer - the consumer.
seekToEnd - true to seek to the end instead of the beginning.
topicsToConsume - the topics.
- Throws:
IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics.
- Since:
- 2.8.2
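The documented IllegalStateException means that requested topics are checked against the embedded topic list before subscribing. A rough JDK-only sketch of such a check follows; it is an assumption about the shape of the validation, not the library's code, and `EmbeddedTopicCheck`, `requireEmbedded`, and the topic names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative sketch only; mirrors the documented contract, not the actual implementation.
class EmbeddedTopicCheck {

    /** Throws IllegalStateException when any requested topic is not in the embedded set. */
    static void requireEmbedded(Set<String> embeddedTopics, String... topicsToConsume) {
        Set<String> unknown = new LinkedHashSet<>();
        for (String topic : topicsToConsume) {
            if (!embeddedTopics.contains(topic)) {
                unknown.add(topic);
            }
        }
        if (!unknown.isEmpty()) {
            throw new IllegalStateException("Topic(s) not in the embedded topic list: " + unknown);
        }
    }

    public static void main(String[] args) {
        Set<String> embedded = new HashSet<>(Arrays.asList("orders", "payments"));
        requireEmbedded(embedded, "orders"); // passes silently
        try {
            requireEmbedded(embedded, "orders", "refunds");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a test, this failure mode typically points to a topic that was never passed to the constructor or to addTopics before the consumer was subscribed.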
-