1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.io.BufferedWriter; |
20 | import java.io.File; |
21 | import java.io.FileNotFoundException; |
22 | import java.io.FileOutputStream; |
23 | import java.io.IOException; |
24 | import java.nio.channels.Channels; |
25 | import java.nio.channels.FileChannel; |
26 | import java.nio.charset.UnsupportedCharsetException; |
27 | import java.util.ArrayList; |
28 | import java.util.Iterator; |
29 | import java.util.List; |
30 | |
31 | import org.springframework.batch.item.ClearFailedException; |
32 | import org.springframework.batch.item.ExecutionContext; |
33 | import org.springframework.batch.item.ExecutionContextUserSupport; |
34 | import org.springframework.batch.item.FlushFailedException; |
35 | import org.springframework.batch.item.ItemStream; |
36 | import org.springframework.batch.item.ItemStreamException; |
37 | import org.springframework.batch.item.ItemWriter; |
38 | import org.springframework.batch.item.MarkFailedException; |
39 | import org.springframework.batch.item.ResetFailedException; |
40 | import org.springframework.batch.item.file.mapping.FieldSet; |
41 | import org.springframework.batch.item.file.mapping.FieldSetCreator; |
42 | import org.springframework.batch.item.file.transform.DelimitedLineAggregator; |
43 | import org.springframework.batch.item.file.transform.LineAggregator; |
44 | import org.springframework.beans.factory.InitializingBean; |
45 | import org.springframework.core.io.Resource; |
46 | import org.springframework.dao.DataAccessResourceFailureException; |
47 | import org.springframework.util.Assert; |
48 | import org.springframework.util.ClassUtils; |
49 | |
50 | /** |
51 | * This class is an output target that writes data to a file or stream. The |
52 | * writer also provides restart, statistics and transaction features by |
53 | * implementing corresponding interfaces where possible (with a file). The |
54 | * location of the file is defined by a {@link Resource} and must represent a |
55 | * writable file.<br/> |
56 | * |
57 | * Uses buffered writer to improve performance.<br/> |
58 | * |
59 | * <p> |
60 | * This class will be updated in the future to use a buffering approach to |
61 | * handling transactions, rather than outputting directly to the file and |
62 | * truncating on rollback |
63 | * </p> |
64 | * |
65 | * @author Waseem Malik |
66 | * @author Tomas Slanina |
67 | * @author Robert Kasanicky |
68 | * @author Dave Syer |
69 | */ |
70 | public class FlatFileItemWriter extends ExecutionContextUserSupport implements ItemWriter, ItemStream, InitializingBean { |
71 | |
72 | private static final String LINE_SEPARATOR = System.getProperty("line.separator"); |
73 | |
74 | private static final String WRITTEN_STATISTICS_NAME = "written"; |
75 | |
76 | private static final String RESTART_COUNT_STATISTICS_NAME = "restart.count"; |
77 | |
78 | private static final String RESTART_DATA_NAME = "current.count"; |
79 | |
80 | private Resource resource; |
81 | |
82 | private OutputState state = null; |
83 | |
84 | private LineAggregator lineAggregator = new DelimitedLineAggregator(); |
85 | |
86 | private FieldSetCreator fieldSetCreator; |
87 | |
88 | private boolean saveState = false; |
89 | |
90 | private boolean shouldDeleteIfExists = true; |
91 | |
92 | private String encoding = OutputState.DEFAULT_CHARSET; |
93 | |
94 | private int bufferSize = OutputState.DEFAULT_BUFFER_SIZE; |
95 | |
96 | private List lineBuffer = new ArrayList(); |
97 | |
98 | public FlatFileItemWriter() { |
99 | setName(ClassUtils.getShortName(FlatFileItemWriter.class)); |
100 | } |
101 | |
102 | /** |
103 | * Assert that mandatory properties (resource) are set. |
104 | * |
105 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
106 | */ |
107 | public void afterPropertiesSet() throws Exception { |
108 | Assert.notNull(resource, "The resource must be set"); |
109 | Assert.notNull(fieldSetCreator, "A FieldSetCreator must be provided."); |
110 | } |
111 | |
112 | /** |
113 | * Public setter for the {@link LineAggregator}. This will be used to |
114 | * translate a {@link FieldSet} into a line for output. |
115 | * |
116 | * @param lineAggregator the {@link LineAggregator} to set |
117 | */ |
118 | public void setLineAggregator(LineAggregator lineAggregator) { |
119 | this.lineAggregator = lineAggregator; |
120 | } |
121 | |
122 | /** |
123 | * Public setter for the {@link FieldSetCreator}. This will be used to |
124 | * transform the item into a {@link FieldSet} before it is aggregated by the |
125 | * {@link LineAggregator}. |
126 | * |
127 | * @param fieldSetCreator the {@link FieldSetCreator} to set |
128 | */ |
129 | public void setFieldSetCreator(FieldSetCreator fieldSetCreator) { |
130 | this.fieldSetCreator = fieldSetCreator; |
131 | } |
132 | |
133 | /** |
134 | * Setter for resource. Represents a file that can be written. |
135 | * |
136 | * @param resource |
137 | */ |
138 | public void setResource(Resource resource) { |
139 | this.resource = resource; |
140 | } |
141 | |
142 | /** |
143 | * Writes out a string followed by a "new line", where the format of the new |
144 | * line separator is determined by the underlying operating system. If the |
145 | * input is not a String and a converter is available the converter will be |
146 | * applied and then this method recursively called with the result. If the |
147 | * input is an array or collection each value will be written to a separate |
148 | * line (recursively calling this method for each value). If no converter is |
149 | * supplied the input object's toString method will be used.<br/> |
150 | * |
151 | * @param data Object (a String or Object that can be converted) to be |
152 | * written to output stream |
153 | * @throws Exception if the transformer or file output fail |
154 | */ |
155 | public void write(Object data) throws Exception { |
156 | FieldSet fieldSet = fieldSetCreator.mapItem(data); |
157 | lineBuffer.add(lineAggregator.aggregate(fieldSet) + LINE_SEPARATOR); |
158 | } |
159 | |
160 | /** |
161 | * @see ItemStream#close(ExecutionContext) |
162 | */ |
163 | public void close(ExecutionContext executionContext) { |
164 | if (state != null) { |
165 | getOutputState().close(); |
166 | state = null; |
167 | } |
168 | } |
169 | |
170 | /** |
171 | * Sets encoding for output template. |
172 | */ |
173 | public void setEncoding(String newEncoding) { |
174 | this.encoding = newEncoding; |
175 | } |
176 | |
177 | /** |
178 | * Sets buffer size for output template |
179 | */ |
180 | public void setBufferSize(int newSize) { |
181 | this.bufferSize = newSize; |
182 | } |
183 | |
184 | /** |
185 | * @param shouldDeleteIfExists the shouldDeleteIfExists to set |
186 | */ |
187 | public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) { |
188 | this.shouldDeleteIfExists = shouldDeleteIfExists; |
189 | } |
190 | |
191 | /** |
192 | * Initialize the Output Template. |
193 | * |
194 | * @see ItemStream#open(ExecutionContext) |
195 | */ |
196 | public void open(ExecutionContext executionContext) { |
197 | OutputState outputState = getOutputState(); |
198 | if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) { |
199 | outputState.restoreFrom(executionContext); |
200 | } |
201 | outputState.initializeBufferedWriter(); |
202 | } |
203 | |
204 | /** |
205 | * @see ItemStream#update(ExecutionContext) |
206 | */ |
207 | public void update(ExecutionContext executionContext) { |
208 | if (state == null) { |
209 | throw new ItemStreamException("ItemStream not open or already closed."); |
210 | } |
211 | |
212 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
213 | |
214 | if (saveState) { |
215 | |
216 | try { |
217 | executionContext.putLong(getKey(RESTART_DATA_NAME), state.position()); |
218 | } |
219 | catch (IOException e) { |
220 | throw new ItemStreamException("ItemStream does not return current position properly", e); |
221 | } |
222 | |
223 | executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten); |
224 | executionContext.putLong(getKey(RESTART_COUNT_STATISTICS_NAME), state.restartCount); |
225 | } |
226 | } |
227 | |
228 | // Returns object representing state. |
229 | private OutputState getOutputState() { |
230 | if (state == null) { |
231 | try { |
232 | File file = resource.getFile(); |
233 | Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]"); |
234 | } |
235 | catch (IOException e) { |
236 | throw new ItemStreamException("Could not test resource for writable status.", e); |
237 | } |
238 | state = new OutputState(); |
239 | state.setDeleteIfExists(shouldDeleteIfExists); |
240 | state.setBufferSize(bufferSize); |
241 | state.setEncoding(encoding); |
242 | } |
243 | return (OutputState) state; |
244 | } |
245 | |
246 | /** |
247 | * Encapsulates the runtime state of the writer. All state changing |
248 | * operations on the writer go through this class. |
249 | */ |
250 | private class OutputState { |
251 | // default encoding for writing to output files - set to UTF-8. |
252 | private static final String DEFAULT_CHARSET = "UTF-8"; |
253 | |
254 | private static final int DEFAULT_BUFFER_SIZE = 2048; |
255 | |
256 | // The bufferedWriter over the file channel that is actually written |
257 | BufferedWriter outputBufferedWriter; |
258 | |
259 | FileChannel fileChannel; |
260 | |
261 | // this represents the charset encoding (if any is needed) for the |
262 | // output file |
263 | String encoding = DEFAULT_CHARSET; |
264 | |
265 | // Optional write buffer size |
266 | int bufferSize = DEFAULT_BUFFER_SIZE; |
267 | |
268 | boolean restarted = false; |
269 | |
270 | boolean initialized = false; |
271 | |
272 | long lastMarkedByteOffsetPosition = 0; |
273 | |
274 | long linesWritten = 0; |
275 | |
276 | long restartCount = 0; |
277 | |
278 | boolean shouldDeleteIfExists = true; |
279 | |
280 | /** |
281 | * Return the byte offset position of the cursor in the output file as a |
282 | * long integer. |
283 | */ |
284 | public long position() throws IOException { |
285 | long pos = 0; |
286 | |
287 | if (fileChannel == null) { |
288 | return 0; |
289 | } |
290 | |
291 | outputBufferedWriter.flush(); |
292 | pos = fileChannel.position(); |
293 | |
294 | return pos; |
295 | |
296 | } |
297 | |
298 | /** |
299 | * @param executionContext |
300 | */ |
301 | public void restoreFrom(ExecutionContext executionContext) { |
302 | lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME)); |
303 | restarted = true; |
304 | } |
305 | |
306 | /** |
307 | * @param shouldDeleteIfExists |
308 | */ |
309 | public void setDeleteIfExists(boolean shouldDeleteIfExists) { |
310 | this.shouldDeleteIfExists = shouldDeleteIfExists; |
311 | } |
312 | |
313 | /** |
314 | * @param bufferSize |
315 | */ |
316 | public void setBufferSize(int bufferSize) { |
317 | this.bufferSize = bufferSize; |
318 | } |
319 | |
320 | /** |
321 | * @param encoding |
322 | */ |
323 | public void setEncoding(String encoding) { |
324 | this.encoding = encoding; |
325 | } |
326 | |
327 | /** |
328 | * Close the open resource and reset counters. |
329 | */ |
330 | public void close() { |
331 | initialized = false; |
332 | restarted = false; |
333 | try { |
334 | if (outputBufferedWriter == null) { |
335 | return; |
336 | } |
337 | outputBufferedWriter.close(); |
338 | fileChannel.close(); |
339 | } |
340 | catch (IOException ioe) { |
341 | throw new ItemStreamException("Unable to close the the ItemWriter", ioe); |
342 | } |
343 | } |
344 | |
345 | /** |
346 | * @param line |
347 | * @throws IOException |
348 | */ |
349 | public void write(String line) throws IOException { |
350 | if (!initialized) { |
351 | initializeBufferedWriter(); |
352 | } |
353 | |
354 | outputBufferedWriter.write(line); |
355 | outputBufferedWriter.flush(); |
356 | linesWritten++; |
357 | } |
358 | |
359 | /** |
360 | * Truncate the output at the last known good point. |
361 | * |
362 | * @throws IOException |
363 | */ |
364 | public void truncate() throws IOException { |
365 | fileChannel.truncate(lastMarkedByteOffsetPosition); |
366 | fileChannel.position(lastMarkedByteOffsetPosition); |
367 | } |
368 | |
369 | /** |
370 | * Mark the current position. |
371 | */ |
372 | public void mark() { |
373 | try { |
374 | lastMarkedByteOffsetPosition = this.position(); |
375 | } |
376 | catch (IOException e) { |
377 | throw new MarkFailedException("Unable to get position for mark", e); |
378 | } |
379 | } |
380 | |
381 | /** |
382 | * Creates the buffered writer for the output file channel based on |
383 | * configuration information. |
384 | */ |
385 | private void initializeBufferedWriter() { |
386 | File file; |
387 | |
388 | try { |
389 | file = resource.getFile(); |
390 | |
391 | // If the output source was restarted, keep existing file. |
392 | // If the output source was not restarted, check following: |
393 | // - if the file should be deleted, delete it if it was exiting |
394 | // and create blank file, |
395 | // - if the file should not be deleted, if it already exists, |
396 | // throw an exception, |
397 | // - if the file was not existing, create new. |
398 | if (!restarted) { |
399 | if (file.exists()) { |
400 | if (shouldDeleteIfExists) { |
401 | file.delete(); |
402 | } |
403 | else { |
404 | throw new ItemStreamException("Resource already exists: " + resource); |
405 | } |
406 | } |
407 | String parent = file.getParent(); |
408 | if (parent != null) { |
409 | new File(parent).mkdirs(); |
410 | } |
411 | file.createNewFile(); |
412 | } |
413 | |
414 | } |
415 | catch (IOException ioe) { |
416 | throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", |
417 | ioe); |
418 | } |
419 | |
420 | try { |
421 | fileChannel = (new FileOutputStream(file.getAbsolutePath(), true)).getChannel(); |
422 | } |
423 | catch (FileNotFoundException fnfe) { |
424 | throw new ItemStreamException("Bad filename property parameter " + file, fnfe); |
425 | } |
426 | |
427 | outputBufferedWriter = getBufferedWriter(fileChannel, encoding, bufferSize); |
428 | |
429 | // in case of restarting reset position to last committed point |
430 | if (restarted) { |
431 | this.reset(); |
432 | } |
433 | |
434 | initialized = true; |
435 | linesWritten = 0; |
436 | } |
437 | |
438 | /** |
439 | * Returns the buffered writer opened to the beginning of the file |
440 | * specified by the absolute path name contained in absoluteFileName. |
441 | */ |
442 | private BufferedWriter getBufferedWriter(FileChannel fileChannel, String encoding, int bufferSize) { |
443 | try { |
444 | |
445 | BufferedWriter outputBufferedWriter = null; |
446 | |
447 | // If a buffer was requested, allocate. |
448 | if (bufferSize > 0) { |
449 | outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding), bufferSize); |
450 | } |
451 | else { |
452 | outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding)); |
453 | } |
454 | |
455 | return outputBufferedWriter; |
456 | } |
457 | catch (UnsupportedCharsetException ucse) { |
458 | throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse); |
459 | } |
460 | } |
461 | |
462 | /** |
463 | * Resets the file writer's current position to the point stored in the |
464 | * last marked byte offset position variable. It first checks to make |
465 | * sure the current size of the file is not less than the byte position |
466 | * to be moved to (if it is, throws an environment exception), then it |
467 | * truncates the file to that reset position, and set the cursor to |
468 | * start writing at that point. |
469 | */ |
470 | public void reset() throws ResetFailedException { |
471 | checkFileSize(); |
472 | try { |
473 | getOutputState().truncate(); |
474 | } |
475 | catch (IOException e) { |
476 | throw new ResetFailedException("Unable to truncate file", e); |
477 | } |
478 | } |
479 | |
480 | /** |
481 | * Checks (on setState) to make sure that the current output file's size |
482 | * is not smaller than the last saved commit point. If it is, then the |
483 | * file has been damaged in some way and whole task must be started over |
484 | * again from the beginning. |
485 | */ |
486 | private void checkFileSize() { |
487 | long size = -1; |
488 | |
489 | try { |
490 | outputBufferedWriter.flush(); |
491 | size = fileChannel.size(); |
492 | } |
493 | catch (IOException e) { |
494 | throw new ResetFailedException("An Error occured while checking file size", e); |
495 | } |
496 | |
497 | if (size < lastMarkedByteOffsetPosition) { |
498 | throw new ResetFailedException("Current file size is smaller than size at last commit"); |
499 | } |
500 | } |
501 | |
502 | } |
503 | |
504 | public void clear() throws ClearFailedException { |
505 | lineBuffer.clear(); |
506 | } |
507 | |
508 | public void flush() throws FlushFailedException { |
509 | OutputState state = getOutputState(); |
510 | for (Iterator iterator = lineBuffer.listIterator(); iterator.hasNext();) { |
511 | String line = (String) iterator.next(); |
512 | try { |
513 | state.write(line); |
514 | } |
515 | catch (IOException e) { |
516 | throw new FlushFailedException("Failed to write line to output file: " + line, e); |
517 | } |
518 | } |
519 | lineBuffer.clear(); |
520 | state.mark(); |
521 | } |
522 | |
523 | /** |
524 | * Set the boolean indicating whether or not state should be saved in the |
525 | * provided {@link ExecutionContext} during the {@link ItemStream} call to |
526 | * update. Setting this to false means that it will always start at the |
527 | * beginning. |
528 | * |
529 | * @param saveState |
530 | */ |
531 | public void setSaveState(boolean saveState) { |
532 | this.saveState = saveState; |
533 | } |
534 | |
535 | } |