EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [TaskExecutorRepeatTemplate.java]

name | class, % | method, % | block, % | line, %
TaskExecutorRepeatTemplate.java | 100% (3/3) | 100% (14/14) | 92% (217/237) | 91% (66.4/73)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class TaskExecutorRepeatTemplate$ExecutingRunnable | 100% (1/1) | 100% (6/6) | 88% (84/96) | 88% (27.4/31)
expect (): void | | 100% (1/1) | 38% (5/13) | 50% (3/6)
run (): void | | 100% (1/1) | 93% (55/59) | 96% (15.4/16)
TaskExecutorRepeatTemplate$ExecutingRunnable (TaskExecutorRepeatTemplate, Rep... | | 100% (1/1) | 100% (15/15) | 100% (6/6)
getContext (): RepeatContext | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getError (): Throwable | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getResult (): RepeatStatus | | 100% (1/1) | 100% (3/3) | 100% (1/1)

class TaskExecutorRepeatTemplate | 100% (1/1) | 100% (6/6) | 94% (121/129) | 92% (35/38)
waitForResults (RepeatInternalState): boolean | | 100% (1/1) | 86% (51/59) | 82% (14/17)
TaskExecutorRepeatTemplate (): void | | 100% (1/1) | 100% (11/11) | 100% (4/4)
createInternalState (RepeatContext): RepeatInternalState | | 100% (1/1) | 100% (6/6) | 100% (1/1)
getNextResult (RepeatContext, RepeatCallback, RepeatInternalState): RepeatStatus | | 100% (1/1) | 100% (43/43) | 100% (11/11)
setTaskExecutor (TaskExecutor): void | | 100% (1/1) | 100% (6/6) | 100% (3/3)
setThrottleLimit (int): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)

