1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collection; |
21 | import java.util.HashMap; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | |
26 | import org.springframework.batch.classify.BinaryExceptionClassifier; |
27 | import org.springframework.batch.classify.Classifier; |
28 | import org.springframework.batch.classify.SubclassClassifier; |
29 | import org.springframework.batch.core.JobInterruptedException; |
30 | import org.springframework.batch.core.Step; |
31 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
32 | import org.springframework.batch.core.step.skip.NonSkippableReadException; |
33 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
34 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; |
35 | import org.springframework.batch.core.step.skip.SkipPolicy; |
36 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
37 | import org.springframework.batch.item.ItemReader; |
38 | import org.springframework.batch.item.ItemStream; |
39 | import org.springframework.batch.repeat.RepeatOperations; |
40 | import org.springframework.batch.repeat.support.RepeatTemplate; |
41 | import org.springframework.batch.retry.ExhaustedRetryException; |
42 | import org.springframework.batch.retry.RetryException; |
43 | import org.springframework.batch.retry.RetryListener; |
44 | import org.springframework.batch.retry.RetryPolicy; |
45 | import org.springframework.batch.retry.backoff.BackOffPolicy; |
46 | import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy; |
47 | import org.springframework.batch.retry.policy.MapRetryContextCache; |
48 | import org.springframework.batch.retry.policy.NeverRetryPolicy; |
49 | import org.springframework.batch.retry.policy.RetryContextCache; |
50 | import org.springframework.batch.retry.policy.SimpleRetryPolicy; |
51 | import org.springframework.core.task.SyncTaskExecutor; |
52 | import org.springframework.core.task.TaskExecutor; |
53 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
54 | import org.springframework.transaction.interceptor.TransactionAttribute; |
55 | |
56 | /** |
57 | * Factory bean for step that provides options for configuring skip behaviour. |
58 | * User can set {@link #setSkipLimit(int)} to set how many exceptions of |
59 | * {@link #setSkippableExceptionClasses(Collection)} types are tolerated. |
60 | * {@link #setFatalExceptionClasses(Collection)} will cause immediate |
61 | * termination of job - they are treated as higher priority than |
62 | * {@link #setSkippableExceptionClasses(Collection)}, so the two lists don't |
63 | * need to be exclusive. |
64 | * |
65 | * Skippable exceptions on write will by default cause transaction rollback - to |
66 | * avoid rollback for specific exception class include it in the transaction |
67 | * attribute as "no rollback for". |
68 | * |
69 | * @see SimpleStepFactoryBean |
70 | * |
71 | * @author Dave Syer |
72 | * @author Robert Kasanicky |
73 | * |
74 | */ |
75 | public class FaultTolerantStepFactoryBean<T, S> extends SimpleStepFactoryBean<T, S> { |
76 | |
77 | private Collection<Class<? extends Throwable>> skippableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
78 | |
79 | private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
80 | |
81 | private Collection<Class<? extends Throwable>> fatalExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
82 | |
83 | private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
84 | |
85 | private Collection<Class<? extends Throwable>> retryableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
86 | |
87 | private int cacheCapacity = 0; |
88 | |
89 | private int retryLimit = 0; |
90 | |
91 | private int skipLimit = 0; |
92 | |
93 | private BackOffPolicy backOffPolicy; |
94 | |
95 | private RetryListener[] retryListeners; |
96 | |
97 | private RetryPolicy retryPolicy; |
98 | |
99 | private RetryContextCache retryContextCache; |
100 | |
101 | private KeyGenerator keyGenerator; |
102 | |
103 | private ChunkMonitor chunkMonitor = new ChunkMonitor(); |
104 | |
105 | /** |
106 | * The {@link KeyGenerator} to use to identify failed items across rollback. |
107 | * Not used in the case of the |
108 | * {@link #setIsReaderTransactionalQueue(boolean) transactional queue flag} |
109 | * being false (the default). |
110 | * |
111 | * @param keyGenerator the {@link KeyGenerator} to set |
112 | */ |
113 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
114 | this.keyGenerator = keyGenerator; |
115 | } |
116 | |
117 | /** |
118 | * Setter for the retry policy. If this is specified the other retry |
119 | * properties are ignored (retryLimit, backOffPolicy, |
120 | * retryableExceptionClasses). |
121 | * |
122 | * @param retryPolicy a stateless {@link RetryPolicy} |
123 | */ |
124 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
125 | this.retryPolicy = retryPolicy; |
126 | } |
127 | |
128 | /** |
129 | * Public setter for the retry limit. Each item can be retried up to this |
130 | * limit. Note this limit includes the initial attempt to process the item, |
131 | * therefore <code>retryLimit == 1</code> by default. |
132 | * |
133 | * @param retryLimit the retry limit to set, must be greater or equal to 1. |
134 | */ |
135 | public void setRetryLimit(int retryLimit) { |
136 | this.retryLimit = retryLimit; |
137 | } |
138 | |
139 | /** |
140 | * Public setter for the capacity of the cache in the retry policy. If more |
141 | * items than this fail without being skipped or recovered an exception will |
142 | * be thrown. This is to guard against inadvertent infinite loops generated |
143 | * by item identity problems.<br/> |
144 | * |
145 | * The default value should be high enough and more for most purposes. To |
146 | * breach the limit in a single-threaded step typically you have to have |
147 | * this many failures in a single transaction. Defaults to the value in the |
148 | * {@link MapRetryContextCache}.<br/> |
149 | * |
150 | * This property is ignored if the |
151 | * {@link #setRetryContextCache(RetryContextCache)} is set directly. |
152 | * |
153 | * @param cacheCapacity the cache capacity to set (greater than 0 else |
154 | * ignored) |
155 | */ |
156 | public void setCacheCapacity(int cacheCapacity) { |
157 | this.cacheCapacity = cacheCapacity; |
158 | } |
159 | |
160 | /** |
161 | * Override the default retry context cache for retry of chunk processing. |
162 | * If this property is set then {@link #setCacheCapacity(int)} is ignored. |
163 | * |
164 | * @param retryContextCache the {@link RetryContextCache} to set |
165 | */ |
166 | public void setRetryContextCache(RetryContextCache retryContextCache) { |
167 | this.retryContextCache = retryContextCache; |
168 | } |
169 | |
170 | /** |
171 | * Public setter for the Class[]. |
172 | * @param retryableExceptionClasses the retryableExceptionClasses to set |
173 | */ |
174 | public void setRetryableExceptionClasses(Collection<Class<? extends Throwable>> retryableExceptionClasses) { |
175 | this.retryableExceptionClasses = retryableExceptionClasses; |
176 | } |
177 | |
178 | /** |
179 | * Public setter for the {@link BackOffPolicy}. |
180 | * @param backOffPolicy the {@link BackOffPolicy} to set |
181 | */ |
182 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
183 | this.backOffPolicy = backOffPolicy; |
184 | } |
185 | |
186 | /** |
187 | * Public setter for the {@link RetryListener}s. |
188 | * @param retryListeners the {@link RetryListener}s to set |
189 | */ |
190 | public void setRetryListeners(RetryListener... retryListeners) { |
191 | this.retryListeners = retryListeners; |
192 | } |
193 | |
194 | /** |
195 | * A limit that determines skip policy. If this value is positive then an |
196 | * exception in chunk processing will cause the item to be skipped and no |
197 | * exception propagated until the limit is reached. If it is zero then all |
198 | * exceptions will be propagated from the chunk and cause the step to abort. |
199 | * |
200 | * @param skipLimit the value to set. Default is 0 (never skip). |
201 | */ |
202 | public void setSkipLimit(int skipLimit) { |
203 | this.skipLimit = skipLimit; |
204 | } |
205 | |
206 | /** |
207 | * Exception classes that when raised won't crash the job but will result in |
208 | * the item which handling caused the exception being skipped. Any exception |
209 | * which is marked for "no rollback" is also skippable, but not vice versa. |
210 | * Remember to set the {@link #setSkipLimit(int) skip limit} as well. |
211 | * <p/> |
212 | * Defaults to all exceptions. |
213 | * |
214 | * @param exceptionClasses defaults to <code>Exception</code> |
215 | */ |
216 | public void setSkippableExceptionClasses(Collection<Class<? extends Throwable>> exceptionClasses) { |
217 | this.skippableExceptionClasses = exceptionClasses; |
218 | } |
219 | |
220 | /** |
221 | * Exception classes that are candidates for no rollback. The {@link Step} |
222 | * can not honour the no rollback hint in all circumstances, but any |
223 | * exception on this list is counted as skippable, so even if there has to |
224 | * be a rollback, then the step will not fail as long as the skip limit is |
225 | * not breached. |
226 | * <p/> |
227 | * Defaults is empty. |
228 | * |
229 | * @param noRollbackExceptionClasses the exception classes to set |
230 | */ |
231 | public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) { |
232 | this.noRollbackExceptionClasses = noRollbackExceptionClasses; |
233 | } |
234 | |
235 | /** |
236 | * Exception classes that are not skippable (but may be retryable). |
237 | * |
238 | * @param fatalExceptionClasses {@link Error} by default |
239 | */ |
240 | public void setFatalExceptionClasses(Collection<Class<? extends Throwable>> fatalExceptionClasses) { |
241 | this.fatalExceptionClasses = fatalExceptionClasses; |
242 | } |
243 | |
244 | /** |
245 | * Convenience method for subclasses to get an exception classifier based on |
246 | * the provided transaction attributes. |
247 | * |
248 | * @return an exception classifier: maps to true if an exception should |
249 | * cause rollback |
250 | */ |
251 | protected Classifier<Throwable, Boolean> getRollbackClassifier() { |
252 | |
253 | Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false); |
254 | |
255 | // Try to avoid pathological cases where we cannot force a rollback |
256 | // (should be pretty uncommon): |
257 | if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException())) |
258 | || !classifier.classify(new ExhaustedRetryException("test"))) { |
259 | |
260 | final Classifier<Throwable, Boolean> binary = classifier; |
261 | |
262 | Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>(); |
263 | types.add(ForceRollbackForWriteSkipException.class); |
264 | types.add(ExhaustedRetryException.class); |
265 | final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true); |
266 | |
267 | classifier = new Classifier<Throwable, Boolean>() { |
268 | public Boolean classify(Throwable classifiable) { |
269 | // Rollback if either the user's list or our own applies |
270 | return panic.classify(classifiable) || binary.classify(classifiable); |
271 | } |
272 | }; |
273 | |
274 | } |
275 | |
276 | return classifier; |
277 | |
278 | } |
279 | |
280 | /** |
281 | * Getter for the {@link TransactionAttribute} for subclasses only. |
282 | * @return the transactionAttribute |
283 | */ |
284 | @Override |
285 | protected TransactionAttribute getTransactionAttribute() { |
286 | |
287 | TransactionAttribute attribute = super.getTransactionAttribute(); |
288 | final Classifier<Throwable, Boolean> classifier = getRollbackClassifier(); |
289 | return new DefaultTransactionAttribute(attribute) { |
290 | @Override |
291 | public boolean rollbackOn(Throwable ex) { |
292 | return classifier.classify(ex); |
293 | } |
294 | |
295 | }; |
296 | |
297 | } |
298 | |
299 | @Override |
300 | protected void applyConfiguration(TaskletStep step) { |
301 | |
302 | addFatalExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, |
303 | SkipListenerFailedException.class, RetryException.class, JobInterruptedException.class, Error.class); |
304 | addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, |
305 | SkipListenerFailedException.class, RetryException.class, JobInterruptedException.class, Error.class); |
306 | |
307 | super.applyConfiguration(step); |
308 | |
309 | } |
310 | |
311 | /** |
312 | * {@inheritDoc} |
313 | */ |
314 | @Override |
315 | protected void registerStreams(TaskletStep step, ItemStream[] streams) { |
316 | boolean streamIsReader = false; |
317 | ItemReader<? extends T> itemReader = getItemReader(); |
318 | for (ItemStream stream : streams) { |
319 | if (stream instanceof ItemReader) { |
320 | streamIsReader = true; |
321 | chunkMonitor.registerItemStream(stream); |
322 | } |
323 | else { |
324 | step.registerStream(stream); |
325 | } |
326 | } |
327 | TaskExecutor taskExecutor = getTaskExecutor(); |
328 | // In cases where multiple nested item readers are registered, |
329 | // they all want to get the open() and close() callbacks. |
330 | if (streamIsReader) { |
331 | // double registration is fine |
332 | step.registerStream(chunkMonitor); |
333 | boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor); |
334 | if (!concurrent) { |
335 | chunkMonitor.setItemReader(itemReader); |
336 | } |
337 | else { |
338 | logger.warn("Synchronous TaskExecutor detected (" + taskExecutor.getClass() |
339 | + ") with ItemStream reader. This is probably an error, " |
340 | + "and may lead to incorrect restart data being stored."); |
341 | } |
342 | } |
343 | } |
344 | |
345 | /** |
346 | * @return {@link ChunkProvider} configured for fault-tolerance. |
347 | */ |
348 | @Override |
349 | protected SimpleChunkProvider<T> configureChunkProvider() { |
350 | |
351 | SkipPolicy readSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, getSkippableExceptionClasses(), |
352 | fatalExceptionClasses); |
353 | FaultTolerantChunkProvider<T> chunkProvider = new FaultTolerantChunkProvider<T>(getItemReader(), |
354 | getChunkOperations()); |
355 | chunkProvider.setSkipPolicy(readSkipPolicy); |
356 | chunkProvider.setRollbackClassifier(getRollbackClassifier()); |
357 | |
358 | return chunkProvider; |
359 | |
360 | } |
361 | |
362 | /** |
363 | * @return {@link ChunkProcessor} configured for fault-tolerance. |
364 | */ |
365 | @Override |
366 | protected SimpleChunkProcessor<T, S> configureChunkProcessor() { |
367 | |
368 | BatchRetryTemplate batchRetryTemplate = configureRetry(); |
369 | |
370 | FaultTolerantChunkProcessor<T, S> chunkProcessor = new FaultTolerantChunkProcessor<T, S>(getItemProcessor(), |
371 | getItemWriter(), batchRetryTemplate); |
372 | chunkProcessor.setBuffering(!isReaderTransactionalQueue()); |
373 | |
374 | SkipPolicy writeSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, getSkippableExceptionClasses(), |
375 | fatalExceptionClasses); |
376 | chunkProcessor.setWriteSkipPolicy(writeSkipPolicy); |
377 | chunkProcessor.setProcessSkipPolicy(writeSkipPolicy); |
378 | chunkProcessor.setRollbackClassifier(getRollbackClassifier()); |
379 | chunkProcessor.setKeyGenerator(keyGenerator); |
380 | chunkProcessor.setChunkMonitor(chunkMonitor); |
381 | |
382 | return chunkProcessor; |
383 | |
384 | } |
385 | |
386 | /** |
387 | * @return |
388 | */ |
389 | private Collection<Class<? extends Throwable>> getSkippableExceptionClasses() { |
390 | HashSet<Class<? extends Throwable>> set = new HashSet<Class<? extends Throwable>>(skippableExceptionClasses); |
391 | set.add(ForceRollbackForWriteSkipException.class); |
392 | return set; |
393 | } |
394 | |
395 | /** |
396 | * @return fully configured retry template for item processing phase. |
397 | */ |
398 | private BatchRetryTemplate configureRetry() { |
399 | |
400 | if (retryPolicy == null) { |
401 | SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(retryLimit); |
402 | HashSet<Class<? extends Throwable>> set = new HashSet<Class<? extends Throwable>>(retryableExceptionClasses); |
403 | set.add(ForceRollbackForWriteSkipException.class); |
404 | // set.addAll(noRollbackExceptionClasses); // should only be |
405 | // retryable on write |
406 | simpleRetryPolicy.setRetryableExceptionClasses(set); |
407 | retryPolicy = simpleRetryPolicy; |
408 | } |
409 | |
410 | RetryPolicy retryPolicyWrapper = fatalExceptionAwareProxy(retryPolicy); |
411 | |
412 | BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate(); |
413 | if (backOffPolicy != null) { |
414 | batchRetryTemplate.setBackOffPolicy(backOffPolicy); |
415 | } |
416 | batchRetryTemplate.setRetryPolicy(retryPolicyWrapper); |
417 | |
418 | // Co-ordinate the retry policy with the exception handler: |
419 | RepeatOperations stepOperations = getStepOperations(); |
420 | if (stepOperations instanceof RepeatTemplate) { |
421 | SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper, |
422 | getExceptionHandler(), nonRetryableExceptionClasses); |
423 | ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler); |
424 | } |
425 | |
426 | if (retryContextCache == null) { |
427 | if (cacheCapacity > 0) { |
428 | batchRetryTemplate.setRetryContextCache(new MapRetryContextCache(cacheCapacity)); |
429 | } |
430 | } |
431 | else { |
432 | batchRetryTemplate.setRetryContextCache(retryContextCache); |
433 | } |
434 | |
435 | if (retryListeners != null) { |
436 | batchRetryTemplate.setListeners(retryListeners); |
437 | } |
438 | return batchRetryTemplate; |
439 | } |
440 | |
441 | /** |
442 | * Wrap the provided {@link #setRetryPolicy(RetryPolicy)} so that it never |
443 | * retries explicitly non-retryable exceptions. |
444 | */ |
445 | private RetryPolicy fatalExceptionAwareProxy(final RetryPolicy retryPolicy) { |
446 | |
447 | NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy(); |
448 | Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>(); |
449 | for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) { |
450 | map.put(fatal, neverRetryPolicy); |
451 | } |
452 | |
453 | SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>( |
454 | retryPolicy); |
455 | classifier.setTypeMap(map); |
456 | |
457 | ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy(); |
458 | retryPolicyWrapper.setExceptionClassifier(classifier); |
459 | return retryPolicyWrapper; |
460 | |
461 | } |
462 | |
463 | @SuppressWarnings("unchecked") |
464 | private void addFatalExceptionIfMissing(Class... cls) { |
465 | List exceptions = new ArrayList<Class<? extends Throwable>>(); |
466 | for (Class exceptionClass : fatalExceptionClasses) { |
467 | exceptions.add(exceptionClass); |
468 | } |
469 | for (Class fatal : cls) { |
470 | if (!exceptions.contains(fatal)) { |
471 | exceptions.add(fatal); |
472 | } |
473 | } |
474 | fatalExceptionClasses = exceptions; |
475 | } |
476 | |
477 | @SuppressWarnings("unchecked") |
478 | private void addNonRetryableExceptionIfMissing(Class... cls) { |
479 | List exceptions = new ArrayList<Class<? extends Throwable>>(); |
480 | for (Class exceptionClass : nonRetryableExceptionClasses) { |
481 | exceptions.add(exceptionClass); |
482 | } |
483 | for (Class fatal : cls) { |
484 | if (!exceptions.contains(fatal)) { |
485 | exceptions.add(fatal); |
486 | } |
487 | } |
488 | nonRetryableExceptionClasses = exceptions; |
489 | } |
490 | |
491 | } |