1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.database;
18
19 import java.sql.CallableStatement;
20 import java.sql.Connection;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.Types;
24 import java.util.Arrays;
25
26 import org.springframework.jdbc.core.PreparedStatementSetter;
27 import org.springframework.jdbc.core.RowMapper;
28 import org.springframework.jdbc.core.SqlOutParameter;
29 import org.springframework.jdbc.core.SqlParameter;
30 import org.springframework.jdbc.core.metadata.CallMetaDataContext;
31 import org.springframework.jdbc.support.JdbcUtils;
32 import org.springframework.util.Assert;
33 import org.springframework.util.ClassUtils;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class StoredProcedureItemReader<T> extends AbstractCursorItemReader<T> {
59
60 private CallableStatement callableStatement;
61
62 private PreparedStatementSetter preparedStatementSetter;
63
64 private String procedureName;
65
66 private String callString;
67
68 private RowMapper rowMapper;
69
70 private SqlParameter[] parameters = new SqlParameter[0];
71
72 private boolean function = false;
73
74 private int refCursorPosition = 0;
75
76 public StoredProcedureItemReader() {
77 super();
78 setName(ClassUtils.getShortName(StoredProcedureItemReader.class));
79 }
80
81
82
83
84
85
86 public void setRowMapper(RowMapper rowMapper) {
87 this.rowMapper = rowMapper;
88 }
89
90
91
92
93
94
95
96
97 public void setProcedureName(String sprocedureName) {
98 this.procedureName = sprocedureName;
99 }
100
101
102
103
104
105
106
107 public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
108 this.preparedStatementSetter = preparedStatementSetter;
109 }
110
111
112
113
114
115
116
117 public void setParameters(SqlParameter[] parameters) {
118 this.parameters = parameters;
119 }
120
121
122
123
124 public void setFunction(boolean function) {
125 this.function = function;
126 }
127
128
129
130
131
132
133
134
135 public void setRefCursorPosition(int refCursorPosition) {
136 this.refCursorPosition = refCursorPosition;
137 }
138
139
140
141
142
143
144
145 public void afterPropertiesSet() throws Exception {
146 super.afterPropertiesSet();
147 Assert.notNull(procedureName, "The name of the stored procedure must be provided");
148 Assert.notNull(rowMapper, "RowMapper must be provided");
149 }
150
151 protected void openCursor(Connection con) {
152
153 Assert.state(procedureName != null, "Procedure Name must not be null.");
154 Assert.state(refCursorPosition >= 0,
155 "invalid refCursorPosition specified as " + refCursorPosition + "; it can't be " +
156 "specified as a negative number.");
157 Assert.state(refCursorPosition == 0 || refCursorPosition > 0,
158 "invalid refCursorPosition specified as " + refCursorPosition + "; there are " +
159 parameters.length + " parameters defined.");
160
161 CallMetaDataContext callContext = new CallMetaDataContext();
162 callContext.setAccessCallParameterMetaData(false);
163 callContext.setProcedureName(procedureName);
164 callContext.setFunction(function);
165 callContext.initializeMetaData(getDataSource());
166 callContext.processParameters(Arrays.asList(parameters));
167 SqlParameter cursorParameter = callContext.createReturnResultSetParameter("cursor", rowMapper);
168 this.callString = callContext.createCallString();
169
170 log.debug("Call string is: " + callString);
171
172 int cursorSqlType = Types.OTHER;
173 if (function) {
174 if (cursorParameter instanceof SqlOutParameter) {
175 cursorSqlType = cursorParameter.getSqlType();
176 }
177 }
178 else {
179 if (refCursorPosition > 0 && refCursorPosition <= parameters.length) {
180 cursorSqlType = parameters[refCursorPosition - 1].getSqlType();
181 }
182 }
183
184 try {
185 if (isUseSharedExtendedConnection()) {
186 callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
187 ResultSet.HOLD_CURSORS_OVER_COMMIT);
188 }
189 else {
190 callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
191 }
192 applyStatementSettings(callableStatement);
193 if (this.preparedStatementSetter != null) {
194 preparedStatementSetter.setValues(callableStatement);
195 }
196
197 if (function) {
198 callableStatement.registerOutParameter(1, cursorSqlType);
199 }
200 else {
201 if (refCursorPosition > 0) {
202 callableStatement.registerOutParameter(refCursorPosition, cursorSqlType);
203 }
204 }
205 boolean results = callableStatement.execute();
206 if (results) {
207 rs = callableStatement.getResultSet();
208 }
209 else {
210 if (function) {
211 rs = (ResultSet) callableStatement.getObject(1);
212 }
213 else {
214 rs = (ResultSet) callableStatement.getObject(refCursorPosition);
215 }
216 }
217 handleWarnings(callableStatement);
218 }
219 catch (SQLException se) {
220 close();
221 throw getExceptionTranslator().translate("Executing stored procedure", getSql(), se);
222 }
223
224 }
225
226 @SuppressWarnings("unchecked")
227 protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
228 return (T) rowMapper.mapRow(rs, currentRow);
229 }
230
231
232
233
234 protected void cleanupOnClose() throws Exception {
235 JdbcUtils.closeStatement(this.callableStatement);
236 }
237
238 @Override
239 public String getSql() {
240 if (callString != null) {
241 return this.callString;
242 }
243 else {
244 return "PROCEDURE NAME: " + procedureName;
245 }
246 }
247
248 }