View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.xml;
18  
19  import java.io.BufferedWriter;
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.OutputStreamWriter;
24  import java.io.UnsupportedEncodingException;
25  import java.io.Writer;
26  import java.nio.channels.Channels;
27  import java.nio.channels.FileChannel;
28  import java.util.List;
29  import java.util.Map;
30  
31  import javax.xml.stream.FactoryConfigurationError;
32  import javax.xml.stream.XMLEventFactory;
33  import javax.xml.stream.XMLEventWriter;
34  import javax.xml.stream.XMLOutputFactory;
35  import javax.xml.stream.XMLStreamException;
36  import javax.xml.transform.Result;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.springframework.batch.item.ExecutionContext;
41  import org.springframework.batch.item.ItemStreamException;
42  import org.springframework.batch.item.ItemWriter;
43  import org.springframework.batch.item.WriteFailedException;
44  import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream;
45  import org.springframework.batch.item.util.ExecutionContextUserSupport;
46  import org.springframework.batch.item.util.FileUtils;
47  import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter;
48  import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
49  import org.springframework.beans.factory.InitializingBean;
50  import org.springframework.core.io.Resource;
51  import org.springframework.dao.DataAccessResourceFailureException;
52  import org.springframework.oxm.Marshaller;
53  import org.springframework.oxm.XmlMappingException;
54  import org.springframework.util.Assert;
55  import org.springframework.util.ClassUtils;
56  import org.springframework.util.CollectionUtils;
57  import org.springframework.util.StringUtils;
58  
59  /**
60   * An implementation of {@link ItemWriter} which uses StAX and
61   * {@link Marshaller} for serializing object to XML.
62   * 
63   * This item writer also provides restart, statistics and transaction features
64   * by implementing corresponding interfaces.
65   * 
66   * The implementation is *not* thread-safe.
67   * 
68   * @author Peter Zozom
69   * @author Robert Kasanicky
70   * 
71   */
72  public class StaxEventItemWriter<T> extends ExecutionContextUserSupport implements
73  		ResourceAwareItemWriterItemStream<T>, InitializingBean {
74  
75  	private static final Log log = LogFactory.getLog(StaxEventItemWriter.class);
76  
77  	// default encoding
78  	private static final String DEFAULT_ENCODING = "UTF-8";
79  
80  	// default encoding
81  	private static final String DEFAULT_XML_VERSION = "1.0";
82  
83  	// default root tag name
84  	private static final String DEFAULT_ROOT_TAG_NAME = "root";
85  
86  	// restart data property name
87  	private static final String RESTART_DATA_NAME = "position";
88  
89  	// restart data property name
90  	private static final String WRITE_STATISTICS_NAME = "record.count";
91  
92  	// file system resource
93  	private Resource resource;
94  
95  	// xml marshaller
96  	private Marshaller marshaller;
97  
98  	// encoding to be used while reading from the resource
99  	private String encoding = DEFAULT_ENCODING;
100 
101 	// XML version
102 	private String version = DEFAULT_XML_VERSION;
103 
104 	// name of the root tag
105 	private String rootTagName = DEFAULT_ROOT_TAG_NAME;
106 
107 	// namespace prefix of the root tag
108 	private String rootTagNamespacePrefix = "";
109 
110 	// namespace of the root tag
111 	private String rootTagNamespace = "";
112 
113 	// root element attributes
114 	private Map<String, String> rootElementAttributes = null;
115 
116 	// TRUE means, that output file will be overwritten if exists - default is
117 	// TRUE
118 	private boolean overwriteOutput = true;
119 
120 	// file channel
121 	private FileChannel channel;
122 
123 	// wrapper for XML event writer that swallows StartDocument and EndDocument
124 	// events
125 	private XMLEventWriter eventWriter;
126 
127 	// XML event writer
128 	private XMLEventWriter delegateEventWriter;
129 
130 	// current count of processed records
131 	private long currentRecordCount = 0;
132 
133 	private boolean saveState = true;
134 
135 	private StaxWriterCallback headerCallback;
136 
137 	private StaxWriterCallback footerCallback;
138 
139 	private Writer bufferedWriter;
140 
141 	private boolean transactional = true;
142 
143 	private boolean forceSync;
144 
145 	public StaxEventItemWriter() {
146 		setName(ClassUtils.getShortName(StaxEventItemWriter.class));
147 	}
148 
149 	/**
150 	 * Set output file.
151 	 * 
152 	 * @param resource the output file
153 	 */
154 	public void setResource(Resource resource) {
155 		this.resource = resource;
156 	}
157 
158 	/**
159 	 * Set Object to XML marshaller.
160 	 * 
161 	 * @param marshaller the Object to XML marshaller
162 	 */
163 	public void setMarshaller(Marshaller marshaller) {
164 		this.marshaller = marshaller;
165 	}
166 
167 	/**
168 	 * headerCallback is called before writing any items.
169 	 */
170 	public void setHeaderCallback(StaxWriterCallback headerCallback) {
171 		this.headerCallback = headerCallback;
172 	}
173 
174 	/**
175 	 * footerCallback is called after writing all items but before closing the
176 	 * file
177 	 */
178 	public void setFooterCallback(StaxWriterCallback footerCallback) {
179 		this.footerCallback = footerCallback;
180 	}
181 
182 	/**
183 	 * Flag to indicate that writes should be deferred to the end of a
184 	 * transaction if present. Defaults to true.
185 	 * 
186 	 * @param transactional the flag to set
187 	 */
188 	public void setTransactional(boolean transactional) {
189 		this.transactional = transactional;
190 	}
191 
192 	/**
193 	 * Flag to indicate that changes should be force-synced to disk on flush.
194 	 * Defaults to false, which means that even with a local disk changes could
195 	 * be lost if the OS crashes in between a write and a cache flush. Setting
196 	 * to true may result in slower performance for usage patterns involving
197 	 * many frequent writes.
198 	 * 
199 	 * @param forceSync the flag value to set
200 	 */
201 	public void setForceSync(boolean forceSync) {
202 		this.forceSync = forceSync;
203 	}
204 
205 	/**
206 	 * Get used encoding.
207 	 * 
208 	 * @return the encoding used
209 	 */
210 	public String getEncoding() {
211 		return encoding;
212 	}
213 
214 	/**
215 	 * Set encoding to be used for output file.
216 	 * 
217 	 * @param encoding the encoding to be used
218 	 */
219 	public void setEncoding(String encoding) {
220 		this.encoding = encoding;
221 	}
222 
223 	/**
224 	 * Get XML version.
225 	 * 
226 	 * @return the XML version used
227 	 */
228 	public String getVersion() {
229 		return version;
230 	}
231 
232 	/**
233 	 * Set XML version to be used for output XML.
234 	 * 
235 	 * @param version the XML version to be used
236 	 */
237 	public void setVersion(String version) {
238 		this.version = version;
239 	}
240 
241 	/**
242 	 * Get the tag name of the root element.
243 	 * 
244 	 * @return the root element tag name
245 	 */
246 	public String getRootTagName() {
247 		return rootTagName;
248 	}
249 
250 	/**
251 	 * Set the tag name of the root element. If not set, default name is used
252 	 * ("root"). Namespace URI and prefix can also be set optionally using the
253 	 * notation:
254 	 * 
255 	 * <pre>
256 	 * {uri}prefix:root
257 	 * </pre>
258 	 * 
259 	 * The prefix is optional (defaults to empty), but if it is specified then
260 	 * the uri must be provided. In addition you might want to declare other
261 	 * namespaces using the {@link #setRootElementAttributes(Map) root
262 	 * attributes}.
263 	 * 
264 	 * @param rootTagName the tag name to be used for the root element
265 	 */
266 	public void setRootTagName(String rootTagName) {
267 		this.rootTagName = rootTagName;
268 	}
269 
270 	/**
271 	 * Get the namespace prefix of the root element. Empty by default.
272 	 * 
273 	 * @return the rootTagNamespacePrefix
274 	 */
275 	public String getRootTagNamespacePrefix() {
276 		return rootTagNamespacePrefix;
277 	}
278 
279 	/**
280 	 * Get the namespace of the root element.
281 	 * 
282 	 * @return the rootTagNamespace
283 	 */
284 	public String getRootTagNamespace() {
285 		return rootTagNamespace;
286 	}
287 
288 	/**
289 	 * Get attributes of the root element.
290 	 * 
291 	 * @return attributes of the root element
292 	 */
293 	public Map<String, String> getRootElementAttributes() {
294 		return rootElementAttributes;
295 	}
296 
297 	/**
298 	 * Set the root element attributes to be written. If any of the key names
299 	 * begin with "xmlns:" then they are treated as namespace declarations.
300 	 * 
301 	 * @param rootElementAttributes attributes of the root element
302 	 */
303 	public void setRootElementAttributes(Map<String, String> rootElementAttributes) {
304 		this.rootElementAttributes = rootElementAttributes;
305 	}
306 
307 	/**
308 	 * Set "overwrite" flag for the output file. Flag is ignored when output
309 	 * file processing is restarted.
310 	 * 
311 	 * @param overwriteOutput
312 	 */
313 	public void setOverwriteOutput(boolean overwriteOutput) {
314 		this.overwriteOutput = overwriteOutput;
315 	}
316 
317 	public void setSaveState(boolean saveState) {
318 		this.saveState = saveState;
319 	}
320 
321 	/**
322 	 * @throws Exception
323 	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
324 	 */
325 	public void afterPropertiesSet() throws Exception {
326 		Assert.notNull(marshaller);
327 		if (rootTagName.contains("{")) {
328 			rootTagNamespace = rootTagName.replaceAll("\\{(.*)\\}.*", "$1");
329 			rootTagName = rootTagName.replaceAll("\\{.*\\}(.*)", "$1");
330 			if (rootTagName.contains(":")) {
331 				rootTagNamespacePrefix = rootTagName.replaceAll("(.*):.*", "$1");
332 				rootTagName = rootTagName.replaceAll(".*:(.*)", "$1");
333 			}
334 		}
335 	}
336 
337 	/**
338 	 * Open the output source
339 	 * 
340 	 * @see org.springframework.batch.item.ItemStream#open(ExecutionContext)
341 	 */
342 	public void open(ExecutionContext executionContext) {
343 
344 		Assert.notNull(resource, "The resource must be set");
345 
346 		long startAtPosition = 0;
347 		boolean restarted = false;
348 
349 		// if restart data is provided, restart from provided offset
350 		// otherwise start from beginning
351 		if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
352 			startAtPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
353 			restarted = true;
354 		}
355 
356 		open(startAtPosition, restarted);
357 
358 		if (startAtPosition == 0) {
359 			try {
360 				if (headerCallback != null) {
361 					headerCallback.write(delegateEventWriter);
362 				}
363 			}
364 			catch (IOException e) {
365 				throw new ItemStreamException("Failed to write headerItems", e);
366 			}
367 		}
368 
369 	}
370 
371 	/**
372 	 * Helper method for opening output source at given file position
373 	 */
374 	private void open(long position, boolean restarted) {
375 
376 		File file;
377 		FileOutputStream os = null;
378 		FileChannel fileChannel = null;
379 
380 		try {
381 			file = resource.getFile();
382 			FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput);
383 			Assert.state(resource.exists(), "Output resource must exist");
384 			os = new FileOutputStream(file, true);
385 			fileChannel = os.getChannel();
386 			channel = os.getChannel();
387 			setPosition(position);
388 		}
389 		catch (IOException ioe) {
390 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe);
391 		}
392 
393 		XMLOutputFactory outputFactory = createXmlOutputFactory();
394 
395 		if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) {
396 			// If the current XMLOutputFactory implementation is supplied by
397 			// Woodstox >= 3.2.9 we want to disable its
398 			// automatic end element feature (see:
399 			// http://jira.codehaus.org/browse/WSTX-165) per
400 			// http://jira.springframework.org/browse/BATCH-761).
401 			outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE);
402 		}
403 		if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) {
404 			// On restart we don't write the root element so we have to disable
405 			// structural validation (see:
406 			// http://jira.springframework.org/browse/BATCH-1681).
407 			outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE);
408 		}
409 
410 		try {
411 			final FileChannel channel = fileChannel;
412 			Writer writer = new BufferedWriter(new OutputStreamWriter(os, encoding)) {
413 				@Override
414 				public void flush() throws IOException {
415 					super.flush();
416 					if (forceSync) {
417 						channel.force(false);
418 					}
419 				}
420 			};
421 			if (transactional) {
422 				bufferedWriter = new TransactionAwareBufferedWriter(writer, new Runnable() {
423 					public void run() {
424 						closeStream();
425 					}
426 				});
427 			}
428 			else {
429 				bufferedWriter = writer;
430 			}
431 			delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter);
432 			eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter);
433 			if (!restarted) {
434 				startDocument(delegateEventWriter);
435 			}
436 		}
437 		catch (XMLStreamException xse) {
438 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse);
439 		}
440 		catch (UnsupportedEncodingException e) {
441 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource
442 					+ "] with encoding=[" + encoding + "]", e);
443 		}
444 
445 	}
446 
447 	/**
448 	 * Subclasses can override to customize the writer.
449 	 * @param outputFactory
450 	 * @param writer
451 	 * @return an xml writer
452 	 * @throws XMLStreamException
453 	 */
454 	protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer)
455 			throws XMLStreamException {
456 		return outputFactory.createXMLEventWriter(writer);
457 	}
458 
459 	/**
460 	 * Subclasses can override to customize the factory.
461 	 * @return a factory for the xml output
462 	 * @throws FactoryConfigurationError
463 	 */
464 	protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError {
465 		return XMLOutputFactory.newInstance();
466 	}
467 
468 	/**
469 	 * Subclasses can override to customize the event factory.
470 	 * @return a factory for the xml events
471 	 * @throws FactoryConfigurationError
472 	 */
473 	protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError {
474 		XMLEventFactory factory = XMLEventFactory.newInstance();
475 		return factory;
476 	}
477 
478 	/**
479 	 * Subclasses can override to customize the stax result.
480 	 * @return a result for writing to
481 	 * @throws Exception
482 	 */
483 	protected Result createStaxResult() throws Exception {
484 		return StaxUtils.getResult(eventWriter);
485 	}
486 
487 	/**
488 	 * Writes simple XML header containing:
489 	 * <ul>
490 	 * <li>xml declaration - defines encoding and XML version</li>
491 	 * <li>opening tag of the root element and its attributes</li>
492 	 * </ul>
493 	 * If this is not sufficient for you, simply override this method. Encoding,
494 	 * version and root tag name can be retrieved with corresponding getters.
495 	 * 
496 	 * @param writer XML event writer
497 	 * @throws XMLStreamException
498 	 */
499 	protected void startDocument(XMLEventWriter writer) throws XMLStreamException {
500 
501 		XMLEventFactory factory = createXmlEventFactory();
502 
503 		// write start document
504 		writer.add(factory.createStartDocument(getEncoding(), getVersion()));
505 
506 		// write root tag
507 		writer.add(factory.createStartElement(getRootTagNamespacePrefix(), getRootTagNamespace(), getRootTagName()));
508 		if (StringUtils.hasText(getRootTagNamespace())) {
509 			if (StringUtils.hasText(getRootTagNamespacePrefix())) {
510 				writer.add(factory.createNamespace(getRootTagNamespacePrefix(), getRootTagNamespace()));
511 			}
512 			else {
513 				writer.add(factory.createNamespace(getRootTagNamespace()));
514 			}
515 		}
516 
517 		// write root tag attributes
518 		if (!CollectionUtils.isEmpty(getRootElementAttributes())) {
519 
520 			for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) {
521 				String key = entry.getKey();
522 				if (key.startsWith("xmlns")) {
523 					String prefix = "";
524 					if (key.contains(":")) {
525 						prefix = key.substring(key.indexOf(":") + 1);
526 					}
527 					writer.add(factory.createNamespace(prefix, entry.getValue()));
528 				}
529 				else {
530 					writer.add(factory.createAttribute(key, entry.getValue()));
531 				}
532 			}
533 
534 		}
535 
536 		/*
537 		 * This forces the flush to write the end of the root element and avoids
538 		 * an off-by-one error on restart.
539 		 */
540 		writer.add(factory.createIgnorableSpace(""));
541 		writer.flush();
542 
543 	}
544 
545 	/**
546 	 * Writes the EndDocument tag manually.
547 	 * 
548 	 * @param writer XML event writer
549 	 * @throws XMLStreamException
550 	 */
551 	protected void endDocument(XMLEventWriter writer) throws XMLStreamException {
552 
553 		// writer.writeEndDocument(); <- this doesn't work after restart
554 		// we need to write end tag of the root element manually
555 
556 		String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":";
557 		try {
558 			bufferedWriter.write("</" + nsPrefix + getRootTagName() + ">");
559 		}
560 		catch (IOException ioe) {
561 			throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe);
562 		}
563 	}
564 
565 	/**
566 	 * Flush and close the output source.
567 	 * 
568 	 * @see org.springframework.batch.item.ItemStream#close()
569 	 */
570 	public void close() {
571 
572 		XMLEventFactory factory = createXmlEventFactory();
573 		try {
574 			delegateEventWriter.add(factory.createCharacters(""));
575 		}
576 		catch (XMLStreamException e) {
577 			log.error(e);
578 		}
579 
580 		try {
581 			if (footerCallback != null) {
582 				footerCallback.write(delegateEventWriter);
583 			}
584 			delegateEventWriter.flush();
585 			endDocument(delegateEventWriter);
586 		}
587 		catch (IOException e) {
588 			throw new ItemStreamException("Failed to write footer items", e);
589 		}
590 		catch (XMLStreamException e) {
591 			throw new ItemStreamException("Failed to write end document tag", e);
592 		}
593 		finally {
594 
595 			try {
596 				eventWriter.close();
597 			}
598 			catch (XMLStreamException e) {
599 				log.error("Unable to close file resource: [" + resource + "] " + e);
600 			}
601 			finally {
602 				try {
603 					bufferedWriter.close();
604 				}
605 				catch (IOException e) {
606 					log.error("Unable to close file resource: [" + resource + "] " + e);
607 				}
608 				finally {
609 					if (!transactional) {
610 						closeStream();
611 					}
612 				}
613 			}
614 		}
615 	}
616 
617 	private void closeStream() {
618 		try {
619 			channel.close();
620 		}
621 		catch (IOException ioe) {
622 			log.error("Unable to close file resource: [" + resource + "] " + ioe);
623 		}
624 	}
625 
626 	/**
627 	 * Write the value objects and flush them to the file.
628 	 * 
629 	 * @param items the value object
630 	 * @throws IOException
631 	 * @throws XmlMappingException
632 	 */
633 	public void write(List<? extends T> items) throws XmlMappingException, Exception {
634 
635 		currentRecordCount += items.size();
636 
637 		for (Object object : items) {
638 			Assert.state(marshaller.supports(object.getClass()),
639 					"Marshaller must support the class of the marshalled object");
640 			Result result = createStaxResult();
641 			marshaller.marshal(object, result);
642 		}
643 		try {
644 			eventWriter.flush();
645 		}
646 		catch (XMLStreamException e) {
647 			throw new WriteFailedException("Failed to flush the events", e);
648 		}
649 
650 	}
651 
652 	/**
653 	 * Get the restart data.
654 	 * 
655 	 * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
656 	 */
657 	public void update(ExecutionContext executionContext) {
658 
659 		if (saveState) {
660 			Assert.notNull(executionContext, "ExecutionContext must not be null");
661 			executionContext.putLong(getKey(RESTART_DATA_NAME), getPosition());
662 			executionContext.putLong(getKey(WRITE_STATISTICS_NAME), currentRecordCount);
663 		}
664 	}
665 
666 	/*
667 	 * Get the actual position in file channel. This method flushes any buffered
668 	 * data before position is read.
669 	 * 
670 	 * @return byte offset in file channel
671 	 */
672 	private long getPosition() {
673 
674 		long position;
675 
676 		try {
677 			eventWriter.flush();
678 			position = channel.position();
679 			if (bufferedWriter instanceof TransactionAwareBufferedWriter) {
680 				position += ((TransactionAwareBufferedWriter) bufferedWriter).getBufferSize();
681 			}
682 		}
683 		catch (Exception e) {
684 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
685 		}
686 
687 		return position;
688 	}
689 
690 	/**
691 	 * Set the file channel position.
692 	 * 
693 	 * @param newPosition new file channel position
694 	 */
695 	private void setPosition(long newPosition) {
696 
697 		try {
698 			channel.truncate(newPosition);
699 			channel.position(newPosition);
700 		}
701 		catch (IOException e) {
702 			throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
703 		}
704 
705 	}
706 
707 }