1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collection; |
21 | import java.util.HashMap; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | |
26 | import org.springframework.batch.classify.BinaryExceptionClassifier; |
27 | import org.springframework.batch.classify.Classifier; |
28 | import org.springframework.batch.classify.SubclassClassifier; |
29 | import org.springframework.batch.core.ChunkListener; |
30 | import org.springframework.batch.core.JobInterruptedException; |
31 | import org.springframework.batch.core.Step; |
32 | import org.springframework.batch.core.StepListener; |
33 | import org.springframework.batch.core.step.FatalStepExecutionException; |
34 | import org.springframework.batch.core.step.skip.CompositeSkipPolicy; |
35 | import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy; |
36 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
37 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
38 | import org.springframework.batch.core.step.skip.NonSkippableReadException; |
39 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
40 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; |
41 | import org.springframework.batch.core.step.skip.SkipPolicy; |
42 | import org.springframework.batch.core.step.skip.SkipPolicyFailedException; |
43 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
44 | import org.springframework.batch.item.ItemReader; |
45 | import org.springframework.batch.item.ItemStream; |
46 | import org.springframework.batch.repeat.RepeatOperations; |
47 | import org.springframework.batch.repeat.support.RepeatTemplate; |
48 | import org.springframework.batch.retry.ExhaustedRetryException; |
49 | import org.springframework.batch.retry.RetryException; |
50 | import org.springframework.batch.retry.RetryListener; |
51 | import org.springframework.batch.retry.RetryPolicy; |
52 | import org.springframework.batch.retry.backoff.BackOffPolicy; |
53 | import org.springframework.batch.retry.policy.CompositeRetryPolicy; |
54 | import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy; |
55 | import org.springframework.batch.retry.policy.MapRetryContextCache; |
56 | import org.springframework.batch.retry.policy.NeverRetryPolicy; |
57 | import org.springframework.batch.retry.policy.RetryContextCache; |
58 | import org.springframework.batch.retry.policy.SimpleRetryPolicy; |
59 | import org.springframework.core.task.SyncTaskExecutor; |
60 | import org.springframework.core.task.TaskExecutor; |
61 | import org.springframework.transaction.TransactionException; |
62 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
63 | import org.springframework.transaction.interceptor.TransactionAttribute; |
64 | import org.springframework.util.Assert; |
65 | |
66 | /** |
67 | * Factory bean for step that provides options for configuring skip behaviour. |
68 | * User can set {@link #setSkipLimit(int)} to set how many exceptions of |
69 | * {@link #setSkippableExceptionClasses(Collection)} types are tolerated. |
70 | * {@link #setFatalExceptionClasses(Collection)} will cause immediate |
71 | * termination of job - they are treated as higher priority than |
72 | * {@link #setSkippableExceptionClasses(Collection)}, so the two lists don't |
73 | * need to be exclusive. |
74 | * |
75 | * Skippable exceptions on write will by default cause transaction rollback - to |
76 | * avoid rollback for specific exception class include it in the transaction |
77 | * attribute as "no rollback for". |
78 | * |
79 | * @see SimpleStepFactoryBean |
80 | * |
81 | * @author Dave Syer |
82 | * @author Robert Kasanicky |
83 | * @author Morten Andersen-Gott |
84 | * |
85 | */ |
86 | public class FaultTolerantStepFactoryBean<T, S> extends SimpleStepFactoryBean<T, S> { |
87 | |
88 | private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); |
89 | |
90 | private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
91 | |
92 | private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
93 | |
94 | private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); |
95 | |
96 | private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
97 | |
98 | private int cacheCapacity = 0; |
99 | |
100 | private int retryLimit = 0; |
101 | |
102 | private int skipLimit = 0; |
103 | |
104 | private SkipPolicy skipPolicy; |
105 | |
106 | private BackOffPolicy backOffPolicy; |
107 | |
108 | private RetryListener[] retryListeners; |
109 | |
110 | private RetryPolicy retryPolicy; |
111 | |
112 | private RetryContextCache retryContextCache; |
113 | |
114 | private KeyGenerator keyGenerator; |
115 | |
116 | private ChunkMonitor chunkMonitor = new ChunkMonitor(); |
117 | |
118 | private boolean processorTransactional = true; |
119 | |
120 | /** |
121 | * The {@link KeyGenerator} to use to identify failed items across rollback. |
122 | * Not used in the case of the |
123 | * {@link #setIsReaderTransactionalQueue(boolean) transactional queue flag} |
124 | * being false (the default). |
125 | * |
126 | * @param keyGenerator |
127 | * the {@link KeyGenerator} to set |
128 | */ |
129 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
130 | this.keyGenerator = keyGenerator; |
131 | } |
132 | |
133 | /** |
134 | * Setter for the retry policy. If this is specified the other retry |
135 | * properties are ignored (retryLimit, backOffPolicy, |
136 | * retryableExceptionClasses). |
137 | * |
138 | * @param retryPolicy |
139 | * a stateless {@link RetryPolicy} |
140 | */ |
141 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
142 | this.retryPolicy = retryPolicy; |
143 | } |
144 | |
145 | /** |
146 | * Public setter for the retry limit. Each item can be retried up to this |
147 | * limit. Note this limit includes the initial attempt to process the item, |
148 | * therefore <code>retryLimit == 1</code> by default. |
149 | * |
150 | * @param retryLimit |
151 | * the retry limit to set, must be greater or equal to 1. |
152 | */ |
153 | public void setRetryLimit(int retryLimit) { |
154 | this.retryLimit = retryLimit; |
155 | } |
156 | |
157 | /** |
158 | * Public setter for the capacity of the cache in the retry policy. If more |
159 | * items than this fail without being skipped or recovered an exception will |
160 | * be thrown. This is to guard against inadvertent infinite loops generated |
161 | * by item identity problems.<br/> |
162 | * |
163 | * The default value should be high enough and more for most purposes. To |
164 | * breach the limit in a single-threaded step typically you have to have |
165 | * this many failures in a single transaction. Defaults to the value in the |
166 | * {@link MapRetryContextCache}.<br/> |
167 | * |
168 | * This property is ignored if the |
169 | * {@link #setRetryContextCache(RetryContextCache)} is set directly. |
170 | * |
171 | * @param cacheCapacity |
172 | * the cache capacity to set (greater than 0 else ignored) |
173 | */ |
174 | public void setCacheCapacity(int cacheCapacity) { |
175 | this.cacheCapacity = cacheCapacity; |
176 | } |
177 | |
178 | /** |
179 | * Override the default retry context cache for retry of chunk processing. |
180 | * If this property is set then {@link #setCacheCapacity(int)} is ignored. |
181 | * |
182 | * @param retryContextCache |
183 | * the {@link RetryContextCache} to set |
184 | */ |
185 | public void setRetryContextCache(RetryContextCache retryContextCache) { |
186 | this.retryContextCache = retryContextCache; |
187 | } |
188 | |
189 | /** |
190 | * Public setter for the retryable exceptions classifier map (from throwable |
191 | * class to boolean, true is retryable). |
192 | * |
193 | * @param retryableExceptionClasses |
194 | * the retryableExceptionClasses to set |
195 | */ |
196 | public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) { |
197 | this.retryableExceptionClasses = retryableExceptionClasses; |
198 | } |
199 | |
200 | /** |
201 | * Public setter for the {@link BackOffPolicy}. |
202 | * |
203 | * @param backOffPolicy |
204 | * the {@link BackOffPolicy} to set |
205 | */ |
206 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
207 | this.backOffPolicy = backOffPolicy; |
208 | } |
209 | |
210 | /** |
211 | * Public setter for the {@link RetryListener}s. |
212 | * |
213 | * @param retryListeners |
214 | * the {@link RetryListener}s to set |
215 | */ |
216 | public void setRetryListeners(RetryListener... retryListeners) { |
217 | this.retryListeners = retryListeners; |
218 | } |
219 | |
220 | /** |
221 | * A limit that determines skip policy. If this value is positive then an |
222 | * exception in chunk processing will cause the item to be skipped and no |
223 | * exception propagated until the limit is reached. If it is zero then all |
224 | * exceptions will be propagated from the chunk and cause the step to abort. |
225 | * |
226 | * @param skipLimit |
227 | * the value to set. Default is 0 (never skip). |
228 | */ |
229 | public void setSkipLimit(int skipLimit) { |
230 | this.skipLimit = skipLimit; |
231 | } |
232 | |
233 | /** |
234 | * A {@link SkipPolicy} that determines the outcome of an exception when |
235 | * processing an item. Overrides the {@link #setSkipLimit(int) skipLimit}. |
236 | * The {@link #setSkippableExceptionClasses(Map) skippableExceptionClasses} |
237 | * are also ignored if this is set. |
238 | * |
239 | * @param skipPolicy |
240 | * the {@link SkipPolicy} to set |
241 | */ |
242 | public void setSkipPolicy(SkipPolicy skipPolicy) { |
243 | this.skipPolicy = skipPolicy; |
244 | } |
245 | |
246 | /** |
247 | * Exception classes that when raised won't crash the job but will result in |
248 | * the item which handling caused the exception being skipped. Any exception |
249 | * which is marked for "no rollback" is also skippable, but not vice versa. |
250 | * Remember to set the {@link #setSkipLimit(int) skip limit} as well. |
251 | * <p/> |
252 | * Defaults to all no exception. |
253 | * |
254 | * @param exceptionClasses |
255 | * defaults to <code>Exception</code> |
256 | */ |
257 | public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) { |
258 | this.skippableExceptionClasses = exceptionClasses; |
259 | } |
260 | |
261 | /** |
262 | * Exception classes that are candidates for no rollback. The {@link Step} |
263 | * can not honour the no rollback hint in all circumstances, but any |
264 | * exception on this list is counted as skippable, so even if there has to |
265 | * be a rollback, then the step will not fail as long as the skip limit is |
266 | * not breached. |
267 | * <p/> |
268 | * Defaults is empty. |
269 | * |
270 | * @param noRollbackExceptionClasses |
271 | * the exception classes to set |
272 | */ |
273 | public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) { |
274 | this.noRollbackExceptionClasses = noRollbackExceptionClasses; |
275 | } |
276 | |
277 | /** |
278 | * @param processorTransactional |
279 | */ |
280 | public void setProcessorTransactional(boolean processorTransactional) { |
281 | this.processorTransactional = processorTransactional; |
282 | } |
283 | |
284 | /** |
285 | * Convenience method for subclasses to get an exception classifier based on |
286 | * the provided transaction attributes. |
287 | * |
288 | * @return an exception classifier: maps to true if an exception should |
289 | * cause rollback |
290 | */ |
291 | protected Classifier<Throwable, Boolean> getRollbackClassifier() { |
292 | |
293 | Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false); |
294 | |
295 | // Try to avoid pathological cases where we cannot force a rollback |
296 | // (should be pretty uncommon): |
297 | if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException())) |
298 | || !classifier.classify(new ExhaustedRetryException("test"))) { |
299 | |
300 | final Classifier<Throwable, Boolean> binary = classifier; |
301 | |
302 | Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>(); |
303 | types.add(ForceRollbackForWriteSkipException.class); |
304 | types.add(ExhaustedRetryException.class); |
305 | final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true); |
306 | |
307 | classifier = new Classifier<Throwable, Boolean>() { |
308 | public Boolean classify(Throwable classifiable) { |
309 | // Rollback if either the user's list or our own applies |
310 | return panic.classify(classifiable) || binary.classify(classifiable); |
311 | } |
312 | }; |
313 | |
314 | } |
315 | |
316 | return classifier; |
317 | |
318 | } |
319 | |
320 | /** |
321 | * Getter for the {@link TransactionAttribute} for subclasses only. |
322 | * |
323 | * @return the transactionAttribute |
324 | */ |
325 | @SuppressWarnings("serial") |
326 | @Override |
327 | protected TransactionAttribute getTransactionAttribute() { |
328 | |
329 | TransactionAttribute attribute = super.getTransactionAttribute(); |
330 | final Classifier<Throwable, Boolean> classifier = getRollbackClassifier(); |
331 | return new DefaultTransactionAttribute(attribute) { |
332 | @Override |
333 | public boolean rollbackOn(Throwable ex) { |
334 | return classifier.classify(ex); |
335 | } |
336 | |
337 | }; |
338 | |
339 | } |
340 | |
341 | @Override |
342 | @SuppressWarnings("unchecked") |
343 | protected void applyConfiguration(TaskletStep step) { |
344 | addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, |
345 | SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class, |
346 | Error.class); |
347 | addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, TransactionException.class, |
348 | FatalStepExecutionException.class, SkipListenerFailedException.class, SkipPolicyFailedException.class, |
349 | RetryException.class, JobInterruptedException.class, Error.class); |
350 | super.applyConfiguration(step); |
351 | } |
352 | |
353 | /** |
354 | * {@inheritDoc} |
355 | */ |
356 | @Override |
357 | protected void registerStreams(TaskletStep step, ItemStream[] streams) { |
358 | boolean streamIsReader = false; |
359 | ItemReader<? extends T> itemReader = getItemReader(); |
360 | for (ItemStream stream : streams) { |
361 | if (stream instanceof ItemReader<?>) { |
362 | streamIsReader = true; |
363 | chunkMonitor.registerItemStream(stream); |
364 | } else { |
365 | step.registerStream(stream); |
366 | } |
367 | } |
368 | TaskExecutor taskExecutor = getTaskExecutor(); |
369 | // In cases where multiple nested item readers are registered, |
370 | // they all want to get the open() and close() callbacks. |
371 | if (streamIsReader) { |
372 | // double registration is fine |
373 | step.registerStream(chunkMonitor); |
374 | boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor); |
375 | if (!concurrent) { |
376 | chunkMonitor.setItemReader(itemReader); |
377 | } else { |
378 | logger.warn("Asynchronous TaskExecutor detected (" + taskExecutor.getClass() |
379 | + ") with ItemStream reader. This is probably an error, " + "and may lead to incorrect restart data being stored."); |
380 | } |
381 | } |
382 | } |
383 | |
384 | /** |
385 | * @return {@link ChunkProvider} configured for fault-tolerance. |
386 | */ |
387 | @Override |
388 | protected SimpleChunkProvider<T> configureChunkProvider() { |
389 | |
390 | SkipPolicy readSkipPolicy = createSkipPolicy(); |
391 | readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy); |
392 | FaultTolerantChunkProvider<T> chunkProvider = new FaultTolerantChunkProvider<T>(getItemReader(), getChunkOperations()); |
393 | chunkProvider.setMaxSkipsOnRead(Math.max(getCommitInterval(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ)); |
394 | chunkProvider.setSkipPolicy(readSkipPolicy); |
395 | chunkProvider.setRollbackClassifier(getRollbackClassifier()); |
396 | |
397 | return chunkProvider; |
398 | |
399 | } |
400 | |
401 | /** |
402 | * @return |
403 | */ |
404 | protected SkipPolicy createSkipPolicy() { |
405 | SkipPolicy skipPolicy = this.skipPolicy; |
406 | Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(skippableExceptionClasses); |
407 | map.put(ForceRollbackForWriteSkipException.class, true); |
408 | LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map); |
409 | if (skipPolicy == null) { |
410 | Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0), |
411 | "If a skip limit is provided then skippable exceptions must also be specified"); |
412 | skipPolicy = limitCheckingItemSkipPolicy; |
413 | } else if (limitCheckingItemSkipPolicy != null) { |
414 | skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy }); |
415 | } |
416 | return skipPolicy; |
417 | } |
418 | |
419 | /** |
420 | * @return {@link ChunkProcessor} configured for fault-tolerance. |
421 | */ |
422 | @Override |
423 | protected SimpleChunkProcessor<T, S> configureChunkProcessor() { |
424 | |
425 | BatchRetryTemplate batchRetryTemplate = configureRetry(); |
426 | |
427 | FaultTolerantChunkProcessor<T, S> chunkProcessor = new FaultTolerantChunkProcessor<T, S>(getItemProcessor(), getItemWriter(), |
428 | batchRetryTemplate); |
429 | chunkProcessor.setBuffering(!isReaderTransactionalQueue()); |
430 | chunkProcessor.setProcessorTransactional(processorTransactional); |
431 | |
432 | SkipPolicy writeSkipPolicy = createSkipPolicy(); |
433 | writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy); |
434 | chunkProcessor.setWriteSkipPolicy(writeSkipPolicy); |
435 | chunkProcessor.setProcessSkipPolicy(writeSkipPolicy); |
436 | chunkProcessor.setRollbackClassifier(getRollbackClassifier()); |
437 | chunkProcessor.setKeyGenerator(keyGenerator); |
438 | chunkProcessor.setChunkMonitor(chunkMonitor); |
439 | |
440 | return chunkProcessor; |
441 | |
442 | } |
443 | |
444 | /** |
445 | * @return fully configured retry template for item processing phase. |
446 | */ |
447 | private BatchRetryTemplate configureRetry() { |
448 | |
449 | RetryPolicy retryPolicy = this.retryPolicy; |
450 | SimpleRetryPolicy simpleRetryPolicy = null; |
451 | |
452 | Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>(retryableExceptionClasses); |
453 | map.put(ForceRollbackForWriteSkipException.class, true); |
454 | simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map); |
455 | |
456 | if (retryPolicy == null) { |
457 | Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0), |
458 | "If a retry limit is provided then retryable exceptions must also be specified"); |
459 | retryPolicy = simpleRetryPolicy; |
460 | } else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) { |
461 | CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); |
462 | compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy }); |
463 | retryPolicy = compositeRetryPolicy; |
464 | } |
465 | |
466 | RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy); |
467 | |
468 | BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate(); |
469 | if (backOffPolicy != null) { |
470 | batchRetryTemplate.setBackOffPolicy(backOffPolicy); |
471 | } |
472 | batchRetryTemplate.setRetryPolicy(retryPolicyWrapper); |
473 | |
474 | // Co-ordinate the retry policy with the exception handler: |
475 | RepeatOperations stepOperations = getStepOperations(); |
476 | if (stepOperations instanceof RepeatTemplate) { |
477 | SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper, getExceptionHandler(), |
478 | nonRetryableExceptionClasses); |
479 | ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler); |
480 | } |
481 | |
482 | if (retryContextCache == null) { |
483 | if (cacheCapacity > 0) { |
484 | batchRetryTemplate.setRetryContextCache(new MapRetryContextCache(cacheCapacity)); |
485 | } |
486 | } else { |
487 | batchRetryTemplate.setRetryContextCache(retryContextCache); |
488 | } |
489 | |
490 | if (retryListeners != null) { |
491 | batchRetryTemplate.setListeners(retryListeners); |
492 | } |
493 | return batchRetryTemplate; |
494 | } |
495 | |
496 | /** |
497 | * Wrap the provided {@link #setRetryPolicy(RetryPolicy)} so that it never |
498 | * retries explicitly non-retryable exceptions. |
499 | */ |
500 | private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) { |
501 | |
502 | NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy(); |
503 | Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>(); |
504 | for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) { |
505 | map.put(fatal, neverRetryPolicy); |
506 | } |
507 | |
508 | SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>(retryPolicy); |
509 | classifier.setTypeMap(map); |
510 | |
511 | ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy(); |
512 | retryPolicyWrapper.setExceptionClassifier(classifier); |
513 | return retryPolicyWrapper; |
514 | |
515 | } |
516 | |
517 | /** |
518 | * Wrap a {@link SkipPolicy} and make it consistent with known fatal |
519 | * exceptions. |
520 | * |
521 | * @param skipPolicy |
522 | * an existing skip policy |
523 | * @return a skip policy that will not skip fatal exceptions |
524 | */ |
525 | private SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) { |
526 | |
527 | NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy(); |
528 | Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<Class<? extends Throwable>, SkipPolicy>(); |
529 | for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) { |
530 | map.put(fatal, neverSkipPolicy); |
531 | } |
532 | |
533 | SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<Throwable, SkipPolicy>(skipPolicy); |
534 | classifier.setTypeMap(map); |
535 | |
536 | ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy(); |
537 | skipPolicyWrapper.setExceptionClassifier(classifier); |
538 | return skipPolicyWrapper; |
539 | } |
540 | |
541 | private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) { |
542 | List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>(); |
543 | for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) { |
544 | exceptions.add(exceptionClass); |
545 | } |
546 | for (Class<? extends Throwable> fatal : cls) { |
547 | if (!exceptions.contains(fatal)) { |
548 | exceptions.add(fatal); |
549 | } |
550 | } |
551 | nonSkippableExceptionClasses = exceptions; |
552 | } |
553 | |
554 | private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) { |
555 | List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>(); |
556 | for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) { |
557 | exceptions.add(exceptionClass); |
558 | } |
559 | for (Class<? extends Throwable> fatal : cls) { |
560 | if (!exceptions.contains(fatal)) { |
561 | exceptions.add(fatal); |
562 | } |
563 | } |
564 | nonRetryableExceptionClasses = (List<Class<? extends Throwable>>) exceptions; |
565 | } |
566 | |
567 | @Override |
568 | protected void registerChunkListeners(TaskletStep step, StepListener listener) { |
569 | super.registerChunkListeners(step, new TerminateOnExceptionChunkListenerDelegate((ChunkListener) listener)); |
570 | } |
571 | |
572 | /** |
573 | * ChunkListener that wraps exceptions thrown from the ChunkListener in |
574 | * {@link FatalStepExecutionException} to force termination of StepExecution |
575 | * |
576 | * ChunkListeners shoulnd't throw exceptions and expect continued |
577 | * processing, they must be handled in the implementation or the step will |
578 | * terminate |
579 | * |
580 | */ |
581 | private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener { |
582 | |
583 | private ChunkListener chunkListener; |
584 | |
585 | TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) { |
586 | this.chunkListener = chunkListener; |
587 | } |
588 | |
589 | public void beforeChunk() { |
590 | try { |
591 | chunkListener.beforeChunk(); |
592 | } catch (Throwable t) { |
593 | throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t); |
594 | } |
595 | } |
596 | |
597 | public void afterChunk() { |
598 | try { |
599 | chunkListener.afterChunk(); |
600 | } catch (Throwable t) { |
601 | throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t); |
602 | } |
603 | } |
604 | |
605 | } |
606 | |
607 | } |