public class DeadLetterPublishingRecoverer
extends java.lang.Object
implements ConsumerRecordRecoverer

A ConsumerRecordRecoverer that publishes a failed record to a dead-letter topic.

Constructor and Description |
---|
DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template) Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. |
DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver) Create an instance with the provided template and destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. |
DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object>> templates) Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. |
DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object>> templates, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver) Create an instance with the provided templates and destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. |
Modifier and Type | Method and Description |
---|---|
void | accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.Exception exception) |
protected org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> | createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.internals.RecordHeaders headers, byte[] data, boolean isKey) Subclasses can override this method to customize the producer record to send to the DLQ. |
protected void | publish(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate) Override this if you want more than just logging of the send result. |
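For orientation, a minimal wiring sketch. This is an assumption, not part of this API page: the recoverer is commonly passed to Spring Kafka's SeekToCurrentErrorHandler, which invokes accept(...) once retries are exhausted; the FixedBackOff values and the class name DltErrorHandlerConfig are illustrative.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class DltErrorHandlerConfig {

    // With the single-template constructor, failed records are published to
    // "<original-topic>.DLT" on the same partition, here after the initial
    // delivery attempt plus two retries (illustrative back-off values).
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}
```

The error handler would then be set on the listener container factory in the usual way.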
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template)

Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.

Parameters:
template - the KafkaTemplate to use for publishing.

public DeadLetterPublishingRecoverer(KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided template and destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic.

Parameters:
template - the KafkaTemplate to use for publishing.
destinationResolver - the resolving function.

public DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object>> templates)
Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. The templates map keys are classes and the value the corresponding template to use for objects (producer record values) of that type. A LinkedHashMap is recommended when there is more than one template, to ensure the map is traversed in order.

Parameters:
templates - the KafkaTemplates to use for publishing.

public DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaTemplate<? extends java.lang.Object,? extends java.lang.Object>> templates, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided templates and destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic. The templates map keys are classes and the value the corresponding template to use for objects (producer record values) of that type. A LinkedHashMap is recommended when there is more than one template, to ensure the map is traversed in order.

Parameters:
templates - the KafkaTemplates to use for publishing.
destinationResolver - the resolving function.

public void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.Exception exception)
Specified by:
accept in interface java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception>
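The destinationResolver taken by the second and fourth constructors is a plain java.util.function.BiFunction. A hedged sketch of one: the topic suffixes and the routing of deserialization failures are illustrative assumptions, not behavior documented on this page; only the negative-partition convention comes from the constructor descriptions above.

```java
import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.support.serializer.DeserializationException;

public class DltDestinations {

    // Route records that failed deserialization to a dedicated topic;
    // everything else goes to "<topic>.DLT" on the same partition.
    // Returning a negative partition means no partition is set when publishing.
    public static BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> resolver() {
        return (record, exception) -> {
            if (exception.getCause() instanceof DeserializationException) {
                return new TopicPartition(record.topic() + ".deser.DLT", -1);
            }
            return new TopicPartition(record.topic() + ".DLT", record.partition());
        };
    }
}
```

The result would be passed to the two-argument constructor alongside the KafkaTemplate.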
protected org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.internals.RecordHeaders headers, @Nullable byte[] data, boolean isKey)
Subclasses can override this method to customize the producer record to send to the DLQ. If the partition in the TopicPartition is less than 0, it must be set to null in the ProducerRecord.

Parameters:
record - the failed record.
topicPartition - the TopicPartition returned by the destination resolver.
headers - the headers - original record headers plus DLT headers.
data - the value to use instead of the consumer record value.
isKey - true if key deserialization failed.

See Also:
KafkaHeaders
protected void publish(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
Override this if you want more than just logging of the send result.

Parameters:
outRecord - the record to send.
kafkaTemplate - the template.
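Both protected methods are extension points. A minimal subclass sketch: the signatures come from this page, while the class name, the header key, and the service name are illustrative assumptions.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class AuditingDltRecoverer extends DeadLetterPublishingRecoverer {

    public AuditingDltRecoverer(KafkaTemplate<Object, Object> template) {
        super(template);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, RecordHeaders headers, byte[] data, boolean isKey) {
        // Add an application-specific header before delegating to the default record creation.
        headers.add("x-failed-service", "order-service".getBytes());
        return super.createProducerRecord(record, topicPartition, headers, data, isKey);
    }

    @Override
    protected void publish(ProducerRecord<Object, Object> outRecord,
            KafkaOperations<Object, Object> kafkaTemplate) {
        // Keep the default send-and-log behavior; custom auditing of the
        // send result would go here.
        super.publish(outRecord, kafkaTemplate);
    }
}
```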