Pausing and Resuming Listener Containers
Version 2.1.3 added pause()
and resume()
methods to listener containers.
Previously, you could pause a consumer within a ConsumerAwareMessageListener
and resume it by listening for a ListenerContainerIdleEvent
, which provides access to the Consumer
object.
While you could pause a consumer in an idle container by using an event listener, in some cases, this was not thread-safe, since there is no guarantee that the event listener is invoked on the consumer thread.
To safely pause and resume consumers, you should use the pause
and resume
methods on the listener containers.
A pause()
takes effect just before the next poll()
; a resume()
takes effect just after the current poll()
returns.
When a container is paused, it continues to poll()
the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records.
See the Kafka documentation for more information.
Starting with version 2.1.5, you can call isPauseRequested()
to see if pause()
has been called.
However, the consumers might not have actually paused yet.
isConsumerPaused()
returns true if all Consumer
instances have actually paused.
In addition (also since 2.1.5), ConsumerPausedEvent
and ConsumerResumedEvent
instances are published with the container as the source
property and the TopicPartition
instances involved in the partitions
property.
Starting with version 2.9, a new container property pauseImmediate
, when set to true, causes the pause to take effect after the current record is processed.
By default, the pause takes effect when all of the records from the previous poll have been processed.
See [pauseImmediate].
The following simple Spring Boot application demonstrates by using the container registry to get a reference to a @KafkaListener
method’s container and pausing or resuming its consumers as well as receiving the corresponding events:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
The following listing shows the results of the preceding example:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2