Message Consumption

1. @ReactivePulsarListener

When it comes to Pulsar consumers, we recommend that end-user applications use the ReactivePulsarListener annotation. To use ReactivePulsarListener, you need to use the @EnableReactivePulsar annotation. When you use Spring Boot support, it automatically enables this annotation and configures all necessary components, such as the message listener infrastructure (which is responsible for creating the underlying Pulsar consumer).

Let us revisit the ReactivePulsarListener code snippet we saw in the quick-tour section:

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).

You can also further simplify this method:

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

In this most basic form, when the topics are not directly provided, a topic resolution process is used to determine the destination topic. Likewise, when the subscriptionName is not provided on the @ReactivePulsarListener annotation, an auto-generated subscription name is used.

In the ReactivePulsarListener method shown earlier, we receive the data as String, but we do not specify any schema types. Internally, the framework relies on Pulsar’s schema mechanism to convert the data to the required type.

The framework detects that you expect the String type and then infers the schema type based on that information and provides that schema to the consumer. The framework does this inference for all primitive types. For all non-primitive types the default schema is assumed to be JSON. If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the schemaType property.

This example shows how we can consume complex types from a topic:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

Let us look at a few more ways we can consume.

This example consumes the Pulsar message directly:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

This example consumes the record wrapped in a Spring messaging envelope:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

1.1. Streaming

All of the above are examples of consuming a single record one-by-one. However, one of the compelling reasons to use Reactive is for the streaming capability with backpressure support.

The following example uses ReactivePulsarListener to consume a stream of messages:

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);
}

Here we receive the records as a Flux of Pulsar messages. In addition, to enable stream consumption at the ReactivePulsarListener level, you need to set the stream property on the annotation to true.

The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.

Based on the actual type of the messages in the Flux, the framework tries to infer the schema to use. If it contains a complex type, you still need to provide the schemaType on ReactivePulsarListener.

The following listener uses the Spring messaging Message envelope with a complex type:

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}

As in the previous example, the listener method returns a Flux<MessageResult<Void>> where each element represents a processed message. The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message. MessageUtils provides the same functionality for Spring messages as the set of factory methods on MessageResult does for Pulsar messages.

There is no support for using org.apache.pulsar.client.api.Messages<T> in a @ReactivePulsarListener.

1.2. Configuration - Application Properties

The listener relies on the ReactivePulsarConsumerFactory to create and manage the underlying Pulsar consumer that it uses to consume messages. Spring Boot provides this consumer factory which you can further configure by specifying the spring.pulsar.consumer.* application properties. Most of the configured properties on the factory will be respected in the listener with the following exceptions:

  • The spring.pulsar.consumer.subscription.name property is ignored; when the subscription name is not specified on the annotation, one is generated instead.

  • The spring.pulsar.consumer.subscription.type property is ignored and the value is instead taken from the annotation. However, you can set subscriptionType = {} on the annotation to use the property value as the default.
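
As a rough illustration of the properties discussed above, an application.yml fragment might look like the following. This is only a sketch: the values are made up, and the property names assume the spring.pulsar.consumer.* binding of recent Spring Boot versions, so verify them against the Spring Boot version you use.

```yaml
spring:
  pulsar:
    consumer:
      # Consumer name (illustrative value)
      name: orders-consumer
      subscription:
        # Where a newly created subscription starts reading
        initial-position: earliest
        # Ignored by @ReactivePulsarListener (see the exceptions above)
        name: ignored-by-listener
```

With this fragment, the listener would still use the subscription name from its annotation (or a generated one), while the other settings are picked up from the factory.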

1.3. Generic records with AUTO_CONSUME

If the schema type of a Pulsar topic cannot be known in advance, you can use the AUTO_CONSUME schema type to consume generic records. In this case, messages are deserialized into GenericRecord objects using the schema info associated with the topic.

To consume generic records set the schemaType = SchemaType.AUTO_CONSUME on your @ReactivePulsarListener and use a Pulsar message of type GenericRecord as the message parameter as shown below.

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
    return Mono.empty();
}

The GenericRecord API allows access to the fields and their associated values.

1.4. Consumer Customization

You can specify a ReactivePulsarListenerMessageConsumerBuilderCustomizer to configure the underlying Pulsar consumer builder that ultimately constructs the consumer used by the listener to receive the messages.

Use with caution as this gives full access to the consumer builder and invoking some of its methods (such as create) may have unintended side effects.

For example, the following code shows how to set the initial position of the subscription to the earliest message on the topic.

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

If your application only has a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered, then the customizer is automatically applied.

You can also use the customizer to provide direct Pulsar consumer properties to the consumer builder. This is convenient if you do not want to use the Boot configuration properties mentioned earlier or have multiple ReactivePulsarListener methods whose configuration varies.

The following customizer example uses direct Pulsar consumer properties:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}

The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties.

2. Specifying Schema Information

As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the ReactivePulsarListener. For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.

The complex Schema types currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, and KEY_VALUE with INLINE encoding.

2.1. Custom Schema Mapping

As an alternative to specifying the schema on the ReactivePulsarListener for complex types, the schema resolver can be configured with mappings for the types. This removes the need to set the schema on the listener as the framework consults the resolver using the incoming message type.
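
The lookup that such a resolver performs can be pictured with a small plain-Java model. This is only a sketch of the behaviour described in this chapter, not the framework's DefaultSchemaResolver: the class SchemaLookupSketch, its methods, the string schema names, and the short list of primitive types are all hypothetical, and the order in which primitives and custom mappings are checked is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of schema lookup by message type; not a Spring for Apache Pulsar API. */
final class SchemaLookupSketch {

    // Assumption: a small sample of the types the framework treats as primitive.
    private static final Set<Class<?>> PRIMITIVES =
            Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class, byte[].class);

    // Custom mappings, keyed by message type (for example User -> AVRO).
    private final Map<Class<?>, String> customMappings = new HashMap<>();

    void addCustomMapping(Class<?> messageType, String schemaType) {
        customMappings.put(messageType, schemaType);
    }

    /** Primitives are inferred, mapped types use their mapping, everything else falls back to JSON. */
    String resolve(Class<?> messageType) {
        if (PRIMITIVES.contains(messageType)) {
            return "PRIMITIVE(" + messageType.getSimpleName() + ")";
        }
        return customMappings.getOrDefault(messageType, "JSON");
    }

    // Stand-ins for the complex message types used in this section.
    static final class User { }

    static final class Address { }

    public static void main(String[] args) {
        SchemaLookupSketch resolver = new SchemaLookupSketch();
        resolver.addCustomMapping(User.class, "AVRO");

        System.out.println(resolver.resolve(String.class));  // PRIMITIVE(String)
        System.out.println(resolver.resolve(User.class));    // AVRO
        System.out.println(resolver.resolve(Address.class)); // JSON
    }
}
```

The point of the model is that the listener itself carries no schema information: the message type alone selects the schema, and an unmapped complex type falls back to JSON.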

2.1.1. Configuration properties

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON

The message-type is the fully-qualified name of the message class.

2.1.2. Schema resolver customizer

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}

2.1.3. Type mapping annotation

Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation. The schema info can be specified via the schemaType attribute on the annotation.

The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

With any of these mappings in place, there is no need to set the schema on the listener. For example, with the User mapping configured earlier:

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}

3. Message Listener Container Infrastructure

In most scenarios, we recommend using the ReactivePulsarListener annotation directly for consuming from a Pulsar topic as that model covers a broad set of application use cases. However, it is important to understand how ReactivePulsarListener works internally.

The message listener container is at the heart of message consumption when you use Spring for Apache Pulsar. The ReactivePulsarListener uses the message listener container infrastructure behind the scenes to create and manage the underlying Pulsar consumer.

3.1. ReactivePulsarMessageListenerContainer

The contract for this message listener container is provided through ReactivePulsarMessageListenerContainer whose default implementation creates a reactive Pulsar consumer and wires up a reactive message pipeline that uses the created consumer.

3.2. ReactiveMessagePipeline

The pipeline is a feature of the underlying Apache Pulsar Reactive client which does the heavy lifting of receiving the data in a reactive manner and then handing it over to the provided message handler. The reactive message listener container implementation is much simpler because the pipeline handles the majority of the work.

3.3. ReactivePulsarMessageHandler

The "listener" aspect is provided by the ReactivePulsarMessageHandler of which there are two provided implementations:

  • ReactivePulsarOneByOneMessageHandler - handles a single message one-by-one

  • ReactivePulsarStreamingHandler - handles multiple messages via a Flux

If topic information is not specified when using the listener containers directly, the same topic resolution process used by the ReactivePulsarListener is used with the one exception that the "Message type default" step is omitted.

4. Concurrency

When consuming records in streaming mode (stream = true) concurrency comes naturally via the underlying Reactive support in the client implementation.

However, when handling messages one-by-one, concurrency can be specified to increase processing throughput. Simply set the concurrency property on @ReactivePulsarListener. Additionally, when concurrency > 1 you can ensure messages are ordered by key and therefore sent to the same handler by setting useKeyOrderedProcessing = "true" on the annotation.
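
The effect of key-ordered processing can be illustrated with a small plain-Java model. This is a conceptual sketch only, not the implementation used by the Pulsar reactive client: the class KeyOrderedDispatchSketch and the handlerIndexFor helper are hypothetical, and the use of String.hashCode() to pick a handler is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of key-ordered dispatch; not the Pulsar reactive client's implementation. */
final class KeyOrderedDispatchSketch {

    /** Picks a handler slot for a message key; equal keys always map to the same slot. */
    static int handlerIndexFor(String key, int concurrency) {
        // floorMod keeps the result in [0, concurrency) even for negative hash codes.
        return Math.floorMod(key.hashCode(), concurrency);
    }

    public static void main(String[] args) {
        int concurrency = 4;
        List<List<String>> handlers = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            handlers.add(new ArrayList<>());
        }

        // Messages as "key:payload" pairs, in arrival order.
        String[] messages = {"order-1:created", "order-2:created", "order-1:paid", "order-2:shipped"};
        for (String message : messages) {
            String key = message.substring(0, message.indexOf(':'));
            handlers.get(handlerIndexFor(key, concurrency)).add(message);
        }

        // Each handler sees the messages for a given key in their original order.
        for (int i = 0; i < concurrency; i++) {
            System.out.println("handler " + i + " -> " + handlers.get(i));
        }
    }
}
```

Because every message with the same key lands on the same handler, per-key ordering is preserved while different keys can still be processed concurrently.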

Again, the ReactiveMessagePipeline does the heavy lifting; we simply set the properties on it.

Reactive vs Imperative

Concurrency in the reactive container is different from its imperative counterpart. The latter creates multiple threads (each with a Pulsar consumer) whereas the former dispatches the messages to multiple handler instances concurrently on the Reactive parallel scheduler.

One advantage of the reactive concurrency model is that it can be used with Exclusive subscriptions, whereas the imperative concurrency model cannot.

5. Pulsar Headers

The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java.

5.1. Accessing In OneByOne Listener

The following example shows how you can access Pulsar Headers when using a one-by-one message listener:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

In the preceding example, we access the values for the messageId message metadata as well as a custom message property named foo. The Spring @Header annotation is used for each header field.

You can also use Pulsar’s Message as the envelope to carry the payload. When doing so, you can directly call the corresponding methods on the Pulsar message to retrieve the metadata. However, as a convenience, you can also retrieve it by using the @Header annotation. Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.

5.2. Accessing In Streaming Listener

When using a streaming message listener, the header support is limited. The headers are populated only when the Flux contains Spring org.springframework.messaging.Message elements. Additionally, the Spring @Header annotation cannot be used to retrieve the data. You must directly call the corresponding methods on the Spring message to retrieve the data.

6. Message Acknowledgment

The framework automatically handles message acknowledgement. However, the listener method must send a signal indicating whether the message was successfully processed. The container implementation then uses that signal to perform the ack or nack operation. This is slightly different from its imperative counterpart, where the signal is implied as positive unless the method throws an exception.

6.1. OneByOne Listener

The single message (aka OneByOne) message listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).

6.2. Streaming Listener

The streaming listener method returns a Flux<MessageResult<Void>> where each MessageResult element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of acknowledge and negativeAcknowledge static factory methods that can be used to create the appropriate MessageResult instance.

7. Message Redelivery and Error Handling

Apache Pulsar provides various native strategies for message redelivery and error handling. We will take a look at them and see how to use them through Spring for Apache Pulsar.

7.1. Acknowledgment Timeout

By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.

You can specify this property directly as a Pulsar consumer property via a consumer customizer such as:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

7.2. Negative Acknowledgment Redelivery Delay

When acknowledging negatively, the Pulsar consumer lets you specify how long to wait before the message is redelivered. The default is to redeliver the message in one minute, but you can change it via a consumer customizer such as:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

7.3. Dead Letter Topic

Apache Pulsar lets applications use a dead letter topic on consumers with a Shared subscription type. For the Exclusive and Failover subscription types, this feature is not available. The basic idea is that, if a message is retried a certain number of times (for example, due to an ack timeout or nack redelivery), then once the retries are exhausted, the message is sent to a special topic called the dead letter queue (DLQ). Let us see this feature in action by inspecting some code snippets:

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer")
    Mono<Void> listen(String msg) {
        // Always fail so that the message is eventually routed to the DLQ
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    Mono<Void> listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
        return Mono.empty();
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

First, we have a bean for the DeadLetterPolicy, named myDeadLetterPolicy (it can be any name you wish). This bean specifies a number of things, such as the maximum redelivery count (10, in this case) and the name of the dead letter topic (my-dlq-topic, in this case). If you do not specify a DLQ topic name, it defaults to <topicname>-<subscriptionname>-DLQ in Pulsar. Next, we provide this bean name to ReactivePulsarListener by setting the deadLetterPolicy property. Note that the ReactivePulsarListener has a subscription type of Shared, as the DLQ feature only works with shared subscriptions.

This code is primarily for demonstration purposes, so we provide an ackTimeout value of 1 second. The idea is that the listener throws an exception and, if Pulsar does not receive an ack within 1 second, it redelivers the message. If that cycle continues ten times (our max redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the message to the DLQ topic. A second ReactivePulsarListener listens on the DLQ topic to receive data as it is published there.

Special note on DLQ topics when using partitioned topics

If the main topic is partitioned, behind the scenes, each partition is treated as a separate topic by Pulsar. Pulsar appends partition-<n>, where n stands for the partition number, to the main topic name. The problem is that, if you do not specify a DLQ topic (as opposed to what we did above), Pulsar publishes to a default topic name that has this partition-<n> info in it, for example: topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. The easy way to solve this is to always provide a DLQ topic name.
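
To make the naming concrete, the following plain-Java sketch builds the default DLQ topic names for a partitioned topic using the <topicname>-<subscriptionname>-DLQ pattern described in this section. It is an illustration only and does not call any Pulsar API: the class DlqTopicNames and its helper methods are hypothetical, and short topic names are used as in the text rather than fully qualified ones.

```java
/** Plain-Java sketch of the default DLQ naming pattern described in the text; not a Pulsar API. */
final class DlqTopicNames {

    private DlqTopicNames() {
    }

    /** Default pattern from the text: <topicname>-<subscriptionname>-DLQ. */
    static String defaultDlqTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    /** Name of partition n of a partitioned topic: <topic>-partition-<n>. */
    static String partitionTopic(String topic, int partition) {
        return topic + "-partition-" + partition;
    }

    public static void main(String[] args) {
        String subscription = "deadLetterPolicySubscription";
        // Without an explicit deadLetterTopic, each partition gets its own default DLQ topic name.
        for (int p = 0; p < 3; p++) {
            System.out.println(defaultDlqTopic(partitionTopic("topic-with-dlp", p), subscription));
        }
    }
}
```

With three partitions this yields three distinct DLQ topic names, which is why a single explicit deadLetterTopic is usually easier to consume from.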

8. Pulsar Reader Support

The framework provides support for using Pulsar Reader in a Reactive fashion via the ReactivePulsarReaderFactory.

Spring Boot provides this reader factory which can be configured with any of the spring.pulsar.reader.* application properties.