1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.Iterator; |
22 | import java.util.List; |
23 | import java.util.concurrent.atomic.AtomicInteger; |
24 | import java.util.concurrent.atomic.AtomicReference; |
25 | |
26 | import org.apache.commons.logging.Log; |
27 | import org.apache.commons.logging.LogFactory; |
28 | import org.springframework.batch.core.StepContribution; |
29 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
30 | import org.springframework.batch.core.step.skip.NonSkippableProcessException; |
31 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
32 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; |
33 | import org.springframework.batch.core.step.skip.SkipPolicy; |
34 | import org.springframework.batch.item.ItemProcessor; |
35 | import org.springframework.batch.item.ItemWriter; |
36 | import org.springframework.classify.BinaryExceptionClassifier; |
37 | import org.springframework.classify.Classifier; |
38 | import org.springframework.retry.ExhaustedRetryException; |
39 | import org.springframework.retry.RecoveryCallback; |
40 | import org.springframework.retry.RetryCallback; |
41 | import org.springframework.retry.RetryContext; |
42 | import org.springframework.retry.RetryException; |
43 | import org.springframework.retry.support.DefaultRetryState; |
44 | |
45 | /** |
 * Fault-tolerant implementation of the {@link ChunkProcessor} interface that
 * allows for skipping or retrying items that cause exceptions during
 * processing or writing.
48 | * |
49 | */ |
50 | public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> { |
51 | |
// Skip policy consulted when an exception is raised while processing an item.
private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy();

// Skip policy consulted when an exception is raised while writing items.
private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy();

// Retry template through which both item processing and chunk writing are executed.
private final BatchRetryTemplate batchRetryTemplate;

// Classifies a throwable as "rollback" (true) or "no rollback" (false).
// NOTE(review): BinaryExceptionClassifier(true) presumably defaults every exception to rollback - confirm.
private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);

private Log logger = LogFactory.getLog(getClass());

// Selects the recovery strategy used by write(...): scanning when true, per-item skip checks when false.
private boolean buffering = true;

// Optional; when null the items themselves are used as retry keys.
private KeyGenerator keyGenerator;

// Receives the chunk size before a full write and offset updates while scanning.
private ChunkMonitor chunkMonitor = new ChunkMonitor();

// When false, outputs cached from a previous attempt are reused in transform(...) instead of re-processing.
private boolean processorTransactional = true;
69 | |
70 | /** |
71 | * The {@link KeyGenerator} to use to identify failed items across rollback. |
72 | * Not used in the case of the {@link #setBuffering(boolean) buffering flag} |
73 | * being true (the default). |
74 | * |
75 | * @param keyGenerator the {@link KeyGenerator} to set |
76 | */ |
77 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
78 | this.keyGenerator = keyGenerator; |
79 | } |
80 | |
81 | /** |
82 | * @param SkipPolicy the {@link SkipPolicy} for item processing |
83 | */ |
84 | public void setProcessSkipPolicy(SkipPolicy SkipPolicy) { |
85 | this.itemProcessSkipPolicy = SkipPolicy; |
86 | } |
87 | |
88 | /** |
89 | * @param SkipPolicy the {@link SkipPolicy} for item writing |
90 | */ |
91 | public void setWriteSkipPolicy(SkipPolicy SkipPolicy) { |
92 | this.itemWriteSkipPolicy = SkipPolicy; |
93 | } |
94 | |
95 | /** |
96 | * A classifier that can distinguish between exceptions that cause rollback |
97 | * (return true) or not (return false). |
98 | * |
99 | * @param rollbackClassifier |
100 | */ |
101 | public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) { |
102 | this.rollbackClassifier = rollbackClassifier; |
103 | } |
104 | |
105 | /** |
106 | * @param chunkMonitor |
107 | */ |
108 | public void setChunkMonitor(ChunkMonitor chunkMonitor) { |
109 | this.chunkMonitor = chunkMonitor; |
110 | } |
111 | |
112 | /** |
113 | * A flag to indicate that items have been buffered and therefore will |
114 | * always come back as a chunk after a rollback. Otherwise things are more |
115 | * complicated because after a rollback the new chunk might or might not |
116 | * contain items from the previous failed chunk. |
117 | * |
118 | * @param buffering |
119 | */ |
120 | public void setBuffering(boolean buffering) { |
121 | this.buffering = buffering; |
122 | } |
123 | |
124 | /** |
125 | * Flag to say that the {@link ItemProcessor} is transactional (defaults to |
126 | * true). If false then the processor is only called once per item per |
127 | * chunk, even if there are rollbacks with retries and skips. |
128 | * |
129 | * @param processorTransactional the flag value to set |
130 | */ |
131 | public void setProcessorTransactional(boolean processorTransactional) { |
132 | this.processorTransactional = processorTransactional; |
133 | } |
134 | |
/**
 * Creates a processor that delegates to the given item processor and item
 * writer and runs both through the supplied retry template.
 *
 * @param itemProcessor the processor passed on to the superclass
 * @param itemWriter the writer passed on to the superclass
 * @param batchRetryTemplate the retry template used for processing and
 * writing
 */
