EMMA Coverage Report (generated Fri Aug 21 15:59:46 BST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [FaultTolerantStepFactoryBean.java]

nameclass, %method, %block, %line, %
FaultTolerantStepFactoryBean.java100% (3/3)93%  (26/28)97%  (638/661)95%  (137.4/144)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FaultTolerantStepFactoryBean$1100% (1/1)100% (2/2)91%  (29/32)95%  (2.8/3)
classify (Throwable): Boolean 100% (1/1)85%  (17/20)85%  (0.8/1)
FaultTolerantStepFactoryBean$1 (FaultTolerantStepFactoryBean, Classifier, Cla... 100% (1/1)100% (12/12)100% (2/2)
     
class FaultTolerantStepFactoryBean100% (1/1)92%  (22/24)97%  (592/612)95%  (134.6/141)
setBackOffPolicy (BackOffPolicy): void 0%   (0/1)0%   (0/4)0%   (0/2)
setKeyGenerator (KeyGenerator): void 0%   (0/1)0%   (0/4)0%   (0/2)
addNonRetryableExceptionIfMissing (Class []): void 100% (1/1)83%  (40/48)82%  (6.6/8)
configureRetry (): BatchRetryTemplate 100% (1/1)96%  (86/90)96%  (22/23)
FaultTolerantStepFactoryBean (): void 100% (1/1)100% (42/42)100% (10/10)
addFatalExceptionIfMissing (Class []): void 100% (1/1)100% (48/48)100% (8/8)
applyConfiguration (TaskletStep): void 100% (1/1)100% (60/60)100% (6/6)
configureChunkProcessor (): SimpleChunkProcessor 100% (1/1)100% (50/50)100% (12/12)
configureChunkProvider (): SimpleChunkProvider 100% (1/1)100% (27/27)100% (7/7)
fatalExceptionAwareProxy (RetryPolicy): RetryPolicy 100% (1/1)100% (42/42)100% (10/10)
getRollbackClassifier (): Classifier 100% (1/1)100% (57/57)100% (10/10)
getSkippableExceptionClasses (): Collection 100% (1/1)100% (12/12)100% (3/3)
getTransactionAttribute (): TransactionAttribute 100% (1/1)100% (13/13)100% (3/3)
registerStreams (TaskletStep, ItemStream []): void 100% (1/1)100% (75/75)100% (17/17)
setCacheCapacity (int): void 100% (1/1)100% (4/4)100% (2/2)
setFatalExceptionClasses (Collection): void 100% (1/1)100% (4/4)100% (2/2)
setNoRollbackExceptionClasses (Collection): void 100% (1/1)100% (4/4)100% (2/2)
setRetryContextCache (RetryContextCache): void 100% (1/1)100% (4/4)100% (2/2)
setRetryLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setRetryListeners (RetryListener []): void 100% (1/1)100% (4/4)100% (2/2)
setRetryPolicy (RetryPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setRetryableExceptionClasses (Collection): void 100% (1/1)100% (4/4)100% (2/2)
setSkipLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setSkippableExceptionClasses (Collection): void 100% (1/1)100% (4/4)100% (2/2)
     
class FaultTolerantStepFactoryBean$2100% (1/1)100% (2/2)100% (17/17)100% (3/3)
FaultTolerantStepFactoryBean$2 (FaultTolerantStepFactoryBean, TransactionAttr... 100% (1/1)100% (10/10)100% (2/2)
rollbackOn (Throwable): boolean 100% (1/1)100% (7/7)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.HashMap;
22import java.util.HashSet;
23import java.util.List;
24import java.util.Map;
25 
26import org.springframework.batch.classify.BinaryExceptionClassifier;
27import org.springframework.batch.classify.Classifier;
28import org.springframework.batch.classify.SubclassClassifier;
29import org.springframework.batch.core.JobInterruptedException;
30import org.springframework.batch.core.Step;
31import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
32import org.springframework.batch.core.step.skip.NonSkippableReadException;
33import org.springframework.batch.core.step.skip.SkipLimitExceededException;
34import org.springframework.batch.core.step.skip.SkipListenerFailedException;
35import org.springframework.batch.core.step.skip.SkipPolicy;
36import org.springframework.batch.core.step.tasklet.TaskletStep;
37import org.springframework.batch.item.ItemReader;
38import org.springframework.batch.item.ItemStream;
39import org.springframework.batch.repeat.RepeatOperations;
40import org.springframework.batch.repeat.support.RepeatTemplate;
41import org.springframework.batch.retry.ExhaustedRetryException;
42import org.springframework.batch.retry.RetryException;
43import org.springframework.batch.retry.RetryListener;
44import org.springframework.batch.retry.RetryPolicy;
45import org.springframework.batch.retry.backoff.BackOffPolicy;
46import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy;
47import org.springframework.batch.retry.policy.MapRetryContextCache;
48import org.springframework.batch.retry.policy.NeverRetryPolicy;
49import org.springframework.batch.retry.policy.RetryContextCache;
50import org.springframework.batch.retry.policy.SimpleRetryPolicy;
51import org.springframework.core.task.SyncTaskExecutor;
52import org.springframework.core.task.TaskExecutor;
53import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
54import org.springframework.transaction.interceptor.TransactionAttribute;
55 
56/**
57 * Factory bean for step that provides options for configuring skip behaviour.
58 * User can set {@link #setSkipLimit(int)} to set how many exceptions of
59 * {@link #setSkippableExceptionClasses(Collection)} types are tolerated.
60 * {@link #setFatalExceptionClasses(Collection)} will cause immediate
61 * termination of job - they are treated as higher priority than
62 * {@link #setSkippableExceptionClasses(Collection)}, so the two lists don't
63 * need to be exclusive.
64 * 
65 * Skippable exceptions on write will by default cause transaction rollback - to
66 * avoid rollback for specific exception class include it in the transaction
67 * attribute as "no rollback for".
68 * 
69 * @see SimpleStepFactoryBean
70 * 
71 * @author Dave Syer
72 * @author Robert Kasanicky
73 * 
74 */
75public class FaultTolerantStepFactoryBean<T, S> extends SimpleStepFactoryBean<T, S> {
76 
77        private Collection<Class<? extends Throwable>> skippableExceptionClasses = new HashSet<Class<? extends Throwable>>();
78 
79        private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new HashSet<Class<? extends Throwable>>();
80 
81        private Collection<Class<? extends Throwable>> fatalExceptionClasses = new HashSet<Class<? extends Throwable>>();
82 
83        private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>();
84 
85        private Collection<Class<? extends Throwable>> retryableExceptionClasses = new HashSet<Class<? extends Throwable>>();
86 
87        private int cacheCapacity = 0;
88 
89        private int retryLimit = 0;
90 
91        private int skipLimit = 0;
92 
93        private BackOffPolicy backOffPolicy;
94 
95        private RetryListener[] retryListeners;
96 
97        private RetryPolicy retryPolicy;
98 
99        private RetryContextCache retryContextCache;
100 
101        private KeyGenerator keyGenerator;
102 
103        private ChunkMonitor chunkMonitor = new ChunkMonitor();
104 
105        /**
106         * The {@link KeyGenerator} to use to identify failed items across rollback.
107         * Not used in the case of the
108         * {@link #setIsReaderTransactionalQueue(boolean) transactional queue flag}
109         * being false (the default).
110         * 
111         * @param keyGenerator the {@link KeyGenerator} to set
112         */
113        public void setKeyGenerator(KeyGenerator keyGenerator) {
114                this.keyGenerator = keyGenerator;
115        }
116 
117        /**
118         * Setter for the retry policy. If this is specified the other retry
119         * properties are ignored (retryLimit, backOffPolicy,
120         * retryableExceptionClasses).
121         * 
122         * @param retryPolicy a stateless {@link RetryPolicy}
123         */
124        public void setRetryPolicy(RetryPolicy retryPolicy) {
125                this.retryPolicy = retryPolicy;
126        }
127 
128        /**
129         * Public setter for the retry limit. Each item can be retried up to this
130         * limit. Note this limit includes the initial attempt to process the item,
131         * therefore <code>retryLimit == 1</code> by default.
132         * 
133         * @param retryLimit the retry limit to set, must be greater or equal to 1.
134         */
135        public void setRetryLimit(int retryLimit) {
136                this.retryLimit = retryLimit;
137        }
138 
139        /**
140         * Public setter for the capacity of the cache in the retry policy. If more
141         * items than this fail without being skipped or recovered an exception will
142         * be thrown. This is to guard against inadvertent infinite loops generated
143         * by item identity problems.<br/>
144         * 
145         * The default value should be high enough and more for most purposes. To
146         * breach the limit in a single-threaded step typically you have to have
147         * this many failures in a single transaction. Defaults to the value in the
148         * {@link MapRetryContextCache}.<br/>
149         * 
150         * This property is ignored if the
151         * {@link #setRetryContextCache(RetryContextCache)} is set directly.
152         * 
153         * @param cacheCapacity the cache capacity to set (greater than 0 else
154         * ignored)
155         */
156        public void setCacheCapacity(int cacheCapacity) {
157                this.cacheCapacity = cacheCapacity;
158        }
159 
160        /**
161         * Override the default retry context cache for retry of chunk processing.
162         * If this property is set then {@link #setCacheCapacity(int)} is ignored.
163         * 
164         * @param retryContextCache the {@link RetryContextCache} to set
165         */
166        public void setRetryContextCache(RetryContextCache retryContextCache) {
167                this.retryContextCache = retryContextCache;
168        }
169 
170        /**
171         * Public setter for the Class[].
172         * @param retryableExceptionClasses the retryableExceptionClasses to set
173         */
174        public void setRetryableExceptionClasses(Collection<Class<? extends Throwable>> retryableExceptionClasses) {
175                this.retryableExceptionClasses = retryableExceptionClasses;
176        }
177 
178        /**
179         * Public setter for the {@link BackOffPolicy}.
180         * @param backOffPolicy the {@link BackOffPolicy} to set
181         */
182        public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
183                this.backOffPolicy = backOffPolicy;
184        }
185 
186        /**
187         * Public setter for the {@link RetryListener}s.
188         * @param retryListeners the {@link RetryListener}s to set
189         */
190        public void setRetryListeners(RetryListener... retryListeners) {
191                this.retryListeners = retryListeners;
192        }
193 
194        /**
195         * A limit that determines skip policy. If this value is positive then an
196         * exception in chunk processing will cause the item to be skipped and no
197         * exception propagated until the limit is reached. If it is zero then all
198         * exceptions will be propagated from the chunk and cause the step to abort.
199         * 
200         * @param skipLimit the value to set. Default is 0 (never skip).
201         */
202        public void setSkipLimit(int skipLimit) {
203                this.skipLimit = skipLimit;
204        }
205 
206        /**
207         * Exception classes that when raised won't crash the job but will result in
208         * the item which handling caused the exception being skipped. Any exception
209         * which is marked for "no rollback" is also skippable, but not vice versa.
210         * Remember to set the {@link #setSkipLimit(int) skip limit} as well.
211         * <p/>
212         * Defaults to all exceptions.
213         * 
214         * @param exceptionClasses defaults to <code>Exception</code>
215         */
216        public void setSkippableExceptionClasses(Collection<Class<? extends Throwable>> exceptionClasses) {
217                this.skippableExceptionClasses = exceptionClasses;
218        }
219 
220        /**
221         * Exception classes that are candidates for no rollback. The {@link Step}
222         * can not honour the no rollback hint in all circumstances, but any
223         * exception on this list is counted as skippable, so even if there has to
224         * be a rollback, then the step will not fail as long as the skip limit is
225         * not breached.
226         * <p/>
227         * Defaults is empty.
228         * 
229         * @param noRollbackExceptionClasses the exception classes to set
230         */
231        public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) {
232                this.noRollbackExceptionClasses = noRollbackExceptionClasses;
233        }
234 
235        /**
236         * Exception classes that are not skippable (but may be retryable).
237         * 
238         * @param fatalExceptionClasses {@link Error} by default
239         */
240        public void setFatalExceptionClasses(Collection<Class<? extends Throwable>> fatalExceptionClasses) {
241                this.fatalExceptionClasses = fatalExceptionClasses;
242        }
243 
244        /**
245         * Convenience method for subclasses to get an exception classifier based on
246         * the provided transaction attributes.
247         * 
248         * @return an exception classifier: maps to true if an exception should
249         * cause rollback
250         */
251        protected Classifier<Throwable, Boolean> getRollbackClassifier() {
252 
253                Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false);
254 
255                // Try to avoid pathological cases where we cannot force a rollback
256                // (should be pretty uncommon):
257                if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException()))
258                                || !classifier.classify(new ExhaustedRetryException("test"))) {
259 
260                        final Classifier<Throwable, Boolean> binary = classifier;
261 
262                        Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>();
263                        types.add(ForceRollbackForWriteSkipException.class);
264                        types.add(ExhaustedRetryException.class);
265                        final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true);
266 
267                        classifier = new Classifier<Throwable, Boolean>() {
268                                public Boolean classify(Throwable classifiable) {
269                                        // Rollback if either the user's list or our own applies
270                                        return panic.classify(classifiable) || binary.classify(classifiable);
271                                }
272                        };
273 
274                }
275 
276                return classifier;
277 
278        }
279 
280        /**
281         * Getter for the {@link TransactionAttribute} for subclasses only.
282         * @return the transactionAttribute
283         */
284        @Override
285        protected TransactionAttribute getTransactionAttribute() {
286 
287                TransactionAttribute attribute = super.getTransactionAttribute();
288                final Classifier<Throwable, Boolean> classifier = getRollbackClassifier();
289                return new DefaultTransactionAttribute(attribute) {
290                        @Override
291                        public boolean rollbackOn(Throwable ex) {
292                                return classifier.classify(ex);
293                        }
294 
295                };
296 
297        }
298 
299        @Override
300        protected void applyConfiguration(TaskletStep step) {
301 
302                addFatalExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
303                                SkipListenerFailedException.class, RetryException.class, JobInterruptedException.class, Error.class);
304                addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
305                                SkipListenerFailedException.class, RetryException.class, JobInterruptedException.class, Error.class);
306 
307                super.applyConfiguration(step);
308 
309        }
310 
311        /**
312         * {@inheritDoc}
313         */
314        @Override
315        protected void registerStreams(TaskletStep step, ItemStream[] streams) {
316                boolean streamIsReader = false;
317                ItemReader<? extends T> itemReader = getItemReader();
318                for (ItemStream stream : streams) {
319                        if (stream instanceof ItemReader) {
320                                streamIsReader = true;
321                                chunkMonitor.registerItemStream(stream);
322                        }
323                        else {
324                                step.registerStream(stream);
325                        }
326                }
327                TaskExecutor taskExecutor = getTaskExecutor();
328                // In cases where multiple nested item readers are registered,
329                // they all want to get the open() and close() callbacks.
330                if (streamIsReader) {
331                        // double registration is fine
332                        step.registerStream(chunkMonitor);
333                        boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor);
334                        if (!concurrent) {
335                                chunkMonitor.setItemReader(itemReader);
336                        }
337                        else {
338                                logger.warn("Synchronous TaskExecutor detected (" + taskExecutor.getClass()
339                                                + ") with ItemStream reader.  This is probably an error, "
340                                                + "and may lead to incorrect restart data being stored.");
341                        }
342                }
343        }
344 
345        /**
346         * @return {@link ChunkProvider} configured for fault-tolerance.
347         */
348        @Override
349        protected SimpleChunkProvider<T> configureChunkProvider() {
350 
351                SkipPolicy readSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, getSkippableExceptionClasses(),
352                                fatalExceptionClasses);
353                FaultTolerantChunkProvider<T> chunkProvider = new FaultTolerantChunkProvider<T>(getItemReader(),
354                                getChunkOperations());
355                chunkProvider.setSkipPolicy(readSkipPolicy);
356                chunkProvider.setRollbackClassifier(getRollbackClassifier());
357 
358                return chunkProvider;
359 
360        }
361 
362        /**
363         * @return {@link ChunkProcessor} configured for fault-tolerance.
364         */
365        @Override
366        protected SimpleChunkProcessor<T, S> configureChunkProcessor() {
367 
368                BatchRetryTemplate batchRetryTemplate = configureRetry();
369 
370                FaultTolerantChunkProcessor<T, S> chunkProcessor = new FaultTolerantChunkProcessor<T, S>(getItemProcessor(),
371                                getItemWriter(), batchRetryTemplate);
372                chunkProcessor.setBuffering(!isReaderTransactionalQueue());
373 
374                SkipPolicy writeSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, getSkippableExceptionClasses(),
375                                fatalExceptionClasses);
376                chunkProcessor.setWriteSkipPolicy(writeSkipPolicy);
377                chunkProcessor.setProcessSkipPolicy(writeSkipPolicy);
378                chunkProcessor.setRollbackClassifier(getRollbackClassifier());
379                chunkProcessor.setKeyGenerator(keyGenerator);
380                chunkProcessor.setChunkMonitor(chunkMonitor);
381 
382                return chunkProcessor;
383 
384        }
385 
386        /**
387         * @return
388         */
389        private Collection<Class<? extends Throwable>> getSkippableExceptionClasses() {
390                HashSet<Class<? extends Throwable>> set = new HashSet<Class<? extends Throwable>>(skippableExceptionClasses);
391                set.add(ForceRollbackForWriteSkipException.class);
392                return set;
393        }
394 
395        /**
396         * @return fully configured retry template for item processing phase.
397         */
398        private BatchRetryTemplate configureRetry() {
399 
400                if (retryPolicy == null) {
401                        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(retryLimit);
402                        HashSet<Class<? extends Throwable>> set = new HashSet<Class<? extends Throwable>>(retryableExceptionClasses);
403                        set.add(ForceRollbackForWriteSkipException.class);
404                        // set.addAll(noRollbackExceptionClasses); // should only be
405                        // retryable on write
406                        simpleRetryPolicy.setRetryableExceptionClasses(set);
407                        retryPolicy = simpleRetryPolicy;
408                }
409 
410                RetryPolicy retryPolicyWrapper = fatalExceptionAwareProxy(retryPolicy);
411 
412                BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate();
413                if (backOffPolicy != null) {
414                        batchRetryTemplate.setBackOffPolicy(backOffPolicy);
415                }
416                batchRetryTemplate.setRetryPolicy(retryPolicyWrapper);
417 
418                // Co-ordinate the retry policy with the exception handler:
419                RepeatOperations stepOperations = getStepOperations();
420                if (stepOperations instanceof RepeatTemplate) {
421                        SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper,
422                                        getExceptionHandler(), nonRetryableExceptionClasses);
423                        ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
424                }
425 
426                if (retryContextCache == null) {
427                        if (cacheCapacity > 0) {
428                                batchRetryTemplate.setRetryContextCache(new MapRetryContextCache(cacheCapacity));
429                        }
430                }
431                else {
432                        batchRetryTemplate.setRetryContextCache(retryContextCache);
433                }
434 
435                if (retryListeners != null) {
436                        batchRetryTemplate.setListeners(retryListeners);
437                }
438                return batchRetryTemplate;
439        }
440 
441        /**
442         * Wrap the provided {@link #setRetryPolicy(RetryPolicy)} so that it never
443         * retries explicitly non-retryable exceptions.
444         */
445        private RetryPolicy fatalExceptionAwareProxy(final RetryPolicy retryPolicy) {
446 
447                NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
448                Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>();
449                for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) {
450                        map.put(fatal, neverRetryPolicy);
451                }
452 
453                SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>(
454                                retryPolicy);
455                classifier.setTypeMap(map);
456 
457                ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy();
458                retryPolicyWrapper.setExceptionClassifier(classifier);
459                return retryPolicyWrapper;
460 
461        }
462 
463        @SuppressWarnings("unchecked")
464        private void addFatalExceptionIfMissing(Class... cls) {
465                List exceptions = new ArrayList<Class<? extends Throwable>>();
466                for (Class exceptionClass : fatalExceptionClasses) {
467                        exceptions.add(exceptionClass);
468                }
469                for (Class fatal : cls) {
470                        if (!exceptions.contains(fatal)) {
471                                exceptions.add(fatal);
472                        }
473                }
474                fatalExceptionClasses = exceptions;
475        }
476 
477        @SuppressWarnings("unchecked")
478        private void addNonRetryableExceptionIfMissing(Class... cls) {
479                List exceptions = new ArrayList<Class<? extends Throwable>>();
480                for (Class exceptionClass : nonRetryableExceptionClasses) {
481                        exceptions.add(exceptionClass);
482                }
483                for (Class fatal : cls) {
484                        if (!exceptions.contains(fatal)) {
485                                exceptions.add(fatal);
486                        }
487                }
488                nonRetryableExceptionClasses = exceptions;
489        }
490 
491}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov