Type Parameters:
K - the key type.
V - the request value type.
R - the reply value type.

public class KafkaInboundGateway<K,V,R> extends MessagingGatewaySupport implements Pausable, OrderlyShutdownCapable
IntegrationManagement.ManagementOverrides
messagingTemplate
lifecycleCondition, lifecycleLock
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
DEFAULT_PHASE
Constructor and Description |
---|
KafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K,R> kafkaTemplate) Construct an instance with the provided container. |
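
As a rough illustration of the constructor summarized above, the following sketch wires a gateway inside a Spring configuration class. It assumes that spring-integration-kafka and spring-kafka are on the classpath and that ConsumerFactory and KafkaTemplate beans are defined elsewhere. The class name GatewayConfig, the channel bean kafkaRequests, the topic name "requests", and the timeout value are hypothetical choices made only for this example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.inbound.KafkaInboundGateway;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class GatewayConfig {

    // Hypothetical channel that receives the converted request messages.
    @Bean
    public MessageChannel kafkaRequests() {
        return new DirectChannel();
    }

    @Bean
    public KafkaInboundGateway<String, String, String> kafkaInboundGateway(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> kafkaTemplate) {
        // "requests" is a placeholder topic; the consumer group id is assumed
        // to be supplied by the injected ConsumerFactory configuration.
        ContainerProperties containerProperties = new ContainerProperties("requests");
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // K = String (key), V = String (request value), R = String (reply value).
        KafkaInboundGateway<String, String, String> gateway =
                new KafkaInboundGateway<>(container, kafkaTemplate);
        gateway.setRequestChannel(kafkaRequests());
        gateway.setReplyTimeout(30_000L); // arbitrary example value, in milliseconds
        return gateway;
    }
}
```

The container is handed to the gateway rather than registered as a separate bean, on the assumption that the gateway controls when it starts and stops.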
Modifier and Type | Method and Description |
---|---|
int | afterShutdown() Called after normal shutdown of schedulers, executors, etc., and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process. |
int | beforeShutdown() Called before shutdown begins. |
protected void | doStart() Subclasses must implement this method with the start behavior. |
protected void | doStop() Subclasses must implement this method with the stop behavior. |
String | getComponentType() Subclasses may implement this method to provide component type information. |
protected AttributeAccessor | getErrorMessageAttributes(Message<?> message) Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy. |
boolean | isPaused() Check if the endpoint is paused. |
protected void | onInit() Subclasses may implement this for initialization logic. |
void | pause() Pause the endpoint. |
void | resume() Resume the endpoint if paused. |
void | setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. |
void | setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode. |
void | setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) Specify a BiConsumer for seek management during the ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer. |
void | setPayloadType(Class<?> payloadType) When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. |
void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) A RecoveryCallback instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). |
void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) Specify a RetryTemplate instance to wrap KafkaInboundGateway.IntegrationRecordMessageListener into RetryingMessageListenerAdapter. |
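
The relationship between setRetryTemplate and setRecoveryCallback described in the table above can be modeled without any Spring dependencies. The following is a simplified sketch of the documented behavior, not the library's implementation: the class RetrySketch, its execute method, and the maxAttempts parameter are hypothetical stand-ins, and java.util.function.Function is used here in place of RecoveryCallback.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified stand-in model of "retry, then recover or rethrow"; not Spring Retry itself.
final class RetrySketch {

    private RetrySketch() {
    }

    // Runs the action up to maxAttempts times. If every attempt fails:
    //  - a non-null recovery function supplies the result from the last failure;
    //  - a null recovery function rethrows the last exception to the caller,
    //    which corresponds to the listener container in the real gateway.
    static <T> T execute(Supplier<T> action, int maxAttempts,
                         Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        if (recovery != null) {
            return recovery.apply(last);
        }
        throw last;
    }
}
```

In this model a recovery function has no effect unless a retry loop is in place, which mirrors the note that a recovery callback does not make sense without a retry template.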
buildErrorMessage, buildSendTimer, destroy, getErrorChannel, getIntegrationPatternType, getManagedName, getManagedType, getOverrides, getReplyChannel, getRequestChannel, isLoggingEnabled, receive, receive, receiveMessage, receiveMessage, registerMetricsCaptor, registerReplyMessageCorrelatorIfNecessary, send, sendAndReceive, sendAndReceiveMessage, sendAndReceiveMessageReactive, sendTimer, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setErrorOnTimeout, setLoggingEnabled, setManagedName, setManagedType, setReplyChannel, setReplyChannelName, setReplyMapper, setReplyTimeout, setRequestChannel, setRequestChannelName, setRequestMapper, setRequestTimeout, setShouldTrack
doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
isRunning, start, stop
getThisAs
getBeanName, getComponentName
public KafkaInboundGateway(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, org.springframework.kafka.core.KafkaTemplate<K,R> kafkaTemplate)
Construct an instance with the provided container.
Parameters:
messageListenerContainer - the container.
kafkaTemplate - the Kafka template.

public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode.
Parameters:
messageConverter - the converter.

public void setPayloadType(Class<?> payloadType)
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. Defaults to Object.
Parameters:
payloadType - the type.

public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Specify a RetryTemplate instance to wrap KafkaInboundGateway.IntegrationRecordMessageListener into RetryingMessageListenerAdapter.
Parameters:
retryTemplate - the RetryTemplate to use.

public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
A RecoveryCallback instance for retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Does not make sense if setRetryTemplate(RetryTemplate) isn't specified.
Parameters:
recoveryCallback - the recovery callback.

public void setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
Specify a BiConsumer for seek management during the ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer. This is called from the internal MessagingMessageListenerAdapter implementation.
Parameters:
onPartitionsAssignedCallback - the BiConsumer to use.
See Also:
ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
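
To make the shape of the seek callback described above more concrete, here is a minimal sketch that uses local stand-in types so it runs without Kafka on the classpath. SeekSketch, its nested TopicPartition and SeekCallback types, and the rewindBy policy are all hypothetical; the stand-ins only mirror the topic(), partition(), and seek(topic, partition, offset) names of the real classes.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

final class SeekSketch {

    private SeekSketch() {
    }

    // Stand-in for org.apache.kafka.common.TopicPartition (same accessor names).
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() {
            return topic;
        }

        int partition() {
            return partition;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof TopicPartition)) {
                return false;
            }
            TopicPartition that = (TopicPartition) other;
            return partition == that.partition && topic.equals(that.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Stand-in for ConsumerSeekAware.ConsumerSeekCallback, reduced to a single method.
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // Hypothetical policy: rewind every assigned partition by `rewind` records,
    // never seeking below offset 0.
    static BiConsumer<Map<TopicPartition, Long>, SeekCallback> rewindBy(long rewind) {
        return (assignments, callback) -> assignments.forEach((tp, offset) ->
                callback.seek(tp.topic(), tp.partition(), Math.max(0L, offset - rewind)));
    }
}
```

With the real Kafka and Spring types substituted for the stand-ins, a BiConsumer of this shape is what setOnPartitionsAssignedSeekCallback accepts.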
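public void setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
Parameters:
bindSourceRecord - true to bind.

protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
Overrides:
onInit in class MessagingGatewaySupport

protected void doStart()
Description copied from class: AbstractEndpoint
Subclasses must implement this method with the start behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
Overrides:
doStart in class MessagingGatewaySupport

protected void doStop()
Description copied from class: AbstractEndpoint
Subclasses must implement this method with the stop behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
Overrides:
doStop in class MessagingGatewaySupport

public void pause()
Description copied from interface: Pausable
Pause the endpoint.

public void resume()
Description copied from interface: Pausable
Resume the endpoint if paused.

public boolean isPaused()
Description copied from interface: Pausable
Check if the endpoint is paused.

public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
Specified by:
getComponentType in interface NamedComponent
Overrides:
getComponentType in class MessagingGatewaySupport

public int beforeShutdown()
Description copied from interface: OrderlyShutdownCapable
Called before shutdown begins.
Specified by:
beforeShutdown in interface OrderlyShutdownCapable

public int afterShutdown()
Description copied from interface: OrderlyShutdownCapable
Called after normal shutdown of schedulers, executors, etc., and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process.
Specified by:
afterShutdown in interface OrderlyShutdownCapable

protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
Description copied from class: MessagingGatewaySupport
Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
Overrides:
getErrorMessageAttributes in class MessagingGatewaySupport
Parameters:
message - the message.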