EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [RepeatTemplate.java]

nameclass, %method, %block, %line, %
RepeatTemplate.java100% (1/1)100% (20/20)92%  (500/541)97%  (119.8/124)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class RepeatTemplate100% (1/1)100% (20/20)92%  (500/541)97%  (119.8/124)
executeInternal (RepeatCallback): RepeatStatus 100% (1/1)83%  (180/218)92%  (43.1/47)
iterate (RepeatCallback): RepeatStatus 100% (1/1)89%  (25/28)95%  (6.7/7)
RepeatTemplate (): void 100% (1/1)100% (22/22)100% (5/5)
canContinue (RepeatStatus): boolean 100% (1/1)100% (3/3)100% (1/1)
createInternalState (RepeatContext): RepeatInternalState 100% (1/1)100% (4/4)100% (1/1)
doHandle (Throwable, RepeatContext, Collection): void 100% (1/1)100% (78/78)100% (12/12)
executeAfterInterceptors (RepeatContext, RepeatStatus): void 100% (1/1)100% (23/23)100% (6/6)
getNextResult (RepeatContext, RepeatCallback, RepeatInternalState): RepeatStatus 100% (1/1)100% (23/23)100% (4/4)
isComplete (RepeatContext): boolean 100% (1/1)100% (13/13)100% (4/4)
isComplete (RepeatContext, RepeatStatus): boolean 100% (1/1)100% (14/14)100% (4/4)
isMarkedComplete (RepeatContext): boolean 100% (1/1)100% (25/25)100% (6/6)
registerListener (RepeatListener): void 100% (1/1)100% (21/21)100% (4/4)
rethrow (Throwable): void 100% (1/1)100% (18/18)100% (5/5)
setCompletionPolicy (CompletionPolicy): void 100% (1/1)100% (6/6)100% (3/3)
setExceptionHandler (ExceptionHandler): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (RepeatListener []): void 100% (1/1)100% (10/10)100% (2/2)
start (): RepeatContext 100% (1/1)100% (16/16)100% (5/5)
unwrapIfRethrown (Throwable): Throwable 100% (1/1)100% (8/8)100% (3/3)
update (RepeatContext): void 100% (1/1)100% (5/5)100% (2/2)
waitForResults (RepeatInternalState): boolean 100% (1/1)100% (2/2)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import java.util.ArrayList;
20import java.util.Arrays;
21import java.util.Collection;
22import java.util.List;
23 
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.springframework.batch.repeat.CompletionPolicy;
27import org.springframework.batch.repeat.RepeatCallback;
28import org.springframework.batch.repeat.RepeatContext;
29import org.springframework.batch.repeat.RepeatException;
30import org.springframework.batch.repeat.RepeatListener;
31import org.springframework.batch.repeat.RepeatOperations;
32import org.springframework.batch.repeat.RepeatStatus;
33import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34import org.springframework.batch.repeat.exception.ExceptionHandler;
35import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36import org.springframework.util.Assert;
37 
38/**
39 * Simple implementation and base class for batch templates implementing
40 * {@link RepeatOperations}. Provides a framework including interceptors and
41 * policies. Subclasses just need to provide a method that gets the next result
42 * and one that waits for all the results to be returned from concurrent
43 * processes or threads.<br/>
44 * 
45 * N.B. the template accumulates thrown exceptions during the iteration, and
46 * they are all processed together when the main loop ends (i.e. finished
47 * processing the items). Clients that do not want to stop execution when an
48 * exception is thrown can use a specific {@link CompletionPolicy} that does not
49 * finish when exceptions are received. This is not the default behaviour.<br/>
50 * 
51 * Clients that want to take some business action when an exception is thrown by
52 * the {@link RepeatCallback} can consider using a custom {@link RepeatListener}
53 * instead of trying to customise the {@link CompletionPolicy}. This is
54 * generally a friendlier interface to implement, and the
55 * {@link RepeatListener#after(RepeatContext, RepeatStatus)} method is passed in
56 * the result of the callback, which would be an instance of {@link Throwable}
57 * if the business processing had thrown an exception. If the exception is not
58 * to be propagated to the caller, then a non-default {@link CompletionPolicy}
59 * needs to be provided as well, but that could be off the shelf, with the
60 * business action implemented only in the interceptor.
61 * 
62 * @author Dave Syer
63 * 
64 */
65public class RepeatTemplate implements RepeatOperations {
66 
67        protected Log logger = LogFactory.getLog(getClass());
68 
69        private RepeatListener[] listeners = new RepeatListener[] {};
70 
71        private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72 
73        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74 
75        /**
76         * Set the listeners for this template, registering them for callbacks at
77         * appropriate times in the iteration.
78         * 
79         * @param listeners
80         */
81        public void setListeners(RepeatListener[] listeners) {
82                this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
83        }
84 
85        /**
86         * Register an additional listener.
87         * 
88         * @param listener
89         */
90        public void registerListener(RepeatListener listener) {
91                List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
92                list.add(listener);
93                listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94        }
95 
96        /**
97         * Setter for exception handler strategy. The exception handler is called at
98         * the end of a batch, after the {@link CompletionPolicy} has determined
99         * that the batch is complete. By default all exceptions are re-thrown.
100         * 
101         * @see ExceptionHandler
102         * @see DefaultExceptionHandler
103         * @see #setCompletionPolicy(CompletionPolicy)
104         * 
105         * @param exceptionHandler the {@link ExceptionHandler} to use.
106         */
107        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108                this.exceptionHandler = exceptionHandler;
109        }
110 
111        /**
112         * Setter for policy to decide when the batch is complete. The default is to
113         * complete normally when the callback returns a {@link RepeatStatus} which
114         * is not marked as continuable, and abnormally when the callback throws an
115         * exception (but the decision to re-throw the exception is deferred to the
116         * {@link ExceptionHandler}).
117         * 
118         * @see #setExceptionHandler(ExceptionHandler)
119         * 
120         * @param terminationPolicy a TerminationPolicy.
121         * @throws IllegalArgumentException if the argument is null
122         */
123        public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124                Assert.notNull(terminationPolicy);
125                this.completionPolicy = terminationPolicy;
126        }
127 
128        /**
129         * Execute the batch callback until the completion policy decides that we
130         * are finished. Wait for the whole batch to finish before returning even if
131         * the task executor is asynchronous.
132         * 
133         * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback)
134         */
135        public RepeatStatus iterate(RepeatCallback callback) {
136 
137                RepeatContext outer = RepeatSynchronizationManager.getContext();
138 
139                RepeatStatus result = RepeatStatus.CONTINUABLE;
140                try {
141                        // This works with an asynchronous TaskExecutor: the
142                        // interceptors have to wait for the child processes.
143                        result = executeInternal(callback);
144                }
145                finally {
146                        RepeatSynchronizationManager.clear();
147                        if (outer != null) {
148                                RepeatSynchronizationManager.register(outer);
149                        }
150                }
151 
152                return result;
153        }
154 
155        /**
156         * Internal convenience method to loop over interceptors and batch
157         * callbacks.
158         * 
159         * @param callback the callback to process each element of the loop.
160         * 
161         * @return the aggregate of {@link ContinuationPolicy#canContinue(Object)}
162         * for all the results from the callback.
163         * 
164         */
165        private RepeatStatus executeInternal(final RepeatCallback callback) {
166 
167                // Reset the termination policy if there is one...
168                RepeatContext context = start();
169 
170                // Make sure if we are already marked complete before we start then no
171                // processing takes place.
172                boolean running = !isMarkedComplete(context);
173 
174                for (int i = 0; i < listeners.length; i++) {
175                        RepeatListener interceptor = listeners[i];
176                        interceptor.open(context);
177                        running = running && !isMarkedComplete(context);
178                        if (!running)
179                                break;
180                }
181 
182                // Return value, default is to allow continued processing.
183                RepeatStatus result = RepeatStatus.CONTINUABLE;
184 
185                RepeatInternalState state = createInternalState(context);
186                // This is the list of exceptions thrown by all active callbacks
187                Collection<Throwable> throwables = state.getThrowables();
188                // Keep a separate list of exceptions we handled that need to be
189                // rethrown
190                Collection<Throwable> deferred = new ArrayList<Throwable>();
191 
192                try {
193 
194                        while (running) {
195 
196                                /*
197                                 * Run the before interceptors here, not in the task executor so
198                                 * that they all happen in the same thread - it's easier for
199                                 * tracking batch status, amongst other things.
200                                 */
201                                for (int i = 0; i < listeners.length; i++) {
202                                        RepeatListener interceptor = listeners[i];
203                                        interceptor.before(context);
204                                        // Allow before interceptors to veto the batch by setting
205                                        // flag.
206                                        running = running && !isMarkedComplete(context);
207                                }
208 
209                                // Check that we are still running (should always be true) ...
210                                if (running) {
211 
212                                        try {
213 
214                                                result = getNextResult(context, callback, state);
215                                                executeAfterInterceptors(context, result);
216 
217                                        }
218                                        catch (Throwable throwable) {
219                                                doHandle(throwable, context, deferred);
220                                        }
221 
222                                        // N.B. the order may be important here:
223                                        if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
224                                                running = false;
225                                        }
226 
227                                }
228 
229                        }
230 
231                        result = result.and(waitForResults(state));
232                        for (Throwable throwable : throwables) {
233                                doHandle(throwable, context, deferred);
234                        }
235 
236                        // Explicitly drop any references to internal state...
237                        state = null;
238 
239                }
240                /*
241                 * No need for explicit catch here - if the business processing threw an
242                 * exception it was already handled by the helper methods. An exception
243                 * here is necessarily fatal.
244                 */
245                finally {
246 
247                        try {
248 
249                                if (!deferred.isEmpty()) {
250                                        Throwable throwable = (Throwable) deferred.iterator().next();
251                                        logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
252                                                        + throwable.getClass().getName() + ": " + throwable.getMessage());
253                                        rethrow(throwable);
254                                }
255 
256                        }
257                        finally {
258 
259                                try {
260                                        for (int i = listeners.length; i-- > 0;) {
261                                                RepeatListener interceptor = listeners[i];
262                                                interceptor.close(context);
263                                        }
264                                }
265                                finally {
266                                        context.close();
267                                }
268 
269                        }
270 
271                }
272 
273                return result;
274 
275        }
276 
277        private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
278                // An exception alone is not sufficient grounds for not
279                // continuing
280                Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
281                try {
282 
283                        for (int i = listeners.length; i-- > 0;) {
284                                RepeatListener interceptor = listeners[i];
285                                // This is not an error - only log at debug
286                                // level.
287                                logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
288                                interceptor.onError(context, unwrappedThrowable);
289                        }
290 
291                        logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
292                                        + unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
293                        exceptionHandler.handleException(context, unwrappedThrowable);
294 
295                }
296                catch (Throwable handled) {
297                        deferred.add(handled);
298                }
299        }
300 
301        /**
302         * Re-throws the original throwable if it is unchecked, wraps checked
303         * exceptions into {@link RepeatException}.
304         */
305        private static void rethrow(Throwable throwable) throws RuntimeException {
306                if (throwable instanceof Error) {
307                        throw (Error) throwable;
308                }
309                else if (throwable instanceof RuntimeException) {
310                        throw (RuntimeException) throwable;
311                }
312                else {
313                        throw new RepeatException("Exception in batch process", throwable);
314                }
315        }
316 
317        /**
318         * Unwraps the throwable if it has been wrapped by
319         * {@link #rethrow(Throwable)}.
320         */
321        private static Throwable unwrapIfRethrown(Throwable throwable) {
322                if (throwable instanceof RepeatException) {
323                        return throwable.getCause();
324                }
325                else {
326                        return throwable;
327                }
328        }
329 
330        /**
331         * Create an internal state object that is used to store data needed
332         * internally in the scope of an iteration. Used by subclasses to manage the
333         * queueing and retrieval of asynchronous results. The default just provides
334         * an accumulation of Throwable instances for processing at the end of the
335         * batch.
336         * 
337         * @param context the current {@link RepeatContext}
338         * @return a {@link RepeatInternalState} instance.
339         * 
340         * @see RepeatTemplate#waitForResults(RepeatInternalState)
341         */
342        protected RepeatInternalState createInternalState(RepeatContext context) {
343                return new RepeatInternalStateSupport();
344        }
345 
346        /**
347         * Get the next completed result, possibly executing several callbacks until
348         * one finally finishes. Normally a subclass would have to override both
349         * this method and {@link #createInternalState(RepeatContext)} because the
350         * implementation of this method would rely on the details of the internal
351         * state.
352         * 
353         * @param context current BatchContext.
354         * @param callback the callback to execute.
355         * @param state maintained by the implementation.
356         * @return a finished result.
357         * 
358         * @see #isComplete(RepeatContext)
359         * @see #createInternalState(RepeatContext)
360         */
361        protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
362                        throws Throwable {
363                update(context);
364                if (logger.isDebugEnabled()) {
365                        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
366                }
367                return callback.doInIteration(context);
368 
369        }
370 
371        /**
372         * If necessary, wait for results to come back from remote or concurrent
373         * processes. By default does nothing and returns true.
374         * 
375         * @param state the internal state.
376         * @return true if {@link #canContinue(RepeatStatus)} is true for all
377         * results retrieved.
378         */
379        protected boolean waitForResults(RepeatInternalState state) {
380                // no-op by default
381                return true;
382        }
383 
384        /**
385         * Check return value from batch operation.
386         * 
387         * @param value the last callback result.
388         * @return true if the value is {@link RepeatStatus#CONTINUABLE}.
389         */
390        protected final boolean canContinue(RepeatStatus value) {
391                return ((RepeatStatus) value).isContinuable();
392        }
393 
394        private boolean isMarkedComplete(RepeatContext context) {
395                boolean complete = context.isCompleteOnly();
396                if (context.getParent() != null) {
397                        complete = complete || isMarkedComplete(context.getParent());
398                }
399                if (complete) {
400                        logger.debug("Repeat is complete according to context alone.");
401                }
402                return complete;
403 
404        }
405 
406        /**
407         * Convenience method to execute after interceptors on a callback result.
408         * 
409         * @param context the current batch context.
410         * @param value the result of the callback to process.
411         */
412        protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
413 
414                // Don't re-throw exceptions here: let the exception handler deal with
415                // that...
416 
417                if (value != null && value.isContinuable()) {
418                        for (int i = listeners.length; i-- > 0;) {
419                                RepeatListener interceptor = listeners[i];
420                                interceptor.after(context, value);
421                        }
422 
423                }
424 
425        }
426 
427        /**
428         * Delegate to the {@link CompletionPolicy}.
429         * 
430         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext,
431         * RepeatStatus)
432         */
433        protected boolean isComplete(RepeatContext context, RepeatStatus result) {
434                boolean complete = completionPolicy.isComplete(context, result);
435                if (complete) {
436                        logger.debug("Repeat is complete according to policy and result value.");
437                }
438                return complete;
439        }
440 
441        /**
442         * Delegate to {@link CompletionPolicy}.
443         * 
444         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext)
445         */
446        protected boolean isComplete(RepeatContext context) {
447                boolean complete = completionPolicy.isComplete(context);
448                if (complete) {
449                        logger.debug("Repeat is complete according to policy alone not including result.");
450                }
451                return complete;
452        }
453 
454        /**
455         * Delegate to the {@link CompletionPolicy}.
456         * 
457         * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
458         */
459        protected RepeatContext start() {
460                RepeatContext parent = RepeatSynchronizationManager.getContext();
461                RepeatContext context = completionPolicy.start(parent);
462                RepeatSynchronizationManager.register(context);
463                logger.debug("Starting repeat context.");
464                return context;
465        }
466 
467        /**
468         * Delegate to the {@link CompletionPolicy}.
469         * 
470         * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext)
471         */
472        protected void update(RepeatContext context) {
473                completionPolicy.update(context);
474        }
475 
476}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov