Class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V>

java.lang.Object
org.springframework.kafka.config.AbstractKafkaListenerContainerFactory<C,K,V>
Type Parameters:
C - the AbstractMessageListenerContainer implementation type.
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, ApplicationContextAware, ApplicationEventPublisherAware, KafkaListenerContainerFactory<C>
Direct Known Subclasses:
ConcurrentKafkaListenerContainerFactory

public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V> extends Object implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, ApplicationContextAware
Base KafkaListenerContainerFactory for Spring's base container implementation.
Author:
Stephane Nicoll, Gary Russell, Artem Bilan
See Also:
  • Field Details

  • Constructor Details

    • AbstractKafkaListenerContainerFactory

      public AbstractKafkaListenerContainerFactory()
  • Method Details

    • setApplicationContext

      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
      Specified by:
      setApplicationContext in interface ApplicationContextAware
      Throws:
      BeansException
    • setConsumerFactory

      public void setConsumerFactory(ConsumerFactory<? super K,? super V> consumerFactory)
      Specify a ConsumerFactory to use.
      Parameters:
      consumerFactory - The consumer factory.
    • getConsumerFactory

      public ConsumerFactory<? super K,? super V> getConsumerFactory()
    • setAutoStartup

      public void setAutoStartup(Boolean autoStartup)
      Specify an autoStartup boolean flag.
      Parameters:
      autoStartup - true for auto startup.
      See Also:
    • setPhase

      public void setPhase(int phase)
      Specify a phase to use.
      Parameters:
      phase - The phase.
      See Also:
    • setRecordMessageConverter

      public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter)
      Set the message converter to use if dynamic argument type matching is needed for record listeners.
      Parameters:
      recordMessageConverter - the converter.
      Since:
      2.9.6
    • setBatchMessageConverter

      public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter)
      Set the message converter to use if dynamic argument type matching is needed for batch listeners.
      Parameters:
      batchMessageConverter - the converter.
      Since:
      2.9.6
    • setRecordFilterStrategy

      public void setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy)
      Set the record filter strategy.
      Parameters:
      recordFilterStrategy - the strategy.
    • setAckDiscarded

      public void setAckDiscarded(Boolean ackDiscarded)
      Set to true to ack discards when a filter strategy is in use.
      Parameters:
      ackDiscarded - the ackDiscarded.
    • isBatchListener

      public Boolean isBatchListener()
      Return true if this endpoint creates a batch listener.
      Returns:
      true for a batch listener.
      Since:
      1.1
    • setBatchListener

      public void setBatchListener(Boolean batchListener)
      Set to true if this endpoint should create a batch listener.
      Parameters:
      batchListener - true for a batch listener.
      Since:
      1.1
    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface ApplicationEventPublisherAware
    • setReplyTemplate

      public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
      Set the KafkaTemplate to use to send replies.
      Parameters:
      replyTemplate - the template.
      Since:
      2.0
    • setCommonErrorHandler

      public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler)
      Set the CommonErrorHandler which can handle errors for both record and batch listeners.
      Parameters:
      commonErrorHandler - the handler.
      Since:
      2.8
    • setAfterRollbackProcessor

      public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
      Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records. The default does so, including the failed record.
      Parameters:
      afterRollbackProcessor - the processor.
      Since:
      1.3.5
    • setReplyHeadersConfigurer

      public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
      Set a configurer which will be invoked when creating a reply message.
      Parameters:
      replyHeadersConfigurer - the configurer.
      Since:
      2.2
    • setMissingTopicsFatal

      public void setMissingTopicsFatal(boolean missingTopicsFatal)
      Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default true;
      Parameters:
      missingTopicsFatal - the missingTopicsFatal.
      Since:
      2.3
    • getContainerProperties

      public ContainerProperties getContainerProperties()
      Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint.
      Returns:
      the properties.
    • setRecordInterceptor

      public void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
      Set an interceptor to be called before calling the listener. Only used with record listeners.
      Parameters:
      recordInterceptor - the interceptor.
      Since:
      2.2.7
      See Also:
    • setBatchInterceptor

      public void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor)
      Set a batch interceptor to be called before and after calling the listener. Only used with batch listeners.
      Parameters:
      batchInterceptor - the interceptor.
      Since:
      2.7
      See Also:
    • setBatchToRecordAdapter

      public void setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter)
      Parameters:
      batchToRecordAdapter - the adapter.
      Since:
      2.4.2
    • setContainerCustomizer

      public void setContainerCustomizer(ContainerCustomizer<K,V,C> containerCustomizer)
      Set a customizer used to further configure a container after it has been created.
      Parameters:
      containerCustomizer - the customizer.
      Since:
      2.3.4
    • setCorrelationHeaderName

      public void setCorrelationHeaderName(String correlationHeaderName)
      Set a custom header name for the correlation id. Default KafkaHeaders.CORRELATION_ID. This header will be echoed back in any reply message.
      Parameters:
      correlationHeaderName - the header name.
      Since:
      3.0
    • setChangeConsumerThreadName

      public void setChangeConsumerThreadName(boolean changeConsumerThreadName)
      Set to true to instruct the container to change the consumer thread name during initialization.
      Parameters:
      changeConsumerThreadName - true to change.
      Since:
      3.0.1
      See Also:
    • setThreadNameSupplier

      public void setThreadNameSupplier(Function<MessageListenerContainer,String> threadNameSupplier)
      Set a function used to change the consumer thread name. The default returns the container listenerId.
      Parameters:
      threadNameSupplier - the function.
      Since:
      3.0.1
      See Also:
    • createListenerContainer

      public C createListenerContainer(KafkaListenerEndpoint endpoint)
      Description copied from interface: KafkaListenerContainerFactory
      Create a MessageListenerContainer for the given KafkaListenerEndpoint. Containers created using this method are added to the listener endpoint registry.
      Specified by:
      createListenerContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
      Parameters:
      endpoint - the endpoint to configure
      Returns:
      the created container
    • createContainerInstance

      protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint)
      Create an empty container instance.
      Parameters:
      endpoint - the endpoint.
      Returns:
      the new container instance.
    • initializeContainer

      protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint)
      Further initialize the specified container.

      Subclasses can inherit from this method to apply extra configuration if necessary.

      Parameters:
      instance - the container instance to configure.
      endpoint - the endpoint.
    • createContainer

      public C createContainer(TopicPartitionOffset... topicsAndPartitions)
      Description copied from interface: KafkaListenerContainerFactory
      Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
      Specified by:
      createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
      Parameters:
      topicsAndPartitions - the topicPartitions to assign.
      Returns:
      the container.
    • createContainer

      public C createContainer(String... topics)
      Description copied from interface: KafkaListenerContainerFactory
      Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
      Specified by:
      createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
      Parameters:
      topics - the topics.
      Returns:
      the container.
    • createContainer

      public C createContainer(Pattern topicPattern)
      Description copied from interface: KafkaListenerContainerFactory
      Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
      Specified by:
      createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
      Parameters:
      topicPattern - the topicPattern.
      Returns:
      the container.
    • createContainer

      protected C createContainer(KafkaListenerEndpoint endpoint)
    • customizeContainer

      protected void customizeContainer(C instance, KafkaListenerEndpoint endpoint)