This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.2.0!

Interactive Queries

Kafka Streams binder API exposes a class called InteractiveQueryService to interactively query the state stores. You can access this as a Spring bean in your application. An easy way to get access to this bean from your application is to autowire the bean.

@Autowired
private InteractiveQueryService interactiveQueryService;

Once you gain access to this bean, then you can query for the particular state-store that you are interested. See below.

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

During the startup, the above method call to retrieve the store might fail. For example, it might still be in the middle of initializing the state store. In such cases, it will be useful to retry this operation. Kafka Streams binder provides a simple retry mechanism to accommodate this.

Following are the two properties that you can use to control this retrying.

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - Default is 1 .

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - Default is 1000 milliseconds.

If there are multiple instances of the kafka streams application running, then before you can query them interactively, you need to identify which application instance hosts the particular key that you are querying. InteractiveQueryService API provides methods for identifying the host information.

In order for this to work, you must configure the property application.server as below:

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

Here are some code snippets:

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

For more information on these host finding methods, please see the Javadoc on the methods. For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions. The aforementioned retry properties are applicable for these methods as well.

Other API methods available through the InteractiveQueryService

Use the following API method to retrieve the KeyQueryMetadata object associated with the combination of given store and key.

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

Use the following API method to retrieve the KakfaStreams object associated with the combination of given store and key.

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

Customizing Store Query Parameters

Sometimes it is necessary that you need to fine tune the store query parameters before querying the store through InteractiveQueryService. For this purpose, starting with the 4.0.1 version of the binder, you can provide a bean for StoreQueryParametersCustomizer which is a functional interface with a customize method that takes a StoreQueryParameter as the argument. Here is its method signature.

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

Using this approach, applications can further customize the StoreQueryParameters such as enabling stale stores.

When this bean is present in this application, InteractiveQueryService will call its customize method before querying the state store.

Keep in mind that, there must be a unique bean for StoreQueryParametersCustomizer available in the application.