Package org.springframework.kafka.test
Class EmbeddedKafkaKraftBroker
java.lang.Object
org.springframework.kafka.test.EmbeddedKafkaKraftBroker
- All Implemented Interfaces:
DisposableBean, InitializingBean, EmbeddedKafkaBroker
Embedded Kafka broker(s) using KRaft.
This class is intended for use in unit tests.
- Since:
- 3.1
- Author:
- Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski, Adrian Chlebosz, Soby Chacko, Sanghyeok An, Wouter Coekaerts
-
Field Summary
Modifier and Type, Field, Description
static final String BROKER_LIST_PROPERTY
- Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
static final int DEFAULT_ADMIN_TIMEOUT
Fields inherited from interface org.springframework.kafka.test.EmbeddedKafkaBroker
BEAN_NAME, BROKER_NEEDED, LOOPBACK, SPRING_EMBEDDED_KAFKA_BROKERS
-
Constructor Summary
Constructor, Description
EmbeddedKafkaKraftBroker(int count, int partitions, String... topics)
- Create embedded Kafka brokers listening on random ports.
-
Method Summary
Modifier and Type, Method, Description
void addTopics(String... topicsToAdd)
- Add topics to the existing broker(s) using the configured number of partitions.
void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
- Add topics to the existing broker(s).
Map<String,Exception> addTopicsWithResults(String... topicsToAdd)
- Add topics to the existing broker(s) using the configured number of partitions.
Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
- Add topics to the existing broker(s) and return a map of results.
adminTimeout(int adminTimeout)
- Set the timeout in seconds for admin operations (e.g. topic creation, close).
void afterPropertiesSet()
brokerListProperty(String brokerListProperty)
- Set the system property with this name to the list of broker addresses.
brokerProperties(Map<String, String> properties)
- Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor, etc.
brokerProperty(String property, Object value)
- Specify a broker property.
void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer)
- Subscribe a consumer to all the embedded topics.
void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd)
- Subscribe a consumer to all the embedded topics.
void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String topic)
- Subscribe a consumer to one of the embedded topics.
void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String topic)
- Subscribe a consumer to one of the embedded topics.
void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume)
- Subscribe a consumer to one or more of the embedded topics.
void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... topicsToConsume)
- Subscribe a consumer to one or more of the embedded topics.
void destroy()
void doWithAdmin(Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
- Create an AdminClient; invoke the callback and reliably close the admin.
<T> T doWithAdminFunction(Function<org.apache.kafka.clients.admin.AdminClient, T> callback)
- Create an AdminClient; invoke the callback and reliably close the admin.
String getBrokersAsString()
- Get the bootstrap server addresses as a String.
kafka.testkit.KafkaClusterTestKit getCluster()
int getPartitionsPerTopic()
- Get the configured number of partitions per topic.
getTopics()
- Get the topics.
kafkaPorts(int... ports)
- IMPORTANT: It is not possible to configure custom ports when using KRaft based EmbeddedKafka.
void setAdminTimeout(int adminTimeout)
- Set the timeout in seconds for admin operations (e.g. topic creation, close).
-
Field Details
-
BROKER_LIST_PROPERTY
Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
-
DEFAULT_ADMIN_TIMEOUT
public static final int DEFAULT_ADMIN_TIMEOUT
-
-
Constructor Details
-
EmbeddedKafkaKraftBroker
Create embedded Kafka brokers listening on random ports.
- Parameters:
count
- the number of brokers.
partitions
- partitions per topic.
topics
- the topics to create.
-
-
Method Details
-
brokerProperties
Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor, etc.
- Specified by:
brokerProperties in interface EmbeddedKafkaBroker
- Parameters:
properties
- the properties to use for configuring Kafka Broker(s).
- Returns:
- this for chaining configuration.
- See Also:
-
KafkaConfig
-
brokerProperty
Specify a broker property.
- Parameters:
property
- the property name.
value
- the value.
- Returns:
- the EmbeddedKafkaKraftBroker.
-
kafkaPorts
IMPORTANT: It is not possible to configure custom ports when using the KRaft-based EmbeddedKafka. The KafkaClusterTestKit does not currently support setting custom ports, so this property is not used. The general purpose of this method (not applicable here) is to set explicit ports on which the Kafka brokers will listen, which is useful when running an embedded broker that you want to access from other processes.
- Specified by:
kafkaPorts in interface EmbeddedKafkaBroker
- Parameters:
ports
- the ports.
- Returns:
- the EmbeddedKafkaKraftBroker.
-
brokerListProperty
Set the system property with this name to the list of broker addresses.
- Specified by:
brokerListProperty in interface EmbeddedKafkaBroker
- Parameters:
brokerListProperty
- the brokerListProperty to set.
- Returns:
- this broker.
- Since:
- 2.3
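A test that runs outside the Spring context can pick the broker addresses up from that system property. The following is a minimal sketch, not part of spring-kafka-test: BrokerAddresses and fromSystemProperty are hypothetical names, and it assumes the property value is a comma-separated list of host:port entries (the usual Kafka bootstrap-servers format). The main method simulates the property being set rather than starting a broker.

```java
import java.util.ArrayList;
import java.util.List;

final class BrokerAddresses {

    // Default property name mentioned in the BROKER_LIST_PROPERTY description.
    static final String DEFAULT_PROPERTY = "spring.embedded.kafka.brokers";

    private BrokerAddresses() {
    }

    /**
     * Read the named system property and split it into individual addresses.
     * Assumption: the value is a comma-separated list such as "host1:port1,host2:port2".
     * Returns an empty list when the property is not set or is blank.
     */
    static List<String> fromSystemProperty(String propertyName) {
        List<String> addresses = new ArrayList<>();
        String value = System.getProperty(propertyName);
        if (value == null || value.trim().isEmpty()) {
            return addresses;
        }
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                addresses.add(trimmed);
            }
        }
        return addresses;
    }

    public static void main(String[] args) {
        // Simulates what a broker configured with brokerListProperty("my.test.brokers") would publish.
        System.setProperty("my.test.brokers", "127.0.0.1:50001,127.0.0.1:50002");
        System.out.println(fromSystemProperty("my.test.brokers"));
        // The default property is simply absent here, so this yields an empty list.
        System.out.println(fromSystemProperty(DEFAULT_PROPERTY));
    }
}
```

Returning an empty list for a missing property lets the caller decide whether an absent broker is an error or a reason to skip the test.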
-
adminTimeout
Description copied from interface: EmbeddedKafkaBroker
Set the timeout in seconds for admin operations (e.g. topic creation, close).
- Specified by:
adminTimeout in interface EmbeddedKafkaBroker
- Parameters:
adminTimeout
- the timeout.
- Returns:
- the EmbeddedKafkaBroker.
-
setAdminTimeout
public void setAdminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 10 seconds.
- Parameters:
adminTimeout
- the timeout.
- Since:
- 2.2
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface EmbeddedKafkaBroker
- Specified by:
afterPropertiesSet in interface InitializingBean
-
destroy
public void destroy()
- Specified by:
destroy in interface DisposableBean
- Specified by:
destroy in interface EmbeddedKafkaBroker
-
addTopics
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Specified by:
addTopics in interface EmbeddedKafkaBroker
- Parameters:
topicsToAdd
- the topics.
-
addTopics
public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s). The broker(s) must be running.
- Specified by:
addTopics in interface EmbeddedKafkaBroker
- Parameters:
topicsToAdd
- the topics.
- Since:
- 2.2
-
addTopicsWithResults
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Specified by:
addTopicsWithResults in interface EmbeddedKafkaBroker
- Parameters:
topicsToAdd
- the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
-
addTopicsWithResults
public Map<String,Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
- Specified by:
addTopicsWithResults in interface EmbeddedKafkaBroker
- Parameters:
topicsToAdd
- the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
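Because a null value in the returned map means the topic was created successfully, callers usually want only the non-null entries. The following is a minimal sketch of that filtering. TopicResults and failures are hypothetical helper names, not part of spring-kafka-test, and the map in main is a hand-built stand-in for the value returned by addTopicsWithResults. A plain loop is used because Collectors.toMap rejects null values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class TopicResults {

    private TopicResults() {
    }

    /**
     * Keep only the entries with a non-null exception, i.e. the topics that failed.
     * The result is sorted by topic name for stable reporting.
     */
    static Map<String, Exception> failures(Map<String, Exception> results) {
        Map<String, Exception> failed = new TreeMap<>();
        for (Map.Entry<String, Exception> entry : results.entrySet()) {
            if (entry.getValue() != null) {
                failed.put(entry.getKey(), entry.getValue());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Stand-in for: Map<String, Exception> results = broker.addTopicsWithResults("orders", "payments");
        Map<String, Exception> results = new LinkedHashMap<>();
        results.put("orders", null); // null value: topic created
        results.put("payments", new IllegalStateException("simulated failure"));

        Map<String, Exception> failed = failures(results);
        System.out.println("failed topics: " + failed.keySet());
    }
}
```

In a test, asserting that failures(...) is empty gives a single check that also reports which topics could not be created.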
-
doWithAdmin
Create an AdminClient; invoke the callback and reliably close the admin.
- Parameters:
callback
- the callback.
-
doWithAdminFunction
Create an AdminClient; invoke the callback and reliably close the admin.
- Type Parameters:
T
- the function return type.
- Parameters:
callback
- the callback.
- Returns:
- the value returned by the callback.
- Since:
- 2.5.4
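The "create, invoke the callback, reliably close" contract described for doWithAdmin and doWithAdminFunction can be illustrated with a generic try-with-resources helper. This is a sketch of the pattern only, not the library's implementation: WithResource, apply, and FakeAdmin are hypothetical names, and FakeAdmin stands in for an AdminClient so the example runs without a broker.

```java
import java.util.function.Function;
import java.util.function.Supplier;

final class WithResource {

    private WithResource() {
    }

    /** Hypothetical stand-in for an admin client; records whether it was closed. */
    static final class FakeAdmin implements AutoCloseable {

        boolean closed;

        int topicCount() {
            return 3;
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }

    /**
     * Create the resource, pass it to the callback, and close it whether the
     * callback returns normally or throws.
     */
    static <R extends AutoCloseable, T> T apply(Supplier<R> factory, Function<R, T> callback) {
        try (R resource = factory.get()) {
            return callback.apply(resource);
        }
        catch (RuntimeException ex) {
            throw ex; // callback failures propagate unchanged
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to close resource", ex);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        Integer count = apply(() -> admin, FakeAdmin::topicCount);
        System.out.println("topics=" + count + ", closed=" + admin.closed);
    }
}
```

One consequence of this pattern is that the callback should return plain results rather than the admin itself, since the admin is closed by the time the caller sees the return value.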
-
getTopics
Description copied from interface: EmbeddedKafkaBroker
Get the topics.
- Specified by:
getTopics in interface EmbeddedKafkaBroker
- Returns:
- the topics.
-
getPartitionsPerTopic
public int getPartitionsPerTopic()
Description copied from interface: EmbeddedKafkaBroker
Get the configured number of partitions per topic.
- Specified by:
getPartitionsPerTopic in interface EmbeddedKafkaBroker
- Returns:
- the partition count.
-
getBrokersAsString
Description copied from interface: EmbeddedKafkaBroker
Get the bootstrap server addresses as a String.
- Specified by:
getBrokersAsString in interface EmbeddedKafkaBroker
- Returns:
- the bootstrap servers.
-
getCluster
public kafka.testkit.KafkaClusterTestKit getCluster()
-
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer)
Subscribe a consumer to all the embedded topics.
- Specified by:
consumeFromAllEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
consumer
- the consumer.
-
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd)
Subscribe a consumer to all the embedded topics.
- Specified by:
consumeFromAllEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
seekToEnd
- true to seek to the end instead of the beginning.
consumer
- the consumer.
- Since:
- 2.8.2
-
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String topic)
Subscribe a consumer to one of the embedded topics.
- Specified by:
consumeFromAnEmbeddedTopic in interface EmbeddedKafkaBroker
- Parameters:
consumer
- the consumer.
topic
- the topic.
-
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String topic)
Subscribe a consumer to one of the embedded topics.
- Specified by:
consumeFromAnEmbeddedTopic in interface EmbeddedKafkaBroker
- Parameters:
consumer
- the consumer.
seekToEnd
- true to seek to the end instead of the beginning.
topic
- the topic.
- Since:
- 2.8.2
-
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Specified by:
consumeFromEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
consumer
- the consumer.
topicsToConsume
- the topics.
- Throws:
IllegalStateException
- if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
-
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Specified by:
consumeFromEmbeddedTopics in interface EmbeddedKafkaBroker
- Parameters:
consumer
- the consumer.
topicsToConsume
- the topics.
seekToEnd
- true to seek to the end instead of the beginning.
- Throws:
IllegalStateException
- if you attempt to consume from a topic that is not in the list of embedded topics.
- Since:
- 2.8.2
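The IllegalStateException contract above means a misspelled topic name only surfaces when the consumer is subscribed. The following is a minimal sketch of an equivalent up-front check that a test could run first. EmbeddedTopicCheck and requireEmbedded are hypothetical names, not part of spring-kafka-test, and the sketch assumes the value returned by getTopics() can be treated as a collection of topic names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class EmbeddedTopicCheck {

    private EmbeddedTopicCheck() {
    }

    /**
     * Throw IllegalStateException if any requested topic is not among the embedded topics.
     * All missing names are reported in one message.
     */
    static void requireEmbedded(Collection<String> embeddedTopics, String... requested) {
        List<String> missing = new ArrayList<>();
        for (String topic : requested) {
            if (!embeddedTopics.contains(topic)) {
                missing.add(topic);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Not embedded topics: " + missing);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the topics the broker was created with, e.g. broker.getTopics().
        List<String> embedded = Arrays.asList("orders", "payments");

        requireEmbedded(embedded, "orders"); // passes silently
        try {
            requireEmbedded(embedded, "orders", "refunds");
        }
        catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```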
-