EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.item.file]

COVERAGE SUMMARY FOR SOURCE FILE [MultiResourceItemReader.java]

nameclass, %method, %block, %line, %
MultiResourceItemReader.java100% (3/3)96%  (23/24)95%  (435/460)96%  (111.8/117)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class MultiResourceItemReader$1100% (1/1)50%  (1/2)33%  (6/18)25%  (1/4)
compare (Object, Object): int 0%   (0/1)0%   (0/12)0%   (0/3)
MultiResourceItemReader$1 (MultiResourceItemReader): void 100% (1/1)100% (6/6)100% (1/1)
     
class MultiResourceItemReader100% (1/1)100% (15/15)96%  (321/334)98%  (86.8/89)
MultiResourceItemReader (): void 100% (1/1)88%  (36/41)99%  (8.9/9)
open (ExecutionContext): void 100% (1/1)90%  (66/73)89%  (16/18)
<static initializer> 100% (1/1)93%  (14/15)96%  (1.9/2)
close (ExecutionContext): void 100% (1/1)100% (26/26)100% (7/7)
emptyBuffer (): void 100% (1/1)100% (27/27)100% (6/6)
mark (): void 100% (1/1)100% (9/9)100% (4/4)
read (): Object 100% (1/1)100% (20/20)100% (7/7)
readBufferedItem (): Object 100% (1/1)100% (27/27)100% (8/8)
readNextItem (): Object 100% (1/1)100% (56/56)100% (12/12)
reset (): void 100% (1/1)100% (16/16)100% (5/5)
setComparator (Comparator): void 100% (1/1)100% (4/4)100% (2/2)
setDelegate (ResourceAwareItemReaderItemStream): void 100% (1/1)100% (4/4)100% (2/2)
setResources (Resource []): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (8/8)100% (3/3)
     
class MultiResourceItemReader$MultiResourceIndex100% (1/1)100% (7/7)100% (108/108)100% (24/24)
MultiResourceItemReader$MultiResourceIndex (MultiResourceItemReader): void 100% (1/1)100% (18/18)100% (5/5)
incrementItemCount (): void 100% (1/1)100% (7/7)100% (2/2)
incrementResourceCount (): void 100% (1/1)100% (10/10)100% (3/3)
mark (): void 100% (1/1)100% (9/9)100% (3/3)
open (ExecutionContext): void 100% (1/1)100% (33/33)100% (5/5)
reset (): void 100% (1/1)100% (9/9)100% (3/3)
update (ExecutionContext): void 100% (1/1)100% (22/22)100% (3/3)

