EMMA Coverage Report (generated Tue May 06 07:28:24 PDT 2008)
[all classes][org.springframework.batch.item.file]

COVERAGE SUMMARY FOR SOURCE FILE [FlatFileItemWriter.java]

nameclass, %method, %block, %line, %
FlatFileItemWriter.java67%  (2/3)90%  (28/31)78%  (476/607)84%  (142/170)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FlatFileItemWriter$OutputState100% (1/1)100% (14/14)74%  (244/330)84%  (80/95)
mark (): void 100% (1/1)46%  (6/13)60%  (3/5)
reset (): void 100% (1/1)53%  (8/15)67%  (4/6)
checkFileSize (): void 100% (1/1)57%  (16/28)67%  (6/9)
initializeBufferedWriter (): void 100% (1/1)59%  (65/111)79%  (19/24)
close (): void 100% (1/1)72%  (18/25)80%  (8/10)
getBufferedWriter (FileChannel, String, int): BufferedWriter 100% (1/1)81%  (29/36)88%  (7/8)
FlatFileItemWriter$OutputState (FlatFileItemWriter): void 100% (1/1)100% (30/30)100% (9/9)
position (): long 100% (1/1)100% (16/16)100% (6/6)
restoreFrom (ExecutionContext): void 100% (1/1)100% (12/12)100% (3/3)
setBufferSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setDeleteIfExists (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
truncate (): void 100% (1/1)100% (13/13)100% (3/3)
write (String): void 100% (1/1)100% (19/19)100% (6/6)
     
class FlatFileItemWriter100% (1/1)82%  (14/17)84%  (232/277)83%  (62/75)
setBufferSize (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
setLineAggregator (LineAggregator): void 0%   (0/1)0%   (0/4)0%   (0/2)
setShouldDeleteIfExists (boolean): void 0%   (0/1)0%   (0/4)0%   (0/2)
flush (): void 100% (1/1)64%  (25/39)82%  (9/11)
update (ExecutionContext): void 100% (1/1)74%  (35/47)73%  (8/11)
getOutputState (): FlatFileItemWriter$OutputState 100% (1/1)89%  (54/61)82%  (9/11)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
FlatFileItemWriter (): void 100% (1/1)100% (39/39)100% (10/10)
afterPropertiesSet (): void 100% (1/1)100% (9/9)100% (3/3)
clear (): void 100% (1/1)100% (4/4)100% (2/2)
close (ExecutionContext): void 100% (1/1)100% (10/10)100% (4/4)
open (ExecutionContext): void 100% (1/1)100% (15/15)100% (5/5)
setEncoding (String): void 100% (1/1)100% (4/4)100% (2/2)
setFieldSetCreator (FieldSetCreator): void 100% (1/1)100% (4/4)100% (2/2)
setResource (Resource): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
write (Object): void 100% (1/1)100% (21/21)100% (3/3)
     
class FlatFileItemWriter$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.file;
18 
19import java.io.BufferedWriter;
20import java.io.File;
21import java.io.FileNotFoundException;
22import java.io.FileOutputStream;
23import java.io.IOException;
24import java.nio.channels.Channels;
25import java.nio.channels.FileChannel;
26import java.nio.charset.UnsupportedCharsetException;
27import java.util.ArrayList;
28import java.util.Iterator;
29import java.util.List;
30 
31import org.springframework.batch.item.ClearFailedException;
32import org.springframework.batch.item.ExecutionContext;
33import org.springframework.batch.item.ExecutionContextUserSupport;
34import org.springframework.batch.item.FlushFailedException;
35import org.springframework.batch.item.ItemStream;
36import org.springframework.batch.item.ItemStreamException;
37import org.springframework.batch.item.ItemWriter;
38import org.springframework.batch.item.MarkFailedException;
39import org.springframework.batch.item.ResetFailedException;
40import org.springframework.batch.item.file.mapping.FieldSet;
41import org.springframework.batch.item.file.mapping.FieldSetCreator;
42import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
43import org.springframework.batch.item.file.transform.LineAggregator;
44import org.springframework.beans.factory.InitializingBean;
45import org.springframework.core.io.Resource;
46import org.springframework.dao.DataAccessResourceFailureException;
47import org.springframework.util.Assert;
48import org.springframework.util.ClassUtils;
49 
50/**
51 * This class is an output target that writes data to a file or stream. The
52 * writer also provides restart, statistics and transaction features by
53 * implementing corresponding interfaces where possible (with a file). The
54 * location of the file is defined by a {@link Resource} and must represent a
55 * writable file.<br/>
56 * 
57 * Uses buffered writer to improve performance.<br/>
58 * 
59 * <p>
60 * This class will be updated in the future to use a buffering approach to
61 * handling transactions, rather than outputting directly to the file and
62 * truncating on rollback
63 * </p>
64 * 
65 * @author Waseem Malik
66 * @author Tomas Slanina
67 * @author Robert Kasanicky
68 * @author Dave Syer
69 */
70public class FlatFileItemWriter extends ExecutionContextUserSupport implements ItemWriter, ItemStream, InitializingBean {
71 
72        private static final String LINE_SEPARATOR = System.getProperty("line.separator");
73 
74        private static final String WRITTEN_STATISTICS_NAME = "written";
75 
76        private static final String RESTART_COUNT_STATISTICS_NAME = "restart.count";
77 
78        private static final String RESTART_DATA_NAME = "current.count";
79 
80        private Resource resource;
81 
82        private OutputState state = null;
83 
84        private LineAggregator lineAggregator = new DelimitedLineAggregator();
85 
86        private FieldSetCreator fieldSetCreator;
87 
88        private boolean saveState = false;
89 
90        private boolean shouldDeleteIfExists = true;
91 
92        private String encoding = OutputState.DEFAULT_CHARSET;
93 
94        private int bufferSize = OutputState.DEFAULT_BUFFER_SIZE;
95 
96        private List lineBuffer = new ArrayList();
97 
98        public FlatFileItemWriter() {
99                setName(ClassUtils.getShortName(FlatFileItemWriter.class));
100        }
101 
102        /**
103         * Assert that mandatory properties (resource) are set.
104         * 
105         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
106         */
107        public void afterPropertiesSet() throws Exception {
108                Assert.notNull(resource, "The resource must be set");
109                Assert.notNull(fieldSetCreator, "A FieldSetCreator must be provided.");
110        }
111 
112        /**
113         * Public setter for the {@link LineAggregator}. This will be used to
114         * translate a {@link FieldSet} into a line for output.
115         * 
116         * @param lineAggregator the {@link LineAggregator} to set
117         */
118        public void setLineAggregator(LineAggregator lineAggregator) {
119                this.lineAggregator = lineAggregator;
120        }
121 
122        /**
123         * Public setter for the {@link FieldSetCreator}. This will be used to
124         * transform the item into a {@link FieldSet} before it is aggregated by the
125         * {@link LineAggregator}.
126         * 
127         * @param fieldSetCreator the {@link FieldSetCreator} to set
128         */
129        public void setFieldSetCreator(FieldSetCreator fieldSetCreator) {
130                this.fieldSetCreator = fieldSetCreator;
131        }
132 
133        /**
134         * Setter for resource. Represents a file that can be written.
135         * 
136         * @param resource
137         */
138        public void setResource(Resource resource) {
139                this.resource = resource;
140        }
141 
142        /**
143         * Writes out a string followed by a "new line", where the format of the new
144         * line separator is determined by the underlying operating system. If the
145         * input is not a String and a converter is available the converter will be
146         * applied and then this method recursively called with the result. If the
147         * input is an array or collection each value will be written to a separate
148         * line (recursively calling this method for each value). If no converter is
149         * supplied the input object's toString method will be used.<br/>
150         * 
151         * @param data Object (a String or Object that can be converted) to be
152         * written to output stream
153         * @throws Exception if the transformer or file output fail
154         */
155        public void write(Object data) throws Exception {
156                FieldSet fieldSet = fieldSetCreator.mapItem(data);
157                lineBuffer.add(lineAggregator.aggregate(fieldSet) + LINE_SEPARATOR);
158        }
159 
160        /**
161         * @see ItemStream#close(ExecutionContext)
162         */
163        public void close(ExecutionContext executionContext) {
164                if (state != null) {
165                        getOutputState().close();
166                        state = null;
167                }
168        }
169 
170        /**
171         * Sets encoding for output template.
172         */
173        public void setEncoding(String newEncoding) {
174                this.encoding = newEncoding;
175        }
176 
177        /**
178         * Sets buffer size for output template
179         */
180        public void setBufferSize(int newSize) {
181                this.bufferSize = newSize;
182        }
183 
184        /**
185         * @param shouldDeleteIfExists the shouldDeleteIfExists to set
186         */
187        public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) {
188                this.shouldDeleteIfExists = shouldDeleteIfExists;
189        }
190 
191        /**
192         * Initialize the Output Template.
193         * 
194         * @see ItemStream#open(ExecutionContext)
195         */
196        public void open(ExecutionContext executionContext) {
197                OutputState outputState = getOutputState();
198                if (executionContext.containsKey(getKey(RESTART_DATA_NAME))) {
199                        outputState.restoreFrom(executionContext);
200                }
201                outputState.initializeBufferedWriter();
202        }
203 
204        /**
205         * @see ItemStream#update(ExecutionContext)
206         */
207        public void update(ExecutionContext executionContext) {
208                if (state == null) {
209                        throw new ItemStreamException("ItemStream not open or already closed.");
210                }
211 
212                Assert.notNull(executionContext, "ExecutionContext must not be null");
213 
214                if (saveState) {
215 
216                        try {
217                                executionContext.putLong(getKey(RESTART_DATA_NAME), state.position());
218                        }
219                        catch (IOException e) {
220                                throw new ItemStreamException("ItemStream does not return current position properly", e);
221                        }
222 
223                        executionContext.putLong(getKey(WRITTEN_STATISTICS_NAME), state.linesWritten);
224                        executionContext.putLong(getKey(RESTART_COUNT_STATISTICS_NAME), state.restartCount);
225                }
226        }
227 
228        // Returns object representing state.
229        private OutputState getOutputState() {
230                if (state == null) {
231                        try {
232                                File file = resource.getFile();
233                                Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]");
234                        }
235                        catch (IOException e) {
236                                throw new ItemStreamException("Could not test resource for writable status.", e);
237                        }
238                        state = new OutputState();
239                        state.setDeleteIfExists(shouldDeleteIfExists);
240                        state.setBufferSize(bufferSize);
241                        state.setEncoding(encoding);
242                }
243                return (OutputState) state;
244        }
245 
246        /**
247         * Encapsulates the runtime state of the writer. All state changing
248         * operations on the writer go through this class.
249         */
250        private class OutputState {
251                // default encoding for writing to output files - set to UTF-8.
252                private static final String DEFAULT_CHARSET = "UTF-8";
253 
254                private static final int DEFAULT_BUFFER_SIZE = 2048;
255 
256                // The bufferedWriter over the file channel that is actually written
257                BufferedWriter outputBufferedWriter;
258 
259                FileChannel fileChannel;
260 
261                // this represents the charset encoding (if any is needed) for the
262                // output file
263                String encoding = DEFAULT_CHARSET;
264 
265                // Optional write buffer size
266                int bufferSize = DEFAULT_BUFFER_SIZE;
267 
268                boolean restarted = false;
269 
270                boolean initialized = false;
271 
272                long lastMarkedByteOffsetPosition = 0;
273 
274                long linesWritten = 0;
275 
276                long restartCount = 0;
277 
278                boolean shouldDeleteIfExists = true;
279 
280                /**
281                 * Return the byte offset position of the cursor in the output file as a
282                 * long integer.
283                 */
284                public long position() throws IOException {
285                        long pos = 0;
286 
287                        if (fileChannel == null) {
288                                return 0;
289                        }
290 
291                        outputBufferedWriter.flush();
292                        pos = fileChannel.position();
293 
294                        return pos;
295 
296                }
297 
298                /**
299                 * @param executionContext
300                 */
301                public void restoreFrom(ExecutionContext executionContext) {
302                        lastMarkedByteOffsetPosition = executionContext.getLong(getKey(RESTART_DATA_NAME));
303                        restarted = true;
304                }
305 
306                /**
307                 * @param shouldDeleteIfExists
308                 */
309                public void setDeleteIfExists(boolean shouldDeleteIfExists) {
310                        this.shouldDeleteIfExists = shouldDeleteIfExists;
311                }
312 
313                /**
314                 * @param bufferSize
315                 */
316                public void setBufferSize(int bufferSize) {
317                        this.bufferSize = bufferSize;
318                }
319 
320                /**
321                 * @param encoding
322                 */
323                public void setEncoding(String encoding) {
324                        this.encoding = encoding;
325                }
326 
327                /**
328                 * Close the open resource and reset counters.
329                 */
330                public void close() {
331                        initialized = false;
332                        restarted = false;
333                        try {
334                                if (outputBufferedWriter == null) {
335                                        return;
336                                }
337                                outputBufferedWriter.close();
338                                fileChannel.close();
339                        }
340                        catch (IOException ioe) {
341                                throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
342                        }
343                }
344 
345                /**
346                 * @param line
347                 * @throws IOException
348                 */
349                public void write(String line) throws IOException {
350                        if (!initialized) {
351                                initializeBufferedWriter();
352                        }
353 
354                        outputBufferedWriter.write(line);
355                        outputBufferedWriter.flush();
356                        linesWritten++;
357                }
358 
359                /**
360                 * Truncate the output at the last known good point.
361                 * 
362                 * @throws IOException
363                 */
364                public void truncate() throws IOException {
365                        fileChannel.truncate(lastMarkedByteOffsetPosition);
366                        fileChannel.position(lastMarkedByteOffsetPosition);
367                }
368 
369                /**
370                 * Mark the current position.
371                 */
372                public void mark() {
373                        try {
374                                lastMarkedByteOffsetPosition = this.position();
375                        }
376                        catch (IOException e) {
377                                throw new MarkFailedException("Unable to get position for mark", e);
378                        }
379                }
380 
381                /**
382                 * Creates the buffered writer for the output file channel based on
383                 * configuration information.
384                 */
385                private void initializeBufferedWriter() {
386                        File file;
387 
388                        try {
389                                file = resource.getFile();
390 
391                                // If the output source was restarted, keep existing file.
392                                // If the output source was not restarted, check following:
393                                // - if the file should be deleted, delete it if it was exiting
394                                // and create blank file,
395                                // - if the file should not be deleted, if it already exists,
396                                // throw an exception,
397                                // - if the file was not existing, create new.
398                                if (!restarted) {
399                                        if (file.exists()) {
400                                                if (shouldDeleteIfExists) {
401                                                        file.delete();
402                                                }
403                                                else {
404                                                        throw new ItemStreamException("Resource already exists: " + resource);
405                                                }
406                                        }
407                                        String parent = file.getParent();
408                                        if (parent != null) {
409                                                new File(parent).mkdirs();
410                                        }
411                                        file.createNewFile();
412                                }
413 
414                        }
415                        catch (IOException ioe) {
416                                throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]",
417                                                ioe);
418                        }
419 
420                        try {
421                                fileChannel = (new FileOutputStream(file.getAbsolutePath(), true)).getChannel();
422                        }
423                        catch (FileNotFoundException fnfe) {
424                                throw new ItemStreamException("Bad filename property parameter " + file, fnfe);
425                        }
426 
427                        outputBufferedWriter = getBufferedWriter(fileChannel, encoding, bufferSize);
428 
429                        // in case of restarting reset position to last committed point
430                        if (restarted) {
431                                this.reset();
432                        }
433 
434                        initialized = true;
435                        linesWritten = 0;
436                }
437 
438                /**
439                 * Returns the buffered writer opened to the beginning of the file
440                 * specified by the absolute path name contained in absoluteFileName.
441                 */
442                private BufferedWriter getBufferedWriter(FileChannel fileChannel, String encoding, int bufferSize) {
443                        try {
444 
445                                BufferedWriter outputBufferedWriter = null;
446 
447                                // If a buffer was requested, allocate.
448                                if (bufferSize > 0) {
449                                        outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding), bufferSize);
450                                }
451                                else {
452                                        outputBufferedWriter = new BufferedWriter(Channels.newWriter(fileChannel, encoding));
453                                }
454 
455                                return outputBufferedWriter;
456                        }
457                        catch (UnsupportedCharsetException ucse) {
458                                throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse);
459                        }
460                }
461 
462                /**
463                 * Resets the file writer's current position to the point stored in the
464                 * last marked byte offset position variable. It first checks to make
465                 * sure the current size of the file is not less than the byte position
466                 * to be moved to (if it is, throws an environment exception), then it
467                 * truncates the file to that reset position, and set the cursor to
468                 * start writing at that point.
469                 */
470                public void reset() throws ResetFailedException {
471                        checkFileSize();
472                        try {
473                                getOutputState().truncate();
474                        }
475                        catch (IOException e) {
476                                throw new ResetFailedException("Unable to truncate file", e);
477                        }
478                }
479 
480                /**
481                 * Checks (on setState) to make sure that the current output file's size
482                 * is not smaller than the last saved commit point. If it is, then the
483                 * file has been damaged in some way and whole task must be started over
484                 * again from the beginning.
485                 */
486                private void checkFileSize() {
487                        long size = -1;
488 
489                        try {
490                                outputBufferedWriter.flush();
491                                size = fileChannel.size();
492                        }
493                        catch (IOException e) {
494                                throw new ResetFailedException("An Error occured while checking file size", e);
495                        }
496 
497                        if (size < lastMarkedByteOffsetPosition) {
498                                throw new ResetFailedException("Current file size is smaller than size at last commit");
499                        }
500                }
501 
502        }
503 
504        public void clear() throws ClearFailedException {
505                lineBuffer.clear();
506        }
507 
508        public void flush() throws FlushFailedException {
509                OutputState state = getOutputState();
510                for (Iterator iterator = lineBuffer.listIterator(); iterator.hasNext();) {
511                        String line = (String) iterator.next();
512                        try {
513                                state.write(line);
514                        }
515                        catch (IOException e) {
516                                throw new FlushFailedException("Failed to write line to output file: " + line, e);
517                        }
518                }
519                lineBuffer.clear();
520                state.mark();
521        }
522 
523        /**
524         * Set the boolean indicating whether or not state should be saved in the
525         * provided {@link ExecutionContext} during the {@link ItemStream} call to
526         * update. Setting this to false means that it will always start at the
527         * beginning.
528         * 
529         * @param saveState
530         */
531        public void setSaveState(boolean saveState) {
532                this.saveState = saveState;
533        }
534 
535}

[all classes][org.springframework.batch.item.file]
EMMA 2.0.5312 (C) Vladimir Roubtsov