View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.repeat.support;
18  
19  import java.util.ArrayList;
20  import java.util.Arrays;
21  import java.util.Collection;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.springframework.batch.repeat.CompletionPolicy;
27  import org.springframework.batch.repeat.RepeatCallback;
28  import org.springframework.batch.repeat.RepeatContext;
29  import org.springframework.batch.repeat.RepeatException;
30  import org.springframework.batch.repeat.RepeatListener;
31  import org.springframework.batch.repeat.RepeatOperations;
32  import org.springframework.batch.repeat.RepeatStatus;
33  import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34  import org.springframework.batch.repeat.exception.ExceptionHandler;
35  import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36  import org.springframework.util.Assert;
37  
38  /**
39   * Simple implementation and base class for batch templates implementing
40   * {@link RepeatOperations}. Provides a framework including interceptors and
41   * policies. Subclasses just need to provide a method that gets the next result
42   * and one that waits for all the results to be returned from concurrent
43   * processes or threads.<br/>
44   * 
45   * N.B. the template accumulates thrown exceptions during the iteration, and
46   * they are all processed together when the main loop ends (i.e. finished
47   * processing the items). Clients that do not want to stop execution when an
48   * exception is thrown can use a specific {@link CompletionPolicy} that does not
49   * finish when exceptions are received. This is not the default behaviour.<br/>
50   * 
51   * Clients that want to take some business action when an exception is thrown by
52   * the {@link RepeatCallback} can consider using a custom {@link RepeatListener}
53   * instead of trying to customise the {@link CompletionPolicy}. This is
54   * generally a friendlier interface to implement, and the
55   * {@link RepeatListener#after(RepeatContext, RepeatStatus)} method is passed in
56   * the result of the callback, which would be an instance of {@link Throwable}
57   * if the business processing had thrown an exception. If the exception is not
58   * to be propagated to the caller, then a non-default {@link CompletionPolicy}
59   * needs to be provided as well, but that could be off the shelf, with the
60   * business action implemented only in the interceptor.
61   * 
62   * @author Dave Syer
63   * 
64   */
65  public class RepeatTemplate implements RepeatOperations {
66  
67  	protected Log logger = LogFactory.getLog(getClass());
68  
69  	private RepeatListener[] listeners = new RepeatListener[] {};
70  
71  	private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72  
73  	private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74  
75  	/**
76  	 * Set the listeners for this template, registering them for callbacks at
77  	 * appropriate times in the iteration.
78  	 * 
79  	 * @param listeners
80  	 */
81  	public void setListeners(RepeatListener[] listeners) {
82  		this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
83  	}
84  
85  	/**
86  	 * Register an additional listener.
87  	 * 
88  	 * @param listener
89  	 */
90  	public void registerListener(RepeatListener listener) {
91  		List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
92  		list.add(listener);
93  		listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94  	}
95  
96  	/**
97  	 * Setter for exception handler strategy. The exception handler is called at
98  	 * the end of a batch, after the {@link CompletionPolicy} has determined
99  	 * that the batch is complete. By default all exceptions are re-thrown.
100 	 * 
101 	 * @see ExceptionHandler
102 	 * @see DefaultExceptionHandler
103 	 * @see #setCompletionPolicy(CompletionPolicy)
104 	 * 
105 	 * @param exceptionHandler the {@link ExceptionHandler} to use.
106 	 */
107 	public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108 		this.exceptionHandler = exceptionHandler;
109 	}
110 
111 	/**
112 	 * Setter for policy to decide when the batch is complete. The default is to
113 	 * complete normally when the callback returns a {@link RepeatStatus} which
114 	 * is not marked as continuable, and abnormally when the callback throws an
115 	 * exception (but the decision to re-throw the exception is deferred to the
116 	 * {@link ExceptionHandler}).
117 	 * 
118 	 * @see #setExceptionHandler(ExceptionHandler)
119 	 * 
120 	 * @param terminationPolicy a TerminationPolicy.
121 	 * @throws IllegalArgumentException if the argument is null
122 	 */
123 	public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124 		Assert.notNull(terminationPolicy);
125 		this.completionPolicy = terminationPolicy;
126 	}
127 
128 	/**
129 	 * Execute the batch callback until the completion policy decides that we
130 	 * are finished. Wait for the whole batch to finish before returning even if
131 	 * the task executor is asynchronous.
132 	 * 
133 	 * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback)
134 	 */
135     @Override
136 	public RepeatStatus iterate(RepeatCallback callback) {
137 
138 		RepeatContext outer = RepeatSynchronizationManager.getContext();
139 
140 		RepeatStatus result = RepeatStatus.CONTINUABLE;
141 		try {
142 			// This works with an asynchronous TaskExecutor: the
143 			// interceptors have to wait for the child processes.
144 			result = executeInternal(callback);
145 		}
146 		finally {
147 			RepeatSynchronizationManager.clear();
148 			if (outer != null) {
149 				RepeatSynchronizationManager.register(outer);
150 			}
151 		}
152 
153 		return result;
154 	}
155 
156 	/**
157 	 * Internal convenience method to loop over interceptors and batch
158 	 * callbacks.
159 	 * 
160 	 * @param callback the callback to process each element of the loop.
161 	 * 
162 	 * @return the aggregate of {@link ContinuationPolicy#canContinue(Object)}
163 	 * for all the results from the callback.
164 	 * 
165 	 */
166 	private RepeatStatus executeInternal(final RepeatCallback callback) {
167 
168 		// Reset the termination policy if there is one...
169 		RepeatContext context = start();
170 
171 		// Make sure if we are already marked complete before we start then no
172 		// processing takes place.
173 		boolean running = !isMarkedComplete(context);
174 
175 		for (int i = 0; i < listeners.length; i++) {
176 			RepeatListener interceptor = listeners[i];
177 			interceptor.open(context);
178 			running = running && !isMarkedComplete(context);
179 			if (!running)
180 				break;
181 		}
182 
183 		// Return value, default is to allow continued processing.
184 		RepeatStatus result = RepeatStatus.CONTINUABLE;
185 
186 		RepeatInternalState state = createInternalState(context);
187 		// This is the list of exceptions thrown by all active callbacks
188 		Collection<Throwable> throwables = state.getThrowables();
189 		// Keep a separate list of exceptions we handled that need to be
190 		// rethrown
191 		Collection<Throwable> deferred = new ArrayList<Throwable>();
192 
193 		try {
194 
195 			while (running) {
196 
197 				/*
198 				 * Run the before interceptors here, not in the task executor so
199 				 * that they all happen in the same thread - it's easier for
200 				 * tracking batch status, amongst other things.
201 				 */
202 				for (int i = 0; i < listeners.length; i++) {
203 					RepeatListener interceptor = listeners[i];
204 					interceptor.before(context);
205 					// Allow before interceptors to veto the batch by setting
206 					// flag.
207 					running = running && !isMarkedComplete(context);
208 				}
209 
210 				// Check that we are still running (should always be true) ...
211 				if (running) {
212 
213 					try {
214 
215 						result = getNextResult(context, callback, state);
216 						executeAfterInterceptors(context, result);
217 
218 					}
219 					catch (Throwable throwable) {
220 						doHandle(throwable, context, deferred);
221 					}
222 
223 					// N.B. the order may be important here:
224 					if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
225 						running = false;
226 					}
227 
228 				}
229 
230 			}
231 
232 			result = result.and(waitForResults(state));
233 			for (Throwable throwable : throwables) {
234 				doHandle(throwable, context, deferred);
235 			}
236 
237 			// Explicitly drop any references to internal state...
238 			state = null;
239 
240 		}
241 		/*
242 		 * No need for explicit catch here - if the business processing threw an
243 		 * exception it was already handled by the helper methods. An exception
244 		 * here is necessarily fatal.
245 		 */
246 		finally {
247 
248 			try {
249 
250 				if (!deferred.isEmpty()) {
251 					Throwable throwable = (Throwable) deferred.iterator().next();
252 					logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
253 							+ throwable.getClass().getName() + ": " + throwable.getMessage());
254 					rethrow(throwable);
255 				}
256 
257 			}
258 			finally {
259 
260 				try {
261 					for (int i = listeners.length; i-- > 0;) {
262 						RepeatListener interceptor = listeners[i];
263 						interceptor.close(context);
264 					}
265 				}
266 				finally {
267 					context.close();
268 				}
269 
270 			}
271 
272 		}
273 
274 		return result;
275 
276 	}
277 
278 	private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
279 		// An exception alone is not sufficient grounds for not
280 		// continuing
281 		Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
282 		try {
283 
284 			for (int i = listeners.length; i-- > 0;) {
285 				RepeatListener interceptor = listeners[i];
286 				// This is not an error - only log at debug
287 				// level.
288 				logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
289 				interceptor.onError(context, unwrappedThrowable);
290 			}
291 
292 			logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
293 					+ unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
294 			exceptionHandler.handleException(context, unwrappedThrowable);
295 
296 		}
297 		catch (Throwable handled) {
298 			deferred.add(handled);
299 		}
300 	}
301 
302 	/**
303 	 * Re-throws the original throwable if it is unchecked, wraps checked
304 	 * exceptions into {@link RepeatException}.
305 	 */
306 	private static void rethrow(Throwable throwable) throws RuntimeException {
307 		if (throwable instanceof Error) {
308 			throw (Error) throwable;
309 		}
310 		else if (throwable instanceof RuntimeException) {
311 			throw (RuntimeException) throwable;
312 		}
313 		else {
314 			throw new RepeatException("Exception in batch process", throwable);
315 		}
316 	}
317 
318 	/**
319 	 * Unwraps the throwable if it has been wrapped by
320 	 * {@link #rethrow(Throwable)}.
321 	 */
322 	private static Throwable unwrapIfRethrown(Throwable throwable) {
323 		if (throwable instanceof RepeatException) {
324 			return throwable.getCause();
325 		}
326 		else {
327 			return throwable;
328 		}
329 	}
330 
331 	/**
332 	 * Create an internal state object that is used to store data needed
333 	 * internally in the scope of an iteration. Used by subclasses to manage the
334 	 * queueing and retrieval of asynchronous results. The default just provides
335 	 * an accumulation of Throwable instances for processing at the end of the
336 	 * batch.
337 	 * 
338 	 * @param context the current {@link RepeatContext}
339 	 * @return a {@link RepeatInternalState} instance.
340 	 * 
341 	 * @see RepeatTemplate#waitForResults(RepeatInternalState)
342 	 */
343 	protected RepeatInternalState createInternalState(RepeatContext context) {
344 		return new RepeatInternalStateSupport();
345 	}
346 
347 	/**
348 	 * Get the next completed result, possibly executing several callbacks until
349 	 * one finally finishes. Normally a subclass would have to override both
350 	 * this method and {@link #createInternalState(RepeatContext)} because the
351 	 * implementation of this method would rely on the details of the internal
352 	 * state.
353 	 * 
354 	 * @param context current BatchContext.
355 	 * @param callback the callback to execute.
356 	 * @param state maintained by the implementation.
357 	 * @return a finished result.
358 	 * 
359 	 * @see #isComplete(RepeatContext)
360 	 * @see #createInternalState(RepeatContext)
361 	 */
362 	protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
363 			throws Throwable {
364 		update(context);
365 		if (logger.isDebugEnabled()) {
366 			logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
367 		}
368 		return callback.doInIteration(context);
369 
370 	}
371 
372 	/**
373 	 * If necessary, wait for results to come back from remote or concurrent
374 	 * processes. By default does nothing and returns true.
375 	 * 
376 	 * @param state the internal state.
377 	 * @return true if {@link #canContinue(RepeatStatus)} is true for all
378 	 * results retrieved.
379 	 */
380 	protected boolean waitForResults(RepeatInternalState state) {
381 		// no-op by default
382 		return true;
383 	}
384 
385 	/**
386 	 * Check return value from batch operation.
387 	 * 
388 	 * @param value the last callback result.
389 	 * @return true if the value is {@link RepeatStatus#CONTINUABLE}.
390 	 */
391 	protected final boolean canContinue(RepeatStatus value) {
392 		return ((RepeatStatus) value).isContinuable();
393 	}
394 
395 	private boolean isMarkedComplete(RepeatContext context) {
396 		boolean complete = context.isCompleteOnly();
397 		if (context.getParent() != null) {
398 			complete = complete || isMarkedComplete(context.getParent());
399 		}
400 		if (complete) {
401 			logger.debug("Repeat is complete according to context alone.");
402 		}
403 		return complete;
404 
405 	}
406 
407 	/**
408 	 * Convenience method to execute after interceptors on a callback result.
409 	 * 
410 	 * @param context the current batch context.
411 	 * @param value the result of the callback to process.
412 	 */
413 	protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
414 
415 		// Don't re-throw exceptions here: let the exception handler deal with
416 		// that...
417 
418 		if (value != null && value.isContinuable()) {
419 			for (int i = listeners.length; i-- > 0;) {
420 				RepeatListener interceptor = listeners[i];
421 				interceptor.after(context, value);
422 			}
423 
424 		}
425 
426 	}
427 
428 	/**
429 	 * Delegate to the {@link CompletionPolicy}.
430 	 * 
431 	 * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext,
432 	 * RepeatStatus)
433 	 */
434 	protected boolean isComplete(RepeatContext context, RepeatStatus result) {
435 		boolean complete = completionPolicy.isComplete(context, result);
436 		if (complete) {
437 			logger.debug("Repeat is complete according to policy and result value.");
438 		}
439 		return complete;
440 	}
441 
442 	/**
443 	 * Delegate to {@link CompletionPolicy}.
444 	 * 
445 	 * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext)
446 	 */
447 	protected boolean isComplete(RepeatContext context) {
448 		boolean complete = completionPolicy.isComplete(context);
449 		if (complete) {
450 			logger.debug("Repeat is complete according to policy alone not including result.");
451 		}
452 		return complete;
453 	}
454 
455 	/**
456 	 * Delegate to the {@link CompletionPolicy}.
457 	 * 
458 	 * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
459 	 */
460 	protected RepeatContext start() {
461 		RepeatContext parent = RepeatSynchronizationManager.getContext();
462 		RepeatContext context = completionPolicy.start(parent);
463 		RepeatSynchronizationManager.register(context);
464 		logger.debug("Starting repeat context.");
465 		return context;
466 	}
467 
468 	/**
469 	 * Delegate to the {@link CompletionPolicy}.
470 	 * 
471 	 * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext)
472 	 */
473 	protected void update(RepeatContext context) {
474 		completionPolicy.update(context);
475 	}
476 
477 }