1 | package org.springframework.batch.item.xml; |
2 | |
3 | import java.io.IOException; |
4 | import java.io.InputStream; |
5 | |
6 | import javax.xml.namespace.QName; |
7 | import javax.xml.stream.XMLEventReader; |
8 | import javax.xml.stream.XMLInputFactory; |
9 | import javax.xml.stream.XMLStreamException; |
10 | import javax.xml.stream.events.StartElement; |
11 | |
12 | import org.springframework.batch.item.ExecutionContext; |
13 | import org.springframework.batch.item.ExecutionContextUserSupport; |
14 | import org.springframework.batch.item.ItemReader; |
15 | import org.springframework.batch.item.ItemStream; |
16 | import org.springframework.batch.item.ItemStreamException; |
17 | import org.springframework.batch.item.ReaderNotOpenException; |
18 | import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader; |
19 | import org.springframework.batch.item.xml.stax.DefaultTransactionalEventReader; |
20 | import org.springframework.batch.item.xml.stax.FragmentEventReader; |
21 | import org.springframework.batch.item.xml.stax.TransactionalEventReader; |
22 | import org.springframework.beans.factory.InitializingBean; |
23 | import org.springframework.core.io.Resource; |
24 | import org.springframework.dao.DataAccessResourceFailureException; |
25 | import org.springframework.util.Assert; |
26 | import org.springframework.util.ClassUtils; |
27 | |
28 | /** |
29 | * Input source for reading XML input based on StAX. |
30 | * |
31 | * It extracts fragments from the input XML document which correspond to records |
32 | * for processing. The fragments are wrapped with StartDocument and EndDocument |
33 | * events so that the fragments can be further processed like standalone XML |
34 | * documents. |
35 | * |
36 | * @author Robert Kasanicky |
37 | */ |
38 | public class StaxEventItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream, |
39 | InitializingBean { |
40 | |
41 | private static final String READ_COUNT_STATISTICS_NAME = "read.count"; |
42 | |
43 | private FragmentEventReader fragmentReader; |
44 | |
45 | private TransactionalEventReader txReader; |
46 | |
47 | private EventReaderDeserializer eventReaderDeserializer; |
48 | |
49 | private Resource resource; |
50 | |
51 | private InputStream inputStream; |
52 | |
53 | private String fragmentRootElementName; |
54 | |
55 | private boolean initialized = false; |
56 | |
57 | private long lastCommitPointRecordCount = 0; |
58 | |
59 | private long currentRecordCount = 0; |
60 | |
61 | private boolean saveState = false; |
62 | |
63 | public StaxEventItemReader() { |
64 | setName(ClassUtils.getShortName(StaxEventItemReader.class)); |
65 | } |
66 | |
67 | /** |
68 | * Read in the next root element from the file, and return it. |
69 | * |
70 | * @return the next available record, if none exist, return null |
71 | * @see org.springframework.batch.item.ItemReader#read() |
72 | */ |
73 | public Object read() { |
74 | if (!initialized) { |
75 | throw new ReaderNotOpenException("Reader must be open before it can be read."); |
76 | } |
77 | Object item = null; |
78 | |
79 | currentRecordCount++; |
80 | if (moveCursorToNextFragment(fragmentReader)) { |
81 | fragmentReader.markStartFragment(); |
82 | item = eventReaderDeserializer.deserializeFragment(fragmentReader); |
83 | fragmentReader.markFragmentProcessed(); |
84 | } |
85 | |
86 | if (item == null) { |
87 | currentRecordCount--; |
88 | } |
89 | return item; |
90 | } |
91 | |
92 | public void close(ExecutionContext executionContext) { |
93 | initialized = false; |
94 | currentRecordCount = 0; |
95 | try { |
96 | if (fragmentReader != null) { |
97 | fragmentReader.close(); |
98 | } |
99 | if (inputStream != null) { |
100 | inputStream.close(); |
101 | } |
102 | } |
103 | catch (XMLStreamException e) { |
104 | throw new DataAccessResourceFailureException("Error while closing event reader", e); |
105 | } |
106 | catch (IOException e) { |
107 | throw new DataAccessResourceFailureException("Error while closing input stream", e); |
108 | } |
109 | finally { |
110 | fragmentReader = null; |
111 | inputStream = null; |
112 | } |
113 | } |
114 | |
115 | public void open(ExecutionContext executionContext) { |
116 | Assert.state(resource.exists(), "Input resource does not exist: [" + resource + "]"); |
117 | |
118 | try { |
119 | inputStream = resource.getInputStream(); |
120 | txReader = new DefaultTransactionalEventReader(XMLInputFactory.newInstance().createXMLEventReader( |
121 | inputStream)); |
122 | fragmentReader = new DefaultFragmentEventReader(txReader); |
123 | } |
124 | catch (XMLStreamException xse) { |
125 | throw new DataAccessResourceFailureException("Unable to create XML reader", xse); |
126 | } |
127 | catch (IOException ioe) { |
128 | throw new DataAccessResourceFailureException("Unable to get input stream", ioe); |
129 | } |
130 | initialized = true; |
131 | |
132 | if (executionContext.containsKey(getKey(READ_COUNT_STATISTICS_NAME))) { |
133 | long restoredRecordCount = executionContext.getLong(getKey(READ_COUNT_STATISTICS_NAME)); |
134 | int REASONABLE_ADHOC_COMMIT_FREQUENCY = 100; |
135 | while (currentRecordCount <= restoredRecordCount) { |
136 | currentRecordCount++; |
137 | if (currentRecordCount % REASONABLE_ADHOC_COMMIT_FREQUENCY == 0) { |
138 | txReader.onCommit(); // reset the history buffer |
139 | } |
140 | if (!fragmentReader.hasNext()) { |
141 | throw new ItemStreamException("Restore point must be before end of input"); |
142 | } |
143 | fragmentReader.next(); |
144 | moveCursorToNextFragment(fragmentReader); |
145 | } |
146 | mark(); // reset the history buffer |
147 | } |
148 | } |
149 | |
150 | public void setResource(Resource resource) { |
151 | this.resource = resource; |
152 | } |
153 | |
154 | /** |
155 | * @param eventReaderDeserializer maps xml fragments corresponding to |
156 | * records to objects |
157 | */ |
158 | public void setFragmentDeserializer(EventReaderDeserializer eventReaderDeserializer) { |
159 | this.eventReaderDeserializer = eventReaderDeserializer; |
160 | } |
161 | |
162 | /** |
163 | * @param fragmentRootElementName name of the root element of the fragment |
164 | * TODO String can be ambiguous due to namespaces, use QName? |
165 | */ |
166 | public void setFragmentRootElementName(String fragmentRootElementName) { |
167 | this.fragmentRootElementName = fragmentRootElementName; |
168 | } |
169 | |
170 | /** |
171 | * Ensure that all required dependencies for the ItemReader to run are |
172 | * provided after all properties have been set. |
173 | * |
174 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
175 | * @throws IllegalArgumentException if the Resource, FragmentDeserializer or |
176 | * FragmentRootElementName is null, or if the root element is empty. |
177 | * @throws IllegalStateException if the Resource does not exist. |
178 | */ |
179 | public void afterPropertiesSet() throws Exception { |
180 | Assert.notNull(resource, "The Resource must not be null."); |
181 | Assert.notNull(eventReaderDeserializer, "The FragmentDeserializer must not be null."); |
182 | Assert.hasLength(fragmentRootElementName, "The FragmentRootElementName must not be null"); |
183 | } |
184 | |
185 | /** |
186 | * @see ItemStream#update(ExecutionContext) |
187 | */ |
188 | public void update(ExecutionContext executionContext) { |
189 | if (saveState) { |
190 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
191 | executionContext.putLong(getKey(READ_COUNT_STATISTICS_NAME), currentRecordCount); |
192 | } |
193 | } |
194 | |
195 | /** |
196 | * Responsible for moving the cursor before the StartElement of the fragment |
197 | * root. |
198 | * |
199 | * This implementation simply looks for the next corresponding element, it |
200 | * does not care about element nesting. You will need to override this |
201 | * method to correctly handle composite fragments. |
202 | * |
203 | * @return <code>true</code> if next fragment was found, |
204 | * <code>false</code> otherwise. |
205 | */ |
206 | protected boolean moveCursorToNextFragment(XMLEventReader reader) { |
207 | try { |
208 | while (true) { |
209 | while (reader.peek() != null && !reader.peek().isStartElement()) { |
210 | reader.nextEvent(); |
211 | } |
212 | if (reader.peek() == null) { |
213 | return false; |
214 | } |
215 | QName startElementName = ((StartElement) reader.peek()).getName(); |
216 | if (startElementName.getLocalPart().equals(fragmentRootElementName)) { |
217 | return true; |
218 | } |
219 | else { |
220 | reader.nextEvent(); |
221 | } |
222 | } |
223 | } |
224 | catch (XMLStreamException e) { |
225 | throw new DataAccessResourceFailureException("Error while reading from event reader", e); |
226 | } |
227 | } |
228 | |
229 | /** |
230 | * Mark is supported as long as this {@link ItemStream} is used in a |
231 | * single-threaded environment. The state backing the mark is a single |
232 | * counter, keeping track of the current position, so multiple threads |
233 | * cannot be accommodated. |
234 | * |
235 | * @see org.springframework.batch.item.AbstractItemReader#mark() |
236 | */ |
237 | public void mark() { |
238 | lastCommitPointRecordCount = currentRecordCount; |
239 | txReader.onCommit(); |
240 | } |
241 | |
242 | /* |
243 | * (non-Javadoc) |
244 | * |
245 | * @see org.springframework.batch.item.ItemStream#reset(org.springframework.batch.item.ExecutionContext) |
246 | */ |
247 | public void reset() { |
248 | currentRecordCount = lastCommitPointRecordCount; |
249 | txReader.onRollback(); |
250 | fragmentReader.reset(); |
251 | } |
252 | |
253 | /** |
254 | * Set the flag that determines whether to save internal data for |
255 | * {@link ExecutionContext}. Only switch this to false if you don't want to |
256 | * save any state from this stream, and you don't need it to be restartable. |
257 | * |
258 | * @param saveState flag value (default true) |
259 | */ |
260 | public void setSaveState(boolean saveState) { |
261 | this.saveState = saveState; |
262 | } |
263 | } |