Null Payloads and Log Compaction of 'Tombstone' Records
When you use Log Compaction, you can send and receive messages with null payloads to identify the deletion of a key.
You can also receive null values for other reasons, such as a Deserializer that might return null when it cannot deserialize a value.
To send a null payload by using the KafkaTemplate, you can pass null into the value argument of the send() methods.
One exception to this is the send(Message<?> message) variant.
Since spring-messaging Message<?> cannot have a null payload, you can use a special payload type called KafkaNull, and the framework sends null.
For convenience, the static KafkaNull.INSTANCE is provided.
When you use a message listener container, the received ConsumerRecord has a null value().
To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted".
The following example shows such a configuration:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}When you use a class-level @KafkaListener with multiple @KafkaHandler methods, some additional configuration is needed.
Specifically, you need a @KafkaHandler method with a KafkaNull payload.
The following example shows how to configure one:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
    @KafkaHandler
    public void listen(String cat) {
        ...
    }
    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }
    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }
}Note that the argument is null, not KafkaNull.
| This feature requires the use of a KafkaNullAwarePayloadArgumentResolverwhich the framework will configure when using the defaultMessageHandlerMethodFactory.
When using a customMessageHandlerMethodFactory, see Adding customHandlerMethodArgumentResolverto@KafkaListener. |