This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4! |
Tracing using Spring Cloud Sleuth
When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder based application, both its consumer and producer are automatically instrumented with tracing information.
However, in order to trace any application specific operations, those need to be explicitly instrumented by the user code.
This can be done by injecting the KafkaStreamsTracing
bean from Spring Cloud Sleuth in the application and then invoke various Kafka Streams operations through this injected bean.
Here are some examples of using it.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
In the example above, there are two places where it adds explicit tracing instrumentation.
First, we are logging the key/value information from the incoming KStream
.
When this information is logged, the associated span and trace IDs get logged as well so that a monitoring system can track them and correlate with the same span id.
Second, when we call a map
operation, instead of calling it directly on the KStream
class, we wrap it inside a transform
operation and then call map
from KafkaStreamsTracing
.
In this case also, the logged message will contain the span ID and trace ID.
Here is another example, where we use the low-level transformer API for accessing the various Kafka Streams headers. When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed like this.
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}