Class KafkaTestUtils

java.lang.Object
org.springframework.kafka.test.utils.KafkaTestUtils

public final class KafkaTestUtils extends Object
Kafka testing utilities.
Author:
Gary Russell, Hugo Wood, Artem Bilan, Sanghyeok An
  • Method Details

    • consumerProps

      public static Map<String,Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka)
      Set up test properties for an <Integer, String> consumer.
      Parameters:
      group - the group id.
      autoCommit - the auto commit setting (the enable.auto.commit value, for example "true" or "false").
      embeddedKafka - an EmbeddedKafkaBroker instance.
      Returns:
      the properties.
    • consumerProps

      public static Map<String,Object> consumerProps(String brokers, String group)
      Set up test properties for an <Integer, String> consumer.
      Parameters:
      brokers - the bootstrapServers property.
      group - the group id.
      Returns:
      the properties.
      Since:
      3.3
    • producerProps

      public static Map<String,Object> producerProps(EmbeddedKafkaBroker embeddedKafka)
      Set up test properties for an <Integer, String> producer.
      Parameters:
      embeddedKafka - an EmbeddedKafkaBroker instance.
      Returns:
      the properties.
    • consumerProps

      public static Map<String,Object> consumerProps(String brokers, String group, String autoCommit)
      Set up test properties for an <Integer, String> consumer.
      Parameters:
      brokers - the bootstrapServers property.
      group - the group id.
      autoCommit - the auto commit setting (the enable.auto.commit value, for example "true" or "false").
      Returns:
      the properties.
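
To give a rough idea of the kind of map the consumerProps overloads return, the sketch below builds a comparable map using only JDK types. The class name ConsumerPropsSketch, the choice of keys, and the deserializer class names are assumptions based on the standard Kafka consumer configuration names; this page does not list the exact entries the real method sets.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerPropsSketch {

    // Hypothetical stand-in for KafkaTestUtils.consumerProps(brokers, group, autoCommit).
    static Map<String, Object> consumerProps(String brokers, String group, String autoCommit) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", group);
        props.put("enable.auto.commit", autoCommit);
        // <Integer, String> consumer: Integer keys and String values (assumed deserializers).
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerProps("localhost:9092", "testGroup", "false");
        System.out.println(props.get("group.id"));
    }
}
```

In a real test the returned map is typically handed to a consumer factory or a KafkaConsumer constructor, with individual entries overridden as the test requires.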
    • producerProps

      public static Map<String,Object> producerProps(String brokers)
      Set up test properties for an <Integer, String> producer.
      Parameters:
      brokers - the bootstrapServers property.
      Returns:
      the properties.
      Since:
      2.3.5
    • streamsProps

      public static Map<String,Object> streamsProps(String applicationId, String brokers)
      Set up test properties for Kafka Streams.
      Parameters:
      applicationId - the applicationId for the Kafka Streams.
      brokers - the bootstrapServers property.
      Returns:
      the properties.
      Since:
      3.3
    • getSingleRecord

      public static <K, V> org.apache.kafka.clients.consumer.ConsumerRecord<K,V> getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, String topic)
      Poll the consumer, expecting a single record for the specified topic.
      Type Parameters:
      K - the key type.
      V - the value type.
      Parameters:
      consumer - the consumer.
      topic - the topic.
      Returns:
      the record.
      Throws:
      IllegalStateException - if exactly one record is not received.
    • getSingleRecord

      public static <K, V> org.apache.kafka.clients.consumer.ConsumerRecord<K,V> getSingleRecord(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, String topic, Duration timeout)
      Poll the consumer, expecting a single record for the specified topic.
      Type Parameters:
      K - the key type.
      V - the value type.
      Parameters:
      consumer - the consumer.
      topic - the topic.
      timeout - max duration to wait for records; forwarded to Consumer.poll(Duration).
      Returns:
      the record.
      Throws:
      IllegalStateException - if exactly one record is not received.
      Since:
      2.9.3
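
The "exactly one record" contract of getSingleRecord means that both an empty poll and a poll returning several records are rejected. A minimal sketch of that check follows, using a plain List in place of the polled records; the class SingleRecordSketch and the helper single are hypothetical and are not part of KafkaTestUtils.

```java
import java.util.List;

class SingleRecordSketch {

    // Hypothetical helper: return the only element, or throw if the count is not exactly one.
    static <T> T single(List<T> polled) {
        if (polled.size() != 1) {
            throw new IllegalStateException("Expected exactly one record, got " + polled.size());
        }
        return polled.get(0);
    }

    public static void main(String[] args) {
        System.out.println(single(List.of("only-record")));
        try {
            single(List.of("a", "b"));
        }
        catch (IllegalStateException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```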
    • getOneRecord

      @Nullable public static org.apache.kafka.clients.consumer.ConsumerRecord<?,?> getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, Duration timeout)
      Get a single record for the group from the topic/partition, optionally seeking to the current last record first.
      Parameters:
      brokerAddresses - the broker address(es).
      group - the group.
      topic - the topic.
      partition - the partition.
      seekToLast - true to fetch an existing last record, if present.
      commit - commit offset after polling or not.
      timeout - the timeout.
      Returns:
      the record or null if no record received.
      Since:
      2.9.3
    • getCurrentOffset

      public static org.apache.kafka.clients.consumer.OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception
      Get the current offset and metadata for the provided group/topic/partition.
      Parameters:
      brokerAddresses - the broker address(es).
      group - the group.
      topic - the topic.
      partition - the partition.
      Returns:
      the offset and metadata.
      Throws:
      Exception - if an exception occurs.
      Since:
      2.3
    • getCurrentOffset

      public static org.apache.kafka.clients.consumer.OffsetAndMetadata getCurrentOffset(org.apache.kafka.clients.admin.AdminClient adminClient, String group, String topic, int partition) throws Exception
      Get the current offset and metadata for the provided group/topic/partition.
      Parameters:
      adminClient - the AdminClient instance.
      group - the group.
      topic - the topic.
      partition - the partition.
      Returns:
      the offset and metadata.
      Throws:
      Exception - if an exception occurs.
      Since:
      3.0
    • getEndOffsets

      public static Map<org.apache.kafka.common.TopicPartition,Long> getEndOffsets(org.apache.kafka.clients.consumer.Consumer<?,?> consumer, String topic, Integer... partitions)
      Return the end offsets of the requested topic/partitions.
      Parameters:
      consumer - the consumer.
      topic - the topic.
      partitions - the partitions, or null for all partitions.
      Returns:
      the map of end offsets.
      Since:
      2.6.5
      See Also:
      • Consumer.endOffsets(Collection, Duration)
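
Because partitions is a varargs parameter, passing null for "all partitions" needs an explicit cast so that the whole array is null rather than a single null element. The sketch below illustrates only that calling convention; PartitionSelectionSketch and select are hypothetical, and how the real method discovers the full partition list is not described on this page.

```java
import java.util.Arrays;
import java.util.List;

class PartitionSelectionSketch {

    // Hypothetical helper: null selects every known partition, otherwise only those listed.
    static List<Integer> select(List<Integer> allPartitions, Integer... partitions) {
        if (partitions == null) {
            return allPartitions;
        }
        return Arrays.asList(partitions);
    }

    public static void main(String[] args) {
        List<Integer> all = List.of(0, 1, 2);
        // The cast makes it explicit that the whole varargs array is null.
        System.out.println(select(all, (Integer[]) null));
        System.out.println(select(all, 0, 2));
    }
}
```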
    • getRecords

      public static <K, V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
      Poll the consumer for records.
      Type Parameters:
      K - the key type.
      V - the value type.
      Parameters:
      consumer - the consumer.
      Returns:
      the records.
    • getRecords

      public static <K, V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, Duration timeout)
      Poll the consumer for records.
      Type Parameters:
      K - the key type.
      V - the value type.
      Parameters:
      consumer - the consumer.
      timeout - max duration to wait for records; forwarded to Consumer.poll(Duration).
      Returns:
      the records.
      Throws:
      IllegalStateException - if the poll returns null (since 2.3.4).
      Since:
      2.9.3
    • getRecords

      public static <K, V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> getRecords(org.apache.kafka.clients.consumer.Consumer<K,V> consumer, Duration timeout, int minRecords)
      Poll the consumer for records.
      Type Parameters:
      K - the key type.
      V - the value type.
      Parameters:
      consumer - the consumer.
      timeout - max duration to wait for records; forwarded to Consumer.poll(Duration).
      minRecords - wait until the timeout or at least this number of records are received.
      Returns:
      the records.
      Throws:
      IllegalStateException - if the poll returns null.
      Since:
      2.9.3
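
The minRecords parameter implies a loop that polls repeatedly until either enough records have arrived or the timeout has elapsed. The following is a sketch of such a loop under stated assumptions: a Function taking the remaining Duration stands in for Consumer.poll(Duration), and the names MinRecordsPollSketch and pollAtLeast are hypothetical. It is not the library's implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MinRecordsPollSketch {

    // Hypothetical loop: keep polling until the deadline passes or minRecords have arrived.
    static <T> List<T> pollAtLeast(Function<Duration, List<T>> poller, Duration timeout, int minRecords) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        long remaining = timeout.toNanos();
        do {
            // Each poll is given only the time that is still left.
            received.addAll(poller.apply(Duration.ofNanos(remaining)));
            remaining = deadline - System.nanoTime();
        }
        while (received.size() < minRecords && remaining > 0);
        return received;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Assumed poller that yields one record per call.
        List<String> records = pollAtLeast(t -> List.of("record-" + calls[0]++), Duration.ofSeconds(1), 3);
        System.out.println(records);
    }
}
```

Note that the loop always polls at least once, so a caller may receive fewer than minRecords when the timeout expires first.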
    • getPropertyValue

      public static Object getPropertyValue(Object root, String propertyPath)
      Uses nested DirectFieldAccessors to obtain a property using dotted notation to traverse fields; e.g. "foo.bar.baz" will obtain a reference to the baz field of the bar field of foo. Adopted from Spring Integration.
      Parameters:
      root - the object.
      propertyPath - the path.
      Returns:
      the field.
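
The dotted-path traversal can be sketched with plain reflection. The real method uses nested DirectFieldAccessors; the version below swaps in java.lang.reflect so that it runs on the JDK alone. The classes Root, Foo and Bar, and the PropertyPathSketch class itself, are hypothetical fixtures for illustration.

```java
import java.lang.reflect.Field;

class PropertyPathSketch {

    static class Bar { private final String baz = "baz-value"; }
    static class Foo { private final Bar bar = new Bar(); }
    static class Root { private final Foo foo = new Foo(); }

    // Hypothetical reflection-based traversal; assumes every field on the path is non-null.
    static Object getPropertyValue(Object root, String propertyPath) {
        Object current = root;
        try {
            for (String name : propertyPath.split("\\.")) {
                Field field = findField(current.getClass(), name);
                field.setAccessible(true);
                current = field.get(current);
            }
        }
        catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Cannot resolve path: " + propertyPath, ex);
        }
        return current;
    }

    // Look for the field on the class itself, then on its superclasses.
    private static Field findField(Class<?> type, String name) throws NoSuchFieldException {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            }
            catch (NoSuchFieldException ex) {
                // keep searching up the hierarchy
            }
        }
        throw new NoSuchFieldException(name);
    }

    public static void main(String[] args) {
        System.out.println(getPropertyValue(new Root(), "foo.bar.baz"));
    }
}
```

Reading private fields this way is mainly useful in tests, for example to assert on the internal state of a container or factory that exposes no getter.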
    • getPropertyValue

      public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type)
      A typed version of getPropertyValue(Object, String).
      Type Parameters:
      T - the type.
      Parameters:
      root - the object.
      propertyPath - the path.
      type - the type to cast the object to.
      Returns:
      the field value.
    • defaultPropertyOverrides

      public static Properties defaultPropertyOverrides()
      Return a Properties object equal to the default consumer property overrides. Useful when matching arguments in Mockito tests.
      Returns:
      the default properties.
      Since:
      2.2.5