| 1 | package org.springframework.batch.item.file; | 
| 2 |  | 
| 3 | import java.util.ArrayList; | 
| 4 | import java.util.Arrays; | 
| 5 | import java.util.Comparator; | 
| 6 | import java.util.List; | 
| 7 | import java.util.ListIterator; | 
| 8 |  | 
| 9 | import org.apache.commons.logging.Log; | 
| 10 | import org.apache.commons.logging.LogFactory; | 
| 11 | import org.springframework.batch.item.ExecutionContext; | 
| 12 | import org.springframework.batch.item.ItemReader; | 
| 13 | import org.springframework.batch.item.ItemStream; | 
| 14 | import org.springframework.batch.item.ItemStreamException; | 
| 15 | import org.springframework.batch.item.MarkFailedException; | 
| 16 | import org.springframework.batch.item.NoWorkFoundException; | 
| 17 | import org.springframework.batch.item.ParseException; | 
| 18 | import org.springframework.batch.item.ResetFailedException; | 
| 19 | import org.springframework.batch.item.UnexpectedInputException; | 
| 20 | import org.springframework.batch.item.util.ExecutionContextUserSupport; | 
| 21 | import org.springframework.core.io.Resource; | 
| 22 | import org.springframework.util.Assert; | 
| 23 | import org.springframework.util.ClassUtils; | 
| 24 |  | 
| 25 | /** | 
| 26 | * Reads items from multiple resources sequentially - resource list is given by | 
| 27 | * {@link #setResources(Resource[])}, the actual reading is delegated to | 
| 28 | * {@link #setDelegate(ResourceAwareItemReaderItemStream)}. | 
| 29 | * | 
| 30 | * Input resources are ordered using {@link #setComparator(Comparator)} to make | 
| 31 | * sure resource ordering is preserved between job runs in restart scenario. | 
| 32 | * | 
| 33 | * Reset (rollback) capability is implemented by item buffering. | 
| 34 | * | 
| 35 | * | 
| 36 | * @author Robert Kasanicky | 
| 37 | */ | 
| 38 | public class MultiResourceItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream { | 
| 39 |  | 
| 40 | private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class); | 
| 41 |  | 
| 42 | /** | 
| 43 | * Unique object instance that marks resource boundaries in the item buffer | 
| 44 | */ | 
| 45 | private static final Object END_OF_RESOURCE_MARKER = new Object(); | 
| 46 |  | 
| 47 | private ResourceAwareItemReaderItemStream delegate; | 
| 48 |  | 
| 49 | private Resource[] resources; | 
| 50 |  | 
| 51 | private MultiResourceIndex index = new MultiResourceIndex(); | 
| 52 |  | 
| 53 | private List itemBuffer = new ArrayList(); | 
| 54 |  | 
| 55 | private ListIterator itemBufferIterator = null; | 
| 56 |  | 
| 57 | private boolean shouldReadBuffer = false; | 
| 58 |  | 
| 59 | private boolean saveState = false; | 
| 60 |  | 
| 61 | private Comparator comparator = new Comparator() { | 
| 62 |  | 
| 63 | /** | 
| 64 | * Compares resource filenames. | 
| 65 | */ | 
| 66 | public int compare(Object o1, Object o2) { | 
| 67 | Resource r1 = (Resource) o1; | 
| 68 | Resource r2 = (Resource) o2; | 
| 69 | return r1.getFilename().compareTo(r2.getFilename()); | 
| 70 | } | 
| 71 |  | 
| 72 | }; | 
| 73 |  | 
| 74 | private boolean noInput; | 
| 75 |  | 
| 76 | public MultiResourceItemReader() { | 
| 77 | setName(ClassUtils.getShortName(MultiResourceItemReader.class)); | 
| 78 | } | 
| 79 |  | 
| 80 | /** | 
| 81 | * Reads the next item, jumping to next resource if necessary. | 
| 82 | */ | 
| 83 | public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException { | 
| 84 |  | 
| 85 | if (noInput) { | 
| 86 | return null; | 
| 87 | } | 
| 88 |  | 
| 89 | Object item; | 
| 90 | if (shouldReadBuffer) { | 
| 91 | item = readBufferedItem(); | 
| 92 | } | 
| 93 | else { | 
| 94 | item = readNextItem(); | 
| 95 | } | 
| 96 |  | 
| 97 | index.incrementItemCount(); | 
| 98 |  | 
| 99 | return item; | 
| 100 | } | 
| 101 |  | 
| 102 | /** | 
| 103 | * Use the delegate to read the next item, jump to next resource if current | 
| 104 | * one is exhausted. Items are appended to the buffer. | 
| 105 | * @return next item from input | 
| 106 | */ | 
| 107 | private Object readNextItem() throws Exception { | 
| 108 |  | 
| 109 | Object item = delegate.read(); | 
| 110 |  | 
| 111 | while (item == null) { | 
| 112 |  | 
| 113 | index.incrementResourceCount(); | 
| 114 |  | 
| 115 | if (index.currentResource >= resources.length) { | 
| 116 | return null; | 
| 117 | } | 
| 118 | itemBuffer.add(END_OF_RESOURCE_MARKER); | 
| 119 |  | 
| 120 | delegate.close(new ExecutionContext()); | 
| 121 | delegate.setResource(resources[index.currentResource]); | 
| 122 | delegate.open(new ExecutionContext()); | 
| 123 |  | 
| 124 | item = delegate.read(); | 
| 125 | } | 
| 126 |  | 
| 127 | itemBuffer.add(item); | 
| 128 |  | 
| 129 | return item; | 
| 130 | } | 
| 131 |  | 
| 132 | /** | 
| 133 | * Read next item from buffer while keeping track of the position within the | 
| 134 | * input for possible restart. | 
| 135 | * @return next item from buffer | 
| 136 | */ | 
| 137 | private Object readBufferedItem() { | 
| 138 |  | 
| 139 | Object buffered = itemBufferIterator.next(); | 
| 140 | while (buffered == END_OF_RESOURCE_MARKER) { | 
| 141 | index.incrementResourceCount(); | 
| 142 | buffered = itemBufferIterator.next(); | 
| 143 | } | 
| 144 |  | 
| 145 | if (!itemBufferIterator.hasNext()) { | 
| 146 | // buffer is exhausted, continue reading from file | 
| 147 | shouldReadBuffer = false; | 
| 148 | itemBufferIterator = null; | 
| 149 | } | 
| 150 | return buffered; | 
| 151 | } | 
| 152 |  | 
| 153 | /** | 
| 154 | * Remove the longer needed items from buffer, mark the index position and | 
| 155 | * call mark() on delegate so that it clears its buffers. | 
| 156 | */ | 
| 157 | public void mark() throws MarkFailedException { | 
| 158 | emptyBuffer(); | 
| 159 |  | 
| 160 | index.mark(); | 
| 161 |  | 
| 162 | delegate.mark(); | 
| 163 | } | 
| 164 |  | 
| 165 | /** | 
| 166 | * Discard the buffered items that have already been read. | 
| 167 | */ | 
| 168 | private void emptyBuffer() { | 
| 169 | if (!shouldReadBuffer) { | 
| 170 | itemBuffer.clear(); | 
| 171 | itemBufferIterator = null; | 
| 172 | } | 
| 173 | else { | 
| 174 | itemBuffer = itemBuffer.subList(itemBufferIterator.nextIndex(), itemBuffer.size()); | 
| 175 | itemBufferIterator = itemBuffer.listIterator(); | 
| 176 | } | 
| 177 | } | 
| 178 |  | 
| 179 | /** | 
| 180 | * Switches to 'read from buffer' state. | 
| 181 | * | 
| 182 | * @see ItemReader#reset() | 
| 183 | */ | 
| 184 | public void reset() throws ResetFailedException { | 
| 185 | if (!itemBuffer.isEmpty()) { | 
| 186 | shouldReadBuffer = true; | 
| 187 | itemBufferIterator = itemBuffer.listIterator(); | 
| 188 | } | 
| 189 | index.reset(); | 
| 190 | } | 
| 191 |  | 
| 192 | /** | 
| 193 | * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader | 
| 194 | * and reset instance variable values. | 
| 195 | */ | 
| 196 | public void close(ExecutionContext executionContext) throws ItemStreamException { | 
| 197 | noInput = false; | 
| 198 | shouldReadBuffer = false; | 
| 199 | itemBufferIterator = null; | 
| 200 | index = new MultiResourceIndex(); | 
| 201 | itemBuffer.clear(); | 
| 202 | delegate.close(new ExecutionContext()); | 
| 203 | } | 
| 204 |  | 
| 205 | /** | 
| 206 | * Figure out which resource to start with in case of restart, open the | 
| 207 | * delegate and restore delegate's position in the resource. | 
| 208 | */ | 
| 209 | public void open(ExecutionContext executionContext) throws ItemStreamException { | 
| 210 |  | 
| 211 | Assert.notNull(resources, "There must be at least one input resource"); | 
| 212 | Assert.notNull(delegate, "Delegate must not be null"); | 
| 213 |  | 
| 214 | noInput = false; | 
| 215 | if (resources.length == 0) { | 
| 216 | logger.warn("No resources to read"); | 
| 217 | noInput = true; | 
| 218 | return; | 
| 219 | } | 
| 220 |  | 
| 221 | Arrays.sort(resources, comparator); | 
| 222 |  | 
| 223 | index.open(executionContext); | 
| 224 |  | 
| 225 | delegate.setResource(resources[index.currentResource]); | 
| 226 |  | 
| 227 | delegate.open(new ExecutionContext()); | 
| 228 |  | 
| 229 | try { | 
| 230 | for (int i = 0; i < index.currentItem; i++) { | 
| 231 | delegate.read(); | 
| 232 | delegate.mark(); | 
| 233 | } | 
| 234 | } | 
| 235 | catch (Exception e) { | 
| 236 | throw new ItemStreamException("Could not restore position on restart", e); | 
| 237 | } | 
| 238 | } | 
| 239 |  | 
| 240 | /** | 
| 241 | * Store the current resource index and position in the resource. | 
| 242 | */ | 
| 243 | public void update(ExecutionContext executionContext) throws ItemStreamException { | 
| 244 | if (saveState) { | 
| 245 | index.update(executionContext); | 
| 246 | } | 
| 247 | } | 
| 248 |  | 
| 249 | /** | 
| 250 | * @param delegate reads items from single {@link Resource}. | 
| 251 | */ | 
| 252 | public void setDelegate(ResourceAwareItemReaderItemStream delegate) { | 
| 253 | this.delegate = delegate; | 
| 254 | } | 
| 255 |  | 
| 256 | /** | 
| 257 | * Set the boolean indicating whether or not state should be saved in the | 
| 258 | * provided {@link ExecutionContext} during the {@link ItemStream} call to | 
| 259 | * update. | 
| 260 | * | 
| 261 | * @param saveState | 
| 262 | */ | 
| 263 | public void setSaveState(boolean saveState) { | 
| 264 | this.saveState = saveState; | 
| 265 | } | 
| 266 |  | 
| 267 | /** | 
| 268 | * @param comparator used to order the injected resources, by default | 
| 269 | * compares {@link Resource#getFilename()} values. | 
| 270 | */ | 
| 271 | public void setComparator(Comparator comparator) { | 
| 272 | this.comparator = comparator; | 
| 273 | } | 
| 274 |  | 
| 275 | /** | 
| 276 | * @param resources input resources | 
| 277 | */ | 
| 278 | public void setResources(Resource[] resources) { | 
| 279 | this.resources = resources; | 
| 280 | } | 
| 281 |  | 
| 282 | /** | 
| 283 | * Facilitates keeping track of the position within multi-resource input. | 
| 284 | */ | 
| 285 | private class MultiResourceIndex { | 
| 286 |  | 
| 287 | private static final String RESOURCE_KEY = "resourceIndex"; | 
| 288 |  | 
| 289 | private static final String ITEM_KEY = "itemIndex"; | 
| 290 |  | 
| 291 | private int currentResource = 0; | 
| 292 |  | 
| 293 | private int markedResource = 0; | 
| 294 |  | 
| 295 | private long currentItem = 0; | 
| 296 |  | 
| 297 | private long markedItem = 0; | 
| 298 |  | 
| 299 | public void incrementItemCount() { | 
| 300 | currentItem++; | 
| 301 | } | 
| 302 |  | 
| 303 | public void incrementResourceCount() { | 
| 304 | currentResource++; | 
| 305 | currentItem = 0; | 
| 306 | } | 
| 307 |  | 
| 308 | public void mark() { | 
| 309 | markedResource = currentResource; | 
| 310 | markedItem = currentItem; | 
| 311 | } | 
| 312 |  | 
| 313 | public void reset() { | 
| 314 | currentResource = markedResource; | 
| 315 | currentItem = markedItem; | 
| 316 | } | 
| 317 |  | 
| 318 | public void open(ExecutionContext ctx) { | 
| 319 | if (ctx.containsKey(getKey(RESOURCE_KEY))) { | 
| 320 | currentResource = Long.valueOf(ctx.getLong(getKey(RESOURCE_KEY))).intValue(); | 
| 321 | } | 
| 322 |  | 
| 323 | if (ctx.containsKey(getKey(ITEM_KEY))) { | 
| 324 | currentItem = ctx.getLong(getKey(ITEM_KEY)); | 
| 325 | } | 
| 326 | } | 
| 327 |  | 
| 328 | public void update(ExecutionContext ctx) { | 
| 329 | ctx.putLong(getKey(RESOURCE_KEY), index.currentResource); | 
| 330 | ctx.putLong(getKey(ITEM_KEY), index.currentItem); | 
| 331 | } | 
| 332 | } | 
| 333 |  | 
| 334 | } |