EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [FaultTolerantChunkProcessor.java]

nameclass, %method, %block, %line, %
FaultTolerantChunkProcessor.java100% (7/7)100% (49/49)87%  (976/1120)86%  (199.6/231)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FaultTolerantChunkProcessor$2100% (1/1)100% (2/2)67%  (41/61)64%  (7/11)
recover (RetryContext): Object 100% (1/1)59%  (29/49)60%  (6/10)
FaultTolerantChunkProcessor$2 (FaultTolerantChunkProcessor, StepContribution,... 100% (1/1)100% (12/12)100% (1/1)
     
class FaultTolerantChunkProcessor$4100% (1/1)100% (2/2)85%  (67/79)83%  (10/12)
recover (RetryContext): Object 100% (1/1)81%  (52/64)82%  (9/11)
FaultTolerantChunkProcessor$4 (FaultTolerantChunkProcessor, Chunk, Chunk, Ste... 100% (1/1)100% (15/15)100% (1/1)
     
class FaultTolerantChunkProcessor$1100% (1/1)100% (2/2)87%  (111/128)85%  (18.7/22)
doWithRetry (RetryContext): Object 100% (1/1)83%  (84/101)84%  (17.7/21)
FaultTolerantChunkProcessor$1 (FaultTolerantChunkProcessor, AtomicInteger, It... 100% (1/1)100% (27/27)100% (1/1)
     
class FaultTolerantChunkProcessor100% (1/1)100% (31/31)88%  (597/679)88%  (140.9/161)
getInputKeys (Chunk): List 100% (1/1)20%  (6/30)33%  (2/6)
shouldSkip (SkipPolicy, Throwable, int): boolean 100% (1/1)31%  (5/16)20%  (1/5)
callProcessSkipListener (Object, Throwable): void 100% (1/1)47%  (7/15)60%  (3/5)
getInputKey (Object): Object 100% (1/1)50%  (5/10)67%  (2/3)
checkSkipPolicy (Chunk$ChunkIterator, Chunk$ChunkIterator, Throwable, StepCon... 100% (1/1)65%  (34/52)71%  (10/14)
callSkipListeners (Chunk, Chunk): void 100% (1/1)85%  (51/60)82%  (14/17)
isComplete (Chunk): boolean 100% (1/1)89%  (16/18)96%  (2.9/3)
scan (StepContribution, Chunk, Chunk, ChunkMonitor, boolean): void 100% (1/1)96%  (119/124)94%  (31/33)
FaultTolerantChunkProcessor (ItemProcessor, ItemWriter, BatchRetryTemplate): ... 100% (1/1)100% (40/40)100% (10/10)
access$1000 (FaultTolerantChunkProcessor, Chunk$ChunkIterator, Chunk$ChunkIte... 100% (1/1)100% (8/8)100% (1/1)
access$1100 (FaultTolerantChunkProcessor): SkipPolicy 100% (1/1)100% (3/3)100% (1/1)
access$200 (FaultTolerantChunkProcessor): boolean 100% (1/1)100% (3/3)100% (1/1)
access$300 (FaultTolerantChunkProcessor): Classifier 100% (1/1)100% (3/3)100% (1/1)
access$400 (FaultTolerantChunkProcessor): SkipPolicy 100% (1/1)100% (3/3)100% (1/1)
access$500 (FaultTolerantChunkProcessor, SkipPolicy, Throwable, int): boolean 100% (1/1)100% (6/6)100% (1/1)
access$600 (FaultTolerantChunkProcessor): Log 100% (1/1)100% (3/3)100% (1/1)
access$700 (FaultTolerantChunkProcessor, Object, Throwable): void 100% (1/1)100% (5/5)100% (1/1)
access$800 (FaultTolerantChunkProcessor): ChunkMonitor 100% (1/1)100% (3/3)100% (1/1)
access$900 (FaultTolerantChunkProcessor, StepContribution, Chunk, Chunk, Chun... 100% (1/1)100% (8/8)100% (1/1)
getAdjustedOutputs (Chunk, Chunk): Chunk 100% (1/1)100% (24/24)100% (6/6)
getFilterCount (Chunk, Chunk): int 100% (1/1)100% (7/7)100% (2/2)
initializeUserData (Chunk): void 100% (1/1)100% (20/20)100% (6/6)
setBuffering (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setChunkMonitor (ChunkMonitor): void 100% (1/1)100% (4/4)100% (2/2)
setKeyGenerator (KeyGenerator): void 100% (1/1)100% (4/4)100% (2/2)
setProcessSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setProcessorTransactional (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setRollbackClassifier (Classifier): void 100% (1/1)100% (4/4)100% (2/2)
setWriteSkipPolicy (SkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
transform (StepContribution, Chunk): Chunk 100% (1/1)100% (91/91)100% (17/17)
write (StepContribution, Chunk, Chunk): void 100% (1/1)100% (99/99)100% (19/19)
     
class FaultTolerantChunkProcessor$5100% (1/1)100% (2/2)88%  (51/58)86%  (6/7)
recover (RetryContext): Object 100% (1/1)82%  (33/40)83%  (5/6)
FaultTolerantChunkProcessor$5 (FaultTolerantChunkProcessor, Chunk, FaultToler... 100% (1/1)100% (18/18)100% (1/1)
     
class FaultTolerantChunkProcessor$3100% (1/1)100% (2/2)93%  (76/82)92%  (12/13)
doWithRetry (RetryContext): Object 100% (1/1)90%  (55/61)92%  (11/12)
FaultTolerantChunkProcessor$3 (FaultTolerantChunkProcessor, AtomicReference, ... 100% (1/1)100% (21/21)100% (1/1)
     
class FaultTolerantChunkProcessor$UserData100% (1/1)100% (8/8)100% (33/33)100% (10/10)
FaultTolerantChunkProcessor$UserData (): void 100% (1/1)100% (6/6)100% (2/2)
FaultTolerantChunkProcessor$UserData (FaultTolerantChunkProcessor$1): void 100% (1/1)100% (3/3)100% (1/1)
access$100 (FaultTolerantChunkProcessor$UserData): int 100% (1/1)100% (3/3)100% (1/1)
getOutputs (): Chunk 100% (1/1)100% (3/3)100% (1/1)
incrementFilterCount (): void 100% (1/1)100% (7/7)100% (2/2)
scanning (): boolean 100% (1/1)100% (3/3)100% (1/1)
scanning (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setOutputs (Chunk): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.ArrayList;
20import java.util.Collections;
21import java.util.Iterator;
22import java.util.List;
23import java.util.concurrent.atomic.AtomicInteger;
24import java.util.concurrent.atomic.AtomicReference;
25 
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.springframework.batch.classify.BinaryExceptionClassifier;
29import org.springframework.batch.classify.Classifier;
30import org.springframework.batch.core.StepContribution;
31import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
32import org.springframework.batch.core.step.skip.NonSkippableProcessException;
33import org.springframework.batch.core.step.skip.SkipLimitExceededException;
34import org.springframework.batch.core.step.skip.SkipListenerFailedException;
35import org.springframework.batch.core.step.skip.SkipPolicy;
36import org.springframework.batch.item.ItemProcessor;
37import org.springframework.batch.item.ItemWriter;
38import org.springframework.batch.retry.ExhaustedRetryException;
39import org.springframework.batch.retry.RecoveryCallback;
40import org.springframework.batch.retry.RetryCallback;
41import org.springframework.batch.retry.RetryContext;
42import org.springframework.batch.retry.RetryException;
43import org.springframework.batch.retry.support.DefaultRetryState;
44 
45/**
46 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that
47 * allows for skipping or retry of items that cause exceptions during writing.
48 * 
49 */
50public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> {
51 
52        private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy();
53 
54        private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy();
55 
56        private final BatchRetryTemplate batchRetryTemplate;
57 
58        private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
59 
60        private Log logger = LogFactory.getLog(getClass());
61 
62        private boolean buffering = true;
63 
64        private KeyGenerator keyGenerator;
65 
66        private ChunkMonitor chunkMonitor = new ChunkMonitor();
67 
68        private boolean processorTransactional = true;
69 
70        /**
71         * The {@link KeyGenerator} to use to identify failed items across rollback.
72         * Not used in the case of the {@link #setBuffering(boolean) buffering flag}
73         * being true (the default).
74         * 
75         * @param keyGenerator the {@link KeyGenerator} to set
76         */
77        public void setKeyGenerator(KeyGenerator keyGenerator) {
78                this.keyGenerator = keyGenerator;
79        }
80 
81        /**
82         * @param SkipPolicy the {@link SkipPolicy} for item processing
83         */
84        public void setProcessSkipPolicy(SkipPolicy SkipPolicy) {
85                this.itemProcessSkipPolicy = SkipPolicy;
86        }
87 
88        /**
89         * @param SkipPolicy the {@link SkipPolicy} for item writing
90         */
91        public void setWriteSkipPolicy(SkipPolicy SkipPolicy) {
92                this.itemWriteSkipPolicy = SkipPolicy;
93        }
94 
95        /**
96         * A classifier that can distinguish between exceptions that cause rollback
97         * (return true) or not (return false).
98         * 
99         * @param rollbackClassifier
100         */
101        public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
102                this.rollbackClassifier = rollbackClassifier;
103        }
104 
105        /**
106         * @param chunkMonitor
107         */
108        public void setChunkMonitor(ChunkMonitor chunkMonitor) {
109                this.chunkMonitor = chunkMonitor;
110        }
111 
112        /**
113         * A flag to indicate that items have been buffered and therefore will
114         * always come back as a chunk after a rollback. Otherwise things are more
115         * complicated because after a rollback the new chunk might or might not
116         * contain items from the previous failed chunk.
117         * 
118         * @param buffering
119         */
120        public void setBuffering(boolean buffering) {
121                this.buffering = buffering;
122        }
123 
124        /**
125         * Flag to say that the {@link ItemProcessor} is transactional (defaults to
126         * true). If false then the processor is only called once per item per
127         * chunk, even if there are rollbacks with retries and skips.
128         * 
129         * @param processorTransactional the flag value to set
130         */
131        public void setProcessorTransactional(boolean processorTransactional) {
132                this.processorTransactional = processorTransactional;
133        }
134 
135        public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
136                        ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
137                super(itemProcessor, itemWriter);
138                this.batchRetryTemplate = batchRetryTemplate;
139        }
140 
141        @Override
142        protected void initializeUserData(Chunk<I> inputs) {
143                @SuppressWarnings("unchecked")
144                UserData<O> data = (UserData<O>) inputs.getUserData();
145                if (data == null) {
146                        data = new UserData<O>();
147                        inputs.setUserData(data);
148                        data.setOutputs(new Chunk<O>());
149                }
150        }
151 
152        @Override
153        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
154                @SuppressWarnings("unchecked")
155                UserData<O> data = (UserData<O>) inputs.getUserData();
156                return data.filterCount;
157        }
158 
159        @Override
160        protected boolean isComplete(Chunk<I> inputs) {
161 
162                /*
163                 * Need to remember the write skips across transactions, otherwise they
164                 * keep coming back. Since we register skips with the inputs they will
165                 * not be processed again but the output skips need to be saved for
166                 * registration later with the listeners. The inputs are going to be the
167                 * same for all transactions processing the same chunk, but the outputs
168                 * are not, so we stash them in user data on the inputs.
169                 */
170 
171                @SuppressWarnings("unchecked")
172                UserData<O> data = (UserData<O>) inputs.getUserData();
173                Chunk<O> previous = data.getOutputs();
174 
175                return inputs.isEmpty() && previous.getSkips().isEmpty();
176 
177        }
178 
179        @Override
180        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
181 
182                @SuppressWarnings("unchecked")
183                UserData<O> data = (UserData<O>) inputs.getUserData();
184                Chunk<O> previous = data.getOutputs();
185 
186                Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips());
187                next.setBusy(previous.isBusy());
188 
189                // Remember for next time if there are skips accumulating
190                data.setOutputs(next);
191 
192                return next;
193 
194        }
195 
196        @Override
197        protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
198 
199                Chunk<O> outputs = new Chunk<O>();
200                @SuppressWarnings("unchecked")
201                final UserData<O> data = (UserData<O>) inputs.getUserData();
202                final Chunk<O> cache = data.getOutputs();
203                final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
204                final AtomicInteger count = new AtomicInteger(0);
205 
206                // final int scanLimit = processorTransactional && data.scanning() ? 1 :
207                // 0;
208 
209                for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
210 
211                        final I item = iterator.next();
212 
213                        RetryCallback<O> retryCallback = new RetryCallback<O>() {
214 
215                                public O doWithRetry(RetryContext context) throws Exception {
216                                        O output = null;
217                                        try {
218                                                count.incrementAndGet();
219                                                O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
220                                                if (cached != null && !processorTransactional) {
221                                                        output = cached;
222                                                }
223                                                else {
224                                                        output = doProcess(item);
225                                                        if (!processorTransactional && !data.scanning()) {
226                                                                cache.add(output);
227                                                        }
228                                                }
229                                        }
230                                        catch (Exception e) {
231                                                if (rollbackClassifier.classify(e)) {
232                                                        // Default is to rollback unless the classifier
233                                                        // allows us to continue
234                                                        throw e;
235                                                }
236                                                else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
237                                                        // If we are not re-throwing then we should check if
238                                                        // this is skippable
239                                                        contribution.incrementProcessSkipCount();
240                                                        logger.debug("Skipping after failed process with no rollback", e);
241                                                        // If not re-throwing then the listener will not be
242                                                        // called in next chunk.
243                                                        callProcessSkipListener(item, e);
244                                                }
245                                                else {
246                                                        // If it's not skippable that's an error in
247                                                        // configuration - it doesn't make sense to not roll
248                                                        // back if we are also not allowed to skip
249                                                        throw new NonSkippableProcessException(
250                                                                        "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
251                                                                        e);
252                                                }
253                                        }
254                                        if (output == null) {
255                                                // No need to re-process filtered items
256                                                iterator.remove();
257                                                data.incrementFilterCount();
258                                        }
259                                        return output;
260                                }
261 
262                        };
263 
264                        RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
265 
266                                public O recover(RetryContext context) throws Exception {
267                                        Throwable e = context.getLastThrowable();
268                                        if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
269                                                iterator.remove(e);
270                                                contribution.incrementProcessSkipCount();
271                                                logger.debug("Skipping after failed process", e);
272                                                return null;
273                                        }
274                                        else {
275                                                if (rollbackClassifier.classify(e)) {
276                                                        // Default is to rollback unless the classifier
277                                                        // allows us to continue
278                                                        throw new RetryException("Non-skippable exception in recoverer while processing", e);
279                                                }
280                                                iterator.remove(e);
281                                                return null;
282                                        }
283                                }
284 
285                        };
286 
287                        O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
288                                        getInputKey(item), rollbackClassifier));
289                        if (output != null) {
290                                outputs.add(output);
291                        }
292 
293                        /*
294                         * We only want to process the first item if there is a scan for a
295                         * failed item.
296                         */
297                        if (data.scanning()) {
298                                while (cacheIterator != null && cacheIterator.hasNext()) {
299                                        outputs.add(cacheIterator.next());
300                                }
301                                // Only process the first item if scanning
302                                break;
303                        }
304                }
305 
306                return outputs;
307 
308        }
309 
310        @Override
311        protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
312                        throws Exception {
313 
314                @SuppressWarnings("unchecked")
315                final UserData<O> data = (UserData<O>) inputs.getUserData();
316                final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
317 
318                RetryCallback<Object> retryCallback = new RetryCallback<Object>() {
319                        public Object doWithRetry(RetryContext context) throws Exception {
320 
321                                contextHolder.set(context);
322 
323                                if (!data.scanning()) {
324                                        chunkMonitor.setChunkSize(inputs.size());
325                                        try {
326                                                doWrite(outputs.getItems());
327                                        }
328                                        catch (Exception e) {
329                                                if (rollbackClassifier.classify(e)) {
330                                                        throw e;
331                                                }
332                                                /*
333                                                 * If the exception is marked as no-rollback, we need to
334                                                 * override that, otherwise there's no way to write the
335                                                 * rest of the chunk or to honour the skip listener
336                                                 * contract.
337                                                 */
338                                                throw new ForceRollbackForWriteSkipException(
339                                                                "Force rollback on skippable exception so that skipped item can be located.", e);
340                                        }
341                                        contribution.incrementWriteCount(outputs.size());
342                                }
343                                else {
344                                        scan(contribution, inputs, outputs, chunkMonitor, false);
345                                }
346                                return null;
347 
348                        }
349                };
350 
351                if (!buffering) {
352 
353                        RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
354 
355                                public Object recover(RetryContext context) throws Exception {
356 
357                                        Throwable e = context.getLastThrowable();
358                                        if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
359                                                throw new RetryException("Invalid retry state during write caused by "
360                                                                + "exception that does not classify for rollback: ", e);
361                                        }
362 
363                                        Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
364                                        for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
365 
366                                                inputIterator.next();
367                                                outputIterator.next();
368 
369                                                checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
370                                                if (!rollbackClassifier.classify(e)) {
371                                                        throw new RetryException(
372                                                                        "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
373                                                                        e);
374                                                }
375 
376                                        }
377 
378                                        return null;
379 
380                                }
381 
382                        };
383 
384                        batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
385                                        BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));
386 
387                }
388                else {
389 
390                        RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
391 
392                                public Object recover(RetryContext context) throws Exception {
393 
394                                        /*
395                                         * If the last exception was not skippable we don't need to
396                                         * do any scanning. We can just bomb out with a retry
397                                         * exhausted.
398                                         */
399                                        if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
400                                                throw new ExhaustedRetryException(
401                                                                "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
402                                                                context.getLastThrowable());
403                                        }
404 
405                                        inputs.setBusy(true);
406                                        data.scanning(true);
407                                        scan(contribution, inputs, outputs, chunkMonitor, true);
408                                        return null;
409                                }
410 
411                        };
412 
413                        if (logger.isDebugEnabled()) {
414                                logger.debug("Attempting to write: " + inputs);
415                        }
416                        try {
417                                batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
418                                                rollbackClassifier));
419                        }
420                        catch (Exception e) {
421                                RetryContext context = contextHolder.get();
422                                if (!batchRetryTemplate.canRetry(context)) {
423                                        /*
424                                         * BATCH-1761: we need advance warning of the scan about to
425                                         * start in the next transaction, so we can change the
426                                         * processing behaviour.
427                                         */
428                                        data.scanning(true);
429                                }
430                                throw e;
431                        }
432 
433                }
434 
435                callSkipListeners(inputs, outputs);
436 
437        }
438 
439        private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) {
440 
441                for (SkipWrapper<I> wrapper : inputs.getSkips()) {
442                        I item = wrapper.getItem();
443                        if (item == null) {
444                                continue;
445                        }
446                        Throwable e = wrapper.getException();
447                        callProcessSkipListener(item, e);
448                }
449 
450                for (SkipWrapper<O> wrapper : outputs.getSkips()) {
451                        Throwable e = wrapper.getException();
452                        try {
453                                getListener().onSkipInWrite(wrapper.getItem(), e);
454                        }
455                        catch (RuntimeException ex) {
456                                throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
457                        }
458                }
459 
460                // Clear skips if we are possibly going to process this chunk again
461                outputs.clearSkips();
462                inputs.clearSkips();
463 
464        }
465 
466        /**
467         * Convenience method for calling process skip listener, so that it can be
468         * called from multiple places.
469         * 
470         * @param item the item that is skipped
471         * @param e the cause of the skip
472         */
473        private void callProcessSkipListener(I item, Throwable e) {
474                try {
475                        getListener().onSkipInProcess(item, e);
476                }
477                catch (RuntimeException ex) {
478                        throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
479                }
480        }
481 
482        /**
483         * Convenience method for calling process skip policy, so that it can be
484         * called from multiple places.
485         * 
486         * @param policy the skip policy
487         * @param e the cause of the skip
488         * @param skipCount the current skip count
489         */
490        private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
491                try {
492                        return policy.shouldSkip(e, skipCount);
493                }
494                catch (SkipLimitExceededException ex) {
495                        throw ex;
496                }
497                catch (RuntimeException ex) {
498                        throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e);
499                }
500        }
501 
502        private Object getInputKey(I item) {
503                if (keyGenerator == null) {
504                        return item;
505                }
506                return keyGenerator.getKey(item);
507        }
508 
509        private List<?> getInputKeys(final Chunk<I> inputs) {
510                if (keyGenerator == null) {
511                        return inputs.getItems();
512                }
513                List<Object> keys = new ArrayList<Object>();
514                for (I item : inputs.getItems()) {
515                        keys.add(keyGenerator.getKey(item));
516                }
517                return keys;
518        }
519 
520        private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator,
521                        Throwable e, StepContribution contribution, boolean recovery) throws Exception {
522                logger.debug("Checking skip policy after failed write");
523                if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) {
524                        contribution.incrementWriteSkipCount();
525                        inputIterator.remove();
526                        outputIterator.remove(e);
527                        logger.debug("Skipping after failed write", e);
528                }
529                else {
530                        if (recovery) {
531                                // Only if already recovering should we check skip policy
532                                throw new RetryException("Non-skippable exception in recoverer", e);
533                        }
534                        else {
535                                if (e instanceof Exception) {
536                                        throw (Exception) e;
537                                }
538                                else if (e instanceof Error) {
539                                        throw (Error) e;
540                                }
541                                else {
542                                        throw new RetryException("Non-skippable throwable in recoverer", e);
543                                }
544                        }
545                }
546        }
547 
548        private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
549                        ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
550 
551                @SuppressWarnings("unchecked")
552                final UserData<O> data = (UserData<O>) inputs.getUserData();
553 
554                if (logger.isDebugEnabled()) {
555                        if (recovery) {
556                                logger.debug("Scanning for failed item on recovery from write: " + inputs);
557                        }
558                        else {
559                                logger.debug("Scanning for failed item on write: " + inputs);
560                        }
561                }
562                if (outputs.isEmpty()) {
563                        data.scanning(false);
564                        inputs.setBusy(false);
565                        return;
566                }
567 
568                Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
569                Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
570 
571                List<O> items = Collections.singletonList(outputIterator.next());
572                inputIterator.next();
573                try {
574                        writeItems(items);
575                        // If successful we are going to return and allow
576                        // the driver to commit...
577                        doAfterWrite(items);
578                        contribution.incrementWriteCount(1);
579                        inputIterator.remove();
580                        outputIterator.remove();
581                }
582                catch (Exception e) {
583                        doOnWriteError(e, items);
584                        if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
585                                inputIterator.remove();
586                                outputIterator.remove();
587                        }
588                        else {
589                                checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
590                        }
591                        if (rollbackClassifier.classify(e)) {
592                                throw e;
593                        }
594                }
595                chunkMonitor.incrementOffset();
596                if (outputs.isEmpty()) {
597                        data.scanning(false);
598                        inputs.setBusy(false);
599                        chunkMonitor.resetOffset();
600                }
601        }
602 
603        private static class UserData<O> {
604 
605                private Chunk<O> outputs;
606 
607                private int filterCount = 0;
608 
609                private boolean scanning;
610 
611                public boolean scanning() {
612                        return scanning;
613                }
614 
615                public void scanning(boolean scanning) {
616                        this.scanning = scanning;
617                }
618 
619                public void incrementFilterCount() {
620                        filterCount++;
621                }
622 
623                public Chunk<O> getOutputs() {
624                        return outputs;
625                }
626 
627                public void setOutputs(Chunk<O> outputs) {
628                        this.outputs = outputs;
629                }
630 
631        }
632 
633}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov