Class DefaultErrorHandler
java.lang.Object
org.springframework.kafka.listener.KafkaExceptionLogLevelAware
org.springframework.kafka.listener.ExceptionClassifier
org.springframework.kafka.listener.FailedRecordProcessor
org.springframework.kafka.listener.FailedBatchProcessor
org.springframework.kafka.listener.DefaultErrorHandler
- All Implemented Interfaces:
CommonErrorHandler
,DeliveryAttemptAware
An error handler that, for record listeners, seeks to the current offset for each topic
in the remaining records. Used to rewind partitions after a message failure so that it
can be replayed. For batch listeners, seeks to the current offset for each topic in a
batch of records. Used to rewind partitions after a message failure so that the batch
can be replayed. If the listener throws a
BatchListenerFailedException
, with
the failed record. The records before the record will have their offsets committed and
the partitions for the remaining records will be repositioned and/or the failed record
can be recovered and skipped. If some other exception is thrown, or a valid record is
not provided in the exception, error handling is delegated to a
FallbackBatchErrorHandler
with this handler's BackOff
. If the record is
recovered, its offset is committed. This is a replacement for the legacy
SeekToCurrentErrorHandler
and SeekToCurrentBatchErrorHandler
(but the
fallback now can send the messages to a recoverer after retries are completed instead
of retrying indefinitely).- Since:
- 2.8
- Author:
- Gary Russell
-
Field Summary
Fields inherited from class org.springframework.kafka.listener.FailedRecordProcessor
logger
-
Constructor Summary
ConstructorsConstructorDescriptionConstruct an instance with the default recoverer which simply logs the record after 10 (maxFailures) have occurred for a topic/partition/offset, with the default back off (9 retries, no delay).DefaultErrorHandler
(ConsumerRecordRecoverer recoverer) Construct an instance with the provided recoverer which will be called after 10 (maxFailures) have occurred for a topic/partition/offset.DefaultErrorHandler
(ConsumerRecordRecoverer recoverer, BackOff backOff) Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.DefaultErrorHandler
(ConsumerRecordRecoverer recoverer, BackOff backOff, BackOffHandler backOffHandler) Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.DefaultErrorHandler
(BackOff backOff) Construct an instance with the default recoverer which simply logs the record after the backOff returns STOP for a topic/partition/offset. -
Method Summary
Modifier and TypeMethodDescriptionboolean
Return true if this error handler supports delivery attempts headers.void
handleBatch
(Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecords<?, ?> data, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) Handle the exception for a batch listener.<K,
V> org.apache.kafka.clients.consumer.ConsumerRecords<K, V> handleBatchAndReturnRemaining
(Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecords<?, ?> data, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) Handle the exception for a batch listener.boolean
handleOne
(Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container) Handle the exception for a record listener whenCommonErrorHandler.remainingRecords()
returns false.void
handleOtherException
(Exception thrownException, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) Called when an exception is thrown with no records available, e.g.void
handleRemaining
(Exception thrownException, List<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> records, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container) Handle the exception for a record listener whenCommonErrorHandler.remainingRecords()
returns true.boolean
Return true if the offset should be committed for a handled error (no exception thrown).void
onPartitionsAssigned
(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions, Runnable publishPause) Called when partitions are assigned.boolean
Deprecated, for removal: This API element is subject to removal in a future version.boolean
Return true if this error handler performs seeks on the failed record and remaining records (or just the remaining records after a failed record is recovered).void
setAckAfterHandle
(boolean ackAfterHandle) Set to false to prevent the container from committing the offset of a recovered record (when the error handler does not itself throw an exception).void
setCommitRecovered
(boolean commitRecovered) Set to true to commit the offset for a recovered record.Methods inherited from class org.springframework.kafka.listener.FailedBatchProcessor
doHandle, getFallbackBatchHandler, handle, notRetryable, removeClassification, setClassifications, setLogLevel, setReclassifyOnExceptionChange, setRetryListeners
Methods inherited from class org.springframework.kafka.listener.FailedRecordProcessor
clearThreadState, deliveryAttempt, getFailureTracker, getRetryListeners, isCommitRecovered, isSeekAfterError, setBackOffFunction, setResetStateOnExceptionChange, setResetStateOnRecoveryFailure, setSeekAfterError
Methods inherited from class org.springframework.kafka.listener.ExceptionClassifier
addNotRetryableExceptions, addRetryableExceptions, defaultFalse, defaultFalse, defaultFatalExceptionsList, getClassifier
Methods inherited from class org.springframework.kafka.listener.KafkaExceptionLogLevelAware
getLogLevel
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.listener.CommonErrorHandler
clearThreadState, deliveryAttempt, handleRecord
-
Constructor Details
-
DefaultErrorHandler
public DefaultErrorHandler()Construct an instance with the default recoverer which simply logs the record after 10 (maxFailures) have occurred for a topic/partition/offset, with the default back off (9 retries, no delay). -
DefaultErrorHandler
Construct an instance with the default recoverer which simply logs the record after the backOff returns STOP for a topic/partition/offset.- Parameters:
backOff
- theBackOff
.
-
DefaultErrorHandler
Construct an instance with the provided recoverer which will be called after 10 (maxFailures) have occurred for a topic/partition/offset.- Parameters:
recoverer
- the recoverer.
-
DefaultErrorHandler
Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.- Parameters:
recoverer
- the recoverer; if null, the default (logging) recoverer is used.backOff
- theBackOff
.
-
DefaultErrorHandler
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler) Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.- Parameters:
recoverer
- the recoverer; if null, the default (logging) recoverer is used.backOff
- theBackOff
.backOffHandler
- theBackOffHandler
.- Since:
- 2.9
-
-
Method Details
-
setCommitRecovered
public void setCommitRecovered(boolean commitRecovered) Set to true to commit the offset for a recovered record. The container must be configured withContainerProperties.AckMode.MANUAL_IMMEDIATE
. Whether or not the commit is sync or async depends on the container's syncCommits property.- Overrides:
setCommitRecovered
in classFailedRecordProcessor
- Parameters:
commitRecovered
- true to commit.
-
isAckAfterHandle
public boolean isAckAfterHandle()Description copied from interface:CommonErrorHandler
Return true if the offset should be committed for a handled error (no exception thrown).- Specified by:
isAckAfterHandle
in interfaceCommonErrorHandler
- Returns:
- true to commit.
-
setAckAfterHandle
public void setAckAfterHandle(boolean ackAfterHandle) Description copied from interface:CommonErrorHandler
Set to false to prevent the container from committing the offset of a recovered record (when the error handler does not itself throw an exception).- Specified by:
setAckAfterHandle
in interfaceCommonErrorHandler
- Parameters:
ackAfterHandle
- false to not commit.
-
remainingRecords
Deprecated, for removal: This API element is subject to removal in a future version.Description copied from interface:CommonErrorHandler
Return false if this error handler should only receive the current failed record; remaining records will be passed to the listener after the error handler returns. When true (default), all remaining records including the failed record are passed to the error handler.- Specified by:
remainingRecords
in interfaceCommonErrorHandler
- Returns:
- false to receive only the failed record.
- See Also:
-
seeksAfterHandling
public boolean seeksAfterHandling()Description copied from interface:CommonErrorHandler
Return true if this error handler performs seeks on the failed record and remaining records (or just the remaining records after a failed record is recovered).- Specified by:
seeksAfterHandling
in interfaceCommonErrorHandler
- Returns:
- true if the next poll should fetch records.
-
deliveryAttemptHeader
public boolean deliveryAttemptHeader()Description copied from interface:CommonErrorHandler
Return true if this error handler supports delivery attempts headers.- Specified by:
deliveryAttemptHeader
in interfaceCommonErrorHandler
- Returns:
- true if capable.
-
handleOne
public boolean handleOne(Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container) Description copied from interface:CommonErrorHandler
Handle the exception for a record listener whenCommonErrorHandler.remainingRecords()
returns false. Use this to handle just the single failed record.- Specified by:
handleOne
in interfaceCommonErrorHandler
- Parameters:
thrownException
- the exception.record
- the record.consumer
- the consumer.container
- the container.- Returns:
- true if the error was "handled" or false if not and the container will re-submit the record to the listener.
- See Also:
-
handleRemaining
public void handleRemaining(Exception thrownException, List<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> records, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container) Description copied from interface:CommonErrorHandler
Handle the exception for a record listener whenCommonErrorHandler.remainingRecords()
returns true. The failed record and all the remaining records from the poll are passed in. Usually used when the error handler performs seeks so that the remaining records will be redelivered on the next poll.- Specified by:
handleRemaining
in interfaceCommonErrorHandler
- Parameters:
thrownException
- the exception.records
- the remaining records including the one that failed.consumer
- the consumer.container
- the container.- See Also:
-
handleBatch
public void handleBatch(Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecords<?, ?> data, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) Description copied from interface:CommonErrorHandler
Handle the exception for a batch listener. The completeConsumerRecords
from the poll is supplied. The error handler needs to perform seeks if you wish to reprocess the records in the batch.- Specified by:
handleBatch
in interfaceCommonErrorHandler
- Parameters:
thrownException
- the exception.data
- the consumer records.consumer
- the consumer.container
- the container.invokeListener
- a callback to re-invoke the listener.
-
handleBatchAndReturnRemaining
public <K,V> org.apache.kafka.clients.consumer.ConsumerRecords<K,V> handleBatchAndReturnRemaining(Exception thrownException, org.apache.kafka.clients.consumer.ConsumerRecords<?, ?> data, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) Description copied from interface:CommonErrorHandler
Handle the exception for a batch listener. The completeConsumerRecords
from the poll is supplied. Return the members of the batch that should be re-sent to the listener. The returned records MUST be in the same order as the original records.- Specified by:
handleBatchAndReturnRemaining
in interfaceCommonErrorHandler
- Type Parameters:
K
- the key type.V
- the value type.- Parameters:
thrownException
- the exception.data
- the consumer records.consumer
- the consumer.container
- the container.invokeListener
- a callback to re-invoke the listener.- Returns:
- the consumer records, or a subset.
-
handleOtherException
public void handleOtherException(Exception thrownException, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) Description copied from interface:CommonErrorHandler
Called when an exception is thrown with no records available, e.g. if the consumer poll throws an exception.- Specified by:
handleOtherException
in interfaceCommonErrorHandler
- Parameters:
thrownException
- the exception.consumer
- the consumer.container
- the container.batchListener
- true if the listener is a batch listener.
-
onPartitionsAssigned
public void onPartitionsAssigned(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions, Runnable publishPause) Description copied from interface:CommonErrorHandler
Called when partitions are assigned.- Specified by:
onPartitionsAssigned
in interfaceCommonErrorHandler
- Parameters:
consumer
- the consumer.partitions
- the newly assigned partitions.publishPause
- called to publish a consumer paused event.
-