Pausing and Resuming Listener Containers

Version 2.1.3 added pause() and resume() methods to listener containers. Previously, you could pause a consumer within a ConsumerAwareMessageListener and resume it by listening for a ListenerContainerIdleEvent, which provides access to the Consumer object. While you could pause a consumer in an idle container by using an event listener, in some cases, this was not thread-safe, since there is no guarantee that the event listener is invoked on the consumer thread. To safely pause and resume consumers, you should use the pause and resume methods on the listener containers. A pause() takes effect just before the next poll(); a resume() takes effect just after the current poll() returns. When a container is paused, it continues to poll() the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records. See the Kafka documentation for more information.

Starting with version 2.1.5, you can call isPauseRequested() to see if pause() has been called. However, the consumers might not have actually paused yet. isConsumerPaused() returns true if all Consumer instances have actually paused.

In addition(also since 2.1.5), ConsumerPausedEvent and ConsumerResumedEvent instances are published with the container as the source property and the TopicPartition instances involved in the partitions property.

Starting with version 2.9, a new container property pauseImmediate, when set to true, causes the pause to take effect after the current record is processed. By default, the pause takes effect when all the records from the previous poll have been processed. See pauseImmediate.

The following simple Spring Boot application demonstrates by using the container registry to get a reference to a @KafkaListener method’s container and pausing or resuming its consumers as well as receiving the corresponding events:

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

The following listing shows the results of the preceding example:

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2