Package org.springframework.kafka.config
Class AbstractKafkaListenerEndpoint<K,V>
java.lang.Object
org.springframework.kafka.config.AbstractKafkaListenerEndpoint<K,V>
- Type Parameters:
K
- the key type.V
- the value type.
- All Implemented Interfaces:
Aware
,BeanFactoryAware
,InitializingBean
,KafkaListenerEndpoint
- Direct Known Subclasses:
MethodKafkaListenerEndpoint
public abstract class AbstractKafkaListenerEndpoint<K,V>
extends Object
implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean
Base model for a Kafka listener endpoint.
- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also:
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoid
protected abstract MessagingMessageListenerAdapter<K,
V> createMessageListener
(MessageListenerContainer container, MessageConverter messageConverter) Create aMessageListener
that is able to serve this endpoint for the specified container.Return the autoStartup for this endpoint's container.Return the current batch listener flag for this endpoint, or null if not explicitly set.protected BatchToRecordAdapter<K,
V> protected BeanExpressionContext
protected BeanFactory
protected BeanResolver
Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.Return the concurrency for this endpoint's container.Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.protected StringBuilder
Return a description for this endpoint.getGroup()
Return the group of this endpoint or null if not in a group.Return the groupId of this endpoint - if present, overrides thegroup.id
property of the consumer factory.getId()
Return the id of this endpoint.byte[]
Get the listener info to insert in the record header.Return the main listener id if this container is for a retry topic.protected RecordFilterStrategy<? super K,
? super V> protected KafkaTemplate<?,
?> protected BeanExpressionResolver
Return the topicPartitions for this endpoint.Return the topicPattern for this endpoint.Return the topics for this endpoint.protected boolean
boolean
Return true if this endpoint creates a batch listener.boolean
When true,Iterable
return results will be split into discrete records.void
setAckDiscarded
(boolean ackDiscarded) Set to true if thesetRecordFilterStrategy(RecordFilterStrategy)
is in use.void
setAutoStartup
(Boolean autoStartup) Set the autoStartup for this endpoint's container.void
setBatchListener
(boolean batchListener) Set to true if this endpoint should create a batch listener.void
setBatchToRecordAdapter
(BatchToRecordAdapter<K, V> batchToRecordAdapter) Set aBatchToRecordAdapter
.void
setBeanFactory
(BeanFactory beanFactory) void
setClientIdPrefix
(String clientIdPrefix) Set the client id prefix; overrides the client id in the consumer configuration properties.void
setConcurrency
(Integer concurrency) Set the concurrency for this endpoint's container.void
setConsumerProperties
(Properties consumerProperties) Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.void
setCorrelationHeaderName
(String correlationHeaderName) Set a custom header name for the correlation id.void
Set the group for the corresponding listener container.void
setGroupId
(String groupId) Set the group id to override thegroup.id
property in the ContainerFactory.void
void
setListenerInfo
(byte[] listenerInfo) Set the listener info to insert in the record header.void
void
setRecordFilterStrategy
(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) Set aRecordFilterStrategy
implementation.void
setReplyHeadersConfigurer
(ReplyHeadersConfigurer replyHeadersConfigurer) Set a configurer which will be invoked when creating a reply message.void
setReplyTemplate
(KafkaTemplate<?, ?> replyTemplate) Set theKafkaTemplate
to use to send replies.void
setSplitIterables
(boolean splitIterables) Set to false to disable splittingIterable
reply values into separate records.void
setTopicPartitions
(TopicPartitionOffset... topicPartitions) Set the topicPartitions to use.void
setTopicPattern
(Pattern topicPattern) Set the topic pattern to use.void
Set the topics to use.void
setupListenerContainer
(MessageListenerContainer listenerContainer, MessageConverter messageConverter) Setup the specified message listener container with the model defined by this endpoint.toString()
-
Constructor Details
-
AbstractKafkaListenerEndpoint
public AbstractKafkaListenerEndpoint()
-
-
Method Details
-
setBeanFactory
- Specified by:
setBeanFactory
in interfaceBeanFactoryAware
- Throws:
BeansException
-
getBeanFactory
-
getResolver
-
getBeanExpressionContext
-
getBeanResolver
-
setId
-
setMainListenerId
-
getMainListenerId
Description copied from interface:KafkaListenerEndpoint
Return the main listener id if this container is for a retry topic.- Specified by:
getMainListenerId
in interfaceKafkaListenerEndpoint
- Returns:
- the main listener id or null.
-
getId
Description copied from interface:KafkaListenerEndpoint
Return the id of this endpoint.- Specified by:
getId
in interfaceKafkaListenerEndpoint
- Returns:
- the id of this endpoint. The id can be further qualified when the endpoint is resolved against its actual listener container.
- See Also:
-
setGroupId
Set the group id to override thegroup.id
property in the ContainerFactory.- Parameters:
groupId
- the group id.- Since:
- 1.3
-
getGroupId
Description copied from interface:KafkaListenerEndpoint
Return the groupId of this endpoint - if present, overrides thegroup.id
property of the consumer factory.- Specified by:
getGroupId
in interfaceKafkaListenerEndpoint
- Returns:
- the group id; may be null.
-
setTopics
Set the topics to use. Either these or 'topicPattern' or 'topicPartitions' should be provided, but not a mixture.- Parameters:
topics
- to set.- See Also:
-
getTopics
Return the topics for this endpoint.- Specified by:
getTopics
in interfaceKafkaListenerEndpoint
- Returns:
- the topics for this endpoint.
-
setTopicPartitions
Set the topicPartitions to use. Either this or 'topic' or 'topicPattern' should be provided, but not a mixture.- Parameters:
topicPartitions
- to set.- Since:
- 2.3
- See Also:
-
getTopicPartitionsToAssign
Return the topicPartitions for this endpoint.- Specified by:
getTopicPartitionsToAssign
in interfaceKafkaListenerEndpoint
- Returns:
- the topicPartitions for this endpoint.
- Since:
- 2.3
-
setTopicPattern
Set the topic pattern to use. Cannot be used with topics or topicPartitions.- Parameters:
topicPattern
- the pattern- See Also:
-
getTopicPattern
Return the topicPattern for this endpoint.- Specified by:
getTopicPattern
in interfaceKafkaListenerEndpoint
- Returns:
- the topicPattern for this endpoint.
-
getGroup
Description copied from interface:KafkaListenerEndpoint
Return the group of this endpoint or null if not in a group.- Specified by:
getGroup
in interfaceKafkaListenerEndpoint
- Returns:
- the group of this endpoint or null if not in a group.
-
setGroup
Set the group for the corresponding listener container.- Parameters:
group
- the group.
-
isBatchListener
public boolean isBatchListener()Return true if this endpoint creates a batch listener.- Returns:
- true for a batch listener.
- Since:
- 1.1
-
getBatchListener
Return the current batch listener flag for this endpoint, or null if not explicitly set.- Specified by:
getBatchListener
in interfaceKafkaListenerEndpoint
- Returns:
- the batch listener flag.
- Since:
- 2.8
-
setBatchListener
public void setBatchListener(boolean batchListener) Set to true if this endpoint should create a batch listener.- Parameters:
batchListener
- true for a batch listener.- Since:
- 1.1
-
setReplyTemplate
Set theKafkaTemplate
to use to send replies.- Parameters:
replyTemplate
- the template.- Since:
- 2.0
-
getReplyTemplate
-
getRecordFilterStrategy
-
setRecordFilterStrategy
Set aRecordFilterStrategy
implementation.- Parameters:
recordFilterStrategy
- the strategy implementation.
-
isAckDiscarded
protected boolean isAckDiscarded() -
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded) Set to true if thesetRecordFilterStrategy(RecordFilterStrategy)
is in use.- Parameters:
ackDiscarded
- the ackDiscarded.
-
getClientIdPrefix
Description copied from interface:KafkaListenerEndpoint
Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.- Specified by:
getClientIdPrefix
in interfaceKafkaListenerEndpoint
- Returns:
- the client id prefix.
-
setClientIdPrefix
Set the client id prefix; overrides the client id in the consumer configuration properties.- Parameters:
clientIdPrefix
- the prefix.- Since:
- 2.1.1
-
getConcurrency
Description copied from interface:KafkaListenerEndpoint
Return the concurrency for this endpoint's container.- Specified by:
getConcurrency
in interfaceKafkaListenerEndpoint
- Returns:
- the concurrency.
-
setConcurrency
Set the concurrency for this endpoint's container.- Parameters:
concurrency
- the concurrency.- Since:
- 2.2
-
getAutoStartup
Description copied from interface:KafkaListenerEndpoint
Return the autoStartup for this endpoint's container.- Specified by:
getAutoStartup
in interfaceKafkaListenerEndpoint
- Returns:
- the autoStartup.
-
setAutoStartup
Set the autoStartup for this endpoint's container.- Parameters:
autoStartup
- the autoStartup.- Since:
- 2.2
-
setReplyHeadersConfigurer
Set a configurer which will be invoked when creating a reply message.- Parameters:
replyHeadersConfigurer
- the configurer.- Since:
- 2.2
-
getConsumerProperties
Description copied from interface:KafkaListenerEndpoint
Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.group.id
andclient.id
are ignored.- Specified by:
getConsumerProperties
in interfaceKafkaListenerEndpoint
- Returns:
- the properties.
- See Also:
-
setConsumerProperties
Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.group.id
andclient.id
are ignored.- Parameters:
consumerProperties
- the properties.- Since:
- 2.1.4
- See Also:
-
ConsumerConfig
setGroupId(String)
setClientIdPrefix(String)
-
isSplitIterables
public boolean isSplitIterables()Description copied from interface:KafkaListenerEndpoint
When true,Iterable
return results will be split into discrete records.- Specified by:
isSplitIterables
in interfaceKafkaListenerEndpoint
- Returns:
- true to split.
-
setSplitIterables
public void setSplitIterables(boolean splitIterables) Set to false to disable splittingIterable
reply values into separate records.- Parameters:
splitIterables
- false to disable; default true.- Since:
- 2.3.5
-
getListenerInfo
Description copied from interface:KafkaListenerEndpoint
Get the listener info to insert in the record header.- Specified by:
getListenerInfo
in interfaceKafkaListenerEndpoint
- Returns:
- the info.
-
setListenerInfo
Set the listener info to insert in the record header.- Parameters:
listenerInfo
- the info.- Since:
- 2.8.4
-
getBatchToRecordAdapter
-
setBatchToRecordAdapter
Set aBatchToRecordAdapter
.- Parameters:
batchToRecordAdapter
- the adapter.- Since:
- 2.4.2
-
setCorrelationHeaderName
Set a custom header name for the correlation id. DefaultKafkaHeaders.CORRELATION_ID
. This header will be echoed back in any reply message.- Parameters:
correlationHeaderName
- the header name.- Since:
- 3.0
-
afterPropertiesSet
public void afterPropertiesSet()- Specified by:
afterPropertiesSet
in interfaceInitializingBean
-
setupListenerContainer
public void setupListenerContainer(MessageListenerContainer listenerContainer, @Nullable MessageConverter messageConverter) Description copied from interface:KafkaListenerEndpoint
Setup the specified message listener container with the model defined by this endpoint.This endpoint must provide the requested missing option(s) of the specified container to make it usable. Usually, this is about setting the
queues
and themessageListener
to use but an implementation may override any default setting that was already set.- Specified by:
setupListenerContainer
in interfaceKafkaListenerEndpoint
- Parameters:
listenerContainer
- the listener container to configuremessageConverter
- the message converter - can be null
-
createMessageListener
protected abstract MessagingMessageListenerAdapter<K,V> createMessageListener(MessageListenerContainer container, @Nullable MessageConverter messageConverter) Create aMessageListener
that is able to serve this endpoint for the specified container.- Parameters:
container
- theMessageListenerContainer
to create aMessageListener
.messageConverter
- the message converter - may be null.- Returns:
- a
MessageListener
instance.
-
getEndpointDescription
Return a description for this endpoint.- Returns:
- a description for this endpoint.
Available to subclasses, for inclusion in their
toString()
result.
-
toString
-