Null Payloads and Log Compaction of 'Tombstone' Records

When using log compaction, you can send and receive messages with null payloads to identify the deletion of a key. You can also receive null values for other reasons, such as a deserializer that might return null when it cannot deserialize a value.
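
To make the tombstone idea concrete, the following minimal sketch models what compaction retains: the latest value per key, with a null value removing the key altogether. It is plain Java and does not use the Pulsar client. The CompactionSketch class, the KeyedRecord record, and the compact method are hypothetical names introduced only for illustration, and the real broker-side compaction is more involved.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    /** Hypothetical keyed record; a null value marks the key as deleted (a tombstone). */
    record KeyedRecord(String key, String value) {}

    /** Replays the log in order, keeping only the latest value for each key. */
    static Map<String, String> compact(List<KeyedRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (KeyedRecord r : log) {
            if (r.value() == null) {
                latest.remove(r.key());          // tombstone: drop the key entirely
            } else {
                latest.put(r.key(), r.value());  // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<KeyedRecord> log = List.of(
                new KeyedRecord("key:1234", "v1"),
                new KeyedRecord("key:5678", "a"),
                new KeyedRecord("key:1234", "v2"),
                new KeyedRecord("key:5678", null)); // tombstone for key:5678
        System.out.println(compact(log)); // prints {key:1234=v2}
    }
}
```

A consumer that reads the topic sees the tombstone as a message with a null payload, which is why the key is needed to know what was deleted.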

Producing Null Payloads

To send a null payload by using the PulsarTemplate, you can use the fluent API and pass null into the value argument of the newMessage() method, for example:

pulsarTemplate
        .newMessage(null)
        .withTopic("my-topic")
        .withSchema(Schema.STRING)
        .withMessageCustomizer((mb) -> mb.key("key:1234"))
        .send();

When sending null values, you must specify the schema type, because the system cannot determine the type of the message from a null payload.

Consuming Null Payloads

For @PulsarListener and @PulsarReader, the null payload is passed into the listener method based on the type of its message parameter as follows:

| Parameter type | Passed-in value |
| --- | --- |
| primitive | null |
| user-defined | null |
| org.apache.pulsar.client.api.Message<T> | non-null Pulsar message whose getValue() returns null |
| org.springframework.messaging.Message<T> | non-null Spring message whose getPayload() returns PulsarNull |
| List<X> | non-null list whose entries (X) are one of the above types and behave accordingly (i.e., primitive entries are null, and so on) |
| org.apache.pulsar.client.api.Messages<T> | non-null container of non-null Pulsar messages whose getValue() returns null |

When the passed-in value is null (i.e., single-record listeners with primitive or user-defined types), you must use the @Payload parameter annotation with required = false.
When using the Spring org.springframework.messaging.Message for your listener payload type, its generic type information must be wide enough to accept Message<PulsarNull> (e.g., Message, Message<?>, or Message<Object>). This is because the Spring Message does not allow null values for its payload and instead uses the PulsarNull placeholder.
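
The reason the generic type must be wide can be illustrated with a small, self-contained sketch of the placeholder pattern. The types below are hypothetical stand-ins, not the Spring or Spring for Apache Pulsar classes: Envelope plays the role of a message wrapper that rejects null payloads, and NullMarker plays the role of a placeholder such as PulsarNull. Only the general pattern is shown, under those assumptions.

```java
import java.util.Objects;

public class NullPlaceholderSketch {

    /** Hypothetical stand-in for a null-payload placeholder such as PulsarNull. */
    enum NullMarker { INSTANCE }

    /** Hypothetical stand-in for a message wrapper that does not allow null payloads. */
    record Envelope<T>(T payload) {
        Envelope {
            Objects.requireNonNull(payload, "payload must not be null");
        }
    }

    /** Wraps a possibly-null value, substituting the marker for null. */
    static Envelope<?> wrap(Object value) {
        return new Envelope<>(value == null ? NullMarker.INSTANCE : value);
    }

    /** A wildcard parameter accepts both real values and the marker. */
    static String describe(Envelope<?> envelope) {
        Object payload = envelope.payload();
        if (payload instanceof NullMarker) {
            return "tombstone";
        }
        return "value: " + payload;
    }

    public static void main(String[] args) {
        System.out.println(describe(wrap("hello"))); // prints value: hello
        System.out.println(describe(wrap(null)));    // prints tombstone
    }
}
```

Had describe been declared to take Envelope<String>, it could not represent the marker payload, which mirrors why a listener parameter narrower than Message<?> or Message<Object> cannot receive the placeholder.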

If the message is a tombstone for a compacted log, you usually also need the key so that your application can determine which key was "deleted". The following example shows such a configuration:

@PulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
void myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}

The @PulsarReader does not yet support @Header arguments, so it is less useful in the log compaction scenario.