1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.support.transaction; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collection; |
21 | import java.util.HashMap; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | import java.util.Set; |
26 | import java.util.concurrent.ConcurrentHashMap; |
27 | import java.util.concurrent.ConcurrentMap; |
28 | import java.util.concurrent.CopyOnWriteArrayList; |
29 | import java.util.concurrent.CopyOnWriteArraySet; |
30 | |
31 | import org.aopalliance.intercept.MethodInterceptor; |
32 | import org.aopalliance.intercept.MethodInvocation; |
33 | import org.springframework.aop.framework.ProxyFactory; |
34 | import org.springframework.transaction.support.TransactionSynchronization; |
35 | import org.springframework.transaction.support.TransactionSynchronizationAdapter; |
36 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
37 | |
38 | /** |
39 | * <p> |
40 | * Factory for transaction aware objects (like lists, sets, maps). If a |
41 | * transaction is active when a method is called on an instance created by the |
42 | * factory, it makes a copy of the target object and carries out all operations |
43 | * on the copy. Only when the transaction commits is the target re-initialised |
44 | * with the copy. |
45 | * </p> |
46 | * |
47 | * <p> |
48 | * Works well with collections and maps for testing transactional behaviour |
49 | * without needing a database. The base implementation handles lists, sets and |
50 | * maps. Subclasses can implement {@link #begin(Object)} and |
51 | * {@link #commit(Object, Object)} to provide support for other resources. |
52 | * </p> |
53 | * |
54 | * <p> |
55 | * Generally not intended for multi-threaded use, but the |
56 | * {@link #createAppendOnlyTransactionalMap() append only version} of |
57 | * collections gives isolation between threads operating on different keys in a |
58 | * map, provided they only append to the map. (Threads are limited to removing |
59 | * entries that were created in the same transaction.) |
60 | * </p> |
61 | * |
62 | * @author Dave Syer |
63 | * |
64 | */ |
65 | public class TransactionAwareProxyFactory<T> { |
66 | |
67 | private final T target; |
68 | |
69 | private final boolean appendOnly; |
70 | |
71 | private TransactionAwareProxyFactory(T target) { |
72 | this(target, false); |
73 | |
74 | } |
75 | |
76 | private TransactionAwareProxyFactory(T target, boolean appendOnly) { |
77 | super(); |
78 | this.target = target; |
79 | this.appendOnly = appendOnly; |
80 | } |
81 | |
82 | /** |
83 | * Make a copy of the target that can be used inside a transaction to |
84 | * isolate changes from the original. Also called from the factory |
85 | * constructor to isolate the target from the original value passed in. |
86 | * |
87 | * @param target the target object (List, Set or Map) |
88 | * @return an independent copy |
89 | */ |
90 | @SuppressWarnings({ "unchecked", "rawtypes" }) |
91 | protected final T begin(T target) { |
92 | // Unfortunately in Java 5 this method has to synchronized |
93 | // (works OK without in Java 6). |
94 | synchronized (target) { |
95 | if (target instanceof List) { |
96 | if (appendOnly) { |
97 | return (T) new ArrayList(); |
98 | } |
99 | return (T) new ArrayList((List) target); |
100 | } |
101 | else if (target instanceof Set) { |
102 | if (appendOnly) { |
103 | return (T) new HashSet(); |
104 | } |
105 | return (T) new HashSet((Set) target); |
106 | } |
107 | else if (target instanceof Map) { |
108 | if (appendOnly) { |
109 | return (T) new HashMap(); |
110 | } |
111 | return (T) new HashMap((Map) target); |
112 | } |
113 | else { |
114 | throw new UnsupportedOperationException("Cannot copy target for this type: " + target.getClass()); |
115 | } |
116 | } |
117 | } |
118 | |
119 | /** |
120 | * Take the working copy state and commit it back to the original target. |
121 | * The target then reflects all the changes applied to the copy during a |
122 | * transaction. |
123 | * |
124 | * @param copy the working copy. |
125 | * @param target the original target of the factory. |
126 | */ |
127 | @SuppressWarnings({ "unchecked", "rawtypes" }) |
128 | protected void commit(T copy, T target) { |
129 | // Unfortunately in Java 5 this method has to be synchronized |
130 | // (works OK without in Java 6). |
131 | synchronized (target) { |
132 | if (target instanceof Collection) { |
133 | if (!appendOnly) { |
134 | ((Collection) target).clear(); |
135 | } |
136 | ((Collection) target).addAll((Collection) copy); |
137 | } |
138 | else { |
139 | if (!appendOnly) { |
140 | ((Map) target).clear(); |
141 | } |
142 | ((Map) target).putAll((Map) copy); |
143 | } |
144 | } |
145 | } |
146 | |
147 | private T createInstance() { |
148 | |
149 | synchronized (target) { |
150 | |
151 | ProxyFactory factory = new ProxyFactory(target); |
152 | factory.addAdvice(new TransactionAwareInterceptor()); |
153 | @SuppressWarnings("unchecked") |
154 | T instance = (T) factory.getProxy(); |
155 | return instance; |
156 | |
157 | } |
158 | |
159 | } |
160 | |
161 | public static <K, V> Map<K, V> createTransactionalMap() { |
162 | return (Map<K, V>) new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>()).createInstance(); |
163 | } |
164 | |
165 | public static <K, V> Map<K, V> createTransactionalMap(Map<K, V> map) { |
166 | return (Map<K, V>) new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>(map)).createInstance(); |
167 | } |
168 | |
169 | public static <K, V> ConcurrentMap<K, V> createAppendOnlyTransactionalMap() { |
170 | return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>(), true).createInstance(); |
171 | } |
172 | |
173 | public static <T> Set<T> createAppendOnlyTransactionalSet() { |
174 | return (Set<T>) new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>(), true).createInstance(); |
175 | } |
176 | |
177 | public static <T> Set<T> createTransactionalSet() { |
178 | return (Set<T>) new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>()).createInstance(); |
179 | } |
180 | |
181 | public static <T> Set<T> createTransactionalSet(Set<T> set) { |
182 | return (Set<T>) new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>(set)).createInstance(); |
183 | } |
184 | |
185 | public static <T> List<T> createAppendOnlyTransactionalList() { |
186 | return (List<T>) new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>(), true).createInstance(); |
187 | } |
188 | |
189 | public static <T> List<T> createTransactionalList() { |
190 | return (List<T>) new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>()).createInstance(); |
191 | } |
192 | |
193 | public static <T> List<T> createTransactionalList(List<T> list) { |
194 | return (List<T>) new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>(list)).createInstance(); |
195 | } |
196 | |
197 | private class TargetSynchronization extends TransactionSynchronizationAdapter { |
198 | |
199 | private final T cache; |
200 | |
201 | private final Object key; |
202 | |
203 | public TargetSynchronization(Object key, T cache) { |
204 | super(); |
205 | this.cache = cache; |
206 | this.key = key; |
207 | } |
208 | |
209 | public void afterCompletion(int status) { |
210 | super.afterCompletion(status); |
211 | if (status == TransactionSynchronization.STATUS_COMMITTED) { |
212 | synchronized (target) { |
213 | commit(cache, target); |
214 | } |
215 | } |
216 | TransactionSynchronizationManager.unbindResource(key); |
217 | } |
218 | } |
219 | |
220 | private class TransactionAwareInterceptor implements MethodInterceptor { |
221 | |
222 | public Object invoke(MethodInvocation invocation) throws Throwable { |
223 | |
224 | if (!TransactionSynchronizationManager.isActualTransactionActive()) { |
225 | return invocation.proceed(); |
226 | } |
227 | |
228 | T cache; |
229 | |
230 | if (!TransactionSynchronizationManager.hasResource(this)) { |
231 | cache = begin(target); |
232 | TransactionSynchronizationManager.bindResource(this, cache); |
233 | TransactionSynchronizationManager.registerSynchronization(new TargetSynchronization(this, cache)); |
234 | } |
235 | else { |
236 | @SuppressWarnings("unchecked") |
237 | T retrievedCache = (T) TransactionSynchronizationManager.getResource(this); |
238 | cache = retrievedCache; |
239 | } |
240 | |
241 | Object result = invocation.getMethod().invoke(cache, invocation.getArguments()); |
242 | |
243 | if (appendOnly) { |
244 | String methodName = invocation.getMethod().getName(); |
245 | if ((result == null && methodName.equals("get")) |
246 | || (Boolean.FALSE.equals(result) && (methodName.startsWith("contains")) || (Boolean.TRUE |
247 | .equals(result) && methodName.startsWith("isEmpty")))) { |
248 | // In appendOnly mode the result of a get might not be |
249 | // in the cache... |
250 | return invocation.proceed(); |
251 | } |
252 | if (result instanceof Collection<?>) { |
253 | HashSet<Object> set = new HashSet<Object>((Collection<?>) result); |
254 | set.addAll((Collection<?>) invocation.proceed()); |
255 | result = set; |
256 | } |
257 | } |
258 | |
259 | return result; |
260 | |
261 | } |
262 | } |
263 | |
264 | } |