Message Production

1. ReactivePulsarTemplate

On the Pulsar producer side, Spring Boot auto-configuration provides a ReactivePulsarTemplate for publishing records. The template implements the ReactivePulsarOperations interface and exposes that interface's methods for publishing records.

The template provides send methods that accept a single message and return a Mono<MessageId>. It also provides send methods that accept multiple messages (in the form of the ReactiveStreams Publisher type) and return a Flux<MessageId>.

For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.

1.1. Fluent API

The template provides a fluent builder to handle more complicated send requests.

1.2. Message customization

You can specify a MessageSpecBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

1.3. Sender customization

You can specify a ReactiveMessageSenderBuilderCustomizer to configure the underlying Pulsar sender builder that ultimately constructs the sender used to send the outgoing message.

Use with caution: this gives full access to the sender builder, and invoking some of its methods (such as create) may have unintended side effects.

For example, the following code shows how to disable batching and enable chunking:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

The following example shows how to use custom routing when publishing records to partitioned topics by specifying a custom MessageRouter implementation on the sender builder:

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();

Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.

2. Specifying Schema Information

If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the ReactivePulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.

The complex schema types currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE with INLINE encoding.

2.1. Custom Schema Mapping

As an alternative to specifying the schema when invoking send operations on the ReactivePulsarTemplate for complex types, the schema resolver can be configured with mappings for the types. This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.
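
To make the lookup order concrete, the following is a minimal, library-free sketch of the idea, not the framework's resolver. The class SchemaLookupSketch and its methods are hypothetical, and schema types are modelled as plain strings. It assumes that an explicitly supplied schema wins, that a registered mapping is consulted next, and that JSON is the fallback for unmapped complex types, as described above. Primitive type detection is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a schema resolver; schema types are plain strings here.
final class SchemaLookupSketch {

	// Message type -> schema type, filled in by addMapping.
	private final Map<Class<?>, String> customMappings = new HashMap<>();

	// Registers a mapping for a message type (similar in spirit to a custom schema mapping).
	void addMapping(Class<?> messageType, String schemaType) {
		customMappings.put(messageType, schemaType);
	}

	// Order modelled here (an assumption): explicit schema, then mapping, then JSON fallback.
	String resolve(Class<?> messageType, String explicitSchemaType) {
		if (explicitSchemaType != null) {
			return explicitSchemaType;
		}
		return customMappings.getOrDefault(messageType, "JSON");
	}
}
```

In this model, registering a mapping once removes the need to pass a schema on each call, which is the convenience the custom mappings are meant to provide.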

2.1.1. Configuration properties

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON

The message-type is the fully-qualified name of the message class.

2.1.2. Schema resolver customizer

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	};
}

2.1.3. Type mapping annotation

Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation. The schema info can be specified via the schemaType attribute on the annotation.

The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

With this configuration in place, there is no need to specify the schema on send operations.

2.2. Producing with AUTO_SCHEMA

If you cannot know the schema type of a Pulsar topic in advance, you can use an AUTO_PRODUCE schema to safely publish a raw JSON or Avro payload as a byte[].

In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.

Specify a schema of Schema.AUTO_PRODUCE_BYTES() on your template send operations, as shown in the following example:

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}

This is only supported with Avro and JSON schema types.

3. ReactivePulsarSenderFactory

The ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to actually create the underlying sender.

Spring Boot provides this sender factory, which can be configured with any of the spring.pulsar.producer.* application properties.

If topic information is not specified when using the sender factory APIs directly, the sender factory uses the same topic resolution process as the ReactivePulsarTemplate, except that the "Message type default" step is omitted.
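
The difference between the two resolution paths can be sketched without any Pulsar dependency. This is an illustration only: TopicResolutionSketch and its methods are hypothetical. The step order (explicit topic, then message type default, then a global default) is an assumption based on the step name mentioned above, so verify it against the topic resolution documentation.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of topic resolution; the step order is an assumption, not the framework's code.
final class TopicResolutionSketch {

	// Template-style resolution: explicit topic, then per-type default, then global default.
	static Optional<String> resolveForTemplate(String explicitTopic, Class<?> messageType,
			Map<Class<?>, String> typeDefaults, String globalDefault) {
		if (explicitTopic != null) {
			return Optional.of(explicitTopic);
		}
		String typeDefault = typeDefaults.get(messageType);
		if (typeDefault != null) {
			return Optional.of(typeDefault);
		}
		return Optional.ofNullable(globalDefault);
	}

	// Sender-factory-style resolution: same idea, but the per-type default step is skipped.
	static Optional<String> resolveForSenderFactory(String explicitTopic, String globalDefault) {
		return Optional.ofNullable(explicitTopic != null ? explicitTopic : globalDefault);
	}
}
```

An empty Optional in this model corresponds to the case where no topic can be determined at all.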

3.1. Producer Caching

Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the ReactiveMessageSenderCache in the underlying Apache Pulsar Reactive client caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period.

You can configure the cache settings by specifying any of the spring.pulsar.producer.cache.* application properties.
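
The caching behavior described in this section can be pictured with a small, library-free model. The sketch below is not the Pulsar Reactive client's ReactiveMessageSenderCache. IdleLruCache, maxSize, and maxIdleNanos are hypothetical names, and the current time is passed in explicitly to keep the example deterministic. It only illustrates the two ideas named above: entries are reused while they stay in use, and are dropped once they have been idle too long or the cache is full.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.Function;

// Hypothetical model of an LRU cache with idle-time eviction; not the Pulsar client's code.
final class IdleLruCache<K, V> {

	// Cached value plus the time it was last used.
	private static final class Entry<V> {
		final V value;
		long lastUsedNanos;

		Entry(V value, long lastUsedNanos) {
			this.value = value;
			this.lastUsedNanos = lastUsedNanos;
		}
	}

	private final int maxSize;
	private final long maxIdleNanos;
	// accessOrder = true keeps the least recently used entry first.
	private final LinkedHashMap<K, Entry<V>> entries = new LinkedHashMap<>(16, 0.75f, true);

	IdleLruCache(int maxSize, long maxIdleNanos) {
		this.maxSize = maxSize;
		this.maxIdleNanos = maxIdleNanos;
	}

	// Returns the cached value for the key, invoking the factory only on a miss.
	V get(K key, long nowNanos, Function<K, V> factory) {
		evictIdle(nowNanos);
		Entry<V> entry = entries.get(key);
		if (entry == null) {
			entry = new Entry<>(factory.apply(key), nowNanos);
			entries.put(key, entry);
			if (entries.size() > maxSize) {
				// Over capacity: drop the least recently used entry.
				Iterator<K> it = entries.keySet().iterator();
				it.next();
				it.remove();
			}
		}
		entry.lastUsedNanos = nowNanos;
		return entry.value;
	}

	int size() {
		return entries.size();
	}

	// Removes every entry that has been idle for longer than maxIdleNanos.
	private void evictIdle(long nowNanos) {
		entries.values().removeIf(e -> nowNanos - e.lastUsedNanos > maxIdleNanos);
	}
}
```

In a real producer cache, eviction would also need to close the evicted producer to release its resources. This model omits that step.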