View Javadoc

1   /*
2    * Copyright 2002-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.item.database;
18  
19  import java.io.PrintWriter;
20  import java.lang.reflect.InvocationHandler;
21  import java.lang.reflect.InvocationTargetException;
22  import java.lang.reflect.Method;
23  import java.lang.reflect.Proxy;
24  import java.sql.Connection;
25  import java.sql.SQLException;
26  import java.util.logging.Logger;
27  
28  import javax.sql.DataSource;
29  
30  import org.springframework.beans.factory.InitializingBean;
31  import org.springframework.jdbc.datasource.ConnectionProxy;
32  import org.springframework.jdbc.datasource.DataSourceUtils;
33  import org.springframework.jdbc.datasource.SmartDataSource;
34  import org.springframework.transaction.support.TransactionSynchronizationManager;
35  import org.springframework.util.Assert;
36  
37  /**
38   * Implementation of {@link SmartDataSource} that is capable of keeping a single
39   * JDBC Connection which is NOT closed after each use even if
40   * {@link Connection#close()} is called.
41   *
42   * The connection can be kept open over multiple transactions when used together
43   * with any of Spring's
44   * {@link org.springframework.transaction.PlatformTransactionManager}
45   * implementations.
46   *
47   * <p>
48   * Loosely based on the SingleConnectionDataSource implementation in Spring
49   * Core. Intended to be used with the {@link JdbcCursorItemReader} to provide a
50   * connection that remains open across transaction boundaries, It remains open
51   * for the life of the cursor, and can be shared with the main transaction of
52   * the rest of the step processing.
53   *
54   * <p>
55   * Once close suppression has been turned on for a connection, it will be
56   * returned for the first {@link #getConnection()} call. Any subsequent calls to
57   * {@link #getConnection()} will retrieve a new connection from the wrapped
58   * {@link DataSource} until the {@link DataSourceUtils} queries whether the
59   * connection should be closed or not by calling
60   * {@link #shouldClose(Connection)} for the close-suppressed {@link Connection}.
61   * At that point the cycle starts over again, and the next
62   * {@link #getConnection()} call will have the {@link Connection} that is being
63   * close-suppressed returned. This allows the use of the close-suppressed
64   * {@link Connection} to be the main {@link Connection} for an extended data
65   * access process. The close suppression is turned off by calling
66   * {@link #stopCloseSuppression(Connection)}.
67   *
68   * <p>
69   * This class is not multi-threading capable.
70   *
71   * <p>
72   * The connection returned will be a close-suppressing proxy instead of the
73   * physical {@link Connection}. Be aware that you will not be able to cast this
74   * to a native <code>OracleConnection</code> or the like anymore; you need to
75   * use a {@link org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor}.
76   *
77   * @author Thomas Risberg
78   * @see #getConnection()
79   * @see java.sql.Connection#close()
80   * @see DataSourceUtils#releaseConnection
81   * @see org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor
82   * @since 2.0
83   */
84  public class ExtendedConnectionDataSourceProxy implements SmartDataSource, InitializingBean {
85  
86  	/** Provided DataSource */
87  	private DataSource dataSource;
88  
89  	/** The connection to suppress close calls for */
90  	private Connection closeSuppressedConnection = null;
91  
92  	/** The connection to suppress close calls for */
93  	private boolean borrowedConnection = false;
94  
95  	/** Synchronization monitor for the shared Connection */
96  	private final Object connectionMonitor = new Object();
97  
98  	/**
99  	 * No arg constructor for use when configured using JavaBean style.
100 	 */
101 	public ExtendedConnectionDataSourceProxy() {
102 	}
103 
104 	/**
105 	 * Constructor that takes as a parameter with the {&link DataSource} to be
106 	 * wrapped.
107 	 */
108 	public ExtendedConnectionDataSourceProxy(DataSource dataSource) {
109 		this.dataSource = dataSource;
110 	}
111 
112 	/**
113 	 * Setter for the {&link DataSource} that is to be wrapped.
114 	 *
115 	 * @param dataSource the DataSource
116 	 */
117 	public void setDataSource(DataSource dataSource) {
118 		this.dataSource = dataSource;
119 	}
120 
121 	/**
122 	 * @see SmartDataSource
123 	 */
124 	@Override
125 	public boolean shouldClose(Connection connection) {
126 		boolean shouldClose = !isCloseSuppressionActive(connection);
127 		if (borrowedConnection && closeSuppressedConnection.equals(connection)) {
128 			borrowedConnection = false;
129 		}
130 		return shouldClose;
131 	}
132 
133 	/**
134 	 * Return the status of close suppression being activated for a given
135 	 * {@link Connection}
136 	 *
137 	 * @param connection the {@link Connection} that the close suppression
138 	 * status is requested for
139 	 * @return true or false
140 	 */
141 	public boolean isCloseSuppressionActive(Connection connection) {
142 		if (connection == null) {
143 			return false;
144 		}
145 		return connection.equals(closeSuppressedConnection) ? true : false;
146 	}
147 
148 	/**
149 	 *
150 	 * @param connection the {@link Connection} that close suppression is
151 	 * requested for
152 	 */
153 	public void startCloseSuppression(Connection connection) {
154 		synchronized (this.connectionMonitor) {
155 			closeSuppressedConnection = connection;
156 			if (TransactionSynchronizationManager.isActualTransactionActive()) {
157 				borrowedConnection = true;
158 			}
159 		}
160 	}
161 
162 	/**
163 	 *
164 	 * @param connection the {@link Connection} that close suppression should be
165 	 * turned off for
166 	 */
167 	public void stopCloseSuppression(Connection connection) {
168 		synchronized (this.connectionMonitor) {
169 			closeSuppressedConnection = null;
170 			borrowedConnection = false;
171 		}
172 	}
173 
174 	@Override
175 	public Connection getConnection() throws SQLException {
176 		synchronized (this.connectionMonitor) {
177 			return initConnection(null, null);
178 		}
179 	}
180 
181 	@Override
182 	public Connection getConnection(String username, String password) throws SQLException {
183 		synchronized (this.connectionMonitor) {
184 			return initConnection(username, password);
185 		}
186 	}
187 
188 	private boolean completeCloseCall(Connection connection) {
189 		if (borrowedConnection && closeSuppressedConnection.equals(connection)) {
190 			borrowedConnection = false;
191 		}
192 		return isCloseSuppressionActive(connection);
193 	}
194 
195 	private Connection initConnection(String username, String password) throws SQLException {
196 		if (closeSuppressedConnection != null) {
197 			if (!borrowedConnection) {
198 				borrowedConnection = true;
199 				return closeSuppressedConnection;
200 			}
201 		}
202 		Connection target;
203 		if (username != null) {
204 			target = dataSource.getConnection(username, password);
205 		}
206 		else {
207 			target = dataSource.getConnection();
208 		}
209 		Connection connection = getCloseSuppressingConnectionProxy(target);
210 		return connection;
211 	}
212 
213 	@Override
214 	public PrintWriter getLogWriter() throws SQLException {
215 		return dataSource.getLogWriter();
216 	}
217 
218 	@Override
219 	public int getLoginTimeout() throws SQLException {
220 		return dataSource.getLoginTimeout();
221 	}
222 
223 	@Override
224 	public void setLogWriter(PrintWriter out) throws SQLException {
225 		dataSource.setLogWriter(out);
226 	}
227 
228 	@Override
229 	public void setLoginTimeout(int seconds) throws SQLException {
230 		dataSource.setLoginTimeout(seconds);
231 	}
232 
233 	/**
234 	 * Wrap the given Connection with a proxy that delegates every method call
235 	 * to it but suppresses close calls.
236 	 * @param target the original Connection to wrap
237 	 * @return the wrapped Connection
238 	 */
239 	protected Connection getCloseSuppressingConnectionProxy(Connection target) {
240 		return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
241 				new Class[] { ConnectionProxy.class }, new CloseSuppressingInvocationHandler(target, this));
242 	}
243 
244 	/**
245 	 * Invocation handler that suppresses close calls on JDBC Connections until
246 	 * the associated instance of the ExtendedConnectionDataSourceProxy
247 	 * determines the connection should actually be closed.
248 	 */
249 	private static class CloseSuppressingInvocationHandler implements InvocationHandler {
250 
251 		private final Connection target;
252 
253 		private final ExtendedConnectionDataSourceProxy dataSource;
254 
255 		public CloseSuppressingInvocationHandler(Connection target, ExtendedConnectionDataSourceProxy dataSource) {
256 			this.dataSource = dataSource;
257 			this.target = target;
258 		}
259 
260 		@Override
261 		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
262 			// Invocation on ConnectionProxy interface coming in...
263 
264 			if (method.getName().equals("equals")) {
265 				// Only consider equal when proxies are identical.
266 				return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
267 			}
268 			else if (method.getName().equals("hashCode")) {
269 				// Use hashCode of Connection proxy.
270 				return new Integer(System.identityHashCode(proxy));
271 			}
272 			else if (method.getName().equals("close")) {
273 				// Handle close method: don't pass the call on if we are
274 				// suppressing close calls.
275 				if (dataSource.completeCloseCall((Connection) proxy)) {
276 					return null;
277 				}
278 				else {
279 					target.close();
280 					return null;
281 				}
282 			}
283 			else if (method.getName().equals("getTargetConnection")) {
284 				// Handle getTargetConnection method: return underlying
285 				// Connection.
286 				return this.target;
287 			}
288 
289 			// Invoke method on target Connection.
290 			try {
291 				return method.invoke(this.target, args);
292 			}
293 			catch (InvocationTargetException ex) {
294 				throw ex.getTargetException();
295 			}
296 		}
297 	}
298 
299 	/**
300 	 * Performs only a 'shallow' non-recursive check of self's and delegate's
301 	 * class to retain Java 5 compatibility.
302 	 */
303 	@Override
304 	public boolean isWrapperFor(Class<?> iface) throws SQLException {
305 		if (iface.isAssignableFrom(SmartDataSource.class) || iface.isAssignableFrom(dataSource.getClass())) {
306 			return true;
307 		}
308 		return false;
309 	}
310 
311 	/**
312 	 * Returns either self or delegate (in this order) if one of them can be
313 	 * cast to supplied parameter class. Does *not* support recursive unwrapping
314 	 * of the delegate to retain Java 5 compatibility.
315 	 */
316 	@Override
317 	public <T> T unwrap(Class<T> iface) throws SQLException {
318 		if (iface.isAssignableFrom(SmartDataSource.class)) {
319 			@SuppressWarnings("unchecked")
320 			T casted = (T) this;
321 			return casted;
322 		}
323 		else if (iface.isAssignableFrom(dataSource.getClass())) {
324 			@SuppressWarnings("unchecked")
325 			T casted = (T) dataSource;
326 			return casted;
327 		}
328 		throw new SQLException("Unsupported class " + iface.getSimpleName());
329 	}
330 
331 	@Override
332 	public void afterPropertiesSet() throws Exception {
333 		Assert.notNull(dataSource);
334 	}
335 
336 	/**
337 	 * Added due to JDK 7 compatibility, sadly a proper implementation
338 	 * that would throw SqlFeatureNotSupportedException is not possible
339 	 * in Java 5 (the class was added in Java 6).
340 	 */
341 	public Logger getParentLogger() {
342 		throw new UnsupportedOperationException();
343 	}
344 
345 }