Custom Kafka Binder Health Indicator

Overriding Default Kafka Binder Health Indicator

The Kafka binder activates a default health indicator when Spring Boot Actuator is on the classpath. This health indicator checks the health of the binder and reports any communication issues with the Kafka broker. To replace the default health check with a custom one, an application can implement the KafkaBinderHealth interface, a marker interface that extends HealthIndicator. The custom implementation must implement the health() method and must be registered as a bean in the application configuration. When the binder discovers the custom implementation, it uses that instead of the default one. Here is an example of such a custom implementation bean in the application.

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // Custom implementation details go here; return the resulting Health.
            return Health.up().build();
        }
    };
}

Custom Kafka Binder Health Indicator Example

Here is the pseudo-code for writing a custom Kafka binder HealthIndicator. In this example, we override the binder-provided Kafka HealthIndicator by first checking cluster connectivity and then checking for topic-related issues.

First, we need to create a custom implementation of the KafkaBinderHealth interface.

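public class KafkaBinderHealthImplementation implements KafkaBinderHealth {

    // Commons Logging is used here; any logging facade works.
    private static final Log logger = LogFactory.getLog(KafkaBinderHealthImplementation.class);

    // Topic to verify; this example reuses the Spring Cloud Bus destination.
    @Value("${spring.cloud.bus.destination}")
    private String topic;

    private final AdminClient client;

    public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
        // More about configuring Kafka:
        // https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
        this.client = AdminClient.create(admin.getConfigurationProperties());
    }

    @Override
    public Health health() {
        // Check cluster connectivity first, then topic-related issues.
        if (!checkBrokersConnection()) {
            logger.error("Error connecting to the brokers");
            return Health.down().withDetail("BrokersConnectionError", "Error message").build();
        }
        if (!checkTopicConnection()) {
            logger.error("Error connecting to the topic " + this.topic);
            return Health.down().withDetail("TopicError", "Error message with topic name").build();
        }
        return Health.up().build();
    }

    public boolean checkBrokersConnection() {
        // Your implementation; return true when the brokers are reachable.
        return true; // placeholder so that the example compiles
    }

    public boolean checkTopicConnection() {
        // Your implementation; return true when the topic is available.
        return true; // placeholder so that the example compiles
    }
}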
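
The two check methods above are left to the application. One possible way to fill them in is to wait, with a timeout, on the futures returned by the Kafka AdminClient, so that an unreachable broker cannot block the health endpoint indefinitely. The following is only a sketch under that assumption: HealthCheckSupport, completesWithin and containsWithin are hypothetical names that are not part of the binder, and the helper relies only on java.util.concurrent.Future, which Kafka's KafkaFuture implements.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of the binder): bounded waits for health checks.
final class HealthCheckSupport {

    private HealthCheckSupport() {
    }

    // Returns true only if the future completes normally within the timeout.
    static boolean completesWithin(Future<?> future, Duration timeout) {
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
        catch (ExecutionException | TimeoutException | CancellationException ex) {
            return false;
        }
    }

    // Returns true only if the future yields, within the timeout, a set that contains the name.
    static boolean containsWithin(Future<Set<String>> names, String name, Duration timeout) {
        try {
            return names.get(timeout.toMillis(), TimeUnit.MILLISECONDS).contains(name);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
        catch (ExecutionException | TimeoutException | CancellationException ex) {
            return false;
        }
    }
}
```

With such a helper, checkBrokersConnection() could return HealthCheckSupport.completesWithin(client.describeCluster().nodes(), Duration.ofSeconds(5)), and checkTopicConnection() could return HealthCheckSupport.containsWithin(client.listTopics().names(), topic, Duration.ofSeconds(5)). The five-second timeout is an arbitrary value chosen for illustration and should be tuned to the environment.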

Then we need to create a bean for the custom implementation.

@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
	@Bean
	public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
		return new KafkaBinderHealthImplementation(admin);
	}
}