For the latest stable version, please use Spring for Apache Kafka 3.3.0! |
Topic Naming
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
Examples:
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …, "my-topic-myDltSuffix"
The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, …, retry-n.
Therefore, by default the number of retry topics is the configured maxAttempts minus 1.
|
You can configure the suffixes, choose whether to append the attempt index or delay, use a single retry topic when using fixed backoff, and use a single retry topic for the attempts with the maxInterval when using exponential backoffs.
Retry Topics and DLT Suffixes
You can specify the suffixes that will be used by the retry and DLT topics.
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. |
Appending the Topic’s Index or Delay
You can either append the topic’s index or delay values after the suffix.
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic’s index. |
Single Topic for Fixed Delay Retries
If you’re using fixed delay policies such as FixedBackOffPolicy
or NoBackOffPolicy
you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
The previous FixedDelayStrategy is now deprecated, and can be replaced by SameIntervalTopicReuseStrategy .
|
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
The default behavior is creating separate retry topics for each attempt, appended with their index values: retry-0, retry-1, … |
Single Topic for maxInterval Exponential Delay
If you’re using exponential backoff policy (ExponentialBackOffPolicy
), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured maxInterval
.
This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the maxInterval
value appended.
By opting to use a single topic for the retries with the maxInterval delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.
|
The default behavior is to work with the number of retry topics equal to the configured maxAttempts
minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the maxInterval
delay) being suffixed with an additional index.
For instance, when configuring the exponential backoff with initialInterval=1_000
, multiplier=2
, and maxInterval=16_000
, in order to keep trying for one hour, one would need to configure maxAttempts
as 229, and by default the needed retry topics would be:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
…
-
-retry-16000-224
When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
This will be the default in a future release.
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
Custom Naming Strategies
More complex naming strategies can be accomplished by registering a bean that implements RetryTopicNamesProviderFactory
.
The default implementation is SuffixingRetryTopicNamesProviderFactory
and a different implementation can be registered in the following way:
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
As an example, the following implementation, in addition to the standard suffix, adds a prefix to retry/dlt topics names:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}