Enforcing Consumer Rebalance
Kafka clients now support an option to trigger an enforced rebalance.
Starting with version 3.1.2, Spring for Apache Kafka provides an option to invoke this API on the Kafka consumer via the message listener container.
Calling this API only signals the Kafka consumer to trigger an enforced rebalance; the actual rebalance occurs as part of the next poll() operation.
If a rebalance is already in progress, requesting an enforced rebalance is a no-op.
The caller must wait for the current rebalance to complete before invoking another one.
See the javadocs for enforceRebalance for more details.
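The deferred behavior described above can be illustrated with a small, self-contained model. This is only a sketch under simplifying assumptions: SketchConsumer and all of its members are hypothetical names invented for illustration, not part of the Kafka or Spring for Apache Kafka APIs, and the model assumes a rebalance starts on one poll() and completes on the next.

```java
// A simplified, single-threaded model of the semantics described above.
// SketchConsumer is a hypothetical stand-in, NOT the real KafkaConsumer.
class EnforcedRebalanceSketch {

    static class SketchConsumer {
        private boolean rebalanceRequested;
        private boolean rebalanceInProgress;
        private int completedRebalances;

        // Only records the request; no rebalance happens here.
        void enforceRebalance() {
            if (rebalanceInProgress) {
                return; // no-op while a rebalance is already in progress
            }
            rebalanceRequested = true;
        }

        // Assumption for this model: a rebalance starts on one poll
        // and completes on the following poll.
        void poll() {
            if (rebalanceInProgress) {
                rebalanceInProgress = false;
                completedRebalances++;
            } else if (rebalanceRequested) {
                rebalanceRequested = false;
                rebalanceInProgress = true;
            }
        }

        boolean isRebalanceInProgress() {
            return rebalanceInProgress;
        }

        int completedRebalances() {
            return completedRebalances;
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();

        consumer.enforceRebalance(); // request only; nothing has happened yet
        System.out.println("in progress before poll: " + consumer.isRebalanceInProgress());

        consumer.poll();             // the rebalance starts here
        consumer.enforceRebalance(); // ignored: a rebalance is in progress
        consumer.poll();             // the rebalance completes
        consumer.poll();             // nothing pending, nothing happens

        System.out.println("completed rebalances: " + consumer.completedRebalances());
    }
}
```

In this model, the second enforceRebalance() call has no effect because it arrives while the first rebalance is still in progress, so a caller that wants another rebalance has to issue the request again after the first one completes.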
The following code snippet shows the essence of enforcing a rebalance using the message listener container.
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}
As the code above shows, the application uses the KafkaListenerEndpointRegistry to gain access to the message listener container and then calls the enforceRebalance API on it.
When enforceRebalance is called on the listener container, the container delegates the call to the underlying Kafka consumer.
The Kafka consumer then triggers a rebalance as part of the next poll() operation.