@Target(value=TYPE) @Retention(value=RUNTIME) @Documented @Import(value=KafkaBootstrapConfiguration.class) public @interface EnableKafka
Enable Kafka listener annotated endpoints that are created under the covers by an
AbstractListenerContainerFactory. To be used on @Configuration classes as
follows:
@Configuration
@EnableKafka
public class AppConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        return factory;
    }

    // other @Bean definitions
}
The KafkaListenerContainerFactory is responsible for creating the listener
container for a particular endpoint. Typical implementations, such as the
ConcurrentKafkaListenerContainerFactory used in the sample above, provide the
configuration options that are supported by the underlying
MessageListenerContainer.
@EnableKafka enables detection of KafkaListener annotations on any
Spring-managed bean in the container. For example, given a class MyService:
package com.acme.foo;

public class MyService {

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(String msg) {
        // process incoming message
    }
}
The container factory to use is identified by the
containerFactory attribute defining the name
of the KafkaListenerContainerFactory bean to use. When none is set a
KafkaListenerContainerFactory bean with name
kafkaListenerContainerFactory is assumed to be present.
The following configuration would ensure that every time a message is received from
topic "myTopic", MyService.process() is called with the content of the message:
@Configuration
@EnableKafka
public class AppConfig {

    @Bean
    public MyService myService() {
        return new MyService();
    }

    // Kafka infrastructure setup
}
Alternatively, if MyService were annotated with @Component, the
following configuration would ensure that its @KafkaListener annotated method
is invoked with a matching incoming message:
@Configuration
@EnableKafka
@ComponentScan(basePackages = "com.acme.foo")
public class AppConfig {
}
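To make the detection step more concrete, the following is a toy model in plain Java of how annotated methods might be discovered by reflection and paired with a container factory name. It is not Spring's implementation: the Listener annotation, the ListenerScanSketch class and the describe helper are hypothetical stand-ins for @KafkaListener and the post-processor. Only the default bean name kafkaListenerContainerFactory is taken from the description above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @KafkaListener; not part of Spring.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Listener {
    String topics();
    String containerFactory() default "";
}

class ListenerScanSketch {

    // Default factory bean name, as stated in the documentation above.
    static final String DEFAULT_FACTORY = "kafkaListenerContainerFactory";

    // Returns one "method|topic|factory" entry per annotated method of the bean.
    static List<String> describe(Object bean) {
        List<String> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener == null) {
                continue; // not a listener method
            }
            // Fall back to the default name when no factory is given.
            String factory = listener.containerFactory().isEmpty()
                    ? DEFAULT_FACTORY
                    : listener.containerFactory();
            endpoints.add(method.getName() + "|" + listener.topics() + "|" + factory);
        }
        return endpoints;
    }

    // Sample bean with one annotated method and no explicit factory.
    static class MyService {
        @Listener(topics = "myTopic")
        public void process(String msg) {
            // process incoming message
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new MyService()));
    }
}
```

In this sketch the annotation must have RUNTIME retention, otherwise getAnnotation would find nothing; the same holds for any annotation that is meant to be detected on live beans.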
Note that the created containers are not registered with the application context but
can be easily located for management purposes using the
KafkaListenerEndpointRegistry.
Annotated methods can use a flexible signature; in particular, it is possible to use
the Message abstraction and related
annotations; see the KafkaListener Javadoc for more details. For instance, the
following would inject the content of the message and the Kafka partition
header:
@KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
public void process(String msg, @Header("kafka_partition") int partition) {
    // process incoming message
}
These features are abstracted by the
MessageHandlerMethodFactory, which is responsible for building the invoker that
processes the annotated method. By default,
DefaultMessageHandlerMethodFactory is used.
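As a rough illustration of what such an invoker has to do, the sketch below resolves each method parameter either from a header map or from the payload and then calls the method reflectively. It is a simplified model in plain Java, not the DefaultMessageHandlerMethodFactory itself: the HeaderValue annotation and the InvokerSketch class are hypothetical names, and real argument resolution also covers type conversion, validation and the Message abstraction.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for @Header; not part of Spring.
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface HeaderValue {
    String value();
}

class InvokerSketch {

    // Finds the named method, resolves its arguments and invokes it on the bean.
    static Object invoke(Object bean, String methodName, Object payload, Map<String, Object> headers) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            Annotation[][] paramAnnotations = method.getParameterAnnotations();
            Object[] args = new Object[paramAnnotations.length];
            for (int i = 0; i < args.length; i++) {
                args[i] = payload; // unannotated parameters receive the payload
                for (Annotation annotation : paramAnnotations[i]) {
                    if (annotation instanceof HeaderValue) {
                        // annotated parameters receive the named header instead
                        args[i] = headers.get(((HeaderValue) annotation).value());
                    }
                }
            }
            try {
                method.setAccessible(true);
                return method.invoke(bean, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Listener invocation failed", e);
            }
        }
        throw new IllegalArgumentException("No method named " + methodName);
    }

    // Sample listener that returns a value only so the result can be inspected.
    static class MyService {
        public String process(String msg, @HeaderValue("kafka_partition") int partition) {
            return msg + "@" + partition;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Collections.singletonMap("kafka_partition", 3);
        System.out.println(invoke(new MyService(), "process", "hello", headers)); // prints "hello@3"
    }
}
```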
When more control is desired, a @Configuration class may implement
KafkaListenerConfigurer. This allows access to the underlying
KafkaListenerEndpointRegistrar instance. The following example demonstrates how to
specify an explicit default KafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class AppConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myKafkaListenerContainerFactory());
    }

    @Bean
    public KafkaListenerContainerFactory<?, ?> myKafkaListenerContainerFactory() {
        // factory settings
    }

    @Bean
    public MyService myService() {
        return new MyService();
    }
}
It is also possible to specify a custom
KafkaListenerEndpointRegistry in case you need more control over the way the containers
are created and managed. The example below also demonstrates how to customize the
MessageHandlerMethodFactory with a custom
Validator so that payloads annotated
with @Validated are first
validated against that Validator:
@Configuration
@EnableKafka
public class AppConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
        registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory());
    }

    @Bean
    public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
        // registry configuration
    }

    @Bean
    public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(new MyValidator());
        return factory;
    }

    @Bean
    public MyService myService() {
        return new MyService();
    }
}
Implementing KafkaListenerConfigurer also allows for fine-grained control over
endpoint registration via the KafkaListenerEndpointRegistrar. For example, the
following configures an extra endpoint:
@Configuration
@EnableKafka
public class AppConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
        // ... configure the endpoint
        registrar.registerEndpoint(myEndpoint, anotherKafkaListenerContainerFactory());
    }

    @Bean
    public MyService myService() {
        return new MyService();
    }

    @Bean
    public KafkaListenerContainerFactory<?, ?> anotherKafkaListenerContainerFactory() {
        // ...
    }

    // Kafka infrastructure setup
}
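The relationship between a registrar-wide default factory and a factory passed for a single endpoint can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: RegistrarSketch, its string-based endpoint ids and factory names, and the resolve method are all hypothetical, and nothing here claims to mirror the internals of KafkaListenerEndpointRegistrar.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy registrar; endpoints and factories are represented by names only.
class RegistrarSketch {

    private String defaultFactory = "kafkaListenerContainerFactory";

    // Endpoint id -> explicitly requested factory name, or null when none was given.
    private final Map<String, String> factoryByEndpoint = new LinkedHashMap<>();

    // Overrides the default factory used for endpoints without an explicit one.
    void setContainerFactory(String factoryName) {
        this.defaultFactory = factoryName;
    }

    // Registers an endpoint that should use a specific factory.
    void registerEndpoint(String endpointId, String factoryName) {
        factoryByEndpoint.put(endpointId, factoryName);
    }

    // Registers an endpoint that relies on the default factory.
    void registerEndpoint(String endpointId) {
        factoryByEndpoint.put(endpointId, null);
    }

    // Resolves the factory for every endpoint once configuration is complete.
    Map<String, String> resolve() {
        Map<String, String> resolved = new LinkedHashMap<>();
        factoryByEndpoint.forEach((id, factory) ->
                resolved.put(id, factory != null ? factory : defaultFactory));
        return resolved;
    }

    public static void main(String[] args) {
        RegistrarSketch registrar = new RegistrarSketch();
        registrar.registerEndpoint("myService.process");
        registrar.registerEndpoint("myEndpoint", "anotherKafkaListenerContainerFactory");
        registrar.setContainerFactory("myKafkaListenerContainerFactory");
        System.out.println(registrar.resolve());
    }
}
```

Resolving lazily, as this toy does, means the order in which the default is set and endpoints are registered does not matter; whether that matches a given framework version is an assumption to verify against its documentation.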
Note that all beans implementing KafkaListenerConfigurer will be detected and
invoked in a similar fashion. If you use XML configuration, the example above can be
translated into a regular bean definition registered in the context.
See Also:
KafkaListener,
KafkaListenerAnnotationBeanPostProcessor,
KafkaListenerEndpointRegistrar,
KafkaListenerEndpointRegistry