EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [ThrottleLimitResultQueue.java]

nameclass, %method, %block, %line, %
ThrottleLimitResultQueue.java100% (1/1)100% (6/6)91%  (96/106)98%  (24.6/25)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ThrottleLimitResultQueue100% (1/1)100% (6/6)91%  (96/106)98%  (24.6/25)
expect (): void 100% (1/1)78%  (18/23)95%  (4.8/5)
take (): Object 100% (1/1)85%  (28/33)97%  (6.8/7)
ThrottleLimitResultQueue (int): void 100% (1/1)100% (22/22)100% (6/6)
isEmpty (): boolean 100% (1/1)100% (4/4)100% (1/1)
isExpecting (): boolean 100% (1/1)100% (7/7)100% (1/1)
put (Object): void 100% (1/1)100% (17/17)100% (5/5)

1/*
2 * Copyright 2002-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import java.util.NoSuchElementException;
20import java.util.concurrent.BlockingQueue;
21import java.util.concurrent.LinkedBlockingQueue;
22import java.util.concurrent.Semaphore;
23 
24/**
25 * An implementation of the {@link ResultQueue} that throttles the number of
26 * expected results, limiting it to a maximum at any given time.
27 * 
28 * @author Dave Syer
29 */
30public class ThrottleLimitResultQueue<T> implements ResultQueue<T> {
31 
32        // Accumulation of result objects as they finish.
33        private final BlockingQueue<T> results;
34 
35        // Accumulation of dummy objects flagging expected results in the future.
36        private final Semaphore waits;
37 
38        private final Object lock = new Object();
39 
40        private volatile int count = 0;
41 
42        /**
43         * @param throttleLimit the maximum number of results that can be expected
44         * at any given time.
45         */
46        public ThrottleLimitResultQueue(int throttleLimit) {
47                results = new LinkedBlockingQueue<T>();
48                waits = new Semaphore(throttleLimit);
49        }
50 
51    @Override
52        public boolean isEmpty() {
53                return results.isEmpty();
54        }
55 
56        /*
57         * (non-Javadoc)
58         * 
59         * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
60         */
61    @Override
62        public boolean isExpecting() {
63                // Base the decision about whether we expect more results on a
64                // counter of the number of expected results actually collected.
65                // Do not synchronize!  Otherwise put and expect can deadlock.
66                return count > 0;
67        }
68 
69        /**
70         * Tell the queue to expect one more result. Blocks until a new result is
71         * available if already expecting too many (as determined by the throttle
72         * limit).
73         * 
74         * @see ResultQueue#expect()
75         */
76    @Override
77        public void expect() throws InterruptedException {
78                synchronized (lock) {
79                        waits.acquire();
80                        count++;
81                }
82        }
83 
84    @Override
85        public void put(T holder) throws IllegalArgumentException {
86                if (!isExpecting()) {
87                        throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
88                }
89                // There should be no need to block here, or to use offer()
90                results.add(holder);
91                // Take from the waits queue now to allow another result to
92                // accumulate. But don't decrement the counter.
93                waits.release();
94        }
95 
96    @Override
97        public T take() throws NoSuchElementException, InterruptedException {
98                if (!isExpecting()) {
99                        throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
100                }
101                T value;
102                synchronized (lock) {
103                        value = results.take();
104                        // Decrement the counter only when the result is collected.
105                        count--;
106                }
107                return value;
108        }
109 
110}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov