EMMA Coverage Report (generated Tue May 06 07:28:24 PDT 2008)
[all classes][org.springframework.batch.item.database]

COVERAGE SUMMARY FOR SOURCE FILE [JdbcCursorItemReader.java]

nameclass, %method, %block, %line, %
JdbcCursorItemReader.java100% (2/2)87%  (27/31)77%  (396/517)82%  (116.7/142)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class JdbcCursorItemReader100% (1/1)88%  (21/24)74%  (276/374)81%  (86.7/107)
getCurrentProcessedRow (): long 0%   (0/1)0%   (0/4)0%   (0/1)
getExceptionTranslator (): SQLExceptionTranslator 0%   (0/1)0%   (0/22)0%   (0/6)
setPreparedStatementSetter (PreparedStatementSetter): void 0%   (0/1)0%   (0/4)0%   (0/2)
handleWarnings (SQLWarning): void 100% (1/1)20%  (9/44)50%  (5/10)
executeQuery (): void 100% (1/1)69%  (41/59)69%  (9/13)
JdbcCursorItemReader (): void 100% (1/1)86%  (30/35)98%  (9.8/10)
open (ExecutionContext): void 100% (1/1)87%  (60/69)89%  (16/18)
<static initializer> 100% (1/1)91%  (10/11)90%  (0.9/1)
afterPropertiesSet (): void 100% (1/1)100% (13/13)100% (4/4)
applyStatementSettings (PreparedStatement): void 100% (1/1)100% (28/28)100% (8/8)
close (ExecutionContext): void 100% (1/1)100% (19/19)100% (7/7)
mark (): void 100% (1/1)100% (4/4)100% (2/2)
read (): Object 100% (1/1)100% (4/4)100% (1/1)
reset (): void 100% (1/1)100% (4/4)100% (2/2)
setDataSource (DataSource): void 100% (1/1)100% (4/4)100% (2/2)
setFetchSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setIgnoreWarnings (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setMapper (RowMapper): void 100% (1/1)100% (4/4)100% (2/2)
setMaxRows (int): void 100% (1/1)100% (4/4)100% (2/2)
setQueryTimeout (int): void 100% (1/1)100% (4/4)100% (2/2)
setSaveState (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setSql (String): void 100% (1/1)100% (4/4)100% (2/2)
setVerifyCursorPosition (boolean): void 100% (1/1)100% (4/4)100% (2/2)
update (ExecutionContext): void 100% (1/1)100% (18/18)100% (4/4)
     
class JdbcCursorItemReader$BufferredResultSetReader100% (1/1)86%  (6/7)84%  (120/143)86%  (30/35)
JdbcCursorItemReader$BufferredResultSetReader (JdbcCursorItemReader, ResultSe... 0%   (0/1)0%   (0/7)0%   (0/2)
verifyCursorPosition (long): void 100% (1/1)71%  (12/17)75%  (3/4)
read (): Object 100% (1/1)82%  (50/61)83%  (10/12)
JdbcCursorItemReader$BufferredResultSetReader (JdbcCursorItemReader, ResultSe... 100% (1/1)100% (33/33)100% (10/10)
getProcessedRowCount (): long 100% (1/1)100% (4/4)100% (1/1)
mark (): void 100% (1/1)100% (8/8)100% (3/3)
reset (): void 100% (1/1)100% (13/13)100% (3/3)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.item.database;
18 
19import java.sql.Connection;
20import java.sql.PreparedStatement;
21import java.sql.ResultSet;
22import java.sql.SQLException;
23import java.sql.SQLWarning;
24import java.sql.Statement;
25import java.util.ArrayList;
26import java.util.List;
27 
28import javax.sql.DataSource;
29 
30import org.apache.commons.logging.Log;
31import org.apache.commons.logging.LogFactory;
32import org.springframework.batch.item.ExecutionContext;
33import org.springframework.batch.item.ExecutionContextUserSupport;
34import org.springframework.batch.item.ItemReader;
35import org.springframework.batch.item.ItemStream;
36import org.springframework.batch.item.MarkFailedException;
37import org.springframework.batch.item.NoWorkFoundException;
38import org.springframework.batch.item.ParseException;
39import org.springframework.batch.item.ResetFailedException;
40import org.springframework.batch.item.UnexpectedInputException;
41import org.springframework.beans.factory.InitializingBean;
42import org.springframework.dao.DataAccessException;
43import org.springframework.dao.InvalidDataAccessResourceUsageException;
44import org.springframework.jdbc.SQLWarningException;
45import org.springframework.jdbc.core.PreparedStatementSetter;
46import org.springframework.jdbc.core.RowMapper;
47import org.springframework.jdbc.support.JdbcUtils;
48import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
49import org.springframework.jdbc.support.SQLExceptionTranslator;
50import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
51import org.springframework.util.Assert;
52import org.springframework.util.ClassUtils;
53 
54/**
55 * <p>
56 * Simple item reader that opens a JDBC cursor and continually retrieves the
57 * next row in the ResultSet. It is extremely important to note that the
58 * JdbcDriver used must be version 3.0 or higher. This is because earlier
59 * versions do not support holding a ResultSet open over commits.
60 * </p>
61 * 
62 * <p>
63 * Each call to {@link #read()} will call the provided RowMapper, passing in the
64 * ResultSet. There is currently no wrapping of the ResultSet to suppress calls
65 * to next(). However, if the RowMapper (mistakenly) increments the current row,
66 * the next call to read will verify that the current row is at the expected
67 * position and throw a DataAccessException if it is not. This means that, in
68 * theory, a RowMapper could read ahead, as long as it returns the row back to
69 * the correct position before returning. The reason for such strictness on the
70 * ResultSet is due to the need to maintain control for transactions,
71 * restartability and skippability. This ensures that each call to
72 * {@link #read()} returns the ResultSet at the correct line, regardless of
73 * rollbacks, restarts, or skips.
74 * </p>
75 * 
76 * <p>
77 * {@link ExecutionContext}: The current row is returned as restart data, and
78 * when restored from that same data, the cursor is opened and the current row
79 * set to the value within the restart data. Two values are stored: the current
80 * line being processed and the number of lines that have been skipped.
81 * </p>
82 * 
83 * <p>
84 * Transactions: The same ResultSet is held open regardless of commits or roll
85 * backs in a surrounding transaction. This means that when such a transaction
86 * is committed, the input source is notified through the {@link #mark()} and
87 * {@link #reset()} so that it can save it's current row number. Later, if the
88 * transaction is rolled back, the current row can be moved back to the same row
89 * number as it was on when commit was called.
90 * </p>
91 * 
92 * <p>
93 * Calling skip will indicate that a record is bad and should not be
94 * re-presented to the user if the transaction is rolled back. For example, if
95 * row 2 is read in, and found to be bad, calling skip will inform the
96 * {@link ItemReader}. If reading is then continued, and a rollback is
97 * necessary because of an error on output, the input source will be returned to
98 * row 1. Calling read while on row 1 will move the current row to 3, not 2,
99 * because 2 has been marked as skipped.
100 * </p>
101 * 
102 * <p>
103 * Calling close on this {@link ItemStream} will cause all resources it is
104 * currently using to be freed. (Connection, ResultSet, etc). It is then illegal
105 * to call {@link #read()} again until it has been opened.
106 * </p>
107 * 
108 * <p>
109 * Known limitation: when used with Derby
110 * {@link #setVerifyCursorPosition(boolean)} needs to be <code>false</code>
111 * because {@link ResultSet#getRow()} call used for cursor position verification
112 * throws an exception.
113 * </p>
114 * 
115 * @author Lucas Ward
116 * @author Peter Zozom
117 */
118public class JdbcCursorItemReader extends ExecutionContextUserSupport implements ItemReader, InitializingBean,
119                ItemStream {
120 
121        private static Log log = LogFactory.getLog(JdbcCursorItemReader.class);
122 
123        public static final int VALUE_NOT_SET = -1;
124 
125        private static final String CURRENT_PROCESSED_ROW = "last.processed.row.number";
126 
127        private Connection con;
128 
129        private PreparedStatement preparedStatement;
130 
131        private PreparedStatementSetter preparedStatementSetter;
132 
133        protected ResultSet rs;
134 
135        private DataSource dataSource;
136 
137        private String sql;
138 
139        private int fetchSize = VALUE_NOT_SET;
140 
141        private int maxRows = VALUE_NOT_SET;
142 
143        private int queryTimeout = VALUE_NOT_SET;
144 
145        private boolean ignoreWarnings = true;
146 
147        private boolean verifyCursorPosition = true;
148 
149        private SQLExceptionTranslator exceptionTranslator;
150 
151        private RowMapper mapper;
152 
153        private boolean initialized = false;
154 
155        private boolean saveState = false;
156 
157        private BufferredResultSetReader bufferredReader;
158 
159        public JdbcCursorItemReader() {
160                setName(ClassUtils.getShortName(JdbcCursorItemReader.class));
161        }
162 
163        /**
164         * Assert that mandatory properties are set.
165         * 
166         * @throws IllegalArgumentException if either data source or sql properties
167         * not set.
168         */
169        public void afterPropertiesSet() throws Exception {
170                Assert.notNull(dataSource, "DataSOurce must be provided");
171                Assert.notNull(sql, "The SQL query must be provided");
172                Assert.notNull(mapper, "RowMapper must be provided");
173        }
174 
175        /**
176         * Public setter for the data source for injection purposes.
177         * 
178         * @param dataSource
179         */
180        public void setDataSource(DataSource dataSource) {
181                this.dataSource = dataSource;
182        }
183 
184        /**
185         * Increment the cursor to the next row, validating the cursor position and
186         * passing the resultset to the RowMapper. If read has not been called on
187         * this instance before, the cursor will be opened. If there are skipped
188         * records for this commit scope, an internal list of skipped records will
189         * be checked to ensure that only a valid row is given to the mapper.
190         * 
191         * @returns Object returned by RowMapper
192         * @throws DataAccessException
193         */
194        public Object read() throws Exception {
195 
196                return bufferredReader.read();
197        }
198 
199        public long getCurrentProcessedRow() {
200                return bufferredReader.getProcessedRowCount();
201        }
202 
203        /**
204         * Mark the current row. Calling reset will cause the result set to be set
205         * to the current row when mark was called.
206         */
207        public void mark() {
208                bufferredReader.mark();
209        }
210 
211        /**
212         * Set the ResultSet's current row to the last marked position.
213         * 
214         * @throws DataAccessException
215         */
216        public void reset() throws ResetFailedException {
217                bufferredReader.reset();
218        }
219 
220        /**
221         * Close this input source. The ResultSet, Statement and Connection created
222         * will be closed. This must be called or the connection and cursor will be
223         * held open indefinitely!
224         * 
225         * @see org.springframework.batch.item.ItemStream#close(ExecutionContext)
226         */
227        public void close(ExecutionContext executionContext) {
228                initialized = false;
229                JdbcUtils.closeResultSet(this.rs);
230                JdbcUtils.closeStatement(this.preparedStatement);
231                JdbcUtils.closeConnection(this.con);
232                bufferredReader = null;
233                rs = null;
234        }
235 
236        /*
237         * Executes the provided SQL query. The statement is created with
238         * 'READ_ONLY' and 'HOLD_CUSORS_OVER_COMMIT' set to true. This is extremely
239         * important, since a non read-only cursor may lock tables that shouldn't be
240         * locked, and not holding the cursor open over a commit would require it to
241         * be reopened after each commit, which would destroy performance.
242         */
243        private void executeQuery() {
244 
245                Assert.state(dataSource != null, "DataSource must not be null.");
246 
247                try {
248                        this.con = dataSource.getConnection();
249                        preparedStatement = this.con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
250                                        ResultSet.HOLD_CURSORS_OVER_COMMIT);
251                        applyStatementSettings(preparedStatement);
252                        if (this.preparedStatementSetter != null) {
253                                preparedStatementSetter.setValues(preparedStatement);
254                        }
255                        this.rs = preparedStatement.executeQuery();
256                        handleWarnings(preparedStatement.getWarnings());
257                }
258                catch (SQLException se) {
259                        close(null);
260                        throw getExceptionTranslator().translate("Executing query", sql, se);
261                }
262 
263        }
264 
265        /*
266         * Prepare the given JDBC Statement (or PreparedStatement or
267         * CallableStatement), applying statement settings such as fetch size, max
268         * rows, and query timeout. @param stmt the JDBC Statement to prepare
269         * @throws SQLException
270         * 
271         * @see #setFetchSize
272         * @see #setMaxRows
273         * @see #setQueryTimeout
274         */
275        private void applyStatementSettings(PreparedStatement stmt) throws SQLException {
276                if (fetchSize != VALUE_NOT_SET) {
277                        stmt.setFetchSize(fetchSize);
278                        stmt.setFetchDirection(ResultSet.FETCH_FORWARD);
279                }
280                if (maxRows != VALUE_NOT_SET) {
281                        stmt.setMaxRows(maxRows);
282                }
283                if (queryTimeout != VALUE_NOT_SET) {
284                        stmt.setQueryTimeout(queryTimeout);
285                }
286        }
287 
288        /*
289         * Return the exception translator for this instance. <p>Creates a default
290         * SQLErrorCodeSQLExceptionTranslator for the specified DataSource if none
291         * is set.
292         */
293        protected SQLExceptionTranslator getExceptionTranslator() {
294                if (exceptionTranslator == null) {
295                        if (dataSource != null) {
296                                exceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource);
297                        }
298                        else {
299                                exceptionTranslator = new SQLStateSQLExceptionTranslator();
300                        }
301                }
302                return exceptionTranslator;
303        }
304 
305        /*
306         * Throw a SQLWarningException if we're not ignoring warnings, else log the
307         * warnings (at debug level).
308         * 
309         * @param warning the warnings object from the current statement. May be
310         * <code>null</code>, in which case this method does nothing.
311         * 
312         * @see org.springframework.jdbc.SQLWarningException
313         */
314        private void handleWarnings(SQLWarning warnings) throws SQLWarningException {
315                if (ignoreWarnings) {
316                        SQLWarning warningToLog = warnings;
317                        while (warningToLog != null) {
318                                log.debug("SQLWarning ignored: SQL state '" + warningToLog.getSQLState() + "', error code '"
319                                                + warningToLog.getErrorCode() + "', message [" + warningToLog.getMessage() + "]");
320                                warningToLog = warningToLog.getNextWarning();
321                        }
322                }
323                else if (warnings != null) {
324                        throw new SQLWarningException("Warning not ignored", warnings);
325                }
326        }
327 
328        /*
329         * (non-Javadoc)
330         * 
331         * @see org.springframework.batch.item.stream.ItemStreamAdapter#getExecutionContext()
332         */
333        public void update(ExecutionContext executionContext) {
334                if (saveState && initialized) {
335                        Assert.notNull(executionContext, "ExecutionContext must not be null");
336                        executionContext.putLong(getKey(CURRENT_PROCESSED_ROW), bufferredReader.getProcessedRowCount());
337                }
338        }
339 
340        /*
341         * (non-Javadoc)
342         * 
343         * @see org.springframework.batch.item.stream.ItemStreamAdapter#restoreFrom(org.springframework.batch.item.ExecutionContext)
344         */
345        public void open(ExecutionContext context) {
346                Assert.state(!initialized, "Stream is already initialized.  Close before re-opening.");
347                Assert.isNull(rs, "ResultSet still open!  Close before re-opening.");
348                Assert.notNull(context, "ExecutionContext must not be null");
349                executeQuery();
350                initialized = true;
351                int processedRowCount = 0;
352 
353                if (context.containsKey(getKey(CURRENT_PROCESSED_ROW))) {
354                        try {
355                                processedRowCount = Long.valueOf(context.getLong(getKey(CURRENT_PROCESSED_ROW))).intValue();
356                                int count = 0;
357                                while (rs.next()) {
358                                        count++;
359                                        if (count == processedRowCount) {
360                                                break;
361                                        }
362                                }
363                        }
364                        catch (SQLException se) {
365                                throw getExceptionTranslator().translate("Attempted to move ResultSet to last committed row", sql, se);
366                        }
367                }
368 
369                bufferredReader = new BufferredResultSetReader(rs, mapper, processedRowCount);
370        }
371 
372        /**
373         * Gives the JDBC driver a hint as to the number of rows that should be
374         * fetched from the database when more rows are needed for this
375         * <code>ResultSet</code> object. If the fetch size specified is zero, the
376         * JDBC driver ignores the value.
377         * 
378         * @param fetchSize the number of rows to fetch
379         * @see ResultSet#setFetchSize(int)
380         */
381        public void setFetchSize(int fetchSize) {
382                this.fetchSize = fetchSize;
383        }
384 
385        /**
386         * Sets the limit for the maximum number of rows that any
387         * <code>ResultSet</code> object can contain to the given number.
388         * 
389         * @param maxRows the new max rows limit; zero means there is no limit
390         * @see Statement#setMaxRows(int)
391         */
392        public void setMaxRows(int maxRows) {
393                this.maxRows = maxRows;
394        }
395 
396        /**
397         * Sets the number of seconds the driver will wait for a
398         * <code>Statement</code> object to execute to the given number of
399         * seconds. If the limit is exceeded, an <code>SQLException</code> is
400         * thrown.
401         * 
402         * @param queryTimeout seconds the new query timeout limit in seconds; zero
403         * means there is no limit
404         * @see Statement#setQueryTimeout(int)
405         */
406        public void setQueryTimeout(int queryTimeout) {
407                this.queryTimeout = queryTimeout;
408        }
409 
410        /**
411         * Set whether SQLWarnings should be ignored (only logged) or exception
412         * should be thrown.
413         * 
414         * @param ignoreWarnings if TRUE, warnings are ignored
415         */
416        public void setIgnoreWarnings(boolean ignoreWarnings) {
417                this.ignoreWarnings = ignoreWarnings;
418        }
419 
420        /**
421         * Allow verification of cursor position after current row is processed by
422         * RowMapper or RowCallbackHandler. Default value is TRUE.
423         * 
424         * @param verifyCursorPosition if true, cursor position is verified
425         */
426        public void setVerifyCursorPosition(boolean verifyCursorPosition) {
427                this.verifyCursorPosition = verifyCursorPosition;
428        }
429 
430        /**
431         * Set the RowMapper to be used for all calls to read().
432         * 
433         * @param mapper
434         */
435        public void setMapper(RowMapper mapper) {
436                this.mapper = mapper;
437        }
438 
439        /**
440         * Set the sql statement to be used when creating the cursor. This statement
441         * should be a complete and valid Sql statement, as it will be run directly
442         * without any modification.
443         * 
444         * @param sql
445         */
446        public void setSql(String sql) {
447                this.sql = sql;
448        }
449 
450        /**
451         * Set the PreparedStatementSetter to use if any parameter values that need
452         * to be set in the supplied query.
453         * 
454         * @param preparedStatementSetter
455         */
456        public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
457                this.preparedStatementSetter = preparedStatementSetter;
458        }
459 
460        /**
461         * Set whether this {@link ItemReader} should save it's state in the
462         * {@link ExecutionContext} or not
463         * 
464         * @param saveState
465         */
466        public void setSaveState(boolean saveState) {
467                this.saveState = saveState;
468        }
469 
470        private class BufferredResultSetReader implements ItemReader {
471 
472                private ResultSet rs;
473 
474                private RowMapper rowMapper;
475 
476                private List buffer;
477 
478                private int currentIndex;
479 
480                private int processedRowCount;
481 
482                private int INITIAL_POSITION = -1;
483 
484                public BufferredResultSetReader(ResultSet rs, RowMapper rowMapper, int processedRowCount) {
485                        Assert.notNull(rs, "The ResultSet must not be null");
486                        Assert.notNull(rowMapper, "The RowMapper must not be null");
487                        this.rs = rs;
488                        this.rowMapper = rowMapper;
489                        buffer = new ArrayList();
490                        currentIndex = INITIAL_POSITION;
491                        this.processedRowCount = processedRowCount;
492                }
493 
494                public BufferredResultSetReader(ResultSet rs, RowMapper rowMapper) {
495                        this(rs, rowMapper, 0);
496                }
497 
498                public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException {
499 
500                        currentIndex++;
501                        // if the incremented index reaches out of the buffer, add next item
502                        // from result set to buffer
503                        if (buffer.size() == currentIndex) {
504                                try {
505                                        if (!rs.next()) {
506                                                return null;
507                                        }
508                                        int currentRow = processedRowCount + 1;// rs.getRow();
509                                        buffer.add(rowMapper.mapRow(rs, currentRow));
510                                        verifyCursorPosition(currentRow);
511                                }
512                                catch (SQLException se) {
513                                        throw getExceptionTranslator().translate("Attempt to process next row failed", sql, se);
514                                }
515                        }
516 
517                        processedRowCount++;
518                        return buffer.get(currentIndex);
519                }
520 
521                public void mark() throws MarkFailedException {
522                        buffer.clear();
523                        currentIndex = INITIAL_POSITION;
524                }
525 
526                public void reset() throws ResetFailedException {
527                        processedRowCount -= buffer.size();
528                        currentIndex = INITIAL_POSITION;
529                }
530 
531                // Check the result set is in synch with the currentRow attribute. This
532                // is
533                // important
534                // to ensure that the user hasn't modified the current row.
535                private void verifyCursorPosition(long expectedCurrentRow) throws SQLException {
536                        if (verifyCursorPosition) {
537                                if (expectedCurrentRow != this.rs.getRow()) {
538                                        throw new InvalidDataAccessResourceUsageException("Unexpected cursor position change.");
539                                }
540                        }
541                }
542 
543                public long getProcessedRowCount() {
544                        return processedRowCount;
545                }
546 
547        }
548}

[all classes][org.springframework.batch.item.database]
EMMA 2.0.5312 (C) Vladimir Roubtsov