View Javadoc

1   /*
2    * Copyright 2006-2012 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.database;
18  
19  import java.sql.ResultSet;
20  import java.sql.SQLException;
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.SortedMap;
27  import java.util.TreeMap;
28  import java.util.concurrent.CopyOnWriteArrayList;
29  
30  import javax.sql.DataSource;
31  
32  import org.springframework.batch.item.ExecutionContext;
33  import org.springframework.batch.item.ItemStreamException;
34  import org.springframework.beans.factory.InitializingBean;
35  import org.springframework.jdbc.core.JdbcTemplate;
36  import org.springframework.jdbc.core.RowMapper;
37  import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
38  import org.springframework.util.Assert;
39  import org.springframework.util.ClassUtils;
40  
41  /**
42   * <p>
43   * {@link org.springframework.batch.item.ItemReader} for reading database
44   * records using JDBC in a paging fashion.
45   * </p>
46   * 
47   * <p>
48   * It executes the SQL built by the {@link PagingQueryProvider} to retrieve
49   * requested data. The query is executed using paged requests of a size
50   * specified in {@link #setPageSize(int)}. Additional pages are requested when
51   * needed as {@link #read()} method is called, returning an object corresponding
52   * to current position. On restart it uses the last sort key value to locate the
53   * first page to read (so it doesn't matter if the successfully processed itmes
54   * have been removed or modified).
55   * </p>
56   * 
57   * <p>
58   * The performance of the paging depends on the database specific features
59   * available to limit the number of returned rows. Setting a fairly large page
60   * size and using a commit interval that matches the page size should provide
61   * better performance.
62   * </p>
63   * 
64   * <p>
65   * The implementation is thread-safe in between calls to
66   * {@link #open(ExecutionContext)}, but remember to use
67   * <code>saveState=false</code> if used in a multi-threaded client (no restart
68   * available).
69   * </p>
70   * 
71   * @author Thomas Risberg
72   * @author Dave Syer
73   * @author Michael Minella
74   * @since 2.0
75   */
76  public class JdbcPagingItemReader<T> extends AbstractPagingItemReader<T> implements InitializingBean {
77  	private static final String START_AFTER_VALUE = "start.after";
78  
79  	public static final int VALUE_NOT_SET = -1;
80  
81  	private DataSource dataSource;
82  
83  	private PagingQueryProvider queryProvider;
84  
85  	private Map<String, Object> parameterValues;
86  
87  	private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
88  
89  	@SuppressWarnings("rawtypes")
90  	private RowMapper rowMapper;
91  
92  	private String firstPageSql;
93  
94  	private String remainingPagesSql;
95  
96  	private Map<String, Object> startAfterValues;
97  
98  	private int fetchSize = VALUE_NOT_SET;
99  
100 	public JdbcPagingItemReader() {
101 		setName(ClassUtils.getShortName(JdbcPagingItemReader.class));
102 	}
103 
104 	public void setDataSource(DataSource dataSource) {
105 		this.dataSource = dataSource;
106 	}
107 
108 	/**
109 	 * Gives the JDBC driver a hint as to the number of rows that should be
110 	 * fetched from the database when more rows are needed for this
111 	 * <code>ResultSet</code> object. If the fetch size specified is zero, the
112 	 * JDBC driver ignores the value.
113 	 * 
114 	 * @param fetchSize the number of rows to fetch
115 	 * @see ResultSet#setFetchSize(int)
116 	 */
117 	public void setFetchSize(int fetchSize) {
118 		this.fetchSize = fetchSize;
119 	}
120 
121 	/**
122 	 * A {@link PagingQueryProvider}. Supplies all the platform dependent query
123 	 * generation capabilities needed by the reader.
124 	 * 
125 	 * @param queryProvider the {@link PagingQueryProvider} to use
126 	 */
127 	public void setQueryProvider(PagingQueryProvider queryProvider) {
128 		this.queryProvider = queryProvider;
129 	}
130 
131 	/**
132 	 * The row mapper implementation to be used by this reader. The row mapper
133 	 * is used to convert result set rows into objects, which are then returned
134 	 * by the reader.
135 	 * 
136 	 * @param rowMapper a
137 	 * {@link org.springframework.jdbc.core.simple.ParameterizedRowMapper}
138 	 * implementation
139 	 */
140 	@SuppressWarnings("rawtypes")
141 	public void setRowMapper(RowMapper rowMapper) {
142 		this.rowMapper = rowMapper;
143 	}
144 
145 	/**
146 	 * The parameter values to be used for the query execution. If you use named
147 	 * parameters then the key should be the name used in the query clause. If
148 	 * you use "?" placeholders then the key should be the relative index that
149 	 * the parameter appears in the query string built using the select, from
150 	 * and where clauses specified.
151 	 * 
152 	 * @param parameterValues the values keyed by the parameter named/index used
153 	 * in the query string.
154 	 */
155 	public void setParameterValues(Map<String, Object> parameterValues) {
156 		this.parameterValues = parameterValues;
157 	}
158 
159 	/**
160 	 * Check mandatory properties.
161 	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
162 	 */
163 	@Override
164 	public void afterPropertiesSet() throws Exception {
165 		super.afterPropertiesSet();
166 		Assert.notNull(dataSource);
167 		JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
168 		if (fetchSize != VALUE_NOT_SET) {
169 			jdbcTemplate.setFetchSize(fetchSize);
170 		}
171 		jdbcTemplate.setMaxRows(getPageSize());
172 		namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
173 		Assert.notNull(queryProvider);
174 		queryProvider.init(dataSource);
175 		this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize());
176 		this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize());
177 	}
178 
179 	@Override
180 	@SuppressWarnings("unchecked")
181 	protected void doReadPage() {
182 		if (results == null) {
183 			results = new CopyOnWriteArrayList<T>();
184 		}
185 		else {
186 			results.clear();
187 		}
188 
189 		PagingRowMapper rowCallback = new PagingRowMapper();
190 
191 		List<?> query;
192 
193 		if (getPage() == 0) {
194 			if (logger.isDebugEnabled()) {
195 				logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
196 			}
197 			if (parameterValues != null && parameterValues.size() > 0) {
198 				if (this.queryProvider.isUsingNamedParameters()) {
199 					query = namedParameterJdbcTemplate.query(firstPageSql,
200 							getParameterMap(parameterValues, null), rowCallback);
201 				}
202 				else {
203 					query = getJdbcTemplate().query(firstPageSql,
204 							getParameterList(parameterValues, null).toArray(), rowCallback);
205 				}
206 			}
207 			else {
208 				query = getJdbcTemplate().query(firstPageSql, rowCallback);
209 			}
210 
211 		}
212 		else {
213 			if (logger.isDebugEnabled()) {
214 				logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
215 			}
216 			if (this.queryProvider.isUsingNamedParameters()) {
217 				query = namedParameterJdbcTemplate.query(remainingPagesSql,
218 						getParameterMap(parameterValues, startAfterValues), rowCallback);
219 			}
220 			else {
221 				query = getJdbcTemplate().query(remainingPagesSql,
222 						getParameterList(parameterValues, startAfterValues).toArray(), rowCallback);
223 			}
224 		}
225 		
226 		Collection<T> result = (Collection<T>) query;
227 		results.addAll(result);
228 	}
229 
230 	@Override
231 	public void update(ExecutionContext executionContext) throws ItemStreamException {
232 		super.update(executionContext);
233 		if (isSaveState() && startAfterValues != null) {
234 			executionContext.put(getExecutionContextUserSupport().getKey(START_AFTER_VALUE), startAfterValues);
235 		}
236 	}
237 
238 	@Override
239 	@SuppressWarnings("unchecked")
240 	public void open(ExecutionContext executionContext) {
241 		if (isSaveState()) {
242 			startAfterValues = (Map<String, Object>) executionContext.get(getExecutionContextUserSupport().getKey(START_AFTER_VALUE));
243 			
244 			if(startAfterValues == null) {
245 				startAfterValues = new LinkedHashMap<String, Object>();
246 			}
247 		}
248 		
249 		super.open(executionContext);
250 	}
251 
252 	@Override
253 	@SuppressWarnings({"unchecked", "rawtypes"})
254 	protected void doJumpToPage(int itemIndex) {
255 		/*
256 		 * Normally this would be false (the startAfterValue is enough
257 		 * information to restart from.
258 		 */
259 		if (startAfterValues == null && getPage() > 0) {
260 
261 			String jumpToItemSql;
262 			jumpToItemSql = queryProvider.generateJumpToItemQuery(itemIndex, getPageSize());
263 
264 			if (logger.isDebugEnabled()) {
265 				logger.debug("SQL used for jumping: [" + jumpToItemSql + "]");
266 			}
267 
268 			RowMapper startMapper = new RowMapper() {
269                 @Override
270 				public Object mapRow(ResultSet rs, int i) throws SQLException {
271 					return rs.getObject(1);
272 				}
273 			};
274 			if (this.queryProvider.isUsingNamedParameters()) {
275 				startAfterValues = (Map<String, Object>) namedParameterJdbcTemplate.queryForObject(jumpToItemSql,
276 						getParameterMap(parameterValues, startAfterValues), startMapper);
277 			}
278 			else {
279 				startAfterValues = (Map<String, Object>) getJdbcTemplate().queryForObject(jumpToItemSql,
280 						getParameterList(parameterValues, startAfterValues).toArray(), startMapper);
281 			}
282 		}
283 	}
284 
285 	private Map<String, Object> getParameterMap(Map<String, Object> values, Map<String, Object> sortKeyValues) {
286 		Map<String, Object> parameterMap = new LinkedHashMap<String, Object>();
287 		if (values != null) {
288 			parameterMap.putAll(values);
289 		}
290 		if (sortKeyValues != null && !sortKeyValues.isEmpty()) {
291 			for (Map.Entry<String, Object> sortKey : sortKeyValues.entrySet()) {
292 				parameterMap.put("_" + sortKey.getKey(), sortKey.getValue());
293 			}
294 		}
295 		if (logger.isDebugEnabled()) {
296 			logger.debug("Using parameterMap:" + parameterMap);
297 		}
298 		return parameterMap;
299 	}
300 
301 	private List<Object> getParameterList(Map<String, Object> values, Map<String, Object> sortKeyValue) {
302 		SortedMap<String, Object> sm = new TreeMap<String, Object>();
303 		if (values != null) {
304 			sm.putAll(values);
305 		}
306 		List<Object> parameterList = new ArrayList<Object>();
307 		parameterList.addAll(sm.values());
308 		if (sortKeyValue != null && sortKeyValue.size() > 0) {
309 			List<Map.Entry<String, Object>> keys = new ArrayList<Map.Entry<String,Object>>(sortKeyValue.entrySet());
310 
311 			for(int i = 0; i < keys.size(); i++) {
312 				for(int j = 0; j < i; j++) {
313 					parameterList.add(keys.get(j).getValue());
314 				}
315 
316 				parameterList.add(keys.get(i).getValue());
317 			}
318 		}
319 		
320 		if (logger.isDebugEnabled()) {
321 			logger.debug("Using parameterList:" + parameterList);
322 		}
323 		return parameterList;
324 	}
325 
326 	@SuppressWarnings("rawtypes")
327 	private class PagingRowMapper implements RowMapper {
328         @Override
329 		public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
330 			startAfterValues = new LinkedHashMap<String, Object>();
331 			for (Map.Entry<String, Order> sortKey : queryProvider.getSortKeys().entrySet()) {
332 				startAfterValues.put(sortKey.getKey(), rs.getObject(sortKey.getKey()));
333 			}
334 			
335 			return rowMapper.mapRow(rs, rowNum);
336 		}
337 	}
338 
339 	private JdbcTemplate getJdbcTemplate() {
340 		return (JdbcTemplate) namedParameterJdbcTemplate.getJdbcOperations();
341 	}
342 }