View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.item;
18  
19  import org.springframework.classify.BinaryExceptionClassifier;
20  import org.springframework.classify.Classifier;
21  import org.springframework.batch.core.StepContribution;
22  import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
23  import org.springframework.batch.core.step.skip.NonSkippableReadException;
24  import org.springframework.batch.core.step.skip.SkipException;
25  import org.springframework.batch.core.step.skip.SkipListenerFailedException;
26  import org.springframework.batch.core.step.skip.SkipPolicy;
27  import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
28  import org.springframework.batch.item.ItemReader;
29  import org.springframework.batch.repeat.RepeatOperations;
30  
31  /**
32   * FaultTolerant implementation of the {@link ChunkProcessor} interface, that
33   * allows for skipping or retry of items that cause exceptions during reading or
34   * processing.
35   * 
36   */
37  public class FaultTolerantChunkProvider<I> extends SimpleChunkProvider<I> {
38  
39  	/**
40  	 * Hard limit for number of read skips in the same chunk. Should be
41  	 * sufficiently high that it is only encountered in a runaway step where all
42  	 * items are skipped before the chunk can complete (leading to a potential
43  	 * heap memory problem).
44  	 */
45  	public static final int DEFAULT_MAX_SKIPS_ON_READ = 100;
46  
47  	private SkipPolicy skipPolicy = new LimitCheckingItemSkipPolicy();
48  
49  	private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
50  
51  	private int maxSkipsOnRead = DEFAULT_MAX_SKIPS_ON_READ;
52  
53  	public FaultTolerantChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
54  		super(itemReader, repeatOperations);
55  	}
56  	
57  	/**
58  	 * @param maxSkipsOnRead the maximum number of skips on read
59  	 */
60  	public void setMaxSkipsOnRead(int maxSkipsOnRead) {
61  		this.maxSkipsOnRead = maxSkipsOnRead;
62  	}
63  
64  	/**
65  	 * The policy that determines whether exceptions can be skipped on read.
66  	 * @param SkipPolicy
67  	 */
68  	public void setSkipPolicy(SkipPolicy SkipPolicy) {
69  		this.skipPolicy = SkipPolicy;
70  	}
71  
72  	/**
73  	 * Classifier to determine whether exceptions have been marked as
74  	 * no-rollback (as opposed to skippable). If ecnounterd they are simply
75  	 * ignored, unless also skippable.
76  	 * 
77  	 * @param rollbackClassifier the rollback classifier to set
78  	 */
79  	public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
80  		this.rollbackClassifier = rollbackClassifier;
81  	}
82  
83  	@Override
84  	protected I read(StepContribution contribution, Chunk<I> chunk) throws Exception {
85  		while (true) {
86  			try {
87  				return doRead();
88  			}
89  			catch (Exception e) {
90  
91  				if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
92  					// increment skip count and try again
93  					contribution.incrementReadSkipCount();
94  					chunk.skip(e);
95  
96  					if (chunk.getErrors().size() >= maxSkipsOnRead) {
97  						throw new SkipOverflowException("Too many skips on read");
98  					}
99  
100 					logger.debug("Skipping failed input", e);
101 				}
102 				else {
103 					if (rollbackClassifier.classify(e)) {
104 						throw new NonSkippableReadException("Non-skippable exception during read", e);
105 					}
106 					logger.debug("No-rollback for non-skippable exception (ignored)", e);
107 				}
108 
109 			}
110 		}
111 	}
112 
113 	@Override
114 	public void postProcess(StepContribution contribution, Chunk<I> chunk) {
115 		for (Exception e : chunk.getErrors()) {
116 			try {
117 				getListener().onSkipInRead(e);
118 			}
119 			catch (RuntimeException ex) {
120 				throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
121 			}
122 		}
123 	}
124 
125 	/**
126 	 * Convenience method for calling process skip policy.
127 	 * 
128 	 * @param policy the skip policy
129 	 * @param e the cause of the skip
130 	 * @param skipCount the current skip count
131 	 */
132 	private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
133 		try {
134 			return policy.shouldSkip(e, skipCount);
135 		}
136 		catch (SkipException ex) {
137 			throw ex;
138 		}
139 		catch (RuntimeException ex) {
140 			throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", ex, e);
141 		}
142 	}
143 
144 }