Starting @KafkaListeners in Sequence

A common use case is to start a listener after another listener has consumed all the records in a topic. For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics. Starting with version 2.7.3, a new component ContainerGroupSequencer has been introduced. It uses the @KafkaListener's containerGroup property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.

It is best illustrated with an example.

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}

Here, we have 4 listeners in two groups, g1 and g2.

During application context initialization, the sequencer sets the autoStartup property of all the containers in the provided groups to false. It also sets the idleEventInterval for any containers (that do not already have one set) to the supplied value (5000ms in this case). Then, when the sequencer is started by the application context, the containers in the first group are started. As ListenerContainerIdleEvents are received, each individual child container in each container is stopped. When all child containers in a ConcurrentMessageListenerContainer are stopped, the parent container is stopped. When all containers in a group have been stopped, the containers in the next group are started. There is no limit to the number of groups or containers in a group.

By default, the containers in the final group (g2 above) are not stopped when they go idle. To modify that behavior, set stopLastGroupWhenIdle to true on the sequencer.

As an aside, previously containers in each group were added to a bean of type Collection<MessageListenerContainer> with the bean name being the containerGroup. These collections are now deprecated in favor of beans of type ContainerGroup with a bean name that is the group name, suffixed with .group; in the example above, there would be 2 beans g1.group and g2.group. The Collection beans will be removed in a future release.