EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.item.file]

COVERAGE SUMMARY FOR SOURCE FILE [MultiResourceItemReader.java]

nameclass, %method, %block, %line, %
MultiResourceItemReader.java100% (2/2)94%  (15/16)98%  (261/267)99%  (70/71)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class MultiResourceItemReader$1100% (1/1)50%  (1/2)50%  (6/12)50%  (1/2)
compare (Resource, Resource): int 0%   (0/1)0%   (0/6)0%   (0/1)
MultiResourceItemReader$1 (MultiResourceItemReader): void 100% (1/1)100% (6/6)100% (1/1)
     
class MultiResourceItemReader100% (1/1)100% (14/14)100% (255/255)100% (70/70)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
MultiResourceItemReader (): void 100% (1/1)100% (22/22)100% (7/7)
close (): void 100% (1/1)100% (9/9)100% (4/4)
getCurrentResource (): Resource 100% (1/1)100% (17/17)100% (3/3)
open (ExecutionContext): void 100% (1/1)100% (71/71)100% (18/18)
read (): Object 100% (1/1)100% (29/29)100% (7/7)
readFromDelegate (): Object 100% (1/1)100% (14/14)100% (4/4)
readNextItem (): Object 100% (1/1)100% (42/42)100% (10/10)
setComparator (Comparator): void 100% (1/1)100% (4/4)100% (2/2)
setDelegate (ResourceAwareItemReaderItemStream): void 100% (1/1)100% (4/4)100% (2/2)
setResources (Resource []): void 100% (1/1)100% (13/13)100% (3/3)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setStrict (boolean): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (18/18)100% (5/5)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.file;
18 
19import java.util.Arrays;
20import java.util.Comparator;
21 
22import org.apache.commons.logging.Log;
23import org.apache.commons.logging.LogFactory;
24import org.springframework.batch.item.ExecutionContext;
25import org.springframework.batch.item.ItemStream;
26import org.springframework.batch.item.ItemStreamException;
27import org.springframework.batch.item.ParseException;
28import org.springframework.batch.item.ResourceAware;
29import org.springframework.batch.item.UnexpectedInputException;
30import org.springframework.batch.item.support.AbstractItemStreamItemReader;
31import org.springframework.core.io.Resource;
32import org.springframework.util.Assert;
33import org.springframework.util.ClassUtils;
34 
35/**
36 * Reads items from multiple resources sequentially - resource list is given by {@link #setResources(Resource[])}, the
37 * actual reading is delegated to {@link #setDelegate(ResourceAwareItemReaderItemStream)}.
38 * 
39 * Input resources are ordered using {@link #setComparator(Comparator)} to make sure resource ordering is preserved
40 * between job runs in restart scenario.
41 * 
42 * 
43 * @author Robert Kasanicky
44 * @author Lucas Ward
45 */
46public class MultiResourceItemReader<T> extends AbstractItemStreamItemReader<T> {
47 
48        private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class);
49 
50        private static final String RESOURCE_KEY = "resourceIndex";
51 
52        private ResourceAwareItemReaderItemStream<? extends T> delegate;
53 
54        private Resource[] resources;
55 
56        private boolean saveState = true;
57 
58        private int currentResource = -1;
59 
60        // signals there are no resources to read -> just return null on first read
61        private boolean noInput;
62 
63        private boolean strict = false;
64 
65        /**
66         * In strict mode the reader will throw an exception on
67         * {@link #open(org.springframework.batch.item.ExecutionContext)} if there are no resources to read.
68         * @param strict false by default
69         */
70        public void setStrict(boolean strict) {
71                this.strict = strict;
72        }
73 
74        private Comparator<Resource> comparator = new Comparator<Resource>() {
75 
76                /**
77                 * Compares resource filenames.
78                 */
79                @Override
80                public int compare(Resource r1, Resource r2) {
81                        return r1.getFilename().compareTo(r2.getFilename());
82                }
83 
84        };
85 
86        public MultiResourceItemReader() {
87                this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemReader.class));
88        }
89 
90        /**
91         * Reads the next item, jumping to next resource if necessary.
92         */
93        @Override
94        public T read() throws Exception, UnexpectedInputException, ParseException {
95 
96                if (noInput) {
97                        return null;
98                }
99 
100                // If there is no resource, then this is the first item, set the current
101                // resource to 0 and open the first delegate.
102                if (currentResource == -1) {
103                        currentResource = 0;
104                        delegate.setResource(resources[currentResource]);
105                        delegate.open(new ExecutionContext());
106                }
107 
108                return readNextItem();
109        }
110 
111        /**
112         * Use the delegate to read the next item, jump to next resource if current one is exhausted. Items are appended to
113         * the buffer.
114         * 
115         * @return next item from input
116         */
117        private T readNextItem() throws Exception {
118 
119                T item = readFromDelegate();
120 
121                while (item == null) {
122 
123                        currentResource++;
124 
125                        if (currentResource >= resources.length) {
126                                return null;
127                        }
128 
129                        delegate.close();
130                        delegate.setResource(resources[currentResource]);
131                        delegate.open(new ExecutionContext());
132 
133                        item = readFromDelegate();
134                }
135 
136                return item;
137        }
138 
139        private T readFromDelegate() throws Exception {
140                T item = delegate.read();
141                if(item instanceof ResourceAware){
142                        ((ResourceAware) item).setResource(getCurrentResource());
143                }
144                return item;
145        }
146 
147        /**
148         * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader and reset instance variable values.
149         */
150        @Override
151        public void close() throws ItemStreamException {
152                super.close();
153                delegate.close();
154                noInput = false;
155        }
156 
157        /**
158         * Figure out which resource to start with in case of restart, open the delegate and restore delegate's position in
159         * the resource.
160         */
161        @Override
162        public void open(ExecutionContext executionContext) throws ItemStreamException {
163                super.open(executionContext);
164                Assert.notNull(resources, "Resources must be set");
165 
166                noInput = false;
167                if (resources.length == 0) {
168                        if (strict) {
169                                throw new IllegalStateException(
170                                                "No resources to read. Set strict=false if this is not an error condition.");
171                        }
172                        else {
173                                logger.warn("No resources to read. Set strict=true if this should be an error condition.");
174                                noInput = true;
175                                return;
176                        }
177                }
178 
179                Arrays.sort(resources, comparator);
180 
181                if (executionContext.containsKey(getExecutionContextKey(RESOURCE_KEY))) {
182                        currentResource = executionContext.getInt(getExecutionContextKey(RESOURCE_KEY));
183 
184                        // context could have been saved before reading anything
185                        if (currentResource == -1) {
186                                currentResource = 0;
187                        }
188 
189                        delegate.setResource(resources[currentResource]);
190                        delegate.open(executionContext);
191                }
192                else {
193                        currentResource = -1;
194                }
195        }
196 
197        /**
198         * Store the current resource index and position in the resource.
199         */
200        @Override
201        public void update(ExecutionContext executionContext) throws ItemStreamException {
202                super.update(executionContext);
203                if (saveState) {
204                        executionContext.putInt(getExecutionContextKey(RESOURCE_KEY), currentResource);
205                        delegate.update(executionContext);
206                }
207        }
208 
209        /**
210         * @param delegate reads items from single {@link Resource}.
211         */
212        public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
213                this.delegate = delegate;
214        }
215 
216        /**
217         * Set the boolean indicating whether or not state should be saved in the provided {@link ExecutionContext} during
218         * the {@link ItemStream} call to update.
219         * 
220         * @param saveState
221         */
222        public void setSaveState(boolean saveState) {
223                this.saveState = saveState;
224        }
225 
226        /**
227         * @param comparator used to order the injected resources, by default compares {@link Resource#getFilename()}
228         * values.
229         */
230        public void setComparator(Comparator<Resource> comparator) {
231                this.comparator = comparator;
232        }
233 
234        /**
235         * @param resources input resources
236         */
237        public void setResources(Resource[] resources) {
238                Assert.notNull(resources, "The resources must not be null");
239                this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]);
240        }
241 
242        public Resource getCurrentResource() {
243                if (currentResource >= resources.length || currentResource < 0) {
244                        return null;
245                }
246                return resources[currentResource];
247        }
248 
249}

[all classes][org.springframework.batch.item.file]
EMMA 2.0.5312 (C) Vladimir Roubtsov