1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.io.BufferedWriter; |
20 | import java.io.File; |
21 | import java.io.FileOutputStream; |
22 | import java.io.IOException; |
23 | import java.nio.channels.Channels; |
24 | import java.nio.channels.FileChannel; |
25 | import java.nio.charset.UnsupportedCharsetException; |
26 | import java.util.ArrayList; |
27 | import java.util.Arrays; |
28 | import java.util.Iterator; |
29 | import java.util.List; |
30 | |
31 | import org.springframework.batch.item.ClearFailedException; |
32 | import org.springframework.batch.item.ExecutionContext; |
33 | import org.springframework.batch.item.FlushFailedException; |
34 | import org.springframework.batch.item.ItemStream; |
35 | import org.springframework.batch.item.ItemStreamException; |
36 | import org.springframework.batch.item.ItemWriter; |
37 | import org.springframework.batch.item.MarkFailedException; |
38 | import org.springframework.batch.item.ResetFailedException; |
39 | import org.springframework.batch.item.WriterNotOpenException; |
40 | import org.springframework.batch.item.file.mapping.FieldSet; |
41 | import org.springframework.batch.item.file.mapping.FieldSetCreator; |
42 | import org.springframework.batch.item.file.transform.DelimitedLineAggregator; |
43 | import org.springframework.batch.item.file.transform.LineAggregator; |
44 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
45 | import org.springframework.batch.item.util.FileUtils; |
46 | import org.springframework.beans.factory.InitializingBean; |
47 | import org.springframework.core.io.Resource; |
48 | import org.springframework.util.Assert; |
49 | import org.springframework.util.ClassUtils; |
50 | |
51 | /** |
52 | * This class is an item writer that writes data to a file or stream. The writer |
53 | * also provides restart. The location of the output file is defined by a |
54 | * {@link Resource} and must represent a writable file.<br/> |
55 | * |
56 | * Uses buffered writer to improve performance.<br/> |
57 | * |
58 | * <p> |
59 | * Output lines are buffered until {@link #flush()} is called and only then the |
60 | * actual writing to file occurs. |
61 | * </p> |
62 | * |
63 | * The implementation is *not* thread-safe. |
64 | * |
65 | * @author Waseem Malik |
66 | * @author Tomas Slanina |
67 | * @author Robert Kasanicky |
68 | * @author Dave Syer |
69 | */ |
70 | public class FlatFileItemWriter extends ExecutionContextUserSupport implements ItemWriter, ItemStream, InitializingBean { |
71 | |
72 | private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator"); |
73 | |
74 | private static final String WRITTEN_STATISTICS_NAME = "written"; |
75 | |
76 | private static final String RESTART_COUNT_STATISTICS_NAME = "restart.count"; |
77 | |
78 | private static final String RESTART_DATA_NAME = "current.count"; |
79 | |
80 | private Resource resource; |
81 | |
82 | private OutputState state = null; |
83 | |
84 | private LineAggregator lineAggregator = new DelimitedLineAggregator(); |
85 | |
86 | private FieldSetCreator fieldSetCreator; |
87 | |
88 | private boolean saveState = true; |
89 | |
90 | private boolean shouldDeleteIfExists = true; |
91 | |
92 | private String encoding = OutputState.DEFAULT_CHARSET; |
93 | |
94 | private int bufferSize = OutputState.DEFAULT_BUFFER_SIZE; |
95 | |
96 | private List lineBuffer = new ArrayList(); |
97 | |
98 | private List headerLines = new ArrayList(); |
99 | |
100 | private String lineSeparator = DEFAULT_LINE_SEPARATOR; |
101 | |
102 | public FlatFileItemWriter() { |
103 | setName(ClassUtils.getShortName(FlatFileItemWriter.class)); |
104 | } |
105 | |
106 | /** |
107 | * Assert that mandatory properties (resource) are set. |
108 | * |
109 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
110 | */ |
111 | public void afterPropertiesSet() throws Exception { |
112 | Assert.notNull(fieldSetCreator, "A FieldSetCreator must be provided."); |
113 | } |
114 | |
115 | /** |
116 | * Public setter for the line separator. Defaults to the System property |
117 | * line.separator. |
118 | * @param lineSeparator the line separator to set |
119 | */ |
120 | public void setLineSeparator(String lineSeparator) { |
121 | this.lineSeparator = lineSeparator; |
122 | } |
123 | |
124 | /** |
125 | * Public setter for the {@link LineAggregator}. This will be used to |
126 | * translate a {@link FieldSet} into a line for output. |
127 | * |
128 | * @param lineAggregator the {@link LineAggregator} to set |
129 | */ |
130 | public void setLineAggregator(LineAggregator lineAggregator) { |
131 | this.lineAggregator = lineAggregator; |
132 | } |
133 | |
134 | /** |
135 | * Public setter for the {@link FieldSetCreator}. This will be used to |
136 | * transform the item into a {@link FieldSet} before it is aggregated by the |
137 | * {@link LineAggregator}. |
138 | * |
139 | * @param fieldSetCreator the {@link FieldSetCreator} to set |
140 | */ |
141 | public void setFieldSetCreator(FieldSetCreator fieldSetCreator) { |
142 | this.fieldSetCreator = fieldSetCreator; |
143 | } |
144 | |
145 | /** |
146 | * Setter for resource. Represents a file that can be written. |
147 | * |
148 | * @param resource |
149 | */ |
150 | public void setResource(Resource resource) { |
151 | this.resource = resource; |
152 | } |
153 | |
154 | /** |
155 | * Sets encoding for output template. |
156 | */ |
157 | public void setEncoding(String newEncoding) { |
158 | this.encoding = newEncoding; |
159 | } |
160 | |
161 | /** |
162 | * Sets buffer size for output template |
163 | */ |
164 | public void setBufferSize(int newSize) { |
165 | this.bufferSize = newSize; |
166 | } |
167 | |
168 | /** |
169 | * @param shouldDeleteIfExists the shouldDeleteIfExists to set |
170 | */ |
171 | public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) { |
172 | this.shouldDeleteIfExists = shouldDeleteIfExists; |
173 | } |
174 | |
175 | /** |
176 | * Set the flag indicating whether or not state should be saved in the |
177 | * provided {@link ExecutionContext} during the {@link ItemStream} call to |
178 | * update. Setting this to false means that it will always start at the |
179 | * beginning on a restart. |
180 | * |
181 | * @param saveState |
182 | */ |
183 | public void setSaveState(boolean saveState) { |
184 | this.saveState = saveState; |
185 | } |
186 | |
187 | /** |
188 | * Public setter for the header lines. These will be output at the head of |
189 | * the file before any calls to {@link #write(Object)} (and not on restart |
190 | * unless the restart is after a failure before the first flush). |
191 | * |
192 | * @param headerLines the header lines to set |
193 | */ |
194 | public void setHeaderLines(String[] headerLines) { |
195 | this.headerLines = Arrays.asList(headerLines); |
196 | } |
197 | |
198 | /** |
199 | * Writes out a string followed by a "new line", where the format of the new |
200 | * line separator is determined by the underlying operating system. If the |
201 | * input is not a String and a converter is available the converter will be |
202 | * applied and then this method recursively called with the result. If the |
203 | * input is an array or collection each value will be written to a separate |
204 | * line (recursively calling this method for each value). If no converter is |
205 | * supplied the input object's toString method will be used.<br/> |
206 | * |
207 | * @param data Object (a String or Object that can be converted) to be |
208 | * written to output stream |
209 | * @throws Exception if the transformer or file output fail, WriterNotOpenException |
210 | * if the writer has not been initialized. |
211 | */ |
212 | public void write(Object data) throws Exception { |
213 | if(getOutputState().isInitialized()){ |
214 | FieldSet fieldSet = fieldSetCreator.mapItem(data); |
215 | lineBuffer.add(lineAggregator.aggregate(fieldSet) + lineSeparator); |
216 | } |
217 | else{ |
218 | throw new WriterNotOpenException("Writer must be open before it can be written to"); |
219 | } |
220 | } |
221 | |
222 | /** |
223 | * @see ItemStream#close(ExecutionContext) |
224 | */ |
225 | public void close(ExecutionContext executionContext) { |
226 | if (state != null) { |
227 | getOutputState().close(); |
228 | state = null; |
229 | } |
230 | } |
231 | |
232 | /** |
233 | * Initialize the reader. This method may be called multiple times before close is |
234 | * called. |
235 | * |
236 | * @see ItemStream#open(ExecutionContext) |
237 | */ |
238 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
239 | |
240 | Assert.notNull(resource, "The resource must be set"); |
241 | |
242 | if(!getOutputState().isInitialized()){ |
243 | doOpen(executionContext); |
244 | } |
245 | } |
246 | |
247 | private void doOpen(ExecutionContext executionContext){ |
248 | OutputState outputState = getOutputState(); |
249 | if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) { |
250 | outputState.restoreFrom(executionContext); |
251 | } |
252 | try { |
253 | outputState.initializeBufferedWriter(); |
254 | } |
255 | catch (IOException ioe) { |
256 | throw new ItemStreamException("Failed to initialize writer", ioe); |
257 | } |
258 | if (outputState.lastMarkedByteOffsetPosition == 0) { |
259 | for (Iterator iterator = headerLines.iterator(); iterator.hasNext();) { |
260 | String line = (String) iterator.next(); |
261 | lineBuffer.add(line + lineSeparator); |
262 | } |
263 | } |
264 | } |
265 | |
266 | /** |
267 | * @see ItemStream#update(ExecutionContext) |
268 | */ |
269 | public void update(ExecutionContext executionContext) { |
270 | if (state == null) { |
271 | throw new ItemStreamException("ItemStream not open or already closed."); |
272 | } |
273 | |
274 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
275 | |
276 | if (saveState) { |
277 | |
278 | try { |
279 | executionContext.putLong(getKey(RESTART_DATA_NAME), state.position()); |
280 | } |
281 | catch (IOException e) { |
282 | throw new ItemStreamException("ItemStream does not return current position properly", e); |
283 | } |
284 | |
285 | executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten); |
286 | executionContext.putLong(getKey(RESTART_COUNT_STATISTICS_NAME), state.restartCount); |
287 | } |
288 | } |
289 | |
290 | public void flush() throws FlushFailedException { |
291 | OutputState state = getOutputState(); |
292 | for (Iterator iterator = lineBuffer.listIterator(); iterator.hasNext();) { |
293 | String line = (String) iterator.next(); |
294 | try { |
295 | state.write(line); |
296 | } |
297 | catch (IOException e) { |
298 | throw new FlushFailedException("Failed to write line to output file: " + line, e); |
299 | } |
300 | } |
301 | lineBuffer.clear(); |
302 | state.mark(); |
303 | } |
304 | |
305 | // Returns object representing state. |
306 | private OutputState getOutputState() { |
307 | if (state == null) { |
308 | try { |
309 | File file = resource.getFile(); |
310 | Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]"); |
311 | } |
312 | catch (IOException e) { |
313 | throw new ItemStreamException("Could not test resource for writable status.", e); |
314 | } |
315 | state = new OutputState(); |
316 | state.setDeleteIfExists(shouldDeleteIfExists); |
317 | state.setBufferSize(bufferSize); |
318 | state.setEncoding(encoding); |
319 | } |
320 | return (OutputState) state; |
321 | } |
322 | |
323 | /** |
324 | * Encapsulates the runtime state of the writer. All state changing |
325 | * operations on the writer go through this class. |
326 | */ |
327 | private class OutputState { |
328 | // default encoding for writing to output files - set to UTF-8. |
329 | private static final String DEFAULT_CHARSET = "UTF-8"; |
330 | |
331 | private static final int DEFAULT_BUFFER_SIZE = 2048; |
332 | |
333 | // The bufferedWriter over the file channel that is actually written |
334 | BufferedWriter outputBufferedWriter; |
335 | |
336 | FileChannel fileChannel; |
337 | |
338 | // this represents the charset encoding (if any is needed) for the |
339 | // output file |
340 | String encoding = DEFAULT_CHARSET; |
341 | |
342 | // Optional write buffer size |
343 | int bufferSize = DEFAULT_BUFFER_SIZE; |
344 | |
345 | boolean restarted = false; |
346 | |
347 | long lastMarkedByteOffsetPosition = 0; |
348 | |
349 | long linesWritten = 0; |
350 | |
351 | long restartCount = 0; |
352 | |
353 | boolean shouldDeleteIfExists = true; |
354 | |
355 | boolean initialized = false; |
356 | |
357 | /** |
358 | * Return the byte offset position of the cursor in the output file as a |
359 | * long integer. |
360 | */ |
361 | public long position() throws IOException { |
362 | long pos = 0; |
363 | |
364 | if (fileChannel == null) { |
365 | return 0; |
366 | } |
367 | |
368 | outputBufferedWriter.flush(); |
369 | pos = fileChannel.position(); |
370 | |
371 | return pos; |
372 | |
373 | } |
374 | |
375 | /** |
376 | * @param executionContext |
377 | */ |
378 | public void restoreFrom(ExecutionContext executionContext) { |
379 | lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME)); |
380 | restarted = true; |
381 | } |
382 | |
383 | /** |
384 | * @param shouldDeleteIfExists |
385 | */ |
386 | public void setDeleteIfExists(boolean shouldDeleteIfExists) { |
387 | this.shouldDeleteIfExists = shouldDeleteIfExists; |
388 | } |
389 | |
390 | /** |
391 | * @param bufferSize |
392 | */ |
393 | public void setBufferSize(int bufferSize) { |
394 | this.bufferSize = bufferSize; |
395 | } |
396 | |
397 | /** |
398 | * @param encoding |
399 | */ |
400 | public void setEncoding(String encoding) { |
401 | this.encoding = encoding; |
402 | } |
403 | |
404 | /** |
405 | * Close the open resource and reset counters. |
406 | */ |
407 | public void close() { |
408 | initialized = false; |
409 | restarted = false; |
410 | try { |
411 | if (outputBufferedWriter == null) { |
412 | return; |
413 | } |
414 | outputBufferedWriter.close(); |
415 | fileChannel.close(); |
416 | } |
417 | catch (IOException ioe) { |
418 | throw new ItemStreamException("Unable to close the the ItemWriter", ioe); |
419 | } |
420 | } |
421 | |
422 | /** |
423 | * @param line |
424 | * @throws IOException |
425 | */ |
426 | public void write(String line) throws IOException { |
427 | if (!initialized) { |
428 | initializeBufferedWriter(); |
429 | } |
430 | |
431 | outputBufferedWriter.write(line); |
432 | outputBufferedWriter.flush(); |
433 | linesWritten++; |
434 | } |
435 | |
436 | /** |
437 | * Truncate the output at the last known good point. |
438 | * |
439 | * @throws IOException |
440 | */ |
441 | public void truncate() throws IOException { |
442 | fileChannel.truncate(lastMarkedByteOffsetPosition); |
443 | fileChannel.position(lastMarkedByteOffsetPosition); |
444 | } |
445 | |
446 | /** |
447 | * Mark the current position. |
448 | */ |
449 | public void mark() { |
450 | try { |
451 | lastMarkedByteOffsetPosition = this.position(); |
452 | } |
453 | catch (IOException e) { |
454 | throw new MarkFailedException("Unable to get position for mark", e); |
455 | } |
456 | } |
457 | |
458 | /** |
459 | * Creates the buffered writer for the output file channel based on |
460 | * configuration information. |
461 | * @throws IOException |
462 | */ |
463 | private void initializeBufferedWriter() throws IOException { |
464 | |
465 | |
466 | File file = resource.getFile(); |
467 | |
468 | FileUtils.setUpOutputFile(file, restarted, shouldDeleteIfExists); |
469 | |
470 | fileChannel = (new FileOutputStream(file.getAbsolutePath(), true)).getChannel(); |
471 | |
472 | outputBufferedWriter = getBufferedWriter(fileChannel, encoding, bufferSize); |
473 | |
474 | // in case of restarting reset position to last committed point |
475 | if (restarted) { |
476 | this.reset(); |
477 | } |
478 | |
479 | initialized = true; |
480 | linesWritten = 0; |
481 | } |
482 | |
483 | public boolean isInitialized() { |
484 | return initialized; |
485 | } |
486 | |
487 | /** |
488 | * Returns the buffered writer opened to the beginning of the file |
489 | * specified by the absolute path name contained in absoluteFileName. |
490 | */ |
491 | private BufferedWriter getBufferedWriter(FileChannel fileChannel, String encoding, int bufferSize) { |
492 | try { |
493 | |
494 | BufferedWriter outputBufferedWriter = null; |
495 | |
496 | // If a buffer was requested, allocate. |
497 | if (bufferSize > 0) { |
498 | outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding), bufferSize); |
499 | } |
500 | else { |
501 | outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding)); |
502 | } |
503 | |
504 | return outputBufferedWriter; |
505 | } |
506 | catch (UnsupportedCharsetException ucse) { |
507 | throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse); |
508 | } |
509 | } |
510 | |
511 | /** |
512 | * Resets the file writer's current position to the point stored in the |
513 | * last marked byte offset position variable. It first checks to make |
514 | * sure the current size of the file is not less than the byte position |
515 | * to be moved to (if it is, throws an environment exception), then it |
516 | * truncates the file to that reset position, and set the cursor to |
517 | * start writing at that point. |
518 | */ |
519 | public void reset() throws ResetFailedException { |
520 | checkFileSize(); |
521 | try { |
522 | getOutputState().truncate(); |
523 | } |
524 | catch (IOException e) { |
525 | throw new ResetFailedException("Unable to truncate file", e); |
526 | } |
527 | } |
528 | |
529 | /** |
530 | * Checks (on setState) to make sure that the current output file's size |
531 | * is not smaller than the last saved commit point. If it is, then the |
532 | * file has been damaged in some way and whole task must be started over |
533 | * again from the beginning. |
534 | */ |
535 | private void checkFileSize() { |
536 | long size = -1; |
537 | |
538 | try { |
539 | outputBufferedWriter.flush(); |
540 | size = fileChannel.size(); |
541 | } |
542 | catch (IOException e) { |
543 | throw new ResetFailedException("An Error occured while checking file size", e); |
544 | } |
545 | |
546 | if (size < lastMarkedByteOffsetPosition) { |
547 | throw new ResetFailedException("Current file size is smaller than size at last commit"); |
548 | } |
549 | } |
550 | |
551 | } |
552 | |
553 | public void clear() throws ClearFailedException { |
554 | lineBuffer.clear(); |
555 | } |
556 | |
557 | } |