EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.item.database]

COVERAGE SUMMARY FOR SOURCE FILE [JdbcPagingItemReader.java]

nameclass, %method, %block, %line, %
JdbcPagingItemReader.java67%  (2/3)92%  (22/24)78%  (452/578)87%  (94.5/109)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class JdbcPagingItemReader$10%   (0/1)0%   (0/2)0%   (0/10)0%   (0/2)
JdbcPagingItemReader$1 (JdbcPagingItemReader): void 0%   (0/1)0%   (0/6)0%   (0/1)
mapRow (ResultSet, int): Object 0%   (0/1)0%   (0/4)0%   (0/1)
     
class JdbcPagingItemReader100% (1/1)100% (19/19)78%  (401/517)87%  (88.5/102)
doJumpToPage (int): void 100% (1/1)6%   (4/70)14%  (1.5/11)
doReadPage (): void 100% (1/1)81%  (119/147)92%  (23/25)
getParameterMap (Map, Map): Map 100% (1/1)81%  (47/58)90%  (9/10)
getParameterList (Map, Map): List 100% (1/1)86%  (66/77)93%  (13/14)
JdbcPagingItemReader (): void 100% (1/1)100% (10/10)100% (4/4)
access$100 (JdbcPagingItemReader): Map 100% (1/1)100% (3/3)100% (1/1)
access$102 (JdbcPagingItemReader, Map): Map 100% (1/1)100% (5/5)100% (1/1)
access$200 (JdbcPagingItemReader): PagingQueryProvider 100% (1/1)100% (3/3)100% (1/1)
access$300 (JdbcPagingItemReader): RowMapper 100% (1/1)100% (3/3)100% (1/1)
afterPropertiesSet (): void 100% (1/1)100% (52/52)100% (12/12)
getJdbcTemplate (): JdbcTemplate 100% (1/1)100% (5/5)100% (1/1)
isAtEndOfPage (): boolean 100% (1/1)100% (10/10)100% (1/1)
open (ExecutionContext): void 100% (1/1)100% (23/23)100% (6/6)
setDataSource (DataSource): void 100% (1/1)100% (4/4)100% (2/2)
setFetchSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setParameterValues (Map): void 100% (1/1)100% (4/4)100% (2/2)
setQueryProvider (PagingQueryProvider): void 100% (1/1)100% (4/4)100% (2/2)
setRowMapper (RowMapper): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (31/31)100% (7/7)
     
