View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.repeat.support;
18  
19  import java.util.ArrayList;
20  import java.util.Arrays;
21  import java.util.Collection;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.springframework.batch.repeat.CompletionPolicy;
27  import org.springframework.batch.repeat.RepeatCallback;
28  import org.springframework.batch.repeat.RepeatContext;
29  import org.springframework.batch.repeat.RepeatException;
30  import org.springframework.batch.repeat.RepeatListener;
31  import org.springframework.batch.repeat.RepeatOperations;
32  import org.springframework.batch.repeat.RepeatStatus;
33  import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34  import org.springframework.batch.repeat.exception.ExceptionHandler;
35  import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36  import org.springframework.util.Assert;
37  
38  /**
39   * Simple implementation and base class for batch templates implementing
40   * {@link RepeatOperations}. Provides a framework including interceptors and
41   * policies. Subclasses just need to provide a method that gets the next result
42   * and one that waits for all the results to be returned from concurrent
43   * processes or threads.<br/>
44   * 
45   * N.B. the template accumulates thrown exceptions during the iteration, and
46   * they are all processed together when the main loop ends (i.e. finished
47   * processing the items). Clients that do not want to stop execution when an
48   * exception is thrown can use a specific {@link CompletionPolicy} that does not
49   * finish when exceptions are received. This is not the default behaviour.<br/>
50   * 
51   * Clients that want to take some business action when an exception is thrown by
52   * the {@link RepeatCallback} can consider using a custom {@link RepeatListener}
53   * instead of trying to customise the {@link CompletionPolicy}. This is
54   * generally a friendlier interface to implement, and the
55   * {@link RepeatListener#after(RepeatContext, RepeatStatus)} method is passed in
56   * the result of the callback, which would be an instance of {@link Throwable}
57   * if the business processing had thrown an exception. If the exception is not
58   * to be propagated to the caller, then a non-default {@link CompletionPolicy}
59   * needs to be provided as well, but that could be off the shelf, with the
60   * business action implemented only in the interceptor.
61   * 
62   * @author Dave Syer
63   * 
64   */
65  public class RepeatTemplate implements RepeatOperations {
66  
67  	protected Log logger = LogFactory.getLog(getClass());
68  
69  	private RepeatListener[] listeners = new RepeatListener[] {};
70  
71  	private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72  
73  	private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74  
75  	/**
76  	 * Set the listeners for this template, registering them for callbacks at
77  	 * appropriate times in the iteration.
78  	 * 
79  	 * @param listeners
80  	 */
81  	public void setListeners(RepeatListener[] listeners) {
82  		this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
83  	}
84  
85  	/**
86  	 * Register an additional listener.
87  	 * 
88  	 * @param listener
89  	 */
90  	public void registerListener(RepeatListener listener) {
91  		List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
92  		list.add(listener);
93  		listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94  	}
95  
96  	/**
97  	 * Setter for exception handler strategy. The exception handler is called at
98  	 * the end of a batch, after the {@link CompletionPolicy} has determined
99  	 * that the batch is complete. By default all exceptions are re-thrown.
100 	 * 
101 	 * @see ExceptionHandler
102 	 * @see DefaultExceptionHandler
103 	 * @see #setCompletionPolicy(CompletionPolicy)
104 	 * 
105 	 * @param exceptionHandler the {@link ExceptionHandler} to use.
106 	 */
107 	public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108 		this.exceptionHandler = exceptionHandler;
109 	}
110 
111 	/**
112 	 * Setter for policy to decide when the batch is complete. The default is to
113 	 * complete normally when the callback returns a {@link RepeatStatus} which
114 	 * is not marked as continuable, and abnormally when the callback throws an
115 	 * exception (but the decision to re-throw the exception is deferred to the
116 	 * {@link ExceptionHandler}).
117 	 * 
118 	 * @see #setExceptionHandler(ExceptionHandler)
119 	 * 
120 	 * @param terminationPolicy a TerminationPolicy.
121 	 * @throws IllegalArgumentException if the argument is null
122 	 */
123 	public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124 		Assert.notNull(terminationPolicy);
125 		this.completionPolicy = terminationPolicy;
126 	}
127 
128 	/**
129 	 * Execute the batch callback until the completion policy decides that we
130 	 * are finished. Wait for the whole batch to finish before returning even if
131 	 * the task executor is asynchronous.
132 	 * 
133 	 * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback)
134 	 */
135 	public RepeatStatus iterate(RepeatCallback callback) {
136 
137 		RepeatContext outer = RepeatSynchronizationManager.getContext();
138 
139 		RepeatStatus result = RepeatStatus.CONTINUABLE;
140 		try {
141 			// This works with an asynchronous TaskExecutor: the
142 			// interceptors have to wait for the child processes.
143 			result = executeInternal(callback);
144 		}
145 		finally {
146 			RepeatSynchronizationManager.clear();
147 			if (outer != null) {
148 				RepeatSynchronizationManager.register(outer);
149 			}
150 		}
151 
152 		return result;
153 	}
154 
155 	/**
156 	 * Internal convenience method to loop over interceptors and batch
157 	 * callbacks.
158 	 * 
159 	 * @param callback the callback to process each element of the loop.
160 	 * 
161 	 * @return the aggregate of {@link ContinuationPolicy#canContinue(Object)}
162 	 * for all the results from the callback.
163 	 * 
164 	 */
165 	private RepeatStatus executeInternal(final RepeatCallback callback) {
166 
167 		// Reset the termination policy if there is one...
168 		RepeatContext context = start();
169 
170 		// Make sure if we are already marked complete before we start then no
171 		// processing takes place.
172 		boolean running = !isMarkedComplete(context);
173 
174 		for (int i = 0; i < listeners.length; i++) {
175 			RepeatListener interceptor = listeners[i];
176 			interceptor.open(context);
177 			running = running && !isMarkedComplete(context);
178 			if (!running)
179 				break;
180 		}
181 
182 		// Return value, default is to allow continued processing.
183 		RepeatStatus result = RepeatStatus.CONTINUABLE;
184 
185 		RepeatInternalState state = createInternalState(context);
186 		// This is the list of exceptions thrown by all active callbacks
187 		Collection<Throwable> throwables = state.getThrowables();
188 		// Keep a separate list of exceptions we handled that need to be
189 		// rethrown
190 		Collection<Throwable> deferred = new ArrayList<Throwable>();
191 
192 		try {
193 
194 			while (running) {
195 
196 				/*
197 				 * Run the before interceptors here, not in the task executor so
198 				 * that they all happen in the same thread - it's easier for
199 				 * tracking batch status, amongst other things.
200 				 */
201 				for (int i = 0; i < listeners.length; i++) {
202 					RepeatListener interceptor = listeners[i];
203 					interceptor.before(context);
204 					// Allow before interceptors to veto the batch by setting
205 					// flag.
206 					running = running && !isMarkedComplete(context);
207 				}
208 
209 				// Check that we are still running (should always be true) ...
210 				if (running) {
211 
212 					try {
213 
214 						result = getNextResult(context, callback, state);
215 						executeAfterInterceptors(context, result);
216 
217 					}
218 					catch (Throwable throwable) {
219 						doHandle(throwable, context, deferred);
220 					}
221 
222 					// N.B. the order may be important here:
223 					if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
224 						running = false;
225 					}
226 
227 				}
228 
229 			}
230 
231 			result = result.and(waitForResults(state));
232 			for (Throwable throwable : throwables) {
233 				doHandle(throwable, context, deferred);
234 			}
235 
236 			// Explicitly drop any references to internal state...
237 			state = null;
238 
239 		}
240 		/*
241 		 * No need for explicit catch here - if the business processing threw an
242 		 * exception it was already handled by the helper methods. An exception
243 		 * here is necessarily fatal.
244 		 */
245 		finally {
246 
247 			try {
248 
249 				if (!deferred.isEmpty()) {
250 					Throwable throwable = (Throwable) deferred.iterator().next();
251 					logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
252 							+ throwable.getClass().getName() + ": " + throwable.getMessage());
253 					rethrow(throwable);
254 				}
255 
256 			}
257 			finally {
258 
259 				try {
260 					for (int i = listeners.length; i-- > 0;) {
261 						RepeatListener interceptor = listeners[i];
262 						interceptor.close(context);
263 					}
264 				}
265 				finally {
266 					context.close();
267 				}
268 
269 			}
270 
271 		}
272 
273 		return result;
274 
275 	}
276 
277 	private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
278 		// An exception alone is not sufficient grounds for not
279 		// continuing
280 		Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
281 		try {
282 
283 			for (int i = listeners.length; i-- > 0;) {
284 				RepeatListener interceptor = listeners[i];
285 				// This is not an error - only log at debug
286 				// level.
287 				logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
288 				interceptor.onError(context, unwrappedThrowable);
289 			}
290 
291 			logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
292 					+ unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
293 			exceptionHandler.handleException(context, unwrappedThrowable);
294 
295 		}
296 		catch (Throwable handled) {
297 			deferred.add(handled);
298 		}
299 	}
300 
301 	/**
302 	 * Re-throws the original throwable if it is unchecked, wraps checked
303 	 * exceptions into {@link RepeatException}.
304 	 */
305 	private static void rethrow(Throwable throwable) throws RuntimeException {
306 		if (throwable instanceof Error) {
307 			throw (Error) throwable;
308 		}
309 		else if (throwable instanceof RuntimeException) {
310 			throw (RuntimeException) throwable;
311 		}
312 		else {
313 			throw new RepeatException("Exception in batch process", throwable);
314 		}
315 	}
316 
317 	/**
318 	 * Unwraps the throwable if it has been wrapped by
319 	 * {@link #rethrow(Throwable)}.
320 	 */
321 	private static Throwable unwrapIfRethrown(Throwable throwable) {
322 		if (throwable instanceof RepeatException) {
323 			return throwable.getCause();
324 		}
325 		else {
326 			return throwable;
327 		}
328 	}
329 
330 	/**
331 	 * Create an internal state object that is used to store data needed
332 	 * internally in the scope of an iteration. Used by subclasses to manage the
333 	 * queueing and retrieval of asynchronous results. The default just provides
334 	 * an accumulation of Throwable instances for processing at the end of the
335 	 * batch.
336 	 * 
337 	 * @param context the current {@link RepeatContext}
338 	 * @return a {@link RepeatInternalState} instance.
339 	 * 
340 	 * @see RepeatTemplate#waitForResults(RepeatInternalState)
341 	 */
342 	protected RepeatInternalState createInternalState(RepeatContext context) {
343 		return new RepeatInternalStateSupport();
344 	}
345 
346 	/**
347 	 * Get the next completed result, possibly executing several callbacks until
348 	 * one finally finishes. Normally a subclass would have to override both
349 	 * this method and {@link #createInternalState(RepeatContext)} because the
350 	 * implementation of this method would rely on the details of the internal
351 	 * state.
352 	 * 
353 	 * @param context current BatchContext.
354 	 * @param callback the callback to execute.
355 	 * @param state maintained by the implementation.
356 	 * @return a finished result.
357 	 * 
358 	 * @see #isComplete(RepeatContext)
359 	 * @see #createInternalState(RepeatContext)
360 	 */
361 	protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
362 			throws Throwable {
363 		update(context);
364 		if (logger.isDebugEnabled()) {
365 			logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
366 		}
367 		return callback.doInIteration(context);
368 
369 	}
370 
371 	/**
372 	 * If necessary, wait for results to come back from remote or concurrent
373 	 * processes. By default does nothing and returns true.
374 	 * 
375 	 * @param state the internal state.
376 	 * @return true if {@link #canContinue(RepeatStatus)} is true for all
377 	 * results retrieved.
378 	 */
379 	protected boolean waitForResults(RepeatInternalState state) {
380 		// no-op by default
381 		return true;
382 	}
383 
384 	/**
385 	 * Check return value from batch operation.
386 	 * 
387 	 * @param value the last callback result.
388 	 * @return true if the value is {@link RepeatStatus#CONTINUABLE}.
389 	 */
390 	protected final boolean canContinue(RepeatStatus value) {
391 		return ((RepeatStatus) value).isContinuable();
392 	}
393 
394 	private boolean isMarkedComplete(RepeatContext context) {
395 		boolean complete = context.isCompleteOnly();
396 		if (context.getParent() != null) {
397 			complete = complete || isMarkedComplete(context.getParent());
398 		}
399 		if (complete) {
400 			logger.debug("Repeat is complete according to context alone.");
401 		}
402 		return complete;
403 
404 	}
405 
406 	/**
407 	 * Convenience method to execute after interceptors on a callback result.
408 	 * 
409 	 * @param context the current batch context.
410 	 * @param value the result of the callback to process.
411 	 */
412 	protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
413 
414 		// Don't re-throw exceptions here: let the exception handler deal with
415 		// that...
416 
417 		if (value != null && value.isContinuable()) {
418 			for (int i = listeners.length; i-- > 0;) {
419 				RepeatListener interceptor = listeners[i];
420 				interceptor.after(context, value);
421 			}
422 
423 		}
424 
425 	}
426 
427 	/**
428 	 * Delegate to the {@link CompletionPolicy}.
429 	 * 
430 	 * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext,
431 	 * RepeatStatus)
432 	 */
433 	protected boolean isComplete(RepeatContext context, RepeatStatus result) {
434 		boolean complete = completionPolicy.isComplete(context, result);
435 		if (complete) {
436 			logger.debug("Repeat is complete according to policy and result value.");
437 		}
438 		return complete;
439 	}
440 
441 	/**
442 	 * Delegate to {@link CompletionPolicy}.
443 	 * 
444 	 * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext)
445 	 */
446 	protected boolean isComplete(RepeatContext context) {
447 		boolean complete = completionPolicy.isComplete(context);
448 		if (complete) {
449 			logger.debug("Repeat is complete according to policy alone not including result.");
450 		}
451 		return complete;
452 	}
453 
454 	/**
455 	 * Delegate to the {@link CompletionPolicy}.
456 	 * 
457 	 * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
458 	 */
459 	protected RepeatContext start() {
460 		RepeatContext parent = RepeatSynchronizationManager.getContext();
461 		RepeatContext context = completionPolicy.start(parent);
462 		RepeatSynchronizationManager.register(context);
463 		logger.debug("Starting repeat context.");
464 		return context;
465 	}
466 
467 	/**
468 	 * Delegate to the {@link CompletionPolicy}.
469 	 * 
470 	 * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext)
471 	 */
472 	protected void update(RepeatContext context) {
473 		completionPolicy.update(context);
474 	}
475 
476 }