EMMA Coverage Report (generated Tue May 06 07:28:24 PDT 2008)
[all classes][org.springframework.batch.item.xml]

COVERAGE SUMMARY FOR SOURCE FILE [StaxEventItemReader.java]

nameclass, %method, %block, %line, %
StaxEventItemReader.java100% (1/1)100% (13/13)88%  (292/332)88%  (80.3/91)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class StaxEventItemReader100% (1/1)100% (13/13)88%  (292/332)88%  (80.3/91)
close (ExecutionContext): void 100% (1/1)53%  (26/49)59%  (8.3/14)
moveCursorToNextFragment (XMLEventReader): boolean 100% (1/1)82%  (33/40)83%  (10/12)
open (ExecutionContext): void 100% (1/1)91%  (102/112)88%  (21/24)
StaxEventItemReader (): void 100% (1/1)100% (26/26)100% (7/7)
afterPropertiesSet (): void 100% (1/1)100% (13/13)100% (4/4)
mark (): void 100% (1/1)100% (8/8)100% (3/3)
read (): Object 100% (1/1)100% (43/43)100% (11/11)
reset (): void 100% (1/1)100% (11/11)100% (4/4)
setFragmentDeserializer (EventReaderDeserializer): void 100% (1/1)100% (4/4)100% (2/2)
setFragmentRootElementName (String): void 100% (1/1)100% (4/4)100% (2/2)
setResource (Resource): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (14/14)100% (4/4)

