Null Payloads and Log Compaction of 'Tombstone' Records
When you use Log Compaction, you can send and receive messages with null
payloads to identify the deletion of a key.
You can also receive null
values for other reasons, such as a Deserializer
that might return null
when it cannot deserialize a value.
To send a null
payload by using the KafkaTemplate
, you can pass null into the value argument of the send()
methods.
One exception to this is the send(Message<?> message)
variant.
Since spring-messaging
Message<?>
cannot have a null
payload, you can use a special payload type called KafkaNull
, and the framework sends null
.
For convenience, the static KafkaNull.INSTANCE
is provided.
When you use a message listener container, the received ConsumerRecord
has a null
value()
.
To configure the @KafkaListener
to handle null
payloads, you must use the @Payload
annotation with required = false
.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted
".
The following example shows such a configuration:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
When you use a class-level @KafkaListener
with multiple @KafkaHandler
methods, some additional configuration is needed.
Specifically, you need a @KafkaHandler
method with a KafkaNull
payload.
The following example shows how to configure one:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
Note that the argument is null
, not KafkaNull
.
This feature requires the use of a KafkaNullAwarePayloadArgumentResolver which the framework will configure when using the default MessageHandlerMethodFactory .
When using a custom MessageHandlerMethodFactory , see Adding custom HandlerMethodArgumentResolver to @KafkaListener .
|