For the latest stable version, please use spring-cloud-stream 4.2.0! |
Partition support on the outbound
A Kafka Streams processor usually sends the processed output into an outbound Kafka topic.
If the outbound topic is partitioned and the processor needs to send the outgoing data into particular partitions, the applications needs to provide a bean of type StreamPartitioner
.
See StreamPartitioner for more details.
Let’s see some examples.
This is the same processor we already saw multiple times,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
Here is the output binding destination:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
If the topic outputTopic
has 4 partitions, if you don’t provide a partitioning strategy, Kafka Streams will use default partitioning strategy which may not be the outcome you want depending on the particular use case.
Let’s say, you want to send any key that matches to spring
to partition 0, cloud
to partition 1, stream
to partition 2, and everything else to partition 3.
This is what you need to do in the application.
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
This is a rudimentary implementation, however, you have access to the key/value of the record, the topic name and the total number of partitions. Therefore, you can implement complex partitioning strategies if need be.
You also need to provide this bean name along with the application configuration.
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
Each output topic in the application needs to be configured separately like this.