EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [FaultTolerantStepFactoryBean.java]

name | class, % | method, % | block, % | line, %
FaultTolerantStepFactoryBean.java | 100% (4/4) | 88%  (30/34) | 90%  (774/858) | 89%  (158/177)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class FaultTolerantStepFactoryBean$TerminateOnExceptionChunkListenerDelegate100% (1/1)33%  (1/3)27%  (9/33)23%  (3/13)
afterChunk (): void 0%   (0/1)0%   (0/12)0%   (0/5)
beforeChunk (): void 0%   (0/1)0%   (0/12)0%   (0/5)
FaultTolerantStepFactoryBean$TerminateOnExceptionChunkListenerDelegate (Fault... 100% (1/1)100% (9/9)100% (3/3)
     
class FaultTolerantStepFactoryBean$1100% (1/1)50%  (1/2)39%  (12/31)50%  (1/2)
classify (Throwable): Boolean 0%   (0/1)0%   (0/19)0%   (0/1)
FaultTolerantStepFactoryBean$1 (FaultTolerantStepFactoryBean, Classifier, Cla... 100% (1/1)100% (12/12)100% (1/1)
     
class FaultTolerantStepFactoryBean100% (1/1)96%  (26/27)95%  (736/777)95%  (154/162)
setKeyGenerator (KeyGenerator): void 0%   (0/1)0%   (0/4)0%   (0/2)
addNonRetryableExceptionIfMissing (Class []): void 100% (1/1)81%  (39/48)82%  (6.6/8)
addNonSkippableExceptionIfMissing (Class []): void 100% (1/1)81%  (39/48)82%  (6.6/8)
configureRetry (): BatchRetryTemplate 100% (1/1)85%  (111/130)89%  (24.9/28)
FaultTolerantStepFactoryBean (): void 100% (1/1)100% (45/45)100% (12/12)
applyConfiguration (TaskletStep): void 100% (1/1)100% (76/76)100% (4/4)
configureChunkProcessor (): SimpleChunkProcessor 100% (1/1)100% (51/51)100% (12/12)
configureChunkProvider (): SimpleChunkProvider 100% (1/1)100% (30/30)100% (7/7)
createSkipPolicy (): SkipPolicy 100% (1/1)100% (57/57)100% (10/10)
getFatalExceptionAwareProxy (RetryPolicy): RetryPolicy 100% (1/1)100% (42/42)100% (9/9)
getFatalExceptionAwareProxy (SkipPolicy): SkipPolicy 100% (1/1)100% (42/42)100% (9/9)
getRollbackClassifier (): Classifier 100% (1/1)100% (57/57)100% (9/9)
getTransactionAttribute (): TransactionAttribute 100% (1/1)100% (13/13)100% (3/3)
registerChunkListeners (TaskletStep, StepListener): void 100% (1/1)100% (10/10)100% (2/2)
registerStreams (TaskletStep, ItemStream []): void 100% (1/1)100% (76/76)100% (15/15)
setBackOffPolicy (BackOffPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setCacheCapacity (int): void 100% (1/1)100% (4/4)100% (2/2)
setNoRollbackExceptionClasses (Collection): void 100% (1/1)100% (4/4)100% (2/2)
setProcessorTransactional (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setRetryContextCache (RetryContextCache): void 100% (1/1)100% (4/4)100% (2/2)
setRetryLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setRetryListeners (RetryListener []): void 100% (1/1)100% (4/4)100% (2/2)
setRetryPolicy (RetryPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setRetryableExceptionClasses (Map): void 100% (1/1)100% (4/4)100% (2/2)
setSkipLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setSkippableExceptionClasses (Map): void 100% (1/1)100% (4/4)100% (2/2)
     
class FaultTolerantStepFactoryBean$2100% (1/1)100% (2/2)100% (17/17)100% (2/2)
FaultTolerantStepFactoryBean$2 (FaultTolerantStepFactoryBean, TransactionAttr... 100% (1/1)100% (10/10)100% (1/1)
rollbackOn (Throwable): boolean 100% (1/1)100% (7/7)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.HashMap;
22import java.util.HashSet;
23import java.util.List;
24import java.util.Map;
25 
26import org.springframework.batch.classify.BinaryExceptionClassifier;
27import org.springframework.batch.classify.Classifier;
28import org.springframework.batch.classify.SubclassClassifier;
29import org.springframework.batch.core.ChunkListener;
30import org.springframework.batch.core.JobInterruptedException;
31import org.springframework.batch.core.Step;
32import org.springframework.batch.core.StepListener;
33import org.springframework.batch.core.step.FatalStepExecutionException;
34import org.springframework.batch.core.step.skip.CompositeSkipPolicy;
35import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy;
36import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
37import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
38import org.springframework.batch.core.step.skip.NonSkippableReadException;
39import org.springframework.batch.core.step.skip.SkipLimitExceededException;
40import org.springframework.batch.core.step.skip.SkipListenerFailedException;
41import org.springframework.batch.core.step.skip.SkipPolicy;
42import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
43import org.springframework.batch.core.step.tasklet.TaskletStep;
44import org.springframework.batch.item.ItemReader;
45import org.springframework.batch.item.ItemStream;
46import org.springframework.batch.repeat.RepeatOperations;
47import org.springframework.batch.repeat.support.RepeatTemplate;
48import org.springframework.batch.retry.ExhaustedRetryException;
49import org.springframework.batch.retry.RetryException;
50import org.springframework.batch.retry.RetryListener;
51import org.springframework.batch.retry.RetryPolicy;
52import org.springframework.batch.retry.backoff.BackOffPolicy;
53import org.springframework.batch.retry.policy.CompositeRetryPolicy;
54import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy;
55import org.springframework.batch.retry.policy.MapRetryContextCache;
56import org.springframework.batch.retry.policy.NeverRetryPolicy;
57import org.springframework.batch.retry.policy.RetryContextCache;
58import org.springframework.batch.retry.policy.SimpleRetryPolicy;
59import org.springframework.core.task.SyncTaskExecutor;
60import org.springframework.core.task.TaskExecutor;
61import org.springframework.transaction.TransactionException;
62import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
63import org.springframework.transaction.interceptor.TransactionAttribute;
64import org.springframework.util.Assert;
65 
66/**
67 * Factory bean for step that provides options for configuring skip behaviour.
68 * User can set {@link #setSkipLimit(int)} to set how many exceptions of
69 * {@link #setSkippableExceptionClasses(Collection)} types are tolerated.
70 * {@link #setFatalExceptionClasses(Collection)} will cause immediate
71 * termination of job - they are treated as higher priority than
72 * {@link #setSkippableExceptionClasses(Collection)}, so the two lists don't
73 * need to be exclusive.
74 * 
75 * Skippable exceptions on write will by default cause transaction rollback - to
76 * avoid rollback for specific exception class include it in the transaction
77 * attribute as "no rollback for".
78 * 
79 * @see SimpleStepFactoryBean
80 * 
81 * @author Dave Syer
82 * @author Robert Kasanicky
83 * @author Morten Andersen-Gott
84 * 
85 */
86public class FaultTolerantStepFactoryBean<T, S> extends SimpleStepFactoryBean<T, S> {
87 
88        private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
89 
90        private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<Class<? extends Throwable>>();
91 
92        private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new HashSet<Class<? extends Throwable>>();
93 
94        private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
95 
96        private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>();
97 
98        private int cacheCapacity = 0;
99 
100        private int retryLimit = 0;
101 
102        private int skipLimit = 0;
103 
104        private SkipPolicy skipPolicy;
105 
106        private BackOffPolicy backOffPolicy;
107 
108        private RetryListener[] retryListeners;
109 
110        private RetryPolicy retryPolicy;
111 
112        private RetryContextCache retryContextCache;
113 
114        private KeyGenerator keyGenerator;
115 
116        private ChunkMonitor chunkMonitor = new ChunkMonitor();
117 
118        private boolean processorTransactional = true;
119 
120        /**
121         * The {@link KeyGenerator} to use to identify failed items across rollback.
122         * Not used in the case of the
123         * {@link #setIsReaderTransactionalQueue(boolean) transactional queue flag}
124         * being false (the default).
125         * 
126         * @param keyGenerator
127         *            the {@link KeyGenerator} to set
128         */
129        public void setKeyGenerator(KeyGenerator keyGenerator) {
130                this.keyGenerator = keyGenerator;
131        }
132 
133        /**
134         * Setter for the retry policy. If this is specified the other retry
135         * properties are ignored (retryLimit, backOffPolicy,
136         * retryableExceptionClasses).
137         * 
138         * @param retryPolicy
139         *            a stateless {@link RetryPolicy}
140         */
141        public void setRetryPolicy(RetryPolicy retryPolicy) {
142                this.retryPolicy = retryPolicy;
143        }
144 
145        /**
146         * Public setter for the retry limit. Each item can be retried up to this
147         * limit. Note this limit includes the initial attempt to process the item,
148         * therefore <code>retryLimit == 1</code> by default.
149         * 
150         * @param retryLimit
151         *            the retry limit to set, must be greater or equal to 1.
152         */
153        public void setRetryLimit(int retryLimit) {
154                this.retryLimit = retryLimit;
155        }
156 
157        /**
158         * Public setter for the capacity of the cache in the retry policy. If more
159         * items than this fail without being skipped or recovered an exception will
160         * be thrown. This is to guard against inadvertent infinite loops generated
161         * by item identity problems.<br/>
162         * 
163         * The default value should be high enough and more for most purposes. To
164         * breach the limit in a single-threaded step typically you have to have
165         * this many failures in a single transaction. Defaults to the value in the
166         * {@link MapRetryContextCache}.<br/>
167         * 
168         * This property is ignored if the
169         * {@link #setRetryContextCache(RetryContextCache)} is set directly.
170         * 
171         * @param cacheCapacity
172         *            the cache capacity to set (greater than 0 else ignored)
173         */
174        public void setCacheCapacity(int cacheCapacity) {
175                this.cacheCapacity = cacheCapacity;
176        }
177 
178        /**
179         * Override the default retry context cache for retry of chunk processing.
180         * If this property is set then {@link #setCacheCapacity(int)} is ignored.
181         * 
182         * @param retryContextCache
183         *            the {@link RetryContextCache} to set
184         */
185        public void setRetryContextCache(RetryContextCache retryContextCache) {
186                this.retryContextCache = retryContextCache;
187        }
188 
189        /**
190         * Public setter for the retryable exceptions classifier map (from throwable
191         * class to boolean, true is retryable).
192         * 
193         * @param retryableExceptionClasses
194         *            the retryableExceptionClasses to set
195         */
196        public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) {
197                this.retryableExceptionClasses = retryableExceptionClasses;
198        }
199 
200        /**
201         * Public setter for the {@link BackOffPolicy}.
202         * 
203         * @param backOffPolicy
204         *            the {@link BackOffPolicy} to set
205         */
206        public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
207                this.backOffPolicy = backOffPolicy;
208        }
209 
210        /**
211         * Public setter for the {@link RetryListener}s.
212         * 
213         * @param retryListeners
214         *            the {@link RetryListener}s to set
215         */
216        public void setRetryListeners(RetryListener... retryListeners) {
217                this.retryListeners = retryListeners;
218        }
219 
220        /**
221         * A limit that determines skip policy. If this value is positive then an
222         * exception in chunk processing will cause the item to be skipped and no
223         * exception propagated until the limit is reached. If it is zero then all
224         * exceptions will be propagated from the chunk and cause the step to abort.
225         * 
226         * @param skipLimit
227         *            the value to set. Default is 0 (never skip).
228         */
229        public void setSkipLimit(int skipLimit) {
230                this.skipLimit = skipLimit;
231        }
232 
233        /**
234         * A {@link SkipPolicy} that determines the outcome of an exception when
235         * processing an item. Overrides the {@link #setSkipLimit(int) skipLimit}.
236         * The {@link #setSkippableExceptionClasses(Map) skippableExceptionClasses}
237         * are also ignored if this is set.
238         * 
239         * @param skipPolicy
240         *            the {@link SkipPolicy} to set
241         */
242        public void setSkipPolicy(SkipPolicy skipPolicy) {
243                this.skipPolicy = skipPolicy;
244        }
245 
246        /**
247         * Exception classes that when raised won't crash the job but will result in
248         * the item which handling caused the exception being skipped. Any exception
249         * which is marked for "no rollback" is also skippable, but not vice versa.
250         * Remember to set the {@link #setSkipLimit(int) skip limit} as well.
251         * <p/>
252         * Defaults to all no exception.
253         * 
254         * @param exceptionClasses
255         *            defaults to <code>Exception</code>
256         */
257        public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) {
258                this.skippableExceptionClasses = exceptionClasses;
259        }
260 
261        /**
262         * Exception classes that are candidates for no rollback. The {@link Step}
263         * can not honour the no rollback hint in all circumstances, but any
264         * exception on this list is counted as skippable, so even if there has to
265         * be a rollback, then the step will not fail as long as the skip limit is
266         * not breached.
267         * <p/>
268         * Defaults is empty.
269         * 
270         * @param noRollbackExceptionClasses
271         *            the exception classes to set
272         */
273        public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) {
274                this.noRollbackExceptionClasses = noRollbackExceptionClasses;
275        }
276 
277        /**
278         * @param processorTransactional
279         */
280        public void setProcessorTransactional(boolean processorTransactional) {
281                this.processorTransactional = processorTransactional;
282        }
283 
284        /**
285         * Convenience method for subclasses to get an exception classifier based on
286         * the provided transaction attributes.
287         * 
288         * @return an exception classifier: maps to true if an exception should
289         *         cause rollback
290         */
291        protected Classifier<Throwable, Boolean> getRollbackClassifier() {
292 
293                Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false);
294 
295                // Try to avoid pathological cases where we cannot force a rollback
296                // (should be pretty uncommon):
297                if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException()))
298                                || !classifier.classify(new ExhaustedRetryException("test"))) {
299 
300                        final Classifier<Throwable, Boolean> binary = classifier;
301 
302                        Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>();
303                        types.add(ForceRollbackForWriteSkipException.class);
304                        types.add(ExhaustedRetryException.class);
305                        final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true);
306 
307                        classifier = new Classifier<Throwable, Boolean>() {
308                                public Boolean classify(Throwable classifiable) {
309                                        // Rollback if either the user's list or our own applies
310                                        return panic.classify(classifiable) || binary.classify(classifiable);
311                                }
312                        };
313 
314                }
315 
316                return classifier;
317 
318        }
319 
320        /**
321         * Getter for the {@link TransactionAttribute} for subclasses only.
322         * 
323         * @return the transactionAttribute
324         */
325        @SuppressWarnings("serial")
326        @Override
327        protected TransactionAttribute getTransactionAttribute() {
328 
329                TransactionAttribute attribute = super.getTransactionAttribute();
330                final Classifier<Throwable, Boolean> classifier = getRollbackClassifier();
331                return new DefaultTransactionAttribute(attribute) {
332                        @Override
333                        public boolean rollbackOn(Throwable ex) {
334                                return classifier.classify(ex);
335                        }
336 
337                };
338 
339        }
340 
341        @Override
342        @SuppressWarnings("unchecked")
343        protected void applyConfiguration(TaskletStep step) {
344                addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
345                                SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class,
346                                Error.class);
347                addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, TransactionException.class,
348                                FatalStepExecutionException.class, SkipListenerFailedException.class, SkipPolicyFailedException.class,
349                                RetryException.class, JobInterruptedException.class, Error.class);
350                super.applyConfiguration(step);
351        }
352 
353        /**
354         * {@inheritDoc}
355         */
356        @Override
357        protected void registerStreams(TaskletStep step, ItemStream[] streams) {
358                boolean streamIsReader = false;
359                ItemReader<? extends T> itemReader = getItemReader();
360                for (ItemStream stream : streams) {
361                        if (stream instanceof ItemReader<?>) {
362                                streamIsReader = true;
363                                chunkMonitor.registerItemStream(stream);
364                        } else {
365                                step.registerStream(stream);
366                        }
367                }
368                TaskExecutor taskExecutor = getTaskExecutor();
369                // In cases where multiple nested item readers are registered,
370                // they all want to get the open() and close() callbacks.
371                if (streamIsReader) {
372                        // double registration is fine
373                        step.registerStream(chunkMonitor);
374                        boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor);
375                        if (!concurrent) {
376                                chunkMonitor.setItemReader(itemReader);
377                        } else {
378                                logger.warn("Asynchronous TaskExecutor detected (" + taskExecutor.getClass()
379                                                + ") with ItemStream reader.  This is probably an error, " + "and may lead to incorrect restart data being stored.");
380                        }
381                }
382        }
383 
384        /**
385         * @return {@link ChunkProvider} configured for fault-tolerance.
386         */
387        @Override
388        protected SimpleChunkProvider<T> configureChunkProvider() {
389 
390                SkipPolicy readSkipPolicy = createSkipPolicy();
391                readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
392                FaultTolerantChunkProvider<T> chunkProvider = new FaultTolerantChunkProvider<T>(getItemReader(), getChunkOperations());
393                chunkProvider.setMaxSkipsOnRead(Math.max(getCommitInterval(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
394                chunkProvider.setSkipPolicy(readSkipPolicy);
395                chunkProvider.setRollbackClassifier(getRollbackClassifier());
396 
397                return chunkProvider;
398 
399        }
400 
401        /**
402         * @return
403         */
404        protected SkipPolicy createSkipPolicy() {
405                SkipPolicy skipPolicy = this.skipPolicy;
406                Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(skippableExceptionClasses);
407                map.put(ForceRollbackForWriteSkipException.class, true);
408                LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map);
409                if (skipPolicy == null) {
410                        Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0),
411                                        "If a skip limit is provided then skippable exceptions must also be specified");
412                        skipPolicy = limitCheckingItemSkipPolicy;
413                } else if (limitCheckingItemSkipPolicy != null) {
414                        skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy });
415                }
416                return skipPolicy;
417        }
418 
419        /**
420         * @return {@link ChunkProcessor} configured for fault-tolerance.
421         */
422        @Override
423        protected SimpleChunkProcessor<T, S> configureChunkProcessor() {
424 
425                BatchRetryTemplate batchRetryTemplate = configureRetry();
426 
427                FaultTolerantChunkProcessor<T, S> chunkProcessor = new FaultTolerantChunkProcessor<T, S>(getItemProcessor(), getItemWriter(),
428                                batchRetryTemplate);
429                chunkProcessor.setBuffering(!isReaderTransactionalQueue());
430                chunkProcessor.setProcessorTransactional(processorTransactional);
431 
432                SkipPolicy writeSkipPolicy = createSkipPolicy();
433                writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy);
434                chunkProcessor.setWriteSkipPolicy(writeSkipPolicy);
435                chunkProcessor.setProcessSkipPolicy(writeSkipPolicy);
436                chunkProcessor.setRollbackClassifier(getRollbackClassifier());
437                chunkProcessor.setKeyGenerator(keyGenerator);
438                chunkProcessor.setChunkMonitor(chunkMonitor);
439 
440                return chunkProcessor;
441 
442        }
443 
444        /**
445         * @return fully configured retry template for item processing phase.
446         */
447        private BatchRetryTemplate configureRetry() {
448 
449                RetryPolicy retryPolicy = this.retryPolicy;
450                SimpleRetryPolicy simpleRetryPolicy = null;
451 
452                Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(retryableExceptionClasses);
453                map.put(ForceRollbackForWriteSkipException.class, true);
454                simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map);
455 
456                if (retryPolicy == null) {
457                        Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0),
458                                        "If a retry limit is provided then retryable exceptions must also be specified");
459                        retryPolicy = simpleRetryPolicy;
460                } else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) {
461                        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
462                        compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy });
463                        retryPolicy = compositeRetryPolicy;
464                }
465 
466                RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy);
467 
468                BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate();
469                if (backOffPolicy != null) {
470                        batchRetryTemplate.setBackOffPolicy(backOffPolicy);
471                }
472                batchRetryTemplate.setRetryPolicy(retryPolicyWrapper);
473 
474                // Co-ordinate the retry policy with the exception handler:
475                RepeatOperations stepOperations = getStepOperations();
476                if (stepOperations instanceof RepeatTemplate) {
477                        SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper, getExceptionHandler(),
478                                        nonRetryableExceptionClasses);
479                        ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
480                }
481 
482                if (retryContextCache == null) {
483                        if (cacheCapacity > 0) {
484                                batchRetryTemplate.setRetryContextCache(new MapRetryContextCache(cacheCapacity));
485                        }
486                } else {
487                        batchRetryTemplate.setRetryContextCache(retryContextCache);
488                }
489 
490                if (retryListeners != null) {
491                        batchRetryTemplate.setListeners(retryListeners);
492                }
493                return batchRetryTemplate;
494        }
495 
496        /**
497         * Wrap the provided {@link #setRetryPolicy(RetryPolicy)} so that it never
498         * retries explicitly non-retryable exceptions.
499         */
500        private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) {
501 
502                NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
503                Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>();
504                for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) {
505                        map.put(fatal, neverRetryPolicy);
506                }
507 
508                SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>(retryPolicy);
509                classifier.setTypeMap(map);
510 
511                ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy();
512                retryPolicyWrapper.setExceptionClassifier(classifier);
513                return retryPolicyWrapper;
514 
515        }
516 
517        /**
518         * Wrap a {@link SkipPolicy} and make it consistent with known fatal
519         * exceptions.
520         * 
521         * @param skipPolicy
522         *            an existing skip policy
523         * @return a skip policy that will not skip fatal exceptions
524         */
525        private SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) {
526 
527                NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy();
528                Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<Class<? extends Throwable>, SkipPolicy>();
529                for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) {
530                        map.put(fatal, neverSkipPolicy);
531                }
532 
533                SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<Throwable, SkipPolicy>(skipPolicy);
534                classifier.setTypeMap(map);
535 
536                ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy();
537                skipPolicyWrapper.setExceptionClassifier(classifier);
538                return skipPolicyWrapper;
539        }
540 
541        private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) {
542                List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
543                for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) {
544                        exceptions.add(exceptionClass);
545                }
546                for (Class<? extends Throwable> fatal : cls) {
547                        if (!exceptions.contains(fatal)) {
548                                exceptions.add(fatal);
549                        }
550                }
551                nonSkippableExceptionClasses = exceptions;
552        }
553 
554        private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) {
555                List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
556                for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) {
557                        exceptions.add(exceptionClass);
558                }
559                for (Class<? extends Throwable> fatal : cls) {
560                        if (!exceptions.contains(fatal)) {
561                                exceptions.add(fatal);
562                        }
563                }
564                nonRetryableExceptionClasses = (List<Class<? extends Throwable>>) exceptions;
565        }
566 
567        @Override
568        protected void registerChunkListeners(TaskletStep step, StepListener listener) {
569                super.registerChunkListeners(step, new TerminateOnExceptionChunkListenerDelegate((ChunkListener) listener));
570        }
571 
572        /**
573         * ChunkListener that wraps exceptions thrown from the ChunkListener in
574         * {@link FatalStepExecutionException} to force termination of StepExecution
575         * 
576         * ChunkListeners shoulnd't throw exceptions and expect continued
577         * processing, they must be handled in the implementation or the step will
578         * terminate
579         * 
580         */
581        private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener {
582 
583                private ChunkListener chunkListener;
584 
585                TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) {
586                        this.chunkListener = chunkListener;
587                }
588 
589                public void beforeChunk() {
590                        try {
591                                chunkListener.beforeChunk();
592                        } catch (Throwable t) {
593                                throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
594                        }
595                }
596 
597                public void afterChunk() {
598                        try {
599                                chunkListener.afterChunk();
600                        } catch (Throwable t) {
601                                throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
602                        }
603                }
604 
605        }
606 
607}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov