For the latest stable version, please use spring-cloud-stream 4.2.0! |
Retrying critical business logic
There are scenarios in which you might want to retry parts of your business logic that are critical to the application.
There maybe an external call to a relational database or invoking a REST endpoint from the Kafka Streams processor.
These calls can fail for various reasons such as network issues or remote service unavailability.
More often, these failures may self resolve if you can try them again.
By default, Kafka Streams binder creates RetryTemplate
beans for all the input bindings.
If the function has the following signature,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
and with default binding name, the RetryTemplate
will be registered as process-in-0-RetryTemplate
.
This is following the convention of binding name (process-in-0
) followed by the literal -RetryTemplate
.
In the case of multiple input bindings, there will be a separate RetryTemplate
bean available per binding.
If there is a custom RetryTemplate
bean available in the application and provided through spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
, then that takes precedence over any input binding level retry template configuration properties.
Once the RetryTemplate
from the binding is injected into the application, it can be used to retry any critical sections of the application.
Here is an example:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
Or you can use a custom RetryTemplate
as below.
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
Note that when retries are exhausted, by default, the last exception will be thrown, causing the processor to terminate.
If you wish to handle the exception and continue processing, you can add a RecoveryCallback to the execute
method:
Here is an example.
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
Refer to the Spring Retry project for more information about the RetryTemplate, retry policies, backoff policies and more.