1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.support;
18
19 import org.springframework.batch.item.ExecutionContext;
20 import org.springframework.batch.item.ItemReader;
21 import org.springframework.batch.item.ItemStreamException;
22 import org.springframework.batch.item.ItemStreamReader;
23 import org.springframework.batch.item.ParseException;
24 import org.springframework.batch.item.UnexpectedInputException;
25 import org.springframework.batch.item.util.ExecutionContextUserSupport;
26 import org.springframework.util.Assert;
27
28
29
30
31
32
33
34
35
36
37 public abstract class AbstractItemCountingItemStreamItemReader<T> implements ItemStreamReader<T> {
38
39 private static final String READ_COUNT = "read.count";
40
41 private static final String READ_COUNT_MAX = "read.count.max";
42
43 private int currentItemCount = 0;
44
45 private int maxItemCount = Integer.MAX_VALUE;
46
47 private ExecutionContextUserSupport ecSupport = new ExecutionContextUserSupport();
48
49 private boolean saveState = true;
50
51
52
53
54
55
56
57 protected abstract T doRead() throws Exception;
58
59
60
61
62 protected abstract void doOpen() throws Exception;
63
64
65
66
67 protected abstract void doClose() throws Exception;
68
69
70
71
72
73
74 protected void jumpToItem(int itemIndex) throws Exception {
75 for (int i = 0; i < itemIndex; i++) {
76 read();
77 }
78 }
79
80 public final T read() throws Exception, UnexpectedInputException, ParseException {
81 if (currentItemCount >= maxItemCount) {
82 return null;
83 }
84 currentItemCount++;
85 return doRead();
86 }
87
88 protected int getCurrentItemCount() {
89 return currentItemCount;
90 }
91
92
93
94
95
96
97
98
99
100
101
102 public void setCurrentItemCount(int count) {
103 this.currentItemCount = count;
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117 public void setMaxItemCount(int count) {
118 this.maxItemCount = count;
119 }
120
121 public void close() throws ItemStreamException {
122 currentItemCount = 0;
123 try {
124 doClose();
125 }
126 catch (Exception e) {
127 throw new ItemStreamException("Error while closing item reader", e);
128 }
129 }
130
131 public void open(ExecutionContext executionContext) throws ItemStreamException {
132
133 try {
134 doOpen();
135 }
136 catch (Exception e) {
137 throw new ItemStreamException("Failed to initialize the reader", e);
138 }
139 if (!isSaveState()) {
140 return;
141 }
142
143 if (executionContext.containsKey(ecSupport.getKey(READ_COUNT_MAX))) {
144 maxItemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT_MAX));
145 }
146
147 if (executionContext.containsKey(ecSupport.getKey(READ_COUNT))) {
148 int itemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT));
149
150 if (itemCount < maxItemCount) {
151 try {
152 jumpToItem(itemCount);
153 }
154 catch (Exception e) {
155 throw new ItemStreamException("Could not move to stored position on restart", e);
156 }
157 }
158 currentItemCount = itemCount;
159
160 }
161
162 }
163
164 public void update(ExecutionContext executionContext) throws ItemStreamException {
165 if (saveState) {
166 Assert.notNull(executionContext, "ExecutionContext must not be null");
167 executionContext.putInt(ecSupport.getKey(READ_COUNT), currentItemCount);
168 if (maxItemCount < Integer.MAX_VALUE) {
169 executionContext.putInt(ecSupport.getKey(READ_COUNT_MAX), maxItemCount);
170 }
171 }
172
173 }
174
175 protected ExecutionContextUserSupport getExecutionContextUserSupport() {
176 return ecSupport;
177 }
178
179
180
181
182
183
184
185
186 public void setName(String name) {
187 ecSupport.setName(name);
188 }
189
190
191
192
193
194
195
196
197
198
199 public void setSaveState(boolean saveState) {
200 this.saveState = saveState;
201 }
202
203
204
205
206
207 public boolean isSaveState() {
208 return saveState;
209 }
210
211 }