Class KafkaMessageListenerContainer<K,V>
java.lang.Object
org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
org.springframework.kafka.listener.KafkaMessageListenerContainer<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
Aware, BeanNameAware, DisposableBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle, ConsumerPauseResumeEventPublisher, GenericMessageListenerContainer<K,V>, MessageListenerContainer
public class KafkaMessageListenerContainer<K,V>
extends AbstractMessageListenerContainer<K,V>
implements ConsumerPauseResumeEventPublisher
Single-threaded message listener container using the Java Consumer, supporting auto-partition assignment or user-configured assignment. With the latter, initial partition offsets can be provided.
- Author:
- Gary Russell, Murali Reddy, Marius Bogoevici, Martin Dam, Artem Bilan, Loic Talhouarne, Vladimir Tsanev, Chen Binbin, Yang Qiju, Tom van den Berge, Lukasz Kaminski, Tomaz Fernandes, Francois Rosiere, Daniel Gentes, Soby Chacko, Wang Zhiyang, Raphael Rösch, Christian Mergenthaler, Mikael Carlstedt, Borahm Lee, Lokesh Alamuri, Sanghyeok An, Christian Fredriksson, Timofey Barabanov, Janek Lasocki-Biczysko
-
Field Summary
Fields inherited from class org.springframework.kafka.listener.AbstractMessageListenerContainer
consumerFactory, DEFAULT_PHASE, enforceRebalanceRequested, lifecycleLock, logger
-
Constructor Summary
Constructors
KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties)
    Construct an instance with the supplied configuration properties.
-
Method Summary
Modifier and Type, Method, Description
void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason)
    Notify a parent container that a child container has stopped.
protected void doStart()
protected void doStop(Runnable callback, boolean normal)
    Stop the container normally or abnormally.
void enforceRebalance()
    Alert the consumer to trigger an enforced rebalance.
Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
    Return the TopicPartitions currently assigned to this container, either explicitly or by Kafka; may be null if not assigned yet.
Map<String,Collection<org.apache.kafka.common.TopicPartition>> getAssignmentsByClientId()
    Return the assigned topics/partitions for this container, by client.id.
boolean isContainerPaused()
    Return true if MessageListenerContainer.pause() has been called and all consumers in this container have actually paused.
boolean isInExpectedState()
    Return true if the container is running, has never been started, or has been stopped.
boolean isPartitionPaused(org.apache.kafka.common.TopicPartition topicPartition)
    Whether or not this topic's partition is currently paused.
Map<String,Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metrics()
    Return metrics kept by this container's consumer(s), grouped by client-id.
protected AbstractMessageListenerContainer<?,?> parentOrThis()
    Return this or a parent container if this has a parent.
void pause()
    Pause this container before the next poll().
void publishConsumerPausedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions, String reason)
    Publish a consumer paused event.
void publishConsumerResumedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions)
    Publish a consumer resumed event.
void resume()
    Resume this container, if paused, after the next poll().
void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
    Resume this partition, if paused, after the next poll().
void setClientIdSuffix(String clientIdSuffix)
    Set a suffix to add to the client.id consumer property (if the consumer factory supports it).
void setEmergencyStop(Runnable emergencyStop)
    Set a Runnable to call whenever a fatal error occurs on the listener thread.
String toString()
Methods inherited from class org.springframework.kafka.listener.AbstractMessageListenerContainer
checkGroupId, checkTopics, createSimpleLoggingConsumerRebalanceListener, doStop, getAfterRollbackProcessor, getApplicationContext, getApplicationEventPublisher, getBatchInterceptor, getBeanName, getCommonErrorHandler, getContainerProperties, getGroupId, getKafkaAdmin, getListenerId, getListenerInfo, getMainListenerId, getPhase, getRecordInterceptor, getThreadNameSupplier, isAutoStartup, isChangeConsumerThreadName, isInterceptBeforeTx, isPartitionPauseRequested, isPaused, isPauseRequested, isRunning, isStoppedNormally, pausePartition, propertiesFromConsumerPropertyOverrides, publishContainerStoppedEvent, setAfterRollbackProcessor, setApplicationContext, setApplicationEventPublisher, setAutoStartup, setBatchInterceptor, setBeanName, setChangeConsumerThreadName, setCommonErrorHandler, setFenced, setInterceptBeforeTx, setKafkaAdmin, setListenerInfo, setMainListenerId, setPhase, setRecordInterceptor, setRunning, setStoppedNormally, setThreadNameSupplier, setTopicCheckTimeout, setupMessageListener, start, stop, stop, stop, stopAbnormally
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
childStarted, destroy, getContainerFor, isChildRunning
-
Constructor Details
-
KafkaMessageListenerContainer
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties)
Construct an instance with the supplied configuration properties.
- Parameters:
consumerFactory - the consumer factory.
containerProperties - the container properties.
-
-
Method Details
-
setEmergencyStop
public void setEmergencyStop(Runnable emergencyStop)
Set a Runnable to call whenever a fatal error occurs on the listener thread.
- Parameters:
emergencyStop - the Runnable.
- Since:
- 2.2.1
-
setClientIdSuffix
public void setClientIdSuffix(String clientIdSuffix)
Set a suffix to add to the client.id consumer property (if the consumer factory supports it).
- Parameters:
clientIdSuffix - the suffix to add.
- Since:
- 1.0.6
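To illustrate what adding a suffix to the client.id property can look like, here is a minimal plain-Java sketch. It is not Spring Kafka code: the class ClientIdSuffixSketch and the method withSuffix are hypothetical, and the sketch assumes the suffix is simply appended when a client.id is already configured. What actually happens depends on the consumer factory in use.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Spring Kafka. */
class ClientIdSuffixSketch {

    /**
     * Return a copy of the consumer properties with the suffix appended to
     * client.id. Assumption: plain concatenation, applied only when a
     * client.id is configured and a suffix is supplied.
     */
    static Map<String, Object> withSuffix(Map<String, Object> consumerProps, String clientIdSuffix) {
        // Copy so the caller's configuration is never mutated.
        Map<String, Object> copy = new HashMap<>(consumerProps);
        Object base = copy.get("client.id");
        if (base != null && clientIdSuffix != null) {
            copy.put("client.id", base + clientIdSuffix);
        }
        return copy;
    }
}
```

Under these assumptions, a configured client.id of orders and a suffix of -0 yield orders-0, which keeps the client ids of several containers sharing one configuration distinct.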
-
getAssignedPartitions
public Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
Return the TopicPartitions currently assigned to this container, either explicitly or by Kafka; may be null if not assigned yet.
- Specified by:
getAssignedPartitions in interface MessageListenerContainer
- Returns:
- the TopicPartitions currently assigned to this container, either explicitly or by Kafka; may be null if not assigned yet.
-
getAssignmentsByClientId
@Nullable public Map<String,Collection<org.apache.kafka.common.TopicPartition>> getAssignmentsByClientId()
Description copied from interface: MessageListenerContainer
Return the assigned topics/partitions for this container, by client.id.
- Specified by:
getAssignmentsByClientId in interface MessageListenerContainer
- Returns:
- the topics/partitions.
-
isContainerPaused
public boolean isContainerPaused()
Description copied from interface: MessageListenerContainer
Return true if MessageListenerContainer.pause() has been called and all consumers in this container have actually paused.
- Specified by:
isContainerPaused in interface MessageListenerContainer
- Returns:
- true if the container is paused.
-
isPartitionPaused
public boolean isPartitionPaused(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Whether or not this topic's partition is currently paused.
- Specified by:
isPartitionPaused in interface MessageListenerContainer
- Parameters:
topicPartition - the topic partition to check.
- Returns:
- true if this partition has been paused.
-
isInExpectedState
public boolean isInExpectedState()
Description copied from interface: MessageListenerContainer
Return true if the container is running, has never been started, or has been stopped.
- Specified by:
isInExpectedState in interface MessageListenerContainer
- Returns:
- true if the state is as expected.
- See Also:
-
enforceRebalance
public void enforceRebalance()
Description copied from interface: MessageListenerContainer
Alert the consumer to trigger an enforced rebalance. The rebalance is actually enforced when the next poll() operation is invoked.
- Specified by:
enforceRebalance in interface MessageListenerContainer
- See Also:
- KafkaConsumer.enforceRebalance()
-
pause
public void pause()
Description copied from interface: MessageListenerContainer
Pause this container before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pause in interface MessageListenerContainer
- Overrides:
pause in class AbstractMessageListenerContainer<K,V>
- See Also:
- KafkaConsumer.pause(Collection)
-
resume
public void resume()
Description copied from interface: MessageListenerContainer
Resume this container, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resume in interface MessageListenerContainer
- Overrides:
resume in class AbstractMessageListenerContainer<K,V>
- See Also:
- KafkaConsumer.resume(Collection)
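The pause() and resume() descriptions imply a deferred hand-off: the caller only records a request, and the consumer thread applies it around its next poll(). The plain-Java sketch below models that pattern under stated assumptions. It is not the container's implementation: DeferredPauseSketch, pollOnce() and the log entries are hypothetical names, and no Kafka consumer is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a container; not a Spring Kafka class. */
class DeferredPauseSketch {

    // Written by any thread through pause()/resume(); read by the polling loop.
    private final AtomicBoolean pauseRequested = new AtomicBoolean();

    // Changed only inside pollOnce(), i.e. on the single consumer thread.
    private volatile boolean consumerPaused;

    // Record of what the polling loop did, for inspection.
    private final List<String> log = new ArrayList<>();

    /** Thread-safe: records the request, does not touch the consumer. */
    void pause() {
        pauseRequested.set(true);
    }

    /** Thread-safe: clears the request, does not touch the consumer. */
    void resume() {
        pauseRequested.set(false);
    }

    boolean isPauseRequested() {
        return pauseRequested.get();
    }

    boolean isConsumerPaused() {
        return consumerPaused;
    }

    /** One iteration of the polling loop, run on the consumer thread. */
    void pollOnce() {
        // A pending pause is applied before the poll.
        if (pauseRequested.get() && !consumerPaused) {
            consumerPaused = true;
            log.add("paused");
        }
        log.add(consumerPaused ? "poll (no records)" : "poll (records)");
        // A pending resume is applied after the poll.
        if (!pauseRequested.get() && consumerPaused) {
            consumerPaused = false;
            log.add("resumed");
        }
    }

    List<String> log() {
        return log;
    }
}
```

In this model a pause request and the actual paused state are separate observations, which is the same distinction the API draws between isPauseRequested() and isContainerPaused(): the first can be true before the consumer thread has acted on it.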
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Resume this partition, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resumePartition in interface MessageListenerContainer
- Overrides:
resumePartition in class AbstractMessageListenerContainer<K,V>
- Parameters:
topicPartition - the topicPartition to resume.
-
metrics
public Map<String,Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metrics()
Description copied from interface: MessageListenerContainer
Return metrics kept by this container's consumer(s), grouped by client-id.
- Specified by:
metrics in interface MessageListenerContainer
- Returns:
- the consumer(s) metrics grouped by client-id
- See Also:
- Consumer.metrics()
-
doStart
protected void doStart()
- Specified by:
doStart in class AbstractMessageListenerContainer<K,V>
-
doStop
protected void doStop(Runnable callback, boolean normal)
Description copied from class: AbstractMessageListenerContainer
Stop the container normally or abnormally.
- Specified by:
doStop in class AbstractMessageListenerContainer<K,V>
- Parameters:
callback - the callback.
normal - true for an expected stop.
-
childStopped
public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason)
Description copied from interface: MessageListenerContainer
Notify a parent container that a child container has stopped.
- Specified by:
childStopped in interface MessageListenerContainer
- Parameters:
child - the container.
reason - the reason.
-
publishConsumerPausedEvent
public void publishConsumerPausedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions, String reason)
Description copied from interface: ConsumerPauseResumeEventPublisher
Publish a consumer paused event.
- Specified by:
publishConsumerPausedEvent in interface ConsumerPauseResumeEventPublisher
- Parameters:
partitions - the paused partitions.
reason - the reason.
-
publishConsumerResumedEvent
public void publishConsumerResumedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions)
Description copied from interface: ConsumerPauseResumeEventPublisher
Publish a consumer resumed event.
- Specified by:
publishConsumerResumedEvent in interface ConsumerPauseResumeEventPublisher
- Parameters:
partitions - the resumed partitions.
-
parentOrThis
protected AbstractMessageListenerContainer<?,?> parentOrThis()
Description copied from class: AbstractMessageListenerContainer
Return this or a parent container if this has a parent.
- Overrides:
parentOrThis in class AbstractMessageListenerContainer<K,V>
- Returns:
- the parent or this.
-
toString
public String toString()
- Overrides:
toString in class Object
-