Testing Applications
The spring-pulsar-test dependency includes some useful utilities when testing your applications.
1. PulsarConsumerTestUtil
The org.springframework.pulsar.test.support.PulsarConsumerTestUtil provides a type-safe fluent API for consuming messages from a Pulsar topic within a test.
The following example shows how to consume messages from a topic for 5 seconds:
    List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .get();
An until method is also available to allow you to specify a condition that must be met before the messages are returned.
The following example uses a condition to consume 5 messages from a topic.
    List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            // The lambda parameter must not reuse the name of the local variable being declared
            .until(msgs -> msgs.size() == 5)
            .get();
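To make the combination of a time limit and a condition more concrete, the following is a minimal, standard-library-only sketch of a "collect until the condition holds or the deadline passes" loop. It is not the implementation used by PulsarConsumerTestUtil; the class AwaitUntilSketch and the method collectUntil are hypothetical names introduced only for illustration, and an in-memory iterator stands in for a Pulsar consumer. How the real utility behaves when the time limit expires before the condition is met is not modeled here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model for illustration only; not part of spring-pulsar-test.
final class AwaitUntilSketch {

    // Pulls items from 'source' until 'condition' accepts the collected list,
    // the timeout elapses, or the source is exhausted, whichever comes first.
    static <T> List<T> collectUntil(Iterator<T> source, Duration timeout,
            Predicate<List<T>> condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> collected = new ArrayList<>();
        while (!condition.test(collected)
                && System.nanoTime() - deadline < 0
                && source.hasNext()) {
            collected.add(source.next());
        }
        return collected;
    }

    public static void main(String[] args) {
        Iterator<String> source = List.of("a", "b", "c", "d", "e", "f", "g").iterator();
        List<String> firstFive = collectUntil(source, Duration.ofSeconds(5),
                msgs -> msgs.size() == 5);
        System.out.println(firstFive); // prints [a, b, c, d, e]
    }
}
```

In this model the condition is checked before each item is pulled, so collection stops as soon as five items are present and the remaining items are left unread.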
A set of commonly used conditions is available in org.springframework.pulsar.test.support.ConsumedMessagesConditions.
The following example uses the factory-provided atLeastOneMessageMatches condition to return the consumed messages once one of them has a value of "boom".
    List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .until(ConsumedMessagesConditions.atLeastOneMessageMatches("boom"))
            .get();
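Conceptually, a condition of this kind is a predicate over the list of values consumed so far. The sketch below models that idea with java.util.function.Predicate from the standard library. The names ConditionSketch, anyValueEquals, and atLeast are hypothetical and are not part of ConsumedMessagesConditions; whether the library's own condition type offers a comparable way to combine conditions is not assumed here.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical stand-ins for illustration only; not the library's API.
final class ConditionSketch {

    // True once any consumed value equals the expected one.
    static <T> Predicate<List<T>> anyValueEquals(T expected) {
        return values -> values.stream().anyMatch(v -> Objects.equals(v, expected));
    }

    // True once at least 'count' values have been consumed.
    static <T> Predicate<List<T>> atLeast(int count) {
        return values -> values.size() >= count;
    }

    public static void main(String[] args) {
        Predicate<List<String>> hasBoom = anyValueEquals("boom");
        Predicate<List<String>> atLeastTwo = atLeast(2);
        Predicate<List<String>> both = hasBoom.and(atLeastTwo);

        System.out.println(both.test(List.of("boom")));         // prints false
        System.out.println(both.test(List.of("tick", "boom"))); // prints true
    }
}
```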
2. PulsarTestContainerSupport
The org.springframework.pulsar.test.support.PulsarTestContainerSupport interface provides a static Pulsar Testcontainer.
When using JUnit Jupiter, the container is automatically started once per test class via the @BeforeAll annotation.
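The general pattern of sharing a static resource through an interface can be sketched with the standard library alone. The types below (FakeContainer, FakeContainerSupport, FirstTests, SecondTests) are hypothetical and only model the idea; they are not the library's implementation. The sketch assumes that starting an already running container is a no-op, and it calls the start method by hand where JUnit Jupiter would invoke a @BeforeAll method for each test class.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Testcontainers container.
final class FakeContainer {
    private final AtomicInteger starts = new AtomicInteger();
    private boolean running;

    // Idempotent start: only the first call actually starts the container.
    synchronized void start() {
        if (!running) {
            running = true;
            starts.incrementAndGet();
        }
    }

    int startCount() {
        return starts.get();
    }
}

// Hypothetical support interface: the field is implicitly public static final,
// so every implementing class sees the same instance.
interface FakeContainerSupport {
    FakeContainer CONTAINER = new FakeContainer();

    // In a JUnit Jupiter setup this method would carry @BeforeAll.
    static void startContainer() {
        CONTAINER.start();
    }
}

final class FirstTests implements FakeContainerSupport { }

final class SecondTests implements FakeContainerSupport { }

final class SharedContainerDemo {
    public static void main(String[] args) {
        FakeContainerSupport.startContainer(); // as if run before FirstTests
        FakeContainerSupport.startContainer(); // as if run before SecondTests
        System.out.println(FirstTests.CONTAINER == SecondTests.CONTAINER); // prints true
        System.out.println(FakeContainerSupport.CONTAINER.startCount());   // prints 1
    }
}
```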
The following example shows how you can use the container support in a @SpringBootTest in conjunction with the previously mentioned PulsarConsumerTestUtil.
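    import static org.assertj.core.api.Assertions.assertThat;

    import java.time.Duration;
    import org.apache.pulsar.client.api.Schema;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.pulsar.core.PulsarConsumerFactory;
    import org.springframework.pulsar.core.PulsarTemplate;
    import org.springframework.pulsar.test.support.ConsumedMessagesConditions;
    import org.springframework.pulsar.test.support.PulsarConsumerTestUtil;
    import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
    import org.springframework.test.context.DynamicPropertyRegistry;
    import org.springframework.test.context.DynamicPropertySource;

    @SpringBootTest
    class MyApplicationTests implements PulsarTestContainerSupport {

        // Point the Spring Pulsar client and admin at the shared test container
        @DynamicPropertySource
        static void pulsarProperties(DynamicPropertyRegistry registry) {
            registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
            registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
        }

        @Test
        void sendAndReceiveWorksAsExpected(
                @Autowired PulsarTemplate<String> template,
                @Autowired PulsarConsumerFactory<String> consumerFactory) {
            var topic = "some-topic";
            var msg = "foo-5150";
            template.send(topic, msg);
            // Consume for up to 2 seconds, returning once a message with the sent value arrives
            var matchedMessages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
                    .fromTopic(topic)
                    .withSchema(Schema.STRING)
                    .awaitAtMost(Duration.ofSeconds(2))
                    .until(ConsumedMessagesConditions.atLeastOneMessageMatches(msg))
                    .get();
            assertThat(matchedMessages).hasSize(1);
        }
    }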