Timestamp extractor

Kafka Streams lets you control how consumer records are processed based on different notions of time. By default, it uses the timestamp metadata embedded in each consumer record. You can change this default behavior by providing a different TimestampExtractor implementation per input binding, as the following example shows.

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

Then set the name of the TimestampExtractor bean above on each consumer binding that should use it.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor

If you do not set a custom timestamp extractor on an input consumer binding, that binding uses the default settings.
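
The example above uses the built-in WallclockTimestampExtractor. If you need event time taken from the payload instead, you could write your own implementation of the Kafka Streams TimestampExtractor interface. The following is a minimal sketch, not a definitive implementation: the class name OrderTimestampExtractor, the Timestamped interface, and its getEventTimeMillis() method are hypothetical names introduced for illustration, and it assumes your payload type can expose an epoch-millisecond timestamp.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: prefers a timestamp carried in the payload and
// falls back to the record metadata, then to the partition time.
class OrderTimestampExtractor implements TimestampExtractor {

    // Hypothetical contract for payloads that carry their own event time
    // (epoch milliseconds). This is an assumption, not part of the binder.
    interface Timestamped {
        long getEventTimeMillis();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof Timestamped) {
            long eventTime = ((Timestamped) value).getEventTimeMillis();
            if (eventTime >= 0) {
                return eventTime;
            }
        }
        // No usable payload timestamp: use the timestamp embedded in the record.
        long recordTime = record.timestamp();
        if (recordTime >= 0) {
            return recordTime;
        }
        // Last resort: the partition time passed in by Kafka Streams.
        // This may itself be negative if no valid timestamp has been seen yet.
        return partitionTime;
    }
}
```

To use such a class, you would expose an instance of it as a TimestampExtractor bean, in the same way as the timestampExtractor bean above, and reference that bean's name in the timestampExtractorBeanName property of the bindings that should use it.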