
Features

Most of the features are available for both the @RetryableTopic annotation and the RetryTopicConfiguration beans.

BackOff Configuration

The BackOff configuration relies on the BackOffPolicy interface from the Spring Retry project.

It includes:

  • Fixed Back Off

  • Exponential Back Off

  • Random Exponential Back Off

  • Uniform Random Back Off

  • No Back Off

  • Custom Back Off

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

You can also provide a custom implementation of Spring Retry’s SleepingBackOffPolicy interface:

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
The default back off policy is FixedBackOffPolicy with a maximum of 3 attempts and 1000ms intervals.
There is a 30-second default maximum delay for the ExponentialBackOffPolicy. If your back off policy requires delays larger than that, adjust the maxDelay property accordingly.
The first attempt counts against maxAttempts, so if you provide a maxAttempts value of 4 there will be the original attempt plus 3 retries.
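
For example, if your retry delays need to grow beyond the 30-second ceiling, a sketch using the builder might look as follows (this assumes the builder's exponentialBackoff(initialInterval, multiplier, maxInterval) variant; the bean name and values are illustrative):

@Bean
public RetryTopicConfiguration myExponentialRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            // initial delay of 1s, doubled on each attempt, capped at 60s instead of the 30s default
            .exponentialBackoff(1_000, 2, 60_000)
            .maxAttempts(8)
            .create(template);
}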

Global Timeout

You can set the global timeout for the retrying process. If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
The default is to have no timeout set; this can also be achieved by providing -1 as the timeout value.

Exception Classifier

You can specify which exceptions you want to retry on and which ones you do not. You can also set it to traverse the causes to look up nested exceptions.

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
The default behavior is retrying on all exceptions and not traversing causes.

Since 2.8.3, there is a global list of fatal exceptions that cause the record to be sent to the DLT without any retries. See DefaultErrorHandler for the default list of fatal exceptions. You can add exceptions to, or remove exceptions from, this list by overriding the manageNonBlockingFatalExceptions method (shown below) in a @Configuration class that extends RetryTopicConfigurationSupport. See Configuring Global Settings and Features for more information.

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
To disable fatal exceptions' classification, just clear the provided list.
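
Building on the same override, a minimal sketch of disabling the classification by clearing the list:

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    // no exceptions are considered fatal; everything goes through the retry flow
    nonBlockingFatalExceptions.clear();
}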

Include and Exclude Topics

You can decide which topics will and will not be handled by a RetryTopicConfiguration bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics), .excludeTopic(String topic), and .excludeTopics(Collection<String> topics) methods.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
The default behavior is to include all topics.

Topics AutoCreation

Unless otherwise specified, the framework will auto-create the required topics using NewTopic beans that are consumed by the KafkaAdmin bean. You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off. Starting with version 3.0, the default replication factor is -1, meaning the broker default is used. If your broker version is earlier than 2.4, you will need to set an explicit value.

Note that if you’re not using Spring Boot you’ll have to provide a KafkaAdmin bean in order to use this feature.
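
For a non-Boot application, a minimal KafkaAdmin bean might look like the following sketch (the bootstrap server address is only an illustrative assumption):

@Bean
public KafkaAdmin kafkaAdmin() {
    // point the admin client at your cluster so the retry and DLT topics can be created
    return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
}
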
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
By default, the topics are auto-created with one partition and a replication factor of -1 (meaning the broker default is used). If your broker version is earlier than 2.4, you will need to set an explicit value.

Failure Header Management

When considering how to manage failure headers (original headers and exception headers), the framework delegates to the DeadLetterPublishingRecoverer to decide whether to append or replace the headers.

By default, it explicitly sets appendOriginalHeaders to false and leaves stripPreviousExceptionHeaders to the default used by the DeadLetterPublishingRecoverer.

This means that only the first "original" and last exception headers are retained with the default configuration. This is to avoid creation of excessively large messages (due to the stack trace header, for example) when many retry steps are involved.

See Managing Dead Letter Record Headers for more information.

To reconfigure the framework to use different settings for these properties, configure a DeadLetterPublishingRecoverer customizer by overriding the configureCustomizers method in a @Configuration class that extends RetryTopicConfigurationSupport. See Configuring Global Settings and Features for more details.

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a headersFunction to the factory - factory.setHeadersFunction((rec, ex) -> { ... }).

By default, any headers added will be cumulative - Kafka headers can contain multiple values. Starting with version 2.9.5, if the Headers returned by the function contains a header of type DeadLetterPublishingRecoverer.SingleRecordHeader, then any existing values for that header will be removed and only the new single value will remain.
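
As a sketch of both behaviors, assuming the DeadLetterPublishingRecovererFactory is reached through the configureDeadLetterPublishingContainerFactory() hook shown in the next section (which requires 3.0.9 or later), a headersFunction might add one cumulative header and one replacing header; the header names and values here are purely illustrative:

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setHeadersFunction((rec, ex) -> {
        Headers headers = new RecordHeaders();
        // cumulative: a new value is appended each time the record is published
        headers.add("x-failure-class", ex.getClass().getName().getBytes(StandardCharsets.UTF_8));
        // replacing (2.9.5+): any previous values for this header are removed first
        headers.add(new DeadLetterPublishingRecoverer.SingleRecordHeader(
                "x-last-topic", rec.topic().getBytes(StandardCharsets.UTF_8)));
        return headers;
    });
}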

Custom DeadLetterPublishingRecoverer

As can be seen in Failure Header Management, it is possible to customize the default DeadLetterPublishingRecoverer instances created by the framework. However, for some use cases, it is necessary to subclass the DeadLetterPublishingRecoverer, for example to override createProducerRecord() to modify the contents sent to the retry (or dead-letter) topics. Starting with version 3.0.9, you can override the RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() method to provide a DeadLetterPublisherCreator instance, for example:

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

It is recommended that you use the provided resolvers when constructing the custom instance.
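
A hypothetical CustomDLPR along those lines might look like the following sketch; it passes the provided resolvers to the superclass and post-processes the record created by super.createProducerRecord() (the exact constructor and createProducerRecord() signatures can differ between versions, so check the DeadLetterPublishingRecoverer you are extending):

public class CustomDLPR extends DeadLetterPublishingRecoverer {

    public CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

        super(templateResolver, destinationResolver);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {

        // build the default outgoing record, then adjust it before it is published
        ProducerRecord<Object, Object> out =
                super.createProducerRecord(record, topicPartition, headers, key, value);
        out.headers().add("x-custom", "illustrative-value".getBytes(StandardCharsets.UTF_8));
        return out;
    }

}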

Routing of messages to custom DLTs based on thrown exceptions

Starting with version 3.2.0, it's possible to route messages to custom DLTs based on the type of exception thrown during their processing. In order to do that, you need to specify the routing. Routing customization consists of specifying additional destinations. Destinations in turn consist of two settings: the suffix and exceptions. When the exception type specified in exceptions has been thrown, the DLT whose name contains the suffix will be considered the target topic for the message before the general purpose DLT is considered. Examples of configuration using either annotations or RetryTopicConfiguration beans:

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}

The suffix takes its place before the general dltTopicSuffix in the custom DLT name. Considering the presented examples, a message that caused a DeserializationException will be routed to my-annotated-topic-deserialization-dlt instead of my-annotated-topic-dlt. Custom DLTs will be created following the same rules as stated in Topics AutoCreation.