EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [TaskExecutorRepeatTemplate.java]

nameclass, %method, %block, %line, %
TaskExecutorRepeatTemplate.java100% (3/3)100% (14/14)83%  (207/248)89%  (63.1/71)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TaskExecutorRepeatTemplate$ExecutingRunnable100% (1/1)100% (6/6)69%  (74/107)83%  (24.1/29)
expect (): void 100% (1/1)38%  (5/13)50%  (3/6)
run (): void 100% (1/1)64%  (45/70)86%  (12.1/14)
TaskExecutorRepeatTemplate$ExecutingRunnable (TaskExecutorRepeatTemplate, Rep... 100% (1/1)100% (15/15)100% (6/6)
getContext (): RepeatContext 100% (1/1)100% (3/3)100% (1/1)
getError (): Throwable 100% (1/1)100% (3/3)100% (1/1)
getResult (): RepeatStatus 100% (1/1)100% (3/3)100% (1/1)
     
class TaskExecutorRepeatTemplate100% (1/1)100% (6/6)94%  (121/129)92%  (35/38)
waitForResults (RepeatInternalState): boolean 100% (1/1)86%  (51/59)82%  (14/17)
TaskExecutorRepeatTemplate (): void 100% (1/1)100% (11/11)100% (4/4)
createInternalState (RepeatContext): RepeatInternalState 100% (1/1)100% (6/6)100% (1/1)
getNextResult (RepeatContext, RepeatCallback, RepeatInternalState): RepeatStatus 100% (1/1)100% (43/43)100% (11/11)
setTaskExecutor (TaskExecutor): void 100% (1/1)100% (6/6)100% (3/3)
setThrottleLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
     
class TaskExecutorRepeatTemplate$ResultQueueInternalState100% (1/1)100% (2/2)100% (12/12)100% (4/4)
TaskExecutorRepeatTemplate$ResultQueueInternalState (int): void 100% (1/1)100% (9/9)100% (3/3)
getResultQueue (): ResultQueue 100% (1/1)100% (3/3)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import org.springframework.batch.repeat.RepeatCallback;
20import org.springframework.batch.repeat.RepeatContext;
21import org.springframework.batch.repeat.RepeatException;
22import org.springframework.batch.repeat.RepeatOperations;
23import org.springframework.batch.repeat.RepeatStatus;
24import org.springframework.core.task.SyncTaskExecutor;
25import org.springframework.core.task.TaskExecutor;
26import org.springframework.util.Assert;
27 
28/**
29 * Provides {@link RepeatOperations} support including interceptors that can be
30 * used to modify or monitor the behaviour at run time.<br/>
31 * 
32 * This implementation is sufficient to be used to configure transactional
33 * behaviour for each item by making the {@link RepeatCallback} transactional,
34 * or for the whole batch by making the execute method transactional (but only
35 * then if the task executor is synchronous).<br/>
36 * 
37 * This class is thread safe if its collaborators are thread safe (interceptors,
38 * terminationPolicy, callback). Normally this will be the case, but clients
39 * need to be aware that if the task executor is asynchronous, then the other
40 * collaborators should be also. In particular the {@link RepeatCallback} that
41 * is wrapped in the execute method must be thread safe - often it is based on
42 * some form of data source, which itself should be both thread safe and
43 * transactional (multiple threads could be accessing it at any given time, and
44 * each thread would have its own transaction).<br/>
45 * 
46 * @author Dave Syer
47 * 
48 */
49public class TaskExecutorRepeatTemplate extends RepeatTemplate {
50 
51        /**
52         * Default limit for maximum number of concurrent unfinished results allowed
53         * by the template.
54         * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}
55         * .
56         */
57        public static final int DEFAULT_THROTTLE_LIMIT = 4;
58 
59        private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
60 
61        private TaskExecutor taskExecutor = new SyncTaskExecutor();
62 
63        /**
64         * Public setter for the throttle limit. The throttle limit is the largest
65         * number of concurrent tasks that can be executing at one time - if a new
66         * task arrives and the throttle limit is breached we wait for one of the
67         * executing tasks to finish before submitting the new one to the
68         * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}.
69         * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool
70         * might prevent the throttle limit actually being reached (so make the core
71         * pool size larger than the throttle limit if possible).
72         * 
73         * @param throttleLimit the throttleLimit to set.
74         */
75        public void setThrottleLimit(int throttleLimit) {
76                this.throttleLimit = throttleLimit;
77        }
78 
79        /**
80         * Setter for task executor to be used to run the individual item callbacks.
81         * 
82         * @param taskExecutor a TaskExecutor
83         * @throws IllegalArgumentException if the argument is null
84         */
85        public void setTaskExecutor(TaskExecutor taskExecutor) {
86                Assert.notNull(taskExecutor);
87                this.taskExecutor = taskExecutor;
88        }
89 
90        /**
91         * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The
92         * internal state in this case is a queue of unfinished result holders of
93         * type {@link ResultHolder}. The holder with the return value should not be
94         * on the queue when this method exits. The queue is scoped in the calling
95         * method so there is no need to synchronize access.
96         * 
97         */
98    @Override
99        protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
100                        throws Throwable {
101 
102                ExecutingRunnable runnable = null;
103 
104                ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
105 
106                do {
107 
108                        /*
109                         * Wrap the callback in a runnable that will add its result to the
110                         * queue when it is ready.
111                         */
112                        runnable = new ExecutingRunnable(callback, context, queue);
113 
114                        /**
115                         * Tell the runnable that it can expect a result. This could have
116                         * been in-lined with the constructor, but it might block, so it's
117                         * better to do it here, since we have the option (it's a private
118                         * class).
119                         */
120                        runnable.expect();
121 
122                        /*
123                         * Start the task possibly concurrently / in the future.
124                         */
125                        taskExecutor.execute(runnable);
126 
127                        /*
128                         * Allow termination policy to update its state. This must happen
129                         * immediately before or after the call to the task executor.
130                         */
131                        update(context);
132 
133                        /*
134                         * Keep going until we get a result that is finished, or early
135                         * termination...
136                         */
137                } while (queue.isEmpty() && !isComplete(context));
138 
139                /*
140                 * N.B. If the queue is empty then take() blocks until a result appears,
141                 * and there must be at least one because we just submitted one to the
142                 * task executor.
143                 */
144                ResultHolder result = queue.take();
145                if (result.getError() != null) {
146                        throw result.getError();
147                }
148                return result.getResult();
149        }
150 
151        /**
152         * Wait for all the results to appear on the queue and execute the after
153         * interceptors for each one.
154         * 
155         * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
156         */
157    @Override
158        protected boolean waitForResults(RepeatInternalState state) {
159 
160                ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
161 
162                boolean result = true;
163 
164                while (queue.isExpecting()) {
165 
166                        /*
167                         * Careful that no runnables that are not going to finish ever get
168                         * onto the queue, else this may block forever.
169                         */
170                        ResultHolder future;
171                        try {
172                                future = queue.take();
173                        }
174                        catch (InterruptedException e) {
175                                Thread.currentThread().interrupt();
176                                throw new RepeatException("InterruptedException while waiting for result.");
177                        }
178 
179                        if (future.getError() != null) {
180                                state.getThrowables().add(future.getError());
181                                result = false;
182                        }
183                        else {
184                                RepeatStatus status = future.getResult();
185                                result = result && canContinue(status);
186                                executeAfterInterceptors(future.getContext(), status);
187                        }
188 
189                }
190 
191                Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
192 
193                return result;
194        }
195 
196    @Override
197        protected RepeatInternalState createInternalState(RepeatContext context) {
198                // Queue of pending results:
199                return new ResultQueueInternalState(throttleLimit);
200        }
201 
202        /**
203         * A runnable that puts its result on a queue when it is done.
204         * 
205         * @author Dave Syer
206         * 
207         */
208        private class ExecutingRunnable implements Runnable, ResultHolder {
209 
210                private final RepeatCallback callback;
211 
212                private final RepeatContext context;
213 
214                private final ResultQueue<ResultHolder> queue;
215 
216                private volatile RepeatStatus result;
217 
218                private volatile Throwable error;
219 
220                public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
221 
222                        super();
223 
224                        this.callback = callback;
225                        this.context = context;
226                        this.queue = queue;
227 
228                }
229 
230                /**
231                 * Tell the queue to expect a result.
232                 */
233                public void expect() {
234                        try {
235                                queue.expect();
236                        }
237                        catch (InterruptedException e) {
238                                Thread.currentThread().interrupt();
239                                throw new RepeatException("InterruptedException waiting for to acquire lock on input.");
240                        }
241                }
242 
243                /**
244                 * Execute the batch callback, and store the result, or any exception
245                 * that is thrown for retrieval later by caller.
246                 * 
247                 * @see java.lang.Runnable#run()
248                 */
249        @Override
250                public void run() {
251                        boolean clearContext = false;
252                        try {
253                                if (RepeatSynchronizationManager.getContext() == null) {
254                                        clearContext = true;
255                                        RepeatSynchronizationManager.register(context);
256                                }
257 
258                                if (logger.isDebugEnabled()) {
259                                        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
260                                }
261 
262                                result = callback.doInIteration(context);
263 
264                        }
265                        catch (Throwable e) {
266                                error = e;
267                        }
268                        finally {
269 
270                                if (clearContext) {
271                                        RepeatSynchronizationManager.clear();
272                                }
273 
274                                queue.put(this);
275 
276                        }
277                }
278 
279                /**
280                 * Get the result - never blocks because the queue manages waiting for
281                 * the task to finish.
282                 */
283        @Override
284                public RepeatStatus getResult() {
285                        return result;
286                }
287 
288                /**
289                 * Get the error - never blocks because the queue manages waiting for
290                 * the task to finish.
291                 */
292        @Override
293                public Throwable getError() {
294                        return error;
295                }
296 
297                /**
298                 * Getter for the context.
299                 */
300        @Override
301                public RepeatContext getContext() {
302                        return this.context;
303                }
304 
305        }
306 
307        /**
308         * @author Dave Syer
309         * 
310         */
311        private static class ResultQueueInternalState extends RepeatInternalStateSupport {
312 
313                private final ResultQueue<ResultHolder> results;
314 
315                /**
316                 * @param throttleLimit the throttle limit for the result queue
317                 */
318                public ResultQueueInternalState(int throttleLimit) {
319                        super();
320                        this.results = new ResultHolderResultQueue(throttleLimit);
321                }
322 
323                /**
324                 * @return the result queue
325                 */
326                public ResultQueue<ResultHolder> getResultQueue() {
327                        return results;
328                }
329 
330        }
331 
332}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov