Message Listener Containers
Two MessageListenerContainer
implementations are provided:
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer
receives all messages from all topics or partitions on a single thread.
The ConcurrentMessageListenerContainer
delegates to one or more KafkaMessageListenerContainer
instances to provide multi-threaded consumption.
Starting with version 2.2.7, you can add a RecordInterceptor
to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).
Also, starting with version 2.7, there is now a BatchInterceptor
, providing similar functionality for Batch Listeners.
In addition, the ConsumerAwareRecordInterceptor
(and BatchInterceptor
) provide access to the Consumer<?, ?>
.
This might be used, for example, to access the consumer metrics in the interceptor.
You should not execute any methods that affect the consumer’s positions and or committed offsets in these interceptors; the container needs to manage such information. |
If the interceptor mutates the record (by creating a new one), the topic , partition , and offset must remain the same to avoid unexpected side effects such as record loss.
|
The CompositeRecordInterceptor
and CompositeBatchInterceptor
can be used to invoke multiple interceptors.
By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container’s interceptBeforeTx
property to false
to invoke the interceptor after the transaction has started instead.
Starting with version 2.9, this will apply to any transaction manager, not just KafkaAwareTransactionManager
s.
This allows, for example, the interceptor to participate in a JDBC transaction started by the container.
Starting with versions 2.3.8, 2.4.6, the ConcurrentMessageListenerContainer
now supports Static Membership when the concurrency is greater than one.
The group.instance.id
is suffixed with -n
with n
starting at 1
.
This, together with an increased session.timeout.ms
, can be used to reduce rebalance events, for example, when application instances are restarted.
Using KafkaMessageListenerContainer
The following constructor is available:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
It receives a ConsumerFactory
and information about topics and partitions, as well as other configuration, in a ContainerProperties
object.
ContainerProperties
has the following constructors:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
The first constructor takes an array of TopicPartitionOffset
arguments to explicitly instruct the container about which partitions to use (using the consumer assign()
method) and with an optional initial offset.
A positive value is an absolute offset by default.
A negative value is relative to the current last offset within a partition by default.
A constructor for TopicPartitionOffset
that takes an additional boolean
argument is provided.
If this is true
, the initial offsets (positive or negative) are relative to the current position for this consumer.
The offsets are applied when the container is started.
The second takes an array of topics, and Kafka allocates the partitions based on the group.id
property — distributing partitions across the group.
The third uses a regex Pattern
to select the topics.
To assign a MessageListener
to a container, you can use the ContainerProps.setMessageListener
method when creating the Container.
The following example shows how to do so:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
Note that when creating a DefaultKafkaConsumerFactory
, using the constructor that just takes in the properties as above means that key and value Deserializer
classes are picked up from configuration.
Alternatively, Deserializer
instances may be passed to the DefaultKafkaConsumerFactory
constructor for key and/or value, in which case all Consumers share the same instances.
Another option is to provide Supplier<Deserializer>
s (starting with version 2.3) that will be used to obtain separate Deserializer
instances for each Consumer
:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
Refer to the Javadoc for ContainerProperties
for more information about the various properties that you can set.
Since version 2.1.1, a new property called logContainerConfig
is available.
When true
and INFO
logging is enabled each listener container writes a log message summarizing its configuration properties.
By default, logging of topic offset commits is performed at the DEBUG
logging level.
Starting with version 2.1.2, a property in ContainerProperties
called commitLogLevel
lets you specify the log level for these messages.
For example, to change the log level to INFO
, you can use containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
.
Starting with version 2.2, a new container property called missingTopicsFatal
has been added (default: false
since 2.3.4).
This prevents the container from starting if any of the configured topics are not present on the broker.
It does not apply if the container is configured to listen to a topic pattern (regex).
Previously, the container threads looped within the consumer.poll()
method waiting for the topic to appear while logging many messages.
Aside from the logs, there was no indication that there was a problem.
As of version 2.8, a new container property authExceptionRetryInterval
has been introduced.
This causes the container to retry fetching messages after getting any AuthenticationException
or AuthorizationException
from the KafkaConsumer
.
This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect.
Defining authExceptionRetryInterval
allows the container to recover when proper permissions are granted.
By default, no interval is configured - authentication and authorization errors are considered fatal, which causes the container to stop. |
Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the configure()
method to configure them with the configuration properties.
Using ConcurrentMessageListenerContainer
The single constructor is similar to the KafkaListenerContainer
constructor.
The following listing shows the constructor’s signature:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
It also has a concurrency
property.
For example, container.setConcurrency(3)
creates three KafkaMessageListenerContainer
instances.
For the first constructor, Kafka distributes the partitions across the consumers using its group management capabilities.
When listening to multiple topics, the default partition distribution may not be what you expect.
For example, if you have three topics with five partitions each and you want to use When using Spring Boot, you can assign set the strategy as follows:
|
When the container properties are configured with TopicPartitionOffset
s, the ConcurrentMessageListenerContainer
distributes the TopicPartitionOffset
instances across the delegate KafkaMessageListenerContainer
instances.
If, say, six TopicPartitionOffset
instances are provided and the concurrency
is 3
; each container gets two partitions.
For five TopicPartitionOffset
instances, two containers get two partitions, and the third gets one.
If the concurrency
is greater than the number of TopicPartitions
, the concurrency
is adjusted down such that each container gets one partition.
The client.id property (if set) is appended with -n where n is the consumer instance that corresponds to the concurrency.
This is required to provide unique names for MBeans when JMX is enabled.
|
Starting with version 1.3, the MessageListenerContainer
provides access to the metrics of the underlying KafkaConsumer
.
In the case of ConcurrentMessageListenerContainer
, the metrics()
method returns the metrics for all the target KafkaMessageListenerContainer
instances.
The metrics are grouped into the Map<MetricName, ? extends Metric>
by the client-id
provided for the underlying KafkaConsumer
.
Starting with version 2.3, the ContainerProperties
provides an idleBetweenPolls
option to let the main loop in the listener container to sleep between KafkaConsumer.poll()
calls.
An actual sleep interval is selected as the minimum from the provided option and difference between the max.poll.interval.ms
consumer config and the current records batch processing time.
Committing Offsets
Several options are provided for committing offsets.
If the enable.auto.commit
consumer property is true
, Kafka auto-commits the offsets according to its configuration.
If it is false
, the containers support several AckMode
settings (described in the next list).
The default AckMode
is BATCH
.
Starting with version 2.3, the framework sets enable.auto.commit
to false
unless explicitly set in the configuration.
Previously, the Kafka default (true
) was used if the property was not set.
The consumer poll()
method returns one or more ConsumerRecords
.
The MessageListener
is called for each record.
The following lists describes the action taken by the container for each AckMode
(when transactions are not being used):
-
RECORD
: Commit the offset when the listener returns after processing the record. -
BATCH
: Commit the offset when all the records returned by thepoll()
have been processed. -
TIME
: Commit the offset when all the records returned by thepoll()
have been processed, as long as theackTime
since the last commit has been exceeded. -
COUNT
: Commit the offset when all the records returned by thepoll()
have been processed, as long asackCount
records have been received since the last commit. -
COUNT_TIME
: Similar toTIME
andCOUNT
, but the commit is performed if either condition istrue
. -
MANUAL
: The message listener is responsible toacknowledge()
theAcknowledgment
. After that, the same semantics asBATCH
are applied. -
MANUAL_IMMEDIATE
: Commit the offset immediately when theAcknowledgment.acknowledge()
method is called by the listener.
When using transactions, the offset(s) are sent to the transaction and the semantics are equivalent to RECORD
or BATCH
, depending on the listener type (record or batch).
MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener .
See Message Listeners.
|
Depending on the syncCommits
container property, the commitSync()
or commitAsync()
method on the consumer is used.
syncCommits
is true
by default; also see setSyncCommitTimeout
.
See setCommitCallback
to get the results of asynchronous commits; the default callback is the LoggingCommitCallback
which logs errors (and successes at debug level).
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
to be false
.
Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container’s consumer property overrides.
The Acknowledgment
has the following method:
public interface Acknowledgment {
void acknowledge();
}
This method gives the listener control over when offsets are committed.
Starting with version 2.3, the Acknowledgment
interface has two additional methods nack(long sleep)
and nack(int index, long sleep)
.
The first one is used with a record listener, the second with a batch listener.
Calling the wrong method for your listener type will throw an IllegalStateException
.
If you want to commit a partial batch, using nack() , When using transactions, set the AckMode to MANUAL ; invoking nack() will send the offsets of the successfully processed records to the transaction.
|
nack() can only be called on the consumer thread that invokes your listener.
|
nack() is not allowed when using Out of Order Commits.
|
With a record listener, when nack()
is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next poll()
.
The consumer can be paused before redelivery, by setting the sleep
argument.
This is similar functionality to throwing an exception when the container is configured with a DefaultErrorHandler
.
nack() pauses the entire listener for the specified sleep duration including all assigned partitions.
|
When using a batch listener, you can specify the index within the batch where the failure occurred.
When nack()
is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll()
.
See Container Error Handlers for more information.
The consumer is paused during the sleep so that we continue to poll the broker to keep the consumer alive.
The actual sleep time, and its resolution, depends on the container’s pollTimeout which defaults to 5 seconds.
The minimum sleep time is equal to the pollTimeout and all sleep times will be a multiple of it.
For small sleep times or, to increase its accuracy, consider reducing the container’s pollTimeout .
|
Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using acknowledge(index)
on the Acknowledgment
argument.
When this method is called, the offset of the record at the index (as well as all previous records) will be committed.
Calling acknowledge()
after a partial batch commit is performed will commit the offsets of the remainder of the batch.
The following limitations apply:
-
AckMode.MANUAL_IMMEDIATE
is required -
The method must be called on the listener thread
-
The listener must consume a
List
rather than the rawConsumerRecords
-
The index must be in the range of the list’s elements
-
The index must be larger than that used in a previous call
These restrictions are enforced and the method will throw an IllegalArgumentException
or IllegalStateException
, depending on the violation.
Listener Container Auto Startup
The listener containers implement SmartLifecycle
, and autoStartup
is true
by default.
The containers are started in a late phase (Integer.MAX-VALUE - 100
).
Other components that implement SmartLifecycle
, to handle data from listeners, should be started in an earlier phase.
The - 100
leaves room for later phases to enable components to be auto-started after the containers.