Class ContainerProperties


  • public class ContainerProperties
    extends ConsumerProperties
    Contains runtime properties for a listener container.
    Author:
    Gary Russell, Artem Bilan, Artem Yakshin, Johnny Lim, Lukasz Kaminski
    • Constructor Detail

      • ContainerProperties

        public ContainerProperties​(java.lang.String... topics)
        Create properties for a container that will subscribe to the specified topics.
        Parameters:
        topics - the topics.
      • ContainerProperties

        public ContainerProperties​(java.util.regex.Pattern topicPattern)
        Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check.
        Parameters:
        topicPattern - the pattern.
        See Also:
        CommonClientConfigs.METADATA_MAX_AGE_CONFIG
      • ContainerProperties

        public ContainerProperties​(TopicPartitionOffset... topicPartitions)
        Create properties for a container that will assign itself the provided topic partitions.
        Parameters:
        topicPartitions - the topic partitions.
    • Method Detail

      • setMessageListener

        public void setMessageListener​(java.lang.Object messageListener)
        Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
        Parameters:
        messageListener - the listener.
      • setAckMode

        public void setAckMode​(ContainerProperties.AckMode ackMode)
        Set the ack mode to use when auto ack (in the configuration properties) is false.
        • RECORD: Ack after each record has been passed to the listener.
        • BATCH: Ack after each batch of records received from the consumer has been passed to the listener
        • TIME: Ack after this number of milliseconds; (should be greater than #setPollTimeout(long) pollTimeout.
        • COUNT: Ack after at least this number of records have been received
        • MANUAL: Listener is responsible for acking - use a AcknowledgingMessageListener.
        Ignored when transactions are being used. Transactional consumers commit offsets with semantics equivalent to RECORD or BATCH, depending on the listener type.
        Parameters:
        ackMode - the ContainerProperties.AckMode; default BATCH.
        See Also:
        setTransactionManager(PlatformTransactionManager)
      • setConsumerTaskExecutor

        public void setConsumerTaskExecutor​(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
        Set the executor for threads that poll the consumer.
        Parameters:
        consumerTaskExecutor - the executor
      • setShutdownTimeout

        public void setShutdownTimeout​(long shutdownTimeout)
        Set the timeout for shutting down the container. This is the maximum amount of time that the invocation to #stop(Runnable) will block for, before returning; default 10000L.
        Parameters:
        shutdownTimeout - the shutdown timeout.
      • setIdleEventInterval

        public void setIdleEventInterval​(java.lang.Long idleEventInterval)
        Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
        Parameters:
        idleEventInterval - the interval.
      • setAckOnError

        @Deprecated
        public void setAckOnError​(boolean ackOnError)
        Deprecated.
        in favor of GenericErrorHandler.isAckAfterHandle().
        Set whether or not the container should commit offsets (ack messages) where the listener throws exceptions. This works in conjunction with ackMode and is effective only when the kafka property enable.auto.commit is false; it is not applicable to manual ack modes. When this property is set to true, all messages handled will have their offset committed. When set to false (the default), offsets will be committed only for successfully handled messages. Manual acks will always be applied. Bear in mind that, if the next message is successfully handled, its offset will be committed, effectively committing the offset of the failed message anyway, so this option has limited applicability, unless you are using a SeekToCurrentBatchErrorHandler which will seek the current record so that it is reprocessed.

        Does not apply when transactions are used - in that case, whether or not the offsets are sent to the transaction depends on whether the transaction is committed or rolled back. If a listener throws an exception, the transaction will normally be rolled back unless an error handler is provided that handles the error and exits normally; in which case the offsets are sent to the transaction and the transaction is committed.

        Parameters:
        ackOnError - whether the container should acknowledge messages that throw exceptions.
      • getAckCount

        public int getAckCount()
      • getAckTime

        public long getAckTime()
      • getMessageListener

        public java.lang.Object getMessageListener()
      • getConsumerTaskExecutor

        public org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
      • getShutdownTimeout

        public long getShutdownTimeout()
      • getIdleEventInterval

        public java.lang.Long getIdleEventInterval()
      • isAckOnError

        public boolean isAckOnError()
      • getTransactionManager

        public org.springframework.transaction.PlatformTransactionManager getTransactionManager()
      • setTransactionManager

        public void setTransactionManager​(org.springframework.transaction.PlatformTransactionManager transactionManager)
        Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD and ContainerProperties.AckMode.BATCH depending on the listener type (record or batch).
        Parameters:
        transactionManager - the transaction manager.
        Since:
        1.3
        See Also:
        setAckMode(AckMode)
      • getMonitorInterval

        public int getMonitorInterval()
      • setMonitorInterval

        public void setMonitorInterval​(int monitorInterval)
        The interval between checks for a non-responsive consumer in seconds; default 30.
        Parameters:
        monitorInterval - the interval.
        Since:
        1.3.1
      • getScheduler

        public org.springframework.scheduling.TaskScheduler getScheduler()
      • setScheduler

        public void setScheduler​(org.springframework.scheduling.TaskScheduler scheduler)
        A scheduler used with the monitor interval.
        Parameters:
        scheduler - the scheduler.
        Since:
        1.3.1
        See Also:
        setMonitorInterval(int)
      • getNoPollThreshold

        public float getNoPollThreshold()
      • setNoPollThreshold

        public void setNoPollThreshold​(float noPollThreshold)
        If the time since the last poll / poll timeout exceeds this value, a NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid a race condition that can cause spurious events to be published. Default 3.0f.
        Parameters:
        noPollThreshold - the threshold
        Since:
        1.3.1
      • isLogContainerConfig

        public boolean isLogContainerConfig()
        Log the container configuration if true (INFO).
        Returns:
        true to log.
        Since:
        2.1.1
      • setLogContainerConfig

        public void setLogContainerConfig​(boolean logContainerConfig)
        Set to true to instruct each container to log this configuration.
        Parameters:
        logContainerConfig - true to log.
        Since:
        2.1.1
      • isMissingTopicsFatal

        public boolean isMissingTopicsFatal()
        If true, the container won't start if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
        Returns:
        the missingTopicsFatal.
        Since:
        2.2
      • setMissingTopicsFatal

        public void setMissingTopicsFatal​(boolean missingTopicsFatal)
        Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default true;
        Parameters:
        missingTopicsFatal - the missingTopicsFatal.
        Since:
        2.2
      • setIdleBetweenPolls

        public void setIdleBetweenPolls​(long idleBetweenPolls)
        The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls. Defaults to 0 - no idling.
        Parameters:
        idleBetweenPolls - the interval to sleep between polling cycles.
        Since:
        2.3
      • getIdleBetweenPolls

        public long getIdleBetweenPolls()
      • isMicrometerEnabled

        public boolean isMicrometerEnabled()
      • setMicrometerEnabled

        public void setMicrometerEnabled​(boolean micrometerEnabled)
        Set to false to disable the Micrometer listener timers. Default true.
        Parameters:
        micrometerEnabled - false to disable.
        Since:
        2.3
      • setMicrometerTags

        public void setMicrometerTags​(java.util.Map<java.lang.String,​java.lang.String> tags)
        Set additional tags for the Micrometer listener timers.
        Parameters:
        tags - the tags.
        Since:
        2.3
      • getMicrometerTags

        public java.util.Map<java.lang.String,​java.lang.String> getMicrometerTags()
      • getConsumerStartTimout

        public java.time.Duration getConsumerStartTimout()
      • setConsumerStartTimout

        public void setConsumerStartTimout​(java.time.Duration consumerStartTimout)
        Set the timeout to wait for a consumer thread to start before logging an error. Default 30 seconds.
        Parameters:
        consumerStartTimout - the consumer start timeout.
      • isSubBatchPerPartition

        public boolean isSubBatchPerPartition()
        Return whether to split batches by partition.
        Returns:
        subBatchPerPartition.
        Since:
        2.3.2
      • getSubBatchPerPartition

        @Nullable
        public java.lang.Boolean getSubBatchPerPartition()
        Return whether to split batches by partition; null if not set.
        Returns:
        subBatchPerPartition.
        Since:
        2.5
      • setSubBatchPerPartition

        public void setSubBatchPerPartition​(boolean subBatchPerPartition)
        When using a batch message listener whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or the complete batch received by the poll(). Useful when using transactions to enable zombie fencing, by using a transactional.id that is unique for each group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA and false when not using transactions or with EOSMode.BETA.
        Parameters:
        subBatchPerPartition - true for a separate transaction for each partition.
        Since:
        2.3.2
      • isDeliveryAttemptHeader

        public boolean isDeliveryAttemptHeader()
      • setDeliveryAttemptHeader

        public void setDeliveryAttemptHeader​(boolean deliveryAttemptHeader)
        Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware. There is a small overhead so this is false by default.
        Parameters:
        deliveryAttemptHeader - true to populate
        Since:
        2.5
      • setEosMode

        public void setEosMode​(ContainerProperties.EOSMode eosMode)
        Set the exactly once semantics mode. When ContainerProperties.EOSMode.ALPHA a producer per group/topic/partition is used (enabling 'transactional.id fencing`). ContainerProperties.EOSMode.BETA enables fetch-offset-request fencing, and requires brokers 2.5 or later. In the 2.6 client, the default will be BETA because the 2.6 client can automatically fall back to ALPHA.
        Parameters:
        eosMode - the mode; default ALPHA.
        Since:
        2.5
      • getTransactionDefinition

        @Nullable
        public org.springframework.transaction.TransactionDefinition getTransactionDefinition()
        Get the transaction definition.
        Returns:
        the definition.
        Since:
        2.5.4
      • setTransactionDefinition

        public void setTransactionDefinition​(org.springframework.transaction.TransactionDefinition transactionDefinition)
        Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template. Note that this is only generally useful when used with a ChainedKafkaTransactionManager configured with a non-Kafka transaction manager. Kafka has no concept of transaction timeout, for example.
        Parameters:
        transactionDefinition - the definition.
        Since:
        2.5.4
      • getAdviceChain

        public org.aopalliance.aop.Advice[] getAdviceChain()
        A chain of listener Advices.
        Returns:
        the adviceChain.
        Since:
        2.5.6
      • setAdviceChain

        public void setAdviceChain​(org.aopalliance.aop.Advice... adviceChain)
        Set a chain of listener Advices; must not be null or have null elements.
        Parameters:
        adviceChain - the adviceChain to set.
        Since:
        2.5.6