Handling Exceptions

This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.

Listener Error Handlers

Starting with version 2.0, the @KafkaListener annotation has a new attribute: errorHandler.

You can use the errorHandler to provide the bean name of a KafkaListenerErrorHandler implementation. This functional interface has one method, as the following listing shows:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

You have access to the spring-messaging Message<?> object produced by the message converter and the exception that was thrown by the listener, which is wrapped in a ListenerExecutionFailedException. The error handler can throw the original or a new exception, which is thrown to the container. Anything returned by the error handler is ignored.

Starting with version 2.7, you can set the rawRecordHeader property on the MessagingMessageConverter and BatchMessagingMessageConverter which causes the raw ConsumerRecord to be added to the converted Message<?> in the KafkaHeaders.RAW_DATA header. This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer in a listener error handler. It might be used in a request/reply scenario where you wish to send a failure result to the sender, after some number of retries, after capturing the failed record in a dead letter topic.

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

It has a sub-interface (ConsumerAwareListenerErrorHandler) that has access to the consumer object, through the following method:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

Another sub-interface (ManualAckListenerErrorHandler) provides access to the Acknowledgment object when using manual AckModes.

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them.

Container Error Handlers

Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superseded by a new CommonErrorHandler. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.

See Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler for information to migrate custom error handlers to CommonErrorHandler.

When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction. Error handling for transactional containers are handled by the AfterRollbackProcessor. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.

This interface has a default method isAckAfterHandle() which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception; it returns true by default.

Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation). By default, such exceptions are logged by the container at ERROR level. All of the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged.

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

You can specify a global error handler to be used for all listeners in the container factory. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.

The container commits any pending offset commits before calling the error handler.

If you are using Spring Boot, you simply need to add the error handler as a @Bean and Boot will add it to the auto-configured factory.

Back Off Handlers

Error handlers such as the DefaultErrorHandler use a BackOff to determine how long to wait before retrying a delivery. Starting with version 2.9, you can configure a custom BackOffHandler. The default handler simply suspends the thread until the back off time passes (or the container is stopped). The framework also provides the ContainerPausingBackOffHandler which pauses the listener container until the back off time passes and then resumes the container. This is useful when the delays are longer than the max.poll.interval.ms consumer property. Note that the resolution of the actual back off time will be affected by the pollTimeout container property.

DefaultErrorHandler

This new error handler replaces the SeekToCurrentErrorHandler and RecoveringBatchErrorHandler, which have been the default error handlers for several releases now. One difference is that the fallback behavior for batch listeners (when an exception other than a BatchListenerFailedException is thrown) is the equivalent of the Retrying Complete Batches.

Starting with version 2.9, the DefaultErrorHandler can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking. Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused poll(), to keep the consumer alive; if Non-Blocking Retries or a ContainerPausingBackOffHandler are being used, the pause may extend over multiple polls). The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again. To enable this mode, set the property seekAfterError to false.

The error handler can recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the handler with a custom recoverer (BiConsumer) and a BackOff that controls the delivery attempts and delays between each. Using a FixedBackOff with FixedBackOff.UNLIMITED_ATTEMPTS causes (effectively) infinite retries. The following example configures recovery after three tries:

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

To configure the listener container with a customized instance of this handler, add it to the container factory.

For example, with the @KafkaListener container factory, you can add DefaultErrorHandler as follows:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

For a record listener, this will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (FixedBackOff(0L, 9)). Failures are simply logged after retries are exhausted.

As an example, if the poll returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets. The DefaultErrorHandler seeks to offset 1 for partition 1 and offset 0 for partition 2. The next poll() returns the three unprocessed records.

If the AckMode was BATCH, the container commits the offsets for the first two partitions before calling the error handler.

For a batch listener, the listener must throw a BatchListenerFailedException indicating which records in the batch failed.

The sequence of events is:

  • Commit the offsets of the records before the index.

  • If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered.

  • If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered. The recovered record’s offset is committed.

  • If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.

Starting with version 2.9, the DefaultErrorHandler can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking. Instead, error handler creates a new ConsumerRecords<?, ?> containing just the unprocessed records which will then be submitted to the listener (after performing a single paused poll(), to keep the consumer alive). To enable this mode, set the property seekAfterError to false.

The default recoverer logs the failed record after retries are exhausted. You can use a custom recoverer, or one provided by the framework such as the DeadLetterPublishingRecoverer.

When using a POJO batch listener (e.g. List<Thing>), and you don’t have the full consumer record to add to the exception, you can just add the index of the record that failed:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

When the container is configured with AckMode.MANUAL_IMMEDIATE, the error handler can be configured to commit the offset of recovered records; set the commitRecovered property to true.

When using transactions, similar functionality is provided by the DefaultAfterRollbackProcessor. See After-rollback Processor.

