Connecting to Kafka

Starting with version 2.5, each of these extends KafkaResourceFactory. This allows changing the bootstrap servers at runtime by adding a Supplier<String> to their configuration: setBootstrapServersSupplier(() -> …​). This will be called for all new connections to get the list of servers. Consumers and Producers are generally long-lived. To close existing Producers, call reset() on the DefaultKafkaProducerFactory. To close existing Consumers, call stop() (and then start()) on the KafkaListenerEndpointRegistry and/or stop() and start() on any other listener container beans.

For convenience, the framework also provides an ABSwitchCluster which supports two sets of bootstrap servers; one of which is active at any time. Configure the ABSwitchCluster and add it to the producer and consumer factories, and the KafkaAdmin, by calling setBootstrapServersSupplier(). When you want to switch, call primary() or secondary() and call reset() on the producer factory to establish new connection(s); for consumers, stop() and start() all listener containers. When using @KafkaListeners, stop() and start() the KafkaListenerEndpointRegistry bean.

See the Javadocs for more information.

Factory Listeners

Starting with version 2.5, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory can be configured with a Listener to receive notifications whenever a producer or consumer is created or closed.

Producer Factory Listener
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
Consumer Factory Listener
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

In each case, the id is created by appending the client-id property (obtained from the metrics() after creation) to the factory beanName property, separated by ..

These listeners can be used, for example, to create and bind a Micrometer KafkaClientMetrics instance when a new client is created (and close it when the client is closed).

The framework provides listeners that do exactly that; see Micrometer Native Metrics.