This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.1! |
@KafkaListener
Lifecycle Management
The listener containers created for @KafkaListener
annotations are not beans in the application context.
Instead, they are registered with an infrastructure bean of type KafkaListenerEndpointRegistry
.
This bean is automatically declared by the framework and manages the containers' lifecycles; it will auto-start any containers that have autoStartup
set to true
.
All containers created by all container factories must be in the same phase
.
See Listener Container Auto Startup for more information.
You can manage the lifecycle programmatically by using the registry.
Starting or stopping the registry will start or stop all the registered containers.
Alternatively, you can get a reference to an individual container by using its id
attribute.
You can set autoStartup
on the annotation, which overrides the default setting configured into the container factory.
You can get a reference to the bean from the application context, such as auto-wiring, to manage its registered containers.
The following examples show how to do so:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
The registry only maintains the life cycle of containers it manages; containers declared as beans are not managed by the registry and can be obtained from the application context.
A collection of managed containers can be obtained by calling the registry’s getListenerContainers()
method.
Version 2.2.5 added a convenience method getAllListenerContainers()
, which returns a collection of all containers, including those managed by the registry and those declared as beans.
The collection returned will include any prototype beans that have been initialized, but it will not initialize any lazy bean declarations.
Endpoints registered after the application context has been refreshed will start immediately, regardless of their autoStartup property, to comply with the SmartLifecycle contract, where autoStartup is only considered during application context initialization.
An example of late registration is a bean with a @KafkaListener in prototype scope where an instance is created after the context is initialized.
Starting with version 2.8.7, you can set the registry’s alwaysStartAfterRefresh property to false and then the container’s autoStartup property will define whether or not the container is started.
|
Retrieving MessageListenerContainers from KafkaListenerEndpointRegistry
The KafkaListenerEndpointRegistry
provides methods for retrieving MessageListenerContainer
instances to accommodate a range of management scenarios:
All Containers: For operations that cover all listener containers, use getListenerContainers()
to retrieve a comprehensive collection.
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
Specific Container by ID: To manage an individual container, getListenerContainer(String id)
enables retrieval by its id.
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
Dynamic Container Filtering: Introduced in version 3.2, two overloaded getListenerContainersMatching
methods enable refined selection of containers.
One method takes a Predicate<String>
for ID-based filtering as a parameter, while the other takes a BiPredicate<String, MessageListenerContainer>
for more advanced criteria that may include container properties or state as a parameter.
// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);
// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
Utilize these methods to efficiently manage and query MessageListenerContainer
instances within your application.