Filtering Messages

In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed; that is an application-level concern. Handling it is known as the Idempotent Receiver pattern, and Spring Integration provides an implementation of it.
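To make the Idempotent Receiver idea concrete, here is a minimal plain-Java sketch. It is not Spring Integration's implementation. It assumes each message carries an application-level id that stays the same across redeliveries. The InboundMessage and IdempotentReceiver names are hypothetical, and the in-memory set of seen ids stands in for what would have to be a durable, shared store in a real application.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical message type: id is an application-level key that identifies
// a logical message, so a redelivered copy carries the same id.
record InboundMessage(String id, String payload) { }

// Minimal in-memory Idempotent Receiver (illustrative only): each id is
// processed at most once; later deliveries with the same id are skipped.
class IdempotentReceiver {

    private final Set<String> processedIds = new HashSet<>();
    private final List<String> processedPayloads = new ArrayList<>();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean receive(InboundMessage message) {
        // Set.add returns false when the id was already recorded, i.e. a redelivery.
        if (!processedIds.add(message.id())) {
            return false;
        }
        processedPayloads.add(message.payload());
        return true;
    }

    /** Payloads handled so far, in processing order. */
    List<String> processedPayloads() {
        return List.copyOf(processedPayloads);
    }
}
```

This sketch records the id before processing, so a failure during processing would leave the message marked as seen. A production implementation has to choose when to record the id (typically after successful processing, ideally in the same transaction as the side effects).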

The Spring for Apache Kafka project also provides some assistance through the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy, in which you implement the filter method; returning true signals that a message is a duplicate and should be discarded. The adapter has an additional property, ackDiscarded, which indicates whether it should acknowledge the discarded record. It is false by default.
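The following plain-Java model illustrates the adapter's contract without depending on Spring: the strategy returns true to discard a record, discarded records never reach the wrapped listener, and they are acknowledged only when ackDiscarded is set. SimpleRecord, SimpleFilterStrategy, and SimpleFilteringListener are hypothetical stand-ins for ConsumerRecord, RecordFilterStrategy, and FilteringMessageListenerAdapter, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a consumed record.
record SimpleRecord(long offset, String value) { }

// Mirrors the filter contract described above: return true to DISCARD the record.
interface SimpleFilterStrategy {
    boolean filter(SimpleRecord record);
}

// Simplified model of a filtering adapter: it wraps a delegate listener and
// only forwards the records that the strategy keeps.
class SimpleFilteringListener {

    private final Consumer<SimpleRecord> delegate;
    private final SimpleFilterStrategy strategy;
    private final boolean ackDiscarded;
    private final List<Long> acknowledgedDiscards = new ArrayList<>();

    SimpleFilteringListener(Consumer<SimpleRecord> delegate, SimpleFilterStrategy strategy,
            boolean ackDiscarded) {
        this.delegate = delegate;
        this.strategy = strategy;
        this.ackDiscarded = ackDiscarded;
    }

    void onMessage(SimpleRecord record) {
        if (strategy.filter(record)) {
            // Discarded: the delegate never sees the record. It is acknowledged
            // here only when ackDiscarded is true (false mirrors the default).
            if (ackDiscarded) {
                acknowledgedDiscards.add(record.offset());
            }
            return;
        }
        // Kept: hand the record to the wrapped listener.
        delegate.accept(record);
    }

    /** Offsets of discarded records that this adapter acknowledged. */
    List<Long> acknowledgedDiscards() {
        return List.copyOf(acknowledgedDiscards);
    }
}
```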

When you use @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory so that the listener is wrapped in the appropriate filtering adapter.

In addition, a FilteringBatchMessageListenerAdapter is provided for use with a batch message listener.

The FilteringBatchMessageListenerAdapter is ignored if your @KafkaListener receives a ConsumerRecords<?, ?> instead of List<ConsumerRecord<?, ?>>, because ConsumerRecords is immutable.
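The immutability restriction makes sense if the batch is filtered by removing discarded records from the list in place. The plain-Java sketch below assumes that in-place approach; InPlaceBatchFilter is a hypothetical helper, not Spring code. It shows that removal works on a mutable List but fails on an unmodifiable collection.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: removes discarded elements from the batch in place
// and returns the same list instance.
final class InPlaceBatchFilter {

    private InPlaceBatchFilter() { }

    static <T> List<T> filterBatch(List<T> batch, Predicate<T> discard) {
        Iterator<T> it = batch.iterator();
        while (it.hasNext()) {
            if (discard.test(it.next())) {
                // Throws UnsupportedOperationException on an unmodifiable list.
                it.remove();
            }
        }
        return batch;
    }

    // Returns true if the batch could be filtered in place, false if it is unmodifiable.
    static <T> boolean canFilterInPlace(List<T> batch, Predicate<T> discard) {
        try {
            filterBatch(batch, discard);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```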

Starting with version 2.8.4, you can override the listener container factory’s default RecordFilterStrategy by using the filter property on the listener annotations.
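The following plain-Java sketch models only the resolution rule described above. A listener whose filter attribute is empty gets the container factory's default strategy; otherwise it gets the strategy registered under the named bean. FilterResolver, the Map standing in for the application context, the use of Predicate<String> as the strategy type, and the exception for an unknown name are all assumptions for illustration. This is not how Spring resolves beans internally.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model: chooses the filter for a listener, falling back to the
// factory default when the listener does not name a filter bean.
final class FilterResolver {

    private final Predicate<String> factoryDefault;
    private final Map<String, Predicate<String>> beansByName;

    FilterResolver(Predicate<String> factoryDefault, Map<String, Predicate<String>> beansByName) {
        this.factoryDefault = factoryDefault;
        this.beansByName = Map.copyOf(beansByName);
    }

    /** filterBeanName mirrors the annotation's filter attribute; empty means "use the default". */
    Predicate<String> resolve(String filterBeanName) {
        if (filterBeanName == null || filterBeanName.isEmpty()) {
            return factoryDefault;
        }
        Predicate<String> override = beansByName.get(filterBeanName);
        if (override == null) {
            // In this model an unknown bean name is a configuration error.
            throw new IllegalArgumentException("No filter bean named " + filterBeanName);
        }
        return override;
    }
}
```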

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

Starting with version 3.3, ignoring empty batches that result from filtering by a RecordFilterStrategy is supported. When implementing RecordFilterStrategy, you can configure this behavior by overriding ignoreEmptyBatch(). The default is false, meaning the KafkaListener is invoked even if all ConsumerRecords are filtered out.

If ignoreEmptyBatch() returns true, the KafkaListener is not invoked when all ConsumerRecords are filtered out. However, the commit to the broker is still performed.

If ignoreEmptyBatch() returns false, the KafkaListener is invoked when all ConsumerRecords are filtered out.
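The decision described above can be modeled in a few lines of plain Java. BatchDispatcher is a hypothetical class, not part of Spring for Apache Kafka: it filters a polled batch, invokes the listener unless the result is empty and empty batches are ignored, and counts a commit in both cases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of batch dispatch with an ignoreEmptyBatch flag.
final class BatchDispatcher<T> {

    private final Predicate<T> discard;        // true = filter the element out
    private final boolean ignoreEmptyBatch;    // skip the listener if nothing survives
    private final Consumer<List<T>> listener;
    private int commits;

    BatchDispatcher(Predicate<T> discard, boolean ignoreEmptyBatch, Consumer<List<T>> listener) {
        this.discard = discard;
        this.ignoreEmptyBatch = ignoreEmptyBatch;
        this.listener = listener;
    }

    void dispatch(List<T> polled) {
        List<T> kept = new ArrayList<>();
        for (T element : polled) {
            if (!discard.test(element)) {
                kept.add(element);
            }
        }
        // Invoke the listener unless the batch is empty AND empty batches are ignored.
        if (!kept.isEmpty() || !ignoreEmptyBatch) {
            listener.accept(kept);
        }
        // In this model the polled batch is committed in both cases.
        commits++;
    }

    int commits() {
        return commits;
    }
}
```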

Here are some examples.

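import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy<String, String> {
    ...
    // Discard every record: the filtered batch is always empty.
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    // Do not invoke the listener when all records have been filtered out.
    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
}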

// NOTE: ignoreEmptyBatchRecordFilterStrategy is the bean name of the IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

In this case, IgnoreEmptyBatchRecordFilterStrategy always returns an empty list and returns true from ignoreEmptyBatch(). Thus, KafkaListener#listen(…) is never invoked.

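import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy<String, String> {
    ...
    // Discard every record: the filtered batch is always empty.
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    // Invoke the listener even when all records have been filtered out (the default).
    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
}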

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is the bean name of the NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

In this case, by contrast, NotIgnoreEmptyBatchRecordFilterStrategy always returns an empty list and returns false from ignoreEmptyBatch(). Thus, KafkaListener#listen(…) is always invoked, with an empty list.