Package org.springframework.kafka.test
Class EmbeddedKafkaBroker
- java.lang.Object
-
- org.springframework.kafka.test.EmbeddedKafkaBroker
-
- All Implemented Interfaces:
org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean
public class EmbeddedKafkaBroker extends java.lang.Object implements org.springframework.beans.factory.InitializingBean, org.springframework.beans.factory.DisposableBean
A manager for embedded Kafka broker(s) and Zookeeper. This class is intended for use in unit tests.
- Since:
- 2.2
- Author:
- Marius Bogoevici, Artem Bilan, Gary Russell, Kamill Sokol, Elliot Kennedy, Nakul Mishra, Pawel Lozinski, Adrian Chlebosz
-
-
Nested Class Summary
static class EmbeddedKafkaBroker.EmbeddedZookeeper
Ported from Scala to allow setting the port.
-
Field Summary
static java.lang.String BEAN_NAME
static java.lang.String BROKER_LIST_PROPERTY
Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
static int DEFAULT_ADMIN_TIMEOUT
static int DEFAULT_ZK_CONNECTION_TIMEOUT
static int DEFAULT_ZK_SESSION_TIMEOUT
static java.lang.String SPRING_EMBEDDED_KAFKA_BROKERS
static java.lang.String SPRING_EMBEDDED_ZOOKEEPER_CONNECT
-
Constructor Summary
EmbeddedKafkaBroker(int count)
EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, java.lang.String... topics)
Create embedded Kafka brokers listening on random ports.
EmbeddedKafkaBroker(int count, boolean controlledShutdown, java.lang.String... topics)
Create embedded Kafka brokers.
-
Method Summary
void addTopics(java.lang.String... topicsToAdd)
Add topics to the existing broker(s) using the configured number of partitions.
void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s).
java.util.Map<java.lang.String,java.lang.Exception> addTopicsWithResults(java.lang.String... topicsToAdd)
Add topics to the existing broker(s) using the configured number of partitions.
java.util.Map<java.lang.String,java.lang.Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s) and return a map of results.
EmbeddedKafkaBroker adminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close).
void afterPropertiesSet()
void bounce(BrokerAddress brokerAddress)
EmbeddedKafkaBroker brokerListProperty(java.lang.String brokerListProperty)
Set the system property with this name to the list of broker addresses.
EmbeddedKafkaBroker brokerProperties(java.util.Map<java.lang.String,java.lang.String> properties)
Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor etc.
EmbeddedKafkaBroker brokerProperty(java.lang.String property, java.lang.Object value)
Specify a broker property.
void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Subscribe a consumer to all the embedded topics.
void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd)
Subscribe a consumer to all the embedded topics.
void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, java.lang.String topic)
Subscribe a consumer to one of the embedded topics.
void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String topic)
Subscribe a consumer to one of the embedded topics.
void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, java.lang.String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
void destroy()
void doWithAdmin(java.util.function.Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
Create an AdminClient; invoke the callback and reliably close the admin.
<T> T doWithAdminFunction(java.util.function.Function<org.apache.kafka.clients.admin.AdminClient,T> callback)
Create an AdminClient; invoke the callback and reliably close the admin.
BrokerAddress getBrokerAddress(int i)
BrokerAddress[] getBrokerAddresses()
java.lang.String getBrokersAsString()
kafka.server.KafkaServer getKafkaServer(int id)
java.util.List<kafka.server.KafkaServer> getKafkaServers()
int getPartitionsPerTopic()
java.util.Set<java.lang.String> getTopics()
int getZkPort()
Get the port that the embedded Zookeeper is running on or will run on.
EmbeddedKafkaBroker.EmbeddedZookeeper getZookeeper()
kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
Return the ZooKeeperClient.
java.lang.String getZookeeperConnectionString()
EmbeddedKafkaBroker kafkaPorts(int... ports)
Set explicit ports on which the kafka brokers will listen.
void restart(int index)
void setAdminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close).
void setZkPort(int zkPort)
Set the port to run the embedded Zookeeper on (default random).
EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout)
Set connection timeout for the client to the embedded Zookeeper.
EmbeddedKafkaBroker zkPort(int port)
Set an explicit port for the embedded Zookeeper.
EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout)
Set session timeout for the client to the embedded Zookeeper.
-
-
-
Field Detail
-
BEAN_NAME
public static final java.lang.String BEAN_NAME
- See Also:
- Constant Field Values
-
SPRING_EMBEDDED_KAFKA_BROKERS
public static final java.lang.String SPRING_EMBEDDED_KAFKA_BROKERS
- See Also:
- Constant Field Values
-
SPRING_EMBEDDED_ZOOKEEPER_CONNECT
public static final java.lang.String SPRING_EMBEDDED_ZOOKEEPER_CONNECT
- See Also:
- Constant Field Values
-
BROKER_LIST_PROPERTY
public static final java.lang.String BROKER_LIST_PROPERTY
Set the value of this property to a property name that should be set to the list of embedded broker addresses instead of "spring.embedded.kafka.brokers".
- See Also:
- Constant Field Values
-
DEFAULT_ADMIN_TIMEOUT
public static final int DEFAULT_ADMIN_TIMEOUT
- See Also:
- Constant Field Values
-
DEFAULT_ZK_SESSION_TIMEOUT
public static final int DEFAULT_ZK_SESSION_TIMEOUT
- See Also:
- Constant Field Values
-
DEFAULT_ZK_CONNECTION_TIMEOUT
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
EmbeddedKafkaBroker
public EmbeddedKafkaBroker(int count)
-
EmbeddedKafkaBroker
public EmbeddedKafkaBroker(int count, boolean controlledShutdown, java.lang.String... topics)
Create embedded Kafka brokers.
- Parameters:
count - the number of brokers.
controlledShutdown - passed into TestUtils.createBrokerConfig.
topics - the topics to create (2 partitions per topic).
-
EmbeddedKafkaBroker
public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, java.lang.String... topics)
Create embedded Kafka brokers listening on random ports.
- Parameters:
count - the number of brokers.
controlledShutdown - passed into TestUtils.createBrokerConfig.
partitions - partitions per topic.
topics - the topics to create.
-
-
Method Detail
-
brokerProperties
public EmbeddedKafkaBroker brokerProperties(java.util.Map<java.lang.String,java.lang.String> properties)
Specify the properties to configure Kafka Broker before start, e.g. auto.create.topics.enable, transaction.state.log.replication.factor etc.
- Parameters:
properties - the properties to use for configuring Kafka Broker(s).
- Returns:
- this for chaining configuration.
- See Also:
KafkaConfig
-
brokerProperty
public EmbeddedKafkaBroker brokerProperty(java.lang.String property, java.lang.Object value)
Specify a broker property.
- Parameters:
property - the property name.
value - the value.
- Returns:
- the EmbeddedKafkaBroker.
-
kafkaPorts
public EmbeddedKafkaBroker kafkaPorts(int... ports)
Set explicit ports on which the Kafka brokers will listen. Useful when running an embedded broker that you want to access from other processes.
- Parameters:
ports - the ports.
- Returns:
- the EmbeddedKafkaBroker.
-
brokerListProperty
public EmbeddedKafkaBroker brokerListProperty(java.lang.String brokerListProperty)
Set the system property with this name to the list of broker addresses.
- Parameters:
brokerListProperty - the brokerListProperty to set.
- Returns:
- this broker.
- Since:
- 2.3
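As an illustration of how test code might consume the property described above, the following is a minimal, stdlib-only sketch. It assumes the property value is a comma-separated list of host:port addresses, which this page does not state explicitly. The class name BrokerListLookup, the property name my.bootstrap.servers and the port values are hypothetical; only the default name "spring.embedded.kafka.brokers" comes from the BROKER_LIST_PROPERTY description.

```java
import java.util.ArrayList;
import java.util.List;

final class BrokerListLookup {

    // Default property name quoted in the BROKER_LIST_PROPERTY description.
    static final String DEFAULT_PROPERTY = "spring.embedded.kafka.brokers";

    /**
     * Read the broker list from the named system property, falling back to the
     * default property name when no custom name is given.
     * Assumption: the value is a comma-separated list of host:port entries.
     */
    static List<String> brokers(String customPropertyName) {
        String name = (customPropertyName == null || customPropertyName.isEmpty())
                ? DEFAULT_PROPERTY
                : customPropertyName;
        String value = System.getProperty(name, "");
        List<String> result = new ArrayList<>();
        for (String address : value.split(",")) {
            String trimmed = address.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulate what a broker configured with brokerListProperty("my.bootstrap.servers")
        // would have published; the addresses are made up for the demonstration.
        System.setProperty("my.bootstrap.servers", "127.0.0.1:50001,127.0.0.1:50002");
        System.out.println(brokers("my.bootstrap.servers"));
    }
}
```

Passing null or an empty name falls back to the default property, which mirrors the "instead of" wording in the field description.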
-
zkPort
public EmbeddedKafkaBroker zkPort(int port)
Set an explicit port for the embedded Zookeeper.
- Parameters:
port - the port.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.3
-
getZkPort
public int getZkPort()
Get the port that the embedded Zookeeper is running on or will run on.
- Returns:
- the port.
- Since:
- 2.3
-
setZkPort
public void setZkPort(int zkPort)
Set the port to run the embedded Zookeeper on (default random).
- Parameters:
zkPort - the port.
- Since:
- 2.3
-
adminTimeout
public EmbeddedKafkaBroker adminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close).
- Parameters:
adminTimeout - the timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.8.5
-
setAdminTimeout
public void setAdminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g. topic creation, close). Default 10 seconds.
- Parameters:
adminTimeout - the timeout.
- Since:
- 2.2
-
zkConnectionTimeout
public EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout)
Set the connection timeout for the client to the embedded Zookeeper.
- Parameters:
zkConnectionTimeout - the connection timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.4
-
zkSessionTimeout
public EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout)
Set the session timeout for the client to the embedded Zookeeper.
- Parameters:
zkSessionTimeout - the session timeout.
- Returns:
- the EmbeddedKafkaBroker.
- Since:
- 2.4
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
-
addTopics
public void addTopics(java.lang.String... topicsToAdd)
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
-
addTopics
public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s). The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
- Since:
- 2.2
-
addTopicsWithResults
public java.util.Map<java.lang.String,java.lang.Exception> addTopicsWithResults(java.lang.String... topicsToAdd)
Add topics to the existing broker(s) using the configured number of partitions. The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
-
addTopicsWithResults
public java.util.Map<java.lang.String,java.lang.Exception> addTopicsWithResults(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s) and return a map of results. The broker(s) must be running.
- Parameters:
topicsToAdd - the topics.
- Returns:
- the results; null values indicate success.
- Since:
- 2.5.4
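Because a null value in the returned map indicates success, callers have to filter on the values rather than on the presence of a key. The sketch below shows one way to do that with the standard library only. It assumes the map keys are topic names, which the signature suggests but this page does not state. The class name TopicResultsSketch and the sample topics are hypothetical, and the map is built by hand as a stand-in for a real addTopicsWithResults call.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class TopicResultsSketch {

    /** Keep only the entries with a non-null exception, i.e. the topics that failed. */
    static Map<String, Exception> failures(Map<String, Exception> results) {
        Map<String, Exception> failed = new TreeMap<>();
        for (Map.Entry<String, Exception> entry : results.entrySet()) {
            if (entry.getValue() != null) {
                failed.put(entry.getKey(), entry.getValue());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Hand-built stand-in for the map returned by addTopicsWithResults(...).
        Map<String, Exception> results = new LinkedHashMap<>();
        results.put("orders", null); // null value: topic was created
        results.put("payments", new IllegalStateException("simulated failure"));
        System.out.println("Failed topics: " + failures(results).keySet());
    }
}
```

A test could then assert that failures(...) is empty before going on to produce or consume records.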
-
doWithAdmin
public void doWithAdmin(java.util.function.Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
Create an AdminClient; invoke the callback and reliably close the admin.
- Parameters:
callback - the callback.
-
doWithAdminFunction
public <T> T doWithAdminFunction(java.util.function.Function<org.apache.kafka.clients.admin.AdminClient,T> callback)
Create an AdminClient; invoke the callback and reliably close the admin.
- Type Parameters:
T - the function return type.
- Parameters:
callback - the callback.
- Returns:
- the result of the callback function.
- Since:
- 2.5.4
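The "create, invoke the callback, reliably close" contract described for doWithAdmin and doWithAdminFunction is an instance of the general loan pattern. The sketch below illustrates that pattern with a generic AutoCloseable; it is not the library's implementation. FakeAdmin, doWith and LoanPatternSketch are hypothetical names introduced for the example, and no Kafka classes are involved.

```java
import java.util.function.Function;
import java.util.function.Supplier;

final class LoanPatternSketch {

    /** Hypothetical stand-in for an admin client; it only records whether it was closed. */
    static final class FakeAdmin implements AutoCloseable {
        boolean closed;

        String describe() {
            return "cluster-info";
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }

    /**
     * Create the resource, apply the callback and return its result.
     * try-with-resources closes the resource even when the callback throws.
     */
    static <R extends AutoCloseable, T> T doWith(Supplier<R> factory, Function<R, T> callback) {
        try (R resource = factory.get()) {
            return callback.apply(resource);
        } catch (RuntimeException e) {
            throw e; // propagate callback failures unchanged
        } catch (Exception e) {
            // Only close() can raise a checked exception here.
            throw new IllegalStateException("Failed to close resource", e);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        String info = doWith(() -> admin, FakeAdmin::describe);
        System.out.println(info + ", closed=" + admin.closed);
    }
}
```

The caller never holds a reference that outlives the callback, so a forgotten close() cannot leak a client between tests.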
-
destroy
public void destroy()
- Specified by:
destroy in interface org.springframework.beans.factory.DisposableBean
-
getTopics
public java.util.Set<java.lang.String> getTopics()
-
getKafkaServers
public java.util.List<kafka.server.KafkaServer> getKafkaServers()
-
getKafkaServer
public kafka.server.KafkaServer getKafkaServer(int id)
-
getZookeeper
public EmbeddedKafkaBroker.EmbeddedZookeeper getZookeeper()
-
getZooKeeperClient
public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
Return the ZooKeeperClient.
- Returns:
- the client.
- Since:
- 2.3.2
-
getZookeeperConnectionString
public java.lang.String getZookeeperConnectionString()
-
getBrokerAddress
public BrokerAddress getBrokerAddress(int i)
-
getBrokerAddresses
public BrokerAddress[] getBrokerAddresses()
-
getPartitionsPerTopic
public int getPartitionsPerTopic()
-
bounce
public void bounce(BrokerAddress brokerAddress)
-
restart
public void restart(int index) throws java.lang.Exception
- Throws:
java.lang.Exception
-
getBrokersAsString
public java.lang.String getBrokersAsString()
-
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Subscribe a consumer to all the embedded topics.
- Parameters:
consumer - the consumer.
-
consumeFromAllEmbeddedTopics
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd)
Subscribe a consumer to all the embedded topics.
- Parameters:
consumer - the consumer.
seekToEnd - true to seek to the end instead of the beginning.
- Since:
- 2.8.2
-
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String topic)
Subscribe a consumer to one of the embedded topics.
- Parameters:
consumer - the consumer.
topic - the topic.
-
consumeFromAnEmbeddedTopic
public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, java.lang.String topic)
Subscribe a consumer to one of the embedded topics.
- Parameters:
consumer - the consumer.
seekToEnd - true to seek to the end instead of the beginning.
topic - the topic.
- Since:
- 2.8.2
-
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Parameters:
consumer - the consumer.
topicsToConsume - the topics.
- Throws:
java.lang.IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics (since 2.3.4).
-
consumeFromEmbeddedTopics
public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, boolean seekToEnd, java.lang.String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
- Parameters:
consumer - the consumer.
seekToEnd - true to seek to the end instead of the beginning.
topicsToConsume - the topics.
- Throws:
java.lang.IllegalStateException - if you attempt to consume from a topic that is not in the list of embedded topics.
- Since:
- 2.8.2
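The IllegalStateException contract above can be pictured as a subset check between the requested topics and the embedded topics. The following sketch shows that check in isolation, using only the standard library. It is an illustration of the documented behaviour, not the library's code. TopicCheckSketch and requireEmbedded are hypothetical names, and the assumption is that a set such as the one returned by getTopics() represents the list of embedded topics.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class TopicCheckSketch {

    /**
     * Throw IllegalStateException when any requested topic is missing from the
     * embedded topics; otherwise return normally.
     */
    static void requireEmbedded(Set<String> embeddedTopics, String... topicsToConsume) {
        Set<String> unknown = new LinkedHashSet<>(Arrays.asList(topicsToConsume));
        unknown.removeAll(embeddedTopics);
        if (!unknown.isEmpty()) {
            throw new IllegalStateException("Topic(s) " + unknown + " are not embedded topics");
        }
    }

    public static void main(String[] args) {
        // Hypothetical embedded topics; a test might pass the result of getTopics() instead.
        Set<String> embedded = Set.of("orders", "payments");
        requireEmbedded(embedded, "orders"); // passes silently
        try {
            requireEmbedded(embedded, "orders", "refunds");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running a check like this before subscribing lets a test fail with a clear message about the misspelled or missing topic.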
-
-