Connecting to Kafka
- KafkaAdmin: see Configuring Topics
- ProducerFactory: see Sending Messages
- ConsumerFactory: see Receiving Messages
Starting with version 2.5, each of these extends KafkaResourceFactory.
This allows changing the bootstrap servers at runtime by adding a Supplier<String> to their configuration: setBootstrapServersSupplier(() -> …).
This will be called for all new connections to get the list of servers.
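To illustrate why a supplier (rather than a fixed string) makes runtime changes possible, here is a minimal, framework-free sketch. It only models the idea that the supplier is consulted each time a new connection is made. The class name, the `newConnectionTarget()` helper, and the broker addresses are hypothetical and are not part of the framework.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class BootstrapSupplierSketch {

    // Current server list; can be replaced at runtime (addresses are made up).
    static final AtomicReference<String> SERVERS =
            new AtomicReference<>("broker-a:9092");

    // Same shape as the argument expected by setBootstrapServersSupplier(...).
    static final Supplier<String> SUPPLIER = SERVERS::get;

    // Hypothetical stand-in for the point where a new connection is created:
    // the supplier is read at that moment, not cached up front.
    static String newConnectionTarget() {
        return SUPPLIER.get();
    }

    public static void main(String[] args) {
        System.out.println(newConnectionTarget());
        SERVERS.set("broker-b:9092,broker-c:9092");
        // Only connections created after the change see the new list.
        System.out.println(newConnectionTarget());
    }
}
```

Because the value is read lazily, clients that already exist keep their current connections until they are closed and recreated.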
Consumers and Producers are generally long-lived.
To close existing Producers, call reset() on the DefaultKafkaProducerFactory.
To close existing Consumers, call stop() (and then start()) on the KafkaListenerEndpointRegistry and/or stop() and start() on any other listener container beans.
For convenience, the framework also provides an ABSwitchCluster, which supports two sets of bootstrap servers, one of which is active at any time.
Configure the ABSwitchCluster and add it to the producer and consumer factories, and the KafkaAdmin, by calling setBootstrapServersSupplier().
When you want to switch, call primary() or secondary() and call reset() on the producer factory to establish new connection(s); for consumers, stop() and start() all listener containers.
When using @KafkaListeners, stop() and start() the KafkaListenerEndpointRegistry bean.
See the Javadocs for more information.
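The switching behavior described above can be pictured with a small stand-in written against plain Java. This is a sketch of the concept only, not the framework's ABSwitchCluster: the `TwoClusterSupplier` class and the broker addresses are hypothetical, and only the method names `primary()` and `secondary()` are taken from the text.

```java
import java.util.function.Supplier;

public class AbSwitchSketch {

    /** Hypothetical stand-in holding two server lists, one active at a time. */
    static final class TwoClusterSupplier implements Supplier<String> {
        private final String primaryServers;
        private final String secondaryServers;
        private volatile boolean usePrimary = true;

        TwoClusterSupplier(String primaryServers, String secondaryServers) {
            this.primaryServers = primaryServers;
            this.secondaryServers = secondaryServers;
        }

        void primary() {
            this.usePrimary = true;
        }

        void secondary() {
            this.usePrimary = false;
        }

        boolean isPrimary() {
            return this.usePrimary;
        }

        @Override
        public String get() {
            // Returns whichever server list is currently active.
            return this.usePrimary ? this.primaryServers : this.secondaryServers;
        }
    }

    public static void main(String[] args) {
        TwoClusterSupplier cluster =
                new TwoClusterSupplier("a1:9092,a2:9092", "b1:9092,b2:9092");
        System.out.println(cluster.get());
        cluster.secondary();
        System.out.println(cluster.get());
    }
}
```

Flipping the active side only changes what new connections receive. With the real classes, the switch would still be followed by reset() on the producer factory and stop() and start() on the listener containers, as described above.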
Factory Listeners
Starting with version 2.5, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory can be configured with a Listener to receive notifications whenever a producer or consumer is created or closed.

    // Producer factory listener
    interface Listener<K, V> {

        default void producerAdded(String id, Producer<K, V> producer) {
        }

        default void producerRemoved(String id, Producer<K, V> producer) {
        }

    }

    // Consumer factory listener
    interface Listener<K, V> {

        default void consumerAdded(String id, Consumer<K, V> consumer) {
        }

        default void consumerRemoved(String id, Consumer<K, V> consumer) {
        }

    }

In each case, the id is created by appending the client-id property (obtained from the metrics() after creation) to the factory beanName property, separated by a period (.).
These listeners can be used, for example, to create and bind a Micrometer KafkaClientMetrics
instance when a new client is created (and close it when the client is closed).
The framework provides listeners that do exactly that; see Micrometer Native Metrics.
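As a rough illustration of the add/remove pattern, the following framework-free sketch tracks one entry per live client, keyed by the listener id. The `Listener<P>` interface here is a hypothetical mirror of the producer-side callbacks, with `P` standing in for `Producer<K, V>`; `TrackingListener`, `listenerId`, and the bean name are likewise invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FactoryListenerSketch {

    /** Hypothetical mirror of the producer callbacks; P stands in for Producer<K, V>. */
    interface Listener<P> {

        default void producerAdded(String id, P producer) {
        }

        default void producerRemoved(String id, P producer) {
        }
    }

    /** Keeps one entry per live client, keyed by the listener id. */
    static final class TrackingListener<P> implements Listener<P> {

        final Map<String, P> active = new ConcurrentHashMap<>();

        @Override
        public void producerAdded(String id, P producer) {
            // A metrics binder would be created and bound at this point.
            this.active.put(id, producer);
        }

        @Override
        public void producerRemoved(String id, P producer) {
            // The matching metrics binder would be closed at this point.
            this.active.remove(id);
        }
    }

    /** Builds the id as described above: bean name, a period, then the client id. */
    static String listenerId(String beanName, String clientId) {
        return beanName + "." + clientId;
    }

    public static void main(String[] args) {
        TrackingListener<Object> listener = new TrackingListener<>();
        Object producer = new Object();
        String id = listenerId("myProducerFactory", "producer-1");

        listener.producerAdded(id, producer);
        System.out.println(listener.active.keySet());

        listener.producerRemoved(id, producer);
        System.out.println(listener.active.isEmpty());
    }
}
```

Keying by the id keeps resources for different clients of the same factory separate, which is why the id combines the bean name with the client id.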
Default client ID prefixes
Starting with version 3.2, for Spring Boot applications which define an application name using the spring.application.name
property, this name is now used
as a default prefix for auto-generated client IDs for these client types:
- consumer clients which don’t use a consumer group
- producer clients
- admin clients
This makes it easier to identify these clients on the server side for troubleshooting or applying quotas.
Client Type | Without application name | With application name |
---|---|---|
consumer without consumer group | consumer-null-1 | myapp-consumer-1 |
consumer with consumer group "mygroup" | consumer-mygroup-1 | consumer-mygroup-1 |
producer | producer-1 | myapp-producer-1 |
admin | adminclient-1 | myapp-admin-1 |