Filtering Messages
In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not. That is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation of it.
The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter
class, which can wrap your MessageListener
.
This class takes an implementation of RecordFilterStrategy
in which you implement the filter
method to signal that a message is a duplicate and should be discarded.
This has an additional property called ackDiscarded
, which indicates whether the adapter should acknowledge the discarded record.
It is false
by default.
When you use @KafkaListener
, set the RecordFilterStrategy
(and optionally ackDiscarded
) on the container factory so that the listener is wrapped in the appropriate filtering adapter.
In addition, a FilteringBatchMessageListenerAdapter
is provided, for when you use a batch message listener.
The FilteringBatchMessageListenerAdapter is ignored if your @KafkaListener receives a ConsumerRecords<?, ?> instead of List<ConsumerRecord<?, ?>> , because ConsumerRecords is immutable.
|
Starting with version 2.8.4, you can override the listener container factory’s default RecordFilterStrategy
by using the filter
property on the listener annotations.
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
Starting with version 3.3, Ignoring empty batches that result from filtering by RecordFilterStrategy
is supported.
When implementing RecordFilterStrategy
, it can be configured through ignoreEmptyBatch()
.
The default setting is false
, indicating KafkaListener
will be invoked even if all ConsumerRecord
s are filtered out.
If true
is returned, the KafkaListener
will not be invoked when all ConsumerRecord
are filtered out.
However, commit to broker, will still be executed.
If false
is returned, the KafkaListener
will be invoked when all ConsumerRecord
are filtered out.
Here are some examples.
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
In this case, IgnoreEmptyBatchRecordFilterStrategy
always returns empty list and return true
as result of ignoreEmptyBatch()
.
Thus KafkaListener#listen(…)
never will be invoked at all.
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
However, in this case, IgnoreEmptyBatchRecordFilterStrategy
always returns empty list and return false
as result of ignoreEmptyBatch()
.
Thus KafkaListener#listen(…)
always will be invoked.