Message Production

1. Pulsar Template

On the Pulsar producer side, Spring Boot auto-configuration provides a PulsarTemplate for publishing records. The template implements an interface called PulsarOperations and provides methods to publish records through its contract.

There are two categories of these send API methods: send and sendAsync. The send methods block calls by using the synchronous sending capabilities on the Pulsar producer. They return the MessageId of the message that was published once the message is persisted on the broker. The sendAsync method calls are asynchronous calls that are non-blocking. They return a CompletableFuture, which you can use to asynchronously receive the message ID once the messages are published.

For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.

1.1. Simple API

The template provides a handful of methods (prefixed with 'send') for simple send requests. For more complicated send requests, a fluent API lets you configure more options.

1.2. Fluent API

The template provides a fluent builder to handle more complicated send requests.

1.3. Message customization

You can specify a TypedMessageBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

1.4. Producer customization

You can specify a ProducerBuilderCustomizer to configure the underlying Pulsar producer builder that ultimately constructs the producer used to send the outgoing message.

Use with caution as this gives full access to the producer builder and invoking some of its methods (such as create) may have unintended side effects.

For example, the following code shows how to disable batching and enable chunking:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

This other example shows how to use custom routing when publishing records to partitioned topics. Specify your custom MessageRouter implementation on the Producer builder such as:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.

This other example shows how to add a ProducerInterceptor that will intercept and mutate messages received by the producer before being published to the brokers:

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

The customizer will only apply to the producer used for the send operation. If you want to apply a customizer to all producers, you must provide them to the producer factory as described in Global producer customization.

The rules described in “Caution on Lambda customizers” must be followed when using Lambda customizers.

2. Specifying Schema Information

If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the PulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.

Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE w/ INLINE encoding.

2.1. Custom Schema Mapping

As an alternative to specifying the schema when invoking send operations on the PulsarTemplate for complex types, the schema resolver can be configured with mappings for the types. This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.

2.1.1. Configuration properties

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
The message-type is the fully-qualified name of the message class.

2.1.2. Schema resolver customizer

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. Type mapping annotation

Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation. The schema info can be specified via the schemaType attribute on the annotation.

The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

With this configuration in place, there is no need to set specify the schema on send operations.

2.2. Producing with AUTO_SCHEMA

If there is no chance to know the type of schema of a Pulsar topic in advance, you can use an AUTO_PRODUCE schema to publish a raw JSON or Avro payload as a byte[] safely.

In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.

Simply specify a schema of Schema.AUTO_PRODUCE_BYTES() on your template send operations as shown in the example below:

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
This is only supported with Avro and JSON schema types.

3. Pulsar Producer Factory

The PulsarTemplate relies on a PulsarProducerFactory to actually create the underlying producer. Spring Boot auto-configuration also provides this producer factory which you can further configure by specifying any of the spring.pulsar.producer.* application properties.

If topic information is not specified when using the producer factory APIs directly, the same topic resolution process used by the PulsarTemplate is used with the one exception that the "Message type default" step is omitted.

3.1. Global producer customization

The framework provides the ProducerBuilderCustomizer contract which allows you to configure the underlying builder which is used to construct each producer. To customize all producers, you can pass a list of customizers into the PulsarProducerFactory constructor. When using multiple customizers, they are applied in the order in which they appear in the list.

If you use Spring Boot auto-configuration, you can specify the customizers as beans and they will be passed automatically to the PulsarProducerFactory, ordered according to their @Order annotation.

If you want to apply a customizer to just a single producer, you can use the Fluent API and specify the customizer at send time.

4. Pulsar Producer Caching

Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the producer factory caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period. The cache key is composed of just enough information to ensure that callers are returned the same producer on subsequent creation requests.

Additionally, you can configure the cache settings by specifying any of the spring.pulsar.producer.cache.* application properties.

4.1. Caution on Lambda customizers

Any user-provided producer customizers are also included in the cache key. Because the cache key relies on a valid implementation of equals/hashCode, one must take caution when using Lambda customizers.

RULE: Two customizers implemented as Lambdas will match on equals/hashCode if and only if they use the same Lambda instance and do not require any variable defined outside its closure.

To clarify the above rule we will look at a few examples. In the following example, the customizer is defined as an inline Lambda which means that each call to sendUser uses the same Lambda instance. Additionally, it requires no variable outside its closure. Therefore, it will match as a cache key.

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

In this next case, the customizer is defined as an inline Lambda which means that each call to sendUser uses the same Lambda instance. However, it requires a variable outside its closure. Therefore, it will not match as a cache key.

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

In this final example, the customizer is defined as an inline Lambda which means that each call to sendUser uses the same Lambda instance. While it does use a variable name, it does not originate outside its closure and therefore will match as a cache key. This illustrates that variables can be used within the Lambda closure and can even make calls to static methods.

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
RULE: If your Lambda customizer is not defined once and only once (the same instance is used on subsequent calls) OR it requires variable(s) defined outside its closure then you must provide a customizer implementation with a valid equals/hashCode implementation.
If these rules are not followed then the producer cache will always miss and your application performance will be negatively affected.

5. Intercept Messages on the Producer

Adding a ProducerInterceptor lets you intercept and mutate messages received by the producer before they are published to the brokers. To do so, you can pass a list of interceptors into the PulsarTemplate constructor. When using multiple interceptors, the order they are applied in is the order in which they appear in the list.

If you use Spring Boot auto-configuration, you can specify the interceptors as Beans. They are passed automatically to the PulsarTemplate. Ordering of the interceptors is achieved by using the @Order annotation as follows:

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
If you are not using the starter, you will need to configure and register the aforementioned components yourself.