1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.io.BufferedWriter; |
20 | import java.io.File; |
21 | import java.io.FileOutputStream; |
22 | import java.io.IOException; |
23 | import java.io.Writer; |
24 | import java.nio.channels.Channels; |
25 | import java.nio.channels.FileChannel; |
26 | import java.nio.charset.UnsupportedCharsetException; |
27 | import java.util.List; |
28 | |
29 | import org.apache.commons.logging.Log; |
30 | import org.apache.commons.logging.LogFactory; |
31 | import org.springframework.batch.item.ExecutionContext; |
32 | import org.springframework.batch.item.ItemStream; |
33 | import org.springframework.batch.item.ItemStreamException; |
34 | import org.springframework.batch.item.WriteFailedException; |
35 | import org.springframework.batch.item.WriterNotOpenException; |
36 | import org.springframework.batch.item.file.transform.LineAggregator; |
37 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
38 | import org.springframework.batch.item.util.FileUtils; |
39 | import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter; |
40 | import org.springframework.beans.factory.InitializingBean; |
41 | import org.springframework.core.io.Resource; |
42 | import org.springframework.util.Assert; |
43 | import org.springframework.util.ClassUtils; |
44 | |
45 | /** |
46 | * This class is an item writer that writes data to a file or stream. The writer |
47 | * also provides restart. The location of the output file is defined by a |
48 | * {@link Resource} and must represent a writable file.<br/> |
49 | * |
50 | * Uses buffered writer to improve performance.<br/> |
51 | * |
52 | * The implementation is *not* thread-safe. |
53 | * |
54 | * @author Waseem Malik |
55 | * @author Tomas Slanina |
56 | * @author Robert Kasanicky |
57 | * @author Dave Syer |
58 | */ |
59 | public class FlatFileItemWriter<T> extends ExecutionContextUserSupport implements ResourceAwareItemWriterItemStream<T>, |
60 | InitializingBean { |
61 | |
62 | private static final boolean DEFAULT_TRANSACTIONAL = true; |
63 | |
64 | protected static final Log logger = LogFactory.getLog(FlatFileItemWriter.class); |
65 | |
66 | private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator"); |
67 | |
68 | private static final String WRITTEN_STATISTICS_NAME = "written"; |
69 | |
70 | private static final String RESTART_DATA_NAME = "current.count"; |
71 | |
72 | private Resource resource; |
73 | |
74 | private OutputState state = null; |
75 | |
76 | private LineAggregator<T> lineAggregator; |
77 | |
78 | private boolean saveState = true; |
79 | |
80 | private boolean forceSync = false; |
81 | |
82 | private boolean shouldDeleteIfExists = true; |
83 | |
84 | private boolean shouldDeleteIfEmpty = false; |
85 | |
86 | private String encoding = OutputState.DEFAULT_CHARSET; |
87 | |
88 | private FlatFileHeaderCallback headerCallback; |
89 | |
90 | private FlatFileFooterCallback footerCallback; |
91 | |
92 | private String lineSeparator = DEFAULT_LINE_SEPARATOR; |
93 | |
94 | private boolean transactional = DEFAULT_TRANSACTIONAL; |
95 | |
96 | private boolean append = false; |
97 | |
98 | public FlatFileItemWriter() { |
99 | setName(ClassUtils.getShortName(FlatFileItemWriter.class)); |
100 | } |
101 | |
102 | /** |
103 | * Assert that mandatory properties (lineAggregator) are set. |
104 | * |
105 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
106 | */ |
107 | public void afterPropertiesSet() throws Exception { |
108 | Assert.notNull(lineAggregator, "A LineAggregator must be provided."); |
109 | if (append) { |
110 | shouldDeleteIfExists = false; |
111 | } |
112 | } |
113 | |
114 | /** |
115 | * Flag to indicate that changes should be force-synced to disk on flush. |
116 | * Defaults to false, which means that even with a local disk changes could |
117 | * be lost if the OS crashes in between a write and a cache flush. Setting |
118 | * to true may result in slower performance for usage patterns involving many |
119 | * frequent writes. |
120 | * |
121 | * @param forceSync the flag value to set |
122 | */ |
123 | public void setForceSync(boolean forceSync) { |
124 | this.forceSync = forceSync; |
125 | } |
126 | |
127 | /** |
128 | * Public setter for the line separator. Defaults to the System property |
129 | * line.separator. |
130 | * @param lineSeparator the line separator to set |
131 | */ |
132 | public void setLineSeparator(String lineSeparator) { |
133 | this.lineSeparator = lineSeparator; |
134 | } |
135 | |
136 | /** |
137 | * Public setter for the {@link LineAggregator}. This will be used to |
138 | * translate the item into a line for output. |
139 | * |
140 | * @param lineAggregator the {@link LineAggregator} to set |
141 | */ |
142 | public void setLineAggregator(LineAggregator<T> lineAggregator) { |
143 | this.lineAggregator = lineAggregator; |
144 | } |
145 | |
146 | /** |
147 | * Setter for resource. Represents a file that can be written. |
148 | * |
149 | * @param resource |
150 | */ |
151 | public void setResource(Resource resource) { |
152 | this.resource = resource; |
153 | } |
154 | |
155 | /** |
156 | * Sets encoding for output template. |
157 | */ |
158 | public void setEncoding(String newEncoding) { |
159 | this.encoding = newEncoding; |
160 | } |
161 | |
162 | /** |
163 | * Flag to indicate that the target file should be deleted if it already |
164 | * exists, otherwise it will be created. Defaults to true, so no appending |
165 | * except on restart. If set to false and {@link #setAppendAllowed(boolean) |
166 | * appendAllowed} is also false then there will be an exception when the |
167 | * stream is opened to prevent existing data being potentially corrupted. |
168 | * |
169 | * @param shouldDeleteIfExists the flag value to set |
170 | */ |
171 | public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) { |
172 | this.shouldDeleteIfExists = shouldDeleteIfExists; |
173 | } |
174 | |
175 | /** |
176 | * Flag to indicate that the target file should be appended if it already |
177 | * exists. If this flag is set then the flag |
178 | * {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is |
179 | * automatically set to false, so that flag should not be set explicitly. |
180 | * Defaults value is false. |
181 | * |
182 | * @param append the flag value to set |
183 | */ |
184 | public void setAppendAllowed(boolean append) { |
185 | this.append = append; |
186 | this.shouldDeleteIfExists = false; |
187 | } |
188 | |
189 | /** |
190 | * Flag to indicate that the target file should be deleted if no lines have |
191 | * been written (other than header and footer) on close. Defaults to false. |
192 | * |
193 | * @param shouldDeleteIfEmpty the flag value to set |
194 | */ |
195 | public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) { |
196 | this.shouldDeleteIfEmpty = shouldDeleteIfEmpty; |
197 | } |
198 | |
199 | /** |
200 | * Set the flag indicating whether or not state should be saved in the |
201 | * provided {@link ExecutionContext} during the {@link ItemStream} call to |
202 | * update. Setting this to false means that it will always start at the |
203 | * beginning on a restart. |
204 | * |
205 | * @param saveState |
206 | */ |
207 | public void setSaveState(boolean saveState) { |
208 | this.saveState = saveState; |
209 | } |
210 | |
211 | /** |
212 | * headerCallback will be called before writing the first item to file. |
213 | * Newline will be automatically appended after the header is written. |
214 | */ |
215 | public void setHeaderCallback(FlatFileHeaderCallback headerCallback) { |
216 | this.headerCallback = headerCallback; |
217 | } |
218 | |
219 | /** |
220 | * footerCallback will be called after writing the last item to file, but |
221 | * before the file is closed. |
222 | */ |
223 | public void setFooterCallback(FlatFileFooterCallback footerCallback) { |
224 | this.footerCallback = footerCallback; |
225 | } |
226 | |
227 | /** |
228 | * Flag to indicate that writing to the buffer should be delayed if a |
229 | * transaction is active. Defaults to true. |
230 | */ |
231 | public void setTransactional(boolean transactional) { |
232 | this.transactional = transactional; |
233 | } |
234 | |
235 | /** |
236 | * Writes out a string followed by a "new line", where the format of the new |
237 | * line separator is determined by the underlying operating system. If the |
238 | * input is not a String and a converter is available the converter will be |
239 | * applied and then this method recursively called with the result. If the |
240 | * input is an array or collection each value will be written to a separate |
241 | * line (recursively calling this method for each value). If no converter is |
242 | * supplied the input object's toString method will be used.<br/> |
243 | * |
244 | * @param items list of items to be written to output stream |
245 | * @throws Exception if the transformer or file output fail, |
246 | * WriterNotOpenException if the writer has not been initialized. |
247 | */ |
248 | public void write(List<? extends T> items) throws Exception { |
249 | |
250 | if (!getOutputState().isInitialized()) { |
251 | throw new WriterNotOpenException("Writer must be open before it can be written to"); |
252 | } |
253 | |
254 | if (logger.isDebugEnabled()) { |
255 | logger.debug("Writing to flat file with " + items.size() + " items."); |
256 | } |
257 | |
258 | OutputState state = getOutputState(); |
259 | |
260 | StringBuilder lines = new StringBuilder(); |
261 | int lineCount = 0; |
262 | for (T item : items) { |
263 | lines.append(lineAggregator.aggregate(item) + lineSeparator); |
264 | lineCount++; |
265 | } |
266 | try { |
267 | state.write(lines.toString()); |
268 | } |
269 | catch (IOException e) { |
270 | throw new WriteFailedException("Could not write data. The file may be corrupt.", e); |
271 | } |
272 | state.linesWritten += lineCount; |
273 | } |
274 | |
275 | /** |
276 | * @see ItemStream#close() |
277 | */ |
278 | public void close() { |
279 | if (state != null) { |
280 | try { |
281 | if (footerCallback != null && state.outputBufferedWriter != null) { |
282 | footerCallback.writeFooter(state.outputBufferedWriter); |
283 | state.outputBufferedWriter.flush(); |
284 | } |
285 | } |
286 | catch (IOException e) { |
287 | throw new ItemStreamException("Failed to write footer before closing", e); |
288 | } |
289 | finally { |
290 | state.close(); |
291 | if (state.linesWritten == 0 && shouldDeleteIfEmpty) { |
292 | try { |
293 | resource.getFile().delete(); |
294 | } |
295 | catch (IOException e) { |
296 | throw new ItemStreamException("Failed to delete empty file on close", e); |
297 | } |
298 | } |
299 | state = null; |
300 | } |
301 | } |
302 | } |
303 | |
304 | /** |
305 | * Initialize the reader. This method may be called multiple times before |
306 | * close is called. |
307 | * |
308 | * @see ItemStream#open(ExecutionContext) |
309 | */ |
310 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
311 | |
312 | Assert.notNull(resource, "The resource must be set"); |
313 | |
314 | if (!getOutputState().isInitialized()) { |
315 | doOpen(executionContext); |
316 | } |
317 | } |
318 | |
319 | private void doOpen(ExecutionContext executionContext) throws ItemStreamException { |
320 | OutputState outputState = getOutputState(); |
321 | if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) { |
322 | outputState.restoreFrom(executionContext); |
323 | } |
324 | try { |
325 | outputState.initializeBufferedWriter(); |
326 | } |
327 | catch (IOException ioe) { |
328 | throw new ItemStreamException("Failed to initialize writer", ioe); |
329 | } |
330 | if (outputState.lastMarkedByteOffsetPosition == 0 && !outputState.appending) { |
331 | if (headerCallback != null) { |
332 | try { |
333 | headerCallback.writeHeader(outputState.outputBufferedWriter); |
334 | outputState.write(lineSeparator); |
335 | } |
336 | catch (IOException e) { |
337 | throw new ItemStreamException("Could not write headers. The file may be corrupt.", e); |
338 | } |
339 | } |
340 | } |
341 | } |
342 | |
343 | /** |
344 | * @see ItemStream#update(ExecutionContext) |
345 | */ |
346 | public void update(ExecutionContext executionContext) { |
347 | if (state == null) { |
348 | throw new ItemStreamException("ItemStream not open or already closed."); |
349 | } |
350 | |
351 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
352 | |
353 | if (saveState) { |
354 | |
355 | try { |
356 | executionContext.putLong(getKey(RESTART_DATA_NAME), state.position()); |
357 | } |
358 | catch (IOException e) { |
359 | throw new ItemStreamException("ItemStream does not return current position properly", e); |
360 | } |
361 | |
362 | executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten); |
363 | } |
364 | } |
365 | |
366 | // Returns object representing state. |
367 | private OutputState getOutputState() { |
368 | if (state == null) { |
369 | File file; |
370 | try { |
371 | file = resource.getFile(); |
372 | } |
373 | catch (IOException e) { |
374 | throw new ItemStreamException("Could not convert resource to file: [" + resource + "]", e); |
375 | } |
376 | Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]"); |
377 | state = new OutputState(); |
378 | state.setDeleteIfExists(shouldDeleteIfExists); |
379 | state.setAppendAllowed(append); |
380 | state.setEncoding(encoding); |
381 | } |
382 | return (OutputState) state; |
383 | } |
384 | |
385 | /** |
386 | * Encapsulates the runtime state of the writer. All state changing |
387 | * operations on the writer go through this class. |
388 | */ |
389 | private class OutputState { |
390 | // default encoding for writing to output files - set to UTF-8. |
391 | private static final String DEFAULT_CHARSET = "UTF-8"; |
392 | |
393 | private FileOutputStream os; |
394 | |
395 | // The bufferedWriter over the file channel that is actually written |
396 | Writer outputBufferedWriter; |
397 | |
398 | FileChannel fileChannel; |
399 | |
400 | // this represents the charset encoding (if any is needed) for the |
401 | // output file |
402 | String encoding = DEFAULT_CHARSET; |
403 | |
404 | boolean restarted = false; |
405 | |
406 | long lastMarkedByteOffsetPosition = 0; |
407 | |
408 | long linesWritten = 0; |
409 | |
410 | boolean shouldDeleteIfExists = true; |
411 | |
412 | boolean initialized = false; |
413 | |
414 | private boolean append = false; |
415 | |
416 | private boolean appending = false; |
417 | |
418 | /** |
419 | * Return the byte offset position of the cursor in the output file as a |
420 | * long integer. |
421 | */ |
422 | public long position() throws IOException { |
423 | long pos = 0; |
424 | |
425 | if (fileChannel == null) { |
426 | return 0; |
427 | } |
428 | |
429 | outputBufferedWriter.flush(); |
430 | pos = fileChannel.position(); |
431 | if (transactional) { |
432 | pos += ((TransactionAwareBufferedWriter) outputBufferedWriter).getBufferSize(); |
433 | } |
434 | |
435 | return pos; |
436 | |
437 | } |
438 | |
439 | /** |
440 | * @param append |
441 | */ |
442 | public void setAppendAllowed(boolean append) { |
443 | this.append = append; |
444 | } |
445 | |
446 | /** |
447 | * @param executionContext |
448 | */ |
449 | public void restoreFrom(ExecutionContext executionContext) { |
450 | lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME)); |
451 | restarted = true; |
452 | } |
453 | |
454 | /** |
455 | * @param shouldDeleteIfExists |
456 | */ |
457 | public void setDeleteIfExists(boolean shouldDeleteIfExists) { |
458 | this.shouldDeleteIfExists = shouldDeleteIfExists; |
459 | } |
460 | |
461 | /** |
462 | * @param encoding |
463 | */ |
464 | public void setEncoding(String encoding) { |
465 | this.encoding = encoding; |
466 | } |
467 | |
468 | /** |
469 | * Close the open resource and reset counters. |
470 | */ |
471 | public void close() { |
472 | |
473 | initialized = false; |
474 | restarted = false; |
475 | try { |
476 | if (outputBufferedWriter != null) { |
477 | outputBufferedWriter.close(); |
478 | } |
479 | } |
480 | catch (IOException ioe) { |
481 | throw new ItemStreamException("Unable to close the the ItemWriter", ioe); |
482 | } |
483 | finally { |
484 | if (!transactional) { |
485 | closeStream(); |
486 | } |
487 | } |
488 | } |
489 | |
490 | private void closeStream() { |
491 | try { |
492 | if (fileChannel != null) { |
493 | fileChannel.close(); |
494 | } |
495 | } |
496 | catch (IOException ioe) { |
497 | throw new ItemStreamException("Unable to close the the ItemWriter", ioe); |
498 | } |
499 | finally { |
500 | try { |
501 | if (os != null) { |
502 | os.close(); |
503 | } |
504 | } |
505 | catch (IOException ioe) { |
506 | throw new ItemStreamException("Unable to close the the ItemWriter", ioe); |
507 | } |
508 | } |
509 | } |
510 | |
511 | /** |
512 | * @param line |
513 | * @throws IOException |
514 | */ |
515 | public void write(String line) throws IOException { |
516 | if (!initialized) { |
517 | initializeBufferedWriter(); |
518 | } |
519 | |
520 | outputBufferedWriter.write(line); |
521 | outputBufferedWriter.flush(); |
522 | } |
523 | |
524 | /** |
525 | * Truncate the output at the last known good point. |
526 | * |
527 | * @throws IOException |
528 | */ |
529 | public void truncate() throws IOException { |
530 | fileChannel.truncate(lastMarkedByteOffsetPosition); |
531 | fileChannel.position(lastMarkedByteOffsetPosition); |
532 | } |
533 | |
534 | /** |
535 | * Creates the buffered writer for the output file channel based on |
536 | * configuration information. |
537 | * @throws IOException |
538 | */ |
539 | private void initializeBufferedWriter() throws IOException { |
540 | |
541 | File file = resource.getFile(); |
542 | FileUtils.setUpOutputFile(file, restarted, append, shouldDeleteIfExists); |
543 | |
544 | os = new FileOutputStream(file.getAbsolutePath(), true); |
545 | fileChannel = os.getChannel(); |
546 | |
547 | outputBufferedWriter = getBufferedWriter(fileChannel, encoding); |
548 | outputBufferedWriter.flush(); |
549 | |
550 | if (append) { |
551 | // Bug in IO library? This doesn't work... |
552 | // lastMarkedByteOffsetPosition = fileChannel.position(); |
553 | if (file.length() > 0) { |
554 | appending = true; |
555 | // Don't write the headers again |
556 | } |
557 | } |
558 | |
559 | Assert.state(outputBufferedWriter != null); |
560 | // in case of restarting reset position to last committed point |
561 | if (restarted) { |
562 | checkFileSize(); |
563 | truncate(); |
564 | } |
565 | |
566 | initialized = true; |
567 | linesWritten = 0; |
568 | } |
569 | |
570 | public boolean isInitialized() { |
571 | return initialized; |
572 | } |
573 | |
574 | /** |
575 | * Returns the buffered writer opened to the beginning of the file |
576 | * specified by the absolute path name contained in absoluteFileName. |
577 | */ |
578 | private Writer getBufferedWriter(FileChannel fileChannel, String encoding) { |
579 | try { |
580 | final FileChannel channel = fileChannel; |
581 | Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) { |
582 | @Override |
583 | public void flush() throws IOException { |
584 | super.flush(); |
585 | if (forceSync) { |
586 | channel.force(false); |
587 | } |
588 | } |
589 | }; |
590 | if (transactional) { |
591 | return new TransactionAwareBufferedWriter(writer, new Runnable() { |
592 | public void run() { |
593 | closeStream(); |
594 | } |
595 | }); |
596 | } |
597 | else { |
598 | return new BufferedWriter(writer); |
599 | } |
600 | } |
601 | catch (UnsupportedCharsetException ucse) { |
602 | throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse); |
603 | } |
604 | } |
605 | |
606 | /** |
607 | * Checks (on setState) to make sure that the current output file's size |
608 | * is not smaller than the last saved commit point. If it is, then the |
609 | * file has been damaged in some way and whole task must be started over |
610 | * again from the beginning. |
611 | * @throws IOException if there is an IO problem |
612 | */ |
613 | private void checkFileSize() throws IOException { |
614 | long size = -1; |
615 | |
616 | outputBufferedWriter.flush(); |
617 | size = fileChannel.size(); |
618 | |
619 | if (size < lastMarkedByteOffsetPosition) { |
620 | throw new ItemStreamException("Current file size is smaller than size at last commit"); |
621 | } |
622 | } |
623 | |
624 | } |
625 | |
626 | } |