Class DefaultErrorHandler

  • All Implemented Interfaces:
    CommonErrorHandler, DeliveryAttemptAware

    public class DefaultErrorHandler
    extends FailedBatchProcessor
    implements CommonErrorHandler
    An error handler that, for record listeners, seeks to the current offset for each topic/partition in the remaining records, rewinding the partitions after a message failure so that the failed record can be replayed.
    For batch listeners, seeks to the current offset for each topic/partition in a batch of records, rewinding the partitions after a failure so that the batch can be replayed. If the listener throws a BatchListenerFailedException that identifies the failed record, the offsets of the records before it are committed, and the partitions for the remaining records are repositioned and/or the failed record is recovered and skipped; if the record is recovered, its offset is committed. If some other exception is thrown, or the exception does not identify a valid record, error handling is delegated to a FallbackBatchErrorHandler with this handler's BackOff.
    This is a replacement for the legacy SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler (although the fallback can now send the messages to a recoverer after retries are exhausted, instead of retrying indefinitely).
    Since:
    2.8
    Author:
    Gary Russell
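    As an illustration of the description above, the following configuration fragment sketches one way to register a handler with a listener container factory. The class name ErrorHandlingConfig, the bean names, the String key/value types and the logging recoverer are assumptions made for this example; it also assumes Spring for Apache Kafka 2.8 or later on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ErrorHandlingConfig { // hypothetical configuration class

    @Bean
    public DefaultErrorHandler errorHandler() {
        // Hypothetical recoverer: report the record once the retries are exhausted.
        ConsumerRecordRecoverer recoverer = (record, ex) ->
                System.err.println("Giving up on " + record.topic() + "-" + record.partition()
                        + "@" + record.offset() + ": " + ex.getMessage());
        // Two retries, one second apart (three delivery attempts in total), then recover.
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The same handler instance serves both record and batch listeners.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

    A recoverer that publishes to a dead-letter topic could be substituted for the logging lambda; the choice here only keeps the fragment short.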
    • Constructor Detail

      • DefaultErrorHandler

        public DefaultErrorHandler()
        Construct an instance with the default recoverer, which simply logs the record after 10 failures (maxFailures) have occurred for a topic/partition/offset, with the default back off (9 retries, no delay).
      • DefaultErrorHandler

        public DefaultErrorHandler(org.springframework.util.backoff.BackOff backOff)
        Construct an instance with the default recoverer which simply logs the record after the backOff returns STOP for a topic/partition/offset.
        Parameters:
        backOff - the BackOff.
      • DefaultErrorHandler

        public DefaultErrorHandler(ConsumerRecordRecoverer recoverer)
        Construct an instance with the provided recoverer, which will be called after 10 failures (maxFailures) have occurred for a topic/partition/offset.
        Parameters:
        recoverer - the recoverer.
      • DefaultErrorHandler

        public DefaultErrorHandler(@Nullable
                                   ConsumerRecordRecoverer recoverer,
                                   org.springframework.util.backoff.BackOff backOff)
        Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.
        Parameters:
        recoverer - the recoverer; if null, the default (logging) recoverer is used.
        backOff - the BackOff.
      • DefaultErrorHandler

        public DefaultErrorHandler(@Nullable
                                   ConsumerRecordRecoverer recoverer,
                                   org.springframework.util.backoff.BackOff backOff,
                                   @Nullable
                                   BackOffHandler backOffHandler)
        Construct an instance with the provided recoverer which will be called after the backOff returns STOP for a topic/partition/offset.
        Parameters:
        recoverer - the recoverer; if null, the default (logging) recoverer is used.
        backOff - the BackOff.
        backOffHandler - the BackOffHandler.
        Since:
        2.9
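    The relationship between the back off and the number of delivery attempts described for the constructors above can be modelled with plain JDK code. This is a simplified sketch, not the library's implementation: FixedRetries, STOP and deliverUntilRecovered are hypothetical stand-ins, and the listener is assumed to fail on every attempt.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BackOffAttemptsSketch {

    /** Sentinel meaning "stop retrying" (hypothetical stand-in for the STOP value). */
    public static final long STOP = -1L;

    /** Hypothetical fixed back off: the same interval, at most maxRetries times. */
    public static final class FixedRetries {
        private final long intervalMs;
        private final int maxRetries;
        private int used;

        public FixedRetries(long intervalMs, int maxRetries) {
            this.intervalMs = intervalMs;
            this.maxRetries = maxRetries;
        }

        public long nextBackOff() {
            return used++ < maxRetries ? intervalMs : STOP;
        }
    }

    /**
     * Delivers one always-failing record until the back off returns STOP,
     * then calls the recoverer. Returns the number of delivery attempts.
     */
    public static int deliverUntilRecovered(FixedRetries backOff, Runnable recoverer) {
        int attempts = 0;
        while (true) {
            attempts++; // the listener is invoked and fails
            long delay = backOff.nextBackOff();
            if (delay == STOP) {
                recoverer.run();
                return attempts;
            }
            // A real handler would pause for `delay` milliseconds before redelivery.
        }
    }

    public static void main(String[] args) {
        AtomicInteger recovered = new AtomicInteger();
        // 9 retries with no delay, as in the no-argument constructor.
        int attempts = deliverUntilRecovered(new FixedRetries(0L, 9), recovered::incrementAndGet);
        System.out.println("attempts=" + attempts + ", recovered=" + recovered.get());
        // prints "attempts=10, recovered=1"
    }
}
```

    The initial delivery plus 9 retries gives the 10 failures (maxFailures) mentioned for the default back off.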
    • Method Detail

      • setCommitRecovered

        public void setCommitRecovered(boolean commitRecovered)
        Set to true to commit the offset for a recovered record. The container must be configured with ContainerProperties.AckMode.MANUAL_IMMEDIATE. Whether the commit is synchronous or asynchronous depends on the container's syncCommits property.
        Overrides:
        setCommitRecovered in class FailedRecordProcessor
        Parameters:
        commitRecovered - true to commit.
      • isAckAfterHandle

        public boolean isAckAfterHandle()
        Description copied from interface: CommonErrorHandler
        Return true if the offset should be committed for a handled error (no exception thrown).
        Specified by:
        isAckAfterHandle in interface CommonErrorHandler
        Returns:
        true to commit.
      • setAckAfterHandle

        public void setAckAfterHandle(boolean ackAfterHandle)
        Description copied from interface: CommonErrorHandler
        Set to false to prevent the container from committing the offset of a recovered record (when the error handler does not itself throw an exception).
        Specified by:
        setAckAfterHandle in interface CommonErrorHandler
        Parameters:
        ackAfterHandle - false to not commit.
      • seeksAfterHandling

        public boolean seeksAfterHandling()
        Description copied from interface: CommonErrorHandler
        Return true if this error handler performs seeks on the failed record and remaining records (or just the remaining records after a failed record is recovered).
        Specified by:
        seeksAfterHandling in interface CommonErrorHandler
        Returns:
        true if the next poll should fetch records.
      • deliveryAttemptHeader

        public boolean deliveryAttemptHeader()
        Description copied from interface: CommonErrorHandler
        Return true if this error handler supports delivery attempt headers.
        Specified by:
        deliveryAttemptHeader in interface CommonErrorHandler
        Returns:
        true if capable.
      • handleOne

        public boolean handleOne(java.lang.Exception thrownException,
                                 org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record,
                                 org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                                 MessageListenerContainer container)
        Description copied from interface: CommonErrorHandler
        Handle the exception for a record listener when CommonErrorHandler.remainingRecords() returns false. Use this to handle just the single failed record.
        Specified by:
        handleOne in interface CommonErrorHandler
        Parameters:
        thrownException - the exception.
        record - the record.
        consumer - the consumer.
        container - the container.
        Returns:
        true if the error was "handled"; false if it was not, in which case the container will re-submit the record to the listener.
        See Also:
        CommonErrorHandler.remainingRecords()
      • handleRemaining

        public void handleRemaining(java.lang.Exception thrownException,
                                    java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> records,
                                    org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                                    MessageListenerContainer container)
        Description copied from interface: CommonErrorHandler
        Handle the exception for a record listener when CommonErrorHandler.remainingRecords() returns true. The failed record and all the remaining records from the poll are passed in. Usually used when the error handler performs seeks so that the remaining records will be redelivered on the next poll.
        Specified by:
        handleRemaining in interface CommonErrorHandler
        Parameters:
        thrownException - the exception.
        records - the remaining records including the one that failed.
        consumer - the consumer.
        container - the container.
        See Also:
        CommonErrorHandler.remainingRecords()
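        The seek behaviour described for handleRemaining can be pictured as choosing, for each topic/partition, the offset of the first remaining record. The sketch below models that with plain JDK types only; Rec and seekOffsets are hypothetical names for this example, not part of the library, and the records are assumed to be in offset order within each partition.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SeekToCurrentSketch {

    /** Hypothetical, minimal stand-in for a consumer record: topic, partition and offset only. */
    public static final class Rec {
        public final String topic;
        public final int partition;
        public final long offset;

        public Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * For each topic/partition, keeps the offset of the first record seen.
     * Seeking each partition to that offset makes the next poll redeliver
     * the failed record and everything after it.
     */
    public static Map<String, Long> seekOffsets(List<Rec> remaining) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (Rec r : remaining) {
            offsets.putIfAbsent(r.topic + "-" + r.partition, r.offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Rec> remaining = Arrays.asList(
                new Rec("orders", 0, 42L), // the record that failed
                new Rec("orders", 0, 43L),
                new Rec("orders", 1, 7L),
                new Rec("orders", 0, 44L),
                new Rec("orders", 1, 8L));
        System.out.println(seekOffsets(remaining)); // prints {orders-0=42, orders-1=7}
    }
}
```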
      • handleBatch

        public void handleBatch(java.lang.Exception thrownException,
                                org.apache.kafka.clients.consumer.ConsumerRecords<?, ?> data,
                                org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                                MessageListenerContainer container,
                                java.lang.Runnable invokeListener)
        Description copied from interface: CommonErrorHandler
        Handle the exception for a batch listener. The complete ConsumerRecords from the poll is supplied. The error handler needs to perform seeks if you wish to reprocess the records in the batch.
        Specified by:
        handleBatch in interface CommonErrorHandler
        Parameters:
        thrownException - the exception.
        data - the consumer records.
        consumer - the consumer.
        container - the container.
        invokeListener - a callback to re-invoke the listener.
      • handleBatchAndReturnRemaining

        public <K, V> org.apache.kafka.clients.consumer.ConsumerRecords<K, V> handleBatchAndReturnRemaining(java.lang.Exception thrownException,
                org.apache.kafka.clients.consumer.ConsumerRecords<?, ?> data,
                org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                MessageListenerContainer container,
                java.lang.Runnable invokeListener)
        Description copied from interface: CommonErrorHandler
        Handle the exception for a batch listener. The complete ConsumerRecords from the poll is supplied. Return the members of the batch that should be re-sent to the listener. The returned records MUST be in the same order as the original records.
        Specified by:
        handleBatchAndReturnRemaining in interface CommonErrorHandler
        Type Parameters:
        K - the key type.
        V - the value type.
        Parameters:
        thrownException - the exception.
        data - the consumer records.
        consumer - the consumer.
        container - the container.
        invokeListener - a callback to re-invoke the listener.
        Returns:
        the consumer records, or a subset.
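        The partial-batch behaviour (offsets committed for the records before the failed one, the rest returned in their original order) can be modelled as a split of the batch at the failed index. This is a simplified sketch using JDK collections only; Split and splitAtFailure are hypothetical names, and an invalid index is modelled here simply as "retry the whole batch", which stands in for the delegation to the fallback handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchFailureSketch {

    /** Hypothetical result holder; not a Spring for Apache Kafka type. */
    public static final class Split<T> {
        public final List<T> committed;
        public final List<T> remaining;

        Split(List<T> committed, List<T> remaining) {
            this.committed = committed;
            this.remaining = remaining;
        }
    }

    /**
     * Splits a batch at the failed record. Records before it are committed;
     * if the failed record was recovered it is committed too and skipped,
     * otherwise it heads the remaining records. Original order is preserved.
     */
    public static <T> Split<T> splitAtFailure(List<T> batch, int failedIndex, boolean recovered) {
        if (failedIndex < 0 || failedIndex >= batch.size()) {
            // No valid failed record: nothing is committed, the whole batch remains.
            return new Split<T>(new ArrayList<T>(), new ArrayList<T>(batch));
        }
        int from = recovered ? failedIndex + 1 : failedIndex;
        return new Split<T>(new ArrayList<T>(batch.subList(0, from)),
                new ArrayList<T>(batch.subList(from, batch.size())));
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "c", "d");
        Split<String> retry = splitAtFailure(batch, 2, false);
        System.out.println(retry.committed + " " + retry.remaining); // prints [a, b] [c, d]
        Split<String> skip = splitAtFailure(batch, 2, true);
        System.out.println(skip.committed + " " + skip.remaining);   // prints [a, b, c] [d]
    }
}
```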
      • handleOtherException

        public void handleOtherException(java.lang.Exception thrownException,
                                         org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                                         MessageListenerContainer container,
                                         boolean batchListener)
        Description copied from interface: CommonErrorHandler
        Called when an exception is thrown with no records available, e.g. if the consumer poll throws an exception.
        Specified by:
        handleOtherException in interface CommonErrorHandler
        Parameters:
        thrownException - the exception.
        consumer - the consumer.
        container - the container.
        batchListener - true if the listener is a batch listener.
      • onPartitionsAssigned

        public void onPartitionsAssigned(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
                                         java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
        Description copied from interface: CommonErrorHandler
        Called when partitions are assigned.
        Specified by:
        onPartitionsAssigned in interface CommonErrorHandler
        Parameters:
        consumer - the consumer.
        partitions - the newly assigned partitions.