class JdbcPagingItemReader$PagingRowMapper100% (1/1)100% (3/3)100% (51/51)100% (6/6)
JdbcPagingItemReader$PagingRowMapper (JdbcPagingItemReader): void 100% (1/1)100% (6/6)100% (1/1)
JdbcPagingItemReader$PagingRowMapper (JdbcPagingItemReader, JdbcPagingItemRea... 100% (1/1)100% (4/4)100% (1/1)
mapRow (ResultSet, int): Object 100% (1/1)100% (41/41)100% (5/5)

1/*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.database;
18 
19import java.sql.ResultSet;
20import java.sql.SQLException;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.LinkedHashMap;
24import java.util.List;
25import java.util.Map;
26import java.util.SortedMap;
27import java.util.TreeMap;
28import java.util.concurrent.CopyOnWriteArrayList;
29 
30import javax.sql.DataSource;
31 
32import org.springframework.batch.item.ExecutionContext;
33import org.springframework.batch.item.ItemStreamException;
34import org.springframework.beans.factory.InitializingBean;
35import org.springframework.jdbc.core.JdbcTemplate;
36import org.springframework.jdbc.core.RowMapper;
37import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
38import org.springframework.util.Assert;
39import org.springframework.util.ClassUtils;
40 
41/**
42 * <p>
43 * {@link org.springframework.batch.item.ItemReader} for reading database
44 * records using JDBC in a paging fashion.
45 * </p>
46 * 
47 * <p>
48 * It executes the SQL built by the {@link PagingQueryProvider} to retrieve
49 * requested data. The query is executed using paged requests of a size
50 * specified in {@link #setPageSize(int)}. Additional pages are requested when
51 * needed as {@link #read()} method is called, returning an object corresponding
52 * to current position. On restart it uses the last sort key value to locate the
53 * first page to read (so it doesn't matter if the successfully processed items
54 * have been removed or modified).
55 * </p>
56 * 
57 * <p>
58 * The performance of the paging depends on the database specific features
59 * available to limit the number of returned rows. Setting a fairly large page
60 * size and using a commit interval that matches the page size should provide
61 * better performance.
62 * </p>
63 * 
64 * <p>
65 * The implementation is thread-safe in between calls to
66 * {@link #open(ExecutionContext)}, but remember to use
67 * <code>saveState=false</code> if used in a multi-threaded client (no restart
68 * available).
69 * </p>
70 * 
71 * @author Thomas Risberg
72 * @author Dave Syer
73 * @author Michael Minella
74 * @since 2.0
75 */
76public class JdbcPagingItemReader<T> extends AbstractPagingItemReader<T> implements InitializingBean {
77        private static final String START_AFTER_VALUE = "start.after";
78 
79        public static final int VALUE_NOT_SET = -1;
80 
81        private DataSource dataSource;
82 
83        private PagingQueryProvider queryProvider;
84 
85        private Map<String, Object> parameterValues;
86 
87        private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
88 
89        @SuppressWarnings("rawtypes")
90        private RowMapper rowMapper;
91 
92        private String firstPageSql;
93 
94        private String remainingPagesSql;
95 
96        private Map<String, Object> startAfterValues;
97        
98        private Map<String, Object> previousStartAfterValues;
99 
100        private int fetchSize = VALUE_NOT_SET;
101 
102        public JdbcPagingItemReader() {
103                setName(ClassUtils.getShortName(JdbcPagingItemReader.class));
104        }
105 
106        public void setDataSource(DataSource dataSource) {
107                this.dataSource = dataSource;
108        }
109 
110        /**
111         * Gives the JDBC driver a hint as to the number of rows that should be
112         * fetched from the database when more rows are needed for this
113         * <code>ResultSet</code> object. If the fetch size specified is zero, the
114         * JDBC driver ignores the value.
115         * 
116         * @param fetchSize the number of rows to fetch
117         * @see ResultSet#setFetchSize(int)
118         */
119        public void setFetchSize(int fetchSize) {
120                this.fetchSize = fetchSize;
121        }
122 
123        /**
124         * A {@link PagingQueryProvider}. Supplies all the platform dependent query
125         * generation capabilities needed by the reader.
126         * 
127         * @param queryProvider the {@link PagingQueryProvider} to use
128         */
129        public void setQueryProvider(PagingQueryProvider queryProvider) {
130                this.queryProvider = queryProvider;
131        }
132 
133        /**
134         * The row mapper implementation to be used by this reader. The row mapper
135         * is used to convert result set rows into objects, which are then returned
136         * by the reader.
137         * 
138         * @param rowMapper a
139         * {@link org.springframework.jdbc.core.simple.ParameterizedRowMapper}
140         * implementation
141         */
142        @SuppressWarnings("rawtypes")
143        public void setRowMapper(RowMapper rowMapper) {
144                this.rowMapper = rowMapper;
145        }
146 
147        /**
148         * The parameter values to be used for the query execution. If you use named
149         * parameters then the key should be the name used in the query clause. If
150         * you use "?" placeholders then the key should be the relative index that
151         * the parameter appears in the query string built using the select, from
152         * and where clauses specified.
153         * 
154         * @param parameterValues the values keyed by the parameter named/index used
155         * in the query string.
156         */
157        public void setParameterValues(Map<String, Object> parameterValues) {
158                this.parameterValues = parameterValues;
159        }
160 
161        /**
162         * Check mandatory properties.
163         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
164         */
165        @Override
166        public void afterPropertiesSet() throws Exception {
167                super.afterPropertiesSet();
168                Assert.notNull(dataSource);
169                JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
170                if (fetchSize != VALUE_NOT_SET) {
171                        jdbcTemplate.setFetchSize(fetchSize);
172                }
173                jdbcTemplate.setMaxRows(getPageSize());
174                namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
175                Assert.notNull(queryProvider);
176                queryProvider.init(dataSource);
177                this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize());
178                this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize());
179        }
180 
181        @Override
182        @SuppressWarnings("unchecked")
183        protected void doReadPage() {
184                if (results == null) {
185                        results = new CopyOnWriteArrayList<T>();
186                }
187                else {
188                        results.clear();
189                }
190 
191                PagingRowMapper rowCallback = new PagingRowMapper();
192 
193                List<?> query;
194 
195                if (getPage() == 0) {
196                        if (logger.isDebugEnabled()) {
197                                logger.debug("SQL used for reading first page: [" + firstPageSql + "]");
198                        }
199                        if (parameterValues != null && parameterValues.size() > 0) {
200                                if (this.queryProvider.isUsingNamedParameters()) {
201                                        query = namedParameterJdbcTemplate.query(firstPageSql,
202                                                        getParameterMap(parameterValues, null), rowCallback);
203                                }
204                                else {
205                                        query = getJdbcTemplate().query(firstPageSql,
206                                                        getParameterList(parameterValues, null).toArray(), rowCallback);
207                                }
208                        }
209                        else {
210                                query = getJdbcTemplate().query(firstPageSql, rowCallback);
211                        }
212 
213                }
214                else {
215                        previousStartAfterValues = startAfterValues;
216                        if (logger.isDebugEnabled()) {
217                                logger.debug("SQL used for reading remaining pages: [" + remainingPagesSql + "]");
218                        }
219                        if (this.queryProvider.isUsingNamedParameters()) {
220                                query = namedParameterJdbcTemplate.query(remainingPagesSql,
221                                                getParameterMap(parameterValues, startAfterValues), rowCallback);
222                        }
223                        else {
224                                query = getJdbcTemplate().query(remainingPagesSql,
225                                                getParameterList(parameterValues, startAfterValues).toArray(), rowCallback);
226                        }
227                }
228 
229                Collection<T> result = (Collection<T>) query;
230                results.addAll(result);
231        }
232 
233        @Override
234        public void update(ExecutionContext executionContext) throws ItemStreamException {
235                super.update(executionContext);
236                if (isSaveState()) {
237                        if (isAtEndOfPage() && startAfterValues != null) {
238                                // restart on next page
239                                executionContext.put(getExecutionContextKey(START_AFTER_VALUE), startAfterValues);        
240                        } else if (previousStartAfterValues != null) {
241                                // restart on current page
242                                executionContext.put(getExecutionContextKey(START_AFTER_VALUE), previousStartAfterValues);
243                        }
244                }
245        }
246        
247        private boolean isAtEndOfPage() {
248                return getCurrentItemCount() % getPageSize() == 0;
249        }
250 
251        @Override
252        @SuppressWarnings("unchecked")
253        public void open(ExecutionContext executionContext) {
254                if (isSaveState()) {
255                        startAfterValues = (Map<String, Object>) executionContext.get(getExecutionContextKey(START_AFTER_VALUE));
256 
257                        if(startAfterValues == null) {
258                                startAfterValues = new LinkedHashMap<String, Object>();
259                        }
260                }
261 
262                super.open(executionContext);
263        }
264 
265        @Override
266        @SuppressWarnings({"unchecked", "rawtypes"})
267        protected void doJumpToPage(int itemIndex) {
268                /*
269                 * Normally this would be false (the startAfterValue is enough
270                 * information to restart from.
271                 */
272                if (startAfterValues == null && getPage() > 0) {
273 
274                        String jumpToItemSql;
275                        jumpToItemSql = queryProvider.generateJumpToItemQuery(itemIndex, getPageSize());
276 
277                        if (logger.isDebugEnabled()) {
278                                logger.debug("SQL used for jumping: [" + jumpToItemSql + "]");
279                        }
280 
281                        RowMapper startMapper = new RowMapper() {
282                                @Override
283                                public Object mapRow(ResultSet rs, int i) throws SQLException {
284                                        return rs.getObject(1);
285                                }
286                        };
287                        if (this.queryProvider.isUsingNamedParameters()) {
288                                startAfterValues = (Map<String, Object>) namedParameterJdbcTemplate.queryForObject(jumpToItemSql,
289                                                getParameterMap(parameterValues, startAfterValues), startMapper);
290                        }
291                        else {
292                                startAfterValues = (Map<String, Object>) getJdbcTemplate().queryForObject(jumpToItemSql,
293                                                getParameterList(parameterValues, startAfterValues).toArray(), startMapper);
294                        }
295                }
296        }
297 
298        private Map<String, Object> getParameterMap(Map<String, Object> values, Map<String, Object> sortKeyValues) {
299                Map<String, Object> parameterMap = new LinkedHashMap<String, Object>();
300                if (values != null) {
301                        parameterMap.putAll(values);
302                }
303                if (sortKeyValues != null && !sortKeyValues.isEmpty()) {
304                        for (Map.Entry<String, Object> sortKey : sortKeyValues.entrySet()) {
305                                parameterMap.put("_" + sortKey.getKey(), sortKey.getValue());
306                        }
307                }
308                if (logger.isDebugEnabled()) {
309                        logger.debug("Using parameterMap:" + parameterMap);
310                }
311                return parameterMap;
312        }
313 
314        private List<Object> getParameterList(Map<String, Object> values, Map<String, Object> sortKeyValue) {
315                SortedMap<String, Object> sm = new TreeMap<String, Object>();
316                if (values != null) {
317                        sm.putAll(values);
318                }
319                List<Object> parameterList = new ArrayList<Object>();
320                parameterList.addAll(sm.values());
321                if (sortKeyValue != null && sortKeyValue.size() > 0) {
322                        List<Map.Entry<String, Object>> keys = new ArrayList<Map.Entry<String,Object>>(sortKeyValue.entrySet());
323 
324                        for(int i = 0; i < keys.size(); i++) {
325                                for(int j = 0; j < i; j++) {
326                                        parameterList.add(keys.get(j).getValue());
327                                }
328 
329                                parameterList.add(keys.get(i).getValue());
330                        }
331                }
332 
333                if (logger.isDebugEnabled()) {
334                        logger.debug("Using parameterList:" + parameterList);
335                }
336                return parameterList;
337        }
338 
339        @SuppressWarnings("rawtypes")
340        private class PagingRowMapper implements RowMapper {
341                @Override
342                public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
343                        startAfterValues = new LinkedHashMap<String, Object>();
344                        for (Map.Entry<String, Order> sortKey : queryProvider.getSortKeys().entrySet()) {
345                                startAfterValues.put(sortKey.getKey(), rs.getObject(sortKey.getKey()));
346                        }
347 
348                        return rowMapper.mapRow(rs, rowNum);
349                }
350        }
351 
352        private JdbcTemplate getJdbcTemplate() {
353                return (JdbcTemplate) namedParameterJdbcTemplate.getJdbcOperations();
354        }
355}

[all classes][org.springframework.batch.item.database]
EMMA 2.0.5312 (C) Vladimir Roubtsov