EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.repeat.support]

COVERAGE SUMMARY FOR SOURCE FILE [ResultHolderResultQueue.java]

name | class, % | method, % | block, % | line, %
ResultHolderResultQueue.java | 67% (2/3) | 100% (10/10) | 85% (198/232) | 90% (46.7/52)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class ResultHolderResultQueue | 100% (1/1) | 100% (7/7) | 84% (155/185) | 91% (34.7/38)
put (ResultHolder): void | | 100% (1/1) | 70% (23/33) | 85% (6.8/8)
expect (): void | | 100% (1/1) | 78% (18/23) | 95% (4.8/5)
take (): ResultHolder | | 100% (1/1) | 81% (65/80) | 88% (14.1/16)
ResultHolderResultQueue (int): void | | 100% (1/1) | 100% (27/27) | 100% (6/6)
isContinuable (ResultHolder): boolean | | 100% (1/1) | 100% (11/11) | 100% (1/1)
isEmpty (): boolean | | 100% (1/1) | 100% (4/4) | 100% (1/1)
isExpecting (): boolean | | 100% (1/1) | 100% (7/7) | 100% (1/1)

class ResultHolderResultQueue$1 | 0% (0/1) | 100% (0/0) | 100% (0/0) | 100% (0/0)

class ResultHolderResultQueue$ResultHolderComparator | 100% (1/1) | 100% (3/3) | 91% (43/47) | 86% (12/14)
compare (ResultHolder, ResultHolder): int | | 100% (1/1) | 90% (37/41) | 85% (11/13)
ResultHolderResultQueue$ResultHolderComparator (): void | | 100% (1/1) | 100% (3/3) | 100% (1/1)
ResultHolderResultQueue$ResultHolderComparator (ResultHolderResultQueue$1): void | | 100% (1/1) | 100% (3/3) | 100% (1/1)

