Consuming Records

In the uppercase function above, we consume the record as Flux<String> and produce it as Flux<String>. There may be occasions when you need to receive the record in its original received format, the ReceiverRecord. Here is such a function.

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

In this function, note that we consume the record as Flux<ReceiverRecord<byte[], byte[]>> and produce it as Flux<String>. ReceiverRecord is the basic received record type in Reactor Kafka, a specialized Kafka ConsumerRecord. When using the reactive Kafka binder, the above function gives you access to the ReceiverRecord for each incoming record. However, in this case, you need to provide a custom implementation of RecordMessageConverter. By default, the reactive Kafka binder uses a MessagingMessageConverter that converts the payload and headers from the ConsumerRecord. Therefore, by the time your handler method receives it, the payload has already been extracted from the received record and passed to the method, as in the first function we looked at above. By providing a custom RecordMessageConverter implementation in the application, you can override this default behavior. For example, if you want to consume the record as a raw Flux<ReceiverRecord<byte[], byte[]>>, you can provide the following bean definition in the application.

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            // Pass the whole record through as the payload instead of extracting its value.
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

Then, you need to instruct the framework to use this converter for the required binding. Here is an example based on our lowercase function.

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord

lowercase-in-0 is the input binding name for our lowercase function. For the outbound binding (lowercase-out-0), we still use the regular MessagingMessageConverter.
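
Putting the binding configuration together, the properties for this example might look like the following sketch. The topic names words-in and words-out are hypothetical placeholders, and spring.cloud.function.definition is typically only needed when the application defines more than one functional bean.

```properties
# Select the function to bind (optional when it is the only functional bean).
spring.cloud.function.definition=lowercase

# Hypothetical topic names; replace with the topics used by your application.
spring.cloud.stream.bindings.lowercase-in-0.destination=words-in
spring.cloud.stream.bindings.lowercase-out-0.destination=words-out

# Use the custom converter on the input binding only.
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord
```

Only the input binding names the converter bean, so the output binding keeps the default conversion.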

In the toMessage implementation above, we receive the raw ConsumerRecord (a ReceiverRecord, since we are in a reactive binder context) and wrap it inside a Message. The payload of that message, which is the ReceiverRecord, is then provided to the user method.
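
Because the user method now receives the whole record, turning the raw value bytes into text is left to the application. The following is a minimal sketch of that step as a plain helper that can be tested without a broker. The class name PayloadText is hypothetical, and the sketch assumes UTF-8 encoded values and chooses to map a null value to an empty string; neither choice comes from the binder itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper, not part of the binder: isolates the value conversion
// so that it can be unit tested without Kafka.
final class PayloadText {

    private PayloadText() {
    }

    // Decodes the raw record value as UTF-8 (an assumption about the producer)
    // and lowercases it independently of the JVM default locale.
    // A null value (for example, a tombstone record) becomes an empty string
    // here; an application may prefer to filter such records out instead.
    static String lowercase(byte[] value) {
        if (value == null) {
            return "";
        }
        return new String(value, StandardCharsets.UTF_8).toLowerCase(Locale.ROOT);
    }
}
```

With this helper, the mapping step in the lowercase function could be written as rec -> PayloadText.lowercase(rec.value()). Naming the charset explicitly avoids depending on the platform default, which new String(byte[]) would otherwise use.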

If reactiveAutoCommit is false (the default), call rec.receiverOffset().acknowledge() (or commit()) to cause the offset to be committed. If reactiveAutoCommit is true, the flux supplies ConsumerRecords instead. Refer to the reactor-kafka documentation and Javadocs for more information.