Class ConsumerProperties

java.lang.Object
org.springframework.kafka.listener.ConsumerProperties
Direct Known Subclasses:
ContainerProperties

public class ConsumerProperties extends Object
Common consumer properties.
Since:
2.3
Author:
Gary Russell
  • Constructor Details

    • ConsumerProperties

      public ConsumerProperties(String... topics)
      Create properties for a container that will subscribe to the specified topics.
      Parameters:
      topics - the topics.
    • ConsumerProperties

      public ConsumerProperties(Pattern topicPattern)
      Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check.
      Parameters:
      topicPattern - the pattern.
      See Also:
      • CommonClientConfigs.METADATA_MAX_AGE_CONFIG
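The pattern-based selection described above can be illustrated with plain `java.util.regex`. This is only a sketch: the class name, method name, and topic names are hypothetical, and it assumes a topic is selected when its full name matches the pattern. In a running consumer the check is repeated as metadata is refreshed (see `metadata.max.age.ms`), so topics created later are picked up on a subsequent check.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring Kafka: shows which of the
// currently existing topics a given topic pattern would select.
class TopicPatternSketch {

    // Keep only the topics whose whole name matches the pattern.
    static List<String> matchingTopics(Pattern topicPattern, List<String> existingTopics) {
        return existingTopics.stream()
                .filter(topic -> topicPattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders\\..*");
        // Topic names are made up for illustration.
        List<String> topics = List.of("orders.eu", "orders.us", "payments.eu");
        System.out.println(matchingTopics(pattern, topics));
    }
}
```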
    • ConsumerProperties

      public ConsumerProperties(TopicPartitionOffset... topicPartitions)
      Create properties for a container that will assign itself the provided topic partitions.
      Parameters:
      topicPartitions - the topic partitions.
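The three constructors above correspond to three mutually exclusive ways of selecting what to consume. The following sketch shows one instance of each; the wrapper class, the topic names, and the partition numbers are hypothetical, and it assumes `TopicPartitionOffset` is the class from `org.springframework.kafka.support` with a `(String topic, int partition)` constructor.

```java
import java.util.regex.Pattern;

import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;

// Hypothetical wrapper class; only the ConsumerProperties constructors
// and TopicPartitionOffset come from the library.
class ConsumerPropertiesConstructorsSketch {

    // Subscribe to explicitly named topics.
    static ConsumerProperties byTopics() {
        return new ConsumerProperties("orders", "payments");
    }

    // Subscribe to every topic whose name matches the pattern.
    static ConsumerProperties byPattern() {
        return new ConsumerProperties(Pattern.compile("orders\\..*"));
    }

    // Assign specific partitions directly instead of subscribing.
    static ConsumerProperties byPartitions() {
        return new ConsumerProperties(
                new TopicPartitionOffset("orders", 0),
                new TopicPartitionOffset("orders", 1));
    }
}
```

Only the getter that matches the constructor used is expected to return a value; the other two (`getTopics()`, `getTopicPattern()`, `getTopicPartitions()`) are declared `@Nullable`.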
  • Method Details

    • getTopics

      @Nullable public String[] getTopics()
      Return the configured topics.
      Returns:
      the topics.
    • getTopicPattern

      @Nullable public Pattern getTopicPattern()
      Return the configured topic pattern.
      Returns:
      the topic pattern.
    • getTopicPartitions

      @Nullable public TopicPartitionOffset[] getTopicPartitions()
      Return the configured TopicPartitionOffsets.
      Returns:
      the topics/partitions.
      Since:
      2.5
    • setPollTimeout

      public void setPollTimeout(long pollTimeout)
      Set the max time to block in the consumer waiting for records.
      Parameters:
      pollTimeout - the timeout in ms; default 5000L.
    • getPollTimeout

      public long getPollTimeout()
    • setGroupId

      public void setGroupId(String groupId)
      Set the group id for this container. Overrides any group.id property provided by the consumer factory configuration.
      Parameters:
      groupId - the group id.
    • getGroupId

      @Nullable public String getGroupId()
      Return the container's group id.
      Returns:
      the group id.
    • getClientId

      public String getClientId()
      Return the client id.
      Returns:
      the client id.
    • setClientId

      public void setClientId(String clientId)
      Set the client id; overrides the consumer factory client.id property. When used in a concurrent container, will be suffixed with '-n' to provide a unique value for each consumer.
      Parameters:
      clientId - the client id.
    • setConsumerRebalanceListener

      public void setConsumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)
      Set the user defined ConsumerRebalanceListener implementation.
      Parameters:
      consumerRebalanceListener - the ConsumerRebalanceListener instance
    • getConsumerRebalanceListener

      @Nullable public org.apache.kafka.clients.consumer.ConsumerRebalanceListener getConsumerRebalanceListener()
      Return the rebalance listener.
      Returns:
      the listener.
    • setSyncCommitTimeout

      public void setSyncCommitTimeout(@Nullable Duration syncCommitTimeout)
      Set the timeout for commitSync operations (used if isSyncCommits() is true). Overrides the default api timeout property.
      Parameters:
      syncCommitTimeout - the timeout.
    • getSyncCommitTimeout

      @Nullable public Duration getSyncCommitTimeout()
      Return the sync commit timeout.
      Returns:
      the timeout.
    • setCommitCallback

      public void setCommitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)
      Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level. Used when syncCommits is false.
      Parameters:
      commitCallback - the callback.
    • setOffsetAndMetadataProvider

      public void setOffsetAndMetadataProvider(OffsetAndMetadataProvider offsetAndMetadataProvider)
      Set the offset and metadata provider associated with a commit callback.
      Parameters:
      offsetAndMetadataProvider - an offset and metadata provider.
      Since:
      2.8.5
    • getCommitCallback

      @Nullable public org.apache.kafka.clients.consumer.OffsetCommitCallback getCommitCallback()
      Return the commit callback.
      Returns:
      the callback.
    • getOffsetAndMetadataProvider

      @Nullable public OffsetAndMetadataProvider getOffsetAndMetadataProvider()
      Return the offset and metadata provider.
      Returns:
      the offset and metadata provider.
    • setSyncCommits

      public void setSyncCommits(boolean syncCommits)
      Set whether to call consumer.commitSync() (true) or commitAsync() (false) when the container is responsible for commits. Default true.
      Parameters:
      syncCommits - true to use commitSync().
    • isSyncCommits

      public boolean isSyncCommits()
    • getCommitLogLevel

      public LogIfLevelEnabled.Level getCommitLogLevel()
      The level at which to log offset commits.
      Returns:
      the level.
    • setCommitLogLevel

      public void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel)
      Set the level at which to log offset commits. Default: DEBUG.
      Parameters:
      commitLogLevel - the level.
    • getKafkaConsumerProperties

      public Properties getKafkaConsumerProperties()
      Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. You can add non-String-valued properties, but the property name (hashtable key) must be String; all others will be ignored. group.id and client.id are ignored.
      Returns:
      the properties.
    • setKafkaConsumerProperties

      public void setKafkaConsumerProperties(Properties kafkaConsumerProperties)
      Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. group.id and client.id are ignored. Property keys must be Strings.
      Parameters:
      kafkaConsumerProperties - the properties.
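The merge rules stated above (overrides win, non-String keys are skipped, group.id and client.id are ignored) can be sketched in plain Java. This is an illustration of the documented behavior, not the framework's actual implementation; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper illustrating the documented merge semantics.
class ConsumerPropertyMergeSketch {

    // Keys that the documentation says are ignored in the overrides.
    private static final Set<String> IGNORED = Set.of("group.id", "client.id");

    static Map<String, Object> merge(Map<String, Object> factoryConfigs, Properties overrides) {
        // Start from the consumer factory's configuration.
        Map<String, Object> merged = new HashMap<>(factoryConfigs);
        for (Map.Entry<Object, Object> entry : overrides.entrySet()) {
            Object key = entry.getKey();
            // Only String keys are considered; values may be of any type.
            if (key instanceof String && !IGNORED.contains(key)) {
                merged.put((String) key, entry.getValue());
            }
        }
        return merged;
    }
}
```

To change the group id or client id, use setGroupId(String) and setClientId(String) rather than these properties.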
    • getAuthExceptionRetryInterval

      @Nullable public Duration getAuthExceptionRetryInterval()
      Get the authentication/authorization retry interval.
      Returns:
      the interval.
    • setAuthExceptionRetryInterval

      public void setAuthExceptionRetryInterval(Duration authExceptionRetryInterval)
      Set the interval between retries after an AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer. By default the field is null and retries are disabled; in that case the container will be stopped. The interval must be less than the max.poll.interval.ms consumer property.
      Parameters:
      authExceptionRetryInterval - the duration between retries
      Since:
      2.8
    • getCommitRetries

      public int getCommitRetries()
      The number of retries allowed when a RetriableCommitFailedException is thrown by the consumer and setSyncCommits(boolean) is set to true.
      Returns:
      the number of retries.
      Since:
      2.3.9
    • setCommitRetries

      public void setCommitRetries(int commitRetries)
      Set the number of retries allowed when a RetriableCommitFailedException is thrown by the consumer and setSyncCommits(boolean) is set to true. Default 3 (4 attempts total).
      Parameters:
      commitRetries - the commitRetries.
      Since:
      2.3.9
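The relationship between retries and total attempts ("Default 3 (4 attempts total)") can be sketched as a simple loop. This is not the container's code: the class, the method, and the `RetriableFailure` exception are hypothetical stand-ins (the latter for RetriableCommitFailedException), used only to show the arithmetic.

```java
// Hypothetical sketch of "N retries means N + 1 attempts".
class CommitRetrySketch {

    // Stand-in for a retriable commit failure.
    static class RetriableFailure extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    // Runs the commit, retrying up to commitRetries times after the first
    // failure. Returns the number of attempts used on success; rethrows
    // the last failure once the retries are exhausted.
    static int commitWithRetries(Runnable commit, int commitRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                commit.run();
                return attempts;
            } catch (RetriableFailure ex) {
                if (attempts > commitRetries) {
                    throw ex;
                }
            }
        }
    }
}
```

With `commitRetries` set to 3, a commit that always fails is invoked four times before the failure propagates.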
    • isFixTxOffsets

      public boolean isFixTxOffsets()
      Whether or not to correct terminal transactional offsets.
      Returns:
      true to fix.
      Since:
      2.5.6
    • setFixTxOffsets

      public void setFixTxOffsets(boolean fixTxOffsets)
      When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records. This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero. Set this to true and the container will correct such mis-reported offsets. The check is performed before the next poll to avoid adding significant complexity to the commit processing. IMPORTANT: At the time of writing, the lag will only be corrected if the consumer is configured with isolation.level=read_committed and max.poll.records is greater than 1. See https://issues.apache.org/jira/browse/KAFKA-10683 for more information.
      Parameters:
      fixTxOffsets - true to correct the offset(s).
      Since:
      2.5.6
    • isCheckDeserExWhenKeyNull

      public boolean isCheckDeserExWhenKeyNull()
      Always check for a deserialization exception header with a null key.
      Returns:
      true to check.
      Since:
      2.8.1
    • setCheckDeserExWhenKeyNull

      public void setCheckDeserExWhenKeyNull(boolean checkDeserExWhenKeyNull)
      Set to true to always check for DeserializationException header when a null key is received. Useful when the consumer code cannot determine that an ErrorHandlingDeserializer has been configured, such as when using a delegating deserializer.
      Parameters:
      checkDeserExWhenKeyNull - true to always check.
      Since:
      2.8.1
    • isCheckDeserExWhenValueNull

      public boolean isCheckDeserExWhenValueNull()
      Always check for a deserialization exception header with a null value.
      Returns:
      true to check.
      Since:
      2.8.1
    • setCheckDeserExWhenValueNull

      public void setCheckDeserExWhenValueNull(boolean checkDeserExWhenValueNull)
      Set to true to always check for DeserializationException header when a null value is received. Useful when the consumer code cannot determine that an ErrorHandlingDeserializer has been configured, such as when using a delegating deserializer.
      Parameters:
      checkDeserExWhenValueNull - true to always check.
      Since:
      2.8.1
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • renderProperties

      protected final String renderProperties()