Testing Applications

The spring-pulsar-test dependency includes utilities that help you test your applications.

1. PulsarConsumerTestUtil

The org.springframework.pulsar.test.support.PulsarConsumerTestUtil provides a type-safe fluent API for consuming messages from a Pulsar topic within a test.

The following example shows how to consume messages from a topic for 5 seconds:

List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .get();

An until method is also available to let you specify a condition that must be met before the messages are returned. The following example uses a condition to return as soon as 5 messages have been consumed from a topic.

List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .until(msgs -> msgs.size() == 5)
            .get();
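
To make the interplay between awaitAtMost and until easier to picture, here is a minimal plain-Java sketch of the general "collect until a condition holds or a timeout expires" pattern. It is not the library's implementation: AwaitUntilSketch, consumeUntil, and the Supplier-based message source are hypothetical names introduced only for illustration, and the sketch assumes that a null from the source means "nothing available yet". What PulsarConsumerTestUtil does when the condition is never met is not described above, so this sketch simply returns whatever it has collected.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of spring-pulsar-test.
class AwaitUntilSketch {

    // Collects items from `source` until `condition` accepts the collected list
    // or `timeout` elapses, whichever happens first.
    static <T> List<T> consumeUntil(Supplier<T> source, Duration timeout,
            Predicate<List<T>> condition) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            T next = source.get(); // assumption: null means "nothing available yet"
            if (next != null) {
                received.add(next);
                if (condition.test(received)) {
                    return received; // condition met, stop before the timeout
                }
            }
            else {
                try {
                    Thread.sleep(10); // brief pause before polling again
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return received;
                }
            }
        }
        return received; // timeout reached without the condition being met
    }

    public static void main(String[] args) {
        Iterator<String> topic = List.of("m1", "m2", "m3", "m4", "m5", "m6").iterator();
        List<String> result = AwaitUntilSketch.<String>consumeUntil(
                () -> topic.hasNext() ? topic.next() : null,
                Duration.ofSeconds(5),
                msgs -> msgs.size() == 5);
        System.out.println(result); // prints [m1, m2, m3, m4, m5]
    }
}
```

In this sketch the sixth item is never read, because the condition is checked after every received item and the loop exits as soon as it holds.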

A set of commonly used conditions is available in org.springframework.pulsar.test.support.ConsumedMessagesConditions. The following example uses the factory-provided atLeastOneMessageMatches condition to return the consumed messages once one of them has a value of "boom".

List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .until(ConsumedMessagesConditions.atLeastOneMessageMatches("boom"))
            .get();

2. PulsarTestContainerSupport

The org.springframework.pulsar.test.support.PulsarTestContainerSupport interface provides a static Pulsar Testcontainer. When using JUnit Jupiter, the container is automatically started once per test class via the @BeforeAll annotation.

The following example shows how you can use the container support in a @SpringBootTest in conjunction with the previously mentioned PulsarConsumerTestUtil.

@SpringBootTest
class MyApplicationTests implements PulsarTestContainerSupport {

	@DynamicPropertySource
	static void pulsarProperties(DynamicPropertyRegistry registry) {
		registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
		registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
	}

	@Test
	void sendAndReceiveWorksAsExpected(
			@Autowired PulsarTemplate<String> template,
			@Autowired PulsarConsumerFactory<String> consumerFactory) {
		var topic = "some-topic";
		var msg = "foo-5150";
		template.send(topic, msg);
		var matchedMessages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
				.fromTopic(topic)
				.withSchema(Schema.STRING)
				.awaitAtMost(Duration.ofSeconds(2))
				.until(ConsumedMessagesConditions.atLeastOneMessageMatches(msg))
				.get();
		assertThat(matchedMessages).hasSize(1);
	}
}