EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [RepeatTemplate.java]

nameclass, %method, %block, %line, %
RepeatTemplate.java100% (1/1)100% (20/20)78%  (497/640)91%  (112.9/124)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class RepeatTemplate100% (1/1)100% (20/20)78%  (497/640)91%  (112.9/124)
getNextResult (RepeatContext, RepeatCallback, RepeatInternalState): RepeatStatus 100% (1/1)48%  (11/23)75%  (3/4)
executeInternal (RepeatCallback): RepeatStatus 100% (1/1)60%  (189/317)79%  (36.3/46)
iterate (RepeatCallback): RepeatStatus 100% (1/1)89%  (25/28)95%  (6.7/7)
RepeatTemplate (): void 100% (1/1)100% (22/22)100% (5/5)
canContinue (RepeatStatus): boolean 100% (1/1)100% (3/3)100% (1/1)
createInternalState (RepeatContext): RepeatInternalState 100% (1/1)100% (4/4)100% (1/1)
doHandle (Throwable, RepeatContext, Collection): void 100% (1/1)100% (78/78)100% (13/13)
executeAfterInterceptors (RepeatContext, RepeatStatus): void 100% (1/1)100% (23/23)100% (6/6)
isComplete (RepeatContext): boolean 100% (1/1)100% (13/13)100% (4/4)
isComplete (RepeatContext, RepeatStatus): boolean 100% (1/1)100% (14/14)100% (4/4)
isMarkedComplete (RepeatContext): boolean 100% (1/1)100% (25/25)100% (6/6)
registerListener (RepeatListener): void 100% (1/1)100% (21/21)100% (4/4)
rethrow (Throwable): void 100% (1/1)100% (18/18)100% (5/5)
setCompletionPolicy (CompletionPolicy): void 100% (1/1)100% (6/6)100% (3/3)
setExceptionHandler (ExceptionHandler): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (RepeatListener []): void 100% (1/1)100% (10/10)100% (2/2)
start (): RepeatContext 100% (1/1)100% (16/16)100% (5/5)
unwrapIfRethrown (Throwable): Throwable 100% (1/1)100% (8/8)100% (3/3)
update (RepeatContext): void 100% (1/1)100% (5/5)100% (2/2)
waitForResults (RepeatInternalState): boolean 100% (1/1)100% (2/2)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import java.util.ArrayList;
20import java.util.Arrays;
21import java.util.Collection;
22import java.util.List;
23 
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.springframework.batch.repeat.CompletionPolicy;
27import org.springframework.batch.repeat.RepeatCallback;
28import org.springframework.batch.repeat.RepeatContext;
29import org.springframework.batch.repeat.RepeatException;
30import org.springframework.batch.repeat.RepeatListener;
31import org.springframework.batch.repeat.RepeatOperations;
32import org.springframework.batch.repeat.RepeatStatus;
33import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34import org.springframework.batch.repeat.exception.ExceptionHandler;
35import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36import org.springframework.util.Assert;
37 
38/**
39 * Simple implementation and base class for batch templates implementing
40 * {@link RepeatOperations}. Provides a framework including interceptors and
41 * policies. Subclasses just need to provide a method that gets the next result
42 * and one that waits for all the results to be returned from concurrent
43 * processes or threads.<br/>
44 * 
45 * N.B. the template accumulates thrown exceptions during the iteration, and
46 * they are all processed together when the main loop ends (i.e. finished
47 * processing the items). Clients that do not want to stop execution when an
48 * exception is thrown can use a specific {@link CompletionPolicy} that does not
49 * finish when exceptions are received. This is not the default behaviour.<br/>
50 * 
51 * Clients that want to take some business action when an exception is thrown by
52 * the {@link RepeatCallback} can consider using a custom {@link RepeatListener}
53 * instead of trying to customise the {@link CompletionPolicy}. This is
54 * generally a friendlier interface to implement, and the
55 * {@link RepeatListener#onError(RepeatContext, Throwable)} method is passed
56 * the exception that the business processing threw (the cause is passed
57 * instead when it arrives wrapped in a {@link RepeatException}). If the exception is not
58 * to be propagated to the caller, then a non-default {@link CompletionPolicy}
59 * needs to be provided as well, but that could be off the shelf, with the
60 * business action implemented only in the interceptor.
61 * 
62 * @author Dave Syer
63 * 
64 */
65public class RepeatTemplate implements RepeatOperations {
66 
67        protected Log logger = LogFactory.getLog(getClass());
68 
69        private RepeatListener[] listeners = new RepeatListener[] {};
70 
71        private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72 
73        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74 
75        /**
76         * Set the listeners for this template, registering them for callbacks at
77         * appropriate times in the iteration.
78         * 
79         * @param listeners
80         */
81        public void setListeners(RepeatListener[] listeners) {
82                this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
83        }
84 
85        /**
86         * Register an additional listener.
87         * 
88         * @param listener
89         */
90        public void registerListener(RepeatListener listener) {
91                List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
92                list.add(listener);
93                listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94        }
95 
96        /**
97         * Setter for exception handler strategy. The exception handler is called at
98         * the end of a batch, after the {@link CompletionPolicy} has determined
99         * that the batch is complete. By default all exceptions are re-thrown.
100         * 
101         * @see ExceptionHandler
102         * @see DefaultExceptionHandler
103         * @see #setCompletionPolicy(CompletionPolicy)
104         * 
105         * @param exceptionHandler the {@link ExceptionHandler} to use.
106         */
107        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108                this.exceptionHandler = exceptionHandler;
109        }
110 
111        /**
112         * Setter for policy to decide when the batch is complete. The default is to
113         * complete normally when the callback returns a {@link RepeatStatus} which
114         * is not marked as continuable, and abnormally when the callback throws an
115         * exception (but the decision to re-throw the exception is deferred to the
116         * {@link ExceptionHandler}).
117         * 
118         * @see #setExceptionHandler(ExceptionHandler)
119         * 
120         * @param terminationPolicy a TerminationPolicy.
121         * @throws IllegalArgumentException if the argument is null
122         */
123        public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124                Assert.notNull(terminationPolicy);
125                this.completionPolicy = terminationPolicy;
126        }
127 
128        /**
129         * Execute the batch callback until the completion policy decides that we
130         * are finished. Wait for the whole batch to finish before returning even if
131         * the task executor is asynchronous.
132         * 
133         * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback)
134         */
135    @Override
136        public RepeatStatus iterate(RepeatCallback callback) {
137 
138                RepeatContext outer = RepeatSynchronizationManager.getContext();
139 
140                RepeatStatus result = RepeatStatus.CONTINUABLE;
141                try {
142                        // This works with an asynchronous TaskExecutor: the
143                        // interceptors have to wait for the child processes.
144                        result = executeInternal(callback);
145                }
146                finally {
147                        RepeatSynchronizationManager.clear();
148                        if (outer != null) {
149                                RepeatSynchronizationManager.register(outer);
150                        }
151                }
152 
153                return result;
154        }
155 
156        /**
157         * Internal convenience method to loop over interceptors and batch
158         * callbacks.
159         * 
160         * @param callback the callback to process each element of the loop.
161         * 
162         * @return the aggregate of {@link ContinuationPolicy#canContinue(Object)}
163         * for all the results from the callback.
164         * 
165         */
166        private RepeatStatus executeInternal(final RepeatCallback callback) {
167 
168                // Reset the termination policy if there is one...
169                RepeatContext context = start();
170 
171                // Make sure if we are already marked complete before we start then no
172                // processing takes place.
173                boolean running = !isMarkedComplete(context);
174 
175                for (int i = 0; i < listeners.length; i++) {
176                        RepeatListener interceptor = listeners[i];
177                        interceptor.open(context);
178                        running = running && !isMarkedComplete(context);
179                        if (!running)
180                                break;
181                }
182 
183                // Return value, default is to allow continued processing.
184                RepeatStatus result = RepeatStatus.CONTINUABLE;
185 
186                RepeatInternalState state = createInternalState(context);
187                // This is the list of exceptions thrown by all active callbacks
188                Collection<Throwable> throwables = state.getThrowables();
189                // Keep a separate list of exceptions we handled that need to be
190                // rethrown
191                Collection<Throwable> deferred = new ArrayList<Throwable>();
192 
193                try {
194 
195                        while (running) {
196 
197                                /*
198                                 * Run the before interceptors here, not in the task executor so
199                                 * that they all happen in the same thread - it's easier for
200                                 * tracking batch status, amongst other things.
201                                 */
202                                for (int i = 0; i < listeners.length; i++) {
203                                        RepeatListener interceptor = listeners[i];
204                                        interceptor.before(context);
205                                        // Allow before interceptors to veto the batch by setting
206                                        // flag.
207                                        running = running && !isMarkedComplete(context);
208                                }
209 
210                                // Check that we are still running (should always be true) ...
211                                if (running) {
212 
213                                        try {
214 
215                                                result = getNextResult(context, callback, state);
216                                                executeAfterInterceptors(context, result);
217 
218                                        }
219                                        catch (Throwable throwable) {
220                                                doHandle(throwable, context, deferred);
221                                        }
222 
223                                        // N.B. the order may be important here:
224                                        if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
225                                                running = false;
226                                        }
227 
228                                }
229 
230                        }
231 
232                        result = result.and(waitForResults(state));
233                        for (Throwable throwable : throwables) {
234                                doHandle(throwable, context, deferred);
235                        }
236 
237                        // Explicitly drop any references to internal state...
238                        state = null;
239 
240                }
241                /*
242                 * No need for explicit catch here - if the business processing threw an
243                 * exception it was already handled by the helper methods. An exception
244                 * here is necessarily fatal.
245                 */
246                finally {
247 
248                        try {
249 
250                                if (!deferred.isEmpty()) {
251                                        Throwable throwable = (Throwable) deferred.iterator().next();
252                                        logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
253                                                        + throwable.getClass().getName() + ": " + throwable.getMessage());
254                                        rethrow(throwable);
255                                }
256 
257                        }
258                        finally {
259 
260                                try {
261                                        for (int i = listeners.length; i-- > 0;) {
262                                                RepeatListener interceptor = listeners[i];
263                                                interceptor.close(context);
264                                        }
265                                }
266                                finally {
267                                        context.close();
268                                }
269 
270                        }
271 
272                }
273 
274                return result;
275 
276        }
277 
278        private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
279                // An exception alone is not sufficient grounds for not
280                // continuing
281                Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
282                try {
283 
284                        for (int i = listeners.length; i-- > 0;) {
285                                RepeatListener interceptor = listeners[i];
286                                // This is not an error - only log at debug
287                                // level.
288                                logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
289                                interceptor.onError(context, unwrappedThrowable);
290                        }
291 
292                        logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
293                                        + unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
294                        exceptionHandler.handleException(context, unwrappedThrowable);
295 
296                }
297                catch (Throwable handled) {
298                        deferred.add(handled);
299                }
300        }
301 
302        /**
303         * Re-throws the original throwable if it is unchecked, wraps checked
304         * exceptions into {@link RepeatException}.
305         */
306        private static void rethrow(Throwable throwable) throws RuntimeException {
307                if (throwable instanceof Error) {
308                        throw (Error) throwable;
309                }
310                else if (throwable instanceof RuntimeException) {
311                        throw (RuntimeException) throwable;
312                }
313                else {
314                        throw new RepeatException("Exception in batch process", throwable);
315                }
316        }
317 
318        /**
319         * Unwraps the throwable if it has been wrapped by
320         * {@link #rethrow(Throwable)}.
321         */
322        private static Throwable unwrapIfRethrown(Throwable throwable) {
323                if (throwable instanceof RepeatException) {
324                        return throwable.getCause();
325                }
326                else {
327                        return throwable;
328                }
329        }
330 
331        /**
332         * Create an internal state object that is used to store data needed
333         * internally in the scope of an iteration. Used by subclasses to manage the
334         * queueing and retrieval of asynchronous results. The default just provides
335         * an accumulation of Throwable instances for processing at the end of the
336         * batch.
337         * 
338         * @param context the current {@link RepeatContext}
339         * @return a {@link RepeatInternalState} instance.
340         * 
341         * @see RepeatTemplate#waitForResults(RepeatInternalState)
342         */
343        protected RepeatInternalState createInternalState(RepeatContext context) {
344                return new RepeatInternalStateSupport();
345        }
346 
347        /**
348         * Get the next completed result, possibly executing several callbacks until
349         * one finally finishes. Normally a subclass would have to override both
350         * this method and {@link #createInternalState(RepeatContext)} because the
351         * implementation of this method would rely on the details of the internal
352         * state.
353         * 
354         * @param context current BatchContext.
355         * @param callback the callback to execute.
356         * @param state maintained by the implementation.
357         * @return a finished result.
358         * 
359         * @see #isComplete(RepeatContext)
360         * @see #createInternalState(RepeatContext)
361         */
362        protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
363                        throws Throwable {
364                update(context);
365                if (logger.isDebugEnabled()) {
366                        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
367                }
368                return callback.doInIteration(context);
369 
370        }
371 
372        /**
373         * If necessary, wait for results to come back from remote or concurrent
374         * processes. By default does nothing and returns true.
375         * 
376         * @param state the internal state.
377         * @return true if {@link #canContinue(RepeatStatus)} is true for all
378         * results retrieved.
379         */
380        protected boolean waitForResults(RepeatInternalState state) {
381                // no-op by default
382                return true;
383        }
384 
385        /**
386         * Check return value from batch operation.
387         * 
388         * @param value the last callback result.
389         * @return true if the value is {@link RepeatStatus#CONTINUABLE}.
390         */
391        protected final boolean canContinue(RepeatStatus value) {
392                return ((RepeatStatus) value).isContinuable();
393        }
394 
395        private boolean isMarkedComplete(RepeatContext context) {
396                boolean complete = context.isCompleteOnly();
397                if (context.getParent() != null) {
398                        complete = complete || isMarkedComplete(context.getParent());
399                }
400                if (complete) {
401                        logger.debug("Repeat is complete according to context alone.");
402                }
403                return complete;
404 
405        }
406 
407        /**
408         * Convenience method to execute after interceptors on a callback result.
409         * 
410         * @param context the current batch context.
411         * @param value the result of the callback to process.
412         */
413        protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
414 
415                // Don't re-throw exceptions here: let the exception handler deal with
416                // that...
417 
418                if (value != null && value.isContinuable()) {
419                        for (int i = listeners.length; i-- > 0;) {
420                                RepeatListener interceptor = listeners[i];
421                                interceptor.after(context, value);
422                        }
423 
424                }
425 
426        }
427 
428        /**
429         * Delegate to the {@link CompletionPolicy}.
430         * 
431         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext,
432         * RepeatStatus)
433         */
434        protected boolean isComplete(RepeatContext context, RepeatStatus result) {
435                boolean complete = completionPolicy.isComplete(context, result);
436                if (complete) {
437                        logger.debug("Repeat is complete according to policy and result value.");
438                }
439                return complete;
440        }
441 
442        /**
443         * Delegate to {@link CompletionPolicy}.
444         * 
445         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext)
446         */
447        protected boolean isComplete(RepeatContext context) {
448                boolean complete = completionPolicy.isComplete(context);
449                if (complete) {
450                        logger.debug("Repeat is complete according to policy alone not including result.");
451                }
452                return complete;
453        }
454 
455        /**
456         * Delegate to the {@link CompletionPolicy}.
457         * 
458         * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
459         */
460        protected RepeatContext start() {
461                RepeatContext parent = RepeatSynchronizationManager.getContext();
462                RepeatContext context = completionPolicy.start(parent);
463                RepeatSynchronizationManager.register(context);
464                logger.debug("Starting repeat context.");
465                return context;
466        }
467 
468        /**
469         * Delegate to the {@link CompletionPolicy}.
470         * 
471         * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext)
472         */
473        protected void update(RepeatContext context) {
474                completionPolicy.update(context);
475        }
476 
477}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov