EMMA Coverage Report (generated Fri Aug 21 15:59:46 BST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [FaultTolerantChunkProcessor.java]

nameclass, %method, %block, %line, %
FaultTolerantChunkProcessor.java100% (7/7)100% (39/39)92%  (826/895)90%  (170/188)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FaultTolerantChunkProcessor$4100% (1/1)100% (2/2)85%  (67/79)69%  (11/16)
recover (RetryContext): Object 100% (1/1)81%  (52/64)64%  (9/14)
FaultTolerantChunkProcessor$4 (FaultTolerantChunkProcessor, Chunk, Chunk, Ste... 100% (1/1)100% (15/15)100% (2/2)
     
class FaultTolerantChunkProcessor$2100% (1/1)100% (2/2)87%  (39/45)89%  (8/9)
recover (RetryContext): Object 100% (1/1)82%  (27/33)86%  (6/7)
FaultTolerantChunkProcessor$2 (FaultTolerantChunkProcessor, StepContribution,... 100% (1/1)100% (12/12)100% (2/2)
     
class FaultTolerantChunkProcessor100% (1/1)100% (25/25)92%  (495/540)93%  (114/123)
getInputKeys (Chunk): List 100% (1/1)19%  (6/31)33%  (2/6)
getInputKey (Object): Object 100% (1/1)50%  (5/10)67%  (2/3)
checkSkipPolicy (Chunk$ChunkIterator, Chunk$ChunkIterator, Exception, StepCon... 100% (1/1)81%  (25/31)88%  (7/8)
callSkipListeners (Chunk, Chunk): void 100% (1/1)87%  (61/70)81%  (13/16)
FaultTolerantChunkProcessor (ItemProcessor, ItemWriter, BatchRetryTemplate): ... 100% (1/1)100% (39/39)100% (9/9)
access$0 (FaultTolerantChunkProcessor): Classifier 100% (1/1)100% (3/3)100% (1/1)
access$1 (FaultTolerantChunkProcessor): SkipPolicy 100% (1/1)100% (3/3)100% (1/1)
access$2 (FaultTolerantChunkProcessor): Log 100% (1/1)100% (3/3)100% (1/1)
access$3 (FaultTolerantChunkProcessor): ChunkMonitor 100% (1/1)100% (3/3)100% (1/1)
access$4 (FaultTolerantChunkProcessor, StepContribution, Chunk, Chunk, ChunkM... 100% (1/1)100% (7/7)100% (1/1)
access$5 (FaultTolerantChunkProcessor, Chunk$ChunkIterator, Chunk$ChunkIterat... 100% (1/1)100% (7/7)100% (1/1)
access$6 (FaultTolerantChunkProcessor): SkipPolicy 100% (1/1)100% (3/3)100% (1/1)
getAdjustedOutputs (Chunk, Chunk): Chunk 100% (1/1)100% (25/25)100% (6/6)
getFilterCount (Chunk, Chunk): int 100% (1/1)100% (14/14)100% (2/2)
initializeUserData (Chunk): void 100% (1/1)100% (21/21)100% (6/6)
isComplete (Chunk): boolean 100% (1/1)100% (18/18)100% (3/3)
scan (StepContribution, Chunk, Chunk, ChunkMonitor): void 100% (1/1)100% (89/89)100% (25/25)
setBuffering (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setChunkMonitor (ChunkMonitor): void 100% (1/1)100% (4/4)100% (2/2)
setKeyGenerator (KeyGenerator): void 100% (1/1)100% (4/4)100% (2/2)
setProcessSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setRollbackClassifier (Classifier): void 100% (1/1)100% (4/4)100% (2/2)
setWriteSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
transform (StepContribution, Chunk): Chunk 100% (1/1)100% (72/72)100% (14/14)
write (StepContribution, Chunk, Chunk): void 100% (1/1)100% (67/67)100% (11/11)
     
class FaultTolerantChunkProcessor$1100% (1/1)100% (2/2)94%  (90/96)85%  (17/20)
doWithRetry (RetryContext): Object 100% (1/1)92%  (69/75)83%  (15/18)
FaultTolerantChunkProcessor$1 (FaultTolerantChunkProcessor, AtomicInteger, Ch... 100% (1/1)100% (21/21)100% (2/2)
     
class FaultTolerantChunkProcessor$3100% (1/1)100% (2/2)100% (71/71)100% (13/13)
FaultTolerantChunkProcessor$3 (FaultTolerantChunkProcessor, Chunk, Chunk, Ste... 100% (1/1)100% (15/15)100% (2/2)
doWithRetry (RetryContext): Object 100% (1/1)100% (56/56)100% (11/11)
     
class FaultTolerantChunkProcessor$5100% (1/1)100% (2/2)100% (48/48)100% (9/9)
FaultTolerantChunkProcessor$5 (FaultTolerantChunkProcessor, Chunk, StepContri... 100% (1/1)100% (15/15)100% (2/2)
recover (RetryContext): Object 100% (1/1)100% (33/33)100% (7/7)
     
class FaultTolerantChunkProcessor$UserData100% (1/1)100% (4/4)100% (16/16)100% (7/7)
FaultTolerantChunkProcessor$UserData (int): void 100% (1/1)100% (6/6)100% (3/3)
getOutputs (): Chunk 100% (1/1)100% (3/3)100% (1/1)
setOutputs (Chunk): void 100% (1/1)100% (4/4)100% (2/2)
size (): int 100% (1/1)100% (3/3)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.ArrayList;
20import java.util.Collections;
21import java.util.List;
22import java.util.concurrent.atomic.AtomicInteger;
23 
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.springframework.batch.classify.BinaryExceptionClassifier;
27import org.springframework.batch.classify.Classifier;
28import org.springframework.batch.core.StepContribution;
29import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
30import org.springframework.batch.core.step.skip.NonSkippableProcessException;
31import org.springframework.batch.core.step.skip.SkipListenerFailedException;
32import org.springframework.batch.core.step.skip.SkipPolicy;
33import org.springframework.batch.item.ItemProcessor;
34import org.springframework.batch.item.ItemWriter;
35import org.springframework.batch.retry.ExhaustedRetryException;
36import org.springframework.batch.retry.RecoveryCallback;
37import org.springframework.batch.retry.RetryCallback;
38import org.springframework.batch.retry.RetryContext;
39import org.springframework.batch.retry.RetryException;
40import org.springframework.batch.retry.support.DefaultRetryState;
41 
42/**
43 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that
44 * allows for skipping or retry of items that cause exceptions during writing.
45 * 
46 */
47public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> {
48 
49        private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy(0);
50 
51        private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy(0);
52 
53        private final BatchRetryTemplate batchRetryTemplate;
54 
55        private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
56 
57        private Log logger = LogFactory.getLog(getClass());
58 
59        private boolean buffering = true;
60 
61        private KeyGenerator keyGenerator;
62 
63        private ChunkMonitor chunkMonitor = new ChunkMonitor();
64 
65        /**
66         * The {@link KeyGenerator} to use to identify failed items across rollback.
67         * Not used in the case of the {@link #setBuffering(boolean) buffering flag}
68         * being true (the default).
69         * 
70         * @param keyGenerator the {@link KeyGenerator} to set
71         */
72        public void setKeyGenerator(KeyGenerator keyGenerator) {
73                this.keyGenerator = keyGenerator;
74        }
75 
76        /**
77         * @param SkipPolicy the {@link SkipPolicy} for item processing
78         */
79        public void setProcessSkipPolicy(SkipPolicy SkipPolicy) {
80                this.itemProcessSkipPolicy = SkipPolicy;
81        }
82 
83        /**
84         * @param SkipPolicy the {@link SkipPolicy} for item writing
85         */
86        public void setWriteSkipPolicy(SkipPolicy SkipPolicy) {
87                this.itemWriteSkipPolicy = SkipPolicy;
88        }
89 
90        /**
91         * A classifier that can distinguish between exceptions that cause rollback
92         * (return true) or not (return false).
93         * 
94         * @param rollbackClassifier
95         */
96        public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
97                this.rollbackClassifier = rollbackClassifier;
98        }
99 
100        /**
101         * @param chunkMonitor
102         */
103        public void setChunkMonitor(ChunkMonitor chunkMonitor) {
104                this.chunkMonitor = chunkMonitor;
105        }
106 
107        /**
108         * A flag to indicate that items have been buffered and therefore will
109         * always come back as a chunk after a rollback. Otherwise things are more
110         * complicated because after a rollback the new chunk might or moght not
111         * contain items from the previous failed chunk.
112         * 
113         * @param buffering
114         */
115        public void setBuffering(boolean buffering) {
116                this.buffering = buffering;
117        }
118 
119        public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
120                        ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
121                super(itemProcessor, itemWriter);
122                this.batchRetryTemplate = batchRetryTemplate;
123        }
124 
125        @Override
126        protected void initializeUserData(Chunk<I> inputs) {
127                @SuppressWarnings("unchecked")
128                UserData<O> data = (UserData<O>) inputs.getUserData();
129                if (data == null) {
130                        data = new UserData<O>(inputs.size());
131                        inputs.setUserData(data);
132                        data.setOutputs(new Chunk<O>());
133                }
134        }
135 
136        @Override
137        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
138                @SuppressWarnings("unchecked")
139                UserData<O> data = (UserData<O>) inputs.getUserData();
140                return data.size() - outputs.size() - inputs.getSkips().size();
141        }
142 
143        @Override
144        protected boolean isComplete(Chunk<I> inputs) {
145 
146                /*
147                 * Need to remember the write skips across transactions, otherwise they
148                 * keep coming back. Since we register skips with the inputs they will
149                 * not be processed again but the output skips need to be saved for
150                 * registration later with the listeners. The inputs are going to be the
151                 * same for all transactions processing the same chunk, but the outputs
152                 * are not, so we stash them in user data on the inputs.
153                 */
154 
155                @SuppressWarnings("unchecked")
156                UserData<O> data = (UserData<O>) inputs.getUserData();
157                Chunk<O> previous = data.getOutputs();
158 
159                return inputs.isEmpty() && previous.getSkips().isEmpty();
160 
161        }
162 
163        @Override
164        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
165 
166                @SuppressWarnings("unchecked")
167                UserData<O> data = (UserData<O>) inputs.getUserData();
168                Chunk<O> previous = data.getOutputs();
169 
170                Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips());
171                next.setBusy(previous.isBusy());
172 
173                // Remember for next time if there are skips accumulating
174                data.setOutputs(next);
175 
176                return next;
177 
178        }
179 
180        @Override
181        protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
182 
183                Chunk<O> outputs = new Chunk<O>();
184                @SuppressWarnings("unchecked")
185                UserData<O> data = (UserData<O>) inputs.getUserData();
186                Chunk<O> cache = data.getOutputs();
187                final Chunk<O>.ChunkIterator cacheIterator = cache.isEmpty() ? null : cache.iterator();
188                final AtomicInteger count = new AtomicInteger(0);
189 
190                for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
191 
192                        final I item = iterator.next();
193 
194                        RetryCallback<O> retryCallback = new RetryCallback<O>() {
195 
196                                public O doWithRetry(RetryContext context) throws Exception {
197                                        O output = null;
198                                        try {
199                                                count.incrementAndGet();
200                                                O cached = (cacheIterator != null) ? cacheIterator.next() : null;
201                                                if (cached != null && count.get() > 1) {
202                                                        /*
203                                                         * If there is a cached chunk then we must be
204                                                         * scanning for errors in the writer, in which case
205                                                         * only the first one will be written, and for the
206                                                         * rest we need to fill in the output from the
207                                                         * cache.
208                                                         */
209                                                        output = cached;
210                                                }
211                                                else {
212                                                        output = doProcess(item);
213                                                }
214                                        }
215                                        catch (Exception e) {
216                                                if (rollbackClassifier.classify(e)) {
217                                                        // Default is to rollback unless the classifier
218                                                        // allows us to continue
219                                                        throw e;
220                                                }
221                                                else if (itemProcessSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
222                                                        // If we are not re-throwing then we should check if
223                                                        // this is skippable
224                                                        contribution.incrementProcessSkipCount();
225                                                        logger.debug("Skipping after failed process with no rollback", e);
226                                                }
227                                                else {
228                                                        // If it's not skippable that's an error in
229                                                        // configuration - it doesn't make sense to not roll
230                                                        // back if we are also not allowed to skip
231                                                        throw new NonSkippableProcessException(
232                                                                        "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
233                                                                        e);
234                                                }
235                                        }
236                                        if (output == null) {
237                                                // No need to re-process filtered items
238                                                iterator.remove();
239                                        }
240                                        return output;
241                                }
242 
243                        };
244 
245                        RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
246 
247                                public O recover(RetryContext context) throws Exception {
248                                        Exception e = (Exception) context.getLastThrowable();
249                                        if (itemProcessSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
250                                                contribution.incrementProcessSkipCount();
251                                                iterator.remove(e);
252                                                logger.debug("Skipping after failed process", e);
253                                                return null;
254                                        }
255                                        else {
256                                                throw new RetryException("Non-skippable exception in recoverer while processing", e);
257                                        }
258                                }
259 
260                        };
261 
262                        O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
263                                        getInputKey(item), rollbackClassifier));
264                        if (output != null) {
265                                outputs.add(output);
266                        }
267 
268                }
269 
270                return outputs;
271 
272        }
273 
274        @Override
275        protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
276                        throws Exception {
277 
278                RetryCallback<Object> retryCallback = new RetryCallback<Object>() {
279                        public Object doWithRetry(RetryContext context) throws Exception {
280 
281                                if (!inputs.isBusy()) {
282                                        chunkMonitor.setChunkSize(inputs.size());
283                                        try {
284                                                doWrite(outputs.getItems());
285                                        }
286                                        catch (Exception e) {
287                                                if (rollbackClassifier.classify(e)) {
288                                                        throw e;
289                                                }
290                                                /*
291                                                 * If the exception is marked as no-rollback, we need to
292                                                 * override that, otherwise there's no way to write the
293                                                 * rest of the chunk or to honour the skip listener
294                                                 * contract.
295                                                 */
296                                                throw new ForceRollbackForWriteSkipException(
297                                                                "Force rollback on skippable exception so that skipped item can be located.", e);
298                                        }
299                                        contribution.incrementWriteCount(outputs.size());
300                                }
301                                else {
302                                        scan(contribution, inputs, outputs, chunkMonitor);
303                                }
304                                return null;
305 
306                        }
307                };
308 
309                if (!buffering) {
310 
311                        RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
312 
313                                public Object recover(RetryContext context) throws Exception {
314 
315                                        Exception e = (Exception) context.getLastThrowable();
316                                        if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
317                                                throw new RetryException("Invalid retry state during write caused by "
318                                                                + "exception that does not classify for rollback: ", e);
319                                        }
320 
321                                        Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
322                                        for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
323 
324                                                inputIterator.next();
325                                                outputIterator.next();
326 
327                                                checkSkipPolicy(inputIterator, outputIterator, e, contribution);
328                                                if (!rollbackClassifier.classify(e)) {
329                                                        throw new RetryException(
330                                                                        "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
331                                                                        e);
332                                                }
333 
334                                        }
335 
336                                        return null;
337 
338                                }
339 
340                        };
341 
342                        batchRetryTemplate.execute(retryCallback, batchRecoveryCallback, BatchRetryTemplate.createState(
343                                        getInputKeys(inputs), rollbackClassifier));
344 
345                }
346                else {
347 
348                        RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
349 
350                                public Object recover(RetryContext context) throws Exception {
351 
352                                        /*
353                                         * If the last exception was not skippable we don't need to
354                                         * do any scanning. We can just bomb out with a retry
355                                         * exhausted.
356                                         */
357                                        if (!itemWriteSkipPolicy.shouldSkip(context.getLastThrowable(), -1)) {
358                                                throw new ExhaustedRetryException(
359                                                                "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
360                                                                context.getLastThrowable());
361                                        }
362 
363                                        inputs.setBusy(true);
364                                        scan(contribution, inputs, outputs, chunkMonitor);
365                                        return null;
366                                }
367 
368                        };
369 
370                        logger.debug("Attempting to write: " + inputs);
371                        batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
372                                        rollbackClassifier));
373 
374                }
375 
376                callSkipListeners(inputs, outputs);
377 
378        }
379 
380        private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) {
381 
382                for (SkipWrapper<I> wrapper : inputs.getSkips()) {
383                        I item = wrapper.getItem();
384                        if (item == null) {
385                                continue;
386                        }
387                        Exception e = wrapper.getException();
388                        try {
389                                getListener().onSkipInProcess(item, e);
390                        }
391                        catch (RuntimeException ex) {
392                                throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
393                        }
394                }
395 
396                for (SkipWrapper<O> wrapper : outputs.getSkips()) {
397                        Exception e = wrapper.getException();
398                        try {
399                                getListener().onSkipInWrite(wrapper.getItem(), e);
400                        }
401                        catch (RuntimeException ex) {
402                                throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
403                        }
404                }
405 
406                // Clear skips if we are possibly going to process this chunk again
407                outputs.clearSkips();
408                inputs.clearSkips();
409 
410        }
411 
412        private Object getInputKey(I item) {
413                if (keyGenerator == null) {
414                        return item;
415                }
416                return keyGenerator.getKey(item);
417        }
418 
419        private List<?> getInputKeys(final Chunk<I> inputs) {
420                if (keyGenerator == null) {
421                        return inputs.getItems();
422                }
423                List<Object> keys = new ArrayList<Object>();
424                for (I item : inputs.getItems()) {
425                        keys.add(keyGenerator.getKey(item));
426                }
427                return keys;
428        }
429 
430        private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator,
431                        Exception e, StepContribution contribution) {
432                logger.debug("Checking skip policy after failed write");
433                if (itemWriteSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
434                        contribution.incrementWriteSkipCount();
435                        inputIterator.remove();
436                        outputIterator.remove(e);
437                        logger.debug("Skipping after failed write", e);
438                }
439                else {
440                        throw new RetryException("Non-skippable exception in recoverer", e);
441                }
442        }
443 
444        private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
445                        ChunkMonitor chunkMonitor) throws Exception {
446 
447                logger.debug("Scanning for failed item on write: " + inputs);
448                if (outputs.isEmpty()) {
449                        inputs.setBusy(false);
450                        return;
451                }
452 
453                Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
454                Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
455 
456                List<O> items = Collections.singletonList(outputIterator.next());
457                inputIterator.next();
458                try {
459                        writeItems(items);
460                        // If successful we are going to return and allow
461                        // the driver to commit...
462                        doAfterWrite(items);
463                        contribution.incrementWriteCount(1);
464                        inputIterator.remove();
465                        outputIterator.remove();
466                }
467                catch (Exception e) {
468                        if (!itemWriteSkipPolicy.shouldSkip(e, -1) && !rollbackClassifier.classify(e)) {
469                                inputIterator.remove();
470                                outputIterator.remove();
471                        }
472                        else {
473                                checkSkipPolicy(inputIterator, outputIterator, e, contribution);
474                        }
475                        if (rollbackClassifier.classify(e)) {
476                                throw e;
477                        }
478                }
479                chunkMonitor.incrementOffset();
480                if (outputs.isEmpty()) {
481                        inputs.setBusy(false);
482                        chunkMonitor.resetOffset();
483                }
484        }
485 
486        private static class UserData<O> {
487 
488                private final int size;
489 
490                private Chunk<O> outputs;
491 
492                public UserData(int size) {
493                        this.size = size;
494                }
495 
496                public int size() {
497                        return size;
498                }
499 
500                public Chunk<O> getOutputs() {
501                        return outputs;
502                }
503 
504                public void setOutputs(Chunk<O> outputs) {
505                        this.outputs = outputs;
506                }
507 
508        }
509 
510}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov