1 | /* |
2 | * Copyright 2006-2008 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.database; |
18 | |
19 | import java.sql.ResultSet; |
20 | import java.sql.SQLException; |
21 | import java.util.ArrayList; |
22 | import java.util.Collection; |
23 | import java.util.LinkedHashMap; |
24 | import java.util.List; |
25 | import java.util.Map; |
26 | import java.util.SortedMap; |
27 | import java.util.TreeMap; |
28 | import java.util.concurrent.CopyOnWriteArrayList; |
29 | |
30 | import javax.sql.DataSource; |
31 | |
32 | import org.springframework.batch.item.ExecutionContext; |
33 | import org.springframework.batch.item.ItemStreamException; |
34 | import org.springframework.beans.factory.InitializingBean; |
35 | import org.springframework.jdbc.core.JdbcTemplate; |
36 | import org.springframework.jdbc.core.RowMapper; |
37 | import org.springframework.jdbc.core.simple.SimpleJdbcTemplate; |
38 | import org.springframework.util.Assert; |
39 | import org.springframework.util.ClassUtils; |
40 | |
41 | /** |
42 | * <p> |
43 | * {@link org.springframework.batch.item.ItemReader} for reading database |
44 | * records using JDBC in a paging fashion. |
45 | * </p> |
46 | * |
47 | * <p> |
48 | * It executes the SQL built by the {@link PagingQueryProvider} to retrieve |
49 | * requested data. The query is executed using paged requests of a size |
50 | * specified in {@link #setPageSize(int)}. Additional pages are requested when |
51 | * needed as {@link #read()} method is called, returning an object corresponding |
52 | * to current position. On restart it uses the last sort key value to locate the |
53 | * first page to read (so it doesn't matter if the successfully processed itmes |
54 | * have been removed or modified). |
55 | * </p> |
56 | * |
57 | * <p> |
58 | * The performance of the paging depends on the database specific features |
59 | * available to limit the number of returned rows. Setting a fairly large page |
60 | * size and using a commit interval that matches the page size should provide |
61 | * better performance. |
62 | * </p> |
63 | * |
64 | * <p> |
65 | * The implementation is thread-safe in between calls to |
66 | * {@link #open(ExecutionContext)}, but remember to use |
67 | * <code>saveState=false</code> if used in a multi-threaded client (no restart |
68 | * available). |
69 | * </p> |
70 | * |
71 | * @author Thomas Risberg |
72 | * @author Dave Syer |
73 | * @since 2.0 |
74 | */ |
75 | public class JdbcPagingItemReader<T> extends AbstractPagingItemReader<T> implements InitializingBean { |
76 | |
77 | /** |
78 | * |
79 | */ |
80 | private static final String START_AFTER_VALUE = "start.after"; |
81 | |
82 | public static final int VALUE_NOT_SET = -1; |
83 | |
84 | private DataSource dataSource; |
85 | |
86 | private PagingQueryProvider queryProvider; |
87 | |
88 | private Map<String, Object> parameterValues; |
89 | |
90 | private SimpleJdbcTemplate simpleJdbcTemplate; |
91 | |
92 | private RowMapper rowMapper; |
93 | |
94 | private String firstPageSql; |
95 | |
96 | private String remainingPagesSql; |
97 | |
98 | private Object startAfterValue; |
99 | |
100 | private int fetchSize = VALUE_NOT_SET; |
101 | |
102 | public JdbcPagingItemReader() { |
103 | setName(ClassUtils.getShortName(JdbcPagingItemReader.class)); |
104 | } |
105 | |
106 | public void setDataSource(DataSource dataSource) { |
107 | this.dataSource = dataSource; |
108 | } |
109 | |
110 | /** |
111 | * Gives the JDBC driver a hint as to the number of rows that should be |
112 | * fetched from the database when more rows are needed for this |
113 | * <code>ResultSet</code> object. If the fetch size specified is zero, the |
114 | * JDBC driver ignores the value. |
115 | * |
116 | * @param fetchSize the number of rows to fetch |
117 | * @see ResultSet#setFetchSize(int) |
118 | */ |
119 | public void setFetchSize(int fetchSize) { |
120 | this.fetchSize = fetchSize; |
121 | } |
122 | |
123 | /** |
124 | * A {@link PagingQueryProvider}. Supplies all the platform dependent query |
125 | * generation capabilities needed by the reader. |
126 | * |
127 | * @param queryProvider the {@link PagingQueryProvider} to use |
128 | */ |
129 | public void setQueryProvider(PagingQueryProvider queryProvider) { |
130 | this.queryProvider = queryProvider; |
131 | } |
132 | |
133 | /** |
134 | * The row mapper implementation to be used by this reader. The row mapper |
135 | * is used to convert result set rows into objects, which are then returned |
136 | * by the reader. |
137 | * |
138 | * @param rowMapper a |
139 | * {@link org.springframework.jdbc.core.simple.ParameterizedRowMapper} |
140 | * implementation |
141 | */ |
142 | public void setRowMapper(RowMapper rowMapper) { |
143 | this.rowMapper = rowMapper; |
144 | } |
145 | |
146 | /** |
147 | * The parameter values to be used for the query execution. If you use named |
148 | * parameters then the key should be the name used in the query clause. If |
149 | * you use "?" placeholders then the key should be the relative index that |
150 | * the parameter appears in the query string built using the select, from |
151 | * and where clauses specified. |
152 | * |
153 | * @param parameterValues the values keyed by the parameter named/index used |
154 | * in the query string. |
155 | */ |
156 | public void setParameterValues(Map<String, Object> parameterValues) { |
157 | this.parameterValues = parameterValues; |
158 | } |
159 | |
160 | /** |
161 | * Check mandatory properties. |
162 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
163 | */ |
164 | public void afterPropertiesSet() throws Exception { |
165 | super.afterPropertiesSet(); |
166 | Assert.notNull(dataSource); |
167 | JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); |
168 | if (fetchSize != VALUE_NOT_SET) { |
169 | jdbcTemplate.setFetchSize(fetchSize); |
170 | } |
171 | jdbcTemplate.setMaxRows(getPageSize()); |
172 | this.simpleJdbcTemplate = new SimpleJdbcTemplate(jdbcTemplate); |
173 | Assert.notNull(queryProvider); |
174 | queryProvider.init(dataSource); |
175 | this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize()); |
176 | this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize()); |
177 | } |
178 | |
179 | @Override |
180 | protected void doReadPage() { |
181 | |
182 | if (results == null) { |
183 | results = new CopyOnWriteArrayList<T>(); |
184 | } |
185 | else { |
186 | results.clear(); |
187 | } |
188 | |
189 | PagingRowMapper rowCallback = new PagingRowMapper(); |
190 | |
191 | List<?> query; |
192 | |
193 | if (getPage() == 0) { |
194 | if (logger.isDebugEnabled()) { |
195 | logger.debug("SQL used for reading first page: [" + firstPageSql + "]"); |
196 | } |
197 | if (parameterValues != null && parameterValues.size() > 0) { |
198 | if (this.queryProvider.isUsingNamedParameters()) { |
199 | query = simpleJdbcTemplate.getNamedParameterJdbcOperations().query(firstPageSql, |
200 | getParameterMap(parameterValues, null), rowCallback); |
201 | } |
202 | else { |
203 | query = simpleJdbcTemplate.getJdbcOperations().query(firstPageSql, |
204 | getParameterList(parameterValues, null).toArray(), rowCallback); |
205 | } |
206 | } |
207 | else { |
208 | query = simpleJdbcTemplate.getJdbcOperations().query(firstPageSql, rowCallback); |
209 | } |
210 | |
211 | } |
212 | else { |
213 | if (logger.isDebugEnabled()) { |
214 | logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]"); |
215 | } |
216 | if (this.queryProvider.isUsingNamedParameters()) { |
217 | query = simpleJdbcTemplate.getNamedParameterJdbcOperations().query(remainingPagesSql, |
218 | getParameterMap(parameterValues, startAfterValue), rowCallback); |
219 | } |
220 | else { |
221 | query = simpleJdbcTemplate.getJdbcOperations().query(remainingPagesSql, |
222 | getParameterList(parameterValues, startAfterValue).toArray(), rowCallback); |
223 | } |
224 | } |
225 | |
226 | @SuppressWarnings("unchecked") |
227 | Collection<T> result = (Collection<T>) query; |
228 | results.addAll(result); |
229 | |
230 | } |
231 | |
232 | @Override |
233 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
234 | super.update(executionContext); |
235 | if (isSaveState() && startAfterValue != null) { |
236 | executionContext.put(getExecutionContextUserSupport().getKey(START_AFTER_VALUE), startAfterValue); |
237 | } |
238 | } |
239 | |
240 | @Override |
241 | public void open(ExecutionContext executionContext) { |
242 | if (isSaveState()) { |
243 | startAfterValue = executionContext.get(getExecutionContextUserSupport().getKey(START_AFTER_VALUE)); |
244 | } |
245 | super.open(executionContext); |
246 | } |
247 | |
248 | @Override |
249 | protected void doJumpToPage(int itemIndex) { |
250 | /* |
251 | * Normally this would be false (the startAfterValue is enough |
252 | * information to restart from. |
253 | */ |
254 | if (startAfterValue == null && getPage() > 0) { |
255 | |
256 | String jumpToItemSql; |
257 | jumpToItemSql = queryProvider.generateJumpToItemQuery(itemIndex, getPageSize()); |
258 | |
259 | if (logger.isDebugEnabled()) { |
260 | logger.debug("SQL used for jumping: [" + jumpToItemSql + "]"); |
261 | } |
262 | |
263 | RowMapper startMapper = new RowMapper() { |
264 | public Object mapRow(ResultSet rs, int i) throws SQLException { |
265 | return rs.getObject(1); |
266 | } |
267 | }; |
268 | if (this.queryProvider.isUsingNamedParameters()) { |
269 | startAfterValue = simpleJdbcTemplate.getNamedParameterJdbcOperations().queryForObject(jumpToItemSql, |
270 | getParameterMap(parameterValues, startAfterValue), startMapper); |
271 | } |
272 | else { |
273 | startAfterValue = simpleJdbcTemplate.getJdbcOperations().queryForObject(jumpToItemSql, |
274 | getParameterList(parameterValues, startAfterValue).toArray(), startMapper); |
275 | } |
276 | |
277 | } |
278 | } |
279 | |
280 | private Map<String, Object> getParameterMap(Map<String, Object> values, Object sortKeyValue) { |
281 | Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(); |
282 | if (values != null) { |
283 | parameterMap.putAll(values); |
284 | } |
285 | if (sortKeyValue != null) { |
286 | parameterMap.put("_sortKey", sortKeyValue); |
287 | } |
288 | if (logger.isDebugEnabled()) { |
289 | logger.debug("Using parameterMap:" + parameterMap); |
290 | } |
291 | return parameterMap; |
292 | } |
293 | |
294 | private List<Object> getParameterList(Map<String, Object> values, Object sortKeyValue) { |
295 | SortedMap<String, Object> sm = new TreeMap<String, Object>(); |
296 | if (values != null) { |
297 | sm.putAll(values); |
298 | } |
299 | List<Object> parameterList = new ArrayList<Object>(); |
300 | parameterList.addAll(sm.values()); |
301 | if (sortKeyValue != null) { |
302 | parameterList.add(sortKeyValue); |
303 | } |
304 | if (logger.isDebugEnabled()) { |
305 | logger.debug("Using parameterList:" + parameterList); |
306 | } |
307 | return parameterList; |
308 | } |
309 | |
310 | private class PagingRowMapper implements RowMapper { |
311 | public Object mapRow(ResultSet rs, int rowNum) throws SQLException { |
312 | startAfterValue = rs.getObject(queryProvider.getSortKeyWithoutAlias()); |
313 | return rowMapper.mapRow(rs, rowNum); |
314 | } |
315 | } |
316 | |
317 | } |