Class DeadLetterPublishingRecoverer
- java.lang.Object
-
- org.springframework.kafka.listener.DeadLetterPublishingRecoverer
-
- All Implemented Interfaces:
java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception>
,ConsumerRecordRecoverer
public class DeadLetterPublishingRecoverer extends java.lang.Object implements ConsumerRecordRecoverer
AConsumerRecordRecoverer
that publishes a failed record to a dead-letter topic.- Since:
- 2.2
- Author:
- Gary Russell
-
-
Field Summary
Fields Modifier and Type Field Description protected org.springframework.core.log.LogAccessor
logger
-
Constructor Summary
Constructors Constructor Description DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates)
Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided templates and destination resolving function, that receives the failed consumer record and the exception and returns aTopicPartition
.DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template)
Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided template and destination resolving function, that receives the failed consumer record and the exception and returns aTopicPartition
.DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template)
Deprecated.in favor ofDeadLetterPublishingRecoverer(KafkaOperations)
.DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Deprecated.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.Exception exception)
protected org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object>
createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, byte[] data, boolean isKey)
Subclasses can override this method to customize the producer record to send to the DLQ.protected void
publish(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
Override this if you want more than just logging of the send result.void
setHeadersFunction(java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.header.Headers> headersFunction)
Set a function which will be called to obtain additional headers to add to the published record.void
setRetainExceptionHeader(boolean retainExceptionHeader)
Set to true to retain a Java serializedDeserializationException
header.
-
-
-
Constructor Detail
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template)
Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic.- Parameters:
template
- theKafkaOperations
to use for publishing.
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided template and destination resolving function, that receives the failed consumer record and the exception and returns aTopicPartition
. If the partition in theTopicPartition
is less than 0, no partition is set when publishing to the topic.- Parameters:
template
- theKafkaOperations
to use for publishing.destinationResolver
- the resolving function.
-
DeadLetterPublishingRecoverer
@Deprecated public DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template)
Deprecated.in favor ofDeadLetterPublishingRecoverer(KafkaOperations)
.Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic.- Parameters:
template
- theKafkaTemplate
to use for publishing.
-
DeadLetterPublishingRecoverer
@Deprecated public DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Deprecated.Create an instance with the provided template and destination resolving function, that receives the failed consumer record and the exception and returns aTopicPartition
. If the partition in theTopicPartition
is less than 0, no partition is set when publishing to the topic.- Parameters:
template
- theKafkaOperations
to use for publishing.destinationResolver
- the resolving function.
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates)
Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic. The templates map keys are classes and the value the corresponding template to use for objects (producer record values) of that type. ALinkedHashMap
is recommended when there is more than one template, to ensure the map is traversed in order. To send records with a null value, add a template with theVoid
class as a key; otherwise the first template from the map values iterator will be used.- Parameters:
templates
- theKafkaOperations
s to use for publishing.
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided templates and destination resolving function, that receives the failed consumer record and the exception and returns aTopicPartition
. If the partition in theTopicPartition
is less than 0, no partition is set when publishing to the topic. The templates map keys are classes and the value the corresponding template to use for objects (producer record values) of that type. ALinkedHashMap
is recommended when there is more than one template, to ensure the map is traversed in order. To send records with a null value, add a template with theVoid
class as a key; otherwise the first template from the map values iterator will be used.- Parameters:
templates
- theKafkaOperations
s to use for publishing.destinationResolver
- the resolving function.
-
-
Method Detail
-
setRetainExceptionHeader
public void setRetainExceptionHeader(boolean retainExceptionHeader)
Set to true to retain a Java serializedDeserializationException
header. By default, such headers are removed from the published record, unless both key and value deserialization exceptions occur, in which case, the DLT_* headers are created from the value exception and the key exception header is retained.- Parameters:
retainExceptionHeader
- true to retain the- Since:
- 2.5
-
setHeadersFunction
public void setHeadersFunction(java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.header.Headers> headersFunction)
Set a function which will be called to obtain additional headers to add to the published record.- Parameters:
headersFunction
- the headers function.- Since:
- 2.5.4
-
accept
public void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.Exception exception)
- Specified by:
accept
in interfacejava.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception>
-
createProducerRecord
protected org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, @Nullable byte[] data, boolean isKey)
Subclasses can override this method to customize the producer record to send to the DLQ. The default implementation simply copies the key and value from the consumer record and adds the headers. The timestamp is not set (the original timestamp is in one of the headers). IMPORTANT: if the partition in theTopicPartition
is less than 0, it must be set to null in theProducerRecord
.- Parameters:
record
- the failed recordtopicPartition
- theTopicPartition
returned by the destination resolver.headers
- the headers - original record headers plus DLT headers.data
- the value to use instead of the consumer record value.isKey
- true if key deserialization failed.- Returns:
- the producer record to send.
- See Also:
KafkaHeaders
-
publish
protected void publish(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
Override this if you want more than just logging of the send result.- Parameters:
outRecord
- the record to send.kafkaTemplate
- the template.- Since:
- 2.2.5
-
-