1package org.springframework.batch.item.file;
2 
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Comparator;
6import java.util.List;
7import java.util.ListIterator;
8 
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.springframework.batch.item.ExecutionContext;
12import org.springframework.batch.item.ItemReader;
13import org.springframework.batch.item.ItemStream;
14import org.springframework.batch.item.ItemStreamException;
15import org.springframework.batch.item.MarkFailedException;
16import org.springframework.batch.item.NoWorkFoundException;
17import org.springframework.batch.item.ParseException;
18import org.springframework.batch.item.ResetFailedException;
19import org.springframework.batch.item.UnexpectedInputException;
20import org.springframework.batch.item.util.ExecutionContextUserSupport;
21import org.springframework.core.io.Resource;
22import org.springframework.util.Assert;
23import org.springframework.util.ClassUtils;
24 
25/**
26 * Reads items from multiple resources sequentially - resource list is given by
27 * {@link #setResources(Resource[])}, the actual reading is delegated to
28 * {@link #setDelegate(ResourceAwareItemReaderItemStream)}.
29 * 
30 * Input resources are ordered using {@link #setComparator(Comparator)} to make
31 * sure resource ordering is preserved between job runs in restart scenario.
32 * 
33 * Reset (rollback) capability is implemented by item buffering.
34 * 
35 * 
36 * @author Robert Kasanicky
37 */
38public class MultiResourceItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream {
39        
40        private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class);
41 
42        /**
43         * Unique object instance that marks resource boundaries in the item buffer
44         */
45        private static final Object END_OF_RESOURCE_MARKER = new Object();
46 
47        private ResourceAwareItemReaderItemStream delegate;
48 
49        private Resource[] resources;
50 
51        private MultiResourceIndex index = new MultiResourceIndex();
52 
53        private List itemBuffer = new ArrayList();
54 
55        private ListIterator itemBufferIterator = null;
56 
57        private boolean shouldReadBuffer = false;
58 
59        private boolean saveState = false;
60 
61        private Comparator comparator = new Comparator() {
62 
63                /**
64                 * Compares resource filenames.
65                 */
66                public int compare(Object o1, Object o2) {
67                        Resource r1 = (Resource) o1;
68                        Resource r2 = (Resource) o2;
69                        return r1.getFilename().compareTo(r2.getFilename());
70                }
71 
72        };
73 
74        private boolean noInput;
75 
76        public MultiResourceItemReader() {
77                setName(ClassUtils.getShortName(MultiResourceItemReader.class));
78        }
79 
80        /**
81         * Reads the next item, jumping to next resource if necessary.
82         */
83        public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException {
84 
85                if (noInput) {
86                        return null;
87                }
88                
89                Object item;
90                if (shouldReadBuffer) {
91                        item = readBufferedItem();
92                }
93                else {
94                        item = readNextItem();
95                }
96 
97                index.incrementItemCount();
98 
99                return item;
100        }
101 
102        /**
103         * Use the delegate to read the next item, jump to next resource if current
104         * one is exhausted. Items are appended to the buffer.
105         * @return next item from input
106         */
107        private Object readNextItem() throws Exception {
108 
109                Object item = delegate.read();
110 
111                while (item == null) {
112 
113                        index.incrementResourceCount();
114 
115                        if (index.currentResource >= resources.length) {
116                                return null;
117                        }
118                        itemBuffer.add(END_OF_RESOURCE_MARKER);
119 
120                        delegate.close(new ExecutionContext());
121                        delegate.setResource(resources[index.currentResource]);
122                        delegate.open(new ExecutionContext());
123 
124                        item = delegate.read();
125                }
126 
127                itemBuffer.add(item);
128 
129                return item;
130        }
131 
132        /**
133         * Read next item from buffer while keeping track of the position within the
134         * input for possible restart.
135         * @return next item from buffer
136         */
137        private Object readBufferedItem() {
138 
139                Object buffered = itemBufferIterator.next();
140                while (buffered == END_OF_RESOURCE_MARKER) {
141                        index.incrementResourceCount();
142                        buffered = itemBufferIterator.next();
143                }
144 
145                if (!itemBufferIterator.hasNext()) {
146                        // buffer is exhausted, continue reading from file
147                        shouldReadBuffer = false;
148                        itemBufferIterator = null;
149                }
150                return buffered;
151        }
152 
153        /**
154         * Remove the longer needed items from buffer, mark the index position and
155         * call mark() on delegate so that it clears its buffers.
156         */
157        public void mark() throws MarkFailedException {
158                emptyBuffer();
159 
160                index.mark();
161 
162                delegate.mark();
163        }
164 
165        /**
166         * Discard the buffered items that have already been read.
167         */
168        private void emptyBuffer() {
169                if (!shouldReadBuffer) {
170                        itemBuffer.clear();
171                        itemBufferIterator = null;
172                }
173                else {
174                        itemBuffer = itemBuffer.subList(itemBufferIterator.nextIndex(), itemBuffer.size());
175                        itemBufferIterator = itemBuffer.listIterator();
176                }
177        }
178 
179        /**
180         * Switches to 'read from buffer' state.
181         * 
182         * @see ItemReader#reset()
183         */
184        public void reset() throws ResetFailedException {
185                if (!itemBuffer.isEmpty()) {
186                        shouldReadBuffer = true;
187                        itemBufferIterator = itemBuffer.listIterator();
188                }
189                index.reset();
190        }
191 
192        /**
193         * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader
194         * and reset instance variable values.
195         */
196        public void close(ExecutionContext executionContext) throws ItemStreamException {
197                noInput = false;
198                shouldReadBuffer = false;
199                itemBufferIterator = null;
200                index = new MultiResourceIndex();
201                itemBuffer.clear();
202                delegate.close(new ExecutionContext());
203        }
204 
205        /**
206         * Figure out which resource to start with in case of restart, open the
207         * delegate and restore delegate's position in the resource.
208         */
209        public void open(ExecutionContext executionContext) throws ItemStreamException {
210 
211                Assert.notNull(resources, "There must be at least one input resource");
212                Assert.notNull(delegate, "Delegate must not be null");
213                
214                noInput = false;
215                if (resources.length == 0) {
216                        logger.warn("No resources to read");
217                        noInput = true;
218                        return;
219                }
220                
221                Arrays.sort(resources, comparator);
222 
223                index.open(executionContext);
224 
225                delegate.setResource(resources[index.currentResource]);
226 
227                delegate.open(new ExecutionContext());
228 
229                try {
230                        for (int i = 0; i < index.currentItem; i++) {
231                                delegate.read();
232                                delegate.mark();
233                        }
234                }
235                catch (Exception e) {
236                        throw new ItemStreamException("Could not restore position on restart", e);
237                }
238        }
239 
240        /**
241         * Store the current resource index and position in the resource.
242         */
243        public void update(ExecutionContext executionContext) throws ItemStreamException {
244                if (saveState) {
245                        index.update(executionContext);
246                }
247        }
248 
249        /**
250         * @param delegate reads items from single {@link Resource}.
251         */
252        public void setDelegate(ResourceAwareItemReaderItemStream delegate) {
253                this.delegate = delegate;
254        }
255 
256        /**
257         * Set the boolean indicating whether or not state should be saved in the
258         * provided {@link ExecutionContext} during the {@link ItemStream} call to
259         * update.
260         * 
261         * @param saveState
262         */
263        public void setSaveState(boolean saveState) {
264                this.saveState = saveState;
265        }
266 
267        /**
268         * @param comparator used to order the injected resources, by default
269         * compares {@link Resource#getFilename()} values.
270         */
271        public void setComparator(Comparator comparator) {
272                this.comparator = comparator;
273        }
274 
275        /**
276         * @param resources input resources
277         */
278        public void setResources(Resource[] resources) {
279                this.resources = resources;
280        }
281 
282        /**
283         * Facilitates keeping track of the position within multi-resource input.
284         */
285        private class MultiResourceIndex {
286 
287                private static final String RESOURCE_KEY = "resourceIndex";
288 
289                private static final String ITEM_KEY = "itemIndex";
290 
291                private int currentResource = 0;
292 
293                private int markedResource = 0;
294 
295                private long currentItem = 0;
296 
297                private long markedItem = 0;
298 
299                public void incrementItemCount() {
300                        currentItem++;
301                }
302 
303                public void incrementResourceCount() {
304                        currentResource++;
305                        currentItem = 0;
306                }
307 
308                public void mark() {
309                        markedResource = currentResource;
310                        markedItem = currentItem;
311                }
312 
313                public void reset() {
314                        currentResource = markedResource;
315                        currentItem = markedItem;
316                }
317 
318                public void open(ExecutionContext ctx) {
319                        if (ctx.containsKey(getKey(RESOURCE_KEY))) {
320                                currentResource = Long.valueOf(ctx.getLong(getKey(RESOURCE_KEY))).intValue();
321                        }
322 
323                        if (ctx.containsKey(getKey(ITEM_KEY))) {
324                                currentItem = ctx.getLong(getKey(ITEM_KEY));
325                        }
326                }
327 
328                public void update(ExecutionContext ctx) {
329                        ctx.putLong(getKey(RESOURCE_KEY), index.currentResource);
330                        ctx.putLong(getKey(ITEM_KEY), index.currentItem);
331                }
332        }
333 
334}

[all classes][org.springframework.batch.item.file]
EMMA 2.0.5312 (C) Vladimir Roubtsov