public class EmbeddedKafkaBroker
extends java.lang.Object
implements org.springframework.beans.factory.InitializingBean, org.springframework.beans.factory.DisposableBean
Modifier and Type | Class and Description |
---|---|
static class |
EmbeddedKafkaBroker.EmbeddedZookeeper
Ported from scala to allow setting the port.
|
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
BEAN_NAME |
static java.lang.String |
BROKER_LIST_PROPERTY
Set the value of this property to a property name that should be set to the list of
embedded broker addresses instead of "spring.embedded.kafka.brokers".
|
static java.lang.String |
SPRING_EMBEDDED_KAFKA_BROKERS |
static java.lang.String |
SPRING_EMBEDDED_ZOOKEEPER_CONNECT |
Constructor and Description |
---|
EmbeddedKafkaBroker(int count) |
EmbeddedKafkaBroker(int count,
boolean controlledShutdown,
int partitions,
java.lang.String... topics)
Create embedded Kafka brokers listening on random ports.
|
EmbeddedKafkaBroker(int count,
boolean controlledShutdown,
java.lang.String... topics)
Create embedded Kafka brokers.
|
Modifier and Type | Method and Description |
---|---|
void |
addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
Add topics to the existing broker(s).
|
void |
addTopics(java.lang.String... topicsToAdd)
Add topics to the existing broker(s) using the configured number of partitions.
|
void |
afterPropertiesSet() |
void |
bounce(BrokerAddress brokerAddress) |
EmbeddedKafkaBroker |
brokerListProperty(java.lang.String brokerListProperty)
Set the system property with this name to the list of broker addresses.
|
EmbeddedKafkaBroker |
brokerProperties(java.util.Map<java.lang.String,java.lang.String> properties)
Specify the properties to configure Kafka Broker before start, e.g.
|
EmbeddedKafkaBroker |
brokerProperty(java.lang.String property,
java.lang.Object value)
Specify a broker property.
|
void |
consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Subscribe a consumer to all the embedded topics.
|
void |
consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer,
java.lang.String topic)
Subscribe a consumer to one of the embedded topics.
|
void |
consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer,
java.lang.String... topicsToConsume)
Subscribe a consumer to one or more of the embedded topics.
|
void |
destroy() |
void |
doWithAdmin(java.util.function.Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
Create an
AdminClient ; invoke the callback and reliably close the admin. |
BrokerAddress |
getBrokerAddress(int i) |
BrokerAddress[] |
getBrokerAddresses() |
java.lang.String |
getBrokersAsString() |
kafka.server.KafkaServer |
getKafkaServer(int id) |
java.util.List<kafka.server.KafkaServer> |
getKafkaServers() |
int |
getPartitionsPerTopic() |
java.util.Set<java.lang.String> |
getTopics() |
org.I0Itec.zkclient.ZkClient |
getZkClient()
Deprecated.
in favor of
getZooKeeperClient() . |
int |
getZkPort()
Get the port that the embedded Zookeeper is running on or will run on.
|
EmbeddedKafkaBroker.EmbeddedZookeeper |
getZookeeper() |
kafka.zookeeper.ZooKeeperClient |
getZooKeeperClient()
Return the ZooKeeperClient.
|
java.lang.String |
getZookeeperConnectionString() |
EmbeddedKafkaBroker |
kafkaPorts(int... ports)
Set explicit ports on which the kafka brokers will listen.
|
void |
restart(int index) |
void |
setAdminTimeout(int adminTimeout)
Set the timeout in seconds for admin operations (e.g.
|
void |
setZkPort(int zkPort)
Set the port to run the embedded Zookeeper on (default random).
|
EmbeddedKafkaBroker |
zkPort(int port)
Set an explicit port for the embedded Zookeeper.
|
public static final java.lang.String BEAN_NAME
public static final java.lang.String SPRING_EMBEDDED_KAFKA_BROKERS
public static final java.lang.String SPRING_EMBEDDED_ZOOKEEPER_CONNECT
public static final java.lang.String BROKER_LIST_PROPERTY
public EmbeddedKafkaBroker(int count)
public EmbeddedKafkaBroker(int count, boolean controlledShutdown, java.lang.String... topics)
count
- the number of brokers.controlledShutdown
- passed into TestUtils.createBrokerConfig.topics
- the topics to create (2 partitions per).public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, java.lang.String... topics)
count
- the number of brokers.controlledShutdown
- passed into TestUtils.createBrokerConfig.partitions
- partitions per topic.topics
- the topics to create.public EmbeddedKafkaBroker brokerProperties(java.util.Map<java.lang.String,java.lang.String> properties)
auto.create.topics.enable
, transaction.state.log.replication.factor
etc.properties
- the properties to use for configuring Kafka Broker(s).KafkaConfig
public EmbeddedKafkaBroker brokerProperty(java.lang.String property, java.lang.Object value)
property
- the property name.value
- the value.EmbeddedKafkaBroker
.public EmbeddedKafkaBroker kafkaPorts(int... ports)
ports
- the ports.EmbeddedKafkaBroker
.public EmbeddedKafkaBroker zkPort(int port)
port
- the port.EmbeddedKafkaBroker
.public void setAdminTimeout(int adminTimeout)
adminTimeout
- the timeout.public EmbeddedKafkaBroker brokerListProperty(java.lang.String brokerListProperty)
brokerListProperty
- the brokerListProperty to setpublic int getZkPort()
public void setZkPort(int zkPort)
zkPort
- the port.public void afterPropertiesSet()
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
public void addTopics(java.lang.String... topicsToAdd)
topicsToAdd
- the topics.public void addTopics(org.apache.kafka.clients.admin.NewTopic... topicsToAdd)
topicsToAdd
- the topics.public void doWithAdmin(java.util.function.Consumer<org.apache.kafka.clients.admin.AdminClient> callback)
AdminClient
; invoke the callback and reliably close the admin.callback
- the callback.public void destroy()
destroy
in interface org.springframework.beans.factory.DisposableBean
public java.util.Set<java.lang.String> getTopics()
public java.util.List<kafka.server.KafkaServer> getKafkaServers()
public kafka.server.KafkaServer getKafkaServer(int id)
public EmbeddedKafkaBroker.EmbeddedZookeeper getZookeeper()
@Deprecated public org.I0Itec.zkclient.ZkClient getZkClient()
getZooKeeperClient()
.public kafka.zookeeper.ZooKeeperClient getZooKeeperClient()
public java.lang.String getZookeeperConnectionString()
public BrokerAddress getBrokerAddress(int i)
public BrokerAddress[] getBrokerAddresses()
public int getPartitionsPerTopic()
public void bounce(BrokerAddress brokerAddress)
public void restart(int index) throws java.lang.Exception
java.lang.Exception
public java.lang.String getBrokersAsString()
public void consumeFromAllEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
consumer
- the consumer.public void consumeFromAnEmbeddedTopic(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String topic)
consumer
- the consumer.topic
- the topic.public void consumeFromEmbeddedTopics(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String... topicsToConsume)
consumer
- the consumer.topicsToConsume
- the topics.