Class RetryTopicConfigurer

java.lang.Object
org.springframework.kafka.retrytopic.RetryTopicConfigurer
All Implemented Interfaces:
Aware, BeanFactoryAware

public class RetryTopicConfigurer extends Object implements BeanFactoryAware

Configures main, retry and DLT topics based on a main endpoint and provided configurations to accomplish a distributed retry / DLT pattern in a non-blocking fashion, at the expense of ordering guarantees.

To illustrate, if you have a "main-topic" topic, and want an exponential backoff of 1000ms with a multiplier of 2 and 3 retry attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics. The configuration can be achieved using a RetryTopicConfigurationBuilder to create one or more RetryTopicConfigurer beans, or by using the RetryableTopic annotation. More details on usage below.

How it works:

If a message processing throws an exception, the configured DefaultErrorHandler and DeadLetterPublishingRecoverer forwards the message to the next topic, using a DestinationTopicResolver to know the next topic and the delay for it.

Each forwarded record has a back off timestamp header and, if consumption is attempted by the KafkaBackoffAwareMessageListenerAdapter before that time, the partition consumption is paused by a KafkaConsumerBackoffManager and a KafkaBackoffException is thrown.

When the partition has been idle for the amount of time specified in the ContainerProperties' idlePartitionEventInterval property. property, a ListenerContainerPartitionIdleEvent is published, which the KafkaConsumerBackoffManager listens to in order to check whether or not it should unpause the partition.

If, when consumption is resumed, processing fails again, the message is forwarded to the next topic and so on, until it gets to the dlt.

Considering Kafka's partition ordering guarantees, and each topic having a fixed delay time, we know that the first message consumed in a given retry topic partition will be the one with the earliest backoff timestamp for that partition, so by pausing the partition we know we're not delaying message processing in other partitions longer than necessary.

Usages:

There are two main ways for configuring the endpoints. The first is by providing one or more Beans in a Configuration annotated class, such as:

     @Bean
     public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .create(template);
      }
 

This will create retry and dlt topics for all topics in methods annotated with KafkaListener, as well as its consumers, using the default configurations. If message processing fails it will forward the message to the next topic until it gets to the DLT topic. A KafkaOperations instance is required for message forwarding.

For more fine-grained control over how to handle retrials for each topic, more then one bean can be provided, such as:

     @Bean
     public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .fixedBackOff(3000)
                 .maxAttempts(5)
                 .includeTopics("my-topic", "my-other-topic")
                 .create(template);
         }
 
           @Bean
     public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .exponentialBackoff(1000, 2, 5000)
                 .maxAttempts(4)
                 .excludeTopics("my-topic", "my-other-topic")
                 .retryOn(MyException.class)
                 .create(template);
         }
 

Some other options include: auto-creation of topics, backoff, retryOn / notRetryOn / transversing as in RetryTemplate, single-topic fixed backoff processing, custom dlt listener beans, custom topic suffixes and providing specific listenerContainerFactories.

The other, non-exclusive way to configure the endpoints is through the convenient RetryableTopic annotation, that can be placed on any KafkaListener annotated methods, directly, such as:

     @RetryableTopic(attempts = 3,
                backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     @KafkaListener(topics = "my-annotated-topic")
     public void processMessage(MyPojo message) {
                        // ... message processing
     }

Since 3.2 , RetryableTopic annotation support KafkaListener annotated class, such as:

     @RetryableTopic(attempts = 3,
                backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     @KafkaListener(topics = "my-annotated-topic")
     static class ListenerBean {
          @KafkaHandler
         public void processMessage(MyPojo message) {
                        // ... message processing
         }
     }

Since 3.2, RetryableTopic annotation supports KafkaListener annotated class, such as:

     @RetryableTopic(attempts = 3,
                backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     @KafkaListener(topics = "my-annotated-topic")
     static class ListenerBean {
          @KafkaHandler
         public void processMessage(MyPojo message) {
                        // ... message processing
         }
     }

Or through meta-annotations, such as:

     @RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     public @interface WithExponentialBackoffRetry {
             @AliasFor(attribute = "attempts", annotation = RetryableTopic.class)
                String retries();
     }

     @WithExponentialBackoffRetry(retries = "3")
     @KafkaListener(topics = "my-annotated-topic")
     public void processMessage(MyPojo message) {
                        // ... message processing
     }

The same configurations are available in the annotation and the builder approaches, and both can be used concurrently. In case the same method / topic can be handled by both, the annotation takes precedence.

DLT Handling:

The DLT handler method can be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) method, providing the class and method name that should handle the DLT topic. If a bean instance of this type is found in the BeanFactory it is the instance used. If not an instance is created. The class can use dependency injection as a normal bean.

     @Bean
     public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
                 .create(template);
     }

     @Component
     public class MyCustomDltProcessor {

                public void processDltMessage(MyPojo message) {
               // ... message processing, persistence, etc
                }
     }
 
The other way to provide the DLT handler method is through the DltHandler annotation, that should be used within the same class as the correspondent KafkaListener.
            @DltHandler
       public void processMessage(MyPojo message) {
                        // ... message processing, persistence, etc
       }
If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used.
Since:
2.7
Author:
Tomaz Fernandes, Fabio da Silva Jr., Gary Russell, Wang Zhiyang, Borahm Lee
See Also: