1 | package org.springframework.batch.item.support; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.List; |
5 | import java.util.ListIterator; |
6 | |
7 | import org.springframework.batch.item.ExecutionContext; |
8 | import org.springframework.batch.item.ItemReader; |
9 | import org.springframework.batch.item.ItemStream; |
10 | import org.springframework.batch.item.ItemStreamException; |
11 | import org.springframework.batch.item.MarkFailedException; |
12 | import org.springframework.batch.item.NoWorkFoundException; |
13 | import org.springframework.batch.item.ParseException; |
14 | import org.springframework.batch.item.ResetFailedException; |
15 | import org.springframework.batch.item.UnexpectedInputException; |
16 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
17 | import org.springframework.util.Assert; |
18 | |
19 | /** |
20 | * Abstract superclass for {@link ItemReader}s which use item buffering to |
21 | * support reset/rollback. Supports restart by storing item count in the |
22 | * {@link ExecutionContext} (therefore requires item ordering to be preserved |
23 | * between runs). |
24 | * |
25 | * Subclasses are inherently *not* thread-safe. |
26 | * |
27 | * @author Robert Kasanicky |
28 | */ |
29 | public abstract class AbstractBufferedItemReaderItemStream implements ItemReader, ItemStream { |
30 | |
31 | private static final String READ_COUNT = "read.count"; |
32 | |
33 | private int currentItemCount = 0; |
34 | |
35 | private int lastMarkedItemCount = 0; |
36 | |
37 | private boolean shouldReadBuffer = false; |
38 | |
39 | private List itemBuffer = new ArrayList(); |
40 | |
41 | private ListIterator itemBufferIterator = null; |
42 | |
43 | private int lastMarkedBufferIndex = 0; |
44 | |
45 | private ExecutionContextUserSupport ecSupport = new ExecutionContextUserSupport(); |
46 | |
47 | private boolean saveState = true; |
48 | |
49 | /** |
50 | * Read next item from input. |
51 | * @return item |
52 | * @throws Exception |
53 | */ |
54 | protected abstract Object doRead() throws Exception; |
55 | |
56 | /** |
57 | * Open resources necessary to start reading input. |
58 | */ |
59 | protected abstract void doOpen() throws Exception; |
60 | |
61 | /** |
62 | * Close the resources opened in {@link #doOpen()}. |
63 | */ |
64 | protected abstract void doClose() throws Exception; |
65 | |
66 | /** |
67 | * Move to the given item index. Subclasses should override this method if |
68 | * there is a more efficient way of moving to given index than re-reading |
69 | * the input using {@link #doRead()}. |
70 | */ |
71 | protected void jumpToItem(int itemIndex) throws Exception { |
72 | for (int i = 0; i < itemIndex; i++) { |
73 | doRead(); |
74 | } |
75 | } |
76 | |
77 | public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException { |
78 | |
79 | currentItemCount++; |
80 | |
81 | if (shouldReadBuffer) { |
82 | if (itemBufferIterator.hasNext()) { |
83 | return itemBufferIterator.next(); |
84 | } |
85 | else { |
86 | // buffer is exhausted, continue reading from file |
87 | shouldReadBuffer = false; |
88 | itemBufferIterator = null; |
89 | } |
90 | } |
91 | |
92 | Object item = doRead(); |
93 | itemBuffer.add(item); |
94 | |
95 | return item; |
96 | } |
97 | |
98 | /** |
99 | * Mark is supported as long as this {@link ItemStream} is used in a |
100 | * single-threaded environment. The state backing the mark is a single |
101 | * counter, keeping track of the current position, so multiple threads |
102 | * cannot be accommodated. |
103 | * |
104 | * @see org.springframework.batch.item.support.AbstractItemReader#mark() |
105 | */ |
106 | public void mark() throws MarkFailedException { |
107 | |
108 | if (!shouldReadBuffer) { |
109 | itemBuffer.clear(); |
110 | itemBufferIterator = null; |
111 | lastMarkedBufferIndex = 0; |
112 | } |
113 | else { |
114 | lastMarkedBufferIndex = itemBufferIterator.nextIndex(); |
115 | } |
116 | |
117 | lastMarkedItemCount = currentItemCount; |
118 | } |
119 | |
120 | public void reset() throws ResetFailedException { |
121 | |
122 | currentItemCount = lastMarkedItemCount; |
123 | shouldReadBuffer = true; |
124 | itemBufferIterator = itemBuffer.listIterator(lastMarkedBufferIndex); |
125 | } |
126 | |
127 | protected int getCurrentItemCount() { |
128 | return currentItemCount; |
129 | } |
130 | |
131 | protected void setCurrentItemCount(int count) { |
132 | this.currentItemCount = count; |
133 | } |
134 | |
135 | public void close(ExecutionContext executionContext) throws ItemStreamException { |
136 | currentItemCount = 0; |
137 | lastMarkedItemCount = 0; |
138 | lastMarkedBufferIndex = 0; |
139 | itemBufferIterator = null; |
140 | shouldReadBuffer = false; |
141 | itemBuffer.clear(); |
142 | |
143 | try { |
144 | doClose(); |
145 | } |
146 | catch (Exception e) { |
147 | throw new ItemStreamException("Error while closing item reader", e); |
148 | } |
149 | } |
150 | |
151 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
152 | |
153 | try { |
154 | doOpen(); |
155 | } |
156 | catch (Exception e) { |
157 | throw new ItemStreamException("Failed to initialize the reader", e); |
158 | } |
159 | |
160 | if (executionContext.containsKey(ecSupport.getKey(READ_COUNT))) { |
161 | int itemCount = new Long(executionContext.getLong(ecSupport.getKey(READ_COUNT))).intValue(); |
162 | |
163 | try { |
164 | jumpToItem(itemCount); |
165 | } |
166 | catch (Exception e) { |
167 | throw new ItemStreamException("Could not move to stored position on restart", e); |
168 | } |
169 | |
170 | currentItemCount = itemCount; |
171 | } |
172 | |
173 | } |
174 | |
175 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
176 | if (saveState) { |
177 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
178 | executionContext.putLong(ecSupport.getKey(READ_COUNT), currentItemCount); |
179 | } |
180 | |
181 | } |
182 | |
183 | public void setName(String name) { |
184 | ecSupport.setName(name); |
185 | } |
186 | |
187 | /** |
188 | * Set the flag that determines whether to save internal data for |
189 | * {@link ExecutionContext}. Only switch this to false if you don't want to |
190 | * save any state from this stream, and you don't need it to be restartable. |
191 | * |
192 | * @param saveState flag value (default true). |
193 | */ |
194 | public void setSaveState(boolean saveState) { |
195 | this.saveState = saveState; |
196 | } |
197 | |
198 | } |