Publishing and Consuming Partitioned Topics
In the following example, we publish to a topic called hello-pulsar-partitioned.
The topic is partitioned and, for this sample, we assume that it has already been created with three partitions.
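import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;

@SpringBootApplication
public class PulsarBootPartitioned {

    public static void main(String[] args) {
        // Custom routers take effect only when the producer uses the CustomPartition routing mode.
        SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
    }

    @Bean
    public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
        pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
        return args -> {
            for (int i = 0; i < 10; i++) {
                // Each send passes the router that selects the target partition.
                pulsarTemplate.sendAsync("hello john doe 0", new FooRouter());
                pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
                pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
            }
        };
    }

    @PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
    public void listen(String message) {
        System.out.println("Message Received: " + message);
    }

    // Always routes to partition 0.
    static class FooRouter implements MessageRouter {

        @Override
        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            return 0;
        }

    }

    // Always routes to partition 1.
    static class BarRouter implements MessageRouter {

        @Override
        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            return 1;
        }

    }

    // Always routes to partition 2.
    static class BuzzRouter implements MessageRouter {

        @Override
        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            return 2;
        }

    }

}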
In the preceding example, we publish to a partitioned topic and want each message to go to a specific partition.
Left to its default, Pulsar assigns partitions in round-robin mode, and we would like to override that.
To do so, we pass a message router object to the send method.
Consider the three message routers implemented.
FooRouter always sends data to partition 0, BarRouter sends to partition 1, and BuzzRouter sends to partition 2.
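The three routers above each hard-code one partition. As a minimal sketch of how a router could instead derive the partition from the message, the plain-Java helper below maps a key to a partition index. The class KeyPartitioner and its method partitionFor are hypothetical names for illustration and are not part of Pulsar or Spring Pulsar. In a real MessageRouter, similar arithmetic could sit inside choosePartition, with the partition count read from metadata.numPartitions().

```java
public class KeyPartitioner {

    // Hypothetical helper: maps a key to a partition index in [0, numPartitions).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Three partitions, matching the topic assumed in this section.
        for (String key : new String[] {"john", "alice", "buzz"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The same key always maps to the same partition, so messages that share a key stay in order relative to each other within that partition.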
Also note that we now use the sendAsync method of PulsarTemplate, which returns a CompletableFuture.
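Because the result is a CompletableFuture, the caller can react to the outcome of a send without blocking. The following is a minimal sketch of that pattern using only the JDK. The method fakeSendAsync is a hypothetical stand-in for a real send call and returns a made-up id string, so the sketch runs without a broker.

```java
import java.util.concurrent.CompletableFuture;

public class SendResultHandling {

    // Hypothetical stand-in for an asynchronous send; completes with a fake message id.
    public static CompletableFuture<String> fakeSendAsync(String payload) {
        return CompletableFuture.supplyAsync(() -> "id-for-" + payload);
    }

    // Attaches a callback that reports success or failure without blocking the caller.
    public static CompletableFuture<String> sendAndLog(String payload) {
        return fakeSendAsync(payload).whenComplete((id, error) -> {
            if (error != null) {
                System.err.println("Send failed: " + error.getMessage());
            } else {
                System.out.println("Sent '" + payload + "' as " + id);
            }
        });
    }

    public static void main(String[] args) {
        // join() is used here only so the demo waits for the callback before the JVM exits.
        sendAndLog("hello john doe 0").join();
    }
}
```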
When running the application, we also need to set the messageRoutingMode on the producer to CustomPartition (spring.pulsar.producer.message-routing-mode).
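The sample passes this property as a command-line argument. Assuming the application uses a standard Spring Boot application.properties file, the same setting could be sketched there as follows:

```properties
# application.properties (assumed location: src/main/resources)
spring.pulsar.producer.message-routing-mode=CustomPartition
```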
On the consumer side, we use a PulsarListener with the exclusive subscription type.
This means that data from all the partitions ends up in the same consumer, and there is no ordering guarantee across partitions.
What can we do if we want each partition to be consumed by a single distinct consumer?
We can switch to the failover subscription mode and add three separate consumers:
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}
When you follow this approach, a single partition always gets consumed by a dedicated consumer.
Similarly, if you want to use Pulsar’s shared consumer type, you can use the shared subscription type.
However, when you use the shared mode, you lose any ordering guarantees, because a single consumer may receive messages from all the partitions before another consumer gets a chance.
Consider the following example:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}
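To make the loss of ordering in shared mode concrete, the following toy model hands out messages to consumers in round-robin fashion. It is a simplified sketch in plain Java, not the broker's actual dispatch logic. The class SharedDispatchSketch, the "partition:sequence" labels, and the arrival order are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedDispatchSketch {

    // Hands out messages round-robin over consumerCount consumers and
    // returns what each consumer received, in arrival order.
    public static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        // Labels are "partition:sequence"; this interleaving is an assumption for the demo.
        List<String> arrivals = List.of("p0:0", "p1:0", "p2:0", "p0:1", "p1:1", "p2:1");
        List<List<String>> received = dispatch(arrivals, 2);
        for (int c = 0; c < received.size(); c++) {
            System.out.println("consumer " + (c + 1) + " got " + received.get(c));
        }
    }
}
```

In this model each of the two consumers ends up with messages from all three partitions, and the messages of any one partition are split between consumers, so no consumer sees a partition's full sequence.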