Class AbstractMessageListenerContainer<K,V>
java.lang.Object
org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
Aware,BeanNameAware,DisposableBean,ApplicationContextAware,ApplicationEventPublisherAware,Lifecycle,Phased,SmartLifecycle,GenericMessageListenerContainer<K,,V> MessageListenerContainer
- Direct Known Subclasses:
ConcurrentMessageListenerContainer,KafkaMessageListenerContainer
public abstract class AbstractMessageListenerContainer<K,V>
extends Object
implements GenericMessageListenerContainer<K,V>, BeanNameAware, ApplicationEventPublisherAware, ApplicationContextAware
The base implementation for the
MessageListenerContainer.- Author:
- Gary Russell, Marius Bogoevici, Artem Bilan, Tomaz Fernandes, Wang Zhiyang, Soby Chacko, Sanghyeok An, Lokesh Alamuri
-
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final ConsumerFactory<K,V> static final intThe defaultSmartLifecyclephase for listener containers 2147483547.protected final AtomicBooleanprotected final ReentrantLockprotected final LogAccessor -
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedAbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) Construct an instance with the provided factory and properties. -
Method Summary
Modifier and TypeMethodDescriptionvoidprotected voidprotected final org.apache.kafka.clients.consumer.ConsumerRebalanceListenerReturn default implementation ofConsumerRebalanceListenerinstance.protected abstract voiddoStart()protected voidprotected abstract voidStop the container normally or abnormally.AfterRollbackProcessor<? super K,? super V> Return the currently configuredAfterRollbackProcessor.protected ApplicationContextGet the event publisher.protected BatchInterceptor<K,V> Return the bean name.Get theCommonErrorHandler.Return the container properties for this container.Return thegroup.idproperty for this container whether specifically set on the container or via a consumer property on the consumer factory.Return theKafkaAdmin, used to find the cluster id for observation, if present.The 'id' attribute of a@KafkaListeneror the bean name for spring-managed containers.byte[]Get arbitrary static information that will be added to theKafkaHeaders.LISTENER_INFOheader of all records.The 'id' attribute of the main@KafkaListenercontainer, if this container is for a retry topic; null otherwise.intgetPhase()protected RecordInterceptor<K,V> Return the function used to change the consumer thread name.booleanbooleanReturn true if the container should change the consumer thread name during initialization.protected booleanbooleanisPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition) Whether or not this topic's partition pause has been requested.protected booleanisPaused()Deprecated, for removal: This API element is subject to removal in a future version.booleanReturn true ifMessageListenerContainer.pause()has been called; the container might not have actually paused yet.booleanprotected booleanprotected AbstractMessageListenerContainer<?,?> Return this or a parent container if this has a parent.voidpause()Pause this container before the next poll().voidpausePartition(org.apache.kafka.common.TopicPartition topicPartition) Pause this partition before the next poll().protected PropertiesMake any default consumer override properties explicit properties.protected voidvoidresume()Resume this container, if paused, after the next poll().voidresumePartition(org.apache.kafka.common.TopicPartition topicPartition) Resume this partition, if paused, after the next poll().voidsetAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) Set a processor to perform seeks on unprocessed records after a rollback.voidsetApplicationContext(ApplicationContext applicationContext) voidsetApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) voidsetAutoStartup(boolean autoStartup) Set the autoStartup.voidsetBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) Set an interceptor to be called before calling the record listener.voidsetBeanName(String name) voidsetChangeConsumerThreadName(boolean changeConsumerThreadName) Set to true to instruct the container to change the consumer thread name during initialization.voidsetCommonErrorHandler(CommonErrorHandler commonErrorHandler) Set theCommonErrorHandlerwhich can handle errors for both record and batch listeners.protected voidsetFenced(boolean fenced) voidsetInterceptBeforeTx(boolean interceptBeforeTx) When false, invoke the interceptor after the transaction starts.voidsetKafkaAdmin(KafkaAdmin kafkaAdmin) Set theKafkaAdmin, used to find the cluster id for observation, if present.voidsetListenerInfo(byte[] listenerInfo) Set arbitrary information that will be added to theKafkaHeaders.LISTENER_INFOheader of all records.voidSet the main listener id, if this container is for a retry topic.voidsetPhase(int phase) voidsetRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) Set an interceptor to be called before calling the record listener.protected voidsetRunning(boolean running) protected voidsetStoppedNormally(boolean stoppedNormally) voidsetThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) Set a function used to change the consumer thread name.voidsetTopicCheckTimeout(int topicCheckTimeout) How long to wait forAdmin.describeTopics(Collection)result futures to complete.voidsetupMessageListener(Object messageListener) Setup the message listener to use.final voidstart()final voidstop()final voidstop(boolean wait) Stop the container.voidvoidstopAbnormally(Runnable callback) Stop the container after some exception so thatMessageListenerContainer.isInExpectedState()will return false.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
childStarted, childStopped, destroy, enforceRebalance, getAssignedPartitions, getAssignmentsByClientId, getContainerFor, isChildRunning, isContainerPaused, isInExpectedState, isPartitionPaused, metrics
-
Field Details
-
DEFAULT_PHASE
public static final int DEFAULT_PHASEThe defaultSmartLifecyclephase for listener containers 2147483547.- See Also:
-
logger
-
consumerFactory
-
lifecycleLock
-
enforceRebalanceRequested
-
-
Constructor Details
-
AbstractMessageListenerContainer
protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) Construct an instance with the provided factory and properties.- Parameters:
consumerFactory- the factory.containerProperties- the properties.
-
-
Method Details
-
setApplicationContext
- Specified by:
setApplicationContextin interfaceApplicationContextAware- Throws:
BeansException
-
getApplicationContext
-
setBeanName
- Specified by:
setBeanNamein interfaceBeanNameAware
-
getBeanName
Return the bean name.- Returns:
- the bean name.
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisherin interfaceApplicationEventPublisherAware
-
getApplicationEventPublisher
Get the event publisher.- Returns:
- the publisher
-
getCommonErrorHandler
Get theCommonErrorHandler.- Returns:
- the handler.
- Since:
- 2.8
-
setCommonErrorHandler
Set theCommonErrorHandlerwhich can handle errors for both record and batch listeners.- Parameters:
commonErrorHandler- the handler.- Since:
- 2.8
-
isStoppedNormally
protected boolean isStoppedNormally() -
setStoppedNormally
protected void setStoppedNormally(boolean stoppedNormally) -
isAutoStartup
public boolean isAutoStartup()- Specified by:
isAutoStartupin interfaceSmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup) Description copied from interface:MessageListenerContainerSet the autoStartup.- Specified by:
setAutoStartupin interfaceMessageListenerContainer- Parameters:
autoStartup- the autoStartup to set.- See Also:
-
setRunning
protected void setRunning(boolean running) -
isRunning
public boolean isRunning() -
setFenced
protected void setFenced(boolean fenced) -
isPaused
Deprecated, for removal: This API element is subject to removal in a future version. -
isPartitionPauseRequested
public boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition) Description copied from interface:MessageListenerContainerWhether or not this topic's partition pause has been requested.- Specified by:
isPartitionPauseRequestedin interfaceMessageListenerContainer- Parameters:
topicPartition- the topic partition to check- Returns:
- true if pause for this TopicPartition has been requested
-
pausePartition
public void pausePartition(org.apache.kafka.common.TopicPartition topicPartition) Description copied from interface:MessageListenerContainerPause this partition before the next poll(). This is a thread-safe operation, the actual pause is processed by the consumer thread.- Specified by:
pausePartitionin interfaceMessageListenerContainer- Parameters:
topicPartition- the topicPartition to pause.
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition) Description copied from interface:MessageListenerContainerResume this partition, if paused, after the next poll(). This is a thread-safe operation, the actual pause is processed by the consumer thread.- Specified by:
resumePartitionin interfaceMessageListenerContainer- Parameters:
topicPartition- the topicPartition to resume.
-
isPauseRequested
public boolean isPauseRequested()Description copied from interface:MessageListenerContainerReturn true ifMessageListenerContainer.pause()has been called; the container might not have actually paused yet.- Specified by:
isPauseRequestedin interfaceMessageListenerContainer- Returns:
- true if pause has been requested.
-
setPhase
public void setPhase(int phase) -
getPhase
public int getPhase()- Specified by:
getPhasein interfacePhased- Specified by:
getPhasein interfaceSmartLifecycle
-
getAfterRollbackProcessor
Return the currently configuredAfterRollbackProcessor.- Returns:
- the after rollback processor.
- Since:
- 2.2.14
-
setAfterRollbackProcessor
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) Set a processor to perform seeks on unprocessed records after a rollback. Default will seek to current position all topics/partitions, including the failed record.- Parameters:
afterRollbackProcessor- the processor.- Since:
- 1.3.5
-
getContainerProperties
Description copied from interface:MessageListenerContainerReturn the container properties for this container.- Specified by:
getContainerPropertiesin interfaceMessageListenerContainer- Returns:
- the properties.
-
getGroupId
Description copied from interface:MessageListenerContainerReturn thegroup.idproperty for this container whether specifically set on the container or via a consumer property on the consumer factory.- Specified by:
getGroupIdin interfaceMessageListenerContainer- Returns:
- the group id.
-
getListenerId
Description copied from interface:MessageListenerContainerThe 'id' attribute of a@KafkaListeneror the bean name for spring-managed containers.- Specified by:
getListenerIdin interfaceMessageListenerContainer- Returns:
- the id or bean name.
-
setMainListenerId
Set the main listener id, if this container is for a retry topic.- Parameters:
id- the id.- Since:
- 3.0.
-
getMainListenerId
Description copied from interface:MessageListenerContainerThe 'id' attribute of the main@KafkaListenercontainer, if this container is for a retry topic; null otherwise.- Specified by:
getMainListenerIdin interfaceMessageListenerContainer- Returns:
- the id.
-
getListenerInfo
Description copied from interface:MessageListenerContainerGet arbitrary static information that will be added to theKafkaHeaders.LISTENER_INFOheader of all records.- Specified by:
getListenerInfoin interfaceMessageListenerContainer- Returns:
- the info.
-
setListenerInfo
Set arbitrary information that will be added to theKafkaHeaders.LISTENER_INFOheader of all records.- Parameters:
listenerInfo- the info.- Since:
- 2.8.4
-
setTopicCheckTimeout
public void setTopicCheckTimeout(int topicCheckTimeout) How long to wait forAdmin.describeTopics(Collection)result futures to complete.- Parameters:
topicCheckTimeout- the timeout in seconds; default 30.- Since:
- 2.3
-
isChangeConsumerThreadName
public boolean isChangeConsumerThreadName()Return true if the container should change the consumer thread name during initialization.- Returns:
- true to change.
- Since:
- 3.0.1
-
setChangeConsumerThreadName
public void setChangeConsumerThreadName(boolean changeConsumerThreadName) Set to true to instruct the container to change the consumer thread name during initialization.- Parameters:
changeConsumerThreadName- true to change.- Since:
- 3.0.1
- See Also:
-
getThreadNameSupplier
Return the function used to change the consumer thread name.- Returns:
- the function.
- Since:
- 3.0.1
-
setThreadNameSupplier
Set a function used to change the consumer thread name. The default returns the containerlistenerId.- Parameters:
threadNameSupplier- the function.- Since:
- 3.0.1
- See Also:
-
getKafkaAdmin
Return theKafkaAdmin, used to find the cluster id for observation, if present.- Returns:
- the kafkaAdmin
- Since:
- 3.0.5
-
setKafkaAdmin
Set theKafkaAdmin, used to find the cluster id for observation, if present.- Parameters:
kafkaAdmin- the admin.
-
getRecordInterceptor
-
setRecordInterceptor
Set an interceptor to be called before calling the record listener. Does not apply to batch listeners.- Parameters:
recordInterceptor- the interceptor.- Since:
- 2.2.7
- See Also:
-
getBatchInterceptor
-
setBatchInterceptor
Set an interceptor to be called before calling the record listener.- Parameters:
batchInterceptor- the interceptor.- Since:
- 2.6.6
- See Also:
-
isInterceptBeforeTx
protected boolean isInterceptBeforeTx() -
setInterceptBeforeTx
public void setInterceptBeforeTx(boolean interceptBeforeTx) When false, invoke the interceptor after the transaction starts.- Parameters:
interceptBeforeTx- false to intercept within the transaction. Default true since 2.8.- Since:
- 2.3.4
- See Also:
-
setupMessageListener
Description copied from interface:MessageListenerContainerSetup the message listener to use. Throws anIllegalArgumentExceptionif that message listener type is not supported.- Specified by:
setupMessageListenerin interfaceMessageListenerContainer- Parameters:
messageListener- theobjectto wrapped to theMessageListener.
-
start
public final void start() -
checkTopics
protected void checkTopics() -
checkGroupId
public void checkGroupId() -
doStart
protected abstract void doStart() -
stop
public final void stop() -
stop
public final void stop(boolean wait) Stop the container.- Parameters:
wait- wait for the listener to terminate.- Since:
- 2.3.8
-
pause
public void pause()Description copied from interface:MessageListenerContainerPause this container before the next poll(). This is a thread-safe operation, the actual pause is processed by the consumer thread.- Specified by:
pausein interfaceMessageListenerContainer- See Also:
-
KafkaConsumer.pause(Collection)
-
resume
public void resume()Description copied from interface:MessageListenerContainerResume this container, if paused, after the next poll(). This is a thread-safe operation, the actual resume is processed by the consumer thread.- Specified by:
resumein interfaceMessageListenerContainer- See Also:
-
KafkaConsumer.resume(Collection)
-
stop
- Specified by:
stopin interfaceSmartLifecycle
-
stopAbnormally
Description copied from interface:MessageListenerContainerStop the container after some exception so thatMessageListenerContainer.isInExpectedState()will return false.- Specified by:
stopAbnormallyin interfaceMessageListenerContainer- Parameters:
callback- the callback.- See Also:
-
doStop
-
doStop
Stop the container normally or abnormally.- Parameters:
callback- the callback.normal- true for an expected stop.- Since:
- 2.8
-
createSimpleLoggingConsumerRebalanceListener
protected final org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener()Return default implementation ofConsumerRebalanceListenerinstance.- Returns:
- the
ConsumerRebalanceListenercurrently assigned to this container.
-
publishContainerStoppedEvent
protected void publishContainerStoppedEvent() -
parentOrThis
Return this or a parent container if this has a parent.- Returns:
- the parent or this.
- Since:
- 2.2.1
-
propertiesFromConsumerPropertyOverrides
Make any default consumer override properties explicit properties.- Returns:
- the properties.
- Since:
- 2.9.11
-