EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ChunkMonitor.java]

nameclass, %method, %block, %line, %
ChunkMonitor.java100% (2/2)100% (13/13)100% (200/200)100% (56/56)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ChunkMonitor100% (1/1)100% (12/12)100% (191/191)100% (52/52)
<static initializer> 100% (1/1)100% (11/11)100% (1/1)
ChunkMonitor (): void 100% (1/1)100% (21/21)100% (5/5)
close (): void 100% (1/1)100% (11/11)100% (4/4)
getData (): ChunkMonitor$ChunkMonitorData 100% (1/1)100% (26/26)100% (7/7)
getOffset (): int 100% (1/1)100% (4/4)100% (1/1)
incrementOffset (): void 100% (1/1)100% (17/17)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (57/57)100% (13/13)
registerItemStream (ItemStream): void 100% (1/1)100% (8/8)100% (3/3)
resetOffset (): void 100% (1/1)100% (5/5)100% (2/2)
setChunkSize (int): void 100% (1/1)100% (7/7)100% (3/3)
setItemReader (ItemReader): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (20/20)100% (6/6)
     
class ChunkMonitor$ChunkMonitorData100% (1/1)100% (1/1)100% (9/9)100% (4/4)
ChunkMonitor$ChunkMonitorData (int, int): void 100% (1/1)100% (9/9)100% (4/4)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import org.apache.commons.logging.Log;
19import org.apache.commons.logging.LogFactory;
20import org.springframework.batch.item.ExecutionContext;
21import org.springframework.batch.item.ItemReader;
22import org.springframework.batch.item.ItemStream;
23import org.springframework.batch.item.ItemStreamException;
24import org.springframework.batch.item.support.CompositeItemStream;
25 
26/**
27 * Manage the offset data between the last successful commit and updates made to
28 * an input chunk. Only works with single threaded steps because it has to use a
29 * {@link ThreadLocal} to manage the state and co-ordinate between the caller
30 * and the wrapped {@link ItemStream}.
31 * 
32 * @author Dave Syer
33 * @since 2.0
34 */
35class ChunkMonitor implements ItemStream {
36        
37        private Log logger = LogFactory.getLog(getClass());
38        
39        private boolean streamsRegistered = false;
40 
41        public static class ChunkMonitorData {
42                public int offset;
43 
44                public int chunkSize;
45 
46                public ChunkMonitorData(int offset, int chunkSize) {
47                        this.offset = offset;
48                        this.chunkSize = chunkSize;
49                }
50        }
51 
52        private static final String OFFSET = ChunkMonitor.class.getName() + ".OFFSET";
53 
54        private CompositeItemStream stream = new CompositeItemStream();
55 
56        private ThreadLocal<ChunkMonitorData> holder = new ThreadLocal<ChunkMonitorData>();
57 
58        private ItemReader<?> reader;
59 
60        /**
61         * @param stream the stream to set
62         */
63        public void registerItemStream(ItemStream stream) {
64                streamsRegistered = true;
65                this.stream.register(stream);
66        }
67 
68        /**
69         * @param reader the reader to set
70         */
71        public void setItemReader(ItemReader<?> reader) {
72                this.reader = reader;
73        }
74 
75        public void incrementOffset() {
76                ChunkMonitorData data = getData();
77                data.offset ++;
78                if (data.offset >= data.chunkSize) {
79                        resetOffset();
80                }
81        }
82 
83        public int getOffset() {
84                return getData().offset;
85        }
86 
87        public void resetOffset() {
88                getData().offset = 0;
89        }
90 
91        public void setChunkSize(int chunkSize) {
92                getData().chunkSize = chunkSize;
93                resetOffset();
94        }
95 
96        public void close() throws ItemStreamException {
97                holder.set(null);
98                if (streamsRegistered) {
99                        stream.close();
100                }
101        }
102 
103        public void open(ExecutionContext executionContext) throws ItemStreamException {
104                if (streamsRegistered) {
105                        stream.open(executionContext);
106                        ChunkMonitorData data = new ChunkMonitorData(executionContext.getInt(OFFSET, 0), 0);
107                        holder.set(data);
108                        if (reader == null) {
109                                logger.warn("No ItemReader set (must be concurrent step), so ignoring offset data.");
110                                return;
111                        }
112                        for (int i = 0; i < data.offset; i++) {
113                                try {
114                                        reader.read();
115                                }
116                                catch (Exception e) {
117                                        throw new ItemStreamException("Could not position reader with offset: " + data.offset, e);
118                                }
119                        }
120                }
121        }
122 
123        public void update(ExecutionContext executionContext) throws ItemStreamException {
124                if (streamsRegistered) {
125                        ChunkMonitorData data = getData();
126                        if (data.offset == 0) {
127                                // Only call the underlying update method if we are on a chunk
128                                // boundary
129                                stream.update(executionContext);
130                        }
131                        else {
132                                executionContext.putInt(OFFSET, data.offset);
133                        }
134                }
135        }
136 
137        private ChunkMonitorData getData() {
138                ChunkMonitorData data = holder.get();
139                if (data==null) {
140                        if (streamsRegistered) {
141                                logger.warn("ItemStream was opened in a different thread.  Restart data could be compromised.");
142                        }
143                        data = new ChunkMonitorData(0,0);
144                        holder.set(data);
145                }
146                return data;
147        }
148 
149}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov