View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.repeat.support;
18  
19  import org.springframework.batch.repeat.RepeatCallback;
20  import org.springframework.batch.repeat.RepeatContext;
21  import org.springframework.batch.repeat.RepeatException;
22  import org.springframework.batch.repeat.RepeatOperations;
23  import org.springframework.batch.repeat.RepeatStatus;
24  import org.springframework.core.task.SyncTaskExecutor;
25  import org.springframework.core.task.TaskExecutor;
26  import org.springframework.util.Assert;
27  
28  /**
29   * Provides {@link RepeatOperations} support including interceptors that can be
30   * used to modify or monitor the behaviour at run time.<br/>
31   * 
32   * This implementation is sufficient to be used to configure transactional
33   * behaviour for each item by making the {@link RepeatCallback} transactional,
34   * or for the whole batch by making the execute method transactional (but only
35   * then if the task executor is synchronous).<br/>
36   * 
37   * This class is thread safe if its collaborators are thread safe (interceptors,
38   * terminationPolicy, callback). Normally this will be the case, but clients
39   * need to be aware that if the task executor is asynchronous, then the other
40   * collaborators should be also. In particular the {@link RepeatCallback} that
41   * is wrapped in the execute method must be thread safe - often it is based on
42   * some form of data source, which itself should be both thread safe and
43   * transactional (multiple threads could be accessing it at any given time, and
44   * each thread would have its own transaction).<br/>
45   * 
46   * @author Dave Syer
47   * 
48   */
49  public class TaskExecutorRepeatTemplate extends RepeatTemplate {
50  
51  	/**
52  	 * Default limit for maximum number of concurrent unfinished results allowed
53  	 * by the template.
54  	 * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}
55  	 * .
56  	 */
57  	public static final int DEFAULT_THROTTLE_LIMIT = 4;
58  
59  	private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
60  
61  	private TaskExecutor taskExecutor = new SyncTaskExecutor();
62  
63  	/**
64  	 * Public setter for the throttle limit. The throttle limit is the largest
65  	 * number of concurrent tasks that can be executing at one time - if a new
66  	 * task arrives and the throttle limit is breached we wait for one of the
67  	 * executing tasks to finish before submitting the new one to the
68  	 * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}.
69  	 * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool
70  	 * might prevent the throttle limit actually being reached (so make the core
71  	 * pool size larger than the throttle limit if possible).
72  	 * 
73  	 * @param throttleLimit the throttleLimit to set.
74  	 */
75  	public void setThrottleLimit(int throttleLimit) {
76  		this.throttleLimit = throttleLimit;
77  	}
78  
79  	/**
80  	 * Setter for task executor to be used to run the individual item callbacks.
81  	 * 
82  	 * @param taskExecutor a TaskExecutor
83  	 * @throws IllegalArgumentException if the argument is null
84  	 */
85  	public void setTaskExecutor(TaskExecutor taskExecutor) {
86  		Assert.notNull(taskExecutor);
87  		this.taskExecutor = taskExecutor;
88  	}
89  
90  	/**
91  	 * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The
92  	 * internal state in this case is a queue of unfinished result holders of
93  	 * type {@link ResultHolder}. The holder with the return value should not be
94  	 * on the queue when this method exits. The queue is scoped in the calling
95  	 * method so there is no need to synchronize access.
96  	 * 
97  	 */
98      @Override
99  	protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
100 			throws Throwable {
101 
102 		ExecutingRunnable runnable = null;
103 
104 		ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
105 
106 		do {
107 
108 			/*
109 			 * Wrap the callback in a runnable that will add its result to the
110 			 * queue when it is ready.
111 			 */
112 			runnable = new ExecutingRunnable(callback, context, queue);
113 
114 			/**
115 			 * Tell the runnable that it can expect a result. This could have
116 			 * been in-lined with the constructor, but it might block, so it's
117 			 * better to do it here, since we have the option (it's a private
118 			 * class).
119 			 */
120 			runnable.expect();
121 
122 			/*
123 			 * Start the task possibly concurrently / in the future.
124 			 */
125 			taskExecutor.execute(runnable);
126 
127 			/*
128 			 * Allow termination policy to update its state. This must happen
129 			 * immediately before or after the call to the task executor.
130 			 */
131 			update(context);
132 
133 			/*
134 			 * Keep going until we get a result that is finished, or early
135 			 * termination...
136 			 */
137 		} while (queue.isEmpty() && !isComplete(context));
138 
139 		/*
140 		 * N.B. If the queue is empty then take() blocks until a result appears,
141 		 * and there must be at least one because we just submitted one to the
142 		 * task executor.
143 		 */
144 		ResultHolder result = queue.take();
145 		if (result.getError() != null) {
146 			throw result.getError();
147 		}
148 		return result.getResult();
149 	}
150 
151 	/**
152 	 * Wait for all the results to appear on the queue and execute the after
153 	 * interceptors for each one.
154 	 * 
155 	 * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
156 	 */
157     @Override
158 	protected boolean waitForResults(RepeatInternalState state) {
159 
160 		ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
161 
162 		boolean result = true;
163 
164 		while (queue.isExpecting()) {
165 
166 			/*
167 			 * Careful that no runnables that are not going to finish ever get
168 			 * onto the queue, else this may block forever.
169 			 */
170 			ResultHolder future;
171 			try {
172 				future = queue.take();
173 			}
174 			catch (InterruptedException e) {
175 				Thread.currentThread().interrupt();
176 				throw new RepeatException("InterruptedException while waiting for result.");
177 			}
178 
179 			if (future.getError() != null) {
180 				state.getThrowables().add(future.getError());
181 				result = false;
182 			}
183 			else {
184 				RepeatStatus status = future.getResult();
185 				result = result && canContinue(status);
186 				executeAfterInterceptors(future.getContext(), status);
187 			}
188 
189 		}
190 
191 		Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
192 
193 		return result;
194 	}
195 
196     @Override
197 	protected RepeatInternalState createInternalState(RepeatContext context) {
198 		// Queue of pending results:
199 		return new ResultQueueInternalState(throttleLimit);
200 	}
201 
202 	/**
203 	 * A runnable that puts its result on a queue when it is done.
204 	 * 
205 	 * @author Dave Syer
206 	 * 
207 	 */
208 	private class ExecutingRunnable implements Runnable, ResultHolder {
209 
210 		private final RepeatCallback callback;
211 
212 		private final RepeatContext context;
213 
214 		private final ResultQueue<ResultHolder> queue;
215 
216 		private volatile RepeatStatus result;
217 
218 		private volatile Throwable error;
219 
220 		public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
221 
222 			super();
223 
224 			this.callback = callback;
225 			this.context = context;
226 			this.queue = queue;
227 
228 		}
229 
230 		/**
231 		 * Tell the queue to expect a result.
232 		 */
233 		public void expect() {
234 			try {
235 				queue.expect();
236 			}
237 			catch (InterruptedException e) {
238 				Thread.currentThread().interrupt();
239 				throw new RepeatException("InterruptedException waiting for to acquire lock on input.");
240 			}
241 		}
242 
243 		/**
244 		 * Execute the batch callback, and store the result, or any exception
245 		 * that is thrown for retrieval later by caller.
246 		 * 
247 		 * @see java.lang.Runnable#run()
248 		 */
249         @Override
250 		public void run() {
251 			boolean clearContext = false;
252 			try {
253 				if (RepeatSynchronizationManager.getContext() == null) {
254 					clearContext = true;
255 					RepeatSynchronizationManager.register(context);
256 				}
257 
258 				if (logger.isDebugEnabled()) {
259 					logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
260 				}
261 
262 				result = callback.doInIteration(context);
263 
264 			}
265 			catch (Throwable e) {
266 				error = e;
267 			}
268 			finally {
269 
270 				if (clearContext) {
271 					RepeatSynchronizationManager.clear();
272 				}
273 
274 				queue.put(this);
275 
276 			}
277 		}
278 
279 		/**
280 		 * Get the result - never blocks because the queue manages waiting for
281 		 * the task to finish.
282 		 */
283         @Override
284 		public RepeatStatus getResult() {
285 			return result;
286 		}
287 
288 		/**
289 		 * Get the error - never blocks because the queue manages waiting for
290 		 * the task to finish.
291 		 */
292         @Override
293 		public Throwable getError() {
294 			return error;
295 		}
296 
297 		/**
298 		 * Getter for the context.
299 		 */
300         @Override
301 		public RepeatContext getContext() {
302 			return this.context;
303 		}
304 
305 	}
306 
307 	/**
308 	 * @author Dave Syer
309 	 * 
310 	 */
311 	private static class ResultQueueInternalState extends RepeatInternalStateSupport {
312 
313 		private final ResultQueue<ResultHolder> results;
314 
315 		/**
316 		 * @param throttleLimit the throttle limit for the result queue
317 		 */
318 		public ResultQueueInternalState(int throttleLimit) {
319 			super();
320 			this.results = new ResultHolderResultQueue(throttleLimit);
321 		}
322 
323 		/**
324 		 * @return the result queue
325 		 */
326 		public ResultQueue<ResultHolder> getResultQueue() {
327 			return results;
328 		}
329 
330 	}
331 
332 }