1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.xml;
18
19 import java.io.BufferedWriter;
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStreamWriter;
24 import java.io.UnsupportedEncodingException;
25 import java.io.Writer;
26 import java.nio.channels.Channels;
27 import java.nio.channels.FileChannel;
28 import java.util.List;
29 import java.util.Map;
30
31 import javax.xml.stream.FactoryConfigurationError;
32 import javax.xml.stream.XMLEventFactory;
33 import javax.xml.stream.XMLEventWriter;
34 import javax.xml.stream.XMLOutputFactory;
35 import javax.xml.stream.XMLStreamException;
36 import javax.xml.transform.Result;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.springframework.batch.item.ExecutionContext;
41 import org.springframework.batch.item.ItemStreamException;
42 import org.springframework.batch.item.ItemWriter;
43 import org.springframework.batch.item.WriteFailedException;
44 import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream;
45 import org.springframework.batch.item.util.ExecutionContextUserSupport;
46 import org.springframework.batch.item.util.FileUtils;
47 import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter;
48 import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
49 import org.springframework.beans.factory.InitializingBean;
50 import org.springframework.core.io.Resource;
51 import org.springframework.dao.DataAccessResourceFailureException;
52 import org.springframework.oxm.Marshaller;
53 import org.springframework.oxm.XmlMappingException;
54 import org.springframework.util.Assert;
55 import org.springframework.util.ClassUtils;
56 import org.springframework.util.CollectionUtils;
57 import org.springframework.util.StringUtils;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class StaxEventItemWriter<T> extends ExecutionContextUserSupport implements
73 ResourceAwareItemWriterItemStream<T>, InitializingBean {
74
75 private static final Log log = LogFactory.getLog(StaxEventItemWriter.class);
76
77
78 private static final String DEFAULT_ENCODING = "UTF-8";
79
80
81 private static final String DEFAULT_XML_VERSION = "1.0";
82
83
84 private static final String DEFAULT_ROOT_TAG_NAME = "root";
85
86
87 private static final String RESTART_DATA_NAME = "position";
88
89
90 private static final String WRITE_STATISTICS_NAME = "record.count";
91
92
93 private Resource resource;
94
95
96 private Marshaller marshaller;
97
98
99 private String encoding = DEFAULT_ENCODING;
100
101
102 private String version = DEFAULT_XML_VERSION;
103
104
105 private String rootTagName = DEFAULT_ROOT_TAG_NAME;
106
107
108 private String rootTagNamespacePrefix = "";
109
110
111 private String rootTagNamespace = "";
112
113
114 private Map<String, String> rootElementAttributes = null;
115
116
117
118 private boolean overwriteOutput = true;
119
120
121 private FileChannel channel;
122
123
124
125 private XMLEventWriter eventWriter;
126
127
128 private XMLEventWriter delegateEventWriter;
129
130
131 private long currentRecordCount = 0;
132
133 private boolean saveState = true;
134
135 private StaxWriterCallback headerCallback;
136
137 private StaxWriterCallback footerCallback;
138
139 private Writer bufferedWriter;
140
141 private boolean transactional = true;
142
143 private boolean forceSync;
144
145 public StaxEventItemWriter() {
146 setName(ClassUtils.getShortName(StaxEventItemWriter.class));
147 }
148
149
150
151
152
153
154 public void setResource(Resource resource) {
155 this.resource = resource;
156 }
157
158
159
160
161
162
163 public void setMarshaller(Marshaller marshaller) {
164 this.marshaller = marshaller;
165 }
166
167
168
169
170 public void setHeaderCallback(StaxWriterCallback headerCallback) {
171 this.headerCallback = headerCallback;
172 }
173
174
175
176
177
178 public void setFooterCallback(StaxWriterCallback footerCallback) {
179 this.footerCallback = footerCallback;
180 }
181
182
183
184
185
186
187
188 public void setTransactional(boolean transactional) {
189 this.transactional = transactional;
190 }
191
192
193
194
195
196
197
198
199
200
201 public void setForceSync(boolean forceSync) {
202 this.forceSync = forceSync;
203 }
204
205
206
207
208
209
210 public String getEncoding() {
211 return encoding;
212 }
213
214
215
216
217
218
219 public void setEncoding(String encoding) {
220 this.encoding = encoding;
221 }
222
223
224
225
226
227
228 public String getVersion() {
229 return version;
230 }
231
232
233
234
235
236
237 public void setVersion(String version) {
238 this.version = version;
239 }
240
241
242
243
244
245
246 public String getRootTagName() {
247 return rootTagName;
248 }
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266 public void setRootTagName(String rootTagName) {
267 this.rootTagName = rootTagName;
268 }
269
270
271
272
273
274
275 public String getRootTagNamespacePrefix() {
276 return rootTagNamespacePrefix;
277 }
278
279
280
281
282
283
284 public String getRootTagNamespace() {
285 return rootTagNamespace;
286 }
287
288
289
290
291
292
293 public Map<String, String> getRootElementAttributes() {
294 return rootElementAttributes;
295 }
296
297
298
299
300
301
302
303 public void setRootElementAttributes(Map<String, String> rootElementAttributes) {
304 this.rootElementAttributes = rootElementAttributes;
305 }
306
307
308
309
310
311
312
313 public void setOverwriteOutput(boolean overwriteOutput) {
314 this.overwriteOutput = overwriteOutput;
315 }
316
317 public void setSaveState(boolean saveState) {
318 this.saveState = saveState;
319 }
320
321
322
323
324
325 public void afterPropertiesSet() throws Exception {
326 Assert.notNull(marshaller);
327 if (rootTagName.contains("{")) {
328 rootTagNamespace = rootTagName.replaceAll("\\{(.*)\\}.*", "$1");
329 rootTagName = rootTagName.replaceAll("\\{.*\\}(.*)", "$1");
330 if (rootTagName.contains(":")) {
331 rootTagNamespacePrefix = rootTagName.replaceAll("(.*):.*", "$1");
332 rootTagName = rootTagName.replaceAll(".*:(.*)", "$1");
333 }
334 }
335 }
336
337
338
339
340
341
342 public void open(ExecutionContext executionContext) {
343
344 Assert.notNull(resource, "The resource must be set");
345
346 long startAtPosition = 0;
347 boolean restarted = false;
348
349
350
351 if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
352 startAtPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
353 restarted = true;
354 }
355
356 open(startAtPosition, restarted);
357
358 if (startAtPosition == 0) {
359 try {
360 if (headerCallback != null) {
361 headerCallback.write(delegateEventWriter);
362 }
363 }
364 catch (IOException e) {
365 throw new ItemStreamException("Failed to write headerItems", e);
366 }
367 }
368
369 }
370
371
372
373
374 private void open(long position, boolean restarted) {
375
376 File file;
377 FileOutputStream os = null;
378 FileChannel fileChannel = null;
379
380 try {
381 file = resource.getFile();
382 FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput);
383 Assert.state(resource.exists(), "Output resource must exist");
384 os = new FileOutputStream(file, true);
385 fileChannel = os.getChannel();
386 channel = os.getChannel();
387 setPosition(position);
388 }
389 catch (IOException ioe) {
390 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe);
391 }
392
393 XMLOutputFactory outputFactory = createXmlOutputFactory();
394
395 if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) {
396
397
398
399
400
401 outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE);
402 }
403 if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) {
404
405
406
407 outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE);
408 }
409
410 try {
411 final FileChannel channel = fileChannel;
412 Writer writer = new BufferedWriter(new OutputStreamWriter(os, encoding)) {
413 @Override
414 public void flush() throws IOException {
415 super.flush();
416 if (forceSync) {
417 channel.force(false);
418 }
419 }
420 };
421 if (transactional) {
422 bufferedWriter = new TransactionAwareBufferedWriter(writer, new Runnable() {
423 public void run() {
424 closeStream();
425 }
426 });
427 }
428 else {
429 bufferedWriter = writer;
430 }
431 delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter);
432 eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter);
433 if (!restarted) {
434 startDocument(delegateEventWriter);
435 }
436 }
437 catch (XMLStreamException xse) {
438 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse);
439 }
440 catch (UnsupportedEncodingException e) {
441 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource
442 + "] with encoding=[" + encoding + "]", e);
443 }
444
445 }
446
447
448
449
450
451
452
453
454 protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer)
455 throws XMLStreamException {
456 return outputFactory.createXMLEventWriter(writer);
457 }
458
459
460
461
462
463
464 protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError {
465 return XMLOutputFactory.newInstance();
466 }
467
468
469
470
471
472
473 protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError {
474 XMLEventFactory factory = XMLEventFactory.newInstance();
475 return factory;
476 }
477
478
479
480
481
482
483 protected Result createStaxResult() throws Exception {
484 return StaxUtils.getResult(eventWriter);
485 }
486
487
488
489
490
491
492
493
494
495
496
497
498
499 protected void startDocument(XMLEventWriter writer) throws XMLStreamException {
500
501 XMLEventFactory factory = createXmlEventFactory();
502
503
504 writer.add(factory.createStartDocument(getEncoding(), getVersion()));
505
506
507 writer.add(factory.createStartElement(getRootTagNamespacePrefix(), getRootTagNamespace(), getRootTagName()));
508 if (StringUtils.hasText(getRootTagNamespace())) {
509 if (StringUtils.hasText(getRootTagNamespacePrefix())) {
510 writer.add(factory.createNamespace(getRootTagNamespacePrefix(), getRootTagNamespace()));
511 }
512 else {
513 writer.add(factory.createNamespace(getRootTagNamespace()));
514 }
515 }
516
517
518 if (!CollectionUtils.isEmpty(getRootElementAttributes())) {
519
520 for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) {
521 String key = entry.getKey();
522 if (key.startsWith("xmlns")) {
523 String prefix = "";
524 if (key.contains(":")) {
525 prefix = key.substring(key.indexOf(":") + 1);
526 }
527 writer.add(factory.createNamespace(prefix, entry.getValue()));
528 }
529 else {
530 writer.add(factory.createAttribute(key, entry.getValue()));
531 }
532 }
533
534 }
535
536
537
538
539
540 writer.add(factory.createIgnorableSpace(""));
541 writer.flush();
542
543 }
544
545
546
547
548
549
550
551 protected void endDocument(XMLEventWriter writer) throws XMLStreamException {
552
553
554
555
556 String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":";
557 try {
558 bufferedWriter.write("</" + nsPrefix + getRootTagName() + ">");
559 }
560 catch (IOException ioe) {
561 throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe);
562 }
563 }
564
565
566
567
568
569
570 public void close() {
571
572 XMLEventFactory factory = createXmlEventFactory();
573 try {
574 delegateEventWriter.add(factory.createCharacters(""));
575 }
576 catch (XMLStreamException e) {
577 log.error(e);
578 }
579
580 try {
581 if (footerCallback != null) {
582 footerCallback.write(delegateEventWriter);
583 }
584 delegateEventWriter.flush();
585 endDocument(delegateEventWriter);
586 }
587 catch (IOException e) {
588 throw new ItemStreamException("Failed to write footer items", e);
589 }
590 catch (XMLStreamException e) {
591 throw new ItemStreamException("Failed to write end document tag", e);
592 }
593 finally {
594
595 try {
596 eventWriter.close();
597 }
598 catch (XMLStreamException e) {
599 log.error("Unable to close file resource: [" + resource + "] " + e);
600 }
601 finally {
602 try {
603 bufferedWriter.close();
604 }
605 catch (IOException e) {
606 log.error("Unable to close file resource: [" + resource + "] " + e);
607 }
608 finally {
609 if (!transactional) {
610 closeStream();
611 }
612 }
613 }
614 }
615 }
616
617 private void closeStream() {
618 try {
619 channel.close();
620 }
621 catch (IOException ioe) {
622 log.error("Unable to close file resource: [" + resource + "] " + ioe);
623 }
624 }
625
626
627
628
629
630
631
632
633 public void write(List<? extends T> items) throws XmlMappingException, Exception {
634
635 currentRecordCount += items.size();
636
637 for (Object object : items) {
638 Assert.state(marshaller.supports(object.getClass()),
639 "Marshaller must support the class of the marshalled object");
640 Result result = createStaxResult();
641 marshaller.marshal(object, result);
642 }
643 try {
644 eventWriter.flush();
645 }
646 catch (XMLStreamException e) {
647 throw new WriteFailedException("Failed to flush the events", e);
648 }
649
650 }
651
652
653
654
655
656
657 public void update(ExecutionContext executionContext) {
658
659 if (saveState) {
660 Assert.notNull(executionContext, "ExecutionContext must not be null");
661 executionContext.putLong(getKey(RESTART_DATA_NAME), getPosition());
662 executionContext.putLong(getKey(WRITE_STATISTICS_NAME), currentRecordCount);
663 }
664 }
665
666
667
668
669
670
671
672 private long getPosition() {
673
674 long position;
675
676 try {
677 eventWriter.flush();
678 position = channel.position();
679 if (bufferedWriter instanceof TransactionAwareBufferedWriter) {
680 position += ((TransactionAwareBufferedWriter) bufferedWriter).getBufferSize();
681 }
682 }
683 catch (Exception e) {
684 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
685 }
686
687 return position;
688 }
689
690
691
692
693
694
695 private void setPosition(long newPosition) {
696
697 try {
698 channel.truncate(newPosition);
699 channel.position(newPosition);
700 }
701 catch (IOException e) {
702 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
703 }
704
705 }
706
707 }