Class RetryTopicConfigurationSupport
- java.lang.Object
-
- org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport
-
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.SmartInitializingSingleton, org.springframework.context.ApplicationContextAware
public class RetryTopicConfigurationSupport extends java.lang.Object implements org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.SmartInitializingSingleton
This is the main class providing the configuration behind the non-blocking, topic-based delayed retries feature. It is typically imported by adding @EnableKafkaRetryTopic to an application @Configuration class. A more advanced alternative is to extend this class directly and override methods as necessary, remembering to add @Configuration to the subclass and @Bean to overridden @Bean methods. For more details, see the javadoc of @EnableKafkaRetryTopic.
- Since:
- 2.9
- Author:
- Tomaz Fernandes, Gary Russell
-
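As a rough sketch of the subclassing option described above, a configuration class might look like the following. The class name `MyRetryTopicConfiguration` is hypothetical, and the body is intentionally empty; the protected methods documented in Method Detail are the override points. Because the description presents subclassing as an alternative to @EnableKafkaRetryTopic, the sketch does not use that annotation.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;

// Hypothetical class name, used for illustration only.
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    // Override the protected configure* methods here as needed.
    // If a @Bean method such as destinationTopicResolver() is overridden,
    // the overriding method must itself be annotated with @Bean.
}
```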
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
- static class RetryTopicConfigurationSupport.BlockingRetriesConfigurer: Configure blocking retries to be used alongside non-blocking retries.
- static class RetryTopicConfigurationSupport.CustomizersConfigurer: Configure customizers for components instantiated by the retry topics feature.
-
Constructor Summary
Constructors (Constructor, Description):
- RetryTopicConfigurationSupport()
-
Method Summary
All Methods (Modifier and Type, Method, Description); all are concrete instance methods:
- void afterSingletonsInstantiated()
- protected void configureBlockingRetries(RetryTopicConfigurationSupport.BlockingRetriesConfigurer blockingRetries): Override this method to configure blocking retries parameters such as exceptions to be retried and the BackOff to be used.
- protected void configureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer): Override this method to configure customizers for components created by non-blocking retries' configuration, such as MessageListenerContainer, DeadLetterPublishingRecoverer and DefaultErrorHandler.
- protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory(): Override this method to further configure the DeadLetterPublishingRecovererFactory.
- protected java.util.function.Consumer<DestinationTopicResolver> configureDestinationTopicResolver(): Override this method to configure the DestinationTopicResolver.
- protected java.util.function.Consumer<ListenerContainerFactoryConfigurer> configureListenerContainerFactoryConfigurer(): Override this method to further configure the ListenerContainerFactoryConfigurer.
- protected java.util.function.Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer(): Override this method if you need to configure the RetryTopicConfigurer.
- protected RetryTopicComponentFactory createComponentFactory(): Override this method to provide a subclass of RetryTopicComponentFactory with different component implementations or subclasses.
- DestinationTopicResolver destinationTopicResolver(): Return a global DestinationTopicResolver for resolving the DestinationTopic to which a given ConsumerRecord should be sent for retry.
- KafkaConsumerBackoffManager kafkaConsumerBackoffManager(org.springframework.context.ApplicationContext applicationContext, ListenerContainerRegistry registry, RetryTopicSchedulerWrapper wrapper, org.springframework.scheduling.TaskScheduler taskScheduler): Create the KafkaConsumerBackoffManager instance that will be used to back off partitions.
- protected void manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>> nonBlockingRetriesExceptions): Override this method to manage non-blocking retries fatal exceptions.
- RetryTopicConfigurer retryTopicConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, DestinationTopicResolver destinationTopicResolver, org.springframework.beans.factory.BeanFactory beanFactory): Return a global RetryTopicConfigurer for configuring retry topics for KafkaListenerEndpoint instances with a corresponding RetryTopicConfiguration.
- void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
-
-
-
Method Detail
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
- Specified by:
setApplicationContext in interface org.springframework.context.ApplicationContextAware
- Throws:
org.springframework.beans.BeansException
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()
- Specified by:
afterSingletonsInstantiated in interface org.springframework.beans.factory.SmartInitializingSingleton
-
retryTopicConfigurer
@Bean(name="org.springframework.kafka.retrytopic.internalRetryTopicConfigurer") public RetryTopicConfigurer retryTopicConfigurer(@Qualifier("org.springframework.kafka.config.internalKafkaConsumerBackOffManager") KafkaConsumerBackoffManager kafkaConsumerBackoffManager, @Qualifier("org.springframework.kafka.retrytopic.internalDestinationTopicResolver") DestinationTopicResolver destinationTopicResolver, org.springframework.beans.factory.BeanFactory beanFactory)
Return a global RetryTopicConfigurer for configuring retry topics for KafkaListenerEndpoint instances with a corresponding RetryTopicConfiguration. To configure it, consider overriding configureRetryTopicConfigurer().
- Parameters:
kafkaConsumerBackoffManager - the global KafkaConsumerBackoffManager.
destinationTopicResolver - the global DestinationTopicResolver.
beanFactory - the BeanFactory.
- Returns:
- the instance.
- See Also:
KafkaListenerAnnotationBeanPostProcessor
-
configureRetryTopicConfigurer
protected java.util.function.Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer()
Override this method if you need to configure the RetryTopicConfigurer.
- Returns:
- a RetryTopicConfigurer consumer.
-
configureDeadLetterPublishingContainerFactory
protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory()
Override this method to further configure the DeadLetterPublishingRecovererFactory.
- Returns:
- a DeadLetterPublishingRecovererFactory consumer.
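A minimal sketch of such an override follows. The class name `DltFactoryRetryConfiguration` is hypothetical, and the example assumes that `neverLogListenerException()` is available on DeadLetterPublishingRecovererFactory in the version in use; any other setter on the factory could be called in the same place.

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;

// Hypothetical class name, used for illustration only.
@Configuration
public class DltFactoryRetryConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        // The returned consumer receives the factory so it can be adjusted.
        // Assumption: neverLogListenerException() exists in the version in use.
        return factory -> factory.neverLogListenerException();
    }
}
```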
-
configureListenerContainerFactoryConfigurer
protected java.util.function.Consumer<ListenerContainerFactoryConfigurer> configureListenerContainerFactoryConfigurer()
Override this method to further configure the ListenerContainerFactoryConfigurer.
- Returns:
- a ListenerContainerFactoryConfigurer consumer.
-
configureBlockingRetries
protected void configureBlockingRetries(RetryTopicConfigurationSupport.BlockingRetriesConfigurer blockingRetries)
Override this method to configure blocking retries parameters such as exceptions to be retried and the BackOff to be used.
- Parameters:
blockingRetries - a RetryTopicConfigurationSupport.BlockingRetriesConfigurer.
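The following sketch shows one way this override might look. The class name `BlockingRetryConfiguration` is hypothetical, and the choice of `TimeoutException` and the back-off values are arbitrary illustrations, not recommended defaults.

```java
import java.util.concurrent.TimeoutException;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical class name, used for illustration only.
@Configuration
public class BlockingRetryConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                // Exception type chosen only as an example.
                .retryOn(TimeoutException.class)
                // Fixed 3 second interval, up to 5 attempts (illustrative values).
                .backOff(new FixedBackOff(3_000L, 5L));
    }
}
```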
-
manageNonBlockingFatalExceptions
protected void manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>> nonBlockingRetriesExceptions)
Override this method to manage non-blocking retries fatal exceptions. Records whose processing throws an exception present in this list are forwarded directly to the DLT if one is configured; otherwise, they are not processed further.
- Parameters:
nonBlockingRetriesExceptions - a List of fatal exceptions containing the framework defaults.
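Since the list passed in already contains the framework defaults, an override would typically add to or remove from it. The sketch below adds one exception type; the class name `FatalExceptionsRetryConfiguration` is hypothetical and `IllegalArgumentException` is chosen only as an example.

```java
import java.util.List;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;

// Hypothetical class name, used for illustration only.
@Configuration
public class FatalExceptionsRetryConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void manageNonBlockingFatalExceptions(
            List<Class<? extends Throwable>> nonBlockingRetriesExceptions) {
        // Treat this exception as fatal in addition to the framework defaults.
        nonBlockingRetriesExceptions.add(IllegalArgumentException.class);
    }
}
```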
-
configureCustomizers
protected void configureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer)
Override this method to configure customizers for components created by non-blocking retries' configuration, such as MessageListenerContainer, DeadLetterPublishingRecoverer and DefaultErrorHandler.
- Parameters:
customizersConfigurer - a RetryTopicConfigurationSupport.CustomizersConfigurer.
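As an illustration, the sketch below registers a customizer for the DefaultErrorHandler. The class name `CustomizersRetryConfiguration` is hypothetical, and it assumes that `customizeErrorHandler` on the configurer and `setSeekAfterError` on DefaultErrorHandler are available in the version in use; the setting itself is an arbitrary example.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;

// Hypothetical class name, used for illustration only.
@Configuration
public class CustomizersRetryConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // The customizer is applied to the DefaultErrorHandler created by the feature.
        // Assumption: both methods below exist in the version in use.
        customizersConfigurer.customizeErrorHandler(
                errorHandler -> errorHandler.setSeekAfterError(false));
    }
}
```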
-
destinationTopicResolver
@Bean(name="org.springframework.kafka.retrytopic.internalDestinationTopicResolver") public DestinationTopicResolver destinationTopicResolver()
Return a global DestinationTopicResolver for resolving the DestinationTopic to which a given ConsumerRecord should be sent for retry. To configure it, consider overriding one of these other more fine-grained methods:
- manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>>) to configure non-blocking retries.
- configureDestinationTopicResolver() to further customize the component.
- createComponentFactory() to provide a subclass instance.
- Returns:
- the instance.
-
configureDestinationTopicResolver
protected java.util.function.Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
Override this method to configure the DestinationTopicResolver.
- Returns:
- a DestinationTopicResolver consumer.
-
kafkaConsumerBackoffManager
@Bean(name="org.springframework.kafka.config.internalKafkaConsumerBackOffManager") public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(org.springframework.context.ApplicationContext applicationContext, @Qualifier("org.springframework.kafka.config.internalKafkaListenerEndpointRegistry") ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable org.springframework.scheduling.TaskScheduler taskScheduler)
Create the KafkaConsumerBackoffManager instance that will be used to back off partitions. To provide a custom implementation, either override this method, or override the RetryTopicComponentFactory.kafkaBackOffManagerFactory(org.springframework.kafka.listener.ListenerContainerRegistry, org.springframework.context.ApplicationContext) method and return a different KafkaBackOffManagerFactory.
- Parameters:
applicationContext - the application context.
registry - the ListenerContainerRegistry to be used to fetch the MessageListenerContainer at runtime to be backed off.
wrapper - a RetryTopicSchedulerWrapper.
taskScheduler - a TaskScheduler.
- Returns:
- the instance.
-
createComponentFactory
protected RetryTopicComponentFactory createComponentFactory()
Override this method to provide a subclass of RetryTopicComponentFactory with different component implementations or subclasses.
- Returns:
- the instance.
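A possible shape for this override is sketched below, using an anonymous subclass. The class name `ComponentFactoryRetryConfiguration` is hypothetical, and the example assumes that RetryTopicComponentFactory exposes an overridable `internalRetryTopicClock()` method; check the factory's javadoc for the methods available in the version in use.

```java
import java.time.Clock;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;

// Hypothetical class name, used for illustration only.
@Configuration
public class ComponentFactoryRetryConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected RetryTopicComponentFactory createComponentFactory() {
        return new RetryTopicComponentFactory() {

            // Assumption: internalRetryTopicClock() is an overridable method
            // on RetryTopicComponentFactory in the version in use.
            @Override
            public Clock internalRetryTopicClock() {
                return Clock.systemDefaultZone();
            }
        };
    }
}
```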
-
-