Class RetryTopicConfigurer
- java.lang.Object
  - org.springframework.kafka.retrytopic.RetryTopicConfigurer

- All Implemented Interfaces:
- org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware
 
public class RetryTopicConfigurer extends java.lang.Object implements org.springframework.beans.factory.BeanFactoryAware

Configures main, retry and DLT topics based on a main endpoint and provided configurations to accomplish a distributed retry / DLT pattern in a non-blocking fashion, at the expense of ordering guarantees.

To illustrate, if you have a "main-topic" topic, and want an exponential backoff of 1000ms with a multiplier of 2 and 3 retry attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics.

The configuration can be achieved using a RetryTopicConfigurationBuilder to create one or more RetryTopicConfiguration beans, or by using the RetryableTopic annotation. More details on usage below.

How it works:

If message processing throws an exception, the configured DefaultErrorHandler and DeadLetterPublishingRecoverer forward the message to the next topic, using a DestinationTopicResolver to determine the next topic and its delay.

Each forwarded record has a back off timestamp header and, if consumption is attempted by the KafkaBackoffAwareMessageListenerAdapter before that time, the partition consumption is paused by a KafkaConsumerBackoffManager and a KafkaBackoffException is thrown.

When the partition has been idle for the amount of time specified in the ContainerProperties' idlePartitionEventInterval property, a ListenerContainerPartitionIdleEvent is published, which the KafkaConsumerBackoffManager listens to in order to check whether or not it should unpause the partition.

If, when consumption is resumed, processing fails again, the message is forwarded to the next topic and so on, until it gets to the DLT.
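The pause-and-resume cycle described above can be illustrated with a small, self-contained sketch. It is not the library's implementation: BackoffGateSketch, DelayedRecord, Partition, tryConsume and onPartitionIdle are hypothetical names chosen for this illustration, and the sketch uses plain Java collections instead of Kafka consumers. It assumes, as the description does, that a record must not be processed before its back off timestamp and that a paused partition is only re-checked when an idle event arrives.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

public class BackoffGateSketch {

    /** Hypothetical stand-in for a forwarded record and its back off timestamp header. */
    public static final class DelayedRecord {
        final String payload;
        final long dueAtMs;

        DelayedRecord(String payload, long dueAtMs) {
            this.payload = payload;
            this.dueAtMs = dueAtMs;
        }
    }

    /** Hypothetical stand-in for a single retry-topic partition and its pause state. */
    public static final class Partition {
        private final Deque<DelayedRecord> records = new ArrayDeque<>();
        private boolean paused;

        public void add(String payload, long dueAtMs) {
            records.addLast(new DelayedRecord(payload, dueAtMs));
        }

        public boolean isPaused() {
            return paused;
        }

        /** Tries to consume the head record; pauses the partition if the record is not due yet. */
        public Optional<String> tryConsume(long nowMs) {
            DelayedRecord head = records.peekFirst();
            if (paused || head == null) {
                return Optional.empty();
            }
            if (nowMs < head.dueAtMs) {
                paused = true; // in the flow described above, a KafkaBackoffException is also thrown
                return Optional.empty();
            }
            return Optional.of(records.pollFirst().payload);
        }

        /** Called on an idle event: resumes the partition once the head record is due. */
        public void onPartitionIdle(long nowMs) {
            DelayedRecord head = records.peekFirst();
            if (paused && (head == null || nowMs >= head.dueAtMs)) {
                paused = false;
            }
        }
    }

    public static void main(String[] args) {
        Partition partition = new Partition();
        partition.add("order-1", 1_000L);
        partition.add("order-2", 1_200L);

        System.out.println(partition.tryConsume(400L));   // too early, partition pauses
        partition.onPartitionIdle(700L);                  // still not due, stays paused
        partition.onPartitionIdle(1_000L);                // due, partition resumes
        System.out.println(partition.tryConsume(1_000L)); // order-1 is delivered
        System.out.println(partition.tryConsume(1_000L)); // order-2 is not due, pauses again
    }
}
```

Because every record in a retry topic shares the same delay, the head of the queue always carries the earliest due time, so checking only the head is enough in this sketch.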
Considering Kafka's partition ordering guarantees, and each topic having a fixed delay time, we know that the first message consumed in a given retry topic partition will be the one with the earliest backoff timestamp for that partition, so by pausing the partition we know we're not delaying message processing in other partitions longer than necessary.

Usages:

There are two main ways for configuring the endpoints. The first is by providing one or more Beans in a Configuration annotated class, such as:

    @Bean
    public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .create(template);
    }

This will create retry and DLT topics for all topics in methods annotated with KafkaListener, as well as their consumers, using the default configurations. If message processing fails it will forward the message to the next topic until it gets to the DLT topic. A KafkaOperations instance is required for message forwarding.

For more fine-grained control over how to handle retries for each topic, more than one bean can be provided, such as:

    @Bean
    public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .fixedBackoff(3000)
                .maxAttempts(5)
                .includeTopics("my-topic", "my-other-topic")
                .create(template);
    }

    @Bean
    public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(1000, 2, 5000)
                .maxAttempts(4)
                .excludeTopics("my-topic", "my-other-topic")
                .retryOn(MyException.class)
                .create(template);
    }

Some other options include: auto-creation of topics, backoff, retryOn / notRetryOn / traversing as in RetryTemplate, single-topic fixed backoff processing, custom DLT listener beans, custom topic suffixes and providing specific listenerContainerFactories.

The other, non-exclusive way to configure the endpoints is through the convenient RetryableTopic annotation, which can be placed on any KafkaListener annotated method, such as:

    @RetryableTopic(attempts = 3, backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
    @KafkaListener(topics = "my-annotated-topic")
    public void processMessage(MyPojo message) {
        // ... message processing
    }

The same configurations are available in the annotation and the builder approaches, and both can be used concurrently. In case the same method / topic can be handled by both, the annotation takes precedence.

DLT Handling:

The DLT handler method can be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) method, providing the class and method name that should handle the DLT topic. If a bean instance of this type is found in the BeanFactory it is the instance used. If not, an instance is created. The class can use dependency injection as a normal bean.

    @Bean
    public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .dltHandlerMethod(MyCustomDltProcessor.class, "processDltMessage")
                .create(template);
    }

    @Component
    public class MyCustomDltProcessor {

        public void processDltMessage(MyPojo message) {
            // ... message processing, persistence, etc
        }
    }

The DLT handler method can also be provided through the DltHandler annotation, which should be used within the same class as the corresponding KafkaListener.

    @DltHandler
    public void processMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }

If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used.

- Since:
- 2.7
- Author:
- Tomaz Fernandes
- See Also:
- RetryTopicConfigurationBuilder, RetryableTopic, KafkaListener, Backoff, DefaultErrorHandler, DeadLetterPublishingRecoverer
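
The topic names in the class description's exponential backoff example ("main-topic" with a 1000ms delay, a multiplier of 2 and 3 retry attempts) can be reproduced with a short sketch. This is only an illustration of the naming pattern stated there, not the library's naming code: RetryTopicNamesSketch and topicNames are hypothetical, and the "-retry-" and "-dlt" suffixes are taken from the example rather than from any configuration.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicNamesSketch {

    /**
     * Hypothetical helper: lists the main topic, one retry topic per delay step,
     * and the DLT topic, following the naming shown in the class description.
     */
    public static List<String> topicNames(String mainTopic, long initialDelayMs,
            double multiplier, int retryTopics) {
        List<String> names = new ArrayList<>();
        names.add(mainTopic);
        double delay = initialDelayMs;
        for (int i = 0; i < retryTopics; i++) {
            names.add(mainTopic + "-retry-" + Math.round(delay));
            delay *= multiplier; // exponential growth of the delay between steps
        }
        names.add(mainTopic + "-dlt");
        return names;
    }

    public static void main(String[] args) {
        // Prints [main-topic, main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000, main-topic-dlt]
        System.out.println(topicNames("main-topic", 1000, 2.0, 3));
    }
}
```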
 
- 
- 
Nested Class Summary

| Modifier and Type | Class | Description |
| --- | --- | --- |
| static interface | RetryTopicConfigurer.EndpointProcessor | |
 - 
Field Summary

| Modifier and Type | Field | Description |
| --- | --- | --- |
| static EndpointHandlerMethod | DEFAULT_DLT_HANDLER | The default method to handle messages in the DLT. |
 - 
Constructor Summary

| Constructor | Description |
| --- | --- |
| RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, org.springframework.beans.factory.BeanFactory beanFactory) | Deprecated. |
| RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, org.springframework.beans.factory.BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) | Deprecated. |
| RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) | Create an instance with the provided properties. |
 - 
Method Summary

| Modifier and Type | Method | Description |
| --- | --- | --- |
| protected EndpointCustomizer | createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) | |
| static EndpointHandlerMethod | createHandlerMethodWith(java.lang.Object bean, java.lang.reflect.Method method) | |
| static EndpointHandlerMethod | createHandlerMethodWith(java.lang.Object beanOrClass, java.lang.String methodName) | |
| protected void | createNewTopicBeans(java.util.Collection<java.lang.String> topics, org.springframework.kafka.retrytopic.RetryTopicConfiguration.TopicCreation config) | |
| protected EndpointHandlerMethod | getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?,?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) | |
| void | processMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?,?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, KafkaListenerContainerFactory<?> factory, java.lang.String defaultContainerFactoryBeanName) | Entrypoint for creating and configuring the retry and DLT endpoints, as well as the container factory that will create the corresponding listenerContainer. |
| void | setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory) | |
 
- 
- 
- 
Field Detail

DEFAULT_DLT_HANDLER
public static final EndpointHandlerMethod DEFAULT_DLT_HANDLER
The default method to handle messages in the DLT.
 
- 
 - 
Constructor Detail

RetryTopicConfigurer
@Deprecated public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, org.springframework.beans.factory.BeanFactory beanFactory)
Deprecated.

RetryTopicConfigurer
@Deprecated public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, org.springframework.beans.factory.BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory)
Deprecated.
Create an instance with the provided properties.
- Parameters:
- destinationTopicProcessor - the destination topic processor.
- containerFactoryResolver - the container factory resolver.
- listenerContainerFactoryConfigurer - the container factory configurer.
- beanFactory - the bean factory.
- retryTopicNamesProviderFactory - the retry topic names factory.
 
 - 
RetryTopicConfigurer
@Autowired public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory)
Create an instance with the provided properties.
- Parameters:
- destinationTopicProcessor - the destination topic processor.
- containerFactoryResolver - the container factory resolver.
- listenerContainerFactoryConfigurer - the container factory configurer.
- retryTopicNamesProviderFactory - the retry topic names factory.
 
 
- 
 - 
Method Detail

processMainAndRetryListeners
public void processMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?,?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, @Nullable KafkaListenerContainerFactory<?> factory, java.lang.String defaultContainerFactoryBeanName)
Entrypoint for creating and configuring the retry and DLT endpoints, as well as the container factory that will create the corresponding listenerContainer.
- Parameters:
- endpointProcessor - function that will process the endpoints (the processListener method).
- mainEndpoint - the endpoint based on which retry and DLT endpoints are also created and processed.
- configuration - the configuration for the topic.
- registrar - the KafkaListenerEndpointRegistrar that will register the endpoints.
- factory - the factory provided in the KafkaListener.
- defaultContainerFactoryBeanName - the default factory bean name for the KafkaListener.
 
 - 
getEndpointHandlerMethod
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?,?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props)

createNewTopicBeans
protected void createNewTopicBeans(java.util.Collection<java.lang.String> topics, org.springframework.kafka.retrytopic.RetryTopicConfiguration.TopicCreation config)

createEndpointCustomizer
protected EndpointCustomizer createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties)

createHandlerMethodWith
public static EndpointHandlerMethod createHandlerMethodWith(java.lang.Object beanOrClass, java.lang.String methodName)

createHandlerMethodWith
public static EndpointHandlerMethod createHandlerMethodWith(java.lang.Object bean, java.lang.reflect.Method method)

setBeanFactory
public void setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory) throws org.springframework.beans.BeansException
- Specified by:
- setBeanFactory in interface org.springframework.beans.factory.BeanFactoryAware
- Throws:
- org.springframework.beans.BeansException
 
 
- 
 
-