Class DeadLetterPublishingRecoverer

java.lang.Object
  org.springframework.kafka.listener.KafkaExceptionLogLevelAware
    org.springframework.kafka.listener.ExceptionClassifier
      org.springframework.kafka.listener.DeadLetterPublishingRecoverer

- All Implemented Interfaces:
- BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception>, ConsumerAwareRecordRecoverer, ConsumerRecordRecoverer

public class DeadLetterPublishingRecoverer
extends ExceptionClassifier
implements ConsumerAwareRecordRecoverer

A ConsumerRecordRecoverer that publishes a failed record to a dead-letter topic.

- Since:
- 2.2
- Author:
- Gary Russell, Tomaz Fernandes
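A minimal usage sketch (not part of this page's declarations): wiring the recoverer into a DefaultErrorHandler so that, after retries are exhausted, failed records are published to "<original topic>.DLT". DefaultErrorHandler and the backoff values are assumptions drawn from the broader Spring for Apache Kafka API, not from this page.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class DltErrorHandlerConfig {

    DefaultErrorHandler dltErrorHandler(KafkaTemplate<Object, Object> template) {
        // Default destination resolver: same partition, original topic + ".DLT"
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // Retry twice with a 1-second backoff, then publish to the dead-letter topic
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}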
Nested Class Summary

Nested Classes

static interface DeadLetterPublishingRecoverer.ExceptionHeadersCreator
  Use this to provide a custom implementation to take complete control over exception header creation for the output record.
static class DeadLetterPublishingRecoverer.HeaderNames
  Container class for the names of the headers that will be added to the produced record.
static class DeadLetterPublishingRecoverer.SingleRecordHeader
  A Header that indicates that this header should replace any existing headers with this name, rather than being appended to the headers, which is the normal behavior.
Field Summary

Fields

protected final LogAccessor logger
Constructor Summary

Constructors

DeadLetterPublishingRecoverer(Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver, boolean transactional, BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> destinationResolver)
  Create an instance with a template resolving function that receives the failed consumer record and the exception and returns a KafkaOperations, and a flag indicating whether publishing from this instance is transactional.
DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates)
  Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.
DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates, BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> destinationResolver)
  Create an instance with the provided templates and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition.
DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template)
  Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.
DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template, BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> destinationResolver)
  Create an instance with the provided template and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition.
Method Summary

void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Exception exception)
  Recover the record.
void addHeadersFunction(BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.header.Headers> headersFunction)
  Add a function which will be called to obtain additional headers to add to the published record.
protected org.apache.kafka.clients.producer.ProducerRecord<Object, Object> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, byte[] key, byte[] value)
  Subclasses can override this method to customize the producer record to send to the DLQ.
protected Duration determineSendTimeout(KafkaOperations<?, ?> template)
  Determine the send timeout based on the template's producer factory and setWaitForSendResultTimeout(Duration).
void excludeHeader(DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd... headers)
  Clear the header inclusion bit for the header name.
protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames()
  Override this if you want different header names to be used in the sent record.
protected long getTimeoutBuffer()
  The number of milliseconds to add to the producer configuration delivery.timeout.ms property to avoid timing out before the Kafka producer.
void includeHeader(DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd... headers)
  Set the header inclusion bit for the header name.
protected boolean isFailIfSendResultIsError()
  If true, wait for the send result and throw an exception if it fails.
protected boolean isTransactional()
  True if publishing should run in a transaction.
protected void publish(org.apache.kafka.clients.producer.ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> inRecord)
  Override this if you want more than just logging of the send result.
protected void send(org.apache.kafka.clients.producer.ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> inRecord)
  Send the record.
void setAppendOriginalHeaders(boolean appendOriginalHeaders)
  Set to false if you don't want to append the current "original" headers (topic, partition, etc.) if they are already present.
void setExceptionHeadersCreator(DeadLetterPublishingRecoverer.ExceptionHeadersCreator headersCreator)
  Set a DeadLetterPublishingRecoverer.ExceptionHeadersCreator implementation to completely take over setting the exception headers in the output record.
void setFailIfSendResultIsError(boolean failIfSendResultIsError)
  Set to true to enable waiting for the send result and throw an exception if it fails.
void setHeadersFunction(BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.header.Headers> headersFunction)
  Set a function which will be called to obtain additional headers to add to the published record.
void setPartitionInfoTimeout(Duration partitionInfoTimeout)
  Time to wait for partition information when verifying.
void setRetainExceptionHeader(boolean retainExceptionHeader)
  Set to true to retain a Java serialized DeserializationException header.
void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions)
  Set to false if you want to forward the record to the same topic even though the exception is fatal by this class's classification.
void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders)
  Set to false to retain previous exception headers as well as headers for the current exception.
void setThrowIfNoDestinationReturned(boolean throwIfNoDestinationReturned)
  Set to true to throw an exception if the destination resolver function returns a null TopicPartition.
void setTimeoutBuffer(long buffer)
  Set the number of milliseconds to add to the producer configuration delivery.timeout.ms property to avoid timing out before the Kafka producer.
void setVerifyPartition(boolean verifyPartition)
  Set to false to disable partition verification.
void setWaitForSendResultTimeout(Duration waitForSendResultTimeout)
  Set the minimum time to wait for message sending.
protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate, org.apache.kafka.clients.producer.ProducerRecord<Object, Object> outRecord, CompletableFuture<SendResult<Object, Object>> sendResult, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> inRecord)
  Wait for the send future to complete.

Methods inherited from class org.springframework.kafka.listener.ExceptionClassifier
addNotRetryableExceptions, addRetryableExceptions, defaultFalse, defaultFalse, defaultFatalExceptionsList, getClassifier, notRetryable, removeClassification, setClassifications

Methods inherited from class org.springframework.kafka.listener.KafkaExceptionLogLevelAware
getLogLevel, setLogLevel

Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface java.util.function.BiConsumer
andThen

Methods inherited from interface org.springframework.kafka.listener.ConsumerAwareRecordRecoverer
accept
Field Details

logger

protected final org.springframework.core.log.LogAccessor logger
Constructor Details
-
DeadLetterPublishingRecoverer
Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic.- Parameters:
template
- theKafkaOperations
to use for publishing.
DeadLetterPublishingRecoverer

public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template, BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> destinationResolver)

Create an instance with the provided template and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic.

- Parameters:
- template - the KafkaOperations to use for publishing.
- destinationResolver - the resolving function.
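For example, a sketch of a custom destination resolver that routes deserialization failures to a dedicated topic ("bad-records" is an illustrative name) and everything else to the conventional ".DLT" topic:

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.support.serializer.DeserializationException;

class DestinationResolverExample {

    DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
        return new DeadLetterPublishingRecoverer(template, (record, ex) -> {
            // Route records that failed deserialization to a dedicated topic
            if (ex.getCause() instanceof DeserializationException) {
                return new TopicPartition("bad-records", record.partition());
            }
            // Negative partition: per the javadoc above, no partition is set,
            // so the producer chooses the destination partition
            return new TopicPartition(record.topic() + ".DLT", -1);
        });
    }
}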
DeadLetterPublishingRecoverer

public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates)

Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic. The map keys are classes and the values are the corresponding templates to use for objects (producer record values) of that type. A LinkedHashMap is recommended when there is more than one template, to ensure the map is traversed in order. To send records with a null value, add a template with the Void class as a key; otherwise the first template from the map values iterator will be used.

- Parameters:
- templates - the KafkaOperations instances to use for publishing.
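A sketch of the map-based configuration described above; the template parameter names are illustrative:

import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class MultiTemplateExample {

    DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, byte[]> bytesTemplate,
            KafkaOperations<Object, String> stringTemplate) {
        // LinkedHashMap preserves insertion order, as recommended above
        Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates =
                new LinkedHashMap<>();
        templates.put(byte[].class, bytesTemplate);
        templates.put(String.class, stringTemplate);
        // Void key: used for records with a null value
        templates.put(Void.class, bytesTemplate);
        return new DeadLetterPublishingRecoverer(templates);
    }
}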
DeadLetterPublishingRecoverer

public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates, BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> destinationResolver)

Create an instance with the provided templates and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic. The map keys are classes and the values are the corresponding templates to use for objects (producer record values) of that type. A LinkedHashMap is recommended when there is more than one template, to ensure the map is traversed in order. To send records with a null value, add a template with the Void class as a key; otherwise the first template from the map values iterator will be used.

- Parameters:
- templates - the KafkaOperations instances to use for publishing.
- destinationResolver - the resolving function.
DeadLetterPublishingRecoverer

public DeadLetterPublishingRecoverer(Function<org.apache.kafka.clients.producer.ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver, boolean transactional, BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> destinationResolver)

Create an instance with a template resolving function that receives the failed consumer record and the exception and returns a KafkaOperations, and a flag indicating whether publishing from this instance is transactional. Also receives a destination resolving function that works similarly but returns a TopicPartition instead. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic.

- Parameters:
- templateResolver - the function that resolves the KafkaOperations to use for publishing.
- transactional - whether publishing by this instance should be transactional.
- destinationResolver - the resolving function.
- Since:
- 2.7
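A sketch of the function-based variant, assuming the template is chosen by the outgoing record's value type (parameter names are illustrative):

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class TemplateResolverExample {

    DeadLetterPublishingRecoverer recoverer(KafkaOperations<?, ?> bytesTemplate,
            KafkaOperations<?, ?> stringTemplate) {
        return new DeadLetterPublishingRecoverer(
                // Pick the template from the outgoing producer record's value type
                outRecord -> outRecord.value() instanceof byte[] ? bytesTemplate : stringTemplate,
                false, // publishing from this instance is not transactional
                // Same rule as the default resolver: topic + ".DLT", same partition
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
    }
}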
Method Details
setRetainExceptionHeader

public void setRetainExceptionHeader(boolean retainExceptionHeader)

Set to true to retain a Java serialized DeserializationException header. By default, such headers are removed from the published record, unless both key and value deserialization exceptions occur, in which case the DLT_* headers are created from the value exception and the key exception header is retained.

- Parameters:
- retainExceptionHeader - true to retain the header.
- Since:
- 2.5
setHeadersFunction

public void setHeadersFunction(BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.header.Headers> headersFunction)

Set a function which will be called to obtain additional headers to add to the published record. If a returned Header is an instance of DeadLetterPublishingRecoverer.SingleRecordHeader, that header will replace any existing header of that name, rather than being appended as a new value.

- Parameters:
- headersFunction - the headers function.
- Since:
- 2.5.4
- See Also:
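A sketch of a headers function; the header names are illustrative, and SingleRecordHeader is assumed to accept a name and a byte[] value:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class HeadersFunctionExample {

    void configure(DeadLetterPublishingRecoverer recoverer) {
        recoverer.setHeadersFunction((record, ex) -> {
            RecordHeaders headers = new RecordHeaders();
            // Appended as an additional value if the header already exists
            headers.add("x-failed-application", "my-service".getBytes(StandardCharsets.UTF_8));
            // SingleRecordHeader: replaces any existing header of the same name
            headers.add(new DeadLetterPublishingRecoverer.SingleRecordHeader(
                    "x-last-failure", ex.getClass().getName().getBytes(StandardCharsets.UTF_8)));
            return headers;
        });
    }
}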
setVerifyPartition

public void setVerifyPartition(boolean verifyPartition)

Set to false to disable partition verification. When true, verify that the partition returned by the resolver actually exists. If not, set the ProducerRecord.partition() to null, allowing the producer to determine the destination partition.

- Parameters:
- verifyPartition - false to disable.
- Since:
- 2.7
- See Also:
setPartitionInfoTimeout

public void setPartitionInfoTimeout(Duration partitionInfoTimeout)

Time to wait for partition information when verifying. Default is 5 seconds.

- Parameters:
- partitionInfoTimeout - the timeout.
- Since:
- 2.7
- See Also:
setAppendOriginalHeaders

public void setAppendOriginalHeaders(boolean appendOriginalHeaders)

Set to false if you don't want to append the current "original" headers (topic, partition, etc.) if they are already present. When false, only the first "original" headers are retained.

- Parameters:
- appendOriginalHeaders - set to false to not append.
- Since:
- 2.7.9
setThrowIfNoDestinationReturned

public void setThrowIfNoDestinationReturned(boolean throwIfNoDestinationReturned)

Set to true to throw an exception if the destination resolver function returns a null TopicPartition.

- Parameters:
- throwIfNoDestinationReturned - true to enable.
- Since:
- 2.7
setFailIfSendResultIsError

public void setFailIfSendResultIsError(boolean failIfSendResultIsError)

Set to true to enable waiting for the send result and throw an exception if it fails. It will wait for the milliseconds specified in waitForSendResultTimeout for the result.

- Parameters:
- failIfSendResultIsError - true to enable.
- Since:
- 2.7
- See Also:
isFailIfSendResultIsError

protected boolean isFailIfSendResultIsError()

If true, wait for the send result and throw an exception if it fails. It will wait for the milliseconds specified in waitForSendResultTimeout for the result.

- Returns:
- true to wait.
- Since:
- 2.7.14
- See Also:
setWaitForSendResultTimeout

public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout)

Set the minimum time to wait for message sending. Default is the producer configuration delivery.timeout.ms plus the setTimeoutBuffer(long).

- Parameters:
- waitForSendResultTimeout - the timeout.
- Since:
- 2.7
- See Also:
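A sketch combining the send-result settings above so that a failed dead-letter publish is propagated rather than only logged (the timeout values are illustrative):

import java.time.Duration;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class SendResultExample {

    void configure(DeadLetterPublishingRecoverer recoverer) {
        // Throw if the dead-letter publish itself fails, instead of only logging it
        recoverer.setFailIfSendResultIsError(true);
        // Wait up to 10 seconds for the send result
        recoverer.setWaitForSendResultTimeout(Duration.ofSeconds(10));
        // Extra slack over the producer's delivery.timeout.ms
        recoverer.setTimeoutBuffer(2000L);
    }
}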
setTimeoutBuffer

public void setTimeoutBuffer(long buffer)

Set the number of milliseconds to add to the producer configuration delivery.timeout.ms property to avoid timing out before the Kafka producer. Default 5000.

- Parameters:
- buffer - the buffer.
- Since:
- 2.7
- See Also:
getTimeoutBuffer

protected long getTimeoutBuffer()

The number of milliseconds to add to the producer configuration delivery.timeout.ms property to avoid timing out before the Kafka producer.

- Returns:
- the buffer.
- Since:
- 2.7.14
setStripPreviousExceptionHeaders

public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders)

Set to false to retain previous exception headers as well as headers for the current exception. Default is true, which means only the current headers are retained; setting this to false can cause record size to grow when a record is republished many times.

- Parameters:
- stripPreviousExceptionHeaders - false to retain all.
- Since:
- 2.7.9
setSkipSameTopicFatalExceptions

public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions)

Set to false if you want to forward the record to the same topic even though the exception is fatal by this class's classification, e.g. to handle this scenario in a different layer.

- Parameters:
- skipSameTopicFatalExceptions - false to forward in this scenario.
setExceptionHeadersCreator

public void setExceptionHeadersCreator(DeadLetterPublishingRecoverer.ExceptionHeadersCreator headersCreator)

Set a DeadLetterPublishingRecoverer.ExceptionHeadersCreator implementation to completely take over setting the exception headers in the output record. Disables all headers that are set by default.

- Parameters:
- headersCreator - the creator.
- Since:
- 2.8.4
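A sketch of a custom creator, assuming the functional method receives the output record's Headers, the exception, an is-key flag, and the HeaderNames; the "x-exception" header name is illustrative:

import java.nio.charset.StandardCharsets;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class ExceptionHeadersCreatorExample {

    void configure(DeadLetterPublishingRecoverer recoverer) {
        // Replace all default exception headers with a single compact header
        recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) ->
                kafkaHeaders.add("x-exception",
                        exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
    }
}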
isTransactional

protected boolean isTransactional()

True if publishing should run in a transaction.

- Returns:
- true for transactional.
- Since:
- 2.7.14
excludeHeader

public void excludeHeader(DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd... headers)

Clear the header inclusion bit for the header name.

- Parameters:
- headers - the headers to clear.
- Since:
- 2.8.4
includeHeader

public void includeHeader(DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd... headers)

Set the header inclusion bit for the header name.

- Parameters:
- headers - the headers to set.
- Since:
- 2.8.4
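A sketch of suppressing one of the default headers; the HeadersToAdd enum constant name (EX_STACKTRACE) is an assumption, not taken from this page:

import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd;

class HeaderInclusionExample {

    void configure(DeadLetterPublishingRecoverer recoverer) {
        // Drop the potentially large stack-trace header from published records
        recoverer.excludeHeader(HeadersToAdd.EX_STACKTRACE);
    }
}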
addHeadersFunction

public void addHeadersFunction(BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.header.Headers> headersFunction)

Add a function which will be called to obtain additional headers to add to the published record. Functions are called in the order that they are added, and after any function passed into setHeadersFunction(BiFunction). If a returned Header is an instance of DeadLetterPublishingRecoverer.SingleRecordHeader, that header will replace any existing header of that name, rather than being appended as a new value.

- Parameters:
- headersFunction - the headers function.
- Since:
- 2.8.4
- See Also:
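A sketch of adding a second headers function (the header name is illustrative); it runs after any function given to setHeadersFunction(BiFunction):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class AddHeadersFunctionExample {

    void configure(DeadLetterPublishingRecoverer recoverer) {
        recoverer.addHeadersFunction((record, ex) -> {
            RecordHeaders extra = new RecordHeaders();
            extra.add("x-failed-offset",
                    Long.toString(record.offset()).getBytes(StandardCharsets.UTF_8));
            return extra;
        });
    }
}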
accept

public void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, @Nullable org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Exception exception)

Description copied from interface: ConsumerAwareRecordRecoverer

Recover the record.

- Specified by:
- accept in interface ConsumerAwareRecordRecoverer
- Parameters:
- record - the record.
- consumer - the consumer.
- exception - the exception.
send

protected void send(org.apache.kafka.clients.producer.ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> inRecord)

Send the record.

- Parameters:
- outRecord - the record.
- kafkaTemplate - the template.
- inRecord - the consumer record.
- Since:
- 2.7
createProducerRecord

protected org.apache.kafka.clients.producer.ProducerRecord<Object, Object> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, @Nullable byte[] key, @Nullable byte[] value)

Subclasses can override this method to customize the producer record to send to the DLQ. The default implementation simply copies the key and value from the consumer record and adds the headers. The timestamp is not set (the original timestamp is in one of the headers). IMPORTANT: if the partition in the TopicPartition is less than 0, it must be set to null in the ProducerRecord.

- Parameters:
- record - the failed record.
- topicPartition - the TopicPartition returned by the destination resolver.
- headers - the headers - original record headers plus DLT headers.
- key - the key to use instead of the consumer record key.
- value - the value to use instead of the consumer record value.
- Returns:
- the producer record to send.
- See Also:
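A sketch of an override that follows the partition rule stated above; the subclass name is illustrative, and the fallback-to-consumer-record behavior mirrors the described default:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

class CustomRecordRecoverer extends DeadLetterPublishingRecoverer {

    CustomRecordRecoverer(KafkaOperations<Object, Object> template) {
        super(template);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {
        // A negative partition must become null so the producer selects the partition
        Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();
        // Prefer the replacement key/value when provided, like the default implementation
        Object outKey = key != null ? key : record.key();
        Object outValue = value != null ? value : record.value();
        return new ProducerRecord<>(topicPartition.topic(), partition, outKey, outValue, headers);
    }
}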
publish

protected void publish(org.apache.kafka.clients.producer.ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> inRecord)

Override this if you want more than just logging of the send result.

- Parameters:
- outRecord - the record to send.
- kafkaTemplate - the template.
- inRecord - the consumer record.
- Since:
- 2.2.5
verifySendResult

protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate, org.apache.kafka.clients.producer.ProducerRecord<Object, Object> outRecord, @Nullable CompletableFuture<SendResult<Object, Object>> sendResult, org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> inRecord)

Wait for the send future to complete.

- Parameters:
- kafkaTemplate - the template used to send the record.
- outRecord - the record.
- sendResult - the future.
- inRecord - the original consumer record.
determineSendTimeout

protected Duration determineSendTimeout(KafkaOperations<?, ?> template)

Determine the send timeout based on the template's producer factory and setWaitForSendResultTimeout(Duration).

- Parameters:
- template - the template.
- Returns:
- the timeout.
- Since:
- 2.7.14
getHeaderNames

protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames()

Override this if you want different header names to be used in the sent record.

- Returns:
- the header names.
- Since:
- 2.7