
Monitoring

Monitoring Listener Performance

Starting with version 2.3, the listener container automatically creates and updates Micrometer Timers for the listener, if Micrometer is detected on the classpath and a single MeterRegistry is present in the application context. The timers can be disabled by setting the ContainerProperties micrometerEnabled property to false.

Two timers are maintained - one for successful calls to the listener and one for failures.

The timers are named spring.kafka.listener and have the following tags:

  • name : (container bean name)

  • result : success or failure

  • exception : none or ListenerExecutionFailedException

You can add additional tags using the ContainerProperties micrometerTags property.

Starting with versions 2.9.8 and 3.0.6, you can provide a function in the ContainerProperties micrometerTagsProvider property; the function receives the ConsumerRecord<?, ?> and returns tags that can be based on that record; these are merged with any static tags in micrometerTags.
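
For example, a minimal sketch of configuring these properties on the container factory's ContainerProperties (the bean name and tag values are illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    ContainerProperties props = factory.getContainerProperties();
    // static tags added to every listener timer (illustrative values)
    props.setMicrometerTags(Map.of("environment", "test"));
    // dynamic tags derived from each ConsumerRecord (2.9.8, 3.0.6 and later)
    props.setMicrometerTagsProvider(record -> Map.of("topic", record.topic()));
    // props.setMicrometerEnabled(false); // uncomment to disable the timers entirely
    return factory;
}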

With the concurrent container, timers are created for each thread and the name tag is suffixed with -n where n is 0 to concurrency-1.

Monitoring KafkaTemplate Performance

Starting with version 2.5, the template automatically creates and updates Micrometer Timers for send operations, if Micrometer is detected on the classpath and a single MeterRegistry is present in the application context. The timers can be disabled by setting the template's micrometerEnabled property to false.

Two timers are maintained - one for successful send operations and one for failures.

The timers are named spring.kafka.template and have the following tags:

  • name : (template bean name)

  • result : success or failure

  • exception : none or the exception class name for failures

You can add additional tags using the template’s micrometerTags property.

Starting with versions 2.9.8 and 3.0.6, you can provide a tags provider via KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>); the function receives the ProducerRecord<?, ?> and returns tags that can be based on that record; these are merged with any static tags in micrometerTags.
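
For example, a minimal sketch (the bean name and tag values are illustrative):

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    // static tags added to every send timer (illustrative values)
    template.setMicrometerTags(Map.of("environment", "test"));
    // dynamic tags derived from each ProducerRecord (2.9.8, 3.0.6 and later)
    template.setMicrometerTagsProvider(record -> Map.of("topic", record.topic()));
    return template;
}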

Micrometer Native Metrics

Starting with version 2.5, the framework provides Factory Listeners to manage a Micrometer KafkaClientMetrics instance whenever producers and consumers are created and closed.

To enable this feature, simply add the listeners to your producer and consumer factories:

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

The consumer/producer id passed to the listener is added to the meter’s tags with tag name spring.id.

An example of obtaining one of the Kafka metrics:
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

A similar listener is provided for the StreamsBuilderFactoryBean - see KafkaStreams Micrometer Support.

Starting with version 3.3, a KafkaMetricsSupport abstract class is introduced to manage io.micrometer.core.instrument.binder.kafka.KafkaMetrics binding into a MeterRegistry for a provided Kafka client. This class is the super class of the above-mentioned MicrometerConsumerListener, MicrometerProducerListener, and KafkaStreamsMicrometerListener; however, it can be used for any Kafka client use case. The class needs to be extended, and its bindClient() and unbindClient() APIs have to be called to connect Kafka client metrics with a Micrometer collector.
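
A minimal sketch of such an extension follows; it assumes a constructor accepting a MeterRegistry and that bindClient() and unbindClient() accept the client id and the client instance (as the built-in listeners do); the class name and the AdminClient use case are illustrative:

// sketch only: verify the KafkaMetricsSupport constructor and bind/unbind signatures
// against your spring-kafka version
public class AdminClientMetrics extends KafkaMetricsSupport<AdminClient> {

    public AdminClientMetrics(MeterRegistry meterRegistry) {
        super(meterRegistry);
    }

    // call when the AdminClient is created
    public void adminCreated(String id, AdminClient client) {
        bindClient(id, client); // binds KafkaMetrics for this client to the MeterRegistry
    }

    // call when the AdminClient is closed
    public void adminClosed(String id, AdminClient client) {
        unbindClient(id, client); // removes and closes the KafkaMetrics binding
    }

}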

Micrometer Observation

Starting with version 3.0, using Micrometer for observation is supported for the KafkaTemplate and listener containers.

Set observationEnabled to true on the KafkaTemplate and ContainerProperties to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation.
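
For example, a minimal sketch of enabling observation for listener containers (call setObservationEnabled(true) on the KafkaTemplate in the same way; the bean name is illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // observations replace the listener timers for containers created by this factory
    factory.getContainerProperties().setObservationEnabled(true);
    return factory;
}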

Micrometer Observation does not support batch listeners; enabling observation for a batch listener will enable Micrometer Timers instead.

Refer to Micrometer Tracing for more information.

To add tags to timers/traces, configure a custom KafkaTemplateObservationConvention or KafkaListenerObservationConvention to the template or listener container, respectively.

The default implementations add the bean.name tag for template observations and listener.id tag for containers.

You can either subclass DefaultKafkaTemplateObservationConvention or DefaultKafkaListenerObservationConvention or provide completely new implementations.

See Micrometer Observation Documentation for details of the default observations that are recorded.

Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records. To do so, add a custom KafkaListenerObservationConvention and/or KafkaTemplateObservationConvention to the listener container properties or KafkaTemplate respectively. The record property in both observation contexts contains the ConsumerRecord or ProducerRecord respectively.
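
For example, a minimal sketch of a template convention that adds the record's topic as a low-cardinality key value (the class name and tag name are illustrative):

public class TopicAwareTemplateObservationConvention extends DefaultKafkaTemplateObservationConvention {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
        // the ProducerRecord is available on the sender context (3.0.6 and later)
        return super.getLowCardinalityKeyValues(context)
                .and(KeyValue.of("topic", context.getRecord().topic()));
    }

}

Configure the convention on the template or on the listener container properties, as described above.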

The sender and receiver contexts' remoteServiceName properties are set to the Kafka clusterId property; this is retrieved by a KafkaAdmin. If, for some reason (perhaps lack of admin permissions), you cannot retrieve the cluster id, starting with version 3.1 you can set a manual clusterId on the KafkaAdmin and inject it into KafkaTemplate instances and listener containers. When it is null (the default), the admin invokes the describeCluster admin operation to retrieve it from the broker.
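
For example, a minimal sketch of providing a manual cluster id (the bootstrap server and id values are illustrative):

@Bean
public KafkaAdmin kafkaAdmin() {
    KafkaAdmin admin = new KafkaAdmin(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
    // manual cluster id (version 3.1 and later), used instead of the describeCluster operation
    admin.setClusterId("my-cluster-id");
    return admin;
}

The KafkaAdmin can then be injected into KafkaTemplate instances and listener containers, as described above.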