View Javadoc

1   /*
2    * Copyright 2002-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.repeat.support;
18  
19  import java.util.Comparator;
20  import java.util.NoSuchElementException;
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.PriorityBlockingQueue;
23  import java.util.concurrent.Semaphore;
24  
25  import org.springframework.batch.repeat.RepeatStatus;
26  
27  /**
28   * An implementation of the {@link ResultQueue} that throttles the number of
29   * expected results, limiting it to a maximum at any given time.
30   * 
31   * @author Dave Syer
32   */
33  public class ResultHolderResultQueue implements ResultQueue<ResultHolder> {
34  
35  	// Accumulation of result objects as they finish.
36  	private final BlockingQueue<ResultHolder> results;
37  
38  	// Accumulation of dummy objects flagging expected results in the future.
39  	private final Semaphore waits;
40  
41  	private final Object lock = new Object();
42  
43  	private volatile int count = 0;
44  
45  	/**
46  	 * @param throttleLimit the maximum number of results that can be expected
47  	 * at any given time.
48  	 */
49  	public ResultHolderResultQueue(int throttleLimit) {
50  		results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
51  		waits = new Semaphore(throttleLimit);
52  	}
53  
54      @Override
55  	public boolean isEmpty() {
56  		return results.isEmpty();
57  	}
58  
59  	/*
60  	 * (non-Javadoc)
61  	 * 
62  	 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
63  	 */
64      @Override
65  	public boolean isExpecting() {
66  		// Base the decision about whether we expect more results on a
67  		// counter of the number of expected results actually collected.
68  		// Do not synchronize! Otherwise put and expect can deadlock.
69  		return count > 0;
70  	}
71  
72  	/**
73  	 * Tell the queue to expect one more result. Blocks until a new result is
74  	 * available if already expecting too many (as determined by the throttle
75  	 * limit).
76  	 * 
77  	 * @see ResultQueue#expect()
78  	 */
79      @Override
80  	public void expect() throws InterruptedException {
81  		waits.acquire();
82  		// Don't acquire the lock in a synchronized block - might deadlock
83  		synchronized (lock) {
84  			count++;
85  		}
86  	}
87  
88      @Override
89  	public void put(ResultHolder holder) throws IllegalArgumentException {
90  		if (!isExpecting()) {
91  			throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
92  		}
93  		results.add(holder);
94  		// Take from the waits queue now to allow another result to
95  		// accumulate. But don't decrement the counter.
96  		waits.release();
97  		synchronized (lock) {
98  			lock.notifyAll();
99  		}
100 	}
101 
102 	/**
103 	 * Get the next result as soon as it becomes available. <br/>
104 	 * <br/>
105 	 * Release result immediately if:
106 	 * <ul>
107 	 * <li>There is a result that is continuable.</li>
108 	 * </ul>
109 	 * Otherwise block if either:
110 	 * <ul>
111 	 * <li>There is no result (as per contract of {@link ResultQueue}).</li>
112 	 * <li>The number of results is less than the number expected.</li>
113 	 * </ul>
114 	 * Error if either:
115 	 * <ul>
116 	 * <li>Not expecting.</li>
117 	 * <li>Interrupted.</li>
118 	 * </ul>
119 	 * 
120 	 * @see ResultQueue#take()
121 	 */
122     @Override
123 	public ResultHolder take() throws NoSuchElementException, InterruptedException {
124 		if (!isExpecting()) {
125 			throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
126 		}
127 		ResultHolder value;
128 		synchronized (lock) {
129 			value = results.take();
130 			if (isContinuable(value)) {
131 				// Decrement the counter only when the result is collected.
132 				count--;
133 				return value;
134 			}
135 		}
136 		results.put(value);
137 		synchronized (lock) {
138 			while (count > results.size()) {
139 				lock.wait();
140 			}
141 			value = results.take();
142 			count--;
143 		}
144 		return value;
145 	}
146 
147 	private boolean isContinuable(ResultHolder value) {
148 		return value.getResult() != null && value.getResult().isContinuable();
149 	}
150 
151 	/**
152 	 * Compares ResultHolders so that one that is continuable ranks lowest.
153 	 * 
154 	 * @author Dave Syer
155 	 * 
156 	 */
157 	private static class ResultHolderComparator implements Comparator<ResultHolder> {
158         @Override
159 		public int compare(ResultHolder h1, ResultHolder h2) {
160 			RepeatStatus result1 = h1.getResult();
161 			RepeatStatus result2 = h2.getResult();
162 			if (result1 == null && result2 == null) {
163 				return 0;
164 			}
165 			if (result1 == null) {
166 				return -1;
167 			}
168 			else if (result2 == null) {
169 				return 1;
170 			}
171 			if ((result1.isContinuable() && result2.isContinuable())
172 					|| (!result1.isContinuable() && !result2.isContinuable())) {
173 				return 0;
174 			}
175 			if (result1.isContinuable()) {
176 				return -1;
177 			}
178 			return 1;
179 		}
180 	}
181 
182 }