public class ContainerProperties extends ConsumerProperties
Modifier and Type | Class and Description |
---|---|
static class |
ContainerProperties.AckMode
The offset commit behavior enumeration.
|
static class |
ContainerProperties.AssignmentCommitOption
Offset commit behavior during assignment.
|
Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_MONITOR_INTERVAL
The default
monitorInterval (s). |
static float |
DEFAULT_NO_POLL_THRESHOLD
The default
noPollThreshold . |
static long |
DEFAULT_SHUTDOWN_TIMEOUT
The default
shutDownTimeout (ms). |
DEFAULT_POLL_TIMEOUT
Constructor and Description |
---|
ContainerProperties(java.util.regex.Pattern topicPattern)
Create properties for a container that will subscribe to topics matching the
specified pattern.
|
ContainerProperties(java.lang.String... topics)
Create properties for a container that will subscribe to the specified topics.
|
ContainerProperties(TopicPartitionInitialOffset... topicPartitions)
Deprecated.
in favor of
ContainerProperties(TopicPartitionOffset...) . |
ContainerProperties(TopicPartitionOffset... topicPartitions)
Create properties for a container that will assign itself the provided topic
partitions.
|
Modifier and Type | Method and Description |
---|---|
int |
getAckCount() |
ContainerProperties.AckMode |
getAckMode() |
long |
getAckTime() |
ContainerProperties.AssignmentCommitOption |
getAssignmentCommitOption() |
java.util.Properties |
getConsumerProperties()
Deprecated.
in favor of
ConsumerProperties.getKafkaConsumerProperties() . |
java.time.Duration |
getConsumerStartTimout() |
org.springframework.core.task.AsyncListenableTaskExecutor |
getConsumerTaskExecutor() |
long |
getIdleBetweenPolls() |
java.lang.Long |
getIdleEventInterval() |
java.lang.Object |
getMessageListener() |
java.util.Map<java.lang.String,java.lang.String> |
getMicrometerTags() |
int |
getMonitorInterval() |
float |
getNoPollThreshold() |
org.springframework.scheduling.TaskScheduler |
getScheduler() |
long |
getShutdownTimeout() |
TopicPartitionInitialOffset[] |
getTopicPartitions()
Deprecated.
in favor of
ConsumerProperties.getTopicPartitionsToAssign() . |
org.springframework.transaction.PlatformTransactionManager |
getTransactionManager() |
boolean |
isAckOnError() |
boolean |
isLogContainerConfig()
Log the container configuration if true (INFO).
|
boolean |
isMicrometerEnabled() |
boolean |
isMissingTopicsFatal()
If true, the container won't start if any of the configured topics are not present
on the broker.
|
boolean |
isSubBatchPerPartition() |
void |
setAckCount(int count)
Set the number of outstanding record count after which offsets should be
committed when
ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used. |
void |
setAckMode(ContainerProperties.AckMode ackMode)
Set the ack mode to use when auto ack (in the configuration properties) is false.
|
void |
setAckOnError(boolean ackOnError)
Set whether or not the container should commit offsets (ack messages) where the
listener throws exceptions.
|
void |
setAckTime(long ackTime)
Set the time (ms) after which outstanding offsets should be committed when
ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used. |
void |
setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption)
Set the assignment commit option.
|
void |
setConsumerProperties(java.util.Properties consumerProperties)
Deprecated.
|
void |
setConsumerStartTimout(java.time.Duration consumerStartTimout)
Set the timeout to wait for a consumer thread to start before logging
an error.
|
void |
setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
Set the executor for threads that poll the consumer.
|
void |
setIdleBetweenPolls(long idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between
Consumer.poll(Duration) calls. |
void |
setIdleEventInterval(java.lang.Long idleEventInterval)
Set the idle event interval; when set, an event is emitted if a poll returns
no records and this interval has elapsed since a record was returned.
|
void |
setLogContainerConfig(boolean logContainerConfig)
Set to true to instruct each container to log this configuration.
|
void |
setMessageListener(java.lang.Object messageListener)
Set the message listener; must be a
MessageListener
or AcknowledgingMessageListener . |
void |
setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable the Micrometer listener timers.
|
void |
setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
Set additional tags for the Micrometer listener timers.
|
void |
setMissingTopicsFatal(boolean missingTopicsFatal)
Set to false to allow the container to start even if any of the configured topics
are not present on the broker.
|
void |
setMonitorInterval(int monitorInterval)
The interval between checks for a non-responsive consumer in
seconds; default 30.
|
void |
setNoPollThreshold(float noPollThreshold)
If the time since the last poll /
poll timeout
exceeds this value, a NonResponsiveConsumerEvent is published. |
void |
setScheduler(org.springframework.scheduling.TaskScheduler scheduler)
A scheduler used with the monitor interval.
|
void |
setShutdownTimeout(long shutdownTimeout)
Set the timeout for shutting down the container.
|
void |
setSubBatchPerPartition(boolean subBatchPerPartition)
When using a batch message listener whether to dispatch records by partition (with
a transaction for each sub batch if transactions are in use) or the complete batch
received by the
poll() . |
void |
setSyncCommitTimeout(java.time.Duration syncCommitTimeout)
Set the timeout for commitSync operations (if
ConsumerProperties.isSyncCommits() . |
void |
setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager)
Set the transaction manager to start a transaction; offsets are committed with
semantics equivalent to
ContainerProperties.AckMode.RECORD and ContainerProperties.AckMode.BATCH depending
on the listener type (record or batch). |
java.lang.String |
toString() |
getAuthorizationExceptionRetryInterval, getClientId, getCommitCallback, getCommitLogLevel, getCommitRetries, getConsumerRebalanceListener, getGroupId, getKafkaConsumerProperties, getPollTimeout, getSyncCommitTimeout, getTopicPartitionsToAssign, getTopicPattern, getTopics, isOnlyLogRecordMetadata, isSyncCommits, renderProperties, setAuthorizationExceptionRetryInterval, setClientId, setCommitCallback, setCommitLogLevel, setCommitRetries, setConsumerRebalanceListener, setGroupId, setKafkaConsumerProperties, setOnlyLogRecordMetadata, setPollTimeout, setSyncCommits
public static final long DEFAULT_SHUTDOWN_TIMEOUT
shutDownTimeout
(ms).public static final int DEFAULT_MONITOR_INTERVAL
monitorInterval
(s).public static final float DEFAULT_NO_POLL_THRESHOLD
noPollThreshold
.public ContainerProperties(java.lang.String... topics)
topics
- the topics.public ContainerProperties(java.util.regex.Pattern topicPattern)
topicPattern
- the pattern.CommonClientConfigs.METADATA_MAX_AGE_CONFIG
@Deprecated public ContainerProperties(TopicPartitionInitialOffset... topicPartitions)
ContainerProperties(TopicPartitionOffset...)
.topicPartitions
- the topic partitions.public ContainerProperties(TopicPartitionOffset... topicPartitions)
topicPartitions
- the topic partitions.public void setMessageListener(java.lang.Object messageListener)
MessageListener
or AcknowledgingMessageListener
.messageListener
- the listener.public void setAckMode(ContainerProperties.AckMode ackMode)
#setPollTimeout(long) pollTimeout
.AcknowledgingMessageListener
.
RECORD
or BATCH
, depending on
the listener type.ackMode
- the ContainerProperties.AckMode
; default BATCH.setTransactionManager(PlatformTransactionManager)
public void setAckCount(int count)
ContainerProperties.AckMode.COUNT
or ContainerProperties.AckMode.COUNT_TIME
is being used.count
- the countpublic void setAckTime(long ackTime)
ContainerProperties.AckMode.TIME
or ContainerProperties.AckMode.COUNT_TIME
is being used. Should be
larger thanackTime
- the timepublic void setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
consumerTaskExecutor
- the executorpublic void setShutdownTimeout(long shutdownTimeout)
#stop(Runnable)
will block for, before
returning; default 10000L.shutdownTimeout
- the shutdown timeout.public void setSyncCommitTimeout(@Nullable java.time.Duration syncCommitTimeout)
ConsumerProperties.isSyncCommits()
. Overrides
the default api timeout property. In order of precedence:
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
in
ConsumerProperties.setKafkaConsumerProperties(Properties)
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
in the consumer factory
propertiessetSyncCommitTimeout
in class ConsumerProperties
syncCommitTimeout
- the timeout.ConsumerProperties.setSyncCommits(boolean)
public void setIdleEventInterval(java.lang.Long idleEventInterval)
idleEventInterval
- the interval.public void setAckOnError(boolean ackOnError)
ackMode
and is
effective only when the kafka property enable.auto.commit
is false
;
it is not applicable to manual ack modes. When this property is set to
true
, all messages handled will have their offset committed. When set to
false
(the default), offsets will be committed only for successfully
handled messages. Manual acks will always be applied. Bear in mind that, if the
next message is successfully handled, its offset will be committed, effectively
committing the offset of the failed message anyway, so this option has limited
applicability, unless you are using a SeekToCurrentBatchErrorHandler
which
will seek the current record so that it is reprocessed.
Does not apply when transactions are used - in that case, whether or not the offsets are sent to the transaction depends on whether the transaction is committed or rolled back. If a listener throws an exception, the transaction will normally be rolled back unless an error handler is provided that handles the error and exits normally; in which case the offsets are sent to the transaction and the transaction is committed.
ackOnError
- whether the container should acknowledge messages that throw
exceptions.@Deprecated @Nullable public TopicPartitionInitialOffset[] getTopicPartitions()
ConsumerProperties.getTopicPartitionsToAssign()
.public ContainerProperties.AckMode getAckMode()
public int getAckCount()
public long getAckTime()
public java.lang.Object getMessageListener()
public org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
public long getShutdownTimeout()
public java.lang.Long getIdleEventInterval()
public boolean isAckOnError()
public org.springframework.transaction.PlatformTransactionManager getTransactionManager()
public void setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager)
ContainerProperties.AckMode.RECORD
and ContainerProperties.AckMode.BATCH
depending
on the listener type (record or batch).transactionManager
- the transaction manager.setAckMode(AckMode)
public int getMonitorInterval()
public void setMonitorInterval(int monitorInterval)
monitorInterval
- the interval.public org.springframework.scheduling.TaskScheduler getScheduler()
public void setScheduler(org.springframework.scheduling.TaskScheduler scheduler)
scheduler
- the scheduler.setMonitorInterval(int)
public float getNoPollThreshold()
public void setNoPollThreshold(float noPollThreshold)
poll timeout
exceeds this value, a NonResponsiveConsumerEvent is published.
This value should be more than 1.0 to avoid a race condition that can cause
spurious events to be published.
Default 3.0f.noPollThreshold
- the thresholdpublic boolean isLogContainerConfig()
public void setLogContainerConfig(boolean logContainerConfig)
logContainerConfig
- true to log.public boolean isMissingTopicsFatal()
public void setMissingTopicsFatal(boolean missingTopicsFatal)
missingTopicsFatal
- the missingTopicsFatal.@Deprecated public java.util.Properties getConsumerProperties()
ConsumerProperties.getKafkaConsumerProperties()
.group.id
and client.id
are ignored.ConsumerConfig
,
ConsumerProperties.setGroupId(String)
,
ConsumerProperties.setClientId(String)
@Deprecated public void setConsumerProperties(java.util.Properties consumerProperties)
ConsumerProperties.setKafkaConsumerProperties(Properties)
.group.id
and client.id
are ignored.consumerProperties
- the properties.ConsumerConfig
,
ConsumerProperties.setGroupId(String)
,
ConsumerProperties.setClientId(String)
public void setIdleBetweenPolls(long idleBetweenPolls)
Consumer.poll(Duration)
calls.
Defaults to 0
- no idling.idleBetweenPolls
- the interval to sleep between polling cycles.public long getIdleBetweenPolls()
public boolean isMicrometerEnabled()
public void setMicrometerEnabled(boolean micrometerEnabled)
micrometerEnabled
- false to disable.public void setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
tags
- the tags.public java.util.Map<java.lang.String,java.lang.String> getMicrometerTags()
public java.time.Duration getConsumerStartTimout()
public void setConsumerStartTimout(java.time.Duration consumerStartTimout)
consumerStartTimout
- the consumer start timeout.public boolean isSubBatchPerPartition()
public void setSubBatchPerPartition(boolean subBatchPerPartition)
poll()
. Useful when using transactions to enable zombie
fencing, by using a transactional.id
that is unique for each
group/topic/partition.subBatchPerPartition
- true for a separate transaction for each partition.public ContainerProperties.AssignmentCommitOption getAssignmentCommitOption()
public void setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption)
ContainerProperties.AssignmentCommitOption.ALWAYS
.
In a future release it will default to ContainerProperties.AssignmentCommitOption.LATEST_ONLY
.assignmentCommitOption
- the option.public java.lang.String toString()
toString
in class ConsumerProperties