View Javadoc

1   /*
2    * Copyright 2006-2012 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.file;
18  
19  import java.io.BufferedWriter;
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.Writer;
24  import java.nio.channels.Channels;
25  import java.nio.channels.FileChannel;
26  import java.nio.charset.UnsupportedCharsetException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.springframework.batch.item.ExecutionContext;
32  import org.springframework.batch.item.ItemStream;
33  import org.springframework.batch.item.ItemStreamException;
34  import org.springframework.batch.item.WriteFailedException;
35  import org.springframework.batch.item.WriterNotOpenException;
36  import org.springframework.batch.item.file.transform.LineAggregator;
37  import org.springframework.batch.item.util.ExecutionContextUserSupport;
38  import org.springframework.batch.item.util.FileUtils;
39  import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
40  import org.springframework.beans.factory.InitializingBean;
41  import org.springframework.core.io.Resource;
42  import org.springframework.util.Assert;
43  import org.springframework.util.ClassUtils;
44  
45  /**
46   * This class is an item writer that writes data to a file or stream. The writer
47   * also provides restart. The location of the output file is defined by a
48   * {@link Resource} and must represent a writable file.<br/>
49   * 
50   * Uses buffered writer to improve performance.<br/>
51   * 
52   * The implementation is *not* thread-safe.
53   * 
54   * @author Waseem Malik
55   * @author Tomas Slanina
56   * @author Robert Kasanicky
57   * @author Dave Syer
58   * @author Michael Minella
59   */
60  public class FlatFileItemWriter<T> extends ExecutionContextUserSupport implements ResourceAwareItemWriterItemStream<T>,
61  		InitializingBean {
62  
63  	private static final boolean DEFAULT_TRANSACTIONAL = true;
64  
65  	protected static final Log logger = LogFactory.getLog(FlatFileItemWriter.class);
66  
67  	private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
68  
69  	private static final String WRITTEN_STATISTICS_NAME = "written";
70  
71  	private static final String RESTART_DATA_NAME = "current.count";
72  
73  	private Resource resource;
74  
75  	private OutputState state = null;
76  
77  	private LineAggregator<T> lineAggregator;
78  
79  	private boolean saveState = true;
80  
81  	private boolean forceSync = false;
82  
83  	private boolean shouldDeleteIfExists = true;
84  
85  	private boolean shouldDeleteIfEmpty = false;
86  
87  	private String encoding = OutputState.DEFAULT_CHARSET;
88  
89  	private FlatFileHeaderCallback headerCallback;
90  
91  	private FlatFileFooterCallback footerCallback;
92  
93  	private String lineSeparator = DEFAULT_LINE_SEPARATOR;
94  
95  	private boolean transactional = DEFAULT_TRANSACTIONAL;
96  
97  	private boolean append = false;
98  
99  	public FlatFileItemWriter() {
100 		setName(ClassUtils.getShortName(FlatFileItemWriter.class));
101 	}
102 
103 	/**
104 	 * Assert that mandatory properties (lineAggregator) are set.
105 	 * 
106 	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
107 	 */
108     @Override
109 	public void afterPropertiesSet() throws Exception {
110 		Assert.notNull(lineAggregator, "A LineAggregator must be provided.");
111 		if (append) {
112 			shouldDeleteIfExists = false;
113 		}
114 	}
115 
116 	/**
117 	 * Flag to indicate that changes should be force-synced to disk on flush.
118 	 * Defaults to false, which means that even with a local disk changes could
119 	 * be lost if the OS crashes in between a write and a cache flush. Setting
120 	 * to true may result in slower performance for usage patterns involving many
121 	 * frequent writes.
122 	 * 
123 	 * @param forceSync the flag value to set
124 	 */
125 	public void setForceSync(boolean forceSync) {
126 		this.forceSync = forceSync;
127 	}
128 
129 	/**
130 	 * Public setter for the line separator. Defaults to the System property
131 	 * line.separator.
132 	 * @param lineSeparator the line separator to set
133 	 */
134 	public void setLineSeparator(String lineSeparator) {
135 		this.lineSeparator = lineSeparator;
136 	}
137 
138 	/**
139 	 * Public setter for the {@link LineAggregator}. This will be used to
140 	 * translate the item into a line for output.
141 	 * 
142 	 * @param lineAggregator the {@link LineAggregator} to set
143 	 */
144 	public void setLineAggregator(LineAggregator<T> lineAggregator) {
145 		this.lineAggregator = lineAggregator;
146 	}
147 
148 	/**
149 	 * Setter for resource. Represents a file that can be written.
150 	 * 
151 	 * @param resource
152 	 */
153     @Override
154 	public void setResource(Resource resource) {
155 		this.resource = resource;
156 	}
157 
158 	/**
159 	 * Sets encoding for output template.
160 	 */
161 	public void setEncoding(String newEncoding) {
162 		this.encoding = newEncoding;
163 	}
164 
165 	/**
166 	 * Flag to indicate that the target file should be deleted if it already
167 	 * exists, otherwise it will be created. Defaults to true, so no appending
168 	 * except on restart. If set to false and {@link #setAppendAllowed(boolean)
169 	 * appendAllowed} is also false then there will be an exception when the
170 	 * stream is opened to prevent existing data being potentially corrupted.
171 	 * 
172 	 * @param shouldDeleteIfExists the flag value to set
173 	 */
174 	public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) {
175 		this.shouldDeleteIfExists = shouldDeleteIfExists;
176 	}
177 
178 	/**
179 	 * Flag to indicate that the target file should be appended if it already
180 	 * exists. If this flag is set then the flag
181 	 * {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is
182 	 * automatically set to false, so that flag should not be set explicitly.
183 	 * Defaults value is false.
184 	 * 
185 	 * @param append the flag value to set
186 	 */
187 	public void setAppendAllowed(boolean append) {
188 		this.append = append;
189 		this.shouldDeleteIfExists = false;
190 	}
191 
192 	/**
193 	 * Flag to indicate that the target file should be deleted if no lines have
194 	 * been written (other than header and footer) on close. Defaults to false.
195 	 * 
196 	 * @param shouldDeleteIfEmpty the flag value to set
197 	 */
198 	public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) {
199 		this.shouldDeleteIfEmpty = shouldDeleteIfEmpty;
200 	}
201 
202 	/**
203 	 * Set the flag indicating whether or not state should be saved in the
204 	 * provided {@link ExecutionContext} during the {@link ItemStream} call to
205 	 * update. Setting this to false means that it will always start at the
206 	 * beginning on a restart.
207 	 * 
208 	 * @param saveState
209 	 */
210 	public void setSaveState(boolean saveState) {
211 		this.saveState = saveState;
212 	}
213 
214 	/**
215 	 * headerCallback will be called before writing the first item to file.
216 	 * Newline will be automatically appended after the header is written.
217 	 */
218 	public void setHeaderCallback(FlatFileHeaderCallback headerCallback) {
219 		this.headerCallback = headerCallback;
220 	}
221 
222 	/**
223 	 * footerCallback will be called after writing the last item to file, but
224 	 * before the file is closed.
225 	 */
226 	public void setFooterCallback(FlatFileFooterCallback footerCallback) {
227 		this.footerCallback = footerCallback;
228 	}
229 
230 	/**
231 	 * Flag to indicate that writing to the buffer should be delayed if a
232 	 * transaction is active. Defaults to true.
233 	 */
234 	public void setTransactional(boolean transactional) {
235 		this.transactional = transactional;
236 	}
237 
238 	/**
239 	 * Writes out a string followed by a "new line", where the format of the new
240 	 * line separator is determined by the underlying operating system. If the
241 	 * input is not a String and a converter is available the converter will be
242 	 * applied and then this method recursively called with the result. If the
243 	 * input is an array or collection each value will be written to a separate
244 	 * line (recursively calling this method for each value). If no converter is
245 	 * supplied the input object's toString method will be used.<br/>
246 	 * 
247 	 * @param items list of items to be written to output stream
248 	 * @throws Exception if the transformer or file output fail,
249 	 * WriterNotOpenException if the writer has not been initialized.
250 	 */
251     @Override
252 	public void write(List<? extends T> items) throws Exception {
253 
254 		if (!getOutputState().isInitialized()) {
255 			throw new WriterNotOpenException("Writer must be open before it can be written to");
256 		}
257 
258 		if (logger.isDebugEnabled()) {
259 			logger.debug("Writing to flat file with " + items.size() + " items.");
260 		}
261 
262 		OutputState state = getOutputState();
263 
264 		StringBuilder lines = new StringBuilder();
265 		int lineCount = 0;
266 		for (T item : items) {
267 			lines.append(lineAggregator.aggregate(item) + lineSeparator);
268 			lineCount++;
269 		}
270 		try {
271 			state.write(lines.toString());
272 		}
273 		catch (IOException e) {
274 			throw new WriteFailedException("Could not write data.  The file may be corrupt.", e);
275 		}
276 		state.linesWritten += lineCount;
277 	}
278 
279 	/**
280 	 * @see ItemStream#close()
281 	 */
282     @Override
283 	public void close() {
284 		if (state != null) {
285 			try {
286 				if (footerCallback != null && state.outputBufferedWriter != null) {
287 					footerCallback.writeFooter(state.outputBufferedWriter);
288 					state.outputBufferedWriter.flush();
289 				}
290 			}
291 			catch (IOException e) {
292 				throw new ItemStreamException("Failed to write footer before closing", e);
293 			}
294 			finally {
295 				state.close();
296 				if (state.linesWritten == 0 && shouldDeleteIfEmpty) {
297 					try {
298 						resource.getFile().delete();
299 					}
300 					catch (IOException e) {
301 						throw new ItemStreamException("Failed to delete empty file on close", e);
302 					}
303 				}
304 				state = null;
305 			}
306 		}
307 	}
308 
309 	/**
310 	 * Initialize the reader. This method may be called multiple times before
311 	 * close is called.
312 	 * 
313 	 * @see ItemStream#open(ExecutionContext)
314 	 */
315     @Override
316 	public void open(ExecutionContext executionContext) throws ItemStreamException {
317 
318 		Assert.notNull(resource, "The resource must be set");
319 
320 		if (!getOutputState().isInitialized()) {
321 			doOpen(executionContext);
322 		}
323 	}
324 
325 	private void doOpen(ExecutionContext executionContext) throws ItemStreamException {
326 		OutputState outputState = getOutputState();
327 		if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
328 			outputState.restoreFrom(executionContext);
329 		}
330 		try {
331 			outputState.initializeBufferedWriter();
332 		}
333 		catch (IOException ioe) {
334 			throw new ItemStreamException("Failed to initialize writer", ioe);
335 		}
336 		if (outputState.lastMarkedByteOffsetPosition == 0 && !outputState.appending) {
337 			if (headerCallback != null) {
338 				try {
339 					headerCallback.writeHeader(outputState.outputBufferedWriter);
340 					outputState.write(lineSeparator);
341 				}
342 				catch (IOException e) {
343 					throw new ItemStreamException("Could not write headers.  The file may be corrupt.", e);
344 				}
345 			}
346 		}
347 	}
348 
349 	/**
350 	 * @see ItemStream#update(ExecutionContext)
351 	 */
352     @Override
353 	public void update(ExecutionContext executionContext) {
354 		if (state == null) {
355 			throw new ItemStreamException("ItemStream not open or already closed.");
356 		}
357 
358 		Assert.notNull(executionContext, "ExecutionContext must not be null");
359 
360 		if (saveState) {
361 
362 			try {
363 				executionContext.putLong(getKey(RESTART_DATA_NAME), state.position());
364 			}
365 			catch (IOException e) {
366 				throw new ItemStreamException("ItemStream does not return current position properly", e);
367 			}
368 
369 			executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten);
370 		}
371 	}
372 
373 	// Returns object representing state.
374 	private OutputState getOutputState() {
375 		if (state == null) {
376 			File file;
377 			try {
378 				file = resource.getFile();
379 			}
380 			catch (IOException e) {
381 				throw new ItemStreamException("Could not convert resource to file: [" + resource + "]", e);
382 			}
383 			Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]");
384 			state = new OutputState();
385 			state.setDeleteIfExists(shouldDeleteIfExists);
386 			state.setAppendAllowed(append);
387 			state.setEncoding(encoding);
388 		}
389 		return (OutputState) state;
390 	}
391 
392 	/**
393 	 * Encapsulates the runtime state of the writer. All state changing
394 	 * operations on the writer go through this class.
395 	 */
396 	private class OutputState {
397 		// default encoding for writing to output files - set to UTF-8.
398 		private static final String DEFAULT_CHARSET = "UTF-8";
399 
400 		private FileOutputStream os;
401 
402 		// The bufferedWriter over the file channel that is actually written
403 		Writer outputBufferedWriter;
404 
405 		FileChannel fileChannel;
406 
407 		// this represents the charset encoding (if any is needed) for the
408 		// output file
409 		String encoding = DEFAULT_CHARSET;
410 
411 		boolean restarted = false;
412 
413 		long lastMarkedByteOffsetPosition = 0;
414 
415 		long linesWritten = 0;
416 
417 		boolean shouldDeleteIfExists = true;
418 
419 		boolean initialized = false;
420 
421 		private boolean append = false;
422 
423 		private boolean appending = false;
424 
425 		/**
426 		 * Return the byte offset position of the cursor in the output file as a
427 		 * long integer.
428 		 */
429 		public long position() throws IOException {
430 			long pos = 0;
431 
432 			if (fileChannel == null) {
433 				return 0;
434 			}
435 
436 			outputBufferedWriter.flush();
437 			pos = fileChannel.position();
438 			if (transactional) {
439 				pos += ((TransactionAwareBufferedWriter) outputBufferedWriter).getBufferSize();
440 			}
441 
442 			return pos;
443 
444 		}
445 
446 		/**
447 		 * @param append
448 		 */
449 		public void setAppendAllowed(boolean append) {
450 			this.append = append;
451 		}
452 
453 		/**
454 		 * @param executionContext
455 		 */
456 		public void restoreFrom(ExecutionContext executionContext) {
457 			lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
458 			restarted = true;
459 		}
460 
461 		/**
462 		 * @param shouldDeleteIfExists
463 		 */
464 		public void setDeleteIfExists(boolean shouldDeleteIfExists) {
465 			this.shouldDeleteIfExists = shouldDeleteIfExists;
466 		}
467 
468 		/**
469 		 * @param encoding
470 		 */
471 		public void setEncoding(String encoding) {
472 			this.encoding = encoding;
473 		}
474 
475 		/**
476 		 * Close the open resource and reset counters.
477 		 */
478 		public void close() {
479 
480 			initialized = false;
481 			restarted = false;
482 			try {
483 				if (outputBufferedWriter != null) {
484 					outputBufferedWriter.close();
485 				}
486 			}
487 			catch (IOException ioe) {
488 				throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
489 			}
490 			finally {
491 				if (!transactional) {
492 					closeStream();
493 				}
494 			}
495 		}
496 
497 		private void closeStream() {
498 			try {
499 				if (fileChannel != null) {
500 					fileChannel.close();
501 				}
502 			}
503 			catch (IOException ioe) {
504 				throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
505 			}
506 			finally {
507 				try {
508 					if (os != null) {
509 						os.close();
510 					}
511 				}
512 				catch (IOException ioe) {
513 					throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
514 				}
515 			}
516 		}
517 
518 		/**
519 		 * @param line
520 		 * @throws IOException
521 		 */
522 		public void write(String line) throws IOException {
523 			if (!initialized) {
524 				initializeBufferedWriter();
525 			}
526 
527 			outputBufferedWriter.write(line);
528 			outputBufferedWriter.flush();
529 		}
530 
531 		/**
532 		 * Truncate the output at the last known good point.
533 		 * 
534 		 * @throws IOException
535 		 */
536 		public void truncate() throws IOException {
537 			fileChannel.truncate(lastMarkedByteOffsetPosition);
538 			fileChannel.position(lastMarkedByteOffsetPosition);
539 		}
540 
541 		/**
542 		 * Creates the buffered writer for the output file channel based on
543 		 * configuration information.
544 		 * @throws IOException
545 		 */
546 		private void initializeBufferedWriter() throws IOException {
547 
548 			File file = resource.getFile();
549 			FileUtils.setUpOutputFile(file, restarted, append, shouldDeleteIfExists);
550 
551 			os = new FileOutputStream(file.getAbsolutePath(), true);
552 			fileChannel = os.getChannel();
553 
554 			outputBufferedWriter = getBufferedWriter(fileChannel, encoding);
555 			outputBufferedWriter.flush();
556 
557 			if (append) {
558 				// Bug in IO library? This doesn't work...
559 				// lastMarkedByteOffsetPosition = fileChannel.position();
560 				if (file.length() > 0) {
561 					appending = true;
562 					// Don't write the headers again
563 				}
564 			}
565 
566 			Assert.state(outputBufferedWriter != null);
567 			// in case of restarting reset position to last committed point
568 			if (restarted) {
569 				checkFileSize();
570 				truncate();
571 			}
572 
573 			initialized = true;
574 			linesWritten = 0;
575 		}
576 
577 		public boolean isInitialized() {
578 			return initialized;
579 		}
580 
581 		/**
582 		 * Returns the buffered writer opened to the beginning of the file
583 		 * specified by the absolute path name contained in absoluteFileName.
584 		 */
585 		private Writer getBufferedWriter(FileChannel fileChannel, String encoding) {
586 			try {
587 				final FileChannel channel = fileChannel;
588 				if (transactional) {
589 					TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() {
590                         @Override
591 						public void run() {
592 							closeStream();
593 						}
594 					});
595 					
596 					writer.setEncoding(encoding);
597 					return writer;
598 				}
599 				else {
600 					Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) {
601 						@Override
602 						public void flush() throws IOException {
603 							super.flush();
604 							if (forceSync) {
605 								channel.force(false);
606 							}
607 						}
608 					};
609 
610 					return new BufferedWriter(writer);
611 				}
612 			}
613 			catch (UnsupportedCharsetException ucse) {
614 				throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse);
615 			}
616 		}
617 
618 		/**
619 		 * Checks (on setState) to make sure that the current output file's size
620 		 * is not smaller than the last saved commit point. If it is, then the
621 		 * file has been damaged in some way and whole task must be started over
622 		 * again from the beginning.
623 		 * @throws IOException if there is an IO problem
624 		 */
625 		private void checkFileSize() throws IOException {
626 			long size = -1;
627 
628 			outputBufferedWriter.flush();
629 			size = fileChannel.size();
630 
631 			if (size < lastMarkedByteOffsetPosition) {
632 				throw new ItemStreamException("Current file size is smaller than size at last commit");
633 			}
634 		}
635 
636 	}
637 
638 }