EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [FaultTolerantChunkProcessor.java]

name | class, % | method, % | block, % | line, %
FaultTolerantChunkProcessor.java | 100% (7/7) | 100% (49/49) | 85% (958/1125) | 85% (200/234)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FaultTolerantChunkProcessor$2100% (1/1)100% (2/2)67%  (41/61)64%  (7/11)
recover (RetryContext): Object 100% (1/1)59%  (29/49)60%  (6/10)
FaultTolerantChunkProcessor$2 (FaultTolerantChunkProcessor, StepContribution,... 100% (1/1)100% (12/12)100% (1/1)
     
class FaultTolerantChunkProcessor100% (1/1)100% (31/31)83%  (565/681)85%  (138/163)
getInputKeys (Chunk): List 100% (1/1)20%  (6/30)29%  (2/7)
shouldSkip (SkipPolicy, Throwable, int): boolean 100% (1/1)31%  (5/16)20%  (1/5)
callProcessSkipListener (Object, Throwable): void 100% (1/1)47%  (7/15)60%  (3/5)
getInputKey (Object): Object 100% (1/1)50%  (5/10)67%  (2/3)
checkSkipPolicy (Chunk$ChunkIterator, Chunk$ChunkIterator, Throwable, StepCon... 100% (1/1)65%  (34/52)71%  (10/14)
scan (StepContribution, Chunk, Chunk, ChunkMonitor, boolean): void 100% (1/1)76%  (96/126)85%  (29/34)
callSkipListeners (Chunk, Chunk): void 100% (1/1)85%  (51/60)82%  (14/17)
write (StepContribution, Chunk, Chunk): void 100% (1/1)89%  (88/99)95%  (18/19)
FaultTolerantChunkProcessor (ItemProcessor, ItemWriter, BatchRetryTemplate): ... 100% (1/1)100% (40/40)100% (10/10)
access$1000 (FaultTolerantChunkProcessor, Chunk$ChunkIterator, Chunk$ChunkIte... 100% (1/1)100% (8/8)100% (1/1)
access$1100 (FaultTolerantChunkProcessor): SkipPolicy 100% (1/1)100% (3/3)100% (1/1)
access$200 (FaultTolerantChunkProcessor): boolean 100% (1/1)100% (3/3)100% (1/1)
access$300 (FaultTolerantChunkProcessor): Classifier 100% (1/1)100% (3/3)100% (1/1)
access$400 (FaultTolerantChunkProcessor): SkipPolicy 100% (1/1)100% (3/3)100% (1/1)
access$500 (FaultTolerantChunkProcessor, SkipPolicy, Throwable, int): boolean 100% (1/1)100% (6/6)100% (1/1)
access$600 (FaultTolerantChunkProcessor): Log 100% (1/1)100% (3/3)100% (1/1)
access$700 (FaultTolerantChunkProcessor, Object, Throwable): void 100% (1/1)100% (5/5)100% (1/1)
access$800 (FaultTolerantChunkProcessor): ChunkMonitor 100% (1/1)100% (3/3)100% (1/1)
access$900 (FaultTolerantChunkProcessor, StepContribution, Chunk, Chunk, Chun... 100% (1/1)100% (8/8)100% (1/1)
getAdjustedOutputs (Chunk, Chunk): Chunk 100% (1/1)100% (24/24)100% (6/6)
getFilterCount (Chunk, Chunk): int 100% (1/1)100% (7/7)100% (2/2)
initializeUserData (Chunk): void 100% (1/1)100% (20/20)100% (6/6)
isComplete (Chunk): boolean 100% (1/1)100% (18/18)100% (3/3)
setBuffering (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setChunkMonitor (ChunkMonitor): void 100% (1/1)100% (4/4)100% (2/2)
setKeyGenerator (KeyGenerator): void 100% (1/1)100% (4/4)100% (2/2)
setProcessSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setProcessorTransactional (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setRollbackClassifier (Classifier): void 100% (1/1)100% (4/4)100% (2/2)
setWriteSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
transform (StepContribution, Chunk): Chunk 100% (1/1)100% (91/91)100% (17/17)
     
class FaultTolerantChunkProcessor$4100% (1/1)100% (2/2)85%  (67/79)83%  (10/12)
recover (RetryContext): Object 100% (1/1)81%  (52/64)82%  (9/11)
FaultTolerantChunkProcessor$4 (FaultTolerantChunkProcessor, Chunk, Chunk, Ste... 100% (1/1)100% (15/15)100% (1/1)
     
class FaultTolerantChunkProcessor$5100% (1/1)100% (2/2)88%  (51/58)86%  (6/7)
recover (RetryContext): Object 100% (1/1)82%  (33/40)83%  (5/6)
FaultTolerantChunkProcessor$5 (FaultTolerantChunkProcessor, Chunk, FaultToler... 100% (1/1)100% (18/18)100% (1/1)
     
class FaultTolerantChunkProcessor$3100% (1/1)100% (2/2)93%  (76/82)92%  (12/13)
doWithRetry (RetryContext): Object 100% (1/1)90%  (55/61)92%  (11/12)
FaultTolerantChunkProcessor$3 (FaultTolerantChunkProcessor, AtomicReference, ... 100% (1/1)100% (21/21)100% (1/1)
     
class FaultTolerantChunkProcessor$1100% (1/1)100% (2/2)95%  (125/131)96%  (22/23)
doWithRetry (RetryContext): Object 100% (1/1)94%  (98/104)95%  (21/22)
FaultTolerantChunkProcessor$1 (FaultTolerantChunkProcessor, AtomicInteger, It... 100% (1/1)100% (27/27)100% (1/1)
     
class FaultTolerantChunkProcessor$UserData100% (1/1)100% (8/8)100% (33/33)100% (10/10)
FaultTolerantChunkProcessor$UserData (): void 100% (1/1)100% (6/6)100% (2/2)
FaultTolerantChunkProcessor$UserData (FaultTolerantChunkProcessor$1): void 100% (1/1)100% (3/3)100% (1/1)
access$100 (FaultTolerantChunkProcessor$UserData): int 100% (1/1)100% (3/3)100% (1/1)
getOutputs (): Chunk 100% (1/1)100% (3/3)100% (1/1)
incrementFilterCount (): void 100% (1/1)100% (7/7)100% (2/2)
scanning (): boolean 100% (1/1)100% (3/3)100% (1/1)
scanning (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setOutputs (Chunk): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.ArrayList;
20import java.util.Collections;
21import java.util.Iterator;
22import java.util.List;
23import java.util.concurrent.atomic.AtomicInteger;
24import java.util.concurrent.atomic.AtomicReference;
25 
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.springframework.batch.core.StepContribution;
29import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
30import org.springframework.batch.core.step.skip.NonSkippableProcessException;
31import org.springframework.batch.core.step.skip.SkipLimitExceededException;
32import org.springframework.batch.core.step.skip.SkipListenerFailedException;
33import org.springframework.batch.core.step.skip.SkipPolicy;
34import org.springframework.batch.item.ItemProcessor;
35import org.springframework.batch.item.ItemWriter;
36import org.springframework.classify.BinaryExceptionClassifier;
37import org.springframework.classify.Classifier;
38import org.springframework.retry.ExhaustedRetryException;
39import org.springframework.retry.RecoveryCallback;
40import org.springframework.retry.RetryCallback;
41import org.springframework.retry.RetryContext;
42import org.springframework.retry.RetryException;
43import org.springframework.retry.support.DefaultRetryState;
44 
45/**
46 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that
47 * allows for skipping or retry of items that cause exceptions during writing.
48 *
49 */
50public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> {
51 
52        private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy();
53 
54        private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy();
55 
56        private final BatchRetryTemplate batchRetryTemplate;
57 
58        private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
59 
60        private Log logger = LogFactory.getLog(getClass());
61 
62        private boolean buffering = true;
63 
64        private KeyGenerator keyGenerator;
65 
66        private ChunkMonitor chunkMonitor = new ChunkMonitor();
67 
68        private boolean processorTransactional = true;
69 
70        /**
71         * The {@link KeyGenerator} to use to identify failed items across rollback.
72         * Not used in the case of the {@link #setBuffering(boolean) buffering flag}
73         * being true (the default).
74         *
75         * @param keyGenerator the {@link KeyGenerator} to set
76         */
77        public void setKeyGenerator(KeyGenerator keyGenerator) {
78                this.keyGenerator = keyGenerator;
79        }
80 
81        /**
82         * @param SkipPolicy the {@link SkipPolicy} for item processing
83         */
84        public void setProcessSkipPolicy(SkipPolicy SkipPolicy) {
85                this.itemProcessSkipPolicy = SkipPolicy;
86        }
87 
88        /**
89         * @param SkipPolicy the {@link SkipPolicy} for item writing
90         */
91        public void setWriteSkipPolicy(SkipPolicy SkipPolicy) {
92                this.itemWriteSkipPolicy = SkipPolicy;
93        }
94 
95        /**
96         * A classifier that can distinguish between exceptions that cause rollback
97         * (return true) or not (return false).
98         *
99         * @param rollbackClassifier
100         */
101        public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
102                this.rollbackClassifier = rollbackClassifier;
103        }
104 
105        /**
106         * @param chunkMonitor
107         */
108        public void setChunkMonitor(ChunkMonitor chunkMonitor) {
109                this.chunkMonitor = chunkMonitor;
110        }
111 
112        /**
113         * A flag to indicate that items have been buffered and therefore will
114         * always come back as a chunk after a rollback. Otherwise things are more
115         * complicated because after a rollback the new chunk might or might not
116         * contain items from the previous failed chunk.
117         *
118         * @param buffering
119         */
120        public void setBuffering(boolean buffering) {
121                this.buffering = buffering;
122        }
123 
124        /**
125         * Flag to say that the {@link ItemProcessor} is transactional (defaults to
126         * true). If false then the processor is only called once per item per
127         * chunk, even if there are rollbacks with retries and skips.
128         *
129         * @param processorTransactional the flag value to set
130         */
131        public void setProcessorTransactional(boolean processorTransactional) {
132                this.processorTransactional = processorTransactional;
133        }
134 
135        public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
136                        ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
137                super(itemProcessor, itemWriter);
138                this.batchRetryTemplate = batchRetryTemplate;
139        }
140 
141        @Override
142        protected void initializeUserData(Chunk<I> inputs) {
143                @SuppressWarnings("unchecked")
144                UserData<O> data = (UserData<O>) inputs.getUserData();
145                if (data == null) {
146                        data = new UserData<O>();
147                        inputs.setUserData(data);
148                        data.setOutputs(new Chunk<O>());
149                }
150        }
151 
152        @Override
153        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
154                @SuppressWarnings("unchecked")
155                UserData<O> data = (UserData<O>) inputs.getUserData();
156                return data.filterCount;
157        }
158 
159        @Override
160        protected boolean isComplete(Chunk<I> inputs) {
161 
162                /*
163                 * Need to remember the write skips across transactions, otherwise they
164                 * keep coming back. Since we register skips with the inputs they will
165                 * not be processed again but the output skips need to be saved for
166                 * registration later with the listeners. The inputs are going to be the
167                 * same for all transactions processing the same chunk, but the outputs
168                 * are not, so we stash them in user data on the inputs.
169                 */
170 
171                @SuppressWarnings("unchecked")
172                UserData<O> data = (UserData<O>) inputs.getUserData();
173                Chunk<O> previous = data.getOutputs();
174 
175                return inputs.isEmpty() && previous.getSkips().isEmpty();
176 
177        }
178 
179        @Override
180        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
181 
182                @SuppressWarnings("unchecked")
183                UserData<O> data = (UserData<O>) inputs.getUserData();
184                Chunk<O> previous = data.getOutputs();
185 
186                Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips());
187                next.setBusy(previous.isBusy());
188 
189                // Remember for next time if there are skips accumulating
190                data.setOutputs(next);
191 
192                return next;
193 
194        }
195 
196        @Override
197        protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
198 
199                Chunk<O> outputs = new Chunk<O>();
200                @SuppressWarnings("unchecked")
201                final UserData<O> data = (UserData<O>) inputs.getUserData();
202                final Chunk<O> cache = data.getOutputs();
203                final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
204                final AtomicInteger count = new AtomicInteger(0);
205 
206                // final int scanLimit = processorTransactional && data.scanning() ? 1 :
207                // 0;
208 
209                for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
210 
211                        final I item = iterator.next();
212 
213                        RetryCallback<O> retryCallback = new RetryCallback<O>() {
214 
215                                @Override
216                                public O doWithRetry(RetryContext context) throws Exception {
217                                        O output = null;
218                                        try {
219                                                count.incrementAndGet();
220                                                O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
221                                                if (cached != null && !processorTransactional) {
222                                                        output = cached;
223                                                }
224                                                else {
225                                                        output = doProcess(item);
226                                                        if (output == null) {
227                                                                data.incrementFilterCount();
228                                                        } else if (!processorTransactional && !data.scanning()) {
229                                                                cache.add(output);
230                                                        }
231                                                }
232                                        }
233                                        catch (Exception e) {
234                                                if (rollbackClassifier.classify(e)) {
235                                                        // Default is to rollback unless the classifier
236                                                        // allows us to continue
237                                                        throw e;
238                                                }
239                                                else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
240                                                        // If we are not re-throwing then we should check if
241                                                        // this is skippable
242                                                        contribution.incrementProcessSkipCount();
243                                                        logger.debug("Skipping after failed process with no rollback", e);
244                                                        // If not re-throwing then the listener will not be
245                                                        // called in next chunk.
246                                                        callProcessSkipListener(item, e);
247                                                }
248                                                else {
249                                                        // If it's not skippable that's an error in
250                                                        // configuration - it doesn't make sense to not roll
251                                                        // back if we are also not allowed to skip
252                                                        throw new NonSkippableProcessException(
253                                                                        "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
254                                                                        e);
255                                                }
256                                        }
257                                        if (output == null) {
258                                                // No need to re-process filtered items
259                                                iterator.remove();
260                                        }
261                                        return output;
262                                }
263 
264                        };
265 
266                        RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
267 
268                                @Override
269                                public O recover(RetryContext context) throws Exception {
270                                        Throwable e = context.getLastThrowable();
271                                        if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
272                                                iterator.remove(e);
273                                                contribution.incrementProcessSkipCount();
274                                                logger.debug("Skipping after failed process", e);
275                                                return null;
276                                        }
277                                        else {
278                                                if (rollbackClassifier.classify(e)) {
279                                                        // Default is to rollback unless the classifier
280                                                        // allows us to continue
281                                                        throw new RetryException("Non-skippable exception in recoverer while processing", e);
282                                                }
283                                                iterator.remove(e);
284                                                return null;
285                                        }
286                                }
287 
288                        };
289 
290                        O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
291                                        getInputKey(item), rollbackClassifier));
292                        if (output != null) {
293                                outputs.add(output);
294                        }
295 
296                        /*
297                         * We only want to process the first item if there is a scan for a
298                         * failed item.
299                         */
300                        if (data.scanning()) {
301                                while (cacheIterator != null && cacheIterator.hasNext()) {
302                                        outputs.add(cacheIterator.next());
303                                }
304                                // Only process the first item if scanning
305                                break;
306                        }
307                }
308 
309                return outputs;
310 
311        }
312 
313        @Override
314        protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
315                        throws Exception {
316                @SuppressWarnings("unchecked")
317                final UserData<O> data = (UserData<O>) inputs.getUserData();
318                final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
319 
320                RetryCallback<Object> retryCallback = new RetryCallback<Object>() {
321                        @Override
322                        public Object doWithRetry(RetryContext context) throws Exception {
323 
324                                contextHolder.set(context);
325 
326                                if (!data.scanning()) {
327                                        chunkMonitor.setChunkSize(inputs.size());
328                                        try {
329                                                doWrite(outputs.getItems());
330                                        }
331                                        catch (Exception e) {
332                                                if (rollbackClassifier.classify(e)) {
333                                                        throw e;
334                                                }
335                                                /*
336                                                 * If the exception is marked as no-rollback, we need to
337                                                 * override that, otherwise there's no way to write the
338                                                 * rest of the chunk or to honour the skip listener
339                                                 * contract.
340                                                 */
341                                                throw new ForceRollbackForWriteSkipException(
342                                                                "Force rollback on skippable exception so that skipped item can be located.", e);
343                                        }
344                                        contribution.incrementWriteCount(outputs.size());
345                                }
346                                else {
347                                        scan(contribution, inputs, outputs, chunkMonitor, false);
348                                }
349                                return null;
350 
351                        }
352                };
353 
354                if (!buffering) {
355 
356                        RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
357 
358                                @Override
359                                public Object recover(RetryContext context) throws Exception {
360 
361                                        Throwable e = context.getLastThrowable();
362                                        if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
363                                                throw new RetryException("Invalid retry state during write caused by "
364                                                                + "exception that does not classify for rollback: ", e);
365                                        }
366 
367                                        Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
368                                        for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
369 
370                                                inputIterator.next();
371                                                outputIterator.next();
372 
373                                                checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
374                                                if (!rollbackClassifier.classify(e)) {
375                                                        throw new RetryException(
376                                                                        "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
377                                                                        e);
378                                                }
379 
380                                        }
381 
382                                        return null;
383 
384                                }
385 
386                        };
387 
388                        batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
389                                        BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));
390 
391                }
392                else {
393 
394                        RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
395 
396                                @Override
397                                public Object recover(RetryContext context) throws Exception {
398 
399                                        /*
400                                         * If the last exception was not skippable we don't need to
401                                         * do any scanning. We can just bomb out with a retry
402                                         * exhausted.
403                                         */
404                                        if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
405                                                throw new ExhaustedRetryException(
406                                                                "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
407                                                                context.getLastThrowable());
408                                        }
409 
410                                        inputs.setBusy(true);
411                                        data.scanning(true);
412                                        scan(contribution, inputs, outputs, chunkMonitor, true);
413                                        return null;
414                                }
415 
416                        };
417 
418                        if (logger.isDebugEnabled()) {
419                                logger.debug("Attempting to write: " + inputs);
420                        }
421                        try {
422                                batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
423                                                rollbackClassifier));
424                        }
425                        catch (Exception e) {
426                                RetryContext context = contextHolder.get();
427                                if (!batchRetryTemplate.canRetry(context)) {
428                                        /*
429                                         * BATCH-1761: we need advance warning of the scan about to
430                                         * start in the next transaction, so we can change the
431                                         * processing behaviour.
432                                         */
433                                        data.scanning(true);
434                                }
435                                throw e;
436                        }
437 
438                }
439 
440                callSkipListeners(inputs, outputs);
441 
442        }
443 
444        private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) {
445 
446                for (SkipWrapper<I> wrapper : inputs.getSkips()) {
447                        I item = wrapper.getItem();
448                        if (item == null) {
449                                continue;
450                        }
451                        Throwable e = wrapper.getException();
452                        callProcessSkipListener(item, e);
453                }
454 
455                for (SkipWrapper<O> wrapper : outputs.getSkips()) {
456                        Throwable e = wrapper.getException();
457                        try {
458                                getListener().onSkipInWrite(wrapper.getItem(), e);
459                        }
460                        catch (RuntimeException ex) {
461                                throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
462                        }
463                }
464 
465                // Clear skips if we are possibly going to process this chunk again
466                outputs.clearSkips();
467                inputs.clearSkips();
468 
469        }
470 
471        /**
472         * Convenience method for calling process skip listener, so that it can be
473         * called from multiple places.
474         *
475         * @param item the item that is skipped
476         * @param e the cause of the skip
477         */
478        private void callProcessSkipListener(I item, Throwable e) {
479                try {
480                        getListener().onSkipInProcess(item, e);
481                }
482                catch (RuntimeException ex) {
483                        throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
484                }
485        }
486 
487        /**
488         * Convenience method for calling process skip policy, so that it can be
489         * called from multiple places.
490         *
491         * @param policy the skip policy
492         * @param e the cause of the skip
493         * @param skipCount the current skip count
494         */
495        private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
496                try {
497                        return policy.shouldSkip(e, skipCount);
498                }
499                catch (SkipLimitExceededException ex) {
500                        throw ex;
501                }
502                catch (RuntimeException ex) {
503                        throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e);
504                }
505        }
506 
507        private Object getInputKey(I item) {
508                if (keyGenerator == null) {
509                        return item;
510                }
511                return keyGenerator.getKey(item);
512        }
513 
514        private List<?> getInputKeys(final Chunk<I> inputs) {
515                if (keyGenerator == null) {
516                        return inputs.getItems();
517                }
518                List<Object> keys = new ArrayList<Object>();
519                for (I item : inputs.getItems()) {
520                        keys.add(keyGenerator.getKey(item));
521                }
522                return keys;
523        }
524 
525        private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator,
526                        Throwable e, StepContribution contribution, boolean recovery) throws Exception {
527                logger.debug("Checking skip policy after failed write");
528                if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) {
529                        contribution.incrementWriteSkipCount();
530                        inputIterator.remove();
531                        outputIterator.remove(e);
532                        logger.debug("Skipping after failed write", e);
533                }
534                else {
535                        if (recovery) {
536                                // Only if already recovering should we check skip policy
537                                throw new RetryException("Non-skippable exception in recoverer", e);
538                        }
539                        else {
540                                if (e instanceof Exception) {
541                                        throw (Exception) e;
542                                }
543                                else if (e instanceof Error) {
544                                        throw (Error) e;
545                                }
546                                else {
547                                        throw new RetryException("Non-skippable throwable in recoverer", e);
548                                }
549                        }
550                }
551        }
552 
553        private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
554                        ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
555 
556                @SuppressWarnings("unchecked")
557                final UserData<O> data = (UserData<O>) inputs.getUserData();
558 
559                if (logger.isDebugEnabled()) {
560                        if (recovery) {
561                                logger.debug("Scanning for failed item on recovery from write: " + inputs);
562                        }
563                        else {
564                                logger.debug("Scanning for failed item on write: " + inputs);
565                        }
566                }
567                if (outputs.isEmpty()) {
568                        data.scanning(false);
569                        inputs.setBusy(false);
570                        chunkMonitor.resetOffset();
571                        return;
572                }
573 
574                Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
575                Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
576 
577                List<O> items = Collections.singletonList(outputIterator.next());
578                inputIterator.next();
579                try {
580                        writeItems(items);
581                        // If successful we are going to return and allow
582                        // the driver to commit...
583                        doAfterWrite(items);
584                        contribution.incrementWriteCount(1);
585                        inputIterator.remove();
586                        outputIterator.remove();
587                }
588                catch (Exception e) {
589                        doOnWriteError(e, items);
590                        if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
591                                inputIterator.remove();
592                                outputIterator.remove();
593                        }
594                        else {
595                                checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
596                        }
597                        if (rollbackClassifier.classify(e)) {
598                                throw e;
599                        }
600                }
601                chunkMonitor.incrementOffset();
602                if (outputs.isEmpty()) {
603                        data.scanning(false);
604                        inputs.setBusy(false);
605                        chunkMonitor.resetOffset();
606                }
607        }
608 
609        private static class UserData<O> {
610 
611                private Chunk<O> outputs;
612 
613                private int filterCount = 0;
614 
615                private boolean scanning;
616 
617                public boolean scanning() {
618                        return scanning;
619                }
620 
621                public void scanning(boolean scanning) {
622                        this.scanning = scanning;
623                }
624 
625                public void incrementFilterCount() {
626                        filterCount++;
627                }
628 
629                public Chunk<O> getOutputs() {
630                        return outputs;
631                }
632 
633                public void setOutputs(Chunk<O> outputs) {
634                        this.outputs = outputs;
635                }
636 
637        }
638 
639}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov