1 | /* |
2 | * Copyright 2002-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.item.database; |
18 | |
19 | import java.io.PrintWriter; |
20 | import java.lang.reflect.InvocationHandler; |
21 | import java.lang.reflect.InvocationTargetException; |
22 | import java.lang.reflect.Method; |
23 | import java.lang.reflect.Proxy; |
24 | import java.sql.Connection; |
25 | import java.sql.SQLException; |
26 | import java.sql.SQLFeatureNotSupportedException; |
27 | import java.util.logging.Logger; |
28 | |
29 | import javax.sql.DataSource; |
30 | |
31 | import org.springframework.beans.factory.InitializingBean; |
32 | import org.springframework.jdbc.datasource.ConnectionProxy; |
33 | import org.springframework.jdbc.datasource.DataSourceUtils; |
34 | import org.springframework.jdbc.datasource.SmartDataSource; |
35 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
36 | import org.springframework.util.Assert; |
37 | import org.springframework.util.MethodInvoker; |
38 | |
39 | /** |
40 | * Implementation of {@link SmartDataSource} that is capable of keeping a single |
41 | * JDBC Connection which is NOT closed after each use even if |
42 | * {@link Connection#close()} is called. |
43 | * |
44 | * The connection can be kept open over multiple transactions when used together |
45 | * with any of Spring's |
46 | * {@link org.springframework.transaction.PlatformTransactionManager} |
47 | * implementations. |
48 | * |
49 | * <p> |
50 | * Loosely based on the SingleConnectionDataSource implementation in Spring |
51 | * Core. Intended to be used with the {@link JdbcCursorItemReader} to provide a |
52 | * connection that remains open across transaction boundaries, It remains open |
53 | * for the life of the cursor, and can be shared with the main transaction of |
54 | * the rest of the step processing. |
55 | * |
56 | * <p> |
57 | * Once close suppression has been turned on for a connection, it will be |
58 | * returned for the first {@link #getConnection()} call. Any subsequent calls to |
59 | * {@link #getConnection()} will retrieve a new connection from the wrapped |
60 | * {@link DataSource} until the {@link DataSourceUtils} queries whether the |
61 | * connection should be closed or not by calling |
62 | * {@link #shouldClose(Connection)} for the close-suppressed {@link Connection}. |
63 | * At that point the cycle starts over again, and the next |
64 | * {@link #getConnection()} call will have the {@link Connection} that is being |
65 | * close-suppressed returned. This allows the use of the close-suppressed |
66 | * {@link Connection} to be the main {@link Connection} for an extended data |
67 | * access process. The close suppression is turned off by calling |
68 | * {@link #stopCloseSuppression(Connection)}. |
69 | * |
70 | * <p> |
71 | * This class is not multi-threading capable. |
72 | * |
73 | * <p> |
74 | * The connection returned will be a close-suppressing proxy instead of the |
75 | * physical {@link Connection}. Be aware that you will not be able to cast this |
76 | * to a native <code>OracleConnection</code> or the like anymore; you need to |
77 | * use a {@link org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor}. |
78 | * |
79 | * @author Thomas Risberg |
80 | * @see #getConnection() |
81 | * @see java.sql.Connection#close() |
82 | * @see DataSourceUtils#releaseConnection |
83 | * @see org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor |
84 | * @since 2.0 |
85 | */ |
86 | public class ExtendedConnectionDataSourceProxy implements SmartDataSource, InitializingBean { |
87 | |
88 | /** Provided DataSource */ |
89 | private DataSource dataSource; |
90 | |
91 | /** The connection to suppress close calls for */ |
92 | private Connection closeSuppressedConnection = null; |
93 | |
94 | /** The connection to suppress close calls for */ |
95 | private boolean borrowedConnection = false; |
96 | |
97 | /** Synchronization monitor for the shared Connection */ |
98 | private final Object connectionMonitor = new Object(); |
99 | |
100 | /** |
101 | * No arg constructor for use when configured using JavaBean style. |
102 | */ |
103 | public ExtendedConnectionDataSourceProxy() { |
104 | } |
105 | |
106 | /** |
107 | * Constructor that takes as a parameter with the {&link DataSource} to be |
108 | * wrapped. |
109 | */ |
110 | public ExtendedConnectionDataSourceProxy(DataSource dataSource) { |
111 | this.dataSource = dataSource; |
112 | } |
113 | |
114 | /** |
115 | * Setter for the {&link DataSource} that is to be wrapped. |
116 | * |
117 | * @param dataSource the DataSource |
118 | */ |
119 | public void setDataSource(DataSource dataSource) { |
120 | this.dataSource = dataSource; |
121 | } |
122 | |
123 | /** |
124 | * @see SmartDataSource |
125 | */ |
126 | @Override |
127 | public boolean shouldClose(Connection connection) { |
128 | boolean shouldClose = !isCloseSuppressionActive(connection); |
129 | if (borrowedConnection && closeSuppressedConnection.equals(connection)) { |
130 | borrowedConnection = false; |
131 | } |
132 | return shouldClose; |
133 | } |
134 | |
135 | /** |
136 | * Return the status of close suppression being activated for a given |
137 | * {@link Connection} |
138 | * |
139 | * @param connection the {@link Connection} that the close suppression |
140 | * status is requested for |
141 | * @return true or false |
142 | */ |
143 | public boolean isCloseSuppressionActive(Connection connection) { |
144 | if (connection == null) { |
145 | return false; |
146 | } |
147 | return connection.equals(closeSuppressedConnection) ? true : false; |
148 | } |
149 | |
150 | /** |
151 | * |
152 | * @param connection the {@link Connection} that close suppression is |
153 | * requested for |
154 | */ |
155 | public void startCloseSuppression(Connection connection) { |
156 | synchronized (this.connectionMonitor) { |
157 | closeSuppressedConnection = connection; |
158 | if (TransactionSynchronizationManager.isActualTransactionActive()) { |
159 | borrowedConnection = true; |
160 | } |
161 | } |
162 | } |
163 | |
164 | /** |
165 | * |
166 | * @param connection the {@link Connection} that close suppression should be |
167 | * turned off for |
168 | */ |
169 | public void stopCloseSuppression(Connection connection) { |
170 | synchronized (this.connectionMonitor) { |
171 | closeSuppressedConnection = null; |
172 | borrowedConnection = false; |
173 | } |
174 | } |
175 | |
176 | @Override |
177 | public Connection getConnection() throws SQLException { |
178 | synchronized (this.connectionMonitor) { |
179 | return initConnection(null, null); |
180 | } |
181 | } |
182 | |
183 | @Override |
184 | public Connection getConnection(String username, String password) throws SQLException { |
185 | synchronized (this.connectionMonitor) { |
186 | return initConnection(username, password); |
187 | } |
188 | } |
189 | |
190 | private boolean completeCloseCall(Connection connection) { |
191 | if (borrowedConnection && closeSuppressedConnection.equals(connection)) { |
192 | borrowedConnection = false; |
193 | } |
194 | return isCloseSuppressionActive(connection); |
195 | } |
196 | |
197 | private Connection initConnection(String username, String password) throws SQLException { |
198 | if (closeSuppressedConnection != null) { |
199 | if (!borrowedConnection) { |
200 | borrowedConnection = true; |
201 | return closeSuppressedConnection; |
202 | } |
203 | } |
204 | Connection target; |
205 | if (username != null) { |
206 | target = dataSource.getConnection(username, password); |
207 | } |
208 | else { |
209 | target = dataSource.getConnection(); |
210 | } |
211 | Connection connection = getCloseSuppressingConnectionProxy(target); |
212 | return connection; |
213 | } |
214 | |
215 | @Override |
216 | public PrintWriter getLogWriter() throws SQLException { |
217 | return dataSource.getLogWriter(); |
218 | } |
219 | |
220 | @Override |
221 | public int getLoginTimeout() throws SQLException { |
222 | return dataSource.getLoginTimeout(); |
223 | } |
224 | |
225 | @Override |
226 | public void setLogWriter(PrintWriter out) throws SQLException { |
227 | dataSource.setLogWriter(out); |
228 | } |
229 | |
230 | @Override |
231 | public void setLoginTimeout(int seconds) throws SQLException { |
232 | dataSource.setLoginTimeout(seconds); |
233 | } |
234 | |
235 | /** |
236 | * Wrap the given Connection with a proxy that delegates every method call |
237 | * to it but suppresses close calls. |
238 | * @param target the original Connection to wrap |
239 | * @return the wrapped Connection |
240 | */ |
241 | protected Connection getCloseSuppressingConnectionProxy(Connection target) { |
242 | return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(), |
243 | new Class[] { ConnectionProxy.class }, new CloseSuppressingInvocationHandler(target, this)); |
244 | } |
245 | |
246 | /** |
247 | * Invocation handler that suppresses close calls on JDBC Connections until |
248 | * the associated instance of the ExtendedConnectionDataSourceProxy |
249 | * determines the connection should actually be closed. |
250 | */ |
251 | private static class CloseSuppressingInvocationHandler implements InvocationHandler { |
252 | |
253 | private final Connection target; |
254 | |
255 | private final ExtendedConnectionDataSourceProxy dataSource; |
256 | |
257 | public CloseSuppressingInvocationHandler(Connection target, ExtendedConnectionDataSourceProxy dataSource) { |
258 | this.dataSource = dataSource; |
259 | this.target = target; |
260 | } |
261 | |
262 | @Override |
263 | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
264 | // Invocation on ConnectionProxy interface coming in... |
265 | |
266 | if (method.getName().equals("equals")) { |
267 | // Only consider equal when proxies are identical. |
268 | return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE); |
269 | } |
270 | else if (method.getName().equals("hashCode")) { |
271 | // Use hashCode of Connection proxy. |
272 | return new Integer(System.identityHashCode(proxy)); |
273 | } |
274 | else if (method.getName().equals("close")) { |
275 | // Handle close method: don't pass the call on if we are |
276 | // suppressing close calls. |
277 | if (dataSource.completeCloseCall((Connection) proxy)) { |
278 | return null; |
279 | } |
280 | else { |
281 | target.close(); |
282 | return null; |
283 | } |
284 | } |
285 | else if (method.getName().equals("getTargetConnection")) { |
286 | // Handle getTargetConnection method: return underlying |
287 | // Connection. |
288 | return this.target; |
289 | } |
290 | |
291 | // Invoke method on target Connection. |
292 | try { |
293 | return method.invoke(this.target, args); |
294 | } |
295 | catch (InvocationTargetException ex) { |
296 | throw ex.getTargetException(); |
297 | } |
298 | } |
299 | } |
300 | |
301 | /** |
302 | * Performs only a 'shallow' non-recursive check of self's and delegate's |
303 | * class to retain Java 5 compatibility. |
304 | */ |
305 | @Override |
306 | public boolean isWrapperFor(Class<?> iface) throws SQLException { |
307 | if (iface.isAssignableFrom(SmartDataSource.class) || iface.isAssignableFrom(dataSource.getClass())) { |
308 | return true; |
309 | } |
310 | return false; |
311 | } |
312 | |
313 | /** |
314 | * Returns either self or delegate (in this order) if one of them can be |
315 | * cast to supplied parameter class. Does *not* support recursive unwrapping |
316 | * of the delegate to retain Java 5 compatibility. |
317 | */ |
318 | @Override |
319 | public <T> T unwrap(Class<T> iface) throws SQLException { |
320 | if (iface.isAssignableFrom(SmartDataSource.class)) { |
321 | @SuppressWarnings("unchecked") |
322 | T casted = (T) this; |
323 | return casted; |
324 | } |
325 | else if (iface.isAssignableFrom(dataSource.getClass())) { |
326 | @SuppressWarnings("unchecked") |
327 | T casted = (T) dataSource; |
328 | return casted; |
329 | } |
330 | throw new SQLException("Unsupported class " + iface.getSimpleName()); |
331 | } |
332 | |
333 | @Override |
334 | public void afterPropertiesSet() throws Exception { |
335 | Assert.notNull(dataSource); |
336 | } |
337 | |
338 | /** |
339 | * Added due to JDK 7 compatibility. |
340 | */ |
341 | public Logger getParentLogger() throws SQLFeatureNotSupportedException{ |
342 | MethodInvoker invoker = new MethodInvoker(); |
343 | invoker.setTargetObject(dataSource); |
344 | invoker.setTargetMethod("getParentLogger"); |
345 | |
346 | try { |
347 | invoker.prepare(); |
348 | return (Logger) invoker.invoke(); |
349 | } catch (ClassNotFoundException cnfe) { |
350 | throw new SQLFeatureNotSupportedException(cnfe); |
351 | } catch (NoSuchMethodException nsme) { |
352 | throw new SQLFeatureNotSupportedException(nsme); |
353 | } catch (IllegalAccessException iae) { |
354 | throw new SQLFeatureNotSupportedException(iae); |
355 | } catch (InvocationTargetException ite) { |
356 | throw new SQLFeatureNotSupportedException(ite); |
357 | } |
358 | } |
359 | } |