Observability in Reactive Kafka Binder

This section describes how Micrometer-based observability is enabled in the reactive Kafka binder.

Producer Binding

There is built-in support for observability in producer binding. To enable it, set the following property:

spring.cloud.stream.kafka.binder.enable-observation

When this property is set to true, the publishing of records is observed. This applies both to records published through StreamBridge and to records emitted by regular Supplier<?> beans.

Consumer Binding

Enabling observability on the consumer side is more complex than on the producer side. There are two starting points for consumer binding:

  1. A topic where data is published via a producer binding

  2. A topic where data is produced outside of Spring Cloud Stream

In the first case, the application should ideally carry the observability headers through to the consumer's inbound side, so that the consumer continues the upstream observation. In the second case, no upstream observation exists, so a new observation is started.
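
The two cases can be modeled in a few lines of plain Java. This is only a conceptual sketch, not the binder's implementation: it assumes W3C Trace Context propagation (a traceparent header), and TraceResolver and resolveTraceId are hypothetical names used for illustration. The actual header names depend on the tracer and propagation format configured in the application.

```java
import java.util.Map;
import java.util.UUID;

// Conceptual model only; not part of Spring Cloud Stream or Micrometer.
final class TraceResolver {

    // W3C Trace Context header name (assumption; other formats use other headers).
    static final String TRACEPARENT = "traceparent";

    // Returns the upstream trace id when a well-formed traceparent header
    // is present, otherwise a freshly generated 32-character hex id.
    static String resolveTraceId(Map<String, String> headers) {
        String parent = headers.get(TRACEPARENT);
        if (parent != null) {
            // Layout: version-traceId(32 hex)-parentId(16 hex)-flags(2 hex)
            String[] parts = parent.split("-");
            if (parts.length == 4 && parts[1].matches("[0-9a-f]{32}")) {
                return parts[1]; // case 1: continue the upstream trace
            }
        }
        // case 2: nothing usable upstream, so start a new trace
        return UUID.randomUUID().toString().replace("-", "");
    }
}
```

Called with the headers of a record published by an instrumented producer, resolveTraceId returns the producer's trace id. Called with the headers of a record that was produced outside of Spring Cloud Stream and carries no traceparent header, it returns a new id.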

Example: Function with Observability

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

	return s -> s.flatMap(record -> {
		// Start one observation per record; an upstream observation, if any,
		// becomes part of the KafkaRecordReceiverContext.
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
			observationRegistry);

		return Mono.deferContextual(contextView -> Mono.just(record)
				.map(rec -> new String(rec.value()).toLowerCase())
				// Attach the Reactor context to the outbound message.
				.map(rec -> MessageBuilder.withPayload(rec)
					.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
					.build()))
			// Record a failure before the observation is stopped.
			.doOnError(receiverObservation::error)
			.doOnTerminate(receiverObservation::stop)
			// Expose the observation to the operators above through the Reactor context.
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
	});
}

In this example:

  1. When a record is received, an observation is created.

  2. If there’s an upstream observation, it will be part of the KafkaRecordReceiverContext.

  3. A Mono is created with Mono.deferContextual, so that its construction is deferred until the Reactor context is available.

  4. When the map operation is invoked, the context has access to the correct observation.

  5. The result of the flatMap operation is sent back to the binding as Flux<Message<?>>.

  6. The outbound record carries the same observability headers as the record received on the input binding.

Example: Consumer with Observability

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry,
		@Value("${spring.kafka.bootstrap-servers}") String bootstrap) {

	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
				null,
				KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
				() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
				observationRegistry)
			// observe() starts the observation, runs the action, and stops the observation afterwards.
			.observe(() -> System.out.println(record)))
		.subscribe();
}

In this case:

  1. Since there’s no output binding, doOnNext is used on the Flux instead of flatMap.

  2. The direct call to observe starts the observation, runs the given action, and stops the observation when the action finishes.
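
The start-and-stop behavior described in the second point can be pictured with a small stand-in class. This is a simplified model that assumes the usual sequence of start, run, record error, stop. SketchObservation is a hypothetical class written for this illustration, not Micrometer's Observation, and scope handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that records lifecycle events instead of reporting metrics or spans.
final class SketchObservation {

    final List<String> events = new ArrayList<>();

    // Runs the action between a start and a stop event.
    // A failure is recorded before stop and then rethrown to the caller.
    void observe(Runnable action) {
        events.add("start");
        try {
            action.run();
        } catch (RuntimeException e) {
            events.add("error:" + e.getMessage());
            throw e;
        } finally {
            events.add("stop"); // always reached, on success and on failure
        }
    }
}
```

With this model, a successful action leaves the events start and stop, and an action that throws leaves start, the error, and then stop. Wrapping the action this way means that callers, such as the doOnNext callback above, do not have to stop the observation themselves.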