For the latest stable version, please use Spring for Apache Kafka 3.3.0!

Dynamically Creating Containers

There are several techniques that can be used to create listener containers at runtime. This section explores some of those techniques.

MessageListener Implementations

If you implement your own listener directly, you can simply use the container factory to create a raw container for that listener:

User Listener
  • Java

  • Kotlin

public class MyListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // ...
    }

}

private ConcurrentMessageListenerContainer<String, String> createContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {

    ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
    container.getContainerProperties().setMessageListener(new MyListener());
    container.getContainerProperties().setGroupId(group);
    container.setBeanName(group);
    container.start();
    return container;
}
class MyListener : MessageListener<String?, String?> {

    override fun onMessage(data: ConsumerRecord<String?, String?>) {
        // ...
    }

}

private fun createContainer(
    factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
    val container = factory.createContainer(topic)
    container.containerProperties.messageListener = MyListener()
    container.containerProperties.groupId = group
    container.beanName = group
    container.start()
    return container
}

Prototype Beans

Containers for methods annotated with @KafkaListener can be created dynamically by declaring the bean as prototype:

Prototype
  • Java

  • Kotlin

public class MyPojo {

    private final String id;

    private final String topic;

    public MyPojo(String id, String topic) {
        this.id = id;
        this.topic = topic;
    }

    public String getId() {
        return this.id;
    }

    public String getTopic() {
        return this.topic;
    }

    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
    return new MyPojo(id, topic);
}

applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {

    @KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
    fun listen(`in`: String?) {
        println(`in`)
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
    return MyPojo(id, topic)
}

applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
Listeners must have unique IDs. Starting with version 2.8.9, the KafkaListenerEndpointRegistry has a new method unregisterListenerContainer(String id) to allow you to re-use an id. Unregistering a container does not stop() the container, you must do that yourself.