EMMA Coverage Report (generated Fri Aug 21 15:59:46 BST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ChunkMonitor.java]

nameclass, %method, %block, %line, %
ChunkMonitor.java100% (2/2)100% (13/13)100% (205/205)100% (54/54)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ChunkMonitor100% (1/1)100% (12/12)100% (196/196)100% (50/50)
<static initializer> 100% (1/1)100% (11/11)100% (2/2)
ChunkMonitor (): void 100% (1/1)100% (26/26)100% (5/5)
close (): void 100% (1/1)100% (15/15)100% (4/4)
getData (): ChunkMonitor$ChunkMonitorData 100% (1/1)100% (26/26)100% (7/7)
getOffset (): int 100% (1/1)100% (4/4)100% (1/1)
incrementOffset (): void 100% (1/1)100% (17/17)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (56/56)100% (12/12)
registerItemStream (ItemStream): void 100% (1/1)100% (5/5)100% (2/2)
resetOffset (): void 100% (1/1)100% (5/5)100% (2/2)
setChunkSize (int): void 100% (1/1)100% (7/7)100% (3/3)
setItemReader (ItemReader): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (20/20)100% (6/6)
     
class ChunkMonitor$ChunkMonitorData100% (1/1)100% (1/1)100% (9/9)100% (4/4)
ChunkMonitor$ChunkMonitorData (int, int): void 100% (1/1)100% (9/9)100% (4/4)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import org.apache.commons.logging.Log;
19import org.apache.commons.logging.LogFactory;
20import org.springframework.batch.item.ExecutionContext;
21import org.springframework.batch.item.ItemReader;
22import org.springframework.batch.item.ItemStream;
23import org.springframework.batch.item.ItemStreamException;
24import org.springframework.batch.item.support.CompositeItemStream;
25 
26/**
27 * Manage the offset data between the last successful commit and updates made to
28 * an input chunk. Only works with single threaded steps because it has to use a
29 * {@link ThreadLocal} to manage the state and co-ordinate between the caller
30 * and the wrapped {@link ItemStream}.
31 * 
32 * @author Dave Syer
33 * @since 2.0
34 */
35class ChunkMonitor implements ItemStream {
36        
37        private Log logger = LogFactory.getLog(getClass());
38 
39        public static class ChunkMonitorData {
40                public int offset;
41 
42                public int chunkSize;
43 
44                public ChunkMonitorData(int offset, int chunkSize) {
45                        this.offset = offset;
46                        this.chunkSize = chunkSize;
47                }
48        }
49 
50        private static final String OFFSET = ChunkMonitor.class.getName() + ".OFFSET";
51 
52        private CompositeItemStream stream = new CompositeItemStream();
53 
54        private ThreadLocal<ChunkMonitorData> holder = new ThreadLocal<ChunkMonitorData>();
55        {
56                // For testing purposes, make an instance of the offset data
57                // available:
58                holder.set(new ChunkMonitorData(0, 0));
59        }
60 
61        private ItemReader<?> reader;
62 
63        /**
64         * @param stream the stream to set
65         */
66        public void registerItemStream(ItemStream stream) {
67                this.stream.register(stream);
68        }
69 
70        /**
71         * @param reader the reader to set
72         */
73        public void setItemReader(ItemReader<?> reader) {
74                this.reader = reader;
75        }
76 
77        public void incrementOffset() {
78                ChunkMonitorData data = getData();
79                data.offset ++;
80                if (data.offset >= data.chunkSize) {
81                        resetOffset();
82                }
83        }
84 
85        public int getOffset() {
86                return getData().offset;
87        }
88 
89        public void resetOffset() {
90                getData().offset = 0;
91        }
92 
93        public void setChunkSize(int chunkSize) {
94                getData().chunkSize = chunkSize;
95                resetOffset();
96        }
97 
98        public void close() throws ItemStreamException {
99                holder.set(new ChunkMonitorData(0,0));
100                if (stream != null) {
101                        stream.close();
102                }
103        }
104 
105        public void open(ExecutionContext executionContext) throws ItemStreamException {
106                if (stream != null) {
107                        stream.open(executionContext);
108                        ChunkMonitorData data = new ChunkMonitorData(executionContext.getInt(OFFSET, 0), 0);
109                        holder.set(data);
110                        if (reader == null) {
111                                logger.warn("No ItemReader set (must be concurrent step), so ignoring offset data.");
112                                return;
113                        }
114                        for (int i = 0; i < data.offset; i++) {
115                                try {
116                                        reader.read();
117                                }
118                                catch (Exception e) {
119                                        throw new ItemStreamException("Could not position reader with offset: " + data.offset, e);
120                                }
121                        }
122                }
123        }
124 
125        public void update(ExecutionContext executionContext) throws ItemStreamException {
126                if (stream != null) {
127                        ChunkMonitorData data = getData();
128                        if (data.offset == 0) {
129                                // Only call the underlying update method if we are on a chunk
130                                // boundary
131                                stream.update(executionContext);
132                        }
133                        else {
134                                executionContext.putInt(OFFSET, data.offset);
135                        }
136                }
137        }
138 
139        private ChunkMonitorData getData() {
140                ChunkMonitorData data = holder.get();
141                if (data==null) {
142                        if (stream!=null) {
143                                logger.warn("ItemStream was opened in a different thread.  Restart data could be compromised.");
144                        }
145                        data = new ChunkMonitorData(0,0);
146                        holder.set(data);
147                }
148                return data;
149        }
150 
151}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov