View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.item;
18  
19  import java.util.ArrayList;
20  import java.util.Collections;
21  import java.util.Iterator;
22  import java.util.List;
23  import java.util.concurrent.atomic.AtomicInteger;
24  import java.util.concurrent.atomic.AtomicReference;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.springframework.batch.core.StepContribution;
29  import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
30  import org.springframework.batch.core.step.skip.NonSkippableProcessException;
31  import org.springframework.batch.core.step.skip.SkipLimitExceededException;
32  import org.springframework.batch.core.step.skip.SkipListenerFailedException;
33  import org.springframework.batch.core.step.skip.SkipPolicy;
34  import org.springframework.batch.item.ItemProcessor;
35  import org.springframework.batch.item.ItemWriter;
36  import org.springframework.classify.BinaryExceptionClassifier;
37  import org.springframework.classify.Classifier;
38  import org.springframework.retry.ExhaustedRetryException;
39  import org.springframework.retry.RecoveryCallback;
40  import org.springframework.retry.RetryCallback;
41  import org.springframework.retry.RetryContext;
42  import org.springframework.retry.RetryException;
43  import org.springframework.retry.support.DefaultRetryState;
44  
45  /**
 * Fault-tolerant implementation of the {@link ChunkProcessor} interface that
 * allows for skipping or retrying items that cause exceptions during
 * processing or writing.
48   *
49   */
50  public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> {
51  
52  	private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy();
53  
54  	private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy();
55  
56  	private final BatchRetryTemplate batchRetryTemplate;
57  
58  	private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
59  
60  	private Log logger = LogFactory.getLog(getClass());
61  
62  	private boolean buffering = true;
63  
64  	private KeyGenerator keyGenerator;
65  
66  	private ChunkMonitor chunkMonitor = new ChunkMonitor();
67  
68  	private boolean processorTransactional = true;
69  
70  	/**
71  	 * The {@link KeyGenerator} to use to identify failed items across rollback.
72  	 * Not used in the case of the {@link #setBuffering(boolean) buffering flag}
73  	 * being true (the default).
74  	 *
75  	 * @param keyGenerator the {@link KeyGenerator} to set
76  	 */
77  	public void setKeyGenerator(KeyGenerator keyGenerator) {
78  		this.keyGenerator = keyGenerator;
79  	}
80  
81  	/**
82  	 * @param SkipPolicy the {@link SkipPolicy} for item processing
83  	 */
84  	public void setProcessSkipPolicy(SkipPolicy SkipPolicy) {
85  		this.itemProcessSkipPolicy = SkipPolicy;
86  	}
87  
88  	/**
89  	 * @param SkipPolicy the {@link SkipPolicy} for item writing
90  	 */
91  	public void setWriteSkipPolicy(SkipPolicy SkipPolicy) {
92  		this.itemWriteSkipPolicy = SkipPolicy;
93  	}
94  
95  	/**
96  	 * A classifier that can distinguish between exceptions that cause rollback
97  	 * (return true) or not (return false).
98  	 *
99  	 * @param rollbackClassifier
100 	 */
101 	public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
102 		this.rollbackClassifier = rollbackClassifier;
103 	}
104 
105 	/**
106 	 * @param chunkMonitor
107 	 */
108 	public void setChunkMonitor(ChunkMonitor chunkMonitor) {
109 		this.chunkMonitor = chunkMonitor;
110 	}
111 
112 	/**
113 	 * A flag to indicate that items have been buffered and therefore will
114 	 * always come back as a chunk after a rollback. Otherwise things are more
115 	 * complicated because after a rollback the new chunk might or might not
116 	 * contain items from the previous failed chunk.
117 	 *
118 	 * @param buffering
119 	 */
120 	public void setBuffering(boolean buffering) {
121 		this.buffering = buffering;
122 	}
123 
124 	/**
125 	 * Flag to say that the {@link ItemProcessor} is transactional (defaults to
126 	 * true). If false then the processor is only called once per item per
127 	 * chunk, even if there are rollbacks with retries and skips.
128 	 *
129 	 * @param processorTransactional the flag value to set
130 	 */
131 	public void setProcessorTransactional(boolean processorTransactional) {
132 		this.processorTransactional = processorTransactional;
133 	}
134 
135 	public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
136 			ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
137 		super(itemProcessor, itemWriter);
138 		this.batchRetryTemplate = batchRetryTemplate;
139 	}
140 
141 	@Override
142 	protected void initializeUserData(Chunk<I> inputs) {
143 		@SuppressWarnings("unchecked")
144 		UserData<O> data = (UserData<O>) inputs.getUserData();
145 		if (data == null) {
146 			data = new UserData<O>();
147 			inputs.setUserData(data);
148 			data.setOutputs(new Chunk<O>());
149 		}
150 	}
151 
152 	@Override
153 	protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
154 		@SuppressWarnings("unchecked")
155 		UserData<O> data = (UserData<O>) inputs.getUserData();
156 		return data.filterCount;
157 	}
158 
159 	@Override
160 	protected boolean isComplete(Chunk<I> inputs) {
161 
162 		/*
163 		 * Need to remember the write skips across transactions, otherwise they
164 		 * keep coming back. Since we register skips with the inputs they will
165 		 * not be processed again but the output skips need to be saved for
166 		 * registration later with the listeners. The inputs are going to be the
167 		 * same for all transactions processing the same chunk, but the outputs
168 		 * are not, so we stash them in user data on the inputs.
169 		 */
170 
171 		@SuppressWarnings("unchecked")
172 		UserData<O> data = (UserData<O>) inputs.getUserData();
173 		Chunk<O> previous = data.getOutputs();
174 
175 		return inputs.isEmpty() && previous.getSkips().isEmpty();
176 
177 	}
178 
179 	@Override
180 	protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
181 
182 		@SuppressWarnings("unchecked")
183 		UserData<O> data = (UserData<O>) inputs.getUserData();
184 		Chunk<O> previous = data.getOutputs();
185 
186 		Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips());
187 		next.setBusy(previous.isBusy());
188 
189 		// Remember for next time if there are skips accumulating
190 		data.setOutputs(next);
191 
192 		return next;
193 
194 	}
195 
196 	@Override
197 	protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
198 
199 		Chunk<O> outputs = new Chunk<O>();
200 		@SuppressWarnings("unchecked")
201 		final UserData<O> data = (UserData<O>) inputs.getUserData();
202 		final Chunk<O> cache = data.getOutputs();
203 		final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
204 		final AtomicInteger count = new AtomicInteger(0);
205 
206 		// final int scanLimit = processorTransactional && data.scanning() ? 1 :
207 		// 0;
208 
209 		for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
210 
211 			final I item = iterator.next();
212 
213 			RetryCallback<O> retryCallback = new RetryCallback<O>() {
214 
215 				@Override
216 				public O doWithRetry(RetryContext context) throws Exception {
217 					O output = null;
218 					try {
219 						count.incrementAndGet();
220 						O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
221 						if (cached != null && !processorTransactional) {
222 							output = cached;
223 						}
224 						else {
225 							output = doProcess(item);
226 							if (!processorTransactional && !data.scanning()) {
227 								cache.add(output);
228 							}
229 						}
230 					}
231 					catch (Exception e) {
232 						if (rollbackClassifier.classify(e)) {
233 							// Default is to rollback unless the classifier
234 							// allows us to continue
235 							throw e;
236 						}
237 						else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
238 							// If we are not re-throwing then we should check if
239 							// this is skippable
240 							contribution.incrementProcessSkipCount();
241 							logger.debug("Skipping after failed process with no rollback", e);
242 							// If not re-throwing then the listener will not be
243 							// called in next chunk.
244 							callProcessSkipListener(item, e);
245 						}
246 						else {
247 							// If it's not skippable that's an error in
248 							// configuration - it doesn't make sense to not roll
249 							// back if we are also not allowed to skip
250 							throw new NonSkippableProcessException(
251 									"Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
252 									e);
253 						}
254 					}
255 					if (output == null) {
256 						// No need to re-process filtered items
257 						iterator.remove();
258 						data.incrementFilterCount();
259 					}
260 					return output;
261 				}
262 
263 			};
264 
265 			RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
266 
267 				@Override
268 				public O recover(RetryContext context) throws Exception {
269 					Throwable e = context.getLastThrowable();
270 					if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
271 						iterator.remove(e);
272 						contribution.incrementProcessSkipCount();
273 						logger.debug("Skipping after failed process", e);
274 						return null;
275 					}
276 					else {
277 						if (rollbackClassifier.classify(e)) {
278 							// Default is to rollback unless the classifier
279 							// allows us to continue
280 							throw new RetryException("Non-skippable exception in recoverer while processing", e);
281 						}
282 						iterator.remove(e);
283 						return null;
284 					}
285 				}
286 
287 			};
288 
289 			O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
290 					getInputKey(item), rollbackClassifier));
291 			if (output != null) {
292 				outputs.add(output);
293 			}
294 
295 			/*
296 			 * We only want to process the first item if there is a scan for a
297 			 * failed item.
298 			 */
299 			if (data.scanning()) {
300 				while (cacheIterator != null && cacheIterator.hasNext()) {
301 					outputs.add(cacheIterator.next());
302 				}
303 				// Only process the first item if scanning
304 				break;
305 			}
306 		}
307 
308 		return outputs;
309 
310 	}
311 
312 	@Override
313 	protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
314 			throws Exception {
315 
316 		@SuppressWarnings("unchecked")
317 		final UserData<O> data = (UserData<O>) inputs.getUserData();
318 		final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
319 
320 		RetryCallback<Object> retryCallback = new RetryCallback<Object>() {
321 			@Override
322 			public Object doWithRetry(RetryContext context) throws Exception {
323 
324 				contextHolder.set(context);
325 
326 				if (!data.scanning()) {
327 					chunkMonitor.setChunkSize(inputs.size());
328 					try {
329 						doWrite(outputs.getItems());
330 					}
331 					catch (Exception e) {
332 						if (rollbackClassifier.classify(e)) {
333 							throw e;
334 						}
335 						/*
336 						 * If the exception is marked as no-rollback, we need to
337 						 * override that, otherwise there's no way to write the
338 						 * rest of the chunk or to honour the skip listener
339 						 * contract.
340 						 */
341 						throw new ForceRollbackForWriteSkipException(
342 								"Force rollback on skippable exception so that skipped item can be located.", e);
343 					}
344 					contribution.incrementWriteCount(outputs.size());
345 				}
346 				else {
347 					scan(contribution, inputs, outputs, chunkMonitor, false);
348 				}
349 				return null;
350 
351 			}
352 		};
353 
354 		if (!buffering) {
355 
356 			RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
357 
358 				@Override
359 				public Object recover(RetryContext context) throws Exception {
360 
361 					Throwable e = context.getLastThrowable();
362 					if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
363 						throw new RetryException("Invalid retry state during write caused by "
364 								+ "exception that does not classify for rollback: ", e);
365 					}
366 
367 					Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
368 					for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
369 
370 						inputIterator.next();
371 						outputIterator.next();
372 
373 						checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
374 						if (!rollbackClassifier.classify(e)) {
375 							throw new RetryException(
376 									"Invalid retry state during recovery caused by exception that does not classify for rollback: ",
377 									e);
378 						}
379 
380 					}
381 
382 					return null;
383 
384 				}
385 
386 			};
387 
388 			batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
389 					BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));
390 
391 		}
392 		else {
393 
394 			RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
395 
396 				@Override
397 				public Object recover(RetryContext context) throws Exception {
398 
399 					/*
400 					 * If the last exception was not skippable we don't need to
401 					 * do any scanning. We can just bomb out with a retry
402 					 * exhausted.
403 					 */
404 					if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
405 						throw new ExhaustedRetryException(
406 								"Retry exhausted after last attempt in recovery path, but exception is not skippable.",
407 								context.getLastThrowable());
408 					}
409 
410 					inputs.setBusy(true);
411 					data.scanning(true);
412 					scan(contribution, inputs, outputs, chunkMonitor, true);
413 					return null;
414 				}
415 
416 			};
417 
418 			if (logger.isDebugEnabled()) {
419 				logger.debug("Attempting to write: " + inputs);
420 			}
421 			try {
422 				batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
423 						rollbackClassifier));
424 			}
425 			catch (Exception e) {
426 				RetryContext context = contextHolder.get();
427 				if (!batchRetryTemplate.canRetry(context)) {
428 					/*
429 					 * BATCH-1761: we need advance warning of the scan about to
430 					 * start in the next transaction, so we can change the
431 					 * processing behaviour.
432 					 */
433 					data.scanning(true);
434 				}
435 				throw e;
436 			}
437 
438 		}
439 
440 		callSkipListeners(inputs, outputs);
441 
442 	}
443 
444 	private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) {
445 
446 		for (SkipWrapper<I> wrapper : inputs.getSkips()) {
447 			I item = wrapper.getItem();
448 			if (item == null) {
449 				continue;
450 			}
451 			Throwable e = wrapper.getException();
452 			callProcessSkipListener(item, e);
453 		}
454 
455 		for (SkipWrapper<O> wrapper : outputs.getSkips()) {
456 			Throwable e = wrapper.getException();
457 			try {
458 				getListener().onSkipInWrite(wrapper.getItem(), e);
459 			}
460 			catch (RuntimeException ex) {
461 				throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
462 			}
463 		}
464 
465 		// Clear skips if we are possibly going to process this chunk again
466 		outputs.clearSkips();
467 		inputs.clearSkips();
468 
469 	}
470 
471 	/**
472 	 * Convenience method for calling process skip listener, so that it can be
473 	 * called from multiple places.
474 	 *
475 	 * @param item the item that is skipped
476 	 * @param e the cause of the skip
477 	 */
478 	private void callProcessSkipListener(I item, Throwable e) {
479 		try {
480 			getListener().onSkipInProcess(item, e);
481 		}
482 		catch (RuntimeException ex) {
483 			throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
484 		}
485 	}
486 
487 	/**
488 	 * Convenience method for calling process skip policy, so that it can be
489 	 * called from multiple places.
490 	 *
491 	 * @param policy the skip policy
492 	 * @param e the cause of the skip
493 	 * @param skipCount the current skip count
494 	 */
495 	private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
496 		try {
497 			return policy.shouldSkip(e, skipCount);
498 		}
499 		catch (SkipLimitExceededException ex) {
500 			throw ex;
501 		}
502 		catch (RuntimeException ex) {
503 			throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e);
504 		}
505 	}
506 
507 	private Object getInputKey(I item) {
508 		if (keyGenerator == null) {
509 			return item;
510 		}
511 		return keyGenerator.getKey(item);
512 	}
513 
514 	private List<?> getInputKeys(final Chunk<I> inputs) {
515 		if (keyGenerator == null) {
516 			return inputs.getItems();
517 		}
518 		List<Object> keys = new ArrayList<Object>();
519 		for (I item : inputs.getItems()) {
520 			keys.add(keyGenerator.getKey(item));
521 		}
522 		return keys;
523 	}
524 
525 	private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator,
526 			Throwable e, StepContribution contribution, boolean recovery) throws Exception {
527 		logger.debug("Checking skip policy after failed write");
528 		if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) {
529 			contribution.incrementWriteSkipCount();
530 			inputIterator.remove();
531 			outputIterator.remove(e);
532 			logger.debug("Skipping after failed write", e);
533 		}
534 		else {
535 			if (recovery) {
536 				// Only if already recovering should we check skip policy
537 				throw new RetryException("Non-skippable exception in recoverer", e);
538 			}
539 			else {
540 				if (e instanceof Exception) {
541 					throw (Exception) e;
542 				}
543 				else if (e instanceof Error) {
544 					throw (Error) e;
545 				}
546 				else {
547 					throw new RetryException("Non-skippable throwable in recoverer", e);
548 				}
549 			}
550 		}
551 	}
552 
553 	private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
554 			ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
555 
556 		@SuppressWarnings("unchecked")
557 		final UserData<O> data = (UserData<O>) inputs.getUserData();
558 
559 		if (logger.isDebugEnabled()) {
560 			if (recovery) {
561 				logger.debug("Scanning for failed item on recovery from write: " + inputs);
562 			}
563 			else {
564 				logger.debug("Scanning for failed item on write: " + inputs);
565 			}
566 		}
567 		if (outputs.isEmpty()) {
568 			data.scanning(false);
569 			inputs.setBusy(false);
570 			return;
571 		}
572 
573 		Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
574 		Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
575 
576 		List<O> items = Collections.singletonList(outputIterator.next());
577 		inputIterator.next();
578 		try {
579 			writeItems(items);
580 			// If successful we are going to return and allow
581 			// the driver to commit...
582 			doAfterWrite(items);
583 			contribution.incrementWriteCount(1);
584 			inputIterator.remove();
585 			outputIterator.remove();
586 		}
587 		catch (Exception e) {
588 			doOnWriteError(e, items);
589 			if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
590 				inputIterator.remove();
591 				outputIterator.remove();
592 			}
593 			else {
594 				checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
595 			}
596 			if (rollbackClassifier.classify(e)) {
597 				throw e;
598 			}
599 		}
600 		chunkMonitor.incrementOffset();
601 		if (outputs.isEmpty()) {
602 			data.scanning(false);
603 			inputs.setBusy(false);
604 			chunkMonitor.resetOffset();
605 		}
606 	}
607 
608 	private static class UserData<O> {
609 
610 		private Chunk<O> outputs;
611 
612 		private int filterCount = 0;
613 
614 		private boolean scanning;
615 
616 		public boolean scanning() {
617 			return scanning;
618 		}
619 
620 		public void scanning(boolean scanning) {
621 			this.scanning = scanning;
622 		}
623 
624 		public void incrementFilterCount() {
625 			filterCount++;
626 		}
627 
628 		public Chunk<O> getOutputs() {
629 			return outputs;
630 		}
631 
632 		public void setOutputs(Chunk<O> outputs) {
633 			this.outputs = outputs;
634 		}
635 
636 	}
637 
638 }