@Target(value=TYPE) @Retention(value=RUNTIME) @Documented @Import(value=KafkaBootstrapConfiguration.class) public @interface EnableKafka
Enables Kafka listener annotated endpoints that are created under the covers by an
AbstractListenerContainerFactory. To be used on Configuration classes as follows:

    @Configuration
    @EnableKafka
    public class AppConfig {

        // String keys and values are assumed here; consumerFactory() is defined elsewhere.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(4);
            return factory;
        }

        // other @Bean definitions
    }

The KafkaListenerContainerFactory is responsible for creating the listener container
for a particular endpoint. Typical implementations, such as the
ConcurrentKafkaListenerContainerFactory used in the sample above, provide the
configuration options that are supported by the underlying MessageListenerContainer.
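
The sample above calls consumerFactory() without showing it. A minimal sketch of such a bean might look as follows. It assumes String keys and values, a broker at localhost:9092, and a consumer group named myGroup; all three are placeholders chosen for illustration, as is the class name ConsumerFactoryConfig.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class ConsumerFactoryConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder broker address and group id (assumptions for illustration).
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
        // Keys and values are deserialized as plain strings.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```

In a real application the broker address and group id would normally come from externalized configuration rather than literals.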
@EnableKafka enables detection of @KafkaListener annotations on any Spring-managed
bean in the container. For example, consider the following class MyService:

    package com.acme.foo;

    public class MyService {

        @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
        public void process(String msg) {
            // process incoming message
        }
    }

The container factory to use is identified by the containerFactory attribute, which
names the KafkaListenerContainerFactory bean to use. When none is set, a
KafkaListenerContainerFactory bean named kafkaListenerContainerFactory is assumed
to be present.
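
The default lookup described above can be sketched as follows. The factory bean is named kafkaListenerContainerFactory, so the listener does not need a containerFactory attribute. The class names DefaultFactoryConfig and DefaultFactoryListener are hypothetical, and a ConsumerFactory<String, String> bean is assumed to be defined elsewhere in the context.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka
public class DefaultFactoryConfig {

    // The bean name (taken from the method name) matches the default that is
    // looked up when @KafkaListener does not set containerFactory.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed to be defined elsewhere
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public DefaultFactoryListener defaultFactoryListener() {
        return new DefaultFactoryListener();
    }

    public static class DefaultFactoryListener {

        // No containerFactory attribute: the default-named factory bean is used.
        // The consumer group id is assumed to come from the consumer factory properties.
        @KafkaListener(topics = "myTopic")
        public void process(String msg) {
            // process incoming message
        }
    }
}
```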
With MyService defined as above, the following configuration ensures that every time
a message is received from topic "myTopic", MyService.process() is called with the
content of the message:

    @Configuration
    @EnableKafka
    public class AppConfig {

        @Bean
        public MyService myService() {
            return new MyService();
        }

        // Kafka infrastructure setup
    }

Alternatively, if MyService were annotated with @Component, the following
configuration would ensure that its @KafkaListener-annotated method is invoked with
a matching incoming message:

    @Configuration
    @EnableKafka
    @ComponentScan(basePackages = "com.acme.foo")
    public class AppConfig {
    }

Note that the created containers are not registered with the application context, but
they can be easily located for management purposes using the
KafkaListenerEndpointRegistry.
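
As an illustration of locating a container through the registry, the sketch below looks up a container by its listener id and stops it or checks its state. The class name ListenerLifecycleManager is hypothetical, and the example assumes the listener was declared with an explicit id attribute on @KafkaListener.

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerLifecycleManager {

    private final KafkaListenerEndpointRegistry registry;

    public ListenerLifecycleManager(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    /** Stops the container for the given listener id; returns false if none is registered. */
    public boolean stop(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Reports whether a container with the given listener id exists and is running. */
    public boolean isRunning(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        return container != null && container.isRunning();
    }
}
```

For a listener declared as @KafkaListener(id = "myListener", topics = "myTopic"), where "myListener" is a placeholder id, stop("myListener") would stop that listener's container.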
Annotated methods can use a flexible signature; in particular, it is possible to use
the Message abstraction and related annotations. See the KafkaListener Javadoc for
more details. For instance, the following would inject the content of the message and
the Kafka partition header:

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(String msg, @Header("kafka_partition") int partition) {
        // process incoming message
    }

These features are abstracted by the MessageHandlerMethodFactory, which is responsible
for building the necessary invoker to process the annotated method. By default,
DefaultMessageHandlerMethodFactory is used.
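
As a further illustration of the flexible signatures mentioned above, a listener method can also accept the whole Message and read the payload and headers from it. This is a minimal sketch: the class name MessageAwareService is hypothetical, and the topic and container factory names are reused from the earlier samples.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public class MessageAwareService {

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(Message<String> message) {
        // The payload carries the converted record value.
        String payload = message.getPayload();
        // Kafka metadata is exposed as message headers, here the source topic.
        Object topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        System.out.println("Received '" + payload + "' from topic " + topic);
    }
}
```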
When more control is desired, a @Configuration class may implement
KafkaListenerConfigurer. This allows access to the underlying
KafkaListenerEndpointRegistrar instance. The following example demonstrates how to
specify an explicit default KafkaListenerContainerFactory:

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            registrar.setContainerFactory(myKafkaListenerContainerFactory());
        }

        @Bean
        public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory() {
            // factory settings
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

It is also possible to specify a custom KafkaListenerEndpointRegistry in case you need
more control over the way the containers are created and managed. The example below
also demonstrates how to customize the MessageHandlerMethodFactory with a custom
Validator, so that payloads annotated with @Validated are first validated against
that Validator.

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
            registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory());
        }

        @Bean
        public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
            // registry configuration
        }

        @Bean
        public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setValidator(new MyValidator());
            return factory;
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

Implementing KafkaListenerConfigurer also allows for fine-grained control over
endpoint registration via the KafkaListenerEndpointRegistrar. For example, the
following configures an extra endpoint:

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
            // ... configure the endpoint
            registrar.registerEndpoint(myEndpoint, anotherKafkaListenerContainerFactory());
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }

        @Bean
        public KafkaListenerContainerFactory<?> anotherKafkaListenerContainerFactory() {
            // ...
        }

        // Kafka infrastructure setup
    }

Note that all beans implementing KafkaListenerConfigurer will be detected and invoked
in a similar fashion. If you use XML configuration, the example above can be
translated into a regular bean definition registered in the context.

See Also:
    KafkaListener, KafkaListenerAnnotationBeanPostProcessor,
    KafkaListenerEndpointRegistrar, KafkaListenerEndpointRegistry