EMMA Coverage Report (generated Tue May 06 07:28:24 PDT 2008)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [TaskExecutorRepeatTemplate.java]

nameclass, %method, %block, %line, %
TaskExecutorRepeatTemplate.java100% (2/2)100% (12/12)96%  (161/168)99%  (49.5/50)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TaskExecutorRepeatTemplate$ExecutingRunnable100% (1/1)100% (6/6)87%  (47/54)97%  (15.5/16)
run (): void 100% (1/1)76%  (22/29)92%  (5.5/6)
TaskExecutorRepeatTemplate$ExecutingRunnable (RepeatCallback, RepeatContext, ... 100% (1/1)100% (12/12)100% (5/5)
expect (): void 100% (1/1)100% (4/4)100% (2/2)
getContext (): RepeatContext 100% (1/1)100% (3/3)100% (1/1)
getError (): Throwable 100% (1/1)100% (3/3)100% (1/1)
getResult (): ExitStatus 100% (1/1)100% (3/3)100% (1/1)
     
class TaskExecutorRepeatTemplate100% (1/1)100% (6/6)100% (114/114)100% (34/34)
TaskExecutorRepeatTemplate (): void 100% (1/1)100% (11/11)100% (4/4)
createInternalState (RepeatContext): RepeatInternalState 100% (1/1)100% (7/7)100% (1/1)
getNextResult (RepeatContext, RepeatCallback, RepeatInternalState): ExitStatus 100% (1/1)100% (40/40)100% (11/11)
setTaskExecutor (TaskExecutor): void 100% (1/1)100% (6/6)100% (3/3)
setThrottleLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
waitForResults (RepeatInternalState): boolean 100% (1/1)100% (46/46)100% (13/13)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import org.springframework.batch.repeat.ExitStatus;
20import org.springframework.batch.repeat.RepeatCallback;
21import org.springframework.batch.repeat.RepeatContext;
22import org.springframework.batch.repeat.RepeatOperations;
23import org.springframework.core.task.SyncTaskExecutor;
24import org.springframework.core.task.TaskExecutor;
25import org.springframework.util.Assert;
26 
27/**
28 * Provides {@link RepeatOperations} support including interceptors that can be
29 * used to modify or monitor the behaviour at run time.<br/>
30 * 
31 * This implementation is sufficient to be used to configure transactional
32 * behaviour for each item by making the {@link RepeatCallback} transactional,
33 * or for the whole batch by making the execute method transactional (but only
34 * then if the task executor is synchronous).<br/>
35 * 
36 * This class is thread safe if its collaborators are thread safe (interceptors,
37 * terminationPolicy, callback). Normally this will be the case, but clients
38 * need to be aware that if the task executor is asynchronous, then the other
39 * collaborators should be also. In particular the {@link RepeatCallback} that
40 * is wrapped in the execute method must be thread safe - often it is based on
41 * some form of data source, which itself should be both thread safe and
42 * transactional (multiple threads could be accessing it at any given time, and
43 * each thread would have its own transaction).<br/>
44 * 
45 * @author Dave Syer
46 * 
47 */
48public class TaskExecutorRepeatTemplate extends RepeatTemplate {
49 
50        /**
51         * Default limit for maximum number of concurrent unfinished results allowed
52         * by the template.
53         * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}.
54         */
55        public static final int DEFAULT_THROTTLE_LIMIT = 4;
56 
57        private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
58 
59        private TaskExecutor taskExecutor = new SyncTaskExecutor();
60 
61        /**
62         * Setter for task executor to be used to run the individual item callbacks.
63         * 
64         * @param taskExecutor a TaskExecutor
65         * @throws IllegalArgumentException if the argument is null
66         */
67        public void setTaskExecutor(TaskExecutor taskExecutor) {
68                Assert.notNull(taskExecutor);
69                this.taskExecutor = taskExecutor;
70        }
71 
72        /**
73         * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The internal state in
74         * this case is a queue of unfinished result holders of type
75         * {@link ResultHolder}. The holder with the return value should not be on
76         * the queue when this method exits. The queue is scoped in the calling
77         * method so there is no need to synchronize access.
78         * 
79         */
80        protected ExitStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
81                        throws Throwable {
82 
83                ExecutingRunnable runnable = null;
84 
85                ResultQueue queue = (ResultQueue) state;
86 
87                do {
88 
89                        /*
90                         * Wrap the callback in a runnable that will add its result to the
91                         * queue when it is ready.
92                         */
93                        runnable = new ExecutingRunnable(callback, context, queue);
94 
95                        /**
96                         * Tell the runnable that it can expect a result. This could have
97                         * been in-lined with the constructor, but it might block, so it's
98                         * better to do it here, since we have the option (it's a private
99                         * class).
100                         */
101                        runnable.expect();
102 
103                        /*
104                         * Start the task possibly concurrently / in the future.
105                         */
106                        taskExecutor.execute(runnable);
107 
108                        /*
109                         * Allow termination policy to update its state. This must happen
110                         * immediately before or after the call to the task executor.
111                         */
112                        update(context);
113 
114                        /*
115                         * Keep going until we get a result that is finished, or early
116                         * termination...
117                         */
118                } while (queue.isEmpty() && !isComplete(context));
119 
120                /*
121                 * N.B. If the queue is empty then take() blocks until a result appears,
122                 * and there must be at least one because we just submitted one to teh
123                 * task executor.
124                 */
125                ResultHolder result = queue.take();
126                if (result.getError() != null) {
127                        throw result.getError();
128                }
129                return result.getResult();
130        }
131 
132        /**
133         * Wait for all the results to appear on the queue and execute the after
134         * interceptors for each one.
135         * 
136         * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
137         */
138        protected boolean waitForResults(RepeatInternalState state) {
139 
140                ResultQueue queue = (ResultQueue) state;
141 
142                boolean result = true;
143 
144                while (queue.isExpecting()) {
145 
146                        /*
147                         * Careful that no runnables that are not going to finish ever get
148                         * onto the queue, else this may block forever.
149                         */
150                        ResultHolder future = (ResultHolder) queue.take();
151 
152                        if (future.getError() != null) {
153                                state.getThrowables().add(future.getError());
154                        }
155                        else {
156                                ExitStatus status = future.getResult();
157                                result = result && canContinue(status);
158                                executeAfterInterceptors(future.getContext(), status);
159                        }
160 
161                }
162 
163                Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
164 
165                return result;
166        }
167 
168        protected RepeatInternalState createInternalState(RepeatContext context) {
169                // Queue of pending results:
170                return new ResultQueueFactory().getResultQueue(throttleLimit);
171        }
172 
173        /**
174         * A runnable that puts its result on a queue when it is done.
175         * 
176         * @author Dave Syer
177         * 
178         */
179        private static class ExecutingRunnable implements Runnable, ResultHolder {
180                private RepeatCallback callback;
181 
182                private RepeatContext context;
183 
184                private ResultQueue queue;
185 
186                private ExitStatus result;
187 
188                private Throwable error;
189 
190                public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue queue) {
191 
192                        super();
193 
194                        this.callback = callback;
195                        this.context = context;
196                        this.queue = queue;
197 
198                }
199 
200                /**
201                 * Tell the queue to expect a result.
202                 */
203                public void expect() {
204                        queue.expect();
205                }
206 
207                /**
208                 * Execute the batch callback, and store the result, or any exception
209                 * that is thrown for retrieval later by caller.
210                 * 
211                 * @see java.lang.Runnable#run()
212                 */
213                public void run() {
214                        try {
215                                result = callback.doInIteration(context);
216                        }
217                        catch (Exception e) {
218                                error = e;
219                        }
220                        finally {
221                                queue.put(this);
222                        }
223                }
224 
225                /**
226                 * Get the result - never blocks because the queue manages waiting for
227                 * the task to finish.
228                 */
229                public ExitStatus getResult() {
230                        return result;
231                }
232 
233                /**
234                 * Get the error - never blocks because the queue manages waiting for
235                 * the task to finish.
236                 */
237                public Throwable getError() {
238                        return error;
239                }
240 
241                /**
242                 * Getter for the context.
243                 */
244                public RepeatContext getContext() {
245                        return this.context;
246                }
247 
248        }
249 
250        /**
251         * Public setter for the throttle limit. The throttle limit is the largest
252         * number of concurrent tasks that can be executing at one time - if a new
253         * task arrives and the throttle limit is breached we wait for one of the
254         * executing tasks to finish before submitting the new one to the
255         * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}.
256         * N.B. when used with a thread pooled {@link TaskExecutor} it doesn't make
257         * sense for the throttle limit to be less than the thread pool size.
258         * 
259         * @param throttleLimit the throttleLimit to set.
260         */
261        public void setThrottleLimit(int throttleLimit) {
262                this.throttleLimit = throttleLimit;
263        }
264 
265}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov