Dead-Letter Topic Processing
Dead-Letter Topic Partition Selection
By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original record.
To change this behavior, add a DlqPartitionFunction
implementation as a @Bean
to the application context.
Only one such bean can be present.
The function is provided with the consumer group, the failed ConsumerRecord
and the exception.
For example, if you always want to route to partition 0, you might use:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1 ), there is no need to supply a DlqPartitionFunction ; the framework will always use partition 0.
If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1 ), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
|
It is also possible to define a custom name for the DLQ topic.
In order to do so, create an implementation of DlqDestinationResolver
as a @Bean
to the application context.
When the binder detects such a bean, that takes precedence, otherwise it will use the dlqName
property.
If neither of these are found, it will default to error.<destination>.<group>
.
Here is an example of DlqDestinationResolver
as a @Bean
.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
One important thing to keep in mind when providing an implementation for DlqDestinationResolver
is that the provisioner in the binder will not auto create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
Handling Records in a Dead-Letter Topic
Because the framework cannot anticipate how users would want to dispose of dead-lettered messages, it does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic. However, if the problem is a permanent issue, that could cause an infinite loop. The sample Spring Boot application within this topic is an example of how to route those messages back to the original topic, but it moves them to a “parking lot” topic after three attempts. The application is another spring-cloud-stream application that reads from the dead-letter topic. It terminates when no messages are received for 5 seconds.
The examples assume the original destination is so8400out
and the consumer group is so8400
.
There are a couple of strategies to consider:
-
Consider running the rerouting only when the main application is not running. Otherwise, the retries for transient errors are used up very quickly.
-
Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic.
The following code listings show the sample application:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}