class TaskExecutorRepeatTemplate$ResultQueueInternalState | 100% (1/1) | 100% (2/2) | 100% (12/12) | 100% (4/4)
TaskExecutorRepeatTemplate$ResultQueueInternalState (int): void | | 100% (1/1) | 100% (9/9) | 100% (3/3)
getResultQueue (): ResultQueue | | 100% (1/1) | 100% (3/3) | 100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import org.springframework.batch.repeat.RepeatCallback;
20import org.springframework.batch.repeat.RepeatContext;
21import org.springframework.batch.repeat.RepeatException;
22import org.springframework.batch.repeat.RepeatOperations;
23import org.springframework.batch.repeat.RepeatStatus;
24import org.springframework.core.task.SyncTaskExecutor;
25import org.springframework.core.task.TaskExecutor;
26import org.springframework.util.Assert;
27 
28/**
29 * Provides {@link RepeatOperations} support including interceptors that can be
30 * used to modify or monitor the behaviour at run time.<br/>
31 * 
32 * This implementation is sufficient to be used to configure transactional
33 * behaviour for each item by making the {@link RepeatCallback} transactional,
34 * or for the whole batch by making the execute method transactional (but only
35 * then if the task executor is synchronous).<br/>
36 * 
37 * This class is thread safe if its collaborators are thread safe (interceptors,
38 * terminationPolicy, callback). Normally this will be the case, but clients
39 * need to be aware that if the task executor is asynchronous, then the other
40 * collaborators should be also. In particular the {@link RepeatCallback} that
41 * is wrapped in the execute method must be thread safe - often it is based on
42 * some form of data source, which itself should be both thread safe and
43 * transactional (multiple threads could be accessing it at any given time, and
44 * each thread would have its own transaction).<br/>
45 * 
46 * @author Dave Syer
47 * 
48 */
49public class TaskExecutorRepeatTemplate extends RepeatTemplate {
50 
51        /**
52         * Default limit for maximum number of concurrent unfinished results allowed
53         * by the template.
54         * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}
55         * .
56         */
57        public static final int DEFAULT_THROTTLE_LIMIT = 4;
58 
59        private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
60 
61        private TaskExecutor taskExecutor = new SyncTaskExecutor();
62 
63        /**
64         * Public setter for the throttle limit. The throttle limit is the largest
65         * number of concurrent tasks that can be executing at one time - if a new
66         * task arrives and the throttle limit is breached we wait for one of the
67         * executing tasks to finish before submitting the new one to the
68         * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}.
69         * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool
70         * might prevent the throttle limit actually being reached (so make the core
71         * pool size larger than the throttle limit if possible).
72         * 
73         * @param throttleLimit the throttleLimit to set.
74         */
75        public void setThrottleLimit(int throttleLimit) {
76                this.throttleLimit = throttleLimit;
77        }
78 
79        /**
80         * Setter for task executor to be used to run the individual item callbacks.
81         * 
82         * @param taskExecutor a TaskExecutor
83         * @throws IllegalArgumentException if the argument is null
84         */
85        public void setTaskExecutor(TaskExecutor taskExecutor) {
86                Assert.notNull(taskExecutor);
87                this.taskExecutor = taskExecutor;
88        }
89 
90        /**
91         * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The
92         * internal state in this case is a queue of unfinished result holders of
93         * type {@link ResultHolder}. The holder with the return value should not be
94         * on the queue when this method exits. The queue is scoped in the calling
95         * method so there is no need to synchronize access.
96         * 
97         */
98        protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
99                        throws Throwable {
100 
101                ExecutingRunnable runnable = null;
102 
103                ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
104 
105                do {
106 
107                        /*
108                         * Wrap the callback in a runnable that will add its result to the
109                         * queue when it is ready.
110                         */
111                        runnable = new ExecutingRunnable(callback, context, queue);
112 
113                        /**
114                         * Tell the runnable that it can expect a result. This could have
115                         * been in-lined with the constructor, but it might block, so it's
116                         * better to do it here, since we have the option (it's a private
117                         * class).
118                         */
119                        runnable.expect();
120 
121                        /*
122                         * Start the task possibly concurrently / in the future.
123                         */
124                        taskExecutor.execute(runnable);
125 
126                        /*
127                         * Allow termination policy to update its state. This must happen
128                         * immediately before or after the call to the task executor.
129                         */
130                        update(context);
131 
132                        /*
133                         * Keep going until we get a result that is finished, or early
134                         * termination...
135                         */
136                } while (queue.isEmpty() && !isComplete(context));
137 
138                /*
139                 * N.B. If the queue is empty then take() blocks until a result appears,
140                 * and there must be at least one because we just submitted one to the
141                 * task executor.
142                 */
143                ResultHolder result = queue.take();
144                if (result.getError() != null) {
145                        throw result.getError();
146                }
147                return result.getResult();
148        }
149 
150        /**
151         * Wait for all the results to appear on the queue and execute the after
152         * interceptors for each one.
153         * 
154         * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
155         */
156        protected boolean waitForResults(RepeatInternalState state) {
157 
158                ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
159 
160                boolean result = true;
161 
162                while (queue.isExpecting()) {
163 
164                        /*
165                         * Careful that no runnables that are not going to finish ever get
166                         * onto the queue, else this may block forever.
167                         */
168                        ResultHolder future;
169                        try {
170                                future = queue.take();
171                        }
172                        catch (InterruptedException e) {
173                                Thread.currentThread().interrupt();
174                                throw new RepeatException("InterruptedException while waiting for result.");
175                        }
176 
177                        if (future.getError() != null) {
178                                state.getThrowables().add(future.getError());
179                                result = false;
180                        }
181                        else {
182                                RepeatStatus status = future.getResult();
183                                result = result && canContinue(status);
184                                executeAfterInterceptors(future.getContext(), status);
185                        }
186 
187                }
188 
189                Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
190 
191                return result;
192        }
193 
194        protected RepeatInternalState createInternalState(RepeatContext context) {
195                // Queue of pending results:
196                return new ResultQueueInternalState(throttleLimit);
197        }
198 
199        /**
200         * A runnable that puts its result on a queue when it is done.
201         * 
202         * @author Dave Syer
203         * 
204         */
205        private class ExecutingRunnable implements Runnable, ResultHolder {
206 
207                private final RepeatCallback callback;
208 
209                private final RepeatContext context;
210 
211                private final ResultQueue<ResultHolder> queue;
212 
213                private volatile RepeatStatus result;
214 
215                private volatile Throwable error;
216 
217                public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
218 
219                        super();
220 
221                        this.callback = callback;
222                        this.context = context;
223                        this.queue = queue;
224 
225                }
226 
227                /**
228                 * Tell the queue to expect a result.
229                 */
230                public void expect() {
231                        try {
232                                queue.expect();
233                        }
234                        catch (InterruptedException e) {
235                                Thread.currentThread().interrupt();
236                                throw new RepeatException("InterruptedException waiting for to acquire lock on input.");
237                        }
238                }
239 
240                /**
241                 * Execute the batch callback, and store the result, or any exception
242                 * that is thrown for retrieval later by caller.
243                 * 
244                 * @see java.lang.Runnable#run()
245                 */
246                public void run() {
247                        boolean clearContext = false;
248                        try {
249                                if (RepeatSynchronizationManager.getContext() == null) {
250                                        clearContext = true;
251                                        RepeatSynchronizationManager.register(context);
252                                }
253 
254                                if (logger.isDebugEnabled()) {
255                                        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
256                                }
257 
258                                result = callback.doInIteration(context);
259 
260                        }
261                        catch (Exception e) {
262                                error = e;
263                        }
264                        finally {
265 
266                                if (clearContext) {
267                                        RepeatSynchronizationManager.clear();
268                                }
269 
270                                queue.put(this);
271 
272                        }
273                }
274 
275                /**
276                 * Get the result - never blocks because the queue manages waiting for
277                 * the task to finish.
278                 */
279                public RepeatStatus getResult() {
280                        return result;
281                }
282 
283                /**
284                 * Get the error - never blocks because the queue manages waiting for
285                 * the task to finish.
286                 */
287                public Throwable getError() {
288                        return error;
289                }
290 
291                /**
292                 * Getter for the context.
293                 */
294                public RepeatContext getContext() {
295                        return this.context;
296                }
297 
298        }
299 
300        /**
301         * @author Dave Syer
302         * 
303         */
304        private static class ResultQueueInternalState extends RepeatInternalStateSupport {
305 
306                private final ResultQueue<ResultHolder> results;
307 
308                /**
309                 * @param throttleLimit the throttle limit for the result queue
310                 */
311                public ResultQueueInternalState(int throttleLimit) {
312                        super();
313                        this.results = new ResultHolderResultQueue(throttleLimit);
314                }
315 
316                /**
317                 * @return the result queue
318                 */
319                public ResultQueue<ResultHolder> getResultQueue() {
320                        return results;
321                }
322 
323        }
324 
325}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov