1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.util.Arrays; |
20 | import java.util.Comparator; |
21 | |
22 | import org.apache.commons.logging.Log; |
23 | import org.apache.commons.logging.LogFactory; |
24 | import org.springframework.batch.item.ExecutionContext; |
25 | import org.springframework.batch.item.ItemStream; |
26 | import org.springframework.batch.item.ItemStreamException; |
27 | import org.springframework.batch.item.ParseException; |
28 | import org.springframework.batch.item.ResourceAware; |
29 | import org.springframework.batch.item.UnexpectedInputException; |
30 | import org.springframework.batch.item.support.AbstractItemStreamItemReader; |
31 | import org.springframework.core.io.Resource; |
32 | import org.springframework.util.Assert; |
33 | import org.springframework.util.ClassUtils; |
34 | |
35 | /** |
36 | * Reads items from multiple resources sequentially - resource list is given by {@link #setResources(Resource[])}, the |
37 | * actual reading is delegated to {@link #setDelegate(ResourceAwareItemReaderItemStream)}. |
38 | * |
39 | * Input resources are ordered using {@link #setComparator(Comparator)} to make sure resource ordering is preserved |
40 | * between job runs in restart scenario. |
41 | * |
42 | * |
43 | * @author Robert Kasanicky |
44 | * @author Lucas Ward |
45 | */ |
46 | public class MultiResourceItemReader<T> extends AbstractItemStreamItemReader<T> { |
47 | |
48 | private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class); |
49 | |
50 | private static final String RESOURCE_KEY = "resourceIndex"; |
51 | |
52 | private ResourceAwareItemReaderItemStream<? extends T> delegate; |
53 | |
54 | private Resource[] resources; |
55 | |
56 | private boolean saveState = true; |
57 | |
58 | private int currentResource = -1; |
59 | |
60 | // signals there are no resources to read -> just return null on first read |
61 | private boolean noInput; |
62 | |
63 | private boolean strict = false; |
64 | |
65 | /** |
66 | * In strict mode the reader will throw an exception on |
67 | * {@link #open(org.springframework.batch.item.ExecutionContext)} if there are no resources to read. |
68 | * @param strict false by default |
69 | */ |
70 | public void setStrict(boolean strict) { |
71 | this.strict = strict; |
72 | } |
73 | |
74 | private Comparator<Resource> comparator = new Comparator<Resource>() { |
75 | |
76 | /** |
77 | * Compares resource filenames. |
78 | */ |
79 | @Override |
80 | public int compare(Resource r1, Resource r2) { |
81 | return r1.getFilename().compareTo(r2.getFilename()); |
82 | } |
83 | |
84 | }; |
85 | |
86 | public MultiResourceItemReader() { |
87 | this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemReader.class)); |
88 | } |
89 | |
90 | /** |
91 | * Reads the next item, jumping to next resource if necessary. |
92 | */ |
93 | @Override |
94 | public T read() throws Exception, UnexpectedInputException, ParseException { |
95 | |
96 | if (noInput) { |
97 | return null; |
98 | } |
99 | |
100 | // If there is no resource, then this is the first item, set the current |
101 | // resource to 0 and open the first delegate. |
102 | if (currentResource == -1) { |
103 | currentResource = 0; |
104 | delegate.setResource(resources[currentResource]); |
105 | delegate.open(new ExecutionContext()); |
106 | } |
107 | |
108 | return readNextItem(); |
109 | } |
110 | |
111 | /** |
112 | * Use the delegate to read the next item, jump to next resource if current one is exhausted. Items are appended to |
113 | * the buffer. |
114 | * |
115 | * @return next item from input |
116 | */ |
117 | private T readNextItem() throws Exception { |
118 | |
119 | T item = readFromDelegate(); |
120 | |
121 | while (item == null) { |
122 | |
123 | currentResource++; |
124 | |
125 | if (currentResource >= resources.length) { |
126 | return null; |
127 | } |
128 | |
129 | delegate.close(); |
130 | delegate.setResource(resources[currentResource]); |
131 | delegate.open(new ExecutionContext()); |
132 | |
133 | item = readFromDelegate(); |
134 | } |
135 | |
136 | return item; |
137 | } |
138 | |
139 | private T readFromDelegate() throws Exception { |
140 | T item = delegate.read(); |
141 | if(item instanceof ResourceAware){ |
142 | ((ResourceAware) item).setResource(getCurrentResource()); |
143 | } |
144 | return item; |
145 | } |
146 | |
147 | /** |
148 | * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader and reset instance variable values. |
149 | */ |
150 | @Override |
151 | public void close() throws ItemStreamException { |
152 | super.close(); |
153 | delegate.close(); |
154 | noInput = false; |
155 | } |
156 | |
157 | /** |
158 | * Figure out which resource to start with in case of restart, open the delegate and restore delegate's position in |
159 | * the resource. |
160 | */ |
161 | @Override |
162 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
163 | super.open(executionContext); |
164 | Assert.notNull(resources, "Resources must be set"); |
165 | |
166 | noInput = false; |
167 | if (resources.length == 0) { |
168 | if (strict) { |
169 | throw new IllegalStateException( |
170 | "No resources to read. Set strict=false if this is not an error condition."); |
171 | } |
172 | else { |
173 | logger.warn("No resources to read. Set strict=true if this should be an error condition."); |
174 | noInput = true; |
175 | return; |
176 | } |
177 | } |
178 | |
179 | Arrays.sort(resources, comparator); |
180 | |
181 | if (executionContext.containsKey(getExecutionContextKey(RESOURCE_KEY))) { |
182 | currentResource = executionContext.getInt(getExecutionContextKey(RESOURCE_KEY)); |
183 | |
184 | // context could have been saved before reading anything |
185 | if (currentResource == -1) { |
186 | currentResource = 0; |
187 | } |
188 | |
189 | delegate.setResource(resources[currentResource]); |
190 | delegate.open(executionContext); |
191 | } |
192 | else { |
193 | currentResource = -1; |
194 | } |
195 | } |
196 | |
197 | /** |
198 | * Store the current resource index and position in the resource. |
199 | */ |
200 | @Override |
201 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
202 | super.update(executionContext); |
203 | if (saveState) { |
204 | executionContext.putInt(getExecutionContextKey(RESOURCE_KEY), currentResource); |
205 | delegate.update(executionContext); |
206 | } |
207 | } |
208 | |
209 | /** |
210 | * @param delegate reads items from single {@link Resource}. |
211 | */ |
212 | public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) { |
213 | this.delegate = delegate; |
214 | } |
215 | |
216 | /** |
217 | * Set the boolean indicating whether or not state should be saved in the provided {@link ExecutionContext} during |
218 | * the {@link ItemStream} call to update. |
219 | * |
220 | * @param saveState |
221 | */ |
222 | public void setSaveState(boolean saveState) { |
223 | this.saveState = saveState; |
224 | } |
225 | |
226 | /** |
227 | * @param comparator used to order the injected resources, by default compares {@link Resource#getFilename()} |
228 | * values. |
229 | */ |
230 | public void setComparator(Comparator<Resource> comparator) { |
231 | this.comparator = comparator; |
232 | } |
233 | |
234 | /** |
235 | * @param resources input resources |
236 | */ |
237 | public void setResources(Resource[] resources) { |
238 | Assert.notNull(resources, "The resources must not be null"); |
239 | this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]); |
240 | } |
241 | |
242 | public Resource getCurrentResource() { |
243 | if (currentResource >= resources.length || currentResource < 0) { |
244 | return null; |
245 | } |
246 | return resources[currentResource]; |
247 | } |
248 | |
249 | } |