View Javadoc
1   /*
2    * Copyright 2006-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.poller.scheduling;
17  
18  import java.util.Queue;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.ExecutionException;
22  import java.util.concurrent.Future;
23  import java.util.concurrent.LinkedBlockingQueue;
24  import java.util.concurrent.ScheduledFuture;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.springframework.batch.poller.Poller;
30  import org.springframework.beans.BeansException;
31  import org.springframework.beans.factory.BeanFactory;
32  import org.springframework.beans.factory.BeanFactoryAware;
33  import org.springframework.beans.factory.InitializingBean;
34  import org.springframework.scheduling.TaskScheduler;
35  import org.springframework.scheduling.Trigger;
36  import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
37  import org.springframework.scheduling.support.PeriodicTrigger;
38  import org.springframework.util.ErrorHandler;
39  import org.springframework.util.ReflectionUtils;
40  
41  /**
42   * A {@link Poller} implementation that uses a {@link TaskScheduler} to poll in
43   * a background thread.
44   * 
45   * @author Dave Syer
46   * 
47   */
48  public class TaskSchedulerPoller<T> implements Poller<T>, BeanFactoryAware, InitializingBean {
49  
50  	private static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
51  
52  	private volatile Trigger trigger;
53  
54  	private volatile boolean initialized;
55  
56  	private final Object initializationMonitor = new Object();
57  
58  	private TaskScheduler taskScheduler;
59  
60  	private BeanFactory beanFactory;
61  
62  	public void setTrigger(Trigger trigger) {
63  		this.trigger = trigger;
64  	}
65  
66  	public void setTaskScheduler(TaskScheduler taskScheduler) {
67  		this.taskScheduler = taskScheduler;
68  	}
69  
70  	public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
71  		this.beanFactory = beanFactory;
72  	}
73  
74  	public void afterPropertiesSet() throws Exception {
75  		initialize();
76  	}
77  
78  	private void initialize() {
79  		synchronized (this.initializationMonitor) {
80  			if (this.initialized) {
81  				return;
82  			}
83  			if (this.trigger == null) {
84  				this.trigger = new PeriodicTrigger(100L);
85  			}
86  			if (taskScheduler == null && beanFactory != null) {
87  				taskScheduler = beanFactory.getBean(TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class);
88  			}
89  		}
90  	}
91  
92  	private ScheduledFuture<?> getSchedule(final Callable<T> callable, final Queue<T> queue, final AtomicReference<Throwable> throwable) {
93  
94  		TaskScheduler scheduler = taskScheduler;
95  		if (scheduler == null) {
96  			ConcurrentTaskScheduler concurrentTaskScheduler = new ConcurrentTaskScheduler();
97  			concurrentTaskScheduler.setErrorHandler(new PropagatingErrorHandler());
98  			scheduler = concurrentTaskScheduler;
99  		}
100 
101 		Runnable task = new Runnable() {
102 
103 			public void run() {
104 				if (!queue.isEmpty() || throwable.get() != null) {
105 					return;
106 				}
107 				T result;
108 				try {
109 					result = callable.call();
110 				}
111 				catch (RuntimeException e) {
112 					throwable.set(e);
113 					throw e;
114 				}
115 				catch (Exception e) {
116 					throwable.set(e);
117 					throw new IllegalStateException("Could not obtain result", e);
118 				}
119 				if (result != null) {
120 					queue.add(result);
121 				}
122 			}
123 		};
124 
125 		ScheduledFuture<?> schedule = scheduler.schedule(task, trigger);
126 
127 		return schedule;
128 
129 	}
130 
131 	/**
132 	 * @param callback a {@link Callable} to use to retrieve a result
133 	 * @return the result, or null if the operation times out
134 	 * 
135 	 * @see Poller#poll(Callable)
136 	 */
137 	public Future<T> poll(Callable<T> callback) throws Exception {
138 
139 		if (!initialized) {
140 			initialize();
141 		}
142 
143 		final BlockingQueue<T> queue = new LinkedBlockingQueue<T>(1);
144 
145 		final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
146 		final ScheduledFuture<?> schedule = getSchedule(callback, queue, throwable);
147 
148 		return new Future<T>() {
149 
150 			public boolean cancel(boolean mayInterruptIfRunning) {
151 				return schedule.cancel(mayInterruptIfRunning);
152 			}
153 
154 			public T get() throws InterruptedException, ExecutionException {
155 				try {
156 					T result = queue.take();
157 					if (throwable.get()!=null) {
158 						throw new ExecutionException(throwable.get());
159 					}
160 					return result;
161 				}
162 				finally {
163 					cancelAndMaybeRethrow(schedule);
164 				}
165 			}
166 
167 			public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
168 				try {
169 					T result = queue.poll(timeout, unit);
170 					if (throwable.get()!=null) {
171 						throw new ExecutionException(throwable.get());
172 					}
173 					return result;
174 				}
175 				finally {
176 					cancelAndMaybeRethrow(schedule);
177 				}
178 			}
179 
180 			public boolean isCancelled() {
181 				return schedule.isCancelled();
182 			}
183 
184 			public boolean isDone() {
185 				return schedule.isDone() || !queue.isEmpty();
186 			}
187 
188 			private void cancelAndMaybeRethrow(final ScheduledFuture<?> schedule) throws InterruptedException, ExecutionException {
189 				try {
190 					// Just returns null if the task was successful.
191 					schedule.get();
192 				}
193 				catch (ExecutionException e) {
194 					throw e;
195 				}
196 				catch (InterruptedException e) {
197 					Thread.currentThread().interrupt();
198 					throw e;
199 				}
200 				schedule.cancel(true);
201 			}
202 
203 		};
204 
205 	}
206 
207 	/**
208 	 * An {@link ErrorHandler} implementation that propagates the throwable as a
209 	 * runtime exception.
210 	 */
211 	static class PropagatingErrorHandler implements ErrorHandler {
212 
213 		public void handleError(Throwable t) {
214 			ReflectionUtils.rethrowRuntimeException(t);
215 		}
216 
217 	}
218 
219 }