Class KafkaMessageDrivenChannelAdapter<K,V> 
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter<K,V> 
- Type Parameters:
- K- the key type.
- V- the value type.
- All Implemented Interfaces:
- Aware,- BeanFactoryAware,- BeanNameAware,- DisposableBean,- InitializingBean,- SmartInitializingSingleton,- ApplicationContextAware,- Lifecycle,- Phased,- SmartLifecycle,- ExpressionCapable,- OrderlyShutdownCapable,- MessageProducer,- Pausable,- IntegrationPattern,- KafkaInboundEndpoint,- NamedComponent,- IntegrationInboundManagement,- IntegrationManagement,- ManageableLifecycle,- ManageableSmartLifecycle,- TrackableComponent
public class KafkaMessageDrivenChannelAdapter<K,V> 
extends MessageProducerSupport
implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable
Message-driven channel adapter.
- Since:
- 5.4
- Author:
- Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
- 
Nested Class SummaryNested ClassesModifier and TypeClassDescriptionstatic enumThe listener mode for the container, record or batch.Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagementIntegrationManagement.ManagementOverrides
- 
Field SummaryFields inherited from class org.springframework.integration.endpoint.AbstractEndpointlifecycleCondition, lifecycleLockFields inherited from class org.springframework.integration.context.IntegrationObjectSupportEXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagementMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpointCONTEXT_ACKNOWLEDGMENT, CONTEXT_CONSUMER, CONTEXT_RECORDFields inherited from interface org.springframework.context.SmartLifecycleDEFAULT_PHASE
- 
Constructor SummaryConstructorsConstructorDescriptionKafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) Construct an instance with modeKafkaMessageDrivenChannelAdapter.ListenerMode.record.KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) Construct an instance with the provided mode.
- 
Method SummaryModifier and TypeMethodDescriptionintCalled after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.intCalled before shutdown begins.protected voiddoStart()Take no action by default.protected voiddoStop()Take no action by default.Subclasses may implement this method to provide component type information.protected AttributeAccessorgetErrorMessageAttributes(Message<?> message) Populate anAttributeAccessorto be used when building an error message with theerrorMessageStrategy.booleanisPaused()Check if the endpoint is paused.protected voidonInit()Subclasses may implement this for initialization logic.voidpause()Pause the endpoint.voidresume()Resume the endpoint if paused.voidsetAckDiscarded(boolean ackDiscarded) Abooleanflag to indicate ifFilteringMessageListenerAdaptershould acknowledge discarded records or not.voidsetBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) Set the message converter to use with a batch-based consumer.voidsetBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA.voidsetFilterInRetry(boolean filterInRetry) Thebooleanflag to specify the order in which the filter and retry operations are performed.voidsetMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) Set the message converter; must be aRecordMessageConverterorBatchMessageConverterdepending on mode.voidsetOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumerfor seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)call from theKafkaMessageListenerContainer.voidsetPayloadType(Class<?> payloadType) When using a type-aware message converter such asStringJsonMessageConverter, set the payload type the converter should create.voidsetRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) Specify aRecordFilterStrategyto wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListenerintoFilteringMessageListenerAdapter.voidsetRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter to use with a record-based consumer.voidsetRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallbackinstance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured).voidsetRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplateinstance to use for retrying deliveries.Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupportafterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherMethods inherited from class org.springframework.integration.endpoint.AbstractEndpointdestroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopMethods inherited from class org.springframework.integration.context.IntegrationObjectSupportafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagementdestroy, getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedTypeMethods inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpointdoWithRetryMethods inherited from interface org.springframework.integration.support.management.ManageableLifecycleisRunning, start, stopMethods inherited from interface org.springframework.integration.support.context.NamedComponentgetBeanName, getComponentName
- 
Constructor Details- 
KafkaMessageDrivenChannelAdapterpublic KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) Construct an instance with modeKafkaMessageDrivenChannelAdapter.ListenerMode.record.- Parameters:
- messageListenerContainer- the container.
 
- 
KafkaMessageDrivenChannelAdapterpublic KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) Construct an instance with the provided mode.- Parameters:
- messageListenerContainer- the container.
- mode- the mode.
 
 
- 
- 
Method Details- 
setMessageConverterpublic void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) Set the message converter; must be aRecordMessageConverterorBatchMessageConverterdepending on mode.- Parameters:
- messageConverter- the converter.
 
