Class AbstractMessageListenerContainer<K,V>
- java.lang.Object
-
- org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
-
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, GenericMessageListenerContainer<K,V>, MessageListenerContainer
- Direct Known Subclasses:
ConcurrentMessageListenerContainer, KafkaMessageListenerContainer
public abstract class AbstractMessageListenerContainer<K,V> extends java.lang.Object implements GenericMessageListenerContainer<K,V>, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.ApplicationContextAware
The base implementation for the MessageListenerContainer.
- Author:
- Gary Russell, Marius Bogoevici, Artem Bilan, Tomaz Fernandes
-
-
Field Summary
Fields
Modifier and Type, Field, Description
protected ConsumerFactory<K,V> consumerFactory
static int DEFAULT_PHASE - The default SmartLifecycle phase for listener containers: 2147483547.
protected java.lang.Object lifecycleMonitor
protected org.springframework.core.log.LogAccessor logger
-
Constructor Summary
Constructors
Modifier, Constructor, Description
protected AbstractMessageListenerContainer(ConsumerFactory<? super K,? super V> consumerFactory, ContainerProperties containerProperties) - Construct an instance with the provided factory and properties.
-
Method Summary
All Methods
Modifier and Type, Method, Description
void checkGroupId()
protected void checkTopics()
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener() - Return default implementation of ConsumerRebalanceListener instance.
protected abstract void doStart()
protected abstract void doStop(java.lang.Runnable callback)
AfterRollbackProcessor<? super K,? super V> getAfterRollbackProcessor() - Return the currently configured AfterRollbackProcessor.
protected org.springframework.context.ApplicationContext getApplicationContext()
org.springframework.context.ApplicationEventPublisher getApplicationEventPublisher() - Get the event publisher.
protected BatchInterceptor<K,V> getBatchInterceptor()
java.lang.String getBeanName() - Return the bean name.
CommonErrorHandler getCommonErrorHandler() - Get the CommonErrorHandler.
ContainerProperties getContainerProperties() - Return the container properties for this container.
GenericErrorHandler<?> getGenericErrorHandler() - Get the configured error handler.
java.lang.String getGroupId() - Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
java.lang.String getListenerId() - The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
int getPhase()
protected RecordInterceptor<K,V> getRecordInterceptor()
boolean isAutoStartup()
protected boolean isInterceptBeforeTx()
boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition) - Whether or not this topic's partition pause has been requested.
protected boolean isPaused()
boolean isPauseRequested() - Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
boolean isRunning()
protected AbstractMessageListenerContainer<?,?> parentOrThis() - Return this or a parent container if this has a parent.
void pause() - Pause this container before the next poll().
void pausePartition(org.apache.kafka.common.TopicPartition topicPartition) - Pause this partition before the next poll().
protected void publishContainerStoppedEvent()
void resume() - Resume this container, if paused, after the next poll().
void resumePartition(org.apache.kafka.common.TopicPartition topicPartition) - Resume this partition, if paused, after the next poll().
void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor) - Set a processor to perform seeks on unprocessed records after a rollback.
void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
void setAutoStartup(boolean autoStartup) - Set the autoStartup.
void setBatchErrorHandler(BatchErrorHandler errorHandler) - Set the batch error handler to call when the listener throws an exception.
void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor) - Set an interceptor to be called before calling the batch listener.
void setBeanName(java.lang.String name)
void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) - Set the CommonErrorHandler, which can handle errors for both record and batch listeners.
void setErrorHandler(ErrorHandler errorHandler) - Set the error handler to call when the listener throws an exception.
void setGenericErrorHandler(GenericErrorHandler<?> errorHandler) - Set the error handler to call when the listener throws an exception.
void setInterceptBeforeTx(boolean interceptBeforeTx) - When false, invoke the interceptor after the transaction starts.
void setPhase(int phase)
void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor) - Set an interceptor to be called before calling the record listener.
protected void setRunning(boolean running)
void setTopicCheckTimeout(int topicCheckTimeout) - How long to wait for Admin.describeTopics(Collection) result futures to complete.
void setupMessageListener(java.lang.Object messageListener) - Setup the message listener to use.
void start()
void stop()
void stop(boolean wait) - Stop the container.
void stop(java.lang.Runnable callback)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
getAssignedPartitions, getAssignmentsByClientId, isChildRunning, isContainerPaused, isPartitionPaused, metrics
-
-
-
-
Field Detail
-
DEFAULT_PHASE
public static final int DEFAULT_PHASE
The default SmartLifecycle phase for listener containers: 2147483547.
- See Also:
- Constant Field Values
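The constant value 2147483547 equals Integer.MAX_VALUE - 100, which places listener containers very late in the SmartLifecycle ordering (lower phases start earlier and stop later). The following is a minimal, self-contained sketch of that ordering; the class PhaseOrderSketch, the Component type, and the component names are hypothetical and are not part of this API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PhaseOrderSketch {

    // Same numeric value as the documented DEFAULT_PHASE: 2147483547.
    static final int LATE_PHASE = Integer.MAX_VALUE - 100;

    // Hypothetical stand-in for a phased component: a name plus a phase.
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    // Lower phases start first, so sort ascending by phase.
    static List<String> startOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt((Component c) -> c.phase));
        List<String> names = new ArrayList<>();
        for (Component c : sorted) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Component> components = new ArrayList<>();
        components.add(new Component("listenerContainer", LATE_PHASE));
        components.add(new Component("dataSource", 0));
        System.out.println(LATE_PHASE);
        System.out.println(startOrder(components));
    }
}
```

In this sketch the hypothetical "dataSource" component starts before the listener container, which is the usual reason for giving consumers a late phase.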
-
logger
protected final org.springframework.core.log.LogAccessor logger
-
consumerFactory
protected final ConsumerFactory<K,V> consumerFactory
-
lifecycleMonitor
protected final java.lang.Object lifecycleMonitor
-
-
Constructor Detail
-
AbstractMessageListenerContainer
protected AbstractMessageListenerContainer(ConsumerFactory<? super K,? super V> consumerFactory, ContainerProperties containerProperties)
Construct an instance with the provided factory and properties.
- Parameters:
consumerFactory - the factory.
containerProperties - the properties.
-
-
Method Detail
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
- Specified by:
setApplicationContext in interface org.springframework.context.ApplicationContextAware
- Throws:
org.springframework.beans.BeansException
-
getApplicationContext
@Nullable protected org.springframework.context.ApplicationContext getApplicationContext()
-
setBeanName
public void setBeanName(java.lang.String name)
- Specified by:
setBeanName in interface org.springframework.beans.factory.BeanNameAware
-
getBeanName
@Nullable public java.lang.String getBeanName()
Return the bean name.
- Returns:
- the bean name.
-
setApplicationEventPublisher
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
- Specified by:
setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
-
getApplicationEventPublisher
@Nullable public org.springframework.context.ApplicationEventPublisher getApplicationEventPublisher()
Get the event publisher.
- Returns:
- the publisher
-
setErrorHandler
public void setErrorHandler(ErrorHandler errorHandler)
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setGenericErrorHandler
public void setGenericErrorHandler(GenericErrorHandler<?> errorHandler)
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setBatchErrorHandler
public void setBatchErrorHandler(BatchErrorHandler errorHandler)
Set the batch error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
getGenericErrorHandler
@Nullable public GenericErrorHandler<?> getGenericErrorHandler()
Get the configured error handler.
- Returns:
- the error handler.
- Since:
- 2.2
-
getCommonErrorHandler
@Nullable public CommonErrorHandler getCommonErrorHandler()
Get the CommonErrorHandler.
- Returns:
- the handler.
- Since:
- 2.8
-
setCommonErrorHandler
public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler)
Set the CommonErrorHandler, which can handle errors for both record and batch listeners. Replaces the use of GenericErrorHandlers.
- Parameters:
commonErrorHandler - the handler.
- Since:
- 2.8
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface org.springframework.context.SmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
Description copied from interface: MessageListenerContainer
Set the autoStartup.
- Specified by:
setAutoStartup in interface MessageListenerContainer
- Parameters:
autoStartup - the autoStartup to set.
- See Also:
SmartLifecycle
-
setRunning
protected void setRunning(boolean running)
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface org.springframework.context.Lifecycle
-
isPaused
protected boolean isPaused()
-
isPartitionPauseRequested
public boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Whether or not this topic's partition pause has been requested.
- Specified by:
isPartitionPauseRequested in interface MessageListenerContainer
- Parameters:
topicPartition - the topic partition to check
- Returns:
- true if pause for this TopicPartition has been requested
-
pausePartition
public void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Pause this partition before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pausePartition in interface MessageListenerContainer
- Parameters:
topicPartition - the topicPartition to pause.
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Resume this partition, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resumePartition in interface MessageListenerContainer
- Parameters:
topicPartition - the topicPartition to resume.
-
isPauseRequested
public boolean isPauseRequested()
Description copied from interface: MessageListenerContainer
Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
- Specified by:
isPauseRequested in interface MessageListenerContainer
- Returns:
- true if pause has been requested.
-
setPhase
public void setPhase(int phase)
-
getPhase
public int getPhase()
- Specified by:
getPhase in interface org.springframework.context.Phased
- Specified by:
getPhase in interface org.springframework.context.SmartLifecycle
-
getAfterRollbackProcessor
public AfterRollbackProcessor<? super K,? super V> getAfterRollbackProcessor()
Return the currently configured AfterRollbackProcessor.
- Returns:
- the after rollback processor.
- Since:
- 2.2.14
-
setAfterRollbackProcessor
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
Set a processor to perform seeks on unprocessed records after a rollback. By default, all topics/partitions are sought to their current position, including the failed record.
- Parameters:
afterRollbackProcessor - the processor.
- Since:
- 1.3.5
-
getContainerProperties
public ContainerProperties getContainerProperties()
Description copied from interface: MessageListenerContainer
Return the container properties for this container.
- Specified by:
getContainerProperties in interface MessageListenerContainer
- Returns:
- the properties.
-
getGroupId
@Nullable public java.lang.String getGroupId()
Description copied from interface: MessageListenerContainer
Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
- Specified by:
getGroupId in interface MessageListenerContainer
- Returns:
- the group id.
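The two sources named above (a value set on the container, or a consumer property on the consumer factory) suggest a simple fallback lookup. The sketch below illustrates one plausible resolution order, assuming the container-level value takes precedence; the class GroupIdResolutionSketch and the method resolveGroupId are hypothetical helpers, not part of this class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class GroupIdResolutionSketch {

    // Hypothetical helper. Assumption: a group id set on the container wins;
    // otherwise fall back to the "group.id" consumer property of the factory.
    static String resolveGroupId(String containerGroupId, Map<String, Object> factoryConfig) {
        if (containerGroupId != null) {
            return containerGroupId;
        }
        Object fromFactory = factoryConfig.get("group.id");
        return fromFactory == null ? null : fromFactory.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("group.id", "factory-group");
        System.out.println(resolveGroupId("container-group", config));
        System.out.println(resolveGroupId(null, config));
        System.out.println(resolveGroupId(null, Collections.emptyMap()));
    }
}
```

The last call returns null, which matches the @Nullable return type declared on getGroupId().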
-
getListenerId
@Nullable public java.lang.String getListenerId()
Description copied from interface: MessageListenerContainer
The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
- Specified by:
getListenerId in interface MessageListenerContainer
- Returns:
- the id or bean name.
-
setTopicCheckTimeout
public void setTopicCheckTimeout(int topicCheckTimeout)
How long to wait for Admin.describeTopics(Collection) result futures to complete.
- Parameters:
topicCheckTimeout - the timeout in seconds; default 30.
- Since:
- 2.3
-
getRecordInterceptor
protected RecordInterceptor<K,V> getRecordInterceptor()
-
setRecordInterceptor
public void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
Set an interceptor to be called before calling the record listener. Does not apply to batch listeners.
- Parameters:
recordInterceptor - the interceptor.
- Since:
- 2.2.7
- See Also:
setInterceptBeforeTx(boolean)
-
getBatchInterceptor
protected BatchInterceptor<K,V> getBatchInterceptor()
-
setBatchInterceptor
public void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor)
Set an interceptor to be called before calling the batch listener.
- Parameters:
batchInterceptor - the interceptor.
- Since:
- 2.6.6
- See Also:
setInterceptBeforeTx(boolean)
-
isInterceptBeforeTx
protected boolean isInterceptBeforeTx()
-
setInterceptBeforeTx
public void setInterceptBeforeTx(boolean interceptBeforeTx)
When false, invoke the interceptor after the transaction starts.
- Parameters:
interceptBeforeTx - false to intercept within the transaction. Default true since 2.8.
- Since:
- 2.3.4
- See Also:
setRecordInterceptor(RecordInterceptor), setBatchInterceptor(BatchInterceptor)
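The effect of this flag is purely one of ordering: whether the interceptor runs before the transaction begins or inside it. The following is a simplified model of the two orderings, not the container's actual code; the class InterceptOrderSketch and the step names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class InterceptOrderSketch {

    // Hypothetical model: returns the order of steps for one delivery.
    static List<String> deliver(boolean interceptBeforeTx) {
        List<String> steps = new ArrayList<>();
        if (interceptBeforeTx) {
            steps.add("intercept");        // runs outside the transaction
        }
        steps.add("beginTransaction");
        if (!interceptBeforeTx) {
            steps.add("intercept");        // runs within the transaction
        }
        steps.add("invokeListener");
        steps.add("commitTransaction");
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(deliver(true));
        System.out.println(deliver(false));
    }
}
```

Intercepting within the transaction (false) would matter when the interceptor itself performs work that should commit or roll back together with the listener.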
-
setupMessageListener
public void setupMessageListener(java.lang.Object messageListener)
Description copied from interface: MessageListenerContainer
Setup the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
- Specified by:
setupMessageListener in interface MessageListenerContainer
- Parameters:
messageListener - the object to be wrapped as the MessageListener.
-
start
public final void start()
- Specified by:
start in interface org.springframework.context.Lifecycle
-
checkTopics
protected void checkTopics()
-
checkGroupId
public void checkGroupId()
-
doStart
protected abstract void doStart()
-
stop
public final void stop()
- Specified by:
stop in interface org.springframework.context.Lifecycle
-
stop
public final void stop(boolean wait)
Stop the container.
- Parameters:
wait - wait for the listener to terminate.
- Since:
- 2.3.8
-
pause
public void pause()
Description copied from interface: MessageListenerContainer
Pause this container before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pause in interface MessageListenerContainer
- See Also:
KafkaConsumer.pause(Collection)
-
resume
public void resume()
Description copied from interface: MessageListenerContainer
Resume this container, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resume in interface MessageListenerContainer
- See Also:
KafkaConsumer.resume(Collection)
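The distinction between requesting a pause and the consumer actually being paused can be modelled with a flag that any thread may set and that only the polling thread acts on. This is a minimal sketch of that idea, not the container's implementation: PauseRequestSketch, pollOnce() and isConsumerPaused() are hypothetical names, and the exact timing relative to poll() is not modelled.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class PauseRequestSketch {

    // May be set from any thread; read by the simulated consumer thread.
    private final AtomicBoolean pauseRequested = new AtomicBoolean();

    // Written only by the simulated consumer thread; volatile so that
    // other threads can observe the current state.
    private volatile boolean consumerPaused;

    void pause() {
        pauseRequested.set(true);
    }

    void resume() {
        pauseRequested.set(false);
    }

    boolean isPauseRequested() {
        return pauseRequested.get();
    }

    boolean isConsumerPaused() {
        return consumerPaused;
    }

    // One iteration of a hypothetical poll loop on the consumer thread:
    // the request is applied here, not at the moment pause() is called.
    void pollOnce() {
        consumerPaused = pauseRequested.get();
    }

    public static void main(String[] args) {
        PauseRequestSketch container = new PauseRequestSketch();
        container.pause();
        System.out.println(container.isPauseRequested() + " " + container.isConsumerPaused());
        container.pollOnce();
        System.out.println(container.isPauseRequested() + " " + container.isConsumerPaused());
    }
}
```

Between the call to pause() and the next loop iteration, the request flag is already true while the consumer is not yet paused, which is the window isPauseRequested() describes.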
-
stop
public void stop(java.lang.Runnable callback)
- Specified by:
stop in interface org.springframework.context.SmartLifecycle
-
doStop
protected abstract void doStop(java.lang.Runnable callback)
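The final start() and stop() methods combined with the abstract doStart() and doStop(Runnable) hooks follow the template method pattern: the base class owns the lifecycle checks and subclasses supply the actual work. The sketch below is a simplified, self-contained model of that shape; LifecycleTemplateSketch, BaseContainer and CountingContainer are hypothetical, and the "already running" guard and the direct callback invocation when not running are assumptions of the sketch.

```java
class LifecycleTemplateSketch {

    abstract static class BaseContainer {

        private final Object lifecycleMonitor = new Object();

        private volatile boolean running;

        // Template method: guards against double start, then delegates.
        public final void start() {
            synchronized (this.lifecycleMonitor) {
                if (!this.running) {
                    doStart();
                }
            }
        }

        // Delegates to the subclass when running; otherwise just signals completion.
        public void stop(Runnable callback) {
            synchronized (this.lifecycleMonitor) {
                if (this.running) {
                    doStop(callback);
                }
                else {
                    callback.run();
                }
            }
        }

        public boolean isRunning() {
            return this.running;
        }

        protected void setRunning(boolean running) {
            this.running = running;
        }

        protected abstract void doStart();

        protected abstract void doStop(Runnable callback);
    }

    // Hypothetical subclass that only counts how often doStart() is invoked.
    static class CountingContainer extends BaseContainer {

        int starts;

        @Override
        protected void doStart() {
            this.starts++;
            setRunning(true);
        }

        @Override
        protected void doStop(Runnable callback) {
            setRunning(false);
            callback.run(); // signal that shutdown has completed
        }
    }

    public static void main(String[] args) {
        CountingContainer container = new CountingContainer();
        container.start();
        container.start(); // ignored: already running
        System.out.println(container.starts + " " + container.isRunning());
        container.stop(() -> System.out.println("stopped"));
        System.out.println(container.isRunning());
    }
}
```

A subclass implementing doStop(Runnable) is expected to run the callback once shutdown has finished, so that a caller waiting on it can proceed.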
-
createSimpleLoggingConsumerRebalanceListener
protected final org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener()
Return default implementation of ConsumerRebalanceListener instance.
- Returns:
- the ConsumerRebalanceListener currently assigned to this container.
-
publishContainerStoppedEvent
protected void publishContainerStoppedEvent()
-
parentOrThis
protected AbstractMessageListenerContainer<?,?> parentOrThis()
Return this or a parent container if this has a parent.
- Returns:
- the parent or this.
- Since:
- 2.2.1
-
-