EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [ResultHolderResultQueue.java]

name | class, % | method, % | block, % | line, %
ResultHolderResultQueue.java | 67% (2/3) | 100% (10/10) | 85% (198/232) | 90% (47.7/53)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class ResultHolderResultQueue | 100% (1/1) | 100% (7/7) | 84% (155/185) | 91% (34.7/38)
put (ResultHolder): void | | 100% (1/1) | 70% (23/33) | 85% (6.8/8)
expect (): void | | 100% (1/1) | 78% (18/23) | 95% (4.8/5)
take (): ResultHolder | | 100% (1/1) | 81% (65/80) | 88% (14.1/16)
ResultHolderResultQueue (int): void | | 100% (1/1) | 100% (27/27) | 100% (6/6)
isContinuable (ResultHolder): boolean | | 100% (1/1) | 100% (11/11) | 100% (1/1)
isEmpty (): boolean | | 100% (1/1) | 100% (4/4) | 100% (1/1)
isExpecting (): boolean | | 100% (1/1) | 100% (7/7) | 100% (1/1)

class ResultHolderResultQueue$1 | 0% (0/1) | 100% (0/0) | 100% (0/0) | 100% (0/0)

class ResultHolderResultQueue$ResultHolderComparator | 100% (1/1) | 100% (3/3) | 91% (43/47) | 87% (13/15)
compare (ResultHolder, ResultHolder): int | | 100% (1/1) | 90% (37/41) | 86% (12/14)
ResultHolderResultQueue$ResultHolderComparator (): void | | 100% (1/1) | 100% (3/3) | 100% (1/1)
ResultHolderResultQueue$ResultHolderComparator (ResultHolderResultQueue$1): void | | 100% (1/1) | 100% (3/3) | 100% (1/1)

