For the latest stable version, please use spring-cloud-stream 4.2.0! |
Binding visualization and control in Kafka Streams binder
Starting with version 3.1.2, Kafka Streams binder supports binding visualization and control.
The only two lifecycle phases supported are STOPPED
and STARTED
.
The lifecycle phases PAUSED
and RESUMED
are not available in Kafka Streams binder.
In order to activate binding visualization and control, the application needs to include the following two dependencies.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
If you prefer using webflux, you can then include spring-boot-starter-webflux
instead of the standard web dependency.
In addition, you also need to set the following property:
management.endpoints.web.exposure.include=bindings
To illustrate this feature further, let us use the following application as a guide:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
As we can see, the application has two Kafka Streams functions - one, a consumer and another a function.
The consumer binding is named by default as consumer-in-0
.
Similarly, for the function, the input binding is function-in-0
and the output binding is function-out-0
.
Once the application is started, we can find details about the bindings using the following bindings endpoint.
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
The details about all three bindings can be found above.
Let us now stop the consumer-in-0 binding.
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
At this point, no records will be received through this binding.
Start the binding again.
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
When there are multiple bindings present on a single function, invoking these operations on any of those bindings will work.
This is because all the bindings on a single function are backed by the same StreamsBuilderFactoryBean
.
Therefore, for the function above, either function-in-0
or function-out-0
will work.