
Retrying critical business logic

There are scenarios in which you might want to retry parts of your business logic that are critical to the application. For example, a Kafka Streams processor may make an external call to a relational database or invoke a REST endpoint. These calls can fail for various reasons, such as network issues or remote service unavailability. Often, such failures resolve themselves if the call is tried again. By default, the Kafka Streams binder creates RetryTemplate beans for all input bindings.

If the function has the following signature,

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

and the default binding name is used, the RetryTemplate is registered as process-in-0-RetryTemplate. This follows the convention of the binding name (process-in-0) followed by the literal -RetryTemplate. In the case of multiple input bindings, a separate RetryTemplate bean is available per binding. If there is a custom RetryTemplate bean available in the application and it is provided through spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName, then it takes precedence over any retry template configuration properties set at the input binding level.
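The naming convention above can be sketched in a few lines of plain Java. This is only an illustration: the class RetryTemplateNames and both helper methods are hypothetical and are not part of the binder API. The sketch assumes the default functional binding name (function name, then -in-, then the input index).

```java
// Hypothetical helpers that only illustrate the naming convention;
// the binder does not expose these methods.
class RetryTemplateNames {

    // Default input binding name for a functional bean, e.g. "process-in-0".
    public static String inputBindingName(String functionName, int inputIndex) {
        return functionName + "-in-" + inputIndex;
    }

    // Bean name of the binding's RetryTemplate: binding name plus the literal suffix.
    public static String retryTemplateBeanName(String bindingName) {
        return bindingName + "-RetryTemplate";
    }

    public static void main(String[] args) {
        // A function named "process" with two input bindings gets one bean per binding.
        System.out.println(retryTemplateBeanName(inputBindingName("process", 0)));
        System.out.println(retryTemplateBeanName(inputBindingName("process", 1)));
    }
}
```

The resulting string is the value you would pass to @Qualifier when injecting the RetryTemplate for a given binding.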

Once the RetryTemplate from the binding is injected into the application, it can be used to retry any critical sections of the application. Here is an example:

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                       return null; // The retry callback must return a value.
                    });
                }

                @Override
                public void close() {
                }
            });
}

Or you can use a custom RetryTemplate as below.

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                           return null; // The retry callback must return a value.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

Note that when retries are exhausted, by default, the last exception is thrown, causing the processor to terminate. If you wish to handle the exception and continue processing, you can pass a RecoveryCallback to the execute method. Here is an example:

retryTemplate.execute(context -> {
        //Critical business logic goes here.
        return null;
    }, context -> {
        //Recovery logic goes here.
        return null;
    });
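To make the control flow concrete, here is a minimal plain-Java sketch of retry-then-recover semantics. It is a stand-in for illustration only and is not Spring Retry's implementation: the class RetrySketch and the method executeWithRetry are hypothetical names. It assumes that the attempt limit counts the initial call, and it waits a fixed period between attempts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative stand-in for retry-then-recover; NOT Spring Retry's implementation.
class RetrySketch {

    // Calls the action up to maxAttempts times (the first call counts as an attempt),
    // sleeping backOffMillis between failed attempts. When attempts are exhausted,
    // the last exception is rethrown unless a recovery function is supplied.
    public static <T> T executeWithRetry(int maxAttempts, long backOffMillis,
                                         Callable<T> action,
                                         Function<Exception, T> recovery) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis); // fixed back-off before the next attempt
                }
            }
        }
        if (recovery == null) {
            throw last; // default behaviour: propagate the last failure
        }
        return recovery.apply(last); // recovery path: handle the failure and continue
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        String result = executeWithRetry(4, 1L, () -> {
            calls.incrementAndGet();
            throw new IllegalStateException("remote service unavailable");
        }, lastFailure -> "fallback");
        // prints "fallback after 4 attempts"
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Without the recovery function (passing null in this sketch), the fourth failure would propagate to the caller, which in a Kafka Streams processor corresponds to the termination described above.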

Refer to the Spring Retry project for more information about the RetryTemplate, retry policies, backoff policies and more.