Rebalancing Listeners
ContainerProperties has a property called consumerRebalanceListener, which takes an implementation of the Kafka client’s ConsumerRebalanceListener interface.
If this property is not provided, the container configures a logging listener that logs rebalance events at the INFO level.
The framework also adds a sub-interface, ConsumerAwareRebalanceListener.
The following listing shows the ConsumerAwareRebalanceListener interface definition:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}
Notice that there are two callbacks when partitions are revoked. The first is called immediately. The second is called after any pending offsets are committed. This is useful if you wish to maintain offsets in some external repository, as the following example shows:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // pending offsets have been committed; save the current position externally
        // ...
            store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // use the consumer-aware overload so that the consumer is available for seeking
        // ...
            consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }

});
Starting with version 2.4, a new method, onPartitionsLost(), has been added (similar to the method with the same name in ConsumerRebalanceListener).
The default implementation on ConsumerRebalanceListener simply calls onPartitionsRevoked.
The default implementation on ConsumerAwareRebalanceListener does nothing.
When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call onPartitionsRevoked from onPartitionsLost.
If you implement ConsumerRebalanceListener, you should override the default method.
This is because the listener container calls its own onPartitionsRevoked from its implementation of onPartitionsLost after calling the method on your implementation.
If your implementation delegates to the default behavior, onPartitionsRevoked is called twice each time the Consumer calls that method on the container’s listener.
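The double invocation can be illustrated with a small, self-contained model. This is only a sketch of the call order described above, not the container's actual code: RebalanceListener, ContainerListener, CountingListener and SafeCountingListener are hypothetical stand-ins (partitions are plain strings), and no Kafka or Spring types are used.

```java
import java.util.Collections;
import java.util.List;

public class PartitionsLostModel {

    /** Simplified stand-in for the Kafka client's ConsumerRebalanceListener. */
    interface RebalanceListener {

        void onPartitionsRevoked(List<String> partitions);

        // Mirrors the client's default: a loss is treated as a revocation.
        default void onPartitionsLost(List<String> partitions) {
            onPartitionsRevoked(partitions);
        }
    }

    /** Hypothetical model of the container's own listener wrapping the user's listener. */
    static class ContainerListener implements RebalanceListener {

        private final RebalanceListener userListener;

        ContainerListener(RebalanceListener userListener) {
            this.userListener = userListener;
        }

        @Override
        public void onPartitionsRevoked(List<String> partitions) {
            userListener.onPartitionsRevoked(partitions);
        }

        @Override
        public void onPartitionsLost(List<String> partitions) {
            userListener.onPartitionsLost(partitions); // the user's callback runs first
            onPartitionsRevoked(partitions);           // then the container's own revocation handling
        }
    }

    /** User listener that inherits the default onPartitionsLost and counts revocations. */
    static class CountingListener implements RebalanceListener {

        int revokedCalls;

        @Override
        public void onPartitionsRevoked(List<String> partitions) {
            revokedCalls++;
        }
    }

    /** User listener that overrides onPartitionsLost without delegating to onPartitionsRevoked. */
    static class SafeCountingListener extends CountingListener {

        @Override
        public void onPartitionsLost(List<String> partitions) {
            // handle the loss here; do not call onPartitionsRevoked
        }
    }

    /** Simulates one "partitions lost" event and returns how often the user's onPartitionsRevoked ran. */
    static int revokedCallsAfterLoss(CountingListener userListener) {
        ContainerListener container = new ContainerListener(userListener);
        container.onPartitionsLost(Collections.singletonList("topic-0"));
        return userListener.revokedCalls;
    }

    public static void main(String[] args) {
        System.out.println(revokedCallsAfterLoss(new CountingListener()));     // prints 2
        System.out.println(revokedCallsAfterLoss(new SafeCountingListener())); // prints 1
    }
}
```

In this model, the listener that inherits the default sees its revocation callback twice for a single loss event, while the listener that overrides onPartitionsLost sees it once.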