Package org.springframework.kafka.config
Class AbstractKafkaListenerEndpoint<K,V>
- java.lang.Object
-
- org.springframework.kafka.config.AbstractKafkaListenerEndpoint<K,V>
-
- Type Parameters:
K
- the key type.V
- the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware
,org.springframework.beans.factory.BeanFactoryAware
,org.springframework.beans.factory.InitializingBean
,KafkaListenerEndpoint
- Direct Known Subclasses:
MethodKafkaListenerEndpoint
public abstract class AbstractKafkaListenerEndpoint<K,V> extends java.lang.Object implements KafkaListenerEndpoint, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.InitializingBean
Base model for a Kafka listener endpoint.- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also:
MethodKafkaListenerEndpoint
-
-
Constructor Summary
Constructors Constructor Description AbstractKafkaListenerEndpoint()
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description void
afterPropertiesSet()
protected abstract MessagingMessageListenerAdapter<K,V>
createMessageListener(MessageListenerContainer container, MessageConverter messageConverter)
Create aMessageListener
that is able to serve this endpoint for the specified container.java.lang.Boolean
getAutoStartup()
Return the autoStartup for this endpoint's container.protected BatchToRecordAdapter<K,V>
getBatchToRecordAdapter()
protected org.springframework.beans.factory.config.BeanExpressionContext
getBeanExpressionContext()
protected org.springframework.beans.factory.BeanFactory
getBeanFactory()
protected org.springframework.expression.BeanResolver
getBeanResolver()
java.lang.String
getClientIdPrefix()
Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.java.lang.Integer
getConcurrency()
Return the concurrency for this endpoint's container.java.util.Properties
getConsumerProperties()
Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.protected java.lang.StringBuilder
getEndpointDescription()
Return a description for this endpoint.java.lang.String
getGroup()
Return the group of this endpoint or null if not in a group.java.lang.String
getGroupId()
Return the groupId of this endpoint - if present, overrides thegroup.id
property of the consumer factory.java.lang.String
getId()
Return the id of this endpoint.protected RecordFilterStrategy<? super K,? super V>
getRecordFilterStrategy()
protected org.springframework.retry.RecoveryCallback<?>
getRecoveryCallback()
protected KafkaTemplate<?,?>
getReplyTemplate()
protected org.springframework.beans.factory.config.BeanExpressionResolver
getResolver()
protected org.springframework.retry.support.RetryTemplate
getRetryTemplate()
TopicPartitionOffset[]
getTopicPartitionsToAssign()
Return the topicPartitions for this endpoint.java.util.regex.Pattern
getTopicPattern()
Return the topicPattern for this endpoint.java.util.Collection<java.lang.String>
getTopics()
Return the topics for this endpoint.protected boolean
isAckDiscarded()
boolean
isBatchListener()
Return true if this endpoint creates a batch listener.boolean
isSplitIterables()
When true,Iterable
return results will be split into discrete records.protected boolean
isStatefulRetry()
void
setAckDiscarded(boolean ackDiscarded)
Set to true if thesetRecordFilterStrategy(RecordFilterStrategy)
is in use.void
setAutoStartup(java.lang.Boolean autoStartup)
Set the autoStartup for this endpoint's container.void
setBatchListener(boolean batchListener)
Set to true if this endpoint should create a batch listener.void
setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter)
Set aBatchToRecordAdapter
.void
setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory)
void
setClientIdPrefix(java.lang.String clientIdPrefix)
Set the client id prefix; overrides the client id in the consumer configuration properties.void
setConcurrency(java.lang.Integer concurrency)
Set the concurrency for this endpoint's container.void
setConsumerProperties(java.util.Properties consumerProperties)
Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.void
setGroup(java.lang.String group)
Set the group for the corresponding listener container.void
setGroupId(java.lang.String groupId)
Set the group id to override thegroup.id
property in the ContainerFactory.void
setId(java.lang.String id)
void
setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy)
Set aRecordFilterStrategy
implementation.void
setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback)
Set a callback to be used with thesetRetryTemplate(RetryTemplate)
.void
setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.void
setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set theKafkaTemplate
to use to send replies.void
setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Set a retryTemplate.void
setSplitIterables(boolean splitIterables)
Set to false to disable splittingIterable
reply values into separate records.void
setStatefulRetry(boolean statefulRetry)
When using aRetryTemplate
, set to true to enable stateful retry.void
setTopicPartitions(TopicPartitionOffset... topicPartitions)
Set the topicPartitions to use.void
setTopicPattern(java.util.regex.Pattern topicPattern)
Set the topic pattern to use.void
setTopics(java.lang.String... topics)
Set the topics to use.void
setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter)
Setup the specified message listener container with the model defined by this endpoint.java.lang.String
toString()
-
-
-
Method Detail
-
setBeanFactory
public void setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory) throws org.springframework.beans.BeansException
- Specified by:
setBeanFactory
in interfaceorg.springframework.beans.factory.BeanFactoryAware
- Throws:
org.springframework.beans.BeansException
-
getBeanFactory
protected org.springframework.beans.factory.BeanFactory getBeanFactory()
-
getResolver
protected org.springframework.beans.factory.config.BeanExpressionResolver getResolver()
-
getBeanExpressionContext
protected org.springframework.beans.factory.config.BeanExpressionContext getBeanExpressionContext()
-
getBeanResolver
protected org.springframework.expression.BeanResolver getBeanResolver()
-
setId
public void setId(java.lang.String id)
-
getId
public java.lang.String getId()
Description copied from interface:KafkaListenerEndpoint
Return the id of this endpoint.- Specified by:
getId
in interfaceKafkaListenerEndpoint
- Returns:
- the id of this endpoint. The id can be further qualified when the endpoint is resolved against its actual listener container.
- See Also:
KafkaListenerContainerFactory.createListenerContainer(org.springframework.kafka.config.KafkaListenerEndpoint)
-
setGroupId
public void setGroupId(java.lang.String groupId)
Set the group id to override thegroup.id
property in the ContainerFactory.- Parameters:
groupId
- the group id.- Since:
- 1.3
-
getGroupId
public java.lang.String getGroupId()
Description copied from interface:KafkaListenerEndpoint
Return the groupId of this endpoint - if present, overrides thegroup.id
property of the consumer factory.- Specified by:
getGroupId
in interfaceKafkaListenerEndpoint
- Returns:
- the group id; may be null.
-
setTopics
public void setTopics(java.lang.String... topics)
Set the topics to use. Either these or 'topicPattern' or 'topicPartitions' should be provided, but not a mixture.- Parameters:
topics
- to set.- See Also:
setTopicPartitions(TopicPartitionOffset...)
,setTopicPattern(Pattern)
-
getTopics
public java.util.Collection<java.lang.String> getTopics()
Return the topics for this endpoint.- Specified by:
getTopics
in interfaceKafkaListenerEndpoint
- Returns:
- the topics for this endpoint.
-
setTopicPartitions
public void setTopicPartitions(TopicPartitionOffset... topicPartitions)
Set the topicPartitions to use. Either this or 'topic' or 'topicPattern' should be provided, but not a mixture.- Parameters:
topicPartitions
- to set.- Since:
- 2.3
- See Also:
setTopics(String...)
,setTopicPattern(Pattern)
-
getTopicPartitionsToAssign
public TopicPartitionOffset[] getTopicPartitionsToAssign()
Return the topicPartitions for this endpoint.- Specified by:
getTopicPartitionsToAssign
in interfaceKafkaListenerEndpoint
- Returns:
- the topicPartitions for this endpoint.
- Since:
- 2.3
-
setTopicPattern
public void setTopicPattern(java.util.regex.Pattern topicPattern)
Set the topic pattern to use. Cannot be used with topics or topicPartitions.- Parameters:
topicPattern
- the pattern- See Also:
setTopicPartitions(TopicPartitionOffset...)
,setTopics(String...)
-
getTopicPattern
public java.util.regex.Pattern getTopicPattern()
Return the topicPattern for this endpoint.- Specified by:
getTopicPattern
in interfaceKafkaListenerEndpoint
- Returns:
- the topicPattern for this endpoint.
-
getGroup
public java.lang.String getGroup()
Description copied from interface:KafkaListenerEndpoint
Return the group of this endpoint or null if not in a group.- Specified by:
getGroup
in interfaceKafkaListenerEndpoint
- Returns:
- the group of this endpoint or null if not in a group.
-
setGroup
public void setGroup(java.lang.String group)
Set the group for the corresponding listener container.- Parameters:
group
- the group.
-
isBatchListener
public boolean isBatchListener()
Return true if this endpoint creates a batch listener.- Returns:
- true for a batch listener.
- Since:
- 1.1
-
setBatchListener
public void setBatchListener(boolean batchListener)
Set to true if this endpoint should create a batch listener.- Parameters:
batchListener
- true for a batch listener.- Since:
- 1.1
-
setReplyTemplate
public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set theKafkaTemplate
to use to send replies.- Parameters:
replyTemplate
- the template.- Since:
- 2.0
-
getReplyTemplate
protected KafkaTemplate<?,?> getReplyTemplate()
-
getRecordFilterStrategy
protected RecordFilterStrategy<? super K,? super V> getRecordFilterStrategy()
-
setRecordFilterStrategy
public void setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy)
Set aRecordFilterStrategy
implementation.- Parameters:
recordFilterStrategy
- the strategy implementation.
-
isAckDiscarded
protected boolean isAckDiscarded()
-
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded)
Set to true if thesetRecordFilterStrategy(RecordFilterStrategy)
is in use.- Parameters:
ackDiscarded
- the ackDiscarded.
-
getRetryTemplate
protected org.springframework.retry.support.RetryTemplate getRetryTemplate()
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Set a retryTemplate.- Parameters:
retryTemplate
- the template.
-
getRecoveryCallback
protected org.springframework.retry.RecoveryCallback<?> getRecoveryCallback()
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback)
Set a callback to be used with thesetRetryTemplate(RetryTemplate)
.- Parameters:
recoveryCallback
- the callback.
-
isStatefulRetry
protected boolean isStatefulRetry()
-
setStatefulRetry
public void setStatefulRetry(boolean statefulRetry)
When using aRetryTemplate
, set to true to enable stateful retry. Use in conjunction with aSeekToCurrentErrorHandler
when retry can take excessive time; each failure goes back to the broker, to keep the Consumer alive.- Parameters:
statefulRetry
- true to enable stateful retry.- Since:
- 2.1.3
-
getClientIdPrefix
public java.lang.String getClientIdPrefix()
Description copied from interface:KafkaListenerEndpoint
Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.- Specified by:
getClientIdPrefix
in interfaceKafkaListenerEndpoint
- Returns:
- the client id prefix.
-
setClientIdPrefix
public void setClientIdPrefix(java.lang.String clientIdPrefix)
Set the client id prefix; overrides the client id in the consumer configuration properties.- Parameters:
clientIdPrefix
- the prefix.- Since:
- 2.1.1
-
getConcurrency
public java.lang.Integer getConcurrency()
Description copied from interface:KafkaListenerEndpoint
Return the concurrency for this endpoint's container.- Specified by:
getConcurrency
in interfaceKafkaListenerEndpoint
- Returns:
- the concurrency.
-
setConcurrency
public void setConcurrency(java.lang.Integer concurrency)
Set the concurrency for this endpoint's container.- Parameters:
concurrency
- the concurrency.- Since:
- 2.2
-
getAutoStartup
public java.lang.Boolean getAutoStartup()
Description copied from interface:KafkaListenerEndpoint
Return the autoStartup for this endpoint's container.- Specified by:
getAutoStartup
in interfaceKafkaListenerEndpoint
- Returns:
- the autoStartup.
-
setAutoStartup
public void setAutoStartup(java.lang.Boolean autoStartup)
Set the autoStartup for this endpoint's container.- Parameters:
autoStartup
- the autoStartup.- Since:
- 2.2
-
setReplyHeadersConfigurer
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.- Parameters:
replyHeadersConfigurer
- the configurer.- Since:
- 2.2
-
getConsumerProperties
@Nullable public java.util.Properties getConsumerProperties()
Description copied from interface:KafkaListenerEndpoint
Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.group.id
andclient.id
are ignored.- Specified by:
getConsumerProperties
in interfaceKafkaListenerEndpoint
- Returns:
- the properties.
- See Also:
ConsumerConfig
,KafkaListenerEndpoint.getGroupId()
,KafkaListenerEndpoint.getClientIdPrefix()
-
setConsumerProperties
public void setConsumerProperties(java.util.Properties consumerProperties)
Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.group.id
andclient.id
are ignored.- Parameters:
consumerProperties
- the properties.- Since:
- 2.1.4
- See Also:
ConsumerConfig
,setGroupId(String)
,setClientIdPrefix(String)
-
isSplitIterables
public boolean isSplitIterables()
Description copied from interface:KafkaListenerEndpoint
When true,Iterable
return results will be split into discrete records.- Specified by:
isSplitIterables
in interfaceKafkaListenerEndpoint
- Returns:
- true to split.
-
setSplitIterables
public void setSplitIterables(boolean splitIterables)
Set to false to disable splittingIterable
reply values into separate records.- Parameters:
splitIterables
- false to disable; default true.- Since:
- 2.3.5
-
getBatchToRecordAdapter
protected BatchToRecordAdapter<K,V> getBatchToRecordAdapter()
-
setBatchToRecordAdapter
public void setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter)
Set aBatchToRecordAdapter
.- Parameters:
batchToRecordAdapter
- the adapter.- Since:
- 2.4.2
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet
in interfaceorg.springframework.beans.factory.InitializingBean
-
setupListenerContainer
public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter)
Description copied from interface:KafkaListenerEndpoint
Setup the specified message listener container with the model defined by this endpoint.This endpoint must provide the requested missing option(s) of the specified container to make it usable. Usually, this is about setting the
queues
and themessageListener
to use but an implementation may override any default setting that was already set.- Specified by:
setupListenerContainer
in interfaceKafkaListenerEndpoint
- Parameters:
listenerContainer
- the listener container to configuremessageConverter
- the message converter - can be null
-
createMessageListener
protected abstract MessagingMessageListenerAdapter<K,V> createMessageListener(MessageListenerContainer container, MessageConverter messageConverter)
Create aMessageListener
that is able to serve this endpoint for the specified container.- Parameters:
container
- theMessageListenerContainer
to create aMessageListener
.messageConverter
- the message converter - may be null.- Returns:
- a
MessageListener
instance.
-
getEndpointDescription
protected java.lang.StringBuilder getEndpointDescription()
Return a description for this endpoint.- Returns:
- a description for this endpoint.
Available to subclasses, for inclusion in their
toString()
result.
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
-