View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.builder;
17  
18  import java.util.ArrayList;
19  import java.util.Collection;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.LinkedHashSet;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  
27  import org.springframework.batch.core.ChunkListener;
28  import org.springframework.batch.core.JobInterruptedException;
29  import org.springframework.batch.core.SkipListener;
30  import org.springframework.batch.core.StepListener;
31  import org.springframework.batch.core.listener.StepListenerFactoryBean;
32  import org.springframework.batch.core.scope.context.ChunkContext;
33  import org.springframework.batch.core.step.FatalStepExecutionException;
34  import org.springframework.batch.core.step.item.BatchRetryTemplate;
35  import org.springframework.batch.core.step.item.ChunkMonitor;
36  import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
37  import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
38  import org.springframework.batch.core.step.item.FaultTolerantChunkProvider;
39  import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException;
40  import org.springframework.batch.core.step.item.KeyGenerator;
41  import org.springframework.batch.core.step.item.SimpleRetryExceptionHandler;
42  import org.springframework.batch.core.step.skip.CompositeSkipPolicy;
43  import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy;
44  import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
45  import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
46  import org.springframework.batch.core.step.skip.NonSkippableReadException;
47  import org.springframework.batch.core.step.skip.SkipLimitExceededException;
48  import org.springframework.batch.core.step.skip.SkipListenerFailedException;
49  import org.springframework.batch.core.step.skip.SkipPolicy;
50  import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
51  import org.springframework.batch.core.step.tasklet.Tasklet;
52  import org.springframework.batch.item.ItemReader;
53  import org.springframework.batch.item.ItemStream;
54  import org.springframework.batch.repeat.RepeatOperations;
55  import org.springframework.batch.repeat.support.RepeatTemplate;
56  import org.springframework.classify.BinaryExceptionClassifier;
57  import org.springframework.classify.Classifier;
58  import org.springframework.classify.SubclassClassifier;
59  import org.springframework.retry.ExhaustedRetryException;
60  import org.springframework.retry.RetryException;
61  import org.springframework.retry.RetryListener;
62  import org.springframework.retry.RetryPolicy;
63  import org.springframework.retry.backoff.BackOffPolicy;
64  import org.springframework.retry.policy.CompositeRetryPolicy;
65  import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
66  import org.springframework.retry.policy.NeverRetryPolicy;
67  import org.springframework.retry.policy.RetryContextCache;
68  import org.springframework.retry.policy.SimpleRetryPolicy;
69  import org.springframework.transaction.TransactionException;
70  import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
71  import org.springframework.transaction.interceptor.TransactionAttribute;
72  import org.springframework.util.Assert;
73  
74  /**
75   * A step builder for fully fault tolerant chunk-oriented item processing steps. Extends {@link SimpleStepBuilder} with
76   * additional properties for retry and skip of failed items.
77   *
78   * @author Dave Syer
79   *
80   * @since 2.2
81   */
82  public class FaultTolerantStepBuilder<I, O> extends SimpleStepBuilder<I, O> {
83  
84  	private ChunkMonitor chunkMonitor = new ChunkMonitor();
85  
86  	private boolean streamIsReader;
87  
88  	private int retryLimit = 0;
89  
90  	private BackOffPolicy backOffPolicy;
91  
92  	private Set<RetryListener> retryListeners = new LinkedHashSet<RetryListener>();
93  
94  	private RetryPolicy retryPolicy;
95  
96  	private RetryContextCache retryContextCache;
97  
98  	private KeyGenerator keyGenerator;
99  
100 	private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new LinkedHashSet<Class<? extends Throwable>>();
101 
102 	private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
103 
104 	private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<Class<? extends Throwable>>();
105 
106 	private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
107 
108 	private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>();
109 
110 	private Set<SkipListener<? super I, ? super O>> skipListeners = new LinkedHashSet<SkipListener<? super I, ? super O>>();
111 
112 	private int skipLimit = 0;
113 
114 	private SkipPolicy skipPolicy;
115 
116 	private boolean processorTransactional = true;
117 
	/**
	 * Create a new fault tolerant builder from a generic step builder helper. The parent is passed straight to the
	 * superclass constructor, which is documented to copy it so that the parent can be re-used.
	 *
	 * @param parent a parent helper containing common step properties
	 */
	public FaultTolerantStepBuilder(StepBuilderHelper<?> parent) {
		super(parent);
	}
126 
	/**
	 * Create a new fault tolerant builder from an existing simple step builder. The parent is passed straight to the
	 * superclass constructor, which is documented to copy it so that the parent can be re-used.
	 *
	 * @param parent a simple step builder whose properties should be carried over
	 */
	protected FaultTolerantStepBuilder(SimpleStepBuilder<I, O> parent) {
		super(parent);
	}
135 
136 	/**
137 	 * Create a new chunk oriented tasklet with reader, writer and processor as provided.
138 	 *
139 	 * @see org.springframework.batch.core.step.builder.SimpleStepBuilder#createTasklet()
140 	 */
141 	@Override
142 	protected Tasklet createTasklet() {
143 		Assert.state(getReader() != null, "ItemReader must be provided");
144 		Assert.state(getProcessor() != null || getWriter() != null, "ItemWriter or ItemProcessor must be provided");
145 		addSpecialExceptions();
146 		registerSkipListeners();
147 		FaultTolerantChunkProvider<I> chunkProvider = createChunkProvider();
148 		FaultTolerantChunkProcessor<I, O> chunkProcessor = createChunkProcessor();
149 		ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor);
150 		tasklet.setBuffering(!isReaderTransactionalQueue());
151 		return tasklet;
152 	}
153 
	/**
	 * Register a skip listener. Listeners are kept in a set in registration order and are later passed to both the
	 * chunk provider and the chunk processor.
	 *
	 * @param listener the listener to register
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) {
		skipListeners.add(listener);
		return this;
	}
164 
165 	@Override
166 	public FaultTolerantStepBuilder<I, O> listener(ChunkListener listener) {
167 		super.listener(new TerminateOnExceptionChunkListenerDelegate(listener));
168 		return this;
169 	}
170 
171 	@Override
172 	public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> transactionAttribute(
173 			TransactionAttribute transactionAttribute) {
174 		return super.transactionAttribute(getTransactionAttribute(transactionAttribute));
175 	}
176 
	/**
	 * Register a retry listener. Listeners are kept in a set in registration order and are set on the retry template
	 * when it is created.
	 *
	 * @param listener the listener to register
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> listener(RetryListener listener) {
		retryListeners.add(listener);
		return this;
	}
187 
	/**
	 * Sets the key generator for identifying retried items. Retry across transaction boundaries requires items to be
	 * identified when they are encountered again. The default strategy is to use the items themselves, relying on their
	 * own implementation to ensure that they can be identified. Often a key generator is not necessary as long as the
	 * items have reliable hash code and equals implementations, or the reader is not transactional (the default) and
	 * the item processor either is itself not transactional (not the default) or does not create new items.
	 * <p>
	 * The generator is handed to the chunk processor as-is when the tasklet is created.
	 *
	 * @param keyGenerator a key generator for the stateful retry
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) {
		this.keyGenerator = keyGenerator;
		return this;
	}
202 
	/**
	 * The maximum number of times to try a failed item. Zero and one both translate to try only once and do not retry.
	 * <p>
	 * If an explicit {@link #retryPolicy(RetryPolicy)} is also set, the limit is not silently dropped in every case:
	 * when the limit is greater than zero and retryable exceptions have been registered, the limit based policy is
	 * combined with the explicit one in a {@link CompositeRetryPolicy}. Otherwise the explicit policy is used alone.
	 *
	 * @param retryLimit the retry limit (default 0)
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> retryLimit(int retryLimit) {
		this.retryLimit = retryLimit;
		return this;
	}
214 
	/**
	 * Provide an explicit retry policy instead of relying only on the {@link #retryLimit(int)} and retryable
	 * exceptions provided elsewhere. Can be used to retry different exceptions a different number of times, for
	 * instance. If a positive retry limit and retryable exceptions are configured as well, both policies are combined
	 * in a {@link CompositeRetryPolicy}.
	 *
	 * @param retryPolicy a retry policy
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) {
		this.retryPolicy = retryPolicy;
		return this;
	}
226 
	/**
	 * Provide a backoff policy to prevent items being retried immediately (e.g. in case the failure was caused by a
	 * remote resource failure that might take some time to be resolved). The policy is set on the retry template
	 * whenever it is non-null, whether or not an explicit {@link #retryPolicy(RetryPolicy)} has been provided.
	 *
	 * @param backOffPolicy the back off policy to use (default no backoff)
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) {
		this.backOffPolicy = backOffPolicy;
		return this;
	}
239 
	/**
	 * Provide an explicit retry context cache. Retry is stateful across transactions in the case of failures in item
	 * processing or writing, so some information about the context for subsequent retries has to be stored. The cache
	 * is only set on the retry template when it is non-null.
	 *
	 * @param retryContextCache cache for retry contexts in between transactions (default to standard in-memory
	 * implementation)
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) {
		this.retryContextCache = retryContextCache;
		return this;
	}
252 
	/**
	 * Sets the maximum number of failed items to skip before the step fails.
	 * <p>
	 * The limit always feeds a {@link LimitCheckingItemSkipPolicy}. If an explicit {@link #skipPolicy(SkipPolicy)} is
	 * provided as well, that limit checking policy is combined with the explicit one in a {@link CompositeSkipPolicy}
	 * rather than being discarded.
	 *
	 * @param skipLimit the skip limit to set
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> skipLimit(int skipLimit) {
		this.skipLimit = skipLimit;
		return this;
	}
264 
	/**
	 * Explicitly prevent certain exceptions (and subclasses) from being skipped. The type is recorded with a
	 * {@code false} flag in the same map that {@link #skip(Class)} writes to, so the latest call for a given type wins.
	 *
	 * @param type the non-skippable exception
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> noSkip(Class<? extends Throwable> type) {
		skippableExceptionClasses.put(type, false);
		return this;
	}
275 
	/**
	 * Explicitly request certain exceptions (and subclasses) to be skipped. The type is recorded with a {@code true}
	 * flag in the same map that {@link #noSkip(Class)} writes to, so the latest call for a given type wins.
	 *
	 * @param type the exception type to skip
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) {
		skippableExceptionClasses.put(type, true);
		return this;
	}
286 
	/**
	 * Provide an explicit policy for managing skips. A skip policy determines which exceptions are skippable and how
	 * many times. When set, it is combined in a {@link CompositeSkipPolicy} with the limit checking policy built from
	 * {@link #skipLimit(int)} and the registered skippable exceptions.
	 *
	 * @param skipPolicy the skip policy
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
		this.skipPolicy = skipPolicy;
		return this;
	}
298 
	/**
	 * Mark this exception as ignorable during item read or processing operations. Processing continues with no
	 * additional callbacks (use skips instead if you need to be notified). Ignored during write because there is no
	 * guarantee of skip and retry without rollback.
	 * <p>
	 * The type is added to the collection from which the rollback classifier is built.
	 *
	 * @param type the exception to mark as no rollback
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> noRollback(Class<? extends Throwable> type) {
		noRollbackExceptionClasses.add(type);
		return this;
	}
311 
	/**
	 * Explicitly ask for an exception (and subclasses) to be excluded from retry. The type is recorded with a
	 * {@code false} flag in the same map that {@link #retry(Class)} writes to, so the latest call for a given type wins.
	 *
	 * @param type the exception to exclude from retry
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> noRetry(Class<? extends Throwable> type) {
		retryableExceptionClasses.put(type, false);
		return this;
	}
322 
	/**
	 * Explicitly ask for an exception (and subclasses) to be retried. The type is recorded with a {@code true} flag in
	 * the same map that {@link #noRetry(Class)} writes to, so the latest call for a given type wins.
	 *
	 * @param type the exception to retry
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> retry(Class<? extends Throwable> type) {
		retryableExceptionClasses.put(type, true);
		return this;
	}
333 
	/**
	 * Mark the item processor as non-transactional (default is the opposite). If this flag is set the results of item
	 * processing are cached across transactions in between retries and during skip processing, otherwise the processor
	 * will be called in every transaction.
	 * <p>
	 * There is no counterpart to switch the flag back on for this builder instance.
	 *
	 * @return this for fluent chaining
	 */
	public FaultTolerantStepBuilder<I, O> processorNonTransactional() {
		this.processorTransactional = false;
		return this;
	}
