This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0! |
Application Events
The following Spring application events are published by listener containers and their consumers:
-
ConsumerStartingEvent
: published when a consumer thread is first started, before it starts polling. -
ConsumerStartedEvent
: published when a consumer is about to start polling. -
ConsumerFailedToStartEvent
: published if noConsumerStartingEvent
is published within theconsumerStartTimeout
container property. This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency. An error message is also logged when this condition occurs. -
ListenerContainerIdleEvent
: published when no messages have been received inidleEventInterval
(if configured). -
ListenerContainerNoLongerIdleEvent
: published when a record is consumed after previously publishing aListenerContainerIdleEvent
. -
ListenerContainerPartitionIdleEvent
: published when no messages have been received from that partition inidlePartitionEventInterval
(if configured). -
ListenerContainerPartitionNoLongerIdleEvent
: published when a record is consumed from a partition that has previously published aListenerContainerPartitionIdleEvent
. -
NonResponsiveConsumerEvent
: published when the consumer appears to be blocked in thepoll
method. -
ConsumerPartitionPausedEvent
: published by each consumer when a partition is paused. -
ConsumerPartitionResumedEvent
: published by each consumer when a partition is resumed. -
ConsumerPausedEvent
: published by each consumer when the container is paused. -
ConsumerResumedEvent
: published by each consumer when the container is resumed. -
ConsumerStoppingEvent
: published by each consumer just before stopping. -
ConsumerStoppedEvent
: published after the consumer is closed. See Thread Safety. -
ConsumerRetryAuthEvent
: published when authentication or authorization of a consumer fails and is being retried. -
ConsumerRetryAuthSuccessfulEvent
: published when authentication or authorization has been retried successfully. Can only occur when there has been aConsumerRetryAuthEvent
before. -
ContainerStoppedEvent
: published when all consumers have stopped. -
ConcurrentContainerStoppedEvent
: published when theConcurrentMessageListenerContainer
has stopped.
By default, the application context’s event multicaster invokes event listeners on the calling thread.
If you change the multicaster to use an async executor, you must not invoke any Consumer methods when the event contains a reference to the consumer.
|
The ListenerContainerIdleEvent
has the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child. -
id
: The listener ID (or container bean name). -
idleTime
: The time the container had been idle when the event was published. -
topicPartitions
: The topics and partitions that the container was assigned at the time the event was generated. -
consumer
: A reference to the KafkaConsumer
object. For example, if the consumer’spause()
method was previously called, it canresume()
when the event is received. -
paused
: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.
The ListenerContainerNoLongerIdleEvent
has the same properties, except idleTime
and paused
.
The ListenerContainerPartitionIdleEvent
has the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child. -
id
: The listener ID (or container bean name). -
idleTime
: The time partition consumption had been idle when the event was published. -
topicPartition
: The topic and partition that triggered the event. -
consumer
: A reference to the KafkaConsumer
object. For example, if the consumer’spause()
method was previously called, it canresume()
when the event is received. -
paused
: Whether that partition consumption is currently paused for that consumer. See Pausing and Resuming Listener Containers for more information.
The ListenerContainerPartitionNoLongerIdleEvent
has the same properties, except idleTime
and paused
.
The NonResponsiveConsumerEvent
has the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child. -
id
: The listener ID (or container bean name). -
timeSinceLastPoll
: The time just before the container last calledpoll()
. -
topicPartitions
: The topics and partitions that the container was assigned at the time the event was generated. -
consumer
: A reference to the KafkaConsumer
object. For example, if the consumer’spause()
method was previously called, it canresume()
when the event is received. -
paused
: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.
The ConsumerPausedEvent
, ConsumerResumedEvent
, and ConsumerStopping
events have the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child. -
partitions
: TheTopicPartition
instances involved.
The ConsumerPartitionPausedEvent
, ConsumerPartitionResumedEvent
events have the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child. -
partition
: TheTopicPartition
instance involved.
The ConsumerRetryAuthEvent
event has the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child. -
reason
:-
AUTHENTICATION
- the event was published because of an authentication exception. -
AUTHORIZATION
- the event was published because of an authorization exception.
-
The ConsumerStartingEvent
, ConsumerStartedEvent
, ConsumerFailedToStartEvent
, ConsumerStoppedEvent
, ConsumerRetryAuthSuccessfulEvent
and ContainerStoppedEvent
events have the following properties:
-
source
: The listener container instance that published the event. -
container
: The listener container or the parent listener container, if the source container is a child.
All containers (whether a child or a parent) publish ContainerStoppedEvent
.
For a parent container, the source and container properties are identical.
In addition, the ConsumerStoppedEvent
has the following additional property:
-
reason
:-
NORMAL
- the consumer stopped normally (container was stopped). -
ERROR
- ajava.lang.Error
was thrown. -
FENCED
- the transactional producer was fenced and thestopContainerWhenFenced
container property istrue
. -
AUTH
- anAuthenticationException
orAuthorizationException
was thrown and theauthExceptionRetryInterval
is not configured. -
NO_OFFSET
- there is no offset for a partition and theauto.offset.reset
policy isnone
.
-
You can use this event to restart the container after such a condition:
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
Detecting Idle and Non-Responsive Consumers
While efficient, one problem with asynchronous consumers is detecting when they are idle. You might want to take some action if no messages arrive for some period of time.
You can configure the listener container to publish a ListenerContainerIdleEvent
when some time passes with no message delivery.
While the container is idle, an event is published every idleEventInterval
milliseconds.
To configure this feature, set the idleEventInterval
on the container.
The following example shows how to do so:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
The following example shows how to set the idleEventInterval
for a @KafkaListener
:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
In each of these cases, an event is published once per minute while the container is idle.
If, for some reason, the consumer poll()
method does not exit, no messages are received and idle events cannot be generated (this was a problem with early versions of the kafka-clients
when the broker wasn’t reachable).
In this case, the container publishes a NonResponsiveConsumerEvent
if a poll does not return within 3x
the pollTimeout
property.
By default, this check is performed once every 30 seconds in each container.
You can modify this behavior by setting the monitorInterval
(default 30 seconds) and noPollThreshold
(default 3.0) properties in the ContainerProperties
when configuring the listener container.
The noPollThreshold
should be greater than 1.0
to avoid getting spurious events due to a race condition.
Receiving such an event lets you stop the containers, thus waking the consumer so that it can stop.
Starting with version 2.6.2, if a container has published a ListenerContainerIdleEvent
, it will publish a ListenerContainerNoLongerIdleEvent
when a record is subsequently received.
Event Consumption
You can capture these events by implementing ApplicationListener
— either a general listener or one narrowed to only receive this specific event.
You can also use @EventListener
, introduced in Spring Framework 4.2.
The next example combines @KafkaListener
and @EventListener
into a single class.
You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle.
You can also use the @EventListener
's condition
for this purpose.
See Application Events for information about event properties.
The event is normally published on the consumer thread, so it is safe to interact with the Consumer
object.
The following example uses both @KafkaListener
and @EventListener
:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
Event listeners see events for all containers.
Consequently, in the preceding example, we narrow the events received based on the listener ID.
Since containers created for the @KafkaListener support concurrency, the actual containers are named id-n where the n is a unique value for each instance to support the concurrency.
That is why we use startsWith in the condition.
|
If you wish to use the idle event to stop the listener container, you should not call container.stop() on the thread that calls the listener.
Doing so causes delays and unnecessary log messages.
Instead, you should hand off the event to a different thread that can then stop the container.
Also, you should not stop() the container instance if it is a child container.
You should stop the concurrent container instead.
|
Current Positions when Idle
Note that you can obtain the current positions when idle is detected by implementing ConsumerSeekAware
in your listener.
See onIdleContainer()
in seek.