Class AbstractMessageListenerContainer<K,V>
java.lang.Object
org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
- Type Parameters:
K
- the key type.
V
- the value type.
- All Implemented Interfaces:
Aware
,BeanNameAware
,DisposableBean
,ApplicationContextAware
,ApplicationEventPublisherAware
,Lifecycle
,Phased
,SmartLifecycle
,GenericMessageListenerContainer<K,V>
,MessageListenerContainer
- Direct Known Subclasses:
ConcurrentMessageListenerContainer
,KafkaMessageListenerContainer
public abstract class AbstractMessageListenerContainer<K,V>
extends Object
implements GenericMessageListenerContainer<K,V>, BeanNameAware, ApplicationEventPublisherAware, ApplicationContextAware
The base implementation for the MessageListenerContainer.
- Author:
- Gary Russell, Marius Bogoevici, Artem Bilan, Tomaz Fernandes
-
Field Summary
Fields
Modifier and Type | Field | Description
protected final ConsumerFactory<K,V> | consumerFactory |
static final int | DEFAULT_PHASE | The default SmartLifecycle phase for listener containers 2147483547.
protected final Object | lifecycleMonitor |
protected final LogAccessor | logger |
-
Constructor Summary
Constructors
Modifier | Constructor | Description
protected | AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) | Construct an instance with the provided factory and properties.
-
Method Summary
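Modifier and Type | Method | Description
void | checkGroupId() |
protected void | checkTopics() |
protected final org.apache.kafka.clients.consumer.ConsumerRebalanceListener | createSimpleLoggingConsumerRebalanceListener() | Return the default implementation of a ConsumerRebalanceListener instance.
protected abstract void | doStart() |
protected void | doStop(Runnable callback) |
protected abstract void | doStop(Runnable callback, boolean normal) | Stop the container normally or abnormally.
AfterRollbackProcessor<? super K,? super V> | getAfterRollbackProcessor() | Return the currently configured AfterRollbackProcessor.
protected ApplicationContext | getApplicationContext() |
ApplicationEventPublisher | getApplicationEventPublisher() | Get the event publisher.
protected BatchInterceptor<K,V> | getBatchInterceptor() |
String | getBeanName() | Return the bean name.
CommonErrorHandler | getCommonErrorHandler() | Get the CommonErrorHandler.
ContainerProperties | getContainerProperties() | Return the container properties for this container.
GenericErrorHandler<?> | getGenericErrorHandler() | Deprecated, for removal: This API element is subject to removal in a future version.
String | getGroupId() | Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
KafkaAdmin | getKafkaAdmin() | Return the KafkaAdmin, used to find the cluster id for observation, if present.
String | getListenerId() | The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
byte[] | getListenerInfo() | Get arbitrary static information that will be added to the KafkaHeaders.LISTENER_INFO header of all records.
String | getMainListenerId() | The 'id' attribute of the main @KafkaListener container, if this container is for a retry topic; null otherwise.
int | getPhase() |
protected RecordInterceptor<K,V> | getRecordInterceptor() |
Function<MessageListenerContainer,String> | getThreadNameSupplier() | Return the function used to change the consumer thread name.
boolean | isAutoStartup() |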
boolean | isChangeConsumerThreadName() | Return true if the container should change the consumer thread name during initialization.
protected boolean | isInterceptBeforeTx() |
boolean | isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition) | Whether or not this topic's partition pause has been requested.
protected boolean | isPaused() |
boolean | isPauseRequested() | Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
boolean | isRunning() |
protected boolean | isStoppedNormally() |
protected AbstractMessageListenerContainer<?,?> | parentOrThis() | Return this or a parent container if this has a parent.
void | pause() | Pause this container before the next poll().
void | pausePartition(org.apache.kafka.common.TopicPartition topicPartition) | Pause this partition before the next poll().
protected void | publishContainerStoppedEvent() |
void | resume() | Resume this container, if paused, after the next poll().
void | resumePartition(org.apache.kafka.common.TopicPartition topicPartition) | Resume this partition, if paused, after the next poll().
void | setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) | Set a processor to perform seeks on unprocessed records after a rollback.
void | setApplicationContext(ApplicationContext applicationContext) |
void | setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) |
void | setAutoStartup(boolean autoStartup) | Set the autoStartup.
void | setBatchErrorHandler(BatchErrorHandler errorHandler) | Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
void | setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) | Set an interceptor to be called before calling the batch listener.
void | setBeanName(String name) |
void | setChangeConsumerThreadName(boolean changeConsumerThreadName) | Set to true to instruct the container to change the consumer thread name during initialization.
void | setCommonErrorHandler(CommonErrorHandler commonErrorHandler) | Set the CommonErrorHandler which can handle errors for both record and batch listeners.
void | setErrorHandler(ErrorHandler errorHandler) | Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
void | setGenericErrorHandler(GenericErrorHandler<?> errorHandler) | Deprecated, for removal: This API element is subject to removal in a future version. In favor of setCommonErrorHandler(CommonErrorHandler).
void | setInterceptBeforeTx(boolean interceptBeforeTx) | When false, invoke the interceptor after the transaction starts.
void | setKafkaAdmin(KafkaAdmin kafkaAdmin) | Set the KafkaAdmin, used to find the cluster id for observation, if present.
void | setListenerInfo(byte[] listenerInfo) | Set arbitrary information that will be added to the KafkaHeaders.LISTENER_INFO header of all records.
void | setMainListenerId(String id) | Set the main listener id, if this container is for a retry topic.
void | setPhase(int phase) |
void | setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) | Set an interceptor to be called before calling the record listener.
protected void | setRunning(boolean running) |
protected void | setStoppedNormally(boolean stoppedNormally) |
void | setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) | Set a function used to change the consumer thread name.
void | setTopicCheckTimeout(int topicCheckTimeout) | How long to wait for Admin.describeTopics(Collection) result futures to complete.
void | setupMessageListener(Object messageListener) | Setup the message listener to use.
final void | start() |
final void | stop() |
final void | stop(boolean wait) | Stop the container.
void | stop(Runnable callback) |
void | stopAbnormally(Runnable callback) | Stop the container after some exception so that MessageListenerContainer.isInExpectedState() will return false.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
childStopped, destroy, getAssignedPartitions, getAssignmentsByClientId, getContainerFor, isChildRunning, isContainerPaused, isInExpectedState, isPartitionPaused, metrics
-
Field Details
-
DEFAULT_PHASE
public static final int DEFAULT_PHASE
The default SmartLifecycle phase for listener containers 2147483547.
- See Also:
-
logger
-
consumerFactory
-
lifecycleMonitor
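The value 2147483547 documented for DEFAULT_PHASE equals Integer.MAX_VALUE minus 100. The following sketch only illustrates that arithmetic and the usual SmartLifecycle ordering (lower phases start earlier). PhaseSketch and its method are hypothetical names, and the way the constant is derived is an assumption, not something stated on this page.

```java
// Illustrative only: PhaseSketch is a hypothetical class, not part of Spring Kafka.
final class PhaseSketch {

    // Assumption: the documented value corresponds to Integer.MAX_VALUE - 100.
    static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;

    // True when a component with the given phase would be started before
    // a listener container that uses DEFAULT_PHASE (lower phases start first).
    static boolean startsBeforeContainers(int otherPhase) {
        return otherPhase < DEFAULT_PHASE;
    }
}
```

With such a high default, most other lifecycle beans would start before the listener containers and stop after them.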
-
-
Constructor Details
-
AbstractMessageListenerContainer
protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties)
Construct an instance with the provided factory and properties.
- Parameters:
consumerFactory
- the factory.
containerProperties
- the properties.
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContext
in interface ApplicationContextAware
- Throws:
BeansException
-
getApplicationContext
-
setBeanName
- Specified by:
setBeanName
in interface BeanNameAware
-
getBeanName
Return the bean name.
- Returns:
- the bean name.
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisher
in interface ApplicationEventPublisherAware
-
getApplicationEventPublisher
Get the event publisher.
- Returns:
- the publisher
-
setErrorHandler
Deprecated, for removal: This API element is subject to removal in a future version.
In favor of setCommonErrorHandler(CommonErrorHandler).
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler
- the error handler.
- Since:
- 2.2
- See Also:
-
setGenericErrorHandler
@Deprecated(since="2.8", forRemoval=true)
public void setGenericErrorHandler(@Nullable GenericErrorHandler<?> errorHandler)
Deprecated, for removal: This API element is subject to removal in a future version.
In favor of setCommonErrorHandler(CommonErrorHandler).
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler
- the error handler.
- Since:
- 2.2
- See Also:
-
setBatchErrorHandler
@Deprecated(since="2.8", forRemoval=true)
public void setBatchErrorHandler(BatchErrorHandler errorHandler)
Deprecated, for removal: This API element is subject to removal in a future version.
In favor of setCommonErrorHandler(CommonErrorHandler).
Set the batch error handler to call when the listener throws an exception.
- Parameters:
errorHandler
- the error handler.
- Since:
- 2.2
- See Also:
-
getGenericErrorHandler
@Deprecated(since="2.8", forRemoval=true)
@Nullable public GenericErrorHandler<?> getGenericErrorHandler()
Deprecated, for removal: This API element is subject to removal in a future version.
In favor of getCommonErrorHandler().
Get the configured error handler.
- Returns:
- the error handler.
- Since:
- 2.2
- See Also:
-
getCommonErrorHandler
Get the CommonErrorHandler.
- Returns:
- the handler.
- Since:
- 2.8
-
setCommonErrorHandler
Set the CommonErrorHandler which can handle errors for both record and batch listeners. Replaces the use of GenericErrorHandlers.
- Parameters:
commonErrorHandler
- the handler.
- Since:
- 2.8
-
isStoppedNormally
protected boolean isStoppedNormally()
-
setStoppedNormally
protected void setStoppedNormally(boolean stoppedNormally)
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup
in interface SmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
Description copied from interface: MessageListenerContainer
Set the autoStartup.
- Specified by:
setAutoStartup
in interface MessageListenerContainer
- Parameters:
autoStartup
- the autoStartup to set.
- See Also:
-
setRunning
protected void setRunning(boolean running)
-
isRunning
public boolean isRunning()
-
isPaused
protected boolean isPaused()
-
isPartitionPauseRequested
public boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Whether or not this topic's partition pause has been requested.
- Specified by:
isPartitionPauseRequested
in interface MessageListenerContainer
- Parameters:
topicPartition
- the topic partition to check
- Returns:
- true if pause for this TopicPartition has been requested
-
pausePartition
public void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Pause this partition before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pausePartition
in interface MessageListenerContainer
- Parameters:
topicPartition
- the topicPartition to pause.
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Resume this partition, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resumePartition
in interface MessageListenerContainer
- Parameters:
topicPartition
- the topicPartition to resume.
-
isPauseRequested
public boolean isPauseRequested()
Description copied from interface: MessageListenerContainer
Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
- Specified by:
isPauseRequested
in interface MessageListenerContainer
- Returns:
- true if pause has been requested.
-
setPhase
public void setPhase(int phase)
-
getPhase
public int getPhase()
- Specified by:
getPhase
in interface Phased
- Specified by:
getPhase
in interface SmartLifecycle
-
getAfterRollbackProcessor
Return the currently configured AfterRollbackProcessor.
- Returns:
- the after rollback processor.
- Since:
- 2.2.14
-
setAfterRollbackProcessor
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor)
Set a processor to perform seeks on unprocessed records after a rollback. The default seeks all topics/partitions to the current position, including the failed record.
- Parameters:
afterRollbackProcessor
- the processor.
- Since:
- 1.3.5
-
getContainerProperties
Description copied from interface: MessageListenerContainer
Return the container properties for this container.
- Specified by:
getContainerProperties
in interface MessageListenerContainer
- Returns:
- the properties.
-
getGroupId
Description copied from interface: MessageListenerContainer
Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
- Specified by:
getGroupId
in interface MessageListenerContainer
- Returns:
- the group id.
-
getListenerId
Description copied from interface: MessageListenerContainer
The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
- Specified by:
getListenerId
in interface MessageListenerContainer
- Returns:
- the id or bean name.
-
setMainListenerId
Set the main listener id, if this container is for a retry topic.
- Parameters:
id
- the id.
- Since:
- 3.0
-
getMainListenerId
Description copied from interface: MessageListenerContainer
The 'id' attribute of the main @KafkaListener container, if this container is for a retry topic; null otherwise.
- Specified by:
getMainListenerId
in interface MessageListenerContainer
- Returns:
- the id.
-
getListenerInfo
Description copied from interface: MessageListenerContainer
Get arbitrary static information that will be added to the KafkaHeaders.LISTENER_INFO header of all records.
- Specified by:
getListenerInfo
in interface MessageListenerContainer
- Returns:
- the info.
-
setListenerInfo
Set arbitrary information that will be added to the KafkaHeaders.LISTENER_INFO header of all records.
- Parameters:
listenerInfo
- the info.
- Since:
- 2.8.4
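Because the listener info is a byte[], textual information has to be encoded before it is set and decoded when the header is read. A minimal sketch, assuming UTF-8 text is used for the info; ListenerInfoSketch and its methods are hypothetical helpers, not part of the library.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: hypothetical helper for converting listener info to and from text.
final class ListenerInfoSketch {

    // Encode text into the byte[] form that could be passed to setListenerInfo(byte[]).
    static byte[] encode(String info) {
        return info.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a header value back into text (assumes it was written as UTF-8).
    static String decode(byte[] headerValue) {
        return new String(headerValue, StandardCharsets.UTF_8);
    }
}
```

For example, the result of ListenerInfoSketch.encode("tenant-a") could be supplied to setListenerInfo, and a listener could apply decode to the value it finds in the header.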
-
setTopicCheckTimeout
public void setTopicCheckTimeout(int topicCheckTimeout)
How long to wait for Admin.describeTopics(Collection) result futures to complete.
- Parameters:
topicCheckTimeout
- the timeout in seconds; default 30.
- Since:
- 2.3
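The timeout above bounds how long a result future is awaited. The sketch below shows the general idea of waiting on a future for a number of seconds using only java.util.concurrent; it is not the container's code, and TopicCheckTimeoutSketch and completesWithin are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: a generic bounded wait, not the container's topic check.
final class TopicCheckTimeoutSketch {

    // Returns true if the future completes normally within the timeout (in seconds).
    static boolean completesWithin(Future<?> future, int timeoutSeconds) {
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
            return true;
        }
        catch (TimeoutException | ExecutionException ex) {
            // Timed out, or the computation failed.
            return false;
        }
        catch (InterruptedException ex) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```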
-
isChangeConsumerThreadName
public boolean isChangeConsumerThreadName()
Return true if the container should change the consumer thread name during initialization.
- Returns:
- true to change.
- Since:
- 3.0.1
-
setChangeConsumerThreadName
public void setChangeConsumerThreadName(boolean changeConsumerThreadName)
Set to true to instruct the container to change the consumer thread name during initialization.
- Parameters:
changeConsumerThreadName
- true to change.
- Since:
- 3.0.1
- See Also:
-
getThreadNameSupplier
Return the function used to change the consumer thread name.
- Returns:
- the function.
- Since:
- 3.0.1
-
setThreadNameSupplier
Set a function used to change the consumer thread name. The default returns the container listenerId.
- Parameters:
threadNameSupplier
- the function.
- Since:
- 3.0.1
- See Also:
-
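The thread name supplier is a plain function from the container to a name. The sketch below models that shape with a stand-in interface so that it runs without the library; NamedContainer, DEFAULT and prefixed are hypothetical names, and only the "default returns the listener id" behaviour is taken from the description above.

```java
import java.util.function.Function;

// Illustrative only: models the shape of a thread name supplier with a stand-in type.
final class ThreadNameSketch {

    // Stand-in for the container; only the listener id matters here.
    interface NamedContainer {
        String getListenerId();
    }

    // Mirrors the documented default: use the listener id as the thread name.
    static final Function<NamedContainer, String> DEFAULT = NamedContainer::getListenerId;

    // A hypothetical custom supplier that adds a prefix to the listener id.
    static Function<NamedContainer, String> prefixed(String prefix) {
        return container -> prefix + container.getListenerId();
    }
}
```

In a real application a function of type Function<MessageListenerContainer, String> would be passed to setThreadNameSupplier, together with setChangeConsumerThreadName(true).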
getKafkaAdmin
Return the KafkaAdmin, used to find the cluster id for observation, if present.
- Returns:
- the kafkaAdmin
- Since:
- 3.0.5
-
setKafkaAdmin
Set the KafkaAdmin, used to find the cluster id for observation, if present.
- Parameters:
kafkaAdmin
- the admin.
-
getRecordInterceptor
-
setRecordInterceptor
Set an interceptor to be called before calling the record listener. Does not apply to batch listeners.
- Parameters:
recordInterceptor
- the interceptor.
- Since:
- 2.2.7
- See Also:
-
getBatchInterceptor
-
setBatchInterceptor
Set an interceptor to be called before calling the batch listener.
- Parameters:
batchInterceptor
- the interceptor.
- Since:
- 2.6.6
- See Also:
-
isInterceptBeforeTx
protected boolean isInterceptBeforeTx()
-
setInterceptBeforeTx
public void setInterceptBeforeTx(boolean interceptBeforeTx)
When false, invoke the interceptor after the transaction starts.
- Parameters:
interceptBeforeTx
- false to intercept within the transaction. Default true since 2.8.
- Since:
- 2.3.4
- See Also:
-
setupMessageListener
Description copied from interface: MessageListenerContainer
Set up the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
- Specified by:
setupMessageListener
in interface MessageListenerContainer
- Parameters:
messageListener
- the object to be wrapped as the MessageListener.
-
start
public final void start()
-
checkTopics
protected void checkTopics()
-
checkGroupId
public void checkGroupId()
-
doStart
protected abstract void doStart()
-
stop
public final void stop()
-
stop
public final void stop(boolean wait)
Stop the container.
- Parameters:
wait
- wait for the listener to terminate.
- Since:
- 2.3.8
-
pause
public void pause()
Description copied from interface: MessageListenerContainer
Pause this container before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pause
in interface MessageListenerContainer
- See Also:
-
KafkaConsumer.pause(Collection)
-
resume
public void resume()
Description copied from interface: MessageListenerContainer
Resume this container, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resume
in interface MessageListenerContainer
- See Also:
-
KafkaConsumer.resume(Collection)
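The difference between a requested pause and an actual pause can be pictured with a toy model: any thread may set the request, but only the consumer thread applies it around a poll. This is a simplified sketch, not the container's implementation; PauseSketch, pollOnce and isConsumerPaused are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: not the container's implementation.
final class PauseSketch {

    // May be set from any thread.
    private final AtomicBoolean pauseRequested = new AtomicBoolean();

    // Touched only by the (simulated) consumer thread.
    private boolean consumerPaused;

    void pause() {
        this.pauseRequested.set(true);
    }

    void resume() {
        this.pauseRequested.set(false);
    }

    boolean isPauseRequested() {
        return this.pauseRequested.get();
    }

    boolean isConsumerPaused() {
        return this.consumerPaused;
    }

    // Stands in for one iteration of the consumer thread's poll loop,
    // which is where the requested state is actually applied.
    void pollOnce() {
        this.consumerPaused = this.pauseRequested.get();
    }
}
```

In this model, isPauseRequested() turns true immediately after pause(), while isConsumerPaused() only changes once pollOnce() has run.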
-
stop
- Specified by:
stop
in interface SmartLifecycle
-
stopAbnormally
Description copied from interface: MessageListenerContainer
Stop the container after some exception so that MessageListenerContainer.isInExpectedState() will return false.
- Specified by:
stopAbnormally
in interface MessageListenerContainer
- Parameters:
callback
- the callback.
- See Also:
-
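The relationship between a normal stop, an abnormal stop and isInExpectedState() can be sketched with two flags. This is a toy model under the assumption that the expected state means "running, or stopped normally"; StopStateSketch is a hypothetical class and does not reproduce the container's code.

```java
// Toy model only: assumes "expected state" means running or stopped normally.
final class StopStateSketch {

    private boolean running = true;

    private boolean stoppedNormally = true;

    // A regular stop leaves the container in an expected state.
    void stop() {
        this.running = false;
        this.stoppedNormally = true;
    }

    // An abnormal stop (for example, after an exception) records that fact
    // and then invokes the callback.
    void stopAbnormally(Runnable callback) {
        this.running = false;
        this.stoppedNormally = false;
        callback.run();
    }

    boolean isInExpectedState() {
        return this.running || this.stoppedNormally;
    }
}
```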
doStop
-
doStop
Stop the container normally or abnormally.
- Parameters:
callback
- the callback.
normal
- true for an expected stop.
- Since:
- 2.8
-
createSimpleLoggingConsumerRebalanceListener
protected final org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener()
Return the default implementation of a ConsumerRebalanceListener instance.
- Returns:
- the ConsumerRebalanceListener currently assigned to this container.
-
publishContainerStoppedEvent
protected void publishContainerStoppedEvent()
-
parentOrThis
Return this or a parent container if this has a parent.
- Returns:
- the parent or this.
- Since:
- 2.2.1