The DefaultErrorHandler considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions. See the Javadocs for DefaultErrorHandler.addNotRetryableException() and DefaultErrorHandler.setClassifications() for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

The error handler can be configured with one or more RetryListeners, receiving notifications of retry and recovery progress. Starting with version 2.8.10, methods for batch listeners were added.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

See the JavaDocs for more information.

If the recoverer fails (throws an exception), the failed record will be included in the seeks. If the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. To skip retries after a recovery failure, set the error handler’s resetStateOnRecoveryFailure to false.

You can provide the error handler with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the handler’s default BackOff will be used.

Set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures. When false (the default before version 2.9), the exception type is not considered.

Starting with version 2.9, this is now true by default.

Conversion Errors with Batch Error Handlers

Starting with version 2.8, batch listeners can now properly handle conversion errors, when using a MessageConverter with a ByteArrayDeserializer, a BytesDeserializer or a StringDeserializer, as well as a DefaultErrorHandler. When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the ErrorHandlingDeserializer. A list of ConversionExceptions is available in the listener so the listener can throw a BatchListenerFailedException indicating the first index at which a conversion exception occurred.

Example:

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

Retrying Complete Batches

This is now the fallback behavior of the DefaultErrorHandler for a batch listener where the listener throws an exception other than a BatchListenerFailedException.

There is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order. It is impossible, therefore, to easily maintain retry state for a batch. The FallbackBatchErrorHandler takes the following approach. If a batch listener throws an exception that is not a BatchListenerFailedException, the retries are performed from the in-memory batch of records. In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again. If/when retries are exhausted, the ConsumerRecordRecoverer is called for each record in the batch. If the recoverer throws an exception, or the thread is interrupted during its sleep, the batch of records will be redelivered on the next poll. Before exiting, regardless of the outcome, the consumer is resumed.

This mechanism cannot be used with transactions.

While waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Container Stopping Error Handlers

The CommonContainerStoppingErrorHandler stops the container if the listener throws an exception. For record listeners, when the AckMode is RECORD, offsets for already processed records are committed. For record listeners, when the AckMode is any manual value, offsets for already acknowledged records are committed. For record listeners, when the AckMode is BATCH, or for batch listeners, the entire batch is replayed when the container is restarted.

After the container stops, an exception that wraps the ListenerExecutionFailedException is thrown. This is to cause the transaction to roll back (if transactions are enabled).

Delegating Error Handler

The CommonDelegatingErrorHandler can delegate to different error handlers, depending on the exception type. For example, you may wish to invoke a DefaultErrorHandler for most exceptions, or a CommonContainerStoppingErrorHandler for others.

Logging Error Handler

The CommonLoggingErrorHandler simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener. For a batch listener, all the records in the batch are logged.

Using Different Common Error Handlers for Record and Batch Listeners

If you wish to use a different error handling strategy for record and batch listeners, the CommonMixedErrorHandler is provided allowing the configuration of a specific error handler for each listener type.

Common Error Handler Summary

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

Legacy Error Handlers and Their Replacements

Legacy Error Handler Replacement

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

No replacement, use DefaultErrorHandler with an infinite BackOff.

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

No replacements, use DefaultErrorHandler and throw an exception other than BatchListenerFailedException.

Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler

Refer to the JavaDocs in CommonErrorHandler.

To replace an ErrorHandler or ConsumerAwareErrorHandler implementation, you should implement handleOne() and leave seeksAfterHandle() to return false (default). You should also implement handleOtherException() to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

To replace a RemainingRecordsErrorHandler implementation, you should implement handleRemaining() and override seeksAfterHandle() to return true (the error handler must perform the necessary seeks). You should also implement handleOtherException() - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

To replace any BatchErrorHandler implementation, you should implement handleBatch() You should also implement handleOtherException() - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

After-rollback Processor

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back. By default, any unprocessed records (including the failed record) are re-fetched on the next poll. This is achieved by performing seek operations in the DefaultAfterRollbackProcessor. With a batch listener, the entire batch of records is reprocessed (the container has no knowledge of which record in the batch failed). To modify this behavior, you can configure the listener container with a custom AfterRollbackProcessor. For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts, perhaps by publishing it to a dead-letter topic.

Starting with version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the processor with a custom recoverer (BiConsumer) and maximum failures. Setting the maxFailures property to a negative number causes infinite retries. The following example configures recovery after three tries:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

When you do not use transactions, you can achieve similar functionality by configuring a DefaultErrorHandler. See Container Error Handlers.

Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing. In such cases, the application listener must handle a record that keeps failing.

Starting with version 2.2.5, the DefaultAfterRollbackProcessor can be invoked in a new transaction (started after the failed transaction rolls back). Then, if you are using the DeadLetterPublishingRecoverer to publish a failed record, the processor will send the recovered record’s offset in the original topic/partition to the transaction. To enable this feature, set the commitRecovered and kafkaTemplate properties on the DefaultAfterRollbackProcessor.

