Class ConcurrentMessageListenerContainer<K,V>
java.lang.Object
org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
Aware, BeanNameAware, DisposableBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle, GenericMessageListenerContainer<K,V>, MessageListenerContainer
Creates 1 or more KafkaMessageListenerContainers based on concurrency. If the ContainerProperties is configured with TopicPartitions, the TopicPartitions are distributed evenly across the instances.
- Author:
Marius Bogoevici, Gary Russell, Murali Reddy, Jerome Mirc, Artem Bilan, Vladimir Tsanev, Tomaz Fernandes
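The class description says that explicitly configured TopicPartitions are distributed evenly across the child instances. The following is a minimal, stdlib-only sketch of one way such an even split could be computed. The class name `PartitionDistribution` and the method `distribute` are hypothetical, and the exact grouping used by the library may differ (for example, in which child receives the remainder).

```java
import java.util.ArrayList;
import java.util.List;

class PartitionDistribution {

    /**
     * Splits the partitions into contiguous groups whose sizes differ by at most one.
     * Hypothetical helper for illustration; not the library's own code.
     */
    static <T> List<List<T>> distribute(List<T> partitions, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        // Assumption: never create more groups than there are partitions.
        int groups = Math.max(1, Math.min(concurrency, partitions.size()));
        int base = partitions.size() / groups;   // minimum group size
        int extra = partitions.size() % groups;  // the first 'extra' groups get one more
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < groups; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(partitions.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("orders-0", "orders-1", "orders-2", "orders-3", "orders-4");
        // prints [[orders-0, orders-1, orders-2], [orders-3, orders-4]]
        System.out.println(distribute(partitions, 2));
    }
}
```

With five partitions and a concurrency of 2, the sketch yields one group of three and one group of two, so no child is assigned more than one partition above any other.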
-
Field Summary
Fields inherited from class org.springframework.kafka.listener.AbstractMessageListenerContainer
consumerFactory, DEFAULT_PHASE, lifecycleMonitor, logger
-
Constructor Summary
Constructor and Description
ConcurrentMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties)
    Construct an instance with the supplied configuration properties.
-
Method Summary
Modifier and Type, Method, and Description
void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason)
    Notify a parent container that a child container has stopped.
protected void doStart()
protected void doStop(Runnable callback, boolean normal)
    Stop the container normally or abnormally.
Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
    Return the assigned topics/partitions for this container.
Map<String, Collection<org.apache.kafka.common.TopicPartition>> getAssignmentsByClientId()
    Return the assigned topics/partitions for this container, by client.id.
int getConcurrency()
MessageListenerContainer getContainerFor(String topic, int partition)
    If this container has child containers, return the child container that is assigned the topic/partition.
List<KafkaMessageListenerContainer<K,V>> getContainers()
    Return the list of KafkaMessageListenerContainers created by this container.
boolean isChildRunning()
    If this container has child containers, return true if at least one child is running.
boolean isContainerPaused()
    Return true if MessageListenerContainer.pause() has been called and all consumers in this container have actually paused.
boolean isInExpectedState()
    Return true if the container is running, has never been started, or has been stopped.
boolean isPartitionPaused(org.apache.kafka.common.TopicPartition topicPartition)
    Whether or not this topic's partition is currently paused.
Map<String, Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric>> metrics()
    Return metrics kept by this container's consumer(s), grouped by client-id.
void pause()
    Pause this container before the next poll().
void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
    Pause this partition before the next poll().
void resume()
    Resume this container, if paused, after the next poll().
void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
    Resume this partition, if paused, after the next poll().
void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix)
    Set to false to suppress adding a suffix to the child container's client.id when the concurrency is only 1.
void setConcurrency(int concurrency)
    The maximum number of concurrent KafkaMessageListenerContainers running.
String toString()
Methods inherited from class org.springframework.kafka.listener.AbstractMessageListenerContainer
checkGroupId, checkTopics, createSimpleLoggingConsumerRebalanceListener, doStop, getAfterRollbackProcessor, getApplicationContext, getApplicationEventPublisher, getBatchInterceptor, getBeanName, getCommonErrorHandler, getContainerProperties, getGenericErrorHandler, getGroupId, getKafkaAdmin, getListenerId, getListenerInfo, getMainListenerId, getPhase, getRecordInterceptor, getThreadNameSupplier, isAutoStartup, isChangeConsumerThreadName, isInterceptBeforeTx, isPartitionPauseRequested, isPaused, isPauseRequested, isRunning, isStoppedNormally, parentOrThis, publishContainerStoppedEvent, setAfterRollbackProcessor, setApplicationContext, setApplicationEventPublisher, setAutoStartup, setBatchErrorHandler, setBatchInterceptor, setBeanName, setChangeConsumerThreadName, setCommonErrorHandler, setErrorHandler, setGenericErrorHandler, setInterceptBeforeTx, setKafkaAdmin, setListenerInfo, setMainListenerId, setPhase, setRecordInterceptor, setRunning, setStoppedNormally, setThreadNameSupplier, setTopicCheckTimeout, setupMessageListener, start, stop, stop, stop, stopAbnormally
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
destroy
-
Constructor Details
-
ConcurrentMessageListenerContainer
public ConcurrentMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties)
Construct an instance with the supplied configuration properties. The topic partitions are distributed evenly across the delegate KafkaMessageListenerContainers.
- Parameters:
consumerFactory - the consumer factory.
containerProperties - the container properties.
-
-
Method Details
-
getConcurrency
public int getConcurrency()
-
setConcurrency
public void setConcurrency(int concurrency)
The maximum number of concurrent KafkaMessageListenerContainers running. Messages from within the same partition will be processed sequentially.
- Parameters:
concurrency - the concurrency.
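To illustrate why messages within one partition are processed sequentially even when concurrency is greater than 1, here is a stdlib-only sketch that models each child container as a single-threaded worker. The class `PerPartitionOrdering`, the method `process`, and the modulo-based partition-to-child mapping are all assumptions made for illustration; they are not part of the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerPartitionOrdering {

    /**
     * Feeds recordsPerPartition values (0, 1, 2, ...) for each partition to
     * 'concurrency' single-threaded workers and returns, per partition, the
     * order in which the values were processed.
     */
    static List<List<Integer>> process(int partitions, int concurrency, int recordsPerPartition) {
        List<ExecutorService> children = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            children.add(Executors.newSingleThreadExecutor()); // one thread per modeled child
        }
        List<List<Integer>> seen = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            seen.add(Collections.synchronizedList(new ArrayList<>()));
        }
        for (int offset = 0; offset < recordsPerPartition; offset++) {
            for (int p = 0; p < partitions; p++) {
                final int value = offset;
                final List<Integer> log = seen.get(p);
                // Assumption: each partition is owned by exactly one child.
                children.get(p % concurrency).execute(() -> log.add(value));
            }
        }
        try {
            for (ExecutorService child : children) {
                child.shutdown();
                child.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Each inner list is in offset order, although the two workers run in parallel.
        System.out.println(process(4, 2, 5));
    }
}
```

Because every partition is handled by a single thread in this model, ordering holds per partition, while different partitions may interleave freely.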
-
setAlwaysClientIdSuffix
public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix)
Set to false to suppress adding a suffix to the child container's client.id when the concurrency is only 1.
- Parameters:
alwaysClientIdSuffix - false to suppress, true (default) to include.
- Since:
2.2.14
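The rule described for setAlwaysClientIdSuffix can be sketched as a small decision function. This is only an illustration: the class `ClientIdSuffix`, the method `childClientId`, and the suffix format `-<index>` are assumptions, not something this page specifies.

```java
class ClientIdSuffix {

    /**
     * Returns the client.id a child might use. The "-<index>" suffix format
     * is assumed for illustration.
     */
    static String childClientId(String baseClientId, int index, int concurrency,
                                boolean alwaysClientIdSuffix) {
        // The suffix is suppressed only when the flag is false AND concurrency is 1.
        boolean addSuffix = alwaysClientIdSuffix || concurrency > 1;
        return addSuffix ? baseClientId + "-" + index : baseClientId;
    }

    public static void main(String[] args) {
        System.out.println(childClientId("app", 0, 1, true));   // prints app-0
        System.out.println(childClientId("app", 0, 1, false));  // prints app
        System.out.println(childClientId("app", 2, 3, false));  // prints app-2
    }
}
```

Note that with a concurrency above 1 the flag has no effect in this sketch, since distinct children need distinct client ids.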
-
getContainers
public List<KafkaMessageListenerContainer<K,V>> getContainers()
Return the list of KafkaMessageListenerContainers created by this container.
- Returns:
the list of KafkaMessageListenerContainers created by this container.
-
getContainerFor
public MessageListenerContainer getContainerFor(String topic, int partition)
Description copied from interface: MessageListenerContainer
If this container has child containers, return the child container that is assigned the topic/partition. Return this when there are no child containers.
- Parameters:
topic - the topic.
partition - the partition.
- Returns:
the container.
-
getAssignedPartitions
public Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
Description copied from interface: MessageListenerContainer
Return the assigned topics/partitions for this container.
- Returns:
the topics/partitions.
-
getAssignmentsByClientId
public Map<String, Collection<org.apache.kafka.common.TopicPartition>> getAssignmentsByClientId()
Description copied from interface: MessageListenerContainer
Return the assigned topics/partitions for this container, by client.id.
- Returns:
the topics/partitions.
-
isContainerPaused
public boolean isContainerPaused()
Description copied from interface: MessageListenerContainer
Return true if MessageListenerContainer.pause() has been called and all consumers in this container have actually paused.
- Returns:
true if the container is paused.
-
isChildRunning
public boolean isChildRunning()
Description copied from interface: MessageListenerContainer
If this container has child containers, return true if at least one child is running. If there are no child containers, returns Lifecycle.isRunning().
- Returns:
true if a child is running.
-
metrics
public Map<String, Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric>> metrics()
Description copied from interface: MessageListenerContainer
Return metrics kept by this container's consumer(s), grouped by client-id.
- Returns:
the consumer(s) metrics grouped by client-id
- See Also:
Consumer.metrics()
-
doStart
protected void doStart()
- Specified by:
doStart in class AbstractMessageListenerContainer<K,V>
-
doStop
protected void doStop(Runnable callback, boolean normal)
Description copied from class: AbstractMessageListenerContainer
Stop the container normally or abnormally.
- Specified by:
doStop in class AbstractMessageListenerContainer<K,V>
- Parameters:
callback - the callback.
normal - true for an expected stop.
-
childStopped
public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason)
Description copied from interface: MessageListenerContainer
Notify a parent container that a child container has stopped.
- Parameters:
child - the container.
reason - the reason.
-
pause
public void pause()
Description copied from interface: MessageListenerContainer
Pause this container before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pause in interface MessageListenerContainer
- Overrides:
pause in class AbstractMessageListenerContainer<K,V>
- See Also:
KafkaConsumer.pause(Collection)
-
resume
public void resume()
Description copied from interface: MessageListenerContainer
Resume this container, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resume in interface MessageListenerContainer
- Overrides:
resume in class AbstractMessageListenerContainer<K,V>
- See Also:
KafkaConsumer.resume(Collection)
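The pause() and resume() descriptions distinguish between requesting a state change (safe from any thread) and the consumer thread actually applying it around a poll(). The sketch below models that split with a flag and a hand-driven poll loop. Everything in it (`PauseSketch`, `pollOnce`, `isConsumerPaused`) is a hypothetical stand-in, not the container's real internals.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class PauseSketch {

    // Written by any thread; read by the polling thread.
    private final AtomicBoolean pauseRequested = new AtomicBoolean();

    // Written only by the polling thread; volatile so other threads can observe it.
    private volatile boolean consumerPaused;

    void pause() {
        pauseRequested.set(true);   // only records the request
    }

    void resume() {
        pauseRequested.set(false);  // only records the request
    }

    boolean isPauseRequested() {
        return pauseRequested.get();
    }

    boolean isConsumerPaused() {
        return consumerPaused;
    }

    /** One iteration of a hypothetical poll loop, run on the consumer thread. */
    void pollOnce() {
        if (pauseRequested.get() && !consumerPaused) {
            consumerPaused = true;   // pause applied before the poll
        }
        // A real loop would call consumer.poll(...) here; omitted in this sketch.
        if (!pauseRequested.get() && consumerPaused) {
            consumerPaused = false;  // resume applied after the poll
        }
    }

    public static void main(String[] args) {
        PauseSketch container = new PauseSketch();
        container.pause();
        System.out.println(container.isConsumerPaused()); // prints false: requested, not yet applied
        container.pollOnce();
        System.out.println(container.isConsumerPaused()); // prints true
    }
}
```

This is why a pause request and an actually paused container are reported separately: between the call and the next loop iteration, the request is pending but the consumer is still active.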
-
pausePartition
public void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Pause this partition before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pausePartition in interface MessageListenerContainer
- Overrides:
pausePartition in class AbstractMessageListenerContainer<K,V>
- Parameters:
topicPartition - the topicPartition to pause.
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Resume this partition, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resumePartition in interface MessageListenerContainer
- Overrides:
resumePartition in class AbstractMessageListenerContainer<K,V>
- Parameters:
topicPartition - the topicPartition to resume.
-
isPartitionPaused
public boolean isPartitionPaused(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Whether or not this topic's partition is currently paused.
- Parameters:
topicPartition - the topic partition to check
- Returns:
true if this partition has been paused.
-
isInExpectedState
public boolean isInExpectedState()
Description copied from interface: MessageListenerContainer
Return true if the container is running, has never been started, or has been stopped.
- Returns:
true if the state is as expected.
- See Also:
-
toString
-