Type Parameters:
K - the key type.
V - the value type.

public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements OrderlyShutdownCapable, Pausable

| Modifier and Type | Class and Description |
|---|---|
| static class | KafkaMessageDrivenChannelAdapter.ListenerMode: The listener mode for the container, record or batch. |

Inherited fields: lifecycleCondition, lifecycleLock, EXPRESSION_PARSER, logger, DEFAULT_PHASE

| Constructor and Description |
|---|
| KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer): Construct an instance with mode KafkaMessageDrivenChannelAdapter.ListenerMode.record. |
| KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode): Construct an instance with the provided mode. |

| Modifier and Type | Method and Description |
|---|---|
| int | afterShutdown(): Called after normal shutdown of schedulers, executors etc., and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process. |
| int | beforeShutdown(): Called before shutdown begins. |
| protected void | doStart(): Take no action by default. |
| protected void | doStop(): Take no action by default. |
| String | getComponentType(): Subclasses may implement this method to provide component type information. |
| protected AttributeAccessor | getErrorMessageAttributes(Message<?> message): Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy. |
| boolean | isPaused(): Check if the endpoint is paused. |
| protected void | onInit(): Subclasses may implement this for initialization logic. |
| void | pause(): Pause the endpoint. |
| void | resume(): Resume the endpoint if paused. |
| void | setAckDiscarded(boolean ackDiscarded): A boolean flag to indicate whether FilteringMessageListenerAdapter should acknowledge discarded records. |
| void | setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter): Set the message converter to use with a batch-based consumer. |
| void | setBindSourceRecord(boolean bindSourceRecord): Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. |
| void | setFilterInRetry(boolean filterInRetry): A boolean flag that specifies the order in which RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrap each other, if both are present. |
| void | setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter): Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode. |
| void | setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback): Specify a BiConsumer for seek management during the ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer. |
| void | setPayloadType(Class<?> payloadType): When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. |
| void | setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy): Specify a RecordFilterStrategy to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener in a FilteringMessageListenerAdapter. |
| void | setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter): Set the message converter to use with a record-based consumer. |
| void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends Object> recoveryCallback): A RecoveryCallback instance for the retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). |
| void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate): Specify a RetryTemplate instance to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener in a RetryingMessageListenerAdapter. |
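
The effect of setFilterInRetry on the wrapping order can be pictured with a small, library-free model. This is only a sketch: filtering and retrying are hypothetical stand-ins built on java.util.function types, not the Spring FilteringMessageListenerAdapter and RetryingMessageListenerAdapter classes, and it assumes that filterInRetry=true places the filter inside the retry wrapper (as the name suggests), while the default false places the filter outside it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

class WrappingOrderSketch {

    /** Stand-in for a filtering adapter: skips records that the predicate marks as discarded. */
    static <T> Consumer<T> filtering(Consumer<T> delegate, Predicate<T> discard) {
        return record -> {
            if (!discard.test(record)) {
                delegate.accept(record);
            }
        };
    }

    /** Stand-in for a retrying adapter: re-invokes the delegate up to maxAttempts times. */
    static <T> Consumer<T> retrying(Consumer<T> delegate, int maxAttempts) {
        return record -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    delegate.accept(record);
                    return;
                } catch (RuntimeException ex) {
                    last = ex;
                }
            }
            throw last;
        };
    }

    /** Counts filter evaluations for one record whose listener fails twice, then succeeds. */
    static int filterEvaluations(boolean filterInRetry) {
        AtomicInteger filterCalls = new AtomicInteger();
        AtomicInteger listenerCalls = new AtomicInteger();
        Consumer<String> listener = record -> {
            if (listenerCalls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
        };
        Predicate<String> discard = record -> {
            filterCalls.incrementAndGet();
            return record.isEmpty();
        };
        // Assumption: true = retry wraps filter; false = filter wraps retry.
        Consumer<String> chain = filterInRetry
                ? retrying(filtering(listener, discard), 3)
                : filtering(retrying(listener, 3), discard);
        chain.accept("payload");
        return filterCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("filterInRetry=false: " + filterEvaluations(false));
        System.out.println("filterInRetry=true:  " + filterEvaluations(true));
    }
}
```

In this model the filter runs once per record when it sits outside the retry wrapper, and once per attempt when it sits inside. That difference matters if the filter predicate is expensive or has side effects.
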

Inherited methods, grouped by declaring type:

afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher

destroy, doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop

afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString

clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

isRunning, start, stop

getBeanName, getComponentName

public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
Construct an instance with mode KafkaMessageDrivenChannelAdapter.ListenerMode.record.
Parameters:
messageListenerContainer - the container.

public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
Construct an instance with the provided mode.
Parameters:
messageListenerContainer - the container.
mode - the mode.

public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
Set the message converter; must be a RecordMessageConverter or BatchMessageConverter depending on mode.
Parameters:
messageConverter - the converter.

public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set the message converter to use with a record-based consumer.
Parameters:
messageConverter - the converter.

public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
Set the message converter to use with a batch-based consumer.
Parameters:
messageConverter - the converter.

public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
Specify a RecordFilterStrategy to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener in a FilteringMessageListenerAdapter.
Parameters:
recordFilterStrategy - the RecordFilterStrategy to use.

public void setAckDiscarded(boolean ackDiscarded)
A boolean flag to indicate whether FilteringMessageListenerAdapter should acknowledge discarded records. Only meaningful when a RecordFilterStrategy is set via setRecordFilterStrategy(RecordFilterStrategy).
Parameters:
ackDiscarded - true to ack (commit offset for) discarded messages.

public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Specify a RetryTemplate instance to wrap KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener in a RetryingMessageListenerAdapter.
Parameters:
retryTemplate - the RetryTemplate to use.

public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends Object> recoveryCallback)
A RecoveryCallback instance for the retry operation; if null, the exception will be thrown to the container after retries are exhausted (unless an error channel is configured). Only meaningful when a RetryTemplate is set via setRetryTemplate(RetryTemplate).
Parameters:
recoveryCallback - the recovery callback.

public void setFilterInRetry(boolean filterInRetry)
A boolean flag that specifies the order in which RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrap each other, if both are present. Only meaningful when both a RetryTemplate and a RecordFilterStrategy are set; it has no effect if only one, or neither, is present.
Parameters:
filterInRetry - the order for RetryingMessageListenerAdapter and FilteringMessageListenerAdapter wrapping. Defaults to false.

public void setPayloadType(Class<?> payloadType)
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. Defaults to Object.
Parameters:
payloadType - the type.

public void setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumer<Map<org.apache.kafka.common.TopicPartition,Long>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
Specify a BiConsumer for seek management during the ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) call from the KafkaMessageListenerContainer. This is called from the internal MessagingMessageListenerAdapter implementation.
Parameters:
onPartitionsAssignedCallback - the BiConsumer to use.
See Also:
ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)

public void setBindSourceRecord(boolean bindSourceRecord)
Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
Parameters:
bindSourceRecord - true to bind.

public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
Specified by:
getComponentType in interface NamedComponent
Overrides:
getComponentType in class IntegrationObjectSupport

protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
Overrides:
onInit in class MessageProducerSupport

protected void doStart()
Description copied from class: MessageProducerSupport
Take no action by default.
Overrides:
doStart in class MessageProducerSupport

protected void doStop()
Description copied from class: MessageProducerSupport
Take no action by default.
Overrides:
doStop in class MessageProducerSupport

public void pause()
Description copied from interface: Pausable
Pause the endpoint.

public void resume()
Description copied from interface: Pausable
Resume the endpoint if paused.

public boolean isPaused()
Description copied from interface: Pausable
Check if the endpoint is paused.

public int beforeShutdown()
Description copied from interface: OrderlyShutdownCapable
Called before shutdown begins.
Specified by:
beforeShutdown in interface OrderlyShutdownCapable

public int afterShutdown()
Description copied from interface: OrderlyShutdownCapable
Called after normal shutdown of schedulers, executors etc., and after the shutdown delay has elapsed, but before any forced shutdown of any remaining active scheduler/executor threads. Can optionally return the number of active messages still in process.
Specified by:
afterShutdown in interface OrderlyShutdownCapable

protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
Description copied from class: MessageProducerSupport
Populate an AttributeAccessor to be used when building an error message with the errorMessageStrategy.
Overrides:
getErrorMessageAttributes in class MessageProducerSupport
Parameters:
message - the message.
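
The BiConsumer accepted by setOnPartitionsAssignedSeekCallback receives the assigned partitions and a seek callback. The sketch below shows one possible shape for such a callback, using only the JDK. TopicPartition and SeekCallback here are hypothetical stand-ins for org.apache.kafka.common.TopicPartition and ConsumerSeekAware.ConsumerSeekCallback (only a seek(topic, partition, offset) method is mirrored), rewindBy is an illustrative helper, and the sketch assumes the map values are the current offsets of the newly assigned partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class SeekCallbackSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Hypothetical stand-in for ConsumerSeekAware.ConsumerSeekCallback (seek only). */
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    /** Builds a callback that rewinds every assigned partition by up to 'rewind' records. */
    static BiConsumer<Map<TopicPartition, Long>, SeekCallback> rewindBy(long rewind) {
        return (assignments, callback) -> assignments.forEach((tp, offset) ->
                callback.seek(tp.topic, tp.partition, Math.max(0L, offset - rewind)));
    }

    /** Runs the callback against a sample assignment and records the requested seeks. */
    static List<String> demo() {
        Map<TopicPartition, Long> assignments = new LinkedHashMap<>();
        assignments.put(new TopicPartition("orders", 0), 42L);
        assignments.put(new TopicPartition("orders", 1), 3L);
        List<String> seeks = new ArrayList<>();
        SeekCallback recorder = (topic, partition, offset) ->
                seeks.add(topic + "-" + partition + "@" + offset);
        rewindBy(10).accept(assignments, recorder);
        return seeks;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [orders-0@32, orders-1@0]
    }
}
```

With the real types, a lambda of the same shape would be passed directly to setOnPartitionsAssignedSeekCallback; the clamp to zero keeps the requested offset from going negative on partitions that hold fewer records than the rewind distance.
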