1 | package org.springframework.batch.core.step.item; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.Arrays; |
5 | import java.util.HashMap; |
6 | import java.util.List; |
7 | |
8 | import org.springframework.batch.core.SkipListener; |
9 | import org.springframework.batch.core.StepContribution; |
10 | import org.springframework.batch.core.listener.CompositeSkipListener; |
11 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
12 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
13 | import org.springframework.batch.core.step.skip.NonSkippableException; |
14 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
15 | import org.springframework.batch.item.ItemKeyGenerator; |
16 | import org.springframework.batch.item.ItemReader; |
17 | import org.springframework.batch.item.ItemWriter; |
18 | import org.springframework.batch.retry.RecoveryCallback; |
19 | import org.springframework.batch.retry.RetryCallback; |
20 | import org.springframework.batch.retry.RetryContext; |
21 | import org.springframework.batch.retry.RetryException; |
22 | import org.springframework.batch.retry.RetryListener; |
23 | import org.springframework.batch.retry.RetryOperations; |
24 | import org.springframework.batch.retry.RetryPolicy; |
25 | import org.springframework.batch.retry.backoff.BackOffPolicy; |
26 | import org.springframework.batch.retry.callback.RecoveryRetryCallback; |
27 | import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy; |
28 | import org.springframework.batch.retry.policy.MapRetryContextCache; |
29 | import org.springframework.batch.retry.policy.NeverRetryPolicy; |
30 | import org.springframework.batch.retry.policy.RecoveryCallbackRetryPolicy; |
31 | import org.springframework.batch.retry.policy.SimpleRetryPolicy; |
32 | import org.springframework.batch.retry.support.RetryTemplate; |
33 | import org.springframework.batch.support.SubclassExceptionClassifier; |
34 | import org.springframework.util.Assert; |
35 | |
36 | /** |
37 | * Factory bean for step that provides options for configuring skip behavior. |
38 | * User can set {@link #setSkipLimit(int)} to set how many exceptions of |
39 | * {@link #setSkippableExceptionClasses(Class[])} types are tolerated. |
40 | * {@link #setFatalExceptionClasses(Class[])} will cause immediate termination |
41 | * of job - they are treated as higher priority than |
42 | * {@link #setSkippableExceptionClasses(Class[])}, so the two lists don't need |
43 | * to be exclusive. |
44 | * |
45 | * Skippable exceptions on write will by default cause transaction rollback - to |
46 | * avoid rollback for specific exception class include it in the transaction |
47 | * attribute as "no rollback for". |
48 | * |
49 | * @see SimpleStepFactoryBean |
50 | * |
51 | * @author Dave Syer |
52 | * @author Robert Kasanicky |
53 | * |
54 | */ |
55 | public class SkipLimitStepFactoryBean extends SimpleStepFactoryBean { |
56 | |
57 | private int skipLimit = 0; |
58 | |
59 | private Class[] skippableExceptionClasses = new Class[] { Exception.class }; |
60 | |
61 | private Class[] fatalExceptionClasses = new Class[] { Error.class }; |
62 | |
63 | private ItemKeyGenerator itemKeyGenerator; |
64 | |
65 | private int cacheCapacity = 0; |
66 | |
67 | private int retryLimit = 1; |
68 | |
69 | private Class[] retryableExceptionClasses = new Class[] {}; |
70 | |
71 | private BackOffPolicy backOffPolicy; |
72 | |
73 | private RetryListener[] retryListeners; |
74 | |
75 | private RetryPolicy retryPolicy; |
76 | |
77 | /** |
78 | * Setter for the retry policy. If this is specified the other retry |
79 | * properties are ignored (retryLimit, backOffPolicy, |
80 | * retryableExceptionClasses). |
81 | * |
82 | * @param retryPolicy a stateless {@link RetryPolicy} |
83 | */ |
84 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
85 | this.retryPolicy = retryPolicy; |
86 | } |
87 | |
88 | /** |
89 | * Public setter for the retry limit. Each item can be retried up to this |
90 | * limit. Note the limit includes the initial attempt, so it must be greater |
91 | * or equal to 1. |
92 | * |
93 | * @param retryLimit the retry limit to set |
94 | */ |
95 | public void setRetryLimit(int retryLimit) { |
96 | Assert.isTrue(retryLimit >= 1, "retry limit must be greater or equal to 1"); |
97 | this.retryLimit = retryLimit; |
98 | } |
99 | |
100 | /** |
101 | * Public setter for the capacity of the cache in the retry policy. If more |
102 | * items than this fail without being skipped or recovered an exception will |
103 | * be thrown. This is to guard against inadvertent infinite loops generated |
104 | * by item identity problems. If a large number of items are failing and not |
105 | * being recognized as skipped, it usually signals a problem with the key |
106 | * generation (often equals and hashCode in the item itself). So it is |
107 | * better to enforce a strict limit than have weird looking errors, where a |
108 | * skip limit is reached without anything being skipped.<br/> |
109 | * |
110 | * The default value should be high enough and more for most purposes. To |
111 | * breach the limit in a single-threaded step typically you have to have |
112 | * this many failures in a single transaction. Defaults to the value in the |
113 | * {@link MapRetryContextCache}. |
114 | * |
115 | * @param cacheCapacity the cacheCapacity to set |
116 | */ |
117 | public void setCacheCapacity(int cacheCapacity) { |
118 | this.cacheCapacity = cacheCapacity; |
119 | } |
120 | |
121 | /** |
122 | * Public setter for the Class[]. |
123 | * |
124 | * @param retryableExceptionClasses the retryableExceptionClasses to set |
125 | */ |
126 | public void setRetryableExceptionClasses(Class[] retryableExceptionClasses) { |
127 | this.retryableExceptionClasses = retryableExceptionClasses; |
128 | } |
129 | |
130 | /** |
131 | * Public setter for the {@link BackOffPolicy}. |
132 | * |
133 | * @param backOffPolicy the {@link BackOffPolicy} to set |
134 | */ |
135 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
136 | this.backOffPolicy = backOffPolicy; |
137 | } |
138 | |
139 | /** |
140 | * Public setter for the {@link RetryListener}s. |
141 | * |
142 | * @param retryListeners the {@link RetryListener}s to set |
143 | */ |
144 | public void setRetryListeners(RetryListener[] retryListeners) { |
145 | this.retryListeners = retryListeners; |
146 | } |
147 | |
148 | /** |
149 | * Public setter for a limit that determines skip policy. If this value is |
150 | * positive then an exception in chunk processing will cause the item to be |
151 | * skipped and no exception propagated until the limit is reached. If it is |
152 | * zero then all exceptions will be propagated from the chunk and cause the |
153 | * step to abort. |
154 | * |
155 | * Note that if chunks are executed concurrently the number of skips can |
156 | * potentially exceed the skip limit and step can still finish successfully. |
157 | * This is due to the fact that overall skip count is not being synchronized |
158 | * between concurrent chunks while they processing, only on chunk |
159 | * boundaries. |
160 | * |
161 | * @param skipLimit the value to set. Default is 0 (never skip). |
162 | */ |
163 | public void setSkipLimit(int skipLimit) { |
164 | this.skipLimit = skipLimit; |
165 | } |
166 | |
167 | /** |
168 | * Public setter for exception classes that when raised won't crash the job |
169 | * but will result in transaction rollback and the item which handling |
170 | * caused the exception will be skipped. |
171 | * |
172 | * @param exceptionClasses defaults to <code>Exception</code> |
173 | */ |
174 | public void setSkippableExceptionClasses(Class[] exceptionClasses) { |
175 | this.skippableExceptionClasses = exceptionClasses; |
176 | } |
177 | |
178 | /** |
179 | * Public setter for exception classes that should cause immediate failure. |
180 | * |
181 | * @param fatalExceptionClasses {@link Error} by default |
182 | */ |
183 | public void setFatalExceptionClasses(Class[] fatalExceptionClasses) { |
184 | this.fatalExceptionClasses = fatalExceptionClasses; |
185 | } |
186 | |
187 | /** |
188 | * Public setter for the {@link ItemKeyGenerator}. This is used to identify |
189 | * failed items so they can be skipped if encountered again, generally in |
190 | * another transaction. |
191 | * |
192 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. |
193 | */ |
194 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
195 | this.itemKeyGenerator = itemKeyGenerator; |
196 | } |
197 | |
198 | /** |
199 | * Uses the {@link #setSkipLimit(int)} value to configure item handler and |
200 | * and exception handler. |
201 | */ |
202 | protected void applyConfiguration(ItemOrientedStep step) { |
203 | super.applyConfiguration(step); |
204 | |
205 | if (retryLimit > 1 || skipLimit > 0 || retryPolicy != null) { |
206 | |
207 | addFatalExceptionIfMissing(SkipLimitExceededException.class); |
208 | addFatalExceptionIfMissing(NonSkippableException.class); |
209 | addFatalExceptionIfMissing(RetryException.class); |
210 | |
211 | if (retryPolicy == null) { |
212 | |
213 | SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(retryLimit); |
214 | if (retryableExceptionClasses.length > 0) { // otherwise we |
215 | // retry |
216 | // all exceptions |
217 | simpleRetryPolicy.setRetryableExceptionClasses(retryableExceptionClasses); |
218 | } |
219 | simpleRetryPolicy.setFatalExceptionClasses(fatalExceptionClasses); |
220 | |
221 | ExceptionClassifierRetryPolicy classifierRetryPolicy = new ExceptionClassifierRetryPolicy(); |
222 | SubclassExceptionClassifier exceptionClassifier = new SubclassExceptionClassifier(); |
223 | HashMap exceptionTypeMap = new HashMap(); |
224 | for (int i = 0; i < retryableExceptionClasses.length; i++) { |
225 | Class cls = retryableExceptionClasses[i]; |
226 | exceptionTypeMap.put(cls, "retry"); |
227 | } |
228 | exceptionClassifier.setTypeMap(exceptionTypeMap); |
229 | HashMap retryPolicyMap = new HashMap(); |
230 | retryPolicyMap.put("retry", simpleRetryPolicy); |
231 | retryPolicyMap.put("default", new NeverRetryPolicy()); |
232 | classifierRetryPolicy.setPolicyMap(retryPolicyMap); |
233 | classifierRetryPolicy.setExceptionClassifier(exceptionClassifier); |
234 | retryPolicy = classifierRetryPolicy; |
235 | |
236 | } |
237 | |
238 | // Co-ordinate the retry policy with the exception handler: |
239 | getStepOperations().setExceptionHandler( |
240 | new SimpleRetryExceptionHandler(retryPolicy, getExceptionHandler(), fatalExceptionClasses)); |
241 | |
242 | RecoveryCallbackRetryPolicy recoveryCallbackRetryPolicy = new RecoveryCallbackRetryPolicy(retryPolicy) { |
243 | protected boolean recoverForException(Throwable ex) { |
244 | return !getTransactionAttribute().rollbackOn(ex); |
245 | } |
246 | }; |
247 | if (cacheCapacity > 0) { |
248 | recoveryCallbackRetryPolicy.setRetryContextCache(new MapRetryContextCache(cacheCapacity)); |
249 | } |
250 | |
251 | RetryTemplate retryTemplate = new RetryTemplate(); |
252 | if (retryListeners != null) { |
253 | retryTemplate.setListeners(retryListeners); |
254 | } |
255 | retryTemplate.setRetryPolicy(recoveryCallbackRetryPolicy); |
256 | if (backOffPolicy != null) { |
257 | retryTemplate.setBackOffPolicy(backOffPolicy); |
258 | } |
259 | |
260 | List exceptions = new ArrayList(Arrays.asList(skippableExceptionClasses)); |
261 | ItemSkipPolicy readSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, exceptions, Arrays |
262 | .asList(fatalExceptionClasses)); |
263 | exceptions.addAll(Arrays.asList(retryableExceptionClasses)); |
264 | ItemSkipPolicy writeSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, exceptions, Arrays |
265 | .asList(fatalExceptionClasses)); |
266 | StatefulRetryItemHandler itemHandler = new StatefulRetryItemHandler(getItemReader(), getItemWriter(), |
267 | retryTemplate, itemKeyGenerator, readSkipPolicy, writeSkipPolicy); |
268 | itemHandler.setSkipListeners(BatchListenerFactoryHelper.getSkipListeners(getListeners())); |
269 | |
270 | step.setItemHandler(itemHandler); |
271 | |
272 | } |
273 | else { |
274 | // This is the default in ItemOrientedStep anyway... |
275 | step.setItemHandler(new SimpleItemHandler(getItemReader(), getItemWriter())); |
276 | } |
277 | |
278 | } |
279 | |
280 | public void addFatalExceptionIfMissing(Class cls) { |
281 | List fatalExceptionList = new ArrayList(Arrays.asList(fatalExceptionClasses)); |
282 | if (!fatalExceptionList.contains(cls)) { |
283 | fatalExceptionList.add(cls); |
284 | } |
285 | fatalExceptionClasses = (Class[]) fatalExceptionList.toArray(new Class[0]); |
286 | } |
287 | |
288 | /** |
289 | * If there is an exception on input it is skipped if allowed. If there is |
290 | * an exception on output, it will be re-thrown in any case, and the |
291 | * behaviour when the item is next encountered depends on the retryable and |
292 | * skippable exception configuration. If the exception is retryable the |
293 | * write will be attempted again up to the retry limit. When retry attempts |
294 | * are exhausted the skip listener is invoked and the skip count |
295 | * incremented. A retryable exception is thus also effectively also |
296 | * implicitly skippable. |
297 | * |
298 | * @author Dave Syer |
299 | * |
300 | */ |
301 | private static class StatefulRetryItemHandler extends SimpleItemHandler { |
302 | |
303 | final private RetryOperations retryOperations; |
304 | |
305 | final private ItemKeyGenerator itemKeyGenerator; |
306 | |
307 | final private CompositeSkipListener listener = new CompositeSkipListener(); |
308 | |
309 | final private ItemSkipPolicy readSkipPolicy; |
310 | |
311 | final private ItemSkipPolicy writeSkipPolicy; |
312 | |
313 | /** |
314 | * @param itemReader |
315 | * @param itemWriter |
316 | * @param retryTemplate |
317 | * @param itemKeyGenerator |
318 | */ |
319 | public StatefulRetryItemHandler(ItemReader itemReader, ItemWriter itemWriter, RetryOperations retryTemplate, |
320 | ItemKeyGenerator itemKeyGenerator, ItemSkipPolicy readSkipPolicy, ItemSkipPolicy writeSkipPolicy) { |
321 | super(itemReader, itemWriter); |
322 | this.retryOperations = retryTemplate; |
323 | this.itemKeyGenerator = itemKeyGenerator; |
324 | this.readSkipPolicy = readSkipPolicy; |
325 | this.writeSkipPolicy = writeSkipPolicy; |
326 | } |
327 | |
328 | /** |
329 | * Register some {@link SkipListener}s with the handler. Each will get |
330 | * the callbacks in the order specified at the correct stage if a skip |
331 | * occurs. |
332 | * |
333 | * @param listeners |
334 | */ |
335 | public void setSkipListeners(SkipListener[] listeners) { |
336 | for (int i = 0; i < listeners.length; i++) { |
337 | registerSkipListener(listeners[i]); |
338 | } |
339 | } |
340 | |
341 | /** |
342 | * Register a listener for callbacks at the appropriate stages in a skip |
343 | * process. |
344 | * |
345 | * @param listener a {@link SkipListener} |
346 | */ |
347 | public void registerSkipListener(SkipListener listener) { |
348 | this.listener.register(listener); |
349 | } |
350 | |
351 | /** |
352 | * Tries to read the item from the reader, in case of exception skip the |
353 | * item if the skip policy allows, otherwise re-throw. |
354 | * |
355 | * @param contribution current StepContribution holding skipped items |
356 | * count |
357 | * @return next item for processing |
358 | */ |
359 | protected Object read(StepContribution contribution) throws Exception { |
360 | |
361 | while (true) { |
362 | try { |
363 | return doRead(); |
364 | } |
365 | catch (Exception e) { |
366 | try { |
367 | if (readSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
368 | // increment skip count and try again |
369 | contribution.incrementTemporaryReadSkipCount(); |
370 | onSkipInRead(e); |
371 | logger.debug("Skipping failed input", e); |
372 | } |
373 | else { |
374 | throw new NonSkippableException("Non-skippable exception during read", e); |
375 | } |
376 | } |
377 | catch (SkipLimitExceededException ex) { |
378 | // we are headed for a abnormal ending so bake in the |
379 | // skip count |
380 | contribution.combineSkipCounts(); |
381 | throw ex; |
382 | } |
383 | } |
384 | } |
385 | |
386 | } |
387 | |
388 | /** |
389 | * Execute the business logic, delegating to the writer.<br/> |
390 | * |
391 | * Process the item with the {@link ItemWriter} in a stateful retry. Any |
392 | * {@link SkipListener} provided is called when retry attempts are |
393 | * exhausted. The listener callback (on write failure) will happen in |
394 | * the next transaction automatically.<br/> |
395 | * |
396 | * @see org.springframework.batch.core.step.item.SimpleItemHandler#write(java.lang.Object, |
397 | * org.springframework.batch.core.StepContribution) |
398 | */ |
399 | protected void write(final Object item, final StepContribution contribution) throws Exception { |
400 | RecoveryRetryCallback retryCallback = new RecoveryRetryCallback(item, new RetryCallback() { |
401 | public Object doWithRetry(RetryContext context) throws Throwable { |
402 | doWrite(item); |
403 | return null; |
404 | } |
405 | }, itemKeyGenerator != null ? itemKeyGenerator.getKey(item) : item); |
406 | retryCallback.setRecoveryCallback(new RecoveryCallback() { |
407 | public Object recover(RetryContext context) { |
408 | Throwable t = context.getLastThrowable(); |
409 | if (writeSkipPolicy.shouldSkip(t, contribution.getStepSkipCount())) { |
410 | listener.onSkipInWrite(item, t); |
411 | } |
412 | else { |
413 | throw new NonSkippableException("Non-skippable exception on write", t); |
414 | } |
415 | contribution.incrementWriteSkipCount(); |
416 | return null; |
417 | } |
418 | }); |
419 | retryOperations.execute(retryCallback); |
420 | } |
421 | |
422 | private void onSkipInRead(Exception e) { |
423 | |
424 | try { |
425 | listener.onSkipInRead(e); |
426 | } |
427 | catch (Exception ex) { |
428 | logger.debug("Error in SkipListener onSkipInReader encountered and ignored.", ex); |
429 | } |
430 | } |
431 | |
432 | } |
433 | |
434 | } |