Class RetryTopicConfigurationSupport
java.lang.Object
org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport
- All Implemented Interfaces:
Aware,SmartInitializingSingleton,ApplicationContextAware
public class RetryTopicConfigurationSupport
extends Object
implements ApplicationContextAware, SmartInitializingSingleton
This is the main class providing the configuration behind the non-blocking,
topic-based delayed retries feature. It is typically imported by adding
@EnableKafkaRetryTopic to an application
@Configuration class. An alternative more advanced option
is to extend directly from this class and override methods as necessary, remembering
to add @Configuration to the subclass and @Bean
to overridden @Bean methods. For more details see the javadoc of
@EnableRetryTopic.- Since:
- 2.9
- Author:
- Tomaz Fernandes, Gary Russell
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classConfigure blocking retries to be used along non-blocking.static classConfigure customizers for components instantiated by the retry topics feature. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoidprotected voidOverride this method to configure blocking retries parameters such as exceptions to be retried and theBackOffto be used.protected voidconfigureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer) Override this method to configure customizers for components created by non-blocking retries' configuration, such asMessageListenerContainer,DeadLetterPublishingRecovererandDefaultErrorHandler.protected Consumer<DeadLetterPublishingRecovererFactory>Override this method to further configure theDeadLetterPublishingRecovererFactory.protected Consumer<DestinationTopicResolver>Override this method to configure theDestinationTopicResolver.protected Consumer<ListenerContainerFactoryConfigurer>Override this method to further configure theListenerContainerFactoryConfigurer.protected Consumer<RetryTopicConfigurer>Override this method if you need to configure theRetryTopicConfigurer.protected RetryTopicComponentFactoryOverride this method to provide a subclass ofRetryTopicComponentFactorywith different component implementations or subclasses.destinationTopicResolver(ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider) Return a globalDestinationTopicResolverfor resolving theDestinationTopicto which a givenConsumerRecordshould be sent for retry.kafkaConsumerBackoffManager(ApplicationContext applicationContext, ListenerContainerRegistry registry, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, RetryTopicSchedulerWrapper wrapper, TaskScheduler taskScheduler) Create theKafkaConsumerBackoffManagerinstance that will be used to back off partitions.protected voidmanageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingRetriesExceptions) Override this method to manage non-blocking retries fatal exceptions.retryTopicConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, DestinationTopicResolver destinationTopicResolver, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, BeanFactory beanFactory) Return a globalRetryTopicConfigurerfor configuring retry topics forKafkaListenerEndpointinstances with a correspondingRetryTopicConfiguration.voidsetApplicationContext(ApplicationContext applicationContext)
-
Constructor Details
-
RetryTopicConfigurationSupport
public RetryTopicConfigurationSupport()
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContextin interfaceApplicationContextAware- Throws:
BeansException
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()- Specified by:
afterSingletonsInstantiatedin interfaceSmartInitializingSingleton
-
retryTopicConfigurer
@Bean(name="org.springframework.kafka.retrytopic.internalRetryTopicConfigurer") public RetryTopicConfigurer retryTopicConfigurer(@Qualifier("org.springframework.kafka.config.internalKafkaConsumerBackOffManager") KafkaConsumerBackoffManager kafkaConsumerBackoffManager, @Qualifier("org.springframework.kafka.retrytopic.internalDestinationTopicResolver") DestinationTopicResolver destinationTopicResolver, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, BeanFactory beanFactory) Return a globalRetryTopicConfigurerfor configuring retry topics forKafkaListenerEndpointinstances with a correspondingRetryTopicConfiguration. To configure it, consider overriding theconfigureRetryTopicConfigurer().- Parameters:
kafkaConsumerBackoffManager- the globalKafkaConsumerBackoffManager.destinationTopicResolver- the globalDestinationTopicResolver.componentFactoryProvider- the component factory provider.beanFactory- theBeanFactory.- Returns:
- the instance.
- See Also:
-
configureRetryTopicConfigurer
Override this method if you need to configure theRetryTopicConfigurer.- Returns:
- a
RetryTopicConfigurerconsumer.
-
configureDeadLetterPublishingContainerFactory
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory()Override this method to further configure theDeadLetterPublishingRecovererFactory.- Returns:
- a
DeadLetterPublishingRecovererFactoryconsumer.
-
configureListenerContainerFactoryConfigurer
protected Consumer<ListenerContainerFactoryConfigurer> configureListenerContainerFactoryConfigurer()Override this method to further configure theListenerContainerFactoryConfigurer.- Returns:
- a
ListenerContainerFactoryConfigurerconsumer.
-
configureBlockingRetries
protected void configureBlockingRetries(RetryTopicConfigurationSupport.BlockingRetriesConfigurer blockingRetries) Override this method to configure blocking retries parameters such as exceptions to be retried and theBackOffto be used.- Parameters:
blockingRetries- aRetryTopicConfigurationSupport.BlockingRetriesConfigurer.
-
manageNonBlockingFatalExceptions
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingRetriesExceptions) Override this method to manage non-blocking retries fatal exceptions. Records which processing throws an exception present in this list will be forwarded directly to the DLT, if one is configured, or stop being processed otherwise.- Parameters:
nonBlockingRetriesExceptions- aListof fatal exceptions containing the framework defaults.
-
configureCustomizers
protected void configureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer) Override this method to configure customizers for components created by non-blocking retries' configuration, such asMessageListenerContainer,DeadLetterPublishingRecovererandDefaultErrorHandler.- Parameters:
customizersConfigurer- aRetryTopicConfigurationSupport.CustomizersConfigurer.
-
destinationTopicResolver
@Bean(name="org.springframework.kafka.retrytopic.internalDestinationTopicResolver") public DestinationTopicResolver destinationTopicResolver(ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider) Return a globalDestinationTopicResolverfor resolving theDestinationTopicto which a givenConsumerRecordshould be sent for retry. To configure it, consider overriding one of these other more fine-grained methods:manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>>)to configure non-blocking retries.configureDestinationTopicResolver()to further customize the component.createComponentFactory()to provide a subclass instance.
- Parameters:
componentFactoryProvider- the component factory provider.- Returns:
- the instance.
-
configureDestinationTopicResolver
Override this method to configure theDestinationTopicResolver.- Returns:
- a
DestinationTopicResolverconsumer.
-
kafkaConsumerBackoffManager
@Bean(name="org.springframework.kafka.config.internalKafkaConsumerBackOffManager") public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext, @Qualifier("org.springframework.kafka.config.internalKafkaListenerEndpointRegistry") ListenerContainerRegistry registry, ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider, @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable TaskScheduler taskScheduler) Create theKafkaConsumerBackoffManagerinstance that will be used to back off partitions. To provide a custom implementation, either override this method, or override theRetryTopicComponentFactory.kafkaBackOffManagerFactory(org.springframework.kafka.listener.ListenerContainerRegistry, org.springframework.context.ApplicationContext)method and return a differentKafkaBackOffManagerFactory.- Parameters:
applicationContext- the application context.registry- theListenerContainerRegistryto be used to fetch theMessageListenerContainerat runtime to be backed off.componentFactoryProvider- the component factory provider.wrapper- aRetryTopicSchedulerWrapper.taskScheduler- aTaskScheduler.- Returns:
- the instance.
-
createComponentFactory
Override this method to provide a subclass ofRetryTopicComponentFactorywith different component implementations or subclasses.- Returns:
- the instance.
-