EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.core.step.builder]

COVERAGE SUMMARY FOR SOURCE FILE [FaultTolerantStepBuilder.java]

nameclass, %method, %block, %line, %
FaultTolerantStepBuilder.java100% (4/4)96%  (45/47)85%  (1037/1221)85%  (207.8/244)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FaultTolerantStepBuilder$1100% (1/1)50%  (1/2)39%  (12/31)50%  (1/2)
classify (Throwable): Boolean 0%   (0/1)0%   (0/19)0%   (0/1)
FaultTolerantStepBuilder$1 (FaultTolerantStepBuilder, Classifier, Classifier)... 100% (1/1)100% (12/12)100% (1/1)
     
class FaultTolerantStepBuilder$TerminateOnExceptionChunkListenerDelegate100% (1/1)100% (6/6)67%  (43/64)73%  (16/22)
afterChunk (ChunkContext): void 100% (1/1)46%  (6/13)60%  (3/5)
afterChunkError (ChunkContext): void 100% (1/1)46%  (6/13)60%  (3/5)
beforeChunk (ChunkContext): void 100% (1/1)46%  (6/13)60%  (3/5)
FaultTolerantStepBuilder$TerminateOnExceptionChunkListenerDelegate (FaultTole... 100% (1/1)100% (9/9)100% (3/3)
equals (Object): boolean 100% (1/1)100% (12/12)100% (3/3)
hashCode (): int 100% (1/1)100% (4/4)100% (1/1)
     
class FaultTolerantStepBuilder100% (1/1)97%  (36/37)87%  (965/1109)87%  (190.8/220)
listener (Object): SimpleStepBuilder 0%   (0/1)0%   (0/95)0%   (0/19)
addNonRetryableExceptionIfMissing (Class []): void 100% (1/1)81%  (39/48)73%  (6.6/9)
addNonSkippableExceptionIfMissing (Class []): void 100% (1/1)81%  (39/48)73%  (6.6/9)
createRetryOperations (): BatchRetryTemplate 100% (1/1)81%  (100/123)84%  (21.9/26)
registerStepListenerAsSkipListener (): void 100% (1/1)88%  (37/42)89%  (8/9)
createTasklet (): Tasklet 100% (1/1)96%  (43/45)98%  (8.9/9)
createSkipPolicy (): SkipPolicy 100% (1/1)98%  (56/57)99%  (9.9/10)
FaultTolerantStepBuilder (SimpleStepBuilder): void 100% (1/1)100% (53/53)100% (13/13)
FaultTolerantStepBuilder (StepBuilderHelper): void 100% (1/1)100% (53/53)100% (13/13)
addSpecialExceptions (): void 100% (1/1)100% (73/73)100% (3/3)
backOffPolicy (BackOffPolicy): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
build (): TaskletStep 100% (1/1)100% (5/5)100% (2/2)
createChunkProcessor (): FaultTolerantChunkProcessor 100% (1/1)100% (67/67)100% (16/16)
createChunkProvider (): FaultTolerantChunkProvider 100% (1/1)100% (44/44)100% (10/10)
detectStreamInReader (): void 100% (1/1)100% (17/17)100% (5/5)
getFatalExceptionAwareProxy (RetryPolicy): RetryPolicy 100% (1/1)100% (42/42)100% (10/10)
getFatalExceptionAwareProxy (SkipPolicy): SkipPolicy 100% (1/1)100% (42/42)100% (10/10)
getRollbackClassifier (): Classifier 100% (1/1)100% (57/57)100% (9/9)
getTransactionAttribute (TransactionAttribute): TransactionAttribute 100% (1/1)100% (10/10)100% (2/2)
keyGenerator (KeyGenerator): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
listener (ChunkListener): FaultTolerantStepBuilder 100% (1/1)100% (10/10)100% (2/2)
listener (RetryListener): FaultTolerantStepBuilder 100% (1/1)100% (7/7)100% (2/2)
listener (SkipListener): FaultTolerantStepBuilder 100% (1/1)100% (7/7)100% (2/2)
noRetry (Class): FaultTolerantStepBuilder 100% (1/1)100% (9/9)100% (2/2)
noRollback (Class): FaultTolerantStepBuilder 100% (1/1)100% (7/7)100% (2/2)
noSkip (Class): FaultTolerantStepBuilder 100% (1/1)100% (9/9)100% (2/2)
processorNonTransactional (): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
registerSkipListeners (): void 100% (1/1)100% (50/50)100% (7/7)
retry (Class): FaultTolerantStepBuilder 100% (1/1)100% (9/9)100% (2/2)
retryContextCache (RetryContextCache): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
retryLimit (int): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
retryPolicy (RetryPolicy): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
skip (Class): FaultTolerantStepBuilder 100% (1/1)100% (9/9)100% (2/2)
skipLimit (int): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
skipPolicy (SkipPolicy): FaultTolerantStepBuilder 100% (1/1)100% (5/5)100% (2/2)
stream (ItemStream): AbstractTaskletStepBuilder 100% (1/1)100% (25/25)100% (7/7)
transactionAttribute (TransactionAttribute): AbstractTaskletStepBuilder 100% (1/1)100% (6/6)100% (1/1)
     
class FaultTolerantStepBuilder$2100% (1/1)100% (2/2)100% (17/17)100% (2/2)
FaultTolerantStepBuilder$2 (FaultTolerantStepBuilder, TransactionAttribute, C... 100% (1/1)100% (10/10)100% (1/1)
rollbackOn (Throwable): boolean 100% (1/1)100% (7/7)100% (1/1)

1/*
2 * Copyright 2006-2014 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.builder;
17 
18import org.springframework.batch.core.ChunkListener;
19import org.springframework.batch.core.JobInterruptedException;
20import org.springframework.batch.core.SkipListener;
21import org.springframework.batch.core.StepExecutionListener;
22import org.springframework.batch.core.StepListener;
23import org.springframework.batch.core.annotation.AfterChunk;
24import org.springframework.batch.core.annotation.AfterChunkError;
25import org.springframework.batch.core.annotation.BeforeChunk;
26import org.springframework.batch.core.annotation.OnSkipInProcess;
27import org.springframework.batch.core.annotation.OnSkipInRead;
28import org.springframework.batch.core.annotation.OnSkipInWrite;
29import org.springframework.batch.core.listener.StepListenerFactoryBean;
30import org.springframework.batch.core.scope.context.ChunkContext;
31import org.springframework.batch.core.step.FatalStepExecutionException;
32import org.springframework.batch.core.step.item.BatchRetryTemplate;
33import org.springframework.batch.core.step.item.ChunkMonitor;
34import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
35import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
36import org.springframework.batch.core.step.item.FaultTolerantChunkProvider;
37import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException;
38import org.springframework.batch.core.step.item.KeyGenerator;
39import org.springframework.batch.core.step.item.SimpleRetryExceptionHandler;
40import org.springframework.batch.core.step.skip.CompositeSkipPolicy;
41import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy;
42import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
43import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
44import org.springframework.batch.core.step.skip.NonSkippableReadException;
45import org.springframework.batch.core.step.skip.SkipLimitExceededException;
46import org.springframework.batch.core.step.skip.SkipListenerFailedException;
47import org.springframework.batch.core.step.skip.SkipPolicy;
48import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
49import org.springframework.batch.core.step.tasklet.Tasklet;
50import org.springframework.batch.core.step.tasklet.TaskletStep;
51import org.springframework.batch.item.ItemReader;
52import org.springframework.batch.item.ItemStream;
53import org.springframework.batch.repeat.RepeatOperations;
54import org.springframework.batch.repeat.support.RepeatTemplate;
55import org.springframework.batch.support.ReflectionUtils;
56import org.springframework.classify.BinaryExceptionClassifier;
57import org.springframework.classify.Classifier;
58import org.springframework.classify.SubclassClassifier;
59import org.springframework.retry.ExhaustedRetryException;
60import org.springframework.retry.RetryException;
61import org.springframework.retry.RetryListener;
62import org.springframework.retry.RetryPolicy;
63import org.springframework.retry.backoff.BackOffPolicy;
64import org.springframework.retry.policy.CompositeRetryPolicy;
65import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
66import org.springframework.retry.policy.NeverRetryPolicy;
67import org.springframework.retry.policy.RetryContextCache;
68import org.springframework.retry.policy.SimpleRetryPolicy;
69import org.springframework.transaction.TransactionException;
70import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
71import org.springframework.transaction.interceptor.TransactionAttribute;
72import org.springframework.util.Assert;
73 
74import java.lang.reflect.Method;
75import java.util.ArrayList;
76import java.util.Collection;
77import java.util.HashMap;
78import java.util.HashSet;
79import java.util.LinkedHashSet;
80import java.util.List;
81import java.util.Map;
82import java.util.Set;
83 
84/**
85 * A step builder for fully fault tolerant chunk-oriented item processing steps. Extends {@link SimpleStepBuilder} with
86 * additional properties for retry and skip of failed items.
87 *
88 * @author Dave Syer
89 * @author Michael Minella
90 *
91 * @since 2.2
92 */
93public class FaultTolerantStepBuilder<I, O> extends SimpleStepBuilder<I, O> {
94 
95        private ChunkMonitor chunkMonitor = new ChunkMonitor();
96 
97        private boolean streamIsReader;
98 
99        private int retryLimit = 0;
100 
101        private BackOffPolicy backOffPolicy;
102 
103        private Set<RetryListener> retryListeners = new LinkedHashSet<RetryListener>();
104 
105        private RetryPolicy retryPolicy;
106 
107        private RetryContextCache retryContextCache;
108 
109        private KeyGenerator keyGenerator;
110 
111        private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new LinkedHashSet<Class<? extends Throwable>>();
112 
113        private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
114 
115        private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<Class<? extends Throwable>>();
116 
117        private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
118 
119        private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>();
120 
121        private Set<SkipListener<? super I, ? super O>> skipListeners = new LinkedHashSet<SkipListener<? super I, ? super O>>();
122 
123        private int skipLimit = 0;
124 
125        private SkipPolicy skipPolicy;
126 
127        private boolean processorTransactional = true;
128 
129        /**
130         * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
131         *
132         * @param parent a parent helper containing common step properties
133         */
134        public FaultTolerantStepBuilder(StepBuilderHelper<?> parent) {
135                super(parent);
136        }
137 
138        /**
139         * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
140         *
141         * @param parent a parent helper containing common step properties
142         */
143        protected FaultTolerantStepBuilder(SimpleStepBuilder<I, O> parent) {
144                super(parent);
145        }
146 
147        @Override
148        public TaskletStep build() {
149                registerStepListenerAsSkipListener();
150                return super.build();
151        }
152 
153        @SuppressWarnings("unchecked")
154        private void registerStepListenerAsSkipListener() {
155                for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){
156                        if (stepExecutionListener instanceof SkipListener){
157                                listener((SkipListener<I,O>)stepExecutionListener);
158                        }
159                }
160                for (ChunkListener chunkListener: this.chunkListeners){
161                        if (chunkListener instanceof SkipListener){
162                                listener((SkipListener<I,O>)chunkListener);
163                        }
164                }
165        }
166 
167        /**
168         * Create a new chunk oriented tasklet with reader, writer and processor as provided.
169         *
170         * @see org.springframework.batch.core.step.builder.SimpleStepBuilder#createTasklet()
171         */
172        @Override
173        protected Tasklet createTasklet() {
174                Assert.state(getReader() != null, "ItemReader must be provided");
175                Assert.state(getProcessor() != null || getWriter() != null, "ItemWriter or ItemProcessor must be provided");
176                addSpecialExceptions();
177                registerSkipListeners();
178                FaultTolerantChunkProvider<I> chunkProvider = createChunkProvider();
179                FaultTolerantChunkProcessor<I, O> chunkProcessor = createChunkProcessor();
180                ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor);
181                tasklet.setBuffering(!isReaderTransactionalQueue());
182                return tasklet;
183        }
184 
185        /**
186         * Registers objects using the annotation based listener configuration.
187         *
188         * @param listener the object that has a method configured with listener annotation
189         * @return this for fluent chaining
190         */
191        @Override
192        @SuppressWarnings("unchecked")
193        public SimpleStepBuilder listener(Object listener) {
194                super.listener(listener);
195 
196                Set<Method> skipListenerMethods = new HashSet<Method>();
197                skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInRead.class));
198                skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class));
199                skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class));
200 
201                Set<Method> chunkListenerMethods = new HashSet<Method>();
202                chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class));
203                chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class));
204                chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class));
205 
206                if(skipListenerMethods.size() > 0) {
207                        StepListenerFactoryBean factory = new StepListenerFactoryBean();
208                        factory.setDelegate(listener);
209                        skipListeners.add((SkipListener) factory.getObject());
210                }
211 
212                if(chunkListenerMethods.size() > 0) {
213                        StepListenerFactoryBean factory = new StepListenerFactoryBean();
214                        factory.setDelegate(listener);
215                        super.listener(new TerminateOnExceptionChunkListenerDelegate((ChunkListener) factory.getObject()));
216                }
217 
218                @SuppressWarnings("unchecked")
219                SimpleStepBuilder result = this;
220                return result;
221        }
222 
223 
224        /**
225         * Register a skip listener.
226         *
227         * @param listener the listener to register
228         * @return this for fluent chaining
229         */
230        public FaultTolerantStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) {
231                skipListeners.add(listener);
232                return this;
233        }
234 
235        @Override
236        public FaultTolerantStepBuilder<I, O> listener(ChunkListener listener) {
237                super.listener(new TerminateOnExceptionChunkListenerDelegate(listener));
238                return this;
239        }
240 
241        @Override
242        public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> transactionAttribute(
243                        TransactionAttribute transactionAttribute) {
244                return super.transactionAttribute(getTransactionAttribute(transactionAttribute));
245        }
246 
247        /**
248         * Register a retry listener.
249         *
250         * @param listener the listener to register
251         * @return this for fluent chaining
252         */
253        public FaultTolerantStepBuilder<I, O> listener(RetryListener listener) {
254                retryListeners.add(listener);
255                return this;
256        }
257 
258        /**
259         * Sets the key generator for identifying retried items. Retry across transaction boundaries requires items to be
260         * identified when they are encountered again. The default strategy is to use the items themselves, relying on their
261         * own implementation to ensure that they can be identified. Often a key generator is not necessary as long as the
262         * items have reliable hash code and equals implementations, or the reader is not transactional (the default) and
263         * the item processor either is itself not transactional (not the default) or does not create new items.
264         *
265         * @param keyGenerator a key generator for the stateful retry
266         * @return this for fluent chaining
267         */
268        public FaultTolerantStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) {
269                this.keyGenerator = keyGenerator;
270                return this;
271        }
272 
273        /**
274         * The maximum number of times to try a failed item. Zero and one both translate to try only once and do not retry.
275         * Ignored if an explicit {@link #retryPolicy} is set.
276         *
277         * @param retryLimit the retry limit (default 0)
278         * @return this for fluent chaining
279         */
280        public FaultTolerantStepBuilder<I, O> retryLimit(int retryLimit) {
281                this.retryLimit = retryLimit;
282                return this;
283        }
284 
285        /**
286         * Provide an explicit retry policy instead of using the {@link #retryLimit(int)} and retryable exceptions provided
287         * elsewhere. Can be used to retry different exceptions a different number of times, for instance.
288         *
289         * @param retryPolicy a retry policy
290         * @return this for fluent chaining
291         */
292        public FaultTolerantStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) {
293                this.retryPolicy = retryPolicy;
294                return this;
295        }
296 
297        /**
298         * Provide a backoff policy to prevent items being retried immediately (e.g. in case the failure was caused by a
299         * remote resource failure that might take some time to be resolved). Ignored if an explicit {@link #retryPolicy} is
300         * set.
301         *
302         * @param backOffPolicy the back off policy to use (default no backoff)
303         * @return this for fluent chaining
304         */
305        public FaultTolerantStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) {
306                this.backOffPolicy = backOffPolicy;
307                return this;
308        }
309 
310        /**
311         * Provide an explicit retry context cache. Retry is stateful across transactions in the case of failures in item
312         * processing or writing, so some information about the context for subsequent retries has to be stored.
313         *
314         * @param retryContextCache cache for retry contexts in between transactions (default to standard in-memory
315         * implementation)
316         * @return this for fluent chaining
317         */
318        public FaultTolerantStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) {
319                this.retryContextCache = retryContextCache;
320                return this;
321        }
322 
323        /**
324         * Sets the maximium number of failed items to skip before the step fails. Ignored if an explicit
325         * {@link #skipPolicy(SkipPolicy)} is provided.
326         *
327         * @param skipLimit the skip limit to set
328         * @return this for fluent chaining
329         */
330        public FaultTolerantStepBuilder<I, O> skipLimit(int skipLimit) {
331                this.skipLimit = skipLimit;
332                return this;
333        }
334 
335        /**
336         * Explicitly prevent certain exceptions (and subclasses) from being skipped.
337         *
338         * @param type the non-skippable exception
339         * @return this for fluent chaining
340         */
341        public FaultTolerantStepBuilder<I, O> noSkip(Class<? extends Throwable> type) {
342                skippableExceptionClasses.put(type, false);
343                return this;
344        }
345 
346        /**
347         * Explicitly request certain exceptions (and subclasses) to be skipped.
348         *
349         * @param type
350         * @return this for fluent chaining
351         */
352        public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) {
353                skippableExceptionClasses.put(type, true);
354                return this;
355        }
356 
357        /**
358         * Provide an explicit policy for managing skips. A skip policy determines which exceptions are skippable and how
359         * many times.
360         *
361         * @param skipPolicy the skip policy
362         * @return this for fluent chaining
363         */
364        public FaultTolerantStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
365                this.skipPolicy = skipPolicy;
366                return this;
367        }
368 
369        /**
370         * Mark this exception as ignorable during item read or processing operations. Processing continues with no
371         * additional callbacks (use skips instead if you need to be notified). Ignored during write because there is no
372         * guarantee of skip and retry without rollback.
373         *
374         * @param type the exception to mark as no rollback
375         * @return this for fluent chaining
376         */
377        public FaultTolerantStepBuilder<I, O> noRollback(Class<? extends Throwable> type) {
378                noRollbackExceptionClasses.add(type);
379                return this;
380        }
381 
382        /**
383         * Explicitly ask for an exception (and subclasses) to be excluded from retry.
384         *
385         * @param type the exception to exclude from retry
386         * @return this for fluent chaining
387         */
388        public FaultTolerantStepBuilder<I, O> noRetry(Class<? extends Throwable> type) {
389                retryableExceptionClasses.put(type, false);
390                return this;
391        }
392 
393        /**
394         * Explicitly ask for an exception (and subclasses) to be retried.
395         *
396         * @param type the exception to retry
397         * @return this for fluent chaining
398         */
399        public FaultTolerantStepBuilder<I, O> retry(Class<? extends Throwable> type) {
400                retryableExceptionClasses.put(type, true);
401                return this;
402        }
403 
404        /**
405         * Mark the item processor as non-transactional (default is the opposite). If this flag is set the results of item
406         * processing are cached across transactions in between retries and during skip processing, otherwise the processor
407         * will be called in every transaction.
408         *
409         * @return this for fluent chaining
410         */
411        public FaultTolerantStepBuilder<I, O> processorNonTransactional() {
412                this.processorTransactional = false;
413                return this;
414        }
415 
416        @Override
417        public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> stream(ItemStream stream) {
418                if (stream instanceof ItemReader<?>) {
419                        if (!streamIsReader) {
420                                streamIsReader = true;
421                                super.stream(chunkMonitor);
422                        }
423                        // In cases where multiple nested item readers are registered,
424                        // they all want to get the open() and close() callbacks.
425                        chunkMonitor.registerItemStream(stream);
426                }
427                else {
428                        super.stream(stream);
429                }
430                return this;
431        }
432 
433        private FaultTolerantChunkProvider<I> createChunkProvider() {
434 
435                SkipPolicy readSkipPolicy = createSkipPolicy();
436                readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
437                FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<I>(getReader(),
438                                createChunkOperations());
439                chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
440                chunkProvider.setSkipPolicy(readSkipPolicy);
441                chunkProvider.setRollbackClassifier(getRollbackClassifier());
442                ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners());
443                listeners.addAll(skipListeners);
444                chunkProvider.setListeners(listeners);
445 
446                return chunkProvider;
447 
448        }
449 
450        private FaultTolerantChunkProcessor<I, O> createChunkProcessor() {
451 
452                BatchRetryTemplate batchRetryTemplate = createRetryOperations();
453 
454                FaultTolerantChunkProcessor<I, O> chunkProcessor = new FaultTolerantChunkProcessor<I, O>(getProcessor(),
455                                getWriter(), batchRetryTemplate);
456                chunkProcessor.setBuffering(!isReaderTransactionalQueue());
457                chunkProcessor.setProcessorTransactional(processorTransactional);
458 
459                SkipPolicy writeSkipPolicy = createSkipPolicy();
460                writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy);
461                chunkProcessor.setWriteSkipPolicy(writeSkipPolicy);
462                chunkProcessor.setProcessSkipPolicy(writeSkipPolicy);
463                chunkProcessor.setRollbackClassifier(getRollbackClassifier());
464                chunkProcessor.setKeyGenerator(keyGenerator);
465                detectStreamInReader();
466 
467                ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners());
468                listeners.addAll(skipListeners);
469                chunkProcessor.setListeners(listeners);
470                chunkProcessor.setChunkMonitor(chunkMonitor);
471 
472                return chunkProcessor;
473 
474        }
475 
476        @SuppressWarnings("unchecked")
477        private void addSpecialExceptions() {
478                addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
479                                SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class,
480                                JobInterruptedException.class, Error.class);
481                addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
482                                TransactionException.class, FatalStepExecutionException.class, SkipListenerFailedException.class,
483                                SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class, Error.class);
484        }
485 
486        private void detectStreamInReader() {
487                if (streamIsReader) {
488                        if (!concurrent()) {
489                                chunkMonitor.setItemReader(getReader());
490                        }
491                        else {
492                                logger.warn("Asynchronous TaskExecutor detected with ItemStream reader.  This is probably an error, "
493                                                + "and may lead to incorrect restart data being stored.");
494                        }
495                }
496        }
497 
498        /**
499         * Register explicitly set item listeners and auto-register reader, processor and writer if applicable
500         */
501        private void registerSkipListeners() {
502 
503                // auto-register reader, processor and writer
504                for (Object itemHandler : new Object[] { getReader(), getWriter(), getProcessor() }) {
505 
506                        if (StepListenerFactoryBean.isListener(itemHandler)) {
507                                StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
508                                if (listener instanceof SkipListener<?, ?>) {
509                                        @SuppressWarnings("unchecked")
510                                        SkipListener<? super I, ? super O> skipListener = (SkipListener<? super I, ? super O>) listener;
511                                        skipListeners.add(skipListener);
512                                }
513                        }
514 
515                }
516        }
517 
518        /**
519         * Convenience method to get an exception classifier based on the provided transaction attributes.
520         *
521         * @return an exception classifier: maps to true if an exception should cause rollback
522         */
523        private Classifier<Throwable, Boolean> getRollbackClassifier() {
524 
525                Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false);
526 
527                // Try to avoid pathological cases where we cannot force a rollback
528                // (should be pretty uncommon):
529                if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException()))
530                                || !classifier.classify(new ExhaustedRetryException("test"))) {
531 
532                        final Classifier<Throwable, Boolean> binary = classifier;
533 
534                        Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>();
535                        types.add(ForceRollbackForWriteSkipException.class);
536                        types.add(ExhaustedRetryException.class);
537                        final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true);
538 
539                        classifier = new Classifier<Throwable, Boolean>() {
540                                @Override
541                                public Boolean classify(Throwable classifiable) {
542                                        // Rollback if either the user's list or our own applies
543                                        return panic.classify(classifiable) || binary.classify(classifiable);
544                                }
545                        };
546 
547                }
548 
549                return classifier;
550 
551        }
552 
553        @SuppressWarnings("serial")
554        private TransactionAttribute getTransactionAttribute(TransactionAttribute attribute) {
555 
556                final Classifier<Throwable, Boolean> classifier = getRollbackClassifier();
557                return new DefaultTransactionAttribute(attribute) {
558                        @Override
559                        public boolean rollbackOn(Throwable ex) {
560                                return classifier.classify(ex);
561                        }
562 
563                };
564 
565        }
566 
567        protected SkipPolicy createSkipPolicy() {
568                SkipPolicy skipPolicy = this.skipPolicy;
569                Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(
570                                skippableExceptionClasses);
571                map.put(ForceRollbackForWriteSkipException.class, true);
572                LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map);
573                if (skipPolicy == null) {
574                        Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0),
575                                        "If a skip limit is provided then skippable exceptions must also be specified");
576                        skipPolicy = limitCheckingItemSkipPolicy;
577                }
578                else if (limitCheckingItemSkipPolicy != null) {
579                        skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy });
580                }
581                return skipPolicy;
582        }
583 
584        /**
585         * @return fully configured retry template for item processing phase.
586         */
587        private BatchRetryTemplate createRetryOperations() {
588 
589                RetryPolicy retryPolicy = this.retryPolicy;
590                SimpleRetryPolicy simpleRetryPolicy = null;
591 
592                Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(
593                                retryableExceptionClasses);
594                map.put(ForceRollbackForWriteSkipException.class, true);
595                simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map);
596 
597                if (retryPolicy == null) {
598                        Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0),
599                                        "If a retry limit is provided then retryable exceptions must also be specified");
600                        retryPolicy = simpleRetryPolicy;
601                }
602                else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) {
603                        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
604                        compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy });
605                        retryPolicy = compositeRetryPolicy;
606                }
607 
608                RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy);
609 
610                BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate();
611                if (backOffPolicy != null) {
612                        batchRetryTemplate.setBackOffPolicy(backOffPolicy);
613                }
614                batchRetryTemplate.setRetryPolicy(retryPolicyWrapper);
615 
616                // Co-ordinate the retry policy with the exception handler:
617                RepeatOperations stepOperations = getStepOperations();
618                if (stepOperations instanceof RepeatTemplate) {
619                        SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper,
620                                        getExceptionHandler(), nonRetryableExceptionClasses);
621                        ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
622                }
623 
624                if (retryContextCache != null) {
625                        batchRetryTemplate.setRetryContextCache(retryContextCache);
626                }
627 
628                if (retryListeners != null) {
629                        batchRetryTemplate.setListeners(retryListeners.toArray(new RetryListener[0]));
630                }
631                return batchRetryTemplate;
632 
633        }
634 
635        /**
636         * Wrap the provided {@link org.springframework.retry.RetryPolicy} so that it never retries explicitly non-retryable
637         * exceptions.
638         */
639        private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) {
640 
641                NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
642                Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>();
643                for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) {
644                        map.put(fatal, neverRetryPolicy);
645                }
646 
647                SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>(
648                                retryPolicy);
649                classifier.setTypeMap(map);
650 
651                ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy();
652                retryPolicyWrapper.setExceptionClassifier(classifier);
653                return retryPolicyWrapper;
654 
655        }
656 
657        /**
658         * Wrap a {@link SkipPolicy} and make it consistent with known fatal exceptions.
659         *
660         * @param skipPolicy an existing skip policy
661         * @return a skip policy that will not skip fatal exceptions
662         */
663        private SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) {
664 
665                NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy();
666                Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<Class<? extends Throwable>, SkipPolicy>();
667                for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) {
668                        map.put(fatal, neverSkipPolicy);
669                }
670 
671                SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<Throwable, SkipPolicy>(skipPolicy);
672                classifier.setTypeMap(map);
673 
674                ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy();
675                skipPolicyWrapper.setExceptionClassifier(classifier);
676                return skipPolicyWrapper;
677        }
678 
679        private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) {
680                List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
681                for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) {
682                        exceptions.add(exceptionClass);
683                }
684                for (Class<? extends Throwable> fatal : cls) {
685                        if (!exceptions.contains(fatal)) {
686                                exceptions.add(fatal);
687                        }
688                }
689                nonSkippableExceptionClasses = exceptions;
690        }
691 
692        private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) {
693                List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>();
694                for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) {
695                        exceptions.add(exceptionClass);
696                }
697                for (Class<? extends Throwable> fatal : cls) {
698                        if (!exceptions.contains(fatal)) {
699                                exceptions.add(fatal);
700                        }
701                }
702                nonRetryableExceptionClasses = exceptions;
703        }
704 
705        /**
706         * ChunkListener that wraps exceptions thrown from the ChunkListener in {@link FatalStepExecutionException} to force
707         * termination of StepExecution
708         *
709         * ChunkListeners shoulnd't throw exceptions and expect continued processing, they must be handled in the
710         * implementation or the step will terminate
711         *
712         */
713        private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener {
714 
715                private ChunkListener chunkListener;
716 
717                TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) {
718                        this.chunkListener = chunkListener;
719                }
720 
721                @Override
722                public void beforeChunk(ChunkContext context) {
723                        try {
724                                chunkListener.beforeChunk(context);
725                        }
726                        catch (Throwable t) {
727                                throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
728                        }
729                }
730 
731                @Override
732                public void afterChunk(ChunkContext context) {
733                        try {
734                                chunkListener.afterChunk(context);
735                        }
736                        catch (Throwable t) {
737                                throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
738                        }
739                }
740 
741                @Override
742                public void afterChunkError(ChunkContext context) {
743                        try {
744                                chunkListener.afterChunkError(context);
745                        }
746                        catch (Throwable t) {
747                                throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
748                        }
749                }
750 
751                @Override
752                public int hashCode() {
753                        return chunkListener.hashCode();
754                }
755 
756                @SuppressWarnings("unchecked")
757                @Override
758                public boolean equals(Object obj) {
759                        if (obj instanceof FaultTolerantStepBuilder.TerminateOnExceptionChunkListenerDelegate){
760                                // unwrap the ChunkListener
761                                obj = ((TerminateOnExceptionChunkListenerDelegate)obj).chunkListener;
762                        }
763                        return chunkListener.equals(obj);
764                }
765                
766        }
767}

[all classes][org.springframework.batch.core.step.builder]
EMMA 2.0.5312 (C) Vladimir Roubtsov