Class KafkaInboundGateway<K,V,R>  
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.gateway.MessagingGatewaySupport
org.springframework.integration.kafka.inbound.KafkaInboundGateway<K,V,R>  
- Type Parameters:
- K- the key type.
- V- the request value type.
- R- the reply value type.
- All Implemented Interfaces:
- Aware,- BeanFactoryAware,- BeanNameAware,- DisposableBean,- InitializingBean,- ApplicationContextAware,- Lifecycle,- Phased,- SmartLifecycle,- ComponentSourceAware,- ExpressionCapable,- OrderlyShutdownCapable,- Pausable,- IntegrationPattern,- KafkaInboundEndpoint,- NamedComponent,- IntegrationInboundManagement,- IntegrationManagement,- ManageableLifecycle,- ManageableSmartLifecycle,- TrackableComponent
public class KafkaInboundGateway<K,V,R>  
extends MessagingGatewaySupport
implements KafkaInboundEndpoint, Pausable, OrderlyShutdownCapable
Inbound gateway.
- Since:
- 5.4
- Author:
- Gary Russell, Artem Bilan, Urs Keller
- 
Nested Class SummaryNested classes/interfaces inherited from class org.springframework.integration.gateway.MessagingGatewaySupportMessagingGatewaySupport.ConvertingMessagingTemplateNested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagementIntegrationManagement.ManagementOverrides
- 
Field SummaryFields inherited from class org.springframework.integration.gateway.MessagingGatewaySupportmessagingTemplateFields inherited from class org.springframework.integration.endpoint.AbstractEndpointlifecycleCondition, lifecycleLockFields inherited from class org.springframework.integration.context.IntegrationObjectSupportEXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagementMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpointATTRIBUTES_HOLDER, CONTEXT_ACKNOWLEDGMENT, CONTEXT_CONSUMER, CONTEXT_RECORDFields inherited from interface org.springframework.context.SmartLifecycleDEFAULT_PHASE
- 
Constructor SummaryConstructorsConstructorDescriptionKafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K, R> kafkaTemplate) Construct an instance with the provided container.
- 
Method SummaryModifier and TypeMethodDescriptionintCalled after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.intCalled before shutdown begins.protected voiddoStart()Subclasses must implement this method with the start behavior.protected voiddoStop()Subclasses must implement this method with the stop behavior.Subclasses may implement this method to provide component type information.protected AttributeAccessorgetErrorMessageAttributes(Message<?> message) Populate anAttributeAccessorto be used when building an error message with theerrorMessageStrategy.booleanisPaused()Check if the endpoint is paused.protected voidonInit()Subclasses may implement this for initialization logic.voidpause()Pause the endpoint.voidresume()Resume the endpoint if paused.voidsetBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA.voidsetMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter; must be aRecordMessageConverterorBatchMessageConverterdepending on mode.voidsetOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumerfor seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)call from theKafkaMessageListenerContainer.voidsetPayloadType(Class<?> payloadType) When using a type-aware message converter such asStringJsonMessageConverter, set the payload type the converter should create.voidsetRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallbackinstance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured).voidsetRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplateinstance to use for retrying deliveries.Methods inherited from class org.springframework.integration.gateway.MessagingGatewaySupportbuildErrorMessage, buildSendTimer, destroy, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getManagedName, getManagedType, getOverrides, getReplyChannel, getRequestChannel, isLoggingEnabled, isObserved, receive, receive, receiveMessage, receiveMessage, registerMetricsCaptor, registerObservationRegistry, registerReplyMessageCorrelatorIfNecessary, send, sendAndReceive, sendAndReceiveMessage, sendAndReceiveMessageReactive, sendTimer, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setErrorOnTimeout, setLoggingEnabled, setManagedName, setManagedType, setObservationConvention, setReceiverObservationConvention, setReplyChannel, setReplyChannelName, setReplyMapper, setReplyTimeout, setRequestChannel, setRequestChannelName, setRequestMapper, setRequestTimeout, setShouldTrackMethods inherited from class org.springframework.integration.endpoint.AbstractEndpointdoStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopMethods inherited from class org.springframework.integration.context.IntegrationObjectSupportafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagementgetThisAsMethods inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpointdoWithRetryMethods inherited from interface org.springframework.integration.support.management.ManageableLifecycleisRunning, start, stopMethods inherited from interface org.springframework.integration.support.context.NamedComponentgetBeanName, getComponentName
- 
Constructor Details- 
KafkaInboundGatewaypublic KafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K, R> kafkaTemplate) Construct an instance with the provided container.- Parameters:
- messageListenerContainer- the container.
- kafkaTemplate- the kafka template.
 
 
- 
- 
Method Details- 
setMessageConverterpublic void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter; must be aRecordMessageConverterorBatchMessageConverterdepending on mode.- Parameters:
- messageConverter- the converter.
 
