Consuming Records
In the uppercase function above, we consume the record as Flux<String> and then produce it as Flux<String>.
There might be occasions in which you need to receive the record in its original received format, the ReceiverRecord.
Here is such a function.
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
Note that in this function we consume the record as Flux<ReceiverRecord<byte[], byte[]>> and then produce it as Flux<String>.
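The conversion performed inside the map call can be tried out on its own. The following is a minimal sketch, not part of the binder: the class name LowercaseValue and its lowercase helper are hypothetical, and it assumes the record values are UTF-8 encoded text. The function above calls new String(byte[]), which decodes with the platform default charset, so naming the charset explicitly is a choice made for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper that mirrors the body of the map() call in the lowercase function.
class LowercaseValue {

    // Decodes the raw record value as UTF-8 (an assumption) and lowercases it
    // independently of the JVM's default locale.
    static String lowercase(byte[] value) {
        return new String(value, StandardCharsets.UTF_8).toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        byte[] raw = "Hello Kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(lowercase(raw));
    }
}
```

Inside the function, the same helper could be applied as s.map(rec -> LowercaseValue.lowercase(rec.value())).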
ReceiverRecord is the basic received record, which is a specialized Kafka ConsumerRecord in Reactor Kafka.
When using the reactive Kafka binder, the above function will give you access to the ReceiverRecord type for each incoming record.
However, in this case, you need to provide a custom implementation for a RecordMessageConverter.
By default, the reactive Kafka binder uses a MessagingMessageConverter that converts the payload and headers from the ConsumerRecord.
Therefore, by the time your handler method receives it, the payload has already been extracted from the received record and is passed on to the method, as in the case of the first function we looked at above.
By providing a custom RecordMessageConverter implementation in the application, you can override the default behavior.
For example, if you want to consume the record as a raw Flux<ReceiverRecord<byte[], byte[]>>, then you can provide the following bean definition in the application.
@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}
Then, you need to instruct the framework to use this converter for the required binding.
Here is an example based on our lowercase function.
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord
lowercase-in-0 is the input binding name for our lowercase function.
For the outbound binding (lowercase-out-0), we still use the regular MessagingMessageConverter.
In the toMessage implementation above, we receive the raw ConsumerRecord (a ReceiverRecord, since we are in a reactive binder context) and then wrap it inside a Message.
The payload of that message, which is the ReceiverRecord, is then provided to the user method.
If reactiveAutoCommit is false (the default), call rec.receiverOffset().acknowledge() (or commit()) to cause the offset to be committed; if reactiveAutoCommit is true, the flux supplies ConsumerRecords instead.
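As an illustration of the acknowledgment step, here is a sketch of the lowercase function with reactiveAutoCommit left at false. It assumes the fullRawReceivedRecord converter is configured for the lowercase-in-0 binding, that record values are non-null UTF-8 text, and that acknowledging before the result is published downstream is acceptable for the application; the configuration class name LowercaseConfiguration is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;

// Hypothetical configuration class holding the function bean.
@Configuration
class LowercaseConfiguration {

    @Bean
    public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
        return records -> records.map(rec -> {
            // Assumes a non-null, UTF-8 encoded value.
            String value = new String(rec.value(), StandardCharsets.UTF_8).toLowerCase();
            // Mark this record as processed so that its offset becomes eligible for commit.
            rec.receiverOffset().acknowledge();
            return value;
        });
    }
}
```

In Reactor Kafka, acknowledge() only marks the offset as ready to be committed, and the commit happens later according to the receiver's commit settings, whereas commit() returns a Mono<Void> that must be subscribed to (for example, by composing it into the returned flux) before anything is committed.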
Refer to the reactor-kafka documentation and javadocs for more information.