EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.item.file]

COVERAGE SUMMARY FOR SOURCE FILE [FlatFileItemWriter.java]

nameclass, %method, %block, %line, %
FlatFileItemWriter.java80%  (4/5)100% (46/46)86%  (674/786)88%  (183.8/210)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FlatFileItemWriter100% (1/1)100% (24/24)83%  (356/429)86%  (99.7/116)
update (ExecutionContext): void 100% (1/1)69%  (27/39)70%  (7/10)
close (): void 100% (1/1)72%  (46/64)73%  (11.7/16)
doOpen (ExecutionContext): void 100% (1/1)73%  (37/51)73%  (11/15)
getOutputState (): FlatFileItemWriter$OutputState 100% (1/1)76%  (54/71)82%  (9/11)
write (List): void 100% (1/1)85%  (68/80)81%  (13/16)
<static initializer> 100% (1/1)100% (7/7)100% (2/2)
FlatFileItemWriter (): void 100% (1/1)100% (34/34)100% (12/12)
access$300 (FlatFileItemWriter): boolean 100% (1/1)100% (3/3)100% (1/1)
access$400 (FlatFileItemWriter): Resource 100% (1/1)100% (3/3)100% (1/1)
access$500 (FlatFileItemWriter): boolean 100% (1/1)100% (3/3)100% (1/1)
afterPropertiesSet (): void 100% (1/1)100% (11/11)100% (4/4)
open (ExecutionContext): void 100% (1/1)100% (12/12)100% (4/4)
setAppendAllowed (boolean): void 100% (1/1)100% (7/7)100% (3/3)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
setFooterCallback (FlatFileFooterCallback): void 100% (1/1)100% (4/4)100% (2/2)
setForceSync (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setHeaderCallback (FlatFileHeaderCallback): void 100% (1/1)100% (4/4)100% (2/2)
setLineAggregator (LineAggregator): void 100% (1/1)100% (4/4)100% (2/2)
setLineSeparator (String): void 100% (1/1)100% (4/4)100% (2/2)
setResource (Resource): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setShouldDeleteIfEmpty (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setShouldDeleteIfExists (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setTransactional (boolean): void 100% (1/1)100% (4/4)100% (2/2)
     
class FlatFileItemWriter$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class FlatFileItemWriter$OutputState100% (1/1)100% (18/18)88%  (286/325)89%  (78.1/88)
closeStream (): void 100% (1/1)50%  (18/36)66%  (8.6/13)
close (): void 100% (1/1)68%  (23/34)78%  (8.6/11)
checkFileSize (): void 100% (1/1)75%  (15/20)83%  (5/6)
write (String): void 100% (1/1)85%  (11/13)80%  (4/5)
position (): long 100% (1/1)93%  (25/27)88%  (7/8)
initializeBufferedWriter (): void 100% (1/1)99%  (68/69)99%  (15.9/16)
FlatFileItemWriter$OutputState (FlatFileItemWriter): void 100% (1/1)100% (30/30)100% (9/9)
FlatFileItemWriter$OutputState (FlatFileItemWriter, FlatFileItemWriter$1): void 100% (1/1)100% (4/4)100% (1/1)
access$000 (FlatFileItemWriter$OutputState): void 100% (1/1)100% (3/3)100% (1/1)
access$100 (FlatFileItemWriter$OutputState): boolean 100% (1/1)100% (3/3)100% (1/1)
access$600 (FlatFileItemWriter$OutputState): void 100% (1/1)100% (3/3)100% (1/1)
getBufferedWriter (FileChannel, String): Writer 100% (1/1)100% (43/43)100% (7/7)
isInitialized (): boolean 100% (1/1)100% (3/3)100% (1/1)
restoreFrom (ExecutionContext): void 100% (1/1)100% (12/12)100% (3/3)
setAppendAllowed (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setDeleteIfExists (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
truncate (): void 100% (1/1)100% (13/13)100% (3/3)
     
class FlatFileItemWriter$OutputState$1100% (1/1)100% (2/2)100% (22/22)100% (5/5)
FlatFileItemWriter$OutputState$1 (FlatFileItemWriter$OutputState, Writer, Fil... 100% (1/1)100% (10/10)100% (1/1)
flush (): void 100% (1/1)100% (12/12)100% (4/4)
     
class FlatFileItemWriter$OutputState$2100% (1/1)100% (2/2)100% (10/10)100% (3/3)
FlatFileItemWriter$OutputState$2 (FlatFileItemWriter$OutputState): void 100% (1/1)100% (6/6)100% (1/1)
run (): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.file;
18 
19import java.io.BufferedWriter;
20import java.io.File;
21import java.io.FileOutputStream;
22import java.io.IOException;
23import java.io.Writer;
24import java.nio.channels.Channels;
25import java.nio.channels.FileChannel;
26import java.nio.charset.UnsupportedCharsetException;
27import java.util.List;
28 
29import org.apache.commons.logging.Log;
30import org.apache.commons.logging.LogFactory;
31import org.springframework.batch.item.ExecutionContext;
32import org.springframework.batch.item.ItemStream;
33import org.springframework.batch.item.ItemStreamException;
34import org.springframework.batch.item.WriteFailedException;
35import org.springframework.batch.item.WriterNotOpenException;
36import org.springframework.batch.item.file.transform.LineAggregator;
37import org.springframework.batch.item.util.ExecutionContextUserSupport;
38import org.springframework.batch.item.util.FileUtils;
39import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
40import org.springframework.beans.factory.InitializingBean;
41import org.springframework.core.io.Resource;
42import org.springframework.util.Assert;
43import org.springframework.util.ClassUtils;
44 
45/**
46 * This class is an item writer that writes data to a file or stream. The writer
47 * also provides restart. The location of the output file is defined by a
48 * {@link Resource} and must represent a writable file.<br/>
49 * 
50 * Uses buffered writer to improve performance.<br/>
51 * 
52 * The implementation is *not* thread-safe.
53 * 
54 * @author Waseem Malik
55 * @author Tomas Slanina
56 * @author Robert Kasanicky
57 * @author Dave Syer
58 */
59public class FlatFileItemWriter<T> extends ExecutionContextUserSupport implements ResourceAwareItemWriterItemStream<T>,
60                InitializingBean {
61 
62        private static final boolean DEFAULT_TRANSACTIONAL = true;
63 
64        protected static final Log logger = LogFactory.getLog(FlatFileItemWriter.class);
65 
66        private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
67 
68        private static final String WRITTEN_STATISTICS_NAME = "written";
69 
70        private static final String RESTART_DATA_NAME = "current.count";
71 
72        private Resource resource;
73 
74        private OutputState state = null;
75 
76        private LineAggregator<T> lineAggregator;
77 
78        private boolean saveState = true;
79 
80        private boolean forceSync = false;
81 
82        private boolean shouldDeleteIfExists = true;
83 
84        private boolean shouldDeleteIfEmpty = false;
85 
86        private String encoding = OutputState.DEFAULT_CHARSET;
87 
88        private FlatFileHeaderCallback headerCallback;
89 
90        private FlatFileFooterCallback footerCallback;
91 
92        private String lineSeparator = DEFAULT_LINE_SEPARATOR;
93 
94        private boolean transactional = DEFAULT_TRANSACTIONAL;
95 
96        private boolean append = false;
97 
98        public FlatFileItemWriter() {
99                setName(ClassUtils.getShortName(FlatFileItemWriter.class));
100        }
101 
102        /**
103         * Assert that mandatory properties (lineAggregator) are set.
104         * 
105         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
106         */
107        public void afterPropertiesSet() throws Exception {
108                Assert.notNull(lineAggregator, "A LineAggregator must be provided.");
109                if (append) {
110                        shouldDeleteIfExists = false;
111                }
112        }
113 
114        /**
115         * Flag to indicate that changes should be force-synced to disk on flush.
116         * Defaults to false, which means that even with a local disk changes could
117         * be lost if the OS crashes in between a write and a cache flush. Setting
118         * to true may result in slower performance for usage patterns involving many
119         * frequent writes.
120         * 
121         * @param forceSync the flag value to set
122         */
123        public void setForceSync(boolean forceSync) {
124                this.forceSync = forceSync;
125        }
126 
127        /**
128         * Public setter for the line separator. Defaults to the System property
129         * line.separator.
130         * @param lineSeparator the line separator to set
131         */
132        public void setLineSeparator(String lineSeparator) {
133                this.lineSeparator = lineSeparator;
134        }
135 
136        /**
137         * Public setter for the {@link LineAggregator}. This will be used to
138         * translate the item into a line for output.
139         * 
140         * @param lineAggregator the {@link LineAggregator} to set
141         */
142        public void setLineAggregator(LineAggregator<T> lineAggregator) {
143                this.lineAggregator = lineAggregator;
144        }
145 
146        /**
147         * Setter for resource. Represents a file that can be written.
148         * 
149         * @param resource
150         */
151        public void setResource(Resource resource) {
152                this.resource = resource;
153        }
154 
155        /**
156         * Sets encoding for output template.
157         */
158        public void setEncoding(String newEncoding) {
159                this.encoding = newEncoding;
160        }
161 
162        /**
163         * Flag to indicate that the target file should be deleted if it already
164         * exists, otherwise it will be created. Defaults to true, so no appending
165         * except on restart. If set to false and {@link #setAppendAllowed(boolean)
166         * appendAllowed} is also false then there will be an exception when the
167         * stream is opened to prevent existing data being potentially corrupted.
168         * 
169         * @param shouldDeleteIfExists the flag value to set
170         */
171        public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) {
172                this.shouldDeleteIfExists = shouldDeleteIfExists;
173        }
174 
175        /**
176         * Flag to indicate that the target file should be appended if it already
177         * exists. If this flag is set then the flag
178         * {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is
179         * automatically set to false, so that flag should not be set explicitly.
180         * Defaults value is false.
181         * 
182         * @param append the flag value to set
183         */
184        public void setAppendAllowed(boolean append) {
185                this.append = append;
186                this.shouldDeleteIfExists = false;
187        }
188 
189        /**
190         * Flag to indicate that the target file should be deleted if no lines have
191         * been written (other than header and footer) on close. Defaults to false.
192         * 
193         * @param shouldDeleteIfEmpty the flag value to set
194         */
195        public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) {
196                this.shouldDeleteIfEmpty = shouldDeleteIfEmpty;
197        }
198 
199        /**
200         * Set the flag indicating whether or not state should be saved in the
201         * provided {@link ExecutionContext} during the {@link ItemStream} call to
202         * update. Setting this to false means that it will always start at the
203         * beginning on a restart.
204         * 
205         * @param saveState
206         */
207        public void setSaveState(boolean saveState) {
208                this.saveState = saveState;
209        }
210 
211        /**
212         * headerCallback will be called before writing the first item to file.
213         * Newline will be automatically appended after the header is written.
214         */
215        public void setHeaderCallback(FlatFileHeaderCallback headerCallback) {
216                this.headerCallback = headerCallback;
217        }
218 
219        /**
220         * footerCallback will be called after writing the last item to file, but
221         * before the file is closed.
222         */
223        public void setFooterCallback(FlatFileFooterCallback footerCallback) {
224                this.footerCallback = footerCallback;
225        }
226 
227        /**
228         * Flag to indicate that writing to the buffer should be delayed if a
229         * transaction is active. Defaults to true.
230         */
231        public void setTransactional(boolean transactional) {
232                this.transactional = transactional;
233        }
234 
235        /**
236         * Writes out a string followed by a "new line", where the format of the new
237         * line separator is determined by the underlying operating system. If the
238         * input is not a String and a converter is available the converter will be
239         * applied and then this method recursively called with the result. If the
240         * input is an array or collection each value will be written to a separate
241         * line (recursively calling this method for each value). If no converter is
242         * supplied the input object's toString method will be used.<br/>
243         * 
244         * @param items list of items to be written to output stream
245         * @throws Exception if the transformer or file output fail,
246         * WriterNotOpenException if the writer has not been initialized.
247         */
248        public void write(List<? extends T> items) throws Exception {
249 
250                if (!getOutputState().isInitialized()) {
251                        throw new WriterNotOpenException("Writer must be open before it can be written to");
252                }
253 
254                if (logger.isDebugEnabled()) {
255                        logger.debug("Writing to flat file with " + items.size() + " items.");
256                }
257 
258                OutputState state = getOutputState();
259 
260                StringBuilder lines = new StringBuilder();
261                int lineCount = 0;
262                for (T item : items) {
263                        lines.append(lineAggregator.aggregate(item) + lineSeparator);
264                        lineCount++;
265                }
266                try {
267                        state.write(lines.toString());
268                }
269                catch (IOException e) {
270                        throw new WriteFailedException("Could not write data.  The file may be corrupt.", e);
271                }
272                state.linesWritten += lineCount;
273        }
274 
275        /**
276         * @see ItemStream#close()
277         */
278        public void close() {
279                if (state != null) {
280                        try {
281                                if (footerCallback != null && state.outputBufferedWriter != null) {
282                                        footerCallback.writeFooter(state.outputBufferedWriter);
283                                        state.outputBufferedWriter.flush();
284                                }
285                        }
286                        catch (IOException e) {
287                                throw new ItemStreamException("Failed to write footer before closing", e);
288                        }
289                        finally {
290                                state.close();
291                                if (state.linesWritten == 0 && shouldDeleteIfEmpty) {
292                                        try {
293                                                resource.getFile().delete();
294                                        }
295                                        catch (IOException e) {
296                                                throw new ItemStreamException("Failed to delete empty file on close", e);
297                                        }
298                                }
299                                state = null;
300                        }
301                }
302        }
303 
304        /**
305         * Initialize the reader. This method may be called multiple times before
306         * close is called.
307         * 
308         * @see ItemStream#open(ExecutionContext)
309         */
310        public void open(ExecutionContext executionContext) throws ItemStreamException {
311 
312                Assert.notNull(resource, "The resource must be set");
313 
314                if (!getOutputState().isInitialized()) {
315                        doOpen(executionContext);
316                }
317        }
318 
319        private void doOpen(ExecutionContext executionContext) throws ItemStreamException {
320                OutputState outputState = getOutputState();
321                if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
322                        outputState.restoreFrom(executionContext);
323                }
324                try {
325                        outputState.initializeBufferedWriter();
326                }
327                catch (IOException ioe) {
328                        throw new ItemStreamException("Failed to initialize writer", ioe);
329                }
330                if (outputState.lastMarkedByteOffsetPosition == 0 && !outputState.appending) {
331                        if (headerCallback != null) {
332                                try {
333                                        headerCallback.writeHeader(outputState.outputBufferedWriter);
334                                        outputState.write(lineSeparator);
335                                }
336                                catch (IOException e) {
337                                        throw new ItemStreamException("Could not write headers.  The file may be corrupt.", e);
338                                }
339                        }
340                }
341        }
342 
343        /**
344         * @see ItemStream#update(ExecutionContext)
345         */
346        public void update(ExecutionContext executionContext) {
347                if (state == null) {
348                        throw new ItemStreamException("ItemStream not open or already closed.");
349                }
350 
351                Assert.notNull(executionContext, "ExecutionContext must not be null");
352 
353                if (saveState) {
354 
355                        try {
356                                executionContext.putLong(getKey(RESTART_DATA_NAME), state.position());
357                        }
358                        catch (IOException e) {
359                                throw new ItemStreamException("ItemStream does not return current position properly", e);
360                        }
361 
362                        executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten);
363                }
364        }
365 
366        // Returns object representing state.
367        private OutputState getOutputState() {
368                if (state == null) {
369                        File file;
370                        try {
371                                file = resource.getFile();
372                        }
373                        catch (IOException e) {
374                                throw new ItemStreamException("Could not convert resource to file: [" + resource + "]", e);
375                        }
376                        Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]");
377                        state = new OutputState();
378                        state.setDeleteIfExists(shouldDeleteIfExists);
379                        state.setAppendAllowed(append);
380                        state.setEncoding(encoding);
381                }
382                return (OutputState) state;
383        }
384 
385        /**
386         * Encapsulates the runtime state of the writer. All state changing
387         * operations on the writer go through this class.
388         */
389        private class OutputState {
390                // default encoding for writing to output files - set to UTF-8.
391                private static final String DEFAULT_CHARSET = "UTF-8";
392 
393                private FileOutputStream os;
394 
395                // The bufferedWriter over the file channel that is actually written
396                Writer outputBufferedWriter;
397 
398                FileChannel fileChannel;
399 
400                // this represents the charset encoding (if any is needed) for the
401                // output file
402                String encoding = DEFAULT_CHARSET;
403 
404                boolean restarted = false;
405 
406                long lastMarkedByteOffsetPosition = 0;
407 
408                long linesWritten = 0;
409 
410                boolean shouldDeleteIfExists = true;
411 
412                boolean initialized = false;
413 
414                private boolean append = false;
415 
416                private boolean appending = false;
417 
418                /**
419                 * Return the byte offset position of the cursor in the output file as a
420                 * long integer.
421                 */
422                public long position() throws IOException {
423                        long pos = 0;
424 
425                        if (fileChannel == null) {
426                                return 0;
427                        }
428 
429                        outputBufferedWriter.flush();
430                        pos = fileChannel.position();
431                        if (transactional) {
432                                pos += ((TransactionAwareBufferedWriter) outputBufferedWriter).getBufferSize();
433                        }
434 
435                        return pos;
436 
437                }
438 
439                /**
440                 * @param append
441                 */
442                public void setAppendAllowed(boolean append) {
443                        this.append = append;
444                }
445 
446                /**
447                 * @param executionContext
448                 */
449                public void restoreFrom(ExecutionContext executionContext) {
450                        lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
451                        restarted = true;
452                }
453 
454                /**
455                 * @param shouldDeleteIfExists
456                 */
457                public void setDeleteIfExists(boolean shouldDeleteIfExists) {
458                        this.shouldDeleteIfExists = shouldDeleteIfExists;
459                }
460 
461                /**
462                 * @param encoding
463                 */
464                public void setEncoding(String encoding) {
465                        this.encoding = encoding;
466                }
467 
468                /**
469                 * Close the open resource and reset counters.
470                 */
471                public void close() {
472 
473                        initialized = false;
474                        restarted = false;
475                        try {
476                                if (outputBufferedWriter != null) {
477                                        outputBufferedWriter.close();
478                                }
479                        }
480                        catch (IOException ioe) {
481                                throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
482                        }
483                        finally {
484                                if (!transactional) {
485                                        closeStream();
486                                }
487                        }
488                }
489 
490                private void closeStream() {
491                        try {
492                                if (fileChannel != null) {
493                                        fileChannel.close();
494                                }
495                        }
496                        catch (IOException ioe) {
497                                throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
498                        }
499                        finally {
500                                try {
501                                        if (os != null) {
502                                                os.close();
503                                        }
504                                }
505                                catch (IOException ioe) {
506                                        throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
507                                }
508                        }
509                }
510 
511                /**
512                 * @param line
513                 * @throws IOException
514                 */
515                public void write(String line) throws IOException {
516                        if (!initialized) {
517                                initializeBufferedWriter();
518                        }
519 
520                        outputBufferedWriter.write(line);
521                        outputBufferedWriter.flush();
522                }
523 
524                /**
525                 * Truncate the output at the last known good point.
526                 * 
527                 * @throws IOException
528                 */
529                public void truncate() throws IOException {
530                        fileChannel.truncate(lastMarkedByteOffsetPosition);
531                        fileChannel.position(lastMarkedByteOffsetPosition);
532                }
533 
534                /**
535                 * Creates the buffered writer for the output file channel based on
536                 * configuration information.
537                 * @throws IOException
538                 */
539                private void initializeBufferedWriter() throws IOException {
540 
541                        File file = resource.getFile();
542                        FileUtils.setUpOutputFile(file, restarted, append, shouldDeleteIfExists);
543 
544                        os = new FileOutputStream(file.getAbsolutePath(), true);
545                        fileChannel = os.getChannel();
546 
547                        outputBufferedWriter = getBufferedWriter(fileChannel, encoding);
548                        outputBufferedWriter.flush();
549 
550                        if (append) {
551                                // Bug in IO library? This doesn't work...
552                                // lastMarkedByteOffsetPosition = fileChannel.position();
553                                if (file.length() > 0) {
554                                        appending = true;
555                                        // Don't write the headers again
556                                }
557                        }
558 
559                        Assert.state(outputBufferedWriter != null);
560                        // in case of restarting reset position to last committed point
561                        if (restarted) {
562                                checkFileSize();
563                                truncate();
564                        }
565 
566                        initialized = true;
567                        linesWritten = 0;
568                }
569 
570                public boolean isInitialized() {
571                        return initialized;
572                }
573 
574                /**
575                 * Returns the buffered writer opened to the beginning of the file
576                 * specified by the absolute path name contained in absoluteFileName.
577                 */
578                private Writer getBufferedWriter(FileChannel fileChannel, String encoding) {
579                        try {
580                                final FileChannel channel = fileChannel;
581                                Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) {
582                                        @Override
583                                        public void flush() throws IOException {
584                                                super.flush();
585                                                if (forceSync) {
586                                                        channel.force(false);
587                                                }
588                                        }
589                                };
590                                if (transactional) {
591                                        return new TransactionAwareBufferedWriter(writer, new Runnable() {
592                                                public void run() {
593                                                        closeStream();
594                                                }
595                                        });
596                                }
597                                else {
598                                        return new BufferedWriter(writer);
599                                }
600                        }
601                        catch (UnsupportedCharsetException ucse) {
602                                throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse);
603                        }
604                }
605 
606                /**
607                 * Checks (on setState) to make sure that the current output file's size
608                 * is not smaller than the last saved commit point. If it is, then the
609                 * file has been damaged in some way and whole task must be started over
610                 * again from the beginning.
611                 * @throws IOException if there is an IO problem
612                 */
613                private void checkFileSize() throws IOException {
614                        long size = -1;
615 
616                        outputBufferedWriter.flush();
617                        size = fileChannel.size();
618 
619                        if (size < lastMarkedByteOffsetPosition) {
620                                throw new ItemStreamException("Current file size is smaller than size at last commit");
621                        }
622                }
623 
624        }
625 
626}

[all classes][org.springframework.batch.item.file]
EMMA 2.0.5312 (C) Vladimir Roubtsov