This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0!
Monitoring
Monitoring Listener Performance
Starting with version 2.3, the listener container automatically creates and updates Micrometer Timers for the listener if Micrometer is detected on the classpath and a single MeterRegistry is present in the application context.
The timers can be disabled by setting the micrometerEnabled property of ContainerProperties to false.
Two timers are maintained - one for successful calls to the listener and one for failures.
The timers are named spring.kafka.listener
and have the following tags:
- name: (container bean name)
- result: success or failure
- exception: none or ListenerExecutionFailedException
You can add additional tags using the micrometerTags property of ContainerProperties.
Starting with versions 2.9.8 and 3.0.6, you can provide a function in the micrometerTagsProvider property of ContainerProperties; the function receives the ConsumerRecord<?, ?> and returns tags, which can be based on that record and are merged with any static tags in micrometerTags.
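The two tag properties described above can be combined. The following is a minimal configuration sketch, not a definitive setup: the helper class, the tag names and the tag values are hypothetical, and it assumes the ContainerProperties instance belongs to the container you want to instrument.

```java
import java.util.Map;

import org.springframework.kafka.listener.ContainerProperties;

public final class ListenerTimerTags {

    private ListenerTimerTags() {
    }

    // Hypothetical helper: adds static and per-record tags to the listener timers.
    public static void configure(ContainerProperties containerProperties) {
        // Static tag applied to every timer sample (tag name and value are placeholders).
        containerProperties.setMicrometerTags(Map.of("region", "eu-west"));
        // Dynamic tag derived from the ConsumerRecord that was passed to the listener.
        containerProperties.setMicrometerTagsProvider(record -> Map.of("topic", record.topic()));
    }
}
```

Tag values derived from records should stay low in cardinality (a topic name rather than a record key, for example), because each distinct tag combination creates a separate timer.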
With the concurrent container, timers are created for each thread, and the name tag is suffixed with -n, where n ranges from 0 to concurrency-1.
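To make the suffix rule concrete, here is a small JDK-only sketch that lists the name tag values you would expect for a given container bean name and concurrency. The class and method names are hypothetical and are not part of the framework; the sketch only restates the naming rule above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public final class ListenerTimerNames {

    private ListenerTimerNames() {
    }

    // Builds the expected name tag values: beanName-0 .. beanName-(concurrency-1).
    public static List<String> nameTags(String containerBeanName, int concurrency) {
        return IntStream.range(0, concurrency)
                .mapToObj(n -> containerBeanName + "-" + n)
                .collect(Collectors.toList());
    }
}
```

For a container bean named myListener with a concurrency of 3, this yields myListener-0, myListener-1 and myListener-2.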
Monitoring KafkaTemplate Performance
Starting with version 2.5, the template automatically creates and updates Micrometer Timers for send operations if Micrometer is detected on the classpath and a single MeterRegistry is present in the application context.
The timers can be disabled by setting the template's micrometerEnabled property to false.
Two timers are maintained - one for successful send operations and one for failures.
The timers are named spring.kafka.template
and have the following tags:
- name: (template bean name)
- result: success or failure
- exception: none or the exception class name for failures
You can add additional tags using the template's micrometerTags property.
Starting with versions 2.9.8 and 3.0.6, you can provide a function using KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>); the function receives the ProducerRecord<?, ?> and returns tags, which can be based on that record and are merged with any static tags in micrometerTags.
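The template-side equivalent might look like the following configuration sketch. The helper class, tag names and tag values are hypothetical; the sketch assumes a KafkaTemplate<String, String> that is otherwise fully configured.

```java
import java.util.Map;

import org.springframework.kafka.core.KafkaTemplate;

public final class TemplateTimerTags {

    private TemplateTimerTags() {
    }

    // Hypothetical helper: adds static and per-record tags to the template timers.
    public static void configure(KafkaTemplate<String, String> template) {
        // Static tag applied to every send timer sample (placeholder name and value).
        template.setMicrometerTags(Map.of("app", "orders"));
        // Dynamic tag derived from the ProducerRecord being sent.
        template.setMicrometerTagsProvider(record -> Map.of("topic", record.topic()));
    }
}
```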
Micrometer Native Metrics
Starting with version 2.5, the framework provides Factory Listeners to manage a Micrometer KafkaClientMetrics instance whenever producers and consumers are created and closed.
To enable this feature, add the listeners to your producer and consumer factories:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}
The consumer/producer id passed to the listener is added to the meter's tags with tag name spring.id.
For example, one of the Kafka metrics can be obtained as follows:
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
        .tag("customTag", "customTagValue")
        .tag("spring.id", "myProducerFactory.myClientId-1")
        .functionCounter()
        .count();
A similar listener is provided for the StreamsBuilderFactoryBean; see KafkaStreams Micrometer Support.
Starting with version 3.3, a KafkaMetricsSupport abstract class is introduced to manage the binding of io.micrometer.core.instrument.binder.kafka.KafkaMetrics into a MeterRegistry for a provided Kafka client.
This class is the superclass of the MicrometerConsumerListener, MicrometerProducerListener and KafkaStreamsMicrometerListener mentioned above.
However, it can be used for any Kafka client use case.
The class needs to be extended, and its bindClient() and unbindClient() methods have to be called to connect Kafka client metrics with a Micrometer collector.
Micrometer Observation
Using Micrometer for observation is supported, since version 3.0, for the KafkaTemplate and listener containers.
Set observationEnabled to true on the KafkaTemplate and ContainerProperties to enable observation; this disables the Micrometer Timers because the timers are then managed with each observation.
Micrometer Observation does not support batch listeners; with a batch listener, the Micrometer Timers are enabled instead.
Refer to Micrometer Tracing for more information.
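Enabling observation on both sides could look like the following configuration sketch. The helper class is hypothetical; in an application these calls would typically sit in the @Bean methods that create the template and the listener container factory, and the sketch assumes an ObservationRegistry is available in the application context.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public final class ObservationSetup {

    private ObservationSetup() {
    }

    // Hypothetical helper: switches the template and the containers from timers to observations.
    public static void enableObservation(KafkaTemplate<String, String> template,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // Producer side: observe send operations.
        template.setObservationEnabled(true);
        // Consumer side: containers created by this factory observe listener invocations.
        factory.getContainerProperties().setObservationEnabled(true);
    }
}
```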
To add tags to timers/traces, configure a custom KafkaTemplateObservationConvention or KafkaListenerObservationConvention on the template or listener container, respectively.
The default implementations add the bean.name tag for template observations and the listener.id tag for containers.
You can either subclass DefaultKafkaTemplateObservationConvention or DefaultKafkaListenerObservationConvention or provide completely new implementations.
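As an illustration of the subclassing option, the following sketch extends the default template convention and appends one static tag. The class name, tag name and tag value are hypothetical; the sketch assumes the default convention's low-cardinality key values should be kept and extended rather than replaced.

```java
import io.micrometer.common.KeyValues;

import org.springframework.kafka.support.micrometer.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;

// Hypothetical convention: keeps the default tags and adds a static "team" tag.
public class TeamTaggingTemplateConvention extends DefaultKafkaTemplateObservationConvention {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
        // Start from the default key values so the default tags are preserved.
        return super.getLowCardinalityKeyValues(context)
                .and("team", "payments");
    }
}
```

The convention would then be registered with template.setObservationConvention(new TeamTaggingTemplateConvention()).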
See Micrometer Observation Documentation for details of the default observations that are recorded.
Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records.
To do so, add a custom KafkaListenerObservationConvention and/or KafkaTemplateObservationConvention to the listener container properties or KafkaTemplate, respectively.
The record property in both observation contexts contains the ConsumerRecord or ProducerRecord, respectively.
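A record-based tag on the listener side might be sketched as follows. The class name and the tag name are hypothetical; the sketch reads the record property of the receiver context and assumes the tag value has low cardinality.

```java
import io.micrometer.common.KeyValues;

import org.springframework.kafka.support.micrometer.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;

// Hypothetical convention: keeps the default tags and adds the record's partition as a tag.
public class PartitionTaggingListenerConvention extends DefaultKafkaListenerObservationConvention {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
        // The record property holds the ConsumerRecord being processed.
        String partition = String.valueOf(context.getRecord().partition());
        return super.getLowCardinalityKeyValues(context)
                .and("record.partition", partition);
    }
}
```

It would be registered on the container properties with setObservationConvention(new PartitionTaggingListenerConvention()).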
The remoteServiceName properties of the sender and receiver contexts are set to the Kafka clusterId property; this is retrieved by a KafkaAdmin.
If, for some reason (perhaps a lack of admin permissions), you cannot retrieve the cluster id, starting with version 3.1, you can set a manual clusterId on the KafkaAdmin and inject it into KafkaTemplates and listener containers.
When it is null (the default), the admin invokes the describeCluster admin operation to retrieve it from the broker.
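Setting the cluster id manually could be sketched as follows. The helper class and the cluster id value are placeholders, and the use of setKafkaAdmin on the template to inject the admin is an assumption about how the injection is wired; check the API of your version before relying on it.

```java
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;

public final class ClusterIdSetup {

    private ClusterIdSetup() {
    }

    // Hypothetical helper: avoids the describeCluster call by supplying the id up front.
    public static void useFixedClusterId(KafkaAdmin admin, KafkaTemplate<String, String> template) {
        // Placeholder value; use the id of the cluster the application connects to.
        admin.setClusterId("my-cluster-id");
        // Assumed wiring: hand the admin to the template so observations use its clusterId.
        template.setKafkaAdmin(admin);
    }
}
```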