1 | /* |
2 | * Copyright 2006-2010 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.poller; |
17 | |
18 | import java.util.concurrent.Callable; |
19 | import java.util.concurrent.ExecutionException; |
20 | import java.util.concurrent.Future; |
21 | import java.util.concurrent.TimeUnit; |
22 | import java.util.concurrent.TimeoutException; |
23 | |
24 | /** |
25 | * A {@link Poller} that uses the callers thread to poll for a result as soon as |
26 | * it is asked for. This is often appropriate if you expect a result relatively |
27 | * quickly, or if there is only one such result expected (otherwise it is more |
28 | * efficient to use a background thread to do the polling). |
29 | * |
30 | * @author Dave Syer |
31 | * |
32 | * @param <S> the type of the result |
33 | */ |
34 | public class DirectPoller<S> implements Poller<S> { |
35 | |
36 | private final long interval; |
37 | |
38 | public DirectPoller(long interval) { |
39 | this.interval = interval; |
40 | } |
41 | |
42 | /** |
43 | * Get a future for a non-null result from the callback. Only when the |
44 | * result is asked for (using {@link Future#get()} or |
45 | * {@link Future#get(long, TimeUnit)} will the polling actually start. |
46 | * |
47 | * @see Poller#poll(Callable) |
48 | */ |
49 | @Override |
50 | public Future<S> poll(Callable<S> callable) throws Exception { |
51 | return new DirectPollingFuture<S>(interval, callable); |
52 | } |
53 | |
54 | private static class DirectPollingFuture<S> implements Future<S> { |
55 | |
56 | private final long startTime = System.currentTimeMillis(); |
57 | |
58 | private volatile boolean cancelled; |
59 | |
60 | private volatile S result = null; |
61 | |
62 | private final long interval; |
63 | |
64 | private final Callable<S> callable; |
65 | |
66 | public DirectPollingFuture(long interval, Callable<S> callable) { |
67 | this.interval = interval; |
68 | this.callable = callable; |
69 | } |
70 | |
71 | @Override |
72 | public boolean cancel(boolean mayInterruptIfRunning) { |
73 | cancelled = true; |
74 | return true; |
75 | } |
76 | |
77 | @Override |
78 | public S get() throws InterruptedException, ExecutionException { |
79 | try { |
80 | return get(-1, TimeUnit.MILLISECONDS); |
81 | } |
82 | catch (TimeoutException e) { |
83 | throw new IllegalStateException("Unexpected timeout waiting for result", e); |
84 | } |
85 | } |
86 | |
87 | @Override |
88 | public S get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
89 | |
90 | try { |
91 | result = callable.call(); |
92 | } |
93 | catch (Exception e) { |
94 | throw new ExecutionException(e); |
95 | } |
96 | |
97 | Long nextExecutionTime = startTime + interval; |
98 | long currentTimeMillis = System.currentTimeMillis(); |
99 | long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit); |
100 | |
101 | while (result == null && !cancelled) { |
102 | |
103 | long delta = nextExecutionTime - startTime; |
104 | if (delta >= timeoutMillis && timeoutMillis > 0) { |
105 | throw new TimeoutException("Timed out waiting for task to return non-null result"); |
106 | } |
107 | |
108 | if (nextExecutionTime > currentTimeMillis) { |
109 | Thread.sleep(nextExecutionTime - currentTimeMillis); |
110 | } |
111 | |
112 | currentTimeMillis = System.currentTimeMillis(); |
113 | nextExecutionTime = currentTimeMillis + interval; |
114 | |
115 | try { |
116 | result = callable.call(); |
117 | } |
118 | catch (Exception e) { |
119 | throw new ExecutionException(e); |
120 | } |
121 | |
122 | } |
123 | |
124 | return result; |
125 | |
126 | } |
127 | |
128 | @Override |
129 | public boolean isCancelled() { |
130 | return cancelled; |
131 | } |
132 | |
133 | @Override |
134 | public boolean isDone() { |
135 | return cancelled || result != null; |
136 | } |
137 | |
138 | } |
139 | |
140 | } |