This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0!
Filtering Messages
In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not. That is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation of it.
The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener.
This class takes an implementation of RecordFilterStrategy in which you implement the filter method to signal that a message is a duplicate and should be discarded.
The adapter has an additional property called ackDiscarded, which indicates whether the adapter should acknowledge the discarded record. It is false by default.
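How a duplicate is recognized is left to the application. The following is a minimal sketch, using only the JDK, of a bounded in-memory store of message IDs that a filter method could consult. The class name SeenMessageIds, its capacity parameter, and the use of a String ID are assumptions made for illustration; they are not part of Spring for Apache Kafka.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: remembers recently seen message IDs so that redeliveries can be detected. */
final class SeenMessageIds {

    private final Set<String> seen;

    SeenMessageIds(final int capacity) {
        // Insertion-ordered map that evicts its eldest entry once the capacity is exceeded.
        Map<String, Boolean> bounded = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        // Wrap the map as a thread-safe set; listener threads may call it concurrently.
        this.seen = Collections.synchronizedSet(Collections.newSetFromMap(bounded));
    }

    /** Returns true if the ID was already recorded (a duplicate); otherwise records it and returns false. */
    boolean isDuplicate(String messageId) {
        return !seen.add(messageId);
    }
}
```

A RecordFilterStrategy could return the result of isDuplicate from its filter method, using whatever value uniquely identifies a message in your application. Because this store lives in memory, it does not survive a restart and is not shared between application instances; a durable or shared store would be needed for those cases.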
When you use @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory so that the listener is wrapped in the appropriate filtering adapter.
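A container factory configured this way might look like the following sketch. The configuration class name, the String key and value types, and the filtering rule itself (discarding blank values) are assumptions chosen only for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class FilteringListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Returning true from the strategy discards the record before it reaches the listener.
        // Illustrative rule only: drop records whose value is null or blank.
        factory.setRecordFilterStrategy(record -> record.value() == null || record.value().isBlank());
        // Optional: also acknowledge discarded records (the default is false).
        factory.setAckDiscarded(true);
        return factory;
    }
}
```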
In addition, a FilteringBatchMessageListenerAdapter is provided, for when you use a batch message listener.
The FilteringBatchMessageListenerAdapter is ignored if your @KafkaListener receives a ConsumerRecords<?, ?> instead of List<ConsumerRecord<?, ?>>, because ConsumerRecords is immutable.
Starting with version 2.8.4, you can override the listener container factory’s default RecordFilterStrategy by using the filter property on the listener annotations.
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
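The value of the filter property refers to a RecordFilterStrategy bean. A sketch of such a bean follows; the configuration class name, the String key type, and the rule itself are assumptions for illustration, and Thing is assumed to be in the same package.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@Configuration
public class DifferentFilterConfig {

    // The bean name matches the value given in the annotation's filter property.
    @Bean
    public RecordFilterStrategy<String, Thing> differentFilter() {
        // Illustrative rule only: discard records that carry no payload.
        return record -> record.value() == null;
    }
}
```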