1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.repeat.support; |
18 | |
19 | import org.springframework.batch.repeat.ExitStatus; |
20 | import org.springframework.batch.repeat.RepeatCallback; |
21 | import org.springframework.batch.repeat.RepeatContext; |
22 | import org.springframework.batch.repeat.RepeatOperations; |
23 | import org.springframework.core.task.SyncTaskExecutor; |
24 | import org.springframework.core.task.TaskExecutor; |
25 | import org.springframework.util.Assert; |
26 | |
27 | /** |
28 | * Provides {@link RepeatOperations} support including interceptors that can be |
29 | * used to modify or monitor the behaviour at run time.<br/> |
30 | * |
31 | * This implementation is sufficient to be used to configure transactional |
32 | * behaviour for each item by making the {@link RepeatCallback} transactional, |
33 | * or for the whole batch by making the execute method transactional (but only |
34 | * then if the task executor is synchronous).<br/> |
35 | * |
36 | * This class is thread safe if its collaborators are thread safe (interceptors, |
37 | * terminationPolicy, callback). Normally this will be the case, but clients |
38 | * need to be aware that if the task executor is asynchronous, then the other |
39 | * collaborators should be also. In particular the {@link RepeatCallback} that |
40 | * is wrapped in the execute method must be thread safe - often it is based on |
41 | * some form of data source, which itself should be both thread safe and |
42 | * transactional (multiple threads could be accessing it at any given time, and |
43 | * each thread would have its own transaction).<br/> |
44 | * |
45 | * @author Dave Syer |
46 | * |
47 | */ |
48 | public class TaskExecutorRepeatTemplate extends RepeatTemplate { |
49 | |
50 | /** |
51 | * Default limit for maximum number of concurrent unfinished results allowed |
52 | * by the template. |
53 | * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}. |
54 | */ |
55 | public static final int DEFAULT_THROTTLE_LIMIT = 4; |
56 | |
57 | private int throttleLimit = DEFAULT_THROTTLE_LIMIT; |
58 | |
59 | private TaskExecutor taskExecutor = new SyncTaskExecutor(); |
60 | |
61 | /** |
62 | * Setter for task executor to be used to run the individual item callbacks. |
63 | * |
64 | * @param taskExecutor a TaskExecutor |
65 | * @throws IllegalArgumentException if the argument is null |
66 | */ |
67 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
68 | Assert.notNull(taskExecutor); |
69 | this.taskExecutor = taskExecutor; |
70 | } |
71 | |
72 | /** |
73 | * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The internal state in |
74 | * this case is a queue of unfinished result holders of type |
75 | * {@link ResultHolder}. The holder with the return value should not be on |
76 | * the queue when this method exits. The queue is scoped in the calling |
77 | * method so there is no need to synchronize access. |
78 | * |
79 | */ |
80 | protected ExitStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) |
81 | throws Throwable { |
82 | |
83 | ExecutingRunnable runnable = null; |
84 | |
85 | ResultQueue queue = (ResultQueue) state; |
86 | |
87 | do { |
88 | |
89 | /* |
90 | * Wrap the callback in a runnable that will add its result to the |
91 | * queue when it is ready. |
92 | */ |
93 | runnable = new ExecutingRunnable(callback, context, queue); |
94 | |
95 | /** |
96 | * Tell the runnable that it can expect a result. This could have |
97 | * been in-lined with the constructor, but it might block, so it's |
98 | * better to do it here, since we have the option (it's a private |
99 | * class). |
100 | */ |
101 | runnable.expect(); |
102 | |
103 | /* |
104 | * Start the task possibly concurrently / in the future. |
105 | */ |
106 | taskExecutor.execute(runnable); |
107 | |
108 | /* |
109 | * Allow termination policy to update its state. This must happen |
110 | * immediately before or after the call to the task executor. |
111 | */ |
112 | update(context); |
113 | |
114 | /* |
115 | * Keep going until we get a result that is finished, or early |
116 | * termination... |
117 | */ |
118 | } while (queue.isEmpty() && !isComplete(context)); |
119 | |
120 | /* |
121 | * N.B. If the queue is empty then take() blocks until a result appears, |
122 | * and there must be at least one because we just submitted one to teh |
123 | * task executor. |
124 | */ |
125 | ResultHolder result = queue.take(); |
126 | if (result.getError() != null) { |
127 | throw result.getError(); |
128 | } |
129 | return result.getResult(); |
130 | } |
131 | |
132 | /** |
133 | * Wait for all the results to appear on the queue and execute the after |
134 | * interceptors for each one. |
135 | * |
136 | * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState) |
137 | */ |
138 | protected boolean waitForResults(RepeatInternalState state) { |
139 | |
140 | ResultQueue queue = (ResultQueue) state; |
141 | |
142 | boolean result = true; |
143 | |
144 | while (queue.isExpecting()) { |
145 | |
146 | /* |
147 | * Careful that no runnables that are not going to finish ever get |
148 | * onto the queue, else this may block forever. |
149 | */ |
150 | ResultHolder future = (ResultHolder) queue.take(); |
151 | |
152 | if (future.getError() != null) { |
153 | state.getThrowables().add(future.getError()); |
154 | } |
155 | else { |
156 | ExitStatus status = future.getResult(); |
157 | result = result && canContinue(status); |
158 | executeAfterInterceptors(future.getContext(), status); |
159 | } |
160 | |
161 | } |
162 | |
163 | Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch."); |
164 | |
165 | return result; |
166 | } |
167 | |
168 | protected RepeatInternalState createInternalState(RepeatContext context) { |
169 | // Queue of pending results: |
170 | return new ResultQueueFactory().getResultQueue(throttleLimit); |
171 | } |
172 | |
173 | /** |
174 | * A runnable that puts its result on a queue when it is done. |
175 | * |
176 | * @author Dave Syer |
177 | * |
178 | */ |
179 | private static class ExecutingRunnable implements Runnable, ResultHolder { |
180 | private RepeatCallback callback; |
181 | |
182 | private RepeatContext context; |
183 | |
184 | private ResultQueue queue; |
185 | |
186 | private ExitStatus result; |
187 | |
188 | private Throwable error; |
189 | |
190 | public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue queue) { |
191 | |
192 | super(); |
193 | |
194 | this.callback = callback; |
195 | this.context = context; |
196 | this.queue = queue; |
197 | |
198 | } |
199 | |
200 | /** |
201 | * Tell the queue to expect a result. |
202 | */ |
203 | public void expect() { |
204 | queue.expect(); |
205 | } |
206 | |
207 | /** |
208 | * Execute the batch callback, and store the result, or any exception |
209 | * that is thrown for retrieval later by caller. |
210 | * |
211 | * @see java.lang.Runnable#run() |
212 | */ |
213 | public void run() { |
214 | try { |
215 | result = callback.doInIteration(context); |
216 | } |
217 | catch (Exception e) { |
218 | error = e; |
219 | } |
220 | finally { |
221 | queue.put(this); |
222 | } |
223 | } |
224 | |
225 | /** |
226 | * Get the result - never blocks because the queue manages waiting for |
227 | * the task to finish. |
228 | */ |
229 | public ExitStatus getResult() { |
230 | return result; |
231 | } |
232 | |
233 | /** |
234 | * Get the error - never blocks because the queue manages waiting for |
235 | * the task to finish. |
236 | */ |
237 | public Throwable getError() { |
238 | return error; |
239 | } |
240 | |
241 | /** |
242 | * Getter for the context. |
243 | */ |
244 | public RepeatContext getContext() { |
245 | return this.context; |
246 | } |
247 | |
248 | } |
249 | |
250 | /** |
251 | * Public setter for the throttle limit. The throttle limit is the largest |
252 | * number of concurrent tasks that can be executing at one time - if a new |
253 | * task arrives and the throttle limit is breached we wait for one of the |
254 | * executing tasks to finish before submitting the new one to the |
255 | * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}. |
256 | * N.B. when used with a thread pooled {@link TaskExecutor} it doesn't make |
257 | * sense for the throttle limit to be less than the thread pool size. |
258 | * |
259 | * @param throttleLimit the throttleLimit to set. |
260 | */ |
261 | public void setThrottleLimit(int throttleLimit) { |
262 | this.throttleLimit = throttleLimit; |
263 | } |
264 | |
265 | } |