Type Parameters: K - the key type; V - the value type.

public class KafkaMessageSource<K,V> extends AbstractMessageSource<Object> implements Pausable
NOTE: If the application acknowledges messages out of order, the acks will be deferred until all messages prior to the offset are ack'd. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets will be redelivered - even if they were processed successfully. Applications should therefore implement idempotency.
Starting with version 3.1.2, this source implements Pausable, which allows you to pause and resume the Consumer. While the consumer is paused, you must continue to call AbstractMessageSource.receive() within max.poll.interval.ms to prevent a rebalance.
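The following is a minimal polling sketch (not part of this API's documentation), assuming a broker at localhost:9092, a topic named someTopic, String keys and values, and Spring Integration 5.x package locations; the loop keeps calling receive() as required above and acknowledges each message via its callback header.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.messaging.Message;

public class KafkaMessageSourceSketch {

    public static void main(String[] args) {
        // Hypothetical consumer configuration; adjust servers, group and topic to your environment.
        Map<String, Object> configs = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "sample-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaMessageSource<String, String> source = new KafkaMessageSource<>(
                new DefaultKafkaConsumerFactory<>(configs),
                new ConsumerProperties("someTopic"));
        source.start();
        try {
            // Keep calling receive() within max.poll.interval.ms (even while paused)
            // to avoid a consumer group rebalance.
            for (int i = 0; i < 100; i++) {
                Message<Object> message = source.receive();
                if (message == null) {
                    continue; // nothing available within the poll timeout
                }
                AcknowledgmentCallback ack =
                        StaticMessageHeaderAccessor.getAcknowledgmentCallback(message);
                // process message.getPayload(), then acknowledge
                ack.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
            }
        }
        finally {
            source.stop();
        }
    }

}
```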
| Modifier and Type | Class and Description |
|---|---|
| static class | KafkaMessageSource.KafkaAckCallback<K,V>: AcknowledgmentCallback for Kafka. |
| static class | KafkaMessageSource.KafkaAckCallbackFactory<K,V>: AcknowledgmentCallbackFactory for KafkaAckInfo. |
| static interface | KafkaMessageSource.KafkaAckInfo<K,V>: Information for building a KafkaAckCallback. |
| class | KafkaMessageSource.KafkaAckInfoImpl: Information for building a KafkaAckCallback. |
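To illustrate how the acknowledgment callback produced by KafkaMessageSource.KafkaAckCallbackFactory is typically used, here is a hedged sketch that requeues a record when processing fails; as the note above warns, requeuing an earlier offset can cause later offsets to be redelivered, so process(...) (a hypothetical helper) should be idempotent.

```java
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.messaging.Message;

public final class AckHandlingSketch {

    private AckHandlingSketch() {
    }

    /**
     * Acknowledge the message after successful processing, or requeue it on failure.
     */
    public static void handle(Message<Object> message) {
        AcknowledgmentCallback callback =
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(message);
        try {
            process(message.getPayload()); // hypothetical business logic
            callback.acknowledge(Status.ACCEPT);
        }
        catch (RuntimeException ex) {
            // The record will be polled again; later offsets may be redelivered too.
            callback.acknowledge(Status.REQUEUE);
        }
    }

    private static void process(Object payload) {
        // placeholder for idempotent business logic
    }

}
```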
Nested classes/interfaces inherited from interface IntegrationManagement: IntegrationManagement.ManagementOverrides

| Modifier and Type | Field and Description |
|---|---|
| static String | REMAINING_RECORDS: The number of records remaining from the previous poll. |
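A sketch of one way REMAINING_RECORDS might be consulted, assuming (this is an assumption, not stated above) that the source exposes the value as an Integer message header under this constant; a caller could then keep receiving until the records buffered from the previous poll are drained.

```java
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.messaging.Message;

public final class RemainingRecordsSketch {

    private RemainingRecordsSketch() {
    }

    /**
     * Drain the records buffered from the previous poll before returning.
     * Assumes REMAINING_RECORDS is present as an Integer header (an assumption,
     * not confirmed by the summary above).
     */
    public static void drain(KafkaMessageSource<String, String> source) {
        Message<Object> message = source.receive();
        while (message != null) {
            // process message ...
            Integer remaining = message.getHeaders()
                    .get(KafkaMessageSource.REMAINING_RECORDS, Integer.class);
            if (remaining == null || remaining == 0) {
                break;
            }
            message = source.receive();
        }
    }

}
```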
Fields inherited from class AbstractExpressionEvaluator: EXPRESSION_PARSER, logger

Fields inherited from interface IntegrationManagement: METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME

| Constructor and Description |
|---|
| KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties): Construct an instance with the supplied parameters. |
| KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch): Construct an instance with the supplied parameters. |
| KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory): Construct an instance with the supplied parameters. |
| KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch): Construct an instance with the supplied parameters. |
| Modifier and Type | Method and Description |
|---|---|
| protected void | createConsumer() |
| void | destroy() |
| protected Object | doReceive(): Subclasses must implement this method. |
| Collection<org.apache.kafka.common.TopicPartition> | getAssignedPartitions(): Return the currently assigned partitions. |
| protected String | getClientId() |
| protected java.time.Duration | getCommitTimeout() |
| String | getComponentType() |
| org.springframework.kafka.listener.ConsumerProperties | getConsumerProperties(): Get a reference to the configured consumer properties; allows further customization of the properties before the source is started. |
| protected String | getGroupId() |
| protected org.springframework.kafka.support.converter.RecordMessageConverter | getMessageConverter() |
| protected Class<?> | getPayloadType() |
| protected long | getPollTimeout() |
| protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener | getRebalanceListener() |
| boolean | isPaused(): Check if the endpoint is paused. |
| protected boolean | isRawMessageHeader() |
| boolean | isRunning() |
| protected void | onInit() |
| void | pause(): Pause the endpoint. |
| void | resume(): Resume the endpoint if paused. |
| void | setCloseTimeout(java.time.Duration closeTimeout): Set the close timeout - default 30 seconds. |
| void | setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter): Set the message converter to replace the default MessagingMessageConverter. |
| void | setPayloadType(Class<?> payloadType): Set the payload type. |
| void | setRawMessageHeader(boolean rawMessageHeader): Set to true to include the raw ConsumerRecord as headers with keys KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA. |
| void | start() |
| void | stop() |
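The setters above are normally applied before start(). A hedged configuration sketch follows; the topic name, the Order payload class, and the use of spring-kafka's StringJsonMessageConverter (which requires Jackson on the classpath) are assumptions for illustration.

```java
import java.time.Duration;
import java.util.Map;

import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

public final class SourceCustomizationSketch {

    private SourceCustomizationSketch() {
    }

    public static KafkaMessageSource<String, String> build(Map<String, Object> consumerConfigs) {
        KafkaMessageSource<String, String> source = new KafkaMessageSource<>(
                new DefaultKafkaConsumerFactory<>(consumerConfigs),
                new ConsumerProperties("someTopic")); // hypothetical topic

        // Convert String record values to a domain type (Order is a hypothetical class).
        source.setMessageConverter(new StringJsonMessageConverter());
        source.setPayloadType(Order.class);

        // Expose the raw ConsumerRecord in the message headers for error handling.
        source.setRawMessageHeader(true);

        // Wait up to 10 seconds for the consumer to close on stop().
        source.setCloseTimeout(Duration.ofSeconds(10));
        return source;
    }

    public static class Order {
        // hypothetical payload type populated by the JSON converter
    }

}
```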
Methods inherited from class AbstractMessageSource: buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedType

Methods inherited from class AbstractExpressionEvaluator: afterPropertiesSet, evaluateExpression, getBeanFactory, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionService

Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait

Methods inherited from implemented interfaces: getIntegrationPatternType, getThisAs

Field Detail

public static final String REMAINING_RECORDS

The number of records remaining from the previous poll.
Constructor Detail

public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)

Construct an instance with the supplied parameters.

Parameters: consumerFactory - the consumer factory; consumerProperties - the consumer properties.

See Also: KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)

public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
Construct an instance with the supplied parameters. Set allowMultiFetch to true to allow up to max.poll.records to be fetched on each poll. When false (default), max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory, or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.

Parameters: consumerFactory - the consumer factory; consumerProperties - the consumer properties; allowMultiFetch - true to allow max.poll.records > 1.

public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
Construct an instance with the supplied parameters.

Parameters: consumerFactory - the consumer factory; consumerProperties - the consumer properties; ackCallbackFactory - the ack callback factory.

See Also: KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)

public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
Construct an instance with the supplied parameters. Set allowMultiFetch to true to allow up to max.poll.records to be fetched on each poll. When false (default), max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory, or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.

Parameters: consumerFactory - the consumer factory; consumerProperties - the consumer properties; ackCallbackFactory - the ack callback factory; allowMultiFetch - true to allow max.poll.records > 1.
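A hedged sketch of the allowMultiFetch = true variant described above: max.poll.records is honored, so the caller must drain the fetched records by calling receive() quickly enough to stay within max.poll.interval.ms. The broker address, group id, topic, and record count are assumptions.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.messaging.Message;

public final class MultiFetchSketch {

    private MultiFetchSketch() {
    }

    public static KafkaMessageSource<String, String> multiFetchSource() {
        Map<String, Object> configs = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "multi-fetch-group",
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // allowMultiFetch = true: max.poll.records is honored instead of being coerced to 1.
        return new KafkaMessageSource<>(
                new DefaultKafkaConsumerFactory<>(configs),
                new ConsumerProperties("someTopic"),
                true);
    }

    /** Drain everything currently available; receive() must be called promptly for each record. */
    public static void drain(KafkaMessageSource<String, String> source) {
        Message<Object> message = source.receive();
        while (message != null) {
            // process message.getPayload() ...
            message = source.receive();
        }
    }

}
```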
Method Detail

public Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()

Return the currently assigned partitions.

protected void onInit()

Overrides: onInit in class AbstractExpressionEvaluator

public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()

Get a reference to the configured consumer properties; allows further customization of the properties before the source is started.
protected String getGroupId()
protected String getClientId()
protected long getPollTimeout()
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set the message converter to replace the default MessagingMessageConverter.

Parameters: messageConverter - the converter.

protected Class<?> getPayloadType()
public void setPayloadType(Class<?> payloadType)
Set the payload type.

Parameters: payloadType - the type to convert to.

protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
public String getComponentType()
Specified by: getComponentType in interface NamedComponent

protected boolean isRawMessageHeader()
public void setRawMessageHeader(boolean rawMessageHeader)
Set to true to include the raw ConsumerRecord as headers with keys KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA, enabling callers to have access to the record to process errors.

Parameters: rawMessageHeader - true to include the header.
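When setRawMessageHeader(true) is enabled, the original ConsumerRecord travels with the message under KafkaHeaders.RAW_DATA; a brief sketch of reading it back for error reporting follows (the class and method names here are illustrative, not part of this API).

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public final class RawRecordHeaderSketch {

    private RawRecordHeaderSketch() {
    }

    /** Describe the topic/partition/offset of a failed record using the raw header. */
    public static String describeFailure(Message<?> failed) {
        ConsumerRecord<?, ?> record = failed.getHeaders()
                .get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
        if (record == null) {
            return "raw record header not present (setRawMessageHeader(true) required)";
        }
        return "failed record at " + record.topic() + "-" + record.partition()
                + "@" + record.offset();
    }

}
```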
protected java.time.Duration getCommitTimeout()

public void setCloseTimeout(java.time.Duration closeTimeout)
Set the close timeout - default 30 seconds.

Parameters: closeTimeout - the close timeout.

public boolean isRunning()
Specified by: isRunning in interface Lifecycle; isRunning in interface ManageableLifecycle

public void start()
Specified by: start in interface Lifecycle; start in interface ManageableLifecycle

public void stop()
Specified by: stop in interface Lifecycle; stop in interface ManageableLifecycle

public void pause()
Pause the endpoint.

Specified by: pause in interface Pausable

public void resume()
Resume the endpoint if paused.

Specified by: resume in interface Pausable

public boolean isPaused()
Check if the endpoint is paused.

Specified by: isPaused in interface Pausable

protected Object doReceive()
Description copied from class: AbstractMessageSource

Subclasses must implement this method. Typically the returned value will be the payload of type T, but the returned value may also be a Message instance whose payload is of type T; also can be an AbstractIntegrationMessageBuilder, which is used for additional headers population.

Specified by: doReceive in class AbstractMessageSource<Object>

protected void createConsumer()
public void destroy()
Specified by: destroy in interface DisposableBean; destroy in interface IntegrationManagement

Overrides: destroy in class AbstractMessageSource<Object>