Annotation Type EnableKafka
-
@Target(TYPE) @Retention(RUNTIME) @Documented @Import(KafkaListenerConfigurationSelector.class) public @interface EnableKafka
Enable Kafka listener annotated endpoints that are created under the covers by aAbstractListenerContainerFactory
. To be used onConfiguration
classes as follows:@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); return factory; } // other @Bean definitions }
TheKafkaListenerContainerFactory
is responsible to create the listener container for a particular endpoint. Typical implementations, as theConcurrentKafkaListenerContainerFactory
used in the sample above, provides the necessary configuration options that are supported by the underlyingMessageListenerContainer
.@EnableKafka
enables detection ofKafkaListener
annotations on any Spring-managed bean in the container. For example, given a classMyService
:package com.acme.foo; public class MyService { @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic") public void process(String msg) { // process incoming message } }
The container factory to use is identified by thecontainerFactory
attribute defining the name of theKafkaListenerContainerFactory
bean to use. When none is set aKafkaListenerContainerFactory
bean with namekafkaListenerContainerFactory
is assumed to be present.the following configuration would ensure that every time a message is received from topic "myQueue",
MyService.process()
is called with the content of the message:@Configuration @EnableKafka public class AppConfig { @Bean public MyService myService() { return new MyService(); } // Kafka infrastructure setup }
Alternatively, ifMyService
were annotated with@Component
, the following configuration would ensure that its@KafkaListener
annotated method is invoked with a matching incoming message:@Configuration @EnableKafka @ComponentScan(basePackages = "com.acme.foo") public class AppConfig { }
Note that the created containers are not registered with the application context but can be easily located for management purposes using theKafkaListenerEndpointRegistry
.Annotated methods can use a flexible signature; in particular, it is possible to use the
Message
abstraction and related annotations, seeKafkaListener
Javadoc for more details. For instance, the following would inject the content of the message and the kafka partition header:@KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic") public void process(String msg, @Header("kafka_partition") int partition) { // process incoming message }
These features are abstracted by theMessageHandlerMethodFactory
that is responsible to build the necessary invoker to process the annotated method. By default,DefaultMessageHandlerMethodFactory
is used.When more control is desired, a
@Configuration
class may implementKafkaListenerConfigurer
. This allows access to the underlyingKafkaListenerEndpointRegistrar
instance. The following example demonstrates how to specify an explicit defaultKafkaListenerContainerFactory
@Configuration @EnableKafka public class AppConfig implements KafkaListenerConfigurer { @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setContainerFactory(myKafkaListenerContainerFactory()); } @Bean public KafkaListenerContainerFactory<?, ?> myKafkaListenerContainerFactory() { // factory settings } @Bean public MyService myService() { return new MyService(); } }
It is also possible to specify a customKafkaListenerEndpointRegistry
in case you need more control on the way the containers are created and managed. The example below also demonstrates how to customize theDefaultMessageHandlerMethodFactory
as well as how to supply a customValidator
so that payloads annotated withValidated
are first validated against a customValidator
.@Configuration @EnableKafka public class AppConfig implements KafkaListenerConfigurer { @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry()); registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory); registrar.setValidator(new MyValidator()); } @Bean public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() { // registry configuration } @Bean public MessageHandlerMethodFactory myMessageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); // factory configuration return factory; } @Bean public MyService myService() { return new MyService(); } }
ImplementingKafkaListenerConfigurer
also allows for fine-grained control over endpoints registration via theKafkaListenerEndpointRegistrar
. For example, the following configures an extra endpoint:@Configuration @EnableKafka public class AppConfig implements KafkaListenerConfigurer { @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint(); // ... configure the endpoint registrar.registerEndpoint(endpoint, anotherKafkaListenerContainerFactory()); } @Bean public MyService myService() { return new MyService(); } @Bean public KafkaListenerContainerFactory<?, ?> anotherKafkaListenerContainerFactory() { // ... } // Kafka infrastructure setup }
Note that all beans implementingKafkaListenerConfigurer
will be detected and invoked in a similar fashion. The example above can be translated in a regular bean definition registered in the context in case you use the XML configuration.- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also:
KafkaListener
,KafkaListenerAnnotationBeanPostProcessor
,KafkaListenerEndpointRegistrar
,KafkaListenerEndpointRegistry