1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.Iterator; |
22 | import java.util.List; |
23 | import java.util.concurrent.atomic.AtomicInteger; |
24 | import java.util.concurrent.atomic.AtomicReference; |
25 | |
26 | import org.apache.commons.logging.Log; |
27 | import org.apache.commons.logging.LogFactory; |
28 | import org.springframework.batch.classify.BinaryExceptionClassifier; |
29 | import org.springframework.batch.classify.Classifier; |
30 | import org.springframework.batch.core.StepContribution; |
31 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
32 | import org.springframework.batch.core.step.skip.NonSkippableProcessException; |
33 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
34 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; |
35 | import org.springframework.batch.core.step.skip.SkipPolicy; |
36 | import org.springframework.batch.item.ItemProcessor; |
37 | import org.springframework.batch.item.ItemWriter; |
38 | import org.springframework.batch.retry.ExhaustedRetryException; |
39 | import org.springframework.batch.retry.RecoveryCallback; |
40 | import org.springframework.batch.retry.RetryCallback; |
41 | import org.springframework.batch.retry.RetryContext; |
42 | import org.springframework.batch.retry.RetryException; |
43 | import org.springframework.batch.retry.support.DefaultRetryState; |
44 | |
45 | /** |
46 | * FaultTolerant implementation of the {@link ChunkProcessor} interface, that |
47 | * allows for skipping or retry of items that cause exceptions during writing. |
48 | * |
49 | */ |
50 | public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> { |
51 | |
52 | private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy(); |
53 | |
54 | private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy(); |
55 | |
56 | private final BatchRetryTemplate batchRetryTemplate; |
57 | |
58 | private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true); |
59 | |
60 | private Log logger = LogFactory.getLog(getClass()); |
61 | |
62 | private boolean buffering = true; |
63 | |
64 | private KeyGenerator keyGenerator; |
65 | |
66 | private ChunkMonitor chunkMonitor = new ChunkMonitor(); |
67 | |
68 | private boolean processorTransactional = true; |
69 | |
70 | /** |
71 | * The {@link KeyGenerator} to use to identify failed items across rollback. |
72 | * Not used in the case of the {@link #setBuffering(boolean) buffering flag} |
73 | * being true (the default). |
74 | * |
75 | * @param keyGenerator the {@link KeyGenerator} to set |
76 | */ |
77 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
78 | this.keyGenerator = keyGenerator; |
79 | } |
80 | |
81 | /** |
82 | * @param SkipPolicy the {@link SkipPolicy} for item processing |
83 | */ |
84 | public void setProcessSkipPolicy(SkipPolicy SkipPolicy) { |
85 | this.itemProcessSkipPolicy = SkipPolicy; |
86 | } |
87 | |
88 | /** |
89 | * @param SkipPolicy the {@link SkipPolicy} for item writing |
90 | */ |
91 | public void setWriteSkipPolicy(SkipPolicy SkipPolicy) { |
92 | this.itemWriteSkipPolicy = SkipPolicy; |
93 | } |
94 | |
95 | /** |
96 | * A classifier that can distinguish between exceptions that cause rollback |
97 | * (return true) or not (return false). |
98 | * |
99 | * @param rollbackClassifier |
100 | */ |
101 | public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) { |
102 | this.rollbackClassifier = rollbackClassifier; |
103 | } |
104 | |
105 | /** |
106 | * @param chunkMonitor |
107 | */ |
108 | public void setChunkMonitor(ChunkMonitor chunkMonitor) { |
109 | this.chunkMonitor = chunkMonitor; |
110 | } |
111 | |
112 | /** |
113 | * A flag to indicate that items have been buffered and therefore will |
114 | * always come back as a chunk after a rollback. Otherwise things are more |
115 | * complicated because after a rollback the new chunk might or might not |
116 | * contain items from the previous failed chunk. |
117 | * |
118 | * @param buffering |
119 | */ |
120 | public void setBuffering(boolean buffering) { |
121 | this.buffering = buffering; |
122 | } |
123 | |
124 | /** |
125 | * Flag to say that the {@link ItemProcessor} is transactional (defaults to |
126 | * true). If false then the processor is only called once per item per |
127 | * chunk, even if there are rollbacks with retries and skips. |
128 | * |
129 | * @param processorTransactional the flag value to set |
130 | */ |
131 | public void setProcessorTransactional(boolean processorTransactional) { |
132 | this.processorTransactional = processorTransactional; |
133 | } |
134 | |
135 | public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, |
136 | ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) { |
137 | super(itemProcessor, itemWriter); |
138 | this.batchRetryTemplate = batchRetryTemplate; |
139 | } |
140 | |
141 | @Override |
142 | protected void initializeUserData(Chunk<I> inputs) { |
143 | @SuppressWarnings("unchecked") |
144 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
145 | if (data == null) { |
146 | data = new UserData<O>(); |
147 | inputs.setUserData(data); |
148 | data.setOutputs(new Chunk<O>()); |
149 | } |
150 | } |
151 | |
152 | @Override |
153 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { |
154 | @SuppressWarnings("unchecked") |
155 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
156 | return data.filterCount; |
157 | } |
158 | |
159 | @Override |
160 | protected boolean isComplete(Chunk<I> inputs) { |
161 | |
162 | /* |
163 | * Need to remember the write skips across transactions, otherwise they |
164 | * keep coming back. Since we register skips with the inputs they will |
165 | * not be processed again but the output skips need to be saved for |
166 | * registration later with the listeners. The inputs are going to be the |
167 | * same for all transactions processing the same chunk, but the outputs |
168 | * are not, so we stash them in user data on the inputs. |
169 | */ |
170 | |
171 | @SuppressWarnings("unchecked") |
172 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
173 | Chunk<O> previous = data.getOutputs(); |
174 | |
175 | return inputs.isEmpty() && previous.getSkips().isEmpty(); |
176 | |
177 | } |
178 | |
179 | @Override |
180 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { |
181 | |
182 | @SuppressWarnings("unchecked") |
183 | UserData<O> data = (UserData<O>) inputs.getUserData(); |
184 | Chunk<O> previous = data.getOutputs(); |
185 | |
186 | Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips()); |
187 | next.setBusy(previous.isBusy()); |
188 | |
189 | // Remember for next time if there are skips accumulating |
190 | data.setOutputs(next); |
191 | |
192 | return next; |
193 | |
194 | } |
195 | |
196 | @Override |
197 | protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception { |
198 | |
199 | Chunk<O> outputs = new Chunk<O>(); |
200 | @SuppressWarnings("unchecked") |
201 | final UserData<O> data = (UserData<O>) inputs.getUserData(); |
202 | final Chunk<O> cache = data.getOutputs(); |
203 | final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator(); |
204 | final AtomicInteger count = new AtomicInteger(0); |
205 | |
206 | // final int scanLimit = processorTransactional && data.scanning() ? 1 : |
207 | // 0; |
208 | |
209 | for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { |
210 | |
211 | final I item = iterator.next(); |
212 | |
213 | RetryCallback<O> retryCallback = new RetryCallback<O>() { |
214 | |
215 | public O doWithRetry(RetryContext context) throws Exception { |
216 | O output = null; |
217 | try { |
218 | count.incrementAndGet(); |
219 | O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null; |
220 | if (cached != null && !processorTransactional) { |
221 | output = cached; |
222 | } |
223 | else { |
224 | output = doProcess(item); |
225 | if (!processorTransactional && !data.scanning()) { |
226 | cache.add(output); |
227 | } |
228 | } |
229 | } |
230 | catch (Exception e) { |
231 | if (rollbackClassifier.classify(e)) { |
232 | // Default is to rollback unless the classifier |
233 | // allows us to continue |
234 | throw e; |
235 | } |
236 | else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { |
237 | // If we are not re-throwing then we should check if |
238 | // this is skippable |
239 | contribution.incrementProcessSkipCount(); |
240 | logger.debug("Skipping after failed process with no rollback", e); |
241 | // If not re-throwing then the listener will not be |
242 | // called in next chunk. |
243 | callProcessSkipListener(item, e); |
244 | } |
245 | else { |
246 | // If it's not skippable that's an error in |
247 | // configuration - it doesn't make sense to not roll |
248 | // back if we are also not allowed to skip |
249 | throw new NonSkippableProcessException( |
250 | "Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.", |
251 | e); |
252 | } |
253 | } |
254 | if (output == null) { |
255 | // No need to re-process filtered items |
256 | iterator.remove(); |
257 | data.incrementFilterCount(); |
258 | } |
259 | return output; |
260 | } |
261 | |
262 | }; |
263 | |
264 | RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() { |
265 | |
266 | public O recover(RetryContext context) throws Exception { |
267 | Throwable e = context.getLastThrowable(); |
268 | if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { |
269 | iterator.remove(e); |
270 | contribution.incrementProcessSkipCount(); |
271 | logger.debug("Skipping after failed process", e); |
272 | return null; |
273 | } |
274 | else { |
275 | if (rollbackClassifier.classify(e)) { |
276 | // Default is to rollback unless the classifier |
277 | // allows us to continue |
278 | throw new RetryException("Non-skippable exception in recoverer while processing", e); |
279 | } |
280 | iterator.remove(e); |
281 | return null; |
282 | } |
283 | } |
284 | |
285 | }; |
286 | |
287 | O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState( |
288 | getInputKey(item), rollbackClassifier)); |
289 | if (output != null) { |
290 | outputs.add(output); |
291 | } |
292 | |
293 | /* |
294 | * We only want to process the first item if there is a scan for a |
295 | * failed item. |
296 | */ |
297 | if (data.scanning()) { |
298 | while (cacheIterator != null && cacheIterator.hasNext()) { |
299 | outputs.add(cacheIterator.next()); |
300 | } |
301 | // Only process the first item if scanning |
302 | break; |
303 | } |
304 | } |
305 | |
306 | return outputs; |
307 | |
308 | } |
309 | |
310 | @Override |
311 | protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs) |
312 | throws Exception { |
313 | |
314 | @SuppressWarnings("unchecked") |
315 | final UserData<O> data = (UserData<O>) inputs.getUserData(); |
316 | final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>(); |
317 | |
318 | RetryCallback<Object> retryCallback = new RetryCallback<Object>() { |
319 | public Object doWithRetry(RetryContext context) throws Exception { |
320 | |
321 | contextHolder.set(context); |
322 | |
323 | if (!data.scanning()) { |
324 | chunkMonitor.setChunkSize(inputs.size()); |
325 | try { |
326 | doWrite(outputs.getItems()); |
327 | } |
328 | catch (Exception e) { |
329 | if (rollbackClassifier.classify(e)) { |
330 | throw e; |
331 | } |
332 | /* |
333 | * If the exception is marked as no-rollback, we need to |
334 | * override that, otherwise there's no way to write the |
335 | * rest of the chunk or to honour the skip listener |
336 | * contract. |
337 | */ |
338 | throw new ForceRollbackForWriteSkipException( |
339 | "Force rollback on skippable exception so that skipped item can be located.", e); |
340 | } |
341 | contribution.incrementWriteCount(outputs.size()); |
342 | } |
343 | else { |
344 | scan(contribution, inputs, outputs, chunkMonitor, false); |
345 | } |
346 | return null; |
347 | |
348 | } |
349 | }; |
350 | |
351 | if (!buffering) { |
352 | |
353 | RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() { |
354 | |
355 | public Object recover(RetryContext context) throws Exception { |
356 | |
357 | Throwable e = context.getLastThrowable(); |
358 | if (outputs.size() > 1 && !rollbackClassifier.classify(e)) { |
359 | throw new RetryException("Invalid retry state during write caused by " |
360 | + "exception that does not classify for rollback: ", e); |
361 | } |
362 | |
363 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); |
364 | for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) { |
365 | |
366 | inputIterator.next(); |
367 | outputIterator.next(); |
368 | |
369 | checkSkipPolicy(inputIterator, outputIterator, e, contribution, true); |
370 | if (!rollbackClassifier.classify(e)) { |
371 | throw new RetryException( |
372 | "Invalid retry state during recovery caused by exception that does not classify for rollback: ", |
373 | e); |
374 | } |
375 | |
376 | } |
377 | |
378 | return null; |
379 | |
380 | } |
381 | |
382 | }; |
383 | |
384 | batchRetryTemplate.execute(retryCallback, batchRecoveryCallback, |
385 | BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier)); |
386 | |
387 | } |
388 | else { |
389 | |
390 | RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { |
391 | |
392 | public Object recover(RetryContext context) throws Exception { |
393 | |
394 | /* |
395 | * If the last exception was not skippable we don't need to |
396 | * do any scanning. We can just bomb out with a retry |
397 | * exhausted. |
398 | */ |
399 | if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) { |
400 | throw new ExhaustedRetryException( |
401 | "Retry exhausted after last attempt in recovery path, but exception is not skippable.", |
402 | context.getLastThrowable()); |
403 | } |
404 | |
405 | inputs.setBusy(true); |
406 | data.scanning(true); |
407 | scan(contribution, inputs, outputs, chunkMonitor, true); |
408 | return null; |
409 | } |
410 | |
411 | }; |
412 | |
413 | if (logger.isDebugEnabled()) { |
414 | logger.debug("Attempting to write: " + inputs); |
415 | } |
416 | try { |
417 | batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs, |
418 | rollbackClassifier)); |
419 | } |
420 | catch (Exception e) { |
421 | RetryContext context = contextHolder.get(); |
422 | if (!batchRetryTemplate.canRetry(context)) { |
423 | /* |
424 | * BATCH-1761: we need advance warning of the scan about to |
425 | * start in the next transaction, so we can change the |
426 | * processing behaviour. |
427 | */ |
428 | data.scanning(true); |
429 | } |
430 | throw e; |
431 | } |
432 | |
433 | } |
434 | |
435 | callSkipListeners(inputs, outputs); |
436 | |
437 | } |
438 | |
439 | private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) { |
440 | |
441 | for (SkipWrapper<I> wrapper : inputs.getSkips()) { |
442 | I item = wrapper.getItem(); |
443 | if (item == null) { |
444 | continue; |
445 | } |
446 | Throwable e = wrapper.getException(); |
447 | callProcessSkipListener(item, e); |
448 | } |
449 | |
450 | for (SkipWrapper<O> wrapper : outputs.getSkips()) { |
451 | Throwable e = wrapper.getException(); |
452 | try { |
453 | getListener().onSkipInWrite(wrapper.getItem(), e); |
454 | } |
455 | catch (RuntimeException ex) { |
456 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); |
457 | } |
458 | } |
459 | |
460 | // Clear skips if we are possibly going to process this chunk again |
461 | outputs.clearSkips(); |
462 | inputs.clearSkips(); |
463 | |
464 | } |
465 | |
466 | /** |
467 | * Convenience method for calling process skip listener, so that it can be |
468 | * called from multiple places. |
469 | * |
470 | * @param item the item that is skipped |
471 | * @param e the cause of the skip |
472 | */ |
473 | private void callProcessSkipListener(I item, Throwable e) { |
474 | try { |
475 | getListener().onSkipInProcess(item, e); |
476 | } |
477 | catch (RuntimeException ex) { |
478 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); |
479 | } |
480 | } |
481 | |
482 | /** |
483 | * Convenience method for calling process skip policy, so that it can be |
484 | * called from multiple places. |
485 | * |
486 | * @param policy the skip policy |
487 | * @param e the cause of the skip |
488 | * @param skipCount the current skip count |
489 | */ |
490 | private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) { |
491 | try { |
492 | return policy.shouldSkip(e, skipCount); |
493 | } |
494 | catch (SkipLimitExceededException ex) { |
495 | throw ex; |
496 | } |
497 | catch (RuntimeException ex) { |
498 | throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e); |
499 | } |
500 | } |
501 | |
502 | private Object getInputKey(I item) { |
503 | if (keyGenerator == null) { |
504 | return item; |
505 | } |
506 | return keyGenerator.getKey(item); |
507 | } |
508 | |
509 | private List<?> getInputKeys(final Chunk<I> inputs) { |
510 | if (keyGenerator == null) { |
511 | return inputs.getItems(); |
512 | } |
513 | List<Object> keys = new ArrayList<Object>(); |
514 | for (I item : inputs.getItems()) { |
515 | keys.add(keyGenerator.getKey(item)); |
516 | } |
517 | return keys; |
518 | } |
519 | |
520 | private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator, |
521 | Throwable e, StepContribution contribution, boolean recovery) throws Exception { |
522 | logger.debug("Checking skip policy after failed write"); |
523 | if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) { |
524 | contribution.incrementWriteSkipCount(); |
525 | inputIterator.remove(); |
526 | outputIterator.remove(e); |
527 | logger.debug("Skipping after failed write", e); |
528 | } |
529 | else { |
530 | if (recovery) { |
531 | // Only if already recovering should we check skip policy |
532 | throw new RetryException("Non-skippable exception in recoverer", e); |
533 | } |
534 | else { |
535 | if (e instanceof Exception) { |
536 | throw (Exception) e; |
537 | } |
538 | else if (e instanceof Error) { |
539 | throw (Error) e; |
540 | } |
541 | else { |
542 | throw new RetryException("Non-skippable throwable in recoverer", e); |
543 | } |
544 | } |
545 | } |
546 | } |
547 | |
548 | private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs, |
549 | ChunkMonitor chunkMonitor, boolean recovery) throws Exception { |
550 | |
551 | @SuppressWarnings("unchecked") |
552 | final UserData<O> data = (UserData<O>) inputs.getUserData(); |
553 | |
554 | if (logger.isDebugEnabled()) { |
555 | if (recovery) { |
556 | logger.debug("Scanning for failed item on recovery from write: " + inputs); |
557 | } |
558 | else { |
559 | logger.debug("Scanning for failed item on write: " + inputs); |
560 | } |
561 | } |
562 | if (outputs.isEmpty()) { |
563 | data.scanning(false); |
564 | inputs.setBusy(false); |
565 | return; |
566 | } |
567 | |
568 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); |
569 | Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); |
570 | |
571 | List<O> items = Collections.singletonList(outputIterator.next()); |
572 | inputIterator.next(); |
573 | try { |
574 | writeItems(items); |
575 | // If successful we are going to return and allow |
576 | // the driver to commit... |
577 | doAfterWrite(items); |
578 | contribution.incrementWriteCount(1); |
579 | inputIterator.remove(); |
580 | outputIterator.remove(); |
581 | } |
582 | catch (Exception e) { |
583 | doOnWriteError(e, items); |
584 | if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) { |
585 | inputIterator.remove(); |
586 | outputIterator.remove(); |
587 | } |
588 | else { |
589 | checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery); |
590 | } |
591 | if (rollbackClassifier.classify(e)) { |
592 | throw e; |
593 | } |
594 | } |
595 | chunkMonitor.incrementOffset(); |
596 | if (outputs.isEmpty()) { |
597 | data.scanning(false); |
598 | inputs.setBusy(false); |
599 | chunkMonitor.resetOffset(); |
600 | } |
601 | } |
602 | |
603 | private static class UserData<O> { |
604 | |
605 | private Chunk<O> outputs; |
606 | |
607 | private int filterCount = 0; |
608 | |
609 | private boolean scanning; |
610 | |
611 | public boolean scanning() { |
612 | return scanning; |
613 | } |
614 | |
615 | public void scanning(boolean scanning) { |
616 | this.scanning = scanning; |
617 | } |
618 | |
619 | public void incrementFilterCount() { |
620 | filterCount++; |
621 | } |
622 | |
623 | public Chunk<O> getOutputs() { |
624 | return outputs; |
625 | } |
626 | |
627 | public void setOutputs(Chunk<O> outputs) { |
628 | this.outputs = outputs; |
629 | } |
630 | |
631 | } |
632 | |
633 | } |