Class RecoveringBatchErrorHandler
- java.lang.Object
-
- org.springframework.kafka.listener.KafkaExceptionLogLevelAware
-
- org.springframework.kafka.listener.FailedRecordProcessor
-
- org.springframework.kafka.listener.RecoveringBatchErrorHandler
-
- All Implemented Interfaces:
BatchErrorHandler
,ConsumerAwareBatchErrorHandler
,ContainerAwareBatchErrorHandler
,DeliveryAttemptAware
,GenericErrorHandler<org.apache.kafka.clients.consumer.ConsumerRecords<?,?>>
public class RecoveringBatchErrorHandler extends FailedRecordProcessor implements ContainerAwareBatchErrorHandler
An error handler that seeks to the current offset for each topic in a batch of records. Used to rewind partitions after a message failure so that the batch can be replayed. If the listener throws aBatchListenerFailedException
, with the failed record. The records before the record will have their offsets committed and the partitions for the remaining records will be repositioned and/or the failed record can be recovered and skipped. If some other exception is thrown, or a valid record is not provided in the exception, error handling is delegated to aSeekToCurrentBatchErrorHandler
with this handler'sBackOff
. If the record is recovered, its offset is committed.- Since:
- 2.5
- Author:
- Gary Russell, Myeonghyeon Lee
-
-
Field Summary
-
Fields inherited from class org.springframework.kafka.listener.FailedRecordProcessor
logger
-
-
Constructor Summary
Constructors Constructor Description RecoveringBatchErrorHandler()
Construct an instance with the default recoverer which simply logs the record after 10 (maxFailures) have occurred for a topic/partition/offset.RecoveringBatchErrorHandler(java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> recoverer)
Construct an instance with the provided recoverer which will be called after 10 (maxFailures) have occurred for a topic/partition/offset.RecoveringBatchErrorHandler(java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> recoverer, org.springframework.util.backoff.BackOff backOff)
Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.RecoveringBatchErrorHandler(org.springframework.util.backoff.BackOff backOff)
Construct an instance with the default recoverer which simply logs the record after the backOff returns STOP for a topic/partition/offset.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
handle(java.lang.Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecords<?,?> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, MessageListenerContainer container)
Handle the exception.boolean
isAckAfterHandle()
Return true if the offset should be committed for a handled error (no exception thrown).void
setAckAfterHandle(boolean ackAfterHandle)
Set to false to prevent the container from committing the offset of a recovered record (when the error handler does not itself throw an exception).-
Methods inherited from class org.springframework.kafka.listener.FailedRecordProcessor
addNotRetryableException, addNotRetryableExceptions, clearThreadState, deliveryAttempt, getClassifier, getSkipPredicate, isCommitRecovered, removeNotRetryableException, setBackOffFunction, setClassifications, setCommitRecovered, setResetStateOnExceptionChange, setResetStateOnRecoveryFailure
-
Methods inherited from class org.springframework.kafka.listener.KafkaExceptionLogLevelAware
getLogLevel, setLogLevel
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.ConsumerAwareBatchErrorHandler
handle
-
Methods inherited from interface org.springframework.kafka.listener.ContainerAwareBatchErrorHandler
handle, handle
-
Methods inherited from interface org.springframework.kafka.listener.GenericErrorHandler
clearThreadState
-
-
-
-
Constructor Detail
-
RecoveringBatchErrorHandler
public RecoveringBatchErrorHandler()
Construct an instance with the default recoverer which simply logs the record after 10 (maxFailures) have occurred for a topic/partition/offset.
-
RecoveringBatchErrorHandler
public RecoveringBatchErrorHandler(org.springframework.util.backoff.BackOff backOff)
Construct an instance with the default recoverer which simply logs the record after the backOff returns STOP for a topic/partition/offset.- Parameters:
backOff
- theBackOff
.
-
RecoveringBatchErrorHandler
public RecoveringBatchErrorHandler(java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> recoverer)
Construct an instance with the provided recoverer which will be called after 10 (maxFailures) have occurred for a topic/partition/offset.- Parameters:
recoverer
- the recoverer.
-
RecoveringBatchErrorHandler
public RecoveringBatchErrorHandler(@Nullable java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception> recoverer, org.springframework.util.backoff.BackOff backOff)
Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.- Parameters:
recoverer
- the recoverer; if null, the default (logging) recoverer is used.backOff
- theBackOff
.- Since:
- 2.3
-
-
Method Detail
-
isAckAfterHandle
public boolean isAckAfterHandle()
Description copied from interface:GenericErrorHandler
Return true if the offset should be committed for a handled error (no exception thrown).- Specified by:
isAckAfterHandle
in interfaceGenericErrorHandler<org.apache.kafka.clients.consumer.ConsumerRecords<?,?>>
- Returns:
- true to commit.
-
setAckAfterHandle
public void setAckAfterHandle(boolean ackAfterHandle)
Description copied from interface:GenericErrorHandler
Set to false to prevent the container from committing the offset of a recovered record (when the error handler does not itself throw an exception).- Specified by:
setAckAfterHandle
in interfaceGenericErrorHandler<org.apache.kafka.clients.consumer.ConsumerRecords<?,?>>
- Parameters:
ackAfterHandle
- false to not commit.
-
handle
public void handle(java.lang.Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecords<?,?> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, MessageListenerContainer container)
Description copied from interface:BatchErrorHandler
Handle the exception.- Specified by:
handle
in interfaceBatchErrorHandler
- Specified by:
handle
in interfaceConsumerAwareBatchErrorHandler
- Specified by:
handle
in interfaceContainerAwareBatchErrorHandler
- Parameters:
thrownException
- the exception.data
- the consumer records.consumer
- the consumer.container
- the container.
-
-