View Javadoc

1   /*
2    * Copyright 2002-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.repeat.support;
18  
19  import java.util.NoSuchElementException;
20  import java.util.concurrent.BlockingQueue;
21  import java.util.concurrent.LinkedBlockingQueue;
22  import java.util.concurrent.Semaphore;
23  
24  /**
25   * An implementation of the {@link ResultQueue} that throttles the number of
26   * expected results, limiting it to a maximum at any given time.
27   * 
28   * @author Dave Syer
29   */
30  public class ThrottleLimitResultQueue<T> implements ResultQueue<T> {
31  
32  	// Accumulation of result objects as they finish.
33  	private final BlockingQueue<T> results;
34  
35  	// Accumulation of dummy objects flagging expected results in the future.
36  	private final Semaphore waits;
37  
38  	private final Object lock = new Object();
39  
40  	private volatile int count = 0;
41  
42  	/**
43  	 * @param throttleLimit the maximum number of results that can be expected
44  	 * at any given time.
45  	 */
46  	public ThrottleLimitResultQueue(int throttleLimit) {
47  		results = new LinkedBlockingQueue<T>();
48  		waits = new Semaphore(throttleLimit);
49  	}
50  
51      @Override
52  	public boolean isEmpty() {
53  		return results.isEmpty();
54  	}
55  
56  	/*
57  	 * (non-Javadoc)
58  	 * 
59  	 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
60  	 */
61      @Override
62  	public boolean isExpecting() {
63  		// Base the decision about whether we expect more results on a
64  		// counter of the number of expected results actually collected.
65  		// Do not synchronize!  Otherwise put and expect can deadlock.
66  		return count > 0;
67  	}
68  
69  	/**
70  	 * Tell the queue to expect one more result. Blocks until a new result is
71  	 * available if already expecting too many (as determined by the throttle
72  	 * limit).
73  	 * 
74  	 * @see ResultQueue#expect()
75  	 */
76      @Override
77  	public void expect() throws InterruptedException {
78  		synchronized (lock) {
79  			waits.acquire();
80  			count++;
81  		}
82  	}
83  
84      @Override
85  	public void put(T holder) throws IllegalArgumentException {
86  		if (!isExpecting()) {
87  			throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
88  		}
89  		// There should be no need to block here, or to use offer()
90  		results.add(holder);
91  		// Take from the waits queue now to allow another result to
92  		// accumulate. But don't decrement the counter.
93  		waits.release();
94  	}
95  
96      @Override
97  	public T take() throws NoSuchElementException, InterruptedException {
98  		if (!isExpecting()) {
99  			throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
100 		}
101 		T value;
102 		synchronized (lock) {
103 			value = results.take();
104 			// Decrement the counter only when the result is collected.
105 			count--;
106 		}
107 		return value;
108 	}
109 
110 }