Annotation Interface EnableKafkaRetryTopic


Enables the non-blocking topic-based delayed retries feature. To be used in Configuration classes as follows:
 @EnableKafkaRetryTopic
 @Configuration
 public class AppConfig {
 }

 @Component
 public class MyListener {

        @RetryableTopic(sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC, backOff = @BackOff(4000))
        @KafkaListener(topics = "myTopic")
        public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
                logger.info("Message {} received in topic {} ", message, receivedTopic);
        }

        @DltHandler
        public void dltHandler(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
                logger.info("Message {} received in dlt handler at topic {} ", message, receivedTopic);
        }
 }
 
Using this annotation configures the default RetryTopicConfigurationSupport bean. This annotation is meta-annotated with @EnableKafka so it is not necessary to specify both.

To configure the feature's components, extend the RetryTopicConfigurationSupport class and override the appropriate methods on a @Configuration class, such as:

 @Configuration
 @EnableKafka
 public class AppConfig extends RetryTopicConfigurationSupport {

        @Override
        protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
                blockingRetries
                        .retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
                        .backOff(new FixedBackOff(50, 3));
        }

        @Override
        protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
                nonBlockingFatalExceptions.add(MyNonBlockingException.class);
        }

        @Override
        protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
                // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
                customizersConfigurer.customizeErrorHandler(eh -> {
                        eh.setSeekAfterError(false);
                });
        }
 }
 
In this case, you should not use this annotation, use @EnableKafka instead.
Since:
2.9
Author:
Tomaz Fernandes