1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.support; |
18 | |
19 | import org.springframework.batch.item.ExecutionContext; |
20 | import org.springframework.batch.item.ItemReader; |
21 | import org.springframework.batch.item.ItemStreamException; |
22 | import org.springframework.batch.item.ItemStreamReader; |
23 | import org.springframework.batch.item.ParseException; |
24 | import org.springframework.batch.item.UnexpectedInputException; |
25 | import org.springframework.batch.item.util.ExecutionContextUserSupport; |
26 | import org.springframework.util.Assert; |
27 | |
28 | /** |
29 | * Abstract superclass for {@link ItemReader}s that supports restart by storing |
30 | * item count in the {@link ExecutionContext} (therefore requires item ordering |
31 | * to be preserved between runs). |
32 | * |
33 | * Subclasses are inherently *not* thread-safe. |
34 | * |
35 | * @author Robert Kasanicky |
36 | */ |
37 | public abstract class AbstractItemCountingItemStreamItemReader<T> implements ItemStreamReader<T> { |
38 | |
39 | private static final String READ_COUNT = "read.count"; |
40 | |
41 | private static final String READ_COUNT_MAX = "read.count.max"; |
42 | |
43 | private int currentItemCount = 0; |
44 | |
45 | private int maxItemCount = Integer.MAX_VALUE; |
46 | |
47 | private ExecutionContextUserSupport ecSupport = new ExecutionContextUserSupport(); |
48 | |
49 | private boolean saveState = true; |
50 | |
51 | /** |
52 | * Read next item from input. |
53 | * |
54 | * @return item |
55 | * @throws Exception |
56 | */ |
57 | protected abstract T doRead() throws Exception; |
58 | |
59 | /** |
60 | * Open resources necessary to start reading input. |
61 | */ |
62 | protected abstract void doOpen() throws Exception; |
63 | |
64 | /** |
65 | * Close the resources opened in {@link #doOpen()}. |
66 | */ |
67 | protected abstract void doClose() throws Exception; |
68 | |
69 | /** |
70 | * Move to the given item index. Subclasses should override this method if |
71 | * there is a more efficient way of moving to given index than re-reading |
72 | * the input using {@link #doRead()}. |
73 | */ |
74 | protected void jumpToItem(int itemIndex) throws Exception { |
75 | for (int i = 0; i < itemIndex; i++) { |
76 | read(); |
77 | } |
78 | } |
79 | |
80 | public final T read() throws Exception, UnexpectedInputException, ParseException { |
81 | if (currentItemCount >= maxItemCount) { |
82 | return null; |
83 | } |
84 | currentItemCount++; |
85 | return doRead(); |
86 | } |
87 | |
88 | protected int getCurrentItemCount() { |
89 | return currentItemCount; |
90 | } |
91 | |
92 | /** |
93 | * The index of the item to start reading from. If the |
94 | * {@link ExecutionContext} contains a key <code>[name].read.count</code> |
95 | * (where <code>[name]</code> is the name of this component) the value from |
96 | * the {@link ExecutionContext} will be used in preference. |
97 | * |
98 | * @see #setName(String) |
99 | * |
100 | * @param count the value of the current item count |
101 | */ |
102 | public void setCurrentItemCount(int count) { |
103 | this.currentItemCount = count; |
104 | } |
105 | |
106 | /** |
107 | * The maximum index of the items to be read. If the |
108 | * {@link ExecutionContext} contains a key |
109 | * <code>[name].read.count.max</code> (where <code>[name]</code> is the name |
110 | * of this component) the value from the {@link ExecutionContext} will be |
111 | * used in preference. |
112 | * |
113 | * @see #setName(String) |
114 | * |
115 | * @param count the value of the maximum item count |
116 | */ |
117 | public void setMaxItemCount(int count) { |
118 | this.maxItemCount = count; |
119 | } |
120 | |
121 | public void close() throws ItemStreamException { |
122 | currentItemCount = 0; |
123 | try { |
124 | doClose(); |
125 | } |
126 | catch (Exception e) { |
127 | throw new ItemStreamException("Error while closing item reader", e); |
128 | } |
129 | } |
130 | |
131 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
132 | |
133 | try { |
134 | doOpen(); |
135 | } |
136 | catch (Exception e) { |
137 | throw new ItemStreamException("Failed to initialize the reader", e); |
138 | } |
139 | if (!isSaveState()) { |
140 | return; |
141 | } |
142 | |
143 | if (executionContext.containsKey(ecSupport.getKey(READ_COUNT_MAX))) { |
144 | maxItemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT_MAX)); |
145 | } |
146 | |
147 | if (executionContext.containsKey(ecSupport.getKey(READ_COUNT))) { |
148 | int itemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT)); |
149 | |
150 | if (itemCount < maxItemCount) { |
151 | try { |
152 | jumpToItem(itemCount); |
153 | } |
154 | catch (Exception e) { |
155 | throw new ItemStreamException("Could not move to stored position on restart", e); |
156 | } |
157 | } |
158 | currentItemCount = itemCount; |
159 | |
160 | } |
161 | |
162 | } |
163 | |
164 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
165 | if (saveState) { |
166 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
167 | executionContext.putInt(ecSupport.getKey(READ_COUNT), currentItemCount); |
168 | if (maxItemCount < Integer.MAX_VALUE) { |
169 | executionContext.putInt(ecSupport.getKey(READ_COUNT_MAX), maxItemCount); |
170 | } |
171 | } |
172 | |
173 | } |
174 | |
175 | protected ExecutionContextUserSupport getExecutionContextUserSupport() { |
176 | return ecSupport; |
177 | } |
178 | |
179 | /** |
180 | * The name of the component which will be used as a stem for keys in the |
181 | * {@link ExecutionContext}. Subclasses should provide a default value, e.g. |
182 | * the short form of the class name. |
183 | * |
184 | * @param name the name for the component |
185 | */ |
186 | public void setName(String name) { |
187 | ecSupport.setName(name); |
188 | } |
189 | |
190 | /** |
191 | * Set the flag that determines whether to save internal data for |
192 | * {@link ExecutionContext}. Only switch this to false if you don't want to |
193 | * save any state from this stream, and you don't need it to be restartable. |
194 | * Always set it to false if the reader is being used in a concurrent |
195 | * environment. |
196 | * |
197 | * @param saveState flag value (default true). |
198 | */ |
199 | public void setSaveState(boolean saveState) { |
200 | this.saveState = saveState; |
201 | } |
202 | |
203 | /** |
204 | * The flag that determines whether to save internal state for restarts. |
205 | * @return true if the flag was set |
206 | */ |
207 | public boolean isSaveState() { |
208 | return saveState; |
209 | } |
210 | |
211 | } |