View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.xml;
18  
19  import java.io.InputStream;
20  
21  import javax.xml.namespace.QName;
22  import javax.xml.stream.XMLEventReader;
23  import javax.xml.stream.XMLInputFactory;
24  import javax.xml.stream.XMLStreamException;
25  import javax.xml.stream.events.EndElement;
26  import javax.xml.stream.events.StartElement;
27  import javax.xml.stream.events.XMLEvent;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.springframework.batch.item.NonTransientResourceException;
32  import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
33  import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
34  import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader;
35  import org.springframework.batch.item.xml.stax.FragmentEventReader;
36  import org.springframework.beans.factory.InitializingBean;
37  import org.springframework.core.io.Resource;
38  import org.springframework.oxm.Unmarshaller;
39  import org.springframework.util.Assert;
40  import org.springframework.util.ClassUtils;
41  
42  /**
43   * Item reader for reading XML input based on StAX.
44   * 
45   * It extracts fragments from the input XML document which correspond to records for processing. The fragments are
46   * wrapped with StartDocument and EndDocument events so that the fragments can be further processed like standalone XML
47   * documents.
48   * 
49   * The implementation is *not* thread-safe.
50   * 
51   * @author Robert Kasanicky
52   */
53  public class StaxEventItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
54  		ResourceAwareItemReaderItemStream<T>, InitializingBean {
55  
56  	private static final Log logger = LogFactory.getLog(StaxEventItemReader.class);
57  
58  	private FragmentEventReader fragmentReader;
59  
60  	private XMLEventReader eventReader;
61  
62  	private Unmarshaller unmarshaller;
63  
64  	private Resource resource;
65  
66  	private InputStream inputStream;
67  
68  	private String fragmentRootElementName;
69  
70  	private boolean noInput;
71  
72  	private boolean strict = true;
73  
74  	private String fragmentRootElementNameSpace;
75  
76  	public StaxEventItemReader() {
77  		setName(ClassUtils.getShortName(StaxEventItemReader.class));
78  	}
79  
80  	/**
81  	 * In strict mode the reader will throw an exception on
82  	 * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
83  	 * @param strict false by default
84  	 */
85  	public void setStrict(boolean strict) {
86  		this.strict = strict;
87  	}
88  
89      @Override
90  	public void setResource(Resource resource) {
91  		this.resource = resource;
92  	}
93  
94  	/**
95  	 * @param unmarshaller maps xml fragments corresponding to records to objects
96  	 */
97  	public void setUnmarshaller(Unmarshaller unmarshaller) {
98  		this.unmarshaller = unmarshaller;
99  	}
100 
101 	/**
102 	 * @param fragmentRootElementName name of the root element of the fragment
103 	 */
104 	public void setFragmentRootElementName(String fragmentRootElementName) {
105 		this.fragmentRootElementName = fragmentRootElementName;
106 	}
107 
108 	/**
109 	 * Ensure that all required dependencies for the ItemReader to run are provided after all properties have been set.
110 	 * 
111 	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
112 	 * @throws IllegalArgumentException if the Resource, FragmentDeserializer or FragmentRootElementName is null, or if
113 	 * the root element is empty.
114 	 * @throws IllegalStateException if the Resource does not exist.
115 	 */
116     @Override
117 	public void afterPropertiesSet() throws Exception {
118 		Assert.notNull(unmarshaller, "The Unmarshaller must not be null.");
119 		Assert.hasLength(fragmentRootElementName, "The FragmentRootElementName must not be null");
120 		if (fragmentRootElementName.contains("{")) {
121 			fragmentRootElementNameSpace = fragmentRootElementName.replaceAll("\\{(.*)\\}.*", "$1");
122 			fragmentRootElementName = fragmentRootElementName.replaceAll("\\{.*\\}(.*)", "$1");
123 		}
124 	}
125 
126 	/**
127 	 * Responsible for moving the cursor before the StartElement of the fragment root.
128 	 * 
129 	 * This implementation simply looks for the next corresponding element, it does not care about element nesting. You
130 	 * will need to override this method to correctly handle composite fragments.
131 	 * 
132 	 * @return <code>true</code> if next fragment was found, <code>false</code> otherwise.
133 	 * 
134 	 * @throws NonTransientResourceException if the cursor could not be moved. This will be treated as fatal and
135 	 * subsequent calls to read will return null.
136 	 */
137 	protected boolean moveCursorToNextFragment(XMLEventReader reader) throws NonTransientResourceException {
138 		try {
139 			while (true) {
140 				while (reader.peek() != null && !reader.peek().isStartElement()) {
141 					reader.nextEvent();
142 				}
143 				if (reader.peek() == null) {
144 					return false;
145 				}
146 				QName startElementName = ((StartElement) reader.peek()).getName();
147 				if (startElementName.getLocalPart().equals(fragmentRootElementName)) {
148 					if (fragmentRootElementNameSpace == null
149 							|| startElementName.getNamespaceURI().equals(fragmentRootElementNameSpace)) {
150 						return true;
151 					}
152 				}
153 				reader.nextEvent();
154 
155 			}
156 		}
157 		catch (XMLStreamException e) {
158 			throw new NonTransientResourceException("Error while reading from event reader", e);
159 		}
160 	}
161 
162     @Override
163 	protected void doClose() throws Exception {
164 		try {
165 			if (fragmentReader != null) {
166 				fragmentReader.close();
167 			}
168 			if (inputStream != null) {
169 				inputStream.close();
170 			}
171 		}
172 		finally {
173 			fragmentReader = null;
174 			inputStream = null;
175 		}
176 
177 	}
178 
179     @Override
180 	protected void doOpen() throws Exception {
181 		Assert.notNull(resource, "The Resource must not be null.");
182 
183 		noInput = true;
184 		if (!resource.exists()) {
185 			if (strict) {
186 				throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode)");
187 			}
188 			logger.warn("Input resource does not exist " + resource.getDescription());
189 			return;
190 		}
191 		if (!resource.isReadable()) {
192 			if (strict) {
193 				throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode)");
194 			}
195 			logger.warn("Input resource is not readable " + resource.getDescription());
196 			return;
197 		}
198 
199 		inputStream = resource.getInputStream();
200 		eventReader = XMLInputFactory.newInstance().createXMLEventReader(inputStream);
201 		fragmentReader = new DefaultFragmentEventReader(eventReader);
202 		noInput = false;
203 
204 	}
205 
206 	/**
207 	 * Move to next fragment and map it to item.
208 	 */
209     @Override
210 	protected T doRead() throws Exception {
211 
212 		if (noInput) {
213 			return null;
214 		}
215 
216 		T item = null;
217 
218 		boolean success = false;
219 		try {
220 			success = moveCursorToNextFragment(fragmentReader);
221 		}
222 		catch (NonTransientResourceException e) {
223 			// Prevent caller from retrying indefinitely since this is fatal
224 			noInput = true;
225 			throw e;
226 		}
227 		if (success) {
228 			fragmentReader.markStartFragment();
229 
230 			try {
231 				@SuppressWarnings("unchecked")
232 				T mappedFragment = (T) unmarshaller.unmarshal(StaxUtils.getSource(fragmentReader));
233 				item = mappedFragment;
234 			}
235 			finally {
236 				fragmentReader.markFragmentProcessed();
237 			}
238 		}
239 
240 		return item;
241 	}
242 
243 	/*
244 	 * jumpToItem is overridden because reading in and attempting to bind an entire fragment is unacceptable in a
245 	 * restart scenario, and may cause exceptions to be thrown that were already skipped in previous runs.
246 	 */
247 	@Override
248 	protected void jumpToItem(int itemIndex) throws Exception {
249 		for (int i = 0; i < itemIndex; i++) {
250 			readToStartFragment();
251 			readToEndFragment();
252 		}
253 	}
254 
255 	/*
256 	 * Read until the first StartElement tag that matches the provided fragmentRootElementName. Because there may be any
257 	 * number of tags in between where the reader is now and the fragment start, this is done in a loop until the
258 	 * element type and name match.
259 	 */
260 	private void readToStartFragment() throws XMLStreamException {
261 		while (true) {
262 			XMLEvent nextEvent = eventReader.nextEvent();
263 			if (nextEvent.isStartElement()
264 					&& ((StartElement) nextEvent).getName().getLocalPart().equals(fragmentRootElementName)) {
265 				return;
266 			}
267 		}
268 	}
269 
270 	/*
271 	 * Read until the first EndElement tag that matches the provided fragmentRootElementName. Because there may be any
272 	 * number of tags in between where the reader is now and the fragment end tag, this is done in a loop until the
273 	 * element type and name match
274 	 */
275 	private void readToEndFragment() throws XMLStreamException {
276 		while (true) {
277 			XMLEvent nextEvent = eventReader.nextEvent();
278 			if (nextEvent.isEndElement()
279 					&& ((EndElement) nextEvent).getName().getLocalPart().equals(fragmentRootElementName)) {
280 				return;
281 			}
282 		}
283 	}
284 }