This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.2.0!

Tracing using Spring Cloud Sleuth

When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder based application, both its consumer and producer are automatically instrumented with tracing information. However, in order to trace any application specific operations, those need to be explicitly instrumented by the user code. This can be done by injecting the KafkaStreamsTracing bean from Spring Cloud Sleuth in the application and then invoke various Kafka Streams operations through this injected bean. Here are some examples of using it.

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

In the example above, there are two places where it adds explicit tracing instrumentation. First, we are logging the key/value information from the incoming KStream. When this information is logged, the associated span and trace IDs get logged as well so that a monitoring system can track them and correlate with the same span id. Second, when we call a map operation, instead of calling it directly on the KStream class, we wrap it inside a transform operation and then call map from KafkaStreamsTracing. In this case also, the logged message will contain the span ID and trace ID.

Here is another example, where we use the low-level transformer API for accessing the various Kafka Streams headers. When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed like this.

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}