View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.database;
18  
19  import java.sql.CallableStatement;
20  import java.sql.Connection;
21  import java.sql.ResultSet;
22  import java.sql.SQLException;
23  import java.sql.Types;
24  import java.util.Arrays;
25  
26  import org.springframework.jdbc.core.PreparedStatementSetter;
27  import org.springframework.jdbc.core.RowMapper;
28  import org.springframework.jdbc.core.SqlOutParameter;
29  import org.springframework.jdbc.core.SqlParameter;
30  import org.springframework.jdbc.core.metadata.CallMetaDataContext;
31  import org.springframework.jdbc.support.JdbcUtils;
32  import org.springframework.util.Assert;
33  import org.springframework.util.ClassUtils;
34  
35  /**
36   * <p>
37   * Item reader implementation that executes a stored procedure and then reads the returned cursor
38   * and continually retrieves the next row in the <code>ResultSet</code>.
39   * </p>
40   *
41   * <p>
42   * The callable statement used to open the cursor is created with the 'READ_ONLY' option as well as with the
43   * 'TYPE_FORWARD_ONLY' option. By default the cursor will be opened using a separate connection which means
44   * that it will not participate in any transactions created as part of the step processing.
45   * </p>
46   *
47   * <p>
48   * Each call to {@link #read()} will call the provided RowMapper, passing in the
49   * ResultSet.
50   * </p>
51   *
52   * <p>
53   * This class is modeled after the similar <code>JdbcCursorItemReader</code> class.
54   * </p>
55   *
56   * @author Thomas Risberg
57   */
58  @SuppressWarnings("rawtypes")
59  public class StoredProcedureItemReader<T> extends AbstractCursorItemReader<T> {
60  
61  	private CallableStatement callableStatement;
62  
63  	private PreparedStatementSetter preparedStatementSetter;
64  
65  	private String procedureName;
66  
67  	private String callString;
68  
69  	private RowMapper rowMapper;
70  
71  	private SqlParameter[] parameters = new SqlParameter[0];
72  
73  	private boolean function = false;
74  
75  	private int refCursorPosition = 0;
76  
77  	public StoredProcedureItemReader() {
78  		super();
79  		setName(ClassUtils.getShortName(StoredProcedureItemReader.class));
80  	}
81  
82  	/**
83  	 * Set the RowMapper to be used for all calls to read().
84  	 *
85  	 * @param rowMapper
86  	 */
87  	public void setRowMapper(RowMapper rowMapper) {
88  		this.rowMapper = rowMapper;
89  	}
90  
91  	/**
92  	 * Set the SQL statement to be used when creating the cursor. This statement
93  	 * should be a complete and valid SQL statement, as it will be run directly
94  	 * without any modification.
95  	 *
96  	 * @param sprocedureName
97  	 */
98  	public void setProcedureName(String sprocedureName) {
99  		this.procedureName = sprocedureName;
100 	}
101 
102 	/**
103 	 * Set the PreparedStatementSetter to use if any parameter values that need
104 	 * to be set in the supplied query.
105 	 *
106 	 * @param preparedStatementSetter
107 	 */
108 	public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
109 		this.preparedStatementSetter = preparedStatementSetter;
110 	}
111 
112 	/**
113 	 * Add one or more declared parameters. Used for configuring this operation when used in a
114 	 * bean factory. Each parameter will specify SQL type and (optionally) the parameter's name.
115 	 *
116 	 * @param parameters Array containing the declared <code>SqlParameter</code> objects
117 	 */
118 	public void setParameters(SqlParameter[] parameters) {
119 		this.parameters = parameters;
120 	}
121 
122 	/**
123 	 * Set whether this stored procedure is a function.
124 	 */
125 	public void setFunction(boolean function) {
126 		this.function = function;
127 	}
128 
129 	/**
130 	 * Set the parameter position of the REF CURSOR. Only used for Oracle and
131 	 * PostgreSQL that use REF CURSORs. For any other database this should be
132 	 * kept as 0 which is the default.
133 	 *
134 	 * @param refCursorPosition The parameter position of the REF CURSOR
135 	 */
136 	public void setRefCursorPosition(int refCursorPosition) {
137 		this.refCursorPosition = refCursorPosition;
138 	}
139 
140 	/**
141 	 * Assert that mandatory properties are set.
142 	 *
143 	 * @throws IllegalArgumentException if either data source or sql properties
144 	 * not set.
145 	 */
146 	@Override
147 	public void afterPropertiesSet() throws Exception {
148 		super.afterPropertiesSet();
149 		Assert.notNull(procedureName, "The name of the stored procedure must be provided");
150 		Assert.notNull(rowMapper, "RowMapper must be provided");
151 	}
152 
153 	@Override
154 	protected void openCursor(Connection con) {
155 
156 		Assert.state(procedureName != null, "Procedure Name must not be null.");
157 		Assert.state(refCursorPosition >= 0,
158 				"invalid refCursorPosition specified as " + refCursorPosition + "; it can't be " +
159 				"specified as a negative number.");
160 		Assert.state(refCursorPosition == 0 || refCursorPosition > 0,
161 				"invalid refCursorPosition specified as " + refCursorPosition + "; there are " +
162 						parameters.length + " parameters defined.");
163 
164 		CallMetaDataContext callContext = new CallMetaDataContext();
165 		callContext.setAccessCallParameterMetaData(false);
166 		callContext.setProcedureName(procedureName);
167 		callContext.setFunction(function);
168 		callContext.initializeMetaData(getDataSource());
169 		callContext.processParameters(Arrays.asList(parameters));
170 		SqlParameter cursorParameter = callContext.createReturnResultSetParameter("cursor", rowMapper);
171 		this.callString = callContext.createCallString();
172 
173 		log.debug("Call string is: " + callString);
174 
175 		int cursorSqlType = Types.OTHER;
176 		if (function) {
177 			if (cursorParameter instanceof SqlOutParameter) {
178 				cursorSqlType = cursorParameter.getSqlType();
179 			}
180 		}
181 		else {
182 			if (refCursorPosition > 0 && refCursorPosition <= parameters.length) {
183 				cursorSqlType = parameters[refCursorPosition - 1].getSqlType();
184 			}
185 		}
186 
187 		try {
188 			if (isUseSharedExtendedConnection()) {
189 				callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
190 						ResultSet.HOLD_CURSORS_OVER_COMMIT);
191 			}
192 			else {
193 				callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
194 			}
195 			applyStatementSettings(callableStatement);
196 			if (this.preparedStatementSetter != null) {
197 				preparedStatementSetter.setValues(callableStatement);
198 			}
199 
200 			if (function) {
201 				callableStatement.registerOutParameter(1, cursorSqlType);
202 			}
203 			else {
204 				if (refCursorPosition > 0) {
205 					callableStatement.registerOutParameter(refCursorPosition, cursorSqlType);
206 				}
207 			}
208 			boolean results = callableStatement.execute();
209 			if (results) {
210 				rs = callableStatement.getResultSet();
211 			}
212 			else {
213 				if (function) {
214 					rs = (ResultSet) callableStatement.getObject(1);
215 				}
216 				else {
217 					rs = (ResultSet) callableStatement.getObject(refCursorPosition);
218 				}
219 			}
220 			handleWarnings(callableStatement);
221 		}
222 		catch (SQLException se) {
223 			close();
224 			throw getExceptionTranslator().translate("Executing stored procedure", getSql(), se);
225 		}
226 
227 	}
228 
229 	@Override
230 	@SuppressWarnings("unchecked")
231 	protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
232 		return (T) rowMapper.mapRow(rs, currentRow);
233 	}
234 
235 	/**
236 	 * Close the cursor and database connection.
237 	 */
238 	@Override
239 	protected void cleanupOnClose() throws Exception {
240 		JdbcUtils.closeStatement(this.callableStatement);
241 	}
242 
243 	@Override
244 	public String getSql() {
245 		if (callString != null) {
246 			return this.callString;
247 		}
248 		else {
249 			return "PROCEDURE NAME: " + procedureName;
250 		}
251 	}
252 
253 }