1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.xml; |
18 | |
19 | import java.io.BufferedWriter; |
20 | import java.io.File; |
21 | import java.io.FileOutputStream; |
22 | import java.io.IOException; |
23 | import java.io.OutputStreamWriter; |
24 | import java.io.UnsupportedEncodingException; |
25 | import java.io.Writer; |
26 | import java.nio.channels.FileChannel; |
27 | import java.util.List; |
28 | import java.util.Map; |
29 | |
30 | import javax.xml.stream.FactoryConfigurationError; |
31 | import javax.xml.stream.XMLEventFactory; |
32 | import javax.xml.stream.XMLEventWriter; |
33 | import javax.xml.stream.XMLOutputFactory; |
34 | import javax.xml.stream.XMLStreamException; |
35 | import javax.xml.transform.Result; |
36 | |
37 | import org.apache.commons.logging.Log; |
38 | import org.apache.commons.logging.LogFactory; |
39 | import org.springframework.batch.item.ExecutionContext; |
40 | import org.springframework.batch.item.ItemStreamException; |
41 | import org.springframework.batch.item.ItemWriter; |
42 | import org.springframework.batch.item.WriteFailedException; |
43 | import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream; |
44 | import org.springframework.batch.item.support.AbstractItemStreamItemWriter; |
45 | import org.springframework.batch.item.util.FileUtils; |
46 | import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter; |
47 | import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter; |
48 | import org.springframework.beans.factory.InitializingBean; |
49 | import org.springframework.core.io.Resource; |
50 | import org.springframework.dao.DataAccessResourceFailureException; |
51 | import org.springframework.oxm.Marshaller; |
52 | import org.springframework.oxm.XmlMappingException; |
53 | import org.springframework.util.Assert; |
54 | import org.springframework.util.ClassUtils; |
55 | import org.springframework.util.CollectionUtils; |
56 | import org.springframework.util.StringUtils; |
57 | |
58 | /** |
59 | * An implementation of {@link ItemWriter} which uses StAX and |
60 | * {@link Marshaller} for serializing object to XML. |
61 | * |
62 | * This item writer also provides restart, statistics and transaction features |
63 | * by implementing corresponding interfaces. |
64 | * |
65 | * The implementation is *not* thread-safe. |
66 | * |
67 | * @author Peter Zozom |
68 | * @author Robert Kasanicky |
69 | * @author Michael Minella |
70 | * |
71 | */ |
72 | public class StaxEventItemWriter<T> extends AbstractItemStreamItemWriter<T> implements |
73 | ResourceAwareItemWriterItemStream<T>, InitializingBean { |
74 | |
75 | private static final Log log = LogFactory.getLog(StaxEventItemWriter.class); |
76 | |
77 | // default encoding |
78 | private static final String DEFAULT_ENCODING = "UTF-8"; |
79 | |
80 | // default encoding |
81 | private static final String DEFAULT_XML_VERSION = "1.0"; |
82 | |
83 | // default root tag name |
84 | private static final String DEFAULT_ROOT_TAG_NAME = "root"; |
85 | |
86 | // restart data property name |
87 | private static final String RESTART_DATA_NAME = "position"; |
88 | |
89 | // restart data property name |
90 | private static final String WRITE_STATISTICS_NAME = "record.count"; |
91 | |
92 | // file system resource |
93 | private Resource resource; |
94 | |
95 | // xml marshaller |
96 | private Marshaller marshaller; |
97 | |
98 | // encoding to be used while reading from the resource |
99 | private String encoding = DEFAULT_ENCODING; |
100 | |
101 | // XML version |
102 | private String version = DEFAULT_XML_VERSION; |
103 | |
104 | // name of the root tag |
105 | private String rootTagName = DEFAULT_ROOT_TAG_NAME; |
106 | |
107 | // namespace prefix of the root tag |
108 | private String rootTagNamespacePrefix = ""; |
109 | |
110 | // namespace of the root tag |
111 | private String rootTagNamespace = ""; |
112 | |
113 | // root element attributes |
114 | private Map<String, String> rootElementAttributes = null; |
115 | |
116 | // TRUE means, that output file will be overwritten if exists - default is |
117 | // TRUE |
118 | private boolean overwriteOutput = true; |
119 | |
120 | // file channel |
121 | private FileChannel channel; |
122 | |
123 | // wrapper for XML event writer that swallows StartDocument and EndDocument |
124 | // events |
125 | private XMLEventWriter eventWriter; |
126 | |
127 | // XML event writer |
128 | private XMLEventWriter delegateEventWriter; |
129 | |
130 | // current count of processed records |
131 | private long currentRecordCount = 0; |
132 | |
133 | private boolean saveState = true; |
134 | |
135 | private StaxWriterCallback headerCallback; |
136 | |
137 | private StaxWriterCallback footerCallback; |
138 | |
139 | private Writer bufferedWriter; |
140 | |
141 | private boolean transactional = true; |
142 | |
143 | private boolean forceSync; |
144 | |
145 | private boolean shouldDeleteIfEmpty = false; |
146 | |
147 | public StaxEventItemWriter() { |
148 | setExecutionContextName(ClassUtils.getShortName(StaxEventItemWriter.class)); |
149 | } |
150 | |
151 | /** |
152 | * Set output file. |
153 | * |
154 | * @param resource the output file |
155 | */ |
156 | @Override |
157 | public void setResource(Resource resource) { |
158 | this.resource = resource; |
159 | } |
160 | |
161 | /** |
162 | * Set Object to XML marshaller. |
163 | * |
164 | * @param marshaller the Object to XML marshaller |
165 | */ |
166 | public void setMarshaller(Marshaller marshaller) { |
167 | this.marshaller = marshaller; |
168 | } |
169 | |
170 | /** |
171 | * headerCallback is called before writing any items. |
172 | */ |
173 | public void setHeaderCallback(StaxWriterCallback headerCallback) { |
174 | this.headerCallback = headerCallback; |
175 | } |
176 | |
177 | /** |
178 | * footerCallback is called after writing all items but before closing the |
179 | * file |
180 | */ |
181 | public void setFooterCallback(StaxWriterCallback footerCallback) { |
182 | this.footerCallback = footerCallback; |
183 | } |
184 | |
185 | /** |
186 | * Flag to indicate that writes should be deferred to the end of a |
187 | * transaction if present. Defaults to true. |
188 | * |
189 | * @param transactional the flag to set |
190 | */ |
191 | public void setTransactional(boolean transactional) { |
192 | this.transactional = transactional; |
193 | } |
194 | |
195 | /** |
196 | * Flag to indicate that changes should be force-synced to disk on flush. |
197 | * Defaults to false, which means that even with a local disk changes could |
198 | * be lost if the OS crashes in between a write and a cache flush. Setting |
199 | * to true may result in slower performance for usage patterns involving |
200 | * many frequent writes. |
201 | * |
202 | * @param forceSync the flag value to set |
203 | */ |
204 | public void setForceSync(boolean forceSync) { |
205 | this.forceSync = forceSync; |
206 | } |
207 | |
208 | /** |
209 | * Flag to indicate that the target file should be deleted if no items have |
210 | * been written (other than header and footer) on close. Defaults to false. |
211 | * |
212 | * @param shouldDeleteIfEmpty the flag value to set |
213 | */ |
214 | public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) { |
215 | this.shouldDeleteIfEmpty = shouldDeleteIfEmpty; |
216 | } |
217 | |
218 | /** |
219 | * Get used encoding. |
220 | * |
221 | * @return the encoding used |
222 | */ |
223 | public String getEncoding() { |
224 | return encoding; |
225 | } |
226 | |
227 | /** |
228 | * Set encoding to be used for output file. |
229 | * |
230 | * @param encoding the encoding to be used |
231 | */ |
232 | public void setEncoding(String encoding) { |
233 | this.encoding = encoding; |
234 | } |
235 | |
236 | /** |
237 | * Get XML version. |
238 | * |
239 | * @return the XML version used |
240 | */ |
241 | public String getVersion() { |
242 | return version; |
243 | } |
244 | |
245 | /** |
246 | * Set XML version to be used for output XML. |
247 | * |
248 | * @param version the XML version to be used |
249 | */ |
250 | public void setVersion(String version) { |
251 | this.version = version; |
252 | } |
253 | |
254 | /** |
255 | * Get the tag name of the root element. |
256 | * |
257 | * @return the root element tag name |
258 | */ |
259 | public String getRootTagName() { |
260 | return rootTagName; |
261 | } |
262 | |
263 | /** |
264 | * Set the tag name of the root element. If not set, default name is used |
265 | * ("root"). Namespace URI and prefix can also be set optionally using the |
266 | * notation: |
267 | * |
268 | * <pre> |
269 | * {uri}prefix:root |
270 | * </pre> |
271 | * |
272 | * The prefix is optional (defaults to empty), but if it is specified then |
273 | * the uri must be provided. In addition you might want to declare other |
274 | * namespaces using the {@link #setRootElementAttributes(Map) root |
275 | * attributes}. |
276 | * |
277 | * @param rootTagName the tag name to be used for the root element |
278 | */ |
279 | public void setRootTagName(String rootTagName) { |
280 | this.rootTagName = rootTagName; |
281 | } |
282 | |
283 | /** |
284 | * Get the namespace prefix of the root element. Empty by default. |
285 | * |
286 | * @return the rootTagNamespacePrefix |
287 | */ |
288 | public String getRootTagNamespacePrefix() { |
289 | return rootTagNamespacePrefix; |
290 | } |
291 | |
292 | /** |
293 | * Get the namespace of the root element. |
294 | * |
295 | * @return the rootTagNamespace |
296 | */ |
297 | public String getRootTagNamespace() { |
298 | return rootTagNamespace; |
299 | } |
300 | |
301 | /** |
302 | * Get attributes of the root element. |
303 | * |
304 | * @return attributes of the root element |
305 | */ |
306 | public Map<String, String> getRootElementAttributes() { |
307 | return rootElementAttributes; |
308 | } |
309 | |
310 | /** |
311 | * Set the root element attributes to be written. If any of the key names |
312 | * begin with "xmlns:" then they are treated as namespace declarations. |
313 | * |
314 | * @param rootElementAttributes attributes of the root element |
315 | */ |
316 | public void setRootElementAttributes(Map<String, String> rootElementAttributes) { |
317 | this.rootElementAttributes = rootElementAttributes; |
318 | } |
319 | |
320 | /** |
321 | * Set "overwrite" flag for the output file. Flag is ignored when output |
322 | * file processing is restarted. |
323 | * |
324 | * @param overwriteOutput |
325 | */ |
326 | public void setOverwriteOutput(boolean overwriteOutput) { |
327 | this.overwriteOutput = overwriteOutput; |
328 | } |
329 | |
330 | public void setSaveState(boolean saveState) { |
331 | this.saveState = saveState; |
332 | } |
333 | |
334 | /** |
335 | * @throws Exception |
336 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
337 | */ |
338 | @Override |
339 | public void afterPropertiesSet() throws Exception { |
340 | Assert.notNull(marshaller); |
341 | if (rootTagName.contains("{")) { |
342 | rootTagNamespace = rootTagName.replaceAll("\\{(.*)\\}.*", "$1"); |
343 | rootTagName = rootTagName.replaceAll("\\{.*\\}(.*)", "$1"); |
344 | if (rootTagName.contains(":")) { |
345 | rootTagNamespacePrefix = rootTagName.replaceAll("(.*):.*", "$1"); |
346 | rootTagName = rootTagName.replaceAll(".*:(.*)", "$1"); |
347 | } |
348 | } |
349 | } |
350 | |
351 | /** |
352 | * Open the output source |
353 | * |
354 | * @see org.springframework.batch.item.ItemStream#open(ExecutionContext) |
355 | */ |
356 | @Override |
357 | public void open(ExecutionContext executionContext) { |
358 | super.open(executionContext); |
359 | |
360 | Assert.notNull(resource, "The resource must be set"); |
361 | |
362 | long startAtPosition = 0; |
363 | boolean restarted = false; |
364 | |
365 | // if restart data is provided, restart from provided offset |
366 | // otherwise start from beginning |
367 | if (executionContext.containsKey(getExecutionContextKey(RESTART_DATA_NAME))) { |
368 | startAtPosition = executionContext.getLong(getExecutionContextKey(RESTART_DATA_NAME)); |
369 | currentRecordCount = executionContext.getLong(getExecutionContextKey(WRITE_STATISTICS_NAME)); |
370 | restarted = true; |
371 | if (shouldDeleteIfEmpty && currentRecordCount == 0) { |
372 | // previous execution deleted the output file because no items were written |
373 | restarted = false; |
374 | startAtPosition = 0; |
375 | } else { |
376 | restarted = true; |
377 | } |
378 | } |
379 | |
380 | open(startAtPosition, restarted); |
381 | |
382 | if (startAtPosition == 0) { |
383 | try { |
384 | if (headerCallback != null) { |
385 | headerCallback.write(delegateEventWriter); |
386 | } |
387 | } |
388 | catch (IOException e) { |
389 | throw new ItemStreamException("Failed to write headerItems", e); |
390 | } |
391 | } |
392 | |
393 | } |
394 | |
395 | /** |
396 | * Helper method for opening output source at given file position |
397 | */ |
398 | private void open(long position, boolean restarted) { |
399 | |
400 | File file; |
401 | FileOutputStream os = null; |
402 | FileChannel fileChannel = null; |
403 | |
404 | try { |
405 | file = resource.getFile(); |
406 | FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput); |
407 | Assert.state(resource.exists(), "Output resource must exist"); |
408 | os = new FileOutputStream(file, true); |
409 | fileChannel = os.getChannel(); |
410 | channel = os.getChannel(); |
411 | setPosition(position); |
412 | } |
413 | catch (IOException ioe) { |
414 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe); |
415 | } |
416 | |
417 | XMLOutputFactory outputFactory = createXmlOutputFactory(); |
418 | |
419 | if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) { |
420 | // If the current XMLOutputFactory implementation is supplied by |
421 | // Woodstox >= 3.2.9 we want to disable its |
422 | // automatic end element feature (see: |
423 | // http://jira.codehaus.org/browse/WSTX-165) per |
424 | // http://jira.springframework.org/browse/BATCH-761). |
425 | outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE); |
426 | } |
427 | if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) { |
428 | // On restart we don't write the root element so we have to disable |
429 | // structural validation (see: |
430 | // http://jira.springframework.org/browse/BATCH-1681). |
431 | outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE); |
432 | } |
433 | |
434 | try { |
435 | final FileChannel channel = fileChannel; |
436 | if (transactional) { |
437 | TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() { |
438 | @Override |
439 | public void run() { |
440 | closeStream(); |
441 | } |
442 | }); |
443 | |
444 | writer.setEncoding(encoding); |
445 | writer.setForceSync(forceSync); |
446 | bufferedWriter = writer; |
447 | } |
448 | else { |
449 | bufferedWriter = new BufferedWriter(new OutputStreamWriter(os, encoding)); |
450 | } |
451 | delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter); |
452 | eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter); |
453 | initNamespaceContext(delegateEventWriter); |
454 | if (!restarted) { |
455 | startDocument(delegateEventWriter); |
456 | if (forceSync) { |
457 | channel.force(false); |
458 | } |
459 | } |
460 | } |
461 | catch (XMLStreamException xse) { |
462 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse); |
463 | } |
464 | catch (UnsupportedEncodingException e) { |
465 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource |
466 | + "] with encoding=[" + encoding + "]", e); |
467 | } |
468 | catch (IOException e) { |
469 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); |
470 | } |
471 | } |
472 | |
473 | /** |
474 | * Subclasses can override to customize the writer. |
475 | * @param outputFactory |
476 | * @param writer |
477 | * @return an xml writer |
478 | * @throws XMLStreamException |
479 | */ |
480 | protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer) |
481 | throws XMLStreamException { |
482 | return outputFactory.createXMLEventWriter(writer); |
483 | } |
484 | |
485 | /** |
486 | * Subclasses can override to customize the factory. |
487 | * @return a factory for the xml output |
488 | * @throws FactoryConfigurationError |
489 | */ |
490 | protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError { |
491 | return XMLOutputFactory.newInstance(); |
492 | } |
493 | |
494 | /** |
495 | * Subclasses can override to customize the event factory. |
496 | * @return a factory for the xml events |
497 | * @throws FactoryConfigurationError |
498 | */ |
499 | protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError { |
500 | XMLEventFactory factory = XMLEventFactory.newInstance(); |
501 | return factory; |
502 | } |
503 | |
504 | /** |
505 | * Subclasses can override to customize the stax result. |
506 | * @return a result for writing to |
507 | * @throws Exception |
508 | */ |
509 | protected Result createStaxResult() throws Exception { |
510 | return StaxUtils.getResult(eventWriter); |
511 | } |
512 | |
513 | /** |
514 | * Inits the namespace context of the XMLEventWriter: |
515 | * <ul> |
516 | * <li>rootTagNamespacePrefix for rootTagName</li> |
517 | * <li>any other xmlns namespace prefix declarations in the root element attributes</li> |
518 | * </ul> |
519 | * |
520 | * @param writer XML event writer |
521 | * @throws XMLStreamException |
522 | */ |
523 | protected void initNamespaceContext(XMLEventWriter writer) throws XMLStreamException { |
524 | if (StringUtils.hasText(getRootTagNamespace())) { |
525 | if(StringUtils.hasText(getRootTagNamespacePrefix())) { |
526 | writer.setPrefix(getRootTagNamespacePrefix(), getRootTagNamespace()); |
527 | } else { |
528 | writer.setDefaultNamespace(getRootTagNamespace()); |
529 | } |
530 | } |
531 | if (!CollectionUtils.isEmpty(getRootElementAttributes())) { |
532 | for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) { |
533 | String key = entry.getKey(); |
534 | if (key.startsWith("xmlns")) { |
535 | String prefix = ""; |
536 | if (key.contains(":")) { |
537 | prefix = key.substring(key.indexOf(":") + 1); |
538 | } |
539 | log.debug("registering prefix: " +prefix + "=" + entry.getValue()); |
540 | writer.setPrefix(prefix, entry.getValue()); |
541 | } |
542 | } |
543 | } |
544 | } |
545 | |
546 | /** |
547 | * Writes simple XML header containing: |
548 | * <ul> |
549 | * <li>xml declaration - defines encoding and XML version</li> |
550 | * <li>opening tag of the root element and its attributes</li> |
551 | * </ul> |
552 | * If this is not sufficient for you, simply override this method. Encoding, |
553 | * version and root tag name can be retrieved with corresponding getters. |
554 | * |
555 | * @param writer XML event writer |
556 | * @throws XMLStreamException |
557 | */ |
558 | protected void startDocument(XMLEventWriter writer) throws XMLStreamException { |
559 | |
560 | XMLEventFactory factory = createXmlEventFactory(); |
561 | |
562 | // write start document |
563 | writer.add(factory.createStartDocument(getEncoding(), getVersion())); |
564 | |
565 | // write root tag |
566 | writer.add(factory.createStartElement(getRootTagNamespacePrefix(), getRootTagNamespace(), getRootTagName())); |
567 | if (StringUtils.hasText(getRootTagNamespace())) { |
568 | if (StringUtils.hasText(getRootTagNamespacePrefix())) { |
569 | writer.add(factory.createNamespace(getRootTagNamespacePrefix(), getRootTagNamespace())); |
570 | } |
571 | else { |
572 | writer.add(factory.createNamespace(getRootTagNamespace())); |
573 | } |
574 | } |
575 | |
576 | // write root tag attributes |
577 | if (!CollectionUtils.isEmpty(getRootElementAttributes())) { |
578 | |
579 | for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) { |
580 | String key = entry.getKey(); |
581 | if (key.startsWith("xmlns")) { |
582 | String prefix = ""; |
583 | if (key.contains(":")) { |
584 | prefix = key.substring(key.indexOf(":") + 1); |
585 | } |
586 | writer.add(factory.createNamespace(prefix, entry.getValue())); |
587 | } |
588 | else { |
589 | writer.add(factory.createAttribute(key, entry.getValue())); |
590 | } |
591 | } |
592 | |
593 | } |
594 | |
595 | /* |
596 | * This forces the flush to write the end of the root element and avoids |
597 | * an off-by-one error on restart. |
598 | */ |
599 | writer.add(factory.createIgnorableSpace("")); |
600 | writer.flush(); |
601 | |
602 | } |
603 | |
604 | /** |
605 | * Writes the EndDocument tag manually. |
606 | * |
607 | * @param writer XML event writer |
608 | * @throws XMLStreamException |
609 | */ |
610 | protected void endDocument(XMLEventWriter writer) throws XMLStreamException { |
611 | |
612 | // writer.writeEndDocument(); <- this doesn't work after restart |
613 | // we need to write end tag of the root element manually |
614 | |
615 | String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":"; |
616 | try { |
617 | bufferedWriter.write("</" + nsPrefix + getRootTagName() + ">"); |
618 | } |
619 | catch (IOException ioe) { |
620 | throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe); |
621 | } |
622 | } |
623 | |
624 | /** |
625 | * Flush and close the output source. |
626 | * |
627 | * @see org.springframework.batch.item.ItemStream#close() |
628 | */ |
629 | @Override |
630 | public void close() { |
631 | super.close(); |
632 | |
633 | XMLEventFactory factory = createXmlEventFactory(); |
634 | try { |
635 | delegateEventWriter.add(factory.createCharacters("")); |
636 | } |
637 | catch (XMLStreamException e) { |
638 | log.error(e); |
639 | } |
640 | |
641 | try { |
642 | if (footerCallback != null) { |
643 | footerCallback.write(delegateEventWriter); |
644 | } |
645 | delegateEventWriter.flush(); |
646 | endDocument(delegateEventWriter); |
647 | } |
648 | catch (IOException e) { |
649 | throw new ItemStreamException("Failed to write footer items", e); |
650 | } |
651 | catch (XMLStreamException e) { |
652 | throw new ItemStreamException("Failed to write end document tag", e); |
653 | } |
654 | finally { |
655 | |
656 | try { |
657 | delegateEventWriter.close(); |
658 | } |
659 | catch (XMLStreamException e) { |
660 | log.error("Unable to close file resource: [" + resource + "] " + e); |
661 | } |
662 | finally { |
663 | try { |
664 | bufferedWriter.close(); |
665 | } |
666 | catch (IOException e) { |
667 | log.error("Unable to close file resource: [" + resource + "] " + e); |
668 | } |
669 | finally { |
670 | if (!transactional) { |
671 | closeStream(); |
672 | } |
673 | } |
674 | } |
675 | if (currentRecordCount == 0 && shouldDeleteIfEmpty) { |
676 | try { |
677 | resource.getFile().delete(); |
678 | } |
679 | catch (IOException e) { |
680 | throw new ItemStreamException("Failed to delete empty file on close", e); |
681 | } |
682 | } |
683 | } |
684 | } |
685 | |
686 | private void closeStream() { |
687 | try { |
688 | channel.close(); |
689 | } |
690 | catch (IOException ioe) { |
691 | log.error("Unable to close file resource: [" + resource + "] " + ioe); |
692 | } |
693 | } |
694 | |
695 | /** |
696 | * Write the value objects and flush them to the file. |
697 | * |
698 | * @param items the value object |
699 | * @throws IOException |
700 | * @throws XmlMappingException |
701 | */ |
702 | @Override |
703 | public void write(List<? extends T> items) throws XmlMappingException, Exception { |
704 | |
705 | currentRecordCount += items.size(); |
706 | |
707 | for (Object object : items) { |
708 | Assert.state(marshaller.supports(object.getClass()), |
709 | "Marshaller must support the class of the marshalled object"); |
710 | Result result = createStaxResult(); |
711 | marshaller.marshal(object, result); |
712 | } |
713 | try { |
714 | eventWriter.flush(); |
715 | if (forceSync) { |
716 | channel.force(false); |
717 | } |
718 | } |
719 | catch (XMLStreamException e) { |
720 | throw new WriteFailedException("Failed to flush the events", e); |
721 | } |
722 | catch (IOException e) { |
723 | throw new WriteFailedException("Failed to flush the events", e); |
724 | } |
725 | |
726 | } |
727 | |
728 | /** |
729 | * Get the restart data. |
730 | * |
731 | * @see org.springframework.batch.item.ItemStream#update(ExecutionContext) |
732 | */ |
733 | @Override |
734 | public void update(ExecutionContext executionContext) { |
735 | super.update(executionContext); |
736 | if (saveState) { |
737 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
738 | executionContext.putLong(getExecutionContextKey(RESTART_DATA_NAME), getPosition()); |
739 | executionContext.putLong(getExecutionContextKey(WRITE_STATISTICS_NAME), currentRecordCount); |
740 | } |
741 | } |
742 | |
743 | /* |
744 | * Get the actual position in file channel. This method flushes any buffered |
745 | * data before position is read. |
746 | * |
747 | * @return byte offset in file channel |
748 | */ |
749 | private long getPosition() { |
750 | |
751 | long position; |
752 | |
753 | try { |
754 | eventWriter.flush(); |
755 | position = channel.position(); |
756 | if (bufferedWriter instanceof TransactionAwareBufferedWriter) { |
757 | position += ((TransactionAwareBufferedWriter) bufferedWriter).getBufferSize(); |
758 | } |
759 | } |
760 | catch (Exception e) { |
761 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); |
762 | } |
763 | |
764 | return position; |
765 | } |
766 | |
767 | /** |
768 | * Set the file channel position. |
769 | * |
770 | * @param newPosition new file channel position |
771 | */ |
772 | private void setPosition(long newPosition) { |
773 | |
774 | try { |
775 | channel.truncate(newPosition); |
776 | channel.position(newPosition); |
777 | } |
778 | catch (IOException e) { |
779 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); |
780 | } |
781 | |
782 | } |
783 | |
784 | } |