Annotation Interface EnableKafka
Enables Kafka listener annotated endpoints that are created under the covers by an
AbstractListenerContainerFactory. To be used on Configuration classes as follows:
@Configuration
@EnableKafka
public class AppConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        return factory;
    }

    // other @Bean definitions
}
The KafkaListenerContainerFactory is responsible for creating the listener
container for a particular endpoint. Typical implementations, such as the
ConcurrentKafkaListenerContainerFactory used in the sample above, provide the
necessary configuration options that are supported by the underlying
MessageListenerContainer.
@EnableKafka enables detection of @KafkaListener annotations on any
Spring-managed bean in the container. For example, given a class MyService:
package com.acme.foo;

public class MyService {

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(String msg) {
        // process incoming message
    }
}
The container factory to use is identified by the
containerFactory attribute, which names
the KafkaListenerContainerFactory bean to use. When none is set, a
KafkaListenerContainerFactory bean named
kafkaListenerContainerFactory is assumed to be present.
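The default-name convention described above can be sketched as follows. This is a minimal illustration, not the only way to set things up: the class names DefaultFactoryConfig and DefaultFactoryListener, the broker address localhost:9092, and the group id my-group are assumptions made for the example. Because the factory bean is named kafkaListenerContainerFactory, the listener does not need a containerFactory attribute.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class DefaultFactoryConfig {

    // Hypothetical broker address and group id; adjust for your environment.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // The bean name (taken from the method name) is the default name
    // that @KafkaListener falls back to when containerFactory is not set.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public DefaultFactoryListener defaultFactoryListener() {
        return new DefaultFactoryListener();
    }

    // Hypothetical listener class used only for this illustration.
    public static class DefaultFactoryListener {

        // No containerFactory attribute: the default factory bean is used.
        @KafkaListener(topics = "myTopic")
        public void process(String msg) {
            // process incoming message
        }
    }
}
```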
The following configuration would ensure that every time a message is received from
topic "myTopic", MyService.process() is called with the content of the message:
@Configuration
@EnableKafka
public class AppConfig {

    @Bean
    public MyService myService() {
        return new MyService();
    }

    // Kafka infrastructure setup
}
Alternatively, if MyService were annotated with @Component, the
following configuration would ensure that its @KafkaListener annotated method
is invoked with a matching incoming message:
@Configuration
@EnableKafka
@ComponentScan(basePackages = "com.acme.foo")
public class AppConfig {
}
Note that the created containers are not registered with the application context but
can be easily located for management purposes using the
KafkaListenerEndpointRegistry.
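As a rough sketch of locating a container for management purposes, the class below injects the KafkaListenerEndpointRegistry and looks a container up by its listener id. The class name ListenerControl and the id "myListener" are assumptions made for the example; the lookup relies on the id attribute of @KafkaListener.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerControl {

    private final KafkaListenerEndpointRegistry registry;

    public ListenerControl(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // "myListener" is a hypothetical id; it is the key used for the lookup below.
    @KafkaListener(id = "myListener", topics = "myTopic")
    public void process(String msg) {
        // process incoming message
    }

    // Stops the container backing the listener, if it exists and is running.
    public void stopListener() {
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }

    // Starts the container again, if it exists and is currently stopped.
    public void startListener() {
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}
```

The null checks are deliberate: getListenerContainer returns null when no container is registered under the given id.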
Annotated methods can use a flexible signature; in particular, it is possible to use
the Message abstraction and related
annotations. See the KafkaListener Javadoc for more details. For instance, the
following would inject the content of the message and the Kafka partition header:
@KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
public void process(String msg, @Header("kafka_partition") int partition) {
    // process incoming message
}
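To illustrate the Message abstraction mentioned above, here is a minimal sketch with two alternative signatures: one receives the whole Message and reads a header from it, the other binds the payload and a header to separate parameters. The class name MyMessageService and the topic "myOtherTopic" are assumptions made for the example.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

// Hypothetical service class; it must be registered as a Spring bean to be detected.
public class MyMessageService {

    // Receives the full Message, giving access to both payload and headers.
    @KafkaListener(topics = "myTopic")
    public void processMessage(Message<String> message) {
        String payload = message.getPayload();
        Object topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        // process payload and topic
    }

    // Binds the payload and a single header to individual parameters.
    @KafkaListener(topics = "myOtherTopic")
    public void processPayload(@Payload String msg,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // process msg and topic
    }
}
```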
These features are abstracted by the
MessageHandlerMethodFactory, which is responsible for building the invoker needed to
process the annotated method. By default,
DefaultMessageHandlerMethodFactory is used.
When more control is desired, a @Configuration class may implement
KafkaListenerConfigurer. This allows access to the underlying
KafkaListenerEndpointRegistrar instance. The following example demonstrates how to
specify an explicit default KafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class AppConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myKafkaListenerContainerFactory());
    }

    @Bean
    public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory() {
        // factory settings
    }

    @Bean
    public MyService myService() {
        return new MyService();
    }
}
It is also possible to specify a custom
KafkaListenerEndpointRegistry in case you need more control over the way the containers
are created and managed. The example below also demonstrates how to customize the
DefaultMessageHandlerMethodFactory
as well as how to supply a custom Validator so that payloads annotated with
@Validated are first
validated against that Validator:
@Configuration
@EnableKafka
public class AppConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
        registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory());
        registrar.setValidator(new MyValidator());
    }

    @Bean
    public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
        // registry configuration
    }

    @Bean
    public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        // factory configuration
        return factory;
    }

    @Bean
    public MyService myService() {
        return new MyService();
    }
}
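The MyValidator class referenced in the configuration above is not shown in this documentation. One possible shape for it, assuming String payloads and a made-up rule that rejects blank messages, is sketched below using the standard Spring Validator contract. The error code "payload.blank" is hypothetical.

```java
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

// Illustrative only: validates String payloads and rejects blank ones.
public class MyValidator implements Validator {

    @Override
    public boolean supports(Class<?> clazz) {
        // This sketch only handles String payloads.
        return String.class.isAssignableFrom(clazz);
    }

    @Override
    public void validate(Object target, Errors errors) {
        String payload = (String) target;
        if (payload.trim().isEmpty()) {
            // Hypothetical error code and default message.
            errors.reject("payload.blank", "Payload must not be blank");
        }
    }
}
```

With a validator like this registered, a listener parameter declared as @Payload @Validated String msg would be checked before the method is invoked.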
Implementing KafkaListenerConfigurer also allows for fine-grained control over
endpoint registration via the KafkaListenerEndpointRegistrar. For example, the
following configures an extra endpoint:
@Configuration
@EnableKafka
public class AppConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
        // ... configure the endpoint
        registrar.registerEndpoint(myEndpoint, anotherKafkaListenerContainerFactory());
    }

    @Bean
    public MyService myService() {
        return new MyService();
    }

    @Bean
    public KafkaListenerContainerFactory<?> anotherKafkaListenerContainerFactory() {
        // ...
    }

    // Kafka infrastructure setup
}
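The endpoint configuration step is elided in the example above. As one possible way to fill it in, the sketch below uses MethodKafkaListenerEndpoint, a concrete endpoint class in Spring Kafka that invokes a method on a given bean. This is a swapped-in class rather than the one named in the example, and the helper class ExtraEndpointSupport, the id "extraEndpoint", the group id "extraGroup", and the assumption that the target bean has a public process(String) method are all hypothetical.

```java
import java.lang.reflect.Method;

import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

// Hypothetical helper that could be called from configureKafkaListeners(...).
public final class ExtraEndpointSupport {

    private ExtraEndpointSupport() {
    }

    public static void registerExtraEndpoint(KafkaListenerEndpointRegistrar registrar,
            KafkaListenerContainerFactory<?> factory, Object bean) throws NoSuchMethodException {

        // Assumes the bean exposes a public method named process taking a String.
        Method method = bean.getClass().getMethod("process", String.class);

        // A manually created factory must be initialized before use.
        DefaultMessageHandlerMethodFactory methodFactory = new DefaultMessageHandlerMethodFactory();
        methodFactory.afterPropertiesSet();

        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId("extraEndpoint");      // hypothetical endpoint id
        endpoint.setGroupId("extraGroup");    // hypothetical consumer group
        endpoint.setTopics("myTopic");
        endpoint.setBean(bean);
        endpoint.setMethod(method);
        endpoint.setMessageHandlerMethodFactory(methodFactory);

        registrar.registerEndpoint(endpoint, factory);
    }
}
```

Passing the bean and factory in as parameters keeps the helper independent of any particular configuration class; the checked NoSuchMethodException is left to the caller so that a misnamed handler method fails at startup.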
Note that all beans implementing KafkaListenerConfigurer will be detected and
invoked in a similar fashion. The example above can be translated into a regular bean
definition registered in the context in case you use the XML configuration.
- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also: