Class FaultTolerantChunkProcessor<I,O>
java.lang.Object
org.springframework.batch.core.step.item.SimpleChunkProcessor<I,O>
org.springframework.batch.core.step.item.FaultTolerantChunkProcessor<I,O>
- All Implemented Interfaces:
ChunkProcessor<I>, org.springframework.beans.factory.InitializingBean
Fault-tolerant implementation of the ChunkProcessor interface that allows items that cause exceptions during writing to be skipped or retried.
Field Summary
Fields inherited from class org.springframework.batch.core.step.item.SimpleChunkProcessor
meterRegistry
Constructor Summary
Constructors
- FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate)
Method Summary
- protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs)
Extension point for subclasses that want to adjust the outputs based on additional saved data in the inputs.
- protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs)
Extension point for subclasses to calculate the filter count.
- protected void initializeUserData(Chunk<I> inputs)
Extension point for subclasses to allow them to memorise the contents of the inputs, in case they are needed for accounting purposes later.
- protected boolean isComplete(Chunk<I> inputs)
Extension point for subclasses that want to store additional data in the inputs.
- void setBuffering(boolean buffering)
A flag to indicate that items have been buffered and therefore will always come back as a chunk after a rollback.
- void setChunkMonitor(ChunkMonitor chunkMonitor)
- void setKeyGenerator(KeyGenerator keyGenerator)
The KeyGenerator to use to identify failed items across rollback.
- void setProcessorTransactional(boolean processorTransactional)
Flag to say that the ItemProcessor is transactional (defaults to true).
- void setProcessSkipPolicy(SkipPolicy SkipPolicy)
- void setRollbackClassifier(org.springframework.classify.Classifier<Throwable, Boolean> rollbackClassifier)
A classifier that can distinguish between exceptions that cause rollback (return true) or not (return false).
- void setWriteSkipPolicy(SkipPolicy SkipPolicy)
- protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs)
- protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs)
Simple implementation delegates to the SimpleChunkProcessor.doWrite(Chunk) method and increments the write count in the contribution.
Methods inherited from class org.springframework.batch.core.step.item.SimpleChunkProcessor
afterPropertiesSet, doAfterWrite, doOnWriteError, doProcess, doWrite, getListener, process, registerListener, setItemProcessor, setItemWriter, setListeners, setMeterRegistry, stopTimer, writeItems
-
Constructor Details
-
FaultTolerantChunkProcessor
public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate)
-
-
Method Details
-
setKeyGenerator
public void setKeyGenerator(KeyGenerator keyGenerator)
The KeyGenerator to use to identify failed items across rollback. Not used when the buffering flag is true (the default).
- Parameters:
keyGenerator - the KeyGenerator to set
-
setProcessSkipPolicy
public void setProcessSkipPolicy(SkipPolicy SkipPolicy)
- Parameters:
SkipPolicy - the SkipPolicy for item processing
-
setWriteSkipPolicy
public void setWriteSkipPolicy(SkipPolicy SkipPolicy)
- Parameters:
SkipPolicy - the SkipPolicy for item writing
-
setRollbackClassifier
public void setRollbackClassifier(org.springframework.classify.Classifier<Throwable, Boolean> rollbackClassifier)
A classifier that can distinguish between exceptions that cause rollback (return true) or not (return false).
- Parameters:
rollbackClassifier - classifier
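As an illustration of the contract described for the rollback classifier, the sketch below models a classifier that returns false (no rollback) for a chosen set of exception types and true for everything else. It is a stand-alone model that uses only the JDK: the RollbackClassifier interface and the noRollbackFor helper are hypothetical names introduced here as stand-ins for org.springframework.classify.Classifier<Throwable, Boolean>, and the choice of exception types is an assumption.

```java
import java.util.List;

// Stand-alone model using only the JDK; not the Spring Batch implementation.
final class RollbackClassifierSketch {

    /** Hypothetical stand-in mirroring the shape of Classifier<Throwable, Boolean>. */
    interface RollbackClassifier {
        Boolean classify(Throwable failure);
    }

    /**
     * Builds a classifier that returns false (no rollback) for the given
     * exception types and their subclasses, and true (rollback) otherwise.
     */
    static RollbackClassifier noRollbackFor(List<Class<? extends Throwable>> noRollbackTypes) {
        return failure -> {
            for (Class<? extends Throwable> type : noRollbackTypes) {
                if (type.isInstance(failure)) {
                    return false;
                }
            }
            return true;
        };
    }

    public static void main(String[] args) {
        RollbackClassifier classifier = noRollbackFor(List.of(IllegalArgumentException.class));
        // An exception of a listed type (or a subclass) does not trigger a rollback.
        System.out.println(classifier.classify(new IllegalArgumentException("bad input")));
        // Any other exception does.
        System.out.println(classifier.classify(new IllegalStateException("writer failed")));
    }
}
```

With a real Classifier<Throwable, Boolean>, an object following the same true/false convention would be the argument passed to setRollbackClassifier.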
-
setChunkMonitor
public void setChunkMonitor(ChunkMonitor chunkMonitor)
- Parameters:
chunkMonitor - monitor
-
setBuffering
public void setBuffering(boolean buffering)
A flag to indicate that items have been buffered and therefore will always come back as a chunk after a rollback. Otherwise things are more complicated, because after a rollback the new chunk might or might not contain items from the previous failed chunk.
- Parameters:
buffering - true if items will be buffered
-
setProcessorTransactional
public void setProcessorTransactional(boolean processorTransactional)
Flag to say that the ItemProcessor is transactional (defaults to true). If false, the processor is called only once per item per chunk, even if there are rollbacks with retries and skips.
- Parameters:
processorTransactional - the flag value to set
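The "once per item per chunk" behaviour described for a non-transactional processor can be pictured as caching each item's output for the lifetime of the chunk, so that a retry after a rollback reuses the cached output instead of calling the processor again. The sketch below is a stand-alone model under that assumption, using only the JDK; ProcessOncePerChunkSketch, processChunk and chunkCompleted are hypothetical names, and nothing here is taken from the Spring Batch implementation.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-alone model using only the JDK; not the Spring Batch implementation.
final class ProcessOncePerChunkSketch<I, O> {

    // Stand-in for an ItemProcessor that must not be re-invoked for the same item.
    private final Function<I, O> processor;
    // Outputs remembered per input instance for the current chunk.
    private final Map<I, O> cachedOutputs = new IdentityHashMap<>();
    private int invocations;

    ProcessOncePerChunkSketch(Function<I, O> processor) {
        this.processor = processor;
    }

    /** Processes one attempt at a chunk, reusing outputs cached by an earlier attempt. */
    List<O> processChunk(List<I> inputs) {
        List<O> outputs = new ArrayList<>();
        for (I item : inputs) {
            if (!cachedOutputs.containsKey(item)) {
                invocations++;
                cachedOutputs.put(item, processor.apply(item));
            }
            outputs.add(cachedOutputs.get(item));
        }
        return outputs;
    }

    /** Clears the cache once the chunk has completed, so the next chunk starts fresh. */
    void chunkCompleted() {
        cachedOutputs.clear();
    }

    int invocations() {
        return invocations;
    }

    public static void main(String[] args) {
        ProcessOncePerChunkSketch<String, String> sketch =
                new ProcessOncePerChunkSketch<String, String>(s -> s.toUpperCase());
        List<String> chunk = List.of("a", "b");
        List<String> firstAttempt = sketch.processChunk(chunk);
        List<String> retry = sketch.processChunk(chunk); // simulated retry after a rollback
        System.out.println(firstAttempt + " " + retry + " invocations=" + sketch.invocations());
        sketch.chunkCompleted();
    }
}
```

In this model the retry returns the same outputs while the processor's invocation count stays at one per item, which is the property that matters when a processor has side effects that a rollback cannot undo.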
-
initializeUserData
protected void initializeUserData(Chunk<I> inputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses to allow them to memorise the contents of the inputs, in case they are needed for accounting purposes later. The default implementation sets up some user data to remember the original size of the inputs. If this method is overridden then some or all of SimpleChunkProcessor.isComplete(Chunk), SimpleChunkProcessor.getFilterCount(Chunk, Chunk) and SimpleChunkProcessor.getAdjustedOutputs(Chunk, Chunk) might also need to be, to ensure that the user data is handled consistently.
- Overrides:
initializeUserData in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the inputs for the process
-
getFilterCount
protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses to calculate the filter count. Defaults to the difference between input size and output size.
- Overrides:
getFilterCount in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the inputs after transformation
outputs - the outputs after transformation
- Returns:
the difference in sizes
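The default described in the copied description (input size minus output size) amounts to the small calculation below. This is a stand-alone sketch using plain java.util.List values in place of Chunk; FilterCountSketch and filterCount are hypothetical names, and the sketch does not reflect any additional accounting this class may perform in its override.

```java
import java.util.List;
import java.util.stream.Collectors;

// Stand-alone model using only the JDK; plain lists stand in for Chunk<I> and Chunk<O>.
final class FilterCountSketch {

    /** Filter count as described for the default: input size minus output size. */
    static int filterCount(List<?> inputs, List<?> outputs) {
        return inputs.size() - outputs.size();
    }

    public static void main(String[] args) {
        List<Integer> inputs = List.of(1, 2, 3, 4);
        // A processor that filters out odd numbers leaves two outputs.
        List<Integer> outputs = inputs.stream()
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
        System.out.println(filterCount(inputs, outputs)); // prints 2
    }
}
```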
-
isComplete
protected boolean isComplete(Chunk<I> inputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses that want to store additional data in the inputs. Default just checks if inputs are empty.
- Overrides:
isComplete in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the input chunk
- Returns:
true if it is empty
-
getAdjustedOutputs
protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses that want to adjust the outputs based on additional saved data in the inputs. Default implementation just returns the outputs unchanged.
- Overrides:
getAdjustedOutputs in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the inputs for the transformation
outputs - the result of the transformation
- Returns:
the outputs unchanged
-
transform
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception
- Overrides:
transform in class SimpleChunkProcessor<I,O>
- Throws:
Exception
-
write
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception
Description copied from class: SimpleChunkProcessor
Simple implementation delegates to the SimpleChunkProcessor.doWrite(Chunk) method and increments the write count in the contribution. Subclasses can handle more complicated scenarios, e.g. with fault tolerance. If output items are skipped they should be removed from the inputs as well.
- Overrides:
write in class SimpleChunkProcessor<I,O>
- Parameters:
contribution - the current step contribution
inputs - the inputs that gave rise to the outputs
outputs - the outputs to write
- Throws:
Exception - if there is a problem
-