1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import org.apache.commons.logging.Log; |
20 | import org.apache.commons.logging.LogFactory; |
21 | import org.springframework.batch.item.ExecutionContext; |
22 | import org.springframework.batch.item.ExecutionContextUserSupport; |
23 | import org.springframework.batch.item.ItemReader; |
24 | import org.springframework.batch.item.ItemReaderException; |
25 | import org.springframework.batch.item.ItemStream; |
26 | import org.springframework.batch.item.ItemStreamException; |
27 | import org.springframework.batch.item.ReaderNotOpenException; |
28 | import org.springframework.batch.item.file.mapping.FieldSet; |
29 | import org.springframework.batch.item.file.mapping.FieldSetMapper; |
30 | import org.springframework.batch.item.file.separator.LineReader; |
31 | import org.springframework.batch.item.file.separator.RecordSeparatorPolicy; |
32 | import org.springframework.batch.item.file.separator.ResourceLineReader; |
33 | import org.springframework.batch.item.file.transform.AbstractLineTokenizer; |
34 | import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; |
35 | import org.springframework.batch.item.file.transform.LineTokenizer; |
36 | import org.springframework.beans.factory.InitializingBean; |
37 | import org.springframework.core.io.Resource; |
38 | import org.springframework.util.Assert; |
39 | import org.springframework.util.ClassUtils; |
40 | |
41 | /** |
42 | * This class represents a {@link ItemReader}, that reads lines from text file, |
43 | * tokenizes them to structured tuples ({@link FieldSet}s) instances and maps |
44 | * the {@link FieldSet}s to domain objects. The location of the file is defined |
45 | * by the resource property. To separate the structure of the file, |
46 | * {@link LineTokenizer} is used to parse data obtained from the file. <br/> |
47 | * |
48 | * A {@link FlatFileItemReader} is not thread safe because it maintains state in |
49 | * the form of a {@link ResourceLineReader}. Be careful to configure a |
50 | * {@link FlatFileItemReader} using an appropriate factory or scope so that it |
51 | * is not shared between threads.<br/> |
52 | * |
53 | * <p> |
54 | * This class supports restart, skipping invalid lines and storing statistics. |
55 | * It can be configured to setup {@link FieldSet} column names from the file |
56 | * header, skip given number of lines at the beginning of the file. |
57 | * </p> |
58 | * |
59 | * @author Waseem Malik |
60 | * @author Tomas Slanina |
61 | * @author Robert Kasanicky |
62 | * @author Dave Syer |
63 | */ |
64 | public class FlatFileItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream, InitializingBean { |
65 | |
66 | private static Log log = LogFactory.getLog(FlatFileItemReader.class); |
67 | |
68 | private static final String LINES_READ_COUNT = "lines.read.count"; |
69 | |
70 | // default encoding for input files |
71 | public static final String DEFAULT_CHARSET = "ISO-8859-1"; |
72 | |
73 | private String encoding = DEFAULT_CHARSET; |
74 | |
75 | private Resource resource; |
76 | |
77 | private RecordSeparatorPolicy recordSeparatorPolicy; |
78 | |
79 | private String[] comments; |
80 | |
81 | private int linesToSkip = 0; |
82 | |
83 | private boolean firstLineIsHeader = false; |
84 | |
85 | private LineTokenizer tokenizer = new DelimitedLineTokenizer(); |
86 | |
87 | private FieldSetMapper fieldSetMapper; |
88 | |
89 | private boolean saveState = false; |
90 | |
91 | /** |
92 | * Encapsulates the state of the input source. If it is null then we are |
93 | * uninitialized. |
94 | */ |
95 | private LineReader reader; |
96 | |
97 | public FlatFileItemReader() { |
98 | setName(ClassUtils.getShortName(FlatFileItemReader.class)); |
99 | } |
100 | |
101 | /** |
102 | * Initialize the reader if necessary. |
103 | * |
104 | * @throws IllegalStateException if the resource cannot be opened |
105 | */ |
106 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
107 | |
108 | Assert.state(resource.exists(), "Resource must exist: [" + resource + "]"); |
109 | |
110 | log.debug("Opening flat file for reading: " + resource); |
111 | |
112 | if (this.reader == null) { |
113 | ResourceLineReader reader = new ResourceLineReader(resource, encoding); |
114 | if (recordSeparatorPolicy != null) { |
115 | reader.setRecordSeparatorPolicy(recordSeparatorPolicy); |
116 | } |
117 | if (comments != null) { |
118 | reader.setComments(comments); |
119 | } |
120 | reader.open(); |
121 | this.reader = reader; |
122 | } |
123 | |
124 | for (int i = 0; i < linesToSkip; i++) { |
125 | readLine(); |
126 | } |
127 | |
128 | if (firstLineIsHeader) { |
129 | // skip the header |
130 | String firstLine = readLine(); |
131 | // set names in tokenizer if they haven't been set already |
132 | if (tokenizer instanceof AbstractLineTokenizer && !((AbstractLineTokenizer) tokenizer).hasNames()) { |
133 | String[] names = tokenizer.tokenize(firstLine).getValues(); |
134 | ((AbstractLineTokenizer) tokenizer).setNames(names); |
135 | } |
136 | } |
137 | |
138 | if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { |
139 | log.debug("Initializing for restart. Restart data is: " + executionContext); |
140 | |
141 | long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); |
142 | |
143 | LineReader reader = getReader(); |
144 | |
145 | Object record = ""; |
146 | while (reader.getPosition() < lineCount && record != null) { |
147 | record = readLine(); |
148 | } |
149 | } |
150 | |
151 | } |
152 | |
153 | /** |
154 | * Close and null out the reader. |
155 | * |
156 | * @throws ItemStreamException |
157 | */ |
158 | public void close(ExecutionContext executionContext) throws ItemStreamException { |
159 | try { |
160 | if (reader != null) { |
161 | log.debug("Closing flat file for reading: " + resource); |
162 | reader.close(null); |
163 | } |
164 | } |
165 | finally { |
166 | reader = null; |
167 | } |
168 | } |
169 | |
170 | /** |
171 | * Reads a line from input, tokenizes is it using the |
172 | * {@link #setLineTokenizer(LineTokenizer)} and maps to domain object using |
173 | * {@link #setFieldSetMapper(FieldSetMapper)}. |
174 | * |
175 | * @see org.springframework.batch.item.ItemReader#read() |
176 | */ |
177 | public Object read() throws Exception { |
178 | String line = readLine(); |
179 | |
180 | if (line != null) { |
181 | try { |
182 | FieldSet tokenizedLine = tokenizer.tokenize(line); |
183 | return fieldSetMapper.mapLine(tokenizedLine); |
184 | } |
185 | catch (RuntimeException ex) { |
186 | // add current line count to message and re-throw |
187 | int lineCount = getReader().getPosition(); |
188 | throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=" |
189 | + resource.getDescription() + ", input=[" + line + "]", ex, line, lineCount); |
190 | } |
191 | } |
192 | return null; |
193 | } |
194 | |
195 | /** |
196 | * This method returns the execution attributes for the reader. It returns |
197 | * the current Line Count which can be used to reinitialise the batch job in |
198 | * case of restart. |
199 | */ |
200 | public void update(ExecutionContext executionContext) { |
201 | if (reader == null) { |
202 | throw new ItemStreamException("ItemStream not open or already closed."); |
203 | } |
204 | |
205 | if (saveState) { |
206 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
207 | executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition()); |
208 | } |
209 | } |
210 | |
211 | /** |
212 | * Mark is supported as long as this {@link ItemStream} is used in a |
213 | * single-threaded environment. The state backing the mark is a single |
214 | * counter, keeping track of the current position, so multiple threads |
215 | * cannot be accommodated. |
216 | * |
217 | * @see org.springframework.batch.item.ItemReader#mark() |
218 | */ |
219 | public void mark() { |
220 | getReader().mark(); |
221 | } |
222 | |
223 | /* |
224 | * (non-Javadoc) |
225 | * |
226 | * @see org.springframework.batch.item.ItemStream#reset(org.springframework.batch.item.ExecutionContext) |
227 | */ |
228 | public void reset() { |
229 | getReader().reset(); |
230 | } |
231 | |
232 | /** |
233 | * @return next line to be tokenized and mapped. |
234 | */ |
235 | private String readLine() { |
236 | try { |
237 | return (String) getReader().read(); |
238 | } |
239 | catch (ItemStreamException e) { |
240 | throw e; |
241 | } |
242 | catch (ItemReaderException e) { |
243 | throw e; |
244 | } |
245 | catch (Exception e) { |
246 | throw new IllegalStateException(); |
247 | } |
248 | } |
249 | |
250 | /** |
251 | * @return line reader used to read input file |
252 | */ |
253 | protected LineReader getReader() { |
254 | if (reader == null) { |
255 | throw new ReaderNotOpenException("Reader must be open before it can be read."); |
256 | // reader is now not null, or else an exception is thrown |
257 | } |
258 | return reader; |
259 | } |
260 | |
261 | /** |
262 | * Setter for resource property. The location of an input stream that can be |
263 | * read. |
264 | * |
265 | * @param resource |
266 | */ |
267 | public void setResource(Resource resource) { |
268 | this.resource = resource; |
269 | } |
270 | |
271 | /** |
272 | * Public setter for the recordSeparatorPolicy. Used to determine where the |
273 | * line endings are and do things like continue over a line ending if inside |
274 | * a quoted string. |
275 | * |
276 | * @param recordSeparatorPolicy the recordSeparatorPolicy to set |
277 | */ |
278 | public void setRecordSeparatorPolicy(RecordSeparatorPolicy recordSeparatorPolicy) { |
279 | this.recordSeparatorPolicy = recordSeparatorPolicy; |
280 | } |
281 | |
282 | /** |
283 | * Setter for comment prefixes. Can be used to ignore header lines as well |
284 | * by using e.g. the first couple of column names as a prefix. |
285 | * |
286 | * @param comments an array of comment line prefixes. |
287 | */ |
288 | public void setComments(String[] comments) { |
289 | this.comments = new String[comments.length]; |
290 | System.arraycopy(comments, 0, this.comments, 0, comments.length); |
291 | } |
292 | |
293 | /** |
294 | * Indicates whether first line is a header. If the tokenizer is an |
295 | * {@link AbstractLineTokenizer} and the column names haven't been set |
296 | * already then the header will be used to setup column names. Default is |
297 | * <code>false</code>. |
298 | */ |
299 | public void setFirstLineIsHeader(boolean firstLineIsHeader) { |
300 | this.firstLineIsHeader = firstLineIsHeader; |
301 | } |
302 | |
303 | /** |
304 | * @param lineTokenizer tokenizes each line from file into {@link FieldSet}. |
305 | */ |
306 | public void setLineTokenizer(LineTokenizer lineTokenizer) { |
307 | this.tokenizer = lineTokenizer; |
308 | } |
309 | |
310 | /** |
311 | * Set the FieldSetMapper to be used for each line. |
312 | * |
313 | * @param fieldSetMapper |
314 | */ |
315 | public void setFieldSetMapper(FieldSetMapper fieldSetMapper) { |
316 | this.fieldSetMapper = fieldSetMapper; |
317 | } |
318 | |
319 | /** |
320 | * Public setter for the number of lines to skip at the start of a file. Can |
321 | * be used if the file contains a header without useful (column name) |
322 | * information, and without a comment delimiter at the beginning of the |
323 | * lines. |
324 | * |
325 | * @param linesToSkip the number of lines to skip |
326 | */ |
327 | public void setLinesToSkip(int linesToSkip) { |
328 | this.linesToSkip = linesToSkip; |
329 | } |
330 | |
331 | /** |
332 | * Setter for the encoding for this input source. Default value is |
333 | * {@link #DEFAULT_CHARSET}. |
334 | * |
335 | * @param encoding a properties object which possibly contains the encoding |
336 | * for this input file; |
337 | */ |
338 | public void setEncoding(String encoding) { |
339 | this.encoding = encoding; |
340 | } |
341 | |
342 | public void afterPropertiesSet() throws Exception { |
343 | Assert.notNull(resource, "Input resource must not be null"); |
344 | Assert.notNull(fieldSetMapper, "FieldSetMapper must not be null."); |
345 | } |
346 | |
347 | /** |
348 | * Set the boolean indicating whether or not state should be saved in the |
349 | * provided {@link ExecutionContext} during the {@link ItemStream} call to |
350 | * update. Setting this to false means that it will always start at the |
351 | * beginning. |
352 | * |
353 | * @param saveState |
354 | */ |
355 | public void setSaveState(boolean saveState) { |
356 | this.saveState = saveState; |
357 | } |
358 | |
359 | } |