1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.database; |
18 | |
19 | import java.sql.ResultSet; |
20 | import java.sql.SQLException; |
21 | import java.util.ArrayList; |
22 | import java.util.Collection; |
23 | import java.util.LinkedHashMap; |
24 | import java.util.List; |
25 | import java.util.Map; |
26 | import java.util.SortedMap; |
27 | import java.util.TreeMap; |
28 | import java.util.concurrent.CopyOnWriteArrayList; |
29 | |
30 | import javax.sql.DataSource; |
31 | |
32 | import org.springframework.batch.item.ExecutionContext; |
33 | import org.springframework.batch.item.ItemStreamException; |
34 | import org.springframework.beans.factory.InitializingBean; |
35 | import org.springframework.jdbc.core.JdbcTemplate; |
36 | import org.springframework.jdbc.core.RowMapper; |
37 | import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; |
38 | import org.springframework.util.Assert; |
39 | import org.springframework.util.ClassUtils; |
40 | |
41 | /** |
42 | * <p> |
43 | * {@link org.springframework.batch.item.ItemReader} for reading database |
44 | * records using JDBC in a paging fashion. |
45 | * </p> |
46 | * |
47 | * <p> |
48 | * It executes the SQL built by the {@link PagingQueryProvider} to retrieve |
49 | * requested data. The query is executed using paged requests of a size |
50 | * specified in {@link #setPageSize(int)}. Additional pages are requested when |
51 | * needed as {@link #read()} method is called, returning an object corresponding |
52 | * to current position. On restart it uses the last sort key value to locate the |
53 | * first page to read (so it doesn't matter if the successfully processed items |
54 | * have been removed or modified). |
55 | * </p> |
56 | * |
57 | * <p> |
58 | * The performance of the paging depends on the database specific features |
59 | * available to limit the number of returned rows. Setting a fairly large page |
60 | * size and using a commit interval that matches the page size should provide |
61 | * better performance. |
62 | * </p> |
63 | * |
64 | * <p> |
65 | * The implementation is thread-safe in between calls to |
66 | * {@link #open(ExecutionContext)}, but remember to use |
67 | * <code>saveState=false</code> if used in a multi-threaded client (no restart |
68 | * available). |
69 | * </p> |
70 | * |
71 | * @author Thomas Risberg |
72 | * @author Dave Syer |
73 | * @author Michael Minella |
74 | * @since 2.0 |
75 | */ |
76 | public class JdbcPagingItemReader<T> extends AbstractPagingItemReader<T> implements InitializingBean { |
77 | private static final String START_AFTER_VALUE = "start.after"; |
78 | |
79 | public static final int VALUE_NOT_SET = -1; |
80 | |
81 | private DataSource dataSource; |
82 | |
83 | private PagingQueryProvider queryProvider; |
84 | |
85 | private Map<String, Object> parameterValues; |
86 | |
87 | private NamedParameterJdbcTemplate namedParameterJdbcTemplate; |
88 | |
89 | @SuppressWarnings("rawtypes") |
90 | private RowMapper rowMapper; |
91 | |
92 | private String firstPageSql; |
93 | |
94 | private String remainingPagesSql; |
95 | |
96 | private Map<String, Object> startAfterValues; |
97 | |
98 | private Map<String, Object> previousStartAfterValues; |
99 | |
100 | private int fetchSize = VALUE_NOT_SET; |
101 | |
102 | public JdbcPagingItemReader() { |
103 | setName(ClassUtils.getShortName(JdbcPagingItemReader.class)); |
104 | } |
105 | |
106 | public void setDataSource(DataSource dataSource) { |
107 | this.dataSource = dataSource; |
108 | } |
109 | |
110 | /** |
111 | * Gives the JDBC driver a hint as to the number of rows that should be |
112 | * fetched from the database when more rows are needed for this |
113 | * <code>ResultSet</code> object. If the fetch size specified is zero, the |
114 | * JDBC driver ignores the value. |
115 | * |
116 | * @param fetchSize the number of rows to fetch |
117 | * @see ResultSet#setFetchSize(int) |
118 | */ |
119 | public void setFetchSize(int fetchSize) { |
120 | this.fetchSize = fetchSize; |
121 | } |
122 | |
123 | /** |
124 | * A {@link PagingQueryProvider}. Supplies all the platform dependent query |
125 | * generation capabilities needed by the reader. |
126 | * |
127 | * @param queryProvider the {@link PagingQueryProvider} to use |
128 | */ |
129 | public void setQueryProvider(PagingQueryProvider queryProvider) { |
130 | this.queryProvider = queryProvider; |
131 | } |
132 | |
133 | /** |
134 | * The row mapper implementation to be used by this reader. The row mapper |
135 | * is used to convert result set rows into objects, which are then returned |
136 | * by the reader. |
137 | * |
138 | * @param rowMapper a |
139 | * {@link org.springframework.jdbc.core.simple.ParameterizedRowMapper} |
140 | * implementation |
141 | */ |
142 | @SuppressWarnings("rawtypes") |
143 | public void setRowMapper(RowMapper rowMapper) { |
144 | this.rowMapper = rowMapper; |
145 | } |
146 | |
147 | /** |
148 | * The parameter values to be used for the query execution. If you use named |
149 | * parameters then the key should be the name used in the query clause. If |
150 | * you use "?" placeholders then the key should be the relative index that |
151 | * the parameter appears in the query string built using the select, from |
152 | * and where clauses specified. |
153 | * |
154 | * @param parameterValues the values keyed by the parameter named/index used |
155 | * in the query string. |
156 | */ |
157 | public void setParameterValues(Map<String, Object> parameterValues) { |
158 | this.parameterValues = parameterValues; |
159 | } |
160 | |
161 | /** |
162 | * Check mandatory properties. |
163 | * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() |
164 | */ |
165 | @Override |
166 | public void afterPropertiesSet() throws Exception { |
167 | super.afterPropertiesSet(); |
168 | Assert.notNull(dataSource); |
169 | JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); |
170 | if (fetchSize != VALUE_NOT_SET) { |
171 | jdbcTemplate.setFetchSize(fetchSize); |
172 | } |
173 | jdbcTemplate.setMaxRows(getPageSize()); |
174 | namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate); |
175 | Assert.notNull(queryProvider); |
176 | queryProvider.init(dataSource); |
177 | this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize()); |
178 | this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize()); |
179 | } |
180 | |
181 | @Override |
182 | @SuppressWarnings("unchecked") |
183 | protected void doReadPage() { |
184 | if (results == null) { |
185 | results = new CopyOnWriteArrayList<T>(); |
186 | } |
187 | else { |
188 | results.clear(); |
189 | } |
190 | |
191 | PagingRowMapper rowCallback = new PagingRowMapper(); |
192 | |
193 | List<?> query; |
194 | |
195 | if (getPage() == 0) { |
196 | if (logger.isDebugEnabled()) { |
197 | logger.debug("SQL used for reading first page: [" + firstPageSql + "]"); |
198 | } |
199 | if (parameterValues != null && parameterValues.size() > 0) { |
200 | if (this.queryProvider.isUsingNamedParameters()) { |
201 | query = namedParameterJdbcTemplate.query(firstPageSql, |
202 | getParameterMap(parameterValues, null), rowCallback); |
203 | } |
204 | else { |
205 | query = getJdbcTemplate().query(firstPageSql, |
206 | getParameterList(parameterValues, null).toArray(), rowCallback); |
207 | } |
208 | } |
209 | else { |
210 | query = getJdbcTemplate().query(firstPageSql, rowCallback); |
211 | } |
212 | |
213 | } |
214 | else { |
215 | previousStartAfterValues = startAfterValues; |
216 | if (logger.isDebugEnabled()) { |
217 | logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]"); |
218 | } |
219 | if (this.queryProvider.isUsingNamedParameters()) { |
220 | query = namedParameterJdbcTemplate.query(remainingPagesSql, |
221 | getParameterMap(parameterValues, startAfterValues), rowCallback); |
222 | } |
223 | else { |
224 | query = getJdbcTemplate().query(remainingPagesSql, |
225 | getParameterList(parameterValues, startAfterValues).toArray(), rowCallback); |
226 | } |
227 | } |
228 | |
229 | Collection<T> result = (Collection<T>) query; |
230 | results.addAll(result); |
231 | } |
232 | |
233 | @Override |
234 | public void update(ExecutionContext executionContext) throws ItemStreamException { |
235 | super.update(executionContext); |
236 | if (isSaveState()) { |
237 | if (isAtEndOfPage() && startAfterValues != null) { |
238 | // restart on next page |
239 | executionContext.put(getExecutionContextKey(START_AFTER_VALUE), startAfterValues); |
240 | } else if (previousStartAfterValues != null) { |
241 | // restart on current page |
242 | executionContext.put(getExecutionContextKey(START_AFTER_VALUE), previousStartAfterValues); |
243 | } |
244 | } |
245 | } |
246 | |
247 | private boolean isAtEndOfPage() { |
248 | return getCurrentItemCount() % getPageSize() == 0; |
249 | } |
250 | |
251 | @Override |
252 | @SuppressWarnings("unchecked") |
253 | public void open(ExecutionContext executionContext) { |
254 | if (isSaveState()) { |
255 | startAfterValues = (Map<String, Object>) executionContext.get(getExecutionContextKey(START_AFTER_VALUE)); |
256 | |
257 | if(startAfterValues == null) { |
258 | startAfterValues = new LinkedHashMap<String, Object>(); |
259 | } |
260 | } |
261 | |
262 | super.open(executionContext); |
263 | } |
264 | |
265 | @Override |
266 | @SuppressWarnings({"unchecked", "rawtypes"}) |
267 | protected void doJumpToPage(int itemIndex) { |
268 | /* |
269 | * Normally this would be false (the startAfterValue is enough |
270 | * information to restart from. |
271 | */ |
272 | if (startAfterValues == null && getPage() > 0) { |
273 | |
274 | String jumpToItemSql; |
275 | jumpToItemSql = queryProvider.generateJumpToItemQuery(itemIndex, getPageSize()); |
276 | |
277 | if (logger.isDebugEnabled()) { |
278 | logger.debug("SQL used for jumping: [" + jumpToItemSql + "]"); |
279 | } |
280 | |
281 | RowMapper startMapper = new RowMapper() { |
282 | @Override |
283 | public Object mapRow(ResultSet rs, int i) throws SQLException { |
284 | return rs.getObject(1); |
285 | } |
286 | }; |
287 | if (this.queryProvider.isUsingNamedParameters()) { |
288 | startAfterValues = (Map<String, Object>) namedParameterJdbcTemplate.queryForObject(jumpToItemSql, |
289 | getParameterMap(parameterValues, startAfterValues), startMapper); |
290 | } |
291 | else { |
292 | startAfterValues = (Map<String, Object>) getJdbcTemplate().queryForObject(jumpToItemSql, |
293 | getParameterList(parameterValues, startAfterValues).toArray(), startMapper); |
294 | } |
295 | } |
296 | } |
297 | |
298 | private Map<String, Object> getParameterMap(Map<String, Object> values, Map<String, Object> sortKeyValues) { |
299 | Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(); |
300 | if (values != null) { |
301 | parameterMap.putAll(values); |
302 | } |
303 | if (sortKeyValues != null && !sortKeyValues.isEmpty()) { |
304 | for (Map.Entry<String, Object> sortKey : sortKeyValues.entrySet()) { |
305 | parameterMap.put("_" + sortKey.getKey(), sortKey.getValue()); |
306 | } |
307 | } |
308 | if (logger.isDebugEnabled()) { |
309 | logger.debug("Using parameterMap:" + parameterMap); |
310 | } |
311 | return parameterMap; |
312 | } |
313 | |
314 | private List<Object> getParameterList(Map<String, Object> values, Map<String, Object> sortKeyValue) { |
315 | SortedMap<String, Object> sm = new TreeMap<String, Object>(); |
316 | if (values != null) { |
317 | sm.putAll(values); |
318 | } |
319 | List<Object> parameterList = new ArrayList<Object>(); |
320 | parameterList.addAll(sm.values()); |
321 | if (sortKeyValue != null && sortKeyValue.size() > 0) { |
322 | List<Map.Entry<String, Object>> keys = new ArrayList<Map.Entry<String,Object>>(sortKeyValue.entrySet()); |
323 | |
324 | for(int i = 0; i < keys.size(); i++) { |
325 | for(int j = 0; j < i; j++) { |
326 | parameterList.add(keys.get(j).getValue()); |
327 | } |
328 | |
329 | parameterList.add(keys.get(i).getValue()); |
330 | } |
331 | } |
332 | |
333 | if (logger.isDebugEnabled()) { |
334 | logger.debug("Using parameterList:" + parameterList); |
335 | } |
336 | return parameterList; |
337 | } |
338 | |
339 | @SuppressWarnings("rawtypes") |
340 | private class PagingRowMapper implements RowMapper { |
341 | @Override |
342 | public Object mapRow(ResultSet rs, int rowNum) throws SQLException { |
343 | startAfterValues = new LinkedHashMap<String, Object>(); |
344 | for (Map.Entry<String, Order> sortKey : queryProvider.getSortKeys().entrySet()) { |
345 | startAfterValues.put(sortKey.getKey(), rs.getObject(sortKey.getKey())); |
346 | } |
347 | |
348 | return rowMapper.mapRow(rs, rowNum); |
349 | } |
350 | } |
351 | |
352 | private JdbcTemplate getJdbcTemplate() { |
353 | return (JdbcTemplate) namedParameterJdbcTemplate.getJdbcOperations(); |
354 | } |
355 | } |