Class KafkaMessageListenerContainerSpec<K,V>

java.lang.Object
org.springframework.beans.factory.config.AbstractFactoryBean<T>
org.springframework.integration.dsl.IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
org.springframework.integration.kafka.dsl.KafkaMessageListenerContainerSpec<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanClassLoaderAware, BeanFactoryAware, DisposableBean, FactoryBean<org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>, InitializingBean, Lifecycle, Phased, SmartLifecycle

public class KafkaMessageListenerContainerSpec<K,V> extends IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
A helper class in the Builder pattern style to delegate options to the ConcurrentMessageListenerContainer.
Since:
5.4
Author:
Artem Bilan, Gary Russell
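The builder-style delegation described above can be pictured with a minimal, library-free sketch. Every name in it (SpecSketch, Container, ContainerSpec, get) is a hypothetical stand-in invented for illustration; none of it is the Spring Integration or Spring for Apache Kafka implementation. It only shows the shape of the idea: each option method writes one setting to the target container and returns the spec so that calls can be chained.

```java
/** Hypothetical stand-ins for illustration only; not the Spring classes. */
final class SpecSketch {

    /** Stand-in for the container that ends up holding the options. */
    static final class Container {
        int concurrency = 1;       // assumed default for this sketch
        long pollTimeout = 1000L;  // default stated for pollTimeout on this page
        String groupId;
    }

    /** Fluent spec: each method sets one option on the target and returns the spec. */
    static final class ContainerSpec {
        private final Container target = new Container();

        ContainerSpec concurrency(int concurrency) {
            this.target.concurrency = concurrency;
            return this;
        }

        ContainerSpec pollTimeout(long pollTimeout) {
            this.target.pollTimeout = pollTimeout;
            return this;
        }

        ContainerSpec groupId(String groupId) {
            this.target.groupId = groupId;
            return this;
        }

        /** Hypothetical accessor that hands back the configured target. */
        Container get() {
            return this.target;
        }
    }

    public static void main(String[] args) {
        Container c = new ContainerSpec()
                .concurrency(3)
                .pollTimeout(2000L)
                .groupId("orders")
                .get();
        // prints "3 2000 orders"
        System.out.println(c.concurrency + " " + c.pollTimeout + " " + c.groupId);
    }
}
```

Returning the spec from every method is what makes the chained style possible; options that are never set keep the target's defaults.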
  • Method Details

    • id

      Description copied from class: IntegrationComponentSpec
      Configure the component identifier. Used as the beanName to register the bean in the application context for this component.
      Overrides:
      id in class IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
      Parameters:
      id - the id.
      Returns:
      the spec.
    • concurrency

      public KafkaMessageListenerContainerSpec<K,V> concurrency(int concurrency)
      Specify the maximum concurrency for the ConcurrentMessageListenerContainer.
      Parameters:
      concurrency - the maximum concurrency.
      Returns:
      the spec.
      See Also:
      • ConcurrentMessageListenerContainer.setConcurrency(int)
    • errorHandler

      public KafkaMessageListenerContainerSpec<K,V> errorHandler(org.springframework.kafka.listener.CommonErrorHandler errorHandler)
      Specify a CommonErrorHandler for the AbstractMessageListenerContainer.
      Parameters:
      errorHandler - the CommonErrorHandler.
      Returns:
      the spec.
      Since:
      6.0
      See Also:
      • CommonErrorHandler
    • ackMode

      public KafkaMessageListenerContainerSpec<K,V> ackMode(org.springframework.kafka.listener.ContainerProperties.AckMode ackMode)
      Set the ack mode to use when auto ack (in the configuration properties) is false.
      • RECORD: Ack after each record has been passed to the listener.
      • BATCH: Ack after each batch of records received from the consumer has been passed to the listener.
      • TIME: Ack after ackTime milliseconds have elapsed (should be greater than the pollTimeout).
      • COUNT: Ack after at least ackCount records have been received.
      • MANUAL: Listener is responsible for acking; use an AcknowledgingMessageListener.
      Parameters:
      ackMode - the ContainerProperties.AckMode; default BATCH.
      Returns:
      the spec.
      See Also:
      • ContainerProperties.AckMode
    • pollTimeout

      public KafkaMessageListenerContainerSpec<K,V> pollTimeout(long pollTimeout)
      Set the max time to block in the consumer waiting for records.
      Parameters:
      pollTimeout - the timeout in ms; default 1000.
      Returns:
      the spec.
      See Also:
      • ConsumerProperties.setPollTimeout(long)
    • ackCount

      public KafkaMessageListenerContainerSpec<K,V> ackCount(int count)
      Set the number of outstanding records after which offsets should be committed when ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used.
      Parameters:
      count - the count.
      Returns:
      the spec.
      See Also:
      • ContainerProperties.setAckCount(int)
    • ackTime

      public KafkaMessageListenerContainerSpec<K,V> ackTime(long millis)
      Set the time (ms) after which outstanding offsets should be committed when ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used. Should be larger than zero.
      Parameters:
      millis - the time.
      Returns:
      the spec.
      See Also:
      • ContainerProperties.setAckTime(long)
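How ackCount and ackTime interact with the COUNT, TIME and COUNT_TIME modes can be sketched as a small decision function. This is a simplified model, not the container's code: AckPolicySketch, commitDue and its parameters are hypothetical names, the inclusive comparisons at the exact threshold are an assumption, and treating COUNT_TIME as "either condition" follows the Spring for Apache Kafka reference description of that mode rather than anything stated on this page.

```java
/** Simplified, hypothetical model of when a commit becomes due; not the library's code. */
final class AckPolicySketch {

    /** Only the three modes that depend on ackCount and/or ackTime. */
    enum Mode { COUNT, TIME, COUNT_TIME }

    /**
     * @param mode      the ack mode being modelled
     * @param pending   records received since the last commit
     * @param elapsedMs milliseconds since the last commit
     * @param ackCount  the configured record threshold
     * @param ackTime   the configured time threshold in milliseconds
     * @return true when this model would commit the outstanding offsets
     */
    static boolean commitDue(Mode mode, int pending, long elapsedMs, int ackCount, long ackTime) {
        boolean countReached = pending >= ackCount;   // boundary handling is an assumption
        boolean timeReached = elapsedMs >= ackTime;   // boundary handling is an assumption
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            default:
                // COUNT_TIME: commit when either threshold has been reached
                return countReached || timeReached;
        }
    }

    public static void main(String[] args) {
        // 5 records pending, threshold 10: COUNT is not yet due -> prints "false"
        System.out.println(commitDue(Mode.COUNT, 5, 100L, 10, 5000L));
        // same 5 records, but 6 s elapsed with a 5 s ackTime -> prints "true"
        System.out.println(commitDue(Mode.COUNT_TIME, 5, 6000L, 10, 5000L));
    }
}
```

In this model a COUNT_TIME container on a low-traffic topic still commits once ackTime has elapsed, while a busy one commits as soon as ackCount records accumulate.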
    • consumerTaskExecutor

      public KafkaMessageListenerContainerSpec<K,V> consumerTaskExecutor(AsyncListenableTaskExecutor consumerTaskExecutor)
      Set the executor for threads that poll the consumer.
      Parameters:
      consumerTaskExecutor - the executor.
      Returns:
      the spec.
      See Also:
      • ContainerProperties.setConsumerTaskExecutor(AsyncListenableTaskExecutor)
    • shutdownTimeout

      public KafkaMessageListenerContainerSpec<K,V> shutdownTimeout(long shutdownTimeout)
      Set the timeout for shutting down the container. This is the maximum time that a call to stop(Runnable) will block before returning.
      Parameters:
      shutdownTimeout - the shutdown timeout.
      Returns:
      the spec.
      See Also:
      • ContainerProperties.setShutdownTimeout(long)
    • consumerRebalanceListener

      public KafkaMessageListenerContainerSpec<K,V> consumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)
      Set the user defined ConsumerRebalanceListener implementation.
      Parameters:
      consumerRebalanceListener - the ConsumerRebalanceListener instance.
      Returns:
      the spec.
      See Also:
      • ConsumerProperties.setConsumerRebalanceListener(ConsumerRebalanceListener)
    • commitCallback

      public KafkaMessageListenerContainerSpec<K,V> commitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)
      Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level.
      Parameters:
      commitCallback - the callback.
      Returns:
      the spec.
      See Also:
      • ConsumerProperties.setCommitCallback(OffsetCommitCallback)
    • syncCommits

      public KafkaMessageListenerContainerSpec<K,V> syncCommits(boolean syncCommits)
      Set whether to call consumer.commitSync() or commitAsync() when the container is responsible for commits. Default true. At the time of writing, async commits are not entirely reliable; see https://github.com/spring-projects/spring-kafka/issues/62.
      Parameters:
      syncCommits - true to use commitSync().
      Returns:
      the spec.
      See Also:
      • ConsumerProperties.setSyncCommits(boolean)
    • idleEventInterval

      public KafkaMessageListenerContainerSpec<K,V> idleEventInterval(Long idleEventInterval)
      Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
      Parameters:
      idleEventInterval - the interval.
      Returns:
      the spec.
      See Also:
      • ContainerProperties.setIdleEventInterval(Long)
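The idle-event condition described for idleEventInterval can be modelled with a small state holder. This is a sketch under stated assumptions, not the container's logic: IdleEventSketch and onPoll are hypothetical names, timestamps are passed in explicitly to keep the example deterministic, and the rule that repeated events are spaced at least one interval apart is an assumption that the text above does not state.

```java
/** Hypothetical model of the idle-event rule; not the library's implementation. */
final class IdleEventSketch {

    private final long intervalMs;
    private long lastRecordAt;
    private long lastEventAt;

    IdleEventSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastRecordAt = startMs;
        this.lastEventAt = startMs;
    }

    /**
     * Report the outcome of one poll.
     *
     * @param recordCount number of records the poll returned
     * @param nowMs       current time in milliseconds
     * @return true when this model would emit an idle event
     */
    boolean onPoll(int recordCount, long nowMs) {
        if (recordCount > 0) {
            // A record arrived, so the idle clock restarts.
            this.lastRecordAt = nowMs;
            return false;
        }
        boolean idleLongEnough = nowMs - this.lastRecordAt >= this.intervalMs;
        // Assumption: do not emit again until a full interval after the previous event.
        boolean spacedFromLastEvent = nowMs - this.lastEventAt >= this.intervalMs;
        if (idleLongEnough && spacedFromLastEvent) {
            this.lastEventAt = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IdleEventSketch sketch = new IdleEventSketch(5000L, 0L);
        System.out.println(sketch.onPoll(0, 1000L));   // false: only 1 s idle
        System.out.println(sketch.onPoll(0, 5000L));   // true: 5 s without a record
        System.out.println(sketch.onPoll(0, 6000L));   // false: too soon after the last event
        System.out.println(sketch.onPoll(2, 7000L));   // false: records returned
        System.out.println(sketch.onPoll(0, 12000L));  // true: 5 s since the last record
    }
}
```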
    • groupId

      public KafkaMessageListenerContainerSpec<K,V> groupId(String groupId)
      Set the group id for this container. Overrides any group.id property provided by the consumer factory configuration.
      Parameters:
      groupId - the group id.
      Returns:
      the spec.
      See Also:
      • ConsumerProperties.setGroupId(String)