Class KafkaInboundGateway<K,V,R>
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.gateway.MessagingGatewaySupport
org.springframework.integration.kafka.inbound.KafkaInboundGateway<K,V,R>
- Type Parameters:
K
- the key type.V
- the request value type.R
- the reply value type.
- All Implemented Interfaces:
Aware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,ApplicationContextAware
,Lifecycle
,Phased
,SmartLifecycle
,ExpressionCapable
,OrderlyShutdownCapable
,Pausable
,IntegrationPattern
,KafkaInboundEndpoint
,NamedComponent
,IntegrationInboundManagement
,IntegrationManagement
,ManageableLifecycle
,ManageableSmartLifecycle
,TrackableComponent
public class KafkaInboundGateway<K,V,R>
extends MessagingGatewaySupport
implements KafkaInboundEndpoint, Pausable, OrderlyShutdownCapable
Inbound gateway.
- Since:
- 5.4
- Author:
- Gary Russell, Artem Bilan, Urs Keller
-
Nested Class Summary
Nested classes/interfaces inherited from class org.springframework.integration.gateway.MessagingGatewaySupport
MessagingGatewaySupport.ConvertingMessagingTemplate
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.gateway.MessagingGatewaySupport
messagingTemplate
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpoint
CONTEXT_ACKNOWLEDGMENT, CONTEXT_CONSUMER, CONTEXT_RECORD
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
ConstructorDescriptionKafkaInboundGateway
(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K, R> kafkaTemplate) Construct an instance with the provided container. -
Method Summary
Modifier and TypeMethodDescriptionint
Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.int
Called before shutdown begins.protected void
doStart()
Subclasses must implement this method with the start behavior.protected void
doStop()
Subclasses must implement this method with the stop behavior.Subclasses may implement this method to provide component type information.protected AttributeAccessor
getErrorMessageAttributes
(Message<?> message) Populate anAttributeAccessor
to be used when building an error message with theerrorMessageStrategy
.boolean
isPaused()
Check if the endpoint is paused.protected void
onInit()
Subclasses may implement this for initialization logic.void
pause()
Pause the endpoint.void
resume()
Resume the endpoint if paused.void
setBindSourceRecord
(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA
.void
setMessageConverter
(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter; must be aRecordMessageConverter
orBatchMessageConverter
depending on mode.void
setOnPartitionsAssignedSeekCallback
(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumer
for seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
call from theKafkaMessageListenerContainer
.void
setPayloadType
(Class<?> payloadType) When using a type-aware message converter such asStringJsonMessageConverter
, set the payload type the converter should create.void
setRecoveryCallback
(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallback
instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured).void
setRetryTemplate
(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplate
instance to use for retrying deliveries.Methods inherited from class org.springframework.integration.gateway.MessagingGatewaySupport
buildErrorMessage, buildSendTimer, destroy, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getManagedName, getManagedType, getOverrides, getReplyChannel, getRequestChannel, isLoggingEnabled, isObserved, receive, receive, receiveMessage, receiveMessage, registerMetricsCaptor, registerObservationRegistry, registerReplyMessageCorrelatorIfNecessary, send, sendAndReceive, sendAndReceiveMessage, sendAndReceiveMessageReactive, sendTimer, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setErrorOnTimeout, setLoggingEnabled, setManagedName, setManagedType, setObservationConvention, setReceiverObservationConvention, setReplyChannel, setReplyChannelName, setReplyMapper, setReplyTimeout, setRequestChannel, setRequestChannelName, setRequestMapper, setRequestTimeout, setShouldTrack
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.kafka.inbound.KafkaInboundEndpoint
doWithRetry
Methods inherited from interface org.springframework.integration.support.management.ManageableLifecycle
isRunning, start, stop
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
KafkaInboundGateway
public KafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K, R> kafkaTemplate) Construct an instance with the provided container.- Parameters:
messageListenerContainer
- the container.kafkaTemplate
- the kafka template.
-
-
Method Details
-
setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter; must be aRecordMessageConverter
orBatchMessageConverter
depending on mode.- Parameters:
messageConverter
- the converter.
-
setPayloadType
When using a type-aware message converter such asStringJsonMessageConverter
, set the payload type the converter should create. Defaults toObject
.- Parameters:
payloadType
- the type.
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify aRetryTemplate
instance to use for retrying deliveries.IMPORTANT: This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records might exceed the
max.poll.interval.ms
. Instead, consider adding aDefaultErrorHandler
to the listener container, configured with aKafkaErrorSendingMessageRecoverer
.- Parameters:
retryTemplate
- theRetryTemplate
to use.
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) ARecoveryCallback
instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Only used ifsetRetryTemplate(RetryTemplate)
is specified. Default is anErrorMessageSendingRecoverer
if an error channel has been provided. Set to null if you wish to throw the exception back to the container after retries are exhausted.- Parameters:
recoveryCallback
- the recovery callback.
-
setOnPartitionsAssignedSeekCallback
public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<org.apache.kafka.common.TopicPartition, Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify aBiConsumer
for seeks management duringConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
call from theKafkaMessageListenerContainer
. This is called from the internalMessagingMessageListenerAdapter
implementation.- Parameters:
onPartitionsAssignedCallback
- theBiConsumer
to use- See Also:
-
ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
-
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA
.- Parameters:
bindSourceRecord
- true to bind.
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupport
Subclasses may implement this for initialization logic.- Overrides:
onInit
in classMessagingGatewaySupport
-
doStart
protected void doStart()Description copied from class:AbstractEndpoint
Subclasses must implement this method with the start behavior. This method will be invoked while holding theAbstractEndpoint.lifecycleLock
.- Overrides:
doStart
in classMessagingGatewaySupport
-
doStop
protected void doStop()Description copied from class:AbstractEndpoint
Subclasses must implement this method with the stop behavior. This method will be invoked while holding theAbstractEndpoint.lifecycleLock
.- Overrides:
doStop
in classMessagingGatewaySupport
-
pause
public void pause()Description copied from interface:Pausable
Pause the endpoint. -
resume
public void resume()Description copied from interface:Pausable
Resume the endpoint if paused. -
isPaused
public boolean isPaused()Description copied from interface:Pausable
Check if the endpoint is paused. -
getComponentType
Description copied from class:IntegrationObjectSupport
Subclasses may implement this method to provide component type information.- Specified by:
getComponentType
in interfaceNamedComponent
- Overrides:
getComponentType
in classMessagingGatewaySupport
-
beforeShutdown
public int beforeShutdown()Description copied from interface:OrderlyShutdownCapable
Called before shutdown begins. Implementations should stop accepting new messages. Can optionally return the number of active messages in process.- Specified by:
beforeShutdown
in interfaceOrderlyShutdownCapable
- Returns:
- The number of active messages if available.
-
afterShutdown
public int afterShutdown()Description copied from interface:OrderlyShutdownCapable
Called after normal shutdown of schedulers, executors etc, and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads.Can optionally return the number of active messages still in process.- Specified by:
afterShutdown
in interfaceOrderlyShutdownCapable
- Returns:
- The number of active messages if available.
-
getErrorMessageAttributes
Description copied from class:MessagingGatewaySupport
Populate anAttributeAccessor
to be used when building an error message with theerrorMessageStrategy
.- Overrides:
getErrorMessageAttributes
in classMessagingGatewaySupport
- Parameters:
message
- the message.- Returns:
- the attributes.
-