Manually starting Kafka Streams processors selectively

The approach described above unconditionally applies auto-startup false to every Kafka Streams processor in the application through StreamsBuilderFactoryManager. Often, however, you want to prevent only selected processors from starting automatically. For instance, assume that your application has three different functions (processors) and that one of them should not start as part of application startup. Here is an example of such a situation.

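@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
    // Placeholder pass-through; the real processing logic goes here.
    return input -> input;
}

@Bean
public Consumer<KStream<?, ?>> process2() {
    // Placeholder no-op; the real processing logic goes here.
    return input -> { };
}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
    // Placeholder that returns the stream unchanged; the real logic goes here.
    return (stream, table) -> stream;
}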

In the scenario above, if you set spring.kafka.streams.auto-startup to false, none of the processors start automatically during application startup. In that case, you have to start them programmatically, as described above, by calling start() on the underlying StreamsBuilderFactoryManager. However, to selectively disable only one processor, set auto-startup on an individual binding of that processor instead. Assume that we do not want the process3 function to start automatically. This is a BiFunction with two input bindings: process3-in-0 and process3-in-1. To prevent this processor from starting automatically, set auto-startup to false on either of these input bindings. It does not matter which one you pick: both bindings share the same factory bean, so setting the property on one of them is sufficient, although you may set it on both for clarity.

Here is the Spring Cloud Stream property that you can use to disable auto startup for this processor.

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

or

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

You can then start the processor manually, either through the REST endpoint or through the BindingsEndpoint API, as shown below. Both options require the Spring Boot actuator dependency on the classpath.

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0

or

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        // State refers to BindingsLifecycleController.State.
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

See the section on binding visualization and control in the Spring Cloud Stream reference docs for more details on this mechanism.
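
If you prefer to call the REST endpoint from Java rather than from curl, the same request can be built with the JDK HTTP client. The following is only a minimal sketch, assuming Java 11 or later and an application listening on http://localhost:8080 as in the curl example; the class name BindingStarter and the helper buildStartRequest are hypothetical names introduced for illustration and are not part of Spring Cloud Stream.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BindingStarter {

    // Hypothetical helper: builds the same POST request that the curl command sends.
    static HttpRequest buildStartRequest(String baseUrl, String bindingName) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"state\":\"STARTED\"}"))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed base URL and binding name, taken from the curl example.
        HttpRequest request = buildStartRequest("http://localhost:8080", "process3-in-0");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Reached when the application is not running or not reachable.
            System.out.println("Could not reach the application: " + e.getMessage());
        }
    }
}
```

Keeping the request construction in its own method makes it possible to inspect the URI, method, and headers without a running application.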

Note that disabling auto-startup as described in this section works only through consumer bindings. Setting auto-startup on the producer binding, process3-out-0, has no effect on whether the processor starts automatically, even though this producer binding uses the same StreamsBuilderFactoryBean as the consumer bindings.
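
The rule described in this section can be summarized with a small toy model. This is not the binder's implementation, only a sketch of the behavior stated above: a processor starts automatically unless at least one of its input bindings has consumer auto-startup set to false, and settings on output bindings are never consulted. The class AutoStartupModel and the method autoStarts are hypothetical names, and the model assumes that a binding without an explicit setting defaults to auto-startup true.

```java
import java.util.List;
import java.util.Map;

public class AutoStartupModel {

    // Hypothetical helper (not a binder API): returns true when no input
    // binding of the processor has consumer auto-startup set to false.
    static boolean autoStarts(List<String> inputBindings,
                              Map<String, Boolean> autoStartupSettings) {
        for (String binding : inputBindings) {
            // Assumption: a binding without an explicit setting defaults to true.
            if (!autoStartupSettings.getOrDefault(binding, true)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Only one of the two process3 input bindings is configured.
        Map<String, Boolean> settings = Map.of("process3-in-0", false);

        // process1 has no setting, so it starts automatically.
        System.out.println(autoStarts(List.of("process1-in-0"), settings)); // prints true

        // One disabled input binding is enough to keep process3 from starting.
        System.out.println(
                autoStarts(List.of("process3-in-0", "process3-in-1"), settings)); // prints false
    }
}
```

Because the helper only iterates over input bindings, an entry such as process3-out-0 in the settings map would simply be ignored, which mirrors the producer-binding caveat above.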