public class FaultTolerantChunkProcessor<I,O> extends SimpleChunkProcessor<I,O>
ChunkProcessor
interface, that
allows for skipping or retry of items that cause exceptions during writing.Constructor and Description |
---|
FaultTolerantChunkProcessor(ItemProcessor<? super I,? extends O> itemProcessor,
ItemWriter<? super O> itemWriter,
BatchRetryTemplate batchRetryTemplate) |
Modifier and Type | Method and Description |
---|---|
protected Chunk<O> |
getAdjustedOutputs(Chunk<I> inputs,
Chunk<O> outputs)
Extension point for subclasses that want to adjust the outputs based on
additional saved data in the inputs.
|
protected int |
getFilterCount(Chunk<I> inputs,
Chunk<O> outputs)
Extension point for subclasses to calculate the filter count.
|
protected void |
initializeUserData(Chunk<I> inputs)
Extension point for subclasses to allow them to memorise the contents of
the inputs, in case they are needed for accounting purposes later.
|
protected boolean |
isComplete(Chunk<I> inputs)
Extension point for subclasses that want to store additional data in the
inputs.
|
void |
setBuffering(boolean buffering)
A flag to indicate that items have been buffered and therefore will
always come back as a chunk after a rollback.
|
void |
setChunkMonitor(ChunkMonitor chunkMonitor) |
void |
setKeyGenerator(KeyGenerator keyGenerator)
The
KeyGenerator to use to identify failed items across rollback. |
void |
setProcessorTransactional(boolean processorTransactional)
Flag to say that the
ItemProcessor is transactional (defaults to
true). |
void |
setProcessSkipPolicy(SkipPolicy SkipPolicy) |
void |
setRollbackClassifier(org.springframework.classify.Classifier<java.lang.Throwable,java.lang.Boolean> rollbackClassifier)
A classifier that can distinguish between exceptions that cause rollback
(return true) or not (return false).
|
void |
setWriteSkipPolicy(SkipPolicy SkipPolicy) |
protected Chunk<O> |
transform(StepContribution contribution,
Chunk<I> inputs) |
protected void |
write(StepContribution contribution,
Chunk<I> inputs,
Chunk<O> outputs)
Simple implementation delegates to the
SimpleChunkProcessor.doWrite(List) method and
increments the write count in the contribution. |
afterPropertiesSet, doAfterWrite, doOnWriteError, doProcess, doWrite, getListener, process, registerListener, setItemProcessor, setItemWriter, setListeners, writeItems
public FaultTolerantChunkProcessor(ItemProcessor<? super I,? extends O> itemProcessor, ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate)
public void setKeyGenerator(KeyGenerator keyGenerator)
KeyGenerator
to use to identify failed items across rollback.
Not used in the case of the buffering flag
being true (the default).keyGenerator
- the KeyGenerator
to setpublic void setProcessSkipPolicy(SkipPolicy SkipPolicy)
SkipPolicy
- the SkipPolicy
for item processingpublic void setWriteSkipPolicy(SkipPolicy SkipPolicy)
SkipPolicy
- the SkipPolicy
for item writingpublic void setRollbackClassifier(org.springframework.classify.Classifier<java.lang.Throwable,java.lang.Boolean> rollbackClassifier)
rollbackClassifier
- classifierpublic void setChunkMonitor(ChunkMonitor chunkMonitor)
chunkMonitor
- monitorpublic void setBuffering(boolean buffering)
buffering
- true if items will be bufferedpublic void setProcessorTransactional(boolean processorTransactional)
ItemProcessor
is transactional (defaults to
true). If false then the processor is only called once per item per
chunk, even if there are rollbacks with retries and skips.processorTransactional
- the flag value to setprotected void initializeUserData(Chunk<I> inputs)
SimpleChunkProcessor
SimpleChunkProcessor.isComplete(Chunk)
, SimpleChunkProcessor.getFilterCount(Chunk, Chunk)
and
SimpleChunkProcessor.getAdjustedOutputs(Chunk, Chunk)
might also need to be, to
ensure that the user data is handled consistently.initializeUserData
in class SimpleChunkProcessor<I,O>
inputs
- the inputs for the processprotected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs)
SimpleChunkProcessor
getFilterCount
in class SimpleChunkProcessor<I,O>
inputs
- the inputs after transformationoutputs
- the outputs after transformationSimpleChunkProcessor.initializeUserData(Chunk)
protected boolean isComplete(Chunk<I> inputs)
SimpleChunkProcessor
isComplete
in class SimpleChunkProcessor<I,O>
inputs
- the input chunkSimpleChunkProcessor.initializeUserData(Chunk)
protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs)
SimpleChunkProcessor
getAdjustedOutputs
in class SimpleChunkProcessor<I,O>
inputs
- the inputs for the transformationoutputs
- the result of the transformationSimpleChunkProcessor.initializeUserData(Chunk)
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws java.lang.Exception
transform
in class SimpleChunkProcessor<I,O>
java.lang.Exception
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws java.lang.Exception
SimpleChunkProcessor
SimpleChunkProcessor.doWrite(List)
method and
increments the write count in the contribution. Subclasses can handle
more complicated scenarios, e.g.with fault tolerance. If output items are
skipped they should be removed from the inputs as well.write
in class SimpleChunkProcessor<I,O>
contribution
- the current step contributioninputs
- the inputs that gave rise to the outputsoutputs
- the outputs to writejava.lang.Exception
- if there is a problem