1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.item.database;
18
19 import java.sql.Connection;
20 import java.sql.PreparedStatement;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23
24
25 import org.springframework.jdbc.core.PreparedStatementSetter;
26 import org.springframework.jdbc.core.RowMapper;
27 import org.springframework.jdbc.support.JdbcUtils;
28 import org.springframework.util.Assert;
29 import org.springframework.util.ClassUtils;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class JdbcCursorItemReader<T> extends AbstractCursorItemReader<T> {
55
56 PreparedStatement preparedStatement;
57
58 PreparedStatementSetter preparedStatementSetter;
59
60 String sql;
61
62 RowMapper rowMapper;
63
64 public JdbcCursorItemReader() {
65 super();
66 setName(ClassUtils.getShortName(JdbcCursorItemReader.class));
67 }
68
69
70
71
72
73
74 public void setRowMapper(RowMapper rowMapper) {
75 this.rowMapper = rowMapper;
76 }
77
78
79
80
81
82
83
84
85 public void setSql(String sql) {
86 this.sql = sql;
87 }
88
89
90
91
92
93
94
95 public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
96 this.preparedStatementSetter = preparedStatementSetter;
97 }
98
99
100
101
102
103
104
105 public void afterPropertiesSet() throws Exception {
106 super.afterPropertiesSet();
107 Assert.notNull(sql, "The SQL query must be provided");
108 Assert.notNull(rowMapper, "RowMapper must be provided");
109 }
110
111
112 protected void openCursor(Connection con) {
113 try {
114 if (isUseSharedExtendedConnection()) {
115 preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
116 ResultSet.HOLD_CURSORS_OVER_COMMIT);
117 }
118 else {
119 preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
120 }
121 applyStatementSettings(preparedStatement);
122 if (this.preparedStatementSetter != null) {
123 preparedStatementSetter.setValues(preparedStatement);
124 }
125 this.rs = preparedStatement.executeQuery();
126 handleWarnings(preparedStatement);
127 }
128 catch (SQLException se) {
129 close();
130 throw getExceptionTranslator().translate("Executing query", getSql(), se);
131 }
132
133 }
134
135
136 @SuppressWarnings("unchecked")
137 protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
138 return (T) rowMapper.mapRow(rs, currentRow);
139 }
140
141
142
143
144 protected void cleanupOnClose() throws Exception {
145 JdbcUtils.closeStatement(this.preparedStatement);
146 }
147
148 @Override
149 public String getSql() {
150 return this.sql;
151 }
152 }