Package org.springframework.kafka.config
Class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V>
java.lang.Object
org.springframework.kafka.config.AbstractKafkaListenerContainerFactory<C,K,V>
- Type Parameters:
C - the AbstractMessageListenerContainer implementation type.
K - the key type.
V - the value type.
- All Implemented Interfaces:
Aware, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, KafkaListenerContainerFactory<C>
- Direct Known Subclasses:
ConcurrentKafkaListenerContainerFactory
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V>
extends Object
implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean, ApplicationContextAware
Base KafkaListenerContainerFactory for Spring's base container implementation.
- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
-
Field Summary
Fields
-
Constructor Summary
Constructors
-
Method Summary
Modifier and Type | Method | Description
void | afterPropertiesSet() |
C | createContainer(String... topics) | Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
C | createContainer(Pattern topicPattern) | Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
C | createContainer(TopicPartitionOffset... topicsAndPartitions) | Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
protected abstract C | createContainerInstance(KafkaListenerEndpoint endpoint) | Create an empty container instance.
C | createListenerContainer(KafkaListenerEndpoint endpoint) | Create a MessageListenerContainer for the given KafkaListenerEndpoint.
ConsumerFactory<? super K, ? super V> | getConsumerFactory() |
ContainerProperties | getContainerProperties() | Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint.
protected void | initializeContainer(C instance, KafkaListenerEndpoint endpoint) | Further initialize the specified container.
Boolean | isBatchListener() | Return true if this endpoint creates a batch listener.
void | setAckDiscarded(Boolean ackDiscarded) | Set to true to ack discards when a filter strategy is in use.
void | setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) | Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records.
void | setApplicationContext(ApplicationContext applicationContext) |
void | setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) |
void | setAutoStartup(Boolean autoStartup) | Specify an autoStartup boolean flag.
void | setBatchErrorHandler(BatchErrorHandler errorHandler) | Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
void | setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) | Set a batch interceptor to be called before and after calling the listener.
void | setBatchListener(Boolean batchListener) | Set to true if this endpoint should create a batch listener.
void | setBatchMessageConverter(BatchMessageConverter batchMessageConverter) | Set the message converter to use if dynamic argument type matching is needed for batch listeners.
void | setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdapter) | Set a BatchToRecordAdapter.
void | setChangeConsumerThreadName(boolean changeConsumerThreadName) | Set to true to instruct the container to change the consumer thread name during initialization.
void | setCommonErrorHandler(CommonErrorHandler commonErrorHandler) | Set the CommonErrorHandler which can handle errors for both record and batch listeners.
void | setConsumerFactory(ConsumerFactory<? super K, ? super V> consumerFactory) | Specify a ConsumerFactory to use.
void | setContainerCustomizer(ContainerCustomizer<K, V, C> containerCustomizer) | Set a customizer used to further configure a container after it has been created.
void | setCorrelationHeaderName(String correlationHeaderName) | Set a custom header name for the correlation id.
void | setErrorHandler(ErrorHandler errorHandler) | Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
void | setMessageConverter(MessageConverter messageConverter) | Deprecated. since 2.9.6 in favor of setBatchMessageConverter(BatchMessageConverter) and setRecordMessageConverter(RecordMessageConverter).
void | setMissingTopicsFatal(boolean missingTopicsFatal) | Set to false to allow the container to start even if any of the configured topics are not present on the broker.
void | setPhase(int phase) | Specify a phase to use.
void | setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) | Set the record filter strategy.
void | setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) | Set an interceptor to be called before calling the listener.
void | setRecordMessageConverter(RecordMessageConverter recordMessageConverter) | Set the message converter to use if dynamic argument type matching is needed for record listeners.
void | setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) | Set a configurer which will be invoked when creating a reply message.
void | setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) | Set the KafkaTemplate to use to send replies.
void | setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) | Set a function used to change the consumer thread name.
-
Field Details
-
logger
-
-
Constructor Details
-
AbstractKafkaListenerContainerFactory
public AbstractKafkaListenerContainerFactory()
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContext in interface ApplicationContextAware
- Throws:
BeansException
-
setConsumerFactory
Specify a ConsumerFactory to use.
- Parameters:
consumerFactory - The consumer factory.
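As a minimal sketch of how a consumer factory might be wired into the known subclass ConcurrentKafkaListenerContainerFactory, the configuration below could serve as a starting point. The broker address, group id, String key/value types, and the class and bean names are assumptions chosen for illustration; the annotation infrastructure (for example @EnableKafka) is assumed to be set up elsewhere.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class; names and property values are illustrative.
@Configuration
public class ListenerFactoryConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // The setter documented above: containers created by this factory use this consumer factory.
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
```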
-
getConsumerFactory
-
setAutoStartup
Specify an autoStartup boolean flag.
- Parameters:
autoStartup - true for auto startup.
-
setPhase
public void setPhase(int phase)
Specify a phase to use.
- Parameters:
phase - The phase.
-
setMessageConverter
Deprecated. since 2.9.6 in favor of setBatchMessageConverter(BatchMessageConverter) and setRecordMessageConverter(RecordMessageConverter).
Set the message converter to use if dynamic argument type matching is needed.
- Parameters:
messageConverter - the converter.
-
setRecordMessageConverter
Set the message converter to use if dynamic argument type matching is needed for record listeners.
- Parameters:
recordMessageConverter - the converter.
- Since:
- 2.9.6
-
setBatchMessageConverter
Set the message converter to use if dynamic argument type matching is needed for batch listeners.
- Parameters:
batchMessageConverter - the converter.
- Since:
- 2.9.6
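The two non-deprecated converter setters could be used along the following lines. This is a sketch only: it assumes String keys and values with JSON payloads, assumes Jackson is on the classpath (StringJsonMessageConverter needs it), and the helper class and method names are hypothetical.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

// Hypothetical helper; not part of the library.
public final class ConverterConfigSketch {

    private ConverterConfigSketch() {
    }

    public static void applyConverters(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // Used for record listeners when the payload must be converted to the listener argument type.
        factory.setRecordMessageConverter(new StringJsonMessageConverter());
        // Used for batch listeners; the batch converter delegates per-record conversion
        // to the record converter passed to its constructor.
        factory.setBatchMessageConverter(
                new BatchMessagingMessageConverter(new StringJsonMessageConverter()));
    }
}
```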
-
setRecordFilterStrategy
Set the record filter strategy.
- Parameters:
recordFilterStrategy - the strategy.
-
setAckDiscarded
Set to true to ack discards when a filter strategy is in use.
- Parameters:
ackDiscarded - the ackDiscarded.
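A filter strategy and the ackDiscarded flag are typically set together. The sketch below assumes String keys and values and a rule that drops blank payloads; the helper class, method name, and the rule itself are hypothetical.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

// Hypothetical helper; not part of the library.
public final class FilterConfigSketch {

    private FilterConfigSketch() {
    }

    public static void applyFilter(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // filter(...) returns true for records that should be discarded
        // before they reach the listener.
        RecordFilterStrategy<String, String> dropBlank =
                record -> record.value() == null || record.value().trim().isEmpty();
        factory.setRecordFilterStrategy(dropBlank);
        // Ask the container to acknowledge the discarded records as well.
        factory.setAckDiscarded(true);
    }
}
```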
-
isBatchListener
Return true if this endpoint creates a batch listener.
- Returns:
- true for a batch listener.
- Since:
- 1.1
-
setBatchListener
Set to true if this endpoint should create a batch listener.
- Parameters:
batchListener - true for a batch listener.
- Since:
- 1.1
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisher in interface ApplicationEventPublisherAware
-
setReplyTemplate
Set the KafkaTemplate to use to send replies.
- Parameters:
replyTemplate - the template.
- Since:
- 2.0
-
setErrorHandler
Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setBatchErrorHandler
@Deprecated(since="2.8", forRemoval=true) public void setBatchErrorHandler(BatchErrorHandler errorHandler)
Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
Set the batch error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setCommonErrorHandler
Set the CommonErrorHandler which can handle errors for both record and batch listeners. Replaces the use of GenericErrorHandlers.
- Parameters:
commonErrorHandler - the handler.
- Since:
- 2.8
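One way to supply a CommonErrorHandler is the library's DefaultErrorHandler with a fixed back-off, as sketched below. The one-second interval, the two retries, and the helper class and method names are assumptions for illustration.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical helper; not part of the library.
public final class ErrorHandlerConfigSketch {

    private ErrorHandlerConfigSketch() {
    }

    public static void applyErrorHandler(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // Retry a failed delivery up to 2 more times, waiting 1000 ms between attempts
        // (both values are assumed, not recommended defaults).
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
        // A single handler serves both record and batch listeners.
        factory.setCommonErrorHandler(errorHandler);
    }
}
```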
-
setAfterRollbackProcessor
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor)
Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records. The default does so, including the failed record.
- Parameters:
afterRollbackProcessor - the processor.
- Since:
- 1.3.5
-
setReplyHeadersConfigurer
Set a configurer which will be invoked when creating a reply message.
- Parameters:
replyHeadersConfigurer - the configurer.
- Since:
- 2.2
-
setMissingTopicsFatal
public void setMissingTopicsFatal(boolean missingTopicsFatal)
Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default true.
- Parameters:
missingTopicsFatal - the missingTopicsFatal.
- Since:
- 2.3
-
getContainerProperties
Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint.
- Returns:
- the properties.
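Because the returned object is a template that is copied for each endpoint, it is usually adjusted once while the factory is being configured. The sketch below assumes the template type is ContainerProperties; the chosen ack mode, poll timeout, and helper names are illustrative assumptions.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical helper; not part of the library.
public final class ContainerPropertiesSketch {

    private ContainerPropertiesSketch() {
    }

    public static void tuneTemplate(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // Changes made to the template here are copied to each endpoint's container.
        ContainerProperties template = factory.getContainerProperties();
        template.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // assumed ack mode
        template.setPollTimeout(3000L);                                     // assumed timeout in ms
    }
}
```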
-
setRecordInterceptor
Set an interceptor to be called before calling the listener. Only used with record listeners.
- Parameters:
recordInterceptor - the interceptor.
- Since:
- 2.2.7
-
setBatchInterceptor
Set a batch interceptor to be called before and after calling the listener. Only used with batch listeners.
- Parameters:
batchInterceptor - the interceptor.
- Since:
- 2.7
-
setBatchToRecordAdapter
Set a BatchToRecordAdapter.
- Parameters:
batchToRecordAdapter - the adapter.
- Since:
- 2.4.2
-
setContainerCustomizer
Set a customizer used to further configure a container after it has been created.
- Parameters:
containerCustomizer - the customizer.
- Since:
- 2.3.4
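A customizer receives each container after the factory has created and configured it, which makes it a convenient place for per-container tweaks. The sketch below assumes a ConcurrentKafkaListenerContainerFactory with String keys and values; the idle event interval and the helper names are illustrative assumptions.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Hypothetical helper; not part of the library.
public final class CustomizerSketch {

    private CustomizerSketch() {
    }

    public static void applyCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // The lambda implements ContainerCustomizer.configure(container) and is
        // invoked for every container this factory creates.
        factory.setContainerCustomizer(container ->
                container.getContainerProperties().setIdleEventInterval(60_000L)); // assumed interval in ms
    }
}
```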
-
setCorrelationHeaderName
Set a custom header name for the correlation id. Default KafkaHeaders.CORRELATION_ID. This header will be echoed back in any reply message.
- Parameters:
correlationHeaderName - the header name.
- Since:
- 3.0
-
setChangeConsumerThreadName
public void setChangeConsumerThreadName(boolean changeConsumerThreadName)
Set to true to instruct the container to change the consumer thread name during initialization.
- Parameters:
changeConsumerThreadName - true to change.
- Since:
- 3.0.1
-
setThreadNameSupplier
Set a function used to change the consumer thread name. The default returns the container listenerId.
- Parameters:
threadNameSupplier - the function.
- Since:
- 3.0.1
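The thread name function is presumably only consulted when setChangeConsumerThreadName is enabled, so the two setters are shown together here. The "kafka-" prefix and the helper class and method names are hypothetical.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Hypothetical helper; not part of the library.
public final class ThreadNameSketch {

    private ThreadNameSketch() {
    }

    public static void applyThreadNaming(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // Ask the container to rename the consumer thread during initialization.
        factory.setChangeConsumerThreadName(true);
        // Derive the name from the listener id, with an assumed prefix.
        factory.setThreadNameSupplier(container -> "kafka-" + container.getListenerId());
    }
}
```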
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface InitializingBean
-
createListenerContainer
Description copied from interface: KafkaListenerContainerFactory
Create a MessageListenerContainer for the given KafkaListenerEndpoint. Containers created using this method are added to the listener endpoint registry.
- Specified by:
createListenerContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
endpoint - the endpoint to configure
- Returns:
- the created container
-
createContainerInstance
Create an empty container instance.
- Parameters:
endpoint - the endpoint.
- Returns:
- the new container instance.
-
initializeContainer
Further initialize the specified container.
Subclasses can override this method to apply extra configuration if necessary.
- Parameters:
instance - the container instance to configure.
endpoint - the endpoint.
-
createContainer
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
- Specified by:
createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
topicsAndPartitions - the topicPartitions to assign.
- Returns:
- the container.
-
createContainer
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
- Specified by:
createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
topics - the topics.
- Returns:
- the container.
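Since containers created this way have no listener and are not registered, the caller is expected to attach a listener and manage the lifecycle. The sketch below assumes a ConcurrentKafkaListenerContainerFactory with String keys and values; the topic name, group id, and helper names are hypothetical.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

// Hypothetical helper; not part of the library.
public final class ManualContainerSketch {

    private ManualContainerSketch() {
    }

    public static ConcurrentMessageListenerContainer<String, String> startContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // Created from the factory's settings, but without a listener and
        // outside the listener endpoint registry.
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("example-topic"); // assumed topic name
        container.getContainerProperties().setGroupId("manual-group"); // assumed group id
        // Attach a simple record listener before starting.
        container.getContainerProperties().setMessageListener(
                (MessageListener<String, String>) record -> System.out.println(record.value()));
        container.start();
        // The caller owns the lifecycle and should call stop() when done.
        return container;
    }
}
```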
-
createContainer
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
- Specified by:
createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
topicPattern - the topicPattern.
- Returns:
- the container.
-