1 | /* |
2 | * Copyright 2006-2014 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.builder; |
17 | |
18 | import org.springframework.batch.core.ChunkListener; |
19 | import org.springframework.batch.core.JobInterruptedException; |
20 | import org.springframework.batch.core.SkipListener; |
21 | import org.springframework.batch.core.StepExecutionListener; |
22 | import org.springframework.batch.core.StepListener; |
23 | import org.springframework.batch.core.annotation.AfterChunk; |
24 | import org.springframework.batch.core.annotation.AfterChunkError; |
25 | import org.springframework.batch.core.annotation.BeforeChunk; |
26 | import org.springframework.batch.core.annotation.OnSkipInProcess; |
27 | import org.springframework.batch.core.annotation.OnSkipInRead; |
28 | import org.springframework.batch.core.annotation.OnSkipInWrite; |
29 | import org.springframework.batch.core.listener.StepListenerFactoryBean; |
30 | import org.springframework.batch.core.scope.context.ChunkContext; |
31 | import org.springframework.batch.core.step.FatalStepExecutionException; |
32 | import org.springframework.batch.core.step.item.BatchRetryTemplate; |
33 | import org.springframework.batch.core.step.item.ChunkMonitor; |
34 | import org.springframework.batch.core.step.item.ChunkOrientedTasklet; |
35 | import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor; |
36 | import org.springframework.batch.core.step.item.FaultTolerantChunkProvider; |
37 | import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException; |
38 | import org.springframework.batch.core.step.item.KeyGenerator; |
39 | import org.springframework.batch.core.step.item.SimpleRetryExceptionHandler; |
40 | import org.springframework.batch.core.step.skip.CompositeSkipPolicy; |
41 | import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy; |
42 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
43 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
44 | import org.springframework.batch.core.step.skip.NonSkippableReadException; |
45 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
46 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; |
47 | import org.springframework.batch.core.step.skip.SkipPolicy; |
48 | import org.springframework.batch.core.step.skip.SkipPolicyFailedException; |
49 | import org.springframework.batch.core.step.tasklet.Tasklet; |
50 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
51 | import org.springframework.batch.item.ItemReader; |
52 | import org.springframework.batch.item.ItemStream; |
53 | import org.springframework.batch.repeat.RepeatOperations; |
54 | import org.springframework.batch.repeat.support.RepeatTemplate; |
55 | import org.springframework.batch.support.ReflectionUtils; |
56 | import org.springframework.classify.BinaryExceptionClassifier; |
57 | import org.springframework.classify.Classifier; |
58 | import org.springframework.classify.SubclassClassifier; |
59 | import org.springframework.retry.ExhaustedRetryException; |
60 | import org.springframework.retry.RetryException; |
61 | import org.springframework.retry.RetryListener; |
62 | import org.springframework.retry.RetryPolicy; |
63 | import org.springframework.retry.backoff.BackOffPolicy; |
64 | import org.springframework.retry.policy.CompositeRetryPolicy; |
65 | import org.springframework.retry.policy.ExceptionClassifierRetryPolicy; |
66 | import org.springframework.retry.policy.NeverRetryPolicy; |
67 | import org.springframework.retry.policy.RetryContextCache; |
68 | import org.springframework.retry.policy.SimpleRetryPolicy; |
69 | import org.springframework.transaction.TransactionException; |
70 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
71 | import org.springframework.transaction.interceptor.TransactionAttribute; |
72 | import org.springframework.util.Assert; |
73 | |
74 | import java.lang.reflect.Method; |
75 | import java.util.ArrayList; |
76 | import java.util.Collection; |
77 | import java.util.HashMap; |
78 | import java.util.HashSet; |
79 | import java.util.LinkedHashSet; |
80 | import java.util.List; |
81 | import java.util.Map; |
82 | import java.util.Set; |
83 | |
84 | /** |
85 | * A step builder for fully fault tolerant chunk-oriented item processing steps. Extends {@link SimpleStepBuilder} with |
86 | * additional properties for retry and skip of failed items. |
87 | * |
88 | * @author Dave Syer |
89 | * @author Michael Minella |
90 | * |
91 | * @since 2.2 |
92 | */ |
93 | public class FaultTolerantStepBuilder<I, O> extends SimpleStepBuilder<I, O> { |
94 | |
95 | private ChunkMonitor chunkMonitor = new ChunkMonitor(); |
96 | |
97 | private boolean streamIsReader; |
98 | |
99 | private int retryLimit = 0; |
100 | |
101 | private BackOffPolicy backOffPolicy; |
102 | |
103 | private Set<RetryListener> retryListeners = new LinkedHashSet<RetryListener>(); |
104 | |
105 | private RetryPolicy retryPolicy; |
106 | |
107 | private RetryContextCache retryContextCache; |
108 | |
109 | private KeyGenerator keyGenerator; |
110 | |
111 | private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new LinkedHashSet<Class<? extends Throwable>>(); |
112 | |
113 | private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); |
114 | |
115 | private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
116 | |
117 | private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); |
118 | |
119 | private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<Class<? extends Throwable>>(); |
120 | |
121 | private Set<SkipListener<? super I, ? super O>> skipListeners = new LinkedHashSet<SkipListener<? super I, ? super O>>(); |
122 | |
123 | private int skipLimit = 0; |
124 | |
125 | private SkipPolicy skipPolicy; |
126 | |
127 | private boolean processorTransactional = true; |
128 | |
129 | /** |
130 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
131 | * |
132 | * @param parent a parent helper containing common step properties |
133 | */ |
134 | public FaultTolerantStepBuilder(StepBuilderHelper<?> parent) { |
135 | super(parent); |
136 | } |
137 | |
138 | /** |
139 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
140 | * |
141 | * @param parent a parent helper containing common step properties |
142 | */ |
143 | protected FaultTolerantStepBuilder(SimpleStepBuilder<I, O> parent) { |
144 | super(parent); |
145 | } |
146 | |
147 | @Override |
148 | public TaskletStep build() { |
149 | registerStepListenerAsSkipListener(); |
150 | return super.build(); |
151 | } |
152 | |
153 | @SuppressWarnings("unchecked") |
154 | private void registerStepListenerAsSkipListener() { |
155 | for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){ |
156 | if (stepExecutionListener instanceof SkipListener){ |
157 | listener((SkipListener<I,O>)stepExecutionListener); |
158 | } |
159 | } |
160 | for (ChunkListener chunkListener: this.chunkListeners){ |
161 | if (chunkListener instanceof SkipListener){ |
162 | listener((SkipListener<I,O>)chunkListener); |
163 | } |
164 | } |
165 | } |
166 | |
167 | /** |
168 | * Create a new chunk oriented tasklet with reader, writer and processor as provided. |
169 | * |
170 | * @see org.springframework.batch.core.step.builder.SimpleStepBuilder#createTasklet() |
171 | */ |
172 | @Override |
173 | protected Tasklet createTasklet() { |
174 | Assert.state(getReader() != null, "ItemReader must be provided"); |
175 | Assert.state(getProcessor() != null || getWriter() != null, "ItemWriter or ItemProcessor must be provided"); |
176 | addSpecialExceptions(); |
177 | registerSkipListeners(); |
178 | FaultTolerantChunkProvider<I> chunkProvider = createChunkProvider(); |
179 | FaultTolerantChunkProcessor<I, O> chunkProcessor = createChunkProcessor(); |
180 | ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor); |
181 | tasklet.setBuffering(!isReaderTransactionalQueue()); |
182 | return tasklet; |
183 | } |
184 | |
185 | /** |
186 | * Registers objects using the annotation based listener configuration. |
187 | * |
188 | * @param listener the object that has a method configured with listener annotation |
189 | * @return this for fluent chaining |
190 | */ |
191 | @Override |
192 | @SuppressWarnings("unchecked") |
193 | public SimpleStepBuilder listener(Object listener) { |
194 | super.listener(listener); |
195 | |
196 | Set<Method> skipListenerMethods = new HashSet<Method>(); |
197 | skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInRead.class)); |
198 | skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class)); |
199 | skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class)); |
200 | |
201 | Set<Method> chunkListenerMethods = new HashSet<Method>(); |
202 | chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class)); |
203 | chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class)); |
204 | chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class)); |
205 | |
206 | if(skipListenerMethods.size() > 0) { |
207 | StepListenerFactoryBean factory = new StepListenerFactoryBean(); |
208 | factory.setDelegate(listener); |
209 | skipListeners.add((SkipListener) factory.getObject()); |
210 | } |
211 | |
212 | if(chunkListenerMethods.size() > 0) { |
213 | StepListenerFactoryBean factory = new StepListenerFactoryBean(); |
214 | factory.setDelegate(listener); |
215 | super.listener(new TerminateOnExceptionChunkListenerDelegate((ChunkListener) factory.getObject())); |
216 | } |
217 | |
218 | @SuppressWarnings("unchecked") |
219 | SimpleStepBuilder result = this; |
220 | return result; |
221 | } |
222 | |
223 | |
224 | /** |
225 | * Register a skip listener. |
226 | * |
227 | * @param listener the listener to register |
228 | * @return this for fluent chaining |
229 | */ |
230 | public FaultTolerantStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) { |
231 | skipListeners.add(listener); |
232 | return this; |
233 | } |
234 | |
235 | @Override |
236 | public FaultTolerantStepBuilder<I, O> listener(ChunkListener listener) { |
237 | super.listener(new TerminateOnExceptionChunkListenerDelegate(listener)); |
238 | return this; |
239 | } |
240 | |
241 | @Override |
242 | public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> transactionAttribute( |
243 | TransactionAttribute transactionAttribute) { |
244 | return super.transactionAttribute(getTransactionAttribute(transactionAttribute)); |
245 | } |
246 | |
247 | /** |
248 | * Register a retry listener. |
249 | * |
250 | * @param listener the listener to register |
251 | * @return this for fluent chaining |
252 | */ |
253 | public FaultTolerantStepBuilder<I, O> listener(RetryListener listener) { |
254 | retryListeners.add(listener); |
255 | return this; |
256 | } |
257 | |
258 | /** |
259 | * Sets the key generator for identifying retried items. Retry across transaction boundaries requires items to be |
260 | * identified when they are encountered again. The default strategy is to use the items themselves, relying on their |
261 | * own implementation to ensure that they can be identified. Often a key generator is not necessary as long as the |
262 | * items have reliable hash code and equals implementations, or the reader is not transactional (the default) and |
263 | * the item processor either is itself not transactional (not the default) or does not create new items. |
264 | * |
265 | * @param keyGenerator a key generator for the stateful retry |
266 | * @return this for fluent chaining |
267 | */ |
268 | public FaultTolerantStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) { |
269 | this.keyGenerator = keyGenerator; |
270 | return this; |
271 | } |
272 | |
273 | /** |
274 | * The maximum number of times to try a failed item. Zero and one both translate to try only once and do not retry. |
275 | * Ignored if an explicit {@link #retryPolicy} is set. |
276 | * |
277 | * @param retryLimit the retry limit (default 0) |
278 | * @return this for fluent chaining |
279 | */ |
280 | public FaultTolerantStepBuilder<I, O> retryLimit(int retryLimit) { |
281 | this.retryLimit = retryLimit; |
282 | return this; |
283 | } |
284 | |
285 | /** |
286 | * Provide an explicit retry policy instead of using the {@link #retryLimit(int)} and retryable exceptions provided |
287 | * elsewhere. Can be used to retry different exceptions a different number of times, for instance. |
288 | * |
289 | * @param retryPolicy a retry policy |
290 | * @return this for fluent chaining |
291 | */ |
292 | public FaultTolerantStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) { |
293 | this.retryPolicy = retryPolicy; |
294 | return this; |
295 | } |
296 | |
297 | /** |
298 | * Provide a backoff policy to prevent items being retried immediately (e.g. in case the failure was caused by a |
299 | * remote resource failure that might take some time to be resolved). Ignored if an explicit {@link #retryPolicy} is |
300 | * set. |
301 | * |
302 | * @param backOffPolicy the back off policy to use (default no backoff) |
303 | * @return this for fluent chaining |
304 | */ |
305 | public FaultTolerantStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) { |
306 | this.backOffPolicy = backOffPolicy; |
307 | return this; |
308 | } |
309 | |
310 | /** |
311 | * Provide an explicit retry context cache. Retry is stateful across transactions in the case of failures in item |
312 | * processing or writing, so some information about the context for subsequent retries has to be stored. |
313 | * |
314 | * @param retryContextCache cache for retry contexts in between transactions (default to standard in-memory |
315 | * implementation) |
316 | * @return this for fluent chaining |
317 | */ |
318 | public FaultTolerantStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) { |
319 | this.retryContextCache = retryContextCache; |
320 | return this; |
321 | } |
322 | |
323 | /** |
324 | * Sets the maximium number of failed items to skip before the step fails. Ignored if an explicit |
325 | * {@link #skipPolicy(SkipPolicy)} is provided. |
326 | * |
327 | * @param skipLimit the skip limit to set |
328 | * @return this for fluent chaining |
329 | */ |
330 | public FaultTolerantStepBuilder<I, O> skipLimit(int skipLimit) { |
331 | this.skipLimit = skipLimit; |
332 | return this; |
333 | } |
334 | |
335 | /** |
336 | * Explicitly prevent certain exceptions (and subclasses) from being skipped. |
337 | * |
338 | * @param type the non-skippable exception |
339 | * @return this for fluent chaining |
340 | */ |
341 | public FaultTolerantStepBuilder<I, O> noSkip(Class<? extends Throwable> type) { |
342 | skippableExceptionClasses.put(type, false); |
343 | return this; |
344 | } |
345 | |
346 | /** |
347 | * Explicitly request certain exceptions (and subclasses) to be skipped. |
348 | * |
349 | * @param type |
350 | * @return this for fluent chaining |
351 | */ |
352 | public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) { |
353 | skippableExceptionClasses.put(type, true); |
354 | return this; |
355 | } |
356 | |
357 | /** |
358 | * Provide an explicit policy for managing skips. A skip policy determines which exceptions are skippable and how |
359 | * many times. |
360 | * |
361 | * @param skipPolicy the skip policy |
362 | * @return this for fluent chaining |
363 | */ |
364 | public FaultTolerantStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) { |
365 | this.skipPolicy = skipPolicy; |
366 | return this; |
367 | } |
368 | |
369 | /** |
370 | * Mark this exception as ignorable during item read or processing operations. Processing continues with no |
371 | * additional callbacks (use skips instead if you need to be notified). Ignored during write because there is no |
372 | * guarantee of skip and retry without rollback. |
373 | * |
374 | * @param type the exception to mark as no rollback |
375 | * @return this for fluent chaining |
376 | */ |
377 | public FaultTolerantStepBuilder<I, O> noRollback(Class<? extends Throwable> type) { |
378 | noRollbackExceptionClasses.add(type); |
379 | return this; |
380 | } |
381 | |
382 | /** |
383 | * Explicitly ask for an exception (and subclasses) to be excluded from retry. |
384 | * |
385 | * @param type the exception to exclude from retry |
386 | * @return this for fluent chaining |
387 | */ |
388 | public FaultTolerantStepBuilder<I, O> noRetry(Class<? extends Throwable> type) { |
389 | retryableExceptionClasses.put(type, false); |
390 | return this; |
391 | } |
392 | |
393 | /** |
394 | * Explicitly ask for an exception (and subclasses) to be retried. |
395 | * |
396 | * @param type the exception to retry |
397 | * @return this for fluent chaining |
398 | */ |
399 | public FaultTolerantStepBuilder<I, O> retry(Class<? extends Throwable> type) { |
400 | retryableExceptionClasses.put(type, true); |
401 | return this; |
402 | } |
403 | |
404 | /** |
405 | * Mark the item processor as non-transactional (default is the opposite). If this flag is set the results of item |
406 | * processing are cached across transactions in between retries and during skip processing, otherwise the processor |
407 | * will be called in every transaction. |
408 | * |
409 | * @return this for fluent chaining |
410 | */ |
411 | public FaultTolerantStepBuilder<I, O> processorNonTransactional() { |
412 | this.processorTransactional = false; |
413 | return this; |
414 | } |
415 | |
416 | @Override |
417 | public AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> stream(ItemStream stream) { |
418 | if (stream instanceof ItemReader<?>) { |
419 | if (!streamIsReader) { |
420 | streamIsReader = true; |
421 | super.stream(chunkMonitor); |
422 | } |
423 | // In cases where multiple nested item readers are registered, |
424 | // they all want to get the open() and close() callbacks. |
425 | chunkMonitor.registerItemStream(stream); |
426 | } |
427 | else { |
428 | super.stream(stream); |
429 | } |
430 | return this; |
431 | } |
432 | |
433 | private FaultTolerantChunkProvider<I> createChunkProvider() { |
434 | |
435 | SkipPolicy readSkipPolicy = createSkipPolicy(); |
436 | readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy); |
437 | FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<I>(getReader(), |
438 | createChunkOperations()); |
439 | chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ)); |
440 | chunkProvider.setSkipPolicy(readSkipPolicy); |
441 | chunkProvider.setRollbackClassifier(getRollbackClassifier()); |
442 | ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners()); |
443 | listeners.addAll(skipListeners); |
444 | chunkProvider.setListeners(listeners); |
445 | |
446 | return chunkProvider; |
447 | |
448 | } |
449 | |
450 | private FaultTolerantChunkProcessor<I, O> createChunkProcessor() { |
451 | |
452 | BatchRetryTemplate batchRetryTemplate = createRetryOperations(); |
453 | |
454 | FaultTolerantChunkProcessor<I, O> chunkProcessor = new FaultTolerantChunkProcessor<I, O>(getProcessor(), |
455 | getWriter(), batchRetryTemplate); |
456 | chunkProcessor.setBuffering(!isReaderTransactionalQueue()); |
457 | chunkProcessor.setProcessorTransactional(processorTransactional); |
458 | |
459 | SkipPolicy writeSkipPolicy = createSkipPolicy(); |
460 | writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy); |
461 | chunkProcessor.setWriteSkipPolicy(writeSkipPolicy); |
462 | chunkProcessor.setProcessSkipPolicy(writeSkipPolicy); |
463 | chunkProcessor.setRollbackClassifier(getRollbackClassifier()); |
464 | chunkProcessor.setKeyGenerator(keyGenerator); |
465 | detectStreamInReader(); |
466 | |
467 | ArrayList<StepListener> listeners = new ArrayList<StepListener>(getItemListeners()); |
468 | listeners.addAll(skipListeners); |
469 | chunkProcessor.setListeners(listeners); |
470 | chunkProcessor.setChunkMonitor(chunkMonitor); |
471 | |
472 | return chunkProcessor; |
473 | |
474 | } |
475 | |
476 | @SuppressWarnings("unchecked") |
477 | private void addSpecialExceptions() { |
478 | addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, |
479 | SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class, |
480 | JobInterruptedException.class, Error.class); |
481 | addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class, |
482 | TransactionException.class, FatalStepExecutionException.class, SkipListenerFailedException.class, |
483 | SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class, Error.class); |
484 | } |
485 | |
486 | private void detectStreamInReader() { |
487 | if (streamIsReader) { |
488 | if (!concurrent()) { |
489 | chunkMonitor.setItemReader(getReader()); |
490 | } |
491 | else { |
492 | logger.warn("Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, " |
493 | + "and may lead to incorrect restart data being stored."); |
494 | } |
495 | } |
496 | } |
497 | |
498 | /** |
499 | * Register explicitly set item listeners and auto-register reader, processor and writer if applicable |
500 | */ |
501 | private void registerSkipListeners() { |
502 | |
503 | // auto-register reader, processor and writer |
504 | for (Object itemHandler : new Object[] { getReader(), getWriter(), getProcessor() }) { |
505 | |
506 | if (StepListenerFactoryBean.isListener(itemHandler)) { |
507 | StepListener listener = StepListenerFactoryBean.getListener(itemHandler); |
508 | if (listener instanceof SkipListener<?, ?>) { |
509 | @SuppressWarnings("unchecked") |
510 | SkipListener<? super I, ? super O> skipListener = (SkipListener<? super I, ? super O>) listener; |
511 | skipListeners.add(skipListener); |
512 | } |
513 | } |
514 | |
515 | } |
516 | } |
517 | |
518 | /** |
519 | * Convenience method to get an exception classifier based on the provided transaction attributes. |
520 | * |
521 | * @return an exception classifier: maps to true if an exception should cause rollback |
522 | */ |
523 | private Classifier<Throwable, Boolean> getRollbackClassifier() { |
524 | |
525 | Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false); |
526 | |
527 | // Try to avoid pathological cases where we cannot force a rollback |
528 | // (should be pretty uncommon): |
529 | if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException())) |
530 | || !classifier.classify(new ExhaustedRetryException("test"))) { |
531 | |
532 | final Classifier<Throwable, Boolean> binary = classifier; |
533 | |
534 | Collection<Class<? extends Throwable>> types = new HashSet<Class<? extends Throwable>>(); |
535 | types.add(ForceRollbackForWriteSkipException.class); |
536 | types.add(ExhaustedRetryException.class); |
537 | final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true); |
538 | |
539 | classifier = new Classifier<Throwable, Boolean>() { |
540 | @Override |
541 | public Boolean classify(Throwable classifiable) { |
542 | // Rollback if either the user's list or our own applies |
543 | return panic.classify(classifiable) || binary.classify(classifiable); |
544 | } |
545 | }; |
546 | |
547 | } |
548 | |
549 | return classifier; |
550 | |
551 | } |
552 | |
553 | @SuppressWarnings("serial") |
554 | private TransactionAttribute getTransactionAttribute(TransactionAttribute attribute) { |
555 | |
556 | final Classifier<Throwable, Boolean> classifier = getRollbackClassifier(); |
557 | return new DefaultTransactionAttribute(attribute) { |
558 | @Override |
559 | public boolean rollbackOn(Throwable ex) { |
560 | return classifier.classify(ex); |
561 | } |
562 | |
563 | }; |
564 | |
565 | } |
566 | |
567 | protected SkipPolicy createSkipPolicy() { |
568 | SkipPolicy skipPolicy = this.skipPolicy; |
569 | Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>( |
570 | skippableExceptionClasses); |
571 | map.put(ForceRollbackForWriteSkipException.class, true); |
572 | LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map); |
573 | if (skipPolicy == null) { |
574 | Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0), |
575 | "If a skip limit is provided then skippable exceptions must also be specified"); |
576 | skipPolicy = limitCheckingItemSkipPolicy; |
577 | } |
578 | else if (limitCheckingItemSkipPolicy != null) { |
579 | skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy }); |
580 | } |
581 | return skipPolicy; |
582 | } |
583 | |
584 | /** |
585 | * @return fully configured retry template for item processing phase. |
586 | */ |
587 | private BatchRetryTemplate createRetryOperations() { |
588 | |
589 | RetryPolicy retryPolicy = this.retryPolicy; |
590 | SimpleRetryPolicy simpleRetryPolicy = null; |
591 | |
592 | Map<Class<? extends Throwable>, Boolean> map = new HashMap<Class<? extends Throwable>, Boolean>( |
593 | retryableExceptionClasses); |
594 | map.put(ForceRollbackForWriteSkipException.class, true); |
595 | simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map); |
596 | |
597 | if (retryPolicy == null) { |
598 | Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0), |
599 | "If a retry limit is provided then retryable exceptions must also be specified"); |
600 | retryPolicy = simpleRetryPolicy; |
601 | } |
602 | else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) { |
603 | CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); |
604 | compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy }); |
605 | retryPolicy = compositeRetryPolicy; |
606 | } |
607 | |
608 | RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy); |
609 | |
610 | BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate(); |
611 | if (backOffPolicy != null) { |
612 | batchRetryTemplate.setBackOffPolicy(backOffPolicy); |
613 | } |
614 | batchRetryTemplate.setRetryPolicy(retryPolicyWrapper); |
615 | |
616 | // Co-ordinate the retry policy with the exception handler: |
617 | RepeatOperations stepOperations = getStepOperations(); |
618 | if (stepOperations instanceof RepeatTemplate) { |
619 | SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper, |
620 | getExceptionHandler(), nonRetryableExceptionClasses); |
621 | ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler); |
622 | } |
623 | |
624 | if (retryContextCache != null) { |
625 | batchRetryTemplate.setRetryContextCache(retryContextCache); |
626 | } |
627 | |
628 | if (retryListeners != null) { |
629 | batchRetryTemplate.setListeners(retryListeners.toArray(new RetryListener[0])); |
630 | } |
631 | return batchRetryTemplate; |
632 | |
633 | } |
634 | |
635 | /** |
636 | * Wrap the provided {@link org.springframework.retry.RetryPolicy} so that it never retries explicitly non-retryable |
637 | * exceptions. |
638 | */ |
639 | private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) { |
640 | |
641 | NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy(); |
642 | Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<Class<? extends Throwable>, RetryPolicy>(); |
643 | for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) { |
644 | map.put(fatal, neverRetryPolicy); |
645 | } |
646 | |
647 | SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<Throwable, RetryPolicy>( |
648 | retryPolicy); |
649 | classifier.setTypeMap(map); |
650 | |
651 | ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy(); |
652 | retryPolicyWrapper.setExceptionClassifier(classifier); |
653 | return retryPolicyWrapper; |
654 | |
655 | } |
656 | |
657 | /** |
658 | * Wrap a {@link SkipPolicy} and make it consistent with known fatal exceptions. |
659 | * |
660 | * @param skipPolicy an existing skip policy |
661 | * @return a skip policy that will not skip fatal exceptions |
662 | */ |
663 | private SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) { |
664 | |
665 | NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy(); |
666 | Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<Class<? extends Throwable>, SkipPolicy>(); |
667 | for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) { |
668 | map.put(fatal, neverSkipPolicy); |
669 | } |
670 | |
671 | SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<Throwable, SkipPolicy>(skipPolicy); |
672 | classifier.setTypeMap(map); |
673 | |
674 | ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy(); |
675 | skipPolicyWrapper.setExceptionClassifier(classifier); |
676 | return skipPolicyWrapper; |
677 | } |
678 | |
679 | private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) { |
680 | List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>(); |
681 | for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) { |
682 | exceptions.add(exceptionClass); |
683 | } |
684 | for (Class<? extends Throwable> fatal : cls) { |
685 | if (!exceptions.contains(fatal)) { |
686 | exceptions.add(fatal); |
687 | } |
688 | } |
689 | nonSkippableExceptionClasses = exceptions; |
690 | } |
691 | |
692 | private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) { |
693 | List<Class<? extends Throwable>> exceptions = new ArrayList<Class<? extends Throwable>>(); |
694 | for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) { |
695 | exceptions.add(exceptionClass); |
696 | } |
697 | for (Class<? extends Throwable> fatal : cls) { |
698 | if (!exceptions.contains(fatal)) { |
699 | exceptions.add(fatal); |
700 | } |
701 | } |
702 | nonRetryableExceptionClasses = exceptions; |
703 | } |
704 | |
705 | /** |
706 | * ChunkListener that wraps exceptions thrown from the ChunkListener in {@link FatalStepExecutionException} to force |
707 | * termination of StepExecution |
708 | * |
709 | * ChunkListeners shoulnd't throw exceptions and expect continued processing, they must be handled in the |
710 | * implementation or the step will terminate |
711 | * |
712 | */ |
713 | private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener { |
714 | |
715 | private ChunkListener chunkListener; |
716 | |
717 | TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) { |
718 | this.chunkListener = chunkListener; |
719 | } |
720 | |
721 | @Override |
722 | public void beforeChunk(ChunkContext context) { |
723 | try { |
724 | chunkListener.beforeChunk(context); |
725 | } |
726 | catch (Throwable t) { |
727 | throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t); |
728 | } |
729 | } |
730 | |
731 | @Override |
732 | public void afterChunk(ChunkContext context) { |
733 | try { |
734 | chunkListener.afterChunk(context); |
735 | } |
736 | catch (Throwable t) { |
737 | throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t); |
738 | } |
739 | } |
740 | |
741 | @Override |
742 | public void afterChunkError(ChunkContext context) { |
743 | try { |
744 | chunkListener.afterChunkError(context); |
745 | } |
746 | catch (Throwable t) { |
747 | throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t); |
748 | } |
749 | } |
750 | |
751 | @Override |
752 | public int hashCode() { |
753 | return chunkListener.hashCode(); |
754 | } |
755 | |
756 | @SuppressWarnings("unchecked") |
757 | @Override |
758 | public boolean equals(Object obj) { |
759 | if (obj instanceof FaultTolerantStepBuilder.TerminateOnExceptionChunkListenerDelegate){ |
760 | // unwrap the ChunkListener |
761 | obj = ((TerminateOnExceptionChunkListenerDelegate)obj).chunkListener; |
762 | } |
763 | return chunkListener.equals(obj); |
764 | } |
765 | |
766 | } |
767 | } |