1package org.springframework.batch.item.xml;
2 
3import java.io.IOException;
4import java.io.InputStream;
5 
6import javax.xml.namespace.QName;
7import javax.xml.stream.XMLEventReader;
8import javax.xml.stream.XMLInputFactory;
9import javax.xml.stream.XMLStreamException;
10import javax.xml.stream.events.StartElement;
11 
12import org.springframework.batch.item.ExecutionContext;
13import org.springframework.batch.item.ExecutionContextUserSupport;
14import org.springframework.batch.item.ItemReader;
15import org.springframework.batch.item.ItemStream;
16import org.springframework.batch.item.ItemStreamException;
17import org.springframework.batch.item.ReaderNotOpenException;
18import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader;
19import org.springframework.batch.item.xml.stax.DefaultTransactionalEventReader;
20import org.springframework.batch.item.xml.stax.FragmentEventReader;
21import org.springframework.batch.item.xml.stax.TransactionalEventReader;
22import org.springframework.beans.factory.InitializingBean;
23import org.springframework.core.io.Resource;
24import org.springframework.dao.DataAccessResourceFailureException;
25import org.springframework.util.Assert;
26import org.springframework.util.ClassUtils;
27 
28/**
29 * Input source for reading XML input based on StAX.
30 * 
31 * It extracts fragments from the input XML document which correspond to records
32 * for processing. The fragments are wrapped with StartDocument and EndDocument
33 * events so that the fragments can be further processed like standalone XML
34 * documents.
35 * 
36 * @author Robert Kasanicky
37 */
38public class StaxEventItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream,
39                InitializingBean {
40 
41        private static final String READ_COUNT_STATISTICS_NAME = "read.count";
42 
43        private FragmentEventReader fragmentReader;
44 
45        private TransactionalEventReader txReader;
46 
47        private EventReaderDeserializer eventReaderDeserializer;
48 
49        private Resource resource;
50 
51        private InputStream inputStream;
52 
53        private String fragmentRootElementName;
54 
55        private boolean initialized = false;
56 
57        private long lastCommitPointRecordCount = 0;
58 
59        private long currentRecordCount = 0;
60 
61        private boolean saveState = false;
62 
63        public StaxEventItemReader() {
64                setName(ClassUtils.getShortName(StaxEventItemReader.class));
65        }
66 
67        /**
68         * Read in the next root element from the file, and return it.
69         * 
70         * @return the next available record, if none exist, return null
71         * @see org.springframework.batch.item.ItemReader#read()
72         */
73        public Object read() {
74                if (!initialized) {
75                        throw new ReaderNotOpenException("Reader must be open before it can be read.");
76                }
77                Object item = null;
78 
79                currentRecordCount++;
80                if (moveCursorToNextFragment(fragmentReader)) {
81                        fragmentReader.markStartFragment();
82                        item = eventReaderDeserializer.deserializeFragment(fragmentReader);
83                        fragmentReader.markFragmentProcessed();
84                }
85 
86                if (item == null) {
87                        currentRecordCount--;
88                }
89                return item;
90        }
91 
92        public void close(ExecutionContext executionContext) {
93                initialized = false;
94                currentRecordCount = 0;
95                try {
96                        if (fragmentReader != null) {
97                                fragmentReader.close();
98                        }
99                        if (inputStream != null) {
100                                inputStream.close();
101                        }
102                }
103                catch (XMLStreamException e) {
104                        throw new DataAccessResourceFailureException("Error while closing event reader", e);
105                }
106                catch (IOException e) {
107                        throw new DataAccessResourceFailureException("Error while closing input stream", e);
108                }
109                finally {
110                        fragmentReader = null;
111                        inputStream = null;
112                }
113        }
114 
115        public void open(ExecutionContext executionContext) {
116                Assert.state(resource.exists(), "Input resource does not exist: [" + resource + "]");
117 
118                try {
119                        inputStream = resource.getInputStream();
120                        txReader = new DefaultTransactionalEventReader(XMLInputFactory.newInstance().createXMLEventReader(
121                                        inputStream));
122                        fragmentReader = new DefaultFragmentEventReader(txReader);
123                }
124                catch (XMLStreamException xse) {
125                        throw new DataAccessResourceFailureException("Unable to create XML reader", xse);
126                }
127                catch (IOException ioe) {
128                        throw new DataAccessResourceFailureException("Unable to get input stream", ioe);
129                }
130                initialized = true;
131 
132                if (executionContext.containsKey(getKey(READ_COUNT_STATISTICS_NAME))) {
133                        long restoredRecordCount = executionContext.getLong(getKey(READ_COUNT_STATISTICS_NAME));
134                        int REASONABLE_ADHOC_COMMIT_FREQUENCY = 100;
135                        while (currentRecordCount <= restoredRecordCount) {
136                                currentRecordCount++;
137                                if (currentRecordCount % REASONABLE_ADHOC_COMMIT_FREQUENCY == 0) {
138                                        txReader.onCommit(); // reset the history buffer
139                                }
140                                if (!fragmentReader.hasNext()) {
141                                        throw new ItemStreamException("Restore point must be before end of input");
142                                }
143                                fragmentReader.next();
144                                moveCursorToNextFragment(fragmentReader);
145                        }
146                        mark(); // reset the history buffer
147                }
148        }
149 
150        public void setResource(Resource resource) {
151                this.resource = resource;
152        }
153 
154        /**
155         * @param eventReaderDeserializer maps xml fragments corresponding to
156         * records to objects
157         */
158        public void setFragmentDeserializer(EventReaderDeserializer eventReaderDeserializer) {
159                this.eventReaderDeserializer = eventReaderDeserializer;
160        }
161 
162        /**
163         * @param fragmentRootElementName name of the root element of the fragment
164         * TODO String can be ambiguous due to namespaces, use QName?
165         */
166        public void setFragmentRootElementName(String fragmentRootElementName) {
167                this.fragmentRootElementName = fragmentRootElementName;
168        }
169 
170        /**
171         * Ensure that all required dependencies for the ItemReader to run are
172         * provided after all properties have been set.
173         * 
174         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
175         * @throws IllegalArgumentException if the Resource, FragmentDeserializer or
176         * FragmentRootElementName is null, or if the root element is empty.
177         * @throws IllegalStateException if the Resource does not exist.
178         */
179        public void afterPropertiesSet() throws Exception {
180                Assert.notNull(resource, "The Resource must not be null.");
181                Assert.notNull(eventReaderDeserializer, "The FragmentDeserializer must not be null.");
182                Assert.hasLength(fragmentRootElementName, "The FragmentRootElementName must not be null");
183        }
184 
185        /**
186         * @see ItemStream#update(ExecutionContext)
187         */
188        public void update(ExecutionContext executionContext) {
189                if (saveState) {
190                        Assert.notNull(executionContext, "ExecutionContext must not be null");
191                        executionContext.putLong(getKey(READ_COUNT_STATISTICS_NAME), currentRecordCount);
192                }
193        }
194 
195        /**
196         * Responsible for moving the cursor before the StartElement of the fragment
197         * root.
198         * 
199         * This implementation simply looks for the next corresponding element, it
200         * does not care about element nesting. You will need to override this
201         * method to correctly handle composite fragments.
202         * 
203         * @return <code>true</code> if next fragment was found,
204         * <code>false</code> otherwise.
205         */
206        protected boolean moveCursorToNextFragment(XMLEventReader reader) {
207                try {
208                        while (true) {
209                                while (reader.peek() != null && !reader.peek().isStartElement()) {
210                                        reader.nextEvent();
211                                }
212                                if (reader.peek() == null) {
213                                        return false;
214                                }
215                                QName startElementName = ((StartElement) reader.peek()).getName();
216                                if (startElementName.getLocalPart().equals(fragmentRootElementName)) {
217                                        return true;
218                                }
219                                else {
220                                        reader.nextEvent();
221                                }
222                        }
223                }
224                catch (XMLStreamException e) {
225                        throw new DataAccessResourceFailureException("Error while reading from event reader", e);
226                }
227        }
228 
229        /**
230         * Mark is supported as long as this {@link ItemStream} is used in a
231         * single-threaded environment. The state backing the mark is a single
232         * counter, keeping track of the current position, so multiple threads
233         * cannot be accommodated.
234         * 
235         * @see org.springframework.batch.item.AbstractItemReader#mark()
236         */
237        public void mark() {
238                lastCommitPointRecordCount = currentRecordCount;
239                txReader.onCommit();
240        }
241 
242        /*
243         * (non-Javadoc)
244         * 
245         * @see org.springframework.batch.item.ItemStream#reset(org.springframework.batch.item.ExecutionContext)
246         */
247        public void reset() {
248                currentRecordCount = lastCommitPointRecordCount;
249                txReader.onRollback();
250                fragmentReader.reset();
251        }
252 
253        /**
254         * Set the flag that determines whether to save internal data for
255         * {@link ExecutionContext}. Only switch this to false if you don't want to
256         * save any state from this stream, and you don't need it to be restartable.
257         * 
258         * @param saveState flag value (default true)
259         */
260        public void setSaveState(boolean saveState) {
261                this.saveState = saveState;
262        }
263}

[all classes][org.springframework.batch.item.xml]
EMMA 2.0.5312 (C) Vladimir Roubtsov