Type Parameters:
C - the AbstractMessageListenerContainer implementation type.
K - the key type.
V - the value type.
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V> extends java.lang.Object implements KafkaListenerContainerFactory<C>, org.springframework.context.ApplicationEventPublisherAware, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware
Base KafkaListenerContainerFactory for Spring's base container implementation.
See Also: AbstractMessageListenerContainer
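As an illustration of how the setters on this class are typically used, the following sketch configures ConcurrentKafkaListenerContainerFactory, the concrete subclass shipped with Spring for Apache Kafka. The broker address, group id, class name, bean names, and poll timeout are assumptions chosen for the example, not values taken from this page, and the snippet needs spring-kafka and kafka-clients on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class ListenerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address and group id; replace with real values.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Setters inherited from AbstractKafkaListenerContainerFactory.
        factory.setConsumerFactory(consumerFactory);
        factory.setAutoStartup(true);
        factory.setMissingTopicsFatal(false);
        // Settings on the properties template are copied to each endpoint's container.
        factory.getContainerProperties().setPollTimeout(3000L);
        return factory;
    }
}
```

The example only touches setters documented on this page; which of them an application needs depends on its listeners.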
Modifier and Type | Field and Description |
---|---|
protected org.springframework.core.log.LogAccessor | logger |
Constructor and Description |
---|
AbstractKafkaListenerContainerFactory() |
Modifier and Type | Method and Description |
---|---|
void | afterPropertiesSet() |
C | createContainer(java.util.Collection<TopicPartitionInitialOffset> topicPartitions): Deprecated. In favor of createContainer(TopicPartitionOffset[]). |
C | createContainer(java.util.regex.Pattern topicPattern): Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. |
C | createContainer(java.lang.String... topics): Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. |
C | createContainer(TopicPartitionOffset... topicsAndPartitions): Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. |
protected abstract C | createContainerInstance(KafkaListenerEndpoint endpoint): Create an empty container instance. |
C | createListenerContainer(KafkaListenerEndpoint endpoint): Create a MessageListenerContainer for the given KafkaListenerEndpoint. |
ConsumerFactory<? super K,? super V> | getConsumerFactory() |
ContainerProperties | getContainerProperties(): Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint. |
protected void | initializeContainer(C instance, KafkaListenerEndpoint endpoint): Further initialize the specified container. |
java.lang.Boolean | isBatchListener(): Return true if this endpoint creates a batch listener. |
void | setAckDiscarded(java.lang.Boolean ackDiscarded): Set to true to ack discards when a filter strategy is in use. |
void | setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor): Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records. |
void | setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void | setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) |
void | setAutoStartup(java.lang.Boolean autoStartup): Specify an autoStartup boolean flag. |
void | setBatchErrorHandler(BatchErrorHandler errorHandler): Set the batch error handler to call when the listener throws an exception. |
void | setBatchListener(java.lang.Boolean batchListener): Set to true if this endpoint should create a batch listener. |
void | setConsumerFactory(ConsumerFactory<? super K,? super V> consumerFactory): Specify a ConsumerFactory to use. |
void | setContainerCustomizer(ContainerCustomizer<K,V,C> containerCustomizer): Set a customizer used to further configure a container after it has been created. |
void | setErrorHandler(ErrorHandler errorHandler): Set the error handler to call when the listener throws an exception. |
void | setMessageConverter(MessageConverter messageConverter): Set the message converter to use if dynamic argument type matching is needed. |
void | setMissingTopicsFatal(boolean missingTopicsFatal): Set to false to allow the container to start even if any of the configured topics are not present on the broker. |
void | setPhase(int phase): Specify a phase to use. |
void | setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy): Set the record filter strategy. |
void | setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor): Set an interceptor to be called before calling the listener. |
void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback): Set a callback to be used with the retryTemplate. |
void | setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer): Set a configurer which will be invoked when creating a reply message. |
void | setReplyTemplate(KafkaTemplate<?,?> replyTemplate): Set the KafkaTemplate to use to send replies. |
void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate): Set a retryTemplate. |
void | setStatefulRetry(boolean statefulRetry): When using a RetryTemplate, set to true to enable stateful retry. |
public AbstractKafkaListenerContainerFactory()
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
Specified by: setApplicationContext in interface org.springframework.context.ApplicationContextAware
Throws: org.springframework.beans.BeansException
public void setConsumerFactory(ConsumerFactory<? super K,? super V> consumerFactory)
Specify a ConsumerFactory to use.
Parameters: consumerFactory - The consumer factory.
public ConsumerFactory<? super K,? super V> getConsumerFactory()
public void setAutoStartup(java.lang.Boolean autoStartup)
Specify an autoStartup boolean flag.
Parameters: autoStartup - true for auto startup.
See Also: AbstractMessageListenerContainer.setAutoStartup(boolean)
public void setPhase(int phase)
Specify a phase to use.
Parameters: phase - The phase.
See Also: AbstractMessageListenerContainer.setPhase(int)
public void setMessageConverter(MessageConverter messageConverter)
Set the message converter to use if dynamic argument type matching is needed.
Parameters: messageConverter - the converter.
public void setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy)
Set the record filter strategy.
Parameters: recordFilterStrategy - the strategy.
public void setAckDiscarded(java.lang.Boolean ackDiscarded)
Set to true to ack discards when a filter strategy is in use.
Parameters: ackDiscarded - the ackDiscarded.
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Set a retryTemplate.
Parameters: retryTemplate - the template.
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback)
Set a callback to be used with the retryTemplate.
Parameters: recoveryCallback - the callback.
public void setStatefulRetry(boolean statefulRetry)
When using a RetryTemplate, set to true to enable stateful retry. Use in conjunction with a SeekToCurrentErrorHandler when retry can take excessive time; each failure goes back to the broker, to keep the Consumer alive.
Parameters: statefulRetry - true to enable stateful retry.
public java.lang.Boolean isBatchListener()
Return true if this endpoint creates a batch listener.
public void setBatchListener(java.lang.Boolean batchListener)
Set to true if this endpoint should create a batch listener.
Parameters: batchListener - true for a batch listener.
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
Specified by: setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
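Set the KafkaTemplate to use to send replies.
Parameters: replyTemplate - the template.
public void setErrorHandler(ErrorHandler errorHandler)
Set the error handler to call when the listener throws an exception.
Parameters: errorHandler - the error handler.
public void setBatchErrorHandler(BatchErrorHandler errorHandler)
Set the batch error handler to call when the listener throws an exception.
Parameters: errorHandler - the error handler.
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records.
Parameters: afterRollbackProcessor - the processor.
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
Parameters: replyHeadersConfigurer - the configurer.
public void setMissingTopicsFatal(boolean missingTopicsFatal)
Set to false to allow the container to start even if any of the configured topics are not present on the broker.
Parameters: missingTopicsFatal - the missingTopicsFatal.
public ContainerProperties getContainerProperties()
Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint.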
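The "properties template" behavior of getContainerProperties() can be pictured with a small standalone model. This is a sketch of the copy semantics only, not the Spring implementation: PropertiesTemplateDemo, SketchProperties, SketchFactory, and propertiesFor are hypothetical names, and only two settings are modeled.

```java
class PropertiesTemplateDemo {

    /** Hypothetical stand-in for ContainerProperties; only two settings are modeled. */
    static class SketchProperties {
        long pollTimeout = 5000L;
        String groupId;

        /** Returns an independent copy so later changes do not leak between instances. */
        SketchProperties copy() {
            SketchProperties p = new SketchProperties();
            p.pollTimeout = this.pollTimeout;
            p.groupId = this.groupId;
            return p;
        }
    }

    /** Hypothetical stand-in for the factory: one shared template, one copy per endpoint. */
    static class SketchFactory {
        private final SketchProperties template = new SketchProperties();

        SketchProperties getContainerProperties() {
            return this.template;
        }

        SketchProperties propertiesFor(String endpointGroupId) {
            SketchProperties copy = this.template.copy();
            copy.groupId = endpointGroupId; // endpoint-specific value is set on the copy only
            return copy;
        }
    }

    public static void main(String[] args) {
        SketchFactory factory = new SketchFactory();
        factory.getContainerProperties().pollTimeout = 1000L; // configure the template once

        SketchProperties first = factory.propertiesFor("group-a");
        SketchProperties second = factory.propertiesFor("group-b");
        first.pollTimeout = 250L; // changing one copy leaves the other copy untouched

        System.out.println(first.pollTimeout);                        // 250
        System.out.println(second.pollTimeout);                       // 1000
        System.out.println(factory.getContainerProperties().groupId); // null
    }
}
```

The practical consequence is that settings applied to the template before containers are created show up in every container, while a change made to one container's own properties stays local to it.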
public void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
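Set an interceptor to be called before calling the listener.
Parameters: recordInterceptor - the interceptor.
public void setContainerCustomizer(ContainerCustomizer<K,V,C> containerCustomizer)
Set a customizer used to further configure a container after it has been created.
Parameters: containerCustomizer - the customizer.
public void afterPropertiesSet()
Specified by: afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean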
public C createListenerContainer(KafkaListenerEndpoint endpoint)
Description copied from interface: KafkaListenerContainerFactory
Create a MessageListenerContainer for the given KafkaListenerEndpoint. Containers created using this method are added to the listener endpoint registry.
Specified by: createListenerContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
Parameters: endpoint - the endpoint to configure.
protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint)
Create an empty container instance.
Parameters: endpoint - the endpoint.
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint)
Further initialize the specified container. Subclasses can override this method to apply extra configuration if necessary.
Parameters: instance - the container instance to configure; endpoint - the endpoint.
@Deprecated public C createContainer(java.util.Collection<TopicPartitionInitialOffset> topicPartitions)
Deprecated. In favor of createContainer(TopicPartitionOffset[]).
Specified by: createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
Parameters: topicPartitions - the topicPartitions to assign.
public C createContainer(TopicPartitionOffset... topicsAndPartitions)
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
Specified by: createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
Parameters: topicsAndPartitions - the topicPartitions to assign.
public C createContainer(java.lang.String... topics)
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
Specified by: createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
Parameters: topics - the topics.
public C createContainer(java.util.regex.Pattern topicPattern)
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
Specified by: createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
Parameters: topicPattern - the topicPattern.
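The way createListenerContainer, createContainerInstance, and initializeContainer relate to each other resembles the template method pattern. The standalone model below sketches that relationship under the assumption that the factory creates the empty instance first and then initializes it; the real method performs additional endpoint configuration that is not modeled here. All type names (ContainerCreationSketch, SketchContainer, SketchFactory, ConcurrentSketchFactory) and the "concurrency=3" setting are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ContainerCreationSketch {

    /** Hypothetical stand-in for a listener container; records the steps applied to it. */
    static class SketchContainer {
        final List<String> steps = new ArrayList<>();
    }

    /** Hypothetical stand-in for the abstract factory. */
    abstract static class SketchFactory {

        /** Fixed sequence: create the empty instance, then let the hook refine it. */
        final SketchContainer createListenerContainer(String endpointId) {
            SketchContainer instance = createContainerInstance(endpointId);
            initializeContainer(instance, endpointId);
            return instance;
        }

        /** Subclasses must supply the empty container. */
        protected abstract SketchContainer createContainerInstance(String endpointId);

        /** Optional hook that subclasses may override to apply extra configuration. */
        protected void initializeContainer(SketchContainer instance, String endpointId) {
            instance.steps.add("base-init:" + endpointId);
        }
    }

    /** A concrete factory that supplies the instance and extends the hook. */
    static class ConcurrentSketchFactory extends SketchFactory {

        @Override
        protected SketchContainer createContainerInstance(String endpointId) {
            SketchContainer container = new SketchContainer();
            container.steps.add("created:" + endpointId);
            return container;
        }

        @Override
        protected void initializeContainer(SketchContainer instance, String endpointId) {
            super.initializeContainer(instance, endpointId); // keep the base configuration
            instance.steps.add("concurrency=3");             // hypothetical extra setting
        }
    }

    public static void main(String[] args) {
        SketchContainer container =
                new ConcurrentSketchFactory().createListenerContainer("orders");
        System.out.println(container.steps); // [created:orders, base-init:orders, concurrency=3]
    }
}
```

Calling super.initializeContainer first in the override keeps whatever the base class configures and layers the subclass settings on top.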