Null Payloads and Log Compaction of 'Tombstone' Records

When you use Log Compaction, you can send and receive messages with null payloads to identify the deletion of a key.

You can also receive null values for other reasons, such as a Deserializer that might return null when it cannot deserialize a value.

To send a null payload by using the KafkaTemplate, you can pass null into the value argument of the send() methods. One exception to this is the send(Message<?> message) variant. Since spring-messaging Message<?> cannot have a null payload, you can use a special payload type called KafkaNull, and the framework sends null. For convenience, the static KafkaNull.INSTANCE is provided.

When you use a message listener container, the received ConsumerRecord has a null value().

To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false. If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted". The following example shows such a configuration:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

When you use a class-level @KafkaListener with multiple @KafkaHandler methods, some additional configuration is needed. Specifically, you need a @KafkaHandler method with a KafkaNull payload. The following example shows how to configure one:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

Note that the argument is null, not KafkaNull.

This feature requires the use of a KafkaNullAwarePayloadArgumentResolver which the framework will configure when using the default MessageHandlerMethodFactory. When using a custom MessageHandlerMethodFactory, see Adding custom HandlerMethodArgumentResolver to @KafkaListener.