Class AbstractMessageListenerContainer<K,V>

java.lang.Object
org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanNameAware, DisposableBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle, GenericMessageListenerContainer<K,V>, MessageListenerContainer
Direct Known Subclasses:
ConcurrentMessageListenerContainer, KafkaMessageListenerContainer

public abstract class AbstractMessageListenerContainer<K,V> extends Object implements GenericMessageListenerContainer<K,V>, BeanNameAware, ApplicationEventPublisherAware, ApplicationContextAware
The base implementation for the MessageListenerContainer.
Author:
Gary Russell, Marius Bogoevici, Artem Bilan, Tomaz Fernandes, Soby Chacko
  • Field Details

  • Constructor Details

    • AbstractMessageListenerContainer

      protected AbstractMessageListenerContainer(ConsumerFactory<? super K,? super V> consumerFactory, ContainerProperties containerProperties)
      Construct an instance with the provided factory and properties.
      Parameters:
      consumerFactory - the factory.
      containerProperties - the properties.
  • Method Details

    • setApplicationContext

      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
      Specified by:
      setApplicationContext in interface ApplicationContextAware
      Throws:
      BeansException
    • getApplicationContext

      @Nullable protected ApplicationContext getApplicationContext()
    • setBeanName

      public void setBeanName(String name)
      Specified by:
      setBeanName in interface BeanNameAware
    • getBeanName

      @Nullable public String getBeanName()
      Return the bean name.
      Returns:
      the bean name.
    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface ApplicationEventPublisherAware
    • getApplicationEventPublisher

      @Nullable public ApplicationEventPublisher getApplicationEventPublisher()
      Get the event publisher.
      Returns:
      the publisher
    • getCommonErrorHandler

      @Nullable public CommonErrorHandler getCommonErrorHandler()
      Returns:
      the handler.
      Since:
      2.8
    • setCommonErrorHandler

      public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler)
      Set the CommonErrorHandler which can handle errors for both record and batch listeners.
      Parameters:
      commonErrorHandler - the handler.
      Since:
      2.8
    • isStoppedNormally

      protected boolean isStoppedNormally()
    • setStoppedNormally

      protected void setStoppedNormally(boolean stoppedNormally)
    • isAutoStartup

      public boolean isAutoStartup()
      Specified by:
      isAutoStartup in interface SmartLifecycle
    • setAutoStartup

      public void setAutoStartup(boolean autoStartup)
      Description copied from interface: MessageListenerContainer
      Set the autoStartup.
      Specified by:
      setAutoStartup in interface MessageListenerContainer
      Parameters:
      autoStartup - the autoStartup to set.
      See Also:
    • setRunning

      protected void setRunning(boolean running)
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
    • isPaused

      protected boolean isPaused()
    • isPartitionPauseRequested

      public boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition)
      Description copied from interface: MessageListenerContainer
      Whether or not this topic's partition pause has been requested.
      Specified by:
      isPartitionPauseRequested in interface MessageListenerContainer
      Parameters:
      topicPartition - the topic partition to check
      Returns:
      true if pause for this TopicPartition has been requested
    • pausePartition

      public void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
      Description copied from interface: MessageListenerContainer
      Pause this partition before the next poll(). This is a thread-safe operation, the actual pause is processed by the consumer thread.
      Specified by:
      pausePartition in interface MessageListenerContainer
      Parameters:
      topicPartition - the topicPartition to pause.
    • resumePartition

      public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
      Description copied from interface: MessageListenerContainer
      Resume this partition, if paused, after the next poll(). This is a thread-safe operation, the actual pause is processed by the consumer thread.
      Specified by:
      resumePartition in interface MessageListenerContainer
      Parameters:
      topicPartition - the topicPartition to resume.
    • isPauseRequested

      public boolean isPauseRequested()
      Description copied from interface: MessageListenerContainer
      Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
      Specified by:
      isPauseRequested in interface MessageListenerContainer
      Returns:
      true if pause has been requested.
    • setPhase

      public void setPhase(int phase)
    • getPhase

      public int getPhase()
      Specified by:
      getPhase in interface Phased
      Specified by:
      getPhase in interface SmartLifecycle
    • getAfterRollbackProcessor

      public AfterRollbackProcessor<? super K,? super V> getAfterRollbackProcessor()
      Return the currently configured AfterRollbackProcessor.
      Returns:
      the after rollback processor.
      Since:
      2.2.14
    • setAfterRollbackProcessor

      public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
      Set a processor to perform seeks on unprocessed records after a rollback. Default will seek to current position all topics/partitions, including the failed record.
      Parameters:
      afterRollbackProcessor - the processor.
      Since:
      1.3.5
    • getContainerProperties

      public ContainerProperties getContainerProperties()
      Description copied from interface: MessageListenerContainer
      Return the container properties for this container.
      Specified by:
      getContainerProperties in interface MessageListenerContainer
      Returns:
      the properties.
    • getGroupId

      @Nullable public String getGroupId()
      Description copied from interface: MessageListenerContainer
      Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
      Specified by:
      getGroupId in interface MessageListenerContainer
      Returns:
      the group id.
    • getListenerId

      public String getListenerId()
      Description copied from interface: MessageListenerContainer
      The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
      Specified by:
      getListenerId in interface MessageListenerContainer
      Returns:
      the id or bean name.
    • setMainListenerId

      public void setMainListenerId(String id)
      Set the main listener id, if this container is for a retry topic.
      Parameters:
      id - the id.
      Since:
      3.0.
    • getMainListenerId

      @Nullable public String getMainListenerId()
      Description copied from interface: MessageListenerContainer
      The 'id' attribute of the main @KafkaListener container, if this container is for a retry topic; null otherwise.
      Specified by:
      getMainListenerId in interface MessageListenerContainer
      Returns:
      the id.
    • getListenerInfo

      @Nullable public byte[] getListenerInfo()
      Description copied from interface: MessageListenerContainer
      Get arbitrary static information that will be added to the KafkaHeaders.LISTENER_INFO header of all records.
      Specified by:
      getListenerInfo in interface MessageListenerContainer
      Returns:
      the info.
    • setListenerInfo

      public void setListenerInfo(@Nullable byte[] listenerInfo)
      Set arbitrary information that will be added to the KafkaHeaders.LISTENER_INFO header of all records.
      Parameters:
      listenerInfo - the info.
      Since:
      2.8.4
    • setTopicCheckTimeout

      public void setTopicCheckTimeout(int topicCheckTimeout)
      How long to wait for Admin.describeTopics(Collection) result futures to complete.
      Parameters:
      topicCheckTimeout - the timeout in seconds; default 30.
      Since:
      2.3
    • isChangeConsumerThreadName

      public boolean isChangeConsumerThreadName()
      Return true if the container should change the consumer thread name during initialization.
      Returns:
      true to change.
      Since:
      3.0.1
    • setChangeConsumerThreadName

      public void setChangeConsumerThreadName(boolean changeConsumerThreadName)
      Set to true to instruct the container to change the consumer thread name during initialization.
      Parameters:
      changeConsumerThreadName - true to change.
      Since:
      3.0.1
      See Also:
    • getThreadNameSupplier

      public Function<MessageListenerContainer,String> getThreadNameSupplier()
      Return the function used to change the consumer thread name.
      Returns:
      the function.
      Since:
      3.0.1
    • setThreadNameSupplier

      public void setThreadNameSupplier(Function<MessageListenerContainer,String> threadNameSupplier)
      Set a function used to change the consumer thread name. The default returns the container listenerId.
      Parameters:
      threadNameSupplier - the function.
      Since:
      3.0.1
      See Also:
    • getKafkaAdmin

      @Nullable public KafkaAdmin getKafkaAdmin()
      Return the KafkaAdmin, used to find the cluster id for observation, if present.
      Returns:
      the kafkaAdmin
      Since:
      3.0.5
    • setKafkaAdmin

      public void setKafkaAdmin(KafkaAdmin kafkaAdmin)
      Set the KafkaAdmin, used to find the cluster id for observation, if present.
      Parameters:
      kafkaAdmin - the admin.
    • getRecordInterceptor

      protected RecordInterceptor<K,V> getRecordInterceptor()
    • setRecordInterceptor

      public void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
      Set an interceptor to be called before calling the record listener. Does not apply to batch listeners.
      Parameters:
      recordInterceptor - the interceptor.
      Since:
      2.2.7
      See Also:
    • getBatchInterceptor

      protected BatchInterceptor<K,V> getBatchInterceptor()
    • setBatchInterceptor

      public void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor)
      Set an interceptor to be called before calling the record listener.
      Parameters:
      batchInterceptor - the interceptor.
      Since:
      2.6.6
      See Also:
    • isInterceptBeforeTx

      protected boolean isInterceptBeforeTx()
    • setInterceptBeforeTx

      public void setInterceptBeforeTx(boolean interceptBeforeTx)
      When false, invoke the interceptor after the transaction starts.
      Parameters:
      interceptBeforeTx - false to intercept within the transaction. Default true since 2.8.
      Since:
      2.3.4
      See Also:
    • setupMessageListener

      public void setupMessageListener(Object messageListener)
      Description copied from interface: MessageListenerContainer
      Setup the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
      Specified by:
      setupMessageListener in interface MessageListenerContainer
      Parameters:
      messageListener - the object to wrapped to the MessageListener.
    • start

      public final void start()
      Specified by:
      start in interface Lifecycle
    • checkTopics

      protected void checkTopics()
    • checkGroupId

      public void checkGroupId()
    • doStart

      protected abstract void doStart()
    • stop

      public final void stop()
      Specified by:
      stop in interface Lifecycle
    • stop

      public final void stop(boolean wait)
      Stop the container.
      Parameters:
      wait - wait for the listener to terminate.
      Since:
      2.3.8
    • pause

      public void pause()
      Description copied from interface: MessageListenerContainer
      Pause this container before the next poll(). This is a thread-safe operation, the actual pause is processed by the consumer thread.
      Specified by:
      pause in interface MessageListenerContainer
      See Also:
      • KafkaConsumer.pause(Collection)
    • resume

      public void resume()
      Description copied from interface: MessageListenerContainer
      Resume this container, if paused, after the next poll(). This is a thread-safe operation, the actual resume is processed by the consumer thread.
      Specified by:
      resume in interface MessageListenerContainer
      See Also:
      • KafkaConsumer.resume(Collection)
    • stop

      public void stop(Runnable callback)
      Specified by:
      stop in interface SmartLifecycle
    • stopAbnormally

      public void stopAbnormally(Runnable callback)
      Description copied from interface: MessageListenerContainer
      Stop the container after some exception so that MessageListenerContainer.isInExpectedState() will return false.
      Specified by:
      stopAbnormally in interface MessageListenerContainer
      Parameters:
      callback - the callback.
      See Also:
    • doStop

      protected void doStop(Runnable callback)
    • doStop

      protected abstract void doStop(Runnable callback, boolean normal)
      Stop the container normally or abnormally.
      Parameters:
      callback - the callback.
      normal - true for an expected stop.
      Since:
      2.8
    • createSimpleLoggingConsumerRebalanceListener

      protected final org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener()
      Return default implementation of ConsumerRebalanceListener instance.
      Returns:
      the ConsumerRebalanceListener currently assigned to this container.
    • publishContainerStoppedEvent

      protected void publishContainerStoppedEvent()
    • parentOrThis

      protected AbstractMessageListenerContainer<?,?> parentOrThis()
      Return this or a parent container if this has a parent.
      Returns:
      the parent or this.
      Since:
      2.2.1
    • propertiesFromConsumerPropertyOverrides

      protected Properties propertiesFromConsumerPropertyOverrides()
      Make any default consumer override properties explicit properties.
      Returns:
      the properties.
      Since:
      2.9.11