1/*
2 * Copyright 2002-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.repeat.support;
18 
19import java.util.Comparator;
20import java.util.NoSuchElementException;
21import java.util.concurrent.BlockingQueue;
22import java.util.concurrent.PriorityBlockingQueue;
23import java.util.concurrent.Semaphore;
24 
25import org.springframework.batch.repeat.RepeatStatus;
26 
27/**
28 * An implementation of the {@link ResultQueue} that throttles the number of
29 * expected results, limiting it to a maximum at any given time.
30 * 
31 * @author Dave Syer
32 */
33public class ResultHolderResultQueue implements ResultQueue<ResultHolder> {
34 
35        // Finished results, held in a priority queue ordered by ResultHolderComparator.
36        private final BlockingQueue<ResultHolder> results;
37 
38        // Throttle: a semaphore created with throttleLimit permits. expect() acquires one, put() releases one.
39        private final Semaphore waits;
40 
          // Monitor guarding updates to count; take() waits on it and put() notifies it.
41        private final Object lock = new Object();
42 
          // Number of results announced through expect() and not yet collected by take().
          // Read without the monitor in isExpecting(), hence volatile.
43        private volatile int count = 0;
44 
45        /**
           * Creates the result store as a priority queue ordered by
           * ResultHolderComparator and the throttle as a semaphore holding
           * throttleLimit permits.
           *
46         * @param throttleLimit the maximum number of results that can be expected
47         * at any given time.
48         */
49        public ResultHolderResultQueue(int throttleLimit) {
50                results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
51                waits = new Semaphore(throttleLimit);
52        }
53 
          /**
           * @return true if the underlying results queue currently holds no
           * results.
           */
54        public boolean isEmpty() {
55                return results.isEmpty();
56        }
57 
58        /*
59         * (non-Javadoc)
60         * 
           * Returns true while the counter of expected results that have not yet
           * been collected is positive.
           * 
61         * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
62         */
63        public boolean isExpecting() {
64                // Base the decision about whether we expect more results on a
65                // counter of the number of expected results actually collected.
66                // Do not synchronize! Otherwise put and expect can deadlock.
67                return count > 0;
68        }
69 
70        /**
71         * Tell the queue to expect one more result. Blocks until a new result is
72         * available if already expecting too many (as determined by the throttle
73         * limit).
74         * 
           * @throws InterruptedException if interrupted while waiting for a
           * permit from the throttle semaphore
75         * @see ResultQueue#expect()
76         */
77        public void expect() throws InterruptedException {
78                waits.acquire();
79                // Don't acquire the lock in a synchronized block - might deadlock
                  // NOTE(review): presumably "the lock" here means the semaphore permit
                  // acquired above, i.e. the acquire must stay outside this block - confirm.
                  // The synchronized block below only guards the counter increment.
80                synchronized (lock) {
81                        count++;
82                }
83        }
84 
          /**
           * Adds a finished result to the results queue, releases one permit on
           * the throttle semaphore and notifies all threads waiting on the
           * internal lock.
           *
           * @param holder the result to add
           * @throws IllegalArgumentException if no result is currently expected
           * (the expected counter is not positive)
           */
85        public void put(ResultHolder holder) throws IllegalArgumentException {
86                if (!isExpecting()) {
87                        throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
88                }
89                results.add(holder);
90                // Release a permit on the waits semaphore now to allow another result
91                // to be expected. But don't decrement the counter - take() does that.
92                waits.release();
                  // Wake up any take() call waiting on the lock for more results to arrive.
93                synchronized (lock) {
94                        lock.notifyAll();
95                }
96        }
97 
98        /**
99         * Get the next result as soon as it becomes available. <br/>
100         * <br/>
101         * Release result immediately if:
102         * <ul>
103         * <li>There is a result that is continuable.</li>
104         * </ul>
105         * Otherwise block if either:
106         * <ul>
107         * <li>There is no result (as per contract of {@link ResultQueue}).</li>
108         * <li>The number of results is less than the number expected.</li>
109         * </ul>
110         * Error if either:
111         * <ul>
112         * <li>Not expecting.</li>
113         * <li>Interrupted.</li>
114         * </ul>
115         * 
            * @return the next result taken from the results queue
            * @throws NoSuchElementException if no result is expected when called
            * @throws InterruptedException if interrupted while blocked
116         * @see ResultQueue#take()
117         */
118        public ResultHolder take() throws NoSuchElementException, InterruptedException {
119                if (!isExpecting()) {
120                        throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
121                }
122                ResultHolder value;
                   // First attempt: take the head of the priority queue. Note that
                   // results.take() can block here, while the monitor is held, if the
                   // queue is empty.
123                synchronized (lock) {
124                        value = results.take();
125                        if (isContinuable(value)) {
126                                // Decrement the counter only when the result is collected.
127                                count--;
128                                return value;
129                        }
130                }
                   // The head was not continuable: put it back and wait (woken by put())
                   // until the queue holds at least as many results as are expected.
131                results.put(value);
132                synchronized (lock) {
133                        while (count > results.size()) {
134                                lock.wait();
135                        }
                           // Collect the head of the queue and count it as collected.
136                        value = results.take();
137                        count--;
138                }
139                return value;
140        }
141 
           /**
            * @return true if the holder's result is non-null and that result
            * reports itself as continuable.
            */
142        private boolean isContinuable(ResultHolder value) {
143                return value.getResult() != null && value.getResult().isContinuable();
144        }
145 
146        /**
147         * Compares ResultHolders so that one that is continuable ranks lowest.
            * A holder whose result is null ranks below any holder with a non-null
            * result. Two holders compare as equal when both results are null, both
            * are continuable, or both are not continuable.
148         * 
149         * @author Dave Syer
150         * 
151         */
152        private static class ResultHolderComparator implements Comparator<ResultHolder> {
153                public int compare(ResultHolder h1, ResultHolder h2) {
154                        RepeatStatus result1 = h1.getResult();
155                        RepeatStatus result2 = h2.getResult();
                           // Null results: equal to each other, lower than anything non-null.
156                        if (result1 == null && result2 == null) {
157                                return 0;
158                        }
159                        if (result1 == null) {
160                                return -1;
161                        }
162                        else if (result2 == null) {
163                                return 1;
164                        }
                           // Same continuable flag on both sides: no ordering between them.
165                        if ((result1.isContinuable() && result2.isContinuable())
166                                        || (!result1.isContinuable() && !result2.isContinuable())) {
167                                return 0;
168                        }
                           // Flags differ: the continuable one ranks lower.
169                        if (result1.isContinuable()) {
170                                return -1;
171                        }
172                        return 1;
173                }
174        }
175 
176}

[all classes][org.springframework.batch.repeat.support]
EMMA 2.0.5312 (C) Vladimir Roubtsov