public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
		ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
	super(itemProcessor, itemWriter);
	this.batchRetryTemplate = batchRetryTemplate;
}
140 | |
141 | @Override |
142 | protected void initializeUserData(Chunk<I> inputs) { |
143 | @SuppressWarnings("unchecked") |
144 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
145 | if (data == null) { |
146 | data = new UserData<O>(); |
147 | inputs.setUserData(data); |
148 | data.setOutputs(new Chunk<O>()); |
149 | } |
150 | } |
151 | |
152 | @Override |
153 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { |
154 | @SuppressWarnings("unchecked") |
155 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
156 | return data.filterCount; |
157 | } |
158 | |
159 | @Override |
160 | protected boolean isComplete(Chunk<I> inputs) { |
161 | |
162 | /* |
163 | * Need to remember the write skips across transactions, otherwise they |
164 | * keep coming back. Since we register skips with the inputs they will |
165 | * not be processed again but the output skips need to be saved for |
166 | * registration later with the listeners. The inputs are going to be the |
167 | * same for all transactions processing the same chunk, but the outputs |
168 | * are not, so we stash them in user data on the inputs. |
169 | */ |
170 | |
171 | @SuppressWarnings("unchecked") |
172 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
173 | Chunk<O> previous = data.getOutputs(); |
174 | |
175 | return inputs.isEmpty() && previous.getSkips().isEmpty(); |
176 | |
177 | } |
178 | |
179 | @Override |
180 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { |
181 | |
182 | @SuppressWarnings("unchecked") |
183 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
184 | Chunk<O> previous = data.getOutputs(); |
185 | |
186 | Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips()); |
187 | next.setBusy(previous.isBusy()); |
188 | |
189 | // Remember for next time if there are skips accumulating |
190 | data.setOutputs(next); |
191 | |
192 | return next; |
193 | |
194 | } |
195 | |
196 | @Override |
197 | protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception { |
198 | |
199 | Chunk<O> outputs = new Chunk<O>(); |
200 | @SuppressWarnings("unchecked") |
201 | final UserData<O> data = (UserData<O>) inputs.getUserData(); |
202 | final Chunk<O> cache = data.getOutputs(); |
203 | final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator(); |
204 | final AtomicInteger count = new AtomicInteger(0); |
205 | |
206 | // final int scanLimit = processorTransactional && data.scanning() ? 1 : |
207 | // 0; |
208 | |
209 | for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { |
210 | |
211 | final I item = iterator.next(); |
212 | |
213 | RetryCallback<O> retryCallback = new RetryCallback<O>() { |
214 | |
215 | @Override |
216 | public O doWithRetry(RetryContext context) throws Exception { |
217 | O output = null; |
218 | try { |
219 | count.incrementAndGet(); |
220 | O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null; |
221 | if (cached != null && !processorTransactional) { |
222 | output = cached; |
223 | } |
224 | else { |
225 | output = doProcess(item); |
226 | if (output == null) { |
227 | data.incrementFilterCount(); |
228 | } else if (!processorTransactional && !data.scanning()) { |
229 | cache.add(output); |
230 | } |
231 | } |
232 | } |
233 | catch (Exception e) { |
234 | if (rollbackClassifier.classify(e)) { |
235 | // Default is to rollback unless the classifier |
236 | // allows us to continue |
237 | throw e; |
238 | } |
239 | else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { |
240 | // If we are not re-throwing then we should check if |
241 | // this is skippable |
242 | contribution.incrementProcessSkipCount(); |
243 | logger.debug("Skipping after failed process with no rollback", e); |
244 | // If not re-throwing then the listener will not be |
245 | // called in next chunk. |
246 | callProcessSkipListener(item, e); |
247 | } |
248 | else { |
249 | // If it's not skippable that's an error in |
250 | // configuration - it doesn't make sense to not roll |
251 | // back if we are also not allowed to skip |
252 | throw new NonSkippableProcessException( |
253 | "Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.", |
254 | e); |
255 | } |
256 | } |
257 | if (output == null) { |
258 | // No need to re-process filtered items |
259 | iterator.remove(); |
260 | } |
261 | return output; |
262 | } |
263 | |
264 | }; |
265 | |
266 | RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() { |
267 | |
268 | @Override |
269 | public O recover(RetryContext context) throws Exception { |
270 | Throwable e = context.getLastThrowable(); |
271 | if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { |
272 | iterator.remove(e); |
273 | contribution.incrementProcessSkipCount(); |
274 | logger.debug("Skipping after failed process", e); |
275 | return null; |
276 | } |
277 | else { |
278 | if (rollbackClassifier.classify(e)) { |
279 | // Default is to rollback unless the classifier |
280 | // allows us to continue |
281 | throw new RetryException("Non-skippable exception in recoverer while processing", e); |
282 | } |
283 | iterator.remove(e); |
284 | return null; |
285 | } |
286 | } |
287 | |
288 | }; |
289 | |
290 | O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState( |
291 | getInputKey(item), rollbackClassifier)); |
292 | if (output != null) { |
293 | outputs.add(output); |
294 | } |
295 | |
296 | /* |
297 | * We only want to process the first item if there is a scan for a |
298 | * failed item. |
299 | */ |
300 | if (data.scanning()) { |
301 | while (cacheIterator != null && cacheIterator.hasNext()) { |
302 | outputs.add(cacheIterator.next()); |
303 | } |
304 | // Only process the first item if scanning |
305 | break; |
306 | } |
307 | } |
308 | |
309 | return outputs; |
310 | |
311 | } |
312 | |
/**
 * Writes the outputs through the retry template and afterwards notifies the
 * skip listeners.
 * <p>
 * While no scan is in progress the whole output chunk is written at once;
 * otherwise a single item is written via
 * {@code scan(...)}. What happens once retry is exhausted depends on the
 * buffering flag: without buffering every item of the chunk is checked
 * against the write skip policy; with buffering a scan is started, provided
 * the last exception is skippable.
 *
 * @param contribution the contribution whose write and skip counters are
 * updated
 * @param inputs the input chunk corresponding to the outputs
 * @param outputs the items to write
 * @throws Exception if the write fails and the failure cannot be recovered
 */
