Partition support on the outbound

A Kafka Streams processor usually sends its processed output to an outbound Kafka topic. If the outbound topic is partitioned and the processor needs to send outgoing data to particular partitions, the application needs to provide a bean of type StreamPartitioner. See StreamPartitioner for more details. Let’s look at an example.

This is the same processor we have already seen multiple times:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

Here is the output binding destination:

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

Suppose the topic outputTopic has 4 partitions. If you don’t provide a partitioning strategy, Kafka Streams uses its default partitioning strategy, which may not give the outcome you want for your particular use case. Let’s say you want to send any record whose key is spring to partition 0, cloud to partition 1, stream to partition 2, and everything else to partition 3. This is what you need to do in the application:

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
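    // t = topic name, k = record key, v = record value, n = number of partitions
    return (t, k, v, n) -> {
        // Constant-first equals() avoids a NullPointerException for records with a null key.
        if ("spring".equals(k)) {
            return 0;
        }
        else if ("cloud".equals(k)) {
            return 1;
        }
        else if ("stream".equals(k)) {
            return 2;
        }
        else {
            // Every other key, including null, goes to the last of the 4 partitions.
            return 3;
        }
    };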
}

This is a rudimentary implementation; however, you have access to the record’s key and value, the topic name, and the total number of partitions. Therefore, you can implement more complex partitioning strategies if need be.
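To illustrate how the partition count can feed into the decision, here is a minimal sketch of a slightly richer rule. It is written as a plain static method so that it runs without Kafka on the classpath; the class name PinnedKeyPartitioning, the method choosePartition, and the fallback behaviour for small topics are assumptions made for this illustration and are not part of the binder or of Kafka Streams.

```java
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical helper: takes the same four inputs a partitioner lambda
 * receives (topic, key, value, number of partitions) and picks a partition.
 */
final class PinnedKeyPartitioning {

    // Keys that should always land on a fixed partition (taken from the example above).
    private static final Map<String, Integer> PINNED = Map.of(
            "spring", 0,
            "cloud", 1,
            "stream", 2);

    private PinnedKeyPartitioning() {
    }

    static int choosePartition(String topic, String key, Object value, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Map.of(...) rejects null lookups, so guard against null keys first.
        Integer pinned = (key == null) ? null : PINNED.get(key);
        if (pinned != null && pinned < numPartitions) {
            return pinned;
        }
        // Partitions 0..reserved-1 are kept for pinned keys; the rest are shared.
        int reserved = Math.min(PINNED.size(), numPartitions);
        int free = numPartitions - reserved;
        int hash = Objects.hashCode(key); // 0 for a null key
        if (free == 0) {
            // Assumption: with no spare partitions, spread over all of them.
            return Math.floorMod(hash, numPartitions);
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return reserved + Math.floorMod(hash, free);
    }
}
```

With 4 partitions this behaves like the lambda shown earlier: the three pinned keys go to partitions 0 to 2 and every other key goes to partition 3. With more partitions, the remaining keys are spread over partitions 3 and up. A StreamPartitioner bean could delegate to it from its lambda, for example (t, k, v, n) -> PinnedKeyPartitioning.choosePartition(t, k, v, n).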

You also need to provide the name of this bean in the application configuration:

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

Each output topic in the application needs to be configured separately like this.
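As a sketch of what this per-output configuration might look like, assume a processor with two output bindings, process-out-0 and process-out-1. The second binding, its destination anotherOutputTopic, and the bean name anotherStreamPartitioner are hypothetical names used only for illustration:

```
# First output binding (from the example above)
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

# Second output binding (hypothetical), with its own partitioner bean
spring.cloud.stream.bindings.process-out-1.destination: anotherOutputTopic
spring.cloud.stream.kafka.streams.bindings.process-out-1.producer.streamPartitionerBeanName: anotherStreamPartitioner
```

Each streamPartitionerBeanName value is expected to match the name of a StreamPartitioner bean defined in the application.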