Annotation Interface EnableKafka
AbstractListenerContainerFactory. To be used on
 Configuration classes as
 follows:
 
 @Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }
 
 The KafkaListenerContainerFactory is responsible to create the listener
 container for a particular endpoint. Typical implementations, as the
 ConcurrentKafkaListenerContainerFactory used in the sample above, provides the
 necessary configuration options that are supported by the underlying
 MessageListenerContainer.
 
 @EnableKafka enables detection of KafkaListener annotations on any
 Spring-managed bean in the container. For example, given a class MyService:
 
 package com.acme.foo;
 public class MyService {
        @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
        public void process(String msg) {
                // process incoming message
        }
 }
 
 The container factory to use is identified by the
 containerFactory attribute defining the name
 of the KafkaListenerContainerFactory bean to use. When none is set a
 KafkaListenerContainerFactory bean with name
 kafkaListenerContainerFactory is assumed to be present.
 
 the following configuration would ensure that every time a message is received from
 topic "myQueue", MyService.process() is called with the content of the message:
 
 @Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public MyService myService() {
                return new MyService();
        }
        // Kafka infrastructure setup
 }
 
 Alternatively, if MyService were annotated with @Component, the
 following configuration would ensure that its @KafkaListener annotated method
 is invoked with a matching incoming message:
 
 @Configuration
 @EnableKafka
 @ComponentScan(basePackages = "com.acme.foo")
 public class AppConfig {
 }
 
 Note that the created containers are not registered with the application context but
 can be easily located for management purposes using the
 KafkaListenerEndpointRegistry.
 
 Annotated methods can use a flexible signature; in particular, it is possible to use
 the Message abstraction and related
 annotations, see KafkaListener Javadoc for more details. For instance, the
 following would inject the content of the message and the kafka partition header:
 
 @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
 public void process(String msg, @Header("kafka_partition") int partition) {
        // process incoming message
 }
 
 These features are abstracted by the
 MessageHandlerMethodFactory that is responsible to build the necessary invoker to
 process the annotated method. By default,
 DefaultMessageHandlerMethodFactory is used.
 
 When more control is desired, a @Configuration class may implement
 KafkaListenerConfigurer. This allows access to the underlying
 KafkaListenerEndpointRegistrar instance. The following example demonstrates how to
 specify an explicit default KafkaListenerContainerFactory
 
        @Configuration
        @EnableKafka
        public class AppConfig implements KafkaListenerConfigurer {
                @Override
                public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                        registrar.setContainerFactory(myKafkaListenerContainerFactory());
                }
                @Bean
                public KafkaListenerContainerFactory<?, ?> myKafkaListenerContainerFactory() {
                        // factory settings
                }
                @Bean
                public MyService myService() {
                        return new MyService();
                }
        }
 
 It is also possible to specify a custom
 KafkaListenerEndpointRegistry in case you need more control on the way the containers
 are created and managed. The example below also demonstrates how to customize the
 DefaultMessageHandlerMethodFactory
 as well as how to supply a custom Validator so that payloads annotated with
 Validated are first
 validated against a custom Validator.
 
        @Configuration
        @EnableKafka
        public class AppConfig implements KafkaListenerConfigurer {
                @Override
                public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                        registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
                        registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);
                        registrar.setValidator(new MyValidator());
                }
                @Bean
                public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
                        // registry configuration
                }
                @Bean
                public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
                        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
                        // factory configuration
                        return factory;
                }
                @Bean
                public MyService myService() {
                        return new MyService();
                }
        }
 
 Implementing KafkaListenerConfigurer also allows for fine-grained control over
 endpoints registration via the KafkaListenerEndpointRegistrar. For example, the
 following configures an extra endpoint:
 
        @Configuration
        @EnableKafka
        public class AppConfig implements KafkaListenerConfigurer {
                @Override
                public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                        SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
                        // ... configure the endpoint
                        registrar.registerEndpoint(endpoint, anotherKafkaListenerContainerFactory());
                }
                @Bean
                public MyService myService() {
                        return new MyService();
                }
                @Bean
                public KafkaListenerContainerFactory<?, ?> anotherKafkaListenerContainerFactory() {
                        // ...
                }
                // Kafka infrastructure setup
        }
 
 Note that all beans implementing KafkaListenerConfigurer will be detected and
 invoked in a similar fashion. The example above can be translated in a regular bean
 definition registered in the context in case you use the XML configuration.- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also: