Class KafkaMessageDrivenChannelAdapter<K,V>
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter<K,V>
- Type Parameters:
K
- the key type.V
- the value type.
- All Implemented Interfaces:
Aware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,SmartInitializingSingleton
,ApplicationContextAware
,Lifecycle
,Phased
,SmartLifecycle
,ExpressionCapable
,OrderlyShutdownCapable
,MessageProducer
,Pausable
,IntegrationPattern
,NamedComponent
,ManageableLifecycle
,ManageableSmartLifecycle
,TrackableComponent
public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements OrderlyShutdownCapable, Pausable
Message-driven channel adapter.
- Since:
- 5.4
- Author:
- Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
KafkaMessageDrivenChannelAdapter.ListenerMode
The listener mode for the container, record or batch. -
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
-
Constructor Summary
Constructors Constructor Description KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
Construct an instance with modeKafkaMessageDrivenChannelAdapter.ListenerMode.record
.KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
Construct an instance with the provided mode. -
Method Summary
Modifier and Type Method Description int
afterShutdown()
Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.int
beforeShutdown()
Called before shutdown begins.protected void
doStart()
Take no action by default.protected void
doStop()
Take no action by default.String
getComponentType()
Subclasses may implement this method to provide component type information.protected AttributeAccessor
getErrorMessageAttributes(Message<?> message)
Populate anAttributeAccessor
to be used when building an error message with theerrorMessageStrategy
.boolean
isPaused()
Check if the endpoint is paused.protected void
onInit()
Subclasses may implement this for initialization logic.void
pause()
Pause the endpoint.void
resume()
Resume the endpoint if paused.void
setAckDiscarded(boolean ackDiscarded)
Aboolean
flag to indicate ifFilteringMessageListenerAdapter
should acknowledge discarded records or not.void
setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
Set the message converter to use with a batch-based consumer.void
setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA
.void
setFilterInRetry(boolean filterInRetry)
Theboolean
flag to specify the order howRetryingMessageListenerAdapter
andFilteringMessageListenerAdapter
are wrapped to each other, if both of them are present.void
setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
Set the message converter; must be aRecordMessageConverter
orBatchMessageConverter
depending on mode.void
setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
Specify aBiConsumer
for seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
call from theKafkaMessageListenerContainer
.void
setPayloadType(Class<?> payloadType)
When using a type-aware message converter (such asStringJsonMessageConverter
, set the payload type the converter should create.void
setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
Specify aRecordFilterStrategy
to wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
intoFilteringMessageListenerAdapter
.void
setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set the message converter to use with a record-based consumer.void
setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
ARecoveryCallback
instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured).void
setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Specify aRetryTemplate
instance to wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
intoRetryingMessageListenerAdapter
.Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.ManageableLifecycle
isRunning, start, stop
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)Construct an instance with modeKafkaMessageDrivenChannelAdapter.ListenerMode.record
.- Parameters:
messageListenerContainer
- the container.
-
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)Construct an instance with the provided mode.- Parameters:
messageListenerContainer
- the container.mode
- the mode.- Since:
- 1.2
-
-
Method Details
-
setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)Set the message converter; must be aRecordMessageConverter
orBatchMessageConverter
depending on mode.- Parameters:
messageConverter
- the converter.
-
setRecordMessageConverter
public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)Set the message converter to use with a record-based consumer.- Parameters:
messageConverter
- the converter.- Since:
- 2.1
-
setBatchMessageConverter
public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)Set the message converter to use with a batch-based consumer.- Parameters:
messageConverter
- the converter.- Since:
- 2.1
-
setRecordFilterStrategy
public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)Specify aRecordFilterStrategy
to wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
intoFilteringMessageListenerAdapter
.- Parameters:
recordFilterStrategy
- theRecordFilterStrategy
to use.- Since:
- 2.0.1
-
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded)Aboolean
flag to indicate ifFilteringMessageListenerAdapter
should acknowledge discarded records or not. Does not make sense ifsetRecordFilterStrategy(RecordFilterStrategy)
isn't specified.- Parameters:
ackDiscarded
- true to ack (commit offset for) discarded messages.- Since:
- 2.0.1
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)Specify aRetryTemplate
instance to wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
intoRetryingMessageListenerAdapter
.- Parameters:
retryTemplate
- theRetryTemplate
to use.- Since:
- 2.0.1
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)ARecoveryCallback
instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Does not make sense ifsetRetryTemplate(RetryTemplate)
isn't specified.- Parameters:
recoveryCallback
- the recovery callback.- Since:
- 2.0.1
-
setFilterInRetry
public void setFilterInRetry(boolean filterInRetry)Theboolean
flag to specify the order howRetryingMessageListenerAdapter
andFilteringMessageListenerAdapter
are wrapped to each other, if both of them are present. Does not make sense if only one ofRetryTemplate
orRecordFilterStrategy
is present, or any.- Parameters:
filterInRetry
- the order forRetryingMessageListenerAdapter
andFilteringMessageListenerAdapter
wrapping. Defaults tofalse
.- Since:
- 2.0.1
-
setPayloadType
When using a type-aware message converter (such asStringJsonMessageConverter
, set the payload type the converter should create. Defaults toObject
.- Parameters:
payloadType
- the type.- Since:
- 2.1.1
-
setOnPartitionsAssignedSeekCallback
public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)Specify aBiConsumer
for seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
call from theKafkaMessageListenerContainer
. This is called from the internalMessagingMessageListenerAdapter
implementation.- Parameters:
onPartitionsAssignedCallback
- theBiConsumer
to use- Since:
- 3.0.4
- See Also:
ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
-
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord)Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA
. Does not apply to batch listeners.- Parameters:
bindSourceRecord
- true to bind.- Since:
- 3.1.4
-
getComponentType
Description copied from class:IntegrationObjectSupport
Subclasses may implement this method to provide component type information.- Specified by:
getComponentType
in interfaceNamedComponent
- Overrides:
getComponentType
in classIntegrationObjectSupport
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupport
Subclasses may implement this for initialization logic.- Overrides:
onInit
in classMessageProducerSupport
-
doStart
protected void doStart()Description copied from class:MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.- Overrides:
doStart
in classMessageProducerSupport
-
doStop
protected void doStop()Description copied from class:MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior.- Overrides:
doStop
in classMessageProducerSupport
-
pause
public void pause()Description copied from interface:Pausable
Pause the endpoint. -
resume
public void resume()Description copied from interface:Pausable
Resume the endpoint if paused. -
isPaused
public boolean isPaused()Description copied from interface:Pausable
Check if the endpoint is paused. -
beforeShutdown
public int beforeShutdown()Description copied from interface:OrderlyShutdownCapable
Called before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.- Specified by:
beforeShutdown
in interfaceOrderlyShutdownCapable
- Returns:
- The number of active messages if available.
-
afterShutdown
public int afterShutdown()Description copied from interface:OrderlyShutdownCapable
Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.- Specified by:
afterShutdown
in interfaceOrderlyShutdownCapable
- Returns:
- The number of active messages if available.
-
getErrorMessageAttributes
Description copied from class:MessageProducerSupport
Populate anAttributeAccessor
to be used when building an error message with theerrorMessageStrategy
.- Overrides:
getErrorMessageAttributes
in classMessageProducerSupport
- Parameters:
message
- the message.- Returns:
- the attributes.
-