1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.List; |
22 | import java.util.concurrent.atomic.AtomicInteger; |
23 | |
24 | import org.apache.commons.logging.Log; |
25 | import org.apache.commons.logging.LogFactory; |
26 | import org.springframework.batch.classify.BinaryExceptionClassifier; |
27 | import org.springframework.batch.classify.Classifier; |
28 | import org.springframework.batch.core.StepContribution; |
29 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
30 | import org.springframework.batch.core.step.skip.NonSkippableProcessException; |
31 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; |
32 | import org.springframework.batch.core.step.skip.SkipPolicy; |
33 | import org.springframework.batch.item.ItemProcessor; |
34 | import org.springframework.batch.item.ItemWriter; |
35 | import org.springframework.batch.retry.ExhaustedRetryException; |
36 | import org.springframework.batch.retry.RecoveryCallback; |
37 | import org.springframework.batch.retry.RetryCallback; |
38 | import org.springframework.batch.retry.RetryContext; |
39 | import org.springframework.batch.retry.RetryException; |
40 | import org.springframework.batch.retry.support.DefaultRetryState; |
41 | |
42 | /** |
43 | * FaultTolerant implementation of the {@link ChunkProcessor} interface, that |
44 | * allows for skipping or retry of items that cause exceptions during writing. |
45 | * |
46 | */ |
47 | public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> { |
48 | |
49 | private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy(0); |
50 | |
51 | private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy(0); |
52 | |
53 | private final BatchRetryTemplate batchRetryTemplate; |
54 | |
55 | private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true); |
56 | |
57 | private Log logger = LogFactory.getLog(getClass()); |
58 | |
59 | private boolean buffering = true; |
60 | |
61 | private KeyGenerator keyGenerator; |
62 | |
63 | private ChunkMonitor chunkMonitor = new ChunkMonitor(); |
64 | |
65 | /** |
66 | * The {@link KeyGenerator} to use to identify failed items across rollback. |
67 | * Not used in the case of the {@link #setBuffering(boolean) buffering flag} |
68 | * being true (the default). |
69 | * |
70 | * @param keyGenerator the {@link KeyGenerator} to set |
71 | */ |
72 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
73 | this.keyGenerator = keyGenerator; |
74 | } |
75 | |
76 | /** |
77 | * @param SkipPolicy the {@link SkipPolicy} for item processing |
78 | */ |
79 | public void setProcessSkipPolicy(SkipPolicy SkipPolicy) { |
80 | this.itemProcessSkipPolicy = SkipPolicy; |
81 | } |
82 | |
83 | /** |
84 | * @param SkipPolicy the {@link SkipPolicy} for item writing |
85 | */ |
86 | public void setWriteSkipPolicy(SkipPolicy SkipPolicy) { |
87 | this.itemWriteSkipPolicy = SkipPolicy; |
88 | } |
89 | |
90 | /** |
91 | * A classifier that can distinguish between exceptions that cause rollback |
92 | * (return true) or not (return false). |
93 | * |
94 | * @param rollbackClassifier |
95 | */ |
96 | public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) { |
97 | this.rollbackClassifier = rollbackClassifier; |
98 | } |
99 | |
100 | /** |
101 | * @param chunkMonitor |
102 | */ |
103 | public void setChunkMonitor(ChunkMonitor chunkMonitor) { |
104 | this.chunkMonitor = chunkMonitor; |
105 | } |
106 | |
107 | /** |
108 | * A flag to indicate that items have been buffered and therefore will |
109 | * always come back as a chunk after a rollback. Otherwise things are more |
110 | * complicated because after a rollback the new chunk might or moght not |
111 | * contain items from the previous failed chunk. |
112 | * |
113 | * @param buffering |
114 | */ |
115 | public void setBuffering(boolean buffering) { |
116 | this.buffering = buffering; |
117 | } |
118 | |
119 | public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, |
120 | ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) { |
121 | super(itemProcessor, itemWriter); |
122 | this.batchRetryTemplate = batchRetryTemplate; |
123 | } |
124 | |
125 | @Override |
126 | protected void initializeUserData(Chunk<I> inputs) { |
127 | @SuppressWarnings("unchecked") |
128 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
129 | if (data == null) { |
130 | data = new UserData<O>(inputs.size()); |
131 | inputs.setUserData(data); |
132 | data.setOutputs(new Chunk<O>()); |
133 | } |
134 | } |
135 | |
136 | @Override |
137 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { |
138 | @SuppressWarnings("unchecked") |
139 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
140 | return data.size() - outputs.size() - inputs.getSkips().size(); |
141 | } |
142 | |
143 | @Override |
144 | protected boolean isComplete(Chunk<I> inputs) { |
145 | |
146 | /* |
147 | * Need to remember the write skips across transactions, otherwise they |
148 | * keep coming back. Since we register skips with the inputs they will |
149 | * not be processed again but the output skips need to be saved for |
150 | * registration later with the listeners. The inputs are going to be the |
151 | * same for all transactions processing the same chunk, but the outputs |
152 | * are not, so we stash them in user data on the inputs. |
153 | */ |
154 | |
155 | @SuppressWarnings("unchecked") |
156 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
157 | Chunk<O> previous = data.getOutputs(); |
158 | |
159 | return inputs.isEmpty() && previous.getSkips().isEmpty(); |
160 | |
161 | } |
162 | |
163 | @Override |
164 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { |
165 | |
166 | @SuppressWarnings("unchecked") |
167 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
168 | Chunk<O> previous = data.getOutputs(); |
169 | |
170 | Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips()); |
171 | next.setBusy(previous.isBusy()); |
172 | |
173 | // Remember for next time if there are skips accumulating |
174 | data.setOutputs(next); |
175 | |
176 | return next; |
177 | |
178 | } |
179 | |
180 | @Override |
181 | protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception { |
182 | |
183 | Chunk<O> outputs = new Chunk<O>(); |
184 | @SuppressWarnings("unchecked") |
185 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
186 | Chunk<O> cache = data.getOutputs(); |
187 | final Chunk<O>.ChunkIterator cacheIterator = cache.isEmpty() ? null : cache.iterator(); |
188 | final AtomicInteger count = new AtomicInteger(0); |
189 | |
190 | for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { |
191 | |
192 | final I item = iterator.next(); |
193 | |
194 | RetryCallback<O> retryCallback = new RetryCallback<O>() { |
195 | |
196 | public O doWithRetry(RetryContext context) throws Exception { |
197 | O output = null; |
198 | try { |
199 | count.incrementAndGet(); |
200 | O cached = (cacheIterator != null) ? cacheIterator.next() : null; |
201 | if (cached != null && count.get() > 1) { |
202 | /* |
203 | * If there is a cached chunk then we must be |
204 | * scanning for errors in the writer, in which case |
205 | * only the first one will be written, and for the |
206 | * rest we need to fill in the output from the |
207 | * cache. |
208 | */ |
209 | output = cached; |
210 | } |
211 | else { |
212 | output = doProcess(item); |
213 | } |
214 | } |
215 | catch (Exception e) { |
216 | if (rollbackClassifier.classify(e)) { |
217 | // Default is to rollback unless the classifier |
218 | // allows us to continue |
219 | throw e; |
220 | } |
221 | else if (itemProcessSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
222 | // If we are not re-throwing then we should check if |
223 | // this is skippable |
224 | contribution.incrementProcessSkipCount(); |
225 | logger.debug("Skipping after failed process with no rollback", e); |
226 | } |
227 | else { |
228 | // If it's not skippable that's an error in |
229 | // configuration - it doesn't make sense to not roll |
230 | // back if we are also not allowed to skip |
231 | throw new NonSkippableProcessException( |
232 | "Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.", |
233 | e); |
234 | } |
235 | } |
236 | if (output == null) { |
237 | // No need to re-process filtered items |
238 | iterator.remove(); |
239 | } |
240 | return output; |
241 | } |
242 | |
243 | }; |
244 | |
245 | RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() { |
246 | |
247 | public O recover(RetryContext context) throws Exception { |
248 | Exception e = (Exception) context.getLastThrowable(); |
249 | if (itemProcessSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
250 | contribution.incrementProcessSkipCount(); |
251 | iterator.remove(e); |
252 | logger.debug("Skipping after failed process", e); |
253 | return null; |
254 | } |
255 | else { |
256 | throw new RetryException("Non-skippable exception in recoverer while processing", e); |
257 | } |
258 | } |
259 | |
260 | }; |
261 | |
262 | O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState( |
263 | getInputKey(item), rollbackClassifier)); |
264 | if (output != null) { |
265 | outputs.add(output); |
266 | } |
267 | |
268 | } |
269 | |
270 | return outputs; |
271 | |
272 | } |
273 | |
274 | @Override |
275 | protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs) |
276 | throws Exception { |
277 | |
278 | RetryCallback<Object> retryCallback = new RetryCallback<Object>() { |
279 | public Object doWithRetry(RetryContext context) throws Exception { |
280 | |
281 | if (!inputs.isBusy()) { |
282 | chunkMonitor.setChunkSize(inputs.size()); |
283 | try { |
284 | doWrite(outputs.getItems()); |
285 | } |
286 | catch (Exception e) { |
287 | if (rollbackClassifier.classify(e)) { |
288 | throw e; |
289 | } |
290 | /* |
291 | * If the exception is marked as no-rollback, we need to |
292 | * override that, otherwise there's no way to write the |
293 | * rest of the chunk or to honour the skip listener |
294 | * contract. |
295 | */ |
296 | throw new ForceRollbackForWriteSkipException( |
297 | "Force rollback on skippable exception so that skipped item can be located.", e); |
298 | } |
299 | contribution.incrementWriteCount(outputs.size()); |
300 | } |
301 | else { |
302 | scan(contribution, inputs, outputs, chunkMonitor); |
303 | } |
304 | return null; |
305 | |
306 | } |
307 | }; |
308 | |
309 | if (!buffering) { |
310 | |
311 | RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() { |
312 | |
313 | public Object recover(RetryContext context) throws Exception { |
314 | |
315 | Exception e = (Exception) context.getLastThrowable(); |
316 | if (outputs.size() > 1 && !rollbackClassifier.classify(e)) { |
317 | throw new RetryException("Invalid retry state during write caused by " |
318 | + "exception that does not classify for rollback: ", e); |
319 | } |
320 | |
321 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); |
322 | for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) { |
323 | |
324 | inputIterator.next(); |
325 | outputIterator.next(); |
326 | |
327 | checkSkipPolicy(inputIterator, outputIterator, e, contribution); |
328 | if (!rollbackClassifier.classify(e)) { |
329 | throw new RetryException( |
330 | "Invalid retry state during recovery caused by exception that does not classify for rollback: ", |
331 | e); |
332 | } |
333 | |
334 | } |
335 | |
336 | return null; |
337 | |
338 | } |
339 | |
340 | }; |
341 | |
342 | batchRetryTemplate.execute(retryCallback, batchRecoveryCallback, BatchRetryTemplate.createState( |
343 | getInputKeys(inputs), rollbackClassifier)); |
344 | |
345 | } |
346 | else { |
347 | |
348 | RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { |
349 | |
350 | public Object recover(RetryContext context) throws Exception { |
351 | |
352 | /* |
353 | * If the last exception was not skippable we don't need to |
354 | * do any scanning. We can just bomb out with a retry |
355 | * exhausted. |
356 | */ |
357 | if (!itemWriteSkipPolicy.shouldSkip(context.getLastThrowable(), -1)) { |
358 | throw new ExhaustedRetryException( |
359 | "Retry exhausted after last attempt in recovery path, but exception is not skippable.", |
360 | context.getLastThrowable()); |
361 | } |
362 | |
363 | inputs.setBusy(true); |
364 | scan(contribution, inputs, outputs, chunkMonitor); |
365 | return null; |
366 | } |
367 | |
368 | }; |
369 | |
370 | logger.debug("Attempting to write: " + inputs); |
371 | batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs, |
372 | rollbackClassifier)); |
373 | |
374 | } |
375 | |
376 | callSkipListeners(inputs, outputs); |
377 | |
378 | } |
379 | |
380 | private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) { |
381 | |
382 | for (SkipWrapper<I> wrapper : inputs.getSkips()) { |
383 | I item = wrapper.getItem(); |
384 | if (item == null) { |
385 | continue; |
386 | } |
387 | Exception e = wrapper.getException(); |
388 | try { |
389 | getListener().onSkipInProcess(item, e); |
390 | } |
391 | catch (RuntimeException ex) { |
392 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); |
393 | } |
394 | } |
395 | |
396 | for (SkipWrapper<O> wrapper : outputs.getSkips()) { |
397 | Exception e = wrapper.getException(); |
398 | try { |
399 | getListener().onSkipInWrite(wrapper.getItem(), e); |
400 | } |
401 | catch (RuntimeException ex) { |
402 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); |
403 | } |
404 | } |
405 | |
406 | // Clear skips if we are possibly going to process this chunk again |
407 | outputs.clearSkips(); |
408 | inputs.clearSkips(); |
409 | |
410 | } |
411 | |
412 | private Object getInputKey(I item) { |
413 | if (keyGenerator == null) { |
414 | return item; |
415 | } |
416 | return keyGenerator.getKey(item); |
417 | } |
418 | |
419 | private List<?> getInputKeys(final Chunk<I> inputs) { |
420 | if (keyGenerator == null) { |
421 | return inputs.getItems(); |
422 | } |
423 | List<Object> keys = new ArrayList<Object>(); |
424 | for (I item : inputs.getItems()) { |
425 | keys.add(keyGenerator.getKey(item)); |
426 | } |
427 | return keys; |
428 | } |
429 | |
430 | private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator, |
431 | Exception e, StepContribution contribution) { |
432 | logger.debug("Checking skip policy after failed write"); |
433 | if (itemWriteSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
434 | contribution.incrementWriteSkipCount(); |
435 | inputIterator.remove(); |
436 | outputIterator.remove(e); |
437 | logger.debug("Skipping after failed write", e); |
438 | } |
439 | else { |
440 | throw new RetryException("Non-skippable exception in recoverer", e); |
441 | } |
442 | } |
443 | |
444 | private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs, |
445 | ChunkMonitor chunkMonitor) throws Exception { |
446 | |
447 | logger.debug("Scanning for failed item on write: " + inputs); |
448 | if (outputs.isEmpty()) { |
449 | inputs.setBusy(false); |
450 | return; |
451 | } |
452 | |
453 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); |
454 | Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); |
455 | |
456 | List<O> items = Collections.singletonList(outputIterator.next()); |
457 | inputIterator.next(); |
458 | try { |
459 | writeItems(items); |
460 | // If successful we are going to return and allow |
461 | // the driver to commit... |
462 | doAfterWrite(items); |
463 | contribution.incrementWriteCount(1); |
464 | inputIterator.remove(); |
465 | outputIterator.remove(); |
466 | } |
467 | catch (Exception e) { |
468 | if (!itemWriteSkipPolicy.shouldSkip(e, -1) && !rollbackClassifier.classify(e)) { |
469 | inputIterator.remove(); |
470 | outputIterator.remove(); |
471 | } |
472 | else { |
473 | checkSkipPolicy(inputIterator, outputIterator, e, contribution); |
474 | } |
475 | if (rollbackClassifier.classify(e)) { |
476 | throw e; |
477 | } |
478 | } |
479 | chunkMonitor.incrementOffset(); |
480 | if (outputs.isEmpty()) { |
481 | inputs.setBusy(false); |
482 | chunkMonitor.resetOffset(); |
483 | } |
484 | } |
485 | |
486 | private static class UserData<O> { |
487 | |
488 | private final int size; |
489 | |
490 | private Chunk<O> outputs; |
491 | |
492 | public UserData(int size) { |
493 | this.size = size; |
494 | } |
495 | |
496 | public int size() { |
497 | return size; |
498 | } |
499 | |
500 | public Chunk<O> getOutputs() { |
501 | return outputs; |
502 | } |
503 | |
504 | public void setOutputs(Chunk<O> outputs) { |
505 | this.outputs = outputs; |
506 | } |
507 | |
508 | } |
509 | |
510 | } |