Class KafkaItemReader<K,V>

All Implemented Interfaces:
ItemReader<V>, ItemStream, ItemStreamReader<V>

public class KafkaItemReader<K,V> extends AbstractItemStreamItemReader<V>

An ItemReader implementation for Apache Kafka. Uses a KafkaConsumer to read data from a given topic. Multiple partitions within the same topic can be assigned to this reader.

Since KafkaConsumer is not thread-safe, this reader is not thread-safe.

Since:
4.2
Author:
Mathieu Ouellet, Mahmoud Ben Hassine
  • Constructor Details

    • KafkaItemReader

      public KafkaItemReader(Properties consumerProperties, String topicName, Integer... partitions)
      Create a new KafkaItemReader.

      consumerProperties must contain the following keys: 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer'.
      Parameters:
      consumerProperties - properties of the consumer
      topicName - name of the topic to read data from
      partitions - list of partitions to read data from
    • KafkaItemReader

      public KafkaItemReader(Properties consumerProperties, String topicName, List<Integer> partitions)
      Create a new KafkaItemReader.

      consumerProperties must contain the following keys: 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer'.
      Parameters:
      consumerProperties - properties of the consumer
      topicName - name of the topic to read data from
      partitions - list of partitions to read data from
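
Both constructors expect the same four consumer properties. The following is a minimal sketch of assembling them with the JDK alone. The class name, the `missingKeys` helper, the broker address and the group ID are all hypothetical placeholders rather than part of the library, and the deserializer choice is only an assumption for string data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ConsumerPropertiesSketch {

    // The four keys the constructor documentation lists as mandatory.
    static final List<String> REQUIRED_KEYS = List.of(
            "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer");

    // Hypothetical helper: returns the required keys that are absent from props.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Builds a property set with placeholder values (assumptions; adjust to your cluster).
    static Properties exampleProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-batch-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Missing keys: " + missingKeys(exampleProperties()));
    }
}
```

With the Kafka and Spring Batch libraries on the classpath, the resulting Properties object would be passed as the first constructor argument, for example `new KafkaItemReader<String, String>(props, "some-topic", 0, 1)`, where the topic name and partition numbers are placeholders.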
  • Method Details

    • setPollTimeout

      public void setPollTimeout(Duration pollTimeout)
      Set the timeout for the consumer's poll operation. Defaults to 30 seconds.
      Parameters:
      pollTimeout - timeout for the consumer poll operation
    • setSaveState

      public void setSaveState(boolean saveState)
      Set the flag that determines whether to save internal state in the ExecutionContext. Only switch this to false if you don't want to save any state from this stream and you don't need it to be restartable. Always set it to false if the reader is being used in a concurrent environment.
      Parameters:
      saveState - flag value (default true).
    • isSaveState

      public boolean isSaveState()
      The flag that determines whether to save internal state for restarts.
      Returns:
      true if the flag was set
    • setPartitionOffsets

      public void setPartitionOffsets(Map<org.apache.kafka.common.TopicPartition,Long> partitionOffsets)
      Setter for partition offsets. This mapping tells the reader the offset to start reading from in each partition. It is optional: if it is not set, the reader starts from offset 0 in each partition. Passing an empty map makes the reader start from the offset stored in Kafka for the consumer group ID.

      In case of a restart, offsets stored in the execution context will take precedence.

      Parameters:
      partitionOffsets - mapping of starting offset in each partition
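
The three cases described for setPartitionOffsets (setter not called, empty map, restart) can be pictured with a small model. This is only a sketch of the documented rules, not the library's code. The class and method names are hypothetical, partitions are plain integers instead of TopicPartition, and an empty result stands for "no explicit position, so the offsets stored in Kafka for the consumer group apply". The exact position the reader resumes from on a restart is not modelled.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StartingOffsetsSketch {

    // Hypothetical model: computes the explicit starting offset per partition.
    // configured     - what was passed to the setter, or null if it was never called
    // savedInContext - offsets restored from the execution context, or null on a first run
    static Map<Integer, Long> resolve(List<Integer> partitions,
                                      Map<Integer, Long> configured,
                                      Map<Integer, Long> savedInContext) {
        Map<Integer, Long> result = new HashMap<>();
        if (configured == null) {
            // Setter never called: every assigned partition starts at offset 0.
            for (Integer partition : partitions) {
                result.put(partition, 0L);
            }
        } else {
            // Setter called: use the given offsets (possibly none at all).
            result.putAll(configured);
        }
        if (savedInContext != null) {
            // Restart: offsets from the execution context take precedence.
            result.putAll(savedInContext);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(0, 1);
        System.out.println(resolve(partitions, null, null));
        System.out.println(resolve(partitions, Map.of(), null));
        System.out.println(resolve(partitions, Map.of(0, 5L), Map.of(0, 42L)));
    }
}
```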
    • open

      public void open(ExecutionContext executionContext)
      Description copied from interface: ItemStream
      Open the stream for the provided ExecutionContext.
      Parameters:
      executionContext - current step's ExecutionContext. Will be the executionContext from the last run of the step on a restart.
    • read

      @Nullable public V read()
      Description copied from interface: ItemReader
      Reads a piece of input data and advances to the next one. Implementations must return null at the end of the input data set. In a transactional setting, the caller might get the same item twice from successive calls (or otherwise) if the first call was in a transaction that rolled back.
      Returns:
      the item to be processed, or null if the data source is exhausted
    • update

      public void update(ExecutionContext executionContext)
      Description copied from interface: ItemStream
      Indicates that the execution context provided during open is about to be saved. If any state is remaining, but has not been put in the context, it should be added here.
      Parameters:
      executionContext - to be updated
    • close

      public void close()
      Description copied from interface: ItemStream
      If any resources are needed for the stream to operate, they need to be destroyed here. Once this method has been called, all other methods (except open) may throw an exception.
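
The open, read, update and close contract described above can be illustrated with a JDK-only stand-in. Everything in this sketch is hypothetical: `ListReader` is an in-memory reader, a plain Map plays the role of the ExecutionContext, and the "position" key name is an assumption. It is not how KafkaItemReader is implemented; it only shows a reader that returns null at the end of its input and resumes from saved state after a restart.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamContractSketch {

    // Hypothetical in-memory reader that mirrors the four lifecycle methods.
    static final class ListReader {
        private static final String POSITION_KEY = "position"; // assumed key name
        private final List<String> items;
        private int position;
        private boolean open;

        ListReader(List<String> items) {
            this.items = items;
        }

        // Restore the position saved by a previous run, or start at 0.
        void open(Map<String, Object> context) {
            position = (Integer) context.getOrDefault(POSITION_KEY, 0);
            open = true;
        }

        // Return the next item, or null once the input is exhausted.
        String read() {
            if (!open) {
                throw new IllegalStateException("reader is not open");
            }
            return position < items.size() ? items.get(position++) : null;
        }

        // Record the current position so that a restart can resume from it.
        void update(Map<String, Object> context) {
            context.put(POSITION_KEY, position);
        }

        // After close, read() throws until open is called again.
        void close() {
            open = false;
        }
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        Map<String, Object> context = new HashMap<>();

        // First run: read one item, save state, then stop.
        ListReader first = new ListReader(data);
        first.open(context);
        System.out.println(first.read());
        first.update(context);
        first.close();

        // Simulated restart: a new reader resumes from the saved position.
        ListReader restarted = new ListReader(data);
        restarted.open(context);
        String item;
        while ((item = restarted.read()) != null) {
            System.out.println(item);
        }
        restarted.close();
    }
}
```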