1/*
2 * Copyright 2002-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import java.util.Comparator;
20import java.util.NoSuchElementException;
21import java.util.concurrent.BlockingQueue;
22import java.util.concurrent.PriorityBlockingQueue;
23import java.util.concurrent.Semaphore;
24 
25import org.springframework.batch.repeat.RepeatStatus;
26 
27/**
28 * An implementation of the {@link ResultQueue} that throttles the number of
29 * expected results, limiting it to a maximum at any given time.
30 * 
31 * @author Dave Syer
32 */
33public class ResultHolderResultQueue implements ResultQueue<ResultHolder> {
34 
35        // Results added by put(), kept in the order defined by ResultHolderComparator.
36        private final BlockingQueue<ResultHolder> results;
37 
38        // Throttle permits: expect() acquires one and put() releases one.
39        private final Semaphore waits;
40 
          // Monitor under which count is updated; take() waits on it and put() notifies it.
41        private final Object lock = new Object();
42 
          // Incremented by expect() and decremented by take(); volatile so that
          // isExpecting() can read it without holding the lock.
43        private volatile int count = 0;
44 
45        /**
           * Creates a queue that uses the throttle limit both as the number of
           * semaphore permits and as the initial capacity of the result queue.
           *
46         * @param throttleLimit the maximum number of results that can be expected
47         * at any given time.
           * @throws IllegalArgumentException if throttleLimit is less than 1 (raised
           * by the PriorityBlockingQueue constructor).
48         */
49        public ResultHolderResultQueue(int throttleLimit) {
50                results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
51                waits = new Semaphore(throttleLimit);
52        }
53 
          /**
           * @return true if the underlying result queue currently holds no results.
           */
54    @Override
55        public boolean isEmpty() {
56                return results.isEmpty();
57        }
58 
59        /*
60         * Reports whether the counter incremented by expect() and decremented
61         * by take() is currently positive.
62         * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
63         */
64    @Override
65        public boolean isExpecting() {
66                // Base the decision about whether we expect more results on a
67                // counter of the number of expected results actually collected.
68                // Do not synchronize! Otherwise put and expect can deadlock.
69                return count > 0;
70        }
71 
72        /**
73         * Tell the queue to expect one more result. Blocks until a new result is
74         * available if already expecting too many (as determined by the throttle
75         * limit).
76         * 
77         * @see ResultQueue#expect()
78         */
79    @Override
80        public void expect() throws InterruptedException {
81                waits.acquire();
82                // Only the counter update is synchronized; the permit is acquired above,
                  // outside the block. Original note: acquiring inside a synchronized block
                  // might deadlock (presumably referring to the permit - TODO confirm).
83                synchronized (lock) {
84                        count++;
85                }
86        }
87 
          /**
           * Adds a result to the queue, releases one throttle permit and notifies
           * all threads waiting on the internal lock.
           *
           * @param holder the result to add.
           * @throws IllegalArgumentException if {@link #isExpecting()} is false.
           */
88    @Override
89        public void put(ResultHolder holder) throws IllegalArgumentException {
90                if (!isExpecting()) {
91                        throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
92                }
93                results.add(holder);
94                // Release a permit now so that another expect() can proceed and another
95                // result can accumulate. But don't decrement the counter.
96                waits.release();
                  // Wake any take() call that is waiting for more results to arrive.
97                synchronized (lock) {
98                        lock.notifyAll();
99                }
100        }
101 
102        /**
103         * Get the next result as soon as it becomes available. <br/>
104         * <br/>
105         * Release result immediately if:
106         * <ul>
107         * <li>There is a result that is continuable.</li>
108         * </ul>
109         * Otherwise block if either:
110         * <ul>
111         * <li>There is no result (as per contract of {@link ResultQueue}).</li>
112         * <li>The number of results is less than the number expected.</li>
113         * </ul>
114         * Error if either:
115         * <ul>
116         * <li>Not expecting.</li>
117         * <li>Interrupted.</li>
118         * </ul>
119         * 
            * @return the next result taken from the queue.
            * @throws NoSuchElementException if {@link #isExpecting()} is false on entry.
            * @throws InterruptedException if interrupted while blocked.
120         * @see ResultQueue#take()
121         */
122    @Override
123        public ResultHolder take() throws NoSuchElementException, InterruptedException {
124                if (!isExpecting()) {
125                        throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
126                }
127                ResultHolder value;
128                synchronized (lock) {
                           // Blocks, while holding the lock, until a result is queued.
129                        value = results.take();
130                        if (isContinuable(value)) {
131                                // Decrement the counter only when the result is collected.
132                                count--;
133                                return value;
134                        }
135                }
                   // The head of the queue is not continuable: put it back and wait until
                   // the queue holds at least as many results as are still expected.
136                results.put(value);
137                synchronized (lock) {
138                        while (count > results.size()) {
139                                lock.wait();
140                        }
141                        value = results.take();
142                        count--;
143                }
144                return value;
145        }
146 
           /**
            * @return true if the holder has a non-null result that is continuable.
            */
147        private boolean isContinuable(ResultHolder value) {
148                return value.getResult() != null && value.getResult().isContinuable();
149        }
150 
151        /**
152         * Orders ResultHolders by their result: a null result ranks lowest, then a
153         * continuable result, then a non-continuable one.
154         * @author Dave Syer
155         * 
156         */
157        private static class ResultHolderComparator implements Comparator<ResultHolder> {
                   /**
                    * @return 0 if both results are null or have the same continuable flag,
                    * a negative value if h1 ranks lower than h2, a positive value otherwise.
                    */
158        @Override
159                public int compare(ResultHolder h1, ResultHolder h2) {
160                        RepeatStatus result1 = h1.getResult();
161                        RepeatStatus result2 = h2.getResult();
162                        if (result1 == null && result2 == null) {
163                                return 0;
164                        }
165                        if (result1 == null) {
166                                return -1;
167                        }
168                        else if (result2 == null) {
169                                return 1;
170                        }
                           // Both results are non-null from here on.
171                        if ((result1.isContinuable() && result2.isContinuable())
172                                        || (!result1.isContinuable() && !result2.isContinuable())) {
173                                return 0;
174                        }
175                        if (result1.isContinuable()) {
176                                return -1;
177                        }
178                        return 1;
179                }
180        }
181 
182}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov