1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.file; |
18 | |
19 | import java.util.Arrays; |
20 | import java.util.Comparator; |
21 | |
22 | import org.apache.commons.logging.Log; |
23 | import org.apache.commons.logging.LogFactory; |
24 | import org.springframework.batch.item.ExecutionContext; |
25 | import org.springframework.batch.item.ItemReader; |
26 | import org.springframework.batch.item.ItemStream; |
27 | import org.springframework.batch.item.ItemStreamException; |
28 | import org.springframework.batch.item.ParseException; |
29 | import org.springframework.batch.item.UnexpectedInputException; |
30 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
31 | import org.springframework.core.io.Resource; |
32 | import org.springframework.util.Assert; |
33 | import org.springframework.util.ClassUtils; |
34 | |
35 | /** |
36 | * Reads items from multiple resources sequentially - resource list is given by {@link #setResources(Resource[])}, the |
37 | * actual reading is delegated to {@link #setDelegate(ResourceAwareItemReaderItemStream)}. |
38 | * |
39 | * Input resources are ordered using {@link #setComparator(Comparator)} to make sure resource ordering is preserved |
40 | * between job runs in restart scenario. |
41 | * |
42 | * |
43 | * @author Robert Kasanicky |
44 | * @author Lucas Ward |
45 | */ |
46 | public class MultiResourceItemReader<T> implements ItemReader<T>, ItemStream { |
47 | |
48 | private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class); |
49 | |
50 | private static final String RESOURCE_KEY = "resourceIndex"; |
51 | |
52 | private final ExecutionContextUserSupport executionContextUserSupport = new ExecutionContextUserSupport(); |
53 | |
54 | private ResourceAwareItemReaderItemStream<? extends T> delegate; |
55 | |
56 | private Resource[] resources; |
57 | |
58 | private boolean saveState = true; |
59 | |
60 | private int currentResource = -1; |
61 | |
62 | // signals there are no resources to read -> just return null on first read |
63 | private boolean noInput; |
64 | |
65 | private boolean strict = false; |
66 | |
67 | /** |
68 | * In strict mode the reader will throw an exception on |
69 | * {@link #open(org.springframework.batch.item.ExecutionContext)} if there are no resources to read. |
70 | * @param strict false by default |
71 | */ |
72 | public void setStrict(boolean strict) { |
73 | this.strict = strict; |
74 | } |
75 | |
76 | private Comparator<Resource> comparator = new Comparator<Resource>() { |
77 | |
78 | /** |
79 | * Compares resource filenames. |
80 | */ |
81 | public int compare(Resource r1, Resource r2) { |
82 | return r1.getFilename().compareTo(r2.getFilename()); |
83 | } |
84 | |
85 | }; |
86 | |
87 | public MultiResourceItemReader() { |
88 | executionContextUserSupport.setName(ClassUtils.getShortName(MultiResourceItemReader.class)); |
89 | } |
90 | |
91 | /** |
92 | * Reads the next item, jumping to next resource if necessary. |
93 | */ |
94 | public T read() throws Exception, UnexpectedInputException, ParseException { |
95 | |
96 | if (noInput) { |
97 | return null; |
98 | } |
99 | |
100 | // If there is no resource, then this is the first item, set the current |
101 | // resource to 0 and open the first delegate. |
102 | if (currentResource == -1) { |
103 | currentResource = 0; |
104 | delegate.setResource(resources[currentResource]); |
105 | delegate.open(new ExecutionContext()); |
106 | } |
107 | |
108 | return readNextItem(); |
109 | } |
110 | |
111 | /** |
112 | * Use the delegate to read the next item, jump to next resource if current one is exhausted. Items are appended to |
113 | * the buffer. |
114 | * |
115 | * @return next item from input |
116 | */ |
117 | private T readNextItem() throws Exception { |
118 | |
119 | T item = delegate.read(); |
120 | |
121 | while (item == null) { |
122 | |
123 | currentResource++; |
124 | |
125 | if (currentResource >= resources.length) { |
126 | return null; |
127 | } |
128 | |
129 | delegate.close(); |
130 | delegate.setResource(resources[currentResource]); |
131 | delegate.open(new ExecutionContext()); |
132 | |
133 | item = delegate.read(); |
134 | } |
135 | |
136 | return item; |
137 | } |
138 | |
139 | /** |
140 | * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader and reset instance variable values. |
141 | */ |
142 | public void close() throws ItemStreamException { |
143 | delegate.close(); |
144 | noInput = false; |
145 | } |
146 | |
147 | /** |
148 | * Figure out which resource to start with in case of restart, open the delegate and restore delegate's position in |
149 | * the resource. |
150 | */ |
151 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
152 | |
153 | Assert.notNull(resources, "Resources must be set"); |
154 | |
155 | noInput = false; |
156 | if (resources.length == 0) { |
157 | if (strict) { |
158 | throw new IllegalStateException( |
159 | "No resources to read. Set strict=false if this is not an error condition."); |
160 | } |
161 | else { |
162 | logger.warn("No resources to read. Set strict=true if this should be an error condition."); |
163 | noInput = true; |
164 | return; |
165 | } |
166 | } |
167 | |
168 | Arrays.sort(resources, comparator); |
169 | |
170 | if (executionContext.containsKey(executionContextUserSupport.getKey(RESOURCE_KEY))) { |
171 | currentResource = executionContext.getInt(executionContextUserSupport.getKey(RESOURCE_KEY)); |
172 | |
173 | // context could have been saved before reading anything |
174 | if (currentResource == -1) { |
175 | currentResource = 0; |
176 | } |
177 | |
178 | delegate.setResource(resources[currentResource]); |
179 | delegate.open(executionContext); |
180 | } |
181 | else { |
182 | currentResource = -1; |
183 | } |
184 | } |
185 | |
186 | /** |
187 | * Store the current resource index and position in the resource. |
188 | */ |
189 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
190 | if (saveState) { |
191 | executionContext.putInt(executionContextUserSupport.getKey(RESOURCE_KEY), currentResource); |
192 | delegate.update(executionContext); |
193 | } |
194 | } |
195 | |
196 | /** |
197 | * @param delegate reads items from single {@link Resource}. |
198 | */ |
199 | public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) { |
200 | this.delegate = delegate; |
201 | } |
202 | |
203 | /** |
204 | * Set the boolean indicating whether or not state should be saved in the provided {@link ExecutionContext} during |
205 | * the {@link ItemStream} call to update. |
206 | * |
207 | * @param saveState |
208 | */ |
209 | public void setSaveState(boolean saveState) { |
210 | this.saveState = saveState; |
211 | } |
212 | |
213 | /** |
214 | * @param comparator used to order the injected resources, by default compares {@link Resource#getFilename()} |
215 | * values. |
216 | */ |
217 | public void setComparator(Comparator<Resource> comparator) { |
218 | this.comparator = comparator; |
219 | } |
220 | |
221 | /** |
222 | * @param resources input resources |
223 | */ |
224 | public void setResources(Resource[] resources) { |
225 | Assert.notNull(resources, "The resources must not be null"); |
226 | this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]); |
227 | } |
228 | |
229 | public Resource getCurrentResource() { |
230 | if (currentResource >= resources.length || currentResource < 0) { |
231 | return null; |
232 | } |
233 | return resources[currentResource]; |
234 | } |
235 | |
236 | } |