Configuration
Starting with version 2.9, for default configuration, the @EnableKafkaRetryTopic annotation should be used in a @Configuration annotated class.
This enables the feature to bootstrap properly and makes some of the feature’s components available for injection, so that they can be looked up at runtime.
If you add this annotation, it is not necessary to also add @EnableKafka, because @EnableKafkaRetryTopic is meta-annotated with @EnableKafka.
Also, starting with that version, for more advanced configuration of the feature’s components and global features, the RetryTopicConfigurationSupport class should be extended in a @Configuration class, and the appropriate methods overridden.
For more details refer to Configuring Global Settings and Features.
By default, the containers for the retry topics will have the same concurrency as the main container.
Starting with version 3.0, you can set a different concurrency for the retry containers (either on the annotation, or in RetryTopicConfigurationBuilder).
Only one of the above techniques can be used, and only one @Configuration class can extend RetryTopicConfigurationSupport.
Using the @RetryableTopic annotation
To configure the retry topics and DLT for a @KafkaListener annotated method, add the @RetryableTopic annotation to it; Spring for Apache Kafka will then bootstrap all the necessary topics and consumers with the default configurations.
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}
Since version 3.2, @RetryableTopic can also be placed on a class that is annotated with @KafkaListener:
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}
You can specify a method in the same class to process the DLT messages by annotating it with the @DltHandler annotation.
If no @DltHandler method is provided, a default consumer is created which only logs the consumption.
@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
If you don’t specify a kafkaTemplate name, a bean with name defaultRetryTopicKafkaTemplate will be looked up.
If no bean is found, an exception is thrown.
Starting with version 3.0, the @RetryableTopic annotation can be used as a meta-annotation on custom annotations; for example:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}
Using RetryTopicConfiguration beans
You can also configure the non-blocking retry support by creating RetryTopicConfiguration beans in a @Configuration annotated class.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}
This will create retry topics and a DLT, as well as the corresponding consumers, for all topics in methods annotated with @KafkaListener, using the default configurations.
The KafkaTemplate instance is required for message forwarding.
To achieve more fine-grained control over how to handle non-blocking retries for each topic, more than one RetryTopicConfiguration bean can be provided.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
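As a rough illustration of what the two back-off settings above mean in terms of delays, the following plain-Java sketch computes the wait before each retry.
It is not framework code: the BackoffSketch class and its method names are hypothetical, and it assumes that maxAttempts counts the initial delivery and that the exponential delay is capped at the maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Spring for Apache Kafka.
final class BackoffSketch {

    // Delays (in ms) before each retry for an exponential back off.
    // Assumption: maxAttempts includes the first delivery, so there are maxAttempts - 1 retries.
    static List<Long> exponentialDelays(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double next = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap each delay at the configured maximum interval.
            delays.add(Math.min((long) next, maxMs));
            next *= multiplier;
        }
        return delays;
    }

    // Delays (in ms) before each retry for a fixed back off.
    static List<Long> fixedDelays(long delayMs, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add(delayMs);
        }
        return delays;
    }
}
```

Under these assumptions, exponentialBackoff(1000, 2, 5000) with maxAttempts(4) gives delays of 1000, 2000 and 4000 ms, while fixedBackOff(3000) with maxAttempts(5) gives four delays of 3000 ms each.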
The consumers for the retry topics and the DLT will be assigned to a consumer group whose group id is the combination of the one that you provide in the groupId parameter of the @KafkaListener annotation and the topic’s suffix.
If you don’t provide a groupId, they will all belong to the same group, and a rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
If the consumer is configured with an ErrorHandlingDeserializer to handle deserialization exceptions, it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions.
The generic value type of the template should be Object.
One technique is to use the DelegatingByTypeSerializer; an example follows:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
            new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
                    MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Multiple @KafkaListener annotations can be used for the same topic, with or without manual partition assignment, along with non-blocking retries, but only one configuration will be used for a given topic.
It’s best to use a single RetryTopicConfiguration bean for configuration of such topics.
If multiple @RetryableTopic annotations are being used for the same topic, all of them should have the same values; otherwise, one of them will be applied to all of that topic’s listeners and the other annotations’ values will be ignored.
Configuring Global Settings and Features
Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API).
This does not change the RetryTopicConfiguration beans approach - only infrastructure components’ configurations.
Now the RetryTopicConfigurationSupport class should be extended in a (single) @Configuration class, and the proper methods overridden.
An example follows:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
When using this configuration approach, the @EnableKafkaRetryTopic annotation should not be used; otherwise, the context can fail to start due to duplicated beans.
Use the simple @EnableKafka annotation instead.
When autoCreateTopics is true, the main and retry topics will be created with the specified number of partitions and replication factor.
Starting with version 3.0, the default replication factor is -1, meaning using the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.
To override these values for a particular topic (e.g. the main topic or DLT), simply add a NewTopic @Bean with the required properties; that will override the auto creation properties.
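A NewTopic override for the main topic might look like the following configuration fragment.
This is only a sketch: the class name TopicOverrideConfig, the bean name, and the partition and replica counts are assumptions chosen for illustration, and the topic name is borrowed from the earlier listener examples.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class; names and values are illustrative only.
@Configuration
public class TopicOverrideConfig {

    // Declares the main topic explicitly so that these settings are used
    // instead of the auto creation properties.
    @Bean
    public NewTopic myAnnotatedTopic() {
        return TopicBuilder.name("my-annotated-topic")
                .partitions(6)  // assumed value
                .replicas(3)    // assumed value
                .build();
    }

}
```

The same pattern should apply to a retry topic or the DLT, provided the bean uses the exact name of the topic that the framework would otherwise create.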
By default, records are published to the retry topic(s) using the original partition of the received record.
If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows.
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}
The parameters to the function are the consumer record and the name of the next topic.
You can return a specific partition number, or null to indicate that the KafkaProducer should determine the partition.
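To make the two options concrete, the following plain-Java sketch models a resolver as a function of the failed record and the next topic name.
It is not the framework API: FailedRecord is a hypothetical stand-in for the consumer record, and wrapInto and producerDecides are illustrative names. The modulo strategy is just one possible way to pick a specific partition when a retry topic has fewer partitions than the main topic.

```java
import java.util.function.BiFunction;

// Hypothetical sketch; the real resolver receives a Kafka consumer record.
final class PartitionResolverSketch {

    // Minimal stand-in for the consumer record, holding only what is used here.
    static final class FailedRecord {
        private final String topic;
        private final int partition;

        FailedRecord(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() {
            return topic;
        }

        int partition() {
            return partition;
        }
    }

    // Returns a specific partition: the original one wrapped into the
    // (smaller) partition count of the retry topic.
    static BiFunction<FailedRecord, String, Integer> wrapInto(int retryPartitions) {
        return (rec, nextTopic) -> rec.partition() % retryPartitions;
    }

    // Returns null, leaving the partition choice to the producer.
    static BiFunction<FailedRecord, String, Integer> producerDecides() {
        return (rec, nextTopic) -> null;
    }
}
```

With a retry topic of three partitions, a record from partition 7 of the main topic would be sent to partition 1 under the wrapping strategy, whereas the null-returning resolver defers the decision entirely.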
By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics.
Starting with version 2.9.6, if you want to retain just the last value of these headers, use the configureDeadLetterPublishingContainerFactory() method shown above to set the factory’s retainAllRetryHeaderValues property to false.
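A configuration fragment for this setting could look as follows.
It is a sketch that assumes the property is exposed through a conventional setRetainAllRetryHeaderValues(boolean) setter on the factory; the class name RetryHeaderConfig is hypothetical.

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;

// Hypothetical configuration class for illustration.
@EnableKafka
@Configuration
public class RetryHeaderConfig extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        // Assumed setter name, derived from the retainAllRetryHeaderValues property:
        // keep only the last value of each retry header.
        return dlprf -> dlprf.setRetainAllRetryHeaderValues(false);
    }

}
```

Because only one @Configuration class can extend RetryTopicConfigurationSupport, in an existing application this override would normally be added to that class rather than to a new one.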
Find RetryTopicConfiguration
RetryTopicConfigurationProvider attempts to provide an instance of RetryTopicConfiguration by either creating one from a @RetryableTopic annotation, or from the bean container if no annotation is available.
If beans are found in the container, there’s a check to determine whether the provided topics should be handled by any of such instances.
If a @RetryableTopic annotation is provided, a @DltHandler annotated method is looked up.
Since version 3.2, a new API is provided to create a RetryTopicConfiguration when @RetryableTopic is placed on a class:
@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}