EMMA Coverage Report (generated Tue May 06 07:28:24 PDT 2008)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [RepeatTemplate.java]

nameclass, %method, %block, %line, %
RepeatTemplate.java100% (1/1)100% (19/19)83%  (458/551)91%  (107/117)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class RepeatTemplate100% (1/1)100% (19/19)83%  (458/551)91%  (107/117)
executeInternal (RepeatCallback): ExitStatus 100% (1/1)72%  (232/322)81%  (40.4/50)
iterate (RepeatCallback): ExitStatus 100% (1/1)89%  (25/28)95%  (8.6/9)
RepeatTemplate (): void 100% (1/1)100% (22/22)100% (5/5)
canContinue (ExitStatus): boolean 100% (1/1)100% (3/3)100% (1/1)
createInternalState (RepeatContext): RepeatInternalState 100% (1/1)100% (4/4)100% (1/1)
executeAfterInterceptors (RepeatContext, ExitStatus): void 100% (1/1)100% (26/26)100% (6/6)
getNextResult (RepeatContext, RepeatCallback, RepeatInternalState): ExitStatus 100% (1/1)100% (10/10)100% (4/4)
isComplete (RepeatContext): boolean 100% (1/1)100% (13/13)100% (4/4)
isComplete (RepeatContext, ExitStatus): boolean 100% (1/1)100% (14/14)100% (4/4)
isMarkedComplete (RepeatContext): boolean 100% (1/1)100% (25/25)100% (6/6)
registerListener (RepeatListener): void 100% (1/1)100% (21/21)100% (4/4)
rethrow (Throwable): void 100% (1/1)100% (18/18)100% (5/5)
setCompletionPolicy (CompletionPolicy): void 100% (1/1)100% (6/6)100% (3/3)
setExceptionHandler (ExceptionHandler): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (RepeatListener []): void 100% (1/1)100% (4/4)100% (2/2)
start (): RepeatContext 100% (1/1)100% (16/16)100% (5/5)
unwrapIfRethrown (Throwable): Throwable 100% (1/1)100% (8/8)100% (3/3)
update (RepeatContext): void 100% (1/1)100% (5/5)100% (2/2)
waitForResults (RepeatInternalState): boolean 100% (1/1)100% (2/2)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import java.util.ArrayList;
20import java.util.Arrays;
21import java.util.Collection;
22import java.util.List;
23 
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.springframework.batch.repeat.CompletionPolicy;
27import org.springframework.batch.repeat.ExitStatus;
28import org.springframework.batch.repeat.RepeatCallback;
29import org.springframework.batch.repeat.RepeatContext;
30import org.springframework.batch.repeat.RepeatException;
31import org.springframework.batch.repeat.RepeatListener;
32import org.springframework.batch.repeat.RepeatOperations;
33import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
34import org.springframework.batch.repeat.exception.ExceptionHandler;
35import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
36import org.springframework.util.Assert;
37 
38/**
39 * Simple implementation and base class for batch templates implementing
40 * {@link RepeatOperations}. Provides a framework including interceptors and
41 * policies. Subclasses just need to provide a method that gets the next result
42 * and one that waits for all the results to be returned from concurrent
43 * processes or threads.<br/>
44 * 
45 * N.B. the template accumulates thrown exceptions during the iteration, and
46 * they are all processed together when the main loop ends (i.e. finished
47 * processing the items). Clients that do not want to stop execution when an
48 * exception is thrown can use a specific {@link CompletionPolicy} that does not
49 * finish when exceptions are received. This is not the default behaviour.<br/>
50 * 
51 * Clients that want to take some business action when an exception is thrown by
52 * the {@link RepeatCallback} can consider using a custom {@link RepeatListener}
53 * instead of trying to customise the {@link CompletionPolicy}. This is
54 * generally a friendlier interface to implement, and the
55 * {@link RepeatListener#after(RepeatContext, ExitStatus)} method is passed in
56 * the result of the callback, which would be an instance of {@link Throwable}
57 * if the business processing had thrown an exception. If the exception is not
58 * to be propagated to the caller, then a non-default {@link CompletionPolicy}
59 * needs to be provided as well, but that could be off the shelf, with the
60 * business action implemented only in the interceptor.
61 * 
62 * @author Dave Syer
63 * 
64 */
65public class RepeatTemplate implements RepeatOperations {
66 
67        protected Log logger = LogFactory.getLog(getClass());
68 
69        private RepeatListener[] listeners = new RepeatListener[] {};
70 
71        private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
72 
73        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
74 
75        /**
76         * Set the listeners for this template, registering them for callbacks at
77         * appropriate times in the iteration.
78         * 
79         * @param listeners
80         */
81        public void setListeners(RepeatListener[] listeners) {
82                this.listeners = listeners;
83        }
84 
85        /**
86         * Register an additional listener.
87         * 
88         * @param listener
89         */
90        public void registerListener(RepeatListener listener) {
91                List list = new ArrayList(Arrays.asList(listeners));
92                list.add(listener);
93                listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
94        }
95 
96        /**
97         * Setter for exception handler strategy. The exception handler is called at
98         * the end of a batch, after the {@link CompletionPolicy} has determined
99         * that the batch is complete. By default all exceptions are re-thrown.
100         * 
101         * @see ExceptionHandler
102         * @see DefaultExceptionHandler
103         * @see #setCompletionPolicy(CompletionPolicy)
104         * 
105         * @param exceptionHandler the {@link ExceptionHandler} to use.
106         */
107        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
108                this.exceptionHandler = exceptionHandler;
109        }
110 
111        /**
112         * Setter for policy to decide when the batch is complete. The default is to
113         * complete normally when the callback returns a {@link ExitStatus} which is
114         * not marked as continuable, and abnormally when the callback throws an
115         * exception (but the decision to re-throw the exception is deferred to the
116         * {@link ExceptionHandler}).
117         * 
118         * @see #setExceptionHandler(ExceptionHandler)
119         * 
120         * @param terminationPolicy a TerminationPolicy.
121         * @throws IllegalArgumentException if the argument is null
122         */
123        public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
124                Assert.notNull(terminationPolicy);
125                this.completionPolicy = terminationPolicy;
126        }
127 
128        /**
129         * Execute the batch callback until the completion policy decides that we
130         * are finished. Wait for the whole batch to finish before returning even if
131         * the task executor is asynchronous.
132         * 
133         * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback)
134         */
135        public ExitStatus iterate(RepeatCallback callback) {
136 
137                RepeatContext outer = RepeatSynchronizationManager.getContext();
138 
139                ExitStatus result = ExitStatus.CONTINUABLE;
140                try {
141                        // This works with an asynchronous TaskExecutor: the
142                        // interceptors have to wait for the child processes.
143                        result = executeInternal(callback);
144                }
145                finally {
146                        RepeatSynchronizationManager.clear();
147                        if (outer != null) {
148                                RepeatSynchronizationManager.register(outer);
149                        }
150                }
151 
152                return result;
153        }
154 
155        /**
156         * Internal convenience method to loop over interceptors and batch
157         * callbacks.
158         * 
159         * @param callback the callback to process each element of the loop.
160         * 
161         * @return the aggregate of {@link ContinuationPolicy#canContinue(Object)}
162         * for all the results from the callback.
163         * 
164         */
165        private ExitStatus executeInternal(final RepeatCallback callback) {
166 
167                // Reset the termination policy if there is one...
168                RepeatContext context = start();
169 
170                // Make sure if we are already marked complete before we start then no
171                // processing takes place.
172                boolean running = !isMarkedComplete(context);
173 
174                for (int i = 0; i < listeners.length; i++) {
175                        RepeatListener interceptor = listeners[i];
176                        interceptor.open(context);
177                        running = running && !isMarkedComplete(context);
178                        if (!running)
179                                break;
180                }
181 
182                // Return value, default is to allow continued processing.
183                ExitStatus result = ExitStatus.CONTINUABLE;
184 
185                RepeatInternalState state = createInternalState(context);
186                Collection throwables = state.getThrowables();
187 
188                try {
189 
190                        while (running) {
191 
192                                /*
193                                 * Run the before interceptors here, not in the task executor so
194                                 * that they all happen in the same thread - it's easier for
195                                 * tracking batch status, amongst other things.
196                                 */
197                                for (int i = 0; i < listeners.length; i++) {
198                                        RepeatListener interceptor = listeners[i];
199                                        interceptor.before(context);
200                                        // Allow before interceptors to veto the batch by setting
201                                        // flag.
202                                        running = running && !isMarkedComplete(context);
203                                }
204 
205                                // Check that we are still running...
206                                if (running) {
207 
208                                        logger.debug("Batch operation about to start at count=" + context.getStartedCount());
209 
210                                        try {
211 
212                                                result = getNextResult(context, callback, state);
213                                                executeAfterInterceptors(context, result);
214 
215                                        }
216                                        catch (Throwable throwable) {
217 
218                                                // An exception alone is not sufficient grounds for not
219                                                // continuing
220                                                Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
221                                                try {
222 
223                                                        for (int i = listeners.length; i-- > 0;) {
224                                                                RepeatListener interceptor = listeners[i];
225                                                                interceptor.onError(context, unwrappedThrowable);
226                                                                // This is not an error - only log at debug
227                                                                // level.
228                                                                logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")",
229                                                                                unwrappedThrowable);
230                                                        }
231                                                        
232                                                        exceptionHandler.handleException(context, unwrappedThrowable);
233 
234                                                }
235                                                catch (Throwable handled) {
236                                                        throwables.add(handled);
237                                                }
238                                        }
239 
240                                        // N.B. the order may be important here:
241                                        if (isComplete(context, result) || isMarkedComplete(context) || !throwables.isEmpty()) {
242                                                running = false;
243                                        }
244                                }
245 
246                        }
247 
248                        result = result.and(waitForResults(state));
249 
250                        // Explicitly drop any references to internal state...
251                        state = null;
252 
253                }
254                /*
255                 * No need for explicit catch here - if the business processing threw an
256                 * exception it was already handled by the helper methods. An exception
257                 * here is necessarily fatal.
258                 */
259                finally {
260 
261                        try {
262 
263                                if (!throwables.isEmpty()) {
264                                        rethrow((Throwable) throwables.iterator().next());
265                                }
266 
267                        }
268                        finally {
269 
270                                try {
271                                        for (int i = listeners.length; i-- > 0;) {
272                                                RepeatListener interceptor = listeners[i];
273                                                interceptor.close(context);
274                                        }
275                                }
276                                finally {
277                                        // TODO: extend this to the completion policy?
278                                        context.close();
279                                }
280 
281                        }
282 
283                }
284 
285                return result;
286 
287        }
288 
289        /**
290         * Re-throws the original throwable if it is unchecked, wraps checked
291         * exceptions into {@link RepeatException}.
292         */
293        private static void rethrow(Throwable throwable) throws RuntimeException {
294                if (throwable instanceof Error) {
295                        throw (Error) throwable;
296                }
297                else if (throwable instanceof RuntimeException) {
298                        throw (RuntimeException) throwable;
299                }
300                else {
301                        throw new RepeatException("Exception in batch process", throwable);
302                }
303        }
304        
305        /**
306         * Unwraps the throwable if it has been wrapped by {@link #rethrow(Throwable)}.
307         */
308        private static Throwable unwrapIfRethrown(Throwable throwable) {
309                if (throwable instanceof RepeatException) {
310                        return throwable.getCause();
311                }
312                else {
313                        return throwable;
314                }
315        }
316 
317        /**
318         * Create an internal state object that is used to store data needed
319         * internally in the scope of an iteration. Used by subclasses to manage the
320         * queueing and retrieval of asynchronous results. The default just provides
321         * an accumulation of Throwable instances for processing at the end of the
322         * batch.
323         * 
324         * @param context the current {@link RepeatContext}
325         * @return a {@link RepeatInternalState} instance.
326         */
327        protected RepeatInternalState createInternalState(RepeatContext context) {
328                return new RepeatInternalStateSupport();
329        }
330 
331        /**
332         * Get the next completed result, possibly executing several callbacks until
333         * one finally finishes.
334         * 
335         * @param context current BatchContext.
336         * @param callback the callback to execute.
337         * @param state maintained by the implementation.
338         * @return a finished result.
339         * 
340         * @see #isComplete(RepeatContext)
341         */
342        protected ExitStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
343                        throws Throwable {
344                try {
345                        update(context);
346                        return callback.doInIteration(context);
347                }
348                catch (Throwable t) {
349                        throw t;
350                }
351        }
352 
353        /**
354         * If necessary, wait for results to come back from remote or concurrent
355         * processes. By default does nothing and returns true.
356         * 
357         * @param state the internal state.
358         * @return true if {@link #canContinue(ExitStatus)} is true for all results
359         * retrieved.
360         */
361        protected boolean waitForResults(RepeatInternalState state) {
362                // no-op by default
363                return true;
364        }
365 
366        /**
367         * Check return value from batch operation.
368         * 
369         * @param value the last callback result.
370         * @return true if the value is {@link ExitStatus#CONTINUABLE}.
371         */
372        protected final boolean canContinue(ExitStatus value) {
373                return ((ExitStatus) value).isContinuable();
374        }
375 
376        private boolean isMarkedComplete(RepeatContext context) {
377                boolean complete = context.isCompleteOnly();
378                if (context.getParent() != null) {
379                        complete = complete || isMarkedComplete(context.getParent());
380                }
381                if (complete) {
382                        logger.debug("Batch is complete according to context alone.");
383                }
384                return complete;
385 
386        }
387 
388        /**
389         * Convenience method to execute after interceptors on a callback result.
390         * 
391         * @param context the current batch context.
392         * @param value the result of the callback to process.
393         */
394        protected void executeAfterInterceptors(final RepeatContext context, ExitStatus value) {
395 
396                // Don't re-throw exceptions here: let the exception handler deal with
397                // that...
398 
399                if (value != null && value.isContinuable()) {
400                        for (int i = listeners.length; i-- > 0;) {
401                                RepeatListener interceptor = listeners[i];
402                                interceptor.after(context, value);
403                        }
404 
405                }
406 
407        }
408 
409        /**
410         * Delegate to the {@link CompletionPolicy}.
411         * 
412         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext,
413         * ExitStatus)
414         */
415        protected boolean isComplete(RepeatContext context, ExitStatus result) {
416                boolean complete = completionPolicy.isComplete(context, result);
417                if (complete) {
418                        logger.debug("Batch is complete according to policy and result value.");
419                }
420                return complete;
421        }
422 
423        /**
424         * Delegate to  {@link CompletionPolicy}.
425         * 
426         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext)
427         */
428        protected boolean isComplete(RepeatContext context) {
429                boolean complete = completionPolicy.isComplete(context);
430                if (complete) {
431                        logger.debug("Batch is complete according to policy alone not including result.");
432                }
433                return complete;
434        }
435 
436        /**
437         * Delegate to the {@link CompletionPolicy}.
438         * 
439         * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
440         */
441        protected RepeatContext start() {
442                RepeatContext parent = RepeatSynchronizationManager.getContext();
443                RepeatContext context = completionPolicy.start(parent);
444                RepeatSynchronizationManager.register(context);
445                logger.debug("Starting batch step.");
446                return context;
447        }
448 
449        /**
450         * Delegate to the {@link CompletionPolicy}.
451         * 
452         * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext)
453         */
454        protected void update(RepeatContext context) {
455                completionPolicy.update(context);
456        }
457 
458}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov