Partitioning with the Kafka Binder

Apache Kafka supports topic partitioning natively.

Sometimes it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing (all messages for a particular customer should go to the same partition).

The following example shows how to configure the producer and consumer side:

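import java.util.Random;
import java.util.function.Supplier;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    // Sample payloads; each value also serves as its own partition key.
    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(WebApplicationType.NONE)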
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12

Keep in mind that, because Apache Kafka supports partitioning natively, there is no need to rely on binder partitioning as described above unless you use custom partition keys, as in the example, or an expression that involves the payload itself. The binder-provided partition selection is otherwise intended for middleware technologies that do not support native partitioning. The example above uses a custom key called partitionKey as the determining factor for the partition, so binder partitioning is appropriate in this case.

When using native Kafka partitioning, that is, when you do not provide the partition-key-expression, Apache Kafka selects the partition, which by default is the hash of the record key modulo the number of available partitions. To add a key to an outbound record, set the KafkaHeaders.KEY header to the desired key value in a spring-messaging Message<?>. By default, when no record key is provided, Apache Kafka chooses a partition based on the logic described in the Apache Kafka Documentation.

The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups. The above configuration supports up to 12 consumer instances (6 if their concurrency is 2, 4 if their concurrency is 3, and so on). It is generally best to “over-provision” the partitions to allow for future increases in consumers or concurrency.

The producer configuration above uses the binder's default partitioning (key.hashCode() % partitionCount). This may or may not provide a suitably balanced distribution, depending on the key values. In particular, this partitioning strategy differs from the default used by a standalone Kafka producer, such as the one used by Kafka Streams, so the same key value may be distributed differently across partitions when produced by those clients. You can override this default by using the partitionSelectorExpression or partitionSelectorClass properties.
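
The difference between the two strategies can be made concrete with a small, dependency-free sketch. It assumes that the binder's default reduces to the absolute value of key.hashCode() modulo the partition count (the absolute value is an assumption that keeps the index non-negative), and that a standalone Kafka producer hashes the UTF-8 bytes of a String key with murmur2, as the Kafka clients do for keyed records. The class and method names are illustrative and belong to neither library.

```java
import java.nio.charset.StandardCharsets;

class PartitionStrategySketch {

    // Binder-style selection: Java hashCode of the key, modulo the partition count.
    // The abs/MIN_VALUE handling is an assumption to guard against negative hash codes.
    static int binderPartition(Object key, int partitionCount) {
        int hash = key.hashCode();
        if (hash == Integer.MIN_VALUE) {
            hash = 0; // Math.abs(Integer.MIN_VALUE) would still be negative
        }
        return Math.abs(hash) % partitionCount;
    }

    // Kafka-style selection: murmur2 over the serialized key bytes,
    // masked to a non-negative int, modulo the partition count.
    static int kafkaPartition(String key, int partitionCount) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % partitionCount;
    }

    // 32-bit murmur2 hash, following the variant used by the Kafka clients.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        int blocks = length / 4;
        for (int i = 0; i < blocks; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3; // index of the first leftover byte
        switch (length % 4) { // fall-through between cases is intentional
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        String[] keys = {"foo1", "bar1", "qux1"};
        for (String key : keys) {
            System.out.println(key + ": binder=" + binderPartition(key, 12)
                    + ", kafka=" + kafkaPartition(key, 12));
        }
    }
}
```

Under the hashCode-based formula, "foo1" (hash code 3148843) lands on partition 7 of 12. The murmur2-based index for the same key is computed independently, so the two values are not guaranteed to match.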

Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side. Kafka allocates partitions across the instances.

The partition count of a Kafka topic may change at runtime (for example, because of an administration task). After such a change, the calculated partitions differ (for example, the new partitions are then used). Since version 4.0.3, Spring Cloud Stream supports runtime changes of the partition count. See also the 'spring.kafka.producer.properties.metadata.max.age.ms' parameter to configure the update interval. Due to some limitations, a 'partition-key-expression' that references the 'payload' of a message cannot be used; the mechanism is disabled in that case. The overall behavior is disabled by default and can be enabled with the configuration parameter 'producer.dynamicPartitionUpdatesEnabled=true'.
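
To illustrate why the calculated partitions differ once the partition count changes, the following sketch applies the key.hashCode() % partitionCount formula mentioned in this section to a few sample keys, before and after a hypothetical increase from 12 to 16 partitions. The class name, the method names, and the new count of 16 are assumptions chosen for illustration; the sketch does not use the binder itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionCountChangeSketch {

    // hashCode-modulo formula from this section; Math.abs is a safety assumption
    // so that negative hash codes still yield a valid index.
    static int partitionFor(String key, int partitionCount) {
        return Math.abs(key.hashCode() % partitionCount);
    }

    // Returns the keys whose partition changes, each mapped to "old -> new".
    static Map<String, String> movedKeys(String[] keys, int oldCount, int newCount) {
        Map<String, String> moved = new LinkedHashMap<>();
        for (String key : keys) {
            int before = partitionFor(key, oldCount);
            int after = partitionFor(key, newCount);
            if (before != after) {
                moved.put(key, before + " -> " + after);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        String[] keys = {"foo1", "bar1", "qux1", "foo2", "bar2", "qux2"};
        movedKeys(keys, 12, 16).forEach((key, change) ->
                System.out.println(key + ": " + change));
    }
}
```

For instance, "foo1" moves from partition 7 to partition 11 when the count grows from 12 to 16, so records with the same key produced before and after the change end up on different partitions.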

The following Spring Boot application listens to a Kafka stream and prints (to the console) the partition ID to which each message goes:

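import java.util.function.Consumer;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@SpringBootApplication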
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

You can add instances as needed. Kafka rebalances the partition allocations. If the instance count (or instance count * concurrency) exceeds the number of partitions, some consumers are idle.
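
The capacity arithmetic behind these statements can be sketched in a few lines. The sketch assumes that, within a consumer group, each partition is consumed by at most one consumer thread, so any threads beyond the partition count stay idle. The class and method names are hypothetical.

```java
class ConsumerCapacitySketch {

    // Largest number of instances whose threads can all be assigned a partition,
    // given the topic's partition count and the per-instance concurrency.
    static int maxBusyInstances(int partitions, int concurrency) {
        return partitions / concurrency;
    }

    // Number of consumer threads left without a partition.
    static int idleConsumers(int partitions, int instances, int concurrency) {
        return Math.max(0, instances * concurrency - partitions);
    }

    public static void main(String[] args) {
        int partitions = 12; // matches partition-count in the producer configuration
        for (int concurrency = 1; concurrency <= 3; concurrency++) {
            System.out.println("concurrency " + concurrency + " -> up to "
                    + maxBusyInstances(partitions, concurrency) + " instances");
        }
        System.out.println("5 instances x 3 threads -> "
                + idleConsumers(partitions, 5, 3) + " idle consumers");
    }
}
```

With 12 partitions this yields 12, 6, and 4 instances for concurrency 1, 2, and 3, and 3 idle consumers when 5 instances each run 3 threads.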