345 
346 	@Override
347 	public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> stream(ItemStream stream) {
348 		if (stream instanceof ItemReader<?>) {
349 			if (!streamIsReader) {
350 				streamIsReader = true;
351 				super.stream(chunkMonitor);
352 			}
353 			// In cases where multiple nested item readers are registered,
354 			// they all want to get the open() and close() callbacks.
355 			chunkMonitor.registerItemStream(stream);
356 		}
357 		else {
358 			super.stream(stream);
359 		}
360 		return this;
361 	}
362 
363 	private FaultTolerantChunkProvider<I> createChunkProvider() {
364 
365 		SkipPolicy readSkipPolicy = createSkipPolicy();
366 		readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
367 		FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<I>(getReader(),
368 				createChunkOperations());
369 		chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
370 		chunkProvider.setSkipPolicy(readSkipPolicy);
371 		chunkProvider.setRollbackClassifier(getRollbackClassifier());
372 		ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners());
373 		listeners.addAll(skipListeners);
374 		chunkProvider.setListeners(listeners);
375 
376 		return chunkProvider;
377 
378 	}
379 
380 	private FaultTolerantChunkProcessor<I, O> createChunkProcessor() {
381 
382 		BatchRetryTemplate batchRetryTemplate = createRetryOperations();
383 
384 		FaultTolerantChunkProcessor<I, O> chunkProcessor = new FaultTolerantChunkProcessor<I, O>(getProcessor(),
385 				getWriter(), batchRetryTemplate);
386 		chunkProcessor.setBuffering(!isReaderTransactionalQueue());
387 		chunkProcessor.setProcessorTransactional(processorTransactional);
388 
389 		SkipPolicy writeSkipPolicy = createSkipPolicy();
390 		writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy);
391 		chunkProcessor.setWriteSkipPolicy(writeSkipPolicy);
392 		chunkProcessor.setProcessSkipPolicy(writeSkipPolicy);
393 		chunkProcessor.setRollbackClassifier(getRollbackClassifier());
394 		chunkProcessor.setKeyGenerator(keyGenerator);
395 		detectStreamInReader();
396 
397 		ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners());
398 		listeners.addAll(skipListeners);
399 		chunkProcessor.setListeners(listeners);
400 		chunkProcessor.setChunkMonitor(chunkMonitor);
401 
402 		return chunkProcessor;
403 
404 	}
405 
	/**
	 * Make sure the framework's own fatal exception types are present in the non-skippable and non-retryable lists.
	 * Types that are already listed are not added again. The two lists differ: the non-retryable list additionally
	 * contains {@link TransactionException} and {@link FatalStepExecutionException}.
	 */
	// The varargs calls below create generic arrays of Class<? extends Throwable>, hence the suppression.
	@SuppressWarnings("unchecked")
	private void addSpecialExceptions() {
		addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
				SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class,
				JobInterruptedException.class, Error.class);
		addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
				TransactionException.class, FatalStepExecutionException.class, SkipListenerFailedException.class,
				SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class, Error.class);
	}
