Class ContainerProperties
- java.lang.Object
-
- org.springframework.kafka.listener.ConsumerProperties
-
- org.springframework.kafka.listener.ContainerProperties
-
public class ContainerProperties extends ConsumerProperties
Contains runtime properties for a listener container.
- Author:
- Gary Russell, Artem Bilan, Artem Yakshin, Johnny Lim, Lukasz Kaminski
-
-
Nested Class Summary
Modifier and Type | Class | Description
static class | ContainerProperties.AckMode | The offset commit behavior enumeration.
static class | ContainerProperties.AssignmentCommitOption | Offset commit behavior during assignment.
static class | ContainerProperties.EOSMode | Mode for exactly once semantics.
-
Field Summary
Modifier and Type | Field | Description
static int | DEFAULT_MONITOR_INTERVAL | The default monitorInterval (s).
static float | DEFAULT_NO_POLL_THRESHOLD | The default noPollThreshold.
static long | DEFAULT_SHUTDOWN_TIMEOUT | The default shutDownTimeout (ms).
-
Fields inherited from class org.springframework.kafka.listener.ConsumerProperties
DEFAULT_POLL_TIMEOUT
-
-
Constructor Summary
Constructor | Description
ContainerProperties(java.lang.String... topics) | Create properties for a container that will subscribe to the specified topics.
ContainerProperties(java.util.regex.Pattern topicPattern) | Create properties for a container that will subscribe to topics matching the specified pattern.
ContainerProperties(TopicPartitionOffset... topicPartitions) | Create properties for a container that will assign itself the provided topic partitions.
-
Method Summary
Modifier and Type | Method | Description
int | getAckCount() |
ContainerProperties.AckMode | getAckMode() |
long | getAckTime() |
org.aopalliance.aop.Advice[] | getAdviceChain() | A chain of listener Advices.
ContainerProperties.AssignmentCommitOption | getAssignmentCommitOption() |
java.time.Duration | getConsumerStartTimout() |
org.springframework.core.task.AsyncListenableTaskExecutor | getConsumerTaskExecutor() |
ContainerProperties.EOSMode | getEosMode() | Get the exactly once semantics mode.
long | getIdleBetweenPolls() |
java.lang.Long | getIdleEventInterval() |
java.lang.Object | getMessageListener() |
java.util.Map<java.lang.String,java.lang.String> | getMicrometerTags() |
int | getMonitorInterval() |
float | getNoPollThreshold() |
org.springframework.scheduling.TaskScheduler | getScheduler() |
long | getShutdownTimeout() |
java.lang.Boolean | getSubBatchPerPartition() | Return whether to split batches by partition; null if not set.
org.springframework.transaction.TransactionDefinition | getTransactionDefinition() | Get the transaction definition.
org.springframework.transaction.PlatformTransactionManager | getTransactionManager() |
boolean | isAckOnError() |
boolean | isDeliveryAttemptHeader() |
boolean | isLogContainerConfig() | Log the container configuration if true (INFO).
boolean | isMicrometerEnabled() |
boolean | isMissingTopicsFatal() | If true, the container won't start if any of the configured topics are not present on the broker.
boolean | isSubBatchPerPartition() | Return whether to split batches by partition.
void | setAckCount(int count) | Set the number of outstanding records after which offsets should be committed when ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used.
void | setAckMode(ContainerProperties.AckMode ackMode) | Set the ack mode to use when auto ack (in the configuration properties) is false.
void | setAckOnError(boolean ackOnError) | Deprecated. In favor of GenericErrorHandler.isAckAfterHandle().
void | setAckTime(long ackTime) | Set the time (ms) after which outstanding offsets should be committed when ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used.
void | setAdviceChain(org.aopalliance.aop.Advice... adviceChain) | Set a chain of listener Advices; must not be null or have null elements.
void | setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption) | Set the assignment commit option.
void | setConsumerStartTimout(java.time.Duration consumerStartTimout) | Set the timeout to wait for a consumer thread to start before logging an error.
void | setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor) | Set the executor for threads that poll the consumer.
void | setDeliveryAttemptHeader(boolean deliveryAttemptHeader) | Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware.
void | setEosMode(ContainerProperties.EOSMode eosMode) | Set the exactly once semantics mode.
void | setIdleBetweenPolls(long idleBetweenPolls) | The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls.
void | setIdleEventInterval(java.lang.Long idleEventInterval) | Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
void | setLogContainerConfig(boolean logContainerConfig) | Set to true to instruct each container to log this configuration.
void | setMessageListener(java.lang.Object messageListener) | Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
void | setMicrometerEnabled(boolean micrometerEnabled) | Set to false to disable the Micrometer listener timers.
void | setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags) | Set additional tags for the Micrometer listener timers.
void | setMissingTopicsFatal(boolean missingTopicsFatal) | Set to false to allow the container to start even if any of the configured topics are not present on the broker.
void | setMonitorInterval(int monitorInterval) | The interval between checks for a non-responsive consumer in seconds; default 30.
void | setNoPollThreshold(float noPollThreshold) | If the time since the last poll / poll timeout exceeds this value, a NonResponsiveConsumerEvent is published.
void | setScheduler(org.springframework.scheduling.TaskScheduler scheduler) | A scheduler used with the monitor interval.
void | setShutdownTimeout(long shutdownTimeout) | Set the timeout for shutting down the container.
void | setSubBatchPerPartition(boolean subBatchPerPartition) | When using a batch message listener, whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or the complete batch received by the poll().
void | setSyncCommitTimeout(java.time.Duration syncCommitTimeout) | Set the timeout for commitSync operations (if ConsumerProperties.isSyncCommits()).
void | setTransactionDefinition(org.springframework.transaction.TransactionDefinition transactionDefinition) | Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template.
void | setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager) | Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD or ContainerProperties.AckMode.BATCH, depending on the listener type (record or batch).
java.lang.String | toString() |
-
Methods inherited from class org.springframework.kafka.listener.ConsumerProperties
getAuthorizationExceptionRetryInterval, getClientId, getCommitCallback, getCommitLogLevel, getCommitRetries, getConsumerRebalanceListener, getGroupId, getKafkaConsumerProperties, getPollTimeout, getSyncCommitTimeout, getTopicPartitions, getTopicPartitionsToAssign, getTopicPattern, getTopics, isFixTxOffsets, isOnlyLogRecordMetadata, isSyncCommits, renderProperties, setAuthorizationExceptionRetryInterval, setClientId, setCommitCallback, setCommitLogLevel, setCommitRetries, setConsumerRebalanceListener, setFixTxOffsets, setGroupId, setKafkaConsumerProperties, setOnlyLogRecordMetadata, setPollTimeout, setSyncCommits
-
-
-
-
Field Detail
-
DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
The default shutDownTimeout (ms).
- See Also:
- Constant Field Values
-
DEFAULT_MONITOR_INTERVAL
public static final int DEFAULT_MONITOR_INTERVAL
The default monitorInterval (s).
- See Also:
- Constant Field Values
-
DEFAULT_NO_POLL_THRESHOLD
public static final float DEFAULT_NO_POLL_THRESHOLD
The default noPollThreshold.
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
ContainerProperties
public ContainerProperties(java.lang.String... topics)
Create properties for a container that will subscribe to the specified topics.
- Parameters:
- topics - the topics.
-
ContainerProperties
public ContainerProperties(java.util.regex.Pattern topicPattern)
Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check.
- Parameters:
- topicPattern - the pattern.
- See Also:
- CommonClientConfigs.METADATA_MAX_AGE_CONFIG
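The pattern check described for this constructor can be pictured with a small JDK-only sketch. It is not the framework's or the Kafka client's code: TopicPatternSketch, matchingTopics, and the topic names are hypothetical, and matching the whole topic name (rather than a substring) is an assumption.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternSketch {

    // Returns the topics, out of those existing at the time of the check,
    // whose full name matches the pattern (full-match semantics assumed).
    static List<String> matchingTopics(Pattern topicPattern, List<String> existingTopics) {
        return existingTopics.stream()
                .filter(topic -> topicPattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

Because the check is repeated periodically, a topic created later that matches the pattern would be picked up on a subsequent check.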
-
ContainerProperties
public ContainerProperties(TopicPartitionOffset... topicPartitions)
Create properties for a container that will assign itself the provided topic partitions.
- Parameters:
- topicPartitions - the topic partitions.
-
-
Method Detail
-
setMessageListener
public void setMessageListener(java.lang.Object messageListener)
Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
- Parameters:
- messageListener - the listener.
-
setAckMode
public void setAckMode(ContainerProperties.AckMode ackMode)
Set the ack mode to use when auto ack (in the configuration properties) is false.
- RECORD: Ack after each record has been passed to the listener.
- BATCH: Ack after each batch of records received from the consumer has been passed to the listener.
- TIME: Ack after ackTime milliseconds have elapsed; ackTime should be greater than pollTimeout (see setPollTimeout(long)).
- COUNT: Ack after at least ackCount records have been received.
- MANUAL: Listener is responsible for acking; use an AcknowledgingMessageListener.
When a transaction manager is configured, offsets are committed with semantics equivalent to RECORD or BATCH, depending on the listener type.
- Parameters:
- ackMode - the ContainerProperties.AckMode; default BATCH.
- See Also:
- setTransactionManager(PlatformTransactionManager)
-
setAckCount
public void setAckCount(int count)
Set the number of outstanding records after which offsets should be committed when ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used.
- Parameters:
- count - the count
-
setAckTime
public void setAckTime(long ackTime)
Set the time (ms) after which outstanding offsets should be committed when ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used. Should be larger than
- Parameters:
- ackTime - the time
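How ackCount and ackTime interact across the COUNT, TIME, and COUNT_TIME modes can be sketched with plain Java. This is a simplified model, not the container's implementation: AckDecisionSketch, its Mode enum, and shouldCommit are hypothetical names, and the boundary comparisons (>=) are assumptions.

```java
final class AckDecisionSketch {

    // Hypothetical stand-in for the three count/time based ack modes.
    enum Mode { COUNT, TIME, COUNT_TIME }

    // Decides whether outstanding offsets would be committed now.
    static boolean shouldCommit(Mode mode, int outstandingRecords, int ackCount,
                                long elapsedMs, long ackTimeMs) {
        boolean countReached = outstandingRecords >= ackCount; // boundary is an assumption
        boolean timeReached = elapsedMs >= ackTimeMs;           // boundary is an assumption
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            case COUNT_TIME:
                // Whichever condition is satisfied first triggers the commit.
                return countReached || timeReached;
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }
}
```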
-
setConsumerTaskExecutor
public void setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
Set the executor for threads that poll the consumer.
- Parameters:
- consumerTaskExecutor - the executor
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
Set the timeout for shutting down the container. This is the maximum amount of time that an invocation of stop(Runnable) will block before returning; default 10000L (ms).
- Parameters:
- shutdownTimeout - the shutdown timeout.
-
setSyncCommitTimeout
public void setSyncCommitTimeout(@Nullable java.time.Duration syncCommitTimeout)
Set the timeout for commitSync operations (if ConsumerProperties.isSyncCommits()). Overrides the default api timeout property. In order of precedence:
- this property
- ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG in ConsumerProperties.setKafkaConsumerProperties(Properties)
- ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG in the consumer factory properties
- 60 seconds
- Overrides:
- setSyncCommitTimeout in class ConsumerProperties
- Parameters:
- syncCommitTimeout - the timeout.
- See Also:
- ConsumerProperties.setSyncCommits(boolean)
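The precedence order listed for this property can be sketched as a fallback chain. This is a JDK-only illustration, not the framework's code: SyncCommitTimeoutSketch and resolve are hypothetical names, the two property sources are passed in as plain arguments, and the key string is the literal value of ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

final class SyncCommitTimeoutSketch {

    // Literal value of ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG.
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";

    // Resolves the effective commitSync timeout using the documented order of precedence.
    static Duration resolve(Duration syncCommitTimeout, Properties kafkaConsumerProperties,
                            Map<String, Object> consumerFactoryProperties) {
        if (syncCommitTimeout != null) {
            return syncCommitTimeout; // 1. this property
        }
        String override = kafkaConsumerProperties.getProperty(DEFAULT_API_TIMEOUT_MS);
        if (override != null) {
            return Duration.ofMillis(Long.parseLong(override)); // 2. container-level consumer properties
        }
        Object factoryValue = consumerFactoryProperties.get(DEFAULT_API_TIMEOUT_MS);
        if (factoryValue != null) {
            return Duration.ofMillis(Long.parseLong(factoryValue.toString())); // 3. consumer factory properties
        }
        return Duration.ofSeconds(60); // 4. fallback of 60 seconds
    }
}
```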
-
setIdleEventInterval
public void setIdleEventInterval(java.lang.Long idleEventInterval)
Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
- Parameters:
- idleEventInterval - the interval.
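The idle-event condition can be modelled as a single predicate. This is a deliberately simplified sketch, not the container's logic: IdleEventSketch and shouldEmitIdleEvent are hypothetical names, timestamps are plain millisecond values, and any throttling of repeated events is left out.

```java
final class IdleEventSketch {

    // True when the interval is set, the poll returned nothing, and the interval
    // has elapsed since a record was last returned.
    static boolean shouldEmitIdleEvent(int recordsReturned, long nowMs,
                                       long lastRecordMs, Long idleEventIntervalMs) {
        if (idleEventIntervalMs == null) {
            return false; // interval not set: no idle events
        }
        return recordsReturned == 0 && nowMs - lastRecordMs > idleEventIntervalMs;
    }
}
```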
-
setAckOnError
@Deprecated public void setAckOnError(boolean ackOnError)
Deprecated. In favor of GenericErrorHandler.isAckAfterHandle().
Set whether or not the container should commit offsets (ack messages) where the listener throws exceptions. This works in conjunction with ackMode and is effective only when the kafka property enable.auto.commit is false; it is not applicable to manual ack modes. When this property is set to true, all messages handled will have their offset committed. When set to false (the default), offsets will be committed only for successfully handled messages. Manual acks will always be applied. Bear in mind that, if the next message is successfully handled, its offset will be committed, effectively committing the offset of the failed message anyway, so this option has limited applicability, unless you are using a SeekToCurrentBatchErrorHandler which will seek the current record so that it is reprocessed.
Does not apply when transactions are used; in that case, whether or not the offsets are sent to the transaction depends on whether the transaction is committed or rolled back. If a listener throws an exception, the transaction will normally be rolled back unless an error handler is provided that handles the error and exits normally; in which case the offsets are sent to the transaction and the transaction is committed.
- Parameters:
- ackOnError - whether the container should acknowledge messages that throw exceptions.
-
getAckMode
public ContainerProperties.AckMode getAckMode()
-
getAckCount
public int getAckCount()
-
getAckTime
public long getAckTime()
-
getMessageListener
public java.lang.Object getMessageListener()
-
getConsumerTaskExecutor
public org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
-
getShutdownTimeout
public long getShutdownTimeout()
-
getIdleEventInterval
public java.lang.Long getIdleEventInterval()
-
isAckOnError
public boolean isAckOnError()
-
getTransactionManager
public org.springframework.transaction.PlatformTransactionManager getTransactionManager()
-
setTransactionManager
public void setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager)
Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD or ContainerProperties.AckMode.BATCH, depending on the listener type (record or batch).
- Parameters:
- transactionManager - the transaction manager.
- Since:
- 1.3
- See Also:
- setAckMode(AckMode)
-
getMonitorInterval
public int getMonitorInterval()
-
setMonitorInterval
public void setMonitorInterval(int monitorInterval)
The interval between checks for a non-responsive consumer in seconds; default 30.
- Parameters:
- monitorInterval - the interval.
- Since:
- 1.3.1
-
getScheduler
public org.springframework.scheduling.TaskScheduler getScheduler()
-
setScheduler
public void setScheduler(org.springframework.scheduling.TaskScheduler scheduler)
A scheduler used with the monitor interval.
- Parameters:
- scheduler - the scheduler.
- Since:
- 1.3.1
- See Also:
- setMonitorInterval(int)
-
getNoPollThreshold
public float getNoPollThreshold()
-
setNoPollThreshold
public void setNoPollThreshold(float noPollThreshold)
If the time since the last poll divided by the poll timeout exceeds this value, a NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid a race condition that can cause spurious events to be published. Default 3.0f.
- Parameters:
- noPollThreshold - the threshold
- Since:
- 1.3.1
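The threshold comparison is simple arithmetic, sketched below with hypothetical names (NoPollThresholdSketch, isNonResponsive); it is not taken from the framework source. For example, with an assumed poll timeout of 5000 ms and a threshold of 3.0f, the check is true once more than 15000 ms have passed since the last poll.

```java
final class NoPollThresholdSketch {

    // True when (time since last poll / poll timeout) exceeds the threshold.
    static boolean isNonResponsive(long msSinceLastPoll, long pollTimeoutMs, float noPollThreshold) {
        return (float) msSinceLastPoll / (float) pollTimeoutMs > noPollThreshold;
    }
}
```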
-
isLogContainerConfig
public boolean isLogContainerConfig()
Log the container configuration if true (INFO).
- Returns:
- true to log.
- Since:
- 2.1.1
-
setLogContainerConfig
public void setLogContainerConfig(boolean logContainerConfig)
Set to true to instruct each container to log this configuration.
- Parameters:
- logContainerConfig - true to log.
- Since:
- 2.1.1
-
isMissingTopicsFatal
public boolean isMissingTopicsFatal()
If true, the container won't start if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
- Returns:
- the missingTopicsFatal.
- Since:
- 2.2
-
setMissingTopicsFatal
public void setMissingTopicsFatal(boolean missingTopicsFatal)
Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
- Parameters:
- missingTopicsFatal - the missingTopicsFatal.
- Since:
- 2.2
-
setIdleBetweenPolls
public void setIdleBetweenPolls(long idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls. Defaults to 0 (no idling).
- Parameters:
- idleBetweenPolls - the interval to sleep between polling cycles.
- Since:
- 2.3
-
getIdleBetweenPolls
public long getIdleBetweenPolls()
-
isMicrometerEnabled
public boolean isMicrometerEnabled()
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable the Micrometer listener timers. Default true.
- Parameters:
- micrometerEnabled - false to disable.
- Since:
- 2.3
-
setMicrometerTags
public void setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
Set additional tags for the Micrometer listener timers.
- Parameters:
- tags - the tags.
- Since:
- 2.3
-
getMicrometerTags
public java.util.Map<java.lang.String,java.lang.String> getMicrometerTags()
-
getConsumerStartTimout
public java.time.Duration getConsumerStartTimout()
-
setConsumerStartTimout
public void setConsumerStartTimout(java.time.Duration consumerStartTimout)
Set the timeout to wait for a consumer thread to start before logging an error. Default 30 seconds.
- Parameters:
- consumerStartTimout - the consumer start timeout.
-
isSubBatchPerPartition
public boolean isSubBatchPerPartition()
Return whether to split batches by partition.
- Returns:
- subBatchPerPartition.
- Since:
- 2.3.2
-
getSubBatchPerPartition
@Nullable public java.lang.Boolean getSubBatchPerPartition()
Return whether to split batches by partition; null if not set.
- Returns:
- subBatchPerPartition.
- Since:
- 2.5
-
setSubBatchPerPartition
public void setSubBatchPerPartition(boolean subBatchPerPartition)
When using a batch message listener, whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or the complete batch received by the poll(). Useful when using transactions to enable zombie fencing, by using a transactional.id that is unique for each group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA and false when not using transactions or with EOSMode.BETA.
- Parameters:
- subBatchPerPartition - true for a separate transaction for each partition.
- Since:
- 2.3.2
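The difference between dispatching the complete batch and dispatching per-partition sub batches can be sketched with plain collections. This is an illustrative model only: SubBatchSketch, Rec, and dispatchUnits are hypothetical names, and Rec stands in for a consumer record with just the fields needed here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SubBatchSketch {

    // Hypothetical stand-in for a consumer record.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns the units that would each be handed to the batch listener.
    static List<List<Rec>> dispatchUnits(List<Rec> polled, boolean subBatchPerPartition) {
        if (!subBatchPerPartition) {
            return List.of(polled); // the complete batch from one poll
        }
        // Group by topic-partition, keeping first-seen order and per-partition record order.
        Map<String, List<Rec>> byPartition = new LinkedHashMap<>();
        for (Rec rec : polled) {
            byPartition.computeIfAbsent(rec.topic + "-" + rec.partition, key -> new ArrayList<>()).add(rec);
        }
        return new ArrayList<>(byPartition.values());
    }
}
```

With transactions in use, each returned unit would correspond to one transaction, which is what allows a transactional.id per group/topic/partition.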
-
getAssignmentCommitOption
public ContainerProperties.AssignmentCommitOption getAssignmentCommitOption()
-
setAssignmentCommitOption
public void setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption)
Set the assignment commit option. Default ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.
- Parameters:
- assignmentCommitOption - the option.
- Since:
- 2.3.6
-
isDeliveryAttemptHeader
public boolean isDeliveryAttemptHeader()
-
setDeliveryAttemptHeader
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader)
Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware. There is a small overhead so this is false by default.
- Parameters:
- deliveryAttemptHeader - true to populate
- Since:
- 2.5
-
getEosMode
public ContainerProperties.EOSMode getEosMode()
Get the exactly once semantics mode.
- Returns:
- the mode.
- Since:
- 2.5
- See Also:
setEosMode(EOSMode)
-
setEosMode
public void setEosMode(ContainerProperties.EOSMode eosMode)
Set the exactly once semantics mode. With ContainerProperties.EOSMode.ALPHA, a producer per group/topic/partition is used (enabling 'transactional.id fencing'). ContainerProperties.EOSMode.BETA enables fetch-offset-request fencing, and requires brokers 2.5 or later. In the 2.6 client, the default will be BETA because the 2.6 client can automatically fall back to ALPHA.
- Parameters:
- eosMode - the mode; default ALPHA.
- Since:
- 2.5
-
getTransactionDefinition
@Nullable public org.springframework.transaction.TransactionDefinition getTransactionDefinition()
Get the transaction definition.
- Returns:
- the definition.
- Since:
- 2.5.4
-
setTransactionDefinition
public void setTransactionDefinition(org.springframework.transaction.TransactionDefinition transactionDefinition)
Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template. Note that this is only generally useful when used with a ChainedKafkaTransactionManager configured with a non-Kafka transaction manager. Kafka has no concept of transaction timeout, for example.
- Parameters:
- transactionDefinition - the definition.
- Since:
- 2.5.4
-
getAdviceChain
public org.aopalliance.aop.Advice[] getAdviceChain()
A chain of listener Advices.
- Returns:
- the adviceChain.
- Since:
- 2.5.6
-
setAdviceChain
public void setAdviceChain(org.aopalliance.aop.Advice... adviceChain)
Set a chain of listener Advices; must not be null or have null elements.
- Parameters:
- adviceChain - the adviceChain to set.
- Since:
- 2.5.6
-
toString
public java.lang.String toString()
- Overrides:
- toString in class ConsumerProperties
-
-