1 | package org.springframework.batch.core.step.item; |
2 | |
3 | import org.springframework.batch.item.ExecutionContext; |
4 | import org.springframework.batch.item.ItemReader; |
5 | import org.springframework.batch.item.ItemStream; |
6 | import org.springframework.batch.item.ItemStreamException; |
7 | import org.springframework.batch.item.ParseException; |
8 | import org.springframework.batch.item.UnexpectedInputException; |
9 | |
10 | /** |
11 | * Convenience wrapper for an ItemReader that keeps track of how many items |
12 | * were successfully processed. |
13 | */ |
14 | class OffsetItemReader<T> implements ItemReader<T>, ItemStream { |
15 | |
16 | private static final String OFFSET_KEY = FaultTolerantStepFactoryBean.class.getName()+".OFFSET_KEY"; |
17 | private final ItemReader<? extends T> itemReader; |
18 | private int offset; |
19 | |
20 | /** |
21 | * @param itemReader |
22 | */ |
23 | public OffsetItemReader(ItemReader<? extends T> itemReader) { |
24 | this.itemReader = itemReader; |
25 | } |
26 | |
27 | public T read() throws Exception, UnexpectedInputException, ParseException { |
28 | for (int i=0; i<offset; i++) { |
29 | // Discard items that are already processed |
30 | itemReader.read(); |
31 | } |
32 | offset = 0; |
33 | return itemReader.read(); |
34 | } |
35 | |
36 | /** |
37 | * {@inheritDoc} |
38 | */ |
39 | public void close() throws ItemStreamException { |
40 | } |
41 | |
42 | /** |
43 | * {@inheritDoc} |
44 | */ |
45 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
46 | offset = executionContext.getInt(OFFSET_KEY, 0); |
47 | } |
48 | |
49 | /** |
50 | * {@inheritDoc} |
51 | */ |
52 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
53 | executionContext.putInt(OFFSET_KEY, offset); |
54 | } |
55 | |
56 | } |