public class KafkaItemReader<K,V> extends AbstractItemStreamItemReader<V>
An ItemReader implementation for Apache Kafka. Uses a KafkaConsumer to read data from a given topic. Multiple partitions within the same topic can be assigned to this reader.

Since KafkaConsumer is not thread-safe, this reader is not thread-safe.
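
Because the reader is not thread-safe, concurrent callers have to serialize access to it. The following is a minimal, dependency-free sketch of that idea, not Spring Batch code: `SimpleReader`, `SynchronizedReader` and `SynchronizedReaderDemo` are hypothetical names introduced here for illustration, with `SimpleReader` standing in for an item reader that returns null when its input is exhausted.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an item reader: returns null when input is exhausted. */
interface SimpleReader<T> {
    T read() throws Exception;
}

/** Serializes access to a non-thread-safe delegate by locking around each read. */
final class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        return delegate.read();
    }
}

final class SynchronizedReaderDemo {
    /** Drains the reader from several threads and returns the total number of items read. */
    static int drainConcurrently(final SimpleReader<Integer> reader, int threads)
            throws InterruptedException {
        final AtomicInteger count = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (reader.read() != null) {
                        count.incrementAndGet();
                    }
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return count.get();
    }

    public static void main(String[] args) throws Exception {
        // A plain iterator is not thread-safe, much like the underlying consumer.
        final Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).iterator();
        SimpleReader<Integer> unsafe = () -> source.hasNext() ? source.next() : null;
        System.out.println(drainConcurrently(new SynchronizedReader<>(unsafe), 4));
    }
}
```

Spring Batch ships a wrapper along these lines, `SynchronizedItemStreamReader`. Whether it suits a given job is a design decision, and the `setSaveState` documentation advises setting the save-state flag to false in a concurrent environment.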
Constructor and Description |
---|
KafkaItemReader(java.util.Properties consumerProperties, java.lang.String topicName, java.lang.Integer... partitions) Create a new KafkaItemReader. |
KafkaItemReader(java.util.Properties consumerProperties, java.lang.String topicName, java.util.List<java.lang.Integer> partitions) Create a new KafkaItemReader. |
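
Both constructors take the consumer configuration as a `java.util.Properties` object, and the constructor documentation requires four keys: 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer'. The sketch below shows one way to build such a configuration and check it before handing it to the reader. `ConsumerPropertiesSketch`, the broker address, the group id and the topic name in the comment are all hypothetical, and the choice of `StringDeserializer` is an assumption about the payload type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class ConsumerPropertiesSketch {
    /** The four keys the constructor documentation lists as mandatory. */
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer");

    /** Builds consumer properties for String keys and values (an assumption of this sketch). */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Returns the required keys that are absent from the given properties. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "batch-readers");
        System.out.println("Missing keys: " + missingKeys(props));
        // With spring-batch-infrastructure and kafka-clients on the classpath, the
        // properties would then be passed to one of the constructors, for example:
        // new KafkaItemReader<String, String>(props, "orders", 0, 1);
    }
}
```

Checking the keys up front is optional; it only makes a misconfiguration easier to diagnose before the reader is constructed.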
Modifier and Type | Method and Description |
---|---|
void | close() No-op. |
boolean | isSaveState() The flag that determines whether to save internal state for restarts. |
void | open(ExecutionContext executionContext) No-op. |
V | read() Reads a piece of input data and advances to the next one. |
void | setPartitionOffsets(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> partitionOffsets) Setter for partition offsets. |
void | setPollTimeout(java.time.Duration pollTimeout) Set a timeout for the consumer topic polling duration. |
void | setSaveState(boolean saveState) Set the flag that determines whether to save internal data for ExecutionContext. |
void | update(ExecutionContext executionContext) Return empty ExecutionContext. |
Methods inherited from class ItemStreamSupport: getExecutionContextKey, getName, setExecutionContextName, setName

public KafkaItemReader(java.util.Properties consumerProperties, java.lang.String topicName, java.lang.Integer... partitions)
Create a new KafkaItemReader.
consumerProperties must contain the following keys: 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer'.
Parameters:
consumerProperties - properties of the consumer
topicName - name of the topic to read data from
partitions - list of partitions to read data from

public KafkaItemReader(java.util.Properties consumerProperties, java.lang.String topicName, java.util.List<java.lang.Integer> partitions)
Create a new KafkaItemReader.
consumerProperties must contain the following keys: 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer'.
Parameters:
consumerProperties - properties of the consumer
topicName - name of the topic to read data from
partitions - list of partitions to read data from

public void setPollTimeout(java.time.Duration pollTimeout)
Set a timeout for the consumer topic polling duration.
Parameters:
pollTimeout - for the consumer poll operation

public void setSaveState(boolean saveState)
Set the flag that determines whether to save internal data for ExecutionContext. Only switch this to false if you don't want to save any state from this stream, and you don't need it to be restartable. Always set it to false if the reader is being used in a concurrent environment.
Parameters:
saveState - flag value (default true).

public boolean isSaveState()
The flag that determines whether to save internal state for restarts.

public void setPartitionOffsets(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> partitionOffsets)
Setter for partition offsets. In case of a restart, offsets stored in the execution context will take precedence.
Parameters:
partitionOffsets - mapping of starting offset in each partition

public void open(ExecutionContext executionContext)
Description copied from class: ItemStreamSupport
No-op.
Specified by: open in interface ItemStream
Overrides: open in class ItemStreamSupport
Parameters:
executionContext - current step's ExecutionContext. Will be the executionContext from the last run of the step on a restart.
See Also: ItemStream.open(ExecutionContext)
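
The precedence rule for starting offsets (offsets restored from the execution context win over those passed to `setPartitionOffsets` on a restart) can be illustrated with a small map merge. This is only a sketch of the rule as documented, not the reader's actual implementation. The real setter keys its map by `org.apache.kafka.common.TopicPartition`; the hypothetical `StartingOffsetsSketch` below uses plain partition numbers so that it runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

final class StartingOffsetsSketch {
    /**
     * Resolves the starting offset per partition: configured offsets are the
     * baseline, and any offset restored from a previous run overrides them.
     */
    static Map<Integer, Long> resolve(Map<Integer, Long> configured,
                                      Map<Integer, Long> restored) {
        Map<Integer, Long> result = new HashMap<>(configured);
        result.putAll(restored); // restored offsets take precedence
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> configured = new HashMap<>();
        configured.put(0, 0L);
        configured.put(1, 100L);

        // Pretend a previous run stored progress for partition 1 only.
        Map<Integer, Long> restored = new HashMap<>();
        restored.put(1, 250L);

        System.out.println(resolve(configured, restored));
    }
}
```

On a first run the restored map is empty, so the configured offsets are used unchanged.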
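
@Nullable public V read()
Description copied from interface: ItemReader
Reads a piece of input data and advances to the next one. Implementations must return null at the end of the input data set. In a transactional setting, caller might get the same item twice from successive calls (or otherwise), if the first call was in a transaction that rolled back.
Returns: the item to be processed, or null if the data source is exhausted

public void update(ExecutionContext executionContext)
Description copied from class: ItemStreamSupport
Return empty ExecutionContext.
Specified by: update in interface ItemStream
Overrides: update in class ItemStreamSupport
Parameters:
executionContext - to be updated
See Also: ItemStream.update(ExecutionContext)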

public void close()
Description copied from class: ItemStreamSupport
No-op.
Specified by: close in interface ItemStream
Overrides: close in class ItemStreamSupport
See Also: ItemStream.close()
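
The stream methods documented above are normally called in a fixed order: open, then read until it returns null, with update recording progress along the way, and finally close. The sketch below mimics that order with a hypothetical in-memory reader (`ListStreamReader`) and a plain `Map` standing in for the ExecutionContext. It is an illustration of the call sequence under those assumptions, not Spring Batch code, and it calls update after every item for simplicity, whereas a real step would do so at chunk boundaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory reader mimicking the open/read/update/close call order. */
final class ListStreamReader {
    private static final String POSITION_KEY = "position"; // hypothetical context key
    private final List<String> items;
    private int position;

    ListStreamReader(List<String> items) {
        this.items = items;
    }

    /** Restores the position saved by a previous run, if any. */
    void open(Map<String, Object> context) {
        Object saved = context.get(POSITION_KEY);
        position = saved == null ? 0 : (Integer) saved;
    }

    /** Returns the next item, or null once the input is exhausted. */
    String read() {
        return position < items.size() ? items.get(position++) : null;
    }

    /** Records the current position so that a restart can resume from it. */
    void update(Map<String, Object> context) {
        context.put(POSITION_KEY, position);
    }

    void close() {
        // Nothing to release in this in-memory sketch.
    }
}

final class LifecycleSketch {
    /** Reads at most maxItems items, recording progress in the context after each one. */
    static List<String> run(ListStreamReader reader, Map<String, Object> context, int maxItems) {
        List<String> out = new ArrayList<>();
        reader.open(context);
        try {
            String item;
            while (out.size() < maxItems && (item = reader.read()) != null) {
                out.add(item);
                reader.update(context);
            }
        } finally {
            reader.close();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");
        Map<String, Object> context = new HashMap<>();
        // First run stops early; the second run resumes from the saved position.
        System.out.println(run(new ListStreamReader(items), context, 2));
        System.out.println(run(new ListStreamReader(items), context, 10));
    }
}
```

Reusing the same context map across the two runs is what models a restart: the second reader starts from the position the first one saved.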