1 | package org.springframework.batch.item.xml; |
2 | |
3 | import java.io.File; |
4 | import java.io.FileOutputStream; |
5 | import java.io.IOException; |
6 | import java.nio.ByteBuffer; |
7 | import java.nio.channels.FileChannel; |
8 | import java.util.ArrayList; |
9 | import java.util.Arrays; |
10 | import java.util.Iterator; |
11 | import java.util.List; |
12 | import java.util.Map; |
13 | |
14 | import javax.xml.stream.XMLEventFactory; |
15 | import javax.xml.stream.XMLEventWriter; |
16 | import javax.xml.stream.XMLOutputFactory; |
17 | import javax.xml.stream.XMLStreamException; |
18 | |
19 | import org.apache.commons.logging.Log; |
20 | import org.apache.commons.logging.LogFactory; |
21 | import org.springframework.batch.item.ClearFailedException; |
22 | import org.springframework.batch.item.ExecutionContext; |
23 | import org.springframework.batch.item.FlushFailedException; |
24 | import org.springframework.batch.item.ItemStream; |
25 | import org.springframework.batch.item.ItemWriter; |
26 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
27 | import org.springframework.batch.item.util.FileUtils; |
28 | import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter; |
29 | import org.springframework.beans.factory.InitializingBean; |
30 | import org.springframework.core.io.Resource; |
31 | import org.springframework.dao.DataAccessResourceFailureException; |
32 | import org.springframework.util.Assert; |
33 | import org.springframework.util.ClassUtils; |
34 | import org.springframework.util.CollectionUtils; |
35 | |
36 | /** |
37 | * An implementation of {@link ItemWriter} which uses StAX and |
38 | * {@link EventWriterSerializer} for serializing object to XML. |
39 | * |
40 | * This item writer also provides restart, statistics and transaction features |
41 | * by implementing corresponding interfaces. |
42 | * |
43 | * Output is buffered until {@link #flush()} is called - only then the actual |
44 | * writing to file takes place. |
45 | * |
46 | * The implementation is *not* thread-safe. |
47 | * |
48 | * @author Peter Zozom |
49 | * @author Robert Kasanicky |
50 | * |
51 | */ |
52 | public class StaxEventItemWriter extends ExecutionContextUserSupport implements ItemWriter, ItemStream, |
53 | InitializingBean { |
54 | |
55 | private static final Log log = LogFactory.getLog(StaxEventItemWriter.class); |
56 | |
57 | // default encoding |
58 | private static final String DEFAULT_ENCODING = "UTF-8"; |
59 | |
60 | // default encoding |
61 | private static final String DEFAULT_XML_VERSION = "1.0"; |
62 | |
63 | // default root tag name |
64 | private static final String DEFAULT_ROOT_TAG_NAME = "root"; |
65 | |
66 | // restart data property name |
67 | private static final String RESTART_DATA_NAME = "position"; |
68 | |
69 | // restart data property name |
70 | private static final String WRITE_STATISTICS_NAME = "record.count"; |
71 | |
72 | // file system resource |
73 | private Resource resource; |
74 | |
75 | // xml serializer |
76 | private EventWriterSerializer serializer; |
77 | |
78 | // encoding to be used while reading from the resource |
79 | private String encoding = DEFAULT_ENCODING; |
80 | |
81 | // XML version |
82 | private String version = DEFAULT_XML_VERSION; |
83 | |
84 | // name of the root tag |
85 | private String rootTagName = DEFAULT_ROOT_TAG_NAME; |
86 | |
87 | // root element attributes |
88 | private Map rootElementAttributes = null; |
89 | |
90 | // TRUE means, that output file will be overwritten if exists - default is |
91 | // TRUE |
92 | private boolean overwriteOutput = true; |
93 | |
94 | // file channel |
95 | private FileChannel channel; |
96 | |
97 | // wrapper for XML event writer that swallows StartDocument and EndDocument |
98 | // events |
99 | private XMLEventWriter eventWriter; |
100 | |
101 | // XML event writer |
102 | private XMLEventWriter delegateEventWriter; |
103 | |
104 | // byte offset in file channel at last commit point |
105 | private long lastCommitPointPosition = 0; |
106 | |
107 | // processed record count at last commit point |
108 | private long lastCommitPointRecordCount = 0; |
109 | |
110 | // current count of processed records |
111 | private long currentRecordCount = 0; |
112 | |
113 | private boolean saveState = true; |
114 | |
115 | // holds the list of items for writing before they are actually written on |
116 | // #flush() |
117 | private List buffer = new ArrayList(); |
118 | |
119 | private List headers = new ArrayList(); |
120 | |
121 | public StaxEventItemWriter() { |
122 | setName(ClassUtils.getShortName(StaxEventItemWriter.class)); |
123 | } |
124 | |
125 | /** |
126 | * Set output file. |
127 | * |
128 | * @param resource the output file |
129 | */ |
130 | public void setResource(Resource resource) { |
131 | this.resource = resource; |
132 | } |
133 | |
134 | /** |
135 | * Set Object to XML serializer. |
136 | * |
137 | * @param serializer the Object to XML serializer |
138 | */ |
139 | public void setSerializer(EventWriterSerializer serializer) { |
140 | this.serializer = serializer; |
141 | } |
142 | |
143 | /** |
144 | * Get used encoding. |
145 | * |
146 | * @return the encoding used |
147 | */ |
148 | public String getEncoding() { |
149 | return encoding; |
150 | } |
151 | |
152 | /** |
153 | * Set encoding to be used for output file. |
154 | * |
155 | * @param encoding the encoding to be used |
156 | */ |
157 | public void setEncoding(String encoding) { |
158 | this.encoding = encoding; |
159 | } |
160 | |
161 | /** |
162 | * Get XML version. |
163 | * |
164 | * @return the XML version used |
165 | */ |
166 | public String getVersion() { |
167 | return version; |
168 | } |
169 | |
170 | /** |
171 | * Set XML version to be used for output XML. |
172 | * |
173 | * @param version the XML version to be used |
174 | */ |
175 | public void setVersion(String version) { |
176 | this.version = version; |
177 | } |
178 | |
179 | /** |
180 | * Get the tag name of the root element. |
181 | * |
182 | * @return the root element tag name |
183 | */ |
184 | public String getRootTagName() { |
185 | return rootTagName; |
186 | } |
187 | |
188 | /** |
189 | * Set the tag name of the root element. If not set, default name is used |
190 | * ("root"). |
191 | * |
192 | * @param rootTagName the tag name to be used for the root element |
193 | */ |
194 | public void setRootTagName(String rootTagName) { |
195 | this.rootTagName = rootTagName; |
196 | } |
197 | |
198 | /** |
199 | * Get attributes of the root element. |
200 | * |
201 | * @return attributes of the root element |
202 | */ |
203 | public Map getRootElementAttributes() { |
204 | return rootElementAttributes; |
205 | } |
206 | |
207 | /** |
208 | * Set the root element attributes to be written. |
209 | * |
210 | * @param rootElementAttributes attributes of the root element |
211 | */ |
212 | public void setRootElementAttributes(Map rootElementAttributes) { |
213 | this.rootElementAttributes = rootElementAttributes; |
214 | } |
215 | |
216 | /** |
217 | * Set "overwrite" flag for the output file. Flag is ignored when output |
218 | * file processing is restarted. |
219 | * |
220 | * @param overwriteOutput |
221 | */ |
222 | public void setOverwriteOutput(boolean overwriteOutput) { |
223 | this.overwriteOutput = overwriteOutput; |
224 | } |
225 | |
226 | /** |
227 | * Setter for the headers. This list will be marshalled and output before |
228 | * any calls to {@link #write(Object)}. |
229 | * @param headers |
230 | */ |
231 | public void setHeaderItems(Object[] headers) { |
232 | this.headers = Arrays.asList(headers); |
233 | } |
234 | |
235 | public void setSaveState(boolean saveState) { |
236 | this.saveState = saveState; |
237 | } |
238 | |
239 | /** |
240 | * @throws Exception |
241 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
242 | */ |
243 | public void afterPropertiesSet() throws Exception { |
244 | Assert.notNull(serializer); |
245 | } |
246 | |
247 | /** |
248 | * Open the output source |
249 | * |
250 | * @see org.springframework.batch.item.ItemStream#open(ExecutionContext) |
251 | */ |
252 | public void open(ExecutionContext executionContext) { |
253 | |
254 | Assert.notNull(resource); |
255 | |
256 | long startAtPosition = 0; |
257 | boolean restarted = false; |
258 | |
259 | // if restart data is provided, restart from provided offset |
260 | // otherwise start from beginning |
261 | if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) { |
262 | startAtPosition = executionContext.getLong(getKey(RESTART_DATA_NAME)); |
263 | restarted = true; |
264 | } |
265 | |
266 | open(startAtPosition, restarted); |
267 | |
268 | if (startAtPosition == 0) { |
269 | for (Iterator iterator = headers.iterator(); iterator.hasNext();) { |
270 | Object header = (Object) iterator.next(); |
271 | write(header); |
272 | } |
273 | } |
274 | } |
275 | |
276 | /** |
277 | * Helper method for opening output source at given file position |
278 | */ |
279 | private void open(long position, boolean restarted) { |
280 | |
281 | File file; |
282 | FileOutputStream os = null; |
283 | |
284 | try { |
285 | file = resource.getFile(); |
286 | FileUtils.setUpOutputFile(file, restarted, overwriteOutput); |
287 | Assert.state(resource.exists(), "Output resource must exist"); |
288 | os = new FileOutputStream(file, true); |
289 | channel = os.getChannel(); |
290 | setPosition(position); |
291 | } |
292 | catch (IOException ioe) { |
293 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe); |
294 | } |
295 | |
296 | XMLOutputFactory outputFactory = XMLOutputFactory.newInstance(); |
297 | |
298 | try { |
299 | delegateEventWriter = outputFactory.createXMLEventWriter(os, encoding); |
300 | eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter); |
301 | if (!restarted) { |
302 | startDocument(delegateEventWriter); |
303 | } |
304 | } |
305 | catch (XMLStreamException xse) { |
306 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse); |
307 | } |
308 | |
309 | } |
310 | |
311 | /** |
312 | * Writes simple XML header containing: |
313 | * <ul> |
314 | * <li>xml declaration - defines encoding and XML version</li> |
315 | * <li>opening tag of the root element and its attributes</li> |
316 | * </ul> |
317 | * If this is not sufficient for you, simply override this method. Encoding, |
318 | * version and root tag name can be retrieved with corresponding getters. |
319 | * |
320 | * @param writer XML event writer |
321 | * @throws XMLStreamException |
322 | */ |
323 | private void startDocument(XMLEventWriter writer) throws XMLStreamException { |
324 | |
325 | XMLEventFactory factory = XMLEventFactory.newInstance(); |
326 | |
327 | // write start document |
328 | writer.add(factory.createStartDocument(getEncoding(), getVersion())); |
329 | |
330 | // write root tag |
331 | writer.add(factory.createStartElement("", "", getRootTagName())); |
332 | |
333 | // write root tag attributes |
334 | if (!CollectionUtils.isEmpty(getRootElementAttributes())) { |
335 | |
336 | for (Iterator i = getRootElementAttributes().entrySet().iterator(); i.hasNext();) { |
337 | Map.Entry entry = (Map.Entry) i.next(); |
338 | writer.add(factory.createAttribute((String) entry.getKey(), (String) entry.getValue())); |
339 | } |
340 | |
341 | } |
342 | |
343 | } |
344 | |
345 | /** |
346 | * Writes the EndDocument tag manually. |
347 | * |
348 | * @param writer XML event writer |
349 | * @throws XMLStreamException |
350 | */ |
351 | protected void endDocument(XMLEventWriter writer) throws XMLStreamException { |
352 | |
353 | // writer.writeEndDocument(); <- this doesn't work after restart |
354 | // we need to write end tag of the root element manually |
355 | |
356 | ByteBuffer bbuf = ByteBuffer.wrap(("</" + getRootTagName() + ">").getBytes()); |
357 | try { |
358 | channel.write(bbuf); |
359 | } |
360 | catch (IOException ioe) { |
361 | throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe); |
362 | } |
363 | } |
364 | |
365 | /** |
366 | * Flush and close the output source. |
367 | * |
368 | * @see org.springframework.batch.item.ItemStream#close(ExecutionContext) |
369 | */ |
370 | public void close(ExecutionContext executionContext) { |
371 | |
372 | // harmless event to close the root tag if there were no items |
373 | XMLEventFactory factory = XMLEventFactory.newInstance(); |
374 | try { |
375 | delegateEventWriter.add(factory.createCharacters("")); |
376 | } |
377 | catch (XMLStreamException e) { |
378 | log.error(e); |
379 | } |
380 | |
381 | flush(); |
382 | try { |
383 | endDocument(delegateEventWriter); |
384 | eventWriter.close(); |
385 | channel.close(); |
386 | } |
387 | catch (XMLStreamException xse) { |
388 | throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", xse); |
389 | } |
390 | catch (IOException ioe) { |
391 | throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe); |
392 | } |
393 | } |
394 | |
395 | /** |
396 | * Write the value object to internal buffer. |
397 | * |
398 | * @param item the value object |
399 | * @see #flush() |
400 | */ |
401 | public void write(Object item) { |
402 | |
403 | currentRecordCount++; |
404 | buffer.add(item); |
405 | } |
406 | |
407 | /** |
408 | * Get the restart data. |
409 | * |
410 | * @see org.springframework.batch.item.ItemStream#update(ExecutionContext) |
411 | */ |
412 | public void update(ExecutionContext executionContext) { |
413 | |
414 | if (saveState) { |
415 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
416 | executionContext.putLong(getKey(RESTART_DATA_NAME), getPosition()); |
417 | executionContext.putLong(getKey(WRITE_STATISTICS_NAME), currentRecordCount); |
418 | } |
419 | } |
420 | |
421 | /* |
422 | * Get the actual position in file channel. This method flushes any buffered |
423 | * data before position is read. |
424 | * |
425 | * @return byte offset in file channel |
426 | */ |
427 | private long getPosition() { |
428 | |
429 | long position; |
430 | |
431 | try { |
432 | eventWriter.flush(); |
433 | position = channel.position(); |
434 | } |
435 | catch (Exception e) { |
436 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); |
437 | } |
438 | |
439 | return position; |
440 | } |
441 | |
442 | /** |
443 | * Set the file channel position. |
444 | * |
445 | * @param newPosition new file channel position |
446 | */ |
447 | private void setPosition(long newPosition) { |
448 | |
449 | try { |
450 | Assert.state(channel.size() >= lastCommitPointPosition, |
451 | "Current file size is smaller than size at last commit"); |
452 | channel.truncate(newPosition); |
453 | channel.position(newPosition); |
454 | } |
455 | catch (IOException e) { |
456 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); |
457 | } |
458 | |
459 | } |
460 | |
461 | /** |
462 | * Writes buffered items to XML stream and marks restore point. |
463 | */ |
464 | public void flush() throws FlushFailedException { |
465 | |
466 | for (Iterator iterator = buffer.listIterator(); iterator.hasNext();) { |
467 | Object item = iterator.next(); |
468 | serializer.serializeObject(eventWriter, item); |
469 | } |
470 | try { |
471 | eventWriter.flush(); |
472 | } |
473 | catch (XMLStreamException e) { |
474 | throw new FlushFailedException("Failed to flush the events", e); |
475 | } |
476 | buffer.clear(); |
477 | |
478 | lastCommitPointPosition = getPosition(); |
479 | lastCommitPointRecordCount = currentRecordCount; |
480 | } |
481 | |
482 | /** |
483 | * Clear the output buffer |
484 | */ |
485 | public void clear() throws ClearFailedException { |
486 | currentRecordCount = lastCommitPointRecordCount; |
487 | buffer.clear(); |
488 | } |
489 | |
490 | } |