View Javadoc

1   /*
2    * Copyright 2006-2009 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.database;
18  
19  import java.sql.CallableStatement;
20  import java.sql.Connection;
21  import java.sql.ResultSet;
22  import java.sql.SQLException;
23  import java.sql.Types;
24  import java.util.Arrays;
25  
26  import org.springframework.jdbc.core.PreparedStatementSetter;
27  import org.springframework.jdbc.core.RowMapper;
28  import org.springframework.jdbc.core.SqlOutParameter;
29  import org.springframework.jdbc.core.SqlParameter;
30  import org.springframework.jdbc.core.metadata.CallMetaDataContext;
31  import org.springframework.jdbc.support.JdbcUtils;
32  import org.springframework.util.Assert;
33  import org.springframework.util.ClassUtils;
34  
35  /**
36   * <p>
37   * Item reader implementation that executes a stored procedure and then reads the returned cursor 
38   * and continually retrieves the next row in the <code>ResultSet</code>. 
39   * </p>
40   * 
41   * <p>
42   * The callable statement used to open the cursor is created with the 'READ_ONLY' option as well as with the 
43   * 'TYPE_FORWARD_ONLY' option. By default the cursor will be opened using a separate connection which means 
44   * that it will not participate in any transactions created as part of the step processing.
45   * </p>
46   *  
47   * <p>
48   * Each call to {@link #read()} will call the provided RowMapper, passing in the
49   * ResultSet. 
50   * </p>
51   * 
52   * <p>
53   * This class is modeled after the similar <code>JdbcCursorItemReader</code> class. 
54   * </p>
55   * 
56   * @author Thomas Risberg
57   */
58  public class StoredProcedureItemReader<T> extends AbstractCursorItemReader<T> {
59  
60  	private CallableStatement callableStatement;
61  
62  	private PreparedStatementSetter preparedStatementSetter;
63  
64  	private String procedureName;
65  	
66  	private String callString;
67  
68  	private RowMapper rowMapper;
69  
70  	private SqlParameter[] parameters = new SqlParameter[0];
71  	
72  	private boolean function = false;
73  	
74  	private int refCursorPosition = 0;
75  
76  	public StoredProcedureItemReader() {
77  		super();
78  		setName(ClassUtils.getShortName(StoredProcedureItemReader.class));
79  	}
80  
81  	/**
82  	 * Set the RowMapper to be used for all calls to read().
83  	 * 
84  	 * @param rowMapper
85  	 */
86  	public void setRowMapper(RowMapper rowMapper) {
87  		this.rowMapper = rowMapper;
88  	}
89  
90  	/**
91  	 * Set the SQL statement to be used when creating the cursor. This statement
92  	 * should be a complete and valid SQL statement, as it will be run directly
93  	 * without any modification.
94  	 * 
95  	 * @param sprocedureName
96  	 */
97  	public void setProcedureName(String sprocedureName) {
98  		this.procedureName = sprocedureName;
99  	}
100 
101 	/**
102 	 * Set the PreparedStatementSetter to use if any parameter values that need
103 	 * to be set in the supplied query.
104 	 * 
105 	 * @param preparedStatementSetter
106 	 */
107 	public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
108 		this.preparedStatementSetter = preparedStatementSetter;
109 	}
110 
111 	/**
112 	 * Add one or more declared parameters. Used for configuring this operation when used in a 
113 	 * bean factory. Each parameter will specify SQL type and (optionally) the parameter's name. 
114 	 * 
115 	 * @param parameters Array containing the declared <code>SqlParameter</code> objects
116 	 */
117 	public void setParameters(SqlParameter[] parameters) {
118 		this.parameters = parameters;
119 	}
120 	
121 	/**
122 	 * Set whether this stored procedure is a function.
123 	 */
124 	public void setFunction(boolean function) {
125 		this.function = function;
126 	}
127 
128 	/**
129 	 * Set the parameter position of the REF CURSOR. Only used for Oracle and
130 	 * PostgreSQL that use REF CURSORs. For any other database this should be 
131 	 * kept as 0 which is the default.
132 	 *  
133 	 * @param refCursorPosition The parameter position of the REF CURSOR
134 	 */
135 	public void setRefCursorPosition(int refCursorPosition) {
136 		this.refCursorPosition = refCursorPosition;
137 	}
138 
139 	/**
140 	 * Assert that mandatory properties are set.
141 	 * 
142 	 * @throws IllegalArgumentException if either data source or sql properties
143 	 * not set.
144 	 */
145 	public void afterPropertiesSet() throws Exception {
146 		super.afterPropertiesSet();
147 		Assert.notNull(procedureName, "The name of the stored procedure must be provided");
148 		Assert.notNull(rowMapper, "RowMapper must be provided");
149 	}
150 
151 	protected void openCursor(Connection con) {	
152 
153 		Assert.state(procedureName != null, "Procedure Name must not be null.");
154 		Assert.state(refCursorPosition >= 0, 
155 				"invalid refCursorPosition specified as " + refCursorPosition + "; it can't be " +
156 				"specified as a negative number.");
157 		Assert.state(refCursorPosition == 0 || refCursorPosition > 0, 
158 				"invalid refCursorPosition specified as " + refCursorPosition + "; there are " + 
159 				parameters.length + " parameters defined.");
160 
161 		CallMetaDataContext callContext = new CallMetaDataContext();
162 		callContext.setAccessCallParameterMetaData(false);
163 		callContext.setProcedureName(procedureName);
164 		callContext.setFunction(function);
165 		callContext.initializeMetaData(getDataSource());
166 		callContext.processParameters(Arrays.asList(parameters));
167 		SqlParameter cursorParameter = callContext.createReturnResultSetParameter("cursor", rowMapper);
168 		this.callString = callContext.createCallString();
169 
170 		log.debug("Call string is: " + callString);
171 		
172 		int cursorSqlType = Types.OTHER;
173 		if (function) {
174 			if (cursorParameter instanceof SqlOutParameter) {
175 				cursorSqlType = cursorParameter.getSqlType();
176 			}
177 		}
178 		else {
179 			if (refCursorPosition > 0 && refCursorPosition <= parameters.length) {
180 				cursorSqlType = parameters[refCursorPosition - 1].getSqlType();
181 			}
182 		}
183 		
184 		try {
185 			if (isUseSharedExtendedConnection()) {
186 				callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
187 						ResultSet.HOLD_CURSORS_OVER_COMMIT);
188 			}
189 			else {
190 				callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
191 			}
192 			applyStatementSettings(callableStatement);
193 			if (this.preparedStatementSetter != null) {
194 				preparedStatementSetter.setValues(callableStatement);
195 			}
196 			
197 			if (function) {
198 				callableStatement.registerOutParameter(1, cursorSqlType);
199 			}
200 			else {
201 				if (refCursorPosition > 0) {
202 					callableStatement.registerOutParameter(refCursorPosition, cursorSqlType);
203 				}
204 			}
205 			boolean results = callableStatement.execute();
206 			if (results) {
207 				rs = callableStatement.getResultSet();
208 			}
209 			else {
210 				if (function) {
211 					rs = (ResultSet) callableStatement.getObject(1);
212 				}
213 				else {
214 					rs = (ResultSet) callableStatement.getObject(refCursorPosition);
215 				}
216 			}
217 			handleWarnings(callableStatement);
218 		}
219 		catch (SQLException se) {
220 			close();
221 			throw getExceptionTranslator().translate("Executing stored procedure", getSql(), se);
222 		}
223 
224 	}
225 
226 	@SuppressWarnings("unchecked")
227 	protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
228 		return (T) rowMapper.mapRow(rs, currentRow);
229 	}
230 
231 	/**
232 	 * Close the cursor and database connection.
233 	 */
234 	protected void cleanupOnClose() throws Exception {
235 		JdbcUtils.closeStatement(this.callableStatement);
236 	}
237 
238 	@Override
239 	public String getSql() {
240 		if (callString != null) {
241 			return this.callString;
242 		}
243 		else {
244 			return "PROCEDURE NAME: " + procedureName;
245 		}
246 	}
247 
248 }