EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.item.file]

COVERAGE SUMMARY FOR SOURCE FILE [FlatFileItemWriter.java]

nameclass, %method, %block, %line, %
FlatFileItemWriter.java67%  (2/3)91%  (32/35)84%  (514/615)84%  (147/175)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FlatFileItemWriter$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class FlatFileItemWriter$OutputState100% (1/1)100% (15/15)83%  (220/264)85%  (68/80)
mark (): void 100% (1/1)46%  (6/13)60%  (3/5)
reset (): void 100% (1/1)53%  (8/15)67%  (4/6)
checkFileSize (): void 100% (1/1)57%  (16/28)67%  (6/9)
close (): void 100% (1/1)72%  (18/25)80%  (8/10)
getBufferedWriter (FileChannel, String, int): BufferedWriter 100% (1/1)81%  (29/36)86%  (6/7)
position (): long 100% (1/1)88%  (14/16)83%  (5/6)
write (String): void 100% (1/1)89%  (17/19)83%  (5/6)
FlatFileItemWriter$OutputState (FlatFileItemWriter): void 100% (1/1)100% (30/30)100% (9/9)
initializeBufferedWriter (): void 100% (1/1)100% (42/42)100% (9/9)
isInitialized (): boolean 100% (1/1)100% (3/3)100% (1/1)
restoreFrom (ExecutionContext): void 100% (1/1)100% (12/12)100% (3/3)
setBufferSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setDeleteIfExists (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
truncate (): void 100% (1/1)100% (13/13)100% (3/3)
     
class FlatFileItemWriter100% (1/1)85%  (17/20)84%  (294/351)83%  (79/95)
setBufferSize (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
setLineAggregator (LineAggregator): void 0%   (0/1)0%   (0/4)0%   (0/2)
setShouldDeleteIfExists (boolean): void 0%   (0/1)0%   (0/4)0%   (0/2)
flush (): void 100% (1/1)64%  (25/39)82%  (9/11)
update (ExecutionContext): void 100% (1/1)74%  (35/47)73%  (8/11)
write (Object): void 100% (1/1)84%  (27/32)83%  (5/6)
doOpen (ExecutionContext): void 100% (1/1)87%  (46/53)85%  (11/13)
getOutputState (): FlatFileItemWriter$OutputState 100% (1/1)89%  (54/61)82%  (9/11)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
FlatFileItemWriter (): void 100% (1/1)100% (47/47)100% (12/12)
afterPropertiesSet (): void 100% (1/1)100% (5/5)100% (2/2)
clear (): void 100% (1/1)100% (4/4)100% (2/2)
close (ExecutionContext): void 100% (1/1)100% (10/10)100% (4/4)
open (ExecutionContext): void 100% (1/1)100% (12/12)100% (4/4)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
setFieldSetCreator (FieldSetCreator): void 100% (1/1)100% (4/4)100% (2/2)
setHeaderLines (String []): void 100% (1/1)100% (5/5)100% (2/2)
setLineSeparator (String): void 100% (1/1)100% (4/4)100% (2/2)
setResource (Resource): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.file;
18 
19import java.io.BufferedWriter;
20import java.io.File;
21import java.io.FileOutputStream;
22import java.io.IOException;
23import java.nio.channels.Channels;
24import java.nio.channels.FileChannel;
25import java.nio.charset.UnsupportedCharsetException;
26import java.util.ArrayList;
27import java.util.Arrays;
28import java.util.Iterator;
29import java.util.List;
30 
31import org.springframework.batch.item.ClearFailedException;
32import org.springframework.batch.item.ExecutionContext;
33import org.springframework.batch.item.FlushFailedException;
34import org.springframework.batch.item.ItemStream;
35import org.springframework.batch.item.ItemStreamException;
36import org.springframework.batch.item.ItemWriter;
37import org.springframework.batch.item.MarkFailedException;
38import org.springframework.batch.item.ResetFailedException;
39import org.springframework.batch.item.WriterNotOpenException;
40import org.springframework.batch.item.file.mapping.FieldSet;
41import org.springframework.batch.item.file.mapping.FieldSetCreator;
42import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
43import org.springframework.batch.item.file.transform.LineAggregator;
44import org.springframework.batch.item.util.ExecutionContextUserSupport;
45import org.springframework.batch.item.util.FileUtils;
46import org.springframework.beans.factory.InitializingBean;
47import org.springframework.core.io.Resource;
48import org.springframework.util.Assert;
49import org.springframework.util.ClassUtils;
50 
51/**
52 * This class is an item writer that writes data to a file or stream. The writer
53 * also provides restart. The location of the output file is defined by a
54 * {@link Resource} and must represent a writable file.<br/>
55 * 
56 * Uses buffered writer to improve performance.<br/>
57 * 
58 * <p>
59 * Output lines are buffered until {@link #flush()} is called and only then the
60 * actual writing to file occurs.
61 * </p>
62 * 
63 * The implementation is *not* thread-safe.
64 * 
65 * @author Waseem Malik
66 * @author Tomas Slanina
67 * @author Robert Kasanicky
68 * @author Dave Syer
69 */
70public class FlatFileItemWriter extends ExecutionContextUserSupport implements ItemWriter, ItemStream, InitializingBean {
71 
72        private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
73 
74        private static final String WRITTEN_STATISTICS_NAME = "written";
75 
76        private static final String RESTART_COUNT_STATISTICS_NAME = "restart.count";
77 
78        private static final String RESTART_DATA_NAME = "current.count";
79 
80        private Resource resource;
81 
82        private OutputState state = null;
83 
84        private LineAggregator lineAggregator = new DelimitedLineAggregator();
85 
86        private FieldSetCreator fieldSetCreator;
87 
88        private boolean saveState = true;
89 
90        private boolean shouldDeleteIfExists = true;
91 
92        private String encoding = OutputState.DEFAULT_CHARSET;
93 
94        private int bufferSize = OutputState.DEFAULT_BUFFER_SIZE;
95 
96        private List lineBuffer = new ArrayList();
97 
98        private List headerLines = new ArrayList();
99 
100        private String lineSeparator = DEFAULT_LINE_SEPARATOR;
101        
102        public FlatFileItemWriter() {
103                setName(ClassUtils.getShortName(FlatFileItemWriter.class));
104        }
105 
106        /**
107         * Assert that mandatory properties (resource) are set.
108         * 
109         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
110         */
111        public void afterPropertiesSet() throws Exception {
112                Assert.notNull(fieldSetCreator, "A FieldSetCreator must be provided.");
113        }
114 
115        /**
116         * Public setter for the line separator. Defaults to the System property
117         * line.separator.
118         * @param lineSeparator the line separator to set
119         */
120        public void setLineSeparator(String lineSeparator) {
121                this.lineSeparator = lineSeparator;
122        }
123 
124        /**
125         * Public setter for the {@link LineAggregator}. This will be used to
126         * translate a {@link FieldSet} into a line for output.
127         * 
128         * @param lineAggregator the {@link LineAggregator} to set
129         */
130        public void setLineAggregator(LineAggregator lineAggregator) {
131                this.lineAggregator = lineAggregator;
132        }
133 
134        /**
135         * Public setter for the {@link FieldSetCreator}. This will be used to
136         * transform the item into a {@link FieldSet} before it is aggregated by the
137         * {@link LineAggregator}.
138         * 
139         * @param fieldSetCreator the {@link FieldSetCreator} to set
140         */
141        public void setFieldSetCreator(FieldSetCreator fieldSetCreator) {
142                this.fieldSetCreator = fieldSetCreator;
143        }
144 
145        /**
146         * Setter for resource. Represents a file that can be written.
147         * 
148         * @param resource
149         */
150        public void setResource(Resource resource) {
151                this.resource = resource;
152        }
153 
154        /**
155         * Sets encoding for output template.
156         */
157        public void setEncoding(String newEncoding) {
158                this.encoding = newEncoding;
159        }
160 
161        /**
162         * Sets buffer size for output template
163         */
164        public void setBufferSize(int newSize) {
165                this.bufferSize = newSize;
166        }
167 
168        /**
169         * @param shouldDeleteIfExists the shouldDeleteIfExists to set
170         */
171        public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) {
172                this.shouldDeleteIfExists = shouldDeleteIfExists;
173        }
174 
175        /**
176         * Set the flag indicating whether or not state should be saved in the
177         * provided {@link ExecutionContext} during the {@link ItemStream} call to
178         * update. Setting this to false means that it will always start at the
179         * beginning on a restart.
180         * 
181         * @param saveState
182         */
183        public void setSaveState(boolean saveState) {
184                this.saveState = saveState;
185        }
186 
187        /**
188         * Public setter for the header lines. These will be output at the head of
189         * the file before any calls to {@link #write(Object)} (and not on restart
190         * unless the restart is after a failure before the first flush).
191         * 
192         * @param headerLines the header lines to set
193         */
194        public void setHeaderLines(String[] headerLines) {
195                this.headerLines = Arrays.asList(headerLines);
196        }
197 
198        /**
199         * Writes out a string followed by a "new line", where the format of the new
200         * line separator is determined by the underlying operating system. If the
201         * input is not a String and a converter is available the converter will be
202         * applied and then this method recursively called with the result. If the
203         * input is an array or collection each value will be written to a separate
204         * line (recursively calling this method for each value). If no converter is
205         * supplied the input object's toString method will be used.<br/>
206         * 
207         * @param data Object (a String or Object that can be converted) to be
208         * written to output stream
209         * @throws Exception if the transformer or file output fail, WriterNotOpenException
210         * if the writer has not been initialized.
211         */
212        public void write(Object data) throws Exception {
213                if(getOutputState().isInitialized()){
214                        FieldSet fieldSet = fieldSetCreator.mapItem(data);
215                        lineBuffer.add(lineAggregator.aggregate(fieldSet) + lineSeparator);
216                }
217                else{
218                        throw new WriterNotOpenException("Writer must be open before it can be written to");
219                }
220        }
221 
222        /**
223         * @see ItemStream#close(ExecutionContext)
224         */
225        public void close(ExecutionContext executionContext) {
226                if (state != null) {
227                        getOutputState().close();
228                        state = null;
229                }
230        }
231 
232        /**
233         * Initialize the reader.  This method may be called multiple times before close is
234         * called.
235         * 
236         * @see ItemStream#open(ExecutionContext)
237         */
238        public void open(ExecutionContext executionContext) throws ItemStreamException {
239 
240                Assert.notNull(resource, "The resource must be set");
241                
242                if(!getOutputState().isInitialized()){
243                        doOpen(executionContext);
244                }
245        }
246        
247        private void doOpen(ExecutionContext executionContext){
248                OutputState outputState = getOutputState();
249                if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
250                        outputState.restoreFrom(executionContext);
251                }
252                try {
253                        outputState.initializeBufferedWriter();
254                }
255                catch (IOException ioe) {
256                        throw new ItemStreamException("Failed to initialize writer", ioe);
257                }
258                if (outputState.lastMarkedByteOffsetPosition == 0) {
259                        for (Iterator iterator = headerLines.iterator(); iterator.hasNext();) {
260                                String line = (String) iterator.next();
261                                lineBuffer.add(line + lineSeparator);
262                        }
263                }
264        }
265 
266        /**
267         * @see ItemStream#update(ExecutionContext)
268         */
269        public void update(ExecutionContext executionContext) {
270                if (state == null) {
271                        throw new ItemStreamException("ItemStream not open or already closed.");
272                }
273 
274                Assert.notNull(executionContext, "ExecutionContext must not be null");
275 
276                if (saveState) {
277 
278                        try {
279                                executionContext.putLong(getKey(RESTART_DATA_NAME), state.position());
280                        }
281                        catch (IOException e) {
282                                throw new ItemStreamException("ItemStream does not return current position properly", e);
283                        }
284 
285                        executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten);
286                        executionContext.putLong(getKey(RESTART_COUNT_STATISTICS_NAME), state.restartCount);
287                }
288        }
289 
290        public void flush() throws FlushFailedException {
291                OutputState state = getOutputState();
292                for (Iterator iterator = lineBuffer.listIterator(); iterator.hasNext();) {
293                        String line = (String) iterator.next();
294                        try {
295                                state.write(line);
296                        }
297                        catch (IOException e) {
298                                throw new FlushFailedException("Failed to write line to output file: " + line, e);
299                        }
300                }
301                lineBuffer.clear();
302                state.mark();
303        }
304 
305        // Returns object representing state.
306        private OutputState getOutputState() {
307                if (state == null) {
308                        try {
309                                File file = resource.getFile();
310                                Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]");
311                        }
312                        catch (IOException e) {
313                                throw new ItemStreamException("Could not test resource for writable status.", e);
314                        }
315                        state = new OutputState();
316                        state.setDeleteIfExists(shouldDeleteIfExists);
317                        state.setBufferSize(bufferSize);
318                        state.setEncoding(encoding);
319                }
320                return (OutputState) state;
321        }
322 
323        /**
324         * Encapsulates the runtime state of the writer. All state changing
325         * operations on the writer go through this class.
326         */
327        private class OutputState {
328                // default encoding for writing to output files - set to UTF-8.
329                private static final String DEFAULT_CHARSET = "UTF-8";
330 
331                private static final int DEFAULT_BUFFER_SIZE = 2048;
332 
333                // The bufferedWriter over the file channel that is actually written
334                BufferedWriter outputBufferedWriter;
335 
336                FileChannel fileChannel;
337 
338                // this represents the charset encoding (if any is needed) for the
339                // output file
340                String encoding = DEFAULT_CHARSET;
341 
342                // Optional write buffer size
343                int bufferSize = DEFAULT_BUFFER_SIZE;
344 
345                boolean restarted = false;
346 
347                long lastMarkedByteOffsetPosition = 0;
348 
349                long linesWritten = 0;
350 
351                long restartCount = 0;
352 
353                boolean shouldDeleteIfExists = true;
354                
355                boolean initialized = false;
356 
357                /**
358                 * Return the byte offset position of the cursor in the output file as a
359                 * long integer.
360                 */
361                public long position() throws IOException {
362                        long pos = 0;
363 
364                        if (fileChannel == null) {
365                                return 0;
366                        }
367 
368                        outputBufferedWriter.flush();
369                        pos = fileChannel.position();
370 
371                        return pos;
372 
373                }
374 
375                /**
376                 * @param executionContext
377                 */
378                public void restoreFrom(ExecutionContext executionContext) {
379                        lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
380                        restarted = true;
381                }
382 
383                /**
384                 * @param shouldDeleteIfExists
385                 */
386                public void setDeleteIfExists(boolean shouldDeleteIfExists) {
387                        this.shouldDeleteIfExists = shouldDeleteIfExists;
388                }
389 
390                /**
391                 * @param bufferSize
392                 */
393                public void setBufferSize(int bufferSize) {
394                        this.bufferSize = bufferSize;
395                }
396 
397                /**
398                 * @param encoding
399                 */
400                public void setEncoding(String encoding) {
401                        this.encoding = encoding;
402                }
403 
404                /**
405                 * Close the open resource and reset counters.
406                 */
407                public void close() {
408                        initialized = false;
409                        restarted = false;
410                        try {
411                                if (outputBufferedWriter == null) {
412                                        return;
413                                }
414                                outputBufferedWriter.close();
415                                fileChannel.close();
416                        }
417                        catch (IOException ioe) {
418                                throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
419                        }
420                }
421 
422                /**
423                 * @param line
424                 * @throws IOException
425                 */
426                public void write(String line) throws IOException {
427                        if (!initialized) {
428                                initializeBufferedWriter();
429                        }
430 
431                        outputBufferedWriter.write(line);
432                        outputBufferedWriter.flush();
433                        linesWritten++;
434                }
435 
436                /**
437                 * Truncate the output at the last known good point.
438                 * 
439                 * @throws IOException
440                 */
441                public void truncate() throws IOException {
442                        fileChannel.truncate(lastMarkedByteOffsetPosition);
443                        fileChannel.position(lastMarkedByteOffsetPosition);
444                }
445 
446                /**
447                 * Mark the current position.
448                 */
449                public void mark() {
450                        try {
451                                lastMarkedByteOffsetPosition = this.position();
452                        }
453                        catch (IOException e) {
454                                throw new MarkFailedException("Unable to get position for mark", e);
455                        }
456                }
457 
458                /**
459                 * Creates the buffered writer for the output file channel based on
460                 * configuration information.
461                 * @throws IOException
462                 */
463                private void initializeBufferedWriter() throws IOException {
464                        
465                        
466                        File file = resource.getFile();
467 
468                        FileUtils.setUpOutputFile(file, restarted, shouldDeleteIfExists);
469 
470                        fileChannel = (new FileOutputStream(file.getAbsolutePath(), true)).getChannel();
471 
472                        outputBufferedWriter = getBufferedWriter(fileChannel, encoding, bufferSize);
473 
474                        // in case of restarting reset position to last committed point
475                        if (restarted) {
476                                this.reset();
477                        }
478 
479                        initialized = true;
480                        linesWritten = 0;
481                }
482                
483                public boolean isInitialized() {
484                        return initialized;
485                }
486 
487                /**
488                 * Returns the buffered writer opened to the beginning of the file
489                 * specified by the absolute path name contained in absoluteFileName.
490                 */
491                private BufferedWriter getBufferedWriter(FileChannel fileChannel, String encoding, int bufferSize) {
492                        try {
493 
494                                BufferedWriter outputBufferedWriter = null;
495 
496                                // If a buffer was requested, allocate.
497                                if (bufferSize > 0) {
498                                        outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding), bufferSize);
499                                }
500                                else {
501                                        outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding));
502                                }
503 
504                                return outputBufferedWriter;
505                        }
506                        catch (UnsupportedCharsetException ucse) {
507                                throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse);
508                        }
509                }
510 
511                /**
512                 * Resets the file writer's current position to the point stored in the
513                 * last marked byte offset position variable. It first checks to make
514                 * sure the current size of the file is not less than the byte position
515                 * to be moved to (if it is, throws an environment exception), then it
516                 * truncates the file to that reset position, and set the cursor to
517                 * start writing at that point.
518                 */
519                public void reset() throws ResetFailedException {
520                        checkFileSize();
521                        try {
522                                getOutputState().truncate();
523                        }
524                        catch (IOException e) {
525                                throw new ResetFailedException("Unable to truncate file", e);
526                        }
527                }
528 
529                /**
530                 * Checks (on setState) to make sure that the current output file's size
531                 * is not smaller than the last saved commit point. If it is, then the
532                 * file has been damaged in some way and whole task must be started over
533                 * again from the beginning.
534                 */
535                private void checkFileSize() {
536                        long size = -1;
537 
538                        try {
539                                outputBufferedWriter.flush();
540                                size = fileChannel.size();
541                        }
542                        catch (IOException e) {
543                                throw new ResetFailedException("An Error occured while checking file size", e);
544                        }
545 
546                        if (size < lastMarkedByteOffsetPosition) {
547                                throw new ResetFailedException("Current file size is smaller than size at last commit");
548                        }
549                }
550 
551        }
552 
553        public void clear() throws ClearFailedException {
554                lineBuffer.clear();
555        }
556 
557}

[all classes][org.springframework.batch.item.file]
EMMA 2.0.5312 (C) Vladimir Roubtsov