1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.repeat.support; |
18 | |
19 | import org.springframework.batch.repeat.RepeatCallback; |
20 | import org.springframework.batch.repeat.RepeatContext; |
21 | import org.springframework.batch.repeat.RepeatException; |
22 | import org.springframework.batch.repeat.RepeatOperations; |
23 | import org.springframework.batch.repeat.RepeatStatus; |
24 | import org.springframework.core.task.SyncTaskExecutor; |
25 | import org.springframework.core.task.TaskExecutor; |
26 | import org.springframework.util.Assert; |
27 | |
28 | /** |
29 | * Provides {@link RepeatOperations} support including interceptors that can be |
30 | * used to modify or monitor the behaviour at run time.<br/> |
31 | * |
32 | * This implementation is sufficient to be used to configure transactional |
33 | * behaviour for each item by making the {@link RepeatCallback} transactional, |
34 | * or for the whole batch by making the execute method transactional (but only |
35 | * then if the task executor is synchronous).<br/> |
36 | * |
37 | * This class is thread safe if its collaborators are thread safe (interceptors, |
38 | * terminationPolicy, callback). Normally this will be the case, but clients |
39 | * need to be aware that if the task executor is asynchronous, then the other |
40 | * collaborators should be also. In particular the {@link RepeatCallback} that |
41 | * is wrapped in the execute method must be thread safe - often it is based on |
42 | * some form of data source, which itself should be both thread safe and |
43 | * transactional (multiple threads could be accessing it at any given time, and |
44 | * each thread would have its own transaction).<br/> |
45 | * |
46 | * @author Dave Syer |
47 | * |
48 | */ |
49 | public class TaskExecutorRepeatTemplate extends RepeatTemplate { |
50 | |
51 | /** |
52 | * Default limit for maximum number of concurrent unfinished results allowed |
53 | * by the template. |
54 | * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)} |
55 | * . |
56 | */ |
57 | public static final int DEFAULT_THROTTLE_LIMIT = 4; |
58 | |
59 | private int throttleLimit = DEFAULT_THROTTLE_LIMIT; |
60 | |
61 | private TaskExecutor taskExecutor = new SyncTaskExecutor(); |
62 | |
63 | /** |
64 | * Public setter for the throttle limit. The throttle limit is the largest |
65 | * number of concurrent tasks that can be executing at one time - if a new |
66 | * task arrives and the throttle limit is breached we wait for one of the |
67 | * executing tasks to finish before submitting the new one to the |
68 | * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}. |
69 | * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool |
70 | * might prevent the throttle limit actually being reached (so make the core |
71 | * pool size larger than the throttle limit if possible). |
72 | * |
73 | * @param throttleLimit the throttleLimit to set. |
74 | */ |
75 | public void setThrottleLimit(int throttleLimit) { |
76 | this.throttleLimit = throttleLimit; |
77 | } |
78 | |
79 | /** |
80 | * Setter for task executor to be used to run the individual item callbacks. |
81 | * |
82 | * @param taskExecutor a TaskExecutor |
83 | * @throws IllegalArgumentException if the argument is null |
84 | */ |
85 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
86 | Assert.notNull(taskExecutor); |
87 | this.taskExecutor = taskExecutor; |
88 | } |
89 | |
90 | /** |
91 | * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The |
92 | * internal state in this case is a queue of unfinished result holders of |
93 | * type {@link ResultHolder}. The holder with the return value should not be |
94 | * on the queue when this method exits. The queue is scoped in the calling |
95 | * method so there is no need to synchronize access. |
96 | * |
97 | */ |
98 | protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) |
99 | throws Throwable { |
100 | |
101 | ExecutingRunnable runnable = null; |
102 | |
103 | ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue(); |
104 | |
105 | do { |
106 | |
107 | /* |
108 | * Wrap the callback in a runnable that will add its result to the |
109 | * queue when it is ready. |
110 | */ |
111 | runnable = new ExecutingRunnable(callback, context, queue); |
112 | |
113 | /** |
114 | * Tell the runnable that it can expect a result. This could have |
115 | * been in-lined with the constructor, but it might block, so it's |
116 | * better to do it here, since we have the option (it's a private |
117 | * class). |
118 | */ |
119 | runnable.expect(); |
120 | |
121 | /* |
122 | * Start the task possibly concurrently / in the future. |
123 | */ |
124 | taskExecutor.execute(runnable); |
125 | |
126 | /* |
127 | * Allow termination policy to update its state. This must happen |
128 | * immediately before or after the call to the task executor. |
129 | */ |
130 | update(context); |
131 | |
132 | /* |
133 | * Keep going until we get a result that is finished, or early |
134 | * termination... |
135 | */ |
136 | } while (queue.isEmpty() && !isComplete(context)); |
137 | |
138 | /* |
139 | * N.B. If the queue is empty then take() blocks until a result appears, |
140 | * and there must be at least one because we just submitted one to the |
141 | * task executor. |
142 | */ |
143 | ResultHolder result = queue.take(); |
144 | if (result.getError() != null) { |
145 | throw result.getError(); |
146 | } |
147 | return result.getResult(); |
148 | } |
149 | |
150 | /** |
151 | * Wait for all the results to appear on the queue and execute the after |
152 | * interceptors for each one. |
153 | * |
154 | * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState) |
155 | */ |
156 | protected boolean waitForResults(RepeatInternalState state) { |
157 | |
158 | ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue(); |
159 | |
160 | boolean result = true; |
161 | |
162 | while (queue.isExpecting()) { |
163 | |
164 | /* |
165 | * Careful that no runnables that are not going to finish ever get |
166 | * onto the queue, else this may block forever. |
167 | */ |
168 | ResultHolder future; |
169 | try { |
170 | future = queue.take(); |
171 | } |
172 | catch (InterruptedException e) { |
173 | Thread.currentThread().interrupt(); |
174 | throw new RepeatException("InterruptedException while waiting for result."); |
175 | } |
176 | |
177 | if (future.getError() != null) { |
178 | state.getThrowables().add(future.getError()); |
179 | result = false; |
180 | } |
181 | else { |
182 | RepeatStatus status = future.getResult(); |
183 | result = result && canContinue(status); |
184 | executeAfterInterceptors(future.getContext(), status); |
185 | } |
186 | |
187 | } |
188 | |
189 | Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch."); |
190 | |
191 | return result; |
192 | } |
193 | |
194 | protected RepeatInternalState createInternalState(RepeatContext context) { |
195 | // Queue of pending results: |
196 | return new ResultQueueInternalState(throttleLimit); |
197 | } |
198 | |
199 | /** |
200 | * A runnable that puts its result on a queue when it is done. |
201 | * |
202 | * @author Dave Syer |
203 | * |
204 | */ |
205 | private class ExecutingRunnable implements Runnable, ResultHolder { |
206 | |
207 | private final RepeatCallback callback; |
208 | |
209 | private final RepeatContext context; |
210 | |
211 | private final ResultQueue<ResultHolder> queue; |
212 | |
213 | private volatile RepeatStatus result; |
214 | |
215 | private volatile Throwable error; |
216 | |
217 | public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) { |
218 | |
219 | super(); |
220 | |
221 | this.callback = callback; |
222 | this.context = context; |
223 | this.queue = queue; |
224 | |
225 | } |
226 | |
227 | /** |
228 | * Tell the queue to expect a result. |
229 | */ |
230 | public void expect() { |
231 | try { |
232 | queue.expect(); |
233 | } |
234 | catch (InterruptedException e) { |
235 | Thread.currentThread().interrupt(); |
236 | throw new RepeatException("InterruptedException waiting for to acquire lock on input."); |
237 | } |
238 | } |
239 | |
240 | /** |
241 | * Execute the batch callback, and store the result, or any exception |
242 | * that is thrown for retrieval later by caller. |
243 | * |
244 | * @see java.lang.Runnable#run() |
245 | */ |
246 | public void run() { |
247 | boolean clearContext = false; |
248 | try { |
249 | if (RepeatSynchronizationManager.getContext() == null) { |
250 | clearContext = true; |
251 | RepeatSynchronizationManager.register(context); |
252 | } |
253 | |
254 | if (logger.isDebugEnabled()) { |
255 | logger.debug("Repeat operation about to start at count=" + context.getStartedCount()); |
256 | } |
257 | |
258 | result = callback.doInIteration(context); |
259 | |
260 | } |
261 | catch (Exception e) { |
262 | error = e; |
263 | } |
264 | finally { |
265 | |
266 | if (clearContext) { |
267 | RepeatSynchronizationManager.clear(); |
268 | } |
269 | |
270 | queue.put(this); |
271 | |
272 | } |
273 | } |
274 | |
275 | /** |
276 | * Get the result - never blocks because the queue manages waiting for |
277 | * the task to finish. |
278 | */ |
279 | public RepeatStatus getResult() { |
280 | return result; |
281 | } |
282 | |
283 | /** |
284 | * Get the error - never blocks because the queue manages waiting for |
285 | * the task to finish. |
286 | */ |
287 | public Throwable getError() { |
288 | return error; |
289 | } |
290 | |
291 | /** |
292 | * Getter for the context. |
293 | */ |
294 | public RepeatContext getContext() { |
295 | return this.context; |
296 | } |
297 | |
298 | } |
299 | |
300 | /** |
301 | * @author Dave Syer |
302 | * |
303 | */ |
304 | private static class ResultQueueInternalState extends RepeatInternalStateSupport { |
305 | |
306 | private final ResultQueue<ResultHolder> results; |
307 | |
308 | /** |
309 | * @param throttleLimit the throttle limit for the result queue |
310 | */ |
311 | public ResultQueueInternalState(int throttleLimit) { |
312 | super(); |
313 | this.results = new ResultHolderResultQueue(throttleLimit); |
314 | } |
315 | |
316 | /** |
317 | * @return the result queue |
318 | */ |
319 | public ResultQueue<ResultHolder> getResultQueue() { |
320 | return results; |
321 | } |
322 | |
323 | } |
324 | |
325 | } |