Class ContainerProperties


  • public class ContainerProperties
    extends ConsumerProperties
    Contains runtime properties for a listener container.
    Author:
    Gary Russell, Artem Bilan, Artem Yakshin, Johnny Lim, Lukasz Kaminski, Kyuhyeok Park
    • Constructor Detail

      • ContainerProperties

        public ContainerProperties​(java.lang.String... topics)
        Create properties for a container that will subscribe to the specified topics.
        Parameters:
        topics - the topics.
      • ContainerProperties

        public ContainerProperties​(java.util.regex.Pattern topicPattern)
        Create properties for a container that will subscribe to all topics matching the specified pattern and receive dynamically assigned partitions. The pattern is matched periodically against the topics that exist at the time of the check.
        Parameters:
        topicPattern - the pattern.
        See Also:
        CommonClientConfigs.METADATA_MAX_AGE_CONFIG
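To make the "topics existing at the time of check" wording concrete, the following is a minimal sketch of one such check. It is an illustrative model, not framework code: the class and method names are hypothetical, and it assumes the pattern must match the whole topic name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Illustrative model only; TopicPatternSketch is a hypothetical name, not part of the framework. */
final class TopicPatternSketch {

    /**
     * Returns the topics, among those existing at the time of the check,
     * whose full name matches the pattern (assumption: whole-name matching).
     */
    static List<String> matchingTopics(Pattern topicPattern, List<String> existingTopics) {
        return existingTopics.stream()
                .filter(topic -> topicPattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

A topic created after one check is only picked up by a later check, which is why the metadata refresh interval referenced above matters.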
      • ContainerProperties

        public ContainerProperties​(TopicPartitionOffset... topicPartitions)
        Create properties for a container that will assign itself the provided topic partitions.
        Parameters:
        topicPartitions - the topic partitions.
    • Method Detail

      • setMessageListener

        public void setMessageListener​(java.lang.Object messageListener)
        Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
        Parameters:
        messageListener - the listener.
      • setAckMode

        public void setAckMode​(ContainerProperties.AckMode ackMode)
        Set the ack mode to use when auto ack (in the configuration properties) is false.
        • RECORD: Commit the offset after each record has been processed by the listener.
        • BATCH: Commit the offsets for each batch of records received from the consumer when they all have been processed by the listener.
        • TIME: Commit pending offsets after ackTime number of milliseconds (ackTime should be greater than the pollTimeout; see setPollTimeout(long)).
        • COUNT: Commit pending offsets after at least ackCount number of records have been processed.
        • COUNT_TIME: Commit pending offsets after ackTime number of milliseconds or at least ackCount number of records have been processed.
        • MANUAL: Listener is responsible for acking - use an AcknowledgingMessageListener. Acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener.
        • MANUAL_IMMEDIATE: Listener is responsible for acking - use an AcknowledgingMessageListener. The commit will be performed immediately if the Acknowledgment is acknowledged on the calling consumer thread. Otherwise, the acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener. Results will be indeterminate if you sometimes acknowledge on the calling thread and sometimes not.
        Parameters:
        ackMode - the ContainerProperties.AckMode; default BATCH.
        See Also:
        setTransactionManager(PlatformTransactionManager)
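The difference between the COUNT, TIME and COUNT_TIME modes can be sketched as a single decision function. This is a simplified model under stated assumptions, not the container's implementation: the class, enum and parameter names are hypothetical, and treating the thresholds as inclusive is an assumption.

```java
/** Illustrative model of the count/time based ack modes; not the framework's implementation. */
final class AckDecisionSketch {

    /** Hypothetical stand-in for the three threshold-based ack modes. */
    enum Mode { COUNT, TIME, COUNT_TIME }

    /**
     * Decides whether pending offsets should be committed now.
     *
     * @param processedSinceCommit records processed since the last commit
     * @param millisSinceCommit    milliseconds elapsed since the last commit
     * @param ackCount             record threshold
     * @param ackTime              time threshold in milliseconds
     */
    static boolean shouldCommit(Mode mode, int processedSinceCommit, long millisSinceCommit,
            int ackCount, long ackTime) {
        boolean countReached = processedSinceCommit >= ackCount; // assumption: inclusive threshold
        boolean timeReached = millisSinceCommit >= ackTime;      // assumption: inclusive threshold
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            default: // COUNT_TIME: whichever threshold is reached first
                return countReached || timeReached;
        }
    }
}
```

With an ackCount of 10 and an ackTime of 5000, three processed records and 6000 ms elapsed would trigger a commit under TIME and COUNT_TIME but not under COUNT.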
      • setConsumerTaskExecutor

        public void setConsumerTaskExecutor​(@Nullable
                                            org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
        Set the executor for threads that poll the consumer.
        Parameters:
        consumerTaskExecutor - the executor
      • setShutdownTimeout

        public void setShutdownTimeout​(long shutdownTimeout)
        Set the timeout for shutting down the container. This is the maximum time, in milliseconds, that a call to stop(Runnable) will block before returning; default 10000.
        Parameters:
        shutdownTimeout - the shutdown timeout.
      • setIdleEventInterval

        public void setIdleEventInterval​(@Nullable
                                         java.lang.Long idleEventInterval)
        Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
        Parameters:
        idleEventInterval - the interval.
        See Also:
        setIdleBeforeDataMultiplier(double)
      • setIdleBeforeDataMultiplier

        public void setIdleBeforeDataMultiplier​(double idleBeforeDataMultiplier)
        Multiply the idle event interval (see setIdleEventInterval(Long)) by this value until at least one record is received. Default 5.0.
        Parameters:
        idleBeforeDataMultiplier - the multiplier.
        Since:
        2.8
        See Also:
        setIdleEventInterval(Long)
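The interaction between the idle event interval and the multiplier amounts to a small calculation, sketched below. The class and method names are hypothetical, and the sketch only models the arithmetic described above, not how the container tracks received records.

```java
/** Illustrative model only; IdleIntervalSketch is a hypothetical name, not part of the framework. */
final class IdleIntervalSketch {

    /**
     * Returns the interval (ms) that must elapse without records before an idle event
     * is due: the configured interval, stretched by the multiplier until the first
     * record has been received.
     */
    static long effectiveIdleInterval(long idleEventInterval, double idleBeforeDataMultiplier,
            boolean recordReceived) {
        return recordReceived
                ? idleEventInterval
                : (long) (idleEventInterval * idleBeforeDataMultiplier);
    }
}
```

For example, with an interval of 60000 ms and the default multiplier of 5.0, the first idle event would be due only after 300000 ms if no record has ever arrived.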
      • setIdlePartitionEventInterval

        public void setIdlePartitionEventInterval​(@Nullable
                                                  java.lang.Long idlePartitionEventInterval)
        Set the idle partition event interval; when set, an event is emitted if a poll returns no records for a partition and this interval has elapsed since a record was returned.
        Parameters:
        idlePartitionEventInterval - the interval.
      • getAckCount

        public int getAckCount()
      • getAckTime

        public long getAckTime()
      • getMessageListener

        public java.lang.Object getMessageListener()
      • getConsumerTaskExecutor

        @Nullable
        public org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
        Return the consumer task executor.
        Returns:
        the executor.
      • getShutdownTimeout

        public long getShutdownTimeout()
      • getIdleEventInterval

        @Nullable
        public java.lang.Long getIdleEventInterval()
        Return the idle event interval.
        Returns:
        the interval.
      • getIdleBeforeDataMultiplier

        public double getIdleBeforeDataMultiplier()
        Return the multiplier applied to the idle event interval (see setIdleEventInterval(Long)) until at least one record is received. Default 5.0.
        Returns:
        the multiplier.
        Since:
        2.8
        See Also:
        getIdleEventInterval()
      • getIdlePartitionEventInterval

        @Nullable
        public java.lang.Long getIdlePartitionEventInterval()
        Return the idle partition event interval.
        Returns:
        the interval.
      • getTransactionManager

        @Nullable
        public org.springframework.transaction.PlatformTransactionManager getTransactionManager()
      • setTransactionManager

        public void setTransactionManager​(@Nullable
                                          org.springframework.transaction.PlatformTransactionManager transactionManager)
        Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD or ContainerProperties.AckMode.BATCH, depending on the listener type (record or batch).
        Parameters:
        transactionManager - the transaction manager.
        Since:
        1.3
        See Also:
        setAckMode(AckMode)
      • getMonitorInterval

        public int getMonitorInterval()
      • setMonitorInterval

        public void setMonitorInterval​(int monitorInterval)
        The interval between checks for a non-responsive consumer in seconds; default 30.
        Parameters:
        monitorInterval - the interval.
        Since:
        1.3.1
      • getScheduler

        @Nullable
        public org.springframework.scheduling.TaskScheduler getScheduler()
        Return the task scheduler, if present.
        Returns:
        the scheduler.
      • setScheduler

        public void setScheduler​(@Nullable
                                 org.springframework.scheduling.TaskScheduler scheduler)
        A scheduler used with the monitor interval.
        Parameters:
        scheduler - the scheduler.
        Since:
        1.3.1
        See Also:
        setMonitorInterval(int)
      • getNoPollThreshold

        public float getNoPollThreshold()
      • setNoPollThreshold

        public void setNoPollThreshold​(float noPollThreshold)
        If the time since the last poll divided by the poll timeout exceeds this value, a NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid a race condition that can cause spurious events to be published. Default 3.0f.
        Parameters:
        noPollThreshold - the threshold
        Since:
        1.3.1
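The threshold test described for setNoPollThreshold is a ratio comparison, which the following sketch spells out. It is a model of the documented rule only; the class and method names are hypothetical, and the real monitor task does more than this.

```java
/** Illustrative model only; NoPollSketch is a hypothetical name, not part of the framework. */
final class NoPollSketch {

    /**
     * Returns true when (time since last poll) / (poll timeout) exceeds the threshold,
     * the condition under which a non-responsive consumer event would be published.
     */
    static boolean isNonResponsive(long millisSinceLastPoll, long pollTimeoutMillis,
            float noPollThreshold) {
        return (float) millisSinceLastPoll / pollTimeoutMillis > noPollThreshold;
    }
}
```

Assuming a poll timeout of 5000 ms and the default threshold of 3.0f, a consumer that last polled 16000 ms ago would be flagged (ratio 3.2), while one that polled 10000 ms ago would not (ratio 2.0).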
      • isLogContainerConfig

        public boolean isLogContainerConfig()
        Log the container configuration if true (INFO).
        Returns:
        true to log.
        Since:
        2.1.1
      • setLogContainerConfig

        public void setLogContainerConfig​(boolean logContainerConfig)
        Set to true to instruct each container to log this configuration.
        Parameters:
        logContainerConfig - true to log.
        Since:
        2.1.1
      • isMissingTopicsFatal

        public boolean isMissingTopicsFatal()
        If true, the container won't start if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
        Returns:
        the missingTopicsFatal.
        Since:
        2.2
      • setMissingTopicsFatal

        public void setMissingTopicsFatal​(boolean missingTopicsFatal)
        Set to true to prevent the container from starting if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
        Parameters:
        missingTopicsFatal - the missingTopicsFatal.
        Since:
        2.2
      • setIdleBetweenPolls

        public void setIdleBetweenPolls​(long idleBetweenPolls)
        The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls. Defaults to 0 - no idling.
        Parameters:
        idleBetweenPolls - the interval to sleep between polling cycles.
        Since:
        2.3
      • getIdleBetweenPolls

        public long getIdleBetweenPolls()
      • isMicrometerEnabled

        public boolean isMicrometerEnabled()
      • setMicrometerEnabled

        public void setMicrometerEnabled​(boolean micrometerEnabled)
        Set to false to disable the Micrometer listener timers. Default true.
        Parameters:
        micrometerEnabled - false to disable.
        Since:
        2.3
      • setMicrometerTags

        public void setMicrometerTags​(java.util.Map<java.lang.String,​java.lang.String> tags)
        Set additional tags for the Micrometer listener timers.
        Parameters:
        tags - the tags.
        Since:
        2.3
      • getMicrometerTags

        public java.util.Map<java.lang.String,​java.lang.String> getMicrometerTags()
      • getConsumerStartTimeout

        public java.time.Duration getConsumerStartTimeout()
      • getConsumerStartTimout

        @Deprecated
        public java.time.Duration getConsumerStartTimout()
        Deprecated. Use getConsumerStartTimeout() instead.
      • setConsumerStartTimeout

        public void setConsumerStartTimeout​(java.time.Duration consumerStartTimeout)
        Set the timeout to wait for a consumer thread to start before logging an error. Default 30 seconds.
        Parameters:
        consumerStartTimeout - the consumer start timeout.
      • setConsumerStartTimout

        @Deprecated
        public void setConsumerStartTimout​(java.time.Duration consumerStartTimeout)
        Deprecated. Use setConsumerStartTimeout(Duration) instead.
      • isSubBatchPerPartition

        public boolean isSubBatchPerPartition()
        Return whether to split batches by partition.
        Returns:
        subBatchPerPartition.
        Since:
        2.3.2
      • getSubBatchPerPartition

        @Nullable
        public java.lang.Boolean getSubBatchPerPartition()
        Return whether to split batches by partition; null if not set.
        Returns:
        subBatchPerPartition.
        Since:
        2.5
      • setSubBatchPerPartition

        public void setSubBatchPerPartition​(@Nullable
                                            java.lang.Boolean subBatchPerPartition)
        When using a batch message listener, whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or to dispatch the complete batch returned by the poll(). Useful when using transactions to enable zombie fencing, by using a transactional.id that is unique for each group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA and false when not using transactions or with EOSMode.BETA.
        Parameters:
        subBatchPerPartition - true for a separate transaction for each partition.
        Since:
        2.3.2
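Dispatching "by partition" means that the records returned by one poll are grouped into one sub batch per topic/partition before the listener is called. A minimal sketch of that grouping follows; Rec is a hypothetical stand-in for a consumer record, and the class is an illustrative model rather than the container's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; SubBatchSketch and Rec are hypothetical names. */
final class SubBatchSketch {

    /** Minimal stand-in for a consumer record. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        String topicPartition() {
            return topic + "-" + partition;
        }
    }

    /** Splits one polled batch into sub batches keyed by topic-partition, preserving record order. */
    static Map<String, List<Rec>> splitByPartition(List<Rec> polled) {
        Map<String, List<Rec>> subBatches = new LinkedHashMap<>();
        for (Rec rec : polled) {
            subBatches.computeIfAbsent(rec.topicPartition(), key -> new ArrayList<>()).add(rec);
        }
        return subBatches;
    }
}
```

In this model, each map value would be handed to the batch listener separately, which is what allows a separate transaction per partition.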
      • isDeliveryAttemptHeader

        public boolean isDeliveryAttemptHeader()
      • setDeliveryAttemptHeader

        public void setDeliveryAttemptHeader​(boolean deliveryAttemptHeader)
        Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware. There is a small overhead so this is false by default.
        Parameters:
        deliveryAttemptHeader - true to populate
        Since:
        2.5
      • setEosMode

        public void setEosMode​(ContainerProperties.EOSMode eosMode)
        Set the exactly once semantics mode. With ContainerProperties.EOSMode.V1, a producer per group/topic/partition is used (enabling 'transactional.id' fencing). ContainerProperties.EOSMode.V2 enables fetch-offset-request fencing and requires brokers 2.5 or later. With the 2.6 client, the default is now V2 because the 2.6 client can automatically fall back to V1 (formerly ALPHA). IMPORTANT: the 3.0 clients cannot be used with ContainerProperties.EOSMode.V2 unless the broker is 2.5 or higher.
        Parameters:
        eosMode - the mode; default V2.
        Since:
        2.5
      • getTransactionDefinition

        @Nullable
        public org.springframework.transaction.TransactionDefinition getTransactionDefinition()
        Get the transaction definition.
        Returns:
        the definition.
        Since:
        2.5.4
      • setTransactionDefinition

        public void setTransactionDefinition​(@Nullable
                                             org.springframework.transaction.TransactionDefinition transactionDefinition)
        Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template. Note that this is only generally useful when used with a PlatformTransactionManager that supports a custom definition; this does NOT include the KafkaTransactionManager, which has no concept of transaction timeout. It can be useful to start, for example, a database transaction in the container, rather than using @Transactional on the listener, because then a record interceptor or a filter in a listener adapter can participate in the transaction.
        Parameters:
        transactionDefinition - the definition.
        Since:
        2.5.4
        See Also:
        setTransactionManager(PlatformTransactionManager)
      • getAdviceChain

        public org.aopalliance.aop.Advice[] getAdviceChain()
        A chain of listener Advices.
        Returns:
        the adviceChain.
        Since:
        2.5.6
      • setAdviceChain

        public void setAdviceChain​(org.aopalliance.aop.Advice... adviceChain)
        Set a chain of listener Advices; must not be null or have null elements.
        Parameters:
        adviceChain - the adviceChain to set.
        Since:
        2.5.6
      • isStopContainerWhenFenced

        public boolean isStopContainerWhenFenced()
        When true, the container will stop after a ProducerFencedException.
        Returns:
        the stopContainerWhenFenced
        Since:
        2.5.8
      • setStopContainerWhenFenced

        public void setStopContainerWhenFenced​(boolean stopContainerWhenFenced)
        Set to true to stop the container when a ProducerFencedException is thrown. Currently, there is no way to determine whether such an exception is thrown due to a rebalance or a timeout. We therefore cannot call the after rollback processor. The best solution is to ensure that the transaction.timeout.ms is large enough so that transactions don't time out.
        Parameters:
        stopContainerWhenFenced - true to stop the container.
        Since:
        2.5.8
      • isStopImmediate

        public boolean isStopImmediate()
        When true, the container will be stopped immediately after processing the current record.
        Returns:
        true to stop immediately.
        Since:
        2.5.11
      • setStopImmediate

        public void setStopImmediate​(boolean stopImmediate)
        Set to true to stop the container after processing the current record (when stop() is called). When false (default), the container will stop after all the results of the previous poll are processed.
        Parameters:
        stopImmediate - true to stop after the current record.
        Since:
        2.5.11
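The effect of stopImmediate on the records of the current poll can be expressed as a small calculation. The sketch below is a simplified model with hypothetical names; it assumes stop() is requested while the record at a given zero-based index is being processed.

```java
/** Illustrative model only; StopSketch is a hypothetical name, not part of the framework. */
final class StopSketch {

    /**
     * Returns how many records of the current poll are processed before the container stops.
     *
     * @param polledCount          number of records returned by the previous poll
     * @param stopRequestedAtIndex zero-based index of the record being processed when stop() is called
     * @param stopImmediate        the stopImmediate setting
     */
    static int recordsProcessedBeforeStop(int polledCount, int stopRequestedAtIndex,
            boolean stopImmediate) {
        return stopImmediate
                ? Math.min(polledCount, stopRequestedAtIndex + 1) // finish the current record only
                : polledCount;                                    // finish the whole poll result
    }
}
```

With 10 polled records and a stop requested during the record at index 3, this model yields 4 processed records when stopImmediate is true and 10 when it is false.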
      • isAsyncAcks

        public boolean isAsyncAcks()
        When true, async manual acknowledgments are supported.
        Returns:
        true for async ack support.
        Since:
        2.8
      • setAsyncAcks

        public void setAsyncAcks​(boolean asyncAcks)
        Set to true to support asynchronous record acknowledgments. Only applies with ContainerProperties.AckMode.MANUAL or ContainerProperties.AckMode.MANUAL_IMMEDIATE. Out-of-order offset commits are deferred until all previous offsets in the partition have been committed. The consumer is paused, if necessary, until all acks have been completed.
        Parameters:
        asyncAcks - true to use async acks.
        Since:
        2.8
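The deferral of out-of-order commits can be pictured as a per-partition tracker that only advances the committable offset across a contiguous run of acknowledged offsets. The following is a minimal sketch of that idea, not the container's implementation: the class and method names are hypothetical, and it assumes the usual Kafka convention that the committed offset is the last processed offset plus one.

```java
import java.util.OptionalLong;
import java.util.TreeSet;

/** Illustrative single-partition model only; AsyncAckSketch is a hypothetical name. */
final class AsyncAckSketch {

    /** Offsets acknowledged out of order that cannot be committed yet. */
    private final TreeSet<Long> pending = new TreeSet<>();

    /** Lowest offset that has not been acknowledged yet. */
    private long nextExpected;

    AsyncAckSketch(long firstOffset) {
        this.nextExpected = firstOffset;
    }

    /**
     * Records an acknowledgment. Returns the offset to commit (last contiguous
     * acknowledged offset + 1), or empty if the commit must be deferred because
     * an earlier offset is still outstanding.
     */
    OptionalLong ack(long offset) {
        pending.add(offset);
        boolean advanced = false;
        while (pending.remove(nextExpected)) {
            nextExpected++;
            advanced = true;
        }
        return advanced ? OptionalLong.of(nextExpected) : OptionalLong.empty();
    }

    /** True while acknowledged offsets are waiting on an earlier, unacknowledged one. */
    boolean hasDeferredAcks() {
        return !pending.isEmpty();
    }
}
```

Starting at offset 5, acknowledging 7 and then 6 produces no commit; acknowledging 5 then releases all three and yields a committable offset of 8. In this model, hasDeferredAcks() corresponds loosely to the situation in which the consumer might need to be paused.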