View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.support;
18  
19  import org.springframework.batch.item.ExecutionContext;
20  import org.springframework.batch.item.ItemReader;
21  import org.springframework.batch.item.ItemStreamException;
22  import org.springframework.batch.item.ItemStreamReader;
23  import org.springframework.batch.item.ParseException;
24  import org.springframework.batch.item.UnexpectedInputException;
25  import org.springframework.batch.item.util.ExecutionContextUserSupport;
26  import org.springframework.util.Assert;
27  
28  /**
29   * Abstract superclass for {@link ItemReader}s that supports restart by storing
30   * item count in the {@link ExecutionContext} (therefore requires item ordering
31   * to be preserved between runs).
32   * 
33   * Subclasses are inherently *not* thread-safe.
34   * 
35   * @author Robert Kasanicky
36   */
37  public abstract class AbstractItemCountingItemStreamItemReader<T> implements ItemStreamReader<T> {
38  
39  	private static final String READ_COUNT = "read.count";
40  
41  	private static final String READ_COUNT_MAX = "read.count.max";
42  
43  	private int currentItemCount = 0;
44  
45  	private int maxItemCount = Integer.MAX_VALUE;
46  
47  	private ExecutionContextUserSupport ecSupport = new ExecutionContextUserSupport();
48  
49  	private boolean saveState = true;
50  
51  	/**
52  	 * Read next item from input.
53  	 * 
54  	 * @return item
55  	 * @throws Exception
56  	 */
57  	protected abstract T doRead() throws Exception;
58  
59  	/**
60  	 * Open resources necessary to start reading input.
61  	 */
62  	protected abstract void doOpen() throws Exception;
63  
64  	/**
65  	 * Close the resources opened in {@link #doOpen()}.
66  	 */
67  	protected abstract void doClose() throws Exception;
68  
69  	/**
70  	 * Move to the given item index. Subclasses should override this method if
71  	 * there is a more efficient way of moving to given index than re-reading
72  	 * the input using {@link #doRead()}.
73  	 */
74  	protected void jumpToItem(int itemIndex) throws Exception {
75  		for (int i = 0; i < itemIndex; i++) {
76  			read();
77  		}
78  	}
79  
80      @Override
81  	public final T read() throws Exception, UnexpectedInputException, ParseException {
82  		if (currentItemCount >= maxItemCount) {
83  			return null;
84  		}
85  		currentItemCount++;
86  		return doRead();
87  	}
88  
89  	protected int getCurrentItemCount() {
90  		return currentItemCount;
91  	}
92  
93  	/**
94  	 * The index of the item to start reading from. If the
95  	 * {@link ExecutionContext} contains a key <code>[name].read.count</code>
96  	 * (where <code>[name]</code> is the name of this component) the value from
97  	 * the {@link ExecutionContext} will be used in preference.
98  	 * 
99  	 * @see #setName(String)
100 	 * 
101 	 * @param count the value of the current item count
102 	 */
103 	public void setCurrentItemCount(int count) {
104 		this.currentItemCount = count;
105 	}
106 
107 	/**
108 	 * The maximum index of the items to be read. If the
109 	 * {@link ExecutionContext} contains a key
110 	 * <code>[name].read.count.max</code> (where <code>[name]</code> is the name
111 	 * of this component) the value from the {@link ExecutionContext} will be
112 	 * used in preference.
113 	 * 
114 	 * @see #setName(String)
115 	 * 
116 	 * @param count the value of the maximum item count
117 	 */
118 	public void setMaxItemCount(int count) {
119 		this.maxItemCount = count;
120 	}
121 
122     @Override
123 	public void close() throws ItemStreamException {
124 		currentItemCount = 0;
125 		try {
126 			doClose();
127 		}
128 		catch (Exception e) {
129 			throw new ItemStreamException("Error while closing item reader", e);
130 		}
131 	}
132 
133     @Override
134 	public void open(ExecutionContext executionContext) throws ItemStreamException {
135 
136 		try {
137 			doOpen();
138 		}
139 		catch (Exception e) {
140 			throw new ItemStreamException("Failed to initialize the reader", e);
141 		}
142 		if (!isSaveState()) {
143 			return;
144 		}
145 
146 		if (executionContext.containsKey(ecSupport.getKey(READ_COUNT_MAX))) {
147 			maxItemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT_MAX));
148 		}
149 
150 		if (executionContext.containsKey(ecSupport.getKey(READ_COUNT))) {
151 			int itemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT));
152 
153 			if (itemCount < maxItemCount) {
154 				try {
155 					jumpToItem(itemCount);
156 				}
157 				catch (Exception e) {
158 					throw new ItemStreamException("Could not move to stored position on restart", e);
159 				}
160 			}
161 			currentItemCount = itemCount;
162 
163 		}
164 
165 	}
166 
167     @Override
168 	public void update(ExecutionContext executionContext) throws ItemStreamException {
169 		if (saveState) {
170 			Assert.notNull(executionContext, "ExecutionContext must not be null");
171 			executionContext.putInt(ecSupport.getKey(READ_COUNT), currentItemCount);
172 			if (maxItemCount < Integer.MAX_VALUE) {
173 				executionContext.putInt(ecSupport.getKey(READ_COUNT_MAX), maxItemCount);
174 			}
175 		}
176 
177 	}
178 
179 	protected ExecutionContextUserSupport getExecutionContextUserSupport() {
180 		return ecSupport;
181 	}
182 
183 	/**
184 	 * The name of the component which will be used as a stem for keys in the
185 	 * {@link ExecutionContext}. Subclasses should provide a default value, e.g.
186 	 * the short form of the class name.
187 	 * 
188 	 * @param name the name for the component
189 	 */
190 	public void setName(String name) {
191 		ecSupport.setName(name);
192 	}
193 
194 	/**
195 	 * Set the flag that determines whether to save internal data for
196 	 * {@link ExecutionContext}. Only switch this to false if you don't want to
197 	 * save any state from this stream, and you don't need it to be restartable.
198 	 * Always set it to false if the reader is being used in a concurrent
199 	 * environment.
200 	 * 
201 	 * @param saveState flag value (default true).
202 	 */
203 	public void setSaveState(boolean saveState) {
204 		this.saveState = saveState;
205 	}
206 
207 	/**
208 	 * The flag that determines whether to save internal state for restarts.
209 	 * @return true if the flag was set
210 	 */
211 	public boolean isSaveState() {
212 		return saveState;
213 	}
214 
215 }