Application Events

The following Spring application events are published by listener containers and their consumers:

  • ConsumerStartingEvent: published when a consumer thread is first started, before it starts polling.

  • ConsumerStartedEvent: published when a consumer is about to start polling.

  • ConsumerFailedToStartEvent: published if no ConsumerStartingEvent is published within the consumerStartTimeout container property. This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency. An error message is also logged when this condition occurs.

  • ListenerContainerIdleEvent: published when no messages have been received in idleEventInterval (if configured).

  • ListenerContainerNoLongerIdleEvent: published when a record is consumed after previously publishing a ListenerContainerIdleEvent.

  • ListenerContainerPartitionIdleEvent: published when no messages have been received from that partition in idlePartitionEventInterval (if configured).

  • ListenerContainerPartitionNoLongerIdleEvent: published when a record is consumed from a partition that has previously published a ListenerContainerPartitionIdleEvent.

  • NonResponsiveConsumerEvent: published when the consumer appears to be blocked in the poll method.

  • ConsumerPartitionPausedEvent: published by each consumer when a partition is paused.

  • ConsumerPartitionResumedEvent: published by each consumer when a partition is resumed.

  • ConsumerPausedEvent: published by each consumer when the container is paused.

  • ConsumerResumedEvent: published by each consumer when the container is resumed.

  • ConsumerStoppingEvent: published by each consumer just before stopping.

  • ConsumerStoppedEvent: published after the consumer is closed. See Thread Safety.

  • ConsumerRetryAuthEvent: published when authentication or authorization of a consumer fails and is being retried.

  • ConsumerRetryAuthSuccessfulEvent: published when authentication or authorization has been retried successfully. Can only occur when there has been a ConsumerRetryAuthEvent before.

  • ContainerStoppedEvent: published when all consumers have stopped.

By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, you must not invoke any Consumer methods when the event contains a reference to the consumer.

The ListenerContainerIdleEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • idleTime: The time the container had been idle when the event was published.

  • topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, you can call resume() when the event is received.

  • paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

The ListenerContainerNoLongerIdleEvent has the same properties, except idleTime and paused.

The ListenerContainerPartitionIdleEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • idleTime: The time partition consumption had been idle when the event was published.

  • topicPartition: The topic and partition that triggered the event.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, you can call resume() when the event is received.

  • paused: Whether that partition consumption is currently paused for that consumer. See Pausing and Resuming Listener Containers for more information.

The ListenerContainerPartitionNoLongerIdleEvent has the same properties, except idleTime and paused.

The NonResponsiveConsumerEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • timeSinceLastPoll: The time elapsed since the container last called poll().

  • topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, you can call resume() when the event is received.

  • paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

The ConsumerPausedEvent, ConsumerResumedEvent, and ConsumerStoppingEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • partitions: The TopicPartition instances involved.

The ConsumerPartitionPausedEvent and ConsumerPartitionResumedEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • partition: The TopicPartition instance involved.

The ConsumerRetryAuthEvent event has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • reason:

    • AUTHENTICATION - the event was published because of an authentication exception.

    • AUTHORIZATION - the event was published because of an authorization exception.

The ConsumerStartingEvent, ConsumerStartedEvent, ConsumerFailedToStartEvent, ConsumerStoppedEvent, ConsumerRetryAuthSuccessfulEvent, and ContainerStoppedEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

All containers (whether a child or a parent) publish ContainerStoppedEvent. For a parent container, the source and container properties are identical.

The ConsumerStoppedEvent also has the following property:

  • reason:

    • NORMAL - the consumer stopped normally (container was stopped).

    • ERROR - a java.lang.Error was thrown.

    • FENCED - the transactional producer was fenced and the stopContainerWhenFenced container property is true.

    • AUTH - an AuthenticationException or AuthorizationException was thrown and the authExceptionRetryInterval is not configured.

    • NO_OFFSET - there is no offset for a partition and the auto.offset.reset policy is none.

You can use this event to restart the container after such a condition:

if (event.getReason().equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

Detecting Idle and Non-Responsive Consumers

Asynchronous consumers are efficient, but it is hard to detect when they are idle. You might want to take some action if no messages arrive for some period of time.

You can configure the listener container to publish a ListenerContainerIdleEvent when some time passes with no message delivery. While the container is idle, an event is published every idleEventInterval milliseconds.

To configure this feature, set the idleEventInterval on the container. The following example shows how to do so:

@Bean
public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

The following example shows how to set the idleEventInterval for a @KafkaListener:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

In each of these cases, an event is published once per minute while the container is idle.

If, for some reason, the consumer poll() method does not exit, no messages are received and idle events cannot be generated (this was a problem with early versions of kafka-clients when the broker was not reachable). In this case, the container publishes a NonResponsiveConsumerEvent if a poll does not return within three times the pollTimeout property (the default noPollThreshold). By default, this check is performed once every 30 seconds in each container. You can modify this behavior by setting the monitorInterval (default 30 seconds) and noPollThreshold (default 3.0) properties in the ContainerProperties when configuring the listener container. The noPollThreshold should be greater than 1.0 to avoid spurious events caused by a race condition. Receiving such an event lets you stop the containers, which wakes the consumer so that it can stop.
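
The relationship between pollTimeout and noPollThreshold can be sketched with plain arithmetic. The class below is only an illustration and not the container's code: NoPollThresholdSketch and its methods are hypothetical names, and the 5-second pollTimeout is an assumed value chosen for the example.

```java
import java.time.Duration;

/** Illustrative arithmetic only; not the container's actual implementation. */
final class NoPollThresholdSketch {

    private NoPollThresholdSketch() {
    }

    /** Longest tolerated gap since the last poll: pollTimeout * noPollThreshold. */
    static Duration allowedGap(Duration pollTimeout, float noPollThreshold) {
        return Duration.ofMillis((long) (pollTimeout.toMillis() * noPollThreshold));
    }

    /** True when the time since the last poll exceeds the allowed gap. */
    static boolean isNonResponsive(Duration timeSinceLastPoll, Duration pollTimeout,
            float noPollThreshold) {
        return timeSinceLastPoll.compareTo(allowedGap(pollTimeout, noPollThreshold)) > 0;
    }

    public static void main(String[] args) {
        Duration pollTimeout = Duration.ofSeconds(5); // assumed value, for illustration only
        float noPollThreshold = 3.0f;                 // default stated in the text
        System.out.println("allowed gap: " + allowedGap(pollTimeout, noPollThreshold));
        System.out.println("10s since last poll: "
                + isNonResponsive(Duration.ofSeconds(10), pollTimeout, noPollThreshold));
        System.out.println("20s since last poll: "
                + isNonResponsive(Duration.ofSeconds(20), pollTimeout, noPollThreshold));
    }
}
```

Under these assumptions the allowed gap is 15 seconds. Because the check itself runs only once per monitorInterval, the event may be published some time after the gap is first exceeded.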

Starting with version 2.6.2, if a container has published a ListenerContainerIdleEvent, it will publish a ListenerContainerNoLongerIdleEvent when a record is subsequently received.
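
The interplay of the two events can be modeled with a small state tracker. This is a simplified sketch under stated assumptions, not the container's implementation: IdleTrackerSketch, onEmptyPoll, and onRecord are hypothetical names, and timestamps are passed in explicitly so that the behavior is easy to follow.

```java
/** Simplified model of idle tracking; names and structure are hypothetical. */
final class IdleTrackerSketch {

    private final long idleEventIntervalMs;
    private long lastReceiveMs;
    private long lastIdleEventMs;
    private boolean idleEventPublished;

    IdleTrackerSketch(long idleEventIntervalMs, long startMs) {
        this.idleEventIntervalMs = idleEventIntervalMs;
        this.lastReceiveMs = startMs;
        this.lastIdleEventMs = startMs;
    }

    /** Call after a poll that returned no records; true means "publish an idle event now". */
    boolean onEmptyPoll(long nowMs) {
        boolean due = nowMs - lastReceiveMs >= idleEventIntervalMs
                && nowMs - lastIdleEventMs >= idleEventIntervalMs;
        if (due) {
            lastIdleEventMs = nowMs;
            idleEventPublished = true;
        }
        return due;
    }

    /** Call when a record arrives; true means "publish a no-longer-idle event now". */
    boolean onRecord(long nowMs) {
        lastReceiveMs = nowMs;
        boolean wasIdle = idleEventPublished;
        idleEventPublished = false;
        return wasIdle;
    }

    public static void main(String[] args) {
        IdleTrackerSketch tracker = new IdleTrackerSketch(60_000L, 0L);
        System.out.println(tracker.onEmptyPoll(30_000L));  // false: idle for only 30s
        System.out.println(tracker.onEmptyPoll(60_000L));  // true: first idle event
        System.out.println(tracker.onEmptyPoll(120_000L)); // true: repeated one interval later
        System.out.println(tracker.onRecord(125_000L));    // true: no-longer-idle event
        System.out.println(tracker.onRecord(126_000L));    // false: container was not idle
    }
}
```

In this model, a no-longer-idle notification is produced at most once per idle period, which matches the description above that it follows a previously published idle event.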

Event Consumption

You can capture these events by implementing ApplicationListener, either as a general listener or as one narrowed to receive only a specific event type. You can also use @EventListener, introduced in Spring Framework 4.2.

The next example combines @KafkaListener and @EventListener into a single class. You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle. You can also use the @EventListener's condition for this purpose.

See Application Events for information about event properties.

The event is normally published on the consumer thread, so it is safe to interact with the Consumer object.

The following example uses both @KafkaListener and @EventListener:

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}

Event listeners see events for all containers. Consequently, in the preceding example, we narrow the events received based on the listener ID. Since containers created for the @KafkaListener support concurrency, the actual containers are named id-n, where n is a unique value for each instance. That is why we use startsWith in the condition.

If you wish to use the idle event to stop the listener container, you should not call container.stop() on the thread that calls the listener. Doing so causes delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container. Also, you should not stop() the container instance if it is a child container. You should stop the concurrent container instead.
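
The hand-off described above might look like the following minimal sketch. It deliberately avoids the Spring types so that it stays self-contained: Stoppable is a hypothetical stand-in for the container's stop() operation, and IdleStopHandOffSketch and onIdle are illustrative names. In an application, the task would presumably call stop() on the parent (concurrent) container taken from the event's container property.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class IdleStopHandOffSketch {

    /** Hypothetical stand-in for the container's stop() operation. */
    interface Stoppable {
        void stop();
    }

    private final ExecutorService stopExecutor;

    IdleStopHandOffSketch(ExecutorService stopExecutor) {
        this.stopExecutor = stopExecutor;
    }

    /** Intended to be called from the idle-event handler; queues the stop and returns at once. */
    Future<?> onIdle(Stoppable container) {
        Runnable stopTask = container::stop;
        return stopExecutor.submit(stopTask);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        IdleStopHandOffSketch handler = new IdleStopHandOffSketch(executor);
        Future<?> done = handler.onIdle(
                () -> System.out.println("stop() ran on " + Thread.currentThread().getName()));
        done.get();          // wait only so that the demo can shut down cleanly
        executor.shutdown();
    }
}
```

The thread that delivers the event only submits the task, so it is free to return while the stop proceeds on the executor's thread.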

Current Positions when Idle

Note that you can obtain the current positions when idle is detected by implementing ConsumerSeekAware in your listener. See onIdleContainer() in seek.