@Override
protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
		throws Exception {
	@SuppressWarnings("unchecked")
	final UserData<O> data = (UserData<O>) inputs.getUserData();
	// Holds the retry context of the latest attempt so that the catch block
	// further down can ask whether another retry is possible
	final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();

	RetryCallback<Object> retryCallback = new RetryCallback<Object>() {
		@Override
		public Object doWithRetry(RetryContext context) throws Exception {

			contextHolder.set(context);

			if (!data.scanning()) {
				// Normal path: write the whole chunk in one go
				chunkMonitor.setChunkSize(inputs.size());
				try {
					doWrite(outputs.getItems());
				}
				catch (Exception e) {
					if (rollbackClassifier.classify(e)) {
						throw e;
					}
					/*
					 * If the exception is marked as no-rollback, we need to
					 * override that, otherwise there's no way to write the
					 * rest of the chunk or to honour the skip listener
					 * contract.
					 */
					throw new ForceRollbackForWriteSkipException(
							"Force rollback on skippable exception so that skipped item can be located.", e);
				}
				contribution.incrementWriteCount(outputs.size());
			}
			else {
				// Scanning path: write a single item to locate the failure
				scan(contribution, inputs, outputs, chunkMonitor, false);
			}
			return null;

		}
	};

	if (!buffering) {

		RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {

			@Override
			public Object recover(RetryContext context) throws Exception {

				Throwable e = context.getLastThrowable();
				if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
					throw new RetryException("Invalid retry state during write caused by "
							+ "exception that does not classify for rollback: ", e);
				}

				// Walk inputs and outputs in step and apply the skip policy
				// to each pair using the last exception
				Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
				for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {

					inputIterator.next();
					outputIterator.next();

					checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
					if (!rollbackClassifier.classify(e)) {
						throw new RetryException(
								"Invalid retry state during recovery caused by exception that does not classify for rollback: ",
								e);
					}

				}

				return null;

			}

		};

		// Retry state is keyed on the list of input keys
		batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
				BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

	}
	else {

		RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

			@Override
			public Object recover(RetryContext context) throws Exception {

				/*
				 * If the last exception was not skippable we don't need to
				 * do any scanning. We can just bomb out with a retry
				 * exhausted.
				 */
				if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
					throw new ExhaustedRetryException(
							"Retry exhausted after last attempt in recovery path, but exception is not skippable.",
							context.getLastThrowable());
				}

				// Switch to scanning mode and write the first item right away
				inputs.setBusy(true);
				data.scanning(true);
				scan(contribution, inputs, outputs, chunkMonitor, true);
				return null;
			}

		};

		if (logger.isDebugEnabled()) {
			logger.debug("Attempting to write: " + inputs);
		}
		try {
			// Retry state is keyed on the input chunk itself
			batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
					rollbackClassifier));
		}
		catch (Exception e) {
			RetryContext context = contextHolder.get();
			if (!batchRetryTemplate.canRetry(context)) {
				/*
				 * BATCH-1761: we need advance warning of the scan about to
				 * start in the next transaction, so we can change the
				 * processing behaviour.
				 */
				data.scanning(true);
			}
			throw e;
		}

	}

	// Only reached when the write (or its recovery) completed without throwing
	callSkipListeners(inputs, outputs);

}
443 | |
444 | private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) { |
445 | |
446 | for (SkipWrapper<I> wrapper : inputs.getSkips()) { |
447 | I item = wrapper.getItem(); |
448 | if (item == null) { |
449 | continue; |
450 | } |
451 | Throwable e = wrapper.getException(); |
452 | callProcessSkipListener(item, e); |
453 | } |
454 | |
455 | for (SkipWrapper<O> wrapper : outputs.getSkips()) { |
456 | Throwable e = wrapper.getException(); |
457 | try { |
458 | getListener().onSkipInWrite(wrapper.getItem(), e); |
459 | } |
460 | catch (RuntimeException ex) { |
461 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); |
462 | } |
463 | } |
464 | |
465 | // Clear skips if we are possibly going to process this chunk again |
466 | outputs.clearSkips(); |
467 | inputs.clearSkips(); |
468 | |
469 | } |
470 | |
471 | /** |
472 | * Convenience method for calling process skip listener, so that it can be |
473 | * called from multiple places. |
474 | * |
475 | * @param item the item that is skipped |
476 | * @param e the cause of the skip |
477 | */ |
478 | private void callProcessSkipListener(I item, Throwable e) { |
479 | try { |
480 | getListener().onSkipInProcess(item, e); |
481 | } |
482 | catch (RuntimeException ex) { |
483 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); |
484 | } |
485 | } |
486 | |
487 | /** |
488 | * Convenience method for calling process skip policy, so that it can be |
489 | * called from multiple places. |
490 | * |
491 | * @param policy the skip policy |
492 | * @param e the cause of the skip |
493 | * @param skipCount the current skip count |
494 | */ |
495 | private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) { |
496 | try { |
497 | return policy.shouldSkip(e, skipCount); |
498 | } |
499 | catch (SkipLimitExceededException ex) { |
500 | throw ex; |
501 | } |
502 | catch (RuntimeException ex) { |
503 | throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e); |
504 | } |
505 | } |
506 | |
507 | private Object getInputKey(I item) { |
508 | if (keyGenerator == null) { |
509 | return item; |
510 | } |
511 | return keyGenerator.getKey(item); |
512 | } |
513 | |
514 | private List<?> getInputKeys(final Chunk<I> inputs) { |
515 | if (keyGenerator == null) { |
516 | return inputs.getItems(); |
517 | } |
518 | List<Object> keys = new ArrayList<Object>(); |
519 | for (I item : inputs.getItems()) { |
520 | keys.add(keyGenerator.getKey(item)); |
521 | } |
522 | return keys; |
523 | } |
524 | |
525 | private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator, |
526 | Throwable e, StepContribution contribution, boolean recovery) throws Exception { |
527 | logger.debug("Checking skip policy after failed write"); |
528 | if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) { |
529 | contribution.incrementWriteSkipCount(); |
530 | inputIterator.remove(); |
531 | outputIterator.remove(e); |
532 | logger.debug("Skipping after failed write", e); |
533 | } |
534 | else { |
535 | if (recovery) { |
536 | // Only if already recovering should we check skip policy |
537 | throw new RetryException("Non-skippable exception in recoverer", e); |
538 | } |
539 | else { |
540 | if (e instanceof Exception) { |
541 | throw (Exception) e; |
542 | } |
543 | else if (e instanceof Error) { |
544 | throw (Error) e; |
545 | } |
546 | else { |
547 | throw new RetryException("Non-skippable throwable in recoverer", e); |
548 | } |
549 | } |
550 | } |
551 | } |
552 | |
553 | private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs, |
554 | ChunkMonitor chunkMonitor, boolean recovery) throws Exception { |
555 | |
556 | @SuppressWarnings("unchecked") |
557 | final UserData<O> data = (UserData<O>) inputs.getUserData(); |
558 | |
559 | if (logger.isDebugEnabled()) { |
560 | if (recovery) { |
561 | logger.debug("Scanning for failed item on recovery from write: " + inputs); |
562 | } |
563 | else { |
564 | logger.debug("Scanning for failed item on write: " + inputs); |
565 | } |
566 | } |
567 | if (outputs.isEmpty()) { |
568 | data.scanning(false); |
569 | inputs.setBusy(false); |
570 | chunkMonitor.resetOffset(); |
571 | return; |
572 | } |
573 | |
574 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); |
575 | Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); |
576 | |
577 | List<O> items = Collections.singletonList(outputIterator.next()); |
578 | inputIterator.next(); |
579 | try { |
580 | writeItems(items); |
581 | // If successful we are going to return and allow |
582 | // the driver to commit... |
583 | doAfterWrite(items); |
584 | contribution.incrementWriteCount(1); |
585 | inputIterator.remove(); |
586 | outputIterator.remove(); |
587 | } |
588 | catch (Exception e) { |
589 | doOnWriteError(e, items); |
590 | if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) { |
591 | inputIterator.remove(); |
592 | outputIterator.remove(); |
593 | } |
594 | else { |
595 | checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery); |
596 | } |
597 | if (rollbackClassifier.classify(e)) { |
598 | throw e; |
599 | } |
600 | } |
601 | chunkMonitor.incrementOffset(); |
602 | if (outputs.isEmpty()) { |
603 | data.scanning(false); |
604 | inputs.setBusy(false); |
605 | chunkMonitor.resetOffset(); |
606 | } |
607 | } |
608 | |
609 | private static class UserData<O> { |
610 | |
611 | private Chunk<O> outputs; |
612 | |
613 | private int filterCount = 0; |
614 | |
615 | private boolean scanning; |
616 | |
617 | public boolean scanning() { |
618 | return scanning; |
619 | } |
620 | |
621 | public void scanning(boolean scanning) { |
622 | this.scanning = scanning; |
623 | } |
624 | |
625 | public void incrementFilterCount() { |
626 | filterCount++; |
627 | } |
628 | |
629 | public Chunk<O> getOutputs() { |
630 | return outputs; |
631 | } |
632 | |
633 | public void setOutputs(Chunk<O> outputs) { |
634 | this.outputs = outputs; |
635 | } |
636 | |
637 | } |
638 | |
639 | } |