View Javadoc

1   /*
2    * Copyright 2006-2012 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.xml;
18  
19  import java.io.BufferedWriter;
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.OutputStreamWriter;
24  import java.io.UnsupportedEncodingException;
25  import java.io.Writer;
26  import java.nio.channels.FileChannel;
27  import java.util.List;
28  import java.util.Map;
29  
30  import javax.xml.stream.FactoryConfigurationError;
31  import javax.xml.stream.XMLEventFactory;
32  import javax.xml.stream.XMLEventWriter;
33  import javax.xml.stream.XMLOutputFactory;
34  import javax.xml.stream.XMLStreamException;
35  import javax.xml.transform.Result;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.springframework.batch.item.ExecutionContext;
40  import org.springframework.batch.item.ItemStreamException;
41  import org.springframework.batch.item.ItemWriter;
42  import org.springframework.batch.item.WriteFailedException;
43  import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream;
44  import org.springframework.batch.item.util.ExecutionContextUserSupport;
45  import org.springframework.batch.item.util.FileUtils;
46  import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter;
47  import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
48  import org.springframework.beans.factory.InitializingBean;
49  import org.springframework.core.io.Resource;
50  import org.springframework.dao.DataAccessResourceFailureException;
51  import org.springframework.oxm.Marshaller;
52  import org.springframework.oxm.XmlMappingException;
53  import org.springframework.util.Assert;
54  import org.springframework.util.ClassUtils;
55  import org.springframework.util.CollectionUtils;
56  import org.springframework.util.StringUtils;
57  
58  /**
59   * An implementation of {@link ItemWriter} which uses StAX and
60   * {@link Marshaller} for serializing objects to XML.
61   * 
62   * This item writer also provides restart, statistics and transaction features
63   * by implementing corresponding interfaces.
64   * 
65   * The implementation is *not* thread-safe.
66   * 
67   * @author Peter Zozom
68   * @author Robert Kasanicky
69   * @author Michael Minella
70   * 
71   */
72  public class StaxEventItemWriter<T> extends ExecutionContextUserSupport implements
73  		ResourceAwareItemWriterItemStream<T>, InitializingBean {
74  
75  	private static final Log log = LogFactory.getLog(StaxEventItemWriter.class);
76  
77  	// default encoding
78  	private static final String DEFAULT_ENCODING = "UTF-8";
79  
80  	// default XML version
81  	private static final String DEFAULT_XML_VERSION = "1.0";
82  
83  	// default root tag name
84  	private static final String DEFAULT_ROOT_TAG_NAME = "root";
85  
86  	// restart data property name
87  	private static final String RESTART_DATA_NAME = "position";
88  
89  	// write statistics (record count) property name
90  	private static final String WRITE_STATISTICS_NAME = "record.count";
91  
92  	// file system resource
93  	private Resource resource;
94  
95  	// xml marshaller
96  	private Marshaller marshaller;
97  
98  	// encoding to be used while writing to the resource
99  	private String encoding = DEFAULT_ENCODING;
100 
101 	// XML version
102 	private String version = DEFAULT_XML_VERSION;
103 
104 	// name of the root tag
105 	private String rootTagName = DEFAULT_ROOT_TAG_NAME;
106 
107 	// namespace prefix of the root tag
108 	private String rootTagNamespacePrefix = "";
109 
110 	// namespace of the root tag
111 	private String rootTagNamespace = "";
112 
113 	// root element attributes
114 	private Map<String, String> rootElementAttributes = null;
115 
116 	// TRUE means, that output file will be overwritten if exists - default is
117 	// TRUE
118 	private boolean overwriteOutput = true;
119 
120 	// file channel
121 	private FileChannel channel;
122 
123 	// wrapper for XML event writer that swallows StartDocument and EndDocument
124 	// events
125 	private XMLEventWriter eventWriter;
126 
127 	// XML event writer
128 	private XMLEventWriter delegateEventWriter;
129 
130 	// current count of processed records
131 	private long currentRecordCount = 0;
132 
133 	private boolean saveState = true;
134 
135 	private StaxWriterCallback headerCallback;
136 
137 	private StaxWriterCallback footerCallback;
138 
139 	private Writer bufferedWriter;
140 
141 	private boolean transactional = true;
142 
143 	private boolean forceSync;
144 
145 	public StaxEventItemWriter() {
146 		setName(ClassUtils.getShortName(StaxEventItemWriter.class));
147 	}
148 
149 	/**
150 	 * Set output file.
151 	 * 
152 	 * @param resource the output file
153 	 */
154     @Override
155 	public void setResource(Resource resource) {
156 		this.resource = resource;
157 	}
158 
159 	/**
160 	 * Set Object to XML marshaller.
161 	 * 
162 	 * @param marshaller the Object to XML marshaller
163 	 */
164 	public void setMarshaller(Marshaller marshaller) {
165 		this.marshaller = marshaller;
166 	}
167 
168 	/**
169 	 * headerCallback is called before writing any items.
170 	 */
171 	public void setHeaderCallback(StaxWriterCallback headerCallback) {
172 		this.headerCallback = headerCallback;
173 	}
174 
175 	/**
176 	 * footerCallback is called after writing all items but before closing the
177 	 * file
178 	 */
179 	public void setFooterCallback(StaxWriterCallback footerCallback) {
180 		this.footerCallback = footerCallback;
181 	}
182 
183 	/**
184 	 * Flag to indicate that writes should be deferred to the end of a
185 	 * transaction if present. Defaults to true.
186 	 * 
187 	 * @param transactional the flag to set
188 	 */
189 	public void setTransactional(boolean transactional) {
190 		this.transactional = transactional;
191 	}
192 
193 	/**
194 	 * Flag to indicate that changes should be force-synced to disk on flush.
195 	 * Defaults to false, which means that even with a local disk changes could
196 	 * be lost if the OS crashes in between a write and a cache flush. Setting
197 	 * to true may result in slower performance for usage patterns involving
198 	 * many frequent writes.
199 	 * 
200 	 * @param forceSync the flag value to set
201 	 */
202 	public void setForceSync(boolean forceSync) {
203 		this.forceSync = forceSync;
204 	}
205 
206 	/**
207 	 * Get used encoding.
208 	 * 
209 	 * @return the encoding used
210 	 */
211 	public String getEncoding() {
212 		return encoding;
213 	}
214 
215 	/**
216 	 * Set encoding to be used for output file.
217 	 * 
218 	 * @param encoding the encoding to be used
219 	 */
220 	public void setEncoding(String encoding) {
221 		this.encoding = encoding;
222 	}
223 
224 	/**
225 	 * Get XML version.
226 	 * 
227 	 * @return the XML version used
228 	 */
229 	public String getVersion() {
230 		return version;
231 	}
232 
233 	/**
234 	 * Set XML version to be used for output XML.
235 	 * 
236 	 * @param version the XML version to be used
237 	 */
238 	public void setVersion(String version) {
239 		this.version = version;
240 	}
241 
242 	/**
243 	 * Get the tag name of the root element.
244 	 * 
245 	 * @return the root element tag name
246 	 */
247 	public String getRootTagName() {
248 		return rootTagName;
249 	}
250 
251 	/**
252 	 * Set the tag name of the root element. If not set, default name is used
253 	 * ("root"). Namespace URI and prefix can also be set optionally using the
254 	 * notation:
255 	 * 
256 	 * <pre>
257 	 * {uri}prefix:root
258 	 * </pre>
259 	 * 
260 	 * The prefix is optional (defaults to empty), but if it is specified then
261 	 * the uri must be provided. In addition you might want to declare other
262 	 * namespaces using the {@link #setRootElementAttributes(Map) root
263 	 * attributes}.
264 	 * 
265 	 * @param rootTagName the tag name to be used for the root element
266 	 */
267 	public void setRootTagName(String rootTagName) {
268 		this.rootTagName = rootTagName;
269 	}
270 
271 	/**
272 	 * Get the namespace prefix of the root element. Empty by default.
273 	 * 
274 	 * @return the rootTagNamespacePrefix
275 	 */
276 	public String getRootTagNamespacePrefix() {
277 		return rootTagNamespacePrefix;
278 	}
279 
280 	/**
281 	 * Get the namespace of the root element.
282 	 * 
283 	 * @return the rootTagNamespace
284 	 */
285 	public String getRootTagNamespace() {
286 		return rootTagNamespace;
287 	}
288 
289 	/**
290 	 * Get attributes of the root element.
291 	 * 
292 	 * @return attributes of the root element
293 	 */
294 	public Map<String, String> getRootElementAttributes() {
295 		return rootElementAttributes;
296 	}
297 
298 	/**
299 	 * Set the root element attributes to be written. If any of the key names
300 	 * begin with "xmlns:" then they are treated as namespace declarations.
301 	 * 
302 	 * @param rootElementAttributes attributes of the root element
303 	 */
304 	public void setRootElementAttributes(Map<String, String> rootElementAttributes) {
305 		this.rootElementAttributes = rootElementAttributes;
306 	}
307 
308 	/**
309 	 * Set "overwrite" flag for the output file. Flag is ignored when output
310 	 * file processing is restarted.
311 	 * 
312 	 * @param overwriteOutput
313 	 */
314 	public void setOverwriteOutput(boolean overwriteOutput) {
315 		this.overwriteOutput = overwriteOutput;
316 	}
317 
318 	public void setSaveState(boolean saveState) {
319 		this.saveState = saveState;
320 	}
321 
322 	/**
323 	 * @throws Exception
324 	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
325 	 */
326     @Override
327 	public void afterPropertiesSet() throws Exception {
328 		Assert.notNull(marshaller);
329 		if (rootTagName.contains("{")) {
330 			rootTagNamespace = rootTagName.replaceAll("\\{(.*)\\}.*", "$1");
331 			rootTagName = rootTagName.replaceAll("\\{.*\\}(.*)", "$1");
332 			if (rootTagName.contains(":")) {
333 				rootTagNamespacePrefix = rootTagName.replaceAll("(.*):.*", "$1");
334 				rootTagName = rootTagName.replaceAll(".*:(.*)", "$1");
335 			}
336 		}
337 	}
338 
339 	/**
340 	 * Open the output source
341 	 * 
342 	 * @see org.springframework.batch.item.ItemStream#open(ExecutionContext)
343 	 */
344     @Override
345 	public void open(ExecutionContext executionContext) {
346 
347 		Assert.notNull(resource, "The resource must be set");
348 
349 		long startAtPosition = 0;
350 		boolean restarted = false;
351 
352 		// if restart data is provided, restart from provided offset
353 		// otherwise start from beginning
354 		if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
355 			startAtPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
356 			restarted = true;
357 		}
358 
359 		open(startAtPosition, restarted);
360 
361 		if (startAtPosition == 0) {
362 			try {
363 				if (headerCallback != null) {
364 					headerCallback.write(delegateEventWriter);
365 				}
366 			}
367 			catch (IOException e) {
368 				throw new ItemStreamException("Failed to write headerItems", e);
369 			}
370 		}
371 
372 	}
373 
374 	/**
375 	 * Helper method for opening output source at given file position
376 	 */
377 	private void open(long position, boolean restarted) {
378 
379 		File file;
380 		FileOutputStream os = null;
381 		FileChannel fileChannel = null;
382 
383 		try {
384 			file = resource.getFile();
385 			FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput);
386 			Assert.state(resource.exists(), "Output resource must exist");
387 			os = new FileOutputStream(file, true);
388 			fileChannel = os.getChannel();
389 			channel = os.getChannel();
390 			setPosition(position);
391 		}
392 		catch (IOException ioe) {
393 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe);
394 		}
395 
396 		XMLOutputFactory outputFactory = createXmlOutputFactory();
397 
398 		if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) {
399 			// If the current XMLOutputFactory implementation is supplied by
400 			// Woodstox >= 3.2.9 we want to disable its
401 			// automatic end element feature (see:
402 			// http://jira.codehaus.org/browse/WSTX-165) per
403 			// http://jira.springframework.org/browse/BATCH-761).
404 			outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE);
405 		}
406 		if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) {
407 			// On restart we don't write the root element so we have to disable
408 			// structural validation (see:
409 			// http://jira.springframework.org/browse/BATCH-1681).
410 			outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE);
411 		}
412 
413 		try {
414 			final FileChannel channel = fileChannel;
415 			if (transactional) {
416 				TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() {
417                     @Override
418 					public void run() {
419 						closeStream();
420 					}
421 				});
422 				
423 				writer.setEncoding(encoding);
424 				bufferedWriter = writer;
425 			}
426 			else {
427 				Writer writer = new BufferedWriter(new OutputStreamWriter(os, encoding)) {
428 					@Override
429 					public void flush() throws IOException {
430 						super.flush();
431 						if (forceSync) {
432 							channel.force(false);
433 						}
434 					}
435 				};
436 				bufferedWriter = writer;
437 			}
438 			delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter);
439 			eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter);
440 			if (!restarted) {
441 				startDocument(delegateEventWriter);
442 			}
443 		}
444 		catch (XMLStreamException xse) {
445 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse);
446 		}
447 		catch (UnsupportedEncodingException e) {
448 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource
449 					+ "] with encoding=[" + encoding + "]", e);
450 		}
451 
452 	}
453 
454 	/**
455 	 * Subclasses can override to customize the writer.
456 	 * @param outputFactory
457 	 * @param writer
458 	 * @return an xml writer
459 	 * @throws XMLStreamException
460 	 */
461 	protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer)
462 			throws XMLStreamException {
463 		return outputFactory.createXMLEventWriter(writer);
464 	}
465 
466 	/**
467 	 * Subclasses can override to customize the factory.
468 	 * @return a factory for the xml output
469 	 * @throws FactoryConfigurationError
470 	 */
471 	protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError {
472 		return XMLOutputFactory.newInstance();
473 	}
474 
475 	/**
476 	 * Subclasses can override to customize the event factory.
477 	 * @return a factory for the xml events
478 	 * @throws FactoryConfigurationError
479 	 */
480 	protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError {
481 		XMLEventFactory factory = XMLEventFactory.newInstance();
482 		return factory;
483 	}
484 
485 	/**
486 	 * Subclasses can override to customize the stax result.
487 	 * @return a result for writing to
488 	 * @throws Exception
489 	 */
490 	protected Result createStaxResult() throws Exception {
491 		return StaxUtils.getResult(eventWriter);
492 	}
493 
494 	/**
495 	 * Writes simple XML header containing:
496 	 * <ul>
497 	 * <li>xml declaration - defines encoding and XML version</li>
498 	 * <li>opening tag of the root element and its attributes</li>
499 	 * </ul>
500 	 * If this is not sufficient for you, simply override this method. Encoding,
501 	 * version and root tag name can be retrieved with corresponding getters.
502 	 * 
503 	 * @param writer XML event writer
504 	 * @throws XMLStreamException
505 	 */
506 	protected void startDocument(XMLEventWriter writer) throws XMLStreamException {
507 
508 		XMLEventFactory factory = createXmlEventFactory();
509 
510 		// write start document
511 		writer.add(factory.createStartDocument(getEncoding(), getVersion()));
512 
513 		// write root tag
514 		writer.add(factory.createStartElement(getRootTagNamespacePrefix(), getRootTagNamespace(), getRootTagName()));
515 		if (StringUtils.hasText(getRootTagNamespace())) {
516 			if (StringUtils.hasText(getRootTagNamespacePrefix())) {
517 				writer.add(factory.createNamespace(getRootTagNamespacePrefix(), getRootTagNamespace()));
518 			}
519 			else {
520 				writer.add(factory.createNamespace(getRootTagNamespace()));
521 			}
522 		}
523 
524 		// write root tag attributes
525 		if (!CollectionUtils.isEmpty(getRootElementAttributes())) {
526 
527 			for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) {
528 				String key = entry.getKey();
529 				if (key.startsWith("xmlns")) {
530 					String prefix = "";
531 					if (key.contains(":")) {
532 						prefix = key.substring(key.indexOf(":") + 1);
533 					}
534 					writer.add(factory.createNamespace(prefix, entry.getValue()));
535 				}
536 				else {
537 					writer.add(factory.createAttribute(key, entry.getValue()));
538 				}
539 			}
540 
541 		}
542 
543 		/*
544 		 * This forces the flush to write the end of the root element and avoids
545 		 * an off-by-one error on restart.
546 		 */
547 		writer.add(factory.createIgnorableSpace(""));
548 		writer.flush();
549 
550 	}
551 
552 	/**
553 	 * Writes the EndDocument tag manually.
554 	 * 
555 	 * @param writer XML event writer
556 	 * @throws XMLStreamException
557 	 */
558 	protected void endDocument(XMLEventWriter writer) throws XMLStreamException {
559 
560 		// writer.writeEndDocument(); <- this doesn't work after restart
561 		// we need to write end tag of the root element manually
562 
563 		String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":";
564 		try {
565 			bufferedWriter.write("</" + nsPrefix + getRootTagName() + ">");
566 		}
567 		catch (IOException ioe) {
568 			throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe);
569 		}
570 	}
571 
572 	/**
573 	 * Flush and close the output source.
574 	 * 
575 	 * @see org.springframework.batch.item.ItemStream#close()
576 	 */
577     @Override
578 	public void close() {
579 
580 		XMLEventFactory factory = createXmlEventFactory();
581 		try {
582 			delegateEventWriter.add(factory.createCharacters(""));
583 		}
584 		catch (XMLStreamException e) {
585 			log.error(e);
586 		}
587 
588 		try {
589 			if (footerCallback != null) {
590 				footerCallback.write(delegateEventWriter);
591 			}
592 			delegateEventWriter.flush();
593 			endDocument(delegateEventWriter);
594 		}
595 		catch (IOException e) {
596 			throw new ItemStreamException("Failed to write footer items", e);
597 		}
598 		catch (XMLStreamException e) {
599 			throw new ItemStreamException("Failed to write end document tag", e);
600 		}
601 		finally {
602 
603 			try {
604 				eventWriter.close();
605 			}
606 			catch (XMLStreamException e) {
607 				log.error("Unable to close file resource: [" + resource + "] " + e);
608 			}
609 			finally {
610 				try {
611 					bufferedWriter.close();
612 				}
613 				catch (IOException e) {
614 					log.error("Unable to close file resource: [" + resource + "] " + e);
615 				}
616 				finally {
617 					if (!transactional) {
618 						closeStream();
619 					}
620 				}
621 			}
622 		}
623 	}
624 
625 	private void closeStream() {
626 		try {
627 			channel.close();
628 		}
629 		catch (IOException ioe) {
630 			log.error("Unable to close file resource: [" + resource + "] " + ioe);
631 		}
632 	}
633 
634 	/**
635 	 * Write the value objects and flush them to the file.
636 	 * 
637 	 * @param items the value object
638 	 * @throws IOException
639 	 * @throws XmlMappingException
640 	 */
641     @Override
642 	public void write(List<? extends T> items) throws XmlMappingException, Exception {
643 
644 		currentRecordCount += items.size();
645 
646 		for (Object object : items) {
647 			Assert.state(marshaller.supports(object.getClass()),
648 					"Marshaller must support the class of the marshalled object");
649 			Result result = createStaxResult();
650 			marshaller.marshal(object, result);
651 		}
652 		try {
653 			eventWriter.flush();
654 		}
655 		catch (XMLStreamException e) {
656 			throw new WriteFailedException("Failed to flush the events", e);
657 		}
658 
659 	}
660 
661 	/**
662 	 * Get the restart data.
663 	 * 
664 	 * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
665 	 */
666     @Override
667 	public void update(ExecutionContext executionContext) {
668 
669 		if (saveState) {
670 			Assert.notNull(executionContext, "ExecutionContext must not be null");
671 			executionContext.putLong(getKey(RESTART_DATA_NAME), getPosition());
672 			executionContext.putLong(getKey(WRITE_STATISTICS_NAME), currentRecordCount);
673 		}
674 	}
675 
676 	/*
677 	 * Get the actual position in file channel. This method flushes any buffered
678 	 * data before position is read.
679 	 * 
680 	 * @return byte offset in file channel
681 	 */
682 	private long getPosition() {
683 
684 		long position;
685 
686 		try {
687 			eventWriter.flush();
688 			position = channel.position();
689 			if (bufferedWriter instanceof TransactionAwareBufferedWriter) {
690 				position += ((TransactionAwareBufferedWriter) bufferedWriter).getBufferSize();
691 			}
692 		}
693 		catch (Exception e) {
694 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
695 		}
696 
697 		return position;
698 	}
699 
700 	/**
701 	 * Set the file channel position.
702 	 * 
703 	 * @param newPosition new file channel position
704 	 */
705 	private void setPosition(long newPosition) {
706 
707 		try {
708 			channel.truncate(newPosition);
709 			channel.position(newPosition);
710 		}
711 		catch (IOException e) {
712 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
713 		}
714 
715 	}
716 
717 }