1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.repeat.support; |
18 | |
19 | import org.springframework.batch.repeat.RepeatCallback; |
20 | import org.springframework.batch.repeat.RepeatContext; |
21 | import org.springframework.batch.repeat.RepeatException; |
22 | import org.springframework.batch.repeat.RepeatOperations; |
23 | import org.springframework.batch.repeat.RepeatStatus; |
24 | import org.springframework.core.task.SyncTaskExecutor; |
25 | import org.springframework.core.task.TaskExecutor; |
26 | import org.springframework.util.Assert; |
27 | |
28 | /** |
29 | * Provides {@link RepeatOperations} support including interceptors that can be |
30 | * used to modify or monitor the behaviour at run time.<br/> |
31 | * |
32 | * This implementation is sufficient to be used to configure transactional |
33 | * behaviour for each item by making the {@link RepeatCallback} transactional, |
34 | * or for the whole batch by making the execute method transactional (but only |
35 | * then if the task executor is synchronous).<br/> |
36 | * |
37 | * This class is thread safe if its collaborators are thread safe (interceptors, |
38 | * terminationPolicy, callback). Normally this will be the case, but clients |
39 | * need to be aware that if the task executor is asynchronous, then the other |
40 | * collaborators should be also. In particular the {@link RepeatCallback} that |
41 | * is wrapped in the execute method must be thread safe - often it is based on |
42 | * some form of data source, which itself should be both thread safe and |
43 | * transactional (multiple threads could be accessing it at any given time, and |
44 | * each thread would have its own transaction).<br/> |
45 | * |
46 | * @author Dave Syer |
47 | * |
48 | */ |
49 | public class TaskExecutorRepeatTemplate extends RepeatTemplate { |
50 | |
51 | /** |
52 | * Default limit for maximum number of concurrent unfinished results allowed |
53 | * by the template. |
54 | * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)} |
55 | * . |
56 | */ |
57 | public static final int DEFAULT_THROTTLE_LIMIT = 4; |
58 | |
59 | private int throttleLimit = DEFAULT_THROTTLE_LIMIT; |
60 | |
61 | private TaskExecutor taskExecutor = new SyncTaskExecutor(); |
62 | |
63 | /** |
64 | * Public setter for the throttle limit. The throttle limit is the largest |
65 | * number of concurrent tasks that can be executing at one time - if a new |
66 | * task arrives and the throttle limit is breached we wait for one of the |
67 | * executing tasks to finish before submitting the new one to the |
68 | * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}. |
69 | * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool |
70 | * might prevent the throttle limit actually being reached (so make the core |
71 | * pool size larger than the throttle limit if possible). |
72 | * |
73 | * @param throttleLimit the throttleLimit to set. |
74 | */ |
75 | public void setThrottleLimit(int throttleLimit) { |
76 | this.throttleLimit = throttleLimit; |
77 | } |
78 | |
79 | /** |
80 | * Setter for task executor to be used to run the individual item callbacks. |
81 | * |
82 | * @param taskExecutor a TaskExecutor |
83 | * @throws IllegalArgumentException if the argument is null |
84 | */ |
85 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
86 | Assert.notNull(taskExecutor); |
87 | this.taskExecutor = taskExecutor; |
88 | } |
89 | |
90 | /** |
91 | * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The |
92 | * internal state in this case is a queue of unfinished result holders of |
93 | * type {@link ResultHolder}. The holder with the return value should not be |
94 | * on the queue when this method exits. The queue is scoped in the calling |
95 | * method so there is no need to synchronize access. |
96 | * |
97 | */ |
98 | @Override |
99 | protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) |
100 | throws Throwable { |
101 | |
102 | ExecutingRunnable runnable = null; |
103 | |
104 | ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue(); |
105 | |
106 | do { |
107 | |
108 | /* |
109 | * Wrap the callback in a runnable that will add its result to the |
110 | * queue when it is ready. |
111 | */ |
112 | runnable = new ExecutingRunnable(callback, context, queue); |
113 | |
114 | /** |
115 | * Tell the runnable that it can expect a result. This could have |
116 | * been in-lined with the constructor, but it might block, so it's |
117 | * better to do it here, since we have the option (it's a private |
118 | * class). |
119 | */ |
120 | runnable.expect(); |
121 | |
122 | /* |
123 | * Start the task possibly concurrently / in the future. |
124 | */ |
125 | taskExecutor.execute(runnable); |
126 | |
127 | /* |
128 | * Allow termination policy to update its state. This must happen |
129 | * immediately before or after the call to the task executor. |
130 | */ |
131 | update(context); |
132 | |
133 | /* |
134 | * Keep going until we get a result that is finished, or early |
135 | * termination... |
136 | */ |
137 | } while (queue.isEmpty() && !isComplete(context)); |
138 | |
139 | /* |
140 | * N.B. If the queue is empty then take() blocks until a result appears, |
141 | * and there must be at least one because we just submitted one to the |
142 | * task executor. |
143 | */ |
144 | ResultHolder result = queue.take(); |
145 | if (result.getError() != null) { |
146 | throw result.getError(); |
147 | } |
148 | return result.getResult(); |
149 | } |
150 | |
151 | /** |
152 | * Wait for all the results to appear on the queue and execute the after |
153 | * interceptors for each one. |
154 | * |
155 | * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState) |
156 | */ |
157 | @Override |
158 | protected boolean waitForResults(RepeatInternalState state) { |
159 | |
160 | ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue(); |
161 | |
162 | boolean result = true; |
163 | |
164 | while (queue.isExpecting()) { |
165 | |
166 | /* |
167 | * Careful that no runnables that are not going to finish ever get |
168 | * onto the queue, else this may block forever. |
169 | */ |
170 | ResultHolder future; |
171 | try { |
172 | future = queue.take(); |
173 | } |
174 | catch (InterruptedException e) { |
175 | Thread.currentThread().interrupt(); |
176 | throw new RepeatException("InterruptedException while waiting for result."); |
177 | } |
178 | |
179 | if (future.getError() != null) { |
180 | state.getThrowables().add(future.getError()); |
181 | result = false; |
182 | } |
183 | else { |
184 | RepeatStatus status = future.getResult(); |
185 | result = result && canContinue(status); |
186 | executeAfterInterceptors(future.getContext(), status); |
187 | } |
188 | |
189 | } |
190 | |
191 | Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch."); |
192 | |
193 | return result; |
194 | } |
195 | |
196 | @Override |
197 | protected RepeatInternalState createInternalState(RepeatContext context) { |
198 | // Queue of pending results: |
199 | return new ResultQueueInternalState(throttleLimit); |
200 | } |
201 | |
202 | /** |
203 | * A runnable that puts its result on a queue when it is done. |
204 | * |
205 | * @author Dave Syer |
206 | * |
207 | */ |
208 | private class ExecutingRunnable implements Runnable, ResultHolder { |
209 | |
210 | private final RepeatCallback callback; |
211 | |
212 | private final RepeatContext context; |
213 | |
214 | private final ResultQueue<ResultHolder> queue; |
215 | |
216 | private volatile RepeatStatus result; |
217 | |
218 | private volatile Throwable error; |
219 | |
220 | public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) { |
221 | |
222 | super(); |
223 | |
224 | this.callback = callback; |
225 | this.context = context; |
226 | this.queue = queue; |
227 | |
228 | } |
229 | |
230 | /** |
231 | * Tell the queue to expect a result. |
232 | */ |
233 | public void expect() { |
234 | try { |
235 | queue.expect(); |
236 | } |
237 | catch (InterruptedException e) { |
238 | Thread.currentThread().interrupt(); |
239 | throw new RepeatException("InterruptedException waiting for to acquire lock on input."); |
240 | } |
241 | } |
242 | |
243 | /** |
244 | * Execute the batch callback, and store the result, or any exception |
245 | * that is thrown for retrieval later by caller. |
246 | * |
247 | * @see java.lang.Runnable#run() |
248 | */ |
249 | @Override |
250 | public void run() { |
251 | boolean clearContext = false; |
252 | try { |
253 | if (RepeatSynchronizationManager.getContext() == null) { |
254 | clearContext = true; |
255 | RepeatSynchronizationManager.register(context); |
256 | } |
257 | |
258 | if (logger.isDebugEnabled()) { |
259 | logger.debug("Repeat operation about to start at count=" + context.getStartedCount()); |
260 | } |
261 | |
262 | result = callback.doInIteration(context); |
263 | |
264 | } |
265 | catch (Throwable e) { |
266 | error = e; |
267 | } |
268 | finally { |
269 | |
270 | if (clearContext) { |
271 | RepeatSynchronizationManager.clear(); |
272 | } |
273 | |
274 | queue.put(this); |
275 | |
276 | } |
277 | } |
278 | |
279 | /** |
280 | * Get the result - never blocks because the queue manages waiting for |
281 | * the task to finish. |
282 | */ |
283 | @Override |
284 | public RepeatStatus getResult() { |
285 | return result; |
286 | } |
287 | |
288 | /** |
289 | * Get the error - never blocks because the queue manages waiting for |
290 | * the task to finish. |
291 | */ |
292 | @Override |
293 | public Throwable getError() { |
294 | return error; |
295 | } |
296 | |
297 | /** |
298 | * Getter for the context. |
299 | */ |
300 | @Override |
301 | public RepeatContext getContext() { |
302 | return this.context; |
303 | } |
304 | |
305 | } |
306 | |
307 | /** |
308 | * @author Dave Syer |
309 | * |
310 | */ |
311 | private static class ResultQueueInternalState extends RepeatInternalStateSupport { |
312 | |
313 | private final ResultQueue<ResultHolder> results; |
314 | |
315 | /** |
316 | * @param throttleLimit the throttle limit for the result queue |
317 | */ |
318 | public ResultQueueInternalState(int throttleLimit) { |
319 | super(); |
320 | this.results = new ResultHolderResultQueue(throttleLimit); |
321 | } |
322 | |
323 | /** |
324 | * @return the result queue |
325 | */ |
326 | public ResultQueue<ResultHolder> getResultQueue() { |
327 | return results; |
328 | } |
329 | |
330 | } |
331 | |
332 | } |