View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.support;
18  
19  import org.springframework.batch.item.ExecutionContext;
20  import org.springframework.batch.item.ItemReader;
21  import org.springframework.batch.item.ItemStreamException;
22  import org.springframework.batch.item.ItemStreamReader;
23  import org.springframework.batch.item.ParseException;
24  import org.springframework.batch.item.UnexpectedInputException;
25  import org.springframework.batch.item.util.ExecutionContextUserSupport;
26  import org.springframework.util.Assert;
27  
28  /**
29   * Abstract superclass for {@link ItemReader}s that supports restart by storing
30   * item count in the {@link ExecutionContext} (therefore requires item ordering
31   * to be preserved between runs).
32   * 
33   * Subclasses are inherently *not* thread-safe.
34   * 
35   * @author Robert Kasanicky
36   */
37  public abstract class AbstractItemCountingItemStreamItemReader<T> implements ItemStreamReader<T> {
38  
39  	private static final String READ_COUNT = "read.count";
40  
41  	private static final String READ_COUNT_MAX = "read.count.max";
42  
43  	private int currentItemCount = 0;
44  
45  	private int maxItemCount = Integer.MAX_VALUE;
46  
47  	private ExecutionContextUserSupport ecSupport = new ExecutionContextUserSupport();
48  
49  	private boolean saveState = true;
50  
51  	/**
52  	 * Read next item from input.
53  	 * 
54  	 * @return item
55  	 * @throws Exception
56  	 */
57  	protected abstract T doRead() throws Exception;
58  
59  	/**
60  	 * Open resources necessary to start reading input.
61  	 */
62  	protected abstract void doOpen() throws Exception;
63  
64  	/**
65  	 * Close the resources opened in {@link #doOpen()}.
66  	 */
67  	protected abstract void doClose() throws Exception;
68  
69  	/**
70  	 * Move to the given item index. Subclasses should override this method if
71  	 * there is a more efficient way of moving to given index than re-reading
72  	 * the input using {@link #doRead()}.
73  	 */
74  	protected void jumpToItem(int itemIndex) throws Exception {
75  		for (int i = 0; i < itemIndex; i++) {
76  			read();
77  		}
78  	}
79  
80  	public final T read() throws Exception, UnexpectedInputException, ParseException {
81  		if (currentItemCount >= maxItemCount) {
82  			return null;
83  		}
84  		currentItemCount++;
85  		return doRead();
86  	}
87  
88  	protected int getCurrentItemCount() {
89  		return currentItemCount;
90  	}
91  
92  	/**
93  	 * The index of the item to start reading from. If the
94  	 * {@link ExecutionContext} contains a key <code>[name].read.count</code>
95  	 * (where <code>[name]</code> is the name of this component) the value from
96  	 * the {@link ExecutionContext} will be used in preference.
97  	 * 
98  	 * @see #setName(String)
99  	 * 
100 	 * @param count the value of the current item count
101 	 */
102 	public void setCurrentItemCount(int count) {
103 		this.currentItemCount = count;
104 	}
105 
106 	/**
107 	 * The maximum index of the items to be read. If the
108 	 * {@link ExecutionContext} contains a key
109 	 * <code>[name].read.count.max</code> (where <code>[name]</code> is the name
110 	 * of this component) the value from the {@link ExecutionContext} will be
111 	 * used in preference.
112 	 * 
113 	 * @see #setName(String)
114 	 * 
115 	 * @param count the value of the maximum item count
116 	 */
117 	public void setMaxItemCount(int count) {
118 		this.maxItemCount = count;
119 	}
120 
121 	public void close() throws ItemStreamException {
122 		currentItemCount = 0;
123 		try {
124 			doClose();
125 		}
126 		catch (Exception e) {
127 			throw new ItemStreamException("Error while closing item reader", e);
128 		}
129 	}
130 
131 	public void open(ExecutionContext executionContext) throws ItemStreamException {
132 
133 		try {
134 			doOpen();
135 		}
136 		catch (Exception e) {
137 			throw new ItemStreamException("Failed to initialize the reader", e);
138 		}
139 		if (!isSaveState()) {
140 			return;
141 		}
142 
143 		if (executionContext.containsKey(ecSupport.getKey(READ_COUNT_MAX))) {
144 			maxItemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT_MAX));
145 		}
146 
147 		if (executionContext.containsKey(ecSupport.getKey(READ_COUNT))) {
148 			int itemCount = executionContext.getInt(ecSupport.getKey(READ_COUNT));
149 
150 			if (itemCount < maxItemCount) {
151 				try {
152 					jumpToItem(itemCount);
153 				}
154 				catch (Exception e) {
155 					throw new ItemStreamException("Could not move to stored position on restart", e);
156 				}
157 			}
158 			currentItemCount = itemCount;
159 
160 		}
161 
162 	}
163 
164 	public void update(ExecutionContext executionContext) throws ItemStreamException {
165 		if (saveState) {
166 			Assert.notNull(executionContext, "ExecutionContext must not be null");
167 			executionContext.putInt(ecSupport.getKey(READ_COUNT), currentItemCount);
168 			if (maxItemCount < Integer.MAX_VALUE) {
169 				executionContext.putInt(ecSupport.getKey(READ_COUNT_MAX), maxItemCount);
170 			}
171 		}
172 
173 	}
174 
175 	protected ExecutionContextUserSupport getExecutionContextUserSupport() {
176 		return ecSupport;
177 	}
178 
179 	/**
180 	 * The name of the component which will be used as a stem for keys in the
181 	 * {@link ExecutionContext}. Subclasses should provide a default value, e.g.
182 	 * the short form of the class name.
183 	 * 
184 	 * @param name the name for the component
185 	 */
186 	public void setName(String name) {
187 		ecSupport.setName(name);
188 	}
189 
190 	/**
191 	 * Set the flag that determines whether to save internal data for
192 	 * {@link ExecutionContext}. Only switch this to false if you don't want to
193 	 * save any state from this stream, and you don't need it to be restartable.
194 	 * Always set it to false if the reader is being used in a concurrent
195 	 * environment.
196 	 * 
197 	 * @param saveState flag value (default true).
198 	 */
199 	public void setSaveState(boolean saveState) {
200 		this.saveState = saveState;
201 	}
202 
203 	/**
204 	 * The flag that determines whether to save internal state for restarts.
205 	 * @return true if the flag was set
206 	 */
207 	public boolean isSaveState() {
208 		return saveState;
209 	}
210 
211 }