Message Listener Containers

Two MessageListenerContainer implementations are provided:

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. If the interceptor returns null, the listener is not called. Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception). Also, starting with version 2.7, there is now a BatchInterceptor, providing similar functionality for Batch Listeners. In addition, the ConsumerAwareRecordInterceptor (and BatchInterceptor) provide access to the Consumer<?, ?>. This might be used, for example, to access the consumer metrics in the interceptor.

You should not execute any methods that affect the consumer’s positions or committed offsets in these interceptors; the container needs to manage such information.
If the interceptor mutates the record (by creating a new one), the topic, partition, and offset must remain the same to avoid unexpected side effects such as record loss.
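The interceptor contract described above can be modelled in plain Java. The following is a minimal sketch, not the framework's implementation: SketchRecord and InterceptorSketch are hypothetical names, and the check that the replacement record keeps its topic, partition, and offset is a guard added by the sketch to make the rule visible (the text above states the rule, not that the container enforces it).

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a consumed record; not the Kafka ConsumerRecord class.
final class SketchRecord {
    final String topic;
    final int partition;
    final long offset;
    final String value;

    SketchRecord(String topic, int partition, long offset, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

final class InterceptorSketch {

    // Returns true if the listener was invoked, false if the interceptor returned null.
    static boolean dispatch(SketchRecord record,
            UnaryOperator<SketchRecord> interceptor,
            Consumer<SketchRecord> listener) {
        SketchRecord intercepted = interceptor.apply(record);
        if (intercepted == null) {
            return false; // a null result means the listener is not called
        }
        // Sketch-only guard: a replacement record must keep the original coordinates.
        if (!intercepted.topic.equals(record.topic)
                || intercepted.partition != record.partition
                || intercepted.offset != record.offset) {
            throw new IllegalStateException("interceptor changed topic, partition, or offset");
        }
        listener.accept(intercepted);
        return true;
    }
}
```

An interceptor that builds a new SketchRecord with a different value but the same topic, partition, and offset passes the guard; one that returns null causes dispatch to skip the listener.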

The CompositeRecordInterceptor and CompositeBatchInterceptor can be used to invoke multiple interceptors.

By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started. You can set the listener container’s interceptBeforeTx property to false to invoke the interceptor after the transaction has started instead. Starting with version 2.9, this will apply to any transaction manager, not just KafkaAwareTransactionManagers. This allows, for example, the interceptor to participate in a JDBC transaction started by the container.

Starting with versions 2.3.8, 2.4.6, the ConcurrentMessageListenerContainer now supports Static Membership when the concurrency is greater than one. The group.instance.id is suffixed with -n with n starting at 1. This, together with an increased session.timeout.ms, can be used to reduce rebalance events, for example, when application instances are restarted.
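As an illustration of the suffixing rule, the following sketch derives the group.instance.id values for a given concurrency. It is a plain-Java model with a hypothetical class name (StaticMembershipSketch); it only reproduces the "-n, starting at 1" naming stated above and does not touch any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

final class StaticMembershipSketch {

    // Builds one id per child container: <groupInstanceId>-1, <groupInstanceId>-2, ...
    static List<String> instanceIds(String groupInstanceId, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 1; n <= concurrency; n++) {
            ids.add(groupInstanceId + "-" + n);
        }
        return ids;
    }
}
```

With a configured id of "orders" and a concurrency of 3, the sketch yields orders-1, orders-2, and orders-3, so each child consumer keeps a stable identity across restarts.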

Using KafkaMessageListenerContainer

The following constructor is available:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

It receives a ConsumerFactory and information about topics and partitions, as well as other configuration, in a ContainerProperties object. ContainerProperties has the following constructors:

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

The first constructor takes an array of TopicPartitionOffset arguments to explicitly instruct the container about which partitions to use (using the consumer assign() method) and with an optional initial offset. A positive value is an absolute offset by default. A negative value is relative to the current last offset within a partition by default. A constructor for TopicPartitionOffset that takes an additional boolean argument is provided. If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer. The offsets are applied when the container is started. The second takes an array of topics, and Kafka allocates the partitions based on the group.id property — distributing partitions across the group. The third uses a regex Pattern to select the topics.
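The initial-offset rules for the first constructor can be summarized in a small plain-Java model. This is a sketch under stated assumptions, not the container's code: InitialOffsetSketch is a hypothetical name, the current position and end offset are passed in as plain numbers, and clamping the result at zero is an assumption of the sketch.

```java
final class InitialOffsetSketch {

    // Resolves the offset to seek to when the container starts.
    static long resolve(long initialOffset, boolean relativeToCurrent,
            long currentPosition, long endOffset) {
        long target;
        if (relativeToCurrent) {
            // Positive or negative: relative to this consumer's current position.
            target = currentPosition + initialOffset;
        } else if (initialOffset >= 0) {
            // Non-negative: absolute offset.
            target = initialOffset;
        } else {
            // Negative: relative to the current last offset in the partition.
            target = endOffset + initialOffset;
        }
        return Math.max(0L, target); // assumption: never seek before offset 0
    }
}
```

For a partition whose end offset is 100 and whose current position is 50, an initial offset of -5 resolves to 95 by default and to 45 when the relative-to-current flag is true.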

To assign a MessageListener to a container, you can use the ContainerProperties.setMessageListener method when creating the container. The following example shows how to do so:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

Note that when creating a DefaultKafkaConsumerFactory, using the constructor that takes only the properties (as above) means that the key and value Deserializer classes are picked up from configuration. Alternatively, Deserializer instances may be passed to the DefaultKafkaConsumerFactory constructor for key and/or value, in which case all Consumers share the same instances. Another option is to provide Supplier<Deserializer> instances (starting with version 2.3) that will be used to obtain separate Deserializer instances for each Consumer:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, CustomValue> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

Refer to the Javadoc for ContainerProperties for more information about the various properties that you can set.

Since version 2.1.1, a new property called logContainerConfig is available. When it is true and INFO logging is enabled, each listener container writes a log message summarizing its configuration properties.

By default, logging of topic offset commits is performed at the DEBUG logging level. Starting with version 2.1.2, a property in ContainerProperties called commitLogLevel lets you specify the log level for these messages. For example, to change the log level to INFO, you can use containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.

Starting with version 2.2, a new container property called missingTopicsFatal has been added (default: false since 2.3.4). This prevents the container from starting if any of the configured topics are not present on the broker. It does not apply if the container is configured to listen to a topic pattern (regex). Previously, the container threads looped within the consumer.poll() method waiting for the topic to appear while logging many messages. Aside from the logs, there was no indication that there was a problem.

As of version 2.8, a new container property authExceptionRetryInterval has been introduced. This causes the container to retry fetching messages after getting any AuthenticationException or AuthorizationException from the KafkaConsumer. This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect. Defining authExceptionRetryInterval allows the container to recover when proper permissions are granted.

By default, no interval is configured; authentication and authorization errors are considered fatal, which causes the container to stop.

Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the configure() method to configure them with the configuration properties.

Using ConcurrentMessageListenerContainer

The single constructor is similar to the KafkaMessageListenerContainer constructor. The following listing shows the constructor’s signature:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.

When the container properties are configured with topics (or a topic pattern), Kafka distributes the partitions across the consumers using its group management capabilities.

When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.

When using Spring Boot, you can set the strategy as follows:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
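To see why the two strategies differ for the three-topic example, the following plain-Java sketch models both in simplified form. It is not the Kafka assignor code: AssignorSketch is a hypothetical name, consumers are identified by index only, and every topic is assumed to have the same number of partitions.

```java
import java.util.ArrayList;
import java.util.List;

final class AssignorSketch {

    // Range-style: each topic is divided independently, always starting at the first consumer.
    static List<List<String>> range(List<String> topics, int partitionsPerTopic, int consumers) {
        List<List<String>> assignment = emptyAssignment(consumers);
        int base = partitionsPerTopic / consumers;
        int extra = partitionsPerTopic % consumers;
        for (String topic : topics) {
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = base + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    assignment.get(c).add(topic + "-" + next++);
                }
            }
        }
        return assignment;
    }

    // Round-robin-style: all partitions of all topics are dealt out in one cycle.
    static List<List<String>> roundRobin(List<String> topics, int partitionsPerTopic, int consumers) {
        List<List<String>> assignment = emptyAssignment(consumers);
        int next = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                assignment.get(next % consumers).add(topic + "-" + p);
                next++;
            }
        }
        return assignment;
    }

    // Counts consumers that received at least one partition.
    static long activeConsumers(List<List<String>> assignment) {
        return assignment.stream().filter(a -> !a.isEmpty()).count();
    }

    private static List<List<String>> emptyAssignment(int consumers) {
        List<List<String>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        return assignment;
    }
}
```

With three topics, five partitions each, and 15 consumers, the range-style model leaves ten consumers without partitions, while the round-robin model gives every consumer exactly one.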

When the container properties are configured with TopicPartitionOffsets, the ConcurrentMessageListenerContainer distributes the TopicPartitionOffset instances across the delegate KafkaMessageListenerContainer instances.

If, say, six TopicPartitionOffset instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartitionOffset instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitionOffset instances, the concurrency is adjusted down such that each container gets one partition.
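The arithmetic in this paragraph can be checked with a short plain-Java sketch. PartitionDistributionSketch is a hypothetical name, partitions are represented by integers, and handing out contiguous slices is an assumption of the sketch; the text above only specifies how many partitions each container receives.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionDistributionSketch {

    // Splits the partitions across at most 'concurrency' containers, as evenly as possible.
    static List<List<Integer>> distribute(List<Integer> partitions, int concurrency) {
        List<List<Integer>> result = new ArrayList<>();
        if (partitions.isEmpty()) {
            return result;
        }
        // Concurrency is adjusted down when there are fewer partitions than containers.
        int containers = Math.min(concurrency, partitions.size());
        int base = partitions.size() / containers;
        int extra = partitions.size() % containers;
        int index = 0;
        for (int c = 0; c < containers; c++) {
            int count = base + (c < extra ? 1 : 0); // the first 'extra' containers get one more
            result.add(new ArrayList<>(partitions.subList(index, index + count)));
            index += count;
        }
        return result;
    }
}
```

Six partitions with a concurrency of 3 produce slices of sizes 2, 2, 2; five partitions produce 2, 2, 1; two partitions produce only two containers with one partition each.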

The client.id property (if set) is appended with -n where n is the consumer instance that corresponds to the concurrency. This is required to provide unique names for MBeans when JMX is enabled.

Starting with version 1.3, the MessageListenerContainer provides access to the metrics of the underlying KafkaConsumer. In the case of ConcurrentMessageListenerContainer, the metrics() method returns the metrics for all the target KafkaMessageListenerContainer instances. The metrics are grouped into the Map<MetricName, ? extends Metric> by the client-id provided for the underlying KafkaConsumer.

Starting with version 2.3, the ContainerProperties provides an idleBetweenPolls option that lets the main loop in the listener container sleep between KafkaConsumer.poll() calls. The actual sleep interval is the minimum of the provided option and the difference between the max.poll.interval.ms consumer config and the current records batch processing time.
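The sleep-interval rule can be written as a one-line calculation. The following is a sketch, not the container's code: IdleBetweenPollsSketch is a hypothetical name, all values are in milliseconds, and treating a negative result as "do not sleep" is an assumption of the sketch.

```java
final class IdleBetweenPollsSketch {

    // Returns how long the main loop would sleep before the next poll().
    static long sleepMillis(long idleBetweenPolls, long maxPollIntervalMs, long batchProcessingMs) {
        long remaining = maxPollIntervalMs - batchProcessingMs;
        // Assumption: if processing already used up the poll interval, do not sleep at all.
        return Math.max(0L, Math.min(idleBetweenPolls, remaining));
    }
}
```

For example, with idleBetweenPolls set to 10000 and max.poll.interval.ms set to 300000, a batch that took 295000 ms to process leaves only 5000 ms of sleep, which keeps the consumer within its poll interval.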

Committing Offsets

Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set.

The consumer poll() method returns one or more ConsumerRecords. The MessageListener is called for each record. The following list describes the action taken by the container for each AckMode (when transactions are not being used):

  • RECORD: Commit the offset when the listener returns after processing the record.

  • BATCH: Commit the offset when all the records returned by the poll() have been processed.

  • TIME: Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.

  • COUNT: Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.

  • COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.

  • MANUAL: The message listener is responsible for calling acknowledge() on the Acknowledgment. After that, the same semantics as BATCH are applied.

  • MANUAL_IMMEDIATE: Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
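The conditions for the non-manual modes that commit after a poll can be expressed as a small decision function. This is a plain-Java model, not the container's implementation: AckDecisionSketch and its nested Mode enum are hypothetical names, and the exact boundary comparisons (>= for the count, > for the time) are assumptions of the sketch.

```java
final class AckDecisionSketch {

    // Hypothetical enum mirroring four of the modes in the list above.
    enum Mode { BATCH, TIME, COUNT, COUNT_TIME }

    // Decides whether to commit once all records from the last poll() have been processed.
    static boolean commitAfterBatch(Mode mode, int recordsSinceCommit, int ackCount,
            long millisSinceCommit, long ackTime) {
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeExceeded = millisSinceCommit > ackTime;
        switch (mode) {
            case BATCH:
                return true; // always commit after the batch
            case TIME:
                return timeExceeded;
            case COUNT:
                return countReached;
            case COUNT_TIME:
                return countReached || timeExceeded; // either condition is enough
            default:
                throw new IllegalArgumentException("unsupported mode: " + mode);
        }
    }
}
```

With ackCount set to 100 and ackTime set to 5000, a container that has processed 40 records over 6 seconds since the last commit would commit under TIME and COUNT_TIME, but not under COUNT.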

When using transactions, the offset(s) are sent to the transaction and the semantics are equivalent to RECORD or BATCH, depending on the listener type (record or batch).

MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener. See Message Listeners.

Depending on the syncCommits container property, the commitSync() or commitAsync() method on the consumer is used. syncCommits is true by default; also see setSyncCommitTimeout. See setCommitCallback to get the results of asynchronous commits; the default callback is the LoggingCommitCallback which logs errors (and successes at debug level).

Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.

The Acknowledgment has the following method:

public interface Acknowledgment {

    void acknowledge();

}

This method gives the listener control over when offsets are committed.

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

If you want to commit a partial batch by using nack() when using transactions, set the AckMode to MANUAL; invoking nack() will send the offsets of the successfully processed records to the transaction.
nack() can only be called on the consumer thread that invokes your listener.
nack() is not allowed when using Out of Order Commits.

With a record listener, when nack() is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next poll(). The consumer can be paused before redelivery, by setting the sleep argument. This is similar functionality to throwing an exception when the container is configured with a DefaultErrorHandler.

nack() pauses the entire listener for the specified sleep duration including all assigned partitions.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll().

See Container Error Handlers for more information.

The consumer is paused during the sleep so that we continue to poll the broker to keep the consumer alive. The actual sleep time, and its resolution, depends on the container’s pollTimeout, which defaults to 5 seconds. The minimum sleep time is equal to the pollTimeout and all sleep times will be a multiple of it. For small sleep times, or to increase accuracy, consider reducing the container’s pollTimeout.
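The relationship between the requested sleep and the pollTimeout can be illustrated with a short calculation. This is a sketch only: NackSleepSketch is a hypothetical name, values are in milliseconds, and rounding the requested sleep up to the next multiple of pollTimeout is an assumption consistent with, but not stated by, the description above.

```java
final class NackSleepSketch {

    // Estimates the effective pause for a positive requested sleep and pollTimeout.
    static long effectiveSleepMillis(long requestedSleep, long pollTimeout) {
        // Number of polls needed to cover the requested sleep, at least one.
        long polls = Math.max(1L, (requestedSleep + pollTimeout - 1) / pollTimeout);
        return polls * pollTimeout;
    }
}
```

Under this model, with the default pollTimeout of 5000, a requested sleep of 100 results in a pause of about 5000, and a requested sleep of 7000 results in about 10000. Reducing pollTimeout to 50 brings a requested sleep of 100 back to 100.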

Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using acknowledge(index) on the Acknowledgment argument. When this method is called, the offset of the record at the index (as well as all previous records) will be committed. Calling acknowledge() after a partial batch commit is performed will commit the offsets of the remainder of the batch. The following limitations apply:

  • AckMode.MANUAL_IMMEDIATE is required

  • The method must be called on the listener thread

  • The listener must consume a List rather than the raw ConsumerRecords

  • The index must be in the range of the list’s elements

  • The index must be larger than that used in a previous call

These restrictions are enforced and the method will throw an IllegalArgumentException or IllegalStateException, depending on the violation.
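The two index-related restrictions can be modelled as follows. This is a plain-Java sketch, not the framework's Acknowledgment: PartialBatchAckSketch is a hypothetical name, and throwing IllegalArgumentException for both index violations is an assumption (the text above does not say which exception applies to which violation).

```java
final class PartialBatchAckSketch {

    private final int batchSize;
    private int lastIndex = -1; // index of the last record already acknowledged

    PartialBatchAckSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Acknowledges records up to and including 'index'; returns how many are newly covered.
    int acknowledge(int index) {
        if (index < 0 || index >= batchSize) {
            throw new IllegalArgumentException("index " + index + " is outside the batch");
        }
        if (index <= lastIndex) {
            throw new IllegalArgumentException("index must be larger than the previous index " + lastIndex);
        }
        int newlyCovered = index - lastIndex;
        lastIndex = index;
        return newlyCovered;
    }

    // Models acknowledge() after a partial commit: covers the remainder of the batch.
    int acknowledgeRemainder() {
        int remaining = batchSize - 1 - lastIndex;
        lastIndex = batchSize - 1;
        return remaining;
    }
}
```

For a batch of five records, acknowledging index 1 covers two records, acknowledging index 3 covers two more, and acknowledging the remainder covers the final record. Repeating index 3 is rejected.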

Listener Container Auto Startup

The listener containers implement SmartLifecycle, and autoStartup is true by default. The containers are started in a late phase (Integer.MAX_VALUE - 100). Other components that implement SmartLifecycle, to handle data from listeners, should be started in an earlier phase. The - 100 leaves room for later phases to enable components to be auto-started after the containers.
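Because SmartLifecycle components start in ascending phase order, the effect of the container phase can be shown with a small ordering sketch. StartupPhaseSketch and the component names used with it are hypothetical; the sketch only sorts names by phase and does not use any Spring API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class StartupPhaseSketch {

    // Phase used by the listener containers, as described above.
    static final int CONTAINER_PHASE = Integer.MAX_VALUE - 100;

    // Returns component names in start order: lower phases start first.
    static List<String> startOrder(Map<String, Integer> phasesByName) {
        return phasesByName.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

A component that processes listener output with phase 0 starts before the containers, while a component with phase Integer.MAX_VALUE - 50 fits into the room left by the - 100 and starts after them.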