1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.retry.support; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Arrays; |
21 | import java.util.Collections; |
22 | import java.util.List; |
23 | |
24 | import org.apache.commons.logging.Log; |
25 | import org.apache.commons.logging.LogFactory; |
26 | import org.springframework.batch.repeat.RepeatException; |
27 | import org.springframework.batch.retry.ExhaustedRetryException; |
28 | import org.springframework.batch.retry.RecoveryCallback; |
29 | import org.springframework.batch.retry.RetryCallback; |
30 | import org.springframework.batch.retry.RetryContext; |
31 | import org.springframework.batch.retry.RetryException; |
32 | import org.springframework.batch.retry.RetryListener; |
33 | import org.springframework.batch.retry.RetryOperations; |
34 | import org.springframework.batch.retry.RetryPolicy; |
35 | import org.springframework.batch.retry.RetryState; |
36 | import org.springframework.batch.retry.TerminatedRetryException; |
37 | import org.springframework.batch.retry.backoff.BackOffContext; |
38 | import org.springframework.batch.retry.backoff.BackOffInterruptedException; |
39 | import org.springframework.batch.retry.backoff.BackOffPolicy; |
40 | import org.springframework.batch.retry.backoff.NoBackOffPolicy; |
41 | import org.springframework.batch.retry.policy.MapRetryContextCache; |
42 | import org.springframework.batch.retry.policy.RetryContextCache; |
43 | import org.springframework.batch.retry.policy.SimpleRetryPolicy; |
44 | |
45 | /** |
46 | * Template class that simplifies the execution of operations with retry |
47 | * semantics. <br/> |
48 | * Retryable operations are encapsulated in implementations of the |
49 | * {@link RetryCallback} interface and are executed using one of the supplied |
50 | * execute methods. <br/> |
51 | * |
 * By default, an operation is retried if it throws any {@link Exception} or
 * subclass of {@link Exception}. This behaviour can be changed by using the
 * {@link #setRetryPolicy(RetryPolicy)} method. <br/>
55 | * |
56 | * Also by default, each operation is retried for a maximum of three attempts |
57 | * with no back off in between. This behaviour can be configured using the |
58 | * {@link #setRetryPolicy(RetryPolicy)} and |
59 | * {@link #setBackOffPolicy(BackOffPolicy)} properties. The |
60 | * {@link org.springframework.batch.retry.backoff.BackOffPolicy} controls how |
61 | * long the pause is between each individual retry attempt. <br/> |
62 | * |
63 | * This class is thread-safe and suitable for concurrent access when executing |
64 | * operations and when performing configuration changes. As such, it is possible |
65 | * to change the number of retries on the fly, as well as the |
66 | * {@link BackOffPolicy} used and no in progress retryable operations will be |
67 | * affected. |
68 | * |
69 | * @author Rob Harrop |
70 | * @author Dave Syer |
71 | */ |
72 | public class RetryTemplate implements RetryOperations { |
73 | |
74 | protected final Log logger = LogFactory.getLog(getClass()); |
75 | |
76 | private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy(); |
77 | |
78 | private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3, Collections |
79 | .<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true)); |
80 | |
81 | private volatile RetryListener[] listeners = new RetryListener[0]; |
82 | |
83 | private RetryContextCache retryContextCache = new MapRetryContextCache(); |
84 | |
85 | /** |
86 | * Public setter for the {@link RetryContextCache}. |
87 | * |
88 | * @param retryContextCache the {@link RetryContextCache} to set. |
89 | */ |
90 | public void setRetryContextCache(RetryContextCache retryContextCache) { |
91 | this.retryContextCache = retryContextCache; |
92 | } |
93 | |
94 | /** |
95 | * Setter for listeners. The listeners are executed before and after a retry |
96 | * block (i.e. before and after all the attempts), and on an error (every |
97 | * attempt). |
98 | * |
99 | * @param listeners |
100 | * @see RetryListener |
101 | */ |
102 | public void setListeners(RetryListener[] listeners) { |
103 | this.listeners = Arrays.asList(listeners).toArray(new RetryListener[listeners.length]); |
104 | } |
105 | |
106 | /** |
107 | * Register an additional listener. |
108 | * |
109 | * @param listener |
110 | * @see #setListeners(RetryListener[]) |
111 | */ |
112 | public void registerListener(RetryListener listener) { |
113 | List<RetryListener> list = new ArrayList<RetryListener>(Arrays.asList(listeners)); |
114 | list.add(listener); |
115 | listeners = list.toArray(new RetryListener[list.size()]); |
116 | } |
117 | |
118 | /** |
119 | * Setter for {@link BackOffPolicy}. |
120 | * |
121 | * @param backOffPolicy |
122 | */ |
123 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
124 | this.backOffPolicy = backOffPolicy; |
125 | } |
126 | |
127 | /** |
128 | * Setter for {@link RetryPolicy}. |
129 | * |
130 | * @param retryPolicy |
131 | */ |
132 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
133 | this.retryPolicy = retryPolicy; |
134 | } |
135 | |
136 | /** |
137 | * Keep executing the callback until it either succeeds or the policy |
138 | * dictates that we stop, in which case the most recent exception thrown by |
139 | * the callback will be rethrown. |
140 | * |
141 | * @see RetryOperations#execute(RetryCallback) |
142 | * |
143 | * @throws TerminatedRetryException if the retry has been manually |
144 | * terminated by a listener. |
145 | */ |
146 | public final <T> T execute(RetryCallback<T> retryCallback) throws Exception { |
147 | return doExecute(retryCallback, null, null); |
148 | } |
149 | |
150 | /** |
151 | * Keep executing the callback until it either succeeds or the policy |
152 | * dictates that we stop, in which case the recovery callback will be |
153 | * executed. |
154 | * |
155 | * @see RetryOperations#execute(RetryCallback, RecoveryCallback) |
156 | * |
157 | * @throws TerminatedRetryException if the retry has been manually |
158 | * terminated by a listener. |
159 | */ |
160 | public final <T> T execute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback) throws Exception { |
161 | return doExecute(retryCallback, recoveryCallback, null); |
162 | } |
163 | |
164 | /** |
165 | * Execute the callback once if the policy dictates that we can, re-throwing |
166 | * any exception encountered so that clients can re-present the same task |
167 | * later. |
168 | * |
169 | * @see RetryOperations#execute(RetryCallback, RetryState) |
170 | * |
171 | * @throws ExhaustedRetryException if the retry has been exhausted. |
172 | */ |
173 | public final <T> T execute(RetryCallback<T> retryCallback, RetryState retryState) throws Exception, |
174 | ExhaustedRetryException { |
175 | return doExecute(retryCallback, null, retryState); |
176 | } |
177 | |
178 | /** |
179 | * Execute the callback once if the policy dictates that we can, re-throwing |
180 | * any exception encountered so that clients can re-present the same task |
181 | * later. |
182 | * |
183 | * @see RetryOperations#execute(RetryCallback, RetryState) |
184 | */ |
185 | public final <T> T execute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback, |
186 | RetryState retryState) throws Exception, ExhaustedRetryException { |
187 | return doExecute(retryCallback, recoveryCallback, retryState); |
188 | } |
189 | |
190 | /** |
191 | * Execute the callback once if the policy dictates that we can, otherwise |
192 | * execute the recovery callback. |
193 | * |
194 | * @see RetryOperations#execute(RetryCallback, RecoveryCallback, RetryState) |
195 | * @throws ExhaustedRetryException if the retry has been exhausted. |
196 | */ |
197 | protected <T> T doExecute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state) |
198 | throws Exception, ExhaustedRetryException { |
199 | |
200 | RetryPolicy retryPolicy = this.retryPolicy; |
201 | BackOffPolicy backOffPolicy = this.backOffPolicy; |
202 | |
203 | // Allow the retry policy to initialise itself... |
204 | RetryContext context = open(retryPolicy, state); |
205 | if (logger.isTraceEnabled()) { |
206 | logger.trace("RetryContext retrieved: " + context); |
207 | } |
208 | |
209 | // Make sure the context is available globally for clients who need |
210 | // it... |
211 | RetrySynchronizationManager.register(context); |
212 | |
213 | Throwable lastException = null; |
214 | |
215 | try { |
216 | |
217 | // Give clients a chance to enhance the context... |
218 | boolean running = doOpenInterceptors(retryCallback, context); |
219 | |
220 | if (!running) { |
221 | throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt"); |
222 | } |
223 | |
224 | // Start the backoff context... |
225 | BackOffContext backOffContext = backOffPolicy.start(context); |
226 | |
227 | /* |
228 | * We allow the whole loop to be skipped if the policy or context |
229 | * already forbid the first try. This is used in the case of |
230 | * stateful retry to allow a recovery in handleRetryExhausted |
231 | * without the callback processing (which would throw an exception). |
232 | */ |
233 | while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { |
234 | |
235 | try { |
236 | logger.debug("Retry: count=" + context.getRetryCount()); |
237 | // Reset the last exception, so if we are successful |
238 | // the close interceptors will not think we failed... |
239 | lastException = null; |
240 | return retryCallback.doWithRetry(context); |
241 | } |
242 | catch (Throwable e) { |
243 | |
244 | lastException = e; |
245 | |
246 | doOnErrorInterceptors(retryCallback, context, e); |
247 | |
248 | try { |
249 | registerThrowable(retryPolicy, state, context, e); |
250 | } catch (Exception ex) { |
251 | throw new TerminatedRetryException("Terminated retry after error in policy", ex); |
252 | } |
253 | |
254 | if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { |
255 | try { |
256 | backOffPolicy.backOff(backOffContext); |
257 | } |
258 | catch (BackOffInterruptedException ex) { |
259 | lastException = e; |
260 | // back off was prevented by another thread - fail |
261 | // the retry |
262 | logger.debug("Abort retry because interrupted: count=" + context.getRetryCount()); |
263 | throw ex; |
264 | } |
265 | } |
266 | |
267 | logger.debug("Checking for rethrow: count=" + context.getRetryCount()); |
268 | if (shouldRethrow(retryPolicy, context, state)) { |
269 | logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); |
270 | throw wrapIfNecessary(e); |
271 | } |
272 | |
273 | } |
274 | |
275 | /* |
276 | * A stateful attempt that can retry should have rethrown the |
277 | * exception by now - i.e. we shouldn't get this far for a |
278 | * stateful attempt if it can retry. |
279 | */ |
280 | } |
281 | |
282 | logger.debug("Retry failed last attempt: count=" + context.getRetryCount()); |
283 | |
284 | if (context.isExhaustedOnly()) { |
285 | throw new ExhaustedRetryException("Retry exhausted after last attempt with no recovery path.", context |
286 | .getLastThrowable()); |
287 | } |
288 | |
289 | return handleRetryExhausted(recoveryCallback, context, state); |
290 | |
291 | } |
292 | finally { |
293 | close(retryPolicy, context, state, lastException == null); |
294 | doCloseInterceptors(retryCallback, context, lastException); |
295 | RetrySynchronizationManager.clear(); |
296 | } |
297 | |
298 | } |
299 | |
300 | /** |
301 | * Decide whether to proceed with the ongoing retry attempt. This method is |
302 | * called before the {@link RetryCallback} is executed, but after the |
303 | * backoff and open interceptors. |
304 | * |
305 | * @param retryPolicy the policy to apply |
306 | * @param context the current retry context |
307 | * @return true if we can continue with the attempt |
308 | */ |
309 | protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) { |
310 | return retryPolicy.canRetry(context); |
311 | } |
312 | |
313 | /** |
314 | * Clean up the cache if necessary and close the context provided (if the |
315 | * flag indicates that processing was successful). |
316 | * |
317 | * @param context |
318 | * @param state |
319 | * @param succeeded |
320 | */ |
321 | protected void close(RetryPolicy retryPolicy, RetryContext context, RetryState state, boolean succeeded) { |
322 | if (state != null) { |
323 | if (succeeded) { |
324 | retryContextCache.remove(state.getKey()); |
325 | retryPolicy.close(context); |
326 | } |
327 | } |
328 | else { |
329 | retryPolicy.close(context); |
330 | } |
331 | } |
332 | |
333 | /** |
334 | * @param retryPolicy |
335 | * @param state |
336 | * @param context |
337 | * @param e |
338 | */ |
339 | protected void registerThrowable(RetryPolicy retryPolicy, RetryState state, RetryContext context, Throwable e) { |
340 | if (state != null) { |
341 | Object key = state.getKey(); |
342 | if (context.getRetryCount() > 0 && !retryContextCache.containsKey(key)) { |
343 | throw new RetryException("Inconsistent state for failed item key: cache key has changed. " |
344 | + "Consider whether equals() or hashCode() for the key might be inconsistent, " |
345 | + "or if you need to supply a better key"); |
346 | } |
347 | retryContextCache.put(key, context); |
348 | } |
349 | retryPolicy.registerThrowable(context, e); |
350 | } |
351 | |
352 | /** |
353 | * Delegate to the {@link RetryPolicy} having checked in the cache for an |
354 | * existing value if the state is not null. |
355 | * |
356 | * @param retryPolicy a {@link RetryPolicy} to delegate the context creation |
357 | * @return a retry context, either a new one or the one used last time the |
358 | * same state was encountered |
359 | */ |
360 | protected RetryContext open(RetryPolicy retryPolicy, RetryState state) { |
361 | |
362 | if (state == null) { |
363 | return doOpenInternal(retryPolicy); |
364 | } |
365 | |
366 | Object key = state.getKey(); |
367 | if (state.isForceRefresh()) { |
368 | return doOpenInternal(retryPolicy); |
369 | } |
370 | |
371 | // If there is no cache hit we can avoid the possible expense of the |
372 | // cache re-hydration. |
373 | if (!retryContextCache.containsKey(key)) { |
374 | // The cache is only used if there is a failure. |
375 | return doOpenInternal(retryPolicy); |
376 | } |
377 | |
378 | RetryContext context = retryContextCache.get(key); |
379 | if (context == null) { |
380 | if (retryContextCache.containsKey(key)) { |
381 | throw new RetryException("Inconsistent state for failed item: no history found. " |
382 | + "Consider whether equals() or hashCode() for the item might be inconsistent, " |
383 | + "or if you need to supply a better ItemKeyGenerator"); |
384 | } |
385 | // The cache could have been expired in between calls to |
386 | // containsKey(), so we have to live with this: |
387 | return doOpenInternal(retryPolicy); |
388 | } |
389 | |
390 | return context; |
391 | |
392 | } |
393 | |
394 | /** |
395 | * @param retryPolicy |
396 | * @return |
397 | */ |
398 | private RetryContext doOpenInternal(RetryPolicy retryPolicy) { |
399 | return retryPolicy.open(RetrySynchronizationManager.getContext()); |
400 | } |
401 | |
402 | /** |
403 | * Actions to take after final attempt has failed. If there is state clean |
404 | * up the cache. If there is a recovery callback, execute that and return |
405 | * its result. Otherwise throw an exception. |
406 | * |
407 | * @param recoveryCallback the callback for recovery (might be null) |
408 | * @param context the current retry context |
409 | * @throws Exception if the callback does, and if there is no callback and |
410 | * the state is null then the last exception from the context |
411 | * @throws ExhaustedRetryException if the state is not null and there is no |
412 | * recovery callback |
413 | */ |
414 | protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback, RetryContext context, RetryState state) |
415 | throws Exception { |
416 | if (state != null) { |
417 | retryContextCache.remove(state.getKey()); |
418 | } |
419 | if (recoveryCallback != null) { |
420 | return recoveryCallback.recover(context); |
421 | } |
422 | if (state != null) { |
423 | logger.debug("Retry exhausted after last attempt with no recovery path."); |
424 | throw new ExhaustedRetryException("Retry exhausted after last attempt with no recovery path", context |
425 | .getLastThrowable()); |
426 | } |
427 | throw wrapIfNecessary(context.getLastThrowable()); |
428 | } |
429 | |
430 | /** |
431 | * Extension point for subclasses to decide on behaviour after catching an |
432 | * exception in a {@link RetryCallback}. Normal stateless behaviour is not |
433 | * to rethrow, and if there is state we rethrow. |
434 | * |
435 | * @param retryPolicy |
436 | * @param context the current context |
437 | * |
438 | * @return true if the state is not null but subclasses might choose |
439 | * otherwise |
440 | */ |
441 | protected boolean shouldRethrow(RetryPolicy retryPolicy, RetryContext context, RetryState state) { |
442 | if (state == null) { |
443 | return false; |
444 | } |
445 | else { |
446 | return state.rollbackFor(context.getLastThrowable()); |
447 | } |
448 | } |
449 | |
450 | private <T> boolean doOpenInterceptors(RetryCallback<T> callback, RetryContext context) { |
451 | |
452 | boolean result = true; |
453 | |
454 | for (int i = 0; i < listeners.length; i++) { |
455 | result = result && listeners[i].open(context, callback); |
456 | } |
457 | |
458 | return result; |
459 | |
460 | } |
461 | |
462 | private <T> void doCloseInterceptors(RetryCallback<T> callback, RetryContext context, Throwable lastException) { |
463 | for (int i = listeners.length; i-- > 0;) { |
464 | listeners[i].close(context, callback, lastException); |
465 | } |
466 | } |
467 | |
468 | private <T> void doOnErrorInterceptors(RetryCallback<T> callback, RetryContext context, Throwable throwable) { |
469 | for (int i = listeners.length; i-- > 0;) { |
470 | listeners[i].onError(context, callback, throwable); |
471 | } |
472 | } |
473 | |
474 | /** |
475 | * Re-throws the original throwable if it is unchecked, wraps checked |
476 | * exceptions into {@link RepeatException}. |
477 | */ |
478 | private static Exception wrapIfNecessary(Throwable throwable) { |
479 | if (throwable instanceof Error) { |
480 | throw (Error) throwable; |
481 | } |
482 | else if (throwable instanceof Exception) { |
483 | return (Exception) throwable; |
484 | } |
485 | else { |
486 | return new RetryException("Exception in batch process", throwable); |
487 | } |
488 | } |
489 | |
490 | } |