Class ContainerProperties


  • public class ContainerProperties
    extends ConsumerProperties
    Contains runtime properties for a listener container.
    Author:
    Gary Russell, Artem Bilan, Artem Yakshin, Johnny Lim, Lukasz Kaminski, Kyuhyeok Park
    • Constructor Detail

      • ContainerProperties

        public ContainerProperties​(java.lang.String... topics)
        Create properties for a container that will subscribe to the specified topics.
        Parameters:
        topics - the topics.
      • ContainerProperties

        public ContainerProperties​(java.util.regex.Pattern topicPattern)
        Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check.
        Parameters:
        topicPattern - the pattern.
        See Also:
        CommonClientConfigs.METADATA_MAX_AGE_CONFIG
      • ContainerProperties

        public ContainerProperties​(TopicPartitionOffset... topicPartitions)
        Create properties for a container that will assign itself the provided topic partitions.
        Parameters:
        topicPartitions - the topic partitions.
    • Method Detail

      • setMessageListener

        public void setMessageListener​(java.lang.Object messageListener)
        Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
        Parameters:
        messageListener - the listener.
      • setAckMode

        public void setAckMode​(ContainerProperties.AckMode ackMode)
        Set the ack mode to use when auto ack (in the configuration properties) is false.
        • RECORD: Commit the offset after each record has been processed by the listener.
        • BATCH: Commit the offsets for each batch of records received from the consumer when they all have been processed by the listener
        • TIME: Commit pending offsets after ackTime number of milliseconds; (should be greater than #setPollTimeout(long) pollTimeout.
        • COUNT: Commit pending offsets after at least ackCount number of records have been processed
        • COUNT_TIME: Commit pending offsets after ackTime number of milliseconds or at least ackCount number of records have been processed
        • MANUAL: Listener is responsible for acking - use a AcknowledgingMessageListener. Acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener.
        • MANUAL_IMMDEDIATE: Listener is responsible for acking - use a AcknowledgingMessageListener. The commit will be performed immediately if the Acknowledgment is acknowledged on the calling consumer thread. Otherwise, the acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener. Results will be indeterminate if you sometimes acknowledge on the calling thread and sometimes not.
        Parameters:
        ackMode - the ContainerProperties.AckMode; default BATCH.
        See Also:
        setTransactionManager(PlatformTransactionManager)
      • setConsumerTaskExecutor

        public void setConsumerTaskExecutor​(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
        Set the executor for threads that poll the consumer.
        Parameters:
        consumerTaskExecutor - the executor
      • setShutdownTimeout

        public void setShutdownTimeout​(long shutdownTimeout)
        Set the timeout for shutting down the container. This is the maximum amount of time that the invocation to #stop(Runnable) will block for, before returning; default 10000L.
        Parameters:
        shutdownTimeout - the shutdown timeout.
      • setIdleEventInterval

        public void setIdleEventInterval​(java.lang.Long idleEventInterval)
        Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
        Parameters:
        idleEventInterval - the interval.
      • setAckOnError

        @Deprecated
        public void setAckOnError​(boolean ackOnError)
        Deprecated.
        in favor of GenericErrorHandler.isAckAfterHandle().
        Set whether or not the container should commit offsets (ack messages) where the listener throws exceptions. This works in conjunction with ackMode and is effective only when the kafka property enable.auto.commit is false; it is not applicable to manual ack modes. When this property is set to true, all messages handled will have their offset committed. When set to false (the default), offsets will be committed only for successfully handled messages. Manual acks will always be applied. Bear in mind that, if the next message is successfully handled, its offset will be committed, effectively committing the offset of the failed message anyway, so this option has limited applicability, unless you are using a SeekToCurrentBatchErrorHandler which will seek the current record so that it is reprocessed.

        Does not apply when transactions are used - in that case, whether or not the offsets are sent to the transaction depends on whether the transaction is committed or rolled back. If a listener throws an exception, the transaction will normally be rolled back unless an error handler is provided that handles the error and exits normally; in which case the offsets are sent to the transaction and the transaction is committed.

        Parameters:
        ackOnError - whether the container should acknowledge messages that throw exceptions.
      • getAckCount

        public int getAckCount()
      • getAckTime

        public long getAckTime()
      • getMessageListener

        public java.lang.Object getMessageListener()
      • getConsumerTaskExecutor

        public org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
      • getShutdownTimeout

        public long getShutdownTimeout()
      • getIdleEventInterval

        public java.lang.Long getIdleEventInterval()
      • isAckOnError

        public boolean isAckOnError()
      • getTransactionManager

        public org.springframework.transaction.PlatformTransactionManager getTransactionManager()
      • setTransactionManager

        public void setTransactionManager​(org.springframework.transaction.PlatformTransactionManager transactionManager)
        Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD and ContainerProperties.AckMode.BATCH depending on the listener type (record or batch).
        Parameters:
        transactionManager - the transaction manager.
        Since:
        1.3
        See Also:
        setAckMode(AckMode)
      • getMonitorInterval

        public int getMonitorInterval()
      • setMonitorInterval

        public void setMonitorInterval​(int monitorInterval)
        The interval between checks for a non-responsive consumer in seconds; default 30.
        Parameters:
        monitorInterval - the interval.
        Since:
        1.3.1
      • getScheduler

        public org.springframework.scheduling.TaskScheduler getScheduler()
      • setScheduler

        public void setScheduler​(org.springframework.scheduling.TaskScheduler scheduler)
        A scheduler used with the monitor interval.
        Parameters:
        scheduler - the scheduler.
        Since:
        1.3.1
        See Also:
        setMonitorInterval(int)
      • getNoPollThreshold

        public float getNoPollThreshold()
      • setNoPollThreshold

        public void setNoPollThreshold​(float noPollThreshold)
        If the time since the last poll / poll timeout exceeds this value, a NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid a race condition that can cause spurious events to be published. Default 3.0f.
        Parameters:
        noPollThreshold - the threshold
        Since:
        1.3.1
      • isLogContainerConfig

        public boolean isLogContainerConfig()
        Log the container configuration if true (INFO).
        Returns:
        true to log.
        Since:
        2.1.1
      • setLogContainerConfig

        public void setLogContainerConfig​(boolean logContainerConfig)
        Set to true to instruct each container to log this configuration.
        Parameters:
        logContainerConfig - true to log.
        Since:
        2.1.1
      • isMissingTopicsFatal

        public boolean isMissingTopicsFatal()
        If true, the container won't start if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
        Returns:
        the missingTopicsFatal.
        Since:
        2.2
      • setMissingTopicsFatal

        public void setMissingTopicsFatal​(boolean missingTopicsFatal)
        Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default true;
        Parameters:
        missingTopicsFatal - the missingTopicsFatal.
        Since:
        2.2
      • setIdleBetweenPolls

        public void setIdleBetweenPolls​(long idleBetweenPolls)
        The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls. Defaults to 0 - no idling.
        Parameters:
        idleBetweenPolls - the interval to sleep between polling cycles.
        Since:
        2.3
      • getIdleBetweenPolls

        public long getIdleBetweenPolls()
      • isMicrometerEnabled

        public boolean isMicrometerEnabled()
      • setMicrometerEnabled

        public void setMicrometerEnabled​(boolean micrometerEnabled)
        Set to false to disable the Micrometer listener timers. Default true.
        Parameters:
        micrometerEnabled - false to disable.
        Since:
        2.3
      • setMicrometerTags

        public void setMicrometerTags​(java.util.Map<java.lang.String,​java.lang.String> tags)
        Set additional tags for the Micrometer listener timers.
        Parameters:
        tags - the tags.
        Since:
        2.3
      • getMicrometerTags

        public java.util.Map<java.lang.String,​java.lang.String> getMicrometerTags()
      • getConsumerStartTimeout

        public java.time.Duration getConsumerStartTimeout()
      • getConsumerStartTimout

        @Deprecated
        public java.time.Duration getConsumerStartTimout()
        Deprecated.
      • setConsumerStartTimeout

        public void setConsumerStartTimeout​(java.time.Duration consumerStartTimeout)
        Set the timeout to wait for a consumer thread to start before logging an error. Default 30 seconds.
        Parameters:
        consumerStartTimeout - the consumer start timeout.
      • setConsumerStartTimout

        @Deprecated
        public void setConsumerStartTimout​(java.time.Duration consumerStartTimeout)
        Deprecated.
      • isSubBatchPerPartition

        public boolean isSubBatchPerPartition()
        Return whether to split batches by partition.
        Returns:
        subBatchPerPartition.
        Since:
        2.3.2
      • getSubBatchPerPartition

        @Nullable
        public java.lang.Boolean getSubBatchPerPartition()
        Return whether to split batches by partition; null if not set.
        Returns:
        subBatchPerPartition.
        Since:
        2.5
      • setSubBatchPerPartition

        public void setSubBatchPerPartition​(boolean subBatchPerPartition)
        When using a batch message listener whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or the complete batch received by the poll(). Useful when using transactions to enable zombie fencing, by using a transactional.id that is unique for each group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA and false when not using transactions or with EOSMode.BETA.
        Parameters:
        subBatchPerPartition - true for a separate transaction for each partition.
        Since:
        2.3.2
      • isDeliveryAttemptHeader

        public boolean isDeliveryAttemptHeader()
      • setDeliveryAttemptHeader

        public void setDeliveryAttemptHeader​(boolean deliveryAttemptHeader)
        Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware. There is a small overhead so this is false by default.
        Parameters:
        deliveryAttemptHeader - true to populate
        Since:
        2.5
      • setEosMode

        public void setEosMode​(ContainerProperties.EOSMode eosMode)
        Set the exactly once semantics mode. When ContainerProperties.EOSMode.ALPHA a producer per group/topic/partition is used (enabling 'transactional.id fencing`). ContainerProperties.EOSMode.BETA enables fetch-offset-request fencing, and requires brokers 2.5 or later. With the 2.6 client, the default is now BETA because the 2.6 client can automatically fall back to ALPHA.
        Parameters:
        eosMode - the mode; default BETA.
        Since:
        2.5
      • getTransactionDefinition

        @Nullable
        public org.springframework.transaction.TransactionDefinition getTransactionDefinition()
        Get the transaction definition.
        Returns:
        the definition.
        Since:
        2.5.4
      • setTransactionDefinition

        public void setTransactionDefinition​(org.springframework.transaction.TransactionDefinition transactionDefinition)
        Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template. Note that this is only generally useful when used with a ChainedKafkaTransactionManager configured with a non-Kafka transaction manager. Kafka has no concept of transaction timeout, for example.
        Parameters:
        transactionDefinition - the definition.
        Since:
        2.5.4
      • getAdviceChain

        public org.aopalliance.aop.Advice[] getAdviceChain()
        A chain of listener Advices.
        Returns:
        the adviceChain.
        Since:
        2.5.6
      • setAdviceChain

        public void setAdviceChain​(org.aopalliance.aop.Advice... adviceChain)
        Set a chain of listener Advices; must not be null or have null elements.
        Parameters:
        adviceChain - the adviceChain to set.
        Since:
        2.5.6
      • isStopContainerWhenFenced

        public boolean isStopContainerWhenFenced()
        When true, the container will stop after a ProducerFencedException.
        Returns:
        the stopContainerWhenFenced
        Since:
        2.5.8
      • setStopContainerWhenFenced

        public void setStopContainerWhenFenced​(boolean stopContainerWhenFenced)
        Set to true to stop the container when a ProducerFencedException is thrown. Currently, there is no way to determine if such an exception is thrown due to a rebalance Vs. a timeout. We therefore cannot call the after rollback processor. The best solution is to ensure that the transaction.timeout.ms is large enough so that transactions don't time out.
        Parameters:
        stopContainerWhenFenced - true to stop the container.
        Since:
        2.5.8
      • isStopImmediate

        public boolean isStopImmediate()
        When true, the container will be stopped immediately after processing the current record.
        Returns:
        true to stop immediately.
        Since:
        2.5.11
      • setStopImmediate

        public void setStopImmediate​(boolean stopImmediate)
        Set to true to stop the container after processing the current record (when stop() is called). When false (default), the container will stop after all the results of the previous poll are processed.
        Parameters:
        stopImmediate - true to stop after the current record.
        Since:
        2.5.11