Class KafkaMessageSource<K,V>

Type Parameters:
K - the key type.
V - the value type.
All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, Lifecycle, MessageSource<Object>, Pausable, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle

public class KafkaMessageSource<K,V>
extends AbstractMessageSource<Object>
implements Pausable
Polled message source for Apache Kafka. Only one thread can poll for data (or acknowledge a message) at a time.

NOTE: If the application acknowledges messages out of order, the acks are deferred until all messages with earlier offsets have been acknowledged. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets are redelivered, even if they were processed successfully. Applications should therefore process records idempotently.
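
The deferral rule in the note above can be illustrated with a small, self-contained sketch. It is not the KafkaMessageSource implementation; the class name DeferredAckSketch and its ack method are hypothetical, and it assumes offsets from a single partition. It only shows the idea that an ack for a later offset is held back until every earlier offset has been acknowledged.

```java
import java.util.TreeSet;

/**
 * Illustrative only: tracks acks for offsets of one partition and reports
 * when a contiguous run of acknowledged offsets becomes committable.
 * Hypothetical class, not part of Spring Integration.
 */
final class DeferredAckSketch {

    // Offsets that were acked but are still blocked by an earlier, un-acked offset.
    private final TreeSet<Long> pendingAcks = new TreeSet<>();

    // Lowest offset that has not yet been acknowledged in order.
    private long nextToCommit;

    DeferredAckSketch(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /**
     * Records an ack. Returns the offset to commit (last contiguous acked
     * offset + 1, following the Kafka convention) or -1 if the ack is deferred.
     */
    long ack(long offset) {
        if (offset < nextToCommit) {
            return -1; // already covered by an earlier commit
        }
        pendingAcks.add(offset);
        boolean advanced = false;
        // Drain every pending ack that is now contiguous with the committed prefix.
        while (!pendingAcks.isEmpty() && pendingAcks.first() == nextToCommit) {
            pendingAcks.pollFirst();
            nextToCommit++;
            advanced = true;
        }
        return advanced ? nextToCommit : -1;
    }

    public static void main(String[] args) {
        DeferredAckSketch acks = new DeferredAckSketch(10);
        System.out.println(acks.ack(12)); // -1: offsets 10 and 11 are still outstanding
        System.out.println(acks.ack(11)); // -1: offset 10 is still outstanding
        System.out.println(acks.ack(10)); // 13: offsets 10, 11 and 12 are now contiguous
    }
}
```

In this sketch nothing is committable until offset 10 is acknowledged. In the real source, requeuing offset 10 would cause 11 and 12 to be redelivered as well, which is why idempotent processing matters.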

Starting with version 3.1.2, this source implements Pausable, which allows you to pause and resume the Consumer. While the consumer is paused, you must continue to call AbstractMessageSource.receive() within max.poll.interval.ms to prevent a rebalance.
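
As a rough illustration of the polling requirement above, the following sketch keeps calling receive() on a fixed interval whether or not the source is paused. To stay runnable without a broker or Spring on the classpath, it uses a hypothetical PollableSource interface and an in-memory FakeSource as stand-ins for the receive(), pause() and resume() methods; the assumption that a paused source returns null from receive() is part of the fake, not a statement about the library.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class PausedPollingSketch {

    /** Hypothetical stand-in for the receive(), pause() and resume() calls described above. */
    interface PollableSource {
        Object receive(); // null when no record is available
        void pause();
        void resume();
    }

    /** In-memory fake, used only to make the sketch runnable without a broker. */
    static final class FakeSource implements PollableSource {
        private final Queue<Object> records = new ArrayDeque<>();
        private boolean paused;
        int receiveCalls;

        FakeSource(List<?> initial) {
            records.addAll(initial);
        }

        @Override
        public Object receive() {
            receiveCalls++;                        // every call counts as a poll
            return paused ? null : records.poll(); // assumed: paused polls yield nothing
        }

        @Override
        public void pause() {
            paused = true;
        }

        @Override
        public void resume() {
            paused = false;
        }
    }

    /** Polls on a fixed interval, which must be shorter than max.poll.interval.ms. */
    static List<Object> poll(PollableSource source, int iterations, long intervalMs, long maxPollIntervalMs) {
        if (intervalMs >= maxPollIntervalMs) {
            throw new IllegalArgumentException("poll interval must be shorter than max.poll.interval.ms");
        }
        List<Object> received = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            Object message = source.receive(); // called on every iteration, paused or not
            if (message != null) {
                received.add(message);
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource(List.of("a", "b"));
        source.pause();
        List<Object> whilePaused = poll(source, 3, 1, 300_000);
        source.resume();
        List<Object> afterResume = poll(source, 3, 1, 300_000);
        // prints: [] [a, b] calls=6
        System.out.println(whilePaused + " " + afterResume + " calls=" + source.receiveCalls);
    }
}
```

The point of the sketch is that the loop does not skip receive() while paused: the calls continue, they simply yield no messages.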

Since:
5.4
Author:
Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra
  • Field Details

  • Constructor Details

    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
      Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      See Also:
      KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
      Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up to max.poll.records to be fetched on each poll. When false (the default), max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory; otherwise it is rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      allowMultiFetch - true to allow max.poll.records > 1.
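
      The rate requirement above can be turned into a rough time budget. The helper below is a hypothetical illustration, not part of the API. It assumes the worst case in which every poll returns the full max.poll.records and each receive() call consumes one record. The example values are Kafka's consumer defaults (max.poll.interval.ms = 300000, max.poll.records = 500).

```java
/** Hypothetical helper: estimates how often receive() must be called. */
final class ReceiveBudgetSketch {

    /**
     * Worst-case upper bound, in milliseconds, on the average time between
     * receive() calls so that one poll's records are consumed in time.
     */
    static long maxMillisPerReceive(long maxPollIntervalMs, int maxPollRecords, boolean allowMultiFetch) {
        if (maxPollIntervalMs <= 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("both settings must be positive");
        }
        // Without multi-fetch, each poll is limited to a single record.
        int recordsPerPoll = allowMultiFetch ? maxPollRecords : 1;
        return maxPollIntervalMs / recordsPerPoll;
    }

    public static void main(String[] args) {
        System.out.println(maxMillisPerReceive(300_000L, 500, true));  // 600
        System.out.println(maxMillisPerReceive(300_000L, 500, false)); // 300000
    }
}
```

      With the defaults and allowMultiFetch set to true, this estimate leaves about 600 ms per receive() call, including processing time, whereas with allowMultiFetch set to false the whole interval is available for a single record.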
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
      Construct an instance with the supplied parameters. Fetching multiple records per poll will be disabled.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      ackCallbackFactory - the ack callback factory.
      See Also:
      KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
      Construct an instance with the supplied parameters. Set 'allowMultiFetch' to true to allow up to max.poll.records to be fetched on each poll. When false (the default), max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory; otherwise it is rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
      Parameters:
      consumerFactory - the consumer factory.
      consumerProperties - the consumer properties.
      ackCallbackFactory - the ack callback factory.
      allowMultiFetch - true to allow max.poll.records > 1.
  • Method Details