Basic Example using the Reactive Kafka Binder

This section shows some basic code snippets for writing a reactive Kafka application with the reactive binder and explains the details around them.

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

You can use the above uppercase function with both the message-channel-based Kafka binder (spring-cloud-stream-binder-kafka) and the reactive Kafka binder (spring-cloud-stream-binder-kafka-reactive), the topic of discussion in this section. When you use this function with the regular Kafka binder, you get reactive streams only within the execution of your function, even though the application uses reactive types (that is, in the uppercase function). Outside the function’s execution context, there are no reactive benefits, since the underlying binder is not based on the reactive stack. Therefore, although this might look like a full end-to-end reactive stack, the application is only partially reactive.

Now assume that you use the proper reactive binder for Kafka, spring-cloud-stream-binder-kafka-reactive, with the above function. This binder implementation gives the full reactive benefits all the way from consumption at the top end to publishing at the bottom end of the chain. This is because the underlying binder is built on top of Reactor Kafka's core APIs. On the consumer side, it uses KafkaReceiver, which is a reactive implementation of a Kafka consumer. Similarly, on the producer side, it uses the KafkaSender API, which is the reactive implementation of a Kafka producer. Since the foundations of the reactive Kafka binder are built upon a proper reactive Kafka API, applications get the full benefits of using reactive technologies. Capabilities such as automatic back pressure are built in for the application when it uses this reactive Kafka binder.

Starting with version 4.0.2, you can customize the ReceiverOptions and SenderOptions by providing one or more ReceiverOptionsCustomizer or SenderOptionsCustomizer beans, respectively. They are BiFunctions that receive the binding name and the initial options and return the customized options. The interfaces extend Ordered, so when more than one customizer is present, they are applied in the required order.
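
The ordering behavior described above can be illustrated with a small, dependency-free sketch. It does not use the binder's real types: OptionsCustomizerSketch and CustomizerChainSketch are hypothetical stand-ins, the options are modeled as a plain Map, and the "commitBatchSize" key and the "uppercase-in-0" binding name are only illustrative values. The sketch assumes that lower order values are applied first, as with Spring's Ordered convention.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the binder's customizer interfaces: a BiFunction of
// (binding name, options) that also carries an order. Not the real Spring types.
interface OptionsCustomizerSketch
        extends BiFunction<String, Map<String, Object>, Map<String, Object>> {

    // Assumption: lower values are applied first, as with Spring's Ordered.
    default int getOrder() {
        return 0;
    }
}

final class CustomizerChainSketch {

    // Sorts the customizers by order and feeds each one the result of the previous one.
    static Map<String, Object> customize(String bindingName,
            Map<String, Object> initialOptions,
            List<OptionsCustomizerSketch> customizers) {
        List<OptionsCustomizerSketch> sorted = new ArrayList<>(customizers);
        sorted.sort(Comparator.comparingInt(OptionsCustomizerSketch::getOrder));
        Map<String, Object> options = initialOptions;
        for (OptionsCustomizerSketch customizer : sorted) {
            options = customizer.apply(bindingName, options);
        }
        return options;
    }

    // Returns a copy of the options with one entry added or replaced.
    static Map<String, Object> with(Map<String, Object> options, String key, Object value) {
        Map<String, Object> copy = new LinkedHashMap<>(options);
        copy.put(key, value);
        return copy;
    }

    public static void main(String[] args) {
        // Order 0 (default): applies a general setting to every binding.
        OptionsCustomizerSketch defaults =
                (binding, opts) -> with(opts, "commitBatchSize", 100);
        // Order 1: runs later and overrides the setting for one binding only.
        OptionsCustomizerSketch perBinding = new OptionsCustomizerSketch() {
            @Override
            public Map<String, Object> apply(String binding, Map<String, Object> opts) {
                return binding.equals("uppercase-in-0")
                        ? with(opts, "commitBatchSize", 10)
                        : opts;
            }

            @Override
            public int getOrder() {
                return 1;
            }
        };
        // Registration order does not matter; the order value decides.
        System.out.println(
                customize("uppercase-in-0", Map.of(), List.of(perBinding, defaults)));
    }
}
```

Because each customizer receives the binding name, a later customizer can refine the result of an earlier one for a single binding while leaving all other bindings untouched.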

The binder does not commit offsets by default. Starting with version 4.0.2, the KafkaHeaders.ACKNOWLEDGMENT header contains a ReceiverOffset object, which allows you to cause the offset to be committed by calling its acknowledge() or commit() methods.

@Bean
public Consumer<Flux<Message<String>>> consume() {
    return flux -> flux
            .doOnNext(msg -> {
                process(msg.getPayload());
                // Acknowledge the record so that its offset becomes eligible for commit.
                msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
            })
            .subscribe();
}

Refer to the reactor-kafka documentation and javadocs for more information.

In addition, starting with version 4.0.3, you can set the Kafka consumer property reactiveAtMostOnce to true, and the binder automatically commits the offsets before the records returned by each poll are processed. Also starting with version 4.0.3, you can set the consumer property reactiveAutoCommit to true, and the binder automatically commits the offsets after the records returned by each poll are processed. In these cases, the acknowledgment header is not present.

Version 4.0.2 also provided reactiveAutoCommit, but the implementation was incorrect: it behaved similarly to reactiveAtMostOnce.
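
To make the difference between committing before and after processing concrete, here is a toy model that involves no Kafka at all. It is only a sketch of the timing described above, not of the binder's implementation: CommitTimingSketch, its run method, and the simulated failure at offset 4 are all hypothetical, and "committed" simply stands for the offset a consumer would resume from after a restart.

```java
import java.util.List;
import java.util.function.Consumer;

// Toy model of committing offsets before or after each poll is processed.
// No Kafka is involved; all names here are hypothetical.
final class CommitTimingSketch {

    // Processes the polled batches in order and returns the committed position
    // at the moment processing stops.
    // commitBeforeProcessing = true models reactiveAtMostOnce,
    // commitBeforeProcessing = false models reactiveAutoCommit.
    static int run(List<List<Integer>> polls, boolean commitBeforeProcessing,
            Consumer<Integer> processor) {
        int committed = 0;
        for (List<Integer> batch : polls) {
            if (batch.isEmpty()) {
                continue;
            }
            // The position to resume from once this whole batch counts as done.
            int nextOffset = batch.get(batch.size() - 1) + 1;
            if (commitBeforeProcessing) {
                committed = nextOffset;
            }
            try {
                batch.forEach(processor);
            } catch (RuntimeException e) {
                // Simulated crash: nothing after this point is processed.
                return committed;
            }
            if (!commitBeforeProcessing) {
                committed = nextOffset;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<List<Integer>> polls = List.of(List.of(0, 1, 2), List.of(3, 4, 5));
        Consumer<Integer> failsAtOffset4 = offset -> {
            if (offset == 4) {
                throw new IllegalStateException("simulated failure");
            }
        };
        System.out.println("commit before: " + run(polls, true, failsAtOffset4));
        System.out.println("commit after:  " + run(polls, false, failsAtOffset4));
    }
}
```

In this model, committing before processing leaves the position at 6, so offsets 4 and 5 are never processed after a restart (at most once). Committing after processing leaves the position at 3, so the second poll is delivered again and offset 3 is processed twice.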

The following is an example of how to use reactiveAutoCommit.

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

Note that reactor-kafka returns a Flux<Flux<ConsumerRecord<?, ?>>> when using auto commit. Given that Spring has no access to the contents of the inner flux, the application must deal with the native ConsumerRecord; there is no message conversion or conversion service applied to the contents. This requires the use of native decoding (by specifying a Deserializer of the appropriate type in the configuration) to return record keys/values of the desired types.