Dead-Letter Topic Processing
Enabling DLQ
To enable DLQ, a Kafka binder based application must provide a consumer group via the property spring.cloud.stream.bindings.<binding-name>.group.
Anonymous consumer groups (that is, where the application does not explicitly provide a group) cannot enable the DLQ feature.
When an application wants to send records in error to a DLQ topic, it must enable the DLQ feature explicitly, since it is not enabled by default.
To enable DLQ, the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq must be set to true.
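As a minimal sketch, assuming a binding named process-in-0 and a consumer group named my-group (both names are placeholders; substitute your own binding name and group):

# process-in-0 and my-group are placeholder names
spring.cloud.stream.bindings.process-in-0.group=my-group
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enable-dlq=true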
When DLQ is enabled and an error occurs during processing, the record is sent to the DLQ topic once all the retries are exhausted, as governed by the spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts property.
By default, the max-attempts property is set to three.
When the max-attempts property is greater than 1 and DLQ is enabled, the retries honor the max-attempts property.
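For example, the following sketch (again using the placeholder binding name process-in-0) allows five delivery attempts before a failed record is published to the DLQ topic:

# max-attempts counts total delivery attempts, including the first
spring.cloud.stream.bindings.process-in-0.consumer.max-attempts=5
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enable-dlq=true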
When DLQ is not enabled (which is the default), the max-attempts property has no bearing on how retries are handled.
In that case, the retries fall back to the container defaults in Spring for Apache Kafka, which allow ten delivery attempts (the initial attempt plus nine retries).
If an application wants to disable retries altogether when DLQ is disabled, setting the max-attempts property to 1 will not work.
To completely disable retries in that case, you need to provide a ListenerContainerCustomizer and use appropriate BackOff settings.
Here is an example.
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, destinationName, group) -> {
        // FixedBackOff(0L, 0L): no delay between attempts and zero retries,
        // so each record is attempted only once
        var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0L));
        container.setCommonErrorHandler(commonErrorHandler);
    };
}
With this, the default container behavior is disabled and no retries are attempted. As noted above, when DLQ is enabled, the binder settings take precedence.
Handling Records in a Dead-Letter Topic
Because the framework cannot anticipate how users would want to dispose of dead-lettered messages, it does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic. However, if the problem is a permanent issue, that could cause an infinite loop. The sample Spring Boot application within this topic is an example of how to route those messages back to the original topic, but it moves them to a “parking lot” topic after three attempts. The application is another spring-cloud-stream application that reads from the dead-letter topic. It exits when no messages are received for 5 seconds.
The examples assume the original destination is so8400out and the consumer group is so8400, so the dead-letter topic created by the binder is error.so8400out.so8400 (following the default naming convention of error.<destination>.<group>).
There are a couple of strategies to consider:

- Consider running the rerouting only when the main application is not running. Otherwise, the retries for transient errors are used up very quickly.
- Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic (a sketch follows this list).
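The following sketch illustrates the bindings of a hypothetical second-stage application for the two-stage approach; the intermediate topic name so8400out.retry and the group so8400retry are placeholders. The second-stage application simply copies records from the intermediate topic back to the main topic:

# hypothetical second-stage application (topic and group names are placeholders)
spring.cloud.stream.bindings.input.group=so8400retry
spring.cloud.stream.bindings.input.destination=so8400out.retry
spring.cloud.stream.bindings.output.destination=so8400out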
The following code listings show the sample application:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                // First replay: add the retry-count header and send the record
                // back to its original partition
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                // Fewer than three replays so far: increment the count and replay again
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                // Retries exhausted: move the record to the parking lot topic
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        // Exit when no messages have been processed for five seconds
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }

}