@Target(value=TYPE) @Retention(value=RUNTIME) @Documented @Import(value=KafkaBootstrapConfiguration.class) public @interface EnableKafka
Enables Kafka listener annotated endpoints that are created under the covers by an AbstractListenerContainerFactory. To be used on @Configuration classes as follows:

    @Configuration
    @EnableKafka
    public class AppConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(4);
            return factory;
        }

        // other @Bean definitions
    }

The KafkaListenerContainerFactory is responsible for creating the listener container for a particular endpoint. Typical implementations, such as the ConcurrentKafkaListenerContainerFactory used in the sample above, provide the necessary configuration options that are supported by the underlying MessageListenerContainer.

@EnableKafka enables detection of @KafkaListener annotations on any Spring-managed bean in the container. For example, given a class MyService:

    package com.acme.foo;

    public class MyService {

        @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
        public void process(String msg) {
            // process incoming message
        }
    }

The container factory to use is identified by the containerFactory attribute, which defines the name of the KafkaListenerContainerFactory bean to use. When none is set, a KafkaListenerContainerFactory bean named kafkaListenerContainerFactory is assumed to be present.
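
To illustrate the default lookup described above, here is a minimal sketch in which the factory bean is itself named kafkaListenerContainerFactory, so the listener can omit the containerFactory attribute. The broker address localhost:9092, the group id my-group, the String key and value types, and the class names DefaultFactoryConfig and DefaultFactoryListener are assumptions chosen for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class DefaultFactoryConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed connection settings; adjust to the actual environment.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // The bean name defaults to the method name, which matches the
    // name assumed when a listener does not set containerFactory.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public DefaultFactoryListener defaultFactoryListener() {
        return new DefaultFactoryListener();
    }
}

// Hypothetical listener class used only for this sketch.
class DefaultFactoryListener {

    // No containerFactory attribute: the default bean name is used.
    @KafkaListener(topics = "myTopic")
    public void process(String msg) {
        // process incoming message
    }
}
```

Naming the factory bean this way keeps listener declarations short; the explicit containerFactory attribute is then only needed for listeners that require a differently configured factory.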

The following configuration would ensure that every time a message is received from topic "myTopic", MyService.process() is called with the content of the message:

    @Configuration
    @EnableKafka
    public class AppConfig {

        @Bean
        public MyService myService() {
            return new MyService();
        }

        // Kafka infrastructure setup
    }

Alternatively, if MyService were annotated with @Component, the following configuration would ensure that its @KafkaListener annotated method is invoked with a matching incoming message:

    @Configuration
    @EnableKafka
    @ComponentScan(basePackages = "com.acme.foo")
    public class AppConfig {
    }

Note that the created containers are not registered with the application context but can be easily located for management purposes using the KafkaListenerEndpointRegistry.
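
As a hedged sketch of what locating a container for management purposes might look like, the class below injects the KafkaListenerEndpointRegistry and stops or starts a container by its listener id. The class name ListenerLifecycle is hypothetical, and the sketch assumes the listener was declared with an explicit id attribute on @KafkaListener so that it can be looked up.

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical helper for this sketch; not part of Spring Kafka.
@Component
public class ListenerLifecycle {

    private final KafkaListenerEndpointRegistry registry;

    public ListenerLifecycle(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    /** Stops the container for the given listener id; returns false if it is unknown or not running. */
    public boolean stop(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null || !container.isRunning()) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Starts the container for the given listener id; returns false if it is unknown or already running. */
    public boolean start(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null || container.isRunning()) {
            return false;
        }
        container.start();
        return true;
    }
}
```

For instance, a listener declared as @KafkaListener(id = "myListener", topics = "myTopic") could then be stopped with stop("myListener"); the id "myListener" is likewise an illustrative assumption.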

Annotated methods can use a flexible signature; in particular, it is possible to use the Message abstraction and related annotations. See the KafkaListener Javadoc for more details. For instance, the following would inject the content of the message and the Kafka partition header:

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(String msg, @Header("kafka_partition") int partition) {
        // process incoming message
    }

These features are abstracted by the MessageHandlerMethodFactory, which is responsible for building the necessary invoker to process the annotated method. By default, DefaultMessageHandlerMethodFactory is used.
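
As a further illustration of the flexible signature, the sketch below receives the whole Message rather than individual parameters. The class name MyMessageService is hypothetical, and printing to standard output stands in for real processing.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;

// Hypothetical listener class used only for this sketch.
public class MyMessageService {

    @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
    public void process(Message<String> message) {
        // The payload carries the converted record value.
        String payload = message.getPayload();
        System.out.println("payload: " + payload);

        // MessageHeaders is a Map<String, Object>, so all headers can be inspected at once.
        message.getHeaders().forEach((name, value) ->
                System.out.println(name + " = " + value));
    }
}
```

Taking the full Message is convenient when the set of headers of interest is not known up front; individual @Header parameters remain the more explicit choice otherwise.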

When more control is desired, a @Configuration class may implement KafkaListenerConfigurer. This allows access to the underlying KafkaListenerEndpointRegistrar instance. The following example demonstrates how to specify an explicit default KafkaListenerContainerFactory:

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            registrar.setContainerFactory(myKafkaListenerContainerFactory());
        }

        @Bean
        public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory() {
            // factory settings
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

It is also possible to specify a custom KafkaListenerEndpointRegistry in case you need more control over the way the containers are created and managed. The example below also demonstrates how to customize the
MessageHandlerMethodFactory to use with a custom Validator so that payloads annotated with @Validated are first validated against a custom Validator.

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
            registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory());
        }

        @Bean
        public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
            // registry configuration
        }

        @Bean
        public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setValidator(new MyValidator());
            return factory;
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

Implementing KafkaListenerConfigurer also allows for fine-grained control over endpoint registration via the KafkaListenerEndpointRegistrar. For example, the following configures an extra endpoint:

    @Configuration
    @EnableKafka
    public class AppConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
            // ... configure the endpoint
            registrar.registerEndpoint(myEndpoint, anotherKafkaListenerContainerFactory());
        }

        @Bean
        public MyService myService() {
            return new MyService();
        }

        @Bean
        public KafkaListenerContainerFactory<?> anotherKafkaListenerContainerFactory() {
            // ...
        }

        // Kafka infrastructure setup
    }

Note that all beans implementing KafkaListenerConfigurer will be detected and invoked in a similar fashion. The example above can be translated into a regular bean definition registered in the context in case you use XML configuration.

See Also: KafkaListener, KafkaListenerAnnotationBeanPostProcessor, KafkaListenerEndpointRegistrar, KafkaListenerEndpointRegistry