View Javadoc

1   /*
2    * Copyright 2006-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.poller;
17  
18  import java.util.concurrent.Callable;
19  import java.util.concurrent.ExecutionException;
20  import java.util.concurrent.Future;
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.TimeoutException;
23  
24  /**
25   * A {@link Poller} that uses the callers thread to poll for a result as soon as
26   * it is asked for. This is often appropriate if you expect a result relatively
27   * quickly, or if there is only one such result expected (otherwise it is more
28   * efficient to use a background thread to do the polling).
29   * 
30   * @author Dave Syer
31   * 
32   * @param <S> the type of the result
33   */
34  public class DirectPoller<S> implements Poller<S> {
35  
36  	private final long interval;
37  
38  	public DirectPoller(long interval) {
39  		this.interval = interval;
40  	}
41  
42  	/**
43  	 * Get a future for a non-null result from the callback. Only when the
44  	 * result is asked for (using {@link Future#get()} or
45  	 * {@link Future#get(long, TimeUnit)} will the polling actually start.
46  	 * 
47  	 * @see Poller#poll(Callable)
48  	 */
49      @Override
50  	public Future<S> poll(Callable<S> callable) throws Exception {
51  		return new DirectPollingFuture<S>(interval, callable);
52  	}
53  
54  	private static class DirectPollingFuture<S> implements Future<S> {
55  
56  		private final long startTime = System.currentTimeMillis();
57  
58  		private volatile boolean cancelled;
59  
60  		private volatile S result = null;
61  
62  		private final long interval;
63  
64  		private final Callable<S> callable;
65  
66  		public DirectPollingFuture(long interval, Callable<S> callable) {
67  			this.interval = interval;
68  			this.callable = callable;
69  		}
70  
71          @Override
72  		public boolean cancel(boolean mayInterruptIfRunning) {
73  			cancelled = true;
74  			return true;
75  		}
76  
77          @Override
78  		public S get() throws InterruptedException, ExecutionException {
79  			try {
80  				return get(-1, TimeUnit.MILLISECONDS);
81  			}
82  			catch (TimeoutException e) {
83  				throw new IllegalStateException("Unexpected timeout waiting for result", e);
84  			}
85  		}
86  
87          @Override
88  		public S get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
89  
90  			try {
91  				result = callable.call();
92  			}
93  			catch (Exception e) {
94  				throw new ExecutionException(e);
95  			}
96  
97  			Long nextExecutionTime = startTime + interval;
98  			long currentTimeMillis = System.currentTimeMillis();
99  			long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
100 
101 			while (result == null && !cancelled) {
102 
103 				long delta = nextExecutionTime - startTime;
104 				if (delta >= timeoutMillis && timeoutMillis > 0) {
105 					throw new TimeoutException("Timed out waiting for task to return non-null result");
106 				}
107 
108 				if (nextExecutionTime > currentTimeMillis) {
109 					Thread.sleep(nextExecutionTime - currentTimeMillis);
110 				}
111 
112 				currentTimeMillis = System.currentTimeMillis();
113 				nextExecutionTime = currentTimeMillis + interval;
114 
115 				try {
116 					result = callable.call();
117 				}
118 				catch (Exception e) {
119 					throw new ExecutionException(e);
120 				}
121 
122 			}
123 
124 			return result;
125 
126 		}
127 
128         @Override
129 		public boolean isCancelled() {
130 			return cancelled;
131 		}
132 
133         @Override
134 		public boolean isDone() {
135 			return cancelled || result != null;
136 		}
137 
138 	}
139 
140 }