EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ChunkMonitor.java]

nameclass, %method, %block, %line, %
ChunkMonitor.java100% (2/2)100% (12/12)100% (213/213)100% (62/62)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ChunkMonitor100% (1/1)100% (11/11)100% (204/204)100% (58/58)
ChunkMonitor (): void 100% (1/1)100% (25/25)100% (7/7)
close (): void 100% (1/1)100% (13/13)100% (5/5)
getData (): ChunkMonitor$ChunkMonitorData 100% (1/1)100% (26/26)100% (7/7)
getOffset (): int 100% (1/1)100% (4/4)100% (1/1)
incrementOffset (): void 100% (1/1)100% (17/17)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (64/64)100% (15/15)
registerItemStream (ItemStream): void 100% (1/1)100% (8/8)100% (3/3)
resetOffset (): void 100% (1/1)100% (5/5)100% (2/2)
setChunkSize (int): void 100% (1/1)100% (7/7)100% (3/3)
setItemReader (ItemReader): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (31/31)100% (8/8)
     
class ChunkMonitor$ChunkMonitorData100% (1/1)100% (1/1)100% (9/9)100% (4/4)
ChunkMonitor$ChunkMonitorData (int, int): void 100% (1/1)100% (9/9)100% (4/4)

1/*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import org.apache.commons.logging.Log;
19import org.apache.commons.logging.LogFactory;
20import org.springframework.batch.item.ExecutionContext;
21import org.springframework.batch.item.ItemReader;
22import org.springframework.batch.item.ItemStream;
23import org.springframework.batch.item.ItemStreamException;
24import org.springframework.batch.item.ItemStreamSupport;
25import org.springframework.batch.item.support.CompositeItemStream;
26 
27/**
28 * Manage the offset data between the last successful commit and updates made to
29 * an input chunk. Only works with single threaded steps because it has to use a
30 * {@link ThreadLocal} to manage the state and co-ordinate between the caller
31 * and the wrapped {@link ItemStream}.
32 *
33 * @author Dave Syer
34 * @since 2.0
35 */
36public class ChunkMonitor extends ItemStreamSupport {
37 
38        private Log logger = LogFactory.getLog(getClass());
39 
40        private boolean streamsRegistered = false;
41 
42        public static class ChunkMonitorData {
43                public int offset;
44 
45                public int chunkSize;
46 
47                public ChunkMonitorData(int offset, int chunkSize) {
48                        this.offset = offset;
49                        this.chunkSize = chunkSize;
50                }
51        }
52 
53        private static final String OFFSET = "OFFSET";
54 
55        private CompositeItemStream stream = new CompositeItemStream();
56 
57        private ThreadLocal<ChunkMonitorData> holder = new ThreadLocal<ChunkMonitorData>();
58 
59        private ItemReader<?> reader;
60 
61        public ChunkMonitor() {
62                this.setExecutionContextName(ChunkMonitor.class.getName());
63        }
64 
65        /**
66         * @param stream the stream to set
67         */
68        public void registerItemStream(ItemStream stream) {
69                streamsRegistered = true;
70                this.stream.register(stream);
71        }
72 
73        /**
74         * @param reader the reader to set
75         */
76        public void setItemReader(ItemReader<?> reader) {
77                this.reader = reader;
78        }
79 
80        public void incrementOffset() {
81                ChunkMonitorData data = getData();
82                data.offset ++;
83                if (data.offset >= data.chunkSize) {
84                        resetOffset();
85                }
86        }
87 
88        public int getOffset() {
89                return getData().offset;
90        }
91 
92        public void resetOffset() {
93                getData().offset = 0;
94        }
95 
96        public void setChunkSize(int chunkSize) {
97                getData().chunkSize = chunkSize;
98                resetOffset();
99        }
100 
101        @Override
102        public void close() throws ItemStreamException {
103                super.close();
104                holder.set(null);
105                if (streamsRegistered) {
106                        stream.close();
107                }
108        }
109 
110        @Override
111        public void open(ExecutionContext executionContext) throws ItemStreamException {
112                super.open(executionContext);
113                if (streamsRegistered) {
114                        stream.open(executionContext);
115                        ChunkMonitorData data = new ChunkMonitorData(executionContext.getInt(getExecutionContextKey(OFFSET), 0), 0);
116                        holder.set(data);
117                        if (reader == null) {
118                                logger.warn("No ItemReader set (must be concurrent step), so ignoring offset data.");
119                                return;
120                        }
121                        for (int i = 0; i < data.offset; i++) {
122                                try {
123                                        reader.read();
124                                }
125                                catch (Exception e) {
126                                        throw new ItemStreamException("Could not position reader with offset: " + data.offset, e);
127                                }
128                        }
129 
130                        resetOffset();
131                }
132        }
133 
134        @Override
135        public void update(ExecutionContext executionContext) throws ItemStreamException {
136                super.update(executionContext);
137                if (streamsRegistered) {
138                        ChunkMonitorData data = getData();
139                        if (data.offset == 0) {
140                                // Only call the underlying update method if we are on a chunk
141                                // boundary
142                                stream.update(executionContext);
143                                executionContext.remove(getExecutionContextKey(OFFSET));
144                        }
145                        else {
146                                executionContext.putInt(getExecutionContextKey(OFFSET), data.offset);
147                        }
148                }
149        }
150 
151        private ChunkMonitorData getData() {
152                ChunkMonitorData data = holder.get();
153                if (data==null) {
154                        if (streamsRegistered) {
155                                logger.warn("ItemStream was opened in a different thread.  Restart data could be compromised.");
156                        }
157                        data = new ChunkMonitorData(0,0);
158                        holder.set(data);
159                }
160                return data;
161        }
162 
163}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov