This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0! |
Dynamically Creating Containers
There are several techniques that can be used to create listener containers at runtime. This section explores some of those techniques.
MessageListener Implementations
If you implement your own listener directly, you can simply use the container factory to create a raw container for that listener:
User Listener
-
Java
-
Kotlin
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
class MyListener : MessageListener<String?, String?> {
override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}
}
private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
Prototype Beans
Containers for methods annotated with @KafkaListener
can be created dynamically by declaring the bean as prototype:
Prototype
-
Java
-
Kotlin
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
fun listen(`in`: String?) {
println(`in`)
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
Listeners must have unique IDs.
Starting with version 2.8.9, the KafkaListenerEndpointRegistry has a new method unregisterListenerContainer(String id) to allow you to re-use an id.
Unregistering a container does not stop() the container, you must do that yourself.
|