This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0!

Listener Container Properties

Table 1. ContainerProperties Properties
Property Default Description

ackCount

1

The number of records before committing pending offsets when the ackMode is COUNT or COUNT_TIME.

adviceChain

null

A chain of Advice objects (e.g. MethodInterceptor around advice) wrapping the message listener, invoked in order.

ackMode

BATCH

Controls how often offsets are committed - see Committing Offsets.

ackTime

5000

The time in milliseconds after which pending offsets are committed when the ackMode is TIME or COUNT_TIME.

assignmentCommitOption

LATEST_ONLY _NO_TX

Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG is latest and it won’t run in a transaction even if there is a transaction manager present. See the JavaDocs for ContainerProperties.AssignmentCommitOption for more information about the available options.

asyncAcks

false

Enable out-of-order commits (see Manually Committing Offsets); the consumer is paused and commits are deferred until gaps are filled.

authExceptionRetryInterval

null

When not null, a Duration to sleep between polls when an AuthenticationException or AuthorizationException is thrown by the Kafka client. When null, such exceptions are considered fatal and the container will stop.

batchRecoverAfterRollback

false

Set to true to enable batch recovery, See After Rollback Processor.

clientId

(empty string)

A prefix for the client.id consumer property. Overrides the consumer factory client.id property; in a concurrent container, -n is added as a suffix for each consumer instance.

checkDeserExWhenKeyNull

false

Set to true to always check for a DeserializationException header when a null key is received. Useful when the consumer code cannot determine that an ErrorHandlingDeserializer has been configured, such as when using a delegating deserializer.

checkDeserExWhenValueNull

false

Set to true to always check for a DeserializationException header when a null value is received. Useful when the consumer code cannot determine that an ErrorHandlingDeserializer has been configured, such as when using a delegating deserializer.

commitCallback

null

When present and syncCommits is false a callback invoked after the commit completes.

commitLogLevel

DEBUG

The logging level for logs pertaining to committing offsets.

consumerRebalanceListener

null

A rebalance listener; see Rebalancing Listeners.

commitRetries

3

Set the number of retries RetriableCommitFailedException when using syncCommits set to true. Default 3 (4-attempt total).

consumerStartTimeout

30s

The time to wait for the consumer to start before logging an error; this might happen if, say, you use a task executor with insufficient threads.

deliveryAttemptHeader

false

See Delivery Attempts Header.

eosMode

V2

Exactly Once Semantics mode; see Exactly Once Semantics.

fixTxOffsets

false

When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records. This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero. Set this property to true and the container will correct such mis-reported offsets. The check is performed before the next poll to avoid adding significant complexity to the commit processing. At the time of writing, the lag will only be corrected if the consumer is configured with isolation.level=read_committed and max.poll.records is greater than 1. See KAFKA-10683 for more information.

groupId

null

Overrides the consumer group.id property; automatically set by the @KafkaListener id or groupId property.

idleBeforeDataMultiplier

5.0

Multiplier for idleEventInterval that is applied before any records are received. After a record is received, the multiplier is no longer applied. Available since version 2.8.

idleBetweenPolls

0

Used to slow down deliveries by sleeping the thread between polls. The time to process a batch of records plus this value must be less than the max.poll.interval.ms consumer property.

idleEventInterval

null

When set, enables publication of ListenerContainerIdleEvents, see Application Events and Detecting Idle and Non-Responsive Consumers. Also see idleBeforeDataMultiplier.

idlePartitionEventInterval

null

When set, enables publication of ListenerContainerIdlePartitionEvents, see Application Events and Detecting Idle and Non-Responsive Consumers.

kafkaConsumerProperties

None

Used to override any arbitrary consumer properties configured on the consumer factory.

kafkaAwareTransactionManager

null

See Transactions.

listenerTaskExecutor

SimpleAsyncTaskExecutor

A task executor to run the consumer threads. The default executor creates threads named <name>-C-n; with the KafkaMessageListenerContainer, the name is the bean name; with the ConcurrentMessageListenerContainer the name is the bean name suffixed with -n where n is incremented for each child container.

logContainerConfig

false

Set to true to log at INFO level all container properties.

messageListener

null

The message listener.

micrometerEnabled

true

Whether or not to maintain Micrometer timers for the consumer threads.

micrometerTags

empty

A map of static tags to be added to micrometer metrics.

micrometerTagsProvider

null

A function that provides dynamic tags, based on the consumer record.

missingTopicsFatal

false

When true prevents the container from starting if the configured topic(s) are not present on the broker.

monitorInterval

30s

How often to check the state of the consumer threads for NonResponsiveConsumerEvent s. See noPollThreshold and pollTimeout.

noPollThreshold

3.0

Multiplied by pollTimeOut to determine whether to publish a NonResponsiveConsumerEvent. See monitorInterval.

observationConvention

null

