View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.xml;
18  
19  import java.io.InputStream;
20  
21  import javax.xml.namespace.QName;
22  import javax.xml.stream.XMLEventReader;
23  import javax.xml.stream.XMLInputFactory;
24  import javax.xml.stream.XMLStreamException;
25  import javax.xml.stream.events.EndElement;
26  import javax.xml.stream.events.StartElement;
27  import javax.xml.stream.events.XMLEvent;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.springframework.batch.item.NonTransientResourceException;
32  import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
33  import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
34  import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader;
35  import org.springframework.batch.item.xml.stax.FragmentEventReader;
36  import org.springframework.beans.factory.InitializingBean;
37  import org.springframework.core.io.Resource;
38  import org.springframework.oxm.Unmarshaller;
39  import org.springframework.util.Assert;
40  import org.springframework.util.ClassUtils;
41  
42  /**
43   * Item reader for reading XML input based on StAX.
44   * 
45   * It extracts fragments from the input XML document which correspond to records for processing. The fragments are
46   * wrapped with StartDocument and EndDocument events so that the fragments can be further processed like standalone XML
47   * documents.
48   * 
49   * The implementation is *not* thread-safe.
50   * 
51   * @author Robert Kasanicky
52   */
53  public class StaxEventItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
54  		ResourceAwareItemReaderItemStream<T>, InitializingBean {
55  
56  	private static final Log logger = LogFactory.getLog(StaxEventItemReader.class);
57  
58  	private FragmentEventReader fragmentReader;
59  
60  	private XMLEventReader eventReader;
61  
62  	private Unmarshaller unmarshaller;
63  
64  	private Resource resource;
65  
66  	private InputStream inputStream;
67  
68  	private String fragmentRootElementName;
69  
70  	private boolean noInput;
71  
72  	private boolean strict = true;
73  
74  	private String fragmentRootElementNameSpace;
75  
76  	public StaxEventItemReader() {
77  		setName(ClassUtils.getShortName(StaxEventItemReader.class));
78  	}
79  
80  	/**
81  	 * In strict mode the reader will throw an exception on
82  	 * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
83  	 * @param strict false by default
84  	 */
85  	public void setStrict(boolean strict) {
86  		this.strict = strict;
87  	}
88  
89  	public void setResource(Resource resource) {
90  		this.resource = resource;
91  	}
92  
93  	/**
94  	 * @param unmarshaller maps xml fragments corresponding to records to objects
95  	 */
96  	public void setUnmarshaller(Unmarshaller unmarshaller) {
97  		this.unmarshaller = unmarshaller;
98  	}
99  
100 	/**
101 	 * @param fragmentRootElementName name of the root element of the fragment
102 	 */
103 	public void setFragmentRootElementName(String fragmentRootElementName) {
104 		this.fragmentRootElementName = fragmentRootElementName;
105 	}
106 
107 	/**
108 	 * Ensure that all required dependencies for the ItemReader to run are provided after all properties have been set.
109 	 * 
110 	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
111 	 * @throws IllegalArgumentException if the Resource, FragmentDeserializer or FragmentRootElementName is null, or if
112 	 * the root element is empty.
113 	 * @throws IllegalStateException if the Resource does not exist.
114 	 */
115 	public void afterPropertiesSet() throws Exception {
116 		Assert.notNull(unmarshaller, "The Unmarshaller must not be null.");
117 		Assert.hasLength(fragmentRootElementName, "The FragmentRootElementName must not be null");
118 		if (fragmentRootElementName.contains("{")) {
119 			fragmentRootElementNameSpace = fragmentRootElementName.replaceAll("\\{(.*)\\}.*", "$1");
120 			fragmentRootElementName = fragmentRootElementName.replaceAll("\\{.*\\}(.*)", "$1");
121 		}
122 	}
123 
124 	/**
125 	 * Responsible for moving the cursor before the StartElement of the fragment root.
126 	 * 
127 	 * This implementation simply looks for the next corresponding element, it does not care about element nesting. You
128 	 * will need to override this method to correctly handle composite fragments.
129 	 * 
130 	 * @return <code>true</code> if next fragment was found, <code>false</code> otherwise.
131 	 * 
132 	 * @throws NonTransientResourceException if the cursor could not be moved. This will be treated as fatal and
133 	 * subsequent calls to read will return null.
134 	 */
135 	protected boolean moveCursorToNextFragment(XMLEventReader reader) throws NonTransientResourceException {
136 		try {
137 			while (true) {
138 				while (reader.peek() != null && !reader.peek().isStartElement()) {
139 					reader.nextEvent();
140 				}
141 				if (reader.peek() == null) {
142 					return false;
143 				}
144 				QName startElementName = ((StartElement) reader.peek()).getName();
145 				if (startElementName.getLocalPart().equals(fragmentRootElementName)) {
146 					if (fragmentRootElementNameSpace == null
147 							|| startElementName.getNamespaceURI().equals(fragmentRootElementNameSpace)) {
148 						return true;
149 					}
150 				}
151 				reader.nextEvent();
152 
153 			}
154 		}
155 		catch (XMLStreamException e) {
156 			throw new NonTransientResourceException("Error while reading from event reader", e);
157 		}
158 	}
159 
160 	protected void doClose() throws Exception {
161 		try {
162 			if (fragmentReader != null) {
163 				fragmentReader.close();
164 			}
165 			if (inputStream != null) {
166 				inputStream.close();
167 			}
168 		}
169 		finally {
170 			fragmentReader = null;
171 			inputStream = null;
172 		}
173 
174 	}
175 
176 	protected void doOpen() throws Exception {
177 		Assert.notNull(resource, "The Resource must not be null.");
178 
179 		noInput = true;
180 		if (!resource.exists()) {
181 			if (strict) {
182 				throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode)");
183 			}
184 			logger.warn("Input resource does not exist " + resource.getDescription());
185 			return;
186 		}
187 		if (!resource.isReadable()) {
188 			if (strict) {
189 				throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode)");
190 			}
191 			logger.warn("Input resource is not readable " + resource.getDescription());
192 			return;
193 		}
194 
195 		inputStream = resource.getInputStream();
196 		eventReader = XMLInputFactory.newInstance().createXMLEventReader(inputStream);
197 		fragmentReader = new DefaultFragmentEventReader(eventReader);
198 		noInput = false;
199 
200 	}
201 
202 	/**
203 	 * Move to next fragment and map it to item.
204 	 */
205 	protected T doRead() throws Exception {
206 
207 		if (noInput) {
208 			return null;
209 		}
210 
211 		T item = null;
212 
213 		boolean success = false;
214 		try {
215 			success = moveCursorToNextFragment(fragmentReader);
216 		}
217 		catch (NonTransientResourceException e) {
218 			// Prevent caller from retrying indefinitely since this is fatal
219 			noInput = true;
220 			throw e;
221 		}
222 		if (success) {
223 			fragmentReader.markStartFragment();
224 
225 			try {
226 				@SuppressWarnings("unchecked")
227 				T mappedFragment = (T) unmarshaller.unmarshal(StaxUtils.getSource(fragmentReader));
228 				item = mappedFragment;
229 			}
230 			finally {
231 				fragmentReader.markFragmentProcessed();
232 			}
233 		}
234 
235 		return item;
236 	}
237 
238 	/*
239 	 * jumpToItem is overridden because reading in and attempting to bind an entire fragment is unacceptable in a
240 	 * restart scenario, and may cause exceptions to be thrown that were already skipped in previous runs.
241 	 */
242 	@Override
243 	protected void jumpToItem(int itemIndex) throws Exception {
244 		for (int i = 0; i < itemIndex; i++) {
245 			readToStartFragment();
246 			readToEndFragment();
247 		}
248 	}
249 
250 	/*
251 	 * Read until the first StartElement tag that matches the provided fragmentRootElementName. Because there may be any
252 	 * number of tags in between where the reader is now and the fragment start, this is done in a loop until the
253 	 * element type and name match.
254 	 */
255 	private void readToStartFragment() throws XMLStreamException {
256 		while (true) {
257 			XMLEvent nextEvent = eventReader.nextEvent();
258 			if (nextEvent.isStartElement()
259 					&& ((StartElement) nextEvent).getName().getLocalPart().equals(fragmentRootElementName)) {
260 				return;
261 			}
262 		}
263 	}
264 
265 	/*
266 	 * Read until the first EndElement tag that matches the provided fragmentRootElementName. Because there may be any
267 	 * number of tags in between where the reader is now and the fragment end tag, this is done in a loop until the
268 	 * element type and name match
269 	 */
270 	private void readToEndFragment() throws XMLStreamException {
271 		while (true) {
272 			XMLEvent nextEvent = eventReader.nextEvent();
273 			if (nextEvent.isEndElement()
274 					&& ((EndElement) nextEvent).getName().getLocalPart().equals(fragmentRootElementName)) {
275 				return;
276 			}
277 		}
278 	}
279 }