K
- the key type.V
- the value type.public class FilteringBatchAcknowledgingMessageListenerAdapter<K,V> extends AbstractFilteringMessageListener<K,V,BatchAcknowledgingMessageListener<K,V>> implements BatchAcknowledgingMessageListener<K,V>
BatchAcknowledgingMessageListener
adapter that implements filter logic via a
RecordFilterStrategy
. Note that any discarded records will be ack'd when the
delegate listener invokes acknowledge()
on the Acknowledgment
.ConsumerSeekAware.ConsumerSeekCallback
delegate, logger
Constructor and Description |
---|
FilteringBatchAcknowledgingMessageListenerAdapter(BatchAcknowledgingMessageListener<K,V> delegate,
RecordFilterStrategy<K,V> recordFilterStrategy)
Create an instance with the supplied strategy and delegate listener; when all
messages in a batch are discarded, an empty list is passed to the delegate so it
can decide whether or not to acknowledge.
|
FilteringBatchAcknowledgingMessageListenerAdapter(BatchAcknowledgingMessageListener<K,V> delegate,
RecordFilterStrategy<K,V> recordFilterStrategy,
boolean ackDiscarded)
Create an instance with the supplied strategy, delegate listener and discard
action.
|
Modifier and Type | Method and Description |
---|---|
void |
onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumerRecords,
Acknowledgment acknowledgment)
Invoked with data from kafka.
|
filter
onIdleContainer, onPartitionsAssigned, registerSeekCallback
public FilteringBatchAcknowledgingMessageListenerAdapter(BatchAcknowledgingMessageListener<K,V> delegate, RecordFilterStrategy<K,V> recordFilterStrategy)
delegate
- the delegate.recordFilterStrategy
- the filter.public FilteringBatchAcknowledgingMessageListenerAdapter(BatchAcknowledgingMessageListener<K,V> delegate, RecordFilterStrategy<K,V> recordFilterStrategy, boolean ackDiscarded)
delegate
- the delegate.recordFilterStrategy
- the filter.ackDiscarded
- true to ack automatically if all records are filtered.public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> consumerRecords, Acknowledgment acknowledgment)
GenericAcknowledgingMessageListener
onMessage
in interface GenericAcknowledgingMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>>
consumerRecords
- the data to be processed.acknowledgment
- the acknowledgment.