Topic Naming

Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.

Examples:

"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …​, "my-topic-dlt"

"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …​, "my-topic-myDltSuffix"

The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, …​, retry-n. Therefore, by default the number of retry topics is the configured maxAttempts minus 1.

Retry Topics and DLT Suffixes

You can specify the suffixes that will be used by the retry and DLT topics.

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively.

Appending the Topic’s Index or Delay

You can either append the topic’s index or delay values after the suffix.

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic’s index.

Single Topic for Fixed Delay Retries

If you’re using fixed delay policies such as FixedBackOffPolicy or NoBackOffPolicy you can use a single topic to accomplish the non-blocking retries. This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.

The previous FixedDelayStrategy is now deprecated, and can be replaced by SameIntervalTopicReuseStrategy.
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(5)
            .useSingleTopicForFixedDelays()
            .create(template);
}
The default behavior is creating separate retry topics for each attempt, appended with their index values: retry-0, retry-1, …​

Single Topic for maxInterval Exponential Delay

If you’re using exponential backoff policy (ExponentialBackOffPolicy), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured maxInterval.

This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the maxInterval value appended.

By opting to use a single topic for the retries with the maxInterval delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.

The default behavior is to work with the number of retry topics equal to the configured maxAttempts minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the maxInterval delay) being suffixed with an additional index.

For instance, when configuring the exponential backoff with initialInterval=1_000, multiplier=2, and maxInterval=16_000, in order to keep trying for one hour, one would need to configure maxAttempts as 229, and by default the needed retry topics would be:

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000-0

  • -retry-16000-1

  • -retry-16000-2

  • …​

  • -retry-16000-224

When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000

This will be the default in a future release.

@RetryableTopic(attempts = 230,
    backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

Custom Naming Strategies

More complex naming strategies can be accomplished by registering a bean that implements RetryTopicNamesProviderFactory. The default implementation is SuffixingRetryTopicNamesProviderFactory and a different implementation can be registered in the following way:

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

As an example, the following implementation, in addition to the standard suffix, adds a prefix to retry/dlt topics names:

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}