415 
416 	private void detectStreamInReader() {
417 		if (streamIsReader) {
418 			if (!concurrent()) {
419 				chunkMonitor.setItemReader(getReader());
420 			}
421 			else {
422 				logger.warn("Asynchronous TaskExecutor detected with ItemStream reader.  This is probably an error, "
423 						+ "and may lead to incorrect restart data being stored.");
424 			}
425 		}
426 	}
427 
428 	/**
429 	 * Register explicitly set item listeners and auto-register reader, processor and writer if applicable
430 	 */
431 	private void registerSkipListeners() {
432 
433 		// auto-register reader, processor and writer
434 		for (Object itemHandler : new Object[] { getReader(), getWriter(), getProcessor() }) {
435 
436 			if (StepListenerFactoryBean.isListener(itemHandler)) {
437 				StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
438 				if (listener instanceof SkipListener<?, ?>) {
439 					@SuppressWarnings("unchecked")
440 					SkipListener<? super I, ? super O> skipListener = (SkipListener<? super I, ? super O>) listener;
441 					skipListeners.add(skipListener);
442 				}
443 			}
444 
445 		}
446 	}
447 
448 	/**
449 	 * Convenience method to get an exception classifier based on the provided transaction attributes.
450 	 *
451 	 * @return an exception classifier: maps to true if an exception should cause rollback
452 	 */
453 	private Classifier<Throwable, Boolean> getRollbackClassifier() {
454 
455 		Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false);
456 
457 		// Try to avoid pathological cases where we cannot force a rollback
458 		// (should be pretty uncommon):
459 		if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException()))
460 				|| !classifier.classify(new ExhaustedRetryException("test"))) {
461 
462 			final Classifier<Throwable, Boolean> binary = classifier;
463 
464 			Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>();
465 			types.add(ForceRollbackForWriteSkipException.class);
466 			types.add(ExhaustedRetryException.class);
467 			final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true);
468 
469 			classifier = new Classifier<Throwable, Boolean>() {
470 				@Override
471 				public Boolean classify(Throwable classifiable) {
472 					// Rollback if either the user's list or our own applies
473 					return panic.classify(classifiable) || binary.classify(classifiable);
474 				}
475 			};
476 
477 		}
478 
479 		return classifier;
480 
481 	}
482 
483 	@SuppressWarnings("serial")
484 	private TransactionAttribute getTransactionAttribute(TransactionAttribute attribute) {
485 
486 		final Classifier<Throwable, Boolean> classifier = getRollbackClassifier();
487 		return new DefaultTransactionAttribute(attribute) {
488 			@Override
489 			public boolean rollbackOn(Throwable ex) {
490 				return classifier.classify(ex);
491 			}
492 
493 		};
494 
495 	}
496 
497 	protected SkipPolicy createSkipPolicy() {
498 		SkipPolicy skipPolicy = this.skipPolicy;
499 		Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(
500 				skippableExceptionClasses);
501 		map.put(ForceRollbackForWriteSkipException.class, true);
502 		LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map);
503 		if (skipPolicy == null) {
504 			Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0),
505 					"If a skip limit is provided then skippable exceptions must also be specified");
506 			skipPolicy = limitCheckingItemSkipPolicy;
507 		}
508 		else if (limitCheckingItemSkipPolicy != null) {
509 			skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy });
510 		}
511 		return skipPolicy;
512 	}
513 
514 	/**
515 	 * @return fully configured retry template for item processing phase.
516 	 */
517 	private BatchRetryTemplate createRetryOperations() {
518 
519 		RetryPolicy retryPolicy = this.retryPolicy;
520 		SimpleRetryPolicy simpleRetryPolicy = null;
521 
522 		Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(
523 				retryableExceptionClasses);
524 		map.put(ForceRollbackForWriteSkipException.class, true);
525 		simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map);
526 
527 		if (retryPolicy == null) {
528 			Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0),
529 					"If a retry limit is provided then retryable exceptions must also be specified");
530 			retryPolicy = simpleRetryPolicy;
531 		}
532 		else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) {
533 			CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
534 			compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy });
535 			retryPolicy = compositeRetryPolicy;
536 		}
537 
538 		RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy);
539 
540 		BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate();
541 		if (backOffPolicy != null) {
542 			batchRetryTemplate.setBackOffPolicy(backOffPolicy);
543 		}
544 		batchRetryTemplate.setRetryPolicy(retryPolicyWrapper);
545 
546 		// Co-ordinate the retry policy with the exception handler:
547 		RepeatOperations stepOperations = getStepOperations();
548 		if (stepOperations instanceof RepeatTemplate) {
549 			SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper,
550 					getExceptionHandler(), nonRetryableExceptionClasses);
551 			((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
552 		}
553 
554 		if (retryContextCache != null) {
555 			batchRetryTemplate.setRetryContextCache(retryContextCache);
556 		}
557 
558 		if (retryListeners != null) {
559 			batchRetryTemplate.setListeners(retryListeners.toArray(new RetryListener[0]));
560 		}
561 		return batchRetryTemplate;
562 
563 	}
564 
565 	/**
566 	 * Wrap the provided {@link #setRetryPolicy(RetryPolicy)} so that it never retries explicitly non-retryable
567 	 * exceptions.
568 	 */
569 	private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) {
570 
571 		NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
572 		Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>();
573 		for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) {
574 			map.put(fatal, neverRetryPolicy);
575 		}
576 
577 		SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>(
578 				retryPolicy);
579 		classifier.setTypeMap(map);
580 
581 		ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy();
582 		retryPolicyWrapper.setExceptionClassifier(classifier);
583 		return retryPolicyWrapper;
584 
585 	}
586 
587 	/**
588 	 * Wrap a {@link SkipPolicy} and make it consistent with known fatal exceptions.
589 	 *
590 	 * @param skipPolicy an existing skip policy
591 	 * @return a skip policy that will not skip fatal exceptions
592 	 */
593 	private SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) {
594 
595 		NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy();
596 		Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<Class<? extends Throwable>, SkipPolicy>();
597 		for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) {
598 			map.put(fatal, neverSkipPolicy);
599 		}
600 
601 		SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<Throwable, SkipPolicy>(skipPolicy);
602 		classifier.setTypeMap(map);
603 
604 		ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy();
605 		skipPolicyWrapper.setExceptionClassifier(classifier);
606 		return skipPolicyWrapper;
607 	}
608 
609 	private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) {
610 		List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
611 		for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) {
612 			exceptions.add(exceptionClass);
613 		}
614 		for (Class<? extends Throwable> fatal : cls) {
615 			if (!exceptions.contains(fatal)) {
616 				exceptions.add(fatal);
617 			}
618 		}
619 		nonSkippableExceptionClasses = exceptions;
620 	}
621 
622 	private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) {
623 		List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
624 		for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) {
625 			exceptions.add(exceptionClass);
626 		}
627 		for (Class<? extends Throwable> fatal : cls) {
628 			if (!exceptions.contains(fatal)) {
629 				exceptions.add(fatal);
630 			}
631 		}
632 		nonRetryableExceptionClasses = exceptions;
633 	}
634 
635 	/**
636 	 * ChunkListener that wraps exceptions thrown from the ChunkListener in {@link FatalStepExecutionException} to force
637 	 * termination of StepExecution
638 	 *
639 	 * ChunkListeners shoulnd't throw exceptions and expect continued processing, they must be handled in the
640 	 * implementation or the step will terminate
641 	 *
642 	 */
643 	private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener {
644 
645 		private ChunkListener chunkListener;
646 
647 		TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) {
648 			this.chunkListener = chunkListener;
649 		}
650 
651 		@Override
652 		public void beforeChunk(ChunkContext context) {
653 			try {
654 				chunkListener.beforeChunk(context);
655 			}
656 			catch (Throwable t) {
657 				throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
658 			}
659 		}
660 
661 		@Override
662 		public void afterChunk(ChunkContext context) {
663 			try {
664 				chunkListener.afterChunk(context);
665 			}
666 			catch (Throwable t) {
667 				throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
668 			}
669 		}
670 
671 		@Override
672 		public void afterChunkError(ChunkContext context) {
673 			try {
674 				chunkListener.afterChunkError(context);
675 			}
676 			catch (Throwable t) {
677 				throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
678 			}
679 		}
680 	}
681 }