When set, add dynamic tags to the timers and traces, based on information in the consumer records.

observationEnabled

false

Set to true to enable observation via Micrometer.

offsetAndMetadataProvider

null

A provider for OffsetAndMetadata; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.

onlyLogRecordMetadata

false

Set to false to log the complete consumer record (in error, debug logs etc.) instead of just topic-partition@offset.

pauseImmediate

false

When the container is paused, stop processing after the current record instead of after processing all the records from the previous poll; the remaining records are retained in memory and will be passed to the listener when the container is resumed.

pollTimeout

5000

The timeout passed into Consumer.poll() in milliseconds.

pollTimeoutWhilePaused

100

The timeout passed into Consumer.poll() (in milliseconds) when the container is in a paused state.

restartAfterAuthExceptions

false

True to restart the container if it is stopped due to authorization/authentication exceptions.

scheduler

ThreadPoolTaskScheduler

A scheduler on which to run the consumer monitor task.

shutdownTimeout

10000

The maximum time in ms to block the stop() method until all consumers stop and before publishing the container stopped event.

stopContainerWhenFenced

false

Stop the listener container if a ProducerFencedException is thrown. See After-rollback Processor for more information.

stopImmediate

false

When the container is stopped, stop processing after the current record instead of after processing all the records from the previous poll.

subBatchPerPartition

See desc.

When using a batch listener, if this is true, the listener is called with the results of the poll split into sub batches, one per partition. Default false.

syncCommitTimeout

null

The timeout to use when syncCommits is true. When not set, the container will attempt to determine the default.api.timeout.ms consumer property and use that; otherwise it will use 60 seconds.

syncCommits

true

Whether to use sync or async commits for offsets; see commitCallback.

topics topicPattern topicPartitions

n/a

The configured topics, topic pattern or explicitly assigned topics/partitions. Mutually exclusive; at least one must be provided; enforced by ContainerProperties constructors.

transactionManager

null

Deprecated since 3.2, see [kafkaAwareTransactionManager], Other transaction managers.

Table 2. AbstractListenerContainer Properties
Property Default Description

afterRollbackProcessor

DefaultAfterRollbackProcessor

An AfterRollbackProcessor to invoke after a transaction is rolled back.

applicationEventPublisher

application context

The event publisher.

batchErrorHandler

See desc.

Deprecated - see commonErrorHandler.

batchInterceptor

null

Set a BatchInterceptor to call before invoking the batch listener; does not apply to record listeners. Also see interceptBeforeTx.

beanName

bean name

The bean name of the container; suffixed with -n for child containers.

commonErrorHandler

See desc.

DefaultErrorHandler or null when a transactionManager is provided when a DefaultAfterRollbackProcessor is used. See Container Error Handlers.

containerProperties

ContainerProperties

The container properties instance.

groupId

See desc.

The containerProperties.groupId, if present, otherwise the group.id property from the consumer factory.

interceptBeforeTx

true

Determines whether the recordInterceptor is called before or after a transaction starts.

listenerId

See desc.

The bean name for user-configured containers or the id attribute of @KafkaListeners.

listenerInfo

null

A value to populate in the KafkaHeaders.LISTENER_INFO header. With @KafkaListener, this value is obtained from the info attribute. This header can be used in various places, such as a RecordInterceptor, RecordFilterStrategy and in the listener code itself.

pauseRequested

(read only)

True if a consumer pause has been requested.

recordInterceptor

null

Set a RecordInterceptor to call before invoking the record listener; does not apply to batch listeners. Also see interceptBeforeTx.

topicCheckTimeout

30s

When the missingTopicsFatal container property is true, how long to wait, in seconds, for the describeTopics operation to complete.

Table 3. KafkaMessageListenerContainer Properties
Property Default Description

assignedPartitions

(read only)

The partitions currently assigned to this container (explicitly or not).

assignedPartitionsByClientId

(read only)

The partitions currently assigned to this container (explicitly or not).

clientIdSuffix

null

Used by the concurrent container to give each child container’s consumer a unique client.id.

containerPaused

n/a

True if pause has been requested and the consumer has actually paused.

Table 4. ConcurrentMessageListenerContainer Properties
Property Default Description

alwaysClientIdSuffix

true

Set to false to suppress adding a suffix to the client.id consumer property, when the concurrency is only 1.

assignedPartitions

(read only)

The aggregate of partitions currently assigned to this container’s child KafkaMessageListenerContainers (explicitly or not).

assignedPartitionsByClientId

(read only)

The partitions currently assigned to this container’s child KafkaMessageListenerContainers (explicitly or not), keyed by the child container’s consumer’s client.id property.

concurrency

1

The number of child KafkaMessageListenerContainers to manage.

containerPaused

n/a

True if pause has been requested and all child containers' consumer has actually paused.

containers

n/a

A reference to all child KafkaMessageListenerContainers.