Topic Resolution

A destination topic is needed when producing or consuming messages. The framework looks in the following ordered locations to determine a topic (stopping at the first find):

  • User specified

  • Message type default

  • Global default

When a topic is found via one of the default mechanisms, there is no need to specify the topic on the produce or consume API.

When a topic is not found, the API will throw an exception accordingly.

1. User specified

A topic passed into the API being used has the highest precedence (eg. PulsarTemplate.send("my-topic", myMessage) or @PulsarListener(topics = "my-topic").

2. Message type default

When no topic is passed into the API, the system looks for a message type to topic mapping configured for the type of the message being produced or consumed.

Mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to configure default topics to use when consuming or producing Foo or Bar messages:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
The message-type is the fully-qualified name of the message class.
If the message (or the first message of a Publisher input) is null, the framework won’t be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send null messages.

2.1. Specified via annotation

When no topic is passed into the API and there are no custom topic mappings configured, the system looks for a @PulsarMessage annotation on the class of the message being produced or consumed. The default topic can be specified via the topic attribute on the annotation.

The following example configures the default topic to use when producing or consuming messages of type Foo:

@PulsarMessage(topic = "foo-topic")
record Foo(String value) {
}

Property placeholders and SpEL expressions are supported in the @PulsarMessage annotation, for example:

@PulsarMessage(topic = "${app.topics.foo}")
record Foo(String value) {
}

@PulsarMessage(topic = "#{someBean.getTopic()}")
record Bar(String value) {
}

2.2. Custom topic resolver

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can replace the default resolver by proving your own implementation, for example:

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

3. Producer global default

The final location consulted (when producing) is the system-wide producer default topic. It is configured via the spring.pulsar.producer.topic-name property when using the imperative API and the spring.pulsar.reactive.sender.topic-name property when using the reactive API.

4. Consumer global default

The final location consulted (when consuming) is the system-wide consumer default topic. It is configured via the spring.pulsar.consumer.topics or spring.pulsar.consumer.topics-pattern property when using the imperative API and one of the spring.pulsar.reactive.consumer.topics or spring.pulsar.reactive.consumer.topics-pattern property when using the reactive API.