- 
setPayloadTypeWhen using a type-aware message converter such asStringJsonMessageConverter, set the payload type the converter should create. Defaults toObject.- Parameters:
- payloadType- the type.
 
- 
setRetryTemplatepublic void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplateinstance to use for retrying deliveries.IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the max.poll.interval.ms. Instead, consider adding aDefaultErrorHandlerto the listener container, configured with aKafkaErrorSendingMessageRecoverer.- Parameters:
- retryTemplate- the- RetryTemplateto use.
 
- 
setRecoveryCallbackpublic void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallbackinstance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Only used ifsetRetryTemplate(RetryTemplate)is specified. Default is anErrorMessageSendingRecovererif an error channel has been provided. Set to null if you wish to throw the exception back to the container after retries are exhausted.- Parameters:
- recoveryCallback- the recovery callback.
 
- 
setOnPartitionsAssignedSeekCallbackpublic void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumerfor seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)call from theKafkaMessageListenerContainer. This is called from the internalMessagingMessageListenerAdapterimplementation.- Parameters:
- onPartitionsAssignedCallback- the- BiConsumerto use
- See Also:
- 
- ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
 
 
- 
setBindSourceRecordpublic void setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA.- Parameters:
- bindSourceRecord- true to bind.
 
- 
onInitprotected void onInit()Description copied from class:IntegrationObjectSupportSubclasses may implement this for initialization logic.- Overrides:
- onInitin class- MessagingGatewaySupport
 
- 
doStartprotected void doStart()Description copied from class:AbstractEndpointSubclasses must implement this method with the start behavior. This method will be invoked while holding theAbstractEndpoint.lifecycleLock.- Overrides:
- doStartin class- MessagingGatewaySupport
 
- 
doStopprotected void doStop()Description copied from class:AbstractEndpointSubclasses must implement this method with the stop behavior. This method will be invoked while holding theAbstractEndpoint.lifecycleLock.- Overrides:
- doStopin class- MessagingGatewaySupport
 
- 
pausepublic void pause()Description copied from interface:PausablePause the endpoint.
- 
resumepublic void resume()Description copied from interface:PausableResume the endpoint if paused.
- 
isPausedpublic boolean isPaused()Description copied from interface:PausableCheck if the endpoint is paused.
- 
getComponentTypeDescription copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
- getComponentTypein interface- NamedComponent
- Overrides:
- getComponentTypein class- MessagingGatewaySupport
 
- 
beforeShutdownpublic int beforeShutdown()Description copied from interface:OrderlyShutdownCapableCalled before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.- Specified by:
- beforeShutdownin interface- OrderlyShutdownCapable
- Returns:
- The number of active messages if available.
 
- 
afterShutdownpublic int afterShutdown()Description copied from interface:OrderlyShutdownCapableCalled after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.- Specified by:
- afterShutdownin interface- OrderlyShutdownCapable
- Returns:
- The number of active messages if available.
 
- 
getErrorMessageAttributesDescription copied from class:MessagingGatewaySupportPopulate anAttributeAccessorto be used when building an error message with theerrorMessageStrategy.- Overrides:
- getErrorMessageAttributesin class- MessagingGatewaySupport
- Parameters:
- message- the message.
- Returns:
- the attributes.
 
 
-