1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.poller.scheduling;
17
18 import java.util.Queue;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.springframework.batch.poller.Poller;
30 import org.springframework.beans.BeansException;
31 import org.springframework.beans.factory.BeanFactory;
32 import org.springframework.beans.factory.BeanFactoryAware;
33 import org.springframework.beans.factory.InitializingBean;
34 import org.springframework.scheduling.TaskScheduler;
35 import org.springframework.scheduling.Trigger;
36 import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
37 import org.springframework.scheduling.support.PeriodicTrigger;
38 import org.springframework.util.ErrorHandler;
39 import org.springframework.util.ReflectionUtils;
40
41
42
43
44
45
46
47
48 public class TaskSchedulerPoller<T> implements Poller<T>, BeanFactoryAware, InitializingBean {
49
50 private static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
51
52 private volatile Trigger trigger;
53
54 private volatile boolean initialized;
55
56 private final Object initializationMonitor = new Object();
57
58 private TaskScheduler taskScheduler;
59
60 private BeanFactory beanFactory;
61
62 public void setTrigger(Trigger trigger) {
63 this.trigger = trigger;
64 }
65
66 public void setTaskScheduler(TaskScheduler taskScheduler) {
67 this.taskScheduler = taskScheduler;
68 }
69
70 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
71 this.beanFactory = beanFactory;
72 }
73
74 public void afterPropertiesSet() throws Exception {
75 initialize();
76 }
77
78 private void initialize() {
79 synchronized (this.initializationMonitor) {
80 if (this.initialized) {
81 return;
82 }
83 if (this.trigger == null) {
84 this.trigger = new PeriodicTrigger(100L);
85 }
86 if (taskScheduler == null && beanFactory != null) {
87 taskScheduler = beanFactory.getBean(TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class);
88 }
89 }
90 }
91
92 private ScheduledFuture<?> getSchedule(final Callable<T> callable, final Queue<T> queue, final AtomicReference<Throwable> throwable) {
93
94 TaskScheduler scheduler = taskScheduler;
95 if (scheduler == null) {
96 ConcurrentTaskScheduler concurrentTaskScheduler = new ConcurrentTaskScheduler();
97 concurrentTaskScheduler.setErrorHandler(new PropagatingErrorHandler());
98 scheduler = concurrentTaskScheduler;
99 }
100
101 Runnable task = new Runnable() {
102
103 public void run() {
104 if (!queue.isEmpty() || throwable.get() != null) {
105 return;
106 }
107 T result;
108 try {
109 result = callable.call();
110 }
111 catch (RuntimeException e) {
112 throwable.set(e);
113 throw e;
114 }
115 catch (Exception e) {
116 throwable.set(e);
117 throw new IllegalStateException("Could not obtain result", e);
118 }
119 if (result != null) {
120 queue.add(result);
121 }
122 }
123 };
124
125 ScheduledFuture<?> schedule = scheduler.schedule(task, trigger);
126
127 return schedule;
128
129 }
130
131
132
133
134
135
136
137 public Future<T> poll(Callable<T> callback) throws Exception {
138
139 if (!initialized) {
140 initialize();
141 }
142
143 final BlockingQueue<T> queue = new LinkedBlockingQueue<T>(1);
144
145 final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
146 final ScheduledFuture<?> schedule = getSchedule(callback, queue, throwable);
147
148 return new Future<T>() {
149
150 public boolean cancel(boolean mayInterruptIfRunning) {
151 return schedule.cancel(mayInterruptIfRunning);
152 }
153
154 public T get() throws InterruptedException, ExecutionException {
155 try {
156 T result = queue.take();
157 if (throwable.get()!=null) {
158 throw new ExecutionException(throwable.get());
159 }
160 return result;
161 }
162 finally {
163 cancelAndMaybeRethrow(schedule);
164 }
165 }
166
167 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
168 try {
169 T result = queue.poll(timeout, unit);
170 if (throwable.get()!=null) {
171 throw new ExecutionException(throwable.get());
172 }
173 return result;
174 }
175 finally {
176 cancelAndMaybeRethrow(schedule);
177 }
178 }
179
180 public boolean isCancelled() {
181 return schedule.isCancelled();
182 }
183
184 public boolean isDone() {
185 return schedule.isDone() || !queue.isEmpty();
186 }
187
188 private void cancelAndMaybeRethrow(final ScheduledFuture<?> schedule) throws InterruptedException, ExecutionException {
189 try {
190
191 schedule.get();
192 }
193 catch (ExecutionException e) {
194 throw e;
195 }
196 catch (InterruptedException e) {
197 Thread.currentThread().interrupt();
198 throw e;
199 }
200 schedule.cancel(true);
201 }
202
203 };
204
205 }
206
207
208
209
210
211 static class PropagatingErrorHandler implements ErrorHandler {
212
213 public void handleError(Throwable t) {
214 ReflectionUtils.rethrowRuntimeException(t);
215 }
216
217 }
218
219 }