1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.repeat.support;
18
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.springframework.batch.repeat.CompletionPolicy;
27 import org.springframework.batch.repeat.RepeatCallback;
28 import org.springframework.batch.repeat.RepeatContext;
29 import org.springframework.batch.repeat.RepeatException;
30 import org.springframework.batch.repeat.RepeatListener;
31 import org.springframework.batch.repeat.RepeatOperations;
32 import org.springframework.batch.repeat.RepeatStatus;
33 import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34 import org.springframework.batch.repeat.exception.ExceptionHandler;
35 import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36 import org.springframework.util.Assert;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class RepeatTemplate implements RepeatOperations {
66
67 protected Log logger = LogFactory.getLog(getClass());
68
69 private RepeatListener[] listeners = new RepeatListener[] {};
70
71 private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72
73 private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74
75
76
77
78
79
80
81 public void setListeners(RepeatListener[] listeners) {
82 this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
83 }
84
85
86
87
88
89
90 public void registerListener(RepeatListener listener) {
91 List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
92 list.add(listener);
93 listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108 this.exceptionHandler = exceptionHandler;
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124 Assert.notNull(terminationPolicy);
125 this.completionPolicy = terminationPolicy;
126 }
127
128
129
130
131
132
133
134
135 public RepeatStatus iterate(RepeatCallback callback) {
136
137 RepeatContext outer = RepeatSynchronizationManager.getContext();
138
139 RepeatStatus result = RepeatStatus.CONTINUABLE;
140 try {
141
142
143 result = executeInternal(callback);
144 }
145 finally {
146 RepeatSynchronizationManager.clear();
147 if (outer != null) {
148 RepeatSynchronizationManager.register(outer);
149 }
150 }
151
152 return result;
153 }
154
155
156
157
158
159
160
161
162
163
164
165 private RepeatStatus executeInternal(final RepeatCallback callback) {
166
167
168 RepeatContext context = start();
169
170
171
172 boolean running = !isMarkedComplete(context);
173
174 for (int i = 0; i < listeners.length; i++) {
175 RepeatListener interceptor = listeners[i];
176 interceptor.open(context);
177 running = running && !isMarkedComplete(context);
178 if (!running)
179 break;
180 }
181
182
183 RepeatStatus result = RepeatStatus.CONTINUABLE;
184
185 RepeatInternalState state = createInternalState(context);
186
187 Collection<Throwable> throwables = state.getThrowables();
188
189
190 Collection<Throwable> deferred = new ArrayList<Throwable>();
191
192 try {
193
194 while (running) {
195
196
197
198
199
200
201 for (int i = 0; i < listeners.length; i++) {
202 RepeatListener interceptor = listeners[i];
203 interceptor.before(context);
204
205
206 running = running && !isMarkedComplete(context);
207 }
208
209
210 if (running) {
211
212 try {
213
214 result = getNextResult(context, callback, state);
215 executeAfterInterceptors(context, result);
216
217 }
218 catch (Throwable throwable) {
219 doHandle(throwable, context, deferred);
220 }
221
222
223 if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
224 running = false;
225 }
226
227 }
228
229 }
230
231 result = result.and(waitForResults(state));
232 for (Throwable throwable : throwables) {
233 doHandle(throwable, context, deferred);
234 }
235
236
237 state = null;
238
239 }
240
241
242
243
244
245 finally {
246
247 try {
248
249 if (!deferred.isEmpty()) {
250 Throwable throwable = (Throwable) deferred.iterator().next();
251 logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
252 + throwable.getClass().getName() + ": " + throwable.getMessage());
253 rethrow(throwable);
254 }
255
256 }
257 finally {
258
259 try {
260 for (int i = listeners.length; i-- > 0;) {
261 RepeatListener interceptor = listeners[i];
262 interceptor.close(context);
263 }
264 }
265 finally {
266 context.close();
267 }
268
269 }
270
271 }
272
273 return result;
274
275 }
276
277 private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
278
279
280 Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
281 try {
282
283 for (int i = listeners.length; i-- > 0;) {
284 RepeatListener interceptor = listeners[i];
285
286
287 logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
288 interceptor.onError(context, unwrappedThrowable);
289 }
290
291 logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
292 + unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
293 exceptionHandler.handleException(context, unwrappedThrowable);
294
295 }
296 catch (Throwable handled) {
297 deferred.add(handled);
298 }
299 }
300
301
302
303
304
305 private static void rethrow(Throwable throwable) throws RuntimeException {
306 if (throwable instanceof Error) {
307 throw (Error) throwable;
308 }
309 else if (throwable instanceof RuntimeException) {
310 throw (RuntimeException) throwable;
311 }
312 else {
313 throw new RepeatException("Exception in batch process", throwable);
314 }
315 }
316
317
318
319
320
321 private static Throwable unwrapIfRethrown(Throwable throwable) {
322 if (throwable instanceof RepeatException) {
323 return throwable.getCause();
324 }
325 else {
326 return throwable;
327 }
328 }
329
330
331
332
333
334
335
336
337
338
339
340
341
342 protected RepeatInternalState createInternalState(RepeatContext context) {
343 return new RepeatInternalStateSupport();
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361 protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
362 throws Throwable {
363 update(context);
364 if (logger.isDebugEnabled()) {
365 logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
366 }
367 return callback.doInIteration(context);
368
369 }
370
371
372
373
374
375
376
377
378
379 protected boolean waitForResults(RepeatInternalState state) {
380
381 return true;
382 }
383
384
385
386
387
388
389
390 protected final boolean canContinue(RepeatStatus value) {
391 return ((RepeatStatus) value).isContinuable();
392 }
393
394 private boolean isMarkedComplete(RepeatContext context) {
395 boolean complete = context.isCompleteOnly();
396 if (context.getParent() != null) {
397 complete = complete || isMarkedComplete(context.getParent());
398 }
399 if (complete) {
400 logger.debug("Repeat is complete according to context alone.");
401 }
402 return complete;
403
404 }
405
406
407
408
409
410
411
412 protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
413
414
415
416
417 if (value != null && value.isContinuable()) {
418 for (int i = listeners.length; i-- > 0;) {
419 RepeatListener interceptor = listeners[i];
420 interceptor.after(context, value);
421 }
422
423 }
424
425 }
426
427
428
429
430
431
432
433 protected boolean isComplete(RepeatContext context, RepeatStatus result) {
434 boolean complete = completionPolicy.isComplete(context, result);
435 if (complete) {
436 logger.debug("Repeat is complete according to policy and result value.");
437 }
438 return complete;
439 }
440
441
442
443
444
445
446 protected boolean isComplete(RepeatContext context) {
447 boolean complete = completionPolicy.isComplete(context);
448 if (complete) {
449 logger.debug("Repeat is complete according to policy alone not including result.");
450 }
451 return complete;
452 }
453
454
455
456
457
458
459 protected RepeatContext start() {
460 RepeatContext parent = RepeatSynchronizationManager.getContext();
461 RepeatContext context = completionPolicy.start(parent);
462 RepeatSynchronizationManager.register(context);
463 logger.debug("Starting repeat context.");
464 return context;
465 }
466
467
468
469
470
471
472 protected void update(RepeatContext context) {
473 completionPolicy.update(context);
474 }
475
476 }