Null Payloads and Log Compaction of 'Tombstone' Records

When using log compaction, you can send and receive messages with null payloads to identify the deletion of a key. You can also receive null values for other reasons, such as a deserializer that might return null when it cannot deserialize a value.
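
The effect of a tombstone can be sketched in plain Java, without any Pulsar dependency. This is only an illustration of the idea: the class `CompactedViewSketch`, the `Entry` record, and the `replay` method are hypothetical names made up for this sketch, and it assumes that a null value always means "delete this key".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not part of Spring for Apache Pulsar. */
final class CompactedViewSketch {

    /** One log entry: a key and a value, where a null value marks a tombstone. */
    record Entry(String key, String value) {}

    /** Replays the log in order and returns the latest value per surviving key. */
    static Map<String, String> replay(List<Entry> log) {
        Map<String, String> view = new LinkedHashMap<>();
        for (Entry entry : log) {
            if (entry.value() == null) {
                view.remove(entry.key());             // tombstone: the key is deleted
            } else {
                view.put(entry.key(), entry.value()); // newer value replaces the older one
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
                new Entry("user-1", "alice"),
                new Entry("user-2", "bob"),
                new Entry("user-1", "alice-v2"),
                new Entry("user-2", null));
        System.out.println(replay(log)); // prints {user-1=alice-v2}
    }
}
```

Only `user-1` survives the replay, because the last record for `user-2` is a tombstone.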

Producing Null Payloads

You can send a null value with the ReactivePulsarTemplate by passing a null message parameter value to one of the send methods, for example:

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();

When sending null values, you must specify the schema type because the system cannot determine the type of the message from a null payload.
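
The reason a null payload needs an explicit schema can be shown with a small plain-Java sketch. It is not how Spring for Apache Pulsar resolves schemas internally; `SchemaInferenceSketch`, its methods, and the schema names it returns are assumptions chosen for illustration. The point is only that a null reference has no runtime type to inspect.

```java
import java.util.Optional;

/** Hypothetical illustration only; not the framework's actual schema resolution. */
final class SchemaInferenceSketch {

    /** Tries to derive a schema name from the runtime type of the payload. */
    static Optional<String> inferSchemaName(Object payload) {
        if (payload == null) {
            return Optional.empty();    // null carries no runtime type to inspect
        }
        if (payload instanceof String) {
            return Optional.of("STRING");
        }
        if (payload instanceof Integer) {
            return Optional.of("INT32");
        }
        return Optional.of("JSON");     // assumed fallback for this sketch
    }

    /** Prefers an explicitly given schema name; otherwise falls back to inference. */
    static String resolveSchemaName(Object payload, String explicitSchemaName) {
        if (explicitSchemaName != null) {
            return explicitSchemaName;
        }
        return inferSchemaName(payload)
                .orElseThrow(() -> new IllegalArgumentException(
                        "A null payload requires an explicit schema"));
    }
}
```

In this sketch, `resolveSchemaName(null, "STRING")` succeeds, while `resolveSchemaName(null, null)` fails because nothing can be inferred.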

Consuming Null Payloads

For @ReactivePulsarListener, the null payload is passed into the listener method based on the type of its message parameter as follows:

Parameter type                                    Passed-in value

primitive                                         null
user-defined                                      null
org.apache.pulsar.client.api.Message<T>           non-null Pulsar message whose getValue() returns null
org.springframework.messaging.Message<T>          non-null Spring message whose getPayload() returns PulsarNull
Flux<org.apache.pulsar.client.api.Message<T>>     non-null flux whose entries are non-null Pulsar messages whose getValue() returns null
Flux<org.springframework.messaging.Message<T>>    non-null flux whose entries are non-null Spring messages whose getPayload() returns PulsarNull

When the passed-in value is null (i.e. single-record listeners with primitive or user-defined types), you must use the @Payload parameter annotation with required = false.

When using the Spring org.springframework.messaging.Message for your listener payload type, its generic type information must be wide enough to accept Message<PulsarNull> (e.g. Message, Message<?>, or Message<Object>). This is because the Spring Message does not allow null values for its payload and instead uses the PulsarNull placeholder.
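
The placeholder approach described above can be sketched in plain Java. The types here (`NullPlaceholderSketch`, `NullMarker`, `Envelope`) are hypothetical stand-ins and not the real Spring or Spring for Apache Pulsar classes. The sketch shows why the generic type has to be wide: an envelope typed as `Envelope<String>` could never carry the marker object.

```java
import java.util.Objects;

/** Hypothetical illustration only; these are not the real Spring types. */
final class NullPlaceholderSketch {

    /** Stand-in for a placeholder such as PulsarNull: one shared marker instance. */
    enum NullMarker { INSTANCE }

    /** Stand-in for a message wrapper that rejects null payloads. */
    record Envelope<T>(T payload) {
        Envelope {
            Objects.requireNonNull(payload, "payload must not be null");
        }
    }

    /** Wraps a possibly-null value; typed as Envelope<Object> so it can hold the marker. */
    static Envelope<Object> wrap(String value) {
        Object payload = (value == null) ? NullMarker.INSTANCE : value;
        return new Envelope<>(payload);
    }

    /** Returns true when the envelope carries the marker instead of a real value. */
    static boolean isTombstone(Envelope<?> envelope) {
        return envelope.payload() == NullMarker.INSTANCE;
    }
}
```

A consumer of such an envelope checks for the marker rather than for null, which is the same kind of check a listener would make against the PulsarNull placeholder.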

If the message is a tombstone for a compacted log, you usually also need the key so that your application can determine which key was "deleted". The following example shows such a configuration:

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}

When using a streaming message listener (Flux), header support is limited, so it is less useful in the log compaction scenario.