If the recoverer fails (throws an exception), the failed record will be included in the seeks. Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the processor’s resetStateOnRecoveryFailure property to false.

Starting with version 2.6, you can now provide the processor with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> to determine the BackOff to use, based on the failed record and/or the exception:

handler.setBackOffFunction((record, ex) -> { ... });

If the function returns null, the processor’s default BackOff will be used.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures. By default, the exception type is not considered.

Starting with version 2.3.1, similar to the DefaultErrorHandler, the DefaultAfterRollbackProcessor considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions. See the Javadocs for DefaultAfterRollbackProcessor.setClassifications() for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}
With current kafka-clients, the container cannot detect whether a ProducerFencedException is caused by a rebalance or if the producer’s transactional.id has been revoked due to a timeout or expiry. Because, in most cases, it is caused by a rebalance, the container does not call the AfterRollbackProcessor (because it’s not appropriate to seek the partitions because we no longer are assigned them). If you ensure the timeout is large enough to process each transaction and periodically perform an "empty" transaction (e.g. via a ListenerContainerIdleEvent) you can avoid fencing due to timeout and expiry. Or, you can set the stopContainerWhenFenced container property to true and the container will stop, avoiding the loss of records. You can consume a ConsumerStoppedEvent and check the Reason property for FENCED to detect this condition. Since the event also has a reference to the container, you can restart the container using this event.

Starting with version 2.7, while waiting for a BackOff interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop() rather than causing a delay.

Starting with version 2.7, the processor can be configured with one or more RetryListeners, receiving notifications of retry and recovery progress.

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

See the JavaDocs for more information.

Delivery Attempts Header

The following applies to record listeners only, not batch listeners.

Starting with version 2.5, when using an ErrorHandler or AfterRollbackProcessor that implements DeliveryAttemptAware, it is possible to enable the addition of the KafkaHeaders.DELIVERY_ATTEMPT header (kafka_deliveryAttempt) to the record. The value of this header is an incrementing integer starting at 1. When receiving a raw ConsumerRecord<?, ?> the integer is in a byte[4].

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

When using @KafkaListener with the DefaultKafkaHeaderMapper or SimpleKafkaHeaderMapper, it can be obtained by adding @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery as a parameter to the listener method.

To enable population of this header, set the container property deliveryAttemptHeader to true. It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header.

The DefaultErrorHandler and DefaultAfterRollbackProcessor support this feature.

Listener Info Header

In some cases, it is useful to be able to know which container a listener is running in.

Starting with version 2.8.4, you can now set the listenerInfo property on the listener container, or set the info attribute on the @KafkaListener annotation. Then, the container will add this in the KafkaListener.LISTENER_INFO header to all incoming messages; it can then be used in record interceptors, filters, etc., or in the listener itself.

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

When used in a RecordInterceptor or RecordFilterStrategy implementation, the header is in the consumer record as a byte array, converted using the KafkaListenerAnnotationBeanPostProcessor's charSet property.

The header mappers also convert to String when creating MessageHeaders from the consumer record and never map this header on an outbound record.

For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single String parameter after conversion.

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
If the batch listener has a filter and the filter results in an empty batch, you will need to add required = false to the @Header parameter because the info is not available for an empty batch.

If you receive List<Message<Thing>> the info is in the KafkaHeaders.LISTENER_INFO header of each Message<?>.

See Batch Listeners for more information about consuming batches.

Publishing Dead-letter Records

You can configure the DefaultErrorHandler and DefaultAfterRollbackProcessor with a record recoverer when the maximum number of failures is reached for a record. The framework provides the DeadLetterPublishingRecoverer, which publishes the failed message to another topic. The recoverer requires a KafkaTemplate<Object, Object>, which is used to send the record. You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition.

By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic.

If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka. Starting with version 2.2.4, any ListenerExecutionFailedException (thrown, for example, when an exception is detected in a @KafkaListener method) is enhanced with the groupId property. This allows the destination resolver to use this, in addition to the information in the ConsumerRecord to select the dead letter topic.

The following example shows how to wire a custom destination resolver:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

The record sent to the dead-letter topic is enhanced with the following headers:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: The Exception class name (generally a ListenerExecutionFailedException, but can be others).

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: The Exception cause class name, if present (since version 2.8).

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: The Exception stack trace.

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: The Exception message.

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: The Exception class name (key deserialization errors only).

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: The Exception stack trace (key deserialization errors only).

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: The Exception message (key deserialization errors only).

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: The original topic.

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: The original partition.

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: The original offset.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: The original timestamp.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: The original timestamp type.

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: The original consumer group that failed to process the record (since version 2.8).

