Class KafkaTestUtils
- java.lang.Object
-
- org.springframework.kafka.test.utils.KafkaTestUtils
-
public final class KafkaTestUtils extends java.lang.Object
Kafka testing utilities.- Author:
- Gary Russell, Hugo Wood, Artem Bilan
-
-
Method Summary
All Methods Static Methods Concrete Methods Deprecated Methods Modifier and Type Method Description static java.util.Map<java.lang.String,java.lang.Object>
consumerProps(java.lang.String brokers, java.lang.String group, java.lang.String autoCommit)
Set up test properties for an<Integer, String>
consumer.static java.util.Map<java.lang.String,java.lang.Object>
consumerProps(java.lang.String group, java.lang.String autoCommit, EmbeddedKafkaBroker embeddedKafka)
Set up test properties for an<Integer, String>
consumer.static java.util.Properties
defaultPropertyOverrides()
Return aProperties
object equal to the default consumer property overrides.static org.apache.kafka.clients.consumer.OffsetAndMetadata
getCurrentOffset(java.lang.String brokerAddresses, java.lang.String group, java.lang.String topic, int partition)
Get the current offset and metadata for the provided group/topic/partition.static org.apache.kafka.clients.consumer.OffsetAndMetadata
getCurrentOffset(org.apache.kafka.clients.admin.AdminClient adminClient, java.lang.String group, java.lang.String topic, int partition)
Get the current offset and metadata for the provided group/topic/partition.static java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long>
getEndOffsets(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String topic, java.lang.Integer... partitions)
Return the end offsets of the requested topic/partitionsstatic org.apache.kafka.clients.consumer.ConsumerRecord<?,?>
getOneRecord(java.lang.String brokerAddresses, java.lang.String group, java.lang.String topic, int partition, boolean seekToLast, boolean commit, long timeout)
Deprecated.static org.apache.kafka.clients.consumer.ConsumerRecord<?,?>
getOneRecord(java.lang.String brokerAddresses, java.lang.String group, java.lang.String topic, int partition, boolean seekToLast, boolean commit, java.time.Duration timeout)
Get a single record for the group from the topic/partition.static java.lang.Object
getPropertyValue(java.lang.Object root, java.lang.String propertyPath)
Uses nestedDirectFieldAccessor
s to obtain a property using dotted notation to traverse fields; e.g.static <T> T
getPropertyValue(java.lang.Object root, java.lang.String propertyPath, java.lang.Class<T> type)
A typed version ofgetPropertyValue(Object, String)
.static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
Poll the consumer for records.static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, long timeout)
Deprecated.in favor ofgetRecords(Consumer, Duration)
static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, long timeout, int minRecords)
Deprecated.in favor of {#getRecords(Consumer, Duration, int)
}static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.time.Duration timeout)
Poll the consumer for records.static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecords<K,V>getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.time.Duration timeout, int minRecords)
Poll the consumer for records.static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecord<K,V>getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.lang.String topic)
Poll the consumer, expecting a single record for the specified topic.static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecord<K,V>getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.lang.String topic, long timeout)
Deprecated.in favor ofgetSingleRecord(Consumer, String, Duration)
static <K,V>
org.apache.kafka.clients.consumer.ConsumerRecord<K,V>getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.lang.String topic, java.time.Duration timeout)
Poll the consumer, expecting a single record for the specified topic.static java.util.Map<java.lang.String,java.lang.Object>
producerProps(java.lang.String brokers)
Set up test properties for an<Integer, String>
producer.static java.util.Map<java.lang.String,java.lang.Object>
producerProps(EmbeddedKafkaBroker embeddedKafka)
Set up test properties for an<Integer, String>
producer.
-
-
-
Method Detail
-
consumerProps
public static java.util.Map<java.lang.String,java.lang.Object> consumerProps(java.lang.String group, java.lang.String autoCommit, EmbeddedKafkaBroker embeddedKafka)
Set up test properties for an<Integer, String>
consumer.- Parameters:
group
- the group id.autoCommit
- the auto commit.embeddedKafka
- aEmbeddedKafkaBroker
instance.- Returns:
- the properties.
-
producerProps
public static java.util.Map<java.lang.String,java.lang.Object> producerProps(EmbeddedKafkaBroker embeddedKafka)
Set up test properties for an<Integer, String>
producer.- Parameters:
embeddedKafka
- aEmbeddedKafkaBroker
instance.- Returns:
- the properties.
-
consumerProps
public static java.util.Map<java.lang.String,java.lang.Object> consumerProps(java.lang.String brokers, java.lang.String group, java.lang.String autoCommit)
Set up test properties for an<Integer, String>
consumer.- Parameters:
brokers
- the bootstrapServers property.group
- the group id.autoCommit
- the auto commit.- Returns:
- the properties.
-
producerProps
public static java.util.Map<java.lang.String,java.lang.Object> producerProps(java.lang.String brokers)
Set up test properties for an<Integer, String>
producer.- Parameters:
brokers
- the bootstrapServers property.- Returns:
- the properties.
- Since:
- 2.3.5
-
getSingleRecord
public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecord<K,V> getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.lang.String topic)
Poll the consumer, expecting a single record for the specified topic.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.topic
- the topic.- Returns:
- the record.
- Throws:
java.lang.IllegalStateException
- if exactly one record is not received.- See Also:
getSingleRecord(Consumer, String, Duration)
-
getSingleRecord
@Deprecated public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecord<K,V> getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.lang.String topic, long timeout)
Deprecated.in favor ofgetSingleRecord(Consumer, String, Duration)
Poll the consumer, expecting a single record for the specified topic.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.topic
- the topic.timeout
- max time in milliseconds to wait for records; forwarded toConsumer.poll(long)
.- Returns:
- the record.
- Throws:
java.lang.IllegalStateException
- if exactly one record is not received.- Since:
- 2.0
-
getSingleRecord
public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecord<K,V> getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.lang.String topic, java.time.Duration timeout)
Poll the consumer, expecting a single record for the specified topic.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.topic
- the topic.timeout
- max duration to wait for records; forwarded toConsumer.poll(Duration)
.- Returns:
- the record.
- Throws:
java.lang.IllegalStateException
- if exactly one record is not received.- Since:
- 2.9.3
-
getOneRecord
@Nullable @Deprecated public static org.apache.kafka.clients.consumer.ConsumerRecord<?,?> getOneRecord(java.lang.String brokerAddresses, java.lang.String group, java.lang.String topic, int partition, boolean seekToLast, boolean commit, long timeout)
Deprecated.Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.- Parameters:
brokerAddresses
- the broker address(es).group
- the group.topic
- the topic.partition
- the partition.seekToLast
- true to fetch an existing last record, if present.commit
- commit offset after polling or not.timeout
- the timeout.- Returns:
- the record or null if no record received.
- Since:
- 2.3
-
getOneRecord
@Nullable public static org.apache.kafka.clients.consumer.ConsumerRecord<?,?> getOneRecord(java.lang.String brokerAddresses, java.lang.String group, java.lang.String topic, int partition, boolean seekToLast, boolean commit, java.time.Duration timeout)
Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.- Parameters:
brokerAddresses
- the broker address(es).group
- the group.topic
- the topic.partition
- the partition.seekToLast
- true to fetch an existing last record, if present.commit
- commit offset after polling or not.timeout
- the timeout.- Returns:
- the record or null if no record received.
- Since:
- 2.9.3
-
getCurrentOffset
public static org.apache.kafka.clients.consumer.OffsetAndMetadata getCurrentOffset(java.lang.String brokerAddresses, java.lang.String group, java.lang.String topic, int partition) throws java.lang.Exception
Get the current offset and metadata for the provided group/topic/partition.- Parameters:
brokerAddresses
- the broker address(es).group
- the group.topic
- the topic.partition
- the partition.- Returns:
- the offset and metadata.
- Throws:
java.lang.Exception
- if an exception occurs.- Since:
- 2.3
-
getCurrentOffset
public static org.apache.kafka.clients.consumer.OffsetAndMetadata getCurrentOffset(org.apache.kafka.clients.admin.AdminClient adminClient, java.lang.String group, java.lang.String topic, int partition) throws java.lang.Exception
Get the current offset and metadata for the provided group/topic/partition.- Parameters:
adminClient
- the AdminClient instance.group
- the group.topic
- the topic.partition
- the partition.- Returns:
- the offset and metadata.
- Throws:
java.lang.Exception
- if an exception occurs.- Since:
- 3.0
-
getEndOffsets
public static java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> getEndOffsets(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.String topic, java.lang.Integer... partitions)
Return the end offsets of the requested topic/partitions- Parameters:
consumer
- the consumer.topic
- the topic.partitions
- the partitions, or null for all partitions.- Returns:
- the map of end offsets.
- Since:
- 2.6.5
- See Also:
Consumer.endOffsets(Collection, Duration)
-
getRecords
public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
Poll the consumer for records.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.- Returns:
- the records.
- See Also:
getRecords(Consumer, Duration)
-
getRecords
@Deprecated public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, long timeout)
Deprecated.in favor ofgetRecords(Consumer, Duration)
Poll the consumer for records.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.timeout
- max time in milliseconds to wait for records; forwarded toConsumer.poll(long)
.- Returns:
- the records.
- Throws:
java.lang.IllegalStateException
- if the poll returns null (since 2.3.4).- Since:
- 2.0
-
getRecords
public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.time.Duration timeout)
Poll the consumer for records.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.timeout
- max time in milliseconds to wait for records; forwarded toConsumer.poll(Duration)
.- Returns:
- the records.
- Throws:
java.lang.IllegalStateException
- if the poll returns null (since 2.3.4).- Since:
- 2.9.3
-
getRecords
@Deprecated public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, long timeout, int minRecords)
Deprecated.in favor of {#getRecords(Consumer, Duration, int)
}Poll the consumer for records.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.timeout
- max time in milliseconds to wait for records; forwarded toConsumer.poll(long)
.minRecords
- wait until the timeout or at least this number of records are received.- Returns:
- the records.
- Throws:
java.lang.IllegalStateException
- if the poll returns null.- Since:
- 2.4.2
-
getRecords
public static <K,V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, java.time.Duration timeout, int minRecords)
Poll the consumer for records.- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
consumer
- the consumer.timeout
- max time in milliseconds to wait for records; forwarded toConsumer.poll(Duration)
.minRecords
- wait until the timeout or at least this number of records are received.- Returns:
- the records.
- Throws:
java.lang.IllegalStateException
- if the poll returns null.- Since:
- 2.9.3
-
getPropertyValue
public static java.lang.Object getPropertyValue(java.lang.Object root, java.lang.String propertyPath)
Uses nestedDirectFieldAccessor
s to obtain a property using dotted notation to traverse fields; e.g. "foo.bar.baz" will obtain a reference to the baz field of the bar field of foo. Adopted from Spring Integration.- Parameters:
root
- The object.propertyPath
- The path.- Returns:
- The field.
-
getPropertyValue
public static <T> T getPropertyValue(java.lang.Object root, java.lang.String propertyPath, java.lang.Class<T> type)
A typed version ofgetPropertyValue(Object, String)
.- Type Parameters:
T
- the type.- Parameters:
root
- the object.propertyPath
- the path.type
- the type to cast the object to.- Returns:
- the field value.
- See Also:
getPropertyValue(Object, String)
-
defaultPropertyOverrides
public static java.util.Properties defaultPropertyOverrides()
Return aProperties
object equal to the default consumer property overrides. Useful when matching arguments in Mockito tests.- Returns:
- the default properties.
- Since:
- 2.2.5
-
-