Class ContainerProperties

java.lang.Object
org.springframework.kafka.listener.ConsumerProperties
org.springframework.kafka.listener.ContainerProperties

public class ContainerProperties extends ConsumerProperties
Contains runtime properties for a listener container.
Author:
Gary Russell, Artem Bilan, Artem Yakshin, Johnny Lim, Lukasz Kaminski, Kyuhyeok Park
  • Constructor Details

    • ContainerProperties

      public ContainerProperties(String... topics)
      Create properties for a container that will subscribe to the specified topics.
      Parameters:
      topics - the topics.
    • ContainerProperties

      public ContainerProperties(Pattern topicPattern)
      Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check.
      Parameters:
      topicPattern - the pattern.
      See Also:
      • CommonClientConfigs.METADATA_MAX_AGE_CONFIG
    • ContainerProperties

      public ContainerProperties(TopicPartitionOffset... topicPartitions)
      Create properties for a container that will assign itself the provided topic partitions.
      Parameters:
      topicPartitions - the topic partitions.
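      The three constructors correspond to three ways of selecting what a container consumes. The sketch below is illustrative only: the topic names, the pattern, and the class name are made up, and it assumes spring-kafka is on the classpath.

```java
import java.util.regex.Pattern;

import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;

// Hypothetical holder class; topic names are placeholders.
final class ContainerPropertiesConstructorsSketch {

    // Subscribe to an explicit list of topics (group-managed assignment).
    static ContainerProperties byTopics() {
        return new ContainerProperties("orders", "payments");
    }

    // Subscribe to every topic whose name matches the pattern.
    static ContainerProperties byPattern() {
        return new ContainerProperties(Pattern.compile("orders\\..*"));
    }

    // Assign specific partitions manually; the second entry also gives an initial offset.
    static ContainerProperties byPartitions() {
        return new ContainerProperties(
                new TopicPartitionOffset("orders", 0),
                new TopicPartitionOffset("orders", 1, 42L));
    }
}
```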
  • Method Details

    • setMessageListener

      public void setMessageListener(Object messageListener)
      Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
      Parameters:
      messageListener - the listener.
    • setAckMode

      public void setAckMode(ContainerProperties.AckMode ackMode)
      Set the ack mode to use when auto ack (in the configuration properties) is false.
      • RECORD: Commit the offset after each record has been processed by the listener.
      • BATCH: Commit the offsets for each batch of records received from the consumer when they all have been processed by the listener.
      • TIME: Commit pending offsets after ackTime number of milliseconds (should be greater than the ConsumerProperties#setPollTimeout(long) pollTimeout).
      • COUNT: Commit pending offsets after at least ackCount number of records have been processed.
      • COUNT_TIME: Commit pending offsets after ackTime number of milliseconds or at least ackCount number of records have been processed.
      • MANUAL: Listener is responsible for acking - use an AcknowledgingMessageListener. Acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener.
      • MANUAL_IMMEDIATE: Listener is responsible for acking - use an AcknowledgingMessageListener. The commit will be performed immediately if the Acknowledgment is acknowledged on the calling consumer thread. Otherwise, the acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener. Results will be indeterminate if you sometimes acknowledge on the calling thread and sometimes not.
      Parameters:
      ackMode - the ContainerProperties.AckMode; default BATCH.
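      For the MANUAL modes, the listener itself must acknowledge each record. A minimal sketch of that wiring follows, assuming spring-kafka is on the classpath; the topic name, the String key/value types, and the class name are placeholders, and auto commit is assumed to be disabled in the consumer configuration.

```java
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical holder class; "orders" is a placeholder topic.
final class ManualAckSketch {

    static ContainerProperties configure() {
        ContainerProperties props = new ContainerProperties("orders");
        props.setAckMode(ContainerProperties.AckMode.MANUAL);

        // The listener receives the Acknowledgment and decides when to ack.
        AcknowledgingMessageListener<String, String> listener = (rec, ack) -> {
            System.out.println("Received: " + rec.value());
            ack.acknowledge(); // queued; committed once the poll's records are processed
        };
        props.setMessageListener(listener);
        return props;
    }
}
```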
    • setAckCount

      public void setAckCount(int count)
      Set the number of outstanding records after which offsets should be committed when ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used.
      Parameters:
      count - the count
    • setAckTime

      public void setAckTime(long ackTime)
      Set the time (ms) after which outstanding offsets should be committed when ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used. Should be larger than zero.
      Parameters:
      ackTime - the time
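      How ackCount and ackTime combine under the COUNT, TIME and COUNT_TIME modes can be pictured as a small decision function. This is a simplified model of the documented semantics, not the framework's code: the class, enum and method names are hypothetical, and treating "after" as "greater than or equal to" is an assumption.

```java
// Hypothetical model of the commit decision; not part of spring-kafka.
final class AckDecisionSketch {

    enum Mode { TIME, COUNT, COUNT_TIME }

    // Returns true when pending offsets would be committed under the given mode.
    static boolean shouldCommit(Mode mode, int pendingRecords, long elapsedMs,
            int ackCount, long ackTimeMs) {
        boolean countReached = pendingRecords >= ackCount; // assumption: ">=" boundary
        boolean timeReached = elapsedMs >= ackTimeMs;      // assumption: ">=" boundary
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            case COUNT_TIME:
                return countReached || timeReached; // whichever limit is hit first
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }
}
```

      With ackCount = 10 and ackTime = 5000, COUNT_TIME commits either once 10 records are pending or once 5 seconds have passed, whichever happens first.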
    • setListenerTaskExecutor

      public void setListenerTaskExecutor(@Nullable AsyncTaskExecutor listenerTaskExecutor)
      Set the executor for threads that poll the consumer.
      Parameters:
      listenerTaskExecutor - the executor
      Since:
      2.8.9
    • setShutdownTimeout

      public void setShutdownTimeout(long shutdownTimeout)
      Set the timeout for shutting down the container. This is the maximum time, in milliseconds, that a call to #stop(Runnable) blocks before returning; default 10000L.
      Parameters:
      shutdownTimeout - the shutdown timeout.
    • setSyncCommitTimeout

      public void setSyncCommitTimeout(@Nullable Duration syncCommitTimeout)
      Set the timeout for commitSync operations (if ConsumerProperties.isSyncCommits()). Overrides the default api timeout property. In order of precedence:
      Overrides:
      setSyncCommitTimeout in class ConsumerProperties
      Parameters:
      syncCommitTimeout - the timeout.
    • setIdleEventInterval

      public void setIdleEventInterval(@Nullable Long idleEventInterval)
      Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
      Parameters:
      idleEventInterval - the interval.
    • setIdleBeforeDataMultiplier

      public void setIdleBeforeDataMultiplier(double idleBeforeDataMultiplier)
      Multiply the setIdleEventInterval(Long) by this value until at least one record is received. Default 5.0.
      Parameters:
      idleBeforeDataMultiplier - the multiplier.
      Since:
      2.8
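      The effect of the multiplier can be shown with simple arithmetic. The helper below is a hypothetical model (class and method names are not part of the library) that assumes the multiplier stops applying as soon as the first record has arrived.

```java
// Hypothetical model of the effective idle interval; not part of spring-kafka.
final class IdleIntervalSketch {

    // Before any record is received, the idle interval is stretched by the multiplier.
    static long effectiveIdleIntervalMs(long idleEventIntervalMs,
            double idleBeforeDataMultiplier, boolean recordReceived) {
        if (recordReceived) {
            return idleEventIntervalMs;
        }
        return (long) (idleEventIntervalMs * idleBeforeDataMultiplier);
    }
}
```

      With an idle event interval of 30000 ms and the default multiplier of 5.0, the first idle event would not be due until 150000 ms have elapsed without data.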
    • setIdlePartitionEventInterval

      public void setIdlePartitionEventInterval(@Nullable Long idlePartitionEventInterval)
      Set the idle partition event interval; when set, an event is emitted if a poll returns no records for a partition and this interval has elapsed since a record was returned.
      Parameters:
      idlePartitionEventInterval - the interval.
    • getAckMode

      public ContainerProperties.AckMode getAckMode()
    • getAckCount

      public int getAckCount()
    • getAckTime

      public long getAckTime()
    • getMessageListener

      public Object getMessageListener()
    • getListenerTaskExecutor

      @Nullable public AsyncTaskExecutor getListenerTaskExecutor()
      Return the consumer task executor.
      Returns:
      the executor.
    • getShutdownTimeout

      public long getShutdownTimeout()
    • getIdleEventInterval

      @Nullable public Long getIdleEventInterval()
      Return the idle event interval.
      Returns:
      the interval.
    • getIdleBeforeDataMultiplier

      public double getIdleBeforeDataMultiplier()
      Return the multiplier applied to setIdleEventInterval(Long) until at least one record is received. Default 5.0.
      Returns:
      the multiplier.
      Since:
      2.8
    • getIdlePartitionEventInterval

      @Nullable public Long getIdlePartitionEventInterval()
      Return the idle partition event interval.
      Returns:
      the interval.
    • getTransactionManager

      @Nullable public PlatformTransactionManager getTransactionManager()
    • setTransactionManager

      public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager)
      Set the transaction manager to start a transaction; if it is a KafkaAwareTransactionManager, offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD and ContainerProperties.AckMode.BATCH depending on the listener type (record or batch). For other transaction managers, adding the transaction manager to the container facilitates, for example, a record or batch interceptor participating in the same transaction (you must set the container's interceptBeforeTx property to false).
      Parameters:
      transactionManager - the transaction manager.
      Since:
      1.3
    • getMonitorInterval

      public int getMonitorInterval()
    • setMonitorInterval

      public void setMonitorInterval(int monitorInterval)
      The interval between checks for a non-responsive consumer in seconds; default 30.
      Parameters:
      monitorInterval - the interval.
      Since:
      1.3.1
    • getScheduler

      @Nullable public TaskScheduler getScheduler()
      Return the task scheduler, if present.
      Returns:
      the scheduler.
    • setScheduler

      public void setScheduler(@Nullable TaskScheduler scheduler)
      A scheduler used with the monitor interval.
      Parameters:
      scheduler - the scheduler.
      Since:
      1.3.1
    • getNoPollThreshold

      public float getNoPollThreshold()
    • setNoPollThreshold

      public void setNoPollThreshold(float noPollThreshold)
      If the time since the last poll, divided by the poll timeout, exceeds this value, a NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid a race condition that can cause spurious events to be published. Default 3.0f.
      Parameters:
      noPollThreshold - the threshold
      Since:
      1.3.1
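      The threshold is a ratio, so it helps to see it as arithmetic. The check below is a hypothetical restatement of the rule described above (names are not from the library), taking both durations in milliseconds.

```java
// Hypothetical model of the non-responsive consumer check; not part of spring-kafka.
final class NoPollThresholdSketch {

    // True when (time since last poll / poll timeout) exceeds the threshold.
    static boolean isNonResponsive(long msSinceLastPoll, long pollTimeoutMs,
            float noPollThreshold) {
        return (float) msSinceLastPoll / pollTimeoutMs > noPollThreshold;
    }
}
```

      With a poll timeout of 5000 ms and the default threshold of 3.0f, a consumer would be reported only after more than 15000 ms without a poll.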
    • isLogContainerConfig

      public boolean isLogContainerConfig()
      Log the container configuration if true (INFO).
      Returns:
      true to log.
      Since:
      2.1.1
    • setLogContainerConfig

      public void setLogContainerConfig(boolean logContainerConfig)
      Set to true to instruct each container to log this configuration.
      Parameters:
      logContainerConfig - true to log.
      Since:
      2.1.1
    • isMissingTopicsFatal

      public boolean isMissingTopicsFatal()
      If true, the container won't start if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
      Returns:
      the missingTopicsFatal.
      Since:
      2.2
    • setMissingTopicsFatal

      public void setMissingTopicsFatal(boolean missingTopicsFatal)
      Set to true to prevent the container from starting if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
      Parameters:
      missingTopicsFatal - the missingTopicsFatal.
      Since:
      2.2
    • setIdleBetweenPolls

      public void setIdleBetweenPolls(long idleBetweenPolls)
      The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls. Defaults to 0 - no idling.
      Parameters:
      idleBetweenPolls - the interval to sleep between polling cycles.
      Since:
      2.3
    • getIdleBetweenPolls

      public long getIdleBetweenPolls()
    • isMicrometerEnabled

      public boolean isMicrometerEnabled()
    • setMicrometerEnabled

      public void setMicrometerEnabled(boolean micrometerEnabled)
      Set to false to disable the Micrometer listener timers. Default true. Disabled when setObservationEnabled(boolean) is true.
      Parameters:
      micrometerEnabled - false to disable.
      Since:
      2.3
    • isObservationEnabled

      public boolean isObservationEnabled()
    • setObservationEnabled

      public void setObservationEnabled(boolean observationEnabled)
      Set to true to enable observation via Micrometer. When false (default) basic Micrometer timers are used instead (when enabled).
      Parameters:
      observationEnabled - true to enable.
      Since:
      3.0
    • setMicrometerTags

      public void setMicrometerTags(Map<String,String> tags)
      Set additional tags for the Micrometer listener timers.
      Parameters:
      tags - the tags.
      Since:
      2.3
    • getMicrometerTags

      public Map<String,String> getMicrometerTags()
      Return static Micrometer tags.
      Returns:
      the tags.
      Since:
      2.3
    • setMicrometerTagsProvider

      public void setMicrometerTagsProvider(@Nullable Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,Map<String,String>> micrometerTagsProvider)
      Set a function to provide dynamic tags based on the consumer record. These tags will be added to any static tags provided in micrometerTags. Only applies to record listeners, ignored for batch listeners. Does not apply if observation is enabled.
      Parameters:
      micrometerTagsProvider - the micrometerTagsProvider.
      Since:
      2.9.8
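      Static and dynamic tags can be combined. The following sketch assumes spring-kafka is on the classpath; the topic name, tag names and class name are placeholders chosen for illustration.

```java
import java.util.Map;

import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical holder class; tag names and topic are placeholders.
final class MicrometerTagsSketch {

    static ContainerProperties configure() {
        ContainerProperties props = new ContainerProperties("orders");

        // Static tags, applied to every listener timer.
        props.setMicrometerTags(Map.of("application", "order-service"));

        // Dynamic tags, computed per record (record listeners only).
        props.setMicrometerTagsProvider(
                rec -> Map.of("partition", String.valueOf(rec.partition())));
        return props;
    }
}
```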
    • getMicrometerTagsProvider

      @Nullable public Function<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,Map<String,String>> getMicrometerTagsProvider()
      Return the Micrometer tags provider.
      Returns:
      the micrometerTagsProvider.
      Since:
      2.9.8
    • getConsumerStartTimeout

      public Duration getConsumerStartTimeout()
    • setConsumerStartTimeout

      public void setConsumerStartTimeout(Duration consumerStartTimeout)
      Set the timeout to wait for a consumer thread to start before logging an error. Default 30 seconds.
      Parameters:
      consumerStartTimeout - the consumer start timeout.
    • isSubBatchPerPartition

      public boolean isSubBatchPerPartition()
      Return whether to split batches by partition.
      Returns:
      subBatchPerPartition.
      Since:
      2.3.2
    • getSubBatchPerPartition

      @Nullable public Boolean getSubBatchPerPartition()
      Return whether to split batches by partition; null if not set.
      Returns:
      subBatchPerPartition.
      Since:
      2.5
    • setSubBatchPerPartition

      public void setSubBatchPerPartition(@Nullable Boolean subBatchPerPartition)
      When using a batch message listener, determines whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or to dispatch the complete batch received by the poll(). Useful when using transactions to enable zombie fencing, by using a transactional.id that is unique for each group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA and false when not using transactions or with EOSMode.BETA.
      Parameters:
      subBatchPerPartition - true for a separate transaction for each partition.
      Since:
      2.3.2
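      Dispatching "by partition" amounts to grouping the polled batch into one sub batch per partition while preserving record order. The sketch below models that grouping with a hypothetical Rec stand-in type rather than Kafka's ConsumerRecord; it is not the container's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of sub-batching; not part of spring-kafka.
final class SubBatchSketch {

    // Minimal stand-in for a consumer record.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Groups a polled batch into one ordered sub batch per partition.
    static Map<Integer, List<Rec>> splitByPartition(List<Rec> batch) {
        Map<Integer, List<Rec>> subBatches = new LinkedHashMap<>();
        for (Rec rec : batch) {
            subBatches.computeIfAbsent(rec.partition, p -> new ArrayList<>()).add(rec);
        }
        return subBatches;
    }
}
```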
    • getAssignmentCommitOption

      public ContainerProperties.AssignmentCommitOption getAssignmentCommitOption()
    • setAssignmentCommitOption

      public void setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption)
      Set the assignment commit option. Default ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.
      Parameters:
      assignmentCommitOption - the option.
      Since:
      2.3.6
    • isDeliveryAttemptHeader

      public boolean isDeliveryAttemptHeader()
    • setDeliveryAttemptHeader

      public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader)
      Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware. There is a small overhead so this is false by default.
      Parameters:
      deliveryAttemptHeader - true to populate
      Since:
      2.5
    • getEosMode

      public ContainerProperties.EOSMode getEosMode()
      Get the exactly once semantics mode.
      Returns:
      the mode.
      Since:
      2.5
    • setEosMode

      public void setEosMode(ContainerProperties.EOSMode eosMode)
      Set the exactly once semantics mode. Only ContainerProperties.EOSMode.V2 is supported since version 3.0.
      Parameters:
      eosMode - the mode; default V2.
      Since:
      2.5
    • getTransactionDefinition

      @Nullable public TransactionDefinition getTransactionDefinition()
      Get the transaction definition.
      Returns:
      the definition.
      Since:
      2.5.4
    • setTransactionDefinition

      public void setTransactionDefinition(@Nullable TransactionDefinition transactionDefinition)
      Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template. Note that this is only generally useful when used with a PlatformTransactionManager that supports a custom definition; this does NOT include the KafkaTransactionManager, which has no concept of transaction timeout. It can be useful to start, for example, a database transaction in the container, rather than using @Transactional on the listener, because then a record interceptor or a filter in a listener adapter can participate in the transaction.
      Parameters:
      transactionDefinition - the definition.
      Since:
      2.5.4
    • getAdviceChain

      public Advice[] getAdviceChain()
      A chain of listener Advices.
      Returns:
      the adviceChain.
      Since:
      2.5.6
    • setAdviceChain

      public void setAdviceChain(Advice... adviceChain)
      Set a chain of listener Advices; must not be null or have null elements.
      Parameters:
      adviceChain - the adviceChain to set.
      Since:
      2.5.6
    • isStopContainerWhenFenced

      public boolean isStopContainerWhenFenced()
      When true, the container will stop after a ProducerFencedException.
      Returns:
      the stopContainerWhenFenced
      Since:
      2.5.8
    • setStopContainerWhenFenced

      public void setStopContainerWhenFenced(boolean stopContainerWhenFenced)
      Set to true to stop the container when a ProducerFencedException is thrown. Currently, there is no way to determine whether such an exception is thrown due to a rebalance or a timeout. We therefore cannot call the after rollback processor. The best solution is to ensure that the transaction.timeout.ms is large enough so that transactions don't time out.
      Parameters:
      stopContainerWhenFenced - true to stop the container.
      Since:
      2.5.8
    • isStopImmediate

      public boolean isStopImmediate()
      When true, the container will be stopped immediately after processing the current record.
      Returns:
      true to stop immediately.
      Since:
      2.5.11
    • setStopImmediate

      public void setStopImmediate(boolean stopImmediate)
      Set to true to stop the container after processing the current record (when stop() is called). When false (default), the container will stop after all the results of the previous poll are processed.
      Parameters:
      stopImmediate - true to stop after the current record.
      Since:
      2.5.11
    • isAsyncAcks

      public boolean isAsyncAcks()
      When true, async manual acknowledgments are supported.
      Returns:
      true for async ack support.
      Since:
      2.8
    • setAsyncAcks

      public void setAsyncAcks(boolean asyncAcks)
      Set to true to support asynchronous record acknowledgments. Only applies with ContainerProperties.AckMode.MANUAL or ContainerProperties.AckMode.MANUAL_IMMEDIATE. Out of order offset commits are deferred until all previous offsets in the partition have been committed. The consumer is paused, if necessary, until all acks have been completed.
      Parameters:
      asyncAcks - true to use async acks.
      Since:
      2.8
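      The deferral of out-of-order commits can be modelled with a small tracker that only advances the commit position across a contiguous run of acknowledged offsets. This is a simplified sketch, not the container's code: the class and method names are hypothetical, and it assumes offsets in the partition are contiguous (no gaps).

```java
import java.util.TreeSet;

// Hypothetical model of deferred out-of-order commits for one partition.
final class AsyncAckSketch {

    private long nextToCommit;                          // first offset not yet committable
    private final TreeSet<Long> pending = new TreeSet<>(); // acked but still blocked offsets

    AsyncAckSketch(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Records an ack and returns the position that could be committed now.
    long acknowledge(long offset) {
        pending.add(offset);
        // Advance only while the lowest pending ack is the next expected offset.
        while (!pending.isEmpty() && pending.first() == nextToCommit) {
            pending.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

      Acknowledging offset 2 first leaves the commit position at 0; it only moves past 2 once offsets 0 and 1 have also been acknowledged.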
    • isPauseImmediate

      public boolean isPauseImmediate()
      When pausing the container with a record listener, whether the pause takes effect immediately (once the current record has been processed) or after all records from the previous poll have been processed. Default false.
      Returns:
      whether to pause immediately.
      Since:
      2.9
    • setPauseImmediate

      public void setPauseImmediate(boolean pauseImmediate)
      Set to true to pause the container after the current record has been processed, rather than after all the records from the previous poll have been processed.
      Parameters:
      pauseImmediate - true to pause immediately.
      Since:
      2.9
    • getObservationConvention

      public KafkaListenerObservationConvention getObservationConvention()
    • setObservationConvention

      public void setObservationConvention(KafkaListenerObservationConvention observationConvention)
      Parameters:
      observationConvention - the convention.
      Since:
      3.0
    • getPollTimeoutWhilePaused

      public Duration getPollTimeoutWhilePaused()
      The poll timeout to use while paused; usually a lower number than pollTimeout.
      Returns:
      the pollTimeoutWhilePaused
      Since:
      2.9.7
    • setPollTimeoutWhilePaused

      public void setPollTimeoutWhilePaused(Duration pollTimeoutWhilePaused)
      Set the poll timeout to use while paused; usually a lower number than pollTimeout. Should be greater than zero to avoid a tight CPU loop while the consumer is paused. Default is 100ms.
      Parameters:
      pollTimeoutWhilePaused - the pollTimeoutWhilePaused to set
      Since:
      2.9.7
    • isRestartAfterAuthExceptions

      public boolean isRestartAfterAuthExceptions()
      Restart the container if stopped due to an auth exception.
      Returns:
      the restartAfterAuthExceptions
      Since:
      2.9.7
    • setRestartAfterAuthExceptions

      public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions)
      Set to true to automatically restart the container if an auth exception is detected by the container (or all child containers).
      Parameters:
      restartAfterAuthExceptions - true to restart.
      Since:
      2.9.7
    • toString

      public String toString()
      Overrides:
      toString in class ConsumerProperties