Key exceptions are only caused by DeserializationExceptions so there is no DLT_KEY_EXCEPTION_CAUSE_FQCN.

There are two mechanisms to add more headers.

  1. Subclass the recoverer and override createProducerRecord() - call super.createProducerRecord() and add more headers.

  2. Provide a BiFunction to receive the consumer record and exception, returning a Headers object; headers from there will be copied to the final producer record; also see Managing Dead Letter Record Headers. Use setHeadersFunction() to set the BiFunction.

The second is simpler to implement but the first has more information available, including the already assembled standard headers.

Starting with version 2.3, when used in conjunction with an ErrorHandlingDeserializer, the publisher will restore the record value(), in the dead-letter producer record, to the original value that failed to be deserialized. Previously, the value() was null and user code had to decode the DeserializationException from the message headers. In addition, you can provide multiple KafkaTemplates to the publisher; this might be needed, for example, if you want to publish the byte[] from a DeserializationException, as well as values using a different serializer from records that were deserialized successfully. Here is an example of configuring the publisher with KafkaTemplates that use a String and byte[] serializer:

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

The publisher uses the map keys to locate a template that is suitable for the value() about to be published. A LinkedHashMap is recommended so that the keys are examined in order.

When publishing null values, and there are multiple templates, the recoverer will look for a template for the Void class; if none is present, the first template from the values().iterator() will be used.

Since 2.7 you can use the setFailIfSendResultIsError method so that an exception is thrown when message publishing fails. You can also set a timeout for the verification of the sender success with setWaitForSendResultTimeout.

If the recoverer fails (throws an exception), the failed record will be included in the seeks. Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the error handler’s resetStateOnRecoveryFailure property to false.

Starting with version 2.6.3, set resetStateOnExceptionChange to true and the retry sequence will be restarted (including the selection of a new BackOff, if so configured) if the exception type changes between failures. By default, the exception type is not considered.

Starting with version 2.3, the recoverer can also be used with Kafka Streams - see Recovery from Deserialization Exceptions for more information.

The ErrorHandlingDeserializer adds the deserialization exception(s) in headers ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER and ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER (using Java serialization). By default, these headers are not retained in the message published to the dead letter topic. Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT.

If incoming records are dependent on each other, but may arrive out of order, it may be useful to republish a failed record to the tail of the original topic (for some number of times), instead of sending it directly to the dead letter topic. See this Stack Overflow Question for an example.

The following error handler configuration will do exactly that:

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists. If the partition is not present, the partition in the ProducerRecord is set to null, allowing the KafkaProducer to select the partition. You can disable this check by setting the verifyPartition property to false.

Starting with version 3.1, setting the logRecoveryRecord property to true will log the recovery record and exception.

Managing Dead Letter Record Headers

Referring to Publishing Dead-letter Records above, the DeadLetterPublishingRecoverer has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using Non-Blocking Retries).

  • appendOriginalHeaders (default true)

  • stripPreviousExceptionHeaders (default true since version 2.8)

Apache Kafka supports multiple headers with the same name; to obtain the "latest" value, you can use headers.lastHeader(headerName); to get an iterator over multiple headers, use headers.headers(headerName).iterator().

When repeatedly republishing a failed record, these headers can grow (and eventually cause publication to fail due to a RecordTooLargeException); this is especially true for the exception headers and particularly for the stack trace headers.

The reason for the two properties is because, while you might want to retain only the last exception information, you might want to retain the history of which topic(s) the record passed through for each failure.

appendOriginalHeaders is applied to all headers named ORIGINAL while stripPreviousExceptionHeaders is applied to all headers named EXCEPTION.

Starting with version 2.8.4, you now can control which of the standard headers will be added to the output record. See the enum HeadersToAdd for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the getHeaderNames() method which subclasses can override.

To exclude headers, use the excludeHeaders() method; for example, to suppress adding the exception stack trace in a header, use:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

In addition, you can completely customize the addition of exception headers by adding an ExceptionHeadersCreator; this also disables all standard exception headers.

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

Also starting with version 2.8.4, you can now provide multiple headers functions, via the addHeadersFunction method. This allows additional functions to apply, even if another function has already been registered, for example, when using Non-Blocking Retries.

ExponentialBackOffWithMaxRetries Implementation

Spring Framework provides a number of BackOff implementations. By default, the ExponentialBackOff will retry indefinitely; to give up after some number of retry attempts requires calculating the maxElapsedTime. Since version 2.7.3, Spring for Apache Kafka provides the ExponentialBackOffWithMaxRetries which is a subclass that receives the maxRetries property and automatically calculates the maxElapsedTime, which is a little more convenient.

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

This will retry after 1, 2, 4, 8, 10, 10 seconds, before calling the recoverer.