Class RetryTopicConfigurer
- java.lang.Object
-
- org.springframework.kafka.retrytopic.RetryTopicConfigurer
-
public class RetryTopicConfigurer extends java.lang.Object
Configures main, retry and DLT topics based on a main endpoint and provided configurations to acomplish a distributed retry / DLT pattern in a non-blocking fashion, at the expense of ordering guarantees.
To illustrate, if you have a "main-topic" topic, and want an exponential backoff of 1000ms with a multiplier of 2 and 3 retry attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics. The configuration can be achieved using a
RetryTopicConfigurationBuilder
to create one or moreRetryTopicConfigurer
beans, or by using theRetryableTopic
annotation. More details on usage below.How it works:
If a message processing throws an exception, the configured
SeekToCurrentErrorHandler
andDeadLetterPublishingRecoverer
forwards the message to the next topic, using aDestinationTopicResolver
to know the next topic and the delay for it.Each forwareded record has a back off timestamp header and, if consumption is attempted by the
KafkaBackoffAwareMessageListenerAdapter
before that time, the partition consumption is paused by aKafkaConsumerBackoffManager
and aKafkaBackoffException
is thrown.When the partition has been idle for the amount of time specified in the
ContainerProperties.idlePartitionEventInterval
property, aListenerContainerPartitionIdleEvent
is published, which theKafkaConsumerBackoffManager
listens to in order to check whether or not it should unpause the partition.If, when consumption is resumed, processing fails again, the message is forwarded to the next topic and so on, until it gets to the dlt.
Considering Kafka's partition ordering guarantees, and each topic having a fixed delay time, we know that the first message consumed in a given retry topic partition will be the one with the earliest backoff timestamp for that partition, so by pausing the partition we know we're not delaying message processing in other partitions longer than necessary.
Usages:
There are two main ways for configuring the endpoints. The first is by providing one or more
Bean
s in aConfiguration
annotated class, such as:@Bean
public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) { return RetryTopicConfiguration .builder() .create(template); }
This will create retry and dlt topics for all topics in methods annotated with
KafkaListener
, as well as its consumers, using the default configurations. If message processing fails it will forward the message to the next topic until it gets to the DLT topic. AKafkaOperations
instance is required for message forwarding.For more fine-grained control over how to handle retrials for each topic, more then one bean can be provided, such as:
@Bean public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) { return RetryTopicConfiguration .builder() .fixedBackoff(3000) .maxAttempts(5) .includeTopics("my-topic", "my-other-topic") .create(template); }
@Bean public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) { return RetryTopicConfiguration .builder() .exponentialBackoff(1000, 2, 5000) .maxAttempts(4) .excludeTopics("my-topic", "my-other-topic") .retryOn(MyException.class) .create(template); }
Some other options include: auto-creation of topics, backoff, retryOn / notRetryOn / transversing as in
RetryTemplate
, single-topic fixed backoff processing, custom dlt listener beans, custom topic suffixes and providing specific listenerContainerFactories.The other, non-exclusive way to configure the endpoints is through the convenient
RetryableTopic
annotation, that can be placed on anyKafkaListener
annotated methods, such as:@RetryableTopic(attempts = 3, backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic") public void processMessage(MyPojo message) { // ... message processing }
The same configurations are available in the annotation and the builder approaches, and both can be used concurrently. In case the same method / topic can be handled by both, the annotation takes precedence.
DLT Handling:
The DLT handler method can be provided through the
RetryTopicConfigurationBuilder.dltHandlerMethod(Class, String)
method, providing the class and method name that should handle the DLT topic. If a bean instance of this type is found in theBeanFactory
it is the instance used. If not an instance is created. The class can use dependency injection as a normal bean.@Bean public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) { return RetryTopicConfiguration .builder() .dltProcessor(MyCustomDltProcessor.class, "processDltMessage") .create(template); }
@Component public class MyCustomDltProcessor { public void processDltMessage(MyPojo message) { // ... message processing, persistence, etc } }
DltHandler
annotation, that should be used within the same class as the correspondentKafkaListener
.@DltHandler public void processMessage(MyPojo message) { // ... message processing, persistence, etc }
RetryTopicConfigurer.LoggingDltListenerHandlerMethod
is used.- Since:
- 2.7
- Author:
- Tomaz Fernandes
- See Also:
RetryTopicConfigurationBuilder
,RetryableTopic
,KafkaListener
,Backoff
,SeekToCurrentErrorHandler
,DeadLetterPublishingRecoverer
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
RetryTopicConfigurer.EndpointProcessingCustomizerHolder
Creates the endpoint and factory customizers that will be called at the end of the KafkaListenerAnnotationBeanPostProcessor's processListener(MethodKafkaListenerEndpoint, KafkaListener, Object, Object, String)} method.static interface
RetryTopicConfigurer.EndpointProcessor
-
Field Summary
Fields Modifier and Type Field Description static org.springframework.kafka.retrytopic.RetryTopicConfigurer.EndpointHandlerMethod
DEFAULT_DLT_HANDLER
The default method to handle messages in the DLT.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description static org.springframework.kafka.retrytopic.RetryTopicConfigurer.EndpointHandlerMethod
createHandlerMethodWith(java.lang.Class<?> beanClass, java.lang.String methodName)
static org.springframework.kafka.retrytopic.RetryTopicConfigurer.EndpointHandlerMethod
createHandlerMethodWith(java.lang.Object bean, java.lang.reflect.Method method)
void
processMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?,?> mainEndpoint, RetryTopicConfiguration configuration)
Entrypoint for creating and configuring the retry and dlt endpoints, as well as the container factory that will the corresponding listenerContainer.
-
-
-
Method Detail
-
processMainAndRetryListeners
public void processMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?,?> mainEndpoint, RetryTopicConfiguration configuration)
Entrypoint for creating and configuring the retry and dlt endpoints, as well as the container factory that will the corresponding listenerContainer.- Parameters:
endpointProcessor
- the endpoint and factory configurers that will be called at the end of theKafkaListenerAnnotationBeanPostProcessor
processListener method. As a side effect, the created destinations are registered at theDestinationTopicContainer
mainEndpoint
- the endpoint based on which retry and dlt endpoints are also created and processed.configuration
- the configuration for the topic
-
createHandlerMethodWith
public static org.springframework.kafka.retrytopic.RetryTopicConfigurer.EndpointHandlerMethod createHandlerMethodWith(java.lang.Class<?> beanClass, java.lang.String methodName)
-
createHandlerMethodWith
public static org.springframework.kafka.retrytopic.RetryTopicConfigurer.EndpointHandlerMethod createHandlerMethodWith(java.lang.Object bean, java.lang.reflect.Method method)
-
-