This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.2.0! |
Retry and Dead Letter Processing
By default, when you configure retry (e.g. maxAttempts
) and enableDlq
in a consumer binding, these functions are performed within the binder, with no participation by the listener container or Kafka consumer.
There are situations where it is preferable to move this functionality to the listener container, such as:
-
The aggregate of retries and delays will exceed the consumer’s
max.poll.interval.ms
property, potentially causing a partition rebalance. -
You wish to publish the dead letter to a different Kafka cluster.
-
You wish to add retry listeners to the error handler.
-
…
To configure moving this functionality from the binder to the container, define a @Bean
of type ListenerContainerWithDlqAndRetryCustomizer
.
This interface has the following methods:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
The destination resolver and BackOff
are created from the binding properties (if configured). The KafkaTemplate
uses configuration from spring.kafka….
properties. You can then use these to create a custom error handler and dead letter publisher; for example:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
Now, only a single retry delay needs to be greater than the consumer’s max.poll.interval.ms
property.
When working with several binders, the 'ListenerContainerWithDlqAndRetryCustomizer' bean gets overridden by the 'DefaultBinderFactory'. For the bean to apply, you need to use a 'BinderCustomizer' to set the container customizer (See [binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}