View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.file;
18  
19  import java.util.Arrays;
20  import java.util.Comparator;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.springframework.batch.item.*;
25  import org.springframework.batch.item.util.ExecutionContextUserSupport;
26  import org.springframework.core.io.Resource;
27  import org.springframework.util.Assert;
28  import org.springframework.util.ClassUtils;
29  
30  /**
31   * Reads items from multiple resources sequentially - resource list is given by {@link #setResources(Resource[])}, the
32   * actual reading is delegated to {@link #setDelegate(ResourceAwareItemReaderItemStream)}.
33   * 
34   * Input resources are ordered using {@link #setComparator(Comparator)} to make sure resource ordering is preserved
35   * between job runs in restart scenario.
36   * 
37   * 
38   * @author Robert Kasanicky
39   * @author Lucas Ward
40   */
41  public class MultiResourceItemReader<T> implements ItemReader<T>, ItemStream {
42  
43  	private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class);
44  
45  	private static final String RESOURCE_KEY = "resourceIndex";
46  
47  	private final ExecutionContextUserSupport executionContextUserSupport = new ExecutionContextUserSupport();
48  
49  	private ResourceAwareItemReaderItemStream<? extends T> delegate;
50  
51  	private Resource[] resources;
52  
53  	private boolean saveState = true;
54  
55  	private int currentResource = -1;
56  
57  	// signals there are no resources to read -> just return null on first read
58  	private boolean noInput;
59  
60  	private boolean strict = false;
61  
62  	/**
63  	 * In strict mode the reader will throw an exception on
64  	 * {@link #open(org.springframework.batch.item.ExecutionContext)} if there are no resources to read.
65  	 * @param strict false by default
66  	 */
67  	public void setStrict(boolean strict) {
68  		this.strict = strict;
69  	}
70  
71  	private Comparator<Resource> comparator = new Comparator<Resource>() {
72  
73  		/**
74  		 * Compares resource filenames.
75  		 */
76          @Override
77  		public int compare(Resource r1, Resource r2) {
78  			return r1.getFilename().compareTo(r2.getFilename());
79  		}
80  
81  	};
82  
83  	public MultiResourceItemReader() {
84  		executionContextUserSupport.setName(ClassUtils.getShortName(MultiResourceItemReader.class));
85  	}
86  
87  	/**
88  	 * Reads the next item, jumping to next resource if necessary.
89  	 */
90      @Override
91  	public T read() throws Exception, UnexpectedInputException, ParseException {
92  
93  		if (noInput) {
94  			return null;
95  		}
96  
97  		// If there is no resource, then this is the first item, set the current
98  		// resource to 0 and open the first delegate.
99  		if (currentResource == -1) {
100 			currentResource = 0;
101 			delegate.setResource(resources[currentResource]);
102 			delegate.open(new ExecutionContext());
103 		}
104 
105 		return readNextItem();
106 	}
107 
108 	/**
109 	 * Use the delegate to read the next item, jump to next resource if current one is exhausted. Items are appended to
110 	 * the buffer.
111 	 * 
112 	 * @return next item from input
113 	 */
114 	private T readNextItem() throws Exception {
115 
116 		T item = readFromDelegate();
117 
118 		while (item == null) {
119 
120 			currentResource++;
121 
122 			if (currentResource >= resources.length) {
123 				return null;
124 			}
125 
126 			delegate.close();
127 			delegate.setResource(resources[currentResource]);
128 			delegate.open(new ExecutionContext());
129 
130             item = readFromDelegate();
131         }
132 
133 		return item;
134 	}
135 
136     private T readFromDelegate() throws Exception {
137         T item = delegate.read();
138         if(item instanceof ResourceAware){
139             ((ResourceAware) item).setResource(getCurrentResource());
140         }
141         return item;
142     }
143 
144     /**
145 	 * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader and reset instance variable values.
146 	 */
147     @Override
148 	public void close() throws ItemStreamException {
149 		delegate.close();
150 		noInput = false;
151 	}
152 
153 	/**
154 	 * Figure out which resource to start with in case of restart, open the delegate and restore delegate's position in
155 	 * the resource.
156 	 */
157     @Override
158 	public void open(ExecutionContext executionContext) throws ItemStreamException {
159 
160 		Assert.notNull(resources, "Resources must be set");
161 
162 		noInput = false;
163 		if (resources.length == 0) {
164 			if (strict) {
165 				throw new IllegalStateException(
166 						"No resources to read. Set strict=false if this is not an error condition.");
167 			}
168 			else {
169 				logger.warn("No resources to read. Set strict=true if this should be an error condition.");
170 				noInput = true;
171 				return;
172 			}
173 		}
174 
175 		Arrays.sort(resources, comparator);
176 
177 		if (executionContext.containsKey(executionContextUserSupport.getKey(RESOURCE_KEY))) {
178 			currentResource = executionContext.getInt(executionContextUserSupport.getKey(RESOURCE_KEY));
179 
180 			// context could have been saved before reading anything
181 			if (currentResource == -1) {
182 				currentResource = 0;
183 			}
184 
185 			delegate.setResource(resources[currentResource]);
186 			delegate.open(executionContext);
187 		}
188 		else {
189 			currentResource = -1;
190 		}
191 	}
192 
193 	/**
194 	 * Store the current resource index and position in the resource.
195 	 */
196     @Override
197 	public void update(ExecutionContext executionContext) throws ItemStreamException {
198 		if (saveState) {
199 			executionContext.putInt(executionContextUserSupport.getKey(RESOURCE_KEY), currentResource);
200 			delegate.update(executionContext);
201 		}
202 	}
203 
204 	/**
205 	 * @param delegate reads items from single {@link Resource}.
206 	 */
207 	public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
208 		this.delegate = delegate;
209 	}
210 
211 	/**
212 	 * Set the boolean indicating whether or not state should be saved in the provided {@link ExecutionContext} during
213 	 * the {@link ItemStream} call to update.
214 	 * 
215 	 * @param saveState
216 	 */
217 	public void setSaveState(boolean saveState) {
218 		this.saveState = saveState;
219 	}
220 
221 	/**
222 	 * @param comparator used to order the injected resources, by default compares {@link Resource#getFilename()}
223 	 * values.
224 	 */
225 	public void setComparator(Comparator<Resource> comparator) {
226 		this.comparator = comparator;
227 	}
228 
229 	/**
230 	 * @param resources input resources
231 	 */
232 	public void setResources(Resource[] resources) {
233 		Assert.notNull(resources, "The resources must not be null");
234 		this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]);
235 	}
236 
237 	public Resource getCurrentResource() {
238 		if (currentResource >= resources.length || currentResource < 0) {
239 			return null;
240 		}
241 		return resources[currentResource];
242 	}
243 
244 }