Handling Exceptions
This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.
Listener Error Handlers
Starting with version 2.0, the @KafkaListener
annotation has a new attribute: errorHandler
.
You can use the errorHandler
to provide the bean name of a KafkaListenerErrorHandler
implementation.
This functional interface has one method, as the following listing shows:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
You have access to the spring-messaging Message<?>
object produced by the message converter and the exception that was thrown by the listener, which is wrapped in a ListenerExecutionFailedException
.
The error handler can throw the original or a new exception, which is thrown to the container.
Anything returned by the error handler is ignored.
Starting with version 2.7, you can set the rawRecordHeader
property on the MessagingMessageConverter
and BatchMessagingMessageConverter
which causes the raw ConsumerRecord
to be added to the converted Message<?>
in the KafkaHeaders.RAW_DATA
header.
This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer
in a listener error handler.
It might be used in a request/reply scenario where you wish to send a failure result to the sender, after some number of retries, after capturing the failed record in a dead letter topic.
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
It has a sub-interface (ConsumerAwareListenerErrorHandler
) that has access to the consumer object, through the following method:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
Another sub-interface (ManualAckListenerErrorHandler
) provides access to the Acknowledgment
object when using manual AckMode
s.
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them.
Container Error Handlers
Starting with version 2.8, the legacy ErrorHandler
and BatchErrorHandler
interfaces have been superseded by a new CommonErrorHandler
.
These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener.
CommonErrorHandler
implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated.
The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.
See Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler
for information to migrate custom error handlers to CommonErrorHandler
.
When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction.
Error handling for transactional containers are handled by the AfterRollbackProcessor
.
If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.
This interface has a default method isAckAfterHandle()
which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception; it returns true by default.
Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation).
By default, such exceptions are logged by the container at ERROR
level.
All of the framework error handlers extend KafkaExceptionLogLevelAware
which allows you to control the level at which these exceptions are logged.
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
You can specify a global error handler to be used for all listeners in the container factory. The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.
The container commits any pending offset commits before calling the error handler.
If you are using Spring Boot, you simply need to add the error handler as a @Bean
and Boot will add it to the auto-configured factory.
Back Off Handlers
Error handlers such as the DefaultErrorHandler use a BackOff
to determine how long to wait before retrying a delivery.
Starting with version 2.9, you can configure a custom BackOffHandler
.
The default handler simply suspends the thread until the back off time passes (or the container is stopped).
The framework also provides the ContainerPausingBackOffHandler
which pauses the listener container until the back off time passes and then resumes the container.
This is useful when the delays are longer than the max.poll.interval.ms
consumer property.
Note that the resolution of the actual back off time will be affected by the pollTimeout
container property.
DefaultErrorHandler
This new error handler replaces the SeekToCurrentErrorHandler
and RecoveringBatchErrorHandler
, which have been the default error handlers for several releases now.
One difference is that the fallback behavior for batch listeners (when an exception other than a BatchListenerFailedException
is thrown) is the equivalent of the Retrying Complete Batches.
Starting with version 2.9, the DefaultErrorHandler can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking.
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused poll() , to keep the consumer alive; if Non-Blocking Retries or a ContainerPausingBackOffHandler are being used, the pause may extend over multiple polls).
The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again.
To enable this mode, set the property seekAfterError to false .
|
The error handler can recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the ERROR
level).
You can configure the handler with a custom recoverer (BiConsumer
) and a BackOff
that controls the delivery attempts and delays between each.
Using a FixedBackOff
with FixedBackOff.UNLIMITED_ATTEMPTS
causes (effectively) infinite retries.
The following example configures recovery after three tries:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
To configure the listener container with a customized instance of this handler, add it to the container factory.
For example, with the @KafkaListener
container factory, you can add DefaultErrorHandler
as follows:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
For a record listener, this will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (FixedBackOff(0L, 9)
).
Failures are simply logged after retries are exhausted.
As an example, if the poll
returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets.
The DefaultErrorHandler
seeks to offset 1 for partition 1 and offset 0 for partition 2.
The next poll()
returns the three unprocessed records.
If the AckMode
was BATCH
, the container commits the offsets for the first two partitions before calling the error handler.
For a batch listener, the listener must throw a BatchListenerFailedException
indicating which records in the batch failed.
The sequence of events is:
-
Commit the offsets of the records before the index.
-
If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered.
-
If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered. The recovered record’s offset is committed.
-
If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.
Starting with version 2.9, the DefaultErrorHandler can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking.
Instead, error handler creates a new ConsumerRecords<?, ?> containing just the unprocessed records which will then be submitted to the listener (after performing a single paused poll() , to keep the consumer alive).
To enable this mode, set the property seekAfterError to false .
|
The default recoverer logs the failed record after retries are exhausted.
You can use a custom recoverer, or one provided by the framework such as the DeadLetterPublishingRecoverer
.
When using a POJO batch listener (e.g. List<Thing>
), and you don’t have the full consumer record to add to the exception, you can just add the index of the record that failed:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
When the container is configured with AckMode.MANUAL_IMMEDIATE
, the error handler can be configured to commit the offset of recovered records; set the commitRecovered
property to true
.
See also Publishing Dead-letter Records.
When using transactions, similar functionality is provided by the DefaultAfterRollbackProcessor
.
See After-rollback Processor.
The DefaultErrorHandler
considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
since these exceptions are unlikely to be resolved on a retried delivery.
You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for DefaultErrorHandler.addNotRetryableException()
and DefaultErrorHandler.setClassifications()
for more information, as well as those for the spring-retry
BinaryExceptionClassifier
.
Here is an example that adds IllegalArgumentException
to the not-retryable exceptions:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
The error handler can be configured with one or more RetryListener
s, receiving notifications of retry and recovery progress.
Starting with version 2.8.10, methods for batch listeners were added.
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
See the JavaDocs for more information.
If the recoverer fails (throws an exception), the failed record will be included in the seeks.
If the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
To skip retries after a recovery failure, set the error handler’s resetStateOnRecoveryFailure to false .
|
You can provide the error handler with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
to determine the BackOff
to use, based on the failed record and/or the exception:
handler.setBackOffFunction((record, ex) -> { ... });
If the function returns null
, the handler’s default BackOff
will be used.
Set resetStateOnExceptionChange
to true
and the retry sequence will be restarted (including the selection of a new BackOff
, if so configured) if the exception type changes between failures.
When false
(the default before version 2.9), the exception type is not considered.
Starting with version 2.9, this is now true
by default.
Also see Delivery Attempts Header.
Conversion Errors with Batch Error Handlers
Starting with version 2.8, batch listeners can now properly handle conversion errors, when using a MessageConverter
with a ByteArrayDeserializer
, a BytesDeserializer
or a StringDeserializer
, as well as a DefaultErrorHandler
.
When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the ErrorHandlingDeserializer
.
A list of ConversionException
s is available in the listener so the listener can throw a BatchListenerFailedException
indicating the first index at which a conversion exception occurred.
Example:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
Retrying Complete Batches
This is now the fallback behavior of the DefaultErrorHandler
for a batch listener where the listener throws an exception other than a BatchListenerFailedException
.
There is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order.
It is impossible, therefore, to easily maintain retry state for a batch.
The FallbackBatchErrorHandler
takes the following approach.
If a batch listener throws an exception that is not a BatchListenerFailedException
, the retries are performed from the in-memory batch of records.
In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again.
If/when retries are exhausted, the ConsumerRecordRecoverer
is called for each record in the batch.
If the recoverer throws an exception, or the thread is interrupted during its sleep, the batch of records will be redelivered on the next poll.
Before exiting, regardless of the outcome, the consumer is resumed.
This mechanism cannot be used with transactions. |
While waiting for a BackOff
interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop()
rather than causing a delay.
Container Stopping Error Handlers
The CommonContainerStoppingErrorHandler
stops the container if the listener throws an exception.
For record listeners, when the AckMode
is RECORD
, offsets for already processed records are committed.
For record listeners, when the AckMode
is any manual value, offsets for already acknowledged records are committed.
For record listeners, when the AckMode
is BATCH
, or for batch listeners, the entire batch is replayed when the container is restarted.
After the container stops, an exception that wraps the ListenerExecutionFailedException
is thrown.
This is to cause the transaction to roll back (if transactions are enabled).
Delegating Error Handler
The CommonDelegatingErrorHandler
can delegate to different error handlers, depending on the exception type.
For example, you may wish to invoke a DefaultErrorHandler
for most exceptions, or a CommonContainerStoppingErrorHandler
for others.
All delegates must share the same compatible properties (ackAfterHandle
, seekAfterError
…).
Logging Error Handler
The CommonLoggingErrorHandler
simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener.
For a batch listener, all the records in the batch are logged.
Using Different Common Error Handlers for Record and Batch Listeners
If you wish to use a different error handling strategy for record and batch listeners, the CommonMixedErrorHandler
is provided allowing the configuration of a specific error handler for each listener type.
Common Error Handler Summary
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
Legacy Error Handlers and Their Replacements
Legacy Error Handler | Replacement |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
No replacement, use |
|
|
|
No replacements, use |
Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler
Refer to the JavaDocs in CommonErrorHandler
.
To replace an ErrorHandler
or ConsumerAwareErrorHandler
implementation, you should implement handleOne()
and leave seeksAfterHandle()
to return false
(default).
You should also implement handleOtherException()
to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).
To replace a RemainingRecordsErrorHandler
implementation, you should implement handleRemaining()
and override seeksAfterHandle()
to return true
(the error handler must perform the necessary seeks).
You should also implement handleOtherException()
- to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).
To replace any BatchErrorHandler
implementation, you should implement handleBatch()
You should also implement handleOtherException()
- to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).
After-rollback Processor
When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back.
By default, any unprocessed records (including the failed record) are re-fetched on the next poll.
This is achieved by performing seek
operations in the DefaultAfterRollbackProcessor
.
With a batch listener, the entire batch of records is reprocessed (the container has no knowledge of which record in the batch failed).
To modify this behavior, you can configure the listener container with a custom AfterRollbackProcessor
.
For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts, perhaps by publishing it to a dead-letter topic.
Starting with version 2.2, the DefaultAfterRollbackProcessor
can now recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the ERROR
level).
You can configure the processor with a custom recoverer (BiConsumer
) and maximum failures.
Setting the maxFailures
property to a negative number causes infinite retries.
The following example configures recovery after three tries:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
When you do not use transactions, you can achieve similar functionality by configuring a DefaultErrorHandler
.
See Container Error Handlers.
Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing. In such cases, the application listener must handle a record that keeps failing. |
See also Publishing Dead-letter Records.
Starting with version 2.2.5, the DefaultAfterRollbackProcessor
can be invoked in a new transaction (started after the failed transaction rolls back).
Then, if you are using the DeadLetterPublishingRecoverer
to publish a failed record, the processor will send the recovered record’s offset in the original topic/partition to the transaction.
To enable this feature, set the commitRecovered
and kafkaTemplate
properties on the DefaultAfterRollbackProcessor
.
If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the processor’s resetStateOnRecoveryFailure property to false .
|
Starting with version 2.6, you can now provide the processor with a BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
to determine the BackOff
to use, based on the failed record and/or the exception:
handler.setBackOffFunction((record, ex) -> { ... });
If the function returns null
, the processor’s default BackOff
will be used.
Starting with version 2.6.3, set resetStateOnExceptionChange
to true
and the retry sequence will be restarted (including the selection of a new BackOff
, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.
Starting with version 2.3.1, similar to the DefaultErrorHandler
, the DefaultAfterRollbackProcessor
considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
since these exceptions are unlikely to be resolved on a retried delivery.
You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for DefaultAfterRollbackProcessor.setClassifications()
for more information, as well as those for the spring-retry
BinaryExceptionClassifier
.
Here is an example that adds IllegalArgumentException
to the not-retryable exceptions:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
Also see Delivery Attempts Header.
With current kafka-clients , the container cannot detect whether a ProducerFencedException is caused by a rebalance or if the producer’s transactional.id has been revoked due to a timeout or expiry.
Because, in most cases, it is caused by a rebalance, the container does not call the AfterRollbackProcessor (because it’s not appropriate to seek the partitions because we no longer are assigned them).
If you ensure the timeout is large enough to process each transaction and periodically perform an "empty" transaction (e.g. via a ListenerContainerIdleEvent ) you can avoid fencing due to timeout and expiry.
Or, you can set the stopContainerWhenFenced container property to true and the container will stop, avoiding the loss of records.
You can consume a ConsumerStoppedEvent and check the Reason property for FENCED to detect this condition.
Since the event also has a reference to the container, you can restart the container using this event.
|
Starting with version 2.7, while waiting for a BackOff
interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the stop()
rather than causing a delay.
Starting with version 2.7, the processor can be configured with one or more RetryListener
s, receiving notifications of retry and recovery progress.
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
See the JavaDocs for more information.
Delivery Attempts Header
The following applies to record listeners only, not batch listeners.
Starting with version 2.5, when using an ErrorHandler
or AfterRollbackProcessor
that implements DeliveryAttemptAware
, it is possible to enable the addition of the KafkaHeaders.DELIVERY_ATTEMPT
header (kafka_deliveryAttempt
) to the record.
The value of this header is an incrementing integer starting at 1.
When receiving a raw ConsumerRecord<?, ?>
the integer is in a byte[4]
.
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
When using @KafkaListener
with the DefaultKafkaHeaderMapper
or SimpleKafkaHeaderMapper
, it can be obtained by adding @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
as a parameter to the listener method.
To enable population of this header, set the container property deliveryAttemptHeader
to true
.
It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header.
The DefaultErrorHandler
and DefaultAfterRollbackProcessor
support this feature.
Listener Info Header
In some cases, it is useful to be able to know which container a listener is running in.
Starting with version 2.8.4, you can now set the listenerInfo
property on the listener container, or set the info
attribute on the @KafkaListener
annotation.
Then, the container will add this in the KafkaListener.LISTENER_INFO
header to all incoming messages; it can then be used in record interceptors, filters, etc., or in the listener itself.
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
When used in a RecordInterceptor
or RecordFilterStrategy
implementation, the header is in the consumer record as a byte array, converted using the KafkaListenerAnnotationBeanPostProcessor
's charSet
property.
The header mappers also convert to String
when creating MessageHeaders
from the consumer record and never map this header on an outbound record.
For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single String
parameter after conversion.
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
If the batch listener has a filter and the filter results in an empty batch, you will need to add required = false to the @Header parameter because the info is not available for an empty batch.
|
If you receive List<Message<Thing>>
the info is in the KafkaHeaders.LISTENER_INFO
header of each Message<?>
.
See Batch Listeners for more information about consuming batches.
Publishing Dead-letter Records
You can configure the DefaultErrorHandler
and DefaultAfterRollbackProcessor
with a record recoverer when the maximum number of failures is reached for a record.
The framework provides the DeadLetterPublishingRecoverer
, which publishes the failed message to another topic.
The recoverer requires a KafkaTemplate<Object, Object>
, which is used to send the record.
You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
, which is called to resolve the destination topic and partition.
By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT ) and to the same partition as the original record.
Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic.
|
If the returned TopicPartition
has a negative partition, the partition is not set in the ProducerRecord
, so the partition is selected by Kafka.
Starting with version 2.2.4, any ListenerExecutionFailedException
(thrown, for example, when an exception is detected in a @KafkaListener
method) is enhanced with the groupId
property.
This allows the destination resolver to use this, in addition to the information in the ConsumerRecord
to select the dead letter topic.
The following example shows how to wire a custom destination resolver:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
The record sent to the dead-letter topic is enhanced with the following headers:
-
KafkaHeaders.DLT_EXCEPTION_FQCN
: The Exception class name (generally aListenerExecutionFailedException
, but can be others). -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
: The Exception cause class name, if present (since version 2.8). -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
: The Exception stack trace. -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
: The Exception message. -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
: The Exception class name (key deserialization errors only). -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
: The Exception stack trace (key deserialization errors only). -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
: The Exception message (key deserialization errors only). -
KafkaHeaders.DLT_ORIGINAL_TOPIC
: The original topic. -
KafkaHeaders.DLT_ORIGINAL_PARTITION
: The original partition. -
KafkaHeaders.DLT_ORIGINAL_OFFSET
: The original offset. -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
: The original timestamp. -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
: The original timestamp type. -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
: The original consumer group that failed to process the record (since version 2.8).
Key exceptions are only caused by DeserializationException
s so there is no DLT_KEY_EXCEPTION_CAUSE_FQCN
.
There are two mechanisms to add more headers.
-
Subclass the recoverer and override
createProducerRecord()
- callsuper.createProducerRecord()
and add more headers. -
Provide a
BiFunction
to receive the consumer record and exception, returning aHeaders
object; headers from there will be copied to the final producer record; also see Managing Dead Letter Record Headers. UsesetHeadersFunction()
to set theBiFunction
.
The second is simpler to implement but the first has more information available, including the already assembled standard headers.
Starting with version 2.3, when used in conjunction with an ErrorHandlingDeserializer
, the publisher will restore the record value()
, in the dead-letter producer record, to the original value that failed to be deserialized.
Previously, the value()
was null and user code had to decode the DeserializationException
from the message headers.
In addition, you can provide multiple KafkaTemplate
s to the publisher; this might be needed, for example, if you want to publish the byte[]
from a DeserializationException
, as well as values using a different serializer from records that were deserialized successfully.
Here is an example of configuring the publisher with KafkaTemplate
s that use a String
and byte[]
serializer:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
The publisher uses the map keys to locate a template that is suitable for the value()
about to be published.
A LinkedHashMap
is recommended so that the keys are examined in order.
When publishing null
values, and there are multiple templates, the recoverer will look for a template for the Void
class; if none is present, the first template from the values().iterator()
will be used.
Since 2.7 you can use the setFailIfSendResultIsError
method so that an exception is thrown when message publishing fails.
You can also set a timeout for the verification of the sender success with setWaitForSendResultTimeout
.
If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the BackOff will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the BackOff was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler’s resetStateOnRecoveryFailure property to false .
|
Starting with version 2.6.3, set resetStateOnExceptionChange
to true
and the retry sequence will be restarted (including the selection of a new BackOff
, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.
Starting with version 2.3, the recoverer can also be used with Kafka Streams - see Recovery from Deserialization Exceptions for more information.
The ErrorHandlingDeserializer
adds the deserialization exception(s) in headers ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
and ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
(using Java serialization).
By default, these headers are not retained in the message published to the dead letter topic.
Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT.
If incoming records are dependent on each other, but may arrive out of order, it may be useful to republish a failed record to the tail of the original topic (for some number of times), instead of sending it directly to the dead letter topic. See this Stack Overflow Question for an example.
The following error handler configuration will do exactly that:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists.
If the partition is not present, the partition in the ProducerRecord
is set to null
, allowing the KafkaProducer
to select the partition.
You can disable this check by setting the verifyPartition
property to false
.
Starting with version 3.1, setting the logRecoveryRecord
property to true
will log the recovery record and exception.
Managing Dead Letter Record Headers
Referring to Publishing Dead-letter Records above, the DeadLetterPublishingRecoverer
has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using Non-Blocking Retries).
-
appendOriginalHeaders
(defaulttrue
) -
stripPreviousExceptionHeaders
(defaulttrue
since version 2.8)
Apache Kafka supports multiple headers with the same name; to obtain the "latest" value, you can use headers.lastHeader(headerName)
; to get an iterator over multiple headers, use headers.headers(headerName).iterator()
.
When repeatedly republishing a failed record, these headers can grow (and eventually cause publication to fail due to a RecordTooLargeException
); this is especially true for the exception headers and particularly for the stack trace headers.
The reason for the two properties is because, while you might want to retain only the last exception information, you might want to retain the history of which topic(s) the record passed through for each failure.
appendOriginalHeaders
is applied to all headers named ORIGINAL
while stripPreviousExceptionHeaders
is applied to all headers named EXCEPTION
.
Starting with version 2.8.4, you now can control which of the standard headers will be added to the output record.
See the enum HeadersToAdd
for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the getHeaderNames()
method which subclasses can override.
To exclude headers, use the excludeHeaders()
method; for example, to suppress adding the exception stack trace in a header, use:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
In addition, you can completely customize the addition of exception headers by adding an ExceptionHeadersCreator
; this also disables all standard exception headers.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
Also starting with version 2.8.4, you can now provide multiple headers functions, via the addHeadersFunction
method.
This allows additional functions to apply, even if another function has already been registered, for example, when using Non-Blocking Retries.
Also see Failure Header Management with Non-Blocking Retries.
ExponentialBackOffWithMaxRetries
Implementation
Spring Framework provides a number of BackOff
implementations.
By default, the ExponentialBackOff
will retry indefinitely; to give up after some number of retry attempts requires calculating the maxElapsedTime
.
Since version 2.7.3, Spring for Apache Kafka provides the ExponentialBackOffWithMaxRetries
which is a subclass that receives the maxRetries
property and automatically calculates the maxElapsedTime
, which is a little more convenient.
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
This will retry after 1, 2, 4, 8, 10, 10
seconds, before calling the recoverer.