This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.3! |
Observability in Reactive Kafka Binder
This section describes how Micrometer-based observability is enabled in the reactive Kafka binder.
Producer Binding
There is built-in support for observability in producer binding. To enable it, set the following property:
spring.cloud.stream.kafka.binder.enable-observation
When this property is set to true
, you can observe the publishing of records.
Both publishing records using StreamBridge
and regular Supplier<?>
beans can be observed.
Consumer Binding
Enabling observability on the consumer side is more complex than on the producer side. There are two starting points for consumer binding:
-
A topic where data is published via a producer binding
-
A topic where data is produced outside of Spring Cloud Stream
In the first case, the application ideally wants to carry the observability headers down to the consumer inbound. In the second case, if there was no upstream observation started, it will start a new observation.
Example: Function with Observability
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
In this example:
-
When a record is received, an observation is created.
-
If there’s an upstream observation, it will be part of the
KafkaRecordReceiverContext
. -
A
Mono
is created with context deferred. -
When the
map
operation is invoked, the context has access to the correct observation. -
The result of the
flatMap
operation is sent back to the binding asFlux<Message<?>>
. -
The outbound record will have the same observability headers from the input binding.
Example: Consumer with Observability
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
In this case:
-
Since there’s no output binding,
doOnNext
is used on theFlux
instead offlatMap
. -
The direct call to
observe
starts the observation and properly shuts it down when finished.