EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [SkipLimitStepFactoryBean.java]

name    class, %    method, %    block, %    line, %
SkipLimitStepFactoryBean.java    100% (5/5)    88%  (22/25)    95%  (527/553)    93%  (113.8/122)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name    class, %    method, %    block, %    line, %
     
class SkipLimitStepFactoryBean100% (1/1)77%  (10/13)94%  (335/356)89%  (68.9/77)
setBackOffPolicy (BackOffPolicy): void 0%   (0/1)0%   (0/4)0%   (0/2)
setItemKeyGenerator (ItemKeyGenerator): void 0%   (0/1)0%   (0/4)0%   (0/2)
setRetryListeners (RetryListener []): void 0%   (0/1)0%   (0/4)0%   (0/2)
setRetryLimit (int): void 100% (1/1)92%  (11/12)97%  (2.9/3)
applyConfiguration (ItemOrientedStep): void 100% (1/1)97%  (230/238)95%  (41/43)
SkipLimitStepFactoryBean (): void 100% (1/1)100% (46/46)100% (8/8)
addFatalExceptionIfMissing (Class): void 100% (1/1)100% (24/24)100% (5/5)
setCacheCapacity (int): void 100% (1/1)100% (4/4)100% (2/2)
setFatalExceptionClasses (Class []): void 100% (1/1)100% (4/4)100% (2/2)
setRetryPolicy (RetryPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setRetryableExceptionClasses (Class []): void 100% (1/1)100% (4/4)100% (2/2)
setSkipLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setSkippableExceptionClasses (Class []): void 100% (1/1)100% (4/4)100% (2/2)
     
class SkipLimitStepFactoryBean$StatefulRetryItemHandler100% (1/1)100% (6/6)96%  (116/121)100% (32.8/33)
write (Object, StepContribution): void 100% (1/1)85%  (28/33)96%  (3.8/4)
SkipLimitStepFactoryBean$StatefulRetryItemHandler (ItemReader, ItemWriter, Re... 100% (1/1)100% (22/22)100% (7/7)
onSkipInRead (Exception): void 100% (1/1)100% (12/12)100% (5/5)
read (StepContribution): Object 100% (1/1)100% (35/35)100% (12/12)
registerSkipListener (SkipListener): void 100% (1/1)100% (5/5)100% (2/2)
setSkipListeners (SkipListener []): void 100% (1/1)100% (14/14)100% (3/3)
     
class SkipLimitStepFactoryBean$1100% (1/1)100% (2/2)100% (17/17)100% (2/2)
SkipLimitStepFactoryBean$1 (SkipLimitStepFactoryBean, RetryPolicy): void 100% (1/1)100% (7/7)100% (1/1)
recoverForException (Throwable): boolean 100% (1/1)100% (10/10)100% (1/1)
     
class SkipLimitStepFactoryBean$StatefulRetryItemHandler$1100% (1/1)100% (2/2)100% (16/16)100% (3/3)
SkipLimitStepFactoryBean$StatefulRetryItemHandler$1 (SkipLimitStepFactoryBean... 100% (1/1)100% (9/9)100% (1/1)
doWithRetry (RetryContext): Object 100% (1/1)100% (7/7)100% (2/2)
     
class SkipLimitStepFactoryBean$StatefulRetryItemHandler$2100% (1/1)100% (2/2)100% (43/43)100% (7/7)
SkipLimitStepFactoryBean$StatefulRetryItemHandler$2 (SkipLimitStepFactoryBean... 100% (1/1)100% (12/12)100% (1/1)
recover (RetryContext): Object 100% (1/1)100% (31/31)100% (6/6)

1package org.springframework.batch.core.step.item;
2 
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.HashMap;
6import java.util.List;
7 
8import org.springframework.batch.core.SkipListener;
9import org.springframework.batch.core.StepContribution;
10import org.springframework.batch.core.listener.CompositeSkipListener;
11import org.springframework.batch.core.step.skip.ItemSkipPolicy;
12import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
13import org.springframework.batch.core.step.skip.NonSkippableException;
14import org.springframework.batch.core.step.skip.SkipLimitExceededException;
15import org.springframework.batch.item.ItemKeyGenerator;
16import org.springframework.batch.item.ItemReader;
17import org.springframework.batch.item.ItemWriter;
18import org.springframework.batch.retry.RecoveryCallback;
19import org.springframework.batch.retry.RetryCallback;
20import org.springframework.batch.retry.RetryContext;
21import org.springframework.batch.retry.RetryException;
22import org.springframework.batch.retry.RetryListener;
23import org.springframework.batch.retry.RetryOperations;
24import org.springframework.batch.retry.RetryPolicy;
25import org.springframework.batch.retry.backoff.BackOffPolicy;
26import org.springframework.batch.retry.callback.RecoveryRetryCallback;
27import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy;
28import org.springframework.batch.retry.policy.MapRetryContextCache;
29import org.springframework.batch.retry.policy.NeverRetryPolicy;
30import org.springframework.batch.retry.policy.RecoveryCallbackRetryPolicy;
31import org.springframework.batch.retry.policy.SimpleRetryPolicy;
32import org.springframework.batch.retry.support.RetryTemplate;
33import org.springframework.batch.support.SubclassExceptionClassifier;
34import org.springframework.util.Assert;
35 
36/**
37 * Factory bean for step that provides options for configuring skip behavior.
38 * User can set {@link #setSkipLimit(int)} to set how many exceptions of
39 * {@link #setSkippableExceptionClasses(Class[])} types are tolerated.
40 * {@link #setFatalExceptionClasses(Class[])} will cause immediate termination
41 * of job - they are treated as higher priority than
42 * {@link #setSkippableExceptionClasses(Class[])}, so the two lists don't need
43 * to be exclusive.
44 * 
45 * Skippable exceptions on write will by default cause transaction rollback - to
46 * avoid rollback for a specific exception class, include it in the transaction
47 * attribute as "no rollback for".
48 * 
49 * @see SimpleStepFactoryBean
50 * 
51 * @author Dave Syer
52 * @author Robert Kasanicky
53 * 
54 */
55public class SkipLimitStepFactoryBean extends SimpleStepFactoryBean {
56 
57        private int skipLimit = 0;
58 
59        private Class[] skippableExceptionClasses = new Class[] { Exception.class };
60 
61        private Class[] fatalExceptionClasses = new Class[] { Error.class };
62 
63        private ItemKeyGenerator itemKeyGenerator;
64 
65        private int cacheCapacity = 0;
66 
67        private int retryLimit = 1;
68 
69        private Class[] retryableExceptionClasses = new Class[] {};
70 
71        private BackOffPolicy backOffPolicy;
72 
73        private RetryListener[] retryListeners;
74 
75        private RetryPolicy retryPolicy;
76 
77        /**
78         * Setter for the retry policy. If this is specified the other retry
79         * properties are ignored (retryLimit, backOffPolicy,
80         * retryableExceptionClasses).
81         * 
82         * @param retryPolicy a stateless {@link RetryPolicy}
83         */
84        public void setRetryPolicy(RetryPolicy retryPolicy) {
85                this.retryPolicy = retryPolicy;
86        }
87 
88        /**
89         * Public setter for the retry limit. Each item can be retried up to this
90         * limit. Note the limit includes the initial attempt, so it must be greater
91         * than or equal to 1.
92         * 
93         * @param retryLimit the retry limit to set
94         */
95        public void setRetryLimit(int retryLimit) {
96                Assert.isTrue(retryLimit >= 1, "retry limit must be greater or equal to 1");
97                this.retryLimit = retryLimit;
98        }
99 
100        /**
101         * Public setter for the capacity of the cache in the retry policy. If more
102         * items than this fail without being skipped or recovered an exception will
103         * be thrown. This is to guard against inadvertent infinite loops generated
104         * by item identity problems. If a large number of items are failing and not
105         * being recognized as skipped, it usually signals a problem with the key
106         * generation (often equals and hashCode in the item itself). So it is
107         * better to enforce a strict limit than have weird looking errors, where a
108         * skip limit is reached without anything being skipped.<br/>
109         * 
110         * The default value should be more than high enough for most purposes. To
111         * breach the limit in a single-threaded step typically you have to have
112         * this many failures in a single transaction. Defaults to the value in the
113         * {@link MapRetryContextCache}.
114         * 
115         * @param cacheCapacity the cacheCapacity to set
116         */
117        public void setCacheCapacity(int cacheCapacity) {
118                this.cacheCapacity = cacheCapacity;
119        }
120 
121        /**
122         * Public setter for the retryable exception classes.
123         * 
124         * @param retryableExceptionClasses the retryableExceptionClasses to set
125         */
126        public void setRetryableExceptionClasses(Class[] retryableExceptionClasses) {
127                this.retryableExceptionClasses = retryableExceptionClasses;
128        }
129 
130        /**
131         * Public setter for the {@link BackOffPolicy}.
132         * 
133         * @param backOffPolicy the {@link BackOffPolicy} to set
134         */
135        public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
136                this.backOffPolicy = backOffPolicy;
137        }
138 
139        /**
140         * Public setter for the {@link RetryListener}s.
141         * 
142         * @param retryListeners the {@link RetryListener}s to set
143         */
144        public void setRetryListeners(RetryListener[] retryListeners) {
145                this.retryListeners = retryListeners;
146        }
147 
148        /**
149         * Public setter for a limit that determines skip policy. If this value is
150         * positive then an exception in chunk processing will cause the item to be
151         * skipped and no exception propagated until the limit is reached. If it is
152         * zero then all exceptions will be propagated from the chunk and cause the
153         * step to abort.
154         * 
155         * Note that if chunks are executed concurrently the number of skips can
156         * potentially exceed the skip limit and step can still finish successfully.
157         * This is due to the fact that overall skip count is not being synchronized
158         * between concurrent chunks while they are processing, only on chunk
159         * boundaries.
160         * 
161         * @param skipLimit the value to set. Default is 0 (never skip).
162         */
163        public void setSkipLimit(int skipLimit) {
164                this.skipLimit = skipLimit;
165        }
166 
167        /**
168         * Public setter for exception classes that when raised won't crash the job
169         * but will result in transaction rollback and the item whose handling
170         * caused the exception will be skipped.
171         * 
172         * @param exceptionClasses defaults to <code>Exception</code>
173         */
174        public void setSkippableExceptionClasses(Class[] exceptionClasses) {
175                this.skippableExceptionClasses = exceptionClasses;
176        }
177 
178        /**
179         * Public setter for exception classes that should cause immediate failure.
180         * 
181         * @param fatalExceptionClasses {@link Error} by default
182         */
183        public void setFatalExceptionClasses(Class[] fatalExceptionClasses) {
184                this.fatalExceptionClasses = fatalExceptionClasses;
185        }
186 
187        /**
188         * Public setter for the {@link ItemKeyGenerator}. This is used to identify
189         * failed items so they can be skipped if encountered again, generally in
190         * another transaction.
191         * 
192         * @param itemKeyGenerator the {@link ItemKeyGenerator} to set.
193         */
194        public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) {
195                this.itemKeyGenerator = itemKeyGenerator;
196        }
197 
198        /**
199         * Uses the {@link #setSkipLimit(int)} value to configure item handler and
200         * exception handler.
201         */
202        protected void applyConfiguration(ItemOrientedStep step) {
203                super.applyConfiguration(step);
204 
205                if (retryLimit > 1 || skipLimit > 0 || retryPolicy != null) {
206 
207                        addFatalExceptionIfMissing(SkipLimitExceededException.class);
208                        addFatalExceptionIfMissing(NonSkippableException.class);
209                        addFatalExceptionIfMissing(RetryException.class);
210 
211                        if (retryPolicy == null) {
212 
213                                SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(retryLimit);
214                                if (retryableExceptionClasses.length > 0) { // otherwise we
215                                        // retry
216                                        // all exceptions
217                                        simpleRetryPolicy.setRetryableExceptionClasses(retryableExceptionClasses);
218                                }
219                                simpleRetryPolicy.setFatalExceptionClasses(fatalExceptionClasses);
220 
221                                ExceptionClassifierRetryPolicy classifierRetryPolicy = new ExceptionClassifierRetryPolicy();
222                                SubclassExceptionClassifier exceptionClassifier = new SubclassExceptionClassifier();
223                                HashMap exceptionTypeMap = new HashMap();
224                                for (int i = 0; i < retryableExceptionClasses.length; i++) {
225                                        Class cls = retryableExceptionClasses[i];
226                                        exceptionTypeMap.put(cls, "retry");
227                                }
228                                exceptionClassifier.setTypeMap(exceptionTypeMap);
229                                HashMap retryPolicyMap = new HashMap();
230                                retryPolicyMap.put("retry", simpleRetryPolicy);
231                                retryPolicyMap.put("default", new NeverRetryPolicy());
232                                classifierRetryPolicy.setPolicyMap(retryPolicyMap);
233                                classifierRetryPolicy.setExceptionClassifier(exceptionClassifier);
234                                retryPolicy = classifierRetryPolicy;
235 
236                        }
237 
238                        // Co-ordinate the retry policy with the exception handler:
239                        getStepOperations().setExceptionHandler(
240                                        new SimpleRetryExceptionHandler(retryPolicy, getExceptionHandler(), fatalExceptionClasses));
241 
242                        RecoveryCallbackRetryPolicy recoveryCallbackRetryPolicy = new RecoveryCallbackRetryPolicy(retryPolicy) {
243                                protected boolean recoverForException(Throwable ex) {
244                                        return !getTransactionAttribute().rollbackOn(ex);
245                                }
246                        };
247                        if (cacheCapacity > 0) {
248                                recoveryCallbackRetryPolicy.setRetryContextCache(new MapRetryContextCache(cacheCapacity));
249                        }
250 
251                        RetryTemplate retryTemplate = new RetryTemplate();
252                        if (retryListeners != null) {
253                                retryTemplate.setListeners(retryListeners);
254                        }
255                        retryTemplate.setRetryPolicy(recoveryCallbackRetryPolicy);
256                        if (backOffPolicy != null) {
257                                retryTemplate.setBackOffPolicy(backOffPolicy);
258                        }
259 
260                        List exceptions = new ArrayList(Arrays.asList(skippableExceptionClasses));
261                        ItemSkipPolicy readSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, exceptions, Arrays
262                                        .asList(fatalExceptionClasses));
263                        exceptions.addAll(Arrays.asList(retryableExceptionClasses));
264                        ItemSkipPolicy writeSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, exceptions, Arrays
265                                        .asList(fatalExceptionClasses));
266                        StatefulRetryItemHandler itemHandler = new StatefulRetryItemHandler(getItemReader(), getItemWriter(),
267                                        retryTemplate, itemKeyGenerator, readSkipPolicy, writeSkipPolicy);
268                        itemHandler.setSkipListeners(BatchListenerFactoryHelper.getSkipListeners(getListeners()));
269 
270                        step.setItemHandler(itemHandler);
271 
272                }
273                else {
274                        // This is the default in ItemOrientedStep anyway...
275                        step.setItemHandler(new SimpleItemHandler(getItemReader(), getItemWriter()));
276                }
277 
278        }
279 
280        public void addFatalExceptionIfMissing(Class cls) {
281                List fatalExceptionList = new ArrayList(Arrays.asList(fatalExceptionClasses));
282                if (!fatalExceptionList.contains(cls)) {
283                        fatalExceptionList.add(cls);
284                }
285                fatalExceptionClasses = (Class[]) fatalExceptionList.toArray(new Class[0]);
286        }
287 
288        /**
289         * If there is an exception on input it is skipped if allowed. If there is
290         * an exception on output, it will be re-thrown in any case, and the
291         * behaviour when the item is next encountered depends on the retryable and
292         * skippable exception configuration. If the exception is retryable the
293         * write will be attempted again up to the retry limit. When retry attempts
294         * are exhausted the skip listener is invoked and the skip count
295         * incremented. A retryable exception is thus effectively also
296         * implicitly skippable.
297         * 
298         * @author Dave Syer
299         * 
300         */
301        private static class StatefulRetryItemHandler extends SimpleItemHandler {
302 
303                final private RetryOperations retryOperations;
304 
305                final private ItemKeyGenerator itemKeyGenerator;
306 
307                final private CompositeSkipListener listener = new CompositeSkipListener();
308 
309                final private ItemSkipPolicy readSkipPolicy;
310 
311                final private ItemSkipPolicy writeSkipPolicy;
312 
313                /**
314                 * @param itemReader
315                 * @param itemWriter
316                 * @param retryTemplate
317                 * @param itemKeyGenerator
318                 */
319                public StatefulRetryItemHandler(ItemReader itemReader, ItemWriter itemWriter, RetryOperations retryTemplate,
320                                ItemKeyGenerator itemKeyGenerator, ItemSkipPolicy readSkipPolicy, ItemSkipPolicy writeSkipPolicy) {
321                        super(itemReader, itemWriter);
322                        this.retryOperations = retryTemplate;
323                        this.itemKeyGenerator = itemKeyGenerator;
324                        this.readSkipPolicy = readSkipPolicy;
325                        this.writeSkipPolicy = writeSkipPolicy;
326                }
327 
328                /**
329                 * Register some {@link SkipListener}s with the handler. Each will get
330                 * the callbacks in the order specified at the correct stage if a skip
331                 * occurs.
332                 * 
333                 * @param listeners
334                 */
335                public void setSkipListeners(SkipListener[] listeners) {
336                        for (int i = 0; i < listeners.length; i++) {
337                                registerSkipListener(listeners[i]);
338                        }
339                }
340 
341                /**
342                 * Register a listener for callbacks at the appropriate stages in a skip
343                 * process.
344                 * 
345                 * @param listener a {@link SkipListener}
346                 */
347                public void registerSkipListener(SkipListener listener) {
348                        this.listener.register(listener);
349                }
350 
351                /**
352                 * Tries to read the item from the reader, in case of exception skip the
353                 * item if the skip policy allows, otherwise re-throw.
354                 * 
355                 * @param contribution current StepContribution holding skipped items
356                 * count
357                 * @return next item for processing
358                 */
359                protected Object read(StepContribution contribution) throws Exception {
360 
361                        while (true) {
362                                try {
363                                        return doRead();
364                                }
365                                catch (Exception e) {
366                                        try {
367                                                if (readSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
368                                                        // increment skip count and try again
369                                                        contribution.incrementTemporaryReadSkipCount();
370                                                        onSkipInRead(e);
371                                                        logger.debug("Skipping failed input", e);
372                                                }
373                                                else {
374                                                        throw new NonSkippableException("Non-skippable exception during read", e);
375                                                }
376                                        }
377                                        catch (SkipLimitExceededException ex) {
378                                                // we are headed for an abnormal ending so bake in the
379                                                // skip count
380                                                contribution.combineSkipCounts();
381                                                throw ex;
382                                        }
383                                }
384                        }
385 
386                }
387 
388                /**
389                 * Execute the business logic, delegating to the writer.<br/>
390                 * 
391                 * Process the item with the {@link ItemWriter} in a stateful retry. Any
392                 * {@link SkipListener} provided is called when retry attempts are
393                 * exhausted. The listener callback (on write failure) will happen in
394                 * the next transaction automatically.<br/>
395                 * 
396                 * @see org.springframework.batch.core.step.item.SimpleItemHandler#write(java.lang.Object,
397                 * org.springframework.batch.core.StepContribution)
398                 */
399                protected void write(final Object item, final StepContribution contribution) throws Exception {
400                        RecoveryRetryCallback retryCallback = new RecoveryRetryCallback(item, new RetryCallback() {
401                                public Object doWithRetry(RetryContext context) throws Throwable {
402                                        doWrite(item);
403                                        return null;
404                                }
405                        }, itemKeyGenerator != null ? itemKeyGenerator.getKey(item) : item);
406                        retryCallback.setRecoveryCallback(new RecoveryCallback() {
407                                public Object recover(RetryContext context) {
408                                        Throwable t = context.getLastThrowable();
409                                        if (writeSkipPolicy.shouldSkip(t, contribution.getStepSkipCount())) {
410                                                listener.onSkipInWrite(item, t);
411                                        }
412                                        else {
413                                                throw new NonSkippableException("Non-skippable exception on write", t);
414                                        }
415                                        contribution.incrementWriteSkipCount();
416                                        return null;
417                                }
418                        });
419                        retryOperations.execute(retryCallback);
420                }
421 
422                private void onSkipInRead(Exception e) {
423 
424                        try {
425                                listener.onSkipInRead(e);
426                        }
427                        catch (Exception ex) {
428                                logger.debug("Error in SkipListener onSkipInReader encountered and ignored.", ex);
429                        }
430                }
431 
432        }
433 
434}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov