Annotation Type EnableKafka
@Target(TYPE)
@Retention(RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka
Enable Kafka listener annotated endpoints that are created under the covers by an AbstractListenerContainerFactory. To be used on @Configuration classes as follows:

    @Configuration
    @EnableKafka
    public class AppConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(4);
            return factory;
        }

        // other @Bean definitions
    }

The KafkaListenerContainerFactory is responsible for creating the listener container for a particular endpoint. Typical implementations, such as the ConcurrentKafkaListenerContainerFactory used in the sample above, provide the necessary configuration options that are supported by the underlying MessageListenerContainer.

@EnableKafka enables detection of @KafkaListener annotations on any Spring-managed bean in the container. For example, given a class MyService:

    package com.acme.foo;

    public class MyService {

        @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
        public void process(String msg) {
            // process incoming message
        }
    }

The container factory to use is identified by the containerFactory attribute, which defines the name of the KafkaListenerContainerFactory bean to use. When none is set, a KafkaListenerContainerFactory bean with name kafkaListenerContainerFactory is assumed to be present.

The following configuration would ensure that every time a message is received from topic "myTopic",
MyService.process() is called with the content of the message:

    @Configuration
    @EnableKafka
    public class AppConfig {

        @Bean
        public MyService myService() {
            return new MyService();
        }

        // Kafka infrastructure setup
    }

Alternatively, if MyService were annotated with @Component, the following configuration would ensure that its @KafkaListener annotated method is invoked with a matching incoming message:

    @Configuration
    @EnableKafka
    @ComponentScan(basePackages = "com.acme.foo")
    public class AppConfig {
    }

Note that the created containers are not registered with the application context but can be easily located for management purposes using the KafkaListenerEndpointRegistry.

Annotated methods can use a flexible signature; in particular, it is possible to use the
Message abstraction and related annotations; see the KafkaListener Javadoc for more details. For instance, the following would inject the content of the message and the Kafka partition header:

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(String msg, @Header("kafka_partition") int partition) {
        // process incoming message
    }

These features are abstracted by the MessageHandlerMethodFactory, which is responsible for building the necessary invoker to process the annotated method. By default, DefaultMessageHandlerMethodFactory is used.

When more control is desired, a
@Configuration class may implement KafkaListenerConfigurer. This allows access to the underlying KafkaListenerEndpointRegistrar instance. The following example demonstrates how to specify an explicit default KafkaListenerContainerFactory:

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            registrar.setContainerFactory(myKafkaListenerContainerFactory());
        }

        @Bean
        public KafkaListenerContainerFactory<?, ?> myKafkaListenerContainerFactory() {
            // factory settings
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

It is also possible to specify a custom KafkaListenerEndpointRegistry in case you need more control over the way the containers are created and managed. The example below also demonstrates how to customize the DefaultMessageHandlerMethodFactory as well as how to supply a custom Validator so that payloads annotated with @Validated are first validated against a custom Validator.

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
            registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory());
            registrar.setValidator(new MyValidator());
        }

        @Bean
        public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
            // registry configuration
        }

        @Bean
        public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // factory configuration
            return factory;
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

Implementing KafkaListenerConfigurer also allows for fine-grained control over endpoint registration via the KafkaListenerEndpointRegistrar.
For example, the following configures an extra endpoint:

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
            // ... configure the endpoint
            registrar.registerEndpoint(myEndpoint, anotherKafkaListenerContainerFactory());
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }

        @Bean
        public KafkaListenerContainerFactory<?, ?> anotherKafkaListenerContainerFactory() {
            // ...
        }

        // Kafka infrastructure setup
    }

Note that all beans implementing KafkaListenerConfigurer will be detected and invoked in a similar fashion. The example above can be translated into a regular bean definition registered in the context in case you use the XML configuration.

- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also:
KafkaListener, KafkaListenerAnnotationBeanPostProcessor, KafkaListenerEndpointRegistrar, KafkaListenerEndpointRegistry
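
The container factory lookup rule described in this page (use the bean named by the containerFactory attribute, otherwise fall back to a bean named kafkaListenerContainerFactory) can be modeled in plain Java. This is a simplified sketch for illustration only, not Spring's implementation: the class ContainerFactoryLookupSketch, its methods, and the map standing in for the application context are all hypothetical, and plain strings stand in for factory beans.

```java
import java.util.HashMap;
import java.util.Map;

public class ContainerFactoryLookupSketch {

    // Bean name assumed when a listener does not set the containerFactory attribute.
    public static final String DEFAULT_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

    // Hypothetical stand-in for an application context: bean name -> bean instance.
    private final Map<String, Object> beans = new HashMap<>();

    public void registerBean(String name, Object bean) {
        beans.put(name, bean);
    }

    // Resolve the factory for a listener: the explicitly named bean if the
    // attribute is set, the default-named bean otherwise.
    public Object resolveFactory(String containerFactoryAttribute) {
        boolean explicit = containerFactoryAttribute != null && !containerFactoryAttribute.isEmpty();
        String beanName = explicit ? containerFactoryAttribute : DEFAULT_FACTORY_BEAN_NAME;
        Object factory = beans.get(beanName);
        if (factory == null) {
            throw new IllegalStateException("No container factory bean named '" + beanName + "'");
        }
        return factory;
    }

    public static void main(String[] args) {
        ContainerFactoryLookupSketch context = new ContainerFactoryLookupSketch();
        context.registerBean(DEFAULT_FACTORY_BEAN_NAME, "default factory");
        context.registerBean("myKafkaListenerContainerFactory", "custom factory");

        // A listener without a containerFactory attribute gets the default bean.
        System.out.println(context.resolveFactory(""));
        // A listener naming a factory gets that bean.
        System.out.println(context.resolveFactory("myKafkaListenerContainerFactory"));
    }
}
```

In this model, a listener that names a bean which was never registered fails with an IllegalStateException; how the real framework reports a missing factory is not covered by this sketch.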