This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!

StreamsBuilderFactoryBean configurer

It is often necessary to customize the StreamsBuilderFactoryBean that creates the KafkaStreams objects. Building on the underlying support provided by Spring for Apache Kafka, the binder lets you do this through a StreamsBuilderFactoryBeanConfigurer. Once you have access to the StreamsBuilderFactoryBean through this configurer, you can customize the corresponding KafkaStreams object using a KafkaStreamsCustomizer. Both of these customizers are part of the Spring for Apache Kafka project.

Here is an example of using the StreamsBuilderFactoryBeanConfigurer.

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

The example above illustrates one way to customize the StreamsBuilderFactoryBean. You can call any of the mutating operations available on StreamsBuilderFactoryBean. The binder invokes this configurer right before the factory bean is started.

Once you get access to the StreamsBuilderFactoryBean, you can also customize the underlying KafkaStreams object. Here is a blueprint for doing so.

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                    // Handle the uncaught exception here.
                });
            }
        });
    };
}

The StreamsBuilderFactoryBean calls the KafkaStreamsCustomizer right before the underlying KafkaStreams object is started.

There can only be one StreamsBuilderFactoryBeanConfigurer in the entire application. How, then, do you account for multiple Kafka Streams processors, each of which is backed by its own StreamsBuilderFactoryBean object? If the customization needs to differ between processors, the configurer must filter on the application ID.

For example:

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        // Customize only the processor with this application ID.
        // Putting the literal first avoids a NullPointerException if the property is not set.
        if ("processor1-application-id".equals(
                factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG))) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                        // Handle the uncaught exception here.
                    });
                }
            });
        }
    };
}
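
When several processors each need their own customization, the application ID check can be expressed as a lookup table rather than a chain of if statements. The following is a minimal sketch of that idea using only the JDK, so it runs without Spring or Kafka on the classpath. ApplicationIdDispatch and its apply method are hypothetical names, not part of the binder or of Spring for Apache Kafka; the "application.id" key is the value of StreamsConfig.APPLICATION_ID_CONFIG. In a real configurer, the Properties argument would presumably come from factoryBean.getStreamsConfiguration() and the target would be the StreamsBuilderFactoryBean itself.

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of the binder.
final class ApplicationIdDispatch {

    // Same value as StreamsConfig.APPLICATION_ID_CONFIG.
    static final String APPLICATION_ID_KEY = "application.id";

    private ApplicationIdDispatch() {
    }

    // Runs the action registered for the application ID found in streamsConfig.
    // Returns true if an action ran; returns false if the configuration is null,
    // the ID is missing, or no action is registered for that ID.
    static <T> boolean apply(Properties streamsConfig, T target,
            Map<String, Consumer<T>> actionsById) {
        if (streamsConfig == null) {
            return false;
        }
        String applicationId = streamsConfig.getProperty(APPLICATION_ID_KEY);
        if (applicationId == null) {
            return false;
        }
        Consumer<T> action = actionsById.get(applicationId);
        if (action == null) {
            return false;
        }
        action.accept(target);
        return true;
    }
}
```

Because a missing configuration or an unregistered ID simply returns false, processors without an entry in the map are left untouched.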

Using StreamsBuilderFactoryBeanConfigurer to register a global state store

As mentioned above, the binder does not provide a first-class way to register global state stores. To register one, set a KafkaStreamsInfrastructureCustomizer on the factory bean through the StreamsBuilderFactoryBeanConfigurer, as the following example shows.

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
            @Override
            public void configureBuilder(StreamsBuilder builder) {
                // Register the global state store on the StreamsBuilder.
                builder.addGlobalStore(
                        ...
                );
            }
        });
    };
}

Any customization of the StreamsBuilder must be done through the KafkaStreamsInfrastructureCustomizer, as shown above. Calling StreamsBuilderFactoryBean#getObject() to get access to the StreamsBuilder object may not work, because the bean may still be initializing, which can lead to circular dependency issues.

If you have multiple processors, attach the global state store to the right StreamsBuilder by filtering out the other StreamsBuilderFactoryBean objects using the application ID, as outlined above.

Using StreamsBuilderFactoryBeanConfigurer to register a production exception handler

In the error handling section, we indicated that the binder does not provide a first-class way to deal with production exceptions. Even so, you can use the StreamsBuilderFactoryBean configurer to register a production exception handler, as the following example shows.

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

Once again, if you have multiple processors, you may want to set the handler on the correct StreamsBuilderFactoryBean only. You can also add such production exception handlers through a configuration property (see below for more on that); the approach shown here is an option if you prefer to configure it programmatically.