State Store
State stores are created automatically by Kafka Streams when the high-level DSL is used and calls that require a state store are made.
If you want to materialize an incoming KTable binding as a named state store, you can do so by using the following strategy.
Let's say you have the following function.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    ...
}
Then, by setting the following property, the incoming KTable data will be materialized into the named state store.
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
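The binding name in this property follows the functional naming pattern of function name, then `-in-`, then the zero-based input index. The KTable is the second input of `process`, hence `process-in-1`. A minimal sketch of how the key is assembled follows; the class `MaterializedAsKey` and its method `forInput` are hypothetical helpers for illustration only and are not part of the binder.

```java
public class MaterializedAsKey {

    // Prefix shared by the Kafka Streams binder's per-binding properties.
    private static final String PREFIX = "spring.cloud.stream.kafka.streams.bindings.";

    // Builds the materializedAs property key for the input at the given zero-based index.
    public static String forInput(String functionName, int inputIndex) {
        String bindingName = functionName + "-in-" + inputIndex;
        return PREFIX + bindingName + ".consumer.materializedAs";
    }

    public static void main(String[] args) {
        // In process(), the KStream is input 0 and the KTable is input 1.
        System.out.println(forInput("process", 1) + ": incoming-store");
    }
}
```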
You can define custom state stores as beans in your application, and the binder will detect them and add them to the Kafka Streams builder. This matters especially when the processor API is used, because you then need to register a state store manually. To do so, define a StoreBuilder for the state store as a bean in the application. Here are examples of defining such beans.
@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}
These state stores can then be accessed by the application directly.
During bootstrap, the binder processes the above beans and passes them on to the Streams builder object.
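Newer Kafka Streams releases express the retention period and window size of a window store as `java.time.Duration` values rather than raw numbers. The sketch below shows what a typed window store builder could look like under that API. The class name `WindowStoreSketch` and the durations (10 minutes of retention, 1-minute windows) are assumptions chosen for illustration, not values taken from the example above. In an application, the method would be a `@Bean` method in a configuration class.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class WindowStoreSketch {

    // Declaring the generic type avoids raw-type warnings where the builder is used.
    public static StoreBuilder<WindowStore<Long, Long>> otherStore() {
        return Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                        "other-store",
                        Duration.ofMinutes(10), // retention period (assumed value)
                        Duration.ofMinutes(1),  // window size (assumed value)
                        false),                 // do not retain duplicates
                Serdes.Long(),
                Serdes.Long());
    }
}
```

The retention period must be at least as long as the window size, which is why the two assumed values differ.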
Accessing the state store:
new Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        // Look the store up by the name it was registered under.
        state = (WindowStore) processorContext.getStateStore("mystate");
    }
    ...
}
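For comparison, here is a fuller sketch that reads and updates the key-value store `my-store` defined earlier, written against the typed `org.apache.kafka.streams.processor.api` interfaces found in recent Kafka Streams releases. The class name `CountingProcessor` and the counting logic are assumptions made for illustration; only the store name and its Long/Long serdes come from the bean definition above.

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor: counts how many records have been seen per Long key.
public class CountingProcessor implements Processor<Long, String, Long, Long> {

    private ProcessorContext<Long, Long> context;
    private KeyValueStore<Long, Long> store;

    @Override
    public void init(ProcessorContext<Long, Long> context) {
        this.context = context;
        // The name must match the one passed to Stores.persistentKeyValueStore(...).
        this.store = context.getStateStore("my-store");
    }

    @Override
    public void process(Record<Long, String> record) {
        Long previous = store.get(record.key());
        long updated = (previous == null) ? 1L : previous + 1L;
        store.put(record.key(), updated);
        // Forward the running count, keeping the record's key, timestamp, and headers.
        context.forward(record.withValue(updated));
    }
}
```

Assuming a Kafka Streams version whose `KStream.process` accepts this interface, the processor could be attached with a call along the lines of `input.process(CountingProcessor::new, "my-store")`. Passing the store name is what connects the store to the processor.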
This approach does not work for registering global state stores.
To register a global state store, please see the section below on customizing StreamsBuilderFactoryBean.