Enables the non-blocking topic-based delayed retries feature. To be used in @Configuration classes as follows:

    @EnableKafkaRetryTopic
    @Configuration
    public class AppConfig {
    }

    @Component
    public class MyListener {

        @RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(4000))
        @KafkaListener(topics = "myTopic")
        public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
            logger.info("Message {} received in topic {}", message, receivedTopic);
        }

        @DltHandler
        public void dltHandler(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
            logger.info("Message {} received in dlt handler at topic {}", message, receivedTopic);
        }
    }

Using this annotation configures the default RetryTopicConfigurationSupport bean. This annotation is meta-annotated with @EnableKafka, so it is not necessary to specify both.
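
To illustrate what "meta-annotated" means here, the following is a minimal plain-Java sketch. The two annotation types in it are hypothetical stand-ins declared locally, not the real Spring Kafka annotations, and the one-level reflection check in hasEnableKafka is only an approximation of how a framework might resolve meta-annotations, not Spring's actual lookup.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class MetaAnnotationSketch {

    // Hypothetical stand-in for @EnableKafka (not the Spring Kafka type).
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
    @interface EnableKafka {
    }

    // Hypothetical stand-in for @EnableKafkaRetryTopic, meta-annotated
    // with the stand-in @EnableKafka above.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @EnableKafka
    @interface EnableKafkaRetryTopic {
    }

    // Only the composed annotation is declared on the configuration class.
    @EnableKafkaRetryTopic
    static class AppConfig {
    }

    // Returns true if the type carries @EnableKafka directly or through
    // one level of meta-annotation.
    static boolean hasEnableKafka(Class<?> type) {
        if (type.isAnnotationPresent(EnableKafka.class)) {
            return true;
        }
        for (Annotation annotation : type.getAnnotations()) {
            if (annotation.annotationType().isAnnotationPresent(EnableKafka.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasEnableKafka(AppConfig.class)); // prints "true"
    }
}
```

Because the composed annotation already carries the other one, declaring both on the same class adds nothing.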
To configure the feature's components, extend the RetryTopicConfigurationSupport class and override the appropriate methods on a @Configuration class, such as:

    @Configuration
    @EnableKafka
    public class AppConfig extends RetryTopicConfigurationSupport {

        @Override
        protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
            blockingRetries
                    .retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
                    .backOff(new FixedBackOff(50, 3));
        }

        @Override
        protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
            nonBlockingRetries
                    .addToFatalExceptions(ShouldSkipBothRetriesException.class);
        }
    }

In this case, do not use this annotation, because it would also configure the default RetryTopicConfigurationSupport bean alongside your subclass. Use @EnableKafka instead.