Binding visualization and control in Kafka Streams binder

Starting with version 3.1.2, the Kafka Streams binder supports binding visualization and control. Only two lifecycle phases are supported: STOPPED and STARTED. The lifecycle phases PAUSED and RESUMED are not available in the Kafka Streams binder.

To activate binding visualization and control, the application needs to include the following two dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

If you prefer WebFlux, include spring-boot-starter-webflux instead of spring-boot-starter-web.

In addition, you also need to set the following property:

management.endpoints.web.exposure.include=bindings

To illustrate this feature further, let us use the following application as a guide:

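import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	// Input-only function: prints the value of every record it receives.
	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	// Pass-through function: forwards the input stream unchanged to the output.
	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}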

The application has two Kafka Streams functions: a consumer and a function. By default, the consumer binding is named consumer-in-0. Similarly, for the function, the input binding is function-in-0 and the output binding is function-out-0.
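The default naming pattern described above can be sketched as a small helper. This is an illustration only: BindingNames and its methods are hypothetical names, not part of the binder's API, and the sketch assumes the default binding names have not been overridden.

```java
// Hypothetical helper that mirrors the default binding-name pattern:
// <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs.
final class BindingNames {

    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("consumer", 0));  // consumer-in-0
        System.out.println(input("function", 0));  // function-in-0
        System.out.println(output("function", 0)); // function-out-0
    }
}
```

The three names printed here are the same ones that appear in the bindingName field of the bindings endpoint output.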

Once the application is started, we can find details about the bindings by querying the bindings endpoint:

curl http://localhost:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

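The output above lists the details of all three bindings. Each one reports pausable as false, which is consistent with PAUSED and RESUMED not being available in the Kafka Streams binder.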

Let us now stop the consumer-in-0 binding.

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

At this point, no records will be received through this binding.

Start the binding again.

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
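The same stop and start requests can also be sent from Java. The following is a minimal sketch using the JDK's built-in java.net.http.HttpClient (Java 11 or later), not something provided by the binder. The class name BindingStateClient, the method stateChangeRequest, and the constant BINDINGS_URL are hypothetical, and the URL assumes the application listens on localhost:8080 as in the curl commands.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class BindingStateClient {

    // Assumption: the application runs on localhost:8080, as in the curl examples.
    static final String BINDINGS_URL = "http://localhost:8080/actuator/bindings/";

    // Builds the same POST request as the curl commands: a JSON body naming the target state.
    static HttpRequest stateChangeRequest(String bindingName, String state) {
        String body = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(BINDINGS_URL + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        // Stop the binding, then start it again.
        for (String state : new String[] {"STOPPED", "STARTED"}) {
            try {
                HttpResponse<String> response = client.send(
                        stateChangeRequest("consumer-in-0", state),
                        HttpResponse.BodyHandlers.ofString());
                System.out.println(state + " -> HTTP " + response.statusCode());
            } catch (IOException e) {
                // Reached when the application is not running or not reachable.
                System.out.println("Could not reach the application: " + e.getMessage());
            }
        }
    }
}
```

Keeping the request construction in its own method makes it possible to inspect the method, URI, and headers without a running application.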

When a single function has multiple bindings, invoking these operations on any one of them works, because all the bindings on a single function are backed by the same StreamsBuilderFactoryBean. For the function above, you can therefore use either function-in-0 or function-out-0.