EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.item.xml]

COVERAGE SUMMARY FOR SOURCE FILE [StaxEventItemWriter.java]

nameclass, %method, %block, %line, %
StaxEventItemWriter.java100% (1/1)100% (27/27)75%  (404/541)87%  (118.8/137)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class StaxEventItemWriter100% (1/1)100% (27/27)75%  (404/541)87%  (118.8/137)
getPosition (): long 100% (1/1)37%  (10/27)67%  (4/6)
close (ExecutionContext): void 100% (1/1)38%  (23/61)60%  (9/15)
endDocument (XMLEventWriter): void 100% (1/1)55%  (21/38)67%  (4/6)
setPosition (long): void 100% (1/1)56%  (23/41)71%  (5/7)
open (long, boolean): void 100% (1/1)61%  (54/88)79%  (15/19)
flush (): void 100% (1/1)82%  (33/40)83%  (10/12)
StaxEventItemWriter (): void 100% (1/1)90%  (46/51)99%  (13.9/14)
<static initializer> 100% (1/1)91%  (10/11)90%  (0.9/1)
afterPropertiesSet (): void 100% (1/1)100% (4/4)100% (2/2)
clear (): void 100% (1/1)100% (8/8)100% (3/3)
getEncoding (): String 100% (1/1)100% (3/3)100% (1/1)
getRootElementAttributes (): Map 100% (1/1)100% (3/3)100% (1/1)
getRootTagName (): String 100% (1/1)100% (3/3)100% (1/1)
getVersion (): String 100% (1/1)100% (3/3)100% (1/1)
open (ExecutionContext): void 100% (1/1)100% (44/44)100% (13/13)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
setHeaderItems (Object []): void 100% (1/1)100% (5/5)100% (2/2)
setOverwriteOutput (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setResource (Resource): void 100% (1/1)100% (4/4)100% (2/2)
setRootElementAttributes (Map): void 100% (1/1)100% (4/4)100% (2/2)
setRootTagName (String): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setSerializer (EventWriterSerializer): void 100% (1/1)100% (4/4)100% (2/2)
setVersion (String): void 100% (1/1)100% (4/4)100% (2/2)
startDocument (XMLEventWriter): void 100% (1/1)100% (46/46)100% (9/9)
update (ExecutionContext): void 100% (1/1)100% (21/21)100% (5/5)
write (Object): void 100% (1/1)100% (12/12)100% (3/3)

1package org.springframework.batch.item.xml;
2 
3import java.io.File;
4import java.io.FileOutputStream;
5import java.io.IOException;
6import java.nio.ByteBuffer;
7import java.nio.channels.FileChannel;
8import java.util.ArrayList;
9import java.util.Arrays;
10import java.util.Iterator;
11import java.util.List;
12import java.util.Map;
13 
14import javax.xml.stream.XMLEventFactory;
15import javax.xml.stream.XMLEventWriter;
16import javax.xml.stream.XMLOutputFactory;
17import javax.xml.stream.XMLStreamException;
18 
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.springframework.batch.item.ClearFailedException;
22import org.springframework.batch.item.ExecutionContext;
23import org.springframework.batch.item.FlushFailedException;
24import org.springframework.batch.item.ItemStream;
25import org.springframework.batch.item.ItemWriter;
26import org.springframework.batch.item.util.ExecutionContextUserSupport;
27import org.springframework.batch.item.util.FileUtils;
28import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter;
29import org.springframework.beans.factory.InitializingBean;
30import org.springframework.core.io.Resource;
31import org.springframework.dao.DataAccessResourceFailureException;
32import org.springframework.util.Assert;
33import org.springframework.util.ClassUtils;
34import org.springframework.util.CollectionUtils;
35 
36/**
37 * An implementation of {@link ItemWriter} which uses StAX and
38 * {@link EventWriterSerializer} for serializing object to XML.
39 * 
40 * This item writer also provides restart, statistics and transaction features
41 * by implementing corresponding interfaces.
42 * 
43 * Output is buffered until {@link #flush()} is called - only then the actual
44 * writing to file takes place.
45 * 
46 * The implementation is *not* thread-safe.
47 * 
48 * @author Peter Zozom
49 * @author Robert Kasanicky
50 * 
51 */
52public class StaxEventItemWriter extends ExecutionContextUserSupport implements ItemWriter, ItemStream,
53                InitializingBean {
54 
55        private static final Log log = LogFactory.getLog(StaxEventItemWriter.class);
56 
57        // default encoding
58        private static final String DEFAULT_ENCODING = "UTF-8";
59 
60        // default encoding
61        private static final String DEFAULT_XML_VERSION = "1.0";
62 
63        // default root tag name
64        private static final String DEFAULT_ROOT_TAG_NAME = "root";
65 
66        // restart data property name
67        private static final String RESTART_DATA_NAME = "position";
68 
69        // restart data property name
70        private static final String WRITE_STATISTICS_NAME = "record.count";
71 
72        // file system resource
73        private Resource resource;
74 
75        // xml serializer
76        private EventWriterSerializer serializer;
77 
78        // encoding to be used while reading from the resource
79        private String encoding = DEFAULT_ENCODING;
80 
81        // XML version
82        private String version = DEFAULT_XML_VERSION;
83 
84        // name of the root tag
85        private String rootTagName = DEFAULT_ROOT_TAG_NAME;
86 
87        // root element attributes
88        private Map rootElementAttributes = null;
89 
90        // TRUE means, that output file will be overwritten if exists - default is
91        // TRUE
92        private boolean overwriteOutput = true;
93 
94        // file channel
95        private FileChannel channel;
96 
97        // wrapper for XML event writer that swallows StartDocument and EndDocument
98        // events
99        private XMLEventWriter eventWriter;
100 
101        // XML event writer
102        private XMLEventWriter delegateEventWriter;
103 
104        // byte offset in file channel at last commit point
105        private long lastCommitPointPosition = 0;
106 
107        // processed record count at last commit point
108        private long lastCommitPointRecordCount = 0;
109 
110        // current count of processed records
111        private long currentRecordCount = 0;
112 
113        private boolean saveState = true;
114 
115        // holds the list of items for writing before they are actually written on
116        // #flush()
117        private List buffer = new ArrayList();
118 
119        private List headers = new ArrayList();
120 
121        public StaxEventItemWriter() {
122                setName(ClassUtils.getShortName(StaxEventItemWriter.class));
123        }
124 
125        /**
126         * Set output file.
127         * 
128         * @param resource the output file
129         */
130        public void setResource(Resource resource) {
131                this.resource = resource;
132        }
133 
134        /**
135         * Set Object to XML serializer.
136         * 
137         * @param serializer the Object to XML serializer
138         */
139        public void setSerializer(EventWriterSerializer serializer) {
140                this.serializer = serializer;
141        }
142 
143        /**
144         * Get used encoding.
145         * 
146         * @return the encoding used
147         */
148        public String getEncoding() {
149                return encoding;
150        }
151 
152        /**
153         * Set encoding to be used for output file.
154         * 
155         * @param encoding the encoding to be used
156         */
157        public void setEncoding(String encoding) {
158                this.encoding = encoding;
159        }
160 
161        /**
162         * Get XML version.
163         * 
164         * @return the XML version used
165         */
166        public String getVersion() {
167                return version;
168        }
169 
170        /**
171         * Set XML version to be used for output XML.
172         * 
173         * @param version the XML version to be used
174         */
175        public void setVersion(String version) {
176                this.version = version;
177        }
178 
179        /**
180         * Get the tag name of the root element.
181         * 
182         * @return the root element tag name
183         */
184        public String getRootTagName() {
185                return rootTagName;
186        }
187 
188        /**
189         * Set the tag name of the root element. If not set, default name is used
190         * ("root").
191         * 
192         * @param rootTagName the tag name to be used for the root element
193         */
194        public void setRootTagName(String rootTagName) {
195                this.rootTagName = rootTagName;
196        }
197 
198        /**
199         * Get attributes of the root element.
200         * 
201         * @return attributes of the root element
202         */
203        public Map getRootElementAttributes() {
204                return rootElementAttributes;
205        }
206 
207        /**
208         * Set the root element attributes to be written.
209         * 
210         * @param rootElementAttributes attributes of the root element
211         */
212        public void setRootElementAttributes(Map rootElementAttributes) {
213                this.rootElementAttributes = rootElementAttributes;
214        }
215 
216        /**
217         * Set "overwrite" flag for the output file. Flag is ignored when output
218         * file processing is restarted.
219         * 
220         * @param overwriteOutput
221         */
222        public void setOverwriteOutput(boolean overwriteOutput) {
223                this.overwriteOutput = overwriteOutput;
224        }
225 
226        /**
227         * Setter for the headers. This list will be marshalled and output before
228         * any calls to {@link #write(Object)}.
229         * @param headers
230         */
231        public void setHeaderItems(Object[] headers) {
232                this.headers = Arrays.asList(headers);
233        }
234 
235        public void setSaveState(boolean saveState) {
236                this.saveState = saveState;
237        }
238 
239        /**
240         * @throws Exception
241         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
242         */
243        public void afterPropertiesSet() throws Exception {
244                Assert.notNull(serializer);
245        }
246 
247        /**
248         * Open the output source
249         * 
250         * @see org.springframework.batch.item.ItemStream#open(ExecutionContext)
251         */
252        public void open(ExecutionContext executionContext) {
253                
254                Assert.notNull(resource);
255                
256                long startAtPosition = 0;
257                boolean restarted = false;
258 
259                // if restart data is provided, restart from provided offset
260                // otherwise start from beginning
261                if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
262                        startAtPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
263                        restarted = true;
264                }
265 
266                open(startAtPosition, restarted);
267 
268                if (startAtPosition == 0) {
269                        for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
270                                Object header = (Object) iterator.next();
271                                write(header);
272                        }
273                }
274        }
275 
276        /**
277         * Helper method for opening output source at given file position
278         */
279        private void open(long position, boolean restarted) {
280 
281                File file;
282                FileOutputStream os = null;
283 
284                try {
285                        file = resource.getFile();
286                        FileUtils.setUpOutputFile(file, restarted, overwriteOutput);
287                        Assert.state(resource.exists(), "Output resource must exist");
288                        os = new FileOutputStream(file, true);
289                        channel = os.getChannel();
290                        setPosition(position);
291                }
292                catch (IOException ioe) {
293                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe);
294                }
295 
296                XMLOutputFactory outputFactory = XMLOutputFactory.newInstance();
297 
298                try {
299                        delegateEventWriter = outputFactory.createXMLEventWriter(os, encoding);
300                        eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter);
301                        if (!restarted) {
302                                startDocument(delegateEventWriter);
303                        }
304                }
305                catch (XMLStreamException xse) {
306                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse);
307                }
308 
309        }
310 
311        /**
312         * Writes simple XML header containing:
313         * <ul>
314         * <li>xml declaration - defines encoding and XML version</li>
315         * <li>opening tag of the root element and its attributes</li>
316         * </ul>
317         * If this is not sufficient for you, simply override this method. Encoding,
318         * version and root tag name can be retrieved with corresponding getters.
319         * 
320         * @param writer XML event writer
321         * @throws XMLStreamException
322         */
323        private void startDocument(XMLEventWriter writer) throws XMLStreamException {
324 
325                XMLEventFactory factory = XMLEventFactory.newInstance();
326 
327                // write start document
328                writer.add(factory.createStartDocument(getEncoding(), getVersion()));
329 
330                // write root tag
331                writer.add(factory.createStartElement("", "", getRootTagName()));
332 
333                // write root tag attributes
334                if (!CollectionUtils.isEmpty(getRootElementAttributes())) {
335 
336                        for (Iterator i = getRootElementAttributes().entrySet().iterator(); i.hasNext();) {
337                                Map.Entry entry = (Map.Entry) i.next();
338                                writer.add(factory.createAttribute((String) entry.getKey(), (String) entry.getValue()));
339                        }
340 
341                }
342 
343        }
344 
345        /**
346         * Writes the EndDocument tag manually.
347         * 
348         * @param writer XML event writer
349         * @throws XMLStreamException
350         */
351        protected void endDocument(XMLEventWriter writer) throws XMLStreamException {
352 
353                // writer.writeEndDocument(); <- this doesn't work after restart
354                // we need to write end tag of the root element manually
355 
356                ByteBuffer bbuf = ByteBuffer.wrap(("</" + getRootTagName() + ">").getBytes());
357                try {
358                        channel.write(bbuf);
359                }
360                catch (IOException ioe) {
361                        throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe);
362                }
363        }
364 
365        /**
366         * Flush and close the output source.
367         * 
368         * @see org.springframework.batch.item.ItemStream#close(ExecutionContext)
369         */
370        public void close(ExecutionContext executionContext) {
371                
372                // harmless event to close the root tag if there were no items
373                XMLEventFactory factory = XMLEventFactory.newInstance();
374                try {
375                        delegateEventWriter.add(factory.createCharacters(""));
376                }
377                catch (XMLStreamException e) {
378                        log.error(e);
379                }
380 
381                flush();
382                try {
383                        endDocument(delegateEventWriter);
384                        eventWriter.close();
385                        channel.close();
386                }
387                catch (XMLStreamException xse) {
388                        throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", xse);
389                }
390                catch (IOException ioe) {
391                        throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe);
392                }
393        }
394 
395        /**
396         * Write the value object to internal buffer.
397         * 
398         * @param item the value object
399         * @see #flush()
400         */
401        public void write(Object item) {
402 
403                currentRecordCount++;
404                buffer.add(item);
405        }
406 
407        /**
408         * Get the restart data.
409         * 
410         * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
411         */
412        public void update(ExecutionContext executionContext) {
413 
414                if (saveState) {
415                        Assert.notNull(executionContext, "ExecutionContext must not be null");
416                        executionContext.putLong(getKey(RESTART_DATA_NAME), getPosition());
417                        executionContext.putLong(getKey(WRITE_STATISTICS_NAME), currentRecordCount);
418                }
419        }
420 
421        /*
422         * Get the actual position in file channel. This method flushes any buffered
423         * data before position is read.
424         * 
425         * @return byte offset in file channel
426         */
427        private long getPosition() {
428 
429                long position;
430 
431                try {
432                        eventWriter.flush();
433                        position = channel.position();
434                }
435                catch (Exception e) {
436                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
437                }
438 
439                return position;
440        }
441 
442        /**
443         * Set the file channel position.
444         * 
445         * @param newPosition new file channel position
446         */
447        private void setPosition(long newPosition) {
448 
449                try {
450                        Assert.state(channel.size() >= lastCommitPointPosition,
451                                        "Current file size is smaller than size at last commit");
452                        channel.truncate(newPosition);
453                        channel.position(newPosition);
454                }
455                catch (IOException e) {
456                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
457                }
458 
459        }
460 
461        /**
462         * Writes buffered items to XML stream and marks restore point.
463         */
464        public void flush() throws FlushFailedException {
465 
466                for (Iterator iterator = buffer.listIterator(); iterator.hasNext();) {
467                        Object item = iterator.next();
468                        serializer.serializeObject(eventWriter, item);
469                }
470                try {
471                        eventWriter.flush();
472                }
473                catch (XMLStreamException e) {
474                        throw new FlushFailedException("Failed to flush the events", e);
475                }
476                buffer.clear();
477 
478                lastCommitPointPosition = getPosition();
479                lastCommitPointRecordCount = currentRecordCount;
480        }
481 
482        /**
483         * Clear the output buffer
484         */
485        public void clear() throws ClearFailedException {
486                currentRecordCount = lastCommitPointRecordCount;
487                buffer.clear();
488        }
489 
490}

[all classes][org.springframework.batch.item.xml]
EMMA 2.0.5312 (C) Vladimir Roubtsov