View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.factory;
18  
19  import java.util.Collection;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.Map;
23  
24  import org.springframework.batch.core.SkipListener;
25  import org.springframework.batch.core.Step;
26  import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
27  import org.springframework.batch.core.step.builder.SimpleStepBuilder;
28  import org.springframework.batch.core.step.builder.StepBuilder;
29  import org.springframework.batch.core.step.item.KeyGenerator;
30  import org.springframework.batch.core.step.skip.SkipPolicy;
31  import org.springframework.retry.RetryListener;
32  import org.springframework.retry.RetryPolicy;
33  import org.springframework.retry.backoff.BackOffPolicy;
34  import org.springframework.retry.policy.MapRetryContextCache;
35  import org.springframework.retry.policy.RetryContextCache;
36  
37  /**
38   * Factory bean for step that provides options for configuring skip behavior. User can set {@link #setSkipLimit(int)}
39   * to set how many exceptions of {@link #setSkippableExceptionClasses(Map)} types are tolerated.
40   *
41   * Skippable exceptions on write will by default cause transaction rollback - to avoid rollback for specific exception
42   * class include it in the transaction attribute as "no rollback for".
43   *
44   * @see SimpleStepFactoryBean
45   *
46   * @author Dave Syer
47   * @author Robert Kasanicky
48   * @author Morten Andersen-Gott
49   *
50   */
51  public class FaultTolerantStepFactoryBean<T, S> extends SimpleStepFactoryBean<T, S> {
52  
53  	private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
54  
55  	private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new HashSet<Class<? extends Throwable>>();
56  
57  	private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
58  
59  	private int cacheCapacity = 0;
60  
61  	private int retryLimit = 0;
62  
63  	private int skipLimit = 0;
64  
65  	private SkipPolicy skipPolicy;
66  
67  	private BackOffPolicy backOffPolicy;
68  
69  	private RetryListener[] retryListeners;
70  
71  	private RetryPolicy retryPolicy;
72  
73  	private RetryContextCache retryContextCache;
74  
75  	private KeyGenerator keyGenerator;
76  
77  	private boolean processorTransactional = true;
78  
79  	/**
80  	 * The {@link KeyGenerator} to use to identify failed items across rollback. Not used in the case of the
81  	 * {@link #setIsReaderTransactionalQueue(boolean) transactional queue flag} being false (the default).
82  	 *
83  	 * @param keyGenerator the {@link KeyGenerator} to set
84  	 */
85  	public void setKeyGenerator(KeyGenerator keyGenerator) {
86  		this.keyGenerator = keyGenerator;
87  	}
88  
89  	/**
90  	 * Setter for the retry policy. If this is specified the other retry properties are ignored (retryLimit,
91  	 * backOffPolicy, retryableExceptionClasses).
92  	 *
93  	 * @param retryPolicy a stateless {@link RetryPolicy}
94  	 */
95  	public void setRetryPolicy(RetryPolicy retryPolicy) {
96  		this.retryPolicy = retryPolicy;
97  	}
98  
99  	/**
100 	 * Public setter for the retry limit. Each item can be retried up to this limit. Note this limit includes the
101 	 * initial attempt to process the item, therefore <code>retryLimit == 1</code> by default.
102 	 *
103 	 * @param retryLimit the retry limit to set, must be greater or equal to 1.
104 	 */
105 	public void setRetryLimit(int retryLimit) {
106 		this.retryLimit = retryLimit;
107 	}
108 
109 	/**
110 	 * Public setter for the capacity of the cache in the retry policy. If more items than this fail without being
111 	 * skipped or recovered an exception will be thrown. This is to guard against inadvertent infinite loops generated
112 	 * by item identity problems.<br/>
113 	 *
114 	 * The default value should be high enough and more for most purposes. To breach the limit in a single-threaded step
115 	 * typically you have to have this many failures in a single transaction. Defaults to the value in the
116 	 * {@link MapRetryContextCache}.<br/>
117 	 *
118 	 * This property is ignored if the {@link #setRetryContextCache(RetryContextCache)} is set directly.
119 	 *
120 	 * @param cacheCapacity the cache capacity to set (greater than 0 else ignored)
121 	 */
122 	public void setCacheCapacity(int cacheCapacity) {
123 		this.cacheCapacity = cacheCapacity;
124 	}
125 
126 	/**
127 	 * Override the default retry context cache for retry of chunk processing. If this property is set then
128 	 * {@link #setCacheCapacity(int)} is ignored.
129 	 *
130 	 * @param retryContextCache the {@link RetryContextCache} to set
131 	 */
132 	public void setRetryContextCache(RetryContextCache retryContextCache) {
133 		this.retryContextCache = retryContextCache;
134 	}
135 
136 	/**
137 	 * Public setter for the retryable exceptions classifier map (from throwable class to boolean, true is retryable).
138 	 *
139 	 * @param retryableExceptionClasses the retryableExceptionClasses to set
140 	 */
141 	public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) {
142 		this.retryableExceptionClasses = retryableExceptionClasses;
143 	}
144 
145 	/**
146 	 * Public setter for the {@link BackOffPolicy}.
147 	 *
148 	 * @param backOffPolicy the {@link BackOffPolicy} to set
149 	 */
150 	public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
151 		this.backOffPolicy = backOffPolicy;
152 	}
153 
154 	/**
155 	 * Public setter for the {@link RetryListener}s.
156 	 *
157 	 * @param retryListeners the {@link RetryListener}s to set
158 	 */
159 	public void setRetryListeners(RetryListener... retryListeners) {
160 		this.retryListeners = retryListeners;
161 	}
162 
163 	/**
164 	 * A limit that determines skip policy. If this value is positive then an exception in chunk processing will cause
165 	 * the item to be skipped and no exception propagated until the limit is reached. If it is zero then all exceptions
166 	 * will be propagated from the chunk and cause the step to abort.
167 	 *
168 	 * @param skipLimit the value to set. Default is 0 (never skip).
169 	 */
170 	public void setSkipLimit(int skipLimit) {
171 		this.skipLimit = skipLimit;
172 	}
173 
174 	/**
175 	 * A {@link SkipPolicy} that determines the outcome of an exception when processing an item. Overrides the
176 	 * {@link #setSkipLimit(int) skipLimit}. The {@link #setSkippableExceptionClasses(Map) skippableExceptionClasses}
177 	 * are also ignored if this is set.
178 	 *
179 	 * @param skipPolicy the {@link SkipPolicy} to set
180 	 */
181 	public void setSkipPolicy(SkipPolicy skipPolicy) {
182 		this.skipPolicy = skipPolicy;
183 	}
184 
185 	/**
186 	 * Exception classes that when raised won't crash the job but will result in the item which handling caused the
187 	 * exception being skipped. Any exception which is marked for "no rollback" is also skippable, but not vice versa.
188 	 * Remember to set the {@link #setSkipLimit(int) skip limit} as well.
189 	 * <p/>
190 	 * Defaults to all no exception.
191 	 *
192 	 * @param exceptionClasses defaults to <code>Exception</code>
193 	 */
194 	public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) {
195 		this.skippableExceptionClasses = exceptionClasses;
196 	}
197 
198 	/**
199 	 * Exception classes that are candidates for no rollback. The {@link Step} can not honour the no rollback hint in
200 	 * all circumstances, but any exception on this list is counted as skippable, so even if there has to be a rollback,
201 	 * then the step will not fail as long as the skip limit is not breached.
202 	 * <p/>
203 	 * Defaults is empty.
204 	 *
205 	 * @param noRollbackExceptionClasses the exception classes to set
206 	 */
207 	public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) {
208 		this.noRollbackExceptionClasses = noRollbackExceptionClasses;
209 	}
210 
211 	/**
212 	 * @param processorTransactional
213 	 */
214 	public void setProcessorTransactional(boolean processorTransactional) {
215 		this.processorTransactional = processorTransactional;
216 	}
217 
218 	@Override
219 	protected SimpleStepBuilder<T, S> createBuilder(String name) {
220 		return new FaultTolerantStepBuilder<T, S>(new StepBuilder(name));
221 	}
222 
223 	@Override
224 	protected void applyConfiguration(SimpleStepBuilder<T, S> builder) {
225 
226 		FaultTolerantStepBuilder<T, S> faultTolerantBuilder = (FaultTolerantStepBuilder<T, S>) builder;
227 
228 		if (retryContextCache == null && cacheCapacity > 0) {
229 			retryContextCache = new MapRetryContextCache(cacheCapacity);
230 		}
231 		faultTolerantBuilder.retryContextCache(retryContextCache);
232 		for (SkipListener<T, S> listener : BatchListenerFactoryHelper.<SkipListener<T, S>> getListeners(getListeners(),
233 				SkipListener.class)) {
234 			faultTolerantBuilder.listener(listener);
235 		}
236 
237 		if (retryListeners != null) {
238 			for (RetryListener listener : retryListeners) {
239 				faultTolerantBuilder.listener(listener);
240 			}
241 		}
242 
243 		faultTolerantBuilder.skipPolicy(skipPolicy);
244 		faultTolerantBuilder.skipLimit(skipLimit);
245 		for (Class<? extends Throwable> type : skippableExceptionClasses.keySet()) {
246 			if (skippableExceptionClasses.get(type)) {
247 				faultTolerantBuilder.skip(type);
248 			}
249 			else {
250 				faultTolerantBuilder.noSkip(type);
251 			}
252 		}
253 
254 		if (!processorTransactional) {
255 			faultTolerantBuilder.processorNonTransactional();
256 		}
257 
258 		faultTolerantBuilder.retryContextCache(retryContextCache);
259 		faultTolerantBuilder.keyGenerator(keyGenerator);
260 		faultTolerantBuilder.retryPolicy(retryPolicy);
261 		faultTolerantBuilder.retryLimit(retryLimit);
262 		faultTolerantBuilder.backOffPolicy(backOffPolicy);
263 		for (Class<? extends Throwable> type : retryableExceptionClasses.keySet()) {
264 			if (retryableExceptionClasses.get(type)) {
265 				faultTolerantBuilder.retry(type);
266 			}
267 			else {
268 				faultTolerantBuilder.noRetry(type);
269 			}
270 		}
271 
272 		for (Class<? extends Throwable> type : noRollbackExceptionClasses) {
273 			faultTolerantBuilder.noRollback(type);
274 		}
275 		super.applyConfiguration(builder);
276 
277 	}
278 
279 }