
Null Payloads and Log Compaction of 'Tombstone' Records

When you use log compaction, you can send and receive messages with null payloads (commonly called tombstone records) to mark a key as deleted.
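To make the role of a null payload concrete, the following is a minimal sketch of how compaction treats such records. It uses only the JDK and does not touch a broker. The `CompactionSketch` class, the `LogRecord` type, and the `compact` method are hypothetical names chosen for illustration. A real broker compacts in the background and, as far as we know, also keeps tombstones for a configurable period (the `delete.retention.ms` topic setting) so that consumers can still see them; the sketch ignores both details.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: not part of Spring for Apache Kafka or the Kafka client.
class CompactionSketch {

    /** A key/value pair in the log; a null value marks a tombstone. */
    record LogRecord(String key, String value) {}

    /** Keeps the latest value per key and drops keys whose latest record is a tombstone. */
    static Map<String, String> compact(List<LogRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (LogRecord record : log) {
            if (record.value() == null) {
                latest.remove(record.key());   // tombstone: the key is considered deleted
            } else {
                latest.put(record.key(), record.value());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
                new LogRecord("user-1", "alice"),
                new LogRecord("user-2", "bob"),
                new LogRecord("user-1", null));   // tombstone for user-1
        System.out.println(compact(log));         // {user-2=bob}
    }
}
```

The point of the sketch is that the key carries the meaning of a tombstone, which is why a consumer of such a record usually needs the key as well as the (null) value.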

You can also receive null values for other reasons, such as a Deserializer that might return null when it cannot deserialize a value.

To send a null payload by using the KafkaTemplate, you can pass null into the value argument of the send() methods. The one exception is the send(Message<?> message) variant. Because a spring-messaging Message<?> cannot have a null payload, you can instead use a special payload type called KafkaNull, which the framework sends as null. For convenience, the static KafkaNull.INSTANCE is provided.
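The sentinel idea behind KafkaNull can be sketched with the JDK alone. The example below is a model of the pattern, not the framework's implementation: `NullMarker`, `Envelope`, and `toRecordValue` are hypothetical stand-ins for KafkaNull, a message type that rejects null payloads, and the conversion step that happens before the record is written.

```java
import java.util.Objects;

// Illustrative model only: the names below are hypothetical, not Spring classes.
class NullSentinelSketch {

    /** Stand-in for a sentinel payload type such as KafkaNull. */
    static final class NullMarker {
        static final NullMarker INSTANCE = new NullMarker();
        private NullMarker() {}
    }

    /** Stand-in for a message wrapper that refuses a null payload. */
    record Envelope(String key, Object payload) {
        Envelope {
            Objects.requireNonNull(payload, "payload must not be null");
        }
    }

    /** Returns the value that would be written to the topic: null for the sentinel. */
    static Object toRecordValue(Envelope envelope) {
        return envelope.payload() == NullMarker.INSTANCE ? null : envelope.payload();
    }

    public static void main(String[] args) {
        Envelope tombstone = new Envelope("user-1", NullMarker.INSTANCE);
        System.out.println(toRecordValue(tombstone));                      // null
        System.out.println(toRecordValue(new Envelope("user-2", "bob"))); // bob
    }
}
```

The sentinel lets the message object stay valid (its payload is never null) while still expressing "send a null value" to the layer that builds the outgoing record.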

When you use a message listener container, the received ConsumerRecord has a null value().

To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false. If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted". The following example shows such a configuration:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

When you use a class-level @KafkaListener with multiple @KafkaHandler methods, some additional configuration is needed. Specifically, you need a @KafkaHandler method with a KafkaNull payload. The following example shows how to configure one:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

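Note that the argument passed to the handler method is null, not a KafkaNull instance. The KafkaNull parameter type is used only to select the handler.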
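The distinction between the type used to select a handler and the value the handler receives can be sketched as follows. This is a simplified JDK-only model under stated assumptions, not how the framework resolves @KafkaHandler methods: `HandlerDispatchSketch`, `NullMarker`, `register`, and `dispatch` are all hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative model only: not the framework's handler resolution logic.
class HandlerDispatchSketch {

    /** Hypothetical marker type standing in for KafkaNull during handler selection. */
    static final class NullMarker {
        private NullMarker() {}
    }

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    void register(Class<?> payloadType, Function<Object, String> handler) {
        handlers.put(payloadType, handler);
    }

    /** Selects a handler by payload type; a null value selects the NullMarker handler. */
    String dispatch(Object value) {
        Class<?> type = (value == null) ? NullMarker.class : value.getClass();
        Function<Object, String> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler for " + type.getSimpleName());
        }
        // The handler is invoked with the original value, so the null handler sees null.
        return handler.apply(value);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(String.class, v -> "string:" + v);
        dispatcher.register(Integer.class, v -> "integer:" + v);
        dispatcher.register(NullMarker.class, v -> "delete, argument is " + v);
        System.out.println(dispatcher.dispatch("cat"));  // string:cat
        System.out.println(dispatcher.dispatch(42));     // integer:42
        System.out.println(dispatcher.dispatch(null));   // delete, argument is null
    }
}
```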

This feature requires a KafkaNullAwarePayloadArgumentResolver, which the framework configures automatically when you use the default MessageHandlerMethodFactory. When you use a custom MessageHandlerMethodFactory, see Adding custom HandlerMethodArgumentResolver to @KafkaListener.