Class KafkaMessageListenerContainerSpec<K,V>
java.lang.Object
org.springframework.beans.factory.config.AbstractFactoryBean<T>
org.springframework.integration.dsl.IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
org.springframework.integration.kafka.dsl.KafkaMessageListenerContainerSpec<K,V>
- Type Parameters:
K
- the key type.V
- the value type.
- All Implemented Interfaces:
Aware
,BeanClassLoaderAware
,BeanFactoryAware
,DisposableBean
,FactoryBean<org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
,InitializingBean
,Lifecycle
,Phased
,SmartLifecycle
public class KafkaMessageListenerContainerSpec<K,V> extends IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
A helper class in the Builder pattern style to delegate options to the
ConcurrentMessageListenerContainer
.- Since:
- 5.4
- Author:
- Artem Bilan, Gary Russell
-
Field Summary
Fields inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
PARSER, target
-
Method Summary
Modifier and Type Method Description KafkaMessageListenerContainerSpec<K,V>
ackCount(int count)
Set the number of outstanding record count after which offsets should be committed whenContainerProperties.AckMode.COUNT
orContainerProperties.AckMode.COUNT_TIME
is being used.KafkaMessageListenerContainerSpec<K,V>
ackMode(org.springframework.kafka.listener.ContainerProperties.AckMode ackMode)
Set the ack mode to use when auto ack (in the configuration properties) is false.KafkaMessageListenerContainerSpec<K,V>
ackTime(long millis)
Set the time (ms) after which outstanding offsets should be committed whenContainerProperties.AckMode.TIME
orContainerProperties.AckMode.COUNT_TIME
is being used.KafkaMessageListenerContainerSpec<K,V>
commitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)
Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level.KafkaMessageListenerContainerSpec<K,V>
concurrency(int concurrency)
Specify a concurrency maximum number for theAbstractMessageListenerContainer
.KafkaMessageListenerContainerSpec<K,V>
consumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)
Set the user definedConsumerRebalanceListener
implementation.KafkaMessageListenerContainerSpec<K,V>
consumerTaskExecutor(AsyncListenableTaskExecutor consumerTaskExecutor)
Set the executor for threads that poll the consumer.KafkaMessageListenerContainerSpec<K,V>
errorHandler(org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler)
Deprecated.- will be replaced in 6.0 withCommonErrorHandler
.KafkaMessageListenerContainerSpec<K,V>
groupId(String groupId)
Set the group id for this container.KafkaMessageListenerContainerSpec<K,V>
id(String id)
Configure the component identifier.KafkaMessageListenerContainerSpec<K,V>
idleEventInterval(Long idleEventInterval)
Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.KafkaMessageListenerContainerSpec<K,V>
pollTimeout(long pollTimeout)
Set the max time to block in the consumer waiting for records.KafkaMessageListenerContainerSpec<K,V>
shutdownTimeout(long shutdownTimeout)
Set the timeout for shutting down the container.KafkaMessageListenerContainerSpec<K,V>
syncCommits(boolean syncCommits)
Set whether to call consumer.commitSync() or commitAsync() when the container is responsible for commits.Methods inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
_this, createInstance, destroyInstance, doGet, get, getId, getObjectType, getPhase, isAutoStartup, isRunning, start, stop, stop
Methods inherited from class org.springframework.beans.factory.config.AbstractFactoryBean
afterPropertiesSet, destroy, getBeanFactory, getBeanTypeConverter, getEarlySingletonInterfaces, getObject, isSingleton, setBeanClassLoader, setBeanFactory, setSingleton
-
Method Details
-
id
Description copied from class:IntegrationComponentSpec
Configure the component identifier. Used as thebeanName
to register the bean in the application context for this component.- Overrides:
id
in classIntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
- Parameters:
id
- the id.- Returns:
- the spec.
-
concurrency
Specify a concurrency maximum number for theAbstractMessageListenerContainer
.- Parameters:
concurrency
- the concurrency maximum number.- Returns:
- the spec.
- See Also:
ConcurrentMessageListenerContainer.setConcurrency(int)
-
errorHandler
@Deprecated public KafkaMessageListenerContainerSpec<K,V> errorHandler(org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler)Deprecated.- will be replaced in 6.0 withCommonErrorHandler
.Specify anErrorHandler
for theAbstractMessageListenerContainer
.- Parameters:
errorHandler
- theErrorHandler
.- Returns:
- the spec.
- See Also:
ErrorHandler
-
ackMode
public KafkaMessageListenerContainerSpec<K,V> ackMode(org.springframework.kafka.listener.ContainerProperties.AckMode ackMode)Set the ack mode to use when auto ack (in the configuration properties) is false.- RECORD: Ack after each record has been passed to the listener.
- BATCH: Ack after each batch of records received from the consumer has been passed to the listener
- TIME: Ack after this number of milliseconds; (should be greater than
#setPollTimeout(long) pollTimeout
. - COUNT: Ack after at least this number of records have been received
- MANUAL: Listener is responsible for acking - use a
AcknowledgingMessageListener
.
- Parameters:
ackMode
- theContainerProperties.AckMode
; default BATCH.- Returns:
- the spec.
- See Also:
ContainerProperties.AckMode
-
pollTimeout
Set the max time to block in the consumer waiting for records.- Parameters:
pollTimeout
- the timeout in ms; default 1000.- Returns:
- the spec.
- See Also:
ConsumerProperties.setPollTimeout(long)
-
ackCount
Set the number of outstanding record count after which offsets should be committed whenContainerProperties.AckMode.COUNT
orContainerProperties.AckMode.COUNT_TIME
is being used.- Parameters:
count
- the count- Returns:
- the spec.
- See Also:
ContainerProperties.setAckCount(int)
-
ackTime
Set the time (ms) after which outstanding offsets should be committed whenContainerProperties.AckMode.TIME
orContainerProperties.AckMode.COUNT_TIME
is being used. Should be larger than zero.- Parameters:
millis
- the time- Returns:
- the spec.
- See Also:
ContainerProperties.setAckTime(long)
-
consumerTaskExecutor
public KafkaMessageListenerContainerSpec<K,V> consumerTaskExecutor(AsyncListenableTaskExecutor consumerTaskExecutor)Set the executor for threads that poll the consumer.- Parameters:
consumerTaskExecutor
- the executor- Returns:
- the spec.
- See Also:
ContainerProperties.setConsumerTaskExecutor(AsyncListenableTaskExecutor)
-
shutdownTimeout
Set the timeout for shutting down the container. This is the maximum amount of time that the invocation to#stop(Runnable)
will block for, before returning.- Parameters:
shutdownTimeout
- the shutdown timeout.- Returns:
- the spec.
- See Also:
ContainerProperties.setShutdownTimeout(long)
-
consumerRebalanceListener
public KafkaMessageListenerContainerSpec<K,V> consumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)Set the user definedConsumerRebalanceListener
implementation.- Parameters:
consumerRebalanceListener
- theConsumerRebalanceListener
instance- Returns:
- the spec.
- See Also:
ConsumerProperties.setConsumerRebalanceListener(ConsumerRebalanceListener)
-
commitCallback
public KafkaMessageListenerContainerSpec<K,V> commitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level.- Parameters:
commitCallback
- the callback.- Returns:
- the spec.
- See Also:
ConsumerProperties.setCommitCallback(OffsetCommitCallback)
-
syncCommits
Set whether to call consumer.commitSync() or commitAsync() when the container is responsible for commits. Default true. See https://github.com/spring-projects/spring-kafka/issues/62 At the time of writing, async commits are not entirely reliable.- Parameters:
syncCommits
- true to use commitSync().- Returns:
- the spec.
- See Also:
ConsumerProperties.setSyncCommits(boolean)
-
idleEventInterval
Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.- Parameters:
idleEventInterval
- the interval.- Returns:
- the spec.
- See Also:
ContainerProperties.setIdleEventInterval(Long)
-
groupId
Set the group id for this container. Overrides anygroup.id
property provided by the consumer factory configuration.- Parameters:
groupId
- the group id.- Returns:
- the spec.
- See Also:
ConsumerProperties.setGroupId(String)
-