This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.2.0! |
StreamsBuilderFactoryBean configurer
It is often required to customize the StreamsBuilderFactoryBean
that creates the KafkaStreams
objects.
Based on the underlying support provided by Spring Kafka, the binder allows you to customize the StreamsBuilderFactoryBean
.
You can use the StreamsBuilderFactoryBeanConfigurer
to customize the StreamsBuilderFactoryBean
itself.
Then, once you get access to the StreamsBuilderFactoryBean
through this configurer, you can customize the corresponding KafkaStreams
using KafkaStreamsCustomzier
.
Both of these customizers are part of the Spring for Apache Kafka project.
Here is an example of using the StreamsBuilderFactoryBeanConfigurer
.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
The above is shown as an illustration of the things you can do to customize the StreamsBuilderFactoryBean
.
You can essentially call any available mutation operations from StreamsBuilderFactoryBean
to customize it.
This customizer will be invoked by the binder right before the factory bean is started.
Once you get access to the StreamsBuilderFactoryBean
, you can also customize the underlying KafkaStreams
object.
Here is a blueprint for doing so.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
will be called by the StreamsBuilderFactoryBeabn
right before the underlying KafkaStreams
gets started.
There can only be one StreamsBuilderFactoryBeanConfigurer
in the entire application.
Then how do we account for multiple Kafka Streams processors as each of them are backed up by individual StreamsBuilderFactoryBean
objects?
In that case, if the customization needs to be different for those processors, then the application needs to apply some filter based on the application ID.
For e.g,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
Using StreamsBuilderFactoryBeanConfigurer to register a global state store
As mentioned above, the binder does not provide a first class way to register global state stores as a feature.
For that, you need to use the customizer via StreamsBuilderFactoryBeanConfigurer
.
Here is how that can be done.
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
Any customizations on StreamsBuilder
must be done through the KafkaStreamsInfrastructureCustomizer
as shown above.
If StreamsBuilderFactoryBean#getObject()
is called to get access to the StreamsBuilder
object, that may not work as the bean maybe in initialization and thus run into some circular dependency issues.
If you have multiple processors, you want to attach the global state store to the right StreamsBuilder
by filtering out the other StreamsBuilderFactoryBean
objects using the application id as outlined above.
Using StreamsBuilderFactoryBeanConfigurer to register a production exception handler
In the error handling section, we indicated that the binder does not provide a first class way to deal with production exceptions.
Though that is the case, you can still use the StreamsBuilderFacotryBean
customizer to register production exception handlers. See below.
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
Once again, if you have multiple processors, you may want to set it appropriately against the correct StreamsBuilderFactoryBean
.
You may also add such production exception handlers using the configuration property (See below for more on that), but this is an option if you choose to go with a programmatic approach.