- 
setRecordMessageConverterpublic void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter to use with a record-based consumer.- Parameters:
- messageConverter- the converter.
 
- 
setBatchMessageConverterpublic void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) Set the message converter to use with a batch-based consumer.- Parameters:
- messageConverter- the converter.
 
- 
setRecordFilterStrategypublic void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) Specify aRecordFilterStrategyto wrapKafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListenerintoFilteringMessageListenerAdapter.- Parameters:
- recordFilterStrategy- the- RecordFilterStrategyto use.
 
- 
setAckDiscardedpublic void setAckDiscarded(boolean ackDiscarded) Abooleanflag to indicate ifFilteringMessageListenerAdaptershould acknowledge discarded records or not. Does not make sense ifsetRecordFilterStrategy(RecordFilterStrategy)isn't specified.- Parameters:
- ackDiscarded- true to ack (commit offset for) discarded messages.
 
- 
setRetryTemplatepublic void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplateinstance to use for retrying deliveries.IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the max.poll.interval.ms. Instead, consider adding aDefaultErrorHandlerto the listener container, configured with aKafkaErrorSendingMessageRecoverer.- Parameters:
- retryTemplate- the- RetryTemplateto use.
 
- 
setRecoveryCallbackpublic void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallbackinstance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Only used if asetRetryTemplate(RetryTemplate)is specified. Default is anErrorMessageSendingRecovererif an error channel has been provided. Set to null if you wish to throw the exception back to the container after retries are exhausted.- Parameters:
- recoveryCallback- the recovery callback.
 
- 
setFilterInRetrypublic void setFilterInRetry(boolean filterInRetry) Thebooleanflag to specify the order in which the filter and retry operations are performed. Does not make sense if only one ofRetryTemplateorRecordFilterStrategyis present, or none. When true, the filter is called for each retry; when false, the filter is only called once for each delivery from the container.- Parameters:
- filterInRetry- true to filter for each retry. Defaults to- false.
 
- 
setPayloadTypeWhen using a type-aware message converter such asStringJsonMessageConverter, set the payload type the converter should create. Defaults toObject.- Parameters:
- payloadType- the type.
 
- 
setOnPartitionsAssignedSeekCallbackpublic void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumerfor seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)call from theKafkaMessageListenerContainer. This is called from the internalMessagingMessageListenerAdapterimplementation.- Parameters:
- onPartitionsAssignedCallback- the- BiConsumerto use
- See Also:
- 
- ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
 
 
- 
setBindSourceRecordpublic void setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.- Parameters:
- bindSourceRecord- true to bind.
 
- 
getComponentTypeDescription copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
- getComponentTypein interface- NamedComponent
- Overrides:
- getComponentTypein class- IntegrationObjectSupport
 
- 
onInitprotected void onInit()Description copied from class:IntegrationObjectSupportSubclasses may implement this for initialization logic.- Overrides:
- onInitin class- MessageProducerSupport
 
- 
doStartprotected void doStart()Description copied from class:MessageProducerSupportTake no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.- Overrides:
- doStartin class- MessageProducerSupport
 
- 
doStopprotected void doStop()Description copied from class:MessageProducerSupportTake no action by default. Subclasses may override this if they need lifecycle-managed behavior.- Overrides:
- doStopin class- MessageProducerSupport
 
- 
pausepublic void pause()Description copied from interface:PausablePause the endpoint.
- 
resumepublic void resume()Description copied from interface:PausableResume the endpoint if paused.
- 
isPausedpublic boolean isPaused()Description copied from interface:PausableCheck if the endpoint is paused.
- 
beforeShutdownpublic int beforeShutdown()Description copied from interface:OrderlyShutdownCapableCalled before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.- Specified by:
- beforeShutdownin interface- OrderlyShutdownCapable
- Returns:
- The number of active messages if available.
 
- 
afterShutdownpublic int afterShutdown()Description copied from interface:OrderlyShutdownCapableCalled after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.- Specified by:
- afterShutdownin interface- OrderlyShutdownCapable
- Returns:
- The number of active messages if available.
 
- 
getErrorMessageAttributesDescription copied from class:MessageProducerSupportPopulate anAttributeAccessorto be used when building an error message with theerrorMessageStrategy.- Overrides:
- getErrorMessageAttributesin class- MessageProducerSupport
- Parameters:
- message- the message.
- Returns:
- the attributes.
 
 
-