Class KafkaMessageSource<K,V>

Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, Lifecycle, MessageSource<Object>, Pausable, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle

public class KafkaMessageSource<K,V> extends AbstractMessageSource<Object> implements Pausable, BeanClassLoaderAware
Polled message source for Apache Kafka. Only one thread can poll for data (or acknowledge a message) at a time.

NOTE: If the application acknowledges messages out of order, the acks will be deferred until all messages prior to the offset are ack'd. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets will be redelivered - even if they were processed successfully. Applications should therefore implement idempotency.

Starting with version 3.1.2, this source implements Pausable which allows you to pause and resume the Consumer. While the consumer is paused, you must continue to call AbstractMessageSource.receive() within max.poll.interval.ms, to prevent a rebalance.

Since:
5.4
Author:
Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra, Christian Tzolov, Ngoc Nhan
  • Field Details Link icon

    • REMAINING_RECORDS Link icon

      public static final String REMAINING_RECORDS
      The number of records remaining from the previous poll.
      Since:
      3.2
      See Also:
    • newAssignment Link icon

      public volatile boolean newAssignment
  • Constructor Details Link icon

    • KafkaMessageSource Link icon

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
      Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      See Also:
    • KafkaMessageSource Link icon

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
      Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up to max.poll.records to be fetched on each poll. When false (default) max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      allowMultiFetch - true to allow max.poll.records > 1.
    • KafkaMessageSource Link icon

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
      Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      ackCallbackFactory - the ack callback factory.
      See Also:
    • KafkaMessageSource Link icon

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
      Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up to max.poll.records to be fetched on each poll. When false (default) max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      ackCallbackFactory - the ack callback factory.
      allowMultiFetch - true to allow max.poll.records > 1.
  • Method Details Link icon

    • getAssignedPartitions Link icon

      public Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
      Return the currently assigned partitions.
      Returns:
      the partitions.
    • setBeanClassLoader Link icon

      public void setBeanClassLoader(ClassLoader classLoader)
      Specified by:
      setBeanClassLoader in interface BeanClassLoaderAware
    • onInit Link icon

      protected void onInit()
      Overrides:
      onInit in class AbstractExpressionEvaluator
    • getConsumerProperties Link icon

      public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()
      Get a reference to the configured consumer properties; allows further customization of the properties before the source is started.
      Returns:
      the properties.
    • getGroupId Link icon

      protected String getGroupId()
    • getClientId Link icon

      protected String getClientId()
    • getPollTimeout Link icon

      protected long getPollTimeout()
    • getMessageConverter Link icon

      protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
    • setMessageConverter Link icon

      public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      Set the message converter to replace the default MessagingMessageConverter.
      Parameters:
      messageConverter - the converter.
    • getPayloadType Link icon

      protected Class<?> getPayloadType()
    • setPayloadType Link icon

      public void setPayloadType(Class<?> payloadType)
      Set the payload type. Only applies if a type-aware message converter is provided.
      Parameters:
      payloadType - the type to convert to.
    • getRebalanceListener Link icon

      protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
    • getComponentType Link icon

      public String getComponentType()
      Specified by:
      getComponentType in interface NamedComponent
    • isRawMessageHeader Link icon

      protected boolean isRawMessageHeader()
    • setRawMessageHeader Link icon

      public void setRawMessageHeader(boolean rawMessageHeader)
      Set to true to include the raw ConsumerRecord as headers with keys KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA. enabling callers to have access to the record to process errors.
      Parameters:
      rawMessageHeader - true to include the header.
    • getCommitTimeout Link icon

      protected Duration getCommitTimeout()
    • setCloseTimeout Link icon

      public void setCloseTimeout(Duration closeTimeout)
      Set the close timeout - default 30 seconds.
      Parameters:
      closeTimeout - the close timeout.
    • isRunning Link icon

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
    • start Link icon

      public void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • stop Link icon

      public void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • pause Link icon

      public void pause()
      Description copied from interface: Pausable
      Pause the endpoint.
      Specified by:
      pause in interface Pausable
    • resume Link icon

      public void resume()
      Description copied from interface: Pausable
      Resume the endpoint if paused.
      Specified by:
      resume in interface Pausable
    • isPaused Link icon

      public boolean isPaused()
      Description copied from interface: Pausable
      Check if the endpoint is paused.
      Specified by:
      isPaused in interface Pausable
      Returns:
      true if paused.
    • doReceive Link icon

      protected Object doReceive()
      Description copied from class: AbstractMessageSource
      Subclasses must implement this method. Typically the returned value will be the payload of type T, but the returned value may also be a Message instance whose payload is of type T; also can be AbstractIntegrationMessageBuilder which is used for additional headers population.
      Specified by:
      doReceive in class AbstractMessageSource<Object>
      Returns:
      The value returned.
    • createConsumer Link icon

      protected void createConsumer()
    • destroy Link icon

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface IntegrationManagement
      Overrides:
      destroy in class AbstractMessageSource<Object>