EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.item.database]

COVERAGE SUMMARY FOR SOURCE FILE [JdbcCursorItemReader.java]

nameclass, %method, %block, %line, %
JdbcCursorItemReader.java100% (1/1)91%  (21/23)66%  (254/386)74%  (82.7/112)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class JdbcCursorItemReader100% (1/1)91%  (21/23)66%  (254/386)74%  (82.7/112)
getExceptionTranslator (): SQLExceptionTranslator 0%   (0/1)0%   (0/22)0%   (0/5)
setPreparedStatementSetter (PreparedStatementSetter): void 0%   (0/1)0%   (0/4)0%   (0/2)
handleWarnings (PreparedStatement): void 100% (1/1)14%  (7/51)27%  (3/11)
jumpToItem (int): void 100% (1/1)32%  (7/22)38%  (3/8)
moveCursorToRow (int): void 100% (1/1)64%  (16/25)78%  (7/9)
verifyCursorPosition (long): void 100% (1/1)69%  (11/16)75%  (3/4)
executeQuery (): void 100% (1/1)69%  (40/58)69%  (9/13)
doRead (): Object 100% (1/1)71%  (22/31)75%  (6/8)
JdbcCursorItemReader (): void 100% (1/1)86%  (30/35)98%  (9.8/10)
<static initializer> 100% (1/1)91%  (10/11)90%  (0.9/1)
afterPropertiesSet (): void 100% (1/1)100% (13/13)100% (4/4)
applyStatementSettings (PreparedStatement): void 100% (1/1)100% (28/28)100% (8/8)
doClose (): void 100% (1/1)100% (16/16)100% (6/6)
doOpen (): void 100% (1/1)100% (18/18)100% (5/5)
setDataSource (DataSource): void 100% (1/1)100% (4/4)100% (2/2)
setDriverSupportsAbsolute (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setFetchSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setIgnoreWarnings (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setMapper (RowMapper): void 100% (1/1)100% (4/4)100% (2/2)
setMaxRows (int): void 100% (1/1)100% (4/4)100% (2/2)
setQueryTimeout (int): void 100% (1/1)100% (4/4)100% (2/2)
setSql (String): void 100% (1/1)100% (4/4)100% (2/2)
setVerifyCursorPosition (boolean): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.database;
18 
19import java.sql.Connection;
20import java.sql.PreparedStatement;
21import java.sql.ResultSet;
22import java.sql.SQLException;
23import java.sql.SQLWarning;
24import java.sql.Statement;
25 
26import javax.sql.DataSource;
27 
28import org.apache.commons.logging.Log;
29import org.apache.commons.logging.LogFactory;
30import org.springframework.batch.item.ExecutionContext;
31import org.springframework.batch.item.ItemStream;
32import org.springframework.batch.item.support.AbstractBufferedItemReaderItemStream;
33import org.springframework.beans.factory.InitializingBean;
34import org.springframework.dao.InvalidDataAccessResourceUsageException;
35import org.springframework.jdbc.SQLWarningException;
36import org.springframework.jdbc.core.PreparedStatementSetter;
37import org.springframework.jdbc.core.RowMapper;
38import org.springframework.jdbc.support.JdbcUtils;
39import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
40import org.springframework.jdbc.support.SQLExceptionTranslator;
41import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
42import org.springframework.util.Assert;
43import org.springframework.util.ClassUtils;
44 
45/**
46 * <p>
47 * Simple item reader that opens a JDBC cursor and continually retrieves the
48 * next row in the ResultSet. It is extremely important to note that the
49 * JdbcDriver used must be version 3.0 or higher. This is because earlier
50 * versions do not support holding a ResultSet open over commits.
51 * </p>
52 * 
53 * <p>
54 * Each call to {@link #read()} will call the provided RowMapper, passing in the
55 * ResultSet. There is currently no wrapping of the ResultSet to suppress calls
56 * to next(). However, if the RowMapper (mistakenly) increments the current row,
57 * the next call to read will verify that the current row is at the expected
58 * position and throw a DataAccessException if it is not. This means that, in
59 * theory, a RowMapper could read ahead, as long as it returns the row back to
60 * the correct position before returning. The reason for such strictness on the
61 * ResultSet is due to the need to maintain control for transactions and
62 * restartability. This ensures that each call to {@link #read()} returns the
63 * ResultSet at the correct line, regardless of rollbacks or restarts.
64 * </p>
65 * 
66 * <p>
67 * {@link ExecutionContext}: The current row is returned as restart data, and
68 * when restored from that same data, the cursor is opened and the current row
69 * set to the value within the restart data. See
70 * {@link #setDriverSupportsAbsolute(boolean)} for improving restart
71 * performance.
72 * </p>
73 * 
74 * <p>
75 * Transactions: The same ResultSet is held open regardless of commits or roll
76 * backs in a surrounding transaction. This means that when such a transaction
77 * is committed, the reader is notified through the {@link #mark()} and
78 * {@link #reset()} so that it can save it's current row number. Later, if the
79 * transaction is rolled back, the current row can be moved back to the same row
80 * number as it was on when commit was called.
81 * </p>
82 *
83 * <p>
84 * NOTE that the cursor is opened using a separate connection from the rest of the
85 * processing in the step. This means that this reader also runs within its own
86 * JDBC transaction.
87 * </p>
88 *
89 * <p>
90 * Calling close on this {@link ItemStream} will cause all resources it is
91 * currently using to be freed. (Connection, ResultSet, etc). It is then illegal
92 * to call {@link #read()} again until it has been opened.
93 * </p>
94 * 
95 * <p>
96 * Known limitation: when used with Derby
97 * {@link #setVerifyCursorPosition(boolean)} needs to be <code>false</code>
98 * because {@link ResultSet#getRow()} call used for cursor position verification
99 * throws an exception.
100 * </p>
101 * 
102 * @author Lucas Ward
103 * @author Peter Zozom
104 * @author Robert Kasanicky
105 */
106public class JdbcCursorItemReader extends AbstractBufferedItemReaderItemStream implements InitializingBean {
107 
108        private static Log log = LogFactory.getLog(JdbcCursorItemReader.class);
109 
110        public static final int VALUE_NOT_SET = -1;
111 
112        private Connection con;
113 
114        private PreparedStatement preparedStatement;
115 
116        private PreparedStatementSetter preparedStatementSetter;
117 
118        protected ResultSet rs;
119 
120        private DataSource dataSource;
121 
122        private String sql;
123 
124        private int fetchSize = VALUE_NOT_SET;
125 
126        private int maxRows = VALUE_NOT_SET;
127 
128        private int queryTimeout = VALUE_NOT_SET;
129 
130        private boolean ignoreWarnings = true;
131 
132        private boolean verifyCursorPosition = true;
133 
134        private SQLExceptionTranslator exceptionTranslator;
135 
136        private RowMapper mapper;
137 
138        private boolean initialized = false;
139 
140        private boolean driverSupportsAbsolute = false;
141 
142        public JdbcCursorItemReader() {
143                setName(ClassUtils.getShortName(JdbcCursorItemReader.class));
144        }
145 
146        /**
147         * Assert that mandatory properties are set.
148         * 
149         * @throws IllegalArgumentException if either data source or sql properties
150         * not set.
151         */
152        public void afterPropertiesSet() throws Exception {
153                Assert.notNull(dataSource, "DataSOurce must be provided");
154                Assert.notNull(sql, "The SQL query must be provided");
155                Assert.notNull(mapper, "RowMapper must be provided");
156        }
157 
158        /**
159         * Public setter for the data source for injection purposes.
160         * 
161         * @param dataSource
162         */
163        public void setDataSource(DataSource dataSource) {
164                this.dataSource = dataSource;
165        }
166 
167        /**
168         * Executes the provided SQL query. The statement is created with
169         * 'READ_ONLY' and 'HOLD_CUSORS_OVER_COMMIT' set to true. This is extremely
170         * important, since a non read-only cursor may lock tables that shouldn't be
171         * locked, and not holding the cursor open over a commit would require it to
172         * be reopened after each commit, which would destroy performance.
173         */
174        private void executeQuery() {
175 
176                Assert.state(dataSource != null, "DataSource must not be null.");
177 
178                try {
179                        // Note: this is a connection that is separate from the current transaction
180                        this.con = dataSource.getConnection();
181                        preparedStatement = this.con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
182                                        ResultSet.HOLD_CURSORS_OVER_COMMIT);
183                        applyStatementSettings(preparedStatement);
184                        if (this.preparedStatementSetter != null) {
185                                preparedStatementSetter.setValues(preparedStatement);
186                        }
187                        this.rs = preparedStatement.executeQuery();
188                        handleWarnings(preparedStatement);
189                }
190                catch (SQLException se) {
191                        close(null);
192                        throw getExceptionTranslator().translate("Executing query", sql, se);
193                }
194 
195        }
196 
197        /**
198         * Prepare the given JDBC Statement (or PreparedStatement or
199         * CallableStatement), applying statement settings such as fetch size, max
200         * rows, and query timeout. @param stmt the JDBC Statement to prepare
201         * @throws SQLException
202         * 
203         * @see #setFetchSize
204         * @see #setMaxRows
205         * @see #setQueryTimeout
206         */
207        private void applyStatementSettings(PreparedStatement stmt) throws SQLException {
208                if (fetchSize != VALUE_NOT_SET) {
209                        stmt.setFetchSize(fetchSize);
210                        stmt.setFetchDirection(ResultSet.FETCH_FORWARD);
211                }
212                if (maxRows != VALUE_NOT_SET) {
213                        stmt.setMaxRows(maxRows);
214                }
215                if (queryTimeout != VALUE_NOT_SET) {
216                        stmt.setQueryTimeout(queryTimeout);
217                }
218        }
219 
220        /**
221         * Return the exception translator for this instance.
222         * 
223         * Creates a default SQLErrorCodeSQLExceptionTranslator for the specified
224         * DataSource if none is set.
225         */
226        protected SQLExceptionTranslator getExceptionTranslator() {
227                if (exceptionTranslator == null) {
228                        if (dataSource != null) {
229                                exceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource);
230                        }
231                        else {
232                                exceptionTranslator = new SQLStateSQLExceptionTranslator();
233                        }
234                }
235                return exceptionTranslator;
236        }
237 
238        /**
239         * Throw a SQLWarningException if we're not ignoring warnings, else log the
240         * warnings (at debug level).
241         * 
242         * @param warning the warnings object from the current statement. May be
243         * <code>null</code>, in which case this method does nothing.
244         * 
245         * @see org.springframework.jdbc.SQLWarningException
246         */
247        private void handleWarnings(PreparedStatement pstmt) throws SQLWarningException, SQLException {
248                if (ignoreWarnings) {
249                        if (log.isDebugEnabled()) {
250                                SQLWarning warningToLog = pstmt.getWarnings();
251                                while (warningToLog != null) {
252                                        log.debug("SQLWarning ignored: SQL state '" + warningToLog.getSQLState() + "', error code '"
253                                                        + warningToLog.getErrorCode() + "', message [" + warningToLog.getMessage() + "]");
254                                        warningToLog = warningToLog.getNextWarning();
255                                }
256                        }
257                }
258                else {
259                        SQLWarning warnings = pstmt.getWarnings();
260                        if (warnings != null) {
261                                throw new SQLWarningException("Warning not ignored", warnings);
262                        }
263                }
264        }
265 
266        /**
267         * Moves the cursor in the ResultSet to the position specified by the row
268         * parameter by traversing the ResultSet.
269         * @param row
270         */
271        private void moveCursorToRow(int row) {
272                try {
273                        int count = 0;
274                        while (count!=row && rs.next()) {
275                                count++;
276                                if (count == row) {
277                                        break;
278                                }
279                        }
280                }
281                catch (SQLException se) {
282                        throw getExceptionTranslator().translate("Attempted to move ResultSet to last committed row", sql, se);
283                }
284        }
285 
286        /**
287         * Gives the JDBC driver a hint as to the number of rows that should be
288         * fetched from the database when more rows are needed for this
289         * <code>ResultSet</code> object. If the fetch size specified is zero, the
290         * JDBC driver ignores the value.
291         * 
292         * @param fetchSize the number of rows to fetch
293         * @see ResultSet#setFetchSize(int)
294         */
295        public void setFetchSize(int fetchSize) {
296                this.fetchSize = fetchSize;
297        }
298 
299        /**
300         * Sets the limit for the maximum number of rows that any
301         * <code>ResultSet</code> object can contain to the given number.
302         * 
303         * @param maxRows the new max rows limit; zero means there is no limit
304         * @see Statement#setMaxRows(int)
305         */
306        public void setMaxRows(int maxRows) {
307                this.maxRows = maxRows;
308        }
309 
310        /**
311         * Sets the number of seconds the driver will wait for a
312         * <code>Statement</code> object to execute to the given number of
313         * seconds. If the limit is exceeded, an <code>SQLException</code> is
314         * thrown.
315         * 
316         * @param queryTimeout seconds the new query timeout limit in seconds; zero
317         * means there is no limit
318         * @see Statement#setQueryTimeout(int)
319         */
320        public void setQueryTimeout(int queryTimeout) {
321                this.queryTimeout = queryTimeout;
322        }
323 
324        /**
325         * Set whether SQLWarnings should be ignored (only logged) or exception
326         * should be thrown.
327         * 
328         * @param ignoreWarnings if TRUE, warnings are ignored
329         */
330        public void setIgnoreWarnings(boolean ignoreWarnings) {
331                this.ignoreWarnings = ignoreWarnings;
332        }
333 
334        /**
335         * Allow verification of cursor position after current row is processed by
336         * RowMapper or RowCallbackHandler. Default value is TRUE.
337         * 
338         * @param verifyCursorPosition if true, cursor position is verified
339         */
340        public void setVerifyCursorPosition(boolean verifyCursorPosition) {
341                this.verifyCursorPosition = verifyCursorPosition;
342        }
343 
344        /**
345         * Set the RowMapper to be used for all calls to read().
346         * 
347         * @param mapper
348         */
349        public void setMapper(RowMapper mapper) {
350                this.mapper = mapper;
351        }
352 
353        /**
354         * Set the SQL statement to be used when creating the cursor. This statement
355         * should be a complete and valid SQL statement, as it will be run directly
356         * without any modification.
357         * 
358         * @param sql
359         */
360        public void setSql(String sql) {
361                this.sql = sql;
362        }
363 
364        /**
365         * Set the PreparedStatementSetter to use if any parameter values that need
366         * to be set in the supplied query.
367         * 
368         * @param preparedStatementSetter
369         */
370        public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
371                this.preparedStatementSetter = preparedStatementSetter;
372        }
373 
374        /**
375         * Indicate whether the JDBC driver supports setting the absolute row on a
376         * {@link ResultSet}. It is recommended that this is set to
377         * <code>true</code> for JDBC drivers that supports ResultSet.absolute()
378         * as it may improve performance, especially if a step fails while working
379         * with a large data set.
380         * 
381         * @see ResultSet#absolute(int)
382         * 
383         * @param driverSupportsAbsolute <code>false</code> by default
384         */
385        public void setDriverSupportsAbsolute(boolean driverSupportsAbsolute) {
386                this.driverSupportsAbsolute = driverSupportsAbsolute;
387        }
388 
389        /**
390         * Check the result set is in synch with the currentRow attribute. This is
391         * important to ensure that the user hasn't modified the current row.
392         */
393        private void verifyCursorPosition(long expectedCurrentRow) throws SQLException {
394                if (verifyCursorPosition) {
395                        if (expectedCurrentRow != this.rs.getRow()) {
396                                throw new InvalidDataAccessResourceUsageException("Unexpected cursor position change.");
397                        }
398                }
399        }
400 
401        /**
402         * Close the cursor and database connection.
403         */
404        protected void doClose() throws Exception {
405                initialized = false;
406                JdbcUtils.closeResultSet(this.rs);
407                JdbcUtils.closeStatement(this.preparedStatement);
408                JdbcUtils.closeConnection(this.con);
409                rs = null;
410 
411        }
412 
413        /**
414         * Execute the {@link #setSql(String)} query.
415         */
416        protected void doOpen() throws Exception {
417                Assert.state(!initialized, "Stream is already initialized.  Close before re-opening.");
418                Assert.isNull(rs, "ResultSet still open!  Close before re-opening.");
419                executeQuery();
420                initialized = true;
421 
422        }
423 
424        /**
425         * Read next row and map it to item, verify cursor position if
426         * {@link #setVerifyCursorPosition(boolean)} is true.
427         */
428        protected Object doRead() throws Exception {
429                try {
430                        if (!rs.next()) {
431                                return null;
432                        }
433                        int currentRow = getCurrentItemCount();
434                        Object item = mapper.mapRow(rs, currentRow);
435                        verifyCursorPosition(currentRow);
436                        return item;
437                }
438                catch (SQLException se) {
439                        throw getExceptionTranslator().translate("Attempt to process next row failed", sql, se);
440                }
441        }
442 
443        /**
444         * Use {@link ResultSet#absolute(int)} if possible, otherwise scroll by
445         * calling {@link ResultSet#next()}.
446         */
447        protected void jumpToItem(int itemIndex) throws Exception {
448                if (driverSupportsAbsolute) {
449                        try {
450                                rs.absolute(itemIndex);
451                        }
452                        catch (SQLException e) {
453                                // Driver does not support rs.absolute(int) revert to
454                                // traversing ResultSet
455                                log.warn("The JDBC driver does not appear to support ResultSet.absolute(). Consider"
456                                                + " reverting to the default behavior setting the driverSupportsAbsolute to false", e);
457 
458                                moveCursorToRow(itemIndex);
459                        }
460                }
461                else {
462                        moveCursorToRow(itemIndex);
463                }
464        }
465 
466}

[all classes][org.springframework.batch.item.database]
EMMA 2.0.5312 (C) Vladimir Roubtsov