This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.1!

Configuration

Starting with version 2.9, for default configuration, the @EnableKafkaRetryTopic annotation should be used in a @Configuration annotated class. This enables the feature to bootstrap properly and gives access to injecting some of the feature’s components to be looked up at runtime.

It is not necessary to also add @EnableKafka, if you add this annotation, because @EnableKafkaRetryTopic is meta-annotated with @EnableKafka.

Also, starting with that version, for more advanced configuration of the feature’s components and global features, the RetryTopicConfigurationSupport class should be extended in a @Configuration class, and the appropriate methods overridden. For more details refer to Configuring Global Settings and Features.

By default, the containers for the retry topics will have the same concurrency as the main container. Starting with version 3.0, you can set a different concurrency for the retry containers (either on the annotation, or in RetryConfigurationBuilder).

Only one of the above techniques can be used, and only one @Configuration class can extend RetryTopicConfigurationSupport.

Using the @RetryableTopic annotation

To configure the retry topic and dlt for a @KafkaListener annotated method, you just have to add the @RetryableTopic annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

Since 3.2, @RetryableTopic support for @KafkaListener on a class would be:

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

You can specify a method in the same class to process the dlt messages by annotating it with the @DltHandler annotation. If no DltHandler method is provided a default consumer is created which only logs the consumption.

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
If you don’t specify a kafkaTemplate name a bean with name defaultRetryTopicKafkaTemplate will be looked up. If no bean is found an exception is thrown.

Starting with version 3.0, the @RetryableTopic annotation can be used as a meta-annotation on custom annotations; for example:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

Using RetryTopicConfiguration beans

You can also configure the non-blocking retry support by creating RetryTopicConfiguration beans in a @Configuration annotated class.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with @KafkaListener using the default configurations. The KafkaTemplate instance is required for message forwarding.

To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one RetryTopicConfiguration bean can be provided.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
The retry topics' and dlt’s consumers will be assigned to a consumer group with a group id that is the combination of the one which you provide in the groupId parameter of the @KafkaListener annotation with the topic’s suffix. If you don’t provide any they’ll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
If the consumer is configured with an ErrorHandlingDeserializer, to handle deserialization exceptions, it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions. The generic value type of the template should be Object. One technique is to use the DelegatingByTypeSerializer; an example follows:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Multiple @KafkaListener annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic. It’s best to use a single RetryTopicConfiguration bean for configuration of such topics; if multiple @RetryableTopic annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic’s listeners and the other annotations' values will be ignored.

Configuring Global Settings and Features

Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API). This does not change the RetryTopicConfiguration beans approach - only infrastructure components' configurations. Now the RetryTopicConfigurationSupport class should be extended in a (single) @Configuration class, and the proper methods overridden. An example follows:

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
When using this configuration approach, the @EnableKafkaRetryTopic annotation should not be used to prevent context failing to start due to duplicated beans. Use the simple @EnableKafka annotation instead.

When autoCreateTopics is true, the main and retry topics will be created with the specified number of partitions and replication factor. Starting with version 3.0, the default replication factor is -1, meaning using the broker default. If your broker version is earlier than 2.4, you will need to set an explicit value. To override these values for a particular topic (e.g. the main topic or DLT), simply add a NewTopic @Bean with the required properties; that will override the auto creation properties.

By default, records are published to the retry topic(s) using the original partition of the received record. If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows.
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

The parameters to the function are the consumer record and the name of the next topic. You can return a specific partition number, or null to indicate that the KafkaProducer should determine the partition.

By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics. Starting with version 2.9.6, if you want to retain just the last value of these headers, use the configureDeadLetterPublishingContainerFactory() method shown above to set the factory’s retainAllRetryHeaderValues property to false.

Find RetryTopicConfiguration

Attempts to provide an instance of RetryTopicConfiguration by either creating one from a @RetryableTopic annotation, or from the bean container if no annotation is available.

If beans are found in the container, there’s a check to determine whether the provided topics should be handled by any of such instances.

If @RetryableTopic annotation is provided, a DltHandler annotated method is looked up.

since 3.2, provide new API to Create RetryTopicConfiguration when @RetryableTopic annotated on a class:

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}