Class RetryTopicConfigurationSupport
- java.lang.Object
-
- org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport
-
- All Implemented Interfaces:
org.springframework.beans.factory.Aware
,org.springframework.beans.factory.SmartInitializingSingleton
,org.springframework.context.ApplicationContextAware
public class RetryTopicConfigurationSupport extends java.lang.Object implements org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.SmartInitializingSingleton
This is the main class providing the configuration behind the non-blocking, topic-based delayed retries feature. It is typically imported by adding@EnableKafkaRetryTopic
to an application@Configuration
class. An alternative more advanced option is to extend directly from this class and override methods as necessary, remembering to add@Configuration
to the subclass and@Bean
to overridden@Bean
methods. For more details see the javadoc of@EnableRetryTopic
.- Since:
- 2.9
- Author:
- Tomaz Fernandes, Gary Russell
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
RetryTopicConfigurationSupport.BlockingRetriesConfigurer
Configure blocking retries to be used along non-blocking.static class
RetryTopicConfigurationSupport.CustomizersConfigurer
Configure customizers for components instantiated by the retry topics feature.
-
Constructor Summary
Constructors Constructor Description RetryTopicConfigurationSupport()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
afterSingletonsInstantiated()
protected void
configureBlockingRetries(RetryTopicConfigurationSupport.BlockingRetriesConfigurer blockingRetries)
Override this method to configure blocking retries parameters such as exceptions to be retried and theBackOff
to be used.protected void
configureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer)
Override this method to configure customizers for components created by non-blocking retries' configuration, such asMessageListenerContainer
,DeadLetterPublishingRecoverer
andDefaultErrorHandler
.protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory()
Override this method to further configure theDeadLetterPublishingRecovererFactory
.protected java.util.function.Consumer<DestinationTopicResolver>
configureDestinationTopicResolver()
Override this method to configure theDestinationTopicResolver
.protected java.util.function.Consumer<ListenerContainerFactoryConfigurer>
configureListenerContainerFactoryConfigurer()
Override this method to further configure theListenerContainerFactoryConfigurer
.protected java.util.function.Consumer<RetryTopicConfigurer>
configureRetryTopicConfigurer()
Override this method if you need to configure theRetryTopicConfigurer
.protected RetryTopicComponentFactory
createComponentFactory()
Override this method to provide a subclass ofRetryTopicComponentFactory
with different component implementations or subclasses.DestinationTopicResolver
destinationTopicResolver()
Return a globalDestinationTopicResolver
for resolving theDestinationTopic
to which a givenConsumerRecord
should be sent for retry.KafkaConsumerBackoffManager
kafkaConsumerBackoffManager(org.springframework.context.ApplicationContext applicationContext, ListenerContainerRegistry registry, RetryTopicSchedulerWrapper wrapper, org.springframework.scheduling.TaskScheduler taskScheduler)
Create theKafkaConsumerBackoffManager
instance that will be used to back off partitions.protected void
manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>> nonBlockingRetriesExceptions)
Override this method to manage non-blocking retries fatal exceptions.RetryTopicConfigurer
retryTopicConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, DestinationTopicResolver destinationTopicResolver, org.springframework.beans.factory.BeanFactory beanFactory)
Return a globalRetryTopicConfigurer
for configuring retry topics forKafkaListenerEndpoint
instances with a correspondingRetryTopicConfiguration
.void
setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
-
-
-
Method Detail
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
- Specified by:
setApplicationContext
in interfaceorg.springframework.context.ApplicationContextAware
- Throws:
org.springframework.beans.BeansException
-
afterSingletonsInstantiated
public void afterSingletonsInstantiated()
- Specified by:
afterSingletonsInstantiated
in interfaceorg.springframework.beans.factory.SmartInitializingSingleton
-
retryTopicConfigurer
@Bean(name="org.springframework.kafka.retrytopic.internalRetryTopicConfigurer") public RetryTopicConfigurer retryTopicConfigurer(@Qualifier("org.springframework.kafka.config.internalKafkaConsumerBackOffManager") KafkaConsumerBackoffManager kafkaConsumerBackoffManager, @Qualifier("org.springframework.kafka.retrytopic.internalDestinationTopicResolver") DestinationTopicResolver destinationTopicResolver, org.springframework.beans.factory.BeanFactory beanFactory)
Return a globalRetryTopicConfigurer
for configuring retry topics forKafkaListenerEndpoint
instances with a correspondingRetryTopicConfiguration
. To configure it, consider overriding theconfigureRetryTopicConfigurer()
.- Parameters:
kafkaConsumerBackoffManager
- the globalKafkaConsumerBackoffManager
.destinationTopicResolver
- the globalDestinationTopicResolver
.beanFactory
- theBeanFactory
.- Returns:
- the instance.
- See Also:
KafkaListenerAnnotationBeanPostProcessor
-
configureRetryTopicConfigurer
protected java.util.function.Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer()
Override this method if you need to configure theRetryTopicConfigurer
.- Returns:
- a
RetryTopicConfigurer
consumer.
-
configureDeadLetterPublishingContainerFactory
protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory()
Override this method to further configure theDeadLetterPublishingRecovererFactory
.- Returns:
- a
DeadLetterPublishingRecovererFactory
consumer.
-
configureListenerContainerFactoryConfigurer
protected java.util.function.Consumer<ListenerContainerFactoryConfigurer> configureListenerContainerFactoryConfigurer()
Override this method to further configure theListenerContainerFactoryConfigurer
.- Returns:
- a
ListenerContainerFactoryConfigurer
consumer.
-
configureBlockingRetries
protected void configureBlockingRetries(RetryTopicConfigurationSupport.BlockingRetriesConfigurer blockingRetries)
Override this method to configure blocking retries parameters such as exceptions to be retried and theBackOff
to be used.- Parameters:
blockingRetries
- aRetryTopicConfigurationSupport.BlockingRetriesConfigurer
.
-
manageNonBlockingFatalExceptions
protected void manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>> nonBlockingRetriesExceptions)
Override this method to manage non-blocking retries fatal exceptions. Records which processing throws an exception present in this list will be forwarded directly to the DLT, if one is configured, or stop being processed otherwise.- Parameters:
nonBlockingRetriesExceptions
- aList
of fatal exceptions containing the framework defaults.
-
configureCustomizers
protected void configureCustomizers(RetryTopicConfigurationSupport.CustomizersConfigurer customizersConfigurer)
Override this method to configure customizers for components created by non-blocking retries' configuration, such asMessageListenerContainer
,DeadLetterPublishingRecoverer
andDefaultErrorHandler
.- Parameters:
customizersConfigurer
- aRetryTopicConfigurationSupport.CustomizersConfigurer
.
-
destinationTopicResolver
@Bean(name="org.springframework.kafka.retrytopic.internalDestinationTopicResolver") public DestinationTopicResolver destinationTopicResolver()
Return a globalDestinationTopicResolver
for resolving theDestinationTopic
to which a givenConsumerRecord
should be sent for retry. To configure it, consider overriding one of these other more fine-grained methods:manageNonBlockingFatalExceptions(java.util.List<java.lang.Class<? extends java.lang.Throwable>>)
to configure non-blocking retries.configureDestinationTopicResolver()
to further customize the component.createComponentFactory()
to provide a subclass instance.
- Returns:
- the instance.
-
configureDestinationTopicResolver
protected java.util.function.Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
Override this method to configure theDestinationTopicResolver
.- Returns:
- a
DestinationTopicResolver
consumer.
-
kafkaConsumerBackoffManager
@Bean(name="org.springframework.kafka.config.internalKafkaConsumerBackOffManager") public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(org.springframework.context.ApplicationContext applicationContext, @Qualifier("org.springframework.kafka.config.internalKafkaListenerEndpointRegistry") ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable org.springframework.scheduling.TaskScheduler taskScheduler)
Create theKafkaConsumerBackoffManager
instance that will be used to back off partitions. To provide a custom implementation, either override this method, or override theRetryTopicComponentFactory.kafkaBackOffManagerFactory(org.springframework.kafka.listener.ListenerContainerRegistry, org.springframework.context.ApplicationContext)
method and return a differentKafkaBackOffManagerFactory
.- Parameters:
applicationContext
- the application context.registry
- theListenerContainerRegistry
to be used to fetch theMessageListenerContainer
at runtime to be backed off.wrapper
- aRetryTopicSchedulerWrapper
.taskScheduler
- aTaskScheduler
.- Returns:
- the instance.
-
createComponentFactory
protected RetryTopicComponentFactory createComponentFactory()
Override this method to provide a subclass ofRetryTopicComponentFactory
with different component implementations or subclasses.- Returns:
- the instance.
-
-