
Ancillaries to the programming model

Multiple Kafka Streams processors within a single application

The binder allows you to have multiple Kafka Streams processors within a single Spring Cloud Stream application. For example, an application can define the following processors.

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

In this case, the binder creates three separate Kafka Streams objects with different application IDs (more on this below). However, if you have more than one processor in the application, you have to tell Spring Cloud Stream which functions need to be activated. You activate the functions as follows.

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

If you do not want a certain function to be activated right away, remove it from this list.

The same applies when you have a single Kafka Streams processor alongside other types of function beans in the same application that are handled through a different binder (for example, a function bean that is based on the regular Kafka Message Channel binder).
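To make the activation rule concrete, here is a minimal, stdlib-only sketch of how a semicolon-delimited definition value maps to a list of activated function names. The class and method names (FunctionDefinitionSketch, activatedFunctions) are hypothetical and exist only for illustration; this is not the actual parsing code of Spring Cloud Function or the binder.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: NOT the framework's real parsing logic.
final class FunctionDefinitionSketch {

    // Splits a value such as "process;anotherProcess" on ';' and
    // returns the non-empty, trimmed function bean names in order.
    static List<String> activatedFunctions(String definition) {
        List<String> names = new ArrayList<>();
        if (definition == null) {
            return names;
        }
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // All three processors from the example are activated.
        System.out.println(activatedFunctions("process;anotherProcess;yetAnotherProcess"));
        // Removing anotherProcess from the list leaves it inactive.
        System.out.println(activatedFunctions("process;yetAnotherProcess"));
    }
}
```

In this sketch, a function that is absent from the definition value is simply not in the returned list, which mirrors the idea that removing a name from the property keeps that processor from being activated.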

Kafka Streams Application ID

The application ID is a mandatory property of a Kafka Streams application. The Spring Cloud Stream Kafka Streams binder allows you to configure this application ID in multiple ways.

If you have only a single processor in the application, you can set the application ID at the binder level using the following property:

spring.cloud.stream.kafka.streams.binder.applicationId

As a convenience, if you have only a single processor, you can also use spring.application.name to supply the application ID.

If you have multiple Kafka Streams processors in the application, then you need to set the application id per processor. In the case of the functional model, you can attach it to each function as a property.

For example, imagine that you have the following functions.

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

and

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

Then you can set the application ID for each function using the following binder-level properties.

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

and

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

Setting the application ID at the binding level also works with the functional model. However, setting it per function at the binder level, as shown above, is much easier.

For production deployments, it is highly recommended to specify the application ID explicitly through configuration. This is especially critical if you are auto-scaling your application, in which case you need to make sure that every instance is deployed with the same application ID.

If the application does not provide an application ID, the binder auto-generates a static application ID for you. This is convenient in development scenarios because it avoids the need to provide the application ID explicitly. An application ID generated in this manner stays the same across application restarts. In the functional model, the generated application ID is the function bean name followed by the literal applicationID, for example, process-applicationID if process is the function bean name.

Summary of setting Application ID

  • By default, the binder auto-generates the application ID for each function.

  • If you have a single processor, then you can use spring.kafka.streams.applicationId, spring.application.name or spring.cloud.stream.kafka.streams.binder.applicationId.

  • If you have multiple processors, then application ID can be set per function using the property - spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.
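The summary above can be sketched as a small lookup. This is a minimal, stdlib-only illustration, not the binder's implementation: the class and method names (ApplicationIdSketch, resolve) are hypothetical, the properties are modeled as a plain Map, and the lookup order (per-function property first, then the binder-level property, then the generated default) is an assumption made for the example. The spring.application.name and spring.kafka.streams.applicationId options are left out because the text does not state where they fall in the order.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: the precedence shown here is an assumption,
// not a statement about the binder's actual resolution logic.
final class ApplicationIdSketch {

    static final String BINDER_PREFIX = "spring.cloud.stream.kafka.streams.binder.";

    static String resolve(String functionName, Map<String, String> props) {
        // 1. Per-function property, e.g. ...binder.functions.process.applicationId
        String perFunction = props.get(BINDER_PREFIX + "functions." + functionName + ".applicationId");
        if (perFunction != null) {
            return perFunction;
        }
        // 2. Binder-level property (intended for the single-processor case)
        String binderLevel = props.get(BINDER_PREFIX + "applicationId");
        if (binderLevel != null) {
            return binderLevel;
        }
        // 3. Generated default: function bean name followed by the literal "applicationID"
        return functionName + "-applicationID";
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // "orders-processor" is a made-up value for the example.
        props.put(BINDER_PREFIX + "functions.process.applicationId", "orders-processor");

        System.out.println(resolve("process", props));        // explicitly configured
        System.out.println(resolve("anotherProcess", props)); // falls back to the generated ID
    }
}
```

With only the per-function property for process set, anotherProcess falls through to the generated default, which is the behavior the text recommends avoiding in production by configuring every ID explicitly.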

Overriding the default binding names generated by the binder with the functional style

By default, the binder uses the strategy discussed above to generate the binding name when using the functional style, that is, <function-bean-name>-<in>|<out>-[0..n], for example, process-in-0, process-out-0, and so on. If you want to override those binding names, you can do so by specifying the following property.

spring.cloud.stream.function.bindings.<default binding name>

Here, the default binding name is the original binding name generated by the binder.

For example, let's say you have this function.

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

The binder generates bindings with the names process-in-0, process-in-1, and process-out-0. If you want to change them to something else entirely, such as more domain-specific binding names, you can do so as follows.

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-1=regions

and

spring.cloud.stream.function.bindings.process-out-0=clicks

After that, you must set all the binding level properties on these new binding names.
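The naming and override rules above can be sketched as follows. This is a minimal, stdlib-only illustration under stated assumptions, not the binder's code: the class and method names (BindingNameSketch, defaultBindingNames, effectiveName) are hypothetical, and the properties are modeled as a plain Map. It generates the default names for a function with a given number of inputs and outputs, then applies any spring.cloud.stream.function.bindings.* override.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: models the documented naming pattern,
// not the binder's actual implementation.
final class BindingNameSketch {

    static final String OVERRIDE_PREFIX = "spring.cloud.stream.function.bindings.";

    // Builds names of the form <function-bean-name>-in-<i> and <function-bean-name>-out-<i>.
    static List<String> defaultBindingNames(String functionName, int inputs, int outputs) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < inputs; i++) {
            names.add(functionName + "-in-" + i);
        }
        for (int i = 0; i < outputs; i++) {
            names.add(functionName + "-out-" + i);
        }
        return names;
    }

    // Returns the overriding name if one is configured, otherwise the default name.
    static String effectiveName(String defaultName, Map<String, String> props) {
        return props.getOrDefault(OVERRIDE_PREFIX + defaultName, defaultName);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                OVERRIDE_PREFIX + "process-in-0", "users",
                OVERRIDE_PREFIX + "process-in-1", "regions",
                OVERRIDE_PREFIX + "process-out-0", "clicks");

        // A BiFunction has two inputs and one output.
        for (String name : defaultBindingNames("process", 2, 1)) {
            System.out.println(name + " -> " + effectiveName(name, props));
        }
    }
}
```

Once a default name is mapped to a new one, binding-level properties must be set on the new name (for example, on users rather than process-in-0).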

Keep in mind that with the functional programming model described above, adhering to the default binding names makes sense in most situations. The main reason to override them is when you have a large number of configuration properties and want to map the bindings to more domain-friendly names.

Setting up bootstrap server configuration

When running Kafka Streams applications, you must provide the Kafka broker server information. If you don’t provide this information, the binder expects that you are running the broker at the default localhost:9092. If that is not the case, then you need to override that. There are a couple of ways to do that.

  • Using the boot property - spring.kafka.bootstrapServers

  • Binder level property - spring.cloud.stream.kafka.streams.binder.brokers

For the binder-level property, you can also use the broker property provided by the regular Kafka binder, spring.cloud.stream.kafka.binder.brokers. The Kafka Streams binder first checks whether its own broker property (spring.cloud.stream.kafka.streams.binder.brokers) is set and, if it is not, looks for spring.cloud.stream.kafka.binder.brokers.
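The fallback between the two binder-level broker properties can be sketched as a simple lookup chain. This is a minimal, stdlib-only illustration, not the binder's implementation: the class and method names (BrokerResolutionSketch, resolveBrokers) are hypothetical, and the properties are modeled as a plain Map. The Boot property spring.kafka.bootstrapServers is deliberately left out because the text does not state where it falls in the order.

```java
import java.util.Map;

// Illustrative sketch only: models the documented fallback between the two
// binder-level broker properties, not the binder's actual code.
final class BrokerResolutionSketch {

    static final String STREAMS_BINDER_BROKERS = "spring.cloud.stream.kafka.streams.binder.brokers";
    static final String KAFKA_BINDER_BROKERS = "spring.cloud.stream.kafka.binder.brokers";
    static final String DEFAULT_BROKERS = "localhost:9092";

    static String resolveBrokers(Map<String, String> props) {
        // 1. The Kafka Streams binder specific property wins if it is set.
        String streamsBrokers = props.get(STREAMS_BINDER_BROKERS);
        if (streamsBrokers != null) {
            return streamsBrokers;
        }
        // 2. Otherwise fall back to the regular Kafka binder property,
        // 3. and finally to the default broker address.
        return props.getOrDefault(KAFKA_BINDER_BROKERS, DEFAULT_BROKERS);
    }

    public static void main(String[] args) {
        // "kafka-1:9092" and "kafka-2:9092" are made-up addresses for the example.
        System.out.println(resolveBrokers(Map.of()));
        System.out.println(resolveBrokers(Map.of(KAFKA_BINDER_BROKERS, "kafka-1:9092")));
        System.out.println(resolveBrokers(Map.of(
                KAFKA_BINDER_BROKERS, "kafka-1:9092",
                STREAMS_BINDER_BROKERS, "kafka-2:9092")));
    }
}
```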